HIve 自定义inputformat




package loadJar;



import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

public class NewRecordReader implements RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(NewRecordReader.class.getName());
	private CompressionCodecFactory compressionCodecs = null;
	private long start;
	private long pos;
	private long end;
	private EscapeLineReader lineReader;
	int maxLineLength;
	public NewRecordReader(FileSplit inputSplit, Configuration job) throws IOException {
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
		start = inputSplit.getStart();
		end = start + inputSplit.getLength();
		final Path file = inputSplit.getPath();
		compressionCodecs = new CompressionCodecFactory(job);
		final CompressionCodec codec = compressionCodecs.getCodec(file);
		// Open file and seek to the start of the split
		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(file);
		//InputStreamReader read = new InputStreamReader (new FileInputStream(f),"UTF-8");
		boolean skipFirstLine = false;
		if (codec != null) {
			lineReader = new EscapeLineReader(codec.createInputStream(fileIn), job);
			end = Long.MAX_VALUE;
		} else {
			if (start != 0) {
				skipFirstLine = true;
			lineReader = new EscapeLineReader(fileIn, job);
		if (skipFirstLine) {
			start += lineReader.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start));
		this.pos = start;
	public NewRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
		this.maxLineLength = maxLineLength;
		this.lineReader = new EscapeLineReader(in);
		this.start = offset;
		this.pos = offset;
		this.end = endOffset;
	public NewRecordReader(InputStream in, long offset, long endOffset, Configuration job)
		throws IOException {
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
		this.lineReader = new EscapeLineReader(in, job);
		this.start = offset;
		this.pos = offset;
		this.end = endOffset; 
	public LongWritable createKey() {
		return new LongWritable();
	public Text createValue() {
		return new Text();
	 * Reads the next record in the split.  All instances of \\t, \\n and \\r\n are replaced by a space.
	 * @param key key of the record which will map to the byte offset of the record's line
	 * @param value the record in text format
	 * @return true if a record existed, false otherwise
	 * @throws IOException
	public synchronized boolean next(LongWritable key, Text value) throws IOException {
		while (pos < end) {
			int newSize = lineReader.readLine(value, maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
			String sstr = value.toString();
//		        str[1] = new String(sstr.getBytes("ISO-8859-1"),"GBK");
//			str[2] = new String(sstr.getBytes("ISO-8859-1"),"GB2312");
//			str[3] = new String(sstr.getBytes("GBK"),"GB2312");
//			str[4] = new String(sstr.getBytes("GBK"),"ISO-8859-1");
//			str[5] = new String(sstr.getBytes("GBK"),"UTF-8");
//			str[6] = new String(sstr.getBytes("GB2312"),"UTF-8");
//			str[7] = new String(sstr.getBytes("GB2312"),"GBK");
//			str[8] = new String(sstr.getBytes("GB2312"),"ISO-8859-1");
//			str[9] = new String(sstr.getBytes("UTF-8"),"GBK");
//			str[10] = new String(sstr.getBytes("UTF-8"),"GB2312");
//			str[11] = new String(sstr.getBytes("UTF-8"),"ISO-8859-1");
			sstr = new String(sstr.getBytes("UTF-8"));
//			//			String[] str = new String[13];
//			str[0] = new String(sstr.getBytes("ISO-8859-1"),"UTF-8");
//			for (int i=0;i<str.length;i++){
//				System.out.println("第"+i+"种情况: "+str[i]);
//			}
			if (newSize == 0)
				return false;
			pos += newSize;
			if (newSize < maxLineLength)
				return true;
			LOG.info("Skipped line of size " + newSize + " at position " + (pos - newSize));
		return false;
	public float getProgress() {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - start) / (float)(end - start));
	public synchronized long getPos() throws IOException {
		return pos;
	public synchronized void close() throws IOException {
		if (lineReader != null)



package log_data_analyze_ftp;

import java.io.IOException;
import java.sql.*;

public class LogTest {
	private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";
	public static void main(String[] args)throws SQLException, IOException{
		try {
		    } catch (ClassNotFoundException e) {
		      // TODO Auto-generated catch block
		    Connection con = DriverManager.getConnection("jdbc:hive://", "", "");
		    Statement stmt = con.createStatement();
		    String tableName = "LogTest1";
		    stmt.executeQuery("drop table " + tableName);
//		    String s = "org.apache.hadoop.hive.contrib.serde2.RegexSerDe";
//		    String regInput = "(\\d{4}(-\\d{2}){2}\\s(\\d{2}[:,]){3})\\s([A-Z]+)\\s(-[\\u4e00-\\u9fa5]{2}:\\s([a-z]+))\\s([A-Z]{2}:\\s(\\d|1\\d\\d|2[0-4]\\d|25[0-5])\\.(\\d|1\\d\\d|2[0-4]\\d|25[0-5])\\.(\\d|1\\d\\d|2[0-4]\\d|25[0-5])\\.(\\d|1\\d\\d|2[0-4]\\d|25[0-5]))\\s([\\u4e00-\\u9fa5]{4}:\\s(\\w+)\\.(\\w+))\\s([\\u4e00-\\u9fa5]{2}:\\s(\\d+)\\s[a-z]{2})\\s([\\u4e00-\\u9fa5]{4}):\\s([a-z]+)\\s(.*)";
//		    ResultSet res = stmt.executeQuery("create table " + tableName + " (line_date STRING, line_time STRING, message_type STRING, user_name STRING, ip_address STRING, file_download STRING, total_time STRING, download_status STRING, message STRING) row format serde '" +s+
//		    				"' with serdeproperties ('input.regex'='" +regInput+ "', 'output.format.string'='%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s') stored as textfile");
		    String sql = "create table " +tableName+ "(line_date string, line_time string, message_type string, user_name string, ip_address string, file_download string, total_time string, download_status string, message string)  row format delimited fields terminated by '#' collection items terminated BY '\n' stored as INPUTFORMAT 'loadJar.NewInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'";
		    ResultSet res = stmt.executeQuery(sql);
		    // show tables
		    sql = "show tables '" + tableName + "'";
		    System.out.println("Running: " + sql);
		    res = stmt.executeQuery(sql);
		    if (res.next()) {
		    // describe table
		    sql = "describe " + tableName;
		    System.out.println("Running: " + sql);
		    res = stmt.executeQuery(sql);
		    while (res.next()) {
		      System.out.println(res.getString(1) + "\t" + res.getString(2));

		    // load data into table
		    // NOTE: filepath has to be local to the hive server
		    // NOTE: /tmp/a.txt is a ctrl-A separated file with two fields per line
		    String filepath = "/home/cdh/yyx/fileDownloadInfo.log_20111223";
		    //String filepath = "/home/cdh/testlog.txt";
		    sql = "load data local inpath '" + filepath + "' into table " + tableName;
		    System.out.println("Running: " + sql);
		    res = stmt.executeQuery(sql);

		    // select * query
		    sql = "select * from " + tableName;
		    System.out.println("Running: " + sql);
		    res = stmt.executeQuery(sql);
		    while (res.next()) {
		      System.out.println(res.getString(1) + "\t" + res.getString(2) + "\t" + res.getString(3) + "\t" + res.getString(4) + "\t" + res.getString(5) + "\t" + res.getString(6) + "\t" + res.getString(7) + "\t" + res.getString(8) + "\t" + res.getString(9));
//	private static void transferFile(String srcFileName, String destFileName) throws IOException {
//		  String line_separator = System.getProperty("line.separator"); 
//		  FileInputStream fis = new FileInputStream(srcFileName);
//		  StringBuffer content = new StringBuffer();
//		  DataInputStream in = new DataInputStream(fis);
//		  BufferedReader d = new BufferedReader(new InputStreamReader(in, "GBK"));// , "UTF-8"  
//		  String line = null;
//		  while ((line = d.readLine()) != null)
//		   content.append(line + line_separator);
//		  d.close();
//		  in.close();
//		  fis.close();
//		  Writer ow = new OutputStreamWriter(new FileOutputStream(destFileName), "utf-8");
//		  ow.write(content.toString());
//		  ow.close();
//		 }


