- 浏览: 112921 次
- 性别:
- 来自: 深圳
-
文章分类
最新评论
-
土豆蛋儿:
我想读取一个外部文件,以什么方式好了? 文件内容经常编辑
flume 自定义source -
土豆蛋儿:
大神,您好。
flume 自定义source
自定义分隔符
package com.lwz.inputf;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.LineReader;
/**
* 自定义hadoop的 org.apache.hadoop.mapred.InputFormat
*
* @author winston
* hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理,最常见的FormatInput就是TextInputFormat
*/
public class ClickstreamInputFormat extends TextInputFormat implements
JobConfigurable {
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
//-----genericSplit----hdfs://vmtmstorm01:8020/user/hive/warehouse/hive_test/test:0+13
System.out.println("-----genericSplit----"+genericSplit.toString());
reporter.setStatus(genericSplit.toString());
/*---job----Configuration:
core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@351d4566,
file:/etc/hive/conf.cloudera.hive/hive-site.xml
*/
System.out.println("---job----"+job);
System.out.println("----reporter----"+reporter);
return new ClickstreamRecordReader((FileSplit) genericSplit,job);
}
public class ClickstreamRecordReader implements
RecordReader<LongWritable, Text> {
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader lineReader;
int maxLineLength;
/*FileSplit是一個輸入文件
LineReader 一个类,提供一个输入流行的读者。根据所使用的构造函数,线将被终止:
下列之一:'\n'(LF),‘R’(CR),或“\r\n”(CR + LF)。
或者,一个自定义的字节序列分隔符在这两种情况下,EOF也终止否则无端接线。
*/
public ClickstreamRecordReader(FileSplit inputSplit, Configuration job)
throws IOException {
maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength",
Integer.MAX_VALUE);
start = inputSplit.getStart();
System.out.println("---start---"+start);
System.out.println("----inputSplit.getLength----"+inputSplit.getLength());
end = start + inputSplit.getLength();
//获取文件的路径
final Path file = inputSplit.getPath();
System.out.println("---filepath---"+file);
//一个工厂,将找到一个给定的文件名正确编解码。
compressionCodecs = new CompressionCodecFactory(job);
// 发现对于给定的基于其文件名后缀的文件相关的压缩编解码器。
final CompressionCodec codec = compressionCodecs.getCodec(file);
// Open file and seek to the start of the split
//获取该文件数据那个文件系统
FileSystem fs = file.getFileSystem(job);
System.out.println("---FileSystem----"+fs.toString()+"----fs-----"+fs);
//在指定的path下打开一个FSDataInputStream流
FSDataInputStream fileIn = fs.open(file);
boolean skipFirstLine = false;
if (codec != null) {
//如果解压器不为空时创建一个LineReader实例
lineReader = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
//寻找到了文件的起始偏移
fileIn.seek(start);
}
//创建一个LineReader实例
lineReader = new LineReader(fileIn, job);
}
if (skipFirstLine) {
start += lineReader.readLine(new Text(), 0,
(int) Math.min((long) Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
将代码导出,传到主机,然后通过add jar /app/home/*.jar;添加jar包,此中法添加的都是临时性的,只在当前会话生效
建表
Create table hive_test(num int,name string,jj string) stored as INPUTFORMAT 'com.lwz.inputf.ClickstreamInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
package com.lwz.inputf;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.LineReader;
/**
* 自定义hadoop的 org.apache.hadoop.mapred.InputFormat
*
* @author winston
* hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理,最常见的FormatInput就是TextInputFormat
*/
public class ClickstreamInputFormat extends TextInputFormat implements
JobConfigurable {
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
//-----genericSplit----hdfs://vmtmstorm01:8020/user/hive/warehouse/hive_test/test:0+13
System.out.println("-----genericSplit----"+genericSplit.toString());
reporter.setStatus(genericSplit.toString());
/*---job----Configuration:
core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@351d4566,
file:/etc/hive/conf.cloudera.hive/hive-site.xml
*/
System.out.println("---job----"+job);
System.out.println("----reporter----"+reporter);
return new ClickstreamRecordReader((FileSplit) genericSplit,job);
}
public class ClickstreamRecordReader implements
RecordReader<LongWritable, Text> {
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader lineReader;
int maxLineLength;
/*FileSplit是一個輸入文件
LineReader 一个类,提供一个输入流行的读者。根据所使用的构造函数,线将被终止:
下列之一:'\n'(LF),‘R’(CR),或“\r\n”(CR + LF)。
或者,一个自定义的字节序列分隔符在这两种情况下,EOF也终止否则无端接线。
*/
public ClickstreamRecordReader(FileSplit inputSplit, Configuration job)
throws IOException {
maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength",
Integer.MAX_VALUE);
start = inputSplit.getStart();
System.out.println("---start---"+start);
System.out.println("----inputSplit.getLength----"+inputSplit.getLength());
end = start + inputSplit.getLength();
//获取文件的路径
final Path file = inputSplit.getPath();
System.out.println("---filepath---"+file);
//一个工厂,将找到一个给定的文件名正确编解码。
compressionCodecs = new CompressionCodecFactory(job);
// 发现对于给定的基于其文件名后缀的文件相关的压缩编解码器。
final CompressionCodec codec = compressionCodecs.getCodec(file);
// Open file and seek to the start of the split
//获取该文件数据那个文件系统
FileSystem fs = file.getFileSystem(job);
System.out.println("---FileSystem----"+fs.toString()+"----fs-----"+fs);
//在指定的path下打开一个FSDataInputStream流
FSDataInputStream fileIn = fs.open(file);
boolean skipFirstLine = false;
if (codec != null) {
//如果解压器不为空时创建一个LineReader实例
lineReader = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
//寻找到了文件的起始偏移
fileIn.seek(start);
}
//创建一个LineReader实例
lineReader = new LineReader(fileIn, job);
}
if (skipFirstLine) {
start += lineReader.readLine(new Text(), 0,
(int) Math.min((long) Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
将代码导出,传到主机,然后通过add jar /app/home/*.jar;添加jar包,此中法添加的都是临时性的,只在当前会话生效
建表
Create table hive_test(num int,name string,jj string) stored as INPUTFORMAT 'com.lwz.inputf.ClickstreamInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
发表评论
-
hive + hbase
2015-01-04 10:42 783环境配置: hadoop-2.0.0-cdh4.3.0 (4 ... -
hive 数据倾斜
2014-08-27 09:03 697链接:http://www.alidata.org/archi ... -
hive 分通总结
2014-08-27 08:42 609总结分析: 1. 定义了桶,但要生成桶的数据,只能是由其他表 ... -
深入了解Hive Index具体实现
2014-08-25 08:51 750索引是标准的数据库技术,hive 0.7版本之后支持索引。hi ... -
explain hive index
2014-08-24 16:44 1160设置索引: 使用聚合索引优化groupby操作 hive> ... -
Hive 中内部表与外部表的区别与创建方法
2014-08-15 17:11 771分类: Hive 2013-12-07 11:56 ... -
hive map和reduce的控制
2014-08-15 16:14 632一、 控制hive任务中的map数: 1. 通 ... -
hive 压缩策略
2014-08-15 15:16 1776Hive使用的是Hadoop的文件 ... -
hive 在mysql中创建备用数据库
2014-08-15 09:21 895修改hive-site.xml <property> ... -
HIVE 窗口及分析函数
2014-08-11 16:21 1199HIVE 窗口及分析函数 使 ... -
hive 内置函数
2014-08-11 09:06 30821.sort_array(): sort_array(arra ... -
hive lateral view
2014-08-09 14:59 2038通过Lateral view可以方便的将UDTF得到的行转列的 ... -
hive数据的导出
2014-07-28 21:53 455在本博客的《Hive几种数据导入方式》文章中,谈到了Hive中 ... -
hive udaf
2014-07-25 16:11 769package com.lwz.udaf; import o ... -
HiveServer2连接ZooKeeper出现Too many connections问题的解决
2014-07-24 08:49 1796HiveServer2连接ZooKeeper出现Too man ... -
hive 常用命令
2014-07-17 22:22 7161.hive通过外部设置参数传入脚本中: hiv ... -
CouderaHadoop中hive的Hook扩展
2014-07-16 21:18 3383最近在做关于CDH4.3.0的hive封装,其中遇到了很多问题 ... -
利用SemanticAnalyzerHook回过滤不加分区条件的Hive查询
2014-07-16 16:43 1477我们Hadoop集群中将近百分之80的作业是通过Hive来提交 ... -
hive 的常用命令
2014-07-16 10:07 0设置、查看hive当前的角色: set sys ... -
hive 授权
2014-07-15 10:51 942Hive授权(Security配置) 博客分类: Hive分 ...
相关推荐
在Hive中使用自定义InputFormat,需要在HQL查询中指定`ROW FORMAT SERDE`和`INPUTFORMAT`,例如: ```sql CREATE TABLE logs ( field1 STRING, field2 STRING, ... ) ROW FORMAT SERDE 'org.apache.hadoop.hive...
本文将介绍Hive多字节分隔符问题的解决方案,包括替换分隔符、RegexSerDe正则加载和自定义InputFormat三种方法。 应用场景 在实际工作中,我们遇到的数据往往不是非常规范化的数据,例如,我们会遇到以下两种情况...
Apache Hive 的 InputFormat,在查询 SequenceFiles 时将返回 (Text) 键和 (Text) 值。 我需要在不拆分内容的情况下完整解析大量文本文件。 HDFS 在处理大型连续文件时提供最佳吞吐量,因此我使用 Apache Mahout 将...
1. **编写自定义InputFormat**:你需要创建一个继承自Hive的`org.apache.hadoop.hive.ql.io.HiveInputFormat`的类。在这个类中,重写`getSplits()`方法,该方法用于决定如何将输入分区为多个工作单元(split)。在...
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.protobuf.ProtobufInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.protobuf.ProtobufOutputFormat'; ``` 其中,`protobuf.schema`属性指定了....
3. 编程技巧:通过编程技巧的提升,例如选择合适的InputFormat来处理数据,可以有效提升Hive和MapReduce作业的效率。例如,使用CombineFileInputFormat而不是旧的MultiFileInputFormat(已废弃),可以更高效地处理...
- **contrib**:扩展功能,如自定义SerDe和UDF。 #### 主要流程 **顶级流程**介绍了Hive Interfaces部分使用Hive Query Engine的主要流程。 1. **命令行参数处理**:处理`-hiveconf x=y`类型的命令行参数,这类...
**背景**:在Hive中,使用自定义函数(UDF)相比于使用Transform可以带来更高的性能和更简单的代码。 **解决方案**:尽可能使用UDF来完成复杂的数据处理任务,而避免使用Transform。UDF可以更高效地利用Hive的执行...
4. **UDF/UDAF/UDTF**:Hive支持用户自定义函数(UDF)、聚合函数(UDAF)以及表生成函数(UDTF),可以扩展Hive的功能。 ### 性能优化 1. **小文件合并**:Hive可以通过调整参数来减少小文件的数量,从而提高查询性能。...
通过自定义InputFormat和OutputFormat,Sqoop能够适应不同的数据源和目标格式。例如,你可以使用Sqoop将MySQL、Oracle等传统数据库中的数据导入到HDFS或Hive,反之亦然,将Hadoop中的数据导出回关系数据库。 在数据...
6. Hadoop数据输入与输出:学习如何使用InputFormat和OutputFormat进行数据读取和写入,以及自定义InputFormat和OutputFormat的方法。 7. Hadoop作业提交与监控:了解如何使用Hadoop命令行工具提交作业,以及如何...
delimiter']] [STORED AS INPUTFORMAT 'inputformat_classname' OUTPUTFORMAT 'outputformat_classname'] [LOCATION 'hdfs_path'] [TBLPROPERTIES (tbl_property_name=tbl_property_value, ...)];` - 示例:`CREATE...
6. **数据输入和输出格式**:学习自定义InputFormat和OutputFormat,以处理非标准格式的数据,如CSV、JSON或其他定制格式。 7. **错误处理和容错机制**:理解Hadoop的检查点、故障检测和恢复策略,以及如何在代码中...
5. **实战应用**:通过源码分析,开发者可以学习如何定制Hadoop,如调整配置参数以优化性能,开发自定义InputFormat、OutputFormat、Partitioner等,满足特定的数据处理需求。 6. **性能优化**:源码分析有助于识别...
3. **MapReduce编程**:讲解如何编写Map和Reduce函数,包括键值对的处理、分区、排序和归约过程,以及自定义InputFormat和OutputFormat。 4. **Hadoop生态组件**:如HBase(分布式数据库)、Hive(数据仓库工具)、...
MapReduce还提供了扩展接口,允许用户自定义InputFormat、Mapper、Partitioner、Reducer和OutputFormat,以适应不同的数据处理需求。 在Hadoop生态系统中,还有许多其他工具和框架。例如,Pig是一种高级查询语言,...
2. 功能扩展:Hadoop提供了丰富的API,允许开发人员根据需求扩展其功能,如自定义InputFormat、OutputFormat、Partitioner等。源码中包含了大量的示例,可以帮助我们更好地理解和使用这些接口。 四、Hadoop在实际...
7. **Hadoop的扩展性**:源代码中会有关于如何自定义InputFormat、OutputFormat、RecordReader、RecordWriter等内容,这让你能够根据需求扩展Hadoop以适应各种数据格式和处理需求。 8. **配置与优化**:书中源码还...