

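Preface

I originally set out to study the source code of Spark's submission flow, for example Spark on YARN and Standalone. Until then I had only a rough picture of the overall flow, pieced together from articles online. Each article described it a little differently, and I could not tell which one was accurate. Open questions included the difference between Client and Cluster mode, and what exactly the Driver does and how it is defined. To settle these questions for good, I decided to read the relevant source code. Both services and applications are launched through scripts, so we start with the scripts.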

Version

Spark 3.2.3

Spark Scripts

First, let's look at Spark's main scripts: spark-submit, spark-sql, spark-shell, spark-class, start-all.sh, stop-all.sh, start-master.sh, start-workers.sh, and so on.

spark-sql

```bash
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

export _SPARK_CMD_USAGE="Usage: ./bin/spark-sql [options] [cli option]"
exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"
```

It uses spark-submit to submit the class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.

spark-shell

```bash
# Shell script for starting the Spark Shell REPL

cygwin=false
case "$(uname)" in
  CYGWIN*) cygwin=true;;
esac

# Enter posix mode for bash
set -o posix

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

export _SPARK_CMD_USAGE="Usage: ./bin/spark-shell [options]

Scala REPL options:
  -I <file>                   preload <file>, enforcing line-by-line interpretation"

# SPARK-4161: scala does not assume use of the java classpath,
# so we need to add the "-Dscala.usejavacp=true" flag manually. We
# do this specifically for the Spark shell because the scala REPL
# has its own class loader, and any additional classpath specified
# through spark.driver.extraClassPath is not automatically propagated.
SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"

function main() {
  if $cygwin; then
    # Workaround for issue involving JLine and Cygwin
    # (see http://sourceforge.net/p/jline/bugs/40/).
    # If you're using the Mintty terminal emulator in Cygwin, may need to set the
    # "Backspace sends ^H" setting in "Keys" section of the Mintty options
    # (see https://github.com/sbt/sbt/issues/562).
    stty -icanon min 1 -echo > /dev/null 2>&1
    export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
    "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
    stty icanon echo > /dev/null 2>&1
  else
    export SPARK_SUBMIT_OPTS
    "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
  fi
}

# Copy restore-TTY-on-exit functions from Scala script so spark-shell exits properly even in
# binary distribution of Spark where Scala is not installed
exit_status=127
saved_stty=""

# restore stty settings (echo in particular)
function restoreSttySettings() {
  stty $saved_stty
  saved_stty=""
}

function onExit() {
  if [[ "$saved_stty" != "" ]]; then
    restoreSttySettings
  fi
  exit $exit_status
}

# to reenable echo if we are interrupted before completing.
trap onExit INT

# save terminal settings
saved_stty=$(stty -g 2>/dev/null)
# clear on error so we don't later try to restore them
if [[ ! $? ]]; then
  saved_stty=""
fi

main "$@"

# record the exit status lest it be overwritten:
# then reenable echo and propagate the code.
exit_status=$?
onExit
```

The main logic here is likewise to submit the class org.apache.spark.repl.Main via spark-submit.

spark-submit

From the analysis above, both interactive command-line scripts, spark-sql and spark-shell, are implemented as spark-submit --class ClassName.

```bash
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
```

The logic is straightforward: it runs org.apache.spark.deploy.SparkSubmit through spark-class. For spark-sql and spark-shell, this becomes:

```bash
/opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
/opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell"
```

start-all.sh

Purpose: start all Standalone services. For the related configuration, see the article "Spark Standalone Cluster Configuration".

```bash
# Start all spark daemons.
# Starts the master on this node.
# Starts a worker on each node specified in conf/workers

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# Load the Spark configuration
. "${SPARK_HOME}/sbin/spark-config.sh"

# Start Master
"${SPARK_HOME}/sbin"/start-master.sh

# Start Workers
"${SPARK_HOME}/sbin"/start-workers.sh
```

Main logic: start-master.sh starts the master, and start-workers.sh starts all the workers.

start-master.sh

```bash
# Starts the master on the machine this script is executed on.

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.spark.deploy.master.Master"

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  echo "Usage: ./sbin/start-master.sh [options]"
  pattern="Usage:"
  pattern+="\|Using Spark's default log4j profile:"
  pattern+="\|Started daemon with process name"
  pattern+="\|Registered signal handler for"

  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
  exit 1
fi

ORIGINAL_ARGS="$@"

. "${SPARK_HOME}/sbin/spark-config.sh"

. "${SPARK_HOME}/bin/load-spark-env.sh"

if [ "$SPARK_MASTER_PORT" = "" ]; then
  SPARK_MASTER_PORT=7077
fi

if [ "$SPARK_MASTER_HOST" = "" ]; then
  case `uname` in
      (SunOS)
          SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`"
          ;;
      (*)
          SPARK_MASTER_HOST="`hostname -f`"
          ;;
  esac
fi

if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
  SPARK_MASTER_WEBUI_PORT=8080
fi

"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
  --host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
  $ORIGINAL_ARGS
```

Main logic: spark-daemon.sh launches the class org.apache.spark.deploy.master.Master.

The concrete command: /opt/dkl/spark-3.2.3-bin-hadoop3.2/sbin/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --host indata-10-110-8-199.indata.com --port 7077 --webui-port 8080

start-workers.sh

```bash
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

. "${SPARK_HOME}/sbin/spark-config.sh"
. "${SPARK_HOME}/bin/load-spark-env.sh"

# Find the port number for the master
if [ "$SPARK_MASTER_PORT" = "" ]; then
  SPARK_MASTER_PORT=7077
fi

if [ "$SPARK_MASTER_HOST" = "" ]; then
  case `uname` in
      (SunOS)
          SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`"
          ;;
      (*)
          SPARK_MASTER_HOST="`hostname -f`"
          ;;
  esac
fi

# Launch the workers
"${SPARK_HOME}/sbin/workers.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-worker.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
```

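The script first resolves the master host and port. It then calls workers.sh with the arguments cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-worker.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT". The escaped \; is passed through as a literal argument, so it is the remote shell that treats it as a command separator. Concretely: cd /opt/dkl/spark-3.2.3-bin-hadoop3.2 ; /opt/dkl/spark-3.2.3-bin-hadoop3.2/sbin/start-worker.sh spark://indata-10-110-8-199.indata.com:7077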
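Here is a minimal bash sketch of how the master URL is assembled. It mirrors the defaults in start-master.sh and start-workers.sh: SPARK_MASTER_PORT falls back to 7077 and SPARK_MASTER_HOST to hostname -f. The function name resolve_master_url is hypothetical, and the SunOS branch is left out.

```bash
#!/usr/bin/env bash
# Hypothetical helper mirroring the defaults in start-master.sh / start-workers.sh:
# use SPARK_MASTER_PORT / SPARK_MASTER_HOST when non-empty, otherwise fall back to
# port 7077 and the fully qualified hostname (the SunOS branch is omitted).
resolve_master_url() {
  local port="${SPARK_MASTER_PORT:-7077}"
  local host="${SPARK_MASTER_HOST:-}"
  if [ -z "$host" ]; then
    host="$(hostname -f)"
  fi
  echo "spark://$host:$port"
}
```

For example, with SPARK_MASTER_PORT unset, SPARK_MASTER_HOST=indata-10-110-8-199.indata.com resolve_master_url prints spark://indata-10-110-8-199.indata.com:7077. That is the argument start-worker.sh receives above.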

workers.sh

```bash
# Run a shell command on all worker hosts.
#
# Environment Variables
#
#   SPARK_WORKERS    File naming remote hosts.
#     Default is ${SPARK_CONF_DIR}/workers.
#   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_HOME}/conf.
#   SPARK_WORKER_SLEEP Seconds to sleep between spawning remote commands.
#   SPARK_SSH_OPTS Options passed to ssh when running remote commands.
##

usage="Usage: workers.sh [--config <conf-dir>] command..."

# if no args specified, show usage
if [ $# -le 0 ]; then
  echo $usage
  exit 1
fi

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

. "${SPARK_HOME}/sbin/spark-config.sh"

# If the workers file is specified in the command line,
# then it takes precedence over the definition in
# spark-env.sh. Save it here.
if [ -f "$SPARK_WORKERS" ]; then
  HOSTLIST=`cat "$SPARK_WORKERS"`
fi
if [ -f "$SPARK_SLAVES" ]; then
  >&2 echo "SPARK_SLAVES is deprecated, use SPARK_WORKERS"
  HOSTLIST=`cat "$SPARK_SLAVES"`
fi

# Check if --config is passed as an argument. It is an optional parameter.
# Exit if the argument is not a directory.
if [ "$1" == "--config" ]
then
  shift
  conf_dir="$1"
  if [ ! -d "$conf_dir" ]
  then
    echo "ERROR : $conf_dir is not a directory"
    echo $usage
    exit 1
  else
    export SPARK_CONF_DIR="$conf_dir"
  fi
  shift
fi

. "${SPARK_HOME}/bin/load-spark-env.sh"

if [ "$HOSTLIST" = "" ]; then
  if [ "$SPARK_SLAVES" = "" ] && [ "$SPARK_WORKERS" = "" ]; then
    if [ -f "${SPARK_CONF_DIR}/workers" ]; then
      HOSTLIST=`cat "${SPARK_CONF_DIR}/workers"`
    elif [ -f "${SPARK_CONF_DIR}/slaves" ]; then
      HOSTLIST=`cat "${SPARK_CONF_DIR}/slaves"`
    else
      HOSTLIST=localhost
    fi
  else
    if [ -f "$SPARK_WORKERS" ]; then
      HOSTLIST=`cat "$SPARK_WORKERS"`
    fi
    if [ -f "$SPARK_SLAVES" ]; then
      >&2 echo "SPARK_SLAVES is deprecated, use SPARK_WORKERS"
      HOSTLIST=`cat "$SPARK_SLAVES"`
    fi
  fi
fi

# By default disable strict host key checking
if [ "$SPARK_SSH_OPTS" = "" ]; then
  SPARK_SSH_OPTS="-o StrictHostKeyChecking=no"
fi

# sed deletes everything from '#' to the end of each host line and drops empty lines.
# Iterate over HOSTLIST, ssh to each host, and run the arguments passed in from start-workers.sh.
# Note: $@ expands to all arguments passed to the script or function.
for host in `echo "$HOSTLIST"|sed  "s/#.*$//;/^$/d"`; do
  if [ -n "${SPARK_SSH_FOREGROUND}" ]; then
    ssh $SPARK_SSH_OPTS "$host" $"${@// /\\ }" \
      2>&1 | sed "s/^/$host: /"
  else
    ssh $SPARK_SSH_OPTS "$host" $"${@// /\\ }" \
      2>&1 | sed "s/^/$host: /" &
  fi
  if [ "$SPARK_WORKER_SLEEP" != "" ]; then
    sleep $SPARK_WORKER_SLEEP
  fi
  if [ "$SPARK_SLAVE_SLEEP" != "" ]; then
    >&2 echo "SPARK_SLAVE_SLEEP is deprecated, use SPARK_WORKER_SLEEP"
    sleep $SPARK_SLAVE_SLEEP
  fi
done

wait
```

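Main logic:

First build HOSTLIST. The script looks for hosts in this order:
1. The file named by $SPARK_WORKERS, or by the deprecated $SPARK_SLAVES. The SPARK_SLAVES file is read second, so it wins if both are set.
2. ${SPARK_CONF_DIR}/workers.
3. ${SPARK_CONF_DIR}/slaves.

Usually we list the worker IPs or hostnames in conf/workers (the Spark 3 default) or conf/slaves (the Spark 2 default). If neither exists, HOSTLIST defaults to localhost.

Determine SPARK_SSH_OPTS, which defaults to "-o StrictHostKeyChecking=no". You can override it in spark-env.sh for special requirements. For example, if the SSH port is not the default 22, add export SPARK_SSH_OPTS="-p 6233 -o StrictHostKeyChecking=no".

Iterate over HOSTLIST. sed first strips everything from # to the end of each line and drops empty lines. For each remaining host, the script connects with ssh and runs the arguments passed in by start-workers.sh: cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-worker.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT". Note: $@ expands to all arguments passed to the script or function.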
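The host handling can be tried out without a cluster. The minimal bash sketch below assumes a local workers file. list_hosts and dry_run_workers are hypothetical helper names, and dry_run_workers only prints the ssh command line instead of running ssh. The sketch keeps the same sed expression, the localhost fallback, and the default SPARK_SSH_OPTS. It skips the SPARK_WORKERS/SPARK_SLAVES variables and the per-argument space escaping.

```bash
#!/usr/bin/env bash
# Hypothetical helper: turn a workers file into one host per line, the way workers.sh does.
list_hosts() {
  local hostlist host
  if [ -f "$1" ]; then
    hostlist="$(cat "$1")"
  else
    hostlist=localhost   # same fallback workers.sh uses when no workers/slaves file exists
  fi
  # Strip '#' comments and empty lines; the unquoted expansion also drops leftover spaces
  for host in $(echo "$hostlist" | sed "s/#.*$//;/^$/d"); do
    echo "$host"
  done
}

# Hypothetical dry run: print the ssh command workers.sh would run for each host
dry_run_workers() {
  local workers_file="$1"
  shift
  local ssh_opts="${SPARK_SSH_OPTS:--o StrictHostKeyChecking=no}"
  local host
  for host in $(list_hosts "$workers_file"); do
    echo "ssh $ssh_opts $host $*"
  done
}
```

Comment lines and trailing comments in the workers file are therefore harmless. Only the first word left on each line is used as a host.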

start-worker.sh

```bash
# Starts a worker on the machine this script is executed on.
#
# Environment Variables
#
#   SPARK_WORKER_INSTANCES  The number of worker instances to run on this
#                           worker.  Default is 1. Note it has been deprecate since Spark 3.0.
#   SPARK_WORKER_PORT       The base port number for the first worker. If set,
#                           subsequent workers will increment this number.  If
#                           unset, Spark will find a valid port number, but
#                           with no guarantee of a predictable pattern.
#   SPARK_WORKER_WEBUI_PORT The base port for the web interface of the first
#                           worker.  Subsequent workers will increment this
#                           number.  Default is 8081.

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.spark.deploy.worker.Worker"

if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  echo "Usage: ./sbin/start-worker.sh <master> [options]"
  pattern="Usage:"
  pattern+="\|Using Spark's default log4j profile:"
  pattern+="\|Started daemon with process name"
  pattern+="\|Registered signal handler for"

  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
  exit 1
fi

. "${SPARK_HOME}/sbin/spark-config.sh"

. "${SPARK_HOME}/bin/load-spark-env.sh"

# First argument should be the master; we need to store it aside because we may
# need to insert arguments between it and the other arguments
MASTER=$1
shift

# Determine desired worker port
if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
  SPARK_WORKER_WEBUI_PORT=8081
fi

# Start up the appropriate number of workers on this machine.
# quick local function to start a worker
function start_instance {
  WORKER_NUM=$1
  shift

  if [ "$SPARK_WORKER_PORT" = "" ]; then
    PORT_FLAG=
    PORT_NUM=
  else
    PORT_FLAG="--port"
    PORT_NUM=$(( $SPARK_WORKER_PORT + $WORKER_NUM - 1 ))
  fi
  WEBUI_PORT=$(( $SPARK_WORKER_WEBUI_PORT + $WORKER_NUM - 1 ))

  "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
     --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"
}

if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
  start_instance 1 "$@"
else
  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
    start_instance $(( 1 + $i )) "$@"
  done
fi
```

Main logic:

Get MASTER, which here is spark://indata-10-110-8-199.indata.com:7077.

Check whether SPARK_WORKER_INSTANCES is empty. It is empty by default, which means a single Worker instance.

Call start_instance 1 "$@". Its main job is to compute WEBUI_PORT for each instance, plus PORT_NUM when SPARK_WORKER_PORT is set. It finally runs "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@".

The concrete command: /opt/dkl/spark-3.2.3-bin-hadoop3.2/sbin/spark-daemon.sh start org.apache.spark.deploy.worker.Worker 1 --webui-port 8081 spark://indata-10-110-8-199.indata.com:7077
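To see what SPARK_WORKER_INSTANCES and SPARK_WORKER_PORT would change, here is a bash sketch of the port arithmetic in start_instance. worker_launch_args and launch_all_workers are hypothetical names. Instead of calling spark-daemon.sh, they print the arguments it would receive.

```bash
#!/usr/bin/env bash
# Port arithmetic from start_instance (start-worker.sh): instance N gets
# web UI port = base + N - 1 and, only if SPARK_WORKER_PORT is set, port = SPARK_WORKER_PORT + N - 1.
# Prints the spark-daemon.sh arguments instead of launching anything.
worker_launch_args() {
  local worker_num="$1" master="$2"
  local webui_base="${SPARK_WORKER_WEBUI_PORT:-8081}"
  local port_args=""
  if [ -n "${SPARK_WORKER_PORT:-}" ]; then
    port_args=" --port $(( SPARK_WORKER_PORT + worker_num - 1 ))"
  fi
  echo "start org.apache.spark.deploy.worker.Worker $worker_num --webui-port $(( webui_base + worker_num - 1 ))$port_args $master"
}

# Mirrors the loop at the end of start-worker.sh
launch_all_workers() {
  local master="$1" i
  if [ -z "${SPARK_WORKER_INSTANCES:-}" ]; then
    worker_launch_args 1 "$master"
  else
    for ((i = 0; i < SPARK_WORKER_INSTANCES; i++)); do
      worker_launch_args $(( 1 + i )) "$master"
    done
  fi
}
```

Take SPARK_WORKER_INSTANCES=2 and SPARK_WORKER_PORT=7078 as an example. The two instances get web UI ports 8081/8082, RPC ports 7078/7079, and instance numbers 1/2, so their log and pid files do not collide.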

spark-daemon.sh

From the analysis above, both the master and the worker are started through spark-daemon.sh.

```bash
# Runs a Spark command as a daemon.
#
# Environment Variables
#
#   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_HOME}/conf.
#   SPARK_LOG_DIR   Where log files are stored. ${SPARK_HOME}/logs by default.
#   SPARK_LOG_MAX_FILES Max log files of Spark daemons can rotate to. Default is 5.
#   SPARK_MASTER    host:path where spark code should be rsync'd from
#   SPARK_PID_DIR   The pid files are stored. /tmp by default.
#   SPARK_IDENT_STRING   A string representing this instance of spark. $USER by default
#   SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
#   SPARK_NO_DAEMONIZE   If set, will run the proposed command in the foreground. It will not output a PID file.
##

usage="Usage: spark-daemon.sh [--config <conf-dir>] (start|stop|submit|status) <spark-command> <spark-instance-number> <args...>"

# if no args specified, show usage
if [ $# -le 1 ]; then
  echo $usage
  exit 1
fi

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

. "${SPARK_HOME}/sbin/spark-config.sh"

# get arguments

# Check if --config is passed as an argument. It is an optional parameter.
# Exit if the argument is not a directory.
# If --config is given, set SPARK_CONF_DIR to its value (default: ${SPARK_HOME}/conf)
if [ "$1" == "--config" ]
then
  shift
  conf_dir="$1"
  if [ ! -d "$conf_dir" ]
  then
    echo "ERROR : $conf_dir is not a directory"
    echo $usage
    exit 1
  else
    export SPARK_CONF_DIR="$conf_dir"
  fi
  shift
fi

# option: one of start|stop|submit|status; start-master and start-worker both pass "start"
option=$1
shift
# command: the concrete class, org.apache.spark.deploy.master.Master for start-master
# and org.apache.spark.deploy.worker.Worker for start-worker
command=$1
shift
# instance: 1 in both cases. It is the instance number passed in by the caller
# and is used to name the log and pid files below.
instance=$1
shift

spark_rotate_log ()
{
    log=$1;

    if [[ -z ${SPARK_LOG_MAX_FILES} ]]; then
      num=5
    elif [[ ${SPARK_LOG_MAX_FILES} -gt 0 ]]; then
      num=${SPARK_LOG_MAX_FILES}
    else
      echo "Error: SPARK_LOG_MAX_FILES must be a positive number, but got ${SPARK_LOG_MAX_FILES}"
      exit -1
    fi

    if [ -f "$log" ]; then # rotate logs
        while [ $num -gt 1 ]; do
            prev=`expr $num - 1`
            [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
            num=$prev
        done
        mv "$log" "$log.$num";
    fi
}

. "${SPARK_HOME}/bin/load-spark-env.sh"

if [ "$SPARK_IDENT_STRING" = "" ]; then
  export SPARK_IDENT_STRING="$USER"
fi

export SPARK_PRINT_LAUNCH_COMMAND="1"

# get log directory
if [ "$SPARK_LOG_DIR" = "" ]; then
  export SPARK_LOG_DIR="${SPARK_HOME}/logs"
fi
mkdir -p "$SPARK_LOG_DIR"
touch "$SPARK_LOG_DIR"/.spark_test > /dev/null 2>&1
TEST_LOG_DIR=$?
if [ "${TEST_LOG_DIR}" = "0" ]; then
  rm -f "$SPARK_LOG_DIR"/.spark_test
else
  chown "$SPARK_IDENT_STRING" "$SPARK_LOG_DIR"
fi

if [ "$SPARK_PID_DIR" = "" ]; then
  SPARK_PID_DIR=/tmp
fi

# some variables
# Log and pid file paths: logs go to ${SPARK_HOME}/logs and pid files to /tmp by default
log="$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out"
pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid"

# Set default scheduling priority
# SPARK_NICENESS is the scheduling priority of the daemon; it defaults to 0 when empty.
# Note that SPARK_NICENESS has nothing to do with the instance number above.
if [ "$SPARK_NICENESS" = "" ]; then
    export SPARK_NICENESS=0
fi

execute_command() {
  # -z tests whether ${SPARK_NO_DAEMONIZE+set} is empty, which it is here (the variable is unset)
  if [ -z ${SPARK_NO_DAEMONIZE+set} ]; then
      # Run "$@" (all arguments) in the background and append its output to the log file.
      # Here "$@" is nice -n 0 .../spark-class <Master or Worker class> ...: nice sets the
      # process priority, and spark-class launches the Master or Worker class.
      nohup -- "$@" >> $log 2>&1 < /dev/null &
      # $! is the PID of the most recently started background process; keep it as newpid
      newpid="$!"

      # Write newpid to the pid file
      echo "$newpid" > "$pid"

      # Poll for up to 5 seconds for the java process to start
      # (10 iterations of 0.5 s each)
      for i in {1..10}
      do
        # Check whether the process newpid is running java yet
        if [[ $(ps -p "$newpid" -o comm=) =~ "java" ]]; then
           # If so, stop polling
           break
        fi
        # Otherwise sleep 0.5 s and check again
        sleep 0.5
      done

      # Then wait another 2 seconds
      sleep 2
      # Check if the process has died; in that case we'll tail the log so the user can see
      # If the java process is gone, report the failure and print the tail of the log
      if [[ ! $(ps -p "$newpid" -o comm=) =~ "java" ]]; then
        echo "failed to launch: $@"
        tail -10 "$log" | sed 's/^/  /'
        echo "full log in $log"
      fi
  else
      "$@"
  fi
}

run_command() {
  # mode is "class" here
  mode="$1"
  shift

  # Create the pid directory
  mkdir -p "$SPARK_PID_DIR"

  # Check whether the pid file exists
  if [ -f "$pid" ]; then
    # If it does, read the pid
    TARGET_ID="$(cat "$pid")"
    # and check whether a java process with that pid is running
    if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
      # If so, the service is already running: tell the user to stop it first
      echo "$command running as process $TARGET_ID. Stop it first."
      exit 1
    fi
  fi

  if [ "$SPARK_MASTER" != "" ]; then
    echo rsync from "$SPARK_MASTER"
    rsync -a -e ssh --delete --exclude=.svn --exclude='logs/*' --exclude='contrib/hod/logs/*' "$SPARK_MASTER/" "${SPARK_HOME}"
  fi

  spark_rotate_log "$log"
  echo "starting $command, logging to $log"

  # Dispatch on mode
  case "$mode" in
    # mode is class
    (class)
      # nice -n "$SPARK_NICENESS" sets the process priority, which typically ranges from
      # -20 (highest) to +19 (lowest); the default nice value is 0.
      # See https://www.cnblogs.com/yinguojin/p/18600924
      execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class "$command" "$@"
      ;;

    (submit)
      execute_command nice -n "$SPARK_NICENESS" bash "${SPARK_HOME}"/bin/spark-submit --class "$command" "$@"
      ;;

    (*)
      echo "unknown mode: $mode"
      exit 1
      ;;
  esac

}

# Dispatch on option
case $option in

  # submit: run run_command submit "$@"
  (submit)
    run_command submit "$@"
    ;;

  # start: run run_command class "$@"
  (start)
    run_command class "$@"
    ;;

  (stop)

    if [ -f $pid ]; then
      TARGET_ID="$(cat "$pid")"
      if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
        echo "stopping $command"
        kill "$TARGET_ID" && rm -f "$pid"
      else
        echo "no $command to stop"
      fi
    else
      echo "no $command to stop"
    fi
    ;;

  (decommission)

    if [ -f $pid ]; then
      TARGET_ID="$(cat "$pid")"
      if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
        echo "decommissioning $command"
        kill -s SIGPWR "$TARGET_ID"
      else
        echo "no $command to decommission"
      fi
    else
      echo "no $command to decommission"
    fi
    ;;

  (status)

    if [ -f $pid ]; then
      TARGET_ID="$(cat "$pid")"
      if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
        echo $command is running.
        exit 0
      else
        echo $pid file is present but $command not running
        exit 1
      fi
    else
      echo $command not running.
      exit 2
    fi
    ;;

  (*)
    echo $usage
    exit 1
    ;;

esac
```

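Main logic:

Check for a --config argument. If present, set SPARK_CONF_DIR to its value (the default is ${SPARK_HOME}/conf).

Get option, one of start|stop|submit|status (the case statement also handles decommission). start-master and start-worker both pass start.

Get command, which is the concrete class: org.apache.spark.deploy.master.Master for start-master and org.apache.spark.deploy.worker.Worker for start-worker.

Get instance, which is 1 in both cases. It is the instance number passed in by the caller and is used to name the log and pid files.

Set the paths and names of the log and pid files. By default, logs go to ${SPARK_HOME}/logs and pid files to /tmp.

Set SPARK_NICENESS, the scheduling priority of the daemon. If it is empty, it defaults to 0.

Dispatch on option: for submit, run run_command submit "$@"; for start, run run_command class "$@"; and so on. Here we only look at start.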
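The log and pid file names are pure string assembly, so they are easy to reproduce. Here is a minimal bash sketch that assumes the same defaults as spark-daemon.sh:
- SPARK_IDENT_STRING falls back to $USER.
- Logs go to ${SPARK_HOME}/logs.
- Pid files go to /tmp.

daemon_file_names is a hypothetical name.

```bash
#!/usr/bin/env bash
# Rebuilds the log/pid file names used by spark-daemon.sh from the identity string,
# the class name ($command) and the instance number.
daemon_file_names() {
  local command="$1" instance="$2"
  local ident="${SPARK_IDENT_STRING:-$USER}"
  local log_dir="${SPARK_LOG_DIR:-${SPARK_HOME}/logs}"
  local pid_dir="${SPARK_PID_DIR:-/tmp}"
  echo "log=$log_dir/spark-$ident-$command-$instance-$HOSTNAME.out"
  echo "pid=$pid_dir/spark-$ident-$command-$instance.pid"
}
```

The pid file name contains the class and the instance number. This is what lets run_command refuse to start a second Master with the same instance number on the same machine.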

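run_command logic:

First get mode, which is class here.

Create the pid directory and check whether the pid file exists. If it does, read the pid and check whether a matching java process is running. If one is, report that the service is already running and must be stopped first, then exit.

Otherwise, rsync ${SPARK_HOME} from $SPARK_MASTER if that variable is set, rotate the log, and dispatch on mode. For class, it runs execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class "$command" "$@".

execute_command is a function defined in this script. nice -n "$SPARK_NICENESS" sets the process priority, which typically ranges from -20 (highest priority) to +19 (lowest priority); the default nice value is 0. See https://www.cnblogs.com/yinguojin/p/18600924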
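Before every start, run_command also calls spark_rotate_log. The function below is that rotation logic lifted out of spark-daemon.sh so it can be run on a scratch directory. The changes are small:
- The name rotate_log is ours.
- It uses local variables and $(( )) instead of expr.
- It returns 1 instead of calling exit -1 on an invalid SPARK_LOG_MAX_FILES.

```bash
#!/usr/bin/env bash
# Rotation logic of spark_rotate_log (spark-daemon.sh), adapted to return instead of exit.
rotate_log() {
  local log="$1" num prev
  if [[ -z ${SPARK_LOG_MAX_FILES} ]]; then
    num=5
  elif [[ ${SPARK_LOG_MAX_FILES} -gt 0 ]]; then
    num=${SPARK_LOG_MAX_FILES}
  else
    echo "Error: SPARK_LOG_MAX_FILES must be a positive number, but got ${SPARK_LOG_MAX_FILES}" >&2
    return 1
  fi
  if [ -f "$log" ]; then
    # Shift log.(num-1) -> log.num, ..., log.1 -> log.2; the oldest copy is overwritten
    while [ "$num" -gt 1 ]; do
      prev=$(( num - 1 ))
      [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
      num=$prev
    done
    # num is now 1: the current log becomes log.1
    mv "$log" "$log.$num"
  fi
}
```

Starting the same daemon repeatedly therefore leaves the most recent previous log in .out.1. At most SPARK_LOG_MAX_FILES (default 5) rotated copies are kept.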

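execute_command logic:

Use -z to test whether ${SPARK_NO_DAEMONIZE+set} is empty. This expansion yields set whenever SPARK_NO_DAEMONIZE is defined, even as an empty string, and nothing otherwise. By default the variable is unset, so the test is true and the daemon branch runs.

Run nohup -- "$@" >> $log 2>&1 < /dev/null &. This runs "$@" (all of the function's arguments) in the background and appends its output to the log file. For the master and the worker, "$@" is respectively:

nice -n 0 /opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.master.Master --host indata-10-110-8-199.indata.com --port 7077 --webui-port 8080

nice -n 0 /opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://indata-10-110-8-199.indata.com:7077

In other words, nice -n 0 sets the process priority, and spark-class then launches the Master or Worker class.

Assign the PID of that background process to newpid ($! is the PID of the most recently started background process), then write newpid to the pid file.

Loop i from 1 to 10, checking each time whether the process newpid is a java process yet. If it is, break out of the loop; otherwise sleep 0.5 s and check again. In other words, poll for up to 5 seconds for the java process to start.

After the loop, whether or not java was detected, sleep another 2 seconds. Then check whether the java process still exists. If it does not, report that the launch failed and print the last 10 lines of the log.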
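The daemonizing pattern is not specific to Spark. The bash sketch below reproduces execute_command for an arbitrary command, under a few stated assumptions:
- start_daemon is a hypothetical name.
- Liveness is checked with kill -0 instead of matching java in ps -p ... -o comm=.
- The poll count and grace period are parameters. The real script uses 10 polls of 0.5 s and a 2 s grace period.
- It returns 1 after reporting a failure, which the original does not do.

```bash
#!/usr/bin/env bash
# Sketch of execute_command: start_daemon LOG PIDFILE POLLS GRACE_SECONDS COMMAND...
start_daemon() {
  local log="$1" pidfile="$2" polls="$3" grace="$4"
  shift 4
  if [ -z "${SPARK_NO_DAEMONIZE+set}" ]; then
    # Background the command, stdin from /dev/null, stdout/stderr appended to the log
    nohup -- "$@" >> "$log" 2>&1 < /dev/null &
    local newpid="$!"
    echo "$newpid" > "$pidfile"
    local i
    # Poll until the process is visible (kill -0 sends no signal, it only checks existence)
    for ((i = 0; i < polls; i++)); do
      kill -0 "$newpid" 2>/dev/null && break
      sleep 0.5
    done
    sleep "$grace"
    # If it died during the grace period, show the tail of the log
    if ! kill -0 "$newpid" 2>/dev/null; then
      echo "failed to launch: $*"
      tail -10 "$log" | sed 's/^/  /'
      echo "full log in $log"
      return 1
    fi
  else
    # SPARK_NO_DAEMONIZE is set (even to ""): run in the foreground, no pid file
    "$@"
  fi
}
```

For example, start_daemon "$log" "$pid" 10 2 nice -n 0 sleep 60 follows the same timing as spark-daemon.sh.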

spark-class

From the analysis above, spark-sql, spark-shell, the Master, and the Worker are all ultimately launched through spark-class, specifically:

```bash
/opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
/opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell"
nice -n 0 /opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.master.Master --host indata-10-110-8-199.indata.com --port 7077 --webui-port 8080
nice -n 0 /opt/dkl/spark-3.2.3-bin-hadoop3.2/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://indata-10-110-8-199.indata.com:7077
```

```bash
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
# Use ${JAVA_HOME}/bin/java if JAVA_HOME is set, otherwise java from the PATH
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
build_command() {
  # Run org.apache.spark.launcher.Main "$@" via java -cp; it prints the final command:
  # first '\0' followed by a newline, then each argument followed by '\0', with no trailing newline.
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  # Print the exit status of the command above followed by '\0' (no newline)
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
set +o posix
# The CMD array
CMD=()
# Delimiter passed to read -d; it starts as a newline
DELIM=$'\n'
# Whether we have started collecting the command; false initially
CMD_START_FLAG="false"
# read -r: backslashes are taken literally instead of acting as escape or line-continuation characters.
# Run build_command "$@" and parse its output
# (for the Bash read command, see https://blog.csdn.net/lingeio/article/details/96587362):
# consume the first line, then set CMD_START_FLAG to true and split the rest on NUL,
# appending each argument to the CMD array.
while IFS= read -d "$DELIM" -r ARG; do
  # Once CMD_START_FLAG is true, append each argument to CMD
  if [ "$CMD_START_FLAG" == "true" ]; then
    CMD+=("$ARG")
  else
    # The delimiter is still $'\n' here, so this reads the '\0' line printed first
    if [ "$ARG" == $'\0' ]; then
      # After NULL character is consumed, change the delimiter and consume command string.
      # DELIM='' makes read -d split on NUL; CMD_START_FLAG=true starts collecting the command
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")

# Length of the CMD array
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
# The last element of CMD is the launcher's exit code
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
# If LAUNCHER_EXIT_CODE is not a valid, zero exit code, handle the error and exit
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

# Take LAST elements starting at index 0, i.e. drop the trailing exit code
# (for bash array slicing, see https://blog.csdn.net/trayvontang/article/details/143440654)
CMD=("${CMD[@]:0:$LAST}")
# Everything checked out: exec the command printed by org.apache.spark.launcher.Main
exec "${CMD[@]}"
```

Main logic:

Find java. If JAVA_HOME is set, use ${JAVA_HOME}/bin/java; otherwise fall back to java on the PATH.

Run build_command "$@" and process its output. The underlying command is "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@", specifically:

spark-sql: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -Xmx128m -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver

spark-shell: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -Xmx128m -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell"

start-master: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -Xmx128m -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.master.Master --host indata-10-110-8-199.indata.com --port 7077 --webui-port 8080

start-worker: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -Xmx128m -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://indata-10-110-8-199.indata.com:7077

org.apache.spark.launcher.Main main logic: assemble the final command from the given arguments and print it:

```java
System.out.println('\0');

List<String> bashCmd = prepareBashCommand(cmd, env);
for (String c : bashCmd) {
  System.out.print(c);
  System.out.print('\0');
}
```

This is the printing code. It first prints '\0' followed by a newline. It then prints the assembled command, with each argument followed by '\0' and no trailing newline. The printed commands are:

spark-sql:

```
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java-cp/opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/*-Xmx1gorg.apache.spark.deploy.SparkSubmit--classorg.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriverspark-internal
```

The output has two lines:
- The first line contains only the '\0' character, which displays as nothing.
- The second line is the assembled command, with every argument followed by '\0'. Because '\0' is invisible, the arguments appear run together with no spaces, and this line does not end with a newline.

The first line is the same for the other commands, so only the second line is shown below.

spark-shell:

```
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java-cp/opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/*-Dscala.usejavacp=true-Xmx1gorg.apache.spark.deploy.SparkSubmit--classorg.apache.spark.repl.Main--nameSpark shellspark-shell
```

start-master:

```
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java-cp/opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/*-Xmx1gorg.apache.spark.deploy.master.Master--hostindata-10-110-8-199.indata.com--port7077--webui-port8080
```

start-worker:

```
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java-cp/opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/*-Xmx1gorg.apache.spark.deploy.worker.Worker--webui-port8081spark://indata-10-110-8-199.indata.com:7077
```

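Finally, build_command runs printf "%d\0" $?, which prints $? followed by \0. $? is a special variable that holds the exit status of the previous command:

0: the command succeeded.

Any non-zero value: the command failed. Common values include:

1: general error; something went wrong, but no specific error information is given.

2: misuse of shell builtins.

126: the command was found but cannot be executed.

127: command not found.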

So the final output of build_command, taking spark-sql as an example, is the following, followed by a trailing \0:

```
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java-cp/opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/*-Xmx1gorg.apache.spark.deploy.SparkSubmit--classorg.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriverspark-internal0
```

If this output is written to a log file and opened in vi, each \0 is displayed as ^@:

```
^@
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java^@-cp^@/opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/*^@-Xmx1g^@org.apache.spark.deploy.SparkSubmit^@--class^@org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver^@spark-internal^@0^@
```

This makes the shape of build_command's output much easier to see, which helps with the rest of the script. Replacing ^@ with spaces gives:

```
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver spark-internal 0
```

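A final word on printf "%d\0" $?. %d formats an integer, so this line prints the exit status of the previous command followed by \0, with no newline. Testing also shows that if no number follows %d, it defaults to 0. You can check how \0 ends up in a file by running printf "\0" > test.log: vi test.log shows it as ^@, whereas cat test.log shows nothing visible.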

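The following while IFS= read -d "$DELIM" -r ARG; loop uses the read command to consume the output of build_command "$@". For the Bash read command, see https://blog.csdn.net/lingeio/article/details/96587362. The loop parses the command printed by the launcher into the CMD array in three steps:
1. DELIM starts as a newline, so read first consumes the '\0' line. Bash variables cannot hold NUL characters, so ARG comes back empty, which equals $'\0' (also an empty string in bash).
2. The loop then sets DELIM to '' and CMD_START_FLAG to true. With DELIM set to '', read -d '' splits on NUL characters.
3. Each remaining NUL-terminated argument is appended to the CMD array.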

Once the CMD array is assembled, the script takes its length and uses its last element as LAUNCHER_EXIT_CODE. This is the exit status of the "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@" command run in build_command.

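Check LAUNCHER_EXIT_CODE:
- If it is not an integer (some JVM failures print errors to stdout), print the captured output to stderr and exit with 1.
- If it is non-zero, exit with that code.
- Otherwise, drop the last element and exec the command returned by org.apache.spark.launcher.Main.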
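The whole NUL-delimited protocol between spark-class and the launcher can be reproduced without Java, using the bash sketch below.

fake_launcher is a hypothetical stand-in for java ... org.apache.spark.launcher.Main. It prints output in the same shape: a '\0' line, then each argument followed by '\0'. The argument values are illustrative only. FAKE_EXIT is a hypothetical knob that simulates a failing launcher.

parse_launcher_output uses the same read loop and exit-code checks as spark-class, with two differences. It leaves the command in CMD instead of exec-ing it, and it skips the head -n-1 error dump.

```bash
#!/usr/bin/env bash
# Hypothetical stand-in for the Java launcher: '\0' + newline, then NUL-terminated arguments.
fake_launcher() {
  printf '\0\n'
  printf '%s\0' java -cp '/opt/spark/conf/:/opt/spark/jars/*' -Xmx1g \
    org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main \
    --name 'Spark shell' spark-shell
  return "${FAKE_EXIT:-0}"
}

# Same shape as spark-class's build_command: launcher output, then "<exit code>\0"
build_command() {
  fake_launcher "$@"
  printf "%d\0" $?
}

# Same parsing loop and checks as spark-class, but the result stays in the global CMD
parse_launcher_output() {
  CMD=()
  local DELIM=$'\n' CMD_START_FLAG="false" ARG
  while IFS= read -d "$DELIM" -r ARG; do
    if [ "$CMD_START_FLAG" == "true" ]; then
      CMD+=("$ARG")
    elif [ "$ARG" == $'\0' ]; then
      # First line: bash cannot store NUL, so both sides are empty strings
      DELIM=''            # from now on, read -d '' splits on NUL
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"         # any other text before the NUL line is passed through
    fi
  done < <(build_command "$@")

  local COUNT=${#CMD[@]}
  local LAST=$((COUNT - 1))
  local LAUNCHER_EXIT_CODE=${CMD[$LAST]}
  if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
    return 1
  fi
  if [ "$LAUNCHER_EXIT_CODE" != 0 ]; then
    return "$LAUNCHER_EXIT_CODE"
  fi
  CMD=("${CMD[@]:0:$LAST}")   # drop the trailing exit code
}
```

After parse_launcher_output returns 0, "${CMD[@]}" holds ten arguments, and "Spark shell" is kept as a single element. Being able to carry arguments that contain spaces is the reason the launcher separates arguments with NUL rather than with spaces.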

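Summary

The analysis above covers both kinds of launch: interactive command lines such as spark-sql and spark-shell, and Standalone services such as the Master and Worker. All of them are ultimately launched through spark-class. spark-class first runs the class org.apache.spark.launcher.Main via java -cp, and that class prints the assembled launch command. spark-class then parses that output and executes it. So in the end, every class is run via java -cp, as follows: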

spark-sql: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver spark-internal

spark-shell: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* -Dscala.usejavacp=true -Xmx1g org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell" spark-shell

start-master: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* -Xmx1g org.apache.spark.deploy.master.Master --host indata-10-110-8-199.indata.com --port 7077 --webui-port 8080

start-worker: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://indata-10-110-8-199.indata.com:7077

Submitting our own application jar works the same way. Take spark-submit --master local --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.12-3.2.3.jar as an example. spark-class ends up launching: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/bin/java -cp /opt/dkl/spark-3.2.3-bin-hadoop3.2/conf/:/opt/dkl/spark-3.2.3-bin-hadoop3.2/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit --master local --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.12-3.2.3.jar

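Going one step further, the two kinds of launch differ in one respect. Services are launched by passing their concrete class directly to java -cp. Interactive command lines and application jars instead first run org.apache.spark.deploy.SparkSubmit via java -cp, and the class that actually runs is passed as the --class argument. Next time, we will analyze org.apache.spark.deploy.SparkSubmit to see how that class is ultimately submitted and run.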


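This post was written by 董可伦 and published on 伦少的博客 under the Creative Commons Attribution-NonCommercial-NoDerivs 3.0 license.

Non-commercial reposts must credit the author and the source. For commercial reposts, please contact the author.

Title: Spark Source Code | Script Analysis Summary

Link: https://dongkelun.com/2025/02/10/spark/sparkSourceCode-script/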
