1、消息发送
- //连接工厂
- ConnectionFactory connFactory = new ActiveMQConnectionFactory(
- ActiveMQConnection.DEFAULT_USER,
- ActiveMQConnection.DEFAULT_PASSWORD,
- "tcp://localhost:61616");
- //连接到JMS提供者
- Connection conn = connFactory.createConnection();
- conn.start();
- //事务性会话,自动确认消息
- Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
- //消息的目的地
- Destination destination = session.createQueue("queue.hello");
- //消息生产者
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化
- //文本消息
- TextMessage textMessage = session.createTextMessage("文本消息");
- producer.send(textMessage);
- //键值对消息
- MapMessage mapMessage = session.createMapMessage();
- mapMessage.setLong("age", new Long(32));
- mapMessage.setDouble("sarray", new Double(5867.15));
- mapMessage.setString("username", "键值对消息");
- producer.send(mapMessage);
- //流消息
- StreamMessage streamMessage = session.createStreamMessage();
- streamMessage.writeString("streamMessage流消息");
- streamMessage.writeLong(55);
- producer.send(streamMessage);
- //字节消息
- String s = "BytesMessage字节消息";
- BytesMessage bytesMessage = session.createBytesMessage();
- bytesMessage.writeBytes(s.getBytes());
- producer.send(bytesMessage);
- //对象消息
- User user = new User("cjm", "对象消息"); //User对象必须实现Serializable接口
- ObjectMessage objectMessage = session.createObjectMessage();
- objectMessage.setObject(user);
- producer.send(objectMessage);
- session.commit(); //在事务性会话中,只有commit之后,消息才会真正到达目的地
- producer.close();
- session.close();
- conn.close();
- //连接工厂
- ConnectionFactory connFactory = new ActiveMQConnectionFactory(
- ActiveMQConnection.DEFAULT_USER,
- ActiveMQConnection.DEFAULT_PASSWORD,
- "tcp://localhost:61616");
- //连接到JMS提供者
- Connection conn = connFactory.createConnection();
- conn.start();
- //事务性会话,自动确认消息
- Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
- //消息的目的地
- Destination destination = session.createQueue("queue.hello");
- //消息生产者
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化
- //文本消息
- TextMessage textMessage = session.createTextMessage("文本消息");
- producer.send(textMessage);
- //键值对消息
- MapMessage mapMessage = session.createMapMessage();
- mapMessage.setLong("age", new Long(32));
- mapMessage.setDouble("sarray", new Double(5867.15));
- mapMessage.setString("username", "键值对消息");
- producer.send(mapMessage);
- //流消息
- StreamMessage streamMessage = session.createStreamMessage();
- streamMessage.writeString("streamMessage流消息");
- streamMessage.writeLong(55);
- producer.send(streamMessage);
- //字节消息
- String s = "BytesMessage字节消息";
- BytesMessage bytesMessage = session.createBytesMessage();
- bytesMessage.writeBytes(s.getBytes());
- producer.send(bytesMessage);
- //对象消息
- User user = new User("cjm", "对象消息"); //User对象必须实现Serializable接口
- ObjectMessage objectMessage = session.createObjectMessage();
- objectMessage.setObject(user);
- producer.send(objectMessage);
- session.commit(); //在事务性会话中,只有commit之后,消息才会真正到达目的地
- producer.close();
- session.close();
- conn.close();
2、消息接收:通过消息监听器的方式接收消息
- public class Receiver implements MessageListener{
- private boolean stop = false;
- public void execute() throws Exception {
- //连接工厂
- ConnectionFactory connFactory = new ActiveMQConnectionFactory(
- ActiveMQConnection.DEFAULT_USER,
- ActiveMQConnection.DEFAULT_PASSWORD,
- "tcp://localhost:61616");
- //连接到JMS提供者
- Connection conn = connFactory.createConnection();
- conn.start();
- //事务性会话,自动确认消息
- Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
- //消息的来源地
- Destination destination = session.createQueue("queue.hello");
- //消息消费者
- MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(this);
- //等待接收消息
- while(!stop){
- Thread.sleep(5000);
- }
- session.commit();
- consumer.close();
- session.close();
- conn.close();
- }
- public void onMessage(Message m) {
- try{
- if(m instanceof TextMessage){ //接收文本消息
- TextMessage message = (TextMessage)m;
- System.out.println(message.getText());
- }else if(m instanceof MapMessage){ //接收键值对消息
- MapMessage message = (MapMessage)m;
- System.out.println(message.getLong("age"));
- System.out.println(message.getDouble("sarray"));
- System.out.println(message.getString("username"));
- }else if(m instanceof StreamMessage){ //接收流消息
- StreamMessage message = (StreamMessage)m;
- System.out.println(message.readString());
- System.out.println(message.readLong());
- }else if(m instanceof BytesMessage){ //接收字节消息
- byte[] b = new byte[1024];
- int len = -1;
- BytesMessage message = (BytesMessage)m;
- while((len=message.readBytes(b))!=-1){
- System.out.println(new String(b, 0, len));
- }
- }else if(m instanceof ObjectMessage){ //接收对象消息
- ObjectMessage message = (ObjectMessage)m;
- User user = (User)message.getObject();
- System.out.println(user.getUsername() + " _ " + user.getPassword());
- }else{
- System.out.println(m);
- }
- stop = true;
- }catch(JMSException e){
- stop = true;
- e.printStackTrace();
- }
- }
- }
- public class Receiver implements MessageListener{
- private boolean stop = false;
- public void execute() throws Exception {
- //连接工厂
- ConnectionFactory connFactory = new ActiveMQConnectionFactory(
- ActiveMQConnection.DEFAULT_USER,
- ActiveMQConnection.DEFAULT_PASSWORD,
- "tcp://localhost:61616");
- //连接到JMS提供者
- Connection conn = connFactory.createConnection();
- conn.start();
- //事务性会话,自动确认消息
- Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
- //消息的来源地
- Destination destination = session.createQueue("queue.hello");
- //消息消费者
- MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(this);
- //等待接收消息
- while(!stop){
- Thread.sleep(5000);
- }
- session.commit();
- consumer.close();
- session.close();
- conn.close();
- }
- public void onMessage(Message m) {
- try{
- if(m instanceof TextMessage){ //接收文本消息
- TextMessage message = (TextMessage)m;
- System.out.println(message.getText());
- }else if(m instanceof MapMessage){ //接收键值对消息
- MapMessage message = (MapMessage)m;
- System.out.println(message.getLong("age"));
- System.out.println(message.getDouble("sarray"));
- System.out.println(message.getString("username"));
- }else if(m instanceof StreamMessage){ //接收流消息
- StreamMessage message = (StreamMessage)m;
- System.out.println(message.readString());
- System.out.println(message.readLong());
- }else if(m instanceof BytesMessage){ //接收字节消息
- byte[] b = new byte[1024];
- int len = -1;
- BytesMessage message = (BytesMessage)m;
- while((len=message.readBytes(b))!=-1){
- System.out.println(new String(b, 0, len));
- }
- }else if(m instanceof ObjectMessage){ //接收对象消息
- ObjectMessage message = (ObjectMessage)m;
- User user = (User)message.getObject();
- System.out.println(user.getUsername() + " _ " + user.getPassword());
- }else{
- System.out.println(m);
- }
- stop = true;
- }catch(JMSException e){
- stop = true;
- e.printStackTrace();
- }
- }
- }
相关推荐
在Java世界中,Java Message Service (JMS) 是一个标准接口,用于在分布式环境中发送和接收消息。Spring框架提供了一种简单而强大的方式来集成JMS,使得开发者可以轻松地在应用中实现异步通信和解耦。本篇文章将深入...
在这个"spring-jms使用queue发送消息简单例子"中,我们将深入探讨如何使用Spring JMS与ActiveMQ结合,通过队列(Queue)来发送和接收消息。 首先,`pom.xml`文件是Maven项目的配置文件,它包含了项目所依赖的库。...
在我们的例子中,我们将使用Spring与ActiveMQ的集成,以便在Web应用中发送和接收消息。 集成步骤如下: 1. **配置ActiveMQ服务器**:首先,你需要在本地或远程服务器上安装并运行ActiveMQ。下载并解压ActiveMQ,...
JMSTemplate提供了一种简单的方式来发送JMS消息,而MQTT客户端API则用于发送MQTT消息。 4. 定义消息消费者:创建一个监听特定主题的MessageListener。当有消息到达时,这个监听器会被触发并处理消息。 5. 编写消息...
Java消息服务(Java Message Service,...通过创建消息生产者和消费者,设置消息队列或主题,以及发送和接收不同类型的JMS消息,开发者能够掌握JMS的核心功能,并将其应用于实际项目中,提升系统的可扩展性和容错性。
JMS是一种标准的API,允许Java应用程序创建、发送、接收和读取消息。它提供了异步通信的能力,使得应用程序可以解耦消息生产者和消费者,提高系统的可扩展性和可靠性。JMS支持两种消息模型:点对点(Point-to-Point,...
首先,Spring框架通过其`JmsTemplate`类简化了JMS的使用,提供了一种模板方法来发送和接收消息。开发者可以配置JMS连接工厂,指定目的地(队列或主题),并使用`convertAndSend`或`receive`等方法进行操作。 其次,...
这些代码会演示如何正确地初始化JMS环境,创建连接和会话,定义消息属性,发送和接收消息。通过分析和运行这些代码,我们可以更好地理解JMS的工作流程,包括同步和异步消息处理、事务支持、持久化消息以及消息选择器...
JMS允许应用程序创建、发送、接收和读取消息,以此来解耦生产者和消费者,使得两者不必同时在线也能进行通信。在基于WebLogic的环境中,JMS被广泛应用于构建可扩展、高可用性的应用程序。 WebLogic Server是Oracle...
ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递标准(JMS,Java Message Service),用于在分布式系统中实现可靠的消息传递。在本示例中,我们将探讨如何利用Ajax技术与ActiveMQ进行交互...
JMS的基本操作是发送和接收消息。在这些实例中,我们将看到如何使用生产者(Producer)创建消息并发送到队列(Queue)或主题(Topic)。消费者(Consumer)则从这些队列或主题中接收消息。理解如何创建消息对象,...
7. **事务(Transactions)**:JMS允许在发送和接收消息时使用JTA(Java Transaction API),确保消息操作的原子性。 在J2EE环境中,JMS通常与EJB(Enterprise JavaBeans)结合使用,尤其是Message-Driven Bean...
这个"JMS开发例子.pdf"文件可能包含了如何配置JMS,创建生产者和消费者,发送和接收不同类型的JMS消息,以及如何处理异常和事务等内容。通过阅读和实践这个例子,开发者能够更好地理解JMS的工作原理,并在实际项目中...
Java消息服务(JMS)是一种标准的Java API,用于在应用程序之间发送消息。它提供了一种异步通信机制,允许程序通过消息中间件发送和接收消息。JMS支持两种消息模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/...
Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用集成的标准化API,它允许应用程序创建、发送、接收和读取消息。JMS提供了一种在分布式环境中可靠地传递信息的方式,使得不同的应用程序...
Spring框架提供了强大的支持来简化JMS的使用,如`JmsTemplate`类用于发送消息,以及`MessageListener`接口用于接收消息。在`spring-jms.xml`中,你可以配置`JmsTemplate`的bean,设置连接工厂、目的地等属性。同时,...
在IT行业中,Java消息服务(Java Message Service,简称JMS)是一种标准,它定义了应用程序如何创建、发送、接收和读取消息的标准API。ActiveMQ是Apache软件基金会开发的一个开源JMS提供者,它允许开发者在分布式...
1. **JMS基础**:JMS允许应用程序创建、发送、接收和阅读消息。它定义了两种消息模型:点对点(Queue)和发布/订阅(Topic)。在点对点模型中,每个消息只被一个消费者接收;而在发布/订阅模型中,消息可以被多个...