`
kane_xie
  • 浏览: 144983 次
社区版块
存档分类
最新评论

KafkaConsumer0.9(三)

阅读更多

 

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test_group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<TopicPartition> list = new ArrayList<TopicPartition>();
TopicPartition tp = new TopicPartition("test_topic", 0);
list.add(tp);
consumer.assign(list);
consumer.seek(tp, 96);
// consumer.seekToBeginning(tp);
// consumer.seekToEnd(tp);
int commitInterval = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
while (true) {
	ConsumerRecords<String, String> records = consumer.poll(100);
	for (ConsumerRecord<String, String> record : records) {
		buffer.add(record);
		if (buffer.size() >= commitInterval) {
			batchProcessRecords(buffer);
			consumer.commitSync();
			buffer.clear();
		}
	}
}

 

1. 在上一篇中我们看了一个简单的自动提交offset的example,但很多情况下我们为了避免消息丢失,需要确保消息被处理完了之后才提交offset,这就需要手动地提交。在上面这个例子中,我们从kafka中抓取数据并缓存在List中,只有当消息达到一定的数量的时候我们才批量处理,假设我们使用自动提交,如果在我们还没来得及处理之前consumer就异常终止,那么有可能这些消息的offset已经被自动提交掉了,等我们的consumer重新连接上来了之后,上次没有处理完成的消息会被我们完全略过,造成数据丢失,这就是"at-most-once delivery"。解决的办法是,只有在批量处理完消息之后,才用consumer.commitSync()手动地提交offset,但这样的副作用的,假如我们正在批量处理消息,这时consumer异常终止,offset没有被提交但有部分消息已经被处理过了,当consumer重连上来时,这批没有被commit的消息会被重新处理一次,造成会有部分消息被重复处理,这就是"at-least-once delivery"。

 

2. kafka提供load balance机制来确保consumer正常工作,简单的说partitions会被分配给正在监听这个topic的多个consumers(同一个group),当其中一个consumer process异常终止,它之前所占有的partitions会被分配给其他consumer process,从而保证所有的数据都能被正常消费掉。但有时我们并不需要load balance机制,例如:

  • 为了节省网络带宽,我们只希望consumer从某一个partition抓取数据,并存储在本地。这在大数据计算或存储中是很常见的行为。在这种情况下我们并不希望另一台机器的consumer来消费这台机器的partition。
  • 如果程序本身带有HA机制,例如使用类似于YARN,Mesos等集群管理框架,那么当一个consumer终止了之后,它会被重启,或者是另一个consumer会被启动来替代它,在这种情况下我们不需要kafka重新分配partition。

要做到这点很简单,替换掉上个例子的consumer.subscribe(Arrays.asList("test_topic")),我们使用consumer.assign(list),在本例中,consumer只会消费partition0的数据。

 

3. 在之前的版本中,如果我们需要消费旧数据(已经commit offset),我们需要用SimpleConsumer。但是在0.9中,这变得更简单了。在本例中,consumer.seek(tp, 96)表示我们从partition 0的offset 96开始抓取数据,consumer.seekToBeginning(tp)表示从头开始抓取数据,consumer.seekToEnd(tp)表示从最后开始抓取数据,换句话说,只消费consumer启动之后新进来的数据。

 

 

 

 

1
1
分享到:
评论

相关推荐

    kafka-0.9.0.0-src.tgz

    Kafka的核心组件包括生产者(Producer)、消费者(Consumer)、主题(Topic)和分区(Partition)。在0.9.0.0版本中,Kafka进一步强化了其稳定性和性能。源代码中,我们可以看到这些组件的实现细节,如`Producer....

    kafka 2.10-0.9.0.1 源码

    3. **消费者**:`kafka.consumer.SimpleConsumer` 和 `kafka.consumer.ZookeeperConsumerConnector` 实现了简单的消费者模型,而 `kafka.consumer.ConsumerIterator` 负责消息迭代。 4. **生产者**:`kafka.producer...

    Kafka 0.9版本

    Kafka 0.9引入了新消费者API(Consumer Group API),这是一次重大更新,将消费者从旧的ZooKeeper依赖中解放出来,转而使用Kafka自身来协调消费者组。这一变化显著提高了消费者的可伸缩性和容错性,并简化了消费者...

    kafka_2.11-0.9.0.1

    《Apache Kafka 0.9.0.1:构建实时数据管道》 Apache Kafka 是一个分布式流处理平台,被广泛用于构建实时数据管道和流应用程序。这个名为 "kafka_2.11-0.9.0.1" 的压缩包包含了 Kafka 的一个特定版本——0.9.0.1,...

    springboot kafka整合

    **三、Spring Boot整合Kafka步骤** 1. **添加依赖**:首先,在`pom.xml`文件中添加Spring Boot和Kafka的依赖。针对0.9.0.0版本的Kafka,需要指定相应的版本号。 ```xml &lt;groupId&gt;org.springframework.boot ...

    kafka_2.11-0.9.0.1.tgz

    三、Kafka 应用场景 1. **日志聚合**:Kafka 常用于收集应用程序的日志数据,形成统一的日志流,便于分析和存储。 2. **实时处理**:与 Spark 或 Flink 等流处理引擎结合,实现数据的实时分析和处理。 3. **消息...

    PyPI 官网下载 | opentracing-python-kafka-client-0.9.tar.gz

    Kafka Python客户端是Python与Kafka通信的桥梁,提供了生产者(Producer)和消费者(Consumer)接口,让开发者能够方便地发送和接收消息。而这个OpenTracing的Kafka客户端扩展,允许我们在发送和接收消息的过程中...

    Apache Kafka 0.9.0说明文档

    Kafka 0.9引入了消费者小组的概念,允许动态平衡分区的分配。 三、Kafka 配置 1. **Kafka 服务器配置 (Broker Configs)**:Kafka服务器的配置涉及到诸如broker.id、num.partitions、replication.factor等参数,...

    disruptor-kafka-consumer:基于React流的卡夫卡消费者

    卡夫卡消费者的破坏者演示如何在Kafka 0.9 Consumer上使用LMAX Disruptor 好处-&gt;一旦先前的使用者完全处理完消息,便可以使用序列屏障来提交消息。想象力是极限。如果环形缓冲区可以容纳在L3缓存中,则处理速度会更...

    kafka-python开发文档

    KafkaConsumer的API和配置细节可以参考KafkaConsumer相关文档。消费者的迭代器会返回ConsumerRecords,这是简单命名元组,用于暴露基本消息属性,如topic、partition等。 开发者在使用kafka-python时需要注意版本...

    sarama-cluster:Sarama的集群扩展,Apache Kafka 0.9的Go客户端库[已弃用]

    集群扩展,Apache Kafka 0.9(及更高版本)的Go客户端库。 停产通知 请注意,由于已合并并发布(&gt; = v1.19.0),因此该库已正式弃用。 本机实现支持该库无法提供的各种用例。 文献资料 可通过godoc在中获得文档和...

    kafka-node:适用于Apache Kafka 0.8及更高版本的Node.js客户端

    Kafka-node是Apache Kafka 0.9及更高版本的Node.js客户端。 目录 抵消 行政 故障排除/常见问题解答 首次发送时出现KeyedPartitioner错误的HighLevelProducer 如何调试问题? 对于新使用者,如何从分区中的最新...

    kafka-consumer-producer:卡夫卡0.8.2.2的简单示例

    Java中的`KafkaConsumer`类用于实现消费者。 3. **Kafka主题**: 主题是消息的逻辑分类。生产者将消息发布到主题,消费者从主题中订阅和消费消息。 4. **Kafka Broker**: Kafka集群由一个或多个broker组成,它们...

    kafka基本指令

    #### 三、Kafka常用操作命令 Kafka提供了丰富的命令行工具来管理和操作Topic。 ##### 1. 查看当前服务器中的所有Topic 可以通过`kafka-topics.sh`命令来查看当前服务器上的所有Topic: ```bash kafka-topics.sh ...

    Kafka使用Java客户端进行访问的示例代码

    KafkaConsumer, String&gt; consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("topic1")); while (true) { ConsumerRecords, String&gt; records = consumer.poll(100); for ...

    kafka常用命令归纳

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties ``` 这些命令提供了更高级的功能和更好的兼容...

    Kafka & Mafka技术分享及讨论

    - **Kafka-0.9.x:** 增强了安全性,引入了客户端访问的读写权限控制,并对 Connect 模块进行了重构,还重新设计了 Consumer。 #### 应用领域 Kafka 的应用非常广泛,以下是一些典型的应用场景: - **Kafka+Flume...

    kafka开发运维实战分享.pptx

    - **多线程消费**:每个线程维护一个 KafkaConsumer,速度较快,但可能导致 TCP 连接开销增大,扩展性和消息顺序维护困难。 - **单个消费者与多个 worker 线程**:更易扩展,但实现复杂,可能难以维护消息顺序和...

    Kafka 实战演练 7

    - 偏移量可以存储在Zookeeper(Kafka早期版本)或Kafka内部的专用Offset主题中(Kafka 0.9及以上版本)。内部存储提供了扩展性和性能优势。 6. **幂等性与 Exactly-Once 语义** - Kafka从0.11版本开始支持幂等性...

Global site tag (gtag.js) - Google Analytics