日常工作中我们往往面对的数据都是海量的文件数据,我们如何快速通过spark将文件导入到hbase库中,我这写了一个简单的例子仅供参考,实际上数据是需要经过清洗才能放入到hbase库中的。
由于数据文件内容涉及到公司实际项目,不便贴出,此文着重spark提出数据、清洗、入hbase库这个逻辑的实现,scala写的代码比较精简,代码如下:
ParseClient.java主要实现文件加载、清洗、入库的工作:
package main.scala import org.apache.spark.SparkConf import org.apache.spark.SparkContext import scala.collection.mutable.ListBuffer import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark.rdd.RDD.rddToPairRDDFunctions import scala.collection.mutable.ListBuffer object ParseClient { def main(args: Array[String]) { val conf = new SparkConf(); conf.setAppName("ParseClient") conf.setMaster("local"); val sc = new SparkContext(conf); val textRdd = sc.textFile("WW_2016-10-13~2016-10-13.txt"); ---数据清洗 var smailList = new ListBuffer[String](); val arrRdd = textRdd.flatMap { line => { val allList = new ListBuffer[ListBuffer[String]](); if (line == "" || "".equals(line)) { allList += smailList; smailList = new ListBuffer[String](); } else { smailList += line; } allList; } } val truncArrRdd = arrRdd.map { arr => { val lineArr = new ListBuffer[(String, String, String, String)]; arr.foreach { element => { val eleArr = element.split(" "); if (eleArr.length >= 4) { val tuple = (eleArr(0), eleArr(1), eleArr(2), eleArr(3)); lineArr += tuple; } } } lineArr } } val resultRdd = truncArrRdd.map(tupleArr => { var serviceName: String = ""; var cosumerName: String = ""; var date: String = ""; var context: String = ""; for (tuple <- tupleArr) { if (tuple._3.contains("官方旗舰店")) { serviceName = tuple._3 } else { cosumerName = tuple._3; } date = tuple._1; context = context + tuple._4 + "\n"; } (cosumerName, serviceName, date, context); }) ----获取hbase库模型对象、入hbase库代码实现 val job = new HBaseCommon().getJob("jeffTest"); val put = resultRdd.map(tuple => { val rowkey = tuple._3 + tuple._2 + tuple._1; val put = new Put(Bytes.toBytes(rowkey)); put.add(Bytes.toBytes("cf"), Bytes.toBytes("content"), Bytes.toBytes(tuple._4)) (new ImmutableBytesWritable, put) }).saveAsNewAPIHadoopDataset(job.getConfiguration()); sc.stop(); } }
下面这个是辅助类HBaseCommon.java,主要是负责连接spark、hbase库、以及附带写了hbase库读取:
package main.scala import java.util import scala.collection.mutable.{ListBuffer, LinkedHashMap} import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{KeyValue, HBaseConfiguration} import org.apache.hadoop.mapreduce.Job import org.apache.spark.SparkContext import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.client.{Get, Scan, HTable, Result} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat} class HBaseCommon { /** * 获取初始化配置 */ def getConfiguration(): Configuration = { val map: LinkedHashMap[String, String] = new LinkedHashMap[String, String](); val HBASE_CONFIG = new Configuration(); HBASE_CONFIG.set("hbase.zookeeper.quorum", map.getOrElse("zookeeper_quorum", "host1,host2,host3")); HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", map.getOrElse("zookeeper_port", "2181")); val configuration = HBaseConfiguration.create(HBASE_CONFIG); configuration; } /** * 获取作业信息 */ def getJob(tableName: String): Job = { val configuration = this.getConfiguration(); configuration.set(TableOutputFormat.OUTPUT_TABLE, tableName); var job = new Job(configuration) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) job; } /** * 读取hbase某个表中的全部内容 */ def getTableInfo(tableName: String, sc: SparkContext): util.ArrayList[String]= { val configuration = this.getConfiguration() configuration.set(TableInputFormat.INPUT_TABLE, tableName) val hBaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) var strinfo = "" val count = hBaseRDD.count() println("HBase RDD Count:" + count) hBaseRDD.cache() val table = new HTable(configuration, tableName); val g = new Get("row1".getBytes) val result = table.get(g) val value = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes)) hBaseRDD.cache() println("------------------------scan----------") val res = hBaseRDD.take(count.toInt) val reslist=new util.ArrayList[String]() for (j <- 1 until count.toInt) { var rs = res(j - 1)._2 var kvs = rs.raw for (kv <- kvs) { strinfo += ("rowkey:" + new String(kv.getRow()) + " cf:" + new String(kv.getFamily()) + " column:" + new String(kv.getQualifier()) + " value:" + new String(kv.getValue())+"\n") reslist.add(new String(kv.getValue())) } } reslist } }
相关推荐
Scala和Spark是大数据分析领域中的两个重要工具,它们在处理大规模数据时表现出强大的性能和灵活性。Scala是一种静态类型的函数式编程语言,而Spark是一个分布式计算框架,尤其适合于大数据处理和分析。本教程将深入...
这个文件是用来在Idea中用于整合Scala实现Spark代码编写的pom配置文件. 内置 JDK规定, Spark-core, SparkSQL, mysql依赖的jar包,SparkStreaming, SparkStreaming + Kafka, 向kafka 生产数据需要包, 连接 Redis 需要...
文件类型包括9个XML配置文件、2个CRC文件、2个Scala源代码文件、1个名称文件、1个Markdown文档、1个Parquet数据文件、1个名称列表文件、1个TXT文本文件、1个PMML文件和1个Java源代码文件。该系统利用JPMML-Spark将...
在Spark中,Scala用于定义数据处理逻辑,通过RDD(弹性分布式数据集)或者DataFrame/Dataset API进行操作,这些API提供了丰富的转换和行动操作,如map、filter、reduce等,支持并行计算,极大地提高了处理速度。...
如果提示`JAVA_HOME is not set`,则需要在`spark-env.sh`文件中添加`JAVA_HOME`的配置。 3. **日志级别配置**:还可以通过修改`log4j.properties`文件来调整日志输出级别。例如,将`log4j.rootCategory=INFO,...
标题中的“spring boot + scala + spark http驱动spark计算”揭示了一个使用现代技术栈构建的数据处理系统。这个系统基于Spring Boot框架来提供HTTP服务,利用Scala作为编程语言,并借助Apache Spark进行大数据计算...
### Linux中Scala和Spark安装教程 #### Scala安装步骤详解 在深入探讨如何在Linux系统上安装Scala和Spark之前,我们首先需要确保系统已正确安装了JDK,因为Scala和Spark都依赖于Java运行环境。 ##### 步骤1:安装...
文件类型包括80个Scala源代码文件、4个XML配置文件、3个TXT文档、2个Markdown文档、2个Java源代码文件、1个GIT忽略文件、1个日志文件、1个JSON配置文件和1个Properties配置文件。该学习项目适合用于学习和实践Scala...
RDD是Spark中的基本数据结构,表示不可变、分区的元素集合,可以在集群中并行操作。在`sparkfirst`或其他文件中,可能会有创建和操作RDD的代码。 这些代码示例可以作为学习和理解Spark工作原理的起点。通过研究这些...
Scala and Spark for Big Data Analytics by Md. Rezaul Karim English | 25 July 2017 | ISBN: 1785280848 | ASIN: B072J4L8FQ | 898 Pages | AZW3 | 20.56 MB Harness the power of Scala to program Spark and ...
scala语言和python一样都是交互式的语言,操作简单。这是wordcount的scala实现,简单明了,比java实现简单很多,希望对大家有所帮助
首先,在你的Scala源代码文件中引入所需的库。例如: ```scala import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.TableName ...
在 Spark Shell 中,可以使用内置函数读取文件,如 `sc.textFile()`,并进行简单的数据分析。实验中统计了 `/home/hadoop/test.txt` 和 `/user/hadoop/test.txt` 文件的行数,这展示了 Spark 对文本数据的基本操作。...
总之,这个项目展示了如何在Spark中运用Scala和IKAnalyzer进行中文分词统计,帮助我们探索和理解古代文学作品的语言特征。这样的实践不仅适用于学术研究,也对于文本挖掘、信息检索等领域有着广泛的应用价值。通过...
文件类型包括43个Scala源代码文件、42个Java源代码文件、2个TXT文本文件、1个city_info文件、1个course文件、1个JSON配置文件、1个dfs_in文件、1个es_in文件、1个product_info文件和1个score文件。该项目旨在熟悉...
【标题】中的“hadoop scala spark 例子项目,运行了单机wordcount”指的是一个使用Hadoop、Scala和Spark框架实现的简单WordCount程序。在大数据处理领域,WordCount是入门级的经典示例,用于统计文本文件中单词出现...
本教程将探讨如何使用 Scala 语言来操作 Spark,并介绍如何与 MySQL 数据库和 HDFS(Hadoop 分布式文件系统)进行交互。以下是相关知识点的详细说明: **1. Scala 语言基础** Scala 是一种多范式编程语言,融合了...
Spark itself is written with Scala and naturally, as a starting point, we will discuss a brief introduction to Scala, such as the basic aspects of its history, purposes, and how to install Scala on ...
Scala and Spark for Big Data Analytics 英文无水印pdf pdf所有页面使用FoxitReader和PDF-XChangeViewer测试都可以打开 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请...
Scala and Spark for Big Data Analytics 英文epub 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除