`
xsh5324
  • 浏览: 71381 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Netty源码分析 之 NioServerSocketChannelFactory

    博客分类:
  • java
阅读更多

NioServerSocketChannelFactory 是ChannelFactory的实现接口之一,负责创建并管理服务端Channel。

先来看下它的周边类图是怎样


 

 

 

下面针对核心的类做下讲解,然后就开始跟踪源码一步一步分析

NioServerSocketPipelineSink

接受和处理终端的下游ChannelEvent事件.

 

AbstractNioBossPool 

    处理socket accept、connect的工作线程池抽象类,提供了创建工作线程的模板方法newBoss 

 

NioServerBossPool 

    继续自AbstractNioBossPool重写了newBoss方法创建Boss工作线程

 

AbstractNioSelector 

    它间接的实现了Runnable接口,负责执行内部的工作队列、处理Selector事件(抽象方法process)、关闭相关的Channel(抽象方法close)、关闭内部了Selector;值得注意的是这个类的register抽象方法,它的作用是将一个新的Channel注册到自身这个工作线程中完成Channel的绑定或连接工作,这个方法由sink调用。

 

NioServerBoss 

    继承自AbstractNioSelector,完成如下工作

1、把ServerSocketChannel绑定到SocketAddress并将Selector实例注册到ServerSocketChannel监听其SelectionKey.OP_ACCEPT事件(其内部类干的事RegisterTask)

2、处理OP_ACCEPT事件,将新Accpet的Channel注册到一个工作线程中去(workPool.nextWorker().register());(网络模型图中acceptor到subRecator的过程)

 

NioWorker

与NioServerBoss类似,继承自AbstractNioSelector,完成如下工作

1、处理OP_READ、OP_WRITE事件

2、将Selector实例注册到SocketChannel,并监听OP_READ、OP_WRITE事件(其内部类干的事RegisterTask) 

 

下面是一段是简单的服务端代码,我们顺着代码中的关键类分析 

public class ServerTest {
	private final ChannelFactory channelFactory = new NioServerSocketChannelFactory(
			Executors.newCachedThreadPool(),
			Executors.newCachedThreadPool());

	private final ServerBootstrap bootstrap;

	public ServerTest() {
		this.bootstrap = new ServerBootstrap(channelFactory);
		this.bootstrap.setOption("child.tcpNoDelay", true);
		this.bootstrap.setOption("child.keepAlive", true);
		
		this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			@Override
			public ChannelPipeline getPipeline() throws Exception {
				ChannelPipeline pipeline=Channels.pipeline();
//				pipeline.addFirst("encoder", new Encoder());
//				pipeline.addFirst("decoder", new encoder());
//				...
				return pipeline;
			}
		});
	}
	public void service(){
		this.bootstrap.bind(new InetSocketAddress(9008));
	}
	
	public static void main(String[] args){
		new ServerTest().service();
	}
}
 NioServerSocketChannelFactory

  这个类的代码非常简洁就4个实例变量和几个方法  

private final WorkerPool<NioWorker> workerPool;
private final NioServerSocketPipelineSink sink;
private final BossPool<NioServerBoss> bossPool;
private boolean releasePools;//标记当前ChannelFactory是否已还没有关闭或者说工作线程还没有被shutdown
  这个类有个BUG,在默认的构造方法中将releasePools置为true了,但是在其它构造方法中却没有,这意味着在它被关闭的时候可能无法释放某些扩展资源

  

public NioServerSocketChannelFactory() {
        this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        releasePools = true;
}

public NioServerSocketChannelFactory(
            Executor bossExecutor, Executor workerExecutor) {
        this(bossExecutor, workerExecutor, getMaxThreads(workerExecutor));//调用此构造方法时,getMaxThreads控制着工作线程数小于等于CPU数*2
}

public NioServerSocketChannelFactory(
            Executor bossExecutor, Executor workerExecutor,
            int workerCount) {
        this(bossExecutor, 1, workerExecutor, workerCount);//默认情况下boss线程的数量为1
    }
public NioServerSocketChannelFactory(
            Executor bossExecutor, int bossCount, Executor workerExecutor,
            int workerCount) {
        this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount));
}

public NioServerSocketChannelFactory(
            Executor bossExecutor, WorkerPool<NioWorker> workerPool) {
        this(bossExecutor, 1 , workerPool);
}

public NioServerSocketChannelFactory(
            Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) {
        this(new NioServerBossPool(bossExecutor, bossCount, null), workerPool);
}

public NioServerSocketChannelFactory(BossPool<NioServerBoss> bossPool, WorkerPool<NioWorker> workerPool) {
        if (bossPool == null) {
            throw new NullPointerException("bossExecutor");
        }
        if (workerPool == null) {
            throw new NullPointerException("workerPool");
        }
        this.bossPool = bossPool;
        this.workerPool = workerPool;
        sink = new NioServerSocketPipelineSink();
 }
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
        return new NioServerSocketChannel(this, pipeline, sink, bossPool.nextBoss(), workerPool);//注意nextBoss方法,它选择一个工作线程来处理ServerSocketChannel的读写事件
}
 这个类主要依赖于这4个类的实例NioWorkerPool、NioServerBossPool、NioServerSocketPipelineSink、NioServerSocketChannel,关于它们的作用前面已经有说明了,在此不讲了。先来看下NioWorkerPool吧。

 

NioWorkerPool这个类也很简单,其内部主要维护了一个线程数组,它继承自AbstractNioWorkerPool并重写了createWorker方法 

protected NioWorker createWorker(Executor executor) {
return new NioWorker(executor, determiner);
}
 再来看下它的父类AbstractNioWorkerPool

 这个类也非常简单,它只做2件事——负载均衡和工作线程的创建

public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
        implements WorkerPool<E>, ExternalResourceReleasable {

    private final AbstractNioWorker[] workers;
    private final AtomicInteger workerIndex = new AtomicInteger();
    private final Executor workerExecutor;
private volatile boolean initDone;
//此处省略了部分代码
	AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean autoInit) {
        if (workerExecutor == null) {
            throw new NullPointerException("workerExecutor");
        }
        if (workerCount <= 0) {
            throw new IllegalArgumentException(
                    "workerCount (" + workerCount + ") " + "must be a positive integer.");
        }
        workers = new AbstractNioWorker[workerCount];
        this.workerExecutor = workerExecutor;
        if (autoInit) {
            init();
        }
}
protected void init() {
        if (initDone) {
            throw new IllegalStateException("Init was done before");
        }
        initDone = true;

        for (int i = 0; i < workers.length; i++) {
            workers[i] = newWorker(workerExecutor);
    }
protected E newWorker(Executor executor) {
        return createWorker(executor);//调用抽象方法,把具体的工作线程交由子类完成
    }
public E nextWorker() {
        return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];//负载均衡
    }
 下面来具体的工作线程NioWorker

 

NioWorker继承自AbstractNioWorker,AbstractNioWorker继承自AbstractNioSelector,还是先从最顶层父类AbstractNioSelector看起吧

AbstractNioSelector实现了NioSelector接口,而NioSelector接口继承自Runnable接口AbstractNioSelector实现run方法,Netty把这个线程叫做IO线程

AbstractNioSelector类的构造方法会调用openSelector方法来创建一个Selector,并执行当前线程(这段代码很简单,我们就跳过直接看run方法了)

 

public void run() {
        thread = Thread.currentThread();//将当前线程保存起来,用来区分调用该类其它某些方法的线程是否为IO线程,请参看它的isIoThread方法。其它线程会调用此方法来区分自己是不是IO线程,如果是,则直接调用此类的方法完成,否则把相关的操作添加到IO线程的工作队列中。这样做的好处是,避免IO线程与其它线程的同步操作,带来了效率的提升。

        int selectReturnsImmediately = 0;
        Selector selector = this.selector; 
        if (selector == null) {
            return;
        }
        // use 80% of the timeout for measure
        final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
        boolean wakenupFromLoop = false;
        for (;;) {
            wakenUp.set(false);

            try {
                long beforeSelect = System.nanoTime();
                int selected = select(selector);
                if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {//看代码中的解释说jdk的epoll可能存在BUG,这个if用来处理这类错误的.看下面的代码冒似是说Channel被关闭后没有从Selector中移除相关的键而导致过多的垃圾键,当这类键超过1024个时,Selector将被重键.
                    long timeBlocked = System.nanoTime() - beforeSelect;

                    if (timeBlocked < minSelectTimeout) {
                        boolean notConnected = false;
                        for (SelectionKey key: selector.keys()) {
                            SelectableChannel ch = key.channel();
                            try {
                                if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() ||
                                        ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
                                    notConnected = true;
                                    key.cancel();
                                }
                            } catch (CancelledKeyException e) {
                            }
                        }
                        if (notConnected) {
                            selectReturnsImmediately = 0;
                        } else {
                            selectReturnsImmediately ++;
                        }
                    } else {
                        selectReturnsImmediately = 0;
                    }

                    if (selectReturnsImmediately == 1024) {
                        rebuildSelector();
                        selector = this.selector;
                        selectReturnsImmediately = 0;
                        wakenupFromLoop = false;
                        continue;
                    }
                } else {
                    selectReturnsImmediately = 0;
                }
		//代码中这里的解释很清楚,请看代码
                if (wakenUp.get()) {
                    wakenupFromLoop = true;
                    selector.wakeup();
                } else {
                    wakenupFromLoop = false;
                }

                cancelledKeys = 0;
                processTaskQueue();//执行队列中的任务
                selector = this.selector; //因为rebuildSelector()方法是一个公共的方法,也许在上一步的processTaskQueue方法中这个方法被调了

                if (shutdown) {//如果当前IO线程标记为关闭,则执行关闭逻辑
                    this.selector = null;
                    processTaskQueue();//处理任务队列

                    for (SelectionKey k: selector.keys()) {
                        close(k);//关闭相关的Channel,这里会执行Pipeline的Upstream直到sink中才关闭Channel
                    }
                    try {
                        selector.close();
                    } catch (IOException e) {
                        logger.warn(
                                "Failed to close a selector.", e);
                    }
                    shutdownLatch.countDown();//shutdown方法必需等待相关的释放操作完成才能返回,此处唤醒调用shutdown方法的线程
                    break;//工作线程退出
                } else {
                    process(selector);//执行process抽象方法,SelectionKey的处理行为交给子类去完成
                }
            } catch (Throwable t) {
                logger.warn(
                        "Unexpected exception in the selector loop.", t);
                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            }
        }
}
 这个类还有两个方法值得解释一下,看完这两个方法再看process方法 

 

//这个方法在Boss线程中被调用在Channel被bind到本地端口(Server)或成功连接到服务器(Client端)的时候被调用
public void register(Channel channel, ChannelFuture future) {
        Runnable task = createRegisterTask(channel, future);//具体的注册操作交由子类去完成,透露一下 这个task面会有socketChannel.register方法的调用
        registerTask(task);
    }

    protected final void registerTask(Runnable task) {
        taskQueue.add(task);

        Selector selector = this.selector;

        if (selector != null) {
            if (wakenUp.compareAndSet(false, true)) {//唤醒IO线程,这个CAS操作是防止Selector被过早的唤醒的情况下导致信号丢失.请参看AbstractNioSelector类源码282行上面的解释
                selector.wakeup();
            }
        } else {
            if (taskQueue.remove(task)) {
                // the selector was null this means the Worker has already been shutdown.
                throw new RejectedExecutionException("Worker has already been shutdown");
            }
        }
    }
 子类AbstractNioWorker重写了process方法来处理SelectionKey 

 

protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey k = i.next();
            i.remove();
            try {
                int readyOps = k.readyOps();
                if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                    if (!read(k)) {//调用抽象方法读取数据,如果此方法返回false则表示连接已经被关闭,无需再做处理
                        continue;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    writeFromSelectorLoop(k);//如果该Selector中触发了写事件则写出数据。用户代码调用channel.write(obj)方法后数据并不是马上被写出,而是被提交Channel的writeBufferQueue队列中(sink放的)等待IO线程去完成写出,写操作实际上是由write0这个方法完成的.
                }
            } catch (CancelledKeyException e) {
                close(k);//出形异常关闭Channel并释放相关资源
            }

            if (cleanUpCancelledKeys()) {
                break; // break the loop to avoid ConcurrentModificationException
            }
        }
    }
 先看该类的其它几个重要的方法再看NioWorker类的方法 

 

/**
 * 这个方法是由sink调用的,sink将要写出的数据offer到writeBufferQueue中然后调用该方法(上面解释process方法中有提到)
*/
void writeFromUserCode(final AbstractNioChannel<?> channel) {
        if (!channel.isConnected()) {
	//如果Channel已经关闭则没必要继续写了
            cleanUpWriteBuffer(channel);
            return;
        }

        if (scheduleWriteIfNecessary(channel)) {
	//这是个抽象方法,实现在NioWorker类中,这个方法中判断当前线程是否为IO线程,如果是则返回false,否则向IO线程的 任务队列添加一个写出任务并返回true
            return;
        }

	//到这里,就能确定当前线程是否是IO线程
        if (channel.writeSuspended) {//如果写出通道被挂起,则返回
            return;
        }

        if (channel.inWriteNowLoop) {//当前正在写数据,则返回
            return;
        }

        write0(channel);//写出数据
    }
protected void write0(AbstractNioChannel<?> channel) {
        boolean open = true;//标记channel是否处理打开状态
        boolean addOpWrite = false;//是否向此Channel关联的SelectionKey的interestOps添加SelectionKey.OP_WRITE,这会导致前面的process方法中这句代码“if ((readyOps & SelectionKey.OP_WRITE) != 0) ”返回true,然后再次触发写操作
        boolean removeOpWrite = false;//与上面对应
        boolean iothread = isIoThread(channel);//当前线程是否是该Channel的工作线程

        long writtenBytes = 0;

        final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
        final WritableByteChannel ch = channel.channel;
        final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
        final int writeSpinCount = channel.getConfig().getWriteSpinCount();//最多尝试写的次数
        List<Throwable> causes = null;

        synchronized (channel.writeLock) {
            channel.inWriteNowLoop = true;//上面提到过这个writeFromUserCode方法
            for (;;) {

                MessageEvent evt = channel.currentWriteEvent;
                SendBuffer buf = null;
                ChannelFuture future = null;
                try {
                    if (evt == null) {
                        if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
//如果没有要写的东西,则打个标记,在后面向此Channel关联的SelectionKey的interestOps移除SelectionKey.OP_WRITE
                            removeOpWrite = true;
                            channel.writeSuspended = false;
                            break;
                        }
                        future = evt.getFuture();

                        channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                    } else {
                        future = evt.getFuture();
                        buf = channel.currentWriteBuffer;
                    }

                    long localWrittenBytes = 0;
			//下面的代码尝试发送数据,直到有数据被写出,否则尽可能的尝试
                    for (int i = writeSpinCount; i > 0; i --) {
                        localWrittenBytes = buf.transferTo(ch);
                        if (localWrittenBytes != 0) {
                            writtenBytes += localWrittenBytes;
                            break;
                        }
                        if (buf.finished()) {
                            break;
                        }
                    }

                    if (buf.finished()) {
                        //如果当前消息已经写完了释放资源,并准备下一条消息的写出
                        buf.release();
                        channel.currentWriteEvent = null;
                        channel.currentWriteBuffer = null;
                        // Mark the event object for garbage collection.
                        //noinspection UnusedAssignment
                        evt = null;
                        buf = null;
                        future.setSuccess();
                    } else {
                        //没有全部写完,则打上addOpWrite标记,并设置写出进度
                        addOpWrite = true;
                        channel.writeSuspended = true;

                        if (localWrittenBytes > 0) {
                            // Notify progress listeners if necessary.
                            future.setProgress(
                                    localWrittenBytes,
                                    buf.writtenBytes(), buf.totalBytes());
                        }
                        break;
                    }
                } catch (AsynchronousCloseException e) {
                    // Doesn't need a user attention - ignore.
                } catch (Throwable t) {
                    if (buf != null) {
                        buf.release();
                    }
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    // Mark the event object for garbage collection.
                    //noinspection UnusedAssignment
                    buf = null;
                    //noinspection UnusedAssignment
                    evt = null;
                    if (future != null) {
                        future.setFailure(t);//通知消息写出失败
                    }
                    if (iothread) {
                        // An exception was thrown from within a write in the iothread. We store a reference to it
                        // in a list for now and notify the handlers in the chain after the writeLock was released
                        // to prevent possible deadlock.
                        // See #1310
			//如果异常在IO线程中被抛出,则存储它的一个引用,待锁释放后触发pipeline的UpStream以防止死锁
                        if (causes == null) {
                            causes = new ArrayList<Throwable>(1);
                        }
                        causes.add(t);
                    } else {
                        fireExceptionCaughtLater(channel, t);
                    }
                    if (t instanceof IOException) {
                        // close must be handled from outside the write lock to fix a possible deadlock
                        // which can happen when MemoryAwareThreadPoolExecutor is used and the limit is exceed
                        // and a close is triggered while the lock is hold. This is because the close(..)
                        // may try to submit a task to handle it via the ExecutorHandler which then deadlocks.
                        // See #1310
                        open = false;
                    }
                }
            }
            channel.inWriteNowLoop = false;
			// Initially, the following block was executed after releasing
            // the writeLock, but there was a race condition, and it has to be
            // executed before releasing the writeLock:
            //
            //     https://issues.jboss.org/browse/NETTY-410
            //
            //这个issues是说 之前下面这段代码是被放到同步段之外的,导致未完全写完的数据丢失的问题,所以移到同步块里面来了。
            if (open) {
                if (addOpWrite) {
                    setOpWrite(channel);
                } else if (removeOpWrite) {
                    clearOpWrite(channel);
                }
            }
        }
        if (causes != null) {
            for (Throwable cause: causes) {
                // notify about cause now as it was triggered in the write loop
                fireExceptionCaught(channel, cause);
            }
        }
        if (!open) {
            // close the channel now
            close(channel, succeededFuture(channel));
        }
        if (iothread) {
            fireWriteComplete(channel, writtenBytes);
        } else {
            fireWriteCompleteLater(channel, writtenBytes);
        }
    }
 AbstractNioSelector类把IO线程的构架搭起来, AbstractNioWorker类实现了数据的写出逻辑,只留下变化的部分交给子类去完成。现在来看来NioWorker类的几个比较重要的方法

 

protected boolean read(SelectionKey k) {
        final SocketChannel ch = (SocketChannel) k.channel();
        final NioSocketChannel channel = (NioSocketChannel) k.attachment();

//获取缓存大小分配策略,Netty中缓存分配策略有2种,一种是自动扩展的缓存分配策略,另一种是固定大小的缓存分配策略。自动扩展的缓存分配策略内部维护了一个缓存大小的int数组,数组中的值分段按一定数量递增
        final ReceiveBufferSizePredictor predictor =
            channel.getConfig().getReceiveBufferSizePredictor();
        final int predictedRecvBufSize = predictor.nextReceiveBufferSize();//获取下一个接收缓冲区大小
        final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();

        int ret = 0;
        int readBytes = 0;
        boolean failure = true;

        ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());//recvBufferPool目的是为了缓存重用
        try {
			//读取Channel中的数据,直到缓冲区满或没有可读的字节
            while ((ret = ch.read(bb)) > 0) {
                readBytes += ret;
                if (!bb.hasRemaining()) {
                    break;
                }
            }
            failure = false;
        } catch (ClosedChannelException e) {
            // Can happen, and does not need a user attention.
        } catch (Throwable t) {
            fireExceptionCaught(channel, t);
        }

        if (readBytes > 0) {
            bb.flip();

            final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
            buffer.setBytes(0, bb);
            buffer.writerIndex(readBytes);

            //向缓存分配策略报告当前读到的缓存大小,以预计算下一次的缓存大小
            predictor.previousReceiveBufferSize(readBytes);

            // 触发MessageReceive UpStream
            fireMessageReceived(channel, buffer);
        }

        if (ret < 0 || failure) {//如果代码执行到这,则表示此Channel不可用
            k.cancel(); // Some JDK implementations run into an infinite loop without this.
            close(channel, succeededFuture(channel));
            return false;
        }

        return true;
    }
//这个方法实现父类的抽象方法,返回Channel注册到IO线程的Task.这个方法一定是在Boss线程中被调用的,当boss线程接收到新的连接或一条新的连接与服务端建立,则调用这个方法,这个方法创建的Task完成将Channel注册到工作线程的Selector上
protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
        boolean server = !(channel instanceof NioClientSocketChannel);
        return new RegisterTask((NioSocketChannel) channel, future, server);
    }

    private final class RegisterTask implements Runnable {
        private final NioSocketChannel channel;
        private final ChannelFuture future;
        private final boolean server;

        RegisterTask(
                NioSocketChannel channel, ChannelFuture future, boolean server) {

            this.channel = channel;
            this.future = future;
            this.server = server;
        }

        public void run() {
            SocketAddress localAddress = channel.getLocalAddress();
            SocketAddress remoteAddress = channel.getRemoteAddress();

            if (localAddress == null || remoteAddress == null) {
                if (future != null) {
                    future.setFailure(new ClosedChannelException());
                }
                close(channel, succeededFuture(channel));
                return;
            }

            try {
                if (server) {
                    channel.channel.configureBlocking(false);
                }

                channel.channel.register(
                        selector, channel.getRawInterestOps(), channel);//注册感兴趣的事件(OP_READ),在write0方法中会添加或移除OP_WRITE(请参看上面的write0方法)

                if (future != null) {
                    channel.setConnected();
                    future.setSuccess();
                }

                if (server || !((NioClientSocketChannel) channel).boundManually) {//触发BOUND事件
                    fireChannelBound(channel, localAddress);
                }
//触发Connected事件
                fireChannelConnected(channel, remoteAddress);
            } catch (IOException e) {
                if (future != null) {
                    future.setFailure(e);
                }
                close(channel, succeededFuture(channel));
                if (!(e instanceof ClosedChannelException)) {
                    throw new ChannelException(
                            "Failed to register a socket to the selector.", e);
                }
            }
        }
    } 
 到这里为止, NioWorker工作线程(IO线程)核心路径已经基本分析完毕,现在来看NioWorker它主要是做下面这两件事。

 

1、执行内部工作队列中的任务

2、处理SelectionKey的读写事件

 

 

我们知道如果Selector不被注册到java.nio.channels.SelectableChannel上,它是没有任何作用的。那什么时候将被注册到SelectableChannel上呢(上面提到过RegisterTask的职责)?Boss线程,我们来看下NioServerBoss这个类

NioServerBoss

该类继承自AbstractNioSelector类并且实现了process抽象方法处理SelectionKey关于AbstractNioSelector类前面已经提过了,process方法

 

protected void process(Selector selector) {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey k = i.next();
            i.remove();
            NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();

            try {
                for (;;) {
//获取新连接到服务端的的SocketChannel
                    SocketChannel acceptedSocket = channel.socket.accept();
                    if (acceptedSocket == null) {
                        break;
                    }
//取出Channel,并注册到IO线程
                    registerAcceptedChannel(channel, acceptedSocket, thread);
                }
            } catch (CancelledKeyException e) {
                // Raised by accept() when the server socket was closed.
                k.cancel();
                channel.close();
            } catch (SocketTimeoutException e) {
                // Thrown every second to get ClosedChannelException
                // raised.
            } catch (ClosedChannelException e) {
                // Closed as requested.
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to accept a connection.", t);
                }

                try {
//避免在出现异常的情况下,无线循环导致cpu资源浪费
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    // Ignore
                }
            }
        }
}
private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
                                         Thread currentThread) {
        try {
            ChannelSink sink = parent.getPipeline().getSink();
            ChannelPipeline pipeline =
                    parent.getConfig().getPipelineFactory().getPipeline();
            NioWorker worker = parent.workerPool.nextWorker();//获取一个工作线程
			//NioWorker类的register方法完成注册,之前说过的
            worker.register(new NioAcceptedSocketChannel(
                    parent.getFactory(), pipeline, parent, sink
                    , acceptedSocket,
                    worker, currentThread), null);
        } catch (Exception e) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to initialize an accepted socket.", e);
            }

            try {
                acceptedSocket.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially accepted socket.",
                            e2);
                }
            }
        }
    }
 该类同样也有实现了父类的createRegisterTask方法来创建并返回一个Task,这个Task负责监听客户端连接Channel,并把Channel注册到IO线程上.在后面讲的NioServerSocketPipelineSink类会调用执行到这个方法

 

 

void bind(final NioServerSocketChannel channel, final ChannelFuture future,
              final SocketAddress localAddress) {
        registerTask(new RegisterTask(channel, future, localAddress));//这里会调父类的registerTask方法,父类的这个方法是个模板方法,具体bind任务的创建会调用该类的createRegisterTask方法
}

protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
        return new RegisterTask((NioServerSocketChannel) channel, future, null);
    }

    private final class RegisterTask implements Runnable {
        private final NioServerSocketChannel channel;
        private final ChannelFuture future;
        private final SocketAddress localAddress;

        public RegisterTask(final NioServerSocketChannel channel, final ChannelFuture future,
                            final SocketAddress localAddress) {
            this.channel = channel;
            this.future = future;
            this.localAddress = localAddress;
        }

        public void run() {
            boolean bound = false;
            boolean registered = false;
            try {
                channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());//绑定到本地
                bound = true;

                future.setSuccess();
                fireChannelBound(channel, channel.getLocalAddress());
                channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);//将此IO线程的Selector注册到该ServerSocketChannel上

                registered = true;
            } catch (Throwable t) {
                future.setFailure(t);
                fireExceptionCaught(channel, t);
            } finally {
                if (!registered && bound) {
                    close(channel, future);
                }
            }
        }
    }
 现在知道IO线程(NioWorker)的Selector在什么时候被注册到SelectableChannel上了,那Boss线程的Selector是什么时候被注册到SelectableChannel呢?ServerBootstrap的bind方法。

 

public Channel bind(final SocketAddress localAddress) {
        ChannelFuture future = bindAsync(localAddress);

        // 等待完成
        future.awaitUninterruptibly();
        if (!future.isSuccess()) {
            future.getChannel().close().awaitUninterruptibly();
            throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
        }

        return future.getChannel();
    }

public ChannelFuture bindAsync(final SocketAddress localAddress) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        Binder binder = new Binder(localAddress);
        ChannelHandler parentHandler = getParentHandler();

        ChannelPipeline bossPipeline = pipeline();
        bossPipeline.addLast("binder", binder);//注意这个
        if (parentHandler != null) {
            bossPipeline.addLast("userHandler", parentHandler);
        }
		//newChannel实际上是调用的NioServerSocketChannelFactory.newChannel方法new NioServerSocketChannel,NioServerSocketChannel构造方法创建完ServerSocketChannel后会触发UpStream(ChannelState.OPEN事件)这个事件被ServerBootstrap类的内部类Binder处理并完成Selector与Channel的注册操作
        Channel channel = getFactory().newChannel(bossPipeline);
        final ChannelFuture bfuture = new DefaultChannelFuture(channel, false);
        binder.bindFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    bfuture.setSuccess();
                } else {
                    // Call close on bind failure
                    bfuture.getChannel().close();
                    bfuture.setFailure(future.getCause());
                }
            }
        });
        return bfuture;
    }


private final class Binder extends SimpleChannelUpstreamHandler {

        private final SocketAddress localAddress;
        private final Map<String, Object> childOptions =
            new HashMap<String, Object>();
        private final DefaultChannelFuture bindFuture = new DefaultChannelFuture(null, false);
        Binder(SocketAddress localAddress) {
            this.localAddress = localAddress;
        }

        @Override
        public void channelOpen(
                ChannelHandlerContext ctx,
                ChannelStateEvent evt) {

            try {
                evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());

                // Split options into two categories: parent and child.
                Map<String, Object> allOptions = getOptions();
                Map<String, Object> parentOptions = new HashMap<String, Object>();
                for (Entry<String, Object> e: allOptions.entrySet()) {
                    if (e.getKey().startsWith("child.")) {
                        childOptions.put(
                                e.getKey().substring(6),
                                e.getValue());
                    } else if (!"pipelineFactory".equals(e.getKey())) {
                        parentOptions.put(e.getKey(), e.getValue());
                    }
                }

                // Apply parent options.
                evt.getChannel().getConfig().setOptions(parentOptions);
            } finally {
                ctx.sendUpstream(evt);
            }
			//这里将NioServerSocketChannel绑定到本地端口,并触发Downstream(ChannelState.BOUND)事件,最终被传递sin处理(NioServerSocketPipelineSink)
            evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        bindFuture.setSuccess();
                    } else {
                        bindFuture.setFailure(future.getCause());
                    }
                }
            });
        }

  NioServerSocketPipelineSink类用来处理最底层的事件。

 

class NioServerSocketPipelineSink extends AbstractNioChannelSink {

    public void eventSunk(
            ChannelPipeline pipeline, ChannelEvent e) throws Exception {
        Channel channel = e.getChannel();
        if (channel instanceof NioServerSocketChannel) {
            handleServerSocket(e);//处理服务端Channel
        } else if (channel instanceof NioSocketChannel) {
            handleAcceptedSocket(e);//处理客户端Channel
        }
    }

    private static void handleServerSocket(ChannelEvent e) {
        if (!(e instanceof ChannelStateEvent)) {
            return;
        }

        ChannelStateEvent event = (ChannelStateEvent) e;
        NioServerSocketChannel channel =
            (NioServerSocketChannel) event.getChannel();
        ChannelFuture future = event.getFuture();
        ChannelState state = event.getState();
        Object value = event.getValue();

        switch (state) {
        case OPEN://根据一个值标记是Open还是Close,其它case也是这样
            if (Boolean.FALSE.equals(value)) {
                ((NioServerBoss) channel.boss).close(channel, future);
            }
            break;
        case BOUND:
            if (value != null) {//在ServerBootStraup内部类Binder中触发BOUND事件,传递此value的值为localAddress,所以进入这个if调用boss的bind方法(NioServerBoss类的)完成地址绑定与OP_ACCEPT的注册,请参见上面讲NioServerBoss中的bind方法
                ((NioServerBoss) channel.boss).bind(channel, future, (SocketAddress) value);
            } else {
                ((NioServerBoss) channel.boss).close(channel, future);
            }
            break;
        default:
            break;
        }
    }

    private static void handleAcceptedSocket(ChannelEvent e) {
        if (e instanceof ChannelStateEvent) {
            ChannelStateEvent event = (ChannelStateEvent) e;
            NioSocketChannel channel = (NioSocketChannel) event.getChannel();
            ChannelFuture future = event.getFuture();
            ChannelState state = event.getState();
            Object value = event.getValue();

            switch (state) {
            case OPEN:
                if (Boolean.FALSE.equals(value)) {
                    channel.worker.close(channel, future);
                }
                break;
            case BOUND:
            case CONNECTED:
                if (value == null) {
                    channel.worker.close(channel, future);
                }
                break;
            case INTEREST_OPS:
                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                break;
            }
        } else if (e instanceof MessageEvent) {
            MessageEvent event = (MessageEvent) e;
            NioSocketChannel channel = (NioSocketChannel) event.getChannel();
            boolean offered = channel.writeBufferQueue.offer(event);//将消息提交到该Channel的writeBuffer中
            assert offered;
            channel.worker.writeFromUserCode(channel);//写(不一定马上进行写入,它会把写入工作放到IO线程中去做)
        }
    }
}

   Netty的代码可以说是即漂亮又简单,了解实现原理后,再去看代码感觉很容易,发现没什么可讲的了...

 

  • 大小: 76.6 KB
分享到:
评论

相关推荐

    netty源码分析教程视频

    一个netty的入门教程以及源码分析视频,适合刚学习的人

    netty源码深入分析

    《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...

    netty源码剖析视频.zip

    《Netty源码剖析视频》课程是一份深度探讨Netty框架源码及其实战应用的资源集合。课程分为两个主要部分,旨在帮助开发者深入理解Netty的内部机制,并通过实战项目提升其在实际开发中的应用能力。 第一部分,深入浅...

    Netty源码分析总结.rar

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的...在分析源码的过程中,我们通常会关注类的设计模式、线程模型、内存管理以及性能优化等方面,这对于提升网络编程和系统架构能力大有裨益。

    netty源码分析之服务端启动全解析

    Netty是一款高性能的网络应用程序框架,它使用Java编程语言开发,主要用于网络应用程序的快速和易于开发,支持TCP和UDP...通过对Netty源码的深入分析,可以更好地理解其工作机制,对开发高性能的网络应用有极大的帮助。

    netty源码和相关中文文档

    接下来,我们谈谈 Netty 的源码分析。通过阅读 Netty 源码,我们可以深入了解其设计模式和优化策略: 1. **EventLoop(事件循环)**:Netty 使用单线程的 EventLoop 实现了事件的高效分发,减少了线程切换的开销。 ...

    【项目实战】Netty源码剖析&NIO;+Netty5各种RPC架构实战演练三部曲视频教程(未加密)

    尤其是在RPC架构领域,Netty凭借其强大的功能和灵活的设计成为了构建分布式系统的首选工具之一。希望本教程能够帮助大家更好地理解和运用Netty进行项目实战。 以上就是对Netty源码剖析及NIO与Netty5在RPC架构中的...

    Netty源码剖析+视频

    Netty源码剖析+视频

    Netty5.0架构剖析和源码解读.pdf

    本文档主要讲述了Netty5.0架构剖析和源码解读,涵盖了Netty的架构、源码分析、NIO入门等方面的知识点。 概述 JAVA 的 IO 演进是一个长期的过程,从传统的 BIO 通信到 NIO 的出现,都是为了解决通信中的问题。传统...

    netty源码解析视频

    #### 五、Netty源码分析实战案例 1. **ChannelHandlerContext与ChannelHandlerAdaptor详解**: - 分析`ChannelHandlerContext`的生命周期及其与`ChannelHandler`之间的交互方式。 - 深入理解`...

    Netty权威指南-Netty源码

    总的来说,Netty 源码分析涉及了网络编程、并发处理、事件驱动、协议编解码等多个领域,对理解 Java 高性能网络应用开发有着重要的指导意义。通过阅读源码,我们可以更深入地了解 Netty 如何实现高效的网络通信,并...

    Netty 完整依赖的jar包, 你只需要下载netty源码,再添加这些jar就可以编译通过了

    在描述中提到的"只需要下载netty源码,再添加这些jar就可以编译通过了",这意味着你需要获取Netty的源代码仓库,通常可以从GitHub等开源平台获得。源代码包含了Netty的所有模块和组件,可以让你深入了解其内部工作...

    netty源码jar包

    netty-3.3.1.Final-sources.jar src源码

    netty源码剖析视频

    视频分两部分 第1 章 : 第一部分、深入浅出Netty源码剖析。。 第2 章 : 第二部分、NIO+Netty5各种RPC架构实战演练

    Netty源码依赖包

    1. **理解底层机制**:通过分析Netty的源码依赖包,可以更深入地理解其内部的工作原理和设计模式,这对于优化网络应用性能至关重要。 2. **学习优秀实践**:Netty作为一个成熟且广泛使用的项目,其代码质量和架构...

    NIO+Netty5视频教程与Netty源码剖析视频教程

    压缩包内的文件"netty源码剖析视频教程.txt"可能是课程的详细大纲或笔记,提供了对课程内容的进一步概述,包括每个章节的重点和案例分析,是学习过程中不可或缺的参考资料。通过结合视频教程和文本资料,学习者可以...

    netty源码 4.*版本

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在深入探讨 Netty 源码之前...通过分析源码,不仅可以提升自己的技术能力,还能为解决实际问题提供灵感和参考。

    netty源码包

    netty源码包,可以本地搭建netty源码环境,学习nio模式

Global site tag (gtag.js) - Google Analytics