先看一个简单的KafkaConsumer例子:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test_group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("test_topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } }
我们看到0.9的consumer最大的变化是:
- 通过consumer.subscribe(Arrays.asList("test_topic"))来声明要订阅的topic,而之前的版本是用Whitelist声明。
- 通过consumer.poll(100)直接抓取消息,而之前需要遍历KafkaStream的迭代器。(这个比之前方便太多了。。。)。
- MessageAndMetadata变成了ConsumerRecords
enable.auto.commit表示已一个固定的时间间隔自动提交offsets,时间间隔由auto.commit.interval.ms控制。
bootstrap.servers表示kafka集群的broker列表。客户端会连接到这个列表中的任意一台机器,获取到整个集群的信息,因此理论上只需要在bootstrap.servers列出一台机器就够了,但是考虑到灾备,建议在bootstrap.servers中包含所有broker。
key.deserializer和value.deserializer指定了如何解析记录的key和value,在本例中我们认定key和value都是字符串。
在本例中,client端启动了一个从属于test_group的consumer来订阅test_topic。当其中一个consumer process断开之后,kafka broker会通过心跳机制自动检测到,因此集群始终能够知道哪些consumer是活着的。只要被认为是活着,这个consumer就能够从分配给它的partition中获取数据;一旦心跳丢失超过session.timeout.ms,consumer会被认为死掉,它所占有的partition将会被分配给别的process。
相关推荐
Kafka的核心组件包括生产者(Producer)、消费者(Consumer)、主题(Topic)和分区(Partition)。在0.9.0.0版本中,Kafka进一步强化了其稳定性和性能。源代码中,我们可以看到这些组件的实现细节,如`Producer....
3. **消费者**:`kafka.consumer.SimpleConsumer` 和 `kafka.consumer.ZookeeperConsumerConnector` 实现了简单的消费者模型,而 `kafka.consumer.ConsumerIterator` 负责消息迭代。 4. **生产者**:`kafka.producer...
Kafka 0.9引入了新消费者API(Consumer Group API),这是一次重大更新,将消费者从旧的ZooKeeper依赖中解放出来,转而使用Kafka自身来协调消费者组。这一变化显著提高了消费者的可伸缩性和容错性,并简化了消费者...
这个 "kafka_2.11-0.9.0.1" 包解压后,会包含 Kafka 的二进制文件、配置文件、示例脚本等,用户可以直接运行这些文件来启动 Kafka 服务,进行生产环境的部署或者本地测试。在使用时,用户需要根据实际需求配置 `...
**二、Kafka客户端版本0.9.0.0** Kafka的0.9.0.0版本引入了许多改进和新特性,例如支持更高效的ISR(In-Sync Replicas)策略,增强了消费者的API,以及对SASL和SSL安全性的增强。这些改进使得Kafka更适合大规模生产...
二、Kafka 0.9.0.1 的关键改进 1. **消费者API更新**:0.9 版本的消费者 API 更加稳定,提供了更好的消费控制,包括自动或手动提交偏移量,以及更灵活的配置选项。 2. **Zookeeper依赖减少**:虽然 Kafka 仍然使用...
Kafka Python客户端是Python与Kafka通信的桥梁,提供了生产者(Producer)和消费者(Consumer)接口,让开发者能够方便地发送和接收消息。而这个OpenTracing的Kafka客户端扩展,允许我们在发送和接收消息的过程中...
Kafka 0.9引入了消费者小组的概念,允许动态平衡分区的分配。 三、Kafka 配置 1. **Kafka 服务器配置 (Broker Configs)**:Kafka服务器的配置涉及到诸如broker.id、num.partitions、replication.factor等参数,...
卡夫卡消费者的破坏者演示如何在Kafka 0.9 Consumer上使用LMAX Disruptor 好处->一旦先前的使用者完全处理完消息,便可以使用序列屏障来提交消息。想象力是极限。如果环形缓冲区可以容纳在L3缓存中,则处理速度会更...
KafkaConsumer的API和配置细节可以参考KafkaConsumer相关文档。消费者的迭代器会返回ConsumerRecords,这是简单命名元组,用于暴露基本消息属性,如topic、partition等。 开发者在使用kafka-python时需要注意版本...
集群扩展,Apache Kafka 0.9(及更高版本)的Go客户端库。 停产通知 请注意,由于已合并并发布(> = v1.19.0),因此该库已正式弃用。 本机实现支持该库无法提供的各种用例。 文献资料 可通过godoc在中获得文档和...
Kafka-node是Apache Kafka 0.9及更高版本的Node.js客户端。 目录 抵消 行政 故障排除/常见问题解答 首次发送时出现KeyedPartitioner错误的HighLevelProducer 如何调试问题? 对于新使用者,如何从分区中的最新...
Java中的`KafkaConsumer`类用于实现消费者。 3. **Kafka主题**: 主题是消息的逻辑分类。生产者将消息发布到主题,消费者从主题中订阅和消费消息。 4. **Kafka Broker**: Kafka集群由一个或多个broker组成,它们...
#### 二、关闭Kafka服务 同样地,可以在每个节点上关闭Kafka服务: ```bash nohup /opt/modules/kafka_2.11-0.9.0.1/bin/kafka-server-stop.sh /opt/modules/kafka_2.11-0.9.0.1/config/server.properties & ``` ...
#### 二、Kafka服务管理 ##### 1. 启动Kafka服务 启动Kafka服务的基本命令如下: ```bash bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 & ``` 其中`config/server.properties`为Kafka...
KafkaConsumer, String> consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("topic1")); while (true) { ConsumerRecords, String> records = consumer.poll(100); for ...
- **Kafka-0.9.x:** 增强了安全性,引入了客户端访问的读写权限控制,并对 Connect 模块进行了重构,还重新设计了 Consumer。 #### 应用领域 Kafka 的应用非常广泛,以下是一些典型的应用场景: - **Kafka+Flume...
- **多线程消费**:每个线程维护一个 KafkaConsumer,速度较快,但可能导致 TCP 连接开销增大,扩展性和消息顺序维护困难。 - **单个消费者与多个 worker 线程**:更易扩展,但实现复杂,可能难以维护消息顺序和...
- 偏移量可以存储在Zookeeper(Kafka早期版本)或Kafka内部的专用Offset主题中(Kafka 0.9及以上版本)。内部存储提供了扩展性和性能优势。 6. **幂等性与 Exactly-Once 语义** - Kafka从0.11版本开始支持幂等性...