`
jianfulove
  • 浏览: 120041 次
  • 性别: Icon_minigender_1
  • 来自: 湛江
社区版块
存档分类
最新评论

tigase源码分析2:ConnectionOpenThread 处理服务端socket的线程

阅读更多

 

 

 

  一、    ConnectionOpenThread 使用单例模式,他是负责建立服务端SOCKET和 接收连接客户端socket 线程。

在初始化ConnectionManager的时候ConnectionManager.connectThread 属性所引用的服务端SOCKET连接线程ConnectionOpenThread 就被初始化了

ConnectionManager::
private static ConnectionOpenThread connectThread = ConnectionOpenThread.getInstance();

 

 

ConnectionOpenThread .getInstance()的实现  

private Selector  selector= null;

public static ConnectionOpenThread getInstance() {

		if (acceptThread == null) {
			acceptThread = new ConnectionOpenThread();

			Thread thrd = new Thread(acceptThread);

			thrd.setName("ConnectionOpenThread");
			thrd.start(); //启动ConnectionOpenThread线程,则this.run()方法将被被执行
			if (log.isLoggable(Level.FINER)) {
				log.finer("ConnectionOpenThread started.");
			}
		}    // end of if (acceptThread == null)

		return acceptThread;
	}


private ConnectionOpenThread() {
   .......
		try {
			selector = Selector.open();//得到一个选择器,可以去了解下nio api
		} catch (Exception e) {
			log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e);
			stopping = true;
		}    // end of try-catch
	}

 

 

 ConnectionOpenThread .run()的实现  

在该方法中,selector管理的都是服务端SOCKET

public void run() {
		while (!stopping) {
			try {
				selector.select();
                  //此方法为阻塞方法,当选择器管理channel(也就是向selector注册的channel)                    中发生读、写或异常事件时,select()将会被触发会往下执行

				// Set<SelectionKey> selected_keys = selector.selectedKeys();
				// for (SelectionKey sk : selected_keys) {
                               //返回已此通道已准备就绪的键集,已选择始终是键集的一个子集。
                //begin iterator
		for (Iterator i = selector.selectedKeys().iterator(); i.hasNext(); ) {
					SelectionKey sk = (SelectionKey) i.next();

					i.remove();

					SocketChannel sc        = null;
					boolean       throttled = false;
					int           port_no   = 0;

				if ((sk.readyOps() & SelectionKey.OP_ACCEPT) != 0) {                     //在此是否为被动SOCKET也就是服务端SOCKET,是则接受客户端socket
			ServerSocketChannel nextReady = (ServerSocketChannel) sk.channel();
					port_no = nextReady.socket().getLocalPort();
					sc = nextReady.accept();//得到一个客户端SOCKET
					...
				}    // end of if (sk.readyOps() & SelectionKey.OP_ACCEPT)
			 if ((sk.readyOps() & SelectionKey.OP_CONNECT) != 0) {
				sk.cancel();  // 从Selector中删除指定的SelectionKey  
                                //所以这个普通的conect socket只会处理一次侦听到的发生事件
				sc = (SocketChannel) sk.channel();//得到connect SOCKET

				}    // end of if (sk.readyOps() & SelectionKey.OP_ACCEPT)
					if (sc != null) { //设置接收到的SOCKET的一些信息
						try {
				sc.configureBlocking(false);//将客户端通道设置为非阻塞
							sc.socket().setSoLinger(false, 0);
							sc.socket().setReuseAddress(true);
		
                                              //每个ServerSocketChannel在创建注册到selector                                              时就被绑定了一个ConnectionOpenListener对象,                                               用这个对象来处理该接受到的socket,该注册过程                                               在addAllWaiting()中进行
		    ConnectionOpenListener al = (ConnectionOpenListener) sk.attachment();

			sc.socket().setTrafficClass(al.getTrafficClass());
		        sc.socket().setReceiveBufferSize(al.getReceiveBufferSize());
		       al.accept(sc);//此方法 为建立连接socket的进行后续处理的设定
			} catch (java.net.SocketException e) {

		ConnectionOpenListener al = (ConnectionOpenListener) sk.attachment();
                  	al.accept(sc);
						}
					} else {
						log.log(Level.INFO,
								"Can not obtain socket channel from selection key, throttling activated = {0}, for port: {1}",
								new Object[] { throttled, port_no });
					}    // end of if (sc != null) else
					++accept_counter;
				}
              //end of iterator
	 addAllWaiting();//加载要注册到selector中的ServerSocketChannel或connect socket
			} catch (IOException e) {
				log.log(Level.SEVERE, "Server I/O error.", e);

				// stopping = true;
			}        // end of catch
					catch (Exception e) {
				log.log(Level.SEVERE, "Other service exception.", e);

				// stopping = true;
			}        // end of catch
		}
	}

 

 

 在说解到addAllWaiting();加载要注册到selector中的ServerSocketChannel时,先看下源码:

在waiting队列中如果有等待处理的ConnectionOpenListener对象,则创建一个对应的ServerSocketChannel

private void addAllWaiting() throws IOException {
		ConnectionOpenListener al = null;

		while ((al = waiting.poll()) != null) {
			try {
				addPort(al);//绑定相关的端口进行监听
			} catch (Exception e) {
				log.log(Level.WARNING, "Error: creating connection for: " + al, e);
				al.accept(null);
			}    // end of try-catch
		}      // end of for ()
	}




private void addPort(ConnectionOpenListener al) throws IOException {
	if ((al.getConnectionType() == ConnectionType.connect) && (al.getRemoteAddress() !=
				null)) {
     addISA(al.getRemoteAddress(), al);
	} else if ((al.getIfcs() == null) || (al.getIfcs().length == 0) || al.getIfcs()[0]
				.equals("ifc") || al.getIfcs()[0].equals("*")) {
   addISA(new InetSocketAddress(al.getPort()), al);//绑定到InetSocketAddress进行监听服务
	} else {
			for (String ifc : al.getIfcs()) {
				addISA(new InetSocketAddress(ifc, al.getPort()), al);
			}    // end of for ()
	}      // end of if (ip == null || ip.equals("")) else
	}


  //addISA(..)这才是真正创建ServerSocketChannel方法,绑定到服务器某一个端口上进行监听服务,
  //开启了服务端socket
  private void addISA(InetSocketAddress isa, ConnectionOpenListener al)throws IOException {
		switch (al.getConnectionType()) {
		case accept :
			...
	ServerSocketChannel ssc = ServerSocketChannel.open();
	ssc.socket().setReceiveBufferSize(al.getReceiveBufferSize());
        ssc.configureBlocking(false);//服务端socket也是非阻塞方法
	ssc.socket().bind(isa, (int) (port_throttling)); //绑定到相关地址的某一个端口上
	ssc.register(selector, SelectionKey.OP_ACCEPT, al);//注册服务端socket到selector中,        并且附带绑定一个ConnectionOpenListener对象,该对象为服务端socket接收到新来的socket         进行后续处理。所以selector能监听这些已注册socket的事件发生

			break;

		case connect :
		...
                      //服务器socket之间要进行通讯,则先要连接
			SocketChannel sc = SocketChannel.open();
			sc.socket().setReceiveBufferSize(al.getReceiveBufferSize());
			sc.socket().setTrafficClass(al.getTrafficClass());
			sc.configureBlocking(false);
			sc.connect(isa);
	  sc.register(selector, SelectionKey.OP_CONNECT, al);
          //在此也注册到ConnectionOpenThread.selector中

			break;

		default :
			..
			break;
		}    
	}

 

   二、从以上addAllWaiting();分析中看到处理的都是waiting队列里的ConnectionOpenListener对象,那这个ConnectionOpenListener对象是什么时候就会被放到waiting队列的呢,这得从ConnectionManager.initializationCompleted()中说起,在启动章节中分析到MessageRouter.setProperties(map)负责加载了其它的组件最后对每一个组件都执行了初始化完成动作。从而ConnectionManager.initializationCompleted()将会被执行

 

MessageRouter::
for (ServerComponent comp : components.values()) {
		comp.initializationCompleted();
		}
 

 

 ConnectionManager.initializationCompleted()源码如下

 

public void initializationCompleted() {
		if (isInitializationComplete()) {
			// Do we really need to do this again?
			return;
		}
		super.initializationCompleted();
		initializationCompleted = true;
            //加载组件中的服务配置
	    for (Map<String, Object> params : waitingTasks) {
       //启动一个定时任务,设置准备加入ConnectionOpenThread.waiting的ConnectionListener对象
		 reconnectService(params, connectionDelay);
		}
		waitingTasks.clear();
		if ( null != watchdog ){
			watchdog.start();
		}
	}
 
 2.1 也许看到上面的waitingTask你在想他是什么样的配置信息呢,其实他就是启动服务器监听端口的配置,系统 默认的有如下几种,bosh,c2s,s2s,ws2s组件的服务器配置,waitingTask装的是每一个组件的端口配置信息
c2s/connections/5222/type[S]=accept
c2s/connections/5222/socket[S]=plain
c2s/connections/5222/ifc[s]=*
c2s/connections/5222/remote-host[S]=localhost
c2s/connections/5222/connections/tls/required[B]=false
c2s/connections/5223/type[S]=accept
c2s/connections/5223/socket[S]=ssl
c2s/connections/5223/ifc[s]=*
c2s/connections/5223/remote-host[S]=localhost
c2s/connections/5223/connections/tls/required[B]=false
c2s/connections/ports[i]=5222, 5223

bosh/connections/5280/type[S]=accept
bosh/connections/5280/socket[S]=plain
bosh/connections/5280/ifc[s]=*
bosh/connections/5280/remote-host[S]=localhost
bosh/connections/5280/connections/tls/required[B]=false
bosh/connections/ports[i]=5280

s2s/connections/5269/type[S]=accept
s2s/connections/5269/socket[S]=plain
s2s/connections/5269/ifc[s]=*
s2s/connections/5269/remote-host[S]=localhost
s2s/connections/5269/connections/tls/required[B]=false
s2s/connections/ports[i]=5269

ws2s/connections/5290/type[S]=accept
ws2s/connections/5290/socket[S]=plain
ws2s/connections/5290/ifc[s]=*
ws2s/connections/5290/remote-host[S]=localhost
ws2s/connections/5290/connections/tls/required[B]=false
ws2s/connections/ports[i]=5290
 
 
 
    2.1.1 ConnectionManager.setProperties(.)中对上面的配置信息作了解析,放到map里保存着
ConnectionManager.setProperties(Map<String, Object> props){
...
for (int i = 0; i < ports.length; i++) {
addWaitingTask(port_props);
}
}

//conn信息加入waitingTasks队列
protected void addWaitingTask(Map<String, Object> conn) {
		if (initializationCompleted) {
			reconnectService(conn, connectionDelay);
		} else {
			waitingTasks.add(conn);
		}
	}
 
 

 2.2  然后回来ConnectionManager对象继续分析ConnectionOpenThread.waiting队列是怎么样增加了数据的。

 

 ConnectionManager::

 //启动一个定时任务,设置准备加入ConnectionOpenThread.waiting的ConnectionListener对象
private void reconnectService(final Map<String, Object> port_props, long delay) {
		...
		addTimerTask(new tigase.util.TimerTask() {
			@Override
			public void run() {
		  startService(port_props);
			}
		}, delay);
	}


   //ConnectionOpenThread是单例模式,在此该对象只被初始化一次
    private static ConnectionOpenThread connectThread = ConnectionOpenThread.getInstance();

   private void startService(Map<String, Object> port_props) {
		if (port_props == null) {
			throw new NullPointerException("port_props cannot be null.");
		}
               //根据组件的配置信息生成一个相关的ConnectionListener对象
		ConnectionListenerImpl cli = new ConnectionListenerImpl(port_props);

		if (cli.getConnectionType() == ConnectionType.accept) {
			pending_open.add(cli);
		}
                //将ConnectionListener对象加入ConnectionOpenThread.waiting队列中
		connectThread.addConnectionOpenListener(cli);
	}
 

 

  最后在ConnectionOpenThread中可以看到ConnectionListener是怎么样加入waiting队列的了

 ConnectionOpenThread::
   public void addConnectionOpenListener(ConnectionOpenListener al) {
		waiting.offer(al);
		selector.wakeup();
	}
 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    tigase-utils-3.5.1.jar

    tigase相关jar包

    xmpp之java服务端实现tigase整合项目源代码

    该资源是整合了tigase的java服务端源代码,环境为:idea + gradle + postgresql 注意,这部分项目只包括java源代码,而数据库备份将在下一个资源打包上传,有疑问请阅读相关博文: ...

    tigase-web-chat

    在描述中提到“官方下载真实有效”,这意味着你可以从Tigase项目的官方网站获取到这个Web Chat的源码或者预编译的版本,确保软件的安全性和可靠性。官方提供的软件通常会经过严格的测试和更新,能够避免第三方源可能...

    tigase-server-tigase-server-8.0.0.zip 源码

    4. **多线程处理**:由于实时通信的特性,Tigase服务器需要高效地处理并发连接,因此在设计时考虑了多线程和异步处理,以保证性能。 5. **安全机制**:Tigase支持SSL/TLS加密,确保通信的安全性。同时,它还提供了...

    tigase 内部处理流程

    ### Tigase内部处理流程详解 #### 一、引言 Tigase是一款开源的XMPP服务器,被广泛应用于即时通信领域。本文旨在深入探讨Tigase的内部处理流程,特别是其核心组件及其交互机制,帮助读者更好地理解Tigase的工作原理...

    tigase-server:高度优化,高度模块化和非常灵活的XMPPJabber服务器

    Tigase XMPP服务器是高度优化,高度模块化且非常灵活的用Java编写的XMPP / Jabber服务器。 该存储库包含Tigase XMPP服务器主要部分的源代码。 该项目自2004年成立以来,我们最近已将其移至GitHub。 与XMPP相关的...

    XMPP_tigase_IM服务部署安装

    - **Tigase XML Tools**:用于处理XML数据的工具集,包括解析器等。 - **Tigase Utils**:一个共享库,包含了多个Tigase子项目中常用的功能。 - **JAXMPP**:一个客户端库,用于实现基于Tigase服务器的应用程序。 - ...

    tigase开发指南.pdf

    * S2s:tigase服务端节点之间交互 * Sess-man:会话管理 tigase插件: tigase服务器也支持插件,插件可以扩展服务器的功能。 以下是tigase服务器的插件列表: * jabber:iq:register:注册服务 * message-archive...

    基于tigase的独立IM系统.zip

    2. **Tigase API**:学习Tigase提供的API,如SessionManager、PacketFilter等。 3. **插件开发**:编写Java代码实现特定功能,如自定义业务逻辑、新的数据存储策略等。 4. **测试与调试**:使用Tigase的内置日志系统...

    tigase http-api 源码部署

    源码部署对于想要深入了解Tigase内部工作原理的开发者来说是一个很好的选择,因为它允许开发者查看、修改甚至重新构建源代码。 首先,需要更改项目的pom.xml文件,添加对gmaven的运行时环境的支持,这是因为在...

    tigase快速配置

    2. Git:用于从官方仓库克隆Tigase源代码,虽然我们这里提到的是快速部署,但了解如何获取最新源码总是有好处的。 接下来,我们将详细介绍快速配置和部署Tigase的步骤: 1. **下载Tigase**:您可以从Tigase官方...

    Tigase 概述

    同时,Tigase 的设计也充分考虑到容错能力,自动处理错误,保证应用尽可能的不 down 掉。 RFC 的实现 Tigase 实现了 RFC-3920 和 RFC-3921 两个核心协议,分别定义了 XML 流、基本 XMPP 架构、传输控制协议、安全...

    tigase组件

    2. **socks5**:SOCKS5是一种网络协议,Tigase实现了该协议以提供代理服务。这使得客户端可以通过代理服务器连接到XMPP网络,增强了网络连接的灵活性和安全性。 3. **stun**:STUN(Simple Traversal of UDP ...

    tigase-swift:Tigase Swift XMPP客户端库

    Tigase Swift XMPP客户端库这是什么Tigase Swift XMPP客户端库是用编程语言编写的客户端库。 它提供了XMPP标准核心的实现并处理XML。 此外,它还提供了对许多流行扩展(XEP)的支持。 该存储库包含该库的源文件。...

    tigase-local

    2. **下载**:访问Tigase官方网站下载最新版本的Tigase服务器软件包。 3. **解压**:下载的文件通常为`.tar.gz`或`.zip`格式,解压到你希望安装的目录下。 4. **启动**:在解压后的目录中找到`bin`文件夹,运行...

    Tigase Server 7.0.1 源代码

    《Tigase Server 7.0.1 源代码详解》 Tigase Server,作为一款基于Java语言开发的轻量级、可伸缩的Jabber/XMPP服务器,自诞生以来就以其高效能和高并发能力受到业界的广泛关注。在7.0.1版本中,Tigase Server继续...

    tigase-server 组件

    2. **XML解析**:由于Tigase依赖tigase-xml处理XML,开发者应熟悉基本的XML语法,理解如何将XML数据转换为XMPP协议的消息。 3. **安全与加密**:Tigase支持SSL/TLS加密,确保通信的安全性。开发者需要配置证书,并...

    QuickBlox-Tigase-CustomFeatures:www.tigase.org的QuickBlox自定义功能列表

    QuickBlox-Tigase-CustomFeatures 对于QuickBlox自定义功能的列表 tigase服务器: QBAuth-AuthRepository的自定义实现 CustomObjects插件-将聊天消息保存到QuickBlox CustomObjects模块 LastRequestAtPlugin-在...

    tigase monitor配置

    Tigase Monitor配置详解 Tigase Monitor是一款用于监控Tigase XMPP服务器性能和状态的工具,特别适用于Tigase 7.0版本。本文将详细介绍如何在Tigase 7.0上配置Monitor模块,包括客户端和服务器部分。 首先,确保你...

    tigase-server-8.0.0-b10083-dist-max.zip

    2. **解压文件**:使用`unzip`命令将下载的“tigase-server-8.0.0-b10083-dist-max.zip”解压到一个合适的目录,例如 `/opt/tigase`。 3. **配置Tigase**:进入解压后的目录,找到并编辑配置文件,如 `tigase.conf`...

Global site tag (gtag.js) - Google Analytics