kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.
我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.
其中kafka为0.8V,zookeeper为3.4.5V
一.Zookeeper集群构建
我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.(本示例基于伪分布式部署)
1) zk-0
调整配置文件:
clientPort=2181 server.0=127.0.0.1:2888:3888 server.1=127.0.0.1:2889:3889 server.2=127.0.0.1:2890:3890 ##只需要修改上述配置,其他配置保留默认值
启动zookeeper
./zkServer.sh start
2) zk-1
调整配置文件(其他配置和zk-0一只):
clientPort=2182 ##只需要修改上述配置,其他配置保留默认值
启动zookeeper
./zkServer.sh start
3) zk-2
调整配置文件(其他配置和zk-0一只):
clientPort=2183 ##只需要修改上述配置,其他配置保留默认值
启动zookeeper
./zkServer.sh start
二. Kafka集群构建
因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.
1) kafka-0
在config目录下修改配置文件为:
broker.id=0 port=9092 num.network.threads=2 num.io.threads=2 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dir=./logs num.partitions=2 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 #log.retention.bytes=1073741824 log.segment.bytes=536870912 ##replication机制,让每个topic的partitions在kafka-cluster中备份2个 ##用来提高cluster的容错能力.. default.replication.factor=1 log.cleanup.interval.mins=10 ##zookeeper.connect指定zookeeper的地址,默认情况下将会在zk的“/”目录下 ##创建meta信息和路径,为了对znode进行归类,我们可以在connect之后追加路径,比如 ##127.0.0.1:2183/kafka ##不过需要注意,此后的producer、consumer都需要带上此根路径 zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 zookeeper.connection.timeout.ms=1000000
因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。
> cd kafka-0 > ./sbt update > ./sbt package > ./sbt assembly-package-dependency
其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:
> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &
因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.
2) kafka-1
broker.id=1 port=9093 ##其他配置和kafka-0保持一致
然后和kafka-0一样执行打包命令,然后启动此broker.
> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &
仍然可以通过如下指令查看topic的"partition"/"replicas"的分布和存活情况.
> bin/kafka-list-topic.sh --zookeeper localhost:2181 topic: my-replicated-topic partition: 0 leader: 2 replicas: 1,2,0 isr: 2 topic: test partition: 0 leader: 0 replicas: 0 isr: 0
到目前为止环境已经OK了,那我们就开始展示编程实例吧。[配置参数详解]
三.项目准备
项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。如果kafka client的版本和kafka server的版本不一致,将会有很多异常,比如"broker id not exists"等;因为kafka从0.7升级到0.8之后(正名为2.8.0),client与server通讯的protocol已经改变.
<dependencies> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.14</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.2</artifactId> <version>0.8.0</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>com.yammer.metrics</groupId> <artifactId>metrics-core</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.3</version> </dependency> </dependencies>
四.Producer端代码
1) producer.properties文件:此文件放在/resources目录下
#partitioner.class= ##broker列表可以为kafka server的子集,因为producer需要从broker中获取metadata ##尽管每个broker都可以提供metadata,此处还是建议,将所有broker都列举出来 ##此值,我们可以在spring中注入过来 ##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093 ##,127.0.0.1:9093 ##同步,建议为async producer.type=sync compression.codec=0 serializer.class=kafka.serializer.StringEncoder ##在producer.type=async时有效 #batch.num.messages=100
2) KafkaProducerClient.java代码样例
import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * User: guanqing-liu */ public class KafkaProducerClient { private Producer<String, String> inner; private String brokerList;//for metadata discovery,spring setter private String location = "kafka-producer.properties";//spring setter private String defaultTopic;//spring setter public void setBrokerList(String brokerList) { this.brokerList = brokerList; } public void setLocation(String location) { this.location = location; } public void setDefaultTopic(String defaultTopic) { this.defaultTopic = defaultTopic; } public KafkaProducerClient(){} public void init() throws Exception { Properties properties = new Properties(); properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location)); if(brokerList != null) { properties.put("metadata.broker.list", brokerList); } ProducerConfig config = new ProducerConfig(properties); inner = new Producer<String, String>(config); } public void send(String message){ send(defaultTopic,message); } public void send(Collection<String> messages){ send(defaultTopic,messages); } public void send(String topicName, String message) { if (topicName == null || message == null) { return; } KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message); inner.send(km); } public void send(String topicName, Collection<String> messages) { if (topicName == null || messages == null) { return; } if (messages.isEmpty()) { return; } List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(); int i= 0; for (String entry : messages) { KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry); kms.add(km); i++; if(i % 20 == 0){ inner.send(kms); kms.clear(); } } if(!kms.isEmpty()){ inner.send(kms); } } public void close() { inner.close(); } /** * @param args */ public static void main(String[] args) { KafkaProducerClient producer = null; try { producer = new KafkaProducerClient(); //producer.setBrokerList(""); int i = 0; while (true) { producer.send("test-topic", "this is a sample" + i); i++; Thread.sleep(2000); } } catch (Exception e) { e.printStackTrace(); } finally { if (producer != null) { producer.close(); } } } }
3) spring配置
<bean id="kafkaProducerClient" class="com.test.kafka.KafkaProducerClient" init-method="init" destroy-method="close"> <property name="zkConnect" value="${zookeeper_cluster}"></property> <property name="defaultTopic" value="${kafka_topic}"></property> </bean>
五.Consumer端
1) consumer.properties:文件位于/resources目录下
## 此值可以配置,也可以通过spring注入 ##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 ##,127.0.0.1:2182,127.0.0.1:2183 # timeout in ms for connecting to zookeeper zookeeper.connectiontimeout.ms=1000000 #consumer group id group.id=test-group #consumer timeout #consumer.timeout.ms=5000 auto.commit.enable=true auto.commit.interval.ms=60000
2) KafkaConsumerClient.java代码样例
package com.test.kafka; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.Message; import kafka.message.MessageAndMetadata; /** * User: guanqing-liu */ public class KafkaConsumerClient { private String groupid; //can be setting by spring private String zkConnect;//can be setting by spring private String location = "kafka-consumer.properties";//配置文件位置 private String topic; private int partitionsNum = 1; private MessageExecutor executor; //message listener private ExecutorService threadPool; private ConsumerConnector connector; private Charset charset = Charset.forName("utf8"); public void setGroupid(String groupid) { this.groupid = groupid; } public void setZkConnect(String zkConnect) { this.zkConnect = zkConnect; } public void setLocation(String location) { this.location = location; } public void setTopic(String topic) { this.topic = topic; } public void setPartitionsNum(int partitionsNum) { this.partitionsNum = partitionsNum; } public void setExecutor(MessageExecutor executor) { this.executor = executor; } public KafkaConsumerClient() {} //init consumer,and start connection and listener public void init() throws Exception { if(executor == null){ throw new RuntimeException("KafkaConsumer,exectuor cant be null!"); } Properties properties = new Properties(); properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location)); if(groupid != null){ properties.put("groupid", groupid); } if(zkConnect != null){ properties.put("zookeeper.connect", zkConnect); } ConsumerConfig config = new ConsumerConfig(properties); connector = Consumer.createJavaConsumerConnector(config); Map<String, Integer> topics = new HashMap<String, Integer>(); topics.put(topic, partitionsNum); Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics); List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic); threadPool = Executors.newFixedThreadPool(partitionsNum * 2); //start for (KafkaStream<byte[], byte[]> partition : partitions) { threadPool.execute(new MessageRunner(partition)); } } public void close() { try { threadPool.shutdownNow(); } catch (Exception e) { // } finally { connector.shutdown(); } } class MessageRunner implements Runnable { private KafkaStream<byte[], byte[]> partition; MessageRunner(KafkaStream<byte[], byte[]> partition) { this.partition = partition; } public void run() { ConsumerIterator<byte[], byte[]> it = partition.iterator(); while (it.hasNext()) { // connector.commitOffsets();手动提交offset,当autocommit.enable=false时使用 MessageAndMetadata<byte[], byte[]> item = it.next(); try{ executor.execute(new String(item.message(),charset));// UTF-8,注意异常 }catch(Exception e){ // } } } public String getContent(Message message){ ByteBuffer buffer = message.payload(); if (buffer.remaining() == 0) { return null; } CharBuffer charBuffer = charset.decode(buffer); return charBuffer.toString(); } } public static interface MessageExecutor { public void execute(String message); } /** * @param args */ public static void main(String[] args) { KafkaConsumerClient consumer = null; try { MessageExecutor executor = new MessageExecutor() { public void execute(String message) { System.out.println(message); } }; consumer = new KafkaConsumerClient(); consumer.setTopic("test-topic"); consumer.setPartitionsNum(2); consumer.setExecutor(executor); consumer.init(); } catch (Exception e) { e.printStackTrace(); } finally { if(consumer != null){ consumer.close(); } } } }
3) spring配置(略)
需要提醒的是,上述LogConsumer类中,没有太多的关注异常情况,必须在MessageExecutor.execute()方法中抛出异常时的情况.
在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。
相关推荐
大华无插件播放项目111
内容概要:本文详细介绍了Oracle 19c数据库的备份恢复和导入导出操作。首先概述了基本命令,然后分别讲述了三种工作方式(交互式、命令行、参数文件)和三种模式(表、用户、全库)。接着介绍了高级选项,如分割成多个文件、增量导出/导入、以SYSDBA进行导出/导入、表空间传输等。最后讨论了优化技巧,包括加快导出和导入速度的方法。还解决了一些常见问题,如字符集问题和版本问题。 适用人群:Oracle数据库管理员和相关技术人员。 使用场景及目标:适合在日常数据库管理和维护中进行数据备份、恢复、导入和导出操作,提高数据安全性和管理效率。 其他说明:文章内容丰富,涉及多种实用技巧,适用于不同场景下的具体操作,有助于提升工作效率。
基于Python Flask开发的旅游酒店大数据可视化项目,可以直接运行。 操作步骤: 1. 解压缩项目文件 2. 使用 pycharm打开项目 3. 运行项目中的app.py文件 注意:需要确保项目的Flask Python相关的环境已经搭建完成。
Android 毕业设计,Android 毕业设计,小Android 程设计,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。
Android 毕业设计,Android 毕业设计,小Android 程设计,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。
基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告,个人高分设计项目、经导师指导并认可通过的高分设计项目,评审分99分,代码完整确保可以运行,小白也可以亲自搞定,主要针对计算机相关专业的学生和需要项目实战练习的学习者。 基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告基于stm32和openmv的电赛校赛自动泊车题目源码+文档设计报告个人高分设计项目、经导师指导并认可通过的高分设计项目,评审分99分,代码完整确保可以运行,小白也可以亲自搞定,主要针对计算机相关专业的学生和需要项目实战练习的学习者。 个人高分设计项目、经导师指导并认可通过的高分设
棉花检测20-YOLO(v5至v9)、COCO、CreateML、Darknet、Paligemma、TFRecord、VOC数据集合集.rar棉-V2释放 ============================= *与您的团队在计算机视觉项目上合作 *收集和组织图像 *了解非结构化图像数据 *注释,创建数据集 *导出,训练和部署计算机视觉模型 *使用主动学习随着时间的推移改善数据集 它包括406张图像。 以可可格式注释棉花。 将以下预处理应用于每个图像: 没有应用图像增强技术。
项目包含前后台完整源码。 项目都经过严格调试,确保可以运行! 具体项目介绍可查看博主文章或私聊获取 助力学习实践,提升编程技能,快来获取这份宝贵的资源吧!
windwos环境下python 3.11系列64位安装包,仅推荐个人学习、开发、娱乐或者测试环境下使用。
使用精品酒销售管理系统的用户分管理员和用户两个角色的权限子模块。 管理员所能使用的功能主要有:主页、个人中心、用户管理、商品分类管理、商品信息管理、系统管理、订单管理等。 用户可以实现主页、个人中心、我的收藏管理、订单管理等。 前台首页可以实现商品信息、新闻资讯、我的、跳转到后台、购物车等。 项目包含完整前后端源码和数据库文件 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7
Video_2024-12-18_000023.wmv
ppt最終版asasaadd
计算机图形学期末考试
springboot-基于SpringBootVue的家具商城系统设计与实现.zip
PenTablet_5.2.4-5.zip
考虑了企业管理者的实际工作环境和需求,最终将人力资源系统划分为5个部分,即登录模块、组织发展模块、员工团队模块、合同管理模块、党建管理模块。 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7
QT音乐播放器MP3 可点击播放可上一首下一首可调节音量 可暂停可上传音乐
椅子检测6-YOLO(v5至v9)、COCO、CreateML、Darknet、Paligemma、TFRecord、VOC数据集合集.rar对象检测实验室-V1 2023-08-21 2:28 PM ============================= *与您的团队在计算机视觉项目上合作 *收集和组织图像 *了解和搜索非结构化图像数据 *注释,创建数据集 *导出,训练和部署计算机视觉模型 *使用主动学习随着时间的推移改善数据集 对于最先进的计算机视觉培训笔记本,您可以与此数据集一起使用 该数据集包括997张图像。 对象以可可格式注释。 将以下预处理应用于每个图像: *像素数据的自动取向(带有Exif-Arientation剥离) *调整大小为640x640(拉伸) 应用以下扩展来创建每个源图像的3个版本: *将盐和胡椒噪声应用于10%的像素
Python课程设计,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。
Altas PF拧紧枪 OP协议,开发协议