Apache Kafka集群

1. 准备工作

为了获得更好的Kafka服务可靠性和高可用性,我们需要在集群模式下设置Kafka。请按以下步骤先准备好环境:

  • 从Apache Kafka的网站下载Kafka,然后,解压缩zip文件。

  • 假如解压后的文件夹是kafka_2.11-1.1.0,跳转到kafka_2.11-1.1.0文件夹。

2. 搭建Kafka集群

按以下步骤搭建集群:

     1. 创建一个名为“logs_1”的文件夹。这个文件夹将存储所有Kafka代理1的日志。

     2. 进入config目录,打开server.properties文件,这是包含Kafka代理配置的文件。

     3. 将broker.id设置为1.确保它是Kafka群集中代理的ID,因此对于每个代理,它必须是唯一的。

     4. 激活关于listener的配置,并将其设置为PLAINTEXT:// localhost:9092。这个设置的意思是, Kafka代理将监听9092端口来处理连接请求。

     5. 把log.dirs的值设置为我们在步骤1中创建的logs文件夹路径。

     6. 在zookeeper.connect配置中设置Apache Zookeeper地址。需要注意的是,如果Zookeeper在Kafka群集中运行,请确保将Zookeeper地址指定为以逗号分隔的列表,例如:localhost:2181,localhost:2182。

基本上,这些是我们需要为开发环境设置的一些常规配置。

通过这种方式,我们的第一个Kafka代理配置就绪。现在,我们来设置另外两个代理的配置:

  • 首先把server.properties复制两份,分别叫做server-1.properties和server-2.properties

  • 创建两个日志文件夹“logs_2”和“logs_3”

  • 按照上面的相同的步骤对server-1.properties和server-2.properties进行以下更改。

  • 在步骤3中将broker.id分别改为2和3。

  • 在步骤4把端口号分别改为9093和9094。注意:可以使用任何可用的端口号。

  • 在步骤5中把log.dirs设置成对应的logs_2和logs_3文件夹。

至此,我们对所有代理的配置已准备就绪。现在,我们来启动Kafka集群服务。

3. 启动Kafka集群

1. 进入到Kafka文件夹的主目录kafka_2.11-1.1.0,首先启动Zookeeper:

 ./bin/zookeeper-server-start.sh config/zookeeper.properties

2. 然后启动Kafka服务器:

 ./bin/kafka-server-start.sh config/server.properties
 ./bin/kafka-server-start.sh config/server-1.properties &
 ./bin/kafka-server-start.sh config/server-2.properties &

3. 然后运行以下命令创建一个主题:

 ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

     在这条命令里,创建了1个分区,每个分区的复制因子为3。复制因子为3,意味着对于每个分区,都有一个领导者(Leader)和两个追随者(Follower)。消息或记录被发送给领导者时,它也被复制到追随者中。

4. 运行以下命令可以帮助我们知道每个分区的领导者和追随者是谁:

 ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

     输出如下:

 Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
         Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

     我们来详细了解一下输出的内容:第一行给出了所有分区的摘要,之后每一行列出了一个分区的信息。由于我们的主题只有一个分区,因此只有一行。

  • “leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的一部分分区的领导者。

  • “replicas”是复制此分区日志的节点列表,不管它们是否为领导者,或者当前是否处于活动状态。

  • “isr”是“已同步”副本的集合。

     请注意,在这个示例中,节点(代理)1是该主题的唯一分区的领导者。

5. 运行以下命令可以列出所有的主题:

 ./bin/kafka-topics.sh --list --zookeeper localhost:2181

6. 向主题中发布消息:

 > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
 ...
 my test message 1
 my test message 2
 ^C

7. 通过消费者读取消息:

 > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
 ...
 my test message 1
 my test message 2
 ^C

8. 现在我们测试一下容错。代理1是领导者,那么我们把它停掉:

 > ps aux | grep server-1.properties
 7564 ttys002    0:15.91         
 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
 > kill -9 7564

     领导者已经切换成原来追随者中的一个,代理1已经不再同步副本(ISR)中:

 > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
      Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
           Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

     但是即使最初接受写入的领导者已经停机,这些消息仍可供消费:

 > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
 ...
 my test message 1
 my test message 2
 ^C

Apache Kafka集群

发表评论

邮箱地址不会被公开。 必填项已用*标注

七十 一 − = 65

滚动到顶部