/** * Wrapped push consumer.in fact,it works as remarkable as the pull consumer * * @author shijia.wxr<vintage.wang@gmail.com> * @since 2013-7-24 */ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { /** * Minimum consumer thread number */ private int consumeThreadMin = 20; /** * Max consumer thread number */ private int consumeThreadMax = 64;
com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl public void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; this.checkConfig(); this.copySubscription(); if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer .getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper(// mQClientFactory,// this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } } this.offsetStore.load(); if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } //消费者服务 this.consumeMessageService.start(); boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, "// + this.serviceState// + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.rebalanceImmediately(); }
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { private static final Logger log = ClientLogger.getLog(); private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; private final DefaultMQPushConsumer defaultMQPushConsumer; private final MessageListenerConcurrently messageListener; private final BlockingQueue<Runnable> consumeRequestQueue; private final ThreadPoolExecutor consumeExecutor; private final String consumerGroup; private final ScheduledExecutorService scheduledExecutorService; public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor(// this.defaultMQPushConsumer.getConsumeThreadMin(),// this.defaultMQPushConsumer.getConsumeThreadMax(),// 1000 * 60,// TimeUnit.MILLISECONDS,// this.consumeRequestQueue,// new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "ConsumeMessageScheduledThread_")); } public void start() { }
@Override public void submitConsumeRequest(// final List<MessageExt> msgs, // final ProcessQueue processQueue, // final MessageQueue messageQueue, // final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } else { for (int total = 0; total < msgs.size();) { List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } } }
com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl public void pullMessage(final PullRequest pullRequest) { final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) { log.info("the pull request[{}] is droped.", pullRequest.toString()); return; } pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); try { this.makeSureStateOK(); } catch (MQClientException e) { log.warn("pullMessage exception, consumer state not ok", e); this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenException); return; } if (this.isPause()) { log.warn("consumer was paused, execute pull request later. instanceName={}", this.defaultMQPushConsumer.getInstanceName()); this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenSuspend); return; } long size = processQueue.getMsgCount().get(); if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenFlowControl); if ((flowControlTimes1++ % 1000) == 0) { log.warn("the consumer message buffer is full, so do flow control, {} {} {}", size, pullRequest, flowControlTimes1); } return; } if (!this.consumeOrderly) { if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenFlowControl); if ((flowControlTimes2++ % 1000) == 0) { log.warn("the queue's messages, span too long, so do flow control, {} {} {}", processQueue.getMaxSpan(), pullRequest, flowControlTimes2); } return; } } final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { // 由于并发关系,即使找不到订阅关系,也要重试下,防止丢失PullRequest this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenException); log.warn("find the consumer's subscription failed, {}", pullRequest); return; } final long beginTimestamp = System.currentTimeMillis(); PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult( pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT( pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS( pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(// pullResult.getMsgFoundList(), // processQueue, // pullRequest.getMessageQueue(), // dispathToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset// || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",// pullResult.getNextBeginOffset(),// firstMsgOffset,// prevRequestOffset); } break; case NO_NEW_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}",// pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset( pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest .getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl .removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000); break; default: break; } } }
相关推荐
- **线程安全**:在多线程环境下,确保生产者和消费者的线程安全。 - **消息幂等性**:设计幂等消费策略,避免重复消息导致的问题。 - **监控与报警**:集成监控工具,如Prometheus和Grafana,实时监控RocketMQ的...
源码分析可以帮助理解消费模式的实现、消费进度管理、多线程消费策略等。 5. **Message Model**:RocketMQ支持点对点(Point-to-Point)和发布订阅(Publish/Subscribe)两种消息模型。点对点模型下,每个消息只被...
- 并行处理:通过多线程和批量发送/接收消息提高处理速度。 - 分布式调度:通过NameServer进行分布式调度,减少网络通信开销。 7. **源码分析**: - 消息发送流程:理解如何将消息从生产者端发送到队列,涉及...
2. **RocketMQ概念解析**:RocketMQ的核心概念包括Producer(生产者)、Consumer(消费者)、Message(消息)、Topic(主题)、Queue(队列)等。Producer负责发送消息,Consumer负责接收消息,Message是传输的对象...
三、 RocketMQ消费者集成 1. 创建消费者实例 消费者同样需要创建一个实例,并指定消费组名。根据消费模式,有Push Consumer(推模式)和Pull Consumer(拉模式)两种。 ```java DefaultMQPushConsumer consumer = ...
在学习RocketMQ源码时,重点关注其设计模式、线程模型、网络通信、存储结构等方面,理解其实现原理,有助于提升分布式系统的设计与优化能力。对于开发者而言,熟悉RocketMQ的各项特性和使用方式,将有助于构建高可靠...
- 并发能力:优化了多线程处理,提升了并发消息处理能力。 - 消息压缩:支持消息压缩,降低网络传输开销,提高传输效率。 - Batch Message:批量发送消息,减少网络交互次数,提高整体性能。 7. **管理工具**: ...
通过阅读"rocketmq-all-4.0.0-incubating-bin-release"的源码,我们可以深入了解RocketMQ的内部实现,如网络通信、线程模型、调度算法等,有助于优化系统性能,解决实际问题,或者为 RocketMQ 开发新的特性。...
7. **线程模型**:RocketMQ-Client-CPP 库内部采用多线程模型来提高并发处理能力,如使用单独的线程池处理发送和接收消息。 8. **回调函数**:用户可以通过注册回调函数来处理发送和消费消息的结果,提供异步处理的...
【全面解剖RocketMQ与项目实战-day4-part2】是一个深入学习Apache RocketMQ的课程资料压缩包,包含多个视频教程,旨在帮助用户深入了解RocketMQ的工作原理及实际应用。RocketMQ是一个开源的消息中间件,广泛应用于...
RocketMQ更倾向于Pull模型,允许消费者主动拉取,而Kafka则在生产者推送和消费者拉取之间找到了平衡,通过 broker 的持有消息能力实现了类似Push的效果。 【事务消息】 在消息队列中,事务消息是一个重要的话题。...
1. **多线程/异步编程**:实现生产者和消费者的并发处理。 2. **网络通信**:使用TCP/IP或HTTP协议进行消息传输。 3. **数据序列化与反序列化**:将消息转换为网络可传输的格式。 4. **队列数据结构**:用于存储待...