(1)生产者
//局部有序消息
public class Producer2 {
public static void main(String[] args) throws Exception{
Producer2 p = new Producer2();
p.sendMessage();
}
// 发送消息
void sendMessage() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_test");
// 指定NameServer地址
producer.setNamesrvAddr("192.168.191.133:9876"); // 多个ip以分号分隔
producer.start();
for (int i = 0; i < 20; i++) {
int orderId = i % 10;
// 自定义一个tag数组
String[] tags = new String[]{"orders", "money", "goods"};
// 构建消息
Message msg = new Message("MyTopicOrder",
tags[i % tags.length],
"KEY"+i,
("hi " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送10条消息到Topic为MyTopicOrder,tag为tags数组按顺序取值,
// key值为“KEY”拼接上i的值,消息内容为“Hi”拼接上i的值
SendResult sendResult = producer.send(msg,
new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message m, Object arg1) {
// arg的值其实就是orderId
Integer id = (Integer) arg1;
// mqs是队列集合,也就是topic所对应的所有队列
int index = id % mqs.size();
// 这里根据前面的id对队列集合大小求余来返回所对应的队列
return mqs.get(index);
}
},
orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
(2)消费者-有序消费
//消息有序消费:先启动消费者,再启动生产者
public class Consumer2 {
public static void main(String[] args) throws Exception{
Consumer2 c = new Consumer2();
c.receiveMessage();
}
void receiveMessage() throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_test");
// 指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr("192.168.191.133:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置consumer所订阅的Topic和Tag
consumer.subscribe("MyTopicOrder", "orders || money || goods");
consumer.registerMessageListener(
new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
String msgbody;
try {
msgbody = new String(msg.getBody(), "utf-8");
System.out.println("接收消息: " + msgbody);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//返回消费状态
//SUCCESS 消费成功
//SUSPEND_CURRENT_QUEUE_A_MOMENT 消费失败,暂停当前队列的消费
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("consumer start..%n");
}
}
分享到:
相关推荐
RocketMQ提供了有序消息功能,如全局顺序消息和局部顺序消息,以满足不同需求。 4. **消息回溯**: RocketMQ提供消息回溯功能,允许消费者在一段时间内回溯消费历史消息,这对于故障排查和数据恢复非常有用。 5. ...
2. 消息顺序:RocketMQ支持全局有序和局部有序两种消息顺序模式。 3. 消息回溯:通过设置消息的保留时间,可以实现消息的回溯,方便问题排查。 4. 分布式事务:RocketMQ提供了分布式事务消息,保证事务的一致性。 5....
局部有序消息允许相同订单 ID 的消息在同一个 Queue 中保持顺序,不同订单 ID 的消息则可以并行处理。通过取模等策略分配消息到不同的 Queue,可以在保证局部顺序的同时提升性能。 **延时消息**,是指消息被发送到 ...
RocketMQ支持全局有序和局部有序,通过Group Topic和Order Tag实现;Kafka则通过分区策略和严格的消息写入顺序来部分保证消息顺序。 4. **处理消息堆积**:当消费者处理速度跟不上生产者速度时,消息会堆积。可以...
- **RocketMQ**:RocketMQ是阿里巴巴开源的一款分布式消息中间件,具有高吞吐量、低延迟、高可用性等特点,广泛应用于订单、支付、事件驱动等场景。 - **基础操作**:包括创建主题、发布/订阅消息、消费消息等。 ...
- Java栈中局部变量表中的引用对象。 - 本地方法栈中JNI(Native方法)引用的对象。 3. **项目如何排查JVM问题** - 使用 JVM 监控工具如 VisualVM、JConsole 等。 - 分析内存泄漏、CPU 使用率等问题。 - 优化...
RocketMQ的底层实现基于发布/订阅模型,通过Broker进行消息路由和存储。 Dubbo的架构设计包括服务提供者、消费者、注册中心、监控中心等组件。 Spring Cloud组件如Zuul(API网关)、Feign(声明式服务调用)等,与...