- 浏览: 981231 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Tomcat的Connector(Protocol,CoyoteAdapterAdapter,AprEndPoint)初始化及请求处理过程:
http://donald-draper.iteye.com/blog/2330139
上一篇文章中讲的是Connector(Protocol,CoyoteAdapterAdapter,AprEndPoint)初始化及请求处理过程,
其中EndPoint我们讲的是AprEndPoint,这一篇,我们就来谈谈JioEndPoint
从Http11Protocol我们可以看出EndPoint默认为JIoEndpoint
//DefaultServerSocketFactory
从bind方法可以看出实际为初始化serverSocketFactory及serverSocket;
再来看
先看createExecutor,此方法在AbstractEndPoint中
//创建线程执行器
//查看TaskQueue
//查看TaskThreadFactory
查看ThreadPoolExecutor
ThreadPoolExecutor继承了java.util.concurrent.ThreadPoolExecutor
回到JIoEndpoint的startInternal的initializeConnectionLatch方法
在AbstractEndPoint中
回到JIoEndpoint的startInternal的startAcceptorThreads方法
此方法在AbstractEndPoint中
//创建Socket请求接受处理线程Acceptor
查看JIoEndpoint的createAcceptor方法
这个Acceptor是,JIoEndpoint的内部类
//查看JIoEndpoint的 processSocket(Socket socket)方法
//SocketProcessor,JIOEndPoint内部类
//Socket状态不为null,且不处于关闭状态,处理Socket请求
从上一面一句找,handler
在Http11Protocol的构造方法中
p
,Handler为Http11ConnectionHandler。
查看Http11ConnectionHandler
查看Http11ConnectionHandler是Http11Protocol的内部类
而Http11ConnectionHandler没有process(socket, status),这可方法定已在Handler在,是一个抽象方法
查看AbstractConnectionHandler,为AbstractProtocol的内部类
查看Http11AprProcessor
无处理函数process(socket, status)
查看AbstractHttp11Processor
从AbstractHttp11Processor的process(SocketWrapper<S> socketWrapper)可以看出,
Http处理器,首先解析HTTP的头部协议,过滤器,以及编码问题,然后交给Adapter去处理
来看CoyoteAdapter
前文中我们分析过,每个connector关联一个Service,每个Service有一个Engine,每个Engine有多个Host,每个
Host有一个Pipeline(请求处理链),而Pipeline中是StandardWrappeValve链,StandardWrappeValve是处理
Request的Servlet包装类,那么,此时应该理解这句的含义了吧!
回到JioEndp的回到JIoEndpoint的startInternal的异步线程启动段
查看AsyncTimeout,Jio内部类
查看JIoEndpoint的processSocketAsync方法
总结:
Connetcor初始化,并初始化Http11Protocol,即初始化AbstractProtocol,而AbstractProtocol初始化,相应的初始化AbstractEndPoint初始化方法,AbstractEndPoint初始化方法调用bind方法,bind待子类扩展JioEndPoint,bind方法,主要初始化serverSocketFactory及serverSocket。Connetcor启动,并启动Http11Protocol,即启动AbstractProtocol,而AbstractProtocol启动start方法,启动AbstractEndPoint的start方法,start方法调用 startInternal方法, startInternal带子类扩展,JioEndPoint,startInternal的方法,首先出创建线程执行执行器,然后初始化Request共享锁,然后创建TCP/IP监听,接收线程Acceptor,而Acceptor实际上是JioEndPoint的一个
内部类,继承AbstractEndPoint的内部类Acceptor,最后创建异步TCP/IP监听,接收线程Acceptor。Acceptor接收到TCP/IP请求后,创建SocketProcessor线程处理SOCKET请求,SocketProcessor首先进行SSL handshake,然后,SocketProcessor将实际处理
委托给Http11ConnectionHandler,实际在AbstractConnectionHandler的process(SocketWrapper<S> wrapper,SocketStatus status)
将处理交给Http11Processor,Http11Processor继承AbstractHttp11Processor<Socket>,
AbstractHttp11Processor的process(SocketWrapper<S> socketWrapper)方法,首相处理请求的头部,然后通过调用CoyoteAdapter(Http11Protocol的Adapter)的Service方法,而CoyoteAdapter关联一个Connetcor,CoyoteAdapter的Service方法,通过反射调用Servlet的Service处理方法。
即connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
从前篇文章和这篇文章,我们看一看到JioEndPoint实际上是通过Acceptor来处理请求,及JavaSocket,基于java bio实现,特点是每建立一个连接分配一个线程,读数据阻塞。
而AprEndPoint是通过Poller处理请求,使用Apache Portable Runtime实现,
直接调用native方法,有更高的效率,但是实现依赖具体平台
http://donald-draper.iteye.com/blog/2330139
上一篇文章中讲的是Connector(Protocol,CoyoteAdapterAdapter,AprEndPoint)初始化及请求处理过程,
其中EndPoint我们讲的是AprEndPoint,这一篇,我们就来谈谈JioEndPoint
public class Http11Protocol extends AbstractHttp11JsseProtocol<Socket> { protected Http11ConnectionHandler cHandler; private int disableKeepAlivePercentage = 75; public Http11Protocol() { //新建请求处理EndPoint为JIoEndpoint endpoint = new JIoEndpoint(); cHandler = new Http11ConnectionHandler(this); //设置endpoint的handler ((JIoEndpoint) endpoint).setHandler(cHandler); setSoLinger(Constants.DEFAULT_CONNECTION_LINGER); setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT); setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY); } }
从Http11Protocol我们可以看出EndPoint默认为JIoEndpoint
/** * Handle incoming TCP connections. * * This class implement a simple server model: one listener thread accepts on a socket and * creates a new worker thread for each incoming connection. * More advanced Endpoints will reuse the threads, use queues, etc. */ public class JIoEndpoint extends AbstractEndpoint<Socket> { /** * Associated server socket. */ protected ServerSocket serverSocket = null; public JIoEndpoint() { // Set maxConnections to zero so we can tell if the user has specified // their own value on the connector when we reach bind() setMaxConnections(0); // Reduce the executor timeout for BIO as threads in keep-alive will not // terminate when the executor interrupts them. setExecutorTerminationTimeoutMillis(0); } // ------------------------------------------------------------- Properties /** * Handling of accepted sockets. */ //实际为Http11ConnectionHandler protected Handler handler = null; /** * Server socket factory. */ protected ServerSocketFactory serverSocketFactory = null; @Override //初始化serverSocketFactory及serverSocket public void bind() throws Exception { // Initialize thread count defaults for acceptor if (acceptorThreadCount == 0) { acceptorThreadCount = 1; } // Initialize maxConnections if (getMaxConnections() == 0) { // User hasn't set a value - use the default setMaxConnections(getMaxThreadsExecutor(true)); } if (serverSocketFactory == null) { if (isSSLEnabled()) { serverSocketFactory = handler.getSslImplementation().getServerSocketFactory(this); } else { //创建serverSocketFactory serverSocketFactory = new DefaultServerSocketFactory(this); } } if (serverSocket == null) { try { if (getAddress() == null) { serverSocket = serverSocketFactory.createSocket(getPort(), getBacklog()); } else { //serverSocketFactory根据ip地址和Port创建serverSocket serverSocket = serverSocketFactory.createSocket(getPort(), getBacklog(), getAddress()); } } catch (BindException orig) { String msg; if (getAddress() == null) msg = orig.getMessage() + " <null>:" + getPort(); else msg = orig.getMessage() + " " + getAddress().toString() + ":" + getPort(); BindException be = new BindException(msg); be.initCause(orig); throw be; } } } }
//DefaultServerSocketFactory
public class DefaultServerSocketFactory implements ServerSocketFactory { /** * * @param endpoint Unused in this implementation. */ public DefaultServerSocketFactory(AbstractEndpoint<?> endpoint) { } @Override public ServerSocket createSocket (int port) throws IOException { return new ServerSocket (port); } @Override public ServerSocket createSocket (int port, int backlog) throws IOException { return new ServerSocket (port, backlog); } @Override //serverSocketFactory根据ip地址和Port创建serverSocket public ServerSocket createSocket (int port, int backlog, InetAddress ifAddress) throws IOException { return new ServerSocket (port, backlog, ifAddress); } @Override public Socket acceptSocket(ServerSocket socket) throws IOException { return socket.accept(); } @Override public void handshake(Socket sock) throws IOException { // NOOP } }
从bind方法可以看出实际为初始化serverSocketFactory及serverSocket;
再来看
public class JIoEndpoint extends AbstractEndpoint<Socket> { @Override public void startInternal() throws Exception { if (!running) { running = true; paused = false; // Create worker collection if (getExecutor() == null) { //线程执行器创建 createExecutor(); } //初始化连接共享锁 initializeConnectionLatch(); startAcceptorThreads(); // Start async timeout thread Thread timeoutThread = new Thread(new AsyncTimeout(), getName() + "-AsyncTimeout"); timeoutThread.setPriority(threadPriority); timeoutThread.setDaemon(true); timeoutThread.start(); } } }
先看createExecutor,此方法在AbstractEndPoint中
//创建线程执行器
public void createExecutor() { internalExecutor = true; //新建任务队列 TaskQueue taskqueue = new TaskQueue(); //新建任务线程工程 TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); //初始化线程执行器 executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); }
//查看TaskQueue
/** * As task queue specifically designed to run with a thread pool executor. * The task queue is optimised to properly utilize threads within * a thread pool executor. If you use a normal queue, the executor will spawn threads * when there are idle threads and you wont be able to force items unto the queue itself * */ TaskQueue继承了LinkedBlockingQueue public class TaskQueue extends LinkedBlockingQueue<Runnable> { private ThreadPoolExecutor parent = null; // no need to be volatile, the one times when we change and read it occur in // a single thread (the one that did stop a context and fired listeners) private Integer forcedRemainingCapacity = null; @Override //如果可能的话,将Runnable加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。 public boolean offer(Runnable o) { //we can't do any checks if (parent==null) return super.offer(o); //we are maxed out on threads, simply queue the object if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); //we have idle threads, just add it to the queue if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o); //if we have less threads than maximum force creation of a new thread if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; //if we reached here, we need to add it to the queue return super.offer(o); } @Override //获取并移除此队列的头Runnable,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { Runnable runnable = super.poll(timeout, unit); if (runnable == null && parent != null) { // the poll timed out, it gives an opportunity to stop the current // thread if needed to avoid memory leaks. parent.stopCurrentThreadIfNeeded(); } return runnable; } @Override //获取BlockingQueue里排在首位的对象Runnable,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止 public Runnable take() throws InterruptedException { if (parent != null && parent.currentThreadShouldBeStopped()) { return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); // yes, this may return null (in case of timeout) which normally // does not occur with take() // but the ThreadPoolExecutor implementation allows this } return super.take(); } }
//查看TaskThreadFactory
/** * Simple task thread factory to use to create threads for an executor implementation. */ public class TaskThreadFactory implements ThreadFactory { private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; private final boolean daemon; private final int threadPriority; public TaskThreadFactory(String namePrefix, boolean daemon, int priority) { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); this.namePrefix = namePrefix; this.daemon = daemon; this.threadPriority = priority; } @Override public Thread newThread(Runnable r) { TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement()); t.setDaemon(daemon); t.setPriority(threadPriority); return t; } }
查看ThreadPoolExecutor
ThreadPoolExecutor继承了java.util.concurrent.ThreadPoolExecutor
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {}
回到JIoEndpoint的startInternal的initializeConnectionLatch方法
在AbstractEndPoint中
protected LimitLatch initializeConnectionLatch() { if (maxConnections==-1) return null; if (connectionLimitLatch==null) { connectionLimitLatch = new LimitLatch(getMaxConnections()); } return connectionLimitLatch; } //LimitLatch /** * Shared latch that allows the latch to be acquired a limited number of times * after which all subsequent requests to acquire the latch will be placed in a * FIFO queue until one of the shares is returned. */ public class LimitLatch { private final Sync sync; private final AtomicLong count; private volatile long limit; private volatile boolean released = false; /** * Instantiates a LimitLatch object with an initial limit. * @param limit - maximum number of concurrent acquisitions of this latch */ public LimitLatch(long limit) { this.limit = limit; this.count = new AtomicLong(0); this.sync = new Sync(); } //获取共享锁 @Override protected int tryAcquireShared(int ignored) { long newCount = count.incrementAndGet(); if (!released && newCount > limit) { // Limit exceeded count.decrementAndGet(); return -1; } else { return 1; } } //释放共享锁 @Override protected boolean tryReleaseShared(int arg) { count.decrementAndGet(); return true; } }
回到JIoEndpoint的startInternal的startAcceptorThreads方法
此方法在AbstractEndPoint中
//创建Socket请求接受处理线程Acceptor
protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); String threadName = getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } } /** * Hook to allow Endpoints to provide a specific Acceptor implementation. */ //带子类扩展 protected abstract Acceptor createAcceptor();
查看JIoEndpoint的createAcceptor方法
@Override protected AbstractEndpoint.Acceptor createAcceptor() { return new Acceptor(); }
这个Acceptor是,JIoEndpoint的内部类
// --------------------------------------------------- Acceptor Inner Class /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ //监听TCP/IP,并交给相应的processor protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait countUpOrAwaitConnection(); Socket socket = null; try { // Accept the next incoming connection from the server // socket //接收TCP/IP请求 socket = serverSocketFactory.acceptSocket(serverSocket); // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (running && !paused && setSocketOptions(socket)) { // Hand this socket off to an appropriate processor //处理SOCKET的请求 if (!processSocket(socket)) { countDownConnection(); // Close socket right away closeSocket(socket); } } else { countDownConnection(); // Close socket right away closeSocket(socket); } } state = AcceptorState.ENDED; } }
//查看JIoEndpoint的 processSocket(Socket socket)方法
/** * Process a new connection from a new client. Wraps the socket so * keep-alive and other attributes can be tracked and then passes the socket * to the executor for processing. * * @param socket The socket associated with the client. * * @return <code>true</code> if the socket is passed to the * executor, <code>false</code> if something went wrong or * if the endpoint is shutting down. Returning * <code>false</code> is an indication to close the socket * immediately. */ protected boolean processSocket(Socket socket) { // Process the request from this socket try { SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket); wrapper.setKeepAliveLeft(getMaxKeepAliveRequests()); wrapper.setSecure(isSSLEnabled()); // During shutdown, executor may be null - avoid NPE if (!running) { return false; } //新建SOCKET处理线程 getExecutor().execute(new SocketProcessor(wrapper)); } return true; }
//SocketProcessor,JIOEndPoint内部类
/** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ protected class SocketProcessor implements Runnable { protected SocketWrapper<Socket> socket = null; protected SocketStatus status = null; public SocketProcessor(SocketWrapper<Socket> socket) { if (socket==null) throw new NullPointerException(); this.socket = socket; } public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) { this(socket); this.status = status; } @Override public void run() { boolean launch = false; synchronized (socket) { try { SocketState state = SocketState.OPEN; try { // SSL handshake //SSL捂手,SSL上下文初始化在AbstractEndPoint.init方法中 serverSocketFactory.handshake(socket.getSocket()); } if ((state != SocketState.CLOSED)) { if (status == null) { state = handler.process(socket, SocketStatus.OPEN_READ); } else { //Socket状态不为null,且不处于关闭状态,处理Socket请求 state = handler.process(socket,status); } } if (state == SocketState.CLOSED) { // Close socket if (log.isTraceEnabled()) { log.trace("Closing socket:"+socket); } countDownConnection(); try { socket.getSocket().close(); } catch (IOException e) { // Ignore } } else if (state == SocketState.OPEN || state == SocketState.UPGRADING || state == SocketState.UPGRADING_TOMCAT || state == SocketState.UPGRADED){ socket.setKeptAlive(true); socket.access(); launch = true; } else if (state == SocketState.LONG) { socket.access(); waitingRequests.add(socket); } } finally { if (launch) { try { getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ)); } catch (RejectedExecutionException x) { log.warn("Socket reprocessing request was rejected for:"+socket,x); try { //unable to handle connection at this time handler.process(socket, SocketStatus.DISCONNECT); } finally { countDownConnection(); } } } socket = null; // Finish up this request } }
//Socket状态不为null,且不处于关闭状态,处理Socket请求
state = handler.process(socket,status);
从上一面一句找,handler
在Http11Protocol的构造方法中
p
public Http11Protocol() { //新建请求处理EndPoint为JIoEndpoint endpoint = new JIoEndpoint(); cHandler = new Http11ConnectionHandler(this); //设置endpoint的handler ((JIoEndpoint) endpoint).setHandler(cHandler); setSoLinger(Constants.DEFAULT_CONNECTION_LINGER); setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT); setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY); }
,Handler为Http11ConnectionHandler。
查看Http11ConnectionHandler
查看Http11ConnectionHandler是Http11Protocol的内部类
// ----------------------------------- Http11ConnectionHandler Inner Class protected static class Http11ConnectionHandler extends AbstractConnectionHandler<Socket, Http11Processor> implements Handler { }
而Http11ConnectionHandler没有process(socket, status),这可方法定已在Handler在,是一个抽象方法
查看AbstractConnectionHandler,为AbstractProtocol的内部类
// ------------------------------------------- Connection handler base class protected abstract static class AbstractConnectionHandler<S,P extends Processor<S>> implements AbstractEndpoint.Handler { //每一个Socket,关联一个Processor protected final Map<S,Processor<S>> connections = new ConcurrentHashMap<S,Processor<S>>(); @SuppressWarnings("deprecation") // Old HTTP upgrade method has been deprecated //处理HTTP请求 public SocketState process(SocketWrapper<S> wrapper,SocketStatus status) { if (wrapper == null) { // Nothing to do. Socket has been closed. return SocketState.CLOSED; } S socket = wrapper.getSocket(); if (socket == null) { // Nothing to do. Socket has been closed. return SocketState.CLOSED; } Processor<S> processor = connections.get(socket); if (status == SocketStatus.DISCONNECT && processor == null) { // Nothing to do. Endpoint requested a close and there is no // longer a processor associated with this socket. return SocketState.CLOSED; } wrapper.setAsync(false); ContainerThreadMarker.markAsContainerThread(); try { if (processor == null) { processor = recycledProcessors.poll(); } if (processor == null) { //创建处理器 processor = createProcessor(); } //初始化SSL,在子类中扩展 initSsl(wrapper, processor); SocketState state = SocketState.CLOSED; do { if (status == SocketStatus.CLOSE_NOW) { processor.errorDispatch(); state = SocketState.CLOSED; } else if (status == SocketStatus.DISCONNECT && !processor.isComet()) { // Do nothing here, just wait for it to get recycled // Don't do this for Comet we need to generate an end // event (see BZ 54022) } else if (processor.isAsync() || state == SocketState.ASYNC_END) { state = processor.asyncDispatch(status); if (state == SocketState.OPEN) { // release() won't get called so in case this request // takes a long time to process, remove the socket from // the waiting requests now else the async timeout will // fire getProtocol().endpoint.removeWaitingRequest(wrapper); // There may be pipe-lined data to read. If the data // isn't processed now, execution will exit this // loop and call release() which will recycle the // processor (and input buffer) deleting any // pipe-lined data. To avoid this, process it now. state = processor.process(wrapper); } } else if (processor.isComet()) { state = processor.event(status); } { state = processor.process(wrapper); } } //带子类扩展 protected abstract P createProcessor(); protected abstract void initSsl(SocketWrapper<S> socket, Processor<S> processor); protected abstract void longPoll(SocketWrapper<S> socket, Processor<S> processor); protected abstract void release(SocketWrapper<S> socket, Processor<S> processor, boolean socketClosing, boolean addToPoller); } //查看Http11ConnectionHandler,createProcessor @Override protected Http11AprProcessor createProcessor() { Http11AprProcessor processor = new Http11AprProcessor( proto.getMaxHttpHeaderSize(), (AprEndpoint)proto.endpoint, proto.getMaxTrailerSize(), proto.getAllowedTrailerHeadersAsSet(), proto.getMaxExtensionSize(), proto.getMaxSwallowSize()); //设置处理器适配器,在connector构造函数中, //adapter = new CoyoteAdapter(this); //protocolHandler.setAdapter(adapter); //有这么两句 processor.setAdapter(proto.adapter); processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests()); processor.setKeepAliveTimeout(proto.getKeepAliveTimeout()); processor.setConnectionUploadTimeout( proto.getConnectionUploadTimeout()); processor.setDisableUploadTimeout(proto.getDisableUploadTimeout()); processor.setCompressionMinSize(proto.getCompressionMinSize()); processor.setCompression(proto.getCompression()); processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents()); processor.setCompressableMimeTypes(proto.getCompressableMimeTypes()); processor.setRestrictedUserAgents(proto.getRestrictedUserAgents()); processor.setSocketBuffer(proto.getSocketBuffer()); processor.setMaxSavePostSize(proto.getMaxSavePostSize()); processor.setServer(proto.getServer()); processor.setClientCertProvider(proto.getClientCertProvider()); register(processor); return processor; }
查看Http11AprProcessor
public class Http11AprProcessor extends AbstractHttp11Processor<Long> {}
无处理函数process(socket, status)
查看AbstractHttp11Processor
public abstract class AbstractHttp11Processor<S> extends AbstractProcessor<S> { /** * Process pipelined HTTP requests using the specified input and output * streams. * @param socketWrapper Socket from which the HTTP requests will be read * and the HTTP responses will be written. * @throws IOException error during an I/O operation */ //处理HTTP请求 @Override public SocketState process(SocketWrapper<S> socketWrapper) throws IOException { RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); // Setting up the I/O setSocketWrapper(socketWrapper); getInputBuffer().init(socketWrapper, endpoint); getOutputBuffer().init(socketWrapper, endpoint); while (!getErrorState().isError() && keepAlive && !comet && !isAsync() && upgradeInbound == null && httpUpgradeHandler == null && !endpoint.isPaused()) { // Parsing the request header try { setRequestLineReadTimeout(); //503 if (endpoint.isPaused()) { // 503 - Service unavailable response.setStatus(503); setErrorState(ErrorState.CLOSE_CLEAN, null); } else { keptAlive = true; // Set this every time in case limit has been changed via JMX request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount()); // Currently only NIO will ever return false here if (!getInputBuffer().parseHeaders()) { // We've read part of the request, don't recycle it // instead associate it with the socket openSocket = true; readComplete = false; break; } if (!disableUploadTimeout) { setSocketTimeout(connectionUploadTimeout); } } catch (Throwable t) { // 400 - Bad Request response.setStatus(400); setErrorState(ErrorState.CLOSE_CLEAN, t); getAdapter().log(request, response, 0); } if (!getErrorState().isError()) { // Setting up filters, and parse some request headers rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE); try { //请求预处理 prepareRequest(); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // 500 - Internal Server Error response.setStatus(500); setErrorState(ErrorState.CLOSE_CLEAN, t); getAdapter().log(request, response, 0); } } //503,400,500,看到这些数字想到了什么? // Process the request in the adapter //适配器,处理request, response if (!getErrorState().isError()) { try { rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); //这个就是关键了,service函数,这个adapter,上面已经讲过 //实际上是CoyoteAdapter adapter.service(request, response); } /** * After reading the request headers, we have to setup the request filters. */ protected void prepareRequest() { MimeHeaders headers = request.getMimeHeaders(); // Check for a full URI (including protocol://host:port/) ByteChunk uriBC = request.requestURI().getByteChunk(); if (uriBC.startsWithIgnoreCase("http", 0)) { int pos = uriBC.indexOf("://", 0, 3, 4); int uriBCStart = uriBC.getStart(); int slashPos = -1; if (pos != -1) { byte[] uriB = uriBC.getBytes(); slashPos = uriBC.indexOf('/', pos + 3); if (slashPos == -1) { slashPos = uriBC.getLength(); // Set URI as "/" request.requestURI().setBytes (uriB, uriBCStart + pos + 1, 1); } else { request.requestURI().setBytes (uriB, uriBCStart + slashPos, uriBC.getLength() - slashPos); } MessageBytes hostMB = headers.setValue("host"); hostMB.setBytes(uriB, uriBCStart + pos + 3, slashPos - pos - 3); } } } }
从AbstractHttp11Processor的process(SocketWrapper<S> socketWrapper)可以看出,
Http处理器,首先解析HTTP的头部协议,过滤器,以及编码问题,然后交给Adapter去处理
来看CoyoteAdapter
public class CoyoteAdapter implements Adapter { /** * Construct a new CoyoteProcessor associated with the specified connector. * * @param connector CoyoteConnector that owns this processor */ //一个Adapter关联一个Connector public CoyoteAdapter(Connector connector) { super(); this.connector = connector; } private Connector connector = null; /** * Encoder for the Location URL in HTTP redirects. */ protected static URLEncoder urlEncoder; //url安全字符集 static { urlEncoder = new URLEncoder(); urlEncoder.addSafeCharacter('-'); urlEncoder.addSafeCharacter('_'); urlEncoder.addSafeCharacter('.'); urlEncoder.addSafeCharacter('*'); urlEncoder.addSafeCharacter('/'); } ** * Service method. */ @Override public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) throws Exception { Request request = (Request) req.getNote(ADAPTER_NOTES); Response response = (Response) res.getNote(ADAPTER_NOTES); try { // Parse and set Catalina and configuration specific // request parameters req.getRequestProcessor().setWorkerThreadName(Thread.currentThread().getName()); postParseSuccess = postParseRequest(req, request, res, response); if (postParseSuccess) { //check valves if we support async request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported()); // Calling the container //这个就是我们一直想要找的,Servlet处理request connector.getService().getContainer().getPipeline().getFirst().invoke(request, response); } }
前文中我们分析过,每个connector关联一个Service,每个Service有一个Engine,每个Engine有多个Host,每个
Host有一个Pipeline(请求处理链),而Pipeline中是StandardWrappeValve链,StandardWrappeValve是处理
Request的Servlet包装类,那么,此时应该理解这句的含义了吧!
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
回到JioEndp的回到JIoEndpoint的startInternal的异步线程启动段
// Start async timeout thread Thread timeoutThread = new Thread(new AsyncTimeout(), getName() + "-AsyncTimeout"); timeoutThread.setPriority(threadPriority); timeoutThread.setDaemon(true); timeoutThread.start();
查看AsyncTimeout,Jio内部类
/** * Async timeout thread */ protected class AsyncTimeout implements Runnable { /** * The background thread that checks async requests and fires the * timeout if there has been no activity. */ @Override public void run() { // Loop until we receive a shutdown command while (running) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } long now = System.currentTimeMillis(); Iterator<SocketWrapper<Socket>> sockets = waitingRequests.iterator(); while (sockets.hasNext()) { SocketWrapper<Socket> socket = sockets.next(); long access = socket.getLastAccess(); if (socket.getTimeout() > 0 && (now-access)>socket.getTimeout()) { // Prevent multiple timeouts socket.setTimeout(-1); //异步Socket处理 processSocketAsync(socket,SocketStatus.TIMEOUT); } } // Loop if endpoint is paused while (paused && running) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } } } }
查看JIoEndpoint的processSocketAsync方法
/** * Process an existing async connection. If processing is required, passes * the wrapped socket to an executor for processing. * * @param socket The socket associated with the client. * @param status Only OPEN and TIMEOUT are used. The others are used for * Comet requests that are not supported by the BIO (JIO) * Connector. */ @Override public void processSocketAsync(SocketWrapper<Socket> socket, SocketStatus status) { try { synchronized (socket) { //等待TIMEOUT if (waitingRequests.remove(socket)) { SocketProcessor proc = new SocketProcessor(socket,status); ClassLoader loader = Thread.currentThread().getContextClassLoader(); try { //threads should not be created by the webapp classloader if (Constants.IS_SECURITY_ENABLED) { PrivilegedAction<Void> pa = new PrivilegedSetTccl( getClass().getClassLoader()); AccessController.doPrivileged(pa); } else { Thread.currentThread().setContextClassLoader( getClass().getClassLoader()); } // During shutdown, executor may be null - avoid NPE if (!running) { return; } getExecutor().execute(proc); //TODO gotta catch RejectedExecutionException and properly handle it } finally { if (Constants.IS_SECURITY_ENABLED) { PrivilegedAction<Void> pa = new PrivilegedSetTccl(loader); AccessController.doPrivileged(pa); } else { Thread.currentThread().setContextClassLoader(loader); } } } } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); } } //waitingRequests protected ConcurrentLinkedQueue<SocketWrapper<Socket>> waitingRequests = new ConcurrentLinkedQueue<SocketWrapper<Socket>>();
总结:
Connetcor初始化,并初始化Http11Protocol,即初始化AbstractProtocol,而AbstractProtocol初始化,相应的初始化AbstractEndPoint初始化方法,AbstractEndPoint初始化方法调用bind方法,bind待子类扩展JioEndPoint,bind方法,主要初始化serverSocketFactory及serverSocket。Connetcor启动,并启动Http11Protocol,即启动AbstractProtocol,而AbstractProtocol启动start方法,启动AbstractEndPoint的start方法,start方法调用 startInternal方法, startInternal带子类扩展,JioEndPoint,startInternal的方法,首先出创建线程执行执行器,然后初始化Request共享锁,然后创建TCP/IP监听,接收线程Acceptor,而Acceptor实际上是JioEndPoint的一个
内部类,继承AbstractEndPoint的内部类Acceptor,最后创建异步TCP/IP监听,接收线程Acceptor。Acceptor接收到TCP/IP请求后,创建SocketProcessor线程处理SOCKET请求,SocketProcessor首先进行SSL handshake,然后,SocketProcessor将实际处理
委托给Http11ConnectionHandler,实际在AbstractConnectionHandler的process(SocketWrapper<S> wrapper,SocketStatus status)
将处理交给Http11Processor,Http11Processor继承AbstractHttp11Processor<Socket>,
AbstractHttp11Processor的process(SocketWrapper<S> socketWrapper)方法,首相处理请求的头部,然后通过调用CoyoteAdapter(Http11Protocol的Adapter)的Service方法,而CoyoteAdapter关联一个Connetcor,CoyoteAdapter的Service方法,通过反射调用Servlet的Service处理方法。
即connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
从前篇文章和这篇文章,我们看一看到JioEndPoint实际上是通过Acceptor来处理请求,及JavaSocket,基于java bio实现,特点是每建立一个连接分配一个线程,读数据阻塞。
而AprEndPoint是通过Poller处理请求,使用Apache Portable Runtime实现,
直接调用native方法,有更高的效率,但是实现依赖具体平台
发表评论
-
tomcat日志及log4j日志乱码
2017-04-26 15:00 5500Linux系统字符编码:http://www.cnblogs. ... -
SQL盲注的解决方案
2016-10-27 18:35 2570引起SQL盲注的原因:http://www-01.ibm.co ... -
Tomcat的HTTPS配置详解
2016-10-27 16:59 972HTTPS原理详解:http://blog.csdn.net/ ... -
Tomcat的StandardService,Executor初始化与启动
2016-10-13 09:15 1610Tomcat的Server初始化及启动过程:http://do ... -
Tomcat的Connector(Protocol,CoyoteAdapterAdapter,AprEndPoint)初始化及请求处理过程
2016-10-12 16:46 1620Tomcat的Server初始化及启动过程:http://do ... -
Tomcat问题集
2016-10-09 17:50 809tcnative-1.dll: Can't load AMD ... -
Tomcat-ConfigContext监听器(Context.xml,web.xml)
2016-09-27 12:59 1984Tomcat的Host初始化(Context,Listener ... -
Tomcat的Host初始化(Context,Listener,Filter,Servlet)
2016-09-27 08:42 1499Tomcat的Engine初始化,启动过程:http://do ... -
Tomcat的Engine初始化,启动过程
2016-09-26 15:33 2301Tomcat的Server初始化及启动过程:http://do ... -
Tomcat的Server初始化及启动过程
2016-09-26 11:02 2939Tomcat7,启动过程(BootStrap、Catalina ... -
Tomcat7,启动过程(BootStrap、Catalina)
2016-09-23 19:08 1862Tomcat 系统架构与设计模式,工作原理:http://ww ... -
405 request method post not supported
2016-08-18 18:09 32301.检查链接地址有没有错误 2.看RequestMethod, ... -
apache-tomcat-7.0.67\bin\tcnative-1.dll: Can't load AMD 64-bit .dll on a IA 3
2016-08-12 13:02 1125F:\apache-tomcat-7.0.67\bin\tcn ...
相关推荐
#### Tomcat处理一次请求过程分析 当客户端向Tomcat发送HTTP请求时,Tomcat会通过以下步骤处理请求: 1. **连接器接收请求**:`Connector`组件负责监听客户端请求,并将请求传递给适配器。 2. **协议适配器处理...
请求处理是Tomcat的核心功能之一,涉及到HTTP请求的接收、解析以及最终的Servlet执行。 - **请求处理流程**: - 接收客户端发送的HTTP请求。 - 解析HTTP请求。 - 根据URL映射到相应的Servlet。 - 执行Servlet...
每个请求都会分配一个线程进行处理,这意味着在高并发场景下,如果所有线程都在处理请求,新的连接可能会因为没有可用线程而被阻塞,这也是BIO模式的主要缺点之一。 在Spring Boot中,如果你想研究BIO模式,需要对...
chm文件,方便查找,包含tomcat6.0核心类,如Connector,Lifecycle,http11Protocal,JIoEndPoint,javax包等。
##### 3.1 Tomcat处理HTTP请求 - **创建Socket连接**:Tomcat通过调用`JIoEndpoint.java`的`run()`方法创建socket连接。 - **解析HTTP协议**:通过`processor.process(socket)`方法解析HTTP请求并返回响应内容。`...
- **ProtocolHandler**:`Http11Protocol`是处理HTTP/1.1协议的协议处理器,它通过`endpoint`(如`JioEndpoint`)与客户端建立连接。 - **Endpoint**:`Http11ConnectionHandler`是`endpoint`内部的一个处理连接的类...
at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:624) at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:445)
java.lang.SecurityException: class "org.apache.commons.collections.SequencedHashMap... at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:447) at java.lang.Thread.run(Unknown Source)
at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:489) at java.lang.Thread.run(Thread.java:662) 网络解决办法: (虽然该办法可行,但是本人并不提倡。具体原因在之后解释。) 在...