JMS and AMQP by Example
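- Use Git to clone the samples code from GitHub to your machine, then import it into your IDE:
git clone git://github.com/stephansun/samples.git
- samples-jms-plain: uses the native JMS API;
- samples-jms-spring: uses spring-jms, Spring's wrapper around the native JMS API;
- samples-jms-spring-remoting: uses spring-jms to implement the JMS request/response pattern, which relies on Spring's remoting framework;
- samples-spring-remoting: introduces Spring's remoting framework;
- samples-amqp-plain: uses the AMQP Java client provided by RabbitMQ;
- samples-amqp-spring: uses spring-amqp-rabbit, Spring's wrapper around the AMQP Java client;
- samples-amqp-spring-remoting: uses spring-amqp-rabbit to implement the AMQP request/response pattern, which relies on Spring's remoting framework;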
samples-amqp-plain
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>2.5.0</version>
        <exclusions>
            <exclusion>
                <groupId>commons-cli</groupId>
                <artifactId>commons-cli</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
Commonly used classes:
- com.rabbitmq.client.BasicProperties
- com.rabbitmq.client.Channel
- com.rabbitmq.client.Connection
- com.rabbitmq.client.ConnectionFactory
- com.rabbitmq.client.Consumer
- com.rabbitmq.client.MessageProperties
- com.rabbitmq.client.QueueingConsumer
helloworld
Send.java
package stephansun.github.samples.amqp.plain.helloworld;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException {
        // An AMQP connection is essentially a wrapper around a socket. Pay attention to the
        // AMQP protocol version: usage may differ between protocol versions.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        // Next we create a channel; most of the API work is done through it.
        Channel channel = connection.createChannel();
        // To send a message we must declare the queue that the message is destined for.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        // Then we publish a message to that queue.
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("[" + message + "]");
        // Finally, we close the channel and the connection to release resources.
        channel.close();
        connection.close();
    }
}
Recv.java
package stephansun.github.samples.amqp.plain.helloworld;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, ShutdownSignalException,
            ConsumerCancelledException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // We declare the queue here as well, because the receiver may be started before the sender.
        // We want to be sure the queue exists before we try to consume from it.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("To exit press CTRL+C");
        // The extra QueueingConsumer class buffers the messages that the server pushes to us.
        // Next we tell the server to deliver the messages stored in the queue; delivery is push-based.
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[" + message + "]");
        }
    }
}
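The remark about QueueingConsumer buffering pushed messages is easier to picture with a toy model. The sketch below is plain Java with no RabbitMQ dependency; the class BufferingConsumer and its method names are made up for illustration and are not the client's implementation. It only shows the idea: the broker side pushes deliveries whenever it likes, and the application takes them from a buffer when it is ready.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a buffering consumer; not the RabbitMQ class. */
final class BufferingConsumer {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    /** Called by the "broker" side whenever it pushes a message to us. */
    void handleDelivery(String body) {
        buffer.add(body);
    }

    /** Called by the application; blocks until a buffered message is available. */
    String nextDelivery() {
        try {
            return buffer.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null; // interrupted while waiting
        }
    }

    public static void main(String[] args) {
        BufferingConsumer consumer = new BufferingConsumer();
        // A separate thread plays the broker and pushes two messages.
        Thread broker = new Thread(() -> {
            consumer.handleDelivery("Hello World!");
            consumer.handleDelivery("Hello again!");
        });
        broker.start();
        System.out.println("[" + consumer.nextDelivery() + "]");
        System.out.println("[" + consumer.nextDelivery() + "]");
    }
}
```

The application thread never polls the broker: it only blocks on the local buffer, which is why the receive loop in Recv can be a simple while (true).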
work queues
Worker.java
package stephansun.github.samples.amqp.plain.workqueues;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class Worker {

    private final static String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, ShutdownSignalException,
            ConsumerCancelledException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // The queue must be declared with the same arguments as in NewTask (durable = true);
        // redeclaring an existing queue with different arguments is rejected by the broker.
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        System.out.println("To exit press CTRL+C");
        // This tells RabbitMQ not to give more than one message to a worker at a time.
        // In other words, do not dispatch a new message to a worker until it has processed
        // and acknowledged the previous one.
        // Instead, the message is dispatched to the next worker that is not busy.
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Automatic acknowledgement is switched off; we acknowledge explicitly below.
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("r[" + message + "]");
            doWork(message);
            System.out.println("r[done]");
            // Send the acknowledgement.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

    // Simulates work: sleeps one second for every '.' in the task.
    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}
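To make the effect of basicQos(1) concrete, here is a small in-memory sketch. It is not RabbitMQ code: the class PrefetchDispatch and its methods are hypothetical, and the round-robin order is an assumption made to keep the model simple. It models a broker that hands a message to a worker only while that worker holds fewer unacknowledged messages than the prefetch limit.

```java
/** Hypothetical in-memory model of prefetch-limited dispatch; not part of the RabbitMQ client. */
final class PrefetchDispatch {
    private final int prefetch;
    private final int[] unacked; // unacknowledged messages held by each worker
    private int next = 0;        // where the round-robin scan starts

    PrefetchDispatch(int workers, int prefetch) {
        this.prefetch = prefetch;
        this.unacked = new int[workers];
    }

    /** Returns the index of the worker that receives the message, or -1 if every worker is busy. */
    int dispatch() {
        for (int i = 0; i < unacked.length; i++) {
            int w = (next + i) % unacked.length;
            if (unacked[w] < prefetch) {
                unacked[w]++;
                next = (w + 1) % unacked.length;
                return w;
            }
        }
        return -1; // the message stays in the queue until some worker acknowledges
    }

    /** The given worker acknowledges one message, which frees a slot. */
    void ack(int worker) {
        if (unacked[worker] > 0) {
            unacked[worker]--;
        }
    }

    public static void main(String[] args) {
        PrefetchDispatch broker = new PrefetchDispatch(2, 1);
        System.out.println(broker.dispatch()); // 0
        System.out.println(broker.dispatch()); // 1
        System.out.println(broker.dispatch()); // -1, both workers are busy
        broker.ack(1);
        System.out.println(broker.dispatch()); // 1, the worker that just acknowledged
    }
}
```

With a prefetch of 1, a slow worker simply stops receiving messages until it acknowledges, so a long task no longer holds up the messages queued behind it.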
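The Javadoc for channel.basicAck(deliveryTag, multiple) reads:
Parameters:
deliveryTag - the tag from the received com.rabbitmq.client.AMQP.Basic.GetOk or com.rabbitmq.client.AMQP.Basic.Deliver
multiple - true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag.
I originally misread this by breaking the sentence at "and". The phrase "up to and including" has to be read as a whole. Delivery tags are assigned per channel and increase with each delivery, so multiple = true acknowledges every not-yet-acknowledged message on the channel whose tag is less than or equal to the supplied tag, while multiple = false acknowledges only the message with that tag. (I have not yet reproduced this scenario in code.) Poor English really can get you into trouble. See the API documentation for this version.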
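The difference between the two values of multiple can be sketched without a broker. The class below is a hypothetical model, not the client API: it keeps the set of outstanding delivery tags for one channel and applies the rule quoted from the Javadoc above.

```java
import java.util.TreeSet;

/** Hypothetical model of the unacknowledged delivery tags on a single channel. */
final class AckModel {
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    /** Simulates one delivery and returns its tag; tags increase by one per delivery. */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** Mirrors the documented meaning of basicAck(deliveryTag, multiple). */
    void basicAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // everything up to and including deliveryTag
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // just the supplied tag
            unacked.remove(deliveryTag);
        }
    }

    /** Returns a copy of the tags that are still unacknowledged. */
    TreeSet<Long> outstanding() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        AckModel channel = new AckModel();
        for (int i = 0; i < 4; i++) {
            channel.deliver();                     // tags 1, 2, 3, 4
        }
        channel.basicAck(2, false);
        System.out.println(channel.outstanding()); // [1, 3, 4]
        channel.basicAck(3, true);
        System.out.println(channel.outstanding()); // [4]
    }
}
```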
NewTask.java
package stephansun.github.samples.amqp.plain.workqueues;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

    // The main idea behind Work Queues (also called Task Queues) is to hand time-consuming,
    // resource-intensive tasks over to workers so that they do not clog the queue.

    private final static String QUEUE_NAME = "task_queue";

    private static String getMessage(String[] strings) {
        if (strings.length < 1) {
            return "Hello World!";
        }
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) {
            return "";
        }
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }

    public static void main(String[] args) throws IOException {
        String[] strs = new String[] { "First message." };
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Difference from helloworld: the queue is durable.
        boolean durable = true;
        // The queue name has changed as well, so producer and consumer must both use the new name.
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        // With durable set to true, the task_queue queue survives a RabbitMQ restart.
        String message = getMessage(strs);
        // Now we need to mark the message as persistent.
        // If you need a stronger delivery guarantee, you can wrap the publish in a transaction.
        channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        System.out.println("s[" + message + "]");
        channel.close();
        connection.close();
    }
}
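The two flags used in NewTask, the durable queue and the persistent message, only help when they are combined. The following in-memory sketch illustrates that rule; RestartModel and all of its methods are hypothetical, and it deliberately ignores the short window in which a real broker may not yet have written a persistent message to disk.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of what a broker restart keeps; not RabbitMQ code. */
final class RestartModel {
    private static final class Msg {
        final String body;
        final boolean persistent;
        Msg(String body, boolean persistent) { this.body = body; this.persistent = persistent; }
    }

    private static final class Q {
        final boolean durable;
        final List<Msg> messages = new ArrayList<>();
        Q(boolean durable) { this.durable = durable; }
    }

    private final Map<String, Q> queues = new LinkedHashMap<>();

    void queueDeclare(String name, boolean durable) {
        queues.putIfAbsent(name, new Q(durable));
    }

    void publish(String queue, String body, boolean persistent) {
        Q q = queues.get(queue);
        if (q != null) {
            q.messages.add(new Msg(body, persistent));
        }
    }

    /** Non-durable queues disappear; durable queues keep only their persistent messages. */
    void restart() {
        queues.values().removeIf(q -> !q.durable);
        for (Q q : queues.values()) {
            q.messages.removeIf(m -> !m.persistent);
        }
    }

    /** Returns the message bodies in the queue, or null if the queue no longer exists. */
    List<String> contents(String queue) {
        Q q = queues.get(queue);
        if (q == null) {
            return null;
        }
        List<String> bodies = new ArrayList<>();
        for (Msg m : q.messages) {
            bodies.add(m.body);
        }
        return bodies;
    }

    public static void main(String[] args) {
        RestartModel broker = new RestartModel();
        broker.queueDeclare("hello", false);
        broker.queueDeclare("task_queue", true);
        broker.publish("hello", "Hello World!", true);
        broker.publish("task_queue", "transient task", false);
        broker.publish("task_queue", "First message.", true);
        broker.restart();
        System.out.println(broker.contents("hello"));      // null
        System.out.println(broker.contents("task_queue")); // [First message.]
    }
}
```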
publish subscribe
EmitLog.java
package stephansun.github.samples.amqp.plain.publishsubscribe;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

    // So far every queue we used had a specific name. Being able to name a queue was
    // crucial, because we needed to point the workers at the same queue.
    // The exchange types are: direct, topic, headers and fanout.

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // A fanout exchange broadcasts every message it receives to all the queues it knows.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String message = getMessage(new String[] { "test" });
        // With the default exchange (""), a message is routed to the queue named by the
        // routing key, if such a queue exists. A fanout exchange ignores the routing key,
        // so we pass an empty string here.
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println("sent [" + message + "]");
        channel.close();
        connection.close();
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 1) {
            return "Hello World!";
        }
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) {
            return "";
        }
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
ReceiveLogs.java
package stephansun.github.samples.amqp.plain.publishsubscribe;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class ReceiveLogs {

    // As you can see, after establishing the connection we declare the exchange. This step is
    // necessary because publishing to a non-existent exchange is forbidden.
    // If no queue is bound to the exchange yet, the messages are lost, but that is fine for us:
    // if no consumer is listening we can safely discard the message.
    // The core idea of the RabbitMQ messaging model is that a producer never sends a message
    // directly to a queue. In fact, quite often the producer does not even know whether a
    // message will be delivered to any queue at all.
    // Instead, the producer can only send messages to an exchange.
    // An exchange is a very simple thing: on one side it receives messages from producers,
    // on the other side it pushes them to queues.
    // The exchange must know exactly what to do with a message it receives. Append it to one
    // particular queue? Append it to many queues? Or discard it?
    // These rules are defined by the exchange type.

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, ShutdownSignalException,
            ConsumerCancelledException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Create an exchange of type fanout and call it logs.
        // This type of exchange broadcasts every message it receives to all the queues it knows.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Temporary queue:
        // First, whenever we connect to Rabbit we need a fresh, empty queue.
        // To get one we could create a queue with a random name or, even better, let the server
        // choose a random queue name for us.
        // Second, once we disconnect the consumer, the temporary queue should be deleted automatically.
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("To exit press CTRL+C");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("r[" + message + "]");
        }
    }
}
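The exchange model described in the comments above can be mimicked in a few lines of plain Java. FanoutModel is a hypothetical, in-memory stand-in and not the broker's implementation; it only shows two consequences of the rules: every bound queue gets its own copy, and a message published while nothing is bound is simply dropped.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of a single fanout exchange. */
final class FanoutModel {
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();
    private final Set<String> bound = new LinkedHashSet<>();

    void queueDeclare(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    /** Binds an existing queue to the exchange; a fanout exchange needs no binding key. */
    void queueBind(String name) {
        if (queues.containsKey(name)) {
            bound.add(name);
        }
    }

    /** Copies the message to every bound queue and returns how many queues received it. */
    int publish(String body) {
        for (String name : bound) {
            queues.get(name).add(body);
        }
        return bound.size();
    }

    /** Returns the next message of the queue, or null if it is empty or unknown. */
    String poll(String name) {
        Deque<String> q = queues.get(name);
        return q == null ? null : q.poll();
    }

    public static void main(String[] args) {
        FanoutModel logs = new FanoutModel();
        System.out.println(logs.publish("lost")); // 0, nobody is bound yet
        logs.queueDeclare("q1");
        logs.queueDeclare("q2");
        logs.queueBind("q1");
        logs.queueBind("q2");
        System.out.println(logs.publish("test")); // 2
        System.out.println(logs.poll("q1"));      // test
        System.out.println(logs.poll("q2"));      // test
    }
}
```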
routing
EmitLogDirect.java
package stephansun.github.samples.amqp.plain.routing;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // Difference from publish subscribe: the severity is used as the routing key.
        String severity = getSeverity(new String[] { "test" });
        String message = getMessage(new String[] { "test" });
        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println("s[" + severity + "]:[" + message + "]");
        channel.close();
        connection.close();
    }

    private static String getSeverity(String[] strings) {
        return "info";
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 1) {
            return "Hello World!";
        }
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) {
            return "";
        }
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
ReceiveLogsDirect.java
package stephansun.github.samples.amqp.plain.routing;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, ShutdownSignalException,
            ConsumerCancelledException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        // One binding per severity we are interested in. A direct exchange compares the
        // binding key with the routing key for equality.
        String[] strs = new String[] { "info", "warning", "error" };
        for (String str : strs) {
            channel.queueBind(queueName, EXCHANGE_NAME, str);
        }
        System.out.println("To exit press CTRL+C");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println("r:[" + routingKey + "]:[" + message + "]");
        }
    }
}
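A direct exchange delivers a message to the queues whose binding key equals the message's routing key. The sketch below models just that lookup in memory; DirectModel and its method names are hypothetical. It also shows why a misspelled binding key such as "waring" fails silently: it never equals the routing key "warning", so the queue just receives nothing for that severity.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of the routing table of one direct exchange. */
final class DirectModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    void queueBind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    /** Returns the queues that receive a message published with the given routing key. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectModel exchange = new DirectModel();
        exchange.queueBind("console", "info");
        exchange.queueBind("console", "waring"); // typo: never matches "warning"
        exchange.queueBind("console", "error");
        exchange.queueBind("disk", "error");
        System.out.println(exchange.route("info"));    // [console]
        System.out.println(exchange.route("error"));   // [console, disk]
        System.out.println(exchange.route("warning")); // []
    }
}
```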
topics
EmitLogTopic.java
package stephansun.github.samples.amqp.plain.topics;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // Difference from routing: the routing key is a list of words separated by dots.
        String routingKey = getRoutingKey(new String[] { "test" });
        String message = getMessage(new String[] { "test" });
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println("s[" + routingKey + "]:[" + message + "]");
        channel.close();
        connection.close();
    }

    private static String getRoutingKey(String[] strings) {
        return "kern.critical";
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 1) {
            return "Hello World!";
        }
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) {
            return "";
        }
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
ReceiveLogsTopic.java
package stephansun.github.samples.amqp.plain.topics;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class ReceiveLogsTopic {

    // FIXME
    // Some teasers:
    // Will "*" binding catch a message sent with an empty routing key?
    // Will "#.*" catch a message with a string ".." as a key? Will it catch a message with a single word key?
    // How different is "a.*.#" from "a.#"?

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, ShutdownSignalException,
            ConsumerCancelledException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        // Binding keys are patterns: "*" stands for exactly one word, "#" for zero or more words.
        // Both patterns below match the routing key "kern.critical" sent by EmitLogTopic.
        // (The text "A critical kernel error" is a message body, not a binding key.)
        String[] strs = new String[] { "kern.*", "*.critical" };
        for (String str : strs) {
            channel.queueBind(queueName, EXCHANGE_NAME, str);
        }
        System.out.println("To exit press CTRL+C");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println("r:[" + routingKey + "]:[" + message + "]");
        }
    }
}
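The last teaser in ReceiveLogsTopic can be worked out from the two wildcard rules alone: "*" stands for exactly one word and "#" for zero or more words. The matcher below is my own minimal sketch of those rules, not the broker's algorithm; TopicMatcher is a hypothetical name, and keys that are empty or contain empty words (such as "..") are deliberately out of scope, because how they split into words is a broker detail this sketch does not try to reproduce.

```java
/** Hypothetical matcher for topic binding patterns; words are separated by dots. */
final class TopicMatcher {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more of the remaining words
            for (int n = j; n <= k.length; n++) {
                if (match(p, i + 1, k, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                  // pattern still needs a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("kern.*", "kern.critical"));     // true
        System.out.println(matches("*.critical", "kern.critical")); // true
        System.out.println(matches("kern.*", "kern.disk.full"));    // false
        System.out.println(matches("a.#", "a"));                    // true
        System.out.println(matches("a.*.#", "a"));                  // false
        System.out.println(matches("a.*.#", "a.b"));                // true
    }
}
```

So "a.#" also accepts the bare key "a", while "a.*.#" requires at least one more word after "a".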
RPC
RPCClient.java
package stephansun.github.samples.amqp.plain.rpc;

import java.io.IOException;
import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class RPCClient {

    // FIXME
    // The AMQP protocol predefines 14 properties that go with a message. Most of them are
    // rarely used, with the exception of the following:
    // deliveryMode: marks a message as persistent (value 2) or transient (any other value).
    // contentType: describes the mime-type of the encoding.
    // replyTo: commonly used to name a callback queue.
    // correlationId: used to correlate RPC responses with requests.

    // FIXME
    // Why do we ignore unknown messages in the callback queue instead of failing with an error?
    // Because of a possible race condition on the server side.
    // Although unlikely, it is possible that the RPC server dies just after sending us the
    // answer but before sending the acknowledgement for the request.
    // If that happens, the restarted RPC server processes the request again.
    // That is why the client must handle duplicate responses gracefully, and why the RPC should
    // ideally be idempotent, i.e. handling the same request twice has the same effect as handling it once.

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();
        // temporary queue.
        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws IOException, ShutdownSignalException,
            ConsumerCancelledException, InterruptedException {
        String response = null;
        String corrId = UUID.randomUUID().toString();
        // In order to receive a response we need to send a 'callback' queue address with the request.
        // We can use the default queue (which is exclusive in the Java client).
        BasicProperties props = new BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Compare from corrId so that a reply without a correlation id is skipped, not an NPE.
            if (corrId.equals(delivery.getProperties().getCorrelationId())) {
                response = new String(delivery.getBody(), "UTF-8");
                break;
            }
        }
        return response;
    }

    public void close() throws IOException {
        connection.close();
    }

    public static void main(String[] args) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient();
            System.out.println("fib(30)");
            response = fibonacciRpc.call("30");
            System.out.println("got[" + response + "]");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    // was clone(), which never closed the connection
                    fibonacciRpc.close();
                } catch (Exception ignore) {
                    // ignore
                }
            }
        }
    }
}
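The reply loop in call() does two jobs: it skips replies whose correlation id belongs to some other (or an earlier, duplicated) request, and it waits for the right one. The sketch below isolates that logic with an in-memory queue and adds a deadline, which the sample client does not have. ReplyMatcher, Reply and await are hypothetical names for illustration only; nothing here touches the RabbitMQ API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that picks the reply for one request out of a shared reply queue. */
final class ReplyMatcher {

    static final class Reply {
        final String correlationId;
        final String body;
        Reply(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    /** Waits up to timeoutMillis for the reply carrying corrId; returns null on timeout. */
    static String await(BlockingQueue<Reply> replies, String corrId, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return null;
                }
                Reply reply = replies.poll(remaining, TimeUnit.NANOSECONDS);
                if (reply == null) {
                    return null;                 // nothing arrived in time
                }
                if (corrId.equals(reply.correlationId)) {
                    return reply.body;           // the answer to our request
                }
                // otherwise: a stale or duplicate reply for another request, skip it
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Reply> replies = new LinkedBlockingQueue<>();
        replies.add(new Reply("stale-id", "ignored"));
        replies.add(new Reply(null, "no id, ignored"));
        replies.add(new Reply("my-id", "832040"));
        System.out.println(await(replies, "my-id", 1000)); // 832040
        System.out.println(await(replies, "my-id", 100));  // null, the queue is empty now
    }
}
```

Calling equals on corrId rather than on the received id keeps the loop safe when a reply arrives without any correlation id.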
RPCServer.java
package stephansun.github.samples.amqp.plain.rpc;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class RPCServer {

    // Our code is still quite simple and does not try to solve more complex (but important)
    // problems, such as:
    // How should the client react if no server is running?
    // Should a client have some kind of timeout for the RPC?
    // If the server fails with an exception, should it be forwarded to the client?
    // Protecting against invalid incoming messages (e.g. checking bounds, type) before processing.

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    // FIXME Don't expect this one to work for big numbers, and it's probably the slowest
    // recursive implementation possible.
    private static int fib(int n) {
        if (n == 0) {
            return 0;
        }
        if (n == 1) {
            return 1;
        }
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            // We might want to run more than one server process.
            // In order to spread the load equally over multiple servers we need to set the
            // prefetchCount setting in channel.basicQos.
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
            System.out.println("[x] Awaiting RPC requests");
            while (true) {
                String response = null;
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();
                // Echo the correlation id so the client can match the reply to its request.
                BasicProperties replyProps = new BasicProperties.Builder()
                        .correlationId(props.getCorrelationId())
                        .build();
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + message + ")");
                    response = "" + fib(n);
                } catch (Exception e) {
                    System.out.println(" [.] " + e.toString());
                    response = "";
                } finally {
                    // Reply first, then acknowledge the request.
                    channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignore) {
                    // ignore
                }
            }
        }
    }
}
samples-amqp-spring
<dependencies>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp-rabbit</artifactId>
        <version>1.0.0.RC1</version>
    </dependency>
</dependencies>
Commonly used classes:
- org.springframework.amqp.core.AmqpAdmin
- org.springframework.amqp.core.AmqpTemplate
- org.springframework.amqp.core.Binding
- org.springframework.amqp.core.DirectExchange
- org.springframework.amqp.core.FanoutExchange
- org.springframework.amqp.core.TopicExchange
- org.springframework.amqp.core.Message
- org.springframework.amqp.core.MessageListener
- org.springframework.amqp.core.MessageProperties
helloworld
Send.java
package stephansun.github.samples.amqp.spring.helloworld;

import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Send {

    private final static String QUEUE_NAME = "hello";

    private static MessageConverter messageConverter = new SimpleMessageConverter();

    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                "stephansun/github/samples/amqp/spring/helloworld/spring-rabbitmq.xml");
        RabbitTemplate rabbitTemplate = (RabbitTemplate) applicationContext.getBean("rabbitTemplate");
        String message = "Hello World!";
        // Publish to the default exchange (""), using the queue name as the routing key.
        // Explicit, empty MessageProperties avoid depending on how the converter treats null.
        rabbitTemplate.send("", QUEUE_NAME, messageConverter.toMessage(message, new MessageProperties()));
        // Close the context so that the connection it holds is released.
        applicationContext.close();
    }
}
Recv.java
package stephansun.github.samples.amqp.spring.helloworld;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    private static MessageConverter messageConverter = new SimpleMessageConverter();

    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                "stephansun/github/samples/amqp/spring/helloworld/spring-rabbitmq.xml");
        RabbitTemplate rabbitTemplate = (RabbitTemplate) applicationContext.getBean("rabbitTemplate");
        // receive() fetches at most one message and returns null if the queue is empty.
        Message message = rabbitTemplate.receive(QUEUE_NAME);
        if (message == null) {
            System.out.println("no message in queue [" + QUEUE_NAME + "]");
        } else {
            Object obj = messageConverter.fromMessage(message);
            System.out.println("received:[" + obj + "]");
        }
        applicationContext.close();
    }
}
spring-rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <!-- Infrastructure -->
    <rabbit:connection-factory id="connectionFactory"/>
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <rabbit:queue name="hello" durable="false" exclusive="false" auto-delete="false"/>
</beans>
work queues
MyWorker.java
package stephansun.github.samples.amqp.spring.workqueues;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.SimpleMessageConverter;

import com.rabbitmq.client.Channel;

public class MyWorker implements ChannelAwareMessageListener {

    // Simulates work: sleeps one second for every '.' in the task, then fails on purpose
    // so that we can observe what happens to a message whose processing throws.
    private void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        throw new RuntimeException("test exception");
    }

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("MyWorker");
        MessageProperties messageProperties = message.getMessageProperties();
        String messageContent = (String) new SimpleMessageConverter().fromMessage(message);
        System.out.println("r[" + messageContent + "]");
        // What happens if we acknowledge before doing the work?
        // channel.basicAck(messageProperties.getDeliveryTag(), true);
        doWork(messageContent);
        System.out.println("deliveryTag increases with every delivery");
        System.out.println(messageProperties.getDeliveryTag());
        // What happens if we acknowledge after doing the work?
        // channel.basicAck(messageProperties.getDeliveryTag(), false);
        System.out.println("r[done]");
    }
}
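The two commented-out basicAck calls in MyWorker ask what changes when the acknowledgement comes before or after the work. The following in-memory sketch, with the hypothetical class AckOrdering, plays both variants against a worker that always fails, as doWork does above. It assumes manual acknowledgements and a broker that requeues a delivery that was never acknowledged; it does not use the Spring or RabbitMQ APIs.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model comparing "ack before work" with "ack after work". */
final class AckOrdering {

    /** Returns how many messages are left in the queue after the worker crashes mid-task. */
    static int remainingAfterCrash(boolean ackBeforeWork) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("First message.");

        String inFlight = queue.poll();   // delivered to the worker, not yet acknowledged
        boolean acked = false;
        try {
            if (ackBeforeWork) {
                acked = true;             // acknowledge first, then work
            }
            doWork(inFlight);             // always throws in this model
            acked = true;                 // acknowledge after the work (never reached here)
        } catch (RuntimeException crash) {
            // the worker died while processing
        }
        if (!acked) {
            queue.addFirst(inFlight);     // unacknowledged delivery goes back to the queue
        }
        return queue.size();
    }

    private static void doWork(String task) {
        throw new RuntimeException("test exception");
    }

    public static void main(String[] args) {
        System.out.println(remainingAfterCrash(true));  // 0: the message is lost
        System.out.println(remainingAfterCrash(false)); // 1: the message can be redelivered
    }
}
```

Acknowledging early gives at-most-once processing; acknowledging late gives at-least-once processing, which is why the worker should then tolerate seeing the same message twice.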
NewTask.java
package stephansun.github.samples.amqp.spring.workqueues;

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class NewTask {

    private final static String QUEUE_NAME = "task_queue";

    private static String getMessage(String[] strings) {
        if (strings.length < 1) {
            return "Hello World!";
        }
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) {
            return "";
        }
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }

    public static void main(String[] args) throws IOException {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                "stephansun/github/samples/amqp/spring/workqueues/spring-rabbitmq-sender.xml");
        RabbitTemplate rabbitTemplate = (RabbitTemplate) applicationContext.getBean("rabbitTemplate");
        String[] strs = new String[] { "First message." };
        String messageStr = getMessage(strs);
        // Convert the client's PERSISTENT_TEXT_PLAIN properties into Spring AMQP MessageProperties
        // so that the message is marked as persistent.
        MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
        MessageProperties messageProperties = messagePropertiesConverter.toMessageProperties(
                com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN, null, null);
        MessageConverter messageConverter = new SimpleMessageConverter();
        Message message = messageConverter.toMessage(messageStr, messageProperties);
        rabbitTemplate.send("", QUEUE_NAME, message);
        System.out.println("s[" + message + "]");
        // Close the context so that the connection it holds is released.
        applicationContext.close();
    }
}
Worker.java
package stephansun.github.samples.amqp.spring.workqueues;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Worker {

    public static void main(String[] args) {
        // Creating the context is all we need: it starts the listener container declared
        // in the XML file, which then calls MyWorker for every message.
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                "stephansun/github/samples/amqp/spring/workqueues/spring-rabbitmq-receiver.xml");
    }
}
spring-rabbitmq-receiver.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <!-- Infrastructure -->
    <rabbit:connection-factory id="connectionFactory"/>
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" channel-transacted="true"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <rabbit:queue name="task_queue" durable="true" exclusive="false" auto-delete="false"/>

    <bean id="myWorker" class="stephansun.github.samples.amqp.spring.workqueues.MyWorker"/>

    <!-- acknowledge="none": the broker treats every message as acknowledged as soon as it is
         delivered, so a message whose listener throws is not redelivered. Switch to
         acknowledge="manual" when MyWorker calls channel.basicAck itself. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="none" prefetch="1">
        <rabbit:listener ref="myWorker" queue-names="task_queue"/>
    </rabbit:listener-container>
</beans>
spring-rabbitmq-sender.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <!-- Infrastructure -->
    <rabbit:connection-factory id="connectionFactory"/>
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <rabbit:queue name="task_queue" durable="true" exclusive="false" auto-delete="false"/>
</beans>
publish subscribe
EmitLog.java
package stephansun.github.samples.amqp.spring.publishsubscribe;

import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    private static MessageConverter messageConverter = new SimpleMessageConverter();

    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                "stephansun/github/samples/amqp/spring/publishsubscribe/spring-rabbitmq.xml");
        RabbitTemplate rabbitTemplate = (RabbitTemplate) applicationContext.getBean("rabbitTemplate");
        String message = getMessage(new String[] { "test" });
        // The "logs" exchange is declared by ReceiveLogs, not by the XML file, so start
        // ReceiveLogs first: publishing to an exchange that does not exist is an error.
        rabbitTemplate.send(EXCHANGE_NAME, "", messageConverter.toMessage(message, new MessageProperties()));
        System.out.println("sent [" + message + "]");
        applicationContext.close();
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 1) {
            return "Hello World!";
        }
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) {
            return "";
        }
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
package stephansun.github.samples.amqp.spring.publishsubscribe;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    private static MessageConverter messageConverter = new SimpleMessageConverter();

    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                "stephansun/github/samples/amqp/spring/publishsubscribe/spring-rabbitmq.xml");
        RabbitTemplate rabbitTemplate = (RabbitTemplate) applicationContext.getBean("rabbitTemplate");
        RabbitAdmin rabbitAdmin = (RabbitAdmin) applicationContext.getBean(RabbitAdmin.class);

        // Declare the fanout exchange and bind a server-named queue to it.
        FanoutExchange fanoutExchange = new FanoutExchange(EXCHANGE_NAME);
        rabbitAdmin.declareExchange(fanoutExchange);
        String queueName = rabbitAdmin.declareQueue().getName();
        Binding binding = new Binding(queueName, DestinationType.QUEUE, EXCHANGE_NAME, "", null);
        rabbitAdmin.declareBinding(binding);

        System.out.println("CTRL+C");

        // FIXME Why pause for 10 seconds here? Presumably because receive() polls the
        // queue only once, so the pause leaves time to run EmitLog before the poll.
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Message message = rabbitTemplate.receive(queueName);
        if (message == null) {
            // Nothing was published during the pause.
            System.out.println("received nothing");
            return;
        }
        Object obj = messageConverter.fromMessage(message);
        System.out.println("received:[" + obj + "]");
    }
}
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> </beans>
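The ten-second pause in ReceiveLogs appears to be there because, as far as I can tell, RabbitTemplate.receive(queueName) in this Spring AMQP version makes a single attempt and returns null when the queue is empty, so the receiver needs to leave time for EmitLog to publish. A fixed sleep is either too long or too short; polling until a deadline is one alternative. The sketch below shows that idea with an in-memory queue standing in for the broker. The PollingReceive class, the receiveWithin helper, and the timings are hypothetical and are not part of the samples project.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class PollingReceive {

    /**
     * Polls the queue until a message shows up or the timeout elapses.
     * Returns null on timeout, like a one-shot receive on an empty queue.
     */
    public static String receiveWithin(Queue<String> queue, long timeoutMillis, long intervalMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            String message = queue.poll(); // non-blocking single attempt
            if (message != null) {
                return message;
            }
            if (System.currentTimeMillis() >= deadline) {
                return null;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> queue = new ConcurrentLinkedQueue<String>();

        // Stand-in for EmitLog: publishes shortly after the receiver has started.
        Thread emitter = new Thread(() -> {
            try {
                Thread.sleep(200);
                queue.add("test");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        emitter.start();

        System.out.println("received:[" + receiveWithin(queue, 5000, 50) + "]");
        emitter.join();
    }
}
```

With this shape the receiver returns as soon as a message is available and still gives up after a bounded wait.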
routing
package stephansun.github.samples.amqp.spring.routing; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.support.ClassPathXmlApplicationContext; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; private static MessageConverter messageConverter = new SimpleMessageConverter(); public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/routing/spring-rabbitmq.xml"); RabbitTemplate rabbitTempalte = (RabbitTemplate) applicationContext.getBean("rabbitTemplate"); // diff String serverity = getServerity(new String[] { "test" }); String message = getMessage(new String[] { "test" }); rabbitTempalte.send(EXCHANGE_NAME, serverity, messageConverter.toMessage(message, null)); System.out.println("s[" + serverity + "]:[" + message + "]"); } private static String getServerity(String[] strings) { return "info"; } private static String getMessage(String[] strings) { if (strings.length < 1) { return "Hello World!"; } return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) { return ""; } StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
package stephansun.github.samples.amqp.spring.routing;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    private static MessageConverter messageConverter = new SimpleMessageConverter();

    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                "stephansun/github/samples/amqp/spring/routing/spring-rabbitmq.xml");
        RabbitTemplate rabbitTemplate = (RabbitTemplate) applicationContext.getBean("rabbitTemplate");
        RabbitAdmin rabbitAdmin = (RabbitAdmin) applicationContext.getBean(RabbitAdmin.class);

        DirectExchange directExchange = new DirectExchange(EXCHANGE_NAME);
        rabbitAdmin.declareExchange(directExchange);
        String queueName = rabbitAdmin.declareQueue().getName();

        // Bind the queue once per severity we want to receive.
        String[] severities = new String[] { "info", "warning", "error" };
        for (String severity : severities) {
            Binding binding = new Binding(queueName, DestinationType.QUEUE, EXCHANGE_NAME, severity, null);
            rabbitAdmin.declareBinding(binding);
        }

        System.out.println("CTRL+C");

        // FIXME Think about why we pause for 10 seconds here before asking me.
        // Presumably receive() polls the queue only once, so the pause leaves
        // time to run EmitLogDirect before the poll.
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Message message = rabbitTemplate.receive(queueName);
        if (message == null) {
            // Nothing was published during the pause.
            System.out.println("received nothing");
            return;
        }
        Object obj = messageConverter.fromMessage(message);
        System.out.println("received:[" + obj + "]");
    }
}
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> </beans>
topics
package stephansun.github.samples.amqp.spring.topics; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.support.ClassPathXmlApplicationContext; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; private static MessageConverter messageConverter = new SimpleMessageConverter(); public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/topics/spring-rabbitmq.xml"); RabbitTemplate rabbitTempalte = (RabbitTemplate) applicationContext.getBean("rabbitTemplate"); // diff String serverity = getServerity(new String[] { "test" }); String message = getMessage(new String[] { "test" }); rabbitTempalte.send(EXCHANGE_NAME, serverity, messageConverter.toMessage(message, null)); System.out.println("s[" + serverity + "]:[" + message + "]"); } private static String getServerity(String[] strings) { return "kern.critical"; } private static String getMessage(String[] strings) { if (strings.length < 1) { return "Hello World!"; } return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) { return ""; } StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
package stephansun.github.samples.amqp.spring.topics;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    private static MessageConverter messageConverter = new SimpleMessageConverter();

    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                "stephansun/github/samples/amqp/spring/topics/spring-rabbitmq.xml");
        RabbitTemplate rabbitTemplate = (RabbitTemplate) applicationContext.getBean("rabbitTemplate");
        RabbitAdmin rabbitAdmin = (RabbitAdmin) applicationContext.getBean(RabbitAdmin.class);

        TopicExchange topicExchange = new TopicExchange(EXCHANGE_NAME);
        rabbitAdmin.declareExchange(topicExchange);
        String queueName = rabbitAdmin.declareQueue().getName();

        // Alternative binding-key sets to experiment with; only strs5 is used below.
        String[] strs1 = new String[] { "#" };
        String[] strs2 = new String[] { "kern.*" };
        String[] strs3 = new String[] { "*.critical" };
        String[] strs4 = new String[] { "kern.*", "*.critical" };
        // The second entry has no wildcards, so it only matches an identical routing key.
        String[] strs5 = new String[] { "kern.critical", "A critical kernel error" };
        for (String str : strs5) {
            Binding binding = new Binding(queueName, DestinationType.QUEUE, EXCHANGE_NAME, str, null);
            rabbitAdmin.declareBinding(binding);
        }

        System.out.println("CTRL+C");

        // FIXME Why pause here (30 seconds in this example)? Presumably receive() polls
        // the queue only once, so the pause leaves time to run EmitLogTopic before the poll.
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Message message = rabbitTemplate.receive(queueName);
        if (message == null) {
            // Nothing was published during the pause.
            System.out.println("received nothing");
            return;
        }
        Object obj = messageConverter.fromMessage(message);
        System.out.println("received:[" + obj + "]");
    }
}
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> </beans>
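The binding keys tried in ReceiveLogsTopic ("#", "kern.*", "*.critical") rely on the wildcard rules of a topic exchange: keys are words separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. The sketch below re-implements that matching in plain Java to show which bindings would receive the kern.critical routing key sent by EmitLogTopic. The TopicMatch class and its matches method are hypothetical illustrations written for this article, not RabbitMQ's actual implementation.

```java
public class TopicMatch {

    /** Returns true if the routing key satisfies the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.", -1);
        String[] keyWords = routingKey.split("\\.", -1);
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false; // the pattern still expects a word but the key has none left
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String routingKey = "kern.critical";
        String[] patterns = { "#", "kern.*", "*.critical", "kern.critical", "*.info", "kern" };
        for (String pattern : patterns) {
            System.out.println(pattern + " -> " + matches(pattern, routingKey));
        }
    }
}
```

Running it against a few routing keys is a quick way to check a binding key before declaring it on the broker.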
rpc
package stephansun.github.samples.amqp.spring.rpc;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class RPCClient {

    private static String requestQueueName = "rpc_queue";

    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                "stephansun/github/samples/amqp/spring/rpc/spring-rabbitmq-client.xml");
        RabbitTemplate rabbitTemplate = (RabbitTemplate) applicationContext.getBean("rabbitTemplate");

        String message = "30";
        // Send the request to rpc_queue through the default exchange and wait for the reply.
        Message reply = rabbitTemplate.sendAndReceive("", requestQueueName,
                new SimpleMessageConverter().toMessage(message, null));
        if (reply == null) {
            System.out.println("Receive timed out; reply is null");
        } else {
            System.out.println("Received reply:");
            System.out.println(reply);
        }
    }
}
package stephansun.github.samples.amqp.spring.rpc; import org.springframework.context.support.ClassPathXmlApplicationContext; public class RPCServer { public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/amqp/spring/rpc/spring-rabbitmq-server.xml"); } }
package stephansun.github.samples.amqp.spring.rpc;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;

public class RPCServerListener implements MessageListener {

    private RabbitTemplate rabbitTemplate;

    private static MessageConverter messageConverter = new SimpleMessageConverter();

    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void onMessage(Message requestMessage) {
        // The request body is the number n as a string.
        Object obj = messageConverter.fromMessage(requestMessage);
        String str = (String) obj;
        int n = Integer.parseInt(str);
        System.out.println(" [.] fib(" + n + ")");
        String response = "" + fib(n);

        // Send the result back to the queue named in the request's replyTo property.
        String replyTo = requestMessage.getMessageProperties().getReplyTo();
        rabbitTemplate.send("", replyTo, messageConverter.toMessage(response, null));
    }

    private static int fib(int n) {
        if (n == 0) {
            return 0;
        }
        if (n == 1) {
            return 1;
        }
        return fib(n - 1) + fib(n - 2);
    }
}
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" reply-timeout="1000"/> <rabbit:admin connection-factory="connectionFactory"/> </beans>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:queue name="rpc_queue" durable="false" exclusive="false" auto-delete="false"> </rabbit:queue> <bean id="myListener" class="stephansun.github.samples.amqp.spring.rpc.RPCServerListener"> <property name="rabbitTemplate" ref="rabbitTemplate"/> </bean> <rabbit:listener-container connection-factory="connectionFactory" prefetch="1"> <rabbit:listener queue-names="rpc_queue" ref="myListener"/> </rabbit:listener-container> </beans>
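The RPC sample boils down to three moves: the client publishes a request that names a reply destination, the server computes the answer and sends it to that destination, and the client waits for it for at most reply-timeout milliseconds, getting null if nothing arrives in time. The in-memory sketch below imitates that flow with java.util.concurrent queues instead of a broker. The RpcSketch class, its Request type, and the call and startServer helpers are assumptions made for illustration; none of it is Spring AMQP API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RpcSketch {

    /** A request carrying its payload and the queue the reply should go to. */
    public static final class Request {
        final String body;
        final BlockingQueue<String> replyTo;

        Request(String body, BlockingQueue<String> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    public static int fib(int n) {
        if (n == 0) {
            return 0;
        }
        if (n == 1) {
            return 1;
        }
        return fib(n - 1) + fib(n - 2);
    }

    /** Client side: send a request, then wait for the reply or give up with null. */
    public static String call(BlockingQueue<Request> rpcQueue, String body, long timeoutMillis) {
        BlockingQueue<String> replyQueue = new LinkedBlockingQueue<String>(); // private reply queue
        rpcQueue.add(new Request(body, replyQueue));
        try {
            return replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Server side: take requests and answer each one on its own replyTo queue. */
    public static Thread startServer(BlockingQueue<Request> rpcQueue) {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Request request = rpcQueue.take();
                    int n = Integer.parseInt(request.body);
                    request.replyTo.add(String.valueOf(fib(n)));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when interrupted
            }
        });
        server.setDaemon(true);
        server.start();
        return server;
    }

    public static void main(String[] args) {
        BlockingQueue<Request> rpcQueue = new LinkedBlockingQueue<Request>();

        // No server yet: the call times out and yields null.
        System.out.println("without server: " + call(rpcQueue, "10", 100));
        rpcQueue.clear(); // drop the unanswered request

        startServer(rpcQueue);
        System.out.println("with server: fib(10) = " + call(rpcQueue, "10", 1000));
    }
}
```

Giving every call its own reply queue keeps replies from different callers apart without any correlation bookkeeping, at the cost of one short-lived queue per request.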
samples-amqp-spring-remoting
The Spring remoting framework is covered later; for now, here is the code.
Main.java
package stephansun.github.samples.amqp.spring.remoting; import java.util.HashMap; import java.util.Map; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Main { public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(new String[] { "stephansun/github/samples/amqp/spring/remoting/amqp-remoting.xml", "stephansun/github/samples/amqp/spring/remoting/amqp-remoting-sender.xml", "stephansun/github/samples/amqp/spring/remoting/amqp-remoting-receiver.xml" }); MyService sender = (MyService) applicationContext.getBean("sender"); sender.sayHello(); Map<String, Object> param = new HashMap<String, Object>(); param.put("name", "stephan"); param.put("age", 26); String str = sender.foo(param); System.out.println("str:" + str); } }
MyService.java
package stephansun.github.samples.amqp.spring.remoting; import java.util.Map; public interface MyService { void sayHello(); String foo(Map<String, Object> param); }
MyServiceImpl.java
package stephansun.github.samples.amqp.spring.remoting; import java.util.Map; public class MyServiceImpl implements MyService { @Override public void sayHello() { System.out.println("hello world!"); } @Override public String foo(Map<String, Object> param) { return param.toString(); } }
amqp-remoting-receiver.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <bean id="myService" class="stephansun.github.samples.amqp.spring.remoting.MyServiceImpl"/> <bean id="receiver" class="org.springframework.amqp.remoting.AmqpInvokerServiceExporter"> <property name="serviceInterface" value="stephansun.github.samples.amqp.spring.remoting.MyService"/> <property name="service" ref="myService"/> </bean> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="receiver" queue-names="si.test.queue"/> </rabbit:listener-container> </beans>
amqp-remoting-sender.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <bean id="sender" class="org.springframework.amqp.remoting.AmqpInvokerProxyFactoryBean"> <property name="amqpTemplate" ref="amqpTemplate"/> <property name="serviceInterface" value="stephansun.github.samples.amqp.spring.remoting.MyService"/> <property name="exchange" value="si.test.exchange"/> <property name="routingKey" value="si.test.binding"/> </bean> </beans>
amqp-remoting.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Infrastructure --> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:queue name="si.test.queue"/> <rabbit:direct-exchange name="si.test.exchange"> <rabbit:bindings> <rabbit:binding queue="si.test.queue" key="si.test.binding"/> </rabbit:bindings> </rabbit:direct-exchange> </beans>
The key classes are:
org.springframework.amqp.remoting.AmqpInvokerClientInterceptor
org.springframework.amqp.remoting.AmqpInvokerProxyFactoryBean
org.springframework.amqp.remoting.AmqpInvokerServiceExporter
AmqpInvokerProxyFactoryBean extends AmqpInvokerClientInterceptor.
Besides extending RemoteInvocationBasedExporter from the Spring remoting framework, AmqpInvokerServiceExporter also implements the ChannelAwareMessageListener interface. That interface's onMessage method handles incoming messages, and any class that implements it can be managed by a SimpleMessageListenerContainer.
samples-spring-remoting
Let's write a small piece of code to get a first feel for the Spring remoting framework.
pom.xml
<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>3.1.0.RELEASE</version> </dependency> </dependencies>
Main.java
package stephansun.github.samples.spring.remoting; import java.util.HashMap; import java.util.Map; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Main { public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(new String[] { "stephansun/github/samples/spring/remoting/spring-remoting.xml" }); MyService myService = (MyService) applicationContext.getBean("sender"); Map<String, Object> param = new HashMap<String, Object>(); param.put("name", "stephan"); param.put("age", 26); String str = myService.foo(param); System.out.println("str:" + str); } }
MyInvokerClientInterceptor.java
package stephansun.github.samples.spring.remoting;

import java.util.Arrays;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.remoting.support.DefaultRemoteInvocationFactory;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationFactory;
import org.springframework.remoting.support.RemoteInvocationResult;

public class MyInvokerClientInterceptor implements MethodInterceptor, InitializingBean {

    private RemoteInvocationFactory remoteInvocationFactory = new DefaultRemoteInvocationFactory();

    public void setRemoteInvocationFactory(RemoteInvocationFactory remoteInvocationFactory) {
        this.remoteInvocationFactory =
                (remoteInvocationFactory != null ? remoteInvocationFactory : new DefaultRemoteInvocationFactory());
    }

    protected RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) {
        return this.remoteInvocationFactory.createRemoteInvocation(methodInvocation);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("afterPropertiesSet");
    }

    @Override
    public Object invoke(MethodInvocation methodInvocation) throws Throwable {
        // Capture the call as a RemoteInvocation and print what it carries.
        RemoteInvocation invocation = createRemoteInvocation(methodInvocation);
        Object[] arguments = invocation.getArguments();
        // Arrays.toString prints the elements rather than the array's identity hash.
        System.out.println("arguments:" + Arrays.toString(arguments));
        String methodName = invocation.getMethodName();
        System.out.println("methodName:" + methodName);
        Class<?>[] classes = invocation.getParameterTypes();
        System.out.println("classes:" + Arrays.toString(classes));

        // do whatever you want to do
        RemoteInvocationResult result = new RemoteInvocationResult("hello, world!");
        return result.getValue();
    }
}
MyInvokerProxyFactoryBean.java
package stephansun.github.samples.spring.remoting; import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.beans.factory.FactoryBean; import org.springframework.util.ClassUtils; public class MyInvokerProxyFactoryBean extends MyInvokerClientInterceptor implements FactoryBean<Object>, BeanClassLoaderAware { private Class serviceInterface; private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader(); private Object serviceProxy; // FIXME for Spring injection public void setServiceInterface(Class serviceInterface) { this.serviceInterface = serviceInterface; } public void afterPropertiesSet() throws Exception { super.afterPropertiesSet(); if (this.serviceInterface == null) { throw new IllegalArgumentException("Property 'serviceInterface' is required"); } this.serviceProxy = new ProxyFactory(this.serviceInterface, this).getProxy(this.beanClassLoader); } @Override public void setBeanClassLoader(ClassLoader classLoader) { this.beanClassLoader = classLoader; } @Override public Object getObject() throws Exception { return this.serviceProxy; } @Override public Class<?> getObjectType() { return this.serviceInterface; } @Override public boolean isSingleton() { return true; } }
MyService.java
package stephansun.github.samples.spring.remoting; import java.util.Map; public interface MyService { void sayHello(); String foo(Map<String, Object> param); }
spring-remoting.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <bean id="sender" class="stephansun.github.samples.spring.remoting.MyInvokerProxyFactoryBean"> <property name="serviceInterface" value="stephansun.github.samples.spring.remoting.MyService"/> </bean> </beans>
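As the output shows, Spring packs the call's arguments, the name of the invoked method, and the parameter class names into a RemoteInvocation. That class is serializable, which means it can travel over the network as bytes. JMS, HTTP, and AMQP all carry byte payloads, so interface-based remote method invocation works no matter which transport protocol you choose.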
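To make the mechanism concrete without Spring on the classpath, the sketch below uses a JDK dynamic proxy to capture a call as a small serializable object, round-trips it through a byte array as a stand-in for a JMS, HTTP, or AMQP hop, and replays it on a local implementation by reflection. The RemotingSketch class, its Invocation and Transport types, and the helper names are hypothetical and far simpler than Spring's RemoteInvocation; treat it as an illustration of the idea, not of Spring's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

public class RemotingSketch {

    public interface MyService {
        String foo(Map<String, Object> param);
    }

    public static class MyServiceImpl implements MyService {
        @Override
        public String foo(Map<String, Object> param) {
            return "hello, " + param.get("name");
        }
    }

    /** Hypothetical, simplified counterpart of RemoteInvocation. */
    public static final class Invocation implements Serializable {
        private static final long serialVersionUID = 1L;
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] arguments;

        Invocation(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.arguments = arguments;
        }
    }

    /** Stand-in for a network hop: request bytes in, response bytes out. */
    public interface Transport {
        byte[] exchange(byte[] request) throws Exception;
    }

    static byte[] toBytes(Object value) throws Exception {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    static Object fromBytes(byte[] data) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    /** Client side: turn each interface call into bytes and hand them to the transport. */
    @SuppressWarnings("unchecked")
    public static <T> T createProxy(Class<T> serviceInterface, Transport transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            Invocation invocation = new Invocation(method.getName(), method.getParameterTypes(), args);
            return fromBytes(transport.exchange(toBytes(invocation)));
        };
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
                new Class<?>[] { serviceInterface }, handler);
    }

    /** Server side: decode the invocation and replay it on the real service by reflection. */
    public static <T> byte[] handle(byte[] request, Class<T> serviceInterface, T service) throws Exception {
        Invocation invocation = (Invocation) fromBytes(request);
        Method method = serviceInterface.getMethod(invocation.methodName, invocation.parameterTypes);
        return toBytes(method.invoke(service, invocation.arguments));
    }

    public static void main(String[] args) {
        MyService target = new MyServiceImpl();
        // The "transport" here is a direct method call; only bytes cross the boundary.
        MyService sender = createProxy(MyService.class, request -> handle(request, MyService.class, target));

        Map<String, Object> param = new HashMap<String, Object>();
        param.put("name", "stephan");
        param.put("age", 26);
        System.out.println("str:" + sender.foo(param));
    }
}
```

Because only byte arrays pass through Transport, swapping the in-process lambda for a real message exchange would not change the proxy or the handler, which is the point the paragraph above makes about transport independence.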
samples-jms-plain
pom.xml
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.3.0</version> </dependency> </dependencies>
point-to-point
Receiver.java
package stephansun.github.samples.jms.plain.pointtopoint;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class Receiver {

    public static void main(String[] args) {
        // Obtain the connection factory
        ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // javax.jms.Connection
        Connection conn = null;
        // javax.jms.Session
        Session session = null;

        try {
            // Create the connection
            conn = cf.createConnection();
            // Create the session
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Choose the destination
            Destination destination = new ActiveMQQueue("myQueue");
            // Create the consumer and start message delivery
            MessageConsumer consumer = session.createConsumer(destination);
            conn.start();
            // Receive a message (blocks until one arrives)
            Message message = consumer.receive();
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Got a message: " + textMessage.getText());
        } catch (JMSException e) {
            // Handle the exception
            e.printStackTrace();
        } finally {
            try {
                // Release resources
                if (session != null) {
                    session.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }
}
Sender.java
package stephansun.github.samples.jms.plain.pointtopoint;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class Sender {

    public static void main(String[] args) {
        // Obtain the connection factory
        ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // javax.jms.Connection
        Connection conn = null;
        // javax.jms.Session
        Session session = null;

        try {
            // Create the connection
            conn = cf.createConnection();
            // Create the session
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the queue destination
            Destination destination = new ActiveMQQueue("myQueue");
            // Build and send the message
            MessageProducer producer = session.createProducer(destination);
            TextMessage message = session.createTextMessage();
            message.setText("Hello World!");
            producer.send(message);
        } catch (JMSException e) {
            // Handle the exception
            e.printStackTrace();
        } finally {
            try {
                // Release resources
                if (session != null) {
                    session.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }
}
publish-subscribe
Receiver1.java
package stephansun.github.samples.jms.plain.pubsub;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

public class Receiver1 {

    public static void main(String[] args) {
        // Obtain the connection factory
        ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // javax.jms.Connection
        Connection conn = null;
        // javax.jms.Session
        Session session = null;

        try {
            // Create the connection
            conn = cf.createConnection();
            // Create the session
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Choose the destination
            Destination destination = new ActiveMQTopic("myTopic");
            // Create the consumer and start message delivery
            MessageConsumer consumer = session.createConsumer(destination);
            conn.start();
            // Receive a message (blocks until one arrives)
            Message message = consumer.receive();
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Receiver 1 got a message: " + textMessage.getText());
        } catch (JMSException e) {
            // Handle the exception
            e.printStackTrace();
        } finally {
            try {
                // Release resources
                if (session != null) {
                    session.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }
}
Sender.java
package stephansun.github.samples.jms.plain.pubsub;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

public class Sender {

    public static void main(String[] args) {
        // Obtain the connection factory
        ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // javax.jms.Connection
        Connection conn = null;
        // javax.jms.Session
        Session session = null;

        try {
            // Create the connection
            conn = cf.createConnection();
            // Create the session
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the topic destination
            Destination destination = new ActiveMQTopic("myTopic");
            // Build and send the message
            MessageProducer producer = session.createProducer(destination);
            TextMessage message = session.createTextMessage();
            message.setText("Hello World!");
            producer.send(message);
        } catch (JMSException e) {
            // Handle the exception
            e.printStackTrace();
        } finally {
            try {
                // Release resources
                if (session != null) {
                    session.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }
}
samples-jms-spring
pom.xml
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.3.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>3.1.0.RELEASE</version> </dependency> </dependencies>
point-to-point
Receiver.java
package stephansun.github.samples.jms.spring.pointtopoint; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Queue; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; public class Receiver { public static void main(String[] args) throws JMSException { ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "stephansun/github/samples/jms/spring/pointtopoint/jms-point-to-point.xml"); Queue myQueue = (Queue) applicationContext.getBean("myQueue"); JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate"); MapMessage message = (MapMessage) jmsTemplate.receive(myQueue); String name = message.getString("name"); int age = message.getInt("age"); System.out.println("name:" + name); System.out.println("age:" + age); } }
Sender.java
package stephansun.github.samples.jms.spring.pointtopoint;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class Sender {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                "stephansun/github/samples/jms/spring/pointtopoint/jms-point-to-point.xml");

        Queue myQueue = (Queue) applicationContext.getBean("myQueue");
        JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate");

        jmsTemplate.send(myQueue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                MapMessage message = session.createMapMessage();
                message.setString("name", "stephan");
                message.setInt("age", 26);
                return message;
            }
        });
    }
}
jms-point-to-point.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"/>

    <bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="myQueue"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

</beans>
publish-subscribe
Receiver1.java
package stephansun.github.samples.jms.spring.pubsub;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Topic;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

public class Receiver1 {

    public static void main(String[] args) throws JMSException {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                "stephansun/github/samples/jms/spring/pubsub/jms-pub-sub.xml");

        Topic myTopic = (Topic) applicationContext.getBean("myTopic");
        JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate");

        MapMessage message = (MapMessage) jmsTemplate.receive(myTopic);

        String name = message.getString("name");
        int age = message.getInt("age");

        System.out.println("name:" + name);
        System.out.println("age:" + age);
    }
}
Sender.java
package stephansun.github.samples.jms.spring.pubsub;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class Sender {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                "stephansun/github/samples/jms/spring/pubsub/jms-pub-sub.xml");

        Topic myTopic = (Topic) applicationContext.getBean("myTopic");
        JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate");

        jmsTemplate.send(myTopic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                MapMessage message = session.createMapMessage();
                message.setString("name", "stephan");
                message.setInt("age", 26);
                return message;
            }
        });
    }
}
jms-pub-sub.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"/>

    <bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="myTopic"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

</beans>
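One practical consequence of the publish-subscribe sample above: a non-durable topic subscriber only sees messages published while it is subscribed, so Receiver1 has to be waiting in receive() before Sender runs. The following is a minimal in-memory sketch of that timing rule. It does not use the JMS API, and the Topic class and its subscribe and publish methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of a topic with non-durable subscriptions only.
// Not the JMS API: every name in this class is hypothetical.
class TopicTimingSketch {

    static class Topic {
        private final List<List<String>> inboxes = new ArrayList<>();

        /** Registers a subscriber and returns the inbox it will read from. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        /** Copies the message to every subscriber registered at this moment. */
        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        Topic myTopic = new Topic();
        List<String> early = myTopic.subscribe();  // subscribed before the send
        myTopic.publish("Hello World!");
        List<String> late = myTopic.subscribe();   // subscribed after the send

        System.out.println("early: " + early);  // early: [Hello World!]
        System.out.println("late: " + late);    // late: []
    }
}
```

With a queue the order would not matter, because the broker holds the message until a consumer takes it.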
samples-jms-spring-remoting
pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>3.1.0.RELEASE</version>
    </dependency>
</dependencies>
Main.java
package stephansun.github.samples.jms.spring.remoting;

import java.util.HashMap;
import java.util.Map;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(new String[] {
                "stephansun/github/samples/jms/spring/remoting/jms-remoting.xml",
                "stephansun/github/samples/jms/spring/remoting/jms-remoting-sender.xml",
                "stephansun/github/samples/jms/spring/remoting/jms-remoting-receiver.xml"
        });

        MyService sender = (MyService) applicationContext.getBean("sender");

        sender.sayHello();

        Map<String, Object> param = new HashMap<String, Object>();
        param.put("name", "stephan");
        param.put("age", 26);
        String str = sender.foo(param);
        System.out.println("str:" + str);
    }
}
MyService.java
package stephansun.github.samples.jms.spring.remoting;

import java.util.Map;

public interface MyService {

    void sayHello();

    String foo(Map<String, Object> param);
}
MyServiceImpl.java
package stephansun.github.samples.jms.spring.remoting;

import java.util.Map;

public class MyServiceImpl implements MyService {

    @Override
    public void sayHello() {
        System.out.println("hello world!");
    }

    @Override
    public String foo(Map<String, Object> param) {
        return param.toString();
    }
}
jms-remoting-receiver.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <bean id="myService" class="stephansun.github.samples.jms.spring.remoting.MyServiceImpl"/>

    <bean id="receiver" class="org.springframework.jms.remoting.JmsInvokerServiceExporter">
        <property name="serviceInterface" value="stephansun.github.samples.jms.spring.remoting.MyService"/>
        <property name="service" ref="myService"/>
    </bean>

    <bean id="container" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="messageListener" ref="receiver"/>
        <property name="destination" ref="myQueue"/>
    </bean>

</beans>
jms-remoting-sender.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <bean id="sender" class="org.springframework.jms.remoting.JmsInvokerProxyFactoryBean">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="queue" ref="myQueue"/>
        <property name="serviceInterface" value="stephansun.github.samples.jms.spring.remoting.MyService"/>
        <property name="receiveTimeout" value="5000"/>
    </bean>

</beans>
jms-remoting.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"/>

    <bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="myQueue"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

</beans>
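The three configuration files above wire a sender proxy and a receiving exporter to the same queue, so that a call on the MyService interface becomes a request message and its return value comes back as a reply. The sketch below models that request/response round trip with plain JDK classes only. It is a simplified illustration, not Spring's implementation: the Request and Reply classes, the startReceiver and createSender methods, and the in-memory BlockingQueue standing in for the broker are all assumptions made for this example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model of request/response remoting over a queue.
// Every name below except MyService and MyServiceImpl is hypothetical.
class RemotingSketch {

    interface MyService {
        void sayHello();
        String foo(Map<String, Object> param);
    }

    static class MyServiceImpl implements MyService {
        @Override public void sayHello() { System.out.println("hello world!"); }
        @Override public String foo(Map<String, Object> param) { return param.toString(); }
    }

    /** Request message: which method to call, its arguments, and where to reply. */
    static class Request {
        final String methodName;
        final Class<?>[] paramTypes;
        final Object[] args;
        final BlockingQueue<Reply> replyTo = new LinkedBlockingQueue<>();

        Request(Method method, Object[] args) {
            this.methodName = method.getName();
            this.paramTypes = method.getParameterTypes();
            this.args = args;
        }
    }

    /** Reply wrapper, needed because a void call produces a null result. */
    static class Reply {
        final Object value;
        Reply(Object value) { this.value = value; }
    }

    /** Receiver side: take requests off the queue and invoke the real service. */
    static void startReceiver(BlockingQueue<Request> queue, Object service, Class<?> serviceInterface) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Request request = queue.take();
                    Method method = serviceInterface.getMethod(request.methodName, request.paramTypes);
                    request.replyTo.put(new Reply(method.invoke(service, request.args)));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        });
        worker.setDaemon(true);  // let the JVM exit when main returns
        worker.start();
    }

    /** Sender side: a dynamic proxy that turns each call into a request and waits for the reply. */
    static <T> T createSender(BlockingQueue<Request> queue, Class<T> serviceInterface, long timeoutMillis) {
        InvocationHandler handler = (proxy, method, args) -> {
            Request request = new Request(method, args);
            queue.put(request);
            Reply reply = request.replyTo.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (reply == null) {
                throw new IllegalStateException("no reply within " + timeoutMillis + " ms");
            }
            return reply.value;
        };
        return serviceInterface.cast(Proxy.newProxyInstance(
                serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface }, handler));
    }

    public static void main(String[] args) {
        BlockingQueue<Request> myQueue = new LinkedBlockingQueue<>();
        startReceiver(myQueue, new MyServiceImpl(), MyService.class);
        MyService sender = createSender(myQueue, MyService.class, 5000);

        sender.sayHello();  // prints "hello world!" on the receiver thread
        Map<String, Object> param = new HashMap<>();
        param.put("name", "stephan");
        System.out.println("str:" + sender.foo(param));  // str:{name=stephan}
    }
}
```

The 5000 ms timeout plays the role of the receiveTimeout property in jms-remoting-sender.xml: the caller gives up if no reply arrives in time. In the real samples the request travels through the broker as a JMS message rather than through an in-process queue.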
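JMS and AMQP differ considerably. JMS defines two kinds of destination: a queue for point-to-point messaging and a topic for publish/subscribe. The sender delivers a message directly to the destination, and receivers consume from it. With a queue, each message is handed to a single consumer; with a topic, every subscriber receives its own copy of the same message. In AMQP, by contrast, a producer publishes to an exchange, which routes the message to queues according to their bindings.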
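The queue and topic delivery styles described above can be contrasted with a small in-memory model. This is only a sketch of the semantics and does not use the JMS API: the DeliveryModel class and its methods are hypothetical, and round-robin assignment is an assumption standing in for "whichever consumer takes the message first".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory model of the two JMS delivery styles.
// Not the JMS API: every name in this class is hypothetical.
class DeliveryModel {

    /** Point-to-point: each message goes to exactly one consumer. */
    static List<List<String>> deliverToQueue(List<String> messages, int consumers) {
        List<List<String>> received = newInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            // Round-robin stands in for competing consumers.
            received.get(i % consumers).add(messages.get(i));
        }
        return received;
    }

    /** Publish/subscribe: every subscriber gets its own copy of each message. */
    static List<List<String>> deliverToTopic(List<String> messages, int subscribers) {
        List<List<String>> received = newInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : received) {
                inbox.add(message);
            }
        }
        return received;
    }

    private static List<List<String>> newInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2");
        System.out.println("queue: " + deliverToQueue(messages, 2));  // queue: [[m1], [m2]]
        System.out.println("topic: " + deliverToTopic(messages, 2));  // topic: [[m1, m2], [m1, m2]]
    }
}
```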