#!/usr/bin/python
#
# jobarray-to-pcp: Run the equivalent of a TORQUE job array using
# parallel-command-processor.
# Copyright 2016, Ohio Supercomputer Center
#
# License: GNU GPL v2; see ../COPYING for details.
# Revision info:
# $HeadURL$
# $Revision$
# $Date$

import getopt
import os
import re
import subprocess
import sys


class request():
    def __init__(self):
        self.values = {}
        self.set('shell',"/bin/bash")
        if ( "SHELL" in os.environ.keys() ):
            self.set('shell',os.environ["SHELL"])
        self.set('qsub_args',[])
        self.set('ranges',[])
        self.set('jobname',None)
        self.set('outfile',None)
        self.set('errfile',None)
        self.set('join',None)
        self.set('pbs_header',"PBS")
        self.set('tpn',None)
        self.set('test_mode',False)

    def set(self,key,value):
        self.values[key] = value

    def get(self,key):
        if ( key not in self.values.keys() ):
            raise ValueError("Unknown key '"+key+"'")
        return self.values[key]

    def keys(self):
        return self.values.keys()

    def add_range(self,thisrange):
        self.values['ranges'].append(thisrange)

    def add_qsub_arg(self,arg):
        self.values['qsub_args'].append(arg)

    def merge(self,request2):
        # copy unique values from request2
        for key in request2.values.keys():
            if ( key not in self.values.keys() or self.get(key) is None ):
                self.set(key,request2.get(key))
        # merge the qsub_args values, starting with those from request2,
        # as the later values will take precedence
        merged_args = []
        for arg in request2.values['qsub_args']:
            merged_args.append(arg)
        for arg in self.values['qsub_args']:
            merged_args.append(arg)
        self.set('qsub_args',merged_args)


def usage():
    sys.stderr.write("jobarray-to-pcp:\tRun the equivalent of a TORQUE job array using\n\t\t\tparallel-command-processor (PCP).\n\n")
    sys.stderr.write("Usage: jobarray-to-pcp [args] [qsub args] <jobarrayscript>\n\n")
    sys.stderr.write("Arguments:\n")
    sys.stderr.write("\t--help\t\t\t\tPrint this help message.\n")
    sys.stderr.write("\t--tpn N, --tasks-per-node=N\tRun N instances of PCP per node.\n\t\t\t\t\t(default is 1 per core)\n")
    sys.stderr.write("\t--test\t\t\t\tPrint job script to stdout rather than\n\t\t\t\t\tsubmit it.\n")
    sys.exit(1)


def parse_args(arglist,request):
    # command line/script argument processing
    try:
        opts, args = getopt.getopt(arglist, "A:C:IM:N:P:S:T:VW:Xa:b:c:d:e:fhj:k:l:m:no:p:q:r:t:u:v:w:xz", ["help","tasks-per-node=","test","tpn="])
    except getopt.GetoptError, err:
        sys.stderr.write(str(err)+"\n")
        usage()
    for opt in opts:
        if ( opt[0]=="-C" ):
            request.set('pbs_header',opt[1])
            request.add_qsub_arg(opt[0])
            request.add_qsub_arg(opt[1])
        elif ( opt[0]=="-N" ):
            request.set('jobname',opt[1])
            request.add_qsub_arg(opt[0])
            request.add_qsub_arg(opt[1])
        elif ( opt[0]=="-e" ):
            request.set('errfile',opt[1])
            request.add_qsub_arg(opt[0])
            request.add_qsub_arg(opt[1])
        elif ( opt[0]=="-j" ):
            request.set("join",opt[1])
            request.add_qsub_arg(opt[0])
            request.add_qsub_arg(opt[1])
        elif ( opt[0]=="-o" ):
            request.set('outfile',opt[1])
            request.add_qsub_arg(opt[0])
            request.add_qsub_arg(opt[1])
        elif ( opt[0]=="-t" ):
            for thisrange in opt[1].split(","):
                if ( "-" in thisrange ):
                    (taskstart,taskend) = thisrange.split("-",1)
                    taskstart = int(taskstart)
                    taskend = int(taskend)
                else:
                    taskstart = int(thisrange)
                    taskend = int(taskstart)
                request.add_range([taskstart,taskend])
        elif ( opt[0]=="--tasks-per-node" or opt[0]=="--tpn" ):
            request.set('tpn',int(opt[1]))
        elif ( opt[0]=="--test" ):
            request.set('test_mode',True)
        elif ( opt[0]=="--help" ):
            usage()
        else:
            request.add_qsub_arg(opt[0])
            if ( len(opt)>1 ):
                request.add_qsub_arg(opt[1])
    return args


# Main program starts here
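# Worked example (illustrative only; "myarray.sh", the 1-100 task range, and
# the --tpn value below are hypothetical):
#
#   jobarray-to-pcp --tpn 4 -t 1-100 myarray.sh
#
# reads the #PBS headers out of myarray.sh, then builds a job that runs
# "PBS_ARRAYID=N $SHELL myarray.sh" for N=1..100 under
# parallel-command-processor, with 4 instances of PCP per node.  Each
# generated PCP config line redirects stdout/stderr the way a TORQUE job
# array would; with no -o/-e/-j options and the default /bin/bash shell,
# task 1 becomes:
#
#   PBS_ARRAYID=1 /bin/bash myarray.sh >myarray.sh.o${jobid}-1 2>myarray.sh.e${jobid}-1
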
# site specific job start and end stanzas, which should include:
# * getting parallel-command-processor into $PATH
# * setting $jobid (just the numeric jobid, possibly needed for output
#   file naming)
jobstart = "cd $PBS_O_WORKDIR\nmodule load pcp\njobid=`echo $PBS_JOBID | sed 's/\..*$//'`\n"
jobend = "\n"

# site specific MPI launcher settings:
# * the name of the MPI launcher program for parallel-command-processor
# * the MPI launcher argument used to set how many MPI tasks per node (TPN)
mpiexec = "mpiexec"
mpiexec_tpn_arg = "-ppn"

# command line argument processing
req = request()
args = parse_args(sys.argv[1:],req)

# make sure jobarray script exists
if ( len(args)>0 ):
    jobarrayscript = args[0]
    if ( not (os.path.exists(jobarrayscript)) ):
        raise IOError(jobarrayscript+": file not found")
else:
    raise RuntimeError("No job array script specified")

# grovel options out of job array script
script_args = []
fp = open(jobarrayscript)
lines = fp.readlines()
fp.close()
for line in lines:
    if ( line.startswith("#"+req.get('pbs_header')) ):
        for elt in line.rstrip("\n").split()[1:]:
            if ( elt.startswith("#") ):
                break
            else:
                script_args.append(elt)
    # emulate qsub behavior of only reading PBS headers up to the first
    # non-comment, non-whitespace line
    elif ( not line.startswith("#") and not ( re.match('^\s*$',line) ) ):
        break

# parse script arguments
script_req = request()
parse_args(script_args,script_req)

# combine script request with the command line request
req.merge(script_req)

# generate PCP config file that emulates PBS_ARRAYID behavior, especially
# WRT stdout and stderr files
# basically "PBS_ARRAYID=# $SHELL $JOBARRAYSCRIPT"
pcp_cfg = ""
if ( req.get('jobname') is None ):
    req.set('jobname',os.path.basename(jobarrayscript))
jobname = req.get('jobname')
join = req.get('join')
outfile = req.get('outfile')
errfile = req.get('errfile')
shell = req.get('shell')
for thisrange in req.get('ranges'):
    for taskid in range(thisrange[0],thisrange[1]+1):
        stdout = None
        stderr = None
        if ( join is None or join=="n" ):
            # separate stdout and stderr
            if ( outfile is None ):
                stdout = ">"+jobname+".o${jobid}-"+str(taskid)
            else:
                stdout = ">"+outfile+"-"+str(taskid)
            if ( errfile is None ):
                stderr = "2>"+jobname+".e${jobid}-"+str(taskid)
            else:
                stderr = "2>"+errfile+"-"+str(taskid)
        elif ( join=="oe" ):
            # combine into stdout
            if ( outfile is None ):
                stdout = ">"+jobname+".o${jobid}-"+str(taskid)
            else:
                stdout = ">"+outfile+"-"+str(taskid)
            stderr = "2>&1"
        elif ( join=="eo" ):
            # combine into stderr
            if ( errfile is None ):
                stdout = ">"+jobname+".e${jobid}-"+str(taskid)
            else:
                stdout = ">"+errfile+"-"+str(taskid)
            stderr = "2>&1"
        pcp_cfg += "PBS_ARRAYID="+str(taskid)+" "+shell+" "+jobarrayscript+" "+stdout+" "+stderr+"\n"

# generate job script
jobscript = jobstart
jobscript += mpiexec
if ( "tpn" in req.keys() and req.get('tpn') is not None ):
    jobscript += " "+mpiexec_tpn_arg+" "+str(req.get('tpn'))
jobscript += " parallel-command-processor <