我们再来看下消息的整个发送的过程,做工作的类为:ProducerSendThread 我们来看看:
final String threadName ; // 消息队列 final BlockingQueue<QueueItem<T>> queue; // 消息的encoder final Encoder<T> serializer; // 底层的同步的消息发送器 final SyncProducer underlyingProducer; // 事件处理类,它执行真实的消息发送 final EventHandler<T> eventHandler; // 用户可干预的事件处理类 final CallbackHandler<T> callbackHandler; final long queueTime ; final int batchSize ; private final Logger logger = LoggerFactory.getLogger(ProducerSendThread .class ); ///////////////////////////////////////////////////////////////////// private final CountDownLatch shutdownLatch = new CountDownLatch(1); private volatile boolean shutdown = false;有几个属性我们目前不知道什么意思,待分析完代码后再看是什么意思吧。
说到线程类,我们主要看下run方法即可:
public void run() { try { List<QueueItem<T>> remainingEvents = processEvents(); //handle remaining events if (remainingEvents.size() > 0) { logger.debug(format( "Dispatching last batch of %d events to the event handler", remainingEvents.size())); tryToHandle(remainingEvents); } } catch (Exception e) { logger.error("Error in sending events: ", e); } finally { shutdownLatch.countDown(); } }从这里可以看到,处理方式是比较简单的,调用了processEvents函数后,如果还有剩余,就调用tryToHandle函数再去处理。
我们先来看看processEvents函数吧:
private List<QueueItem<T>> processEvents() { long lastSend = System.currentTimeMillis(); final List<QueueItem<T>> events = new ArrayList<QueueItem<T>>(); boolean full = false; while (!shutdown ) { try { // 从队列里面获取一个元素 QueueItem<T> item = queue.poll(Math.max(0, (lastSend + queueTime) - System.currentTimeMillis()), TimeUnit. MILLISECONDS); long elapsed = System.currentTimeMillis() - lastSend; boolean expired = item == null; if (item != null) { if (callbackHandler != null) { // 如果callbackHandler不为空,则进行回调 List<QueueItem<T>> items = callbackHandler.afterDequeuingExistingData(item); if (items != null) { events.addAll(items); } } else { events.add(item); } // full = events.size() >= batchSize; } // 如果超时或者队列已满 if (full || expired) { if (logger .isDebugEnabled()) { if (expired) { logger.debug(elapsed + " ms elapsed. Queue time reached. Sending.."); } else { logger.debug(format( "Batch(%d) full. Sending.." , batchSize)); } } // 调用tryToHandle函数去处理事件 tryToHandle(events); lastSend = System.currentTimeMillis(); events.clear(); } } catch (InterruptedException e) { logger.warn(e.getMessage(), e); } } if (queue .size() > 0) { throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, " + queue.size() + " remaining items in the queue"); } // 回调用户的lastBatchBeforeClose函数,这个函数还可以再返回一些消息 if (this .callbackHandler != null) { List<QueueItem<T>> remainEvents = this.callbackHandler .lastBatchBeforeClose(); if (remainEvents != null) { events.addAll(remainEvents); } } return events; }我们最后来看下tryToHandle函数
private void tryToHandle(List<QueueItem<T>> events) { if (logger .isDebugEnabled()) { logger.debug("handling " + events.size() + " events"); } if (events.size() > 0) { try { this.eventHandler .handle(events, underlyingProducer, serializer ); } catch (RuntimeException e) { logger.error("Error in handling batch of " + events.size() + " events", e); } } }
从这个函数来看,最终的消息发送其实是eventHandler在起左右。
我们今天来看下DefaultEventHandler这个类的实现,它其实是底层的发送的处理类。
public class DefaultEventHandler<T> implements EventHandler<T> { // 用户的回调的handler,这个handler其实贯穿准备消息和用户发送消息的整个生命周期之中 private final CallbackHandler<T> callbackHandler; // 从命名上就能看出是队列的压缩 private final Set<String> compressedTopics ; // 消息是否编码 private final CompressionCodec codec ; private final Logger logger = LoggerFactory.getLogger(DefaultEventHandler. class); // 重试次数 private final int numRetries ; 下面我们再来看下比较重要的方法handle public void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder) { List<QueueItem<T>> processedEvents = events; if (this .callbackHandler != null) { processedEvents = this.callbackHandler .beforeSendingData(events); } send(collate(processedEvents, encoder), producer); }从这个地方来看,有两个函数,一个是collate,一个是send,这两个函数比较重要。
我们先来看collate函数:
private List<ProducerRequest> collate(List<QueueItem<T>> events, Encoder<T> encoder) { if(events == null || events.isEmpty()){ return Collections.emptyList(); } final Map<String, Map<Integer, List<Message>>> topicPartitionData = new HashMap<String, Map<Integer, List<Message>>>(); for (QueueItem<T> event : events) { // 初始化map,可以看出来,key是topic,值是每个partition的一个消息列表 Map<Integer, List<Message>> partitionData = topicPartitionData.get(event.topic ); if (partitionData == null) { partitionData = new HashMap<Integer, List<Message>>(); topicPartitionData.put(event. topic, partitionData); } List<Message> data = partitionData.get(event.partition ); if (data == null) { data = new ArrayList<Message>(); partitionData.put(event. partition, data); } // 经过encoder处理后将消息入队列 data.add(encoder.toMessage(event. data)); } // final List<ProducerRequest> requests = new ArrayList<ProducerRequest>(); for (Map.Entry<String, Map<Integer, List<Message>>> e : topicPartitionData.entrySet()) { final String topic = e.getKey(); for (Map.Entry<Integer, List<Message>> pd : e.getValue().entrySet()) { final Integer partition = pd.getKey(); // 后进ProducerRequest东西,这个里面包含topic,partition以及一个value列表 requests.add( new ProducerRequest(topic, partition, convert(topic, pd.getValue()))); } } return requests; }我们接下来再看下convert函数,这个函数明显就是一个工具函数,包含一个消息列表好codec,为发送MessageSet做准备。
private ByteBufferMessageSet convert(String topic, List<Message> messages) { //compress condition: if (codec != CompressionCodec.NoCompressionCodec // && ( compressedTopics.isEmpty() || compressedTopics.contains(topic))) { return new ByteBufferMessageSet(codec, messages.toArray(new Message[messages.size()])); } return new ByteBufferMessageSet(CompressionCodec.NoCompressionCodec, messages.toArray( new Message[messages .size()])); }最后我们来看下send函数:
private void send(List<ProducerRequest> produces, SyncProducer syncProducer) { if (produces.isEmpty()) { return; } final int maxAttempts = 1 + numRetries; for (int i = 0; i < maxAttempts; i++) { try { syncProducer.multiSend(produces); break; } catch (RuntimeException e) { logger.warn("error sending message, attempts times: " + i, e); if (i == maxAttempts - 1) { throw e; } } } }这个地方可以看出来,最终还是调用的是syncProducer的multiSend函数。
SyncProducer类以及multiSend函数我们在下一篇博客里面讲解。
相关推荐
### Kafka Producer机制优化—提高发送消息可靠性 #### 一、Kafka Producer机制及问题背景 在Kafka消息系统中,消息是由Producer生产并通过Broker(消息中介节点)进行存储与转发的。Broker负责处理消息的存储,并...
消息队列允许Producer发送消息,其他进程(消费者)可以接收并处理这些消息。 4. **规则(Rules)**:描述中的“基于规则的简单类解析”可能意味着Producer在创建或调度任务时遵循特定的业务规则。这些规则可能是...
Photodex ProShow Producer是一款专业的视频幻灯片制作工具,它提供了丰富的模板资源和插件,使得视频制作变得快速、简便且高度可定制。它非常适合爱好制作电子相册的用户,能够满足他们多样化的制作需求。ProShow ...
《ProShow Producer 模板:打造浪漫“玫瑰婚礼”》 在数字时代,人们越来越注重个性化和创意化的表达,尤其是在婚礼这种人生中的重要时刻。ProShow Producer,一款专业的幻灯片制作软件,就为新人们提供了这样的...
总结来说,`kafka_producer.zip`提供的资源展示了如何使用Python和`kafka-python`库创建一个简单的Kafka生产者,该生产者能够从配置文件读取参数,向指定的Kafka主题发送数据。理解这个过程对于任何需要在Python环境...
Pentaho Kafka Producer是一款用于Pentaho Data Integration(Kettle)平台的插件,它允许用户在数据集成过程中将数据流发布到Apache Kafka消息队列。Kafka是一个分布式流处理平台,广泛应用于实时数据管道和流应用...
1. **直观易用的界面**:Easy RealMedia Producer采用了用户友好的界面设计,使得操作流程简单明了,减少了学习成本。 2. **多媒体输入支持**:支持导入各种常见的音频、视频格式,如MP3、WAV、AVI、MPEG等,方便...
OID Producer是一款用于生成和管理Object Identifier(OID)的工具,主要在信息技术领域,尤其是网络协议和软件开发中使用。OID是标识数据对象的一种国际标准,它在ASN.1(抽象语法标记一世)编码规则下工作,是网络...
一个`Producer Group`由多个`Producer`组成,它们共享同组内的配置信息,如发送策略和主题。生产者可以选择同步、异步或单向发送消息。 - **同步发送**:生产者发送消息后等待Broker的确认,确保消息被成功接收。 ...
《深入解析Helix Producer Plus设置指南》 Helix Producer Plus是一款功能强大的多媒体转换与编码工具,相较于REALPRODUCER 8 PLUS,它在模板管理、服务器管理以及压缩进程管理等方面实现了重大升级,使得多媒体...
在"consumer_producer.rar"这个文件中,可能包含了一个操作系统模拟实验,通过编程实现生产者-消费者问题的解决方案,可能包括代码示例、分析报告等。通过学习这个案例,你可以深入了解操作系统中进程同步的重要性,...
《Easy RealMedia Producer:高效便捷的Real媒体转换工具》 在多媒体处理领域,各种格式的视频文件广泛存在,其中,MPEG(Moving Picture Experts Group)因其高质量和广泛的兼容性而备受青睐。然而,在某些特定...
软件介绍 一个批量RealMedia文件生成器。采用全新的RealVideo v9&RealVideo; v10内核,根据实际使用的需要提供了比Helix RealMedia ...自动关机前有30秒响应时间,用户可以取消关机。 任务结束后有详细的信息报告。
配合Helix先进的功能,Realnetworks推出了第10代的流媒体压缩软件Helix Producer。Realnetworks全新改写代码的图形化专业流媒体文件制作工具。利用它,你可以轻松地实现RealAudio8、RealAudio9文件格式到实时文件的...
对于初学者来说,Impact Image Producer提供了学习光线跟踪渲染的绝佳平台。通过实际操作,可以深入理解光线跟踪算法的工作原理,并提升图像处理技能。而对于专业设计师和开发者,该软件则提供了一个强大的工具,能...
Helix Producer是一款强大的多媒体编码工具,尤其在Linux操作系统环境下,它为用户提供了高效且灵活的视频编码解决方案。这款软件由RealNetworks开发,主要用于将各种格式的多媒体内容转换成RealMedia格式,使得内容...
amazon-kinesis-producer, 亚马逊Kinesis制作库 室Producer库简介在亚马逊 Kinesis Producer Producer Producer Producer Producer Producer Producer performs performs performs per
在IT领域,多线程处理是提升程序性能和并发能力的重要技术之一。在这个"Producer/Consumer 多线程处理文件"的主题中,我们将深入探讨Java中的生产者消费者模型,以及如何利用这一模式来高效地处理大量数据,如一秒钟...
Flex中的消息推送机制是基于Producer和Consumer模型实现的,这种机制允许应用程序实时地发送和接收消息,常用于构建聊天室、通知系统等实时交互的应用。下面将详细解释Flex消息推送的相关知识点。 1. **Producer与...
Helix Producer Plus是一款专业的流媒体内容创作工具,主要用于创建、编码和发布高质量的数字媒体,如视频、音频和动画。V9.01是该软件的一个版本,它可能包含了性能优化、新功能以及修复了一些已知问题。在这个版本...