http://www.tuicool.com/articles/uu6r2e
我使用的kafka版本是:0.7.2
jdk版本是:1.6.0_20
http://kafka.apache.org/07/quickstart.html 官方给的示例并不是很完整,以下代码是经过我补充的并且编译后能运行的。
分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm
Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm
Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm
Kafka使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm
Producer Code
import java.util.*;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
public class ProducerSample {
public static void main(String[] args) {
ProducerSample ps = new ProducerSample();
Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message2");
producer.send(data);
producer.close();
}
}
Consumer Code
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
public class ConsumerSample {
public static void main(String[] args) {
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
// create 4 partitions of the stream for topic “test-topic”, to allow 4 threads to consume
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put("test-topic", 4);
Map<String, List<KafkaStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");
// create list of 4 threads to consume from each of the partitions ExecutorService executor = Executors.newFixedThreadPool(4);
// consume the messages in the threads
for (final KafkaStream<Message> stream : streams) {
executor.submit(new Runnable() {
public void run() {
for (MessageAndMetadata msgAndMetadata : stream) {
// process message (msgAndMetadata.message())
System.out.println("topic: " + msgAndMetadata.topic());
Message message = (Message) msgAndMetadata.message();
ByteBuffer buffer = message.payload();
<SPAN style="WHITE-SPACE: pre"> </SPAN>byte[] bytes = new byte[message.payloadSize()];
buffer.get(bytes);
String tmp = new String(bytes);
System.out.println("message content: " + tmp);
}
}
});
}
分别启动zookeeper,kafka server之后,依次运行Producer,Consumer的代码
运行ProducerSample:
运行ConsumerSample:
更多详情见请继续阅读下一页的精彩内容 : http://www.linuxidc.com/Linux/2014-09/107385p2.htm
相关推荐
在"Kafka-java-demo"中,你将看到如何使用这些接口来实现一个简单的生产者和消费者示例。 【Kafka Producer】 Kafka生产者是负责将数据发布到Kafka主题的组件。在Java中,我们可以创建一个Producer实例,配置相关...
标题中的“spring-kafka-producer-consumer-example”表明这是一个关于Spring Boot应用,它使用了Apache Kafka作为消息中间件,展示了生产者(producer)和消费者(consumer)的实现。描述中的“Simple application ...
这只是一个基本的示例,实际使用时可以根据需求进行更复杂的配置,如定制序列化器、设置回调函数、处理异常等。同时,确保 Kafka 服务和 ZooKeeper 正常运行,以便于 Spring Boot 应用程序能够顺利与 Kafka 集群交互...
Kafka的核心组件包括生产者(Producer)、消费者(Consumer)和代理(Broker)。生产者负责发布消息到主题(Topic),消费者则订阅并消费这些消息,而代理是Kafka集群中的节点,它们接收、存储和转发消息。 在Java...
- **Consumer示例**:演示如何创建一个Kafka消费者,订阅特定主题,接收并处理来自生产者的消息。 **4. Kafka生产者实现** 生产者的主要任务是将数据写入Kafka。在Java中,这通常涉及以下几个步骤: 1. 创建`...
在"Kafka-Simple-Producer-Consumer-master"这个项目中,可能包含了示例代码,展示了如何使用Java 8编写Kafka的生产者和消费者。这些示例通常会包含以下部分: - 生产者类(ProducerClass.java):创建并配置...
在这个“kafka安装相关文件以及java调用kafka示例项目”中,我们包含了几个关键组件和示例,以便于理解和实践Kafka的使用。 首先,我们有Kafka的安装包,这通常包含服务器端的二进制文件,配置文件,以及必要的脚本...
在" kafka-study "压缩包中,你可能会找到一些示例代码,这些代码展示了如何使用Java API创建Kafka生产者和消费者,以及如何处理错误和配置选项。通过研究这些示例,你可以更好地理解和应用Kafka的Java开发实践。...
本篇文章将深入探讨如何在Java环境中使用IDEA,通过Maven构建工具来实现Kafka的生产者和消费者。 首先,我们需要设置项目环境。使用IntelliJ IDEA创建一个新的Java Maven项目,然后在pom.xml文件中添加Kafka相关的...
在本文中,我们将深入探讨如何使用Java来发送和接收消息到Apache Kafka,这是一个流行的分布式流处理平台。Apache Kafka被广泛用于构建实时数据管道和流应用,因为它提供了高吞吐量、低延迟的消息传递能力。 首先,...
- 对于生产者,创建一个`FlinkKafkaProducer`实例,同样需要包含Kerberos配置。 - 生产者需要在发送消息前完成Kerberos认证。 5. **处理票证刷新** - Kerberos票证有有效期,需要定期刷新。在Java中,可以通过`...
通过提供的Kafka示例代码,你可以学习如何在Java和Scala环境中设置和操作Kafka的生产者和消费者,理解Kafka与Hadoop的集成方式,以及如何在Scala这样的函数式语言中优雅地使用Kafka API。这些知识对于构建实时数据...
在Kafka-producer-consumer项目中,"kafka-producer-consumer-main"可能是一个包含启动生产者和消费者示例的主程序。生产者会将消息发布到一个或多个主题,而消费者则订阅这些主题并处理接收到的消息。这种交互方式...
这个"Kafka生产及消费示例代码"压缩包显然包含了一些帮助学习和理解Kafka基本操作的代码实例,包括如何创建生产者发送消息以及如何创建消费者接收消息。 首先,我们来了解一下Kafka的基本概念。Kafka是一个高吞吐量...
Kafka 使用 Java 客户端进行访问的示例代码 ...本文介绍了使用 Java 客户端来访问 Kafka 的示例代码,包括生产者代码和消费者代码。这些代码可以作为开发者使用 Java 客户端来访问 Kafka 的参考。
在本文中,我们将深入探讨如何使用Java来实现Apache Kafka,这是一个强大的分布式消息系统。Kafka以其高吞吐量、持久...记住,理解和掌握Kafka的核心概念以及Java API的使用,对于在大数据和实时分析领域工作至关重要。
在这个"apache-kafka-1.0.0 java Demo"中,我们将探讨如何使用Java API与Kafka进行交互,实现基本的消息生产与消费功能。这个Demo虽然没有采用连接池来优化性能,但依然能展示Kafka的核心特性。 首先,我们要了解...
Spring Boot简化了Kafka的配置和使用,使得开发者能够快速地构建基于事件驱动的应用程序。 首先,让我们理解Kafka的基本概念。Apache Kafka是一个分布式流处理平台,它允许我们发布和订阅实时数据流。它具有高吞吐...
4. **编写生产者代码**: 创建一个Java应用程序,使用Kafka的Producer API发送消息到刚才创建的主题。 5. **编写消费者代码**: 创建另一个Java应用程序,定义消费者并订阅该主题,使用Consumer API读取消息。 6. **...
Kafka-consumer-producer示例会演示如何使用Java API在Kafka 0.8.2.2版本中创建生产者和消费者,以及如何进行消息的发布和消费。通过这个例子,开发者可以了解Kafka的基本工作流程,并为构建自己的实时数据处理系统...