- 浏览: 2203261 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (1240)
- mac/IOS (287)
- flutter (1)
- J2EE (115)
- android基础知识 (582)
- android中级知识 (55)
- android组件(Widget)开发 (18)
- android 错误 (21)
- javascript (18)
- linux (70)
- 树莓派 (18)
- gwt/gxt (1)
- 工具(IDE)/包(jar) (18)
- web前端 (17)
- java 算法 (8)
- 其它 (5)
- chrome (7)
- 数据库 (8)
- 经济/金融 (0)
- english (2)
- HTML5 (7)
- 网络安全 (14)
- 设计欣赏/设计窗 (8)
- 汇编/C (8)
- 工具类 (4)
- 游戏 (5)
- 开发频道 (5)
- Android OpenGL (1)
- 科学 (4)
- 运维 (0)
- 好东西 (6)
- 美食 (1)
最新评论
-
liangzai_cool:
请教一下,文中,shell、C、Python三种方式控制led ...
树莓派 - MAX7219 -
jiazimo:
...
Kafka源码分析-序列5 -Producer -RecordAccumulator队列分析 -
hp321:
Windows该命令是不是需要安装什么软件才可以?我试过不行( ...
ImageIO读jpg的时候出现javax.imageio.IIOException: Unsupported Image Type -
hp321:
Chenzh_758 写道其实直接用一下代码就可以解决了:JP ...
ImageIO读jpg的时候出现javax.imageio.IIOException: Unsupported Image Type -
huanghonhpeng:
大哥你真强什么都会,研究研究。。。。小弟在这里学到了很多知识。 ...
android 浏览器
在Kafka源码分析-序列2中,我们提到了整个Producer client的架构图,如下所示:
其它几个组件我们在前面都讲过了,今天讲述最后一个组件RecordAccumulator.
Batch发送
在以前的kafka client中,每条消息称为 “Message”,而在Java版client中,称之为”Record”,同时又因为有批量发送累积功能,所以称之为RecordAccumulator.
RecordAccumulator最大的一个特性就是batch消息,扔到队列中的多个消息,可能组成一个RecordBatch,然后由Sender一次性发送出去。
每个TopicPartition一个队列
下面是RecordAccumulator的内部结构,可以看到,每个TopicPartition对应一个消息队列,只有同一个TopicPartition的消息,才可能被batch。
batch的策略
那什么时候,消息会被batch,什么时候不会呢?下面从KafkaProducer的send方法看起:
从上面代码可以看到,batch逻辑,都在accumulator.append函数里面:
从上面代码我们可以看出Batch的策略:
1。如果是同步发送,每次去队列取,RecordBatch都会为空。这个时候,消息就不会batch,一个Record形成一个RecordBatch
2。Producer 入队速率 < Sender出队速率 && lingerMs = 0 ,消息也不会被batch
3。Producer 入队速率 > Sender出对速率, 消息会被batch
4。lingerMs > 0,这个时候Sender会等待,直到lingerMs > 0 或者 队列满了,或者超过了一个RecordBatch的最大值,就会发送。这个逻辑在RecordAccumulator的ready函数里面。
为什么是Deque?
在上面我们看到,消息队列用的是一个“双端队列“,而不是普通的队列。
一端生产,一端消费,用一个普通的队列不就可以吗,为什么要“双端“呢?
这其实是为了处理“发送失败,重试“的问题:当消息发送失败,要重发的时候,需要把消息优先放入队列头部重新发送,这就需要用到双端队列,在头部,而不是尾部加入。
当然,即使如此,该消息发出去的顺序,还是和Producer放进去的顺序不一致了。
其它几个组件我们在前面都讲过了,今天讲述最后一个组件RecordAccumulator.
Batch发送
在以前的kafka client中,每条消息称为 “Message”,而在Java版client中,称之为”Record”,同时又因为有批量发送累积功能,所以称之为RecordAccumulator.
RecordAccumulator最大的一个特性就是batch消息,扔到队列中的多个消息,可能组成一个RecordBatch,然后由Sender一次性发送出去。
每个TopicPartition一个队列
下面是RecordAccumulator的内部结构,可以看到,每个TopicPartition对应一个消息队列,只有同一个TopicPartition的消息,才可能被batch。
public final class RecordAccumulator { private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches; ... }
batch的策略
那什么时候,消息会被batch,什么时候不会呢?下面从KafkaProducer的send方法看起:
//KafkaProducer public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { try { // first make sure the metadata for the topic is available long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); ... RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs); //核心函数:把消息放入队列 if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future;
从上面代码可以看到,batch逻辑,都在accumulator.append函数里面:
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException { appendsInProgress.incrementAndGet(); try { if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); Deque<RecordBatch> dq = dequeFor(tp); //找到该topicPartiton对应的消息队列 synchronized (dq) { RecordBatch last = dq.peekLast(); //拿出队列的最后1个元素 if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); //最后一个元素, 即RecordBatch不为空,把该Record加入该RecordBatch if (future != null) return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer); return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } //队列里面没有RecordBatch,建一个新的,然后把Record放进去 MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); } } finally { appendsInProgress.decrementAndGet(); } } private Deque<RecordBatch> dequeFor(TopicPartition tp) { Deque<RecordBatch> d = this.batches.get(tp); if (d != null) return d; d = new ArrayDeque<>(); Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d); if (previous == null) return d; else return previous; }
从上面代码我们可以看出Batch的策略:
1。如果是同步发送,每次去队列取,RecordBatch都会为空。这个时候,消息就不会batch,一个Record形成一个RecordBatch
2。Producer 入队速率 < Sender出队速率 && lingerMs = 0 ,消息也不会被batch
3。Producer 入队速率 > Sender出对速率, 消息会被batch
4。lingerMs > 0,这个时候Sender会等待,直到lingerMs > 0 或者 队列满了,或者超过了一个RecordBatch的最大值,就会发送。这个逻辑在RecordAccumulator的ready函数里面。
public ReadyCheckResult ready(Cluster cluster, long nowMs) { Set<Node> readyNodes = new HashSet<Node>(); long nextReadyCheckDelayMs = Long.MAX_VALUE; boolean unknownLeadersExist = false; boolean exhausted = this.free.queued() > 0; for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) { TopicPartition part = entry.getKey(); Deque<RecordBatch> deque = entry.getValue(); Node leader = cluster.leaderFor(part); if (leader == null) { unknownLeadersExist = true; } else if (!readyNodes.contains(leader)) { synchronized (deque) { RecordBatch batch = deque.peekFirst(); if (batch != null) { boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; long waitedTimeMs = nowMs - batch.lastAttemptMs; long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); boolean full = deque.size() > 1 || batch.records.isFull(); boolean expired = waitedTimeMs >= timeToWaitMs; boolean sendable = full || expired || exhausted || closed || flushInProgress(); //关键的一句话 if (sendable && !backingOff) { readyNodes.add(leader); } else { nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist); }
为什么是Deque?
在上面我们看到,消息队列用的是一个“双端队列“,而不是普通的队列。
一端生产,一端消费,用一个普通的队列不就可以吗,为什么要“双端“呢?
这其实是为了处理“发送失败,重试“的问题:当消息发送失败,要重发的时候,需要把消息优先放入队列头部重新发送,这就需要用到双端队列,在头部,而不是尾部加入。
当然,即使如此,该消息发出去的顺序,还是和Producer放进去的顺序不一致了。
发表评论
-
小程序textarea完美填坑
2020-07-07 16:09 528相信做微信小程序的码友们都被textarea这个原生组件坑过 ... -
Nginx+Https自己敲命令生成证书
2020-05-18 09:35 953一、准备 环境:centos6.8 ... -
https证书生成环境搭建配置(基于Tomcat和Nginx)
2020-04-24 11:06 826一、基于Tomcat、JDK内置密钥工具: 1、生成服务端证 ... -
史上最强Tomcat8性能优化
2019-11-01 21:41 909授人以鱼不如授人以渔 ... -
SpringBoot配置HTTPS,并实现HTTP访问自动转HTTPS访问
2019-10-07 09:13 5651.使用jdk自带的 keytools 创建证书 打开cmd ... -
Spring Boot工程集成全局唯一ID生成器 UidGenerator
2019-09-16 09:04 860概述 流水号生成器(全局唯一 ID生成器)是服务化系统的基础 ... -
CentOS7下Redis的安装与使用
2019-08-17 11:45 612一、手动安装过程 1、准备工作(安装gcc依赖) yum ... -
Nginx与tomcat组合的简单使用
2019-08-17 10:05 443配置tomcat跳转 请求http出现400的时候在这里配置 ... -
linux下lvs+keepalived安装配置
2019-07-10 14:20 466keepalived主机:192.168.174. ... -
使用Docker搭建Tomcat运行环境
2019-02-08 21:32 4941 准备宿主系统 准备一 ... -
Netty笔记-GlobalEventExecutor
2019-02-06 23:00 6431.概念 /** * Single-thread si ... -
Netty4转发服务的实现方案
2019-02-06 15:03 1146如果用Netty做转发服务(不需要同步应答),Netty中有一 ... -
java手机号归属地查询
2018-12-25 17:16 749所需的包:carrier-1.75.jar 、geocoder ... -
基于Netty4的HttpServer和HttpClient的简单实现
2018-10-17 20:02 700Http 消息格式: Http request: Met ... -
javafx : 支持使用微调(spinner)控制的数字的文本框(NemberTextField)
2018-10-16 00:00 1099最近花了一些时间学习javaFX, 要更深入地理解新GUI包, ... -
我的Java(定制你的Java/JavaFX Runtime)
2018-10-12 23:29 681最新的JDK 11发布了,撒花 新版本的JDK终于有了ope ... -
javaFX的几个新特性,让swing彻底过时
2018-10-12 22:42 669首先声明,Java的GUI曾经 ... -
mac os系统用install4j把jar包生成app
2018-10-05 23:02 1440install4j有windows版也有mac版 mac电脑 ... -
JavaFX Alert对话框
2018-10-05 22:01 23801. 标准对话框 消息对话框 Alert alert = ... -
IDEA Properties中文unicode转码问题
2017-02-17 19:54 1051摘要: 如何让IDEA的properties中的中文进行uni ...
相关推荐
6. **灵活的数据模型**: Kafka支持多种数据模型,包括简单的消息队列、时间序列数据存储以及复杂的事件流处理。这使得它在各种场景下都能发挥作用,如日志聚合、用户行为追踪、物联网(IoT)数据处理等。 7. **广泛的...
3. **消息队列**:Kafka作为一个高可用的消息中间件,可以替代传统的消息队列系统。 4. **事件源**:在微服务架构中,Kafka作为事件源,使得服务间通信更加解耦。 5. **数据集成**:在数据湖或数据仓库建设中,...
Kafka是一种高吞吐量、低延迟的消息队列系统,它允许应用程序以发布/订阅模式进行通信。在Kafka中,数据以主题(Topic)的形式存储,每个主题可以分为多个分区(Partition),确保数据的分布和负载均衡。此外,Kafka...
Kafka是一个高吞吐量、低延迟的消息队列,它可以处理PB级别的数据,适用于日志收集、实时分析、流处理等多种场景。其核心组成部分包括生产者(Producer)、消费者(Consumer)以及主题(Topic)。 在SpringBoot项目...
1.3 主题与分区(Topics and Partitions):主题是逻辑上的消息队列,每个主题可以被分为多个分区,分区是有序且不可变的消息序列。 1.4 broker:Kafka集群由多个broker节点组成,它们存储并处理主题的分区。0.8....
它最初设计为一个高吞吐量、低延迟的消息队列系统,但随着时间的发展,Kafka已经发展成为一个强大的实时数据管道和流处理工具。在版本2.11-0.10.1.0中,Kafka继续提供了高度可扩展性和容错性,适用于大数据实时处理...
Pentaho Kafka Producer是一款用于Pentaho Data Integration(Kettle)平台的插件,它允许用户在数据集成过程中将数据流发布到Apache Kafka消息队列。Kafka是一个分布式流处理平台,广泛应用于实时数据管道和流应用...
Kafka,由LinkedIn开发并开源的分布式流处理平台,已成为大数据领域中的消息队列标准。Confluent Kafka作为其官方提供的商业支持版本,提供了丰富的功能和优化,包括更稳定的性能、全面的监控以及强大的连接器。在...
了解并掌握这些知识点,将有助于你有效地使用和部署Kafka 0.10.0.0,实现高效的数据流处理和实时分析。在实际应用中,还需要关注Kafka与其他系统的集成,如Hadoop、Elasticsearch等,以构建完整的数据处理链路。
《Kafka消息队列源码深度解析》 Kafka,由LinkedIn开源并被Apache基金会接纳,是一款高吞吐量的分布式消息系统,广泛应用于大数据实时处理、日志收集、流计算等多个领域。本文将深入剖析Kafka的核心概念,以及其...
综上所述,Kafka-Tail-Producer是一个利用Java和Linux `tail`命令实现的实时日志数据收集工具,它与Apache Kafka紧密集成,为企业提供了强大的实时日志分析能力。通过合理的配置和扩展,可以满足不同规模和复杂性的...
标题中的“kafka队列下载”指的是Apache Kafka的下载过程,Kafka是一个分布式流处理平台,常用于构建实时数据管道和流应用。描述中的“kafka_2.12-0.11.0.0”是Kafka的一个特定版本,它表明是在使用Scala 2.12编译的...
#### 三、Kafka源码分析 ##### 3.1 生产者发送流程 1. **消息序列化**:生产者在发送消息之前,首先需要将消息对象序列化为字节数组。 2. **消息分发**:根据用户指定的分区策略(如轮询、按key哈希等),将消息...
- **Producer**:Producer 负责向 Kafka 发布消息,可以指定消息发送到哪个 Topic 和哪个 Partition。 - **Consumer**:Consumer 用于从 Kafka 中读取消息。消费者可以按照队列模式或发布-订阅模式来消费消息。 ###...
Kafka 是基于发布/订阅模式的消息队列,它的核心组件包括生产者(Producer)、消费者(Consumer)和代理(Broker)。生产者负责发布消息到主题,消费者订阅主题并消费消息,而 Broker 是 Kafka 集群中的服务器节点,...
标题"17道消息队列Kafka面试题!"表明这是一个关于Kafka面试问题的集合,可能涵盖了Kafka的核心概念、功能特性、使用场景以及常见问题等方面。描述同样强调了是17个与Kafka相关的面试问题,这暗示了我们可能需要了解...
Apache Kafka是一个高性能、可扩展的消息队列(MQ),它提供实时的数据流处理能力。在Java应用程序中集成Kafka API可以让开发者轻松地将消息生产到Kafka主题(topics)中,或者从这些主题中消费消息。 首先,让我们...
Apache Kafka是一个高吞吐量、低延迟的消息队列系统,最初由LinkedIn开发,后来成为Apache软件基金会的顶级项目。Kafka的设计目标是提供一个能够处理大量实时数据的平台,它支持发布订阅模型,可以用于日志聚合、流...
由于其出色的性能表现和可扩展性,Kafka已经成为大数据领域中最受欢迎的消息队列中间件之一。 #### 二、Kafka的核心概念 在深入了解Kafka的安装过程之前,我们需要先掌握一些Kafka的基本概念: 1. **Broker**:...