论坛首页 Java企业应用论坛

netty源代码解析(1)——服务端流程

浏览 8993 次
精华帖 (1) :: 良好帖 (1) :: 新手帖 (1) :: 隐藏帖 (0)
作者 正文
   发表时间:2012-06-26   最后修改:2012-06-27
今天看了下netty代码,对代码做了个流程分析,netty的代码写的真是漂亮。
netty服务端启动代码如下
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
				Executors.newCachedThreadPool()));
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

			@Override
			public ChannelPipeline getPipeline() {
				ChannelPipeline pipleline = pipeline();
				//默认最大传输帧大小为16M
				pipleline.addLast("encode", new ObjectEncoder(1048576 * 16));
				pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));
				pipleline.addLast("handler", handler);
				return pipleline;
			}

		});
		//设置缓冲区为64M
		bootstrap.setOption("receiveBufferSize", 1048576 * 64);
		bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法
		//tcp定期发送心跳包 比如IM里边定期探测对方是否下线
		//只有tcp长连接下才有意义
//		bootstrap.setOption("child.keepAlive", true); 
		bootstrap.bind(new InetSocketAddress(port));

服务端事件处理顺序如下:
UpStream.ChannelState.OPEN—–>DownStream.ChannelState.BOUND(需要绑定)
——–>UpStream.ChannelState.BOUND(已经绑定)——>DownStream.CONNECTED(需要连接)——->UpStream.CONNECTED(连接成功)

在bind的时候做了如下处理
public Channel bind(final SocketAddress localAddress) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }

        final BlockingQueue<ChannelFuture> futureQueue =
            new LinkedBlockingQueue<ChannelFuture>();

        ChannelHandler binder = new Binder(localAddress, futureQueue);
        ChannelHandler parentHandler = getParentHandler();

这里创建了一个Binder,它继承了SimpleChannelUpstreamHandler。先说说UpStreamHandler和DownStreamHandler,一般来说,UpStream类型的事件主要是由网络底层反馈给Netty的,比如messageReceived,channelConnected等事件,而DownStream类型的事件是由框架自己发起的,比如bind,write,connect,close等事件。
接着
ChannelPipeline bossPipeline = pipeline();
        bossPipeline.addLast("binder", binder);
        if (parentHandler != null) {
            bossPipeline.addLast("userHandler", parentHandler);
        }

        Channel channel = getFactory().newChannel(bossPipeline);

这里创建出一个channel,每一个channel都是由一个tcp四元组组成。channel由ChannelFactory创建而成。在创建完NioServerSocketChannel后,会调用
fireChannelOpen(this);这是发出一个ChannelState.OPEN事件,前面注册的BinderHandler会处理这个事件。我们来看看Binder的代码
 @Override
        public void channelOpen(
                ChannelHandlerContext ctx,
                ChannelStateEvent evt) {

            try {
                evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());

                // Split options into two categories: parent and child.
                Map<String, Object> allOptions = getOptions();
                Map<String, Object> parentOptions = new HashMap<String, Object>();
                for (Entry<String, Object> e: allOptions.entrySet()) {
                    if (e.getKey().startsWith("child.")) {
                        childOptions.put(
                                e.getKey().substring(6),
                                e.getValue());
                    } else if (!e.getKey().equals("pipelineFactory")) {
                        parentOptions.put(e.getKey(), e.getValue());
                    }
                }

                // Apply parent options.
                evt.getChannel().getConfig().setOptions(parentOptions);
            } finally {
                ctx.sendUpstream(evt);
            }

            boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); //这里发出bind事件,return Channels.bind(this, localAddress)
            assert finished;
        }

bind就触发了一个DownStream的ChannelState.BOUND事件。表明需要将该Channel绑定至指定的地址。
 public void sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
        if (tail == null) {
            try {
                getSink().eventSunk(this, e);
                return;
            } catch (Throwable t) {
                notifyHandlerException(e, t);
                return;
            }
        }

        sendDownstream(tail, e);
    }

接着就要看NioServerSocketPipelineSink了,这个主要关注于具体传输数据的处理,同时也包括其他方面的内容,比如异常处理等等。执行eventSunk方法。
 public void eventSunk(
            ChannelPipeline pipeline, ChannelEvent e) throws Exception {
        Channel channel = e.getChannel();
        if (channel instanceof NioServerSocketChannel) {
            handleServerSocket(e);
        } else if (channel instanceof NioSocketChannel) {
            handleAcceptedSocket(e);
        }
    }

nio方式ChannelSink一般会有1个boss实例(implements Runnable),以及若干个worker实例(不设置默认为cpu cores*2),它将channel分为 ServerSocketChannel和SocketChannel分开处理。这主要原因是boss线程accept()一个新的连接生成一个 SocketChannel交给worker进行数据接收。

看下ServerSocketChannel的处理
private void handleServerSocket(ChannelEvent e) {
        if (!(e instanceof ChannelStateEvent)) {
            return;
        }

        ChannelStateEvent event = (ChannelStateEvent) e;
        NioServerSocketChannel channel =
            (NioServerSocketChannel) event.getChannel();
        ChannelFuture future = event.getFuture();
        ChannelState state = event.getState();
        Object value = event.getValue();

        switch (state) {
        case OPEN:
            if (Boolean.FALSE.equals(value)) {
                close(channel, future);
            }
            break;
        case BOUND:
            if (value != null) {
                bind(channel, future, (SocketAddress) value);
            } else {
                close(channel, future);
            }
            break;
        }
    }

