`

tomcat中的BIO应用——JIoEndpoint

阅读更多

一、BIO基础

    BIO即阻塞式I/O,是Java提供的最基本的IO方式,在网络通信中,需要通过Socket在客户端与服务端建立双向链接以实现通信,其主要步骤如下:

    1、服务端监听某个端口是否有链接请求。

    2、客户端向服务端发出链接请求。

    3、服务端向客户端返回Accept接受消息,此时链接成功。

    4、客户端和服务端通过Send(), Write()等方法与对方通信。

    5、关闭链接。

 

    一张BIO的思维导图(图片来源:https://my.oschina.net/langxSpirit/blog/830620



 

     Java分别提供了两个类Socket和ServerSocket,用来表示双向链接的客户端和服务端,基于这两个类,一个简单的网络通信示例如下:

    1、客户端

package com.wang.test.net.io.bio;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
 * 客户端
 */
public class Client {

    public static void main(String[] args) throws Exception {
        try (
            //向本机的8080端口发送请求
            Socket socket = new Socket("127.0.0.1", 8080);
            //根据标准输入构造BufferedReader对象
            BufferedReader clientInput = new BufferedReader(new InputStreamReader(System.in));
            //通过Socket得到输出流,构造PrintWriter对象
            PrintWriter writer = new PrintWriter(socket.getOutputStream());
            //通过Socket得到输入流,构造BufferedReader对象
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        ){
            //读取输入信息
            String input = clientInput.readLine();
            while ( !input.equals("exit") ){
                //将输入信息发送到服务器
                writer.print(input);
                //刷新输出流,使服务器端可以马上收到请求信息
                writer.flush();
                //读取服务器端返回信息
                System.out.println("服务器端相应为:" + reader.readLine());
                //读取下一条输入信息
                input = clientInput.readLine();
            }
        } catch ( Exception e ) {
            System.out.println(e);
        }
    }
}

     2、服务端

package com.wang.test.net.io.bio;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * 服务器端
 */
public class Server {

    public static void main(String[] args) {
        try(
            //创建ServerSocket监听端口8080
            ServerSocket server = new ServerSocket(8080);
            //等待客户端请求
            Socket socket = server.accept();
            //根据标准输入构造BufferedReader对象
            BufferedReader serverInput = new BufferedReader(new InputStreamReader(System.in));
            //通过Socket对象得到输出流,BufferedReader
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            //通过Socket对象得到输出流,并构造PrintWrite对象
            PrintWriter writer = new PrintWriter(socket.getOutputStream());
        ){
            //读取客户端请求
            System.out.println("客户端请求:" + reader.readLine());
            //输入服务端相应
            String input = serverInput.readLine();
            //如果输入内容为"exit",则退出
            while (!input.equals("exit")){
                writer.print(input);
                //向客户端输出该字符串
                writer.flush();
                //读取客户端请求
                System.out.println("客户端请求:" + reader.readLine());
                //输入服务器相应
                input = serverInput.readLine();
            }
        } catch ( Exception e ){
            System.out.println(e);
        }
    }
}

 

二、JIoEndpoint

    Tomcat服务器实现中,Tomcat的IO监听由Endpoint完成,具体到BIO是JIoEndpoint,JIoEndpoint启动过程如下:

    1、根据IP地址(多IP的情况)及端口创建ServerSocket实例;JIoEndpoint类bind()方法部分代码

        if (serverSocket == null) {	//根据IP地址(多IP的情况)及端口创建ServerSocket实例
            try {
                if (getAddress() == null) {
                    serverSocket = serverSocketFactory.createSocket(getPort(),
                            getBacklog());
                } else {
                    serverSocket = serverSocketFactory.createSocket(getPort(),
                            getBacklog(), getAddress());
                }
            } catch (BindException orig) {
                String msg;
                if (getAddress() == null)
                    msg = orig.getMessage() + " <null>:" + getPort();
                else
                    msg = orig.getMessage() + " " +
                            getAddress().toString() + ":" + getPort();
                BindException be = new BindException(msg);
                be.initCause(orig);
                throw be;
            }
        }

     2、如果Connector没有配置共享线程池,创建请求处理线程池;JIoEndpoint类startInternal()方法部分代码

            // Create worker collection
            if (getExecutor() == null) {	//如果Connector没有配置共享线程池
                createExecutor();	//创建请求处理线程池
            }

     createExecutor()方法实现在JIoEndpoint父抽象类中

    public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }

     3、根据AcceptThreadCount配置的数量,创建并启动Accept线程,JIoEndpoint类startInternal()方法部分代码

startAcceptorThreads();	//根据AcceptorThreadCount配置的数量,创建并启动Acceptor线程

     startAcceptThread方法实现在父抽象类中:

/**
     * 根据AcceptorThreadCount配置的数量,创建并启动Acceptor线程
     */
    protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new Acceptor[count];

        for (int i = 0; i < count; i++) {
            acceptors[i] = createAcceptor();
            String threadName = getName() + "-Acceptor-" + i;
            acceptors[i].setThreadName(threadName);
            Thread t = new Thread(acceptors[i], threadName);	//线程单独启动,并未放到线程池中,因此不会影响请求并发处理
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }

     在该方法中,这些线程都是单独启动的,因此不会影响请求并发数量。createAcceptor获取JIoEndpoint类中的内部类Accept,该内部类继承JIoEndpoint父抽象类中的内部类Accept, Acceptor实现了Runnable接口,负责轮询接收客户端请求。JIoEndpoint类中的内部类Acceptor

/**
     * 负责轮询接收客户端请求, 还会检测EndPoint状态以及最大连接数
     * The background thread that listens for incoming TCP/IP connections and
     * hands them off to an appropriate processor.
     */
    protected class Acceptor extends AbstractEndpoint.Acceptor {

        @Override
        public void run() {

            int errorDelay = 0;

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    countUpOrAwaitConnection();

                    Socket socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        socket = serverSocketFactory.acceptSocket(serverSocket);	
                    } catch (IOException ioe) {
                        countDownConnection();
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (running && !paused && setSocketOptions(socket)) {	//当接收到客户端请求
                        // Hand this socket off to an appropriate processor
                        if (!processSocket(socket)) {	//
                            countDownConnection();
                            // Close socket right away
                            closeSocket(socket);
                        }
                    } else {
                        countDownConnection();
                        // Close socket right away
                        closeSocket(socket);
                    }
                } catch (IOException x) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), x);
                    }
                } catch (NullPointerException npe) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), npe);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }
    }

    4、当接收到客户端请求后,创建SocketProcessor对象,并提交给线程池处理。

    5、SocketProcessor并未直接处理Socket,而是将其交由具体的协议处理类,BIO方式下的HTTP协议使用HTTP11Processor。

    JIoEndpoint类中的内部类SocketProcessor

/**
     * This class is the equivalent of the Worker, but will simply use in an
     * external Executor thread pool.
     * SocketProcessor并未直接处理Socket,而是将其交由具体的协议处理类,如Http11Processor用于处理BIO方式下的HTTP协议
     */
    protected class SocketProcessor implements Runnable {

        protected SocketWrapper<Socket> socket = null;
        protected SocketStatus status = null;

        public SocketProcessor(SocketWrapper<Socket> socket) {
            if (socket==null) throw new NullPointerException();
            this.socket = socket;
        }

        public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) {
            this(socket);
            this.status = status;
        }

        @Override
        public void run() {
            boolean launch = false;
            synchronized (socket) {
                try {
                    SocketState state = SocketState.OPEN;
                    handler.beforeHandshake(socket);
                    try {
                        // SSL handshake
                        serverSocketFactory.handshake(socket.getSocket());
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString("endpoint.err.handshake"), t);
                        }
                        // Tell to close the socket
                        state = SocketState.CLOSED;
                    }

                    if ((state != SocketState.CLOSED)) {
                        if (status == null) {
                            state = handler.process(socket, SocketStatus.OPEN_READ);
                        } else {
                            state = handler.process(socket,status);
                        }
                    }
                    if (state == SocketState.CLOSED) {
                        // Close socket
                        if (log.isTraceEnabled()) {
                            log.trace("Closing socket:"+socket);
                        }
                        countDownConnection();
                        try {
                            socket.getSocket().close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    } else if (state == SocketState.OPEN ||
                            state == SocketState.UPGRADING  ||
                            state == SocketState.UPGRADED){
                        socket.setKeptAlive(true);
                        socket.access();
                        launch = true;
                    } else if (state == SocketState.LONG) {
                        socket.access();
                        waitingRequests.add(socket);
                    }
                } finally {
                    if (launch) {
                        try {
                            getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
                        } catch (RejectedExecutionException x) {
                            log.warn("Socket reprocessing request was rejected for:"+socket,x);
                            try {
                                //unable to handle connection at this time
                                handler.process(socket, SocketStatus.DISCONNECT);
                            } finally {
                                countDownConnection();
                            }


                        } catch (NullPointerException npe) {
                            if (running) {
                                log.error(sm.getString("endpoint.launch.fail"),
                                        npe);
                            }
                        }
                    }
                }
            }
            socket = null;
            // Finish up this request
        }

    }

 6、此外,JIoEndpoint还构造了一个单独的线程用于检测超时请求。JIoEndpoint类中的startInternal方法的部分代码

      // Start async timeout thread 用于检测超时请求
      setAsyncTimeout(new AsyncTimeout());
      Thread timeoutThread = new Thread(getAsyncTimeout(), getName() + "-AsyncTimeout");
      timeoutThread.setPriority(threadPriority);
      timeoutThread.setDaemon(true);
      timeoutThread.start();

 

 

 

  • 大小: 119.7 KB
1
0
分享到:
评论

相关推荐

    从连接器组件看Tomcat的线程模型——BIO模式(推荐)

    在Tomcat中,BIO模式下,`JIoEndpoint`组件扮演了核心角色,它启动监听特定端口的ServerSocket,一旦有新的连接请求,这个请求会被放入线程池进行处理。线程池中的线程会调用`Http11Processor`这个协议解析器,对...

    我的tomcat7源码手撕过程

    1. **Web应用加载**:当客户端首次访问Web应用时,Tomcat会加载该应用的`StandardContext`。 2. **Servlet加载**:`StandardContext`查找并加载该应用中声明的Servlet。 3. **初始化Servlet**:调用`Servlet`的`init...

    Tomcat7核心架构

    - **BIO(JIoEndpoint)**:传统的阻塞式I/O模型。 - **NIO(NioEndpoint)**:非阻塞式I/O模型。 - **APR(Apache Portable Runtime libraries)**:利用本地库提供的更高效的I/O操作。 #### 二、请求处理 请求处理是...

    j2ee+tomcat6.0核心api

    chm文件,方便查找,包含tomcat6.0核心类,如Connector,Lifecycle,http11Protocal,JIoEndPoint,javax包等。

    yj软件项目开发设计说明文档

    at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:624) at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:445)

    tomcat源码分析

    Apache Tomcat是Java开发人员最常用的Web服务器之一,它不仅能够作为独立的应用程序运行Servlet和JavaServer Pages(JSP),还可以作为一个内嵌的容器来运行Web应用。Tomcat的核心设计遵循了模块化原则,这使得其...

    struts2漏洞S2-0211

    - **创建Socket连接**:Tomcat通过调用`JIoEndpoint.java`的`run()`方法创建socket连接。 - **解析HTTP协议**:通过`processor.process(socket)`方法解析HTTP请求并返回响应内容。`processor`是`HttpProcessor`的一...

    commons-beanutils-1.7.0

    java.lang.SecurityException: class "org.apache.commons.collections.SequencedHashMap... at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:447) at java.lang.Thread.run(Unknown Source)

    解决struts2下载异常的jar包 struts2-sunspoter-stream-1.0.jar

    at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:489) at java.lang.Thread.run(Thread.java:662) 网络解决办法: (虽然该办法可行,但是本人并不提倡。具体原因在之后解释。) 在...

Global site tag (gtag.js) - Google Analytics