`
iwinit
  • 浏览: 454819 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

深入浅出Netty之三 Server请求处理

阅读更多

 

Server bind之后,就可以对外提供服务了。Netty使用了reactor模式来提升服务的并发处理能力。boss线程负责监听新的连接请求,当有新的连接进来时,将对应的channel指派一个worker线程来处理。Worker线程负责对该Channel的读写操作。

一.Boss线程

 

1.阻塞Select

 	for (;;) {
                    try {
                        // Boss线程专门负责监听新入连接,所以阻塞select
                        selector.select();
                        // 如果有新连接,先把key清掉
                        selector.selectedKeys().clear();

                        // 循环请求队列,处理连接
                        for (;;) {
                            SocketChannel acceptedSocket = channel.socket.accept();
                            if (acceptedSocket == null) {
                                break;
                            }
                            registerAcceptedChannel(acceptedSocket, currentThread);

                        }
		......
	}
2.注册新连接
    private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
            ......
		//根据用户自定义的的PipelineFactory创建pipeline
                ChannelPipeline pipeline =
                    channel.getConfig().getPipelineFactory().getPipeline();
		//hash分配worker线程,默认使用递增循环worker数组方式
                NioWorker worker = nextWorker();
		//将新的连接注册到worker线程,让worker线程负责后续读写
		//新的channel是主channel的子channel,而PipelineSink和主channel是同一个
                worker.register(new NioAcceptedSocketChannel(
                        channel.getFactory(), pipeline, channel,
                        NioServerSocketPipelineSink.this, acceptedSocket,
                        worker, currentThread), null);
            ......
        }
void register(AbstractNioChannel<?> channel, ChannelFuture future) {

        synchronized (startStopLock) {
            ......
	    //创建注册通道的任务
            Runnable registerTask = createRegisterTask(channel, future);
	    //提交任务到阻塞队列
            boolean offered = registerTaskQueue.offer(registerTask);
	    //唤醒selector
            if (wakenUp.compareAndSet(false, true)) {
                selector.wakeup();
            }

        }
    }

3.创建注册任务

 

 protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
        boolean server = !(channel instanceof NioClientSocketChannel);
        return new RegisterTask((NioSocketChannel) channel, future, server);
    }

 二.worker线程

worker线程负责对应channel的读写操作,一个worker对应一个selector,会同时处理多个channel的读写。

1.主循环

for (;;) {
            wakenUp.set(false);
		......               
                if (wakenUp.get()) {
                    wakenupFromLoop = true;
                    selector.wakeup();
                } else {
                    wakenupFromLoop = false;
                }

                cancelledKeys = 0;
		//处理注册通道的任务
                processRegisterTaskQueue();
		//处理异步事件,比如writeComplete事件
                processEventQueue();
		//处理写数据任务,如果业务线程有异步写的时候,会有WriteTask放入队列
                processWriteTaskQueue();
		//处理IO准备好的那些channel
                processSelectedKeys(selector.selectedKeys());
		......
        }

2.RegisterTask执行

 

public void run() {
		......
           	//如果是server,则使用异步模式
                if (server) {
                    channel.channel.configureBlocking(false);
                }
		//将新的channel注册到worker线程的selector上,默认监听READ事件
                synchronized (channel.interestOpsLock) {
                    channel.channel.register(
                            selector, channel.getRawInterestOps(), channel);
                }
                ......
		//触发BOUND的upstream事件,该事件将在用户自定义的pipeline中运行,在这里EchoServerHandler默认不处理该事件
                if (server || !((NioClientSocketChannel) channel).boundManually) {
                    fireChannelBound(channel, localAddress);
                }
		//触发CONNECTED的upsteam事件,该事件将在用户自定义的pipeline中运行,在这里EchoServerHandler默认不处理该事件
                fireChannelConnected(channel, remoteAddress);
		......
        }

 3.处理读写准备好的那些channel

 

for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey k = i.next();
            i.remove();
            try {
                int readyOps = k.readyOps();
		//如果某个channel写就位,则读数据
                if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                    if (!read(k)) {
                        // Connection already closed - no need to handle write.
                        continue;
                    }
                }
		//如果写就位,则写数据
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    writeFromSelectorLoop(k);
                }
            } catch (CancelledKeyException e) {
                close(k);
            }
		......
        }

4. 读取

 

 //从channel中读取数据到内部的buffer,转换成内部的ChannelBuffer,触发messageReceived事件
    protected boolean read(SelectionKey k) {
        final SocketChannel ch = (SocketChannel) k.channel();
        final NioSocketChannel channel = (NioSocketChannel) k.attachment();
	//预测下次读将读取的buffer大小,默认使用自适应的预测算法,如果上次读取把buffer读满,则增大该值,如果连续2次都没读满,则减小该值
	//如果以上都不满足,则保持不变,默认长度1024
        final ReceiveBufferSizePredictor predictor =
            channel.getConfig().getReceiveBufferSizePredictor();
        final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
	//默认BufferFactory为HeapChannelBufferFactory,默认使用Big Endian字节序
        final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();

        int ret = 0;
        int readBytes = 0;
        boolean failure = true;
	//从共享pool中拿配额,从channel中读取对应数据
        ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
        try {
            while ((ret = ch.read(bb)) > 0) {
                readBytes += ret;
                if (!bb.hasRemaining()) {
                    break;
                }
            }
            failure = false;
        } 
	......
	//有数据读入,则转换成自己的ChannelBuffer,并触发messageReceived事件,该事件将在用户自定义的Pipeline中执行
        if (readBytes > 0) {
            bb.flip();
	    //构造一个ChannelBuffer,默认使用堆内的数组实现
            final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
	    //复制数据到channelBuffer
            buffer.setBytes(0, bb);
	    //写游标
            buffer.writerIndex(readBytes);

            // 修改预测器的下次读取buffer大小
            predictor.previousReceiveBufferSize(readBytes);

            // 触发messageReceived事件
            fireMessageReceived(channel, buffer);
        }
	......
    }

  5.EchoServerHandler接受消息

 

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
		//通过channel将数据写回
		e.getChannel().write(e.getMessage());
	}

 6.数据写回,write方法其实是触发一个Downsteam事件

 

public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
        ChannelFuture future = future(channel);
        channel.getPipeline().sendDownstream(
                new DownstreamMessageEvent(channel, future, message, remoteAddress));
        return future;
    }

 7.ChannelPipeline中的处理

 

public void sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
	//如果handler已经处理完了,则转发到ChannelSink处理,对于nioserver来说就是NioServerSocketPipelineSink
        if (tail == null) {
            try {
                getSink().eventSunk(this, e);
                return;
            } catch (Throwable t) {
                notifyHandlerException(e, t);
                return;
            }
        }
	//否则,继续调用其他handler
        sendDownstream(tail, e);
    }

 8.NioServerSocketPipelineSink中处理channel事件

 

else if (e instanceof MessageEvent) {
            MessageEvent event = (MessageEvent) e;
            NioSocketChannel channel = (NioSocketChannel) event.getChannel();
	    //先放入写任务队列
            boolean offered = channel.writeBufferQueue.offer(event);
            assert offered;
	    //最后还是要通过work来写回数据
            channel.worker.writeFromUserCode(channel);
        }

 9.worker线程的处理

 

void writeFromUserCode(final AbstractNioChannel<?> channel) {
        ......
	//如果业务方使用了业务线程异步写,则直接往worker线程的写队列扔一个WriteTask任务
        if (scheduleWriteIfNecessary(channel)) {
            return;
        }
	......
	//如果业务方没有使用业务线程异步写,说明现在还在netty的Worker线程中,直接写
        write0(channel);
    }

 10.Worker线程直接写

 

protected void write0(AbstractNioChannel<?> channel) {
        boolean open = true;
        boolean addOpWrite = false;
        boolean removeOpWrite = false;

	    //循环写入,如果都写成功了,则将去掉该channel在selector中注册的WRITE事件监听
            for (;;) {

                MessageEvent evt = channel.currentWriteEvent;
                SendBuffer buf = null;
                ChannelFuture future = null;
                try {
                    if (evt == null) {
			//从队列中拿需要写回的数据内容,如果没有了,则认为写成功了
                        if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                            removeOpWrite = true;
                            channel.writeSuspended = false;
                            break;
                        }
                        future = evt.getFuture();
			//将ChannelBuffer转换成ByteBuffer,此处使用PooledSendBuffer	
                        channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                    } else {
                        future = evt.getFuture();
                        buf = channel.currentWriteBuffer;

                    }

                    long localWrittenBytes = 0;
                    for (int i = writeSpinCount; i > 0; i --) {
			//将Buffer里的数据写出,因为是异步channel,如果socket的write队列满,会导致写处返回0,则重试
                        localWrittenBytes = buf.transferTo(ch);
			//有数据写出就返回,不管是否全部写出
                        if (localWrittenBytes != 0) {
                            writtenBytes += localWrittenBytes;
                            break;
                        }
                        if (buf.finished()) {
                            break;
                        }
                    }
		    //如果全部写出,则通知调用方
                    if (buf.finished()) {
                        // Successful write - proceed to the next message.
                        buf.release();
                        channel.currentWriteEvent = null;
                        channel.currentWriteBuffer = null;
                        evt = null;
                        buf = null;
                        future.setSuccess();
                    } 
		    //如果还没写完,则需要让selector也关心这个channel的write事件,让write就位时,继续写
		    else {
                        // Not written fully - perhaps the kernel buffer is full.
                        addOpWrite = true;
                        channel.writeSuspended = true;
			......                       
                    }
                } 
		......
            }
            channel.inWriteNowLoop = false;

		//让selector监听write事件            
                if (addOpWrite) {
                    setOpWrite(channel);
                } 
		//写成功后,把write监听去掉
		else if (removeOpWrite) {
                    clearOpWrite(channel);
                }
        }
	//如果worker线程直接写,直接触发writeComplete upstream事件,让handler处理
        if (iothread) {
            fireWriteComplete(channel, writtenBytes);
        } 
	//如果是业务线程异步写,将通过worker线程的eventQueue实现异步延时触发writeComplete事件
	else {
            fireWriteCompleteLater(channel, writtenBytes);
        }
    }

11.Worker线程异步写,当业务方使用多线程处理时,写回的动作对worker来说是异步的

12.业务线程放入写任务队列

 

 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
                boolean offered = writeTaskQueue.offer(channel.writeTask);
                assert offered;
            }

 13.worker线程执行写任务

 

 private void processWriteTaskQueue() throws IOException {
        for (;;) {
            final Runnable task = writeTaskQueue.poll();
            if (task == null) {
                break;
            }

            task.run();
            cleanUpCancelledKeys();
        }
    }

 14.WriteTask执行

 

private final class WriteTask implements Runnable {

        WriteTask() {
        }

        public void run() {
            writeTaskInTaskQueue.set(false);
            worker.writeFromTaskLoop(AbstractNioChannel.this);
        }
    }

 15.worker线程执行数据写入

 

    void writeFromSelectorLoop(final SelectionKey k) {
        AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
        ch.writeSuspended = false;
        write0(ch);
    }
 

 

分享到:
评论

相关推荐

    深入浅出Netty ppt

    《深入浅出Netty》是针对Java网络编程框架Netty的一份详细讲解材料,由淘宝工程师精心制作,特别适合初学者。Netty是一款高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。...

    深入浅出Netty

    资源名称:深入浅出Netty资源截图: 资源太大,传百度网盘了,链接在附件中,有需要的同学自取。

    深入浅出Netty_netty_

    《深入浅出Netty》是一本专注于讲解Netty框架的编程指南,非常适合初学者入门。Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这本书通过详实的代码案例,...

    深入浅出Netty_netty5.0_

    《深入浅出Netty_netty5.0_》是针对Netty 5.0版本的一本详细教程,旨在帮助读者理解并熟练运用这一强大的网络编程框架。Netty是一个开源的Java框架,它为开发高效、稳定、可扩展的网络应用提供了全面的支持。在本文...

    架构师特刊:深入浅出Netty.pdf

    最近几年,Netty社区的发展如火如荼,无论是大数据领域,还是微服务架构,底层都需要一个高效的分布式通信框架作为基础组件。 Netty凭借优异的性能、灵活的可扩展新得到了广泛的应用。短短几年间,Netty已经成为...

    深入浅出Netty文档.zip

    这个“深入浅出Netty文档”压缩包包含了一份详细的PDF文档,旨在帮助开发者深入理解Netty的工作原理及其在实际应用中的使用。 Netty 的核心是它的事件驱动模型,基于Java NIO(非阻塞I/O)实现。它提供了一种优雅的...

    深入浅出netty-3.rar

    《深入浅出Netty》是针对Java网络编程框架Netty的一份详细讲解文档,由阿里巴巴的专家编写。Netty是一个高性能、异步事件驱动的网络应用程序框架,它为开发可维护的高性能协议服务器和客户端提供了丰富的组件和API。...

    netty in action-深入浅出netty

    Netty通过利用Java的NIO类库,提供了高效的网络处理能力,并且具有很高的可定制性和扩展性,它支持快速开发可维护的高性能协议服务器和客户端。 Netty的设计目标是让网络应用开发变得更加简单,它抽象了网络编程中...

    netty实现websocket server

    在本文中,我们将深入探讨如何使用 Netty 实现 WebSocket 服务器,以及与 Java 客户端进行交互的过程。 WebSocket 协议是一种在浏览器和服务器之间建立长连接的协议,它允许双向通信,即服务器可以主动向客户端推送...

    架构师-深入浅出Netty 书签版

    李林锋,2007 年毕业...软件的设计和开发工作,有7 年NIO 设计和开发经验,精通Netty、Mina 等 NIO 框架和平台中间件,现任华为软件平台开放实验室架构师,《Netty 权威 指南》、《分布式服务框架原理与实践》作者。

    Netty 高并发深入浅出学习高并发服务器

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,专为 Java 平台...《深入浅出Netty.pdf》这本书很可能会详细讲解这些知识点,包括理论基础、实践示例以及高级特性的使用,对于理解和掌握 Netty 有着极大的帮助。

    rtmpServer-master_nettyrtmp_rtmp推流_netty开发rtmp_rtmpServer-master

    本项目“rtmpServer-master_nettyrtmp_rtmp推流_netty开发rtmp_rtmpServer-master”是针对RTMP协议开发的一个服务器端实现,它基于强大的Java网络库Netty。以下将详细介绍该服务器的开发背景、功能、核心技术和应用...

Global site tag (gtag.js) - Google Analytics