Apache Kafka生产者

1. 什么是Kafka生产者(Producer)?

基本上,如果一个应用程序是数据流的源,我们就认为这个应用程序是一个生产者。在Kafka中,我们使用Apache Kafka生产者生成令牌(token)或消息并进一步将其发布到Kafka集群中的一个或多个主题。此外,Kafka的Producer API能帮助我们打包消息或令牌并将其传递给Kafka Server。

下图演示了Kafka生产者如何工作:

pic

2. Producer API

Kafka Producer API 允许应用程序将记录流发布到一个或多个Kafka主题,它的核心部分是KafkaProducer类。KafkaProducer类是Producer接口的一个实现,它是线程安全的。

KafkaProducer类的主要方法有:

  • 为了将消息异步发送到主题,KafkaProducer类提供了send方法。 send()的签名是:

  producer.send(new ProducerRecord<byte[],byte[]>(topic,   partition, key1, value1) , callback);

       ProducerRecord要发送给Kafka的键/值对。参数包括要将记录发送到的主题名称,可选的分区号以及可选的键和值。

       Callback当服务器确认收到被发送的记录时,执行用户提供的回调函数。回调函数可以为空。

  • 为确保所有先前发送的消息都已实际完成,KafkaProducer类提供了flush方法。flush方法的签名是:

  public void flush()
  • 为了获取给定主题的分区元数据,KafkaProducer类提供了partitionFor方法,而且,我们可以将它用于自定义分区。partitionFor方法的签名是:

  public List<PartitionInfo> partitionsFor(String topic)
  • 为了获取生产者维护的全部内部指标,KafkaProducer类提供了metrics方法:

  public java.util.Map<MetricName,? extends Metric> metrics()
  • KafkaProducer类提供了close方法来关闭生产者:

  public void close()

        close方法会阻塞直到之前所有的请求完成。

3. Producer API的配置

创建一个KafkaProducer的实例需要初始化一些配置,因为它的构造方法如下:

KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs)

A producer is instantiated by   providing a set of key-value pairs as configuration.

KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer)

A producer is instantiated by   providing a set of key-value pairs as configuration, a key and a value Serializer.

KafkaProducer(java.util.Properties properties)

A producer is instantiated by   providing a set of key-value pairs as configuration.

KafkaProducer(java.util.Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer)

A producer is instantiated by   providing a set of key-value pairs as configuration, a key and a value Serializer.

所有我们需要了解一下有哪些常用的配置,当创建实例时我们就可以给这些配置赋值。这里我们列出一些主要的配置:

a. client.id生产者应用程序的id

b. producer.type – sync (同步)或者async(异步)

c. acks在一个请求被认为完成之前,生产者要求领导者收到的确认数量。 这可以控制发送记录的持久性。它的值可以是01all

d. retries如果生产者的请求因为某种原因失败,则自动重试该设置设定的次数。

e. bootstrap.servers用于建立与Kafka群集的初始连接的主机/端口对列表.

f. linger.ms如果我们想减少请求的数量,我们可以将linger.ms设置为大于某个值的值。

g. key.serializer用于实现org.apache.kafka.common.serialization.Serializer接口的keySerializer类。

h. value.serializer用于实现org.apache.kafka.common.serialization.Serializer接口的valueSerializer类。

i. batch.size简单来说,就是缓存区的大小

j. buffer.memory生产者可用于缓冲区的总内存量。

4. ProducerRecord API

要发送给Kafka的键/值对。创建ProducerRecord实例时参数包括要将记录发送到的主题名称,可选的分区号以及可选的键和值。

以下是常用的构造函数: 

public ProducerRecord (string topic, int partition, k key, v value)

1.     Topic – 记录要发送到的主题

2.     Partition – 记录要发送到的分区索引

3.     Key – 要发送的记录包含的键值

4.     Value – 要发送的记录的内容

public ProducerRecord (string topic, k key, v value)

1.     Topic – 记录要发送到的主题

2.     Key ? 要发送的记录包含的键值

3.     Value – 要发送的记录的内容

public ProducerRecord (string topic, v value)

1.     Topic ? 记录要发送到的主题

2.     Value ? 要发送的记录的内容


以下是ProducerRecord类提供的主要方法:

1. public string topic()

    记录要发送到的主题

2. public K key()

    包含在记录中的键值,如果创建ProducerRecord实例时没有指定键值,则返回null

3. public V value()

    记录的内容

4. public Integer partition()

    记录要发送到的分区,如果创建ProducerRecord实例时没有指定分区,就返回null

5. 一个简单的Kafka生产者应用程序

首先请确保启动ZooKeeperKafka代理,然后使用create topic命令在Kafka代理中创建你自己的主题。

1. 然后创建一个名为SimpleProducer.javajava,代码如下 

//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
 
//Create java class named “SimpleProducer”
public class SimpleProducer {
    public static void main(String[] args) throws Exception{
    // Check arguments length value
    if(args.length == 0){
        System.out.println("Enter topic name”);
        return;
}
 
    //Assign topicName to string variable
    String topicName = args[0].toString();
    // create instance for properties to access producer configs
    Properties props = new Properties();
    //Assign localhost id
    props.put("bootstrap.servers", “localhost:9092");
    //Set acknowledgements for producer requests.
    props.put("acks", “all");
    //If the request fails, the producer can automatically retry,
    props.put("retries", 0);
    //Specify buffer size in config
    props.put("batch.size", 16384);
    //Reduce the no of requests less than 0
    props.put("linger.ms", 1);
    //The buffer.memory controls the total amount of memory available to the producer for     buffering.
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer");
 
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    for(int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
    }
    System.out.println(“Message sent successfully”);
    producer.close();
    }
}

2. 然后,编译这个类

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*:.” SimpleProducer.java

3. 接下来,运行

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*:.” SimpleProducer <topic_name>

 4. 输出

在控制台上可以看到以下信息:

Message sent successfully

 打开一个新的终端,然后运行消费者的脚本来接收消息:

>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic_name> --from-beginning
 
1
2
3
4
5
6
7
8
9
10

Apache Kafka生产者

发表评论

邮箱地址不会被公开。 必填项已用*标注

九 × 一 =

滚动到顶部