只需要在代码中加入
props.put("auto.offset.reset", "earliest");
props.put("group.id", UUID.randomUUID().toString());
props.put("group.id", UUID.randomUUID().toString());
完整例子
//1、准备配置文件 Properties props = new Properties(); props.put("bootstrap.servers", "hadoop1:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); props.put("group.id", UUID.randomUUID().toString()); // 2、创建KafkaConsumer KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props); // 3、订阅数据,这里的topic可以是多个 kafkaConsumer.subscribe(Arrays.asList("yun03")); // 4、获取数据 while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s,offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value()); } }
相关推荐
- 使用`./kafka-console-producer.sh --broker-list localhost:9092 --topic test`和`./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning`进行消息的生产和消费,操作...
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning produce启动的时候参数使用的是kafka的端口而consumer启动的时候使用的是zookeeper的端口; 单机连通性能测试 运行...
- 消费消息:`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning` 7. **最佳实践** - 为了保证高可用性,推荐至少配置 3 个 broker。 - 合理设置分区数和...
/usr/local/kafka_2.12-3.6.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning ``` 这将从主题的开头开始消费所有消息。 九、扩展与优化 Kafka支持集群部署,...
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning ``` **五、Kafka进阶** - **集群部署**:为了提高可用性和容错性,需要在多个节点上部署Kafka...
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning ``` `--from-beginning`参数表示从头开始消费。 十、管理Kafka Kafka提供了丰富的命令行工具,如`kafka-...
sudo -u kafka /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning ``` 以上就是安装Kafka 2.4.1在Linux CentOS 7上的详细步骤。为了保持Kafka的...
from kafka import KafkaConsumer consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092') for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, ...
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning ``` 九、监控与管理 Kafka提供了命令行工具用于查看主题信息、管理消费者组等。例如,列出所有主题: ```bash ./...
- 消费消息:`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning`。 5. **API使用**: - Java API允许开发者直接在应用程序中创建生产者和消费者对象,发送和...
- 消费消息:`bin\windows\kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning`,从头开始消费消息。 **Kafka的特性与使用场景** - **消息持久化**:Kafka将消息...
Kafka API 提供了丰富的接口供开发者使用,如 `ProducerRecord` 用于创建消息,`KafkaConsumer` 类用于订阅主题并消费消息。 在实际开发中,你可能需要处理更复杂的情况,比如配置多个 broker 构建分布式 Kafka ...
3. 消费消息,运行`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test-topic`,查看消息是否被正确消费。 **6. 集群监控与管理** 为了确保Kafka集群的稳定运行,你...
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning ``` 这将从"my-topic"的开头开始消费所有消息。 六、测试与调试 在本地开发环境中,我们可以快速地创建多个...
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning --group my-group ``` ### 4. Spring Cloud Kafka集成 Spring Cloud Kafka为Spring应用提供了与Kafka的无缝...
bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092 ``` 9. **集群部署** 对于生产环境,通常需要部署多个 Kafka 节点,形成一个高可用的集群。此时需要配置...
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning ``` 这将开始从`test`主题的开头消费消息。 **监控和管理** Kafka提供了一些命令行工具用于管理主题、检查集群...
- 启动消费者:`kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning`。 #### 三、Flume采集数据至Kafka **3.1 配置Flume Agent** 1. **创建Flume配置文件**: ...