论坛首页 入门技术论坛

WEB多线程Socket文件上传

浏览 9007 次
该帖已经被评为新手帖
作者 正文
   发表时间:2007-05-23  

郁闷,第一次发个帖就被扣10分,呜呜……
不过我脸皮厚,又来了。

这次的问题是我在WEB应用中要实现“大”(几百兆)文件的上传,如果用IE上传,肯定Over!我用有证书的Applet在客户端用多线程通过socket将文件上传。在本机上测试好像可以,但是通过网络上传时就会发生掉包现象,必须重新上传。虽然这是程序自动的,但这样也会浪费大量的时间,有时会重复好几次。我检查了好几次代码,也没有发现什么问题!不知道是不是socket的传输不可靠呢,还是我的代码本身就有问题呢?下面是我的试验代码中主要的2个类,请大家帮我看看是否有什么地方不妥的,谢谢!!

第一个:文件上传处理程序,处理文件上传逻辑

package test;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.Socket;
import java.net.UnknownHostException;

import javax.swing.JOptionPane;
import javax.swing.JProgressBar;
import javax.swing.SwingUtilities;

/**
 * $Id: UploadHandler.java,v 1.1 2007/05/14 06:11:02 zhangyb Exp $
 *
 * 描述: 文件上传处理程序,处理文件上传逻辑
 * 
 *
 * @author zhangyb
 * @version 1.0
 */
public class UploadHandler extends Thread {

 /** 系统日志 */
 // private final Log logger = LogFactory.getLog(UploadHandler.class);
 /** 最大文件块大小 20M */
 public final static long MAX_FILE_BLOCK_SIZE = 10 * 1024 * 1024;

 /** 最大缓存大小 1M */
 public final static int MAX_BUFFER_SIZE = 1024 * 1024;

 /** 文件上传的最大线程数 */
 public final static int MAX_THREADS = 5;

 /** 文件系统中的文件 */
 private java.io.File realFile;

 /** 文件的块数 */
 private int blocks;

 /** 当前上传的块序号 */
 private int currentBlock = 1;

 /** 最后块的大小 */
 private long lastLength;

 /** 正在上传的线程数 */
 private int threads = 0;

 /** 文件的唯一编号 */
 private String fileCode = "aaaaaaaaaaaaaaaaaaaa";

 /** 远程主机 */
 private String host;

 /** 远程主机监听端口 */
 private int port;

 private JProgressBar progressBar = null;

 private ProgressDialog dialog = null;

 private FileUpload applet = null;

 /**
  * When the worker needs to update the GUI we do so by queuing a Runnable for the event dispatching thread with SwingUtilities.invokeLater(). In this case we're just changing the progress bars value.
  */
 public void updateStatus(final int i) {

  Runnable doSetProgressBarValue = new Runnable() {
   public void run() {
    progressBar.setValue(i);
   }
  };
  SwingUtilities.invokeLater(doSetProgressBarValue);
 }

 public UploadHandler(String host, int port, java.io.File file) {
  // logger.info("开始上传文件" + file.getName() + "...");
  this.host = host;
  this.port = port;
  this.realFile = file;
  start();
 }

 public UploadHandler(String host, int port, java.io.File file, ProgressDialog dialog, FileUpload applet) {
  // logger.info("开始上传文件" + file.getName() + "...");
  this.host = host;
  this.port = port;
  this.realFile = file;
  this.dialog = dialog;
  this.progressBar = dialog.getProgressBar();
  this.applet = applet;
  start();
 }

