`

Netty源码学习-ServerBootstrap启动及事件处理过程

阅读更多

Netty是采用了Reactor模式的多线程版本,建议先看下面这篇文章了解一下Reactor模式:
http://bylijinnan.iteye.com/blog/1992325

Netty的启动及事件处理的流程,基本上是按照上面这篇文章来走的
文章里面提到的操作,每一步都能在Netty里面找到对应的代码
其中Reactor里面的Acceptor就对应Netty的ServerBootstrap.boss;
而Reactor里面的Handler就对应Netty里面各ChannelHandler(在worker里面跑)

由于流程涉及到比较多的类和方法,我提取一下Netty的骨架:


ServerBootstrap.bind(localAddress)
|-->newServerSocketChannel & fireChannelOpen	(得到ServerSocketChannel[server])

-->
Binder.channelOpen
|-->Channels.bind(that is : sendDownstream of ChannelState.BOUND)

-->
In DefaultChannelPipeline, No downstreamHandler, jump to 
NioServerSocketPipelineSink.bind	(关键)
|-->1.do the REAL java.net.ServerSocket.bind	(server绑定端口)
	  2.start bossThread in bossExecutor
	  3.do "accept & dispatch" in a endless loop of bossThread(得到SocketChannel[client])
		 |--> registerAcceptedChannel, start worker in workerPool
			   |-->worker.run
					 |-->processSelectedKeys(selector.selectedKeys())
						   |--> read & fireMessageReceived	(开始调用各Handler)



下面就对照上面的“骨架”,把关键的代码拿出来读一下
其中关键的步骤,我用“===[关键步骤]===”的形式标记出来了

Netty的Server端是从ServerBootstrap.bind方法开始的:

public class ServerBootstrap extends Bootstrap {

	public Channel bind(final SocketAddress localAddress) {
			final BlockingQueue<ChannelFuture> futureQueue =
				new LinkedBlockingQueue<ChannelFuture>();

			ChannelHandler binder = new Binder(localAddress, futureQueue);

			ChannelPipeline bossPipeline = Channels.pipeline();
			bossPipeline.addLast("binder", binder);

			/*===OPEN===
			NioServerSocketChannelFactory.newChannel返回一个NioServerSocketChannel
			在NioServerSocketChannel的构造函数里,调用ServerSocketChannel.open()并触发channelOpen事件
			这个事件由上面的“binder”来处理并返回Future(非阻塞),详见Binder
			最后将Future放入futureQueue,以便在接下来的while循环里面取
			*/
			Channel channel = getFactory().newChannel(bossPipeline);
			
			// Wait until the future is available.
			ChannelFuture future = null;
			boolean interrupted = false;
			do {
				try {
					future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
				} catch (InterruptedException e) {
					interrupted = true;
				}
			} while (future == null);

			//处理中断的一种方式,详见《Java并发编程实践》
			if (interrupted) {
				Thread.currentThread().interrupt();
			}

			// Wait for the future.
			future.awaitUninterruptibly();

			return channel;
	}

	//主要是处理channelOpen事件
	private final class Binder extends SimpleChannelUpstreamHandler {

			private final SocketAddress localAddress;
			private final BlockingQueue<ChannelFuture> futureQueue;
			private final Map<String, Object> childOptions =
				new HashMap<String, Object>();

			Binder(SocketAddress localAddress, BlockingQueue<ChannelFuture> futureQueue) {
				this.localAddress = localAddress;
				this.futureQueue = futureQueue;
			}

			public void channelOpen(
					ChannelHandlerContext ctx,
					ChannelStateEvent evt) {

				try {
					//处理各种option,例如keep alive,nodelay等等,省略代码
				} finally {
					ctx.sendUpstream(evt);
				}

				/*
				重点在这里
				这里bind方法只是触发sendDownstream(ChannelState.BOUND)
				而此时pipeline里面还没有ChannelDownstreamHandler(只有一个handler:“binder”):
				public void sendDownstream(ChannelEvent e) {
					DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
					if (tail == null) {
						try {
							getSink().eventSunk(this, e);
							return;
						} 
					}
					sendDownstream(tail, e);
				}
				因此ChannelState.BOUND会去到pipeline里面的sink,在sink里面执行最终的java.net.ServerSocket.bind操作
				详见NioServerSocketPipelineSink.bind
				*/
				boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
				assert finished;
			}
			}
}


NioServerSocketPipelineSink:

class NioServerSocketPipelineSink extends AbstractNioChannelSink {
	
	 private void bind(
            NioServerSocketChannel channel, ChannelFuture future,
            SocketAddress localAddress) {
        try {
		
			//在这里执行真正的===BIND===
            channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
            bound = true;

            Executor bossExecutor =
                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
			
			//java.net.ServerSocket.bind完成,接下来可以accept了,详见Boss类的run方法
			//===BOSS start===,放入线程池里跑(bossExecutor)
            DeadLockProofWorker.start(bossExecutor,
                    new ThreadRenamingRunnable(new Boss(channel),
                            "New I/O server boss #" + id + " (" + channel + ')'));
            bossStarted = true;
        } 
		
    }

	private final class Boss implements Runnable {
        private final Selector selector;
        private final NioServerSocketChannel channel;

		/*
		===REGISTER[server]===
		注意到每新建一个Boss,就会新建一个selector
		*/
        Boss(NioServerSocketChannel channel) throws IOException {
            this.channel = channel;
            selector = Selector.open();
			channel.socket.register(selector, SelectionKey.OP_ACCEPT);
			registered = true;
            channel.selector = selector;
        }

		/*
		===ACCEPT&DISPATCH===
		boss不断地接受Client的连接并将连接成功的SocketChannel交由worker处理
		*/
        public void run() {
                        for (;;) {
                            SocketChannel acceptedSocket = channel.socket.accept();
                            if (acceptedSocket == null) {
                                break;
                            }
							
							//把acceptedSocket交由worker处理
                            registerAcceptedChannel(acceptedSocket, currentThread);
                        }
        }
		
		/*
		这里面的worker(implements Runnable)就相当于“Reactor Pattern”里面“Handler”
		handler需要两方面信息:selector和acceptedSocket,其中后者已经传递过来了,而selector则
		在worker.register里新创建一个
		*/
		private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
                ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline();
				
				//从WorkerPool里面取:workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]
				//可见worker是re-used的
                NioWorker worker = nextWorker();
				
				/*
				值得注意的是new NioAcceptedSocketChannel(...)包含了一个关键操作:
				将pipeline与channel关联起来,一对一;见AbstractChannel类:
					protected AbstractChannel(
							Channel parent, ChannelFactory factory,
							ChannelPipeline pipeline, ChannelSink sink) {

						this.parent = parent;
						this.factory = factory;
						this.pipeline = pipeline;

						id = allocateId(this);

						pipeline.attach(this, sink);
					}
				*/
                worker.register(new NioAcceptedSocketChannel(
                        channel.getFactory(), pipeline, channel,
                        NioServerSocketPipelineSink.this, acceptedSocket,
                        worker, currentThread), null);
        }
    }
}


worker.register,主要工作是创建registerTask(implements Runnable)并放入registerTaskQueue
对应的类是NioWorker 和AbstractNioWorker:
	
	void register(AbstractNioChannel<?> channel, ChannelFuture future) {

		//只是创建Runnable,未启动。在worker的run方法中,processRegisterTaskQueue时候才执行
        Runnable registerTask = createRegisterTask(channel, future);
		
		//在start()里面启动worker线程
        Selector selector = start();
        boolean offered = registerTaskQueue.offer(registerTask);
        assert offered;

        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }
	
	 private Selector start() {
        synchronized (startStopLock) {
            if (!started) {
                 selector = Selector.open();
				
				 //===WORKER start===
                DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O  worker #" + id));
            }
        }
        return selector;
    }

	private final class RegisterTask implements Runnable {
        private final NioSocketChannel channel;
        private final ChannelFuture future;
        private final boolean server;
        public void run() {
            try {
                synchronized (channel.interestOpsLock) {
				
					//===REGISTER[client]===	初始的state(getRawInterestOps)是OP_READ
                    channel.channel.register(selector, channel.getRawInterestOps(), channel);
                }
                fireChannelConnected(channel, remoteAddress);
			}
        }
    }
	


worker线程的run操作:


 public void run() {
        for (;;) {
				
				//===SELECT===
                SelectorUtil.select(selector);

                processRegisterTaskQueue();
                processEventQueue();
                processWriteTaskQueue();
				
				//在这里面,就会遍历selectedKey并调用相关联的handler
                processSelectedKeys(selector.selectedKeys());
        }
}	

private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
	for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
		SelectionKey k = i.next();
		i.remove();
		int readyOps = k.readyOps();
		
		//下面的“与”操作等价于k.isReadable
		if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
			//执行读操作
			if (!read(k)) {
				continue;
			}
		}
		//执行写操作
		if ((readyOps & SelectionKey.OP_WRITE) != 0) {
			writeFromSelectorLoop(k);
		}
	}
}

	/*
	主要是两个操作:
	1.从channel里面读取数据
	2.读取完成后,fireMessageReceived,从channel(k.attachment)
	 可以得到与它关联的pipeline,从而触发pipeline里面的handler
	*/
	protected boolean read(SelectionKey k) {
	
		/*
		变量“ch”的类型是java.nio.channels.SocketChannel,是“the channel for which this key was created”
		变量“channel”的类型是org.jboss.netty.channel.socket.nio.NioSocketChannel,是“the attachment for which this key was created”
		因此,“ch”的作用就是读数据,而“channel”的作用则是取得pipeline后开始处理数据
		
		但,“channel”似乎是“包含”了“ch”?
		我们知道,org.jboss.netty.channel.socket.nio.NioSocketChannel是对java.nio.channels.SocketChannel的封装,
		正如org.jboss.netty.channel.socket.nio.NioServerSocketChannel是对java.nio.channels.ServerSocketChannel的封装
		而查看RegisterTask的run方法,register并返回SelectionKey:
		channel.channel.register(
                            selector, channel.getRawInterestOps(), channel);
		变量的命名让人糊涂,翻译一下:
		acceptedNioSocketChannel.channel.register(selector, ops, acceptedNioSocketChannel)
		注意到acceptedNioSocketChannel.channel的真实类型,其实就是java.nio.channels.SocketChannel,
		它就是下面代码里的“acceptedSocket”:
		worker.register(new NioAcceptedSocketChannel(
                        channel.getFactory(), pipeline, channel,
                        NioServerSocketPipelineSink.this, acceptedSocket,
                        worker, currentThread), null);
		因此,是不是可以认为“channel”与“ch”的关系是:
		ch = channel.channel
		*/
		
		
		final SocketChannel ch = (SocketChannel) k.channel();
		final NioSocketChannel channel = (NioSocketChannel) k.attachment();

		//会根据这一次接收到的数据的大小,来预测下一次接收数据的大小
		//并以此为依据来决定ByteBuffer的大小
		final ReceiveBufferSizePredictor predictor =
			channel.getConfig().getReceiveBufferSizePredictor();
		final int predictedRecvBufSize = predictor.nextReceiveBufferSize();

		int ret = 0;
		int readBytes = 0;
		boolean failure = true;

		ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
		try {
			while ((ret = ch.read(bb)) > 0) {
				readBytes += ret;
				if (!bb.hasRemaining()) {
					break;
				}
			}
			failure = false;
		} 

		if (readBytes > 0) {
			bb.flip();

			final ChannelBufferFactory bufferFactory =
				channel.getConfig().getBufferFactory();
			final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
			buffer.setBytes(0, bb);
			buffer.writerIndex(readBytes);

			recvBufferPool.release(bb);

			// Update the predictor.
			predictor.previousReceiveBufferSize(readBytes);

			// Fire the event.
			fireMessageReceived(channel, buffer);
		} 
		return true;
	}
 



0
0
分享到:
评论

相关推荐

    netty_demo-master.zip

    《基于Springboot的Netty TCP Server实践与整合》 在当今的分布式系统中,网络通信是不可或缺的一部分。...这是一个很好的学习和实践案例,对于深入理解Springboot、Netty以及RESTful API设计都有很大的帮助。

    netty源码分析之服务端启动全解析

    4. 灵活性:Netty支持对多种协议的处理,且通过拦截器模式,允许开发者对消息处理过程进行拦截,定制自己的处理逻辑。 5. 完整的文档和活跃的社区:Netty具有丰富的文档资料和一个活跃的社区,有助于用户快速学习和...

    netty源码深入分析

    《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...

    netty源码和相关中文文档

    通过学习 Netty 的源码,开发者可以更好地理解和优化其在网络通信中的性能表现,比如减少内存分配、提高并行度以及优化编码解码过程等。同时,配合中文文档,可以降低学习门槛,让开发者更快地掌握 Netty 的使用技巧...

    netty源码解析视频

    ### Netty源码解析知识点概览 #### 一、Netty简介与应用场景 - **Netty**是一款由JBOSS提供的高性能的...以上是对“netty源码解析视频”教程涉及的主要知识点进行的详细梳理和解释,希望对你学习Netty源码有所帮助。

    netty源码 4.*版本

    Netty 是一个高性能、异步事件驱动的...总之,Netty 源码是一个很好的学习平台,可以深入了解网络编程、并发处理以及高效 I/O 设计。通过分析源码,不仅可以提升自己的技术能力,还能为解决实际问题提供灵感和参考。

    Netty3.x 源码解析

    Netty源码阅读的目的通常有两个:一是因为工作中使用到了Netty,希望通过阅读源码来更加深入地了解它;二是出于对Java网络编程的兴趣,希望通过学习Netty来探索如何构建高性能网络应用。同时,Netty的代码结构组织...

    netty:李林峰-netty权威指南-原始码-可运行代码(专有协议,我调试了很久)

    源码分析可以帮助我们深入理解Netty的内部机制,比如ByteBuf如何管理内存,ChannelHandlerContext如何传递事件,以及Pipeline(管道)如何处理消息等。你可以看到EventLoopGroup是如何管理EventLoop的,以及...

    Netty框架网络编程实战-Netty_chat.zip

    - 分析项目源码,了解ServerBootstrap和Bootstrap的配置过程。 - 理解ChannelHandler和ChannelPipeline的事件处理机制。 - 学习如何编写自定义的Encoder和Decoder。 - 实践客户端和服务器的交互,调试运行聊天...

    netty源码深入剖析.txt

    《Netty源码深入剖析》一书旨在帮助读者深入了解Netty框架的工作原理和技术细节,从基础知识入手,逐步过渡到高级优化技巧,使开发者能够更好地掌握并应用Netty于实际项目中。 ### 一、Netty简介与核心特性 Netty...

    nettychat实时聊天室源码websocket

    通过分析NettyChat源码,我们可以学习到如何利用Netty和WebSocket实现高效的实时聊天功能,并了解如何组织和设计网络应用程序。这个项目是深入理解这两种技术及其相互配合的好实例,对于提升Java网络编程能力具有很...

    Netty-入门Netty编码

    在 Netty 中,我们首先定义一个 ServerBootstrap,它是服务器启动配置的入口。然后,我们设置 ChildHandler(通常是 ChannelInitializer),在这个初始化器中,我们可以添加 ChannelHandlers 到 ChannelPipeline 中...

    javanetty源码-java_study-:netty+java(源代码参考)

    Java Nety是一个高性能、异步事件...对于Java开发者来说,深入学习Netty源码不仅可以提升网络编程技能,也有助于解决实际问题,比如优化性能、处理异常、实现自定义协议等。因此,Netty源码是Java学习者宝贵的资源。

    最全Netty教程源码资料.rar

    Netty的入门通常包括了解其基本组件,如Bootstrap(启动引导类)、ServerBootstrap(服务器启动引导类)、Channel(通道)、EventLoop(事件循环)、Handler(处理器)等。学习者会学习如何创建简单的服务端和客户端...

    netty框架的学习和实战聊天室源码.zip

    - 服务器端的启动类通常会初始化一个 ServerBootstrap,配置好 EventLoopGroup(工作线程组),并绑定监听端口,然后添加 ChannelHandler 处理接收到的连接和数据。 - 客户端则会创建一个 Bootstrap,配置好连接到...

    netty学习之ServerChannel

    3. **ChannelHandler**:Netty中的`ChannelHandler`接口是处理I/O事件的核心组件。ServerChannel会将接收到的客户端连接转换为新的`Channel`,每个新建立的连接都有自己的事件处理管道。你可以通过`ChannelPipeline`...

    Netty权威指南(第2版)-书籍及源码

    《Netty权威指南(第2版)》是一本深度探讨Netty框架的书籍,它为读者提供了全面且深入的理解和实践指导,旨在帮助开发者更好地...配合源码学习,能够更直观地理解Netty的工作机制,从而更好地运用到实际的项目开发中。

    Netty4.0学习笔记系列之三:构建简单的http服务

    - 在Netty中,我们首先需要创建一个`ServerBootstrap`实例,它是服务器的启动类。 - 接着,配置`EventLoopGroup`,这是Netty的I/O线程模型,通常包括BossGroup(负责接收连接)和WorkerGroup(处理I/O事件)。 - ...

Global site tag (gtag.js) - Google Analytics