`

Kafka的Producer和Consumer的示例(使用Java语言)

 
阅读更多

http://www.tuicool.com/articles/uu6r2e

 

 

我使用的kafka版本是:0.7.2

jdk版本是:1.6.0_20

http://kafka.apache.org/07/quickstart.html 官方给的示例并不是很完整,以下代码是经过我补充的并且编译后能运行的。

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

Kafka使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm

Producer Code

import java.util.*;

import kafka.message.Message;

import kafka.producer.ProducerConfig;

import kafka.javaapi.producer.Producer;

import kafka.javaapi.producer.ProducerData;

public class ProducerSample {

public static void main(String[] args) {

ProducerSample ps = new ProducerSample();

Properties props = new Properties();

props.put("zk.connect", "127.0.0.1:2181");

props.put("serializer.class", "kafka.serializer.StringEncoder");

ProducerConfig config = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(config);

ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message2");

producer.send(data);

producer.close();

}

}

Consumer Code

import java.nio.ByteBuffer;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.Message;

import kafka.message.MessageAndMetadata;

public class ConsumerSample {

public static void main(String[] args) {

// specify some consumer properties

Properties props = new Properties();

props.put("zk.connect", "localhost:2181");

props.put("zk.connectiontimeout.ms", "1000000");

props.put("groupid", "test_group");

// Create the connection to the cluster

ConsumerConfig consumerConfig = new ConsumerConfig(props);

ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

// create 4 partitions of the stream for topic “test-topic”, to allow 4 threads to consume

HashMap<String, Integer> map = new HashMap<String, Integer>();

map.put("test-topic", 4);

Map<String, List<KafkaStream<Message>>> topicMessageStreams =

consumerConnector.createMessageStreams(map);

List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");

// create list of 4 threads to consume from each of the partitions ExecutorService executor = Executors.newFixedThreadPool(4);

// consume the messages in the threads

for (final KafkaStream<Message> stream : streams) {

executor.submit(new Runnable() {

public void run() {

for (MessageAndMetadata msgAndMetadata : stream) {

// process message (msgAndMetadata.message())

System.out.println("topic: " + msgAndMetadata.topic());

Message message = (Message) msgAndMetadata.message();

ByteBuffer buffer = message.payload();

<SPAN style="WHITE-SPACE: pre"> </SPAN>byte[] bytes = new byte[message.payloadSize()];

buffer.get(bytes);

String tmp = new String(bytes);

System.out.println("message content: " + tmp);

}

}

});

}

分别启动zookeeper,kafka server之后,依次运行Producer,Consumer的代码

运行ProducerSample:

运行ConsumerSample:

更多详情见请继续阅读下一页的精彩内容http://www.linuxidc.com/Linux/2014-09/107385p2.htm

分享到:
评论

相关推荐

    kafka-java-demo 基于java的kafka生产消费者示例

    在"Kafka-java-demo"中,你将看到如何使用这些接口来实现一个简单的生产者和消费者示例。 【Kafka Producer】 Kafka生产者是负责将数据发布到Kafka主题的组件。在Java中,我们可以创建一个Producer实例,配置相关...

    spring-kafka-producer-consumer-example_java_springboot_kafka_

    标题中的“spring-kafka-producer-consumer-example”表明这是一个关于Spring Boot应用,它使用了Apache Kafka作为消息中间件,展示了生产者(producer)和消费者(consumer)的实现。描述中的“Simple application ...

    Springboot集成Kafka实现producer和consumer的示例代码

    这只是一个基本的示例,实际使用时可以根据需求进行更复杂的配置,如定制序列化器、设置回调函数、处理异常等。同时,确保 Kafka 服务和 ZooKeeper 正常运行,以便于 Spring Boot 应用程序能够顺利与 Kafka 集群交互...

    kafka java 下载的jar

    Kafka的核心组件包括生产者(Producer)、消费者(Consumer)和代理(Broker)。生产者负责发布消息到主题(Topic),消费者则订阅并消费这些消息,而代理是Kafka集群中的节点,它们接收、存储和转发消息。 在Java...

    kafka-java-demo 基于java的kafka生产消费者例子

    - **Consumer示例**:演示如何创建一个Kafka消费者,订阅特定主题,接收并处理来自生产者的消息。 **4. Kafka生产者实现** 生产者的主要任务是将数据写入Kafka。在Java中,这通常涉及以下几个步骤: 1. 创建`...

    Kafka-Simple-Producer-Consumer:使用Java 8的kafka的生产者和消费者的简单变化

    在"Kafka-Simple-Producer-Consumer-master"这个项目中,可能包含了示例代码,展示了如何使用Java 8编写Kafka的生产者和消费者。这些示例通常会包含以下部分: - 生产者类(ProducerClass.java):创建并配置...

    kafka安装相关文件以及java调用kafka示例项目

    在这个“kafka安装相关文件以及java调用kafka示例项目”中,我们包含了几个关键组件和示例,以便于理解和实践Kafka的使用。 首先,我们有Kafka的安装包,这通常包含服务器端的二进制文件,配置文件,以及必要的脚本...

    kafka学习代码(java开发kafka)

    在" kafka-study "压缩包中,你可能会找到一些示例代码,这些代码展示了如何使用Java API创建Kafka生产者和消费者,以及如何处理错误和配置选项。通过研究这些示例,你可以更好地理解和应用Kafka的Java开发实践。...

    Kafka中生产者和消费者java实现

    本篇文章将深入探讨如何在Java环境中使用IDEA,通过Maven构建工具来实现Kafka的生产者和消费者。 首先,我们需要设置项目环境。使用IntelliJ IDEA创建一个新的Java Maven项目,然后在pom.xml文件中添加Kafka相关的...

    kafka 发送和接收消息-Java版

    在本文中,我们将深入探讨如何使用Java来发送和接收消息到Apache Kafka,这是一个流行的分布式流处理平台。Apache Kafka被广泛用于构建实时数据管道和流应用,因为它提供了高吞吐量、低延迟的消息传递能力。 首先,...

    java实现flink订阅Kerberos认证的Kafka消息示例源码

    - 对于生产者,创建一个`FlinkKafkaProducer`实例,同样需要包含Kerberos配置。 - 生产者需要在发送消息前完成Kerberos认证。 5. **处理票证刷新** - Kerberos票证有有效期,需要定期刷新。在Java中,可以通过`...

    Kafka示例代码

    通过提供的Kafka示例代码,你可以学习如何在Java和Scala环境中设置和操作Kafka的生产者和消费者,理解Kafka与Hadoop的集成方式,以及如何在Scala这样的函数式语言中优雅地使用Kafka API。这些知识对于构建实时数据...

    kafka-producer-consumer

    在Kafka-producer-consumer项目中,"kafka-producer-consumer-main"可能是一个包含启动生产者和消费者示例的主程序。生产者会将消息发布到一个或多个主题,而消费者则订阅这些主题并处理接收到的消息。这种交互方式...

    kafka生产及消费示例代码,下到就是赚到

    这个"Kafka生产及消费示例代码"压缩包显然包含了一些帮助学习和理解Kafka基本操作的代码实例,包括如何创建生产者发送消息以及如何创建消费者接收消息。 首先,我们来了解一下Kafka的基本概念。Kafka是一个高吞吐量...

    Kafka使用Java客户端进行访问的示例代码

    Kafka 使用 Java 客户端进行访问的示例代码 ...本文介绍了使用 Java 客户端来访问 Kafka 的示例代码,包括生产者代码和消费者代码。这些代码可以作为开发者使用 Java 客户端来访问 Kafka 的参考。

    kafka java小例子

    在本文中,我们将深入探讨如何使用Java来实现Apache Kafka,这是一个强大的分布式消息系统。Kafka以其高吞吐量、持久...记住,理解和掌握Kafka的核心概念以及Java API的使用,对于在大数据和实时分析领域工作至关重要。

    apache-kafka-1.0.0 java Demo

    在这个"apache-kafka-1.0.0 java Demo"中,我们将探讨如何使用Java API与Kafka进行交互,实现基本的消息生产与消费功能。这个Demo虽然没有采用连接池来优化性能,但依然能展示Kafka的核心特性。 首先,我们要了解...

    spring boot操作kafka例子

    Spring Boot简化了Kafka的配置和使用,使得开发者能够快速地构建基于事件驱动的应用程序。 首先,让我们理解Kafka的基本概念。Apache Kafka是一个分布式流处理平台,它允许我们发布和订阅实时数据流。它具有高吞吐...

    KafkaDemo示例

    4. **编写生产者代码**: 创建一个Java应用程序,使用Kafka的Producer API发送消息到刚才创建的主题。 5. **编写消费者代码**: 创建另一个Java应用程序,定义消费者并订阅该主题,使用Consumer API读取消息。 6. **...

    kafka-consumer-producer:卡夫卡0.8.2.2的简单示例

    Kafka-consumer-producer示例会演示如何使用Java API在Kafka 0.8.2.2版本中创建生产者和消费者,以及如何进行消息的发布和消费。通过这个例子,开发者可以了解Kafka的基本工作流程,并为构建自己的实时数据处理系统...

Global site tag (gtag.js) - Google Analytics