内容
1. 什么是Kafka生产者(Producer)?
基本上,如果一个应用程序是数据流的源,我们就认为这个应用程序是一个生产者。在Kafka中,我们使用Apache Kafka生产者生成令牌(token)或消息并进一步将其发布到Kafka集群中的一个或多个主题。此外,Kafka的Producer API能帮助我们打包消息或令牌并将其传递给Kafka Server。
下图演示了Kafka生产者如何工作:
pic
2. Producer API
Kafka Producer API 允许应用程序将记录流发布到一个或多个Kafka主题,它的核心部分是KafkaProducer类。KafkaProducer类是Producer接口的一个实现,它是线程安全的。
KafkaProducer类的主要方法有:
-
为了将消息异步发送到主题,KafkaProducer类提供了send方法。 send()的签名是:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);
ProducerRecord – 要发送给Kafka的键/值对。参数包括要将记录发送到的主题名称,可选的分区号以及可选的键和值。
Callback – 当服务器确认收到被发送的记录时,执行用户提供的回调函数。回调函数可以为空。
-
为确保所有先前发送的消息都已实际完成,KafkaProducer类提供了flush方法。flush方法的签名是:
public void flush()
-
为了获取给定主题的分区元数据,KafkaProducer类提供了partitionFor方法,而且,我们可以将它用于自定义分区。partitionFor方法的签名是:
public List<PartitionInfo> partitionsFor(String topic)
-
为了获取生产者维护的全部内部指标,KafkaProducer类提供了metrics方法:
public java.util.Map<MetricName,? extends Metric> metrics()
-
KafkaProducer类提供了close方法来关闭生产者:
public void close()
close方法会阻塞直到之前所有的请求完成。
3. Producer API的配置
创建一个KafkaProducer的实例需要初始化一些配置,因为它的构造方法如下:
KafkaProducer A producer is instantiated by providing a set of key-value pairs as configuration. |
KafkaProducer A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value |
KafkaProducer A producer is instantiated by providing a set of key-value pairs as configuration. |
KafkaProducer A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value |
所有我们需要了解一下有哪些常用的配置,当创建实例时我们就可以给这些配置赋值。这里我们列出一些主要的配置:
a. client.id – 生产者应用程序的id
b. producer.type – sync (同步)或者async(异步)
c. acks – 在一个请求被认为完成之前,生产者要求领导者收到的确认数量。 这可以控制发送记录的持久性。它的值可以是0,1,all。
d. retries – 如果生产者的请求因为某种原因失败,则自动重试该设置设定的次数。
e. bootstrap.servers – 用于建立与Kafka群集的初始连接的主机/端口对列表.
f. linger.ms – 如果我们想减少请求的数量,我们可以将linger.ms设置为大于某个值的值。
g. key.serializer – 用于实现org.apache.kafka.common.serialization.Serializer接口的key的Serializer类。
h. value.serializer – 用于实现org.apache.kafka.common.serialization.Serializer接口的value的Serializer类。
i. batch.size – 简单来说,就是缓存区的大小
j. buffer.memory – 生产者可用于缓冲区的总内存量。
4. ProducerRecord API
要发送给Kafka的键/值对。创建ProducerRecord实例时参数包括要将记录发送到的主题名称,可选的分区号以及可选的键和值。
以下是常用的构造函数:
public ProducerRecord (string topic, int partition, k key, v value)
1. Topic – 记录要发送到的主题
2. Partition – 记录要发送到的分区索引
3. Key – 要发送的记录包含的键值
4. Value – 要发送的记录的内容
public ProducerRecord (string topic, k key, v value)
1. Topic – 记录要发送到的主题
2. Key ? 要发送的记录包含的键值
3. Value – 要发送的记录的内容
public ProducerRecord (string topic, v value)
1. Topic ? 记录要发送到的主题
2. Value ? 要发送的记录的内容
以下是ProducerRecord类提供的主要方法:
1. public string topic()
记录要发送到的主题
2. public K key()
包含在记录中的键值,如果创建ProducerRecord实例时没有指定键值,则返回null。
3. public V value()
记录的内容
4. public Integer partition()
记录要发送到的分区,如果创建ProducerRecord实例时没有指定分区,就返回null。
5. 一个简单的Kafka生产者应用程序
首先请确保启动ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中创建你自己的主题。
1. 然后创建一个名为SimpleProducer.java的java类,代码如下:
//import util.properties packages import java.util.Properties; //import simple producer packages import org.apache.kafka.clients.producer.Producer; //import KafkaProducer packages import org.apache.kafka.clients.producer.KafkaProducer; //import ProducerRecord packages import org.apache.kafka.clients.producer.ProducerRecord; //Create java class named “SimpleProducer” public class SimpleProducer { public static void main(String[] args) throws Exception{ // Check arguments length value if(args.length == 0){ System.out.println("Enter topic name”); return; } //Assign topicName to string variable String topicName = args[0].toString(); // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put("bootstrap.servers", “localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", “all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); Producer<String, String> producer = new KafkaProducer<String, String>(props); for(int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); } System.out.println(“Message sent successfully”); producer.close(); } }
2. 然后,编译这个类:
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*:.” SimpleProducer.java
3. 接下来,运行:
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*:.” SimpleProducer <topic_name>
4. 输出:
在控制台上可以看到以下信息:
Message sent successfully
打开一个新的终端,然后运行消费者的脚本来接收消息:
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic_name> --from-beginning 1 2 3 4 5 6 7 8 9 10