`
yuzhongzi81
  • 浏览: 3557 次
文章分类
社区版块
存档分类
最新评论

java nio socket结合netty传输文件

 
阅读更多

最近才开始研究java nio以及netty方面的知识,作为一个IT菜鸟,对这些东西几乎一无所知,所以有些地方的见解也纯属个人愚见,出现差错也在所难免

基于我的理解,java nio 之间数据的传递主要建立在channel和ByteBuffer之上,将要传递的数据转化成ByteBuffer写入到对应的channel中去,就实现了数据的写入

同理,使用netty实际上是基于java nio的一个框架,只不过对其进行了更好的封装,它通过管道中的Handler接收数据,然后将数据对应转化为对应的Bytebuf,实现数据的读取


以下就是一个文件传输例子的实现

其中客户端参考了网络上的数据重传以及给文件加上首部的一些方法(java nio 文件传输)

客户端代码:NioSocketClient.java

package com.test.nio.netty;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.util.logging.Logger;
 
public class NioSocketClient {
	private final static Logger log = Logger.getLogger(NioSocketClient.class.getName());
	private final static String START = "START";
	private InetSocketAddress inetSocketAddress;
	
    /* 发送数据缓冲区 */
    private static ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
     
    public CopyOfNioSocketClient(int port){
        try{
            inetSocketAddress = new InetSocketAddress("localhost", port);
            init();
        } catch(Exception e){
            e.printStackTrace();
        }
         
    }
    private void init(){
        try {
            SocketChannel socketChannel = SocketChannel.open(inetSocketAddress);
            socketChannel.configureBlocking(false);
            sendFile(socketChannel);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
     
    private void sendFile(SocketChannel client) {
        FileInputStream fis = null;
        FileChannel channel = null;
        try {
            File file = new File("F:\\test\\1.avi");
            fis = new FileInputStream(file);
            channel = fis.getChannel();
            int i = 1;
            int sum = 0;
            int len = 0;
            
            //写文件头部
            String file_name = file.getName();
            Long file_length = file.length();
            writeFileHead(client, START);
            writeFileHead(client, file_name);
            writeFileHead(client, String.valueOf(file_length));
            
            //写文件内容
            while((len = channel.read(sendBuffer)) != -1) {
                sendBuffer.flip(); 
                int send = client.write(sendBuffer);
                
                log.info("已发送文件总字节数" + (sum += len));
                log.info("i发送-->" + (i++) + " len:" + len + " send:" + send);
                // 考虑到服务器缓冲区满的情况
                while(send == 0){
                    Thread.sleep(10);
                    send = client.write(sendBuffer);
                    log.info("i重新-->" + i + " len:" + len + " send:" + send);
                }
                sendBuffer.clear(); 
           }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
                fis.close();
                client.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
 
        }
    }
    
    /**
     * @category 写文件头部
     * @param channel
     * @param data
     * @throws IOException
     */
    public void writeFileHead(SocketChannel channel, String data) throws IOException {
    	byte[] d_content = data.getBytes();
    	byte[] d_len = i2b(d_content.length);
    	
    	sendBuffer.put(d_len);
    	sendBuffer.put(d_content);
    }
    
    private byte[] i2b(int i) {
    	//4个字节
    	return new byte[] {
    			(byte) ((i >> 24) & 0xFF),	
    			(byte) ((i >> 16) & 0xFF),	
    			(byte) ((i >> 8) & 0xFF),	
    			(byte) (i & 0xFF),	
    	};
    }
    
    public static void main(String[] args){
        new NioSocketClient(12345);
    }
    
}

服务器端:NettyServer.java

package com.test.nio.netty;

import java.net.InetSocketAddress;
import java.util.logging.Logger;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
	private static final Logger log = Logger.getLogger(NettyServer.class.getName());
	private InetSocketAddress inetSocketAddress;
	
	public NettyServer(int port) {
		this.inetSocketAddress = new InetSocketAddress(port);
	}
	
	public void start() {
		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup);
			b.channel(NioServerSocketChannel.class);
			b.option(ChannelOption.SO_KEEPALIVE, true);
			b.childHandler(new ChannelInitializer<Channel>() {

				@Override
				protected void initChannel(Channel ch) throws Exception {
					//ch.pipeline().addLast(new StringEncoder());
					ch.pipeline().addLast(new FileReceiveHandler());
					
				}
			});
			
			log.info("server listen on " + inetSocketAddress.getPort() + ".");
			
			ChannelFuture f = b.bind(inetSocketAddress).sync();
			f.channel().closeFuture().sync();
		} catch (Exception e) {
			// TODO: handle exception
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
	
	public static void main(String[] args) {
		NettyServer ns = new NettyServer(12345);
		ns.start();
	}
}

Handler代码:FileReceivedHandler.java

package com.test.nio.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.FileOutputStream;
import java.util.logging.Logger;

public class FileReceiveHandler extends ChannelInboundHandlerAdapter {
	private final static Logger log = Logger.getLogger(FileReceiveHandler.class.getName());
	private static String R_FLAG;	//是否第一次接收数据
	private static String FILE_NAME = "DEFAULT";	//默认文件名
	private static String STATE = "START";	//文件传输标志
	private static Long FILE_LENGTH = 0L;	//文件总长度
	private static Long READ_LENGTH = 0L;	//文件已读长度

	private FileOutputStream fos;

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		writeMessageToClient(ctx, "connect success");
		log.info("--Client in--");
		
		R_FLAG = "FRIST";
		READ_LENGTH = 0L;
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		

		ByteBuf buf = (ByteBuf) msg;
		
		initialFileReceivedState(buf);
		
		if(isStart()) {
			saveFile(ctx, buf);
		} else {
			writeMessageToClient(ctx, "file hasn't been transported yet");
		}
		
		releaseBuf(buf);
		closeContext(ctx);
		
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
	}
	
	/**
	 * @category 向客户端写回文件传输进度
	 * @param ctx
	 * @param transportLength
	 */
	private void writeTransportProcess(ChannelHandlerContext ctx, Long transportLength) {
		long t_length = transportLength.longValue();
		long f_length = FILE_LENGTH.longValue();
		String process = "file transport percent ";
		String percent = String.format("%.1f", (t_length / (f_length * 1.0)) * 100) + "%";
		
		writeMessageToClient(ctx, process + percent);
	}

	/**
	 * @category 向客户端写回信息 
	 * @param ctx
	 * @param msg
	 */
	private void writeMessageToClient(ChannelHandlerContext ctx, String msg) {
		ctx.write(msg);
	}
	
	/**
	 * @category 释放ByteBuf空间
	 * @param buf
	 */
	private void releaseBuf(ByteBuf buf) {
		buf.clear();
		buf.release();
	}
	
	/**
	 * @category 判断文件传输是否开始
	 * @return
	 */
	private boolean isStart() {
		
		boolean flag = false;
		
		if("START".equalsIgnoreCase(STATE)) {
			flag = true;
		}
		
		return flag;
	}
	
	/**
	 * @category 关闭Context
	 * @return
	 */
	private void closeContext(ChannelHandlerContext ctx) {
		
		if(FILE_LENGTH.longValue() == READ_LENGTH.longValue()) {
			ctx.close();
		}
	}
	
	/**
	 * @category 文件存储
	 * @param buf
	 * @throws Exception
	 */
	private void saveFile(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
		
		if (buf.isReadable()) {
			
			READ_LENGTH += buf.readableBytes();
			
			//向客户端写回文件传输进度
			writeTransportProcess(ctx, READ_LENGTH);
			
			byte[] bytes = new byte[buf.readableBytes()];
			buf.readBytes(bytes);
			
			fos.write(bytes);
			fos.flush();
		}
	}

	/**
	 * @category 将字节转化为整数
	 * @param b
	 * @return
	 */
	private int b2i(byte[] b) {
		int value = 0;
		for (int i = 0; i < 4; i++) {
			int shift = (4 - 1 - i) * 8;
			value += (b[i] & 0x000000FF) << shift;
		}
		return value;
	}
	
	/**
	 * @category 读取头部信息
	 * @param in
	 * @return
	 */
	private String readFileHead(ByteBuf in) {
		
		byte[] stateBytes = new byte[4];
		in.readBytes(stateBytes);
		
		int stateLength = b2i(stateBytes);
		
		byte[] sendState = new byte[stateLength];
		in.readBytes(sendState);
		
		return new String(sendState);
	}
	
	/**
	 * @category 初始化从客户端发过来的文件头部信息
	 * @param in
	 * @throws Exception
	 */
	private void initialFileReceivedState(ByteBuf in) throws Exception {
		
		if("FRIST".equalsIgnoreCase(R_FLAG)) {
			
			STATE = readFileHead(in);
			FILE_NAME = readFileHead(in);
			FILE_LENGTH = Long.valueOf(readFileHead(in));
			fos = new FileOutputStream("F:\\" + FILE_NAME);
			
			log.info("server: state->" + STATE);
			log.info("server: fileName->" + FILE_NAME);
			log.info("server: fileLength->" + FILE_LENGTH);
			
			R_FLAG = "OTHER";
		}
	}
}

测试过程中,可能会出现"您的软件中途释放了一个连接"等类似错误,是因为在写入头部数据时出现差错,目前仍然在解决中...


分享到:
评论

相关推荐

    Java NIO Socket基本

    5. **文件系统API**:NIO还提供了`java.nio.file`包,包含一系列与文件系统交互的类,如Files、Paths等。 Java NIO的主要优势在于非阻塞特性。在BIO中,如果一个线程正在读取或写入数据,那么这个线程将被阻塞,...

    Java-NIO-Netty框架学习

    学习Netty,可以从基础的Socket编程开始,然后深入理解Java NIO的基本概念,接着熟悉Netty提供的各种组件和API,通过编写简单的服务端和客户端程序来实践。随着对Netty的理解加深,可以尝试实现更复杂的网络应用,如...

    Socket 之 BIO、NIO、Netty 简单实现

    在Java中,Socket通信涉及三种不同的模型:BIO(Blocking I/O)、NIO(Non-blocking I/O)和Netty,这些都是实现高并发、高性能网络服务的重要手段。 **1. Socket基础** Socket,通常被称为套接字,是网络通信的...

    从NIO到Netty,编程实战出租车905协议-08172347.pdf

    第3章,结合905.4-2014协议的基本内容,动手实现NIO长连接服务端的实现,以及协议内容的设计和实现思路; 第4章,实现长连接客户端,以及采用多线程技术进行系统性能测试; 第5章,介绍Netty相关基础知识,并使用...

    详细介绍 NIO与Netty编程-实战讲义详细pdf.7z

    **NIO(非阻塞I/O)与Netty编程**是现代Java网络应用开发中的重要技术,它们在处理高并发、低延迟的网络通信场景中起着关键作用。本讲义详细介绍了这两种技术,旨在帮助开发者更好地理解和运用它们。 ### 一、BIO...

    基于Netty实现的文件上传

    本篇将深入探讨如何利用Netty实现文件上传功能,并结合Socket通信和UDP协议进行相关知识的扩展。 首先,我们来关注"基于Netty实现的文件上传"这一主题。Netty提供了丰富的ChannelHandler(通道处理器)和ByteBuf...

    从socket到netty

    【描述】:“从socket到netty,从易到难,稳扎稳打,欢迎交流,java学习者关注:客户端服务端socket通信” Socket,也被称为套接字,是网络编程的基本接口,它允许两个网络应用程序之间进行数据交换。在Java中,`...

    Android-netty和socket通信的demo

    在Android中,由于其运行环境的限制,直接使用Netty可能会遇到一些挑战,比如Java NIO(非阻塞I/O)在Android上并不完全支持。但依然可以通过一些方式来实现,例如使用反射技术或者替换部分NIO库。在"Android-netty...

    Netty的Socket编程详解-搭建服务端与客户端并进行数据传输示例代码.rar

    - Netty的数据传输基于`ByteBuf`,它是Netty提供的高效字节缓冲区,优于Java的`ByteBuffer`。 - 在ChannelHandlerContext中,我们可以调用`writeAndFlush()`方法来发送数据。`writeAndFlush()`会将数据写入缓冲区...

    基于netty与protobuf的Android手机视频实时传输

    Netty是一个Java NIO框架,它提供了一套高度定制且易于使用的API,用于构建高性能、高并发的网络应用程序。Netty的优势在于它的事件驱动模型,通过使用Boss-Worker线程模型,能够有效地处理大量并发连接。在服务器端...

    netty传输kryo序列化的对象基于socket传输

    本主题“netty传输kryo序列化的对象基于socket传输”探讨的是如何结合Netty和Kryo,实现高效、可靠的网络数据传输。首先,我们需要了解Kryo序列化的基本原理。Kryo通过跟踪对象引用和存储类型信息来优化序列化过程,...

    netty-socket实时在线人数统计

    相比传统的Java NIO,Netty提供了更高级别的API,使得开发者可以更容易地处理网络通信。在实时在线人数统计的场景下,Netty可以作为底层通信层,处理客户端连接、数据传输和断开连接等事件。 **WebSocket** ...

    Java 两台服务器之间传递文件

    在实际开发中,还可以考虑使用NIO(非阻塞I/O)或Netty这样的高性能网络库,以提高文件传输的效率。 总之,Java实现两台服务器之间的文件传输需要掌握网络编程、I/O流处理和多线程等技能。通过创建Socket连接,使用...

    smart-socket,一种高性能的Java AIO框架.zip

    《深入解析Smart-Socket:Java AIO框架的高性能实践》 Smart-Socket,作为一个高性能的Java AIO(Asynchronous I/O)框架,是Java开发者在处理高并发、低延迟网络通信场景下的利器。AIO,又称非阻塞I/O,相较于传统...

    netty-file-master.rar

    例如,通过 FileRegion 类可以实现从文件到 Socket 直接的数据传输,避免了中间拷贝步骤。 5. **文件上传实现** 在 Netty 中实现文件上传,通常需要创建一个专门的 ChannelInboundHandler 来接收文件数据,将接收...

    netty技术文档,socket技术 多线程

    标题"Netty技术文档,Socket技术,多线程"指出我们要讨论的是Netty框架,它与Socket编程以及多线程技术的结合。Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端...

    Netty3多文件上传源码

    Netty采用非阻塞I/O(NIO)模型,利用Java的Selector和Channel API,可以在单个线程中处理多个连接,提高了系统的并行性和效率。在长连接模式下,客户端一旦建立连接,就会保持该连接,直到有明确的断开操作,这减少...

    nio socket 消息推送

    为了实现`NIO`的`Socket`服务,我们需要自定义一个`NioServerSocketChannel`工厂类,注册到`Netty`或者`Undertow`等服务器容器中,这些服务器容器已经集成了对`NIO`的支持。 1. **配置SpringBoot**: 在`...

    使用Socket传输音频

    这只是一个基础框架,实际项目中可能还需要结合其他技术,如NIO(非阻塞I/O)提高性能,或者使用框架如Netty简化网络编程。总之,理解Socket的工作原理和音频处理的流程,对于开发高效、可靠的音频传输系统至关重要...

    netty与socket交互实例

    5. **数据传输**:客户端通过Socket发送数据,Netty服务器通过接收的Channel读取这些数据,经过编解码处理后,触发相应的业务逻辑。反之,服务器也可以通过Channel发送响应到客户端。 6. **关闭连接**:当通信完成...

Global site tag (gtag.js) - Google Analytics