package com.tool;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.xx.CommonMapper;
import com.xx.hadoop.rcfile.RCFileOutputFormat;
/**
* CommonRcFileCompression.
* 將普通的Text文件壓縮成RcFile格式的文件,節約存儲空間.
* ${srcpath} 要壓縮的文件.
* ${respath} 存放位置.
* ${reducenum} reduce數.
* ${sep} 分隔符.
* ${colsum} 按sep分隔后一共幾列.
*
* @author zhangk
*
*/
public class CommonRcFileCompression extends Configured implements Tool, DataStatic {
private static final String SRCPATH = "srcpath";
private static final String RESPATH = "respath";
private static final String SEP = "sep"; //分隔符
private static final String COLSUM = "colsum"; //共幾列
public static void main(String[] args) throws Exception {
int exitcode = ToolRunner.run(new CommonRcFileCompression(), args);
System.exit(exitcode);
}
@Override
public int run(String[] args) throws Exception {
int code = 0;
Options options = new Options();
options.addOption(DATE, true, DATE);
options.addOption(SRCPATH, true, SRCPATH);
options.addOption(RESPATH, true, RESPATH);
options.addOption(SEP, true, "eg: 001");
options.addOption(COLSUM, true, "eg:column sum");
options.addOption(REDUCENUM, true, "eg:100");
CommandLineParser parser = new GnuParser();
HelpFormatter helper = new HelpFormatter();
CommandLine line = null;
try {
line = parser.parse(options, args);
if (!(line.hasOption(DATE)
&& line.hasOption(SRCPATH)
&& line.hasOption(RESPATH)
&& line.hasOption(SEP)
&& line.hasOption(COLSUM)
&& line.hasOption(REDUCENUM))
|| "".equals(line.getOptionValue(DATE))
|| "".equals(line.getOptionValue(SRCPATH))
|| "".equals(line.getOptionValue(RESPATH))
|| "".equals(line.getOptionValue(SEP))
|| "".equals(line.getOptionValue(COLSUM))
|| "".equals(line.getOptionValue(REDUCENUM))) {
helper.printHelp("DisttibuteOrder", options);
return 1;
}
} catch (Exception e) {
helper.printHelp("DisttibuteOrder", options);
e.printStackTrace();
}
String srcpath = line.getOptionValue(SRCPATH);
String sep = line.getOptionValue(SEP);
String colsum = line.getOptionValue(COLSUM);
String respath = line.getOptionValue(RESPATH);
String reducenum = line.getOptionValue(REDUCENUM);
if (code == 0) {
Job job = new Job();
Configuration conf = job.getConfiguration();
RCFileOutputFormat.setColumnNumber(conf, Tools.stringToInteger(colsum, 0));
job.setJarByClass(CommonRcFileCompression.class);
FileInputFormat.setInputPaths(job, new Path(srcpath));
RCFileOutputFormat.setOutputPath(job, new Path(respath));
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(RCFileOutputFormat.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(BytesRefArrayWritable.class);
job.setNumReduceTasks(Integer.parseInt(reducenum));
job.setMapperClass(CommonMapper.class);
conf.set("sep", sep);
conf.set("colsum", colsum);
conf.setBoolean("mapred.output.compress", true);
conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
code = (job.waitForCompletion(true)) ? 0 : 1;
}
return code;
}
}
package xxx.mapper;
import java.io.IOException;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.tool.DataStatic;
import com.tool.Tools;
/**
* map讀入.
* @author zhangk
*
*/
public class CommonMapper extends Mapper<LongWritable, Text, LongWritable, BytesRefArrayWritable> implements DataStatic {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String sep = context.getConfiguration().get("sep");
Integer colsum = Tools.stringToInteger(context.getConfiguration().get("colsum"), 0);
String line = value.toString();
if (!line.equals("")) {
String[] lines = line.split(sep, NEGATIVE_ONE);
if (lines.length >= colsum) {
byte[][] record = makeByte(lines, colsum);
BytesRefArrayWritable bytes = new BytesRefArrayWritable(record.length);
for (int i = 0; i < record.length; i++) {
BytesRefWritable cu = new BytesRefWritable(record[i], 0, record[i].length);
bytes.set(i, cu);
}
context.write(key, bytes);
}
}
}
/**
* 将行数据转换到byte数组中.
*
* @param lines
* @return
*/
public static byte[][] makeByte(String[] lines, Integer colsum) {
byte[][] record = new byte[colsum][colsum];
for (int i = 0; i < colsum; i++) {
record[i] = lines[i].getBytes();
}
return record;
}
}
将工程打包命名:commonPress.jar
调用例子:
hadoop jar commonPress.jar -date=20120504 -srcpath=/xxx/xx/xx/2012/05/04/ -respath=/xxx/xx/xx/2012/05/04_1 -colsum=4 -sep=\\t -reducenum=40
参数:
日期
输入路径
输出路径
sep是分隔符
colsum是按照sep分隔后一共几列
reducenum是reduce的数量
分享到:
相关推荐
将 CSV 压缩为 RCFile 这个简单的项目以输入 CSV 数据为例,演示了如何使用 Apache Crunch 写出 RCFile 文件。 使用以下命令运行作业: hadoop jar crunchcsvtorcfile-0.0.1-SNAPSHOT-job.jar ...
mapreduce生成RCFile的jar包
### Facebook数据仓库揭秘之RCFile高效存储结构 #### RCFile高效存储结构概述 Facebook的数据仓库在处理海量数据方面面临着巨大挑战。为了优化数据处理效率,Facebook引入了一种名为RCFile(Record Columnar File...
- 最基础的文本格式,易于理解和调试,但不支持块级别的压缩,读取成本较高。 4. **Sequence File** - Hadoop中的二进制格式,支持键值对存储,可进行record和block级别的压缩,常作为中间数据格式。 5. **Avro ...
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
jar包,官方版本,自测可用
本文介绍了Facebook公司数据分析系统中的RCFile存储结构,该结构集行存储和列存储的优点于一身,在MapReduce环境下的大规模数据分析中扮演重要角色。Facebook曾在2010ICDE...
- RCFile将数据划分成多个小块,每个块都可以独立处理。 - 这种方式有利于并行处理,同时也方便进行增量更新或删除操作。 #### RCFile在Facebook的应用 RCFile已经在Facebook的数据仓库Hive中得到了广泛应用。...
jar包,官方版本,自测可用