`
kane_xie
  • 浏览: 144591 次
社区版块
存档分类
最新评论

KafkaConsumer0.9(二)

阅读更多

先看一个简单的KafkaConsumer例子:

 

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test_group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test_topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
    }
}

 

我们看到0.9的consumer最大的变化是:

 

  • 通过consumer.subscribe(Arrays.asList("test_topic"))来声明要订阅的topic,而之前的版本是用Whitelist声明。
  • 通过consumer.poll(100)直接抓取消息,而之前需要遍历KafkaStream的迭代器。(这个比之前方便太多了。。。)。
  • MessageAndMetadata变成了ConsumerRecords

enable.auto.commit表示已一个固定的时间间隔自动提交offsets,时间间隔由auto.commit.interval.ms控制。

 

bootstrap.servers表示kafka集群的broker列表。客户端会连接到这个列表中的任意一台机器,获取到整个集群的信息,因此理论上只需要在bootstrap.servers列出一台机器就够了,但是考虑到灾备,建议在bootstrap.servers中包含所有broker。

 

key.deserializer和value.deserializer指定了如何解析记录的key和value,在本例中我们认定key和value都是字符串。

 

在本例中,client端启动了一个从属于test_group的consumer来订阅test_topic。当其中一个consumer process断开之后,kafka broker会通过心跳机制自动检测到,因此集群始终能够知道哪些consumer是活着的。只要被认为是活着,这个consumer就能够从分配给它的partition中获取数据;一旦心跳丢失超过session.timeout.ms,consumer会被认为死掉,它所占有的partition将会被分配给别的process。

1
1
分享到:
评论

相关推荐

    kafka-0.9.0.0-src.tgz

    Kafka的核心组件包括生产者(Producer)、消费者(Consumer)、主题(Topic)和分区(Partition)。在0.9.0.0版本中,Kafka进一步强化了其稳定性和性能。源代码中,我们可以看到这些组件的实现细节,如`Producer....

    kafka 2.10-0.9.0.1 源码

    3. **消费者**:`kafka.consumer.SimpleConsumer` 和 `kafka.consumer.ZookeeperConsumerConnector` 实现了简单的消费者模型,而 `kafka.consumer.ConsumerIterator` 负责消息迭代。 4. **生产者**:`kafka.producer...

    Kafka 0.9版本

    Kafka 0.9引入了新消费者API(Consumer Group API),这是一次重大更新,将消费者从旧的ZooKeeper依赖中解放出来,转而使用Kafka自身来协调消费者组。这一变化显著提高了消费者的可伸缩性和容错性,并简化了消费者...

    kafka_2.11-0.9.0.1

    这个 "kafka_2.11-0.9.0.1" 包解压后,会包含 Kafka 的二进制文件、配置文件、示例脚本等,用户可以直接运行这些文件来启动 Kafka 服务,进行生产环境的部署或者本地测试。在使用时,用户需要根据实际需求配置 `...

    springboot kafka整合

    **二、Kafka客户端版本0.9.0.0** Kafka的0.9.0.0版本引入了许多改进和新特性,例如支持更高效的ISR(In-Sync Replicas)策略,增强了消费者的API,以及对SASL和SSL安全性的增强。这些改进使得Kafka更适合大规模生产...

    kafka_2.11-0.9.0.1.tgz

    二、Kafka 0.9.0.1 的关键改进 1. **消费者API更新**:0.9 版本的消费者 API 更加稳定,提供了更好的消费控制,包括自动或手动提交偏移量,以及更灵活的配置选项。 2. **Zookeeper依赖减少**:虽然 Kafka 仍然使用...

    PyPI 官网下载 | opentracing-python-kafka-client-0.9.tar.gz

    Kafka Python客户端是Python与Kafka通信的桥梁,提供了生产者(Producer)和消费者(Consumer)接口,让开发者能够方便地发送和接收消息。而这个OpenTracing的Kafka客户端扩展,允许我们在发送和接收消息的过程中...

    Apache Kafka 0.9.0说明文档

    Kafka 0.9引入了消费者小组的概念,允许动态平衡分区的分配。 三、Kafka 配置 1. **Kafka 服务器配置 (Broker Configs)**:Kafka服务器的配置涉及到诸如broker.id、num.partitions、replication.factor等参数,...

    disruptor-kafka-consumer:基于React流的卡夫卡消费者

    卡夫卡消费者的破坏者演示如何在Kafka 0.9 Consumer上使用LMAX Disruptor 好处-&gt;一旦先前的使用者完全处理完消息,便可以使用序列屏障来提交消息。想象力是极限。如果环形缓冲区可以容纳在L3缓存中,则处理速度会更...

    kafka-python开发文档

    KafkaConsumer的API和配置细节可以参考KafkaConsumer相关文档。消费者的迭代器会返回ConsumerRecords,这是简单命名元组,用于暴露基本消息属性,如topic、partition等。 开发者在使用kafka-python时需要注意版本...

    sarama-cluster:Sarama的集群扩展,Apache Kafka 0.9的Go客户端库[已弃用]

    集群扩展,Apache Kafka 0.9(及更高版本)的Go客户端库。 停产通知 请注意,由于已合并并发布(&gt; = v1.19.0),因此该库已正式弃用。 本机实现支持该库无法提供的各种用例。 文献资料 可通过godoc在中获得文档和...

    kafka-node:适用于Apache Kafka 0.8及更高版本的Node.js客户端

    Kafka-node是Apache Kafka 0.9及更高版本的Node.js客户端。 目录 抵消 行政 故障排除/常见问题解答 首次发送时出现KeyedPartitioner错误的HighLevelProducer 如何调试问题? 对于新使用者,如何从分区中的最新...

    kafka-consumer-producer:卡夫卡0.8.2.2的简单示例

    Java中的`KafkaConsumer`类用于实现消费者。 3. **Kafka主题**: 主题是消息的逻辑分类。生产者将消息发布到主题,消费者从主题中订阅和消费消息。 4. **Kafka Broker**: Kafka集群由一个或多个broker组成,它们...

    kafka基本指令

    #### 二、关闭Kafka服务 同样地,可以在每个节点上关闭Kafka服务: ```bash nohup /opt/modules/kafka_2.11-0.9.0.1/bin/kafka-server-stop.sh /opt/modules/kafka_2.11-0.9.0.1/config/server.properties & ``` ...

    kafka常用命令归纳

    #### 二、Kafka服务管理 ##### 1. 启动Kafka服务 启动Kafka服务的基本命令如下: ```bash bin/kafka-server-start.sh config/server.properties &gt; /dev/null 2&gt;&1 & ``` 其中`config/server.properties`为Kafka...

    Kafka使用Java客户端进行访问的示例代码

    KafkaConsumer, String&gt; consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("topic1")); while (true) { ConsumerRecords, String&gt; records = consumer.poll(100); for ...

    Kafka & Mafka技术分享及讨论

    - **Kafka-0.9.x:** 增强了安全性,引入了客户端访问的读写权限控制,并对 Connect 模块进行了重构,还重新设计了 Consumer。 #### 应用领域 Kafka 的应用非常广泛,以下是一些典型的应用场景: - **Kafka+Flume...

    kafka开发运维实战分享.pptx

    - **多线程消费**:每个线程维护一个 KafkaConsumer,速度较快,但可能导致 TCP 连接开销增大,扩展性和消息顺序维护困难。 - **单个消费者与多个 worker 线程**:更易扩展,但实现复杂,可能难以维护消息顺序和...

    Kafka 实战演练 7

    - 偏移量可以存储在Zookeeper(Kafka早期版本)或Kafka内部的专用Offset主题中(Kafka 0.9及以上版本)。内部存储提供了扩展性和性能优势。 6. **幂等性与 Exactly-Once 语义** - Kafka从0.11版本开始支持幂等性...

Global site tag (gtag.js) - Google Analytics