`

hive spark conf

 
阅读更多
CREATE TABLE org_userbehavior_all_yunzhi
(
user_id Int
,event_time bigint
,behivior_id SMALLINT
,behivior_name String
,behivior_pop  String
,record_date  String
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
location '/user/hive/warehouse/org_userbehavior_all_yunzhi';
LOAD DATA INPATH '/sparklib/data' OVERWRITE INTO TABLE org_userbehavior_all_yunzhi;



CREATE TABLE org_userbehavior_all_yunzhi_parquet
(
user_id Int
,event_time Int
,behivior_pop  String
)
partitioned by (record_date String,behivior_id SMALLINT )
STORED AS PARQUET
location '/user/hive/warehouse/org_userbehavior_all_yunzhi_parquet';

INSERT OVERWRITE TABLE org_userbehavior_funnel_dd PARTITION(record_date,behivior_id)  SELECT user_id, cast (event_time/1000 as  Int) as record_date   ,behivior_pop,record_date,behivior_id from org_userbehavior_all;
-- 4538072102  /user/hive/warehouse/org_userbehavior_funnel_dd
-- hadoop fs -ls -R  /user/hive/warehouse/org_userbehavior_funnel_dd | wc -l     3599

CREATE TABLE org_userbehavior_all_yunzhi_parquet
(
user_id Int
,event_time Int
,behivior_pop  String
,behivior_id SMALLINT
)
partitioned by (record_date String )
STORED AS PARQUET
location '/user/hive/warehouse/org_userbehavior_all_yunzhi_parquet';

set hive.exec.dynamic.partition.mode=nonstrict
set mapred.max.split.size=1024000000;
set mapred.min.split.size.per.node=1024000000;
set mapred.min.split.size.per.rack=1024000000;
set mapred.reduce.tasks=30;
set hive.exec.reducers.bytes.per.reducer=1073741824;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set hive.exec.reducers.bytes.per.reducer=50000000;



set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=200000000;
set mapred.min.split.size.per.rack=200000000;
set mapred.reduce.tasks=100;


set spark.sql.map.partitions=100;
set spark.sql.shuffle.partitions=40;
set spark.sql.reduce.partitions=10;






org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.Bzip2Codec
org.apache.hadoop.io.compress.SnappyCodec

set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec



INSERT OVERWRITE TABLE org_userbehavior_all_yunzhi_parquet PARTITION(record_date) SELECT user_id, cast (event_time/1000 as Int) as event_time,behivior_pop,behivior_id,record_date from org_userbehavior_all_yunzhi;
-- 2908248821  /user/hive/warehouse/org_userbehavior_funnel_dd2
-- hadoop fs -ls -R  /user/hive/warehouse/org_userbehavior_funnel_dd2 | wc -l     2960

-- todo partitions 不包含 behivior_id 压缩比更好,和字典压缩有关.  生成的文件数太多,要修改 shuffer 数量来减少 文件数.



-- udf
-- funnel_merge   tuple
-- funnel_merge2  time *10  + eventid
-- funnel_sum





CREATE TABLE org_userbehavior_all_yunzhi_behiviorid_parquet
(
user_id Int
,event_time Int
,behivior_pop  String
)
partitioned by (record_date String,behivior_id SMALLINT)
STORED AS PARQUET
location '/user/hive/warehouse/org_userbehavior_all_yunzhi_behiviorid_parquet';
INSERT OVERWRITE TABLE org_userbehavior_all_yunzhi_behiviorid_parquet PARTITION(record_date,behivior_id)  SELECT user_id, cast (event_time/1000 as  Int) as event_time ,behivior_pop,record_date,behivior_id from org_userbehavior_all_yunzhi;

set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
INSERT OVERWRITE TABLE org_userbehavior_all_yunzhi_parquet PARTITION(record_date) SELECT user_id, cast (event_time/1000 as Int) as event_time,behivior_pop,behivior_id,record_date from org_userbehavior_all_yunzhi;


set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=128000000;
set hive.merge.smallfiles.avgsize=15000000;

set hive.merge.mapfiles = true #在Map-only的任务结束时合并小文件
set hive.merge.mapredfiles = true #在Map-Reduce的任务结束时合并小文件
set hive.merge.size.per.task = 256*1000*1000 #合并文件的大小
set hive.merge.smallfiles.avgsize=16000000 #当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge


set hive.exec.dynamic.partition.mode=nonstrict;
set mapred.max.split.size=256000000;
set mapred.min.split.size.per.node=256000000;
set mapred.min.split.size.per.rack=256000000;
set mapred.reduce.tasks=30;
set hive.exec.reducers.bytes.per.reducer=1073741824;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set spark.sql.shuffle.partitions=40;
set spark.sql.reduce.partitions=10;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=128000000;
set hive.merge.smallfiles.avgsize=50000000;


org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.Bzip2Codec
org.apache.hadoop.io.compress.SnappyCodec


set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec


select funnelsum(funneldata) from  ( select user_id,funnelmergecount(behivior_id,event_time,'10004,10007',1000000,1451577600) as funneldata  from org_userbehavior_all_yunzhi_parquet  where   behivior_id in (10004,10007)  and  (behivior_id != 10004 or  jsoninfogetstring(behivior_pop,'brand') = 'Apple')  group by  user_id ) temp2
select funnelsum(funneldata) from  ( select user_id,funnelmergecount(behivior_id,event_time,'10002,10003,10004,10007,10008',86400,1451577600) as funneldata  from org_userbehavior_all_yunzhi_parquet  where  record_date in ('20170101','20170102','20170103','20170104','20170105','20170106','20170107')  and  behivior_id in (10002,10003,10004,10007,10008)  and    (behivior_id != 10003 or  jsoninfogetstring(behivior_pop,'brand') = 'Apple')  and   (behivior_id != 10004 or  jsoninfogetdouble(behivior_pop,'price') > 5000)   group by  user_id ) temp2
select funnelsum(funneldata) from  ( select user_id,funnelmergecount(behivior_id,event_time,'10002,10003,10004,10007,10008',86400,1451577600) as funneldata  from org_userbehavior_all_yunzhi_parquet  where  record_date in ('20170101','20170102','20170103','20170104','20170105','20170106','20170107')  and  behivior_id in (10002,10003,10004,10007,10008)   group by  user_id ) temp2 ;
select funnelsum(funneldata) from  ( select user_id,funnelmergecount(behivior_id,event_time,'10002,10003,10004,10007,10008',86400,1451577600) as funneldata  from org_userbehavior_all_yunzhi_parquet  where   behivior_id in (10002,10003,10004,10007,10008)   group by  user_id ) temp2 ;



CREATE TABLE org_userbehavior_all_yunzhi_parquet_userslice
(
user_id Int
,event_time Int
,behivior_pop  String
,behivior_id SMALLINT
)
partitioned by (record_date String,user_id_slice Int )
STORED AS PARQUET
location '/user/hive/warehouse/org_userbehavior_all_yunzhi_parquet_userslice';




要如何把 天的输出 ,变成一个区间 。  一天 一天的 处理 ?  把 utdid 根据 Hash 分区



set hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE org_userbehavior_all_yunzhi_parquet_userslice PARTITION(record_date,user_id_slice) SELECT user_id, cast (event_time/1000 as Int) as event_time,behivior_pop,behivior_id,record_date,user_id%8 as mode  from org_userbehavior_all_yunzhi where record_date = 20170101;




输出小文件合并的改进 (增加自动合并结果文件)

当spark.sql.shuffle.partitions设置的比较大且结果数据集比较小时,会产生大量的小文件(文件数等同spark.sql.shuffle.partitions)。
解决办法:
在最后的执行计划中加入一个repartition transformation。通过参数控制最终的partitions数且不影响shuffle partition的数量。
使用方法:
set spark.sql.result.partitions=10;

作者:Albert陈凯
链接:http://www.jianshu.com/p/00328171b8a6
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

配置属性
set hive.merge.sparkfiles=true;
set hive.merge.smallfiles.avgsize=50000000;







set spark.sql.shuffle.partitions=1;
set spark.sql.reduce.partitions=1;




# see HIVE-9153
mapreduce.input.fileinputformat.split.maxsize=750000000
hive.vectorized.execution.enabled=true

hive.cbo.enable=true
hive.optimize.reducededuplication.min.reducer=4
hive.optimize.reducededuplication=true
hive.orc.splits.include.file.footer=false
set hive.merge.mapfiles=true;
set hive.merge.sparkfiles=true;
set hive.merge.smallfiles.avgsize=16000000;
set hive.merge.size.per.task=256000000;
hive.merge.orcfile.stripe.level=true
hive.auto.convert.join=true
hive.auto.convert.join.noconditionaltask=true
hive.auto.convert.join.noconditionaltask.size=894435328
hive.optimize.bucketmapjoin.sortedmerge=false
hive.map.aggr.hash.percentmemory=0.5
hive.map.aggr=true
hive.optimize.sort.dynamic.partition=false
hive.stats.autogather=true
hive.stats.fetch.column.stats=true
hive.vectorized.execution.reduce.enabled=false
hive.vectorized.groupby.checkinterval=4096
hive.vectorized.groupby.flush.percent=0.1
hive.compute.query.using.stats=true
hive.limit.pushdown.memory.usage=0.4
hive.optimize.index.filter=true
hive.exec.reducers.bytes.per.reducer=67108864
hive.smbjoin.cache.rows=10000
hive.exec.orc.default.stripe.size=67108864
hive.fetch.task.conversion=more
hive.fetch.task.conversion.threshold=1073741824
hive.fetch.task.aggr=false
mapreduce.input.fileinputformat.list-status.num-threads=5
spark.kryo.referenceTracking=false
spark.kryo.classesToRegister=org.apache.hadoop.hive.ql.io.HiveKey,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch

set hive.exec.dynamic.partition.mode=nonstrict
set hive.merge.mapfiles=true;
set hive.merge.sparkfiles=true;
set hive.merge.smallfiles.avgsize=128000000;
set hive.merge.size.per.task=256000000;

INSERT OVERWRITE TABLE org_userbehavior_all_yunzhi_parquet_userslice PARTITION(record_date,user_id_slice) SELECT user_id, cast (event_time/1000 as Int) as event_time,behivior_pop,behivior_id,record_date,user_id%8 as mode  from org_userbehavior_all_yunzhi
分享到:
评论

相关推荐

    Spark和Hive的结合(让hive基于spark计算)

    2.在spark/conf中创建一个hive-site.xml    javax.jdo.option.ConnectionURL  jdbc:mysql://192.168.224.132:3306/hive?createDatabaseIfNotExist=true  JDBC connect string for a JDBC metastore        ...

    基于Shell脚本,通过简单配置后,可以自动安装Hadoop、Hive、Spark等大数据组件.zip

    6.修改/usr/local/hive/conf下的配置文件,该目录下的文件有: 二.在Ubuntu中安装并配置mysql 1.我们采用MySQL数据库保存Hive的元数据,而不是采用Hive自带的derby来存储元数据,因此需要在Ubuntu里安装MySQL 使用...

    藏经阁-spark替代HIVE实现ETL作业.pdf

    1. 部署 Spark 接入 Hive 源数据,将 Hive-site.xml 移动到 Spark 的 conf 目录下即可。 2. 利用 Hive 作业脚本,建立 Spark 作业脚本。替换 Hive 参数,为 Spark 版本。 3. 切换线上作业用 Spark 作业代替 Hive。 ...

    hive metastore java api使用

    hive metastore是hive的元数据管理服务,实际应用中很多第三方框架需要访问metastore服务,如spark,impala等。同样hive metastore也提供了java接口。 使用 import org.apache.hadoop.hive.conf.HiveConf; import org...

    hive文档.docx

    4. 配置Hive的环境变量,包括设置HADOOP_HOME路径和HIVE_CONF_DIR路径。 5. 修改/etc/profile文件,将Hive添加到系统环境变量中。 6. 在Hadoop集群上启动HDFS和YARN服务。 7. 在HDFS上创建所需的目录,如/tmp和/user...

    docker 部署spark集群配置文件

    描述: 这个资源包含了一个基本的Spark集群配置,包括Hadoop、Hive、MySQL和Spark的配置文件。 文件清单: Dockerfile build.sh build_network.sh -yarn-site.xml -stop_containers.sh -start-hadoop.sh -start_...

    Spark Core 笔记02

    Spark Core学习 对最近在看的赵星老师Spark视频中关于SparkCore的几个案例进行总结。 目录1.WordCountWordCount 执行流程详解2.统计最受欢迎老师topN1. 方法一:普通方法,不设置分组/分区2. 方法二:设置分组和过滤...

    12-安装部署Hive1

    9. **优化配置**:根据实际的硬件环境和业务需求,可能需要对Hive的配置进行调整,例如增大Hive的内存分配、调整Hive的执行引擎(如使用Tez或Spark)、优化HDFS的副本数量等。 10. **监控与日志管理**:在部署完成...

    hive开发资料.pdf

    - 配置 Hive,默认配置文件是 `conf/hive-default.xml`。例如,将元数据存储改为远程 Derby 数据库,修改 `<property>` 标签,如 `javax.jdo.option.ConnectionURL` 和 `javax.jdo.option.ConnectionDriverName`。 ...

    在python中使用pyspark读写Hive数据操作

    spark-submit --conf spark.sql.catalogImplementation=hive test.py ``` 6. **补充知识**: - **PySpark与HBase交互**: SHC(Spark-HBase Connector)允许pyspark直接读写HBase数据,转换为DataFrame,需要在Spark...

    基于spark的电商用户行为分析系统源码+项目说明.zip

    hive 3.1.2 mysql 5.7.28 kafka_2.12-2.3.0 jdk 1.8.0_192 hadoop 2.9.2 zookeeper-3.5.5 Ubuntu 18.04 Windows10 Commons包:公共模块包 conf:配置工具类,获取commerce.properties文件中的所有配置信息, 使用户...

    Spark SQL PDF

    - `.conf`提供Spark和Hadoop相关的配置信息。 - `.read`用于读取各种数据源,如CSV、JSON、Parquet等,转化为DataFrame。 Spark SQL 的这些特性使其成为大数据处理和分析的强大工具,无论是在数据工程、数据科学...

    spark最新集群搭建指南2017

    5. **SparkSQL**:Spark SQL是Spark的一个模块,用于处理结构化数据,可以集成多种数据源,如HDFS、Cassandra、Hive等。它可以与DataFrame API一起使用,提供SQL查询能力,使得开发人员可以方便地在SQL和DataFrame ...

    集群配置.zip

    hadoop、yarn、hive、spark等conf目录下的配置文件。。。

    dolphinscheduler配置

    dolphinscheduler使用hive,hadoop,spark,datax,python,scala修改添加的配置

    Spark-shell批量命令执行脚本的方法

    exec $SPARK_HOME/bin/spark-shell --queue tv --name spark-sql-test --executor-cores 8 --executor-memory 8g --num-executors 8 --conf spark.cleaner.ttl=240000 <<!EOF import org.apache.spark.sql....

    Spark学习笔记之Spark SQL的具体使用

    学习Spark SQL的原因是因为我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,但由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的...

    hadoop集群安装简易版.docx

    在实际部署中,要确保所有节点的配置文件一致,包括`core-site.xml`,`hdfs-site.xml`,`yarn-site.xml`和`spark-defaults.conf`等。配置文件中应明确指定HA模式,如NameNode的备用节点地址,ResourceManager的备用...

    海豚调度,dolphinscheduler-data-quality-dev-SNAPSHOT最新版本3.1.1

    如果要用到MySQL数据,需要将pom.xml中MySQL的scope注释掉 当前只测试了MySQL、PostgreSQL和HIVE数据源,其他数据源暂时未测试过 Spark需要配置好读取Hive元数据,Spark不是采用jdbc的方式读取Hive

Global site tag (gtag.js) - Google Analytics