- 浏览: 787493 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
萨琳娜啊:
Java读源码之Netty深入剖析网盘地址:https://p ...
Netty源码学习-FileRegion -
飞天奔月:
写得有趣 ^_^
那一年你定义了一个接口 -
GoldRoger:
第二个方法很好
java-判断一个自然数是否是某个数的平方。当然不能使用开方运算 -
bylijinnan:
<script>alert("close ...
自己动手实现Java Validation -
paul920531:
39行有个bug:"int j=new Random ...
java-蓄水池抽样-要求从N个元素中随机的抽取k个元素,其中N无法确定
Netty里面采用了NIO-based Reactor Pattern
了解这个模式对学习Netty非常有帮助
参考以下两篇文章:
http://jeewanthad.blogspot.com/2013/02/reactor-pattern-explained-part-1.html
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
本文所贴的代码来自第一篇文章,在注释部分加入了我自己的理解
完整代码可以到我的github上下载,仅供参考:
https://github.com/bylijinnan/nettyLearn/tree/master/ljn-netty3-learn/src/main/java/com/ljn/reactor
package com.ljn.reactor; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /* 单线程的实现 Server端用一个Selector利用一个线程(在main方法里面start)来响应所有请求 1.当ACCEPT事件就绪,Acceptor被选中,执行它的run方法:创建一个Handler(例如为handlerA),并将Handler的interestOps初始为READ 2.当READ事件就绪,handlerA被选中,执行它的run方法:它根据自身的当前状态,来执行读或写操作 因此,每一个Client连接过来,Server就创建一个Handler,但都所有操作都在一个线程里面 Selection Key Channel Handler Interested Operation ------------------------------------------------------------------------ SelectionKey 0 ServerSocketChannel Acceptor Accept SelectionKey 1 SocketChannel 1 Handler 1 Read and Write SelectionKey 2 SocketChannel 2 Handler 2 Read and Write SelectionKey 3 SocketChannel 3 Handler 3 Read and Write 如果采用多个selector,那就是所谓的“Multiple Reactor Threads”,大体思路如下: Selector[] selectors; // also create threads int next = 0; class Acceptor { // ... public synchronized void run() { ... Socket connection = serverSocket.accept(); if (connection != null) new Handler(selectors[next], connection); if (++next == selectors.length) next = 0; } } */ public class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocketChannel; final boolean isWithThreadPool; /*Reactor的主要工作: * 1.给ServerSocketChannel设置一个Acceptor,接收请求 * 2.给每一个一个SocketChannel(代表一个Client)关联一个Handler * 要注意其实Acceptor也是一个Handler(只是与它关联的channel是ServerSocketChannel而不是SocketChannel) */ Reactor(int port, boolean isWithThreadPool) throws IOException { this.isWithThreadPool = isWithThreadPool; selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); selectionKey0.attach(new Acceptor()); } public void run() { System.out.println("Server listening to port: " + serverSocketChannel.socket().getLocalPort()); try { while (!Thread.interrupted()) { int readySelectionKeyCount = selector.select(); if (readySelectionKeyCount == 0) { continue; } Set<SelectionKey> selected = selector.selectedKeys(); Iterator<SelectionKey> it = selected.iterator(); while (it.hasNext()) { dispatch((SelectionKey) (it.next())); } //不会自动remove,因此要手动清;下次事件到来会自动添加 selected.clear(); } } catch (IOException ex) { ex.printStackTrace(); } } //从SelectionKey中取出Handler并执行Handler的run方法,没有创建新线程 void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null) { r.run(); } } //主要工作是为每一个连接成功后返回的SocketChannel关联一个Handler,详见Handler的构造函数 class Acceptor implements Runnable { public void run() { try { SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { if (isWithThreadPool) new HandlerWithThreadPool(selector, socketChannel); else new Handler(selector, socketChannel); } System.out.println("Connection Accepted by Reactor2"); } catch (IOException ex) { ex.printStackTrace(); } } } public static void main(String[] args) throws IOException{ int port = 9900; boolean withThreadPool = false; Reactor reactor = new Reactor(port, withThreadPool); new Thread(reactor).start(); } }
package com.ljn.reactor; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; /* * 单线程版本的Handler */ public class Handler implements Runnable { final SocketChannel socketChannel; final SelectionKey selectionKey; ByteBuffer input = ByteBuffer.allocate(1024); static final int READING = 0, SENDING = 1; //初始状态 int state = READING; String clientName = ""; //在handler里面设置interestOps,而且这个interestOps是会随着事件的进行而改变的 Handler(Selector selector, SocketChannel c) throws IOException { socketChannel = c; c.configureBlocking(false); selectionKey = socketChannel.register(selector, 0); /* handler作为SellectionKey的attachment。这样,handler就与SelectionKey也就是interestOps对应起来了 反过来说,当interestOps发生、SelectionKey被选中时,就能从SelectionKey中取得handler */ selectionKey.attach(this); selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } //在Reactor的dispatch方法里面被调用,但是直接的方法调用,没有创建新线程 public void run() { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { ex.printStackTrace(); } } void read() throws IOException { int readCount = socketChannel.read(input); if (readCount > 0) { readProcess(readCount); } state = SENDING; // Interested in writing selectionKey.interestOps(SelectionKey.OP_WRITE); } /** * Processing of the read message. This only prints the message to stdOut. * 非IO操作(业务逻辑,实际应用中可能会非常耗时):将Client发过来的信息(clientName)转成字符串形式 * @param readCount */ synchronized void readProcess(int readCount) { StringBuilder sb = new StringBuilder(); input.flip(); //from writing mode to reading mode byte[] subStringBytes = new byte[readCount]; byte[] array = input.array(); System.arraycopy(array, 0, subStringBytes, 0, readCount); // Assuming ASCII (bad assumption but simplifies the example) sb.append(new String(subStringBytes)); input.clear(); clientName = sb.toString().trim(); } void send() throws IOException { System.out.println("Saying hello to " + clientName); ByteBuffer output = ByteBuffer.wrap(("Hello " + clientName + "\n").getBytes()); socketChannel.write(output); selectionKey.interestOps(SelectionKey.OP_READ); state = READING; } }
package com.ljn.reactor; import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /* * 多线程版本的Handler * 思路就是把耗时的操作(非IO操作)放到其他线程里面跑, * 使得Handler只专注与Channel之间的IO操作; * Handler快速地从Channel中读或写,可以使Channel及时地、更快地响应其他请求 * 耗时的操作完成后,产生一个事件(改变state),再“通知”(由Handler轮询这个状态是否有改变) * Handler执行Channel的读写操作 */ public class HandlerWithThreadPool extends Handler { static ExecutorService pool = Executors.newFixedThreadPool(2); static final int PROCESSING = 2; public HandlerWithThreadPool(Selector sel, SocketChannel c) throws IOException { super(sel, c); } //Handler从SocketChannel中读到数据后,把“数据的处理”这个工作扔到线程池里面执行 void read() throws IOException { int readCount = socketChannel.read(input); if (readCount > 0) { state = PROCESSING; //execute是非阻塞的,所以要新增一个state(PROCESSING),表示数据在处理当中,Handler还不能执行send操作 pool.execute(new Processer(readCount)); } //We are interested in writing back to the client soon after read processing is done. //这时候虽然设置了OP_WRITE,但下一次本Handler被选中时不会执行send()方法,因为state=PROCESSING //或者可以把这个设置放到Processer里面,等process完成后再设为OP_WRITE selectionKey.interestOps(SelectionKey.OP_WRITE); } //Start processing in a new Processer Thread and Hand off to the reactor thread. synchronized void processAndHandOff(int readCount) { readProcess(readCount); //Read processing done. Now the server is ready to send a message to the client. state = SENDING; } class Processer implements Runnable { int readCount; Processer(int readCount) { this.readCount = readCount; } public void run() { processAndHandOff(readCount); } } }
发表评论
-
TCP的TIME-WAIT
2014-04-23 16:35 1199原文连接:http://vincent.bernat.im/e ... -
《TCPIP详解卷1》学习-拥塞避免
2014-01-15 15:16 159拥塞避免算法、 ... -
Netty源码学习-HTTP-tunnel
2014-01-14 18:19 4305Netty关于HTTP tunnel的说明: http://d ... -
Netty源码学习-FileRegion
2013-12-31 17:17 5663今天看org.jboss.netty.example.http ... -
Netty源码学习-HttpChunkAggregator-HttpRequestEncoder-HttpResponseDecoder
2013-12-27 16:10 4092今天看Netty如何实现一个Http Server org.j ... -
Netty源码学习-ReadTimeoutHandler
2013-12-26 17:53 3844ReadTimeoutHandler的实现思 ... -
Netty学习笔记
2013-12-25 18:39 1489本文是阅读以下两篇文章时: http://seeallhear ... -
Netty源码学习-ChannelHandler
2013-12-25 18:12 1637一般来说,“有状态”的ChannelHandler不应 ... -
Netty源码学习-ServerBootstrap启动及事件处理过程
2013-12-19 20:11 10765Netty是采用了Reactor模式的多线程版本,建议先看下面 ... -
Netty源码学习-ReplayingDecoder
2013-12-13 20:21 4267ReplayingDecoder是FrameDecoder的子 ... -
Netty源码学习-DefaultChannelPipeline2
2013-12-11 15:47 1286Netty3的API http://docs.jboss.or ... -
Netty源码学习-CompositeChannelBuffer
2013-12-06 15:54 2763CompositeChannelBuffer体现了Netty的 ... -
Netty源码学习-DelimiterBasedFrameDecoder
2013-12-05 18:36 9563看DelimiterBasedFrameDecoder的AP ... -
Netty源码学习-ObjectEncoder和ObjectDecoder
2013-12-05 16:06 5008Netty中传递对象的思路很直观: Netty中数据的传递是基 ... -
Netty源码学习-LengthFieldBasedFrameDecoder
2013-12-05 15:20 7313先看看LengthFieldBasedFrameDecoder ... -
Netty源码学习-FrameDecoder
2013-11-28 18:38 3940Netty 3.x的user guide里FrameDecod ... -
Netty源码学习-DefaultChannelPipeline
2013-11-27 17:00 2243package com.ljn.channel; /** ...
相关推荐
总之,Netty-4.1.97.Final源码提供了丰富的学习资源,涵盖了网络编程的各个方面,对于提升Java程序员的专业技能具有重要作用。通过深入研究源码,你将能够更好地掌握Netty的工作原理,为你的项目带来更高效、更稳定...
接下来,我们讨论`ByteBuf`,这是Netty中的缓冲区实现,相比Java NIO中的`ByteBuffer`,它提供了更高效且易用的API。`ByteBuf`支持读写索引管理,可以避免不必要的内存拷贝,并提供了一套完整的内存管理机制,包括预...
1. **异步非阻塞I/O**: Netty基于Java NIO(非阻塞I/O)构建,它允许一个线程处理多个连接,提高了系统的并发能力。在高并发场景下,如大型网络游戏服务器、金融交易系统等,Netty表现出色。 2. **零拷贝技术**: ...
Netty 框架的核心理念是基于“Reactor”模式,通过非阻塞 I/O(NIO)实现高效处理。它提供了高度可定制的 ChannelHandler 接口,允许开发者自定义网络事件的处理逻辑。Netty 的 ChannelPipeline 提供了事件处理链的...
《深入理解Reactor Netty:Java中的高性能网络库》 Reactor Netty是Spring Reactor项目的一部分,它是一个用于构建高性能、反应式网络应用的Java库。Reactor Netty的核心理念在于利用非阻塞I/O和事件驱动模型,提供...
1. **Netty概述**:Netty是由JBOSS提供的一个开源框架,基于Java NIO(非阻塞I/O)构建,它提供了一组高度优化的网络操作API,简化了网络编程的复杂性,如TCP、UDP和HTTP等传输协议。 2. **异步事件驱动**:Netty的...
总的来说,这个示例代码是学习和理解Java NIO和Reactor模式的好起点。通过分析和运行这段代码,你可以更深入地了解如何在Java中构建高并发、非阻塞的网络服务。不过,为了构建健壮的生产环境应用,你需要进一步完善...
Java Nety是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的协议服务器和客户端。这个项目是基于Java编程语言实现的,并且深受Java开发者喜爱,尤其是在...因此,Netty源码是Java学习者宝贵的资源。
学习 Netty 的资源通常包括源码分析、官方文档、示例项目以及社区提供的教程。压缩包中的“代码”部分可能包含了 Netty 的示例代码或者已经实现的协议处理器,这对于理解框架的工作原理非常有帮助。而“文档”部分...
标题“reactor-netty-demo”表明这是一个关于Reactor Netty的实际应用示例,而描述中的内容没有提供额外信息,我们主要依赖标签“Java”来推测这可能是一个使用Java语言实现的项目,具体来说,它利用了Reactor Netty...
Netty通过利用Java的NIO类库,提供了高效的网络处理能力,并且具有很高的可定制性和扩展性,它支持快速开发可维护的高性能协议服务器和客户端。 Netty的设计目标是让网络应用开发变得更加简单,它抽象了网络编程中...
Netty是一个基于NIO(Non-blocking I/O)的Java开源框架,用于简化网络编程。它提供了一套完整的解决方案,包括线程模型、传输层、编解码器等,使开发者能够更加专注于业务逻辑而不是底层I/O处理。 #### 2. Netty的...
- **非阻塞 I/O**:Netty 基于 Java NIO (Non-blocking I/O) 构建,利用 reactor 模式,提供高并发下的高效网络通信。 - **Channel** 和 **Pipeline**:Channel 负责 I/O 操作,Pipeline 是一系列处理器的链,用于...
在学习和使用Netty时,可以通过`HelloNetty_src.zip`中的源码示例来理解其工作原理。这个例子通常会展示如何创建一个简单的服务器和客户端,通过Netty发送和接收消息。从中你可以了解到如何配置ChannelHandler(处理...
1. **异步非阻塞通信**:Netty基于Java NIO技术实现,通过使用异步非阻塞IO(AIO)和多路复用技术,能够在一个线程中处理多个客户端连接,极大地提高了系统的并发处理能力。 - **NIO多路复用模型**:如图2-3所示...
通过Netty基于Java NIO的https代理服务器的实现。 这是一个简单的工作原理: 要通过浏览器对其进行测试,我们将需要设置一个虚拟主机名,如下所示: 127.0.0.1 test.localdomain 这样浏览器就能将SNI发送到我们...
Java编程方法论-响应式篇-Reactor分享视频已完结B站: : 油管: : list PL95Ey4rht7980EH8yr7SLBvj9XSE1ggdyJava编程方法论-响应式篇-Reactor-Netty相关博文: : 视频分享: B站: : 油管: : 6qLh2L75KdM list PL95...