最近工作有点忙,所以文章更新频率低了点,希望大家可以谅解,好了,言归正传,下面进入今天的主题:
如何使用scala+spark读写Hbase
软件版本如下:
scala2.11.8
spark2.1.0
hbase1.2.0
公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。
接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时操作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的操作api,势必速度回慢上许多。
关于批量操作Hbase,一般我们都会用MapReduce来操作,这样可以大大加快处理效率,原来也写过MR操作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。
整个流程如下:
(1)全量读取hbase表的数据
(2)做一系列的ETL
(3)把全量数据再写回hbase
核心代码如下:
//获取conf
val conf=HBaseConfiguration.create()
//设置读取的表
conf.set(TableInputFormat.INPUT_TABLE,tableName)
//设置写入的表
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
//创建sparkConf
val sparkConf=new SparkConf()
//设置spark的任务名
sparkConf.setAppName("read and write for hbase ")
//创建spark上下文
val sc=new SparkContext(sparkConf)
//为job指定输出格式和输出表名
val newAPIJobConfiguration1 = Job.getInstance(conf)
newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)
newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
//全量读取hbase表
val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]
,classOf[ImmutableBytesWritable]
,classOf[Result]
)
//过滤空数据,然后对每一个记录做更新,并转换成写入的格式
val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)
//转换后的结果,再次做过滤
val save_rdd=final_rdd.filter(checkNull)
//最终在写回hbase表
save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)
sc.stop()
从上面的代码可以看出来,使用spark+scala操作hbase是非常简单的。下面我们看一下,中间用到的几个自定义函数:
第一个:checkNotEmptyKs
作用:过滤掉空列簇的数据
def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={
val r=f._2
val rowkey=Bytes.toString(r.getRow)
val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala
if(map.isEmpty) false else true
}
第二个:forDatas
作用:读取每一条数据,做update后,在转化成写入操作
def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={
val r=f._2 //获取Result
val put:Put=new Put(r.getRow) //声明put
val ks=Bytes.toBytes("ks") //读取指定列簇
val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala
map.foreach(kv=>{//遍历每一个rowkey下面的指定列簇的每一列的数据做转化
val kid= Bytes.toString(kv._1)//知识点id
var value=Bytes.toString(kv._2)//知识点的value值
value="修改后的value"
put.addColumn(ks,kv._1,Bytes.toBytes(value)) //放入put对象
}
)
if(put.isEmpty) null else (new ImmutableBytesWritable(),put)
}
第三个:checkNull
作用:过滤最终结果里面的null数据
def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={
if(f==null) false else true
}
上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。
除了上面的方式,还有一些开源的框架,也封装了相关的处理逻辑,使得spark操作hbase变得更简洁,有兴趣的朋友可以了解下,github链接如下:
https://github.com/nerdammer/spark-hbase-connector
https://github.com/hortonworks-spark/shc
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
labview程序代码参考学习使用,希望对你有所帮助。
毕设和企业适用springboot生鲜鲜花类及数据处理平台源码+论文+视频.zip
毕设和企业适用springboot企业数据智能分析平台类及汽车管理平台源码+论文+视频
毕设和企业适用springboot社区物业类及企业创新研发平台源码+论文+视频
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Floating Text Example</title> <style> .floating-text { font-size: 24px; position: relative; animation: float 3s ease-in-out infinite; } @keyframes float { 0%, 100% { transform: translateY(0); } 50% { transform: translateY(-20px); } } </style> </head> <body> <div class="floating-text">Hello, I'm floating!</div> <script> document.addEventListener('DOMContentLoaded', function() {
毕设和企业适用springboot社交媒体分析平台类及智慧医疗管理平台源码+论文+视频
毕设和企业适用springboot生鲜鲜花类及餐饮管理平台源码+论文+视频
毕设和企业适用springboot人工智能客服系统类及用户行为分析平台源码+论文+视频
毕设和企业适用springboot全渠道电商平台类及个性化广告平台源码+论文+视频
毕设和企业适用springboot社交互动平台类及线上图书馆源码+论文+视频
毕设和企业适用springboot企业知识管理平台类及供应链优化平台源码+论文+视频
毕设和企业适用springboot企业健康管理平台类及数据处理平台源码+论文+视频.zip
内容概要:本文档是一份面向初学者的详细指南,重点介绍如何利用Vue.js 2.0快速创建和运行简单的Todo List应用。首先指导安装必需的Node.js、npm/yarn等环境准备,接着通过Vue CLI工具生成新的Vue项目,再详细介绍项目目录和组件的构建方式。最后提供了具体的方法实现添加和删除待办事项,并指导如何使用命令启动应用,查看结果。 适合人群:具备基础Web开发技能的前端开发新手,尤其是对Vue框架感兴趣的学习者。 使用场景及目标:作为初学者入门级的学习资料,本文档的目标是让读者能够在最短时间内掌握Vue.js的基础概念和技术栈的应用方式,以便日后可以独立地构建更加复杂的Vue应用。 其他说明:除了学习如何构建应用程序之外,本文档还涵盖了Vue的基本语法和数据绑定、事件处理机制等重要概念,对于理解Vue框架的工作原理十分有帮助。
毕设和企业适用springboot企业健康管理平台类及智能化系统源码+论文+视频.zip
毕设和企业适用springboot企业健康管理平台类及远程医疗平台源码+论文+视频.zip
毕设和企业适用springboot数据可视化类及数据智能化平台源码+论文+视频
毕设和企业适用springboot生鲜鲜花类及用户体验优化平台源码+论文+视频.zip
毕设和企业适用springboot人工智能客服系统类及虚拟银行平台源码+论文+视频
毕设和企业适用springboot社交应用平台类及云计算资源管理平台源码+论文+视频
毕设和企业适用springboot企业数据监控平台类及线上图书馆源码+论文+视频