`

SpringBoot-第十三章 SpringBoot整合RocketMQ

 
阅读更多

RocketMQ 的安装很简单,下载,解压,执行2个启动命令即可,我们下载的是二进制版本不需要官网中的mvn install命令。注意要把9876和10119端口添加到防火墙,允许访问。具体可以参考官网的安装步骤http://rocketmq.apache.org/docs/quick-start/

RocketMQ没有正式发布控制台,需要到https://github.com/search?q=rocket-external下载rocketmq-console工程,更改配置rocketmq.config.namesrvAddr=xxx.xxx.xxx.xxx:9876运行即可。

 

<properties>
   <rocketmq.version>4.1.0-incubating</rocketmq.version>
</properties>

 

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>${rocketmq.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-common</artifactId>
   <version>${rocketmq.version}</version>
</dependency>

 

#消费者组名
apache.rocketmq.consumer.PushConsumer=orderConsumer
#生产者组名
apahce.rocketmq.producter.producerGroup=Producer
#namers路由发现服务的地址
apache.rocketmq.namesrvAddr=xxx.xxx.xxx.xxx:9876

 

@Component
public class MsgProducter {

    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String producterGroup;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvServerIp;

    private  DefaultMQProducer producer;

    public DefaultMQProducer getProducter()
    {
        return producer;
    }
    @PostConstruct
public void init()
    {
        producer = new DefaultMQProducer(producterGroup);
        //集群部署是ip:prot;ip:prot;ip:port  生产者先从路由服务中获取broker的信息,在将信息发送到broker
producer.setNamesrvAddr(namesrvServerIp);
        producer.setVipChannelEnabled(false);

        try
{
            producer.start();
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
    }
}

 

@RestController
@RequestMapping("/order")
public class OrderMQController {

    @Autowired
private MsgProducter msgProducter;

    @RequestMapping("/done2")
    public Object order2(String msg,String tag)
    {
        try {
            Message message = new Message("kevin_rocketmq_topic","tag2", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult result = msgProducter.getProducter().send(message);
            return result;
        }
        catch (Exception e)
        {
            e.printStackTrace();
            return "fail";
        }
    }
    @RequestMapping("/done3")
    public Object order3(String msg,String tag)
    {
        try {
            Message message = new Message("kevin_rocketmq_topic","tag3", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult result = msgProducter.getProducter().send(message);
            return result;
        }
        catch (Exception e)
        {
            e.printStackTrace();
            return "fail";
        }
    }
}

 

@Component
public class MsgConsumer {

    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvServerIp;

    DefaultMQPushConsumer consumer;

    @PostConstruct
public void consumer()
    {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        try {
            consumer.subscribe("kevin_rocketmq_topic","tag2");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener((MessageListenerConcurrently) (list,context)->{
            try {
                for(MessageExt messageExt:list)
                {
                    System.out.println(messageExt);
                    String messageBody = new String (messageExt.getBody(),RemotingHelper.DEFAULT_CHARSET);
                    System.out.println(messageExt.getMsgId()+" "+messageBody);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

            catch(Exception e)
            {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics