Apache Kafka消费者

1. 什么是消费者?

消费者(Consumer)是指从Kafka 主题读取数据的应用程序。Kafka消费者订阅了Kafka集群中的一个或多个主题,然后就能持续的从Kafka主题中读取消息。

通过Heartbeat,我们可以知道消费者与Kafka集群的连接性。Heartbeat设置在消费者端,让Zookeeper或代理协调器(Broker Coordinator)知道消费者是否仍然连接到集群。因此,如果Heartbeat不存在,Kafka消费者就被认为不再与集群连接,在这种情况下,代理协调器必须重新平衡负载。但是,Heartbeat会增加集群的开销,所以考虑到数据吞吐量和这些开销,我们需要合理配置消费者的Heartbeat时间间隔。

pic

多个消费者可以组成一个小组并共同消费一个主题,同一组中的每个使用者都被赋予共享的group_id 例如,如果一个消费者是你的foobar进程,它运行在三台机器上,那么你可以为这组消费者分配ID “foobar”。此组ID在消费者的配置中提供,并且由你决定让消费者属于哪个组。组中的消费者尽可能公平地划分分区,一个分区仅由消费者组中的一个消费者使用。

2. KafkaConsumer API

要连接到Kafka集群并使用数据流,Kafka的消费者API会有所帮助。

下图演示了Apache Kafka 消费者的工作原理:

pic

要订阅一个或多个主题并处理应用程序生成的记录流,我们使用Kafka消费者API,换句话说,我们使用KafkaConsumer API来读取来自Kafka集群的消息。下面看一下KafkaConsumer类的构造函数。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

通过提供一组键值对作为配置来实例化消费者。此处记录了有效的配置项。

KafkaConsumer类中主要有以下方法:

1. public java.util.Set<TopicPar-tition> assignment() 

    获取当前分配给此消费者的分区集。

2. public string subscription()

    获取当前订阅的主题

3. public void subscribe(java.util.List<java.lang.String> topics) 

    订阅给定的主题列表以获取动态分配的分区。主题订阅不是增量的。参数传递的列表将替换当前已有的分配(如果存在)。如果给定的主题列表为空,则将其视为与unsubscribe()相同。

4.public void subscribe(java.util.regex.Pattern pattern,  ConsumerRebalanceListener listener)

   订阅与指定模式匹配的所有主题以获取动态分配的分区。模式匹配操作将针对检查时存在的主题定期进行。

5. public void unsubscribe()

   取消通过调用subscribe(Collection)方法订阅的主题,这也清除了通过assign(Collection)直接分配的任何分区。

6. public void assign(java.util.Collection<TopicPartition> partitions)

   手动为消费者分配分区列表。该接口不允许增量分配,并将替换先前的分配(如果存在)。如果给定的主题分区列表为空,则将其视为与unsubscribe()相同。

7. public ConsumerRecords<K,V> poll(long timeout)

   获取由某个subscribeassign API指定的主题或分区的数据。如果获取数据前没有订阅任何主题或分区将报错。

8. public void commitSync()

   提交在所有订阅的主题和分区列表的最后一个poll()上返回的偏移量(offset)。这是一个同步提交,将阻塞直到提交成功或遇到不可恢复的错误。

9. public void commitAsync()

   同上,只是这个方法异步提交,不阻塞。

10. public void seek(TopicPartition partition,  long offset)

   覆盖消费者在下次调用poll(timeout)时将使用的提取偏移量fetch offset。如果多次为同一分区调用此API,则将在下次调用poll()时使用最后一次seek()提供的偏移量。请注意,如果在消耗过程中任意使用此API,则可能会丢失数据。

11. public void pause(java.util.Collection<TopicPartition> partitions)

   暂停读取数据的分区。

12. public void resume(java.util.Collection<TopicPartition> partitions)

   恢复由pause(partitions)暂停的分区。

13. public void wakeup()

   唤醒消费者。此方法是线程安全的,特别适用于中止长轮询。

14. public void close()

   关闭消费者,等待任何所需清理的工作30秒(默认超时时间)。如果启用了自动提交,则会在默认超时时间内提交当前偏移量(如果可能)。请注意,wakeup()不能用于中断关闭。

3. ConsumerRecord API

要从Kafka集群接收记录,我们使用ConsumerRecord API,即ConsumerRecord类。它是从Kafka收到的键/值对,包括主题名称和从中接收记录的分区号,指向Kafka分区中的记录的偏移量,以及由相应的ProducerRecord标记的时间戳。构造函数是: 

public ConsumerRecord(string topic,int partition, long offset,K key, V value)

