kafka是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个demo程序供大家参考。kafka的安装请参考官方文档。
首先我们需要新建一个maven项目,然后在pom中引用kafka jar包,引用依赖如下:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.0</version> </dependency>
我们用的版本是0.8, 下面我们看下生产消息的代码:
package cn.outofmemory.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * Hello world! * */ public class KafkaProducer { private final Producer<String, String> producer; public final static String TOPIC = "TEST-TOPIC"; private KafkaProducer(){ Properties props = new Properties(); //此处配置的是kafka的端口 props.put("metadata.broker.list", "192.168.193.148:9092"); //配置value的序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); //配置key的序列化类 props.put("key.serializer.class", "kafka.serializer.StringEncoder"); //request.required.acks //0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails). //1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost). //-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains. props.put("request.required.acks","-1"); producer = new Producer<String, String>(new ProducerConfig(props)); } void produce() { int messageNo = 1000; final int COUNT = 10000; while (messageNo < COUNT) { String key = String.valueOf(messageNo); String data = "hello kafka message " + key; producer.send(new KeyedMessage<String, String>(TOPIC, key ,data)); System.out.println(data); messageNo ++; } } public static void main( String[] args ) { new KafkaProducer().produce(); } }
下面是消费端的代码实现:
package cn.outofmemory.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; public class KafkaConsumer { private final ConsumerConnector consumer; private KafkaConsumer() { Properties props = new Properties(); //zookeeper 配置 props.put("zookeeper.connect", "192.168.193.148:2181"); //group 代表一个消费组 props.put("group.id", "jd-group"); //zk连接超时 props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); //序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig config = new ConsumerConfig(props); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); } void consume() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(KafkaProducer.TOPIC, new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0); ConsumerIterator<String, String> it = stream.iterator(); while (it.hasNext()) System.out.println(it.next().message()); } public static void main(String[] args) { new KafkaConsumer().consume(); } }
注意消费端需要配置成zk的地址,而生产端配置的是kafka的ip和端口。
来自:http://outofmemory.cn/code-snippet/33051/java-kafka-producer-consumer-example
相关推荐
kafka java 生产消费程序 demo 示例 kafka 是吞吐量巨大的一个消息系统,它是用 scala 写的,和普通的消息的生产消费还有所不同,写了个 demo 程序供大家参考。kafka 的安装请参考官方文档。 首先我们需要新建一个 ...
在"Kafka-java-demo"中,你将看到如何使用这些接口来实现一个简单的生产者和消费者示例。 【Kafka Producer】 Kafka生产者是负责将数据发布到Kafka主题的组件。在Java中,我们可以创建一个Producer实例,配置相关...
"Kafka-java-demo"项目是一个简单的示例,演示了如何使用Java API创建Kafka生产者和消费者。项目中通常会包含以下关键部分: - **Producer示例**:展示如何创建一个Kafka生产者,设置必要的配置(如服务器地址、...
Java Kafka 生产者/消费者Demo详解 在分布式消息系统中,Apache Kafka是一个高效、可扩展且高可用的数据流平台,广泛应用于实时数据处理和大数据领域。Kafka通过使用主题(Topics)和分区(Partitions)的概念,...
在本文中,我们将深入探讨如何将Spring Boot与Apache Kafka整合,以便实现发布/...这个"springboot整合kafka的发布/消费demo项目源码"提供了一个基础的示例,帮助开发者快速上手并理解Kafka在Spring Boot中的工作方式。
**KafkaDemo示例详解** Kafka是一种分布式流处理平台,由LinkedIn开发并贡献给了Apache软件基金会,现在已经成为大数据领域中的重要组件。它主要用于构建实时数据管道和流应用,能够处理大量的实时数据。在这个名为...
在本文中,我们将深入探讨如何使用Apache Kafka 0.10.2版本的API来创建一个简单的生产者和消费者示例。Kafka是一种分布式流处理平台,常用于实时数据管道和消息传递,它允许应用程序高效地发布和订阅大量数据流。 #...
总结来说,"springboot-kafka-simple-demo"展示了如何在SpringBoot应用中实现Kafka的简单使用,包括配置Kafka连接、创建生产者和消费者,并进行消息的发送与接收。这只是一个基础的起点,实际项目中可能还需要考虑...
"demo"部分则是实际操作的实例,通过这些示例,开发者可以了解如何创建生产者、消费者,发布和消费消息,理解Kafka的基本工作原理,以及如何处理数据流。这些演示代码对于初学者理解和掌握Kafka的API用法非常有帮助...
在这个示例中,我们将关注如何使用Java API在Kafka中实现多线程消费,以及单个消费者组内的多线程消费。 首先,我们了解Kafka的基本概念。Kafka是一个发布/订阅模型的消息队列,它包含生产者(Producer)、消费者...
在这个"apache-kafka-1.0.0 java Demo"中,我们将探讨如何使用Java API与Kafka进行交互,实现基本的消息生产与消费功能。这个Demo虽然没有采用连接池来优化性能,但依然能展示Kafka的核心特性。 首先,我们要了解...
`kafka-demo`这个压缩包可能包含了示例代码,用于演示如何在集群模式下创建Kafka生产者和消费者。解压后,你可以看到相关的Java或Python源码,这些代码提供了如何连接到Kafka集群、发布和接收消息的示例。通过学习和...
在这个“kafka demo”中,你可能会看到如何创建生产者和消费者,以及如何发布和接收消息。这通常涉及到以下步骤: 1. **设置环境**:安装JDK,下载Kafka和ZooKeeper的二进制包,并配置相应的环境变量。 2. **启动...
在本文中,我们将深入探讨如何使用Spring Boot整合Apache Kafka,以构建一个生产者和消费者的示例。Apache Kafka是一个分布式流处理平台,常被用于构建实时数据管道和流应用。Spring Boot简化了Java开发,提供了开箱...
为了让这个Demo运行起来,你需要在主应用类(例如`Application.java`)中添加以下代码,启动Kafka消费者: ```java import org.springframework.boot.SpringApplication; import org.springframework.boot....
在"Maven工程"的背景下,这个示例代码很可能是用Java编写的,利用了Kafka的Java API。开发者可以通过阅读源代码,了解如何配置消费者,设置GroupId,以及如何启动线程来消费消息。代码中的注释对于初学者来说非常...
描述中提到的“卡夫卡示例SSL代码,阿里云连接demo示例”进一步确认了我们将讨论的内容,即在Java中如何设置和使用SSL连接来与阿里云上的Kafka集群通信。 首先,我们需要了解Kafka是什么。Kafka是一款分布式流处理...
在这个"Spring整合Kafka的Demo"中,我们可以学习到如何配置、创建生产者和消费者,以及如何在Spring应用中使用Kafka的API。 1. **Spring Kafka简介** Spring Kafka是Spring框架的一部分,它为Apache Kafka提供了一...
在实际编程中,理解并熟练运用Java的多线程生产和消费者模型,不仅可以提高程序的并发性能,还能有效地解决资源争抢和同步问题,是提升Java并发编程能力的关键一步。通过`demo`项目的实践,你可以更好地理解和掌握这...
本示例"Kafka+Log4j demo"将展示如何将Log4j配置为将日志消息发送到Kafka主题。 **一、Kafka基础** Kafka是一种高吞吐量、低延迟的消息中间件,设计初衷是为了处理大规模的日志数据。它支持发布/订阅模型,允许...