Apache Kafka的负载测试

1. Kafka和负载测试的概要介绍

Apache Kafka是分布式数据库和消息队列的混合体。它的特点是: 

  • 默认情况下,消息存储时间很长:一周。

  • 由于顺序I/O,所以性能很高。

  • 方便做集群

  • 消息能在集群中进行复制和分发,所以有很高的可用性

  • 可以用Streaming API处理

Kafka非常适合处理大数据量的业务,所以我们在用JmeterKafka做负载测试的时候,需要注意以下几个方面: 

  • 如果我们不断地将数据写入磁盘,那将影响服务器的容量。如果磁盘容量不足,会产生拒绝服务(Denial of Service)的错误。

  • 分区的分别和代理的数量也会影响服务能力的使用。

  • 当使用复制功能时,一切都变得更加复杂。原因是,复制功能的维护需要更多的资源,而且有可能发生代理拒绝接收消息的情况。

即使大多数流程都是自动化的,在处理大量的数据时也可能丢失一些数据。因此,对Kafka服务的测试非常重要,并且要特别关注它所能承受的负载。

我们在Ubuntu上对Apache Kafka进行负载测试,以便进行演示。我们将使用Pepper-Box插件作为生产者(Producer),因为它比kafkameter更方便的生成消息。但是,没有插件提供消费者(Consumer)实现,所以我们必须自己实现消费者,我们将使用JSR223 Sampler来做到这一点。

2. 配置生产者Pepper-Box 

要在Jmeter中安装Pepper-Box插件,我们需要下载jar文件,并且创建和编译一个JSR233类。然后,将jar文件和编译后的类文件放在Jmeterlib/ext文件夹中并重新启动JMeter

Pepper-Box插件包括3个元素:

  • Pepper-Box PlainText Config它允许根据特定的模板构建文本信息。

  • Pepper-Box Serialized Config它允许构建作为java序列化对象的消息。

  • PepperBoxKafkaSampler它用来发送由以上两种元素构建的消息。

下面我们来详细了解一下它们。

a. Pepper-Box PlainText Config

请依次点击Thread Group – > Add – > Config Element – > Pepper-Box PlainText Config 来添加该元素。

image.png

Pepper-Box PlainText Config有两个字段:

i. Message Placeholder Key

需要在PepperBoxKafkaSampler中指定的键,在这个元素中需要通过这个键找到模板

ii. Schema Template

一个消息模板,你可以在其中使用JMeter变量和函数,以及插件函数。消息的结构可以是任何形式,从纯文本到JSONXML

在这个例子中,我们JSON字符串传递消息,并在其中定义了几个插件函数,这些函数包括:SEQUENCE UUIDTIMESTAMP

b. Pepper-Box Serialized Config

请依次点击Thread Group – > Add – > Config Element – > Pepper-Box Serialized Config来添加该元素。

image.png

该元素有一个Key字段和类名称Class Name字段,我们要在Class Name字段里指定Java类,包含指定类的jar文件必须放在lib/ext文件夹中。指定后,类的属性显示在下方,你可以为它们指定所需的值。我们重复使用了上一个元素的设置,但这次它是一个Java对象。 

c. PepperBoxKafkaSampler

要添加此元素,请先跳转到Thread group -> Add -> Sampler -> Java Request,然后,从下拉列表中选择com.gslab.pepper.sampler.PepperBoxKafkaSampler