 public void run() {

  if (realFile.isDirectory()) {
   // 文件夹情况
   java.io.File[] files = realFile.listFiles();
   if (files != null || files.length > 0) {
    for (int i = 0; i < files.length; i++) {
     new UploadHandler(host, port, files[i]);
    }
   }
  } else {
   // 文件情况, 上传文件
   try {
    RandomAccessFile randomAccessFile = new RandomAccessFile(realFile, "r");
    long size = randomAccessFile.length();
    randomAccessFile.close();
    lastLength = size % MAX_FILE_BLOCK_SIZE;
    if (lastLength == 0) {
     blocks = (int) (size / MAX_FILE_BLOCK_SIZE);
    } else {
     blocks = (int) (size / MAX_FILE_BLOCK_SIZE) + 1;
    }
    // progressBar.setMaximum((int) size);
    progressBar.setValue(0);
    progressBar.setStringPainted(true);
    if (blocks >= MAX_THREADS) {
     threads = MAX_THREADS;
    } else {
     threads = blocks;
    }
    for (int i = 0; i < threads; i++) {
     new BlockSender(currentBlock);
     currentBlock += 1;
    }
   } catch (FileNotFoundException e) {
    // logger.error("文件" + file.getFileCode() + "未能找到, 不能完成上传.");
    reset();
   } catch (IOException e) {
    // logger.error("发生文件或网络读写错误, 不能完成上传.");
    reset();
   }
  }
 }

 /**
  * 文件块上传类
  */
 class BlockSender extends Thread {

  int block;

  Socket socket;

  BlockSender(int block) {
   this.block = block;
   try {
    // socket = new Socket(host, port);
    socket = new Socket("localhost", 5000);
    start();
   } catch (UnknownHostException e) {
    e.printStackTrace();
    System.out.println("未能找到远程主机, 不能完成上传.");
    reset();
   } catch (IOException e) {
    e.printStackTrace();
    System.out.println("建立网络连接失败, 不能完成上传.");
    JOptionPane.showMessageDialog(null, "建立网络连接失败, 不能完成上传.", "系统提示", JOptionPane.ERROR_MESSAGE);
    dialog.setVisible(false);
    reset();
   }
  }

  public void run() {
   try {
    boolean resend = false;
    BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
    writer.println(101);
    String returnValue = reader.readLine();
    if (returnValue != null && returnValue.equals(String.valueOf(201))) {
     // 验证用户登录
     writer.println("userName,password");
     returnValue = reader.readLine();
     if (returnValue != null && returnValue.equals(String.valueOf(201))) {

     } else {
      System.out.println("用户验证未通过.");

      return;
     }
     // 文件名称
     writer.println(fileCode + "," + realFile.getName());
     // 文件块信息
     writer.println(block);
     OutputStream out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
     RandomAccessFile srcFile = new RandomAccessFile(realFile, "r");
     byte[] buf = new byte[MAX_BUFFER_SIZE];
     long start = (block - 1) * MAX_FILE_BLOCK_SIZE;

     srcFile.seek(start);

     int times = 0;
     long total = getBlockSize(block);
     long read = 0;
     if (total % MAX_BUFFER_SIZE == 0) {
      times = (int) total / MAX_BUFFER_SIZE;
     } else {
      times = (int) total / MAX_BUFFER_SIZE + 1;
     }
     int i = 0;
     int length = -1;
     while ((length = srcFile.read(buf)) != -1 && i < times) {
      out.write(buf, 0, length);
      out.flush();
      i++;
      read += length;
     }
     socket.shutdownOutput();
     DataInputStream in = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
     long receive = in.readLong();
     if (read != receive) {
      System.out.println("文件" + realFile.getName() + "第" + block + "块在上传过程中掉包(" + (read - receive) + "字节), 需重传.");
      resend = true;
     }
     reader.close();
     writer.close();
     srcFile.close();
     srcFile = null;
     out.close();
     socket.close();
    } else {
     // System.out.println("服务器未准备好.");
     System.out.print("服务器未准备好.");
     return;
    }
    if (resend) {
     new BlockSender(block);
    } else {
     synchronized (progressBar) {
      progressBar.setValue((progressBar.getValue() + (int) getBlockSize(block)));
     }
     if (blocks >= currentBlock) {
      new BlockSender(currentBlock);
      currentBlock += 1;
     } else {
      threads -= 1;
      if (threads == 0) {
       applet.finished();
       // Fix Me 这里建立备份文件夹的路径有问题?
       // java.io.File backupFolder = new java.io.File(realFile.getParentFile().getAbsolutePath() + java.io.File.separator + CommonUtils.formatDate(new Date(), "yyyyMMdd"));
       // if (logger.isDebugEnabled()) {
       // logger.debug("备份文件夹:" + backupFolder);
       // }
       // if (!backupFolder.exists()) {
       // backupFolder.mkdirs();
       // }
       //       
      }
     }
    }
   } catch (FileNotFoundException e) {
    // System.out.println("文件" + file.getFileCode() + "未能找到, 不能完成上传.");
    e.printStackTrace();
    reset();
   } catch (IOException e) {
    // System.out.println("发生文件或网络读写错误, 不能完成上传.");
    e.printStackTrace();
    reset();
   }
  }

 }

