`

HadoopFileUtil

 
阅读更多
import java.io.File;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;



public class HadoopFileUtil {
static Logger logger = Logger.getLogger(HadoopFileUtil.class);
/**
  * @param args
  */
public static void main(String[] args) {
Configuration conf = new Configuration();
    String[] otherArgs = null;
try {
otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
   String src=args[0];
   String dst=args[1];
   String tag=args[2];
   HadoopFileUtil util=new HadoopFileUtil();
   if(tag!=null&&tag.equals("1")){
    System.out.println(util.createFile(src, dst));
   }
   else{
    util.deleteFile(dst);
   }

}

/**
  * 拷贝一个本地文件到hadoop里面
  * @param localFile 本地文件和路径名
  * @param hadoopFile hadoop文件和路径名
  * @return
  */
public  boolean createFile(String localFile,String hadoopFile){
  try {
   Configuration conf=new Configuration();
   FileSystem src=FileSystem.getLocal(conf);
   FileSystem dst= FileSystem.get(conf);
   Path srcpath = new Path(localFile);
   Path dstpath = new Path(hadoopFile);
   FileUtil.copy(src, srcpath, dst, dstpath,false,conf);
  } catch (Exception e) {
   e.printStackTrace();
   return false;
  } 

  return true;
}


/**将一个流作为输入,生成一个hadoop里面的文件
  * @param inStream 输入流
  * @param hadoopFile hadoop路径及文件名字
  * @return
  */
public boolean createFileByInputStream(InputStream inStream,String hadoopFile){
  try {
   Configuration conf=new Configuration();
   FileSystem dst= FileSystem.get(conf);
   Path dstpath = new Path(hadoopFile);
   FSDataOutputStream oStream=dst.create(dstpath);
   byte[] buffer = new byte[400];
   int length = 0;
   while((length = inStream.read(buffer))>0){
    oStream.write(buffer,0,length);
   }
   oStream.flush();
   oStream.close();
   inStream.close();
  } catch (Exception e) {
   e.printStackTrace();
   return false;
  } 
  return true;
}
/**
  * 删除hadoop里面的一个文件
  * @param hadoopFile
  * @return
  */
public  boolean deleteFile(String hadoopFile){
  try {
   Configuration conf=new Configuration();
   FileSystem dst= FileSystem.get(conf);
   FileUtil.fullyDelete(dst,new Path(hadoopFile));
  } catch (Exception e) {
   e.printStackTrace();
   return false;
  }
 
  return true;
}
/**
  * 从hadoop中读取一个文件流
  * @param hadoopFile
  * @return
  */
public FSDataInputStream getInputStream(String hadoopFile){
  FSDataInputStream iStream=null;
  try {
   Configuration conf=new Configuration();
   FileSystem dst= FileSystem.get(conf);
   Path p=new Path(hadoopFile);
   iStream=dst.open(p);
  } catch (Exception e) {
   e.printStackTrace();
   logger.error("getInputStream error:", e);
  }
  return iStream;
}

}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics