示例源码
源码地址:
Example
消息中间件中有两个角色:消息生产者和消息消费者。Meta里同样有这两个概念,消息生产者负责创建消息并发送到meta服务器,meta服务器会将消息持久化到磁盘,消息消费者从meta服务器拉取消息并提交给应用消费。我们假设你已经部署了你的meta服务器,参见如何开始。
Java客户端例子
推荐你使用maven,引用meta java client非常简单:
<dependency>
<groupId>com.taobao.metamorphosis</groupId>
<artifactId>metamorphosis-client</artifactId>
<version>1.4.6.2</version>
</dependency>
创建一个示例工程,或者直接将metamorphosis-example clone出来到你本地机器。
git clone git://github.com/killme2008/metamorphosis.git
导入metamorphosis-example工程。
请注意,1.4.3及以上版本的java客户端只能连接1.4.3及以上版本的MetaQ服务器,而1.4.3之前的老客户端则没有限制。主要是因为1.4.3引入了发布和订阅topic的分离,1.4.3的新客户端只能查找到新版本的broker
消息会话工厂类
在使用消息生产者和消费者之前,我们需要创建它们,这就需要用到消息会话工厂类——MessageSessionFactory,由这个工厂帮你创建生产者或者消费者。除了这些,MessageSessionFactory还默默无闻地在后面帮你做很多事情,包括:
1.服务的查找和发现,通过diamond和zookeeper帮你查找日常的meta服务器地址列表
2.连接的创建和销毁,自动创建和销毁到meta服务器的连接,并做连接复用,也就是到同一台meta的服务器在一个工厂内只维持一个连接。
3.消息消费者的消息存储和恢复,后续我们会谈到这一点。
4.协调和管理各种资源,包括创建的生产者和消费者的。
因此,我们首先需要创建一个会话工厂类,MessageSessionFactory仅是一个接口,它的实现类是MetaMessageSessionFactory:
MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(new MetaClientConfig());
请注意,MessageSessionFactory应当尽量复用,也就是作为应用中的单例来使用,简单的做法是交给spring之类的容器帮你托管。
消息生产者
翠花,上代码:
package com.taobao.metamorphosis.example;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
public class Producer {
public static void main(String[] args) throws Exception {
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
//设置zookeeper地址
zkConfig.zkConnect = "127.0.0.1:2181";
metaClientConfig.setZkConfig(zkConfig);
// New session factory,强烈建议使用单例
MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
// create producer,强烈建议使用单例
MessageProducer producer = sessionFactory.createProducer();
// publish topic
final String topic = "test";
producer.publish(topic);
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String line = null;
while ((line = reader.readLine()) != null) {
// send message
SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));
// check result
if (!sendResult.isSuccess()) {
System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
}
else {
System.out.println("Send message successfully,sent to " + sendResult.getPartition());
}
}
}
}
消息生产者的接口是MessageProducer,你可以通过它来发送消息。创建生产者很简单,通过MessageSessionFactory的createProducer方法即可以创建一个生产者。在Meta里,每个消息对象都是Message类的实例,Message表示一个消息对象,它包含这么几个属性:
- id: Long型的消息id,消息的唯一id,系统自动产生,用户无法设置,在发送成功后由服务器返回,发送失败则为0。
- topic: 消息的主题,订阅者订阅该主题即可接收发送到该主题下的消息,生产者通过指定发布的topic查找到需要连接的服务器地址,必须。
- data: 消息的有效载荷,二进制数据,也就是消息内容,meta永远不会修改消息内容,你发送出去是什么样子,接收到就是什么样子。消息内容通常限制在1M以内,我的建议是最好不要发送超过上百K的消息,必须。数据是否压缩也完全取决于用户。
- attribute: 消息属性,一个字符串,可选。发送者可设置消息属性来让消费者过滤。
细心的朋友可能注意到,我们在sendMessage之前还调用了MessageProducer的publish(topic)方法
producer.publish(topic);
这一步在发送消息前是必须的,你必须发布你将要发送消息的topic,这是为了让会话工厂帮你去查找接收这些topic的meta服务器地址并初始化连接。这个步骤针对每个topic只需要做一次,多次调用无影响。
总结下这个例子,从标准输入读入你输入的数据,并将数据封装成一个Message对象,发送到topic为test的服务器上。
请注意,MessageProducer是线程安全的,完全可重复使用,因此最好在应用中作为单例来使用,一次创建,到处使用,配置为spring里的singleton bean。MessageProducer创建的代价昂贵,每次都需要通过zk查找服务器并创建tcp长连接。
消息消费者
发送消息后,消费者可以接收消息了,下面的代码创建消费者并订阅test这个主题,等待消息送达并打印消息内容
package com.taobao.metamorphosis.example;
import java.util.concurrent.Executor;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
public class AsyncConsumer {
public static void main(String[] args) throws Exception {
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
//设置zookeeper地址
zkConfig.zkConnect = "127.0.0.1:2181";
metaClientConfig.setZkConfig(zkConfig);
// New session factory,强烈建议使用单例
MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
// subscribed topic
final String topic = "test";
// consumer group
final String group = "meta-example";
// create consumer,强烈建议使用单例
MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
// subscribe topic
consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
public void recieveMessages(Message message) {
System.out.println("Receive message " + new String(message.getData()));
}
public Executor getExecutor() {
// Thread pool to process messages,maybe null.
return null;
}
});
// complete subscribe
consumer.completeSubscribe();
}
}
通过createConsumer方法来创建MessageConsumer,注意到我们传入一个ConsumerConfig参数,这是消费者的配置对象。每个消息者都必须有一个ConsumerConfig配置对象,我们这里只设置了group属性,这是消费者的分组名称。Meta的Producer、Consumer和Broker都可以为集群。消费者可以组成一个集群共同消费同一个topic,发往这个topic的消息将按照一定的负载均衡规则发送给集群里的一台机器。同一个消费者集群必须拥有同一个分组名称,也就是同一个group。我们这里将分组名称设置为meta-example。
订阅消息通过subscribe方法,这个方法接受三个参数
- topic,订阅的主题
- maxSize,因为meta是一个消费者主动拉取的模型,这个参数规定每次拉取的最大数据量,单位为字节,这里设置为1M,默认最大为1M。
- MessageListener,消息监听器,负责消息消息。
MessageListener的接口方法如下:
public interface MessageListener {
/**
* 接收到消息列表,只有message不为空并且不为null的情况下会触发此方法
*
* @param message
*/
public void recieveMessages(Message message);
/**
* 处理消息的线程池
*
* @return
*/
public Executor getExecutor();
}
消息的消费过程可以是一个并发处理的过程,getExecutor返回你想设置的线程池,每次消费都会在这个线程池里进行。recieveMessage方法用于实际的消息消费处理,message参数即为消费者收到的消息,它必不为null。
我们这里简单地打印收到的消息内容就完成消费。如果在消费过程中抛出任何异常,该条消息将会在一定间隔后重新尝试提交给MessageListener消费。在多次消费失败的情况下,该消息将会存储到消费者应用的本次磁盘,并在后台自动恢复重试消费。
细心的你一定还注意到,在调用subscribe之后,我们还调用了completeSubscribe方法来完成订阅过程。请注意,subscribe仅是将订阅信息保存在本地,并没有实际跟meta服务器交互,要使得订阅关系生效必须调用一次completeSubscribe,completeSubscribe仅能被调用一次,多次调用将抛出异常。 为什么需要completeSubscribe方法呢,原因有二:
- 首先,subscribe方法可以被调用多次,也就是一个消费者可以消费多种topic
- 其次,如果每次调用subscribe都跟zk和meta服务器交互一次,代价太高
因此completeSubscribe一次性将所有订阅的topic生效,并处理跟zk和meta服务器交互的所有过程。
同样,MessageConsumer也是线程安全的,创建的代价不低,因此也应该尽量复用。
来自淘宝
相关推荐
在深入探究阿里消息中间件MetaQ学习...阿里消息中间件MetaQ学习Demo不仅仅是一个简单的示例程序,它是帮助开发者快速学习和掌握MetaQ使用的一个有力工具,同时也是深入理解消息中间件在分布式系统中作用的重要一步。
阿里云ONS(消息队列服务)是基于阿里云提供的分布式的、高可靠性的消息服务产品,它来源于阿里内部广泛使用的消息中间件MetaQ,亦即后来的开源项目RocketMQ。ONS支持海量消息的生产与消费,以无单点故障、高可扩展...
它提供了多种服务,如配置维护、名字服务、分布式同步、组服务等,旨在封装那些复杂且容易出错的关键任务,并向用户提供简单易用的接口和性能高效、功能稳定的系统。Zookeeper 已成为 Hadoop 生态系统中的基础组件。...
ZooKeeper提供了简单易用的API,使得开发者能够方便地构建各种分布式应用程序和服务。本文将详细探讨ZooKeeper在实际项目中的多种应用场景。 #### 二、数据发布与订阅(配置中心) 在分布式系统中,数据发布与订阅...
内容概要:本文详细介绍了8轴插补运动控制系统的实现,重点探讨了双DMA技术的应用,实现了高频率脉冲输出(最高可达500kHz)。文中首先解释了双DMA的工作原理及其相对于传统脉冲输出方式的优势,即减少CPU负载并提高数据传输速率。接着阐述了8轴插补算法的设计思想,包括基于时间分割的方法来确定各轴在特定时间段内的脉冲数。此外,还讨论了加减速控制策略,尤其是S型加减速算法的应用,以确保运动的平顺性。最后,文章展示了具体的代码实现细节,涵盖DMA配置、插补算法、加减速控制等方面。 适合人群:从事运动控制系统开发的技术人员,尤其是对嵌入式系统有一定了解的研发人员。 使用场景及目标:适用于需要高精度、高频脉冲输出的工业应用场景,如工业机器人、3D打印、激光切割等。目标是帮助开发者理解和掌握8轴插补运动控制的关键技术和实现方法,从而应用于实际项目中。 其他说明:文中提供的代码示例主要基于STM32系列单片机,但相关概念和技术可以迁移至其他平台。同时,强调了硬件细节处理的重要性,如RC滤波电路的应用,以应对实际工程中的常见问题。
2303040222橡胶232熊文栋(苯乙烯悬浮聚合)副本.pdf
内容概要:本文详细介绍了音乐喷泉的设计与制作过程,涵盖了从原理图绘制到具体代码实现的各个方面。首先介绍了Altium Designer这款强大的电子设计软件,接着展示了如何利用现有文件进行设计,包括水泵控制、灯光效果和音乐解析三大核心模块的具体实现方法。文中提供了多个代码片段,如单片机控制喷头升降、PWM调速控制水泵以及灯光效果同步音乐节奏等。同时,强调了在实际制作过程中需要注意的问题,如焊接温度、布线规划、元件选择等。此外,还分享了一些实用技巧和经验教训,帮助读者更好地理解和应用相关知识。 适合人群:对电子设计感兴趣的爱好者、初学者以及有一定基础的电子工程师。 使用场景及目标:适用于希望深入了解音乐喷泉工作原理和技术实现的人群,目标是掌握如何使用Altium Designer完成音乐喷泉的电路设计,并能够编写相应的控制代码。 其他说明:文章不仅提供了详细的理论讲解,还包括了许多实战经验和技巧,有助于读者在实践中少走弯路。
内容概要:本文详细介绍了汽车主动悬架系统的工作原理及其参数仿真的方法。首先解释了主动悬架的基本概念,即它可以根据车辆行驶状态和路面情况进行实时调整,提高行车安全性和舒适度。接着展示了如何利用简化的单自由度模型进行参数设置并进行仿真,具体涉及到了动力学方程、状态空间模型以及PID控制器的设计。此外还提到了更高级别的LQR控制器的应用,并强调了实际应用中需要注意的问题,如执行器响应延迟、物理限制等。文中通过实例演示了被动悬架与主动悬架在面对相同路面输入时的不同表现,突出了主动控制系统的优势。同时,针对传感器噪声处理、卡尔曼滤波器的使用、PWM信号生成等方面进行了深入探讨,揭示了主动悬架背后的复杂技术和工程挑战。 适用人群:对汽车工程特别是悬架系统感兴趣的研究人员和技术爱好者。 使用场景及目标:帮助读者理解主动悬架的工作机制,掌握基本的建模和仿真技能,为进一步开展相关领域的研究提供理论支持和技术指导。 其他说明:文中不仅提供了详细的数学推导和代码片段,还分享了许多实践经验,使读者能够全面地了解主动悬架系统的各个方面。
(3)请修改代码,解决临界区问题。解决后,无论如何运行,counter值均输出0
少儿编程scratch项目源代码文件案例素材-Mc v2.zip
内容概要:本文详细介绍了将Carsim与Simulink联合用于十四自由度车辆动力学模型的构建与验证过程。文中首先概述了整车架构的模块化分解方法,接着深入探讨了各个子系统的具体实现细节,如转向系统、轮胎模型、悬架子系统以及PI驾驶员控制器的设计与调优。针对联合仿真过程中遇到的关键问题,如采样率同步、参数调优、模型验证等进行了详细的讨论,并提供了具体的解决方案和技术技巧。通过对多种典型工况(如阶跃转向、正弦油门、双移线等)的仿真测试,验证了所建立模型的有效性和准确性。 适合人群:从事车辆动力学研究、汽车仿真领域的工程师和技术人员,尤其是那些希望深入了解Carsim与Simulink联合仿真的从业者。 使用场景及目标:适用于需要进行复杂车辆动力学仿真和模型验证的研究机构或企业。主要目标是提高仿真精度,缩短开发周期,确保模型能够准确反映实际车辆行为。此外,还可以作为教学材料帮助学生掌握先进的车辆建模技术和仿真工具。 其他说明:文中不仅分享了大量的实战经验和技巧,还附带了完整的源代码和详细的调试记录,对于想要深入理解和应用这一技术的人来说非常有价值。
内容概要:本文探讨了基于雨流计数法的源-荷-储双层协同优化配置,旨在提高能源系统的效率和经济性。文中介绍了双层优化架构,即外层优化储能系统的功率和容量,内层优化储能系统的充放电曲线并评估其寿命。通过Python代码示例展示了具体的实现过程,包括外层和内层优化的具体步骤以及雨流计数法的应用。此外,文章还讨论了常见的调试问题及解决方案,强调了内外层变量之间的相互影响。 适合人群:从事能源系统优化的研究人员和技术人员,尤其是对储能系统优化感兴趣的读者。 使用场景及目标:适用于需要进行源-荷-储系统优化的实际工程项目,如光伏电站、风力发电站等。目标是通过合理的储能配置,延长储能系统的使用寿命,降低成本,提高经济效益。 其他说明:文章提供了详细的代码示例和理论解释,帮助读者更好地理解和应用这一优化方法。同时提醒读者,在实际应用中需要注意数据的准确性以及参数的选择。
很多盗版PCI卡都在用的雕刻机控制程序
内容概要:本文详细介绍了三机并联的风光储混合系统在Matlab中的仿真方法及其关键技术。首先,针对光伏阵列模型,讨论了其核心二极管方程以及MPPT(最大功率点跟踪)算法的应用,强调了环境参数对输出特性的影响。接着,探讨了永磁同步风机的矢量控制,尤其是转速追踪和MPPT控制策略。对于混合储能系统,则深入讲解了超级电容和蓄电池的充放电策略,以及它们之间的协调机制。此外,还涉及了PQ控制的具体实现,包括双闭环结构的设计和锁相环的优化。最后,提供了仿真过程中常见的问题及解决方案,如求解器选择、参数敏感性和系统稳定性等。 适合人群:从事电力电子、新能源系统设计与仿真的工程师和技术人员,以及相关专业的研究生。 使用场景及目标:适用于希望深入了解风光储混合系统工作原理的研究人员,旨在帮助他们掌握Matlab仿真技巧,提高系统设计和优化的能力。 其他说明:文中不仅提供了详细的理论推导和代码示例,还分享了许多实践经验,有助于读者更好地理解和应用所学知识。
内容概要:本文详细介绍了基于NGSIM数据对Wiedemann99跟驰模型进行参数标定的过程。作者使用Matlab编写代码,实现了数据读取与预处理、Wiedemann99模型定义、拟合优度函数(RMSPE)计算以及改进粒子群算法(IPSO)。通过这些步骤,成功地对标定了Wiedemann99模型的关键参数,并对其进行了性能评估。文中不仅展示了具体的代码实现细节,还探讨了参数选择、算法改进等方面的经验教训。 适合人群:从事交通工程、智能交通系统研究的专业人士,尤其是那些对车辆跟驰行为建模感兴趣的科研工作者和技术开发者。 使用场景及目标:适用于需要精确模拟车辆跟驰行为的研究项目,如交通流量仿真、自动驾驶测试等。目标是提高模型的准确性和可靠性,以便更好地理解和预测真实的道路交通状况。 其他说明:文章提供了详细的代码片段和理论背景介绍,有助于读者深入理解整个标定流程。同时,作者分享了一些实用的小技巧,如参数敏感度分析、适应度函数设计等,对于相关领域的研究人员具有较高的参考价值。
内容概要:本文为中国信息通信研究院发布的《2024年大模型落地路线图研究报告》,旨在梳理大模型应用落地的共性需求和关键要素,为大模型赋能各行业提供参考。报告重点介绍了大模型应用落地的四个重要阶段——现状诊断、能力建设、应用部署、运营管理,归纳了八个关键步骤,包括能力分析、需求挖掘、方案设计、研发测试、应用开发、效能评估、运维监测和运营管理。报告详细分析了大模型在基础设施、数据资源、算法模型、应用服务、安全可信五个层面应重点关注的发展要素和亟待解决的问题。此外,报告还探讨了大模型在金融、工业、教育、医疗、政务等行业的具体应用场景及其带来的降本增效、提质增效等优势。最后,报告展望了大模型的发展趋势,强调了架构优化、行业数字化转型和可信发展的必要性。 适合人群:具备一定技术背景,特别是从事人工智能、大数据、云计算等领域工作的研发人员、管理人员和技术决策者。 使用场景及目标:①帮助企业和机构评估自身大模型应用的基础条件,明确业务转型需求;②指导大模型建设方案的设计和实施,确保技术选型的科学性和合理性;③提供应用部署和效能评估的具体方法,确保大模型在实际应用中的稳定性和高效性;④建立健全大模型的运营管理体系,保障业务的高效稳定开展。 其他说明:报告强调了大模型在推动各行业数字化转型中的重要作用,提出了未来大模型发展的重点方向,如架构优化、技术应用和可信发展。报告还呼吁社会各界共同关注大模型的安全可信问题,确保其与人类价值观的对齐,推动大模型的健康发展。
少儿编程scratch项目源代码文件案例素材-Scratch泡泡龙.zip
软考初级程序员是中国计算机技术与软件专业技术资格(水平)考试中的一个重要级别,主要面向打算进入IT行业的初学者或初级程序员。这个级别的考试旨在测试考生的基础编程能力、计算机基础知识以及解决问题的能力。历年真题是备考的重要参考资料,可以帮助考生了解考试的题型、难度以及考点。 在"软考初级程序员09-18年真题及答案解析"的压缩包中,包含了从2009年至2018年上半年的所有程序员考试真题。这些真题涵盖了多个方面,包括但不限于: 1. **基础编程语言**:如C语言、Java、Python等,主要考察基本语法、数据类型、控制结构、函数使用等方面。 2. **数据结构与算法**:如数组、链表、栈、队列、树、图等,以及排序算法(冒泡、选择、插入、快速、归并等)和查找算法(线性查找、二分查找等)。 3. **计算机系统知识**:包括计算机组成原理、操作系统、网络基础知识,例如CPU结构、内存管理、进程与线程、网络协议等。 4. **数据库基础**:SQL语言的基本操作,如增删改查、子查询、联接操作、索引等。 5. **软件工程与项目管理**:软件生命周期、需求分析、设计原则、测试方法、版本控制等。 6. **法律法规与职业道德**:涉及知识产权、合同法、信息安全与隐私保护等。 每份真题后的答案解析部分,是对题目答案的详细解释,通常包括解题思路、关键步骤以及知识点的扩展。通过阅读解析,考生不仅能知道自己答案的正确与否,还能深入理解相关知识点,提高自己的分析和解决问题的能力。 在准备软考初级程序员考试时,考生应充分利用这些真题资源,进行模拟练习,掌握各类题目的解答技巧。同时,考生还需要广泛阅读教材,补充相关知识,提高对理论的理解。此外,多做编程实践,提高实际编程能力,也是非常重要的。 总结来说,这个压缩包是备考软考初级程序员的宝贵资料,它能帮助考生熟悉考试形式,了解重
内容概要:本文详细介绍了如何在Zynq扩展口上使用FPGA和W5500实现稳定的TCP网络通信。作者通过一系列实验和技术手段,解决了多个实际问题,最终实现了零丢包的数据回环处理。主要内容包括:硬件搭建(SPI接口配置)、数据回环处理(双时钟域流水线)、压力测试(信号抓波形和防抖处理)、多路复用扩展以及上位机测试脚本的编写。文中提供了大量Verilog代码片段,展示了具体实现细节。 适合人群:具备一定FPGA开发经验的工程师,尤其是对TCP/IP协议栈感兴趣的嵌入式系统开发者。 使用场景及目标:适用于需要高性能、低延迟网络通信的应用场景,如工业控制系统、实时数据采集等。目标是帮助读者掌握在FPGA上实现高效TCP通信的方法和技术。 其他说明:文章不仅提供了详细的代码实现,还分享了许多实践经验,如SPI时钟优化、CS信号防抖、FIFO深度选择等。此外,作者还讨论了未来可能的改进方向,如UDP组播和QoS优先级控制。
内容概要:本文探讨了在汽车动力学研究和自动驾驶领域中,使用无迹扩展卡尔曼滤波(UKF/EKF)在Matlab/Simulink环境中对路面附着系数进行估计的方法。文中介绍了选择Matlab/Simulink的原因及其强大功能,详细解析了7自由度整车模型的构建,以及UKF和EKF的具体实现方式。UKF通过非线性处理和sigma点传播概率分布,适用于复杂工况;EKF则通过线性化处理,更适合计算资源有限的场景。两者在不同路面条件下表现出各自的优劣,如UKF在突变路面下表现更好,而EKF在不变路面上效率更高。此外,还讨论了调参技巧、工程实现细节及实际测试结果。 适用人群:从事汽车动力学研究、自动驾驶技术研发的专业人士,尤其是对非线性滤波算法感兴趣的研究人员和技术开发者。 使用场景及目标:①用于车辆稳定性控制系统中,提高行驶安全性;②优化滤波算法性能,平衡精度与实时性;③为复杂工况下的路面附着系数估计提供解决方案。 其他说明:文章不仅提供了理论分析,还包括大量代码示例和实践经验分享,有助于读者深入理解和实际应用。