这个元素有以下设置:

  • bootstrap.servers/zookeeper.servers – Zookeeper的地址(一个Zookeeper是一个接口,它为不同代理之间的生产者分配负载),格式为broker-ip-1:portbroker-ip-2:port

  • kafka.topic.name是把消息发布到的主题的名称。

  • key.serializer是一个用于键序列化的类。如果消息中没有键,请保持不变。

  • value.serializer是一个用于值序列化的类。对于简单文本,该字段保持不变。使用Pepper-Box Serialized Config时,需要指定“com.gslab.pepper.input.serialized.ObjectSerializer”

  • compression.type是一种消息压缩的类型(none/gzip/snappy/lz4

  • batch.size一个批次中消息大小的最大值。

  • linger.ms是消息等待时间。

  • buffer.memory是生产者的缓冲区大小。

  • acks在完成请求之前,生产者要求分区leader收到的确认数量。这可以控制发送记录的持久性。(-1(all)/0/1 – 收到所有的ISP确认/不等待任何确认/不用等待所有的ISP确认)

  • receive.buffer.bytes/send.buffer.bytes – TCP发送/接收缓冲区的大小。-1表示使用默认操作系统的值。

  • security.protocol是加密协议(PLAINTEXT/SSL/SASL_PLAINTEXT/SASL_SSL)。

  • message.placeholder.key是消息键,在前面的元素中指定。

  • kerberos.auth.enabled,      java.security.auth.login.config, java.security.krb5.conf,      sasl.kerberos.service.name  一个字段组,用于认证(Authentication

此外,如有必要,你可以以名称前面加上前缀_来添加其他参数,例如_ssl.key.password

3. 配置消费者

虽然生产者在服务器上创建了最大的负载,但Kafka也必须传递消息。因此,我们还应该在测试中考虑到消费者,以更准确地再现情况。消费者还可用于检查所有的消息是否已被传递。

作为示例,让我们采用以下源代码并简要介绍其步骤:

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", group);
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);     
     consumer.subscribe(Arrays.asList(topic));
     System.out.println("Subscribed to topic " + topic);
     int i = 0;  
     while (true) {
        ConsumerRecords<String, String> records = con-sumer.poll(100);
           for (ConsumerRecord<String, String> record : records)
              System.out.printf("offset = %d, key = %s, value = %s\n",
              record.offset(), record.key(), record.value());
     }

1. 创建了一个连接所需要的配置实例

2. 指定了一个主题并对其进行订阅

3. 从主题中接收消息,并在控制台上显示出来

把以上代码修改一下,然后可以添加到JMeter中的JSR223 Sampler中。

4. JMeter中构建Kafka负载测试场景

现在我们已经研究了创建负载的所有必要元素,让我们尝试将几条消息发布到我们的Kafka主题中。假设我们有一个资源,我们可以收集有关它的活动的数据。消息将作为XML文档发送。

1. 添加Pepper-Box PlainText Config并创建模板。 消息的结构如下:消息号,消息ID,从中收集统计信息的项ID,统计信息,发送日期戳。如下图所示:

image.png

2. 添加PepperBoxKafkaSampler。在属性设置中指定bootstrap.servers的地址和kafka.topic.name。在我们的例子中,bootstrap.servers的地址是localhost:9092,演示的主题是TuttorialTopic。我们还要把placeholder.key指定为上一步中创建的模板。

image.png

3. 将含有消费者代码的JSR223 Sampler添加到单独的线程组。要使Sampler工作,你还需要一个kafka-clients-x.x.x.x.jar文件,其中包含能和Kafka工作的类,这个jar文件可以在kafka目录中找到 – /kafka/lib

在这里,我们修改了部分脚本,现在将数据保存到文件中,而不是在控制台中显示,这样做是为了更方便地分析结果。我们还添加了消费者执行时间的设置,执行时间为5秒。

Updated part:
 long t = System.currentTimeMillis();
 long end = t + 5000;
 f = new FileOutputStream(".\\data.csv", true);
 p = new PrintStream(f);
 while (System.currentTimeMillis()<end)
 {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records)
    {
        p.println( "offset = " + record.offset() +" value = " + record.value());
    }
    consumer.commitSync();
  }
  consumer.close();
  p.close();
  f.close();

现在脚本的结构如下所示。两个线程同时工作,生产者开始将消息发布到指定的主题。 消费者连接到主题并等待来自Kafka的消息。当消费者收到消息时,它会将消息写入文件。

image.png

 4. 运行脚本并查看结果。

image.png

正如你在上面的屏幕截图中看到的,发送了十条消息。你可以在打开的文件中看到收到的消息。之后,你只需调整消费者和生产者的数量即可增加负载。

注意:在测试期间不要使用随机数据作为消息发送,因为随机数据的大小可能与当前消息的大小不同,并且这种差异可能会影响测试结果。

值得提醒的是,Apache Kafka专为大量连接而设计,因此你可以很容易的达到网络负载生成器的容量限制。在这种情况下,JMeter使用分布式测试的功能。

Apache Kafka的负载测试

发表评论

邮箱地址不会被公开。 必填项已用*标注

+ 八十 五 = 86

滚动到顶部