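This post does not describe the two kinds of I/O (BIO and NIO) themselves, or how they apply to sockets in general. It looks at how Tomcat uses them, comparing two versions: Tomcat 4 and Tomcat 6.
1. Tomcat 4
Tomcat 4 uses only BIO. Start with this class: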
public final class HttpConnector
implements Connector, Lifecycle, Runnable
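It implements the Runnable and Lifecycle interfaces. In Tomcat, every class that implements Lifecycle must provide methods such as start and stop. By convention, start is called explicitly after the class is instantiated, and the corresponding lifecycle events are fired on startup and shutdown; that is not covered further here.
After HttpConnector is instantiated, its start method is called: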
private Stack processors = new Stack();
public void start() throws LifecycleException {
// Validate and update our current state
if (started)
throw new LifecycleException
(sm.getString("httpConnector.alreadyStarted"));
threadName = "HttpConnector[" + port + "]";
lifecycle.fireLifecycleEvent(START_EVENT, null);
started = true;
// Start our background thread
threadStart();
// Create the specified minimum number of processors
while (curProcessors < minProcessors) {
if ((maxProcessors > 0) && (curProcessors >= maxProcessors))
break;
HttpProcessor processor = newProcessor();
recycle(processor);
}
}
private void threadStart() {
log(sm.getString("httpConnector.starting"));
thread = new Thread(this, threadName);
thread.setDaemon(true);
thread.start();
}
public void run() {
// Loop until we receive a shutdown command
while (!stopped) {
// Accept the next incoming connection from the server socket
Socket socket = null;
try {
// if (debug >= 3)
// log("run: Waiting on serverSocket.accept()");
socket = serverSocket.accept();
// if (debug >= 3)
// log("run: Returned from serverSocket.accept()");
if (connectionTimeout > 0)
socket.setSoTimeout(connectionTimeout);
socket.setTcpNoDelay(tcpNoDelay);
} catch (AccessControlException ace) {
log("socket accept security exception", ace);
continue;
} catch (IOException e) {
// if (debug >= 3)
// log("run: Accept returned IOException", e);
try {
// If reopening fails, exit
synchronized (threadSync) {
if (started && !stopped)
log("accept error: ", e);
if (!stopped) {
// if (debug >= 3)
// log("run: Closing server socket");
serverSocket.close();
// if (debug >= 3)
// log("run: Reopening server socket");
serverSocket = open();
}
}
// if (debug >= 3)
// log("run: IOException processing completed");
} catch (IOException ioe) {
log("socket reopen, io problem: ", ioe);
break;
} catch (KeyStoreException kse) {
log("socket reopen, keystore problem: ", kse);
break;
} catch (NoSuchAlgorithmException nsae) {
log("socket reopen, keystore algorithm problem: ", nsae);
break;
} catch (CertificateException ce) {
log("socket reopen, certificate problem: ", ce);
break;
} catch (UnrecoverableKeyException uke) {
log("socket reopen, unrecoverable key: ", uke);
break;
} catch (KeyManagementException kme) {
log("socket reopen, key management problem: ", kme);
break;
}
continue;
}
// Hand this socket off to an appropriate processor
HttpProcessor processor = createProcessor();
if (processor == null) {
try {
log(sm.getString("httpConnector.noProcessor"));
socket.close();
} catch (IOException e) {
;
}
continue;
}
// if (debug >= 3)
// log("run: Assigning socket to processor " + processor);
processor.assign(socket);
// The processor will recycle itself when it finishes
}
// Notify the threadStop() method that we have shut ourselves down
// if (debug >= 3)
// log("run: Notifying threadStop() that we have shut down");
synchronized (threadSync) {
threadSync.notifyAll();
}
}
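Calling start launches the background thread. The ServerSocket has already been created by then, and if it fails with an I/O error it is closed and reopened. Inside run, a while loop keeps calling the ServerSocket's accept method to take the next socket from the connection queue. The server socket itself is created by open: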
private ServerSocket open()
throws IOException, KeyStoreException, NoSuchAlgorithmException,
CertificateException, UnrecoverableKeyException,
KeyManagementException
{
// Acquire the server socket factory for this Connector
ServerSocketFactory factory = getFactory();
// If no address is specified, open a connection on all addresses
if (address == null) {
log(sm.getString("httpConnector.allAddresses"));
try {
return (factory.createSocket(port, acceptCount));
} catch (BindException be) {
throw new BindException(be.getMessage() + ":" + port);
}
}
// Open a server socket on the specified address
try {
InetAddress is = InetAddress.getByName(address);
log(sm.getString("httpConnector.anAddress", address));
try {
return (factory.createSocket(port, acceptCount, is));
} catch (BindException be) {
throw new BindException(be.getMessage() + ":" + address +
":" + port);
}
} catch (Exception e) {
log(sm.getString("httpConnector.noAddress", address));
try {
return (factory.createSocket(port, acceptCount));
} catch (BindException be) {
throw new BindException(be.getMessage() + ":" + port);
}
}
}
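The accept loop above can be reduced to a few lines. The following is a minimal sketch, not Tomcat's code: the class name BlockingAcceptLoop, the one-byte echo handler, and the stop method are assumptions made for illustration. It handles each socket on the accepting thread, which is exactly the limitation HttpConnector avoids by handing sockets to processors.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical illustration of a blocking accept loop; not part of Tomcat.
class BlockingAcceptLoop implements Runnable {
    private final ServerSocket serverSocket;
    private volatile boolean stopped = false;

    BlockingAcceptLoop(int port, int backlog) throws IOException {
        // backlog plays the role of acceptCount in HttpConnector.open()
        this.serverSocket = new ServerSocket(port, backlog);
    }

    int getLocalPort() {
        return serverSocket.getLocalPort();
    }

    @Override
    public void run() {
        while (!stopped) {
            // accept() blocks until a client connects
            try (Socket socket = serverSocket.accept()) {
                socket.setTcpNoDelay(true);
                handle(socket);
            } catch (IOException e) {
                // accept() also throws once stop() closes the server socket
                if (!stopped) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Echoes a single byte; read() blocks until the client sends data.
    private void handle(Socket socket) throws IOException {
        InputStream in = socket.getInputStream();
        OutputStream out = socket.getOutputStream();
        int b = in.read();
        if (b >= 0) {
            out.write(b);
            out.flush();
        }
    }

    void stop() throws IOException {
        stopped = true;
        serverSocket.close();
    }
}
```

While handle is running, no other connection can be accepted, so a slow client stalls everyone behind it.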
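Each accepted socket is handed to a separate thread for processing, so several threads can serve several requests at once instead of one thread serving a single request while the others wait. HttpConnector reuses threads through a pool: a Stack holds a number of HttpProcessor instances, and one is popped whenever a request needs handling.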
private HttpProcessor createProcessor() {
synchronized (processors) {
if (processors.size() > 0) {
// if (debug >= 2)
// log("createProcessor: Reusing existing processor");
return ((HttpProcessor) processors.pop());
}
if ((maxProcessors > 0) && (curProcessors < maxProcessors)) {
// if (debug >= 2)
// log("createProcessor: Creating new processor");
return (newProcessor());
} else {
if (maxProcessors < 0) {
// if (debug >= 2)
// log("createProcessor: Creating new processor");
return (newProcessor());
} else {
// if (debug >= 2)
// log("createProcessor: Cannot create new processor");
return (null);
}
}
}
}
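The pooling rules in createProcessor and start can be restated compactly. This is a sketch under assumptions, not the Tomcat source: ProcessorPool, Processor, and acquire are hypothetical names, and ArrayDeque stands in for java.util.Stack. As in the listing, a negative maxProcessors means "no limit".

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stack-based pool modelled on HttpConnector's processors stack.
class ProcessorPool {
    // Stand-in for HttpProcessor; only an id, for illustration.
    static final class Processor {
        final int id;
        Processor(int id) { this.id = id; }
    }

    private final Deque<Processor> idle = new ArrayDeque<>();
    private final int maxProcessors;   // negative means unlimited
    private int curProcessors = 0;

    ProcessorPool(int minProcessors, int maxProcessors) {
        this.maxProcessors = maxProcessors;
        // Pre-create the minimum number of processors, as start() does
        while (curProcessors < minProcessors
                && (maxProcessors <= 0 || curProcessors < maxProcessors)) {
            idle.push(newProcessor());
        }
    }

    private Processor newProcessor() {
        return new Processor(curProcessors++);
    }

    // Reuse an idle processor, create one if allowed, otherwise return null.
    synchronized Processor acquire() {
        if (!idle.isEmpty()) {
            return idle.pop();
        }
        if (maxProcessors < 0 || curProcessors < maxProcessors) {
            return newProcessor();
        }
        return null;
    }

    // Return a processor to the pool once its request is finished.
    synchronized void recycle(Processor processor) {
        idle.push(processor);
    }
}
```

When acquire returns null the caller has to reject the connection, which is what run does by logging httpConnector.noProcessor and closing the socket.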
Next, the class that processes requests:
final class HttpProcessor
implements Lifecycle, Runnable
It also implements Runnable. Its run method loops, and whenever a new socket has been assigned it calls process to handle it.
synchronized void assign(Socket socket) {
// Wait for the Processor to get the previous Socket
while (available) {
try {
wait();
} catch (InterruptedException e) {
}
}
// Store the newly available Socket and notify our thread
this.socket = socket;
available = true;
notifyAll();
if ((debug >= 1) && (socket != null))
log(" An incoming request is being assigned");
}
public void run() {
// Process requests until we receive a shutdown signal
while (!stopped) {
// Wait for the next socket to be assigned
Socket socket = await();
if (socket == null)
continue;
// Process the request from this socket
try {
process(socket);
} catch (Throwable t) {
log("process.invoke", t);
}
// Finish up this request
connector.recycle(this);
}
// Tell threadStop() we have shut ourselves down successfully
synchronized (threadSync) {
threadSync.notifyAll();
}
}
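When assign is called, the processor thread is woken up and the while loop in run proceeds to handle the socket. Once the socket has been processed, recycle returns the processor to the pool.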
void recycle(HttpProcessor processor) {
// if (debug >= 2)
// log("recycle: Recycling processor " + processor);
processors.push(processor);
}
As you can see, it calls the Stack's push method.
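The listing shows assign but not the await method that run calls. The sketch below shows how such a one-slot handoff can work with wait and notifyAll, assuming await simply mirrors assign. The generic class Handoff is a hypothetical name, and this is an illustration of the pattern rather than the HttpProcessor source.

```java
// Hypothetical one-slot handoff between a connector thread and a processor thread.
class Handoff<T> {
    private T item;
    private boolean available = false;

    // Called by the connector thread: waits until the previous item was taken.
    synchronized void assign(T newItem) throws InterruptedException {
        while (available) {
            wait();
        }
        item = newItem;
        available = true;
        notifyAll();   // wake the processor thread blocked in await()
    }

    // Called by the processor thread: waits until an item has been assigned.
    synchronized T await() throws InterruptedException {
        while (!available) {
            wait();
        }
        T result = item;
        item = null;
        available = false;
        notifyAll();   // wake a connector thread blocked in assign()
        return result;
    }
}
```

The available flag is checked in a loop because wait can return spuriously, and because notifyAll wakes every waiter regardless of which condition it is waiting for.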
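Summary: Tomcat 4 is built on blocking I/O and maintains its thread pool with a Stack, so several threads can handle concurrent requests at the same time.
2. Tomcat 6
Tomcat 6 adds support for NIO, which can be enabled by editing the conf/server.xml configuration file: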
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol"
connectionTimeout="20000"
redirectPort="8443" />
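The classes examined below live under the org.apache.catalina.tribes.transport package. Note that this package belongs to Tribes, Tomcat's cluster communication module; the HTTP connector selected by Http11NioProtocol is implemented separately, in org.apache.tomcat.util.net.NioEndpoint. The Tribes receivers are used here as a compact illustration of the same BIO and NIO patterns.
The default is BIO mode. The implementation differs from Tomcat 4, but the core idea is much the same. The relevant classes become:
// Starts the ServerSocket and listens
public class BioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback
// The class that actually handles each socket
public class BioReplicationThread extends WorkerThread
The thread pool is also implemented differently: it maintains two LinkedLists, idle and used, and each WorkerThread moves back and forth between them.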
package org.apache.catalina.tribes.transport;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/**
* @author not attributable
* @version 1.0
*/
public class ThreadPool
{
/**
* A very simple thread pool class. The pool size is set at
* construction time and remains fixed. Threads are cycled
* through a FIFO idle queue.
*/
List idle = new LinkedList();
List used = new LinkedList();
Object mutex = new Object();
boolean running = true;
private static int counter = 1;
private int maxThreads;
private int minThreads;
private ThreadCreator creator = null;
private static synchronized int inc() {
return counter++;
}
public ThreadPool (int maxThreads, int minThreads, ThreadCreator creator) throws Exception {
// fill up the pool with worker threads
this.maxThreads = maxThreads;
this.minThreads = minThreads;
this.creator = creator;
//for (int i = 0; i < minThreads; i++) {
for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
WorkerThread thread = creator.getWorkerThread();
setupThread(thread);
idle.add (thread);
}
}
protected void setupThread(WorkerThread thread) {
synchronized (thread) {
thread.setPool(this);
thread.setName(thread.getClass().getName() + "[" + inc() + "]");
thread.setDaemon(true);
thread.setPriority(Thread.MAX_PRIORITY);
thread.start();
try {thread.wait(500); }catch ( InterruptedException x ) {}
}
}
/**
* Find an idle worker thread, if any. Could return null.
*/
public WorkerThread getWorker()
{
WorkerThread worker = null;
synchronized (mutex) {
while ( worker == null && running ) {
if (idle.size() > 0) {
try {
worker = (WorkerThread) idle.remove(0);
} catch (java.util.NoSuchElementException x) {
//this means that there are no available workers
worker = null;
}
} else if ( used.size() < this.maxThreads && creator != null) {
worker = creator.getWorkerThread();
setupThread(worker);
} else {
try { mutex.wait(); } catch ( InterruptedException x ) {Thread.currentThread().interrupted();}
}
}//while
if ( worker != null ) used.add(worker);
}
return (worker);
}
public int available() {
return idle.size();
}
/**
* Called by the worker thread to return itself to the
* idle pool.
*/
public void returnWorker (WorkerThread worker) {
if ( running ) {
synchronized (mutex) {
used.remove(worker);
//if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
else {
worker.setDoRun(false);
synchronized (worker){worker.notify();}
}
mutex.notify();
}
}else {
worker.setDoRun(false);
synchronized (worker){worker.notify();}
}
}
public int getMaxThreads() {
return maxThreads;
}
public int getMinThreads() {
return minThreads;
}
public void stop() {
running = false;
synchronized (mutex) {
Iterator i = idle.iterator();
while ( i.hasNext() ) {
WorkerThread worker = (WorkerThread)i.next();
returnWorker(worker);
i.remove();
}
}
}
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
}
public void setMinThreads(int minThreads) {
this.minThreads = minThreads;
}
public ThreadCreator getThreadCreator() {
return this.creator;
}
public static interface ThreadCreator {
public WorkerThread getWorkerThread();
}
}
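Stripped of thread creation and shutdown handling, the idle/used bookkeeping in ThreadPool comes down to the following. This is a simplified sketch with assumed names (TwoListPool and its type parameter W are hypothetical), and it works with a fixed set of workers rather than creating them on demand.

```java
import java.util.Collection;
import java.util.LinkedList;

// Hypothetical reduction of the idle/used pool; workers are plain objects here.
class TwoListPool<W> {
    private final LinkedList<W> idle = new LinkedList<>();
    private final LinkedList<W> used = new LinkedList<>();
    private final Object mutex = new Object();

    TwoListPool(Collection<W> workers) {
        idle.addAll(workers);
    }

    // Blocks until a worker is idle, then moves it from idle to used.
    W getWorker() throws InterruptedException {
        synchronized (mutex) {
            while (idle.isEmpty()) {
                mutex.wait();
            }
            W worker = idle.removeFirst();
            used.add(worker);
            return worker;
        }
    }

    // Moves a worker back from used to idle and wakes one waiting caller.
    void returnWorker(W worker) {
        synchronized (mutex) {
            if (used.remove(worker)) {
                idle.addLast(worker);
                mutex.notify();
            }
        }
    }

    int available() {
        synchronized (mutex) {
            return idle.size();
        }
    }
}
```

Unlike the Stack in Tomcat 4, workers are taken from the head of the list and returned to the tail, so they are reused in FIFO order, as the class comment in the listing describes.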
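With NIO mode enabled, Tomcat 6 starts up and runs on NIO.
For the NIO main thread that starts up and listens, the class to look at is:
public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback
The key parts of the code: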
public void start() throws IOException {
try {
// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
} catch (Exception x) {
log.fatal("ThreadPool can initilzed. Listener not started", x);
if ( x instanceof IOException ) throw (IOException)x;
else throw new IOException(x.getMessage());
}
try {
getBind();
bind();
Thread t = new Thread(this, "NioReceiver");
t.setDaemon(true);
t.start();
} catch (Exception x) {
log.fatal("Unable to start cluster receiver", x);
if ( x instanceof IOException ) throw (IOException)x;
else throw new IOException(x.getMessage());
}
}
protected void bind() throws IOException {
// allocate an unbound server socket channel
serverChannel = ServerSocketChannel.open();
// Get the associated ServerSocket to bind it with
ServerSocket serverSocket = serverChannel.socket();
// create a new Selector for use below
selector = Selector.open();
// set the port the server channel will listen to
//serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
bind(serverSocket,getTcpListenPort(),getAutoBind());
// set non-blocking mode for the listening socket
serverChannel.configureBlocking(false);
// register the ServerSocketChannel with the Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
protected void listen() throws Exception {
if (doListen()) {
log.warn("ServerSocketChannel already started");
return;
}
setListen(true);
while (doListen() && selector != null) {
// this may block for a long time, upon return the
// selected set contains keys of the ready channels
try {
events();
socketTimeouts();
int n = selector.select(getTcpSelectorTimeout());
if (n == 0) {
//there is a good chance that we got here
//because the TcpReplicationThread called
//selector wakeup().
//if that happens, we must ensure that that
//thread has enough time to call interestOps
// synchronized (interestOpsMutex) {
//if we got the lock, means there are no
//keys trying to register for the
//interestOps method
// }
continue; // nothing to do
}
// get an iterator over the set of selected keys
Iterator it = selector.selectedKeys().iterator();
// look at each key in the selected set
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
// Is a new connection coming in?
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.socket().setReceiveBufferSize(getRxBufSize());
channel.socket().setSendBufferSize(getTxBufSize());
channel.socket().setTcpNoDelay(getTcpNoDelay());
channel.socket().setKeepAlive(getSoKeepAlive());
channel.socket().setOOBInline(getOoBInline());
channel.socket().setReuseAddress(getSoReuseAddress());
channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
channel.socket().setTrafficClass(getSoTrafficClass());
channel.socket().setSoTimeout(getTimeout());
Object attach = new ObjectReader(channel);
registerChannel(selector,
channel,
SelectionKey.OP_READ,
attach);
}
// is there data to read on this channel?
if (key.isReadable()) {
readDataFromSocket(key);
} else {
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
}
// remove key from selected set, it's been handled
it.remove();
}
} catch (java.nio.channels.ClosedSelectorException cse) {
// ignore is normal at shutdown or stop listen socket
} catch (CancelledKeyException nx) {
log.warn("Replication client disconnected, error when polling key. Ignoring client.");
} catch (Throwable x) {
try {
log.error("Unable to process request in NioReceiver", x);
}catch ( Throwable tx ) {
//in case an out of memory error, will affect the logging framework as well
tx.printStackTrace();
}
}
}
serverChannel.close();
if (selector != null)
selector.close();
}
protected void registerChannel(Selector selector,
SelectableChannel channel,
int ops,
Object attach) throws Exception {
if (channel == null)return; // could happen
// set the new channel non-blocking
channel.configureBlocking(false);
// register it with the selector
channel.register(selector, ops, attach);
}
public void run() {
try {
listen();
} catch (Exception x) {
log.error("Unable to run replication listener.", x);
}
}
protected void readDataFromSocket(SelectionKey key) throws Exception {
NioReplicationThread worker = (NioReplicationThread) getPool().getWorker();
if (worker == null) {
// No threads available, do nothing, the selection
// loop will keep calling this method until a
// thread becomes available, the thread pool itself has a waiting mechanism
// so we will not wait here.
if (log.isDebugEnabled())
log.debug("No TcpReplicationThread available");
} else {
// invoking this wakes up the worker thread then returns
worker.serviceChannel(key);
}
}
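start calls bind, which creates the ServerSocketChannel and the Selector, binds the port, and registers interest in accept events. It then starts the thread, whose run method calls listen. The while loop in listen calls the selector's select method with a timeout, so the thread neither blocks indefinitely when there is nothing to process nor spins in a tight loop burning CPU. When a connection arrives, an acceptable SelectionKey is selected, and the main thread works through all selected keys: it obtains the SocketChannel and registers it with the selector for read events. A later select call then returns those keys as readable, and readDataFromSocket fetches a worker from the thread pool to handle the request.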
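The selection loop just described can be sketched as a single-threaded echo server. This is an illustration under assumptions rather than NioReceiver itself: the class name SelectorLoop, the 100 ms select timeout, the 1024-byte buffer, and the inline echo (instead of a hand-off to a worker pool) are all choices made for the example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical minimal selector loop; echoes whatever it reads.
class SelectorLoop implements Runnable {
    private final Selector selector;
    private final ServerSocketChannel serverChannel;
    private volatile boolean listening = true;

    SelectorLoop(int port) throws IOException {
        selector = Selector.open();
        serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(port));
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    int getLocalPort() {
        return serverChannel.socket().getLocalPort();
    }

    @Override
    public void run() {
        try {
            while (listening) {
                // Bounded wait: neither blocks forever nor spins
                if (selector.select(100) == 0) continue;
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();   // the key has been handled
                    if (!key.isValid()) continue;
                    if (key.isAcceptable()) {
                        SocketChannel ch = serverChannel.accept();
                        if (ch != null) {
                            ch.configureBlocking(false);
                            // The attachment carries per-connection state
                            ch.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                        }
                    } else if (key.isReadable()) {
                        echo(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try { serverChannel.close(); selector.close(); } catch (IOException ignored) { }
        }
    }

    private void echo(SelectionKey key) {
        SocketChannel ch = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        try {
            buf.clear();
            int n = ch.read(buf);   // non-blocking read
            if (n < 0) { key.cancel(); ch.close(); return; }
            buf.flip();
            // Simplification: a real server would register OP_WRITE instead of looping
            while (buf.hasRemaining()) ch.write(buf);
        } catch (IOException e) {
            key.cancel();
            try { ch.close(); } catch (IOException ignored) { }
        }
    }

    void stop() {
        listening = false;
        selector.wakeup();
    }
}
```

One thread serves every connection here, which works only while per-request work is trivial. NioReceiver keeps the selector thread free by passing readable keys to pooled workers.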
Next, the implementation of NioReplicationThread:
public class NioReplicationThread extends WorkerThread {
private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationThread.class );
private ByteBuffer buffer = null;
private SelectionKey key;
private int rxBufSize;
private NioReceiver receiver;
public NioReplicationThread (ListenCallback callback, NioReceiver receiver)
{
super(callback);
this.receiver = receiver;
}
// loop forever waiting for work to do
public synchronized void run() {
this.notify();
if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
buffer = ByteBuffer.allocateDirect(getRxBufSize());
}else {
buffer = ByteBuffer.allocate (getRxBufSize());
}
while (isDoRun()) {
try {
// sleep and release object lock
this.wait();
} catch (InterruptedException e) {
if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e);
// clear interrupt status
Thread.interrupted();
}
if (key == null) {
continue; // just in case
}
if ( log.isTraceEnabled() )
log.trace("Servicing key:"+key);
try {
ObjectReader reader = (ObjectReader)key.attachment();
if ( reader == null ) {
if ( log.isTraceEnabled() )
log.trace("No object reader, cancelling:"+key);
cancelKey(key);
} else {
if ( log.isTraceEnabled() )
log.trace("Draining channel:"+key);
drainChannel(key, reader);
}
} catch (Exception e) {
//this is common, since the sockets on the other
//end expire after a certain time.
if ( e instanceof CancelledKeyException ) {
//do nothing
} else if ( e instanceof IOException ) {
//dont spew out stack traces for IO exceptions unless debug is enabled.
if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
} else if ( log.isErrorEnabled() ) {
//this is a real error, log it.
log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
}
cancelKey(key);
} finally {
}
key = null;
// done, ready for more, return to pool
getPool().returnWorker (this);
}
}
/**
* Called to initiate a unit of work by this worker thread
* on the provided SelectionKey object. This method is
* synchronized, as is the run() method, so only one key
* can be serviced at a given time.
* Before waking the worker thread, and before returning
* to the main selection loop, this key's interest set is
* updated to remove OP_READ. This will cause the selector
* to ignore read-readiness for this channel while the
* worker thread is servicing it.
*/
public synchronized void serviceChannel (SelectionKey key) {
if ( log.isTraceEnabled() )
log.trace("About to service key:"+key);
ObjectReader reader = (ObjectReader)key.attachment();
if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
this.key = key;
key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
this.notify(); // awaken the thread
}
/**
* The actual code which drains the channel associated with
* the given key. This method assumes the key has been
* modified prior to invocation to turn off selection
* interest in OP_READ. When this method completes it
* re-enables OP_READ and calls wakeup() on the selector
* so the selector will resume watching this channel.
*/
protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception {
reader.setLastAccess(System.currentTimeMillis());
reader.access();
SocketChannel channel = (SocketChannel) key.channel();
int count;
buffer.clear(); // make buffer empty
// loop while data available, channel is non-blocking
while ((count = channel.read (buffer)) > 0) {
buffer.flip(); // make buffer readable
if ( buffer.hasArray() )
reader.append(buffer.array(),0,count,false);
else
reader.append(buffer,count,false);
buffer.clear(); // make buffer empty
//do we have at least one package?
if ( reader.hasPackage() ) break;
}
int pkgcnt = reader.count();
if (count < 0 && pkgcnt == 0 ) {
//end of stream, and no more packages to process
remoteEof(key);
return;
}
ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks
for ( int i=0; i<msgs.length; i++ ) {
/**
* Use send ack here if you want to ack the request to the remote
* server before completing the request
* This is considered an asynchronized request
*/
if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
try {
if ( Logs.MESSAGES.isTraceEnabled() ) {
try {
Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()));
}catch ( Throwable t ) {}
}
//process the message
getCallback().messageDataReceived(msgs[i]);
/**
* Use send ack here if you want the request to complete on this
* server before sending the ack to the remote server
* This is considered a synchronized request
*/
if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
}catch ( RemoteProcessException e ) {
if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e);
if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
}catch ( Exception e ) {
log.error("Processing of cluster message failed.",e);
if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
}
if ( getUseBufferPool() ) {
BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
msgs[i].setMessage(null);
}
}
if (count < 0) {
remoteEof(key);
return;
}
}
private void remoteEof(SelectionKey key) {
// close channel on EOF, invalidates the key
if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
cancelKey(key);
}
protected void registerForRead(final SelectionKey key, ObjectReader reader) {
if ( log.isTraceEnabled() )
log.trace("Adding key for read event:"+key);
reader.finish();
//register our OP_READ interest
Runnable r = new Runnable() {
public void run() {
try {
if (key.isValid()) {
// cycle the selector so this key is active again
key.selector().wakeup();
// resume interest in OP_READ, OP_WRITE
int resumeOps = key.interestOps() | SelectionKey.OP_READ;
key.interestOps(resumeOps);
if ( log.isTraceEnabled() )
log.trace("Registering key for read:"+key);
}
} catch (CancelledKeyException ckx ) {
NioReceiver.cancelledKey(key);
if ( log.isTraceEnabled() )
log.trace("CKX Cancelling key:"+key);
} catch (Exception x) {
log.error("Error registering key for read:"+key,x);
}
}
};
receiver.addEvent(r);
}
private void cancelKey(final SelectionKey key) {
if ( log.isTraceEnabled() )
log.trace("Adding key for cancel event:"+key);
ObjectReader reader = (ObjectReader)key.attachment();
if ( reader != null ) {
reader.setCancelled(true);
reader.finish();
}
Runnable cx = new Runnable() {
public void run() {
if ( log.isTraceEnabled() )
log.trace("Cancelling key:"+key);
NioReceiver.cancelledKey(key);
}
};
receiver.addEvent(cx);
}
/**
* send a reply-acknowledgement (6,2,3)
* @param key
* @param channel
*/
protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
try {
ByteBuffer buf = ByteBuffer.wrap(command);
int total = 0;
while ( total < command.length ) {
total += channel.write(buf);
}
if (log.isTraceEnabled()) {
log.trace("ACK sent to " + channel.socket().getPort());
}
} catch ( IOException x ) {
log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
}
}
public void setRxBufSize(int rxBufSize) {
this.rxBufSize = rxBufSize;
}
public int getRxBufSize() {
return rxBufSize;
}
}
In essence it obtains the SocketChannel and processes it, much like the socket-handling logic in BIO mode.
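One detail in the listing deserves a closer look: registerForRead and cancelKey do not touch the key directly. They wrap the change in a Runnable and pass it to receiver.addEvent, and listen calls events() at the top of each loop iteration. The bodies of addEvent and events are not shown above, so the following is only a sketch of how such a queue might look; SelectorEventQueue and runEvents are hypothetical names.

```java
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical queue that lets worker threads defer key changes to the selector thread.
class SelectorEventQueue {
    private final Selector selector;
    private final Queue<Runnable> events = new ConcurrentLinkedQueue<>();

    SelectorEventQueue(Selector selector) {
        this.selector = selector;
    }

    // Called from worker threads.
    void addEvent(Runnable event) {
        events.add(event);
        selector.wakeup();   // make a blocked select() return promptly
    }

    // Called by the selector thread at the top of each loop iteration.
    // Returns the number of events that were run.
    int runEvents() {
        int count = 0;
        Runnable event;
        while ((event = events.poll()) != null) {
            try {
                event.run();
            } catch (RuntimeException e) {
                e.printStackTrace();   // one failing event must not stop the loop
            }
            count++;
        }
        return count;
    }
}
```

Funnelling interestOps changes and cancellations through the selector thread means a key is only modified between select calls, not while another thread is blocked inside one.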
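Here Tomcat attaches its own ObjectReader to the SelectionKey and uses that stateful object to process the data step by step. ObjectReader holds an XByteBuffer, which can be taken from a pool instead of being created afresh. This is the same idea as each WorkerThread keeping its own ByteBuffer: buffers are not created repeatedly, which reduces memory overhead. Tomcat's handling is broadly the same as the examples in the book Java NIO, only more complete.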
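To show why a stateful reader is attached to each key, here is a minimal sketch of a per-connection accumulator that survives partial reads. It is not ObjectReader: the class name FrameReader and the 4-byte length-prefixed frame format are assumptions chosen for illustration, and the Tribes wire format is different.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-connection reader; frames are [4-byte length][body].
class FrameReader {
    // Kept in "write mode" between calls: position marks the end of buffered data
    private ByteBuffer pending = ByteBuffer.allocate(64);

    // Appends whatever bytes a non-blocking read produced.
    void append(ByteBuffer chunk) {
        if (pending.remaining() < chunk.remaining()) {
            int needed = pending.position() + chunk.remaining();
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(pending.capacity() * 2, needed));
            pending.flip();
            bigger.put(pending);
            pending = bigger;
        }
        pending.put(chunk);
    }

    // Returns every complete frame and keeps the incomplete tail for later.
    List<byte[]> drain() {
        List<byte[]> frames = new ArrayList<>();
        pending.flip();
        while (pending.remaining() >= 4) {
            pending.mark();
            int length = pending.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative frame length");
            }
            if (pending.remaining() < length) {
                pending.reset();   // header seen, body not complete yet
                break;
            }
            byte[] body = new byte[length];
            pending.get(body);
            frames.add(body);
        }
        pending.compact();   // back to write mode, unread bytes moved to the front
        return frames;
    }
}
```

With a reader like this attached to the SelectionKey, any worker thread can pick up a connection where the previous read left off, which is the role the attachment plays in drainChannel.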