Submitting a Spark Application
Published: 2019-05-26


This post walks through the Spark application submission flow. The code comes from \spark-master\spark-master\core\src\main\scala\org\apache\spark\deploy\SparkSubmit.scala.

The entry point main builds a SparkSubmit instance (overriding a few logging hooks) and finally calls its doSubmit:

override def main(args: Array[String]): Unit = {
  // Create a SparkSubmit and, at the end, call its doSubmit
  val submit = new SparkSubmit() {
    self =>

    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
        override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
        override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
      }
    }

    override protected def logInfo(msg: => String): Unit = printMessage(msg)

    override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

    override def doSubmit(args: Array[String]): Unit = {
      try {
        // Delegate to the parent implementation
        super.doSubmit(args)
      } catch {
        case e: SparkUserAppException =>
          exitFn(e.exitCode)
        case e: SparkException =>
          printErrorAndExit(e.getMessage())
      }
    }
  }

  // Kick off the submission
  submit.doSubmit(args)
}

The implementation of doSubmit is as follows. It parses the command-line arguments and dispatches on the resulting action:

def doSubmit(args: Array[String]): Unit = {
  // (The real method first initializes logging and parses the arguments.)
  val uninitLog = initializeLogIfNecessary(true, silent = true)
  val appArgs = parseArguments(args)
  appArgs.action match {
    // The submission maps to one of the four actions below; in our case it is SUBMIT
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}

The implementation of submit is shown next. The call flow inside this function is doRunMain -> runMain:

private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          // Eventually runMain is executed
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // ... (error handling elided in the original post)
          throw e
      }
    } else {
      runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
    }
  }

  // Standalone cluster mode with the REST gateway takes a different path (elided here);
  // in all other modes the inner function defined above is simply called
  if (args.isStandaloneCluster && args.useRest) {
    // ...
  } else {
    doRunMain()
  }
}

runMain is where the user class is actually loaded and started:

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sparkConf: SparkConf,
    childMainClass: String,
    verbose: Boolean): Unit = {
  // Obtain the class loader that will load the user classes
  val loader =
    if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  var mainClass: Class[_] = null
  try {
    // Look up the class by name, e.g. the classic word-count example class
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      // ... (logging elided in the original post)
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
  }

  // Create an instance of the class
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.newInstance().asInstanceOf[SparkApplication]
  } else {
    // SPARK-4170
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }
    new JavaMainApplication(mainClass)
  }
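If the main class does not implement SparkApplication, it is wrapped in a JavaMainApplication, which locates the class's static main(String[]) method via reflection and invokes it. The following is only a simplified sketch of that idea, not the actual Spark class: the class name is made up, and the real implementation also copies the SparkConf entries into system properties before invoking.

import java.lang.reflect.Modifier

import org.apache.spark.SparkConf

// Simplified illustration of what JavaMainApplication does: find the user's
// static main(String[]) method via reflection and invoke it with the child args.
// (Hypothetical class name; error handling and conf propagation are simplified.)
class ReflectiveMainApplication(klass: Class[_]) {
  def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers),
      s"The main method of ${klass.getName} must be static")
    // A static method is invoked with a null receiver; args is passed as the
    // single String[] parameter.
    mainMethod.invoke(null, args)
  }
}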
Back in runMain, a small @tailrec helper unwraps the reflection wrapper exceptions, and then the application is started:

  @tailrec
  def findCause(t: Throwable): Throwable = t match {
    case e: UndeclaredThrowableException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: InvocationTargetException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: Throwable =>
      e
  }

  try {
    // Start running the user application, e.g. the word-count example
    // (a minimal sketch of such a driver is given at the end of this post)
    app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}

As mentioned in doSubmit, there are four kinds of action, and we have just followed SUBMIT. When spark-submit is launched from the shell with the version option, the action is SparkSubmitAction.PRINT_VERSION:

appArgs.action match {
  case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
  case SparkSubmitAction.KILL => kill(appArgs)
  case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  case SparkSubmitAction.PRINT_VERSION => printVersion()
}

In printVersion we finally see the familiar Spark banner:

private def printVersion(): Unit = {
  logInfo("""Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version %s
      /_/
                        """.format(SPARK_VERSION))
  logInfo("Using Scala %s, %s, %s".format(
    Properties.versionString, Properties.javaVmName, Properties.javaVersion))
  logInfo(s"Branch $SPARK_BRANCH")
  logInfo(s"Compiled by user $SPARK_BUILD_USER on $SPARK_BUILD_DATE")
  logInfo(s"Revision $SPARK_REVISION")
  logInfo(s"Url $SPARK_REPO_URL")
  logInfo("Type --help for more information.")
}
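To round this off, here is the kind of user driver class that typically ends up as childMainClass in the flow above. This is a minimal word-count sketch of my own, not one of the official Spark examples; because it only defines a plain main() method, runMain would wrap it in a JavaMainApplication and then call start() on it.

import org.apache.spark.sql.SparkSession

// A minimal word-count driver written as a plain object with a main() method.
// When submitted with spark-submit, its fully qualified name becomes childMainClass.
object SimpleWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SimpleWordCount").getOrCreate()
    try {
      val counts = spark.sparkContext
        .textFile(args(0))              // input path arrives through childArgs
        .flatMap(_.split("\\s+"))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
      counts.collect().foreach { case (word, n) => println(s"$word\t$n") }
    } finally {
      spark.stop()
    }
  }
}

Submitting it with spark-submit --class SimpleWordCount app.jar input.txt (hypothetical jar and input names) would then take exactly the SUBMIT path traced above.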

 

Reposted from: http://wsnmi.baihongyu.com/
