Spark修炼之道 (Advanced) - Spark Source Code Reading, Part 1: The Spark Application Submission Flow
Published: 2019-06-29


Author: 摇摆少年梦

WeChat: zhouzhihubeyond

Application Submission Flow via the spark-submit Script

To run a Spark application, you package it and then submit it to the cluster with the spark-submit script. An example submission command looks like this:

root@sparkmaster:/hadoopLearning/spark-1.5.0-bin-hadoop2.4/bin# ./spark-submit --master spark://sparkmaster:7077 --class SparkWordCount --executor-memory 1g /root/IdeaProjects/SparkWordCount/out/artifacts/SparkWordCount_jar/SparkWordCount.jar hdfs://ns1/README.md hdfs://ns1/SparkWordCountResult
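For reference, here is a minimal sketch of what the SparkWordCount application being submitted might look like. Only the class name and the two HDFS path arguments come from the command above; the word-count logic itself is an assumption, since the original post does not show the application source.

// Hypothetical sketch of the SparkWordCount application referenced in the command above
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    // args(0): input path, args(1): output path (the two HDFS paths passed on the command line)
    val conf = new SparkConf().setAppName("SparkWordCount")
    val sc = new SparkContext(conf)
    sc.textFile(args(0))
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))
    sc.stop()
  }
}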

To understand the whole flow, let's first look at the spark-submit script itself. Its contents are as follows:

#!/usr/bin/env bash

SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# spark-submit ultimately invokes the spark-class script,
# passing it the class org.apache.spark.deploy.SparkSubmit
# together with all the other submitted arguments, such as deploy mode, executor-memory, etc.
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
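Note that "$@" forwards every original argument unchanged, so for the example above this exec line simply runs spark-class with org.apache.spark.deploy.SparkSubmit as the first argument, followed by --master, --class, --executor-memory, the application jar, and its two HDFS path arguments.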

The spark-class script loads Spark's configured environment variables, locates the dependency jar spark-assembly-1.5.0-hadoop2.4.0.jar (taking Spark 1.5.0 as an example), and then calls org.apache.spark.launcher.Main to actually launch the Spark application. The script is shown below:

# Figure out where Spark is installed
# Locate the SPARK_HOME directory
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

# Load load-spark-env.sh for runtime environment information,
# e.g. spark-env.sh under the conf directory
. "$SPARK_HOME"/bin/load-spark-env.sh

# Find the java binary
# Locate the JAVA_HOME directory
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find assembly jar
# Locate the spark-assembly-1.5.0-hadoop2.4.0.jar file (taking Spark 1.5.0 as an example);
# this means the application does not need to bundle that jar at submission time
SPARK_ASSEMBLY_JAR=
if [ -f "$SPARK_HOME/RELEASE" ]; then
  ASSEMBLY_DIR="$SPARK_HOME/lib"
else
  ASSEMBLY_DIR="$SPARK_HOME/assembly/target/scala-$SPARK_SCALA_VERSION"
fi

num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" ]; then
  echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
  echo "You need to build Spark before running this program." 1>&2
  exit 1
fi
ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then
  echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
  echo "$ASSEMBLY_JARS" 1>&2
  echo "Please remove all but one jar." 1>&2
  exit 1
fi

SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"

LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
# Run org.apache.spark.launcher.Main as the entry point that launches the Spark application
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"
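Incidentally, the launcher library whose Main class builds the final command here also exposes a public programmatic entry point, org.apache.spark.launcher.SparkLauncher. As a rough sketch (not part of the original post, with paths and settings copied from the example command above), the same submission could be driven from code:

import org.apache.spark.launcher.SparkLauncher

object LauncherSubmitSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors the spark-submit command shown earlier, but submits programmatically
    val process = new SparkLauncher()
      .setSparkHome("/hadoopLearning/spark-1.5.0-bin-hadoop2.4")
      .setAppResource("/root/IdeaProjects/SparkWordCount/out/artifacts/SparkWordCount_jar/SparkWordCount.jar")
      .setMainClass("SparkWordCount")
      .setMaster("spark://sparkmaster:7077")
      .setConf("spark.executor.memory", "1g")
      .addAppArgs("hdfs://ns1/README.md", "hdfs://ns1/SparkWordCountResult")
      .launch()
    // launch() returns a plain java.lang.Process that runs the submission under the hood
    process.waitFor()
  }
}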

Returning to the spark-class script: it is through org.apache.spark.launcher.Main that org.apache.spark.deploy.SparkSubmit is started. Part of the SparkSubmit source looks like this:

// The main method of SparkSubmit
def main(args: Array[String]): Unit = {
  // The arguments set at submission time, see Figure 1
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    // For a job submission, submit(appArgs) is executed
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}

[Figure 1: parameters captured by appArgs = new SparkSubmitArguments(args)]
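Since the original screenshot is not available, the listing below is a rough reconstruction, from the example command line, of the values the parsed SparkSubmitArguments would hold (field names follow the Spark 1.5 source; the exact set shown in the figure may have differed):

// Approximate contents of appArgs for the example submission (illustrative only):
//   action          = SparkSubmitAction.SUBMIT
//   master          = "spark://sparkmaster:7077"
//   mainClass       = "SparkWordCount"
//   executorMemory  = "1g"
//   primaryResource = "/root/IdeaProjects/SparkWordCount/out/artifacts/SparkWordCount_jar/SparkWordCount.jar"
//   childArgs       = ["hdfs://ns1/README.md", "hdfs://ns1/SparkWordCountResult"]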

Stepping into the submit method:

/**
 * Submit the application using the provided parameters.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 */
private def submit(args: SparkSubmitArguments): Unit = {
  // Launch parameters and related information, see Figure 2
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  // doRunMain() is defined inside the submit method
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            // Execute the runMain method
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      // Execute the runMain method
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      // scalastyle:off println
      printStream.println("Running Spark using the REST application submission protocol.")
      // scalastyle:on println
      // Call the doRunMain method defined inside submit
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  // In all other modes, just run the main class as prepared
  } else {
    // Call the doRunMain method defined inside submit
    doRunMain()
  }
}

[Figure 2: the parameters prepared at job submission]
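Again, the original screenshot is not available. For the example submission (standalone client mode), prepareSubmitEnvironment would roughly produce the following; this is a reconstruction rather than the original figure:

// Approximate return values of prepareSubmitEnvironment(args) in client mode (illustrative only):
//   childMainClass = "SparkWordCount"    // in client mode, the user's own main class
//   childArgs      = ["hdfs://ns1/README.md", "hdfs://ns1/SparkWordCountResult"]
//   childClasspath = ["/root/IdeaProjects/SparkWordCount/out/artifacts/SparkWordCount_jar/SparkWordCount.jar"]
//   sysProps       = Map("spark.master" -> "spark://sparkmaster:7077",
//                        "spark.executor.memory" -> "1g")    // plus other derived properties
// In cluster deploy modes, childMainClass is instead an internal Spark client class that
// launches the driver on the cluster, which is what the runMain scaladoc below refers to.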

From the submit method above, the call ultimately lands in the runMain method, whose source is as follows:

/**
 * Run the main method of the child class using the provided launch environment.
 *
 * Note that this main class will not be the one provided by the user if we're
 * running cluster deploy mode or python applications.
 */
private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  // scalastyle:off println
  if (verbose) {
    printStream.println(s"Main class:\n$childMainClass")
    printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
    printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
    printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    printStream.println("\n")
  }
  // scalastyle:on println

  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)

  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  for ((key, value) <- sysProps) {
    System.setProperty(key, value)
  }

  var mainClass: Class[_] = null

  try {
    // Load childMainClass via reflection; in our example this is SparkWordCount
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      e.printStackTrace(printStream)
      if (childMainClass.contains("thriftserver")) {
        // scalastyle:off println
        printStream.println(s"Failed to load main class $childMainClass.")
        printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        // scalastyle:on println
      }
      System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
  }

  // SPARK-4170
  if (classOf[scala.App].isAssignableFrom(mainClass)) {
    printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
  }

  // Look up the main method via reflection, i.e. the main method of SparkWordCount
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }

  def findCause(t: Throwable): Throwable = t match {
    case e: UndeclaredThrowableException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: InvocationTargetException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: Throwable =>
      e
  }

  try {
    // Invoke the main method, i.e. run SparkWordCount
    mainMethod.invoke(null, childArgs.toArray)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}
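The reflection pattern used here (resolve a class by name, look up its static main(Array[String]) forwarder, invoke it) can be reproduced in isolation. The sketch below is illustrative only; HelloApp stands in for the user's childMainClass and is not from the original post:

// A self-contained illustration of the reflective invocation runMain performs
object HelloApp {
  // A Scala object compiles to a class with a static main forwarder,
  // which is why SparkSubmit can invoke it as a static method
  def main(args: Array[String]): Unit =
    println("HelloApp called with: " + args.mkString(", "))
}

object ReflectiveMainDemo {
  def main(args: Array[String]): Unit = {
    val mainClass  = Class.forName("HelloApp")   // plays the role of childMainClass
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // Like SparkSubmit, pass the argument array as the single parameter of the static main method
    mainMethod.invoke(null, Array("hdfs://ns1/README.md", "hdfs://ns1/SparkWordCountResult"))
  }
}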

Once mainMethod.invoke(null, childArgs.toArray) has executed, control enters the main method of SparkWordCount and the Spark application itself begins to run.

At this point, the submission of the Spark application for execution is complete.
