`
jaesonchen
  • 浏览: 311732 次
  • 来自: ...
社区版块
存档分类
最新评论

Kafka Producer端封装自定义消息

 
阅读更多

这篇文章主要讲kafka producer端的编程,通过一个应用案例来描述kafka在实际应用中的作用。如果你还没有搭建起kafka的开发环境,可以先参考:<kafka开发环境搭建>

首先描述一下应用的情况:一个站内的搜索引擎,运营人员想知道某一时段,各类用户对商品的不同需求。通过对这些数据的分析,从而获得更多有价值的市场分析报表。这样的情况,就需要我们对每次的搜索进行记录,当然,不太可能使用数据库区记录这些信息(数据库存这些数据我会觉得是种浪费,个人意见)。最好的办法是存日志。然后通过对日志的分析,计算出有用的数据。我们采用kafka这种分布式日志系统来实现这一过程。

完成上述一系列的工作,可以按照以下步骤来执行:

 

1.         搭建kafka系统运行环境。

2.         设计数据存储格式(按照自定义格式来封装消息)

3.         Producer端获取真实数据(搜索记录),并对数据按上述2中设计的格式进行编码。

4.         Producer将已经编码的数据发送到broker上,在broker上进行存储(分配存储策略)。

5.         Consumer端从broker中获取数据,分析计算。

如果用淘宝数据服务平台的架构来匹配这一过程,broker就好比数据中心中存储的角色,producer端基本是放在了应用中心的开放API中,consumer端则一般用于数据产品和应用中心的获取数据中使用。

 

今天主要写的是234三个步骤。我们先看第二步。为了快速实现,这里就设计一个比较简单的消息格式,复杂的原理和这个一样。

 

 

用四个字段分别表示消息的ID、用户、查询关键词和查询时间。当然你如果要设计的更复杂,可以加入IP这些信息。这些用java写就是一个简单的pojo类,这是getter/setter方法即可。由于在封转成kafkamessage时需要将数据转化成bytep[]类型,可以提供一个序列化的方法。我在这里直接重写toString了:

@Override
	public String toString() {
		String keyword = "[info kafka producer:]";
		keyword = keyword + this.getId() + "-" + this.getUser() + "-"
				+ this.getKeyword() + "-" + this.getCurrent();
		return keyword;
	}

这样还没有完成,这只是将数据格式用java对象表现出来,解析来要对其按照kafka的消息类型进行封装,在这里我们只需要实现Encoder类即可:

 

package org.gfg.kafka.message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.message.Message;

public classKeywordMessageimplementskafka.serializer.Encoder<Keyword>{
	
	public static final Logger LOG=LoggerFactory.getLogger(Keyword.class); 
	
	@Override
	public Message toMessage(Keyword words){
		LOG.info("start in encoding...");
		return new Message(words.toString().getBytes());
	}

}

注意泛型和返回类型即可。这样KeywordMessage就是一个可以被kafka发送和存储的对象了。

接下来,我们可以编写一部分producer,获取业务系统的数据。要注意,producer数据的推送到broker的,所以发起者还是业务系统,下面的代码就能直接发送一次数据,注释都很详细:

/**配置producer必要的参数*/
		Properties props = new Properties();
		props.put("zk.connect", "192.168.10.11:2181");
		/**选择用哪个类来进行序列化*/
		props.put("serializer.class", "org.gfg.kafka.message.KeywordMessage");
		props.put("zk.connectiontimeout.ms", "6000");
		ProducerConfig config=new ProducerConfig(props);
		
		/**制造数据*/
		Keyword keyword=new Keyword();
		keyword.setUser("Chenhui");
		keyword.setId(0);
		keyword.setKeyword("china");
		
		List<Keyword> msg=new ArrayList<Keyword>();
		msg.add(keyword);
		
		/**构造数据发送对象*/
		Producer<String, Keyword> producer=new Producer<String, Keyword>(config);		
		ProducerData<String,Keyword> data=new ProducerData<String, Keyword>("test", msg);
		producer.send(data);

发送完之后,我们可以用bin目录下的kafka-console-consumer来看发送的结果(当然现在用的topictest)。可以用命令:

./kafka-console-consumer –zookeeper 192.168.10.11:2181 –topic test –from-beginning

 

 

如果是在使用zookeeper搭建分布式的情况下(zookeeper based broker discovery),我们可以执行第三个步骤,用编码来实现partition的分配策略。这里需要我们实现Partitioner对象:

package org.gfg.kafka.partitioner;

import org.gfg.kafka.message.Keyword;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.producer.Partitioner;

/**
 * 
 * @author Chen.Hui
 * 
 */
public classProducerPartitionerimplementsPartitioner<String> {
	
	public static final Logger LOG=LoggerFactory.getLogger(Keyword.class); 
	
	@Override
	publicintpartition(String key, int numPartitions){
		LOG.info("ProducerPartitioner key:"+key+" partitions:"+numPartitions);
		return key.length() % numPartitions;
	}

}

在上面的partition方法中,值得注意的是,key我们是在构造数据发送对象时设置的,这个key是区分存储的关键,比如我想将我的数据按照不同的用户类别存储。Partition的好处是可以并发的获取同类数据,提高效率,具体可以看之前的文章。

所以在第二部时的producer代码需要有所改进:

/**选择用哪个类来进行设置partition*/
		props.put("partitioner.class", "org.gfg.kafka.partitioner.ProducerPartitioner");

		ProducerData<String,Keyword> data=new ProducerData<String, Keyword>("test","developer", msg);

增加了对partition的配置,并且修改了ProducerData的参数,其中,中间的就是key,如果不设置partitionkafka则随机的向broker中发送请求。我们可以看一眼ProducerData的源码:

package kafka.javaapi.producer

import scala.collection.JavaConversions._

classProducerData[K, V](private val topic: String,
                         private val key: K,
                         private val data: java.util.List[V]) {

  def this(t: String, d: java.util.List[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)

  def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = asList(List(d)))

  def getTopic: String = topic

  def getKey: K = key

  def getData: java.util.List[V] = data
}

至此,producer端的事情都做完了,当然这就是个demo,还有很多性能上的优化需要做,当然有了这个基础,我们就能将数据存储到broker上,下一步,就是用consumer来消费这些日志,形成有价值的数据产品。

分享到:
评论

相关推荐

    kafka系列解读.pdf

    #### 八、Kafka Producer端封装自定义消息 1. **消息格式**:了解Kafka消息的键值对结构。 2. **消息序列化**:使用序列化器将自定义对象转换为字节数组。 3. **发送消息**:通过`ProducerRecord`对象指定发送的...

    很全面的kafka技术文档

    #### 七、Kafka Producer端封装自定义消息 Producer 端的主要任务是生成并发送消息到 Kafka。自定义消息通常包含以下几个部分: - **消息键(Key)**:用于分区的依据。 - **消息值(Value)**:消息的实际内容。 ...

    spring cloud kafka数据中间件源码

    "dm-kafka-client"可能是一个自定义的Kafka客户端库,用于封装Spring Cloud Kafka的相关操作,提供更方便的API供应用调用。这个客户端可能包含了生产者和消费者的实现,以及一些定制的配置和工具类,以便于在多平台...

    04、Kafka核心源码剖析.zip

    【Kafka核心源码剖析】 Kafka是一款高吞吐量的...通过深入Kafka的源码,我们可以了解其内部的工作机制,这将有助于我们优化Kafka的部署和使用,更好地解决可能出现的问题,同时也能为开发自定义插件或扩展提供基础。

    producerNotification:回购包含API以在Kafka中推送消息

    - 配置选项:允许自定义各种Kafka Producer配置,如acks设置(决定何时确认消息已成功写入)、batch.size(决定批量发送消息的大小)等。 - 分布式事务支持:如果库支持,可以保证消息的原子性,即使在故障情况下也...

    Kafka生产者详解(很细)

    1. **消息封装**:首先,生产者将待发送的消息封装成`ProducerRecord`对象,该对象中包含了目标主题(topic)、消息的具体内容等关键信息。此外,用户还可以指定键(key)和特定的分区(partition)。 2. **序列化**:...

    librdkafka-1.9.0.zip kafka C++

    例如,`Producer`类用于发送消息,`Consumer`类用于接收消息,而`Topic`类则代表Kafka中的主题。 3. **源码结构** librdkafka的源码组织清晰,主要分为几个核心部分: - **Core**:实现了Kafka协议和网络通信,...

    kafka.docx

    生产者发送消息时,将消息封装在 `ProducerRecord` 中,指定主题、内容,可选键和分区。生产者负责序列化消息,使用分区器确定分区,批量发送至 Broker。若消息发送失败,生产者会根据重试策略进行重试,直到成功或...

    MbUtils.Kafka:Kafka消息产生和使用库

    MbUtils.Kafka是一个方便的C#库,它封装了与Apache Kafka的交互,使得.NET开发者能更简单地利用Kafka进行消息传递。通过理解和应用这个库,你可以构建可靠的数据流应用,利用Kafka的强大功能进行实时数据处理和传输...

    大数据课程体系.docx

    - **Kafka的存储策略**:解释Kafka如何存储消息,包括日志段(Log Segments)的管理机制。 - **Kafka分区特点**:说明Kafka如何通过分区来提高吞吐量和可扩展性。 - **Kafka的发布与订阅**:阐述Kafka中的发布-订阅...

    jeesuite-libs-其他

    支持特殊场景发送有状态的消息(如:同一个用户的消息全部由某一个消费节点处理)producer、consumer端监控数据采集,由(jeesuite-admin)输出兼容遗留kafka系统、支持发送和接收无封装的消息mybatis模块代码生成、...

Global site tag (gtag.js) - Google Analytics