参数:

  • topic – 从中接收记录的主题名称

  • partition – 从中接收记录的分区

  • offset – 该记录在相应的Kafka分区中的偏移量

  • key – 该记录的键值,运行为空

  • value – 该记录的内容

ConsumerRecord类提供了方法来获取记录有关的主题、分区、偏移量、键值、内容等信息。

4. ConsumerRecords API 

一个用于保存特定主题的每个分区的ConsumerRecord列表的容器。

Consumer.poll(long)操作为每个主题分区返回一个ConsumerRecord列表。

构造函数是: 

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List<ConsumerRecord>K,V>>> records)

 参数:

  • records – 一个由主题分区为键,一组ConsumerRecord为值的Map

ConsumerRecords类的主要方法有: 

1. public int count()

   所有主题的记录数。

2. public java.util.Set<TopicPartition> partitions()

   获取包含该记录集中记录的分区。

3. public java.util.Iterator<ConsumerRecord<K,V>> iterator()

   遍历记录集中的记录

4. public java.util.List<ConsumerRecord<K,V>> records(TopicPartition partition)

   获取给定分区的记录

5. public java.lang.Iterable<ConsumerRecord<K,V>> records(java.lang.String topic)

   获取给定主题的记录

5. 配置

这里我们列出一些消费者客户端API的主要配置:

1. bootstrap.servers

   用于建立与Kafka群集的初始连接的主机/端口对列表.

2. group.id

   单个消费者所属的消费者组的ID

3. enable.auto.commit

   启用自动提交,如果该设置为真,就自动提交偏移量。

4. auto.commit.interval.ms

   自动提交的时间间隔,即消费者偏移量被自动提交到Kafka的频率

5. session.timeout.ms

   使用Kafka的组管理工具检测消费者故障的超时时间。消费者定期发送心跳以指示其对代理的活跃性,如果在此会话超时到期之前代理没有收到心跳,则代理将从该组中删除此消费者并启动重新平衡。

6.消费者应用示例

a. 代码

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
 
public class SimpleConsumer {
 
    public static void main(String[] args) throws Exception {
        if(args.length == 0){
            System.out.println("Enter topic name");
            return;
        }
 
        //Kafka consumer configuration settings
        String topicName = args[0].toString();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");
 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //Kafka Consumer subscribes list of topics here.
        consumer.subscribe(Arrays.asList(topicName))
        //print the topic name
        System.out.println("Subscribed to topic " + topicName);
 
        int i = 0;
        while (true) {
            ConsumerRecords<String, String> records = con-sumer.poll(100);
            for (ConsumerRecord<String, String> record : records)  {
                // print the offset,key and value for the consumer records.
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
            }
        }
    }
}

b. 编译 

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*:.” SimpleConsumer.java

c. 运行

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*:.”. SimpleConsumer <topic-name>

d. 发布消息

运行生产者CLI并向主题发送一些消息,我们可以把简单的输入作为'Hello Consumer'

e. 输出 

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

7. Kafka消费者组

多个消费者可以组成一个消费者组,如下图所示:

pic

  • 通过使用相同的group.id,消费者可以加入一个组。

  • 组的最大并行度是组中的消费者数量,即分区的数量。

  •  Kafka将主题的分区分配给组中的消费者,每个分区仅由该组中的一个消费者使用。

  • Kafka保证一条消息只能由组中的一个消费者读取。

  • 消费者可以按照消息在日志中存储的顺序查看它们

a. 重新平衡消费者

添加更多进程/线程将导致Kafka重新平衡资源。如果任何消费者或代理以某种方式无法向ZooKeeper发送心跳,则可以通过Kafka群集重新配置它。此外,在此重新平衡期间,Kafka将可用分区分配给可用线程,也可能将分区分配给另一个进程。

b. 代码示例

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
  public static void main(String[] args) throws Exception {
     if(args.length < 2){
        System.out.println("Usage: consumer <topic> <groupname>");
        return;
     }     
     String topic = args[0].toString();
     String group = args[1].toString();
     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", group);
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer",          
        "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);   
     consumer.subscribe(Arrays.asList(topic));
     System.out.println("Subscribed to topic " + topic);
     int i = 0;        
     while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
           for (ConsumerRecord<String, String> record : records) {
              System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
           }
     }
  }
}

编译:

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*:." ConsumerGroup.java

运行:

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*:.”
ConsumerGroup <topic-name> my-group

我们创建了一个名为my-group的消费者组,和两个消费者。

c. 发布消息

运行生产者CLI,并发送以下消息:

Test consumer group 01
Test consumer group 02

d. 输出

第一个进程:

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

第二个进程:

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

Apache Kafka消费者

发表评论

邮箱地址不会被公开。 必填项已用*标注

七十 − 六十 四 =

滚动到顶部