`
wandejun1012
  • 浏览: 2729833 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

kafka java demo

 
阅读更多

环境:ubuntu10 2台(32位)+JDK1.8(32位)+kafka2.11+Intellij15

 

目标:Java启动一个Producer,启动一个Consumer,Linux启动一个Consumer.

观察3者是否能相互通信。

 

注意到,Java的Producer和Consumer全是用maven构建的,父项目是kafka_demo,他们两个是module.

 

1、Java Producer Demo:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Created by Germmy on 2016/7/10.
 */
public class KafkaProducer {

    private  final Producer<String,String> producer;

    public final static String TOPIC="TEST-TOPIC";

    private  KafkaProducer(){
        Properties props=new Properties();
        props.put("metadata.broker.list","192.168.200.129:9092");
        props.put("serializer.class","kafka.serializer.StringEncoder");
        props.put("key.serializer.class","kafka.serializer.StringEncoder");
        props.put("request.required.acks","-1");
        producer=new Producer<String, String>(new ProducerConfig(props)) ;
    }


    void produce(){
        int messageNo=1000;
        final int COUNT=10000;
        while(messageNo<COUNT){
            String key=String.valueOf(messageNo);
            String data="hello kafka message"+key;
            producer.send(new KeyedMessage<String,String>(TOPIC,key,data));
            System.out.println(data);
            messageNo++;
        }
    }

    public  static  void main(String[] args){
        new KafkaProducer().produce();
    }

}

 

2、Java Consumer Demo:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Created by Germmy on 2016/7/10.
 */
public class KafkaConsumer {


    private  final ConsumerConnector consumer;

    public final static String TOPIC="TEST-TOPIC";

    private KafkaConsumer(){
        Properties props=new Properties();
        props.put("zookeeper.connect","192.168.200.129:2181");
        props.put("group.id","jd-group");//消费组是什么概念?

        props.put("zookeeper.session.timeout.ms","60000");
        props.put("zookeeper.sync.time.ms","200");
        props.put("auto.commit.interval.ms","1000");
        props.put("auto.offset.reset","smallest");

        props.put("serializer.class","kafka.serializer.StringEncoder");

        ConsumerConfig config=new ConsumerConfig(props);

        consumer=kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }


    void consume(){
        Map<String,Integer> topicCountMap=new HashMap<String, Integer>();
        topicCountMap.put(KafkaConsumer.TOPIC,new Integer(1));
        StringDecoder keyDecoder=new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder=new StringDecoder(new VerifiableProperties());

        Map<String,List<KafkaStream<String,String>>> consumerMap=
                consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);

        KafkaStream<String,String> stream=consumerMap.get(KafkaConsumer.TOPIC).get(0);
        ConsumerIterator<String,String> it=stream.iterator();
        while(it.hasNext()){
            System.out.println(it.next().message());
        }
    }


    public  static  void main(String[] args){
        new KafkaConsumer().consume();
    }

}

 

注意到,Consumer连接zookeeper的超时时间需要设置长一点,之前的版本是4秒,会报连接超时异常。我这里设置的是60S,参考链接

 

 

3、父POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.germmy</groupId>
    <artifactId>kafkademo</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>kafka_producer</module>
        <module>kafka_consumer</module>
    </modules>

    <dependencies>

        <!--<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>-->

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.0</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>


</project>

 

 

注意到,这里引入的client客户端要去除对jms的依赖,否则会报如下错:

Could not transfer artifact com.sun.jdmk:jmxtools:jar:1.2.1 from/to java.net (https://maven-repository.dev.java.net/nonav/repository): No connector available to access repository java.net (https://maven-repository.dev.java.net/nonav/repository) of type legacy using the available factories 

overstack上的解决的方案

 

 

至此,demo已经成功运行。

 

4、遗留的问题如下:

      * Producer用mvn打包失败,具体原因待查。

      * props.put("group.id","jd-group");//消费组的概念待明确。

      * intellij启动多个main后,console是直接TAB在一起的,根本不要显式切换。

      * alt+1,直接打开projectViews.

      * 点击左下角的正方形,可以打开或者关闭所有的侧边栏。

      * 关于用0.8的client报jms的错误问题,有人说是因为它去maven2的仓库中了,所以要将repositories设置指向maven3的仓库。参考链接

      * 用intellij有时maven不会自动下载依赖,此时可以用cmd直接敲mvn compile命令试试。

      * 在git命令中已经和远程仓库关联,但是在intellij中还不知道如何关联。

 

 

 

-------------------------------------------------------------------------------------------------

本文参考的java demo

 

 

 

分享到:
评论

相关推荐

    kafka-java-demo 基于java的kafka生产消费者示例

    除了基本的生产和消费功能,Kafka还支持一些高级特性,如幂等性生产者、事务性消费者、连接器(Connectors)以及Kafka Streams,这些都可能在"Kafka-java-demo"中有所体现,帮助你更好地理解和应用Kafka。...

    kafka-java-demo 基于java的kafka生产消费者例子

    在本文中,我们将深入探讨基于Java的Kafka生产者与消费者的实现,这主要围绕着"Kafka-java-demo"项目展开。Kafka是一个分布式流处理平台,由LinkedIn开发并开源,现在是Apache软件基金会的一部分。它被广泛用于实时...

    SpingBoot中使用Kafka的Demo

    为了让这个Demo运行起来,你需要在主应用类(例如`Application.java`)中添加以下代码,启动Kafka消费者: ```java import org.springframework.boot.SpringApplication; import org.springframework.boot....

    storm集成kafka插demo.zip

    【标题】"storm集成kafka插demo.zip"指的是一个演示如何将Apache Storm与Apache Kafka集成的实例项目。这个压缩包包含了一个示例,用于展示如何在Storm拓扑中消费和处理Kafka的消息。 【描述】"storm集成kafka插件...

    Spirng整合Kafka的Demo

    Spring框架提供了Spring Kafka模块,使得开发者能够轻松地在Java应用中集成Apache Kafka。在这个"Spring整合Kafka的Demo"中,我们可以学习到如何配置、创建生产者和消费者,以及如何在Spring应用中使用Kafka的API。 ...

    springboot-kafka-simple-demo

    本示例"springboot-kafka-simple-demo"旨在帮助开发者了解如何在SpringBoot应用中集成和使用Kafka。 首先,我们需要了解Kafka的基本概念。Kafka是一个高吞吐量、低延迟的消息队列,它可以处理PB级别的数据,适用于...

    KafkaConsumerDemo.java

    KafkaConsumerDemo.java

    kafka_demo.rar

    下载`kafka_demo.rar`压缩包,解压后导入IDE,确保本地已经运行了Kafka服务,然后运行主类启动SpringBoot应用。此时,你可以观察到生产者发送的消息被消费者定时接收并打印,验证了SpringBoot整合Kafka的功能正常...

    kafka-demo.rar

    使用java客户端, kafka-producer, kafka-consumer进行kafka的连接 注: 0.10 版本之后, 连接kafka只需要brokerip即可, 不需要zookeeper的信息

    apache-kafka-1.0.0 java Demo

    在这个"apache-kafka-1.0.0 java Demo"中,我们将探讨如何使用Java API与Kafka进行交互,实现基本的消息生产与消费功能。这个Demo虽然没有采用连接池来优化性能,但依然能展示Kafka的核心特性。 首先,我们要了解...

    springMVC+多线程+kafka的 demo基于maven

    在本项目中,我们探索了如何将Spring MVC框架与多线程、线程池和Apache Kafka集成,构建一个高效...通过修改配置文件中的Kafka集群IP地址,这个demo就可以在不同的环境中运行,展示了这三者在实际应用中的集成和作用。

    springboot-kafka-demo.zip

    SpringBoot与Kafka的整合是现代Java开发中常见的任务,特别是在构建实时数据处理或消息传递系统时。SpringBoot以其简洁的配置和强大的依赖管理而受到欢迎,而Apache Kafka则是一个高性能、分布式的发布/订阅消息系统...

    kafkaConsumerDemo.zip

    `kafkaConsumerDemo`可能是一个简单的Java应用,演示了如何使用Kafka Consumer API订阅主题、接收消息并处理它们。这个项目可能包含以下关键组件: 1. **配置**:设置消费者配置,如`bootstrap.servers`、`group.id...

    KafkaDemo示例

    **KafkaDemo示例详解** Kafka是一种分布式流处理平台,由LinkedIn开发并贡献给了Apache软件基金会,现在已经成为大数据领域中的重要组件。它主要用于构建实时数据管道和流应用,能够处理大量的实时数据。在这个名为...

    spring-kafka-demo

    Spring Kafka 是 Spring 框架为集成 Apache Kafka 提供的一个模块,它使得在 Java 应用程序中使用 Kafka 变得简单且直观。本项目 "spring-kafka-demo" 提供了一个完整的示例,涵盖了生产者和消费者的配置与实现,...

    kafka2.11demo

    本示例将带你深入理解Kafka 2.11版本的操作,通过`Consumertest.java`、`Producertest.java`和`UserKafkaProducer.java`三个关键文件,我们将探索如何构建生产者和消费者,以及如何实际应用Kafka进行数据传输。...

    kafka demo

    在本Kafka Demo中,我们将通过Java代码来演示如何创建生产者发送消息以及构建消费者接收消息,以深入理解Kafka的工作原理。 一、Kafka基本概念 1. 主题(Topic):Kafka中的数据是按照主题进行组织的,主题是逻辑上...

    kafka java 生产消费程序 demo 示例

    kafka java 生产消费程序 demo 示例 kafka 是吞吐量巨大的一个消息系统,它是用 scala 写的,和普通的消息的生产消费还有所不同,写了个 demo 程序供大家参考。kafka 的安装请参考官方文档。 首先我们需要新建一个 ...

    springboot后端kafka demo

    `kafkaDemo`可能是一个简单的Spring Boot应用,它集成了Kafka的相关配置和API,用于发送消息到Kafka主题。在Spring Boot中,我们通常会使用`@EnableKafka`注解开启Kafka的支持,然后通过`@Service`或`@Component`...

Global site tag (gtag.js) - Google Analytics