`
iwinit
  • 浏览: 454785 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

[metaq]Producer

阅读更多

Procuder相对consumer来说比较简单,根据topic从zk拿broker列表,注意这里只拿master类型的broker,slave型的broker和master拥有同样的broker id,主要为了HA用。roubd-robin取一个partition,发送消息。

 

1.MetaMessageSessionFactory初始化zookeeper连接,创建MessageProducer,默认SimpleMessageProducer

 

2.订阅topic,是一个异步操作

public void publishTopic(final String topic, final Object ref) {
        if (this.topicConnectionListeners.get(topic) != null) {
            this.addRef(topic, ref);
            return;
        }
        final FutureTask<BrokerConnectionListener> task =
                new FutureTask<BrokerConnectionListener>(new Callable<BrokerConnectionListener>() {
                    @Override
                    public BrokerConnectionListener call() throws Exception {
                        final BrokerConnectionListener listener = new BrokerConnectionListener(topic);
                        if (ProducerZooKeeper.this.zkClient != null) {
                            ProducerZooKeeper.this.publishTopicInternal(topic, listener);
                        }
                        listener.references.add(ref);
                        return listener;
                    }

                });

        final FutureTask<BrokerConnectionListener> existsTask = this.topicConnectionListeners.putIfAbsent(topic, task);
        if (existsTask == null) {
            task.run();
        }
        else {
            this.addRef(topic, ref);
        }
    }

 订阅过程

private void publishTopicInternal(final String topic, final BrokerConnectionListener listener) throws Exception,
    NotifyRemotingException, InterruptedException {
	///meta/brokers/topics-pub/的topic节点,
        final String partitionPath = this.metaZookeeper.brokerTopicsPubPath + "/" + topic;
        ZkUtils.makeSurePersistentPathExists(ProducerZooKeeper.this.zkClient, partitionPath);
	//BrokerConnectionListener处理broker变化的情况
        ProducerZooKeeper.this.zkClient.subscribeChildChanges(partitionPath, listener);
        // 第一次要同步等待就绪
        listener.syncedUpdateBrokersInfo();
    }

 同步broker信息过程

 void syncedUpdateBrokersInfo() throws NotifyRemotingException, InterruptedException {
            this.lock.lock();
            try {
		//取该topic下所有master节点,节点名称有后缀'-m',key是brokerId,value是broker地址,从节点目录/meta/brokers/ids/0/master下读取
                final Map<Integer, String> newBrokerStringMap =
                        ProducerZooKeeper.this.metaZookeeper.getMasterBrokersByTopic(this.topic);
                final List<String> topics = new ArrayList<String>(1);
                topics.add(this.topic);
		//返回master的topic到partition映射的map,key是topic,value是对应的master中配置的partition集合
                final Map<String, List<Partition>> newTopicPartitionMap =
                        ProducerZooKeeper.this.metaZookeeper.getPartitionsForTopicsFromMaster(topics);

                log.warn("Begin receiving broker changes for topic " + this.topic + ",broker ids:"
                        + newTopicPartitionMap);
                // Connect to new brokers
		//新broker创建连接
                for (final Map.Entry<Integer, String> newEntry : newBrokerStringMap.entrySet()) {
                    final Integer newBrokerId = newEntry.getKey();
                    final String newBrokerString = newEntry.getValue();
                    // 新的有,旧的没有,创建
                    if (!this.brokersInfo.oldBrokerStringMap.containsKey(newBrokerId)) {
                        ProducerZooKeeper.this.remotingClient.connect(newBrokerString, this);
                        ProducerZooKeeper.this.remotingClient.awaitReadyInterrupt(newBrokerString);
                        log.warn("Connect to " + newBrokerString);
                    }
                }
                // Close removed brokers.
		//没用的broker删除连接
                for (final Map.Entry<Integer, String> oldEntry : this.brokersInfo.oldBrokerStringMap.entrySet()) {
                    final Integer oldBrokerId = oldEntry.getKey();
                    final String oldBrokerString = oldEntry.getValue();
                    final String newBrokerString = newBrokerStringMap.get(oldBrokerId);
                    // 新旧都有
                    if (newBrokerStringMap.containsKey(oldBrokerId)) {
                        // 判断内容是否变化
                        if (!newBrokerString.equals(oldBrokerString)) {
                            log.warn("Close " + oldBrokerString + ",connect to " + newBrokerString);
                            ProducerZooKeeper.this.remotingClient.connect(newBrokerString, this);
                            ProducerZooKeeper.this.remotingClient.awaitReadyInterrupt(newBrokerString);
                            ProducerZooKeeper.this.remotingClient.close(oldBrokerString, this, false);
                        }
                        else {
                            // ignore
                        }
                    }
                    else {
                        // 新的没有,旧的有,关闭
                        ProducerZooKeeper.this.remotingClient.close(oldBrokerString, this, false);
                        log.warn("Close " + oldBrokerString);
                    }
                }

                // Set the new brokers info.
                this.brokersInfo = new BrokersInfo(newBrokerStringMap, newTopicPartitionMap);
                log.warn("End receiving broker changes for topic " + this.topic);
            }
            finally {
                this.lock.unlock();
            }
        }
    }

 3. 发送消息

 private SendResult send0(final Message message, final byte[] encodedData, final long timeout, final TimeUnit unit)
            throws InterruptedException, MetaClientException {
        try {
            final String topic = message.getTopic();
            Partition partition = null;
            String serverUrl = null;
            // 如果在事务内,则使用上一次发送消息时选择的broker
            if (this.isInTransaction()) {
                final LastSentInfo info = this.lastSentInfo.get();
                if (info != null) {
                    serverUrl = info.serverUrl;
                    // 选择该broker内的某个分区
                    partition =
                            this.producerZooKeeper.selectPartition(topic, message, this.partitionSelector, serverUrl);
                    if (partition == null) {
                        // 没有可用分区,抛出异常
                        throw new MetaClientException("There is no partitions in `" + serverUrl
                            + "` to send message with topic `" + topic + "` in a transaction");
                    }
                }
            }
		//默认采用round-robin策略发送,partition里取一个
            if (partition == null) {
                partition = this.selectPartition(message);
            }
            if (partition == null) {
                throw new MetaClientException("There is no aviable partition for topic " + topic
                    + ",maybe you don't publish it at first?");
            }
            if (serverUrl == null) {
                serverUrl = this.producerZooKeeper.selectBroker(topic, partition);
            }
            if (serverUrl == null) {
                throw new MetaClientException("There is no aviable server right now for topic " + topic
                    + " and partition " + partition + ",maybe you don't publish it at first?");
            }

            if (this.isInTransaction() && this.lastSentInfo.get() == null) {
                // 第一次发送,需要启动事务
                this.beforeSendMessageFirstTime(serverUrl);
            }

            final int flag = MessageFlagUtils.getFlag(message);
		//Command拼装
            final PutCommand putCommand =
                    new PutCommand(topic, partition.getPartition(), encodedData, flag, CheckSum.crc32(encodedData),
                        this.getTransactionId(), OpaqueGenerator.getNextOpaque());
		//发送
            final BooleanCommand resp = this.invokeToGroup(serverUrl, partition, putCommand, message, timeout, unit);
		//返回结果
            return this.genSendResult(message, partition, serverUrl, resp);
        }
        catch (final TimeoutException e) {
            throw new MetaOpeartionTimeoutException("Send message timeout in "
                    + TimeUnit.MILLISECONDS.convert(timeout, unit) + " mills");
        }
        catch (final InterruptedException e) {
            throw e;
        }
        catch (final MetaClientException e) {
            throw e;
        }
        catch (final Exception e) {
            throw new MetaClientException("send message failed", e);
        }
    }

 

