博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark源码分析(四)-Job提交过程
阅读量:5369 次
发布时间:2019-06-15

本文共 3327 字,大约阅读时间需要 11 分钟。

原创文章,转载请注明: 转载自  

    本文将以一个简单的WordCount为例来看看Job的提交过程

      

    由输出的日志可以看出job的提交过程主要经过了SparkContext-》DAGScheduler-》TaskScheduler的处理

    

    先从RDD入手,看看RDD的转化过程。在wordcount程序中一个README.md文件从HadoopRDD最终会转化为MapPartitionsRDD

   

    textFile()函数只是从hdfs、local file等读取文件转换成HadoopRDD,并通过map函数转化为了MappedRDD文件

    

    与RDD相关的一个重要类就是Dependency类,它负责表示RDD之间的依赖关系。包含了NarrowDependency(窄依赖)与ShuffleDependency(宽依赖)两类

   其中NarrowDependency包含一对一的OneToOneDependency与一对多的RangeDependency。在wordcount程序中MappedRDD、FlatMappedRDD都属于

   OneToOneDependency,而ShuffledRDD、MapPartitionsRDD属于ShuffleDependency。

     

    job真正的执行入口是从count这个action开始的

    

  job提交的大致调用链是:sc.runJob()->dagScheduler.runJob->dagScheduler.submitJob->dagSchedulerEventProcessActor.JobSubmitted

  ->dagScheduler.handleJobSubmitted->dagScheduler.submitStage->dagScheduler.submitMissingTasks->taskScheduler.submitTasks

  可以看出job先经过DAGScheduler生成stage,转换成TaskSet后提交给TaskScheduler进行调度。TaskScheduler工作原理在上一节已经分析过了,下面

  主要来分析下DAGScheduler处理job的过程:

    job处理过程中handleJobSubmitted比较关键,finalRDD就是进行action操作前的最后一个RDD,对应wordcount就是MapPartitionsRDD。

private[scheduler] def handleJobSubmitted(jobId: Int,      finalRDD: RDD[_],      func: (TaskContext, Iterator[_]) => _,      partitions: Array[Int],      allowLocal: Boolean,      callSite: String,      listener: JobListener,      properties: Properties = null)  {    var finalStage: Stage = null    try {      // New stage creation may throw an exception if, for example, jobs are run on a      // HadoopRDD whose underlying HDFS files have been deleted.      finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))    } catch {      case e: Exception =>        logWarning("Creating new stage failed due to exception - job: " + jobId, e)        listener.jobFailed(e)        return    }    if (finalStage != null) {      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)      clearCacheLocs()       logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(        job.jobId, callSite, partitions.length, allowLocal))      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")      logInfo("Parents of final stage: " + finalStage.parents)      logInfo("Missing parents: " + getMissingParentStages(finalStage)) if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {        // Compute very short actions like first() or take() with no parent stages locally.        listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))        runLocally(job)      } else {        jobIdToActiveJob(jobId) = job        activeJobs += job        resultStageToJob(finalStage) = job        listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,          properties))        submitStage(finalStage)      }    }    submitWaitingStages()  }

    getMissingParentStages函数中会根据finalstage对应finalRDD的dependence类型来创建它的父stage。由于MapPartitionsRDD属于ShuffleDependency,所以上

             面的日志截图中可以看出finalStage(stage 0)的父stage是(stage 1)

      

   submitStage函数中会根据依赖关系划分stage,通过递归调用从finalStage一直往前找它的父stage,直到stage没有父stage时就调用submitMissingTasks方法

   提交改stage。这样就完成了将job划分为一个或者多个stage。

      

  最后会在submitMissingTasks函数中将stage封装成TaskSet通过taskScheduler.submitTasks函数提交给TaskScheduler处理。

 

原创文章,转载请注明: 转载自

转载于:https://www.cnblogs.com/tovin/p/3903478.html

你可能感兴趣的文章
ubuntu下USB连接Android手机
查看>>
C# 语句 分支语句 switch----case----.
查看>>
反射获取 obj类 的属性 与对应值
查看>>
表单中的readonly与disable的区别(zhuan)
查看>>
win10下安装配置mysql-8.0.13--实战可用
查看>>
周记2018.8.27~9.2
查看>>
MySQL中 1305-FUNCTION liangshanhero2.getdate does not exit 问题解决
查看>>
python序列化和json
查看>>
mongodb
查看>>
网格与无网格
查看>>
SSH-struts2的异常处理
查看>>
《30天自制操作系统》学习笔记--第14天
查看>>
LGPL协议的理解
查看>>
1、Python基础
查看>>
Unity The Tag Attribute Matching Rule
查看>>
试着理解下kvm
查看>>
WebService学习总结(二)--使用JDK开发WebService
查看>>
Tizen参考手机RD-210和RD-PQ
查看>>
竞价广告系统-位置拍卖理论
查看>>
策略模式 C#
查看>>