内容
1. 什么是消费者?
消费者(Consumer)是指从Kafka 主题读取数据的应用程序。Kafka消费者订阅了Kafka集群中的一个或多个主题,然后就能持续的从Kafka主题中读取消息。
通过Heartbeat,我们可以知道消费者与Kafka集群的连接性。Heartbeat设置在消费者端,让Zookeeper或代理协调器(Broker Coordinator)知道消费者是否仍然连接到集群。因此,如果Heartbeat不存在,Kafka消费者就被认为不再与集群连接,在这种情况下,代理协调器必须重新平衡负载。但是,Heartbeat会增加集群的开销,所以考虑到数据吞吐量和这些开销,我们需要合理配置消费者的Heartbeat时间间隔。
pic
多个消费者可以组成一个小组并共同消费一个主题,同一组中的每个使用者都被赋予共享的group_id。 例如,如果一个消费者是你的foobar进程,它运行在三台机器上,那么你可以为这组消费者分配ID “foobar”。此组ID在消费者的配置中提供,并且由你决定让消费者属于哪个组。组中的消费者尽可能公平地划分分区,一个分区仅由消费者组中的一个消费者使用。
2. KafkaConsumer API
要连接到Kafka集群并使用数据流,Kafka的消费者API会有所帮助。
下图演示了Apache Kafka 消费者的工作原理:
pic
要订阅一个或多个主题并处理应用程序生成的记录流,我们使用Kafka消费者API,换句话说,我们使用KafkaConsumer API来读取来自Kafka集群的消息。下面看一下KafkaConsumer类的构造函数。
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
通过提供一组键值对作为配置来实例化消费者。此处记录了有效的配置项。
KafkaConsumer类中主要有以下方法:
1. public java.util.Set<TopicPar-tition> assignment()
获取当前分配给此消费者的分区集。
2. public string subscription()
获取当前订阅的主题
3. public void subscribe(java.util.List<java.lang.String> topics)
订阅给定的主题列表以获取动态分配的分区。主题订阅不是增量的。参数传递的列表将替换当前已有的分配(如果存在)。如果给定的主题列表为空,则将其视为与unsubscribe()相同。
4.public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener)
订阅与指定模式匹配的所有主题以获取动态分配的分区。模式匹配操作将针对检查时存在的主题定期进行。
5. public void unsubscribe()
取消通过调用subscribe(Collection)方法订阅的主题,这也清除了通过assign(Collection)直接分配的任何分区。
6. public void assign(java.util.Collection<TopicPartition> partitions)
手动为消费者分配分区列表。该接口不允许增量分配,并将替换先前的分配(如果存在)。如果给定的主题分区列表为空,则将其视为与unsubscribe()相同。
7. public ConsumerRecords<K,V> poll(long timeout)
获取由某个subscribe或assign API指定的主题或分区的数据。如果获取数据前没有订阅任何主题或分区将报错。
8. public void commitSync()
提交在所有订阅的主题和分区列表的最后一个poll()上返回的偏移量(offset)。这是一个同步提交,将阻塞直到提交成功或遇到不可恢复的错误。
9. public void commitAsync()
同上,只是这个方法异步提交,不阻塞。
10. public void seek(TopicPartition partition, long offset)
覆盖消费者在下次调用poll(timeout)时将使用的提取偏移量(fetch offset)。如果多次为同一分区调用此API,则将在下次调用poll()时使用最后一次seek()提供的偏移量。请注意,如果在消耗过程中任意使用此API,则可能会丢失数据。
11. public void pause(java.util.Collection<TopicPartition> partitions)
暂停读取数据的分区。
12. public void resume(java.util.Collection<TopicPartition> partitions)
恢复由pause(partitions)暂停的分区。
13. public void wakeup()
唤醒消费者。此方法是线程安全的,特别适用于中止长轮询。
14. public void close()
关闭消费者,等待任何所需清理的工作30秒(默认超时时间)。如果启用了自动提交,则会在默认超时时间内提交当前偏移量(如果可能)。请注意,wakeup()不能用于中断关闭。
3. ConsumerRecord API
要从Kafka集群接收记录,我们使用ConsumerRecord API,即ConsumerRecord类。它是从Kafka收到的键/值对,包括主题名称和从中接收记录的分区号,指向Kafka分区中的记录的偏移量,以及由相应的ProducerRecord标记的时间戳。构造函数是:
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
参数:
-
topic – 从中接收记录的主题名称
-
partition – 从中接收记录的分区
-
offset – 该记录在相应的Kafka分区中的偏移量
-
key – 该记录的键值,运行为空
-
value – 该记录的内容
ConsumerRecord类提供了方法来获取记录有关的主题、分区、偏移量、键值、内容等信息。
4. ConsumerRecords API
一个用于保存特定主题的每个分区的ConsumerRecord列表的容器。
Consumer.poll(long)操作为每个主题分区返回一个ConsumerRecord列表。
构造函数是:
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List<ConsumerRecord>K,V>>> records)
参数:
-
records – 一个由主题分区为键,一组ConsumerRecord为值的Map
ConsumerRecords类的主要方法有:
1. public int count()
所有主题的记录数。
2. public java.util.Set<TopicPartition> partitions()
获取包含该记录集中记录的分区。
3. public java.util.Iterator<ConsumerRecord<K,V>> iterator()
遍历记录集中的记录
4. public java.util.List<ConsumerRecord<K,V>> records(TopicPartition partition)
获取给定分区的记录
5. public java.lang.Iterable<ConsumerRecord<K,V>> records(java.lang.String topic)
获取给定主题的记录
5. 配置
这里我们列出一些消费者客户端API的主要配置:
1. bootstrap.servers
用于建立与Kafka群集的初始连接的主机/端口对列表.
2. group.id
单个消费者所属的消费者组的ID
3. enable.auto.commit
启用自动提交,如果该设置为真,就自动提交偏移量。
4. auto.commit.interval.ms
自动提交的时间间隔,即消费者偏移量被自动提交到Kafka的频率
5. session.timeout.ms
使用Kafka的组管理工具检测消费者故障的超时时间。消费者定期发送心跳以指示其对代理的活跃性,如果在此会话超时到期之前代理没有收到心跳,则代理将从该组中删除此消费者并启动重新平衡。
6.消费者应用示例
a. 代码
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; public class SimpleConsumer { public static void main(String[] args) throws Exception { if(args.length == 0){ System.out.println("Enter topic name"); return; } //Kafka consumer configuration settings String topicName = args[0].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //Kafka Consumer subscribes list of topics here. consumer.subscribe(Arrays.asList(topicName)) //print the topic name System.out.println("Subscribed to topic " + topicName); int i = 0; while (true) { ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records) { // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } } }
b. 编译
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*:.” SimpleConsumer.java
c. 运行
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*:.”. SimpleConsumer <topic-name>
d. 发布消息
运行生产者CLI并向主题发送一些消息,我们可以把简单的输入作为'Hello Consumer'。
e. 输出
Subscribed to topic Hello-Kafka offset = 3, key = null, value = Hello Consumer
7. Kafka消费者组
多个消费者可以组成一个消费者组,如下图所示:
pic
-
通过使用相同的group.id,消费者可以加入一个组。
-
组的最大并行度是组中的消费者数量,即分区的数量。
-
Kafka将主题的分区分配给组中的消费者,每个分区仅由该组中的一个消费者使用。
-
Kafka保证一条消息只能由组中的一个消费者读取。
-
消费者可以按照消息在日志中存储的顺序查看它们。
a. 重新平衡消费者
添加更多进程/线程将导致Kafka重新平衡资源。如果任何消费者或代理以某种方式无法向ZooKeeper发送心跳,则可以通过Kafka群集重新配置它。此外,在此重新平衡期间,Kafka将可用分区分配给可用线程,也可能将分区分配给另一个进程。
b. 代码示例
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; public class ConsumerGroup { public static void main(String[] args) throws Exception { if(args.length < 2){ System.out.println("Usage: consumer <topic> <groupname>"); return; } String topic = args[0].toString(); String group = args[1].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topic)); System.out.println("Subscribed to topic " + topic); int i = 0; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } } }
编译:
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*:." ConsumerGroup.java
运行:
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*:.” ConsumerGroup <topic-name> my-group
我们创建了一个名为my-group的消费者组,和两个消费者。
c. 发布消息
运行生产者CLI,并发送以下消息:
Test consumer group 01 Test consumer group 02
d. 输出
第一个进程:
Subscribed to topic Hello-kafka offset = 3, key = null, value = Test consumer group 01
第二个进程:
Subscribed to topic Hello-kafka offset = 3, key = null, value = Test consumer group 02