- 浏览: 563111 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
jiang2011jiang:
mybatis3源码核心类1--Configuration -
tuyf_hs:
同求 图片
zookeeper+dubbo+dubbo管理集群的简要配置[单机] -
安静听歌:
请问图片还能找的会吗?你的图片和原文的图片都挂了,,,如果有图 ...
zookeeper+dubbo+dubbo管理集群的简要配置[单机] -
ahua186186:
yngwiet 写道楼主,有一个地方不太明白,为什么要用“ge ...
ListView中getChildAt(index)的使用注意事项 -
yngwiet:
楼主,有一个地方不太明白,为什么要用“getChildAt(p ...
ListView中getChildAt(index)的使用注意事项
1.初始化NioServerSocketChannelFactory:
(1)初始化并启动NioServerBoss线程:默认是1个:
第一步:通过构造函数里面的openSelector()方法创建Selector并启动一个boss线程:
第二步:在线程的run()方法中轮询Selector的select()操作,检查是否有准备就绪的通道,
然后执行任务:processTaskQueue()以及 process(selector)(注册NioAcceptedSocketChannel到nioworker线程):
(2)初始化并启动NioWorker线程:大于1个,可配置,一般配置为:Runtime.getRuntime().availableProcessors() + 1
该线程执行的任务为::processTaskQueue()以及 process(selector)(处理read和write事件)
2.初始化DefaultChannelPipeline的3个属性head,tail,name2ctx。
3.然后通过bind(SocketAddress localAddress)方法初始化一个ServerSocketChannel,并注册到boss线程,注意:注册到boss线程是通过Binder hanlder实现的:
关键代码:
Channel channel = getFactory().newChannel(bossPipeline);
evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
bindFuture.setSuccess();
} else {
bindFuture.setFailure(future.getCause());
}
}
});
最后在NioServerSocketPipelineSink中注册到boss线程
(1)初始化并启动NioServerBoss线程:默认是1个:
第一步:通过构造函数里面的openSelector()方法创建Selector并启动一个boss线程:
/** * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for * the {@link AbstractNioChannel}'s when they get registered */ private void openSelector(ThreadNameDeterminer determiner) { try { selector = SelectorUtil.open(); } catch (Throwable t) { throw new ChannelException("Failed to create a selector.", t); } // Start the worker thread with the new Selector. boolean success = false; try { DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner)); success = true; } finally { if (!success) { // Release the Selector if the execution fails. try { selector.close(); } catch (Throwable t) { logger.warn("Failed to close a selector.", t); } selector = null; // The method will return to the caller at this point. } } assert selector != null && selector.isOpen(); }
第二步:在线程的run()方法中轮询Selector的select()操作,检查是否有准备就绪的通道,
然后执行任务:processTaskQueue()以及 process(selector)(注册NioAcceptedSocketChannel到nioworker线程):
public void run() { thread = Thread.currentThread(); startupLatch.countDown(); int selectReturnsImmediately = 0; Selector selector = this.selector; if (selector == null) { return; } // use 80% of the timeout for measure final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100; boolean wakenupFromLoop = false; for (;;) { wakenUp.set(false); try { long beforeSelect = System.nanoTime(); int selected = select(selector); if (selected == 0 && !wakenupFromLoop && !wakenUp.get()) { long timeBlocked = System.nanoTime() - beforeSelect; if (timeBlocked < minSelectTimeout) { boolean notConnected = false; // loop over all keys as the selector may was unblocked because of a closed channel for (SelectionKey key: selector.keys()) { SelectableChannel ch = key.channel(); try { if (ch instanceof DatagramChannel && !ch.isOpen() || ch instanceof SocketChannel && !((SocketChannel) ch).isConnected() && // Only cancel if the connection is not pending // See https://github.com/netty/netty/issues/2931 !((SocketChannel) ch).isConnectionPending()) { notConnected = true; // cancel the key just to be on the safe side key.cancel(); } } catch (CancelledKeyException e) { // ignore } } if (notConnected) { selectReturnsImmediately = 0; } else { if (Thread.interrupted() && !shutdown) { // Thread was interrupted but NioSelector was not shutdown. // As this is most likely a bug in the handler of the user or it's client // library we will log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because the I/O thread " + "has been interrupted. Use shutdown() to shut the NioSelector down."); } selectReturnsImmediately = 0; } else { // Returned before the minSelectTimeout elapsed with nothing selected. // This may be because of a bug in JDK NIO Selector provider, so increment the counter // which we will use later to see if it's really the bug in JDK. selectReturnsImmediately ++; } } } else { selectReturnsImmediately = 0; } } else { selectReturnsImmediately = 0; } if (SelectorUtil.EPOLL_BUG_WORKAROUND) { if (selectReturnsImmediately == 1024) { // The selector returned immediately for 10 times in a row, // so recreate one selector as it seems like we hit the // famous epoll(..) jdk bug. rebuildSelector(); selector = this.selector; selectReturnsImmediately = 0; wakenupFromLoop = false; // try to select again continue; } } else { // reset counter selectReturnsImmediately = 0; } // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) { wakenupFromLoop = true; selector.wakeup(); } else { wakenupFromLoop = false; } cancelledKeys = 0; processTaskQueue(); selector = this.selector; // processTaskQueue() can call rebuildSelector() if (shutdown) { this.selector = null; // process one time again processTaskQueue(); for (SelectionKey k: selector.keys()) { close(k); } try { selector.close(); } catch (IOException e) { logger.warn( "Failed to close a selector.", e); } shutdownLatch.countDown(); break; } else { process(selector); } } catch (Throwable t) { logger.warn( "Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } }
(2)初始化并启动NioWorker线程:大于1个,可配置,一般配置为:Runtime.getRuntime().availableProcessors() + 1
该线程执行的任务为::processTaskQueue()以及 process(selector)(处理read和write事件)
2.初始化DefaultChannelPipeline的3个属性head,tail,name2ctx。
3.然后通过bind(SocketAddress localAddress)方法初始化一个ServerSocketChannel,并注册到boss线程,注意:注册到boss线程是通过Binder hanlder实现的:
关键代码:
Channel channel = getFactory().newChannel(bossPipeline);
evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
bindFuture.setSuccess();
} else {
bindFuture.setFailure(future.getCause());
}
}
});
最后在NioServerSocketPipelineSink中注册到boss线程
public ChannelFuture bindAsync(final SocketAddress localAddress) { if (localAddress == null) { throw new NullPointerException("localAddress"); } Binder binder = new Binder(localAddress); ChannelHandler parentHandler = getParentHandler(); ChannelPipeline bossPipeline = pipeline(); bossPipeline.addLast("binder", binder); if (parentHandler != null) { bossPipeline.addLast("userHandler", parentHandler); } Channel channel = getFactory().newChannel(bossPipeline); final ChannelFuture bfuture = new DefaultChannelFuture(channel, false); binder.bindFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { bfuture.setSuccess(); } else { // Call close on bind failure bfuture.getChannel().close(); bfuture.setFailure(future.getCause()); } } }); return bfuture; }
private final class Binder extends SimpleChannelUpstreamHandler { private final SocketAddress localAddress; private final Map<String, Object> childOptions = new HashMap<String, Object>(); private final DefaultChannelFuture bindFuture = new DefaultChannelFuture(null, false); Binder(SocketAddress localAddress) { this.localAddress = localAddress; } @Override public void channelOpen( ChannelHandlerContext ctx, ChannelStateEvent evt) { try { evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory()); // Split options into two categories: parent and child. Map<String, Object> allOptions = getOptions(); Map<String, Object> parentOptions = new HashMap<String, Object>(); for (Entry<String, Object> e: allOptions.entrySet()) { if (e.getKey().startsWith("child.")) { childOptions.put( e.getKey().substring(6), e.getValue()); } else if (!"pipelineFactory".equals(e.getKey())) { parentOptions.put(e.getKey(), e.getValue()); } } // Apply parent options. evt.getChannel().getConfig().setOptions(parentOptions); } finally { ctx.sendUpstream(evt); } evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { bindFuture.setSuccess(); } else { bindFuture.setFailure(future.getCause()); } } }); } @Override public void childChannelOpen( ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception { // Apply child options. try { e.getChildChannel().getConfig().setOptions(childOptions); } catch (Throwable t) { fireExceptionCaught(e.getChildChannel(), t); } ctx.sendUpstream(e); } @Override public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { bindFuture.setFailure(e.getCause()); ctx.sendUpstream(e); } }
NioServerSocketChannel( ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, Boss boss, WorkerPool<NioWorker> workerPool) { super(factory, pipeline, sink); this.boss = boss; this.workerPool = workerPool; try { socket = ServerSocketChannel.open(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } try { socket.configureBlocking(false); } catch (IOException e) { try { socket.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } config = new DefaultServerSocketChannelConfig(socket.socket()); fireChannelOpen(this); }
发表评论
-
netty 3.10.4.Final 源码学习总结
2015-08-20 18:08 751提示:个人理解并且只针对NIO模式 总的来说,包括1个“核 ... -
summercool-ddl &Mybatis3.06 总结1-总体框架理解
2015-07-15 17:26 9871.summercool-ddl核心类:SqlSessionD ... -
summercool-hsf &Netty3.X总结6-总体框架理解
2015-07-15 16:56 10671. summercool-hsf 的核心类包括:Abstra ... -
summercool-hsf &Netty3.X总结5--客户端异步调用service API
2015-07-08 14:30 6481.异步调用: 核心原理:利用JDK的动态代理类创建ser ... -
summercool-hsf &Netty3.X总结4--客户端同步调用service API
2015-07-08 14:21 16821.同步调用: 核心原理:利用JDK的动态代理类创建serv ... -
summercool-hsf &Netty3.X总结2--客户端建立连接环节
2015-06-24 17:35 984客户端建立连接环节:(线程就像车的引擎,是RPC框架设计的关键 ... -
summercool-hsf &Netty3.X 总结1--使用JMX来查看服务端的配置和监控数据
2015-06-02 11:25 1155问题1:JMX是什么? 总结:JMX是JAVA管理程序的框架。 ...
相关推荐
summercool-hsf Automatically exported from code.google.com/p/summercool-hsf 1.目前为止性能最高的RPC远程通讯框架 2.也可以做为手机长连接的Server,经测试已经达到了50W以上的性能长连接 (需调整linux内核...
笔者工作的这几年之中,总结并开发了如下几个框架: summercool(Web 框架,已经应用于某国内大型网络公司的等重要应用)、summercool-hsf(基于Netty实现的RPC框架,已经应用国内某移动互联网公司)、 summercool-...
summercool-ddlAutomatically exported from code.google.com/p/summercool-ddl1.依赖Xml代码 收藏代码org.summercoolsummercool-ddl1.0源码svn地址:2.准备Sql映射文件Xml代码 收藏...3.Spring配置Xml代码 收藏代码
summercool-ddl Automatically exported from code.google.com/p/summercool-ddl 学习了解使用!
笔者工作的这几年之中,总结并开发了如下几个框架: summercool( Web框架,已经应用于某国内大型网络公司的等重要应用)、summercool-hsf(基于Netty实现的RPC框架,已经应用国内某移动互联网公司)、summercool-...
在IT行业中,数据库管理和优化是至关重要的环节,尤其是在大数据量的场景下。本文将深入探讨"Ibatis之分表分库解决方案",这是一个针对高并发、大数据处理的有效策略。Ibatis,作为一个轻量级的Java持久层框架,允许...