`

RabbitMQ初体验

 
阅读更多

一直听说MQ这个东西,也一直没有去尝试下,刚好部门开会又提到了这个东西,这几天闲点,抽个空体验下。

 

分享下自己的学习经验吧:

1.纠结的搭建过程

2.具体组件以及相关组件说明

3.应用场景和代码实现

 

1.纠结的搭建过程

   说起搭建过程,真是过程曲折啊,暂时还没有搞好,所以就先使用测试环境的MQ了。

   a)RabbitMQ依赖erlang的包,erlang的官网包很难下,基本上点击进去就访问不了,最后终于知乎上看到一个可用连接了,分享下:http://erlang-users.jp/ 日本的一个镜像,虽然是日语翻译一下将就看吧,基本可以下载到erlang的源码包。

   b)便宜erlang又报了个"No curses library functions found" 我用一下yum命令发现虚拟机连不上公网,改了网络模式也没有生效,本地先不搭了吧,先测试环境将就着用吧。

yum list|grep ncurses
yum -y install ncurses-devel
yum install ncurses-devel

 

2.具体组件以及相关组件说明

   

 

    a) exchange

       exchange有以下几种模式:

       default 默认的exchange,所有的队列都会注册该exchange,exchangeName为空,routingKey为队列名称。 

       direct模式,指定exchangeName和routingKey即可绑定指定的队列。

       fanout模式,广播模式,忽略routingKey属性,向所有绑定该exchange的队列发送消息。

       topic模式,感觉和direct类似,只是支持多维度的routingKey和* #匹配。

       header模式,根据消息头的部分属性来绑定消息队列。


      b) queue

      Queue有以下几个属性:

        Durable(消息代理重启后,队列依旧存在)

        Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)

        Auto-delete(当最后一个消费者退订后即被删除)

        Arguments(一些消息代理用他来完成类似与TTL的某些额外功能)


 
     c) Consumer

         消息确认模式:消费者任务做完,或者收到消息发送ack。

         拒绝消息:消费者发现消息过期或者消息处理失败,同时告诉MQ这条消息该如何处理,重新放入队列还是丢弃。

         预取消息:一个消费者可以一次从队列中获取多条消息。


 

    d) Channel 

      一个连接多个通道,每个通道有自己的chanelID,主要用于连接复用。类似NIO中的channel。

     

 

 

 

 

3.应用场景和代码实现

   应用场景主要参考官网的入门文档:https://www.rabbitmq.com/getstarted.html

   看完应用场景觉得这个文档特别贴心,专门为小白用户写的,由浅入深,逐步深入,点赞。

   下面说说我对这几个场景的理解吧,也便于自己梳理,新手的话还是建议参考官网文档。

 

   a) Hello World场景:

       这是最简单的场景,生产者丢消息到队列中,消费中从队列中取消息。

    

 

    Send.java

 public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setPort(QUEUE_PORT);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        // 发布消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }

 
    Consumer.java

    

 public static  void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setPort(QUEUE_PORT);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        // 测试发现这是个阻塞任务,处理完消息就会等待队列的下一条消息
        channel.basicConsume(QUEUE_NAME, true, consumer);

    }

 
    b) Work Queue模式

      队列主要存放比较耗费资源(时间,内存,CPU)的任务,生产者提交任务到队列,多个消费者一起来完成任务。

 

     Send.java

 public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setPort(QUEUE_PORT);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 第二个参数标记是否持久化队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = getMessage(args);

        channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }

    /**
     * 生产消息
     * @param strings
     * @return
     */
    public static String getMessage(String[] strings){
        /**
         * 随机数用于测试同一条消息只被一个消费者处理还是多个消费者同时处理
         * 测试证明,如果是这种策略的话只会被一个消费者接受和处理
         */
        if (strings.length < 1)
            return "Hello World!..." + (int)(Math.random()*100);
        return joinStrings(strings, " ");
    }

    public static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
    

 

    Consumer.java

    

 public static  void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setPort(QUEUE_PORT);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                System.out.println(" [x] Received '" + message + "'");
                try {
                    try {
                        doWork(message);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } finally {
                    System.out.println(" [x] Done");
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

    private static void doWork(String task) throws InterruptedException {
        for (char ch: task.toCharArray()) {
            // 模拟耗时任务
            if (ch == '.') Thread.sleep(1000);
        }
    }
 

 

   c) 发布订阅模式

    终于进入了Rabbit MQ的核心组件Exchange,官方给出的例子是日志系统,生成的日志写入两个队列,一个队列的消费者用于打印在控制台,另一个队列的消费者写入日志文件。这个主要使用的是fanout模式,意思是该exchange的消息会同步分发给所有绑定它的队列。和上面两种模式很明显的区别是:上面两种模式一条消息只能被一个消费者接收,而这里一条消息给多个队列从而给多个消费者使用。


 

     Send.java

      

  public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setPort(QUEUE_PORT);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String message = getMessage(args);
        // 发布消息
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    

 

   Consumer.java

 

public static  void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setPort(QUEUE_PORT);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 生成名字随机的队列,绑定exchange来接受信息
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
    

 

    d) Routing模式

    这个说一个明显的需求就能看出和上面的区别。一个消费者需要将error级别的日志打印到文件,error和别的级别都打印到控制台。这里使用的是direct模式,只需要指定哪个exchange,使用哪个routingkey即可根据该规则过滤到指定的队列。

    

    Send.java

    

  public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setPort(QUEUE_PORT);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(args);
        String message = Send.getMessage(args);

        // 主要关注这里的前两个参数一个是exchangeName 一个是routingKey
        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();

    }

    private static String getSeverity(String[] strings) {
        if ((int)(Math.random()*10) > 5 )
            return "info";
        return "error";
    }
   

 

   Consumer.java

   

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setPort(QUEUE_PORT);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 生成系统指定名称的队列,方便回收
        String queueName = channel.queueDeclare().getQueue();

        /**
         * 这里启动多个Receive进行测试
         * 一个是args = new String[1];args[0] = "error";
         * 一个是args = new String[2];args[0] = "error";args[1] = "info";
         */
        args = new String[1];
        args[0] = "error";
        for(String severity : args){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
 

 

   e) Topics模式

   其实只要看需求就能理解Topics模式,跟上面的需求对比下,如果我需要过滤定时任务的错误日志,我不想要系统内核的错误日志,怎么办呢?上面的direct只能解决一维的问题,但是topics可以解决多维的问题。同时这里的这两句需要关注下。”* (star) can substitute for exactly one word.“ ”# (hash) can substitute for zero or more words.“ 这里就不贴代码,跟上面的类似,只需要注意两个参数routingKey 和 the exchange type。

 


 

 

    f) RPC模式

     主要是RPC调用请求发送到exchange,exchange绑定到相应的请求队列,这里的消费者即服务端,服务端处理完成后,同时将结果封装到Response队列中,客户端通过判断是否是自己的correlationId,如果是结束调用,或者下一次调用。


 
   Client.java

   

  public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setPort(QUEUE_PORT);
        connection = factory.newConnection();
        channel = connection.createChannel();

        // 请求相应的消息队列
        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        String corrId = UUID.randomUUID().toString();

        // 封装请求的属性
        BasicProperties props = new BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // 判断和自己的请求ID是否相同
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody(),"UTF-8");
                break;
            }
        }
        return response;
    }

    public void close() throws Exception {
        connection.close();
    }

    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient();

            System.out.println(" [x] Requesting fib(30)");
            response = fibonacciRpc.call("30");
            System.out.println(" [.] Got '" + response + "'");
        }
        catch  (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (fibonacciRpc!= null) {
                try {
                    fibonacciRpc.close();
                }
                catch (Exception ignore) {}
            }
        }
    }

   

   RPCServer.java

   

 private static int fib(int n) {
        if (n ==0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
    }

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setPort(QUEUE_PORT);
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(RPC_REQUEST_QUEUE_NAME, false, false, false, null);

            // 设置每个消费这最多处理一个任务
            channel.basicQos(1);

            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(RPC_REQUEST_QUEUE_NAME, false, consumer);
            System.out.println(" [x] Awaiting RPC requests");

            while (true) {
                String response = null;
                // Main application-side API: wait for the next message delivery and return it.
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();

                BasicProperties requestProps = delivery.getProperties();
                // 相应只要指定请求ID即可
                BasicProperties replyProps = new BasicProperties
                        .Builder()
                        .correlationId(requestProps.getCorrelationId())
                        .build();
                try {
                    String message = new String(delivery.getBody(),"UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response = "" + fib(n);
                }
                catch (Exception e){
                    System.out.println(" [.] " + e.toString());
                    response = "";
                }
                finally {
                    // 写入response队列
                    channel.basicPublish( "", requestProps.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            }
        }
        catch  (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                }
                catch (Exception ignore) {}
            }
        }
    }

 
 

 

 

 

  • 大小: 9.9 KB
  • 大小: 12.3 KB
  • 大小: 10.1 KB
  • 大小: 22.8 KB
  • 大小: 23.9 KB
  • 大小: 35.9 KB
  • 大小: 36.2 KB
  • 大小: 5.1 KB
  • 大小: 7.4 KB
  • 大小: 8.8 KB
  • 大小: 4.5 KB
分享到:
评论

相关推荐

    头歌springboot初体验

    在"头歌springboot初体验"中,我们可以探讨以下几个关键知识点: 1. **起步依赖(Starter Dependencies)**:Spring Boot通过起步依赖管理项目中的依赖。例如,如果你想使用Spring MVC进行Web开发,只需添加`spring...

    Web请求异步处理和海量数据即时分析在淘宝开放平台的实践(岑文初)

    这篇文章将深入探讨这两个关键领域,并基于岑文初的实践经验进行解析。 首先,Web请求异步处理是提升系统性能和可扩展性的重要手段。在传统的Web服务中,每个请求都是同步的,即服务器接收到请求后立即处理并返回...

    128元尚硅谷Java视频教程_Spring Boot视频教程(下)整合篇

    4、尚硅谷-SpringBoot高级-缓存-@Cacheable初体验 5、尚硅谷-SpringBoot高级-缓存-缓存工作原理&@Cacheable运行流程 6、尚硅谷-SpringBoot高级-缓存-@Cacheable其他属性 7、尚硅谷-SpringBoot高级-缓存-@CachePut 8...

    基于嵌入式Linux的点菜系统的设计与实现.pdf

    2. **硬件平台选择**:设计之初需选择适合的嵌入式硬件,如ARM架构的处理器,因其高性能、低功耗特性,常被用于嵌入式设备。硬件应具备足够的计算能力,同时考虑体积、接口、电源管理等因素,以适应餐厅环境。 3. *...

    Baishop是一款B2C电子商务网站,可以生成通用的电子商务构建平台,您可以非常方便的开一个网上商店.zip

    4. **分布式系统**:在高并发场景下,Baishop可能采用了分布式缓存如Redis、分布式消息队列如RabbitMQ,以及负载均衡技术,以保证系统的稳定性和可扩展性。 5. **安全性**:Java的Secure Socket Layer (SSL) 和...

    大型网站技术架构:核心原理与案例分析-李智慧.pdf

    总之,《大型网站技术架构:核心原理与案例分析》是一本涵盖了现代互联网架构多个层面的深度指南,无论你是初入互联网行业的开发者,还是寻求技术进阶的技术负责人,都能从中受益匪浅,理解并掌握构建大型网站所需的...

    亿级流量网站架构核心技术 跟开涛学搭建高可用高并发系统

    总之,《亿级流量网站架构核心技术 跟开涛学搭建高可用高并发系统》是一本对互联网从业者极具价值的参考书,无论你是初入架构设计的工程师还是希望进一步提升架构能力的技术领导者,都能从中受益匪浅。通过阅读本书...

    [扫描版] 大型网站技术架构:核心原理与案例分析

    《大型网站技术架构:核心原理与案例分析》这本书深入...总之,《大型网站技术架构:核心原理与案例分析》是了解和掌握大型网站背后技术的宝贵资源,无论是对于初入行业的新人还是经验丰富的从业者,都能从中受益匪浅。

    系统架构演进探析

    - **消息队列**:通过引入中间件如RabbitMQ、Kafka等来实现异步通信,降低了服务之间的直接依赖。 - **API网关**:作为外部系统的统一入口,可以隐藏服务的具体细节,提供一致性的API接口。 #### 结语 软件架构的...

    云原生开发框架及免费云原生框架获取,spring cloud 云原生 低代码

    自2020年初1.0.0版本发布以来,KOCA保持着稳定的版本更新节奏,截至2021年5月,已应用于75个产品/项目,覆盖证券、资产管理等多个领域,并有多个重要客户采用,如安信证券、光大证券等。KOCA通过其微服务开发能力、...

    新款小额现金贷网络贷款系统源码可打包成APP,支持三级分销、赚取佣金等内附安装说明.txt

    - **消息队列**:RabbitMQ或Kafka等中间件能够实现异步通信,提高系统的并发处理能力。 2. **前端技术栈**: - **HTML/CSS/JavaScript**:作为前端开发的基础技术。 - **框架与库**:React.js、Vue.js等现代前端...

    springboot物业管理系统

    - **Bootstrap**或**Element UI**:提供响应式布局和丰富的UI组件,提升用户体验。 5. **开发流程** - **需求分析**:明确物业管理系统的需求,包括业主管理、房屋管理、费用收取、投诉建议等模块。 - **设计**...

    CarCreatorMicroservice:我的博客的源代码-Car source code

    - 利用RabbitMQ或Kafka作为消息中间件,实现异步处理和解耦。 - 使用Feign或Zuul进行服务间的调用,实现API的路由和负载均衡。 5. 持续集成与部署: - 项目可能结合Jenkins或GitLab CI/CD实现自动化构建、测试和...

    Java-Billing-System

    14. **消息队列**:使用RabbitMQ或Kafka等消息队列,可以实现异步处理和解耦,提高系统的可扩展性。 以上只是Java计费系统可能涉及到的一部分技术栈和知识点,实际项目中还可能根据需求和团队选择采用其他技术和...

    客户订单系统

    4. **异步处理**:对于非实时性要求高的任务,如发送邮件、生成报表,可以采用消息队列(如RabbitMQ)进行异步处理,提高系统吞吐量。 六、安全防护 1. **数据加密**:对敏感信息如密码进行加密存储,确保用户信息...

Global site tag (gtag.js) - Google Analytics