前面我们分析了netty内存池、线程模型,比较难的两个点已经被一一消化,接下来我们开始进入大家最关心的环节,总体流程分析。 这里我选了io.netty.example.http.snoop来作为分析的入口,分别从server端、client端的维度来看看netty是如果设计的。这里你将了解比较详细的netty处理流程,让你在今后的应用中不再感到疑惑 。 如果还有不清楚的地方,可以直接交流,通过交流发现问题,并不断完善这系列文章。 本文假设你对netty已有大致了解,需要更深入的了解它的运作流程。如果是初学者,可能会发现头晕、眼花、脑抽经等症状。
配置篇
首先看看snoop包的server端启动代码。 在netty中,不管是server还是client,都是由引导类进行启动。在启动之前需要先做好各种参数的配置。可以配置的参数如下:
字段 | 类型 | 说明 | server模式 | client模式 |
options | Map | channel的配置项 | 作用于ServerChannel | |
childOptions | Map | channel的配置项 | 作用于Channel | |
attrs | Map | 自定义的channel属性 | 作用于ServerChannel | 作用于Channel |
childAttrs | Map | 自定义的channel属性 | 作用于Channel | |
handler | ChannelHandler | 连接处理器 | 作用于ServerChannel | 作用于Channel |
childHandler | ChannelHandler | 连接处理器 | 作用于Channel | |
group | EventLoopGroup | 注册并处理连接 | 作用于ServerChannel | 作用于Channel |
childGroup | EventLoopGroup | 注册并处理连接 | 作用于Channel | |
channelFactory | ChannelFactory | 生成连接对象的工厂类 | 生成ServerChannel | 生成Channel |
除了channelFactory所有的字段都分成了xxx和childxxx两个相对应的字段,名称上能很容易的分出来字段的作用范围。 如我们希望设置SO_REUSEADDR参数,该参数作用于ServerSocket,则设置时调用option(ChannelOption.SO_REUSEADDR, true)。对于Server端来说,比较常见的几个设置:SO_KEEPALIVE、SO_REUSEADDR、TCP_NODELAY、SO_BACKLOG。
我们知道netty采用了reactor的设计模式,其中mainReactor主要负责连接的建立,连接建立后交由subReactor处理,而subReactor则主要负责处理读写等具体的事件。这里mainReactor的实际执行者是bossGroup,而subReactor的实际执行者则是workerGroup。 下面是HttpSnoopServer类中main方法的主要代码(去掉了一部分)
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new HttpSnoopServerInitializer(sslCtx)); Channel ch = b.bind(PORT).sync().channel(); ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
这里bossGroup只启用了一个线程,因为一个端口只能创建一个ServerChannel,该ServerChannel的整个生命周期都在bossGroup中。如果你想用同一个ServerBootstrap启动多个端口,则bossGroup的大小需要根据启动的端口数调整。handler设置为LogginHandler,表示在ServerChannel的处理链中加入了日志记录(这个与客户端连接无关,即它只记录ServerChannel的注册、注销、关闭等,而不会记录客户端连接的相应事件。之前有同学加了LoggingHandler而没看到客户端的相应日志,就是这样了。需要的话要在childHandler的Initializer中加入LoggingHandler)。 childHandler设置为HttpSnoopServerInitializer,即用户连接使用HttpSnoopServerInitializer进行处理。
初始化完成开始调用bind(port)方法,bind首先会对各个参数进行验证,如channelFactory是否设置,group、childGroup是否设置,端口是否设置等,验证通过后,最终调用doBind方法(AbstractBootstrap中)。
private ChannelFuture doBind(final SocketAddress localAddress) { // 初始化并注册Channel(此时是ServerChannel) final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); // 如果注册出错则直接返回 if (regFuture.cause() != null) { return regFuture; } // 注册完成调用doBind0,否则添加一个注册事件的监听器,该监听器在监听到注册完成后也会触发doBind0操作 if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // 一般来说都会是进入isDone,这里是以防万一 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
doBind首先会调用initAndRegister方法,来看看这个方法做了什么:
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); // 此时连接还未注册到EventLoopGroup,因此使用GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // 将连接注册到group中 ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
channelFactory().newChannel()方法创建了一个NioServerSocketChannel实例,该实例初始化时由SelectorProvider.provider().openServerSocketChannel()来打开一个ServerSocketChannel,同时会调用configureBlocking(false)将其IO模式设置为非阻塞。
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private static ServerSocketChannel newSocket(SelectorProvider provider) { try { // 打开一个ServerSocketChannel return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } public NioServerSocketChannel(ServerSocketChannel channel) { // 只对OP_ACCEPT事件感兴趣 super(null, channel, SelectionKey.OP_ACCEPT); // 初始化连接对应的配置 config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { // 将ServerSocketChannel设置为非阻塞模式 ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } protected AbstractChannel(Channel parent) { this.parent = parent; // 非配id,该id全局唯一 id = DefaultChannelId.newInstance(); // 初始化Unsafe, Server生成的Unsafe类为NioMessageUnsafe,Unsafe属于较底层的操作,不对应用开放 // 它处理的各种操作:register、bind、connect、disconnect、close、deregister,beginRead、write、flush unsafe = newUnsafe(); // 创建pipeline pipeline = new DefaultChannelPipeline(this); }
完成后调用init进行对该ServerSocketChannel进行其他部分的初始化,init方法主要是:1、设置option;2、设置attr;3、如果设置了handler,将handler加入到处理链中(本例中加入LoggingHandler)。最后会加入一个ChannelInitializer,该ChannelInitializer主要功能是获取客户端连接后对连接进行初始化(具体如何初始化稍后再讲)。从下面代码可以看到,所有option/childOption之类的字段最终都会生成一份copy的数据,也就是该引导类可以继续使用(但是不能多个线程同时调用),用于引导其他端口的启动。
void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { // 这里的ServerBootstrapAcceptor比较重要先记住 ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
初始化完成后立即将ServerChannel注册到bossGroup中,注册的时候会进行哪些操作呢?如果你还记得之前的EventLoop源码分析,就是这一句了:channel.unsafe().register(this, promise); 这行代码最终会调用AbstractChannel.AbstractUnsafe.register(EventLoop eventLoop, final ChannelPromise promise)
方法:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { ...去掉非主要代码... // channel的eventLoop被PausableChannelEventLoop包装,这样设置isAcceptingNewTasks=false时,新任务将被拒绝。这在关闭channel的时候非常有用 if (AbstractChannel.this.eventLoop == null) { AbstractChannel.this.eventLoop = new PausableChannelEventLoop(eventLoop); } else { AbstractChannel.this.eventLoop.unwrapped = eventLoop; } if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
上面的代码最重要的部分就是PausableChannelEventLoop的封装,接下来调用register0。
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // 真正的注册方法 doRegister(); neverRegistered = false; registered = true; // 注册完成以后开启接受任务的开关 eventLoop.acceptNewTasks(); safeSetSuccess(promise); // 触发channelRegistered事件 pipeline.fireChannelRegistered(); // 只有从未注册的channel才会触发channelActive,避免连接注销并重新注册时多次触发channelActive。 // 注意后面还会出现fireChannelActive方法的调用,正常的第一次启动应该是触发后面那个fireChannelActive而不是这个 if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
doRegister方法将调用sun.nio.ch.ServerSocketChannelImpl.register方法,该方法将ServerSocketChannel注册到Selector上,因为传入的ops=0,此时并不会有连接进来(到目前为止都还没有与实际的端口进行绑定)。
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // 如果发送异常则强制执行Selector.selectNow()方法使 "canceled"的SelectionKey从Selector中移除 ((NioEventLoop) eventLoop().unwrap()).selectNow(); selected = true; } else { //JDK bug ? throw e; } } } }
注册完成后调用pipeline.fireChannelRegistered(); 该方法最终会是pipeline的处理链进行链式处理,在本例中他会触发两个操作:1、LogginHandler中的channelRegistered;2、在ServerBootstrap.init(Channel)方法中的代码:
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { // 触发这里执行 ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } });
ServerBootstrapAcceptor类主要作用是接收到客户端连接后,使用childOptions和childAttrs对连接初始化,然后将连接注册到childGroup中。ServerBootstrapAcceptor的channelRead方法如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
回到主流程,如果是第一次启动触发channelActive方法,本例中主要触发LoggerHandler.channelActive。调用完成后回到AbstractBootstrap.doBind0()方法:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
doBind0最终调用channel.bind方法对执行端口进行监听。需要注意的是,为了保证线程安全,channel的所有方法都需要到EventLoop中执行。channel.bind最终调用AbstractChannel.AbstractUnsafe.bind(final SocketAddress localAddress, final ChannelPromise promise):
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { // ---这里去掉了部分代码--- boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { // 增加一个任务,该任务触发pipeline.fireChannelActive方法, 该方法将最终触发channel.read() invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } // 最终调用socket的bind方式进行绑定,注意backlog在windows下默认为200,其他系统默认128 protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress, config.getBacklog()); } // 上面的channel.read()最终会触发AbstractNioChannel.doBeginRead()方法 protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called if (inputShutdown) { return; } final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; // 注册readInterestOp,ServerSocket关注的op为OP_ACCEPT final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
到这里启动的步骤已经完成,我们再来回顾一下整个启动过程:
1、应用设置启动所需的各个参数
2、应用调用bind(port)启动监听,bind过程如下
3、验证启动参数设置是否正确,调用doBind
4、doBind创建NioServerSocketChannel,并对其进行初始化,包括创建一个实际的ServerSocket,设置其为非阻塞模式,创建底层处理实例NioMessageUnsafe,创建pipeline
5、pipeline中加入一个ChannelInitializer,该ChannelInitializer往pipleline中加入ServerBootstrapAcceptor用于接收客户连接后设置其初始化参数,然后注册到childGroup处理
6、将NioServerSocketChannel注册到bossGroup,此时bossGroup被激活开始接收任务及IO事件。
7、往EventLoop中添加一个任务,该任务的内容为将之前创建的ServerSocket绑定到指定端口。
8、绑定端口后增加一个任务,该任务内容为注册NioServerSocketChannel关注的事件OP_ACCEPT到SelectKey中。到此,服务端可以接收到来自客户端的请求。
到此,ServerBootstrap的启动过程结束,服务端可以接收到客户端的连接请求。这里还有很多概念比较模糊,pipeline.addLast进行了什么操作,pipeline.channelXXX(如channelActive)是如何最终调用到channel的对应方法的。解开了这个问题,才能往下分析NioServerSocketChannel的请求接收、分发流程。ok, 下一篇文章就是对ChannelPipeline进行分析!
来一张图解解馋,netty本身还是很复杂的,该图进行了简化。
相关推荐
### Netty学习笔记知识点概述 #### 一、Netty简介 Netty是一个广泛使用的高性能、异步事件驱动的网络应用程序框架,它简化了网络编程的复杂性,使得开发者能够更轻松地开发高性能、高可靠性的网络服务器和客户端。...
Netty笔记二主要聚焦在如何使用Netty框架在服务端和客户端之间发送对象。Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。这篇博客提供了可运行的源码,方便读者...
Netty学习笔记_Springboot实现自定义协议 本文主要介绍了使用Netty框架在Springboot项目中实现自定义协议的方法。自定义协议是指在网络通信中,使用特定的数据格式来传输数据,以满足特定的业务需求。在本文中,...
1. **Netty基础**:介绍Netty的基本概念,如Bootstrap、ServerBootstrap、Channel、Pipeline等,并演示如何创建一个简单的服务端和客户端。 2. **Netty的事件模型**:详细解释EventLoop和EventLoopGroup的工作原理...
这个“自动Netty笔记111”可能是某个开发者或学习者记录的一系列关于Netty学习过程中的关键点和理解。下面将详细阐述Netty的相关知识点。 1. **异步事件驱动模型**: Netty采用了非阻塞I/O模型,基于Java NIO(Non...
- 在Netty中,我们首先需要创建一个`ServerBootstrap`实例,它是服务器的启动类。 - 接着,配置`EventLoopGroup`,这是Netty的I/O线程模型,通常包括BossGroup(负责接收连接)和WorkerGroup(处理I/O事件)。 - ...
1. **Channel**: Channel是Netty中的基本组件,它代表了网络连接,可以读写数据。例如,Socket连接就是一个典型的Channel。 2. **Bootstrap**: Bootstrap是客户端启动器,用来配置并启动一个Channel实例。它允许...
在 "netty-learning-master" 文件中,可能包含了作者学习 Netty 的笔记、示例代码和项目结构,这些都是很好的学习资源。通过阅读和实践这些内容,你将能够深入理解 Netty 的工作原理和最佳实践。
1. **异步事件驱动**:Netty 使用了非阻塞I/O模型,基于Java NIO(非阻塞输入/输出)库,通过事件循环(EventLoop)和通道(Channel)机制,实现了高并发性能,降低了系统的资源消耗。 2. **高效的数据处理**:...
在Netty中,Server的启动通常涉及创建一个`ServerBootstrap`实例。`ServerBootstrap`是一个配置对象,用于设置服务器的各种参数,如绑定的端口、线程池、处理器管道等。例如,我们可以通过以下代码创建一个监听8080...
1. 创建服务器:通过ServerBootstrap初始化服务器,并配置所需的处理器链,例如添加ByteToMessageDecoder用于解码接收到的数据,以及业务处理器来处理具体逻辑。 2. 客户端连接:使用Bootstrap创建客户端,连接到...
在实际编程中,开发者通常会创建`ServerBootstrap`来配置和启动服务器,设置`EventLoopGroup`,指定`Channel`类型,然后通过`ChannelFuture`来监听和管理连接的状态。Netty的这种设计模式使得网络编程变得更加简洁和...