`
jianfulove
  • 浏览: 120478 次
  • 性别: Icon_minigender_1
  • 来自: 湛江
社区版块
存档分类
最新评论

tigase源码分析4:packet处理

 
阅读更多

 

 这节主要讲数据包packet 的流转过程,如图大概说明packet被处理的流程,但实际上packet最终的处理者是插件,这个过程是在packet流转到SM中被分发到对它感兴趣的processor中处理的,下节将会详细说明packet被SM处理情况:

 

 

被处理的包packet,一旦被会话管理器和处理器插件(session manager and processor plugins处理完成,数据包会被摧毁。因此一个处理器将数据包转发到目的地前必须创建一个包的副本,并设置所有属性才返回处理结果。当然处理器可以生成任意数量的数据包。

所以你会看到上图显示2个用户userA 和userB之间的通信的数据包送到最终目的地前被复制两次,messagerouter可以看作为一个包的路由器,他根据packet中的to属性值选择相应的处理组件进行投递到对方的处理队列里,这个包将被下一个组件执行processPacket(packet)进行处理

 

 

 

在启动章分析到XMPPServer中执行 MessageRouter.start();开启多线程处理in out packet.

 

	public void MessageRouter.start() {
		super.start();
	}

   从以上的继承图得知道,将执行AbstractMessageReceiver.start()

	@Override
	public void AbstractMessageReceiver.start() {
		startThreads();
	}

private ArrayDeque<QueueListener>  threadsQueue=null;
private void startThreads() {
  if (threadsQueue == null) {
	threadsQueue = new ArrayDeque<QueueListener>(8);
	for (int i = 0; i < in_queues_size; i++) {
	QueueListener in_thread = new QueueListener(in_queues.get(i), QueueType.IN_QUEUE);

		in_thread.setName("in_" + i + "-" + getName());
		in_thread.start();
		threadsQueue.add(in_thread);
	}
  for (int i = 0; i < out_queues_size; i++) {
	QueueListener out_thread = new QueueListener(out_queues.get(i), QueueType
						.OUT_QUEUE);

	        out_thread.setName("out_" + i + "-" + getName());
		out_thread.start();
		threadsQueue.add(out_thread);
	}
    }    // end of if (thread == null || ! thread.isAlive())
......
}

每个MessageReceiver组件都 有多个线程分别处理各自packet,来一个简单的模型图表明

 

 

     QueueListener是AbstractMessageReceiver的内部类,所以QueueListener内部能直接访问到AbstractMessageReceiver对象的方法。由此可见,象ClientConnectionManager,S2SConnectionManager

,MessageRouter,SessionManager 这些子类都有各自的in out线程负责处理投递到他们节点上的packet,

 

 

private QueueType    type          = null;
private boolean      threadStopped = false;
private PriorityQueueAbstract<Packet> queue;
private QueueListener(PriorityQueueAbstract<Packet> q, QueueType type) {
	this.queue = q;
	this.type  = type;
	compName   = AbstractMessageReceiver.this.getName();
}

@Override
public void QueueListener.run() {
			

	Packet        packet  = null;
	Queue<Packet> results = new ArrayDeque<Packet>(2);

	while (!threadStopped) {
		try {

		packet = queue.take();//阻塞方法
		++packetCounter;
		switch (type) {
		 case IN_QUEUE :
		       long startPPT = System.currentTimeMillis();

			PacketReceiverTask task = null;

			if (packet.getTo() != null) {
			String id = packet.getTo().toString() + packet.getStanzaId();

		       task = waitingTasks.remove(id);
						}
			if (task != null) {
			 task.handleResponse(packet);
			} else {
		boolean processed = false;
	     if (packet.isCommand() && (packet.getStanzaTo() != null) && compName.equals(
                  packet.getStanzaTo().getLocalpart()) && isLocalDomain(packet
				.getStanzaTo().getDomain())) {
				processed = processScriptCommand(packet, results);
		if (processed) {
		 Packet result = null;
		  while ((result = results.poll()) != null) {
			addOutPacket(result);
		}
	 }
       }
       if (!processed && ((packet = filterPacket(packet, incoming_filters)) !=null)) {
		processPacket(packet);//执行具体实现类的处理方法
	}

	int idx = pptIdx;

	pptIdx = (pptIdx + 1) % processPacketTimings.length;

	long timing = System.currentTimeMillis() - startPPT;

	processPacketTimings[idx] = timing;
	}

	break;

     case OUT_QUEUE :

	         if ((packet = filterPacket(packet, outgoing_filters)) != null) {
			processOutPacket(packet);//执行具体的实现类的处理方法
		  }

			break;
		default :
			break;
		}    // end of switch (qel.type)
		} catch (InterruptedException e) {
                                     .....
		}      // end of while (! threadStopped)
		}

 

private MessageReceiver     parent  = null;
//在初始化时,parent被赋值为MessageRouter对象。
public void AbstractMessageReceiver.processOutPacket(Packet packet) {
		if (parent != null) {
		  parent.addPacket(packet);//过渡到MessageRouter对象进行处理
		} else {
		 addPacketNB(packet);
		}    // end of else
	}

 

 

 

 父类实现默认addPacketNB()添加packet到队列的方法,如果子类没有重写该方法则使用父类的这个方法加入packet到他们各自的in 或out的处理队列里

 

public boolean AbstractMessageReceiver.addPacketNB(Packet packet) {
   int queueIdx = Math.abs(hashCodeForPacket(packet) % in_queues_size);//得到一个hash值
   //根据那个hash值加入到对应的in队列里,run()里会监听到阻塞队列有处理packet,则处理之
   boolean result = in_queues.get(queueIdx).offer(packet, packet.getPriority().ordinal());
 
		if (result) {
			++statReceivedPacketsOk;
		} else {

			// Queue overflow!
			++statReceivedPacketsEr;
		}

		return result;
	}

 

   同上,只不过这个是放入out处理队列,由out线程监听处理之

	protected boolean AbstractMessageReceiver.addOutPacketNB(Packet packet) {
		int queueIdx = Math.abs(hashCodeForPacket(packet) % out_queues_size);
		boolean result = false;
        //放到相应的out队列里等待处理
	result = out_queues.get(queueIdx).offer(packet, packet.getPriority().ordinal());
		if (result) {
			++statSentPacketsOk;
		} else {

			++statSentPacketsEr;

		}

		return result;
	}

 

 

 当我们知道如果要用哪一个AbstractMessageReceiver的实现类来处理packet的时候,我只需要把packet投递到对应的实现类(如SessionManager)里的in_queue或out_queue里就可以了,因为这些实现类开启了多条in 和out线程在等待处理in out队列里的packet,所以当客户端和服务器端建立连接后,发来有效的packet,而这个packet被投递的入口要从下面分析说起了。

 

 

public void ConnectionManager.accept(SocketChannel sc) {
IO serv = getXMPPIOServiceInstance(); //每一个接受到的新socket都有一个与之对应的ioservice
//这个ioservice设置监听器ConnectionManager.this可能是(ClientConnectionManager,
//S2SConnectionManager,BoshConnectionManager,WebSocketClientConnectionManager)的对象,主要是看是这些客户端socket是从哪个服务端socket对象监听的端口进来的
serv.setIOServiceListener(ConnectionManager.this);
}

//前面ConnectionOpenThread分析的章节已经详细分析过了,当有数据包来的时间,客户端socket是由服务端创建的一个对应的IoService作为处理类的。在SocketThread中监听到的socket有可以处理的数据时completionService.submit(serv);用线程池里的线程来执行IOService.call()方法,开始进入数据处理
private IOServiceListener<IOService<RefObject>> serviceListener  = null;
public IOService<?> IOService.call() throws IOException {
		writeData(null);

		boolean readLock = true;

		if (stopping) {
			stop();
		} else {
			readLock = readInProgress.tryLock();
			if (readLock) {
				try {
					processSocketData();//执行具体的子类的处理数据方法
                                                           该方法包括解析的数据封装成packet
				if ((receivedPackets() > 0) && (serviceListener != null)) {
			           //由前面分析可知道执行ConnectionManager子类的方法开始处                                     理数据包packet
                                    serviceListener.packetsReady(this);
					}    // end of if (receivedPackets.size() > 0)
				} finally {
					readInProgress.unlock();
					if (!isConnected()) {

						forceStop();
					}
				}
			}
		}

		return readLock
				? this
				: null;
	}

 

 

public void ConnectionManager.packetsReady(IO serv) throws IOException {

		if (checkTrafficLimits(serv)) {
                //processSocketData(serv)是读入方向的packet处理入口
                //writePacketsToSocket 是写出方向的packet处理方法
		writePacketsToSocket(serv, processSocketData(serv));
		}
	}

 processSocketData可能的继承实现结构,由图可见,执行processSocketData(IO serv)由继承ConnectionManager的子类的重写方法。

 

 

 

//拿实现类ClientConnectionManager来分析
public Queue<Packet> ClientConnectionManager.processSocketData(XMPPIOService<Object> serv) {

		JID id = serv.getConnectionId();

                 Packet p = null;

		while ((p = serv.getReceivedPackets().poll()) != null) {
               if (p.getAttributeStaticStr(Packet.XMLNS_ATT) == null) {
				p.setXMLNS(XMLNS);
			}
			

			if (p.getStanzaFrom() != null) {
				p.initVars(null, p.getStanzaTo());
			}
                        //设置包的from值
			p.setPacketFrom(id);

			JID receiver = serv.getDataReceiver();
                        //设置包的to值
			if (receiver != null) {
		           p.setPacketTo(serv.getDataReceiver());
                        //投递到(如ClientConnectionManager类)中的out_queue队列中,
                          由run()处理,这个包直到处理完成经过多次投递,
			   addOutPacket(p);
			} else {

			}

		}    // end of while ()

		return null;
	}

      //投递packet到相应的out_queue列队中
      protected boolean addOutPacket(Packet packet) {
	int queueIdx = Math.abs(hashCodeForPacket(packet) % out_queues_size);

	try {
	    out_queues.get(queueIdx).put(packet, packet.getPriority().ordinal());
		++statSentPacketsOk;
	 } catch (InterruptedException e) {
		..
	 }    // end of try-catch

		return true;
	}

 

  • 大小: 497.9 KB
  • 大小: 231.2 KB
  • 大小: 10.1 KB
  • 大小: 241.3 KB
  • 大小: 244.1 KB
分享到:
评论

相关推荐

    tigase-utils-3.5.1.jar

    tigase相关jar包

    tigase-web-chat

    在描述中提到“官方下载真实有效”,这意味着你可以从Tigase项目的官方网站获取到这个Web Chat的源码或者预编译的版本,确保软件的安全性和可靠性。官方提供的软件通常会经过严格的测试和更新,能够避免第三方源可能...

    tigase-server-tigase-server-8.0.0.zip 源码

    4. **多线程处理**:由于实时通信的特性,Tigase服务器需要高效地处理并发连接,因此在设计时考虑了多线程和异步处理,以保证性能。 5. **安全机制**:Tigase支持SSL/TLS加密,确保通信的安全性。同时,它还提供了...

    tigase 内部处理流程

    ### Tigase内部处理流程详解 #### 一、引言 Tigase是一款开源的XMPP服务器,被广泛应用于即时通信领域。本文旨在深入探讨Tigase的内部处理流程,特别是其核心组件及其交互机制,帮助读者更好地理解Tigase的工作原理...

    tigase-server:高度优化,高度模块化和非常灵活的XMPPJabber服务器

    Tigase XMPP服务器是高度优化,高度模块化且非常灵活的用Java编写的XMPP / Jabber服务器。 该存储库包含Tigase XMPP服务器主要部分的源代码。 该项目自2004年成立以来,我们最近已将其移至GitHub。 与XMPP相关的...

    基于tigase的独立IM系统.zip

    2. **Tigase API**:学习Tigase提供的API,如SessionManager、PacketFilter等。 3. **插件开发**:编写Java代码实现特定功能,如自定义业务逻辑、新的数据存储策略等。 4. **测试与调试**:使用Tigase的内置日志系统...

    XMPP_tigase_IM服务部署安装

    - **Tigase XML Tools**:用于处理XML数据的工具集,包括解析器等。 - **Tigase Utils**:一个共享库,包含了多个Tigase子项目中常用的功能。 - **JAXMPP**:一个客户端库,用于实现基于Tigase服务器的应用程序。 - ...

    tigase http-api 源码部署

    源码部署对于想要深入了解Tigase内部工作原理的开发者来说是一个很好的选择,因为它允许开发者查看、修改甚至重新构建源代码。 首先,需要更改项目的pom.xml文件,添加对gmaven的运行时环境的支持,这是因为在...

    xmpp之java服务端实现tigase整合项目源代码

    该资源是整合了tigase的java服务端源代码,环境为:idea + gradle + postgresql 注意,这部分项目只包括java源代码,而数据库备份将在下一个资源打包上传,有疑问请阅读相关博文: ...

    tigase快速配置

    4. **启动Tigase**:在命令行中输入`tigase-server`以启动服务器。首次启动时,Tigase会生成默认配置文件`tigase.conf`。 5. **配置Tigase**:打开`tigase.conf`文件,根据您的需求进行配置。主要的配置项包括: -...

    tigase组件

    6. **text**:这个可能是文本处理或解析相关的模块,可能包含了对XMPP消息中的文本内容进行处理的逻辑,比如编码解码、格式转换等。 7. **tigase-pubsub**:发布订阅(PubSub)是XMPP的一项扩展,允许用户和应用...

    tigase-local

    4. **启动**:在解压后的目录中找到`bin`文件夹,运行相应的启动脚本(例如`./tigase.sh`或`tigase.bat`)。 **三、Tigase配置** 1. **配置文件**:主要的配置文件是`tigase.conf`,位于`conf`目录下。这里可以...

    Tigase Server 7.0.1 源代码

    4. **存储层**:Tigase Server支持多种数据库后端,如MySQL、PostgreSQL等,用于存储用户信息、会话状态等数据。源代码中的数据库访问层确保了与各种数据库系统的兼容性。 5. **安全层**:为了保障通信安全,Tigase...

    tigase-swift:Tigase Swift XMPP客户端库

    它提供了XMPP标准核心的实现并处理XML。 此外,它还提供了对许多流行扩展(XEP)的支持。 该存储库包含该库的源文件。特征Tigase Swift实现了对和。 此外,它还支持许多流行的XEP。 以下是一些受支持的XEP的列表:-...

    tigase-server 组件

    1. **安装与配置**:Tigase服务器的安装通常涉及解压源码、编译、配置服务器文件(如`tigase.conf`),以及设置监听端口、认证方式、虚拟主机等参数。 2. **XML解析**:由于Tigase依赖tigase-xml处理XML,开发者应...

    tigase-server-8.0.0-b10083-dist-max.zip

    4. **创建数据库**:Tigase服务器通常使用MySQL或PostgreSQL作为后端存储。你需要预先创建数据库并设置相应的用户权限。 5. **初始化数据库**:运行Tigase服务器的初始化脚本,将数据库结构导入到你创建的数据库中...

    tigase开发指南.pdf

    tigase开发指南.pdf tigase开发指南.pdf是一份关于tigase服务器的开发指南,旨在帮助开发者更好地理解tigase服务器的组件、插件、配置和数据库存储。 tigase组件: tigase服务器在启动时会初始化并启动相关组件和...

    tigase monitor配置

    Tigase Monitor配置详解 Tigase Monitor是一款用于监控Tigase XMPP服务器性能和状态的工具,特别适用于Tigase 7.0版本。本文将详细介绍如何在Tigase 7.0上配置Monitor模块,包括客户端和服务器部分。 首先,确保你...

    tigase-server.7.0.2

    4. 监控和日志:增强了监控和日志记录功能,便于管理员进行故障排查和性能分析,确保服务器的正常运行。 5. API和插件:提供丰富的API接口,允许开发者创建自定义插件以扩展服务器功能,如用户认证、存储后端、...

    tigase 集群设置

    4. 集群连接参数:参数 "--cluster-connect-all=true" 表示集群中的所有节点都将相互连接,这允许集群中的任何节点都能处理客户端请求和资源的分发。 5. 管理员和虚拟主机设置:集群需要配置管理员账号和虚拟主机...

Global site tag (gtag.js) - Google Analytics