回到之前未分析完的doBind逻辑,前一篇文章已分析到dobind方法中initAndRegister方法,该方法最终触发了对regPromise 的listener的回调,Listener将bind任务加到boss线程的任务队列中
//AbstractBootstrap private ChannelFuture AbstractBootstrap doBind(final SocketAddress localAddress) { final ChannelFuture regPromise = initAndRegister(); final Channel channel = regPromise.channel(); final ChannelPromise promise = channel.newPromise(); if (regPromise.isDone()) { doBind0(regPromise, channel, localAddress, promise); } else { regPromise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(future, channel, localAddress, promise); } }); } return promise; } private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
channel.bind(localAddress, promise)调用AbstractChannel的bind方法
//AbstractChannel public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
pipeline.bind(localAddress, promise)调用的是DefaultChannelPipeline的方法
//DefaultChannelPipeline public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
tail.bind(localAddress, promise)会调用DefaultChannelHandlerContext的方法
//DefaultChannelHandlerContext public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } validatePromise(promise, false); return findContextOutbound().invokeBind(localAddress, promise); } private DefaultChannelHandlerContext findContextOutbound() { DefaultChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!(ctx.handler() instanceof ChannelOutboundHandler)); return ctx; }
bind是一个Outbound事件,因此会按照tail->head的顺序执行所有的Outbound处理器,目前有三个处理器:tail-> ServerBootstrapAcceptor->head,但只有head是outbound处理器,所以看一下Head的invokeBind方法
// DefaultChannelHandlerContext private ChannelFuture invokeBind(final SocketAddress localAddress, final ChannelPromise promise) { EventExecutor executor = executor(); if (executor.inEventLoop()) { invokeBind0(localAddress, promise); } else { executor.execute(new Runnable() { @Override public void run() { invokeBind0(localAddress, promise); } }); } return promise; } private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise)这行代码会调用Headhandler的bind方法
//Headhandler public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
//AbstractUnsafe public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { if (!ensureOpen(promise)) { return; } try { boolean wasActive = isActive(); // See: https://github.com/netty/netty/issues/576 if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() && Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } doBind(localAddress); promise.setSuccess(); if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } } catch (Throwable t) { promise.setFailure(t); closeIfClosed(); } }
//NioServerSocketChannel protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress, config.getBacklog()); }
//AbstractUnsafe private void invokeLater(Runnable task) { // This method is used by outbound operation implementations to trigger an inbound event later. // They do not trigger an inbound event immediately because an outbound operation might have been // triggered by another inbound event handler method. If fired immediately, the call stack // will look like this for example: // // handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection. // -> handlerA.ctx.close() // -> channel.unsafe.close() // -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet // // which means the execution of two inbound handler methods of the same handler overlap undesirably. eventLoop().execute(task); }
ChannelActive是一个inbound事件,因此会按照head->tail的顺序执行Inbound处理器,目前有三个处理器:head-> ServerBootstrapAcceptor->tail, ServerBootstrapAcceptor和tail都是inbound处理器,先看一下Head的fireChannelActive方法
//DefaultChannelPipeline public ChannelPipeline fireChannelActive() { head.fireChannelActive(); if (channel.config().isAutoRead()) { channel.read(); } return this; }
// DefaultChannelHandlerContext public ChannelHandlerContext fireChannelActive() { final DefaultChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelActive(); } }); } return this; } private DefaultChannelHandlerContext findContextInbound() { DefaultChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!(ctx.handler() instanceof ChannelInboundHandler)); return ctx; } private void invokeChannelActive() { try { ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } }
//abstractChannel public Channel read() { pipeline.read(); return this; } //DefaultChannelPipeline public ChannelPipeline read() { tail.read(); return this; } //DefaultChannelHandlerContext public ChannelHandlerContext read() { findContextOutbound().invokeRead(); return this; }
private void invokeRead() { EventExecutor executor = executor(); if (executor.inEventLoop()) { invokeRead0(); } else { Runnable task = invokeRead0Task; if (task == null) { invokeRead0Task = task = new Runnable() { @Override public void run() { invokeRead0(); } }; } executor.execute(task); } } private void invokeRead0() { try { ((ChannelOutboundHandler) handler()).read(this); } catch (Throwable t) { notifyHandlerException(t); } }
((ChannelOutboundHandler) handler()).read(this)这行代码会调用Headhandler的read方法
//Headhandler public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); }
//AbstractUnsafe public void beginRead() { if (!isActive()) { return; } try { doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }
//AbstractNioChannel protected void doBeginRead() throws Exception { if (inputShutdown) { return; } final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
selectionKey.interestOps()的值是之前AbastractUnsafe类中的doRegister方法执行如下代码selectionKey = javaChannel().register(eventLoop().selector, 0, this)时设置的,因此值为0。
而readInterestOp是之前创建NioServerSocketChanne时,NioServerSocketChannel类的构造函数中设置的super(null, newSocket(), SelectionKey.OP_ACCEPT),因此值为16。
selectionKey.interestOps(interestOps | readInterestOp)会将ops设置为16。
《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...
总的来说,Netty 源码分析涉及了网络编程、并发处理、事件驱动、协议编解码等多个领域,对理解 Java 高性能网络应用开发有着重要的指导意义。通过阅读源码,我们可以更深入地了解 Netty 如何实现高效的网络通信,并...
### Netty源码解析——服务启动过程 #### 一、Netty概述 Netty是一个高性能、异步事件驱动的网络应用框架,它被广泛应用于快速开发高性能协议服务器和客户端。Netty通过高度优化的设计和实现提供了低延迟和高吞吐...
在本文中,我们将深入分析 Netty 4.1 源码中的 EchoServer 示例,以理解其核心组件和工作原理。 首先,我们关注 EchoServer 服务端的初始化,这涉及到两个关键组件:`bossGroup` 和 `workerGroup`。它们都是 `...
《Netty源码深入剖析》一书旨在帮助读者深入了解Netty框架的工作原理和技术细节,从基础知识入手,逐步过渡到高级优化技巧,使开发者能够更好地掌握并应用Netty于实际项目中。 ### 一、Netty简介与核心特性 Netty...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨“Netty ...在实践中,结合源码分析和使用工具,可以进一步提升我们的开发和运维能力。
Netty源码分析 ##### 3.1. 服务端创建 Netty是一个高性能的网络应用程序框架,其核心优势在于异步事件驱动的I/O处理机制。接下来我们将深入分析Netty服务端的创建过程。 ###### 3.1.1. 服务端启动辅助类...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入浅出...结合源码分析和实践操作,我们可以更深入地理解 HTTP 协议及其在网络编程中的应用。
7. **源码分析与调试** 阅读并理解 Netty 源码可以帮助我们更好地掌握其工作原理。Netty 的代码结构清晰,模块化设计使得我们可以针对具体需求进行定制。在实际项目中,可以使用 IDE 的调试功能逐步跟踪请求处理...
Netty 是一个高性能、异步事件驱动的网络应用框架...主从 Reactor 线程组设计保证了服务端可以高效处理大规模并发连接,而通过源码分析,我们可以更深入地理解其内部机制,从而更好地利用 Netty 开发高性能的网络应用。
3. **事件驱动模型**:DotNetty基于Netty的事件驱动模型,通过事件循环Group(如NioEventLoopGroup)来处理网络事件。 此外,博主可能还会讲解如何调试和测试这些示例,以及如何优化性能,比如通过线程池管理、内存...