本文共 8440 字,大约阅读时间需要 28 分钟。
[酷玩 Spark] Structured Streaming 源码解析系列 ,返回目录请
技术团队荣誉出品
本文内容适用范围:* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0)* 2017.10.02 update, Spark 2.1 全系列 √ (已发布:2.1.0, 2.1.1, 2.1.2)
阅读本文前,请一定先阅读 一文,其中概述了 Structured Streaming 的实现思路(包括 StreamExecution, Source, Sink 等在 Structured Streaming 里的作用),有了全局概念后再看本文的细节解释。
Structured Streaming 非常显式地提出了输入(Source)、执行(StreamExecution)、输出(Sink)的 3 个组件,并且在每个组件显式地做到 fault-tolerant,由此得到整个 streaming 程序的 end-to-end exactly-once guarantees.
具体到源码上,Sink 是一个抽象的接口 [1],只有一个方法:
trait Sink { def addBatch(batchId: Long, data: DataFrame): Unit}
这个仅有的 addBatch()
方法支持了 Structured Streaming 实现 end-to-end exactly-once 处理所一定需要的功能。我们将马上解析这个 addBatch()
方法。
相比而言,前作 Spark Streaming 并没有对输出进行特别的抽象,而只是在 DStreamGraph [2] 里将一些 dstreams 标记为了 output。当需要 exactly-once 特性时,程序员可以根据当前批次的时间标识,来 自行维护和判断 一个批次是否已经执行过。
进化到 Structured Streaming 后,显式地抽象出了 Sink,并提供了一些原生幂等的 Sink 实现:
在 Structured Streaming 里,由 StreamExecution 作为持续查询的驱动器,分批次不断地:
(start, end]
区间范围内的数据Sink.addBatch(batchId: Long, data: DataFrame)
data: DataFrame
的数据写出,并在完成后记录下 batchId: Long
通常我们使用如下方法方法写出到 HDFS-API compatible FS:
writeStream .format("parquet") // parquet, csv, json, text, orc ... .option("checkpointLocation", "path/to/checkpoint/dir") .option("path", "path/to/destination/dir")
那么我们看这里 FileStreamSink
具体的 addBatch()
实现是:
// 来自:class FileStreamSink extends Sink // 版本:Spark 2.1.0 override def addBatch(batchId: Long, data: DataFrame): Unit = { /* 首先根据持久化的 fileLog 来判断这个 batchId 是否已经写出过 */ if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) { /* 如果 batchId 已经完整写出过,则本次跳过 addBatch */ logInfo(s"Skipping already committed batch $batchId") } else { /* 本次需要具体执行写出 data */ /* 初始化 FileCommitter -- FileCommitter 能正确处理 task 推测执行、task 失败重做等情况 */ val committer = FileCommitProtocol.instantiate( className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass, jobId = batchId.toString, outputPath = path, isAppend = false) committer match { case manifestCommitter: ManifestFileCommitProtocol => manifestCommitter.setupManifestOptions(fileLog, batchId) case _ => // Do nothing } /* 获取需要做 partition 的 columns */ val partitionColumns: Seq[Attribute] = partitionColumnNames.map { col => val nameEquality = data.sparkSession.sessionState.conf.resolver data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse { throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}") } } /* 真正写出数据 */ FileFormatWriter.write( sparkSession = sparkSession, queryExecution = data.queryExecution, fileFormat = fileFormat, committer = committer, outputSpec = FileFormatWriter.OutputSpec(path, Map.empty), hadoopConf = hadoopConf, partitionColumns = partitionColumns, bucketSpec = None, refreshFunction = _ => (), options = options) } }
通常我们使用如下方法写出到 foreach sink:
writeStream /* 假设进来的每条数据是 String 类型的 */ .foreach(new ForeachWriter[String] { /* 每个 partition 即每个 task 会在开始时调用此 open() 方法 */ /* 注意对于同一个 partitionId/version,此方法可能被先后调用多次,如 task 失效重做时 */ /* 注意对于同一个 partitionId/version,此方法也可能被同时调用,如推测执行时 */ override def open(partitionId: Long, version: Long): Boolean = { println(s"open($partitionId, $version)") true } /* 此 partition 内即每个 task 内的每条数据,此方法都被调用 */ override def process(value: String): Unit = println(s"process $value") /* 正常结束或异常结束时,此方法被调用。但一些异常情况时,此方法不一定被调用。 */ override def close(errorOrNull: Throwable): Unit = println(s"close($errorOrNull)") })
那么我们看这里 ForeachSink
具体的 addBatch()
实现是:
// 来自:class ForeachSink extends Sink with Serializable // 版本:Spark 2.1.0 override def addBatch(batchId: Long, data: DataFrame): Unit = { val encoder = encoderFor[T].resolveAndBind( data.logicalPlan.output, data.sparkSession.sessionState.analyzer) /* 是 rdd 的 foreachPartition,即是 task 级别 */ data.queryExecution.toRdd.foreachPartition { iter => /* partition/task 级别的 open */ if (writer.open(TaskContext.getPartitionId(), batchId)) { try { while (iter.hasNext) { /* 对每条数据调用 process() 方法 */ writer.process(encoder.fromRow(iter.next())) } } catch { case e: Throwable => /* 异常时调用 close() 方法 */ writer.close(e) throw e } /* 正常写完调用 close() 方法 */ writer.close(null) } else { /* 不写数据、直接调用 close() 方法 */ writer.close(null) } } }
所以我们看到,foreach sink 需要使用者提供 writer,所以这里的可定制度就非常高。
但是仍然需要注意,由于 foreach 的 writer 可能被 open() 多次,可能有多个 task 同时调用一个 writer。所以推荐 writer 一定要写成幂等的,如果 writer 不幂等、那么 Structured Streaming 框架本身也没有更多的办法能够保证 end-to-end exactly-once guarantees 了。
Spark 2.1.1 版本开始加入了 KafkaSink,使得 Spark 也能够将数据写入到 kafka 中。
通常我们使用如下方法写出到 kafka sink:
writeStream .format("kafka") .option("checkpointLocation", ...) .outputMode(...) .option("kafka.bootstrap.servers", ...) // 写出到哪个集群 .option("topic", ...) // 写出到哪个 topic
那么我们看这里 KafkaSink
具体的 addBatch()
实现是:
// 来自:class KafkaSink extends Sink // 版本:Spark 2.1.1, 2.2.0 override def addBatch(batchId: Long, data: DataFrame): Unit = { if (batchId <= latestBatchId) { logInfo(s"Skipping already committed batch $batchId") } else { // 主要是通过 KafkaWriter.write() 来做写出; // 在 KafkaWriter.write() 里,主要是继续通过 KafkaWriteTask.execute() 来做写出 KafkaWriter.write(sqlContext.sparkSession, data.queryExecution, executorKafkaParams, topic) latestBatchId = batchId } }
那么我们继续看这里 KafkaWriteTask
具体的 execute()
实现是:
// 来自:class KafkaWriteTask // 版本:Spark 2.1.1, 2.2.0 def execute(iterator: Iterator[InternalRow]): Unit = { producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration) while (iterator.hasNext && failedWrite == null) { val currentRow = iterator.next() // 这里的 projection 主要是构建 projectedRow,使得: // 其第 0 号元素是 topic // 其第 1 号元素是 key 的 binary 表示 // 其第 2 号元素是 value 的 binary 表示 val projectedRow = projection(currentRow) val topic = projectedRow.getUTF8String(0) val key = projectedRow.getBinary(1) val value = projectedRow.getBinary(2) if (topic == null) { throw new NullPointerException(s"null topic present in the data. Use the " + s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.") } val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value) val callback = new Callback() { override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = { if (failedWrite == null && e != null) { failedWrite = e } } } producer.send(record, callback) } }
这里我们需要说明的是,由于 Spark 本身会失败重做 —— 包括单个 task 的失败重做、stage 的失败重做、整个拓扑的失败重做等 —— 那么同一条数据可能被写入到 kafka 一次以上。由于 kafka 目前还不支持 transactional write,所以多写入的数据不能被撤销,会造成一些重复。当然 kafka 自身的高可用写入(比如写入 broker 了的数据的 ack 消息没有成功送达 producer,导致 producer 重新发送数据时),也有可能造成重复。
在 kafka 支持 transactional write 之前,可能需要下游实现下去重机制。比如如果下游仍然是 Structured Streaming,那么可以使用 streaming deduplication 来获得去重后的结果。
我们总结一下截至目前,Sink 已有的具体实现:
Sinks | 是否幂等写入 | 原生内置支持 | 注解 |
---|---|---|---|
HDFS-compatible file system | 已支持 | 包括但不限于 text, json, csv, parquet, orc, ... | |
ForeachSink (自定操作幂等) | 已支持 | 可定制度非常高的 sink | |
Kafka | 已支持 | Kafka 目前不支持幂等写入,所以可能会有重复写入 (但推荐接着 Kafka 使用 streaming de-duplication 来去重) | |
ForeachSink (自定操作不幂等) | 已支持 | 不推荐使用不幂等的自定操作 |
这里我们特别强调一下,虽然 Structured Streaming 也内置了 console
这个 Source,但其实它的主要用途只是在技术会议/讲座上做 demo,不应用于线上生产系统。
转载地址:http://afvmi.baihongyu.com/