`

rocketmq开发手册

 
阅读更多
chaojianc添加,由Jonson Xia最后更新于九月 01, 2014  (查看更改)
转至元数据起始 

  

绑定host

使用前必须将jmenv.taobao.net域名绑定到提供nameserver地址的静态服务器地址 
例如,如果本地部署了提供nameserver地址的静态服务,可以这么配置
127.0.0.1 jmenv.taobao.net

目前没有测试机器,,拿我的机器做测试机器,,可以这么绑定
192.168.66.172 jmenv.taobao.net

 

为什么要绑定呢?参考 Rocektmq 部署章节

定义生产者


 

每一类生产者必须定义一个spring bean,,,而且该bean的producerGroup属性必须唯一(不能跟其他类的生产者重复),否则在使用分布式事务的时候会出现问题。

根据官方规范:producerGroup: 一般发送同样消息的Producer,归为同一个Group,应用必须设置,并保证命名唯一

 

producer配置
<bean id="smsproducer" class="com.alibaba.rocketmq.client.producer.DefaultMQProducer" init-method="start" lazy-init="true">
     <property name="producerGroup" value="smsproductgroup" />
</bean>

注意事项

init-method必须定义为start ,否则producer不会启动

 使用生产者

 


调用producer
<bean id="sendmessage" class="com.lifeix.apollo.user.service.impl.SendMessageDemo">
        <property name="producer" ref="smsproducer" />
</bean>

 

 

 

发消息
public class SendMessageDemo {
    private DefaultMQProducer producer;
    public DefaultMQProducer getProducer() {
    return producer;
    }
    public void setProducer(DefaultMQProducer producer) {
    this.producer = producer;
    }
    public void sendMessage() {
    Message msg = new Message("TopicTest1",// topic
            "TagA",// tag
            "OrderID001",// key
            ("Hello MetaQ").getBytes());// body
    SendResult sendResult;
    try {
        sendResult = producer.send(msg);
        System.out.println(sendResult);
        } catch (Exception e) {
        e.printStackTrace();
       }
    }
}

序列化

rocketmq 存储的是二进制数据,序列化,反序列化由使用者自己定义。 我们统一使用阿里推荐的fastjson 序列化对象,,。发送消息的时候把json string通过utf-8转成byte放入message中。 消费的时候,将byte数组转对象

https://github.com/alibaba/fastjson

 

 

pushconsumer 广播模式用法

 

consumer使用之前必须先调用subscribe 订阅topic ,还需要定义消费消息的Listener,,,没有办法全部在spring里面管理,,,

所以,只能显示的new DefaultMQPushConsumer对象,进行消费的操作

使用注意事项

以下代码必须在bean init方法里面使用,,,而且该bean spring配置必须配置成 init-method="init"

 

 

sample
public class ConsumeMessageClusteringDemo {
    protected void init() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
    /**
     * 订阅指定topic下tags分别等于TagA或TagC或TagD
     */
    try {
        consumer.setMessageModel(MessageModel.BROADCASTING); //设置 广播消费模式
        consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
        /**
         * 订阅指定topic下所有消息<br>
         * 注意:一个consumer对象可以订阅多个topic
         */
        consumer.subscribe("TopicTest2", "*");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
        /**
         * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            MessageExt msg = msgs.get(0);
            if (msg.getTopic().equals("TopicTest1")) {
            // 执行TopicTest1的消费逻辑
            if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                // 执行TagA的消费
                String msgbody = new String(msg.getBody());
                System.out.println(msgbody);
            } else if (msg.getTags() != null && msg.getTags().equals("TagC")) {
                // 执行TagC的消费
                String msgbody = new String(msg.getBody());
                System.out.println(msgbody);
            } else if (msg.getTags() != null && msg.getTags().equals("TagD")) {
                // 执行TagD的消费
                String msgbody = new String(msg.getBody());
                System.out.println(msgbody);
            }
            } else if (msg.getTopic().equals("TopicTest2")) {
            // 执行TopicTest2的消费逻辑
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        });
        /**
         * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         */
        consumer.start();
     } catch (MQClientException e) {
        e.printStackTrace();
     }
    }
}

 

pushconsumer 集群模式用法

pushconsumer 集群消费模式,,,跟广播很相似,,只是setMessageModel 方法传参做点修改即可

  

consumer.setMessageModel(MessageModel.CLUSTERING); //设置 广播消费模式

 

demo代码参考

注:参考 apollo 工程目录结构  ssh://git@pangtong.l99.com:19022/framework/lifeix-demo.git

apollo.user.impl 模块

分享到:
评论

相关推荐

    RocketMQ 开发手册 3.2.4

    **RocketMQ 开发手册 3.2.4 知识点详解** Apache RocketMQ 是一个高性能、分布式的消息中间件,最初由阿里巴巴开源,并且现在已经成为Apache顶级项目。它主要用于实现应用解耦、消息队列、异步处理以及构建事件驱动...

    (最新) 阿里云消息 RocketMQ开发手册

    RocketMQ作为开源项目,社区活跃,用户可根据自己的需求进行定制开发,并且能够从社区获得支持。它的源码托管在GitHub上,为社区贡献者提供了共同参与和改进产品的平台。 #### 官网与社区链接 - 官网地址:*** * ...

    RocketMQ 开发手册3.2.4

    RocketMQ开发手册3.2.4版本详细介绍了该产品的架构、特性、使用方法和最佳实践。本文将结合手册内容,对RocketMQ的核心知识点进行详尽阐述。 首先,RocketMQ采用了发布/订阅(Publish/Subscribe)模式,这是消息...

    RocketMQ 开发手册3.2.4.pdf

    RocketMQ是一个高性能、高可用的消息中间件,它在分布式系统中扮演着至关重要的角色,提供可靠的消息传递和消息队列服务。以下从文档中提取的知识点涵盖了RocketMQ的核心特性和解决的消息中间件问题。 1. RocketMQ...

    RocketMQ 开发手册3.2.4-1

    本文档旨在描述 RocketMQ 的多个关键特性的实现原理,幵对消息中间件遇到的各种问题迕行总结,阐述 RocketMQ 如何解决返些问题。文中主要引用了 JMS 规范不 CORBA Notification 规范,规范为我们设计系统挃明了 方吐...

    RocketMQ开发指南

    ### RocketMQ开发指南知识点概述 #### 一、RocketMQ简介及发展历程 - **RocketMQ**是由阿里巴巴自主研发的消息中间件,经过多年的迭代和发展,已经成为业界广泛使用的高性能消息队列系统之一。 - **版本**: 本指南...

    RocketMQ 手册3.2.4.pdf

    《RocketMQ开发手册3.2.4》是2015年发布的一份详细指南,主要面向希望深入了解和使用RocketMQ的开发者。RocketMQ是由阿里巴巴开源的分布式消息中间件,它在大规模分布式系统中扮演着重要的角色,提供高可用、高可靠...

    rocketmq4.5.0安装部署(安装包,开发手册,配置文件).zip

    在这个"rocketmq4.5.0安装部署(安装包,开发手册,配置文件).zip"压缩包中,包含了进行RocketMQ 4.5.0版本安装和部署所需的所有关键资源。 首先,我们来看"RocketMQ 开发手册3.2.4.pdf",这是一份详细的开发者指南...

    RocketMQ相关文档.zip

    杨开元(详细书签).pdf》、《RocketMQ架构原理剖析.pptx》和《RocketMQ开发手册3.2.4.pdf》,它们涵盖了从实战技巧到深度理论的全面知识。 首先,《RocketMQ实战与原理解析.杨开元(详细书签).pdf》由杨开元撰写,这...

    rocketMQ-开发手册-3.2.4中文版最新.zip

    3.2.4版本的开发手册提供了全面的指南,帮助开发者深入理解和使用RocketMQ。以下是对RocketMQ核心知识点的详细阐述: 一、RocketMQ概述 RocketMQ源于阿里巴巴内部的消息系统,后逐渐发展成为一款开源产品。它具有高...

    RocketMQ 手册.pdf

    - **定义**:RocketMQ是一款高性能、可扩展的分布式消息中间件,它由阿里巴巴集团开发并开源,广泛应用于微服务架构中进行消息传递。 - **特点**: - **严格的顺序保证**:支持按需保障消息顺序,确保消息处理的...

    rocketmq使用手册

    ### RocketMQ 使用手册 #### 一、RocketMQ 简介及特性 RocketMQ 是一款高性能、可扩展的分布式消息中间件,它被设计用于解决大规模互联网应用中的消息传输问题。该中间件具备以下特点: 1. **严格的消息顺序保证*...

    RocketMQ文档

    本文档集合包含了RocketMQ的核心参数配置、使用方法以及详细的手册,适用于开发人员、运维人员以及对消息队列感兴趣的学者进行深入学习。 首先,我们来看看《rocketmq参数配置.pdf》。这份文档详细介绍了RocketMQ的...

    rocketmq-externals-release-rocketmq-console-1.0.0

    总的来说,"rocketmq-externals-release-rocketmq-console-1.0.0"压缩包提供了RocketMQ集群管理的重要工具,是运维和开发人员管理RocketMQ系统不可或缺的一部分。通过理解和使用这个控制台,可以更有效地监控和优化...

    RocketMQ相关资料

    《RocketMQ_userguide》是RocketMQ的用户手册,它详细介绍了如何安装、配置和使用RocketMQ。在这个文档中,你可以了解到RocketMQ的基本架构,包括NameServer、Producer、Consumer以及Broker等核心组件。NameServer是...

    资料-全面解剖RocketMQ和项目实战.zip

    总的来说,这个压缩包是一个全面的学习资源,适合那些想要理解和掌握RocketMQ的开发人员,无论是初学者还是有经验的开发者,都能从中受益。通过阅读资料、实践代码示例和查阅文档,学习者可以系统地学习RocketMQ,并...

    rocketmq-all-4.2.0-bin-release.zip

    4. **docs** 目录:文档资料,包括用户手册、开发者指南和API参考,对于理解RocketMQ的工作原理和开发应用非常有帮助。 5. **src** 目录:源代码,虽然通常在部署时我们不需要修改源码,但对于深入理解 RocketMQ ...

    rocketmq4.1.0.tar.gz和控制台jar

    RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大数据、实时计算、微服务等领域。...无论是开发分布式应用、实施数据同步,还是进行复杂的消息处理,RocketMQ都是一个值得信赖的选择。

    阿里巴巴Java开发手册v1.2.0-1_java_kidsbs2_

    《阿里巴巴Java开发手册》是阿里巴巴集团为提升Java开发效率与代码质量而编撰的一份重要指导文档,尤其在“码出高效,码出质量”的愿景下,它为开发者提供了全面的编程规范和最佳实践。这份手册针对不同级别的开发者...

Global site tag (gtag.js) - Google Analytics