Message filtering
Messaging Service允许给Producer组件添加message headers
或者增加subtopic
信息。然后Consumer组件在调用subscribe()方法的时候会把它的过滤信息发送回服务器进行消息过滤
。
对于更高级复杂的消息
,建议使用mx.messaging.MultiTopicConsumer
和mx.messaging.MultiTopicProducer
。
Using selectors:使用message headers
Producer
:message header的名字不能为
“JMS
”和“DS
”这些保留字符串
。
var message:AsyncMessage = new AsyncMessage();
message.headers = new Array();
message.headers["prop1"] = 5;
message.body = input.text;
producer.send(message);
Consumer
:使用Consumer.selector
来制定一个message selector
<mx:Consumer id="consumer"
destination="chat"
selector="prop1 > 4"
message="messageHandler(event);"/>
Using subtopics
-
不能在JMS destination
上使用subtopics
。但是可以在JMS destination
上使用message headers
。
-
producer
.subtopic
= "chat.fds.newton"; // 指定Producer派发mesage的subtopic
-
consumer
.subtopic
= "chat.fds.*"; // 指定Consumer可以接收的subtopic;在consumer.subscribe();之前设置
- 在destination中允许使用subtopic,默认通配符是“.”
,可以不用指定
<destination id="chat">
<properties>
<network>
<subscription-timeout-minutes>0</subscription-timeout-minutes>
</network>
<server>
<message-time-to-live>0</message-time-to-live>
<allow-subtopics>true</allow-subtopics>
<subtopic-separator>.</subtopic-separator>
</server>
</properties>
<channels>
<channel ref="my-polling-amf"/>
</channels>
</destination>
Configuring the Messaging Service
示例
<service id="message-service"
class="flex.messaging.services.MessageService">
<adapters>
<adapter-definition id="actionscript"
class="flex.messaging.services.messaging.adapters.ActionScriptAdapter"
default="true"/>
<adapter-definition id="jms"
class="flex.messaging.services.messaging.adapters.JMSAdapter"/>
</adapters>
<destination id="chat-topic">
<properties>
<server>
<message-time-to-live>0</message-time-to-live>
</server>
</properties>
<channels>
<channel ref="samples-rtmp"/>
<channel ref="samples-amf-polling"/>
</channels>
</destination>
</service>
Configuring the adapter
如果
在destination里不指定
adapter,那么destination会使用缺省的
在service里<adapters>标签中定义的adapter。
给destination自定义引用adapter
<destination id="chat-topic">
<adapter ref="actionscript"/>
</destination>
Defining the destination
-
Setting network
properties in the destination
<destination id="chat-topic">
<properties>
<network>
<throttle-inbound policy="ERROR" max-frequency="50"/>
<throttle-outbound policy="ERROR" max-frequency="500"/>
</network>
</properties>
</destination>
subscription-timeout-minutes
: 订阅者自动退订时间
;即N分钟内没有收到message就自动退订;该值默认为0,表示永远不会强制退订。
throttle-inbound
:max-frequency属性表示该destination每秒接收
消息的最高值;policy属性表示当达到mesage limit的时候该如何处理:NONE
:不采取任何措施;ERROR
:不接收多出的mesage并且发送error到客户端;IGNORE
:不接收多出的message但不发送error到客户端。
throttle-outbound
:max-frequency属性表示该destination每秒发送
消息的最高值;policy属性表示当达到mesage
limit的时候该如何处理:NONE
:不采取任何措施;ERROR
:不发送多出的mesage并且发送error到客户端;IGNORE
:不发送多出的
message但不发送error到客户端。
-
Setting server
properties in the destination
<destination id="chat-topic">
<properties>
...
<server>
<message-time-to-live>0</message-time-to-live>
</server>
</properties>
</destination>
allow-subtopics
:允许
destination中使用subtopics
。
cluster-message-routing
:destination使用的message-route的模式:server-to-server
(default) 和 broadcast
。前者表示只有data message路由到servers,但是subscribe and unsubscribe messages全部通过cluster(集群)来broadcast
;后者表示全部messages全部通过cluster来broadcast。
message-time-to-live
:表示某条message的存活时间(毫秒)
,超过这个时间就会被抛弃;0表示这条message永远不会过期。
send-security-constraint
:接收消息安全限制,针对Producer
组件,下面有示例代码
subscribe-security-constraint
:发送消息安全限制,针对Consumer
组件,下面有示例代码
subtopic-separator
:设置subtopic的通配符
;默认是“.”。
-
Referencing message channels
in the destination
<destination id="chat-topic">
...
<channels>
<channel ref="samples-rtmp"/>
<channel ref="samples-amf-polling"/>
</channels>
...
</destination>
-
Applying security
to the destination
<destination id="chat">
...
<properties>
<server>
<send-security-constraint ref="sample-users"/>
<subscribe-security-constraint ref="sample-users"/>
</server>
</properties>
...
</destination>
Creating a custom Message Service adapter
- 一个Message Service adapter类必须继承flex.messaging.services.MessageServiceAdapter
类。
- 需要创建一个flex.messaging.MessageService
的实例
。
- 需要一个invoke()
方法,接收客户端传过来的消息,再将处理好的消息发送回客户端。
- MessageService.pushMessageToClients
()方法向客户端发送消息。第一个参数是消息对象,第二个参数是布尔值,表示是否使用消息选择器语句。
- MessageService.sendPushMessageFromPeer
()方法用来在集群环境
下向每个服务器节点广播消息。第一个参数是消息对象,第二个参数是布尔值,表示是否使用消息选择器语句。
package com;
import java.util.Random;
import flex.messaging.MessageBroker;
import flex.messaging.io.ArrayCollection;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.Message;
import flex.messaging.services.MessageService;
import flex.messaging.services.ServiceAdapter;
import flex.messaging.util.UUIDUtils;
public class RandomNumberGenerator extends ServiceAdapter {
private static DataGenerator thread;
private static ArrayCollection randomNumbers = new ArrayCollection();
public RandomNumberGenerator()
{
Random random = new Random();
for (int i = 0; i < 6; i++)
{
randomNumbers.add(new Integer(random.nextInt(100)));
}
}
public void start()
{
if (thread == null)
{
thread = new DataGenerator();
thread.start();
}
}
public void stop()
{
thread.running = false;
thread = null;
}
public static class DataGenerator extends Thread
{
public boolean running = true;
public void run()
{
MessageBroker msgBroker = MessageBroker.getMessageBroker(null);
String clientID = UUIDUtils.createUUID();
Random random = new Random();
while (running)
{
randomNumbers.clear();
{
for (int i = 0; i < 6; i++)
{
randomNumbers.add(new Integer(random.nextInt(100)));
}
}
AsyncMessage msg = new AsyncMessage();
msg.setDestination("RandomDataPush");
msg.setClientId(clientID);
msg.setMessageId(UUIDUtils.createUUID());
msg.setBody(randomNumbers);
msgBroker.routeMessageToService(msg, null);
try
{
Thread.sleep(1000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}
public Object invoke(Message message)
{
if (message.getBody().equals("New"))
{
return randomNumbers;
} else
{
AsyncMessage newMessage = (AsyncMessage) message;
newMessage.setBody(randomNumbers);
MessageService msgService = (MessageService) getDestination().getService();
msgService.pushMessageToClients(newMessage, false);
}
return null;
}
}
<adapters>
...
adapter-definition id="cfgateway" class="foo.bar.SampleMessageAdapter"/>
...
</adapters>
<destination id="chat">
<adapter ref="cfgateway" />
<channels>
<channel ref="my-polling-amf"/>
</channels>
</destination>
分享到:
相关推荐
- **核心组件**:Messaging Service是LCDS 2.6的核心组成部分,它的性能直接影响整个服务器的效率。 - **通信协议**:该服务可以通过Real Time Messaging Protocol (RTMP) 和 HTTP 协议与客户端进行通信。文档中的...
LCDS提供了多种服务,包括Remoting服务(用于远程方法调用),Dataservice服务(支持ORM映射,方便与数据库交互),以及Messaging服务(支持发布/订阅模型的事件传递)。这些服务使得Flex应用能够无缝地与Java服务...
var remoteService:RemotingObject = new RemotingObject(); remoteService.destination = "myService"; ``` 4. 调用后台方法:通过RemotingObject实例调用后台服务的方法,例如: ```as3 remoteService....
- BlazeDS/Lightweight Messaging (LCDS):Adobe提供的中间件,支持Flex与Java应用之间的数据传输。BlazeDS是开源版本,LCDS是商业版本,两者都支持AMF协议,提供高效的数据序列化和反序列化。 - Java:后端服务,...
数据服务的架构涉及多个方面,包括远程调用(Remoting)、消息传递(Messaging)和媒体流(Media Streaming)。 #### 远程调用与RPC Flex应用程序可以通过Data Services访问Java服务器端。具体来说,Flex可以访问...
1. **LiveCycle Data Services (LCDS)**:BlazeDS是LCDS的一个开源版本,提供了一套完整的数据服务,包括Remoting、Messaging、Data Management和Service Invocation。 2. **Remoting**:通过AMF(Action Message ...
var remoteService:RemoteObject = new RemoteObject(); remoteService.destination = "javaService"; remoteService.javaMethod.addEventListener(ResultEvent.RESULT, onResult); remoteService.javaMethod....
在Java环境中,通常与 BlazeDS 或 LCDS 服务一起使用,它们提供了AMF通道,使得Flex与Java应用程序之间的通信变得简单。 配置RemoteObject的第一步是在Flex客户端创建一个RemoteObject实例。在MXML文件中,你可以...
- **LiveCycle Data Services ES**:进一步发展为 LCDS 2.5.1,并与 ColdFusion 8 实现集成安装。 #### 二、BlazeDS 的核心功能 ##### 1. **远程处理(Remoting)** - **概念**:Flex 和 AIR 应用程序可以通过 mx...
BlazeDS还可以与其他消息系统集成,例如JMS(Java Messaging Service)和ActiveMQ。由于其远程技术和消息系统都采用了基于HTTP的AMF数据传输方式,因此在性能方面具有明显优势。 #### 二、BlazeDS的组成部分 ...
`<service>`标签是顶级元素,它定义了一个名为`remoting-service`的服务,类型为`flex.messaging.services.RemotingService`。服务中的`<adapters>`部分定义了适配器,这里默认使用的是`JavaAdapter`,允许Flex...
BlazeDS提供了丰富的消息传递服务,包括Remoting、LiveCycle Data Services (LCDS) 和 Messaging,使得开发人员可以方便地构建富互联网应用程序(RIA)。在本文中,我们将深入探讨如何使用BlazeDS实现Flex与Java之间...
RemotingObject是Flex的 Cairngorm 框架中的一部分,它提供了与后端服务(如AMF服务器,如BlazeDS或LCDS)交互的能力。RemotingObject允许我们调用远程方法,就像调用本地对象的方法一样,极大地简化了客户端与...
它源于Adobe的另一款产品LiveCycle Data Services(LCDS)的开源版本,其核心优势在于提供了以下几项关键功能: 1. **数据访问**:允许客户端(如Flex、AIR应用)通过AMF协议与Java后端进行高效数据交互。 2. **...
1. **LiveCycle Data Services (LCDS)**:这是BlazeDS的一个高级版本,提供了更全面的企业级功能,如数据缓存、事务处理和用户会话管理等。 2. **Remoting Services**:提供AMF通道,使得Flex客户端可以通过HTTP...
BlazeDS是一款由Adobe公司推出的免费开源产品,它是Lightweight ColdFusion Data Services (LCDS)的精简版,主要用于Web开发,特别是针对Flex和AIR客户端。BlazeDS的主要目标是促进客户端与服务端之间的数据交互,...
6. ** Messaging **:如果应用需要实时通信或发布/订阅模式,可以利用Flex的LiveCycle Data Services(LCDS)或者BlazeDS的MessageBroker,配合JMS(Java Message Service)实现实时数据传输。 7. ** Error ...
BlazeDS是一款由Adobe公司推出的免费开源产品,它是Lightweight ColdFusion Data Services (LCDS)的开源简化版。BlazeDS的主要目的是为Flex和AIR客户端提供与Java服务端之间的高效通信。它提供了多种功能,包括AMF...
BlazeDS是Adobe LiveCycle Data Services (LCDS)的一个分支,它提供了一种基于AMF(Action Message Format)的数据传输协议,使得Flex客户端能够直接调用Java服务端的方法。这一过程的核心就是反射机制,它允许运行...