本文共 4951 字,大约阅读时间需要 16 分钟。
spark 应用提交流程。源文件:\spark-master\spark-master\core\src\main\scala\org\apache\spark\deploy\SparkSubmit.scala override def main(args: Array[String]): Unit = {#新建一个 SparkSubmit 实例,最后调用其 doSubmit val submit = new SparkSubmit() { self => override protected def parseArguments(args: Array[String]): SparkSubmitArguments = { new SparkSubmitArguments(args) { override protected def logInfo(msg: => String): Unit = self.logInfo(msg) override protected def logWarning(msg: => String): Unit = self.logWarning(msg) } } override protected def logInfo(msg: => String): Unit = printMessage(msg) override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg") override def doSubmit(args: Array[String]): Unit = { try {#继续调用父类的 doSubmit super.doSubmit(args) } catch { case e: SparkUserAppException => exitFn(e.exitCode) case e: SparkException => printErrorAndExit(e.getMessage()) } } }#调用其提交函数 doSubmit submit.doSubmit(args) } doSubmit 的实现如下: def doSubmit(args: Array[String]): Unit = { appArgs.action match {#action 分为下面四种,这次我们关注的是 submit case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) case SparkSubmitAction.KILL => kill(appArgs) case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) case SparkSubmitAction.PRINT_VERSION => printVersion() } submit 的实现如下,其中的调用流程是 doRunMain -> runMain: private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = { val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args) def doRunMain(): Unit = { if (args.proxyUser != null) { val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser, UserGroupInformation.getCurrentUser()) try { proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {#最终执行 runMain override def run(): Unit = { runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose) } }) } catch { } } else { runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose) } } } else {#调用内部函数 doRunMain() } runMain 的实现如下: private def runMain( childArgs: Seq[String], childClasspath: 
Seq[String], sparkConf: SparkConf, childMainClass: String, verbose: Boolean): Unit = {#创建用于加载 class 的 ClassLoader val loader = if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) { new ChildFirstURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader) } else { new MutableURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader) } Thread.currentThread.setContextClassLoader(loader) for (jar <- childClasspath) { addJarToClasspath(jar, loader) } var mainClass: Class[_] = null try {#根据类名加载主类,例如常用的 WordCount 示例程序的主类 mainClass = Utils.classForName(childMainClass) } catch { throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS) }#创建应用实例:主类实现了 SparkApplication 则直接实例化,否则用 JavaMainApplication 包装 val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) { mainClass.newInstance().asInstanceOf[SparkApplication] } else { // SPARK-4170 if (classOf[scala.App].isAssignableFrom(mainClass)) { logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.") } new JavaMainApplication(mainClass) } @tailrec def findCause(t: Throwable): Throwable = t match { case e: UndeclaredThrowableException => if (e.getCause() != null) findCause(e.getCause()) else e case e: InvocationTargetException => if (e.getCause() != null) findCause(e.getCause()) else e case e: Throwable => e } try {#启动应用,例如运行 WordCount app.start(childArgs.toArray, sparkConf) } catch { case t: Throwable => throw findCause(t) } } 与此同时,doSubmit 中提到 action 有四种,前面我们看的是 submit 这个 action。平时在 shell 中执行 spark-submit --version 时,传入的就是 SparkSubmitAction.PRINT_VERSION: appArgs.action match { case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) case SparkSubmitAction.KILL => kill(appArgs) case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) case SparkSubmitAction.PRINT_VERSION => printVersion() } 在 printVersion 中,我们就看到了熟悉的 Spark 字样: private def printVersion(): Unit = { logInfo("""Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version %s /_/ """.format(SPARK_VERSION)) logInfo("Using 
Scala %s, %s, %s".format( Properties.versionString, Properties.javaVmName, Properties.javaVersion)) logInfo(s"Branch $SPARK_BRANCH") logInfo(s"Compiled by user $SPARK_BUILD_USER on $SPARK_BUILD_DATE") logInfo(s"Revision $SPARK_REVISION") logInfo(s"Url $SPARK_REPO_URL") logInfo("Type --help for more information.") }
转载地址:http://wsnmi.baihongyu.com/