1. QueueItem介绍,从类的定义来看,这个里面包含一个数据,超那个topic的那个分区发送
public class QueueItem<T> { public final T data ; public final int partition ; public final String topic ; }
2. EventHandler介绍,从类的定义来看,可以初始化,可以被关闭,在handle方法里面包含一个encoder,一个SyncProducer,和一批消息
public interface EventHandler<T> extends Closeable{ /** * Initializes the event handler using a Properties object * * @param properties the properties used to initialize the event * handler */ void init(Properties properties); /** * Callback to dispatch the batched data and send it to a Jafka server * * @param events the data sent to the producer * @param producer the low- level producer used to send the data * @param encoder data encoder */ void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder); /** * Cleans up and shuts down the event handler */ void close(); }
3. CallBackHandler从类的命名来看就是一个回调函数,这个在异步发送的时候,可以回调用户端的一些程序
public interface EventHandler<T> extends Closeable{ /** * Initializes the event handler using a Properties object * * @param properties the properties used to initialize the event * handler */ void init(Properties properties); /** * Callback to dispatch the batched data and send it to a Jafka server * * @param events the data sent to the producer * @param producer the low- level producer used to send the data * @param encoder data encoder */ void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder); /** * Cleans up and shuts down the event handler */ void close(); }
4 我们接着来看下SyncProducer的实现
public class SyncProducer implements Closeable { private final Logger logger = Logger.getLogger(SyncProducer .class ); //private static final RequestKeys RequestKey = RequestKeys.Produce;//0 ///////////////////////////////////////////////////////////////////// // 同步发送器的配置 private final SyncProducerConfig config ; // BlockingChannel这个待会再讲,感觉是封装了一个channel private final BlockingChannel blockingChannel ; // 这个是一个对象锁,待会讲解锁的妙用 private final Object lock = new Object(); // 是否已经关闭的标志位 private volatile boolean shutdown = false; // 主机和端口号 private final String host ; private final int port ; 接着是它的构造函数: public SyncProducer(SyncProducerConfig config) { super(); this.config = config; this.host = config.getHost(); this.port = config.getPort(); // this.blockingChannel = new BlockingChannel(host, port , -1, config.socketTimeoutMs, config.bufferSize ); }
这个里面讲readBufferSize设置成-1了,因为它不需要读数据
我们接下来看下这三个send的重载函数
// 采用随机partition进行发送 public void send(String topic, ByteBufferMessageSet message) { send(topic, ProducerRequest. RandomPartition, message); } // 检验消息大小后,构建ProducerRequest对象进行send public void send(String topic, int partition, ByteBufferMessageSet messages) { messages.verifyMessageSize(config .maxMessageSize ); send( new ProducerRequest(topic, partition, messages)); } private void send(Request request) { // 从request对象构建出send对象 BoundedByteBufferSend send = new BoundedByteBufferSend(request); synchronized (lock ) { long startTime = System.nanoTime(); int written = -1; try { // 建立链接并发送send,最终返回一个number,表明发送的字节数量 written = connect().send(send); } catch (IOException e) { // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry disconnect(); throw new RuntimeException(e); } finally { if (logger .isDebugEnabled()) { logger.debug(format( "write %d bytes data to %s:%d", written, host, port)); } } final long endTime = System.nanoTime (); SyncProducerStats.recordProduceRequest(endTime - startTime); } }
相关推荐
### Kafka Producer机制优化—提高发送消息可靠性 #### 一、Kafka Producer机制及问题背景 在Kafka消息系统中,消息是由Producer生产并通过Broker(消息中介节点)进行存储与转发的。Broker负责处理消息的存储,并...
消息队列允许Producer发送消息,其他进程(消费者)可以接收并处理这些消息。 4. **规则(Rules)**:描述中的“基于规则的简单类解析”可能意味着Producer在创建或调度任务时遵循特定的业务规则。这些规则可能是...
《深入解析Helix Producer Plus设置指南》 Helix Producer Plus是一款功能强大的多媒体转换与编码工具,相较于REALPRODUCER 8 PLUS,它在模板管理、服务器管理以及压缩进程管理等方面实现了重大升级,使得多媒体...
Photodex ProShow Producer是一款专业的视频幻灯片制作工具,它提供了丰富的模板资源和插件,使得视频制作变得快速、简便且高度可定制。它非常适合爱好制作电子相册的用户,能够满足他们多样化的制作需求。ProShow ...
《ProShow Producer 模板:打造浪漫“玫瑰婚礼”》 在数字时代,人们越来越注重个性化和创意化的表达,尤其是在婚礼这种人生中的重要时刻。ProShow Producer,一款专业的幻灯片制作软件,就为新人们提供了这样的...
总结来说,`kafka_producer.zip`提供的资源展示了如何使用Python和`kafka-python`库创建一个简单的Kafka生产者,该生产者能够从配置文件读取参数,向指定的Kafka主题发送数据。理解这个过程对于任何需要在Python环境...
Pentaho Kafka Producer是一款用于Pentaho Data Integration(Kettle)平台的插件,它允许用户在数据集成过程中将数据流发布到Apache Kafka消息队列。Kafka是一个分布式流处理平台,广泛应用于实时数据管道和流应用...
1. **直观易用的界面**:Easy RealMedia Producer采用了用户友好的界面设计,使得操作流程简单明了,减少了学习成本。 2. **多媒体输入支持**:支持导入各种常见的音频、视频格式,如MP3、WAV、AVI、MPEG等,方便...
OID Producer是一款用于生成和管理Object Identifier(OID)的工具,主要在信息技术领域,尤其是网络协议和软件开发中使用。OID是标识数据对象的一种国际标准,它在ASN.1(抽象语法标记一世)编码规则下工作,是网络...
一个`Producer Group`由多个`Producer`组成,它们共享同组内的配置信息,如发送策略和主题。生产者可以选择同步、异步或单向发送消息。 - **同步发送**:生产者发送消息后等待Broker的确认,确保消息被成功接收。 ...
《Easy RealMedia Producer:高效便捷的Real媒体转换工具》 在多媒体处理领域,各种格式的视频文件广泛存在,其中,MPEG(Moving Picture Experts Group)因其高质量和广泛的兼容性而备受青睐。然而,在某些特定...
在IT领域,多线程处理是提升程序性能和并发能力的重要技术之一。在这个"Producer/Consumer 多线程处理文件"的主题中,我们将深入探讨Java中的生产者消费者模型,以及如何利用这一模式来高效地处理大量数据,如一秒钟...
### 大数据平台Kafka组件应用研究说明书 #### 一、组件介绍 ##### 基础介绍 Apache Kafka 是一种高吞吐量的分布式发布订阅消息系统,它最初由LinkedIn公司开发,之后成为Apache软件基金会的顶级项目。Kafka的设计...
Flex中的消息推送机制是基于Producer和Consumer模型实现的,这种机制允许应用程序实时地发送和接收消息,常用于构建聊天室、通知系统等实时交互的应用。下面将详细解释Flex消息推送的相关知识点。 1. **Producer与...
配合Helix先进的功能,Realnetworks推出了第10代的流媒体压缩软件Helix Producer。Realnetworks全新改写代码的图形化专业流媒体文件制作工具。利用它,你可以轻松地实现RealAudio8、RealAudio9文件格式到实时文件的...
对于初学者来说,Impact Image Producer提供了学习光线跟踪渲染的绝佳平台。通过实际操作,可以深入理解光线跟踪算法的工作原理,并提升图像处理技能。而对于专业设计师和开发者,该软件则提供了一个强大的工具,能...
v10内核,根据实际使用的需要提供了比Helix RealMedia Producer和RealProducer10还要多的过滤设置。 基本上可以用它来替代Helix RealMedia Producer和RealProducer v10,使用Real v10内核时,全面支持Real10文件...
Helix Producer是一款强大的多媒体编码工具,尤其在Linux操作系统环境下,它为用户提供了高效且灵活的视频编码解决方案。这款软件由RealNetworks开发,主要用于将各种格式的多媒体内容转换成RealMedia格式,使得内容...
amazon-kinesis-producer, 亚马逊Kinesis制作库 室Producer库简介在亚马逊 Kinesis Producer Producer Producer Producer Producer Producer Producer performs performs performs per
Helix Producer Plus是一款专业的流媒体内容创作工具,主要用于创建、编码和发布高质量的数字媒体,如视频、音频和动画。V9.01是该软件的一个版本,它可能包含了性能优化、新功能以及修复了一些已知问题。在这个版本...