`
234390216
  • 浏览: 10232871 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
A5ee55b9-a463-3d09-9c78-0c0cf33198cd
Oracle基础
浏览量:462622
Ad26f909-6440-35a9-b4e9-9aea825bd38e
springMVC介绍
浏览量:1775509
Ce363057-ae4d-3ee1-bb46-e7b51a722a4b
Mybatis简介
浏览量:1398350
Bdeb91ad-cf8a-3fe9-942a-3710073b4000
Spring整合JMS
浏览量:395022
5cbbde67-7cd5-313c-95c2-4185389601e7
Ehcache简介
浏览量:679980
Cc1c0708-ccc2-3d20-ba47-d40e04440682
Cas简介
浏览量:530892
51592fc3-854c-34f4-9eff-cb82d993ab3a
Spring Securi...
浏览量:1183945
23e1c30e-ef8c-3702-aa3c-e83277ffca91
Spring基础知识
浏览量:467894
4af1c81c-eb9d-365f-b759-07685a32156e
Spring Aop介绍
浏览量:151381
2f926891-9e7a-3ce2-a074-3acb2aaf2584
JAXB简介
浏览量:68148
社区版块
存档分类
最新评论

RocketMQ(01)——简介

阅读更多

RocketMQ简介

笔者使用的是Apache RocketMQ,官网是http://rocketmq.apache.org/。RocketMQ是Alibaba开源的一个分布式消息队列,可以通过http://rocketmq.apache.org/dowloading/releases/下载当前最新的版本。下载后解压缩,然后通过bin/mqnamesrv启动一个Name Server,它默认监听在9876端口。然后需要通过bin/mqbroker -n localhost:9876启动一个Broker,并把它注册到刚刚启动的那个Name Server上,Broker默认监听在端口10911上。生产者和消费者都是跟Broker打交道,但是它们不会直接指定Broker的地址,而是通过Name Server来间接的获取Broker的地址。这样做的好处是可以动态的增加Broker,多个Broker之间可以组成一个集群。应用中使用RocketMQ时需要添加RocketMQ Client依赖。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.0</version>
</dependency>

然后可以通过DefaultMQProducer进行消息的发送,每一个生产者必须指定一个Group,下面代码就指定了Group为group1。相同处理逻辑的生产者必须定义相同的Group。这个Group只对于发送事务消息的生产者有用。然后需要通过setNamesrvAddr()指定Name Server的地址。在发送消息前必须调用其start()。发送的消息是通过org.apache.rocketmq.common.message.Message对象表示的。它必须要指定一个Topic,RocketMQ是通过抽象的Topic来管理一组队列的,这个Topic必须在Broker中进行创建。可以通过bin/mqadmin updateTopic -b localhost:10911 -t topic1在本地刚刚启动的Broker上创建名为topic1的Topic。它默认拥有8个读队列,8个写队列。下面的代码指定了消息都将发送到名为topic1的Topic。通过其send()进行消息发送,它是同步发送的,发送完后会返回一个SendResult。其SendStatus为SEND_OK时表示发送成功。下面的代码一共发送了10条消息到topic1,消息内容分别是hello0..hello9。

@Test
public void testSend() throws Exception {
  //指定Producer的Group为group1
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  //指定需要连接的Name Server
  producer.setNamesrvAddr(nameServer);
  //发送消息前必须调用start(),其内部会进行一些初始化工作。
  producer.start();
  for (int i = 0; i < 10; i++) {
    //指定消息发送的Topic是topic1。
    Message message = new Message("topic1", ("hello" + i).getBytes());
    //同步发送,发送成功后才会返回
    SendResult sendResult = producer.send(message);
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
      System.out.println("消息发送成功:" + sendResult);
    } else {
      System.out.println("消息发送失败:" + sendResult);
    }
  }
  //使用完毕后需要把Producer关闭,以释放相应的资源
  producer.shutdown();
}

消息的消费者可以通过DefaultMQPushConsumer进行消费。DefaultMQPushConsumer是进行推模式消费的,它也需要指定一个Group。默认情况下相同Group的消费者将对同一个队列中的消息进行集群消费,即同一条消息只会被一个Consumer实例进行消费。DefaultMQPushConsumer也需要通过setNamesrvAddr()指定需要连接的Name Server。通过subscribe()指定需要消费的Topic和对应的Tag。下面指定了需要消费的Topic是topic1,通过*指定将消费所有的Tag。Tag是用来对消息进行分类标记的,需要在发送消息的时候指定。通过registerMessageListener()注册消息监听器,当收到消息后会回调它。下面代码注册的是一个MessageListenerConcurrently类型的监听器。消息正常消费后需要返回CONSUME_SUCCESS,如果消费失败可以返回RECONSUME_LATER,这样可以先跳过这一条消息的消费,Broker会过一段时间再投递这一条消息。Consumer也是需要通过start()进行启动。这样消费者就可以开始进行消息消费了,默认只有它启动之后发送的消息才能收到。

@Test
public void testConsumer() throws Exception {
  //创建Consumer并指定消费者组。
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
  //指定需要连接的Name Server
  consumer.setNamesrvAddr(nameServer);
  //订阅topic1上的所有Tag。
  consumer.subscribe("topic1", "*");
  //注册一个消息监听器
  consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
      AtomicInteger counter = new AtomicInteger();
      msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
  });
  //启动消费者
  consumer.start();
  //为了确保Junit线程不立即死掉。
  TimeUnit.SECONDS.sleep(120);
}

(注:本文是基于Apache RocketMQ4.5.0所写)

0
0
分享到:
评论

相关推荐

    Kafka vs RocketMQ—— Topic数量对单机性能的影响1

    在本文中,我们将对比分析Apache Kafka与Apache RocketMQ在处理大量Topic时的性能表现。上一期测试主要关注了三款消息中间件(Kafka、RabbitMQ、RocketMQ)在简单消息发送场景下的性能,而本期则模拟了一个更为实际...

    万亿级数据洪峰下的消息引擎——Apache RocketMQ--阿里.pdf

    ### 万亿级数据洪峰下的消息引擎——Apache RocketMQ #### 阿里消息中间件的演变历史 自2007年起,阿里巴巴集团在消息中间件领域不断探索与实践,经历了从Notify到MetaQ再到Apache RocketMQ的发展历程。以下是这一...

    rocketmq-5.1.3.zip

    5. **消息模型**:RocketMQ支持两种主要的消息模型——发布/订阅(Publish/Subscribe)和点对点(Point-to-Point)。理解这两种模型的区别和应用场景是使用RocketMQ的关键。 6. **事务消息**:RocketMQ的事务消息...

    rocketmq安装包,rocketmq-all-5.1.3-bin-release

    6. **Message模型**:RocketMQ支持两种消息模型——发布/订阅模型和点对点模型。发布/订阅模型下,多Consumer可以订阅同一个Topic,消息会被广播给所有订阅者;点对点模型下,消息只会被单个Consumer消费,且消费后...

    rocketmq-externals-master.zip

    在这个"rocketmq-externals-master.zip"压缩包中,主要包含的是RocketMQ的监控端——RocketMQ-Console,因为从GitHub下载可能速度较慢,所以通过CDN提供了快速访问。 RocketMQ-Console是RocketMQ的Web管理控制台,...

    RocketMq集群搭建3——启动

    cd /opt/rocketmq-all-4.3.0-bin-release # nohup sh bin/mqnamesrv & #启动每个服务器的nameserver # tail -f nohup.out The Name Server boot success #输出此类信息,说明启动成功 启动broker 服务器Namserver1...

    RocketMQ消息队列demo

    - **消息模型**:RocketMQ支持两种消息模型——点对点(P2P)和发布/订阅(Pub/Sub)模式。P2P模式下,每个消息只有一个消费者;而在Pub/Sub模式下,一个消息可以被多个消费者消费。 2. ** RocketMQ的使用步骤 ** ...

    rocketmq-4.7.0.zip

    4. **Consumer**:消费者是接收消息的客户端,有两种消费模式——Push模式和Pull模式。Push模式下,服务器主动推送消息到消费者;Pull模式下,消费者主动从服务器拉取消息。4.7.0版本可能优化了消费者线程池和负载...

    黑马rocketmq md文档

    6. **消息模型**:RocketMQ支持两种消息模型——点对点(Point-to-Point,简称P2P)和发布/订阅(Publish/Subscribe,简称Pub/Sub)。P2P模型中,每个消息仅被一个消费者消费,适合一对一通信;Pub/Sub模型下,消息...

    RocketMQ用户指南v3.2.4

    RocketMQ用户指南v3.2.4是针对阿里巴巴开源的消息中间件项目——RocketMQ的一份详细开发指南。此文档旨在帮助开发者理解并有效地利用RocketMQ的功能特性,为分布式系统提供可靠的消息传递服务。 1. **产品发展历史*...

    RocketMQ-3.2.6_part0

    1. **发布/订阅模型**:RocketMQ支持两种消息模型——点对点(Queue)和发布/订阅(Topic)。点对点模型中,每个消息仅被一个消费者消费;而在发布/订阅模型中,消息可以被多个消费者同时消费。 2. **消息队列**:...

    rocketmq使用.zip

    总结来说,“rocketmq使用.zip”这个压缩包包含了关于RocketMQ的基本使用和核心特性——事务消息的资料,对于理解并运用RocketMQ在分布式系统中的事务处理非常有帮助。通过学习和实践,你可以构建出更健壮、高效的...

    SpringBoot3.0 + RocketMq 构建企业级数据中台完结11章

    视频课程下载——SpringBoot3.0 + RocketMq 构建企业级数据中台【完结11章】

    4. Apache RocketMQ 架构演进之道——杜恒.pdf

    技术文档分享,免费获取请私信博主。

    rocketmq-all-4.0.0-incubating-bin-release

    1. **Message Model**: RocketMQ支持两种消息模型——点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)。点对点模型中,每个消息只会被一个消费者消费,而发布/订阅模型则允许多消费者订阅同一个主题,...

    RocketMQ Summit 2022全球开发者峰会PPT(33份).zip

    在本压缩包“RocketMQ Summit 2022全球开发者峰会PPT(33份).zip”中,我们聚焦于一场重要的技术盛宴——RocketMQ Summit 2022全球开发者峰会。这场峰会聚集了世界各地的顶尖开发人员、架构师和技术爱好者,共同...

    rocketmq小示例项目及Linux下的编译安装说明

    2. 消费者(Consumer):消费者用于接收和处理生产者发送的消息,有两种消费模式——Push模式和Pull模式。Push模式下,RocketMQ服务器主动推送消息;Pull模式下,消费者主动从服务器拉取消息。 3. Topic:主题是消息...

    rocketmq可视化jar包.zip

    RocketMQ是一款开源的消息中间件,由阿里巴巴开发并贡献给Apache软件基金会,被广泛应用于大数据、实时计算、微服务等场景。RocketMQ的核心功能是提供高可靠、高可用、低延迟的消息传输服务。在这个"rocketmq可视化...

    rocketmq-server-4.7.1.rar

    这个压缩包"rocketmq-server-4.7.1.rar"包含了该版本的源代码,适用于Windows环境的部署,同时也整合了RocketMQ的控制台管理工具——rocketmq-console。 首先,让我们深入理解RocketMQ的基本概念。RocketMQ提供了一...

    apache-rocketmq.zip

    3. README.md:这是RocketMQ项目的README文件,用Markdown格式编写,包含项目简介、安装指南、使用方法、开发与贡献等信息,对于初学者来说非常重要。 4. NOTICE:此文件列出了项目中使用到的第三方组件和它们的...

Global site tag (gtag.js) - Google Analytics