#!/usr/bin/python
#
# pbs-spark-submit: Run an Apache Spark "job" (including optionally
# starting the Spark services) inside a PBS job.
# Copyright 2014, 2015 University of Tennessee
# Copyright 2015, 2016 Ohio Supercomputer Center
#
# License: GNU GPL v2; see ../COPYING for details.
# Revision info:
# $HeadURL$
# $Revision$
# $Date$

import fcntl
import getopt
import glob
import os
import platform
import socket
import struct
import sys
import time

#
# ways to launch workers
#
class Launcher:
    def launch(self,cmdline,env,propagate_env=False,
               prop_env_list=["SPARK_CONF_DIR","SPARK_LOG_DIR","SPARK_LOCAL_DIRS"],
               tpn=None,worker_on_mother_superior=True):
        raise NotImplementedError

    def sleep(self):
        sleeptime = 5
        if ( "PBS_NUM_NODES" in os.environ.keys() ):
            sleeptime += 2*int(os.environ["PBS_NUM_NODES"])
        time.sleep(sleeptime)

    def env_list(self,env,prop_env_list):
        # since we can't rely on ssh_config and sshd_config having
        # the appropriate SendEnv/AcceptEnv settings
        argv = []
        for var in prop_env_list:
            if ( var in env.keys() ):
                argv.append(var+"="+env[var])
        return argv

    def env_string(self,env,prop_env_list):
        return " ".join(self.env_list(env,prop_env_list))

class ExecLauncher(Launcher):
    def launch(self,cmdline,env,propagate_env=False,
               prop_env_list=["SPARK_CONF_DIR","SPARK_LOG_DIR","SPARK_LOCAL_DIRS"],
               tpn=None,worker_on_mother_superior=True):
        time.sleep(1)
        # sanity check
        if ( not worker_on_mother_superior ):
            raise RuntimeError("Cannot use --no-worker-on-mother-superior with Exec launcher")
        # lots of squick to try to limit the number of cores used on big
        # SMP/NUMA systems that are likely shared with other users
        cpuset = None
        cpusetroot = None
        cpus = 0
        if ( os.path.exists("/proc/self/cpuset") ):
            cpusetfile = open("/proc/self/cpuset")
            cpuset = cpusetfile.read().rstrip("\n")
            cpusetfile.close()
        if ( os.path.exists("/dev/cpuset") ):
            cpusetroot = "/dev/cpuset"
        elif ( os.path.exists("/sys/fs/cgroup/cpuset") ):
            cpusetroot = "/sys/fs/cgroup/cpuset"
        if ( cpusetroot is not None and cpuset is not None ):
            cpusfile = None
            if ( os.path.exists(cpusetroot+cpuset+"/cpus") ):
                cpusfile = open(cpusetroot+cpuset+"/cpus")
            elif ( os.path.exists(cpusetroot+cpuset+"/cpuset.cpus") ):
                cpusfile = open(cpusetroot+cpuset+"/cpuset.cpus")
            if ( cpusfile is not None ):
                # the cpus file holds comma-separated ranges like "0-3,8-11";
                # count the CPUs covered by each range
                allcpus = cpusfile.read()
                cpusfile.close()
                for cgroup in allcpus.split(","):
                    cpurange = cgroup.split("-")
                    if ( len(cpurange)==1 ):
                        cpus += 1
                    elif ( len(cpurange)==2 ):
                        cpus += int(cpurange[1])-int(cpurange[0])+1
        if ( cpus==0 and "PBS_NP" in os.environ.keys() ):
            try:
                cpus = int(os.environ["PBS_NP"])
            except Exception:
                pass
        if ( cpus>0 ):
            os.environ["SPARK_WORKER_CORES"] = str(cpus)
            env["SPARK_WORKER_CORES"] = str(cpus)
        # need to do the equivalent shenanigans for memory at some point...
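        # From here on the exec launcher simply forks a child process on the
        # local node and execs the worker command line in it; the parent
        # falls through to sleep() so the worker has time to register with
        # the master.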
        # base functionality
        argv = cmdline.split()
        if ( propagate_env ):
            for arg in self.env_list(env,prop_env_list):
                argv.append(arg)
        child_pid = os.fork()
        if ( child_pid==0 ):
            os.execvpe(argv[0],argv,env)
        self.sleep()

class PBSDSHLauncher(Launcher):
    def launch(self,cmdline,env,propagate_env=False,
               prop_env_list=["SPARK_CONF_DIR","SPARK_LOG_DIR","SPARK_LOCAL_DIRS"],
               tpn=None,worker_on_mother_superior=True):
        time.sleep(1)
        cmd = cmdline
        if ( propagate_env ):
            cmd = self.env_string(env,prop_env_list)+" "+cmdline
        if ( tpn is None ):
            os.system("pbsdsh "+cmd+" &")
        else:
            nodes = nodelist(unique=True)
            for node in nodes:
                if ( worker_on_mother_superior or not ( node in platform.node() ) ):
                    for i in range(int(tpn)):
                        os.system("pbsdsh -h "+node+" "+cmd+" &")
        self.sleep()

class SSHLauncher(Launcher):
    def launch(self,cmdline,env,propagate_env=False,
               prop_env_list=["SPARK_CONF_DIR","SPARK_LOG_DIR","SPARK_LOCAL_DIRS"],
               tpn=None,worker_on_mother_superior=True):
        time.sleep(1)
        if ( "PBS_NODEFILE" in os.environ.keys() ):
            if ( tpn is None ):
                nodes = nodelist()
            else:
                nodes = nodelist(unique=True)
            for node in nodes:
                if ( worker_on_mother_superior or not ( node in platform.node() ) ):
                    argv = cmdline.split()
                    ssh = "ssh"
                    if ( "SPARK_SSH" in env.keys() ):
                        ssh = env["SPARK_SSH"]
                    argv.insert(0,ssh)
                    argv.insert(1,node)
                    if ( propagate_env ):
                        for arg in self.env_list(env,prop_env_list):
                            argv.insert(2,arg)
                    sys.stderr.write(" ".join(argv)+"\n")
                    if ( tpn is None ):
                        nforks = 1
                    else:
                        nforks = int(tpn)
                    for i in range(nforks):
                        child_pid = os.fork()
                        if ( child_pid==0 ):
                            os.execvpe(argv[0],argv,env)
            self.sleep()
        else:
            raise EnvironmentError("PBS_NODEFILE undefined")

#
# functions to help with PBS node file
#
def nodelist(unique=False):
    nodes = []
    if ( "PBS_NODEFILE" in os.environ.keys() ):
        nodefile = open(os.environ["PBS_NODEFILE"])
        for line in nodefile.readlines():
            node = line.rstrip("\n")
            if ( not unique or not ( node in nodes ) ):
                nodes.append(node)
    return nodes

#
# functions to help with handling Java properties
#
def propsToCmdLine(proplist):
    result = []
    for prop in proplist.keys():
        result.append("-D"+prop+"=\""+proplist[prop]+"\"")
    return " ".join(result)

def propsFromFile(filename):
    if ( not os.path.exists(filename) ):
        raise IOError(filename+" not found")
    proplist = {}
    fd = open(filename)
    for line in fd.readlines():
        if ( not line.startswith("#") ):
            keyval = (line.rstrip("\n")).split("=",1)
            if ( len(keyval)==2 ):
                proplist[keyval[0]] = keyval[1]
    return proplist

#
# get IP address of network interface
# borrowed from http://code.activestate.com/recipes/439094-get-the-ip-address-associated-with-a-network-inter/
#
def get_ip_address(ifname):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    return socket.inet_ntoa(fcntl.ioctl(
        s.fileno(),
        0x8915,  # SIOCGIFADDR
        struct.pack('256s', ifname[:15])
    )[20:24])

#
# documentation
#
def usage():
    sys.stderr.write("pbs-spark-submit: Run an Apache Spark \"job\" (including optionally\n\tstarting the Spark master and worker services) inside a PBS job.\n")
    sys.stderr.write("\n")
    sys.stderr.write("Usage: pbs-spark-submit [arguments] [app options]\n")
    sys.stderr.write("\n")
    sys.stderr.write("Options:\n")
    sys.stderr.write("\t--help or -h\n\t\tPrint this help message.\n")
    sys.stderr.write("\t--init\n\t\tInitialize Spark master/worker services (default).\n")
    sys.stderr.write("\t--no-init\n\t\tDo not initialize Spark master/worker services.\n")
    sys.stderr.write("\t--exec\n\t\tUse the exec process launcher.\n")
    sys.stderr.write("\t--pbsdsh\n\t\tUse the pbsdsh process launcher (default).\n")
sys.stderr.write("\t--ssh\n\t\tUse the ssh process launcher.\n") sys.stderr.write("\t--worker-on-mother-superior\n\t\tRun a worker on the mother superior node as well as\n\t\tthe driver program (default).\n") sys.stderr.write("\t--no-worker-on-mother-superior or -N\n\t\tDo not run a worker on the mother superior node, only\n\t\tthe driver program.\n") sys.stderr.write("\t--master-interface or -M \n\t\tHave Spark master listen on network interface rather\n\t\tthan the default.\n") sys.stderr.write("\t--conf-dir or -C \n\t\tLook in for Java properties files.\n") sys.stderr.write("\t--log-dir or -L \n\t\tPlace logs in .\n") sys.stderr.write("\t--log4j-properties or -l \n\t\tRead log4j properties from .\n") sys.stderr.write("\t--work-dir or -d \n\t\tUse as Spark program's working directory.\n") sys.stderr.write("\t--memory or -m \n\t\tSet per-worker memory limit.\n") sys.stderr.write("\t--tpn or -t \n\t\tLaunch tasks per node instead of the default from the job.\n") sys.stderr.write("\t--pausetime or -p \n\t\tPause seconds between startup stages (default 5).\n") sys.stderr.write("\t--conf = or -D =\n\t\tSet the Java property to .\n") sys.stderr.write("\t--properties-file or -P \n\t\tRead Java properties from .\n") sys.stderr.write("\t--class \n\t\tApplication's main class (for Java/Scala apps).\n") sys.stderr.write("\t--name \n\t\tThe name of your application.\n") sys.stderr.write("\t--jars \n\t\tComma-separated list of local jars to include on the driver\n\t\tand executor classpaths.\n") sys.stderr.write("\t--packages \n\t\tComma-separated list of maven coordinates of jars to include\n\t\ton the driver and executor classpaths. Will search the local\n\t\tmaven repo, then maven central and any additional remote\n\t\trepositories given by --repositories. The format for the\n\t\tcoordinates should be groupId:artifactId:version.\n") sys.stderr.write("\t--exclude-packages \n\t\tComma-separated list of groupId:artifactId to exclude\n\t\twhile resolving the dependencies provided in --packages\n\t\tto avoid dependency conflicts.\n") sys.stderr.write("\t--repositories \n\t\tComma-separated list of additional remote repositories\n\t\tto search for the maven coordinates given with --packages.\n") sys.stderr.write("\t--py-files \n\t\tComma-separated list of .zip, .egg, or .py files to place\n\t\ton PYTHONPATH for Python apps.\n") sys.stderr.write("\t--files \n\t\tComma-separated list of files to be placed in the\n\t\tworking directory of each executor.\n") sys.stderr.write("\t--driver-memory \n\t\tMemory for driver (e.g. 1000M, 2G; default is 1024M).\n") sys.stderr.write("\t--driver-java-options \n\t\tExtra Java options to pass to the driver.\n") sys.stderr.write("\t--driver-library-path \n\t\tExtra library path entries to pass to the driver.\n") sys.stderr.write("\t--driver-class-path \n\t\tExtra class path entries to pass to the driver. Note\n\t\tthat jars added with --jars are automatically included\n\t\tin the classpath.\n") sys.stderr.write("\t--executor-memory \n\t\tMemory per executor (e.g. 
    sys.stderr.write("\n")
    sys.stderr.write("Run \"man pbs-spark-submit\" for more details.\n")
    sys.stderr.write("\n")
    sys.exit(0)

#
# main program begins here
#

# set up default environment
init_svcs = True
child_args = []
properties = {}
pausetime = 5
launcher = PBSDSHLauncher()
log4j_props = None
tpn = None
memlimit = None
driver_mem = None
driver_java_opts = None
driver_lib_path = None
driver_class_path = None
exec_mem = None
classname = None
name = None
jars = None
pkgs = None
excl_pkgs = None
repos = None
pyfiles = None
files = None
worker_on_mother_superior = True
iface = None
if ( "SPARK_LAUNCHER" in os.environ.keys() ):
    if ( os.environ["SPARK_LAUNCHER"] in ("exec","EXEC") ):
        launcher = ExecLauncher()
    if ( os.environ["SPARK_LAUNCHER"] in ("pbsdsh","PBSDSH") ):
        launcher = PBSDSHLauncher()
    if ( os.environ["SPARK_LAUNCHER"] in ("ssh","SSH") ):
        launcher = SSHLauncher()
if ( not ( "SPARK_CONF_DIR" in os.environ.keys() ) ):
    os.environ["SPARK_CONF_DIR"] = os.getcwd()+"/conf"
if ( not ( "SPARK_LOG_DIR" in os.environ.keys() ) ):
    os.environ["SPARK_LOG_DIR"] = os.getcwd()

# manage scratch directories
# **ASSUMPTION**: work directory is on a shared file system
workdir = os.getcwd()
if ( "SCRATCHDIR" in os.environ.keys() ):
    workdir = os.environ["SCRATCHDIR"]+"/spark-"+os.environ["PBS_JOBID"]
# SPARK_LOCAL_DIRS should be node-local
if ( ( "TMPDIR" in os.environ.keys() ) and not ( "SPARK_LOCAL_DIRS" in os.environ.keys() ) ):
    os.environ["SPARK_LOCAL_DIRS"] = os.environ["TMPDIR"]
elif ( not ( "SPARK_LOCAL_DIRS" in os.environ.keys() ) ):
    os.environ["SPARK_LOCAL_DIRS"] = "/tmp"

# command line argument handling
try:
    opts, child_args = getopt.getopt(sys.argv[1:],
                                     "C:D:d:hL:l:M:m:NP:p:t:",
                                     ["help", "init", "no-init", "exec", "pbsdsh", "ssh",
                                      "conf-dir=", "log-dir=", "log4j-properties=",
                                      "work-dir=", "memory=", "tpn=", "tasks-per-node=",
                                      "worker-on-mother-superior",
                                      "no-worker-on-mother-superior",
                                      "master-interface=", "properties-file=",
                                      "pause-time=", "driver-memory=",
                                      "driver-java-options=", "driver-library-path=",
                                      "driver-class-path=", "executor-memory=",
                                      "conf=", "class=", "name=", "jars=", "packages=",
                                      "exclude-packages=", "repositories=",
                                      "py-files=", "files="])
except getopt.GetoptError, err:
    sys.stderr.write(str(err)+"\n")
    usage()
for opt in opts:
    if ( opt[0]=="--no-init" ):
        init_svcs = False
    elif ( opt[0]=="--init" ):
        init_svcs = True
    elif ( opt[0]=="--help" or opt[0]=="-h" ):
        usage()
    elif ( opt[0]=="--exec" ):
        launcher = ExecLauncher()
    elif ( opt[0]=="--pbsdsh" ):
        launcher = PBSDSHLauncher()
    elif ( opt[0]=="--ssh" ):
        launcher = SSHLauncher()
    elif ( opt[0]=="--conf-dir" or opt[0]=="-C" ):
        os.environ["SPARK_CONF_DIR"] = opt[1]
    elif ( opt[0]=="--log-dir" or opt[0]=="-L" ):
        os.environ["SPARK_LOG_DIR"] = opt[1]
    elif ( opt[0]=="--log4j-properties" or opt[0]=="-l" ):
        log4j_props = opt[1]
    elif ( opt[0]=="--work-dir" or opt[0]=="-d" ):
        workdir = opt[1]
    elif ( opt[0]=="--memory" or opt[0]=="-m" ):
        memlimit = opt[1]
    elif ( opt[0]=="--tpn" or opt[0]=="--tasks-per-node" or opt[0]=="-t" ):
        tpn = opt[1]
    elif ( opt[0]=="--worker-on-mother-superior" ):
        worker_on_mother_superior = True
    elif ( opt[0]=="--no-worker-on-mother-superior" or opt[0]=="-N" ):
        worker_on_mother_superior = False
    elif ( opt[0]=="--master-interface" or opt[0]=="-M" ):
        iface = opt[1]
    elif ( opt[0]=="-D" or opt[0]=="--conf" ):
        keyval = opt[1].split("=",1)
        if ( len(keyval)==2 ):
            properties[keyval[0]] = keyval[1]
        else:
            raise getopt.GetoptError("malformed property \""+opt[1]+"\"")
opt[0]=="-P" ): if ( os.path.exists(opt[1]) ): props = propsFromFile(opt[1]) for key in props.keys(): properties[key] = props[key] elif ( opt[0]=="--pause-time" or opt[0]=="-p" ): pausetime = opt[1] elif ( opt[0]=="--class" ): classname = opt[1] elif ( opt[0]=="--name" ): name = opt[1] elif ( opt[0]=="--jars" ): jars = opt[1] elif ( opt[0]=="--packages" ): pkgs = opt[1] elif ( opt[0]=="--exclude-packages" ): excl_pkgs = opt[1] elif ( opt[0]=="--repositories" ): repos = opt[1] elif ( opt[0]=="--py-files" ): pyfiles = opt[1] elif ( opt[0]=="--files" ): files = opt[1] elif ( opt[0]=="--driver-memory" ): driver_mem = opt[1] elif ( opt[0]=="--driver-java-options" ): driver_java_opts = opt[1] elif ( opt[0]=="--driver-library-path" ): driver_lib_path = opt[1] elif ( opt[0]=="--driver-class-path" ): driver_class_path = opt[1] elif ( opt[0]=="--executor-memory" ): exec_mem = opt[1] # environment sanity checks if ( not ( "PBS_JOBID" in os.environ.keys() ) ): raise EnvironmentError("Not in a PBS job") if ( not ( "SPARK_HOME" in os.environ.keys() ) ): raise EnvironmentError("SPARK_HOME not defined") # read any properties files in the conf directory for propfile in glob.glob(os.environ["SPARK_CONF_DIR"]+"/*.properties"): if ( os.path.exists(propfile) ): props = propsFromFile(propfile) for key in props.keys(): if ( not ( key in properties.keys() ) ): properties[key] = props[key] # make sure the work dir actually exists if ( workdir is not None and not os.path.exists(workdir) ): os.mkdir(workdir) # **ASSUMPTION**: master runs on mother superior node os.environ["SPARK_MASTER_IP"] = platform.node() if ( iface is not None ): os.environ["SPARK_MASTER_IP"] = get_ip_address(iface) if ( not ( "SPARK_MASTER_PORT" in os.environ.keys() ) ): os.environ["SPARK_MASTER_PORT"] = "7077" os.environ["SPARK_MASTER"] = "spark://"+os.environ["SPARK_MASTER_IP"]+":"+str(os.environ["SPARK_MASTER_PORT"]) #sys.stderr.write("Spark master = "+os.environ["SPARK_MASTER"]+"\n") if ( init_svcs ): # stick any properties in the appropriate environment variable if ( len(properties)>0 ): if ( "SPARK_DAEMON_JAVA_OPTS" in os.environ.keys() ): os.environ["SPARK_DAEMON_JAVA_OPTS"] += " "+propsToCmdLine(properties) else: os.environ["SPARK_DAEMON_JAVA_OPTS"] = propsToCmdLine(properties) # launch master on mother superior cmdline = os.environ["SPARK_HOME"]+"/sbin/start-master.sh" os.system(cmdline+" &") sys.stderr.write(cmdline+"\n") sys.stdout.write("SPARK_MASTER="+os.environ["SPARK_MASTER"]+"\n") time.sleep(pausetime) # launch workers cmdline = os.environ["SPARK_HOME"]+"/bin/spark-class org.apache.spark.deploy.worker.Worker" if ( memlimit is not None ): cmdline += " --memory "+memlimit if ( workdir is not None ): cmdline += " --work-dir "+workdir cmdline += " "+os.environ["SPARK_MASTER"] sys.stderr.write(cmdline+"\n") launcher.launch(cmdline,os.environ,tpn=tpn, worker_on_mother_superior=worker_on_mother_superior) time.sleep(pausetime) # run the user's Spark "job", if one is given if ( len(child_args)>0 ): cmdline = os.environ["SPARK_HOME"]+"/bin/spark-submit --master "+os.environ["SPARK_MASTER"] if ( classname is not None ): cmdline += " --class "+classname if ( name is not None ): cmdline += " --name "+name if ( jars is not None ): cmdline += " --jars "+jars if ( pkgs is not None ): cmdline += " --packages "+pkgs if ( excl_pkgs is not None ): cmdline += " --exclude-packages "+excl_pkgs if ( repos is not None ): cmdline += " --repositories "+repos if ( pyfiles is not None ): cmdline += " --py-files "+pyfiles if ( files is not None ): cmdline += 
" --files "+files if ( log4j_props is not None and driver_java_opts is None ): cmdline += " --driver-java-options \"-Dlog4j.configuration=file:"+log4j_props+"\"" elif ( log4j_props is None and driver_java_opts is not None ): cmdline += " --driver-java-options \""+driver_java_opts+"\"" elif ( log4j_props is not None and driver_java_opts is not None ): cmdline += " --driver-java-options \"-Dlog4j.configuration=file:"+log4j_props+" "+driver_java_opts+"\"" if ( driver_mem is not None ): cmdline += " --driver-memory "+driver_mem if ( driver_lib_path is not None ): cmdline += " --driver-library-path "+driver_lib_path if ( driver_class_path is not None ): cmdline += " --driver-class-path "+driver_class_path if ( exec_mem is not None ): cmdline += " --executor-memory "+exec_mem cmdline += " "+" ".join(child_args) os.system(cmdline)