`
qindongliang1922
  • 浏览: 2184441 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117546
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125933
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59928
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71305
社区版块
存档分类
最新评论

Spark如何在一个SparkContext中提交多个任务

阅读更多
在使用spark处理数据的时候,大多数都是提交一个job执行,然后job内部会根据具体的任务,生成task任务,运行在多个进程中,比如读取的HDFS文件的数据,spark会加载所有的数据,然后根据block个数生成task数目,多个task运行中不同的进程中,是并行的,如果在同一个进程中一个JVM里面有多个task,那么多个task也可以并行,这是常见的使用方式。


考虑下面一种场景,在HDFS上某个目录下面有10个文件,我想要同时并行的去统计每个文件的数量,应该怎么做? 其实spark是支持在一个spark context中可以通过多线程同时提交多个任务运行,然后spark context接到这所有的任务之后,通过中央调度,在来分配执行各个task,最终任务完成程序退出。

下面就来看下如何使用多线程提交任务,可以直接使用new Thread来创建线程提交,但是不建议这么做,推荐的做法是通过Executors线程池来异步管理线程,尤其是在提交的任务比较多的时候用这个会更加方便。

核心代码如下:

   def main(args: Array[String]): Unit = {

    val sparkConf=new SparkConf()
    //实例化spark context
    val sc=new SparkContext(sparkConf)
    sparkConf.setAppName("multi task submit ")
    //保存任务返回值
    val list=new util.ArrayList[Future[String]]()
    //并行任务读取的path
    val task_paths=new util.ArrayList[String]()
    task_paths.add("/tmp/data/path1/")
    task_paths.add("/tmp/data/path2/")
    task_paths.add("/tmp/data/path3/")

    //线程数等于path的数量
    val nums_threads=task_paths.size()
    //构建线程池
    val executors=Executors.newFixedThreadPool(nums_threads)
    for(i<-0 until  nums_threads){
       val task= executors.submit(new Callable[String] {
          override def call(): String ={
              val count=sc.textFile(task_paths.get(i)).count()//获取统计文件数量
            return task_paths.get(i)+" 文件数量: "+count
          }
        })

      list.add(task)//添加集合里面
    }
    //遍历获取结果
    list.asScala.foreach(result=>{
      log.info(result.get())
    })
    //停止spark
    sc.stop()

  }
 

 


可以看到使用scala写的代码比较精简,这样就完成了一个并行task提交的spark任务,最后我们打包完毕后,上传到linux上进行提交,命令如下:

 
 /opt/bigdata/spark/bin/spark-submit  
 --class  MultiTaskSubmit  
 --master yarn  
 --deploy-mode cluster 
 --executor-cores 3 
 --driver-memory 1g 
 --executor-memory 1g
 --num-executors 10
 --jars  $jars   task.jar
 




最后需要注意一点,在线程里面调用的方法如果包含一些全局加载的属性,最好放在线程的成员变量里面进行初始化,否则多个线程去更改全局属性,有可能会造成一些未知的问题。

有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
0
1
分享到:
评论

相关推荐

    Hive on Spark源码分析DOC

    在任务执行过程中,Hive on Spark 会将 Task 分配到多个 executor 中执行,每个 executor 负责执行一个 Task。 executor 会将执行结果返回给 Hive on Spark,然后将结果返回给用户。 Hive on Spark 源码分析是指将 ...

    Apache Spark源码走读之2 -- Job的提交与运行

    在Spark中,有一个核心概念叫做Driver Program,它负责创建SparkContext实例。SparkContext是用户程序和Spark集群之间的桥梁,负责初始化Spark应用运行所需的各种参数和环境。在SparkShell中,用户提交的代码会首先...

    java提交spark任务到yarn平台的配置讲解共9页

    在Spark生态系统中,Java作为广泛使用的编程语言...总的来说,Java提交Spark任务到YARN平台涉及环境配置、代码编写、任务打包和提交等多个步骤。理解并熟练掌握这些步骤对于开发和运行大规模分布式Spark应用至关重要。

    spark 的schedule的原理

    首先,Spark采用了stage-oriented scheduling(基于阶段的调度)策略,将整个任务分成多个阶段(Stage),每个阶段包含了多个任务集合(TaskSet)。在Spark内部,提交的任务首先被划分为一系列的RDD(弹性分布式数据...

    Spark简介以及其生态圈

    在术语定义中,Application指的是用户编写的Spark应用程序,它由运行main()函数的Driver和分布在集群中多个节点上运行的Executor组成。Driver创建SparkContext以准备运行环境,SparkContext负责与ClusterManager通信...

    spark初始化源码阅读sparkonyarn的client和cluster区别

    Spark 是一个大数据处理的开源框架,它可以在多种集群管理器上运行,如 YARN、Mesos 和 Standalone。Spark on YARN 是 Spark 在 YARN 集群管理器上的实现。了解 Spark 的初始化源码对于深入理解 Spark 的工作原理和...

    Spark-内核源码解析.docx

    在 Spark 中,Application 指的是用户编写的 Spark 应用程序/代码,包含了 Driver 功能代码和分布在集群中多个节点上运行的 Executor 代码。一个 Spark 应用程序由一个或多个作业 JOB 组成,Driver 负责和 Cluster...

    spark 分布式集群搭建

    当创建 SparkContext 并启动一个 Spark 应用程序时,会经历以下关键步骤: - 创建 DAGScheduler 和 TaskScheduler 两个核心组件。 - DAGScheduler 负责根据应用程序的依赖关系构建执行计划,划分成多个 Stage,并为...

    spark_jar包

    Spark作为一个快速、通用且可扩展的数据处理框架,它为大数据处理提供了丰富的API,支持Scala、Java、Python和R等多种编程语言。Spark_JAR包的使用极大地简化了在分布式环境中的数据处理任务。 Spark的主要特性包括...

    【Spark内核篇03】Spark任务调度机制1

    Spark任务调度机制是Spark核心功能之一,它负责高效地组织和执行Spark应用程序中的计算任务。在深入理解这一机制之前,我们需要了解几个关键概念:Job、Stage和Task。 1. Job:Job是由Spark应用程序中的Action操作...

    spark原理.docx

    在 SparkContext 初始化的过程中被实例化,一个 SparkContext 对应创建一个 DAGScheduler。 总结:DAGScheduler 完成以下工作: 1. DAGScheduler 划分 Stage(TaskSet),记录哪个 RDD 或者 Stage 输出被物化...

    spark overview

    用户编写的程序在集群上运行时,通常会包含一个Driver和多个executors,共同完成整个应用程序的计算任务。在Spark的独立部署模式中,用户需要关注如何有效地配置和管理集群资源,以便能够高效地运行Spark作业。

    WordCount_Spark!_spark_wordcount_java_

    **Spark上的WordCount程序详解** ...通过这个简单的例子,我们可以深入理解Spark的工作原理,以及如何在Java环境中编写并执行Spark任务。在实际应用中,这些基本操作可以扩展到更复杂的分布式数据处理场景。

    cloudera-spark 官方文档

    Spark 应用由多个任务组成,这些任务被分发到集群中的各个节点上执行。每个Spark应用都有一个主进程称为“Driver”,它负责调度任务并监控它们的执行情况。 - **安装配置**:首先需要安装Spark环境,通常可以通过...

    大数据技术原理及应用课实验7 :Spark初级编程实践

    2. 求平均值:这个任务需要计算多个文件中所有学生的平均成绩。首先,将所有包含成绩的文件加载到Spark,然后将数据转换为键值对形式,键是学生名字,值是成绩。接着,可以使用`groupByKey`和`mapValues`操作,`...

    spark2.1.0.chm(spark java API)

    Spark,作为一个分布式计算框架,因其高效、灵活和易用的特性,在大数据处理领域深受青睐。Spark 2.1.0版本对Java API进行了全面优化,使得Java开发者能够更加便捷地利用Spark进行大规模数据处理。本篇文章将围绕`...

    Spark核心技术原理透视一Spark运行原理.pdf

    1. Application:Spark应用程序,指的是用户编写的Spark应用程序,包含了Driver功能代码和分布在集群中多个节点上的Executor代码。 2. Driver:驱动程序,Spark中的Driver即运行上述Application的Main()函数,并且...

    spark考试练习题含答案.rar

    2. **Spark Job**:一系列操作的集合,由SparkContext提交执行。 3. **Spark Application**:一个完整的Spark程序,包括main方法和Job。 4. **DAG(Directed Acyclic Graph)**:Spark将一系列操作转化为有向无环...

    Spark-master

    Spark是Apache软件基金会下的一个开源大数据处理框架,其核心组件包括Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图计算)。Spark以其高效、易用和可扩展性著称,尤其在内存计算方面,...

    spark入门培训

    4. Executor:是一个进程(对应JVM进程),可能会执行多个任务(Task)。它是为执行任务而分配给应用程序的资源单元。 四、单机配置Spark 1. 在Linux系统中配置Spark: a. 安装JDK1.8,并设置JAVA_HOME环境变量。 ...

Global site tag (gtag.js) - Google Analytics