× 快速导航
全部课程目录_QQ
分类
Apache Kafka与Storm集成
发布日期:2022-11-12 00:26:41

内容


1. 什么是Storm?

2. Storm和Kafka集成

1. 什么是Storm?

Apache Storm是一个开源,分布式,可靠且容错的系统。Storm有各种用例,如实时分析,在线机器学习,连续计算和提取转换负载(ETL)范例。对于流数据处理,Storm中有几个组件需要协同工作,例如:


Spout – spout从流中持续的读取数据,把每条数据转换成Tuple然后发送给bolt。


Bolt – spout将数据(Tuple)传递给我们称之为bolt的组件。Bolt读取输入的Tuple,进行一些处理,并可能发出新的Tuple给其他的bolt。


可以有多个bolt组成一个链,每个链对输入的数据(Tuple)做一些处理,然后把处理后的数据发送到下一个bolt。多个节点组成一个Storm集群,一个Storm拓扑(Topology)被分配到这些节点上以任务的方式执行。Storm的拓扑类似Hadoop的作业(Job)。拓扑会永远运行下去,直到进程被终止。用拓扑来定义处理流数据时需要哪些Spout和Bolt。


2. Storm和Kafka集成

Kafka和Storm相互补充,他们强大的合作可以为快速移动的大数据提供实时流分析。因此,为了使开发人员更容易从Storm拓扑中摄取和发布数据流,我们把Kafka和Storm集成起来开发应用程序。


pic


a. 使用KafkaSpout


从Kafka集群读取数据的spout的实现类是KafkaSpout。要创建一个KafkaSpout的实例去连接Kafka集群,需要提供以下参数:


Kafka代理的主机名列表


每个代理的分区数量


要从中读取消息的主题名称


Spout用来存储消费者偏移量的文件夹在ZooKeeper中的根路径


在ZooKeeper中存储消费者偏移量所需的消费者ID


示例:



SpoutConfig spoutConfig = new SpoutConfig(

 ImmutableList.of("kafkahost1:9092", "kafkahost2:9093"), // list of Kafka brokers

 8, // number of partitions per host

 "topicName", // topic to read from

 "/kafkastorm", // the root path in Zookeeper for the spout to store the consumer offsets

 "consumerID"); // an id for this consumer for storing the consumer offsets in Zookeeper

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

分区的偏移量将存储在由根路径、消费者ID和分区ID拼接成的文件夹中,如下所示,其中“0”,“1”是分区的ID:



{root path}/{id}/0

{root path}/{id}/1

{root path}/{id}/2

{root path}/{id}/3

默认情况下,偏移量将存储在Storm使用的Zookeeper集群中。另外,我们可以通过spout的配置来覆盖它,如下所示:



spoutConfig.zkServers = ImmutableList.of("otherserver.com");

spoutConfig.zkPort = 2191;

我们可以在SpoutConfig上执行forceStartOffsetTime来强制spout回卷到先前的偏移量,如下所示:


spoutConfig.forceStartOffsetTime(-2);

forceStartOffsetTime将强制Spout从离当前时间最近的最新偏移量开始读取数据。此外,我们可以通过传入-1来强制Spout始终从最新的偏移量开始,并且我们可以通过传入-2来强制Spout从最早的偏移量开始。


Kafka Spout使用ZooKeeper来存储消息偏移量和段消耗跟踪(如果它被消耗)的状态。


Storm默认使用自己的Zookeeper集群存储消息偏移量,这些偏移量被存储在在SpoutConfig中指定的Zookeeper根路径中。但是我们也可以通过在SpoutConfig中设置其他Zookeeper主机名来实现把偏移量存储到其他Zookeeper中。Spout以单线程方式d工作,因为所有的并行工作由Storm集群处理。KafkaSpout还可以回退到先前的偏移量,而不是从上次保存的偏移量开始。


KafkaSpout还提供了一个选项来指定如何通过设置属性让Spout从Kafka集群中获取消息,如缓冲区大小和超时这些属性。