`
qindongliang1922
  • 浏览: 2172222 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117108
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125448
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59555
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71034
社区版块
存档分类
最新评论

如何使用scala+spark读写hbase?

阅读更多


最近工作有点忙,所以文章更新频率低了点,希望大家可以谅解,好了,言归正传,下面进入今天的主题:

如何使用scala+spark读写Hbase

软件版本如下:

scala2.11.8

spark2.1.0

hbase1.2.0


公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。



接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时操作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的操作api,势必速度回慢上许多。

关于批量操作Hbase,一般我们都会用MapReduce来操作,这样可以大大加快处理效率,原来也写过MR操作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。


整个流程如下:


(1)全量读取hbase表的数据

(2)做一系列的ETL

(3)把全量数据再写回hbase



核心代码如下:



//获取conf
 val conf=HBaseConfiguration.create()
  //设置读取的表
  conf.set(TableInputFormat.INPUT_TABLE,tableName)
  //设置写入的表
  conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
//创建sparkConf    
   val sparkConf=new SparkConf()
   //设置spark的任务名
   sparkConf.setAppName("read and write for hbase ")
   //创建spark上下文
   val sc=new SparkContext(sparkConf)
   
   //为job指定输出格式和输出表名
   
    val newAPIJobConfiguration1 = Job.getInstance(conf)
    newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)
    newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
   
   
   //全量读取hbase表
      val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]
      ,classOf[ImmutableBytesWritable]
      ,classOf[Result]
    )
   
   //过滤空数据,然后对每一个记录做更新,并转换成写入的格式
    val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)
    
    //转换后的结果,再次做过滤
    val save_rdd=final_rdd.filter(checkNull)
    
    //最终在写回hbase表
 save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)
    sc.stop()
 



从上面的代码可以看出来,使用spark+scala操作hbase是非常简单的。下面我们看一下,中间用到的几个自定义函数:


第一个:checkNotEmptyKs

作用:过滤掉空列簇的数据

  def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={
    val r=f._2
    val rowkey=Bytes.toString(r.getRow)
    val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala
    if(map.isEmpty)  false else true
  }



第二个:forDatas

作用:读取每一条数据,做update后,在转化成写入操作


  def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={
      val r=f._2 //获取Result
      val put:Put=new Put(r.getRow) //声明put
      val ks=Bytes.toBytes("ks") //读取指定列簇
      val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala
      map.foreach(kv=>{//遍历每一个rowkey下面的指定列簇的每一列的数据做转化
                val kid= Bytes.toString(kv._1)//知识点id
                var value=Bytes.toString(kv._2)//知识点的value值
		value="修改后的value"
		put.addColumn(ks,kv._1,Bytes.toBytes(value))	//放入put对象
      }
      )
    if(put.isEmpty)  null  else (new ImmutableBytesWritable(),put)

  }




第三个:checkNull
作用:过滤最终结果里面的null数据
  def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={
    if(f==null)  false  else true
  }



上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。

除了上面的方式,还有一些开源的框架,也封装了相关的处理逻辑,使得spark操作hbase变得更简洁,有兴趣的朋友可以了解下,github链接如下:


https://github.com/nerdammer/spark-hbase-connector

https://github.com/hortonworks-spark/shc




有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。





0
1
分享到:
评论

相关推荐

    徐老师大数据培训Hadoop+HBase+ZooKeeper+Spark+Kafka+Scala+Ambari

    根据提供的标题、描述、标签及部分内容链接,我们可以推断出这是一个关于大数据技术栈的培训课程,涉及的技术包括Hadoop、HBase、Zookeeper、Spark、Kafka、Scala以及Ambari。下面将针对这些技术进行详细的介绍和...

    flink+hbase+spark_linux.rar

    标题中的"flink+hbase+spark_linux.rar"表明这是一个关于大数据处理框架Flink、分布式数据库HBase以及大数据处理引擎Spark在Linux操作系统环境下的综合应用。这个压缩包可能包含相关的配置文件、示例代码、教程文档...

    基于spark+flume+kafka+hbase的实时日志处理分析系统.zip

    通过运行该项目,学生可以深入理解Spark的实时流处理(例如使用Spark Streaming或Structured Streaming)、Flume的数据采集、Kafka的消息传递以及HBase的分布式存储。同时,这个项目也是对Java编程能力、分布式系统...

    scala-hive-HBASE-Api.7z

    Scala的强大之处在于其简洁的语法和对并发处理的良好支持,这使得它在大数据处理框架如Spark中被广泛使用。 2. **Hive**:Apache Hive是由Facebook开源的一个数据仓库工具,它可以将结构化的数据文件映射为一张...

    Spark+SparkSQL+Spark Streaming+Spark Core+数据处理

    Spark SQL支持JDBC接口,可以直接连接到数据库进行数据读写,也能与Hive无缝集成,方便数据仓库的使用。 Spark Streaming Spark Streaming提供了处理实时数据流的能力,它将连续的数据流划分为小批量的微批次...

    spark-hbase-ingestion:Spark HBase使用DataFrame进行读写

    使用数据框的spark-hbase-ingestion / ** 转换记录以插入HBase的方法 @param记录 @param cf列族 @返回 */ def toHbaseRecords(记录:Array [(String,Array [(String,String)])],cf:String):RDD [...

    javaApi_sparkhiveAPI_hbaseAPI.zip

    - **Spark与HBase集成**:Spark可以连接到HBase,通过RDD(Resilient Distributed Datasets)或DataFrame进行数据读写,实现高效的大规模数据处理。 - **HBaseRDD**:Spark提供了HBaseRDD类,允许开发者直接操作...

    大数据资料Spark\HBase\HDFS 二次开发 PPT

    2. **HBase二次开发**:HBase是一个基于Hadoop的分布式列族数据库,适用于大数据实时读写场景。二次开发可能涉及扩展HBase的RegionServer、Master节点功能,开发自定义过滤器、 Coprocessor或者实现与外部系统的接口...

    数据分析平台,集成kafka、spark、hbase并附带示例.zip

    这个压缩包“数据分析平台,集成kafka、spark、hbase并附带示例.zip”提供了一个集成这些关键技术的数据处理解决方案。让我们详细了解一下其中涉及的技术点: 1. **Kafka**: Kafka是一种分布式流处理平台,用于...

    original-spark-examples-2.4.3.jar.zip

    这个模块允许Spark作业直接读写HBase表,通过HBaseRDD(Resilient Distributed Dataset for HBase)实现。然而,由于版本差异,直接使用预编译的Spark库可能无法与HBase2无缝对接。 2. **问题的出现** 当尝试使用...

    hbase-1.4.10-bin.tar.gz

    Spark提供了HBase connector,允许Spark作业直接读写HBase。在Spark应用中,你需要添加HBase和HBase-connector相关的依赖。在Spark的`spark-defaults.conf`文件中指定HBase的相关配置,如`spark.hadoop.hbase....

    hadoop、hbase、hive等相关面试问题

    本文总结了Hadoop、HBase、Hive以及Spark等大数据技术的相关面试知识点,包括HBase与Hive的关系、HBase的数据结构、Spark Core与Spark SQL的比较、RDD vs DataFrame vs DataSet、Scala与Java的互操作性、为什么选择...

    【Spark大数据习题】习题-Spark SQL&&&Kafka&& HBase&&HivePDF资源路径-Spark2

    【Spark大数据习题】...总的来说,这个习题集覆盖了Scala语言基础、Spark核心功能、SQL查询、实时数据处理(Kafka)、大数据存储(HBase)和数据仓库(Hive)等多个关键知识点,是学习和掌握大数据技术栈的良好资源。

    计算机课程毕设:基于spark streaming和kafka,hbase的日志统计分析系统.zip

    7. **编程语言和工具**:项目的实现可能涉及Java、Scala或Python等编程语言,因为这些是Spark和HBase常用的编程接口。同时,开发环境可能包括IntelliJ IDEA或Eclipse等IDE,以及Git版本控制工具。 8. **日志分析**...

    建立Hive和Hbase的映射关系,通过Spark将Hive表中数据导入ClickHouse

    本话题关注的是如何建立Hive与HBase之间的映射关系,并利用Spark将Hive中的数据高效地导入到ClickHouse数据库。以下将详细介绍这一过程的关键步骤和涉及的技术点。 首先,Hive是基于Hadoop的数据仓库工具,用于存储...

    spark-2.4.6-bin-2.6.0-cdh5.7.0.tgz

    使用Scala编写Spark代码可以利用其强大的类型系统和并发模型,提高代码的可读性和可维护性。Spark API为Scala提供了丰富的数据处理操作,如map、reduce、filter等,使得开发人员可以轻松地进行数据转换和分析。 ...

    Backup-Repo:Astro(HBase上的Spark SQL)的发行版本已移至

    综上所述,Astro是一个基于Scala的开源项目,它使用户能够使用Spark SQL在HBase上执行SQL查询,从而实现大数据的高效分析。项目可能已经更新了发布位置,提供最新的代码和资源。对于熟悉Scala、Spark和HBase的开发者...

    基于spark streaming和kafka,hbase的日志统计分析系统.zip

    在提供的压缩包文件`code_resource_010`中,可能包含了实现以上功能的相关代码资源,包括Spark Streaming的Java或Scala代码、Kafka配置文件以及与HBase交互的代码片段。通过这些代码,开发者可以深入理解并实践整个...

    大数据:Apache技术和大数据实践(Hadoop,Spark,Scala,Hbase,Cassandra ...)

    本主题将深入探讨几个关键的Apache项目,包括Hadoop、Spark、Scala、HBase和Cassandra,以及相关的生态系统组件如Zookeeper和Spark Streaming。这些技术在现代大数据解决方案中的应用广泛,对于理解和掌握大数据实践...

Global site tag (gtag.js) - Google Analytics