(1)生产者
//局部有序消息
public class Producer2 {
public static void main(String[] args) throws Exception{
Producer2 p = new Producer2();
p.sendMessage();
}
// 发送消息
void sendMessage() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_test");
// 指定NameServer地址
producer.setNamesrvAddr("192.168.191.133:9876"); // 多个ip以分号分隔
producer.start();
for (int i = 0; i < 20; i++) {
int orderId = i % 10;
// 自定义一个tag数组
String[] tags = new String[]{"orders", "money", "goods"};
// 构建消息
Message msg = new Message("MyTopicOrder",
tags[i % tags.length],
"KEY"+i,
("hi " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送10条消息到Topic为MyTopicOrder,tag为tags数组按顺序取值,
// key值为“KEY”拼接上i的值,消息内容为“Hi”拼接上i的值
SendResult sendResult = producer.send(msg,
new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message m, Object arg1) {
// arg的值其实就是orderId
Integer id = (Integer) arg1;
// mqs是队列集合,也就是topic所对应的所有队列
int index = id % mqs.size();
// 这里根据前面的id对队列集合大小求余来返回所对应的队列
return mqs.get(index);
}
},
orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
(2)消费者-有序消费
//消息有序消费:先启动消费者,再启动生产者
public class Consumer2 {
public static void main(String[] args) throws Exception{
Consumer2 c = new Consumer2();
c.receiveMessage();
}
void receiveMessage() throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_test");
// 指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr("192.168.191.133:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置consumer所订阅的Topic和Tag
consumer.subscribe("MyTopicOrder", "orders || money || goods");
consumer.registerMessageListener(
new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
String msgbody;
try {
msgbody = new String(msg.getBody(), "utf-8");
System.out.println("接收消息: " + msgbody);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//返回消费状态
//SUCCESS 消费成功
//SUSPEND_CURRENT_QUEUE_A_MOMENT 消费失败,暂停当前队列的消费
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("consumer start..%n");
}
}
分享到:
相关推荐
RocketMQ是一款开源的消息中间件,由阿里巴巴开发并贡献给Apache基金会,它在分布式系统中扮演着重要的角色,尤其是在大规模并发、高可用性以及消息可靠性方面。本文将深入探讨如何通过自定义Selector实现在RocketMQ...
- **消息顺序**:对于需要保持消息顺序的场景,RocketMQ提供了有序消息的支持。 4. ** Demo分析 ** 在"rocketmq-demo"中,可能包含以下内容: - 一个简单的命令行界面,用户输入IP和端口后,连接到RocketMQ...
《RocketMQ 开发指南》是一本详尽介绍Apache RocketMQ这一高效消息中间件的书籍,旨在帮助开发者理解和掌握其核心概念、使用方法以及最佳实践。RocketMQ,作为阿里巴巴开源的分布式消息中间件,广泛应用于大数据处理...
RocketMQ 消息丢失解决方案:事务消息 在本文中,我们将讨论如何解决 RocketMQ 中的消息丢失问题,特别是在生产者发送消息到 MQ 过程中的消息丢失问题。我们将介绍 RocketMQ 的事务消息机制,如何使用事务消息来...
MQTT 协议是Android 系统中消息推送的实现技术之一, 由于其具有低功耗、节省流量和可扩展性强的优点, 目前已得到了众多应用. 同时, RocketMQ 作为一种分布式消息队列, 在服务器分布式部署上具有很大优势, 具有高...
SpringBoot集成RocketMQ实现事务消息 在分布式系统中,事务消息是一种重要的机制,它能够确保在分布式环境下的数据一致性。SpringBoot与RocketMQ的结合,为开发者提供了方便地处理事务消息的能力,从而解决业务场景...
在使用Apache RocketMQ时,确保消息的不丢失和顺序性是维持系统稳定性和可靠性的重要方面。RocketMQ提供了多种策略来解决这些问题。 一、确保消息不丢失 1. 环节中的丢消息可能性:在消息的生产、传输、存储和消费...
在本文中,我们将深入探讨如何将SpringBoot框架与Apache RocketMQ集成,特别是在处理事务消息方面。RocketMQ是一款高性能、分布式的消息中间件,广泛应用于大型互联网公司,尤其在处理大规模并发和高可靠性的消息...
2 功能强大,覆盖普通消息、定时(延时)消息、事务消息(提交、回滚)等基本场景的发送场景。集群订阅和广播订阅的消费场景。内含多种命令行参数(例如消费位点的调整,消息体大小调整,并发数调整,JVM 参数调优,...
该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。
RocketMQ是一款高性能、分布式的消息中间件,主要特点是它能确保消息的严格顺序、提供多种消息拉取模式、支持大规模订阅者的水平扩展以及实时的消息订阅机制。它最初是为电商平台设计的,因此特别适合处理高并发和大...
- ** RocketMq之初识消息中间件**:这部分内容可能涵盖了消息中间件的基本概念、工作原理,以及RocketMQ在消息中间件领域中的独特优势。 - **31-Rocketmq特性详解**:深入理解RocketMQ的各项特性,包括高可用、高...
1. **消息队列(Message Queue)**:RocketMQ的核心组件,它是一个存储和转发消息的中间层,负责消息的有序性和可靠性传输。 2. **生产者(Producer)**:负责发送消息到RocketMQ的消息队列。 3. **消费者...
RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大数据、实时计算、微服务等领域。本文将深入探讨RocketMQ的核心概念、优缺点、安装配置以及高级特性。 **1. MQ介绍** * **异步解耦**:MQ通过异步通信...
RocketMQ 是阿里巴巴开源的一款分布式消息中间件,它在大规模分布式系统中扮演着关键角色,用于解耦应用、异步处理以及提高系统的响应速度和吞吐量。本压缩包中的资源可能是一个逐步学习 RocketMQ 的教程,从基础...
《深入探索RocketMQ源码:透视消息中间件的内核机制》 RocketMQ,作为一款高性能、高可用的消息中间件,被广泛应用于分布式系统设计中。本文将带领读者深入其源码,理解其核心组件与功能,从而更好地掌握RocketMQ在...
消息堆积:Producer已经将消息发送到消息队列RocketMQ的服务端,但由于Consumer消费能力有限,未能在短时间内将所有消息正确消费掉,此时在消息队列RocketMQ的服务端保存着未被消费的消息,该状态即消息堆积。...
SpringBoot集成RocketMq,可打包成jar包引入到SpringBoot项目中,方便快捷的使用RocketMq的发送消费消息的功能