ListenerConsumer
一、构造方法,两件事,定义一个rebalance listener,consumer订阅topic
private ListenerConsumer(MessageListener<K, V> listener, AcknowledgingMessageListener<K, V> ackListener) { Assert.state(!this.isAnyManualAck || !this.autoCommit, "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode()); final Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(); ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) {//rebalance开始前、consumer处理最后一条消息后,触发 // do not stop the invoker if it is not started yet // this will occur on the initial start on a subscription if (!ListenerConsumer.this.autoCommit) { if (ListenerConsumer.this.logger.isTraceEnabled()) { ListenerConsumer.this.logger.trace("Received partition revocation notification, " + "and will stop the invoker."); } if (ListenerConsumer.this.listenerInvokerFuture != null) { stopInvokerAndCommitManualAcks();//停掉listenerInvoker,人工commit offset,这个时候不管ackMode是啥,都commit ListenerConsumer.this.recordsToProcess.clear(); ListenerConsumer.this.unsent = null; } else { if (!CollectionUtils.isEmpty(partitions)) { ListenerConsumer.this.logger.error("Invalid state: the invoker was not active, " + "but the consumer had allocated partitions"); } } } else { if (ListenerConsumer.this.logger.isTraceEnabled()) { ListenerConsumer.this.logger.trace("Received partition revocation notification, " + "but the container is in autocommit mode, " + "so transition will be handled by the consumer");//这句话是说自动提交时,是consumer自己处理rebalance的消息? } } getContainerProperties().getConsumerRebalanceListener().onPartitionsRevoked(partitions); } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) {//rebalance结束后、consumer处理第一条消息前触发 ListenerConsumer.this.assignedPartitions = partitions; if (!ListenerConsumer.this.autoCommit) { // Commit initial positions - this is generally redundant but // it protects us from the case when another consumer starts // and rebalance would cause it to reset at the end // see https://github.com/spring-projects/spring-kafka/issues/110 Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); for (TopicPartition partition : partitions) {//commit一下初始的offset offsets.put(partition, new OffsetAndMetadata(consumer.position(partition))); } if (ListenerConsumer.this.logger.isDebugEnabled()) { ListenerConsumer.this.logger.debug("Committing: " + offsets); } if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) { ListenerConsumer.this.consumer.commitSync(offsets); } else { ListenerConsumer.this.consumer.commitAsync(offsets, KafkaMessageListenerContainer.this.getContainerProperties().getCommitCallback()); } } // We will not start the invoker thread if we are in autocommit mode, // as we will execute synchronously then // We will not start the invoker thread if the container is stopped // We will not start the invoker thread if there are no partitions to // listen to if (!ListenerConsumer.this.autoCommit && KafkaMessageListenerContainer.this.isRunning() && !CollectionUtils.isEmpty(partitions)) { startInvoker();//自动提交是在consumer里同步执行的,所以只在非自动提交时启动listenerInvoker } getContainerProperties().getConsumerRebalanceListener().onPartitionsAssigned(partitions); } }; if (KafkaMessageListenerContainer.this.topicPartitions == null) {//如果没有指定partition,给订阅一下 if (this.containerProperties.getTopicPattern() != null) { consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener); } else { consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener); } } else {//有指定的,初始化definedPartitions,把partition assign给consumer List<TopicPartitionInitialOffset> topicPartitions = Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions); this.definedPartitions = new HashMap<>(topicPartitions.size()); for (TopicPartitionInitialOffset topicPartition : topicPartitions) { this.definedPartitions.put(topicPartition.topicPartition(), topicPartition.initialOffset()); } consumer.assign(new ArrayList<>(this.definedPartitions.keySet())); } this.consumer = consumer; this.listener = listener; this.acknowledgingMessageListener = ackListener; }
@Override public void run() { this.count = 0; this.last = System.currentTimeMillis(); if (isRunning() && this.definedPartitions != null) { initPartitionsIfNeeded();//初始化各partition的offset 1、topicPartition指定了初始offset的话就是这个 2、没指定的话不初始化,那就是从队头取起 3、指定了负数n,就取该partition的最大offset+n,也就是队尾往前数n条 // we start the invoker here as there will be no rebalance calls to // trigger it, but only if the container is not set to autocommit // otherwise we will process records on a separate thread if (!this.autoCommit) { startInvoker();//启动invoker,开始消费消息 } } long lastReceive = System.currentTimeMillis(); long lastAlertAt = lastReceive; while (isRunning()) { try { if (!this.autoCommit) { processCommits();//非自动提交,处理提交操作 } if (this.logger.isTraceEnabled()) { this.logger.trace("Polling (paused=" + this.paused + ")..."); } ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout()); if (records != null && this.logger.isDebugEnabled()) { this.logger.debug("Received: " + records.count() + " records"); } if (records != null && records.count() > 0) { if (this.containerProperties.getIdleEventInterval() != null) { lastReceive = System.currentTimeMillis(); } // if the container is set to auto-commit, then execute in the // same thread // otherwise send to the buffering queue //自动提交的立刻执行,非自动提交的发送到阻塞队列recordsToProcess里去,ListenerInvoker是从这个队列里取数据处理的,然而在哪里commit的呢,没看到。。。。 if (this.autoCommit) { invokeListener(records); } else { if (sendToListener(records)) { if (this.assignedPartitions != null) { // avoid group management rebalance due to a slow // consumer this.consumer.pause(this.assignedPartitions .toArray(new TopicPartition[this.assignedPartitions.size()])); this.paused = true; this.unsent = records; } } } } else { if (this.containerProperties.getIdleEventInterval() != null) { long now = System.currentTimeMillis(); if (now > lastReceive + this.containerProperties.getIdleEventInterval() && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) { publishIdleContainerEvent(now - lastReceive); lastAlertAt = now; } } } this.unsent = checkPause(this.unsent); } catch (WakeupException e) { this.unsent = checkPause(this.unsent); } catch (Exception e) { if (this.containerProperties.getErrorHandler() != null) { this.containerProperties.getErrorHandler().handle(e, null); } else { this.logger.error("Container exception", e); } } } //能执行到这里,说明isRunning=false了,ListenerConsumer要停止了 if (this.listenerInvokerFuture != null) { stopInvokerAndCommitManualAcks(); } try { this.consumer.unsubscribe(); } catch (WakeupException e) { // No-op. Continue process } this.consumer.close(); if (this.logger.isInfoEnabled()) { this.logger.info("Consumer stopped"); } }
private void startInvoker() { ListenerConsumer.this.invoker = new ListenerInvoker(); ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor() .submit(ListenerConsumer.this.invoker); //使用线程池跑ListenerInvoker }
private final class ListenerInvoker implements SchedulingAwareRunnable { private final CountDownLatch exitLatch = new CountDownLatch(1); private volatile boolean active = true; private volatile Thread executingThread; @Override public void run() { Assert.isTrue(this.active, "This instance is not active anymore"); try { this.executingThread = Thread.currentThread(); while (this.active) { try { ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1, TimeUnit.SECONDS); if (this.active) { if (records != null) { invokeListener(records); } else { if (ListenerConsumer.this.logger.isTraceEnabled()) { ListenerConsumer.this.logger.trace("No records to process"); } } } } catch (InterruptedException e) { if (!this.active) { Thread.currentThread().interrupt(); } else { ListenerConsumer.this.logger.debug("Interrupt ignored"); } } } } finally { this.active = false; this.exitLatch.countDown(); } } @Override public boolean isLongLived() { return true; } private void stop() { if (ListenerConsumer.this.logger.isDebugEnabled()) { ListenerConsumer.this.logger.debug("Stopping invoker"); } this.active = false; try { if (!this.exitLatch.await(getContainerProperties().getShutdownTimeout(), TimeUnit.MILLISECONDS) && this.executingThread != null) { if (ListenerConsumer.this.logger.isDebugEnabled()) { ListenerConsumer.this.logger.debug("Interrupting invoker"); } this.executingThread.interrupt(); } } catch (InterruptedException e) { if (this.executingThread != null) { this.executingThread.interrupt(); } Thread.currentThread().interrupt(); } if (ListenerConsumer.this.logger.isDebugEnabled()) { ListenerConsumer.this.logger.debug("Invoker stopped"); } } }
private void invokeListener(final ConsumerRecords<K, V> records) { Iterator<ConsumerRecord<K, V>> iterator = records.iterator(); //自动提交或者有invoker,都进入,费解,目前看到的只有非自动提交时才startInvoker啊,得继续看看 while (iterator.hasNext() && (this.autoCommit || (this.invoker != null && this.invoker.active))) { final ConsumerRecord<K, V> record = iterator.next(); if (this.logger.isTraceEnabled()) { this.logger.trace("Processing " + record); } try { if (this.acknowledgingMessageListener != null) {//有Acknowledgment参数的onMessage方法 this.acknowledgingMessageListener.onMessage(record, this.isAnyManualAck ? new ConsumerAcknowledgment(record, this.isManualImmediateAck) : null); } else {//没有Acknowledgment参数的onMessage方法 this.listener.onMessage(record); } if (!this.isAnyManualAck && !this.autoCommit) {//非人工提交、非自动提交,加入到acks this.acks.add(record); } } catch (Exception e) { if (this.containerProperties.isAckOnError() && !this.autoCommit) {//开启了ackOnError时,加入到acks this.acks.add(record); } if (this.containerProperties.getErrorHandler() != null) { this.containerProperties.getErrorHandler().handle(e, record);//errorhandler在这里被调用 } else { this.logger.error("Listener threw an exception and no error handler for " + record, e); } } } }
private void processCommits() { handleAcks();//处理acks队列,manual_immedaite的立刻提交,其他的加入到offsets this.count += this.acks.size(); long now; AckMode ackMode = this.containerProperties.getAckMode(); if (!this.isManualImmediateAck) {//不是立刻人工提交时 if (!this.isManualAck) {//不是人工提交时 updatePendingOffsets();//既不是自动提交又不是人工提交,那么就走这里,是更新offsets数据,为什么要把manual的排除在外呢? } boolean countExceeded = this.count >= this.containerProperties.getAckCount(); if (this.isManualAck || this.isBatchAck || this.isRecordAck || (ackMode.equals(AckMode.COUNT) && countExceeded)) {//ackMode是人工、单条、批量、数量时,提交操作 if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) { this.logger.debug("Committing in AckMode.COUNT because count " + this.count + " exceeds configured limit of " + this.containerProperties.getAckCount()); } commitIfNecessary(); this.count = 0; } else { now = System.currentTimeMillis(); boolean elapsed = now - this.last > this.containerProperties.getAckTime(); if (ackMode.equals(AckMode.TIME) && elapsed) {//ackMode是time时 if (this.logger.isDebugEnabled()) { this.logger.debug("Committing in AckMode.TIME " + "because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime()); } commitIfNecessary(); this.last = now; } else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {//ackMode是count_time时 if (this.logger.isDebugEnabled()) { if (elapsed) { this.logger.debug("Committing in AckMode.COUNT_TIME " + "because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime()); } else { this.logger.debug("Committing in AckMode.COUNT_TIME " + "because count " + this.count + " exceeds configured limit of" + this.containerProperties.getAckCount()); } } commitIfNecessary(); this.last = now; this.count = 0; } } } }
private void handleAcks() { ConsumerRecord<K, V> record = this.acks.poll(); while (record != null) { if (this.logger.isTraceEnabled()) { this.logger.trace("Ack: " + record); } processAck(record); record = this.acks.poll(); } } private void processAck(ConsumerRecord<K, V> record) { if (ListenerConsumer.this.isManualImmediateAck) { try { ackImmediate(record);//人工立即提交的,就这里提交了 } catch (WakeupException e) { // ignore - not polling } } else { addOffset(record); //加到offsets里 } }
private void updatePendingOffsets() { ConsumerRecord<K, V> record = this.acks.poll();//从阻塞队列取数据,数据在invokeListener里放进去的 while (record != null) { addOffset(record); record = this.acks.poll(); } } private void addOffset(ConsumerRecord<K, V> record) { if (!this.offsets.containsKey(record.topic())) { this.offsets.put(record.topic(), new HashMap<Integer, Long>()); } this.offsets.get(record.topic()).put(record.partition(), record.offset());//offsets是个嵌套MAP,topic partition offset }
private void commitIfNecessary() { Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>(); //提交是从offsets里取出来的,不是人工提交时是updatePendingOffsets()方法放进去的,人工提交时是handleAcks()方法放进去的 //这两个方法都是从acks队列取数据,那么新的问题是,人工提交时在哪里放到acks队列里去的? for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) { for (Entry<Integer, Long> offset : entry.getValue().entrySet()) { commits.put(new TopicPartition(entry.getKey(), offset.getKey()), new OffsetAndMetadata(offset.getValue() + 1)); } } this.offsets.clear();//offsets清空 if (this.logger.isDebugEnabled()) { this.logger.debug("Commit list: " + commits); } if (!commits.isEmpty()) { if (this.logger.isDebugEnabled()) { this.logger.debug("Committing: " + commits); } try { if (this.containerProperties.isSyncCommits()) { this.consumer.commitSync(commits);//kakfa-clients.jar里面的方法了 } else { this.consumer.commitAsync(commits, this.commitCallback);//kakfa-clients.jar里面的方法了 } } catch (WakeupException e) { // ignore - not polling if (this.logger.isDebugEnabled()) { this.logger.debug("Woken up during commit"); } } } }
也就是说:
一、autocommit =false时
1、ackMode不是manual、manual_immediate的时候,invokeListener把消息加入到ack队列里去
2、是manual、manual_immediate的时候,用户调用acknowledge方法,把消息加入到ack队列;
autocommit = true时,消息不放到ack队列
二、看processCommits方法的代码,貌似manual_immediate的发起提交也并不比manual、batch、record、count、time、count_time的发起提交提前很多,不太理解是个什么节奏
三、autocommit =ture时,在哪里commit offset的还没看见。。。
相关推荐
**Kafka Tool 连接 Kafka 工具详解** 在大数据处理和实时流处理领域,Apache Kafka 是一个不可或缺的组件,它作为一个分布式的消息中间件,提供高效、可扩展且可靠的发布订阅服务。为了方便管理和操作 Kafka 集群,...
**Kafka Tool:高效管理Apache Kafka集群的利器** Apache Kafka是一个分布式的流处理平台,广泛应用于大数据实时处理、日志聚合、消息系统等多个领域。在Kafka的实际操作中,管理和监控集群是至关重要的任务,而...
**Kafka工具详解——Kafkatool** Kafka作为一个分布式流处理平台,广泛应用于大数据实时处理和消息传递。然而,管理Kafka集群和操作其组件(如topics、partitions、offsets等)可能会变得复杂,这时就需要一些可视...
Kafka集群为每个topic维护一个分区日志,分区是一个有序且不可变的记录集,并且数据记录会不断地追加到结构化的commit log文件中。每个记录都有一个offset标识,表示顺序。 Kafka的四个核心API包括: 1. 生产者API...
在Java开发环境中,Kafka作为一个分布式流处理平台,被广泛用于构建实时数据管道和流应用。这个"Kafka的Java依赖包"包含了所有你需要在Java项目中与Kafka进行本地交互所需的jar包。这些jar包提供了完整的API,使得...
在Spring Boot应用中,我们可以利用Spring Kafka框架来与Apache Kafka进行集成,实现高效的消息传递。本文将详细探讨如何在Spring Boot项目中基于Spring Kafka动态创建Kafka消费者。 首先,了解Kafka基本概念:...
本文将深入探讨如何实现Storm与Kafka的集成,重点在于如何从Kafka中读取数据。 **一、整合说明** Apache Storm是一个开源的分布式实时计算系统,它能够持续处理无限的数据流,确保每个事件都得到精确一次(Exactly...
Apache Kafka 是一个分布式流处理平台,常用于构建实时的数据管道和应用。Kafka 提供了高吞吐量、低延迟的消息传递能力,是大数据领域中重要的消息队列(MQ)解决方案。Kafka-Eagle 是针对 Kafka 集群设计的一款高效...
### Kafka 概述 Kafka 是一款开源的分布式消息系统,以其高吞吐量、低延迟的特点,在大数据处理领域有着广泛的应用。Kafka 由 LinkedIn 开发并贡献给 Apache 软件基金会,最终成为其顶级项目之一。Kafka 的核心设计...
在IT行业中,Kafka是一种广泛使用的分布式流处理平台,它由Apache软件基金会开发,主要用于构建实时数据管道和流应用。本文将围绕标题和描述中提到的两种Kafka工具——kafkatool-64bit.exe和kafka-eagle-bin-1.4.6....
**Kafka详细课程讲义** 本课程主要涵盖了Apache Kafka的核心概念、安装配置、架构解析、API使用以及监控与面试知识点,旨在帮助学习者全面理解并掌握这一强大的分布式流处理平台。 **第 1 章 Kafka 概述** Apache...
【Kafka基础知识】 Kafka是由Apache开发的分布式流处理平台,它主要被设计用来处理实时数据流。在大数据处理领域,Kafka常被用于构建实时数据管道和流应用,能够高效地处理大量的实时数据。 【Java与Kafka的结合】...
**Kafka Tool for Linux: 管理与使用Apache Kafka集群的高效工具** Apache Kafka是一款分布式流处理平台,常用于构建实时数据管道和流应用。Kafka Tool是针对Kafka集群进行管理和操作的一款图形用户界面(GUI)工具...
**Kafka概述** Kafka是由LinkedIn开发并贡献给Apache软件基金会的一个开源消息系统,它是一个高性能、可扩展的分布式消息中间件。Kafka最初设计的目标是处理网站活动流数据,但随着时间的发展,它已被广泛应用于...
《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
**Kafka介绍** Apache Kafka是一款高性能、分布式的消息中间件,由LinkedIn开发并捐献给Apache软件基金会。它最初设计的目标是构建一个实时的数据管道,能够高效地处理大量的数据流,同时支持发布订阅和队列模型,...
**Kafka 2.5.1 知识点详解** Kafka 是一个分布式流处理平台,由 Apache 软件基金会开发,广泛应用于大数据实时处理、日志收集、消息系统等多个领域。`kafka_2.12-2.5.1` 是 Kafka 的一个特定版本,针对 Scala 2.12 ...
Kafka Tool 2.0.4是一款专为Kafka设计的强大的客户端工具,尤其适用于Mac操作系统。它提供了一种直观且可视化的界面,让用户能够轻松地连接到Kafka服务并进行各种操作,包括但不限于管理Topic、监控集群状态以及进行...
### 关于Kafka资源下载kafka_2.11-2.0.0.tgz的知识点 #### Kafka简介 Apache Kafka是一种开源的消息队列服务,它最初由LinkedIn开发,并于2011年成为Apache软件基金会的一个顶级项目。Kafka因其高性能、可扩展性和...