IT人生

  • 首页
  • 归档
  • kafka
  • Java
  • Spring
  • Golang
  • SQL
  • Spark
  • ElasticSearch
  • 关于

  • 搜索
Phoenix HBase Kudu ElasticSearch Spring 数据结构 操作系统 Kettle Azkaban Sqoop Hive Yarn Redis Mybatis Impala Cloudera 大数据 HDFS mycat shell Linux 架构 并发 mysql sql golang java 工具 spark kafka 人生

Apache Kafka系列(二) 命令行工具(CLI)

发表于 2018-12-03 | 分类于 kafka | 0 | 阅读次数 4428

Apache Kafka命令行工具(Command Line Interface,CLI),下文简称CLI。

1. 启动Kafka

  启动Kafka需要两步:

1.1. 启动ZooKeeper 

[root@Server1 kafka_2.12-0.11.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties

1.2. 启动Kafka Server

[root@Server1 kafka_2.12-0.11.0.0]# bin/kafka-server-start.sh config/server.properties 

2. 列出Topic

[root@Server1 kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
HelloWorld

3. 创建Topic

[root@Server1 kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Demo1
Created topic "Demo1".

  上述命令会创建一个名为Demo1的Topic,并指定了replication-factor和partitions分别为1。其中replication-factor控制一个Message会被写到多少台服务器上,因此这个值必须小于或者等于Broker的数量。

4. 描述Topic

[root@Server1 kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Demo1
Topic:Demo1     PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: Demo1    Partition: 0    Leader: 0       Replicas: 0     Isr: 0

5. 发布消息到指定的Topic

[root@Server1 kafka_2.12-0.11.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Demo1
>this
>is 
>the 
>firest
>input

可以在控制台逐行输入任意消息。命令的终止符是:control + C组合键。

6. 消费指定Topic上的消息

[root@Server1 kafka_2.12-0.11.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic Demo1
this
is 
the 
firest
input

7. 修改Topic

  7.1 增加指定Topic的partition,在第3步中创建的Demo1的partition是1。如下命令将增加10个partition

[root@Server1 kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 11 --topic Demo1
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

7.2. 删除指定Topic

[root@Server1 kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic Demo1
Topic Demo1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Note中指出该Topic并没有真正的删除,如果真删除,需要把server.properties中的delete.topic.enable置为true

7.3 给指定的Topic增加配置项,如给一个增加max message size值为128000

[root@Server1 kafka_2.12-0.11.0.0]# bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic Demo1 --config max.message.bytes=128000
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "Demo1".

warning中指出该命令已经过期,将来可能被删除,替代的命令是使用kafka-config.sh。新命令如下:

[root@Server1 kafka_2.12-0.11.0.0]# bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-type topics --entity-name Demo1 --add-config max.message.bytes=12800
Completed Updating config for entity: topic 'Demo1'.

需要使用entity-type置为topics,并在entity-name中指定对应的名称

7.4 reset指定consumer的topic offset

bash-4.4# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mine_message --topic mine_account_reg -execute --reset-offsets --to-offset 37 
Error: Assignments can only be reset if the group 'mine_message' is inactive, but the current state is Stable.

TOPIC                          PARTITION  NEW-OFFSET     
bash-4.4# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mine_message --topic mine_account_reg -execute --reset-offsets --to-offset 37

TOPIC                          PARTITION  NEW-OFFSET     
mine_account_reg               0          37     

8. 结论

  本文展示了CLI所提供的一些常用的命令,这些基本的命令在运维Kafka过程中很实用。

  • 本文作者: Randy
  • 本文链接: http://www.itrensheng.com/archives/apache-kafka-cli
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# Phoenix # HBase # Kudu # ElasticSearch # Spring # 数据结构 # 操作系统 # Kettle # Azkaban # Sqoop # Hive # Yarn # Redis # Mybatis # Impala # Cloudera # 大数据 # HDFS # mycat # shell # Linux # 架构 # 并发 # mysql # sql # golang # java # 工具 # spark # kafka # 人生
Apache Kafka系列(一) 起步
Apache Kafka系列(三) Java API使用
  • 文章目录
  • 站点概览
Randy

Randy

技术可以暂时落后,但任何时候都要有上进的信念

80 日志
27 分类
31 标签
RSS
Github E-mail
Creative Commons
© 2021 备案号:沪ICP备19020689号-1
Randy的个人网站