内容
1. 准备工作
为了获得更好的Kafka服务可靠性和高可用性,我们需要在集群模式下设置Kafka。请按以下步骤先准备好环境:
-
从Apache Kafka的网站下载Kafka,然后,解压缩zip文件。
-
假如解压后的文件夹是kafka_2.11-1.1.0,跳转到kafka_2.11-1.1.0文件夹。
2. 搭建Kafka集群
按以下步骤搭建集群:
1. 创建一个名为“logs_1”的文件夹。这个文件夹将存储所有Kafka代理1的日志。
2. 进入config目录,打开server.properties文件,这是包含Kafka代理配置的文件。
3. 将broker.id设置为1.确保它是Kafka群集中代理的ID,因此对于每个代理,它必须是唯一的。
4. 激活关于listener的配置,并将其设置为PLAINTEXT:// localhost:9092。这个设置的意思是, Kafka代理将监听9092端口来处理连接请求。
5. 把log.dirs的值设置为我们在步骤1中创建的logs文件夹路径。
6. 在zookeeper.connect配置中设置Apache Zookeeper地址。需要注意的是,如果Zookeeper在Kafka群集中运行,请确保将Zookeeper地址指定为以逗号分隔的列表,例如:localhost:2181,localhost:2182。
基本上,这些是我们需要为开发环境设置的一些常规配置。
通过这种方式,我们的第一个Kafka代理配置就绪。现在,我们来设置另外两个代理的配置:
-
首先把server.properties复制两份,分别叫做server-1.properties和server-2.properties
-
创建两个日志文件夹“logs_2”和“logs_3”
-
按照上面的相同的步骤对server-1.properties和server-2.properties进行以下更改。
-
在步骤3中将broker.id分别改为2和3。
-
在步骤4把端口号分别改为9093和9094。注意:可以使用任何可用的端口号。
-
在步骤5中把log.dirs设置成对应的logs_2和logs_3文件夹。
至此,我们对所有代理的配置已准备就绪。现在,我们来启动Kafka集群服务。
3. 启动Kafka集群
1. 进入到Kafka文件夹的主目录kafka_2.11-1.1.0,首先启动Zookeeper:
./bin/zookeeper-server-start.sh config/zookeeper.properties
2. 然后启动Kafka服务器:
./bin/kafka-server-start.sh config/server.properties ./bin/kafka-server-start.sh config/server-1.properties & ./bin/kafka-server-start.sh config/server-2.properties &
3. 然后运行以下命令创建一个主题:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
在这条命令里,创建了1个分区,每个分区的复制因子为3。复制因子为3,意味着对于每个分区,都有一个领导者(Leader)和两个追随者(Follower)。消息或记录被发送给领导者时,它也被复制到追随者中。
4. 运行以下命令可以帮助我们知道每个分区的领导者和追随者是谁:
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
输出如下:
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
我们来详细了解一下输出的内容:第一行给出了所有分区的摘要,之后每一行列出了一个分区的信息。由于我们的主题只有一个分区,因此只有一行。
-
“leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的一部分分区的领导者。
-
“replicas”是复制此分区日志的节点列表,不管它们是否为领导者,或者当前是否处于活动状态。
-
“isr”是“已同步”副本的集合。
请注意,在这个示例中,节点(代理)1是该主题的唯一分区的领导者。
5. 运行以下命令可以列出所有的主题:
./bin/kafka-topics.sh --list --zookeeper localhost:2181
6. 向主题中发布消息:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ... my test message 1 my test message 2 ^C
7. 通过消费者读取消息:
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 ^C
8. 现在我们测试一下容错。代理1是领导者,那么我们把它停掉:
> ps aux | grep server-1.properties 7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java... > kill -9 7564
领导者已经切换成原来追随者中的一个,代理1已经不再同步副本(ISR)中:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
但是即使最初接受写入的领导者已经停机,这些消息仍可供消费:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ... my test message 1 my test message 2 ^C