今天看了下netty代码,对代码做了个流程分析,netty的代码写的真是漂亮。
netty服务端启动代码如下
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
ChannelPipeline pipleline = pipeline();
//默认最大传输帧大小为16M
pipleline.addLast("encode", new ObjectEncoder(1048576 * 16));
pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));
pipleline.addLast("handler", handler);
return pipleline;
}
});
//设置缓冲区为64M
bootstrap.setOption("receiveBufferSize", 1048576 * 64);
bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法
//tcp定期发送心跳包 比如IM里边定期探测对方是否下线
//只有tcp长连接下才有意义
// bootstrap.setOption("child.keepAlive", true);
bootstrap.bind(new InetSocketAddress(port));
服务端事件处理顺序如下:
UpStream.ChannelState.OPEN—–>DownStream.ChannelState.BOUND(需要绑定)
——–>UpStream.ChannelState.BOUND(已经绑定)——>DownStream.CONNECTED(需要连接)——->UpStream.CONNECTED(连接成功)
在bind的时候做了如下处理
public Channel bind(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
final BlockingQueue<ChannelFuture> futureQueue =
new LinkedBlockingQueue<ChannelFuture>();
ChannelHandler binder = new Binder(localAddress, futureQueue);
ChannelHandler parentHandler = getParentHandler();
这里创建了一个Binder,它继承了SimpleChannelUpstreamHandler。先说说UpStreamHandler和DownStreamHandler,一般来说,UpStream类型的事件主要是由网络底层反馈给Netty的,比如messageReceived,channelConnected等事件,而DownStream类型的事件是由框架自己发起的,比如bind,write,connect,close等事件。
接着
ChannelPipeline bossPipeline = pipeline();
bossPipeline.addLast("binder", binder);
if (parentHandler != null) {
bossPipeline.addLast("userHandler", parentHandler);
}
Channel channel = getFactory().newChannel(bossPipeline);
这里创建出一个channel,每一个channel都是由一个tcp四元组组成。channel由ChannelFactory创建而成。在创建完NioServerSocketChannel后,会调用
fireChannelOpen(this);这是发出一个ChannelState.OPEN事件,前面注册的BinderHandler会处理这个事件。我们来看看Binder的代码
@Override
public void channelOpen(
ChannelHandlerContext ctx,
ChannelStateEvent evt) {
try {
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
// Split options into two categories: parent and child.
Map<String, Object> allOptions = getOptions();
Map<String, Object> parentOptions = new HashMap<String, Object>();
for (Entry<String, Object> e: allOptions.entrySet()) {
if (e.getKey().startsWith("child.")) {
childOptions.put(
e.getKey().substring(6),
e.getValue());
} else if (!e.getKey().equals("pipelineFactory")) {
parentOptions.put(e.getKey(), e.getValue());
}
}
// Apply parent options.
evt.getChannel().getConfig().setOptions(parentOptions);
} finally {
ctx.sendUpstream(evt);
}
boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); //这里发出bind事件,return Channels.bind(this, localAddress)
assert finished;
}
bind就触发了一个DownStream的ChannelState.BOUND事件。表明需要将该Channel绑定至指定的地址。
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
try {
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}
sendDownstream(tail, e);
}
接着就要看NioServerSocketPipelineSink了,这个主要关注于具体传输数据的处理,同时也包括其他方面的内容,比如异常处理等等。执行eventSunk方法。
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
handleAcceptedSocket(e);
}
}
nio方式ChannelSink一般会有1个boss实例(implements Runnable),以及若干个worker实例(不设置默认为cpu cores*2),它将channel分为 ServerSocketChannel和SocketChannel分开处理。这主要原因是boss线程accept()一个新的连接生成一个 SocketChannel交给worker进行数据接收。
看下ServerSocketChannel的处理
private void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}
ChannelStateEvent event = (ChannelStateEvent) e;
NioServerSocketChannel channel =
(NioServerSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
close(channel, future);
}
break;
}
}
主要是处理bind事件,
private void bind(
NioServerSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound = false;
boolean bossStarted = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
//取出一个boss线程,然后交给Boss类去处理。
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(
bossExecutor,
new ThreadRenamingRunnable(
new Boss(channel),
"New I/O server boss #" + id + " (" + channel + ')'));
bossStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!bossStarted && bound) {
close(channel, future);
}
}
}
看下Boss类,它实现了Runnable接口
private final Selector selector;
private final NioServerSocketChannel channel;
Boss(NioServerSocketChannel channel) throws IOException {
this.channel = channel;
selector = Selector.open();
boolean registered = false;
try {
channel.socket.register(selector, SelectionKey.OP_ACCEPT);
registered = true;
} finally {
if (!registered) {
closeSelector();
}
}
channel.selector = selector;
代码是不是有点熟悉,没错,是nio里的代码,需要注意的是,ServerSocketChannel只注册OP_ACCEPT事件。
再看下Boss类的run方法
public void run() {
final Thread currentThread = Thread.currentThread();
channel.shutdownLock.lock();
try {
for (;;) {
try {
if (selector.select(1000) > 0) {
selector.selectedKeys().clear();
}
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket != null) {
registerAcceptedChannel(acceptedSocket, currentThread);
}
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedSelectorException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedChannelException e) {
// Closed as requested.
break;
} catch (Throwable e) {
logger.warn(
"Failed to accept a connection.", e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
} finally {
channel.shutdownLock.unlock();
closeSelector();
}
}
这里会调用registerAcceptedChannel(acceptedSocket, currentThread);方法
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
try {
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = nextWorker(); //获取一个NioWorker
//将Channel注册到NioWorker上去
worker.register(new NioAcceptedSocketChannel(
channel.getFactory(), pipeline, channel,
NioServerSocketPipelineSink.this, acceptedSocket,
worker, currentThread), null);
} catch (Exception e) {
logger.warn(
"Failed to initialize an accepted socket.", e);
try {
acceptedSocket.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially accepted socket.",
e2);
}
}
}
当有新的连接建立,会交给NioWorker的线程池去处理,boss只负责accept到新的连接,新的SocketChannel会被注册到一个work线程中去。
分享到:
相关推荐
读书笔记:《Netty实战》源代码——Scala版
通过netty编写文件传输的客户端与服务端,以及协议说明, 通用的netty传输协议 通过该协议进行文件传输 文件传输客户端与服务端 可以根据文件的最后更新时间来增量传输文件 源码开放,通过eclipse或者idea导入代码...
在"netty框架最简单的客户端服务端代码"案例中,我们将探讨如何使用Netty实现一个基本的TCP通信。 首先,让我们来看服务端(SocketServerTest)的实现。在Netty中,服务端启动通常涉及到以下几个关键组件: 1. **...
在本篇关于“Netty框架学习——第一个Netty应用”的文章中,我们将深入理解如何使用Netty构建一个简单的Echo服务器和客户端。Netty是一个高性能、异步事件驱动的网络应用程序框架,广泛应用于Java领域的服务器开发。...
netty-mqtt是一个基于Java开发的MQTT 3.1.1协议服务端与客户端,包含113个文件,其中包括87个Java源文件、8个XML文件、7个Iml文件、3个YAML文件、3个JKS文件、2个Factories文件、1个LICENSE文件和1个Markdown文件。...
Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码;Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码;Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码
例如,我们可以通过以下代码创建一个简单的Netty服务器: ```java EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ...
为了实现通信,服务端和客户端都需要定义自己的`model对象`,这些对象通常包含了数据传输的结构和协议解析的相关逻辑。在本示例中,"model对象目录必须一致"意味着服务端和客户端需要使用相同的类来序列化和反序列化...
netty案例,netty4.1中级拓展篇九《Netty集群部署实现跨服务端通信的落地方案》源码 ...
1. 数据编码与解码:Netty提供了多种编解码器,如LineBasedFrameDecoder和LengthFieldBasedFrameDecoder,可以根据协议需求选择合适的解码器。 2. 异常处理:确保有合适的ExceptionHandler来捕获并处理可能出现的...
标题中的“Netty实现Java服务端和C#客户端联通”是指使用Netty作为Java服务器框架,与C#客户端(使用DotNetty库)进行通信的一种技术实现。这涉及到跨平台的网络通信,以及两个不同编程语言间的交互。 Netty是Java...
基于 Netty 开发的 Java 游戏服务端框架,目前提供 CocosCreator 和 Unity 的客户端SDK.zip 基于 Netty 开发的 Java 游戏服务端框架,目前提供 CocosCreator 和 Unity 的客户端SDK.zip 基于 Netty 开发的 Java 游戏...
在本文中,我们将深入探讨如何使用Netty进行RTSP(Real Time Streaming Protocol)服务端的开发,以及如何处理H264、H265和AAC格式的流媒体文件。 1. RTSP简介: RTSP是一种应用层协议,主要用于控制实时流传输,如...
本实践将详细介绍如何在Android环境中使用Netty进行客户端和服务端的通信。 首先,我们需要理解Netty的基本概念。Netty的核心是其EventLoopGroup(事件循环组),它负责处理I/O事件,并将它们分发到相应的...
springboot整合netty,分客户端和服务端两个项目,springboot整合netty,分客户端和服务端两个项目,springboot整合netty,分客户端和服务端两个项目,springboot整合netty,分客户端和服务端两个项目
客户端发送16进制给服务端,并行实现socket通道活动状态和断开重新连接的功能, 监听接口是否存在数据,如果存在socket客户端发送给socket服务端的实现 随着物联网的发展,随之出现了各种传感器监测数据的实时发送,...
在服务端,通常需要添加解码器(如 ByteToMessageDecoder)来解析接收到的字节流,以及处理器(如 SimpleChannelInboundHandler)来处理解析后的消息。 3. 绑定端口:通过调用 bind() 方法将 ServerBootstrap 对象与...
这个“netty3.2源代码”包含了 Netty 框架的3.2版本的源码,让我们有机会深入理解其内部机制。 Netty 的核心特性在于它的异步事件驱动模型。这种模型允许程序处理多个连接同时进行,而不是像传统的同步模型那样,每...
然后你就可以仔细JavaFX代码和Netty的代码,很简单的呢。声明一下,本人使用Mina开发游戏服务器,没有打算使用Netty开发应用。制作这个示例只是为给别人帮个忙而已,然后就可以带你入门,最后你自己飞^_^
Netty在IDEA中搭建HelloWorld服务端并对Netty执行流程与重要组件进行介绍示例代码;Netty在IDEA中搭建HelloWorld服务端并对Netty执行流程与重要组件进行介绍示例代码