`

Spark

 
阅读更多

 

一、Spark SQL定义:Spark的一个针对结构化数据操作的一个模块

  作用:
  1 用SQL或者DataFrame进行结构化数据处理在Spark的程序中
  2 可以处理任何数据源包括:Hive,Parquet,ORC,json, 和jdbc,甚至可以将这些数据进行join操作
  3 对于Hive中已经存在的数据进行查询和UDF的处理
  4 可以进行JDBC连接进行数据处理。 

 二、流程:

1 创建SparkConf
2 设置配置参数:conf.setAppName("appname").setMaster("local[*]");
3 创建JavaSparkContext
4 创建SQLContext 上下文环境 
5 通过SQLContext来读取结构化数据
6 进行DataFrame或者SQL操作来处理数据
7 stop和close打开的SQLContext
8 配置.setMaster("local[*]"),如果在集群中跑则去掉这个配置

 

 三、json数据读取 

    理解DataFrame中的数据结构:

    DataFrame中包含RDD和Schema,其中RDD是它的数据,Schema是数据的结构

    Schema中是StructType->StructFiled->字段名称,类型,是否为空字段

 

	/**
	 * 读取json数据
	 * @param sqlContext
	 */
	public void testShow(SQLContext sqlContext){
		//默认读取hdfs中的文件,如果读取本地文件需要添加file:\\
		DataFrame df = sqlContext.read().json("file:\\E:\\sparktestplace\\json.txt") ;
		//DataFrame df = sqlContext.read().parquet("/examples_testr/data/resources17/part-r-00000-a3ec949c-cc17-4db6-8e73-0e0ea673af53.gz.parquet") ;
		
		df.show();
		//打印schema
		df.printSchema();
		df.select("name").show();
		df.select(df.col("name"), df.col("age").plus(1)).show();
		
		StructType st = df.schema() ;
		StructField[] sfs = st.fields();
		//打印字段的名称 和字段的类型
		System.out.println(sfs[0].name() +" "+sfs[0].dataType().typeName() );
		System.out.println(sfs[1].name() +" "+sfs[1].dataType().typeName() );
		return ;
	}

 四、数据库读取数据

  

	public void testSQL(SQLContext sqlContext){
		String tableName = "test_mysql" ; 
		String tableName2 = "spark_test_table" ; 
 
		//连接数据的基本参数 
		HashMap<String, String> options = new HashMap<String, String>();
		options.put("url",   "jdbc:mysql://192.168.0.213:3306/data" );
		options.put("driver",  "com.mysql.jdbc.Driver");
		options.put("dbtable",tableName );
		options.put("user", "username" );
		options.put("password", "123456" );
		//读取数据库的数据
        DataFrame df = sqlContext.read().format("jdbc").options( options).load();
 
        //带有过滤条件
        //DataFrame df3 = sqlContext.read().jdbc("jdbc:mysql://192.168.0.213:3306/data",
       // 		tableName,new String[]{"sex='nan'"}, prop);
		//DataFrame df3 = sqlContext.sql( "SELECT * FROM "+tableName ) ; 
        //df3.show();
        //写数据
        df.write().mode(SaveMode.Overwrite).format("parquet").save("file:\\E:\\"+tableName2+".txt");
        
        JavaRDD<Row> rdd = df.javaRDD();
        printRdd( rdd );
	}

 五、打印RDD

 

	public static void printRdd(JavaRDD<Row> rdd ){
		if(rdd==null){
			System.out.println( " rdd is null ");
			return ;
		}
		 rdd.toDebugString();//打印rdd的血统数据
        Iterator<Row>  it=rdd.collect().iterator();
        while(it.hasNext()){
        	Row  row = it.next();
			printStructField(row.schema() );
			printRow(row);
        }
	}

 

 六、打印Row数据 

 

	public static void printRow(Row  row ){
		Integer rowSize = getRowSize(row.schema());
		for(int i=0;i<rowSize;i++){
			System.out.println( "the row of "+ i+"  value is "+row.get(i));
		}
	}	

 七、获得row的列数 

  

	public static Integer getRowSize( StructType  st  ){
		StructField[] sfs = st.fields() ; 
		return sfs.length;
	}	

 八、打印数据结构 

 

	public static void printStructField( StructType  st  ){
		StructField[] sfs = st.fields() ; 
		
		if( sfs==null || sfs.length==0){
			return  ;
		}
		for(StructField sf : sfs){
			System.out.println(" the struct of field "+ sf.name() + " ,type is  "+sf.dataType().typeName()  );
		}
	}

 

九  读取parque数据 

 

	public void parqueFile(SQLContext sqlContext){

        Properties prop = new Properties();
        prop.put("user", "username");
        prop.put("password", "pwd");
        
		String tableName = "spark_test_table" ; 
		sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") ; //parque中schema的兼容性更好
		
		//sqlContext.read().parquet("" )
		DataFrame  df = sqlContext.parquetFile("E:\\spark_test_table.txt") ;
		System.out.println("count=="+ df.count() );
		df.registerTempTable(tableName);
		sqlContext.sql("select * from "+tableName).show(); 
		
		df.write().mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.0.213:3306/data", tableName, prop);
		
		//sqlContext.table
	}

 

十、RDD 与 DataFrame之间的转换操作  

  

	public void rddToDataFrame(JavaSparkContext sc , SQLContext  sqlContext ){
		
		// Load a text file and convert each line to a JavaBean.
		JavaRDD<Person> people = sc.textFile("e://sparktestplace/people.txt").map(
		  new Function<String, Person>() {
		    public Person call(String line) throws Exception {
		      String[] parts = line.split(",");

		      Person person = new Person();
		      person.setName(parts[0]);
		      person.setAge(Integer.parseInt(parts[1].trim()));
		      System.out.println("============="+person );
		      return person;
		    }
		  });

		// Apply a schema to an RDD of JavaBeans and register it as a table.
		DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
		schemaPeople.registerTempTable("people");

		// SQL can be run over RDDs that have been registered as tables.
		DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") ; 

		teenagers.show(); 
		// The results of SQL queries are DataFrames and support all the normal RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
//		List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
//		  public String call(Row row) {
//		    return "Name: " + row.getString(0);
//		  }
//		}).collect();
//		for (int i = 0; i <teenagerNames.size(); i++) {
//			System.out.println( teenagerNames.get(i) );
//		}
	}

 

十一、自己定义数据结构

 

	public void defineSchema(JavaSparkContext sc , SQLContext  sqlContext ){

		// Load a text file and convert each line to a JavaBean.
		JavaRDD<String> people = sc.textFile("e://sparktestplace/people.txt");

		// The schema is encoded in a string
		String schemaString = "name age";

		// Generate the schema based on the string of schema
		List<StructField> fields = new ArrayList<StructField>();
		for (String fieldName: schemaString.split(" ")) {
			fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
		}
		StructType schema = DataTypes.createStructType(fields);

		// Convert records of the RDD (people) to Rows.
		JavaRDD<Row> rowRDD = people.map(
		  new Function<String, Row>() {
		    public Row call(String record) throws Exception {
		      String[] fields = record.split(",");
		      return RowFactory.create(fields[0], fields[1].trim());
		    }
		  });

		// Apply the schema to the RDD.
		DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

		// Register the DataFrame as a table.
		peopleDataFrame.registerTempTable("people");

		// SQL can be run over RDDs that have been registered as tables.
		DataFrame results = sqlContext.sql("SELECT name FROM people");

		// The results of SQL queries are DataFrames and support all the normal RDD operations.
		// The columns of a row in the result can be accessed by ordinal.
		List<String> names = results.javaRDD().map(new Function<Row, String>() {
		  public String call(Row row) {
		    return "Name: " + row.getString(0);
		  }
		}).collect();
	}

 十二、加载parque数据

	public void loadAndSave( SQLContext  sqlContext ){
		DataFrame df = sqlContext.read().load("E:\\spark_test_table.txt");
		//SQL进行数据处理
		df.registerTempTable("user");
		DataFrame df2 = sqlContext.sql("select * from user  where age=2");
		df2.show();  
		//写数据
		//df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
		//df.select("id", "username", "age").write().save("E:\\sparktestplace\\spark_test_table_write.parquet");
		df.show(); 
	}

 

十三、 自定义UDF使用 

 

	/**
	 * 自定义UDF使用
	 * format的各种格式:json, parquet, jdbc
	 * SaveMode.ErrorIfExists , SaveMode.Append, SaveMode.Overwrite, SaveMode.Ignore
	 * @param sqlContext
	 */
	public void  specifyingOptions( SQLContext  sqlContext ){

		DataFrame df = sqlContext.read().format("json").load("file:\\E:\\sparktestplace\\json.txt");
		//df.select("name", "email").write().mode(SaveMode.Overwrite).format("parquet").save("E:\\sparktestplace\\spark_test_table_write2.parquet");
		sqlContext.registerDataFrameAsTable(df, "user");
		sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);//自定义UDF使用
		sqlContext.udf().register("replace", new  UDF2<String,String,String>(){
			@Override
			public String call(String t1, String t2) throws Exception {
				return t1+"== "+t2;
			}
		}, DataTypes.StringType);
		
		DataFrame df2 = sqlContext.sql("select strLen(country),replace(country,'==') ,email,id,ip ,name  from user");
		
		
		df2.show(); 
		
		//DataFrame df2 = sqlContext.read().format("parquet").load("E:\\sparktestplace\\spark_test_table_write2.parquet");
		//df2.show(); 
	}

 十四、sql 直接读取parque文件数据 

  

	/**
	 * sql 直接读取parque文件数据 
	 * @param sqlContext
	 */
	public void  sqlReadFile( SQLContext  sqlContext ){
		DataFrame df = sqlContext.sql("SELECT * FROM parquet.`E:\\sparktestplace\\spark_test_table_write2.parquet`");
		df.show(); 
		df.printSchema(); 
	}

 

 

 

分享到:
评论

相关推荐

    大数据Spark纯净版安装包,用于快速集成Hive on Spark

    Spark纯净版安装包是一个用于安装Apache Spark的软件包,该软件包提供了Spark的基本功能和组件,但不包含任何额外的扩展或依赖项。纯净版安装包旨在提供一个轻量级、简单易用的Spark安装选项,适用于用户希望快速...

    spark安装包+spark实验安装软件

    Spark是Apache基金会下的一个开源大数据处理框架,以其高效、易用和可扩展性著称。Spark的核心设计理念是基于内存计算,极大地提升了数据处理速度。在本压缩包中,"spark-3.4.0-bin-without-hadoop"是Spark的一个预...

    springboot与spark整合开发, 练习spark api

    在现代大数据处理领域,Spark和Spring Boot的整合已经成为一种常见的技术组合。Spark作为一个快速、通用且可扩展的大数据处理框架,而Spring Boot则为构建微服务提供了简洁、高效的解决方案。本篇文章将深入探讨如何...

    SparkDemo.rar

    《Spark技术深度解析:从SparkCount到SparkSQL与SparkStreaming》 Spark,作为大数据处理领域的重要框架,以其高效、易用的特点受到了广泛的关注。在"SparkDemo.rar"这个压缩包中,包含了三个关键领域的示例:Spark...

    spark相关jar包

    Spark是Apache软件基金会下的一个开源大数据处理框架,以其高效、灵活和可扩展的特性而闻名。Spark的核心设计是基于内存计算,它极大地提高了数据处理的速度,使得数据科学家和开发人员能够在处理大规模数据集时获得...

    Spark官方中文文档

    Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点...

    spark笔记整理文档

    《Spark技术深度解析》 Spark,作为大数据处理领域的重要框架,以其高效、易用和弹性伸缩等特性,被广泛应用于大规模数据处理、实时计算、机器学习和图形处理等多个场景。本篇笔记将深入探讨Spark的核心概念、架构...

    spark-2.3.3.tgz

    Spark是Apache软件基金会下的一个开源大数据处理框架,以其高效、灵活和易用的特性而闻名。Spark 2.3.3是该框架的一个稳定版本,提供了丰富的数据处理功能,包括批处理、交互式查询(Spark SQL)、实时流处理(Spark...

    spark2 安装教程

    ### Spark2.0安装教程与Spark1.3共存配置详解 #### 一、引言 随着大数据技术的发展,Apache Spark 已成为处理大规模数据集的重要工具之一。然而,在实际应用过程中,不同的项目可能需要使用不同版本的 Spark 来...

    Spark和TiDB (Spark on TiDB)

    SparkTI (Spark on TiDB)是TiDB基于Apache Spark的独立于原生系统的计算引擎。它将Spark和TiDB深度集成,在原有MySQL Workload之外借助Spark支持了更多样的用户场景和API。这个项目在SparkSQL和Catalyst引擎之外实现...

    spark_jar包

    Spark_JAR包是Apache Spark项目的核心组件之一,它包含了运行Spark应用程序所必需的类库和依赖。Spark作为一个快速、通用且可扩展的数据处理框架,它为大数据处理提供了丰富的API,支持Scala、Java、Python和R等多种...

    spark的Ubuntu下的安装包

    spark-3.5.0-bin-hadoop3.tgz 是Apache Spark的一个特定版本,针对Hadoop 3.x版本进行了优化和构建。Apache Spark是一个强大的分布式计算系统,用于大数据处理和分析。它提供了高效的数据处理能力,支持多种编程语言...

    spark3.0入门到精通

    ├─Spark-day01 │ 01-[了解]-Spark发展历史和特点介绍.mp4 │ 03-[掌握]-Spark环境搭建-Standalone集群模式.mp4 │ 06-[理解]-Spark环境搭建-On-Yarn-两种模式.mp4 │ 07-[掌握]-Spark环境搭建-On-Yarn-两种...

    Spark 入门实战系列

    Spark 入门实战系列,适合初学者,文档包括十部分内容,质量很好,为了感谢文档作者,也为了帮助更多的人入门,传播作者的心血,特此友情转贴: 1.Spark及其生态圈简介.pdf 2.Spark编译与部署(上)--基础环境搭建....

    适配CDH6.3.2的Spark3.2.2

    Spark3.2.2是Apache Spark的一个重要版本,它提供了许多新特性和性能优化,而适配CDH6.3.2则意味着该版本的Spark已经经过了与Cloudera Data Hub (CDH) 6.3.2的兼容性验证。CDH是一个流行的Hadoop发行版,包含了...

    spark-2.4.0源码

    Spark是Apache软件基金会下的一个开源大数据处理框架,其2.4.0版本是对早期版本的扩展和优化,提供了更高效、稳定和丰富的功能。Spark的核心设计理念是快速、通用和可扩展的数据处理,它通过内存计算大幅度提升了...

    spark全套视频教程

    《Spark全套视频教程》是一份全面讲解Apache Spark的教育资源,旨在帮助学习者深入理解和掌握这一强大的大数据处理框架。Spark以其高效、易用和多模态处理能力,在大数据领域备受推崇,广泛应用于数据处理、机器学习...

    Flink和Spark比较

    Flink 和 Spark 比较 Flink 和 Spark 都是大数据处理领域中的重要框架,本文将对它们进行比较,分别介绍它们的核心实现、计算模型、硬件需求、数据源集成、性能对比等方面。 核心实现 Apache Spark 是基于 Scala ...

    Spark 编程指南简体中文版.pdf

    Spark 编程指南简体中文版 本资源为 Spark 编程指南简体中文版,涵盖了 Spark 的基本概念、数据处理、流处理、图形处理和 SQL 等方面的内容。以下是该资源中的知识点总结: Spark 基础 * Spark Shell:交互式 ...

    大数据Spark企业级实战

    《大数据Spark企业级实战》详细解析了企业级Spark开发所需的几乎所有技术内容,涵盖Spark的架构设计、Spark的集群搭建、Spark内核的解析、Spark SQL、MLLib、GraphX、Spark Streaming、Tachyon、SparkR、Spark多语言...

Global site tag (gtag.js) - Google Analytics