sillycat
Data Solution 2019(6)MySQL Data Source

 

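Make sure the MySQL server accepts connections from the client host (142.xxx.xxx.xxx) by granting it privileges:
> grant all privileges on database.* to root@'142.xxx.xxx.xxx' identified by 'xxxxxx';
> flush privileges;
This GRANT ... IDENTIFIED BY form works on MySQL 5.x. MySQL 8.0 rejects IDENTIFIED BY inside GRANT, so there the user has to be created first with CREATE USER.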
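In my Zeppelin notebook, I load the MySQL JDBC driver with the %spark.dep interpreter. This paragraph has to run before the Spark interpreter starts; if Spark is already running, restart the interpreter first.
%spark.dep
z.load("mysql:mysql-connector-java:5.1.47")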
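Before reading the table, it can be worth confirming that the grant above actually allows a login from the client host. Below is a minimal sketch using only `java.sql.DriverManager`; the `ConnectionCheck` object, its `check` method and the timeout value are illustrative, and it assumes the MySQL driver loaded above is visible on the interpreter classpath.

```scala
import java.sql.{DriverManager, SQLException}

// Hypothetical helper (not part of Spark or Zeppelin): tries to log in over JDBC.
object ConnectionCheck {
  // Right(()) if a connection opens and validates; Left(error message) otherwise.
  def check(url: String, user: String, password: String, timeoutSeconds: Int = 5): Either[String, Unit] = {
    // JVM-wide DriverManager setting; individual drivers may or may not honour it.
    DriverManager.setLoginTimeout(timeoutSeconds)
    try {
      val conn = DriverManager.getConnection(url, user, password)
      try {
        // isValid asks the driver to confirm the connection is alive.
        if (conn.isValid(timeoutSeconds)) Right(())
        else Left("connection opened but failed validation")
      } finally conn.close()
    } catch {
      // Covers "No suitable driver", refused connections and "Access denied" errors.
      case e: SQLException => Left(String.valueOf(e.getMessage))
    }
  }
}
```

Called as `ConnectionCheck.check("jdbc:mysql://45.55.xx.xx:3306/sillycat_services", "root", "xxxxxx")` with the real host and password, a `Left` carrying an "Access denied" message usually means the GRANT does not cover the client IP, while a communications error points at networking or firewall rules.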
Connect to the database and load the table as a DataFrame
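// Read the MySQL table over JDBC into a DataFrame (sqlContext is provided by Zeppelin's Spark interpreter)
val homeAdvisorCompanysRawDF = sqlContext.read
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://45.55.xx.xx:3306/sillycat_services")
  .option("user", "root")
  .option("password", "xxxxxx")
  .option("dbtable", "copy_home_companys")
  .load()
homeAdvisorCompanysRawDF.printSchema()
// Expose the DataFrame to %sql paragraphs; on Spark 2.x+ createOrReplaceTempView is the non-deprecated equivalent
homeAdvisorCompanysRawDF.registerTempTable("homeadvisorcompanys")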
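When the same connection settings are needed in several paragraphs, collecting them in one place avoids repeating the URL and password. This is a sketch with a hypothetical `JdbcSettings` object (not a Spark API); its `Map[String, String]` output can be handed to Spark's `DataFrameReader.options`, which accepts the same keys as the individual `.option` calls.

```scala
// Hypothetical helper for building the JDBC reader settings used above.
object JdbcSettings {
  // Assembles a MySQL JDBC URL such as jdbc:mysql://host:3306/db.
  def mysqlUrl(host: String, port: Int, database: String): String =
    s"jdbc:mysql://$host:$port/$database"

  // Same keys as the .option(...) calls in the reader above.
  def readerOptions(url: String, user: String, password: String, table: String): Map[String, String] =
    Map(
      "driver"   -> "com.mysql.jdbc.Driver",
      "url"      -> url,
      "user"     -> user,
      "password" -> password,
      "dbtable"  -> table
    )
}
```

For example: `sqlContext.read.format("jdbc").options(JdbcSettings.readerOptions(JdbcSettings.mysqlUrl("45.55.xx.xx", 3306, "sillycat_services"), "root", "xxxxxx", "copy_home_companys")).load()`. Spark also accepts a parenthesized subquery with an alias as the `dbtable` value, e.g. `"(select * from copy_home_companys where phone is not null) as t"`.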

Use a UDF with one parameter
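import org.apache.spark.sql.functions.udf

// Score a phone number: 20 if it looks like a US number (optional "1-" prefix, 10 digits, optional dashes), 10 otherwise
val checkPhone: String => Int = (phone: String) => {
    val regexStr = "^(1\\-)?[0-9]{3}\\-?[0-9]{3}\\-?[0-9]{4}$"
    // NULL column values reach the UDF as null Strings, so guard before calling matches
    if (phone != null && phone.matches(regexStr)) {
        20
    } else {
        10
    }
}
// Wrap the Scala function as a Spark UDF and add its result as a new column
val checkPhoneColumn = udf(checkPhone)
val phoneDF = homeAdvisorCompanysRawDF.withColumn("phoneScore", checkPhoneColumn(homeAdvisorCompanysRawDF("phone")))
phoneDF.select("phone", "phoneScore").show(2)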
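Keeping the scoring logic in a plain Scala object makes it easy to try against sample inputs before wrapping it as a UDF. A minimal sketch; the `PhoneScore` name is hypothetical, the pattern is the one from `checkPhone`, and it is precompiled once because `String.matches` compiles the regex again on every call.

```scala
import java.util.regex.Pattern

// Hypothetical plain-Scala version of checkPhone.
object PhoneScore {
  // Compiled once and reused for every row.
  private val PhonePattern: Pattern =
    Pattern.compile("^(1\\-)?[0-9]{3}\\-?[0-9]{3}\\-?[0-9]{4}$")

  // 20 for a well-formed number, 10 for anything else, including null.
  def score(phone: String): Int =
    if (phone != null && PhonePattern.matcher(phone).matches()) 20 else 10
}
```

In the notebook it can then be registered with `udf(PhoneScore.score _)`. With this pattern, `512-555-1234`, `5125551234` and `1-512-555-1234` score 20, while `(512) 555-1234`, `15125551234` and null score 10.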
Use a UDF with multiple parameters
// Score an address: 20 if both location and postal_code are non-empty, 10 otherwise.
// street_address, address_locality and address_region are passed in but not checked.
val checkAddress = (location: String, street_address: String, address_locality: String, address_region: String, postal_code: String) => {
    if (location != null && !location.isEmpty() && postal_code != null && !postal_code.isEmpty()) {
        20
    } else {
        10
    }
}
val checkAddressColumn = udf(checkAddress)
val addressDF = phoneDF.withColumn("addressScore", checkAddressColumn(phoneDF("location"), phoneDF("street_address"), phoneDF("address_locality"), phoneDF("address_region"), phoneDF("postal_code")))
addressDF.select("phone", "phoneScore", "location", "postal_code", "addressScore").show(2)
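The same idea for the address check, as a sketch with a hypothetical `AddressScore` object: `Option(s)` turns a null column value into `None`, so the null and empty checks collapse into one helper. Like the original, it only looks at `location` and `postal_code`, and a whitespace-only string still counts as present.

```scala
// Hypothetical plain-Scala version of checkAddress.
object AddressScore {
  // Option(null) is None, so this is "not null and not empty".
  private def present(s: String): Boolean = Option(s).exists(_.nonEmpty)

  // 20 when location and postal code are both present, 10 otherwise.
  // The other three fields are accepted to keep the same signature but are not checked.
  def score(location: String, streetAddress: String, addressLocality: String,
            addressRegion: String, postalCode: String): Int =
    if (present(location) && present(postalCode)) 20 else 10
}
```

`udf(AddressScore.score _)` yields a five-argument UDF that can stand in for `checkAddressColumn` in the `withColumn` call above.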
Sum the score columns into a total score
import org.apache.spark.sql.functions.col

// Add the individual score columns together (Column supports +)
val columnsToSum = List(col("phoneScore"), col("addressScore"))
val resultDF = addressDF.withColumn("totalScore", columnsToSum.reduce(_ + _))
resultDF.select("phone", "phoneScore", "location", "postal_code", "addressScore", "totalScore").show(2)
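`reduce(_ + _)` works here because the list is never empty, but it throws on an empty list. A plain-Scala sketch (the `TotalScore` name is illustrative) contrasting it with `foldLeft`, which starts from an explicit zero:

```scala
// Illustrative comparison of reduce and foldLeft on score lists.
object TotalScore {
  // Mirrors columnsToSum.reduce(_ + _); throws UnsupportedOperationException on an empty list.
  def totalWithReduce(scores: List[Int]): Int = scores.reduce(_ + _)

  // Starts from 0, so an empty list simply totals 0.
  def totalWithFold(scores: List[Int]): Int = scores.foldLeft(0)(_ + _)
}
```

The Spark counterpart of the fold would be `columnsToSum.foldLeft(lit(0))(_ + _)`, with `lit` imported from `org.apache.spark.sql.functions`.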

References:
https://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.47
https://zeppelin.apache.org/docs/latest/interpreter/spark.html


