`
liyonghui160com
  • 浏览: 777204 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

kafka开发实例

阅读更多

 

 

1.启动kafka。

//启动zookeeper server (用&是为了能退出命令行):
bin/zookeeper-server-start.sh config/zookeeper.properties  &
//启动kafka server: 
bin/kafka-server-start.sh config/server.properties  &

 

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.1</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.10</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-annotation</artifactId>
            <version>2.2.0</version>
        </dependency>




        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
        <dependency>
            <groupId>zkclient</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>


    </dependencies>

 

 


2.新建一个生产者例子

 

import java.util.Properties;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class KafkaTest {
    public static void main(String[] args) { 
        Properties props = new Properties(); 
        props.put("zk.connect", "10.103.22.47:2181"); 
        props.put("serializer.class", "kafka.serializer.StringEncoder"); 
        props.put("metadata.broker.list", "10.103.22.47:9092");
        props.put("request.required.acks", "1");
        //props.put("partitioner.class", "com.xq.SimplePartitioner");
        ProducerConfig config = new ProducerConfig(props); 
        Producer<String, String> producer = new Producer<String, String>(config); 
        String ip = "192.168.2.3";
        String msg ="this is a messageuuu!";
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", ip,msg); 
        producer.send(data);
        producer.close(); 
    } 
 
}

 

3.新建一个消费者例子

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
 
 
public class ConsumerSample {
 
    public static void main(String[] args) { 
        // specify some consumer properties 
        Properties props = new Properties(); 
        props.put("zookeeper.connect", "10.103.22.47:2181"); 
        props.put("zookeeper.connectiontimeout.ms", "1000000"); 
        props.put("group.id", "test_group"); 
 
            // Create the connection to the cluster 
        ConsumerConfig consumerConfig = new ConsumerConfig(props); 
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig); 
 
 
        Map<String,Integer> topics = new HashMap<String,Integer>(); 
        topics.put("test", 2); 
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = connector.createMessageStreams(topics); 
        List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test");
        ExecutorService threadPool = Executors.newFixedThreadPool(2); 
        for (final KafkaStream<byte[], byte[]> stream : streams) { 
            threadPool.submit(new Runnable() { 
                public void run() { 
                    for (MessageAndMetadata msgAndMetadata : stream) { 
                        // process message (msgAndMetadata.message()) 
                        System.out.println("topic: " + msgAndMetadata.topic()); 
                        Message message = (Message) msgAndMetadata.message(); 
                        ByteBuffer buffer = message.payload(); 
                        byte[] bytes = new byte[message.payloadSize()]; 
                        buffer.get(bytes); 
                        String tmp = new String(bytes); 
                        System.out.println("message content: " + tmp); 
                    } 
                } 
            }); 
        }   
    }
}

 

分享到:
评论

相关推荐

    C#kafka开发实例

    在本文中,我们将深入探讨如何使用C#进行Kafka开发,以及如何实现数据同步。Kafka是一种高效的消息中间件,最初由LinkedIn开发,现在已成为Apache软件基金会的顶级项目。它被设计为支持高吞吐量、低延迟的消息传递,...

    kafka项目实例

    Kafka是一个开源的分布式流处理平台,由LinkedIn开发并贡献给Apache软件基金会。它被设计为可扩展、高吞吐量、低延迟的消息系统,广泛用于实时数据管道和流应用中。在这个“kafka项目实例”中,我们将深入探讨如何...

    Kafka客户端开发实例java源码.zip

    在本压缩包“Kafka客户端开发实例java源码.zip”中,包含的是使用Java语言编写的Kafka客户端应用示例代码。Kafka是一款高吞吐量、分布式的消息系统,广泛应用于大数据实时处理、日志收集等领域。这个实例将帮助我们...

    kafka代码实例

    在IT行业中,Kafka是一种广泛使用的分布式流处理平台,由LinkedIn开发并贡献给了Apache软件基金会。这个"Kafka代码实例"的主题显然指向了如何在实际编程中应用Kafka。Kafka主要被设计用来构建实时数据管道和流应用...

    kafka运行实例

    Kafka是一种分布式流处理平台,由LinkedIn开发并贡献给了Apache软件基金会。它被设计为能够高效地处理大量的实时数据,支持发布订阅模型,并且可以作为消息队列使用。在这个"Kafka运行实例"中,你将找到运行Kafka所...

    kafka开发jar包

    这个“kafka开发jar包”包含了你需要在Java项目中与Kafka交互所必需的库。以下是对这些知识点的详细解释: 1. **Kafka**:Kafka是一种高吞吐量、低延迟的消息队列系统,最初由LinkedIn开发,现在是Apache软件基金会...

    kafka-python开发文档

    kafka-python是Apache Kafka分布式流处理系统的...在开发和维护使用kafka-python的应用程序时,开发者应当定期查看更新的文档,确保其应用程序可以利用到最新的功能和改进,同时确保应用程序与新版本的Kafka兼容。

    Kafka实例Kafka实例

    在IT行业中,Kafka是一个非常重要的分布式流处理平台,由LinkedIn开发并贡献给了Apache软件基金会。Kafka实例是实现大规模实时数据处理的关键组件,广泛应用于日志聚合、流数据分析、消息传递等多个领域。本篇文章将...

    kafka实例资源

    **Kafka实例资源** 在IT行业中,Apache Kafka是一款广泛使用的分布式流处理平台,它被设计为高吞吐、低延迟的消息系统。Kafka的核心功能包括发布订阅消息队列、存储和处理大规模实时数据流。本资源包是针对Kafka...

    Flink在CDH配置部署及读取kafka的实例验证;

    本文主要介绍Flink在Cloudera Distribution Hadoop(CDH)集群上的配置部署流程,以及如何利用Flink从Kafka中读取数据并进行处理的实例验证。在开始前,需要明白几个核心概念及其之间的关系: 1. Flink 是一个开源...

    kafka实例 消费者生产者

    Kafka是由LinkedIn开发并贡献给Apache软件基金会的开源项目。它主要设计为一个发布/订阅消息系统,允许数据生产者将消息发布到主题(topics),然后由数据消费者订阅并消费这些消息。Kafka的数据模型包括生产者、...

    kafka 实例

    这个"Kafka实例"的压缩包文件很可能是提供了一个实际的开发环境,包括消费者和生产者的Java代码示例,适用于在IntelliJ IDEA这样的集成开发环境中运行的Maven项目。下面将详细讨论Kafka、消费者、生产者以及与Maven...

    storm与kafka整合的客户端开发实例java源码.zip

    标题中的"storm与kafka整合的客户端开发实例java源码.zip"表明这是一个关于使用Java语言在Storm和Kafka之间建立集成的案例项目。这个压缩包包含的源代码为我们提供了实现这种集成的具体步骤和方法。 首先,让我们...

    kafka学习代码(java开发kafka)

    在本文中,我们将深入探讨如何使用Java进行Kafka开发,主要基于提供的" kafka-study "压缩包中的学习代码。Kafka是由Apache开发的分布式流处理平台,它被广泛用于构建实时数据管道和流应用程序。Java是Kafka客户端库...

    Kafka .Net Framework4.0 版本

    使用Kafka .Net库,你可以创建生产者和消费者实例,它们是与Kafka集群交互的基本单元。生产者负责生成消息并将其发布到特定的主题,而消费者则订阅这些主题并处理接收到的消息。Kafka .Net支持同步和异步消息发送,...

    kafka生产者消费者实例

    Kafka 是一个高吞吐量、低延迟的分布式发布订阅消息系统,最初由 LinkedIn 开发,并于2011年开源。它设计的目标是能够处理大量的实时数据,支持多个消费者组,同时提供容错性和可扩展性。 **生产者** 在 Kafka 中...

    C#+kafka实例+kafka本地环境

    在本文中,我们将深入探讨如何在C#环境中与Apache Kafka进行交互,并且会介绍如何设置Kafka的本地环境。...记得在实际开发中,要根据生产环境的规模和要求进行适当的调整和优化,确保系统的稳定性和性能。

    kafka-java-demo 基于java的kafka生产消费者示例

    Kafka是由Apache开发的分布式流处理平台,它主要被设计用来处理实时数据流。在大数据处理领域,Kafka常被用于构建实时数据管道和流应用,能够高效地处理大量的实时数据。 【Java与Kafka的结合】 在Java中使用Kafka...

    kafka安装包-2.13-3.6.2

    Apache Kafka是一款高性能、分布式的消息中间件,由LinkedIn开发并捐献给Apache软件基金会。它最初设计的目标是构建一个实时的数据管道,能够高效地处理大量的数据流,同时支持发布订阅和队列模型,使得数据在系统...

    java开发kafka-clients所需要的所有jar包以及源码

    Java开发Kafka客户端是构建基于Apache Kafka的应用程序的关键步骤,Kafka-clients库提供了与Kafka服务器进行交互的API,支持生产者和消费者的实现。在Java中使用Kafka-clients,你需要包含相应的jar包,这些包包含了...

Global site tag (gtag.js) - Google Analytics