内容
1. 引言
Apache Kafka针对容量小的消息进行了优化。 根据基准测试,1 KB消息可以获得最佳性能。 较大的消息(例如,10 MB到100 MB)可能会降低吞吐量并显着影响操作。
下面我们将从以下几个方面来介绍可以提高Kafka性能和可靠性的配置。
2. 分区和内存使用率
代理为它们复制的每个分区分配一个大小为replica.fetch.max.bytes的缓冲区。如果replica.fetch.max.bytes设置为1 MiB,并且你有1000个分区,则需要大约1 GiB的RAM。请确保分区数乘以最大消息的大小不超过可用内存。
同样的考虑适用于消费者fetch.message.max.bytes设置。请确保你有足够的内存用于消费者复制的每个分区的最大消息。对于较大的消息,你可能需要使用较少的分区或提供更多的RAM。
分区重分配
在某些时候,系统上的资源可能会超出配置闲置。如果将一个新的代理添加到kafka集群以处理增加的需求,就得为它分配新分区(与任何其他代理相同),但它不会自动共享其他代理上现有分区的负载。要在代理之间重新分配现有负载,必须手动重新分配分区。 可以使用bin/kafka-reassign-partitions.sh脚本执行此操作。
要重新分配分区,请按以下步骤操作:
1. 创建一个要移动的主题的json格式的文件
topics-to-move.json {"topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1 }
2. 使用kafka-reassign-partitions.sh中的–generate选项可以列出当前代理上的分区和副本的分布,然后列出新代理上分区的建议位置列表。
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "4" --generate Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,1]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,2]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } {"version":1, "partitions":[{"topic":"foo1","partition":3,"replicas":[4]}, {"topic":"foo1","partition":1,"replicas":[4]}, {"topic":"foo2","partition":2,"replicas":[4]}] }
3. 如果需要,请修改建议的列表,然后将其另存为JSON文件。
4. 使用kafka-reassign-partitions.sh中的–execute选项执行重新分配过程,在某些情况下可能需要几个小时。
> bin/kafka-reassign-partitions.sh \ --zookeeper localhost:2181 \ --reassignment-json-file expand-cluster-reassignment.json --execute
5. 使用kafka-reassign-partitions.sh中的–verify选项检查重新分配后分区的状态。
虽然重新分配分区是代价比较大,但你应该预测系统增长并在系统容量达到70%时重新分配负载。如果你等到已达到资源限制而被迫重新分配分区,那么这个重分配过程可能会非常缓慢。
3. 垃圾回收
发送大容量的消息可能导致更长的垃圾回收(GC)暂停因为代理需要分配大块内存。请监视GC日志和服务器日志。如果长时间GC暂停导致Kafka放弃ZooKeeper会话,则可能需要为zookeeper.session.timeout.ms配置更长的超时值。
4. 处理大容量的消息
在配置Kafka以处理大消息之前,请首先考虑以下选项以减少消息大小:
-
Kafka生产者可以压缩消息。例如,如果原始消息是基于文本的格式(例如XML),则在大多数情况下,压缩后消息将足够小。使用compression.codec和compressed.topics生产者配置参数来启用压缩。支持Gzip和Snappy。
-
如果共享存储(例如NAS,HDFS或S3)可用,请考虑将大文件放在共享存储上,并使用Kafka发送带有文件位置的消息。在许多情况下,这比使用Kafka发送大文件本身要快得多。
-
使用生产客户端将大消息拆分为大小为1 KB的许多段,然后使用分区键确保以正确的顺序将所有段发送到同一Kafka分区。然后,消费客户端可以获取这些段并重新构造原始的大消息。
如果你仍需要使用Kafka发送大消息,请修改以下配置参数以满足你的要求:
代理配置
-
message.max.bytes
代理将接受的最大消息的大小。该值必须小于消费者的fetch.message.max.bytes的值,否则消费者不能使用消息。默认值:1000000(1MiB)
-
log.segment.bytes
Kafka数据文件的大小。必须大于任何单个消息。
默认值:1073741824(1GiB) -
replica.fetch.max.bytes
代理可以复制的最大消息的大小。必须大于message.max.bytes,否则代理可能接受一条无法复制的消息,从而可能导致数据丢失。
默认值:1048576(1MiB)
消费者设置
如果单个消息批次大于下面的任何一个默认值,消费者仍然可以处理这个批次,但这个批次将被单独发送,这可能会导致性能下降。
-
max.partition.fetch.bytes
服务器返回的每个分区的最大数据量。
默认值:1048576(10MiB) -
fetch.max.bytes
服务器为一个获取请求返回的最大数据量。
默认值:52428800(50MiB) -
fetch.message.max.bytes
消费者可以读取的最大消息的大小。必须至少与message.max.bytes一样大。
默认值:1048576(1MiB)
5. 调整Kafka以优化性能
性能调优涉及两个重要指标:延迟(Latency)测量处理一个事件所需的时间,吞吐量(Throughput)测量在特定时间内到达的事件数量。大多数系统都针对延迟或吞吐量进行了优化,Kafka也是如此,而且两者在Kafka中达到了平衡。考虑到在收到消息时处理消息所需的延迟,经过良好调整的Kafka系统恰好只有足够的代理来处理主题吞吐量。
调整你的生产者,代理和消费者,以便在可管理的时间内发送,处理和接收尽可能多的批次,从而最大限度地平衡Kafka群集的延迟和吞吐量。
a. 调整Kafka生产者
Kafka使用异步发布/订阅模型。当生产者调用send()命令时,返回的结果是java.util.concurrent.Future实例。Future提供了一些方法,可以让你检查正在处理的信息的状态。消息批次准备就绪后,生产者将其发送给代理。Kafka代理等待事件,接收结果,然后响应事务完成。
如果你不使用Future,那么只能获得一条记录,等待结果,然后发送响应。这样做延迟非常低,但吞吐量也非常低。如果每个事务需要5毫秒,吞吐量是每秒200个事件,那么结果就比预期的每秒100,000个事件慢很多。
调用Producer.send()时,会先把数据填充的生产者的缓冲区。当缓冲区已满时,生产者将缓冲区的数据发送到Kafka代理并开始重新填充缓冲区。
有两个参数对延迟和吞吐量特别重要:batch size 和 linger time
i. batch size
batch.size以总字节数而不是消息数量来度量批次(batch)的大小。它控制在向Kafka代理发送消息之前要收集的数据字节数。可以在不超出可用内存的情况下将其设置得尽可能高,默认值为16384。
如果增加缓冲区的大小,它可能永远不会满。生产者最终根据其他触发条件发送信息,例如linger time(以毫秒为单位)。虽然你可以通过将缓冲区大小设置得很高来减少内存使用率,但这不会影响延迟。
ii. linger time
linger.ms设置在异步模式下缓冲数据的最长时间。例如,为100个批次的数据设置linger time 为100ms,即停留100ms后再发送数据。这个设置提高了吞吐量,但缓冲增加了消息传递延迟。
默认情况下,生产者不会等待,当数据可用时它会在任意时间发送缓冲区。
你可以将linger.ms设置为5并在一批中发送更多消息,而不是立即发送每一条消息。这样可以减少发送的请求数量,但是对于被发送的记录将增加5毫秒的延迟。
代理离生产者越远,发送消息所需的开销就越大。增加linger.ms以提高生产者的延迟和吞吐量。
b. 调整Kafka代理
主题分为几个分区,每个分区都有一个领导者,大多数分区的数据都会被写入领导者和多个副本中。当领导者之间没有得到适当的平衡时,有的领导者可能会负载过度。
根据你的系统以及数据的重要程度,你需要确保拥有足够的副本来保留数据。我们建议最开始请将你的每个物理存储磁盘对应一个分区,每个分区一个对应一个消费者者。
c. 调整Kafka消费者
消费者在管道的另一端可能会造成吞吐量问题。主题的最大消费者数量等于分区数量。你需要足够的分区来对应所有的消费者以满足生产者的产能。
消费者组将主题的分区分配给本组内的消费者,向组中添加更多消费者可以提高性能。添加更多消费者组不会影响性能。
replica.high.watermark.checkpoint.interval.ms属性的使用可能会影响吞吐量。从分区读取数据时,你可以为读取到的最后一条消息设置标记,这样,如果你以后必须向前查找并找到丢失的数据,你就有了一个检查点,无需重新读取先前的数据就可以向前移动。如果为每个事件设置检查点水印,就永远不会丢失消息,但这样做会显着影响性能。相反,如果你将其设置为每100条消息检查一个偏移量,你会有一个安全边际,从而对吞吐量的影响要小得多。
6. 生产环境服务器配置
根据集群环境和计算机配置的可用性,以下是我们可以修改的一些配置参数及其值:
a. num.replica.fetchers
该参数定义将数据从领导者(leader)复制到追随者(follower)的线程数,我们可以根据机器线程的可用性修改此参数的值。如果我们有足够的可用线程,那么我们就可以让一定数量的副本获取器(fetcher)并行完成复制。
b. replica.fetch.max.bytes
该参数决定在每个获取请求中我们想要从一个分区里获取多少数据。增加这个参数的值是的好处是可以帮助我们在追随者中更快速的创建副本。
c. replica.socket.receive.buffer.bytes
如果我们只有很少的线程用来创建副本,那么我们可以增加缓冲区的大小。另外,如果与传入消息速率相比复制线程工作较慢,就会有助于保存更多数据。
d. num.partitions
当Kafka运行时,我们应该谨慎修改这个配置。我们可以启用并行级别并且并行写入数据,这将自动增加吞吐量。
不过,如果系统配置无法处理,那么增加分区数量会降低Kafka的性能和吞吐量。如果系统没有足够的线程或只有单个磁盘,那么创建的大量分区并不能获得更高的吞吐量。因此,是否要为主题创建更多分区直接依赖于可用的线程和磁盘。
e. num.io.threads
集群中有多少磁盘决定了I/O线程的设置值,线程的数量依赖于磁盘的数量。服务器用这些线程来执行请求。
7. 限额(Quotas)
Kafka可以对生产和读取请求强制实施限额。生产者和消费者都会使用非常大量的数据,这可能会垄断代理资源,导致网络饱和,并且拒绝向其他客户端和代理本身提供服务。限额可以防范这些问题,对于大型多租户集群非常重要,因为使用大量数据的少量客户端会降低用户体验。
限额是字节速率阈值,按客户端ID定义。一个客户端ID在逻辑上标识一个发出请求的应用程序,单个客户端ID可以跨多个生产者和消费者实例。 限额作为独立实体应用于所有实例:例如,如果客户端ID的生产限额为10 MB/s,则该限额将在具有相同ID的所有实例之间共享。
将Kafka作为服务运行时,配额可以强制对API进行限制。默认情况下,每个唯一客户端ID都会收到以每秒字节数为单位的固定限额,该限额由集群(quota.producer.default,quota.consumer.default)配置。限额是基于每个代理定义的,每个客户端在被禁止工作之前,每秒可以对每个代理发布或获取最多X个字节。
当客户端超过其限额时,代理不会返回错误,而是尝试降低客户端的访问速度。代理会计算将客户端限制在其限额之下所需的延迟请求数量,并将延迟响应这些请求。这个方法使限额违规处理对客户端(客户端指标之外)透明,这也可以防止客户端实现特殊的退避和重试行为。
设置限额
你可以覆盖客户端ID的默认限额,如果你需要设置一个更高或更低的值,该机制类似于覆盖每个主题的日志配置。将对客户端ID的覆盖写入/config/clients下的ZooKeeper设置中,所有代理都会读取被覆盖的值,并且这些覆盖立即生效。更改限额无需重新启动整个集群。
默认情况下,每个客户端ID都会收到无限制的限额。以下配置将每个生产者和消费者客户端ID的默认限额设置为10 MB/s。
quota.producer.default=10485760 quota.consumer.default=10485760
8. 为Kakfa设置用户限制
大多数类Unix系统上的最大打开文件数默认设置是1024,而Kafka会同时打开许多文件,默认1024个文件可能是不够的。任何重大负载都可能导致失败或者错误消息,例如java.io.IOException …(Too many open files),这些错误信息被记录在Kafka或HDFS日志文件中。你可能还会注意到以下错误:
ERROR Error in acceptor (kafka.network.Acceptor) java.io.IOException: Too many open files
我们建议你把默认最大打开文件数设置为一个相对较大的值,例如32768。