[color=orange]Java后台推动数据到flex
——姜可亮
前言:
对于数据实时性页面呈现非常强的业务,通过前台定时刷新这样的效率明显不行。下面通过flex技术来实现这个功能。
思路:
以告警数据为例,告警数据通过mq、socket等方式进入系统,在java服务器通过监听来监听消息队列数据,接收后推送到flex各个客户端。blazeDS采用amf协议。该协议可以传输Object, Array, Date, XML。由于AMF采用二进制编码,这种方式可以高度压缩数据,因此非常适合用来传递大量的资料。数据量越大,Flash Remoting的传输效能就越高,远远超过Web Service。至于XML, LoadVars和loadVariables() ,它们使用纯文本的传输方式,效能就更不能与Flash Remoting相提并论了
本例mq使用activeMq测试,推动到flex的方式采用adobe提供的java flex通信包blazeDS中提供的推送功能,
具体步骤:
一. 创建一个java flex使用blazeDS通信的工程,具体步骤在此不详细说明搭建的步骤。
搭建成功后导入activemq-all-5.1.0.jar,为后面接收activemq消息使用。
二. 配置:
在remoting-config.xml加入推送服务。
<destination id="serverpushserver">
<properties>
<source>
com.east.flex.serverpush.chart.ServerPushServer
</source>
</properties>
</destination>1.服务发布 Java代码:ServerPushServer.Java
package com.east.flex.serverpush.chart;
/**
* 服务发布类
*
* @author East(姜可亮)
* 推送方式参考于网络,感谢网络。自己添加了mq驱动。
*/
public class ServerPushServer {
//ServerPushThread serverpushthread = new ServerPushThread();
MessageBrokFactory messageBrokFactory;
public void controlThread(String str){
if(str.equals("start")){
try{
messageBrokFactory=new MessageBrokFactory();
messageBrokFactory.start();
}catch (Exception e){
e.printStackTrace();
}
//serverpushthread.start();
}else{
messageBrokFactory.stop();
//serverpushthread.stop();
}
}
}
2.消息处理事务: messageBrokFactory
具有功能:启动mq,并监听mq接收消息,消息推送到flex.三大功能。
package com.east.flex.serverpush.chart;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import org.apache.log4j.Logger;
import com.zznode.gixpert.provider.activemq.service.impl.ActiveMQAdapter;
import flex.messaging.MessageBroker;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.util.UUIDUtils;
/**
* 消息处理事务
* 功能:启动mq,并监听mq接收消息,消息推送到flex.三大功能。
* @author 姜可亮
* 2011-3-23
*
* **/
public class MessageBrokFactory {
/**
* flex消息组件
*/
private MessageBroker msgBroker;
/**
* flex uuid
* */
private String clientId ;
/********************以下为mq初始化变量****************************/
/**端口*/
private static int port=61616;
/**mq服务器主机ip/主机名*/
private static String hostname = "localhost";
/**activemq用户名*/
private static String user = "";
/**activeMq密码*/
private static String password= "";
/**
* 队列名称
*/
private String queuename="sendToFlex";
/**
* activemq服务器
* */
ActiveMQAdapter amqAdapter;
/**
* 采用的接受方式 默认是Queue
* 一个消息向多个客户端发送使用topic
* 一个消息只发送到一个客户端使用Queue
* */
private static boolean pattern;
/**
* 是否采用消息持久化
* 暂时不用
* */
private static String durable="是";
/**
* 睡眠时间
* 暂时不用
* */
private static String sleepTime="否";
/**
* 超时时间
* 暂时不用
*/
private static String receiveTimeOut="否";
/***********************初始化变量结束************************************/
private Logger logger = Logger.getLogger(ActiveMQAdapter.class);
public boolean sedMessage(String sub,Object message) {
try{
AsyncMessage msg = new AsyncMessage();
msg.setDestination("serverpush");
msg.setClientId(clientId);
msg.setMessageId(UUIDUtils.createUUID());
msg.setTimestamp(System.currentTimeMillis());
/* 设置消息信息,发布到客户端去 */
msg.setBody(message);
msgBroker.routeMessageToService(msg, null);
return true;
}catch(Exception e){
e.printStackTrace();
return true;
}
}
/*
* 启动activeMq采集
*
* */
public void start(){
try {
//初始化flex消息组建
msgBroker = MessageBroker.getMessageBroker(null);
clientId = UUIDUtils.createUUID();
//启动mq消息接收
amqAdapter = new ActiveMQAdapter();
//版本1.0只需要对hostname port 起作用 有更高需求再另添加
amqAdapter.setUrl("failover:(tcp://"+hostname+":"+port+"?wireFormat.maxInactivityDuration=0)");
amqAdapter.addMessageListener(queuename, pattern, new MyTestMessageListener());
} catch (JMSException e) {
String msg = "Cannot start AlarmCollectQueue.";
logger.error(msg, e);
System.out.println("set config ex"+e);
e.printStackTrace();
} catch (NamingException e) {
String msg = "Naming context error.";
logger.error(msg, e);
System.out.println("set config ex"+e);
e.printStackTrace();
}
}
/*
* 停止activeMq采集
*
* */
public void stop() {
amqAdapter.close();
}
/**
* 消息接收处理工厂
* */
class MyTestMessageListener implements MessageListener {
public void onMessage(Message msg) {
try{
//
String message=((TextMessage)msg).getText();
logger.info("recieve a message from: "+msg.getJMSDestination()+" " +
"message text: "+message);
sedMessage("test",message);
}catch(Exception e){
e.printStackTrace();
System.out.println("set config ex"+e);
}
}
}
}
2.ActiveMq接收消息的代码见附件工程。在此不列出。
Flex端:本例以一个textarea文本来显示数据。告警数据具体可以使用datagrid
展现代码getMessageClient.mxml
引用
<?xml version="1.0" encoding="utf-8"?>
<mx:Application xmlns:mx="http://www.adobe.com/2006/mxml" layout="absolute">
<mx:Script>
<![CDATA[
import com.east.flex.entity.Product;
import mx.messaging.messages.IMessage;
import mx.messaging.events.MessageEvent;
import mx.utils.ArrayUtil;
import mx.rpc.events.ResultEvent;
import mx.collections.ArrayCollection;
import mx.controls.Alert;
import mx.messaging.ChannelSet;
import mx.messaging.Consumer;
[Bindable]
private var p:Product;
private var array:ArrayUtil;
[Bindable]
private var expensesAC:ArrayCollection = null;
public var consumer2:Consumer = null;
public function messageHandler(event:MessageEvent):void
{
textArea.text = event.message.body+textArea.text;
trace("server push data to client..............");
}
public function remoteServerCallback(event:ResultEvent):void
{
trace("Server is start....");
//Alert.show("Server is start....."+event.message);
}
public function startServer(event:MouseEvent):void
{
// server.start();
server.controlThread("start");
}
public function stopServer(event:MouseEvent):void
{
// server.stop();
server.controlThread("stop");
}
]]>
</mx:Script>
<mx:Consumer id="consumer" destination="serverpush" message="messageHandler(event)"/>
<mx:RemoteObject id="server" destination="serverpushserver" result="remoteServerCallback(event)"/>
<!-- Define custom colors for use as fills in the AreaChart control. -->
<mx:SolidColor id="sc1" color="blue" alpha=".3"/>
<mx:SolidColor id="sc2" color="red" alpha=".3"/>
<mx:SolidColor id="sc3" color="green" alpha=".3"/>
<!-- Define custom Strokes. -->
<mx:Stroke id = "s1" color="blue" weight="2"/>
<mx:Stroke id = "s2" color="red" weight="2"/>
<mx:Stroke id = "s3" color="green" weight="2"/>
<mx:Panel title="LineChart and AreaChart Controls Example"
height="100%" width="100%" layout="absolute">
<mx:TextArea id="textArea" width="1017" height="565">
</mx:TextArea>
<mx:Button id="autoOk" label="AutoRefresh" x="1038" y="87" width="107" click="consumer.subscribe()" enabled="{!consumer.subscribed}"/>
<mx:Button id="cancel" x="1038" y="135" width="107" label="Cancel Auto " click="consumer.unsubscribe()" enabled="{consumer.subscribed}"/>
<mx:Button id="cancel0" x="1038" y="187" width="107" label="Start Server" click="startServer(event)"/>
<mx:Button id="cancel1" x="1038" y="237" width="107" label="Stop Server" click="stopServer(event)"/>
</mx:Panel>
</mx:Application>
使用操作:
在第一次打开的页面点击startserver.这个共是启动监听,注意只需要在第一次打开点击了就行,其他打开的页面不需要。再次点击
然后点击autorefesh,用来启动自动显示数据,注意autorefesh这不是自动刷新请求数据。是推动上来的数据。
其他打开的页面都只要点击autorefesh即可。
启动一个acitmvemq在提供的页面发送数据,可以一次发送几千条数据。同事打开几个页面来测试下下过。
[color=red]优化:[/color]
项目最近开始运行发现客户端web非常耗费cpu,非常头疼,当然这与我的数据量肯定是有关系的,我后台推送到前台的数据大约每秒100条。消息量少时streaming没有问题。
后来经过了多种情况的测试,最后发现把streaming设置为polling方式轮询推送占用的cpu资源一下子降了下来。后来吧轮询时间这只为0.3s效果比streaming还好。streaming方式是,消息上来的太快,表格显示时非常闪眼睛,用户没法操作,相反设为polling后这种现象降低了。
总结steaming耗费cpu大的原因是,flex执行死循环时比java执行死循环耗资源打多了。也就是对于一直占用cpu不释放。steaming方式消息量大了也跟执行死循环一样,一直在执行不释放cpu.
下面是改造后的配置
<channel-definition id="my-polling-amf" class="mx.messaging.channels.AMFChannel">
<endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/amfpolling" class="flex.messaging.endpoints.AMFEndpoint"/>
<properties>
<polling-enabled>true</polling-enabled>
<polling-interval-seconds>0.2</polling-interval-seconds>
</properties>
</channel-definition>
end[/color]
分享到:
相关推荐
Flex + blazeds + Java推送Demo 本例实现由Flex一端客户端发送消息, 然后由Java端在发布到所有订阅的其它Flex端. 里面有说明与源码, 还有一个直接放到Tomcat里面的直接发布的项目 小编使用工具 eclipse3.5 flex sdk...
Flex BlazeDs推数据生成lineChart实例是一个典型的前端与后端实时通信的应用场景,它结合了Adobe Flex技术用于前端用户界面的构建,BlazeDS作为数据推送服务,以及使用lineChart组件来展示动态更新的数据。...
FLEX,BlazeDS,JAVA实现服务器端数据推送技术,下载后直接导到你的MYECLIPSE即可运行。先在IE内输入:http://localhost:8080/pushDemo/flex/pushDemo.html,点击“button”后,再另一个IE窗口内输入:...
BlazeDS是一款开源的服务器端技术,它为富互联网应用(RIA)提供了数据推送、消息传递和Remoting功能,主要用于Flash/Flex与Java后端之间的通信。 1. **BlazeDS概述**: BlazeDS是Adobe Flex企业版的一部分,它...
flex blazeds实现服务器向客户端推送数据 环境Eclipse Java EE IDE 服务apache-tomcat-7.0.23-windows-x86 blazeds采用的是blazeds4.0.0.14931 (附件中包含 apache-tomcat-7.0.23 + 源码 + 说明文档)
标题中的“flex+spring+blazeds消息推送”是指在开发富互联网应用程序(RIA)时,采用Adobe Flex作为前端UI框架,Spring作为后端服务层框架,BlazeDS作为数据通信中间件,实现服务器到客户端的消息实时推送功能。...
6. **实时推送(Push)**:BlazeDS支持基于HTTP的Long Polling和Streaming,实现服务器向客户端的实时数据推送。这种特性使得金融、聊天室等需要实时更新的应用场景得以实现。 7. **安全性和性能优化**:BlazeDS...
BlazeDS支持两种主要的推送机制:Polling和Long-Polling,以及更高级的WebSocket。 1. **Polling**:客户端定期向服务器发送请求,询问是否有新数据。如果有,服务器立即返回数据;如果没有,则可能等待一段时间后...
有关于flex推送技术的工程例子,使用blazeds的StreamingAMFChannel 通道实现推送功能。(使用前先启动http://localhost:8080/flex-blazeds/TickCacheServlet?cmd=start)
BlazeDS是Adobe提供的一个免费服务器端组件,它允许Flex客户端与Java后端进行双向通信,实现了轻量级的远程方法调用(RPC)以及数据推送功能。Java是一种广泛使用的面向对象的编程语言,常用于构建服务器端应用。JMS...
它支持EJB、Spring、Hibernate等Java企业框架,方便整合现有的企业服务,同时,Blazeds还支持数据推送,使得服务器可以主动将数据推送到客户端,提升了用户体验。 5. **源码分析**:在"FlexTest"这个压缩包中,可能...
BlazDS提供了丰富的数据服务,包括远程方法调用(Remote Method Invocation, RMI)、消息代理(Message Brokering)以及数据推送等功能,使得Flex应用程序能够轻松地与Java服务器进行数据交互。 在Flex BlazDS中,...
BlazeDS是Adobe提供的一个免费服务器端技术,它为Flex客户端提供了与Java应用程序交互的能力,包括实时消息传递、Remoting和数据推送等功能。Spring是一个广泛使用的Java企业级应用开发框架,它简化了Java应用程序的...
在IT行业中,Flex和...BlazDS的StreamingAMFChannel提供了低延迟、高吞吐量的通信机制,使得实时数据推送变得简单而可靠。在实际项目中,开发者还可以根据需求进行性能优化和安全控制,以满足各种复杂场景的需求。
在"BlazeDS实现java后台消息推送flex前台接收的例子"中,我们主要关注以下几个关键知识点: 1. **BlazeDS架构**:BlazeDS包含了一组服务,如Remoting服务、LiveCycle Data Services(LCDS)以及MessageBroker服务。...
总结一下,BlazDS的消息推送功能使得Flex应用程序能够实现实时数据更新,通过消息订阅机制,服务器可以在有新数据时主动推送到客户端。在Eclipse工程中实现这一功能,包括设置BlazDS服务、配置消息通道、编写客户端...
Flex中的消息推送机制是基于Producer和Consumer模型实现的,这种机制允许应用程序实时地发送和接收消息,常用于构建聊天室、通知系统等实时交互的应用。下面将详细解释Flex消息推送的相关知识点。 1. **Producer与...
在本场景中,我们将探讨如何利用BlazeDS和Flex实现服务器消息推送。 首先,让我们了解BlazeDS的基本概念。BlazeDS是Adobe Flex与Java后端进行数据交互的一个中间件,它支持AMF(Action Message Format)协议,能够...