`
java最爱
  • 浏览: 6261 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

网络编程与RabbitMQ消息通信

 
阅读更多
1.RabbitMQ简介,RabbitMQ是一个开源的消息中间件,负责接收Producer的消息,并将该消息放入队列中,然后Consumer从队列里取出消息并进行处理。
在这里Producer不需要关心Consumer如何处理消息,Consumer也不需要关心Producer的消息如何发送。两方实现了完全的解耦。代码实例:

  public class Producer {
    private static final String EXCHANGE_NAME="rabbit_test_exchange";
    private static final String EXCHANGE_ROOTING_KEY="rabbit_test_routingkey";

    public static void main(String[] args)throws Exception{
        //先和RabbitMQ Server建立連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("duanzx");
        factory.setPassword("duanzx");
        factory.setVirtualHost("/duanzx_host");
        Connection connection = factory.newConnection();
        //创建出连接通道
        Channel channel = connection.createChannel();
        //声明交换机名称和类型(直接根据路由键)
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //向RabbitMQ发送消息
        channel.basicPublish(EXCHANGE_NAME,EXCHANGE_ROOTING_KEY, MessageProperties.TEXT_PLAIN,"just for test".getBytes());
        //关闭和RabbitMQ Server之间的通道
        channel.close();
        //关闭连接
        connection.close();
    }
}

public class ConsumerTest {

    private static final String EXCHANGE_NAME="rabbit_test_exchange";
    private static final String EXCHANGE_ROOTING_KEY="rabbit_test_routingkey";
    private static final String QUEUE_NAME="rabbit_test_queue";

    public static void main(String[] args)throws Exception{
        //先和RabbitMQ Server建立連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("duanzx");
        factory.setPassword("duanzx");
        factory.setVirtualHost("/duanzx_host");
        Connection connection = factory.newConnection();
        //创建出连接通道
        Channel channel = connection.createChannel();
        //声明交换机名称和类型(直接根据路由键)
        channel.exchangeDeclare(EXCHANGE_NAME"direct");
        //声明队列名称
        channel.queueDeclare(QUEUE_NAMEfalsefalsefalsenull);
        //将队列绑定到交换机上
        channel.queueBind(QUEUE_NAMEEXCHANGE_NAMEEXCHANGE_ROOTING_KEY);
        //读取队列里的数据并进行处理
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                try {
                    System.out.println(new String(body,"UTF-8"));
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

2.Socket 网络通信包括同步阻塞,异步阻塞,同步非阻塞,异步非阻塞。
      其中同步指的是应用程序与系统之间的通信方式,假设应用程序与系统需要完成两个动作,如果第一个动作完成后才能执行第二个动作。则该通信方式是同步的。如果在执行第一个动作后,重新启动了另一个线程执行,而在主线程里不需要等待第一个动作完成后,就继续执行第二个动作。则该通信方式是异步的。
      阻塞指的是应用程序读取IO的操作方式。传统的IO操作都是阻塞的,对IO的操作都是在流里面进行,如果文件过大的话,执行时间会变长,并且内存占用的也会很多。
jdk1.5之后的NIO操作是非阻塞的。在操作时候会先将数据放到Buffer里,等到缓冲区保存好数据后再通知接收方处理数据。
同步非阻塞Socket通信代码实例:
public class ClientClass {
    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("localhost"8080));
            String message = "just for test";
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            byteBuffer.put(message.getBytes());
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
            socketChannel.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (socketChannel != null) {
                socketChannel.close();
            }
        }
    }
}
 
public class SocketClass implements Runnable {

    private Selector selector;
    private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    public SocketClass(int port) {
        try {
            //声明Selector
            selector = Selector.open();
            //声明ServerSocketChannel
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //声明ServerSocketChannel通信方式为非阻塞
            serverSocketChannel.configureBlocking(false);
            //ServerSocketChannel端口号码
            serverSocketChannel.bind(new InetSocketAddress(port));
            //注册到Selector
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("start server port:" + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        //一直执行该线程
        while (true) {
            try {
                //阻塞等待,直到声明READ事件的那些通道已经就绪。
                selector.select();
                //遍历所有已经注册的通道
                Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
                while (selectionKeyIterator.hasNext()) {
                    SelectionKey key = selectionKeyIterator.next();
                    selectionKeyIterator.remove();
                    if (key.isValid()) {
                        //如果是ACCEPT事件,代表ServerSocketChannel,此时应该接收请求
                        if (key.isAcceptable()) {
                            this.accept(key);
                        }
                        //如果是READ事件,代表SocketChannel,此时应该处理请求
                        if (key.isReadable()) {
                            this.read(key);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void accept(SelectionKey key) {
        //获取服务端通道
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        try {
            //接收客户端请求,并创建通道
            SocketChannel socketChannel = serverSocketChannel.accept();
            //声明SocketChannel通信方式为非阻塞
            socketChannel.configureBlocking(false);
            //注册到Selector
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void read(SelectionKey key) {
        try {
            //获取客户端连接通道
            SocketChannel socketChannel = (SocketChannel) key.channel();
            //先清空缓冲区里的数据
            byteBuffer.clear();
            //读取通道里的数据并写入到缓冲区
            int count = socketChannel.read(byteBuffer);
            //如果通道里没有数据,关闭通道并取消SelectionKey,
            if (count == -1) {
                socketChannel.close();
                key.cancel();
                return;
            }
            //将缓冲区由写模式切换到读模式
            byteBuffer.flip();
            byte[] bytes = new byte[byteBuffer.remaining()];
            //读取缓冲区里的数据并放入byte数组里
            byteBuffer.get(bytes);
            System.out.println("has receive request:"+new String(bytes));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws Exception {
        new SocketClass(8080).run();
    }
}
分享到:
评论

相关推荐

    AMQP-CPP是用于与RabbitMq消息中间件通信的c++库

    AMQP-CPP是一个开源的C++库,专为与RabbitMQ消息中间件进行通信而设计。RabbitMQ是一款广泛使用的开源消息代理,基于Advanced Message Queuing Protocol (AMQP)标准,提供高效、可靠的异步消息传递。AMQP-CPP库使得...

    RabbitMQ客户端Qt项目工程

    通过这个RabbitMQ客户端工程,开发者不仅可以学习到如何使用Qt和QAMQP库与RabbitMQ进行通信,还能了解如何设计和组织一个完整的Qt项目,这对于构建其他基于Qt的消息传递应用具有很高的参考价值。此外,由于工程已经...

    rabbitmq发送&接收消息

    你可以使用各种编程语言(如Python、Java、Ruby等)的RabbitMQ客户端库来实现。发送消息时,指定Exchange、Routing Key以及消息体。 **5. 消费者(Consumer)接收消息** 消费者是从RabbitMQ接收消息的应用。消费者...

    springboot与rabbitmq消息队列的整合

    RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)的消息中间件,它允许不同的应用程序之间通过消息传递进行通信,从而实现解耦、异步处理和容错。RabbitMQ支持多种编程语言,并提供稳定的性能和强大的监控...

    RabbitMq消息队列3.8.3

    RabbitMQ是一种广泛使用的开源消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议,允许应用程序之间进行异步通信。RabbitMQ的主要目的是通过提供可靠的消息传递机制来解耦系统组件,使得...

    JavaScript连接消息(RabbitMQ)

    JavaScript连接消息(RabbitMQ)是将JavaScript编程语言与RabbitMQ消息队列系统相结合,实现分布式系统中的异步通信和解耦。RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP(Advanced Message Queuing ...

    RabbitMQ消息列队

    1. **解耦**: 当系统各部分之间通过RabbitMQ通信,可以降低耦合度,增加系统的可扩展性。 2. **异步处理**: 对于耗时的操作,可以通过发送消息到队列,让后台服务异步处理,提高系统响应速度。 3. **负载均衡**: 多...

    kettle rabbitmq 插件开发

    RabbitMQ 是一个流行的消息队列系统,用于应用程序之间的异步通信。 描述没有提供具体细节,但我们可以假设内容可能涵盖如何结合 Kettle 和 RabbitMQ 实现数据流的发布和订阅。这通常涉及以下几个关键知识点: 1. ...

    消息队列+RabbitMQ3.12.10和Erlang安装包

    - AMQP是一种开放标准,定义了消息格式和网络协议,使得不同应用和平台能够进行消息通信。 - RabbitMQ作为AMQP服务器,支持多种客户端库,使得开发者可以轻松地在各种编程语言中集成消息队列。 4. 消息队列的使用...

    RabbitMQ与SpringMVC集成

    **RabbitMQ与SpringMVC集成** RabbitMQ是一个开源的消息代理和队列服务器,...在实际应用中,需要正确安装和配置Erlang及RabbitMQ,然后利用Spring的AMQP支持将消息队列功能融入到Web应用中,从而实现高效的消息通信。

    Erlang和RabbitMQ 消息队列

    **Erlang与RabbitMQ消息队列详解** 在分布式系统设计中,消息队列扮演着至关重要的角色,它能够实现系统间的异步通信、负载均衡以及解耦合。Erlang是一种为并发和分布式计算设计的编程语言,而RabbitMQ是一个基于...

    经典安全消息队列工具rabbitMQ rabbitmq-server-3.7.9

    RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议实现的消息队列服务,支持多种编程语言的客户端,如Java、Python、Ruby等。消息队列的主要优点包括解耦、异步处理、流量控制和扩展性。 【压缩包子...

    RabbitMQ实战 高效部署分布式消息队列 带目录 高清版 PDF

    RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议的开源消息代理和队列服务器,它允许应用程序之间进行异步通信,从而提高系统的可扩展性和可靠性。作为Java开发的项目,RabbitMQ具有跨平台的特性,...

    RabbitMq消息队列指南.docx

    消息队列MQ,全称Message Queue,是一种在应用程序之间用于通信的技术,主要功能是作为生产者与消费者之间的缓冲。在消息队列模型中,生产者不断地向队列中发布消息,而消费者则从队列中获取并处理这些消息。这种...

    RabbitMQ实战:高效部署分布式消息队列 pdf版

    - **AMQP协议**:了解Advanced Message Queuing Protocol(AMQP)的规范,它是RabbitMQ通信的基础。 - **五种交换器类型**:Direct、Fanout、Topic、Header和Routing,以及它们在不同场景下的应用。 **2. 安装与...

    利用 Spring 和 RabbitMQ 解决消息传送难题

    RabbitMQ是一个开源的消息代理,遵循AMQP协议,支持多种编程语言,包括Java。它作为消息的中间人,接收、存储和转发消息,确保了消息的可靠传输。RabbitMQ具有高可用性、可扩展性和容错性,可以处理大量的并发连接和...

    RabbitMQ实战高效部署分布式消息队列.pdf+rabbitmq学习手册.pdf

    RabbitMQ是一款开源的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,提供高可靠性的消息传递服务。在本文中,我们将深入探讨RabbitMQ的核心概念、功能特性、部署...

    尚硅谷_消息中间件RabbitMQ_课件.docx

    ### 消息中间件RabbitMQ相关知识点 #### 一、消息队列(MQ)概述 **1.1 MQ的概念** MQ(Message Queue),即消息队列,是一种用于实现进程间通信的技术。它通过在消息的生产者和消费者之间提供一个先进先出(FIFO...

    RabbitMQ 高效部署分布式消息队列.zip

    首先,RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议,这是一个标准化的、跨语言的消息传递协议,使得不同编程语言的应用程序可以无缝通信。理解AMQP的基本概念,如交换器(Exchanges)、队列(Queues...

    RabbitMQ高效部署分布式消息队列实战篇

    此外,还会介绍如何使用各种编程语言(如Java、Python、Ruby等)的客户端库与RabbitMQ进行交互。 接下来,我们将深入到RabbitMQ的核心概念——交换机(Exchanges)、队列(Queues)和绑定(Bindings)。交换机负责...

Global site tag (gtag.js) - Google Analytics