`
艾建锋
  • 浏览: 24398 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

(二)Pushlet框架服务器端分析-框架源码下载-转载

阅读更多
下篇文章将分析Pushlet2.0.3的客户端js:js-pushlet-client.js
文件附件提供了Pushlet框架源码下载http://dl.iteye.com/topics/download/aef40c55-a8d4-3a50-a941-bea7f6ffd18d

转载地址:http://blog.csdn.net/yxw246/archive/2008/05/08/2418255.aspx
Pushlet 2.0.3 源码分析
----服务器端
1 总体架构
Pushlet从功能上实现了服务器推技术,整个框架涉及了服务器端以及客户端的部署。服务器端采用servlet技术,监听客户端请求。客户端分为两大类,浏览器以及桌面应用程序。下图描述了系统的整体框架:


图1 pushlet总体架构图
从图中可以看出服务器端返回响应的出口只有一个,那就是clientAdapter,它只是一个接口,根据不同的客户端类型来产生相应的adapter发送响应结果。
各个类的主要职责描述:
Pushlet:负责接收所有用户请求,并将请求包装为event对象,在根据session、event、request、response对象构造一个command对象,最后将command对象交由controller处理。
Session:代表一次用户会话,此类不同于httpsession,因为此session的实现是使用类似url重写方式,在服务器分配了sessionid之后的每个请求中都加入这个参数以标识会话。会话在其存活期内有效。
Controller:是所有命令的执行器,包括各种控制命令以及数据推送命令。不过对于数据推送的实际执行并不是在controller中实现,而是委托给subscriber只执行。
Subscriber:这是核心类之一。它维护了一个阻塞的事件队列,根据客户端使用的不同模式(框架定义的模式有:stream,pull/poll,stream使用了http长连接,pull/poll则是通过客户端定时刷新实现服务端推送)来发送响应事件。
Dispatcher:事件分发器,也是核心类之一。事件来源可以是客户(通过publish命令发布事件),也可以是eventSource。实现了多播,广播以及单播事件,具体采用哪种方式根据事件属性决定。事件接收端即是subscriber的事件队列。
clientAdapter:有3个具体实现,browserAdapter、XMLAdapter、serializedAdapter。分别用来发送javascript、xml、序列化数据。使用于不同的客户端。具体使用哪种adapter需要根据用户请求事件的format参数决定。
其他公共类:提供了日志、可配置等功能。


图2 核心类的对应关系
2 原理分析
Pushlet采用服务器端回调技术以及HTTP长连接实现了服务器推服务的两种模式,即stream,pull/poll。其中对于浏览器客户端还应用了DHTML技术,通过回调javascript函数在不刷新页面的前提下实时更新,普通的桌面应用很容易便可以实现这种效果。为了提高系统的可靠性以及健壮性,通信过程中开通了两条通道,控制通道和数据通道。控制通道不会阻塞,能够实时接收客户命令,而数据通道工作在阻塞模式下,当传输模式为stream时,数据通道连接不断开,直至用户发送断开命令或客户端退出或服务器异常,为了防止阻塞时间过长导致客户端无法得知服务器是否正常工作,系统设置了阻塞过期时间,并且在过期之后向客户端发送心跳消息表明自身仍然存活。而在pull/poll模式下,阻塞直至有数据可以发送,然后断开连接。浏览器客户端需要不停的发送心跳请求,目的是为了解决浏览器一直繁忙的状态。以下是系统的协议服务(protocol services)

Service Description
join 启动一个会话
leave 结束会话
subscribe 订阅相关主题
unsubscribe 取消订阅相关主题
listen 打开数据通道,以stream、pull或poll模式开始数据流传输。在pull/poll模式下,服务器提供所谓的刷新操作,实际上是客户端来重新请求以获取数据
join-listen 通过一个请求完成会话启动,订阅并监听数据。执行完之后的状态与执行完listen类似。
publish 发布数据,然后服务器将其分发。客户端可以使用它进行多播或单播数据。
heartbeat 表明会话存活


3 具体实现
Pushlet采用了大量的单例和工厂模式,另外还有适配器模式、命令模式。实现中遵循面向接口以及抽象类编程,这些使得系统易于理解,易于扩展。系统的大多数属性都是在配置文件中指定,如果有通过系统扩展点编写的扩展类要替代默认实现的话,只需要修改配置文件指向你自己的类文件即可,不需改动代码。接下来就沿着请求—响应的主线来分析系统源码。
请求入口pushlet
Init()方法:
30 String webInfPath = getServletContext().getRealPath("/") + "/WEB-INF";
31 Config.load(webInfPath);//载入配置文件,存放在该类的变量中
32
33 Log.init();//初始化日志类
34
35 // Start
36 Log.info("init() Pushlet Webapp - version=" + Version.SOFTWARE_VERSION + " built=" + Version.BUILD_DATE);
37
38 // Start session manager,负责管理session生命周期,这是系统的扩展点,下文详解.
39 SessionManager.getInstance().start();
40
41 // Start event Dispatcher,负责分发系统或客户事件
42 Dispatcher.getInstance().start();
43
44
45 if (Config.getBoolProperty(Config.SOURCES_ACTIVATE)) {
46 EventSourceManager.start(webInfPath);//启动事件源管理器
47 } else {
48 Log.info("Not starting local event sources");
49 }
初始化完毕之后便可以处理用户请求了.它可以处理两种请求,get和post,处理方式主要是提取请求参数,然后将其封装成event事件对象,再进一步构造command对象,最终的处理有交给了controller。这部分的代码很简单,因为主要的处理逻辑都委托给了controller。这段代码有几点是值得学习的。
1) 抽象。Event对象封装了属性—值对,内部通过hashmap实现,原理上来讲,它可以封装任何信息,为了使这样的一个抽象能够适于作为系统的通用数据抽象形式,还需要加入一个必备属性,即event_type。请求以及数据均被定义为事件,然后通过内部协议来区分它们。通过抽象之后,系统可以以一致的处理形式应对各种数据。后面将要分析的subscriber维护着一个事件队列,使用该队列完成所有的交互。这便是使用了这个抽象机制的好处。
2) 命令模式command。一个命令里包含了请求事件、响应事件以及response,request,session对象。Controller便是这个命令的执行器,通过一个简单的doCommand接口隐藏了内部复杂的处理逻辑,降低了模块的耦合度。执行完命令之后,要输出的结果就是响应事件responseEvent。Controller处理代码如下
49 // Update lease time to live,更新session生存期,防止过期
50 session.kick();
51
52 // Set remote IP address of client,设置远程客户端地址
53 session.setAddress(aCommand.httpReq.getRemoteAddr());
54
55 debug("doCommand() event=" + aCommand.reqEvent);
56
57 // Get event type
58 String eventType = aCommand.reqEvent.getEventType();
59
60 // Determine action based on event type,根据事件类型采取
相应的操作,分别构造响应事件
61 if (eventType.equals(Protocol.E_REFRESH)) {
62 // Pull/poll mode clients that refresh
63 doRefresh(aCommand);
64 } else if (eventType.equals(Protocol.E_SUBSCRIBE)) {
65 // Subscribe
66 doSubscribe(aCommand);
67 } else if (eventType.equals(Protocol.E_UNSUBSCRIBE)) {
68 // Unsubscribe
69 doUnsubscribe(aCommand);
70 } else if (eventType.equals(Protocol.E_JOIN)) {
71 // Join
72 doJoin(aCommand);
73 } else if (eventType.equals(Protocol.E_JOIN_LISTEN)) {
74 // Join and listen (for simple and e.g. REST apps)
75 doJoinListen(aCommand);
76 } else if (eventType.equals(Protocol.E_LEAVE)) {
77 // Leave
78 doLeave(aCommand);
79 } else if (eventType.equals(Protocol.E_HEARTBEAT)) {
80 // Heartbeat mainly to do away with browser "busy" cursor
81 doHeartbeat(aCommand);
82 } else if (eventType.equals(Protocol.E_PUBLISH)) {
83 // Publish event
84 doPublish(aCommand);
85 } else if (eventType.equals(Protocol.E_LISTEN)) {
86 // Listen to pushed events
87 doListen(aCommand);
88 }
89
90 // Handle response back to client
91 if (eventType.endsWith(Protocol.E_LISTEN) ||
92 eventType.equals(Protocol.E_REFRESH)) {
//请求类型是listen或refresh,表明是获取数据
93 // Data channel events
94 // Loops until refresh or connection closed
95 getSubscriber().fetchEvents(aCommand);
96
97 } else {
98 // Send response for control commands,控制命令,直接返回。
99 sendControlResponse(aCommand);
00 }

sendControlResponse()代码:

31 aCommand.sendResponseHeaders();//设置响应头,主要是客户端//缓存无效
32
33 // Let clientAdapter determine how to send event
//通过clientAdapter发送响应事件
34 aCommand.getClientAdapter().start();
35
36 // Push to client through client adapter
37 aCommand.getClientAdapter().push(aCommand.getResponseEvent());
38
39 // One shot response
40 aCommand.getClientAdapter().stop();

Subscriber:
fetchEvents()部分代码:
。。。。。。。。。。。。。。。。。
。。。。。。。。。。。。。。。。
15 Event[] events = null;
16
17 // Main loop: as long as connected, get events and push to client
18 long eventSeqNr = 1;
19 while (isActive()) {//这个循环保证了连接不被关闭,即可以以流的//方式发送响应到客户端,真正意义上的服务器推送数据
20 // Indicate we are still alive
21 lastAlive = Sys.now();
22
23 // Update session time to live
24 session.kick();
25
26 // Get next events; blocks until timeout or entire contents
27 // of event queue is returned. Note that "poll" mode
28 // will return immediately when queue is empty.
29 try {
30 // Put heartbeat in queue when starting to listen in stream mode
31 // This speeds up the return of *_LISTEN_ACK
32 if (mode.equals(MODE_STREAM) && eventSeqNr == 1) {
33 eventQueue.enQueue(new Event(E_HEARTBEAT));
34 }
35 //此方法获取事件队列里的事件,有超时设置,为阻塞操作
36 events = eventQueue.deQueueAll(queueReadTimeoutMillis);
37 } catch (InterruptedException ie) {
38 warn("interrupted");
39 bailout();
40 }
41
42 // Send heartbeat when no events received,超时后,发送心跳信息
43 if (events == null) {
44 events = new Event[1];
45 events[0] = new Event(E_HEARTBEAT);
46 }
47
48 // ASSERT: one or more events available
49
50 // Send events to client using adapter
51 // debug("received event count=" + events.length);
52 for (int i = 0; i < events.length; i++) {
53 // Check for abort event
54 if (events[i].getEventType().equals(E_ABORT)) {
55 warn("Aborting Subscriber");
56 bailout();
57 }
58
59 // Push next Event to client
60 try {
61 // Set sequence number
62 events[i].setField(P_SEQ, eventSeqNr++);
63
64 // Push to client through client adapter
65 clientAdapter.push(events[i]);
66 } catch (Throwable t) {
67 bailout();
68 return;
69 }
70 }
71
72 // Force client refresh request in pull or poll modes
73 if (mode.equals(MODE_PULL) || mode.equals(MODE_POLL)) { //如果不是stream模式,就在向客户端发送刷新命令,以获取新 //的数据 ,并退出循环,服务器自动关闭连接。因为这是http连接,响应方法//只要返回连接就会被关闭。
74 sendRefresh(clientAdapter, refreshURL);
75
76 // Always leave loop in pull/poll mode
77 break;
78 }
79 }
最后,响应事件通过clientAdapter真正发送到客户端。
BrowserAdapter部分代码:
13 protected String event2JavaScript(Event event) throws IOException {
14//将事件对象转化为javascript脚本,实际上是回调脚本的代码
15 // Convert the event to a comma-separated string.
16 String jsArgs = "";
17 for (Iterator iter = event.getFieldNames(); iter.hasNext();) {
18 String name = (String) iter.next();
19 String value = event.getField(name);
20 String nextArgument = (jsArgs.equals("") ? "" : ",") + "'" + name + "'" + ", \"" + value + "\"";
21 jsArgs += nextArgument;
22 }
23
24 // Construct and return the function call */
25 return "<script language=\"JavaScript\">parent.push(" + jsArgs + ");</script>";
26 }
Command部分代码:使用适配器模式,可以将不同客户端处理方式的不同点隐藏,客户代码使用同一接口调用,这样可以很方便的添加其他客户端类型的适配器。不过我个人觉得这三种适配器已经可以适应所有客户端类型了,而且框架的作者也没做扩展的打算。因为在这里是直接硬编码生成适配器对象的,而没有用到反射机制动态生成配置文件所定义的类型。
protected ClientAdapter createClientAdapter() throws PushletException {
96
97 // Assumed to be set by parent.获取响应格式,系统定义了4种格式,
// js、xml、 ser(序列化对象)、xml-strict
98 String outputFormat = session.getFormat();
99
00 // Determine client adapter to create.根据不同的格式返回相应的//Adapter
01 if (outputFormat.equals(FORMAT_JAVASCRIPT)) {
02 // Client expects to receive Events as JavaScript dispatch calls..
03 return new BrowserAdapter(httpRsp);
04 } else if (outputFormat.equals(FORMAT_SERIALIZED_JAVA_OBJECT)) {
05 // Client expects to receive Events as Serialized Java Objects.
06 return new SerializedAdapter(httpRsp);
07 } else if (outputFormat.equals(FORMAT_XML)) {
08 // Client expects to receive Events as stream of XML docs.
09 return new XMLAdapter(httpRsp);
10 } else if (outputFormat.equals(FORMAT_XML_STRICT)) {
11 // Client expects to receive Events embedded in single XML doc.
12 return new XMLAdapter(httpRsp, true);
13 } else {
14 throw new PushletException("Null or invalid output format: " + outputFormat);
15 }
16 }

单例模式以及工厂模式:
Dispatcher,SessionManager两者都使用了单例模式,在全局维持一个实例,充当了全局对象的作用,因为保存在这些对象里的数据或方法可以很方便的被进程内的其他对象访问,如session集合、dispatcher的各种分发事件的方法。这种方案在进程内可以很好的工作,但是如果想将应用扩展成为分布式应用,那就必须修改这些实现。
为什么要考虑分布式的可能呢?因为stream模式是通过HTTP长连接实现的。保持这个连接意味着每有一个订阅请求,就会持续占用那个连接,直到产生取消订阅的请求或者服务器异常。因为连接一致被占用,相应的servlet线程也被占用了,这样系统的总吞吐量就取决于线程池的大小乃至操作系统的连接限制。这样的限制直接导致了这个框架无法满足中型以上的系统需求。其中一种解决方案就是使框架支持分布式,通过多台服务器并行处理请求,在分布式系统中,相应的分布式sessionManger,Dispatcher是必须的,但是实现这两个类的难度显然是很高的,不知在以后的版本中是否会有这种实现。
目前,我觉得pull/poll模式更为实用,因为这种模式不会持续保持连接,使线程池可以发挥作用,但是,它是靠客户端定时刷新的,这样会给服务器带来较大的压力,如果刷新很频繁的话,实际的吞吐量也不高。(本人并没有实际测试过,但是从理论分析应该是这样的)
Controller、Session、Subscriber、Subscription、EventSourceManager这些类使用了工厂模式并结合java反射机制动态生成实例对象,这些都是框架预留的扩展点,开发人员可以通过扩展点实现符合自己需求的类,并通过配置文件将其整合到框架中来。一段典型的代码如下:
摘自Controller.java
33 public static Controller create(Session aSession) throws PushletException {
34 Controller controller;
35 try {
//读取配置文件,并生成实例对象
36 controller = (Controller) Config.getClass(CONTROLLER_CLASS, "nl.justobjects.pushlet.core.Controller").newInstance();
37 } catch (Throwable t) {
38 throw new PushletException("Cannot instantiate Controller from config", t);
39 }
40 controller.session = aSession;
41 return controller;
42 }

总结:通过阅读pushlet的源码,让我学到了很多实战编程经验,希望本文可以给java爱好者一点点帮助。
注:本文并没有分析所有代码细节,而且只针对服务器端代码,如果有兴趣的话可以自己到网上下载pushlet的源码,去体验高人的风范!



本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/yxw246/archive/2008/05/08/2418255.aspx
分享到:
评论
2 楼 pcgreat 2012-03-15  
已经相当的详细了.   
1 楼 xiaoniu_oo 2010-03-26  
写得比较详细

相关推荐

    pushlet的笔记

    - **服务器端源码**:在《二)Pushlet框架服务器端分析-框架源码下载-转载.htm》中,博主深入剖析了Pushlet服务器端的架构和核心类,包括了如何处理连接、事件分发机制以及服务器如何维护长连接等关键代码。...

    http长连接(服务器推)使用pushlet教程示例以及jar包源码

    Pushlet框架提供了服务器端和客户端的API,使得开发者能够轻松地创建和维护长连接。它的核心组件包括Pushlet Server、Pushlet Client和Event Dispatcher。Pushlet Server负责接收和处理客户端的连接,Pushlet Client...

    comet 框架 之 pushlet

    3. **Event Source**:这是服务器端的数据源,可以是数据库、消息队列或者其他任何能够提供实时数据更新的系统。当数据源发生变化时,Pushlet Server会将这些变化推送到相应的客户端。 4. **Protocol**:Pushlet...

    pushlet.jar 和示例工程

    压缩包wjw465150-Pushlet-09f5559可能包含了Pushlet框架的源码、文档、配置示例和一个运行示例工程。源码可以帮助开发者理解内部工作原理,以便进行定制和扩展。文档通常会详述如何配置和使用Pushlet,包括设置...

    pushlet 之 Pushlet使用手把手实例

    Pushlet框架包括服务器端组件和客户端组件,通过简单的API和协议,使得开发者可以方便地实现服务器向客户端的实时推送。 二、Pushlet 工作原理 1. 客户端连接:客户端通过建立一个持久的HTTP连接到Pushlet服务器,...

    pushlet的JAR包和文档

    3. **源码分析**:"pushlet 2.0.3 源码分析(服务器端) .htm" 提供了对Pushlet服务器端代码的深度解读,对于开发者来说,这是一个很好的学习和理解Pushlet内部机制的资源。通过源码分析,可以了解Pushlet如何利用HTTP...

    服务器推送——PushLet的应用<一>

    本文将深入探讨一种名为PushLet的服务器推送框架,并通过源码分析,揭示其工作原理与应用。 PushLet是一个轻量级的Java服务器推送框架,它基于HTTP长连接,利用了HTTP 1.1的Keep-Alive特性,实现了服务器到客户端的...

    开源框架Pushlet入门(转)

    Pushlet框架由两部分组成:服务器端组件和客户端组件。服务器端通常是一个Servlet容器,如Tomcat或Jetty,其中部署了Pushlet服务。客户端则是一个JavaScript库,用于接收来自服务器的数据推送。 **服务器端工作原理...

    基于pushlet web 实时聊天系统

    【基于Pushlet Web实时聊天系统】是一个利用Pushlet框架构建的在线实时通信应用。Pushlet是一种服务器推送(Server-Sent Events)技术,它允许服务器主动将数据推送到客户端,而不是传统的客户端轮询请求数据的方式...

    Pushlet后台往jsp前台推送消息实例

    - **Pushlet Server**:服务器端,处理来自客户端的连接请求,维护长连接,并负责将消息推送给客户端。 - **Producer**:消息生产者,负责生成需要推送的数据。 - **Consumer**:消息消费者,通常是Web浏览器或...

    java服务端推送实例-pushlet-及中文问题

    然后创建一个Pushlet客户端连接到服务器,服务器端则需要创建一个Pushlet服务端组件,该组件监听特定的事件并负责数据推送。在这个过程中,可能会遇到中文字符编码的问题,因为默认的UTF-8编码可能不被正确处理。 ...

    服务器推送——PushLet的应用<二>

    在提供的压缩包文件`PushLet_B`中,包含了PushLet框架的部分源代码,通过阅读这些源码,我们可以更深入地理解其内部工作原理和实现方式。例如,`PushletServlet`类是服务器端的核心,负责处理客户端连接和推送事件;...

    通过推送机制(Tomcat comet,pushlet)实现进度条

    Pushlet框架提供了一种简单的方式,让服务器可以持续发送数据到客户端,同时客户端可以通过订阅(subscribe)和取消订阅(unsubscribe)来管理这些推送的数据流。 在Pushlet模型中,服务器端有一个Pushlet容器,...

    pushlet聊天工具编辑中。。。。

    - **Comet技术**:一种实现服务器端推送的早期方法,通过长时间保持HTTP连接来实现双向通信。 - **WebSocket**:现代Web应用中的标准推送技术,提供全双工、低延迟的通信通道,适合实时聊天应用。 2. **架构设计*...

    pushlet java 消息实时推送

    通过分析这个项目源码,我们可以深入理解Pushlet的工作原理,学习如何在Java中实现消息推送,并解决实际应用中的中文字符处理问题。 总的来说,Pushlet Java 消息实时推送实例是一个很好的学习资源,它不仅展示了...

    pushlet的原文件

    2. **Publisher API**:提供了一套API,让开发者可以在服务器端创建和管理数据发布者,发布数据到Pushlet Server。 3. **Subscriber API**:客户端可以使用这套API来订阅感兴趣的数据源,并接收服务器推送的数据。 ...

    pushlet2.0.2

    这样,当服务器端有新数据可用时,会立即推送给订阅的客户端。 2. **HTTP长连接**:传统的HTTP协议是基于短连接的,而Pushlet通过保持持久连接来克服这个问题,允许服务器在连接保持期间随时向客户端推送数据。 3....

    Extjs 聊天窗口 -续2 - http长连接的实现

    读者可以通过提供的博文链接进一步学习具体实现细节,如如何在Extjs中集成WebSocket库,或者如何使用Pushlet框架进行Comet通信。同时,`pushlet.doc`文档很可能是关于Pushlet框架的详细指南,可以帮助理解其工作原理...

    Ext demopushlet + Extjs 聊天室v0.9 (含源码)

    【标题】"Ext demopushlet + Extjs 聊天室v0.9 (含源码)" 是一个基于Web的实时聊天应用示例,它整合了Pushlet技术和ExtJS库,提供了完整的源代码供开发者学习和参考。Pushlet是一种推送技术,用于实现实时数据从...

Global site tag (gtag.js) - Google Analytics