主要是处理bind事件,
private void bind(
            NioServerSocketChannel channel, ChannelFuture future,
            SocketAddress localAddress) {

        boolean bound = false;
        boolean bossStarted = false;
        try {
            channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
            bound = true;

            future.setSuccess();
            fireChannelBound(channel, channel.getLocalAddress());

            //取出一个boss线程,然后交给Boss类去处理。
            Executor bossExecutor =
                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
            DeadLockProofWorker.start(
                    bossExecutor,
                    new ThreadRenamingRunnable(
                            new Boss(channel),
                            "New I/O server boss #" + id + " (" + channel + ')'));
            bossStarted = true;
        } catch (Throwable t) {
            future.setFailure(t);
            fireExceptionCaught(channel, t);
        } finally {
            if (!bossStarted && bound) {
                close(channel, future);
            }
        }
    }

看下Boss类,它实现了Runnable接口
private final Selector selector;
        private final NioServerSocketChannel channel;

        Boss(NioServerSocketChannel channel) throws IOException {
            this.channel = channel;

            selector = Selector.open();

            boolean registered = false;
            try {
                channel.socket.register(selector, SelectionKey.OP_ACCEPT);
                registered = true;
            } finally {
                if (!registered) {
                    closeSelector();
                }
            }

            channel.selector = selector;

代码是不是有点熟悉,没错,是nio里的代码,需要注意的是,ServerSocketChannel只注册OP_ACCEPT事件。

再看下Boss类的run方法
public void run() {
            final Thread currentThread = Thread.currentThread();

            channel.shutdownLock.lock();
            try {
                for (;;) {
                    try {
                        if (selector.select(1000) > 0) {
                            selector.selectedKeys().clear();
                        }

                        SocketChannel acceptedSocket = channel.socket.accept();
                        if (acceptedSocket != null) {
                            registerAcceptedChannel(acceptedSocket, currentThread);
                        }
                    } catch (SocketTimeoutException e) {
                        // Thrown every second to get ClosedChannelException
                        // raised.
                    } catch (CancelledKeyException e) {
                        // Raised by accept() when the server socket was closed.
                    } catch (ClosedSelectorException e) {
                        // Raised by accept() when the server socket was closed.
                    } catch (ClosedChannelException e) {
                        // Closed as requested.
                        break;
                    } catch (Throwable e) {
                        logger.warn(
                                "Failed to accept a connection.", e);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e1) {
                            // Ignore
                        }
                    }
                }
            } finally {
                channel.shutdownLock.unlock();
                closeSelector();
            }
        }

这里会调用registerAcceptedChannel(acceptedSocket, currentThread);方法
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
            try {
                ChannelPipeline pipeline =
                    channel.getConfig().getPipelineFactory().getPipeline();
                NioWorker worker = nextWorker(); //获取一个NioWorker
                //将Channel注册到NioWorker上去
                worker.register(new NioAcceptedSocketChannel(
                        channel.getFactory(), pipeline, channel,
                        NioServerSocketPipelineSink.this, acceptedSocket,
                        worker, currentThread), null);
            } catch (Exception e) {
                logger.warn(
                        "Failed to initialize an accepted socket.", e);
                try {
                    acceptedSocket.close();
                } catch (IOException e2) {
                    logger.warn(
                            "Failed to close a partially accepted socket.",
                            e2);
                }
            }
        }

当有新的连接建立,会交给NioWorker的线程池去处理,boss只负责accept到新的连接,新的SocketChannel会被注册到一个work线程中去。
   发表时间:2012-06-26  
谢谢了,最近正好正在用netty,不过我想的问的是为什么接收消息是叫UpStreamHandler,而发送消息是叫DownStreamHandler,不是和我们正常的理解相反么?
0 请登录后投票
   发表时间:2012-06-27  
finallygo 写道
谢谢了,最近正好正在用netty,不过我想的问的是为什么接收消息是叫UpStreamHandler,而发送消息是叫DownStreamHandler,不是和我们正常的理解相反么?

从iso网络七层模型来理解
0 请登录后投票
   发表时间:2012-06-28  
fengfeng925 写道
finallygo 写道
谢谢了,最近正好正在用netty,不过我想的问的是为什么接收消息是叫UpStreamHandler,而发送消息是叫DownStreamHandler,不是和我们正常的理解相反么?

从iso网络七层模型来理解

不用那么弦乎吧。字面上理解也行啊,down就是下发的意思啊,反过来接收就是up啦 ,嘿嘿
0 请登录后投票
   发表时间:2012-06-28  
ljl_ss 写道
fengfeng925 写道
finallygo 写道
谢谢了,最近正好正在用netty,不过我想的问的是为什么接收消息是叫UpStreamHandler,而发送消息是叫DownStreamHandler,不是和我们正常的理解相反么?

从iso网络七层模型来理解

不用那么弦乎吧。字面上理解也行啊,down就是下发的意思啊,反过来接收就是up啦 ,嘿嘿

我也觉得就是把思维角度换一下去理解。和我们自身的习惯有所不同而已.呵呵
0 请登录后投票
   发表时间:2012-07-02  
bootstrap.setOption("receiveBufferSize", 1048576 * 64); 
这么漂亮的代码为什么还要用魔数呢
0 请登录后投票
   发表时间:2013-01-06  

引用:if you want to learn something, you might read up on it and then write down some notes.

 

 

0 请登录后投票
   发表时间:2013-02-20  
yuanq_20 写道

 

引用:if you want to learn something, you might read up on it and then write down some notes.

 

 

 这个图很漂亮,我也写了netty的学习系列文章,希望大家批评指正:http://asialee.iteye.com/blog/1769508

0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics