- 浏览: 496422 次
- 性别:
- 来自: 广州
文章分类
- 全部博客 (502)
- Java (70)
- Linux (10)
- 数据库 (38)
- 网络 (10)
- WEB (13)
- JSP (4)
- 互联网 (71)
- JavaScript (30)
- Spring MVC (19)
- HTML (13)
- CSS (3)
- AngularJS (18)
- Redis (5)
- Bootstrap CSS (1)
- ZooKeeper (4)
- kafka (6)
- 服务器缓存 (4)
- Storm (1)
- MongoDB (9)
- Spring boot (16)
- log4j (2)
- maven (3)
- nginx (5)
- Tomcat (2)
- Eclipse (4)
- Swagger (2)
- Netty (5)
- Dubbo (1)
- Docker (7)
- Hadoop (12)
- OAuth (1)
- webSocket (4)
- 服务器性能 (7)
- Session共享 (1)
- tieye修改 (1)
- 工作 (1)
- 有用的语录 (0)
- https (2)
- common (5)
- 产品开发管理 (1)
- CDN 工作原理 (1)
- APNS、GCM (1)
- 架构图 (3)
- 功能实现分析 (1)
- JMX (1)
- 服务器相关操作命令 (1)
- img02 (0)
- 服务器环境搭建 (9)
- goodMenuBook (1)
- CEInstantPot (0)
- 有用数据 (1)
- 百度地图WEB API (2)
- 正则表达式 (1)
- 样式例子 (2)
- staticRecipePressureCooker.zip (1)
- jCanvas (1)
- 网站攻击方法原理 (1)
- 架构设计 (3)
- 物联网相关 (3)
- 研发管理 (7)
- 技术需求点 (1)
- 计划 (1)
- spring cloud (11)
- 服务器开发的一些实用工具和方法 (1)
- 每天学到的技术点 (4)
- Guava (1)
- ERP 技术注意要点 (2)
- 微信小程序 (1)
- FineRepor (1)
- 收藏夹 (1)
- temp (5)
- 服务架构 (4)
- 任职资格方案 (0)
- osno_test (1)
- jquery相关 (3)
- mybatis (4)
- ueditor (1)
- VueJS (7)
- python (10)
- Spring EL (1)
- shiro (1)
- 前端开发原理与使用 (7)
- YARN (1)
- Spark (1)
- Hbase (2)
- Pig (2)
- 机器学习 (30)
- matplotlib (1)
- OpenCV (17)
- Hystrix (1)
- 公司 (1)
- miniui (4)
- 前端功能实现 (3)
- 前端插件 (1)
- 钉钉开发 (2)
- Jenkins (1)
- elasticSearch使用 (2)
- 技术规范 (4)
- 技术实现原理 (0)
最新评论
kafka java原生简单应用
KafkaTestMain.java
KafkaProducer.java
KafkaConsumer.java
依赖包(pom.xml)
结果:
参考原文(配置属性):http://blog.csdn.net/louisliaoxh/article/details/51516070
参考原文:http://chengjianxiaoxue.iteye.com/blog/2190488
KafkaTestMain.java
package com; public class KafkaTestMain extends Thread{ private static String topic = "test-xing"; public static void main(String[] args) { new KafkaConsumer(topic).start();// 消费者 new KafkaProducer(topic).start();// 生产者 } }
KafkaProducer.java
package com; import java.util.Properties; import java.util.concurrent.TimeUnit; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; public class KafkaProducer extends Thread { private static String topic; public KafkaProducer(String topic) { super(); this.topic = topic; } @Override public void run() { Producer producer = createProducer(); int i = 0; while (true) { producer.send(new KeyedMessage<String, String>(topic, "message: " + i++)); System.out.println("已经发送一则消息"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } private Producer createProducer() { Properties properties = new Properties(); properties.put("zookeeper.connect", "114.55.72.173:2181");// 声明zk IP properties.put("serializer.class", StringEncoder.class.getName()); // 声明kafka broker IP properties.put("metadata.broker.list", "114.55.72.173:9092"); return new Producer<String, String>(new ProducerConfig(properties)); } }
KafkaConsumer.java
package com; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumer extends Thread { private String topic; private ConsumerConnector consumer; public KafkaConsumer(String topic) { super(); this.topic = topic; } @Override public void run() { consumer = createConsumer(); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); // 一次从主题中获取一个数据 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据 ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while (iterator.hasNext()) { String message = new String(iterator.next().message()); System.out.println("接收到: " + message); } } private ConsumerConnector createConsumer() { Properties properties = new Properties(); properties.put("zookeeper.connect", "114.55.72.173:2181");// 声明zk // 必须要使用别的组名称,如果生产者和消费者都在同一组,则不能访问同一组内的topic数据 properties.put("group.id", "group_xing"); // ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大 // properties.put("zookeeper.session.timeout.ms", "4000"); // zk follower落后于zk leader的最长时间 // properties.put("zookeeper.sync.time.ms", "200"); // 往zookeeper上写offset的频率 properties.put("auto.commit.interval.ms", "1000"); // 消费最老消息为smallest,最新为largest properties.put("auto.offset.reset", "smallest"); // 序列化类 // properties.put("serializer.class", "kafka.serializer.StringEncoder"); return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); } }
依赖包(pom.xml)
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.0</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.3</version> </dependency>
结果:
接收到: message: 0 接收到: message: 1 接收到: message: 2 已经发送一则消息 接收到: message: 3 已经发送一则消息 接收到: message: 4 已经发送一则消息 接收到: message: 5 已经发送一则消息 接收到: message: 6 已经发送一则消息 接收到: message: 7
参考原文(配置属性):http://blog.csdn.net/louisliaoxh/article/details/51516070
参考原文:http://chengjianxiaoxue.iteye.com/blog/2190488
发表评论
-
rocketmq安装部署.txt
2021-11-07 19:10 218docker search rocketmq docke ... -
各种MQ应用分式
2018-05-25 16:07 445RabbitMQ RabbitMQ有一个交换机(Exchan ... -
spring kafka 配置详解
2016-09-28 10:24 6276spring kafka 配置详解 使用spring-in ... -
spring-integration-kafka简单应用
2016-09-26 19:52 1276spring-integration-kafka简单应用 ... -
Kafka
2016-09-10 10:33 909Kafka 消息队列MQ技术的一种应用 kafka的构 ...
相关推荐
Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络应用,如TCP服务器。而Kafka是一款分布式流处理平台,它在大数据实时处理和消息传递中扮演着核心角色。 标题"使用netty实现TCP...
Java_AutoMQ是一个创新的项目,它是Apache Kafka的云原生分支,旨在提供更高效、更灵活的分布式消息传递解决方案。这个项目的核心理念是将存储层与计算层分离,允许用户将消息数据存储在Amazon S3和EBS(Elastic ...
在当前的IT技术领域中,微服务架构和云原生应用已经成为构建和部署软件系统的主流范式。该文件标题为“构建微服务云原生应用——介绍.pdf”,顾名思义,该文档涉及的主题和知识点围绕微服务架构、云原生应用以及与之...
将Kafka设置为Windows服务,我们可以利用Java的Windows服务包装器(如nssm,Non-Sucking Service Manager),它允许我们将Java应用作为Windows服务来启动。下载nssm后,指定Java的路径、Kafka的可执行jar文件以及...
3. 简单API:提供Java和Scala的API,方便开发者集成到各种应用程序中。 4. 分布式设计:Kafka集群支持跨多个服务器部署,能自动处理节点故障。 5. 时间窗口:支持基于时间的窗口操作,适用于实时数据处理。 二、...
1. **Kafka概述**:Kafka是一个高吞吐量、低延迟的分布式发布订阅消息系统,主要用于构建实时数据管道和流应用。它具有持久化、可扩展性和高可用性等特性。 2. **Kafka架构**:Kafka由Brokers、Producers、...
- Windows并不是Kafka的原生支持平台,但通过JDK和一些调整,可以在Windows上运行Kafka。首先,确保安装了Java Development Kit (JDK) 8或以上版本。 - 解压`kafka_2.11-2.4.1.tgz`文件到指定目录,配置环境变量,...
原生应用的优势在于性能优化和对各自平台特性的深度支持,如Android的Material Design和iOS的Human Interface Guidelines。 **Java核心技术:** 1. **Spring Boot框架**:作为后端开发的基础,Spring Boot简化了...
Know Streaming是一套云原生的Kafka管控平台,脱胎于众多互联网内部多年的Kafka运营实践经验,专注于Kafka运维管控、监控告警、资源治理、多活容灾等核心场景。在用户体验、监控、运维管控上进行了平台化、可视化、...
这个库的主要目标是让 Scala 开发者能更自然地使用 Kafka Streams 功能,同时保持与原生 Java API 的紧密兼容性。 Kafka Streams 是 Apache Kafka 的一个模块,它允许开发者以流处理的方式对数据进行实时分析和处理...
Azkarra Streams是一个轻量级的Java框架,可轻松开发和操作Kafka Streams应用程序(Azkarra是巴斯克语,表示“快速” ) 是用于构建应用程序和微服务的客户端库,其中输入和输出数据存储在Kafka集群中。 它结合了在...
而当我们谈论“Python-Apache Kafka Streams”的实现时,这通常意味着开发者正在寻找或者已经实现了使用 Python 语言与 Kafka Streams API 进行交互的方法,尽管原生的 Kafka Streams 是用 Java 编写的。 Python ...
【标题】"Kafka-Grpc_kafka_assignment_源码" 涉及的主要知识点是Apache Kafka和gRPC的整合应用...这将有助于提升对分布式系统设计和实现的理解,对于从事大数据处理和云原生应用开发的人员来说,是非常有价值的实践。
首先,我们要理解“双端原生源码”意味着该软件提供了Android和iOS两端的应用程序源代码,这些源代码是使用各自平台的原生语言(Java和Swift/Objective-C)编写的。原生开发的优势在于性能更优,用户体验更佳,同时...
2. **MyBatis**:MyBatis是一个持久层框架,它将SQL语句与Java代码分离,通过XML或注解方式配置和映射原生信息,使得开发者可以更好地控制SQL执行过程,提高了数据库操作的灵活性。 3. **Dubbo**:Dubbo是阿里巴巴...
【派单系统平台】是一个基于Java技术开发的抢单系统,提供了完整的源代码,包括原生的Android和iOS应用程序,以及详细的项目说明文档。这个系统主要用于实现服务提供者与服务需求者之间的快速匹配,通常应用于物流、...
本篇文章将详细介绍在Java环境中,如何利用原生Java API、Spring框架以及Spring Cloud Stream来实现Kafka的集成与应用。 首先,我们来看**原生Java API**的实现方式。Apache Kafka提供了Java客户端库,允许开发者...
- **Android与iOS原生开发**:这款源码覆盖了Android和iOS两大主流平台,使用Java或Kotlin(Android)和Swift或Objective-C(iOS)进行原生开发,确保了良好的性能和用户体验。 - **UI设计**:高度模仿微信的界面...
8. **Quarkus**:这是Red Hat开发的一个超快、全功能的Kubernetes原生Java框架,旨在为微服务和云原生环境提供极致的性能。 9. **Elasticsearch**:Elasticsearch是一个基于Lucene的搜索服务器,提供全文检索、分析...