`

scala实现spark读取文件、清洗、入库base中

 
阅读更多

       日常工作中我们往往面对的数据都是海量的文件数据,我们如何快速通过spark将文件导入到hbase库中,我这写了一个简单的例子仅供参考,实际上数据是需要经过清洗才能放入到hbase库中的。

       由于数据文件内容涉及到公司实际项目,不便贴出,此文着重spark提出数据、清洗、入hbase库这个逻辑的实现,scala写的代码比较精简,代码如下:

 

ParseClient.java主要实现文件加载、清洗、入库的工作:

package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import scala.collection.mutable.ListBuffer

object ParseClient {
  def main(args: Array[String]) {
    val conf = new SparkConf();
conf.setAppName("ParseClient")
    conf.setMaster("local");
val sc = new SparkContext(conf);
val textRdd = sc.textFile("WW_2016-10-13~2016-10-13.txt");
---数据清洗
var smailList = new ListBuffer[String]();
val arrRdd = textRdd.flatMap { line => {
      val allList = new ListBuffer[ListBuffer[String]]();
if (line == "" || "".equals(line)) {
        allList += smailList;
smailList = new ListBuffer[String]();
} else {
        smailList += line;
}
      allList;
}
    }

    val truncArrRdd = arrRdd.map { arr => {
      val lineArr = new ListBuffer[(String, String, String, String)];
arr.foreach { element => {
        val eleArr = element.split(" ");
if (eleArr.length >= 4) {
          val tuple = (eleArr(0), eleArr(1), eleArr(2), eleArr(3));
lineArr += tuple;
}
      }
      }
      lineArr
    }
    }

    val resultRdd = truncArrRdd.map(tupleArr => {
      var serviceName: String = "";
var cosumerName: String = "";
var date: String = "";
var context: String = "";
for (tuple <- tupleArr) {
        if (tuple._3.contains("官方旗舰店")) {
          serviceName = tuple._3
        } else {
          cosumerName = tuple._3;
}
        date = tuple._1;
context = context + tuple._4 + "\n";
}

      (cosumerName, serviceName, date, context);
})

----获取hbase库模型对象、入hbase库代码实现
    val job = new HBaseCommon().getJob("jeffTest");
val put = resultRdd.map(tuple => {
      val rowkey = tuple._3 + tuple._2 + tuple._1;
val put = new Put(Bytes.toBytes(rowkey));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("content"), Bytes.toBytes(tuple._4))
      (new ImmutableBytesWritable, put)
    }).saveAsNewAPIHadoopDataset(job.getConfiguration());
sc.stop();
}
}

 

下面这个是辅助类HBaseCommon.java,主要是负责连接spark、hbase库、以及附带写了hbase库读取:

package main.scala

import java.util

import scala.collection.mutable.{ListBuffer, LinkedHashMap}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{KeyValue, HBaseConfiguration}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Get, Scan, HTable, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}

class HBaseCommon {
  /**
   * 获取初始化配置
   */
def getConfiguration(): Configuration = {
    val map: LinkedHashMap[String, String] = new LinkedHashMap[String, String]();
val HBASE_CONFIG = new Configuration();
HBASE_CONFIG.set("hbase.zookeeper.quorum", map.getOrElse("zookeeper_quorum", "host1,host2,host3"));
HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", map.getOrElse("zookeeper_port", "2181"));
val configuration = HBaseConfiguration.create(HBASE_CONFIG);
configuration;
}

  /**
   * 获取作业信息
   */
def getJob(tableName: String): Job = {
    val configuration = this.getConfiguration();
configuration.set(TableOutputFormat.OUTPUT_TABLE, tableName);
var job = new Job(configuration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job;
}

  /**
   * 读取hbase某个表中的全部内容
   */
def getTableInfo(tableName: String, sc: SparkContext): util.ArrayList[String]= {
    val configuration = this.getConfiguration()
    configuration.set(TableInputFormat.INPUT_TABLE, tableName)

    val hBaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])

    var strinfo = ""
val count = hBaseRDD.count()
    println("HBase RDD Count:" + count)
    hBaseRDD.cache()

    val table = new HTable(configuration, tableName);
val g = new Get("row1".getBytes)
    val result = table.get(g)
    val value = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))

    hBaseRDD.cache()
    println("------------------------scan----------")
    val res = hBaseRDD.take(count.toInt)
    val reslist=new util.ArrayList[String]()
    for (j <- 1 until count.toInt) {
      var rs = res(j - 1)._2
      var kvs = rs.raw
      for (kv <- kvs) {
        strinfo += ("rowkey:" + new String(kv.getRow()) +
          " cf:" + new String(kv.getFamily()) +
          " column:" + new String(kv.getQualifier()) +
          " value:" + new String(kv.getValue())+"\n")

        reslist.add(new String(kv.getValue()))
      }
    }
    reslist
  }
}

 

 

 

分享到:
评论

相关推荐

    Scala和Spark大数据分析函数式编程、数据流和机器学习

    Scala和Spark是大数据分析领域中的两个重要工具,它们在处理大规模数据时表现出强大的性能和灵活性。Scala是一种静态类型的函数式编程语言,而Spark是一个分布式计算框架,尤其适合于大数据处理和分析。本教程将深入...

    pom.xml(Idea中用于整合Scala实现Spark代码编写的配置文件)

    这个文件是用来在Idea中用于整合Scala实现Spark代码编写的pom配置文件. 内置 JDK规定, Spark-core, SparkSQL, mysql依赖的jar包,SparkStreaming, SparkStreaming + Kafka, 向kafka 生产数据需要包, 连接 Redis 需要...

    scala与spark基础

    在Spark中,Scala用于定义数据处理逻辑,通过RDD(弹性分布式数据集)或者DataFrame/Dataset API进行操作,这些API提供了丰富的转换和行动操作,如map、filter、reduce等,支持并行计算,极大地提高了处理速度。...

    基于Scala的Spark模型转换为PMML格式设计源码

    文件类型包括9个XML配置文件、2个CRC文件、2个Scala源代码文件、1个名称文件、1个Markdown文档、1个Parquet数据文件、1个名称列表文件、1个TXT文本文件、1个PMML文件和1个Java源代码文件。该系统利用JPMML-Spark将...

    scala和spark的安装

    如果提示`JAVA_HOME is not set`,则需要在`spark-env.sh`文件中添加`JAVA_HOME`的配置。 3. **日志级别配置**:还可以通过修改`log4j.properties`文件来调整日志输出级别。例如,将`log4j.rootCategory=INFO,...

    spring boot + scala + spark http驱动spark计算

    标题中的“spring boot + scala + spark http驱动spark计算”揭示了一个使用现代技术栈构建的数据处理系统。这个系统基于Spring Boot框架来提供HTTP服务,利用Scala作为编程语言,并借助Apache Spark进行大数据计算...

    Linux中Scala和Spark安装

    ### Linux中Scala和Spark安装教程 #### Scala安装步骤详解 在深入探讨如何在Linux系统上安装Scala和Spark之前,我们首先需要确保系统已正确安装了JDK,因为Scala和Spark都依赖于Java运行环境。 ##### 步骤1:安装...

    基于Scala的Spark学习项目设计源码

    文件类型包括80个Scala源代码文件、4个XML配置文件、3个TXT文档、2个Markdown文档、2个Java源代码文件、1个GIT忽略文件、1个日志文件、1个JSON配置文件和1个Properties配置文件。该学习项目适合用于学习和实践Scala...

    scala开发spark代码

    RDD是Spark中的基本数据结构,表示不可变、分区的元素集合,可以在集群中并行操作。在`sparkfirst`或其他文件中,可能会有创建和操作RDD的代码。 这些代码示例可以作为学习和理解Spark工作原理的起点。通过研究这些...

    Scala and Spark for Big Data Analytics

    Scala and Spark for Big Data Analytics by Md. Rezaul Karim English | 25 July 2017 | ISBN: 1785280848 | ASIN: B072J4L8FQ | 898 Pages | AZW3 | 20.56 MB Harness the power of Scala to program Spark and ...

    实验七:Spark初级编程实践

    在 Spark Shell 中,可以使用内置函数读取文件,如 `sc.textFile()`,并进行简单的数据分析。实验中统计了 `/home/hadoop/test.txt` 和 `/user/hadoop/test.txt` 文件的行数,这展示了 Spark 对文本数据的基本操作。...

    spark框架中wordcount的scala实现

    scala语言和python一样都是交互式的语言,操作简单。这是wordcount的scala实现,简单明了,比java实现简单很多,希望对大家有所帮助

    scala API 操作hbase表

    首先,在你的Scala源代码文件中引入所需的库。例如: ```scala import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.TableName ...

    基于Scala的Spark数据处理练习设计源码

    文件类型包括43个Scala源代码文件、42个Java源代码文件、2个TXT文本文件、1个city_info文件、1个course文件、1个JSON配置文件、1个dfs_in文件、1个es_in文件、1个product_info文件和1个score文件。该项目旨在熟悉...

    hadoop scala spark 例子项目,运行了单机wordcount

    【标题】中的“hadoop scala spark 例子项目,运行了单机wordcount”指的是一个使用Hadoop、Scala和Spark框架实现的简单WordCount程序。在大数据处理领域,WordCount是入门级的经典示例,用于统计文本文件中单词出现...

    Scala and Spark for Big Data Analytics.pdf

    Spark itself is written with Scala and naturally, as a starting point, we will discuss a brief introduction to Scala, such as the basic aspects of its history, purposes, and how to install Scala on ...

    Scala and Spark for Big Data Analytics epub

    Scala and Spark for Big Data Analytics 英文epub 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除

    scala与spark文档合集

    Spark的核心特性是其内存计算,通过将数据存储在内存中,实现了比Hadoop MapReduce更快的迭代计算,从而在大数据分析领域取得了显著的优势。 **Scala知识点** 1. **类型系统**:Scala具有强类型和静态类型,支持...

    解析JSON文件

    本范例是一个Scala工程,结合Java组件实现了对spark产品分析的结果json文件进行解析,将其内部的几何对象转换为记录集写入数据集里,由于Scala读写文件的效率比较高,故采用Scala与Java组件实现。

    scala+spark

    Scala和Spark是现代大数据处理领域中的重要工具,它们在数据科学和工程中占据了核心地位。Scala是一种多范式编程语言,结合了面向对象和函数式编程的特点,而Spark则是一个分布式计算框架,专为大数据处理设计,提供...

Global site tag (gtag.js) - Google Analytics