`

rocketMQ之局部有序消息

阅读更多

(1)生产者
//局部有序消息
public class Producer2 {
public static void main(String[] args) throws Exception{
Producer2 p = new Producer2();
p.sendMessage();
}

// 发送消息
void sendMessage() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_test");
// 指定NameServer地址
producer.setNamesrvAddr("192.168.191.133:9876"); // 多个ip以分号分隔
producer.start();
for (int i = 0; i < 20; i++) {
int orderId = i % 10;
// 自定义一个tag数组
            String[] tags = new String[]{"orders", "money", "goods"};
         // 构建消息
         Message msg = new Message("MyTopicOrder",
         tags[i % tags.length],
         "KEY"+i,
         ("hi " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
           // 发送10条消息到Topic为MyTopicOrder,tag为tags数组按顺序取值,
            // key值为“KEY”拼接上i的值,消息内容为“Hi”拼接上i的值
SendResult sendResult = producer.send(msg,
new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message m, Object arg1) {
// arg的值其实就是orderId
                        Integer id = (Integer) arg1;
                        // mqs是队列集合,也就是topic所对应的所有队列
                        int index = id % mqs.size();
                        // 这里根据前面的id对队列集合大小求余来返回所对应的队列
                        return mqs.get(index);
}
},
orderId);

System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}

(2)消费者-有序消费
//消息有序消费:先启动消费者,再启动生产者
public class Consumer2 {

public static void main(String[] args) throws Exception{
Consumer2 c = new Consumer2();
c.receiveMessage();
}

void receiveMessage() throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_test");
// 指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr("192.168.191.133:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置consumer所订阅的Topic和Tag
        consumer.subscribe("MyTopicOrder", "orders || money || goods");
consumer.registerMessageListener(
new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
String msgbody;
try {
msgbody = new String(msg.getBody(), "utf-8");
System.out.println("接收消息: " + msgbody);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//返回消费状态
                //SUCCESS 消费成功
                //SUSPEND_CURRENT_QUEUE_A_MOMENT 消费失败,暂停当前队列的消费
                return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("consumer start..%n");
}
}
0
0
分享到:
评论

相关推荐

    RocketMQ自定义selector实现消息通道定向发送和拉取

    RocketMQ是一款开源的消息中间件,由阿里巴巴开发并贡献给Apache基金会,它在分布式系统中扮演着重要的角色,尤其是在大规模并发、高可用性以及消息可靠性方面。本文将深入探讨如何通过自定义Selector实现在RocketMQ...

    RocketMQ消息队列demo

    - **消息顺序**:对于需要保持消息顺序的场景,RocketMQ提供了有序消息的支持。 4. ** Demo分析 ** 在"rocketmq-demo"中,可能包含以下内容: - 一个简单的命令行界面,用户输入IP和端口后,连接到RocketMQ...

    消息中间件 RocketMQ 开发指南

    《RocketMQ 开发指南》是一本详尽介绍Apache RocketMQ这一高效消息中间件的书籍,旨在帮助开发者理解和掌握其核心概念、使用方法以及最佳实践。RocketMQ,作为阿里巴巴开源的分布式消息中间件,广泛应用于大数据处理...

    RocketMQ消息丢失解决方案:事务消息.docx

    RocketMQ 消息丢失解决方案:事务消息 在本文中,我们将讨论如何解决 RocketMQ 中的消息丢失问题,特别是在生产者发送消息到 MQ 过程中的消息丢失问题。我们将介绍 RocketMQ 的事务消息机制,如何使用事务消息来...

    基于RocketMQ的MQTT消息推送服务器分布式部署方案.pdf

    MQTT 协议是Android 系统中消息推送的实现技术之一, 由于其具有低功耗、节省流量和可扩展性强的优点, 目前已得到了众多应用. 同时, RocketMQ 作为一种分布式消息队列, 在服务器分布式部署上具有很大优势, 具有高...

    spingboot-rocketmq事务消息

    SpringBoot集成RocketMQ实现事务消息 在分布式系统中,事务消息是一种重要的机制,它能够确保在分布式环境下的数据一致性。SpringBoot与RocketMQ的结合,为开发者提供了方便地处理事务消息的能力,从而解决业务场景...

    RocketMQ实践:确保消息不丢失与顺序性的高效策略

    在使用Apache RocketMQ时,确保消息的不丢失和顺序性是维持系统稳定性和可靠性的重要方面。RocketMQ提供了多种策略来解决这些问题。 一、确保消息不丢失 1. 环节中的丢消息可能性:在消息的生产、传输、存储和消费...

    SpringBoot整合rocketmq事务消息

    在本文中,我们将深入探讨如何将SpringBoot框架与Apache RocketMQ集成,特别是在处理事务消息方面。RocketMQ是一款高性能、分布式的消息中间件,广泛应用于大型互联网公司,尤其在处理大规模并发和高可靠性的消息...

    消息中间件 RocketMQ 性能压测工具

    2 功能强大,覆盖普通消息、定时(延时)消息、事务消息(提交、回滚)等基本场景的发送场景。集群订阅和广播订阅的消费场景。内含多种命令行参数(例如消费位点的调整,消息体大小调整,并发数调整,JVM 参数调优,...

    阿里云rocketmq消息队列对接demo

    该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。

    消息队列-RocketMQ1

    RocketMQ是一款高性能、分布式的消息中间件,主要特点是它能确保消息的严格顺序、提供多种消息拉取模式、支持大规模订阅者的水平扩展以及实时的消息订阅机制。它最初是为电商平台设计的,因此特别适合处理高并发和大...

    RocketMQ消息队列资料

    - ** RocketMq之初识消息中间件**:这部分内容可能涵盖了消息中间件的基本概念、工作原理,以及RocketMQ在消息中间件领域中的独特优势。 - **31-Rocketmq特性详解**:深入理解RocketMQ的各项特性,包括高可用、高...

    rocketmq可视化界面,rocketmq-console

    1. **消息队列(Message Queue)**:RocketMQ的核心组件,它是一个存储和转发消息的中间层,负责消息的有序性和可靠性传输。 2. **生产者(Producer)**:负责发送消息到RocketMQ的消息队列。 3. **消费者...

    RocketMQ讲义-03.pdf

    RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大数据、实时计算、微服务等领域。本文将深入探讨RocketMQ的核心概念、优缺点、安装配置以及高级特性。 **1. MQ介绍** * **异步解耦**:MQ通过异步通信...

    rocketmq消息中间件.zip

    RocketMQ 是阿里巴巴开源的一款分布式消息中间件,它在大规模分布式系统中扮演着关键角色,用于解耦应用、异步处理以及提高系统的响应速度和吞吐量。本压缩包中的资源可能是一个逐步学习 RocketMQ 的教程,从基础...

    深入探索RocketMQ源码:透视消息中间件的内核机制

    《深入探索RocketMQ源码:透视消息中间件的内核机制》 RocketMQ,作为一款高性能、高可用的消息中间件,被广泛应用于分布式系统设计中。本文将带领读者深入其源码,理解其核心组件与功能,从而更好地掌握RocketMQ在...

    RocketMQ分布式消息队列

    消息堆积:Producer已经将消息发送到消息队列RocketMQ的服务端,但由于Consumer消费能力有限,未能在短时间内将所有消息正确消费掉,此时在消息队列RocketMQ的服务端保存着未被消费的消息,该状态即消息堆积。...

    SpringBoot集成RocketMq,打包成jar包引入到SpringBoot项目中,使用RocketMq发送消费消息的功能

    SpringBoot集成RocketMq,可打包成jar包引入到SpringBoot项目中,方便快捷的使用RocketMq的发送消费消息的功能

Global site tag (gtag.js) - Google Analytics