kafka相關shell命令

啟動kafka(进程守护)
1
2
3
kafka-server-start.sh -daemon /opt/kafka_2.11-2.0.0/config/server.properties &

nohup bin/kafka-server-start.sh config/server.properties &
停止kafka
1
kafka-server-stop.sh  -daemon /opt/kafka_2.11-2.0.0/config/server.properties
1
kafka-roducer.sh --broker-list 192.168.106.199:9092 --topic dzy-test
启动kafka-eager
1
ke.sh start
查看topic列表
1
kafka-topics.sh --list --zookeeper localhost:2181
查看TOPIC描述信息
1
kafka-topics.sh --describe --zookeeper 192.168.106.199:2181
查看某一topic描述信息
1
kafka-topics.sh --describe --zookeeper 192.168.106.199:2181 --topic dzy-test
创建topic
1
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dzy-test
创建发送窗口
1
kafka-console-producer.sh --broker-list 192.168.66.194:9092 --topic dzy-test
创建接收窗口
1
kafka-console-consumer.sh --bootstrap-server 192.168.66.194:9092 --topic kafka_key_out --from-beginning
删除Topic
1
kafka-topics.sh --zookeeper 192.168.106.199:2181 --delete --topic my_topic_name
查看所有消费者
列表
1
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查询某个测点数据

1
sh kafka-console-consumer.sh --bootstrap-server microservice-dc-1:9092  --topic dis-excess-ampli-chang |grep PLC6.S7300.XNJBJ3_TIME_H

修改topic相关配置

1、cleanup.policy:过期或达到日志上限的清理策略(delete-删除;compact-压缩)默认值为delete

1
bin/kafka-topics.sh   --alter --zookeeper cdh-worker-dc-1:2181,cdh-worker-dc-2:2181,cdh-worker-dc-3:2181 --topic test_topic --config cleanup.policy=delete

2、compression.type:指定给该topic最终的压缩类型(uncompressed;snappy;lz4;gzip;producer)默认值为producer

1
bin/kafka-topics.sh   --alter --zookeeper cdh-worker-dc-1:2181,cdh-worker-dc-2:2181,cdh-worker-dc-3:2181/kafka --topic test_dis-cumulative-amount-gzip --config compression.type=gzip

3、delete.retention.ms:压缩日志保留的最长时间,也是消费端消息的最长时间。单位为毫秒(默认值:86400000)

1
bin/kafka-topics.sh   --alter --zookeeper 192.168.153.128:2181 --topic test_topic --config delete.retention.ms=3600000

4、file.delete.delay.ms:从文件系统中删除前所等待的时间(默认值:60000)

1
bin/kafka-topics.sh   --alter --zookeeper 192.168.153.128:2181 --topic test_topic --config file.delete.delay.ms=3600000

5、flush.messages:在消息刷到磁盘前,日志分区收集的消息数(默认值:MAX_VALUE)

1
bin/kafka-topics.sh   --alter --zookeeper 192.168.153.128:2181 --topic test_topic --config flush.messages=MAX_VALUE

6、flush.ms:消息刷到磁盘前,保存在内存中的最长时间,单位ms(默认值:MAX_VALUE)

1
bin/kafka-topics.sh   --alter --zookeeper 192.168.153.128:2181 --topic test_topic --config flush.ms=3600000

7、index.interval.bytes:当执行fetch操作后,需要一定的空间来扫描最近的offset大小。设置越大,扫描速度更快但更耗内存,一般情况下不用设置此参数。(默认值:4096)

1
bin/kafka-topics.sh   --alter --zookeeper 192.168.153.128:2181 --topic test_topic --config index.interval.bytes=3600000

8、max.message.bytes:log中能够容纳消息的最大字节数(默认值:1000012)

1
bin/kafka-topics.sh   --alter --zookeeper 192.168.153.128:2181 --topic test_topic --config max.message.bytes=1000012

9、min.cleanable.dirty.ratio:日志清理的频率控制,越大清理更高效(默认值:0.5)

1
bin/kafka-topics.sh   --alter --zookeeper cdh-worker-dc-1:2181,cdh-worker-dc-2:2181,cdh-worker-dc-3:2181 --topic test_dis-upper-lower-limits-gzip --config min.cleanable.dirty.ratio=0.5

10、retention.bytes:topic每个分区的最大文件大小。(默认值:-1没有限制)

1
bin/kafka-topics.sh   --alter --zookeeper 192.168.153.128:2181 --topic test_topic --config retention.bytes=-1

注:这个topic的数据大小=每个分区下的数据大小

11、retention.ms:日志文件保留的分钟数。数据存储的最大时间超过这个时间会根据cleanup.policy策略处理数据。

1
bin/kafka-topics.sh   --alter --zookeeper 192.168.153.128:2181 --topic test_topic --config retention.ms=3600000

12、segment.bytes:topic的文件是以segment文件存储的,该参数控制每个segment文件的大小(默认值:1073741824)

1
bin/kafka-topics.sh   --alter --zookeeper 192.168.153.128:2181 --topic test_topic --config segment.bytes=1073741824

13、segment.index.bytes:对于segment日志的索引文件大小限制(默认值:10M)

1
bin/kafka-topics.sh   --alter --zookeeper 192.168.153.128:2181 --topic test_topic --config segment.index.bytes=3600000

修改相关配置

1
2
3
4
5
6
7
8
9
10
11
可在创建topic时,通过--config进行指定项的参数配置,覆盖默认配置:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test_dis-upper-lower-limits-lz4 --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config retention.ms=3600000

也可以在创建topic之后通过config.sh文件对其中的特定指标进行修改,下面操作对my-topic相关指标进行配置:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=128000

查看是否修改成功:
> bin/kafka-configs.sh --zookeeper cdh-worker-dc-1:2181,cdh-worker-dc-2:2181,cdh-worker-dc-3:2181/kafka --entity-type topics --entity-name test_dis-cumulative-amount-gzip --describe

也可以撤销/删除某些指定配置,将该项重置为默认配置:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2018-2020 丁振莹
  • 访问人数: | 浏览次数:

你的每一分支持,是我努力下去的最大的力量 ٩(๑❛ᴗ❛๑)۶

支付宝
微信