`
eighthspace
  • 浏览: 26178 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

jms p2p和PubSub模型总结

阅读更多

首先先总结下jms规范下定义的实现接口:

 

 

ConnectionFactory 接口(连接工厂)

  用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。

Connection 接口(连接)

  连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。

Destination 接口(目标)

  目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。

MessageConsumer 接口(消息消费者)

  由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。

MessageProducer 接口(消息生产者)

  由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。

Message 接口(消息)

  是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:

  消息头(必须):包含用于识别和为消息寻找路由的操作设置。

  一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。

  一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。

  消息接口非常灵活,并提供了许多方式来定制消息的内容。

Session 接口(会话)

 

  表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。

 

p2p(point-to-point)模型:

 

其中包含几个比较重要的概念:消息队列(Queue),发送者(sender),接受者(receiver),每个消息被发送到规

 

定的消息队列中,并且该消息会被持久化。每个消息只能对应一个消费者,即使发送消息时,消费者没有在运行,仍然不影

 

响该消息被发送到消息队列,当消费者运行时,该消息就会被消费者接受到。下面以一个实例说明p2p模型的运行机制。

 

服务器端代码:

 

 

@MessageDriven(
		activationConfig={
				@ActivationConfigProperty(propertyName="destinationType" , propertyValue="javax.jms.Queue"), //指定使用的是queue
				@ActivationConfigProperty(propertyName="destination" , propertyValue="queue/LeadfarQueue")//指定具体使用哪个queue
		}
)
public class MdbBeanTest01 implements MessageListener {

	public void onMessage(Message msg) {
		// TODO Auto-generated method stub
		try {
			TextMessage tm = (TextMessage)msg; 
			String text =tm.getText();
			System.out.println("服务器接收到了信息:" +text);
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}
 

 

上面的“queue/LeadfarQueue”队列可以在jboss的配置文件中进行配置,具体路径如下:

 

JBOSS_HOME\server\default\deploy\messaging\destinations-service.xml

 

在此文件中添加:

 

 

 <mbean code="org.jboss.jms.server.destination.QueueService"
      name="jboss.messaging.destination:service=Queue,name=LeadfarQueue"
      xmbean-dd="xmdesc/Queue-xmbean.xml">
      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
      <depends>jboss.messaging:service=PostOffice</depends>
 </mbean>  

 

 

这样服务器端的测试代码就完成了。

 

客户端代码:

 

 

try {
			InitialContext context = new InitialContext();
			QueueConnectionFactory factory = (QueueConnectionFactory)context.lookup("ConnectionFactory");//创建ConnectionFactory

			QueueConnection connection = factory.createQueueConnection(); //创建Connection
			
			/*
			 * 第一个参数:true表示开启事务,最后要手动进行commit提交,false表示自动提交*/
			QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);//创建会话

			Queue destination = (Queue)context.lookup("queue/LeadfarQueue");//创建Destination

			QueueSender sender = session.createSender(destination);//创建发送者

			TextMessage msg = session.createTextMessage("mdbBean , 你好");//创建文本消息

			sender.send(msg);//发送消息

			System.out.println("客户端消息发送完成");
			sender.close();
			session.close();
			connection.close();
		} catch (NamingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
 

 

这样就可以进行测试,测试结果:

 

服务器端调试信息:19:39:56,234 INFO  [STDOUT] 服务器接收到了信息:mdbBean , 你好

 

验证p2p模式下消息的持久化的特性:

 

首先停止服务器端EJB的运行,再次运行客户端测试代码。此时,该消息仍然会发送到指定的消息队列,这时再次启动服务

 

器端的运行,消息仍然会发送到服务器端。说明,队列会将消息持久化。

 

PubSub模型

 

其中包含的几个概念:主题(Topic),发布者(Publisher),订阅者(Subscriber)。与p2p模型不同,p2p模型中消

 

息与接收者是一对一的关系,而在PubSub模型中,主题与订阅者是一对多的关系,也就是一个Topic会发送给多个订阅者

 

,但是主题并不会被持久化,也就是说如果在Topic发送时,订阅者程序没有运行,那么等到订阅者下次再次运行时,该

 

Topic也不会再发送给该订阅者,该Topic对该订阅者来说已经丢失。测试代码如下:

 

 

 

@MessageDriven(
		activationConfig={
				@ActivationConfigProperty(propertyName="destinationType" , propertyValue="javax.jms.Topic"),//指定使用的是Topic
				@ActivationConfigProperty(propertyName="destination" , propertyValue="topic/LeadfarTopic")//指定使用哪个Topic
		}
)
public class TopicMdbBeanTest01 implements MessageListener {

	public void onMessage(Message msg) {
		// TODO Auto-generated method stub
		try {
			TextMessage tm = (TextMessage)msg;
			String text =tm.getText();
			System.out.println("服务器接收到了信息:" +text);
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}
 

 

“topic/LeadfarTopic”选项还是在上面的xml文件中进行指定,这个实例中,在服务器端定义了多个EJB,都是使用“

 

topic/LeadfarTopic”。

 

客户端代码:

 

try {
			InitialContext context = new InitialContext();
			TopicConnectionFactory factory = (TopicConnectionFactory)context.lookup("ConnectionFactory");
			TopicConnection connection = factory.createTopicConnection();
			TopicSession session = connection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
			Topic destination = (Topic)context.lookup("topic/LeadfarTopic");
			TopicPublisher publisher = session.createPublisher(destination);
			TextMessage msg = session.createTextMessage("topicMdbBean , 你好");
			publisher.send(msg);
			System.out.println("客户端消息发送完成");
			publisher.close();
			session.close();
			connection.close();
		} catch (NamingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
 

与上面的客户端代码类似,这个测试结果如下:

 

服务器端调试信息:

 

20:24:57,281 INFO  [STDOUT] 服务器接收到了信息:topicMdbBean , 你好

20:24:57,281 INFO  [STDOUT] 服务器接收到了信息:topicMdbBean , 你好

20:24:57,281 WARN  [InterceptorsFactory] EJBTHREE-1246: Do not use InterceptorsFactory with a ManagedObjectAdvisor, InterceptorRegistry should be used via the bean container

20:24:57,281 WARN  [InterceptorsFactory] EJBTHREE-1246: Do not use InterceptorsFactory with a ManagedObjectAdvisor, InterceptorRegistry should be used via the bean container

20:24:57,281 INFO  [STDOUT] 服务器接收到了信息:topicMdbBean , 你好

 

可见,服务器端的三个EJB都收到了该Topic。

 

如果将服务器端程序停止运行,再运行客户端程序,再重新启动服务器端程序,也不会收到该Topic,也就是说Topic被丢

 

失了。也可以将Topic持久化,方法如下:在@MessageDriven的配置中增加几个配置选项:

 

@MessageDriven(
		activationConfig={
				@ActivationConfigProperty(propertyName="destinationType" , propertyValue="javax.jms.Topic"),
				@ActivationConfigProperty(propertyName="destination" , propertyValue="topic/LeadfarTopic"),
				@ActivationConfigProperty(propertyName="SubscriptionDurability" ,propertyValue="Durable"),
				@ActivationConfigProperty(propertyName="subscriptionName" ,propertyValue="sn"),
				@ActivationConfigProperty(propertyName="clientId" , propertyValue="snClientId")
		}
)
 

经过这样的配置后,该Topic对于配置成这样的EJB来说就进行了持久化,这样,这个EJB在Topic发送后再运行也能接收到

 

。以上就是两种模型各自的特点。

分享到:
评论

相关推荐

    JMS的两种消息模型P2P模型和PubSub模型实例.pdf

    JMS的两种消息模型P2P模型和PubSub模型实例.pdf

    JMS pubsub实例

    JMS pubsub实例 提供学习 希望有帮助 (*^__^*) 嘻嘻……

    小而美的pubsub库发布订阅者模式

    在`pubsub-master`这个压缩包中,可能包含了`pubsub-js`库的源码、示例、文档和测试等资源。通过阅读源码,开发者可以更深入地理解其工作原理,并根据需要进行定制和扩展。同时,测试用例可以帮助确保代码的正确性,...

    前端开源库-angular-global-pubsub

    **总结** Angular Global Pubsub 提供了一种高效且灵活的解决方案,使得手动引导的Angular应用程序能够优雅地处理组件间的通信问题。通过使用这个库,开发者可以构建更加健壮、可扩展的前端应用,同时保持代码的...

    smackx-pubsub.0.6.jar

    Pubsub Extensions for Smack

    Go-PubSub队列和长轮询订阅(不绑定到http)

    PubSub模式是一种常见的消息传递模型,其中生产者(Publisher)发布消息到一个主题(Topic),而消费者(Subscriber)则通过订阅这些主题来接收消息。在Go-longpoll中,它允许订阅者保持一个持久的HTTP连接,等待新...

    Go-pubsub-一个简单的go订阅包

    - **轻量级**:Go-pubsub库的设计简洁,易于理解和使用,适合小型项目或作为大型系统的一部分。 - **非阻塞**:发布和订阅操作是非阻塞的,保证了高并发环境下的性能。 - **多订阅者**:一个主题可以有多个订阅者...

    Laravel开发-laravel-pubsub-queue

    Pub/Sub是一个完全托管的消息传递服务,它允许应用程序之间通过发布和订阅模型进行通信。发布者发布消息到主题(Topic),订阅者则可以订阅这些主题并接收发布的消息。这种解耦设计使得系统更加灵活,能够处理高并发...

    PyPI 官网下载 | django-pika-pubsub-0.4.tar.gz

    `django-pika-pubsub-0.4.tar.gz` 包的下载和安装就是利用了PyPI和pip。 5. **源代码分发**:`.tar.gz` 文件是Unix/Linux系统常用的归档压缩格式,用于打包和压缩源代码,便于分发和存储。开发者通常会将源代码打包...

    JXTA——Java P2P网络编程技术.zip

    **JXTA(Java eXtensible ...总结起来,JXTA是Java开发者构建P2P网络应用的强大工具,通过学习和实践,开发者不仅可以掌握P2P网络的基本概念和技术,还能利用JXTA提供的API高效地开发出复杂且具有弹性的分布式系统。

    Laravel开发-lumen-pubsub

    **Laravel 开发 - Lumen Pub/Sub 深度...总结,Lumen 的 Pub/Sub 能力使得开发者能够在微服务架构中实现灵活且可靠的通信。通过合理设计事件和监听器,结合合适的中间件,可以构建出高效、可扩展的 Lumen 应用程序。

    PubSub.rar

    在给定的“PubSub.rar”压缩包文件中,我们看到与实现这一模式相关的组件,包括`Monitor.ice`、`IceSub`和`IcePub`。这里我们将深入探讨IceStorm如何在冰川(Ice)框架下实现发布订阅服务。 首先,`IceStorm`是...

    local_pubsub:flutterl_pubsub

    local_pubsub Dart的简单本地pubsub库。你可以做什么该库允许您订阅不同的主题,并向该主题的所有订阅者发送消息。 订阅主题void main () { PubSub pubsub = PubSub (); Subscription ? sub = pubsub ? . subscribe ...

    PubSub:PubSub

    在C++编程中,实现PubSub模型通常涉及到设计一个中间件或者框架,以便于管理发布者和订阅者的交互。以下是一些关键知识点: 1. **主题(Topic)**:主题是消息的分类,它定义了消息的类型。发布者将消息发送到特定...

    前端项目-pubsub-js.zip

    "前端项目-pubsub-js.zip"是一个专注于此模式的JavaScript库,它允许在不直接耦合的情况下进行事件驱动的通信。在这个库中,我们主要关注的是`PubSubJS-master`这个目录,它包含了实现这一功能的源代码和其他相关...

    WxPython界面利用pubsub 展示进程工作的进度条

    一个WxPython界面利用pubsub 展示进程工作的进度条的例子,实际使用, 只要修改  WorkThread 里的 run 内容 及 MainFrame 里的 updateDisplay 内容即可。

    Phoenix 框架的 Redis PubSub 适配器.zip

    用法要使用 Redis 作为你的 PubSub 适配器,只需将其添加到你的 deps 和应用程序的 Supervisor 树中# mix.exsdefp deps do [{:phoenix_pubsub_redis, "~&gt; 3.0.0"}],end# application.exchildren = [ # ..., {Phoenix...

    strong-pubsub, 用于 node.js,浏览器,移动和物联网的PubSub.zip

    strong-pubsub, 用于 node.js,浏览器,移动和物联网的PubSub 安装$ npm install strong-pubsub使用注意:在版本之前,API可能会更改 ! var Client = require('strong-pubsub');var Adapter

    Python库 | fastapi_websocket_pubsub-0.1.15-py3-none-any.whl

    总的来说,"fastapi_websocket_pubsub"为FastAPI开发者提供了一个简单易用的工具,帮助他们轻松地实现WebSocket的发布/订阅功能,提升应用的实时性和交互性。无论你是初学者还是经验丰富的开发者,掌握这个库都能使...

Global site tag (gtag.js) - Google Analytics