Apache Kafka与Spark Streaming集成

1. 引言

Apache KafkaSpark Streaming集成是构建实时应用程序的最佳组合,所以在本文中,我们将详细了解KafkaSpark Streaming集成的整个概念。此外,我们将学习Spark Streaming-Kafka示例。然后,我们将基于Receiver的方法和直接方法来讨论KafkaSpark Streaming的集成。此外,我们将了解在KafkaSpark Streaming集成中直接方法与基于Receiver的方法相比有什么优势。

2. Kafka与Spark Streaming的集成

Apache KafkaSpark Streaming的集成中,有两种方法可以配置Spark StreamingKafka接收数据。第一种是使用ReceiverKafka的高级API,第二种是不使用Receiver。这两种方法都有不同的编程模型,例如性能特征和语义保证。

pic

a. 基于Receiver的方法

在这种方法里,我们使用Receiver接收数据,Receiver通过使用Kafka高级消费者API实现。接收的数据存储在Sparkexecutor中,然后由Kafka – Spark Streaming启动的作业处理数据。

这种方法可能会在默认配置下遇到故障时丢失数据,因此,我们必须在Kafka – Spark Streaming中另外启用预写日志,以确保零数据丢失,方法是将所有收到的Kafka数据同步保存到分布式文件系统上的预写日志中。通过这种方式,可以在故障时恢复所有数据。

下面,我们将讨论如何在Kafka – Spark Streaming应用程序中使用这种基于Receiver的方法。

i. 引用

对于使用SBT/Maven项目定义的Scala/Java应用程序,在Streaming应用程序中引用以下库:

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

但是,在部署Python应用程序时,我们必须添加上面的库及其依赖库。

ii. 编写程序

然后,通过在Streaming应用程序代码中导入KafkaUtils来创建输入用的DStream

import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
    [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

此外,使用createStream的变体,我们可以指定键和值的类及其相应的解码器类。

iii. 部署

与任何Spark应用程序一样,spark-submit用于启动你的应用程序,但是,在细节上,Scala/Java应用程序和Python应用程序略有不同。

对于缺乏SBT/Maven项目管理的Python应用程序,可以使用–packages spark-streaming-Kafka-0-8_2.11及其依赖直接添加到spark-submit

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...

我们还可以从Maven存储库下载Maven工具spark-streaming-Kafka-0-8-assemblyJAR,然后使用-jars参数将其添加到spark-submit

b. 直接方法(Direct Approach

在基于Receiver的方法之后,引入了新的无Receiver的“直接(direct)”方法。直接方法提供了更强大的端到端保证。该方法定期向Kafka查询每个主题+分区中的最新偏移量,而不是使用Receiver来接收数据,而且,它定义了要在每个批次中处理的偏移量范围。另外,它使用简单的消费者APIKafka中读取已定义的偏移量范围,尤其是在启动处理数据的作业时。简单来说,它类似于从文件系统读取文件。

请注意:此功能是在Spark 1.3中为ScalaJava API引入的,在Spark 1.4中为Python API引入。

现在,我们讨论如何在Streaming应用程序中使用此方法。

i. 引用

只有Scala/Java应用程序支持此方法。请在SBT/Maven项目添加以下引用:

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

ii. 编写程序

导入KafkaUtils并在Streaming应用程序代码中创建输入 DStream

import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
    [key class], [value class], [key decoder class], [value decoder class] ](
    streamingContext, [map of Kafka parameters], [set of topics to consume])

我们必须在Kafka参数中指定metadata.broker.listbootstrap.servers。默认情况下,它将从每个Kafka分区的最新偏移量开始读取数据。但是,如果将Kafka参数中的配置auto.offset.reset设置为最小,它将从最小偏移量开始读取。

使用KafkaUtils.createDirectStream的其他变体,我们可以从任意偏移量开始读取。执行以下操作以访问每个批次中被读取的Kafka偏移量。

// Hold a reference to the current offset ranges, so downstream can use it
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
          ...
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  ...
}

iii. 部署

部署过程类似于基于Receiver的方法的部署过程。

3. 直接方法的优势

KafkaSpark Streaming的集成中, 第二种方法有以下优势:

a. 简化的并行性

不需要创建多个输入Kafka流并将它们联合起来,Kafka – Spark Streaming将使用直接流(direct stream)创建与Kafka分区一样多的RDD分区,这样就能并行从Kafka读取数据。因此,我们可以说,直接方法是KafkaRDD分区之间的一对一映射,更容易理解和调整。

b. 效率

在第一种方法(基于Receiver的方法)中实现零数据丢失需要将数据存储在预写日志中,这需要再一次复制数据。这种方法实际上是低效的,因为数据被复制两次一次由Kafka复制,第二次由预写日志复制。第二种方法(直接方法)消除了这个问题,因为没有Receiver,因此不需要预写日志。 只要我们有足够的Kafka预留空间,就可以从Kafka恢复消息。

c. 精确一次的语义

我们通过Kafka的高级API 用第一种方法中在Zookeeper中存储已消耗的偏移量,这是读取Kakfa数据的一种传统方式。即使它可以确保零数据丢失,但在某些故障情况下,某些记录可能会被消耗两次,这是因为Kakfa – Spark Streaming接收到的数据和Zookeeper记录的偏移量不一致。因此,在第二种方法中,我们不使用Zookeeper而是直接调用简单的Kafka APIKafka – Spark Streaming通过其检查点跟踪偏移量,这消除了Spark StreamingZookeeper/之间的不一致问题。

因此,即使出现故障,Spark Streaming对每条记录也只会有效地接收一次。因此,请确保将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务,这有助于为我们的结果输出实现精确一次的语义。

当然,这个方法也有一个缺点,即它不会更新Zookeeper中的偏移量,因此基于ZookeeperKafka监视工具不会显示进度。但是,我们仍然可以在每个批次中访问由此方法处理的偏移量,并自行更新Zookeeper

Apache Kafka与Spark Streaming集成

发表评论

电子邮件地址不会被公开。 必填项已用*标注

三 + 七 =