`
234390216
  • 浏览: 10233563 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
A5ee55b9-a463-3d09-9c78-0c0cf33198cd
Oracle基础
浏览量:462665
Ad26f909-6440-35a9-b4e9-9aea825bd38e
springMVC介绍
浏览量:1775564
Ce363057-ae4d-3ee1-bb46-e7b51a722a4b
Mybatis简介
浏览量:1398395
Bdeb91ad-cf8a-3fe9-942a-3710073b4000
Spring整合JMS
浏览量:395029
5cbbde67-7cd5-313c-95c2-4185389601e7
Ehcache简介
浏览量:680000
Cc1c0708-ccc2-3d20-ba47-d40e04440682
Cas简介
浏览量:530910
51592fc3-854c-34f4-9eff-cb82d993ab3a
Spring Securi...
浏览量:1184053
23e1c30e-ef8c-3702-aa3c-e83277ffca91
Spring基础知识
浏览量:467988
4af1c81c-eb9d-365f-b759-07685a32156e
Spring Aop介绍
浏览量:151405
2f926891-9e7a-3ce2-a074-3acb2aaf2584
JAXB简介
浏览量:68170
社区版块
存档分类
最新评论

RocketMQ(02)——发送消息的三种方式

阅读更多

发送消息的三种方式

 

同步发送

Producer在进行消息发送时可以是阻塞的,也可以是非阻塞的。具体对应到发送方式一共有三种,分别是同步、异步和单向的(ONEWAY)。之前介绍的调用send()返回SendResult的方法是阻塞的,它一定要等到Broker进行了响应后才会返回,才能继续往下执行。对于下面的代码就是只有第一条消息发送完了后,才能发送第二条消息,接着是第三条。这种阻塞发送的方式也叫同步发送,它的整个响应时间还包括可能的重试时间。其内部会默认进行两次重试,可以通过setRetryTimesWhenSendFailed()指定同步发送时内部最大的重试次数。

@Test
public void testSyncSend() throws Exception {
  //指定Producer的Group为group1
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  //指定需要连接的Name Server
  producer.setNamesrvAddr(nameServer);
  //发送消息前必须调用start(),其内部会进行一些初始化工作。
  producer.start();
  for (int i = 0; i < 10; i++) {
    //指定消息发送的Topic是topic1。
    Message message = new Message("topic1", ("hello" + i).getBytes());
    //同步发送,发送成功后才会返回
    SendResult sendResult = producer.send(message);
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
      System.out.println("消息发送成功:" + sendResult);
    } else {
      System.out.println("消息发送失败:" + sendResult);
    }
  }
  //使用完毕后需要把Producer关闭,以释放相应的资源
  producer.shutdown();
}

 

异步发送

同步发送时调用send()的线程会阻塞,而异步发送时当前线程是不会阻塞的。发送结果将由一个回调函数进行回调。下面的代码就是异步发送消息的示例,它与同步发送消息的区别是它在发送消息时多传递了一个SendCallback对象,该方法一调用立马返回,而不需要等待Broker的响应返回。消息发送成功或失败后将回调SendCallback对象的对应方法。所以对于下面示例而言,第一条消息Broker还没有确认发送成功时,第二条消息就发送了,第三条消息也是一样。它们真正在Broker发送成功的顺序其实是不确定的。

@Test
public void sendAsync() throws Exception {
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  producer.setNamesrvAddr(nameServer);
  producer.start();
  CountDownLatch latch = new CountDownLatch(10);
  for (int i = 0; i < 10; i++) {
    Message message = new Message("topic1", ("send by async, no." + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    producer.send(message, new SendCallback() {
      @Override
      public void onSuccess(SendResult sendResult) {
        System.out.println("发送成功:" + message);
        latch.countDown();
      }

      @Override
      public void onException(Throwable throwable) {
        System.out.println("发送失败");
        latch.countDown();
      }
    });
  }
  latch.await();
  producer.shutdown();
}

通过异步方式发送消息如果失败了,其内部也是会进行重试的,其最大重试次数是通过setRetryTimesWhenSendAsyncFailed()指定的,默认也是2次。

 

ONEWAY

除了同步发送和异步发送外,还有一种发送方式叫ONEWAY,它的发送是单向的,即它不需要等待Broker的响应,只管发送即可,而不论发送成功与失败。通常应用于一些消息不是那么重要,可丢失的场景。它的发送是通过调用sendOneway()发送的。

@Test
public void sendOneway() throws Exception {
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  producer.setNamesrvAddr(nameServer);
  producer.start();
  for (int i=0; i<10; i++) {
    Message message = new Message("topic1", "tag2", ("message send with oneway, no."+i).getBytes());
    producer.sendOneway(message);
  }
  producer.shutdown();
}

(注:本文是基于Apache RocketMQ4.5.0所写)

0
1
分享到:
评论

相关推荐

    Kafka vs RocketMQ—— Topic数量对单机性能的影响1

    上一期测试主要关注了三款消息中间件(Kafka、RabbitMQ、RocketMQ)在简单消息发送场景下的性能,而本期则模拟了一个更为实际的业务场景:消息的发送和订阅并存,并支持多个订阅者订阅特定消息。 首先,我们要理解...

    RocketMQ消息队列demo

    - **消息模型**:RocketMQ支持两种消息模型——点对点(P2P)和发布/订阅(Pub/Sub)模式。P2P模式下,每个消息只有一个消费者;而在Pub/Sub模式下,一个消息可以被多个消费者消费。 2. ** RocketMQ的使用步骤 ** ...

    rocketmq使用.zip

    总结来说,“rocketmq使用.zip”这个压缩包包含了关于RocketMQ的基本使用和核心特性——事务消息的资料,对于理解并运用RocketMQ在分布式系统中的事务处理非常有帮助。通过学习和实践,你可以构建出更健壮、高效的...

    rocketmq安装包,rocketmq-all-5.1.3-bin-release

    6. **Message模型**:RocketMQ支持两种消息模型——发布/订阅模型和点对点模型。发布/订阅模型下,多Consumer可以订阅同一个Topic,消息会被广播给所有订阅者;点对点模型下,消息只会被单个Consumer消费,且消费后...

    黑马rocketmq md文档

    6. **消息模型**:RocketMQ支持两种消息模型——点对点(Point-to-Point,简称P2P)和发布/订阅(Publish/Subscribe,简称Pub/Sub)。P2P模型中,每个消息仅被一个消费者消费,适合一对一通信;Pub/Sub模型下,消息...

    rocketmq-5.1.3.zip

    5. **消息模型**:RocketMQ支持两种主要的消息模型——发布/订阅(Publish/Subscribe)和点对点(Point-to-Point)。理解这两种模型的区别和应用场景是使用RocketMQ的关键。 6. **事务消息**:RocketMQ的事务消息...

    rocketmq-4.7.0.zip

    4. **Consumer**:消费者是接收消息的客户端,有两种消费模式——Push模式和Pull模式。Push模式下,服务器主动推送消息到消费者;Pull模式下,消费者主动从服务器拉取消息。4.7.0版本可能优化了消费者线程池和负载...

    rocketmq-externals-master.zip

    1. **集群监控**:通过RocketMQ-Console,管理员可以查看整个RocketMQ集群的状态,包括NameServer、Broker、Topic、Producer和Consumer的运行情况,以及消息的发送、消费等统计信息。 2. **Topic管理**:用户可以在...

    RocketMQ消息丢失解决方案:同步刷盘+手动提交.docx

    这种方式虽然提高了消息处理速度,但也增加了消息丢失的风险——如果Broker服务器在消息尚未被刷盘至磁盘之前发生故障,那么这部分消息将会丢失。 - **解决方案**:为了降低消息丢失的可能性,可以通过修改Broker...

    rocketmq-all-4.0.0-incubating-bin-release

    1. **Message Model**: RocketMQ支持两种消息模型——点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)。点对点模型中,每个消息只会被一个消费者消费,而发布/订阅模型则允许多消费者订阅同一个主题,...

    RocketMQ-3.2.6_part0

    1. **发布/订阅模型**:RocketMQ支持两种消息模型——点对点(Queue)和发布/订阅(Topic)。点对点模型中,每个消息仅被一个消费者消费;而在发布/订阅模型中,消息可以被多个消费者同时消费。 2. **消息队列**:...

    LG机构RocketMQ视频源码资料课件

    4. **消息模型**:RocketMQ支持两种消息模型——点对点(Pull)和发布/订阅(Push)。点对点模型中,每个消息只有一个Consumer消费;发布/订阅模型中,一个消息可以被多个Consumer消费。 5. **高可用性**:RocketMQ...

    RocketMQ用户指南v3.2.4

    RocketMQ用户指南v3.2.4是针对阿里巴巴开源的消息中间件项目——RocketMQ的一份详细开发指南。此文档旨在帮助开发者理解并有效地利用RocketMQ的功能特性,为分布式系统提供可靠的消息传递服务。 1. **产品发展历史*...

    rocketmq-all-4.6.0-bin-release

    7. **消息模型**:RocketMQ支持两种基本的消息模型——发布/订阅(Publish/Subscribe)和点对点(Point-to-Point)。发布/订阅模型中,一个Topic可以有多个订阅者,消息发布一次即可被所有订阅者接收;点对点模型中...

    rocketmq-release-4.8.0.zip

    1. **消息模型**:RocketMQ支持两种基本的消息模型——发布/订阅(Publish/Subscribe)和点对点(Point-to-Point)。发布/订阅模型中,生产者发送消息到主题,多个消费者可以订阅同一主题并接收消息;点对点模型中,...

    RocketMQ源码分析讲解

    RocketMQ存储篇中会详细介绍消息的存储结构,包括CommitLog和ConsumeQueue两种文件类型的设计和作用。在RocketMQ中,消息的存储和管理是非常核心的部分,它涉及到了消息的顺序写入、随机读取、文件刷盘、文件的预...

    【面试资料】-(机构内训资料)java面试题_消息中间件--RocketMq(14题).zip

    RocketMQ提供了同步、异步和单向三种发送模式,消费者则有Push模式和Pull模式,Push模式由服务器推送消息,Pull模式由消费者主动拉取。 5. ** 消息的事务特性 **:RocketMQ支持分布式事务消息,实现分布式环境下的...

    消息中间件+RocketMq+入门文档+用于学习

    3. **消息消费者Consumer**:有两种消费方式——拉取式消费和推动式消费。拉取式消费由应用主动从Broker获取消息,而推动式消费由Broker主动推送消息给Consumer。 4. **主题Topic**:消息的分类,每条消息属于一个...

    rocketmq-server-4.7.1.rar

    RocketMQ提供了一个消息队列服务,它将生产者(Producer)发送的消息存储在队列中,然后由消费者(Consumer)进行消费。这种异步通信模式能够有效地解耦系统,提高系统的伸缩性和稳定性。 RocketMQ的主要组件包括:...

Global site tag (gtag.js) - Google Analytics