1. 脚本概览
bin目录下的脚本作用
脚本 | 作用 |
---|---|
connect-distributed.sh | 用于启动多节点的Distributed模式的Kafka Connect组件 |
connect-standalone.sh | 用于启动单节点的Standalone模式的Kafka Connect组件 |
kafka-acls.sh | 用于设置Kafka权限,比如设置哪些用户可以访问Kafka的哪些TOPIC的权限 |
kafka-broker-api-versions.sh | 主要用于验证不同Kafka版本之间服务器和客户端的适配性 |
kafka-configs.sh | 配置管理脚本 |
kafka-console-consumer.sh | kafka消费者控制台 |
kafka-console-producer.sh | kafka生产者控制台 |
kafka-consumer-groups.sh | kafka消费者组相关信息 |
kafka-consumer-perf-test.sh | kafka消费者性能测试脚本 |
kafka-delegation-tokens.sh | 用于管理Delegation Token。基于Delegation Token的认证是一种轻量级的认证机制,是对SASL认证机制的补充。 |
kafka-delete-records.sh | 用于删除Kafka的分区消息 |
kafka-dump-log.sh | 用于查看Kafka消息文件的内容 |
kafka-log-dirs.sh | 用于查询各个Broker上的各个日志路径的磁盘占用情况 |
kafka-mirror-maker.sh | 用于在Kafka集群间实现数据镜像 |
kafka-preferred-replica-election.sh | 用于执行Preferred Leader选举,可以为指定的主题执行更换Leader的操作 |
kafka-producer-perf-test.sh | kafka生产者性能测试脚本 |
kafka-reassign-partitions.sh | 用于执行分区副本迁移以及副本文件路径迁移。 |
kafka-replica-verification.sh | 复制进度验证脚本 |
kafka-run-class.sh | 用于执行任何带main方法的Kafka类 |
kafka-server-start.sh | 启动kafka服务 |
kafka-server-stop.sh | 停止kafka服务 |
kafka-streams-application-reset.sh | 用于给Kafka Streams应用程序重设位移,以便重新消费数据 |
kafka-topics.sh | topic管理脚本 |
kafka-verifiable-consumer.sh | 可检验的kafka消费者 |
kafka-verifiable-producer.sh | 可检验的kafka生产者 |
trogdor.sh | Kafka的测试框架,用于执行各种基准测试和负载测试 |
zookeeper-server-start.sh | 启动zk服务 |
zookeeper-server-stop.sh | 停止zk服务 |
zookeeper-shell.sh | zk客户端 |
2. Broker服务
1、启动服务
1 | ./kafka-server-start.sh ../config/server.properties |
2、后台启动
1 | ./kafka-server-start.sh -daemon ../config/server.properties |
3、停止服务
1 | ./kafka-server-stop.sh ../config/server.properties |
3. 元数据
1、创建topic
1 | ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-test-topic |
2、查看所有topic
1 | ./kafka-topics.sh --bootstrap-server localhost:9092 --list |
从Kafka 2.2版本开始,Kafka社区推荐用–bootstrap-server参数替换–zookeeper参数用于指定Kafka Broker。集群的多个IP端口用逗号,隔开
3、查看topic详细信息
1 | kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic_name |
4、给topic增加分区(只能增加不能减少)
1 | ./kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-test-topic --partitions 10 |
5、删除topic(标记删除)
1 | ./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-test-topic |
6、永久删除需要修改配置文件:
1 | true = |
7、强制删除TOPIC的方法:
- 手动删除ZooKeeper节点/admin/delete_topics下以待删除TOPIC为名的znode。
- 手动删除TOPIC在磁盘上的分区目录。
- 在ZooKeeper中执行rmr /controller,触发Controller重选举,刷新Controller缓存。可能会造成大面积的分区Leader重选举。可以不执行,只是Controller缓存中没有清空待删除TOPIC,不影响使用。
8、修改partition副本数:
先配置一个reassign.json文件,内容:
例如my-test-topic有3个分区,原来只有一个副本,增加到2个副本
1 | {"version":1, "partitions":[ |
执行kafka-reassign-partitions脚本:
1 | ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassign.json --execute |
4. 生产者
1、发送消息
1 | ./kafka-console-producer.sh --broker-list localhost:9092 --topic my-test-topic |
5. 消费者
1、消费消息
1 | ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-test-topic |
2、查看消费者组提交的位移数据:
1 | ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning |