  • 浏览: 22021 次
  • 性别: Icon_minigender_1
  • 来自: 深圳

Netty源码阅读(一) ServerBootstrap启动



Netty源码阅读(一) ServerBootstrap启动


1. ChannelPromise关联了Channel和Executor,当然channel中也会有EventLoop的实例。 2. 每个channel有自己的pipeline实例。 3. 每个NioEventLoop中有自己的Executor实例和Selector实例。



1 // Configure the server. 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 3 EventLoopGroup workerGroup = new NioEventLoopGroup; 4 try { 5 ServerBootstrap b = new ServerBootstrap; 6 b.group(bossGroup, workerGroup) 7 .channel(NioServerSocketChannel.class) 8 .option(ChannelOption.SO_BACKLOG, 100) // 设置tcp协议的请求等待队列 9 .handler(new LoggingHandler(LogLevel.INFO)) 10 .childHandler(new ChannelInitializer<SocketChannel> { 11 @Override 12 public void initChannel(SocketChannel ch) throws Exception { 13 ChannelPipeline p = ch.pipeline; 14 if (sslCtx != null) { 15 p.addLast(sslCtx.newHandler(ch.alloc)); 16 } 17 p.addLast(new EchoServerHandler); 18 } 19 }); 20 21 // Start the server. 22 ChannelFuture f = b.bind(PORT).sync; 23 24 // Wait until the server socket is closed. 25 f.channel.closeFuture.sync; 26 } finally { 27 // Shut down all event loops to terminate all threads. 28 bossGroup.shutdownGracefully; 29 workerGroup.shutdownGracefully; 30 }








1 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { 2 super.group(parentGroup); 3 if (childGroup == null) { 4 throw new NullPointerException("childGroup"); 5 } 6 if (this.childGroup != null) { 7 throw new IllegalStateException("childGroup set already"); 8 } 9 this.childGroup = childGroup; 10 return this; 11 }


1 public B channel(Class<? extends C> channelClass) { 2 if (channelClass == null) { 3 throw new NullPointerException("channelClass"); 4 } 5 return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); 6 }




1 public ChannelFuture bind(int inetPort) { 2 return bind(new InetSocketAddress(inetPort)); 3 } 4 5 /** 6 * Create a new {@link Channel} and bind it. 7 */ 8 public ChannelFuture bind(SocketAddress localAddress) { 9 validate; 10 if (localAddress == null) { 11 throw new NullPointerException("localAddress"); 12 } 13 return doBind(localAddress); 14 } 15 16 // AbstractBootstrap 17 private ChannelFuture doBind(final SocketAddress localAddress) { 18 final ChannelFuture regFuture = initAndRegister; 19 final Channel channel = regFuture.channel; 20 if (regFuture.cause != null) { 21 return regFuture; 22 } 23 24 if (regFuture.isDone) { 25 // At this point we know that the registration was complete and successful. 26 ChannelPromise promise = channel.newPromise; 27 doBind0(regFuture, channel, localAddress, promise); 28 return promise; 29 } else { 30 // Registration future is almost always fulfilled already, but just in case it's not. 31 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); 32 regFuture.addListener(new ChannelFutureListener { 33 @Override 34 public void operationComplete(ChannelFuture future) throws Exception { 35 Throwable cause = future.cause; 36 if (cause != null) { 37 // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an 38 // IllegalStateException once we try to access the EventLoop of the Channel. 39 promise.setFailure(cause); 40 } else { 41 // Registration was successful, so set the correct executor to use. 42 // See https://github.com/netty/netty/issues/2586 43 promise.registered; 44 45 doBind0(regFuture, channel, localAddress, promise); 46 } 47 } 48 }); 49 return promise; 50 } 51 }

我们可以看到bind的调用最终调用到了doBind(final SocketAddress),在这里我们看到先调用了initAndRegister方法进行初始化和register操作。了解JavaNIO框架的同学应该能看出来是在这个方法中将channel注册到selector中的。最后程序再调用了doBind0方法进行绑定,先按照顺序看initAndRegister方法做了什么操作。

1 // AbstractBootstrap 2 final ChannelFuture initAndRegister { 3 Channel channel = null; 4 try { 5 channel = channelFactory.newChannel; 6 init(channel); 7 } catch (Throwable t) { 8 // ... 9 } 10 11 ChannelFuture regFuture = config.group.register(channel); 12 // ... 13 return regFuture; 14 }


1 // ServerBootstrap 2 void init(Channel channel) throws Exception { 3 final Map<ChannelOption<?>, Object> options = options0; 4 synchronized (options) { 5 channel.config.setOptions(options); 6 } 7 8 // 设置channel.attr 9 final Map<AttributeKey<?>, Object> attrs = attrs0; 10 synchronized (attrs) { 11 for (Entry<AttributeKey<?>, Object> e: attrs.entrySet) { 12 @SuppressWarnings("unchecked") 13 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey; 14 channel.attr(key).set(e.getValue); 15 } 16 } 17 18 ChannelPipeline p = channel.pipeline; 19 20 final EventLoopGroup currentChildGroup = childGroup; 21 // childGroup的handler 22 final ChannelHandler currentChildHandler = childHandler; 23 final Entry<ChannelOption<?>, Object> currentChildOptions; 24 final Entry<AttributeKey<?>, Object> currentChildAttrs; 25 synchronized (childOptions) { 26 currentChildOptions = childOptions.entrySet.toArray(newOptionArray(childOptions.size)); 27 } 28 synchronized (childAttrs) { 29 currentChildAttrs = childAttrs.entrySet.toArray(newAttrArray(childAttrs.size)); 30 } 31 // 给channelpipeline添加handler 32 p.addLast(new ChannelInitializer<Channel> { 33 @Override 34 public void initChannel(Channel ch) throws Exception { 35 final ChannelPipeline pipeline = ch.pipeline; 36 // group的handler 37 ChannelHandler handler = config.handler; 38 if (handler != null) { 39 pipeline.addLast(handler); 40 } 41 42 // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler. 43 // In this case the initChannel(...) method will only be called after this method returns. Because 44 // of this we need to ensure we add our handler in a delayed fashion so all the users handler are 45 // placed in front of the ServerBootstrapAcceptor. 46 ch.eventLoop.execute(new Runnable { 47 @Override 48 public void run { 49 pipeline.addLast(new ServerBootstrapAcceptor( 50 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); 51 } 52 }); 53 } 54 }); 55 }


1 // MultithreadEventLoopGroup 2 public ChannelFuture register(Channel channel) { 3 return next.register(channel); 4 } 5 6 public EventLoop next { 7 return (EventLoop) super.next; 8 } 9 10 // SingleThreadEventLoop 11 public ChannelFuture register(Channel channel) { 12 return register(new DefaultChannelPromise(channel, this)); 13 } 14 15 @Override 16 public ChannelFuture register(final ChannelPromise promise) { 17 ObjectUtil.checkNotNull(promise, "promise"); 18 promise.channel.unsafe.register(this, promise); 19 return promise; 20 }


