- 浏览: 347713 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
lvyuan1234:
你好,你那个sample.txt文件可以分享给我吗
hive insert overwrite into -
107x:
不错,谢谢!
hive 表的一些默认值 -
on_way_:
赞
Hadoop相关书籍 -
bupt04406:
dengkanghua 写道出来这个问题该怎么解决?hbase ...
Unexpected state导致HMaster abort -
dengkanghua:
出来这个问题该怎么解决?hbase master启动不起来。
Unexpected state导致HMaster abort
Hadoop.The.Definitive.Guide.2nd.Edition 79页
hadoop默认的压缩算法。
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
结果数据压缩是否开启,下面的配置为true,所以开启。
这个是最终的结果数据:
<property>
<name>hive.exec.compress.output</name>
<value>true</value>
<description> This controls whether the final outputs of a query (to a local/hdfs file or a hive table) is compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
</property>
mapred.output.compression.codec 这个选项确定压缩算法
这个是中间的结果数据是否压缩,也就是一个sql,生成多道MR,除了最后一道MR的结果数据外,前面的MR的结果数据可以压缩。
<property>
<name>hive.exec.compress.intermediate</name>
<value>true</value>
<description> This controls whether intermediate files produced by hive between multiple map-reduce jobs are compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
</property>
中间结果数据压缩使用的算法
<property>
<name>hive.intermediate.compression.codec</name>
<value>org.apache.hadoop.io.compress.LzoCodec</value>
</property>
默认的文件格式是SequenceFile
<property>
<name>hive.default.fileformat</name>
<value>SequenceFile</value>
<description>Default file format for CREATE TABLE statement. Options are TextFile and SequenceFile. Users can explicitly say CREATE TABLE ... STORED AS <TEXTFILE|SEQUENCEFILE> to override</description>
</property>
HiveConf里面:
COMPRESSRESULT("hive.exec.compress.output", false),
COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""),
hive.exec.compress.output
SemanticAnalyzer:
private Operator genFileSinkPlan(String dest, QB qb, Operator input)
throws SemanticException {
Operator output = putOpInsertMap(
OperatorFactory.getAndMakeChild(
new FileSinkDesc(
queryTmpdir,
table_desc,
conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), //结果数据是否压缩
currentTableId,
rsCtx.isMultiFileSpray(),
rsCtx.getNumFiles(),
rsCtx.getTotalFiles(),
rsCtx.getPartnCols(),
dpCtx),
fsRS, input), inputRR);
}
FileSinkOperator:
private void createEmptyBuckets(Configuration hconf, ArrayList<String> paths)
throws HiveException, IOException {
for (String p: paths) {
Path path = new Path(p);
RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path);//创建RecordWriter
writer.close(false);
LOG.info("created empty bucket for enforcing bucketing at " + path);
}
}
HiveFileFormatUtils:
public static RecordWriter getRecordWriter(JobConf jc,
HiveOutputFormat<?, ?> hiveOutputFormat,
final Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProp, Path outPath) throws IOException, HiveException {
if (hiveOutputFormat != null) {
return hiveOutputFormat.getHiveRecordWriter(jc, outPath, valueClass,
isCompressed, tableProp, null);
}
return null;
}
HiveSequenceFileOutputFormat:
public class HiveSequenceFileOutputFormat extends SequenceFileOutputFormat
implements HiveOutputFormat<WritableComparable, Writable> {
public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
FileSystem fs = finalOutPath.getFileSystem(jc);
final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);
}
}
Utilities:
public static SequenceFile.Writer createSequenceWriter(JobConf jc, FileSystem fs, Path file,
Class<?> keyClass, Class<?> valClass, boolean isCompressed) throws IOException {
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
Class codecClass = null;
if (isCompressed) {
compressionType = SequenceFileOutputFormat.getOutputCompressionType(jc);
codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class); //默认的压缩算法是DefaultCodec org.apache.hadoop.io.compress.DefaultCodec
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
}
return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec));
}
FileOutputFormat:
public static Class<? extends CompressionCodec>
getOutputCompressorClass(JobConf conf,
Class<? extends CompressionCodec> defaultValue) {
Class<? extends CompressionCodec> codecClass = defaultValue;
String name = conf.get("mapred.output.compression.codec"); //可以经过这个选项进行配置
if (name != null) {
try {
codecClass =
conf.getClassByName(name).asSubclass(CompressionCodec.class);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Compression codec " + name +
" was not found.", e);
}
}
return codecClass;
}
中间结果数据压缩:
GenMapRedUtils.splitTasks:
public static void splitTasks(Operator<? extends Serializable> op,
Task<? extends Serializable> parentTask,
Task<? extends Serializable> childTask, GenMRProcContext opProcCtx,
boolean setReducer, boolean local, int posn) throws SemanticException {
// Create a file sink operator for this file name
boolean compressIntermediate = parseCtx.getConf().getBoolVar(
HiveConf.ConfVars.COMPRESSINTERMEDIATE);
FileSinkDesc desc = new FileSinkDesc(taskTmpDir, tt_desc,
compressIntermediate);
if (compressIntermediate) {
desc.setCompressCodec(parseCtx.getConf().getVar(
HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC));
desc.setCompressType(parseCtx.getConf().getVar(
HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
}
Operator<? extends Serializable> fs_op = putOpInsertMap(OperatorFactory
.get(desc, parent.getSchema()), null, parseCtx);
}
hadoop默认的压缩算法。
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
结果数据压缩是否开启,下面的配置为true,所以开启。
这个是最终的结果数据:
<property>
<name>hive.exec.compress.output</name>
<value>true</value>
<description> This controls whether the final outputs of a query (to a local/hdfs file or a hive table) is compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
</property>
mapred.output.compression.codec 这个选项确定压缩算法
这个是中间的结果数据是否压缩,也就是一个sql,生成多道MR,除了最后一道MR的结果数据外,前面的MR的结果数据可以压缩。
<property>
<name>hive.exec.compress.intermediate</name>
<value>true</value>
<description> This controls whether intermediate files produced by hive between multiple map-reduce jobs are compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
</property>
中间结果数据压缩使用的算法
<property>
<name>hive.intermediate.compression.codec</name>
<value>org.apache.hadoop.io.compress.LzoCodec</value>
</property>
默认的文件格式是SequenceFile
<property>
<name>hive.default.fileformat</name>
<value>SequenceFile</value>
<description>Default file format for CREATE TABLE statement. Options are TextFile and SequenceFile. Users can explicitly say CREATE TABLE ... STORED AS <TEXTFILE|SEQUENCEFILE> to override</description>
</property>
HiveConf里面:
COMPRESSRESULT("hive.exec.compress.output", false),
COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""),
hive.exec.compress.output
SemanticAnalyzer:
private Operator genFileSinkPlan(String dest, QB qb, Operator input)
throws SemanticException {
Operator output = putOpInsertMap(
OperatorFactory.getAndMakeChild(
new FileSinkDesc(
queryTmpdir,
table_desc,
conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), //结果数据是否压缩
currentTableId,
rsCtx.isMultiFileSpray(),
rsCtx.getNumFiles(),
rsCtx.getTotalFiles(),
rsCtx.getPartnCols(),
dpCtx),
fsRS, input), inputRR);
}
FileSinkOperator:
private void createEmptyBuckets(Configuration hconf, ArrayList<String> paths)
throws HiveException, IOException {
for (String p: paths) {
Path path = new Path(p);
RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path);//创建RecordWriter
writer.close(false);
LOG.info("created empty bucket for enforcing bucketing at " + path);
}
}
HiveFileFormatUtils:
public static RecordWriter getRecordWriter(JobConf jc,
HiveOutputFormat<?, ?> hiveOutputFormat,
final Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProp, Path outPath) throws IOException, HiveException {
if (hiveOutputFormat != null) {
return hiveOutputFormat.getHiveRecordWriter(jc, outPath, valueClass,
isCompressed, tableProp, null);
}
return null;
}
HiveSequenceFileOutputFormat:
public class HiveSequenceFileOutputFormat extends SequenceFileOutputFormat
implements HiveOutputFormat<WritableComparable, Writable> {
public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
FileSystem fs = finalOutPath.getFileSystem(jc);
final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);
}
}
Utilities:
public static SequenceFile.Writer createSequenceWriter(JobConf jc, FileSystem fs, Path file,
Class<?> keyClass, Class<?> valClass, boolean isCompressed) throws IOException {
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
Class codecClass = null;
if (isCompressed) {
compressionType = SequenceFileOutputFormat.getOutputCompressionType(jc);
codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class); //默认的压缩算法是DefaultCodec org.apache.hadoop.io.compress.DefaultCodec
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
}
return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec));
}
FileOutputFormat:
public static Class<? extends CompressionCodec>
getOutputCompressorClass(JobConf conf,
Class<? extends CompressionCodec> defaultValue) {
Class<? extends CompressionCodec> codecClass = defaultValue;
String name = conf.get("mapred.output.compression.codec"); //可以经过这个选项进行配置
if (name != null) {
try {
codecClass =
conf.getClassByName(name).asSubclass(CompressionCodec.class);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Compression codec " + name +
" was not found.", e);
}
}
return codecClass;
}
中间结果数据压缩:
GenMapRedUtils.splitTasks:
public static void splitTasks(Operator<? extends Serializable> op,
Task<? extends Serializable> parentTask,
Task<? extends Serializable> childTask, GenMRProcContext opProcCtx,
boolean setReducer, boolean local, int posn) throws SemanticException {
// Create a file sink operator for this file name
boolean compressIntermediate = parseCtx.getConf().getBoolVar(
HiveConf.ConfVars.COMPRESSINTERMEDIATE);
FileSinkDesc desc = new FileSinkDesc(taskTmpDir, tt_desc,
compressIntermediate);
if (compressIntermediate) {
desc.setCompressCodec(parseCtx.getConf().getVar(
HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC));
desc.setCompressType(parseCtx.getConf().getVar(
HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
}
Operator<? extends Serializable> fs_op = putOpInsertMap(OperatorFactory
.get(desc, parent.getSchema()), null, parseCtx);
}
发表评论
-
hadoop
2017-08-01 13:42 0audit log配置 http://hack ... -
hbase jmx
2013-12-11 20:42 2940conf/hbase-env.sh 里面配了 JMX后就可 ... -
Too many fetch failures
2013-10-29 10:42 1427http://lucene.472066.n3.na ... -
cdh3集群 distcp 数据到 cdh4集群
2013-09-26 21:54 1106从cdh3集群 distcp 数据到 cdh4集群上面 ... -
hive rename table name
2013-09-18 14:28 2593hive rename tablename hive re ... -
cdh4 vs cdh3 client处理DataNode异常的不同
2013-09-13 21:13 2209cdh4在处理pipeline中的错误时,逻辑上与原先不一 ... -
hive的distribute by如何partition long型的数据
2013-08-20 10:15 2470有用户问:hive的distribute by分桶是怎么分 ... -
hdfs 升级,cdh3 升级 cdh4
2013-08-05 18:09 2195Step 1: 做下saveNamespace操作,停掉集 ... -
hive like vs rlike vs regexp
2013-04-11 18:53 11207like vs rlike vs regexp r ... -
HDFS HBase NIO相关知识
2012-09-26 18:29 2654HDFS的NIO有一些相关的知识偶尔需要注意下: (1) 使 ... -
java.net.SocketTimeoutException: 480000 millis timeout hdfs
2012-08-13 16:45 8177hdfs集群出现SocketTimeoutException, ... -
HBase如何从Hadoop读取数据,DFSInputStream
2012-08-08 15:41 3339HDFS Client的读取流是从DFSInputStream ... -
DFSClient Packet dfs.write.packet.size
2012-07-30 20:01 1621HBase 里面调用DFSOutputStream的方法常用的 ... -
hbase、hadoop checksum相关
2012-07-25 21:16 1961support checksums in HBase bloc ... -
hive sql where条件很简单,但是太多
2012-07-18 15:51 8733insert overwrite table aaaa ... -
DFSClient 写一个Block的过程
2012-07-12 21:39 1237DFSClient 写一个Block的过程 ... -
insert into时(string->bigint)自动类型转换
2012-06-14 12:30 8277原表src: hive> desc src; ... -
通过复合结构来优化udf的调用
2012-05-11 14:07 1206select split("accba&quo ... -
cdh3u0的jetty导致Error Reading IndexFile
2012-04-13 20:21 2306在36个机器上面跑一个大作业,8千多个map,2w多个r ... -
RegexSerDe
2012-03-14 09:58 1544官方示例在: https://cwiki.apache.or ...
相关推荐
Hive 优化方法整理是 Hive 数据处理过程中的重要步骤,涉及到 Hive 的类 SQL 语句本身进行调优、参数调优、Hadoop 的 HDFS 参数调优和 Map/Reduce 调优等多个方面。 Hive 类 SQL 语句优化 1. 尽量尽早地过滤数据...
可以开启Hive的中间数据和最终数据压缩,使用`hive.exec.compress.output=true`和`hive.exec.compress.intermediate=true`,并选择合适的压缩编码器如LZO、GZIP或Snappy。 2. **处理数据倾斜**:数据倾斜发生在某些...
该参数决定了HDFS路径,用于存储不同map/reduce阶段的执行计划和这些阶段的中间输出结果,默认值为/tmp/<user.name>/hive。 8. hive.metastore.warehouse.dir 该参数决定了Hive默认的数据文件存储路径,通常为HDFS...
- 除了设置 `hive.map.aggr`,还可以调整其他参数如 `hive.exec.parallel` 和 `hive.exec.compress.intermediate` 来并行执行任务和压缩中间结果。 13. **Hive 增加列**: - 可以使用 ALTER TABLE 命令向已有的表...
这种操作对于小数据集来说效率较高,但是当涉及到大数据量时,它的性能会迅速下降,因为它需要处理大量的中间数据,并且在Map和Reduce阶段都要进行数据的全量扫描。 2. Map Join(映射Join) Map Join是针对大数据...
- 多job间共享中间结果集的技巧; - 执行计划的深入分析; - 几种Join方式(ReduceJoin、MapJoin、SMBJoin)的工作原理与适用场景; - PredicatePushdown(PPD)的作用与实现; - 数据倾斜现象的诊断与解决策略...
此外,启用压缩临时文件`mapred.compress.map.output=true`可以进一步减少中间数据的磁盘占用和网络传输时间,从而加速整体流程。 综上所述,Hive优化是一个综合性的过程,涉及查询设计、数据存储和压缩等多个方面...
这包括了控制Map任务的数量、合理设置Reduce任务的数量、压缩中间数据以及减少磁盘IO。配置参数调优的目的是提高数据处理的效率和速度,降低不必要的资源消耗。 2. 自定义模块:调优过程中,用户可能需要根据自身...
除了基本的数据存储和查询功能外,Hive还提供了丰富的数据管理功能,包括但不限于表的创建、修改、删除,以及数据的导入导出等操作。 #### 二、Hive基本操作 **2.1 createtable** - **总述**:`CREATE TABLE`命令...
- 通过压缩中间数据(`mapred.compress.map.output`)减少磁盘I/O操作和网络传输数据量。 - 配置合适的Shuffle参数,如`io.sort.mb`、`io.sort.spill.percent`等,以提高Shuffle过程的效率。 ### 知识点三:Hive与...
- `<property><name>hive.exec.scratchdir</name><value>/tmp/hive</value></property>`: 用于存储执行过程中的中间结果。 - 其他配置项,如元数据存储、连接数据库的相关参数等,确保配置正确无误。 **二、Hive...
- `<name>hive.exec.local.scratchdir</name>`:本地临时目录,用于存放中间计算结果。 - `<name>hive.exec.scratchdir</name>`:分布式临时目录,用于跨节点的临时文件。 - `<name>hive.server2.logging....
在传统的模型中,数据压缩通常在Reduce阶段进行,但作者提出了一种新的思路,即在Map阶段即刻进行数据压缩,以此来减少后续步骤中的数据量,进而提升整个查询和处理过程的效率。 最后,文章中提到的时延性相关公式...
` 开启中间结果数据压缩,减少数据传输时间。 #### 三、总结 通过上述对Hive脚本任务参数的优化配置,我们可以显著提高Hive查询的执行效率。然而,需要注意的是,在实际应用中还需根据具体场景灵活调整这些参数。...
- **含义**:定义了HDFS路径,用于存储不同map/reduce阶段的执行计划和这些阶段的中间输出结果。 - **默认值**:`/tmp/<user.name>/hive` - **建议设置**:确保有足够的空间,并且访问权限正确。 8. **hive....
- **Map 阶段**:负责对输入数据进行处理并产生中间结果。每个 Map 任务只处理输入数据的一部分。 - **Shuffle 阶段**:将所有 Map 任务产生的中间结果按键排序后发送到 Reduce 任务。 - **Reduce 阶段**:负责对...