网上找了很多材料都是写了部份代码的,今天在峰哥的帮助下实现了此功能。
为何要设置此功能是由于 hive fields terminated by '||||' 不支持 字符串导致
将你的inputformat类打成jar包,如MyInputFormat.jar
将MyInputFormat.jar放到 hive/lib里,然后就可以建表了
假设你的inputFormat类路径是com.hive.myinput
则建表语句为:create table tbname(name stirng,id int, ...) stored as INPUTFORMAT 'com.hive.myinput' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
HiveIgnoreKeyTextOutputFormat是系统自带的outputformat类,你也可以自定义
由于hive是基于hadoop集群运行的,所以hadoop/lib里面也必须放入MyInputFormat.jar,
此功能需要二个CLASS 类:ClickstreamInputFormat ClickstreamRecordReader
package com.jd.cloud.clickstore;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
/**
* 自定义hadoop的 org.apache.hadoop.mapred.InputFormat
*
* @author winston
*
*/
public class ClickstreamInputFormat extends TextInputFormat implements
JobConfigurable {
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new ClickstreamRecordReader((FileSplit) genericSplit,job);
}
}
package com.jd.cloud.clickstore;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapred.RecordReader;
public class ClickstreamRecordReader implements
RecordReader<LongWritable, Text> {
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader lineReader;
int maxLineLength;
public ClickstreamRecordReader(FileSplit inputSplit, Configuration job)
throws IOException {
maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength",
Integer.MAX_VALUE);
start = inputSplit.getStart();
end = start + inputSplit.getLength();
final Path file = inputSplit.getPath();
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
// Open file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(file);
boolean skipFirstLine = false;
if (codec != null) {
lineReader = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
lineReader = new LineReader(fileIn, job);
}
if (skipFirstLine) {
start += lineReader.readLine(new Text(), 0,
(int) Math.min((long) Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
public ClickstreamRecordReader(InputStream in, long offset, long endOffset,
int maxLineLength) {
this.maxLineLength = maxLineLength;
this.lineReader = new LineReader(in);
this.start = offset;
this.pos = offset;
this.end = endOffset;
}
public ClickstreamRecordReader(InputStream in, long offset, long endOffset,
Configuration job) throws IOException {
this.maxLineLength = job.getInt(
"mapred.ClickstreamRecordReader.maxlength", Integer.MAX_VALUE);
this.lineReader = new LineReader(in, job);
this.start = offset;
this.pos = offset;
this.end = endOffset;
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
/**
* Reads the next record in the split. get usefull fields from the raw nginx
* log.
*
* @param key
* key of the record which will map to the byte offset of the
* record's line
* @param value
* the record in text format
* @return true if a record existed, false otherwise
* @throws IOException
*/
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
// Stay within the split
while (pos < end) {
key.set(pos);
int newSize = lineReader.readLine(value, maxLineLength,
Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
maxLineLength));
if (newSize == 0)
return false;
String str = value.toString().toLowerCase()
.replaceAll("\\@\\_\\@", "\001");
value.set(str);
pos += newSize;
if (newSize < maxLineLength)
return true;
}
return false;
}
public float getProgress() {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float) (end - start));
}
}
public synchronized long getPos() throws IOException {
return pos;
}
public synchronized void close() throws IOException {
if (lineReader != null)
lineReader.close();
}
// 测试 输出
//public static void main(String ags[]){
// String str1 ="123@_@abcd@_@fk".replaceAll("\\@\\_\\@", "\001");
// System.out.println(str1);
//}
}
1.上传到 HIVE 服务器上 JAVAC 编译
javac -cp ./:/usr/lib/hadoop/hadoop-common.jar:/home/op1/hadoop/hadoop-core-1.0.3.jar:/usr/lib/hadoop/lib/commons-logging-1.1.1.jar */**/*/*/*
2.JAR 打包 类文件
jar -cf ClickstreamInputFormat.jar /home/op1/uerdwdb/src/
3.复制 Hive/lib Hadoop/lib 文件夹内
4.Hive 创建表命令
create table hive_text(num int,name string,`add` string)
stored as INPUTFORMAT 'com.jd.cloud.clickstore.ClickstreamInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/home/op1/uerdwdb/text.txt';
分享到:
相关推荐
Hive的InputFormat允许我们灵活地处理各种数据源和格式,通过自定义InputFormat,我们可以按需定制数据读取逻辑,比如这里演示的按照空格拆分日志文件。理解并掌握InputFormat对于优化Hive查询性能和处理复杂数据...
### Hive的自定义函数(UDF)详解 #### 一、引言 在大数据处理领域,Apache Hive 是一个广泛使用的数据仓库工具,它提供了一种SQL-like查询语言——HiveQL,使用户能够轻松地对存储在Hadoop文件系统中的大规模数据...
Hive是大数据处理领域中的一个关键组件,它作为基于Hadoop的数据仓库工具,为企业提供了对大规模数据集的SQL-like查询和分析能力。本教程将深入探讨Hive数仓的架构与设计,Hive SQL的基本语法及高级特性,以及如何...
在Hive 2.0及更高版本中,启用Metastore审计日志是确保数据安全性与合规性的重要步骤。审计日志记录了用户对Hive Metastore的所有操作,包括元数据的创建、修改和查询等,这对于追踪系统活动、故障排查以及满足法规...
### Spark与Hive自定义函数兼容性问题解析 在大数据处理领域,Apache Spark 和 Apache Hive 都是非常重要的工具。Spark 是一种快速通用的大规模数据处理系统,而Hive 则是一种数据仓库工具,主要用于对存储在 ...
基于Hive的搜狗日志分析 本文档主要介绍了基于Hive的搜狗日志分析的整个过程,从数据预处理、构建数据仓库、数据分析到其他数据操作等方面进行了详细的介绍。 一、 数据预处理 数据预处理是整个日志分析的第一步...
05.hive中如何自定义函数--json解析函数示例.mp4
自定义InputFormat是Hive提供的一种机制,允许用户自定义数据加载的方式。使用自定义InputFormat,可以实现特殊的数据加载逻辑,以适应复杂的数据格式。 本文总结了Hive多字节分隔符解决方案的三个方法,包括替换...
综上所述,自定义Hive函数(UDF、UDAF)极大地增强了Hive的功能,使其能处理各种复杂的业务逻辑,适应不同的数据处理需求。开发和使用这些自定义函数时,需要理解Hive的生命周期、执行模型以及如何将Java代码集成到...
Hive 是一个基于 Hadoop 的数据仓库工具,用于处理大规模数据集。它使用类似 SQL 的语言 HiveQL 来查询和管理数据。而自定义用户定义函数(UDF)是 Hive 中的一个重要功能,允许用户根据自己的需求编写自定义函数,...
这篇博文主要探讨了如何在Hive中创建自定义函数以及如何加载它们,这对于深化Hive的使用和解决复杂的数据处理问题至关重要。 首先,我们来看一下创建自定义函数的过程。在Hive中,UDF分为三种类型:UDF(User ...
综上所述,Hive作为一种高效的数据仓库工具,在处理大规模结构化数据方面具有显著优势,尤其是在日志分析、报表生成等领域有着广泛的应用前景。通过对Hive的深入了解和合理应用,可以帮助企业和组织更好地利用其拥有...
Hive 自定义 UDF 编写函数 本文主要讲解了 Hive 中自定义 UDF 函数的编写方法,包括创建 UDF 类、实现自定义函数逻辑、编译和打包 UDF jar 包、上传至 Hive 服务器并注册自定义函数。 一、创建 UDF 类 为了实现...
Hive自定义函数(User Defined Function,UDF)是用户编写并集成到Hive系统中的函数,用来处理Hive不内置支持的特定计算或转换任务。UDF接受单个输入参数并返回一个结果,非常适合进行简单的数据转换和计算。 2. *...
hive-udfhive自定义函数主要实现hive3种自定义函数1,udf函数,主要用于处理一对一数据处理2,udtf函数,主要用于处理一对多数据处理2,udaf函数,主要用与处理多对一数据聚合处理
1. **日志数据提取**:在 Eclipse 中创建一个 MapReduce 工程(HiveHadoopMysqlPro),用于处理日志分析任务。在开始之前,确保将 Hive 的库目录(lib)下的所有 JAR 包复制到 Hadoop 的 lib 目录下,以便 MapReduce...
在Hive中,UDF(User Defined Functions)是用户自定义函数,允许开发人员扩展Hive的内置功能,以满足特定的数据处理需求。Hive UDF的实现通常涉及到编写Java代码,并将其打包成JAR(Java Archive)文件,然后在Hive...
Hadoop/Hive系统通过HDFS存储Web日志数据,并通过Hive处理这些数据,最终实现日志分析的功能。 设计Web日志分析系统时,需要考虑到以下几个核心功能模块: 1. 日志采集模块:负责实时或定时从Web服务器获取日志...
针对传统分布式模型在海量日志并行处理时的可扩展性和并行程序编写困难的问题, 提出了基于Hive的Web海量搜索日志分析机制。利用HQL语言以及Hadoop分布式文件系统(HDFS)和MapReduce编程模式对海量搜索日志进行分析...