//利用cogroup 处理分隔符的文件 import org.apache.spark.{ SparkContext, SparkConf } import java.sql.DriverManager object HandleGroup extends App{ val beginTime = System.currentTimeMillis() //引用spark val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]") val sc = new SparkContext(conf) //获取文件 val txt1 = sc.textFile("i:/1/fg1.txt").map(line =>line.split( "\\|")) val txt2 = sc.textFile("i:/1/fg2.txt").map(line =>line.split( "\\|")) val results = txt1.map{v =>(v(0)+v(1)+v(5),v) }.cogroup( txt2.map{v =>(v(0)+v(1)+v(5),v)}).filter { case (k, (v1, v2)) => !(v1.nonEmpty&&v2.nonEmpty) } //JDBC预处理 val driver = "oracle.jdbc.driver.OracleDriver" val url = "jdbc:oracle:thin:@" val username = "" val password = "" Class.forName(driver) val connection = DriverManager.getConnection(url, username, password) val statement = connection.createStatement() //遍历结果集 results.foreach{case (k,(v1,v2)) => v1.foreach(a =>runSql(a(0),a(1),a(5))) v2.foreach(a => runSql(a(0),a(1),a(5))) } //执行sql语句 def runSql(v1:String,v2:String,v3:String): Unit = statement.addBatch("insert into nidaye( v1, v2, v3) values('"+v1+"','"+v2+"','"+v3+"')") //提交 val resultSet = statement.executeBatch() connection.close() println(System.currentTimeMillis() - beginTime) }
//利用SPARKSQL 处理定长分隔的文件 import java.sql.DriverManager import org.apache.spark.rdd.JdbcRDD import org.apache.spark.{ SparkContext, SparkConf } import org.apache.spark.sql.Row import org.apache.spark.sql.types.{ IntegerType, StringType, StructField, StructType } // 导入 Row. import org.apache.spark.sql.Row; // 导入 Spark SQL 数据类型 import org.apache.spark.sql.types.{ StructType, StructField, StringType }; object HandleLength extends App{ val beginTime = System.currentTimeMillis() //引用spark val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]") val sc = new SparkContext(conf) //JDBC预处理 val driver = "oracle.jdbc.driver.OracleDriver" val url = "jdbc:oracle:thin" val username = "" val password = "" Class.forName(driver) val connection = DriverManager.getConnection(url, username, password) val statement = connection.createStatement() //创建sparkSql上下文 val sqlContext = new org.apache.spark.sql.SQLContext(sc) //获取数据源 val people = sc.textFile("i:/1/dc1g_1_temp.txt") val dog = sc.textFile("i:/1/dc1g_2_temp.txt") val t1 = (System.currentTimeMillis() - beginTime) //定义schema val schemaResources = "v1 v2 v3 v4 v5 v6 v7 v8 v9 v10 v11 v12 v13 v14 v15 v16 v17 v18 v19 v20 v21 v22 v23 v24 v25 v26" //val schemaResources = "v1 v2 v3" // 转化成结构化类型 val Resources1 = StructType(schemaResources.split(" ").map(fieldName => StructField(fieldName, StringType, true))) val Resources2 = StructType(schemaResources.split(" ").map(fieldName => StructField(fieldName, StringType, true))) val t2 = (System.currentTimeMillis() - beginTime) //将数据源进行重新匹配 val rowRDD1 = people.map(p => splitString(p)).map(p => Row(p(0).trim, p(1).trim, p(2).trim, p(3).trim, p(4).trim, p(5).trim, p(6).trim, p(7).trim, p(8).trim, p(9).trim , p(10).trim, p(11).trim, p(12).trim, p(13).trim, p(14).trim, p(15).trim, p(16).trim, p(17).trim, p(18).trim, p(19).trim, p(20).trim, p(21).trim, p(22).trim, p(23).trim, p(24).trim, p(25).trim)) val rowRDD2 = dog.map(p => splitString(p)).map(p => Row(p(0).trim, p(1).trim, p(2).trim, p(3).trim, p(4).trim, p(5).trim, p(6).trim, p(7).trim, p(8).trim, p(9).trim , p(10).trim, p(11).trim, p(12).trim, p(13).trim, p(14).trim, p(15).trim, p(16).trim, p(17).trim, p(18).trim, p(19).trim, p(20).trim, p(21).trim, p(22).trim, p(23).trim, p(24).trim, p(25).trim)) val t3 = (System.currentTimeMillis() - beginTime) // 将模板和RDD进行数据结构化 val peopleDataFrame = sqlContext.createDataFrame(rowRDD1, Resources1) val dogDataFrame = sqlContext.createDataFrame(rowRDD2, Resources2) val t4 = (System.currentTimeMillis() - beginTime) // 注册DataFrames为表。 peopleDataFrame.registerTempTable("res1") dogDataFrame.registerTempTable("res2") import org.apache.spark.storage.StorageLevel //持久化 peopleDataFrame.persist(StorageLevel.DISK_ONLY ) dogDataFrame.persist(StorageLevel.DISK_ONLY ) val t5 = (System.currentTimeMillis() - beginTime) // SQL语句可以通过使用由sqlContext提供的SQL方法运行。 //使用126 val resultsAB = sqlContext.sql("SELECT t1.v1,t1.v2,t1.v3 FROM res1 t1 except SELECT t2.v1,t2.v2,t2.v3 FROM res2 t2 ") val t8 = (System.currentTimeMillis() - beginTime) val resultsBA = sqlContext.sql("SELECT t2.v1,t2.v2,t2.v3 FROM res2 t2 except SELECT t1.v1,t1.v2,t1.v3 FROM res1 t1 ") val t6 = (System.currentTimeMillis() - beginTime) // SQL查询的结果是DataFrames支持所有的正常的RDD操作。 //val sum = results //println("-----单边A未匹配的总数:"+resultsAB.count()+" ----------") //println("-----单边B未匹配的总数:"+resultsBA.count()+" ----------") //results.map(t => "v1: " + t(0)+" v2: " + t(1) + " v3:"+ t(2)).collect().foreach(println) val t7 = (System.currentTimeMillis() - beginTime) //入库 resultsAB.collect().foreach(t => runSql(t(0).toString+"-AB",t(1).toString+"-AB",t(2).toString+"-AB")) resultsBA.collect().foreach(t => runSql(t(0).toString+"-BA",t(1).toString+"-BA",t(2).toString+"-BA")) val t9 = (System.currentTimeMillis() - beginTime) //按长度截取 def splitString(str:String):Array[String] ={ val loop ="19,45,65,92,111,118,123,133,143,153,158,165,172,179,184,191,197,200,206,223,226,232,266,271,276,282,289" val arrLoop = loop.split(",") val arrL = arrLoop.map(a => Integer.parseInt(a)) val arr = new Array[String](26) var i:Int = -1 for(a <- arrL if(i!=25)){ i = i+1 arr(i) = str.substring(arrL(i),arrL(i+1)) } arr(25) =str.substring(arrL(25)) return arr } //执行sql语句 def runSql(v1:String,v2:String,v3:String): Unit = statement.addBatch("insert into nidaye( v1, v2, v3) values('"+v1+"','"+v2+"','"+v3+"')") //println("SQL执行完成: "+resultSet) val resultSet = statement.executeBatch() connection.close() println("总耗时:"+(System.currentTimeMillis() - beginTime)) println(t1+" "+t2+" "+t3+" "+t4+" "+t5+" "+t6+" "+t7+" "+t8+" "+t9) }
//利用SPARKSQL 处理分隔符的文件 import java.sql.DriverManager import org.apache.spark.rdd.JdbcRDD import org.apache.spark.{ SparkContext, SparkConf } import org.apache.spark.sql.Row import org.apache.spark.sql.types.{ IntegerType, StringType, StructField, StructType } // 导入 Row. import org.apache.spark.sql.Row; // 导入 Spark SQL 数据类型 import org.apache.spark.sql.types.{ StructType, StructField, StringType }; object Handle extends App { val beginTime = System.currentTimeMillis() //引用spark val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]") val sc = new SparkContext(conf) //JDBC预处理 val driver = "oracle.jdbc.driver.OracleDriver" val url = "jdbc:oracle:thin" val username = "" val password = "" Class.forName(driver) val connection = DriverManager.getConnection(url, username, password) val statement = connection.createStatement() //创建sparkSql上下文 val sqlContext = new org.apache.spark.sql.SQLContext(sc) //获取数据源 val people = sc.textFile("i:/1/dc1g_1_temp.txt") val dog = sc.textFile("i:/1/dc1g_2_temp.txt") //定义schema val schemaResources = "v0 v1 v2 v3 v4 v5 v6 v7 v8 v9 v10 v11 v12 v13 v14 v15 v16 v17 v18 v19 v20 v21 v22 v23 v24 v25 v26" //val schemaResources = "v1 v2 v3" // 转化成结构化类型 val Resources1 = StructType(schemaResources.split(" ").map(fieldName => StructField(fieldName, StringType, true))) val Resources2 = StructType(schemaResources.split(" ").map(fieldName => StructField(fieldName, StringType, true))) //将数据源进行重新匹配 val rowRDD1 = people.map(_.split("\\|")).map(p => Row(p(0).trim, p(1).trim, p(2).trim, p(3).trim, p(4).trim, p(5).trim, p(6).trim, p(7).trim, p(8).trim, p(9).trim , p(10).trim, p(11).trim, p(12).trim, p(13).trim, p(14).trim, p(15).trim, p(16).trim, p(17).trim, p(18).trim, p(19).trim, p(20).trim, p(21).trim, p(22).trim, p(23).trim, p(24).trim, p(25).trim, p(26).trim)) val rowRDD2 = dog.map(_.split("\\|")).map(p => Row(p(0).trim, p(1).trim, p(2).trim, p(3).trim, p(4).trim, p(5).trim, p(6).trim, p(7).trim, p(8).trim, p(9).trim , p(10).trim, p(11).trim, p(12).trim, p(13).trim, p(14).trim, p(15).trim, p(16).trim, p(17).trim, p(18).trim, p(19).trim, p(20).trim, p(21).trim, p(22).trim, p(23).trim, p(24).trim, p(25).trim, p(26).trim)) // 将模板和RDD进行数据结构化 val peopleDataFrame = sqlContext.createDataFrame(rowRDD1, Resources1) val dogDataFrame = sqlContext.createDataFrame(rowRDD2, Resources2) import org.apache.spark.storage.StorageLevel //持久化 peopleDataFrame.persist(StorageLevel.DISK_ONLY ) dogDataFrame.persist(StorageLevel.DISK_ONLY ) //peopleDataFrame.filter(dogDataFrame) // 注册DataFrames为表。 peopleDataFrame.registerTempTable("res1") dogDataFrame.registerTempTable("res2") // SQL语句可以通过使用由sqlContext提供的SQL方法运行。 //使用126 val resultsAB = sqlContext.sql("SELECT t1.v0,t1.v1,t1.v2,t1.v3 FROM res1 t1 left outer join res2 t2 on t1.v0=t2.v0 where t2.v1 is null ") val t8 = (System.currentTimeMillis() - beginTime) val resultsBA = sqlContext.sql("SELECT t2.v0,t2.v1,t2.v2,t2.v3 FROM res2 t2 left outer join res1 t1 on t1.v0=t2.v0 where t1.v1 is null ") // SQL查询的结果是DataFrames支持所有的正常的RDD操作。 resultsAB.collect().foreach(t => runSql(t(1).toString+"-AB",t(2).toString+"-AB",t(3).toString+"-AB")) resultsBA.collect().foreach(t => runSql(t(1).toString+"-BA",t(2).toString+"-BA",t(3).toString+"-BA")) //执行sql语句 def runSql(v1:String,v2:String,v3:String): Unit = statement.addBatch("insert into nidaye( v1, v2, v3) values('"+v1+"','"+v2+"','"+v3+"')") val resultSet = statement.executeBatch() connection.close() println(System.currentTimeMillis() - beginTime) }
相关推荐
在"SparkDemo.rar"这个压缩包中,包含了三个关键领域的示例:Spark Count、Spark SQL以及Spark Streaming。接下来,我们将深入探讨这些技术的核心概念和应用。 首先,Spark Count是Spark中最基础的操作之一,用于...
这允许Spark应用程序使用Hive的表和函数,同时利用Spark的高性能计算能力。 步骤一:环境配置 确保你已经安装了Apache Spark和Hive,并且它们的版本兼容。在配置Spark时,需要在`spark-defaults.conf`文件中指定...
在这个"spark_学习demo"中,我们将重点探讨Spark与Java的结合应用。 1. **Spark核心概念** - **RDD(弹性分布式数据集)**:Spark的核心数据抽象,是不可变、分区的记录集合,可以在集群中进行并行操作。 - **DAG...
在“spark计数demo”中,我们将利用Spark的`JavaPairRDD`,它是一个包含键值对的RDD,用于进行MapReduce风格的操作。我们的目标是读取文本文件,将每个单词作为键,出现次数作为值,最后统计出每个单词的总数。 ...
根据提供的信息,我们可以总结出以下有关Spark MLlib的知识点,主要关注K-Means算法以及相关的推荐系统案例。 ...通过理解和掌握这些内容,可以帮助初学者更好地利用Spark MLlib进行机器学习实践。
收集的Spark快速大数据开发Demo,非常不错的学习资料。
【SparkDemo12】是一个基于Apache Spark的演示项目,它可能是为了展示Spark的核心功能和用法。Apache Spark是一个用于大规模数据处理的开源集群计算系统,以其高性能、易用性和广泛的功能而闻名。在这个Demo中,我们...
本篇文章将围绕"Sparkdemo_202108.7z"这一压缩包中的内容,深入探讨Spark的相关知识点,包括Spark的基本架构、核心组件、数据处理模型以及实际应用。 一、Spark概述 Apache Spark是一个开源的集群计算系统,由加州...
- 不懂运行,下载完可以私聊问...3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。 --------
这是一个基于Scala语言开发的Spark RDD、Spark SQL、Spark Streaming相关Demo,包含35个文件。主要文件类型包括29个Scala源文件、2个Markdown文档、1个Reduced文件、1个XML文件、1个Java源文件和1个TXT文件。该项目...
4. 图像处理:使用Spark MLlib库进行图像分类和识别的Demo,展示机器学习在Spark中的应用。 5. 流处理:通过DStream进行实时数据处理,例如Twitter流分析,展示Spark Streaming的窗口和滑动窗口操作。 三、Spark...
简单一个示例,演示Spark中DataFrame的创建与操作
scala是一种基于JVM的面向对象的函数编程,scala编程相对于java而言代码整洁、开发效率更高。 其中scala优点主要有: 1:面向对象,可以定义class,通过new调用实例...5:目前比较流行的kafka,spark均由scala开发。
总之,这份使用指南旨在为Java背景的开发人员提供Spark应用开发的实践经验,特别是针对卡口碰撞分析场景,通过实例展示了如何利用Spark的API进行数据处理和分析。对于希望深入了解Spark并进行大数据应用开发的人来说...
SparkDemo, spark示例代码,有一些生产实践 SparkDemo我们还提供了一些实际的案例,这些案例是对真实场景的抽象,展示了一个实际项目开发需要考虑的问题,这些案例经过了充分测试,读者完全可以在实际项目中参考。...
例如,一个典型的Spark应用可能是从HDFS读取大量日志数据,使用Spark SQL进行复杂查询和过滤,然后通过Spark Streaming进行实时流处理,最后利用MLlib训练模型进行异常检测或用户行为预测。 总结来说,Spark以其...
基于Spark ALS的离线推荐系统demo代码,欢迎大神们纠bug指教
Spark演示 使用来自现代艺术博物馆(MoMA)馆藏的数据(github ) momadm. (2015). collection: First release - ... docker run -i -t -p 8888:8888 freemanlab/sparkdemo 并将您的浏览器指向' '(Linux)或' OS X)
【SparkDemo学习样例】是针对Apache Spark框架与Scala编程语言的一个实践教程,它提供了丰富的示例代码,帮助初学者和开发者深入理解如何在大数据处理中应用Spark和Scala。这个资源已被验证为高质量且非常实用。 ...
在这个"阿里云EMR spark kafka redis MongoDB例子demo"中,我们看到了如何整合这些技术,构建一个实时数据处理系统,从Kafka获取数据,利用Spark Streaming进行实时分析,然后将结果存储到Redis和MongoDB,实现了...