`
乡里伢崽
  • 浏览: 112921 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

hive自定义InputFormat

    博客分类:
  • hive
 
阅读更多
自定义分隔符
package com.lwz.inputf;

import java.io.IOException;   
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;   
import org.apache.hadoop.mapred.InputSplit;   
import org.apache.hadoop.mapred.JobConf;   
import org.apache.hadoop.mapred.JobConfigurable;   
import org.apache.hadoop.mapred.RecordReader;   
import org.apache.hadoop.mapred.Reporter;   
import org.apache.hadoop.mapred.TextInputFormat; 
import org.apache.hadoop.util.LineReader;
/** 
* 自定义hadoop的 org.apache.hadoop.mapred.InputFormat 
*  
* @author winston 
*   hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理,最常见的FormatInput就是TextInputFormat
*/ 
public class ClickstreamInputFormat extends TextInputFormat implements   
        JobConfigurable {   
   
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit genericSplit, JobConf job, Reporter reporter)   
            throws IOException {  
    //-----genericSplit----hdfs://vmtmstorm01:8020/user/hive/warehouse/hive_test/test:0+13
    System.out.println("-----genericSplit----"+genericSplit.toString());
        reporter.setStatus(genericSplit.toString());   
        /*---job----Configuration:
        core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
        yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
        org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@351d4566,
        file:/etc/hive/conf.cloudera.hive/hive-site.xml
        */
        System.out.println("---job----"+job);
        System.out.println("----reporter----"+reporter);
        return new ClickstreamRecordReader((FileSplit) genericSplit,job);   
    }   
   
   
    public class ClickstreamRecordReader implements 
    RecordReader<LongWritable, Text> { 


private CompressionCodecFactory compressionCodecs = null; 
private long start; 
private long pos; 
private long end; 
private LineReader lineReader; 
int maxLineLength; 
/*FileSplit是一個輸入文件
LineReader 一个类,提供一个输入流行的读者。根据所使用的构造函数,线将被终止:
   下列之一:'\n'(LF),‘R’(CR),或“\r\n”(CR + LF)。
  或者,一个自定义的字节序列分隔符在这两种情况下,EOF也终止否则无端接线。
*/
public ClickstreamRecordReader(FileSplit inputSplit, Configuration job) 
        throws IOException { 
    maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength", 
            Integer.MAX_VALUE); 
        start = inputSplit.getStart(); 
    System.out.println("---start---"+start);
    System.out.println("----inputSplit.getLength----"+inputSplit.getLength());
    end = start + inputSplit.getLength(); 
    //获取文件的路径
    final Path file = inputSplit.getPath(); 
    System.out.println("---filepath---"+file);
    //一个工厂,将找到一个给定的文件名正确编解码。
    compressionCodecs = new CompressionCodecFactory(job); 
    // 发现对于给定的基于其文件名后缀的文件相关的压缩编解码器。
    final CompressionCodec codec = compressionCodecs.getCodec(file); 

    // Open file and seek to the start of the split 
    //获取该文件数据那个文件系统
    FileSystem fs = file.getFileSystem(job); 
    System.out.println("---FileSystem----"+fs.toString()+"----fs-----"+fs);
    //在指定的path下打开一个FSDataInputStream流
    FSDataInputStream fileIn = fs.open(file); 
    boolean skipFirstLine = false; 
    if (codec != null) { 
    //如果解压器不为空时创建一个LineReader实例
        lineReader = new LineReader(codec.createInputStream(fileIn), job); 
        end = Long.MAX_VALUE; 
    } else { 
        if (start != 0) { 
            skipFirstLine = true; 
            --start; 
            //寻找到了文件的起始偏移
            fileIn.seek(start); 
        } 
        //创建一个LineReader实例
        lineReader = new LineReader(fileIn, job); 
    } 
    if (skipFirstLine) { 
        start += lineReader.readLine(new Text(), 0, 
                (int) Math.min((long) Integer.MAX_VALUE, end - start)); 
    } 
    this.pos = start; 


将代码导出,传到主机,然后通过add jar /app/home/*.jar;添加jar包,此中法添加的都是临时性的,只在当前会话生效

建表
Create table hive_test(num int,name string,jj string) stored as INPUTFORMAT 'com.lwz.inputf.ClickstreamInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';



分享到:
评论

相关推荐

    hive inputformat

    在Hive中使用自定义InputFormat,需要在HQL查询中指定`ROW FORMAT SERDE`和`INPUTFORMAT`,例如: ```sql CREATE TABLE logs ( field1 STRING, field2 STRING, ... ) ROW FORMAT SERDE 'org.apache.hadoop.hive...

    Hive多字节分隔符解决方案.docx

    本文将介绍Hive多字节分隔符问题的解决方案,包括替换分隔符、RegexSerDe正则加载和自定义InputFormat三种方法。 应用场景 在实际工作中,我们遇到的数据往往不是非常规范化的数据,例如,我们会遇到以下两种情况...

    SequenceFileKeyValueInputFormat:自定义 Hadoop InputFormat

    Apache Hive 的 InputFormat,在查询 SequenceFiles 时将返回 (Text) 键和 (Text) 值。 我需要在不拆分内容的情况下完整解析大量文本文件。 HDFS 在处理大型连续文件时提供最佳吞吐量,因此我使用 Apache Mahout 将...

    hive数据表-小文件合并代码(java)

    1. **编写自定义InputFormat**:你需要创建一个继承自Hive的`org.apache.hadoop.hive.ql.io.HiveInputFormat`的类。在这个类中,重写`getSplits()`方法,该方法用于决定如何将输入分区为多个工作单元(split)。在...

    Hive 对 Protobuf 序列化文件读取.zip

    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.protobuf.ProtobufInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.protobuf.ProtobufOutputFormat'; ``` 其中,`protobuf.schema`属性指定了....

    Hive及Hadoop作业调优

    3. 编程技巧:通过编程技巧的提升,例如选择合适的InputFormat来处理数据,可以有效提升Hive和MapReduce作业的效率。例如,使用CombineFileInputFormat而不是旧的MultiFileInputFormat(已废弃),可以更高效地处理...

    hive源码分析

    - **contrib**:扩展功能,如自定义SerDe和UDF。 #### 主要流程 **顶级流程**介绍了Hive Interfaces部分使用Hive Query Engine的主要流程。 1. **命令行参数处理**:处理`-hiveconf x=y`类型的命令行参数,这类...

    写好Hive_程序的五个提示

    **背景**:在Hive中,使用自定义函数(UDF)相比于使用Transform可以带来更高的性能和更简单的代码。 **解决方案**:尽可能使用UDF来完成复杂的数据处理任务,而避免使用Transform。UDF可以更高效地利用Hive的执行...

    Hive编程指南

    4. **UDF/UDAF/UDTF**:Hive支持用户自定义函数(UDF)、聚合函数(UDAF)以及表生成函数(UDTF),可以扩展Hive的功能。 ### 性能优化 1. **小文件合并**:Hive可以通过调整参数来减少小文件的数量,从而提高查询性能。...

    hadoop项目--网站流量日志分析--5.docx

    通过自定义InputFormat和OutputFormat,Sqoop能够适应不同的数据源和目标格式。例如,你可以使用Sqoop将MySQL、Oracle等传统数据库中的数据导入到HDFS或Hive,反之亦然,将Hadoop中的数据导出回关系数据库。 在数据...

    hadoop 开发者入门专刊 1-4

    6. Hadoop数据输入与输出:学习如何使用InputFormat和OutputFormat进行数据读取和写入,以及自定义InputFormat和OutputFormat的方法。 7. Hadoop作业提交与监控:了解如何使用Hadoop命令行工具提交作业,以及如何...

    高级软件人才培训专家-Hadoop课程资料-5-第五章 - 分布式SQL计算 Hive 语法与概念

    delimiter']] [STORED AS INPUTFORMAT 'inputformat_classname' OUTPUTFORMAT 'outputformat_classname'] [LOCATION 'hdfs_path'] [TBLPROPERTIES (tbl_property_name=tbl_property_value, ...)];` - 示例:`CREATE...

    Hadoop高级编程- 构建与实现大数据解决方案

    6. **数据输入和输出格式**:学习自定义InputFormat和OutputFormat,以处理非标准格式的数据,如CSV、JSON或其他定制格式。 7. **错误处理和容错机制**:理解Hadoop的检查点、故障检测和恢复策略,以及如何在代码中...

    Hadoop源码分析

    5. **实战应用**:通过源码分析,开发者可以学习如何定制Hadoop,如调整配置参数以优化性能,开发自定义InputFormat、OutputFormat、Partitioner等,满足特定的数据处理需求。 6. **性能优化**:源码分析有助于识别...

    《实战Hadoop--开启通向云计算的捷径》源码

    3. **MapReduce编程**:讲解如何编写Map和Reduce函数,包括键值对的处理、分区、排序和归约过程,以及自定义InputFormat和OutputFormat。 4. **Hadoop生态组件**:如HBase(分布式数据库)、Hive(数据仓库工具)、...

    大数据应用技术介绍.pptx

    MapReduce还提供了扩展接口,允许用户自定义InputFormat、Mapper、Partitioner、Reducer和OutputFormat,以适应不同的数据处理需求。 在Hadoop生态系统中,还有许多其他工具和框架。例如,Pig是一种高级查询语言,...

    hadoop-3.1.2-src.tar.gz

    2. 功能扩展:Hadoop提供了丰富的API,允许开发人员根据需求扩展其功能,如自定义InputFormat、OutputFormat、Partitioner等。源码中包含了大量的示例,可以帮助我们更好地理解和使用这些接口。 四、Hadoop在实际...

    hadoop权威指南第二版源代码

    7. **Hadoop的扩展性**:源代码中会有关于如何自定义InputFormat、OutputFormat、RecordReader、RecordWriter等内容,这让你能够根据需求扩展Hadoop以适应各种数据格式和处理需求。 8. **配置与优化**:书中源码还...

Global site tag (gtag.js) - Google Analytics