前言
本菜鸟有过几年的网络IO相关经验, java层面netty也一直关注, 最近想对自己所了解的netty做一个系列的笔记, 不光技术水平有限, 写作水平更有限, 难免有错误之处欢迎指正, 共同学习.
源码来自Netty5.x版本, 本系列文章不打算从架构的角度去讨论netty, 只想从源码细节展开, 又不想通篇的贴代码, 如果没有太大的必要, 我会尽量避免贴代码或是去掉不影响主流程逻辑的代码, 尽量多用语言描述. 这个过程中我会把我看到的netty对代码进行优化的一些细节提出来探讨, 大家共同学习, 更希望能抛砖引玉.
上一篇讲了EventLoop, 这一篇看一下server端如何bind的, 继续从上一篇开篇代码示例开始
服务端启动代码示例
服务端启动第一步是ChannelFuture f = b.bind(PORT), 接下来分析其详细过程, 先直接进入AbstractBootstrap#doBind()方法
这个方法调用栈太深, 具体执行流程就不像上一章那样在这里一一列出了, 跟着代码调用链走好了
首先第一步我们最关心的肯定是, NIO服务端嘛, 总要有个监听套接字ServerSocketChannel吧?这个动作是由initAndRegister()完成的
先重点看一下initAndRegister()方法:
1.首先channelFactory在开篇示例代码b.channel(NioServerSocketChannel.class)中被设置成了new ReflectiveChannelFactory<C>(NioServerSocketChannel.class)
再看一下ReflectiveChannelFactory代码, 实际上是factory通过反射创建一个NioServerSocketChannel对象
现在清楚了b.channel(NioServerSocketChannel.class)都做了什么,但是构造NioServerSocketChannel还是做了很多工作的, clazz.newInstance()调用的是默认无参构造方法, 来看一下 NioServerSocketChannel的无参构造方法:
1.到这里终于找到了,在newSocket()中创建了开篇提到的监听套接字ServerSocketChannel
2.这里可以看到SelectionKey.OP_ACCEPT标志就是监听套接字所感兴趣的事件了(但是还没注册进去,别着急, 自己挖的坑会含泪填完)
3.在父类构造函数我们看到了configureBlocking为false,终于将ServerSocketChannel设置为非阻塞模式, NIO之旅可以顺利开始了
4.继续父类构造方法,这里干了如下几件重要的事情:
1)构造一个unsafe绑定在serverChanel上,newUnsafe()由子类AbstractNioMessageChannel实现, unsafe的类型为NioMessageUnsafe,
NioMessageUnsafe类型专为serverChanel服务, 专门处理accept连接
2)创建用于NioServerSocketChannel的管道 boss pipeline
5.head和tail是pipeline的两头, head是outbound event的末尾, tail是inbound event的末尾.
按照上行事件(inbound)顺序来看, 现在pipeline中的顺序是head-->tail
再回到initAndRegister()方法, 继续看下面这段代码:
由于现在讲的是server的bind, 所以去看ServerBootstrap的init()实现:
1.init方法的代码比较多, 但是不难理解, 最上面我省略的部分做了这些事情:
1)设置NioServerSocketChannel的options和attrs.
2)预先复制好将来要设置给NioSocketChannel的options和attrs.
这里强调一下, 通常channel可分为两类XXXServerSocketChannel和XXXSocketChannel, 前者可以先简单理解为accept用的, 后者用来read和write等, 后面流程梳理通畅了这个问题也就迎刃而解了.
2.init做的第二件事就是在boss pipeline添加一个ChannelInitializer,
那么现在pipeline中的顺序变成了head-->ChannelInitializer-->tail(注意head和tail永远在两头, addLast方法对他俩不起作用)
ChannelInitializer这个类很有意思, 来看下它的代码吧
1.首先当有regist事件发生时, 最终会调用到channelRegistered(), 在这个方法里
先调用了抽象方法initChannel, 回看init()的代码,此时ServerBootstrapAcceptor会被加入到pipeline
现在的顺序是head--> ServerBootstrapAcceptor-->tail
2.然后从pipeline中移除自己, 因为它的工作已进入收尾阶段, 接下来只要将channelRegistered继续往pipeline中后续handler流转它就可以退出历史舞台了.
3.至于ServerBootstrapAcceptor是干什么的?先简单介绍一下,它是在一个accept的channel从boss移交给worker过程中的一个重要环节, 等以后的流程涉及到了它再详细分析(此坑下一篇填)
init() 的主要流程至此已分析的差不多了, init之后就是group().register(channel)了
ChannelFuture regFuture = group().register(channel);
1.这里group()返回的自然是开篇示例代码中的bossGroup了
2.register调用的是MultithreadEventLoopGroup的实现:
看见next()不知是否想起了前一篇分析EventLoop时提到的 "用取模的方式从group中拿出一个EventLoop"?对的就是这么干的, 调用栈是这样的:
然后调用:
或是调用(这个类是在个数为2的N次方时的优化chooser版本):
但是由于bossEventLoop我们在开篇示例中设置只有1个, 通常情况下1个也够用了,除非你要绑定多个端口,所以这里next()其实总会返回同一个
3.接着register()调用路径往下
SingleThreadEventLoop#register()中调用了channel.unsafe().register(this, promise);
到此类似闯关游戏最后一关大怪基本要现身了, unsafe#register()代码在AbstractChannel$AbstractUnsafe中
重点看一下AbstractUnsafe#register()
eventLoop.inEventLoop()是判断当前线程是否为EventLoop线程, 此时当前线程还是我们的main线程, bossEventLoop线程还没有启动,
所以会走到else分支调用eventLoop.execute(),在SingleThreadEventExecutor的execute方法中会判断当前线程是否为eventLoop如果不是, 则启动当前eventLoop线程
到现在为止, bossEventLoop终于开门红, 接了第一笔单子, 即register0()任务, 丢到了MPSC队列里.
接下来分析register0()
1.到这里我已经要哭了, register0()还不是最终大怪, 还有一个doRegister()
2.不过在doRegister()之后还调用了pipeline.fireChannelRegistered(), 是的就是它, 还能想起上文中提到的ChannelInitializer吗? ChannelInitializer#channelRegistered()方法就是在这里被触发的.
3.剩下的代码fireChannelActive()注释上已经写的很明白了, 不多做解释了(这里一般不会调用, 因为此时isActive()很难是true).
继续转战doRegister()
javaChannel().register(), 终于看到最关键的这行代码, 实际调用的实现是:
至此NIO固有的套路出现了,这里先把interestOps注册为0, OP_ACCEPT相信接下来会出现的, 继续看代码
在返回doBind()接着看doBind0()之前, 先留意一下register0()中我刻意留着没有删除的代码safeSetSuccess(promise)
上面那句代码会将promise的success设置为true并触发回调在doBind()中添加的listener
到这里,bossEventLoop已经接到第二个任务了(bind), 第一个还记得吧(register0)
接下来继续bind的细节吧
AbstractChannel#bind()代码:
ChannelPipeline目前只有一个默认实现即DefaultChannelPipeline
接着调用AbstractChannelHandlerContext的bind
1.第一行代码中findContextOutbound(), 看字面意思就知道是找出下一个outbound handler 的 ctx, 这里是找到第一个outbound 的 ctx.(注意bind是outbound event)
2.invoker()也是一个默认实现即DefaultChannelHandlerInvoker, 不详细解释了,
还记的上面已经说过的目前pipeline中的顺序是head--> ServerBootstrapAcceptor-->tail 吧?
第一章节已经讲过outbound的执行顺序是反过来的, 而这三个当中只有head是处理outbound的
又见unsafe咯
上面的doBind()调用的是NioServerSocketChannel的实现:
到此, 赤裸裸的nio api 之 ServerSocket.bind()已呈现在你眼前
大家做网络IO开发的一定了解第二个参数backlog的重要性,在linux内核中TCP握手过程总共会有两个队列:
1)一个俗称半连接队列, 装着那些握手一半的连接(syn queue)
2)另一个是装着那些握手成功但是还没有被应用层accept的连接的队列(accept queue)
backlog的大小跟这两个队列的容量之和息息相关, 还有哇, "爱情不是你想买,想买就能买", backlog的值也不是你设置多少它就是多少的, 具体你要参考linux内核代码(甚至文档都不准)
我临时翻了一下linux-3.10.28的代码, 逻辑是这样的(socket.c):
我们清楚的看到backlog 并不是按照你所设置的backlog大小,实际上取的是backlog和somaxconn的最小值
somaxconn的值定义在:
/proc/sys/net/core/somaxconn
netty中backlog在linux下的默认值也是somaxconn
还有一点要注意, 对于TCP连接的ESTABLISHED状态, 并不需要应用层accept, 只要在accept queue里就已经变成状态ESTABLISHED, 所以在使用ss或netstat排查这方面问题不要被ESTABLISHED迷惑.
额, 白呼了一堆java层大家一般不是很关心的东西, 现在我们回到正题, 回到unsafe.bind()方法
1.在doBind()之前wasActive基本上会是false了, doBind()之后isActive()为true, 所以这里会触发channelActive事件
2.这里由于bind是一个outbound, 所以选择invokeLater()的方式去触发channelActive这个inbound, 具体原因我还是把invokeLater()的注释放出来吧, 说的很明白:
3.channelActive是一个inbound, 再一次回到pipeline的顺序head--> ServerBootstrapAcceptor-->tail, 此时按照head-->tail的顺序执行.
ServerBootstrapAcceptor和tail都是indound handler
先看DefaultChannelPipeline的fireChannelActive()
head.fireChannelActive()会让channelActive event按顺序在ServerBootstrapAcceptor和tail中流转, 但是他们俩对这个event都没有实质性的处理, 所以代码我就不贴出来了.
下面这句 channel.read()才是能提起兴趣的代码
注意这个read可不是一个读数据的 inbound event, 他是一个outbound event, 是"开始读"的意思, 这个event在pipeline中从tail开始溜达最终会溜达到head的read()方法:
又回到unsafe了
记不住了的往上面翻翻看, 上面挖的坑, 注册的时候,doRegister()方法里面的代码:
selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
现在是时候填坑了, 当时注册了0, 现在要把readInterestOp注册进去了, readInterestOps就是NioServerSocketChannel构造函数传入的OP_ACCEPT, 再贴一遍代码:
好了, bind到此分析结束.下一篇会尝试分析accept
本菜鸟有过几年的网络IO相关经验, java层面netty也一直关注, 最近想对自己所了解的netty做一个系列的笔记, 不光技术水平有限, 写作水平更有限, 难免有错误之处欢迎指正, 共同学习.
源码来自Netty5.x版本, 本系列文章不打算从架构的角度去讨论netty, 只想从源码细节展开, 又不想通篇的贴代码, 如果没有太大的必要, 我会尽量避免贴代码或是去掉不影响主流程逻辑的代码, 尽量多用语言描述. 这个过程中我会把我看到的netty对代码进行优化的一些细节提出来探讨, 大家共同学习, 更希望能抛砖引玉.
上一篇讲了EventLoop, 这一篇看一下server端如何bind的, 继续从上一篇开篇代码示例开始
服务端启动代码示例
// Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler()); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
服务端启动第一步是ChannelFuture f = b.bind(PORT), 接下来分析其详细过程, 先直接进入AbstractBootstrap#doBind()方法
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
这个方法调用栈太深, 具体执行流程就不像上一章那样在这里一一列出了, 跟着代码调用链走好了
首先第一步我们最关心的肯定是, NIO服务端嘛, 总要有个监听套接字ServerSocketChannel吧?这个动作是由initAndRegister()完成的
先重点看一下initAndRegister()方法:
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { // ...... } ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
1.首先channelFactory在开篇示例代码b.channel(NioServerSocketChannel.class)中被设置成了new ReflectiveChannelFactory<C>(NioServerSocketChannel.class)
再看一下ReflectiveChannelFactory代码, 实际上是factory通过反射创建一个NioServerSocketChannel对象
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Class<? extends T> clazz; public ReflectiveChannelFactory(Class<? extends T> clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } this.clazz = clazz; } @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } }
现在清楚了b.channel(NioServerSocketChannel.class)都做了什么,但是构造NioServerSocketChannel还是做了很多工作的, clazz.newInstance()调用的是默认无参构造方法, 来看一下 NioServerSocketChannel的无参构造方法:
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a server socket.", e); } }
1.到这里终于找到了,在newSocket()中创建了开篇提到的监听套接字ServerSocketChannel
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
2.这里可以看到SelectionKey.OP_ACCEPT标志就是监听套接字所感兴趣的事件了(但是还没注册进去,别着急, 自己挖的坑会含泪填完)
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { // ...... } }
3.在父类构造函数我们看到了configureBlocking为false,终于将ServerSocketChannel设置为非阻塞模式, NIO之旅可以顺利开始了
protected AbstractChannel(Channel parent) { this.parent = parent; id = DefaultChannelId.newInstance(); unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
4.继续父类构造方法,这里干了如下几件重要的事情:
1)构造一个unsafe绑定在serverChanel上,newUnsafe()由子类AbstractNioMessageChannel实现, unsafe的类型为NioMessageUnsafe,
NioMessageUnsafe类型专为serverChanel服务, 专门处理accept连接
2)创建用于NioServerSocketChannel的管道 boss pipeline
DefaultChannelPipeline(AbstractChannel channel) { // ...... this.channel = channel; tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
5.head和tail是pipeline的两头, head是outbound event的末尾, tail是inbound event的末尾.
按照上行事件(inbound)顺序来看, 现在pipeline中的顺序是head-->tail
再回到initAndRegister()方法, 继续看下面这段代码:
init(channel);
由于现在讲的是server的bind, 所以去看ServerBootstrap的init()实现:
void init(Channel channel) throws Exception { // ...... p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
1.init方法的代码比较多, 但是不难理解, 最上面我省略的部分做了这些事情:
1)设置NioServerSocketChannel的options和attrs.
2)预先复制好将来要设置给NioSocketChannel的options和attrs.
这里强调一下, 通常channel可分为两类XXXServerSocketChannel和XXXSocketChannel, 前者可以先简单理解为accept用的, 后者用来read和write等, 后面流程梳理通畅了这个问题也就迎刃而解了.
2.init做的第二件事就是在boss pipeline添加一个ChannelInitializer,
那么现在pipeline中的顺序变成了head-->ChannelInitializer-->tail(注意head和tail永远在两头, addLast方法对他俩不起作用)
ChannelInitializer这个类很有意思, 来看下它的代码吧
public abstract class ChannelInitializer<C extends Channel> extends ChannelHandlerAdapter { protected abstract void initChannel(C ch) throws Exception; @Override public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { ChannelPipeline pipeline = ctx.pipeline(); boolean success = false; try { initChannel((C) ctx.channel()); pipeline.remove(this); ctx.fireChannelRegistered(); success = true; } catch (Throwable t) { // ...... } finally { // ...... } } }
1.首先当有regist事件发生时, 最终会调用到channelRegistered(), 在这个方法里
先调用了抽象方法initChannel, 回看init()的代码,此时ServerBootstrapAcceptor会被加入到pipeline
现在的顺序是head--> ServerBootstrapAcceptor-->tail
2.然后从pipeline中移除自己, 因为它的工作已进入收尾阶段, 接下来只要将channelRegistered继续往pipeline中后续handler流转它就可以退出历史舞台了.
3.至于ServerBootstrapAcceptor是干什么的?先简单介绍一下,它是在一个accept的channel从boss移交给worker过程中的一个重要环节, 等以后的流程涉及到了它再详细分析(此坑下一篇填)
init() 的主要流程至此已分析的差不多了, init之后就是group().register(channel)了
ChannelFuture regFuture = group().register(channel);
1.这里group()返回的自然是开篇示例代码中的bossGroup了
2.register调用的是MultithreadEventLoopGroup的实现:
public ChannelFuture register(Channel channel) { return next().register(channel); }
看见next()不知是否想起了前一篇分析EventLoop时提到的 "用取模的方式从group中拿出一个EventLoop"?对的就是这么干的, 调用栈是这样的:
public EventExecutor next() { return chooser.next(); }
然后调用:
private final class GenericEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } }
或是调用(这个类是在个数为2的N次方时的优化chooser版本):
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[childIndex.getAndIncrement() & children.length - 1]; } }
但是由于bossEventLoop我们在开篇示例中设置只有1个, 通常情况下1个也够用了,除非你要绑定多个端口,所以这里next()其实总会返回同一个
3.接着register()调用路径往下
SingleThreadEventLoop#register()中调用了channel.unsafe().register(this, promise);
到此类似闯关游戏最后一关大怪基本要现身了, unsafe#register()代码在AbstractChannel$AbstractUnsafe中
重点看一下AbstractUnsafe#register()
public final void register(EventLoop eventLoop, final ChannelPromise promise) { // ...... // It's necessary to reuse the wrapped eventloop object. Otherwise the user will end up with multiple // objects that do not share a common state. if (AbstractChannel.this.eventLoop == null) { AbstractChannel.this.eventLoop = new PausableChannelEventLoop(eventLoop); } else { AbstractChannel.this.eventLoop.unwrapped = eventLoop; } if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { // ...... } } }
eventLoop.inEventLoop()是判断当前线程是否为EventLoop线程, 此时当前线程还是我们的main线程, bossEventLoop线程还没有启动,
所以会走到else分支调用eventLoop.execute(),在SingleThreadEventExecutor的execute方法中会判断当前线程是否为eventLoop如果不是, 则启动当前eventLoop线程
public void execute(Runnable task) { // ...... boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startExecution(); // ...... } // ...... }
到现在为止, bossEventLoop终于开门红, 接了第一笔单子, 即register0()任务, 丢到了MPSC队列里.
接下来分析register0()
private void register0(ChannelPromise promise) { try { // ...... doRegister(); // ...... safeSetSuccess(promise); // 注意这行代码我不会平白无故不删它的, 下面会提到它 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // ...... } }
1.到这里我已经要哭了, register0()还不是最终大怪, 还有一个doRegister()
2.不过在doRegister()之后还调用了pipeline.fireChannelRegistered(), 是的就是它, 还能想起上文中提到的ChannelInitializer吗? ChannelInitializer#channelRegistered()方法就是在这里被触发的.
3.剩下的代码fireChannelActive()注释上已经写的很明白了, 不多做解释了(这里一般不会调用, 因为此时isActive()很难是true).
继续转战doRegister()
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this); return; } catch (CancelledKeyException e) { // ...... } } }
javaChannel().register(), 终于看到最关键的这行代码, 实际调用的实现是:
java.nio.channels.spi.AbstractSelectableChannel#register(Selector sel, int ops, Object att)
至此NIO固有的套路出现了,这里先把interestOps注册为0, OP_ACCEPT相信接下来会出现的, 继续看代码
在返回doBind()接着看doBind0()之前, 先留意一下register0()中我刻意留着没有删除的代码safeSetSuccess(promise)
上面那句代码会将promise的success设置为true并触发回调在doBind()中添加的listener
regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // ...... doBind0(regFuture, channel, localAddress, promise); } }); private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
到这里,bossEventLoop已经接到第二个任务了(bind), 第一个还记得吧(register0)
接下来继续bind的细节吧
AbstractChannel#bind()代码:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
ChannelPipeline目前只有一个默认实现即DefaultChannelPipeline
public ChannelFuture bind(SocketAddress localAddress) { return tail.bind(localAddress); }
接着调用AbstractChannelHandlerContext的bind
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); next.invoker().invokeBind(next, localAddress, promise); return promise; }
1.第一行代码中findContextOutbound(), 看字面意思就知道是找出下一个outbound handler 的 ctx, 这里是找到第一个outbound 的 ctx.(注意bind是outbound event)
2.invoker()也是一个默认实现即DefaultChannelHandlerInvoker, 不详细解释了,
还记的上面已经说过的目前pipeline中的顺序是head--> ServerBootstrapAcceptor-->tail 吧?
第一章节已经讲过outbound的执行顺序是反过来的, 而这三个当中只有head是处理outbound的
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
又见unsafe咯
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { // ...... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
上面的doBind()调用的是NioServerSocketChannel的实现:
protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress, config.getBacklog()); }
到此, 赤裸裸的nio api 之 ServerSocket.bind()已呈现在你眼前
大家做网络IO开发的一定了解第二个参数backlog的重要性,在linux内核中TCP握手过程总共会有两个队列:
1)一个俗称半连接队列, 装着那些握手一半的连接(syn queue)
2)另一个是装着那些握手成功但是还没有被应用层accept的连接的队列(accept queue)
backlog的大小跟这两个队列的容量之和息息相关, 还有哇, "爱情不是你想买,想买就能买", backlog的值也不是你设置多少它就是多少的, 具体你要参考linux内核代码(甚至文档都不准)
我临时翻了一下linux-3.10.28的代码, 逻辑是这样的(socket.c):
sock = sockfd_lookup_light(fd, &err, &fput_needed); if (sock) { somaxconn = sock_net(sock->sk)->core.sysctl_somaxconn; if ((unsigned int)backlog > somaxconn) backlog = somaxconn; err = security_socket_listen(sock, backlog); if (!err) err = sock->ops->listen(sock, backlog); fput_light(sock->file, fput_needed); }
我们清楚的看到backlog 并不是按照你所设置的backlog大小,实际上取的是backlog和somaxconn的最小值
somaxconn的值定义在:
/proc/sys/net/core/somaxconn
netty中backlog在linux下的默认值也是somaxconn
还有一点要注意, 对于TCP连接的ESTABLISHED状态, 并不需要应用层accept, 只要在accept queue里就已经变成状态ESTABLISHED, 所以在使用ss或netstat排查这方面问题不要被ESTABLISHED迷惑.
额, 白呼了一堆java层大家一般不是很关心的东西, 现在我们回到正题, 回到unsafe.bind()方法
1.在doBind()之前wasActive基本上会是false了, doBind()之后isActive()为true, 所以这里会触发channelActive事件
2.这里由于bind是一个outbound, 所以选择invokeLater()的方式去触发channelActive这个inbound, 具体原因我还是把invokeLater()的注释放出来吧, 说的很明白:
private void invokeLater(Runnable task) { try { // This method is used by outbound operation implementations to trigger an inbound event later. // They do not trigger an inbound event immediately because an outbound operation might have been // triggered by another inbound event handler method. If fired immediately, the call stack // will look like this for example: // // handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection. // -> handlerA.ctx.close() // -> channel.unsafe.close() // -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet // // which means the execution of two inbound handler methods of the same handler overlap undesirably. eventLoop().unwrap().execute(task); } catch (RejectedExecutionException e) { logger.warn("Can't invoke task later as EventLoop rejected it", e); } }
3.channelActive是一个inbound, 再一次回到pipeline的顺序head--> ServerBootstrapAcceptor-->tail, 此时按照head-->tail的顺序执行.
ServerBootstrapAcceptor和tail都是indound handler
先看DefaultChannelPipeline的fireChannelActive()
public ChannelPipeline fireChannelActive() { head.fireChannelActive(); if (channel.config().isAutoRead()) { channel.read(); } return this; }
head.fireChannelActive()会让channelActive event按顺序在ServerBootstrapAcceptor和tail中流转, 但是他们俩对这个event都没有实质性的处理, 所以代码我就不贴出来了.
下面这句 channel.read()才是能提起兴趣的代码
注意这个read可不是一个读数据的 inbound event, 他是一个outbound event, 是"开始读"的意思, 这个event在pipeline中从tail开始溜达最终会溜达到head的read()方法:
public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); }
又回到unsafe了
// AbstractChannel public final void beginRead() { // ...... try { doBeginRead(); } catch (final Exception e) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } } // AbstractNioChannel protected void doBeginRead() throws Exception { // ...... final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
记不住了的往上面翻翻看, 上面挖的坑, 注册的时候,doRegister()方法里面的代码:
selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
现在是时候填坑了, 当时注册了0, 现在要把readInterestOp注册进去了, readInterestOps就是NioServerSocketChannel构造函数传入的OP_ACCEPT, 再贴一遍代码:
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
好了, bind到此分析结束.下一篇会尝试分析accept
相关推荐
ChannelFuture future = bootstrap.bind(getPort()).sync(); future.channel().closeFuture().sync(); } @Override public void destroy() throws Exception { bossGroup.shutdownGracefully(); workerGroup...
在深入探讨 Netty 源码之前,我们先了解一下 Netty 的核心概念和架构。 Netty 的核心组件包括:ByteBuf(字节缓冲区)、Channel(通道)、EventLoop(事件循环)、Pipeline(处理链)以及Handler(处理器)。...
《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...
### Netty源码解析——服务启动过程 #### 一、Netty概述 Netty是一个高性能、异步事件驱动的网络应用框架,它被广泛应用于快速开发高性能协议服务器和客户端。Netty通过高度优化的设计和实现提供了低延迟和高吞吐...
然后,调用`bind()`方法启动服务器,等待客户端连接。 项目还引入了MyBatis作为持久层框架,用于数据库交互。MyBatis允许开发者将SQL语句与Java代码解耦,提供了灵活的数据访问方式。在"Netty_demo-master"中,可能...
根据提供的文件信息,这里将对Netty源码的相关知识点进行详细解析。Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器与客户端。Netty因其优秀的性能和灵活的设计而受到...
《Netty源码深入剖析》一书旨在帮助读者深入了解Netty框架的工作原理和技术细节,从基础知识入手,逐步过渡到高级优化技巧,使开发者能够更好地掌握并应用Netty于实际项目中。 ### 一、Netty简介与核心特性 Netty...
在本文中,我们将深入分析 Netty 4.1 源码中的 EchoServer 示例,以理解其核心组件和工作原理。 首先,我们关注 EchoServer 服务端的初始化,这涉及到两个关键组件:`bossGroup` 和 `workerGroup`。它们都是 `...
流程走到了启动好reactor线程后,ServerSocketChannel注册到selector上,但是感兴趣事件填的0,我们继续跟流程,走到这里,initAndRegister方法完成,继续跟bind方法,我们看bind方法中的doBind0方法 /** * * @...
Netty源码分析 ##### 3.1. 服务端创建 Netty是一个高性能的网络应用程序框架,其核心优势在于异步事件驱动的I/O处理机制。接下来我们将深入分析Netty服务端的创建过程。 ###### 3.1.1. 服务端启动辅助类...
在"基于netty的nio使用demo源码"中,我们可以期待看到以下几个关键部分: 1. **初始化ServerBootstrap**:Netty服务器的启动过程通常从创建ServerBootstrap实例开始,设置其配置,如BossGroup(接收新连接的线程组...
2. **设置Handler**:定义自定义的ChannelInboundHandlerAdapter,这是处理接收到的WebSocket帧的类。你需要重写`channelActive()`方法来处理连接建立,`channelRead()`方法来处理接收到的数据,以及`...
2. **创建 ChannelHandler**:Netty 中的 `ChannelHandler` 是处理网络事件的核心组件。我们需要创建一个或多个 `ChannelHandler` 来处理 SDTP 相关的事件,如连接建立、数据读写、断开连接等。在 `channelRead()` ...
通过调用 `bind()` 方法并传入目标端口号,Netty 服务器将开始监听指定端口上的连接请求。 2. **Channel 和 ChannelFuture** 在 Netty 中,`Channel` 是网络连接的抽象,它代表了服务器与客户端之间的通信通道。`...
return BindingBuilder.bind(queueMessage).to(exchange).with("web.#"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queue...
在实践中,我们可以结合标签中的"源码"和"工具",通过阅读Netty的源代码来进一步了解其实现细节,同时利用各种工具进行性能监控和调优,确保服务的稳定运行。在学习过程中,参考博客链接(已提供但无法在此处访问)...
2. **Netty与Spring的交互** - 使用Spring的`ApplicationContext`在Netty的`ChannelInitalizer`或`Handler`中获取Bean,实现业务逻辑的注入。 - 通过`@Service`和`@Autowired`注解,将Spring管理的Bean注入到Netty...
在阅读 Netty 源码时,我们可以深入了解其内部机制,比如如何高效地处理 I/O 事件、如何实现零拷贝等。这对于提升我们的网络编程技能和优化应用性能非常有帮助。 总结来说,Netty 通过提供全面的 HTTP 协议栈支持,...