操作
Table of Contents
首先开始实现单节点单代理配置,然后将设置迁移到单节点多代理配置。
启动
在迁移到Kafka Cluster Setup之前,首先需要启动ZooKeeper,因为Kafka Cluster使用ZooKeeper
启动Zookpeer:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动Kafka Broker:
bin/kafka-server-start.sh config/server.properties
使用 jps 命令可以看到类似的结果:
821 QuorumPeerMain 928 Kafka 931 Jps
这说明有两个 JVM守护进程 在运行, QuorumPeerMain 对应于ZooKeeper守护进程,另一个是Kafka守护进程
单节点 - 单代理配置
在此配置中,有一个ZooKeeper和代理id实例
创建Kafka主题
Kafka提供了一个名为 kafka-topics.sh 的命令行实用程序,用于在服务器上创建主题:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
创建了一个名为 Hello-Kafka 的 主题 ,其中包含 一个分区 和 一个副本因子
创建主题后,可以在Kafka代理终端窗口中获取通知 并在config/server.properties文件中的“/ tmp / kafka-logs /"中指定的创建主题的日志
主题列表
要获取Kafka服务器中的主题列表,可以使用以下命令:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
由于只创建了一个主题,它将仅列出 Hello-Kafka 。 假设,如果已经创建多个主题,将在输出中获取所有的主题名称
启动生产者以发送消息
可以在 config/producer.properties 文件中指定默认生产者属性
生产者 命令行客户端 需要两个主要参数:
- 代理 列表:在这种情况下,只有一个代理
- config/server.properties 文件 包含代理端口ID
- 这个代理正在侦听端口 9092 ,因此可以 直接指定 它
- 主题 名称
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
生产者将 等待 来自 stdin的输入 并发布到Kafka集群
默认情况下,每一个新行都作为新消息发布
现在在终端中键入几行消息,如下所示:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka WARN property topic is not valid (kafka.utils.Verifia-bleProperties) Hello My first message
启动消费者以接收消息
与生产者类似,在 config/consumer.proper-ties 文件中指定了缺省使用者属性
打开一个新终端并键入:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 ―topic Hello-Kafka --from-beginning
输出:
Hello My first message
单节点多代理配置
在进入多个代理集群设置之前,首先启动ZooKeeper服务器
创建多个Kafka Brokers
原来的配置 server.properties 文件中已有一个Kafka代理实例
现在需要 多个 代理实例 :
- 将现有的 server.properties 文件 复制 到 两个新的配置文件 中
- 将其重命名为 server-one.properties 和 server-two.properties
- 编辑这两个新文件
server-one.properties 看起来如下:
# The id of the broker. This must be set to a unique integer for each broker. broker.id=1 # The port the socket server listens on port=9093 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-1
server-two.properties 类似:
# The id of the broker. This must be set to a unique integer for each broker. broker.id=2 # The port the socket server listens on port=9094 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-2
启动多个代理
在三台服务器上进行所有更改后,打开三个新终端,逐个启动每个代理:
# Broker1 $ bin/kafka-server-start.sh config/server.properties
#Broker2 $ bin/kafka-server-start.sh config/server-one.properties
#Broker3 $ bin/kafka-server-start.sh config/server-two.properties
现在就有了三个不同的代理在运行
创建主题
由于有三个不同的代理运行,所以将 主题的复制因子 指定为 3 个:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Multibrokerapplication
created topic “Multibrokerapplication"
describe 命令:检查哪个代理正在侦听当前创建的主题
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerapplication
Topic:Multibrokerapplication PartitionCount:1 ReplicationFactor:3 Configs: Topic:Multibrokerapplication Partition:0 Leader:0 Replicas:0,2,1 Isr:0,2,1
- 第一行给出 所有分区的摘要 ,显示 主题名称 , 分区数量 和已经选择的 复制因子 。
- 第二行中,每个节点将是 分区的随机选择部分的领导者
- 第一个broker(broker.id=0)是领导者
- Replicas:0,2,1意味着所有代理复制主题
- Isr 是 in-sync 副本的集合
启动生产者以发送消息
与单代理类似:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication This is single node-multi broker demo This is the second message
启动消费者以接收消息
与单代理类似:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 ―topic Multibrokerapplica-tion ―from-beginning This is single node-multi broker demo This is the second message
主题
修改
$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic Hello-kafka --parti-tions 2 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!
删除
$ bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka Topic Hello-kafka marked for deletion
注意:如果 delete.topic.enable 未设置为true,则此删除操作不会产生任何影响