`

kafka java 生产消费程序示例

 
阅读更多
自已测试通过的


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();
        //zookeeper 配置
        //props.put("zookeeper.connect", "103.29.134.193:2181");
        props.put("zookeeper.connect", "103.7.221.141:2181");
        //group 代表一个消费组
        props.put("group.id", "jd-group");

        //zk连接超时
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        //序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("srp_word", new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("srp_word").get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        //进行入库操作
        while (it.hasNext()) {

            System.out.println("=====标示:" + it.next().message());
        }
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}


上面上单线程的处理端,但是在实际的应用中,只有多线程的处理才能够提高性能。
 
 void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//传入参数,为分区数量,多线程取多分区
        topicCountMap.put("srp_word", paritonsNum);

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
        List<KafkaStream<String, String>> streamList = consumerMap.get("srp_word");

//启动多个线程来处理list
ExecutorService threadpool = Executors.newFixedThreadPool(paritonsNum);
for(KafkaStream<String, String> partition : streamList){
threadPool.execute(new MessageRunner(partition));
}
      
分享到:
评论

相关推荐

    kafka java 生产消费程序 demo 示例

    kafka java 生产消费程序 demo 示例 kafka 是吞吐量巨大的一个消息系统,它是用 scala 写的,和普通的消息的生产消费还有所不同,写了个 demo 程序供大家参考。kafka 的安装请参考官方文档。 首先我们需要新建一个 ...

    kafka生产和消费示例

    总结来说,"kafka生产和消费示例"这个项目展示了如何在Spring MVC应用中利用Spring for Apache Kafka库进行消息的生产和消费。通过创建定时任务生产消息和监听主题消费消息,我们可以实现数据的实时传输和处理,这...

    kafka-java-demo 基于java的kafka生产消费者例子

    "Kafka-java-demo"项目是一个简单的示例,演示了如何使用Java API创建Kafka生产者和消费者。项目中通常会包含以下关键部分: - **Producer示例**:展示如何创建一个Kafka生产者,设置必要的配置(如服务器地址、...

    kafka生产及消费示例代码,下到就是赚到

    这个"Kafka生产及消费示例代码"压缩包显然包含了一些帮助学习和理解Kafka基本操作的代码实例,包括如何创建生产者发送消息以及如何创建消费者接收消息。 首先,我们来了解一下Kafka的基本概念。Kafka是一个高吞吐量...

    java kafka 生产者/消费者demo

    Java Kafka的生产者/消费者示例展示了如何在Java应用程序中使用Kafka进行消息传递。Kafka与Zookeeper的协作确保了集群的稳定运行,而与MySQL的集成则提供了消息的持久化保障。理解并掌握这些基本概念和操作,对于...

    Java实现Kafka生产者消费者代码实例

    在Java中实现Kafka的生产者和消费者,需要依赖Kafka提供的客户端库。Maven项目中通常需要在pom.xml文件中加入kafka-clients依赖,如下: ```xml &lt;groupId&gt;org.apache.kafka &lt;artifactId&gt;kafka-clients ...

    kafka java小例子

    接下来,我们将通过一个简单的Java程序来演示如何使用Kafka。在"**kafkaDemoTest**"这个项目中,我们通常会包含以下两个关键部分:生产者(Producer)和消费者(Consumer)。 1. **生产者实现**: 要创建一个Kafka...

    kafka实例 消费者生产者

    在本文中,我们将深入探讨Apache Kafka,一个分布式流处理平台,以及如何在其中创建消费者和生产者实例。Kafka是一个高度可扩展、高吞吐量的消息队列系统,广泛用于实时数据流处理和大数据分析。 首先,让我们了解...

    kafka生产消费demo

    在本文中,我们将深入探讨如何使用Apache Kafka 0.10.2版本的API来创建一个简单的生产者和消费者示例。Kafka是一种分布式流处理平台,常用于实时数据管道和消息传递,它允许应用程序高效地发布和订阅大量数据流。 #...

    kafka学习代码(java开发kafka)

    在" kafka-study "压缩包中,你可能会找到一些示例代码,这些代码展示了如何使用Java API创建Kafka生产者和消费者,以及如何处理错误和配置选项。通过研究这些示例,你可以更好地理解和应用Kafka的Java开发实践。...

    kafka java单线程,多线程,多线程管理器代码

    在本文中,我们将深入探讨Apache Kafka的Java编程实践,特别是关注单线程和多线程在Kafka生产者与消费者中的应用,以及多线程管理器的实现。Apache Kafka是一个分布式流处理平台,广泛用于实时数据管道和消息传递。...

    kettle kafka 消息生产插件

    标题中的“kettle kafka 消息生产插件”指的是Pentaho Data Integration(通常称为Kettle或PDI)中的一款插件,它允许用户通过Kettle工作流将数据发布到Apache Kafka分布式消息系统。Kafka是一种高效、可扩展且容错...

    Kafka示例代码

    通过提供的Kafka示例代码,你可以学习如何在Java和Scala环境中设置和操作Kafka的生产者和消费者,理解Kafka与Hadoop的集成方式,以及如何在Scala这样的函数式语言中优雅地使用Kafka API。这些知识对于构建实时数据...

    spring boot操作kafka例子

    通过以上步骤,我们已经掌握了如何在Spring Boot应用中利用Kafka进行消息生产和消费。在实际开发中,我们还可以利用Spring Boot的特性,如AOP(面向切面编程)、事务管理等,来实现更复杂的消息处理逻辑。此外,还...

    springboot整合kafka的发布/消费demo项目源码

    在本文中,我们将深入探讨如何将Spring Boot与Apache Kafka整合,以便实现发布/...这个"springboot整合kafka的发布/消费demo项目源码"提供了一个基础的示例,帮助开发者快速上手并理解Kafka在Spring Boot中的工作方式。

    spring-kafka-producer-consumer-example_java_springboot_kafka_

    描述中的“Simple application demonstrate kafka java springboot”进一步确认了这是一个基于Java和Spring Boot的简单示例,用于演示Kafka的功能。 Apache Kafka是一种高吞吐量、分布式的发布/订阅消息系统,它被...

    springboot整合kafka,指定分区发送,批量消费,指定topic分区消费

    在本文中,我们将深入探讨如何在Spring Boot 2.x应用程序中整合Apache Kafka,重点是实现指定分区发送、批量消费以及指定topic分区消费的功能。Apache Kafka是一个分布式流处理平台,广泛用于构建实时数据管道和流...

    spark streamming消费kafka数据存入hbase示例代码

    这个示例代码是用 Scala 编写的,用于演示如何使用 Spark Streaming 消费来自 Kafka 的数据,并将这些数据存储到 HBase 数据库中。Kafka 是一个分布式流处理平台,而 HBase 是一个基于 Hadoop 的非关系型数据库,...

    apache-kafka-1.0.0 java Demo

    这个Demo虽然简单,但它演示了Kafka最基础的生产和消费流程。实际应用中,可能需要考虑更多因素,例如错误处理、幂等性、连接池的使用等。通过这个例子,你可以进一步学习Kafka的高级特性,比如分区策略、幂等性生产...

Global site tag (gtag.js) - Google Analytics