yuanq_20 写道
引用:if you want to learn something, you might read up on it and then write down some notes.
这个图很漂亮,我也写了netty的学习系列文章,希望大家批评指正:http://asialee.iteye.com/blog/1769508
锁定老帖子 主题:netty源代码解析(1)——服务端流程
精华帖 (1) :: 良好帖 (1) :: 新手帖 (1) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2012-06-26
最后修改:2012-06-27
netty服务端启动代码如下 ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { ChannelPipeline pipleline = pipeline(); //默认最大传输帧大小为16M pipleline.addLast("encode", new ObjectEncoder(1048576 * 16)); pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null))); pipleline.addLast("handler", handler); return pipleline; } }); //设置缓冲区为64M bootstrap.setOption("receiveBufferSize", 1048576 * 64); bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法 //tcp定期发送心跳包 比如IM里边定期探测对方是否下线 //只有tcp长连接下才有意义 // bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(port)); 服务端事件处理顺序如下: UpStream.ChannelState.OPEN—–>DownStream.ChannelState.BOUND(需要绑定) ——–>UpStream.ChannelState.BOUND(已经绑定)——>DownStream.CONNECTED(需要连接)——->UpStream.CONNECTED(连接成功) 在bind的时候做了如下处理 public Channel bind(final SocketAddress localAddress) { if (localAddress == null) { throw new NullPointerException("localAddress"); } final BlockingQueue<ChannelFuture> futureQueue = new LinkedBlockingQueue<ChannelFuture>(); ChannelHandler binder = new Binder(localAddress, futureQueue); ChannelHandler parentHandler = getParentHandler(); 这里创建了一个Binder,它继承了SimpleChannelUpstreamHandler。先说说UpStreamHandler和DownStreamHandler,一般来说,UpStream类型的事件主要是由网络底层反馈给Netty的,比如messageReceived,channelConnected等事件,而DownStream类型的事件是由框架自己发起的,比如bind,write,connect,close等事件。 接着 ChannelPipeline bossPipeline = pipeline(); bossPipeline.addLast("binder", binder); if (parentHandler != null) { bossPipeline.addLast("userHandler", parentHandler); } Channel channel = getFactory().newChannel(bossPipeline); 这里创建出一个channel,每一个channel都是由一个tcp四元组组成。channel由ChannelFactory创建而成。在创建完NioServerSocketChannel后,会调用 fireChannelOpen(this);这是发出一个ChannelState.OPEN事件,前面注册的BinderHandler会处理这个事件。我们来看看Binder的代码 @Override public void channelOpen( ChannelHandlerContext ctx, ChannelStateEvent evt) { try { evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory()); // Split options into two categories: parent and child. Map<String, Object> allOptions = getOptions(); Map<String, Object> parentOptions = new HashMap<String, Object>(); for (Entry<String, Object> e: allOptions.entrySet()) { if (e.getKey().startsWith("child.")) { childOptions.put( e.getKey().substring(6), e.getValue()); } else if (!e.getKey().equals("pipelineFactory")) { parentOptions.put(e.getKey(), e.getValue()); } } // Apply parent options. evt.getChannel().getConfig().setOptions(parentOptions); } finally { ctx.sendUpstream(evt); } boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); //这里发出bind事件,return Channels.bind(this, localAddress) assert finished; } bind就触发了一个DownStream的ChannelState.BOUND事件。表明需要将该Channel绑定至指定的地址。 public void sendDownstream(ChannelEvent e) { DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail); if (tail == null) { try { getSink().eventSunk(this, e); return; } catch (Throwable t) { notifyHandlerException(e, t); return; } } sendDownstream(tail, e); } 接着就要看NioServerSocketPipelineSink了,这个主要关注于具体传输数据的处理,同时也包括其他方面的内容,比如异常处理等等。执行eventSunk方法。 public void eventSunk( ChannelPipeline pipeline, ChannelEvent e) throws Exception { Channel channel = e.getChannel(); if (channel instanceof NioServerSocketChannel) { handleServerSocket(e); } else if (channel instanceof NioSocketChannel) { handleAcceptedSocket(e); } } nio方式ChannelSink一般会有1个boss实例(implements Runnable),以及若干个worker实例(不设置默认为cpu cores*2),它将channel分为 ServerSocketChannel和SocketChannel分开处理。这主要原因是boss线程accept()一个新的连接生成一个 SocketChannel交给worker进行数据接收。 看下ServerSocketChannel的处理 private void handleServerSocket(ChannelEvent e) { if (!(e instanceof ChannelStateEvent)) { return; } ChannelStateEvent event = (ChannelStateEvent) e; NioServerSocketChannel channel = (NioServerSocketChannel) event.getChannel(); ChannelFuture future = event.getFuture(); ChannelState state = event.getState(); Object value = event.getValue(); switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (SocketAddress) value); } else { close(channel, future); } break; } } 主要是处理bind事件, private void bind( NioServerSocketChannel channel, ChannelFuture future, SocketAddress localAddress) { boolean bound = false; boolean bossStarted = false; try { channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); bound = true; future.setSuccess(); fireChannelBound(channel, channel.getLocalAddress()); //取出一个boss线程,然后交给Boss类去处理。 Executor bossExecutor = ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; DeadLockProofWorker.start( bossExecutor, new ThreadRenamingRunnable( new Boss(channel), "New I/O server boss #" + id + " (" + channel + ')')); bossStarted = true; } catch (Throwable t) { future.setFailure(t); fireExceptionCaught(channel, t); } finally { if (!bossStarted && bound) { close(channel, future); } } } 看下Boss类,它实现了Runnable接口 private final Selector selector; private final NioServerSocketChannel channel; Boss(NioServerSocketChannel channel) throws IOException { this.channel = channel; selector = Selector.open(); boolean registered = false; try { channel.socket.register(selector, SelectionKey.OP_ACCEPT); registered = true; } finally { if (!registered) { closeSelector(); } } channel.selector = selector; 代码是不是有点熟悉,没错,是nio里的代码,需要注意的是,ServerSocketChannel只注册OP_ACCEPT事件。 再看下Boss类的run方法 public void run() { final Thread currentThread = Thread.currentThread(); channel.shutdownLock.lock(); try { for (;;) { try { if (selector.select(1000) > 0) { selector.selectedKeys().clear(); } SocketChannel acceptedSocket = channel.socket.accept(); if (acceptedSocket != null) { registerAcceptedChannel(acceptedSocket, currentThread); } } catch (SocketTimeoutException e) { // Thrown every second to get ClosedChannelException // raised. } catch (CancelledKeyException e) { // Raised by accept() when the server socket was closed. } catch (ClosedSelectorException e) { // Raised by accept() when the server socket was closed. } catch (ClosedChannelException e) { // Closed as requested. break; } catch (Throwable e) { logger.warn( "Failed to accept a connection.", e); try { Thread.sleep(1000); } catch (InterruptedException e1) { // Ignore } } } } finally { channel.shutdownLock.unlock(); closeSelector(); } } 这里会调用registerAcceptedChannel(acceptedSocket, currentThread);方法 private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) { try { ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline(); NioWorker worker = nextWorker(); //获取一个NioWorker //将Channel注册到NioWorker上去 worker.register(new NioAcceptedSocketChannel( channel.getFactory(), pipeline, channel, NioServerSocketPipelineSink.this, acceptedSocket, worker, currentThread), null); } catch (Exception e) { logger.warn( "Failed to initialize an accepted socket.", e); try { acceptedSocket.close(); } catch (IOException e2) { logger.warn( "Failed to close a partially accepted socket.", e2); } } } 当有新的连接建立,会交给NioWorker的线程池去处理,boss只负责accept到新的连接,新的SocketChannel会被注册到一个work线程中去。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2012-06-26
谢谢了,最近正好正在用netty,不过我想的问的是为什么接收消息是叫UpStreamHandler,而发送消息是叫DownStreamHandler,不是和我们正常的理解相反么?
|
|
返回顶楼 | |
发表时间:2012-06-27
finallygo 写道 谢谢了,最近正好正在用netty,不过我想的问的是为什么接收消息是叫UpStreamHandler,而发送消息是叫DownStreamHandler,不是和我们正常的理解相反么?
从iso网络七层模型来理解 |
|
返回顶楼 | |
发表时间:2012-06-28
fengfeng925 写道 finallygo 写道 谢谢了,最近正好正在用netty,不过我想的问的是为什么接收消息是叫UpStreamHandler,而发送消息是叫DownStreamHandler,不是和我们正常的理解相反么?
从iso网络七层模型来理解 不用那么弦乎吧。字面上理解也行啊,down就是下发的意思啊,反过来接收就是up啦 ,嘿嘿 |
|
返回顶楼 | |
发表时间:2012-06-28
ljl_ss 写道 fengfeng925 写道 finallygo 写道 谢谢了,最近正好正在用netty,不过我想的问的是为什么接收消息是叫UpStreamHandler,而发送消息是叫DownStreamHandler,不是和我们正常的理解相反么?
从iso网络七层模型来理解 不用那么弦乎吧。字面上理解也行啊,down就是下发的意思啊,反过来接收就是up啦 ,嘿嘿 我也觉得就是把思维角度换一下去理解。和我们自身的习惯有所不同而已.呵呵 |
|
返回顶楼 | |
发表时间:2012-07-02
bootstrap.setOption("receiveBufferSize", 1048576 * 64);
这么漂亮的代码为什么还要用魔数呢 |
|
返回顶楼 | |
发表时间:2013-01-06
引用:if you want to learn something, you might read up on it and then write down some notes.
|
|
返回顶楼 | |
发表时间:2013-02-20
yuanq_20 写道
引用:if you want to learn something, you might read up on it and then write down some notes.
这个图很漂亮,我也写了netty的学习系列文章,希望大家批评指正:http://asialee.iteye.com/blog/1769508 |
|
返回顶楼 | |