0 0

HIve 自定义inputformat10

大家好,初学hive想做一个hive的日志处理,但是我所拥有的含有中文的日志文件是“ANSI”格式编码的,(GBK的)。但是hive编码均用utf8,我现在想在日志流导入hive之前把它转成utf8格式,但是console读出来中文仍然是乱码。所以想请大家帮忙。我用的自定义inputformat的原版是https://github.com/msukmanowsky/OmnitureDataFileInputFormat。我想在读入的时候把所有内容转码成标准的UTF8,请大家帮忙,谢谢。

以下是我的尝试:

 

package loadJar;

 

 

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

public class NewRecordReader implements RecordReader<LongWritable, Text> {
	
private static final Log LOG = LogFactory.getLog(NewRecordReader.class.getName());
	
	private CompressionCodecFactory compressionCodecs = null;
	private long start;
	private long pos;
	private long end;
	private EscapeLineReader lineReader;
	int maxLineLength;
	
	public NewRecordReader(FileSplit inputSplit, Configuration job) throws IOException {
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
		start = inputSplit.getStart();
		end = start + inputSplit.getLength();
		final Path file = inputSplit.getPath();
		compressionCodecs = new CompressionCodecFactory(job);
		final CompressionCodec codec = compressionCodecs.getCodec(file);
		
		// Open file and seek to the start of the split
		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(file);
		//InputStreamReader read = new InputStreamReader (new FileInputStream(f),"UTF-8");
		boolean skipFirstLine = false;
		if (codec != null) {
			lineReader = new EscapeLineReader(codec.createInputStream(fileIn), job);
			end = Long.MAX_VALUE;
		} else {
			if (start != 0) {
				skipFirstLine = true;
				--start;
				fileIn.seek(start);
			}
			lineReader = new EscapeLineReader(fileIn, job);
		}
		
		if (skipFirstLine) {
			start += lineReader.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start));
		}
		this.pos = start;
	}
	
	public NewRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
		this.maxLineLength = maxLineLength;
		this.lineReader = new EscapeLineReader(in);
		this.start = offset;
		this.pos = offset;
		this.end = endOffset;
	}
	
	public NewRecordReader(InputStream in, long offset, long endOffset, Configuration job)
		throws IOException {
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
		this.lineReader = new EscapeLineReader(in, job);
		this.start = offset;
		this.pos = offset;
		this.end = endOffset; 
	}
	
	public LongWritable createKey() {
		return new LongWritable();
	}
	
	public Text createValue() {
		return new Text();
	}
	
	/**
	 * Reads the next record in the split.  All instances of \\t, \\n and \\r\n are replaced by a space.
	 * @param key key of the record which will map to the byte offset of the record's line
	 * @param value the record in text format
	 * @return true if a record existed, false otherwise
	 * @throws IOException
	 */
	public synchronized boolean next(LongWritable key, Text value) throws IOException {
		while (pos < end) {
			key.set(pos);
			int newSize = lineReader.readLine(value, maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
			String sstr = value.toString();
//		        str[1] = new String(sstr.getBytes("ISO-8859-1"),"GBK");
//			str[2] = new String(sstr.getBytes("ISO-8859-1"),"GB2312");
//			str[3] = new String(sstr.getBytes("GBK"),"GB2312");
//			str[4] = new String(sstr.getBytes("GBK"),"ISO-8859-1");
//			str[5] = new String(sstr.getBytes("GBK"),"UTF-8");
//			str[6] = new String(sstr.getBytes("GB2312"),"UTF-8");
//			str[7] = new String(sstr.getBytes("GB2312"),"GBK");
//			str[8] = new String(sstr.getBytes("GB2312"),"ISO-8859-1");
//			str[9] = new String(sstr.getBytes("UTF-8"),"GBK");
//			str[10] = new String(sstr.getBytes("UTF-8"),"GB2312");
//			str[11] = new String(sstr.getBytes("UTF-8"),"ISO-8859-1");
			sstr = new String(sstr.getBytes("UTF-8"));
//			//			String[] str = new String[13];
//			str[0] = new String(sstr.getBytes("ISO-8859-1"),"UTF-8");
//	
//			for (int i=0;i<str.length;i++){
//				System.out.println("第"+i+"种情况: "+str[i]);
//			}
				
			System.out.println(sstr);
			value.set(sstr);
			
			if (newSize == 0)
				return false;
			
			pos += newSize;
			if (newSize < maxLineLength)
				return true;
			
			LOG.info("Skipped line of size " + newSize + " at position " + (pos - newSize));
		}
		return false;
	}
	
	public float getProgress() {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - start) / (float)(end - start));
		}
	}
	
	public synchronized long getPos() throws IOException {
		return pos;
	}
	
	public synchronized void close() throws IOException {
		if (lineReader != null)
			lineReader.close();
	}
}

 请大家看注释掉的代码,这是我当时想的办法,后来发现中文显示全是显示乱码。以下是我的test类。

 

