前一篇文章分析了netty的服务端流程,接下来分析一下客户端的大致流程,客户端启动代码如下
ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipleline = pipeline();
pipleline.addLast("encode", new ObjectEncoder(1048576 * 16));
pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));
pipleline.addLast("handler", handler);
return pipleline;
}
});
bootstrap.setOption("receiveBufferSize", 1048576 * 64);
bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法
//tcp定期发送心跳包 比如IM里边定期探测对方是否下线
//只有tcp长连接下才有意义
// bootstrap.setOption("child.keepAlive", true);
ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port));
Channel channel = future.awaitUninterruptibly().getChannel();
客户端事件处理顺序如下:
UpStream.ChannelState.OPEN(已经open)–>DownStream.ChannelState.BOUND(需要绑定)——>DownStream.CONNECTED(需要连接)—–>UpStream.ChannelState.BOUND(已经绑定)——->UpStream.CONNECTED(连接成功)
在connect的时候做了如下处理
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
ChannelPipeline pipeline;
try {
pipeline = getPipelineFactory().getPipeline();
} catch (Exception e) {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}
// Set the options.先创建Channel
Channel ch = getFactory().newChannel(pipeline);
ch.getConfig().setOptions(getOptions());
// Bind.
if (localAddress != null) {
ch.bind(localAddress);
}
// Connect. 再进行连接
return ch.connect(remoteAddress);
}
首先要创建出Channel
NioClientSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline,
ChannelSink sink, NioWorker worker) {
super(null, factory, pipeline, sink, newSocket(), worker);
fireChannelOpen(this);
}
紧接着会fire一个ChannelOpen事件,
if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel);
}
channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.OPEN, Boolean.TRUE));
这样会出发Upstream的ChannelState.OPEN事件。
接下来要继续connect了
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
ChannelFuture future = future(channel, true);
channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
channel, future, ChannelState.CONNECTED, remoteAddress));
return future;
这样就会出发Downstream的ChannelState.CONNECTED事件。
接下来就要由NioClientSocketPipelineSink来进行处理了
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.worker.close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
channel.worker.close(channel, future);
}
break;
case CONNECTED:
if (value != null) {
connect(channel, future, (SocketAddress) value);
} else {
channel.worker.close(channel, future);
}
break;
case INTEREST_OPS:
channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
break;
下面看下channel注册到worker的代码,连接的时候是在内部的一个Boss类里处理的
所有的连接connect操作都被封装成一个RegisterTask对象,Boss类持有registerTask队列,在loop中不断的去进行select
private static final class RegisterTask implements Runnable {
private final Boss boss;
private final NioClientSocketChannel channel;
RegisterTask(Boss boss, NioClientSocketChannel channel) {
this.boss = boss;
this.channel = channel;
}
public void run() {
try {
channel.socket.register(
boss.selector, SelectionKey.OP_CONNECT, channel);
} catch (ClosedChannelException e) {
channel.worker.close(channel, succeededFuture(channel));
}
int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
if (connectTimeout > 0) {
channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
}
}
}
register方法
void register(NioClientSocketChannel channel) {
Runnable registerTask = new RegisterTask(this, channel);
Selector selector;
synchronized (startStopLock) {
if (!started) {
// Open a selector if this worker didn't start yet.
try {
this.selector = selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException(
"Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(
bossExecutor,
new ThreadRenamingRunnable(
this, "New I/O client boss #" + id + '-' + subId));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
this.selector = selector = null;
// The method will return to the caller at this point.
}
}
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
RegisterTask,放到Boss类持有的registerTaskQueue之后,Boss类会从boss executer线程池中取出一个线程不断地处理队列、选择准备就绪的键等。
然后run方法处理感兴趣的事件
public void run() {
boolean shutdown = false;
Selector selector = this.selector;
long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
for (;;) {
wakenUp.set(false);
try {
int selectedKeyCount = selector.select(500);
.......
processRegisterTaskQueue();
if (selectedKeyCount > 0) {
processSelectedKeys(selector.selectedKeys());
}
在loop中,processRegisterTaskQueue会处理需要注册的任务,processSelectedKeys处理连接事件
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
if (!k.isValid()) {
close(k);
continue;
}
if (k.isConnectable()) {
connect(k);
}
}
}
将连接上的Channel注册到worker中,交给worker去注册read和write
private void connect(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
try {
if (ch.socket.finishConnect()) {
k.cancel();
ch.worker.register(ch, ch.connectFuture);
}
} catch (Throwable t) {
ch.connectFuture.setFailure(t);
fireExceptionCaught(ch, t);
k.cancel(); // Some JDK implementations run into an infinite loop without this.
ch.worker.close(ch, succeededFuture(ch));
}
}
在这一系列初始化都完成之后,channel就可以拿来write和接收read数据了。
分享到:
相关推荐
在"mingtian-master"这个压缩包中,我们可以找到项目的源代码和相关资源。源码可能包含了以下关键组件: 1. Server模块:实现了Netty服务器端,包括WebSocket服务器处理类,用于处理连接、消息收发和游戏逻辑。 2. ...
在标签中,"开源项目"表明这两个库都是开放源代码的,允许开发者查看、使用、修改并分发代码,促进了技术社区的合作和创新。 压缩包内的文件名"json4s-path-master"通常表示这是json4s-path项目的主分支,可能是...
开发者可以查看源代码,理解其工作原理,并根据需要进行定制和优化。社区的支持也使得项目持续更新,不断修复已知问题,添加新功能,保持与最新技术趋势同步。 总结起来,PlantUML-7977.zip中的http-client-master...
解压后的文件“手机通过wifi控制电脑程序源码.jpg”可能是程序运行示意图,而“trunk”可能是一个项目目录,包含了源代码、资源文件和构建脚本等。 总之,这个源码实例为开发者提供了一个研究和学习的平台,帮助...
在项目结构中,"java-redis-proxy-master"很可能是项目的主分支或者源码包,包含项目的源代码、配置文件、测试用例等相关资源。开发者通常可以通过克隆或下载这个压缩包,然后在本地环境中进行编译和运行,以了解和...
2. **定义DatagramPacketHandler**:Netty中的DatagramChannel接收的是DatagramPacket对象,因此我们需要自定义一个处理类,重写其channelRead方法,处理接收到的DatagramPacket,例如解码视频数据、解析控制命令等...
在压缩包文件名称"jmqtt-master"中,"master"通常指的是项目的主分支或主版本,这表明你可能获得了该项目的最新稳定源代码。如果你打算深入研究或贡献代码,你可以解压这个文件,使用Java IDE导入项目,然后根据项目...
SMPPTester.jar文件是一个Java可执行的JAR(Java Archive)文件,它封装了整个SMPP测试工具的源代码和运行时环境。这种打包方式使得用户能够在任何安装了Java运行环境的平台上运行此测试工具,实现了跨平台的兼容性...
为了深入了解Petri的实现,我们可以分析Petri-master压缩包中的源代码。这个压缩包包含了项目的主分支代码,包括核心服务器组件、模块接口、游戏逻辑实现等。通过阅读代码,开发者可以学习到如何利用Netty处理网络...
《大天使之剑Java源码—— Animated-Spork:天使游戏网络服务器解析》 在IT行业中,开源系统一直扮演着至关重要的角色,它为开发者提供了深入理解系统运作、学习新技术和创新的机会。"大天使之剑Java源码-animated-...
"FullVideo.rar"这个压缩包文件,很可能是包含了完整的JAVA GB28181客户端源代码或者示例项目,对于开发者来说是一份宝贵的资源。通过解压并研究这些文件,我们可以学习如何在JAVA环境下集成GB28181和SIP,实现视频...
1. **Netty框架**:Netty是一个异步、非阻塞的IO框架,适用于构建高性能的网络服务器和客户端。它的特性包括高度可定制的事件驱动架构、丰富的协议支持以及优秀的性能。 2. **网络爬虫基础**:网络爬虫是一种自动化...
由于只给出了项目文件夹名“couchbase-messages-p2p-sample-master”,我们可以推测这是一个Git仓库的主分支,通常包含源代码、配置文件、文档等资源。开发者可以下载并研究这些文件来理解项目的架构和工作原理。 *...
- **YARN调度框架事件分发机制**:解析YARN调度器的工作原理及事件处理流程。 - **Hadoop底层IPC原理和RPC**:探讨Hadoop内部通信机制,包括RPC(远程过程调用)的实现细节。 - **Hadoop底层googleProtoBuf的协议...
源代码目录可能分为几个主要部分,如主程序、指令解析模块、业务逻辑处理模块以及与用户交互的界面部分。这些模块共同协作,使得AlexanderBot能够正确地接收、解析并执行用户的指令。 在开发过程中,开发者可能会...
此外,ly-live-server的源代码对于学习和理解RTMP协议的实现细节非常有价值。开发者可以通过阅读源码来学习如何在Java中实现网络服务器,以及如何处理实时流媒体数据。这有助于提升开发者在网络编程、服务器端架构...
"Internet-Tech--master"这个文件名可能指的是项目源码的主分支或者根目录,里面可能包含了项目的源代码、配置文件、文档等资源。在实际学习或开发过程中,阅读和理解这些源码将有助于深入掌握上述知识点,并提升在...
Alchemy-GameServer的源代码可能包含了以下部分: - **主服务器程序**:启动服务器,设置监听端口,初始化必要的组件和服务。 - **网络模块**:处理客户端连接,解析和封装网络消息。 - **游戏逻辑类**:实现纸牌...