`
cuker919
  • 浏览: 100874 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

xsocket源码解读

 
阅读更多

关于xsocket可见于 我的另外一篇文章http://www.blogjava.net/freeman1984/archive/2011/04/25/302706.html,或者查看官网http://xsocket.org/
当然阅读xsocket需要一些线程,nio,niosocket,和java.util.concurrent(锁,线程池等)包的一些知识。要不读起来很费劲,建议先去了解下这些知识。可以在我的文章分类concurrent里面有一些,其他去网上找找。
本文只读了一个主要的流程,对于一些其他的代码例如:ssl相关,读数据相关没有涉及,看有时间能补上。
首先xsocket的几个关键的类
Server: 服务器端初始化线程池创建IoAcceptor
IoAcceptor:采用while循环接收客户端连接,并创建IoSocketDispatcher和IoChainableHandler
IoSocketDispatcher:负责注册SelectionKey以及事件的分发,并交给IoChainableHandler处理,通过一个while循环来处理注册的SelectionKey事件。
IHandler:事件处理,数据的读写等等。
INonBlockingConnection客户端接口。
(1)首先看下Server创建:
构造方法

protectedServer(InetSocketAddressaddress,Map<String,Object>options,IHandlerhandler,SSLContextsslContext,
booleansslOn,intbacklog,intminPoolsize,intmaxPoolsize,inttaskqueueSize)
这个方法主要是初始化线程池,构件acceptor
defaultWorkerPool
=newWorkerPool(minPoolsize,maxPoolsize,taskqueueSize);
workerpool
=defaultWorkerPool;

if(sslContext!=null){//是否使用ssl
acceptor=ConnectionUtils.getIoProvider().createAcceptor(newLifeCycleHandler(),address,backlog,options,sslContext,sslOn);

}
else{
acceptor
=ConnectionUtils.getIoProvider().createAcceptor(newLifeCycleHandler(),address,backlog,options);
}



其中线程池:使用jdk1.5以后的ThreadPoolExecutor,线程池最小默认2,最大100,QUEUE的大小默认也是100
线程池最小:MIN_SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolMinSize", "2"));
SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolSize", "100"));
TASK_QUEUE_SIZE = Integer.parseInt(System.getProperty("org.xsocket.connection.server.taskqueuesize", Integer.toString(SIZE_WORKER_POOL)));

(2)构件acceptor细节,最后server启动的时候会启动acceptor监听客户端

publicIoAcceptor(IIoAcceptorCallbackcallback,InetSocketAddressaddress,intbacklog,SSLContextsslContext,booleansslOn,booleanisReuseAddress)throwsIOException{
.
serverChannel
=ServerSocketChannel.open();

serverChannel.configureBlocking(
true);
serverChannel.socket().setSoTimeout(
0);//acceptmethodnevertimesout
serverChannel.socket().setReuseAddress(isReuseAddress);
.
serverChannel.socket().bind(address,backlog);
dispatcherPool
=newIoSocketDispatcherPool("Srv"+getLocalPort(),IoProvider.getServerDispatcherInitialSize());
.
}


(3)启动server
server.start();
服务的启动用了一个单独的线程,这里面使用到了CountDownLatch可参见另外一篇关于CountDownLatch用法的文章:
http://www.blogjava.net/freeman1984/archive/2011/07/04/353654.html
使用CountDownLatch来控制server的启动时间,操作多少时间为启动就,默认是60秒,这里就不讲CountDownLatch的代码了
整个启动的方法如下,能看懂CountDownLatch的用法基本上就理解了。

publicstaticvoidstart(IServerserver,inttimeoutSec)throwsSocketTimeoutException{


//startserverwithinadedicatedthread
Threadt=newThread(server);
t.setName(
"xServer");
t.start();
//请看下面的run方法分析

}


看看他的server线程的run方法:

publicvoidrun(){

acceptor.listen();
//启动前面创建的acceptor开始监听客户端连接

}



接着查看listen()方法:

publicvoidlisten()throwsIOException{
callback.onConnected();
//通知server已经启动
accept();//接受客户端连接
}

}



查看accept();很明了,使用一个while循环监听客户端连接,并建立可客户端相关的处理类:

while(isOpen.get()){

//blockingacceptcall
SocketChannelchannel=serverChannel.accept();

//createIoSocketHandler
//创建事件分发器
IoSocketDispatcherdispatcher=dispatcherPool.nextDispatcher();
//创建io处理
IoChainableHandlerioHandler=ConnectionUtils.getIoProvider().createIoHandler(false,dispatcher,channel,sslContext,sslOn);
//notifycallback
callback.onConnectionAccepted(ioHandler);//很关键的一个地方,会注册SelectionKey.OP_READ,此时客户端发来的消息就可以北服务端获取
}


查看callback.onConnectionAccepted(ioHandler);
此方法会初始化server端的ioHandler,查看初始化的代码:
dispatcher.register(this, SelectionKey.OP_READ);首先会注册read选择器,
如果有read事件发生dispatcher就会处理。看看dispatcher的run方法(通过一个循环来不停的处理已经注册的事件)

while(isOpen.get()){

inteventCount=selector.select(5000);

handledTasks
=performRegisterHandlerTasks();//处理事件
handledTasks+=performKeyUpdateTasks();
if(eventCount>0){
handleReadWriteKeys();
//处理读写,调用我们自己定义的hander来处理onData等事件
}

handledTasks
+=performDeregisterHandlerTasks();

}




(4)客户端
NonBlockingConnection,例如:
new NonBlockingConnection("localhost", 8090,new MyHandler() )
构造方法的主要代码:

.
SocketChannelchannel
=openSocket(localAddress,options);//实际调用:SocketChannelchannel=SocketChannel.open();

IoConnectorconnector
=getDefaultConnector();

IIoConnectorCallbackcallback
=newAsyncIoConnectorCallback(remoteAddress,channel,sslContext,isSecured,connectTimeoutMillis);
connector.connectAsync(channel,remoteAddress,connectTimeoutMillis,callback);

}



建立连接,生成IoConnector用来管理连接,然后connector开始启动,做一些初始化的工作:

其中connector.connectAsync(…方法会执行会产生一个RegisterTask任务到IoConnector,这个RegisterTask做的事情如下:
selectionKey = channel.register(selector, SelectionKey.OP_CONNECT);,也就是注册SelectionKey.OP_CONNECT
当IoConnector运行会执行这个任务:
看下他的run方法主要代码:

while(isOpen.get()){

handledTasks
=performTaskQueue();//首先运行上一步创建的RegisterTask注册SelectionKey.OP_CONNECT
inteventCount=selector.select(1000);//查看SelectionKey.OP_CONNECT事件是否已经准备好
if(eventCount>0){
handleConnect();
//如果准备好就处理连接事件
}
else{
checkForLooping(handledTasks);
}


}



handleConnect()的代码:

privatevoidhandleConnect(){
Set
<SelectionKey>selectedEventKeys=selector.selectedKeys();

Iterator
<SelectionKey>it=selectedEventKeys.iterator();
while(it.hasNext()){
SelectionKeyeventKey
=it.next();
it.remove();

RegisterTaskregisterTask
=(RegisterTask)eventKey.attachment();

if(eventKey.isValid()&&eventKey.isConnectable()){

try{
booleanisConnected=((SocketChannel)eventKey.channel()).finishConnect();//已经通讯连接
if(isConnected){
eventKey.cancel();
registerTask.callback.onConnectionEstablished();
//连接建立好就做下一步工作,注册read事件。
}


}

}

}


接着看registerTask.callback.onConnectionEstablished()
主要初始化iohander并注册read事件

privatevoidinit(IoChainableHandlerioHandler,IIoHandlerCallbackhandlerCallback)throwsIOException,SocketTimeoutException{
this.ioHandler=ioHandler;
ioHandler.init(handlerCallback);
//这个方法里面注册了read
isConnected.set(true);//这个时候通讯连接才真正建立起来了
}



继续看ioHandler.init(handlerCallback)方法:

publicvoidinit(IIoHandlerCallbackcallbackHandler)throwsIOException,SocketTimeoutException{

dispatcher.register(
this,SelectionKey.OP_READ);//注册SelectionKey.OP_READ时间,可以接受服务端的消息了
}


服务端和客户端就可以互相通信了。本文大致讲解了xsocket的代码的流程,其中讲解有误的地方请兄弟们指出,多谢!

http://www.blogjava.net/freeman1984/archive/2011/10/19/361593.html


分享到:
评论

相关推荐

    xsocket源码和socket源码,文档

    在本资源包中,包含的是"XSocket"和标准"Socket"的源码以及相关的技术文档,这对于理解和掌握这两种网络通信机制非常有帮助。下面,我们将深入探讨这两个主题。 首先,让我们来看看"Socket"。Socket是网络编程的...

    xsocket 2.5.4 源代码

    xSocket-2.5.4-sources.jar , 2.5.4版的源代码jar包,引入项目即可查看

    基于java的开发源码-NIO网络框架 xSocket.zip

    基于java的开发源码-NIO网络框架 xSocket.zip 基于java的开发源码-NIO网络框架 xSocket.zip 基于java的开发源码-NIO网络框架 xSocket.zip 基于java的开发源码-NIO网络框架 xSocket.zip 基于java的开发源码-NIO网络...

    JAVA源码NIO网络框架xSocket

    JAVA源码NIO网络框架xSocket

    java源码:NIO网络框架 xSocket.rar

    xSocket是一个基于Java NIO实现的网络通信框架,它旨在提供简单易用、性能卓越的网络编程接口,适用于开发高并发、低延迟的网络应用。 1. **Java NIO基础** Java NIO的核心组件包括通道(Channel)、缓冲区...

    基于Java的实例开发源码-NIO网络框架 xSocket.zip

    Java NIO(New IO)是Java 1.4版本引入的一个新特性,它为Java应用程序提供了非阻塞I/O操作的能力,...通过深入学习和实践xSocket源码,不仅可以提升Java NIO的技术能力,还可以为将来构建自己的网络框架打下坚实基础。

    xSocket api 2.6.6version

    xSocket api 2.6.6version

    NIO网络框架 xSocket

    NIO网络框架 xSocket

    xsocket NIO框架示例

    xsocket NIO框架示例 resources 中有相关的 资料。telnet服务测试教程。和相关jar

    xsocket.jar包

    socket通讯框架xsocket所需的jar包

    小程序 NIO网络框架 xSocket(源码).rar

    免责声明:资料部分来源于合法的互联网渠道收集和整理,部分自己学习积累成果,供大家学习参考与交流。收取的费用仅用于收集和整理资料耗费时间的酬劳。 本人尊重原创作者或出版方,资料版权归原作者或出版方所有,...

    xsocket使用指南

    xSocket 使用指南 xSocket 是一个轻量级的基于 NIO 的服务器框架,用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读写等方面。下面是 xSocket 的一些核心功能和使用指南: 核心功能 ...

    基于Java的源码-NIO网络框架 xSocket.zip

    通过研究xSocket的源码,我们可以学习到如何有效地利用Java NIO进行网络编程,如何设计高效的事件驱动架构,以及如何优化网络通信性能。这对于任何希望构建高性能、高并发网络应用的Java开发者来说,都是宝贵的实践...

    xSocket-2.8.1.jar

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。

    轻量级JAVA scoket 服务器XSOCKET

    轻量级JAVA scoket 服务器XSOCKET

    XSocket.rar

    在XSocket中,可能包含了客户端和服务器端的源码示例,帮助开发者理解和实现这两个角色之间的通信流程。 多播(Multicast)是一种网络通信模式,与传统的单播和广播不同。在多播中,一个发送者可以同时向多个接收者...

    基于Java的实例源码-NIO网络框架 xSocket.zip

    - **阅读源码**:通过分析xSocket的源码,可以了解其设计思路和实现细节,提高解决问题的能力。 - **实践项目**:参与实际项目开发,将理论知识应用于实践,提升编程技能。 6. **社区与文档**: - **官方文档**...

    android版微信源码(通过Xsocket实现)

    通过Xsocket实现的Android微信,并不是特别完善,可以在上面进行修改,界面仿照微信进行开发的,Xstream实现XML的传输,可以改成JSON,包括系统的登录,添加好友,聊天,朋友圈啊等功能。使用Activity、intent、...

    tcp协议使用xsocket的demo

    XSocket则是一个专门用于简化Java中TCP通信的库,它使得开发者能够更方便地构建基于TCP的应用程序。 SpringBoot是一个轻量级的Java框架,它简化了创建独立的、生产级别的基于Spring的应用程序。SpringBoot集成了...

    ws(websocket)例子(xsocket\xlightweb)

    在给定的标题“ws(websocket)例子(xsocket\xlightweb)”中,我们可以看出这个压缩包可能包含了两个不同的WebSocket实现示例,分别是xsocket和xlightweb。接下来,我们将详细探讨这两个库以及WebSocket的基本概念和...

Global site tag (gtag.js) - Google Analytics