`
irfen
  • 浏览: 204869 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

使用Netty传输大文件内容

阅读更多

最近又开始需要使用netty进行网络通信方面的编程开发了。于是遇到了一些问题通过查找好多资料记录下来。

 

做的内容大致是:客户端向服务端发送一条命令,服务端接收到之后,根据命令里面的一些信息去读取服务器上的一些文件并把文件内容(文件的内容类似于数据库中的一行一行的数据,是以行存储的,每个字段值以\t分割,每条数据为一行)发送给客户端处理(我这里的样例暂以获取数据之后按行保存入文件中)。

 

1、客户端服务端的代码

cmdLog = getSearchCmd();
        ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());
        ClientBootstrap bootstrap = new ClientBootstrap(factory);
        final ClientBufferHandler clientHandler =  new ClientBufferHandler(cmdLog, getEncoding());
        final DelimiterBasedFrameDecoder clientDecoder = new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, false, true, ChannelBuffers.copiedBuffer("\r\n", Charset.defaultCharset()));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(clientDecoder, clientHandler);
            }
        });
        
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(serverHost, serverPort));
        future.awaitUninterruptibly();
        if (!future.isSuccess())
        {
            future.getCause().printStackTrace();
        }
        future.getChannel().getCloseFuture().awaitUninterruptibly();
        factory.releaseExternalResources();

 cmdLog是客户端要发送的命令,getEncoding()是因为每个服务端要读取的文件可能是不同的编码,客户端这边传过去之后通过这个来编码。

有两个handler,下面会介绍,其他的都是很常规的这里就不多说了。

 

2、服务端代码

ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool()
                ,Executors.newCachedThreadPool());
        ServerBootstrap bootstrap = new ServerBootstrap(factory);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory()
        {
            public ChannelPipeline getPipeline()
            {
                ChannelPipeline pipeline = Channels.pipeline(
                        new ServerDecoderHandler(),
                        new FileSearchHandler());
                return pipeline;
            }
        });
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.bind(new InetSocketAddress(8027));

 服务端的代码也很简单,后面会详细介绍handler

 

3、先看客户端的handler,一个是ClientBufferHandler,这个是用来发送命令并接收服务端响应handler

ClientBufferHandler extends SimpleChannelHandler

 

private static final String testPath = "F:/ test/test";
    
private String cmd;
private String encoding;
public ClientBufferHandler(String cmd, String encoding)
    {
        this.cmd = cmd;
        this.encoding = encoding;
    }

@Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
    {
        int cmdLength = cmd.getBytes().length;
        ChannelBuffer cmdBuffer = ChannelBuffers.buffer(cmdLength+4);
        cmdBuffer.writeInt(cmdLength);
        cmdBuffer.writeBytes(cmd.getBytes());
        e.getChannel().write(cmdBuffer);
    }
@Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
    {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
        if(!buf.readable())
        {
            return;
        }
        
        FileHelper.writeFile(testPath, buf.toString(Charset.forName(encoding)));

 重写channelConnected方法,发送命令。我这里在发送命令前面跟了四个字节的命令长度,保证服务端一次接收到所有的命令信息。

 

重写messageReceived方法,接收服务端获取的信息存入文件中。FileHelper.writeFile是把字符串追加写入文件的工具方法我就不放出了,实现方法还是很多的。

 

在这个之前还有一个解码器,因为服务端发送过来的数据都是按行发送的(每行结尾是\r\n),所以使用netty提供的一个解码器DelimiterBasedFrameDecoder实现按分隔符分割接收到的数据,构造方法见客户端的代码。保证获取的数据每次都是完整的一行。这里感谢一下http://blog.163.com/linfenliang@126/blog/static/12785719520121082103807/提供的netty的分包、组包、粘包处理机制。

 

4、然后是服务端的handler

 

首先是ServerDecoderHandler解码器,保证能够读取完整的命令并把命令前的四个字节用来标识命令长度的内容丢掉。

public class ServerDecoderHandler extends FrameDecoder
{
@Override
    protected Object decode(ChannelHandlerContext ctx, Channel c, ChannelBuffer buf) throws Exception
    {
        int length = 4;
        if(buf.readableBytes() < length)
        {
            return null;
        }
        byte[] header = new byte[length];
        buf.markReaderIndex();
        buf.readBytes(header);
        int cmdLength = (header[0] & 0xFF) << 24 | (header[1] & 0xFF) << 16 | (header[2] & 0xFF) << 8 | (header[3] & 0xFF);
        if (cmdLength != 0)
        {
            if (buf.readableBytes() < cmdLength)
            {
                buf.resetReaderIndex();
                return null;
            }
            length += cmdLength;
        }
        buf.resetReaderIndex();
        buf.readerIndex(4);
        return buf.readBytes(cmdLength);
    }
}

 这部分代码内容比较简单就不多做说明了。

 

然后是FileSearchHandler的代码。

public class CdrFileSearchHandler extends SimpleChannelUpstreamHandler
{
@Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
    {
        ChannelBuffer buf = (ChannelBuffer) e.getMessage();
        String cmd = buf.toString(Charset.defaultCharset());
        logger.info("查询命令:\r\n" + cmd);
        
        // 返回文件内容
        Channel ch = e.getChannel();
ChannelFuture f = null;
        final BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(getPath(cmd))));
        String line = "";
        while (line != null)
        {
            line = reader.readLine();
            if (line != null)
            {
                ChannelBuffer returnBuf = ChannelBuffers.dynamicBuffer();
                returnBuf.writeBytes((line + "\r\n").getBytes());
                f = ch.write(returnBuf);
            }
        }
        if(line == null)
        {
            f.addListener(new ChannelFutureListener()
            {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception
                {
                    reader.close();
                    Channel ch = future.getChannel();
                    ch.close();
                }
            });
        }
    }

这部分代码其实也很简单,就是获取到命令,根据命令选择文件(这部分代码省略了),按行读取文件,然后加上\r\n然后写入发送。当line不为null时则读取到了数据,如果为null则说明没有读取到数据,跳出循环,并且添加监听器,当发送完关闭各种链接。(之所以会这样判断是因为netty本身是多次调用messageReceived的,需要在发送完最后一条数据的时候关闭连接。)

 

这种内容发送的方式只适用与文件内容比较小可以,但是youyu 一般的handler都是同步执行的,一旦文件内容很大,就会因为文件读取耗时较长导致Worker线程不能及时返回处理其它请求,对性能影响较高,从而导致内存溢出等问题。

 

这个时候就需要使用netty提供的一个handlerExecutionHandlerExecutionHandler就是为这种情况设计了,它提供了一种异步处理任务的机制,将它之后handler处理以任务的形式投递到线程池中并直接返回。ExecutionHandler不像其它Handler都是独立的,它是所有Handler共享使用。其使用OrderedMemoryAwareThreadPoolExecutor线程池来保证同一个Channel上事件的先后顺序。

所以在服务端的代码处需要修改代码如下:

bootstrap.setPipelineFactory(new ChannelPipelineFactory()
        {
            public ChannelPipeline getPipeline()
            {
                ChannelPipeline pipeline = Channels.pipeline(
                        new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)),
                        new ServerDecoderHandler(),
                        new CdrFileSearchHandler());
                return pipeline;
            }
        });

 增加一个ExecutionHandlerhandler,即可处理。

 

 

注意ExecutionHandler一定要在不同的pipeline 之间共享。它的作用是自动从ExecutionHandler自己管理的一个线程池中拿出一个线程来处理排在它后面的业务逻辑handler。而 worker线程在经过ExecutionHandler后就结束了,它会被ChannelFactory的worker线程池所回收。

它的构造方法是ExecutionHandler(Executor executor) ,很显然executor就是ExecutionHandler内部管理的线程池了。netty额外给我们提供了两种线程池:
MemoryAwareThreadPoolExecutor和OrderedMemoryAwareThreadPoolExecutor,它们都在org.jboss.netty.handler.execution 包下。
MemoryAwareThreadPoolExecutor 确保jvm不会因为过多的线程而导致内存溢出错误,OrderedMemoryAwareThreadPoolExecutor是前一个线程池的子类,除 了保证没有内存溢出之外,还可以保证channel event的处理次序。具体可以查看API文档,上面有详细说明。

 

总结一下:写这些东西用了一天的时间,其实多数时候都是在测试大文件的问题,网上好多源码分析的文章,都很浅,真正说每种buffer怎么用,每种handler的用法的很少。而详细讲源码的文章多数也都是抄来抄去,但是也还是有好多不错的。所以遇到什么问题多找找资料,在自己探索一下应该都可以解决的。其实api上面的东西还是不少的,有时间研究的话应该好好看看netty的源码有助于更好的使用netty开发

 

最后说下这里使用的版本是3.6的。

 

 

0
0
分享到:
评论
2 楼 irfen 2013-12-05  
cwy5310 写道
你好  我在项目中使用了这个框架  遇到了编码问题  我希望输出编码方式为GBK 不知道如何设置  QQ:358597989  希望能解答一下

你在传输的时候需要以GBK编码变为字节,在解析的时候以GBK编码解析就不会出现问题。
1 楼 cwy5310 2013-11-26  
你好  我在项目中使用了这个框架  遇到了编码问题  我希望输出编码方式为GBK 不知道如何设置  QQ:358597989  希望能解答一下

相关推荐

    用netty实现文件传输

    4. **Netty 与大文件传输** - **零拷贝**:Netty 利用 NIO 的 FileChannel.transferTo 方法,实现了从文件到网络的直接传输,减少了数据拷贝的次数,提高了效率。 - **异步处理**:Netty 的非阻塞 I/O 允许在等待...

    Netty5多文件、大文件上传源码

    综上所述,"Netty5多文件、大文件上传源码"涉及到的技术点包括Netty框架的使用、多文件上传的实现、大文件分块传输、长连接管理、心跳机制、错误处理与重试策略、性能优化以及安全性措施。这些知识点共同构建了一个...

    netty文件传输服务端与客户端以及协议说明

    通用的netty传输协议 通过该协议进行文件传输 文件传输客户端与服务端 可以根据文件的最后更新时间来增量传输文件 源码开放,通过eclipse或者idea导入代码即可运行 协议开放,协议是自定义的协议,大家可以根据需求...

    采用netty与protobuf进行文件传输

    本话题聚焦于使用Netty和Protocol Buffers(Protobuf)来实现文件传输,这是一种现代且高性能的技术组合,尤其适用于大规模分布式系统。 Netty是一个开源的异步事件驱动的网络应用程序框架,用于快速开发可维护的高...

    基于Netty实现的文件上传

    客户端则通过构建一个HttpClientHandler,利用Netty的HttpObjectAggregator聚合多个HttpChunks以获取完整的文件内容。 Socket通信是网络编程的基础,Netty其实就是在Socket之上构建的一层抽象。在"基于socket通信的...

    Netty 文件上传获取进度条

    在处理文件上传时,为了提供良好的用户体验,通常需要实现文件上传的进度条功能,让用户能够实时了解文件传输的状态。在传统的HTTP协议中,这通常依赖于JavaScript的定时轮询来实现,但这种方式效率较低且消耗资源。...

    实例:如何使用Netty下载文件

    文件下载是网络通信中的常见场景,Netty通过其强大的非阻塞I/O模型,可以高效地处理大文件传输。首先,我们需要理解Netty的基本组件,包括Bootstrap(启动器)、ServerBootstrap(服务器启动器)、Channel(通道)、...

    netty file server 文件服务

    可以使用 FileRegion 类来高效地传输大文件,它允许直接将文件通道映射到网络通道,避免了额外的内存拷贝。在 Handler 中,根据请求的文件路径,创建一个 FileInputStream 并转换为 FileRegion,然后调用 Channel 的...

    NETTY 分布式文件传输

    标题中的“NETTY 分布式文件传输”是指利用Netty框架构建一个能够在多个节点间进行文件传输的服务。Netty是一个高性能、异步事件驱动的网络应用程序框架,它为开发自定义网络协议或高并发的客户端-服务器应用提供了...

    使用Netty搭建WebSocket服务器,可修改单包大小限制

    在开发WebSocket服务时,我们可能会遇到数据包大小的限制问题,这会影响到大文件或者大量数据的传输。Netty是一个高性能、异步事件驱动的网络应用程序框架,它非常适合用来构建WebSocket服务器。 Netty提供了...

    Netty3多文件上传源码

    4. **流式传输**:为了处理大文件,Netty支持流式传输,即不一次性加载整个文件到内存,而是按块读取和写入,减少内存压力。在Netty中,可以通过FileRegion对象实现。 5. **进度反馈**:在文件上传过程中,客户端...

    使用netty使用http协议开发文件服务器

    对于GET请求,我们通常会检查请求的目标URL,找到对应的本地文件,然后读取文件内容作为响应体。注意,我们需要正确设置响应的状态码(如200表示成功,404表示未找到),以及响应头(如Content-Type指示文件类型,...

    Netty实现大文件分块传输详解.pdf

    本文将详细介绍如何使用Netty来实现大文件的分块传输。 #### 知识点解析 ##### 1. Netty框架简介 Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器与客户端。其核心功能包括...

    使用netty5进行udp网络通讯

    这个小程序使用netty5进行udp网络通讯,客户端有两种,1:用netty5类库发送DatagramPacket和接收 2:直接使用DatagramSocket发送接收DatagramPacket 先运行netty_server的QuoteOfTheMomentServer, 在运行netty_...

    netty4.0文件分片上传+断点续传+权限校验

    文件分片上传是为了处理大文件传输时,避免一次性加载整个文件到内存中,从而减少内存消耗和提高传输效率。在Netty中,我们可以使用ChannelHandlerContext的writeAndFlush方法将文件流分割成多个数据块(chunk),...

    使用netty进行rtsp服务端开发.zip

    在本文中,我们将深入探讨如何使用Netty进行RTSP(Real Time Streaming Protocol)服务端的开发,以及如何处理H264、H265和AAC格式的流媒体文件。 1. RTSP简介: RTSP是一种应用层协议,主要用于控制实时流传输,如...

    Netty5.0TCP/IP上传大文件

    Netty 是一个高性能、...总结起来,Netty5.0提供的TCP/IP上传大文件功能依赖于其强大的ByteBuf管理、拆包与合包机制、以及高效的文件传输技术。通过理解和运用这些知识点,开发者能够构建出稳定且高效的文件上传系统。

    netty案例,netty4.1中级拓展篇四《Netty传输文件、分片发送、断点续传》源码

    netty案例,netty4.1中级拓展篇四《Netty传输文件、分片发送、断点续传》源码 ...

    厉害了,Netty 轻松实现文件上传!(csdn)————程序.pdf

    在本文中,我们将探讨如何使用 Netty 实现文件上传功能,同时了解 Netty 的"零拷贝"技术。 首先,Netty 的"零拷贝"特性是其性能优化的关键。在传统的文件传输中,数据在内存中可能需要多次拷贝,例如从堆内存到直接...

    netty文件传输

    总的来说,Netty 的文件传输功能强大且易于使用,结合 Java NIO 的优势,使得它成为开发高性能网络应用的理想选择。通过理解并实践 Netty 的文件传输机制,开发者可以更好地利用 Netty 构建出满足业务需求的应用。...

Global site tag (gtag.js) - Google Analytics