`
大涛学长
  • 浏览: 105010 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

最佳实践 | RDS & POLARDB归档到X-Pack Spark计算

阅读更多
 

 

X-Pack Spark服务通过外部计算资源的方式,为Redis、Cassandra、MongoDB、HBase、RDS存储服务提供复杂分析、流式处理及入库、机器学习的能力,从而更好的解决用户数据处理相关场景问题。

![be0c5c87da12145325d0fbbf2770c011e63bbb7a.png](https://ucc.alicdn.com/pic/developer-ecology/0e9c03366a584728af16c0e09721f631.png)

RDS & POLARDB分表归档到X-Pack Spark步骤
--------------------------------

**一键关联POLARDB到Spark集群**

一键关联主要是做好spark访问RDS & POLARDB的准备工作。 
![3da40523db7ba8a13ad447479c143ae90e1f8f4d.png](https://ucc.alicdn.com/pic/developer-ecology/58032de04d5b4c3da987c353c50b0339.png)

**POLARDB表存储**

在database ‘test1’中每5分钟生成一张表,这里假设为表 'test1'、'test2'、'test2'、... 
![f8b752d3dd8c53fb20c66e01c00fb201f6396c1a.png](https://ucc.alicdn.com/pic/developer-ecology/1dd7c5aad93d4392b224ccf20e936473.png)

具体的建表语句如下:

\*请左右滑动阅览

```
CREATE TABLE `test1` ( `a` int(11) NOT NULL,
                        `b` time DEFAULT NULL,         
               `c` double DEFAULT NULL,
                         PRIMARY KEY (`a`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
```

![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==)![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== "点击并拖拽以移动")

**归档到Spark的调试**

x-pack spark提供交互式查询模式支持直接在控制台提交sql、python脚本、scala code来调试。

1、首先创建一个交互式查询的session,在其中添加mysql-connector的jar包。

\*请左右滑动阅览

```
wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mysql-connector-java-5.1.34.jar
```

![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==)![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== "点击并拖拽以移动")

![6ecbde3466820df6fbee0285e9d11d96cbd995fb.png](https://ucc.alicdn.com/pic/developer-ecology/5a3d783f4fbc4d098f994939a8a874e6.png)

2、创建交互式查询

以pyspark为例,下面是具体归档demo的代码:

\*请左右滑动阅览

```
spark.sql("drop table sparktest").show()
# 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致
spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
      "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

#本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区
# CREATE TABLE `test1` (
#     `a` int(11) NOT NULL,
#                     `b` time DEFAULT NULL,
#                                      `c` double DEFAULT NULL,
#                                                         PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4):
    #构造polardb的表名
    dbtable = "test1." + "test" + str(num)
    #spark外表关联polardb对应的表
    externalPolarDBTableNow = spark.read \
        .format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \
        .option("dbtable", dbtable) \
        .option("user", "name") \
        .option("password", "xxx*") \
        .load().registerTempTable("polardbTableTemp")
    #生成本次polardb表数据要写入的spark表的分区信息
    (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))
    #执行导数据sql
    spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
          "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
    #删除临时的spark映射polardb表的catalog
    spark.catalog.dropTempView("polardbTableTemp")
    #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
    spark.sql("show partitions sparktest").show(1000, False)
    spark.sql("select count(*) from sparktest").show()
```

![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==)![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== "点击并拖拽以移动")

**归档作业上生产**

交互式查询定位为临时查询及调试,生产的作业还是建议使用spark作业的方式运行,使用文档参考。这里以pyspark作业为例: 
![9730abaad2803bfb6a3b7b080a26fe03b808e5ad.png](https://ucc.alicdn.com/pic/developer-ecology/ec0b4e9e8d954237adf47e7f468d6f6b.png)

/polardb/polardbArchiving.py 内容如下:

\*请左右滑动阅览

```
# -*- coding: UTF-8 -*-

from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PolardbArchiving") \
        .enableHiveSupport() \
        .getOrCreate()

    spark.sql("drop table sparktest").show()
    # 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致
    spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
          "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

    #本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区
    # CREATE TABLE `test1` (
    #     `a` int(11) NOT NULL,
    #      `b` time DEFAULT NULL,
    #      `c` double DEFAULT NULL,
    #       PRIMARY KEY (`a`)
    # ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    for num in range(1, 4):
        #构造polardb的表名
        dbtable = "test1." + "test" + str(num)
        #spark外表关联polardb对应的表
        externalPolarDBTableNow = spark.read \
            .format("jdbc") \
            .option("driver", "com.mysql.jdbc.Driver") \
            .option("url", "jdbc:mysql://pc-.mysql.polardb.rds.aliyuncs.com:3306") \
            .option("dbtable", dbtable) \
            .option("user", "ma,e") \
            .option("password", "xxx*") \
            .load().registerTempTable("polardbTableTemp")
        #生成本次polardb表数据要写入的spark表的分区信息
        (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))
        #执行导数据sql
        spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
              "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
        #删除临时的spark映射polardb表的catalog
        spark.catalog.dropTempView("polardbTableTemp")
        #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
        spark.sql("show partitions sparktest").show(1000, False)
        spark.sql("select count(*) from sparktest").show()
    spark.stop()
```

![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==)![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== "点击并拖拽以移动")
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics