`

flex blazeds推送机制

    博客分类:
  • flex
阅读更多
[color=orange]Java后台推动数据到flex
——姜可亮
前言:
对于数据实时性页面呈现非常强的业务,通过前台定时刷新这样的效率明显不行。下面通过flex技术来实现这个功能。
思路:
以告警数据为例,告警数据通过mq、socket等方式进入系统,在java服务器通过监听来监听消息队列数据,接收后推送到flex各个客户端。blazeDS采用amf协议。该协议可以传输Object, Array, Date, XML。由于AMF采用二进制编码,这种方式可以高度压缩数据,因此非常适合用来传递大量的资料。数据量越大,Flash Remoting的传输效能就越高,远远超过Web Service。至于XML, LoadVars和loadVariables() ,它们使用纯文本的传输方式,效能就更不能与Flash Remoting相提并论了

本例mq使用activeMq测试,推动到flex的方式采用adobe提供的java flex通信包blazeDS中提供的推送功能,

具体步骤:
一. 创建一个java flex使用blazeDS通信的工程,具体步骤在此不详细说明搭建的步骤。
搭建成功后导入activemq-all-5.1.0.jar,为后面接收activemq消息使用。
二. 配置:
在remoting-config.xml加入推送服务。
<destination id="serverpushserver">
<properties>
<source>
com.east.flex.serverpush.chart.ServerPushServer
</source>
</properties>
</destination>
1.服务发布 Java代码:ServerPushServer.Java
package com.east.flex.serverpush.chart;

/**
 * 服务发布类
 * 
 * @author East(姜可亮)
 * 推送方式参考于网络,感谢网络。自己添加了mq驱动。
 */
public class ServerPushServer {	
	//ServerPushThread serverpushthread = new  ServerPushThread();
	MessageBrokFactory messageBrokFactory;
	public void controlThread(String str){
		if(str.equals("start")){
			try{
				messageBrokFactory=new MessageBrokFactory();
				messageBrokFactory.start();
			}catch (Exception e){
				 e.printStackTrace();
			}
			
		//serverpushthread.start();
		}else{
			messageBrokFactory.stop();
			//serverpushthread.stop();
		}
	}
}


2.消息处理事务: messageBrokFactory
具有功能:启动mq,并监听mq接收消息,消息推送到flex.三大功能。

package com.east.flex.serverpush.chart;

import javax.jms.JMSException;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.naming.NamingException;

import org.apache.log4j.Logger;

import com.zznode.gixpert.provider.activemq.service.impl.ActiveMQAdapter;

import flex.messaging.MessageBroker;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.util.UUIDUtils;


/**
 * 消息处理事务
 * 功能:启动mq,并监听mq接收消息,消息推送到flex.三大功能。
 * @author 姜可亮
 * 2011-3-23
 * 
 * **/
public class MessageBrokFactory {
	/**
	 * flex消息组件
	 */
	private MessageBroker msgBroker;
	/**
	 * flex uuid
	 * */
	private String clientId ;
	
	/********************以下为mq初始化变量****************************/
	/**端口*/
	private  static int port=61616;
	/**mq服务器主机ip/主机名*/
	private static String hostname = "localhost";
	/**activemq用户名*/
	private  static String user = "";
	/**activeMq密码*/
	private  static String password= "";
	/**
	 * 队列名称
	 */
	private String queuename="sendToFlex";
	/**
	 * activemq服务器
	 * */
	ActiveMQAdapter amqAdapter;
	/**
	 * 采用的接受方式 默认是Queue
	 * 一个消息向多个客户端发送使用topic
	 * 一个消息只发送到一个客户端使用Queue
	 * */
	private static boolean  pattern;
	/**
	 * 是否采用消息持久化
	 * 暂时不用
	 * */
	private  static String durable="是";
	/**
	 * 睡眠时间
	 * 暂时不用
	 * */
	private  static String sleepTime="否";
	/**
	 * 超时时间   
	 * 暂时不用
	 */
	private  static String receiveTimeOut="否";
	/***********************初始化变量结束************************************/
	
	private Logger logger = Logger.getLogger(ActiveMQAdapter.class);

	public boolean sedMessage(String sub,Object message) {
		try{
			AsyncMessage msg = new AsyncMessage();
			msg.setDestination("serverpush");
			msg.setClientId(clientId);
			msg.setMessageId(UUIDUtils.createUUID());
			msg.setTimestamp(System.currentTimeMillis());
			/* 设置消息信息,发布到客户端去 */
			msg.setBody(message);
			msgBroker.routeMessageToService(msg, null);
			return true;
		}catch(Exception e){
			e.printStackTrace();
			return true;
		}
		
	}
	/*
	 * 启动activeMq采集
	 * 
	 * */
	public void start(){
		try {
			//初始化flex消息组建
			msgBroker = MessageBroker.getMessageBroker(null);
			clientId = UUIDUtils.createUUID();
			//启动mq消息接收
			amqAdapter = new ActiveMQAdapter();
			//版本1.0只需要对hostname  port 起作用 有更高需求再另添加
			amqAdapter.setUrl("failover:(tcp://"+hostname+":"+port+"?wireFormat.maxInactivityDuration=0)");
		
			amqAdapter.addMessageListener(queuename, pattern, new MyTestMessageListener());
		} catch (JMSException e) {
			String msg = "Cannot start AlarmCollectQueue.";
			logger.error(msg, e);
			System.out.println("set config ex"+e);
			e.printStackTrace();
		} catch (NamingException e) {
			String msg = "Naming context error.";
			logger.error(msg, e);
			System.out.println("set config ex"+e);
			e.printStackTrace();
		}
	}
	/*
	 * 停止activeMq采集
	 * 
	 * */
	public void stop() {
		amqAdapter.close();
	}
	/**
	 * 消息接收处理工厂
	 * */
	class MyTestMessageListener implements MessageListener {
		
		public void onMessage(Message msg) {
			try{
				//
				String message=((TextMessage)msg).getText();
				logger.info("recieve a message from: "+msg.getJMSDestination()+"  " +
						"message text: "+message);
				sedMessage("test",message);
			}catch(Exception e){
				e.printStackTrace();
				System.out.println("set config ex"+e);
			}
		}
	}

}

2.ActiveMq接收消息的代码见附件工程。在此不列出。
Flex端:本例以一个textarea文本来显示数据。告警数据具体可以使用datagrid

展现代码getMessageClient.mxml
引用
<?xml version="1.0" encoding="utf-8"?>
<mx:Application xmlns:mx="http://www.adobe.com/2006/mxml" layout="absolute">
	 <mx:Script>
    	<![CDATA[
    		import com.east.flex.entity.Product;
        import mx.messaging.messages.IMessage;
        import mx.messaging.events.MessageEvent;
        import mx.utils.ArrayUtil;
        import mx.rpc.events.ResultEvent;
        import mx.collections.ArrayCollection;
        import mx.controls.Alert;
        import mx.messaging.ChannelSet;
        import mx.messaging.Consumer;

		[Bindable]
		private var p:Product;
		
		private var array:ArrayUtil;
		
        [Bindable]
        private var expensesAC:ArrayCollection = null;
            
        public var consumer2:Consumer = null;
           
        public function messageHandler(event:MessageEvent):void
        {
        	textArea.text =  event.message.body+textArea.text;
        	trace("server push data to client..............");
        }
        
        
        public function remoteServerCallback(event:ResultEvent):void
        {
        	trace("Server is start....");
        	//Alert.show("Server is start....."+event.message);
        }
        
        public function startServer(event:MouseEvent):void
        {
//        	server.start();
			server.controlThread("start");
        }
        
        public function stopServer(event:MouseEvent):void
        {
//        	server.stop();
			server.controlThread("stop");
        }
    	]]>
    </mx:Script>
	
	<mx:Consumer id="consumer" destination="serverpush" message="messageHandler(event)"/>
	
	<mx:RemoteObject id="server" destination="serverpushserver"  result="remoteServerCallback(event)"/>
	
    <!-- Define custom colors for use as fills in the AreaChart control. -->
    <mx:SolidColor id="sc1" color="blue" alpha=".3"/>
    <mx:SolidColor id="sc2" color="red" alpha=".3"/>
    <mx:SolidColor id="sc3" color="green" alpha=".3"/>

    <!-- Define custom Strokes. -->
    <mx:Stroke id = "s1" color="blue" weight="2"/>
    <mx:Stroke id = "s2" color="red" weight="2"/>
    <mx:Stroke id = "s3" color="green" weight="2"/>

    <mx:Panel title="LineChart and AreaChart Controls Example" 
        height="100%" width="100%" layout="absolute">

     		<mx:TextArea id="textArea" width="1017" height="565">
     			
     		</mx:TextArea>

        
        
        <mx:Button id="autoOk" label="AutoRefresh" x="1038" y="87" width="107" click="consumer.subscribe()" enabled="{!consumer.subscribed}"/>
        <mx:Button id="cancel" x="1038" y="135" width="107" label="Cancel Auto "  click="consumer.unsubscribe()" enabled="{consumer.subscribed}"/>
        <mx:Button id="cancel0" x="1038" y="187" width="107" label="Start Server"  click="startServer(event)"/>
        <mx:Button id="cancel1" x="1038" y="237" width="107" label="Stop Server"  click="stopServer(event)"/>
    </mx:Panel>
</mx:Application>

使用操作:

在第一次打开的页面点击startserver.这个共是启动监听,注意只需要在第一次打开点击了就行,其他打开的页面不需要。再次点击
然后点击autorefesh,用来启动自动显示数据,注意autorefesh这不是自动刷新请求数据。是推动上来的数据。
其他打开的页面都只要点击autorefesh即可。

启动一个acitmvemq在提供的页面发送数据,可以一次发送几千条数据。同事打开几个页面来测试下下过。


[color=red]优化:[/color]
项目最近开始运行发现客户端web非常耗费cpu,非常头疼,当然这与我的数据量肯定是有关系的,我后台推送到前台的数据大约每秒100条。消息量少时streaming没有问题。
后来经过了多种情况的测试,最后发现把streaming设置为polling方式轮询推送占用的cpu资源一下子降了下来。后来吧轮询时间这只为0.3s效果比streaming还好。streaming方式是,消息上来的太快,表格显示时非常闪眼睛,用户没法操作,相反设为polling后这种现象降低了。
总结steaming耗费cpu大的原因是,flex执行死循环时比java执行死循环耗资源打多了。也就是对于一直占用cpu不释放。steaming方式消息量大了也跟执行死循环一样,一直在执行不释放cpu.
下面是改造后的配置
 <channel-definition id="my-polling-amf" class="mx.messaging.channels.AMFChannel">
            <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/amfpolling" class="flex.messaging.endpoints.AMFEndpoint"/>
            <properties>
                <polling-enabled>true</polling-enabled>
                <polling-interval-seconds>0.2</polling-interval-seconds>
               
            </properties>
        </channel-definition>




end[/color]
6
4
分享到:
评论
12 楼 qq123zhz 2014-11-27  
neusoft_jerry 写道
大侠,能否看下你通过流channel实现的推送的源码吗?
我的tomcat7.0.22怎么一直也不好使呢?


这个是很久以前的
11 楼 neusoft_jerry 2014-11-26  
大侠,能否看下你通过流channel实现的推送的源码吗?
我的tomcat7.0.22怎么一直也不好使呢?
10 楼 zhanghuanhuan 2014-06-16  
可以指点一下吗
9 楼 zhanghuanhuan 2014-06-16  
运行没有反应呢
8 楼 286103480 2012-10-08  
学习了,不错的介绍。
7 楼 zy14shadow 2012-04-20  
例子版本低,无法运行!
6 楼 kuro_0505 2012-02-06  
你好,用你的列子跑起来怎么出现了异常,异常信息如下:

[DEBUG,FailoverTransport,ActiveMQ Task] Attempting connect to: tcp://localhost:61616?wireFormat.maxInactivityDuration=0
[DEBUG,FailoverTransport,ActiveMQ Task] Connect fail to: tcp://localhost:61616?wireFormat.maxInactivityDuration=0, reason: java.net.ConnectException: Connection refused: connect
[DEBUG,FailoverTransport,ActiveMQ Task] Waiting 20 ms before attempting connection.
[DEBUG,FailoverTransport,http-8080-1] Waiting for transport to reconnect.
5 楼 tt434341228 2011-10-25  
请问你这个实例如果放到flex+ssh环境下能跑吗?
4 楼 yu_xueli 2011-10-18  
有没有采用BYTESMESSAGE测试过?
3 楼 jklliang 2011-06-10  
最近这个应用到实践后由于我推送到前台的数据量非常大,导致flash非常占用cpu.正在寻求解决思路,有研究的朋友给提示一下。
2 楼 jklliang 2011-06-07  
qq123zhz 写道
你这个使用到了JMS,可以在不使用jms的情况下实现推送吗?

回一楼:
当然可以啊,jms是我数据的来源,你完全可以吧jms替换成其他的数据来源。文章核心在怎样将数据推送到flash
1 楼 qq123zhz 2011-04-07  
你这个使用到了JMS,可以在不使用jms的情况下实现推送吗?

相关推荐

    Flex + blazeds + Java推送

    Flex + blazeds + Java推送Demo 本例实现由Flex一端客户端发送消息, 然后由Java端在发布到所有订阅的其它Flex端. 里面有说明与源码, 还有一个直接放到Tomcat里面的直接发布的项目 小编使用工具 eclipse3.5 flex sdk...

    Flex BlazeDs 推数据生成lineChart实例

    Flex BlazeDs推数据生成lineChart实例是一个典型的前端与后端实时通信的应用场景,它结合了Adobe Flex技术用于前端用户界面的构建,BlazeDS作为数据推送服务,以及使用lineChart组件来展示动态更新的数据。...

    FLEX_Blazeds数据推送技术

    FLEX,BlazeDS,JAVA实现服务器端数据推送技术,下载后直接导到你的MYECLIPSE即可运行。先在IE内输入:http://localhost:8080/pushDemo/flex/pushDemo.html,点击“button”后,再另一个IE窗口内输入:...

    flex blazeDS开发使用手册,对于集成blaze开发很有用处。

    BlazeDS是一款开源的服务器端技术,它为富互联网应用(RIA)提供了数据推送、消息传递和Remoting功能,主要用于Flash/Flex与Java后端之间的通信。 1. **BlazeDS概述**: BlazeDS是Adobe Flex企业版的一部分,它...

    Flex BlazeDS

    flex blazeds实现服务器向客户端推送数据 环境Eclipse Java EE IDE 服务apache-tomcat-7.0.23-windows-x86 blazeds采用的是blazeds4.0.0.14931 (附件中包含 apache-tomcat-7.0.23 + 源码 + 说明文档)

    flex+spring+blazeds消息推送

    标题中的“flex+spring+blazeds消息推送”是指在开发富互联网应用程序(RIA)时,采用Adobe Flex作为前端UI框架,Spring作为后端服务层框架,BlazeDS作为数据通信中间件,实现服务器到客户端的消息实时推送功能。...

    Flex BlazeDS通信

    6. **实时推送(Push)**:BlazeDS支持基于HTTP的Long Polling和Streaming,实现服务器向客户端的实时数据推送。这种特性使得金融、聊天室等需要实时更新的应用场景得以实现。 7. **安全性和性能优化**:BlazeDS...

    Flex与Java的消息推送

    BlazeDS支持两种主要的推送机制:Polling和Long-Polling,以及更高级的WebSocket。 1. **Polling**:客户端定期向服务器发送请求,询问是否有新数据。如果有,服务器立即返回数据;如果没有,则可能等待一段时间后...

    flex 推送 例子

    有关于flex推送技术的工程例子,使用blazeds的StreamingAMFChannel 通道实现推送功能。(使用前先启动http://localhost:8080/flex-blazeds/TickCacheServlet?cmd=start)

    Flex Blazeds java实现企业级Web系统页面【源码】

    它支持EJB、Spring、Hibernate等Java企业框架,方便整合现有的企业服务,同时,Blazeds还支持数据推送,使得服务器可以主动将数据推送到客户端,提升了用户体验。 5. **源码分析**:在"FlexTest"这个压缩包中,可能...

    Flex blazeds封装实体

    BlazDS提供了丰富的数据服务,包括远程方法调用(Remote Method Invocation, RMI)、消息代理(Message Brokering)以及数据推送等功能,使得Flex应用程序能够轻松地与Java服务器进行数据交互。 在Flex BlazDS中,...

    flex+blazeds+spring

    BlazeDS是Adobe提供的一个免费服务器端技术,它为Flex客户端提供了与Java应用程序交互的能力,包括实时消息传递、Remoting和数据推送等功能。Spring是一个广泛使用的Java企业级应用开发框架,它简化了Java应用程序的...

    flex采用blazeds实现服务器向客户端推数据

    在IT行业中,Flex和...BlazDS的StreamingAMFChannel提供了低延迟、高吞吐量的通信机制,使得实时数据推送变得简单而可靠。在实际项目中,开发者还可以根据需求进行性能优化和安全控制,以满足各种复杂场景的需求。

    BlazeDS实现java后台消息推送flex前台接收的例子

    在"BlazeDS实现java后台消息推送flex前台接收的例子"中,我们主要关注以下几个关键知识点: 1. **BlazeDS架构**:BlazeDS包含了一组服务,如Remoting服务、LiveCycle Data Services(LCDS)以及MessageBroker服务。...

    消息推送 blazeds

    总结一下,BlazDS的消息推送功能使得Flex应用程序能够实现实时数据更新,通过消息订阅机制,服务器可以在有新数据时主动推送到客户端。在Eclipse工程中实现这一功能,包括设置BlazDS服务、配置消息通道、编写客户端...

    Flex基于Producer和Consumer方式的简易消息推送机制

    Flex中的消息推送机制是基于Producer和Consumer模型实现的,这种机制允许应用程序实时地发送和接收消息,常用于构建聊天室、通知系统等实时交互的应用。下面将详细解释Flex消息推送的相关知识点。 1. **Producer与...

    基于Java_BlazeDS_Flex_服务器消息推的聊天室

    在本场景中,我们将探讨如何利用BlazeDS和Flex实现服务器消息推送。 首先,让我们了解BlazeDS的基本概念。BlazeDS是Adobe Flex与Java后端进行数据交互的一个中间件,它支持AMF(Action Message Format)协议,能够...

    PureMVC Flex BlazeDS Spring Hibernate.doc

    - 提供了实时数据推送和远程调用的功能; - 使得 Flex 应用能够与后端服务高效通信。 #### 4. Spring - **特点**: - 一款流行的 Java 应用程序框架; - 提供了 IoC(Inversion of Control)容器和 AOP(Aspect ...

    Flex + BlazeDS + Java + JMS 通信实例(附源码)

    BlazeDS是Adobe提供的一个免费服务器端组件,它允许Flex客户端与Java后端进行双向通信,实现了轻量级的远程方法调用(RPC)以及数据推送功能。Java是一种广泛使用的面向对象的编程语言,常用于构建服务器端应用。JMS...

Global site tag (gtag.js) - Google Analytics