1 public final void register(EventLoop eventLoop, final ChannelPromise promise) { 2 if (eventLoop == null) { 3 throw new NullPointerException("eventLoop"); 4 } 5 if (isRegistered) { 6 promise.setFailure(new IllegalStateException("registered to an event loop already")); 7 return; 8 } 9 if (!isCompatible(eventLoop)) { 10 promise.setFailure( 11 new IllegalStateException("incompatible event loop type: " + eventLoop.getClass.getName)); 12 return; 13 } 14 // 设置eventLoop 15 AbstractChannel.this.eventLoop = eventLoop; 16 // 这里是跟Netty的线程模型有关的,注册的方法只能在channel的工作线程中执行 17 if (eventLoop.inEventLoop) { 18 register0(promise); 19 } else { 20 try { 21 eventLoop.execute(new Runnable { 22 @Override 23 public void run { 24 register0(promise); 25 } 26 }); 27 } catch (Throwable t) { 28 logger.warn( 29 "Force-closing a channel whose registration task was not accepted by an event loop: {}", 30 AbstractChannel.this, t); 31 closeForcibly; 32 closeFuture.setClosed; 33 safeSetFailure(promise, t); 34 } 35 } 36 } 37 38 // AbstractNioChannel 39 protected void doRegister throws Exception { 40 boolean selected = false; 41 for (;;) { 42 try { 43 selectionKey = javaChannel.register(eventLoop.selector, 0, this); 44 return; 45 } catch (CancelledKeyException e) { 46 // ... 47 } 48 } 49 } 50 51 // AbstractSelectableChannel 52 public final SelectionKey register(Selector sel, int ops,Object att) 53 throws ClosedChannelException 54 { 55 synchronized (regLock) { 56 if (!isOpen) 57 throw new ClosedChannelException; 58 if ((ops & ~validOps) != 0) 59 throw new IllegalArgumentException; 60 if (blocking) 61 throw new IllegalBlockingModeException; 62 SelectionKey k = findKey(sel); 63 if (k != null) { 64 k.interestOps(ops); 65 k.attach(att); 66 } 67 if (k == null) { 68 // New registration 69 synchronized (keyLock) { 70 if (!isOpen) 71 throw new ClosedChannelException; 72 k = ((AbstractSelector)sel).register(this, ops, att); 73 addKey(k); 74 } 75 } 76 return k; 77 } 78 }



1 // SingleThreadEventExecutor 2 public void execute(Runnable task) { 3 if (task == null) { 4 throw new NullPointerException("task"); 5 } 6 7 boolean inEventLoop = inEventLoop; 8 if (inEventLoop) { 9 addTask(task); 10 } else { 11 startThread; 12 addTask(task); 13 if (isShutdown && removeTask(task)) { 14 reject; 15 } 16 } 17 18 if (!addTaskWakesUp && wakesUpForTask(task)) { 19 wakeup(inEventLoop); 20 } 21 }


1 // SingleThreadEventLoop 2 private void startThread { 3 if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { 4 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { 5 doStartThread; 6 } 7 } 8 } 9 10 private void doStartThread { 11 assert thread == null; 12 executor.execute(new Runnable { 13 @Override 14 public void run { 15 thread = Thread.currentThread; 16 if (interrupted) { 17 thread.interrupt; 18 } 19 20 boolean success = false; 21 updateLastExecutionTime; 22 try { 23 SingleThreadEventExecutor.this.run; 24 success = true; 25 } catch (Throwable t) { 26 logger.warn("Unexpected exception from an event executor: ", t); 27 } finally { 28 // Some clean work 29 } 30 } 31 }); 32 }


1 protected void run { 2 for (;;) { 3 try { 4 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks)) { 5 case SelectStrategy.CONTINUE: 6 continue; 7 case SelectStrategy.SELECT: 8 select(wakenUp.getAndSet(false)); 9 if (wakenUp.get) { 10 selector.wakeup; 11 } 12 default: 13 // fallthrough 14 } 15 16 cancelledKeys = 0; 17 needsToSelectAgain = false; 18 final int ioRatio = this.ioRatio; 19 if (ioRatio == 100) { 20 processSelectedKeys; 21 runAllTasks; 22 } else { 23 final long ioStartTime = System.nanoTime; 24 25 processSelectedKeys; 26 27 final long ioTime = System.nanoTime - ioStartTime; 28 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 29 } 30 31 if (isShuttingDown) { 32 closeAll; 33 if (confirmShutdown) { 34 break; 35 } 36 } 37 } catch (Throwable t) { 38 logger.warn("Unexpected exception in the selector loop.", t); 39 40 // Prevent possible consecutive immediate failures that lead to 41 // excessive CPU consumption. 42 try { 43 Thread.sleep(1000); 44 } catch (InterruptedException e) { 45 // Ignore. 46 } 47 } 48 } 49 } 50 51 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { 52 if (selectedKeys.isEmpty) { 53 return; 54 } 55 56 Iterator<SelectionKey> i = selectedKeys.iterator; 57 for (;;) { 58 final SelectionKey k = i.next; 59 final Object a = k.attachment; 60 i.remove; 61 62 if (a instanceof AbstractNioChannel) { 63 // 处理ServerSocketChannl的事件,如accept 64 processSelectedKey(k, (AbstractNioChannel) a); 65 } else { 66 @SuppressWarnings("unchecked") 67 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; 68 processSelectedKey(k, task); 69 } 70 71 if (!i.hasNext) { 72 break; 73 } 74 75 if (needsToSelectAgain) { 76 selectAgain; 77 selectedKeys = selector.selectedKeys; 78 79 // Create the iterator again to avoid ConcurrentModificationException 80 if (selectedKeys.isEmpty) { 81 break; 82 } else { 83 i = selectedKeys.iterator; 84 } 85 } 86 } 87 }



1. ServerSocketChannel的interestOps的注册 2. accept请求的处理 3. 线程模型 4. pipeline的链式调用 5. buffer 。。。



    《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...


    3. **Bootstrap**:启动引导类,用于配置并创建一个新的 ServerBootstrap 或 ClientBootstrap,用于启动服务器或客户端。 4. **Pipeline(管道)**:数据在 Channel 之间传输时会经过一系列处理步骤,这些步骤构成...




    ### Netty源码解析——服务启动过程 #### 一、Netty概述 Netty是一个高性能、异步事件驱动的网络应用框架,它被广泛应用于快速开发高性能协议服务器和客户端。Netty通过高度优化的设计和实现提供了低延迟和高吞吐...


    ### Netty源码解析知识点概览 #### 一、Netty简介与应用场景 - **Netty**是一款由JBOSS提供的高性能的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。 - **应用场景**:Netty广泛...


    NIO服务端可以通过ServerBootstrap辅助类来启动,而NIO客户端则可以通过SocketChannel来建立连接。NIO的出现解决了传统的BIO通信中的问题,提高了IO的效率。 Netty架构剖析 Netty架构可以分为三个部分:服务端、...

    netty源码 4.*版本

    通过阅读 Netty 源码,我们可以学习到以下知识点: - 理解 Java NIO 的工作原理,如何利用 Selector 监听多个 Channel 的事件。 - 掌握 ByteBuf 的内存管理策略,如何避免不必要的内存拷贝。 - 学习如何构建 ...


    源码分析时,首先需要关注的是 Netty 的启动流程,这通常从 `ServerBootstrap` 类开始。ServerBootstrap 配置了 EventLoopGroup(包含多个 EventLoop)和 Channel 实例,如 NioServerSocketChannel。然后通过绑定...



    Netty3.x 源码解析



    《Netty源码深入剖析》一书旨在帮助读者深入了解Netty框架的工作原理和技术细节,从基础知识入手,逐步过渡到高级优化技巧,使开发者能够更好地掌握并应用Netty于实际项目中。 ### 一、Netty简介与核心特性 Netty...




    NIO客户端13 3.Netty源码分析16 3.1. 服务端创建16 3.1.1. 服务端启动辅助类ServerBootstrap16 3.1.2. NioServerSocketChannel 的注册21 3.1.3. 新的客户端接入25 3.2. 客户端创建28 3.2.1. 客户端连接辅助类...


    在描述中提到的 "netty 实战源码 13 章",可能是指一系列关于 Netty 使用和实现的教程,覆盖了从基础到进阶的多个主题。这通常会包括 Channel、Handler、ByteBuf、Pipeline 等核心组件的使用,以及如何构建服务器和...


    Netty 是一个功能强大且高效的网络应用框架,阅读 Netty 的源码可以帮助我们更好地理解网络编程的领域知识和代码结构组织的方法。同时,Netty 的设计思想和编程模型也能够帮助我们提高自己的编程技能。



Global site tag (gtag.js) - Google Analytics