我们在windows开发机上使用spark的local模式读取远程hadoop集群中的hdfs上的数据,这样的目的是方便快速调试,而不用每写一行代码或者一个方法,一个类文件都需要打包成jar上传到linux上,再扔到正式的集群上进行测试,像功能性验证直接使用local模式来快速调测是非常方便的,当然功能测试之后,我们还需要打包成jar仍到集群上进行其他的验证比如jar包的依赖问题,这个在local模式是没法测的,还有集群运行的调优参数,这些都可以在正式仍到集群时验证。
一个样例代码如下:
def main(args: Array[String]): Unit = {
//指定local模式
val conf = new SparkConf().setMaster("local[2]").setAppName("read kp data to kafka")
val sc= new SparkContext(conf)
//支持通配符路径,支持压缩文件读取
val rrd=sc.textFile("hdfs://192.168.10.4:8020/data/log/{20170227,20170228}/tomcat-log*")
//提到到集群模式时,去掉uri地址,如果有双namenode,可以自动容灾
//val rrd=sc.textFile("/data/log/{20170227,20170228}/tomcat-log*")
//统计数量
println(rrd.count())
//停止spark
sc.stop()
}
如何在spark中遍历数据时获取文件路径:
val path:String="hdfs://192.168.10.4:8020/data/userlog/{20170226}/kp*"
val text= sc.newAPIHadoopFile[LongWritable,Text,TextInputFormat](path)
val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
.mapPartitionsWithInputSplit((inputSplit, iterator) => {
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map(tup => (file.getPath, tup._2)) // 返回的K=全路径 V=每一行的值
}
)
linesWithFileNames.foreach(println)
如果遍历压缩文件时想要获取文件名,就使用newAPIHadoopFile,此外在本地调试下通过之后,提交到集群运行的时候,一定要把uri去掉,本地加上是想让它远程读取方便调试使用,如果正式运行去掉uri在双namenode的时候可以自动兼容,不去反而成一个隐患了。
最后我们可以通过spark on yarn模式提交任务,一个例子如下:
jars=`echo /home/search/x_spark_job/libs/*jar | sed 's/ /,/g'`
bin/spark-submit --class KSearch --master yarn --jars $jars /home/search/x_spark_job/kp-1.0.0.jar
这里用spark提交有另外一个优势,就是假如我开发的不是YARN应用,就是代码里没有使用SparkContext,而是一个普通的应用,就是读取mysql一个表的数据,写入另外一个mysql,这里跟MR没有关系,但是我依然可以用spark-sumbit提交,这时候是不会提交到YARN上的,但是程序会按普通程序运行,程序依赖的jar包,直接使用--jars传入就行,这一点非常方便,尤其是应用有多个依赖时,比如依赖es,hadoop,hbase,redis,fastjson,我打完包后的程序是瘦身的只有主体jar非常小,依赖的jar我可以不打到主体jar里面,在外部用的时候传入,方便共用并灵活性大大提高。
最后,spark的wholeTextFiles对gz压缩的支持不太友好,不能直接访问,相关问题,请参考:
http://stackoverflow.com/questions/24402737/how-to-read-gz-files-in-spark-using-wholetextfiles
http://stackoverflow.com/questions/36604145/read-whole-text-files-from-a-compression-in-spark
http://stackoverflow.com/questions/24402737/how-to-read-gz-files-in-spark-using-wholetextfiles?rq=1
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
为了与Hadoop集群对接,还需要配置Spark的`spark-env.sh`文件,设置Hadoop的相关路径。 5. **运行Spark**: Spark可以通过命令行工具启动,例如`spark-shell`(交互式Spark会话)或`pyspark`(Python版本的交互式...
总的来说,"spark-2.3.4-bin-hadoop2.7.tgz"是一个包含完整Spark环境的压缩包,适合本地或集群环境中的数据分析和处理。通过pyspark,Python开发者可以充分利用Spark的性能和功能,进行高效的大数据处理任务。
Spark Local模式是Spark的一种运行模式,它允许开发者在单机环境中快速进行开发和测试,而无需搭建复杂的分布式集群。在本项目中,我们将使用IntelliJ IDEA(IDEA)作为集成开发环境,通过Maven来管理依赖,以实现对...
- **Hadoop的联系:**尽管Spark是为了改善Hadoop的不足而设计的,但它仍然依赖于Hadoop的HDFS进行存储,并且可以在Hadoop集群上运行。Hadoop的HDFS为Spark提供了存储基础,而MapReduce则为Spark提供了计算框架。 *...
例如,Spark可以通过YARN(Yet Another Resource Negotiator)调度器来管理资源,从而实现与Hadoop集群的整合。 #### 二、Spark如何改进Hadoop - **减少磁盘I/O**: - **内存计算**:Spark通过将中间计算结果存储...
在"Spark local下 WordCount运行示例"中,我们将探讨如何在本地模式(local mode)下使用Spark执行一个简单的WordCount程序。WordCount是大数据处理领域的一个经典例子,用于统计文本中各个单词出现的次数。 首先,...
Spark Standalone 是 Apache Spark 提供的一种自带的集群管理模式,主要用于管理 Spark 应用程序的执行环境。这种模式简单易用,适合于开发测试以及中小型生产环境。 #### Spark Standalone 部署配置 ##### ...
- **提交至Spark运行**:使用`spark-submit`命令提交JAR文件到Spark集群。 通过以上知识点的学习,可以掌握如何在Linux环境下安装配置Hadoop和Spark,熟悉HDFS的基本操作命令,并能利用Spark对本地文件和HDFS文件...
此外,还需要启动Hadoop的相关服务,如namenode、datanode等,然后使用Spark的Local模式或者Hadoop的Mini Cluster模式运行Spark作业。 总之,Hadoop Common是Spark开发中必不可少的依赖,它提供与HDFS交互的接口和...
在Windows环境下,Hadoop通常用于模拟集群环境,为Spark提供数据存储和计算支持。 二、Hadoop 2.7.6在Windows上的安装 1. 下载:首先从官方网站或第三方源获取Hadoop 2.7.6的zip压缩包,解压至指定目录。 2. 配置...
### Hadoop & Spark 使用教程详解 #### 一、Hadoop 和 Spark 概览 Hadoop 和 Spark 是目前大数据处理领域中最流行的两个框架。Hadoop 主要由 HDFS(Hadoop Distributed File System)和 MapReduce 组成,前者用于...
2. 读取 HDFS 集群数据计算 WordCount:使用 spark-shell 读取 HDFS 文件,计算 WordCount。 standalone 集群模式: 1. 软件上传并解压。 2. 修改配置文件。 3. 软件包的分发。 4. 启动集群。 5. 验证:使用 spark-...
而在Linux环境中,可以使用命令行工具提交Spark作业到HDFS,或者通过Scala编程接口直接读取HDFS上的数据进行处理。 总之,Spark的安装和部署是一个综合性的过程,涉及到多个组件的配置和协调。理解和掌握这些知识点...
Spark可以与Hadoop集群配合使用,但是它可以通过RDD(弹性分布式数据集)在内存中处理数据,比Hadoop MapReduce的磁盘I/O操作更快,因此在处理迭代算法和交互式数据分析方面具有优势。 Hadoop和Spark的优化方法有很...
2. 使用数据分区策略优化数据读取和计算效率。 3. 开启压缩,减少网络传输和存储开销。 4. 使用广播变量和积累器优化通信。 5. 根据任务类型选择合适的调度策略,如FIFO、FAIR或自定义策略。 通过上述步骤和注意...
1. **读取数据**: 可以使用`spark.read()`方法从多种数据源读取数据,如CSV、JSON、Parquet等。例如,从CSV文件读取数据: ```java DataFrame df = spark.read().format("csv").option("header", "true").load(...
- **数据加载与转换**:通过`SparkContext`读取各种数据源,如HDFS、Cassandra、HBase等,并使用DataFrame/Dataset API进行数据转换。 - **并行操作**:使用map、filter、reduceByKey等操作对数据进行并行处理。 ...
1. **spark.master**:指定Spark运行的模式,如`local`(本地模式),`spark://master:7077`(集群模式)或`yarn`(YARN模式)。 2. **spark.executor.instances**:设置Spark应用的执行器(Executor)数量。 3. **...
使用 Spark Submit,可以将 Python 脚本提交到 Spark 集群中执行。 8. Log4j Log4j 是一个日志记录工具,用于记录 Spark 的执行日志。使用 Log4j,可以配置日志的记录级别、输出格式和其他参数。 9. SparkContext...
### Spark在虚拟机中的安装与配置详解 ...通过以上步骤,可以在虚拟机中完成Scala和Spark的安装配置,并且能够进行基本的操作和集群模式下的使用。这些步骤不仅适用于学习Spark的基础知识,也适用于实际项目中的部署。