`

利用Spark进行对账demo

阅读更多
//利用cogroup 处理分隔符的文件
import org.apache.spark.{ SparkContext, SparkConf }
import java.sql.DriverManager
object HandleGroup extends App{

  val beginTime = System.currentTimeMillis()
  //引用spark
  val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
  val sc = new SparkContext(conf)

  //获取文件
  val txt1 = sc.textFile("i:/1/fg1.txt").map(line =>line.split( "\\|"))
  val txt2 = sc.textFile("i:/1/fg2.txt").map(line =>line.split( "\\|"))

  val results = txt1.map{v =>(v(0)+v(1)+v(5),v)  }.cogroup(
    txt2.map{v =>(v(0)+v(1)+v(5),v)}).filter {
    case (k, (v1, v2)) => !(v1.nonEmpty&&v2.nonEmpty) }

  //JDBC预处理
  val driver = "oracle.jdbc.driver.OracleDriver"
  val url = "jdbc:oracle:thin:@"
  val username = ""
  val password = ""
  Class.forName(driver)
  val connection = DriverManager.getConnection(url, username, password)
  val statement = connection.createStatement()

  //遍历结果集
  results.foreach{case (k,(v1,v2)) =>
    v1.foreach(a =>runSql(a(0),a(1),a(5)))
    v2.foreach(a => runSql(a(0),a(1),a(5)))
  }

  //执行sql语句
  def runSql(v1:String,v2:String,v3:String): Unit =
    statement.addBatch("insert into nidaye( v1, v2, v3) values('"+v1+"','"+v2+"','"+v3+"')")

  //提交
  val resultSet = statement.executeBatch()
  connection.close()
  println(System.currentTimeMillis() - beginTime)
}

 

//利用SPARKSQL 处理定长分隔的文件
import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD

import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ IntegerType, StringType, StructField, StructType }

// 导入 Row.
import org.apache.spark.sql.Row;

// 导入 Spark SQL 数据类型
import org.apache.spark.sql.types.{ StructType, StructField, StringType };
object HandleLength extends App{
  val beginTime = System.currentTimeMillis()
  //引用spark
  val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
  val sc = new SparkContext(conf)

  //JDBC预处理
  val driver = "oracle.jdbc.driver.OracleDriver"
  val url = "jdbc:oracle:thin"
  val username = ""
  val password = ""
  Class.forName(driver)
  val connection = DriverManager.getConnection(url, username, password)
  val statement = connection.createStatement()

  //创建sparkSql上下文
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
   //获取数据源
  val people = sc.textFile("i:/1/dc1g_1_temp.txt")
  val dog = sc.textFile("i:/1/dc1g_2_temp.txt")
  val t1 = (System.currentTimeMillis() - beginTime)
  //定义schema
  val schemaResources = "v1 v2 v3 v4 v5 v6 v7 v8 v9 v10 v11 v12 v13 v14 v15 v16 v17 v18 v19 v20 v21 v22 v23 v24 v25 v26"
  //val schemaResources = "v1 v2 v3"

  // 转化成结构化类型
  val Resources1 = StructType(schemaResources.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
  val Resources2 = StructType(schemaResources.split(" ").map(fieldName => StructField(fieldName, StringType, true)))


  val t2 = (System.currentTimeMillis() - beginTime)
  //将数据源进行重新匹配
  val rowRDD1 = people.map(p => splitString(p)).map(p => Row(p(0).trim, p(1).trim, p(2).trim, p(3).trim, p(4).trim, p(5).trim, p(6).trim, p(7).trim, p(8).trim, p(9).trim
    , p(10).trim, p(11).trim, p(12).trim, p(13).trim, p(14).trim, p(15).trim, p(16).trim, p(17).trim, p(18).trim, p(19).trim, p(20).trim, p(21).trim, p(22).trim, p(23).trim, p(24).trim, p(25).trim))
  val rowRDD2 = dog.map(p => splitString(p)).map(p => Row(p(0).trim, p(1).trim, p(2).trim, p(3).trim, p(4).trim, p(5).trim, p(6).trim, p(7).trim, p(8).trim, p(9).trim
    , p(10).trim, p(11).trim, p(12).trim, p(13).trim, p(14).trim, p(15).trim, p(16).trim, p(17).trim, p(18).trim, p(19).trim, p(20).trim, p(21).trim, p(22).trim, p(23).trim, p(24).trim, p(25).trim))

  val t3 = (System.currentTimeMillis() - beginTime)
  // 将模板和RDD进行数据结构化
  val peopleDataFrame = sqlContext.createDataFrame(rowRDD1, Resources1)
  val dogDataFrame = sqlContext.createDataFrame(rowRDD2, Resources2)
  val t4 = (System.currentTimeMillis() - beginTime)
  // 注册DataFrames为表。

  peopleDataFrame.registerTempTable("res1")
  dogDataFrame.registerTempTable("res2")
  import org.apache.spark.storage.StorageLevel
  //持久化
  peopleDataFrame.persist(StorageLevel.DISK_ONLY  )
  dogDataFrame.persist(StorageLevel.DISK_ONLY  )

  val t5 = (System.currentTimeMillis() - beginTime)
  // SQL语句可以通过使用由sqlContext提供的SQL方法运行。
  //使用126
  val resultsAB = sqlContext.sql("SELECT t1.v1,t1.v2,t1.v3 FROM res1 t1 except SELECT t2.v1,t2.v2,t2.v3 FROM res2 t2  ")
  val t8 = (System.currentTimeMillis() - beginTime)
  val resultsBA = sqlContext.sql("SELECT t2.v1,t2.v2,t2.v3 FROM res2 t2 except SELECT t1.v1,t1.v2,t1.v3 FROM res1 t1  ")

  val t6 = (System.currentTimeMillis() - beginTime)
  // SQL查询的结果是DataFrames支持所有的正常的RDD操作。
  //val sum = results

  //println("-----单边A未匹配的总数:"+resultsAB.count()+" ----------")
  //println("-----单边B未匹配的总数:"+resultsBA.count()+" ----------")
  //results.map(t => "v1: " + t(0)+" v2: " + t(1) + " v3:"+ t(2)).collect().foreach(println)
  val t7 = (System.currentTimeMillis() - beginTime)
  //入库

  resultsAB.collect().foreach(t => runSql(t(0).toString+"-AB",t(1).toString+"-AB",t(2).toString+"-AB"))

  resultsBA.collect().foreach(t => runSql(t(0).toString+"-BA",t(1).toString+"-BA",t(2).toString+"-BA"))
  val t9 = (System.currentTimeMillis() - beginTime)

  //按长度截取
  def splitString(str:String):Array[String] ={
    val loop ="19,45,65,92,111,118,123,133,143,153,158,165,172,179,184,191,197,200,206,223,226,232,266,271,276,282,289"
    val arrLoop = loop.split(",")
    val arrL = arrLoop.map(a => Integer.parseInt(a))
    val arr = new Array[String](26)

    var i:Int = -1
    for(a <- arrL if(i!=25)){
      i = i+1
      arr(i)  = str.substring(arrL(i),arrL(i+1))
    }
    arr(25) =str.substring(arrL(25))

    return arr
  }


  //执行sql语句
  def runSql(v1:String,v2:String,v3:String): Unit =

    statement.addBatch("insert into nidaye( v1, v2, v3) values('"+v1+"','"+v2+"','"+v3+"')")
    //println("SQL执行完成: "+resultSet)


  val resultSet = statement.executeBatch()
  connection.close()
  println("总耗时:"+(System.currentTimeMillis() - beginTime))
  println(t1+" "+t2+" "+t3+" "+t4+" "+t5+" "+t6+" "+t7+" "+t8+" "+t9)
}

 

//利用SPARKSQL 处理分隔符的文件
import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD

import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ IntegerType, StringType, StructField, StructType }

// 导入 Row.
import org.apache.spark.sql.Row;

// 导入 Spark SQL 数据类型
import org.apache.spark.sql.types.{ StructType, StructField, StringType };

object Handle extends App {
  val beginTime = System.currentTimeMillis()
  //引用spark
  val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
  val sc = new SparkContext(conf)

  //JDBC预处理
  val driver = "oracle.jdbc.driver.OracleDriver"
  val url = "jdbc:oracle:thin"
  val username = ""
  val password = ""
  Class.forName(driver)
  val connection = DriverManager.getConnection(url, username, password)
  val statement = connection.createStatement()

  //创建sparkSql上下文
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    //获取数据源
  val people = sc.textFile("i:/1/dc1g_1_temp.txt")
  val dog = sc.textFile("i:/1/dc1g_2_temp.txt")

  //定义schema
  val schemaResources = "v0 v1 v2 v3 v4 v5 v6 v7 v8 v9 v10 v11 v12 v13 v14 v15 v16 v17 v18 v19 v20 v21 v22 v23 v24 v25 v26"
  //val schemaResources = "v1 v2 v3"

  // 转化成结构化类型
  val Resources1 = StructType(schemaResources.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
  val Resources2 = StructType(schemaResources.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

  //将数据源进行重新匹配
  val rowRDD1 = people.map(_.split("\\|")).map(p => Row(p(0).trim, p(1).trim, p(2).trim, p(3).trim, p(4).trim, p(5).trim, p(6).trim, p(7).trim, p(8).trim, p(9).trim
    , p(10).trim, p(11).trim, p(12).trim, p(13).trim, p(14).trim, p(15).trim, p(16).trim, p(17).trim, p(18).trim, p(19).trim, p(20).trim, p(21).trim, p(22).trim, p(23).trim, p(24).trim, p(25).trim, p(26).trim))
  val rowRDD2 = dog.map(_.split("\\|")).map(p => Row(p(0).trim, p(1).trim, p(2).trim, p(3).trim, p(4).trim, p(5).trim, p(6).trim, p(7).trim, p(8).trim, p(9).trim
    , p(10).trim, p(11).trim, p(12).trim, p(13).trim, p(14).trim, p(15).trim, p(16).trim, p(17).trim, p(18).trim, p(19).trim, p(20).trim, p(21).trim, p(22).trim, p(23).trim, p(24).trim, p(25).trim, p(26).trim))

  // 将模板和RDD进行数据结构化
  val peopleDataFrame = sqlContext.createDataFrame(rowRDD1, Resources1)
  val dogDataFrame = sqlContext.createDataFrame(rowRDD2, Resources2)
  import org.apache.spark.storage.StorageLevel
  //持久化
  peopleDataFrame.persist(StorageLevel.DISK_ONLY  )
  dogDataFrame.persist(StorageLevel.DISK_ONLY  )

  //peopleDataFrame.filter(dogDataFrame)
  // 注册DataFrames为表。
  peopleDataFrame.registerTempTable("res1")
  dogDataFrame.registerTempTable("res2")




  // SQL语句可以通过使用由sqlContext提供的SQL方法运行。
  //使用126

  val resultsAB = sqlContext.sql("SELECT t1.v0,t1.v1,t1.v2,t1.v3 FROM res1 t1 left outer join res2 t2 on t1.v0=t2.v0 where t2.v1 is null ")
  val t8 = (System.currentTimeMillis() - beginTime)
  val resultsBA = sqlContext.sql("SELECT t2.v0,t2.v1,t2.v2,t2.v3 FROM res2 t2 left outer join res1 t1 on t1.v0=t2.v0 where t1.v1 is null ")

  // SQL查询的结果是DataFrames支持所有的正常的RDD操作。
  resultsAB.collect().foreach(t => runSql(t(1).toString+"-AB",t(2).toString+"-AB",t(3).toString+"-AB"))

  resultsBA.collect().foreach(t => runSql(t(1).toString+"-BA",t(2).toString+"-BA",t(3).toString+"-BA"))

  //执行sql语句
  def runSql(v1:String,v2:String,v3:String): Unit =

    statement.addBatch("insert into nidaye( v1, v2, v3) values('"+v1+"','"+v2+"','"+v3+"')")

  val resultSet = statement.executeBatch()
  connection.close()
  println(System.currentTimeMillis() - beginTime)
}

 

 

分享到:
评论

相关推荐

    SparkDemo.rar

    在"SparkDemo.rar"这个压缩包中,包含了三个关键领域的示例:Spark Count、Spark SQL以及Spark Streaming。接下来,我们将深入探讨这些技术的核心概念和应用。 首先,Spark Count是Spark中最基础的操作之一,用于...

    spark连接HIveDemo

    这允许Spark应用程序使用Hive的表和函数,同时利用Spark的高性能计算能力。 步骤一:环境配置 确保你已经安装了Apache Spark和Hive,并且它们的版本兼容。在配置Spark时,需要在`spark-defaults.conf`文件中指定...

    spark_学习demo

    在这个"spark_学习demo"中,我们将重点探讨Spark与Java的结合应用。 1. **Spark核心概念** - **RDD(弹性分布式数据集)**:Spark的核心数据抽象,是不可变、分区的记录集合,可以在集群中进行并行操作。 - **DAG...

    spark计数demo

    在“spark计数demo”中,我们将利用Spark的`JavaPairRDD`,它是一个包含键值对的RDD,用于进行MapReduce风格的操作。我们的目标是读取文本文件,将每个单词作为键,出现次数作为值,最后统计出每个单词的总数。 ...

    spark mllib 常用的自学demo

    根据提供的信息,我们可以总结出以下有关Spark MLlib的知识点,主要关注K-Means算法以及相关的推荐系统案例。 ...通过理解和掌握这些内容,可以帮助初学者更好地利用Spark MLlib进行机器学习实践。

    Spark快速大数据开发Demo

    收集的Spark快速大数据开发Demo,非常不错的学习资料。

    SparkDemo12

    【SparkDemo12】是一个基于Apache Spark的演示项目,它可能是为了展示Spark的核心功能和用法。Apache Spark是一个用于大规模数据处理的开源集群计算系统,以其高性能、易用性和广泛的功能而闻名。在这个Demo中,我们...

    sparkdemo_202108.7z

    本篇文章将围绕"Sparkdemo_202108.7z"这一压缩包中的内容,深入探讨Spark的相关知识点,包括Spark的基本架构、核心组件、数据处理模型以及实际应用。 一、Spark概述 Apache Spark是一个开源的集群计算系统,由加州...

    基于java api的spark常用算子demo+源代码+文档说明

    - 不懂运行,下载完可以私聊问...3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。 --------

    基于Scala的Spark RDD、Spark SQL、Spark Streaming相关Demo设计源码

    这是一个基于Scala语言开发的Spark RDD、Spark SQL、Spark Streaming相关Demo,包含35个文件。主要文件类型包括29个Scala源文件、2个Markdown文档、1个Reduced文件、1个XML文件、1个Java源文件和1个TXT文件。该项目...

    Spark各种demo学习

    4. 图像处理:使用Spark MLlib库进行图像分类和识别的Demo,展示机器学习在Spark中的应用。 5. 流处理:通过DStream进行实时数据处理,例如Twitter流分析,展示Spark Streaming的窗口和滑动窗口操作。 三、Spark...

    Spark DataFrame 演示Demo

    简单一个示例,演示Spark中DataFrame的创建与操作

    spark源码之scala基础语法demo

    scala是一种基于JVM的面向对象的函数编程,scala编程相对于java而言代码整洁、开发效率更高。 其中scala优点主要有: 1:面向对象,可以定义class,通过new调用实例...5:目前比较流行的kafka,spark均由scala开发。

    eSDK BigData V100R005C10 SparkDemo使用指南 01

    总之,这份使用指南旨在为Java背景的开发人员提供Spark应用开发的实践经验,特别是针对卡口碰撞分析场景,通过实例展示了如何利用Spark的API进行数据处理和分析。对于希望深入了解Spark并进行大数据应用开发的人来说...

    SparkDemo, spark示例代码,有一些生产实践.zip

    SparkDemo, spark示例代码,有一些生产实践 SparkDemo我们还提供了一些实际的案例,这些案例是对真实场景的抽象,展示了一个实际项目开发需要考虑的问题,这些案例经过了充分测试,读者完全可以在实际项目中参考。...

    spark-demo.7z

    例如,一个典型的Spark应用可能是从HDFS读取大量日志数据,使用Spark SQL进行复杂查询和过滤,然后通过Spark Streaming进行实时流处理,最后利用MLlib训练模型进行异常检测或用户行为预测。 总结来说,Spark以其...

    基于Spark ALS的离线推荐系统demo代码

    基于Spark ALS的离线推荐系统demo代码,欢迎大神们纠bug指教

    spark-demo:Spark演示

    Spark演示 使用来自现代艺术博物馆(MoMA)馆藏的数据(github ) momadm. (2015). collection: First release - ... docker run -i -t -p 8888:8888 freemanlab/sparkdemo 并将您的浏览器指向' '(Linux)或' OS X)

    SparkDemo学习样例

    【SparkDemo学习样例】是针对Apache Spark框架与Scala编程语言的一个实践教程,它提供了丰富的示例代码,帮助初学者和开发者深入理解如何在大数据处理中应用Spark和Scala。这个资源已被验证为高质量且非常实用。 ...

    阿里云emr spark kafka redis MongoDB例子demo

    在这个"阿里云EMR spark kafka redis MongoDB例子demo"中,我们看到了如何整合这些技术,构建一个实时数据处理系统,从Kafka获取数据,利用Spark Streaming进行实时分析,然后将结果存储到Redis和MongoDB,实现了...

Global site tag (gtag.js) - Google Analytics