事件模型
在分析Netty的事件模型之前,先回忆一下一种编程模型,不是OO编程,也不是函数式编程,而是一种基于事件驱动的编程方式。这种编程方式实际上我们经常接触到,比如UI编程,点击一个按钮就会触发一个动作响应,这都是事件驱动的编程实现。包括页面加载触发的动作,还有Select, Poll, EPoll这种基于事件驱动的IO机制。
一个事件模型包括事件源,触发的事件,以及事件触发的动作响应三部分。
1、事件源
指的就是谁触发的事件。比如点击按钮就会触发响应的动作,按钮就是一个事件源。页面加载也会触发相应的动作,页面也就是一个事件源。Select, Poll, EPoll中IO也是一个事件源,触发读写事件。
在Netty中我们还将讲到Channel,这也是一个事件源,Channel上产生事件或者IO操作。当然Netty中的事件模型并不只是体现在Channel上。
2、事件
3、事件触发的动作
Netty在实现Reactor的时候就采用了这种事件驱动模型。关于Reactor,可以参考POSA 2(Pattern-Oriented Software Architecture, Volume 2, 面向模式的软件架构 第2卷),这是一本关于面向模式的软件架构的书籍。
想要理解Netty的Reactor模式实现需要对事件驱动这种模型有一定的理解。还有就是多路复用这种技术,比如Java中的Selector,就是一种multiplexor多路复用这种技术。还有就是Linux下的EPoll这种机制。
Netty事件模型的核心组件包括EventLoop、EventLoopGroup、EventExecutor、EventExecutorGroup、Channel、ChannelPipeline以及ChannelHandler等。
在事件模型中,必须要有一个事件处理机制,这个事件处理机制负责接收触发的事件,并分发给对应的事件处理,以便响应相应的动作。
在Netty中,EventLoop就是这样的一个角色。
Netty中的事件处理机制
1、EventLoopGroup
2、EventLoop
3、EventExecutorGroup
4、EventExecutor
EventLoop和EventLoopGroup这两个接口不太好理解。从这两个接口定义来看,它们是is-a的关系,也就是说,EventLoop也是一个EventLoopGroup。除了这两个接口,另外还有一个EventExecutorGroup接口。EventLoopGroup和EventExecutorGroup这两个接口也不太好理解。从这两个接口定义来看,它们也是is-a的关系,也就是说,EventLoopGroup也是一个EventExecutorGroup。和EventExecutorGroup接口对应的,还有一个EventExecutor。这两个也是is-a的关系,也就是说,EventExecutor也是一个EventExecutorGroup。
这几个接口之间的关系
EventLoop is-a EventLoopGroup is-a EventExecutorGroup
EventLoop is-a OrderedEventExecutor is-a EventExecutor is-a EventExecutorGroup
EventLoopGroup中有一个重要的next方法,它返回(或者选择)的是一个EventLoop,而在EventExecutorGroup中也有一个重要的next方法,但它返回(或者选择)的是一个EventExecutor。
Netty的这几个核心组件关系看着合理,但挺不好理解的。而且在具体实现的时候有些地方挺怪异的,比如EventLoopGroup的next方法是一个挺重要的方法,它返回一个EventLoop,但在ThreadPerChannelEventLoopGroup的实现中不支持,直接抛出UnsupportedOperationException异常,在OioEventLoopGroup中也是这样。这个方法只在MultithreadEventLoopGroup多线程EventLoopGroup中才有用。
因为EventExecutor也是一个EventExecutorGroup,而EventLoop是一个EventLoopGroup,同时EventLoopGroup也是一个EventExecutorGroup,所以在注册Channel时,
想要理解Netty的这几个核心组件需要对Java的Executor、ExecutorService、ScheduledExecutorService这几个标准接口以及AbstractExecutorService要有个深入的理解。可以参考Java的线程池实现。
EventLoop
EventLoop其实就是一个Reactor。简单的理解就是它会去轮训事件或者IO操作,如果Channel有事件或者IO操作产生,比如有连接过来,或者有数据可读或可写,它负责将这些请求分发给对应的ChannelHandler处理.
Netty4定义了一个EventLoop接口,用于处理Channel产生的IO操作。
EventLoop有一个inEventLoop方法,从EventExecutor接口中继承过来的,用于判断是否当前线程。
如果不是当前线程,EventLoop会启动一个线程任务去负责轮训事件或者IO操作,参考SingleThreadEventExecutor类的execute方法。这里会调用一个startThread方法,这个方法会调用doStartThread方法,在这个方法中,会调用run去不断的轮训。
public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn( "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } terminationFuture.setSuccess(null); } } } } }); }
NioEventLoop
非阻塞式IO场景下使用。
NioEventLoop继承了SingleThreadEventLoop,所以NioEventLoop是一个单线程的EventLoop。
EpollEventLoop
基于Epoll的IO事件处理机制的IO多路复用场景下使用。Epoll是Linux下提供的基于事件的IO处理机制,所以这个只能在Linux下采用使用。而且Java提供的IO多路复用技术在不同OS下所采用的机制不一样。如果是Linux,采用的是Epoll,如果是Windows,采用的是完成端口。
具体EpollEventLoop采用的什么样的机制后面再详细说明。
EventLoopGroup
对EventLoop进行分组,定义了一个EventLoopGroup接口,这个接口继承了EventExecutorGroup接口,定义也比较简单。
1、next
EventLoop next();
2、register
ChannelFuture register(Channel channel);
ChannelFuture register(ChannelPromise promise);
ChannelFuture register(Channel channel, ChannelPromise promise);
其中next方法用于从Group中返回(或者选择)一个EventLoop进行Looping。一旦Channel产生了事件或者IO操作,就可以Looping到产生的事件或者IO操作,进而进行处理。register方法用于注册一个Channel和(或者)一个ChannelPromise,在注册的时候,会选择一个EventLoop进行注册,后面在看具体实现的时候可以分析下具体的选择策略。
EventLoopGroup接口有一个AbstractEventLoopGroup抽象实现。不过并没有具体实现继承了这个抽象类。仅仅定义了一个next抽象方法。这个抽象方法覆盖了父接口EventExecutorGroup的next方法,而对父接口EventLoopGroup的next方法进行了抽象定义。
1、next
public abstract EventLoop next();
这个设计挺怪异的。虽说Java类并没有像C++那样支持多继承,但接口支持多继承,从这里来看,Java接口的多继承机制也挺怪异的。
和EventLoopGroup对应的,还有一个EventExecutorGroup。EventLoopGroup也是一个EventExecutorGroup。
EventExecutorGroup
EventExecutorGroup接口扩展了Java的ScheduledExecutorService接口,对于Java程序员来说,应该对这个是了解的,Java中的线程池实现一个是ThreadPoolExecutor,另一个就是ScheduledThreadPoolExecutor,这个是一个可调度的线城池实现,就实现了ScheduledExecutorService接口。
ScheduledExecutorService接口定义很简单,就几个方法:
1、schedule
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
2、scheduleAtFixedRate
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
3、scheduleWithFixedDelay
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
ScheduledExecutorService接口扩展了ExecutorService接口
1、shutdown
void shutdown();
2、shutdownNow
List<Runnable> shutdownNow();
3、isShutdown
boolean isShutdown();
4、isTerminated
boolean isTerminated();
5、awaitTermination
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
6、submit
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
7、invokeAll
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
8、invokeAny
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
ExecutorService接口又扩展了Executor接口
1、execute
void execute(Runnable command);
EventExecutorGroup接口也扩展了自己的一些定义
1、isShuttingDown
boolean isShuttingDown();
2、shutdownGracefully
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
3、terminationFuture
Future<?> terminationFuture();
4、shutdown
void shutdown();
5、shutdownNow
List<Runnable> shutdownNow();
6、next
EventExecutor next();
7、iterator
Iterator<EventExecutor> iterator();
8、submit
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task);
9、schedule
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
10、scheduleAtFixedRate
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
11、scheduleWithFixedDelay
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
说到线城池,我们也可以通过实现Executor、ExecutorService或者ScheduledExecutorService接口来实现自己的线城池。且不说实现一个线程池难度如何,从ExecutorService接口定义来看,就要实现不少接口方法。不过可以参考ThreadPoolExecutor、ScheduledThreadPoolExecutor实现,Java中的这两个线城池实现的也不错。
EventExecutor
MultithreadEventLoopGroup
多线程EventLoopGroup。包括:
1、DefaultEventLoopGroup(LocalEventLoopGroup)
2、NioEventLoopGroup
3、EpollEventLoopGroup
NioEventLoopGroup
这是一个多线程模型的EventLoopGroup。NioEventLoopGroup会创建指定线程数的EventExecutor,参考MultithreadEventExecutorGroup,NioEventLoopGroup间接继承了MultithreadEventExecutorGroup。
默认情况下,也就是在创建NioEventLoopGroup对象的时候使用无参构造方法或者线程数指定0的时候,线程数为逻辑核数*2。
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
Channel
Channel是一个很重要的概念,这是一个很抽象的概念。通常理解来说,Channel是一个类似于通信的信道。
从通信的角度看,通信的双方通过这个Channel进行对话,双方通过这个Channel发送和接受对方的信息。从这个角度看,双方进行通信似乎只需要这一个Channel就可以通信,这的确是可以的,不过在同一时刻只能是单向通信,如果需要同时双向通信的话,从逻辑上来讲,需要两个Channel。
如果两台机器需要进行通信,需要通过一个Socket进行通信。从Socket角度看,双方通过一个Socket发送和接收对方的信息,所以Socket就是这样的一个Channel。另外对于TCP来说,通信的双方如果需要通信的话,在通信之前需要建立一个连接。如果两台机器需要进行TCP通信的话,也就需要两个Socket,一台机器对应有一个Socket。实际上至少会有两个Socket,通常会有三个Socket,一方发起连接,另一方接收到这个连接后,也会有一个Socket,这个Socket代表连接方的Socket。在客户端服务器编程中,也就是我们习惯说的服务端Socket和客户端Socket。这两个概念比较容易混淆,需要注意的是,通信的双方并不是在各自创建的Socket上发送和接收对方的信息,一方接收到另一方发起的连接,会得到一个代表另一方连接的Socket,连接方通过自己创建的Socket和对方通信,这个Socket代表的是对方的Socket,或者说其实代表的是服务端Socket,在这个Socket上发送和接收另一方的信息,另一方通过得到的这个代表连接方连接的Socket和连接方通信,发送和接收连接方的信息。
另外需要注意的是,通信的双方Socket其实是对等的,并不存在所谓的客户端和服务端Socket的区别。Linux在实现Socket这块体现的非常好的。
在一些Socket实现中,尤其是在一些编程语言中,在实现Socket这块把Socket分为客户端和服务端Socket,比如Java,这其实是结合了面向对象思想,从OO的角度看待Socket。引入了客户端/服务端这种模式思想。
Channel是Netty中的一个核心组件。Netty定义了一个Channel接口。Channel打开后需要注册到EventLoop中,先是调用EventLoopGroup的register注册方法,这个方法会调用next方法,选择一个EventLoop,然后调用EventLoop的register注册方法将这个Channel封装成一个ChannelPromise进行注册。最后还是会调回到Channel的register注册方法。参考AbstractChannel的register方法。
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
这里会调用AbstractChannel的register0方法。在这个方法中会调用doRegister方法,最后会调用到具体的Channel的doRegister方法进行注册。比如在NioServerSocketChannel、NioSocketChannel这些非阻塞式(NIO)Channel中,doRegister方法实现统一在AbstractNioChannel中实现。参考AbstractNioChannel的doRegister方法。
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
这里会调用Java的Channel(这里应该是SelectableChannel)的register方法进行注册。初始注册时,interest set指定为0,attachment保存的是Netty的要注册的Channel。
Channel注册后,如果有注册的ChannelHandler还有未触发调用的动作还在pending等待队列中,会调用DefaultChannelPipeline的invokeHandlerAddedIfNeeded方法,这个方法会调用callHandlerAddedForAllHandlers方法按照顺序执行这些触发调用的动作。同时还会触发Channel注册时要调用的动作,这里Channel注册也是一个事件,Channel注册后,会触发Channel注册事件。这里会调用ChannelPipeline的fireChannelRegistered方法,这个方法会调用注册的ChannelHandler的channelRegistered方法。
Unsafe
在Channel接口内还定义了一个Unsafe接口。这个接口不可以在外部用户程序中调用。
* <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods
* are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
* following methods:
NioServerSocketChannel
这其实是一个连接通道,对应接收方的Socket,负责建立和连接方的连接。需要注意的是,Java在实现Socket这块把Socket分为客户端和服务端Socket,NioServerSocketChannel对应的是服务器端的ServerSocket。
这个Channel其实是一个比较特殊的Channel。首先它是非阻塞的(NIO),另外它是一个ServerSocket的Channel,这个Channel只能在这个上面接收客户端的连接,获取和客户端通信的Channel。不过Netty在实现NioServerSocketChannel的时候,确是直接继承了AbstractNioMessageChannel类。这样就得在实现AbstractNioMessageChannel的doWriteMessage方法,另外AbstractNioMessageChannel又继承了AbstractNioChannel类,就得实现AbstractNioChannel的doFinishConnect和doConnect方法,还有AbstractNioChannel又继承了AbstractChannel类,又得实现AbstractChannel的doDisconnect方法,Netty不得不在这些方法中直接报UnsupportedOperationException。
NioServerSocketChannel中主要就一个doReadMessages方法。这个方法调用Java的ServerSocketChannel的accept方法获取和客户端连接的通道SocketChannel,通过这个Channel和客户端通信。
protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
ChannelPipeline
在DefaultChannelPipeline中还维护了一个注册添加、删除ChannelHandler的等待队列,也就是一个pending队列,这个等待队列是一个单链表。在对应的Channel还没有被注册时,向ChannelPipeline中注册添加、删除ChannelHandler后,不会调用对应ChannelHandler的handlerAdded和handlerRemoved方法,而是把这个要触发调用的动作放在一个pending等待队列中。将这个要触发调用的动作封装成一个PendingHandlerCallback放在这个等待队列中,如果是注册添加ChannelHandler,会将这个ChannelHandler对应的ChannelHandlerContext封装成一个PendingHandlerAddedTask,如果是删除注册的ChannelHandler,会将这个ChannelHandler对应的ChannelHandlerContext封装成一个PendingHandlerRemovedTask,然后放在队列中。
这个等待队列只有一个对头指向而没有一个队尾,在向这个队列中添加时,需要遍历找到队尾,才能在队尾进行添加。
ChannelHandler
1、handlerAdded
2、handlerRemoved
3、exceptionCaught
在注册添加、删除ChannelHandler或者报异常时会触发调用这几个方法。比如通过ChannelPipeline注册添加ChannelHandler时,会调用ChannelHandler的handlerAdded方法。在删除注册的ChannelHandler时,会调用handlerRemoved方法。
ByteBuf
Netty实现了自己的一套Buffer。
Netty的Buffer定义为一个可以同时随机和线性访问的有限序列。
* A random and sequential accessible sequence of zero or more bytes (octets).
* This interface provides an abstract view for one or more primitive byte
* arrays ({@code byte[]}) and {@linkplain ByteBuffer NIO buffers}.
Netty的Buffer支持扩容或者缩容。
支持discard。
支持线性读写操作:readerIndex和writerIndex
和Java中的ByteBuffer比较
Java中的Buffer就是我们说的读写缓冲。
我们在进行IO操作的时候,其实就涉及到这个东西了,只是这个IO缓冲一般我们不会直接去操作,所以平常我们不去深究的话,就没有感觉到读写缓冲的存在。这个东西其实在我们调用printf标准函数或者Java中的System.out.println的时候就涉及到写缓冲了,以Java为例,调用println打印输出的时候,输出的内容首先写入到一个写缓冲中,然后才会flush刷入输出到控制台上。
还有我们在读取数据的时候,比如我们读取文件,或者接收从客户端发送过来的数据,我们通常也会申请一块缓冲,将读取出来的数据缓存在这块缓冲区中。
这些都是我们说的缓冲,不过我们说的这些都比较简单,基本上仅仅是申请了一块内存缓存区域。
Java中的Buffer跟我们说的缓冲没有本质区别。不过在我们的基础上,有了更明确的定义和实现。
* A container for data of a specific primitive type.
*
* <p> A buffer is a linear, finite sequence of elements of a specific
* primitive type. Aside from its content, the essential properties of a
* buffer are its capacity, limit, and position: </p>
*
* <blockquote>
*
* <p> A buffer's <i>capacity</i> is the number of elements it contains. The
* capacity of a buffer is never negative and never changes. </p>
*
* <p> A buffer's <i>limit</i> is the index of the first element that should
* not be read or written. A buffer's limit is never negative and is never
* greater than its capacity. </p>
*
* <p> A buffer's <i>position</i> is the index of the next element to be
* read or written. A buffer's position is never negative and is never
* greater than its limit. </p>
*
* </blockquote>
Java中的Buffer定义为一个线性的有限序列。
Buffer的几个重要属性
1、capacity
2、position
3、limit
4、mark
其中capacity表示Buffer的容量大小,position表示开始读写的位置,比如读的时候从哪里开始读,写入的时候从哪里开始写入,每次读写后都会更新position的值。limit表示读写的时候不能超过limit的限制,mark记录一个标记位置,如果reset的话,position将被重新设置为mark的值。
这几个属性必须满足以下条件:
mark <= position <= limit <= capacity
Java中的Buffer不支持扩容或者缩容,容量在初始化的时候就是指定了的。
mark
记录当前位置,将mark设置为当前位置。后面调用reset的话,position将被重新设置为mark的值。
flip
将limit设置为当前position的值,position重置为0,mark重置为-1。
rewind
调用rewind将使Buffer倒回到一个"初始"的状态,这个"初始"的状态并不是初始化时候的状态,而是会position重置为0,mark重置为-1。
ByteBuffer
这个是Buffer的一个字节序的抽象实现。
slice
调用slice会创建一个新的ByteBuffer实例,但这个ByteBuffer实例会共享之前这个ByteBuffer的内容。但新的ByteBuffer实例内容从之前这个ByteBuffer的position开始,也就是新的ByteBuffer实例的limit和capacity从前这个ByteBuffer的position开始算起(newLimit = limit - position, newCapacity = capacity - position),且新的ByteBuffer实例的offset被设置为之前的ByteBuffer的position。
public class NioEventLoop_ServerTest { private Selector selector(NioEventLoop eventLoop) throws NoSuchFieldException { Class<?> clazz = eventLoop.getClass(); Field field = null; try { field = clazz.getDeclaredField("selector"); } catch (NoSuchFieldException e) { field = clazz.getDeclaredField("unwrappedSelector"); } field.setAccessible(true); try { return (Selector) field.get(eventLoop); } catch (IllegalAccessException e) { throw new NoSuchFieldException(e.getMessage()); } } private class SocketChannelHandler implements NioTask<SelectableChannel> { private NioEventLoop eventLoop; public SocketChannelHandler(NioEventLoop eventLoop) { this.eventLoop = eventLoop; } @Override public void channelReady(SelectableChannel ch, SelectionKey key) throws Exception { System.out.println("channelReady..."); if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); try { SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { socketChannel.configureBlocking(false); eventLoop.register(socketChannel, SelectionKey.OP_READ, this); } } catch (ClosedChannelException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(256); try { int nbytes = socketChannel.read(buffer); System.out.println(nbytes + ", " + buffer.toString()); buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); System.out.println(new String(bytes)); } catch (IOException e) { e.printStackTrace(); } } } @Override public void channelUnregistered(SelectableChannel ch, Throwable cause) throws Exception { System.out.println("channelUnregistered..."); } } @Test public void test() throws Exception { System.setProperty("io.netty.noKeySetOptimization", "true"); SelectorProvider selectorProvider = SelectorProvider.provider(); SelectStrategyFactory selectStrategyFactory = DefaultSelectStrategyFactory.INSTANCE; RejectedExecutionHandler rejectedExecutionHandler = RejectedExecutionHandlers.reject(); Executor executor = new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass())); final NioEventLoop eventLoop = new NioEventLoop(null, executor, selectorProvider, selectStrategyFactory.newSelectStrategy(), rejectedExecutionHandler); Selector selector = selector(eventLoop); System.out.println(selector.getClass()); ServerSocketChannel channel0 = null; try { channel0 = ServerSocketChannel.open(); System.out.println(channel0.getClass().getName()); } catch (IOException e) { e.printStackTrace(); } try { channel0.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); } try { channel0.socket().bind(new InetSocketAddress(1239)); } catch (IOException e) { e.printStackTrace(); } // try { // SelectionKey key = channel0.register(selector, SelectionKey.OP_ACCEPT); // System.out.println(key.interestOps()); // } catch (ClosedChannelException e) { // e.printStackTrace(); // } eventLoop.register(channel0, SelectionKey.OP_ACCEPT, new SocketChannelHandler(eventLoop)); Runnable worker = new Runnable() { @Override public void run() { System.out.println("running..."); } }; eventLoop.execute(worker); synchronized (this) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
相关推荐
- **EventLoop**:事件循环负责处理I/O事件,每个EventLoop对应一个线程。 - **Pipeline**:处理链,用于注册处理器(Handler),数据在Pipeline中按顺序进行处理。 - **Buffer**:Netty的ByteBuf提供了高效的...
4. **线程模型**:Netty采用EventLoopGroup和ChannelHandler,实现了一种高效的事件驱动模型,简化了多线程编程。 5. **灵活的配置**:Netty支持自定义的协议栈,可以通过组合不同的ChannelHandler来构建复杂的网络...
4. **异步处理模型**:Netty的异步事件驱动模型非常适合处理大量并发连接。"thrift-netty"将Thrift的服务调用封装为Future,使得服务端可以并行处理多个请求,提高了系统的并发能力。 5. **自定义Handler**:Netty...
Netty 是一款用于快速开发高性能的网络应用程序的 Java 框架。它封装了网络编程的复杂性, 使网络编程和 Web 技术的最新...它还定义了一种架构模型以及一套丰富的设计模式。 该资源就是开发Netty程序必须要依赖的jar包
10. **线程模型**: Netty的线程模型是高度优化的,通过EventLoop确保并发性能,避免了过多的线程同步开销。 这个"Netty-all-5.0.0.Alpha2"版本的JAR文件包含了上述所有功能和模块的实现,适用于那些希望在早期项目...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个“netty-all-5.0.0.Alpha3 完整pom.xml配置”是针对Netty 5.0.0 Alpha3版本的集成配置,包含了该版本所需...
1. **异步事件驱动**:Netty采用非阻塞I/O模型,利用Java NIO(非阻塞输入/输出)库,提高了处理大量并发连接的能力。通过Channel和EventLoopGroup等概念,Netty能够高效地处理网络事件,如读写、连接和关闭等。 2....
1. **异步事件驱动**:Netty采用非阻塞I/O模型,基于Java NIO(非阻塞输入/输出)库构建,实现了低延迟、高吞吐量的网络通信。它通过事件循环(EventLoop)和事件处理器(ChannelHandler)处理网络事件,如连接建立...
8. **线程模型**: Netty采用NIO模型,利用了Epoll和KQueue等高效选择器,减少了线程间的上下文切换,提高了并发性能。 9. **ChannelFuture**: 代表Channel的异步操作,可以通过Future监听操作的完成情况,并在完成...
- **定义**:Netty 是一个高性能的异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器与客户端。 - **应用场景**: - **游戏服务器**:如大型多人在线游戏(MMO)。 - **聊天服务器**:实现...
RxNetty是Reactive Extensions (Rx)的一个扩展,它将RxJava的响应式编程模型引入到网络I/O操作中,为开发人员提供了一种处理网络事件流的新方式。这个开源项目,即"rx-netty-0.1.1.zip",提供了对TCP、HTTP、UDP等...
Netty的事件驱动模型依赖于`Selector`,这是Java NIO的一部分。`Selector`能够监控多个`Channel`的事件,当某个事件发生时,`Selector`会唤醒对应的`EventLoop`进行处理。这种模型极大地优化了资源利用率,减少了...
- **EventLoop**:事件循环,负责处理 Channel 上的事件。 - **ChannelFuture**:代表 Channel 的未来状态,可以注册监听器等待状态变化。 **Netty 实例的学习路径:** 1. **基础概念**:首先,你需要理解 Netty 的...
Netty的核心设计原则是基于反应式编程模型,它利用了Java NIO(非阻塞I/O)来实现高并发和低延迟。Netty的API简洁且直观,使得开发者可以轻松地处理各种网络协议,如TCP、UDP、HTTP、FTP等。 1. **ByteBuf**: Netty...
6. **线程模型**:Netty 使用 Boss-Worker 线程模型,Boss 线程负责接受新的连接,Worker 线程负责处理已建立连接上的读写事件。 7. **编码与解码器**:Netty 提供了一系列预定义的编码器和解码器,如 ...
Netty服务器线程模型概览_线程模型
首先,Netty的核心在于其非阻塞I/O模型,基于Java NIO(Non-blocking Input/Output)实现。通过Selector和Channel,Netty能够高效地处理大量并发连接,避免了线程上下文切换带来的性能损耗。在源码解析过程中,我们...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在深入探讨Netty之前,我们先来理解Java在并发编程和网络通信中的基本概念。 Java语言提供了丰富的并发工具...
通过多线程和事件驱动模型,Netty 能够高效地处理并发连接。 在 Netty 中,ChannelHandler 是业务逻辑的容器,它处理由 Channel 产生的事件。ChannelHandlerContext 是 ChannelHandler 与 Channel 交互的桥梁,提供...