<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.test</groupId> <artifactId>Kafka-Demo</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.9.0.0</version> </dependency> <dependency> <groupId>io.appium</groupId> <artifactId>java-client</artifactId> <version>6.0.0-BETA5</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.4</version> </dependency> </dependencies> </project>
2、生产者
package com; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class KafkaProducer { private final Producer<String, String> producer; public final static String TOPIC = "linlin"; private KafkaProducer() { Properties props = new Properties(); // 此处配置的是kafka的端口 props.put("metadata.broker.list", "127.0.0.1:9092"); props.put("zk.connect", "127.0.0.1:2181"); // 配置value的序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); // 配置key的序列化类 props.put("key.serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "-1"); producer = new Producer<String, String>(new ProducerConfig(props)); } void produce() { int messageNo = 1000; final int COUNT = 10000; while (messageNo < COUNT) { String key = String.valueOf(messageNo); String data = "hello kafka message " + key; producer.send(new KeyedMessage<String, String>(TOPIC, key, data)); System.out.println(data); messageNo++; } } public static void main(String[] args) { new KafkaProducer().produce(); } }
右键:run as java application,执行前需要启动Zookeeper、kafka,具体操作详见
http://zhangwenlongchina.iteye.com/admin/blogs/2420493
运行结果:
3、消费者
package com; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; public class KafkaConsumer { private final ConsumerConnector consumer; private KafkaConsumer() { Properties props = new Properties(); // zookeeper 配置 props.put("zookeeper.connect", "127.0.0.1:2181"); // group 代表一个消费组 props.put("group.id", "lingroup"); // zk连接超时 props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("rebalance.max.retries", "5"); props.put("rebalance.backoff.ms", "1200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); // 序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig config = new ConsumerConfig(props); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); } void consume() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(KafkaProducer.TOPIC, new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder); KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0); ConsumerIterator<String, String> it = stream.iterator(); while (it.hasNext()) System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + it.next().message() + "<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"); } public static void main(String[] args) { new KafkaConsumer().consume(); } }
运行结果:
相关推荐
下面将详细介绍Kafka入门的相关知识点。 **Kafka简介** Kafka是一个分布式流处理平台,它的核心功能是消息队列系统,用于处理大量实时数据的发布和订阅。它被设计为在分布式环境中运行,可以处理高并发的读写请求,...
### Kafka入门知识点详解 #### 一、概述与系统环境 Kafka是一款强大的分布式消息系统,主要应用于实时数据处理场景。其高效的数据传输能力和高吞吐量特性使其在大数据领域受到广泛青睐。 - **系统环境**:本文档...
**Kafka入门详解** Kafka是一款高性能、分布式的消息中间件,最初由LinkedIn开发,后成为Apache顶级项目。它主要用于处理实时数据流,提供高吞吐量的发布订阅服务,同时也支持离线数据处理。在本篇文章中,我们将...
而消费者则通过消费组(Consumer Group)来标识,每个消费组可以订阅一个或多个topic,并且每个消息记录都会被分配给订阅组中的一个消费者实例。 搭建单节点的Kafka环境涉及几个步骤,首先需要搭建单节点的...
### Kafka入门简介 #### 一、Kafka概述 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。它是一个高吞吐量的分布式发布订阅消息系统,可以处理消费者在网站中的所有动作流数据,如网页...
本资源包是针对Kafka集成SpringBoot的简单收发实例,适合初学者进行Kafka入门学习。 **一、Kafka简介** Kafka最初由LinkedIn开发,后来成为Apache顶级项目。它的主要特点是: 1. **高性能**:Kafka支持每秒数十万...
1.kafka的初认识 2.Kafka 基础实战 :消费者和生产者实例 3.Kafka 核心源码剖析 4.Kafka 用户日志上报实时统计
- **Broker(服务代理节点)**:通常指Kafka集群中的一个独立服务节点或实例。在大多数情况下,Broker也可以理解为一台部署了一个Kafka实例的服务器。多个Broker组成Kafka集群。 #### 四、主题与分区 **主题...
Kafka入门Demo主要介绍了Apache Kafka的基本概念、工作原理以及如何进行简单操作,是初学者了解和使用Kafka的一个基础教程。Kafka是一个分布式流处理平台,由LinkedIn开发并开源,后来成为了Apache软件基金会的顶级...
- 消费者组(Consumer Group):消费者实例组成的集合,提高并发消费能力。 - 重平衡(Rebalance):消费者组内分区分配的动态调整过程。 - 三层消息架构:主题层、分区层和消息层,确保数据分布和冗余。 2. Kafka...
一本经典的kafka入门书籍。 Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行...
在本篇Kafka快速入门教程中,我们主要探讨了如何使用Python客户端库`confluent-kafka`来与Apache Kafka进行交互。`confluent-kafka`是一个轻量级的Python模块,它对librdkafka进行了封装,支持Kafka 0.8以上的版本。...
通过这个Kafka入门Demo,你可以了解如何使用Java API进行消息生产和消费,为进一步学习和使用Kafka打下基础。实践中,还需要注意配置优化、数据保留策略、监控和故障恢复等方面,以确保Kafka在生产环境中的稳定运行...
《Kafka入门与实践》中的大量实例来源于作者在实际工作中的实践,具有现实指导意义。相信读者阅读完本书之后,能够全面掌握Kafka的基本实现原理及其基本操作,能够根据书中的案例举一反三,解决实际工作和学习中的...
入门、进阶、商业实战》可能会涵盖如何设计Kafka架构、优化性能、处理异常情况、与其他技术栈(如Hadoop、Spark)的集成等内容,帮助读者从基础到高级全面掌握Kafka的使用。 总的来说,Kafka作为大数据领域的重要...
- **Broker(代理)**:单个Kafka服务器实例。 - **Partition(分区)**:为了提高并行度,将一个topic分成多个partition。 - **Replication(复制)**:每个partition可以有多个副本,提高数据可靠性和可用性。 ##...
本入门实例通过实现一个简单的聊天室,旨在帮助初学者理解消息队列的基本概念、工作原理及其应用价值。 首先,我们需要理解消息队列的核心概念。消息队列是一种中间件,它的主要任务是作为生产者和消费者之间的缓冲...
### Kafka入门学习笔记知识点概述 #### 一、Kafka简介及安装 - **Kafka**:Apache Kafka 是一款开源的分布式消息系统,主要用于构建实时数据管道以及基于流的数据处理应用程序。它具有高吞吐量、低延迟的特点,...
kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper...
- 使用`KafkaConsumer`类创建消费者实例。 - 调用`subscribe()`方法订阅一个或多个主题。 - 循环调用`poll()`方法获取消息。 - 关闭消费者实例。 #### 四、Kafka高级特性 **4.1 分区策略** Kafka支持自定义分区...