最近工作有点忙,所以文章更新频率低了点,希望大家可以谅解,好了,言归正传,下面进入今天的主题:
如何使用scala+spark读写Hbase
软件版本如下:
scala2.11.8
spark2.1.0
hbase1.2.0
公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。
接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时操作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的操作api,势必速度回慢上许多。
关于批量操作Hbase,一般我们都会用MapReduce来操作,这样可以大大加快处理效率,原来也写过MR操作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。
整个流程如下:
(1)全量读取hbase表的数据
(2)做一系列的ETL
(3)把全量数据再写回hbase
核心代码如下:
//获取conf
val conf=HBaseConfiguration.create()
//设置读取的表
conf.set(TableInputFormat.INPUT_TABLE,tableName)
//设置写入的表
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
//创建sparkConf
val sparkConf=new SparkConf()
//设置spark的任务名
sparkConf.setAppName("read and write for hbase ")
//创建spark上下文
val sc=new SparkContext(sparkConf)
//为job指定输出格式和输出表名
val newAPIJobConfiguration1 = Job.getInstance(conf)
newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)
newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
//全量读取hbase表
val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]
,classOf[ImmutableBytesWritable]
,classOf[Result]
)
//过滤空数据,然后对每一个记录做更新,并转换成写入的格式
val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)
//转换后的结果,再次做过滤
val save_rdd=final_rdd.filter(checkNull)
//最终在写回hbase表
save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)
sc.stop()
从上面的代码可以看出来,使用spark+scala操作hbase是非常简单的。下面我们看一下,中间用到的几个自定义函数:
第一个:checkNotEmptyKs
作用:过滤掉空列簇的数据
def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={
val r=f._2
val rowkey=Bytes.toString(r.getRow)
val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala
if(map.isEmpty) false else true
}
第二个:forDatas
作用:读取每一条数据,做update后,在转化成写入操作
def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={
val r=f._2 //获取Result
val put:Put=new Put(r.getRow) //声明put
val ks=Bytes.toBytes("ks") //读取指定列簇
val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala
map.foreach(kv=>{//遍历每一个rowkey下面的指定列簇的每一列的数据做转化
val kid= Bytes.toString(kv._1)//知识点id
var value=Bytes.toString(kv._2)//知识点的value值
value="修改后的value"
put.addColumn(ks,kv._1,Bytes.toBytes(value)) //放入put对象
}
)
if(put.isEmpty) null else (new ImmutableBytesWritable(),put)
}
第三个:checkNull
作用:过滤最终结果里面的null数据
def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={
if(f==null) false else true
}
上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。
除了上面的方式,还有一些开源的框架,也封装了相关的处理逻辑,使得spark操作hbase变得更简洁,有兴趣的朋友可以了解下,github链接如下:
https://github.com/nerdammer/spark-hbase-connector
https://github.com/hortonworks-spark/shc
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
根据提供的标题、描述、标签及部分内容链接,我们可以推断出这是一个关于大数据技术栈的培训课程,涉及的技术包括Hadoop、HBase、Zookeeper、Spark、Kafka、Scala以及Ambari。下面将针对这些技术进行详细的介绍和...
标题中的"flink+hbase+spark_linux.rar"表明这是一个关于大数据处理框架Flink、分布式数据库HBase以及大数据处理引擎Spark在Linux操作系统环境下的综合应用。这个压缩包可能包含相关的配置文件、示例代码、教程文档...
通过运行该项目,学生可以深入理解Spark的实时流处理(例如使用Spark Streaming或Structured Streaming)、Flume的数据采集、Kafka的消息传递以及HBase的分布式存储。同时,这个项目也是对Java编程能力、分布式系统...
Scala的强大之处在于其简洁的语法和对并发处理的良好支持,这使得它在大数据处理框架如Spark中被广泛使用。 2. **Hive**:Apache Hive是由Facebook开源的一个数据仓库工具,它可以将结构化的数据文件映射为一张...
Spark SQL支持JDBC接口,可以直接连接到数据库进行数据读写,也能与Hive无缝集成,方便数据仓库的使用。 Spark Streaming Spark Streaming提供了处理实时数据流的能力,它将连续的数据流划分为小批量的微批次...
使用数据框的spark-hbase-ingestion / ** 转换记录以插入HBase的方法 @param记录 @param cf列族 @返回 */ def toHbaseRecords(记录:Array [(String,Array [(String,String)])],cf:String):RDD [...
- **Spark与HBase集成**:Spark可以连接到HBase,通过RDD(Resilient Distributed Datasets)或DataFrame进行数据读写,实现高效的大规模数据处理。 - **HBaseRDD**:Spark提供了HBaseRDD类,允许开发者直接操作...
2. **HBase二次开发**:HBase是一个基于Hadoop的分布式列族数据库,适用于大数据实时读写场景。二次开发可能涉及扩展HBase的RegionServer、Master节点功能,开发自定义过滤器、 Coprocessor或者实现与外部系统的接口...
这个压缩包“数据分析平台,集成kafka、spark、hbase并附带示例.zip”提供了一个集成这些关键技术的数据处理解决方案。让我们详细了解一下其中涉及的技术点: 1. **Kafka**: Kafka是一种分布式流处理平台,用于...
Spark提供了HBase connector,允许Spark作业直接读写HBase。在Spark应用中,你需要添加HBase和HBase-connector相关的依赖。在Spark的`spark-defaults.conf`文件中指定HBase的相关配置,如`spark.hadoop.hbase....
这个模块允许Spark作业直接读写HBase表,通过HBaseRDD(Resilient Distributed Dataset for HBase)实现。然而,由于版本差异,直接使用预编译的Spark库可能无法与HBase2无缝对接。 2. **问题的出现** 当尝试使用...
本文总结了Hadoop、HBase、Hive以及Spark等大数据技术的相关面试知识点,包括HBase与Hive的关系、HBase的数据结构、Spark Core与Spark SQL的比较、RDD vs DataFrame vs DataSet、Scala与Java的互操作性、为什么选择...
【Spark大数据习题】...总的来说,这个习题集覆盖了Scala语言基础、Spark核心功能、SQL查询、实时数据处理(Kafka)、大数据存储(HBase)和数据仓库(Hive)等多个关键知识点,是学习和掌握大数据技术栈的良好资源。
7. **编程语言和工具**:项目的实现可能涉及Java、Scala或Python等编程语言,因为这些是Spark和HBase常用的编程接口。同时,开发环境可能包括IntelliJ IDEA或Eclipse等IDE,以及Git版本控制工具。 8. **日志分析**...
本话题关注的是如何建立Hive与HBase之间的映射关系,并利用Spark将Hive中的数据高效地导入到ClickHouse数据库。以下将详细介绍这一过程的关键步骤和涉及的技术点。 首先,Hive是基于Hadoop的数据仓库工具,用于存储...
使用Scala编写Spark代码可以利用其强大的类型系统和并发模型,提高代码的可读性和可维护性。Spark API为Scala提供了丰富的数据处理操作,如map、reduce、filter等,使得开发人员可以轻松地进行数据转换和分析。 ...
综上所述,Astro是一个基于Scala的开源项目,它使用户能够使用Spark SQL在HBase上执行SQL查询,从而实现大数据的高效分析。项目可能已经更新了发布位置,提供最新的代码和资源。对于熟悉Scala、Spark和HBase的开发者...
在提供的压缩包文件`code_resource_010`中,可能包含了实现以上功能的相关代码资源,包括Spark Streaming的Java或Scala代码、Kafka配置文件以及与HBase交互的代码片段。通过这些代码,开发者可以深入理解并实践整个...
本主题将深入探讨几个关键的Apache项目,包括Hadoop、Spark、Scala、HBase和Cassandra,以及相关的生态系统组件如Zookeeper和Spark Streaming。这些技术在现代大数据解决方案中的应用广泛,对于理解和掌握大数据实践...