3.2.1.1 消息消费的demo代码
消息消费的demo代码如下:
package com.tuozixuan.kafka.demo;
import java.util.Arrays; import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;
publicclass ConsumerTest {
publicstaticvoid main(String[] args) {
String topicName = "test"; String groupId = "test-group";
Properties props = new Properties(); // 必须指定的属性 props.put("bootstrap.servers", "10.4.23.159:9092"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("group.id", groupId);
// 可选属性 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest"); // 从最早的消息开始读取
// 创建consumer实例,订阅topic KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topicName));
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset:%d key:%s value:%s%n", record.offset(), record.key(), record.value()); } } } finally { consumer.close(); } } }
|
构造consumer需要下面6个步骤:
- 构造一个java.util.Properties对象,至少指定bootstrap.servers、key.deserializer、value.deserializer和group.id的值。
- 使用上一步创建的Properties实例构造KafkaConsumer对象。
- 调用 KafkaConsumer.subscribe方法订阅consumer group感兴趣的topic列表。
- 循环调用KafkaConsumer.poll方法获取封装在ConsumerRecord的topic消息。
- 处理获取到的ConsumerRecord对象。
- 关闭KafkaConsumer。
3.2.1.2 构造Properties对象
在创建的Properties对象中,必须指定的参数有4个:bootstrap.servers、key.deserializer、value.deserializer和group.id的值。参数的具体含义见3.2.2 consumer主要参数
3.2.1.3 构造KafkaConsumer对象
创建KafkaConsumer实例代码如下:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
创建KafkaConsumer也可同时指定key和value的deseralizer,若采用这种方式,则不需要在Properties中指定key.deserializer和value.deserializer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props,new StringDeserializer(),new StringDeserializer()); |
3.2.1.4 订阅topic列表
订阅topic的代码如下:
consumer.subscribe(Arrays.asList("topic1","topic2","topic3")); |
该方法还支持正则表达式。假设consumer group要消费所有以kafka开头的topic,则可以如此订阅:
consumer.subscribe(Pattern.compile("kafka.*"),new NoOpConsumerRebalanceListener()); |
注意:subscribe方法不是增量式的,后续的subscribe调用会完全覆盖之前的订阅语句。
3.2.1.5 获取消息
consumer使用KafkaConsumer.poll方法从订阅topic中并行地获取多个分区的消息。为了实现这一点,新版本的consumer的poll方法使用了类似linux的select I/O机制--所有相关的事件(包括rebalance、获取消息等)都发生在一个事件循环(event loop)中。这样consumer端只使用一个线程就能够完成所有类型的I/O操作。
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); // 执行具体的消费逻辑 } } finally { consumer.close(); } |
上面代码中的1000代表超时设置(timeout),通常情况下如果consumer拿到了足够多的可用数据,那么它可以立即从该方法返回;但若当前没有足够多的数据可供返回,consumer会处于阻塞状态。这个超时参数即控制阻塞的最大时间。这里的1000表示即使没有那么多数据,consumer最多也不要等待超过1秒的时间。
若用户有定时方面的需求,那么根据需求设定timeout是一个不错的选择。否则,设定一个比较大的值甚至Integer.MAX_VALUE,是不错的建议。
3.2.1.6 处理ConsumerRecord对象
poll调用返回ConsumerRecord封装的Kafka消息,拿到这些消息后consumer可以处理自己的业务逻辑。
从Kafka consumer的角度而言,poll方法返回即认为consumer成功消费了消息。如果发现poll返回消息的速度过慢,那么可以调节相应的参数来提升poll方法的效率;若消息的业务级处理逻辑过慢,则应该考虑简化处理逻辑或者把处理逻辑放入单独的线程执行。
3.2.1.7 关闭consumer
consumer程序结束后一定要显式关闭consumer以释放KafkaConsumer运行过程中占用的各种系统资源(比如线程资源、内存、Socket连接等)。
KafkaConsumer.close():关闭consumer并最多等待30秒
KafkaConsumer.close(timeout):关闭consumer并最多等待给定的timeout秒。
相关推荐
1. **创建Consumer实例**:首先,我们需要配置一个`Properties`对象,设置必要的参数如bootstrap servers、group id等,然后使用`KafkaConsumer`类的构造函数创建消费者实例。 2. **订阅主题**:消费者通过调用`...
《Go-consumergroup:构建基于Golang的Kafka消费者库》 在现代软件开发中,消息队列系统如Apache Kafka扮演着至关重要的角色,它提供了高效、可靠的异步通信能力。而Go语言以其简洁的语法和高性能特性,成为了编写...
【描述】"kafka_hdfs_consumer实现"意味着我们需要理解如何构建一个消费者应用,该应用能够连接到Kafka,获取消息,然后将这些消息写入HDFS。这通常涉及以下几个步骤: 1. **Kafka消费者配置**:`file2kafka.conf`...
【标题】"phpkafkaconsumer"是一个专门为PHP设计的Kafka消费者库,它不仅提供了基本的Kafka消息消费功能,还特别支持了消费者组(Consumer Group)和再平衡(Rebalance)机制。Kafka是一种分布式流处理平台,常用于...
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} import java.util.Properties val props = new Properties() props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...
首先,了解Kafka基本概念:Kafka是一种分布式流处理平台,用于构建实时数据管道和流应用程序。它提供了高吞吐量、低延迟的消息传递,支持发布/订阅模式,适用于大数据实时处理场景。 Spring Kafka是Spring ...
KafkaConsumer, String> consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords, String> records = consumer.poll(Duration.ofMillis(100)); ...
**Kafka插件详解** ...总的来说,Kafka插件极大地丰富了Kafka的使用场景,使得开发者可以灵活地构建基于Kafka的各种数据处理系统。正确理解和使用Kafka插件,是构建高效、可靠数据基础设施的关键。
KafkaConsumer, String> consumer = new KafkaConsumer(props); consumer.subscribe(Collections.singletonList("myTopic")); // 订阅主题 while (true) { ConsumerRecords, String> records = consumer.poll...
在本文中,我们将深入探讨如何在Spring Boot应用中使用KafkaConsumer来消费Apache Kafka主题中的消息。Apache Kafka是一个分布式流处理平台,常用于构建实时数据管道和流应用程序。Spring Boot简化了Kafka消费者的...
通过以上知识点的整合,我们可以构建一个高效的TCP长连接服务器,它将接收到的TCP数据流写入Kafka,并通过Kafka的批量消费功能进行高效处理。这种架构适用于实时数据传输、日志收集、物联网设备通信等多种场景。
2. **消费者(Consumer)**:从Kafka获取消息,可以使用`KafkaConsumer`类来实现。消费者可以订阅一个或多个主题,并且可以采用分组消费,确保每个消息仅被组内的一个消费者处理。 3. **主题(Topic)**:Kafka中的...
* 消费者(Consumer):Kafka 中的消费者角色,用于消费消息。 * Broker:Kafka 中的代理节点,用于处理和存储消息。 * 集群(Cluster):Kafka 中的集群概念,用于分布式存储和处理消息。 3. Kafka 的优点 Kafka ...
- **Consumer**: 通过 `@KafkaListener` 注解定义消费方法,消息会被自动转换并传递给该方法。 **4. KafkaTemplate 和 ListenerContainer** - **KafkaTemplate**: 是一个工具类,提供了丰富的 API 用于发送消息,...
private KafkaConsumer kafkaConsumer; @Scheduled(cron = "0 0/5 * * * ?") // 每5分钟执行一次 public void consumeMessage() { kafkaConsumer.listen(); } } ``` 四、项目源码运行 下载`kafka_demo.rar`...
在Spring Boot框架中集成Kafka,可以简化开发流程,使得构建基于Kafka的应用变得更加便捷。 1. **Spring Boot与Kafka的集成**: - Spring Boot提供了`spring-kafka`模块,简化了Kafka的配置和使用。只需添加相关...
综上所述,kettle kafka 消息者插件是Pentaho ETL工具集中的一个重要组成部分,它有效地打通了Kafka和Pentaho之间的数据通道,为企业构建实时数据处理和分析的解决方案提供了便利。用户可以根据实际需求定制插件配置...
这里,我们创建了`KafkaConsumer`类,配置了消费组ID和自动重置偏移量的策略。`StartConsuming`方法启动消费过程,当接收到消息时,会调用传入的`handleMessage`回调函数。 现在,我们可以在主程序中创建生产者和...
《Rust语言实现Kafka消费者实例详解》 在软件开发中,消息队列(Message Queue)作为一...通过理解并实践`rust-kafka-consumer-example`,开发者可以更好地利用Rust的强大能力处理大数据流和构建高可用的分布式系统。
KafkaConsumer, String> consumer = new KafkaConsumer(consumerProps); ``` 4. **订阅主题**:消费者可以使用`subscribe()`方法订阅一个或多个主题。 ```java consumer.subscribe(Arrays.asList("my-topic")); ``...