`

Jafka学习之Producer发送前调度

    博客分类:
  • mq
阅读更多
     我们再来看下消息的整个发送的过程,做工作的类为:ProducerSendThread 我们来看看:
   
final String threadName ;

    // 消息队列
    final BlockingQueue<QueueItem<T>> queue;

    // 消息的encoder
    final Encoder<T> serializer;

    // 底层的同步的消息发送器
    final SyncProducer underlyingProducer;

    // 事件处理类,它执行真实的消息发送
    final EventHandler<T> eventHandler;

    // 用户可干预的事件处理类
    final CallbackHandler<T> callbackHandler;

    final long queueTime ;

    final int batchSize ;

    private final Logger logger = LoggerFactory.getLogger(ProducerSendThread .class );

    /////////////////////////////////////////////////////////////////////
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    private volatile boolean shutdown = false;
    有几个属性我们目前不知道什么意思,待分析完代码后再看是什么意思吧。
  说到线程类,我们主要看下run方法即可:
    
public void run() {
        try {
            List<QueueItem<T>> remainingEvents = processEvents();
            //handle remaining events
            if (remainingEvents.size() > 0) {
                logger.debug(format( "Dispatching last batch of %d events to the event handler", remainingEvents.size()));
                tryToHandle(remainingEvents);
            }
        } catch (Exception e) {
            logger.error("Error in sending events: ", e);
        } finally {
            shutdownLatch.countDown();
        }
    }
    从这里可以看到,处理方式是比较简单的,调用了processEvents函数后,如果还有剩余,就调用tryToHandle函数再去处理。
  我们先来看看processEvents函数吧:
   
private List<QueueItem<T>> processEvents() {
        long lastSend = System.currentTimeMillis();
        final List<QueueItem<T>> events = new ArrayList<QueueItem<T>>();
        boolean full = false;
        while (!shutdown ) {
            try {
                // 从队列里面获取一个元素
                QueueItem<T> item = queue.poll(Math.max(0, (lastSend + queueTime) - System.currentTimeMillis()), TimeUnit. MILLISECONDS);
                long elapsed = System.currentTimeMillis() - lastSend;
                boolean expired = item == null;
                if (item != null) {
                    if (callbackHandler != null) {
                        // 如果callbackHandler不为空,则进行回调
                        List<QueueItem<T>> items = callbackHandler.afterDequeuingExistingData(item);
                        if (items != null) {
                            events.addAll(items);
                        }
                    } else {
                        events.add(item);
                    }
                    //
                    full = events.size() >= batchSize;
                }
                // 如果超时或者队列已满
                if (full || expired) {
                    if (logger .isDebugEnabled()) {
                        if (expired) {
                            logger.debug(elapsed + " ms elapsed. Queue time reached. Sending..");
                        } else {
                            logger.debug(format( "Batch(%d) full. Sending.." , batchSize));
                        }
                    }
                    // 调用tryToHandle函数去处理事件
                    tryToHandle(events);
                    lastSend = System.currentTimeMillis();
                    events.clear();
                }
            } catch (InterruptedException e) {
                logger.warn(e.getMessage(), e);
            }
        }
        if (queue .size() > 0) {
            throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, " + queue.size() + " remaining items in the queue");
        }
        // 回调用户的lastBatchBeforeClose函数,这个函数还可以再返回一些消息
        if (this .callbackHandler != null) {
            List<QueueItem<T>> remainEvents = this.callbackHandler .lastBatchBeforeClose();
            if (remainEvents != null) {
                events.addAll(remainEvents);
            }
        }
        return events;
    }
    我们最后来看下tryToHandle函数
private void tryToHandle(List<QueueItem<T>> events) {
        if (logger .isDebugEnabled()) {
            logger.debug("handling " + events.size() + " events");
        }
        if (events.size() > 0) {
            try {
                this.eventHandler .handle(events, underlyingProducer, serializer );
            } catch (RuntimeException e) {
                logger.error("Error in handling batch of " + events.size() + " events", e);
            }
        }
    }
 
   从这个函数来看,最终的消息发送其实是eventHandler在起左右。
我们今天来看下DefaultEventHandler这个类的实现,它其实是底层的发送的处理类。
   
public class DefaultEventHandler<T> implements EventHandler<T> {
    // 用户的回调的handler,这个handler其实贯穿准备消息和用户发送消息的整个生命周期之中
    private final CallbackHandler<T> callbackHandler;
    // 从命名上就能看出是队列的压缩
    private final Set<String> compressedTopics ;
    // 消息是否编码
    private final CompressionCodec codec ;

    private final Logger logger = LoggerFactory.getLogger(DefaultEventHandler. class);
    // 重试次数
    private final int numRetries ;

下面我们再来看下比较重要的方法handle

 public void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder) {
        List<QueueItem<T>> processedEvents = events;
        if (this .callbackHandler != null) {
            processedEvents = this.callbackHandler .beforeSendingData(events);
        }
        send(collate(processedEvents, encoder), producer);
    }
   从这个地方来看,有两个函数,一个是collate,一个是send,这两个函数比较重要。
       我们先来看collate函数:
       
 private List<ProducerRequest> collate(List<QueueItem<T>> events, Encoder<T> encoder) {
        if(events == null || events.isEmpty()){
            return Collections.emptyList();
        }
        final Map<String, Map<Integer, List<Message>>> topicPartitionData = new HashMap<String, Map<Integer, List<Message>>>();
        for (QueueItem<T> event : events) {
            // 初始化map,可以看出来,key是topic,值是每个partition的一个消息列表
            Map<Integer, List<Message>> partitionData = topicPartitionData.get(event.topic );
            if (partitionData == null) {
                partitionData = new HashMap<Integer, List<Message>>();
                topicPartitionData.put(event. topic, partitionData);
            }
            List<Message> data = partitionData.get(event.partition );
            if (data == null) {
                data = new ArrayList<Message>();
                partitionData.put(event. partition, data);
            }
            // 经过encoder处理后将消息入队列
            data.add(encoder.toMessage(event. data));
        }
        //
        final List<ProducerRequest> requests = new ArrayList<ProducerRequest>();
        for (Map.Entry<String, Map<Integer, List<Message>>> e : topicPartitionData.entrySet()) {
            final String topic = e.getKey();
            for (Map.Entry<Integer, List<Message>> pd : e.getValue().entrySet()) {
                final Integer partition = pd.getKey();
                // 后进ProducerRequest东西,这个里面包含topic,partition以及一个value列表
                requests.add( new ProducerRequest(topic, partition, convert(topic, pd.getValue())));
            }
        }
        return requests;
    }
      我们接下来再看下convert函数,这个函数明显就是一个工具函数,包含一个消息列表好codec,为发送MessageSet做准备。
     
private ByteBufferMessageSet convert(String topic, List<Message> messages) {
        //compress condition:
        if (codec != CompressionCodec.NoCompressionCodec //
                && ( compressedTopics.isEmpty() || compressedTopics.contains(topic))) {
            return new ByteBufferMessageSet(codec, messages.toArray(new Message[messages.size()]));
        }
        return new ByteBufferMessageSet(CompressionCodec.NoCompressionCodec, messages.toArray( new Message[messages
                .size()]));
    }
       最后我们来看下send函数:
   
private void send(List<ProducerRequest> produces, SyncProducer syncProducer) {
        if (produces.isEmpty()) {
            return;
        }
        final int maxAttempts = 1 + numRetries;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                syncProducer.multiSend(produces);
                break;
            } catch (RuntimeException e) {
                logger.warn("error sending message, attempts times: " + i, e);
                if (i == maxAttempts - 1) {
                    throw e;
                }
            }
        }
    }
      这个地方可以看出来,最终还是调用的是syncProducer的multiSend函数。
SyncProducer类以及multiSend函数我们在下一篇博客里面讲解。
分享到:
评论

相关推荐

    Kafka Producer机制优化-提高发送消息可靠性

    ### Kafka Producer机制优化—提高发送消息可靠性 #### 一、Kafka Producer机制及问题背景 在Kafka消息系统中,消息是由Producer生产并通过Broker(消息中介节点)进行存储与转发的。Broker负责处理消息的存储,并...

    Laravel开发-producer

    消息队列允许Producer发送消息,其他进程(消费者)可以接收并处理这些消息。 4. **规则(Rules)**:描述中的“基于规则的简单类解析”可能意味着Producer在创建或调度任务时遵循特定的业务规则。这些规则可能是...

    Photodex ProShow Producer 模板资源5G

    Photodex ProShow Producer是一款专业的视频幻灯片制作工具,它提供了丰富的模板资源和插件,使得视频制作变得快速、简便且高度可定制。它非常适合爱好制作电子相册的用户,能够满足他们多样化的制作需求。ProShow ...

    pentaho-kafka-producer.zip

    Pentaho Kafka Producer是一款用于Pentaho Data Integration(Kettle)平台的插件,它允许用户在数据集成过程中将数据流发布到Apache Kafka消息队列。Kafka是一个分布式流处理平台,广泛应用于实时数据管道和流应用...

    ProShow Producer 模板 玫瑰婚礼

    《ProShow Producer 模板:打造浪漫“玫瑰婚礼”》 在数字时代,人们越来越注重个性化和创意化的表达,尤其是在婚礼这种人生中的重要时刻。ProShow Producer,一款专业的幻灯片制作软件,就为新人们提供了这样的...

    kafka_producer.zip

    总结来说,`kafka_producer.zip`提供的资源展示了如何使用Python和`kafka-python`库创建一个简单的Kafka生产者,该生产者能够从配置文件读取参数,向指定的Kafka主题发送数据。理解这个过程对于任何需要在Python环境...

    Easy RealMedia Producer V1.94

    1. **直观易用的界面**:Easy RealMedia Producer采用了用户友好的界面设计,使得操作流程简单明了,减少了学习成本。 2. **多媒体输入支持**:支持导入各种常见的音频、视频格式,如MP3、WAV、AVI、MPEG等,方便...

    绿色版 OID Producer

    OID Producer是一款用于生成和管理Object Identifier(OID)的工具,主要在信息技术领域,尤其是网络协议和软件开发中使用。OID是标识数据对象的一种国际标准,它在ASN.1(抽象语法标记一世)编码规则下工作,是网络...

    RocketMQ概念 producer:生产者,消息发送者

    一个`Producer Group`由多个`Producer`组成,它们共享同组内的配置信息,如发送策略和主题。生产者可以选择同步、异步或单向发送消息。 - **同步发送**:生产者发送消息后等待Broker的确认,确保消息被成功接收。 ...

    Helix Producer Plus设置说明.doc

    《深入解析Helix Producer Plus设置指南》 Helix Producer Plus是一款功能强大的多媒体转换与编码工具,相较于REALPRODUCER 8 PLUS,它在模板管理、服务器管理以及压缩进程管理等方面实现了重大升级,使得多媒体...

    consumer_producer.rar_操作系统_生产 消费_生产者-消费者问题_进程调度

    在"consumer_producer.rar"这个文件中,可能包含了一个操作系统模拟实验,通过编程实现生产者-消费者问题的解决方案,可能包括代码示例、分析报告等。通过学习这个案例,你可以深入了解操作系统中进程同步的重要性,...

    Easy RealMedia Producer

    《Easy RealMedia Producer:高效便捷的Real媒体转换工具》 在多媒体处理领域,各种格式的视频文件广泛存在,其中,MPEG(Moving Picture Experts Group)因其高质量和广泛的兼容性而备受青睐。然而,在某些特定...

    Easy Real Media Producer v1.93

    软件介绍 一个批量RealMedia文件生成器。采用全新的RealVideo v9&RealVideo; v10内核,根据实际使用的需要提供了比Helix RealMedia ...自动关机前有30秒响应时间,用户可以取消关机。 任务结束后有详细的信息报告。

    Impact Image Producer

    对于初学者来说,Impact Image Producer提供了学习光线跟踪渲染的绝佳平台。通过实际操作,可以深入理解光线跟踪算法的工作原理,并提升图像处理技能。而对于专业设计师和开发者,该软件则提供了一个强大的工具,能...

    helix producer linux

    Helix Producer是一款强大的多媒体编码工具,尤其在Linux操作系统环境下,它为用户提供了高效且灵活的视频编码解决方案。这款软件由RealNetworks开发,主要用于将各种格式的多媒体内容转换成RealMedia格式,使得内容...

    amazon-kinesis-producer, 亚马逊Kinesis制作库.zip

    amazon-kinesis-producer, 亚马逊Kinesis制作库 室Producer库简介在亚马逊 Kinesis Producer Producer Producer Producer Producer Producer Producer performs performs performs per

    Producer/Consumer 多线程处理文件

    在IT领域,多线程处理是提升程序性能和并发能力的重要技术之一。在这个"Producer/Consumer 多线程处理文件"的主题中,我们将深入探讨Java中的生产者消费者模型,以及如何利用这一模式来高效地处理大量数据,如一秒钟...

    kafka中文文档producer配置参数

    在Kafka中,Producer(生产者)负责将数据流发送到Kafka集群中。生产者的配置参数非常关键,因为它们会直接影响到生产者的行为和性能。以下是根据提供的文件内容整理的Kafka生产者配置参数的知识点: 1. bootstrap....

    Flex基于Producer和Consumer方式的简易消息推送机制

    Flex中的消息推送机制是基于Producer和Consumer模型实现的,这种机制允许应用程序实时地发送和接收消息,常用于构建聊天室、通知系统等实时交互的应用。下面将详细解释Flex消息推送的相关知识点。 1. **Producer与...

    Helix Producer Plus V9.01

    配合Helix先进的功能,Realnetworks推出了第10代的流媒体压缩软件Helix Producer。Realnetworks全新改写代码的图形化专业流媒体文件制作工具。利用它,你可以轻松地实现RealAudio8、RealAudio9文件格式到实时文件的...

Global site tag (gtag.js) - Google Analytics