RocketMQ Consumer Multithreading: Source Code

    Blog category:
  • MQ
 
/**
 * Wrapped push consumer. In fact, it works as remarkably as the pull consumer.
 *
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @since 2013-7-24
 */
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { 


    /**
     * Minimum consumer thread number
     */
    private int consumeThreadMin = 20;
    /**
     * Max consumer thread number
     */
    private int consumeThreadMax = 64;
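
These two values are later passed as the core and maximum pool sizes of a ThreadPoolExecutor backed by an unbounded LinkedBlockingQueue (see the ConsumeMessageConcurrentlyService constructor further down). With an unbounded queue, the JDK executor queues extra tasks instead of creating threads beyond the core size, so in this configuration consumeThreadMax would not be reached. The following is a minimal sketch of that JDK behavior only; the class and method names are hypothetical and it does not use RocketMQ itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RocketMQ.
class UnboundedQueuePoolDemo {

    /**
     * Submits `tasks` blocking tasks to a pool built like the consumer's
     * (core size, max size, unbounded LinkedBlockingQueue) and returns the
     * largest number of threads the pool ever created.
     */
    static int largestPoolSize(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            core, max, 1000 * 60, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Every task beyond the first `core` ones went into the queue.
            return pool.getLargestPoolSize();
        } finally {
            release.countDown(); // let the workers drain the queue
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(largestPoolSize(2, 8, 50)); // prints 2
    }
}
```

If more consumer threads are needed under this kind of configuration, raising the minimum (core) value is what changes the actual thread count.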

 

 

  

  com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl

public void start() throws MQClientException {
        switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}",
                this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(),
                this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory =
                    MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,
                        this.rpcHook);

            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer
                .getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(//
                mQClientFactory,//
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            }
            else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    this.offsetStore =
                            new LocalFileOffsetStore(this.mQClientFactory,
                                this.defaultMQPushConsumer.getConsumerGroup());
                    break;
                case CLUSTERING:
                    this.offsetStore =
                            new RemoteBrokerOffsetStore(this.mQClientFactory,
                                this.defaultMQPushConsumer.getConsumerGroup());
                    break;
                default:
                    break;
                }
            }
            this.offsetStore.load();

            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this,
                            (MessageListenerOrderly) this.getMessageListenerInner());
            }
            else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this,
                            (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            // Start the consumer service
            this.consumeMessageService.start();

            boolean registerOK =
                    mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group["
                        + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please."
                        + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
            }

            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
                    + this.serviceState//
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
        default:
            break;
        }

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        this.mQClientFactory.rebalanceImmediately();
    }
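
The start() method above guards itself with a small state machine: it only proceeds from CREATE_JUST, marks the state START_FAILED before doing any work, and switches to RUNNING only once initialization has finished. A stripped-down sketch of that pattern follows. The class name, the Runnable parameter, and the exception type are illustrative assumptions, and the sketch leaves out the branch where the original resets the state to CREATE_JUST when group registration fails.

```java
// Hypothetical, simplified model of the start() state guard; not RocketMQ code.
class StartGuardDemo {
    enum ServiceState { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private ServiceState state = ServiceState.CREATE_JUST;

    /** Runs `init` once; a second call, or a call after a failed start, is rejected. */
    void start(Runnable init) {
        switch (state) {
        case CREATE_JUST:
            // Pessimistically mark as failed: if init throws, the state stays START_FAILED.
            state = ServiceState.START_FAILED;
            init.run();
            state = ServiceState.RUNNING;
            break;
        default:
            throw new IllegalStateException("service state not OK: " + state);
        }
    }

    ServiceState currentState() {
        return state;
    }

    public static void main(String[] args) {
        StartGuardDemo ok = new StartGuardDemo();
        ok.start(() -> { });
        System.out.println(ok.currentState()); // prints RUNNING

        StartGuardDemo broken = new StartGuardDemo();
        try {
            broken.start(() -> { throw new RuntimeException("bad config"); });
        } catch (RuntimeException expected) {
            System.out.println(broken.currentState()); // prints START_FAILED
        }
    }
}
```

Setting the failed state first means an exception anywhere in the middle of start-up leaves the object in a state that later calls will refuse to start from.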

 

 

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    private static final Logger log = ClientLogger.getLog();
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private final MessageListenerConcurrently messageListener;
    private final BlockingQueue<Runnable> consumeRequestQueue;
    private final ThreadPoolExecutor consumeExecutor;
    private final String consumerGroup;

    private final ScheduledExecutorService scheduledExecutorService;


    public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
            MessageListenerConcurrently messageListener) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;

        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

        this.consumeExecutor = new ThreadPoolExecutor(//
            this.defaultMQPushConsumer.getConsumeThreadMin(),//
            this.defaultMQPushConsumer.getConsumeThreadMax(),//
            1000 * 60,//
            TimeUnit.MILLISECONDS,//
            this.consumeRequestQueue,//
            new ThreadFactoryImpl("ConsumeMessageThread_"));

        this.scheduledExecutorService =
                Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
                    "ConsumeMessageScheduledThread_"));
    }


    public void start() {
    }

  

    @Override
    public void submitConsumeRequest(//
            final List<MessageExt> msgs, //
            final ProcessQueue processQueue, //
            final MessageQueue messageQueue, //
            final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
        else {
            for (int total = 0; total < msgs.size();) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    }
                    else {
                        break;
                    }
                }

                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                this.consumeExecutor.submit(consumeRequest);
            }
        }
    }
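
The else branch of submitConsumeRequest cuts the pulled list into consecutive chunks of at most consumeBatchSize messages and submits one ConsumeRequest per chunk. The chunking alone can be sketched with plain JDK collections as below. BatchSplitDemo and split are hypothetical names, and integers stand in for MessageExt. Unlike the original, this helper returns no chunk at all for an empty input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating the chunking logic; not RocketMQ code.
class BatchSplitDemo {

    /** Splits msgs into consecutive chunks of at most batchSize elements. */
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += batchSize) {
            int to = Math.min(from + batchSize, msgs.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> msgs = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(split(msgs, 3)); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Each chunk would then become one task for the consume thread pool, which is how a single pull result can be processed by several threads in parallel.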

   

com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is droped.", pullRequest.toString());
            return;
        }

        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

        try {
            this.makeSureStateOK();
        }
        catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenException);
            return;
        }

        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}",
                this.defaultMQPushConsumer.getInstanceName());
            this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenSuspend);
            return;
        }

        long size = processQueue.getMsgCount().get();
        if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenFlowControl);
            if ((flowControlTimes1++ % 1000) == 0) {
                log.warn("the consumer message buffer is full, so do flow control, {} {} {}", size,
                    pullRequest, flowControlTimes1);
            }
            return;
        }

        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenFlowControl);
                if ((flowControlTimes2++ % 1000) == 0) {
                    log.warn("the queue's messages, span too long, so do flow control, {} {} {}",
                        processQueue.getMaxSpan(), pullRequest, flowControlTimes2);
                }
                return;
            }
        }

        final SubscriptionData subscriptionData =
                this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            // Because of concurrency, retry even when the subscription is not found, so the PullRequest is not lost
            this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenException);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }

        final long beginTimestamp = System.currentTimeMillis();

        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult =
                            DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                                pullRequest.getMessageQueue(), pullResult, subscriptionData);

                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(
                            pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                        else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(
                                pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(),
                                pullResult.getMsgFoundList().size());

                            boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                                pullResult.getMsgFoundList(), //
                                processQueue, //
                                pullRequest.getMessageQueue(), //
                                dispathToConsume);

                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            }
                            else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }

                        if (pullResult.getNextBeginOffset() < prevRequestOffset//
                                || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",//
                                pullResult.getNextBeginOffset(),//
                                firstMsgOffset,//
                                prevRequestOffset);
                        }

                        break;
                    case NO_NEW_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",//
                            pullRequest.toString(), pullResult.toString());

                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(
                                        pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false);

                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest
                                        .getMessageQueue());

                                    DefaultMQPushConsumerImpl.this.rebalanceImpl
                                        .removeProcessQueue(pullRequest.getMessageQueue());

                                    log.warn("fix the pull request offset, {}", pullRequest);
                                }
                                catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                    }
                }
            }
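
Before issuing a pull, pullMessage applies two flow-control checks: it postpones the request when the local ProcessQueue already caches more messages than getPullThresholdForQueue(), and, for concurrent (non-orderly) consumption, when the offset span of cached messages exceeds getConsumeConcurrentlyMaxSpan(). The decision can be modeled as a pure function, sketched below. The class, enum, and parameter names are hypothetical, and the thresholds used in main are illustrative values rather than settings read from a consumer.

```java
// Hypothetical model of the two flow-control checks in pullMessage; not RocketMQ code.
class FlowControlDemo {
    enum Decision { PULL_NOW, DELAY_BUFFER_FULL, DELAY_SPAN_TOO_LONG }

    static Decision decide(long cachedMsgCount, long maxSpan, boolean consumeOrderly,
                           long pullThresholdForQueue, long consumeConcurrentlyMaxSpan) {
        // Check 1: too many messages are already waiting locally for this queue.
        if (cachedMsgCount > pullThresholdForQueue) {
            return Decision.DELAY_BUFFER_FULL;
        }
        // Check 2: only for concurrent consumption, the gap between the oldest
        // and newest cached offsets has grown too large.
        if (!consumeOrderly && maxSpan > consumeConcurrentlyMaxSpan) {
            return Decision.DELAY_SPAN_TOO_LONG;
        }
        return Decision.PULL_NOW;
    }

    public static void main(String[] args) {
        // Illustrative thresholds: 1000 cached messages, span of 2000 offsets.
        System.out.println(decide(1500, 10, false, 1000, 2000));  // prints DELAY_BUFFER_FULL
        System.out.println(decide(10, 2500, false, 1000, 2000));  // prints DELAY_SPAN_TOO_LONG
        System.out.println(decide(10, 2500, true, 1000, 2000));   // prints PULL_NOW
    }
}
```

In the original method both delayed cases reschedule the same PullRequest through executePullRequestLater, so a slow consumer throttles pulling without dropping the request.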

 
