Analysis of the SparkSubmit-to-YARN Submission Flow (Study Notes)


SparkSubmit Submission Flow Analysis

Tip: the analysis starts from the following submit command:

./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

First, the spark-submit script itself is executed, so let's look at its source:

#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# exec invokes the spark-class script, passing it the SparkSubmit class plus all of the arguments above
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

Next, let's look at the spark-class script (trimmed to the parts that matter):

#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

#3. RUNNER is "${JAVA_HOME}/bin/java"; it runs org.apache.spark.launcher.Main from the launch classpath with all of the arguments given to spark-submit, and the launcher emits the JVM command shown below
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
    #2. CMD keeps accumulating inside this loop; build_command is what feeds it the prepared arguments
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")

#1. Finally we exec CMD; comments 2 and 3 above explain how it was built.
#   The last element appended by build_command is the launcher's exit code, so strip it off.
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

The command that ultimately gets executed:

/usr/lib/java/jdk1.8.0_144/bin/java -cp \
/home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2/conf/:/home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2/jars/* \
-Xmx1g \
org.apache.spark.deploy.SparkSubmit \
--master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
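
As a side note, the same submission can also be produced programmatically. Below is a minimal sketch using the public org.apache.spark.launcher.SparkLauncher API (the layer that sits on top of launcher.Main), with the jar, class, and argument taken from the example command above:

import org.apache.spark.launcher.SparkLauncher

// Minimal sketch: building the same yarn-cluster submission through the public
// launcher API instead of the shell scripts.
object LauncherApiSketch {
  def main(args: Array[String]): Unit = {
    val process = new SparkLauncher()
      .setAppResource("./examples/jars/spark-examples_2.12-3.0.0.jar")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setMaster("yarn")
      .setDeployMode("cluster")
      .addAppArgs("10")
      .launch()       // spawns a child JVM that runs org.apache.spark.deploy.SparkSubmit
    process.waitFor()
  }
}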

So the key to the whole submit-script chain is how org.apache.spark.deploy.SparkSubmit works; everything else is just argument plumbing. Let's start with that class:

//A runnable class must have a main method, so we start there
  override def main(args: Array[String]): Unit = {
    //creates an anonymous subclass of SparkSubmit
    val submit = new SparkSubmit() {
      self =>

      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        new SparkSubmitArguments(args) {
          override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

          override protected def logWarning(msg: => String): Unit = self.logWarning(msg)

          override protected def logError(msg: => String): Unit = self.logError(msg)
        }
      }

      override protected def logInfo(msg: => String): Unit = printMessage(msg)

      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

      override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")
      //This is the method that actually runs; it delegates to the parent class's doSubmit(args)
      override def doSubmit(args: Array[String]): Unit = {
        try {
          super.doSubmit(args)
        } catch {
          case e: SparkUserAppException =>
            exitFn(e.exitCode)
        }
      }

    }
    //Then doSubmit is invoked on the anonymous subclass, where it has been overridden
    submit.doSubmit(args)
  }
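
The point of the anonymous subclass is simply to redirect the logging hooks to printMessage, so output goes to the console rather than the logging framework. A stripped-down sketch of that pattern (the class and method names here are made up for illustration, not Spark's):

object AnonymousOverrideSketch {
  // a stand-in for SparkSubmit: logging goes through an overridable hook
  class Submitter {
    protected def logInfo(msg: => String): Unit = println(s"[log4j] $msg")
    def doSubmit(args: Array[String]): Unit = logInfo(s"submitting: ${args.mkString(" ")}")
  }

  def main(args: Array[String]): Unit = {
    // the anonymous subclass swaps the logging hook, just like SparkSubmit.main does
    val submit = new Submitter {
      override protected def logInfo(msg: => String): Unit = println(s"[console] $msg")
    }
    submit.doSubmit(Array("--master", "yarn"))  // prints through the overridden hook
  }
}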

1. The submission flow starting from super.doSubmit

def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    // Logging setup; skip it for now
    val uninitLog = initializeLogIfNecessary(true, silent = true)
	
    // *parseArguments returns appArgs; its job is to parse the command-line arguments
    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    //Pattern match: appArgs.action is guaranteed to be one of these four, so let's start with parseArguments
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }

1.1 parseArguments(args)

protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
    //The constructor runs the two key steps covered in 1.1.1 and 1.1.2
  new SparkSubmitArguments(args)
}

1.1.1 SparkSubmitArguments(args)

try {
    //runs in the constructor body (initializer block)
  parse(args.asJava)
} catch {
  case e: IllegalArgumentException =>
    SparkSubmit.printErrorAndExit(e.getMessage())
}

1.1.1.1 parse(args.asJava)
//Clearly enough, this is where the input arguments get parsed and normalized
protected final void parse(List<String> args) {
    //The regex that splits --key=value style options into name and value
  Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

  int idx = 0;
  for (idx = 0; idx < args.size(); idx++) {
    String arg = args.get(idx);
    String value = null;

    Matcher m = eqSeparatedOpt.matcher(arg);
    if (m.matches()) {
      arg = m.group(1);
      value = m.group(2);
    }

    // Look for options with a value.
    String name = findCliOption(arg, opts);
    if (name != null) {
      if (value == null) {
        if (idx == args.size() - 1) {
          throw new IllegalArgumentException(
              String.format("Missing argument for option '%s'.", arg));
        }
        idx++;
        value = args.get(idx);
      }
      if (!handle(name, value)) {
        break;
      }
      continue;
    }

    // Look for a switch.
    name = findCliOption(arg, switches);
    if (name != null) {
        // * handle() is the key function for argument parsing
      if (!handle(name, null)) {
        break;
      }
      continue;
    }

    if (!handleUnknown(arg)) {
      break;
    }
  }

  if (idx < args.size()) {
    idx++;
  }
  handleExtraArgs(args.subList(idx, args.size()));
}
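
For example, the eqSeparatedOpt regex is what lets both "--master yarn" and "--master=yarn" work. A quick sketch of how it splits the latter form:

import java.util.regex.Pattern

// Sketch: how "(--[^=]+)=(.+)" splits a "--key=value" argument into name and value.
object EqSeparatedOptSketch {
  def main(args: Array[String]): Unit = {
    val eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")
    val m = eqSeparatedOpt.matcher("--master=yarn")
    if (m.matches()) {
      println(m.group(1))  // --master
      println(m.group(2))  // yarn
    }
  }
}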
1.1.1.1.1 handle(name, null)
//This pattern match makes it obvious: find the matching option and assign its value to the corresponding field
override protected def handle(opt: String, value: String): Boolean = {
  opt match {
  // protected final String NAME = "--name";
    case NAME =>
      name = value
  // protected final String MASTER = "--master";
    case MASTER =>
      master = value
  // protected final String CLASS = "--class";
    case CLASS =>
      mainClass = value

    case NUM_EXECUTORS =>
      numExecutors = value

    case TOTAL_EXECUTOR_CORES =>
      totalExecutorCores = value

    case EXECUTOR_CORES =>
      executorCores = value

    case EXECUTOR_MEMORY =>
      executorMemory = value

    case DRIVER_MEMORY =>
      driverMemory = value

    case DRIVER_CORES =>
      driverCores = value

    case _ =>
      throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
  }
  true
}
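
In miniature, the whole mechanism is just "match on the option name, store the value in a field". A self-contained sketch of the same idea (field and option names chosen for illustration):

object HandleSketch {
  var master: String = _
  var mainClass: String = _

  // same shape as SparkSubmitArguments.handle: match the option, assign the field
  def handle(opt: String, value: String): Boolean = opt match {
    case "--master" => master = value; true
    case "--class"  => mainClass = value; true
    case _          => throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
  }

  def main(args: Array[String]): Unit = {
    handle("--master", "yarn")
    handle("--class", "org.apache.spark.examples.SparkPi")
    println(s"master=$master, mainClass=$mainClass")
  }
}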

1.1.2 loadEnvironmentArguments()

// Action should be SUBMIT unless otherwise specified
//On the first pass action is still null, so it is assigned SUBMIT
action = Option(action).getOrElse(SUBMIT)
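
A one-line illustration of why the first pass always lands on SUBMIT: Option wrapping a null yields None, so getOrElse falls back to the default.

// Sketch: Option(null) is None, so the default wins.
object ActionDefaultSketch {
  def main(args: Array[String]): Unit = {
    val action: String = null                    // no --kill / --status / --version given
    println(Option(action).getOrElse("SUBMIT"))  // prints SUBMIT
  }
}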

1.2 submit(appArgs, uninitLog)

runMain(args, uninitLog)

1.2.1 runMain(args, uninitLog), trimmed of unimportant logging

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // returns (childArgs, childClasspath, sparkConf, childMainClass)
    // in yarn-cluster mode, childMainClass => "org.apache.spark.deploy.yarn.YarnClusterApplication"
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

    // load the class by name
    var mainClass: Class[_] = Utils.classForName(childMainClass)

    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      // a) YarnClusterApplication implements SparkApplication, so it takes this branch
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      // b) an ordinary class with a main() gets wrapped
      new JavaMainApplication(mainClass)
    }

    // starts "org.apache.spark.deploy.yarn.YarnClusterApplication"
    app.start(childArgs.toArray, sparkConf)
}
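
The instantiation in branch a) is ordinary JDK reflection. Here is a standalone sketch of the same pattern, with a made-up DemoApplication trait and TrivialApp class standing in for SparkApplication and YarnClusterApplication:

// Sketch of the "load class by name, instantiate via no-arg constructor, cast, start" pattern.
trait DemoApplication {
  def start(args: Array[String]): Unit
}

class TrivialApp extends DemoApplication {
  override def start(args: Array[String]): Unit = println(s"started with args: ${args.mkString(", ")}")
}

object ReflectionSketch {
  def main(args: Array[String]): Unit = {
    val mainClass: Class[_] = Class.forName("TrivialApp")  // Spark uses Utils.classForName
    val app = mainClass.getConstructor().newInstance().asInstanceOf[DemoApplication]
    app.start(Array("10"))
  }
}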
1.2.1.1 prepareSubmitEnvironment(args), trimmed version
if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
}
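
In other words, the deploy mode decides what SparkSubmit actually runs: yarn-cluster mode hands control to Spark's YarnClusterApplication, while client mode runs the user's own class directly. A condensed sketch of that decision (constant inlined, only the yarn cases shown):

// Sketch: the essence of the childMainClass decision in prepareSubmitEnvironment.
object ChildMainClassSketch {
  val YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"

  def childMainClassFor(isYarnCluster: Boolean, userMainClass: String): String =
    if (isYarnCluster) YARN_CLUSTER_SUBMIT_CLASS
    else userMainClass  // client mode: the user's class runs in this JVM

  def main(args: Array[String]): Unit =
    println(childMainClassFor(isYarnCluster = true, "org.apache.spark.examples.SparkPi"))
}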
1.2.1.2 app.start(childArgs.toArray, sparkConf), trimmed version
override def start(args: Array[String], conf: SparkConf): Unit = {
  // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
  // so remove them from sparkConf here for yarn mode.
  conf.remove(JARS)
  conf.remove(FILES)

  // new ClientArguments(args) parses the arguments passed in; for --class => userClass = value => the user's application class
  // private val yarnClient = YarnClient.createYarnClient
    //  YarnClient client = new YarnClientImpl();
      //  protected ApplicationClientProtocol rmClient;  an RM-side protocol, i.e. this client talks to the ResourceManager
  // Now that the objects are clear, let's see what run() does
  new Client(new ClientArguments(args), conf, null).run()
}
1.2.1.2.1 Client.run(), trimmed version
def run(): Unit = {
  this.appId = submitApplication()
}

def submitApplication(): ApplicationId = {

    try {
      //establish the launcher connection
      launcherBackend.connect()
      yarnClient.init(hadoopConf)
      yarnClient.start()
       
       // Get a new application from our RM
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      
      // Set up the appropriate contexts to launch our AM
        // build the AM container launch context
          // commands = JAVA_HOME/bin/java org.apache.spark.deploy.yarn.ApplicationMaster 
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)

      // Finally, submit and monitor the application
      yarnClient.submitApplication(appContext)
      launcherBackend.setAppId(appId.toString)
      reportLauncherState(SparkAppHandle.State.SUBMITTED)

      appId
    } catch {
      ...
    }
  }
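
To make the RM interaction concrete, here is a bare-bones sketch of the same sequence written directly against the Hadoop YARN client API (the API that Spark's Client wraps). The AM command string is illustrative only; the real one is assembled by createContainerLaunchContext, and the resource sizes here are arbitrary:

import java.util.Collections

import org.apache.hadoop.yarn.api.records.{ApplicationSubmissionContext, ContainerLaunchContext, Resource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

// Sketch: createApplication -> build AM container context -> submitApplication,
// mirroring the steps in Client.submitApplication above.
object BareYarnSubmitSketch {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    // ask the ResourceManager for a new application id
    val newApp = yarnClient.createApplication()
    val appContext: ApplicationSubmissionContext = newApp.getApplicationSubmissionContext

    // describe the AM container: the command that launches the ApplicationMaster JVM
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setCommands(Collections.singletonList(
      "{{JAVA_HOME}}/bin/java org.apache.spark.deploy.yarn.ApplicationMaster ..."))

    val resource = Records.newRecord(classOf[Resource])
    resource.setMemorySize(1024)   // MB for the AM container (arbitrary here)
    resource.setVirtualCores(1)

    appContext.setApplicationName("yarn-submit-sketch")
    appContext.setAMContainerSpec(amContainer)
    appContext.setResource(resource)

    // finally, hand the application over to the ResourceManager
    val appId = yarnClient.submitApplication(appContext)
    println(s"submitted application $appId")
  }
}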