内容
1. Kafka和负载测试的概要介绍
Apache Kafka是分布式数据库和消息队列的混合体。它的特点是:
-
默认情况下,消息存储时间很长:一周。
-
由于顺序I/O,所以性能很高。
-
方便做集群
-
消息能在集群中进行复制和分发,所以有很高的可用性
-
可以用Streaming API处理
Kafka非常适合处理大数据量的业务,所以我们在用Jmeter对Kafka做负载测试的时候,需要注意以下几个方面:
-
如果我们不断地将数据写入磁盘,那将影响服务器的容量。如果磁盘容量不足,会产生拒绝服务(Denial of Service)的错误。
-
分区的分别和代理的数量也会影响服务能力的使用。
-
当使用复制功能时,一切都变得更加复杂。原因是,复制功能的维护需要更多的资源,而且有可能发生代理拒绝接收消息的情况。
即使大多数流程都是自动化的,在处理大量的数据时也可能丢失一些数据。因此,对Kafka服务的测试非常重要,并且要特别关注它所能承受的负载。
我们在Ubuntu上对Apache Kafka进行负载测试,以便进行演示。我们将使用Pepper-Box插件作为生产者(Producer),因为它比kafkameter更方便的生成消息。但是,没有插件提供消费者(Consumer)实现,所以我们必须自己实现消费者,我们将使用JSR223 Sampler来做到这一点。
2. 配置生产者Pepper-Box
要在Jmeter中安装Pepper-Box插件,我们需要下载jar文件,并且创建和编译一个JSR233类。然后,将jar文件和编译后的类文件放在Jmeter的lib/ext文件夹中并重新启动JMeter。
Pepper-Box插件包括3个元素:
-
Pepper-Box PlainText Config – 它允许根据特定的模板构建文本信息。
-
Pepper-Box Serialized Config – 它允许构建作为java序列化对象的消息。
-
PepperBoxKafkaSampler – 它用来发送由以上两种元素构建的消息。
下面我们来详细了解一下它们。
a. Pepper-Box PlainText Config
请依次点击Thread Group – > Add – > Config Element – > Pepper-Box PlainText Config 来添加该元素。
Pepper-Box PlainText Config有两个字段:
i. Message Placeholder Key
需要在PepperBoxKafkaSampler中指定的键,在这个元素中需要通过这个键找到模板
ii. Schema Template
一个消息模板,你可以在其中使用JMeter变量和函数,以及插件函数。消息的结构可以是任何形式,从纯文本到JSON或XML。
在这个例子中,我们JSON字符串传递消息,并在其中定义了几个插件函数,这些函数包括:SEQUENCE, UUID和TIMESTAMP。
b. Pepper-Box Serialized Config
请依次点击Thread Group – > Add – > Config Element – > Pepper-Box Serialized Config来添加该元素。
该元素有一个Key字段和类名称Class Name字段,我们要在Class Name字段里指定Java类,包含指定类的jar文件必须放在lib/ext文件夹中。指定后,类的属性显示在下方,你可以为它们指定所需的值。我们重复使用了上一个元素的设置,但这次它是一个Java对象。
c. PepperBoxKafkaSampler
要添加此元素,请先跳转到Thread group -> Add -> Sampler -> Java Request,然后,从下拉列表中选择com.gslab.pepper.sampler.PepperBoxKafkaSampler。
这个元素有以下设置:
-
bootstrap.servers/zookeeper.servers – Zookeeper的地址(一个Zookeeper是一个接口,它为不同代理之间的生产者分配负载),格式为broker-ip-1:port,broker-ip-2:port 等
-
kafka.topic.name – 是把消息发布到的主题的名称。
-
key.serializer – 是一个用于键序列化的类。如果消息中没有键,请保持不变。
-
value.serializer – 是一个用于值序列化的类。对于简单文本,该字段保持不变。使用Pepper-Box Serialized Config时,需要指定“com.gslab.pepper.input.serialized.ObjectSerializer”。
-
compression.type – 是一种消息压缩的类型(none/gzip/snappy/lz4)
-
batch.size – 一个批次中消息大小的最大值。
-
linger.ms – 是消息等待时间。
-
buffer.memory – 是生产者的缓冲区大小。
-
acks – 在完成请求之前,生产者要求分区leader收到的确认数量。这可以控制发送记录的持久性。(-1(all)/0/1 – 收到所有的ISP确认/不等待任何确认/不用等待所有的ISP确认)
-
receive.buffer.bytes/send.buffer.bytes – TCP发送/接收缓冲区的大小。-1表示使用默认操作系统的值。
-
security.protocol – 是加密协议(PLAINTEXT/SSL/SASL_PLAINTEXT/SASL_SSL)。
-
message.placeholder.key – 是消息键,在前面的元素中指定。
-
kerberos.auth.enabled, java.security.auth.login.config, java.security.krb5.conf, sasl.kerberos.service.name – 一个字段组,用于认证(Authentication)
此外,如有必要,你可以以名称前面加上前缀_来添加其他参数,例如_ssl.key.password。
3. 配置消费者
虽然生产者在服务器上创建了最大的负载,但Kafka也必须传递消息。因此,我们还应该在测试中考虑到消费者,以更准确地再现情况。消费者还可用于检查所有的消息是否已被传递。
作为示例,让我们采用以下源代码并简要介绍其步骤:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topic)); System.out.println("Subscribed to topic " + topic); int i = 0; while (true) { ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); }
1. 创建了一个连接所需要的配置实例
2. 指定了一个主题并对其进行订阅
3. 从主题中接收消息,并在控制台上显示出来
把以上代码修改一下,然后可以添加到JMeter中的JSR223 Sampler中。
4. 在JMeter中构建Kafka负载测试场景
现在我们已经研究了创建负载的所有必要元素,让我们尝试将几条消息发布到我们的Kafka主题中。假设我们有一个资源,我们可以收集有关它的活动的数据。消息将作为XML文档发送。
1. 添加Pepper-Box PlainText Config并创建模板。 消息的结构如下:消息号,消息ID,从中收集统计信息的项ID,统计信息,发送日期戳。如下图所示:
2. 添加PepperBoxKafkaSampler。在属性设置中指定bootstrap.servers的地址和kafka.topic.name。在我们的例子中,bootstrap.servers的地址是localhost:9092,演示的主题是TuttorialTopic。我们还要把placeholder.key指定为上一步中创建的模板。
3. 将含有消费者代码的JSR223 Sampler添加到单独的线程组。要使Sampler工作,你还需要一个kafka-clients-x.x.x.x.jar文件,其中包含能和Kafka工作的类,这个jar文件可以在kafka目录中找到 – /kafka/lib。
在这里,我们修改了部分脚本,现在将数据保存到文件中,而不是在控制台中显示,这样做是为了更方便地分析结果。我们还添加了消费者执行时间的设置,执行时间为5秒。
Updated part: long t = System.currentTimeMillis(); long end = t + 5000; f = new FileOutputStream(".\\data.csv", true); p = new PrintStream(f); while (System.currentTimeMillis()<end) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { p.println( "offset = " + record.offset() +" value = " + record.value()); } consumer.commitSync(); } consumer.close(); p.close(); f.close();
现在脚本的结构如下所示。两个线程同时工作,生产者开始将消息发布到指定的主题。 消费者连接到主题并等待来自Kafka的消息。当消费者收到消息时,它会将消息写入文件。
4. 运行脚本并查看结果。
正如你在上面的屏幕截图中看到的,发送了十条消息。你可以在打开的文件中看到收到的消息。之后,你只需调整消费者和生产者的数量即可增加负载。
注意:在测试期间不要使用随机数据作为消息发送,因为随机数据的大小可能与当前消息的大小不同,并且这种差异可能会影响测试结果。
值得提醒的是,Apache Kafka专为大量连接而设计,因此你可以很容易的达到网络负载生成器的容量限制。在这种情况下,JMeter使用分布式测试的功能。