最近工作需要使用到Spark操作Hbase,上篇文章已经写了如何使用Spark读写Hbase全量表的数据做处理,但这次有所不同,这次的需求是Scan特定的Hbase的数据然后转换成RDD做后续处理,简单的使用Google查询了一下,发现实现方式还是比较简单的,用的还是Hbase的TableInputFormat相关的API。
基础软件版本如下:
Hadoop2.7.2
Hbase1.2.0
Spark2.1.0
Scala2.11.8
直接上代码如下:
` val startRowkey="row1"
val endRowkey="row1"
//开始rowkey和结束一样代表精确查询某条数据
//组装scan语句
val scan=new Scan(Bytes.toBytes(startRowkey),Bytes.toBytes(endRowkey))
scan.setCacheBlocks(false)
scan.addFamily(Bytes.toBytes("ks"));
scan.addColumn(Bytes.toBytes("ks"), Bytes.toBytes("data"))
//将scan类转化成string类型
val scan_str= TableMapReduceUtil.convertScanToString(scan)
conf.set(TableInputFormat.SCAN,scan_str)
//使用new hadoop api,读取数据,并转成rdd
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
//打印扫描的数据总量
println("count:"+rdd.count)
上面的少量代码,已经完整实现了使用spark查询hbase特定的数据,然后统计出数量最后输出,当然上面只是一个简单的例子,重要的是能把hbase数据转换成RDD,只要转成RDD我们后面就能进行非常多的过滤操作。
注意上面的hbase版本比较新,如果是比较旧的hbase,如果自定义下面的方法将scan对象给转成字符串,代码如下:
import java.io.{DataOutputStream, ByteArrayOutputStream}
import java.lang.String
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64
def convertScanToString(scan: Scan): String = {
val out: ByteArrayOutputStream = new ByteArrayOutputStream
val dos: DataOutputStream = new DataOutputStream(out)
scan.write(dos)
Base64.encodeBytes(out.toByteArray)
}
最后,还有一点,上面的代码是直接自己new了一个scan对象进行组装,当然我们还可以不自己new对象,全部使用TableInputFormat下面的相关的常量,并赋值,最后执行的时候TableInputFormat会自动帮我们组装scan对象这一点通过看TableInputFormat的源码就能明白:
private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
public static final String SCAN = "hbase.mapreduce.scan";
public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
上面代码中的常量,都可以conf.set的时候进行赋值,最后任务运行的时候会自动转换成scan,有兴趣的朋友可以自己尝试。
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
值得注意的是,Spark Streaming与Kafka的集成非常紧密,可以使用Direct Stream模式直接从Kafka主题读取数据,避免了额外的消息队列。此外,Spark与HBase的交互也十分便捷,通过HBase connector可以直接将数据写入或...
通过以上步骤,Spark 能够根据用户配置高效地读取 HBase 数据并将其转换为 RDD,从而支持进一步的分布式计算。这种读取方式结合了 Spark 的并行处理能力和 HBase 的分布式存储特性,为大数据分析提供了强大支持。在...
- **第4章:HBase表设计**:讲解如何有效地设计HBase表结构以满足特定的应用需求,包括如何选择合适的列族、如何优化数据模型以提高查询性能等。 - **第5章:通过Coprocessors扩展HBase**:Coprocessors是HBase中...
3. 数据分析:利用Spark SQL进行结构化数据查询,或使用MLlib进行机器学习,以及GraphX处理图相关问题。 4. 数据结果的输出:处理分析后的结果可以导出到各种存储系统或直接提供服务。 实际应用中,Spark可应用于多...
在这个项目中,HBase 被用来存储用户轨迹数据,提供快速的随机读取和写入能力。由于 HBase 支持大规模数据存储,因此它是处理海量用户轨迹数据的理想选择。 4. **Spark**: Apache Spark 是一个用于大规模数据处理的...
此外,HBase也支持与其他Hadoop组件(如Hive、Pig、Spark)的集成,实现数据分析和处理。 8. **HBase的多版本特性**:每个HBase的Cell都保存了多个版本的数据,通过时间戳来区分。这使得数据的历史状态可以被追溯,...
Spark可以读取各种数据源,如HDFS、Cassandra、HBase等。`SparkSession.read`接口用于加载数据,支持多种格式如CSV、JSON、Parquet、ORC等。数据加载后,可以使用`cache`或`persist`进行缓存,提高重用效率。 5. *...
行键的设计对查询性能至关重要,因为HBase是通过行键进行数据定位的。 8. **Cell**:每个Cell包含一个特定版本的数据,由行键、列族、列和时间戳唯一标识。 深入学习HBase原理的资料整理通常会涵盖以下主题: - ...
- **Phoenix**:在HBase之上构建的关系型SQL查询引擎,简化SQL查询操作。 在使用HBase-2.2.5时,用户可以根据自己的需求进行配置,如调整Region大小、设置缓存策略等,以达到最佳性能。同时,通过与其他大数据组件...
Apache Spark通过Catalyst,一套函数式关系查询优化框架,定义了各种数据类型的转换,让各个数据源能够与Catalyst框架实现数据类型的转换。 在数据处理效率方面,Apache Spark提供了一个灵活的API来兼容不同类型的...
在实时日志分析中,HBase通常用于存储经过Spark Streaming处理后的结果数据,以便后续查询和分析。它的列族模型和时间戳机制使得高效地存储和检索海量实时数据变得可能。 系统的实现流程大致如下: 1. **数据采集*...
- **Get操作**: 读取特定行、列族、列及时间戳的数据。 - **Scan操作**: 扫描表中的一系列行,返回匹配条件的数据。 - **Delete操作**: 删除表中的数据,可以按行键、列族、列和时间戳删除。 ### 5. HBase的应用...
读取数据时,HBase首先在内存中查找数据,如果没有找到,再从磁盘读取。 5. 高级特性:HBase提供一些高级特性,比如数据压缩、过滤器、快照、数据分区和负载均衡等。数据压缩可以提高存储效率;过滤器可以减少数据...
### GeoMesa与Apache Spark集成 #### GeoMesa Spark...无论是数据读取、写入还是复杂的空间查询和计算,GeoMesa Spark都提供了全面的支持。这对于地理信息系统(GIS)、遥感分析等领域来说是一个非常有价值的工具。
3. **强一致性**:HBase提供了严格的读一致性保证,确保在写入数据后,立即读取到的总是最新值。这对于那些需要保持数据同步性的应用来说至关重要。 4. **稀疏性**:HBase允许存储稀疏数据,即只有非空值的位置才...
Spark 2.6.3是Apache Spark的一个特定版本,它是一个强大的开源大数据处理框架,专为大规模数据处理而设计。在Linux环境下,Spark能够高效地运行,并且常常与其他开源组件如Openfire集成,以实现更丰富的功能。...