1. 引言
Apache Kafka与Spark Streaming集成是构建实时应用程序的最佳组合,所以在本文中,我们将详细了解Kafka中Spark Streaming集成的整个概念。此外,我们将学习Spark Streaming-Kafka示例。然后,我们将基于Receiver的方法和直接方法来讨论Kafka与Spark Streaming的集成。此外,我们将了解在Kafka和Spark Streaming集成中直接方法与基于Receiver的方法相比有什么优势。
2. Kafka与Spark Streaming的集成
在Apache Kafka与Spark Streaming的集成中,有两种方法可以配置Spark Streaming从Kafka接收数据。第一种是使用Receiver和Kafka的高级API,第二种是不使用Receiver。这两种方法都有不同的编程模型,例如性能特征和语义保证。
pic
a. 基于Receiver的方法
在这种方法里,我们使用Receiver接收数据,Receiver通过使用Kafka高级消费者API实现。接收的数据存储在Spark的executor中,然后由Kafka – Spark Streaming启动的作业处理数据。
这种方法可能会在默认配置下遇到故障时丢失数据,因此,我们必须在Kafka – Spark Streaming中另外启用预写日志,以确保零数据丢失,方法是将所有收到的Kafka数据同步保存到分布式文件系统上的预写日志中。通过这种方式,可以在故障时恢复所有数据。
下面,我们将讨论如何在Kafka – Spark Streaming应用程序中使用这种基于Receiver的方法。
i. 引用
对于使用SBT/Maven项目定义的Scala/Java应用程序,在Streaming应用程序中引用以下库:
groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.11 version = 2.2.0
但是,在部署Python应用程序时,我们必须添加上面的库及其依赖库。
ii. 编写程序
然后,通过在Streaming应用程序代码中导入KafkaUtils来创建输入用的DStream:
import org.apache.spark.streaming.kafka._ val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
此外,使用createStream的变体,我们可以指定键和值的类及其相应的解码器类。
iii. 部署
与任何Spark应用程序一样,spark-submit用于启动你的应用程序,但是,在细节上,Scala/Java应用程序和Python应用程序略有不同。
对于缺乏SBT/Maven项目管理的Python应用程序,可以使用–packages 将spark-streaming-Kafka-0-8_2.11及其依赖直接添加到spark-submit。
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...
我们还可以从Maven存储库下载Maven工具spark-streaming-Kafka-0-8-assembly的JAR,然后使用-jars参数将其添加到spark-submit。
b. 直接方法(Direct Approach)
在基于Receiver的方法之后,引入了新的无Receiver的“直接(direct)”方法。直接方法提供了更强大的端到端保证。该方法定期向Kafka查询每个主题+分区中的最新偏移量,而不是使用Receiver来接收数据,而且,它定义了要在每个批次中处理的偏移量范围。另外,它使用简单的消费者API从Kafka中读取已定义的偏移量范围,尤其是在启动处理数据的作业时。简单来说,它类似于从文件系统读取文件。
请注意:此功能是在Spark 1.3中为Scala和Java API引入的,在Spark 1.4中为Python API引入。
现在,我们讨论如何在Streaming应用程序中使用此方法。
i. 引用
只有Scala/Java应用程序支持此方法。请在SBT/Maven项目添加以下引用:
groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.11 version = 2.2.0
ii. 编写程序
导入KafkaUtils并在Streaming应用程序代码中创建输入 的DStream:
import org.apache.spark.streaming.kafka._ val directKafkaStream = KafkaUtils.createDirectStream[ [key class], [value class], [key decoder class], [value decoder class] ]( streamingContext, [map of Kafka parameters], [set of topics to consume])
我们必须在Kafka参数中指定metadata.broker.list或bootstrap.servers。默认情况下,它将从每个Kafka分区的最新偏移量开始读取数据。但是,如果将Kafka参数中的配置auto.offset.reset设置为最小,它将从最小偏移量开始读取。
使用KafkaUtils.createDirectStream的其他变体,我们可以从任意偏移量开始读取。执行以下操作以访问每个批次中被读取的Kafka偏移量。
// Hold a reference to the current offset ranges, so downstream can use it var offsetRanges = Array.empty[OffsetRange] directKafkaStream.transform { rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.map { ... }.foreachRDD { rdd => for (o <- offsetRanges) { println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") } ... }
iii. 部署
部署过程类似于基于Receiver的方法的部署过程。
3. 直接方法的优势
在Kafka与Spark Streaming的集成中, 第二种方法有以下优势:
a. 简化的并行性
不需要创建多个输入Kafka流并将它们联合起来,Kafka – Spark Streaming将使用直接流(direct stream)创建与Kafka分区一样多的RDD分区,这样就能并行从Kafka读取数据。因此,我们可以说,直接方法是Kafka和RDD分区之间的一对一映射,更容易理解和调整。
b. 效率
在第一种方法(基于Receiver的方法)中实现零数据丢失需要将数据存储在预写日志中,这需要再一次复制数据。这种方法实际上是低效的,因为数据被复制两次 – 一次由Kafka复制,第二次由预写日志复制。第二种方法(直接方法)消除了这个问题,因为没有Receiver,因此不需要预写日志。 只要我们有足够的Kafka预留空间,就可以从Kafka恢复消息。
c. 精确一次的语义
我们通过Kafka的高级API 用第一种方法中在Zookeeper中存储已消耗的偏移量,这是读取Kakfa数据的一种传统方式。即使它可以确保零数据丢失,但在某些故障情况下,某些记录可能会被消耗两次,这是因为Kakfa – Spark Streaming接收到的数据和Zookeeper记录的偏移量不一致。因此,在第二种方法中,我们不使用Zookeeper而是直接调用简单的Kafka API。Kafka – Spark Streaming通过其检查点跟踪偏移量,这消除了Spark Streaming和Zookeeper/之间的不一致问题。
因此,即使出现故障,Spark Streaming对每条记录也只会有效地接收一次。因此,请确保将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务,这有助于为我们的结果输出实现精确一次的语义。
当然,这个方法也有一个缺点,即它不会更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监视工具不会显示进度。但是,我们仍然可以在每个批次中访问由此方法处理的偏移量,并自行更新Zookeeper。