- 浏览: 43255 次
- 性别:
- 来自: 大连
文章分类
其中主要想探讨的是一个监听连接的AcceptorReactor类,一个监听数据到达的SessionReactor类,一个服务器断主控类ServerManager,一个控制数据发送、接收、存储用户信息的Session类。
在服务器运行的时候,只有3个线程在跑,一个是main主线程,一个是监听连接的线程,一个是监听客户端数据到达的线程。当有客户端数据达时,会另开辟线程处理,处理结束后销毁该线程。
在使用的时候,需要自己写类继承ServerManager实现自己server的功能,需要写类继承Session实现自己的数据处理,在server.properties中配置服务器端口号、客户端数据编码、需要加载的Session类(也就是自己写的继承自Session的类)、发送接收数据时的数据分隔符。
其中的Reactor模式是参考网上的,具体网址已经忘记了。
AcceptorReactor类:
/** * */ package zys.net.tcp; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import zys.ThreadRunnable; /** * @author Administrator */ final class AcceptorReactor extends ThreadRunnable { private ServerManager serverManager; private Class<Session> sessionClass; private ServerSocketChannel serverSocketChannel; private Selector selector; public AcceptorReactor() { super(); } /** * */ public void run() { try { serverSocketChannel = ServerSocketChannel.open(); try { ServerSocket sSocket = serverSocketChannel.socket(); try { sSocket.bind(new InetSocketAddress(serverManager.getPort())); serverSocketChannel.configureBlocking(false); selector = Selector.open(); try { SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); serverManager.logInfo("Listener Reactor started."); querySelector(); } finally { selector.close(); } } finally { sSocket.close(); } } finally { serverSocketChannel.close(); } } catch (IOException e) { e.printStackTrace(); } } /** * @param aSelector * @throws IOException */ private void querySelector() throws IOException { ExecutorService pool = Executors.newFixedThreadPool(50); try { while (!Thread.interrupted()) { int n = selector.select(); if (n != 0) { Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) (it.next()); pool.execute((Runnable) key.attachment()); it.remove(); } } } } finally { pool.shutdown(); try { if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); } } catch (InterruptedException e) { pool.shutdownNow(); } } } class Acceptor implements Runnable { // inner public void run() { try { SocketChannel c = serverSocketChannel.accept(); if (c != null) { c.socket().setSoLinger(true, 0); serverManager.logInfo("One Session conncted."); Session session = sessionClass.newInstance(); session.setManager(serverManager); session.setSocketChannel(c); session.setConnTime(new Date()); session.setConnIP(c.socket().getInetAddress().getHostAddress()); session.setConnStatus(Session.CONN_STATUS_CONNECT); serverManager.registerSession(session); } } catch (Exception ex) { // log } } } public ServerManager getServerManager() { return serverManager; } public void setServerManager(ServerManager aServerManager) { serverManager = aServerManager; } public Class<Session> getSessionClass() { return sessionClass; } public void setSessionClass(Class<Session> aSessionClass) { sessionClass = aSessionClass; } }
其中,在run方法中注册连接监听,在querySelector中捕获连接请求,在Acceptor的run中实现对监听的处理。
SessionReactor类:
/** * */ package zys.net.tcp; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import zys.ThreadRunnable; /** * @author Administrator */ final class SessionReactor extends ThreadRunnable { private ServerManager serverManager; private Selector selector; private ArrayList<Session> preparedSessions; /** * */ public SessionReactor() { super(); preparedSessions = new ArrayList<Session>(); } /** * @param aSession * @throws IOException */ public void registerSession(Session aSession) throws IOException { synchronized (preparedSessions) { preparedSessions.add(aSession); } selector.wakeup(); } public void clearPreparedSessions() { synchronized (preparedSessions) { preparedSessions.clear(); } selector.wakeup(); } public void stop() throws Exception { super.stop(); selector.wakeup(); } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ public void run() { try { selector = Selector.open(); try { serverManager.logInfo("Session Reactor started."); querySelector(); } finally { selector.close(); } } catch (Exception e) { e.printStackTrace(); } } private void querySelector() throws Exception { ExecutorService pool = Executors.newFixedThreadPool(200); try { int preparedCount = 0; while (!Thread.interrupted()) { synchronized (preparedSessions) { preparedCount = preparedSessions.size(); if (preparedCount > 0) { Iterator<Session> sessionIt = preparedSessions.iterator(); while (sessionIt.hasNext()) { Session session = sessionIt.next(); SocketChannel channel = session.getSocketChannel(); channel.configureBlocking(false); SelectionKey skReader = channel.register(selector, SelectionKey.OP_READ); skReader.attach(new Reader(session)); serverManager.logInfo("One Session registered."); } preparedSessions.clear(); } } int n = selector.select(); if (n != 0) { Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { // dispatch((SelectionKey) (it.next())); SelectionKey key = (SelectionKey) (it.next()); pool.execute((Runnable) key.attachment()); it.remove(); } } } Iterator<SelectionKey> it = selector.keys().iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) (it.next()); Reader r = (Reader) (key.attachment()); key.cancel(); r.getSession().distroy(); } } finally { pool.shutdown(); try { if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); } } catch (InterruptedException e) { pool.shutdownNow(); } } } class Reader implements Runnable { // inner private Session session; public Reader(Session aSession) { session = aSession; } public void run() { if(!session.isActive()){ return; } SocketChannel channel = session.getSocketChannel(); int count; ByteBuffer buffer = null; try { synchronized(channel){ while(true){ buffer = ByteBuffer.allocate(10); count = channel.read(buffer); if (count > 0) { buffer = ByteBuffer.allocate(Integer.valueOf(new String(buffer.array(), 0, count, serverManager.getCharSet()))); count = channel.read(buffer); String sMsg = new String(buffer.array(), 0, count, serverManager.getCharSet()); serverManager.logDebugReceive(sMsg); session.onReceive(sMsg); }else{ if(count == -1){ if (session.isActive()) { session.distroy(); } } break; } } } } catch (Exception e) { e.printStackTrace(); serverManager.logError(this.getClass(), "Reader.run" , e.getMessage()); if (session.isActive()) { session.distroy(); } } } public Session getSession() { return session; } } public void setServerManager(ServerManager aServerManager) { serverManager = aServerManager; } }
其中,在registerSession中准备需要注册接收数据的Session对象,在querySelector中的synchronized (preparedSessions) {吧准备好的Session对象注册成接收数据监听,之后处理接收到数据的请求,Reader是处理接收到的数据的类。
所有代码已经上传,包zys.net.tcp中是核心类,包server中的是测试类,功能是客户端连接后每隔一秒向客户端发送字符串变量COST_PARAMS_STR的值,以实现客户端数据的事实刷新,发送格式是10位的代表数据长度的数字(比如0000000013),5位代表数据类别的数字(比如20000,代表每秒发的同步数据),紧接着是实际数据。
可以用telnet xxx.xxx.xxx.xxx 9999 测试。
所有核心代码以及测试代码已经上传,utf-8编码的。
比较担心的是当用户交互比较频繁的时候,服务器开辟线程过多,是否会引起服务器效率低下,不过经过测试在100个客户端的情况下,数据交互正常,丝毫看不出来延迟。比较失败的是zys.ThreadRunnable类,在某些地方引用了,这个类很脱裤子放屁。
欢迎批评,建议,请致zyongsheng83@163.com,谢谢!
- ServerLibSource.zip (10.5 KB)
- 下载次数: 98
发表评论
-
java - RMI一步一步学习
2007-07-31 21:34 607RMI,远程方法调用(Remote Method Invoca ... -
从追MM谈Java的23种设计模式
2007-07-31 21:37 592设计模式做为程序员的 ... -
jdk1.5新特性:
2007-08-01 20:48 11331. 范型(Generic) ArrayList< ... -
java中的String与常量池
2007-08-02 19:15 9721. 首先String不属于8种基本数据类型,String是一 ... -
java中的++运算符
2007-08-02 19:24 828int a = 0; a = a++; System.ou ... -
xml文件解析后不能删除
2007-08-03 21:52 925碰到的问题://XMLReader readerreader. ... -
变量初始化总结,防止以后忘记
2007-08-15 21:17 732在一个类中:1. 基本类型 static final - 编译 ...
相关推荐
Java TCP服务器框架是一种用于构建高性能、可扩展网络应用的核心组件,尤其适合开发需要稳定通信的分布式系统。这个框架是基于Java编程语言实现的,利用了Java的Socket编程接口来搭建服务器与客户端之间的TCP连接。...
Java中的TCP和UDP Socket通信是网络编程的基础,用于在不同设备之间建立可靠的数据传输通道。TCP(Transmission Control Protocol)提供的是面向连接、有序且无损的服务,而UDP(User Datagram Protocol)则是无连接...
TCP服务器框架是网络编程中的基础组件,用于接收和处理客户端的连接请求。在这个特定的案例中,该框架可能包括了服务器启动脚本(RunServer.bat)、客户端连接脚本(RunClient.bat)以及项目配置文件(.classpath、....
QuickServer是一个轻量级的、线程池驱动的Java TCP服务器框架。它支持SSL加密,可以处理大量的并发连接,并提供了一套灵活的事件处理机制。利用QuickServer,开发者可以通过编写少量代码快速搭建功能丰富的服务器...
本文将深入探讨如何使用Java的Netty框架实现一个基于DTU(Data Transfer Unit)的TCP服务器,该服务器具备多端口通信和多协议解析的能力。 首先,DTU是一种专门用于远程数据传输的设备,它能够通过GPRS、3G/4G等...
Mina,全称为“Apache MINA (Multipurpose Infrastructure for Network Applications)”,是由Apache软件基金会开发的开源项目,它为Java开发者提供了一个高级的网络通信框架。Mina不仅支持TCP/IP协议,还支持UDP/IP...
在Java端,Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。利用Netty,我们可以轻松实现TCP服务器,创建ServerBootstrap,配置通道处理器pipeline,监听客户端...
要查看完整的变更日志,请单击以下链接 - https://github.com/davidg95/JConn/releases ---该项目仍处于早期阶段。 请在 ... --- Java TCP 网络框架,使Java 网络应用程序的开发更容易。
3. **Socket编程**:在C#中,通过System.Net.Sockets命名空间提供的Socket类,我们可以创建TCP服务器和客户端。服务器端创建监听套接字,等待客户端连接;客户端则通过连接服务器的IP地址和端口号来建立连接。 4. *...
通过以上描述,我们可以了解到一个完整的Java TCP聊天程序涉及到了网络编程、多线程、数据传输、用户管理等多个方面的知识。实际开发中,开发者需要结合具体的业务需求和用户体验,进行细致的设计与实现。
TestNettyServerBaseDemo这个文件名可能是该框架的示例代码,用于展示如何使用Netty搭建一个基础的服务端。在实际项目中,开发者可以参考这个示例,结合自己的业务需求,构建数据采集、处理和发送的完整流程。 综上...
Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高性能、异步事件驱动的网络应用程序框架,主要用Java语言编写。MINA旨在简化网络编程,特别是TCP/IP和UDP/IP协议的应用开发,如HTTP、...
在这个"基于 Java Socket 的一个通信框架"中,我们很可能会找到实现客户端和服务器之间通信的代码示例。这个框架可能是为了简化网络应用开发,提供了一套结构化的模式来处理连接建立、数据传输以及断开连接等步骤。 ...
这个服务器会创建一个新的线程来处理每个客户端的连接,确保服务器能够同时处理多个客户端。`ClientHandler`类处理来自客户端的数据,并将其回显回去。 客户端的代码可能如下: ```java import java.io.*; import ...
本项目采用Maven作为构建工具,Maven是一个强大的Java项目管理和综合工具,它能够帮助开发者管理项目的依赖关系,构建过程,以及生成文档。通过在项目中引入Maven,开发者可以方便地将此TCP通讯框架与其他Java项目...
本教程旨在帮助开发者深入理解和掌握这些技术,并提供了一个可直接使用的基础平台框架。 Java NIO,全称为New Input/Output,是Java在1.4版本引入的一种新的I/O模型,它提供了与传统的BIO(Blocking I/O)不同的...
在本项目"基于java语言实现的游戏服务器框架"中,我们可以深入探讨Java技术在游戏服务器开发中的应用,以及如何构建高效、稳定、可扩展的服务器架构。 首先,Java作为一种跨平台的编程语言,具有丰富的类库和优秀的...
本教程将重点讲解如何利用Java实现一个简单的TCP文件上传到服务器的程序。 首先,我们来看`uploadServer.java`。这个文件代表服务器端的程序,主要任务是监听客户端的连接请求,并接收客户端发送过来的文件。在Java...
总的来说,Java Socket库提供了一个强大且灵活的框架,用于构建基于TCP协议的客户端-服务器应用程序。无论是简单的文件传输,还是复杂的应用级协议,都可以通过Java Socket实现。在实际开发中,了解和熟练掌握Socket...