`

kafka消费者实例

阅读更多

1.配置文件 consumer.properties

#zookeeper地址
zookeeper.connect=master:2181,slave1:2181,slave2:2181
#zookeeper超时时间
zookeeper.connectiontimeout.ms=1000000
#kafka的consumer组
group.id=test-group



2. 组织代码
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class init {
 public static void main(String[] args) throws FileNotFoundException, IOException {
  //1、读取配置文件
  Properties p = new Properties();
  //p.load(new FileInputStream(new File("./consumer.properties")));
  p.load(init.class.getClassLoader().getResourceAsStream("consumer.properties"));
  //2、通过配置文件,创建消费者配置
  ConsumerConfig config = new ConsumerConfig(p);
  //3、通过配置,创建消费者
  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
  //4、创建封装消息的map
  Map<String,Integer> topics = new HashMap<String,Integer>();
  //5、封装消费的topic名字和消费这个topic中几个partition
  topics.put("test-topic", 1);
  //6、创建消息流,传入刚刚创建的map
  Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topics);
  //7、获取消息,封装到list中
  List<KafkaStream<byte[], byte[]>> partitions = streams.get("test-topic");
  //8、创建线程池,用不同的线程消费list中的不同消息,可以消费一个,也可以消费多个
  ExecutorService threadPool = Executors.newFixedThreadPool(1);
  for(KafkaStream<byte[], byte[]> partition : partitions){
   //9、将消息传到线程中消费
   threadPool.execute(new TestConsumer(partition));
  }
 }
}


3.调用显示代码

import java.io.UnsupportedEncodingException;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
public class TestConsumer extends Thread {
 private KafkaStream<byte[], byte[]> partition;
 /**
  * 构造函数,传进来消息
  */
 public TestConsumer(KafkaStream<byte[], byte[]> partition){
  this.partition = partition;
 }
 public void run() {
   //1、取出消息
   ConsumerIterator<byte[], byte[]> iterator = partition.iterator();
   //2、判断是否有下一条,有的话迭代消息,没有的话,停止等待
   while(iterator.hasNext()){
    //3、取出消息
    MessageAndMetadata<byte[], byte[]> next = iterator.next();
     try {
      //4、打印消息
              System.out.println("partiton:" + next.partition());//partition
              System.out.println("offset:" + next.offset());     //偏移量
              System.out.println("message:" + new String(next.message(), "utf-8"));//消息主体
     } catch (UnsupportedEncodingException e){
      e.printStackTrace();
     }
   }
 }

分享到:
评论

相关推荐

    kafka生产者消费者实例

    本实例将深入探讨 Kafka 的核心概念——生产者和消费者,以及它们在实际应用中的工作原理。 Kafka 是一个高吞吐量、低延迟的分布式发布订阅消息系统,最初由 LinkedIn 开发,并于2011年开源。它设计的目标是能够...

    kafka demo ,两种线程消费方式

    在本文中,我们将深入探讨Apache Kafka的两种线程消费方式,这是基于提供的标题"Kafka Demo,两种线程消费方式"。...记住,无论选择哪种方式,都需要确保代码的正确性和线程安全性,以及对Kafka消费者API的熟练掌握。

    kafka实例 消费者生产者

    通过以上信息,你可以开始构建Kafka的生产者和消费者实例。不过,要记住,实际部署时还需要考虑集群配置、容错机制、数据保留策略等高级特性。同时,Kafka还提供了更复杂的操作,如幂等性生产者、事务性消费者等,以...

    kafka大数据 生产者消费者实例

    在这个"Kafka大数据 生产者消费者实例"中,我们将探讨如何通过Java编程语言来实现Kafka的生产者和消费者。 首先,我们要理解Kafka中的**生产者(Producer)**,它是负责发布消息到特定主题的组件。在Java中,我们...

    C#kafka开发实例

    让我们看看如何使用C#编写一个简单的Kafka消费者和生产者。 1. **Kafka生产者**: 要创建一个C# Kafka生产者,你需要初始化一个ProducerConfig对象,并使用其创建一个Producer实例。设置必要的配置,如...

    kafka模拟生产者消费者(集群模式)实例

    6. **运行模拟**:启动多个消费者实例,形成一个消费组。Kafka会自动平衡分区的消费,使得同一分区在消费组内只由一个消费者消费,保证消息的顺序。 在集群模式下,如果一个broker宕机,其上的分区将被自动重新分配...

    Flume+Kafka+HBase实例

    HBase的连接参数需要在Kafka消费者端配置,以便将接收到的数据写入HBase。 4. 数据流转:Flume从源收集数据,将其发送到Kafka;Kafka作为中间层,接收并暂存数据,然后由HBase消费者读取并写入HBase。整个过程形成了...

    Java实现Kafka生产者消费者代码实例

    Kafka消费者的代码实现一般包括创建KafkaConsumer实例、订阅主题、轮询接收消息等步骤。根据代码片段,以下为消费者的关键知识点: 1. 创建KafkaConsumer实例前需要配置消费者属性,包括Kafka集群的地址(bootstrap...

    java kafka 生产者/消费者demo

    2. 创建消费者实例:配置消费者的属性,如bootstrap servers,group.id(消费组ID),key.deserializer和value.deserializer。 3. 注册监听器:使用`KafkaConsumer.subscribe()`方法订阅主题,并实现`...

    kafka-java-demo 基于java的kafka生产消费者示例

    Kafka消费者则用于订阅和消费Kafka主题中的消息。在Java中,消费者需要设置group id、订阅主题、以及offset管理策略等。一旦配置完成,可以使用`poll()`方法从Kafka服务器拉取新消息。 【Kafka Topic】 在Kafka中...

    kafka生产者消费者Demo

    2. 消费者代码示例:展示如何创建消费者实例,订阅主题并处理消息。 3. 配置文件:可能包括生产者和消费者的配置文件,其中包含了如上所述的关键参数。 4. 运行脚本或指南:指导用户如何启动和运行这些示例。 通过...

    RdKafka::KafkaConsumer使用实例

    接下来,我们创建消费者实例,并订阅感兴趣的主题: ```cpp rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_...

    kafka 可运行实例

    Kafka支持高可用性和容错性的消费者群组机制,当消费者加入或离开群组时,分区的分配会自动调整。 3. **主题和分区**:主题可以被分为多个分区,每个分区有唯一的标识符。分区内的消息是有序的,但是不同分区之间...

    kafka中partition和消费者对应关系1

    Partition是Kafka主题(Topic)的逻辑分片,每个Partition内部的消息是有序的,并且只能被同一个消费者组(Consumer Group)中的一个消费者实例消费,这就确保了消息的唯一消费性。消费者组是由一组消费者组成的,...

    netty4推送+kafka消费

    这通常通过线程模型来实现,例如在单独的线程池中运行Kafka消费者,一旦有新的消息,就将这些消息放入一个共享的数据结构,如阻塞队列。然后,Netty的IO线程可以在合适的时候从队列中取出消息,推送给客户端。 在...

    02、Kafka基础实战:消费者和生产者实例.zip

    这个压缩包很可能包含代码示例,演示如何创建Kafka生产者和消费者实例,以及如何在Java或其他语言中使用Kafka API。这些实例可能涵盖: - **连接集群**:设置配置参数,如服务器地址、端口和安全设置。 - **创建主题...

    kafka项目实例

    要获取历史偏移量,你需要使用Kafka的消费者API。消费者可以查询每个分区的最新和最早偏移量,也可以查询特定时间戳对应的消息偏移量。这在需要回溯消费或者分析历史数据时非常有用。 ### 4. 配置Kafka服务 在...

    kafka学习实例

    3. **消费者API**:消费者从Kafka的主题中读取数据。Kafka支持两种消费者模式:旧版的Simple Consumer和新版的Consumer Group。Consumer Group允许多消费者并行消费同一主题,实现负载均衡。在Kafka-demo中,我们...

    kafka-java-demo 基于java的kafka生产消费者例子

    - **Consumer示例**:演示如何创建一个Kafka消费者,订阅特定主题,接收并处理来自生产者的消息。 **4. Kafka生产者实现** 生产者的主要任务是将数据写入Kafka。在Java中,这通常涉及以下几个步骤: 1. 创建`...

    Kafka客户端开发实例java源码.zip

    Kafka的核心组件包括生产者(Producer)、消费者(Consumer)、主题(Topic)和分区(Partition)。生产者负责发布消息到主题,消费者则订阅并消费这些消息。主题是逻辑上的分类,每个主题可以分为多个分区,分区内部的消息...

Global site tag (gtag.js) - Google Analytics