首先先总结下jms规范下定义的实现接口:
ConnectionFactory 接口(连接工厂)
用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。
Connection 接口(连接)
连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
Destination 接口(目标)
目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
MessageConsumer 接口(消息消费者)
由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
MessageProducer 接口(消息生产者)
由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。
Message 接口(消息)
是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:
消息头(必须):包含用于识别和为消息寻找路由的操作设置。
一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。
消息接口非常灵活,并提供了许多方式来定制消息的内容。
Session 接口(会话)
表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。
p2p(point-to-point)模型:
其中包含几个比较重要的概念:消息队列(Queue),发送者(sender),接受者(receiver),每个消息被发送到规
定的消息队列中,并且该消息会被持久化。每个消息只能对应一个消费者,即使发送消息时,消费者没有在运行,仍然不影
响该消息被发送到消息队列,当消费者运行时,该消息就会被消费者接受到。下面以一个实例说明p2p模型的运行机制。
服务器端代码:
@MessageDriven(
activationConfig={
@ActivationConfigProperty(propertyName="destinationType" , propertyValue="javax.jms.Queue"), //指定使用的是queue
@ActivationConfigProperty(propertyName="destination" , propertyValue="queue/LeadfarQueue")//指定具体使用哪个queue
}
)
public class MdbBeanTest01 implements MessageListener {
public void onMessage(Message msg) {
// TODO Auto-generated method stub
try {
TextMessage tm = (TextMessage)msg;
String text =tm.getText();
System.out.println("服务器接收到了信息:" +text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
上面的“queue/LeadfarQueue”队列可以在jboss的配置文件中进行配置,具体路径如下:
JBOSS_HOME\server\default\deploy\messaging\destinations-service.xml
在此文件中添加:
<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.messaging.destination:service=Queue,name=LeadfarQueue"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<depends>jboss.messaging:service=PostOffice</depends>
</mbean>
这样服务器端的测试代码就完成了。
客户端代码:
try {
InitialContext context = new InitialContext();
QueueConnectionFactory factory = (QueueConnectionFactory)context.lookup("ConnectionFactory");//创建ConnectionFactory
QueueConnection connection = factory.createQueueConnection(); //创建Connection
/*
* 第一个参数:true表示开启事务,最后要手动进行commit提交,false表示自动提交*/
QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);//创建会话
Queue destination = (Queue)context.lookup("queue/LeadfarQueue");//创建Destination
QueueSender sender = session.createSender(destination);//创建发送者
TextMessage msg = session.createTextMessage("mdbBean , 你好");//创建文本消息
sender.send(msg);//发送消息
System.out.println("客户端消息发送完成");
sender.close();
session.close();
connection.close();
} catch (NamingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
这样就可以进行测试,测试结果:
服务器端调试信息:19:39:56,234 INFO [STDOUT] 服务器接收到了信息:mdbBean , 你好
验证p2p模式下消息的持久化的特性:
首先停止服务器端EJB的运行,再次运行客户端测试代码。此时,该消息仍然会发送到指定的消息队列,这时再次启动服务
器端的运行,消息仍然会发送到服务器端。说明,队列会将消息持久化。
PubSub模型
其中包含的几个概念:主题(Topic),发布者(Publisher),订阅者(Subscriber)。与p2p模型不同,p2p模型中消
息与接收者是一对一的关系,而在PubSub模型中,主题与订阅者是一对多的关系,也就是一个Topic会发送给多个订阅者
,但是主题并不会被持久化,也就是说如果在Topic发送时,订阅者程序没有运行,那么等到订阅者下次再次运行时,该
Topic也不会再发送给该订阅者,该Topic对该订阅者来说已经丢失。测试代码如下:
@MessageDriven(
activationConfig={
@ActivationConfigProperty(propertyName="destinationType" , propertyValue="javax.jms.Topic"),//指定使用的是Topic
@ActivationConfigProperty(propertyName="destination" , propertyValue="topic/LeadfarTopic")//指定使用哪个Topic
}
)
public class TopicMdbBeanTest01 implements MessageListener {
public void onMessage(Message msg) {
// TODO Auto-generated method stub
try {
TextMessage tm = (TextMessage)msg;
String text =tm.getText();
System.out.println("服务器接收到了信息:" +text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
“topic/LeadfarTopic”选项还是在上面的xml文件中进行指定,这个实例中,在服务器端定义了多个EJB,都是使用“
topic/LeadfarTopic”。
客户端代码:
try {
InitialContext context = new InitialContext();
TopicConnectionFactory factory = (TopicConnectionFactory)context.lookup("ConnectionFactory");
TopicConnection connection = factory.createTopicConnection();
TopicSession session = connection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Topic destination = (Topic)context.lookup("topic/LeadfarTopic");
TopicPublisher publisher = session.createPublisher(destination);
TextMessage msg = session.createTextMessage("topicMdbBean , 你好");
publisher.send(msg);
System.out.println("客户端消息发送完成");
publisher.close();
session.close();
connection.close();
} catch (NamingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
与上面的客户端代码类似,这个测试结果如下:
服务器端调试信息:
20:24:57,281 INFO [STDOUT] 服务器接收到了信息:topicMdbBean , 你好
20:24:57,281 INFO [STDOUT] 服务器接收到了信息:topicMdbBean , 你好
20:24:57,281 WARN [InterceptorsFactory] EJBTHREE-1246: Do not use InterceptorsFactory with a ManagedObjectAdvisor, InterceptorRegistry should be used via the bean container
20:24:57,281 WARN [InterceptorsFactory] EJBTHREE-1246: Do not use InterceptorsFactory with a ManagedObjectAdvisor, InterceptorRegistry should be used via the bean container
20:24:57,281 INFO [STDOUT] 服务器接收到了信息:topicMdbBean , 你好
可见,服务器端的三个EJB都收到了该Topic。
如果将服务器端程序停止运行,再运行客户端程序,再重新启动服务器端程序,也不会收到该Topic,也就是说Topic被丢
失了。也可以将Topic持久化,方法如下:在@MessageDriven的配置中增加几个配置选项:
@MessageDriven(
activationConfig={
@ActivationConfigProperty(propertyName="destinationType" , propertyValue="javax.jms.Topic"),
@ActivationConfigProperty(propertyName="destination" , propertyValue="topic/LeadfarTopic"),
@ActivationConfigProperty(propertyName="SubscriptionDurability" ,propertyValue="Durable"),
@ActivationConfigProperty(propertyName="subscriptionName" ,propertyValue="sn"),
@ActivationConfigProperty(propertyName="clientId" , propertyValue="snClientId")
}
)
经过这样的配置后,该Topic对于配置成这样的EJB来说就进行了持久化,这样,这个EJB在Topic发送后再运行也能接收到
。以上就是两种模型各自的特点。
分享到:
相关推荐
JMS的两种消息模型P2P模型和PubSub模型实例.pdf
JMS pubsub实例 提供学习 希望有帮助 (*^__^*) 嘻嘻……
在`pubsub-master`这个压缩包中,可能包含了`pubsub-js`库的源码、示例、文档和测试等资源。通过阅读源码,开发者可以更深入地理解其工作原理,并根据需要进行定制和扩展。同时,测试用例可以帮助确保代码的正确性,...
**总结** Angular Global Pubsub 提供了一种高效且灵活的解决方案,使得手动引导的Angular应用程序能够优雅地处理组件间的通信问题。通过使用这个库,开发者可以构建更加健壮、可扩展的前端应用,同时保持代码的...
Pubsub Extensions for Smack
PubSub模式是一种常见的消息传递模型,其中生产者(Publisher)发布消息到一个主题(Topic),而消费者(Subscriber)则通过订阅这些主题来接收消息。在Go-longpoll中,它允许订阅者保持一个持久的HTTP连接,等待新...
- **轻量级**:Go-pubsub库的设计简洁,易于理解和使用,适合小型项目或作为大型系统的一部分。 - **非阻塞**:发布和订阅操作是非阻塞的,保证了高并发环境下的性能。 - **多订阅者**:一个主题可以有多个订阅者...
Pub/Sub是一个完全托管的消息传递服务,它允许应用程序之间通过发布和订阅模型进行通信。发布者发布消息到主题(Topic),订阅者则可以订阅这些主题并接收发布的消息。这种解耦设计使得系统更加灵活,能够处理高并发...
`django-pika-pubsub-0.4.tar.gz` 包的下载和安装就是利用了PyPI和pip。 5. **源代码分发**:`.tar.gz` 文件是Unix/Linux系统常用的归档压缩格式,用于打包和压缩源代码,便于分发和存储。开发者通常会将源代码打包...
**JXTA(Java eXtensible ...总结起来,JXTA是Java开发者构建P2P网络应用的强大工具,通过学习和实践,开发者不仅可以掌握P2P网络的基本概念和技术,还能利用JXTA提供的API高效地开发出复杂且具有弹性的分布式系统。
**Laravel 开发 - Lumen Pub/Sub 深度...总结,Lumen 的 Pub/Sub 能力使得开发者能够在微服务架构中实现灵活且可靠的通信。通过合理设计事件和监听器,结合合适的中间件,可以构建出高效、可扩展的 Lumen 应用程序。
在给定的“PubSub.rar”压缩包文件中,我们看到与实现这一模式相关的组件,包括`Monitor.ice`、`IceSub`和`IcePub`。这里我们将深入探讨IceStorm如何在冰川(Ice)框架下实现发布订阅服务。 首先,`IceStorm`是...
local_pubsub Dart的简单本地pubsub库。你可以做什么该库允许您订阅不同的主题,并向该主题的所有订阅者发送消息。 订阅主题void main () { PubSub pubsub = PubSub (); Subscription ? sub = pubsub ? . subscribe ...
在C++编程中,实现PubSub模型通常涉及到设计一个中间件或者框架,以便于管理发布者和订阅者的交互。以下是一些关键知识点: 1. **主题(Topic)**:主题是消息的分类,它定义了消息的类型。发布者将消息发送到特定...
"前端项目-pubsub-js.zip"是一个专注于此模式的JavaScript库,它允许在不直接耦合的情况下进行事件驱动的通信。在这个库中,我们主要关注的是`PubSubJS-master`这个目录,它包含了实现这一功能的源代码和其他相关...
一个WxPython界面利用pubsub 展示进程工作的进度条的例子,实际使用, 只要修改 WorkThread 里的 run 内容 及 MainFrame 里的 updateDisplay 内容即可。
用法要使用 Redis 作为你的 PubSub 适配器,只需将其添加到你的 deps 和应用程序的 Supervisor 树中# mix.exsdefp deps do [{:phoenix_pubsub_redis, "~> 3.0.0"}],end# application.exchildren = [ # ..., {Phoenix...
strong-pubsub, 用于 node.js,浏览器,移动和物联网的PubSub 安装$ npm install strong-pubsub使用注意:在版本之前,API可能会更改 ! var Client = require('strong-pubsub');var Adapter
总的来说,"fastapi_websocket_pubsub"为FastAPI开发者提供了一个简单易用的工具,帮助他们轻松地实现WebSocket的发布/订阅功能,提升应用的实时性和交互性。无论你是初学者还是经验丰富的开发者,掌握这个库都能使...