- 自动提交offset
以下实例代码展示了如何自动提交topic的offset:
public void autoOffsetCommit() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
Properties的实例props中存放的key意义:
1)bootstrap.servers表示要连接的Kafka集群中的节点,其中9092表示端口号;
2)enable.auto.commit为true,表示在auto.commit.interval.ms时间后会自动提交topic的offset,其中auto.commit.interval.ms默认值为5000ms;
3)其中foo和bar为要消费的topic名称,由group.id为test作为consumer group统一进行管理;
4)key.deserializer和value.deserializer表示指定将字节序列化为对象。
- 手动提交offset
生产环境中,需要在数据消费完全后再提交offset,也就是说在数据从kafka的topic取出来后并被逻辑处理后,才算是数据被消费掉,此时需要手动去提交topic的offset。
以下实例代码展示了如何手动提交topic的offset:
public void manualOffsetCommit() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.Stirng.Deserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
// operation to handle data
consumer.commitSync();
buffer.clear();
}
}
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.Stirng.Deserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
// operation to handle data
consumer.commitSync();
buffer.clear();
}
}
}
本方案的缺点是必须保证所有数据被处理后,才提交topic的offset。为避免数据的重复消费,可以用第三种方案,根据每个partition的数据消费情况进行提交,称之为“at-least-once”。
- 手动提交partition的offset
以下实例代码展示了如何手动提交topic中每一partition的offset:
public void manualOffsetCommitOfPartition() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.Stirng.Deserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
boolean running = true;
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + " : " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
}
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.Stirng.Deserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
boolean running = true;
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + " : " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
}
相关推荐
【标题】"Kafka Demo 两种线程消费方式"展示了在Kafka中处理消息的两种常见线程模型,这是理解Kafka消费者工作原理的关键部分。Kafka是一个分布式流处理平台,用于构建实时数据管道和应用,它允许生产者发布消息到...
本文将详细探讨如何在Spring Boot项目中基于Spring Kafka动态创建Kafka消费者。 首先,了解Kafka基本概念:Kafka是一种分布式流处理平台,用于构建实时数据管道和流应用程序。它提供了高吞吐量、低延迟的消息传递,...
在本文中,我们将深入探讨Apache Kafka的两种线程消费方式,这是基于提供的标题"Kafka Demo,两种线程消费方式"。...记住,无论选择哪种方式,都需要确保代码的正确性和线程安全性,以及对Kafka消费者API的熟练掌握。
在本文中,我们将深入探讨如何在集群模式下模拟Kafka的生产者和消费者。Kafka是一种分布式流处理平台,常用于大数据实时处理和消息传递。它由Apache开发,以其高吞吐量、低延迟和可扩展性而闻名。 首先,我们要理解...
这里的`@KafkaListener`注解使得`listen`方法成为Kafka消费者,它会监听"testTopic"上的消息。当有新消息到达时,该方法会被调用,打印出接收到的消息。 在Spring MVC的环境中,你可以将消费者方法与业务逻辑结合...
在本文中,我们将深入探讨如何将SpringBoot与Apache Kafka进行集成,实现消息生产和消费功能,同时涵盖自动分配分区和指定分区消费的策略。SpringBoot以其轻量级、快速开发的特性,深受开发者喜爱,而Kafka作为一个...
4. **offset管理**:消费者负责维护自己的消费位置,Kafka 提供自动提交和手动提交两种模式。 了解并熟练掌握这些核心概念,将有助于我们构建高效、可靠的数据处理系统。在实际应用中,根据业务需求选择合适的配置...
**大数据采集技术——Kafka消费模式详解** 在大数据领域,数据采集是整个流程的基础,而Apache Kafka作为一个高效、可扩展的分布式流处理平台,扮演着关键角色。Kafka以其高吞吐量、低延迟和持久化特性,在实时数据...
5. **消息模型**:Kafka支持发布/订阅模式,生产者发送消息到主题,消费者订阅主题来接收消息。示例代码可能展示了如何定义和使用这种模型。 6. **Offset管理**:消费者组中的每个消费者都会维护一个offset,表示其...
在本文中,我们将深入探讨如何将Spring Boot与Apache Kafka整合,以便实现发布/消费消息队列模式。Apache Kafka是一款高效、可扩展且分布式的消息中间件,而Spring Boot则是一个简化了Spring应用程序开发的框架。...
Kafka是一种高吞吐量、低延迟的消息队列系统,它允许应用程序以发布/订阅模式进行通信。在Kafka中,数据以主题(Topic)的形式存储,每个主题可以分为多个分区(Partition),确保数据的分布和负载均衡。此外,Kafka...
二、Kafka消费者 1. 添加依赖:同样需要添加Kafka的Java客户端库。 2. 创建消费者实例:配置消费者的属性,如bootstrap servers,group.id(消费组ID),key.deserializer和value.deserializer。 3. 注册监听器:...
Kafka支持两种消费模式:单消费者模式(每个分区只能有一个消费者)和消费者组模式(多个消费者可以组成一个组,共享主题的分区)。 5. **Partitions**: 主题被分成多个分区,分区内的消息按照顺序存储,且每个分区...
* 消费者组管理:Kafka 的消费者组管理机制,用于管理消费者组和消费者。 * 集群元数据管理:Kafka 的集群元数据管理机制,用于管理集群和代理节点。 6. Kafka 的应用 Kafka 的应用包括: * 数据分析平台:Kafka ...
- 为了确保数据的一致性,可以设置Kafka Connect为幂等模式,这样即使同一消息被多次处理,MySQL中的数据也不会重复。 - 另外,可以设置适当的Kafka保留策略,避免数据丢失。 7. **异常处理和故障恢复** - 如果...
### 三、Java Kafka消费者 消费者从Kafka主题中读取和处理消息。消费者的知识点包括: 1. **配置**: 与生产者类似,消费者也需要配置,如group.id(消费者组标识)和enable.auto.commit(是否自动提交偏移量)等。...
Kafka支持发布订阅模式,生产者发布消息到主题(Topics),消费者订阅感兴趣的主题并消费消息。这种模式灵活且易于扩展,适合处理多种场景的数据交换。 5. **消费者组**: Kafka引入了消费者组的概念,每个组内的...
2. **消费模式**:有两种消费模式——拉取(Pull)和推送(Push)。默认情况下,消费者主动向服务器请求数据(拉取)。Kafka Connect提供了一种自动推送模式,通过持续轮询来获取新数据。 3. **偏移量管理**:消费...
#### 三、Flume采集数据至Kafka **3.1 配置Flume Agent** 1. **创建Flume配置文件**: - 编写Flume配置文件,例如`a1.sources = r1`用于定义数据源。 2. **配置Flume Source**: - 使用`exec`或`spooling_...
7. **性能测试**:Kafkatool包含一个简单的生产者模式,可以用于测试Kafka集群的吞吐量和延迟,帮助优化集群性能。 **安装与使用Kafkatool**: 在Windows环境下,你可以下载名为“kafkatool_64bit.exe”的可执行...