一、Spark SQL定义:Spark的一个针对结构化数据操作的一个模块
作用: 1 用SQL或者DataFrame进行结构化数据处理在Spark的程序中 2 可以处理任何数据源包括:Hive,Parquet,ORC,json, 和jdbc,甚至可以将这些数据进行join操作 3 对于Hive中已经存在的数据进行查询和UDF的处理 4 可以进行JDBC连接进行数据处理。
二、流程:
1 创建SparkConf
2 设置配置参数:conf.setAppName("appname").setMaster("local[*]");
3 创建JavaSparkContext
4 创建SQLContext 上下文环境
5 通过SQLContext来读取结构化数据
6 进行DataFrame或者SQL操作来处理数据
7 stop和close打开的SQLContext
8 配置.setMaster("local[*]"),如果在集群中跑则去掉这个配置
三、json数据读取
理解DataFrame中的数据结构:
DataFrame中包含RDD和Schema,其中RDD是它的数据,Schema是数据的结构
Schema中是StructType->StructFiled->字段名称,类型,是否为空字段
/** * 读取json数据 * @param sqlContext */ public void testShow(SQLContext sqlContext){ //默认读取hdfs中的文件,如果读取本地文件需要添加file:\\ DataFrame df = sqlContext.read().json("file:\\E:\\sparktestplace\\json.txt") ; //DataFrame df = sqlContext.read().parquet("/examples_testr/data/resources17/part-r-00000-a3ec949c-cc17-4db6-8e73-0e0ea673af53.gz.parquet") ; df.show(); //打印schema df.printSchema(); df.select("name").show(); df.select(df.col("name"), df.col("age").plus(1)).show(); StructType st = df.schema() ; StructField[] sfs = st.fields(); //打印字段的名称 和字段的类型 System.out.println(sfs[0].name() +" "+sfs[0].dataType().typeName() ); System.out.println(sfs[1].name() +" "+sfs[1].dataType().typeName() ); return ; }
四、数据库读取数据
public void testSQL(SQLContext sqlContext){ String tableName = "test_mysql" ; String tableName2 = "spark_test_table" ; //连接数据的基本参数 HashMap<String, String> options = new HashMap<String, String>(); options.put("url", "jdbc:mysql://192.168.0.213:3306/data" ); options.put("driver", "com.mysql.jdbc.Driver"); options.put("dbtable",tableName ); options.put("user", "username" ); options.put("password", "123456" ); //读取数据库的数据 DataFrame df = sqlContext.read().format("jdbc").options( options).load(); //带有过滤条件 //DataFrame df3 = sqlContext.read().jdbc("jdbc:mysql://192.168.0.213:3306/data", // tableName,new String[]{"sex='nan'"}, prop); //DataFrame df3 = sqlContext.sql( "SELECT * FROM "+tableName ) ; //df3.show(); //写数据 df.write().mode(SaveMode.Overwrite).format("parquet").save("file:\\E:\\"+tableName2+".txt"); JavaRDD<Row> rdd = df.javaRDD(); printRdd( rdd ); }
五、打印RDD
public static void printRdd(JavaRDD<Row> rdd ){ if(rdd==null){ System.out.println( " rdd is null "); return ; } rdd.toDebugString();//打印rdd的血统数据 Iterator<Row> it=rdd.collect().iterator(); while(it.hasNext()){ Row row = it.next(); printStructField(row.schema() ); printRow(row); } }
六、打印Row数据
public static void printRow(Row row ){ Integer rowSize = getRowSize(row.schema()); for(int i=0;i<rowSize;i++){ System.out.println( "the row of "+ i+" value is "+row.get(i)); } }
七、获得row的列数
public static Integer getRowSize( StructType st ){ StructField[] sfs = st.fields() ; return sfs.length; }
八、打印数据结构
public static void printStructField( StructType st ){ StructField[] sfs = st.fields() ; if( sfs==null || sfs.length==0){ return ; } for(StructField sf : sfs){ System.out.println(" the struct of field "+ sf.name() + " ,type is "+sf.dataType().typeName() ); } }
九 读取parque数据
public void parqueFile(SQLContext sqlContext){ Properties prop = new Properties(); prop.put("user", "username"); prop.put("password", "pwd"); String tableName = "spark_test_table" ; sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") ; //parque中schema的兼容性更好 //sqlContext.read().parquet("" ) DataFrame df = sqlContext.parquetFile("E:\\spark_test_table.txt") ; System.out.println("count=="+ df.count() ); df.registerTempTable(tableName); sqlContext.sql("select * from "+tableName).show(); df.write().mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.0.213:3306/data", tableName, prop); //sqlContext.table }
十、RDD 与 DataFrame之间的转换操作
public void rddToDataFrame(JavaSparkContext sc , SQLContext sqlContext ){ // Load a text file and convert each line to a JavaBean. JavaRDD<Person> people = sc.textFile("e://sparktestplace/people.txt").map( new Function<String, Person>() { public Person call(String line) throws Exception { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); System.out.println("============="+person ); return person; } }); // Apply a schema to an RDD of JavaBeans and register it as a table. DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class); schemaPeople.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") ; teenagers.show(); // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. // List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { // public String call(Row row) { // return "Name: " + row.getString(0); // } // }).collect(); // for (int i = 0; i <teenagerNames.size(); i++) { // System.out.println( teenagerNames.get(i) ); // } }
十一、自己定义数据结构
public void defineSchema(JavaSparkContext sc , SQLContext sqlContext ){ // Load a text file and convert each line to a JavaBean. JavaRDD<String> people = sc.textFile("e://sparktestplace/people.txt"); // The schema is encoded in a string String schemaString = "name age"; // Generate the schema based on the string of schema List<StructField> fields = new ArrayList<StructField>(); for (String fieldName: schemaString.split(" ")) { fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true)); } StructType schema = DataTypes.createStructType(fields); // Convert records of the RDD (people) to Rows. JavaRDD<Row> rowRDD = people.map( new Function<String, Row>() { public Row call(String record) throws Exception { String[] fields = record.split(","); return RowFactory.create(fields[0], fields[1].trim()); } }); // Apply the schema to the RDD. DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema); // Register the DataFrame as a table. peopleDataFrame.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame results = sqlContext.sql("SELECT name FROM people"); // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> names = results.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); }
十二、加载parque数据
public void loadAndSave( SQLContext sqlContext ){ DataFrame df = sqlContext.read().load("E:\\spark_test_table.txt"); //SQL进行数据处理 df.registerTempTable("user"); DataFrame df2 = sqlContext.sql("select * from user where age=2"); df2.show(); //写数据 //df.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); //df.select("id", "username", "age").write().save("E:\\sparktestplace\\spark_test_table_write.parquet"); df.show(); }
十三、 自定义UDF使用
/** * 自定义UDF使用 * format的各种格式:json, parquet, jdbc * SaveMode.ErrorIfExists , SaveMode.Append, SaveMode.Overwrite, SaveMode.Ignore * @param sqlContext */ public void specifyingOptions( SQLContext sqlContext ){ DataFrame df = sqlContext.read().format("json").load("file:\\E:\\sparktestplace\\json.txt"); //df.select("name", "email").write().mode(SaveMode.Overwrite).format("parquet").save("E:\\sparktestplace\\spark_test_table_write2.parquet"); sqlContext.registerDataFrameAsTable(df, "user"); sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);//自定义UDF使用 sqlContext.udf().register("replace", new UDF2<String,String,String>(){ @Override public String call(String t1, String t2) throws Exception { return t1+"== "+t2; } }, DataTypes.StringType); DataFrame df2 = sqlContext.sql("select strLen(country),replace(country,'==') ,email,id,ip ,name from user"); df2.show(); //DataFrame df2 = sqlContext.read().format("parquet").load("E:\\sparktestplace\\spark_test_table_write2.parquet"); //df2.show(); }
十四、sql 直接读取parque文件数据
/** * sql 直接读取parque文件数据 * @param sqlContext */ public void sqlReadFile( SQLContext sqlContext ){ DataFrame df = sqlContext.sql("SELECT * FROM parquet.`E:\\sparktestplace\\spark_test_table_write2.parquet`"); df.show(); df.printSchema(); }
相关推荐
Spark纯净版安装包是一个用于安装Apache Spark的软件包,该软件包提供了Spark的基本功能和组件,但不包含任何额外的扩展或依赖项。纯净版安装包旨在提供一个轻量级、简单易用的Spark安装选项,适用于用户希望快速...
Spark是Apache基金会下的一个开源大数据处理框架,以其高效、易用和可扩展性著称。Spark的核心设计理念是基于内存计算,极大地提升了数据处理速度。在本压缩包中,"spark-3.4.0-bin-without-hadoop"是Spark的一个预...
Apache Spark是一个快速的分布式计算系统,它提供了一个高层次的API,支持Java、Scala、Python和R。Spark可以运行在Hadoop、Apache Mesos、Kubernetes、独立,或开发人员自己的机器上。它旨在扩展以支持各种数据处理...
在现代大数据处理领域,Spark和Spring Boot的整合已经成为一种常见的技术组合。Spark作为一个快速、通用且可扩展的大数据处理框架,而Spring Boot则为构建微服务提供了简洁、高效的解决方案。本篇文章将深入探讨如何...
【标题】: "openfire+spark+sparkweb的配置指南" 【内容】: 本文将详细介绍如何配置openfire、spark和sparkweb这三款组件,它们是构建即时通讯系统的常用工具。Openfire是一款开源的XMPP服务器,Spark是基于Java的...
《Spark技术深度解析:从SparkCount到SparkSQL与SparkStreaming》 Spark,作为大数据处理领域的重要框架,以其高效、易用的特点受到了广泛的关注。在"SparkDemo.rar"这个压缩包中,包含了三个关键领域的示例:Spark...