分享到:
评论
1 楼 chenghaitao111111 2016-06-14  
兄弟syncedUpdateBrokersInfo这个代码和github上有不同,是不是你这边修复了bug呢。我的程序在这里就出现了一个bug。

相关推荐

    Metaq原理与应用

    - **Producer**:生产者是负责创建并发送消息的组件,通常由业务系统生成消息。 - **Consumer**:消费者用于接收和处理消息,通常为后台系统进行异步消费。 - **Consumer Group**:消费者组是一组具有相同消费...

    metaq-server-1.4.6.2.tar.gz

    MetaQ Server的架构主要包括Producer、Consumer、Broker和Controller四个主要部分: 1. Producer:生产者负责将消息发送到MetaQ Server,通过API接口与Server进行通信。 2. Consumer:消费者从MetaQ Server订阅并...

    metaq-server-1.4.6.2.zip 和原版一样就是换了个名字

    Client则分为Producer和Consumer,Producer负责发送消息,Consumer负责接收和消费消息。 在性能优化方面,MetaQ 1.4.6.2版本可能包括了对网络IO、多线程并行处理和内存管理等方面的改进。例如,可能会采用零拷贝...

    Metaq详细手册.docx

    《Metaq详细手册》 Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息...

    MetaQ 分布式消息服务中间件.pdf

    在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者主动从MetaQ拉取数据,解析成消息并进行消费。MetaQ的架构设计中,...

    阿里rocketMQ

    RocketMQ的核心组件包括Producer、Consumer、NameServer和Broker四部分: 1. **Producer**:生产者是消息的发送方,负责创建并发送消息到RocketMQ Broker。RocketMQ提供了同步和异步两种发送模式,同步模式确保...

    RocketMQ最全介绍与实战.pdf

    RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...

    rocketmq-note.pdf

    RocketMQ 的发展历程可以追溯到Metaq 1.x,它是基于Kafka的Java重写版。随后,Metaq经历了2.x版本的存储优化,以适应阿里集团的大规模交易需求。2012年,RocketMQ 3.0版本发布,成为独立的产品,并在2017年成为...

    消息中间件rocketmq原理解析

    - **发送消息到broker**:Producer将消息发送至Master Broker,然后消息通过Broker的主从复制机制同步到Slave Broker上。 ### Producer消息发送 - **轮询队列实现负载均衡**:Producer通过轮询的方式遍历Topic下的...

    kafka学习文档

    数据生产者,producer 的用法:《producer 的用法》、《producer 使用注意》 数据消费者,consumer 的用法:《consumer 的用法》 迓有些零碎的,关亍通信段的源码览读:《net 包源码览读》、《broker 配置》 扩展的...

    消息中间件 rocketmq原理解析

    ### 一、Producer #### 1. Producer启动流程 Producer是消息的发送者,它的启动流程如下: - 在发送消息时,如果Producer集合中没有对应topic的信息,则会向NameServer查询该topic的相关信息,并将其缓存在本地; -...

    【系统架构】最全最强解析:支付宝钱包系统架构内部剖析(架构图).docx

    支付宝钱包系统架构内部剖析 支付宝钱包系统架构概况: 支付宝钱包系统架构是...4. 分布式环境下(broker,producer,consumer都为集群)的消息路由,对顺序和可靠性有极高要求的场景 5. 作为一般MQ来使用的其他功能

    阿里云ons指南

    阿里云ONS(消息队列服务)是基于阿里云提供的分布式的、高可靠性的消息服务产品,它来源于阿里内部广泛使用的消息中间件MetaQ,亦即后来的开源项目RocketMQ。ONS支持海量消息的生产与消费,以无单点故障、高可扩展...

    Window搭建部署RocketMQ步骤详解

    RocketMQ是一个由阿里巴巴开源的消息中间件,脱胎自阿里巴巴的MetaQ,在设计上借鉴了Kafka。下面将详细介绍Window搭建部署RocketMQ的步骤。 一、安装RocketMQ 首先需要下载RocketMQ的发行版本,解压缩后得到bin、...

    RocketMQ原理简介

    最后,文档中还提供了产品发展历史的概述,从Metaq的迭代到RocketMQ的正式上线,以及其后续版本的发展。介绍了如何在不同的业务系统中深度定制RocketMQ来满足特定的需求,如淘宝、支付宝和B2B等场景下的应用。并且,...

    你应该知道的RocketMQ

    RocketMQ前身叫做MetaQ,在MeataQ发布3.0版本的时候改名为RocketMQ,其本质上的设计思路和Kafka类似,但是和Kafka不同的是其使用Java进行开发,由于在国内的Java受众群体远远多于Scala,所以RocketMQ是很多以Java语言...

    阿里RocketMQ用户指南V3.2.4.pdf

    文档中提到了RocketMQ从早期的Metaq版本到3.x版本的发展历程,以及与早期版本的兼容性问题。用户指南V3.2.4还提及了与其它系统的整合,比如与JMS、CORBA Notification的整合,这说明了RocketMQ的开放性和对不同消息...

    分布式消息引擎Apache RocketMQ最佳实践

    3. **Producer**:消息生产者,负责向Broker发送消息。 4. **Consumer**:消息消费者,负责从Broker拉取消息或接收消息推送。 RocketMQ的设计充分考虑了分布式系统的特性和需求,实现了高并发、低延迟以及高吞吐量...

Global site tag (gtag.js) - Google Analytics