 private long getBlockSize(int block) {
  if (block == blocks && lastLength > 0) {
   return lastLength;
  }
  return MAX_FILE_BLOCK_SIZE;
 }

 /**
  * 恢复文件的状态
  */
 private void reset() {

 }
}

第二个:请求处理类, 处理文件上传和任务发送等业务

package test;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.Socket;

//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;

//import com.excellence.exchange.domain.File;
//import com.excellence.exchange.domain.Missive;
//import com.excellence.exchange.domain.Organ;
//import com.excellence.exchange.domain.TransferTask;
//import com.excellence.exchange.facade.ExchangeFacade;
//import com.excellence.exchange.util.ConfigUtils;
//import com.excellence.exchange.util.ContextUtils;

/**
 * $Id: RequestHandler.java,v 1.1 2007/05/14 06:11:02 zhangyb Exp $
 *
 * 描述: 请求处理类, 处理文件上传和任务发送等业务
 * 
 * @author zhangyb
 * @version 1.0
 */
public class RequestHandler extends Thread {

 /** 文件交换命令--上传文件 */
 public static final int CMD_UPLOAD = 101;

 /** 文件交换命令--创建文件夹 */
 public static final int CMD_MAKE_DIR = 102;

 /** 文件交换命令--发送任务 */
 public static final int CMD_SEND_TASK = 103;

 /** 文件交换命令--机构同步 */
 public static final int CMD_SYNCHRONIZE_ORGAN = 104;

 /** 文件交换命令--机构注册 */
 public static final int CMD_REGISTER_ORGAN = 105;

 /** 请求返回状态值--就绪 */
 public static final int STATE_READY = 201;

 /** 请求返回状态值--错误 */
 public static final int STATE_ERROR = 202;

 /** 请求返回状态值--拒绝 */
 public static final int STATE_REJECT = 203;

 /** 请求返回状态值--完成 */
 public static final int STATE_FINISHED = 204;

 // private final Log logger = LogFactory.getLog(RequestHandler.class);

 private Socket request;

 public RequestHandler() {
 }

 public RequestHandler(Socket socket) {
  this.request = socket;
  start();
 }

