`
ganliang13
  • 浏览: 252537 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

kafka之java编程模型

阅读更多
package com.ganglia.kafka;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
   
public class ProducerTest2 {   
     
        public static void main(String[] args) {   
            Properties props = new Properties();   
            props.setProperty("metadata.broker.list","bfdbjc1:9092,test1:9092,test2:9092");   
            props.setProperty("serializer.class","kafka.serializer.StringEncoder");   
            ProducerConfig config = new ProducerConfig(props);   
            Producer<String, String> producer = new Producer<String, String>(config);   
            try {   
                int i =1; 
                while(true){ 
                    i++;
                    String text = new StringBuffer((i+"")).reverse()+":test-kafka_"+args[0]+"_"+i;
                    KeyedMessage<String, String> data = new KeyedMessage<String, String>("test",text);   
                    producer.send(data);   
                    Thread.sleep(100);
                    System.out.println(DateUtil.fmtDateToYMDHMS(new Date())+"\t"+text);
                } 
            } catch (Exception e) {   
                e.printStackTrace();   
            }   
            producer.close();   
        }   
}

 

1.安装zookeeper.

2.启动zookeeper.

3.启动kafka服务, 在zk1,zk2,zk3上分别运行:
   kafka-server-start.sh  ../config/server.properties /启动kafka
4. 新建一个TOPIC(replication-factor=num of brokers)
   kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181
5.假设我们在zk2上,开一个终端,发送消息至kafka
   kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test
  在发送消息的终端输入:Hello Kafka
6.假设我们在zk3上,开一个终端,显示消息的消费(zk3模拟consumer)
   kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning

 

 

package com.ganglia.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
   
public class TestConsumer extends Thread{   
        private final ConsumerConnector consumer;   
        private final String topic;   
     
        public static void main(String[] args) {   
            TestConsumer consumerThread = new TestConsumer("test");   
            consumerThread.start();   
        }   
        public TestConsumer(String topic) {   
            consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());   
            this.topic =topic;   
        }   
     
    private static ConsumerConfig createConsumerConfig() {   
        Properties props = new Properties();   
        props.put("zookeeper.connect","test1:2181,test2:2181,bfdbjc1:2181");   
        props.put("group.id", "0");   
        props.put("zookeeper.session.timeout.ms","10000");   
        return new ConsumerConfig(props);   
    }   
     
    public void run(){   
        Map<String,Integer> topickMap = new HashMap<String, Integer>();   
        topickMap.put(topic, 1);   
        Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap =consumer.createMessageStreams(topickMap);   
        KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);   
        ConsumerIterator<byte[],byte[]> it =stream.iterator();   
        System.out.println("*********Results********");   
        while(true){   
            if(it.hasNext()){ 
               /* MessageAndMetadata<byte[], byte[]> mm = it.next();
                System.err.println("get data:" +new String(mm.message())); */  
                System.err.println("get data:" +new String(it.next().message()));
            } 
             
        }   
    }   
}

 

  

 

 

 

分享到:
评论

相关推荐

    kafka集群Java开发jar包

    7. **Kafka Streams**:对于更复杂的流处理任务,Java开发者还可以利用Kafka提供的`KafkaStreams`库,它提供了一种编程模型,用于构建状态ful的、容错的流处理应用。 了解并熟练掌握以上知识点,对于在Java中进行...

    Java面试题+Java并发编程(J.U.C)+Java8实战+Redis+kafka

    Java面试题+Java并发编程(J.U.C)+Java8实战+Redis+kafka Java 『必看』2021 版最新Java 学习路线图(持续刷新):+1::+1::+1: Java入门面试题 Java基础入门80问,适合新手,老鸟直接跳过 Java并发编程(J.U.C) ...

    kafka java单线程,多线程,多线程管理器代码

    在本文中,我们将深入探讨Apache Kafka的Java编程实践,特别是关注单线程和多线程在Kafka生产者与消费者中的应用,以及多线程管理器的实现。Apache Kafka是一个分布式流处理平台,广泛用于实时数据管道和消息传递。...

    使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据

    1. **Netty**:Netty提供了丰富的网络编程模型,包括TCP和UDP协议的支持。TCP长连接是一种保持连接状态,允许多次数据交换的通信方式。在Netty中,我们可以创建一个ServerBootstrap实例来启动服务器,并配置...

    springMVC+多线程+kafka的 demo基于maven

    - 它支持RESTful编程模型,允许创建HTTP服务,方便前后端分离。 - Spring MVC通过依赖注入(DI)和面向切面编程(AOP)提供灵活的控制反转,使代码更易于测试和维护。 2. **多线程**: - 多线程是指在一个程序中...

    kafka以及依赖安装包jdk+zookeeper+kafka

    JDK是Java编程语言的运行环境,是运行Kafka和Zookeeper所必需的。 **JDK** JDK(Java Development Kit)是开发和运行Java应用程序的必备工具。这里的`jdk1.8.0_144.tar.gz`是Oracle JDK的一个版本,适用于Linux...

    Java实现Kafka数据生产者

    "Java实现Kafka数据生产者"这个功能可以让你使用Java编程语言创建一个能够连接到指定的Kafka集群(通过bootstrap servers)并向其中发送数据的应用程序。通过这个功能,你可以将数据以消息的形式发送到Kafka主题中,...

    kafka实例资源

    4. **灵活性**:Kafka支持发布/订阅模型,也可以用作消息队列,同时提供API供各种编程语言使用。 **二、SpringBoot与Kafka集成** SpringBoot简化了Java应用程序的开发,而Spring for Apache Kafka提供了与Kafka...

    Kafka与spring集成

    Spring Framework是一种流行的Java应用程序框架,提供了一个广泛的编程模型和配置机制。将Kafka与Spring集成,可以实现实时数据处理、日志处理和消息队列等功能。本文将详细介绍如何将Kafka与Spring集成,并提供了...

    kafka全套资源环境+demo

    总结来说,这个"Kafka全套资源环境+demo"是一个全面的学习资源,它覆盖了从安装配置到实际编程的所有环节,对于想深入了解和使用Kafka的Java开发者来说,是一份极具价值的参考资料。通过深入研究和实践,开发者不仅...

    kafka大数据 生产者消费者实例

    在这个"Kafka大数据 生产者消费者实例"中,我们将探讨如何通过Java编程语言来实现Kafka的生产者和消费者。 首先,我们要理解Kafka中的**生产者(Producer)**,它是负责发布消息到特定主题的组件。在Java中,我们...

    java实现SparkSteamming接受发送Kafka消息

    3. **Java与Spark Streaming**:Spark Streaming API支持多种编程语言,包括Java。在Java中,我们使用`JavaDStream`接口来表示连续的数据流,并通过`JavaInputDStream`从各种数据源(如Kafka)接收数据。 4. **配置...

    Kafka 消息队列(高清版)深入理解Kafka:核心设计与实践原理.zip

    在Java编程环境下,Kafka以其高效、可扩展性和容错性赢得了开发者们的青睐。 1. **Kafka基本概念** - **主题(Topic)**:Kafka中的数据以主题的形式存在,主题是逻辑上的分类,可以看作是消息的分类目录。 - **...

    springboot和kafka的集成

    通过`spring-boot-starter-data-kafka`起步依赖,我们可以快速创建生产者和消费者,利用Spring的注解驱动特性简化编程模型。同时,利用Kafka的强大功能,如高吞吐量、持久化和复制,可以在分布式系统中构建可靠的...

    kafka实战pdf

    3. **数据模型**:深入探讨Kafka的数据模型,包括如何创建和管理主题,以及如何分配分区和副本,以实现数据的水平扩展和容错性。 4. **生产者API**:学习如何使用Java或Scala等语言的Kafka生产者API,发送消息到...

    Java编程语言在大数据开发中的应用探究.pdf

    1. 学习与精通Java:对于大数据开发者来说,熟悉Java语言是基础,包括理解其语法、类库和并发编程模型,以便有效地利用Java进行大数据处理。 2. 利用Java生态系统:Java拥有丰富的开源库和框架,如Apache Spark、...

    kafka_2.13-2.8.1.tgz

    Scala是一种多范式的编程语言,与Java API兼容,是构建大规模并发系统的好选择,因其强大的函数式编程特性,使得编写Kafka这样的分布式系统更为便捷。 4. **文件结构解析** 解压kafka_2.13-2.8.1.tgz后,我们会...

    kafka-java-app

    【标签】"Java" 明确指出该项目是用Java编程语言编写的,这意味着它遵循Java的编程规范,使用了Java的相关库和框架。由于Java具有跨平台性,这个应用可以在多种操作系统上运行。开发者可能会使用Spring Boot等框架来...

    大数据 分布式 读写 kafka

    6. Java编程:可能包含用Java实现的Kafka客户端代码。 7. 示例代码和教程:适合学习者理解如何在实际项目中使用Kafka进行读写操作。 为了深入学习这些知识点,可以解压“Kafaka_Opt”文件,查看项目结构,阅读源...

    spring-kafka源代码

    它通过Spring的声明式编程模型,使得开发者可以轻松地在应用中添加消息传递功能。在源代码层面,Spring Kafka主要由以下几个模块组成: 1. `org.springframework.kafka.core`:这是Spring Kafka的核心模块,包含...

Global site tag (gtag.js) - Google Analytics