以netty提供的echo server作为分析入口,echoServer代码:
public void run() {
// 构造NioServerSocketChannelFactory,初始化bootstrap
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// 创建一个自定义的PipelineFactory
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new EchoServerHandler());
}
});
// 开始bind端口,启动server
bootstrap.bind(new InetSocketAddress(port));
}
Server启动过程:
一.ServerBootstrap配置,NioServerSocketChannelFactory初始化
1. 构造NioWorkerPool,启动worker线程,默认个数CPU*2
public NioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount));
}
2.创建worker数组
workers = new AbstractNioWorker[workerCount];
for (int i = 0; i < workers.length; i++) {
workers[i] = createWorker(workerExecutor);
}
this.workerExecutor = workerExecutor;
protected NioWorker createWorker(Executor executor) {
return new NioWorker(executor);
}
3.打开选择器
AbstractNioWorker(Executor executor) {
this.executor = executor;
openSelector();
}
4. 启动worker线程
selector = Selector.open();
....
DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id));
...
5.初始化NioServerSocketPipelineSink,将workerPool注册到PipelineSink中
sink = new NioServerSocketPipelineSink(workerPool);
二. bind过程
1.构造叫binder的upstreamHandler,用来捕获bind事件
ChannelHandler binder = new Binder(localAddress, futureQueue);
2.构造一个默认的ChannelPipeline作为主的ChannelPipeline
ChannelPipeline bossPipeline = pipeline();
3.将binder注册上pipeline
bossPipeline.addLast("binder", binder);
4.使用之前构造的NioServerSocketChannelFactory创建主Channel
Channel channel = getFactory().newChannel(bossPipeline);
a.在父类AbstractChannel中关联pipeline和channel
pipeline.attach(this, sink);
b.打开一个ServerSocketChannel
socket = ServerSocketChannel.open();
c.使用非阻塞模式
socket.configureBlocking(false);
d.触发channelOpen事件
fireChannelOpen(this);
e.在父channle中的pipeline链中执行,这里只有一个handler就是之前的binder
channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.OPEN, Boolean.TRUE));
f.binder处理channelOpen事件
1.将之前在ServerBootstrap中注册的ChannelFactory和主Channel关联
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
2.按parent和child对设置的参数进行归类,将parent的参数和主channel关联
evt.getChannel().getConfig().setOptions(parentOptions);
3.对之前打开的ServerSocketChannel执行bind
evt.getChannel().bind(localAddress)
4.在工具类Channels中执行bind,其实是发送一个bind的downStream事件
channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
channel, future, ChannelState.BOUND, localAddress));
5.由于主channel的pipeline只有一个binder的upstreamHandler,所以这个bind事件直接就到Pipeline最下面的Sink中处理了,这里是NioServerSocketPipelineSink
6.NioServerSocketPipelineSink处理bind事件
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
handleAcceptedSocket(e);
}
}
7.这里起的是服务端,所以channel是一个NioServerSocketChannel
8.处理BOUND事件
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
close(channel, future);
}
break;
9.拿到channel的socket执行bind
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
10.bind成功后,触发BOUND事件
channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.BOUND, localAddress));
11.binder处理BOUND这个upstream事件,这里bind没有处理,因为没有扩展接口
12.启动BOSS线程,用来接受新的请求
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(bossExecutor,
new ThreadRenamingRunnable(new Boss(channel),
"New I/O server boss #" + id + " (" + channel + ')'));
13.Boss构造中,创建一个selector,并将之前已经bind的channel注册上去,注册的key是accept
selector = Selector.open();
...
channel.socket.register(selector, SelectionKey.OP_ACCEPT);
g.由于主channel的pipeline只有一个handler,所以事件处理在binder这里结束了
h.向阻塞队列发成功信号
boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
5.主线程从阻塞队列中获取信号,如果bind不成功,则抛出异常
future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
6.bind结束,server启动成功,此时Boss和worker线程都已成功启动,可以对外服务了。
分享到:
相关推荐
《深入浅出Netty》是针对Java网络编程框架Netty的一份详细讲解材料,由淘宝工程师精心制作,特别适合初学者。Netty是一款高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。...
资源名称:深入浅出Netty资源截图: 资源太大,传百度网盘了,链接在附件中,有需要的同学自取。
《深入浅出Netty》是一本专注于讲解Netty框架的编程指南,非常适合初学者入门。Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这本书通过详实的代码案例,...
《深入浅出Netty_netty5.0_》是针对Netty 5.0版本的一本详细教程,旨在帮助读者理解并熟练运用这一强大的网络编程框架。Netty是一个开源的Java框架,它为开发高效、稳定、可扩展的网络应用提供了全面的支持。在本文...
最近几年,Netty社区的发展如火如荼,无论是大数据领域,还是微服务架构,底层都需要一个高效的分布式通信框架作为基础组件。 Netty凭借优异的性能、灵活的可扩展新得到了广泛的应用。短短几年间,Netty已经成为...
这个“深入浅出Netty文档”压缩包包含了一份详细的PDF文档,旨在帮助开发者深入理解Netty的工作原理及其在实际应用中的使用。 Netty 的核心是它的事件驱动模型,基于Java NIO(非阻塞I/O)实现。它提供了一种优雅的...
《深入浅出Netty》是针对Java网络编程框架Netty的一份详细讲解文档,由阿里巴巴的专家编写。Netty是一个高性能、异步事件驱动的网络应用程序框架,它为开发可维护的高性能协议服务器和客户端提供了丰富的组件和API。...
Netty是一款高性能、异步事件驱动的网络应用框架,它被设计用于简化网络编程,特别是对于TCP和UDP协议的服务端和客户端的开发。Netty通过利用Java的NIO类库,提供了高效的网络处理能力,并且具有很高的可定制性和...
李林锋,2007 年毕业...软件的设计和开发工作,有7 年NIO 设计和开发经验,精通Netty、Mina 等 NIO 框架和平台中间件,现任华为软件平台开放实验室架构师,《Netty 权威 指南》、《分布式服务框架原理与实践》作者。
在本文中,我们将深入探讨如何使用 Netty 实现 WebSocket 服务器,以及与 Java 客户端进行交互的过程。 WebSocket 协议是一种在浏览器和服务器之间建立长连接的协议,它允许双向通信,即服务器可以主动向客户端推送...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,专为 Java 平台...《深入浅出Netty.pdf》这本书很可能会详细讲解这些知识点,包括理论基础、实践示例以及高级特性的使用,对于理解和掌握 Netty 有着极大的帮助。
总的来说,这个项目提供了从零开始构建一个RTMP服务器的参考,可以帮助开发者深入理解RTMP协议以及如何利用Netty构建高性能的网络服务。通过学习和研究这个项目,开发者可以掌握网络编程、实时流媒体传输以及服务器...