SocketThread 专用于处理客户端SOCKET的读写事件的线程,当服务器端SOCKET接受到客户socket,就会生成一个与对应的IOService,IOService.socketIO指向SocketIO对象,
SocketIO是对java api中SocketChannel的封装,所以拿到IOService也就等于拿到客户端SocketChannel了。SocketThread 是一个私有类,他在第一次加载的时候,就会创建了3类线程,
socketReadThread():负责读socket的数据;
socketWriteThread():负责写入socket数据;
ResultsListener:负责监视CompletionService执行结果IOService完成情况,判断IOService中的socket连接是否关闭,如没有则继续注册入SocketThread 的Selector中进行事件侦听;
SocketThread :: private static SocketThread[] socketReadThread = null; private static SocketThread[] socketWriteThread = null; private static ThreadPoolExecutor executor = null; private static CompletionService<IOService<?>> completionService = null; //下面是实例属性 private Selector clientsSel = null; private boolean reading = false; private boolean writing = false; static { if (socketReadThread == null) { int nThreads = (cpus * DEF_MAX_THREADS_PER_CPU) / 2 + 1; executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); completionService = new ExecutorCompletionService<IOService<?>>(executor); //执行任务的线程池 socketReadThread = new SocketThread[nThreads]; //一组负责读socket的数据; socketWriteThread = new SocketThread[nThreads]; //一组负责写socket的数据; for (int i = 0; i < socketReadThread.length; i++) { socketReadThread[i] = new SocketThread("socketReadThread-" + i); socketReadThread[i].reading = true; Thread thrd = new Thread(socketReadThread[i]); thrd.setName("socketReadThread-" + i); thrd.start();//启动,会执行run() } log.log(Level.WARNING, "{0} socketReadThreads started.", socketReadThread.length); for (int i = 0; i < socketWriteThread.length; i++) { socketWriteThread[i] = new SocketThread("socketWriteThread-" + i); socketWriteThread[i].writing = true; Thread thrd = new Thread(socketWriteThread[i]); thrd.setName("socketWriteThread-" + i); thrd.start();////启动,会执行run() } log.log(Level.WARNING, "{0} socketWriteThreads started.", socketWriteThread.length); } // end of if (acceptThread == null) } //生成每一个SocketThread都会有一个对应ResultsListener线程 private SocketThread(String name) { try { clientsSel = Selector.open(); } catch (Exception e) { log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e); stopping = true; } // end of try-catch new ResultsListener("ResultsListener-" + name).start(); } public void SocketThread.run() { while ( !stopping) { try { clientsSel.select(); if (log.isLoggable(Level.FINEST)) { log.log(Level.FINEST, "Selector AWAKE: {0}", clientsSel); } //等到已选择的key,证明有数据要处理 Set<SelectionKey> selected = clientsSel.selectedKeys(); int selectedKeys = selected.size(); if ((selectedKeys == 0) && (waiting.size() == 0)) { if (log.isLoggable(Level.FINEST)) { log.finest("Selected keys = 0!!! a bug again?"); } if ((++empty_selections) > MAX_EMPTY_SELECTIONS) { recreateSelector(); } } else { empty_selections = 0; if (selectedKeys > 0) { for (SelectionKey sk : selected) { //得到ConnectionListenerImpl.accept()中绑定的ioservice IOService s = (IOService) sk.attachment(); try { ..... //下一次socket从selector监听队列中移除 sk.cancel(); forCompletion.add(s); } catch (CancelledKeyException e) { ... } } } // Clean-up cancelled keys... clientsSel.selectNow(); } //注册新的socket到selector中进行监听 addAllWaiting(); IOService serv = null; while ((serv = forCompletion.pollFirst()) != null) { //放线程沲中执行,调用了IOService.call()进行数据处理 completionService.submit(serv); } // clientsSel.selectNow(); } catch (CancelledKeyException brokene) { 。。 } catch (IOException ioe) { 。。 } catch (Exception exe) { .. } } } //ResultsListener.run() public void ResultsListener.run() { for (;;) { try { //CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法得到的对象其实就是IOService。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。 //其实这里的设计非常巧妙,当读到要处理事件进来后,把selector中对应的socket移出,当完成socket数据处理后只要连接还开启,再次加入selector中进行监听,所以客户端可以发送一个空字符串来进行心跳处理,维持客户端和服务器进行长连接。 IOService<?> service = completionService.take().get(); if (service != null) { if (service.isConnected()) {//只要连接没关闭 addSocketService(service);//就再次注册到线程的Selector中 ............ } } }
相关推荐
tigase相关jar包
在描述中提到“官方下载真实有效”,这意味着你可以从Tigase项目的官方网站获取到这个Web Chat的源码或者预编译的版本,确保软件的安全性和可靠性。官方提供的软件通常会经过严格的测试和更新,能够避免第三方源可能...
Tigase XMPP服务器是高度优化,高度模块化且非常灵活的用Java编写的XMPP / Jabber服务器。 该存储库包含Tigase XMPP服务器主要部分的源代码。 该项目自2004年成立以来,我们最近已将其移至GitHub。 与XMPP相关的...
3. **模块化设计**:Tigase服务器采用模块化架构,每个功能模块可以独立开发和测试,增强了系统的可维护性和可扩展性。开发者可以根据需要选择或编写不同的模块。 4. **多线程处理**:由于实时通信的特性,Tigase...
源码部署对于想要深入了解Tigase内部工作原理的开发者来说是一个很好的选择,因为它允许开发者查看、修改甚至重新构建源代码。 首先,需要更改项目的pom.xml文件,添加对gmaven的运行时环境的支持,这是因为在...
该资源是整合了tigase的java服务端源代码,环境为:idea + gradle + postgresql 注意,这部分项目只包括java源代码,而数据库备份将在下一个资源打包上传,有疑问请阅读相关博文: ...
tigase 7.10 mongodb 3 配置
3. **Tigase Server安装**:Tigase提供了多种安装方式,包括Windows下的EXE安装包、JAR文件以及Linux下的压缩包。用户可以根据自己的操作系统选择合适的安装方式。 - **EXE安装**:适用于Windows环境,安装过程...
3. **设置环境变量**:为了让系统知道Tigase的安装位置,需要在环境变量中添加Tigase的bin目录。编辑`~/.bashrc`或`~/.bash_profile`(根据您的shell类型),添加如下行: ``` export PATH=$PATH:/path/to/tigase/...
**基于Tigase的独立IM系统** Tigase是一款开源的、基于Java的即时通讯(Instant Messaging, IM)服务器,可以用于构建独立的、高度可扩展的IM网络。该系统支持XMPP(Extensible Messaging and Presence Protocol)...
【Tigase局部部署详解】 Tigase是一款开源的、基于XMPP协议的即时通讯服务器。"tigase-local"这个主题显然与在本地环境中安装和配置Tigase服务器有关。下面将详细介绍Tigase服务器的基本概念、安装过程、配置步骤...
Tigase 概述 Tigase 是一个功能强大且灵活的 XMPP 服务器,提供了许多出色的特性和功能,以下是其主要特点和实现的 XMPP 扩展协议: 为什么选择 Tigase Tigase 完全实现了 XMPP 协议,除了全面实施的两个核心协议...
Tigase是一款开源的XMPP服务器,用于实现即时通讯和在线状态服务。它采用Java语言编写,具有跨平台性,并且支持多种扩展协议,如多用户聊天(MUC)、消息存档(Message Archiving)和发布订阅(PubSub)等。在你提供...
Tigase Swift XMPP客户端库这是什么Tigase Swift XMPP客户端库是用编程语言编写的客户端库。 它提供了XMPP标准核心的实现并处理XML。 此外,它还提供了对许多流行扩展(XEP)的支持。 该存储库包含该库的源文件。...
3. **配置Tigase**:进入解压后的目录,找到并编辑配置文件,如 `tigase.conf`。这里你需要设置服务器的主机名、端口号、管理员账号、数据库连接信息等。配置文件可能包含XML格式的配置项,需要根据实际情况进行修改...
3. 配置Tigase服务器的JMX访问权限。在`/etc/jmx.password`文件中,创建或更新与`monitor.properties`中配置的JMX账户相同的用户和密码。 完成以上步骤后,Tigase Monitor应能成功连接到Tigase服务器并开始监控。你...
通过对Tigase启动流程、消息路由器以及客户端连接管理的深入分析,我们不仅了解了Tigase的核心组件是如何工作的,还明白了它们之间如何协作以提供稳定可靠的XMPP服务。这种深入的技术解析有助于开发者更好地理解...
《Tigase Server 7.0.1 源代码详解》 Tigase Server,作为一款基于Java语言开发的轻量级、可伸缩的Jabber/XMPP服务器,自诞生以来就以其高效能和高并发能力受到业界的广泛关注。在7.0.1版本中,Tigase Server继续...
Tigase是一个用Java编写的开源XMPP服务器,它具有高性能、高度可伸缩和模块化的特点。在构建需要处理大量并发用户和高消息吞吐量的系统时,部署一个Tigase集群变得非常必要。集群模式允许将多个Tigase服务器实例组合...
QuickBlox-Tigase-CustomFeatures 对于QuickBlox自定义功能的列表 tigase服务器: QBAuth-AuthRepository的自定义实现 CustomObjects插件-将聊天消息保存到QuickBlox CustomObjects模块 LastRequestAtPlugin-在...