(1)初学者对于spark的几个疑问 | http://aperise.iteye.com/blog/2302481 |
(2)spark开发环境搭建 | http://aperise.iteye.com/blog/2302535 |
(3)Spark Standalone集群安装介绍 | http://aperise.iteye.com/blog/2305905 |
(4)spark-shell 读写hdfs 读写redis 读写hbase | http://aperise.iteye.com/blog/2324253 |
spark-shell 读写hdfs 读写hbase 读写redis
1.进入spark-shell环境
bin/spark-shell --master spark://hadoop31:7077,hadoop33:7077 --executor-memory 4G
2.spark-shell读写hdfs
2.1 读取HDFS上文件
spark-shell读取位于hadoop ha集群下的目录/data/2*/2016*/*,目录采用模糊匹配方式
sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
2.2 RDD处理结果写HDFS
spark-shell存储数据到hadoop的HDFS上,下面这种方式在/hdfsfile下会存在多个结果文件,形如:part-00001.snappy、part-00002.snappy......
sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*") .map(line=>(line.split(",")(0),line.split(",")(1),line.split(",")(2),line.split(",")(3))) .saveAsTextFile("hdfs://hadoop-ha-cluster/hdfsfile")
2.3 RDD处理结果只写一个文件到HDFS
有时为了汇聚结果到一个文件,可以在存储文件之前增加repartition操作,这样在/hdfsfile下面只会产生一个结果文件part-00000.snappy文件
sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*") .map(line=>(line.split(",")(0),line.split(",")(1),line.split(",")(2),line.split(",")(3))) .repartition(1) .saveAsTextFile("hdfs://hadoop-ha-cluster/hdfsfile")
3.spark-shell读写hbase
3.1 spark加载hbase jar
修改spark配置文件,使得spark知道哪里加载hbase相关jar包,修改配置文件spark-env.sh,添加如下内容:
export SPARK_CLASSPATH=/home/hadoop/hbase-1.2.1/lib/*
#加载jedis相关jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/jedis-2.9.0.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/commons-pool2-2.4.2.jar
3.2 hbase中新增数据表
首先在hbase中创建一个数据表
./hbase shell
disable 'hbase_test'
drop 'hbase_test'
create 'hbase_test', {NAME => 'comumnfamily', TTL=>'604800', COMPRESSION => 'SNAPPY'}, SPLITS => ['2','4','6','8']
quit
3.3 spark-shell往hbase里面写入spark处理的结果
import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.Put sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*") .map(line=>(line.split(",")(0),(line.split(",")(1),line.split(",")(2).toLong,line.split(",")(3).toLong))) .reduceByKey((tuple1,tuple2)=>(tuple1._1+tuple2._1,tuple1._2+tuple2._2)) .foreachPartition{ iterators=>{ var tmpConf=HBaseConfiguration.create() tmpConf.set("hbase.zookeeper.quorum","hadoop31,hadoop32,hadoop33,hadoop34,hadoop35") tmpConf.set("hbase.zookeeper.property.clientPort","2181") var table=new HTable(tmpConf,"hbase_test") table.setWriteBufferSize(5*1024*1024) var putList=new java.util.ArrayList[Put] iterators.foreach{tupple=>{ var fixednumber=("0"*(25-tupple._1.length)+tupple._1).reverse var rowkey=fixednumber.getBytes() var p=new Put(rowkey) p.add("comumnfamily".getBytes,"column1".getBytes,tupple._2._1.toString.getBytes) p.add("comumnfamily".getBytes,"column2".getBytes,tupple._2._2.toString.getBytes) putList.add(p) if(putList.size()>0&&putList.size()%1000==0){ table.put(putList) putList.clear() }}} table.put(putList) } }
上面小的技巧首先是使用了foreachPartition,使用该操作后,对于每一个parttion,hbase的数据库链接只需建立一个,该parttion内无需频繁创建hbase链接,不用担心序列化相关问题;
第二是hbase使用批量提交,每次提交1000条记录,提高写入速度。
3.4 spark-shell读取hbase中的数据,写成hdfs文件
import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.HConstants val tmpConf = HBaseConfiguration.create() tmpConf.set("hbase.zookeeper.quorum", "hadoop31,hadoop32,hadoop33,hadoop34,hadoop35") tmpConf.set("hbase.zookeeper.property.clientPort", "2181") tmpConf.set(TableInputFormat.INPUT_TABLE, "hbase_test") tmpConf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "120000"); val hBaseRDD = sc.newAPIHadoopRDD(tmpConf, classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result]) val lineRdd=hBaseRDD.map(r=> (if(r._2.getFamilyMap("comumnfamily".getBytes).keySet.contains("column1".getBytes)){new String(r._2.getValue("data".getBytes,"log_date".getBytes))}else{"0"})+","+ (if(r._2.getFamilyMap("comumnfamily".getBytes).keySet.contains("column2".getBytes)){new String(r._2.getValue("data".getBytes,"area_code".getBytes))}else{"0"}) ) lineRdd.repartition(1).saveAsTextFile("hdfs://hadoop-ha-cluster/hbase2hdfs")
4.spark-shell读写redis
4.1 spark加载redis jar
配置spark,使得spark知道哪里加载jedis的jar,修改配置文件spark-env.sh,添加如下内容
export SPARK_CLASSPATH=/home/hadoop/hbase-1.2.1/lib/*
#加载jedis相关jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/jedis-2.9.0.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/commons-pool2-2.4.2.jar
4.2 spark-shell通过jedis API写数据到redis-cluster
import redis.clients.jedis.HostAndPort import redis.clients.jedis.JedisCluster sc.textFile("hdfs://hadoop-ha-cluster/auto_data.txt") .repartition(10) .map(line => (line.split(",")(0), line.split(",")(1), line.split(",")(2))) .foreachPartition { iterators => { val jedisClusterNodes = new java.util.HashSet[HostAndPort] val serverList = new java.util.ArrayList[HostAndPort] serverList.add(new HostAndPort("192.168.173.21", 6379)) serverList.add(new HostAndPort("192.168.173.22", 6380)) serverList.add(new HostAndPort("192.168.173.23", 6381)) serverList.add(new HostAndPort("192.168.173.24", 6379)) serverList.add(new HostAndPort("192.168.173.25", 6380)) serverList.add(new HostAndPort("192.168.173.26", 6381)) jedisClusterNodes.addAll(serverList) val jc = new JedisCluster(jedisClusterNodes) iterators.foreach { t => { var key = "auto," + t._1 + "," + t._2 var value = t._3 jc.set(key, value) } } } }
此种方式中JedisCluster不需要序列化,因为使用JedisCluster的地方不在RDD里面,RDD已经通过collect汇聚结果到了当前节点
4.3 spark-shell在RDD操作过程中通过jedis API使用redis-cluster中数据
import java.text.SimpleDateFormat import java.util.{Calendar, Date} import redis.clients.jedis.HostAndPort import redis.clients.jedis.JedisCluster import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.Put object JedisClusterObject extends Serializable { var jedisClusterNodes=new java.util.HashSet[HostAndPort] var serverList=new java.util.ArrayList[HostAndPort] serverList.add(new HostAndPort("192.168.173.21",6379)) serverList.add(new HostAndPort("192.168.173.22",6380)) serverList.add(new HostAndPort("192.168.173.23",6381)) serverList.add(new HostAndPort("192.168.173.24",6379)) serverList.add(new HostAndPort("192.168.173.25",6380)) serverList.add(new HostAndPort("192.168.173.26",6381)) jedisClusterNodes.addAll(serverList) val jc:JedisCluster = new JedisCluster(jedisClusterNodes) } sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*") .map(line=>line.split(",")(0)) .map(line=>( line, if(JedisClusterObject.jc.exists("auto,"+line)){JedisClusterObject.jc.get("auto,"+line)}else{line}, if(JedisClusterObject.jc.exists("auto,"+line)){JedisClusterObject.jc.get("auto,"+line)}else{line} )) .saveAsTextFile("hdfs://hadoop-ha-cluster/jedistest")
此种方式JedisCluster必须通过单例并序列化,因为JedisCluster实在RDD中使用,会被序列化后在各个节点计算中使用,否则会提示Task not Serialized :JedisCluster
5 本地开发工具链接spark
5.1 本地IDEA工具中如何链接HADOOP HA环境
hadoop安装的是采用HA的方式,现在本地开发环境开发spark时候,无法解析hadoop-ha方式下的cluster名称,原因是本地程序不知道加载的cluster ha对应的namenode名称和IP,解决办法是通过sparkconf追加参数,让spark 本地local模式知道hadoop ha配置,如下:
val spark = SparkSession .builder() .master("local[2]") .appName("HtSecApp UserEvent Processor") .getOrCreate() val sc = spark.sparkContext val hadoopConf = sc.hadoopConfiguration hadoopConf.set("dfs.nameservices", "mycluster") hadoopConf.set("dfs.client.failover.proxy.provider.mycluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider") hadoopConf.set("dfs.ha.namenodes.mycluster", "nn1,nn2") hadoopConf.set("dfs.namenode.rpc-address.mycluster.nn1", "192.168.77.38:9000") hadoopConf.set("dfs.namenode.rpc-address.mycluster.nn2", "192.168.77.39:9000")
能够避免以下错误:
相关推荐
spark官方版本的driver-class-path不支持hdfs路径,只支持本地路径。本资源解决了这个问题,driver-class-path在cluster模式时可以支持hdfs路径,解决了cluster模式driver有大量jar依赖的问题。
HBase 元数据修复工具包。 ①修改 jar 包中的application....②将core-site.xml、hdfs-site.xml添加到BOOT-INF/classes 下; ③开始修复 `java -jar -Drepair.tableName=表名 hbase-meta-repair-hbase-2.0.2.jar`
安装完成后,你可以通过`./bin/spark-shell`启动Spark的交互式Shell,或者使用`./bin/pyspark`启动Python版本的Shell,开始进行数据处理。 总的来说,"spark--bin-hadoop3-without-hive.tgz"提供了一个在CentOS 8和...
- 配置每个组件的配置文件,如Hadoop的`core-site.xml`, `hdfs-site.xml`, `yarn-site.xml`, `mapred-site.xml`,Spark的`spark-defaults.conf`,HBase的`hbase-site.xml`,Hive的`hive-site.xml`,以及Zookeeper的...
2. HDFS兼容:Spark可以直接读写HDFS上的数据,无需将数据复制到其他存储系统。 三、Linux环境下的安装步骤 1. 下载:首先,你需要下载Spark 2.1.0与Hadoop 2.7兼容的二进制包,即`spark-2.1.0-bin-hadoop2.7.tgz`...
标题“spark-2.4.0-hive-hbase-Api.7z”表明这是一个与Apache Spark、Apache Hive和Apache HBase相关的压缩包文件,适用于版本2.4.0。这个压缩包很可能包含了这三个组件的API库,使得开发人员能够在集成环境中进行...
Storm3--Hbase-HDFS-Hive-from-HortonWorks Storm3-来自 HortonWorks 的 Hbase HDFS Hive ====================== 参考来自 Horton Works 教程 ==================== Storm 和 Hadoop 生态系统版本 =============...
- 运行应用:使用Spark Shell或提交Spark应用程序到集群执行。 6. 开发与交互: - 使用Scala、Java、Python或R语言编写Spark应用。 - 使用SparkSubmit工具提交应用程序到集群。 - Spark Shell提供了一个交互式...
5. **客户端连接**:HBase提供了命令行接口(HBase Shell)和Java API,可以用来交互式操作HBase或在应用程序中集成。 ### HBase的数据模型和操作 1. **创建表**:使用`create`命令创建表,指定列族。 2. **插入...
在没有包含Hadoop的版本中,Spark需要用户自行配置HDFS客户端或者其他分布式文件系统以进行数据读写。 安装Spark-3.1.3的过程主要包括以下几个步骤: 1. 解压压缩包:使用tar命令解压文件,例如`tar -xvf spark-...
HBase,全称为Hadoop Distributed File System上的基础结构(HBase on Hadoop Distributed File System),是一种分布式的、面向列的开源数据库,它构建在Apache Hadoop文件系统(HDFS)之上,提供高可靠性、高性能...
Spark 2.2.2支持多种数据源,包括HDFS(Hadoop分布式文件系统)、Cassandra、HBase等,这使得它能无缝集成到Hadoop生态中。此外,它内置了Spark SQL模块,用于执行SQL查询,同时支持DataFrame和Dataset操作,以及...
1. 创建表:使用`hbase shell`进入命令行工具,执行`create '表名', '列族名'`创建表。 2. 插入数据:通过`put '表名', '行键', '列族:列限定符', '值'`命令插入数据。 3. 查询数据:`get '表名', '行键'`获取整行...
13.hbase的工作机制补充--regionserver数据管理--内存缓存热数据--持久化到hdfs的观
Spark提供了HBase connector,允许Spark作业直接读写HBase。在Spark应用中,你需要添加HBase和HBase-connector相关的依赖。在Spark的`spark-defaults.conf`文件中指定HBase的相关配置,如`spark.hadoop.hbase....
5. **交互式Shell**:Spark提供了一个名为`spark-shell`的交互式环境,方便开发人员测试和调试代码。 **Spark与Hadoop 3.2的兼容性** Hadoop 3.2引入了许多新特性,如: 1. **多命名空间**:支持多个HDFS命名空间...
5. **运行Spark**: Spark可以通过命令行工具启动,例如`spark-shell`(交互式Spark会话)或`pyspark`(Python版本的交互式会话)。对于应用程序开发,可以使用Scala、Java、Python或R编写代码,然后通过`spark-...
4. **启动服务**:通过bin目录下的start-hbase.sh脚本启动HBase集群,包括Master节点和RegionServer节点。 5. **监控与管理**:HBase提供了Web UI,用户可以访问http://localhost:16010查看集群状态。此外,命令行...
在实际应用中,Spark可以通过编程接口(API)与多种数据源交互,如HDFS、Cassandra、HBase、Amazon S3等。它的RDD(弹性分布式数据集)模型允许用户在内存中高效地处理数据,大大提高了数据处理速度。Spark还支持...
8. **HBase shell** - 提供命令行接口(CLI)工具,方便用户执行CRUD(创建、读取、更新、删除)操作和管理表。 9. **表设计最佳实践** - 表设计应考虑数据访问模式,如热点问题、范围扫描等。 - 合理设计行键,...