`

hive spark conf

 
阅读更多
CREATE TABLE org_userbehavior_all_yunzhi
(
user_id Int
,event_time bigint
,behivior_id SMALLINT
,behivior_name String
,behivior_pop  String
,record_date  String
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
location '/user/hive/warehouse/org_userbehavior_all_yunzhi';
LOAD DATA INPATH '/sparklib/data' OVERWRITE INTO TABLE org_userbehavior_all_yunzhi;



CREATE TABLE org_userbehavior_all_yunzhi_parquet
(
user_id Int
,event_time Int
,behivior_pop  String
)
partitioned by (record_date String,behivior_id SMALLINT )
STORED AS PARQUET
location '/user/hive/warehouse/org_userbehavior_all_yunzhi_parquet';

INSERT OVERWRITE TABLE org_userbehavior_funnel_dd PARTITION(record_date,behivior_id)  SELECT user_id, cast (event_time/1000 as  Int) as record_date   ,behivior_pop,record_date,behivior_id from org_userbehavior_all;
-- 4538072102  /user/hive/warehouse/org_userbehavior_funnel_dd
-- hadoop fs -ls -R  /user/hive/warehouse/org_userbehavior_funnel_dd | wc -l     3599

CREATE TABLE org_userbehavior_all_yunzhi_parquet
(
user_id Int
,event_time Int
,behivior_pop  String
,behivior_id SMALLINT
)
partitioned by (record_date String )
STORED AS PARQUET
location '/user/hive/warehouse/org_userbehavior_all_yunzhi_parquet';

set hive.exec.dynamic.partition.mode=nonstrict
set mapred.max.split.size=1024000000;
set mapred.min.split.size.per.node=1024000000;
set mapred.min.split.size.per.rack=1024000000;
set mapred.reduce.tasks=30;
set hive.exec.reducers.bytes.per.reducer=1073741824;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set hive.exec.reducers.bytes.per.reducer=50000000;



set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=200000000;
set mapred.min.split.size.per.rack=200000000;
set mapred.reduce.tasks=100;


set spark.sql.map.partitions=100;
set spark.sql.shuffle.partitions=40;
set spark.sql.reduce.partitions=10;






org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.Bzip2Codec
org.apache.hadoop.io.compress.SnappyCodec

set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec



INSERT OVERWRITE TABLE org_userbehavior_all_yunzhi_parquet PARTITION(record_date) SELECT user_id, cast (event_time/1000 as Int) as event_time,behivior_pop,behivior_id,record_date from org_userbehavior_all_yunzhi;
-- 2908248821  /user/hive/warehouse/org_userbehavior_funnel_dd2
-- hadoop fs -ls -R  /user/hive/warehouse/org_userbehavior_funnel_dd2 | wc -l     2960

-- todo partitions 不包含 behivior_id 压缩比更好,和字典压缩有关.  生成的文件数太多,要修改 shuffer 数量来减少 文件数.



-- udf
-- funnel_merge   tuple
-- funnel_merge2  time *10  + eventid
-- funnel_sum





CREATE TABLE org_userbehavior_all_yunzhi_behiviorid_parquet
(
user_id Int
,event_time Int
,behivior_pop  String
)
partitioned by (record_date String,behivior_id SMALLINT)
STORED AS PARQUET
location '/user/hive/warehouse/org_userbehavior_all_yunzhi_behiviorid_parquet';
INSERT OVERWRITE TABLE org_userbehavior_all_yunzhi_behiviorid_parquet PARTITION(record_date,behivior_id)  SELECT user_id, cast (event_time/1000 as  Int) as event_time ,behivior_pop,record_date,behivior_id from org_userbehavior_all_yunzhi;

set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
INSERT OVERWRITE TABLE org_userbehavior_all_yunzhi_parquet PARTITION(record_date) SELECT user_id, cast (event_time/1000 as Int) as event_time,behivior_pop,behivior_id,record_date from org_userbehavior_all_yunzhi;


set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=128000000;
set hive.merge.smallfiles.avgsize=15000000;

set hive.merge.mapfiles = true #在Map-only的任务结束时合并小文件
set hive.merge.mapredfiles = true #在Map-Reduce的任务结束时合并小文件
set hive.merge.size.per.task = 256*1000*1000 #合并文件的大小
set hive.merge.smallfiles.avgsize=16000000 #当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge


set hive.exec.dynamic.partition.mode=nonstrict;
set mapred.max.split.size=256000000;
set mapred.min.split.size.per.node=256000000;
set mapred.min.split.size.per.rack=256000000;
set mapred.reduce.tasks=30;
set hive.exec.reducers.bytes.per.reducer=1073741824;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set spark.sql.shuffle.partitions=40;
set spark.sql.reduce.partitions=10;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=128000000;
set hive.merge.smallfiles.avgsize=50000000;


org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.Bzip2Codec
org.apache.hadoop.io.compress.SnappyCodec


set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec


select funnelsum(funneldata) from  ( select user_id,funnelmergecount(behivior_id,event_time,'10004,10007',1000000,1451577600) as funneldata  from org_userbehavior_all_yunzhi_parquet  where   behivior_id in (10004,10007)  and  (behivior_id != 10004 or  jsoninfogetstring(behivior_pop,'brand') = 'Apple')  group by  user_id ) temp2
select funnelsum(funneldata) from  ( select user_id,funnelmergecount(behivior_id,event_time,'10002,10003,10004,10007,10008',86400,1451577600) as funneldata  from org_userbehavior_all_yunzhi_parquet  where  record_date in ('20170101','20170102','20170103','20170104','20170105','20170106','20170107')  and  behivior_id in (10002,10003,10004,10007,10008)  and    (behivior_id != 10003 or  jsoninfogetstring(behivior_pop,'brand') = 'Apple')  and   (behivior_id != 10004 or  jsoninfogetdouble(behivior_pop,'price') > 5000)   group by  user_id ) temp2
select funnelsum(funneldata) from  ( select user_id,funnelmergecount(behivior_id,event_time,'10002,10003,10004,10007,10008',86400,1451577600) as funneldata  from org_userbehavior_all_yunzhi_parquet  where  record_date in ('20170101','20170102','20170103','20170104','20170105','20170106','20170107')  and  behivior_id in (10002,10003,10004,10007,10008)   group by  user_id ) temp2 ;
select funnelsum(funneldata) from  ( select user_id,funnelmergecount(behivior_id,event_time,'10002,10003,10004,10007,10008',86400,1451577600) as funneldata  from org_userbehavior_all_yunzhi_parquet  where   behivior_id in (10002,10003,10004,10007,10008)   group by  user_id ) temp2 ;



CREATE TABLE org_userbehavior_all_yunzhi_parquet_userslice
(
user_id Int
,event_time Int
,behivior_pop  String
,behivior_id SMALLINT
)
partitioned by (record_date String,user_id_slice Int )
STORED AS PARQUET
location '/user/hive/warehouse/org_userbehavior_all_yunzhi_parquet_userslice';




要如何把 天的输出 ,变成一个区间 。  一天 一天的 处理 ?  把 utdid 根据 Hash 分区



set hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE org_userbehavior_all_yunzhi_parquet_userslice PARTITION(record_date,user_id_slice) SELECT user_id, cast (event_time/1000 as Int) as event_time,behivior_pop,behivior_id,record_date,user_id%8 as mode  from org_userbehavior_all_yunzhi where record_date = 20170101;




输出小文件合并的改进 (增加自动合并结果文件)

当spark.sql.shuffle.partitions设置的比较大且结果数据集比较小时,会产生大量的小文件(文件数等同spark.sql.shuffle.partitions)。
解决办法:
在最后的执行计划中加入一个repartition transformation。通过参数控制最终的partitions数且不影响shuffle partition的数量。
使用方法:
set spark.sql.result.partitions=10;

作者:Albert陈凯
链接:http://www.jianshu.com/p/00328171b8a6
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

配置属性
set hive.merge.sparkfiles=true;
set hive.merge.smallfiles.avgsize=50000000;







set spark.sql.shuffle.partitions=1;
set spark.sql.reduce.partitions=1;




# see HIVE-9153
mapreduce.input.fileinputformat.split.maxsize=750000000
hive.vectorized.execution.enabled=true

hive.cbo.enable=true
hive.optimize.reducededuplication.min.reducer=4
hive.optimize.reducededuplication=true
hive.orc.splits.include.file.footer=false
set hive.merge.mapfiles=true;
set hive.merge.sparkfiles=true;
set hive.merge.smallfiles.avgsize=16000000;
set hive.merge.size.per.task=256000000;
hive.merge.orcfile.stripe.level=true
hive.auto.convert.join=true
hive.auto.convert.join.noconditionaltask=true
hive.auto.convert.join.noconditionaltask.size=894435328
hive.optimize.bucketmapjoin.sortedmerge=false
hive.map.aggr.hash.percentmemory=0.5
hive.map.aggr=true
hive.optimize.sort.dynamic.partition=false
hive.stats.autogather=true
hive.stats.fetch.column.stats=true
hive.vectorized.execution.reduce.enabled=false
hive.vectorized.groupby.checkinterval=4096
hive.vectorized.groupby.flush.percent=0.1
hive.compute.query.using.stats=true
hive.limit.pushdown.memory.usage=0.4
hive.optimize.index.filter=true
hive.exec.reducers.bytes.per.reducer=67108864
hive.smbjoin.cache.rows=10000
hive.exec.orc.default.stripe.size=67108864
hive.fetch.task.conversion=more
hive.fetch.task.conversion.threshold=1073741824
hive.fetch.task.aggr=false
mapreduce.input.fileinputformat.list-status.num-threads=5
spark.kryo.referenceTracking=false
spark.kryo.classesToRegister=org.apache.hadoop.hive.ql.io.HiveKey,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch

set hive.exec.dynamic.partition.mode=nonstrict
set hive.merge.mapfiles=true;
set hive.merge.sparkfiles=true;
set hive.merge.smallfiles.avgsize=128000000;
set hive.merge.size.per.task=256000000;

INSERT OVERWRITE TABLE org_userbehavior_all_yunzhi_parquet_userslice PARTITION(record_date,user_id_slice) SELECT user_id, cast (event_time/1000 as Int) as event_time,behivior_pop,behivior_id,record_date,user_id%8 as mode  from org_userbehavior_all_yunzhi
分享到:
评论

相关推荐

    spark2.0编译版-适用于hive2.3的hive on spark

    在Hive的`metastore.conf`文件中,需要设置`spark.sql.hive.metastore.jars`参数为`maven`或`builtin`,以指示Hive从Maven仓库或Hive的类路径中加载Spark相关jar。 4. **启动和使用**:编译完成后,将`spark-2.0.2-...

    Hive on Spark实施笔记1

    编辑`conf/spark-env.sh`、`conf/spark-defaults.conf`、`conf/slaves`和`conf/log4j.properties`文件。在`slaves`文件中列出所有Spark集群的worker节点。通过`log4j.properties`,可以根据需求调整日志级别。 启动...

    spark-hive-2.11和spark-sql-以及spark-hadoop包另付下载地址

    在实际使用中,首先需要解压缩这些文件,然后配置Spark的`spark-defaults.conf`文件以指向Hive的元数据存储位置(如HDFS或本地文件系统上的metastore_db)。接着,通过设置`spark.sql.hive.metastore.uris`等参数,...

    spark或mr引擎插入的数据,hive表查询数据为0

    - `--conf spark.sql.hive.convertMetastoreOrc=false` - `--conf spark.hadoop.mapred.input.dir.recursive=true` - 第一个参数用于禁用自动转换Metastore中的Orc文件,第二个参数则确保Spark-SQL能够递归地读取...

    spark连接HIveDemo

    在配置Spark时,需要在`spark-defaults.conf`文件中指定Hive的相关路径,如Hive Metastore URI、Hive配置目录等。例如: ``` spark.sql.warehouse.dir=hdfs://namenode:port/warehouse spark.sql.hive.metastore....

    win10下搭建Hadoop环境(jdk+mysql+hadoop+scala+hive+spark) 3.docx

    下载Spark的预编译版本,配置`spark-env.sh`和`spark-defaults.conf`文件,指定Hadoop的配置路径、Spark的工作内存、主节点等参数。启动Spark的Master和Worker节点,可以通过`start-all.sh`命令完成。 在整个过程中...

    Spark和Hive的结合(让hive基于spark计算)

    2.在spark/conf中创建一个hive-site.xml    javax.jdo.option.ConnectionURL  jdbc:mysql://192.168.224.132:3306/hive?createDatabaseIfNotExist=true  JDBC connect string for a JDBC metastore        ...

    spark操作hive表源码

    val conf = new SparkConf().setAppName("Spark-Hive-Example") val hiveContext = new org.apache.spark.sql.HiveContext(conf) ``` 在`SparkConf`中,我们可以设置`spark.sql.warehouse.dir`为Hive的默认数据库...

    spark-2.3.1-bin-hadoop2-without-hive.tgz

    2. **配置Spark**: 在Spark的配置文件`spark-defaults.conf`中,需要设置`spark.sql.hive.metastore.uris`为Hive Metastore的Thrift服务URI,以便Spark可以连接到Hive的元数据服务。 3. **Hive-Spark交互**: 通过...

    apache-hive-3.1.2-bin.tar.gz

    - `conf`:配置文件,如`hive-site.xml`,用于设置Hive的各种属性。 - `lib`:包含Hive运行所需的库文件和依赖。 - `docs`:Hive的文档和API参考。 - `sql-standard`:SQL标准相关的文件,Hive支持ANSI SQL标准。 - ...

    spark--bin-hadoop3-without-hive.tgz

    本压缩包“spark--bin-hadoop3-without-hive.tgz”提供了Spark二进制版本,针对Hadoop 3.1.3进行了编译和打包,这意味着它已经与Hadoop 3.x兼容,但不包含Hive组件。在CentOS 8操作系统上,这个版本的Spark已经被...

    spark-2.3.0-bin-hadoop2-without-hive

    用户需要自行添加 Hive 的相关依赖,并在 Spark 的配置文件(如 `spark-defaults.conf`)中设置 `spark.sql.hive.metastore.uris`,指向 Hive 的元数据服务器地址,确保 Spark 能够访问 Hive 的元数据。 在实际操作...

    CDH6.3.2之升级spark-3.3.1.doc

    - **hive-site.xml**:`cp /etc/hive/conf/hive-site.xml /opt/cloudera/parcels/CDH/lib/spark3/conf/` - **spark-env.sh**:`cp /etc/spark/conf/spark-env.sh /opt/cloudera/parcels/CDH/lib/spark3/conf/` - ...

    spark-2.4.3-bin-hadoop2-without-hive.tgz

    - 连接Hadoop集群时,需要正确配置`conf/spark-defaults.conf`中的相关参数,如`spark.master`(例如设置为yarn)和`spark.hadoop.conf`等。 - 使用DataFrame API进行数据处理,可以使用Scala、Java、Python或R...

    基于Shell脚本,通过简单配置后,可以自动安装Hadoop、Hive、Spark等大数据组件.zip

    6.修改/usr/local/hive/conf下的配置文件,该目录下的文件有: 二.在Ubuntu中安装并配置mysql 1.我们采用MySQL数据库保存Hive的元数据,而不是采用Hive自带的derby来存储元数据,因此需要在Ubuntu里安装MySQL 使用...

    cdh6.3.2升级hive至3.x后登陆HiveServer2连接失败.doc

    cp /opt/cloudera/parcels/CDH/lib/hive/conf/hive-env.sh /opt/cloudera/parcels/CDH/lib/hive/conf/hive-env.sh.bak ``` 2. **编辑并注释掉 `export HIVE_OPTS`**: - 使用文本编辑器打开 `hive-env.sh` 文件...

    hive如何去安装与配置

    2. **HIVE_CONF_DIR**:Hive配置文件的路径,通常是`/usr/local/hive/conf`。 3. **HIVE_AUX_JARS_PATH**:如果需要添加自定义库,可以在这里指定路径。 此外,还需要在`/usr/local/hive/conf/hive-site.xml`中配置...

    spark-3.0.2-bin-hadoop2.7-hive1.2.tgz

    在运行Spark之前,需要根据你的集群环境调整`conf/spark-defaults.conf`和`conf/hive-site.xml`等配置文件,以确保与Hadoop和Hive的正确连接。 总的来说,Spark 3.0.2与Hadoop 2.7和Hive 1.2的集成为大数据处理提供...

    spark-2.0.0-bin-hadoop2-without-hive.tgz

    2. **配置Spark**:在Spark的`conf/spark-defaults.conf`文件中,设置`spark.sql.hive.metastore.uris`来指向你的Hive Metastore服务的Thrift URI。同时,可能还需要指定Hive的库路径,例如`spark.sql.hive....

    藏经阁-spark替代HIVE实现ETL作业.pdf

    1. 部署 Spark 接入 Hive 源数据,将 Hive-site.xml 移动到 Spark 的 conf 目录下即可。 2. 利用 Hive 作业脚本,建立 Spark 作业脚本。替换 Hive 参数,为 Spark 版本。 3. 切换线上作业用 Spark 作业代替 Hive。 ...

Global site tag (gtag.js) - Google Analytics