`

java代码实现kafka消费端consumer的from-beginning功能

 
阅读更多
只需要在代码中加入

props.put("auto.offset.reset", "earliest");
props.put("group.id", UUID.randomUUID().toString());

 完整例子

//1、准备配置文件
	    Properties props = new Properties();
	    props.put("bootstrap.servers", "hadoop1:9092");
	    props.put("acks", "all");
	    props.put("retries", 0);
	    props.put("batch.size", 16384);
	    props.put("linger.ms", 1);
	    props.put("buffer.memory", 33554432);
	    
	    props.put("group.id", "test");
	    props.put("enable.auto.commit", "true");
	    props.put("auto.commit.interval.ms", "1000");
	    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	    
	    props.put("auto.offset.reset", "earliest");
	    props.put("group.id", UUID.randomUUID().toString());
	    
	    
	    
	     
	 // 2、创建KafkaConsumer
	    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
	    // 3、订阅数据,这里的topic可以是多个
	    kafkaConsumer.subscribe(Arrays.asList("yun03"));
	    // 4、获取数据
	    while (true) {
	        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
	        for (ConsumerRecord<String, String> record : records) {
	            System.out.printf("topic = %s,offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());
	        }
	    }

 

分享到:
评论

相关推荐

    kafka_2.12-2.4.1.zip

    - 使用`./kafka-console-producer.sh --broker-list localhost:9092 --topic test`和`./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning`进行消息的生产和消费,操作...

    kafka_2.9.2-0.8.2.1.tgz

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning produce启动的时候参数使用的是kafka的端口而consumer启动的时候使用的是zookeeper的端口; 单机连通性能测试 运行...

    最新版kafka kafka_2.12-2.5.1.tgz

    - 消费消息:`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning` 7. **最佳实践** - 为了保证高可用性,推荐至少配置 3 个 broker。 - 合理设置分区数和...

    kafka-2.12-3.6.1.tgz

    /usr/local/kafka_2.12-3.6.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning ``` 这将从主题的开头开始消费所有消息。 九、扩展与优化 Kafka支持集群部署,...

    Kafka安装包

    $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning ``` **五、Kafka进阶** - **集群部署**:为了提高可用性和容错性,需要在多个节点上部署Kafka...

    kafka_2.11-2.2.2.tgz

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning ``` `--from-beginning`参数表示从头开始消费。 十、管理Kafka Kafka提供了丰富的命令行工具,如`kafka-...

    kafka-2.11-2.4.1安装包以及linux(centos7)安装kafka-2.11-2.4.1详细文档

    sudo -u kafka /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning ``` 以上就是安装Kafka 2.4.1在Linux CentOS 7上的详细步骤。为了保持Kafka的...

    高级Java人才培训专家-1-第一章 kafka基础

    from kafka import KafkaConsumer consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092') for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, ...

    kafka_2.11-0.11.0.3.zip

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning ``` 九、监控与管理 Kafka提供了命令行工具用于查看主题信息、管理消费者组等。例如,列出所有主题: ```bash ./...

    kafka-2.10-0.10.2.1.zip

    - 消费消息:`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning`。 5. **API使用**: - Java API允许开发者直接在应用程序中创建生产者和消费者对象,发送和...

    最新版windows kafka_2.12-2.5.0.zip

    - 消费消息:`bin\windows\kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning`,从头开始消费消息。 **Kafka的特性与使用场景** - **消息持久化**:Kafka将消息...

    kafka环境搭建

    Kafka API 提供了丰富的接口供开发者使用,如 `ProducerRecord` 用于创建消息,`KafkaConsumer` 类用于订阅主题并消费消息。 在实际开发中,你可能需要处理更复杂的情况,比如配置多个 broker 构建分布式 Kafka ...

    kafka集群搭建及测试.docx

    3. 消费消息,运行`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test-topic`,查看消息是否被正确消费。 **6. 集群监控与管理** 为了确保Kafka集群的稳定运行,你...

    kafka_2.10-0.10.1.0

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning ``` 这将从"my-topic"的开头开始消费所有消息。 六、测试与调试 在本地开发环境中,我们可以快速地创建多个...

    21.消息中间件之Kafka入门讲解

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning --group my-group ``` ### 4. Spring Cloud Kafka集成 Spring Cloud Kafka为Spring应用提供了与Kafka的无缝...

    kafka-的安装包!

    bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092 ``` 9. **集群部署** 对于生产环境,通常需要部署多个 Kafka 节点,形成一个高可用的集群。此时需要配置...

    linux下kafka

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning ``` 这将开始从`test`主题的开头消费消息。 **监控和管理** Kafka提供了一些命令行工具用于管理主题、检查集群...

    Flume采集数据到Kafka,然后从kafka取数据存储到HDFS的方法思路和完整步骤

    - 启动消费者:`kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning`。 #### 三、Flume采集数据至Kafka **3.1 配置Flume Agent** 1. **创建Flume配置文件**: ...

Global site tag (gtag.js) - Google Analytics