这节主要是分析数据包packet是如何被SessionManager和插件处理的 ,首先分析一些开发的理论知识先:
一、 Tigase服务器插件开发重要的是要了解它是如何工作的。不同类型的插件负责处理数据流中不同阶段的数据包(packet)。
在Tigase服务器中, 插件代码负责处理特定的XMPP节。一个单独的插件可能会负责处理<message>,另一个用于处理<presence>,和还有些插件是单独负责<iq>处理,还有些处理不同版本等等。
一个插件应该提供它感兴趣信息,如哪些XML元素节点,和叫什么名,它具有什么样xmlns名称空间等,这将使得插件在sm中匹配到相对应的packet,从而可以对匹配的packet进行处理。所以你可以创建一个插件来处理包含有感兴趣的caps子节点的packet。也可能都没有一个插件处理某个特定xml节,然而系统会默认的行为是简单转发这个xml节到目的地址。也可能有一个以上的插件处理一个特定的XML元素,然后他们在单独的线程上同时处理同一xml节,所以不保证的不同的插件能按顺序处理xml节。
//一个插件要处理哪些xml元素,是通过以下两个方法来告知系统他将处理那些它感兴趣的xml元素的。 @Override public String[][] supElementNamePaths() { return ELEMENTS; //感兴趣的xml元素。如<message> <iq> } @Override public String[] supNamespaces() { return XMLNSS; //supElementNamePaths中一一对应的xmlns }
每个packet通过SessionManager组件时被处理的几个步骤:
照片显示,每个packet 通过SessionManager处理的4个步骤:
1,Pre-processing
为了不影响Session Manager的性能需要限制该方法处理时间为极小值,用于判断当前package是否应该被阻塞,如果返回为true,则表示阻塞。(只要有一个pre-processor阻塞就算阻塞)
2,Processing
如果一个Package没有被任何的pre-processors阻塞,则才继续执行该方法。所有对当前XML段感兴趣的processor都会将该段加入到独立的线程里运行,这些现成使用内部固定的队列。当所有感兴趣的processor都执行完后就可以得到通知进入下一步。
3,post-processor
对于在第2步中没有被任何processor处理的package将会通过所有的post-processors,并被最后一个post-processor转发到一个目的地,大多数情况是以<message/>的形式被转发。例如MessageAmp.postProcess(..),它判断用户不在线时,massage信息将被保存入数据库。
4,filter
对于以上三步任何形式的结果result输出,都会被所有的filters拦截过滤,这些结果可能最终被拦截也可能被放行。
由于session manager和processors都属于消费者,所以在所有的processors中应该至少有一个processor复制一个新package并发送给某个目标。当然processor处理过程中可以生成任意数量的packet。以上4个步骤任何一个processor都可以生成packet作为结果返出。
看看以下的图片:
如果UserA发送packet-P1到服务器以外的其它节点,例如另一个服务器的用户或一些组件(Muc,PubSub),那么其中一个processor必须新创建一个副本P2的packet并正确地设置所有属性和目的地地址。packet-P1 在被SessionManager处理后消毁了,然而某一个插件生成了一个新的packet-P2。
当然同理,packet从组件转发给用户:
packet的处理组件或插件,必须生成新packet的副本提供给用户。当然,如果没有被任何插件处理的package将会通过系统默认方式转发到一个目的地,注意实现这种方式,因为输入数据包P1可以被许多插件同时处理,因此实际上packet一旦到了SessionManager进行处理应该不得变它。
很明显,下图的处理流程是当用户发送请求到服务器和期望服务器的响应:
这里有一个令人惊讶的设计结果。如果你看看下图显示2个用户之间的交流,可以看到packet送到最终目的地前复制了两次:
上图方式的packet必须被Session Manager处理两次。处理第一次代表packet作为一个即将离开用户A的包和第二次处理它代表传入用户B的packet。 这是确保用户A有权限发送一个packet,确保用户B有权利接收一个packet。另外,如果用户B是离线或脱机的,<message> processors应该把packet信息保存到一个数据库。
二、当需要编写一个自己的PLUGIN的时候根据以上SM分析可以知道 ,
可以按需求来分别实现以下四个接口,可以实现一个也可以实现多个:
1,XMPPPreprocessorIfc
2,XMPPProcessorIfc
3,XMPPPostprocessorIfc
4,XMPPPacketFilterIfc
这四个接口各需要实现一个简单的方法,每个方法的参数类似,参数描述如下:
-Packet packet 需要被处理的PACKET,该PACKET不能为NULL虽然在PROCESS处理过程中无法修改它
-XMPPResourceConnection session 用于保存所有用户的数据,它提供权限访问用户的仓库数据,在没有在线用户SESSION的情况下该参数可以为NULL
-NonAuthUserRepository repo 该参数往往在参数session为NULL的时候被使用,它用于为不在线的用户保存私有或公开的数据信息。
-Queue<Packet> results 这个为输入packet的处理结果产生的packet集合,它总被要求一定要存放一个输入数据包packet的副本到里面,其实包含了所有需要进一步处理的packet,包括process生成的结果packet。
-Map<String, Object> settings 为PLUGIN制定配置信息,一般情况下不需要使用,然而如果需要访问额外的数据库则可以通过配置文件将数据库连接字符串传给plugin
以下就是具体的代码分析了:
SessionManager
public void SessionManager.processPacket(final Packet packet) { if (packet.isCommand() && processCommand(packet)) { packet.processedBy("SessionManager"); } // end of if (pc.isCommand()) //这个方法查找相对应的connection,这方法很重要,from-to ,to-from思想。后面详细分析 XMPPResourceConnection conn = getXMPPResourceConnection(packet); if ((conn == null) && (isBrokenPacket(packet)) || processAdminsOrDomains(packet)) { return; } processPacket(packet, conn); }
protected XMPPResourceConnection getXMPPResourceConnection(Packet p) { XMPPResourceConnection conn = null; //首先先根据发用户发送的packet信息来找连接session,这样的设计正如前面理论中所指出的,发用户是否有权利发packet,发用户A的packet到了SM这时处理为一个阶段,这个packet在这一阶段处理完成就会被消毁了,经过processor处理会生成该packet的副本继续往后面流转 JID from = p.getPacketFrom(); if (from != null) { conn = connectionsByFrom.get(from); if (conn != null) { return conn; } } // It might be a message _to_ some user on this server // so let's look for established session for this user... //如果packet没有含有发用户接连信息则查询接收目标用户连接session,这为下一阶段-这是经过某一个processor处理后转发出来的packet,也就是说packet由系统转发到目的地阶段,判断目的地 接收用户B 是否在线是否有连接在在,如果在线则packet再次经过processor就会转发到用户B了,如果不在线,则最终会被保存入库。 JID to = p.getStanzaTo(); if (to != null) { conn = getResourceConnection(to); } else { } // end of else return conn; } public XMPPResourceConnection getResourceConnection(JID jid) { XMPPSession session = getSession(jid.getBareJID()); if (session != null) { XMPPResourceConnection res = session.getResourceConnection(jid); } return res; } // end of if (session != null) // Maybe this is a call for the server session? if (isLocalDomain(jid.toString(), false)) { return smResourceConnection; } return null; }
protected void processPacket(Packet packet, XMPPResourceConnection conn) { long startTime = System.currentTimeMillis(); int idx = tIdx; tIdx = (tIdx + 1) % maxIdx; packet.setPacketTo(getComponentId()); Queue<Packet> results = new ArrayDeque<Packet>(2); boolean stop = false; if (!stop) { if (defPacketHandler.preprocess(packet, conn, naUserRepository, results)) { packet.processedBy("filter-foward"); addOutPackets(packet, conn, results); return; } } if (!stop) { for (XMPPPreprocessorIfc preproc : preProcessors.values()) { stop |= preproc.preProcess(packet, conn, naUserRepository, results, plugin_config .get(preproc.id())); if (stop && log.isLoggable(Level.FINEST)) { break; } } // end of for (XMPPPreprocessorIfc preproc: preProcessors) } // prepTm = System.currentTimeMillis() - startTime; if (!stop) { if (defPacketHandler.forward(packet, conn, naUserRepository, results)) { packet.processedBy("filter-foward"); addOutPackets(packet, conn, results); return; } } // defForwTm = System.currentTimeMillis() - startTime; if (!stop) { walk(packet, conn); try { if ((conn != null) && conn.getConnectionId().equals(packet.getPacketFrom())) { handleLocalPacket(packet, conn); } } catch (NoConnectionIdException ex) { } } if (!stop) { for (XMPPPostprocessorIfc postproc : postProcessors.values()) { String plug_id = postproc.id(); long[] postProcTime = null; synchronized (postTimes) { postProcTime = postTimes.get(plug_id); if (postProcTime == null) { postProcTime = new long[maxIdx]; postTimes.put(plug_id, postProcTime); } } long stTime = System.currentTimeMillis(); postproc.postProcess(packet, conn, naUserRepository, results, plugin_config.get(postproc.id())); postProcTime[idx] = System.currentTimeMillis() - stTime; } // end of for (XMPPPostprocessorIfc postproc: postProcessors) } // end of if (!stop) // postTm = System.currentTimeMillis() - startTime; if (!stop &&!packet.wasProcessed() && ((packet.getStanzaTo() == null) || (!isLocalDomain(packet.getStanzaTo().toString())))) { if (defPacketHandler.canHandle(packet, conn)) { ProcessingThreads<ProcessorWorkerThread> pt = workerThreads.get( defHandlerProc.id()); if (pt == null) { pt = workerThreads.get(defPluginsThreadsPool); } pt.addItem(defHandlerProc, packet, conn); packet.processedBy(defHandlerProc.id()); } } setPermissions(conn, results); addOutPackets(packet, conn, results); if (packet.wasProcessed() || processAdminsOrDomains(packet)) { Packet error = null; if (stop || ((conn == null) && (packet.getStanzaFrom() != null) && (packet .getStanzaTo() != null) &&!packet.getStanzaTo().equals(getComponentId()) && ((packet.getElemName() == Iq.ELEM_NAME) || (packet.getElemName() == Message .ELEM_NAME)))) { try { error = Authorization.SERVICE_UNAVAILABLE.getResponseMessage(packet, "Service not available.", true); } catch (PacketErrorTypeException e) { } // end of else }
private void walk(final Packet packet, final XMPPResourceConnection connection) { for (XMPPProcessorIfc proc_t : processors.values()) { XMPPProcessorIfc processor = proc_t; Authorization result = processor.canHandle(packet, connection); if (result == Authorization.AUTHORIZED) { ProcessingThreads<ProcessorWorkerThread> pt = workerThreads.get(processor.id()); //获取processor的处理线程,如果预先没有放入,则使用默认的处理线程集处理 if (pt == null) { pt = workerThreads.get(defPluginsThreadsPool); } //加入待处理队列中,则用单独线程去执行process()这样可以提高效率, //而不是用sessionmanager线程执行 if (pt.addItem(processor, packet, connection)) { packet.processedBy(processor.id()); } else { if (result != null) { // TODO: A plugin returned an error, the packet should be bounced back // with an appropriate error } } // end of for () }
相关推荐
tigase相关jar包
在描述中提到“官方下载真实有效”,这意味着你可以从Tigase项目的官方网站获取到这个Web Chat的源码或者预编译的版本,确保软件的安全性和可靠性。官方提供的软件通常会经过严格的测试和更新,能够避免第三方源可能...
高度优化,高度模块化且非常灵活的XMPP / Jabber服务器 这是什么 Tigase XMPP服务器是... -SOCKS5字节流: -用于Tigase的组件 -该组件基于JDK内置HTTP服务器,提供易于使用的HTTP端点进行服务器管理和集成。 -高性
2. **Tigase API**:学习Tigase提供的API,如SessionManager、PacketFilter等。 3. **插件开发**:编写Java代码实现特定功能,如自定义业务逻辑、新的数据存储策略等。 4. **测试与调试**:使用Tigase的内置日志系统...
5. **安全机制**:Tigase支持SSL/TLS加密,确保通信的安全性。同时,它还提供了身份验证和访问控制功能,防止未经授权的访问。 6. **数据库集成**:Tigase可以与多种数据库系统集成,如MySQL、PostgreSQL等,用于...
该资源是整合了tigase的java服务端源代码,环境为:idea + gradle + postgresql 注意,这部分项目只包括java源代码,而数据库备份将在下一个资源打包上传,有疑问请阅读相关博文: ...
源码部署对于想要深入了解Tigase内部工作原理的开发者来说是一个很好的选择,因为它允许开发者查看、修改甚至重新构建源代码。 首先,需要更改项目的pom.xml文件,添加对gmaven的运行时环境的支持,这是因为在...
### XMPP_tigase_IM服务部署安装 #### Tigase开源项目简介 Tigase是一个开源项目,使用Java语言编写,遵循Jabber(即XMPP)协议标准,为用户提供了一个高性能且可扩展的消息传递和即时通讯(IM)平台。Tigase的...
2. Git:用于从官方仓库克隆Tigase源代码,虽然我们这里提到的是快速部署,但了解如何获取最新源码总是有好处的。 接下来,我们将详细介绍快速配置和部署Tigase的步骤: 1. **下载Tigase**:您可以从Tigase官方...
tigase开发指南.pdf tigase开发指南.pdf是一份关于tigase服务器的开发指南,旨在帮助开发者更好地理解tigase服务器的组件、插件、配置和数据库存储。 tigase组件: tigase服务器在启动时会初始化并启动相关组件和...
2. **socks5**:SOCKS5是一种网络协议,Tigase实现了该协议以提供代理服务。这使得客户端可以通过代理服务器连接到XMPP网络,增强了网络连接的灵活性和安全性。 3. **stun**:STUN(Simple Traversal of UDP ...
【Tigase局部部署详解】 Tigase是一款开源的、基于XMPP协议的即时通讯服务器。"tigase-local"这个主题显然与在本地环境中安装和配置Tigase服务器有关。下面将详细介绍Tigase服务器的基本概念、安装过程、配置步骤...
Tigase 概述 Tigase 是一个功能强大且灵活的 XMPP 服务器,提供了许多出色的特性和功能,以下是其主要特点和实现的 XMPP 扩展协议: 为什么选择 Tigase Tigase 完全实现了 XMPP 协议,除了全面实施的两个核心协议...
Tigase Swift XMPP客户端库这是什么Tigase Swift XMPP客户端库是用编程语言编写的客户端库。 它提供了XMPP标准核心的实现并处理XML。 此外,它还提供了对许多流行扩展(XEP)的支持。 该存储库包含该库的源文件。...
5. **初始化数据库**:运行Tigase服务器的初始化脚本,将数据库结构导入到你创建的数据库中。 6. **启动Tigase**:使用命令行启动Tigase服务器。在解压后的目录中,执行类似 `bin/tigase.sh start` 的命令。 7. **...
Tigase Monitor配置详解 Tigase Monitor是一款用于监控Tigase XMPP服务器性能和状态的工具,特别适用于Tigase 7.0版本。本文将详细介绍如何在Tigase 7.0上配置Monitor模块,包括客户端和服务器部分。 首先,确保你...
QuickBlox-Tigase-CustomFeatures 对于QuickBlox自定义功能的列表 tigase服务器: QBAuth-AuthRepository的自定义实现 CustomObjects插件-将聊天消息保存到QuickBlox CustomObjects模块 LastRequestAtPlugin-在...
5. **安全层**:为了保障通信安全,Tigase Server支持SSL/TLS加密,防止数据在传输过程中被窃取。同时,它也提供了防止DDoS攻击和滥用的机制。 6. **监控和日志系统**:源代码中包含了完善的监控和日志模块,便于...
《Tigase服务器7.0.2:一个强大的XMPP服务器详解》 Tigase服务器,作为一款开源的、跨平台的XMPP(Extensible Messaging and Presence Protocol)服务器,为全球范围内的即时通讯和在线状态服务提供了高效且可扩展...
《Tigase服务器的配置详解》 Tigase服务器是一款基于Java开发的XMPP(Extensible Messaging and Presence Protocol)服务器,广泛应用于即时通讯、在线状态服务以及多用户聊天室等场景。本文将深入探讨Tigase服务器...