- 浏览: 788481 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
萨琳娜啊:
Java读源码之Netty深入剖析网盘地址:https://p ...
Netty源码学习-FileRegion -
飞天奔月:
写得有趣 ^_^
那一年你定义了一个接口 -
GoldRoger:
第二个方法很好
java-判断一个自然数是否是某个数的平方。当然不能使用开方运算 -
bylijinnan:
<script>alert("close ...
自己动手实现Java Validation -
paul920531:
39行有个bug:"int j=new Random ...
java-蓄水池抽样-要求从N个元素中随机的抽取k个元素,其中N无法确定
Netty是采用了Reactor模式的多线程版本,建议先看下面这篇文章了解一下Reactor模式:
http://bylijinnan.iteye.com/blog/1992325
Netty的启动及事件处理的流程,基本上是按照上面这篇文章来走的
文章里面提到的操作,每一步都能在Netty里面找到对应的代码
其中Reactor里面的Acceptor就对应Netty的ServerBootstrap.boss;
而Reactor里面的Handler就对应Netty里面各ChannelHandler(在worker里面跑)
由于流程涉及到比较多的类和方法,我提取一下Netty的骨架:
ServerBootstrap.bind(localAddress) |-->newServerSocketChannel & fireChannelOpen (得到ServerSocketChannel[server]) --> Binder.channelOpen |-->Channels.bind(that is : sendDownstream of ChannelState.BOUND) --> In DefaultChannelPipeline, No downstreamHandler, jump to NioServerSocketPipelineSink.bind (关键) |-->1.do the REAL java.net.ServerSocket.bind (server绑定端口) 2.start bossThread in bossExecutor 3.do "accept & dispatch" in a endless loop of bossThread(得到SocketChannel[client]) |--> registerAcceptedChannel, start worker in workerPool |-->worker.run |-->processSelectedKeys(selector.selectedKeys()) |--> read & fireMessageReceived (开始调用各Handler)
下面就对照上面的“骨架”,把关键的代码拿出来读一下
其中关键的步骤,我用“===[关键步骤]===”的形式标记出来了
Netty的Server端是从ServerBootstrap.bind方法开始的:
public class ServerBootstrap extends Bootstrap { public Channel bind(final SocketAddress localAddress) { final BlockingQueue<ChannelFuture> futureQueue = new LinkedBlockingQueue<ChannelFuture>(); ChannelHandler binder = new Binder(localAddress, futureQueue); ChannelPipeline bossPipeline = Channels.pipeline(); bossPipeline.addLast("binder", binder); /*===OPEN=== NioServerSocketChannelFactory.newChannel返回一个NioServerSocketChannel 在NioServerSocketChannel的构造函数里,调用ServerSocketChannel.open()并触发channelOpen事件 这个事件由上面的“binder”来处理并返回Future(非阻塞),详见Binder 最后将Future放入futureQueue,以便在接下来的while循环里面取 */ Channel channel = getFactory().newChannel(bossPipeline); // Wait until the future is available. ChannelFuture future = null; boolean interrupted = false; do { try { future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS); } catch (InterruptedException e) { interrupted = true; } } while (future == null); //处理中断的一种方式,详见《Java并发编程实践》 if (interrupted) { Thread.currentThread().interrupt(); } // Wait for the future. future.awaitUninterruptibly(); return channel; } //主要是处理channelOpen事件 private final class Binder extends SimpleChannelUpstreamHandler { private final SocketAddress localAddress; private final BlockingQueue<ChannelFuture> futureQueue; private final Map<String, Object> childOptions = new HashMap<String, Object>(); Binder(SocketAddress localAddress, BlockingQueue<ChannelFuture> futureQueue) { this.localAddress = localAddress; this.futureQueue = futureQueue; } public void channelOpen( ChannelHandlerContext ctx, ChannelStateEvent evt) { try { //处理各种option,例如keep alive,nodelay等等,省略代码 } finally { ctx.sendUpstream(evt); } /* 重点在这里 这里bind方法只是触发sendDownstream(ChannelState.BOUND) 而此时pipeline里面还没有ChannelDownstreamHandler(只有一个handler:“binder”): public void sendDownstream(ChannelEvent e) { DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail); if (tail == null) { try { getSink().eventSunk(this, e); return; } } sendDownstream(tail, e); } 因此ChannelState.BOUND会去到pipeline里面的sink,在sink里面执行最终的java.net.ServerSocket.bind操作 详见NioServerSocketPipelineSink.bind */ boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); assert finished; } } }
NioServerSocketPipelineSink:
class NioServerSocketPipelineSink extends AbstractNioChannelSink { private void bind( NioServerSocketChannel channel, ChannelFuture future, SocketAddress localAddress) { try { //在这里执行真正的===BIND=== channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); bound = true; Executor bossExecutor = ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; //java.net.ServerSocket.bind完成,接下来可以accept了,详见Boss类的run方法 //===BOSS start===,放入线程池里跑(bossExecutor) DeadLockProofWorker.start(bossExecutor, new ThreadRenamingRunnable(new Boss(channel), "New I/O server boss #" + id + " (" + channel + ')')); bossStarted = true; } } private final class Boss implements Runnable { private final Selector selector; private final NioServerSocketChannel channel; /* ===REGISTER[server]=== 注意到每新建一个Boss,就会新建一个selector */ Boss(NioServerSocketChannel channel) throws IOException { this.channel = channel; selector = Selector.open(); channel.socket.register(selector, SelectionKey.OP_ACCEPT); registered = true; channel.selector = selector; } /* ===ACCEPT&DISPATCH=== boss不断地接受Client的连接并将连接成功的SocketChannel交由worker处理 */ public void run() { for (;;) { SocketChannel acceptedSocket = channel.socket.accept(); if (acceptedSocket == null) { break; } //把acceptedSocket交由worker处理 registerAcceptedChannel(acceptedSocket, currentThread); } } /* 这里面的worker(implements Runnable)就相当于“Reactor Pattern”里面“Handler” handler需要两方面信息:selector和acceptedSocket,其中后者已经传递过来了,而selector则 在worker.register里新创建一个 */ private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) { ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline(); //从WorkerPool里面取:workers[Math.abs(workerIndex.getAndIncrement() % workers.length)] //可见worker是re-used的 NioWorker worker = nextWorker(); /* 值得注意的是new NioAcceptedSocketChannel(...)包含了一个关键操作: 将pipeline与channel关联起来,一对一;见AbstractChannel类: protected AbstractChannel( Channel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink) { this.parent = parent; this.factory = factory; this.pipeline = pipeline; id = allocateId(this); pipeline.attach(this, sink); } */ worker.register(new NioAcceptedSocketChannel( channel.getFactory(), pipeline, channel, NioServerSocketPipelineSink.this, acceptedSocket, worker, currentThread), null); } } }
worker.register,主要工作是创建registerTask(implements Runnable)并放入registerTaskQueue
对应的类是NioWorker 和AbstractNioWorker:
void register(AbstractNioChannel<?> channel, ChannelFuture future) { //只是创建Runnable,未启动。在worker的run方法中,processRegisterTaskQueue时候才执行 Runnable registerTask = createRegisterTask(channel, future); //在start()里面启动worker线程 Selector selector = start(); boolean offered = registerTaskQueue.offer(registerTask); assert offered; if (wakenUp.compareAndSet(false, true)) { selector.wakeup(); } } private Selector start() { synchronized (startStopLock) { if (!started) { selector = Selector.open(); //===WORKER start=== DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id)); } } return selector; } private final class RegisterTask implements Runnable { private final NioSocketChannel channel; private final ChannelFuture future; private final boolean server; public void run() { try { synchronized (channel.interestOpsLock) { //===REGISTER[client]=== 初始的state(getRawInterestOps)是OP_READ channel.channel.register(selector, channel.getRawInterestOps(), channel); } fireChannelConnected(channel, remoteAddress); } } }
worker线程的run操作:
public void run() { for (;;) { //===SELECT=== SelectorUtil.select(selector); processRegisterTaskQueue(); processEventQueue(); processWriteTaskQueue(); //在这里面,就会遍历selectedKey并调用相关联的handler processSelectedKeys(selector.selectedKeys()); } } private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException { for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); i.remove(); int readyOps = k.readyOps(); //下面的“与”操作等价于k.isReadable if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { //执行读操作 if (!read(k)) { continue; } } //执行写操作 if ((readyOps & SelectionKey.OP_WRITE) != 0) { writeFromSelectorLoop(k); } } } /* 主要是两个操作: 1.从channel里面读取数据 2.读取完成后,fireMessageReceived,从channel(k.attachment) 可以得到与它关联的pipeline,从而触发pipeline里面的handler */ protected boolean read(SelectionKey k) { /* 变量“ch”的类型是java.nio.channels.SocketChannel,是“the channel for which this key was created” 变量“channel”的类型是org.jboss.netty.channel.socket.nio.NioSocketChannel,是“the attachment for which this key was created” 因此,“ch”的作用就是读数据,而“channel”的作用则是取得pipeline后开始处理数据 但,“channel”似乎是“包含”了“ch”? 我们知道,org.jboss.netty.channel.socket.nio.NioSocketChannel是对java.nio.channels.SocketChannel的封装, 正如org.jboss.netty.channel.socket.nio.NioServerSocketChannel是对java.nio.channels.ServerSocketChannel的封装 而查看RegisterTask的run方法,register并返回SelectionKey: channel.channel.register( selector, channel.getRawInterestOps(), channel); 变量的命名让人糊涂,翻译一下: acceptedNioSocketChannel.channel.register(selector, ops, acceptedNioSocketChannel) 注意到acceptedNioSocketChannel.channel的真实类型,其实就是java.nio.channels.SocketChannel, 它就是下面代码里的“acceptedSocket”: worker.register(new NioAcceptedSocketChannel( channel.getFactory(), pipeline, channel, NioServerSocketPipelineSink.this, acceptedSocket, worker, currentThread), null); 因此,是不是可以认为“channel”与“ch”的关系是: ch = channel.channel */ final SocketChannel ch = (SocketChannel) k.channel(); final NioSocketChannel channel = (NioSocketChannel) k.attachment(); //会根据这一次接收到的数据的大小,来预测下一次接收数据的大小 //并以此为依据来决定ByteBuffer的大小 final ReceiveBufferSizePredictor predictor = channel.getConfig().getReceiveBufferSizePredictor(); final int predictedRecvBufSize = predictor.nextReceiveBufferSize(); int ret = 0; int readBytes = 0; boolean failure = true; ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); try { while ((ret = ch.read(bb)) > 0) { readBytes += ret; if (!bb.hasRemaining()) { break; } } failure = false; } if (readBytes > 0) { bb.flip(); final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory(); final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes); buffer.setBytes(0, bb); buffer.writerIndex(readBytes); recvBufferPool.release(bb); // Update the predictor. predictor.previousReceiveBufferSize(readBytes); // Fire the event. fireMessageReceived(channel, buffer); } return true; }
发表评论
-
TCP的TIME-WAIT
2014-04-23 16:35 1205原文连接:http://vincent.bernat.im/e ... -
《TCPIP详解卷1》学习-拥塞避免
2014-01-15 15:16 159拥塞避免算法、 ... -
Netty源码学习-HTTP-tunnel
2014-01-14 18:19 4309Netty关于HTTP tunnel的说明: http://d ... -
Netty源码学习-FileRegion
2013-12-31 17:17 5676今天看org.jboss.netty.example.http ... -
Netty源码学习-HttpChunkAggregator-HttpRequestEncoder-HttpResponseDecoder
2013-12-27 16:10 4099今天看Netty如何实现一个Http Server org.j ... -
Netty源码学习-ReadTimeoutHandler
2013-12-26 17:53 3852ReadTimeoutHandler的实现思 ... -
Netty学习笔记
2013-12-25 18:39 1492本文是阅读以下两篇文章时: http://seeallhear ... -
Netty源码学习-ChannelHandler
2013-12-25 18:12 1639一般来说,“有状态”的ChannelHandler不应 ... -
Netty源码学习-Java-NIO-Reactor
2013-12-19 18:21 4901Netty里面采用了NIO-based Reactor Pat ... -
Netty源码学习-ReplayingDecoder
2013-12-13 20:21 4272ReplayingDecoder是FrameDecoder的子 ... -
Netty源码学习-DefaultChannelPipeline2
2013-12-11 15:47 1292Netty3的API http://docs.jboss.or ... -
Netty源码学习-CompositeChannelBuffer
2013-12-06 15:54 2768CompositeChannelBuffer体现了Netty的 ... -
Netty源码学习-DelimiterBasedFrameDecoder
2013-12-05 18:36 9585看DelimiterBasedFrameDecoder的AP ... -
Netty源码学习-ObjectEncoder和ObjectDecoder
2013-12-05 16:06 5009Netty中传递对象的思路很直观: Netty中数据的传递是基 ... -
Netty源码学习-LengthFieldBasedFrameDecoder
2013-12-05 15:20 7329先看看LengthFieldBasedFrameDecoder ... -
Netty源码学习-FrameDecoder
2013-11-28 18:38 3953Netty 3.x的user guide里FrameDecod ... -
Netty源码学习-DefaultChannelPipeline
2013-11-27 17:00 2247package com.ljn.channel; /** ...
相关推荐
《基于Springboot的Netty TCP Server实践与整合》 在当今的分布式系统中,网络通信是不可或缺的一部分。...这是一个很好的学习和实践案例,对于深入理解Springboot、Netty以及RESTful API设计都有很大的帮助。
4. 灵活性:Netty支持对多种协议的处理,且通过拦截器模式,允许开发者对消息处理过程进行拦截,定制自己的处理逻辑。 5. 完整的文档和活跃的社区:Netty具有丰富的文档资料和一个活跃的社区,有助于用户快速学习和...
《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...
通过学习 Netty 的源码,开发者可以更好地理解和优化其在网络通信中的性能表现,比如减少内存分配、提高并行度以及优化编码解码过程等。同时,配合中文文档,可以降低学习门槛,让开发者更快地掌握 Netty 的使用技巧...
### Netty源码解析知识点概览 #### 一、Netty简介与应用场景 - **Netty**是一款由JBOSS提供的高性能的...以上是对“netty源码解析视频”教程涉及的主要知识点进行的详细梳理和解释,希望对你学习Netty源码有所帮助。
Netty 是一个高性能、异步事件驱动的...总之,Netty 源码是一个很好的学习平台,可以深入了解网络编程、并发处理以及高效 I/O 设计。通过分析源码,不仅可以提升自己的技术能力,还能为解决实际问题提供灵感和参考。
Netty源码阅读的目的通常有两个:一是因为工作中使用到了Netty,希望通过阅读源码来更加深入地了解它;二是出于对Java网络编程的兴趣,希望通过学习Netty来探索如何构建高性能网络应用。同时,Netty的代码结构组织...
源码分析可以帮助我们深入理解Netty的内部机制,比如ByteBuf如何管理内存,ChannelHandlerContext如何传递事件,以及Pipeline(管道)如何处理消息等。你可以看到EventLoopGroup是如何管理EventLoop的,以及...
- 分析项目源码,了解ServerBootstrap和Bootstrap的配置过程。 - 理解ChannelHandler和ChannelPipeline的事件处理机制。 - 学习如何编写自定义的Encoder和Decoder。 - 实践客户端和服务器的交互,调试运行聊天...
《Netty源码深入剖析》一书旨在帮助读者深入了解Netty框架的工作原理和技术细节,从基础知识入手,逐步过渡到高级优化技巧,使开发者能够更好地掌握并应用Netty于实际项目中。 ### 一、Netty简介与核心特性 Netty...
通过分析NettyChat源码,我们可以学习到如何利用Netty和WebSocket实现高效的实时聊天功能,并了解如何组织和设计网络应用程序。这个项目是深入理解这两种技术及其相互配合的好实例,对于提升Java网络编程能力具有很...
在 Netty 中,我们首先定义一个 ServerBootstrap,它是服务器启动配置的入口。然后,我们设置 ChildHandler(通常是 ChannelInitializer),在这个初始化器中,我们可以添加 ChannelHandlers 到 ChannelPipeline 中...
Java Nety是一个高性能、异步事件...对于Java开发者来说,深入学习Netty源码不仅可以提升网络编程技能,也有助于解决实际问题,比如优化性能、处理异常、实现自定义协议等。因此,Netty源码是Java学习者宝贵的资源。
Netty的入门通常包括了解其基本组件,如Bootstrap(启动引导类)、ServerBootstrap(服务器启动引导类)、Channel(通道)、EventLoop(事件循环)、Handler(处理器)等。学习者会学习如何创建简单的服务端和客户端...
- 服务器端的启动类通常会初始化一个 ServerBootstrap,配置好 EventLoopGroup(工作线程组),并绑定监听端口,然后添加 ChannelHandler 处理接收到的连接和数据。 - 客户端则会创建一个 Bootstrap,配置好连接到...
3. **ChannelHandler**:Netty中的`ChannelHandler`接口是处理I/O事件的核心组件。ServerChannel会将接收到的客户端连接转换为新的`Channel`,每个新建立的连接都有自己的事件处理管道。你可以通过`ChannelPipeline`...
《Netty权威指南(第2版)》是一本深度探讨Netty框架的书籍,它为读者提供了全面且深入的理解和实践指导,旨在帮助开发者更好地...配合源码学习,能够更直观地理解Netty的工作机制,从而更好地运用到实际的项目开发中。
- 在Netty中,我们首先需要创建一个`ServerBootstrap`实例,它是服务器的启动类。 - 接着,配置`EventLoopGroup`,这是Netty的I/O线程模型,通常包括BossGroup(负责接收连接)和WorkerGroup(处理I/O事件)。 - ...