示例源码
源码地址:
Example
消息中间件中有两个角色:消息生产者和消息消费者。Meta里同样有这两个概念,消息生产者负责创建消息并发送到meta服务器,meta服务器会将消息持久化到磁盘,消息消费者从meta服务器拉取消息并提交给应用消费。我们假设你已经部署了你的meta服务器,参见如何开始。
Java客户端例子
推荐你使用maven,引用meta java client非常简单:
<dependency>
<groupId>com.taobao.metamorphosis</groupId>
<artifactId>metamorphosis-client</artifactId>
<version>1.4.6.2</version>
</dependency>
创建一个示例工程,或者直接将metamorphosis-example clone出来到你本地机器。
git clone git://github.com/killme2008/metamorphosis.git
导入metamorphosis-example工程。
请注意,1.4.3及以上版本的java客户端只能连接1.4.3及以上版本的MetaQ服务器,而1.4.3之前的老客户端则没有限制。主要是因为1.4.3引入了发布和订阅topic的分离,1.4.3的新客户端只能查找到新版本的broker
消息会话工厂类
在使用消息生产者和消费者之前,我们需要创建它们,这就需要用到消息会话工厂类——MessageSessionFactory,由这个工厂帮你创建生产者或者消费者。除了这些,MessageSessionFactory还默默无闻地在后面帮你做很多事情,包括:
1.服务的查找和发现,通过diamond和zookeeper帮你查找日常的meta服务器地址列表
2.连接的创建和销毁,自动创建和销毁到meta服务器的连接,并做连接复用,也就是到同一台meta的服务器在一个工厂内只维持一个连接。
3.消息消费者的消息存储和恢复,后续我们会谈到这一点。
4.协调和管理各种资源,包括创建的生产者和消费者的。
因此,我们首先需要创建一个会话工厂类,MessageSessionFactory仅是一个接口,它的实现类是MetaMessageSessionFactory:
MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(new MetaClientConfig());
请注意,MessageSessionFactory应当尽量复用,也就是作为应用中的单例来使用,简单的做法是交给spring之类的容器帮你托管。
消息生产者
翠花,上代码:
package com.taobao.metamorphosis.example;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
public class Producer {
public static void main(String[] args) throws Exception {
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
//设置zookeeper地址
zkConfig.zkConnect = "127.0.0.1:2181";
metaClientConfig.setZkConfig(zkConfig);
// New session factory,强烈建议使用单例
MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
// create producer,强烈建议使用单例
MessageProducer producer = sessionFactory.createProducer();
// publish topic
final String topic = "test";
producer.publish(topic);
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String line = null;
while ((line = reader.readLine()) != null) {
// send message
SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));
// check result
if (!sendResult.isSuccess()) {
System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
}
else {
System.out.println("Send message successfully,sent to " + sendResult.getPartition());
}
}
}
}
消息生产者的接口是MessageProducer,你可以通过它来发送消息。创建生产者很简单,通过MessageSessionFactory的createProducer方法即可以创建一个生产者。在Meta里,每个消息对象都是Message类的实例,Message表示一个消息对象,它包含这么几个属性:
- id: Long型的消息id,消息的唯一id,系统自动产生,用户无法设置,在发送成功后由服务器返回,发送失败则为0。
- topic: 消息的主题,订阅者订阅该主题即可接收发送到该主题下的消息,生产者通过指定发布的topic查找到需要连接的服务器地址,必须。
- data: 消息的有效载荷,二进制数据,也就是消息内容,meta永远不会修改消息内容,你发送出去是什么样子,接收到就是什么样子。消息内容通常限制在1M以内,我的建议是最好不要发送超过上百K的消息,必须。数据是否压缩也完全取决于用户。
- attribute: 消息属性,一个字符串,可选。发送者可设置消息属性来让消费者过滤。
细心的朋友可能注意到,我们在sendMessage之前还调用了MessageProducer的publish(topic)方法
producer.publish(topic);
这一步在发送消息前是必须的,你必须发布你将要发送消息的topic,这是为了让会话工厂帮你去查找接收这些topic的meta服务器地址并初始化连接。这个步骤针对每个topic只需要做一次,多次调用无影响。
总结下这个例子,从标准输入读入你输入的数据,并将数据封装成一个Message对象,发送到topic为test的服务器上。
请注意,MessageProducer是线程安全的,完全可重复使用,因此最好在应用中作为单例来使用,一次创建,到处使用,配置为spring里的singleton bean。MessageProducer创建的代价昂贵,每次都需要通过zk查找服务器并创建tcp长连接。
消息消费者
发送消息后,消费者可以接收消息了,下面的代码创建消费者并订阅test这个主题,等待消息送达并打印消息内容
package com.taobao.metamorphosis.example;
import java.util.concurrent.Executor;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
public class AsyncConsumer {
public static void main(String[] args) throws Exception {
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
//设置zookeeper地址
zkConfig.zkConnect = "127.0.0.1:2181";
metaClientConfig.setZkConfig(zkConfig);
// New session factory,强烈建议使用单例
MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
// subscribed topic
final String topic = "test";
// consumer group
final String group = "meta-example";
// create consumer,强烈建议使用单例
MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
// subscribe topic
consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
public void recieveMessages(Message message) {
System.out.println("Receive message " + new String(message.getData()));
}
public Executor getExecutor() {
// Thread pool to process messages,maybe null.
return null;
}
});
// complete subscribe
consumer.completeSubscribe();
}
}
通过createConsumer方法来创建MessageConsumer,注意到我们传入一个ConsumerConfig参数,这是消费者的配置对象。每个消息者都必须有一个ConsumerConfig配置对象,我们这里只设置了group属性,这是消费者的分组名称。Meta的Producer、Consumer和Broker都可以为集群。消费者可以组成一个集群共同消费同一个topic,发往这个topic的消息将按照一定的负载均衡规则发送给集群里的一台机器。同一个消费者集群必须拥有同一个分组名称,也就是同一个group。我们这里将分组名称设置为meta-example。
订阅消息通过subscribe方法,这个方法接受三个参数
- topic,订阅的主题
- maxSize,因为meta是一个消费者主动拉取的模型,这个参数规定每次拉取的最大数据量,单位为字节,这里设置为1M,默认最大为1M。
- MessageListener,消息监听器,负责消息消息。
MessageListener的接口方法如下:
public interface MessageListener {
/**
* 接收到消息列表,只有message不为空并且不为null的情况下会触发此方法
*
* @param message
*/
public void recieveMessages(Message message);
/**
* 处理消息的线程池
*
* @return
*/
public Executor getExecutor();
}
消息的消费过程可以是一个并发处理的过程,getExecutor返回你想设置的线程池,每次消费都会在这个线程池里进行。recieveMessage方法用于实际的消息消费处理,message参数即为消费者收到的消息,它必不为null。
我们这里简单地打印收到的消息内容就完成消费。如果在消费过程中抛出任何异常,该条消息将会在一定间隔后重新尝试提交给MessageListener消费。在多次消费失败的情况下,该消息将会存储到消费者应用的本次磁盘,并在后台自动恢复重试消费。
细心的你一定还注意到,在调用subscribe之后,我们还调用了completeSubscribe方法来完成订阅过程。请注意,subscribe仅是将订阅信息保存在本地,并没有实际跟meta服务器交互,要使得订阅关系生效必须调用一次completeSubscribe,completeSubscribe仅能被调用一次,多次调用将抛出异常。 为什么需要completeSubscribe方法呢,原因有二:
- 首先,subscribe方法可以被调用多次,也就是一个消费者可以消费多种topic
- 其次,如果每次调用subscribe都跟zk和meta服务器交互一次,代价太高
因此completeSubscribe一次性将所有订阅的topic生效,并处理跟zk和meta服务器交互的所有过程。
同样,MessageConsumer也是线程安全的,创建的代价不低,因此也应该尽量复用。
来自淘宝
相关推荐
在深入探究阿里消息中间件MetaQ学习...阿里消息中间件MetaQ学习Demo不仅仅是一个简单的示例程序,它是帮助开发者快速学习和掌握MetaQ使用的一个有力工具,同时也是深入理解消息中间件在分布式系统中作用的重要一步。
阿里云ONS(消息队列服务)是基于阿里云提供的分布式的、高可靠性的消息服务产品,它来源于阿里内部广泛使用的消息中间件MetaQ,亦即后来的开源项目RocketMQ。ONS支持海量消息的生产与消费,以无单点故障、高可扩展...
它提供了多种服务,如配置维护、名字服务、分布式同步、组服务等,旨在封装那些复杂且容易出错的关键任务,并向用户提供简单易用的接口和性能高效、功能稳定的系统。Zookeeper 已成为 Hadoop 生态系统中的基础组件。...
ZooKeeper提供了简单易用的API,使得开发者能够方便地构建各种分布式应用程序和服务。本文将详细探讨ZooKeeper在实际项目中的多种应用场景。 #### 二、数据发布与订阅(配置中心) 在分布式系统中,数据发布与订阅...
少儿编程scratch项目源代码文件案例素材-绝地求生.zip
嵌入式八股文面试题库资料知识宝典-文思创新面试题2010-04-08.zip
一种基于剪切波和特征信息检测的太阳斑点图融合算法.pdf
内容概要:本文详细介绍了并联型有源电力滤波器(APF)在Matlab/Simulink环境下的仿真研究。主要内容涵盖三个关键技术点:一是dq与αβ坐标系下的谐波和无功检测,利用dq变换和FBD技术实现实时检测;二是两相旋转坐标系(dq)与两相静止坐标系(αβ)下的PI控制,通过调整比例和积分环节实现精准控制;三是SVPWM调制方式的应用,通过优化开关时序提升系统效率和性能。文中还提供了详细的仿真介绍文档,包括模型搭建、参数设定以及结果分析。 适合人群:从事电力电子、自动化控制领域的研究人员和技术人员,尤其是对电力滤波器仿真感兴趣的读者。 使用场景及目标:适用于需要深入了解并联型APF工作原理和实现方式的研究人员,旨在通过仿真工具掌握谐波和无功检测、PI控制及SVPWM调制的具体应用。 其他说明:本文不仅提供了理论知识,还结合了实际操作步骤,使读者能够通过仿真模型加深对APF的理解。
Arduino KEY实验例程,开发板:正点原子EPS32S3,本人主页有详细实验说明可供参考。
嵌入式八股文面试题库资料知识宝典-嵌入式C语言面试题汇总(66页带答案).zip
.archivetempdebug.zip
嵌入式系统开发_CH551单片机_USB_HID复合设备模拟_基于CH551单片机的USB键盘鼠标复合设备模拟器项目_用于通过CH551微控制器模拟USB键盘和鼠标输入设备_实现硬
少儿编程scratch项目源代码文件案例素材-剑客冲刺.zip
少儿编程scratch项目源代码文件案例素材-火影.zip
内容概要:本文详细介绍了两极式单相光伏并网系统的组成及其仿真优化方法。前级采用Boost电路结合扰动观察法(P&O)进行最大功率点跟踪(MPPT),将光伏板输出电压提升至并网所需水平;后级利用全桥逆变加L型滤波以及电压外环电流内环控制,确保并网电流与电网电压同频同相,实现高效稳定的并网传输。文中还提供了具体的仿真技巧,如开关频率设置、L滤波参数计算和并网瞬间软启动等,最终实现了98.2%的系统效率和低于0.39%的总谐波失真率(THD)。 适合人群:从事光伏并网系统研究、设计和开发的技术人员,特别是对Boost电路、MPPT算法、逆变技术和双环控制系统感兴趣的工程师。 使用场景及目标:适用于希望深入了解两极式单相光伏并网系统的工作原理和技术细节的研究人员和工程师。目标是在实际项目中应用这些理论和技术,提高光伏并网系统的效率和稳定性。 其他说明:文中提供的仿真技巧和伪代码有助于读者更好地理解和实现相关算法,在实践中不断优化系统性能。同时,注意电网电压跌落时快速切换到孤岛模式的需求,确保系统的安全性和可靠性。
矢量边界,行政区域边界,精确到乡镇街道,可直接导入arcgis使用
嵌入式八股文面试题库资料知识宝典-嵌入式c面试.zip
嵌入式八股文面试题库资料知识宝典-I2C总线.zip
内容概要:本文详细介绍了三种注浆模型——随机裂隙网络注浆模型、基于两相达西定律的注浆模型、基于层流和水平集的注浆扩散模型。首先,随机裂隙网络注浆模型基于地质学原理,模拟裂隙网络发育的实际地质情况,在不同注浆压力下进行注浆作业,以增强地基稳定性和提高承载能力。其次,基于两相达西定律的注浆模型利用数学公式模拟裂隙网络中的流体输送过程,适用于裂隙网络地质条件下的注浆效果分析。最后,基于层流和水平集的注浆扩散模型通过引入层流特性和水平集方法,更准确地模拟注浆过程中的扩散过程。文中还讨论了不同注浆压力对注浆效果的影响,并提出了优化建议。 适合人群:从事岩土工程、地基加固等相关领域的工程师和技术人员。 使用场景及目标:①帮助工程师选择合适的注浆模型和注浆压力;②为实际工程项目提供理论支持和技术指导;③提升地基加固的效果和效率。 其他说明:文章强调了在实际应用中需要结合地质条件、裂隙网络特点等因素进行综合分析,以达到最佳注浆效果。同时,鼓励不断创新注浆工艺和方法,以满足日益增长的地基加固需求。
内容概要:本文详细比较了COMSOL Multiphysics软件5.5和6.0版本在模拟Ar棒板粗通道流注放电现象方面的异同。重点探讨了不同版本在处理电子密度、电子温度、电场强度以及三维视图等方面的优缺点。文中不仅介绍了各版本特有的操作方式和技术特点,还提供了具体的代码实例来展示如何进行精确的仿真设置。此外,文章还讨论了网格划分、三维数据提取和电场强度后处理等方面的技术难点及其解决方案。 适合人群:从事等离子体物理研究的专业人士,尤其是熟悉COMSOL Multiphysics软件并希望深入了解其最新特性的研究人员。 使用场景及目标:帮助用户选择合适的COMSOL版本进行高效、精确的等离子体仿真研究,特别是在处理复杂的Ar棒板粗通道流注放电现象时提供指导。 其他说明:文章强调了在实际应用中,选择COMSOL版本不仅要考虑便捷性和视觉效果,还需兼顾仿真精度和可控性。