`
wbj0110
  • 浏览: 1587410 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

metamorphosis-3-metaQ初步

阅读更多

Java客户端例子

使用maven,引用metaq的java client非常简单:

<dependency>
  <groupId>com.taobao.metamorphosis</groupId>
  <artifactId>metamorphosis-client-extension</artifactId>
  <version>1.4.3</version>
</dependency>

 

 也可以引用 client-extend

<dependency>
  <groupId>com.taobao.metamorphosis</groupId>
  <artifactId>metamorphosis-client-extension</artifactId>
  <version>1.4.3</version>
</dependency>

 

 

直接用git 将metamorphosis-example clone  下载页面:https://github.com/killme2008/metamorphosis-example 。

请注意,1.4.3及以上版本的java客户端只能连接1.4.3及以上版本的MetaQ服务器,而1.4.3之前的老客户端则没有限制。主要是因为1.4.3引入了发布和订阅topic的分离,1.4.3的新客户端只能查找到新版本的broker

消息会话工厂类

在使用消息生产者和消费者之前,需要用到消息会话工厂类——MessageSessionFactory,由这个工厂帮你创建生产者或者消费者。除了这些,MessageSessionFactory还默默无闻地在后面帮你做很多事情,包括:

1.服务的查找和发现,通过diamond和zookeeper帮你查找日常的meta服务器地址列表,diamond可以忽略。

2.连接的创建和销毁,自动创建和销毁到meta服务器的连接,并做连接复用,也就是到同一台meta的服务器在一个工厂内只维持一个连接。

3.消费者的消息存储和恢复。

4.协调和管理各种资源,包括创建的生产者和消费者的。

因此,我们首先需要创建一个会话工厂类,MessageSessionFactory仅是一个接口,它的实现类常用的是MetaMessageSessionFactory:

 

MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(new MetaClientConfig()); 

请注意,MessageSessionFactory应当尽量复用,也就是作为应用中的单例来使用,简单的做法是交给spring之类的容器帮你托管

 

消息生产者  

 

 

package com.taobao.metamorphosis.example;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

public class Producer {
    public static void main(String[] args) throws Exception {
        final MetaClientConfig metaClientConfig = new MetaClientConfig();
        final ZKConfig zkConfig = new ZKConfig();
        //设置zookeeper地址
        zkConfig.zkConnect = "127.0.0.1:2181";

        metaClientConfig.setZkConfig(zkConfig);
        // New session factory,强烈建议使用单例
        MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);

        // create producer,强烈建议使用单例
        MessageProducer producer = sessionFactory.createProducer();

        // publish topic
        final String topic = "matatest";

        producer.publish(topic);

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        String line = null;
        while ((line = reader.readLine()) != null) {
            // send message
            SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));

            // check result
            if (!sendResult.isSuccess()) {

                System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
            }
            else {
                System.out.println("Send message successfully,sent to " + sendResult.getPartition());
            }
        }
    }

}

消息生产者的接口是MessageProducer,你可以通过它来发送消息。创建生产者很简单,通过MessageSessionFactory的createProducer方法即可以创建一个生产者。在Meta里,每个消息对象都是Message类的实例,Message表示一个消息对象,它包含这么几个属性:

  • id: Long型的消息id,消息的唯一id,系统自动产生,用户无法设置,在发送成功后由服务器返回,发送失败则为0。
  • topic: 消息的主题,订阅者订阅该主题即可接收发送到该主题下的消息,生产者通过指定发布的topic查找到需要连接的服务器地址,必须。
  • data: 消息的有效载荷,二进制数据,也就是消息内容,meta永远不会修改消息内容。消息内容通常限制在1M以内,作者的建议是最好不要发送超过上百K的消息,必须。数据是否压缩也完全取决于使用方。
  • attribute: 消息属性,一个字符串,可选。生产者可通过设置消息属性来让消费者简单过滤。

在sendMessage之前必须调用 MessageProducer的publish(topic)方法

producer.publish(topic); 

 

这一步在发送消息前是必要的,必须先发布将要发送消息的topic(另外,metaq的broker必须要先通过配置文件设置好topic,启动broker后,客户端才可能发布消息成功,不然就会报出异常),这是为了让会话工厂帮你去查找接收这些topic的meta服务器地址并初始化连接。这个步骤针对每个topic只需要做一次,多次调用无影响。

总结下这个例子,从标准输入读入你输入的数据,并将数据封装成一个Message对象,发送到[localhost:8123]服务的 topic为metatestbroker 上。

请注意,MessageProducer是线程安全的,完全可重复使用,因此最好在应用中作为单例来使用,一次创建,到处使用,配置为spring里的singleton bean。MessageProducer创建的代价昂贵,每次都需要通过zk查找服务器并创建tcp长连接。

消息消费者

发送消息后,消费者可以接收消息了,下面的代码创建异步消费者并订阅test这个主题,等待消息送达并打印消息内容

 

package com.taobao.metamorphosis.example;

import java.util.concurrent.Executor;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;


public class AsyncConsumer {
    public static void main(String[] args) throws Exception {
        final MetaClientConfig metaClientConfig = new MetaClientConfig();
        final ZKConfig zkConfig = new ZKConfig();
        //设置zookeeper地址
        zkConfig.zkConnect = "127.0.0.1:2181";

        metaClientConfig.setZkConfig(zkConfig);
        // New session factory,强烈建议使用单例
        MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);

        // subscribed topic
        final String topic = "metatest";

        // consumer group
        final String group = "meta-example";

        // create consumer,强烈建议使用单例
        MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));

        // subscribe topic
        consumer.subscribe(topic, 1024 * 1024, new MessageListener() {


            public void recieveMessages(Message message) {
                System.out.println("Receive message " + new String(message.getData()));
            }


            public Executor getExecutor() {
                // Thread pool to process messages,maybe null.
                return null;

            }
        });
        // complete subscribe
        consumer.completeSubscribe();

    }
}

通过createConsumer方法来创建MessageConsumer,请注意传入一个ConsumerConfig参数,这是消费者的配置对象。每个消息者都必须有一个ConsumerConfig配置对象,这里只设置了group属性,这是消费者的分组名称。Meta的Producer、Consumer和Broker都可以为集群。消费者可以组成一个集群共同消费同一个topic,发往这个topic的消息将按照一定的负载均衡规则发送给集群里的一台机器。同一个消费者集群必须拥有同一个分组名称,也就是同一个group。我们这里将分组名称设置为meta-example。

订阅消息通过subscribe方法,这个方法接受三个参数

  • topic,订阅的主题
  • maxSize,因为meta是一个消费者主动拉取的模型,这个参数规定每次拉取的最大数据量,单位为字节,这里设置为1M,默认最大为1M。
  • MessageListener,消息监听器,负责消息消息。

MessageListener的接口方法如下:

 

public interface MessageListener {
    /**
     * 接收到消息列表,只有message不为空并且不为null的情况下会触发此方法
     * 
     * 
@param message
     
*/

    public void recieveMessages(Message message);


    /**
     * 处理消息的线程池
     * 
     * 
@return
     
*/

    public Executor getExecutor();
}

消息的消费过程可以是一个并发处理的过程,getExecutor返回你想设置的线程池,每次消费都会在这个线程池里进行。recieveMessage方法用于实际的消息消费处理,message参数即为消费者收到的消息,它必不为null。

这里简单地打印收到的消息内容就完成消费。如果在消费过程中抛出任何异常,该条消息将会在一定间隔后重新尝试提交给MessageListener消费。在多次消费失败的情况下,该消息将会存储到消费者应用的本次磁盘,并在后台自动恢复重试消费。

在调用subscribe之后,我们还调用了completeSubscribe方法来完成订阅过程。请注意,subscribe仅是将订阅信息保存在本地,并没有实际跟meta服务器交互,要使得订阅关系生效必须调用一次completeSubscribe,completeSubscribe仅能被调用一次,多次调用将抛出异常。 为什么需要completeSubscribe方法呢,原因有二:

  • 首先,subscribe方法可以被调用多次,也就是一个消费者可以消费多种topic
  • 其次,如果每次调用subscribe都跟zk和meta服务器交互一次,代价太高

因此completeSubscribe一次性将所有订阅的topic生效,并处理跟zk和meta服务器交互的所有过程。

同样,MessageConsumer也是线程安全的,创建的代价不低,因此也应该尽量复用。

分享到:
评论

相关推荐

    zookeeper-3.4.5,metamorphosis-server-wrapper

    本篇将深入探讨Zookeeper 3.4.5版本以及与其相关的Metamorphosis-Server-Wrapper工具包,旨在帮助读者理解和掌握这两个组件的核心概念、功能及应用场景。 首先,Zookeeper是Apache软件基金会的一个开源项目,它提供...

    metamorphosis(metaq)

    3. 消费者:消费者订阅感兴趣的Topic,从MetaQ服务器拉取或接收消息。消费者可以设置为推拉模式,推模式下,服务器主动推送消息;拉模式下,消费者按需主动请求。 4. 消息确认:MetaQ支持消息确认机制,消费者在...

    支付宝内部架构剖析?

    支付宝的开源分布式消息中间件--Metamorphosis(MetaQ) Metamorphosis (MetaQ) 是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,...

    strom的jar包

    strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的...

    Metaq详细手册.docx

    3. **客户端消费状态保存**:消费状态保存在客户端,允许消费者在断线后能从上次断点继续消费,无需重新读取已处理的消息,提升了效率。 4. **分布式架构**:Metaq支持分布式部署,生产者、服务器和消费者都可以...

    Metamorphosis

    3. **提高代码复用**:由于UI外观的变化可以通过配置而非代码实现,这使得代码更加简洁,易于维护,也减少了重复工作。 4. **增强用户体验**:通过提供多种视觉样式,用户可以根据个人喜好选择界面,提升其在使用...

    metaq消息中间件服务端、客户端资源汇集

    Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。

    淘宝消息队列metamorphosis使用

    淘宝消息队列metamorphosis使用,下载java项目后查看readme.txt文本配置zookeeper,metaq后在测试代码修改下自己的配置即可执行demo,可以通过生产者生产消息,消费者消费消息

    客户端使用例子1

    - `metamorphosis-client-extension`:包含客户端扩展功能,如Log4j Appender。 请注意,如果在构建过程中遇到问题,请检查是否已将客户端库发布到自己的Maven仓库。 使用异步单向和Log4j发送的场景: 当对消息的...

    Metamorphosis介绍.pptx

    Metamorphosis介绍 Metamorphosis是一个分布式publish-subscribe消息系统,作为开源的MQ系统,相比于Kafka,它有着不同的设计原则和实现方式。下面是Metamorphosis的详细介绍: 设计原则 Metamorphosis的设计原则...

    淘宝团队自己开发的中间件在ubuntu上搭建的过程

    主工程的打包命令是`mvn -U -Dtest -DfailIfNoTests=false clean install package assembly:assembly`,这将生成所有需要的服务端文件,包括`taobao-metamorphosis-server-wrapper.dir`或不带扩展的`taobao-...

    打包部署1

    生成的服务端所需内容会出现在目标目录下的`taobao-metamorphosis-server-wrapper.dir`或不带扩展名的`taobao-metamorphosis-server.dir`。 - **独立工程打包**:对于其他独立工程,进入其目录,同样执行`mvn -U -...

    Metamorphosis, 一种高可用高性能的分布式.zip

    Metamorphosis, 一种高可用高性能的分布式 #新闻MetaQ 1.4.6.2 发布。更新日志MetaQ 1.4.6.1 发布。更新日志MetaQ 1.4.5.1 发布。更新日志MetaQ 1.4.5发布。更新日志meta: 一个用于的ruby 客户端。 源代码

    morpher_source.tar.gz_Metamorphosis

    3. `results`:可能存放着使用该算法处理后的结果图像,便于用户直观地看到变形效果。 4. `documentation`:这里可能有相关的文档资料,如算法原理介绍、使用教程、API参考等,帮助用户理解和使用源代码。 5. `...

    支付宝之所以牛逼的原因:来看内部架构剖析

    Metamorphosis(MetaQ)是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在...

    支付宝钱包系统架构内部剖析(架构图)

    ### 支付宝钱包系统架构内部剖析 ...其中,Metamorphosis (MetaQ)作为支付宝内部广泛使用的一款分布式消息中间件,凭借其卓越的性能表现和丰富的功能特性,成为了支撑支付宝钱包系统高效运行的重要基石。

    【系统架构】最全最强解析:支付宝钱包系统架构内部剖析(架构图).docx

    3. 交易柔性事务支付宝的开源分布式消息中间件–Metamorphosis(MetaQ):负责处理交易消息的传输和处理,包括消息存储、顺序写、吞吐量大和支持本地和XA事务等功能。 Metamorphosis(MetaQ) 介绍: Metamorphosis...

    Metamorphosis.txt

    One morning, when Gregor Samsa woke from troubled dreams, he found himself transformed in his bed into a horrible vermin. He lay on his armour-like back, and if he lifted his head a little he could ...

    Amphibian metamorphosis. From morphology to molecular biology

    Amphibian metamorphosis. From morphology to molecular biology Molecular metamorphosis Amphibian metamorphosis. From morphology to molecular biology (1999). Yun-Bo Shi. Wiley-Liss, £54.95, hardback...

Global site tag (gtag.js) - Google Analytics