`

spark (4)spark-shell 读写hdfs 读写redis 读写hbase

阅读更多

 

(1)初学者对于spark的几个疑问 http://aperise.iteye.com/blog/2302481
(2)spark开发环境搭建 http://aperise.iteye.com/blog/2302535
(3)Spark Standalone集群安装介绍 http://aperise.iteye.com/blog/2305905
(4)spark-shell 读写hdfs 读写redis 读写hbase http://aperise.iteye.com/blog/2324253

spark-shell 读写hdfs 读写hbase 读写redis

1.进入spark-shell环境

spark使用的是standalone方式,spark通过zookeeper做了HA(Highe Available),spark master在机器hadoop31和hadoop33上面,登录时候指定每个worker在跑spark-shell任务时候使用内存为4GB
cd /home/hadoop/spark-1.6.0-bin-hadoop2.6/
bin/spark-shell --master spark://hadoop31:7077,hadoop33:7077 --executor-memory 4G

 

2.spark-shell读写hdfs

    2.1 读取HDFS上文件

    spark-shell读取位于hadoop ha集群下的目录/data/2*/2016*/*,目录采用模糊匹配方式

 

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")

   

 

    2.2 RDD处理结果写HDFS

    spark-shell存储数据到hadoop的HDFS上,下面这种方式在/hdfsfile下会存在多个结果文件,形如:part-00001.snappy、part-00002.snappy......

 

 

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
.map(line=>(line.split(",")(0),line.split(",")(1),line.split(",")(2),line.split(",")(3)))
.saveAsTextFile("hdfs://hadoop-ha-cluster/hdfsfile")

     

    2.3 RDD处理结果只写一个文件到HDFS

    有时为了汇聚结果到一个文件,可以在存储文件之前增加repartition操作,这样在/hdfsfile下面只会产生一个结果文件part-00000.snappy文件

 

 

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
.map(line=>(line.split(",")(0),line.split(",")(1),line.split(",")(2),line.split(",")(3)))
.repartition(1)
.saveAsTextFile("hdfs://hadoop-ha-cluster/hdfsfile")

 

 

3.spark-shell读写hbase

    3.1 spark加载hbase jar

    修改spark配置文件,使得spark知道哪里加载hbase相关jar包,修改配置文件spark-env.sh,添加如下内容:

#加载hbase相关jar
export SPARK_CLASSPATH=/home/hadoop/hbase-1.2.1/lib/*

#加载jedis相关jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/jedis-2.9.0.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/commons-pool2-2.4.2.jar

 

    3.2 hbase中新增数据表

    首先在hbase中创建一个数据表

cd /home/hadoop/hbase-1.2.1/bin
./hbase shell
disable 'hbase_test'
drop 'hbase_test'
create 'hbase_test', {NAME => 'comumnfamily', TTL=>'604800', COMPRESSION => 'SNAPPY'}, SPLITS => ['2','4','6','8']
quit

 

    3.3 spark-shell往hbase里面写入spark处理的结果

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
.map(line=>(line.split(",")(0),(line.split(",")(1),line.split(",")(2).toLong,line.split(",")(3).toLong)))
.reduceByKey((tuple1,tuple2)=>(tuple1._1+tuple2._1,tuple1._2+tuple2._2))
.foreachPartition{
    iterators=>{
    var tmpConf=HBaseConfiguration.create()
    tmpConf.set("hbase.zookeeper.quorum","hadoop31,hadoop32,hadoop33,hadoop34,hadoop35")
    tmpConf.set("hbase.zookeeper.property.clientPort","2181")
    var table=new HTable(tmpConf,"hbase_test")
    table.setWriteBufferSize(5*1024*1024)
    var putList=new java.util.ArrayList[Put]
    iterators.foreach{tupple=>{
        var fixednumber=("0"*(25-tupple._1.length)+tupple._1).reverse
        var rowkey=fixednumber.getBytes()
        var p=new Put(rowkey)
        p.add("comumnfamily".getBytes,"column1".getBytes,tupple._2._1.toString.getBytes)
        p.add("comumnfamily".getBytes,"column2".getBytes,tupple._2._2.toString.getBytes)
        
        putList.add(p)
        if(putList.size()>0&&putList.size()%1000==0){
            table.put(putList)
            putList.clear()
        }}}
    table.put(putList)
    }
}

     上面小的技巧首先是使用了foreachPartition,使用该操作后,对于每一个parttion,hbase的数据库链接只需建立一个,该parttion内无需频繁创建hbase链接,不用担心序列化相关问题

    第二是hbase使用批量提交,每次提交1000条记录,提高写入速度

 

    3.4 spark-shell读取hbase中的数据,写成hdfs文件

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.HConstants

val tmpConf = HBaseConfiguration.create()
tmpConf.set("hbase.zookeeper.quorum", "hadoop31,hadoop32,hadoop33,hadoop34,hadoop35")
tmpConf.set("hbase.zookeeper.property.clientPort", "2181")
tmpConf.set(TableInputFormat.INPUT_TABLE, "hbase_test")
tmpConf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "120000");   
val hBaseRDD = sc.newAPIHadoopRDD(tmpConf, classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
val lineRdd=hBaseRDD.map(r=>
    (if(r._2.getFamilyMap("comumnfamily".getBytes).keySet.contains("column1".getBytes)){new String(r._2.getValue("data".getBytes,"log_date".getBytes))}else{"0"})+","+
    (if(r._2.getFamilyMap("comumnfamily".getBytes).keySet.contains("column2".getBytes)){new String(r._2.getValue("data".getBytes,"area_code".getBytes))}else{"0"})
)
lineRdd.repartition(1).saveAsTextFile("hdfs://hadoop-ha-cluster/hbase2hdfs")

 

4.spark-shell读写redis

    4.1 spark加载redis jar

    配置spark,使得spark知道哪里加载jedis的jar,修改配置文件spark-env.sh,添加如下内容

#加载hbase相关jar
export SPARK_CLASSPATH=/home/hadoop/hbase-1.2.1/lib/*

#加载jedis相关jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/jedis-2.9.0.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/commons-pool2-2.4.2.jar

 

    4.2 spark-shell通过jedis API写数据到redis-cluster

import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster

sc.textFile("hdfs://hadoop-ha-cluster/auto_data.txt")
      .repartition(10)
      .map(line => (line.split(",")(0), line.split(",")(1), line.split(",")(2)))
      .foreachPartition { iterators => {
        val jedisClusterNodes = new java.util.HashSet[HostAndPort]
        val serverList = new java.util.ArrayList[HostAndPort]
        serverList.add(new HostAndPort("192.168.173.21", 6379))
        serverList.add(new HostAndPort("192.168.173.22", 6380))
        serverList.add(new HostAndPort("192.168.173.23", 6381))
        serverList.add(new HostAndPort("192.168.173.24", 6379))
        serverList.add(new HostAndPort("192.168.173.25", 6380))
        serverList.add(new HostAndPort("192.168.173.26", 6381))
        jedisClusterNodes.addAll(serverList)
        val jc = new JedisCluster(jedisClusterNodes)
        iterators.foreach { t => {
          var key = "auto," + t._1 + "," + t._2
          var value = t._3
          jc.set(key, value)
        }
        }
      }
      }

     此种方式中JedisCluster不需要序列化,因为使用JedisCluster的地方不在RDD里面,RDD已经通过collect汇聚结果到了当前节点

 

    4.3 spark-shell在RDD操作过程中通过jedis API使用redis-cluster中数据

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put

object JedisClusterObject extends Serializable {
    var jedisClusterNodes=new java.util.HashSet[HostAndPort]
    var serverList=new java.util.ArrayList[HostAndPort]
    serverList.add(new HostAndPort("192.168.173.21",6379))
    serverList.add(new HostAndPort("192.168.173.22",6380))
    serverList.add(new HostAndPort("192.168.173.23",6381))
    serverList.add(new HostAndPort("192.168.173.24",6379))
    serverList.add(new HostAndPort("192.168.173.25",6380))
    serverList.add(new HostAndPort("192.168.173.26",6381))
    jedisClusterNodes.addAll(serverList)
    val jc:JedisCluster = new JedisCluster(jedisClusterNodes)
}

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
.map(line=>line.split(",")(0))
.map(line=>(
  line,
  if(JedisClusterObject.jc.exists("auto,"+line)){JedisClusterObject.jc.get("auto,"+line)}else{line},
  if(JedisClusterObject.jc.exists("auto,"+line)){JedisClusterObject.jc.get("auto,"+line)}else{line}
))
.saveAsTextFile("hdfs://hadoop-ha-cluster/jedistest")

     此种方式JedisCluster必须通过单例并序列化,因为JedisCluster实在RDD中使用,会被序列化后在各个节点计算中使用,否则会提示Task not Serialized :JedisCluster

 

5 本地开发工具链接spark

    5.1 本地IDEA工具中如何链接HADOOP HA环境

      hadoop安装的是采用HA的方式,现在本地开发环境开发spark时候,无法解析hadoop-ha方式下的cluster名称,原因是本地程序不知道加载的cluster ha对应的namenode名称和IP,解决办法是通过sparkconf追加参数,让spark 本地local模式知道hadoop ha配置,如下:

  val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName("HtSecApp UserEvent Processor")
    .getOrCreate()

  val sc = spark.sparkContext
  val hadoopConf = sc.hadoopConfiguration

  hadoopConf.set("dfs.nameservices", "mycluster")
  hadoopConf.set("dfs.client.failover.proxy.provider.mycluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
  hadoopConf.set("dfs.ha.namenodes.mycluster", "nn1,nn2")
  hadoopConf.set("dfs.namenode.rpc-address.mycluster.nn1", "192.168.77.38:9000")
  hadoopConf.set("dfs.namenode.rpc-address.mycluster.nn2", "192.168.77.39:9000")

    能够避免以下错误:

 

    5.2 本地搭建spark开发环境

    参见博客spark (2)spark开发环境搭建

  • 大小: 18 KB
分享到:
评论

相关推荐

    spark-submit cluster模式时driver-class-path支持hdfs路径

    spark官方版本的driver-class-path不支持hdfs路径,只支持本地路径。本资源解决了这个问题,driver-class-path在cluster模式时可以支持hdfs路径,解决了cluster模式driver有大量jar依赖的问题。

    hbase-meta-repair-hbase-2.0.2.jar

    HBase 元数据修复工具包。 ①修改 jar 包中的application....②将core-site.xml、hdfs-site.xml添加到BOOT-INF/classes 下; ③开始修复 `java -jar -Drepair.tableName=表名 hbase-meta-repair-hbase-2.0.2.jar`

    spark--bin-hadoop3-without-hive.tgz

    安装完成后,你可以通过`./bin/spark-shell`启动Spark的交互式Shell,或者使用`./bin/pyspark`启动Python版本的Shell,开始进行数据处理。 总的来说,"spark--bin-hadoop3-without-hive.tgz"提供了一个在CentOS 8和...

    hadoop2.6.3-spark1.5.2-hbase-1.1.2-hive-1.2.1-zookeeper-3.4.6安装指南

    - 配置每个组件的配置文件,如Hadoop的`core-site.xml`, `hdfs-site.xml`, `yarn-site.xml`, `mapred-site.xml`,Spark的`spark-defaults.conf`,HBase的`hbase-site.xml`,Hive的`hive-site.xml`,以及Zookeeper的...

    spark2.1.0-bin-hadoop2.7

    2. HDFS兼容:Spark可以直接读写HDFS上的数据,无需将数据复制到其他存储系统。 三、Linux环境下的安装步骤 1. 下载:首先,你需要下载Spark 2.1.0与Hadoop 2.7兼容的二进制包,即`spark-2.1.0-bin-hadoop2.7.tgz`...

    spark-2.4.0-hive-hbase-Api.7z

    标题“spark-2.4.0-hive-hbase-Api.7z”表明这是一个与Apache Spark、Apache Hive和Apache HBase相关的压缩包文件,适用于版本2.4.0。这个压缩包很可能包含了这三个组件的API库,使得开发人员能够在集成环境中进行...

    Storm3--Hbase-HDFS-Hive-from-HortonWorks:Storm3-来自 HortonWorks 的 Hbase HDFS Hive

    Storm3--Hbase-HDFS-Hive-from-HortonWorks Storm3-来自 HortonWorks 的 Hbase HDFS Hive ====================== 参考来自 Horton Works 教程 ==================== Storm 和 Hadoop 生态系统版本 =============...

    spark-3.1.2.tgz & spark-3.1.2-bin-hadoop2.7.tgz.rar

    - 运行应用:使用Spark Shell或提交Spark应用程序到集群执行。 6. 开发与交互: - 使用Scala、Java、Python或R语言编写Spark应用。 - 使用SparkSubmit工具提交应用程序到集群。 - Spark Shell提供了一个交互式...

    hbase的hbase-1.2.0-cdh5.14.2.tar.gz资源包

    5. **客户端连接**:HBase提供了命令行接口(HBase Shell)和Java API,可以用来交互式操作HBase或在应用程序中集成。 ### HBase的数据模型和操作 1. **创建表**:使用`create`命令创建表,指定列族。 2. **插入...

    spark-3.1.3-bin-without-hadoop.tgz

    在没有包含Hadoop的版本中,Spark需要用户自行配置HDFS客户端或者其他分布式文件系统以进行数据读写。 安装Spark-3.1.3的过程主要包括以下几个步骤: 1. 解压压缩包:使用tar命令解压文件,例如`tar -xvf spark-...

    hbase-1.2.1-bin.tar.gz.zip

    HBase,全称为Hadoop Distributed File System上的基础结构(HBase on Hadoop Distributed File System),是一种分布式的、面向列的开源数据库,它构建在Apache Hadoop文件系统(HDFS)之上,提供高可靠性、高性能...

    spark-2.2.2-bin-hadoop2.7.tgz

    Spark 2.2.2支持多种数据源,包括HDFS(Hadoop分布式文件系统)、Cassandra、HBase等,这使得它能无缝集成到Hadoop生态中。此外,它内置了Spark SQL模块,用于执行SQL查询,同时支持DataFrame和Dataset操作,以及...

    hbase-2.4.17-bin 安装包

    1. 创建表:使用`hbase shell`进入命令行工具,执行`create '表名', '列族名'`创建表。 2. 插入数据:通过`put '表名', '行键', '列族:列限定符', '值'`命令插入数据。 3. 查询数据:`get '表名', '行键'`获取整行...

    13.hbase的工作机制补充--regionserver数据管理--内存缓存热数据--持久化到hdfs的观察.mp4

    13.hbase的工作机制补充--regionserver数据管理--内存缓存热数据--持久化到hdfs的观

    hbase-1.4.10-bin.tar.gz

    Spark提供了HBase connector,允许Spark作业直接读写HBase。在Spark应用中,你需要添加HBase和HBase-connector相关的依赖。在Spark的`spark-defaults.conf`文件中指定HBase的相关配置,如`spark.hadoop.hbase....

    spark-3.1.2-bin-hadoop3.2.tgz

    5. **交互式Shell**:Spark提供了一个名为`spark-shell`的交互式环境,方便开发人员测试和调试代码。 **Spark与Hadoop 3.2的兼容性** Hadoop 3.2引入了许多新特性,如: 1. **多命名空间**:支持多个HDFS命名空间...

    spark-3.2.4-bin-hadoop3.2-scala2.13 安装包

    5. **运行Spark**: Spark可以通过命令行工具启动,例如`spark-shell`(交互式Spark会话)或`pyspark`(Python版本的交互式会话)。对于应用程序开发,可以使用Scala、Java、Python或R编写代码,然后通过`spark-...

    hbase-2.4.11-bin.tar.gz

    4. **启动服务**:通过bin目录下的start-hbase.sh脚本启动HBase集群,包括Master节点和RegionServer节点。 5. **监控与管理**:HBase提供了Web UI,用户可以访问http://localhost:16010查看集群状态。此外,命令行...

    spark-2.1.1-bin-hadoop2.7.tgz.7z

    在实际应用中,Spark可以通过编程接口(API)与多种数据源交互,如HDFS、Cassandra、HBase、Amazon S3等。它的RDD(弹性分布式数据集)模型允许用户在内存中高效地处理数据,大大提高了数据处理速度。Spark还支持...

    hbase-1.3.1-bin.zip

    8. **HBase shell** - 提供命令行接口(CLI)工具,方便用户执行CRUD(创建、读取、更新、删除)操作和管理表。 9. **表设计最佳实践** - 表设计应考虑数据访问模式,如热点问题、范围扫描等。 - 合理设计行键,...

Global site tag (gtag.js) - Google Analytics