`
m635674608
  • 浏览: 5028227 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spark Java使用DataFrame的foreach/foreachPartition

 
阅读更多

Spark已更新至2.x,DataFrame归DataSet管了,因此API也相应统一。本文不再适用2.0.0及以上版本。


DataFrame原生支持直接输出到JDBC,但如果目标表有自增字段(比如id),那么DataFrame就不能直接进行写入了。因为DataFrame.write().jdbc()要求DataFrame的schema与目标表的表结构必须完全一致(甚至字段顺序都要一致),否则会抛异常,当然,如果你SaveMode选择了Overwrite,那么Spark删除你原有的表,然后根据DataFrame的Schema生成一个。。。。字段类型会非常非常奇葩。。。。
于是我们只能通过DataFrame.collect(),把整个DataFrame转成List<Row>到Driver上,然后通过原生的JDBC方法进行写入。但是如果DataFrame体积过于庞大,很容易导致Driver OOM(特别是我们一般不会给Driver配置过高的内存)。这个问题真的很让人纠结。
翻看Spark的JDBC源码,发现实际上是通过foreachPartition方法,在DataFrame每一个分区中,对每个Row的数据进行JDBC插入,那么为什么我们就不能直接用呢?

Spark JdbcUtils.scala部分源码:

  def saveTable(df: DataFrame,url: String,table: String,properties: Properties = new Properties()) {
    val dialect = JdbcDialects.get(url)
    val nullTypes: Array[Int] = df.schema.fields.map { field =>
      dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
        field.dataType match {
          case IntegerType => java.sql.Types.INTEGER
          case LongType => java.sql.Types.BIGINT
          case DoubleType => java.sql.Types.DOUBLE
          case FloatType => java.sql.Types.REAL
          case ShortType => java.sql.Types.INTEGER
          case ByteType => java.sql.Types.INTEGER
          case BooleanType => java.sql.Types.BIT
          case StringType => java.sql.Types.CLOB
          case BinaryType => java.sql.Types.BLOB
          case TimestampType => java.sql.Types.TIMESTAMP
          case DateType => java.sql.Types.DATE
          case t: DecimalType => java.sql.Types.DECIMAL
          case _ => throw new IllegalArgumentException(
            s"Can't translate null value for field $field")
        })
    }

    val rddSchema = df.schema
    val driver: String = DriverRegistry.getDriverClassName(url)
    val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
    // ****************** here ****************** 
    df.foreachPartition { iterator =>
      savePartition(getConnection, table, iterator, rddSchema, nullTypes)
    }
  }
 

嗯。。。既然Scala能实现,那么作为他的爸爸,Java也应该能玩!我们看看foreachPartition的方法原型:

def foreachPartition(f: Iterator[Row] => Unit)

又是函数式语言最爱的匿名函数。。。非常讨厌写lambda,所以我们还是实现个匿名类吧。要实现的抽象类为:
scala.runtime.AbstractFunction1<Iterator<Row>,BoxedUnit> 两个模板参数,第一个很直观,就是Row的迭代器,作为函数的参数。第二个BoxedUnit,是函数的返回值。不熟悉Scala的可能会很困惑,其实这就是Scala的void。由于Scala函数式编程的特性,代码块的末尾必须返回点什么,于是他们就搞出了个unit来代替本应什么都没有的void(解释得可能不是很准确,我是这么理解的)。对于Java而言,我们可以直接使用BoxedUnit.UNIT,来得到这个“什么都没有”的东西。
来玩耍一下吧!

df.foreachPartition(new AbstractFunction1<Iterator<Row>, BoxedUnit>() {
    @Override
    public BoxedUnit apply(Iterator<Row> it) {
        while (it.hasNext()){
            System.out.println(it.next().toString());
        }
        return BoxedUnit.UNIT;
    }
});

嗯,maven complete一下,spark-submit看看~
好勒~抛异常了
org.apache.spark.SparkException: Task not serializable
Task不能被序列化
嗯哼,想想之前实现UDF的时候,UDF1/2/3/4...各接口,都extends Serializable,也就是说,在Spark运行期间,Driver会把UDF接口实现类序列化,并在Executor中反序列化,执行call方法。。。这就不难理解了,我们foreachPartition丢进去的类,也应该implements Serializable。这样,我们就得自己搞一个继承AbstractFunction1<Iterator<Row>, BoxedUnit>,又实现Serializable的抽象类,给我们这些匿名类去实现!

import org.apache.spark.sql.Row;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.Serializable;

public abstract class JavaForeachPartitionFunc extends AbstractFunction1<Iterator<Row>, BoxedUnit> implements Serializable {
}

可是每次都要return BoxedUnit.UNIT 搞得太别扭了,没一点Java的风格。

import org.apache.spark.sql.Row;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.Serializable;

public abstract class JavaForeachPartitionFunc extends AbstractFunction1<Iterator<Row>, BoxedUnit> implements Serializable {
    @Override
    public BoxedUnit apply(Iterator<Row> it) {
        call(it);
        return BoxedUnit.UNIT;
    }
    
    public abstract void call(Iterator<Row> it);
}

于是我们可以直接Override call方法,就可以用满满Java Style的代码去玩耍了!

df.foreachPartition(new JavaForeachPartitionFunc() {
    @Override
    public void call(Iterator<Row> it) {
        while (it.hasNext()){
            System.out.println(it.next().toString());
        }
    }
});

注意!我们实现的匿名类的方法,实际上是在executor上执行的,所以println是输出到executor机器的stdout上。这个我们可以通过Spark的web ui,点击具体Application的Executor页面去查看(调试用的虚拟机集群,手扶拖拉机一样的配置,别吐槽了~)

至于foreach方法同理。只不过把Iterator<Row> 换成 Row。具体怎么搞,慢慢玩吧~~~have fun~

 

https://segmentfault.com/a/1190000005365244

分享到:
评论

相关推荐

    SparkSQ操作DataFrame,合并DataFrame

    例子中定义了多个List数据集合,包括用户信息,订单信息,用户订单信息,将List对象生成DataFrame,使用SparkSQL查询将多个DataFrame合成一个DataFrame,使用Scala语言编写。

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。Dataframe可以看作是具有模式信息的分布式数据集。Dataset是类型安全的DataFrame。 在本文中,将详细介绍Spark的RDD API,...

    JAVA spark创建DataFrame的方法

    下面将详细介绍如何在Java中使用Spark创建DataFrame。 首先,创建SparkSession是使用Spark的第一步。SparkSession是Spark 2.x引入的,它是SparkSession、SQLContext和HiveContext的统一接口,用于执行Spark SQL和...

    大数据处理框架:Spark:Spark SQL与DataFrame教程.docx

    大数据处理框架:Spark:Spark SQL与DataFrame教程.docx

    Python DataFrame设置/更改列表字段/元素类型的方法

    Python DataFrame 如何设置列表字段/元素类型? 比如笔者想将列表的两个字段由float64设置为int64,那么就要用到DataFrame的astype属性,举例如图: 该例列表为“m_pred_survived”字段为“PassengerId”及...

    Spark SQL常见4种数据源详解

    Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...

    Spark dataset and dataframe 深入分析

    Spark Dataset和DataFrame深入分析知识点: 1. Spark Catalyst优化器框架:Catalyst是Spark中用于操作关系运算符树的优化器框架,它为执行Spark中的关系查询(SQL)提供了原生支持。这个框架将SQL查询转换为可执行...

    Spark DataFrame详解.zip

    此外,DataFrame还支持DataFrame API的跨语言互操作性,使得Python、Scala、Java和R开发者都能方便地使用Spark进行大数据处理。 总结来说,Spark DataFrame是Spark中的重要数据结构,它提供了一种声明式的数据处理...

    howardyan93#notes_md#spark学习之dataframe1

    但是因为spark支持pandas的转换,因此可以借助pandas来读取数据,最后转为spark支持的dataframe:原来的代码如下,大家可以体会下:感觉有

    spark-hbase-ingestion:Spark HBase使用DataFrame进行读写

    使用数据框的spark-hbase-ingestion / ** 转换记录以插入HBase的方法 @param记录 @param cf列族 @返回 */ def toHbaseRecords(记录:Array [(String,Array [(String,String)])],cf:String):RDD [...

    spark2.1.0.chm(spark java API)

    3. DataFrame和Dataset操作:Java API提供了`SparkSession`,通过它创建DataFrame/Dataset,支持SQL查询和转换操作。 三、Shell命令在Spark开发中的应用 Spark Shell是交互式环境,可以测试和实验Spark API。在Java...

    roohom#Code-Cookbook#[Spark]将Spark DataFrame中的数值取出1

    [Spark]将Spark DataFrame中的数值取出有时候经过Spark SQL计算得到的结果往往就一行,而且需要将该结果取出,作为字符串参与别的代码块的

    spark-scala-examples:该项目以Scala语言提供了Apache Spark SQL,RDD,DataFrame和Dataset示例

    有关该项目中存在的所有Spark...Dataset列使用Spark DataFrame Where过滤器Spark SQL“何时发生”和“何时发生” Collect()–从Spark RDD / DataFrame检索数据Spark –如何删除重复的行如何旋转和取消旋转Spark DataF

    spark dataframe 将一列展开,把该列所有值都变成新列的方法

    ### Spark DataFrame将一列展开,把该列所有值都变成新列的方法 在处理大数据时,Apache Spark 是一个非常强大的工具。特别是在数据处理与分析领域,Spark 的 DataFrame API 提供了丰富的功能来帮助用户高效地操作...

    Spark dataframe使用详解

    Spark DataFrame 使用详解 Spark DataFrame 是一种基于 RDD 的分布式数据集,它提供了详细的结构信息,能够清楚地知道该数据集中包含哪些列、每列的名称和类型。相比于 RDD,DataFrame 的优点在于能够直接获得数据...

    Spark DataFrame

    Apache Spark DataFrame是大数据处理领域的一项重要技术,它在分布式数据集(RDD)的基础上,提供了一个更加高效和易于使用的数据处理模型。Spark DataFrame不仅继承了Spark的强大计算能力,还融入了关系型处理的...

    spark2.1.0 JAVA API

    5. **DataFrame/Dataset**: Spark SQL引入的DataFrame和Dataset是更高级的数据抽象,它们提供了强类型和SQL查询的能力。在Java中,可以通过`SparkSession`来操作DataFrame和Dataset。例如: ```java SparkSession ...

    pandas.DataFrame选取/排除特定行的方法

    在数据分析工作中,经常需要根据一定的条件对数据进行筛选,pandas库中的DataFrame对象提供了灵活的数据筛选功能,本文将详细介绍如何使用pandas进行DataFrame中的特定行的选取与排除。 首先,我们需要了解的是...

Global site tag (gtag.js) - Google Analytics