`
jianfulove
  • 浏览: 120475 次
  • 性别: Icon_minigender_1
  • 来自: 湛江
社区版块
存档分类
最新评论

tigase源码分析3:SocketThread

阅读更多

 

SocketThread 专用于处理客户端SOCKET的读写事件的线程,当服务器端SOCKET接受到客户socket,就会生成一个与对应的IOService,IOService.socketIO指向SocketIO对象,

SocketIO是对java api中SocketChannel的封装,所以拿到IOService也就等于拿到客户端SocketChannel了。SocketThread 是一个私有类,他在第一次加载的时候,就会创建了3类线程,

socketReadThread():负责读socket的数据;

socketWriteThread():负责写入socket数据;

ResultsListener:负责监视CompletionService执行结果IOService完成情况,判断IOService中的socket连接是否关闭,如没有则继续注册入SocketThread 的Selector中进行事件侦听

 

SocketThread ::
private static SocketThread[] socketReadThread = null;
private static SocketThread[] socketWriteThread = null;
private static ThreadPoolExecutor executor = null;
private static CompletionService<IOService<?>> completionService = null;
//下面是实例属性
private Selector clientsSel = null;
private boolean reading = false;
private boolean writing = false;

static {
		if (socketReadThread == null) {
			int nThreads = (cpus * DEF_MAX_THREADS_PER_CPU) / 2 + 1;

			executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
					new LinkedBlockingQueue<Runnable>());
			completionService = new ExecutorCompletionService<IOService<?>>(executor);
                        //执行任务的线程池
			socketReadThread = new SocketThread[nThreads]; //一组负责读socket的数据; 
                         socketWriteThread = new SocketThread[nThreads]; //一组负责写socket的数据;
			for (int i = 0; i < socketReadThread.length; i++) {
				socketReadThread[i] = new SocketThread("socketReadThread-" + i);
				socketReadThread[i].reading = true;

				Thread thrd = new Thread(socketReadThread[i]);

				thrd.setName("socketReadThread-" + i);
				thrd.start();//启动,会执行run()
			}

			log.log(Level.WARNING, "{0} socketReadThreads started.", socketReadThread.length);

			for (int i = 0; i < socketWriteThread.length; i++) {
				socketWriteThread[i] = new SocketThread("socketWriteThread-" + i);
				socketWriteThread[i].writing = true;

				Thread thrd = new Thread(socketWriteThread[i]);

				thrd.setName("socketWriteThread-" + i);
				thrd.start();////启动,会执行run()
			}

			log.log(Level.WARNING, "{0} socketWriteThreads started.", socketWriteThread.length);
		}    // end of if (acceptThread == null)
	}


       //生成每一个SocketThread都会有一个对应ResultsListener线程
	private SocketThread(String name) {
		try {
			clientsSel = Selector.open();
		} catch (Exception e) {
			log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e);
			stopping = true;
		}    // end of try-catch

		new ResultsListener("ResultsListener-" + name).start();
	}



  public void SocketThread.run() {
		while ( !stopping) {
			try {
				clientsSel.select();

				if (log.isLoggable(Level.FINEST)) {
					log.log(Level.FINEST, "Selector AWAKE: {0}", clientsSel);
				}
                              //等到已选择的key,证明有数据要处理
				Set<SelectionKey> selected = clientsSel.selectedKeys();
				int selectedKeys = selected.size();

				if ((selectedKeys == 0) && (waiting.size() == 0)) {
					if (log.isLoggable(Level.FINEST)) {
						log.finest("Selected keys = 0!!! a bug again?");
					}

					if ((++empty_selections) > MAX_EMPTY_SELECTIONS) {
						recreateSelector();
					}
				} else {
					empty_selections = 0;

					if (selectedKeys > 0) {

						for (SelectionKey sk : selected) {
                                                //得到ConnectionListenerImpl.accept()中绑定的ioservice
							IOService s = (IOService) sk.attachment();

							try {
							
								.....
                                                       //下一次socket从selector监听队列中移除
								sk.cancel();
                                                           
								forCompletion.add(s);

								
							} catch (CancelledKeyException e) {
							...
							}
						}
					}

					// Clean-up cancelled keys...
					clientsSel.selectNow();
				}
                              //注册新的socket到selector中进行监听     
				addAllWaiting();

				IOService serv = null;

				while ((serv = forCompletion.pollFirst()) != null) {
                                       //放线程沲中执行,调用了IOService.call()进行数据处理
					completionService.submit(serv);
				}

				// clientsSel.selectNow();
			} catch (CancelledKeyException brokene) {

				。。
			} catch (IOException ioe) {
                           。。
			} catch (Exception exe) {
			..
			}
		}
	}



//ResultsListener.run()
    public void ResultsListener.run() {
    for (;;) {
	try {
//CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法得到的对象其实就是IOService。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。
      //其实这里的设计非常巧妙,当读到要处理事件进来后,把selector中对应的socket移出,当完成socket数据处理后只要连接还开启,再次加入selector中进行监听,所以客户端可以发送一个空字符串来进行心跳处理,维持客户端和服务器进行长连接。

	IOService<?> service = completionService.take().get();
		if (service != null) {
		if (service.isConnected()) {//只要连接没关闭
			addSocketService(service);//就再次注册到线程的Selector中
         ............
                  }
           }
        }

 

 

 

 

分享到:
评论

相关推荐

    tigase-utils-3.5.1.jar

    tigase相关jar包

    tigase-web-chat

    在描述中提到“官方下载真实有效”,这意味着你可以从Tigase项目的官方网站获取到这个Web Chat的源码或者预编译的版本,确保软件的安全性和可靠性。官方提供的软件通常会经过严格的测试和更新,能够避免第三方源可能...

    tigase-server:高度优化,高度模块化和非常灵活的XMPPJabber服务器

    Tigase XMPP服务器是高度优化,高度模块化且非常灵活的用Java编写的XMPP / Jabber服务器。 该存储库包含Tigase XMPP服务器主要部分的源代码。 该项目自2004年成立以来,我们最近已将其移至GitHub。 与XMPP相关的...

    tigase-server-tigase-server-8.0.0.zip 源码

    3. **模块化设计**:Tigase服务器采用模块化架构,每个功能模块可以独立开发和测试,增强了系统的可维护性和可扩展性。开发者可以根据需要选择或编写不同的模块。 4. **多线程处理**:由于实时通信的特性,Tigase...

    tigase http-api 源码部署

    源码部署对于想要深入了解Tigase内部工作原理的开发者来说是一个很好的选择,因为它允许开发者查看、修改甚至重新构建源代码。 首先,需要更改项目的pom.xml文件,添加对gmaven的运行时环境的支持,这是因为在...

    xmpp之java服务端实现tigase整合项目源代码

    该资源是整合了tigase的java服务端源代码,环境为:idea + gradle + postgresql 注意,这部分项目只包括java源代码,而数据库备份将在下一个资源打包上传,有疑问请阅读相关博文: ...

    tigase 7.10 mongodb 3 配置

    tigase 7.10 mongodb 3 配置

    XMPP_tigase_IM服务部署安装

    3. **Tigase Server安装**:Tigase提供了多种安装方式,包括Windows下的EXE安装包、JAR文件以及Linux下的压缩包。用户可以根据自己的操作系统选择合适的安装方式。 - **EXE安装**:适用于Windows环境,安装过程...

    tigase快速配置

    3. **设置环境变量**:为了让系统知道Tigase的安装位置,需要在环境变量中添加Tigase的bin目录。编辑`~/.bashrc`或`~/.bash_profile`(根据您的shell类型),添加如下行: ``` export PATH=$PATH:/path/to/tigase/...

    基于tigase的独立IM系统.zip

    **基于Tigase的独立IM系统** Tigase是一款开源的、基于Java的即时通讯(Instant Messaging, IM)服务器,可以用于构建独立的、高度可扩展的IM网络。该系统支持XMPP(Extensible Messaging and Presence Protocol)...

    tigase-local

    【Tigase局部部署详解】 Tigase是一款开源的、基于XMPP协议的即时通讯服务器。"tigase-local"这个主题显然与在本地环境中安装和配置Tigase服务器有关。下面将详细介绍Tigase服务器的基本概念、安装过程、配置步骤...

    tigase组件

    Tigase是一款开源的XMPP服务器,用于实现即时通讯和在线状态服务。它采用Java语言编写,具有跨平台性,并且支持多种扩展协议,如多用户聊天(MUC)、消息存档(Message Archiving)和发布订阅(PubSub)等。在你提供...

    tigase-swift:Tigase Swift XMPP客户端库

    Tigase Swift XMPP客户端库这是什么Tigase Swift XMPP客户端库是用编程语言编写的客户端库。 它提供了XMPP标准核心的实现并处理XML。 此外,它还提供了对许多流行扩展(XEP)的支持。 该存储库包含该库的源文件。...

    tigase-server-8.0.0-b10083-dist-max.zip

    3. **配置Tigase**:进入解压后的目录,找到并编辑配置文件,如 `tigase.conf`。这里你需要设置服务器的主机名、端口号、管理员账号、数据库连接信息等。配置文件可能包含XML格式的配置项,需要根据实际情况进行修改...

    tigase monitor配置

    3. 配置Tigase服务器的JMX访问权限。在`/etc/jmx.password`文件中,创建或更新与`monitor.properties`中配置的JMX账户相同的用户和密码。 完成以上步骤后,Tigase Monitor应能成功连接到Tigase服务器并开始监控。你...

    tigase 内部处理流程

    通过对Tigase启动流程、消息路由器以及客户端连接管理的深入分析,我们不仅了解了Tigase的核心组件是如何工作的,还明白了它们之间如何协作以提供稳定可靠的XMPP服务。这种深入的技术解析有助于开发者更好地理解...

    Tigase Server 7.0.1 源代码

    《Tigase Server 7.0.1 源代码详解》 Tigase Server,作为一款基于Java语言开发的轻量级、可伸缩的Jabber/XMPP服务器,自诞生以来就以其高效能和高并发能力受到业界的广泛关注。在7.0.1版本中,Tigase Server继续...

    QuickBlox-Tigase-CustomFeatures:www.tigase.org的QuickBlox自定义功能列表

    QuickBlox-Tigase-CustomFeatures 对于QuickBlox自定义功能的列表 tigase服务器: QBAuth-AuthRepository的自定义实现 CustomObjects插件-将聊天消息保存到QuickBlox CustomObjects模块 LastRequestAtPlugin-在...

    tigase-server.7.0.2

    《Tigase服务器7.0.2:一个强大的XMPP服务器详解》 Tigase服务器,作为一款开源的、跨平台的XMPP(Extensible Messaging and Presence Protocol)服务器,为全球范围内的即时通讯和在线状态服务提供了高效且可扩展...

    Tigase 概述

    Tigase 概述 Tigase 是一个功能强大且灵活的 XMPP 服务器,提供了许多出色的特性和功能,以下是其主要特点和实现的 XMPP 扩展协议: 为什么选择 Tigase Tigase 完全实现了 XMPP 协议,除了全面实施的两个核心协议...

Global site tag (gtag.js) - Google Analytics