`
liyixing1
  • 浏览: 959020 次
  • 性别: Icon_minigender_1
  • 来自: 江西上饶
社区版块
存档分类
最新评论

jms-点对点

    博客分类:
  • jms
阅读更多
在点对点模式中,消息创建者称为发送者,消息消费者称为接收者。
特点
1.通过一个queque(队列)的通道传递。
2.队列可以被多个消费者申请监听,但是只有一个获取消息,获取后,消息会从队列去除。
3.消息是有顺序的(先进先出),但是设置优先级的除外。
4.消费者和发送者的无偶性,两者之间的先后运行顺序没有关系。

在监听模式(实现onMessage)是异步的,而调用receive方法则是同步,会占用本线程的资源。

General API Point-to-point API
ConnectionFactory QueueConnectionFactory
Destination Queue
Connection QueueConnection
Session QueueSession
MessageConsumer QueueSender
MessageProducer QueueReceiver


以下例子(来源Java Message Service_2nd),是一个关于借贷的例子
package lyx.money;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class QBorrower {
	private QueueConnection qConnect = null;
	private QueueSession qSession = null;
	private Queue responseQ = null;
	private Queue requestQ = null;

//这里也是初始化
	public QBorrower(String queuecf, String requestQueue, String responseQueue) {
		try {
			// Connect to the provider and get the JMS connection
			Context ctx = new InitialContext();
			QueueConnectionFactory qFactory = (QueueConnectionFactory) ctx
					.lookup(queuecf);
			qConnect = qFactory.createQueueConnection();
			// Create the JMS Session
			qSession = qConnect.createQueueSession(false,
					Session.AUTO_ACKNOWLEDGE);
			// Lookup the request and response queues
			requestQ = (Queue) ctx.lookup(requestQueue);
			responseQ = (Queue) ctx.lookup(responseQueue);
			// Now that setup is complete, start the Connection
//如果只是发送消息,而不需要接收消息,这部其实可以省略的,但是这种情况很少。
			qConnect.start();
		} catch (JMSException jmse) {
			jmse.printStackTrace();
			System.exit(1);
		} catch (NamingException jne) {
			jne.printStackTrace();
			System.exit(1);
		}
	}

	private void sendLoanRequest(double salary, double loanAmt) {
		try {
			// Create JMS message
			MapMessage msg = qSession.createMapMessage();
			msg.setDouble("Salary", salary);
			msg.setDouble("LoanAmount", loanAmt);
//这里有设置,当接收者要回复的时候,需要给哪个目标回复。这里使用的是另外一个通道,是因为接收者在接收的时候,没有进行过滤,会接收任何消息,使用另外一个通道是防止回复信息被其他接收者给接收掉。
			msg.setJMSReplyTo(responseQ);
			// Create the sender and send the message
//关于这里为何发送者和接收者都使用同一个session,而不是不同的session,其实可以这个例子的想象业务是发送一个贷款请求,系统需要等待有人处理申请,并等待处理结果,因此可以认为是同步的业务,这种情况使用同步会比异步更加合理。
			QueueSender qSender = qSession.createSender(requestQ);
			qSender.send(msg);
			// Wait to see if the loan request was accepted or declined
//这里有一个过滤器,这种过滤器是为了在获取贷款处理结果的时候,只获取自己的结果而不是别人的。
			String filter = "JMSCorrelationID = '" + msg.getJMSMessageID()
					+ "'";
			QueueReceiver qReceiver = qSession
					.createReceiver(responseQ, filter);
			TextMessage tmsg = (TextMessage) qReceiver.receive(3000000);
			if (tmsg == null) {
				System.out.println("QLender not responding");
			} else {
				System.out.println("Loan request was " + tmsg.getText());
			}
		} catch (JMSException jmse) {
			jmse.printStackTrace();
			System.exit(1);
		}
	}

	private void exit() {
		try {
			qConnect.close();
		} catch (JMSException jmse) {
			jmse.printStackTrace();
		}
		System.exit(0);
	}

//初始化的问题,不许多加说明
	public static void main(String argv[]) {
		String queuecf = null;
		String requestq = null;
		String responseq = null;
		if (argv.length == 3) {
			queuecf = argv[0];
			requestq = argv[1];
			responseq = argv[2];
		} else {
			System.out.println("Invalid arguments. Should be: ");
			System.out
					.println("java QBorrower factory requestQueue responseQueue");
			System.exit(0);
		}
		QBorrower borrower = new QBorrower(queuecf, requestq, responseq);
		try {
			// Read all standard input and send it as a message
			BufferedReader stdin = new BufferedReader(new InputStreamReader(
					System.in));
			System.out.println("QBorrower Application Started");
			System.out.println("Press enter to quit application");
			System.out.println("Enter: Salary, Loan_Amount");
			System.out.println("\ne.g. 50000, 120000");
			while (true) {
				System.out.print("> ");
				String loanRequest = stdin.readLine();
				if (loanRequest == null || loanRequest.trim().length() <= 0) {
					borrower.exit();
				}
				// Parse the deal description
				StringTokenizer st = new StringTokenizer(loanRequest, ",");
				double salary = Double.valueOf(st.nextToken().trim())
						.doubleValue();
				double loanAmt = Double.valueOf(st.nextToken().trim())
						.doubleValue();
				borrower.sendLoanRequest(salary, loanAmt);
			}
		} catch (IOException ioe) {
			ioe.printStackTrace();
		}
	}
}



jndi.properties
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames=lyxcf
topic.topic1=jms.topic1
queue.qs=jms.qs
queue.qr=jms.qr


通过connection的ConnectionMetaData metadata = qConnect.getMetaData();放方法,可以获取到数据源,里面包含了一些信息。
System.out.println("JMS Version: " +
metadata.getJMSMajorVersion() + "." +
metadata.getJMSMinorVersion());
System.out.println("JMS Provider: " +
metadata.getJMSProviderName());
System.out.println("JMSX Properties Supported: ");
Enumeration e = metadata.getJMSXPropertyNames();
while (e.hasMoreElements()) {
System.out.println(" " + e.nextElement());
}


另外message必须通过session创建,以便完成对应的初始化,而不是通过new创建,new创建出来的对象,确实很多信息。


package lyx.money;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class QLender implements MessageListener {
	private QueueConnection qConnect = null;
	private QueueSession qSession = null;
	private Queue requestQ = null;

	public QLender(String queuecf, String requestQueue) {
		try {
			// Connect to the provider and get the JMS connection
			Context ctx = new InitialContext();
			QueueConnectionFactory qFactory = (QueueConnectionFactory) ctx
					.lookup(queuecf);
			qConnect = qFactory.createQueueConnection();
			// Create the JMS Session
			qSession = qConnect.createQueueSession(false,
					Session.AUTO_ACKNOWLEDGE);
			// Lookup the request queue
			requestQ = (Queue) ctx.lookup(requestQueue);
			// Now that setup is complete, start the Connection
			qConnect.start();
			// Create the message listener
			QueueReceiver qReceiver = qSession.createReceiver(requestQ);
			qReceiver.setMessageListener(this);
			System.out.println("Waiting for loan requests...");
		} catch (JMSException jmse) {
			jmse.printStackTrace();
			System.exit(1);
		} catch (NamingException jne) {
			jne.printStackTrace();
			System.exit(1);
		}
	}

//使用监听模式
	public void onMessage(Message message) {
	try {
	boolean accepted = false;
	// Get the data from the message
	MapMessage msg = (MapMessage)message;
	double salary = msg.getDouble("Salary");
	double loanAmt = msg.getDouble("LoanAmount");
	// Determine whether to accept or decline the loan
	if (loanAmt < 200000) {
	accepted = (salary / loanAmt) > .25;
	} else {
	accepted = (salary / loanAmt) > .33;
	}
	System.out.println("" +
	"Percent = " + (salary / loanAmt) + ", loan is "
	+ (accepted ? "Accepted!" : "Declined"));
	// Send the results back to the borrower
	TextMessage tmsg = qSession.createTextMessage();
	tmsg.setText(accepted ? "Accepted!" : "Declined");
//为了和消息发送者保持逻辑一致,(这里有一定耦合度了)
	tmsg.setJMSCorrelationID(message.getJMSMessageID());
	// Create the sender and send the message
//在进行回复的时候,使用发送者指定的通道。
	QueueSender qSender =
	qSession.createSender((Queue)message.getJMSReplyTo());
	qSender.send(tmsg);
	System.out.println("\nWaiting for loan requests...");
	} catch (JMSException jmse) {
	jmse.printStackTrace();
	System.exit(1);
	} catch (Exception jmse) {
	jmse.printStackTrace();
	System.exit(1);
	}
	}

	private void exit() {
		try {
			qConnect.close();
		} catch (JMSException jmse) {
			jmse.printStackTrace();
		}
		System.exit(0);
	}

	public static void main(String argv[]) {
		String queuecf = null;
		String requestq = null;
		if (argv.length == 2) {
			queuecf = argv[0];
			requestq = argv[1];
		} else {
			System.out.println("Invalid arguments. Should be: ");
			System.out.println("java QLender factory request_queue");
			System.exit(0);
		}
		QLender lender = new QLender(queuecf, requestq);
		try {
			// Run until enter is pressed
			BufferedReader stdin = new BufferedReader(new InputStreamReader(
					System.in));
			System.out.println("QLender application started");
			System.out.println("Press enter to quit application");
			stdin.readLine();
			lender.exit();
		} catch (IOException ioe) {
			ioe.printStackTrace();
		}
	}
}



QueueBrowser
通过它可以访问队列中的快照
QueueSession session = connection.createQueueSession
(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(queue);
Enumeration e = browser.getEnumeration();
while (e.hasMoreElements()) {
TextMessage msg = (TextMessage)e.nextElement();
System.out.println("Browsing: " + msg.getText());
}
browser.close();



动态队列
假设一种情况,有1000个发送者,都要使用不同的队列,并且队列存活期很短,那么可能想到是每个队列命名XXX1-XXXN这种方式,其实还有更好的方式
QueueSession.createTemporaryQueue()
这样创建的队列是临时的,并且只是针对一个connection而存在的。
分享到:
评论

相关推荐

    jms-1.1.jar

    1. 面向消息模型:提供点对点(Queue)和发布/订阅(Topic)两种消息模型,满足不同场景需求。 2. 异步通信:消息的生产和消费可以是异步的,提高系统的响应性和可扩展性。 3. 可靠传输:支持事务和持久化,确保...

    javax.jms-1.1.jar

    2. **消息模型**:JMS支持两种消息模型——点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。在点对点模型中,消息从一个队列(Queue)中发送到另一个队列,每个消息仅被一个消费者接收。...

    jms-1_1-fr-apidocs.zip

    1. **消息模型**:JMS支持两种消息模型——点对点(Point-to-Point,PTP)和发布/订阅(Publish/Subscribe,Pub/Sub)。在点对点模型中,消息由一个生产者发送到一个队列,然后由一个消费者接收;而在发布/订阅模型...

    jms-1_0_2-upd-sampleprograms.zip

    队列则是点对点模型,每个消息仅由一个消费者接收。示例会演示如何创建和使用这两种类型的目标。 5. **持久化和非持久化消息**:持久化消息即使在发送后消费者未在线也能保留,当消费者重新上线时可以接收到。非...

    jms-1.1接口定义代码

    8. **消息模式**:包括点对点、发布/订阅、请求/响应等,每种模式都有其特定的应用场景和优点。 9. **消息类型**:JMS定义了几种消息类型,如TextMessage用于传输文本数据,ObjectMessage用于传输Java对象,...

    jms-1.1.jar+jmxtools-1.2.1.jar+jmxri-1.2.1.jar

    它提供了一个框架,允许开发者对Java应用程序进行监控和管理。JMXTools-1.2.1.jar包含了一组工具和API,这些工具和API扩展了JMX的功能,使得开发者能够更方便地远程管理和监控Java应用。这包括MBeans(Managed Beans...

    前端开源库-jms-deploy

    在实际使用`jms-deploy`时,开发者需要注意以下几点: - **安全**:确保JMS服务器的连接信息在传输过程中是加密的,并且不在公开的代码仓库中暴露敏感信息。 - **测试环境**:在正式部署前,应在测试环境中进行...

    PHP 调用 JMS -Stomp

    - **PHP编程基础**:确保对PHP的基本语法和面向对象编程有扎实的理解,以便正确使用Stomp客户端。 - **异步处理**:理解消息队列如何帮助实现后台处理和解耦系统。 通过上述知识点的学习和实践,你将能够熟练地在...

    JMS--J2EE培训材料

    JMS提供了两个主要的消息域:点对点(PTP)和发布/订阅(Pub/Sub)。 1. **点对点(PTP)**:在此模式下,消息发送给特定的目标队列,每个消息会被一个消费者接收并消费。一旦消息被消费,就从队列中移除。 - **队列...

    jms-appender-5.0.1.zip

    【标题】"jms-appender-5.0.1.zip" 涉及的主要知识点是Java消息服务(Java Message Service,简称JMS)以及日志处理的Appender机制。JMS是Java平台上的一个标准接口,它允许应用程序创建、发送、接收和读取消息。在...

    ActiveMQ 中javax.jms的源码 javax.jms-sources-1.1.zip

    Queue提供点对点的消息传递,而Topic支持发布/订阅模型。 3. **MessageProducer**和**MessageConsumer**:它们分别负责发送和接收消息。MessageProducer通过Destination发送消息,MessageConsumer从Destination接收...

    spring-jms-4.2.xsd.zip

    Spring JMS支持各种特性,如点对点(P2P)和发布/订阅(Pub/Sub)消息模型,事务管理,以及通过JCA(Java Connector Architecture)适配器集成企业信息系统的功能。 在Spring应用中,开发者通常会在XML配置文件中...

    jms-test.zip_jms activemq_jms test

    JMS支持两种主要的消息模型:点对点(队列)和发布/订阅(主题)。 2. **点对点模型**:在这个模型中,生产者发送消息到一个队列,消费者从队列中接收消息。每个消息只被一个消费者接收,提供了一种可靠的一对一...

    jms-1.1+jmxri-1.2.1+jmxtools-1.2.1.zip

    在JMS-1.1规范中,主要有两种类型的消息模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。点对点模型通过队列(Queue)实现,每个消息只有一个消费者;而发布/订阅模型通过主题...

    jms-spring.zip

    在Spring中,JMS的集成主要依赖于`spring-jms`模块,该模块提供了对各种JMS供应商的抽象和支持,例如ActiveMQ,RabbitMQ,IBM WebSphere MQ等。ActiveMQ是Apache软件基金会的一个开源项目,是一个功能丰富的JMS提供...

    jms-jmxri-jmxtools的jar文件

    例如,你可以使用JMS发送异步消息、实现点对点或发布/订阅通信模式,以及在分布式环境中进行可靠的通信。 接着,JMX是一种Java平台的管理框架,用于管理和监控Java应用程序、设备和网络。`jmxri-1.2.1.jar`是JMX ...

    storm1.2.1-wangzs-jms-v4.0-完成

    6. **版本升级**:从1.2.1到wangzs的v4.0,可能涉及到对原生Storm功能的增强,以及对JMS接口的优化,使得整个系统更加稳定且高效。 综上所述,"storm1.2.1-wangzs-jms-v4.0-完成"项目是一个集成了Apache Storm与JMS...

    jms-jar所需的所有jar包

    JMS支持两种消息模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe)。 - **点对点模型**:在这种模型中,消息从一个生产者发送到一个队列,然后由一个或多个消费者从队列中取出并消费。每个...

    activemq-all-5.15.2.jar 和 jms-1.1.jar

    "jms-1.1.jar"包含了JMS 1.1规范的实现,这是JMS的第二个主要版本,提供了发布/订阅和点对点两种消息传递模式。 4. **使用场景**: activemq-all-5.15.2.jar和jms-1.1.jar通常在以下场景中使用:大型分布式系统中的...

Global site tag (gtag.js) - Google Analytics