package com.ganglia.kafka; import java.util.Date; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class ProducerTest2 { public static void main(String[] args) { Properties props = new Properties(); props.setProperty("metadata.broker.list","bfdbjc1:9092,test1:9092,test2:9092"); props.setProperty("serializer.class","kafka.serializer.StringEncoder"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); try { int i =1; while(true){ i++; String text = new StringBuffer((i+"")).reverse()+":test-kafka_"+args[0]+"_"+i; KeyedMessage<String, String> data = new KeyedMessage<String, String>("test",text); producer.send(data); Thread.sleep(100); System.out.println(DateUtil.fmtDateToYMDHMS(new Date())+"\t"+text); } } catch (Exception e) { e.printStackTrace(); } producer.close(); } }
1.安装zookeeper.
2.启动zookeeper.
3.启动kafka服务, 在zk1,zk2,zk3上分别运行:
kafka-server-start.sh ../config/server.properties /启动kafka
4. 新建一个TOPIC(replication-factor=num of brokers)
kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181
5.假设我们在zk2上,开一个终端,发送消息至kafka
kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test
在发送消息的终端输入:Hello Kafka
6.假设我们在zk3上,开一个终端,显示消息的消费(zk3模拟consumer)
kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
package com.ganglia.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class TestConsumer extends Thread{ private final ConsumerConnector consumer; private final String topic; public static void main(String[] args) { TestConsumer consumerThread = new TestConsumer("test"); consumerThread.start(); } public TestConsumer(String topic) { consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); this.topic =topic; } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect","test1:2181,test2:2181,bfdbjc1:2181"); props.put("group.id", "0"); props.put("zookeeper.session.timeout.ms","10000"); return new ConsumerConfig(props); } public void run(){ Map<String,Integer> topickMap = new HashMap<String, Integer>(); topickMap.put(topic, 1); Map<String, List<KafkaStream<byte[],byte[]>>> streamMap =consumer.createMessageStreams(topickMap); KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0); ConsumerIterator<byte[],byte[]> it =stream.iterator(); System.out.println("*********Results********"); while(true){ if(it.hasNext()){ /* MessageAndMetadata<byte[], byte[]> mm = it.next(); System.err.println("get data:" +new String(mm.message())); */ System.err.println("get data:" +new String(it.next().message())); } } } }
相关推荐
7. **Kafka Streams**:对于更复杂的流处理任务,Java开发者还可以利用Kafka提供的`KafkaStreams`库,它提供了一种编程模型,用于构建状态ful的、容错的流处理应用。 了解并熟练掌握以上知识点,对于在Java中进行...
Java面试题+Java并发编程(J.U.C)+Java8实战+Redis+kafka Java 『必看』2021 版最新Java 学习路线图(持续刷新):+1::+1::+1: Java入门面试题 Java基础入门80问,适合新手,老鸟直接跳过 Java并发编程(J.U.C) ...
在本文中,我们将深入探讨Apache Kafka的Java编程实践,特别是关注单线程和多线程在Kafka生产者与消费者中的应用,以及多线程管理器的实现。Apache Kafka是一个分布式流处理平台,广泛用于实时数据管道和消息传递。...
1. **Netty**:Netty提供了丰富的网络编程模型,包括TCP和UDP协议的支持。TCP长连接是一种保持连接状态,允许多次数据交换的通信方式。在Netty中,我们可以创建一个ServerBootstrap实例来启动服务器,并配置...
- 它支持RESTful编程模型,允许创建HTTP服务,方便前后端分离。 - Spring MVC通过依赖注入(DI)和面向切面编程(AOP)提供灵活的控制反转,使代码更易于测试和维护。 2. **多线程**: - 多线程是指在一个程序中...
JDK是Java编程语言的运行环境,是运行Kafka和Zookeeper所必需的。 **JDK** JDK(Java Development Kit)是开发和运行Java应用程序的必备工具。这里的`jdk1.8.0_144.tar.gz`是Oracle JDK的一个版本,适用于Linux...
"Java实现Kafka数据生产者"这个功能可以让你使用Java编程语言创建一个能够连接到指定的Kafka集群(通过bootstrap servers)并向其中发送数据的应用程序。通过这个功能,你可以将数据以消息的形式发送到Kafka主题中,...
4. **灵活性**:Kafka支持发布/订阅模型,也可以用作消息队列,同时提供API供各种编程语言使用。 **二、SpringBoot与Kafka集成** SpringBoot简化了Java应用程序的开发,而Spring for Apache Kafka提供了与Kafka...
Scala是一种多范式的编程语言,Kafka的API是用Scala编写的,因此这个数字表示的是与哪个版本的Scala API兼容。在这个例子中,是Scala 2.13。 - `3.6.2` 是Kafka本身的版本号。每个版本都会包含新功能、性能优化、bug...
Spring Framework是一种流行的Java应用程序框架,提供了一个广泛的编程模型和配置机制。将Kafka与Spring集成,可以实现实时数据处理、日志处理和消息队列等功能。本文将详细介绍如何将Kafka与Spring集成,并提供了...
总结来说,这个"Kafka全套资源环境+demo"是一个全面的学习资源,它覆盖了从安装配置到实际编程的所有环节,对于想深入了解和使用Kafka的Java开发者来说,是一份极具价值的参考资料。通过深入研究和实践,开发者不仅...
在这个"Kafka大数据 生产者消费者实例"中,我们将探讨如何通过Java编程语言来实现Kafka的生产者和消费者。 首先,我们要理解Kafka中的**生产者(Producer)**,它是负责发布消息到特定主题的组件。在Java中,我们...
在Java编程环境下,Kafka以其高效、可扩展性和容错性赢得了开发者们的青睐。 1. **Kafka基本概念** - **主题(Topic)**:Kafka中的数据以主题的形式存在,主题是逻辑上的分类,可以看作是消息的分类目录。 - **...
3. **Java与Spark Streaming**:Spark Streaming API支持多种编程语言,包括Java。在Java中,我们使用`JavaDStream`接口来表示连续的数据流,并通过`JavaInputDStream`从各种数据源(如Kafka)接收数据。 4. **配置...
通过`spring-boot-starter-data-kafka`起步依赖,我们可以快速创建生产者和消费者,利用Spring的注解驱动特性简化编程模型。同时,利用Kafka的强大功能,如高吞吐量、持久化和复制,可以在分布式系统中构建可靠的...
3. **数据模型**:深入探讨Kafka的数据模型,包括如何创建和管理主题,以及如何分配分区和副本,以实现数据的水平扩展和容错性。 4. **生产者API**:学习如何使用Java或Scala等语言的Kafka生产者API,发送消息到...
1. 学习与精通Java:对于大数据开发者来说,熟悉Java语言是基础,包括理解其语法、类库和并发编程模型,以便有效地利用Java进行大数据处理。 2. 利用Java生态系统:Java拥有丰富的开源库和框架,如Apache Spark、...
Scala是一种多范式的编程语言,与Java API兼容,是构建大规模并发系统的好选择,因其强大的函数式编程特性,使得编写Kafka这样的分布式系统更为便捷。 4. **文件结构解析** 解压kafka_2.13-2.8.1.tgz后,我们会...
【标签】"Java" 明确指出该项目是用Java编程语言编写的,这意味着它遵循Java的编程规范,使用了Java的相关库和框架。由于Java具有跨平台性,这个应用可以在多种操作系统上运行。开发者可能会使用Spring Boot等框架来...
6. Java编程:可能包含用Java实现的Kafka客户端代码。 7. 示例代码和教程:适合学习者理解如何在实际项目中使用Kafka进行读写操作。 为了深入学习这些知识点,可以解压“Kafaka_Opt”文件,查看项目结构,阅读源...