`
flychao88
  • 浏览: 753340 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【转】使用Netty实现多路复用的client

 
阅读更多

       Netty只提供的异步传输数据的方式,但是并没有实现多路复用的client。

一个分布式的客户端代码基本是这个样子的:

1
2
3
4
public Response sent(final Request request) {
    channel.writeAndFlush(request);    
    return clientChannelInitializer.getResponse(request.getMessageId());  
}

 

首先通过channel发送一个请求到server,然后等待server返回的结果。

但是Netty是异步的,writeAndFlush这个方法,只是告诉server,我要发数据了,然后就马上返回了。所以这时直接调用getResponse,会得不到值。因为netty是在回调里面写返回值的。

解决的办法是,使用BlockingQueue接收返回的数据。一旦BlockingQueue有数据了,就take出来。如果没数据,就一直等待。

 

在单线程里,这个办法没问题。这就要确保Netty的client只被一个线程访问。如果是多线程同时访问,因为异步的原因,有可能第二个线程的返回值被第一个线程拿到。举个例子:

 

  1. 1、线程A使用client发送请求

  2. 线程A从BlockingQueue中取数据。这时Queue为空,线程A等待。

  3. server接受并开启一个线程处理请求A

  4. 2、线程B使用client发送请求

  5. 线程B从BlockingQueue中取数据。这时Queue为空,线程B也等待。

  6. 3、server接受并开启另一个线程处理请求B

  7. 线程B的请求处理速度较快,先返回

  8. 4、client将返回值B写入BlockingQueue

  9. BlockQueue有数据了,线程A take到数据,但是是线程B的结果

这就导致request和response不匹配。

 

要解决这个问题有3个办法。

  1. 1、使用短连接,每次请求new一个client,接收的response后close掉这个client。这个的缺点很明显,这就相当于http服务器了,不能发挥内网长连接的优势。

  2. 2、使用连接池,每次从连接池里拿一个client,接收完response后将这个client返还连接池。这个就有点像数据库连接池。这个方法没什么缺点,但是需要自己实现连接池。

  3. 3、使用单一client,但是在request中生成一个唯一的messageId,可以是nanoTime。然后server处理完后,在response中也返回这个messageId。这样在client不是维护一个BlockingQueue,而是维护一个ConcurrentHashMap,key是messageId,value是一个空的BlockingQueue。当client发送resquest到server时,在Map里写入messageId,并实例化一个BlockingQueue(为了优化,size可以是1)。然后等待这个BlockingQueue有值。在接收到response的回调方法里,根据messageId取出blockingQueue的值,然后删除掉这个Key。

  4.  

    我自己实现了第三种方式,具体代码请参见:

    https://github.com/terrymanu/miracle-framework/tree/master/miracle-framework-remote/miracle-framework-remote-netty 

 

分享到:
评论

相关推荐

    基于Netty实现了dubbo rpc

    Netty基于IO多路复用模型,如Java的NIO(Non-blocking IO)或epoll,通过selector监听多个通道,实现高效的并发处理。其核心组件包括Bootstrap(引导类)、Channel(通道)、Pipeline(处理链)等,这些组件协同工作...

    netty开发工具包

    6. **线程模型**:Netty 使用多路复用器(Selector)和事件循环组(EventLoopGroup)进行线程管理,使得系统能处理大量并发连接。 7. **协议支持**:Netty 内置了对多种常见网络协议的支持,如HTTP、WebSocket、FTP...

    Netty4.0学习笔记系列之一:Server与Client的通讯

    Netty基于Java NIO库构建,它提供了更高级别的API,简化了多路复用、异步I/O操作,使得开发者可以更容易地处理高并发场景。 在Netty中,Server的启动通常涉及创建一个`ServerBootstrap`实例。`ServerBootstrap`是一...

    netty框架,服务端、客户端代码示例

    线程模型采用“事件循环(EventLoop)”和“多路复用器(Selector)”,实现了高效的I/O处理。通道则是连接到特定I/O资源的对象,如套接字或者管道。Netty通过自定义的ByteBuf类来处理数据缓冲,提供了一种高效且...

    Netty高性能网络应用框架.rar

    客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求就进行处理。 AIO : Asynchronous IO,即异步非阻塞,采用了 Proactor 模式,特点是先由操作系统完成后才通知服务端程序启动线程去处理...

    NettyChat-master.zip

    - **多路复用**:Netty的NIO或Epoll模式可以支持多路复用,允许单个线程处理多个连接,提高服务器性能。 - **线程模型**:Netty提供了不同的线程模型,如SingleThreadEventLoop、MultithreadEventLoopGroup等,选择...

    async-http-client-2.0.13.zip

    库支持HTTP/1.1和HTTP/2协议,这意味着开发者可以利用HTTP/2的多路复用、头部压缩等特性,进一步提升网络请求的效率和响应速度。 3. **丰富的API接口**: 提供了全面且直观的API,使得开发者可以方便地创建GET、...

    一款Android平台UDP双向通信源码

    在Android客户端中,Selector帮助优化了多路复用,减少了资源消耗。 5. **Event Loop**:Netty4中的事件循环,负责处理I/O事件并调度任务。在Android应用中,这有助于实现非阻塞I/O,提高响应速度。 6. **Protocol...

    异步 redis client.rar

    Redis是单线程的吗? 其实这么说不完全正确,我们知道Redis是一个Key-Value的非关系型数据库,我们所理解的Redis单...这里就要扯到NIO多路复用模型了,由于本篇主要是Redis的学习记录,这里等Netty的时候再详细学习。

    InChat-master.zip

    Netty的核心是其高效的设计,包括零拷贝、多路复用I/O和高度可定制的缓冲区,这些特性使其在处理高并发场景时表现出色。 WebSocket是一种在单个TCP连接上进行全双工通信的协议,它提供了浏览器和服务器之间持久化的...

    0.2-chat-epoll.zip

    epoll是Linux内核提供的一种高效I/O事件通知机制,尤其适合多路复用的高并发场景,如聊天应用。这个项目不仅实现了基本的聊天功能,还支持文件的收发,这意味着它涉及到网络编程中的文件传输部分。 描述中提到,这...

    即时通信系统(Java实现)

    - **Java NIO(Non-blocking I/O)**:提高服务器处理并发连接的能力,通过选择器(Selector)实现多路复用。 - **Netty框架**:高性能、异步的网络应用框架,简化了TCP/IP通信的复杂性。 4. **关键组件** - **...

    data-conversion-client-demo:data-conversion 的客户端 例子

    NIO提供了与BIO不同的I/O工作方式,它支持多路复用,允许多个连接在同一线程中并发处理,从而提高了系统的效率和可扩展性。在"data-conversion-client-demo"中,使用NIO可能是因为项目对性能有一定要求,同时避免了...

    seata源码研究.docx

    - **NIO的核心组件**:Channel(通道)、Buffer(缓冲区)、Selector(多路复用器)。 - **NIO的通信步骤**: 1. 创建ServerSocketChannel并配置为非阻塞模式。 2. 绑定监听端口,配置TCP参数。 3. 创建IO线程...

    NIO trick and trap .pdf

    - **概念**:`java.nio.channels.Selector`接口实现了I/O多路复用,允许一个线程监控多个通道的事件。 - **关键组件**: - **SelectableChannel**:可注册的通道,如`ServerSocketChannel`。 - **SelectionKey**:...

Global site tag (gtag.js) - Google Analytics