public class Producer {
private static final String EXCHANGE_NAME="rabbit_test_exchange";
private static final String EXCHANGE_ROOTING_KEY="rabbit_test_routingkey";
public static void main(String[] args)throws Exception{
//先和RabbitMQ Server建立連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("duanzx");
factory.setPassword("duanzx");
factory.setVirtualHost("/duanzx_host");
Connection connection = factory.newConnection();
//创建出连接通道
Channel channel = connection.createChannel();
//声明交换机名称和类型(直接根据路由键)
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//向RabbitMQ发送消息
channel.basicPublish(EXCHANGE_NAME,EXCHANGE_ROOTING_KEY, MessageProperties.TEXT_PLAIN,"just for test".getBytes());
//关闭和RabbitMQ Server之间的通道
channel.close();
//关闭连接
connection.close();
}
}
public class ConsumerTest {
private static final String EXCHANGE_NAME="rabbit_test_exchange";
private static final String EXCHANGE_ROOTING_KEY="rabbit_test_routingkey";
private static final String QUEUE_NAME="rabbit_test_queue";
public static void main(String[] args)throws Exception{
//先和RabbitMQ Server建立連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("duanzx");
factory.setPassword("duanzx");
factory.setVirtualHost("/duanzx_host");
Connection connection = factory.newConnection();
//创建出连接通道
Channel channel = connection.createChannel();
//声明交换机名称和类型(直接根据路由键)
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//声明队列名称
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROOTING_KEY);
//读取队列里的数据并进行处理
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
try {
System.out.println(new String(body,"UTF-8"));
} catch (Exception e) {
e.printStackTrace();
}finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = null;
try {
socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));
String message = "just for test";
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(message.getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
socketChannel.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (socketChannel != null) {
socketChannel.close();
}
}
}
private Selector selector;
private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
public SocketClass(int port) {
try {
//声明Selector
selector = Selector.open();
//声明ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//声明ServerSocketChannel通信方式为非阻塞
serverSocketChannel.configureBlocking(false);
//ServerSocketChannel端口号码
serverSocketChannel.bind(new InetSocketAddress(port));
//注册到Selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("start server port:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//一直执行该线程
while (true) {
try {
//阻塞等待,直到声明READ事件的那些通道已经就绪。
selector.select();
//遍历所有已经注册的通道
Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
while (selectionKeyIterator.hasNext()) {
SelectionKey key = selectionKeyIterator.next();
selectionKeyIterator.remove();
if (key.isValid()) {
//如果是ACCEPT事件,代表ServerSocketChannel,此时应该接收请求
if (key.isAcceptable()) {
this.accept(key);
}
//如果是READ事件,代表SocketChannel,此时应该处理请求
if (key.isReadable()) {
this.read(key);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void accept(SelectionKey key) {
//获取服务端通道
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
try {
//接收客户端请求,并创建通道
SocketChannel socketChannel = serverSocketChannel.accept();
//声明SocketChannel通信方式为非阻塞
socketChannel.configureBlocking(false);
//注册到Selector
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
private void read(SelectionKey key) {
try {
//获取客户端连接通道
SocketChannel socketChannel = (SocketChannel) key.channel();
//先清空缓冲区里的数据
byteBuffer.clear();
//读取通道里的数据并写入到缓冲区
int count = socketChannel.read(byteBuffer);
//如果通道里没有数据,关闭通道并取消SelectionKey,
if (count == -1) {
socketChannel.close();
key.cancel();
return;
}
//将缓冲区由写模式切换到读模式
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
//读取缓冲区里的数据并放入byte数组里
byteBuffer.get(bytes);
System.out.println("has receive request:"+new String(bytes));
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
new SocketClass(8080).run();
}
相关推荐
AMQP-CPP是一个开源的C++库,专为与RabbitMQ消息中间件进行通信而设计。RabbitMQ是一款广泛使用的开源消息代理,基于Advanced Message Queuing Protocol (AMQP)标准,提供高效、可靠的异步消息传递。AMQP-CPP库使得...
通过这个RabbitMQ客户端工程,开发者不仅可以学习到如何使用Qt和QAMQP库与RabbitMQ进行通信,还能了解如何设计和组织一个完整的Qt项目,这对于构建其他基于Qt的消息传递应用具有很高的参考价值。此外,由于工程已经...
你可以使用各种编程语言(如Python、Java、Ruby等)的RabbitMQ客户端库来实现。发送消息时,指定Exchange、Routing Key以及消息体。 **5. 消费者(Consumer)接收消息** 消费者是从RabbitMQ接收消息的应用。消费者...
RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)的消息中间件,它允许不同的应用程序之间通过消息传递进行通信,从而实现解耦、异步处理和容错。RabbitMQ支持多种编程语言,并提供稳定的性能和强大的监控...
RabbitMQ是一种广泛使用的开源消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议,允许应用程序之间进行异步通信。RabbitMQ的主要目的是通过提供可靠的消息传递机制来解耦系统组件,使得...
JavaScript连接消息(RabbitMQ)是将JavaScript编程语言与RabbitMQ消息队列系统相结合,实现分布式系统中的异步通信和解耦。RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP(Advanced Message Queuing ...
1. **解耦**: 当系统各部分之间通过RabbitMQ通信,可以降低耦合度,增加系统的可扩展性。 2. **异步处理**: 对于耗时的操作,可以通过发送消息到队列,让后台服务异步处理,提高系统响应速度。 3. **负载均衡**: 多...
RabbitMQ 是一个流行的消息队列系统,用于应用程序之间的异步通信。 描述没有提供具体细节,但我们可以假设内容可能涵盖如何结合 Kettle 和 RabbitMQ 实现数据流的发布和订阅。这通常涉及以下几个关键知识点: 1. ...
- AMQP是一种开放标准,定义了消息格式和网络协议,使得不同应用和平台能够进行消息通信。 - RabbitMQ作为AMQP服务器,支持多种客户端库,使得开发者可以轻松地在各种编程语言中集成消息队列。 4. 消息队列的使用...
**RabbitMQ与SpringMVC集成** RabbitMQ是一个开源的消息代理和队列服务器,...在实际应用中,需要正确安装和配置Erlang及RabbitMQ,然后利用Spring的AMQP支持将消息队列功能融入到Web应用中,从而实现高效的消息通信。
**Erlang与RabbitMQ消息队列详解** 在分布式系统设计中,消息队列扮演着至关重要的角色,它能够实现系统间的异步通信、负载均衡以及解耦合。Erlang是一种为并发和分布式计算设计的编程语言,而RabbitMQ是一个基于...
RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议实现的消息队列服务,支持多种编程语言的客户端,如Java、Python、Ruby等。消息队列的主要优点包括解耦、异步处理、流量控制和扩展性。 【压缩包子...
RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议的开源消息代理和队列服务器,它允许应用程序之间进行异步通信,从而提高系统的可扩展性和可靠性。作为Java开发的项目,RabbitMQ具有跨平台的特性,...
消息队列MQ,全称Message Queue,是一种在应用程序之间用于通信的技术,主要功能是作为生产者与消费者之间的缓冲。在消息队列模型中,生产者不断地向队列中发布消息,而消费者则从队列中获取并处理这些消息。这种...
- **AMQP协议**:了解Advanced Message Queuing Protocol(AMQP)的规范,它是RabbitMQ通信的基础。 - **五种交换器类型**:Direct、Fanout、Topic、Header和Routing,以及它们在不同场景下的应用。 **2. 安装与...
RabbitMQ是一个开源的消息代理,遵循AMQP协议,支持多种编程语言,包括Java。它作为消息的中间人,接收、存储和转发消息,确保了消息的可靠传输。RabbitMQ具有高可用性、可扩展性和容错性,可以处理大量的并发连接和...
RabbitMQ是一款开源的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,提供高可靠性的消息传递服务。在本文中,我们将深入探讨RabbitMQ的核心概念、功能特性、部署...
### 消息中间件RabbitMQ相关知识点 #### 一、消息队列(MQ)概述 **1.1 MQ的概念** MQ(Message Queue),即消息队列,是一种用于实现进程间通信的技术。它通过在消息的生产者和消费者之间提供一个先进先出(FIFO...
首先,RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议,这是一个标准化的、跨语言的消息传递协议,使得不同编程语言的应用程序可以无缝通信。理解AMQP的基本概念,如交换器(Exchanges)、队列(Queues...
此外,还会介绍如何使用各种编程语言(如Java、Python、Ruby等)的客户端库与RabbitMQ进行交互。 接下来,我们将深入到RabbitMQ的核心概念——交换机(Exchanges)、队列(Queues)和绑定(Bindings)。交换机负责...