`
jaesonchen
  • 浏览: 309802 次
  • 来自: ...
社区版块
存档分类
最新评论

Kafka Producer端封装自定义消息

 
阅读更多

这篇文章主要讲kafka producer端的编程,通过一个应用案例来描述kafka在实际应用中的作用。如果你还没有搭建起kafka的开发环境,可以先参考:<kafka开发环境搭建>

首先描述一下应用的情况:一个站内的搜索引擎,运营人员想知道某一时段,各类用户对商品的不同需求。通过对这些数据的分析,从而获得更多有价值的市场分析报表。这样的情况,就需要我们对每次的搜索进行记录,当然,不太可能使用数据库区记录这些信息(数据库存这些数据我会觉得是种浪费,个人意见)。最好的办法是存日志。然后通过对日志的分析,计算出有用的数据。我们采用kafka这种分布式日志系统来实现这一过程。

完成上述一系列的工作,可以按照以下步骤来执行:

 

1.         搭建kafka系统运行环境。

2.         设计数据存储格式(按照自定义格式来封装消息)

3.         Producer端获取真实数据(搜索记录),并对数据按上述2中设计的格式进行编码。

4.         Producer将已经编码的数据发送到broker上,在broker上进行存储(分配存储策略)。

5.         Consumer端从broker中获取数据,分析计算。

如果用淘宝数据服务平台的架构来匹配这一过程,broker就好比数据中心中存储的角色,producer端基本是放在了应用中心的开放API中,consumer端则一般用于数据产品和应用中心的获取数据中使用。

 

今天主要写的是234三个步骤。我们先看第二步。为了快速实现,这里就设计一个比较简单的消息格式,复杂的原理和这个一样。

 

 

用四个字段分别表示消息的ID、用户、查询关键词和查询时间。当然你如果要设计的更复杂,可以加入IP这些信息。这些用java写就是一个简单的pojo类,这是getter/setter方法即可。由于在封转成kafkamessage时需要将数据转化成bytep[]类型,可以提供一个序列化的方法。我在这里直接重写toString了:

@Override
	public String toString() {
		String keyword = "[info kafka producer:]";
		keyword = keyword + this.getId() + "-" + this.getUser() + "-"
				+ this.getKeyword() + "-" + this.getCurrent();
		return keyword;
	}

这样还没有完成,这只是将数据格式用java对象表现出来,解析来要对其按照kafka的消息类型进行封装,在这里我们只需要实现Encoder类即可:

 

package org.gfg.kafka.message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.message.Message;

public classKeywordMessageimplementskafka.serializer.Encoder<Keyword>{
	
	public static final Logger LOG=LoggerFactory.getLogger(Keyword.class); 
	
	@Override
	public Message toMessage(Keyword words){
		LOG.info("start in encoding...");
		return new Message(words.toString().getBytes());
	}

}

注意泛型和返回类型即可。这样KeywordMessage就是一个可以被kafka发送和存储的对象了。

接下来,我们可以编写一部分producer,获取业务系统的数据。要注意,producer数据的推送到broker的,所以发起者还是业务系统,下面的代码就能直接发送一次数据,注释都很详细:

/**配置producer必要的参数*/
		Properties props = new Properties();
		props.put("zk.connect", "192.168.10.11:2181");
		/**选择用哪个类来进行序列化*/
		props.put("serializer.class", "org.gfg.kafka.message.KeywordMessage");
		props.put("zk.connectiontimeout.ms", "6000");
		ProducerConfig config=new ProducerConfig(props);
		
		/**制造数据*/
		Keyword keyword=new Keyword();
		keyword.setUser("Chenhui");
		keyword.setId(0);
		keyword.setKeyword("china");
		
		List<Keyword> msg=new ArrayList<Keyword>();
		msg.add(keyword);
		
		/**构造数据发送对象*/
		Producer<String, Keyword> producer=new Producer<String, Keyword>(config);		
		ProducerData<String,Keyword> data=new ProducerData<String, Keyword>("test", msg);
		producer.send(data);

发送完之后,我们可以用bin目录下的kafka-console-consumer来看发送的结果(当然现在用的topictest)。可以用命令:

./kafka-console-consumer –zookeeper 192.168.10.11:2181 –topic test –from-beginning

 

 

如果是在使用zookeeper搭建分布式的情况下(zookeeper based broker discovery),我们可以执行第三个步骤,用编码来实现partition的分配策略。这里需要我们实现Partitioner对象:

package org.gfg.kafka.partitioner;

import org.gfg.kafka.message.Keyword;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.producer.Partitioner;

/**
 * 
 * @author Chen.Hui
 * 
 */
public classProducerPartitionerimplementsPartitioner<String> {
	
	public static final Logger LOG=LoggerFactory.getLogger(Keyword.class); 
	
	@Override
	publicintpartition(String key, int numPartitions){
		LOG.info("ProducerPartitioner key:"+key+" partitions:"+numPartitions);
		return key.length() % numPartitions;
	}

}

在上面的partition方法中,值得注意的是,key我们是在构造数据发送对象时设置的,这个key是区分存储的关键,比如我想将我的数据按照不同的用户类别存储。Partition的好处是可以并发的获取同类数据,提高效率,具体可以看之前的文章。

所以在第二部时的producer代码需要有所改进:

/**选择用哪个类来进行设置partition*/
		props.put("partitioner.class", "org.gfg.kafka.partitioner.ProducerPartitioner");

		ProducerData<String,Keyword> data=new ProducerData<String, Keyword>("test","developer", msg);

增加了对partition的配置,并且修改了ProducerData的参数,其中,中间的就是key,如果不设置partitionkafka则随机的向broker中发送请求。我们可以看一眼ProducerData的源码:

package kafka.javaapi.producer

import scala.collection.JavaConversions._

classProducerData[K, V](private val topic: String,
                         private val key: K,
                         private val data: java.util.List[V]) {

  def this(t: String, d: java.util.List[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)

  def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = asList(List(d)))

  def getTopic: String = topic

  def getKey: K = key

  def getData: java.util.List[V] = data
}

至此,producer端的事情都做完了,当然这就是个demo,还有很多性能上的优化需要做,当然有了这个基础,我们就能将数据存储到broker上,下一步,就是用consumer来消费这些日志,形成有价值的数据产品。

分享到:
评论

相关推荐

    【大厂面试专栏】一份Java程序员需要的技术指南,这里有面试题、系统架构

    【大厂面试专栏】一份Java程序员需要的技术指南,这里有面试题、系统架构、职场锦囊、主流中间件等,让你成为更牛的自己!_technology-talk

    flashocc-QAT-PTQ.zip

    flashocc-QAT-PTQ.zip

    大连理工大学城市学院在四川2020-2024各专业最低录取分数及位次表.pdf

    那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据

    川北医学院在四川2020-2024各专业最低录取分数及位次表.pdf

    那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据

    黑河学院在四川2020-2024各专业最低录取分数及位次表.pdf

    那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据

    西安邮电大学在四川2020-2024各专业最低录取分数及位次表.pdf

    那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据

    【光学】基于matlab两列单色平面波+合成【含Matlab源码 9007期】.zip

    CSDN海神之光上传的全部代码均可运行,亲测可用,尽我所能,为你服务; 1、代码压缩包内容 主函数:main.m; 调用函数:其他m文件;无需运行 运行结果效果图; 2、代码运行版本 Matlab 2019b;若运行有误,根据提示修改;若不会,可私信博主; 3、运行操作步骤 步骤一:将所有文件放到Matlab的当前文件夹中; 步骤二:双击打开main.m文件; 步骤三:点击运行,等程序运行完得到结果; 4、物理应用 仿真:导航、地震、电磁、电路、电能、机械、工业控制、水位控制、直流电机、平面电磁波、管道瞬变流、刚度计算 光学:光栅、杨氏双缝、单缝、多缝、圆孔、矩孔衍射、夫琅禾费、干涉、拉盖尔高斯、光束、光波、涡旋 定位问题:chan、taylor、RSSI、music、卡尔曼滤波UWB 气动学:弹道、气体扩散、龙格库弹道 运动学:倒立摆、泊车 天体学:卫星轨道、姿态 船舶:控制、运动 电磁学:电场分布、电偶极子、永磁同步、变压器

    文件比较工具、文件夹比较工具、linux、ubuntu、linx麒麟等免费使用多日

    文件比较工具、文件夹比较工具、linux、ubuntu、linx麒麟等免费使用多日

    Spire.XLS是一个基于.NET的组件,使用它我们可以创建Excel文件

    Spire.XLS是一个基于.NET的组件,使用它我们可以创建Excel文件,编辑已有的Excel并且可以转换Excel文件.zip

    【Unity完整游戏模板】Downhill Ride 轻松开发极限运动或竞速类的下坡滑行游戏

    文件名:Downhill Ride - Game Template 2020 LTS v1.2.3.unitypackage Downhill Ride - Game Template (2020 LTS) 是一个为 Unity 2020 LTS 版本开发的完整游戏模板,主要适用于开发极限运动或竞速类的下坡滑行游戏。这个模板专为快速原型设计和项目开发而打造,提供了关键功能和资源,帮助开发者轻松实现类似下坡竞速的游戏项目。 主要特点: 完整的游戏框架: 该模板包含基础的游戏逻辑,允许玩家通过控制角色在下坡道上滑行或骑行,避开障碍物并尽可能快速完成赛道。 物理与控制系统: 内置的物理引擎和角色控制器已经经过优化,可以实现平滑的下坡滑行体验,提供真实感十足的物理效果。 多种关卡支持: 模板支持多个关卡设计,开发者可以根据需要扩展或自定义不同难度的关卡。 UI 和交互设计: 包含基本的用户界面(UI)设计,带有主菜单、关卡选择、计分系统等功能,用户可以轻松扩展或定制这些 UI 元素。 优化的性能: 模板专为移动平台和桌面平台优化,确保良好的性能表现......

    Java课程设计之销售管理系统

    (1)课程设计项目简单描述 鉴于当今超市产品种类繁多,光靠人手动的登记已经不能满足一般商家的需求。我们编辑该程序帮助商家完成产品、商家信息的管理,包括产品、客户、供应商等相关信息的添加、修改、删除等功能。 (2)需求分析(或是任务分析) 1)产品类别信息管理:对客户的基本信息进行添加、修改和删除。 2)产品信息管理:对产品的基本信息进行添加、修改和删除。 3)供应商信息管理: 对供应商的基本信息进行添加、修改和删除。 4)订单信息管理:对订单的基本信 息进行添加、修改和删除。 5)统计报表:按选择日期期间,并按产品类别分组统 计订单金额,使用表格显示统计结果

    常州大学在四川2020-2024各专业最低录取分数及位次表.pdf

    那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据

    yolo算法-工地佩戴头盔数据集-1608张图像带标签-epi-d4clr.zip

    yolo系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值

    Android System Webview(com.google.android.webvie) 125.0.6422.82

    Android System Webview(com.google.android.webvie) 125.0.6422.82 一般情况下设备可以从google play上更新,但是google play 中没有历史版本下载,所以在自己需要之后把资源上传

    VLP超低轮廓铜箔,全球前10强生产商排名及市场份额(by QYResearch).docx

    VLP超低轮廓铜箔,全球前10强生产商排名及市场份额(by QYResearch).docx

    南宁学院在四川2020-2024各专业最低录取分数及位次表.pdf

    那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据

    PPServ是一个Web开发集成环境,可以使用Apache,PHP,Mysql创建Web应用 -PPServ.zip

    网鼎杯PPServ是一个Web开发集成环境,可以使用Apache,PHP,Mysql创建Web应用。_PPServ.zip

    沧州交通学院在四川2020-2024各专业最低录取分数及位次表.pdf

    那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据

    windows 安装包Miniconda3-py38-4.11.0-Windows-x86-64

    windows 安装包Miniconda3-py38-4.11.0-Windows-x86-64

    yolo算法-公路等级数据集-8188张图像带标签-汽车客车摩托highway-classification.zip

    yolo系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值

Global site tag (gtag.js) - Google Analytics