`
zyongsheng83
  • 浏览: 43260 次
  • 性别: Icon_minigender_1
  • 来自: 大连
最近访客 更多访客>>
社区版块
存档分类
最新评论

一个java tcp服务器的基础框架

    博客分类:
  • jse
阅读更多

其中主要想探讨的是一个监听连接的AcceptorReactor类,一个监听数据到达的SessionReactor类,一个服务器断主控类ServerManager,一个控制数据发送、接收、存储用户信息的Session类。

 

在服务器运行的时候,只有3个线程在跑,一个是main主线程,一个是监听连接的线程,一个是监听客户端数据到达的线程。当有客户端数据达时,会另开辟线程处理,处理结束后销毁该线程。

 

在使用的时候,需要自己写类继承ServerManager实现自己server的功能,需要写类继承Session实现自己的数据处理,在server.properties中配置服务器端口号、客户端数据编码、需要加载的Session类(也就是自己写的继承自Session的类)、发送接收数据时的数据分隔符。

 

其中的Reactor模式是参考网上的,具体网址已经忘记了。

 

AcceptorReactor类:

/**
 * 
 */
package zys.net.tcp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import zys.ThreadRunnable;

/**
 * @author Administrator
 */
final class AcceptorReactor extends ThreadRunnable {

  private ServerManager serverManager;

  private Class<Session> sessionClass;

  private ServerSocketChannel serverSocketChannel;

  private Selector selector;

  public AcceptorReactor() {
    super();
  }

  /**
   * 
   */
  public void run() {
    try {
      serverSocketChannel = ServerSocketChannel.open();
      try {
        ServerSocket sSocket = serverSocketChannel.socket();
        try {
          sSocket.bind(new InetSocketAddress(serverManager.getPort()));
          serverSocketChannel.configureBlocking(false);
          selector = Selector.open();
          try {
            SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            sk.attach(new Acceptor());
            serverManager.logInfo("Listener Reactor started.");
            querySelector();
          } finally {
            selector.close();
          }
        } finally {
          sSocket.close();
        }
      } finally {
        serverSocketChannel.close();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * @param aSelector
   * @throws IOException
   */
  private void querySelector() throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(50);
    try {
      while (!Thread.interrupted()) {
        int n = selector.select();
        if (n != 0) {
          Iterator it = selector.selectedKeys().iterator();
          while (it.hasNext()) {
            SelectionKey key = (SelectionKey) (it.next());
            pool.execute((Runnable) key.attachment());
            it.remove();
          }
        }
      }
    } finally {
      pool.shutdown();
      try {
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
          pool.shutdownNow();
        }
      } catch (InterruptedException e) {
        pool.shutdownNow();
      }
    }
  }

  class Acceptor implements Runnable { // inner
    public void run() {
      try {
        SocketChannel c = serverSocketChannel.accept();
        if (c != null) {
          c.socket().setSoLinger(true, 0);
          serverManager.logInfo("One Session conncted.");
          Session session = sessionClass.newInstance();
          session.setManager(serverManager);
          session.setSocketChannel(c);
          session.setConnTime(new Date());
          session.setConnIP(c.socket().getInetAddress().getHostAddress());
          session.setConnStatus(Session.CONN_STATUS_CONNECT);
          serverManager.registerSession(session);
        }
      } catch (Exception ex) {
        // log
      }
    }
  }

  public ServerManager getServerManager() {
    return serverManager;
  }

  public void setServerManager(ServerManager aServerManager) {
    serverManager = aServerManager;
  }

  public Class<Session> getSessionClass() {
    return sessionClass;
  }

  public void setSessionClass(Class<Session> aSessionClass) {
    sessionClass = aSessionClass;
  }
}

 

 

其中,在run方法中注册连接监听,在querySelector中捕获连接请求,在Acceptor的run中实现对监听的处理。

 

SessionReactor类:

/**
 * 
 */
package zys.net.tcp;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import zys.ThreadRunnable;

/**
 * @author Administrator
 */
final class SessionReactor extends ThreadRunnable {
  private ServerManager serverManager;

  private Selector selector;

  private ArrayList<Session> preparedSessions;

  /**
   * 
   */
  public SessionReactor() {
    super();
    preparedSessions = new ArrayList<Session>();
  }

  /**
   * @param aSession
   * @throws IOException
   */
  public void registerSession(Session aSession) throws IOException {
    synchronized (preparedSessions) {
      preparedSessions.add(aSession);
    }
    selector.wakeup();
  }

  public void clearPreparedSessions() {
    synchronized (preparedSessions) {
      preparedSessions.clear();
    }
    selector.wakeup();
  }

  public void stop() throws Exception {
    super.stop();
    selector.wakeup();
  }

  /*
   * (non-Javadoc)
   * 
   * @see java.lang.Runnable#run()
   */
  public void run() {
    try {
      selector = Selector.open();
      try {
        serverManager.logInfo("Session Reactor started.");
        querySelector();
      } finally {
        selector.close();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  private void querySelector() throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(200);
    try {
      int preparedCount = 0;
      while (!Thread.interrupted()) {
        synchronized (preparedSessions) {
          preparedCount = preparedSessions.size();
          if (preparedCount > 0) {
            Iterator<Session> sessionIt = preparedSessions.iterator();
            while (sessionIt.hasNext()) {
              Session session = sessionIt.next();
              SocketChannel channel = session.getSocketChannel();
              channel.configureBlocking(false);
              SelectionKey skReader = channel.register(selector, SelectionKey.OP_READ);
              skReader.attach(new Reader(session));
              serverManager.logInfo("One Session registered.");
            }
            preparedSessions.clear();
          }
        }

        int n = selector.select();
        if (n != 0) {
          Iterator it = selector.selectedKeys().iterator();
          while (it.hasNext()) {
            // dispatch((SelectionKey) (it.next()));
            SelectionKey key = (SelectionKey) (it.next());
            pool.execute((Runnable) key.attachment());
            it.remove();
          }
        }
      }

      Iterator<SelectionKey> it = selector.keys().iterator();
      while (it.hasNext()) {
        SelectionKey key = (SelectionKey) (it.next());
        Reader r = (Reader) (key.attachment());
        key.cancel();
        r.getSession().distroy();
      }
    } finally {
      pool.shutdown();
      try {
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
          pool.shutdownNow();
        }
      } catch (InterruptedException e) {
        pool.shutdownNow();
      }
    }
  }

  class Reader implements Runnable { // inner
    private Session session;

    public Reader(Session aSession) {
      session = aSession;
    }

    public void run() {
      if(!session.isActive()){
        return;
      }
      SocketChannel channel = session.getSocketChannel();
      int count;
      ByteBuffer buffer = null;
      try {
        synchronized(channel){
          while(true){
            buffer = ByteBuffer.allocate(10);
            count = channel.read(buffer);
            if (count > 0) {
              buffer = ByteBuffer.allocate(Integer.valueOf(new String(buffer.array(), 0, count, serverManager.getCharSet())));
              count = channel.read(buffer);
              String sMsg = new String(buffer.array(), 0, count, serverManager.getCharSet());
              serverManager.logDebugReceive(sMsg);
              session.onReceive(sMsg);
            }else{
              if(count == -1){
                if (session.isActive()) {
                  session.distroy();
                }
              }
              break;
            }
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
        serverManager.logError(this.getClass(), "Reader.run" , e.getMessage());
        if (session.isActive()) {
          session.distroy();
        }
      }
    }

    public Session getSession() {
      return session;
    }
  }

  public void setServerManager(ServerManager aServerManager) {
    serverManager = aServerManager;
  }
}

 

 

其中,在registerSession中准备需要注册接收数据的Session对象,在querySelector中的synchronized (preparedSessions) {吧准备好的Session对象注册成接收数据监听,之后处理接收到数据的请求,Reader是处理接收到的数据的类。

 

所有代码已经上传,包zys.net.tcp中是核心类,包server中的是测试类,功能是客户端连接后每隔一秒向客户端发送字符串变量COST_PARAMS_STR的值,以实现客户端数据的事实刷新,发送格式是10位的代表数据长度的数字(比如0000000013),5位代表数据类别的数字(比如20000,代表每秒发的同步数据),紧接着是实际数据。
可以用telnet xxx.xxx.xxx.xxx 9999 测试。

 

所有核心代码以及测试代码已经上传,utf-8编码的。

 

比较担心的是当用户交互比较频繁的时候,服务器开辟线程过多,是否会引起服务器效率低下,不过经过测试在100个客户端的情况下,数据交互正常,丝毫看不出来延迟。比较失败的是zys.ThreadRunnable类,在某些地方引用了,这个类很脱裤子放屁。

 

欢迎批评,建议,请致zyongsheng83@163.com,谢谢!

分享到:
评论

相关推荐

    Java tcp服务器框架

    Java TCP服务器框架是一种用于构建高性能、可扩展网络应用的核心组件,尤其适合开发需要稳定通信的分布式系统。这个框架是基于Java编程语言实现的,利用了Java的Socket编程接口来搭建服务器与客户端之间的TCP连接。...

    JAVA TCP和UDP Socket通信框架

    Java中的TCP和UDP Socket通信是网络编程的基础,用于在不同设备之间建立可靠的数据传输通道。TCP(Transmission Control Protocol)提供的是面向连接、有序且无损的服务,而UDP(User Datagram Protocol)则是无连接...

    tcp服务器框架 以及一个简单命令行聊天的测试例子

    TCP服务器框架是网络编程中的基础组件,用于接收和处理客户端的连接请求。在这个特定的案例中,该框架可能包括了服务器启动脚本(RunServer.bat)、客户端连接脚本(RunClient.bat)以及项目配置文件(.classpath、....

    java -> QuickServer 新建 TCP 服务器端

    QuickServer是一个轻量级的、线程池驱动的Java TCP服务器框架。它支持SSL加密,可以处理大量的并发连接,并提供了一套灵活的事件处理机制。利用QuickServer,开发者可以通过编写少量代码快速搭建功能丰富的服务器...

    Java采用Netty实现基于DTU的TCP服务器 + 多端口 + 多协议

    本文将深入探讨如何使用Java的Netty框架实现一个基于DTU(Data Transfer Unit)的TCP服务器,该服务器具备多端口通信和多协议解析的能力。 首先,DTU是一种专门用于远程数据传输的设备,它能够通过GPRS、3G/4G等...

    TCP通讯框架资源包,mina 通讯框架

    Mina,全称为“Apache MINA (Multipurpose Infrastructure for Network Applications)”,是由Apache软件基金会开发的开源项目,它为Java开发者提供了一个高级的网络通信框架。Mina不仅支持TCP/IP协议,还支持UDP/IP...

    c++客户端和java(Netty)服务器端tcp通讯

    在Java端,Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。利用Netty,我们可以轻松实现TCP服务器,创建ServerBootstrap,配置通道处理器pipeline,监听客户端...

    JConn:Java TCP 网络框架-开源

    要查看完整的变更日志,请单击以下链接 - https://github.com/davidg95/JConn/releases ---该项目仍处于早期阶段。 请在 ... --- Java TCP 网络框架,使Java 网络应用程序的开发更容易。

    TCP协议服务器/客户端框架

    3. **Socket编程**:在C#中,通过System.Net.Sockets命名空间提供的Socket类,我们可以创建TCP服务器和客户端。服务器端创建监听套接字,等待客户端连接;客户端则通过连接服务器的IP地址和端口号来建立连接。 4. *...

    java tcp 聊天程序

    通过以上描述,我们可以了解到一个完整的Java TCP聊天程序涉及到了网络编程、多线程、数据传输、用户管理等多个方面的知识。实际开发中,开发者需要结合具体的业务需求和用户体验,进行细致的设计与实现。

    Java服务器数据采集框架

    TestNettyServerBaseDemo这个文件名可能是该框架的示例代码,用于展示如何使用Netty搭建一个基础的服务端。在实际项目中,开发者可以参考这个示例,结合自己的业务需求,构建数据采集、处理和发送的完整流程。 综上...

    Apache MINA基于JAVA的网络服务器框架 教程.zip

    Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高性能、异步事件驱动的网络应用程序框架,主要用Java语言编写。MINA旨在简化网络编程,特别是TCP/IP和UDP/IP协议的应用开发,如HTTP、...

    基于 Java Socket 的一个通信框架.zip

    在这个"基于 Java Socket 的一个通信框架"中,我们很可能会找到实现客户端和服务器之间通信的代码示例。这个框架可能是为了简化网络应用开发,提供了一套结构化的模式来处理连接建立、数据传输以及断开连接等步骤。 ...

    javatcp.rar_java TCP线程_java tcp 线程

    这个服务器会创建一个新的线程来处理每个客户端的连接,确保服务器能够同时处理多个客户端。`ClientHandler`类处理来自客户端的数据,并将其回显回去。 客户端的代码可能如下: ```java import java.io.*; import ...

    TCP开发通讯框架,maven项目格式,方便集成

    本项目采用Maven作为构建工具,Maven是一个强大的Java项目管理和综合工具,它能够帮助开发者管理项目的依赖关系,构建过程,以及生成文档。通过在项目中引入Maven,开发者可以方便地将此TCP通讯框架与其他Java项目...

    Java springboot 整合mina 框架,nio通讯基础教程,mina框架基础教程.zip

    本教程旨在帮助开发者深入理解和掌握这些技术,并提供了一个可直接使用的基础平台框架。 Java NIO,全称为New Input/Output,是Java在1.4版本引入的一种新的I/O模型,它提供了与传统的BIO(Blocking I/O)不同的...

    基于java语言实现的游戏服务器框架.zip

    在本项目"基于java语言实现的游戏服务器框架"中,我们可以深入探讨Java技术在游戏服务器开发中的应用,以及如何构建高效、稳定、可扩展的服务器架构。 首先,Java作为一种跨平台的编程语言,具有丰富的类库和优秀的...

    Java网络编程之TCP协议下—上传文件到服务器程序

    本教程将重点讲解如何利用Java实现一个简单的TCP文件上传到服务器的程序。 首先,我们来看`uploadServer.java`。这个文件代表服务器端的程序,主要任务是监听客户端的连接请求,并接收客户端发送过来的文件。在Java...

    JAVA_Socket_TCP(客户端服务器)

    总的来说,Java Socket库提供了一个强大且灵活的框架,用于构建基于TCP协议的客户端-服务器应用程序。无论是简单的文件传输,还是复杂的应用级协议,都可以通过Java Socket实现。在实际开发中,了解和熟练掌握Socket...

Global site tag (gtag.js) - Google Analytics