`

大数据学习0基础葵花宝典

阅读更多
BIGDATA葵花宝典

1、虚拟机安装CentOS
VMware workstation 12
CentOS 6.6
(百度下安装步骤,网上很多)
2、CentOS配置网络和安装桌面系统
2.1 网络配置
选择桥接模式
vi /etc/sysconfig/network-scripts/ifcfg-eth0
把ONBOOT=no这行的no修改为yes,reboot重启可联通网络
修改/etc/hosts,IP地址对应机器名,把之前的行注释调,加入:IP地址 机器名
service network restart //重启网络

安装vim软件:yum -y install vim

shutdown -h now //关闭系统

2.2 安装桌面系统
用Scala开发工具才操作
1)、使用命令 runlevel 查看当前的运行级别 ,
2)、使用命令 yum grouplist | more  查看是否安装了桌面环境的组件,
3)、当前运行级别是3,而且也没有安装桌面环境的软件
4)、yum groupinstall -y   "Desktop"   "Desktop Platform"   "Desktop Platform Development"  "Fonts"  "General Purpose Desktop"  "X Window System"  "Chinese Support [zh]" "Internet Browser"
     后面的是安装软件过程,需要等等一阵时间。
5)、编辑/etc/inittab文件,修改启级别为5,“id:3:initdefault:”修改为“id:5:initdefault:”
6)、然后重新启动就可以进入桌面环境
7)、重启的过程中,设置一下桌面环境的几个参数就可以正常进入登陆界面了

3、查看测试环境版本以及下载相对应版本
1) echo $JA[align=center][/align]VA_HOME:/usr/java/jdk1.7.0_75
下载路径:www.oracle.com -> 鼠标悬停“Downloads”上点击“Java for Developers”->拖到网页最下面Java Archive页,点击“Downloads”选择下载对应的版本即可
2) hadoop version:Hadoop 2.6.0-cdh5.8.3
下载路径:http://hadoop.apache.org ->左边点击“Download Hadoop”->点击“releases”->选择镜像站“mirror site”

4、配置SSH(免密码登录)
命令:ssh-keygen -t rsa  --(ll .ssh/ id_rsa为私钥,id_rsa.pub为公钥)
命令:cd .ssh/
命令:cat id_rsa.pub >> authorized_keys --生成一个权限文件
命令:chmod 644 authorized_keys --给一个644的权限
  验证下:ssh bigdata 第一次登陆需要数据YES  退出exit
ssh IP/HOSTNAME 连接命令
5、 安装JDK和HADOOP
5.1、安装和配置JDK
安装命令:rpm -ivh jdk-7u75-linux-x64.rpm,
安装好的路径一般在/usr目录下,命令:ll /usr/java/default/
环境变量:vi /etc/profile 修改该文件,最末尾加入:
export JAVA_HOME=/usr/java/jdk1.7.0_75
export PATH=$PATH:$JAVA_HOME/bin

          source /etc/profile 生效操作 (该操作不需要)
卸载:rpm -e packgename,rpm -e jdk1.8.0_101(文件安装的跟目录)
5.2、安装及配置Hadoop
安装:tar zxf hadoop-2.6.0.tar.gz  //解压到当前目录
配置环境变量:修改vi /etc/profile,在最后加上如下三行:
export JAVA_HOME=/usr/java/jdk1.7.0_75
export HADOOP_HOME=/opt/hadoop-2.6.0
export PATH=$PATH:$JAVA_HOME/bin: $HADOOP_HOME/bin

       配置文件路径:安装目录 /etc/Hadoop

   创建文件:
           /opt/hadoop-2.6.0/current/tmp
           /opt/hadoop-2.6.0/current/data
mkdir –p /opt/hadoop-2.6.0/current/dfs/name

       下面的配置文件路径为:/opt/hadoop-2.6.0/etc/hadoop
5.2.1、Core-site.xml
<property>
    <name>fs.default.name</name>
    <value>hdfs://bigdata:9000</value>
  </property>

<property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/hadoop-2.6.0/current/tmp</value>
  </property>
<property>
    <name>fs.trash.interval</name>
    <value>10</value>
  </property>

5.2.2、Hdfs-site.xml
<property>
   <name>dfs.namenode.name.dir</name>
   <value>/opt/hadoop-2.6.0/current/dfs/name</value>
</property>
<property>
   <name>dfs.datanode.data.dir</name>
   <value>/opt/hadoop-2.6.0/current/data</value>
</property>
<property>  --副本的数量
   <name>dfs.replication</name>
   <value>1</value>
</property>
<property> --是否启用WEB
   <name>dfs.webhdfs.enabled</name>
   <value>true</value>
</property>
<property> --设置用户组
   <name>dfs.permissions.superusergroup</name>
   <value>staff</value>
</property>
<property> --是否开启HDFS的权限
   <name>dfs.permissions.enabled</name>
   <value>false</value>
</property>
5.2.3、Yarn-site.xml
<property> 配置resourcemanager名称
   <name>yarn.resourcemanager.hostname</name>
   <value>bigdata</value>
</property>
<property>  配置nodemanager
   <name>yarn.nodemanager.aux-services</name>
   <value>mapreduce_shuffle</value>
</property>
<property> 配置nodemanager的mapreduce的类
   <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
   <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property> 配置resourcemanager地址的端口
   <name>yarn.resourcemanager.address</name>
   <value>bigdata:18040</value>
</property>
<property> 配置resourcemanager调度器的端口
   <name>yarn.resourcemanager.scheduler.address</name>
   <value>bigdata:18030</value>
</property>
<property> tracker的端口地址
   <name>yarn.resourcemanager.resource-tracker.address</name>
   <value>bigdata:18025</value>
</property>
<property>  admin的端口地址
   <name>yarn.resourcemanager.admin.address</name>
   <value>bigdata:18141</value>
</property>
<property> webapp端口地址
   <name>yarn.resourcemanager.webapp.address</name>
   <value>bigdata:18088</value>
</property>
<property> 配置日志 启用
   <name>yarn.log-aggregation-enable</name>
   <value>true</value>
</property>
<property> 日志聚合,是以秒为单位
   <name>yarn.log-aggregation.retain-seconds</name>
   <value>86400</value>
</property>
<property> 日志检查,多长时间检查一次,是以秒为单位
   <name>yarn.log-aggregation.retain-check-interval-seconds</name>
   <value>86400</value>
</property>
<property>
   <name>yarn.nodemanager.remote-app-log-dir</name>
   <value>/tmp/logs</value>
</property>
<property>
   <name>yarn.nodemanager.remote-app-log-dir-suffix</name>
   <value>logs</value>
</property>

5.2.4、Mapred-site.xml
复制文件:cp mapred-site.xml.template mapred-site.xml

<property> mapreduce框架
  <name>mapreduce.framework.name</name>
  <value>yarn</value>
</property>
<property> JOB端口地址
  <name>mapreduce.jobtracker.http.address</name>
  <value>bigdata:50030</value>
</property>
<property> JOB跑的历史记录地址
  <name>mapreduce.jobhisotry.address</name>
  <value>bigdata:10020</value>
</property>
<property>
  <name>mapreduce.jobhistory.webapp.address</name>
  <value>bigdata:19888</value>
</property>
<property> 已完成的日志目录
  <name>mapreduce.jobhistory.done-dir</name>
  <value>/jobhistory/done</value>
</property>
<property> 中间完成情况的日志目录
  <name>mapreduce.intermediate-done-dir</name>
  <value>/jobhisotry/done_intermediate</value>
</property>
<property>
  <name>mapreduce.job.ubertask.enable</name>
  <value>true</value>
</property>

5.2.5、Hadoop-env.sh
目录:/opt/hadoop-2.6.0/etc/hadoop
export JAVA_HOME=/usr/java/default/

5.2.6、Slaves
Bigdata  //计算机名称

5.2.7、格式化HDFS
hdfs namenode -format
5.2.8、启动Hadoop集群
/opt/hadoop-2.6.0/sbin/start-all.sh
5.2.9、验证Hadoop集群
1) jps
2) 通过页面端口来查看
查看防火墙是否关闭:chkconfig iptables --list
关闭防火墙:service iptables stop (临时)
    永久  vim /etc/selinux/config 把SELINUX=enforcing修改为SELINUX=disabled
          chkconfig iptables off
http://172.29.20.53:18088/  http://172.29.20.53:50070/

问题,启动hadoop有警告
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [bigdata]
解决过程:直接在log4j日志中去除告警信息。
在/opt /hadoop-2.6.0/etc/hadoop/log4j.properties文件中添加
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR

6、 HDFS的一些命令操作
技术相关文档:http://hadoop.apache.org/ ->左边点击Documentation->选择相关的版本
创建目录:hdfs dfs -mkdir /demo
上传文件:hdfs dfs -put demo.txt /demo/
查看目录:hdfs dfs -ls /demo
查看文件:hdfs dfs -cat /demo/demo.txt
删除文件:hdfs dfs -rm /demo/demo.txt
删除目录:hdfs dfs –rm –r /demo
下载文件:hdfs dfs -get /demo/demo.txt ./

7、 Spark安装、配置、验证
7.1、Scala安装和配置
解压:tar zxf scala-2.10.5.tgz
配置:vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.7.0_75
export SCALA_HOME=/opt/scala-2.10.5
export HADOOP_HOME=/opt/hadoop-2.6.0
export PATH=$PATH:$HADOOP_HOME/bin:$SCALA_HOME/bin:$JAVA_HOME/bin
7.2、Spark安装和配置
解压:tar zxf spark-1.6.0-bin-hadoop2.6
配置:1)vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.7.0_75
export SCALA_HOME=/opt/scala-2.10.5
export HADOOP_HOME=/opt/hadoop-2.6.0
export SPARK_HOME=/opt/spark-1.6.0-bin-hadoop2.6
export PATH=$PATH:$SCALA_HOME/bin:$JAVA_HOME/bin:$SPARK_HOME/bin
      2)spark-env.sh
        路径:/opt/spark-1.6.0-bin-hadoop2.6/conf
复制文件:cp spark-env.sh.template spark-env.sh
export SPARK_MASTER_IP=bigdata
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=2g
3)slaves
路径:/opt/spark-1.6.0-bin-hadoop2.6/conf
复制文件:cp slaves.template slaves
增加计算机名称或者是IP地址
7.3、Spark启动
先启动Hadoop
/opt/spark-1.6.0-bin-hadoop2.6/sbin/start-all.sh
用jps查看会多处两个进程:Master、Worker
页面地址和端口:http://172.29.20.53:8080/
7.4、Spark_shell启动和验证
启动:/opt/spark-1.6.0-bin-hadoop2.6/bin/spark-shell
验证,执行如下代码:
scala> val file=sc.textFile("hdfs://bigdata:9000/demo/demo.txt")
scala>val count=file.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_)
scala> count.collect
第一行是读取HDFS的demo.txt文件
第二行是对文件进行操作
第三行是提交并执行Job
   

  :help  --获取帮助
  :quit   --退出spark-shell
7.5、Spark四大天王简介
Spark Streaming:
Spark Streaming基于微批量方式的计算和处理,可以用于处理实时的流数据。它使用DStream,简单来说就是一个弹性分布式数据集(RDD)系列,处理实时数据。

Spark SQL:
Spark SQL可以通过JDBC API将Spark数据集暴露出去,而且还可以用传统的BI和可视化工具在Spark数据上执行类似SQL的查询。用户还可以用Spark SQL对不同格式的数据(如JSON,Parquet以及数据库等)执行ETL,将其转化,然后暴露给特定的查询。

Spark MLlib:
MLlib是一个可扩展的Spark机器学习库,由通用的学习算法和工具组成,包括二元分类、线性回归、聚类、协同过滤、梯度下降以及底层优化原语。用于机器学习和统计等场景

Spark GraphX:
GraphX是用于图计算和并行图计算的新的(alpha)Spark API。通过引入弹性分布式属性图(Resilient Distributed Property Graph),一种顶点和边都带有属性的有向多重图,扩展了Spark RDD。为了支持图计算,GraphX暴露了一个基础操作符集合(如subgraph,joinVertices和aggregateMessages)和一个经过优化的Pregel API变体。此外,GraphX还包括一个持续增长的用于简化图分析任务的图算法和构建器集合。
8、 基于IDEA构建SPARK开发环境
8.1 手动安装SBT
 下载地址:http://www.scala-sbt.org/
 sudo tar zxvf sbt-0.13.13.tgz (把解压后的文件放到/opt/sbt中)
 建立启动sbt的脚本文件
路径在/opt/sbt中新建一个文件sbt,命令vim sbt
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar /opt/sbt/bin/sbt-launch.jar "$@"

 修改sbt文件权限
命令为:chmod u+x sbt
 配置PATH环境变量,保证在控制台中可以使用sbt命令
vim ~/.bashrc  最后加入:export PATH=/opt/sbt/:$PATH  
source ~/.bashrc 生效
 测试sbt是否安装成功
第一次执行时,会下载一些文件包,然后才能正常使用,要确保联网了,安装成功后显示如下:
sbt sbt-version
[info] Set current project to sbt (in build file:/opt/sbt/)
[info] 0.13.13

8.2 安装IDEA和相关插件
浏览器:yum install firefox
安装中文输入法:yum install "@Chinese Support"
IntelliJ idea下载地址:https://www.jetbrains.com/idea/
把/opt/idea/bin配置到PATH环境变量中,vim ~./.bashrc
把插件解压放到/opt/idea/plugins目录下即可
启动命令:./idea.sh
IDEA工具打开后,需要安装SBT插件,打开idea的首选项,然后找到 Plugins ,点击 Browser repositores... 按钮,输入 sbt 搜索,然后找到 sbt 的插件进行安装,如下图所示:

8.3 创建SBT工程
创建SBT项目,依赖包下载完成后的目录结构如下图:

plugins.sbt 文件放置插件配置
build.sbt 是整体的项目配置信息
build.properties 可以设置 sbt 版本
java 目录存放 java 文件
scala 目录存放 scala 文件
resources 目录用来存放配置文件
test 相关目录用来存放测试相关文件

测试一下运行:


9、 Spark Streaming
学习:http://spark.apache.org/docs/1.6.0/streaming-programming-guide.html

安装netcat
netcat 在centos里叫:nc.x86_64,可以用:yum search nc找下。如果有执行下面命令: yum install nc.x86_64,安装完成后用nc –help验证是否可以用。
9.1 A Quick Example
   def main(args: Array[String]): Unit = {
    // 创建一个local StreamingContext,包含2个工作线程,并将批次间隔设为1秒
// master至少需要2个CPU核,以避免出现任务饿死的情况
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf,Seconds(1))

// 创建一个连接到hostname:port的DStream,如:localhost:9999
    val lines = ssc.socketTextStream("bigdata",9999)

// 将每一行分割成多个单词
    val words = lines.flatMap(_.split(" "))

// 对每一批次中的单词进行计数
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_+_)

// 将该DStream产生的RDD的头十个元素打印到控制台上
    wordCounts.print()

    ssc.start()             // 启动流式计算
    ssc.awaitTermination()  // 等待直到计算终止

  }



运行结果如下:


9.2 初始化StreamingContext
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
context对象创建后,你还需要如下步骤:
1、创建DStream对象,并定义好输入数据源。
2、基于数据源DStream定义好计算逻辑和输出。
3、调用streamingContext.start() 启动接收并处理数据。
4、调用streamingContext.awaitTermination() 等待流式处理结束(不管是手动结束,还是发生异常错误)
5、你可以主动调用 streamingContext.stop() 来手动停止处理流程。
9.3 离散数据流 (DStreams)
离散数据流(DStream)是Spark Streaming最基本的抽象。它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。

9.4 输入DStream和接收器
Spark Streaming主要提供两种内建的流式数据源:
基础数据源(Basic sources): 在StreamingContext API 中可直接使用的源,如:文件系统,套接字连接或者Akka actor。
 文件数据流(File Streams): 可以从任何兼容HDFS API(包括:HDFS、S3、NFS等)的文件系统,创建方式如下:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
另外,文件数据流不是基于接收器的,所以不需要为其单独分配一个CPU core。
 基于自定义Actor的数据流(Streams based on Custom Actors): DStream可以由Akka actor创建得到,只需调用 streamingContext.actorStream(actorProps, actor-name)。详见自定义接收器(Custom Receiver Guide)。actorStream暂时不支持Python API。
 RDD队列数据流(Queue of RDDs as a Stream): 如果需要测试Spark Streaming应用,你可以创建一个基于一批RDD的DStream对象,只需调用 streamingContext.queueStream(queueOfRDDs)。RDD会被一个个依次推入队列,而DStream则会依次以数据流形式处理这些RDD的数据。

高级数据源(Advanced sources): 需要依赖额外工具类的源,如:Kafka、Flume、Kinesis、Twitter等数据源。这些数据源都需要增加额外的依赖。
自定义数据源

9.5 接收器可靠性
从可靠性角度来划分,大致有两种数据源。其中,像Kafka、Flume这样的数据源,它们支持对所传输的数据进行确认。系统收到这类可靠数据源过来的数据,然后发出确认信息,这样就能够确保任何失败情况下,都不会丢数据。因此我们可以将接收器也相应地分为两类:
可靠接收器(Reliable Receiver) – 可靠接收器会在成功接收并保存好Spark数据副本后,向可靠数据源发送确认信息。
不可靠接收器(Unreliable Receiver) – 不可靠接收器不会发送任何确认信息。不过这种接收器常用语于不支持确认的数据源,或者不想引入数据确认的复杂性的数据源。
9.6 DStream支持的transformation算子
和RDD类似,DStream也支持从输入DStream经过各种transformation算子映射成新的DStream。DStream支持很多RDD上常见的transformation算子,一些常用的见下表:
Transformation算子 用途
map(func) 返回会一个新的DStream,并将源DStream中每个元素通过func映射为新的元素
flatMap(func) 和map类似,不过每个输入元素不再是映射为一个输出,而是映射为0到多个输出
filter(func) 返回一个新的DStream,并包含源DStream中被func选中(func返回true)的元素
repartition(numPartitions) 更改DStream的并行度(增加或减少分区数)
union(otherStream) 返回新的DStream,包含源DStream和otherDStream元素的并集
count() 返回一个包含单元素RDDs的DStream,其中每个元素是源DStream中各个RDD中的元素个数
reduce(func) 返回一个包含单元素RDDs的DStream,其中每个元素是通过源RDD中各个RDD的元素经func(func输入两个参数并返回一个同类型结果数据)聚合得到的结果。func必须满足结合律,以便支持并行计算。
countByValue() 如果源DStream包含的元素类型为K,那么该算子返回新的DStream包含元素为(K, Long)键值对,其中K为源DStream各个元素,而Long为该元素出现的次数。
reduceByKey(func, [numTasks]) 如果源DStream 包含的元素为 (K, V) 键值对,则该算子返回一个新的也包含(K, V)键值对的DStream,其中V是由func聚合得到的。注意:默认情况下,该算子使用Spark的默认并发任务数(本地模式为2,集群模式下由spark.default.parallelism 决定)。你可以通过可选参数numTasks来指定并发任务个数。
join(otherStream, [numTasks]) 如果源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中源DStream和otherDStream中每个K都对应一个 (K, (V, W))键值对元素。
cogroup(otherStream, [numTasks]) 如果源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中每个元素类型为包含(K, Seq[V], Seq[W])的tuple。
transform(func) 返回一个新的DStream,其包含的RDD为源RDD经过func操作后得到的结果。利用该算子可以对DStream施加任意的操作。
updateStateByKey(func) 返回一个包含新”状态”的DStream。源DStream中每个key及其对应的values会作为func的输入,而func可以用于对每个key的“状态”数据作任意的更新操作。

……不想往下看了,还有好多内容

10、 Spark SQL, DataFrames 以及 Datasets 编程指南
Spark SQL的一种用法是直接执行SQL查询语句,你可使用最基本的SQL语法,也可以选择HiveQL语法。Spark SQL可以从已有的Hive中读取数据。更详细的请参考Hive Tables 这一节。如果用其他编程语言运行SQL,Spark SQL将以DataFrame返回结果。你还可以通过命令行command-line 或者 JDBC/ODBC 使用Spark SQL。
DataFrame是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来说,她和关系型数据库的表 或者 R和Python中的data frame等价,只不过在底层,DataFrame采用了更多优化。DataFrame可以从很多数据源(sources)加载数据并构造得到,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。
Dataset是Spark-1.6新增的一种API,目前还是实验性的。Dataset想要把RDD的优势(强类型,可以使用lambda表达式函数)和Spark SQL的优化执行引擎的优势结合到一起。Dataset可以由JVM对象构建(constructed )得到,而后Dataset上可以使用各种transformation算子(map,flatMap,filter 等)。
10.1 入门
10.1.1 创建DataFrames和一些操作
Spark SQL所有的功能入口都是SQLContext 类,及其子类。不过要创建一个SQLContext对象,首先需要有一个SparkContext对象
val conf = new SparkConf().setMaster("local[2]").setAppName("JSON DATA")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val df = sqlContext.read.json("/opt/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")
df.show() //显示内容
df.printSchema() //打印数据树形结构
df.select("name").show()
df.filter(df("age")>21).show()
df.groupBy("age").count().show()

10.1.2 编程方式执行SQL查询
val df = sqlContext.sql("SELECT * FROM table")
10.1.3 创建Dataset
10.1.3.1 和RDD互操作-利用反射推导schema
Dataset API和RDD类似,不过Dataset不使用Java序列化或者Kryo,而是使用专用的编码器(Encoder )来序列化对象和跨网络传输通信。如果这个编码器和标准序列化都能把对象转字节,那么编码器就可以根据代码动态生成,并使用一种特殊数据格式,这种格式下的对象不需要反序列化回来,就能允许Spark进行操作,如过滤、排序、哈希等。
// 定义一个case class.
// 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制,
// 你可以使用自定义class,并实现Product接口。当然,你也可以改用编程方式定义schema
case class Person(name: String, age: Int)
上面创建的case class类放到函数外面。

val conf = new SparkConf().setMaster("local[2]").setAppName("JSON DATA")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// 为了支持RDD到DataFrame的隐式转换
import sqlContext.implicits._

val path = "/opt/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt"

// 创建一个包含Person对象的RDD,并将其注册成table
val people = sc.textFile(path).map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// sqlContext.sql方法可以直接执行SQL语句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// SQL查询的返回结果是一个DataFrame,且能够支持所有常见的RDD算子
// 查询结果中每行的字段可以按字段索引访问:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// 或者按字段名访问:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回结果: Map("name" -> "Justin", "age" -> 19)

10.1.3.2 和RDD互操作-编程方式定义Schema
如果不能事先通过case class定义schema(例如,记录的字段结构是保存在一个字符串,或者其他文本数据集中,需要先解析,又或者字段对不同用户有所不同),那么你可能需要按以下三个步骤,以编程方式的创建一个DataFrame:
 从已有的RDD创建一个包含Row对象的RDD
 用StructType创建一个schema,和步骤1中创建的RDD的结构相匹配
 把得到的schema应用于包含Row对象的RDD,调用这个方法来实现这一步:SQLContext.createDataFrame
val conf = new SparkConf().setMaster("local[2]").setAppName("Dataset")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val path = "/opt/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt"
// 创建一个RDD
val people = sc.textFile(path)

// 数据的schema被编码与一个字符串中
val schemaString = "name age"

// Import Row.
//    import org.apache.spark.sql.Row;

// Import Spark SQL 各个数据类型
//    import org.apache.spark.sql.types.{StructType,StructField,StringType};

// 基于前面的字符串生成schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 将RDD[people]的各个记录转换为Rows,即:得到一个包含Row对象的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// 将schema应用到包含Row对象的RDD上,得到一个DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// 将DataFrame注册为table
peopleDataFrame.registerTempTable("people")

// 执行SQL语句
val results = sqlContext.sql("SELECT name FROM people")

// SQL查询的结果是DataFrame,且能够支持所有常见的RDD算子
// 并且其字段可以以索引访问,也可以用字段名访问
results.map(t => "Name: " + t(0)).collect().foreach(println)

10.2 数据源
Spark SQL支持基于DataFrame操作一系列不同的数据源。DataFrame既可以当成一个普通RDD来操作,也可以将其注册成一个临时表来查询。把DataFrame注册为table之后,你就可以基于这个table执行SQL语句了。本节将描述加载和保存数据的一些通用方法,包含了不同的Spark数据源,然后深入介绍一下内建数据源可用选项。
10.2.1 通用加载/保存函数
在最简单的情况下,所有操作都会以默认类型数据源来加载数据(默认是Parquet,除非修改了spark.sql.sources.default 配置).
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

 手动指定选项
你也可以手动指定数据源,并设置一些额外的选项参数。数据源可由其全名指定(如,org.apache.spark.sql.parquet),而对于内建支持的数据源,可以使用简写名(json, parquet, jdbc)。任意类型数据源创建的DataFrame都可以用下面这种语法转成其他类型数据格式。
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

 直接对文件使用SQL
Spark SQL还支持直接对文件使用SQL查询,不需要用read方法把文件加载进来。
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

 保存模式
Save操作有一个可选参数SaveMode,用这个参数可以指定如何处理数据已经存在的情况。很重要的一点是,这些保存模式都没有加锁,所以其操作也不是原子性的。另外,如果使用Overwrite模式,实际操作是,先删除数据,再写新数据。
仅Scala/Java 所有支持的语言 含义
SaveMode.ErrorIfExists (default) "error" (default) (默认模式)从DataFrame向数据源保存数据时,如果数据已经存在,则抛异常。
SaveMode.Append "append" 如果数据或表已经存在,则将DataFrame的数据追加到已有数据的尾部。
SaveMode.Overwrite "overwrite" 如果数据或表已经存在,则用DataFrame数据覆盖之。
SaveMode.Ignore "ignore" 如果数据已经存在,那就放弃保存DataFrame数据。这和SQL里CREATE TABLE IF NOT EXISTS有点类似。

 保存到持久化表
在使用HiveContext的时候,DataFrame可以用saveAsTable方法,将数据保存成持久化的表。与registerTempTable不同,saveAsTable会将DataFrame的实际数据内容保存下来,并且在HiveMetastore中创建一个游标指针。持久化的表会一直保留,即使Spark程序重启也没有影响,只要你连接到同一个metastore就可以读取其数据。读取持久化表时,只需要用用表名作为参数,调用SQLContext.table方法即可得到对应DataFrame。
默认情况下,saveAsTable会创建一个”managed table“,也就是说这个表数据的位置是由metastore控制的。同样,如果删除表,其数据也会同步删除。
10.2.2 Parquet文件
Parquet 是一种流行的列式存储格式。Spark SQL提供对Parquet文件的读写支持,而且Parquet文件能够自动保存原始数据的schema。写Parquet文件的时候,所有的字段都会自动转成nullable,以便向后兼容。
 编程方式加载数据
代码省略
10.2.3 JSON数据集
Spark SQL在加载JSON数据的时候,可以自动推导其schema并返回DataFrame。用SQLContext.read.json读取一个包含String的RDD或者JSON文件,即可实现这一转换。
注意,通常所说的json文件只是包含一些json数据的文件,而不是我们所需要的JSON格式文件。JSON格式文件必须每一行是一个独立、完整的的JSON对象。因此,一个常规的多行json文件经常会加载失败。
    val conf = new SparkConf().setMaster("local[2]").setAppName("sparkSQLJSON")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // 数据集是由路径指定的
    // 路径既可以是单个文件,也可以还是存储文本文件的目录
    val path = "/opt/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json"
    val people = sqlContext.read.json(path)

    // 推导出来的schema,可由printSchema打印出来
    people.printSchema()
    // root
    //  |-- age: integer (nullable = true)
    //  |-- name: string (nullable = true)

    // 将DataFrame注册为table
    people.registerTempTable("people")

    // 跑SQL语句吧!
    val teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age >= 13 AND age <= 19")

    teenagers.map(t => "Name: " + t(0) + ", Age:" + t(1)).collect().foreach(println)

    teenagers.map(t => "Name: " + t.getAs[String]("name") + ", Age:" + t.getAs[Int]("age")).collect().foreach(println)

    // 另一种方法是,用一个包含JSON字符串的RDD来创建DataFrame
    val anotherPeopleRDD = sc.parallelize(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

SQL版的应用
CREATE TEMPORARY TABLE jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path "examples/src/main/resources/people.json"
)

SELECT * FROM jsonTable

10.2.4 Hive表
Spark SQL支持从Apache Hive读写数据。然而,Hive依赖项太多,所以没有把Hive包含在默认的Spark发布包里。要支持Hive,需要在编译spark的时候增加-Phive和-Phive-thriftserver标志。这样编译打包的时候将会把Hive也包含进来。注意,hive的jar包也必须出现在所有的worker节点上,访问Hive数据时候会用到(如:使用hive的序列化和反序列化SerDes时)。

Hive配置在conf/目录下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配置)文件中。请注意,如果在YARN cluster(yarn-cluster mode)模式下执行一个查询的话,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必须在驱动器(driver)和所有执行器(executor)都可用。一种简便的方法是,通过spark-submit命令的–jars和–file选项来提交这些文件。
代码省略
10.2.5 用JDBC连接其他数据库
Spark SQL也可以用JDBC访问其他数据库。这一功能应该优先于使用JdbcRDD。因为它返回一个DataFrame,而DataFrame在Spark SQL中操作更简单,且更容易和来自其他数据源的数据进行交互关联。JDBC数据源在java和python中用起来也很简单,不需要用户提供额外的ClassTag。(注意,这与Spark SQL JDBC server不同,Spark SQL JDBC server允许其他应用执行Spark SQL查询)

首先,你需要在spark classpath中包含对应数据库的JDBC driver,下面这行包括了用于访问postgres的数据库driver
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
远程数据库的表可以通过Data Sources API,用DataFrame或者SparkSQL 临时表来装载。以下是选项列表:
属性名 含义
url 需要连接的JDBC URL
dbtable 需要读取的JDBC表。注意,任何可以填在SQL的where子句中的东西,都可以填在这里。(既可以填完整的表名,也可填括号括起来的子查询语句)
driver JDBC driver的类名。这个类必须在master和worker节点上都可用,这样各个节点才能将driver注册到JDBC的子系统中。
partitionColumn, lowerBound, upperBound, numPartitions 这几个选项,如果指定其中一个,则必须全部指定。他们描述了多个worker如何并行的读入数据,并将表分区。partitionColumn必须是所查询的表中的一个数值字段。注意,lowerBound和upperBound只是用于决定分区跨度的,而不是过滤表中的行。因此,表中所有的行都会被分区然后返回。
fetchSize JDBC fetch size,决定每次获取多少行数据。在JDBC驱动上设成较小的值有利于性能优化(如,Oracle上设为10)

Spark SQL连接MYSQL查询的例子。
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://172.29.20.35:3306/xedk"
val user = "xedk"
val pwd = "admin"

val conf = new SparkConf().setMaster("local[2]").setAppName("sparkSQL connMySQL")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

//注意:集群上运行时,一定要添加这句话,否则会报找不到mysql驱动的错误
val prop = new Properties()
prop.put("driver", "com.mysql.jdbc.Driver")

val jdbcMYSQL = sqlContext.read.format("jdbc").options(
  Map("url" -> url,
      "user" -> user,
      "password" -> pwd,
      "driver" -> driver,
      "dbtable" -> "cr_customer")
).load()

jdbcMYSQL.registerTempTable("cr_customer")
//字段区分大小写
sqlContext.sql("select CIFNO,CIFNAME from cr_customer").collect().take(10).foreach(println)

10.2.6 疑难解答
JDBC driver class必须在所有client session或者executor上,对java的原生classloader可见。这是因为Java的DriverManager在打开一个连接之前,会做安全检查,并忽略所有对原声classloader不可见的driver。最简单的一种方法,就是在所有worker节点上修改compute_classpath.sh,并包含你所需的driver jar包。
一些数据库,如H2,会把所有的名字转大写。对于这些数据库,在Spark SQL中必须也使用大写。
10.3 性能调优
对于有一定计算量的Spark作业来说,可能的性能改进的方式,不是把数据缓存在内存里,就是调整一些开销较大的选项参数。
10.3.1 内存缓存
Spark SQL可以通过调用SQLContext.cacheTable(“tableName”)或者DataFrame.cache()把tables以列存储格式缓存到内存中。随后,Spark SQL将会扫描必要的列,并自动调整压缩比例,以减少内存占用和GC压力。你也可以用SQLContext.uncacheTable(“tableName”)来删除内存中的table。
你还可以使用SQLContext.setConf 或在SQL语句中运行SET key=value命令,来配置内存中的缓存。
属性名 默认值 含义
spark.sql.inMemoryColumnarStorage.compressed TRUE 如果设置为true,Spark SQL将会根据数据统计信息,自动为每一列选择单独的压缩编码方式。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式缓存批量的大小。增大批量大小可以提高内存利用率和压缩率,但同时也会带来OOM(Out Of Memory)的风险。

10.3.2 其他配置选项
以下选项同样也可以用来给查询任务调性能。不过这些选项在未来可能被放弃,因为spark将支持越来越多的自动优化。
10.4 分布式SQL引擎
10.4.1 运行Thrift JDBC/ODBC server
10.4.2 使用Spark SQL命令行工具

分享到:
评论

相关推荐

    开发类葵花宝典

    《开发类葵花宝典》是一本集合了.NET和Java两大主流开发平台的精华知识的综合指南。作为开发者,无论是初学者还是资深工程师,都可从中受益匪浅。本宝典旨在提供全面、深入的编程理论与实践技巧,帮助读者在软件开发...

    程序员面试葵花宝典

    《程序员面试葵花宝典》是一本专门为编程新手和应届毕业生准备的面试指南,它集成了众多编程基础知识、面试技巧和实践经验,旨在帮助初入职场的程序员们在求职过程中更好地展示自己的技能和潜力。这份“葵花宝典”...

    葵花宝典-数据库类

    这份名为“葵花宝典-数据库类”的文档,很可能是对以上知识点的详细讲解和实践指导,对于Java开发者来说,是一份不可多得的学习资源。通过深入学习和实践,可以提升在数据库领域的专业技能,为开发更高效、可靠的...

    系统集成项目管理工程师考试葵花宝典

    通过《系统集成项目管理工程师考试葵花宝典》的深入学习和金色题库的实战演练,考生可以对上述知识点有更深入的理解,从而在实际考试中游刃有余。这份资料的使用,不仅有助于考生熟悉考试题型,还能提升其在项目管理...

    国内第一部hadoop面试葵花宝典

    这部《国内第一部Hadoop面试葵花宝典》不仅涵盖了Hadoop的基础知识,还深入到面试中可能遇到的技术点,对于准备Hadoop相关职位的求职者来说,是一份极具价值的学习资料。通过深入研读,可以提升对Hadoop的理解,增加...

    HCIE-R&S 面试理论之葵花宝典-答案版v2.0(修正版).pdf

    随着技术的发展,ICT行业不仅在传统的网络通信领域有着深入的发展,同时在云计算、存储、大数据等新兴领域也展现出迅猛的发展势头。因此,投身于ICT行业,只要努力掌握相关技术,将会为个人的未来发展打下坚实的基础...

    HCIE-RS 面试葵花宝典V2.0

    ICT行业包括云计算、存储、大数据等多个新兴方向,掌握核心技术将为未来的职业发展打下坚实基础。 HCIE认证因其高度专业性和含金量受到重视,但不应只是为了获取证书而学习。"Paper IE"是指只通过考试而缺乏实际...

    HCIE-RS面试理论之葵花宝典V3.0

    他鼓励学员不仅要看到ICT行业的广阔前景,如云计算、存储和大数据等新兴领域,还要努力学习技术,为未来的职业发展打下坚实基础。 其次,作者指出,不应仅为了证书而学习,而是要重视学习过程本身。过去的“paper ...

    华为HCIE-RS面试理论之葵花宝典V3.0

    这份葵花宝典主要针对想要考取HCIE-R&S认证的学员,提供了学习方法和备考心态的建议。以下为详细的知识点解析: 1. 明确目标,避免迷失方向 在备考HCIE-R&S的过程中,首要任务是明确自己的学习目标。作者强调,一个...

    超人学院Hadoop面试葵花宝典

    超人学院所发布的《Hadoop面试葵花宝典》不仅是一本面试题集,更是Hadoop学习者和求职者的必备宝典。本书不仅提供了大量实战题,还对知识点进行了深入解析,尤其适合那些希望在大数据领域进一步发展的人才。 1. ...

    2019年信息系统项目管理师考试葵花宝典之历年真题分类详细解析【最新带书签】1

    【信息系统项目管理师考试葵花宝典】是针对2019年度信息系统项目管理师资格认证考试的一份重要参考资料,其内容涵盖了历年真题的详细分类解析,旨在帮助考生全面理解和掌握考试的重点与难点。这份资料的特点是带有...

    华为HCIE-RS 数通3.0面试宝典.pdf

    ICT行业正快速发展,其中数通(数据通信)是其基础,也是推动其他方向如云计算、存储和大数据等领域发展的重要力量。选择HCIE-R&S认证,意味着选择了网络领域的深入学习,通过认证的专家往往具有深厚的技术积累和...

    一些企业招聘程序员的面试题

    这份资源可以被视为程序员的“JAVA葵花宝典”,暗示它包含了许多Java编程语言相关的面试知识点,同时也可能涵盖其他编程语言和技术领域。 首先,Java作为最广泛使用的编程语言之一,其面试题通常会围绕以下几个核心...

Global site tag (gtag.js) - Google Analytics