Apache Kafka操作

1. 添加和删除Kafka主题

你可以选择手动添加主题,也可以在数据首次发布到不存在的主题时自动创建主题。 如果自动创建主题,则可能需要调整用于自动创建主题的默认配置。

添加的命令是:

> bin/kafka-topics.sh --zookeeper zk_host:port/chroot   --create --topic my_topic_name
      --partitions 20   --replication-factor 3 --config x=y

复制因子控制需要复制每条消息的服务器数量。如果复制因子为3,则在失去对数据的访问权限之前,允许最多2台服务器失败。我们建议你使用复制因子23,这样你就可以不中断数据的读取。 

分区数量决定将主题分成多少个日志。分区数量有几个影响:首先,每个分区必须完全适合单个服务器。因此,如果你有20个分区,则完整数据集(以及读取和写入负载)将由不超过20个服务器(不计算副本在内)处理。最后,分区计数会影响消费者的最大并行度。

2. 如何修改一个Kakfa主题 

可以使用相同的工具更改主题的配置或分区。

添加分区:

> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40

请注意,分区的一个用例是对数据进行语义分区,添加分区不会更改现有数据的分区,因此,这可能会干扰消费者如果消费者依赖于该分区。也就是说,如果数据是通过hash(key)%number_of_partitions进行分区,那么这个分区可能会通过添加分区进行混洗,但Kafka不会尝试以任何方式自动重新分配数据。

添加配置:

> bin/kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name my_topic_name --alter --add-config x=y

删除配置:

> bin/kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name my_topic_name --alter --delete-config x

删除主题:

 > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

3. 优雅的关机

Kafka群集将自动检测任何代理关闭或故障,并为该计算机上的分区选择新的领导者。 无论服务器是故障还是有意关闭以进行维护或配置更改,都会发生这种情况。对于后一种情况,Kafka支持更优雅的机制来停止服务器,而不仅仅是杀死服务器。当服务器优雅的停止时,它有两个优化可以利用:

    1. 它会将所有日志同步到磁盘,以避免在重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。因为恢复日志需要时间,所以这个优化加速了主动重启。

    2. 在关闭之前,它会将领导者的任何分区迁移到其他副本。这将使领导权转移更快,并将每个分区不可用的时间缩短到几毫秒。

4. Kafka集群之间镜像数据 

我们指的是在Kafka集群之间复制数据“镜像”的过程,以避免与单个集群中的节点之间发生的复制混淆。Kafka附带了一个用于在Kafka集群之间镜像数据的工具,该工具从源集群中读取数据并生成到目标集群。我们可以运行许多此类镜像进程以提高吞吐量和容错性。

数据从源集群中的主题读取出来,然后被写入目标集群中具有相同名称的主题。实际上,镜像生成器更像是一个Kafka消费者和生产者结合在一起的工具。

源和目标集群是完全独立的实体:它们可以具有不同数量的分区,并且偏移量将不相同。 所以,镜像集群并不是为了容错(因为消费者的位置会有所不同);为此,要容错我们建议使用正常的群集内复制。但是,镜像生成器进程将保留并使用消息的键值进行分区,因此消息可以根据键值保留顺序。

下面是一个示例,说明如何从输入集群镜像单个主题(名为my-topic):

> bin/kafka-mirror-maker.sh
      --consumer.config consumer.properties
      --producer.config producer.properties --whitelist my-topic

请注意,我们使用–whitelist选项指定主题列表,此选项允许使用Java格式的正则表达式。因此,你可以使用–whitelist ‘A|B’镜像名为AB的两个主题,或者你可以使用—whitelist ‘*’镜像所有主题。请把正则表达式放在引号里面以确保shell不会尝试将其解析为文件路径。为方便起见,我们允许使用而不是“|” 指定多个主题。

也可以不使用–whitelist来列出你要镜像的内容,相反你可以使用–blacklist来列出要排除的内容,–blacklist也需要一个正则表达式参数。 但是,启用新的消费者时(即,在消费者配置中定义了bootstrap.servers时),不支持–blacklist

将镜像与配置auto.create.topics.enable = true相结合,即使添加了新主题,也可以拥有一个副本集群,该集群将自动创建和复制源集群中的所有数据。

5. 检查消费者的位置 

有时看到消费者的位置很有用。我们有一个工具可以显示消费者组中所有消费者的位置以及他们离日志的结尾有多远。 如果一个名为my-group的消费者组相关联的主题是my-topic,那么可以按以下命令运行此工具:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
my-topic                       0          2               4               2          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
my-topic                       1          2               3               1          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
my-topic                       2          2               3               1          consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2   /127.0.0.1                     consumer-2

这个工具也适用于基于Zookeeper的消费者:

> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group my-group 

Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).
 
TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID
my-topic                       0          2               4               2          my-group_consumer-1
my-topic                       1          2               3               1          my-group_consumer-1
my-topic                       2          2               3               1          my-group_consumer-2

6. 扩展集群

将服务器添加到Kafka集群很简单,只需为它们分配一个唯一的代理ID并在新服务器上启动Kafka即可。但是,这些新服务器不会自动分配任何数据分区,因此除非将分区移动到它们,否则在创建新主题之前它们不会执行任何工作。所以,通常在将计算机添加到集群时,需要将一些现有数据迁移到这些计算机上。

迁移数据的过程是手动启动的,但整个过程完全自动化。Kafka将新服务器作为正在迁移的分区的跟随者,并允许它完全复制该分区中的现有数据。当新服务器完全复制此分区的内容并加入同步副本列表时,其中一个现有副本将删除其分区的数据。

