`
QING____
  • 浏览: 2253408 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

JMS中点对点消息

    博客分类:
  • JMS
 
阅读更多

    点对点消息传送模型允许JMS客户端通过队列(Queue)这个虚拟通道来同步和异步发送、接收消息。消息的生产者成为QueueSender,消费者为QueueReceiver。点对点模型是一个基于拉取(pull,即receive方式)或者基于轮询(polling,即异步接收)的消息传输模型,这种模型从队列中请求消息,而不是JMS Provider将消息自动的推送给客户端。发送到队列的任何一条消息,将会被一个而且仅仅一个消费者接收,即使有多个消费者侦听同一个队列。

    点对点消息传送模型,即支持异步“即发即弃”,又支持同步的“请求--应答”的消息发送方式。(稍后代码实例)

    点对点模型支持负载均衡,即多个消息消费者侦听同一个队列时,JMS Provider负责将这些消息以"某种策略"均衡的分发给它们.事实上点对点模型的负载均衡很容易实现,因为消息消费者只有处于"空闲时"才会向JMS Provider"索取"消息.对于Queue,还提供了使用Browser的方式查看队列消息而不消费它们.

    点对点消息模型,对于JMS Provider而言,就是一个队列,JMS Provider将会对消息按照其发送到队列的顺序排序.然后依次发送给消费者(权重优先),一旦消息被消费者接收(确认),消息将会从队列中移除.

    不过需要特殊强调一点:JMS Prodiver并不是严格的保证队列中消息移除的顺序,特别是在多个消费者Client时;当多个消费者同时侦听消息队列,JMS Prodiver将会把队列中的消息,依次转发给各个Client,此时如果消息被接收且确认,消息将会从队列中移除,此消息有可能并不是队列的第一个元素;当一个消息未能确认时,JMS Provider会将此消息不断的重发,直到重发次数达到阀值;一条消息不能正确接收,这种情况不会影响到队列中其他消息的正常消费;而且由于网络的问题,优先发送的消息,可能在消费者接收的时间上滞后.由此可见,依靠Queue来完全实现消息的队列化消费是错误的.(理想的情况是,一个消息接收确认之后,队列中此后的消息才会被发送给消费者).

    你可以简单的认为P2P消息模型的存储机制为:

+++++++++++++++
++MessageId   |    Created               |    Priority++
++100                 100000000000               3
++101                 100000002000               3
++102                 100000004000               2

    对于消息的发送顺序为"order by priority des,created asc",一旦发送成功,将会delete.

 

    点对点消息传送模型有两种:

    1) 即发即弃: 一种异步的消息发送和确认机制,Producer发送消息之后,它无需等待此消息被消费的"响应".

    2) 请求-应答: 一种同步机制,Producer发送消息之后,它阻塞,直到此消息被消费;当消息被消费时,将会向一个"响应队列"中发送一个"通知",那么对于Producer而言,就是接收到这个"通知后"才会返回.

###contextFactory
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
###brokerUrl,any protocol
java.naming.provider.url = tcp://localhost:61616
##username
##java.naming.security.principal=
##password
##java.naming.security.credentials=
##connectionFactory,for building sessions
connectionFactoryNames = QueueCF,TopicCF
##topic.<topicName> = <physicalName-of-topic>
##your application should use <topicName>,such as:
## context.lookup("topic1");
##It can be more than once
topic.topic1 = jms.topic1
##queue.<topicName> = <physicalName-of-queue>
queue.queue1 = jms.queue1

 

///请求应答模式:生产者
package com.test.jms.simple;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

public class SimpleProductor {

	private MessageProducer producer;
	private Session session;
	private Connection connection;
	private boolean isOpen = true;
	private Destination replyTo;
	
	public SimpleProductor() throws Exception{
		Context context = new InitialContext();
		ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
		connection = connectionFactory.createConnection();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination queue = (Queue)context.lookup("queue1");
		replyTo = (Queue)context.lookup("test-repyto");
		producer = session.createProducer(queue);
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		connection.start();
		
	}
	
	
	public boolean send(String text) {
		if(!isOpen){
			throw new RuntimeException("session has been closed!");
		}
		try{
			Message message = session.createTextMessage(text);
			String cid = "ID:" + System.currentTimeMillis();
			message.setJMSCorrelationID(cid);
			producer.send(message);
			MessageConsumer consumer = session.createConsumer(replyTo,"JMSCorrelationID='" + cid + "'");
			//最多重发5次
			for(int i=0;i< 5;i++){
				Message replyMessage = consumer.receive(30000);
				if(replyMessage != null){
					System.out.println("Reply success:" + replyMessage.getJMSCorrelationID());
					break;
				}
			}
			consumer.close();
			return true;
		}catch(Exception e){
			return false;
		}
	}
	
	public synchronized void close(){
		try{
			if(isOpen){
				isOpen = false;
			}
			session.close();
			connection.close();
		}catch (Exception e) {
			//
		}
	}
	
}

 

//请求应答模式:消费者
package com.test.jms.simple;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

import com.test.jms.object.QueueMessageListener;

public class SimpleConsumer {

	private Connection connection;
	private Session session;
	private MessageConsumer consumer;
	
	private boolean isStarted;
	
	public SimpleConsumer() throws Exception{
		Context context = new InitialContext();
		ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
		connection = connectionFactory.createConnection();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination queue = (Queue)context.lookup("queue1");
		consumer = session.createConsumer(queue);
		consumer.setMessageListener(new QueueMessageListener(session));
		connection.start();
	}
	
	
	public synchronized boolean start(){
		if(isStarted){
			return true;
		}
		try{
			connection.start();
			isStarted = true;
			return true;
		}catch(Exception e){
			return false;
		}
	}
	
	public synchronized void close(){
		isStarted = false;
		try{
			session.close();
			connection.close();
		}catch(Exception e){
			//
		}
	}
}

 

///请求应答模式:消息侦听器
package com.test.jms.object;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class QueueMessageListener implements MessageListener{

	private Session session;
	public QueueMessageListener(Session session){
		this.session = session;
	}
	public void onMessage(Message message) {
		if(message == null){
			return;
		}
		try{
			Destination replyTo = message.getJMSReplyTo();
			if(message instanceof TextMessage){
				String text = ((TextMessage)message).getText();
				System.out.println(text);
			}
			if(replyTo != null){
				message.clearBody();
				MessageProducer producer = session.createProducer(replyTo);
				producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
				producer.setTimeToLive(50000);
				producer.send(message);
				producer.close();
			}
			
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

 

 

 

分享到:
评论

相关推荐

    weblogic中使用JMS发送和接受消息

    通过理解并熟练掌握上述内容,你将在WebLogic环境中成功地利用JMS进行消息传递,无论是简单的点对点通信还是复杂的发布/订阅模式,都能游刃有余。请务必根据具体需求进行配置,并确保测试环节充分,以确保JMS服务的...

    JMS 教程 - 消息队列、消息服务

    JMS支持两种主要的消息传递模型:发布/订阅(Publish and Subscribe)和点对点(Point-to-Point)。这两种模型分别对应于不同的应用场景。 **发布/订阅(Publish and Subscribe)** 模型适用于一对多的通信场景。在...

    JMS之ActiveMQ 点对点+发布/订阅

    在JMS中,消息传递有两种基本模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。 **点对点(Point-to-Point,P2P)模型** 点对点模型是基于队列(Queue)的通信方式。在这个模型中,...

    JMS简明教程创建消息 -> 发送消息 -> 接收消息 -> 读取消息

    什么是消息 消息是一个用于在组件和应用程序之间通讯的的方法。...由于消息是点对点的,所以 JMS 的所有用户都称为客户端(clients)。JMS 应用由定义 消息的应用和一系列与他们交互的客户端组成。

    JMS消息队列机制及案例

    4. **队列(Queue)**:队列是一种点对点的通信模型。每个消息仅被一个消费者接收,确保消息的顺序传递和至少一次传递(可能因重复消费而超过一次)。 5. **主题(Topic)**:主题支持发布/订阅模式,一个消息可以...

    JMS消息发送及订阅

    4. **消息队列(Queue)**: 一种点对点模型,一个消息由一个消费者接收,一旦被消费,消息就从队列中移除。 5. **主题(Topic)**: 发布/订阅模型,多个订阅者可以接收同一个消息。 **JMS消息类型** 1. **点对点...

    JMS消息模型 JMS学习.doc

    3. **JMS Domains**:JMS有两种主要的消息模型,即点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)模式。 - **Point-to-Point (PTP)模式**:在这种模型中,消息从一个生产者发送到一个...

    Spring发送接收JMS消息

    JMS提供两种类型的消息模型:点对点(Point-to-Point, P2P)和发布/订阅(Publish/Subscribe)。在点对点模型中,消息由一个生产者发送到一个队列,然后由一个或多个消费者接收。发布/订阅模型中,消息发布到一个...

    jms远程IBM MQ 收发消息

    JMS提供两种主要的消息模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe)。在点对点模型中,消息从一个生产者发送到一个队列,然后由一个或多个消费者接收。发布/订阅模型下,消息发布到一个...

    jms对获取消息

    JMS定义了两种主要的消息模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。 1. **点对点模型**:在这种模型中,消息从一个生产者发送到一个队列,然后由一个或多个消费者从队列中...

    Spring+weblogic接收JMS消息

    标题"Spring+weblogic接收JMS消息"涉及到的核心知识点是Spring框架与WebLogic Server之间的整合,以及如何利用Spring来处理JMS消息。下面将详细讲解这两个方面的内容。 1. **Spring对JMS的支持**: - Spring通过`...

    消息中间件和JMS消息服务.pdf

    JMS支持两种基本的消息模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。这两种模型分别对应于不同的应用场景和需求。 1. **点对点模型**:在此模型中,消息被发送到特定的队列,每个...

    JMS消息队列

    JMS-java message Service ,消息队列原理介绍,适合activeMQ开发使用

    OSB中JMS配置及队列使用说明

    JMS 提供了两种类型的消息模型:点对点模型和发布/订阅模型。点对点模型中,生产者将消息发送到队列中,消费者从队列中接收消息。发布/订阅模型中,生产者将消息发布到主题中,订阅者从主题中接收消息。 JMS 配置 ...

    Jms基础知识整理创建消息 -> 发送消息 -> 接收消息 -> 读取消息 ()

    消息传递是点对点(Point-to-Point, PTP)或发布/订阅(Publish/Subscribe, Pub/Sub)两种模型之一。在PTP模型中,消息从一个生产者(发送者)发送到一个队列,然后由一个或多个消费者(接收者)从队列中读取并处理...

    JMS的中文教程(Java的消息驱动)

    ### JMS的中文教程(Java的消息驱动)知识点详解 #### 一、JMS简介与重要性 **JMS**(Java Message Service)是一种消息传递中间件的API标准,它定义了一套标准接口,允许应用程序创建、发送、接收和读取消息。JMS的...

    利用soapUI3.5测试JMS消息

    JMS支持两种主要的消息模型:点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)。理解这两种模型是进行JMS测试的基础。 点对点模型基于队列,每个消息只有一个消费者,消息被发送到队列后,消费者从队列中...

    Spring+weblogic9.2发送JMS消息

    同时,理解JMS的核心概念,如消息模型(点对点、发布/订阅)、消息类型(文本、对象、文件等)以及事务管理也是非常重要的。 通过这种方式,Spring和WebLogic 9.2的集成使得应用能够利用JMS进行高效、可靠的通信,...

    点对点消息模型示例

    点对点消息模型是Java消息服务(Java Message Service,简称JMS)中的一种核心消息传递模式,它在分布式系统中广泛用于实现异步通信。在这个模型中,消息生产者发送消息到一个特定的队列(Queue),而消息消费者从这...

    jms消息通讯

    JMS提供两种类型的消息模型:点对点(Point-to-Point,PTP)和发布/订阅(Publish/Subscribe,Pub/Sub)。在点对点模型中,消息由一个生产者发送到一个队列,然后由一个消费者接收。队列中的消息被消费后即被删除,...

Global site tag (gtag.js) - Google Analytics