Kafka 基础配置与使用完全指南

Kafka 基础配置与使用完全指南

Kafka 是由 LinkedIn 开源的分布式消息系统,被广泛应用于大数据领域。本指南将详细介绍 Kafka 的配置、使用和最佳实践,帮助你快速掌握这一强大的消息队列工具。

一、Kafka 核心概念

Kafka 的核心概念包括 Producer、Consumer、Broker、Topic、Partition 和 Consumer Group。

1. Producer(生产者)

负责向 Kafka 集群发送消息的应用程序。

2. Consumer(消费者)

从 Kafka 集群订阅并消费消息的应用程序。

3. Broker(代理)

Kafka 集群中的服务器节点,负责存储和管理消息。

4. Topic(主题)

消息的逻辑分类,类似数据库中的表。

5. Partition(分区)

Topic 的物理拆分,用于水平扩展。

6. Consumer Group(消费者组)

一组协同工作的消费者,共同消费一个 Topic 的消息。

Kafka 架构

二、Kafka 安装配置

1. 环境要求

  • Java 8 或更高版本
  • Linux/Unix 系统
  • 足够的磁盘空间

2. 下载 Kafka

cd /opt\nwget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz\ntar -xzf kafka_2.13-3.7.0.tgz\nmv kafka_2.13-3.7.0 kafka\ncd kafka

3. 配置 server.properties

# 服务器 ID\nbroker.id=0\n\n# 日志目录\nlog.dirs=/opt/kafka/logs\n\n# 默认副本数\nnum.partitions=3\n\n# 默认保留时间\nlog.retention.hours=168\n\n# 监听地址\nlisteners=PLAINTEXT://0.0.0.0:9092\n\n# advertised 地址\nadvertised.listeners=PLAINTEXT://localhost:9092\n\n# 副本因子\ndefault.replication.factor=1\n\n# 最小同步副本数\nmin.insync.replicas=1\n\n# ZooKeeper 连接\nzookeeper.connect=localhost:2181\n\n# 会话超时\nzookeeper.session.timeout.ms=18000

4. 启动 Kafka

# 先启动 ZooKeeper\nbin/zookeeper-server-start.sh config/zookeeper.properties\n\n# 再启动 Kafka\nbin/kafka-server-start.sh config/server.properties

三、Kafka 命令行操作

1. 创建 Topic

# 创建 Topic,3 个分区,副本数 1\nbin/kafka-topics.sh --create --topic test-topic \\n  --bootstrap-server localhost:9092 \\n  --partitions 3 --replication-factor 1\n\n# 查看 Topic 详情\nbin/kafka-topics.sh --describe --topic test-topic \\n  --bootstrap-server localhost:9092

2. 发送消息

# 开启一个 producer 终端\ncat > messages.txt << EOF\nMessage 1\nMessage 2\nMessage 3\nEOF\n\n# 发送消息到 Topic\nbin/kafka-console-producer.sh --topic test-topic \\n  --bootstrap-server localhost:9092 < messages.txt

3. 消费消息

# 消费最新位置开始\nbin/kafka-console-consumer.sh --topic test-topic \\n  --bootstrap-server localhost:9092 --from-beginning

4. 查看消息偏移量

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \\n  --describe --group my-group

5. 删除 Topic

# 启用 topic 删除功能后执行\ncat >> config/server.properties << EOF\ndelete.topic.enable=true\nEOF\n\nbin/kafka-topics.sh --delete --topic test-topic \\n  --bootstrap-server localhost:9092

四、Kafka Java 客户端

1. Maven 依赖

<dependency>\n    <groupId>org.apache.kafka</groupId>\n    <artifactId>kafka-clients</artifactId>\n    <version>3.7.0</version>\n</dependency>

2. Producer 配置

Properties props = new Properties();\nprops.put("bootstrap.servers", "localhost:9092");\nprops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");\nprops.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");\nprops.put("acks", "all");  // 确认级别\nprops.put("retries", Integer.MAX_VALUE);\nprops.put("batch.size", 16384);  // 批量大小\nprops.put("buffer.memory", 33554432);  // 缓冲内存

3. Producer 示例

public class KafkaProducerDemo {\n    public static void main(String[] args) {\n        Properties props = new Properties();\n        props.put("bootstrap.servers", "localhost:9092");\n        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");\n        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");\n        \n        Producer producer = new KafkaProducer(props);\n        \n        for (int i = 0; i < 10; i++) {\n            ProducerRecord record = \n                new ProducerRecord("test-topic", "key-" + i, "message-" + i);\n            \n            producer.send(record, (metadata, exception) -> {\n                if (exception == null) {\n                    System.out.println("发送成功:partition=" + \n                        metadata.partition() + ", offset=" + metadata.offset());\n                } else {\n                    exception.printStackTrace();\n                }\n            });\n        }\n        \n        producer.close();\n    }\n}

4. Consumer 配置

Properties props = new Properties();\nprops.put("bootstrap.servers", "localhost:9092");\nprops.put("group.id", "my-group");\nprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");\nprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");\nprops.put("auto.offset.reset", "earliest");  // 从头开始消费\nprops.put("enable.auto.commit", "true");\nprops.put("auto.commit.interval.ms", "1000");

5. Consumer 示例

