- 浏览: 125256 次
- 性别:
- 来自: 杭州
文章分类
最新评论
DataFrameWriter
format
val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName
根据这个标来找对应的
val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val ds = cls.newInstance()
ds match {
case ws: WriteSupport =>
val options = new DataSourceOptions((extraOptions ++
DataSourceV2Utils.extractSessionConfigs(
ds = ds.asInstanceOf[DataSourceV2],
conf = df.sparkSession.sessionState.conf)).asJava)
// Using a timestamp and a random UUID to distinguish different writing jobs. This is good
// enough as there won't be tons of writing jobs created at the same second.
val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
.format(new Date()) + "-" + UUID.randomUUID()
val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
if (writer.isPresent) {
runCommand(df.sparkSession, "save") {
WriteToDataSourceV2(writer.get(), df.logicalPlan)
}
}
2018-03-14 09:39:19,706 WARN [Executor task launch worker for task 175] parquet.CorruptStatistics: Ignoring statistics because created_by could not be parsed (see PARQUET-251): parquet-mr version 1.6.0
org.apache.parquet.VersionParser$VersionParseException: Could not parse created_by: parquet-mr version 1.6.0 using format: (.+) version ((.*) )?\(build ?(.*)\)
at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
at org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:60)
at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:263)
at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:567)
at org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:544)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:431)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:386)
at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:107)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:381)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:355)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
parquet
ParquetFileFormat.scala
package org.apache.spark.sql.execution.datasources
format
val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName
根据这个标来找对应的
val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val ds = cls.newInstance()
ds match {
case ws: WriteSupport =>
val options = new DataSourceOptions((extraOptions ++
DataSourceV2Utils.extractSessionConfigs(
ds = ds.asInstanceOf[DataSourceV2],
conf = df.sparkSession.sessionState.conf)).asJava)
// Using a timestamp and a random UUID to distinguish different writing jobs. This is good
// enough as there won't be tons of writing jobs created at the same second.
val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
.format(new Date()) + "-" + UUID.randomUUID()
val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
if (writer.isPresent) {
runCommand(df.sparkSession, "save") {
WriteToDataSourceV2(writer.get(), df.logicalPlan)
}
}
2018-03-14 09:39:19,706 WARN [Executor task launch worker for task 175] parquet.CorruptStatistics: Ignoring statistics because created_by could not be parsed (see PARQUET-251): parquet-mr version 1.6.0
org.apache.parquet.VersionParser$VersionParseException: Could not parse created_by: parquet-mr version 1.6.0 using format: (.+) version ((.*) )?\(build ?(.*)\)
at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
at org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:60)
at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:263)
at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:567)
at org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:544)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:431)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:386)
at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:107)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:381)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:355)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
parquet
ParquetFileFormat.scala
package org.apache.spark.sql.execution.datasources
发表评论
-
Spark SQL运行 过程 抄的别人的,记录 学习
2018-05-13 23:07 1034抄的别人的,觉得写的特别好 val FILESOURCE ... -
thriftserver log4j.properties 生效
2018-04-09 11:46 451/home/isuhadoop/spark2/sbin/sta ... -
udaf 返回的 子属性
2018-03-20 13:22 444udaf 返回的 子属性 spark.sql(" ... -
如何 map 端 Join。
2018-03-04 19:31 626Hive 中 修改表的 rawDataSize = 1 1 ... -
spark thrift server 修改
2018-03-04 12:58 586org.apache.spark.sql.hive.thrif ... -
hive hbase thriftserver run
2018-03-03 15:13 415正确方法 : 0\ 拷贝对应目录到 spark2 jars ... -
scala package
2018-01-25 09:48 533#scala 打包 mvn clean scala:com ... -
SPARK SERVER
2018-01-23 22:15 553sbin/start-thriftserver.sh --dr ... -
driver class
2018-01-21 22:11 525sbin/start-thriftserver.sh -- ... -
spark thrift server 调试
2017-10-20 15:50 867spark-hive-thriftserver 本地调试 ... -
spark SQL conf
2017-10-18 14:36 624org.apache.spark.sql.internal.S ... -
java 死锁 ,内存问题 分析
2017-10-17 10:50 349jstack -l pid /opt/soft/jdk/ ... -
thriftServer proxy
2017-10-16 14:21 943sudo yum install haproxy 257 ... -
hive spark conf
2017-09-26 17:44 1299CREATE TABLE org_userbehavior_a ... -
get day
2017-09-19 08:41 570def timeDayNow() = { var ... -
thriftserver
2017-09-14 19:47 473export SPARK_CONF_DIR=/home/yun ... -
thriftserver dynamicallocation
2017-09-08 14:41 590./sbin/start-thriftserver.sh -- ... -
test code2
2017-09-03 13:45 492package org.test.udf import co ... -
test code
2017-08-24 17:52 290def taskcal(data:Array[(String, ... -
struct streaming SQL udf udaf
2017-08-22 11:50 680spark aggregator class H ...
相关推荐
用于Apache Spark的Neo4j连接器 该存储库包含适用于Apache Spark的Neo4j连接器。 执照 这个neo4j-connector-apache-spark是Apache 2许可的 从源代码生成文档 cd doc # Install NodeJS dependencies npm install # ...
可能讨论了如何利用CarbonData的Spark DataSource API,实现Spark作业与CarbonData之间的无缝对接。 3. **查询优化**:会议可能涵盖了如何利用CarbonData的查询优化器来提升查询效率,例如通过选择合适的索引、优化...
本pdf是对spark3.0新增特性的讲解描述,主要包括以下几个方面: ...4,Apache Spark DataSource V2(数据源API稳定版) 5,SparkR向量化读写 6,更好的 ANSI SQL 兼容 7,其他 8,参考文献 9,扩展(数据湖)
4. **DataSource**:用于将数据源连接到Hudi表的接口,此处特指Spark DataSource API。 5. **Index**:提供对数据文件的快速查找机制。 6. **Data Files**:存储实际的数据。 7. **Timeline**:管理Hudi表的状态和...
适用于Apache Spark的Hive ACID数据源一个基于Spark Datasource V1 API的数据源,为提供Spark支持。 此数据源提供了与Hive ACID V2表,完全ACID表以及仅插入表一起使用的功能。 功能可用性矩阵功能性完整的ACID表仅...
第三方mongo spark连接器,运行spark-submit --packages com.stratio.datasource:spark-mongodb_2.10:0.11.2可以自动下载,国内网容易下载失败,把这个文件解压后拷贝到~/.ivy2目录下即可。 ...
DataSource 实现。 笔记 该库的开发仍在进行中。 因此,由于迄今为止的测试能力有限,某些功能在尚未充分考虑的生产环境中使用时可能不稳定。 在线文档 您可以在 . 建造 先决条件 在构建项目之前有一些要求。 请确保...
8.外部数据源Exeternal DataSource 9.集成Hive 10.自定义函数UDF 11.分布式SQL引擎(spakr-sql和Spark ThriftServer) 12.Catalyst 优化器 第四章、离线综合实战 1.综合实战概述(需求、调研、业务) 2.环境搭建...
Spark SQL提供了一个DataSource API,用于管理存储在外部数据源中的数据集。DataFrameReader用于从外部数据源读取数据集,而DataFrameWriter用于将数据集写入外部数据源。这些API使得与多种数据源进行交互成为可能,...
SparkSQL基于DataSource API,能够接入多种数据源,包括但不限于HDFS(Hadoop分布式文件系统)、Kudu、OSS(对象存储服务)。这使得Spark能够处理各种不同类型的存储系统中的数据。值得注意的是,对于某些特定的...
在数据处理效率方面,Apache Spark提供了一个灵活的API来兼容不同类型的DataSource,支持全量扫描、列剪枝、列剪枝加过滤机制以及数据插入等多种数据访问方式。之前,Spark已经支持了包括JDBC(MySQL、PostgreSQL)...
- MapReduce和Spark作业:观察任务执行情况,分析延迟和失败原因。 总的来说,“foursquare-datasource-plugin-clouderamanager”是Grafana与Cloudera Manager之间的重要桥梁,它极大地提升了CDH集群的运维效率,使...
DataSource是一个可插拔的数据提供者框架,它允许用户通过简单的API来扩展Spark SQL以支持新的数据源。 ##### 9.5 CreatableRelationProvider - 根据保存模式保存行的数据源 CreatableRelationProvider是一个接口...
### Mastering Apache Spark #### 概览 Apache Spark 是一个用于大规模数据处理的开源集群计算框架,它提供了统一的数据分析接口,支持批处理、交互式查询、流处理、机器学习和图形处理等任务。本篇内容将围绕 ...
#Spark SQL HBase Connector##----------------Note: This Project is Deprecated---------------##--------------And This ...Spark1.2发布之后,Spark SQL支持了External Datasource API,我们才能方便的编写扩
《Hudi-0.9.0在Windows上的IDFEA集成Spark3.0与Flink-1.12.x实战指南》 Hudi(Hadoop Upsert Delta Tables)是一款开源的、面向大数据处理的实时更新工具,它允许对HDFS上的数据进行插入、更新和删除操作,提供了一...
优雅的交互方式,支持多种datasource/sink,多数据源混算 spark常驻服务,基于zookeeper的引擎自动发现 负载均衡,多个引擎随机执行 多session模式实现并行查询 采用spark的FAIR调度,避免资源被大任务独占 基于...
Spark MLlib随机梯度下降法概述与实例 Spark MLlib随机梯度下降法概述与实例是机器学习算法中的一种重要技术,用于解决线性回归问题。该算法通过不断地判断和选择当前目标下最优的路径,从而能够在最短路径下达到...
基于Spark任务流执行平台 依赖 Scala 2.11.12 星火2.4.7 Spring Boot 2.3.7。发布 快速开始 修改src / main / resources / application.yaml配置文件 spring : datasource : driver-class-name : ...