`
海浪儿
  • 浏览: 274104 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

netty4源码分析-bind

阅读更多

本文为原创,转载请注明出处

netty4源码分析-bind 

前一篇文章中分析了监听套接字ServerSocketChannel的创建过程,本文接着分析绑定IP和端口的过程。

 回到之前未分析完的doBind逻辑,前一篇文章已分析到dobind方法中initAndRegister方法,该方法最终触发了对regPromise 的listener的回调,Listener将bind任务加到boss线程的任务队列中

//AbstractBootstrap
private ChannelFuture AbstractBootstrap doBind(final SocketAddress localAddress) {
        final ChannelFuture regPromise = initAndRegister();
        final Channel channel = regPromise.channel();
        final ChannelPromise promise = channel.newPromise();
        if (regPromise.isDone()) {
            doBind0(regPromise, channel, localAddress, promise);
        } else {
            regPromise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doBind0(future, channel, localAddress, promise);
                }
            });
        }
        return promise;
}

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

       本文就来分析bind任务

        channel.bind(localAddress, promise)调用AbstractChannel的bind方法

//AbstractChannel
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

 pipeline.bind(localAddress, promise)调用的是DefaultChannelPipeline的方法

//DefaultChannelPipeline
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

          tail.bind(localAddress, promise)会调用DefaultChannelHandlerContext的方法

//DefaultChannelHandlerContext
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        validatePromise(promise, false);
        return findContextOutbound().invokeBind(localAddress, promise);
}
private DefaultChannelHandlerContext findContextOutbound() {
        DefaultChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!(ctx.handler() instanceof ChannelOutboundHandler));
        return ctx;
    }

       bind是一个Outbound事件,因此会按照tail->head的顺序执行所有的Outbound处理器,目前有三个处理器:tail-> ServerBootstrapAcceptor->head,但只有head是outbound处理器,所以看一下Head的invokeBind方法

// DefaultChannelHandlerContext
private ChannelFuture invokeBind(final SocketAddress localAddress, final ChannelPromise promise) {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            invokeBind0(localAddress, promise);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    invokeBind0(localAddress, promise);
                }
            });
        }
        return promise;
}
private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

         ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise)这行代码会调用Headhandler的bind方法

//Headhandler
public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }

 而headHandler会调用AbstractUnsafe的bind方法

//AbstractUnsafe
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            if (!ensureOpen(promise)) {
                return;
            }

            try {
                boolean wasActive = isActive();

                // See: https://github.com/netty/netty/issues/576
                if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
                    Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                    localAddress instanceof InetSocketAddress &&
                    !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
                    // Warn a user about the fact that a non-root user can't receive a
                    // broadcast packet on *nix if the socket is bound on non-wildcard address.
                    logger.warn(
                            "A non-root user can't receive a broadcast packet if the socket " +
                            "is not bound to a wildcard address; binding to a non-wildcard " +
                            "address (" + localAddress + ") anyway as requested.");
                }

                doBind(localAddress);
                promise.setSuccess();
                if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }
            } catch (Throwable t) {
                promise.setFailure(t);
                closeIfClosed();
            }
        }

      因为AbstractUnsafe是AbstractChannel的内部类,所以doBind(localAddress)调用的就是AbstractChannel的子类NioServerSocketChannel的方法

//NioServerSocketChannel
protected void doBind(SocketAddress localAddress) throws Exception {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }

 

//AbstractUnsafe
private void invokeLater(Runnable task) {
            // This method is used by outbound operation implementations to trigger an inbound event later.
            // They do not trigger an inbound event immediately because an outbound operation might have been
            // triggered by another inbound event handler method.  If fired immediately, the call stack
            // will look like this for example:
            //
            //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
            //   -> handlerA.ctx.close()
            //      -> channel.unsafe.close()
            //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
            //
            // which means the execution of two inbound handler methods of the same handler overlap undesirably.
            eventLoop().execute(task);
        }

       终于看到熟悉的ServerSocket的bind方法的调用了吧,至此,就完成了对IP和端口的绑定。注意:此处的backlog(最大完成连接队列数)的默认值为3072。

       由于此时bind已执行,所以isActive方法会返回true,然而channelActive是一个Inbound事件,所以不能由outbound操作直接触发(具体原因看上面代码的注释),需要将channelActive任务加入到boss线程的任务队列中,此时boss线程的任务队列已经执行完了bind任务,接着再执行channelActive任务。

        ChannelActive是一个inbound事件,因此会按照head->tail的顺序执行Inbound处理器,目前有三个处理器:head-> ServerBootstrapAcceptor->tail, ServerBootstrapAcceptor和tail都是inbound处理器,先看一下Head的fireChannelActive方法

//DefaultChannelPipeline
public ChannelPipeline fireChannelActive() {
        head.fireChannelActive();
        if (channel.config().isAutoRead()) {
            channel.read();
        }
        return this;
    }

 head.fireChannelActive()的代码如下:

// DefaultChannelHandlerContext
public ChannelHandlerContext fireChannelActive() {
        final DefaultChannelHandlerContext next = findContextInbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
        return this;
}
private DefaultChannelHandlerContext findContextInbound() {
        DefaultChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!(ctx.handler() instanceof ChannelInboundHandler));
        return ctx;
    }
private void invokeChannelActive() {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }

       ServerBootstrapAcceptor和tail的channelActive方法都没有做任何实质性的事情。最后以tailHandler的空实现结束

      接着再看DefaultChannelPipeline执行完head.fireChannelActive()后,对channel.read()的执行

      里面调用了abstractChannel的如下方法:

//abstractChannel
public Channel read() {
        pipeline.read();
        return this;
}
//DefaultChannelPipeline
public ChannelPipeline read() {
        tail.read();
        return this;
}
//DefaultChannelHandlerContext
public ChannelHandlerContext read() {
        findContextOutbound().invokeRead();
        return this;
    }

       Read是一个Outbound事件,因此findContextOutbound()会按照tail->head的顺序执行所有的Outbound处理器,目前有三个处理器:tail->ServerBootstrapAcceptor->head,但只有head是outbound处理器,所以看一下Head的invokeRead方法

private void invokeRead() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            invokeRead0();
        } else {
            Runnable task = invokeRead0Task;
            if (task == null) {
                invokeRead0Task = task = new Runnable() {
                    @Override
                    public void run() {
                        invokeRead0();
                    }
                };
            }
            executor.execute(task);
        }
}
private void invokeRead0() {
        try {
            ((ChannelOutboundHandler) handler()).read(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }

 ((ChannelOutboundHandler) handler()).read(this)这行代码会调用Headhandler的read方法

//Headhandler
public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

 unsafe.beginRead()会调用AbstractUnsafe的beginRead方法

//AbstractUnsafe
public void beginRead() {
            if (!isActive()) {
                return;
            }
            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

        因为AbstractUnsafe是AbstractChannel的内部类,所以doBeginRead()调用的就是AbstractChannel的子类AbstractNioChannel的方法

//AbstractNioChannel
protected void doBeginRead() throws Exception {
        if (inputShutdown) {
            return;
        }

        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

        selectionKey.interestOps()的值是之前AbastractUnsafe类中的doRegister方法执行如下代码selectionKey = javaChannel().register(eventLoop().selector, 0, this)时设置的,因此值为0。

  而readInterestOp是之前创建NioServerSocketChanne时,NioServerSocketChannel类的构造函数中设置的super(null, newSocket(), SelectionKey.OP_ACCEPT),因此值为16。

      selectionKey.interestOps(interestOps | readInterestOp)会将ops设置为16。

 

      总结:

依次发生了以下事件:Bind(outbound)->channelActive(inbound)->read(outbound)。

注意:channelActive是在bind中触发的。

 

Boss线程的任务队列变化为:Bind任务->channelActive任务

 

bind任务共做了以下几件事情:

1、将监听套接字绑定IP和端口,并设置最大完成连接队列数

2、将channelActive任务加入到boss线程的任务队列中

 

      channelActive任务做了以下事情:将selectionKey的interestOps设置为SelectionKey.OP_ACCEPT,即16

分享到:
评论
5 楼 wengliang_1986 2017-01-20  
4 楼 langwolf 2014-08-15  
 
3 楼 budairenqin 2013-12-06  
水准很高,顶
2 楼 cjdxlgb 2013-09-23  
public 
1 楼 lxzh504 2013-08-15  
写的好啊。呵呵,非常详细,支持楼主

相关推荐

    netty源码深入分析

    《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...

    Netty权威指南-Netty源码

    总的来说,Netty 源码分析涉及了网络编程、并发处理、事件驱动、协议编解码等多个领域,对理解 Java 高性能网络应用开发有着重要的指导意义。通过阅读源码,我们可以更深入地了解 Netty 如何实现高效的网络通信,并...

    Netty源码解析-服务启动过程.pdf

    ### Netty源码解析——服务启动过程 #### 一、Netty概述 Netty是一个高性能、异步事件驱动的网络应用框架,它被广泛应用于快速开发高性能协议服务器和客户端。Netty通过高度优化的设计和实现提供了低延迟和高吞吐...

    以netty4.1源码中的EchoServer为例对netty的源码进行分析.docx

    在本文中,我们将深入分析 Netty 4.1 源码中的 EchoServer 示例,以理解其核心组件和工作原理。 首先,我们关注 EchoServer 服务端的初始化,这涉及到两个关键组件:`bossGroup` 和 `workerGroup`。它们都是 `...

    netty源码解析视频.txt

    通过对Netty源码的深入分析,我们不仅能够了解到其内部工作原理,还能学习到许多优秀的设计思想和技术实践,这对于提高个人的技术水平和解决实际问题有着非常重要的意义。希望本文能够帮助读者更好地理解和掌握Netty...

    netty源码深入剖析.txt

    《Netty源码深入剖析》一书旨在帮助读者深入了解Netty框架的工作原理和技术细节,从基础知识入手,逐步过渡到高级优化技巧,使开发者能够更好地掌握并应用Netty于实际项目中。 ### 一、Netty简介与核心特性 Netty...

    netty 通过端口调用关闭

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨“Netty ...在实践中,结合源码分析和使用工具,可以进一步提升我们的开发和运维能力。

    Netty5.0架构剖析和源码解读

    Netty源码分析 ##### 3.1. 服务端创建 Netty是一个高性能的网络应用程序框架,其核心优势在于异步事件驱动的I/O处理机制。接下来我们将深入分析Netty服务端的创建过程。 ###### 3.1.1. 服务端启动辅助类...

    基于netty的nio使用demo源码

    Netty是一个高性能、异步事件驱动的网络应用框架,它为Java开发人员提供了构建服务器和客户端应用程序的强大...通过阅读和分析源码,我们可以深入理解Netty的事件驱动模型、NIO机制以及如何在实际项目中应用这些概念。

    netty5 HTTP协议栈浅析与实践

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入浅出...结合源码分析和实践操作,我们可以更深入地理解 HTTP 协议及其在网络编程中的应用。

    Netty HTTP协议简单实现

    7. **源码分析与调试** 阅读并理解 Netty 源码可以帮助我们更好地掌握其工作原理。Netty 的代码结构清晰,模块化设计使得我们可以针对具体需求进行定制。在实际项目中,可以使用 IDE 的调试功能逐步跟踪请求处理...

    聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇).doc

    Netty 是一个高性能、异步事件驱动的网络应用框架...主从 Reactor 线程组设计保证了服务端可以高效处理大规模并发连接,而通过源码分析,我们可以更深入地理解其内部机制,从而更好地利用 Netty 开发高性能的网络应用。

    DotNetty系列二:基本使用,博文里的源代码

    3. **事件驱动模型**:DotNetty基于Netty的事件驱动模型,通过事件循环Group(如NioEventLoopGroup)来处理网络事件。 此外,博主可能还会讲解如何调试和测试这些示例,以及如何优化性能,比如通过线程池管理、内存...

Global site tag (gtag.js) - Google Analytics