精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2010-01-27
最后修改:2010-01-27
IM概念和Jabber协议 3.2会话池维持客户端连接 一个典型的jabber服务将维持许多并发的,长时间的客户端连接。每一个会话在客户端和服务器端定义一个上下文包,并在他们之间通行。每一Session的上下文都必须为维持每一个连接保持连接。它它包含了如下信息: l Session保持连接的jabberID l Session保持连接的StreamID l 被Session用到的java.net.Socket以及对应的java.io.Reader/Writer对象 l Session状态(disconnected,connected,streaming,authenticated)
这个连接的集合和他的维持信息被封装在一个Session对象中。另外,所有的在服务器中的会话在包信息存取都是非常容易丢失的。我们开发一个集中的SessionIndex类就是为了保持我们的活动的会话不丢失以及保证通过jabberID能够找到会话。 我们首先看看Session类。 3.2.1会话类抽象一个连接 会话类提供了一个方便的session上下文信息分组的办法。类开始声明了两个构造器,基本的数据域和访问方法。我们也提供了两个为SessionSocket对象读写功能的方法。大部分时候,session对象用到java.io.Writer写信息到SessionSocket的outputStream中,或者Java.io.Reader读取信息。通过创建和保存一个Reader和Werter,使用者可以不用得到Socket,自己就可以得到Input/OutputStream,和创建Reader/writer。 public class Session{ public Session(Socket socket) { setSocket(socket); } public Session() { setStatus(DISCONNECTED); } JabberID jid; public JabberID getJID() { return jid; } public void setJID(JabberID newID) { jid = newID; } String sid; public String getStreamID() { return sid; } public void setStreamID(String streamID) { sid = streamID; } Socket sock; public Socket getSocket() { return sock; } public void setSocket(Socket socket) { sock = socket; setStatus(CONNECTED); } Writer out; public Writer getWriter() throws IOException { if (out == null){ out = new BufferedWriter(new OutputStreamWriter(sock.getOutputStream())); } return out; } Reader in; public Reader getReader() throws IOException { if (in == null){ in = new BufferedReader(new InputStreamReader(sock.getInputStream())); } return in; } 最让人感兴趣的是session里的状态管理代码。通过几个类我们能够知道怎么在状态发生改变同时session状态发生改变。我们在下面的章节将会在客户端代码中真切的体会。为了支持这些功能,我们用的会话状态事件模型可以在Swing类中发现。 在此模型中,注册监听事件。当一个事件被触发,会话会通知注册的监听事件。这个过程一时间看来有些小混乱。但是其实这是一条康庄大道,是解决多个对象监控另外对象的变量的一个好办法。ListinListing 3.2 The Session class st atus event code LinkedList statusListeners = new LinkedList(); public boolean addStatusListener(StatusListener listener){ return statusListeners.add(listener); } public boolean removeStatusListener(StatusListener listener){ return statusListeners.remove(listener); } public static final int DISCONNECTED = 1; public static final int CONNECTED = 2; public static final int STREAMING = 3; public static final int AUTHENTICATED = 4; int status; public int getStatus() { return status; } public synchronized void setStatus(int newStatus){ status = newStatus; ListIterator iter = statusListeners.listIterator(); while (iter.hasNext()){ StatusListener listener = (StatusListener)iter.next(); listener.notify(status); } } } 一个Java.util.LinkedList类用来维持一个Session监听列表。加入和删除一个状态是非常容易的。所有的状态时间监听必须继承StatusListener接口。 The StatusListener interface public interface StatusListener { public void notify(int status); } 它只有一个notify()方法,被用在setStatus()方法发送一个事件信息给监听器。为了连贯性,我也为我设想的Session类的4个状态定义几个标准值。 会话类抽象了在客户端和服务器端的一个网络连接和jabberSesison。服务器将处理许多同步的Session,通一个组织结构为jabberID定位Sessions。SessionIndex类承担这些职责。 3.2.3SessionIndex类提供session查找 SessionIndex的主要职责是更具jabberID查找Session对象。对服务器来说按照jabberID定位到正确的Session对象是很重要的,因为多数的jabber包来自于jabber用户。服务器必须根据收到的jabberID定位到适当的session并回复包到客户端。 为实现这些功能,SessonIndex类包括两个Java.util.HashTable对象:userIndex和JidIndex。UserIndex Hashtable提供了Session的用户和Session对象的映射。JidIndex提供了Session的jabberID字符串和Session对象之间的映射。 使用SessonIndex引导session的查找使用以下算法: l 检查如果接收者的jabber id是在jidindex 。比较使用 l 如果没有,查看jabberId的用户名在userIndex中 l 如果没有找到,返回null。 使用这个算法服务器能够显示一个合理的消息回路。例如,假设一个客户端的jabberID是iain@shigeoka.com/home,连接服务器。SessionIndex将查找JabberID,在jidIndex中找到它,返回合适的Session。现在,假设消息的地址是iain@shigeoka.com。SessinIndex类在JIdIndex中查找,但没有找到。然而当SessionIndex在userIndex中查找,找到iain,返回连接iain@shigeoka.com/home的Session。
最后,考虑一下消息地址iain@shigeoka/work。SessionIndex类在jidIndex中查找失败,但是能够看到iain实体在userIndex并且返回Session附在iain@shigeoka.com/home。因此,消息被传到适当的用户但是是备用的资源。这个动作符合jabber消息路由规定。 执行的类可以简单明了的管理这些映射。
Hashtable userIndex = new Hashtable(); Hashtable jidIndex = new Hashtable(); public Session getSession(String jabberID){ return getSession(new JabberID(jabberID)); } public Session getSession(JabberID jabberID){ String jidString = jabberID.toString(); Session session = (Session)jidIndex.get(jidString); if (session == null){ LinkedList resources = (LinkedList)userIndex.get(jabberID.getUser()); if (resources == null){ return null; } session = (Session)resources.getFirst(); }
return session; } public void removeSession(Session session){ String jidString = session.getJID().toString(); String user = session.getJID().getUser(); if (jidIndex.containsKey(jidString)){ jidIndex.remove(jidString); } LinkedList resources = (LinkedList)userIndex.get(user); if (resources == null){ return; } if (resources.size() <= 1){ userIndex.remove(user); return; } resources.remove(session); } public void addSession(Session session){ jidIndex.put(session.getJID().toString(),session); String user = session.getJID().getUser(); LinkedList resources = (LinkedList)userIndex.get(user); if (resources == null){ resources = new LinkedList(); userIndex.put(user,resources); } resources.addLast(session); } } 如你所见,我在userIndex中为每一个用户名使用了一个Sessions的LinkedList,messages遵循着先到先服务的原则。换句话说,如果你连接以下客户端: 一个发送到iain@shigeoka.com的消息将别发送到iain@shigeoka.com/home。如果iain@shigeoka.com/home没有连接,那么消息发送到iain@shigeoka.com给iain@shigeoka.com/work。这些不是标准的jabber路由动作,但是是在第8章之前,在我们添加用户帐户,用户出席和支持协议之前,能做到得到最好的状态。当我们在这里使用用户出席支持,我们将能够实现更加老道的,优先的路由元数据按照指定的jabber标准。使用SessionIndex的关键是包处理类关联的QueueThread类。 3.3xml解析子系统 3.3.2 服务的焦点,packetQueue类 packetQueue类是一个受限的响应集合的基本数据结构类。然而,他是服务器端的信息流的焦点。Packet从客户端聚集到packetQueue。然后packet按照packet的元素名发散出去。许多操作方法能够被服务器激活,并且同步到packetQueue。 Comand操作设计模式:
For those familiar with design patterns, our packet handling system is a minor variation on the Command Processor design pattern.9 “The Command Processor design pattern separates the request for a service from its execution. A command processor component manages requests as separate objects, schedules their execution, and provides additional services such as the storing of request objects for later undo.
在我们的jabber服务,XML解析动作作为command控制器接受请求,转换xml流到命令。QueeueThread充当命令处理机获得paceket并调度他们的执行。不管像不像命令处理模式,packets是不会自己处理他们自己的运行。双重功能的命令处理供应者和命令处理代替我们的packet处理类,提供我们的整个的包处理功能。 使用comandprocessor模式的好处是: l 请求触发的可扩展性——支持选择启动的方法是的能够很容易的桥连我们的jabber服务和其它message系统。 l 请求的操作和数量上的可扩展性——快速改变jabber协议是非常重要的。 l 相关服务的可编程——在特殊情况下,我们只要稍稍修改QueueThread就能够容易的记录packets和重放jabber会话。 l 应用的可测试性——QueueThread是一个极好的接入点,为我们测试xml解析和packet处理。一旦开启,记录和重放session是一个巨大的帮助。 l 并发——包和他们的处理是一个相对分离的计算。当线程同时执行时,QueueThread能够容易的分配处理。 Commander processor模式有一些缺陷: l 效率的损耗——转换数据格式和提供中间处理步骤需要而外的计算时间的存储空间。 l 潜在着过多的命令类——大部分时候,我们避免把所有的包表现为单一的,一般的paceket类。我们为简单性而降低了一半的呈现效率。另外,一个packet类能够呈现不定数量的packet类。我们的服务必须建立额为的逻辑来垂腭这些包。 PacketQueue必须: l 存储包——接受packet对象推入packetQueue的底 l 回收包——允许packet对象退出packetQueue,并移除它 l 线程安全——无障碍的允许多线程同步从packetQueeue 中push和pull packet。 l 提供线程同步——如果PacketQueue是空的,调整pull包之前让调用者停止一会儿直到一个新的packet进入队列,或者线程终止。线程能够同步的使用它们的操作和保存服务资源。 操作packet对象的线程能够有效的不断的从packetqueue拉出packet对象。PacketQueue自己必须确保这些工作的线程只能够在packetQueue正常有效的情况下执行。 packetQueeue的对象是非常简单的。一个java.util.LinkedList对象,用来存储Packets和一点用来支持服务器多线程环境的基本要素。 在下面高亮的部分提示了方法push()和pull()是线程安全保护的。每一个push()方法的调用时引起了notifyAll()唤醒等待的pull()处理线程。Pull()方法的定时器在队列是空的时候调用wait()方法。
//Actual storage is handled by a java.util.LinkedList LinkedList queue = new LinkedList(); public synchronized void push(Packet packet){ queue.add(packet); notifyAll(); } public synchronized Packet pull(){ try { while (queue.isEmpty()) { wait(); } } catch (InterruptedException e){ return null; } return (Packet)queue.remove(0); } } loop是必须的,因为push()方法调用notifyAll(),唤醒所有等待的pull()。如果仅仅一个packet被推进队列,第一个线程执行移除从队列中。所有其他等待的线程继续运行,队列是空的,他们定时wait(),直到下一个包的到来。 如果你不能理解java线程代码,你可以信任这些工作,写一些测试验证一下,或者学习关于java线程的知识。我已经尽力将本书的中关于thread的代码最小化,但是服务代码倾向于沉重的线程。如果你希望学习到线程,有几个很好的书可以看。另外,java在线文档也提供免费指导。 现在,我们看看用jabberInputHandle类处理xml解析。 3.3.5java中的 SAX 解析(略) 3.4包处理和服务器线程 我最终需要的jabber服务器,包括服务包处理类和服务器端线程。包处理是继承自PacketListerer接口的简单对象,处理包对象。我们用的包处理类实现我们知道的jabber协议。 在这一章,我们将创建三个packet处理类: l 当未关闭的<stream:stream>发送时一个OpenStreamhandler类为每一个连接创建Sessiong对象。 l 一个简单的messagehandler类,实现Jabber包路由初始版本。 l 当发送关闭标签<stream:stream>时候,CloseStreamhandler类经关闭连接。 服务的线程系统有三个类组成,每一个类代表一个线程执行: l Server——主应用程序类和主执行线程。这个类创建jabber服务socket和接受新的jabber客户连接。 l ProcessTread——服务器线程类接受连接处理。他创建一个jabberInputHandler和一个输入xml流过程。ProcessThread线程处理和连接一一相对。 l QueueThread——这个线程类从PacketQueeue得到包然后他打发给适当的包处理类。在jabber服务中仅仅有一个活动的queueThread对象。
我们看看服务的主应用首先启动。他的首要任务是创建QueueThread类。新的queueThread立即调用pull()接受PacketQueue的包。QueueThread定时是因为队列中没有包。服务然后等待这用java.net.ServerSocekt.accept()调用连接。
队列中新的Packet唤醒QueueThread。QueeuThread确定包已经被推入队列,然后用当前处理类处理它。包处理发送输出xml包到客户端。 注意仅有一个服务线程接受连接,并且一个QueueThread处理从packetQueue的包对象。然而,每一个客户链接有一个活动的processThread。有共享执行线程的方法处理输入的客户连接。另外,服务器有许多的包处理线程。然而,在我们的服务中,我们使他们尽量的简单。 从服务的主类创建QueueThread和ProcessThread,我们首先看到有两个帮助线程类,在运行服务助力额之前。QueueThread是个更加的复杂,我们就在这开始。
声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
浏览 1728 次