Setting a table's rawDataSize to 1 in Hive
14: jdbc:hive2://ark3:9994> alter table app_uuid_info_test set tblproperties(rawDataSize=1)
14: jdbc:hive2://ark3:9994> ;
Statistics for HBase-backed tables are never refreshed automatically, so the value has to be set by hand.
The threshold:
set spark.sql.autoBroadcastJoinThreshold=100000000;
In the join: left outer join app_uuid_info_test b
select a.tmp_id,a.uuid,a.eguan_id,a.device_id,b.row_key,b.value.int_20 from tmp.ods_app_hour a left outer join app_uuid_info_test b on a.tmp_id = b.row_key where a.app_id='6069' and a.day = '20180303' and a.tmp_id = '1a16d393e7042213384f994394b763d37121d7' limit 100;
At execution time the join then runs as a map join (a broadcast join).
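Why setting rawDataSize = 1 has this effect can be sketched with a small model. This is only an illustration under assumptions: it assumes the planner derives a Hive table's size estimate from the `totalSize` property first, then `rawDataSize`, and otherwise falls back to a very large default. The object `SizeEstimate` and its members are hypothetical names, not Spark's API.

```scala
import scala.util.Try

// Hypothetical, simplified model of a table size estimate.
// Assumption: totalSize is preferred, then rawDataSize, then a large default.
object SizeEstimate {
  // Assumed fallback when no usable statistic exists: effectively "too big to broadcast".
  val DefaultSizeInBytes: BigInt = BigInt(Long.MaxValue)

  def estimate(props: Map[String, String]): BigInt = {
    // Read a property as a strictly positive number, ignoring missing or malformed values.
    def positive(key: String): Option[BigInt] =
      props.get(key).flatMap(v => Try(BigInt(v.trim)).toOption).filter(_ > 0)

    positive("totalSize")
      .orElse(positive("rawDataSize"))
      .getOrElse(DefaultSizeInBytes)
  }
}
```

Under this model a table with no statistics is estimated at the large default and is never broadcast, while `rawDataSize = 1` gives an estimate of one byte, far below any positive threshold.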
The logic that decides whether to use a broadcast join lives in the class org.apache.spark.sql.execution.SparkStrategies:
object CanBroadcast {
  def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
    case BroadcastHint(p) => Some(p)
    case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
      p.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => Some(p)
    case _ => None
  }
}
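The same decision can be restated as a self-contained sketch that runs without Spark. The `Plan` case class and `canBroadcast` function are hypothetical stand-ins for `LogicalPlan` and the extractor above; only the comparison logic is taken from the quoted code.

```scala
object BroadcastDecision {
  // Hypothetical stand-in for a logical plan: just the fields the check reads.
  final case class Plan(name: String, sizeInBytes: BigInt, broadcastHint: Boolean = false)

  // A plan qualifies if it carries an explicit hint, or if the threshold is
  // enabled (> 0) and the estimated size does not exceed it.
  def canBroadcast(plan: Plan, threshold: Long): Boolean =
    plan.broadcastHint || (threshold > 0 && plan.sizeInBytes <= threshold)
}
```

With the threshold of 100000000 set earlier, a plan whose estimated size is 1 byte passes the check, which is consistent with the BroadcastHashJoin in the physical plan. A threshold of 0 or less disables the size-based path, leaving only the explicit hint.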
== Parsed Logical Plan ==
'GlobalLimit 100
+- 'LocalLimit 100
+- 'Project ['a.tmp_id, 'a.uuid, 'a.eguan_id, 'a.device_id, 'b.row_key, 'b.value.int_20]
+- 'Filter ((('a.app_id = 6069) && ('a.day = 20180303)) && ('a.tmp_id = 1a16d393e7042213384f994394b763d37121d7))
+- 'Join LeftOuter, ('a.tmp_id = 'b.row_key)
:- 'UnresolvedRelation `tmp`.`ods_app_hour`, a
+- 'UnresolvedRelation `app_uuid_info_test`, b
== Analyzed Logical Plan ==
tmp_id: string, uuid: bigint, eguan_id: string, device_id: string, row_key: string, int_20: string
GlobalLimit 100
+- LocalLimit 100
+- Project [tmp_id#22017, uuid#22018L, eguan_id#22019, device_id#22020, row_key#22195, value#22196[int_20] AS int_20#22197]
+- Filter (((app_id#22014 = 6069) && (day#22015 = 20180303)) && (tmp_id#22017 = 1a16d393e7042213384f994394b763d37121d7))
+- Join LeftOuter, (tmp_id#22017 = row_key#22195)
:- SubqueryAlias a
: +- SubqueryAlias ods_app_hour
: +- Relation[tmp_id#22017,uuid#22018L,eguan_id#22019,device_id#22020,device_mac#22021,device_imsi#22022,device_aid#22023,device_snr#22024,device_udid#22025,debug_state#22026,hjk_state#22027,sir_state#22028,ij_state#22029,sdk_api_level#22030,standard_brand_id#22031,standard_model_id#22032,os_name_id#22033,os_version_id#22034,standard_smo_id#22035,app_key#22036,app_channel#22037,sdk_version#22038,app_version#22039,imeis_code#22040,... 157 more fields] parquet
+- SubqueryAlias b
+- MetastoreRelation default, app_uuid_info_test
== Optimized Logical Plan ==
GlobalLimit 100
+- LocalLimit 100
+- Project [tmp_id#22017, uuid#22018L, eguan_id#22019, device_id#22020, row_key#22195, value#22196[int_20] AS int_20#22197]
+- Join LeftOuter, (tmp_id#22017 = row_key#22195)
:- Project [tmp_id#22017, uuid#22018L, eguan_id#22019, device_id#22020]
: +- Filter (((((isnotnull(app_id#22014) && isnotnull(day#22015)) && isnotnull(tmp_id#22017)) && (app_id#22014 = 6069)) && (day#22015 = 20180303)) && (tmp_id#22017 = 1a16d393e7042213384f994394b763d37121d7))
: +- Relation[tmp_id#22017,uuid#22018L,eguan_id#22019,device_id#22020,device_mac#22021,device_imsi#22022,device_aid#22023,device_snr#22024,device_udid#22025,debug_state#22026,hjk_state#22027,sir_state#22028,ij_state#22029,sdk_api_level#22030,standard_brand_id#22031,standard_model_id#22032,os_name_id#22033,os_version_id#22034,standard_smo_id#22035,app_key#22036,app_channel#22037,sdk_version#22038,app_version#22039,imeis_code#22040,... 157 more fields] parquet
+- MetastoreRelation default, app_uuid_info_test
== Physical Plan ==
CollectLimit 100
+- *Project [tmp_id#22017, uuid#22018L, eguan_id#22019, device_id#22020, row_key#22195, value#22196[int_20] AS int_20#22197]
+- *BroadcastHashJoin [tmp_id#22017], [row_key#22195], LeftOuter, BuildRight
:- *Project [tmp_id#22017, uuid#22018L, eguan_id#22019, device_id#22020]
: +- *Filter (isnotnull(tmp_id#22017) && (tmp_id#22017 = 1a16d393e7042213384f994394b763d37121d7))
: +- *FileScan parquet tmp.ods_app_hour[tmp_id#22017,uuid#22018L,eguan_id#22019,device_id#22020,app_id#22014,day#22015,hour#22016] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://mycluster/user/hive/warehouse/tmp.db/ods_app_hour/app_id=6069/day=..., PartitionCount: 24, PartitionFilters: [isnotnull(app_id#22014), isnotnull(day#22015), (app_id#22014 = 6069), (day#22015 = 20180303)], PushedFilters: [IsNotNull(tmp_id), EqualTo(tmp_id,1a16d393e7042213384f994394b763d37121d7)], ReadSchema: struct<tmp_id:string,uuid:bigint,eguan_id:string,device_id:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- HiveTableScan [row_key#22195, value#22196], MetastoreRelation default, app_uuid_info_test
2018-03-04 19:45:47,554 INFO [dispatcher-event-loop-10] storage.BlockManagerInfo: Removed broadcast_173_piece0 on 192.168.220.171:38696 in memory (size: 33.7 KB, free: 334.5 MB)
Dimension-table style (joining against a pre-filtered subquery):
select a.tmp_id,a.uuid,a.eguan_id,a.device_id,b.row_key,b.int_20 from tmp.ods_app_hour a left outer join (select row_key,value.int_20 as int_20 from app_uuid_info_test where row_key in (01,02) ) b on a.tmp_id = b.row_key where a.app_id='6069' and a.day = '20180303' and a.tmp_id = '1a16d393e7042213384f994394b763d37121d7' limit 100
== Physical Plan ==
CollectLimit 100
+- *BroadcastHashJoin [tmp_id#23309], [row_key#23487], LeftOuter, BuildRight
:- *LocalLimit 100
: +- *Project [tmp_id#23309, uuid#23310L, eguan_id#23311, device_id#23312]
: +- *Filter (isnotnull(tmp_id#23309) && (tmp_id#23309 = 1a16d393e7042213384f994394b763d37121d7))
: +- *FileScan parquet tmp.ods_app_hour[tmp_id#23309,uuid#23310L,eguan_id#23311,device_id#23312,app_id#23306,day#23307,hour#23308] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://mycluster/user/hive/warehouse/tmp.db/ods_app_hour/app_id=6069/day=..., PartitionCount: 24, PartitionFilters: [isnotnull(app_id#23306), isnotnull(day#23307), (app_id#23306 = 6069), (day#23307 = 20180303)], PushedFilters: [IsNotNull(tmp_id), EqualTo(tmp_id,1a16d393e7042213384f994394b763d37121d7)], ReadSchema: struct<tmp_id:string,uuid:bigint,eguan_id:string,device_id:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *Project [row_key#23487, value#23488[int_20] AS int_20#22932]
+- *Filter row_key#23487 IN (1,2)
+- HiveTableScan [row_key#23487, value#23488], MetastoreRelation default, app_uuid_info_test