- 浏览: 10404 次
- 性别:
- 来自: 成都
最新评论
上一篇我们研究了netty的客户端代码,这一篇我们研究一下服务端代码
以下是源码中服务端的启动代码
路径:example\src\main\java\io\netty\example\echo\EchoServer
和客户端的代码的层次比较相似。
NioEventLoopGroup:无论客户端还是服务端都需要指定。不过服务端中指定了两个NioEventLoopGroup,变量是bossGroup用于处理客户端的连接请求,变量workerGroup用于处理与各个客户端连接的IO操作。
Channel类型:因为是服务器端,所以是NioServerSocketChannel。
channel的初始化过程和客户端的大同小异。
同:结构基本一致
异:参数的类型不一样
客户端的Bootstrap和服务端的ServerBootstrap;客户端的NioSocketChannel和服务端的NioServerSocketChannel;
NioServerSocketChannel:
注意这里是openServerSocketChannel()。
this(newSocket(DEFAULT_SELECTOR_PROVIDER)):
这里调用父类构造器时,传入的参数是SelectionKey.OP_ACCEPT。客户端的NioSocketChannel传入的参数是SelectionKey.OP_READ。看过我关于Reactor和Proactor模式的文章中,就知道NIO是一种Reactor模式,通过selector来实现I/O的多路复用,服务端开始时需要监听客户端的连接请求,因此在这里我们设置了SelectionKey.OP_ACCEPT,即通知selector我们对客户端的连接请求事件感兴趣。
同样在AbstractChannel中会实例化一个unsafe和pipeline:值得注意的是客户端的unsafe是一个AbstractNioByteChannel:NioByteUnsafe的实例。服务端的NioServerSocketChannel是继承的AbstractNioMessageChannel类,在此类中重写了newUnsafe方法,
服务器端,unsafe是AbstractNioMessageChannel.AbstractNioUnsafe的实例。
小总结一下:
1,NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER)打开一个Java NIO ServerSocketChannel。
2,AbstractChannel(Channel parent)中初始化AbstractChannel属性:
parent 属性置为 null
unsafe 通过newUnsafe()实例化一个unsafe对象,类型是 AbstractNioMessageChannel.AbstractNioUnsafe
pipeline是new DefaultChannelPipeline(this)创建的实例.
3,AbstractNioChannel的属性:
SelectableChannel ch 被设置为ServerSocketChannel.
readInterestOp 被设置为 SelectionKey.OP_ACCEPT
SelectableChannel ch 被配置为非阻塞的 ch.configureBlocking(false)
4,NioServerSocketChannel的属性:
ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket())
ChannelPipeline初始化和channel的注册
这2块客户端和服务端过程是一样的。可以参考上一篇http://jishuaige.iteye.com/admin/blogs/2356798。
NioEventLoopGroup
在客户端的时候,我们只提供了一个NioEventLoopGroup对象,而在服务端我们初始化了2个NioEventLoopGroup对象,一个bossGroup,一个workerGroup。
bossGroup:用于服务端处理客户端的连接请求。类似于前台接待员。
workerGroup:负责客户端连接通道的IO操作。就是实际做事情的人。
bossGroup把人员领进门后,就可以等待下一个人员进来。人员进门之后workerGroup就负责对他进行服务了。
通过源码来看看这两位的工作:
EchoServer中:
看看ServerBootstrap中的group方法
把bossGroup赋值给了ServerBootstrap的父类AbstractBootstrap的group(EventLoopGroup)变量。
把workerGroup赋值给了ServerBootstrap的childGroup(EventLoopGroup)变量。
这个方法调用链是:
AbstractBootstrap.bind-->AbstractBootstrap.doBind--> AbstractBootstrap.initAndRegister
又看到了initAndRegister,前面在分析客户端的时候就看过了,里面有几段重要的代码
newChannel()是一个NioServerSocketChannel实例,group()就是bossGroup(NioEventLoopGroup),group().register(channel)【和客户端的一样】将NioServerSocketChannel和bossGroup关联起来(将ServerSocketChannel注册到eventLoop关联的selector上)。
接着我们来看看服务端是怎样接受客户端请求的。首先根据以上的分析,我们已经知道NioServerSocketChannel是对SelectionKey.OP_ACCEPT事件感兴趣。那么当ACCEPT事件发生的时候netty是在哪里进行处理的喃?
回到b.bind(PORT)的调用链AbstractBootstrap.bind -> AbstractBootstrap.doBind方法中可以看到doBind0方法。
eventLoop()返回NioEventLoop对象,调用NioEventLoop.execute的方法
继续看里面的startThread()方法:
thread对象是什么喃!还记得NioEventLoop的初始化方法不?在一系列的super方法后在SingleThreadEventExecutor的构造方法中
对thread变量赋值一个匿名线程,回到startThread()方法中。这个方法中会将这个匿名线程启动起来。上面代码中run()方法会调用。注意:SingleThreadEventExecutor.this.run()
SingleThreadEventExecutor的子类NioEventLoop重写了run()方法,
1,轮询selector上的所有的channel的IO事件:select(wakenUp.getAndSet(false));
2,处理产生网络IO事件的channel:processSelectedKeys();
3,处理任务队列runAllTasks();
当发生了OP_ACCEPT事件就绪后,processSelectedKeys就开始处理就绪的事件,继续跟踪:
看到NioEventLoop中的processSelectedKey方法:
完成了客户端的连接操作,删除SelectionKey.OP_CONNECT事件。从以上代码看到OP_READ事件就绪后,会调用unsafe.read();就是调用到NioMessageUnsafe的read方法:看重点代码
继续调用到NioServerSocketChannel.doReadMessages方法:
在doReadMessages中,通过javaChannel().accept()获取到客户端新连接的SocketChannel,接着就实例化一个NioSocketChannel,并且传入NioServerSocketChannel对象。这个方法后返回到NioMessageUnsafe的read方法中
当有读取信息的后,通过ChannelPipeline机制,将读取事件逐级发送到各个handler中,这样就顺利完成数据读取。
上面的分析,我们已经知道了bossGroup和NioServerSocketChannel关联起来了,那么剩下的 workerGroup怎么关联喃???这个我们就要回到initAndRegister方法中接着往下看了:init方法。在ServerBootstrap中重写了init方法
ch.eventLoop().execute方法会把这个匿名线程放到SingleThreadEventExecutor的任务队列中。请关注:pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler,currentChildOptions,currentChildAttrs));这句代码中的ServerBootstrapAcceptor对象(这句代码把ServerBootstrapAcceptor对象当成了一个handle放入了管道的最后)。childGroup对象就是前面的workerGroup赋值的值。
ServerBootstrapAcceptor中重写了channelRead方法
Channel child是一个NioSocketChannel,通过childGroup.register(child)这样workerGroup就和NioSocketChannel关联起来了。
workerGroup关联NioSocketChannel。
bossGroup关联NioServerSocketChannel。
看看ServerBootstrapAcceptor中的channelRead方法是怎么调用的。
上面提到了:当有读取信息的后,通过ChannelPipeline机制,将读取事件逐级发送到各个handler中的方法:
这个代码会调用到DefaultChannelPipeline的fireChannelRead方法中:
继续:AbstractChannelHandlerContext的invokeChannelRead
会调用next(HeadContext):AbstractChannelHandlerContext的方法invokeChannelRead,也是在AbstractChannelHandlerContext中:
handler()就会得到ServerBootstrapAcceptor对象,这样就调到了ServerBootstrapAcceptor重写的channelRead方法。
还记得我们上面的ServerBootstrap的init方法吗?在这个方法中添加了handler,方法里面通过handler()方法获取一个handler,不为空的情况下会添加到pipeline中,那么handler()得到的是什么对象喃?
其实handler()返回的就是我们在EchoServer类中添加的handler:
在ServerBootstrap初始化时的管道里面的handler情况是:
head---->ChannelInitializer--->tail
根据上一篇客户端的经验,当channel绑定到eventLoop后,这里是NioServerSocketChannel绑定到bossGroup中,会在pipeline中发出fireChannelRegistered事件,接着就会触发 ChannelInitializer.initChannel方法的调用这个方法被覆盖了的:
这个时候NioServerSocketChannel对应的管道里面就是:
head---->LoggingHandler--->ServerBootstrapAcceptor--->tail
在ServerBootstrapAcceptor.channelRead中会为新建的Channel设置handler并注册到一个 eventLoop中:
childHandler就是我们在启动的时候,设置的:
当这个客户端连接Channel注册到eventLoop后就会触发事件,调用ChannelInitializer.initChannel方法。那么新客户端连接后,NioSocketChannel对于的管道里面对应就是:
head---->sslHandler--->EchoServerHandler--->tail
看了服务端有2个handler:一个是通过handler()方法设置handler字段,一个是过 childHandler()设置childHandler字段。handler负责处理客户端的连接请求;childHandler 就是负责和客户端的连接的IO交互。
算是走完了服务端和客户端吧!但是还是比较混乱,只是跟着代码在走,还是跳跃的在走。有时自己都走晕了。至于netty为什么要这样写,这样写的优势,各种机制的实现还是没有分析到位,后面再努力吧!
以下是源码中服务端的启动代码
路径:example\src\main\java\io\netty\example\echo\EchoServer
public static void main(String[] args) throws Exception { final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoServerHandler()); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
和客户端的代码的层次比较相似。
NioEventLoopGroup:无论客户端还是服务端都需要指定。不过服务端中指定了两个NioEventLoopGroup,变量是bossGroup用于处理客户端的连接请求,变量workerGroup用于处理与各个客户端连接的IO操作。
Channel类型:因为是服务器端,所以是NioServerSocketChannel。
channel的初始化过程和客户端的大同小异。
同:结构基本一致
异:参数的类型不一样
客户端的Bootstrap和服务端的ServerBootstrap;客户端的NioSocketChannel和服务端的NioServerSocketChannel;
NioServerSocketChannel:
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } }
注意这里是openServerSocketChannel()。
this(newSocket(DEFAULT_SELECTOR_PROVIDER)):
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
这里调用父类构造器时,传入的参数是SelectionKey.OP_ACCEPT。客户端的NioSocketChannel传入的参数是SelectionKey.OP_READ。看过我关于Reactor和Proactor模式的文章中,就知道NIO是一种Reactor模式,通过selector来实现I/O的多路复用,服务端开始时需要监听客户端的连接请求,因此在这里我们设置了SelectionKey.OP_ACCEPT,即通知selector我们对客户端的连接请求事件感兴趣。
同样在AbstractChannel中会实例化一个unsafe和pipeline:值得注意的是客户端的unsafe是一个AbstractNioByteChannel:NioByteUnsafe的实例。服务端的NioServerSocketChannel是继承的AbstractNioMessageChannel类,在此类中重写了newUnsafe方法,
@Override protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); }
服务器端,unsafe是AbstractNioMessageChannel.AbstractNioUnsafe的实例。
小总结一下:
1,NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER)打开一个Java NIO ServerSocketChannel。
2,AbstractChannel(Channel parent)中初始化AbstractChannel属性:
parent 属性置为 null
unsafe 通过newUnsafe()实例化一个unsafe对象,类型是 AbstractNioMessageChannel.AbstractNioUnsafe
pipeline是new DefaultChannelPipeline(this)创建的实例.
3,AbstractNioChannel的属性:
SelectableChannel ch 被设置为ServerSocketChannel.
readInterestOp 被设置为 SelectionKey.OP_ACCEPT
SelectableChannel ch 被配置为非阻塞的 ch.configureBlocking(false)
4,NioServerSocketChannel的属性:
ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket())
ChannelPipeline初始化和channel的注册
这2块客户端和服务端过程是一样的。可以参考上一篇http://jishuaige.iteye.com/admin/blogs/2356798。
NioEventLoopGroup
在客户端的时候,我们只提供了一个NioEventLoopGroup对象,而在服务端我们初始化了2个NioEventLoopGroup对象,一个bossGroup,一个workerGroup。
bossGroup:用于服务端处理客户端的连接请求。类似于前台接待员。
workerGroup:负责客户端连接通道的IO操作。就是实际做事情的人。
bossGroup把人员领进门后,就可以等待下一个人员进来。人员进门之后workerGroup就负责对他进行服务了。
通过源码来看看这两位的工作:
EchoServer中:
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup)
看看ServerBootstrap中的group方法
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }
把bossGroup赋值给了ServerBootstrap的父类AbstractBootstrap的group(EventLoopGroup)变量。
把workerGroup赋值给了ServerBootstrap的childGroup(EventLoopGroup)变量。
ChannelFuture f = b.bind(PORT).sync();
这个方法调用链是:
AbstractBootstrap.bind-->AbstractBootstrap.doBind--> AbstractBootstrap.initAndRegister
又看到了initAndRegister,前面在分析客户端的时候就看过了,里面有几段重要的代码
final Channel channel = channelFactory().newChannel(); init(channel); ChannelFuture regFuture = group().register(channel);
newChannel()是一个NioServerSocketChannel实例,group()就是bossGroup(NioEventLoopGroup),group().register(channel)【和客户端的一样】将NioServerSocketChannel和bossGroup关联起来(将ServerSocketChannel注册到eventLoop关联的selector上)。
接着我们来看看服务端是怎样接受客户端请求的。首先根据以上的分析,我们已经知道NioServerSocketChannel是对SelectionKey.OP_ACCEPT事件感兴趣。那么当ACCEPT事件发生的时候netty是在哪里进行处理的喃?
回到b.bind(PORT)的调用链AbstractBootstrap.bind -> AbstractBootstrap.doBind方法中可以看到doBind0方法。
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
eventLoop()返回NioEventLoop对象,调用NioEventLoop.execute的方法
public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
继续看里面的startThread()方法:
private void startThread() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { thread.start(); } } }
thread对象是什么喃!还记得NioEventLoop的初始化方法不?在一系列的super方法后在SingleThreadEventExecutor的构造方法中
.................... thread = threadFactory.newThread(new Runnable() { @Override public void run() { boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; .......................
对thread变量赋值一个匿名线程,回到startThread()方法中。这个方法中会将这个匿名线程启动起来。上面代码中run()方法会调用。注意:SingleThreadEventExecutor.this.run()
SingleThreadEventExecutor的子类NioEventLoop重写了run()方法,
@Override protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
1,轮询selector上的所有的channel的IO事件:select(wakenUp.getAndSet(false));
2,处理产生网络IO事件的channel:processSelectedKeys();
3,处理任务队列runAllTasks();
当发生了OP_ACCEPT事件就绪后,processSelectedKeys就开始处理就绪的事件,继续跟踪:
看到NioEventLoop中的processSelectedKey方法:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; } if (eventLoop != this || eventLoop == null) { return; } unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps();NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect();
完成了客户端的连接操作,删除SelectionKey.OP_CONNECT事件。从以上代码看到OP_READ事件就绪后,会调用unsafe.read();就是调用到NioMessageUnsafe的read方法:看重点代码
int localRead = doReadMessages(readBuf);
继续调用到NioServerSocketChannel.doReadMessages方法:
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
在doReadMessages中,通过javaChannel().accept()获取到客户端新连接的SocketChannel,接着就实例化一个NioSocketChannel,并且传入NioServerSocketChannel对象。这个方法后返回到NioMessageUnsafe的read方法中
for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); }
当有读取信息的后,通过ChannelPipeline机制,将读取事件逐级发送到各个handler中,这样就顺利完成数据读取。
上面的分析,我们已经知道了bossGroup和NioServerSocketChannel关联起来了,那么剩下的 workerGroup怎么关联喃???这个我们就要回到initAndRegister方法中接着往下看了:init方法。在ServerBootstrap中重写了init方法
@Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
ch.eventLoop().execute方法会把这个匿名线程放到SingleThreadEventExecutor的任务队列中。请关注:pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler,currentChildOptions,currentChildAttrs));这句代码中的ServerBootstrapAcceptor对象(这句代码把ServerBootstrapAcceptor对象当成了一个handle放入了管道的最后)。childGroup对象就是前面的workerGroup赋值的值。
ServerBootstrapAcceptor中重写了channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
Channel child是一个NioSocketChannel,通过childGroup.register(child)这样workerGroup就和NioSocketChannel关联起来了。
workerGroup关联NioSocketChannel。
bossGroup关联NioServerSocketChannel。
看看ServerBootstrapAcceptor中的channelRead方法是怎么调用的。
上面提到了:当有读取信息的后,通过ChannelPipeline机制,将读取事件逐级发送到各个handler中的方法:
pipeline.fireChannelRead(readBuf.get(i));
这个代码会调用到DefaultChannelPipeline的fireChannelRead方法中:
@Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
继续:AbstractChannelHandlerContext的invokeChannelRead
static void invokeChannelRead(final AbstractChannelHandlerContext next, final Object msg) { ObjectUtil.checkNotNull(msg, "msg"); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(msg); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(msg); } }); } }
会调用next(HeadContext):AbstractChannelHandlerContext的方法invokeChannelRead,也是在AbstractChannelHandlerContext中:
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
handler()就会得到ServerBootstrapAcceptor对象,这样就调到了ServerBootstrapAcceptor重写的channelRead方法。
还记得我们上面的ServerBootstrap的init方法吗?在这个方法中添加了handler,方法里面通过handler()方法获取一个handler,不为空的情况下会添加到pipeline中,那么handler()得到的是什么对象喃?
其实handler()返回的就是我们在EchoServer类中添加的handler:
.handler(new LoggingHandler(LogLevel.INFO))
在ServerBootstrap初始化时的管道里面的handler情况是:
head---->ChannelInitializer--->tail
根据上一篇客户端的经验,当channel绑定到eventLoop后,这里是NioServerSocketChannel绑定到bossGroup中,会在pipeline中发出fireChannelRegistered事件,接着就会触发 ChannelInitializer.initChannel方法的调用这个方法被覆盖了的:
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = handler(); if (handler != null) { pipeline.addLast(handler); } pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } });
这个时候NioServerSocketChannel对应的管道里面就是:
head---->LoggingHandler--->ServerBootstrapAcceptor--->tail
在ServerBootstrapAcceptor.channelRead中会为新建的Channel设置handler并注册到一个 eventLoop中:
final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } });
childHandler就是我们在启动的时候,设置的:
.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(new EchoServerHandler()); } });
当这个客户端连接Channel注册到eventLoop后就会触发事件,调用ChannelInitializer.initChannel方法。那么新客户端连接后,NioSocketChannel对于的管道里面对应就是:
head---->sslHandler--->EchoServerHandler--->tail
看了服务端有2个handler:一个是通过handler()方法设置handler字段,一个是过 childHandler()设置childHandler字段。handler负责处理客户端的连接请求;childHandler 就是负责和客户端的连接的IO交互。
算是走完了服务端和客户端吧!但是还是比较混乱,只是跟着代码在走,还是跳跃的在走。有时自己都走晕了。至于netty为什么要这样写,这样写的优势,各种机制的实现还是没有分析到位,后面再努力吧!
发表评论
-
netty探索之旅八
2017-03-09 13:27 362我们继续EventLoop。走起! 在前一节我们谈到了一个e ... -
netty探索之旅七
2017-03-06 16:28 363前面我们分析了Pipeline,还有一个东西值得我们去研究研究 ... -
netty探索之旅六
2017-03-05 15:45 396netty中的管道--ChannelPipeline的事件传输 ... -
netty探索之旅五
2017-03-02 11:09 393netty中的管道--ChannelPipeline机制 前 ... -
netty探索之旅三
2017-02-17 16:00 501下面就开始我们的探索 ... -
netty探索之旅二
2017-02-13 13:30 319上一篇只是简单的介绍了一下NIO中的Selector。 这里我 ... -
netty探索之旅一
2017-02-12 16:38 343其实一直都在关注NETTY,前面也花了点时间去看过,但是 ...
相关推荐
Java进阶技术-netty进阶之路
《Netty进阶之路-跟着案例学Netty》是由知名技术专家李林峰撰写的一本专为Java开发者深入理解Netty框架而准备的书籍。这本书旨在通过实例教学,帮助读者全面掌握Netty的核心特性和实战技巧,提升网络编程的能力。 ...
Netty进阶之路 跟着案例学Netty 整本书无密码,Netty进阶之路 跟着案例学Netty
《Netty进阶之路:跟着案例学Netty》中的案例涵盖了Netty的启动和停止、内存、并发多线程、性能、可靠性、安全等方面,囊括了Netty绝大多数常用的功能及容易让人犯错的地方。在案例的分析过程中,还穿插讲解了Netty...
ChannelHandlerAdapter 4.X版本和5.X版本的差别很大。ChannelRead是属于5.X版本的4.X版本没有这个方法,所以如果要用ChannelRead。可以更换5.X版本的Netty。
精选自1000多个一线业务实际案例,从原理到实践全景式讲解Netty项目实践,快速领悟Netty专家花大量时间积累的经验,提高编程水平及分析解决问题的能力,《Netty木又威指南》作者力作,众专家力荐 Netty将Java NIO...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨 Netty 如何与 4G DTU 设备结合,以及如何构建基于 Java 的物联网(IoT)解决方案。...
4. **ChannelHandler 配置**: ChannelHandler 是 Netty 中处理网络事件的核心组件。在 Spring 集成中,可以创建一个 ChannelHandlerContext 的工厂类,该工厂由 Spring 管理,并在需要时为每个新建的 Channel 提供 ...
Netty学习之ServerChannel Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本篇中,我们将深入探讨ServerChannel这一核心概念,它是Netty中用于接收客户端...
ChannelHandler是Netty的核心组件之一,它定义了处理这些事件的接口,如读取、写入、连接和关闭事件。 Netty 4.1引入了许多增强和改进,包括: 1. **ByteBuf**:Netty的缓冲区机制,它比Java的ByteBuffer更强大,...
读书笔记:Netty权威指南学习之旅
4. **处理数据**:当接收到"Hello Netty"时,我们在`channelRead()`方法中解析数据,然后使用`writeAndFlush()`方法向客户端回写"Hello World"。 5. **关闭资源**:当不再需要服务时,记得关闭Channel和...
《Netty 4 in Action》是一本专注于Netty框架的权威指南,专为那些希望深入理解和使用Netty构建高性能、高可靠性的网络应用的开发者而编写。Netty是由JBOSS组织开发的一个开源项目,它提供了一个高效、灵活且可扩展...
Netty4是该框架的最新版本,于2016年3月份更新,提供了用户手册以方便开发者学习和使用。 用户手册首先介绍了 Netty 的架构总览,强调了 Netty 在解决网络编程中通用问题时的高效性。Netty 采用了拦截链模式的事件...
4. **Pipeline**:Netty的ChannelHandler链是通过Pipeline实现的。Pipeline允许你自定义处理网络数据的方式,每个Handler可以执行特定的任务,如解码、加密、认证等。数据在网络中的流动就像在Pipeline中的传输一样...
在《Netty进阶之路:跟着案例学Netty》中,作者将在过去几年实践中遇到的问题,以及Netty学习者咨询的相关问题,进行了归纳和总结,以问题案例做牵引,通过对案例进行剖析,讲解问题背后的原理,并结合Netty源码分析...
《Netty实战》这本书是针对Java网络编程框架Netty的一本深入实践教程,旨在帮助读者掌握Netty的核心特性和实际应用。Netty是一款高性能、异步事件驱动的网络应用程序框架,广泛应用于各种分布式系统、微服务架构以及...
Netty基础,用于学习Netty,参考黑马程序员的netty教程
4. **心跳机制**:Netty 支持自定义心跳包,确保长连接的有效性,防止因网络延迟导致的连接断开。 5. **WebSocket 和 HTTP/2 支持**:Netty 提供了对 WebSocket 和 HTTP/2 协议的全面支持,方便构建 Web 服务和实时...
四、Netty实战应用 1. 创建服务器:通过ServerBootstrap类创建服务器,配置好EventLoopGroup、Channel、Pipeline等,监听指定端口。 2. 连接处理:通过ChannelFuture监听客户端连接,处理连接建立和断开事件。 3. ...