- 浏览: 10417 次
- 性别:
- 来自: 成都
最新评论
下面就开始我们的探索之旅
我下载的源码的版本是netty4.0。通过netty源码中自带的例子我们就可以看到netty是如何运行起来的,首先我们来探索客户端。
源码中的客户端启动代码,
路径:example\src\main\java\io\netty\example\echo\EchoClient
观察以上的代码,代码比较精炼。短短的几行代码就是netty客户端初始化所需的所有内容,这就是netty,它后面帮我们已经做了很多事情,我们只需要简单的使用就行。
接下来我们深入的探索代码,看看到底做了什么事件。
第一句代码就很有看头:
EventLoopGroup group = new NioEventLoopGroup();
进入NioEventLoopGroup的构造函数看看它一路都做了什么。
看了一路的super()方法,其实最重要的就是MultithreadEventExecutorGroup类
1,创建一个nThreads大小的SingleThreadEventExecutor数组
2,根据数组的长度来创建chooser,如果nThreads是2的幂,则使用 PowerOfTwoEventExecutorChooser, 如果不是使用GenericEventExecutorChooser。它们的功能就是从SingleThreadEventExecutor(children变量)数组中选出一个合适的EventExecutor(NioEventLoop)实例。
GenericEventExecutorChooser:
PowerOfTwoEventExecutorChooser:
3,调用newChild方法初始化SingleThreadEventExecutor数组,newChild方法由子类NioEventLoopGroup实现,创建的实际对象是NioEventLoop对象。
当netty需要一个EventLoop对象时,会调用next()方法获取一个NioEventLoop对象,此对象就是newChild方法生成的。
chooser对象的next方法是从SingleThreadEventExecutor(children变量)数组获取。
这里简单的提一下NioEventLoop:NioEventLoop里面包含了一个重要的变量selector,在NioEventLoop初始化的时候会对selector赋值:
小总结一下:
1,NioEventLoopGroup内部维护一个类型为EventExecutor数组(变量:children),数组里面的对象是NioEventLoop,构造了一个线程池(处理IO事件和任务的线程池,后面会详细说明)。
2,调用newChild抽象方法来初始化children数组
3,抽象方法newChild在NioEventLoopGroup中实现的,返回一个NioEventLoop实例
NioSocketChannel:
关于NioSocketChannel我使用EA画了它的类关系图,比较多,看的不是很清楚。
在netty中,channel是socket的抽象,是对socket状态和读写操作的封装。当netty每建立一个连接后,都会有一个对应的channel对象。
channel有不同的类型,分别对应着不同的协议和不同的阻塞类型,常见的有:
NioSocketChannel和NioServerSocketChannel;OioSocketChannel和OioServerSocketChannel等。我们在调用channel()方法,传入一个我们需要的channel的类型,(例子中的:channel(NioSocketChannel.class))。
进入channel方法:
BootstrapChannelFactory(AbstractBootstrap的内部类):实现了ChannelFactory接口,其中唯一的方法是newChannel(),这个就是典型生产channel的工厂类,BootstrapChannelFactory.newChannel()的实现方法
channelFactory方法把创建的BootstrapChannelFactory实例赋值给:AbstractBootstrap的channelFactory变量。
小总结一下:
通过以上的代码我们可以确定:
1,AbstractBootstrap中的ChannelFactory的实例是BootstrapChannelFactory对象。
2,channel具体的类型由我们自己决定(NioSocketChannel.class)。channel实例化过程就是调用BootstrapChannelFactory的newChannel()方法来完成。
以上的代码是创建channel的地方,那么在哪里调用这个创建channel实例的方法喃?
发现这句代码没?ChannelFuture f = b.connect(HOST, PORT).sync()。connect方法里面的其他的代码我们先不研究,看我们现在此时关心的,进入connect看到了doConnect,接着进入看到了initAndRegister。哈哈哈发现了。在initAndRegister()中有句:channel = channelFactory().newChannel();
newChannel()调用BootstrapChannelFactory实现方法,创建一个NioSocketChannel对象。
NioSocketChannel的构造函数:
newSocket方法打开一个新的Java NIO SocketChannel
this(newSocket(DEFAULT_SELECTOR_PROVIDER));这时会调用super方法最终会调用到AbstractNioByteChannel中
parent为空,ch为刚刚newSocket方法创建的SocketChannel,SelectionKey.OP_READ
继续:AbstractNioChannel
AbstractNioChannel类中保存对象:SocketChannel,SelectionKey.OP_READ。并把SocketChannel设置成非阻塞。
继续:AbstractChannel
到这里就完成了Channel的初始化工作。
小总结一下:
1,NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打开一个新的 Java NIO SocketChannel
2,AbstractChannel的初始化
parent:NULL
unsafe:newUnsafe()实例化一个unsafe对象,类型是AbstractNioByteChannel.NioByteUnsafe内部类
pipeline:DefaultChannelPipeline实例:一个channel一个管道。
3,AbstractNioChannel的初始化
ch:socketchannel
readInterestOp:OP_READ
socketchannel设置为非阻塞
4,NioSocketChannel
config:NioSocketChannelConfig
channel(socketchannel)和Pipeline(DefaultChannelPipeline)是在Channel的初始化工作关联起来的。在实例化channel时,就会实例化一个ChannelPipeline,在上面的分析中,实例出来的对象其实是DefaultChannelPipeline。那么DefaultChannelPipeline的又在做什么工作喃。继续往下看:
channel变量把创建的channel(NioSocketChannel)带入到了管道中,看到tail和head,熟悉数据结构的同学就晓得了,这个是一个双向链表的头和尾,在DefaultChannelPipeline维护了一个AbstractChannelHandlerContext为节点(TailContext和HeadContext都继承AbstractChannelHandlerContext)的双向链表。这个我们在后文单独来详细分析一下。
链表的头HeadContext实现了ChannelOutboundHandler和ChannelInboundHandler两个接口。
按理说HeadContext只实现ChannelOutboundHandler,为什么也实现了ChannelInboundHandler还有待研究。
链表的尾TailContext实现了ChannelInboundHandler。
它们的父类AbstractChannelHandlerContext
HeadContext传入参数 inbound = false, outbound = true。
TailContext传入参数 inbound = true, outbound = false。
HeadContext是一个outboundHandler,TailContext是一个inboundHandler。
DefaultChannelPipeline的分析暂时就到这里,回到对客户端的分析。
还记得前面我们创建出来了channel吗?channel创建出来了,那么这个channel怎么使用喃?
所以我们继续回到AbstractBootstrap类的initAndRegister()方法
newChannel()方法创建出来channel后,调用init(),此方法由Bootstrap实现类完成。
当channel初始化完成后,会继续调用group().register(channel)来注册channel。group()返回是NioEventLoopGroup。
NioEventLoopGroup的register(channel)方法,其实就是MultithreadEventLoopGroup的register
next()还记得MultithreadEventExecutorGroup中的Chooser不(从children数组中选出一个合适的NioEventLoop对象),
最终会调用SingleThreadEventLoop的register方法
最终我们发现是调用到了unsafe的register 方法,那么接下来我们就仔细看一下 AbstractUnsafe.register方法中到底做了什么:
我们主要看2句代码:AbstractChannel.this.eventLoop = eventLoop;把eventLoop(其实就是MultithreadEventLoopGroup.next()获取的NioEventLoop对象,参照本文写EventLoop的地方)对象赋值给channel中的eventLoop对象。然后是register0(promise);在AbstractChannel类中。
register0又调用了AbstractNioChannel.doRegister:
看到了很熟悉的方法了。javaChannel()返回的前面初始化ch的参数,也就是SocketChannel,这里就是把channel注册到NioEventLoop对象生成的selector上(这里就是传统看的selector注册channel的方法)。这样我们将这个SocketChannel注册到与eventLoop关联的selector上了。
小总结一下:
channel的注册过程,在netty中每个channel都会关联一个EventLoop,EventLoop负责执行channel中的所有IO操作。关联好Channel和EventLoop后,调用SocketChannel的register方法,将SocketChannel注册到selector中。
我们继续看EchoClient中的这句:
handler的概念孕育而生了。netty中的handler是添加到pipeline中的,至于pipeline的实现机制后续专门拿出来分析(这篇文章的前面只是分析了一下pipeline的初始化)。
handler()方法参数是ChannelHandler对象,上面代码的参数是ChannelInitializer实现了ChannelHandler接口,并Override了initChannel方法,把我们自定义的handler加入到ChannelPipeline中。在ChannelInitializer的channelRegistered方法中会调用initChannel方法:
小总结一下:
这里只是想说明一下handler是怎么添加到ChannelPipeline中的,至于ChannelPipeline的底层机制,后面慢讲。
最后我们来分析一下客户端连接,这篇文章就要大功告成了!
ChannelFuture f = b.connect(HOST, PORT).sync();
客户端通过调用Bootstrap的connect方法进行连接,追踪connect的一系列方法:
execute方法会把这个匿名线程添加到eventloop的任务队列中,让调度线程来执行。
run方法中调用channel的connect方法, 而这个channel: NioSocketChannel(在前面的channel小结讨论过)。继续看其实是调用DefaultChannelPipeline的connect方法。
ChannelPipeline中的tail字段,前面我也提到了,tail是TailContext实例,也是 AbstractChannelHandlerContext的子类,tail.connect是调用AbstractChannelHandlerContext的connect方法。
先来看看findContextOutbound():从DefaultChannelPipeline内的双向链表的tail开始, 不断向前寻找第一个outbound为true的AbstractChannelHandlerContext,然后调用它的 invokeConnect方法。
按照前面讲pipeline的地方,双向链表的头和尾.head是HeadContext的实例,实现了ChannelOutboundHandler接口,并且它的outbound字段为true. 因此在 findContextOutbound中,找到的AbstractChannelHandlerContext对象其实就是链表的head.HeadContext从写了connect方法。
unsafe是在HeadContext构造器中pipeline.channel().unsafe()返回的,就是AbstractNioByteChannel.NioByteUnsafe内部类。
就是调用AbstractNioUnsafe中的connect方法,
doConnect方法其实又是调用到了NioSocketChannel中去了
又一次出现了javaChannel()就是socketChannel,
SocketUtils.connect:
看到了socketChannel.connect(remoteAddress);终于完成了socket连接。
以上就是客户端的初始化和连接了,好绕好绕!
再总结一下各个组件的关系:
NioEventLoop包含
selectorProvider:selector的提供者,SelectorProvider.provider()获取
selector:具体的selector。provider.openSelector()获取一个selector对象
SocketChannel实例会被注册到此selector上
NioEventLoopGroup包含
children数组,存放NioEventLoop实例
NioSocketChannel包含
SocketChannel实例
DefaultChannelPipeline实例
unsafe实例
NioEventLoop实例
在实例化channel时,就会实例化一个ChannelPipeline
DefaultChannelPipeline包含
NioSocketChannel实例
TailContext实例(AbstractChannelHandlerContext)
HeadContext实例(AbstractChannelHandlerContext)
自定义的handler
AbstractChannelHandlerContext包含
DefaultChannelPipeline实例
大概梳理了一下,以后发现要加再加!
我下载的源码的版本是netty4.0。通过netty源码中自带的例子我们就可以看到netty是如何运行起来的,首先我们来探索客户端。
源码中的客户端启动代码,
路径:example\src\main\java\io\netty\example\echo\EchoClient
final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); }
观察以上的代码,代码比较精炼。短短的几行代码就是netty客户端初始化所需的所有内容,这就是netty,它后面帮我们已经做了很多事情,我们只需要简单的使用就行。
接下来我们深入的探索代码,看看到底做了什么事件。
第一句代码就很有看头:
EventLoopGroup group = new NioEventLoopGroup();
进入NioEventLoopGroup的构造函数看看它一路都做了什么。
看了一路的super()方法,其实最重要的就是MultithreadEventExecutorGroup类
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { children = new SingleThreadEventExecutor[nThreads]; if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } for (int i = 0; i < nThreads; i ++) { children[i] = newChild(threadFactory, args); } }
1,创建一个nThreads大小的SingleThreadEventExecutor数组
2,根据数组的长度来创建chooser,如果nThreads是2的幂,则使用 PowerOfTwoEventExecutorChooser, 如果不是使用GenericEventExecutorChooser。它们的功能就是从SingleThreadEventExecutor(children变量)数组中选出一个合适的EventExecutor(NioEventLoop)实例。
GenericEventExecutorChooser:
public EventExecutor next() { return children[Math.abs(childIndex.getAndIncrement() % children.length)]; }
PowerOfTwoEventExecutorChooser:
public EventExecutor next() { return children[childIndex.getAndIncrement() & children.length - 1]; }
3,调用newChild方法初始化SingleThreadEventExecutor数组,newChild方法由子类NioEventLoopGroup实现,创建的实际对象是NioEventLoop对象。
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
当netty需要一个EventLoop对象时,会调用next()方法获取一个NioEventLoop对象,此对象就是newChild方法生成的。
public EventExecutor next() { return chooser.next(); }
chooser对象的next方法是从SingleThreadEventExecutor(children变量)数组获取。
这里简单的提一下NioEventLoop:NioEventLoop里面包含了一个重要的变量selector,在NioEventLoop初始化的时候会对selector赋值:
selector = openSelector(); private Selector openSelector() { final Selector selector; try { selector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } .............
小总结一下:
1,NioEventLoopGroup内部维护一个类型为EventExecutor数组(变量:children),数组里面的对象是NioEventLoop,构造了一个线程池(处理IO事件和任务的线程池,后面会详细说明)。
2,调用newChild抽象方法来初始化children数组
3,抽象方法newChild在NioEventLoopGroup中实现的,返回一个NioEventLoop实例
NioSocketChannel:
关于NioSocketChannel我使用EA画了它的类关系图,比较多,看的不是很清楚。
在netty中,channel是socket的抽象,是对socket状态和读写操作的封装。当netty每建立一个连接后,都会有一个对应的channel对象。
channel有不同的类型,分别对应着不同的协议和不同的阻塞类型,常见的有:
NioSocketChannel和NioServerSocketChannel;OioSocketChannel和OioServerSocketChannel等。我们在调用channel()方法,传入一个我们需要的channel的类型,(例子中的:channel(NioSocketChannel.class))。
进入channel方法:
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new BootstrapChannelFactory<C>(channelClass)); }
BootstrapChannelFactory(AbstractBootstrap的内部类):实现了ChannelFactory接口,其中唯一的方法是newChannel(),这个就是典型生产channel的工厂类,BootstrapChannelFactory.newChannel()的实现方法
public T newChannel() { try { return clazz.newInstance();//创建一个channel的实例,比如:NioSocketChannel } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
channelFactory方法把创建的BootstrapChannelFactory实例赋值给:AbstractBootstrap的channelFactory变量。
小总结一下:
通过以上的代码我们可以确定:
1,AbstractBootstrap中的ChannelFactory的实例是BootstrapChannelFactory对象。
2,channel具体的类型由我们自己决定(NioSocketChannel.class)。channel实例化过程就是调用BootstrapChannelFactory的newChannel()方法来完成。
以上的代码是创建channel的地方,那么在哪里调用这个创建channel实例的方法喃?
发现这句代码没?ChannelFuture f = b.connect(HOST, PORT).sync()。connect方法里面的其他的代码我们先不研究,看我们现在此时关心的,进入connect看到了doConnect,接着进入看到了initAndRegister。哈哈哈发现了。在initAndRegister()中有句:channel = channelFactory().newChannel();
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory().newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { channel.unsafe().closeForcibly(); } return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
newChannel()调用BootstrapChannelFactory实现方法,创建一个NioSocketChannel对象。
NioSocketChannel的构造函数:
public NioSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
newSocket方法打开一个新的Java NIO SocketChannel
private static SocketChannel newSocket(SelectorProvider provider) { try { return provider.openSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } }
this(newSocket(DEFAULT_SELECTOR_PROVIDER));这时会调用super方法最终会调用到AbstractNioByteChannel中
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); }
parent为空,ch为刚刚newSocket方法创建的SocketChannel,SelectionKey.OP_READ
继续:AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
AbstractNioChannel类中保存对象:SocketChannel,SelectionKey.OP_READ。并把SocketChannel设置成非阻塞。
继续:AbstractChannel
protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
到这里就完成了Channel的初始化工作。
小总结一下:
1,NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打开一个新的 Java NIO SocketChannel
2,AbstractChannel的初始化
parent:NULL
unsafe:newUnsafe()实例化一个unsafe对象,类型是AbstractNioByteChannel.NioByteUnsafe内部类
pipeline:DefaultChannelPipeline实例:一个channel一个管道。
3,AbstractNioChannel的初始化
ch:socketchannel
readInterestOp:OP_READ
socketchannel设置为非阻塞
4,NioSocketChannel
config:NioSocketChannelConfig
channel(socketchannel)和Pipeline(DefaultChannelPipeline)是在Channel的初始化工作关联起来的。在实例化channel时,就会实例化一个ChannelPipeline,在上面的分析中,实例出来的对象其实是DefaultChannelPipeline。那么DefaultChannelPipeline的又在做什么工作喃。继续往下看:
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
channel变量把创建的channel(NioSocketChannel)带入到了管道中,看到tail和head,熟悉数据结构的同学就晓得了,这个是一个双向链表的头和尾,在DefaultChannelPipeline维护了一个AbstractChannelHandlerContext为节点(TailContext和HeadContext都继承AbstractChannelHandlerContext)的双向链表。这个我们在后文单独来详细分析一下。
链表的头HeadContext实现了ChannelOutboundHandler和ChannelInboundHandler两个接口。
按理说HeadContext只实现ChannelOutboundHandler,为什么也实现了ChannelInboundHandler还有待研究。
HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); }
链表的尾TailContext实现了ChannelInboundHandler。
TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); }
它们的父类AbstractChannelHandlerContext
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; ordered = executor == null || executor instanceof OrderedEventExecutor; }
HeadContext传入参数 inbound = false, outbound = true。
TailContext传入参数 inbound = true, outbound = false。
HeadContext是一个outboundHandler,TailContext是一个inboundHandler。
DefaultChannelPipeline的分析暂时就到这里,回到对客户端的分析。
还记得前面我们创建出来了channel吗?channel创建出来了,那么这个channel怎么使用喃?
所以我们继续回到AbstractBootstrap类的initAndRegister()方法
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory().newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { channel.unsafe().closeForcibly(); } return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
newChannel()方法创建出来channel后,调用init(),此方法由Bootstrap实现类完成。
void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(handler()); final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } } }
当channel初始化完成后,会继续调用group().register(channel)来注册channel。group()返回是NioEventLoopGroup。
NioEventLoopGroup的register(channel)方法,其实就是MultithreadEventLoopGroup的register
public ChannelFuture register(Channel channel) { return next().register(channel); }
next()还记得MultithreadEventExecutorGroup中的Chooser不(从children数组中选出一个合适的NioEventLoop对象),
最终会调用SingleThreadEventLoop的register方法
public ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise); return promise; }
最终我们发现是调用到了unsafe的register 方法,那么接下来我们就仔细看一下 AbstractUnsafe.register方法中到底做了什么:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
我们主要看2句代码:AbstractChannel.this.eventLoop = eventLoop;把eventLoop(其实就是MultithreadEventLoopGroup.next()获取的NioEventLoop对象,参照本文写EventLoop的地方)对象赋值给channel中的eventLoop对象。然后是register0(promise);在AbstractChannel类中。
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
register0又调用了AbstractNioChannel.doRegister:
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } }
看到了很熟悉的方法了。javaChannel()返回的前面初始化ch的参数,也就是SocketChannel,这里就是把channel注册到NioEventLoop对象生成的selector上(这里就是传统看的selector注册channel的方法)。这样我们将这个SocketChannel注册到与eventLoop关联的selector上了。
小总结一下:
channel的注册过程,在netty中每个channel都会关联一个EventLoop,EventLoop负责执行channel中的所有IO操作。关联好Channel和EventLoop后,调用SocketChannel的register方法,将SocketChannel注册到selector中。
我们继续看EchoClient中的这句:
.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } });
handler的概念孕育而生了。netty中的handler是添加到pipeline中的,至于pipeline的实现机制后续专门拿出来分析(这篇文章的前面只是分析了一下pipeline的初始化)。
handler()方法参数是ChannelHandler对象,上面代码的参数是ChannelInitializer实现了ChannelHandler接口,并Override了initChannel方法,把我们自定义的handler加入到ChannelPipeline中。在ChannelInitializer的channelRegistered方法中会调用initChannel方法:
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (initChannel(ctx)) { ctx.pipeline().fireChannelRegistered(); } else { ctx.fireChannelRegistered(); } }
小总结一下:
这里只是想说明一下handler是怎么添加到ChannelPipeline中的,至于ChannelPipeline的底层机制,后面慢讲。
最后我们来分析一下客户端连接,这篇文章就要大功告成了!
ChannelFuture f = b.connect(HOST, PORT).sync();
客户端通过调用Bootstrap的connect方法进行连接,追踪connect的一系列方法:
private static void doConnect0( final ChannelFuture regFuture, final Channel channel, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { if (localAddress == null) { channel.connect(remoteAddress, promise); } else { channel.connect(remoteAddress, localAddress, promise); } promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
execute方法会把这个匿名线程添加到eventloop的任务队列中,让调度线程来执行。
run方法中调用channel的connect方法, 而这个channel: NioSocketChannel(在前面的channel小结讨论过)。继续看其实是调用DefaultChannelPipeline的connect方法。
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { return tail.connect(remoteAddress, localAddress); }
ChannelPipeline中的tail字段,前面我也提到了,tail是TailContext实例,也是 AbstractChannelHandlerContext的子类,tail.connect是调用AbstractChannelHandlerContext的connect方法。
public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } if (!validatePromise(promise, false)) { return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeConnect(remoteAddress, localAddress, promise); } }, promise, null); } return promise; }
先来看看findContextOutbound():从DefaultChannelPipeline内的双向链表的tail开始, 不断向前寻找第一个outbound为true的AbstractChannelHandlerContext,然后调用它的 invokeConnect方法。
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { connect(remoteAddress, localAddress, promise); } }
按照前面讲pipeline的地方,双向链表的头和尾.head是HeadContext的实例,实现了ChannelOutboundHandler接口,并且它的outbound字段为true. 因此在 findContextOutbound中,找到的AbstractChannelHandlerContext对象其实就是链表的head.HeadContext从写了connect方法。
public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }
unsafe是在HeadContext构造器中pipeline.channel().unsafe()返回的,就是AbstractNioByteChannel.NioByteUnsafe内部类。
protected class NioByteUnsafe extends AbstractNioUnsafe
就是调用AbstractNioUnsafe中的connect方法,
........ if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } .......
doConnect方法其实又是调用到了NioSocketChannel中去了
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { doBind0(localAddress); } boolean success = false; try { boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } }
又一次出现了javaChannel()就是socketChannel,
SocketUtils.connect:
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress) throws IOException { try { return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() { @Override public Boolean run() throws IOException { return socketChannel.connect(remoteAddress); } }); } catch (PrivilegedActionException e) { throw (IOException) e.getCause(); } }
看到了socketChannel.connect(remoteAddress);终于完成了socket连接。
以上就是客户端的初始化和连接了,好绕好绕!
再总结一下各个组件的关系:
NioEventLoop包含
selectorProvider:selector的提供者,SelectorProvider.provider()获取
selector:具体的selector。provider.openSelector()获取一个selector对象
SocketChannel实例会被注册到此selector上
NioEventLoopGroup包含
children数组,存放NioEventLoop实例
NioSocketChannel包含
SocketChannel实例
DefaultChannelPipeline实例
unsafe实例
NioEventLoop实例
在实例化channel时,就会实例化一个ChannelPipeline
DefaultChannelPipeline包含
NioSocketChannel实例
TailContext实例(AbstractChannelHandlerContext)
HeadContext实例(AbstractChannelHandlerContext)
自定义的handler
AbstractChannelHandlerContext包含
DefaultChannelPipeline实例
大概梳理了一下,以后发现要加再加!
发表评论
-
netty探索之旅八
2017-03-09 13:27 363我们继续EventLoop。走起! 在前一节我们谈到了一个e ... -
netty探索之旅七
2017-03-06 16:28 363前面我们分析了Pipeline,还有一个东西值得我们去研究研究 ... -
netty探索之旅六
2017-03-05 15:45 397netty中的管道--ChannelPipeline的事件传输 ... -
netty探索之旅五
2017-03-02 11:09 395netty中的管道--ChannelPipeline机制 前 ... -
netty探索之旅二
2017-02-13 13:30 320上一篇只是简单的介绍了一下NIO中的Selector。 这里我 ... -
netty探索之旅四
2017-02-27 16:19 360上一篇我们研究了netty的客户端代码,这一篇我们研究一下服务 ... -
netty探索之旅一
2017-02-12 16:38 345其实一直都在关注NETTY,前面也花了点时间去看过,但是 ...
相关推荐
Java进阶技术-netty进阶之路
《Netty进阶之路-跟着案例学Netty》是由知名技术专家李林峰撰写的一本专为Java开发者深入理解Netty框架而准备的书籍。这本书旨在通过实例教学,帮助读者全面掌握Netty的核心特性和实战技巧,提升网络编程的能力。 ...
Netty进阶之路 跟着案例学Netty 整本书无密码,Netty进阶之路 跟着案例学Netty
《Netty进阶之路:跟着案例学Netty》中的案例涵盖了Netty的启动和停止、内存、并发多线程、性能、可靠性、安全等方面,囊括了Netty绝大多数常用的功能及容易让人犯错的地方。在案例的分析过程中,还穿插讲解了Netty...
精选自1000多个一线业务实际案例,从原理到实践全景式讲解Netty项目实践,快速领悟Netty专家花大量时间积累的经验,提高编程水平及分析解决问题的能力,《Netty木又威指南》作者力作,众专家力荐 Netty将Java NIO...
Netty学习之ServerChannel Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本篇中,我们将深入探讨ServerChannel这一核心概念,它是Netty中用于接收客户端...
3. **Netty 的核心组件** - **Bootstrap**:用于启动服务器或客户端的配置类。 - **Channel**:代表一个连接,可以读写数据。 - **EventLoopGroup**:负责事件的处理和调度,通常有两组线程池,一个用于接收连接...
《Netty权威指南》是一本深入探讨Netty框架的详细教程,旨在帮助读者全面理解并熟练运用Netty进行高性能网络应用开发。Netty是Java领域的一款开源、异步事件驱动的网络应用程序框架,广泛应用于高性能服务器和客户端...
在“Netty 3中文版”中,我们可以深入了解到这个强大的Java网络库,它为开发者提供了构建网络应用的强大工具。下面将详细阐述Netty 3的核心概念、特性以及实际应用中的关键知识点。 1. **核心概念** - **NIO (非...
读书笔记:Netty权威指南学习之旅
在 Netty 3 的配置中,我们需要确保包含所有必要的 Netty jar 包,以及其他可能的第三方库,如 Log4j 或 SLF4J 用于日志记录。 2. **src/main/java**:源代码目录,包含你的服务器或客户端应用的 Java 类。在 Netty...
3. ChannelPipeline:ChannelPipeline是Netty中的核心概念之一,负责管理和传递事件流。 4. ChannelHandler:它是业务逻辑处理的核心,不同的Handler负责不同类型的任务。 5. 编解码器:Netty提供了很多内置的编...
3. **ByteBuf**:Netty的字节缓冲区,用于高效地处理网络数据。相比Java的ByteBuffer,ByteBuf提供了一套更友好的API。 4. **Pipeline**:数据在Channel中传输时会经过一系列处理器,这些处理器组成了Pipeline。每...
《Netty权威指南》是一本深入讲解Netty框架的书籍,专为希望快速掌握Netty开发的读者设计。Netty是Java平台上的一个高性能、异步事件驱动的网络应用程序框架,广泛应用于分布式系统、微服务架构以及高并发的网络应用...
3. **ByteBuf**:Netty提供了自己的ByteBuf类,作为缓冲区,它比Java的ByteBuffer更易用且高效,支持直接内存和堆内存操作,避免了频繁的内存复制。 4. **零拷贝**:Netty通过使用FileRegion实现零拷贝,减少了CPU...
在《Netty进阶之路:跟着案例学Netty》中,作者将在过去几年实践中遇到的问题,以及Netty学习者咨询的相关问题,进行了归纳和总结,以问题案例做牵引,通过对案例进行剖析,讲解问题背后的原理,并结合Netty源码分析...
《Netty实战》这本书是针对Java网络编程框架Netty的一本深入实践教程,旨在帮助读者掌握Netty的核心特性和实际应用。Netty是一款高性能、异步事件驱动的网络应用程序框架,广泛应用于各种分布式系统、微服务架构以及...
Netty基础,用于学习Netty,参考黑马程序员的netty教程
在这个“Netty Protobuf3 测试服务器”项目中,开发者使用Netty框架来构建一个服务器,该服务器与Unity游戏引擎中的protobuf3(Protocol Buffers版本3)进行通信。protobuf3是Google开发的一种数据序列化协议,它...
**三、Netty 在实际项目中的应用** 1. **分布式系统**:Netty 的高性能和低延迟特性使其成为分布式系统间的通信首选。 2. **游戏服务器**:游戏服务器需要处理大量并发连接和快速响应,Netty 的异步模型和高性能...