`
liyixing1
  • 浏览: 963818 次
  • 性别: Icon_minigender_1
  • 来自: 江西上饶
社区版块
存档分类
最新评论

jms基础,与例子

    博客分类:
  • jms
阅读更多
MOM,面向消息中间件的交互模式




各个系统间,可以认为是独立的,消息通过中间件传递。中间件类似一个路由器,决定消息的去处等等。


集中式


如上,可见,对中间件的依赖感觉还是挺严重的。当然实际应用中可能是存在多个如上图所示的结构群组成。

分步式



IP多播允许应用程序连接一个或多个IP多播组,但是依然存在一个路由功能的,用来作为广播分发等功能存在。

以上两种方式经常是混合存在的。


JMS的两种模式
分为点对点和发布/订阅模式。
点对点消息生产者称为发送者,消息消费者称为接收器。具有唯一消费者特性,并且消费者可以在生产者之前或者运行,消费者能获取到在其运行前的消息,只要该消息还未过期,还未被其他消费者获取。

发布订阅模式,消息生产者称为发布者,消息消费者称为订户,可以支持一个主题的内容被多个消费者订阅和获取,但是消费者只能获取在其运行之后的消息,之前的消息无法获取。广播中心在获取到消息后会自动推送到各个客户端。



点对点依赖于queue(队列),而发布订阅依赖于Topic(主题)

相关接口
JMS主要接口有7个,分别是
• ConnectionFactory
• Destination
• Connection
• Session
• Message
• MessageProducer
• MessageConsumer

需要注意的是事物控制和通讯是通过Session而不是Connection来进行的。Connection往往一个就可以了,会建立Session池。




以上是在使用JNDI来管理JMS的时候,可以从JNDI获取ConnectionFactory和Destination

ConnectionFactory 接口(连接工厂)
用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。

Connection 接口(连接)
连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。

Destination 接口(目标)
目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
Destination有2个子接口,Queue和Topic

MessageConsumer 接口(消息消费者)
由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或(非阻塞)接收队列和主题类型的消息。

MessageProducer 接口(消息生产者)
由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。

Message 接口(消息)
是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:
消息头(必须):包含用于识别和为消息寻找路由的操作设置。
一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。
消息接口非常灵活,并提供了许多方式来定制消息的内容。

Session 接口(会话)
表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。

基本概念
jms的实现,称为jms provider,可以认为是jms的服务器

点对点模式的API
与JMS基础API类似,基本上都是基础的子接口
• QueueConnectionFactory
• Queue
• QueueConnection
• QueueSession
• Message
• QueueSender
• QueueReceiver



如上是在JNDI管理的方式的交互

发布订阅模式的
• TopicConnectionFactory
• Topic
• TopicConnection
• TopicSession
• Message
• TopicPublisher
• TopicSubscriber





SOA
JMS是实现SOA的很好的手段,应用之间通过JMS作为消息传递,隔离具体实现,实际上WEBSERVICE,HTTP等方式也是可以实现,只是对于事物控制等等没有那么强大。

事件驱动体系结构
如当员工离职,产生一个事件源,广播到系统各处,各处之间进行处理。对于发布订阅模式的JMS很适合。


企业整合
企业系统之间,可能使用的语言不一样,对应的消息协议也不一样,可能需要再之间进行桥接。

企业间的整合
这方面技术选择很多,webservice,http,jms都是不错选择。

跨地域的整合
这个问题,JMS提供了更安全可靠的特性,而webservice,http等都不具有,当然通过改进方案,其他技术方案也是可选择的。

传统的RPC模式


如上,各个系统之间进行交互,当订单系统开始,需要调用CRM,CRM又会调用库存系统,等等,每次调用间存在的等待,阻塞等时间消耗,比如当CRM服务提供者,暂停了几分钟的运行,那么整个信息都将失败。

企业JMS结构



订单开始,发送一个CRM请求的消息,并继续自己的下一步,无需等待,而且当CRM停止运行了一段时间,而订单系统则继续进行。当然对JMS server的依赖就会很大了。JMSserver要是挂了,就呵呵吧,因此对JMS Server的容错处理,难度和技术点要求比较高。一般情况下JMS SERVER会把收到的消息进行持久化,以确保消息不会因为JMS本身暂停而造成消息丢失。

例子使用activemq

简单的例子,非JNDI
package lyx;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 
 * 
 * 描述:消息生产者
 * 
 * @author liyixing
 * @version 1.0
 * @since 2014-9-24 下午10:18:18
 */
public class Client {
	public static void main(String[] args) throws JMSException {
		String l = "tcp://localhost:61616";
		ConnectionFactory factory = new ActiveMQConnectionFactory(l);
		Connection connection = factory.createConnection();
		Session session = connection.createSession(false,
				Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createQueue("lyx");
		MessageProducer producer = session.createProducer(destination);
		Message message = session.createTextMessage("hello");

		producer.send(message);
		producer.close();
		session.close();
		connection.close();
	}
}





package lyx;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 
 * 
 * 描述:消息消费者
 * 
 * @author liyixing
 * @version 1.0
 * @since 2014-9-24 下午10:18:41
 */
public class Consumer {

	/**
	 * 描述:
	 * 
	 * @param args
	 * @author liyixing 2014-9-24 下午10:13:08
	 * @throws JMSException
	 */

	public static void main(String[] args) throws JMSException {
		String l = "tcp://localhost:61616";// 地址
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(l);// 连接器
		Connection connection = connectionFactory.createConnection();// 创建连接
		Session session = connection.createSession(false,
				Session.AUTO_ACKNOWLEDGE);// 打开会话
		String destinationName = "lyx";
		Destination dest = session.createQueue(destinationName);// 消息目的地
		MessageConsumer consumer = session.createConsumer(dest);

		connection.start();

		Message message = consumer.receive();
		TextMessage textMessage = (TextMessage) message;
		String text = textMessage.getText();

		System.out.println("消息: " + text);
		consumer.close();
		session.close();
		connection.close();
	}

}



//上面的代码中,可以试试先把JMS中间件启动,运行Client后把JMS关闭,重启一次,再运行Consumer,Consumer依然能获取到数据,这说明activemq默认情况下,是会对消息进行持久化的。


通过发布订阅模式,来实现的聊天功能,使用的是ActiveMQ,该框架本身就支持JNDI,只需要在classpath的根目录下面增加一个文件
jndi.properties

#该文件是ActiveMQ自己解析的。
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames=lyxcf
topic.topic1=jms.topic1

package lyx.chat;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * 
 * 
 * 描述:聊天室
 * 
 * @author liyixing
 * @version 1.0
 * @since 2014-9-25 下午4:26:20
 */
public class Chat implements MessageListener {
	private TopicConnection connection;
	private TopicSession session;// 发布者
	private TopicPublisher publisher;
	private String userName;

	/**
	 * 
	 * 构造函数
	 * 
	 * @param factoryName
	 *            本例子使用JNDI,因此是只JNDI的名字
	 * @param topicName
	 *            相关主题名字
	 * @param userName
	 *            相关用户名
	 * @throws NamingException
	 * @throws JMSException
	 */
	public Chat(String factoryName, String topicName, String userName)
			throws NamingException, JMSException {
		this.userName = userName;
		// JNDI中获取连接工厂
		InitialContext initialContext = new InitialContext();
		TopicConnectionFactory factory = (TopicConnectionFactory) initialContext
				.lookup(factoryName);
		// 生成连接
		connection = factory.createTopicConnection();
		// 创建会话
		// 发布者
		session = connection
				.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);//false意味着不开启事物,我们选择AUTO_ACKNOWLEDGE,表示该消息是接收客户端后,它会自动确认。 false的时候值可以忽略该传什么值,支持的模式有Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, and Session.DUPS_OK_ACKNOWLEDGE
		// 订阅者
		// TopicSession subSession = connection.createTopicSession(false,
		// Session.AUTO_ACKNOWLEDGE);
		// 获取主题信息
		Topic topic = (Topic) initialContext.lookup(topicName);
		// 创建发布者
		publisher = session.createPublisher(topic);
		// 创建订阅者
		TopicSubscriber subscriber = session.createSubscriber(topic);

		// 设置消息监控
		subscriber.setMessageListener(this);
		connection.start();
	}

	/**
	 * 接收到消息
	 * 
	 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
	 */
	@Override
	public void onMessage(Message message) {
		TextMessage textMessage = (TextMessage) message;
		try {
			System.out.println("收到信息:"
					+ textMessage.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 
	 * 描述:发送
	 * 
	 * @param msg
	 * @throws JMSException
	 * @author liyixing 2014-9-25 下午4:48:42
	 */
	public void send(String msg) throws JMSException {
		TextMessage textMessage = session.createTextMessage();

		textMessage.setText(userName + ":" + msg);
		publisher.send(textMessage);
	}

	public void close() throws JMSException {
		connection.close();
	}

	public static void main(String[] args) throws NamingException,
			JMSException, IOException {
		Chat chat = new Chat(args[0], args[1], args[2]);
		BufferedReader commandLine = new java.io.BufferedReader(
				new InputStreamReader(System.in));
		while (true) {
			String s = commandLine.readLine();
			if (s.equalsIgnoreCase("exit")) {
				chat.close();
				System.exit(0);
			} else
				chat.send(s);
		}
	}
}





TopicConnectionFactory接口
一共只有两个重载的方法,用于创建TopicConnection。

package javax.jms;

/**
* @version $Rev: 467553 $ $Date: 2006-10-25 06:01:51 +0200 (Wed, 25 Oct 2006) $
*/
public interface TopicConnectionFactory extends ConnectionFactory {
    TopicConnection createTopicConnection() throws JMSException;

    TopicConnection createTopicConnection(String userName, String password)
        throws JMSException;
}

TopicConnection
通过TopicConnectionFactory 创建,connection = factory.createTopicConnection();
它是继承自Connection,比较重要的三个方法是
public interface Connection {
    void start() throws JMSException;

    void stop() throws JMSException;

    void close() throws JMSException;

    ........
}

start方法打开网络流,并开始等待消息的到来。
stop会暂时性的不接受消息,但流未关闭,知道下次start的调用
close会关闭流,并且会关闭所有相关的需要关闭的对象,包括TopicSession,TopicPublisher,TopicSubscriber.

TopicSession
在获得了TopicConnection就可以获取TopicSession,它是用来创建消息Message,发布者TopicPublisher和订阅者TopicSubscriber的工厂,以及控制事物,如上的例子,使用的是一个session同时创建了订阅者和发布者,实际上更加推荐的方式的创建两个session,分别用来处理发布端和订阅端,只有一个session,会受到网络限制,线程限制等问题。如上面聊天的例子,使用的是一个session,那么如果发布者进行发布的时候,网络堵塞了1分钟,那么订阅者也必须等待一分钟。
另外session还用来创建session。


Topic
可以简单的当做标示,有一个name,用来作为主题名。
之所以不直接使用String作为主题名,是为了避开不同的JMS实现,或者不同的命名规范,而造成不可移植问题。

TopicPublisher
消息发布者,通过session创建,创建的时候需要以主题对象为参数。通过publish方法进行发布。只是传递消息,而不等待接收者,因此它主要负责消息发送就可以了。

TopicSubscriber
订阅器
上面例子使用TopicSubscriber subscriber = session.createSubscriber(topic);
另外一个更完整的重载是TopicSubscriber subscriber =
subSession.createSubscriber(chatTopic, null, true);
1.相关主题,第二个是消息选择器(可以认为是条件),第三个是表示是否接收自己发布的消息。JMS中间件收到消息后会主动推送到订阅者,而订阅者无需轮训查问。
另外订阅是基于事件驱动模式的,可以为其设置一个
package javax.jms;
public interface MessageListener {
public void onMessage(Message message);
}
监听者的实例。

Message
消息对象,有三个部分,头,属性,消息体

他有很多子集
TextMessage 文本
ObjectMessage java对象
BytesMessage 二进制
StreamMessage 流模式
MapMessage 键值对模式,但是值必须是java原生类型或者其包装类

  • 大小: 16.1 KB
  • 大小: 35.3 KB
  • 大小: 28.3 KB
  • 大小: 24.3 KB
  • 大小: 25.5 KB
  • 大小: 25.3 KB
  • 大小: 26.2 KB
  • 大小: 32.9 KB
  • 大小: 28.4 KB
分享到:
评论

相关推荐

    JMS入门小例子以及weblogic安装,并且在weblogic中配置JMS服务

    1. **创建JMS模块**:在WebLogic管理控制台中,首先需要创建一个新的JMS模块,这将定义消息传递基础设施的容器。 2. **定义JMS服务器**:在JMS模块下,需要创建JMS服务器,这将是实际运行消息传递服务的地方。 3. ...

    java-jms小例子

    总结,"java-jms小例子"是一个基础教程,帮助开发者理解如何在Java应用程序中使用JMS进行异步通信。通过创建消息生产者和消费者,设置消息队列或主题,以及发送和接收不同类型的JMS消息,开发者能够掌握JMS的核心...

    JBOSS建立JMS应用实例

    一、JMS基础知识 1. 概念理解:JMS是Java平台中用于企业级消息传递的API,它定义了生产、发送、接收和消费消息的标准接口。 2. 消息模型:JMS支持两种消息模型——点对点(Point-to-Point)和发布/订阅(Publish/...

    JMS简单示例1

    **JMS简介** Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用集成的标准化API,...通过学习和理解JMS的基础知识,我们可以构建出稳定、可靠的分布式系统,实现不同服务间的解耦和异步通信。

    JMS相关,教程,例子,学习笔记

    **JMS(Java Message Service)** 是Java平台中用于企业级...而 **JMS教程.pdf** 和 **基于XML和JMS的异构数据交换集成的研究.pdf** 则提供了理论基础和高级应用场景的讲解,有助于进一步提升对JMS的理解和应用能力。

    JMS开发例子.pdf

    ### JMS开发基础及实践 #### 一、JMS简介 Java消息服务(JMS)是一种标准的Java API,用于在应用程序之间发送消息。它提供了一种异步通信机制,允许程序通过消息中间件发送和接收消息。JMS支持两种消息模型:点对点...

    jms+activeMq+spring学习简单例子

    1. **JMS基础**:JMS允许应用程序创建、发送、接收和阅读消息。它定义了两种消息模型:点对点(Queue)和发布/订阅(Topic)。在点对点模型中,每个消息只被一个消费者接收;而在发布/订阅模型中,消息可以被多个...

    spring-jms使用queue发送消息简单例子

    在这个"spring-jms使用queue发送消息简单例子"中,我们将深入探讨如何使用Spring JMS与ActiveMQ结合,通过队列(Queue)来发送和接收消息。 首先,`pom.xml`文件是Maven项目的配置文件,它包含了项目所依赖的库。...

    jms之activeMQ 队列和广播模式例子(主要给初学者提供入门知识)

    这篇博客"jms之activeMQ 队列和广播模式例子"主要面向初学者,旨在提供ActiveMQ入门级的知识,通过实例解释队列(Queue)和主题(Topic)这两种基本的消息模式。 首先,我们要理解JMS中的队列和主题的区别。队列...

    JMS入门

    在samples目录下,有一些基础示例,通过运行这些例子,你可以直观地了解JMS的工作原理。 OpenJMS遵循SUN的JMS API 1.0.2规范,支持消息队列和发布/订阅模式。这两种模式提供了不同类型的通信方式,队列模式确保每个...

    JMS样例项目

    我自己用Eclipse写的JMS例子。完全是用来理解JMS的基础用法,没有太大的生产价值。 代码结构简单,只有三个java代码页,只演示基本功能。安装好jboss之后,不需要进行任何配置,在本机运行和编译代码。仅供学习...

    JMS-ActiveMQ入门实例

    **JMS(Java Message Service)** 是一个Java平台上的标准接口,它定义了一种统一的API,使得应用程序可以与...通过实践这些例子,我们可以更好地掌握JMS和ActiveMQ的使用,为构建可扩展和高可用的分布式系统奠定基础。

    PG20191_example_J2EE基础讲座_

    4. **JMS(Java Message Service)**:JMS提供了一种标准的方式来创建、发送、接收和读取消息,用于异步通信和解耦应用程序组件。 5. **JTA(Java Transaction API)**:JTA提供了一种标准的方式来管理跨越多个资源...

    Apache ActiveMQ 入门最简单例子

    例如,在Java环境中,我们可以使用JMS(Java Message Service)API与ActiveMQ交互: 1. **消息生产者**:生产者创建一个连接到ActiveMQ服务器的ConnectionFactory,然后创建一个Session,用于发送消息。Session可以...

    MQjava基础编程.pdf

    本文档将探讨MQ Java基础编程,包括如何使用Java客户端与MQ Server交互,以及MQ作为WebSphere Application Server的JMS资源提供者。 首先,了解JMS(Java Message Service)是Java平台上的标准API,用于访问消息...

    《EJB3.0实例子教程》jar包2

    《EJB3.0实例子教程》jar包2包含了多个重要的Java库文件,这些文件对于理解和实践企业级JavaBeans(EJB)3.0技术至关重要。EJB是Java平台企业版(Java EE)的一部分,主要用于构建可扩展、分布式、安全且事务处理...

    分布式JAVA应用基础与实践(林昊)完整版pad+源码

    分布式JAVA应用基础与实践是Java开发领域中一个重要的主题,主要涵盖了如何在大规模网络环境中设计、部署和管理Java应用程序。本书由林昊编著,旨在帮助开发者深入理解分布式系统的基本概念,掌握Java在分布式环境中...

Global site tag (gtag.js) - Google Analytics