`
superlxw1234
  • 浏览: 551269 次
  • 性别: Icon_minigender_1
  • 来自: 西安
博客专栏
Bd1c0a0c-379a-31a8-a3b1-e6401e2f1523
Hive入门
浏览量:44430
社区版块
存档分类
最新评论

SparkSQL读取HBase数据

阅读更多

关键字: Spark读取HBase、SparkSQL读取HBase、SparkSQL整合Hive读取HBase表、Spark任务本地化调度机制

这里的SparkSQL是指整合了Hive的spark-sql cli(关于SparkSQL和Hive的整合,见文章后面的参考阅读).
本质上就是通过Hive访问HBase表,具体就是通过hive-hbase-handler(关于Hive和HBase的整合,见文章后面的参考阅读).

 

环境篇

hadoop-2.3.0-cdh5.0.0
apache-hive-0.13.1-bin
spark-1.4.0-bin-hadoop2.3
hbase-0.96.1.1-cdh5.0.0

部署情况如下图:

Spark部署

测试集群,将Spark Worker部署在每台DataNode上,是为了最大程度的任务本地化,Spark集群为Standalone模式部署。
其中有三台机器上也部署了RegionServer。
这个部署情况对理解后面提到的任务本地化调度有帮助。

 

配置篇

 

1. 拷贝以下HBase的相关jar包到Spark Master和每个Spark Worker节点上的$SPARK_HOME/lib目录下.
(我尝试用–jars的方式添加之后,不work,所以采用这种土办法)

    $HBASE_HOME/lib/hbase-client-0.96.1.1-cdh5.0.0.jar
    $HBASE_HOME/lib/hbase-common-0.96.1.1-cdh5.0.0.jar
    $HBASE_HOME/lib/hbase-protocol-0.96.1.1-cdh5.0.0.jar
    $HBASE_HOME/lib/hbase-server-0.96.1.1-cdh5.0.0.jar
    $HBASE_HOME/lib/htrace-core-2.01.jar
    $HBASE_HOME/lib/protobuf-java-2.5.0.jar
    $HBASE_HOME/lib/guava-12.0.1.jar
     
    $HIVE_HOME/lib/hive-hbase-handler-0.13.1.jar

 2.配置每个节点上的$SPARK_HOME/conf/spark-env.sh,将上面的jar包添加到SPARK_CLASSPATH

    export SPARK_CLASSPATH=$SPARK_HOME/lib/hbase-client-0.96.1.1-cdh5.0.0.jar:
    $SPARK_HOME/lib/hbase-common-0.96.1.1-cdh5.0.0.jar:
    $SPARK_HOME/lib/hbase-protocol-0.96.1.1-cdh5.0.0.jar:
    $SPARK_HOME/lib/hbase-server-0.96.1.1-cdh5.0.0.jar:
    $SPARK_HOME/lib/htrace-core-2.01.jar:
    $SPARK_HOME/lib/protobuf-java-2.5.0.jar:
    $SPARK_HOME/lib/guava-12.0.1.jar:
    $SPARK_HOME/lib/hive-hbase-handler-0.13.1.jar:
    ${SPARK_CLASSPATH}

 

3.将hbase-site.xml拷贝至${HADOOP_CONF_DIR},由于spark-env.sh中配置了Hadoop配置文件目录${HADOOP_CONF_DIR},因此会将hbase-site.xml加载。
hbase-site.xml中主要是以下几个参数的配置:

<property>
<name>hbase.zookeeper.quorum</name>
<value>zkNode1:2181,zkNode2:2181,zkNode3:2181</value>
<description>HBase使用的zookeeper节点</description>
</property>
<property>
<name>hbase.client.scanner.caching</name>
<value>5000</value>
<description>HBase客户端扫描缓存,对查询性能有很大帮助</description>
</property>

 

另外还有一个参数:zookeeper.znode.parent=/hbase
是HBase在zk中的根目录,默认为/hbase,视实际情况进行配置。

4.重启Spark集群。

 

使用篇

hbase中有表lxw1234,数据如下:

    hbase(main):025:0* scan 'lxw1234'
    ROW COLUMN+CELL
    lxw1234.com column=f1:c1, timestamp=1435624625198, value=name1
    lxw1234.com column=f1:c2, timestamp=1435624591717, value=name2
    lxw1234.com column=f2:c1, timestamp=1435624608759, value=age1
    lxw1234.com column=f2:c2, timestamp=1435624635261, value=age2
    lxw1234.com column=f3:c1, timestamp=1435624662282, value=job1
    lxw1234.com column=f3:c2, timestamp=1435624697028, value=job2
    lxw1234.com column=f3:c3, timestamp=1435624697065, value=job3
    1 row(s) in 0.0350 seconds

 

进入spark-sql,使用如下语句建表:

    CREATE EXTERNAL TABLE lxw1234 (
    rowkey string,
    f1 map<STRING,STRING>,
    f2 map<STRING,STRING>,
    f3 map<STRING,STRING>
    ) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,f1:,f2:,f3:")
    TBLPROPERTIES ("hbase.table.name" = "lxw1234");

 

建好之后,就可以查询了:

    spark-sql> select * from lxw1234;
    lxw1234.com {"c1":"name1","c2":"name2"} {"c1":"age1","c2":"age2"} {"c1":"job1","c2":"job2","c3":"job3"}
    Time taken: 4.726 seconds, Fetched 1 row(s)
    spark-sql> select count(1) from lxw1234;
    1
    Time taken: 2.46 seconds, Fetched 1 row(s)
    spark-sql> 

 

大表查询,消耗的时间和通过Hive用MapReduce查询差不多。

 

    spark-sql> select count(1) from lxw1234_hbase;
    53609638
    Time taken: 335.474 seconds, Fetched 1 row(s)

 

在spark-sql中通过insert插入数据到HBase表时候报错:

 

    INSERT INTO TABLE lxw1234
    SELECT 'row1' AS rowkey,
    map('c3','name3') AS f1,
    map('c3','age3') AS f2,
    map('c4','job3') AS f3
    FROM lxw1234_a
    limit 1;
     
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times,
    most recent failure: Lost task 0.3 in stage 10.0 (TID 23, slave013.uniclick.cloud):
    java.lang.ClassCastException: org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat cannot be cast to org.apache.hadoop.hive.ql.io.HiveOutputFormat
    at org.apache.spark.sql.hive.SparkHiveWriterContainer.outputFormat$lzycompute(hiveWriterContainers.scala:74)
    at org.apache.spark.sql.hive.SparkHiveWriterContainer.outputFormat(hiveWriterContainers.scala:73)
    at org.apache.spark.sql.hive.SparkHiveWriterContainer.getOutputName(hiveWriterContainers.scala:93)
    at org.apache.spark.sql.hive.SparkHiveWriterContainer.initWriters(hiveWriterContainers.scala:117)
    at org.apache.spark.sql.hive.SparkHiveWriterContainer.executorSideSetup(hiveWriterContainers.scala:86)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:99)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
     
    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

 

这个还有待分析。

 

关于Spark任务本地化运行

先看这张图,该图为运行select * from lxw1234_hbase;这张大表查询时候的任务运行图。

Spark任务运行

Spark和Hadoop MapReduce一样,在任务调度时候都会考虑数据本地化,即”任务向数据靠拢”,尽量将任务分配到数据所在的节点上运行。
基于这点,lxw1234_hbase为HBase中的外部表,Spark在解析时候,通过 org.apache.hadoop.hive.hbase.HBaseStorageHandler获取到表lxw1234_hbase在HBase中 的region所在的RegionServer,即:slave004、slave005、slave006 (上面的部署图中提到了,总共只有三台RegionServer,就是这三台),所以,在调度任务时候,首先考虑要往这三台节点上分配任务。
表lxw1234_hbase共有10个region,因此需要10个map task来运行。

再看一张图,这是spark-sql cli指定的Executor配置:

Spark WebUI

 

每台机器上Worker的实例为2个,每个Worker实例中运行的Executor为1个,因此,每台机器上运行两个Executor.
那么salve004、slave005、slave006上各运行2个Executor,总共6个,很好,Spark已经第一时间将这6个Task交给这6个Executor去执行了(NODE_LOCAL Tasks)。
剩下4个Task,没办法,想NODE_LOCAL运行,但那三台机器上没有剩余的Executor了,只能分配给其他Worker上的Executor,这4个Task为ANY Tasks。
正如那张任务运行图中所示。

 

写在后面

通过Hive和spark-sql去访问HBase表,只是为统计分析提供了一定的便捷性,个人觉得性能上的优势并不明显。
可能Spark通过API去读取HBase数据,性能更好些吧,以后再试。
另外,spark-sql有一点好处,就是可以先把HBase中的数据cache到一张内存表中,然后在这张内存表中,
通过SQL去统计分析,那就爽多了。

 

相关阅读

SparkSQL与Hive的整合:
http://lxw1234.com/archives/2015/06/294.htm
Hive与HBase的整合:
http://lxw1234.com/archives/2015/06/319.htm

 

3
2
分享到:
评论
1 楼 passionke 2016-03-16  
在spark-sql中通过insert插入数据到HBase表时候报错:


请问这个问题解决了吗? 是怎么解决的?

相关推荐

    spark读取hbase数据,并使用spark sql保存到mysql

    使用spark读取hbase中的数据,并插入到mysql中

    SparkSQL编程指南中文版

    SparkSQL支持Hadoop的多种数据源,包括HDFS、Cassandra、HBase等,能高效处理PB级别的数据。其并行计算能力使得处理大数据变得快速而有效。此外,SparkSQL还与Spark Streaming、MLlib等组件紧密集成,实现流处理和...

    sparksql-for-hbase:了解如何使用Spark SQL和HSpark连接器软件包创建驻留在HBase区域服务器中的查询数据表

    这通常涉及创建一个`HBaseTableCatalog`实例,定义表名、列族和列限定符,然后使用`SparkSession`的`read.format()`方法读取HBase表。 3. **执行SQL查询**:一旦DataFrame被创建,就可以使用Spark SQL的API或直接...

    SparkSQL开发与优化实践.zip

    SparkSQL支持多种数据源,包括Parquet、JSON、CSV、Avro等,可以灵活地读取和写入各种格式的数据。理解这些数据源的特点和使用场景是优化数据处理流程的关键。 6. **性能优化** 在实际应用中,性能优化是必不可少...

    基于spark streaming和kafka,hbase的日志统计分析系统.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    基于spark+flume+kafka+hbase的实时日志处理分析系统.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    基于hbase + spark 实现常用推荐算法(主要用于精准广告投放和推荐系统).zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    【SparkCore篇07】RDD数据读取和保存1

    本文将详细讨论RDD数据的读取和保存,特别是涉及JSON和SequenceFile这两种文件格式。 首先,我们来看数据的读取。Spark支持多种文件格式,包括Text文件、Json文件、Csv文件、Sequence文件以及Object文件。对于文件...

    SparkSQl实践与优化

    例如,可以使用SQL语句对Hive中的数据进行查询,也可以通过DataFrame API读取存储在HBase中的数据。 对于多数据类型的处理,Spark SQL 提供了丰富的数据类型支持,例如数值类型、字符串类型、二进制类型、布尔类型...

    python大数据处理与分析数据集与源代码.zip

    8. **数据导入与导出**: 数据通常存储在各种格式如CSV、JSON、数据库等,Python的csv、json、pandas等库可以帮助我们方便地读取和写入数据。 9. **大数据存储**: 如MongoDB、Cassandra等NoSQL数据库,以及HBase这样...

    基于spark及用户行为标签的日志大数据分析系统.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    基于Spark的实时推荐系统,使用MovieLens作为测试数据集.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    基于Spark MLlib 的 ALS 算法实现的电影推荐系统,采用MovieLens数据集进行分析建模.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    sparksqlCmd_Spark!_spark_

    SparkSQL集成了Hive,因此它可以读取和写入Hive表,同时支持多种数据源,如HDFS、Cassandra、HBase等。在SparkSQL中,SQL查询被转换为Spark的DAG(有向无环图)任务,然后由Spark执行引擎高效地并行执行。 标题中的...

    离线数据处理练习表数据

    DataFrame是一种分布式的、可以进行优化的表格数据结构,支持多种数据源,包括HDFS、Cassandra、HBase等。 gy_pub.sql和ds_pub.sql可能是两个SQL脚本文件,分别包含了对特定数据集的查询或操作。这些文件可能包含...

    sparkSQL基本操作.zip

    4. **数据源集成**:Spark SQL支持多种数据源,如HDFS、Cassandra、HBase、Hive等。你可以使用`DataFrameReader`和`DataFrameWriter`接口来读取和写入这些数据源。例如,`spark.read.format("csv").option("header",...

    LearningSparkSQL_Code.zip

    1. CSV数据读取与写入:演示如何从CSV文件读取数据,并将结果写回到新的CSV文件。 2. SQL查询:展示如何使用SQL查询DataFrame,并返回结果。 3. 转换操作:包括选择特定列、过滤数据、分组聚合等操作。 4. Join操作...

    基于Spark+PageRank算法构建仿微博用户好友的分布式推荐系统.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    基于spark+drools+kafka+redis的大数据实时风控系统.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    毕业设计--基于Django的电影推荐系统和论坛,采用协同过滤及als算法.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

Global site tag (gtag.js) - Google Analytics