`

spark读取hbase数据

阅读更多
package ceshi

import java.io.IOException
import java.util
import java.util.{Random, UUID}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by hyt on 5/24/17.
  */
object HbaseTestInsert {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
    sparkConf.setMaster("local[1]").setAppName("HbaseTestInsert")
    val sc = new SparkContext(sparkConf)

    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "spark")
    conf.set("hbase.zookeeper.quorum", "172.16.0.226:2181,172.16.0.223:2181;172.16.0.224:2181")
    conf.set("hbase.zookeeper.property.clientPort","2181")

    val scan:Scan = new Scan()
    scan.addColumn(Bytes.toBytes("s"),Bytes.toBytes("v_ip"))
    val startRow = "1495607836079"
    val endRow = "1495607836920"

    scan.setStartRow(Bytes.toBytes(startRow))
    scan.setStopRow(Bytes.toBytes(endRow))

    conf.set(TableInputFormat.SCAN,convertScanToString(scan))

    //读取数据并转化成rdd
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])


    val count = hBaseRDD.count()

    println(count)

//    hBaseRDD.foreach{case (_,result) =>{
//      //获取行键
//      val key = Bytes.toString(result.getRow)
//      //通过列族和列名获取列
//      val v_ip = Bytes.toString(result.getValue("s".getBytes,"v_ip".getBytes))
//      println("Row key:"+key+" ip:"+v_ip)
//    }}

    hBaseRDD.map(x=>x._2).map{
      x=>
        (Bytes.toString(x.getValue("s".getBytes,"v_ip".getBytes)),1)
    }.reduceByKey(_+_).sortBy(x=>x._2,false).foreach(println)

  }

  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def readDataFromHbase(): Unit ={
    val scanner: ResultScanner = scanDataByStartAndStopRow("spark", "1495607836079", "1495607836920")
    val iterator = scanner.iterator()
    while(iterator.hasNext()) {
      val result: Result = iterator.next()

      println(new String(result.getValue(Bytes.toBytes("s"), Bytes.toBytes("v_ip"))))

      println(new String(result.getRow))
    }
  }

  def  scanDataByStartAndStopRow(tableName:String,startRow:String,endRow:String):ResultScanner = {

    val conf:Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "172.16.0.226:2181,172.16.0.223:2181;172.16.0.224:2181")
    conf.set("hbase.zookeeper.property.clientPort","2181")

    var rs : ResultScanner = null
    try {
      val table : HTable  = new HTable(conf,tableName)

      val scan:Scan = new Scan()
      scan.addColumn(Bytes.toBytes("s"),Bytes.toBytes("v_ip"))
      scan.setStartRow(Bytes.toBytes(startRow))
      scan.setStopRow(Bytes.toBytes(endRow))
      rs = table.getScanner(scan)
    } catch {
      case e: IOException => e.printStackTrace()
    }
    rs
  }

    def putHbaseData(): Unit ={

      val conf:Configuration = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "172.16.0.226:2181,172.16.0.223:2181;172.16.0.224:2181")
      conf.set("hbase.zookeeper.property.clientPort","2181")

      val table:HTable = new HTable(conf,"spark")
      table.setAutoFlush(false)
      val list = new util.ArrayList[Put]()

      for(i <- 0 to 100000){

        val v_ip = getRandomIp()
        val v_time = System.currentTimeMillis().toString
        val rowKey = System.currentTimeMillis() + UUID.randomUUID().toString


        val put:Put = new Put(Bytes.toBytes(rowKey))
        put.add(Bytes.toBytes("s"), Bytes.toBytes("v_time"), Bytes.toBytes(v_time))
        put.add(Bytes.toBytes("s"), Bytes.toBytes("v_ip"), Bytes.toBytes(v_ip))
        list.add(put)
        println(i)
        if(i%100 == 0){
          table.put(list)
          table.flushCommits()
          list.clear()
        }

      }

    def getRandomIp():String = {
      val random_int  = new Random()
      "192"+"."+"168"+"."+random_int.nextInt(255)+"."+random_int.nextInt(255)
    }
  }





}

 

分享到:
评论

相关推荐

    spark读取hbase数据,并使用spark sql保存到mysql

    本示例将详细介绍如何使用 Spark 从 HBase 中读取数据,并通过 Spark SQL 将其存储到 MySQL 数据库中。 首先,让我们了解 Spark 与 HBase 的交互。Spark 提供了 `spark-hbase-connector` 库,允许我们方便地连接到 ...

    spark使用java读取hbase数据做分布式计算.pdf

    总的来说,这个Java程序展示了如何使用Spark读取HBase数据并进行分布式计算。通过Spark的并行处理能力,可以高效地处理大规模的HBase数据,进行复杂的分析任务。要注意的是,实际应用中还需要考虑错误处理、资源管理...

    hbase-rdd:Spark RDD从HBase读取,写入和删除

    本篇文章将详细探讨如何使用 Scala 和 Spark 的 Resilient Distributed Datasets (RDDs) 与 HBase 进行交互,包括读取、写入以及删除数据。 首先,我们需要理解 Spark RDD。RDD 是 Spark 的基本数据抽象,它是不可...

    基于Spark SQL可通过输入SQL语句操作HBase表,目前提供对HBase表的查询、创建、删除以及数据插入+源代码+说明

    - 不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! &lt;项目介绍&gt; 1、该资源内项目代码都经过测试运行成功,...

    Spark以及hbase学习资料

    此外,Spark Streaming可以从HBase中读取数据,或者将处理后的结果写入HBase,构建实时数据处理管道。 在实际项目中,Spark可以用来做数据预处理、特征工程和模型训练,而HBase则作为数据仓库,存储大量的历史数据...

    scala API 操作hbase表

    4. 获取HBase表: 获取要操作的HBase表: ```scala val tableName = TableName.valueOf("your_table_name") val table = connection.getTable(tableName) ``` 5. 插入数据: 创建一个`Put`对象来插入一行数据...

    spark-hbase:用于将 Spark 与 Apache HBase 数据结合使用的集成实用程序

    用于将 Spark 与 Apache HBase 数据结合使用的集成实用程序。 支持 基于 HBase 读取的扫描 基于 HBase 写入的 batchPut 基于 HBase 读取的分析 HFile 基于 HBase 写入的批量加载 要求 这个库需要 Spark 1.2+

    读写HBase数据.pdf

    3. 使用`DataFrameReader`的`format("org.apache.spark.sql.execution.datasources.hbase")`方法读取HBase数据。 4. 使用`DataFrameWriter`的`format("org.apache.spark.sql.execution.datasources.hbase")`方法写入...

    基于hadoop,spark,Hbase,Kafka新闻统计java大数据demo.zip

    【标题】中的“基于Hadoop,Spark,HBase,Kafka新闻统计java大数据demo”揭示了这个项目是关于使用Java编程语言实现的大数据处理示例,它整合了四个关键的大数据技术:Hadoop、Spark、HBase和Kafka。这些技术都是在大...

    Kafka集成Spark Streaming并写入数据到HBase

    Spark Streaming支持多种数据源,如Kafka、Flume、Twitter等,并且可以将处理结果写入HDFS、HBase等存储系统。 **HBase概述** HBase是构建在Hadoop文件系统(HDFS)之上,面向列的NoSQL数据库,适合处理大规模数据...

    藏经阁-Apache Spark -Apache HBase Con.pdf

    SHC 的主要目的是提供一种高效、可靠的方式来访问 HBase,实现数据的快速读取和写入。 Spark 和 HBase 的集成是为了解决大数据处理中的一些挑战,例如数据的处理速度、数据的存储和查询效率等。SHC 通过使用 Spark...

    spark_hbase:Scala中的示例通过Spark读取保存在hbase中的数据,以及python的转换器示例

    在这里,我们提供了Scala中的一个新示例,该示例涉及通过Spark将hbase中保存的数据传输到String ,以及python转换器的新示例。 scala 的示例将保存在hbase中的数据传输到RDD[String] ,该数据包含columnFamily,...

    spark streamming消费kafka数据存入hbase示例代码

    Spark Streaming 提供了 `KafkaUtils.createDirectStream` 函数,它能直接从 Kafka 的分区读取数据,无需额外的 Receiver 子进程。 然后,数据需要经过处理,例如解析、过滤或转换,以便于存储到 HBase。HBase 2.1 ...

    javaApi_sparkhiveAPI_hbaseAPI.zip

    在IT行业中,Java、Hive、HBase以及Spark是大数据处理和分析领域的重要工具。本压缩包"javaApi_sparkhiveAPI_hbaseAPI.zip"包含了2019年8月至10月期间针对这些技术的Java版API实现,以及与Spark相关的Hive和HBase ...

    大数据实习hdfs+flume+kafka+spark+hbase+hive项目.zip

    在大数据领域,HDFS、Flume、Kafka、Spark、HBase和Hive是关键的组件,它们共同构建了一个高效、可靠的数据处理和分析体系。下面将分别介绍这些技术及其在实际项目中的应用。 1. HDFS(Hadoop Distributed File ...

    spark-2.4.0-hive-hbase-Api.7z

    3. 从HBase中读取数据,转换成Spark DataFrame,方便进一步的数据处理和分析。 4. 利用Spark的MLlib库进行机器学习模型训练,预测和分类等任务。 在实际项目中,这些组件的集成可以帮助企业构建大规模数据处理平台...

    SparkHadoopHbase案例

    在大数据处理领域,Spark、Hadoop和HBase是三个至关重要的组件。Spark以其高效的数据处理速度和丰富的计算模型,Hadoop作为分布式存储和计算的基础框架,而HBase则是一个高可扩展的列式数据库,特别适合大规模数据的...

    java基于spark streaming和kafka,hbase的日志统计分析系统.rar

    生产者将应用程序生成的日志数据发布到Kafka的特定主题,而消费者(如Spark Streaming)则订阅这些主题,实时获取并处理这些日志。 Spark Streaming是Apache Spark的一部分,提供了对实时数据流处理的支持。它能够...

    Atlas Spark SQL血缘分析,HBASE

    Apache Atlas的HBase Hook允许我们捕获Spark SQL对HBase表的操作,如读取、写入或更新,生成相应的元数据和血缘信息。 在具体实施过程中,首先需要配置Apache Atlas以支持Spark SQL和HBase。这包括在Atlas中定义...

Global site tag (gtag.js) - Google Analytics