UP | HOME

操作

Table of Contents

  首先开始实现单节点单代理配置,然后将设置迁移到单节点多代理配置。

启动

    在迁移到Kafka Cluster Setup之前,首先需要启动ZooKeeper,因为Kafka Cluster使用ZooKeeper

启动Zookpeer:

bin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafka Broker:

bin/kafka-server-start.sh config/server.properties

使用 jps 命令可以看到类似的结果:

821 QuorumPeerMain
928 Kafka
931 Jps

这说明有两个 JVM守护进程 在运行, QuorumPeerMain 对应于ZooKeeper守护进程,另一个是Kafka守护进程

单节点 - 单代理配置

    在此配置中,有一个ZooKeeper和代理id实例

创建Kafka主题

Kafka提供了一个名为 kafka-topics.sh 的命令行实用程序,用于在服务器上创建主题:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1  --partitions 1 --topic Hello-Kafka

创建了一个名为 Hello-Kafka主题 ,其中包含 一个分区一个副本因子

     创建主题后,可以在Kafka代理终端窗口中获取通知
     并在config/server.properties文件中的“/ tmp / kafka-logs /"中指定的创建主题的日志

主题列表

要获取Kafka服务器中的主题列表,可以使用以下命令:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

由于只创建了一个主题,它将仅列出 Hello-Kafka 。 假设,如果已经创建多个主题,将在输出中获取所有的主题名称

启动生产者以发送消息

   可以在 config/producer.properties 文件中指定默认生产者属性

生产者 命令行客户端 需要两个主要参数:

  1. 代理 列表:在这种情况下,只有一个代理
    • config/server.properties 文件 包含代理端口ID
    • 这个代理正在侦听端口 9092 ,因此可以 直接指定
  2. 主题 名称
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

生产者将 等待 来自 stdin的输入 并发布到Kafka集群

     默认情况下,每一个新行都作为新消息发布

现在在终端中键入几行消息,如下所示:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message

启动消费者以接收消息

     与生产者类似,在 config/consumer.proper-ties 文件中指定了缺省使用者属性

打开一个新终端并键入:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 ―topic Hello-Kafka --from-beginning

输出:

Hello
My first message

单节点多代理配置

    在进入多个代理集群设置之前,首先启动ZooKeeper服务器

创建多个Kafka Brokers

     原来的配置 server.properties 文件中已有一个Kafka代理实例

现在需要 多个 代理实例

  1. 将现有的 server.properties 文件 复制两个新的配置文件
  2. 将其重命名为 server-one.propertiesserver-two.properties
  3. 编辑这两个新文件

server-one.properties 看起来如下:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

server-two.properties 类似:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

启动多个代理

在三台服务器上进行所有更改后,打开三个新终端,逐个启动每个代理:

# Broker1
$ bin/kafka-server-start.sh config/server.properties
#Broker2
$ bin/kafka-server-start.sh config/server-one.properties
#Broker3
$ bin/kafka-server-start.sh config/server-two.properties

现在就有了三个不同的代理在运行

创建主题

由于有三个不同的代理运行,所以将 主题的复制因子 指定为 3 个:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Multibrokerapplication
created topic “Multibrokerapplication"

describe 命令:检查哪个代理正在侦听当前创建的主题

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerapplication
Topic:Multibrokerapplication  PartitionCount:1 
ReplicationFactor:3 Configs:

Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1
  • 第一行给出 所有分区的摘要 ,显示 主题名称分区数量 和已经选择的 复制因子
  • 第二行中,每个节点将是 分区的随机选择部分的领导者
    • 第一个broker(broker.id=0)是领导者
    • Replicas:0,2,1意味着所有代理复制主题
    • Isr 是 in-sync 副本的集合

启动生产者以发送消息

与单代理类似:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication

This is single node-multi broker demo
This is the second message

启动消费者以接收消息

与单代理类似:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 ―topic Multibrokerapplica-tion ―from-beginning

This is single node-multi broker demo
This is the second message

主题

修改

$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic Hello-kafka --parti-tions 2

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

删除

$ bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Topic Hello-kafka marked for deletion
     注意:如果 delete.topic.enable 未设置为true,则此删除操作不会产生任何影响

Next:实例

Previous: 流程

Home: 目录