`

Jafka学习之Producer发送之组件说明

    博客分类:
  • mq
阅读更多

         1. QueueItem介绍,从类的定义来看,这个里面包含一个数据,超那个topic的那个分区发送

         

public class QueueItem<T> {

    public final T data ;

    public final int partition ;

    public final String topic ;
}

         2. EventHandler介绍,从类的定义来看,可以初始化,可以被关闭,在handle方法里面包含一个encoder,一个SyncProducer,和一批消息

      

public interface EventHandler<T> extends Closeable{

    /**
     * Initializes the event handler using a Properties object
     *
     * @param properties the properties used to initialize the event
     *        handler
     */
    void init(Properties properties);

    /**
     * Callback to dispatch the batched data and send it to a Jafka server
     *
     * @param events the data sent to the producer
     * @param producer the low- level producer used to send the data
     * @param encoder data encoder
     */
    void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder);

    /**
     * Cleans up and shuts down the event handler
     */
    void close();
}

 

           3. CallBackHandler从类的命名来看就是一个回调函数,这个在异步发送的时候,可以回调用户端的一些程序

         

public interface EventHandler<T> extends Closeable{

    /**
     * Initializes the event handler using a Properties object
     *
     * @param properties the properties used to initialize the event
     *        handler
     */
    void init(Properties properties);

    /**
     * Callback to dispatch the batched data and send it to a Jafka server
     *
     * @param events the data sent to the producer
     * @param producer the low- level producer used to send the data
     * @param encoder data encoder
     */
    void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder);

    /**
     * Cleans up and shuts down the event handler
     */
    void close();
}

          4 我们接着来看下SyncProducer的实现

        

public class SyncProducer implements Closeable {

    private final Logger logger = Logger.getLogger(SyncProducer .class );

    //private static final RequestKeys RequestKey = RequestKeys.Produce;//0

    /////////////////////////////////////////////////////////////////////
    // 同步发送器的配置
    private final SyncProducerConfig config ;
    // BlockingChannel这个待会再讲,感觉是封装了一个channel
    private final BlockingChannel blockingChannel ;
    // 这个是一个对象锁,待会讲解锁的妙用
    private final Object lock = new Object();
    // 是否已经关闭的标志位
    private volatile boolean shutdown = false;

    // 主机和端口号
    private final String host ;

    private final int port ;

  接着是它的构造函数:
 public SyncProducer(SyncProducerConfig config) {
        super();
        this.config = config;
        this.host = config.getHost();
        this.port = config.getPort();
        //
        this.blockingChannel = new BlockingChannel(host, port , -1, config.socketTimeoutMs, config.bufferSize );
    }

        这个里面讲readBufferSize设置成-1了,因为它不需要读数据

        我们接下来看下这三个send的重载函数
        
// 采用随机partition进行发送
public void send(String topic, ByteBufferMessageSet message) {
        send(topic, ProducerRequest. RandomPartition, message);
    }

// 检验消息大小后,构建ProducerRequest对象进行send
    public void send(String topic, int partition, ByteBufferMessageSet messages) {
        messages.verifyMessageSize(config .maxMessageSize );
        send( new ProducerRequest(topic, partition, messages));
    }

    private void send(Request request) {
        // 从request对象构建出send对象
        BoundedByteBufferSend send = new BoundedByteBufferSend(request);
        synchronized (lock ) {
            long startTime = System.nanoTime();
            int written = -1;
            try {
                // 建立链接并发送send,最终返回一个number,表明发送的字节数量
                written = connect().send(send);
            } catch (IOException e) {
                // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
                disconnect();
                throw new RuntimeException(e);
            } finally {
                if (logger .isDebugEnabled()) {
                    logger.debug(format( "write %d bytes data to %s:%d", written, host, port));
                }
            }
            final long endTime = System.nanoTime ();
            SyncProducerStats.recordProduceRequest(endTime - startTime);
        }
    }
 
        
分享到:
评论

相关推荐

    Kafka Producer机制优化-提高发送消息可靠性

    ### Kafka Producer机制优化—提高发送消息可靠性 #### 一、Kafka Producer机制及问题背景 在Kafka消息系统中,消息是由Producer生产并通过Broker(消息中介节点)进行存储与转发的。Broker负责处理消息的存储,并...

    Laravel开发-producer

    消息队列允许Producer发送消息,其他进程(消费者)可以接收并处理这些消息。 4. **规则(Rules)**:描述中的“基于规则的简单类解析”可能意味着Producer在创建或调度任务时遵循特定的业务规则。这些规则可能是...

    Helix Producer Plus设置说明.doc

    《深入解析Helix Producer Plus设置指南》 Helix Producer Plus是一款功能强大的多媒体转换与编码工具,相较于REALPRODUCER 8 PLUS,它在模板管理、服务器管理以及压缩进程管理等方面实现了重大升级,使得多媒体...

    Photodex ProShow Producer 模板资源5G

    Photodex ProShow Producer是一款专业的视频幻灯片制作工具,它提供了丰富的模板资源和插件,使得视频制作变得快速、简便且高度可定制。它非常适合爱好制作电子相册的用户,能够满足他们多样化的制作需求。ProShow ...

    ProShow Producer 模板 玫瑰婚礼

    《ProShow Producer 模板:打造浪漫“玫瑰婚礼”》 在数字时代,人们越来越注重个性化和创意化的表达,尤其是在婚礼这种人生中的重要时刻。ProShow Producer,一款专业的幻灯片制作软件,就为新人们提供了这样的...

    pentaho-kafka-producer.zip

    Pentaho Kafka Producer是一款用于Pentaho Data Integration(Kettle)平台的插件,它允许用户在数据集成过程中将数据流发布到Apache Kafka消息队列。Kafka是一个分布式流处理平台,广泛应用于实时数据管道和流应用...

    kafka_producer.zip

    总结来说,`kafka_producer.zip`提供的资源展示了如何使用Python和`kafka-python`库创建一个简单的Kafka生产者,该生产者能够从配置文件读取参数,向指定的Kafka主题发送数据。理解这个过程对于任何需要在Python环境...

    Easy RealMedia Producer V1.94

    1. **直观易用的界面**:Easy RealMedia Producer采用了用户友好的界面设计,使得操作流程简单明了,减少了学习成本。 2. **多媒体输入支持**:支持导入各种常见的音频、视频格式,如MP3、WAV、AVI、MPEG等,方便...

    绿色版 OID Producer

    OID Producer是一款用于生成和管理Object Identifier(OID)的工具,主要在信息技术领域,尤其是网络协议和软件开发中使用。OID是标识数据对象的一种国际标准,它在ASN.1(抽象语法标记一世)编码规则下工作,是网络...

    RocketMQ概念 producer:生产者,消息发送者

    一个`Producer Group`由多个`Producer`组成,它们共享同组内的配置信息,如发送策略和主题。生产者可以选择同步、异步或单向发送消息。 - **同步发送**:生产者发送消息后等待Broker的确认,确保消息被成功接收。 ...

    Easy RealMedia Producer

    《Easy RealMedia Producer:高效便捷的Real媒体转换工具》 在多媒体处理领域,各种格式的视频文件广泛存在,其中,MPEG(Moving Picture Experts Group)因其高质量和广泛的兼容性而备受青睐。然而,在某些特定...

    Producer/Consumer 多线程处理文件

    在IT领域,多线程处理是提升程序性能和并发能力的重要技术之一。在这个"Producer/Consumer 多线程处理文件"的主题中,我们将深入探讨Java中的生产者消费者模型,以及如何利用这一模式来高效地处理大量数据,如一秒钟...

    大数据平台Kafka组件应用研究说明书

    ### 大数据平台Kafka组件应用研究说明书 #### 一、组件介绍 ##### 基础介绍 Apache Kafka 是一种高吞吐量的分布式发布订阅消息系统,它最初由LinkedIn公司开发,之后成为Apache软件基金会的顶级项目。Kafka的设计...

    Flex基于Producer和Consumer方式的简易消息推送机制

    Flex中的消息推送机制是基于Producer和Consumer模型实现的,这种机制允许应用程序实时地发送和接收消息,常用于构建聊天室、通知系统等实时交互的应用。下面将详细解释Flex消息推送的相关知识点。 1. **Producer与...

    Helix Producer Plus V9.01

    配合Helix先进的功能,Realnetworks推出了第10代的流媒体压缩软件Helix Producer。Realnetworks全新改写代码的图形化专业流媒体文件制作工具。利用它,你可以轻松地实现RealAudio8、RealAudio9文件格式到实时文件的...

    Impact Image Producer

    对于初学者来说,Impact Image Producer提供了学习光线跟踪渲染的绝佳平台。通过实际操作,可以深入理解光线跟踪算法的工作原理,并提升图像处理技能。而对于专业设计师和开发者,该软件则提供了一个强大的工具,能...

    Easy Real Media Producer v1.93

    v10内核,根据实际使用的需要提供了比Helix RealMedia Producer和RealProducer10还要多的过滤设置。 基本上可以用它来替代Helix RealMedia Producer和RealProducer v10,使用Real v10内核时,全面支持Real10文件...

    helix producer linux

    Helix Producer是一款强大的多媒体编码工具,尤其在Linux操作系统环境下,它为用户提供了高效且灵活的视频编码解决方案。这款软件由RealNetworks开发,主要用于将各种格式的多媒体内容转换成RealMedia格式,使得内容...

    amazon-kinesis-producer, 亚马逊Kinesis制作库.zip

    amazon-kinesis-producer, 亚马逊Kinesis制作库 室Producer库简介在亚马逊 Kinesis Producer Producer Producer Producer Producer Producer Producer performs performs performs per

    Helix Producer Plus V9.01 附汉化

    Helix Producer Plus是一款专业的流媒体内容创作工具,主要用于创建、编码和发布高质量的数字媒体,如视频、音频和动画。V9.01是该软件的一个版本,它可能包含了性能优化、新功能以及修复了一些已知问题。在这个版本...

Global site tag (gtag.js) - Google Analytics