package log_data_analyze_ftp;

import java.io.IOException;
import java.sql.*;

public class LogTest {
	private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";
	
	public static void main(String[] args)throws SQLException, IOException{
		
		try {
		      Class.forName(driverName);
		    } catch (ClassNotFoundException e) {
		      // TODO Auto-generated catch block
		      e.printStackTrace();
		      System.exit(1);
		    }
		    Connection con = DriverManager.getConnection("jdbc:hive://172.17.0.119:10000/default", "", "");
		    Statement stmt = con.createStatement();
		    String tableName = "LogTest1";
		    stmt.executeQuery("drop table " + tableName);
//		    String s = "org.apache.hadoop.hive.contrib.serde2.RegexSerDe";
//		    String regInput = "(\\d{4}(-\\d{2}){2}\\s(\\d{2}[:,]){3})\\s([A-Z]+)\\s(-[\\u4e00-\\u9fa5]{2}:\\s([a-z]+))\\s([A-Z]{2}:\\s(\\d|1\\d\\d|2[0-4]\\d|25[0-5])\\.(\\d|1\\d\\d|2[0-4]\\d|25[0-5])\\.(\\d|1\\d\\d|2[0-4]\\d|25[0-5])\\.(\\d|1\\d\\d|2[0-4]\\d|25[0-5]))\\s([\\u4e00-\\u9fa5]{4}:\\s(\\w+)\\.(\\w+))\\s([\\u4e00-\\u9fa5]{2}:\\s(\\d+)\\s[a-z]{2})\\s([\\u4e00-\\u9fa5]{4}):\\s([a-z]+)\\s(.*)";
//		    ResultSet res = stmt.executeQuery("create table " + tableName + " (line_date STRING, line_time STRING, message_type STRING, user_name STRING, ip_address STRING, file_download STRING, total_time STRING, download_status STRING, message STRING) row format serde '" +s+
//		    				"' with serdeproperties ('input.regex'='" +regInput+ "', 'output.format.string'='%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s') stored as textfile");
		    
		    
		    String sql = "create table " +tableName+ "(line_date string, line_time string, message_type string, user_name string, ip_address string, file_download string, total_time string, download_status string, message string)  row format delimited fields terminated by '#' collection items terminated BY '\n' stored as INPUTFORMAT 'loadJar.NewInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'";
		    ResultSet res = stmt.executeQuery(sql);
		    
		  
		    
		    // show tables
		    sql = "show tables '" + tableName + "'";
		    System.out.println("Running: " + sql);
		    res = stmt.executeQuery(sql);
		    if (res.next()) {
		      System.out.println(res.getString(1));
		    }
		    // describe table
		    sql = "describe " + tableName;
		    System.out.println("Running: " + sql);
		    res = stmt.executeQuery(sql);
		    while (res.next()) {
		      System.out.println(res.getString(1) + "\t" + res.getString(2));
		    }

		    // load data into table
		    // NOTE: filepath has to be local to the hive server
		    // NOTE: /tmp/a.txt is a ctrl-A separated file with two fields per line
		    String filepath = "/home/cdh/yyx/fileDownloadInfo.log_20111223";
		    
		    //transferFile(filepath,filepath);
		    //String filepath = "/home/cdh/testlog.txt";
		    
		    sql = "load data local inpath '" + filepath + "' into table " + tableName;
		    System.out.println("Running: " + sql);
		    res = stmt.executeQuery(sql);

		    // select * query
		    sql = "select * from " + tableName;
		    System.out.println("Running: " + sql);
		    res = stmt.executeQuery(sql);
		    
		    while (res.next()) {
		      System.out.println(res.getString(1) + "\t" + res.getString(2) + "\t" + res.getString(3) + "\t" + res.getString(4) + "\t" + res.getString(5) + "\t" + res.getString(6) + "\t" + res.getString(7) + "\t" + res.getString(8) + "\t" + res.getString(9));
		    }
		    
		  }
	
//	private static void transferFile(String srcFileName, String destFileName) throws IOException {
//		  String line_separator = System.getProperty("line.separator"); 
//		  FileInputStream fis = new FileInputStream(srcFileName);
//		  StringBuffer content = new StringBuffer();
//		  DataInputStream in = new DataInputStream(fis);
//		  BufferedReader d = new BufferedReader(new InputStreamReader(in, "GBK"));// , "UTF-8"  
//		  String line = null;
//		  while ((line = d.readLine()) != null)
//		   content.append(line + line_separator);
//		  d.close();
//		  in.close();
//		  fis.close();
//		      
//		  Writer ow = new OutputStreamWriter(new FileOutputStream(destFileName), "utf-8");
//		  ow.write(content.toString());
//		  ow.close();
//		 }
		
	}