 public void run() {
  BufferedReader reader = null;
  PrintWriter writer = null;
  DataOutputStream out = null;
  DataInputStream in = null;
  ObjectInputStream ois = null;
  ObjectOutputStream oos = null;
  try {
   reader = new BufferedReader(new InputStreamReader(request.getInputStream()));
   writer = new PrintWriter(request.getOutputStream(), true);
   String command = reader.readLine();
   // 请求类型信息
   // if (logger.isDebugEnabled()) {
   System.out.println("请求类型代码:" + command);
   // }
   // 返回准备好响应
   writer.println(STATE_READY);
   writer.flush();
   // 接收用户信息(用户名,密码)
   String userInfo = reader.readLine();
   if (userInfo != null && !userInfo.equals("")) {
    // Fix me 如果要验证用户需实现下面方法
    String[] infos = userInfo.split(",");
    if (!login(infos[0], infos[1])) {
     System.out.println("无效的用户信息.");
     writer.println(STATE_REJECT);
     writer.flush();
     return;
    }
   } else {
    System.out.println("无效的用户信息.");
    writer.println(STATE_ERROR);
    writer.flush();
    return;
   }
   // 返回准备好响应
   writer.println(STATE_READY);
   writer.flush();
   // 接收文件名称(新文件名称,原始文件名称)
   String line = reader.readLine();
   String newName = "";
   if (line != null && !line.equals("")) {
    String[] names = line.split(",");
    newName = names[1];
   } else {
    System.out.println("无效的文件名称信息.");
    return;
   }
   // 接收文件块信息
   int block = 1;
   line = reader.readLine();
   if (line != null && !line.equals("")) {
    block = Integer.parseInt(line);
   } else {
    System.out.println("无效的文件块信息.");
    return;
   }
   in = new DataInputStream(new BufferedInputStream(request.getInputStream()));
   String root = "C:/";// "/tmp/youbin/";//ConfigUtils.getTargetFolder();
   // 如果是有父文件夹的情况需要重新处理???
   long receive = saveFile(new DataInputStream(new BufferedInputStream(request.getInputStream())), root + newName, block);
   out = new DataOutputStream(new BufferedOutputStream(request.getOutputStream()));
   out.writeLong(receive);
   out.flush();

  } catch (IOException e) {
   System.out.println("发生网络读写错误, 不能正常完成请求.");
   try {
    out.writeInt(RequestHandler.STATE_ERROR);
    out.flush();
   } catch (IOException e1) {
    System.out.println("网络读写错误, 未能返回错误到客户端.");
   }
  } finally {
   if (reader != null) {
    try {
     reader.close();
    } catch (IOException e) {
     System.out.println("非正常退出请求处理.");
    }
   }
   if (writer != null) {
    writer.close();
   }
   if (out != null) {
    try {
     out.close();
    } catch (IOException e) {
     System.out.println("非正常退出请求处理.");
    }
   }
   if (in != null) {
    try {
     in.close();
    } catch (IOException e) {
     System.out.println("非正常退出请求处理.");
    }
   }
   if (ois != null) {
    try {
     ois.close();
    } catch (IOException e) {
     System.out.println("非正常退出请求处理.");
    }
   }
   if (oos != null) {
    try {
     oos.close();
    } catch (IOException e) {
     System.out.println("非正常退出请求处理.");
    }
   }
   if (request != null) {
    try {
     request.close();
    } catch (IOException e) {
     System.out.println("非正常退出请求处理.");
    }
   }
  }
 }

 /**
  * 保存文件到指定的文件夹中
  *
  * @param in 包含文件数据的流
  * @param fileName 在文件夹中的文件名称
  * @param block 当前文件块的序号
  * @return 返回实际收到的长度返回给客户端验证完整性
  */
 private long saveFile(InputStream in, String fileName, int block) {
  long receive = 0;
  try {
   RandomAccessFile targetFile = new RandomAccessFile(fileName, "rws");
   targetFile.seek((block - 1) * UploadHandler.MAX_FILE_BLOCK_SIZE);
   byte[] buf = new byte[UploadHandler.MAX_BUFFER_SIZE];
   int length = 0;
   while ((length = in.read(buf)) != -1) {
    targetFile.write(buf, 0, length);
    receive += length;
   }
   targetFile.close();
   targetFile = null;
  } catch (FileNotFoundException e) {
   e.printStackTrace();
   // 这里不应该出现异常.
  } catch (IOException e) {
   // Fix Me 这里需要处理
   e.printStackTrace();
  }
  return receive;
 }

 /**
  * 验证用户有效性
  *
  * TODO 如果需要验证则实现下面方法
  *
  * @param userName
  * @param password
  * @return
  */
 private boolean login(String userName, String password) {
  return true;
 }

}

 

   发表时间:2007-05-23  
如果大家有其它实现方法也可以在这里提提,本人感激不尽。
0 请登录后投票
   发表时间:2007-05-23  
怎么像csdn?
0 请登录后投票
   发表时间:2007-05-25  
已经搞定
1 请登录后投票
   发表时间:2007-08-06  
能不能分享一下源码呀..我也在项目中遇到这个问题..超大文件上传..头正大着呢...看到你的思路不错...你那样速度方面呢...服务器性能有什么要求...谢谢
1 请登录后投票
论坛首页 入门技术版

跳转论坛:
Global site tag (gtag.js) - Google Analytics