Hive custom InputFormat
Hi everyone. I'm new to Hive and I'm trying to build a Hive job to process our logs. The log files I have contain Chinese and are saved in "ANSI" encoding (i.e. GBK), while Hive works entirely in UTF-8. I want to convert the log stream to UTF-8 before it is loaded into Hive, but the Chinese I read back on the console is still garbled, so I'm asking for your help. The custom InputFormat I started from is https://github.com/msukmanowsky/OmnitureDataFileInputFormat. What I want is to transcode everything to standard UTF-8 as it is read in. Thanks in advance.
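To make the symptom concrete, here is a small standalone check I put together (my own experiment, not part of the job): once GBK bytes have been decoded as if they were UTF-8, the undecodable sequences become U+FFFD replacement characters, so re-encoding afterwards cannot recover the original text.

import java.util.Arrays;

public class MojibakeDemo {
    public static void main(String[] args) throws Exception {
        byte[] gbkBytes = "中文".getBytes("GBK");        // what the log file actually contains
        String mangled = new String(gbkBytes, "UTF-8");  // what toString() on Hadoop's Text effectively does
        System.out.println(mangled);                     // replacement characters, not Chinese
        // The original GBK bytes are already gone at this point:
        System.out.println(Arrays.equals(mangled.getBytes("UTF-8"), gbkBytes)); // false
        // Decoding with the right charset up front is what works:
        System.out.println(new String(gbkBytes, "GBK")); // prints 中文 (if the console can show it)
    }
}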
Here is my attempt:
package loadJar;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

public class NewRecordReader implements RecordReader<LongWritable, Text> {

    private static final Log LOG = LogFactory.getLog(NewRecordReader.class.getName());

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private EscapeLineReader lineReader;
    int maxLineLength;

    public NewRecordReader(FileSplit inputSplit, Configuration job) throws IOException {
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // Open file and seek to the start of the split
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        // InputStreamReader read = new InputStreamReader(new FileInputStream(f), "UTF-8");
        boolean skipFirstLine = false;
        if (codec != null) {
            lineReader = new EscapeLineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new EscapeLineReader(fileIn, job);
        }
        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public NewRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.lineReader = new EscapeLineReader(in);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }

    public NewRecordReader(InputStream in, long offset, long endOffset, Configuration job) throws IOException {
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new EscapeLineReader(in, job);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }

    public LongWritable createKey() {
        return new LongWritable();
    }

    public Text createValue() {
        return new Text();
    }

    /**
     * Reads the next record in the split. All instances of \t, \n and \r\n are replaced by a space.
     *
     * @param key   key of the record which will map to the byte offset of the record's line
     * @param value the record in text format
     * @return true if a record existed, false otherwise
     * @throws IOException
     */
    public synchronized boolean next(LongWritable key, Text value) throws IOException {
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));

            String sstr = value.toString();
            // str[1]  = new String(sstr.getBytes("ISO-8859-1"), "GBK");
            // str[2]  = new String(sstr.getBytes("ISO-8859-1"), "GB2312");
            // str[3]  = new String(sstr.getBytes("GBK"), "GB2312");
            // str[4]  = new String(sstr.getBytes("GBK"), "ISO-8859-1");
            // str[5]  = new String(sstr.getBytes("GBK"), "UTF-8");
            // str[6]  = new String(sstr.getBytes("GB2312"), "UTF-8");
            // str[7]  = new String(sstr.getBytes("GB2312"), "GBK");
            // str[8]  = new String(sstr.getBytes("GB2312"), "ISO-8859-1");
            // str[9]  = new String(sstr.getBytes("UTF-8"), "GBK");
            // str[10] = new String(sstr.getBytes("UTF-8"), "GB2312");
            // str[11] = new String(sstr.getBytes("UTF-8"), "ISO-8859-1");
            sstr = new String(sstr.getBytes("UTF-8"));
            //
            // String[] str = new String[13];
            // str[0] = new String(sstr.getBytes("ISO-8859-1"), "UTF-8");
            //
            // for (int i = 0; i < str.length; i++) {
            //     System.out.println("case " + i + ": " + str[i]);
            // }
            System.out.println(sstr);
            value.set(sstr);

            if (newSize == 0)
                return false;
            pos += newSize;
            if (newSize < maxLineLength)
                return true;

            LOG.info("Skipped line of size " + newSize + " at position " + (pos - newSize));
        }
        return false;
    }

    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    public synchronized long getPos() throws IOException {
        return pos;
    }

    public synchronized void close() throws IOException {
        if (lineReader != null)
            lineReader.close();
    }
}
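One thing I only realized while writing this up, and have not fully verified: Hadoop's Text always treats its backing bytes as UTF-8, so by the time value.toString() returns, the GBK bytes have already been replaced, and no combination of getBytes(...) / new String(...) afterwards can restore them. Assuming EscapeLineReader copies the raw GBK bytes from the stream into value unchanged, decoding those bytes directly as GBK (and letting Text re-encode the result as UTF-8) should be all that is needed. A rough sketch, with class and method names of my own:

import java.io.IOException;

import org.apache.hadoop.io.Text;

// Hypothetical helper class (the names are mine); the single method could just
// as well live inside NewRecordReader.
public class GbkTextRecoder {
    // Reinterpret the raw bytes that the line reader placed into `value` as GBK,
    // then store the decoded String back, which makes Text re-serialize it as UTF-8.
    public static void recodeGbkToUtf8(Text value) throws IOException {
        // getBytes() returns the backing array, which may be longer than the logical
        // content, so only the first getLength() bytes belong to this line.
        String decoded = new String(value.getBytes(), 0, value.getLength(), "GBK");
        value.set(decoded); // Text.set(String) writes the characters out as UTF-8 bytes
    }
}

In next() I would then call recodeGbkToUtf8(value) right after readLine() and drop the whole sstr block, but I have not tested this end to end yet.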
The commented-out lines in next() above are the conversions I tried at the time; every one of them still produced garbled Chinese. Below is my test class.
package log_data_analyze_ftp;

import java.io.IOException;
import java.sql.*;

public class LogTest {

    private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";

    public static void main(String[] args) throws SQLException, IOException {
        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        }
        Connection con = DriverManager.getConnection("jdbc:hive://172.17.0.119:10000/default", "", "");
        Statement stmt = con.createStatement();
        String tableName = "LogTest1";
        stmt.executeQuery("drop table " + tableName);

        // String s = "org.apache.hadoop.hive.contrib.serde2.RegexSerDe";
        // String regInput = "(\\d{4}(-\\d{2}){2}\\s(\\d{2}[:,]){3})\\s([A-Z]+)\\s(-[\\u4e00-\\u9fa5]{2}:\\s([a-z]+))\\s([A-Z]{2}:\\s(\\d|1\\d\\d|2[0-4]\\d|25[0-5])\\.(\\d|1\\d\\d|2[0-4]\\d|25[0-5])\\.(\\d|1\\d\\d|2[0-4]\\d|25[0-5])\\.(\\d|1\\d\\d|2[0-4]\\d|25[0-5]))\\s([\\u4e00-\\u9fa5]{4}:\\s(\\w+)\\.(\\w+))\\s([\\u4e00-\\u9fa5]{2}:\\s(\\d+)\\s[a-z]{2})\\s([\\u4e00-\\u9fa5]{4}):\\s([a-z]+)\\s(.*)";
        // ResultSet res = stmt.executeQuery("create table " + tableName
        //         + " (line_date STRING, line_time STRING, message_type STRING, user_name STRING, ip_address STRING, file_download STRING, total_time STRING, download_status STRING, message STRING) row format serde '" + s
        //         + "' with serdeproperties ('input.regex'='" + regInput
        //         + "', 'output.format.string'='%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s') stored as textfile");

        String sql = "create table " + tableName
                + "(line_date string, line_time string, message_type string, user_name string, ip_address string,"
                + " file_download string, total_time string, download_status string, message string)"
                + " row format delimited fields terminated by '#' collection items terminated BY '\n'"
                + " stored as INPUTFORMAT 'loadJar.NewInputFormat'"
                + " OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'";
        ResultSet res = stmt.executeQuery(sql);

        // show tables
        sql = "show tables '" + tableName + "'";
        System.out.println("Running: " + sql);
        res = stmt.executeQuery(sql);
        if (res.next()) {
            System.out.println(res.getString(1));
        }

        // describe table
        sql = "describe " + tableName;
        System.out.println("Running: " + sql);
        res = stmt.executeQuery(sql);
        while (res.next()) {
            System.out.println(res.getString(1) + "\t" + res.getString(2));
        }

        // load data into table
        // NOTE: filepath has to be local to the hive server
        // NOTE: /tmp/a.txt is a ctrl-A separated file with two fields per line
        String filepath = "/home/cdh/yyx/fileDownloadInfo.log_20111223";
        // transferFile(filepath, filepath);
        // String filepath = "/home/cdh/testlog.txt";
        sql = "load data local inpath '" + filepath + "' into table " + tableName;
        System.out.println("Running: " + sql);
        res = stmt.executeQuery(sql);

        // select * query
        sql = "select * from " + tableName;
        System.out.println("Running: " + sql);
        res = stmt.executeQuery(sql);
        while (res.next()) {
            System.out.println(res.getString(1) + "\t" + res.getString(2) + "\t" + res.getString(3) + "\t"
                    + res.getString(4) + "\t" + res.getString(5) + "\t" + res.getString(6) + "\t"
                    + res.getString(7) + "\t" + res.getString(8) + "\t" + res.getString(9));
        }
    }

    // private static void transferFile(String srcFileName, String destFileName) throws IOException {
    //     String line_separator = System.getProperty("line.separator");
    //     FileInputStream fis = new FileInputStream(srcFileName);
    //     StringBuffer content = new StringBuffer();
    //     DataInputStream in = new DataInputStream(fis);
    //     BufferedReader d = new BufferedReader(new InputStreamReader(in, "GBK")); // , "UTF-8"
    //     String line = null;
    //     while ((line = d.readLine()) != null)
    //         content.append(line + line_separator);
    //     d.close();
    //     in.close();
    //     fis.close();
    //
    //     Writer ow = new OutputStreamWriter(new FileOutputStream(destFileName), "utf-8");
    //     ow.write(content.toString());
    //     ow.close();
    // }
}
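For completeness, here is a streaming variant of the transferFile idea commented out above (assuming the source file really is GBK), so the whole log does not have to be buffered in memory before load data runs; the class name is mine:

import java.io.*;

// Standalone pre-conversion step: rewrite a GBK log file as UTF-8 before
// running "load data local inpath". Streams line by line instead of buffering
// the entire file in a StringBuffer.
public class GbkToUtf8 {
    public static void transcode(String srcFileName, String destFileName) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(srcFileName), "GBK"));
        BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(destFileName), "UTF-8"));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(line);
                writer.newLine();
            }
        } finally {
            reader.close();
            writer.close();
        }
    }

    public static void main(String[] args) throws IOException {
        // e.g. java GbkToUtf8 fileDownloadInfo.log_20111223 fileDownloadInfo.utf8.log
        transcode(args[0], args[1]);
    }
}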
Could you please help me solve this Chinese mojibake problem? Thanks a lot! ~\(≧▽≦)/~
January 9, 2012, 09:11