package ceshi import java.io.IOException import java.util import java.util.{Random, UUID} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.protobuf.ProtobufUtil import org.apache.hadoop.hbase.util.{Base64, Bytes} import org.apache.spark.{SparkConf, SparkContext} /** * Created by hyt on 5/24/17. */ object HbaseTestInsert { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf() sparkConf.setMaster("local[1]").setAppName("HbaseTestInsert") val sc = new SparkContext(sparkConf) sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, "spark") conf.set("hbase.zookeeper.quorum", "172.16.0.226:2181,172.16.0.223:2181;172.16.0.224:2181") conf.set("hbase.zookeeper.property.clientPort","2181") val scan:Scan = new Scan() scan.addColumn(Bytes.toBytes("s"),Bytes.toBytes("v_ip")) val startRow = "1495607836079" val endRow = "1495607836920" scan.setStartRow(Bytes.toBytes(startRow)) scan.setStopRow(Bytes.toBytes(endRow)) conf.set(TableInputFormat.SCAN,convertScanToString(scan)) //读取数据并转化成rdd val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) val count = hBaseRDD.count() println(count) // hBaseRDD.foreach{case (_,result) =>{ // //获取行键 // val key = Bytes.toString(result.getRow) // //通过列族和列名获取列 // val v_ip = Bytes.toString(result.getValue("s".getBytes,"v_ip".getBytes)) // println("Row key:"+key+" ip:"+v_ip) // }} hBaseRDD.map(x=>x._2).map{ x=> (Bytes.toString(x.getValue("s".getBytes,"v_ip".getBytes)),1) }.reduceByKey(_+_).sortBy(x=>x._2,false).foreach(println) } def convertScanToString(scan: Scan) = { val proto = ProtobufUtil.toScan(scan) Base64.encodeBytes(proto.toByteArray) } def readDataFromHbase(): Unit ={ val scanner: ResultScanner = scanDataByStartAndStopRow("spark", "1495607836079", "1495607836920") val iterator = scanner.iterator() while(iterator.hasNext()) { val result: Result = iterator.next() println(new String(result.getValue(Bytes.toBytes("s"), Bytes.toBytes("v_ip")))) println(new String(result.getRow)) } } def scanDataByStartAndStopRow(tableName:String,startRow:String,endRow:String):ResultScanner = { val conf:Configuration = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "172.16.0.226:2181,172.16.0.223:2181;172.16.0.224:2181") conf.set("hbase.zookeeper.property.clientPort","2181") var rs : ResultScanner = null try { val table : HTable = new HTable(conf,tableName) val scan:Scan = new Scan() scan.addColumn(Bytes.toBytes("s"),Bytes.toBytes("v_ip")) scan.setStartRow(Bytes.toBytes(startRow)) scan.setStopRow(Bytes.toBytes(endRow)) rs = table.getScanner(scan) } catch { case e: IOException => e.printStackTrace() } rs } def putHbaseData(): Unit ={ val conf:Configuration = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "172.16.0.226:2181,172.16.0.223:2181;172.16.0.224:2181") conf.set("hbase.zookeeper.property.clientPort","2181") val table:HTable = new HTable(conf,"spark") table.setAutoFlush(false) val list = new util.ArrayList[Put]() for(i <- 0 to 100000){ val v_ip = getRandomIp() val v_time = System.currentTimeMillis().toString val rowKey = System.currentTimeMillis() + UUID.randomUUID().toString val put:Put = new Put(Bytes.toBytes(rowKey)) put.add(Bytes.toBytes("s"), Bytes.toBytes("v_time"), Bytes.toBytes(v_time)) put.add(Bytes.toBytes("s"), Bytes.toBytes("v_ip"), Bytes.toBytes(v_ip)) list.add(put) println(i) if(i%100 == 0){ table.put(list) table.flushCommits() list.clear() } } def getRandomIp():String = { val random_int = new Random() "192"+"."+"168"+"."+random_int.nextInt(255)+"."+random_int.nextInt(255) } } }
相关推荐
本示例将详细介绍如何使用 Spark 从 HBase 中读取数据,并通过 Spark SQL 将其存储到 MySQL 数据库中。 首先,让我们了解 Spark 与 HBase 的交互。Spark 提供了 `spark-hbase-connector` 库,允许我们方便地连接到 ...
总的来说,这个Java程序展示了如何使用Spark读取HBase数据并进行分布式计算。通过Spark的并行处理能力,可以高效地处理大规模的HBase数据,进行复杂的分析任务。要注意的是,实际应用中还需要考虑错误处理、资源管理...
本篇文章将详细探讨如何使用 Scala 和 Spark 的 Resilient Distributed Datasets (RDDs) 与 HBase 进行交互,包括读取、写入以及删除数据。 首先,我们需要理解 Spark RDD。RDD 是 Spark 的基本数据抽象,它是不可...
- 不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! <项目介绍> 1、该资源内项目代码都经过测试运行成功,...
此外,Spark Streaming可以从HBase中读取数据,或者将处理后的结果写入HBase,构建实时数据处理管道。 在实际项目中,Spark可以用来做数据预处理、特征工程和模型训练,而HBase则作为数据仓库,存储大量的历史数据...
4. 获取HBase表: 获取要操作的HBase表: ```scala val tableName = TableName.valueOf("your_table_name") val table = connection.getTable(tableName) ``` 5. 插入数据: 创建一个`Put`对象来插入一行数据...
用于将 Spark 与 Apache HBase 数据结合使用的集成实用程序。 支持 基于 HBase 读取的扫描 基于 HBase 写入的 batchPut 基于 HBase 读取的分析 HFile 基于 HBase 写入的批量加载 要求 这个库需要 Spark 1.2+
3. 使用`DataFrameReader`的`format("org.apache.spark.sql.execution.datasources.hbase")`方法读取HBase数据。 4. 使用`DataFrameWriter`的`format("org.apache.spark.sql.execution.datasources.hbase")`方法写入...
【标题】中的“基于Hadoop,Spark,HBase,Kafka新闻统计java大数据demo”揭示了这个项目是关于使用Java编程语言实现的大数据处理示例,它整合了四个关键的大数据技术:Hadoop、Spark、HBase和Kafka。这些技术都是在大...
Spark Streaming支持多种数据源,如Kafka、Flume、Twitter等,并且可以将处理结果写入HDFS、HBase等存储系统。 **HBase概述** HBase是构建在Hadoop文件系统(HDFS)之上,面向列的NoSQL数据库,适合处理大规模数据...
SHC 的主要目的是提供一种高效、可靠的方式来访问 HBase,实现数据的快速读取和写入。 Spark 和 HBase 的集成是为了解决大数据处理中的一些挑战,例如数据的处理速度、数据的存储和查询效率等。SHC 通过使用 Spark...
在这里,我们提供了Scala中的一个新示例,该示例涉及通过Spark将hbase中保存的数据传输到String ,以及python转换器的新示例。 scala 的示例将保存在hbase中的数据传输到RDD[String] ,该数据包含columnFamily,...
Spark Streaming 提供了 `KafkaUtils.createDirectStream` 函数,它能直接从 Kafka 的分区读取数据,无需额外的 Receiver 子进程。 然后,数据需要经过处理,例如解析、过滤或转换,以便于存储到 HBase。HBase 2.1 ...
在IT行业中,Java、Hive、HBase以及Spark是大数据处理和分析领域的重要工具。本压缩包"javaApi_sparkhiveAPI_hbaseAPI.zip"包含了2019年8月至10月期间针对这些技术的Java版API实现,以及与Spark相关的Hive和HBase ...
在大数据领域,HDFS、Flume、Kafka、Spark、HBase和Hive是关键的组件,它们共同构建了一个高效、可靠的数据处理和分析体系。下面将分别介绍这些技术及其在实际项目中的应用。 1. HDFS(Hadoop Distributed File ...
3. 从HBase中读取数据,转换成Spark DataFrame,方便进一步的数据处理和分析。 4. 利用Spark的MLlib库进行机器学习模型训练,预测和分类等任务。 在实际项目中,这些组件的集成可以帮助企业构建大规模数据处理平台...
在大数据处理领域,Spark、Hadoop和HBase是三个至关重要的组件。Spark以其高效的数据处理速度和丰富的计算模型,Hadoop作为分布式存储和计算的基础框架,而HBase则是一个高可扩展的列式数据库,特别适合大规模数据的...
生产者将应用程序生成的日志数据发布到Kafka的特定主题,而消费者(如Spark Streaming)则订阅这些主题,实时获取并处理这些日志。 Spark Streaming是Apache Spark的一部分,提供了对实时数据流处理的支持。它能够...
Apache Atlas的HBase Hook允许我们捕获Spark SQL对HBase表的操作,如读取、写入或更新,生成相应的元数据和血缘信息。 在具体实施过程中,首先需要配置Apache Atlas以支持Spark SQL和HBase。这包括在Atlas中定义...