分区重新分配工具可用于跨代理移动分区。 理想的分区分布能确保在所有代理上都有均匀的数据负载和分区大小。分区重新分配工具无法自动研究Kafka群集中的数据分布并移动分区以实现均匀的负载分配,因此,管理员必须弄清楚应该移动哪些主题或分区。

分区重新分配工具可以以3种互斥模式运行:

–generate: 在此模式下,给定主题列表和代理列表,该工具会生成候选的重新分配方案,以将指定主题的所有分区移动到新代理。此选项仅提供了一种方便的方法,可在给定主题列表和目标代理的情况下生成分区重新分配计划。

–execute: 在此模式下,该工具将根据用户提供的重新分配计划启动分区的重新分配(使用–reassignment-json-file参数)。参数的值是可以是由管理员手动创建的自定义重新分配计划,也可以时使用–generate选项生成的计划。 

–verify: 在此模式下,该工具将验证最后一次–execute期间列出的所有分区的重新分配状态。状态可以是成功完成,失败或正在进行中。

7. 自动迁移数据

分区重新分配工具(bin/kafka-reassign-partitions.sh)可用于将一些主题从当前的代理移动到新添加的代理上。这在扩展现有集群时通常很有用,因为将整个主题移动到新的代理上更容易,而不是一次移动一个分区。当用于执行此操作时,用户应提供应移动到新的代理上的主题列表和作为目标的新代理列表,然后,该工具在新的代理中均匀分配给定主题列表的所有分区,在移动期间,主题的复制因子保持不变。通过这个过程,我们可以高效地把输入主题列表的所有分区的副本将从旧的代理移动到新添加的代理上。

例如,以下示例将主题foo1foo2的所有分区移动到新的代理5,6上。 在此移动结束时,主题foo1foo2的所有分区将仅存在于代理5,6上。

由于该工具接受包含主题列表的json文件作为输入参数,因此首先需要确定要移动的主题并创建json文件,如下所示:

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
"version":1
}

准备好json文件后,使用分区重新分配工具生成候选分配计划:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment
 
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
              {"topic":"foo1","partition":0,"replicas":[3,4]},
              {"topic":"foo2","partition":2,"replicas":[1,2]},
              {"topic":"foo2","partition":0,"replicas":[3,4]},
              {"topic":"foo1","partition":1,"replicas":[2,3]},
              {"topic":"foo2","partition":1,"replicas":[2,3]}]
}
 
Proposed partition reassignment configuration
 
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
              {"topic":"foo1","partition":0,"replicas":[5,6]},
              {"topic":"foo2","partition":2,"replicas":[5,6]},
              {"topic":"foo2","partition":0,"replicas":[5,6]},
              {"topic":"foo1","partition":1,"replicas":[5,6]},
              {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

该工具生成一个候选分配计划,将所有分区从主题foo1foo2移动到代理5,6。但请注意,此时分区移动尚未开始,它只是告诉你当前的分配和建议的新分配。应当保存当前分配,以防你想要回滚它。新的分配计划应保存在一个json文件(例如expand-cluster-reassignment.json)中,等使用–exceute选项执行重新分配时传入这个文件的路径作为参数,如下所示:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment
 
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
              {"topic":"foo1","partition":0,"replicas":[3,4]},
              {"topic":"foo2","partition":2,"replicas":[1,2]},
              {"topic":"foo2","partition":0,"replicas":[3,4]},
              {"topic":"foo1","partition":1,"replicas":[2,3]},
              {"topic":"foo2","partition":1,"replicas":[2,3]}]
}
 
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
              {"topic":"foo1","partition":0,"replicas":[5,6]},
              {"topic":"foo2","partition":2,"replicas":[5,6]},
              {"topic":"foo2","partition":0,"replicas":[5,6]},
              {"topic":"foo1","partition":1,"replicas":[5,6]},
              {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

最后,使用该工具和–verify选项来检查分区重新分配的状态。请注意, expand-cluster-reassignment.json(与–execute选项一起使用)也应该做为参数和–verify选项一起使用:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully

8. 退役代理

分区重新分配工具无法为退役代理自动生成重新分配计划,因此,管理员必须设计重新分配计划,以将退役代理上托管的所有分区的副本移动到其他代理。 这可能相对繁琐,因为重新分配需要确保所有副本不会从退役的代理只移动到一个其他的代理上。 为了使这一过程毫不费力,我们计划在未来为退役代理添加工具支持。

9. 数据中心

某些部署需要管理跨多个数据中心的数据管道。 我们推荐的方法是在每个数据中心部署一个本地Kafka集群,每个数据中心中的应用程序实例仅与其本地集群交互,并在集群之间进行镜像(有关如何执行此操作,请参阅镜像数据有关的文档)。

此部署模式允许数据中心充当独立实体,并允许我们集中管理和调整数据中心之间的复制。这样,即使数据中心之间的链路不可用,每个设施也可以独立运行:当发生这种情况时,镜像会滞后,直到链路恢复后,镜像才能同步。

也可以通过WAN读取或写入远程Kafka集群,但显然这会增加任何连接集群所需的延迟。

Kafka能在生产者和消费者中批量处理数据,因此即使在高延迟的连接上也可以实现高吞吐量。为了实现这一点,可能需要使用socket.send.buffer.bytessocket.receive.buffer.bytes配置来增加生产者、使用者和代理的TCP套接字缓冲区大小。正确的设置方法请参考这里

通常不建议在高延迟链路上运行跨越多个数据中心的单个Kafka群集,因为这将导致Kafka写入和ZooKeeper写入的复制延迟非常高,如果位置之间的网络不可用,KafkaZooKeeper在所有位置都不能使用。

Apache Kafka操作

发表评论

邮箱地址不会被公开。 必填项已用*标注

4 × = 十六

滚动到顶部