请各位大大们帮助我解决中文乱码问题,谢谢啦~\(≧▽≦)/~

1个答案 按时间排序 按投票排序

0 0

为啥不用LOG4J

2012年1月25日 00:29

相关推荐

    hive inputformat

    在Hive中使用自定义InputFormat,需要在HQL查询中指定`ROW FORMAT SERDE`和`INPUTFORMAT`,例如: ```sql CREATE TABLE logs ( field1 STRING, field2 STRING, ... ) ROW FORMAT SERDE 'org.apache.hadoop.hive...

    Hive多字节分隔符解决方案.docx

    本文将介绍Hive多字节分隔符问题的解决方案,包括替换分隔符、RegexSerDe正则加载和自定义InputFormat三种方法。 应用场景 在实际工作中,我们遇到的数据往往不是非常规范化的数据,例如,我们会遇到以下两种情况...

    SequenceFileKeyValueInputFormat:自定义 Hadoop InputFormat

    Apache Hive 的 InputFormat,在查询 SequenceFiles 时将返回 (Text) 键和 (Text) 值。 我需要在不拆分内容的情况下完整解析大量文本文件。 HDFS 在处理大型连续文件时提供最佳吞吐量,因此我使用 Apache Mahout 将...

    hive数据表-小文件合并代码(java)

    1. **编写自定义InputFormat**:你需要创建一个继承自Hive的`org.apache.hadoop.hive.ql.io.HiveInputFormat`的类。在这个类中,重写`getSplits()`方法,该方法用于决定如何将输入分区为多个工作单元(split)。在...

    Hive 对 Protobuf 序列化文件读取.zip

    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.protobuf.ProtobufInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.protobuf.ProtobufOutputFormat'; ``` 其中,`protobuf.schema`属性指定了....

    Hive及Hadoop作业调优

    3. 编程技巧:通过编程技巧的提升,例如选择合适的InputFormat来处理数据,可以有效提升Hive和MapReduce作业的效率。例如,使用CombineFileInputFormat而不是旧的MultiFileInputFormat(已废弃),可以更高效地处理...

    hive源码分析

    - **contrib**:扩展功能,如自定义SerDe和UDF。 #### 主要流程 **顶级流程**介绍了Hive Interfaces部分使用Hive Query Engine的主要流程。 1. **命令行参数处理**:处理`-hiveconf x=y`类型的命令行参数,这类...

    写好Hive_程序的五个提示

    **背景**:在Hive中,使用自定义函数(UDF)相比于使用Transform可以带来更高的性能和更简单的代码。 **解决方案**:尽可能使用UDF来完成复杂的数据处理任务,而避免使用Transform。UDF可以更高效地利用Hive的执行...

    Hive编程指南

    4. **UDF/UDAF/UDTF**:Hive支持用户自定义函数(UDF)、聚合函数(UDAF)以及表生成函数(UDTF),可以扩展Hive的功能。 ### 性能优化 1. **小文件合并**:Hive可以通过调整参数来减少小文件的数量,从而提高查询性能。...

    hadoop项目--网站流量日志分析--5.docx

    通过自定义InputFormat和OutputFormat,Sqoop能够适应不同的数据源和目标格式。例如,你可以使用Sqoop将MySQL、Oracle等传统数据库中的数据导入到HDFS或Hive,反之亦然,将Hadoop中的数据导出回关系数据库。 在数据...

    hadoop 开发者入门专刊 1-4

    6. Hadoop数据输入与输出:学习如何使用InputFormat和OutputFormat进行数据读取和写入,以及自定义InputFormat和OutputFormat的方法。 7. Hadoop作业提交与监控:了解如何使用Hadoop命令行工具提交作业,以及如何...

    高级软件人才培训专家-Hadoop课程资料-5-第五章 - 分布式SQL计算 Hive 语法与概念

    delimiter']] [STORED AS INPUTFORMAT 'inputformat_classname' OUTPUTFORMAT 'outputformat_classname'] [LOCATION 'hdfs_path'] [TBLPROPERTIES (tbl_property_name=tbl_property_value, ...)];` - 示例:`CREATE...

    Hadoop高级编程- 构建与实现大数据解决方案

    6. **数据输入和输出格式**:学习自定义InputFormat和OutputFormat,以处理非标准格式的数据,如CSV、JSON或其他定制格式。 7. **错误处理和容错机制**:理解Hadoop的检查点、故障检测和恢复策略,以及如何在代码中...

    Hadoop源码分析

    5. **实战应用**:通过源码分析,开发者可以学习如何定制Hadoop,如调整配置参数以优化性能,开发自定义InputFormat、OutputFormat、Partitioner等,满足特定的数据处理需求。 6. **性能优化**:源码分析有助于识别...

    《实战Hadoop--开启通向云计算的捷径》源码

    3. **MapReduce编程**:讲解如何编写Map和Reduce函数,包括键值对的处理、分区、排序和归约过程,以及自定义InputFormat和OutputFormat。 4. **Hadoop生态组件**:如HBase(分布式数据库)、Hive(数据仓库工具)、...

    大数据应用技术介绍.pptx

    MapReduce还提供了扩展接口,允许用户自定义InputFormat、Mapper、Partitioner、Reducer和OutputFormat,以适应不同的数据处理需求。 在Hadoop生态系统中,还有许多其他工具和框架。例如,Pig是一种高级查询语言,...

    hadoop-3.1.2-src.tar.gz

    2. 功能扩展:Hadoop提供了丰富的API,允许开发人员根据需求扩展其功能,如自定义InputFormat、OutputFormat、Partitioner等。源码中包含了大量的示例,可以帮助我们更好地理解和使用这些接口。 四、Hadoop在实际...

    hadoop权威指南第二版源代码

    7. **Hadoop的扩展性**:源代码中会有关于如何自定义InputFormat、OutputFormat、RecordReader、RecordWriter等内容,这让你能够根据需求扩展Hadoop以适应各种数据格式和处理需求。 8. **配置与优化**:书中源码还...

Global site tag (gtag.js) - Google Analytics