When Spark Streaming reads a Kafka source in Direct mode, the work splits into two parts: on the driver, each batch is assigned its offsetRanges (fromOffset to untilOffset); on the executors, data is pulled from the Kafka brokers according to the ranges the driver assigned.
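For orientation, here is a minimal sketch of how an application typically wires such a Direct stream up (broker address, group id and topic name are placeholders):

  import org.apache.kafka.clients.consumer.ConsumerConfig
  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka010._

  object DirectStreamDemo {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("direct-kafka-demo")
      val ssc = new StreamingContext(conf, Seconds(10))

      // placeholder Kafka parameters
      val kafkaParams = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker1:9092",
        ConsumerConfig.GROUP_ID_CONFIG -> "demo-group",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
      )

      // createDirectStream builds the DirectKafkaInputDStream analyzed below
      val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Seq("topic1"), kafkaParams)
      )

      stream.map(record => (record.key, record.value)).print()

      ssc.start()
      ssc.awaitTermination()
    }
  }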
The driver-side entry point is the start method of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream, which is invoked when the streaming job starts and determines the initial offsets:
override def start(): Unit = {
  val c = consumer
  paranoidPoll(c)
  if (currentOffsets.isEmpty) {
    currentOffsets = c.assignment().asScala.map { tp =>
      tp -> c.position(tp)
    }.toMap
  }

  // don't actually want to consume any messages, so pause all partitions
  c.pause(currentOffsets.keySet.asJava)
}
It first obtains the consumer object:
def consumer(): Consumer[K, V] = this.synchronized {
  if (null == kc) {
    kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava)
  }
  kc
}
If the KafkaConsumer field does not exist yet, it is created by calling the onStart method of the ConsumerStrategy (the consumer strategy class). ConsumerStrategy has three implementations: Subscribe subscribes to an explicit list of topic names, SubscribePattern subscribes by topic name pattern (for example topic1-*), and Assign consumes a manually specified set of topic partitions (see the sketch below).
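For reference, all three strategies are created through the ConsumerStrategies factory; a sketch with placeholder topic names, assuming a kafkaParams map like the one above:

  import org.apache.kafka.common.TopicPartition
  import org.apache.spark.streaming.kafka010.ConsumerStrategies

  // 1. Subscribe: subscribe to an explicit list of topics
  val byName = ConsumerStrategies.Subscribe[String, String](Seq("topic1", "topic2"), kafkaParams)

  // 2. SubscribePattern: subscribe to every topic whose name matches a regex
  val byPattern = ConsumerStrategies.SubscribePattern[String, String](
    java.util.regex.Pattern.compile("topic1-.*"), kafkaParams)

  // 3. Assign: consume a fixed set of topic partitions, no subscription
  val byAssign = ConsumerStrategies.Assign[String, String](
    Seq(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1)), kafkaParams)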
Taking the commonly used Subscribe strategy as an example, here is its onStart implementation:
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
  val consumer = new KafkaConsumer[K, V](kafkaParams)
  consumer.subscribe(topics)
  val toSeek = if (currentOffsets.isEmpty) {
    offsets
  } else {
    currentOffsets
  }
  if (!toSeek.isEmpty) {
    // work around KAFKA-3370 when reset is none
    // poll will throw if no position, i.e. auto offset reset none and no explicit position
    // but cant seek to a position before poll, because poll is what gets subscription partitions
    // So, poll, suppress the first exception, then seek
    val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
    val shouldSuppress =
      aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
    try {
      consumer.poll(0)
    } catch {
      case x: NoOffsetForPartitionException if shouldSuppress =>
        logWarning("Catching NoOffsetForPartitionException since " +
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
    }
    toSeek.asScala.foreach { case (topicPartition, offset) =>
      consumer.seek(topicPartition, offset)
    }
    // we've called poll, we must pause or next poll may consume messages and set position
    consumer.pause(consumer.assignment())
  }
  consumer
}
It creates a new KafkaConsumer and calls subscribe with the topic list. If there are offsets to seek to (either the caller's currentOffsets or the offsets the strategy was built with), it first calls poll(0) to connect to the Kafka cluster and trigger partition assignment for the subscribed topics, then calls seek to move the consumer's position on each partition to the supplied offset, and finally calls pause to stop fetching on all assigned partitions (so the next poll will not return records). The KafkaConsumer is then returned.
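The offsets fallback used above comes from the strategy itself: ConsumerStrategies.Subscribe has an overload that takes explicit starting offsets, which is how a job can resume from positions stored outside of Kafka. A sketch, reusing the kafkaParams map from the earlier example and with purely illustrative offset values:

  import org.apache.kafka.common.TopicPartition
  import org.apache.spark.streaming.kafka010.ConsumerStrategies

  // start partition 0 of topic1 at offset 100 and partition 1 at offset 200
  val startingOffsets = Map(
    new TopicPartition("topic1", 0) -> 100L,
    new TopicPartition("topic1", 1) -> 200L
  )

  val strategy = ConsumerStrategies.Subscribe[String, String](
    Seq("topic1"), kafkaParams, startingOffsets)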
Back in start, paranoidPoll is then called. It compensates for the case where poll returns messages even though the partitions were paused: if that happens, it seeks each partition back to the smallest offset that came back, so the consumer position is not thrown off:
/**
 * The concern here is that poll might consume messages despite being paused,
 * which would throw off consumer position.  Fix position if this happens.
 */
private def paranoidPoll(c: Consumer[K, V]): Unit = {
  val msgs: ConsumerRecords[K, V] = c.poll(0)
  if (!msgs.isEmpty) {
    // position should be minimum offset per topicpartition
    msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
      val tp = new TopicPartition(m.topic, m.partition)
      val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
      acc + (tp -> off)
    }.foreach { case (tp, off) =>
      logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
      c.seek(tp, off)
    }
  }
}
If currentOffsets is still empty (first start, no checkpoint), it is then filled in from the consumer's current assignment and positions. Finally, the KafkaConsumer's pause method is called to pause consumption on all of these partitions (so subsequent polls return no messages).
The start-up flow, in short: obtain the consumer, run paranoidPoll to fix positions, record currentOffsets if empty, then pause all partitions.
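That flow relies only on standard Kafka consumer semantics: a paused partition stays assigned and keeps a position, it just returns nothing from poll until resumed. A minimal sketch against a raw KafkaConsumer showing that behaviour (broker address, group id and topic are placeholders):

  import java.util.Properties
  import scala.collection.JavaConverters._
  import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
  import org.apache.kafka.common.serialization.StringDeserializer

  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "pause-demo")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(java.util.Arrays.asList("topic1"))
  consumer.poll(0)                          // triggers partition assignment
  consumer.pause(consumer.assignment())     // paused: later polls return no records,
                                            // but position() and seek() still work
  println(consumer.assignment().asScala.map(tp => tp -> consumer.position(tp)).toMap)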
Once the driver starts scheduling batch jobs, at the beginning of each batch it calls DirectKafkaInputDStream's compute method to produce a KafkaRDD. Let's walk through that method:
override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
  val untilOffsets = clamp(latestOffsets())
  val offsetRanges = untilOffsets.map { case (tp, uo) =>
    val fo: Long = currentOffsets(tp)
    if (fo > uo) {
      logWarning(s"Beginning offset ${fo} is after the ending offset ${uo} " +
        s"for topic ${tp.topic()} partition ${tp.partition()}. " +
        "You either provided an invalid fromOffset, or the Kafka topic has been damaged")
      OffsetRange(tp.topic, tp.partition, fo, fo)
    } else {
      OffsetRange(tp.topic, tp.partition, fo, uo)
    }
  }
  val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
    true)
  logInfo(s"create KafkaRDD for this batch ${validTime}")
  val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
    getPreferredHosts, useConsumerCache)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets
  commitAll()
  Some(rdd)
}
To summarize compute: clamp(latestOffsets()) takes the latest available offset of every subscribed partition and caps it with any configured rate limit, giving each partition's untilOffset; an OffsetRange (fromOffset to untilOffset) is built per partition from currentOffsets; and these offsetRanges are passed into the new KafkaRDD, whose partitions are generated from them, one per subscribed topic partition. Finally currentOffsets is advanced to untilOffsets, ready for the next batch.
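On the application side, these same ranges are exposed through the HasOffsetRanges interface, and CanCommitOffsets lets you hand processed ranges back; commitAll() at the end of compute is what actually sends the offsets queued that way to Kafka. A sketch of the usual pattern, reusing the stream variable from the earlier example:

  import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

  stream.foreachRDD { rdd =>
    // the underlying KafkaRDD carries the offset ranges computed in compute()
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // ... process rdd ...

    // queue the processed ranges; they are committed on a later batch via commitAll()
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }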
That completes the driver-side logic; next, the executor side.
KafkaRDD's compute method simply creates an iterator: CompactedKafkaRDDIterator when non-consecutive offsets are allowed (e.g. for compacted topics), otherwise KafkaRDDIterator.
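The choice between the two iterators is driven by the spark.streaming.kafka.allowNonConsecutiveOffsets flag (the same one referenced in the error message further below); enabling it for, say, a compacted topic looks roughly like this:

  import org.apache.spark.SparkConf

  // enable when reading topics whose offsets may have gaps (e.g. compacted topics),
  // so the executor uses CompactedKafkaRDDIterator instead of KafkaRDDIterator
  val conf = new SparkConf()
    .setAppName("direct-kafka-demo")
    .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")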
Let's use KafkaRDDIterator for the analysis; the interesting part is its next method:
override def next(): ConsumerRecord[K, V] = {
  if (!hasNext) {
    throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached")
  }
  val r = consumer.get(requestOffset, pollTimeout)
  requestOffset += 1
  r
}
next() delegates to the cached Kafka consumer's get method, which does the actual fetching:
/**
 * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
 * Sequential forward access will use buffers, but random access will be horribly inefficient.
 */
def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
  logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
  if (offset != nextOffset) {
    logInfo(s"Initial fetch for $groupId $topic $partition $offset")
    seek(offset)
    poll(timeout)
  }

  if (!buffer.hasNext()) {
    poll(timeout)
  }
  require(buffer.hasNext(),
    s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
  var record = buffer.next()

  if (record.offset != offset) {
    logInfo(s"Buffer miss for $groupId $topic $partition $offset")
    seek(offset)
    poll(timeout)
    require(buffer.hasNext(),
      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
    record = buffer.next()
    require(record.offset == offset,
      s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
        s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
        "spark.streaming.kafka.allowNonConsecutiveOffsets"
    )
  }

  nextOffset = offset + 1
  record
}
As you can see, the fetch logic is: starting from the requested offset, poll a batch of records into a buffer (an in-memory iterator), hand out one record per next() call from that buffer, and poll another batch only when the buffer runs out.
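This buffering boils down to a small, generic pattern: keep an in-memory iterator over the last polled batch and refill it only when it runs dry. A stripped-down illustration of that shape (not Spark's actual class; fetchBatch stands in for consumer.poll):

  // a simplified buffer-and-refill iterator
  class BufferedFetchIterator[T](fetchBatch: () => Seq[T]) extends Iterator[T] {
    private var buffer: Iterator[T] = Iterator.empty

    override def hasNext: Boolean = {
      if (!buffer.hasNext) {
        buffer = fetchBatch().iterator   // refill only when the buffer is exhausted
      }
      buffer.hasNext
    }

    override def next(): T = {
      if (!hasNext) throw new NoSuchElementException("no more records")
      buffer.next()
    }
  }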
With that, the whole data-fetch flow is covered.