简介: 服务器推送技术已经出来一段时间了,业界上也有不少基于这种技术(应该说是设计模式)的开源实现,但是要移植或者说应用到自己的项目上都比较麻烦。Dojo 这样一个大型的 Web2.0 开发框架提供了一套封装好的基于服务端推送技术的具体实现(包括服务端 Java 和客户端 Web 和 JavaScript),它基于 Bayeux 协议,提供了一些简单而且强大的接口可以让你快速构建自己的服务端推送功能。客户端实现即 Dojo 的 Cometd 前端组件,它封装了建立连接、消息订阅等等接口。服务端基于 Jetty 和 annotation,组建消息推送机制,同样也封装了比较简单但实用的消息推送接口,与前端 Dojox 的 Cometd 接口协同工作。这篇文章将重点介绍 Dojo 的服务端推送机制是如何运作的,以及我们应该如何基于 Dojo 的 Cometd 工具包构建自己的服务端推送功能。
服务器推送技术和 Bayeux 协议简介
服务器推送技术的基础思想是将浏览器主动查询信息改为服务器主动发送信息。服务器发送一批数据,浏览器显示这些数据,同时保证与服务器的连接。当服务器需要再次发送一批数据时,浏览器显示数据并保持连接。以后,服务器仍然可以发送批量数据,浏览器继续显示数据,依次类推。基于这种思想,这里我们要引出 Bayeux 协议。
Bayeux 是一套基于 Publish / Subscribe 模式,以 JSON 格式在浏览器与服务器之间传输事件的通信协议。该协议规定了浏览器与服务器之问的双向通信机制,克服了传统 Web 通信模式的缺点。
Bayeux 协议主要基于 HTTP 来传输低延迟的、异步的事件消息。这些消息通过频道 (Channels) 来投递,能够实现从服务器端到客户端、从客户端到服务器端或者通过服务器从一个客户端到另一个客户端的传送。Bayeux 协议的主要目的是为使用了 Ajax 和 Comet 技术的 Web 客户端实现高响应的用户交互。Bayeux 协议旨在通过允许执行者更容易的实现互操作性,来降低开发 Comet 应用程序的复杂性。它解决了共同的消息发布和路由问题,并提供了渐进式的改进和扩展机制。
一般情况下,在 HTTP 协议中,Client 要想获得 Server 的消息,必须先自己发送一个 Request,然后 Server 才会给予 Response。而 Bayeux 协议改变了这个情况,它允许 Server 端异步 Push 自己的消息到 Client 端。从而实现了 Client 和 Server 之间的双向操作模式。
回页首
服务器推送技术的一个简单实现
基于 Bayeux 协议实现服务器推送技术的方式有很多,可以通过 Flex 或者 Java 的 Applet。基于这两种技术,我们可以建立在客户端建立服务套接字接口,“双向操作模式”自然很容易实现,但是这些方式需要除浏览器以外的运行环境的支持。这里我们希望能采用一种纯脚本的方式,这种方式是不可能建立服务套接字接口的,那如何实现基于 Bayeux 协议的服务器推送呢?其实是可以模拟实现的,主要有两种方式:
1. 基于 HTTP 的长轮询来进行消息通信(基于 Ajax 的长轮询(long-polling)方式)。
2. 基于 Iframe 及 htmlfile 的流(streaming)方式。
这里我们采用第一种方式实现,即:客户端先向服务器端发送一个 HTTP Request,服务器端接收到后,阻塞在那边,等服务器有消息的时候,则返回一个 HTTP Response 给客户端,客户端收到后,断开连接,紧接着再发第二个 HTTP Request,以此反复进行,保持这个“长轮询”。期间,如果连接超时,那么会断开重连,以保持连接。
基于以上的思想,我们来看一下一个简单的实现,这个简单实现是基于 PHP 的。示例很简单,即便没用过 PHP 也能够很容易看明白,而且我们会在后面一一作出解释。
这个示例主要实现这样一个功能:
我们在浏览器里面分别打开三个窗口,并访问同一张页面。修改其中一个页面上的内容,另外两个页面上的内容也随即发生变化(注意:这里不用刷新页面)。这就会给我们一种:数据是服务器推送过来的感觉。
图 1. 简单服务器推送示例 -- 内容修改前
我们修改其中第一个窗口(左上)的内容(输入“222”,点击“Send”按钮,发送到后台)。此时不仅第一个窗口的内容变化了,其余两个窗口的内容也随即变化。
图 2. 简单服务器推送示例 -- 内容修改
接下来我们来看看示例代码吧:
清单 1. 简单服务器推送 -- 前端代码 HTML
<form action="" method="get"
onsubmit="comet.doRequest($('word').value);$('word').value='';return false;">
<input type="text" name="word" id="word" value="" />
<input type="submit" name="submit" value="Send" />
</form>
这个是我们所看到的输入框和提交按钮,大家可以注意一下它的“onsubmit”方法:当我们输入内容并点击提交时,它会执行“comet.doRequest($('word').value)”方法向后端发起请求(其实在这之前我们就已经建立了与服务端的长轮询并可随时开始服务器推送数据)。接下来我们来看看这个“comet”是什么样子的以及他的 Request 的具体实现:
清单 2. 简单服务器推送 -- 前端代码 JavaScript
[javascript] view plaincopy
var Comet = Class.create();
Comet.prototype = {
timestamp: 0,
url: './backend.php',
noerror: true,
initialize: function(){
},
connect: function(){
this.ajax = new Ajax.Request(this.url, {
method: 'get',
parameters: {
'timestamp': this.timestamp
},
onSuccess: function(transport){
var response = transport.responseText.evalJSON();
this.comet.timestamp = response['timestamp'];
this.comet.handleResponse(response);
this.comet.noerror = true;
},
onComplete: function(transport){
if (!this.comet.noerror) setTimeout(function(){
comet.connect()
}, 5000);
else
this.comet.connect();
this.comet.noerror = false;
}
});
this.ajax.comet = this;
},
handleResponse: function(response){
$('content').innerHTML += '<div>' + response['msg'] + '</div>';
},
doRequest: function(request){
new Ajax.Request(this.url, {
method: 'get',
parameters: {
'msg': request
}
});
}
}
var comet = new Comet();
comet.connect();
我们先看最后两段代码,这里是页面初始化时会执行的代码,其实在这里,我们就建立了一服务端的长轮询,我们来看看“connect”方法的实现吧:
“connect”方法这里是发了一个 Ajax 请求,然后分别设定了成功时(onSuccess)的返回处理和请求完成时(onComplete)的处理(注意 onComplete 不论成功失败都会执行)。我们要挂住这里的 onComplete 方法。可以看到,当请求完成时,如果连接有问题,它会过 5 秒重新连接,;如果没有问题,他会立即重新连接。
相信大家看到这里应该会有点眉目了,这里其实没有什么所谓的恒定不断的连接(类似 TCP 方式),它的真正实现是通过不断的 Ajax 请求实现的。
所以,当我们开启 3 个窗口时,其实我们打开了 3 个模拟的不间断的客户端与服务端的连接,所以他们会即时解到服务端的信息,不需要刷新页面。
我们再来看看服务端的实现,看看他是如何推送的:
清单 3. 简单服务器推送 -- 后端代码 PHP
[php] view plaincopy
$filename = dirname(__FILE__).'/data.txt';
// 将新消息存入文件中
$msg = isset($_GET['msg']) ? $_GET['msg'] : '';
if ($msg != '')
{
file_put_contents($filename,$msg);
die();
}
// 这是一个无限循环,一旦发现文件被修改,便会跳出循环并返回文件修改数据。如果文件一直没有修改,则会一
// 直处于循环检测状态,此时的 Ajax 连接也会一直保留,直到文件被修改为止,这就是所谓的“长轮询”。
$lastmodif = isset($_GET['timestamp']) ? $_GET['timestamp'] : 0;
$currentmodif = filemtime($filename);
while ($currentmodif <= $lastmodif) // 检测文件是否被修改
{
usleep(10000); // sleep 10ms to unload the CPU
clearstatcache();
$currentmodif = filemtime($filename);
}
// 返回 JSON 数组
$response = array();
$response['msg'] = file_get_contents($filename);
$response['timestamp'] = $currentmodif;
echo json_encode($response);
flush();
我们可以参照上面的注释理解该代码,其实并不需要多少 PHP 的知识。服务端推送技术不是一个开发用的控件库,而是一个思想。这里的 while 循环便说明了服务端推送是如何保留所谓的“长轮询”的。
现在大家应该明白为什么三个窗口会同步变化了。其主要的核心思想就是服务端“握住”长轮询,然后在适当的时候“放手”。
回页首
Dojo 的 Cometd 工具包简介
之前我们是基于 JavaScript 自己实现了一个简单的 Cometd 应用,我们花了大量的代码来建立一个 Cometd 框架,真正用于处理我们自己的业务逻辑的代码其实就是“handleResponse”里面的那一行。我们能不能吧这些通用的代码省掉呢?答案是肯定的。Dojo 已经对 Cometd 做了封装,基于 Dojo 的 Cometd 包,我们不用再浪费大量的代码在搭建 Cometd 框架上。对于前端脚本代码,我们只需要加上一个 Cometd 包的简单接口代码,便可以开始加入我们自己的业务逻辑代码了。
当然,Dojo 的 Cometd 包还包括后端的代码,可以在 Dojo 的官网下载中找到,它不与 Dojo 包一起发布,是一个单独的服务端开源代码,基于 Java 和 Jetty 的,有兴趣的读者可以下载下来研究一下。
通过 Dojo 的这两部分代码,我们便可以迅速地搭建我们的 Cometd 框架,我们剩下需要做的就是加入我们的业务逻辑。
回页首
Dojo 的 Cometd 工具包之前端
接下来我们来看看 Dojo 的 Cometd 工具包的前端封装:
清单 4. Cometd 前端初始化
[javascript] view plaincopy
dojox.cometd.init("http://www.xxx.com/cometd");
这个接口用于建立并初始化与服务端的握手连接(Bayeux handshake,初始化了“Bayeux communication” 消息通讯)。建立这个连接是基于 Bayeux 协议的,它主要有两个任务:
客户端与服务端协商传输的消息类型。
如果协商成功,服务端会通知客户端具体的请求参数配置。
如果协商失败,客户端重新发起协商流程。
我们深入 Dojo 的 init 方法内部可以看到握手连接的具体实现过程,它的实现也是不间断的重复发送客户端的 Ajax 请求,与我们之前的自制案类似,有兴趣的同学可以参考如下代码(摘取部分):
清单 5. Cometd 内部机制
[javascript] view plaincopy
this.init = function(...){
............
var bindArgs = {
url: this.url,
handleAs: this.handleAs,
content: { "message": dojo.toJson([props]) },
load: dojo.hitch(this,function(msg){
this._backon();
this._finishInit(msg);
}),
error: dojo.hitch(this,function(e){
this._backoff();
this._finishInit(e);
}),
timeout: this.expectedNetworkDelay
};
..............
if(this._isXD){
r = dojo.io.script.get(bindArgs);
}else{
r = dojo.xhrPost(bindArgs);
}
..............
}
this._finishInit = function(data){
..................
if(successful){
........
//ajax request inside
this.tunnelInit = transport.tunnelInit && dojo.hitch(transport,
"tunnelInit");
this.tunnelCollapse = transport.tunnelCollapse && dojo.hitch(transport,
"tunnelCollapse");
transport.startup(data);
}else{
if(!this._advice || this._advice["reconnect"] != "none"){
setTimeout(dojo.hitch(this, "init", this.url, this._props),
this._interval());
}
}
....................
}
可见,它们的 callback 方法里面都带有对自己本身的调用,这里的”init“方法也不例外。细心的读者可能还会发现,其实从例子上可以看出:Dojo 的 Cometd 也支持跨域,它的跨域是通过“script”的方式实现的。这里有一点需要大家了解,我们默认的服务端推送实现方式是长轮询(long-polling)模式,遇到跨域时,“long-polling”便不再适用,转为基于“script”的返回调用(callback-polling)模式。
接下来我们再来看看 Cometd 中关于消息推送的一些接口,这些消息通讯主要是基于渠道:
清单 6. Cometd 前端发布消息
[javascript] view plaincopy
dojox.cometd.publish("/service/echo", { msg: msg });
这里的所谓“发布消息”就是向后端发送消息,用于前端主动向后端推送。
这里的第一个参数是发送消息的渠道标识(channel),这种“channel”共有三种类型:
1. 元渠道(meta channels):示例“/meta/connect”(通常以“/meta/”为开头)。元渠道主要不是用来消息传输,而是用于客户端监听,如握手连接或者网络连接等等的错误。通常我们会在客户端调用“addListener()”来开启监听元渠道,它可以在握手连接的建立之前就开启监听,而且这种消息监听是同步的。
2. 服务渠道(service channels):示例“/service/connect”(通常以“/service/”为开头)。它主要用于私有消息通讯,主要是一对一的通讯。通常我们会在客户端调用“subscribe()”来订阅服务渠道消息。服务渠道只有等握手连接建立好后才能开启,而且它是异步通讯的。
3. 普通渠道(normal channels):示例“/foo/bar”(无限制)。这种渠道没有什么限制,主要用于广播消息,即:多个客户端订阅了一个服务,该服务可以通过普通渠道进行消息广播。
渠道是通信的基础模式,我们可以根据需要选择相应的渠道模式。
第二个参数则是消息对象,这里的“msg”则是消息内容。
有一点要注意:这里的“publish”是基于 Bayeux 协议的,采用的异步消息传输机制,所以它是在服务端(Bayeux 服务器)收到消息之前就返回的。所以 publish 的返回并不代表服务端收到你 publish 的消息了。
Dojo 的 Cometd 还支持批量发送消息,通过这个接口可以有效地避免不必要的网络消息传输的浪费:
清单 7. Cometd 前端批量发布消息
[javascript] view plaincopy
// 方法 1
cometd.batch(function()
{
cometd.publish('/channel1', { product: 'foo' });
cometd.publish('/channel2', { notificationType: 'all' });
cometd.publish('/channel3', { update: false });
});
// 方法 2
cometd.startBatch()
cometd.publish('/channel1', { product: 'foo' });
cometd.publish('/channel2', { notificationType: 'all' });
cometd.publish('/channel3', { update: false });
cometd.endBatch()
上述两种方案都可以实现消息的批量发送,推荐使用方法 1。
接下来我们看看服务端的消息推送:
清单 8. Cometd 前端订阅消息
[javascript] view plaincopy
dojox.cometd.subscribe("/service/echo",echoRpcReturn);
function echoRpcReturn(msg){
dojo.byId("responses").innerHTML += msg;
}
这里所谓的“订阅消息”,其实就是接收服务端推送的消息,是后端主动向前端推送。这也是服务端推送的精华所在,同样也是很简单的一行代码。
这里我们看到了一个熟悉的方法 --- “subscribe”,之前我们已经介绍过了,它主要用于订阅服务渠道私有消息,这里就是它用法的一个示例。对应的服务端 Service 向对应的前端订阅者推送消息,这里就是通过“echo”渠道向前端推送消息,他会回调“echoRpcReturn”方法,并传入推送的消息作为实参。对于后端的每次推送,都会调用前端的“echoRpcReturn”方法。
回页首
Dojo 的 Cometd 工具包之后端
Dojo 的 Cometd 工具包的后端实现是基于 Java 和 Jetty 组件的,通过 Dojo 的服务端 Cometd 组件,我们同样能极其迅速的构建 Cometd 框架。我们需要做的仅仅是加入我们的业务逻辑代码即可。
先来看看 web.xml 的配置参数:
清单 9. 基本配置参数(web.xml)
[html] view plaincopy
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
version="2.5">
<servlet>
<servlet-name>cometd</servlet-name>
<servlet-class>
org.cometd.server.continuation.ContinuationCometdServlet
</servlet-class>
<init-param>
<param-name>timeout</param-name>
<param-value>60000</param-value>
</init-param>
</servlet>
<servlet-mapping>
<servlet-name>cometd</servlet-name>
<url-pattern>/cometd/*</url-pattern>
</servlet-mapping>
<filter>
<filter-name>cross-origin</filter-name>
<filter-class>org.eclipse.jetty.servlets.CrossOriginFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>cross-origin</filter-name>
<url-pattern>/cometd/*</url-pattern>
</filter-mapping>
</web-app>
这里我们先来看看“ContinuationCometdServlet”,这个 Servlet 主要用于解释 Bayeux 协议,所以关于它的配置是必须的。基于“ContinuationCometdServlet”的其他配置参数还有很多,如:
Timeout:长轮询的过期时间。如果超过这个时间还没有客户端消息,服务端会推送一个空消息。
Interval:轮询间隔时间。客户端结束前一个请求到发送下一个请求之间的间隔时间。
maxInterval:服务端最长等待时间。即:建立连接时,如果超过这个时间仍没有接到一个新的长轮询连接请求,服务端就会认为该客户端无效或者关闭了。
logLevel:日志级别。“0 = warn, 1 = info, 2 = debug”。
以上是主要的配置参数,其余的配置参数还有很多,这里不一一介绍,有需要的读者可以查阅 Dojo 的帮助文档。另外,最后几行我们还配置了一个“cross-origin”,对应着“CrossOriginFilter”类,他用于支持跨域的 JavaScript 请求,如果您的项目中要支持跨域的服务器推送,请加入该配置。
接下来我们再来看看一些高级配置参数:
清单 10. 高级配置参数(web.xml)
[html] view plaincopy
<servlet>
<servlet-name>cometd</servlet-name>
<servlet-class>org.cometd.java.annotation.AnnotationCometdServlet</servlet-class>
<init-param>
<param-name>logLevel</param-name>
<param-value>1</param-value>
</init-param>
<init-param>
<param-name>services</param-name>
<param-value>org.cometd.examples.ChatService</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>cometd</servlet-name>
<url-pattern>/cometd/*</url-pattern>
</servlet-mapping>
<servlet>
<servlet-name>cometdDemo</servlet-name>
<servlet-class>org.cometd.examples.CometdDemoServlet</servlet-class>
<load-on-startup>2</load-on-startup>
</servlet>
这里我们主要要注意三个地方:
1. “CometdDemoServlet”:它是用于启动服务端 Cometd 框架的 Servlet,我们在后面会介绍。由于他配置了“load-on-startup”参数,所以在服务容器启动的时候,我们的 Cometd 服务端就已经搭建好了,之后我们会着重介绍他的“init”方法中的行为。
2. “AnnotationCometdServlet”:这个 Servlet 配置在这里表示了我们在服务端代码是基于 annotation 的。这是一个非常实用的 Servlet,通过这个 Servlet,你会发现,我们要做的事情仅仅是定义几个 Service 类,实现其中的几个方法即可。连很多调用 Cometd 框架 API 接口的代码都省去了。
3. “ChatService”:这里声明了一个 Service 类,他的用途是处理服务渠道的消息。这里声明的作用等同于代码中的“processor.process(new ChatService())”。
配置完成后,我们接下来可以看看代码了。通过以上的配置之后,你会发现,我们接下来要写的代码非常简单精炼:
清单 11. 服务类初始化 init
[java] view plaincopy
public void init() throws ServletException
{
final BayeuxServerImpl bayeux =
(BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);
if (bayeux==null)
throw new UnavailableException("No BayeuxServer!");
.................
// 创建扩展点
bayeux.addExtension(new TimesyncExtension());
bayeux.addExtension(new AcknowledgedMessagesExtension());
// 设定握手连接权限
bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(
GrantAuthorizer.GRANT_PUBLISH);
// 启动服务渠道
ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);
processor.process(new EchoRPC());
processor.process(new Monitor());
//processor.process(new ChatService());
bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer()
{
public void configureChannel(ConfigurableServerChannel channel)
{
channel.setPersistent(true);
}
});
if (bayeux.getLogger().isDebugEnabled())
System.err.println(bayeux.dump());
.................
}
这里我们介绍三个知识点:
1. Extension:Extension 是一个函数,它会在消息发出之前或者收到之后被调用,专门用来修改消息内容,例如加入一些特殊属性(这些属性多在消息的 ext 属性中)。注意,这些属性大多是应用无关的,如记录长轮询的次数等等。这里的“TimesyncExtension”和“AcknowledgedMessagesExtension”是两个比较常用的 Extension:
1) “Timesync Extension”用于计算客户端事件和服务端时间的偏差。客户端需要同时引入“dojox.cometd.timesync”类,该 Extension 使得客户端和服务端在每次握手或者连接的时候能够互相交换各自的时钟信息,这也是的客户端可以很精确的计算出他与服务端时钟的偏移量。消息格式如下:
{ext:{timesync:{tc:12345567890,ts:1234567900,p:123,a:3},...},...}
TC:客户端发消息的时间(距离 1970 年 1 月号的时长,单位为毫秒)
TS:服务端收到消息的时间
2) “Acknowledge Extension”用于提供可靠的顺序消息机制。一旦加入了“Acknowledge Extension”,服务端会阻截非长轮询的客户端请求,这样会使你的服务器更加的高效。注意:客户端需要同时引入“dojox.cometd.ack”类与其协同工作。
2. Authorizer:设定握手连接权限,这里设定值为“GrantAuthorizer.GRANT_PUBLISH”,表示允许所有客户端建立握手连接。
3. Process Service:启动服务渠道“processor.process(new EchoRPC())”。通过这些服务渠道类,我们可以启动服务渠道处理客户端请求。这是我们服务端推送技术的关键所在,我们的业务逻辑代码也是主要放在这些服务渠道类里面。
接下来我们来看看这些服务渠道类的具体实现:
清单 12. Echo Service 实现
[java] view plaincopy
@Service("echo")
public static class EchoRPC
{
@Session
private ServerSession _session;
@SuppressWarnings("unused")
@Configure("/service/echo")
private void configureEcho(ConfigurableServerChannel channel)
{
channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH);
}
@Listener("/service/echo")
public void doEcho(ServerSession session, ServerMessage message)
{
Map<String,Object> data = message.getDataAsMap();
Log.info("ECHO from "+session+" "+data);
for(int i = 0; i < 50; i++){
session.deliver(_session, message.getChannel(), data, null);
}
}
}
我们可以在“configureEcho”里面设定该服务渠道支持的权限。我们主要来看看“doEcho”方法,它被标识为“@Listener("/service/echo")”,所以它可以用于像客户端推送服务渠道为“echo”的消息,我们之前客户端代码示例里面的如下代码:“dojox.cometd.subscribe("/service/echo",echoRpcReturn)”就是专门用于处理这里服务渠道推送的消息,消息推送通过“deliver”方法,推送的消息信息放在“data”实参中。
再来看看 Monitor 类:
清单 13. Monitor Service 实现
[java] view plaincopy
@Service("monitor")
public static class Monitor
{
@Listener("/meta/subscribe")
public void monitorSubscribe(ServerSession session, ServerMessage message)
{
Log.info("Monitored Subscribe from "+session+" for "
+message.get(Message.SUBSCRIPTION_FIELD));
}
@Listener("/meta/unsubscribe")
public void monitorUnsubscribe(ServerSession session, ServerMessage message)
{
Log.info("Monitored Unsubscribe from "+session+" for "
+message.get(Message.SUBSCRIPTION_FIELD));
}
@Listener("/meta/*")
public void monitorMeta(ServerSession session, ServerMessage message)
{
if (Log.isDebugEnabled())
Log.debug(message.toString());
}
}
Monitor 渠道类与之前的 Echo 服务渠道类比较类似,不过它主要用于处理 meta 渠道,与业务逻辑无关。
最后,我们来看看被注释掉的“ChatService”类,他也可以通过“processor.process(new ChatService())”来启用,但是我们这里用了一个更为简单的方法:直接配置在 web.xml 文件中:
清单 14. ChatService 的配置
[html] view plaincopy
<servlet>
...............
<init-param>
<param-name>services</param-name>
<param-value>org.cometd.examples.ChatService</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
细心的读者可能在之前的代码示例中已经看到,这里就是通过配置的方式加载服务渠道类。参考以下具体实现的代码:
清单 15. ChatService 实现
[java] view plaincopy
@Service("chat")
public class ChatService
{
..........................................
@Listener("/service/members")
public void handleMembership(ServerSession client, ServerMessage message)
{
Map<String, Object> data = message.getDataAsMap();
final String room = ((String)data.get("room")).substring("/chat/".length());
Map<String, String> roomMembers = _members.get(room);
if (roomMembers == null)
{
Map<String, String> new_room = new ConcurrentHashMap<String, String>();
roomMembers = _members.putIfAbsent(room, new_room);
if (roomMembers == null) roomMembers = new_room;
}
final Map<String, String> members = roomMembers;
String userName = (String)data.get("user");
members.put(userName, client.getId());
client.addListener(new ServerSession.RemoveListener()
{
public void removed(ServerSession session, boolean timeout)
{
members.values().remove(session.getId());
broadcastMembers(room,members.keySet());
}
});
broadcastMembers(room,members.keySet());
}
private void broadcastMembers(String room, Set<String> members)
{
// Broadcast the new members list
ClientSessionChannel channel =
_session.getLocalSession().getChannel("/members/"+room);
channel.publish(members);
}
..........................................
@Listener("/service/privatechat")
protected void privateChat(ServerSession client, ServerMessage message)
{
Map<String,Object> data = message.getDataAsMap();
String room = ((String)data.get("room")).substring("/chat/".length());
Map<String, String> membersMap = _members.get(room);
if (membersMap==null)
{
Map<String,String>new_room=new ConcurrentHashMap<String, String>();
membersMap=_members.putIfAbsent(room,new_room);
if (membersMap==null)
membersMap=new_room;
}
String[] peerNames = ((String)data.get("peer")).split(",");
ArrayList<ServerSession> peers = new ArrayList<ServerSession>(peerNames.length);
.................
}
}
以上是摘录部分 ChatService 实现代码,它主要是实现一个在线的聊天室,包括公开发言和私有(1 对 1)聊天等等功能,它的实现方式与之前的 Echo 和 Monitor 类似,这里不做详述,有兴趣的读者可以参考一下他的实现,来构造自己的服务器推送应用。
回页首
服务器推送技术之比较
其实有很多种方式实现服务器推送,它们各有各的优缺点:
传统轮询:此方法是利用 HTML 里面 meta 标签的刷新功能,在一定时间间隔后进行页面的转载,以此循环往复。它的最大缺点就是页面刷性给人带来的体验很差,而且服务器的压力也会比较大。
Ajax 轮询:异步响应机制,即通过不间断的客户端 Ajax 请求,去发现服务端的变化。这种方式由于是客户端主动连接的,所以会有一定程度的延时,并且服务器的压力也不小。
长连接:这也是我们之前所介绍的一种方式。由于它是利用客户端的现有连接实现服务器主动向客户端推送信息,所以延时的情况很少,并且由于服务端的可操控性使得服务器的压力也迅速减小。其实这种技术还有其他的实现方式,通过 Iframe,在页面上嵌入一个隐藏帧(Iframe),将其“src”属性指向一个长连接的请求,这样一来,服务端就能够源源不断的向客户端发送数据。这种方式的不足就在于:它会造成浏览器的进度栏一直显示没有加载完成,当然我们可以通过 Google 的一个称为“htmlfile”的 ActiveX 控件解决,但是毕竟他需要安装 ActiveX 控件,对于终端用户也是不合适的。
套接字:可以利用 Flash 的 XMLSocket 类或者 Java 的 Applet 来建立 Socket 连接,实现全双工的服务器推送,然后通过 Flash 或者 Applet 与 JavaScript 通信的接口来实现最终的数据推送。但是这种方式需要 Flash 或者 JVM 的支持,同样不太合适于终端用户。
HTML5 的 WebSocket:这种方式其实与套接字一样,但是这里需要单独强调一下:它是不需要用户而外安装任何插件的。HTML5 提供了一个 WebSocket 的 JavaScript 接口,可以直接与服务端建立 Socket 连接,实现全双工通信,这种方式的服务器推送就是完全意义上的服务器推送了,没有半点模拟的成分,只是现阶段支持 HTML5 的浏览器并不多,而且一般老版本的各种浏览器基本都不支持。不过 HTML5 是一套非常好的标准,在将来,当 HTML5 流行起来以后将是我们实现服务器推送技术的不二选择。
回页首
结束语
这篇文章介绍了 Dojo 中的服务器推送 Cometd 工具包。基于服务器推送的理念,介绍了 Bayeux 协议的核心思想,并结合一个简单示例介绍了服务器推送的基本实现。随后,本着快速建立服务器推送框架的想法,介绍了 Dojo 的 Cometd 工具包,并分别从客户端接口和服务端接口两个方面分别介绍了 Dojo 的服务器推送框架的搭建和实现原理。最后,通过一些简单的示例展示了基于服务端推送的业务逻辑的具体实现。服务端推送技术具有很强的实用性,希望广大读者在开发自己的项目的过程中多关注一下,以尽可能多的完善自己的 Web 应用。
参考资料
学习
Dojo 校园文档主页:Dojo 中控件的比较完全的 API 文档主页,包括 Dojo,Dijit,Dojox 等等。
Dojo 官方文档主页:Dojo 官方的很多支持 Ajax 应用程序开发的组件的文档。
“在 Ajax 应用程序中实现实时数据推送”(developerWorks,2009 年 11 月):一些特殊的应用场景会要求 Web 程序的客户端能够在服务器端数据发生变化时立即得到通知,这就要求应用程序具有“服务器推送”的特性。本文讲述了如何利用 RIA 技术来通过套接字来实现相应的功能,并介绍了具体的实现方法。
“使用 HTML5 WebSocket 构建实时 Web 应用”(developerWorks,2011 年 12 月):本文主要介绍了 HTML5 WebSocket 的原理以及它给实时 Web 开发带来的革命性的创新,并通过一个 WebSocket 服务器和客户端的案例来充分展示 WebSocket 的强大和易用。
developerWorks Web development 专区:通过专门关于 Web 技术的文章和教程,扩展您在网站开发方面的技能。
developerWorks Ajax 资源中心:这是有关 Ajax 编程模型信息的一站式中心,包括很多文档、教程、论坛、blog、wiki 和新闻。任何 Ajax 的新信息都能在这里找到。
developerWorks Web 2.0 资源中心,这是有关 Web 2.0 相关信息的一站式中心,包括大量 Web 2.0 技术文章、教程、下载和相关技术资源。您还可以通过 Web 2.0 新手入门栏目,迅速了解 Web 2.0 的相关概念。
查看 HTML5 专题,了解更多和 HTML5 相关的知识和动向。
原文转自:http://blog.csdn.net/dojotoolkit/article/details/7298417
服务器推送技术和 Bayeux 协议简介
服务器推送技术的基础思想是将浏览器主动查询信息改为服务器主动发送信息。服务器发送一批数据,浏览器显示这些数据,同时保证与服务器的连接。当服务器需要再次发送一批数据时,浏览器显示数据并保持连接。以后,服务器仍然可以发送批量数据,浏览器继续显示数据,依次类推。基于这种思想,这里我们要引出 Bayeux 协议。
Bayeux 是一套基于 Publish / Subscribe 模式,以 JSON 格式在浏览器与服务器之间传输事件的通信协议。该协议规定了浏览器与服务器之问的双向通信机制,克服了传统 Web 通信模式的缺点。
Bayeux 协议主要基于 HTTP 来传输低延迟的、异步的事件消息。这些消息通过频道 (Channels) 来投递,能够实现从服务器端到客户端、从客户端到服务器端或者通过服务器从一个客户端到另一个客户端的传送。Bayeux 协议的主要目的是为使用了 Ajax 和 Comet 技术的 Web 客户端实现高响应的用户交互。Bayeux 协议旨在通过允许执行者更容易的实现互操作性,来降低开发 Comet 应用程序的复杂性。它解决了共同的消息发布和路由问题,并提供了渐进式的改进和扩展机制。
一般情况下,在 HTTP 协议中,Client 要想获得 Server 的消息,必须先自己发送一个 Request,然后 Server 才会给予 Response。而 Bayeux 协议改变了这个情况,它允许 Server 端异步 Push 自己的消息到 Client 端。从而实现了 Client 和 Server 之间的双向操作模式。
回页首
服务器推送技术的一个简单实现
基于 Bayeux 协议实现服务器推送技术的方式有很多,可以通过 Flex 或者 Java 的 Applet。基于这两种技术,我们可以建立在客户端建立服务套接字接口,“双向操作模式”自然很容易实现,但是这些方式需要除浏览器以外的运行环境的支持。这里我们希望能采用一种纯脚本的方式,这种方式是不可能建立服务套接字接口的,那如何实现基于 Bayeux 协议的服务器推送呢?其实是可以模拟实现的,主要有两种方式:
1. 基于 HTTP 的长轮询来进行消息通信(基于 Ajax 的长轮询(long-polling)方式)。
2. 基于 Iframe 及 htmlfile 的流(streaming)方式。
这里我们采用第一种方式实现,即:客户端先向服务器端发送一个 HTTP Request,服务器端接收到后,阻塞在那边,等服务器有消息的时候,则返回一个 HTTP Response 给客户端,客户端收到后,断开连接,紧接着再发第二个 HTTP Request,以此反复进行,保持这个“长轮询”。期间,如果连接超时,那么会断开重连,以保持连接。
基于以上的思想,我们来看一下一个简单的实现,这个简单实现是基于 PHP 的。示例很简单,即便没用过 PHP 也能够很容易看明白,而且我们会在后面一一作出解释。
这个示例主要实现这样一个功能:
我们在浏览器里面分别打开三个窗口,并访问同一张页面。修改其中一个页面上的内容,另外两个页面上的内容也随即发生变化(注意:这里不用刷新页面)。这就会给我们一种:数据是服务器推送过来的感觉。
图 1. 简单服务器推送示例 -- 内容修改前
我们修改其中第一个窗口(左上)的内容(输入“222”,点击“Send”按钮,发送到后台)。此时不仅第一个窗口的内容变化了,其余两个窗口的内容也随即变化。
图 2. 简单服务器推送示例 -- 内容修改
接下来我们来看看示例代码吧:
清单 1. 简单服务器推送 -- 前端代码 HTML
<form action="" method="get"
onsubmit="comet.doRequest($('word').value);$('word').value='';return false;">
<input type="text" name="word" id="word" value="" />
<input type="submit" name="submit" value="Send" />
</form>
这个是我们所看到的输入框和提交按钮,大家可以注意一下它的“onsubmit”方法:当我们输入内容并点击提交时,它会执行“comet.doRequest($('word').value)”方法向后端发起请求(其实在这之前我们就已经建立了与服务端的长轮询并可随时开始服务器推送数据)。接下来我们来看看这个“comet”是什么样子的以及他的 Request 的具体实现:
清单 2. 简单服务器推送 -- 前端代码 JavaScript
[javascript] view plaincopy
var Comet = Class.create();
Comet.prototype = {
timestamp: 0,
url: './backend.php',
noerror: true,
initialize: function(){
},
connect: function(){
this.ajax = new Ajax.Request(this.url, {
method: 'get',
parameters: {
'timestamp': this.timestamp
},
onSuccess: function(transport){
var response = transport.responseText.evalJSON();
this.comet.timestamp = response['timestamp'];
this.comet.handleResponse(response);
this.comet.noerror = true;
},
onComplete: function(transport){
if (!this.comet.noerror) setTimeout(function(){
comet.connect()
}, 5000);
else
this.comet.connect();
this.comet.noerror = false;
}
});
this.ajax.comet = this;
},
handleResponse: function(response){
$('content').innerHTML += '<div>' + response['msg'] + '</div>';
},
doRequest: function(request){
new Ajax.Request(this.url, {
method: 'get',
parameters: {
'msg': request
}
});
}
}
var comet = new Comet();
comet.connect();
我们先看最后两段代码,这里是页面初始化时会执行的代码,其实在这里,我们就建立了一服务端的长轮询,我们来看看“connect”方法的实现吧:
“connect”方法这里是发了一个 Ajax 请求,然后分别设定了成功时(onSuccess)的返回处理和请求完成时(onComplete)的处理(注意 onComplete 不论成功失败都会执行)。我们要挂住这里的 onComplete 方法。可以看到,当请求完成时,如果连接有问题,它会过 5 秒重新连接,;如果没有问题,他会立即重新连接。
相信大家看到这里应该会有点眉目了,这里其实没有什么所谓的恒定不断的连接(类似 TCP 方式),它的真正实现是通过不断的 Ajax 请求实现的。
所以,当我们开启 3 个窗口时,其实我们打开了 3 个模拟的不间断的客户端与服务端的连接,所以他们会即时解到服务端的信息,不需要刷新页面。
我们再来看看服务端的实现,看看他是如何推送的:
清单 3. 简单服务器推送 -- 后端代码 PHP
[php] view plaincopy
$filename = dirname(__FILE__).'/data.txt';
// 将新消息存入文件中
$msg = isset($_GET['msg']) ? $_GET['msg'] : '';
if ($msg != '')
{
file_put_contents($filename,$msg);
die();
}
// 这是一个无限循环,一旦发现文件被修改,便会跳出循环并返回文件修改数据。如果文件一直没有修改,则会一
// 直处于循环检测状态,此时的 Ajax 连接也会一直保留,直到文件被修改为止,这就是所谓的“长轮询”。
$lastmodif = isset($_GET['timestamp']) ? $_GET['timestamp'] : 0;
$currentmodif = filemtime($filename);
while ($currentmodif <= $lastmodif) // 检测文件是否被修改
{
usleep(10000); // sleep 10ms to unload the CPU
clearstatcache();
$currentmodif = filemtime($filename);
}
// 返回 JSON 数组
$response = array();
$response['msg'] = file_get_contents($filename);
$response['timestamp'] = $currentmodif;
echo json_encode($response);
flush();
我们可以参照上面的注释理解该代码,其实并不需要多少 PHP 的知识。服务端推送技术不是一个开发用的控件库,而是一个思想。这里的 while 循环便说明了服务端推送是如何保留所谓的“长轮询”的。
现在大家应该明白为什么三个窗口会同步变化了。其主要的核心思想就是服务端“握住”长轮询,然后在适当的时候“放手”。
回页首
Dojo 的 Cometd 工具包简介
之前我们是基于 JavaScript 自己实现了一个简单的 Cometd 应用,我们花了大量的代码来建立一个 Cometd 框架,真正用于处理我们自己的业务逻辑的代码其实就是“handleResponse”里面的那一行。我们能不能吧这些通用的代码省掉呢?答案是肯定的。Dojo 已经对 Cometd 做了封装,基于 Dojo 的 Cometd 包,我们不用再浪费大量的代码在搭建 Cometd 框架上。对于前端脚本代码,我们只需要加上一个 Cometd 包的简单接口代码,便可以开始加入我们自己的业务逻辑代码了。
当然,Dojo 的 Cometd 包还包括后端的代码,可以在 Dojo 的官网下载中找到,它不与 Dojo 包一起发布,是一个单独的服务端开源代码,基于 Java 和 Jetty 的,有兴趣的读者可以下载下来研究一下。
通过 Dojo 的这两部分代码,我们便可以迅速地搭建我们的 Cometd 框架,我们剩下需要做的就是加入我们的业务逻辑。
回页首
Dojo 的 Cometd 工具包之前端
接下来我们来看看 Dojo 的 Cometd 工具包的前端封装:
清单 4. Cometd 前端初始化
[javascript] view plaincopy
dojox.cometd.init("http://www.xxx.com/cometd");
这个接口用于建立并初始化与服务端的握手连接(Bayeux handshake,初始化了“Bayeux communication” 消息通讯)。建立这个连接是基于 Bayeux 协议的,它主要有两个任务:
客户端与服务端协商传输的消息类型。
如果协商成功,服务端会通知客户端具体的请求参数配置。
如果协商失败,客户端重新发起协商流程。
我们深入 Dojo 的 init 方法内部可以看到握手连接的具体实现过程,它的实现也是不间断的重复发送客户端的 Ajax 请求,与我们之前的自制案类似,有兴趣的同学可以参考如下代码(摘取部分):
清单 5. Cometd 内部机制
[javascript] view plaincopy
this.init = function(...){
............
var bindArgs = {
url: this.url,
handleAs: this.handleAs,
content: { "message": dojo.toJson([props]) },
load: dojo.hitch(this,function(msg){
this._backon();
this._finishInit(msg);
}),
error: dojo.hitch(this,function(e){
this._backoff();
this._finishInit(e);
}),
timeout: this.expectedNetworkDelay
};
..............
if(this._isXD){
r = dojo.io.script.get(bindArgs);
}else{
r = dojo.xhrPost(bindArgs);
}
..............
}
this._finishInit = function(data){
..................
if(successful){
........
//ajax request inside
this.tunnelInit = transport.tunnelInit && dojo.hitch(transport,
"tunnelInit");
this.tunnelCollapse = transport.tunnelCollapse && dojo.hitch(transport,
"tunnelCollapse");
transport.startup(data);
}else{
if(!this._advice || this._advice["reconnect"] != "none"){
setTimeout(dojo.hitch(this, "init", this.url, this._props),
this._interval());
}
}
....................
}
可见,它们的 callback 方法里面都带有对自己本身的调用,这里的”init“方法也不例外。细心的读者可能还会发现,其实从例子上可以看出:Dojo 的 Cometd 也支持跨域,它的跨域是通过“script”的方式实现的。这里有一点需要大家了解,我们默认的服务端推送实现方式是长轮询(long-polling)模式,遇到跨域时,“long-polling”便不再适用,转为基于“script”的返回调用(callback-polling)模式。
接下来我们再来看看 Cometd 中关于消息推送的一些接口,这些消息通讯主要是基于渠道:
清单 6. Cometd 前端发布消息
[javascript] view plaincopy
dojox.cometd.publish("/service/echo", { msg: msg });
这里的所谓“发布消息”就是向后端发送消息,用于前端主动向后端推送。
这里的第一个参数是发送消息的渠道标识(channel),这种“channel”共有三种类型:
1. 元渠道(meta channels):示例“/meta/connect”(通常以“/meta/”为开头)。元渠道主要不是用来消息传输,而是用于客户端监听,如握手连接或者网络连接等等的错误。通常我们会在客户端调用“addListener()”来开启监听元渠道,它可以在握手连接的建立之前就开启监听,而且这种消息监听是同步的。
2. 服务渠道(service channels):示例“/service/connect”(通常以“/service/”为开头)。它主要用于私有消息通讯,主要是一对一的通讯。通常我们会在客户端调用“subscribe()”来订阅服务渠道消息。服务渠道只有等握手连接建立好后才能开启,而且它是异步通讯的。
3. 普通渠道(normal channels):示例“/foo/bar”(无限制)。这种渠道没有什么限制,主要用于广播消息,即:多个客户端订阅了一个服务,该服务可以通过普通渠道进行消息广播。
渠道是通信的基础模式,我们可以根据需要选择相应的渠道模式。
第二个参数则是消息对象,这里的“msg”则是消息内容。
有一点要注意:这里的“publish”是基于 Bayeux 协议的,采用的异步消息传输机制,所以它是在服务端(Bayeux 服务器)收到消息之前就返回的。所以 publish 的返回并不代表服务端收到你 publish 的消息了。
Dojo 的 Cometd 还支持批量发送消息,通过这个接口可以有效地避免不必要的网络消息传输的浪费:
清单 7. Cometd 前端批量发布消息
[javascript] view plaincopy
// 方法 1
cometd.batch(function()
{
cometd.publish('/channel1', { product: 'foo' });
cometd.publish('/channel2', { notificationType: 'all' });
cometd.publish('/channel3', { update: false });
});
// 方法 2
cometd.startBatch()
cometd.publish('/channel1', { product: 'foo' });
cometd.publish('/channel2', { notificationType: 'all' });
cometd.publish('/channel3', { update: false });
cometd.endBatch()
上述两种方案都可以实现消息的批量发送,推荐使用方法 1。
接下来我们看看服务端的消息推送:
清单 8. Cometd 前端订阅消息
[javascript] view plaincopy
dojox.cometd.subscribe("/service/echo",echoRpcReturn);
function echoRpcReturn(msg){
dojo.byId("responses").innerHTML += msg;
}
这里所谓的“订阅消息”,其实就是接收服务端推送的消息,是后端主动向前端推送。这也是服务端推送的精华所在,同样也是很简单的一行代码。
这里我们看到了一个熟悉的方法 --- “subscribe”,之前我们已经介绍过了,它主要用于订阅服务渠道私有消息,这里就是它用法的一个示例。对应的服务端 Service 向对应的前端订阅者推送消息,这里就是通过“echo”渠道向前端推送消息,他会回调“echoRpcReturn”方法,并传入推送的消息作为实参。对于后端的每次推送,都会调用前端的“echoRpcReturn”方法。
回页首
Dojo 的 Cometd 工具包之后端
Dojo 的 Cometd 工具包的后端实现是基于 Java 和 Jetty 组件的,通过 Dojo 的服务端 Cometd 组件,我们同样能极其迅速的构建 Cometd 框架。我们需要做的仅仅是加入我们的业务逻辑代码即可。
先来看看 web.xml 的配置参数:
清单 9. 基本配置参数(web.xml)
[html] view plaincopy
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
version="2.5">
<servlet>
<servlet-name>cometd</servlet-name>
<servlet-class>
org.cometd.server.continuation.ContinuationCometdServlet
</servlet-class>
<init-param>
<param-name>timeout</param-name>
<param-value>60000</param-value>
</init-param>
</servlet>
<servlet-mapping>
<servlet-name>cometd</servlet-name>
<url-pattern>/cometd/*</url-pattern>
</servlet-mapping>
<filter>
<filter-name>cross-origin</filter-name>
<filter-class>org.eclipse.jetty.servlets.CrossOriginFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>cross-origin</filter-name>
<url-pattern>/cometd/*</url-pattern>
</filter-mapping>
</web-app>
这里我们先来看看“ContinuationCometdServlet”,这个 Servlet 主要用于解释 Bayeux 协议,所以关于它的配置是必须的。基于“ContinuationCometdServlet”的其他配置参数还有很多,如:
Timeout:长轮询的过期时间。如果超过这个时间还没有客户端消息,服务端会推送一个空消息。
Interval:轮询间隔时间。客户端结束前一个请求到发送下一个请求之间的间隔时间。
maxInterval:服务端最长等待时间。即:建立连接时,如果超过这个时间仍没有接到一个新的长轮询连接请求,服务端就会认为该客户端无效或者关闭了。
logLevel:日志级别。“0 = warn, 1 = info, 2 = debug”。
以上是主要的配置参数,其余的配置参数还有很多,这里不一一介绍,有需要的读者可以查阅 Dojo 的帮助文档。另外,最后几行我们还配置了一个“cross-origin”,对应着“CrossOriginFilter”类,他用于支持跨域的 JavaScript 请求,如果您的项目中要支持跨域的服务器推送,请加入该配置。
接下来我们再来看看一些高级配置参数:
清单 10. 高级配置参数(web.xml)
[html] view plaincopy
<servlet>
<servlet-name>cometd</servlet-name>
<servlet-class>org.cometd.java.annotation.AnnotationCometdServlet</servlet-class>
<init-param>
<param-name>logLevel</param-name>
<param-value>1</param-value>
</init-param>
<init-param>
<param-name>services</param-name>
<param-value>org.cometd.examples.ChatService</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>cometd</servlet-name>
<url-pattern>/cometd/*</url-pattern>
</servlet-mapping>
<servlet>
<servlet-name>cometdDemo</servlet-name>
<servlet-class>org.cometd.examples.CometdDemoServlet</servlet-class>
<load-on-startup>2</load-on-startup>
</servlet>
这里我们主要要注意三个地方:
1. “CometdDemoServlet”:它是用于启动服务端 Cometd 框架的 Servlet,我们在后面会介绍。由于他配置了“load-on-startup”参数,所以在服务容器启动的时候,我们的 Cometd 服务端就已经搭建好了,之后我们会着重介绍他的“init”方法中的行为。
2. “AnnotationCometdServlet”:这个 Servlet 配置在这里表示了我们在服务端代码是基于 annotation 的。这是一个非常实用的 Servlet,通过这个 Servlet,你会发现,我们要做的事情仅仅是定义几个 Service 类,实现其中的几个方法即可。连很多调用 Cometd 框架 API 接口的代码都省去了。
3. “ChatService”:这里声明了一个 Service 类,他的用途是处理服务渠道的消息。这里声明的作用等同于代码中的“processor.process(new ChatService())”。
配置完成后,我们接下来可以看看代码了。通过以上的配置之后,你会发现,我们接下来要写的代码非常简单精炼:
清单 11. 服务类初始化 init
[java] view plaincopy
public void init() throws ServletException
{
final BayeuxServerImpl bayeux =
(BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);
if (bayeux==null)
throw new UnavailableException("No BayeuxServer!");
.................
// 创建扩展点
bayeux.addExtension(new TimesyncExtension());
bayeux.addExtension(new AcknowledgedMessagesExtension());
// 设定握手连接权限
bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(
GrantAuthorizer.GRANT_PUBLISH);
// 启动服务渠道
ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);
processor.process(new EchoRPC());
processor.process(new Monitor());
//processor.process(new ChatService());
bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer()
{
public void configureChannel(ConfigurableServerChannel channel)
{
channel.setPersistent(true);
}
});
if (bayeux.getLogger().isDebugEnabled())
System.err.println(bayeux.dump());
.................
}
这里我们介绍三个知识点:
1. Extension:Extension 是一个函数,它会在消息发出之前或者收到之后被调用,专门用来修改消息内容,例如加入一些特殊属性(这些属性多在消息的 ext 属性中)。注意,这些属性大多是应用无关的,如记录长轮询的次数等等。这里的“TimesyncExtension”和“AcknowledgedMessagesExtension”是两个比较常用的 Extension:
1) “Timesync Extension”用于计算客户端事件和服务端时间的偏差。客户端需要同时引入“dojox.cometd.timesync”类,该 Extension 使得客户端和服务端在每次握手或者连接的时候能够互相交换各自的时钟信息,这也是的客户端可以很精确的计算出他与服务端时钟的偏移量。消息格式如下:
{ext:{timesync:{tc:12345567890,ts:1234567900,p:123,a:3},...},...}
TC:客户端发消息的时间(距离 1970 年 1 月号的时长,单位为毫秒)
TS:服务端收到消息的时间
2) “Acknowledge Extension”用于提供可靠的顺序消息机制。一旦加入了“Acknowledge Extension”,服务端会阻截非长轮询的客户端请求,这样会使你的服务器更加的高效。注意:客户端需要同时引入“dojox.cometd.ack”类与其协同工作。
2. Authorizer:设定握手连接权限,这里设定值为“GrantAuthorizer.GRANT_PUBLISH”,表示允许所有客户端建立握手连接。
3. Process Service:启动服务渠道“processor.process(new EchoRPC())”。通过这些服务渠道类,我们可以启动服务渠道处理客户端请求。这是我们服务端推送技术的关键所在,我们的业务逻辑代码也是主要放在这些服务渠道类里面。
接下来我们来看看这些服务渠道类的具体实现:
清单 12. Echo Service 实现
[java] view plaincopy
@Service("echo")
public static class EchoRPC
{
@Session
private ServerSession _session;
@SuppressWarnings("unused")
@Configure("/service/echo")
private void configureEcho(ConfigurableServerChannel channel)
{
channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH);
}
@Listener("/service/echo")
public void doEcho(ServerSession session, ServerMessage message)
{
Map<String,Object> data = message.getDataAsMap();
Log.info("ECHO from "+session+" "+data);
for(int i = 0; i < 50; i++){
session.deliver(_session, message.getChannel(), data, null);
}
}
}
我们可以在“configureEcho”里面设定该服务渠道支持的权限。我们主要来看看“doEcho”方法,它被标识为“@Listener("/service/echo")”,所以它可以用于像客户端推送服务渠道为“echo”的消息,我们之前客户端代码示例里面的如下代码:“dojox.cometd.subscribe("/service/echo",echoRpcReturn)”就是专门用于处理这里服务渠道推送的消息,消息推送通过“deliver”方法,推送的消息信息放在“data”实参中。
再来看看 Monitor 类:
清单 13. Monitor Service 实现
[java] view plaincopy
@Service("monitor")
public static class Monitor
{
@Listener("/meta/subscribe")
public void monitorSubscribe(ServerSession session, ServerMessage message)
{
Log.info("Monitored Subscribe from "+session+" for "
+message.get(Message.SUBSCRIPTION_FIELD));
}
@Listener("/meta/unsubscribe")
public void monitorUnsubscribe(ServerSession session, ServerMessage message)
{
Log.info("Monitored Unsubscribe from "+session+" for "
+message.get(Message.SUBSCRIPTION_FIELD));
}
@Listener("/meta/*")
public void monitorMeta(ServerSession session, ServerMessage message)
{
if (Log.isDebugEnabled())
Log.debug(message.toString());
}
}
Monitor 渠道类与之前的 Echo 服务渠道类比较类似,不过它主要用于处理 meta 渠道,与业务逻辑无关。
最后,我们来看看被注释掉的“ChatService”类,他也可以通过“processor.process(new ChatService())”来启用,但是我们这里用了一个更为简单的方法:直接配置在 web.xml 文件中:
清单 14. ChatService 的配置
[html] view plaincopy
<servlet>
...............
<init-param>
<param-name>services</param-name>
<param-value>org.cometd.examples.ChatService</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
细心的读者可能在之前的代码示例中已经看到,这里就是通过配置的方式加载服务渠道类。参考以下具体实现的代码:
清单 15. ChatService 实现
[java] view plaincopy
@Service("chat")
public class ChatService
{
..........................................
@Listener("/service/members")
public void handleMembership(ServerSession client, ServerMessage message)
{
Map<String, Object> data = message.getDataAsMap();
final String room = ((String)data.get("room")).substring("/chat/".length());
Map<String, String> roomMembers = _members.get(room);
if (roomMembers == null)
{
Map<String, String> new_room = new ConcurrentHashMap<String, String>();
roomMembers = _members.putIfAbsent(room, new_room);
if (roomMembers == null) roomMembers = new_room;
}
final Map<String, String> members = roomMembers;
String userName = (String)data.get("user");
members.put(userName, client.getId());
client.addListener(new ServerSession.RemoveListener()
{
public void removed(ServerSession session, boolean timeout)
{
members.values().remove(session.getId());
broadcastMembers(room,members.keySet());
}
});
broadcastMembers(room,members.keySet());
}
private void broadcastMembers(String room, Set<String> members)
{
// Broadcast the new members list
ClientSessionChannel channel =
_session.getLocalSession().getChannel("/members/"+room);
channel.publish(members);
}
..........................................
@Listener("/service/privatechat")
protected void privateChat(ServerSession client, ServerMessage message)
{
Map<String,Object> data = message.getDataAsMap();
String room = ((String)data.get("room")).substring("/chat/".length());
Map<String, String> membersMap = _members.get(room);
if (membersMap==null)
{
Map<String,String>new_room=new ConcurrentHashMap<String, String>();
membersMap=_members.putIfAbsent(room,new_room);
if (membersMap==null)
membersMap=new_room;
}
String[] peerNames = ((String)data.get("peer")).split(",");
ArrayList<ServerSession> peers = new ArrayList<ServerSession>(peerNames.length);
.................
}
}
以上是摘录部分 ChatService 实现代码,它主要是实现一个在线的聊天室,包括公开发言和私有(1 对 1)聊天等等功能,它的实现方式与之前的 Echo 和 Monitor 类似,这里不做详述,有兴趣的读者可以参考一下他的实现,来构造自己的服务器推送应用。
回页首
服务器推送技术之比较
其实有很多种方式实现服务器推送,它们各有各的优缺点:
传统轮询:此方法是利用 HTML 里面 meta 标签的刷新功能,在一定时间间隔后进行页面的转载,以此循环往复。它的最大缺点就是页面刷性给人带来的体验很差,而且服务器的压力也会比较大。
Ajax 轮询:异步响应机制,即通过不间断的客户端 Ajax 请求,去发现服务端的变化。这种方式由于是客户端主动连接的,所以会有一定程度的延时,并且服务器的压力也不小。
长连接:这也是我们之前所介绍的一种方式。由于它是利用客户端的现有连接实现服务器主动向客户端推送信息,所以延时的情况很少,并且由于服务端的可操控性使得服务器的压力也迅速减小。其实这种技术还有其他的实现方式,通过 Iframe,在页面上嵌入一个隐藏帧(Iframe),将其“src”属性指向一个长连接的请求,这样一来,服务端就能够源源不断的向客户端发送数据。这种方式的不足就在于:它会造成浏览器的进度栏一直显示没有加载完成,当然我们可以通过 Google 的一个称为“htmlfile”的 ActiveX 控件解决,但是毕竟他需要安装 ActiveX 控件,对于终端用户也是不合适的。
套接字:可以利用 Flash 的 XMLSocket 类或者 Java 的 Applet 来建立 Socket 连接,实现全双工的服务器推送,然后通过 Flash 或者 Applet 与 JavaScript 通信的接口来实现最终的数据推送。但是这种方式需要 Flash 或者 JVM 的支持,同样不太合适于终端用户。
HTML5 的 WebSocket:这种方式其实与套接字一样,但是这里需要单独强调一下:它是不需要用户而外安装任何插件的。HTML5 提供了一个 WebSocket 的 JavaScript 接口,可以直接与服务端建立 Socket 连接,实现全双工通信,这种方式的服务器推送就是完全意义上的服务器推送了,没有半点模拟的成分,只是现阶段支持 HTML5 的浏览器并不多,而且一般老版本的各种浏览器基本都不支持。不过 HTML5 是一套非常好的标准,在将来,当 HTML5 流行起来以后将是我们实现服务器推送技术的不二选择。
回页首
结束语
这篇文章介绍了 Dojo 中的服务器推送 Cometd 工具包。基于服务器推送的理念,介绍了 Bayeux 协议的核心思想,并结合一个简单示例介绍了服务器推送的基本实现。随后,本着快速建立服务器推送框架的想法,介绍了 Dojo 的 Cometd 工具包,并分别从客户端接口和服务端接口两个方面分别介绍了 Dojo 的服务器推送框架的搭建和实现原理。最后,通过一些简单的示例展示了基于服务端推送的业务逻辑的具体实现。服务端推送技术具有很强的实用性,希望广大读者在开发自己的项目的过程中多关注一下,以尽可能多的完善自己的 Web 应用。
参考资料
学习
Dojo 校园文档主页:Dojo 中控件的比较完全的 API 文档主页,包括 Dojo,Dijit,Dojox 等等。
Dojo 官方文档主页:Dojo 官方的很多支持 Ajax 应用程序开发的组件的文档。
“在 Ajax 应用程序中实现实时数据推送”(developerWorks,2009 年 11 月):一些特殊的应用场景会要求 Web 程序的客户端能够在服务器端数据发生变化时立即得到通知,这就要求应用程序具有“服务器推送”的特性。本文讲述了如何利用 RIA 技术来通过套接字来实现相应的功能,并介绍了具体的实现方法。
“使用 HTML5 WebSocket 构建实时 Web 应用”(developerWorks,2011 年 12 月):本文主要介绍了 HTML5 WebSocket 的原理以及它给实时 Web 开发带来的革命性的创新,并通过一个 WebSocket 服务器和客户端的案例来充分展示 WebSocket 的强大和易用。
developerWorks Web development 专区:通过专门关于 Web 技术的文章和教程,扩展您在网站开发方面的技能。
developerWorks Ajax 资源中心:这是有关 Ajax 编程模型信息的一站式中心,包括很多文档、教程、论坛、blog、wiki 和新闻。任何 Ajax 的新信息都能在这里找到。
developerWorks Web 2.0 资源中心,这是有关 Web 2.0 相关信息的一站式中心,包括大量 Web 2.0 技术文章、教程、下载和相关技术资源。您还可以通过 Web 2.0 新手入门栏目,迅速了解 Web 2.0 的相关概念。
查看 HTML5 专题,了解更多和 HTML5 相关的知识和动向。
原文转自:http://blog.csdn.net/dojotoolkit/article/details/7298417
相关推荐
本“dojo服务器推送技术的HelloWorld例子”旨在帮助初学者理解如何使用Dojo和Comet技术来实现实时通信。下面我们将深入探讨这两个关键概念。 1. **Dojo框架**: Dojo是一个开源的JavaScript库,提供了丰富的功能和...
这样,当服务器推送新数据时,相应的处理逻辑就会被触发。 4. **错误处理与重试机制**:在连接中断或失败时,cometd库会自动尝试重新建立连接,确保数据传输的可靠性。 **应用场景与优势** dojo数据推送库结合...
- **Pushing**:服务器推送,当服务器有新数据时,主动通知客户端。 **5. 安全性与优化** - **安全性**:DWR可以通过限制可访问的方法、IP白名单等措施来确保安全。 - **性能优化**:批量调用、缓存策略、压缩传输...
- **使用Server Push技术**:例如Comet技术,可以让服务器主动推送数据到客户端。 - **选择合适的框架**:根据项目需求选择合适的框架,如DWR、Dojo Toolkit等。 ##### 3.2 实现案例 - **即时股市报价**:可以采用...
这些技术能够实现服务器主动推送消息给客户端,而不是客户端持续轮询服务器,从而实现高效的数据同步。 7. **安全性与隐私**:在开发聊天软件时,必须考虑用户数据的安全性和隐私保护。这包括使用HTTPS协议加密通信...
虽然AJAX主要是基于请求-响应模型,但现代Web开发中也有采用WebSocket等技术实现服务器向客户端推送数据的方式,以实现更加实时的通信需求。 #### HTML与CSS的局限 在AJAX出现之前,HTML和CSS主要用于构建静态网页...
CometD是一个基于Bayeux协议的开源框架,专门用于实现服务器到客户端的实时双向通信。这个框架在Web应用中非常有用,...通过深入理解和实践,你可以掌握这些技术并应用于实际项目中,提升用户体验并降低服务器压力。
- **Comet**:一种服务器推送技术,常用于实时聊天和股票报价等场景。 - **WebSockets**:现代浏览器支持的双向通信协议,提供持久连接,适用于实时应用。 6. **Ajax与服务器端技术** - **Java Servlets** 和 **...
它提供了服务器推送技术,使得服务器可以主动向客户端发送数据,而不仅仅是响应客户端的请求。CometD的核心理念是通过长连接来实现双向通信,极大地提高了Web应用的交互性和实时性。 在"cometd-3.0.0.beta2-...
5. **实时通信**:介绍DWR的推送技术,使得服务器可以主动向客户端发送数据,实现聊天室、股票报价等实时应用。 6. **安全与优化**:讨论如何防止跨站脚本攻击(XSS)和跨站请求伪造(CSRF),以及如何优化DWR性能...
1. **ch12**: 可能是第十二章,可能涉及Ajax的高级话题,如 Comet 或 WebSockets 技术,用于实现服务器向客户端推送数据。 2. **ch9**: 可能讲述了Ajax与服务器端脚本的集成,如PHP、Python或Ruby等。 3. **ch13**: ...
1. **反向Ajax (Reverse AJAX)**:DWR的核心是反向Ajax技术,即服务器主动向客户端推送数据,而不是等待客户端的请求。 2. **远程方法调用 (Remote Method Invocation)**:DWR允许JavaScript在客户端直接调用服务器...
DWR的核心概念是反向AJAX或Comet,这是一种技术,通过它,服务器能够主动推送数据到客户端,而不仅仅是响应客户端的请求。这大大提高了实时性和用户体验。DWR通过提供一套JavaScript库和Servlet来简化这个过程,使得...
- **实时聊天**:DWR可以实现服务器端消息的即时推送,适合构建实时通讯系统。 - **文件上传**:DWR可以处理大文件上传,同时显示上传进度。 5. **DWR的扩展和优化** - **DWR Engine**:控制与服务器的交互,...
之后,客户端可以通过Ajax(Asynchronous JavaScript and XML)技术定期轮询服务器,检查处理是否完成,或者服务器可以使用WebSockets等技术主动推送结果给客户端。 在Struts2中,异步调用通常涉及以下组件: 1. *...
- **Comet技术**:长轮询和流式传输,实现服务器向客户端推送数据。 9. **Ajax与Web存储** (第09章) - **Cookie、LocalStorage和SessionStorage**:讲解三种在客户端存储数据的方式,以及它们的区别和使用场景。 ...
- **Server-Sent Events (SSE)**:服务器可以主动推送给客户端,常用于实时更新。 8. **安全性与隐私** - **XSS(Cross-site scripting)**:防止恶意脚本注入,确保数据安全。 - **CSRF(Cross-site request ...