Kafka producer client
The send method of KafkaProducer does the following:
1. Wait until metadata for the target topic's partitions is available.
2. Serialize the key and value. In this example the key serializer is org.apache.kafka.common.serialization.IntegerSerializer and the value serializer is org.apache.kafka.common.serialization.StringSerializer.
3. Compute the target partition for the record: use the partition set on the record if present; otherwise let the partitioner compute it (org.apache.kafka.clients.producer.internals.DefaultPartitioner).
4. Ensure the serialized size of the record does not exceed the configured limits: MAX_REQUEST_SIZE_CONFIG = "max.request.size" and BUFFER_MEMORY_CONFIG = "buffer.memory".
5. Instantiate the TopicPartition and append the record to that partition's queue in the accumulator, obtaining a result object. The data is wrapped as writable records (compressed by the Compressor if compression is configured) and then packed into a batch. Compression setting: COMPRESSION_TYPE_CONFIG = "compression.type".
6. If the result's batch is full or was newly created, wake the sender to transmit the messages.
7. Return the result's future.
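Before walking through the implementation, here is a minimal usage sketch showing how a caller would configure and invoke this producer with the serializers and thresholds named above. The broker address "localhost:9092" and topic "demo-topic" are hypothetical placeholders, and the threshold values shown are illustrative, not recommendations.

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // hypothetical broker address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // the serializers named in step 2
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        // the thresholds checked in step 4 (illustrative values)
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
        // the compression codec applied in step 5
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
        try {
            Future<RecordMetadata> future =
                producer.send(new ProducerRecord<Integer, String>("demo-topic", 1, "hello"));
            RecordMetadata metadata = future.get(); // block until the record is acknowledged
            System.out.printf("sent to partition %d at offset %d%n",
                              metadata.partition(), metadata.offset());
        } finally {
            producer.close();
        }
    }
}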
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    try {
        // @1 first make sure the metadata for the topic is available
        waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
        byte[] serializedKey;
        try {
            // @2 serialize the key
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        byte[] serializedValue;
        try {
            // serialize the value
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }
        // @3 compute the partition
        int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        ensureValidRecordSize(serializedSize); // @4 ensure the record does not exceed the size limits
        TopicPartition tp = new TopicPartition(record.topic(), partition);
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // @5 append the serialized record to the accumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);
        if (result.batchIsFull || result.newBatchCreated) { // @6
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future; // @7
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        throw e;
    }
}
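Note how the catch block reports ApiExceptions through the callback and the returned future rather than throwing them. Continuing the earlier sketch, here is the asynchronous consumption path; "demo-topic" remains a hypothetical placeholder and the method assumes the producer built above.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch of the asynchronous path: ApiExceptions caught inside send()
// arrive at the callback with a null metadata, per the catch block above.
static void sendAsync(KafkaProducer<Integer, String> producer) {
    producer.send(new ProducerRecord<Integer, String>("demo-topic", 1, "payload"),
        new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null)
                    exception.printStackTrace(); // error path: metadata is null here
                else
                    System.out.printf("acked: partition=%d offset=%d%n",
                                      metadata.partition(), metadata.offset());
            }
        });
}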
The step-5 code that appends the record to the accumulator is shown below:
The record is appended to the queue for its topic partition. If a batch already exists at the tail of the queue, the record is appended to it. If not, a new buffer is allocated and the queue is checked a second time, since another thread may have created a batch in the meantime; if a batch now exists, the record is appended to it. Otherwise the buffer is wrapped as writable records (which includes the Compressor) and then into a RecordBatch. When records.append is called, the compressor compresses the data. Either way, the record ends up in the partition queue and is compressed if compression is configured.
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
    // We keep track of the number of appending threads to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    try {
        if (closed)
            throw new IllegalStateException("Cannot send after the producer is closed.");
        // check if we have an in-progress batch
        Deque<RecordBatch> dq = dequeFor(tp);
        synchronized (dq) { // try to append to an existing batch in this partition's deque
            RecordBatch last = dq.peekLast();
            if (last != null) {
                FutureRecordMetadata future = last.tryAppend(key, value, callback);
                if (future != null)
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
            }
        }
        // we don't have an in-progress record batch, try to allocate a new batch
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        ByteBuffer buffer = free.allocate(size);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // re-check whether another producer thread created a batch while we were allocating
            RecordBatch last = dq.peekLast();
            if (last != null) {
                FutureRecordMetadata future = last.tryAppend(key, value, callback);
                if (future != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    free.deallocate(buffer);
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }
            // no batch for this partition yet:
            // wrap the buffer as writable records, including the compressor
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); // wrap into a RecordBatch
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
            dq.addLast(batch);
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}
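The double check exists because free.allocate(size) may block waiting for buffer.memory to free up, so it is kept outside the deque lock. For readers who want the idiom in isolation, here is a condensed, hypothetical sketch; Batch and its fixed capacity are stand-ins, not the real types.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Condensed illustration of the check / allocate / re-check idiom used above.
class DoubleCheckedAppend {
    static class Batch {
        final List<byte[]> records = new ArrayList<>();
        boolean tryAppend(byte[] v) {
            if (records.size() >= 16) return false; // batch "full" (arbitrary capacity)
            records.add(v);
            return true;
        }
    }

    private final Deque<Batch> deque = new ArrayDeque<>();

    void append(byte[] value) {
        synchronized (deque) { // fast path: append to the in-progress batch
            Batch last = deque.peekLast();
            if (last != null && last.tryAppend(value)) return;
        }
        Batch fresh = new Batch(); // done outside the lock; free.allocate may block in the real code
        synchronized (deque) { // re-check: another thread may have created a batch meanwhile
            Batch last = deque.peekLast();
            if (last != null && last.tryAppend(value)) return; // drop `fresh` (deallocated in the real code)
            fresh.tryAppend(value);
            deque.addLast(fresh);
        }
    }
}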
RecordBatch.tryAppend, which writes into the underlying MemoryRecords:
public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
    if (!this.records.hasRoomFor(key, value)) { // no room left in the underlying buffer
        return null;
    } else {
        this.records.append(0L, key, value); // append to MemoryRecords; compression happens inside this call
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
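A null return here is what signals "batch full" back to the append code above. Each Thunk simply pairs a user callback with that record's future so the callbacks can be completed when the whole batch is acknowledged. The following is a hypothetical condensed sketch of that mechanism, with illustrative names rather than the real fields:

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Thunk mechanism: each appended record remembers
// its callback and position, and done() fires them when the batch completes.
class ThunkSketch {
    interface Callback { void onCompletion(long offset, Exception e); }

    static class Thunk {
        final Callback callback;
        final int relativeOffset; // index of the record within the batch
        Thunk(Callback callback, int relativeOffset) {
            this.callback = callback;
            this.relativeOffset = relativeOffset;
        }
    }

    private final List<Thunk> thunks = new ArrayList<>();
    private int recordCount = 0;

    void append(Callback callback) {
        if (callback != null)
            thunks.add(new Thunk(callback, recordCount));
        recordCount++;
    }

    // invoked when the broker acknowledges the whole batch at baseOffset
    void done(long baseOffset, Exception error) {
        for (Thunk t : thunks)
            t.callback.onCompletion(baseOffset + t.relativeOffset, error);
    }
}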
In step 6, the Sender thread's run method sends the messages:
1. Check which records are ready to send. A partition is considered ready when any of the following holds:
- The record set is full.
- The record set has sat in the accumulator for at least lingerMs milliseconds.
- The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are immediately considered ready).
- The accumulator has been closed.
2. Drain all sendable data from the accumulator.
3. Build the produce requests.
4. Hand the requests to the network client's selector.
5. The client's NIO selector sends the data.
public void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send (and their leaders)
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    // for debugging only
    // if (batches.size() >= 1) {
    //     System.out.println(batches.size());
    // }
    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now); // build the requests

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    this.client.poll(pollTimeout, now); // send the data via NIO
}
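The readiness test performed by accumulator.ready in step 1 boils down to a boolean expression over the conditions listed above. A hedged sketch of its approximate shape follows; all parameter names are illustrative stand-ins, not the real fields of RecordAccumulator.

// Approximate shape of the per-partition readiness check described above.
static boolean isSendable(int dequeSize, boolean batchFull, long waitedTimeMs,
                          long lingerMs, boolean bufferExhausted, boolean closed) {
    boolean full = dequeSize > 1 || batchFull;    // the record set is full
    boolean expired = waitedTimeMs >= lingerMs;   // sat in the accumulator for >= linger.ms
    return full || expired || bufferExhausted || closed;
}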