1.配置文件 consumer.properties
#zookeeper地址
zookeeper.connect=master:2181,slave1:2181,slave2:2181
#zookeeper超时时间
zookeeper.connectiontimeout.ms=1000000
#kafka的consumer组
group.id=test-group
2. 组织代码
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class init {
public static void main(String[] args) throws FileNotFoundException, IOException {
//1、读取配置文件
Properties p = new Properties();
//p.load(new FileInputStream(new File("./consumer.properties")));
p.load(init.class.getClassLoader().getResourceAsStream("consumer.properties"));
//2、通过配置文件,创建消费者配置
ConsumerConfig config = new ConsumerConfig(p);
//3、通过配置,创建消费者
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
//4、创建封装消息的map
Map<String,Integer> topics = new HashMap<String,Integer>();
//5、封装消费的topic名字和消费这个topic中几个partition
topics.put("test-topic", 1);
//6、创建消息流,传入刚刚创建的map
Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topics);
//7、获取消息,封装到list中
List<KafkaStream<byte[], byte[]>> partitions = streams.get("test-topic");
//8、创建线程池,用不同的线程消费list中的不同消息,可以消费一个,也可以消费多个
ExecutorService threadPool = Executors.newFixedThreadPool(1);
for(KafkaStream<byte[], byte[]> partition : partitions){
//9、将消息传到线程中消费
threadPool.execute(new TestConsumer(partition));
}
}
}
3.调用显示代码
import java.io.UnsupportedEncodingException;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
public class TestConsumer extends Thread {
private KafkaStream<byte[], byte[]> partition;
/**
* 构造函数,传进来消息
*/
public TestConsumer(KafkaStream<byte[], byte[]> partition){
this.partition = partition;
}
public void run() {
//1、取出消息
ConsumerIterator<byte[], byte[]> iterator = partition.iterator();
//2、判断是否有下一条,有的话迭代消息,没有的话,停止等待
while(iterator.hasNext()){
//3、取出消息
MessageAndMetadata<byte[], byte[]> next = iterator.next();
try {
//4、打印消息
System.out.println("partiton:" + next.partition());//partition
System.out.println("offset:" + next.offset()); //偏移量
System.out.println("message:" + new String(next.message(), "utf-8"));//消息主体
} catch (UnsupportedEncodingException e){
e.printStackTrace();
}
}
}
分享到:
相关推荐
本实例将深入探讨 Kafka 的核心概念——生产者和消费者,以及它们在实际应用中的工作原理。 Kafka 是一个高吞吐量、低延迟的分布式发布订阅消息系统,最初由 LinkedIn 开发,并于2011年开源。它设计的目标是能够...
在本文中,我们将深入探讨Apache Kafka的两种线程消费方式,这是基于提供的标题"Kafka Demo,两种线程消费方式"。...记住,无论选择哪种方式,都需要确保代码的正确性和线程安全性,以及对Kafka消费者API的熟练掌握。
通过以上信息,你可以开始构建Kafka的生产者和消费者实例。不过,要记住,实际部署时还需要考虑集群配置、容错机制、数据保留策略等高级特性。同时,Kafka还提供了更复杂的操作,如幂等性生产者、事务性消费者等,以...
在这个"Kafka大数据 生产者消费者实例"中,我们将探讨如何通过Java编程语言来实现Kafka的生产者和消费者。 首先,我们要理解Kafka中的**生产者(Producer)**,它是负责发布消息到特定主题的组件。在Java中,我们...
让我们看看如何使用C#编写一个简单的Kafka消费者和生产者。 1. **Kafka生产者**: 要创建一个C# Kafka生产者,你需要初始化一个ProducerConfig对象,并使用其创建一个Producer实例。设置必要的配置,如...
6. **运行模拟**:启动多个消费者实例,形成一个消费组。Kafka会自动平衡分区的消费,使得同一分区在消费组内只由一个消费者消费,保证消息的顺序。 在集群模式下,如果一个broker宕机,其上的分区将被自动重新分配...
HBase的连接参数需要在Kafka消费者端配置,以便将接收到的数据写入HBase。 4. 数据流转:Flume从源收集数据,将其发送到Kafka;Kafka作为中间层,接收并暂存数据,然后由HBase消费者读取并写入HBase。整个过程形成了...
Kafka消费者的代码实现一般包括创建KafkaConsumer实例、订阅主题、轮询接收消息等步骤。根据代码片段,以下为消费者的关键知识点: 1. 创建KafkaConsumer实例前需要配置消费者属性,包括Kafka集群的地址(bootstrap...
2. 创建消费者实例:配置消费者的属性,如bootstrap servers,group.id(消费组ID),key.deserializer和value.deserializer。 3. 注册监听器:使用`KafkaConsumer.subscribe()`方法订阅主题,并实现`...
Kafka消费者则用于订阅和消费Kafka主题中的消息。在Java中,消费者需要设置group id、订阅主题、以及offset管理策略等。一旦配置完成,可以使用`poll()`方法从Kafka服务器拉取新消息。 【Kafka Topic】 在Kafka中...
2. 消费者代码示例:展示如何创建消费者实例,订阅主题并处理消息。 3. 配置文件:可能包括生产者和消费者的配置文件,其中包含了如上所述的关键参数。 4. 运行脚本或指南:指导用户如何启动和运行这些示例。 通过...
接下来,我们创建消费者实例,并订阅感兴趣的主题: ```cpp rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_...
Kafka支持高可用性和容错性的消费者群组机制,当消费者加入或离开群组时,分区的分配会自动调整。 3. **主题和分区**:主题可以被分为多个分区,每个分区有唯一的标识符。分区内的消息是有序的,但是不同分区之间...
Partition是Kafka主题(Topic)的逻辑分片,每个Partition内部的消息是有序的,并且只能被同一个消费者组(Consumer Group)中的一个消费者实例消费,这就确保了消息的唯一消费性。消费者组是由一组消费者组成的,...
这通常通过线程模型来实现,例如在单独的线程池中运行Kafka消费者,一旦有新的消息,就将这些消息放入一个共享的数据结构,如阻塞队列。然后,Netty的IO线程可以在合适的时候从队列中取出消息,推送给客户端。 在...
这个压缩包很可能包含代码示例,演示如何创建Kafka生产者和消费者实例,以及如何在Java或其他语言中使用Kafka API。这些实例可能涵盖: - **连接集群**:设置配置参数,如服务器地址、端口和安全设置。 - **创建主题...
要获取历史偏移量,你需要使用Kafka的消费者API。消费者可以查询每个分区的最新和最早偏移量,也可以查询特定时间戳对应的消息偏移量。这在需要回溯消费或者分析历史数据时非常有用。 ### 4. 配置Kafka服务 在...
3. **消费者API**:消费者从Kafka的主题中读取数据。Kafka支持两种消费者模式:旧版的Simple Consumer和新版的Consumer Group。Consumer Group允许多消费者并行消费同一主题,实现负载均衡。在Kafka-demo中,我们...
- **Consumer示例**:演示如何创建一个Kafka消费者,订阅特定主题,接收并处理来自生产者的消息。 **4. Kafka生产者实现** 生产者的主要任务是将数据写入Kafka。在Java中,这通常涉及以下几个步骤: 1. 创建`...
Kafka的核心组件包括生产者(Producer)、消费者(Consumer)、主题(Topic)和分区(Partition)。生产者负责发布消息到主题,消费者则订阅并消费这些消息。主题是逻辑上的分类,每个主题可以分为多个分区,分区内部的消息...