public class KafkaConsumerDemo {\n    public static void main(String[] args) {\n        Properties props = new Properties();\n        props.put("bootstrap.servers", "localhost:9092");\n        props.put("group.id", "my-group");\n        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");\n        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");\n        props.put("auto.offset.reset", "earliest");\n        props.put("enable.auto.commit", "true");\n        props.put("auto.commit.interval.ms", "1000");\n        \n        KafkaConsumer consumer = new KafkaConsumer(props);\n        consumer.subscribe(Collections.singletonList("test-topic"));\n        \n        try {\n            while (true) {\n                ConsumerRecords records = consumer.poll(Duration.ofMillis(100));\n                for (ConsumerRecord record : records) {\n                    System.out.printf("offset=%d, key=%s, value=%s, partition=%d%n",\n                        record.offset(), record.key(), record.value(), record.partition());\n                }\n            }\n        } finally {\n            consumer.close();\n        }\n    }\n}

五、高级特性

1. 分区策略

Kafka 的分区策略决定了消息发送到哪个分区。

// 默认策略:基于 key 的 hash\n// 如果没有 key,则轮询分发\n\n// 自定义分区器\nclass CustomPartitioner implements Partitioner {\n    @Override\n    public int partition(String topic, Object key, byte[] keyBytes, \n                         Object value, byte[] valueBytes, Cluster cluster) {\n        // 自定义分区逻辑\n        List partitions = cluster.partitionsForTopic(topic);\n        return partitions.get(0).partition();\n    }\n}

2. 消费者组

消费者组实现消息的负载均衡。

Properties props = new Properties();\nprops.put("bootstrap.servers", "localhost:9092");\nprops.put("group.id", "my-consumer-group");\n// 同一组内的消费者不会重复消费同一消息

3. 偏移量管理

// 手动提交偏移量\nconsumer.commitSync();  // 同步提交\nconsumer.commitAsync(); // 异步提交\n\n// 提交指定偏移量\nMap offsets = new HashMap();\noffsets.put(new TopicPartition("test-topic", 0), new OffsetAndMetadata(100));\nconsumer.commitSync(offsets);

4. 重试和容错

// Producer 重试配置\nprops.put("retries", Integer.MAX_VALUE);\nprops.put("retry.backoff.ms", 100);\n\n// Consumer 重试配置\nprops.put("max.poll.records", 500);\nprops.put("max.poll.interval.ms", 300000);\nprops.put("session.timeout.ms", 30000);

六、监控和管理

1. 查看 Broker 状态

bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092

2. 查看 Topic 列表

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

3. 查看消费者组

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

4. 重置消费者偏移量

# 重置到最早\nbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \\n  --group my-group --topic test-topic --reset-offsets --to-earliest --execute\n\n# 重置到最新\nbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \\n  --group my-group --topic test-topic --reset-offsets --to-latest --execute\n\n# 重置到特定时间\nbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \\n  --group my-group --topic test-topic --reset-offsets --to-datetime 2026-04-24T00:00:00 --execute

5. 监控指标

指标 说明
BytesInPerSec 每秒流入的消息字节数
BytesOutPerSec 每秒流出的消息字节数
MsgRateIn 每秒流入的消息数
MsgRateOut 每秒流出的消息数
UnderReplicatedPartitions 未同步的分区数

七、性能优化

1. Producer 优化

props.put("batch.size", 32768);           // 批量大小\nprops.put("linger.ms", 5);                // 等待时间\nprops.put("buffer.memory", 67108864);     // 缓冲内存\nprops.put("compression.type", "lz4");     // 压缩类型

2. Consumer 优化

props.put("max.poll.records", 1000);      // 最大拉取条数\nprops.put("fetch.min.bytes", 1048576);     // 最小拉取字节数\nprops.put("fetch.max.wait.ms", 500);       // 最大等待时间

3. 服务器优化

# 增加缓冲区\nsocket.send.buffer.bytes=102400\nsocket.receive.buffer.bytes=102400\n\n# 日志配置\nlog.segment.bytes=1073741824\nlog.retention.check.interval.ms=300000\n\n# 线程池\nnum.io.threads=8\nnum.network.threads=3

八、最佳实践

1. Topic 命名规范

业务线_模块_类型\n例如:order_service_orders_create

2. 分区数规划

  • 根据并发消费能力决定
  • 建议设置为 CPU 核数的 2-3 倍
  • 不要频繁调整分区数

3. 消息大小限制

# 消息最大大小\nmessage.max.bytes=1048576\n\n# Producer 最大批次\nmax.request.size=1048576

4. 数据安全

  • 启用 SSL 加密传输
  • 配置 SASL 认证
  • 设置访问权限控制

5. 集群部署

# 至少 3 个 Broker 以保证高可用\ndefault.replication.factor=3\nmin.insync.replicas=2

九、常见问题

1. 消息丢失

可能原因:

  • Producer acks=0
  • Consumer 提交偏移量过早
  • Broker 磁盘故障

解决方案:

  • Producer 设置 acks=all
  • Consumer 手动提交偏移量
  • 设置合适的副本数

2. 消息重复

可能原因:

  • Consumer 提交偏移量失败
  • Producer 重试导致

解决方案:

  • 实现幂等性 Consumer
  • 开启 Producer 幂等

3. 消费延迟

可能原因:

  • Consumer 处理能力不足
  • 网络延迟
  • Broker 性能瓶颈

解决方案:

  • 增加 Consumer 实例
  • 优化消费逻辑
  • 升级硬件

十、总结

Kafka 是一个高性能、可扩展的分布式消息系统。通过合理的配置和优化,可以满足各种业务场景的需求。掌握 Kafka 的核心概念、配置方法和最佳实践,能够帮助构建稳定高效的消息驱动系统。

#Kafka #消息队列 #分布式系统 #大数据 #编程教程

标签

发表评论