一、启动RocketMQ
[root@master ~]# cat /etc/hosts
# Do not remove the following line, or various programs
# that require network functionality will fail.
127.0.0.1 localhost.localdomain localhost
::1 localhost6.localdomain6 localhost6
192.168.1.106 node1
192.168.1.103 master
192.168.1.110 node2
[root@master ~]# cd /opt/alibaba-rocketmq/bin/
[root@master bin]# cat play.sh
#!/bin/sh
#
# Name Server
#
nohup sh mqnamesrv > ns.log 2>&1 &
#
# Service Addr
#
ADDR=`hostname -i`:9876
#
# Broker
#
nohup sh mqbroker -n ${ADDR} > bk.log 2>&1 &
echo "Start Name Server and Broker Successfully, ${ADDR}"
[root@master bin]# sh play.sh
Start Name Server and Broker Successfully, 192.168.1.103:9876
[root@master bin]# sh mqadmin topicList -n 192.168.1.103:9876
BenchmarkTest
DefaultCluster
SELF_TEST_TOPIC
%RETRY%please_rename_unique_group_name_4
TBW102
gaojingsong
master
OFFSET_MOVED_EVENT
[root@master bin]# cd ../
备注:此时topic不存在,但是生产数据的时候会自动创建
二、生产和消费数据
生产:
package cn.cn.mq.demo;
import java.util.concurrent.TimeUnit;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws MQClientException,
InterruptedException{
/**
* 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ProducerGroupName需要由应用来保证唯一<br>
* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
* 因为服务器会回查这个Group下的任意一个Producer
*/
final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("192.168.1.103:9876");
producer.setInstanceName("Producer");
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
/**
* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
*/
for (int i = 0; i < 3; i++){
try {
{
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"OrderID001",// key
("我的名字是程序员:"+i).getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest1",// topic
"TagC",// tag
"OrderID001",// key
("我来测试RocketMQ:"+i).getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
}catch(Exception e) {
e.printStackTrace();
}
TimeUnit.MILLISECONDS.sleep(4000);
}
/**
* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
*/
// producer.shutdown();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
producer.shutdown();
}
}));
System.exit(0);
}
}
消费:
package cn.cn.mq.demo;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
public class PushConsumer {
/**
* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
* 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
*/
public static void main(String[] args) throws InterruptedException,
MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ConsumerGroupName需要由应用来保证唯一
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"ConsumerGroupName");
consumer.setNamesrvAddr("192.168.1.103:9876");
consumer.setInstanceName("Consumber");
/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
/**
* 订阅指定topic下所有消息<br>
* 注意:一个consumer对象可以订阅多个topic
*/
//consumer.subscribe("TopicTest2", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("TopicTest1")) {
// 执行TopicTest1的消费逻辑
if (msg.getTags() != null // 执行TagA的消费
&& msg.getTags().equals("TagA")) {
System.out.println("TagA:"+new String(msg.getBody()));
} else if (msg.getTags() != null// 执行TagC的消费
&& msg.getTags().equals("TagC")) {
System.out.println("TagC:"+new String(msg.getBody()));
} else if (msg.getTags() != null// 执行TagD的消费
&& msg.getTags().equals("TagD")) {
System.out.println("TagD:"+new String(msg.getBody()));
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();
System.out.println("ConsumerStarted.");
}
}
三、验证消费结果
[root@master bin]# sh mqadmin topicList -n 192.168.1.103:9876
BenchmarkTest
TopicTest1
DefaultCluster
SELF_TEST_TOPIC
%RETRY%please_rename_unique_group_name_4
%RETRY%ConsumerGroupName
TBW102
gaojingsong
master
OFFSET_MOVED_EVENT
[root@master bin]# sh mqadmin topicStatus -n 192.168.1.103:9876 -t TopicTest1
#Broker Name #QID #Min Offset #Max Offset #Last Updated
master 0 0 4 2016-10-20 14:38:19,236
master 1 0 4 2016-10-20 14:38:19,243
master 2 0 2 2016-10-20 14:38:15,171
master 3 0 2 2016-10-20 14:38:15,180
[root@master bin]# shutdown -h now
消费数据
四、错误解决方案
解决方案:版本不一致问题,更改同一版本问题解决
相关推荐
通过阅读和运行这些代码,开发者可以了解RocketMQ在Java环境中的具体使用方式,为实际项目开发提供参考。 为了确保程序能够正常运行,还需要配置RocketMQ服务器,启动NameServer和Broker节点,这些步骤通常涉及修改...
- **发送与接收消息**:通过具体的代码示例,演示如何使用Java API发送普通消息、顺序消息以及事务消息,并展示消息的消费逻辑。 - **性能优化**:讲解如何通过合理的配置提高RocketMQ系统的吞吐量和降低延迟,比如...
通过上述知识点,开发者可以了解到Java与Apache RocketMQ在云原生环境中的最佳实践,以及如何利用Java API进行消息生产和消费。同时,了解如何将RocketMQ部署到云环境中,并实现自动化的管理和监控,对于提升系统的...
RocketMQ实例代码解析 ...通过阅读和理解上述代码,你可以创建一个简单的RocketMQ生产者和消费者实例,进一步了解其工作原理和使用方法。在实际项目中,可以根据具体需求调整配置,实现更复杂的业务逻辑。
使用Java开发RocketMQ应用程序,首先需要添加RocketMQ的依赖库到项目中,然后创建Producer和Consumer对象,编写发送和接收消息的代码。 1. 创建Producer: ```java DefaultMQProducer producer = new ...
Rocketmq-AvroDemo 是一个基于Java的项目,其主要目的是展示如何按照Avro规范来生产和消费数据到Apache RocketMQ中。Avro是Apache Hadoop项目的一部分,它提供了一种高效、跨语言的数据序列化系统。这个Demo将帮助...
RocketMQ使用它来增强代码的可读性和可维护性,特别是在处理字符串和执行通用编程任务时。 3. **rocketmq-client-4.6.1.jar**:这是RocketMQ的核心客户端库,包含了生产者、消费者、队列管理、事务处理等关键功能的...
Java客户端是与RocketMQ服务器进行交互的主要接口,提供了丰富的API来实现消息的发送和接收。 在Java客户端使用RocketMQ的过程中,主要涉及以下几个核心概念和操作: 1. **Producer**: 生产者是发送消息的实体,你...
在集群部署中,RocketMQ通过NameServer进行路由发现,Producer负责生产消息,Consumer则负责消费消息。为了保证高可用性,通常会配置多个NameServer和Broker节点,以实现故障切换和负载均衡。 在分布式WebSocket...
5. **示例代码**:可能包含实际的Java代码示例,展示如何创建生产者和消费者线程,以及如何使用阻塞队列进行数据交换。 6. **性能优化**:在实际应用中,可能会讨论如何选择合适的队列大小、是否需要预创建线程、...
在Java代码中,生产者和消费者都会涉及到创建和使用主题。 4. **Message Queue**:消息队列是RocketMQ中的物理存储单位,每个主题可以有多个消息队列,消息在队列中顺序存储。消费者通常按照轮询或者负载均衡的方式...
1. **环境准备**:确保你的系统已安装Java运行环境(JRE)和Go语言环境,因为RocketMQ Exporter可能是用Go编写的。检查系统环境变量是否正确设置。 2. **构建或下载二进制**:如果你有Go环境,可以克隆"rocketmq-...
6. **消息回溯**:RocketMQ提供消息回溯功能,允许消费者重新消费历史消息,这对于调试和数据挖掘非常有用。 7. **丰富的API和客户端支持**:RocketMQ提供了Java、Python、Go等多种语言的SDK,方便不同技术栈的...
这个可视化界面能够帮助运维人员和开发者直观地查看RocketMQ的运行状态,包括消费队列、生产者、消费者等关键指标。通过这个界面,你可以监控消息发送和接收的情况,查看消息堆积、延迟等信息,从而及时发现并解决...
总的来说,这个压缩包提供了一个基础的RocketMQ消费者实现示例,帮助开发者了解如何在Java应用中集成和使用RocketMQ进行消息消费。通过学习和调试这两个Java文件,开发者可以掌握RocketMQ的基本用法,进而将其应用到...
在Python代码中,我们需要初始化JVM,加载RocketMQ的Java类,然后创建并使用Producer和Consumer对象。 在实际应用中,可能会遇到的挑战包括性能优化(如批量发送消息、合理设置Consumer的拉取消息间隔)、消息丢失...
1. **集群监控**:RocketMQ Console可以展示各个Broker的运行状态,包括在线的Broker数量、每个Broker的存储容量、消息发送和消费的速率等关键指标。 2. **主题与队列管理**:用户可以通过界面创建、删除和修改...
11. **Java API**:RocketMQ 3.2.6 版本提供了 Java 开发接口,开发者可以方便地在 Java 应用中集成 RocketMQ,实现消息的发送和消费。 学习RocketMQ 3.2.6,你需要理解以上核心概念,并通过实践操作加深理解。同时...
5. **NameServer**:RocketMQ的路由中心,生产者和消费者都需要通过NameServer获取主题和队列的路由信息。 6. **RocketMQChecker**:这个标签可能指的是RocketMQ的健康检查工具,用于检测RocketMQ集群的运行状态,...