/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * * 1. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 4, this list of conditions * and the disclaimer contained in paragraph 5. * * 2. Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 4, this list of * conditions and the disclaimer contained in paragraph 5 in the * documentation and/or other materials provided with the distribution. * * 3. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 4. 
All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 5. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. 
*/ #include /* the master config generated by configure */ #include #include #include #ifdef USEJOBCREATE #ifndef JOBFAKE #include #endif /* JOBFAKE */ #endif /* USEJOBCREATE */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if IBM_SP2==2 /* IBM SP with PSSP 3.1 */ #include #endif /* IBM SP */ #include "libpbs.h" #include "portability.h" #include "list_link.h" #include "server_limits.h" #include "attribute.h" #include "resource.h" #include "resmon.h" #include "pbs_job.h" #include "log.h" #include "rpp.h" #include "mom_mach.h" #include "mom_func.h" #include "pbs_error.h" #include "svrfunc.h" #include "net_connect.h" #include "dis.h" #include "batch_request.h" #include "md5.h" #include "mcom.h" #include "resource.h" #include "utils.h" #ifdef ENABLE_CPA #include "pbs_cpa.h" #endif #ifdef PENABLE_LINUX26_CPUSETS #include "pbs_cpuset.h" #endif #ifdef HAVE_WORDEXP #include #endif /* HAVE_WORDEXP */ #ifdef ENABLE_CSA #ifndef CSAFAKE #include "csa_api.h" #include "csaacct.h" #else typedef enum { WM_INFO = 1, /* Informational */ WM_RECV, /* Request received */ WM_INIT, /* Request initiated */ WM_SPOOL, /* Request output spooled */ WM_TERM /* Request terminated */ } wm_type; /* * Subtypes for workload management accounting record type WM_RECV. */ #define WM_RECV_NEW 1 /* New request */ /* * Subtypes for workload management accounting record type WM_INIT. */ #define WM_INIT_START 1 /* Request started for first time */ #define WM_INIT_RESTART 2 /* Request restarted */ #define WM_INIT_RERUN 3 /* Request rerun */ /* * Subtypes for workload management accounting record type WM_TERM. 
*/
#define WM_TERM_EXIT 1 /* Request exited */
#define WM_TERM_REQUEUE 2 /* Request requeued */
#define WM_TERM_HOLD 3 /* Request checkpointed and held */
#define WM_TERM_RERUN 4 /* Request will be rerun */
#define WM_TERM_MIGRATE 5 /* Request will be migrated */
#endif /* CSAFAKE */
#endif /* ENABLE_CSA */

#ifdef NOPOSIXMEMLOCK
#undef _POSIX_MEMLOCK
#endif /* NOPOSIXMEMLOCK */

/*
 * Sizing constants for the job environment table.
 * EXTRA_VARIABLE_SPACE (bytes) and EXTRA_ENV_PTRS (pointer slots) are the
 * slack InitUserEnv() adds on top of the computed environment size.
 */
#define EXTRA_VARIABLE_SPACE 5120
#define EXTRA_ENV_PTRS 32
#define MAX_JOB_ARGS 64
#define EN_THRESHOLD 100
#define B_THRESHOLD 2048
/* NOTE(review): identical re-definition of the macro above; harmless but redundant */
#define EXTRA_VARIABLE_SPACE 5120

/* Global Variables (defined in other translation units) */

extern int exec_with_exec;
extern int attempttomakedir;
extern int spoolasfinalname;
extern int num_var_env;
extern char **environ;
extern int exiting_tasks;
extern int lockfds;
extern tlist_head mom_polljobs;
extern char *path_jobs;
extern char *path_prolog;
extern char *path_prologuser;
extern char *path_prologp;
extern char *path_prologuserp;
extern char *path_spool;
extern char *path_aux;
extern gid_t pbsgroup;
extern uid_t pbsuser;
extern time_t time_now;
extern unsigned int pbs_rm_port;
extern u_long localaddr;
extern char *nodefile_suffix;
extern char *submithost_suffix;
extern char DEFAULT_UMASK[];
extern char PRE_EXEC[];
extern int LOGLEVEL;
extern int EXTPWDRETRY;
extern long TJobStartBlockTime;
extern char path_checkpoint[];
extern char jobstarter_exe_name[];
extern int jobstarter_set;
extern char tmpdir_basename[]; /* for TMPDIR */
extern int src_login_batch;
extern int src_login_interactive;

/* Local Variables */

static int script_in; /* script file, will be stdin */
static pid_t writerpid; /* writer side of interactive job */
static pid_t shellpid; /* shell part of interactive job */
int mom_reader_go; /* see catchinter() & mom_writer() */

struct var_table vtable; /* for building up job's environ */

/*
 * Index of each computed variable in variables_else[].
 * sync w/variables_else[]: the enumerators must stay in the same order as
 * the strings in that table, and tveLAST must stay last (it is used as the
 * table's entry count in num_var_else).
 */
enum TVarElseEnum {
  tveHome = 0,
  tveLogName,
  tveJobName,
  tveJobID,
  tveQueue,
  tveShell,
  tveUser,
  tveJobCookie,
  tveNodeNum,
  tveTaskNum,
  tveMOMPort,
  tveNodeFile,
  tveNumNodes,
  tveTmpDir,
  tveVerID,
  tveNumNodesStr,
  tveNumPpn,
  tveGpuFile,
  tveNprocs,
  tveWallTime,
  tveLAST };

static char *variables_else[] = /* variables to add, value computed */
  {
  "HOME",
  "LOGNAME",
  "PBS_JOBNAME",
  "PBS_JOBID",
  "PBS_QUEUE",
  "SHELL",
  "USER",
  "PBS_JOBCOOKIE",
  "PBS_NODENUM",
  "PBS_TASKNUM",
  "PBS_MOMPORT",
  "PBS_NODEFILE",
  "PBS_NNODES", /* number of nodes specified by size */
  "TMPDIR",
  "PBS_VERSION",
  "PBS_NUM_NODES", /* number of nodes specified by nodes string */
  "PBS_NUM_PPN", /* ppn value specified by nodes string */
  "PBS_GPUFILE", /* file containing which GPUs to access */
  "PBS_NP", /* number of processors requested */
  "PBS_WALLTIME", /* requested or default walltime */
  NULL
  };

/* number of entries in variables_else[], excluding the NULL terminator */
static int num_var_else = tveLAST;

/* prototypes */

static void starter_return(int, int, int, struct startjob_rtn *);
static void catchinter(int);

#ifdef PENABLE_LINUX26_CPUSETS
extern int use_cpusets(job *);
#endif /* PENABLE_LINUX26_CPUSETS */

int TMomFinalizeJob1(job *, pjobexec_t *, int *);
int TMomFinalizeJob2(pjobexec_t *, int *);
int TMomFinalizeJob3(pjobexec_t *, int, int, int *);
int expand_path(job *,char *,int,char *);
int TMomFinalizeChild(pjobexec_t *);
int TMomCheckJobChild(pjobexec_t *, int, int *, int *);
int InitUserEnv(job *,task *,char **,struct passwd *pwdp,char *);
int mkdirtree(char *,mode_t);
int TTmpDirName(job*, char *);
int expand_vtable(struct var_table *vtable);
int copy_data(struct var_table *tmp_vtable, struct var_table *vtable, int expand_bsize, int expand_ensize);
static int search_env_and_open(const char *, u_long);

extern int TMOMJobGetStartInfo(job *, pjobexec_t **);
extern int mom_reader(int, int);
extern int mom_writer(int, int);
extern int x11_create_display(int, char *, char *phost, int pport, char *homedir, char *x11authstr);
extern int blcr_restart_job(job *pjob, char *file);
extern int mom_checkpoint_job_is_checkpointable(job *pjob);
extern int mom_checkpoint_job_has_checkpoint(job *pjob);
extern int mom_checkpoint_execute_job(job *pjob, char *shell, char *arg[], struct var_table *vtable);
extern void mom_checkpoint_init_job_periodic_timer(job *pjob);
extern int mom_checkpoint_start_restart(job *pjob);
extern void get_chkpt_dir_to_use(job *pjob, char *chkpt_dir);
extern char *cat_dirs(char *root, char *base);
extern char *get_local_script_path(job *pjob, char *base);

#ifdef NVIDIA_GPUS
extern int setup_gpus_for_job(job *pjob);
extern int use_nvidia_gpu;
#endif /* NVIDIA_GPUS */

/* END prototypes */

#ifdef USEJOBCREATE
uint64_t get_jobid(char *);
#endif /* USEJOBCREATE */

#ifdef ENABLE_CSA
void add_wkm_start(uint64_t, char *);
void add_wkm_end(uint64_t, int64_t, char *);

/* valid commands for csaswitch checking */
enum csa_chk_cmd {
  IS_INSTALLED = 0,
  IS_UP = 1 };
#endif /* ENABLE_CSA */

/*
 * FDMOVE(fd) - if fd is one of the standard descriptors (0, 1, 2),
 * duplicate it onto a descriptor >= 3, close the original and store the
 * duplicate back in fd.  If fcntl() fails, fd ends up as -1.
 *
 * The expansion is a bare if-block (not do { } while (0)), and fd is
 * evaluated more than once, so pass a plain lvalue and do not use it as the
 * unbraced body of an if/else.
 */
#define FDMOVE(fd) if (fd < 3) { \
    int hold = fcntl(fd,F_DUPFD,3); \
    close(fd); \
    fd = hold; \
  }

/*
 * no_hang() - interrupt handler for alarm() around attempt to connect
 * to qsub for interactive jobs. If qsub hung or suspended or if the
 * network is fouled up, mom cannot afford to wait forever.
*/ static void no_hang( int sig) /* I (not used) */ { LOG_EVENT(PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, " ", "alarm timed-out connect to qsub"); return; } /* END no_hang() */ struct passwd *check_pwd( job *pjob) /* I (modified) */ { int retryCount; struct passwd *pwdp = NULL; struct group *grpp; char *ptr; /* NOTE: should cache entire pwd object (NYI) */ ptr = pjob->ji_wattr[(int)JOB_ATR_euser].at_val.at_str; if (ptr == NULL) { /* FAILURE */ sprintf(log_buffer, "no user specified for job"); return(NULL); } /* we will retry if needed just to cover temporary problems */ for (retryCount = 0;retryCount < EXTPWDRETRY;retryCount++) { pwdp = getpwnam_ext(ptr); if (pwdp != NULL) break; sleep(1); } if (pwdp == NULL) { /* FAILURE */ sprintf(log_buffer, "no password entry for user %s", ptr); return(NULL); } #ifdef __CYGWIN__ if (IamUserByName(ptr) == 0) return(NULL); #endif /* __CYGWIN__ */ if (pjob->ji_grpcache != NULL) { /* SUCCESS */ /* group cache previously loaded and cached */ return(pwdp); } pjob->ji_qs.ji_un_type = JOB_UNION_TYPE_MOM; pjob->ji_qs.ji_un.ji_momt.ji_exuid = pwdp->pw_uid; pjob->ji_grpcache = calloc(1, sizeof(struct grpcache) + strlen(pwdp->pw_dir) + 1); if (pjob->ji_grpcache == NULL) { /* FAILURE */ sprintf(log_buffer, "calloc failed"); return(NULL); } strcpy(pjob->ji_grpcache->gc_homedir, pwdp->pw_dir); /* get the group and supplimentary under which the job is to be run */ if ((pjob->ji_wattr[(int)JOB_ATR_egroup].at_flags & (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) == ATR_VFLAG_SET) { /* execution group specified and not default of login group */ /* NOTE: ideally egroup should be groupname, not groupid, but pbs_server * code will send a group ID over in some instances, so we should try * to work with a groupid if provided */ grpp = getgrnam(pjob->ji_wattr[(int)JOB_ATR_egroup].at_val.at_str); if (grpp != NULL) { pjob->ji_qs.ji_un.ji_momt.ji_exgid = grpp->gr_gid; } else { int tmpGID; /* check to see if we were given a groupid (group names cannot start * with a 
number) */ tmpGID = (int)strtol(pjob->ji_wattr[(int)JOB_ATR_egroup].at_val.at_str,NULL,10); if (tmpGID != 0) { pjob->ji_qs.ji_un.ji_momt.ji_exgid = tmpGID; } else { /* FAILURE */ sprintf(log_buffer, "no group entry for group %s, user=%s, errno=%d (%s)", pjob->ji_wattr[(int)JOB_ATR_egroup].at_val.at_str, ptr, errno, strerror(errno)); return(NULL); } } /* END if (grpp != NULL) */ } else { /* if no group specified, default to login group */ pjob->ji_qs.ji_un.ji_momt.ji_exgid = pwdp->pw_gid; } if ((pjob->ji_grpcache->gc_ngroup = init_groups( pwdp->pw_name, pjob->ji_qs.ji_un.ji_momt.ji_exgid, NGROUPS_MAX, pjob->ji_grpcache->gc_groups)) < 0) { /* FAILURE */ sprintf(log_buffer, "too many group entries"); return(NULL); } /* perform site specific check on validatity of account */ if (site_mom_chkuser(pjob)) { /* FAILURE */ sprintf(log_buffer, "site_mom_chkuser failed"); return(NULL); } /* SUCCESS */ return(pwdp); } /* END check_pwd() */ /** * @see send_sisters() - child - send ABORT request to sisters * @see start_exec() - parent */ void exec_bail( job *pjob, /* I */ int code) /* I */ { static char id[] = "exec_bail"; int nodecount; nodecount = send_sisters(pjob, IM_ABORT_JOB); if (nodecount != pjob->ji_numnodes - 1) { sprintf(log_buffer, "%s: sent %d ABORT requests, should be %d", id, nodecount, pjob->ji_numnodes - 1); log_err(-1, id, log_buffer); } /* inform non-MS nodes that job is aborting */ pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; pjob->ji_qs.ji_un.ji_momt.ji_exitstat = code; job_save(pjob, SAVEJOB_QUICK); exiting_tasks = 1; if (pjob->ji_stdout > 0) close(pjob->ji_stdout); if (pjob->ji_stderr > 0) close(pjob->ji_stderr); return; } /* END exec_bail() */ /* * becomes the user for pjob * * @param pjob - the job whose user we should become * @return PBSE_BADUSER on failure */ int become_the_user( job *pjob) { log_buffer[0] = '\0'; if (setgroups(pjob->ji_grpcache->gc_ngroup, (gid_t *)pjob->ji_grpcache->gc_groups) != PBSE_NONE) { 
snprintf(log_buffer,sizeof(log_buffer), "PBS: setgroups for UID = %lu failed: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); } else if (setgid(pjob->ji_qs.ji_un.ji_momt.ji_exgid) != PBSE_NONE) { snprintf(log_buffer,sizeof(log_buffer), "PBS: setgid to %lu for UID = %lu failed: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exgid, (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); } else if (setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, FALSE) < 0) { snprintf(log_buffer,sizeof(log_buffer), "PBS: setuid to %lu failed: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); } if (log_buffer[0] != '\0') return(PBSE_BADUSER); else return(PBSE_NONE); } /* END become_the_user() */ #define RETRY 3 int open_demux( u_long addr, /* I */ int port) /* I */ { static char id[] = "open_demux"; int sock; int i; struct sockaddr_in remote; remote.sin_addr.s_addr = addr; remote.sin_port = htons((unsigned short)port); remote.sin_family = AF_INET; if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { sprintf(log_buffer, "%s: socket %s", id, netaddr(&remote)); log_err(errno, id, log_buffer); return(-1); } for (i = 0;i < RETRY;i++) { if (connect(sock, (struct sockaddr *)&remote, sizeof(remote)) == 0) { /* success */ return(sock); } switch (errno) { case EINTR: case ETIMEDOUT: case ECONNRESET: sleep(2); continue; /*NOTREACHED*/ break; case EADDRINUSE: case ECONNREFUSED: sprintf(log_buffer, "%s: cannot connect to %s", id, netaddr(&remote)); log_err(errno, id, log_buffer); sleep(2); continue; /*NOTREACHED*/ break; default: /* NO-OP */ break; } /* END switch (errno) */ break; } /* END for (i) */ sprintf(log_buffer, "%s: connect %s", id, netaddr(&remote)); log_err(errno, id, log_buffer); close(sock); return(-1); } /* END open_demux() */ /* * open_pty - open slave side of master/slave pty */ static int open_pty( job *pjob) /* I */ { char *name; int pts; /* Open the slave pty as the controlling tty */ name = 
pjob->ji_wattr[(int)JOB_ATR_outpath].at_val.at_str; if ((pts = open(name, O_RDWR, 0600)) < 0) { log_err(errno, "open_pty", "cannot open slave"); } else { FDMOVE(pts); fchmod(pts, 0620); if (fchown(pts, pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) { close(pts); log_err(errno, "open_pty", "cannot change slave's owner"); return -1; } #ifdef SETCONTROLLINGTTY #if defined(_CRAY) ioctl(0, TCCLRCTTY, 0); ioctl(pts, TCSETCTTY, 0); /* make controlling */ #elif defined(TCSETCTTY) ioctl(pts, TCSETCTTY, 0); /* make controlling */ #elif defined(TIOCSCTTY) ioctl(pts, TIOCSCTTY, 0); #endif #endif /* SETCONTROLLINGTTY */ } return(pts); } /* END open_pty() */ /* * is_joined - determine if standard out and stardard error are joined together * (-j option) and if so which is first * Returns: 0 - no join, separate files * +1 - joined as stdout * -1 - joined as stderr */ int is_joined( job *pjob) /* I */ { attribute *pattr; pattr = &pjob->ji_wattr[(int)JOB_ATR_join]; if ((pattr->at_flags & ATR_VFLAG_SET) && (pattr->at_val.at_str[0] != 'n')) { if ((pattr->at_val.at_str[0] == 'o') && (strchr(pattr->at_val.at_str, (int)'e') != 0)) { return(1); } if ((pattr->at_val.at_str[0] == 'e') && (strchr(pattr->at_val.at_str, (int)'e') != 0)) { return(-1); } } return(0); } /* END is_joined() */ /* * open_std_out_err - open standard out and err to files */ static int open_std_out_err( job *pjob, /* I */ int timeout) /* I (optional,>0 to set) */ { int i; int file_out = -2; int file_err = -2; int filemode = O_CREAT | O_WRONLY | O_APPEND | O_EXCL; /* if std out/err joined (set and != "n"), which file is first */ i = is_joined(pjob); if (timeout > 0) { alarm(timeout); } if (i == 1) { file_out = open_std_file( pjob, StdOut, filemode, pjob->ji_qs.ji_un.ji_momt.ji_exgid); file_err = dup(file_out); } else if (i == -1) { file_err = open_std_file( pjob, StdErr, filemode, pjob->ji_qs.ji_un.ji_momt.ji_exgid); file_out = dup(file_err); } if (file_out == -2) file_out = 
open_std_file( pjob, StdOut, filemode, pjob->ji_qs.ji_un.ji_momt.ji_exgid); if (file_err == -2) file_err = open_std_file( pjob, StdErr, filemode, pjob->ji_qs.ji_un.ji_momt.ji_exgid); alarm(0); /* disable alarm */ if ((file_out < 0) || (file_err < 0)) { /* FAILURE - cannot load files */ if ((file_out == -2) || (file_err == -2)) { /* timeout occurred */ char *path; int keeping; path = std_file_name(pjob, StdOut, &keeping); sprintf(log_buffer, "unable to stat/open file '%s' within %d seconds - check filesystem", (path != NULL) ? path : "???", timeout); log_err( errno, "open_std_out_err", log_buffer); } else { log_err( errno, "open_std_out_err", "unable to open standard output/error"); } return(-1); } /* END if ((file_out < 0) || (file_err < 0)) */ FDMOVE(file_out); /* make sure descriptor > 2 */ FDMOVE(file_err); /* so don't clobber stdin/out/err */ if (file_out != 1) { close(1); if (dup(file_out) == -1) { } close(file_out); } if (file_err != 2) { close(2); if (dup(file_err) == -1) { } close(file_err); } return(0); } /* END open_std_out_err() */ int mkdirtree( char *dirpath, /* I */ mode_t mode) /* I */ { char *part; int rc = 0; mode_t oldmask = 0; char *path = NULL; if (*dirpath != '/') { rc = -1; goto done; } /* make a copy to scribble NULLs on */ if ((path = strdup(dirpath)) == NULL) { rc = -1; goto done; } oldmask = umask(0000); part = strtok(path, "/"); if (part == NULL) { rc = -1; goto done; } *(part - 1) = '/'; /* leading '/' */ while ((part = strtok(NULL, "/")) != NULL) { if (mkdir(path, mode) == -1) { if (errno != EEXIST) { rc = errno; goto done; } } *(part - 1) = '/'; } /* very last component */ if (mkdir(path, mode) == -1) { if (errno != EEXIST) { rc = errno; goto done; } } done: if (oldmask != 0) umask(oldmask); if (path != NULL) free(path); return(rc); } /* END mkdirtree() */ /* If our config allows it, construct tmpdir path */ int TTmpDirName( job *pjob, /* I */ char *tmpdir) /* O */ { if (tmpdir_basename[0] == '/') { snprintf(tmpdir, MAXPATHLEN, 
"%s/%s", tmpdir_basename, pjob->ji_qs.ji_jobid); } else { *tmpdir = '\0'; } return(*tmpdir != '\0'); /* return "true" if tmpdir is set */ } int TMakeTmpDir( job *pjob, /* I */ char *tmpdir) /* I */ { char id[] = "TMakeTmpDir"; int rc; int retval; struct stat sb; if ((setegid(pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) || (setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, TRUE) == -1)) { return(0); } retval = mkdirtree(tmpdir, 0755); if (retval == 0) { /* We made it, it's ours */ pjob->ji_flags |= MOM_HAS_TMPDIR; } else { /* log the first error */ log_err(errno, id, strerror(errno)); rc = stat(tmpdir, &sb); if (rc) rc = errno; switch (rc) { case ENOENT: sprintf(log_buffer, "Unable to make job transient directory: %s", tmpdir); retval = -1; break; case 0: if (S_ISDIR(sb.st_mode)) { if (sb.st_uid == pjob->ji_qs.ji_un.ji_momt.ji_exuid) { retval = 0; /* owned by the job, allowed */ } else { sprintf(log_buffer, "Job transient tmpdir %s already exists, owned by %d", tmpdir, sb.st_uid); retval = -1; } } else { sprintf(log_buffer, "Job transient tmpdir %s exists, but is not a directory", tmpdir); retval = -1; } break; default: sprintf(log_buffer, "Cannot name job tmp directory %s (on stat)", tmpdir); return(0); break; } } /* END if (retval == 0) */ setuid_ext(pbsuser, TRUE); setegid(pbsgroup); if (retval != 0) log_err(retval, id, log_buffer); return(retval == 0); /* return boolean */ } /* END TMakeTmpDir() */ /* Sets up env for a user process, used by TMomFinalizeJob1, start_process, * and file copies */ int InitUserEnv( job *pjob, /* I */ task *ptask, /* I (optional) */ char **envp, /* I (optional) */ struct passwd *pwdp, /* I (optional) */ char *shell) /* I (optional) */ { char id[] = "InitUserEnv"; struct array_strings *vstrs; int j = 0; int ebsize = 0; char buf[MAXPATHLEN + 2]; int usertmpdir = 0; int num_nodes = 1; int num_ppn = 1; attribute *pattr; resource *presc; resource_def *prd; if (pjob == NULL) { sprintf(log_buffer, "passed a NULL pjob!"); log_err(errno, id, 
log_buffer); return(-1); } /* initialize vtable */ if (envp != NULL) { for (j = 0, ebsize = 0;envp[j] != NULL;j++) ebsize += strlen(envp[j]); } if (LOGLEVEL >= 10) { sprintf(log_buffer, "creating env buffer, count: %d size: %d", j, ebsize); log_ext(-1, id, log_buffer, LOG_DEBUG); } vstrs = pjob->ji_wattr[(int)JOB_ATR_variables].at_val.at_arst; vtable.v_bsize = ebsize + EXTRA_VARIABLE_SPACE + (vstrs != NULL ? (vstrs->as_next - vstrs->as_buf) : 0); vtable.v_block_start = calloc(1, vtable.v_bsize); if (vtable.v_block_start == NULL) { sprintf(log_buffer, "PBS: failed to init env, calloc: %s\n", strerror(errno)); log_err(errno, id, log_buffer); return(-1); } vtable.v_block = vtable.v_block_start; vtable.v_ensize = num_var_else + num_var_env + j + EXTRA_ENV_PTRS + (vstrs != NULL ? vstrs->as_usedptr : 0); vtable.v_used = 0; vtable.v_envp = calloc(vtable.v_ensize, sizeof(char *)); if (vtable.v_envp == NULL) { sprintf(log_buffer, "PBS: failed to init env, calloc: %s\n", strerror(errno)); log_err(errno, id, log_buffer); return(-1); } /* First variables from the local environment */ for (j = 0;j < num_var_env;++j) bld_env_variables(&vtable, environ[j], NULL); if (LOGLEVEL >= 10) { sprintf(log_buffer, "local env added, count: %d", j); log_ext(-1, id, log_buffer, LOG_DEBUG); } /* Next, the variables passed with the job. 
They may */ /* be overwritten with new correct values for this job */ if (vstrs != NULL) { for (j = 0;j < vstrs->as_usedptr;++j) { bld_env_variables(&vtable, vstrs->as_string[j], NULL); if (!strncmp( vstrs->as_string[j], variables_else[tveTmpDir], strlen(variables_else[tveTmpDir]))) usertmpdir = 1; } if (LOGLEVEL >= 10) { sprintf(log_buffer, "job env added, count: %d", j); log_ext(-1, id, log_buffer, LOG_DEBUG); } } /* END if (vstrs != NULL) */ /* HOME */ if (pjob->ji_grpcache != NULL) bld_env_variables(&vtable, variables_else[tveHome], pjob->ji_grpcache->gc_homedir); /* LOGNAME */ if (pwdp != NULL) bld_env_variables(&vtable, variables_else[tveLogName], pwdp->pw_name); /* PBS_JOBNAME */ bld_env_variables( &vtable, variables_else[tveJobName], pjob->ji_wattr[(int)JOB_ATR_jobname].at_val.at_str); /* PBS_JOBID */ bld_env_variables(&vtable, variables_else[tveJobID], pjob->ji_qs.ji_jobid); /* PBS_QUEUE */ bld_env_variables( &vtable, variables_else[tveQueue], pjob->ji_wattr[(int)JOB_ATR_in_queue].at_val.at_str); /* SHELL */ if (shell != NULL) bld_env_variables(&vtable, variables_else[tveShell], shell); /* USER, for compatability */ if (pwdp != NULL) bld_env_variables(&vtable, variables_else[tveUser], pwdp->pw_name); /* PBS_JOBCOOKIE */ bld_env_variables( &vtable, variables_else[tveJobCookie], pjob->ji_wattr[(int)JOB_ATR_Cookie].at_val.at_str); /* PBS_NODENUM */ sprintf(buf, "%d", pjob->ji_nodeid); bld_env_variables(&vtable, variables_else[tveNodeNum], buf); /* PBS_TASKNUM */ if (ptask != NULL) { sprintf(buf, "%d", (int)ptask->ti_qs.ti_task); bld_env_variables(&vtable, variables_else[tveTaskNum], buf); } /* PBS_MOMPORT */ sprintf(buf, "%d", pbs_rm_port); bld_env_variables(&vtable, variables_else[tveMOMPort], buf); /* PBS_NODEFILE */ if (pjob->ji_flags & MOM_HAS_NODEFILE) { sprintf(buf, "%s/%s", path_aux, pjob->ji_qs.ji_jobid); bld_env_variables(&vtable, variables_else[tveNodeFile], buf); /* add the gpu file as well */ sprintf(buf, "%s/%sgpu", path_aux, 
pjob->ji_qs.ji_jobid); bld_env_variables(&vtable, variables_else[tveGpuFile], buf); } /* PBS_WALLTIME */ pattr = &pjob->ji_wattr[JOB_ATR_resource]; prd = find_resc_def(svr_resc_def, "walltime", svr_resc_size); if ((presc = find_resc_entry(pattr, prd)) != NULL) { sprintf(buf, "%ld", presc->rs_value.at_val.at_long); bld_env_variables(&vtable, variables_else[tveWallTime], buf); } /* PBS_NNODES */ pattr = &pjob->ji_wattr[(int)JOB_ATR_resource]; prd = find_resc_def(svr_resc_def, "size", svr_resc_size); presc = find_resc_entry(pattr, prd); if (presc != NULL) { sprintf(buf, "%ld", presc->rs_value.at_val.at_long); bld_env_variables(&vtable, variables_else[tveNumNodes], buf); } /* PBS_NUM_NODES and PBS_NPPN */ prd = find_resc_def(svr_resc_def,"nodes",svr_resc_size); presc = find_resc_entry(pattr,prd); if (presc != NULL) { char *ppn_str = "ppn="; char *tmp; if (presc->rs_value.at_val.at_str != NULL) { num_nodes = atoi(presc->rs_value.at_val.at_str); if (num_nodes != 0) { if ((tmp = strstr(presc->rs_value.at_val.at_str,ppn_str)) != NULL) { tmp += strlen(ppn_str); num_ppn = atoi(tmp); } } } } /* these values have been initialized to 1, and will always be in the * environment */ sprintf(buf,"%d",num_nodes); bld_env_variables(&vtable,variables_else[tveNumNodesStr],buf); if (LOGLEVEL >= 3) { snprintf(log_buffer, sizeof(log_buffer), "Added %s=%s to environment", variables_else[tveNumNodesStr], buf); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } sprintf(buf,"%d",num_ppn); bld_env_variables(&vtable,variables_else[tveNumPpn],buf); if (LOGLEVEL >= 3) { snprintf(log_buffer, sizeof(log_buffer), "Added %s=%s to environment", variables_else[tveNumPpn], buf); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } /* setup TMPDIR */ if (!usertmpdir && TTmpDirName(pjob, buf)) bld_env_variables(&vtable, variables_else[tveTmpDir], buf); /* PBS_VERSION */ sprintf(buf, "TORQUE-%s", PACKAGE_VERSION); bld_env_variables(&vtable, 
variables_else[tveVerID], buf); /* passed-in environment for tasks */ if (envp != NULL) { for (j = 0;envp[j];j++) bld_env_variables(&vtable, envp[j], NULL); } return(0); } /* END InitUserEnv() */ /** * mom_jobstarter_execute_job * * This routine is called from the newly created child process. * * @param pjob Pointer to job structure. * @see TMomFinalizeChild */ int mom_jobstarter_execute_job(job *pjob, char *shell, char *arg[], struct var_table *vtable) { static char *id = "mom_jobstarter_execute_job"; /* Launch job executable with cr_run command so that cr_checkpoint command will work. */ /* shuffle up the existing args */ arg[5] = arg[4]; arg[4] = arg[3]; arg[3] = arg[2]; arg[2] = arg[1]; /* replace first arg with shell name note, this func is called from a child process that exits after the executable is launched, so we don't have to worry about freeing this malloc later */ arg[1] = malloc(strlen(shell) + 1); if (arg[1] == NULL) { log_err(errno,id,"cannot alloc env"); return(-1); } strcpy(arg[1], shell); arg[0] = jobstarter_exe_name; if (LOGLEVEL >= 10) { char cmd[MAXPATHLEN + 1]; int i; strcpy(cmd,arg[0]); for (i = 1; arg[i] != NULL; i++) { strcat(cmd," "); strcat(cmd,arg[i]); } strcat(cmd,")"); log_buffer[0] = '\0'; sprintf(log_buffer, "execing jobstarter command (%s)\n", cmd); log_ext(-1, id, log_buffer, LOG_DEBUG); } execve(jobstarter_exe_name, arg, vtable->v_envp); return (0); } /* * Used by MOM superior to start the shell process. 
* perform all server level pre-job tasks, collect information * create parent-child pipes * * @see mom_set_use() - child */ int TMomFinalizeJob1( job *pjob, /* I (modified) */ pjobexec_t *TJE, /* O */ int *SC) /* O */ { static char *id = "TMomFinalizeJob1"; torque_socklen_t slen; int i; int rc; attribute *pattr; attribute *pattri; #ifndef MOM_FORCENODEFILE resource *presc; #endif resource_def *prd; struct sockaddr_in saddr; char buf[MAXPATHLEN + 2]; time_t time_now; struct stat sb; *SC = 0; time_now = time(0); if (TJE == NULL) { sprintf(log_buffer, "bad param in %s", id); *SC = JOB_EXEC_RETRY; return(FAILURE); } /* initialize job exec struct */ memset(TJE, 0, sizeof(pjobexec_t)); TJE->ptc = -1; TJE->pjob = (void *)pjob; /* prepare job environment */ if (pjob->ji_numnodes > 1) { /* ** Get port numbers from file decriptors in job struct. The ** sockets are stored there so they can be closed later as ** Main MOM will not need them after the job is going. */ slen = sizeof(saddr); if (getsockname( pjob->ji_stdout, (struct sockaddr *)&saddr, &slen) == -1) { sprintf(log_buffer, "getsockname on stdout"); *SC = JOB_EXEC_RETRY; return(FAILURE); } TJE->port_out = (int)ntohs(saddr.sin_port); slen = sizeof(saddr); if (getsockname( pjob->ji_stderr, (struct sockaddr *)&saddr, &slen) == -1) { sprintf(log_buffer, "getsockname on stderr"); *SC = JOB_EXEC_RETRY; return(FAILURE); } TJE->port_err = (int)ntohs(saddr.sin_port); } else { TJE->port_out = -1; TJE->port_err = -1; } /* did the job request nodes? 
will need to setup node file */ pattr = &pjob->ji_wattr[(int)JOB_ATR_resource]; prd = find_resc_def(svr_resc_def, "neednodes", svr_resc_size); #ifdef MOM_FORCENODEFILE find_resc_entry(pattr, prd); pjob->ji_flags |= MOM_HAS_NODEFILE; #else /* MOM_FORCENODEFILE */ presc = find_resc_entry(pattr, prd); if (presc != NULL) pjob->ji_flags |= MOM_HAS_NODEFILE; #endif /* MOM_FORCENODEFILE */ /* * get the password entry for the user under which the job is to be run * we do this now to save a few things in the job structure */ if ((TJE->pwdp = (void *)check_pwd(pjob)) == NULL) { LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); *SC = JOB_EXEC_FAIL1; return(FAILURE); } #if IBM_SP2==2 /* IBM SP with PSSP 3.1 */ /* load IBM SP switch table */ if (load_sp_switch(pjob) != 0) { LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); *SC = JOB_EXEC_RETRY; return(FAILURE); } #endif /* IBM SP */ /* Starting job */ mom_checkpoint_init_job_periodic_timer(pjob); if (mom_checkpoint_job_has_checkpoint(pjob)) { rc = mom_checkpoint_start_restart(pjob); if (rc == PBSE_NONE) { /* SUCCESS */ log_ext(-1, id, "Restart succeeded", LOG_DEBUG); /* reset mtime so walltime will not include held time */ /* update to time now minus the time already used */ /* unless it is suspended, see request.c/req_signal() */ /* check time on the file not the directory */ get_chkpt_dir_to_use(pjob, buf); strcat(buf, "/"); strcat(buf, pjob->ji_wattr[JOB_ATR_restart_name].at_val.at_str); stat(buf, &sb); if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_Suspend) == 0) { if ((pjob->ji_qs.ji_stime == 0) && (pjob->ji_wattr[(int)JOB_ATR_start_time].at_flags & ATR_VFLAG_SET)) { pjob->ji_qs.ji_stime = (time_t)pjob->ji_wattr[(int)JOB_ATR_start_time].at_val.at_long; } pjob->ji_qs.ji_stime = time_now - (sb.st_mtime - pjob->ji_qs.ji_stime); pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING; if (mom_get_sample() != PBSE_NONE) mom_set_use(pjob); } else { pjob->ji_qs.ji_substate = 
JOB_SUBSTATE_SUSPEND; } *SC = 0; return(FAILURE); } else { /* FAILURE */ log_err(-1, id, "Restart failed"); /* retry for any kind of changable thing */ if ((errno == EAGAIN) || #ifdef ERFLOCK (errno == ERFLOCK) || #endif #ifdef EQUSR (errno == EQUSR) || #endif #ifdef EQGRP (errno == EQGRP) || #endif #ifdef EQACT (errno == EQACT) || #endif #ifdef ENOSDS (errno == ENOSDS) || #endif (errno == ENOMEM) || (errno == ENOLCK) || (errno == ENOSPC) || (errno == ENFILE) || (errno == EDEADLK) || (errno == EBUSY)) { pjob->ji_qs.ji_un.ji_momt.ji_exitstat = JOB_EXEC_RETRY; *SC = JOB_EXEC_RETRY; } else { pjob->ji_qs.ji_un.ji_momt.ji_exitstat = JOB_EXEC_BADRESRT; *SC = JOB_EXEC_FAIL1; } pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; exiting_tasks = 1; sprintf(log_buffer, "Restart failed, error %d (%s)", errno, pbs_strerror(errno)); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return(FAILURE); } } /* end of if mom_checkpoint_job_has_checkpoint(pjob) */ /* * if certain resource limits require that the job usage be * polled or it is a multinode job, we link the job to mom_polljobs. 
* * NOTE: we overload the job field ji_jobque for this as it * is not used otherwise by MOM */ if ((pjob->ji_numnodes > 1) || (mom_do_poll(pjob) != 0)) append_link(&mom_polljobs, &pjob->ji_jobque, pjob); pattri = &pjob->ji_wattr[(int)JOB_ATR_interactive]; if ((pattri->at_flags & ATR_VFLAG_SET) && (pattri->at_val.at_long != 0)) { TJE->is_interactive = TRUE; } else { TJE->is_interactive = FALSE; } if (TJE->is_interactive == TRUE) { /* * open a master pty, need to do it here before we fork, * to save the slave name in the master's job structure */ if ((TJE->ptc = open_master(&TJE->ptc_name)) < 0) { log_err(errno, id, "cannot open master pty"); *SC = JOB_EXEC_RETRY; return(FAILURE); } FDMOVE(TJE->ptc) /* save pty name in job output/error file name */ pattr = &pjob->ji_wattr[(int)JOB_ATR_outpath]; job_attr_def[(int)JOB_ATR_outpath].at_free(pattr); job_attr_def[(int)JOB_ATR_outpath].at_decode( pattr, NULL, NULL, TJE->ptc_name); pjob->ji_wattr[(int)JOB_ATR_outpath].at_flags = (ATR_VFLAG_SET | ATR_VFLAG_MODIFY | ATR_VFLAG_SEND); pattr = &pjob->ji_wattr[(int)JOB_ATR_errpath]; job_attr_def[(int)JOB_ATR_errpath].at_free(pattr); job_attr_def[(int)JOB_ATR_errpath].at_decode( pattr, NULL, NULL, TJE->ptc_name); pjob->ji_wattr[(int)JOB_ATR_errpath].at_flags = (ATR_VFLAG_SET | ATR_VFLAG_MODIFY | ATR_VFLAG_SEND); } /* END if (TJE->is_interactive == TRUE) */ #if SHELL_USE_ARGV == 0 #if SHELL_INVOKE == 1 if (TJE->is_interactive == FALSE) { /* need a pipe on which to write the shell script */ /* file name to the input of the shell */ if (pipe(TJE->pipe_script) == -1) { sprintf(log_buffer, "Failed to create shell name pipe, errno = %d (%s)", errno, strerror(errno)); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); *SC = JOB_EXEC_RETRY; return(FAILURE); } } /* END if (TJE->is_interactive == FALSE) */ #endif /* SHELL_INVOKE */ #endif /* !SHELL_USE_ARGV */ /* create pipes between MOM and the job starter */ /* fork the job starter which will become the job */ 
if ((pipe(TJE->mjspipe) == -1) || (pipe(TJE->jsmpipe) == -1)) { i = -1; } else { i = 0; /* make sure pipe file descriptors are above 2 */ if (TJE->jsmpipe[1] < 3) { TJE->upfds = fcntl(TJE->jsmpipe[1], F_DUPFD, 3); close(TJE->jsmpipe[1]); TJE->jsmpipe[1] = 0; } else { TJE->upfds = TJE->jsmpipe[1]; } if (TJE->mjspipe[0] < 3) { TJE->downfds = fcntl(TJE->mjspipe[0], F_DUPFD, 3); close(TJE->mjspipe[0]); TJE->mjspipe[0] = 0; } else { TJE->downfds = TJE->mjspipe[0]; } } if ((i == -1) || (TJE->upfds < 3) || (TJE->downfds < 3)) { sprintf(log_buffer, "cannot create communication pipe"); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); *SC = JOB_EXEC_RETRY; return(FAILURE); } if ((TJE->ptask = (void *)pbs_task_create(pjob, TM_NULL_TASK)) == NULL) { sprintf(log_buffer, "cannot create job task"); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); *SC = JOB_EXEC_RETRY; return(FAILURE); } pjob->ji_qs.ji_substate = JOB_SUBSTATE_STARTING; pjob->ji_qs.ji_stime = time_now; return(SUCCESS); } /* END TMomFinalizeJob1() */ /* fork child/prolog */ int TMomFinalizeJob2( pjobexec_t *TJE, /* I */ int *SC) /* O */ { static char *id = "TMomFinalizeJob2"; char buf[MAXPATHLEN + 2]; pid_t cpid; #if SHELL_USE_ARGV == 0 #if SHELL_INVOKE == 1 int i, j; #endif /* SHELL_INVOKE */ #endif /* !SHELL_USE_ARGV */ job *pjob; pjob = (job *)TJE->pjob; if (LOGLEVEL >= 4) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "about to fork child which will become job"); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, id, log_buffer); } /* ** fork the child that will become the job. 
*/ if ((cpid = fork_me(-1)) < 0) { /* fork failed */ sprintf(log_buffer, "fork kf job '%s' failed in (errno=%d, '%s')", pjob->ji_qs.ji_jobid, errno, strerror(errno)); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, id, log_buffer); *SC = JOB_EXEC_RETRY; return(FAILURE); } if (cpid == 0) { /* CHILD: handle child activities */ TMomFinalizeChild(TJE); /*NOTREACHED*/ } /* parent */ close(TJE->upfds); close(TJE->downfds); if (TJE->ptc >= 0) close(TJE->ptc); strcpy(buf, path_jobs); strcat(buf, pjob->ji_qs.ji_fileprefix); strcat(buf, JOB_SCRIPT_SUFFIX); if (chown( buf, pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) { } #if SHELL_USE_ARGV == 0 #if SHELL_INVOKE == 1 if (TJE->is_interactive == FALSE) { int k; if (exec_with_exec) { if (strlen(buf)+5 <= MAXPATHLEN) { memmove(buf + 5, buf, strlen(buf) + 1); strncpy(buf, "exec ", 5); } } /* pass name of shell script on pipe */ /* will be stdin of shell */ close(TJE->pipe_script[0]); /* Did the user submit arguments with the -F option in qsub? 
*/ if(pjob->ji_wattr[(int)JOB_ATR_arguments].at_flags & ATR_VFLAG_SET) { strcat(buf, " "); strcat(buf, pjob->ji_wattr[(int)JOB_ATR_arguments].at_val.at_str); } strcat(buf, "\n"); /* setup above */ i = strlen(buf); j = 0; while (j < i) { if ((k = write(TJE->pipe_script[1], buf + j, i - j)) < 0) { if (errno == EINTR) continue; break; } j += k; } close(TJE->pipe_script[1]); } /* END if (TJE->is_interactive == FALSE) */ #endif /* SHELL_INVOKE */ #endif /* !SHELL_USE_ARGV */ /* SUCCESS: parent returns */ if (LOGLEVEL >= 3) { sprintf(log_buffer, "phase 2 of job launch successfully completed"); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } *SC = 0; return(SUCCESS); } /* END TMomFinalizeJob2() */ int determine_umask( int uid) /* I */ { static char *id = "determine_umask"; int UMaskVal = 0077; struct passwd *pwdp; FILE *fp; char retdata[20]; char command[100]; if (DEFAULT_UMASK[0] != '\0') { if (!strcasecmp(DEFAULT_UMASK, "userdefault")) { /* apply user default */ /* do we inherit umask when we do setuid(), NO */ /* we want to try and determine what the users umask is */ /* then we return its value so it can be set correctly */ if ((pwdp = getpwuid(uid)) == NULL) { sprintf(log_buffer, "FAILED to get password structure for uid %d", uid); log_err(-1, id, log_buffer); } else { sprintf(command, "/bin/su - %s -c umask", pwdp->pw_name); if ((fp = popen(command, "r")) != NULL) { if (fgets(retdata, 20, fp) != NULL) { /* set the umask value from returned data */ UMaskVal = strtol(retdata, NULL, 8); } pclose(fp); } } } else { UMaskVal = (int)strtol(DEFAULT_UMASK, NULL, 0); } /* make sure that we have access to the file when we move the spooled file */ UMaskVal = UMaskVal & 0377; if (LOGLEVEL > 7) { sprintf(log_buffer, "Using $job_output_file_umask value of %o", UMaskVal); log_ext(-1, id, log_buffer, LOG_DEBUG); } } return(UMaskVal); } /* END determine_umask() */ #ifdef PENABLE_LINUX26_CPUSETS /** * indicates whether or not cpusets should be used 
to run this job * * @param pjob - the job to check * @return TRUE if either GEOMETRY_REQUESTS is undefined or a request was made * else FALSE */ int use_cpusets( job *pjob) /* I */ { #ifdef ALWAYS_USE_CPUSETS return(TRUE); #else #ifdef GEOMETRY_REQUESTS resource *presc; resource_def *prd; if (pjob == NULL) return(FALSE); prd = find_resc_def(svr_resc_def,"procs_bitmap",svr_resc_size); presc = find_resc_entry(&pjob->ji_wattr[(int)JOB_ATR_resource],prd); /* don't create a cpuset unless one was specifically requested */ if ((presc == NULL) || (presc->rs_value.at_flags & ATR_VFLAG_SET) == FALSE) { return(FALSE); } else return(TRUE); #else return(TRUE); #endif /* GEOMETRY_REQUESTS */ #endif /* ALWAYS_USE_CPUSETS */ } /* END use_cpusets() */ #endif /* PENABLE_LINUX26_CPUSETS */ /* * writes the exec_gpu str to a file * receives strings in the format: -gpu/[+-gpu/...] * and prints them in the format: -gpu[\n-gpu...] * * @param file - the file to print to * @param pjob - the job whose gpu string will be printed * @return PBSE_NONE if success, error code otherwise */ int write_gpus_to_file( job *pjob) /* I */ { static char *id = "write_gpus_to_file"; char filename[MAXPATHLEN]; FILE *file; char *gpu_str; char *gpu_worker; char *plus; char *slash; char *next; char *curr; /* if there are no gpus, do nothing */ if ((pjob->ji_wattr[JOB_ATR_exec_gpus].at_flags & ATR_VFLAG_SET) == 0) return(PBSE_NONE); gpu_str = pjob->ji_wattr[JOB_ATR_exec_gpus].at_val.at_str; if (gpu_str == NULL) return(PBSE_NONE); /* open the file just like $PBS_NODEFILE */ sprintf(filename, "%s/%sgpu", path_aux, pjob->ji_qs.ji_jobid); if ((file = fopen(filename, "w")) == NULL) { sprintf(log_buffer, "cannot open %s", filename); log_err(errno, id, log_buffer); return(-1); } if (fchmod(fileno(file), 0644) == -1) { sprintf(log_buffer, "cannot chmod %s", filename); log_err(errno, id, log_buffer); fclose(file); return(-1); } gpu_worker = malloc(strlen(gpu_str) + 1); if (gpu_worker == NULL) { log_err(ENOMEM,id,"Couldn't 
allocate a string to work with? EPIC FAIL"); return(ENOMEM); } strcpy(gpu_worker,gpu_str); curr = gpu_worker; while (curr != NULL) { plus = strchr(curr,'+'); if (plus == NULL) { /* we have reached the last string */ next = NULL; } else { /* there is more, set up things to print this one */ next = plus + 1; *plus = '\0'; } /* remove the slash replace it with a dash */ slash = strchr(curr,'/'); if (slash == NULL) { /* should never happen, but we'll attempt to handle it anyway */ fprintf(file,"%s\n",curr); } else { *slash = '\0'; fprintf(file,"%s%s\n",curr,slash+1); } /* advance to the next chunk */ curr = next; } /* SUCCESS */ free(gpu_worker); fclose(file); return(PBSE_NONE); } /* END write_gpus_to_file() */ /* * writes the exec_host str to a file * receives strings in the format: /[+/...] * and prints them out to the file * * @param file - the file to print to * @param pjob - the job whose host string will be printed * @return PBSE_NONE if success, error code otherwise */ int write_nodes_to_file( job *pjob) /* I */ { static char *id = "write_nodes_to_file"; char filename[MAXPATHLEN]; int j, vnodenum; FILE *file; char *BPtr; sprintf(filename, "%s/%s", path_aux, pjob->ji_qs.ji_jobid); if ((file = fopen(filename, "w")) == NULL) { sprintf(log_buffer, "cannot open %s", filename); log_err(errno, id, log_buffer); exit(1); } /* ** The file must be owned by root and readable by ** the user. We take the easy way out and make ** it readable by anyone. */ if (fchmod(fileno(file), 0644) == -1) { sprintf(log_buffer, "cannot chmod %s", filename); log_err(errno, id, log_buffer); fclose(file); exit(1); } /* NOTE: if BEOWULF_JOB_MAP is set, populate node file with this info */ BPtr = get_job_envvar(pjob, "BEOWULF_JOB_MAP"); if (BPtr != NULL) { char tmpBuffer[1000000]; char *ptr; /* FORMAT: [:]... 
*/ strncpy(tmpBuffer, BPtr, sizeof(tmpBuffer)); ptr = strtok(tmpBuffer, ":"); while (ptr != NULL) { if (nodefile_suffix != NULL) { fprintf(file, "%s%s\n", ptr, nodefile_suffix); } else { fprintf(file, "%s\n", ptr); } ptr = strtok(NULL, ":"); } } else { vnodenum = pjob->ji_numvnod; for (j = 0;j < vnodenum;j++) { vnodent *vp = &pjob->ji_vnods[j]; if (nodefile_suffix != NULL) { fprintf(file, "%s%s\n", vp->vn_host->hn_host, nodefile_suffix); } else { fprintf(file, "%s\n", vp->vn_host->hn_host); } } /* END for (j) */ } fclose(file); return(PBSE_NONE); } /* END write_nodes_to_file() */ /* child portion of job launch executed as user - called by TMomFinalize2() */ /* will execute run_pelog() * issues setuid to pjob->ji_qs.ji_un.ji_momt.ji_exuid */ int TMomFinalizeChild( pjobexec_t *TJE) /* I */ { static char *id = "TMomFinalizeChild"; int aindex; char *arg[MAX_JOB_ARGS]; char buf[MAXPATHLEN + 2]; pid_t cpid; int i, j, vnodenum; char qsubhostname[1024]; char *phost = NULL; int pport = 0; int pts; int qsub_sock; char *shell; char *shellname; char *idir; char *termtype; resource *presc; char *path_prologuserjob; #ifdef USEJOBCREATE struct startjob_rtn sjr = { 0, 0, 0}; #else struct startjob_rtn sjr = { 0, 0}; #endif /* USEJOBCREATE */ job *pjob; task *ptask; struct passwd *pwdp; char EMsg[1024]; pjob = (job *)TJE->pjob; ptask = (task *)TJE->ptask; pwdp = (struct passwd *)TJE->pwdp; /*******************************************/ /* */ /* The child process - will become the job */ /* */ /*******************************************/ /* NOTE: This child is launched on the mother superior node. 
It does not have access to stdout/stderr, failure messages will route to syslog via log_err() */ if (LOGLEVEL >= 10) log_ext(-1, id, "starting", LOG_DEBUG); if (lockfds >= 0) { close(lockfds); lockfds = -1; } close(TJE->jsmpipe[0]); close(TJE->mjspipe[1]); /* find which shell to use, one specified or the login shell */ shell = set_shell(pjob, pwdp); /* in the machine dependent section */ if (LOGLEVEL >= 10) log_ext(-1, id, "shell initialized", LOG_DEBUG); /* Setup user env */ if (InitUserEnv(pjob, ptask, NULL, pwdp, shell) < 0) { log_err(-1, id, "failed to setup user env"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, &sjr); } if (LOGLEVEL >= 10) log_ext(-1, id, "env initialized", LOG_DEBUG); /* Create the job's nodefile */ vnodenum = pjob->ji_numvnod; if (pjob->ji_flags & MOM_HAS_NODEFILE) { if (write_nodes_to_file(pjob) == -1) { starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if (write_gpus_to_file(pjob) == -1) { starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } #ifdef NVIDIA_GPUS if ((use_nvidia_gpu) && setup_gpus_for_job(pjob) == -1) { starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } #endif /* NVIDIA_GPUS */ } /* END if (pjob->ji_flags & MOM_HAS_NODEFILE) */ if (LOGLEVEL >= 10) log_ext(-1, id, "node file created", LOG_DEBUG); /* Set PBS_VNODENUM */ sprintf(buf, "%d", 0); bld_env_variables(&vtable, "PBS_VNODENUM", buf); /* PBS_NP */ sprintf(buf, "%d", vnodenum); bld_env_variables(&vtable, variables_else[tveNprocs], buf); #ifdef PENABLE_LINUX26_CPUSETS if (use_cpusets(pjob) == TRUE) { if (LOGLEVEL >= 6) { sprintf(log_buffer, "about to create cpuset for job %s.\n", pjob->ji_qs.ji_jobid); log_ext(-1, id, log_buffer, LOG_DEBUG); } if (create_jobset(pjob) == FAILURE) { /* FAILURE */ sprintf(log_buffer, "Could not create cpuset for job %s.\n", pjob->ji_qs.ji_jobid); log_err(-1, id, log_buffer); starter_return(TJE->upfds, TJE->downfds, 
JOB_EXEC_RETRY, &sjr); } } #endif /* END PENABLE_LINUX26_CPUSETS */ #ifdef ENABLE_CPA /* Cray CPA setup */ if ((j = CPACreatePartition(pjob, &vtable)) != 0) { log_err(-1, id, "CPACreatePartition failed"); starter_return(TJE->upfds, TJE->downfds, j, &sjr); /* exits */ /*NOTREACHED*/ exit(1); } #endif /* END ENABLE_CPA */ /* specific system related variables */ j = set_mach_vars(pjob, &vtable); if (j != 0) { log_err(-1, id, "failed to set mach vars"); starter_return(TJE->upfds, TJE->downfds, j, &sjr); /* exits */ /*NOTREACHED*/ exit(1); } if (LOGLEVEL >= 10) log_ext(-1, id, "system vars set", LOG_DEBUG); umask(determine_umask(pjob->ji_qs.ji_un.ji_momt.ji_exuid)); if (TJE->is_interactive == TRUE) { struct sigaction act; /*************************************************************/ /* We have an "interactive" job, connect the standard */ /* streams to a socket connected to qsub. */ /*************************************************************/ sigemptyset(&act.sa_mask); #ifdef SA_INTERRUPT act.sa_flags = SA_INTERRUPT; #else act.sa_flags = 0; #endif /* SA_INTERRUPT */ act.sa_handler = no_hang; sigaction(SIGALRM, &act, NULL); /* only giving ourselves 5 seconds to connect to qsub * and get term settings */ alarm(5); /* once we connect to qsub and open a pty, the user can send us * a ctrl-c. 
It is important that we block this until we exec() * the user's shell or we exit and the job gets stuck */ act.sa_handler = SIG_IGN; sigaction(SIGINT, &act, (struct sigaction *)0); /* Set environment to reflect interactive */ bld_env_variables(&vtable, "PBS_ENVIRONMENT", "PBS_INTERACTIVE"); /* get host where qsub resides */ phost = arst_string("PBS_O_HOST", &pjob->ji_wattr[(int)JOB_ATR_variables]); pport = pjob->ji_wattr[(int)JOB_ATR_interactive].at_val.at_long; if ((phost == NULL) || ((phost = strchr(phost, '=')) == NULL)) { log_err(-1, id, "PBS_O_HOST not set"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } phost++; if (submithost_suffix != NULL) { snprintf(qsubhostname, sizeof(qsubhostname), "%s%s", phost, submithost_suffix); } else { strncpy(qsubhostname, phost, sizeof(qsubhostname)); } qsub_sock = conn_qsub(qsubhostname, pport, EMsg); if (qsub_sock < 0) { snprintf(log_buffer, 1024, "cannot open interactive qsub socket to host %s:%d - '%s' - check routing tables/multi-homed host issues", qsubhostname, pport, EMsg); log_err(errno, id, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } FDMOVE(qsub_sock); /* send job id as validation to qsub */ if (write( qsub_sock, pjob->ji_qs.ji_jobid, PBS_MAXSVRJOBID + 1) != PBS_MAXSVRJOBID + 1) { log_err(errno, id, "cannot write jobid"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, &sjr); } /* receive terminal type and window size */ if ((termtype = rcvttype(qsub_sock)) == NULL) { log_err(errno, id, "cannot get termtype"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } bld_env_variables(&vtable, termtype, NULL); *(vtable.v_envp + vtable.v_used) = NULL; /* null term */ if (rcvwinsize(qsub_sock) == -1) { log_err(errno, id, "cannot get winsize"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } /* turn off alarm set around qsub connect 
activities */ alarm(0); act.sa_handler = SIG_DFL; act.sa_flags = 0; sigaction(SIGALRM, &act, NULL); #ifdef USEJOBCREATE /* * Get a job id from the system */ sjr.sj_jobid = get_jobid(pjob->ji_qs.ji_jobid); pjob->ji_wattr[(int)JOB_ATR_pagg_id].at_val.at_ll = sjr.sj_jobid; pjob->ji_wattr[(int)JOB_ATR_pagg_id].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY; #endif /* USEJOBCREATE */ /* set up the job session (update sjr) */ j = set_job(pjob, &sjr); memcpy(TJE->sjr, &sjr, sizeof(sjr)); if (j < 0) { if (j == -1) { /* set_job didn't leave message in log_buffer */ strcpy(log_buffer, "unable to set session"); } log_err(-1, id, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } /* open the slave pty as the controlling tty */ if ((pts = open_pty(pjob)) < 0) { log_err(errno, id, "cannot open slave"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } act.sa_handler = SIG_IGN; /* setup to ignore SIGTERM */ writerpid = fork(); if (writerpid == 0) { /* child is "writer" process */ sigaction(SIGTERM, &act, NULL); close(TJE->upfds); close(TJE->downfds); close(pts); mom_writer(qsub_sock, TJE->ptc); shutdown(qsub_sock, 2); exit(0); } if (writerpid > 0) { /* ** parent -- it first runs the prolog then forks ** again. the child becomes the job while the ** parent becomes the reader. 
*/ close(1); close(2); dup2(pts, 1); dup2(pts, 2); fflush(stdout); fflush(stderr); set_termcc(pts); /* set terminal control char */ setwinsize(pts); /* set window size to qsub's */ /* run prolog - interactive job */ if (run_pelog( PE_PROLOG, path_prolog, pjob, PE_IO_TYPE_ASIS) != 0) { log_err(-1, id, "interactive prolog failed"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); } #ifdef ENABLE_CSA /* * Add a workload management start record */ add_wkm_start(sjr.sj_jobid, pjob->ji_qs.ji_jobid); #endif /* ENABLE_CSA */ /* run user prolog */ if (run_pelog( PE_PROLOGUSER, path_prologuser, pjob, PE_IO_TYPE_ASIS) != 0) { log_err(-1, id, "interactive user prolog failed"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); /*NOTREACHED*/ } presc = find_resc_entry( &pjob->ji_wattr[(int)JOB_ATR_resource], find_resc_def(svr_resc_def, "prologue", svr_resc_size)); if ((presc != NULL)) if((presc->rs_value.at_flags & ATR_VFLAG_SET) && (presc->rs_value.at_val.at_str)) { path_prologuserjob = get_local_script_path(pjob, presc->rs_value.at_val.at_str); if(path_prologuserjob) { if (run_pelog( PE_PROLOGUSERJOB, path_prologuserjob, pjob, PE_IO_TYPE_ASIS)) { log_err(-1, id, "batch job local user prolog failed"); free(path_prologuserjob); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); /*NOTREACHED*/ } free(path_prologuserjob); } } shellpid = fork(); if (shellpid == 0) { /*********************************************/ /* child - this will be the interactive job */ /* i/o is to slave tty */ /*********************************************/ close(0); dup2(pts, 0); fflush(stdin); close(TJE->ptc); /* close master side */ close(pts); /* dup'ed above */ close(qsub_sock); /* continue setting up and exec-ing shell */ } else { if (shellpid > 0) { /* fork, parent is "reader" process */ sigaction(SIGTERM, &act, NULL); close(pts); close(TJE->upfds); close(TJE->downfds); close(1); close(2); sigemptyset(&act.sa_mask); act.sa_flags = SA_NOCLDSTOP; act.sa_handler 
= catchinter; sigaction(SIGCHLD, &act, NULL); mom_reader_go = 1; mom_reader(qsub_sock, TJE->ptc); } else { log_err(errno, id, "can't fork reader"); } /* make sure qsub gets EOF */ shutdown(qsub_sock, 2); /* change pty back to available after job is done */ chmod(TJE->ptc_name, 0666); if (chown(TJE->ptc_name, 0, 0) == -1) { } exit(0); } } /* END if (writerpid > 0) */ else { /* FAILURE - fork failed */ log_err(errno, id, "cannot fork nanny"); /* change pty back to available */ chmod(TJE->ptc_name, 0666); if (chown(TJE->ptc_name, 0, 0) == -1) { } starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, &sjr); } } /* END if (TJE->is_interactive == TRUE) */ else { /*************************************************************/ /* We have a "normal" batch job, connect the standard */ /* streams to files */ /*************************************************************/ /* set Environment to reflect batch */ bld_env_variables(&vtable, "PBS_ENVIRONMENT", "PBS_BATCH"); bld_env_variables(&vtable, "ENVIRONMENT", "BATCH"); #if SHELL_USE_ARGV == 1 /* connect stdin to /dev/null and feed the name of * the script on the command line */ if (TJE->is_interactive == FALSE) script_in = open("/dev/null", O_RDONLY, 0); #elif SHELL_INVOKE == 1 /* if passing script file name as input to shell */ close(TJE->pipe_script[1]); script_in = TJE->pipe_script[0]; #else /* SHELL_USE_ARGV || SHELL_INVOKE */ /* if passing script itself as input to shell */ strcpy(buf, path_jobs); strcat(buf, pjob->ji_qs.ji_fileprefix); strcat(buf, JOB_SCRIPT_SUFFIX); if ((script_in = open(buf, O_RDONLY, 0)) < 0) { if (errno == ENOENT) script_in = open("/dev/null", O_RDONLY, 0); } #endif /* SHELL_USE_ARGV */ if (LOGLEVEL >= 10) log_ext(-1, id, "opening script", LOG_DEBUG); if (script_in < 0) { log_err(errno, id, "unable to open script"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } FDMOVE(script_in); /* make sure descriptor > 2 */ if (script_in != 0) { close(0); if 
(dup(script_in) == -1) { } close(script_in); } /* NOTE: set arg2 to 5 to enable file open timeout check */ if (open_std_out_err(pjob, 0) == -1) { log_err(-1, id, "unable to open stdout/stderr descriptors"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_STDOUTFAIL, &sjr); /*NOTREACHED*/ exit(1); } if (LOGLEVEL >= 10) log_ext(-1, id, "stdout/stderr opened", LOG_DEBUG); #ifdef USEJOBCREATE /* * Get a job id from the system */ sjr.sj_jobid = get_jobid(pjob->ji_qs.ji_jobid); pjob->ji_wattr[(int)JOB_ATR_pagg_id].at_val.at_ll = sjr.sj_jobid; pjob->ji_wattr[(int)JOB_ATR_pagg_id].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY; #endif /* USEJOBCREATE */ /* set up the job session (update sjr) */ j = set_job(pjob, &sjr); if (LOGLEVEL >= 10) log_ext(-1, id, "set_job complete", LOG_DEBUG); memcpy(TJE->sjr, &sjr, sizeof(sjr)); if (j < 0) { /* FAILURE */ if (j != -2 && j != -3) { /* set_job didn't leave message in log_buffer */ strcpy(log_buffer, "unable to set session"); } /* set_job leaves message in log_buffer */ log_err(-1, id, log_buffer); if (j == -3) { starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, &sjr); } else { starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); } /*NOTREACHED*/ exit(1); } /* run prolog - standard batch job */ if ((j = run_pelog( PE_PROLOG, path_prolog, pjob, PE_IO_TYPE_ASIS)) != 0) { log_err(-1, id, "batch job prolog failed"); if (j == 1) { /* permanent failure - abort job */ starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); } else { /* retry - requeue job */ starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, &sjr); } /*NOTREACHED*/ } if (LOGLEVEL >= 10) log_ext(-1, id, "prolog complete", LOG_DEBUG); #ifdef ENABLE_CSA /* * Add a workload management start record */ add_wkm_start(sjr.sj_jobid, pjob->ji_qs.ji_jobid); #endif /* ENABLE_CSA */ /* run user prolog */ if ((j = run_pelog( PE_PROLOGUSER, path_prologuser, pjob, PE_IO_TYPE_ASIS)) != 0) { log_err(-1, id, "batch job user prolog failed"); if (j == 1) { /* 
permanent failure - abort job */ starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); } else { /* retry - requeue job */ starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, &sjr); } /*NOTREACHED*/ } presc = find_resc_entry( &pjob->ji_wattr[(int)JOB_ATR_resource], find_resc_def(svr_resc_def, "prologue", svr_resc_size)); if (presc != NULL) if((presc->rs_value.at_flags & ATR_VFLAG_SET) && (presc->rs_value.at_val.at_str != NULL)) { path_prologuserjob = get_local_script_path(pjob, presc->rs_value.at_val.at_str); if(path_prologuserjob) { if ((j = run_pelog( PE_PROLOGUSERJOB, path_prologuserjob, pjob, PE_IO_TYPE_ASIS)) != 0) { log_err(-1, id, "batch job user prolog failed"); if (j == 1) { /* permanent failure - abort job */ free(path_prologuserjob); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); } else { /* retry - requeue job */ free(path_prologuserjob); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, &sjr); } /*NOTREACHED*/ } free(path_prologuserjob); } } } /* END else (TJE->is_interactive == TRUE) */ /***********************************************************************/ /* Set resource limits */ /* Both normal batch and interactive job come through here */ /* */ /* output fds to the user are setup at this point, so write() all */ /* errors (with a \n) directly to the user on fd 2 and fscync(2) it */ /***********************************************************************/ pjob->ji_wattr[(int)JOB_ATR_session_id].at_val.at_long = sjr.sj_session; pjob->ji_wattr[(int)JOB_ATR_session_id].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY | ATR_VFLAG_SEND; /* leaving a note for myself to check this later... why is it necessary to set JOB_ATR_session_id above? We are a child process and setting that attr should be useless. But if it isn't set, MOM sometimes SIGKILLs herself with interactive jobs -garrick */ #ifdef PENABLE_LINUX26_CPUSETS /* Move this mom process into the cpuset so the job will start in it. 
*/ if (use_cpusets(pjob) == TRUE) { move_to_jobset(getpid(), pjob); } #endif /* (PENABLE_LINUX26_CPUSETS) */ if (site_job_setup(pjob) != 0) { /* FAILURE */ sprintf(log_buffer, "PBS: site specific job setup failed\n"); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, id, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); /* exits */ /*NOTREACHED*/ } if (LOGLEVEL >= 10) log_ext(-1, id, "setting system limits", LOG_DEBUG); log_buffer[0] = '\0'; if ((i = mom_set_limits(pjob, SET_LIMIT_SET)) != PBSE_NONE) { if (log_buffer[0] != '\0') { /* report error to user via stderr file */ if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); } if (i == PBSE_RESCUNAV) { /* resource temp unavailable */ if (TJE->is_interactive == TRUE) j = JOB_EXEC_FAIL2; else j = JOB_EXEC_RETRY; } else { j = JOB_EXEC_FAIL2; } if (log_buffer[0] != '\0') { log_err(errno, id, log_buffer); } else { log_err(errno, id, "mom_set_limits failed"); } starter_return(TJE->upfds, TJE->downfds, j, &sjr); /* exits */ /*NOTREACHED*/ return(-1); } /* END if (mom_set_limits() == 0) */ endpwent(); if (LOGLEVEL >= 10) log_ext(-1, id, "system limits set", LOG_DEBUG); if ((idir = get_job_envvar(pjob, "PBS_O_ROOTDIR")) != NULL) { if (chroot(idir) == -1) { sprintf(log_buffer, "PBS: chroot to '%.256s' failed: %s\n", idir, strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, id, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); /*NOTREACHED*/ return(-1); } } /* * become the user, execv the shell and become the real job */ if (LOGLEVEL >= 10) { sprintf(log_buffer, "setting user/group credentials to %d/%d", pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid); log_ext(-1, id, log_buffer, LOG_DEBUG); } if (setgroups( pjob->ji_grpcache->gc_ngroup, (gid_t *)pjob->ji_grpcache->gc_groups) != 0) { snprintf(log_buffer,sizeof(log_buffer), "PBS: setgroups for UID = %lu failed: 
%s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno,id,log_buffer); starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); } if (setgid(pjob->ji_qs.ji_un.ji_momt.ji_exgid) != 0) { snprintf(log_buffer,sizeof(log_buffer), "PBS: setgid to %lu for UID = %lu failed: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exgid, (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno,id,log_buffer); starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); } if (setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, FALSE) < 0) { snprintf(log_buffer,sizeof(log_buffer), "PBS: setuid to %lu failed: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, id, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); } #ifdef _CRAY setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, TRUE); /* cray kludge */ #endif /* CRAY */ /* * cwd to PBS_O_INITDIR if specified, otherwise User's Home */ if ((idir = get_job_envvar(pjob, "PBS_O_INITDIR")) != NULL) { /* in TMomFinalizeChild() executed as user */ if (chdir(idir) == -1) { sprintf(log_buffer, "PBS: chdir to '%.256s' failed: %s\n", idir, strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, id, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); /*NOTREACHED*/ return(-1); } } else { /* in TMomFinalizeChild() executed as user */ if (chdir(pwdp->pw_dir) == -1) { sprintf(log_buffer, "PBS: chdir to '%.256s' failed: %s\n", pwdp->pw_dir, strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, id, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); /*NOTREACHED*/ return(-1); } } if (LOGLEVEL >= 10) { 
sprintf(log_buffer, "initial directory set to %s\n", idir != NULL ? idir : pwdp->pw_dir); log_ext(-1, id, log_buffer, LOG_DEBUG); } /* X11 forwarding init */ if ((TJE->is_interactive == TRUE) && pjob->ji_wattr[(int)JOB_ATR_forwardx11].at_val.at_str) { char display[512]; if (x11_create_display( 1, /* use localhost only */ display, /* output */ qsubhostname, pport, pjob->ji_grpcache->gc_homedir, pjob->ji_wattr[(int)JOB_ATR_forwardx11].at_val.at_str) >= 0) { bld_env_variables(&vtable, "DISPLAY", display); } else { sprintf(log_buffer, "PBS: X11 forwarding init failed\n"); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); } } /* NULL terminate the envp array, This is MUST DO */ *(vtable.v_envp + vtable.v_used) = NULL; /* tell mom we are going */ if (LOGLEVEL >= 10) log_ext(-1, id, "forking child", LOG_DEBUG); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_OK, &sjr); log_close(0); /* FIXME: this is useless, right? */ if ((pjob->ji_numnodes == 1) || ((cpid = fork()) > 0)) { /* parent does the shell */ /* close sockets that child uses */ if (pjob->ji_stdout >= 0) close(pjob->ji_stdout); if (pjob->ji_stderr >= 0) close(pjob->ji_stderr); /* construct argv array */ shellname = strrchr(shell, '/'); if (shellname != NULL) ++shellname; /* go past last '/' */ else shellname = shell; aindex = 0; /* determine whether or not we bypass the sourcing of login shells */ if (((TJE->is_interactive == TRUE) && (src_login_interactive == FALSE)) || ((TJE->is_interactive != TRUE) && (src_login_batch == FALSE))) { arg[aindex] = malloc(strlen(shellname) + 1); if (arg[aindex] == NULL) { log_err(errno,id,"cannot alloc env"); starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ return(-1); } strcpy(arg[aindex], shellname); if (LOGLEVEL >= 7) { sprintf(log_buffer, "bypass sourcing of login files for job %s", pjob->ji_qs.ji_jobid); log_ext(-1, id, log_buffer, LOG_DEBUG); } } else { arg[aindex] = malloc(strlen(shellname) + 2); if (arg[aindex] == NULL) { 
log_err(errno,id,"cannot alloc env"); starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ return(-1); } /* specifying '-' indicates this is a 'login' shell */ strcpy(arg[aindex], "-"); strcat(arg[aindex], shellname); } arg[aindex + 1] = NULL; aindex++; if (PRE_EXEC[0] != '\0') { arg[aindex] = strdup(PRE_EXEC); if (arg[aindex] == NULL) { log_err(errno,id,"cannot alloc env"); starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ return(-1); } arg[aindex + 1] = NULL; aindex++; } /* END if (PRE_EXEC[0] != '\0') */ #if SHELL_USE_ARGV == 1 /* Put the script's arguments on the command line (see configure option --enable-shell-use-argv). */ if (TJE->is_interactive == FALSE) { arg[aindex] = malloc( strlen(path_jobs) + strlen(pjob->ji_qs.ji_fileprefix) + strlen(JOB_SCRIPT_SUFFIX) + 6); if (arg[aindex] == NULL) { log_err(errno,id,"cannot alloc env"); starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ return(-1); } if (exec_with_exec) { strcpy(arg[aindex], "exec "); strcat(arg[aindex], path_jobs); } else strcpy(arg[aindex], path_jobs); strcat(arg[aindex], pjob->ji_qs.ji_fileprefix); strcat(arg[aindex], JOB_SCRIPT_SUFFIX); arg[aindex + 1] = NULL; aindex++; } #endif /* SHELL_USE_ARGV */ if (TJE->is_interactive == TRUE) { struct sigaction act; /* restore SIGINT so that the child shell can use ctrl-c */ sigemptyset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = SIG_DFL; sigaction(SIGINT, &act, (struct sigaction *)0); /* if the user specified command(s) then invoke it */ if ((pjob->ji_wattr[(int)JOB_ATR_inter_cmd].at_flags & ATR_VFLAG_SET) != 0) { arg[aindex] = malloc(strlen("-c") + 1); if (arg[aindex] == NULL) { log_err(errno,id,"cannot alloc env"); starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ return(-1); } strcpy(arg[aindex], "-c"); arg[aindex + 1] = NULL; aindex++; arg[aindex] = malloc(strlen(pjob->ji_wattr[(int)JOB_ATR_inter_cmd].at_val.at_str) + 1); if (arg[aindex] == NULL) 
{ log_err(errno,id,"cannot alloc env"); starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ return(-1); } log_ext(-1, id, log_buffer, LOG_DEBUG); strcpy(arg[aindex], pjob->ji_wattr[(int)JOB_ATR_inter_cmd].at_val.at_str); arg[aindex + 1] = NULL; aindex++; } } if(jobstarter_set) { if(mom_jobstarter_execute_job(pjob, shell, arg, &vtable) == -1) { starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ return(-1); } } if (mom_checkpoint_job_is_checkpointable(pjob)) { if (mom_checkpoint_execute_job(pjob, shell, arg, &vtable) == -1) { starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ return(-1); } } else { if (LOGLEVEL >= 10) { char cmd[1024]; int i; strcpy(cmd,arg[0]); strcat(cmd,","); for (i = 1; arg[i] != NULL; i++) { strcat(cmd," "); strcat(cmd,arg[i]); strcat(cmd,","); } strcat(cmd,")"); sprintf(log_buffer, "execing command (%s) args (%s)\n", shell, cmd); log_ext(-1, id, log_buffer, LOG_DEBUG); } execve(shell, arg, vtable.v_envp); } } /* END if ((pjob->ji_numnodes == 1) || ...) 
*/ else if (cpid == 0) { /* child does demux */ char *demux = DEMUX; /* setup descriptors 3 and 4 */ dup2(pjob->ji_stdout, 3); if (pjob->ji_stdout > 3) close(pjob->ji_stdout); dup2(pjob->ji_stderr, 4); if (pjob->ji_stderr > 4) close(pjob->ji_stderr); /* construct argv array */ shellname = strrchr(demux, '/'); if (shellname != NULL) ++shellname; /* go past last '/' */ else shellname = shell; aindex = 0; arg[aindex] = malloc(strlen(shellname) + 1); if (arg[aindex] == NULL) { log_err(errno,id,"cannot alloc env"); starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ return(-1); } strcpy(arg[aindex], shellname); arg[aindex + 1] = NULL; aindex++; if (PRE_EXEC[0] != '\0') { arg[aindex] = strdup(PRE_EXEC); if (arg[aindex] == NULL) { log_err(errno,id,"cannot alloc env"); starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ return(-1); } arg[aindex + 1] = NULL; aindex++; } /* END if (PRE_EXEC[0] != '\0') */ execve(demux, arg, vtable.v_envp); /* reached only if execve fails */ shell = demux; /* for fprintf below */ } /* END else if (cpid == 0) */ sprintf(log_buffer, "PBS: exec of shell '%.256s' failed\n", shell); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); if (strlen(shell) == 0) { #ifndef NDEBUG extern char mom_host[]; #endif DBPRT(("user \"%s\" may not have a shell defined on node \"%s\"\n", pwdp->pw_name, mom_host)); } else if (strstr(shell, "/bin/false") != NULL) { #ifndef NDEBUG extern char mom_host[]; #endif DBPRT(("user \"%s\" has shell \"/bin/false\" on node \"%s\"\n", pwdp->pw_name, mom_host)); } else { struct stat buf; if (stat(shell, &buf) != 0) { DBPRT(("stat of shell \"%s\" failed with error %d\n", shell, errno)); } else if (S_ISREG(buf.st_mode) == 0) { DBPRT(("shell \"%s\" is not a file\n", shell)); } else if ((buf.st_mode & S_IXUSR) != 0) { DBPRT(("shell \"%s\" is not executable by user \"%s\"\n", shell, pwdp->pw_name)); } } exit(254); /* should never, ever get here */ /*NOTREACHED*/ 
return(-1); } /* END TMomFinalizeChild() */ /* Child has already reported in via pipe (info in TJE->sjr) which was created in TMomFinalizeJob2->TMomFinalizeChild. Perform final job tasks. Change pjob substate from JOB_SUBSTATE_PRERUN to JOB_SUBSTATE_RUNNING */ int TMomFinalizeJob3( pjobexec_t *TJE, /* I (modified) */ int ReadSize, /* I (bytes read from child pipe) */ int ReadErrno, /* I (errno value from read) */ int *SC) /* O (return code) */ { char *id = "TMomFinalizeJob3"; struct startjob_rtn sjr; job *pjob; task *ptask; pjob = (job *)TJE->pjob; ptask = (task *)TJE->ptask; /* sjr populated in TMomFinalizeJob2() */ memcpy(&sjr, TJE->sjr, sizeof(sjr)); close(TJE->jsmpipe[0]); if (ReadSize != sizeof(sjr)) { /* FAILURE */ sprintf(log_buffer, "read of pipe for sid failed for job %s (%d of %d bytes)", pjob->ji_qs.ji_jobid, ReadSize, (int)sizeof(sjr)); log_err(ReadErrno, id, log_buffer); sprintf(log_buffer, "start failed, improper sid"); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, id, log_buffer); close(TJE->mjspipe[1]); *SC = JOB_EXEC_RETRY; return(FAILURE); } /* send back as an acknowledgement that MOM got it */ if (write(TJE->mjspipe[1], &sjr, sizeof(sjr)) == -1) { } close(TJE->mjspipe[1]); if (LOGLEVEL >= 3) { #ifdef USEJOBCREATE sprintf(log_buffer, "Job %s read start return code=%d session=%ld jobid=%lx", pjob->ji_qs.ji_jobid, sjr.sj_code, (long)sjr.sj_session, sjr.sj_jobid); #else sprintf(log_buffer, "Job %s read start return code=%d session=%ld", pjob->ji_qs.ji_jobid, sjr.sj_code, (long)sjr.sj_session); #endif /* USEJOBCREATE */ log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, id, log_buffer); } if (sjr.sj_code != 0) { char tmpLine[1024]; /* FAILURE */ tmpLine[0] = '\0'; switch (sjr.sj_code) { case JOB_EXEC_OK: /* 0 */ strcpy(tmpLine, "no failure"); break; case JOB_EXEC_FAIL1: /* -1 */ strcpy(tmpLine, "job exec failure, before files staged, no retry"); break; case JOB_EXEC_FAIL2: /* -2 */ strcpy(tmpLine, "job exec failure, after files staged, no retry"); 
break; case JOB_EXEC_RETRY: /* -3 */ strcpy(tmpLine, "job exec failure, retry will be attempted"); if (sjr.sj_session < 0) { /* NOTE: push sjr.sj_sid into job attribute X to be used by encode_used */ ptask->ti_qs.ti_sid = sjr.sj_session; } break; case JOB_EXEC_STDOUTFAIL: /* -9 */ strcpy(tmpLine,"job exec failure, could not open/create stdout/stderr files, no retry"); break; default: sprintf(tmpLine, "job exec failure, code=%d", sjr.sj_code); break; } /* END switch (sjr.sj_code) */ sprintf(log_buffer, "job not started, %s %s (see syslog for more information)", (sjr.sj_code == JOB_EXEC_RETRY) ? "Retry" : "Failure", tmpLine); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, id, log_buffer); *SC = sjr.sj_code; return(FAILURE); } /* END if (sjr.sj_code < 0) */ /* pjob modified */ set_globid(pjob, &sjr); ptask->ti_qs.ti_sid = sjr.sj_session; ptask->ti_qs.ti_status = TI_STATE_RUNNING; strcpy(ptask->ti_qs.ti_parentjobid, pjob->ji_qs.ji_jobid); if (LOGLEVEL >= 6) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "saving task (TMomFinalizeJob3)"); } if (task_save(ptask) == -1) { /* FAILURE */ sprintf(log_buffer, "Task save failed"); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, id, log_buffer); *SC = JOB_EXEC_RETRY; return(FAILURE); } if (pjob->ji_numnodes > 1) { /* ** Put port numbers into job struct and close sockets. ** The job uses them to talk to demux, but main MOM ** doesn't need them. The port numbers are stored ** here for use in start_process(), to connect to ** pbs_demux. */ close(pjob->ji_stdout); pjob->ji_stdout = TJE->port_out; close(pjob->ji_stderr); pjob->ji_stderr = TJE->port_err; } /* return from the starter indicated the job is a go ... 
*/ /* record the start time and session/process id */ pjob->ji_wattr[(int)JOB_ATR_session_id].at_val.at_long = sjr.sj_session; pjob->ji_wattr[(int)JOB_ATR_session_id].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY | ATR_VFLAG_SEND; #ifdef USEJOBCREATE pjob->ji_wattr[(int)JOB_ATR_pagg_id].at_val.at_ll = sjr.sj_jobid; pjob->ji_wattr[(int)JOB_ATR_pagg_id].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY; #endif /* USEJOBCREATE */ pjob->ji_qs.ji_state = JOB_STATE_RUNNING; pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING; pjob->ji_qs.ji_stime = time_now; /* changed from SAVEJOB_QUICK to SAVEJOB_FULL (USC - 2/5/2005) */ job_save(pjob, SAVEJOB_FULL); sprintf(log_buffer, "job %s started, pid = %ld", pjob->ji_qs.ji_jobid, (long)sjr.sj_session); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, id, log_buffer); return(SUCCESS); } /* END TMomFinalizeJob3() */ /** * Start a process for a spawn request. This will be different from * a job's initial shell task in that the environment will be specified * and no interactive code need be included. * * NOTE: Called for sisters after mother superior receives IM_SPAWN_TASK request */ int start_process( task *ptask, /* I */ char **argv, /* I */ char **envp) /* I */ { static char id[] = "start_process"; char *idir; job *pjob = ptask->ti_job; pid_t pid; int pipes[2], kid_read, kid_write, parent_read, parent_write; int pts; int i, j; int fd0, fd1, fd2; u_long ipaddr; #ifdef USEJOBCREATE struct startjob_rtn sjr = { 0, 0, 0 }; #else struct startjob_rtn sjr = { 0, 0 }; #endif /* USEJOBCREATE */ if (pipe(pipes) == -1) { return(-1); } if (pipes[1] < 3) { kid_write = fcntl(pipes[1], F_DUPFD, 3); close(pipes[1]); } else { kid_write = pipes[1]; } parent_read = pipes[0]; if (pipe(pipes) == -1) { return(-1); } if (pipes[0] < 3) { kid_read = fcntl(pipes[0], F_DUPFD, 3); close(pipes[0]); } else { kid_read = pipes[0]; } parent_write = pipes[1]; /* ** Get ipaddr to Mother Superior. 
*/ if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) /* I'm MS */ { ipaddr = htonl(localaddr); } else { struct sockaddr_in *ap; /* ** We always have a stream open to MS at node 0. */ i = pjob->ji_hosts[0].hn_stream; if ((ap = rpp_getaddr(i)) == NULL) { sprintf(log_buffer, "job %s has no stream to MS", pjob->ji_qs.ji_jobid); log_err(-1, id, log_buffer); return(-1); } ipaddr = ap->sin_addr.s_addr; } /* END else (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) */ /* A restarted mom will not have called this yet, but it is needed * to spawn tasks (ji_grpcache). */ if (!check_pwd(pjob)) { log_err(-1, id, log_buffer); return(-1); } /* ** Begin a new process for the fledgling task. */ if ((pid = fork_me(-1)) == -1) { /* fork failed */ return(-1); } if (pid != 0) { /* parent */ int gotsuccess = 0; close(kid_read); close(kid_write); /* read sid */ for (;;) { i = read(parent_read, (char *) & sjr, sizeof(sjr)); if ((i == -1) && (errno == EINTR)) continue; if ((i == sizeof(sjr)) && (sjr.sj_code == 0) && !gotsuccess) { gotsuccess = 1; if (write(parent_write, &sjr, sizeof(sjr)) == -1) { } continue; } if (gotsuccess) { i = sizeof(sjr); } break; } /* END for(;;) */ j = errno; close(parent_read); if (i != sizeof(sjr)) { sprintf(log_buffer, "read of pipe for sid job %s got %d not %ld (errno: %d, %s)", pjob->ji_qs.ji_jobid, i, (long)sizeof(sjr), j, strerror(j)); log_err(j, id, log_buffer); close(parent_write); return(-1); } /* This write to the pipe is redundant and leads to a sigpipe because of a * race condition with the child */ /* if (write(parent_write, &sjr, sizeof(sjr)) == -1) {} */ close(parent_write); DBPRT(("%s: read start return %d %ld\n", id, sjr.sj_code, (long)sjr.sj_session)) if (sjr.sj_code < 0) { char tmpLine[1024]; tmpLine[0] = '\0'; switch (sjr.sj_code) { case JOB_EXEC_OK: /* 0 */ /* NO-OP */ break; case JOB_EXEC_FAIL1: /* -1 */ case JOB_EXEC_STDOUTFAIL: /* -9 */ strcpy(tmpLine, "stdio setup failed"); break; case JOB_EXEC_FAIL2: /* -2 */ strcpy(tmpLine, "env setup or user dir 
problem"); break; case JOB_EXEC_RETRY: /* -3 */ strcpy(tmpLine, "unable to set limits, retry will be attempted"); break; case JOB_EXEC_CMDFAIL: /* -8 */ strcpy(tmpLine, "command exec failed"); break; default: sprintf(tmpLine, "code=%d", sjr.sj_code); break; } /* END switch (sjr.sj_code) */ sprintf(log_buffer, "task not started, '%s', %s (see syslog)", argv[0], tmpLine); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return(-1); } /* END if (sjr.sj_code < 0) */ set_globid(pjob, &sjr); ptask->ti_qs.ti_sid = sjr.sj_session; ptask->ti_qs.ti_status = TI_STATE_RUNNING; if (LOGLEVEL >= 6) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "task set to running/saving task (start_process)"); } task_save(ptask); if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING) { pjob->ji_qs.ji_state = JOB_STATE_RUNNING; pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING; job_save(pjob, SAVEJOB_QUICK); } sprintf(log_buffer, "%s: task started, tid %d, sid %ld, cmd %s", id, ptask->ti_qs.ti_task, (long)ptask->ti_qs.ti_sid, argv[0]); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return(0); } /* END else if (pid != 0) */ /************************************************/ /* The child process - will become the TASK */ /************************************************/ if (LOGLEVEL >= 10) log_ext(-1, id, "child starting", LOG_DEBUG); if (lockfds >= 0) { close(lockfds); lockfds = -1; } close(parent_read); close(parent_write); /* set up the environmental variables to be given to the job */ /* NOTE: use log_err beyond this point to write messages to syslog */ if (InitUserEnv(pjob, ptask, envp, NULL, NULL) < 0) { log_err(errno, id, "failed to setup user env"); starter_return(kid_write, kid_read, JOB_EXEC_RETRY, &sjr); /*NOTREACHED*/ exit(1); } if (LOGLEVEL >= 10) log_ext(-1, id, "user env initialized", LOG_DEBUG); if (set_mach_vars(pjob, &vtable) != 0) { strcpy(log_buffer, "PBS: machine dependent environment 
variable setup failed\n"); log_err(errno, id, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if (LOGLEVEL >= 10) log_ext(-1, id, "mach vars set", LOG_DEBUG); umask(077); /* set environment to reflect batch */ bld_env_variables(&vtable, "PBS_ENVIRONMENT", "PBS_BATCH"); bld_env_variables(&vtable, "ENVIRONMENT", "BATCH"); /* Set limits for the child */ if (mom_set_limits(pjob, SET_LIMIT_SET) != PBSE_NONE) { strcpy(log_buffer, "PBS: resource limits setup failed\n"); log_err(errno, id, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } /* NULL terminate the envp array, This is MUST DO */ *(vtable.v_envp + vtable.v_used) = NULL; /* ** Set up stdin. */ /* look through env for a port# on MS we should use for stdin */ if ((fd0 = search_env_and_open("MPIEXEC_STDIN_PORT", ipaddr)) == -2) { log_err(errno, id, "cannot locate MPIEXEC_STDIN_PORT"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if ((fd0 < 0) && ((fd0 = search_env_and_open("TM_STDIN_PORT", ipaddr)) == -2)) { log_err(errno, id, "cannot locate TM_STDIN_PORT"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } /* use /dev/null if no env var found */ if ((fd0 < 0) && (fd0 = open("/dev/null", O_RDONLY)) == -1) { log_err(errno, id, "could not open dev/null"); close(0); } else { dup2(fd0, 0); if (fd0 > 0) close(fd0); } /* look through env for a port# on MS we should use for stdout/err */ if ((fd1 = search_env_and_open("MPIEXEC_STDOUT_PORT", ipaddr)) == -2) { log_err(errno, id, "cannot locate MPIEXEC_STDOUT_PORT"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if (fd1 < 0) if ((fd1 = search_env_and_open("TM_STDOUT_PORT", ipaddr)) == -2) { log_err(errno, id, "cannot locate TM_STDOUT_PORT"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if ((fd2 = 
search_env_and_open("MPIEXEC_STDERR_PORT", ipaddr)) == -2) { log_err(errno, id, "cannot locate MPIEXEC_STDERR_PORT"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if (fd2 < 0) if ((fd2 = search_env_and_open("TM_STDERR_PORT", ipaddr)) == -2) { log_err(errno, id, "cannot locate TM_STDERR_PORT"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if (LOGLEVEL >= 10) log_ext(-1, id, "MPI/TM variables set", LOG_DEBUG); #ifdef PENABLE_LINUX26_CPUSETS if (use_cpusets(pjob) == TRUE) { int j; char nodeidbuf[1024]; /* FIXME: vnodenum needs to be stored in the task struct so that we don't * have to fish it out here. Then we just pass the int to move_to_taskset * (changing the type of arg3) */ for (j = 0;j < vtable.v_used;j++) { if (!strncmp(vtable.v_envp[j], "PBS_VNODENUM=", strlen("PBS_VNODENUM="))) { strcpy(nodeidbuf, vtable.v_envp[j] + strlen("PBS_VNODENUM=")); /* FIXME: temp debugging info */ sprintf(log_buffer, "about to move to taskset for job %s/%s.\n", pjob->ji_qs.ji_jobid, nodeidbuf); log_ext(-1, id, log_buffer, LOG_DEBUG); /* Move this mom process into the cpuset so the job will start in it. */ /* Changed to move_to_jobset for OpenMPI jobs - CS - 20080526 */ /* move_to_taskset(getpid(),pjob,nodeidbuf); */ move_to_jobset(getpid(), pjob); } } } #endif /* (PENABLE_LINUX26_CPUSETS) */ if (pjob->ji_numnodes > 1) { /* ** Open sockets to demux proc for stdout and stderr. 
*/ if ((fd1 < 0) && ((fd1 = open_demux(ipaddr, pjob->ji_stdout)) == -1)) { log_err(errno, id, "cannot open mux stdout port"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } dup2(fd1, 1); if (fd1 > 1) close(fd1); if ((fd2 < 0) && ((fd2 = open_demux(ipaddr, pjob->ji_stderr)) == -1)) { log_err(errno, id, "cannot open mux stderr port"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } dup2(fd2, 2); if (fd2 > 2) close(fd2); /* never send cookie - PW mpiexec patch */ /* if (write(1,pjob->ji_wattr[(int)JOB_ATR_Cookie].at_val.at_str, strlen(pjob->ji_wattr[(int)JOB_ATR_Cookie].at_val.at_str)) == -1) {} if (write(2,pjob->ji_wattr[(int)JOB_ATR_Cookie].at_val.at_str, strlen(pjob->ji_wattr[(int)JOB_ATR_Cookie].at_val.at_str)) == -1) {} */ } else if ((pjob->ji_wattr[(int)JOB_ATR_interactive].at_flags&ATR_VFLAG_SET) && (pjob->ji_wattr[(int)JOB_ATR_interactive].at_val.at_long > 0)) { /* interactive job, single node, write to pty */ pts = -1; if ((fd1 < 0) || (fd2 < 0)) { if ((pts = open_pty(pjob)) < 0) { log_err(errno, id, "cannot open slave pty"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if (fd1 < 0) fd1 = pts; if (fd2 < 0) fd2 = pts; } dup2(fd1, 1); dup2(fd2, 2); if (fd1 != pts) close(fd1); if (fd2 != pts) close(fd2); } else { /* This code block may be dead (may never be run). This is due to the fact that * start_process() is only called by a sister who has received a IM_SPAWN_TASK, * but the below comment states that the code only runs for a "single node" job, * and all sisters are part of a multi-node job. 
*/ /* normal batch job, single node, write straight to files */ pts = -1; if ((fd1 < 0) || (fd2 < 0)) { if (open_std_out_err(pjob, -1) == -1) { log_err(errno, id, "cannot open job stderr/stdout files"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); } } if (fd1 >= 0) { close(1); dup2(fd1, 1); if (fd1 > 1) close(fd1); } if (fd2 >= 0) { close(2); dup2(fd2, 2); if (fd2 > 2) close(fd2); } } /* END else */ /******************************************************* * At this point, output fds are setup for the job, * any further error messages should be written * directly to fd 2, with a \n, and ended with fsync(2) *******************************************************/ if (LOGLEVEL >= 10) log_ext(-1, id, "about to perform set_job", LOG_DEBUG); j = set_job(pjob, &sjr); if (j < 0) { if (j != -2) { /* set_job didn't leave message in log_buffer */ strcpy(log_buffer, "PBS: unable to set task session\n"); } if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, id, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_FAIL2, &sjr); } ptask->ti_qs.ti_sid = sjr.sj_session; log_buffer[0] = '\0'; if ((i = mom_set_limits(pjob, SET_LIMIT_SET)) != PBSE_NONE) { if (log_buffer[0] != '\0') { /* report error to user via stderr file */ if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); } sprintf(log_buffer, "PBS: unable to set limits, err=%d\n", i); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); if (i == PBSE_RESCUNAV) /* resource temp unavailable */ j = JOB_EXEC_RETRY; else j = JOB_EXEC_FAIL2; log_err(errno, id, log_buffer); starter_return(kid_write, kid_read, j, &sjr); } if (LOGLEVEL >= 10) log_ext(-1, id, "set_job complete", LOG_DEBUG); if ((idir = get_job_envvar(pjob, "PBS_O_ROOTDIR")) != NULL) { if (chroot(idir) == -1) { sprintf(log_buffer, "PBS: chroot to %.256s failed: %s\n", idir, strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, id, log_buffer); 
starter_return(kid_write, kid_read, JOB_EXEC_FAIL2, &sjr); } } /* become the user and execv the shell and become the real job */ if (setgroups(pjob->ji_grpcache->gc_ngroup, (gid_t *)pjob->ji_grpcache->gc_groups) != 0) { snprintf(log_buffer,sizeof(log_buffer), "PBS: setgroups for UID = %lu failed: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno,id,log_buffer); starter_return(kid_write,kid_read,JOB_EXEC_FAIL2,&sjr); } if (setgid(pjob->ji_qs.ji_un.ji_momt.ji_exgid) != 0) { snprintf(log_buffer,sizeof(log_buffer), "PBS: setgid to %lu for UID = %lu failed: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exgid, (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno,id,log_buffer); starter_return(kid_write,kid_read,JOB_EXEC_FAIL2,&sjr); } if (setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, FALSE) < 0) { snprintf(log_buffer,sizeof(log_buffer), "PBS: setuid to %lu failed: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, id, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_FAIL2, &sjr); } #ifdef _CRAY setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, TRUE); /* cray kludge */ #endif /* CRAY */ /* cwd to PBS_O_INITDIR if specified, otherwise User's Home */ if ((idir = get_job_envvar(pjob, "PBS_O_INITDIR")) != NULL) { /* in start_process() executed as user */ if (chdir(idir) == -1) { sprintf(log_buffer, "PBS: chdir to %.256s failed: %s\n", idir, strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, id, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_FAIL2, &sjr); } } else { /* in start_process() executed as user */ if (chdir(pjob->ji_grpcache->gc_homedir) == -1) { sprintf(log_buffer, "PBS: chdir to %.256s failed: 
%s\n", pjob->ji_grpcache->gc_homedir, strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, id, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_FAIL2, &sjr); } } if (LOGLEVEL >= 10) log_ext(-1, id, "done - writing pipe and exec'ing", LOG_DEBUG); starter_return( kid_write, kid_read, JOB_EXEC_OK, &sjr); fcntl(kid_write, F_SETFD, FD_CLOEXEC); #if 0 /* def DEBUG */ for (i = 3;i < 40;++i) { /* check for any extra open descriptors */ if (close(i) >= 0) fprintf(stderr, "Closed file %d\n", i); } #endif /* DEBUG */ environ = vtable.v_envp; execvp(argv[0], argv); /* only reached if execvp() fails */ sprintf(log_buffer, "PBS: %.256s: %s\n", argv[0], strerror(errno)); if (write(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, id, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_CMDFAIL, &sjr); exit(254); /*NOTREACHED*/ return(-1); } /* END start_process() */ /* ** Free the ji_hosts and ji_vnods arrays for a job. If any events are ** attached to an array element, free them as well. */ void nodes_free( job *pj) /* I */ { void arrayfree (char **); hnodent *np; if (pj->ji_vnods != NULL) { free(pj->ji_vnods); pj->ji_vnods = NULL; } if (pj->ji_hosts != NULL) { for (np = pj->ji_hosts;np->hn_node != TM_ERROR_NODE;np++) { eventent *ep = (eventent *)GET_NEXT(np->hn_events); if (np->hn_host) free(np->hn_host); /* don't close stream incase another job uses it */ while (ep) { if (ep->ee_argv) arrayfree(ep->ee_argv); if (ep->ee_envp) arrayfree(ep->ee_envp); delete_link(&ep->ee_next); free(ep); ep = (eventent *)GET_NEXT(np->hn_events); } /* END while (ep) */ } /* END for (np) */ free(pj->ji_hosts); pj->ji_hosts = NULL; } /* END if (pj->ji_hosts != NULL) */ return; } /* END nodes_free() */ /** * Generate array hosts & vnodes for a job from the exec_host attribute. * Call nodes_free() just in case we have seen this job before. 
* Parse exec_host first to count the number of nodes and allocate * an array of nodeent's. Then, parse it again to get the hostname * of each node and init the other fields of each nodeent element. * The final element will have the ne_node field set to TM_ERROR_NODE. * * @see start_exec() - parent */ void job_nodes( job *pjob) /* I */ { char *id = "job_nodes"; int i, j, nhosts, nodenum; int ix; char *cp, *nodestr; hnodent *hp; vnodent *np; extern char mom_host[]; nodes_free(pjob); nodenum = 1; if (pjob->ji_wattr[(int)JOB_ATR_exec_host].at_flags & ATR_VFLAG_SET) { nodestr = pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str; if (nodestr != NULL) { for (cp = nodestr;*cp;cp++) { if (*cp == '+') nodenum++; } } } else { nodestr = mom_host; } pjob->ji_hosts = (hnodent *)calloc(nodenum + 1, sizeof(hnodent)); pjob->ji_vnods = (vnodent *)calloc(nodenum + 1, sizeof(vnodent)); assert(pjob->ji_hosts); assert(pjob->ji_vnods); pjob->ji_numvnod = nodenum; nhosts = 0; np = pjob->ji_vnods; for (i = 0;i < nodenum;i++, np++) { char *dp, nodename[MAXPATHLEN + 1]; ix = 0; for (cp = nodestr, dp = nodename;*cp;cp++, dp++) { if (*cp == '/') { ix = atoi(cp + 1); while ((*cp != '\0') && (*cp != '+')) ++cp; if (*cp == '\0') { nodestr = cp; break; } } if (*cp == '+') { nodestr = cp + 1; break; } *dp = *cp; } *dp = '\0'; /* see if we already have this host */ for (j = 0;j < nhosts;++j) { if (strcmp(nodename, pjob->ji_hosts[j].hn_host) == 0) break; } hp = &pjob->ji_hosts[j]; if (j == nhosts) { /* need to add host to tn_host */ hp->hn_node = nhosts++; hp->hn_stream = -1; hp->hn_sister = SISTER_OKAY; hp->hn_host = strdup(nodename); CLEAR_HEAD(hp->hn_events); } np->vn_node = i; /* make up node id */ np->vn_host = &pjob->ji_hosts[j]; np->vn_index = ix; if (LOGLEVEL >= 4) { sprintf(log_buffer, "%d: %s/%d", np->vn_node, np->vn_host->hn_host, np->vn_index); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, id, log_buffer); } } /* END for (i) */ np->vn_node = TM_ERROR_NODE; 
pjob->ji_hosts[nhosts].hn_node = TM_ERROR_NODE; pjob->ji_numnodes = nhosts; pjob->ji_numvnod = nodenum; if (LOGLEVEL >= 2) { sprintf(log_buffer, "job: %s numnodes=%d numvnod=%d", pjob->ji_qs.ji_jobid, nhosts, nodenum); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, id, log_buffer); } return; } /* END job_nodes() */ /** * Start execution of a job. * * @see req_commit() - parent - handle 'commit' request from pbs_server * * @see TMomFinalizeJob1() - child * @see TMomFinalizeJob2() - child * @see TMomFinalizeJob3() - child * * @see start_process() - peer - called for sisters after receiving IM_SPAWN_TASK request * * NOTE: prior to calling start_exec, job is newly allocated, and added to * svr_alljobs w/pjob->ji_qs.ji_state = JOB_STATE_RUNNING and * pjob->ji_qs.ji_substate = JOB_SUBSTATE_PRERUN. * * NOTE: For parallel jobs, this routine will open connections to each sister * mom and will send a join_job request along each connection. */ void start_exec( job *pjob) /* I (modified) */ { static char *id = "start_exec"; eventent *ep; int i, nodenum; int ports[2], socks[2]; struct sockaddr_in saddr; hnodent *np; attribute *pattr; tlist_head phead; svrattrl *psatl; int stream; char tmpdir[MAXPATHLEN]; torque_socklen_t slen; void im_compose ( int stream, char *jobid, char *cookie, int command, tm_event_t event, tm_task_id taskid); /* Step 1.0 Generate Cookie */ if (!(pjob->ji_wattr[(int)JOB_ATR_Cookie].at_flags & ATR_VFLAG_SET)) { char *tt; extern time_t loopcnt; MD5_CTX c; int i; /* alloc 33 bytes? 
*/ tt = malloc(33); if (tt == NULL) { log_err(-1,id,"cannot alloc memory"); exec_bail(pjob,JOB_EXEC_FAIL1); return; } pjob->ji_wattr[(int)JOB_ATR_Cookie].at_val.at_str = tt; pjob->ji_wattr[(int)JOB_ATR_Cookie].at_flags |= ATR_VFLAG_SET; loopcnt++; MD5Init(&c); MD5Update(&c, (unsigned char *)&loopcnt, sizeof(loopcnt)); MD5Update(&c, (unsigned char *)pjob, sizeof(job)); MD5Final(&c); for (i = 0;i < 16;i++) { sprintf(&tt[i * 2], "%02X", c.digest[i]); } DBPRT(("===== MD5 %s\n", tt)) } /* END if () */ /* Step 2.0 Initialize Job */ /* update nodes info w/in job based on exec_hosts attribute */ job_nodes(pjob); /* start_exec only executed on mother superior */ pjob->ji_nodeid = 0; /* I'm MS */ nodenum = pjob->ji_numnodes; /* Step 3.0 Validate/Initialize Environment */ /* check creds early because we need the uid/gid for TMakeTmpDir() */ if (!check_pwd(pjob)) { log_err(-1, id, log_buffer); exec_bail(pjob, JOB_EXEC_FAIL1); return; } /* should we make a tmpdir? */ if (TTmpDirName(pjob, tmpdir)) { if (!TMakeTmpDir(pjob, tmpdir)) { snprintf(log_buffer, 1024, "cannot create temp dir '%s'", tmpdir); log_err(-1, id, log_buffer); exec_bail(pjob, JOB_EXEC_FAIL1); return; } } /* if nodecount > 1, return once joins are sent, if nodecount == 1, return once job is started */ if (nodenum > 1) { /* Step 4.0A Send Join Request to Sisters */ /* parallel job */ pjob->ji_resources = (noderes *)calloc(nodenum - 1, sizeof(noderes)); assert(pjob->ji_resources != NULL); CLEAR_HEAD(phead); pattr = pjob->ji_wattr; for (i = 0;i < (int)JOB_ATR_LAST;i++) { (job_attr_def + i)->at_encode( pattr + i, &phead, (job_attr_def + i)->at_name, NULL, ATR_ENCODE_MOM); } /* END for (i) */ attrl_fixlink(&phead); /* Open streams to the sisterhood. 
*/ for (i = 1;i < nodenum;i++) { np = &pjob->ji_hosts[i]; log_buffer[0] = '\0'; /* rpp_open() will succeed even if MOM is down */ np->hn_stream = rpp_open(np->hn_host, pbs_rm_port, log_buffer); if (np->hn_stream < 0) { pjob->ji_nodekill = i; if (log_buffer[0] != '\0') { sprintf(log_buffer, "rpp_open failed on %s", np->hn_host); } log_err(errno, id, log_buffer); exec_bail(pjob, JOB_EXEC_FAIL1); return; } } /* END for (i) */ /* Open two sockets for use by demux program later. */ for (i = 0;i < 2;i++) socks[i] = -1; for (i = 0;i < 2;i++) { if ((socks[i] = socket(AF_INET, SOCK_STREAM, 0)) == -1) break; memset(&saddr, '\0', sizeof(saddr)); saddr.sin_addr.s_addr = INADDR_ANY; saddr.sin_family = AF_INET; if (bind( socks[i], (struct sockaddr *)&saddr, sizeof(saddr)) == -1) { break; } slen = sizeof(saddr); if (getsockname(socks[i], (struct sockaddr *)&saddr, &slen) == -1) break; ports[i] = (int)ntohs(saddr.sin_port); } /* END for (i) */ if (i < 2) { /* ERROR: cannot open sockets for stdout and stderr */ for (i = 0;i < 2;i++) { if (socks[i] != -1) close(socks[i]); } /* command sisters to abort job and continue */ log_err(errno, id, "stdout/err socket"); exec_bail(pjob, JOB_EXEC_FAIL1); return; } pjob->ji_stdout = socks[0]; pjob->ji_stderr = socks[1]; /* Send out a JOIN_JOB message to all the MOM's in the sisterhood. 
*/ /* NOTE: does not check success of join request */ for (i = 1;i < nodenum;i++) { np = &pjob->ji_hosts[i]; stream = np->hn_stream; ep = event_alloc(IM_JOIN_JOB, np, TM_NULL_EVENT, TM_NULL_TASK); /* im_compose() will succeed even if mom is down */ im_compose( stream, pjob->ji_qs.ji_jobid, pjob->ji_wattr[(int)JOB_ATR_Cookie].at_val.at_str, IM_JOIN_JOB, ep->ee_event, TM_NULL_TASK); diswsi(stream, i); /* nodeid of receiver */ diswsi(stream, nodenum); /* number of nodes */ diswsi(stream, ports[0]); /* out port number */ diswsi(stream, ports[1]); /* err port number */ /* write jobattrs */ psatl = (svrattrl *)GET_NEXT(phead); encode_DIS_svrattrl(stream, psatl); /* NOTE: rpp_flush() will succeed even if MOM is down */ if (rpp_flush(stream) != 0) { sprintf(log_buffer, "ALERT: unable to send join_job message to %s", np->hn_host); log_err(errno, id, log_buffer); } } /* END for (i) */ free_attrlist(&phead); } /* END if (nodenum > 1) */ else { /* Step 4.0B Launch Serial Task Locally */ /* serial job */ int SC; int RC; int Count; pjobexec_t *TJE; /* single node job - no sisters */ ports[0] = -1; ports[1] = -1; pjob->ji_stdout = -1; pjob->ji_stderr = -1; if (TMOMJobGetStartInfo(NULL, &TJE) == FAILURE) { sprintf(log_buffer, "ALERT: cannot locate available job slot"); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); /* on failure, TJE is NULL */ exec_bail(pjob, JOB_EXEC_RETRY); return; } if (TMomFinalizeJob1(pjob, TJE, &SC) == FAILURE) { /* FAILURE (or at least do not continue) */ if (SC != 0) { memset(TJE, 0, sizeof(pjobexec_t)); exec_bail(pjob, SC); } return; } /* TMomFinalizeJob2() blocks until job is fully launched */ if (TMomFinalizeJob2(TJE, &SC) == FAILURE) { if (SC != 0) { memset(TJE, 0, sizeof(pjobexec_t)); exec_bail(pjob, SC); } return; } if (TMomCheckJobChild(TJE, TJobStartBlockTime, &Count, &RC) == FAILURE) { if (LOGLEVEL >= 3) { sprintf(log_buffer, "job not ready after %ld second timeout, MOM will recheck", TJobStartBlockTime); 
log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } return; } /* NOTE: TMomFinalizeJob3() populates SC */ if (TMomFinalizeJob3(TJE, Count, RC, &SC) == FAILURE) { /* no need to log an error, TMomFinalizeJob3 already does it */ memset(TJE, 0, sizeof(pjobexec_t)); exec_bail(pjob, SC); return; } /* SUCCESS: MOM returns */ if (LOGLEVEL >= 3) { sprintf(log_buffer, "job successfully started"); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } /* clear old TJE */ memset(TJE, 0, sizeof(pjobexec_t)); } /* END else (nodenum > 1) */ /* SUCCESS */ if (LOGLEVEL >= 3) { sprintf(log_buffer, "job %s reported successful start on %d node(s)", pjob->ji_qs.ji_jobid, nodenum); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } return; } /* END start_exec() */ /* * fork_me - fork mom, close all other connections and set default signal actions */ pid_t fork_me( int conn) /* I */ { struct sigaction act; pid_t pid; fflush(stdout); fflush(stderr); pid = fork(); if (pid == 0) { /* now the child */ /* Turn off alarm if it should happen to be on */ alarm(0); rpp_terminate(); /* Reset signal actions for most to SIG_DFL */ sigemptyset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = SIG_DFL; sigaction(SIGCHLD, &act, (struct sigaction *)0); #ifdef _CRAY sigaction(WJSIGNAL, &act, (struct sigaction *)0); #endif /* _CRAY */ sigaction(SIGHUP, &act, (struct sigaction *)0); sigaction(SIGINT, &act, (struct sigaction *)0); sigaction(SIGTERM, &act, (struct sigaction *)0); /* reset signal mask */ sigprocmask(SIG_SETMASK, &act.sa_mask, NULL); mom_close_poll(); /* NOTE: close logfile, lockfile, and connection to server (NYI) */ if (lockfds >= 0) { close(lockfds); lockfds = -1; } log_close(0); net_close(conn); /* close all but for the current */ /* release mlock; it seems to be inherited even though the * man page claims otherwise */ #ifdef _POSIX_MEMLOCK munlockall(); #endif /* _POSIX_MEMLOCK */ } else if (pid 
< 0) { log_err(errno, "fork_me", "fork failed"); } return(pid); } /* END fork_me() */ /* * starter_return - return starter value, * exit if negative */ static void starter_return( int upfds, /* I */ int downfds, /* I */ int code, /* I */ struct startjob_rtn *sjrtn) /* I */ { struct startjob_rtn ack; int i; int alarmsecs = 0; sjrtn->sj_code = code; if (write(upfds, (char *)sjrtn, sizeof(*sjrtn)) == -1) { } if (code < 0) close(upfds); /* * Wait for acknowledgement. Need to allow for a timeout. If it takes a while * to start the job which includes running prologues then the mom we are * communicating with may have timed out and gone into a recheck mode using * TMOMScanForStarting(). This works fine until someone qdels the job before * this mom can reply. In this case TMOMScanForStarting() won't see this job * anymore and will not read what we just wrote and we will hang forever on * the read. If we timed out we should probably exit instead of continuing on * and trying to start the job which was deleted. 
*/ do { alarm(120); i = read(downfds, &ack, sizeof(ack)); alarmsecs = alarm(0); if ((i == -1) && (errno != EINTR)) { break; } /* check for case where job has been qdel'd */ if ((i == -1) && (errno == EINTR) && (alarmsecs == 0) && (code == 0)) { close(downfds); exit(0); } } while (i < 0); close(downfds); if (code < 0) { exit(254); } return; } /* END starter_return() */ /* * remove_leading_hostname * * removes the leading hostname from the output path in jobpath * the path is stored as : * this retrieves just the path * * @param jobpath - the altered path, I/O * @return SUCCESS if hostname is removed, FALSE otherwise */ int remove_leading_hostname( char **jobpath) /* I / O */ { char *ptr; if ((jobpath == NULL) || (*jobpath == NULL)) { return(FAILURE); } ptr = strchr(*jobpath,':'); if (ptr == NULL) { return(FAILURE); } /* SUCCESS, move past the ':' and return the rest */ ptr++; *jobpath = ptr; return(SUCCESS); } /* * std_file_name - generate the fully qualified path/name for a * job standard stream * * called by TMomFinalizeJob2 fork_me TMomFinalizeChild open_std_out_err open_std_file std_file_name */ char *std_file_name( job *pjob, /* I */ enum job_file which, /* I */ int *keeping) /* O (0 is no keep, 1 is keep) */ { static char path[MAXPATHLEN + 1]; char key; int len; char *pd; char *suffix; char *jobpath = NULL; char *id = "std_file_name"; #ifdef QSUB_KEEP_NO_OVERRIDE char *pt; char endpath[MAXPATHLEN + 1]; #endif #if NO_SPOOL_OUTPUT == 0 int havehomespool = 0; extern char *TNoSpoolDirList[]; #else /* NO_SPOOL_OUTPUT */ struct stat myspooldir; static char path_alt[MAXPATHLEN + 1]; int rcstat; #endif /* NO_SPOOL_OUTPUT */ if (LOGLEVEL >= 5) { sprintf(log_buffer, "getting %s file name", (which == StdOut) ? 
"stdout" : "stderr"); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } if (keeping == NULL) { /* FAILURE */ return(NULL); } if ((pjob->ji_wattr[(int)JOB_ATR_interactive].at_flags & ATR_VFLAG_SET) && (pjob->ji_wattr[(int)JOB_ATR_interactive].at_val.at_long > 0)) { /* interactive job, name of pty is in outpath */ *keeping = 0; return(pjob->ji_wattr[(int)JOB_ATR_outpath].at_val.at_str); } if (pjob->ji_grpcache == NULL) { /* FAILURE - ji_grpcache required for gc_homedir information */ return(NULL); } switch (which) { case StdOut: key = 'o'; suffix = JOB_STDOUT_SUFFIX; if (pjob->ji_wattr[(int)JOB_ATR_outpath].at_flags & ATR_VFLAG_SET) { jobpath = pjob->ji_wattr[(int)JOB_ATR_outpath].at_val.at_str; if (spoolasfinalname == TRUE) { remove_leading_hostname(&jobpath); if (expand_path(pjob,jobpath,sizeof(path),path) != SUCCESS) { return(NULL); } } } break; case StdErr: key = 'e'; suffix = JOB_STDERR_SUFFIX; if (pjob->ji_wattr[(int)JOB_ATR_errpath].at_flags & ATR_VFLAG_SET) { jobpath = pjob->ji_wattr[(int)JOB_ATR_errpath].at_val.at_str; if (spoolasfinalname == TRUE) { remove_leading_hostname(&jobpath); if (expand_path(pjob, jobpath, sizeof(path), path) != SUCCESS) { return(NULL); } } } break; case Checkpoint: default: key = '\001'; /* should never be found */ suffix = JOB_CHECKPOINT_SUFFIX; break; } /* END switch (which) */ /* everything that changes the path here is ignored if spoolasfinalname is set * to true. spoolasfinalname specifies directly spooling as the output file */ /* Is file to be kept?, if so, place the stderr/stdout files in the * path specified by the user. The path must be local to the execution node, * if a hostname is supplied, it will be stripped off. The only suppported * environment variable is $HOME--any other will cause files to be placed in * the home directory (the default location). 
*/ if ((pjob->ji_wattr[(int)JOB_ATR_keep].at_flags & ATR_VFLAG_SET) && (strchr(pjob->ji_wattr[(int)JOB_ATR_keep].at_val.at_str, key))) { /* yes, it is to be kept */ if (spoolasfinalname == FALSE) { strcpy(path, pjob->ji_grpcache->gc_homedir); pd = strrchr(pjob->ji_wattr[(int)JOB_ATR_jobname].at_val.at_str, '/'); if (pd == NULL) { pd = pjob->ji_wattr[(int)JOB_ATR_jobname].at_val.at_str; strcat(path, "/"); } #ifdef QSUB_KEEP_NO_OVERRIDE /* don't do for checkpoint file names, only StdErr and StdOut */ if (strcmp(suffix, JOB_CHECKPOINT_SUFFIX) != 0) { pt = strstr(jobpath, "$HOME"); if (pt != NULL) { strcpy(endpath, pt + 5); strcpy(pt, pjob->ji_grpcache->gc_homedir); strcat(jobpath, endpath); } if ((strstr(jobpath, pd) == NULL) && (strchr(jobpath, '$') == NULL)) { if (jobpath[strlen(jobpath) - 1] != '/') { strcat(jobpath, "/"); } pt = strchr(jobpath, ':'); if (pt == NULL) { strcpy(path, jobpath); } else { strcpy(path, pt + 1); } } } #endif strcat(path, pd); /* start with the job name */ len = strlen(path); *(path + len++) = '.'; /* the dot */ *(path + len++) = key; /* the letter */ pd = pjob->ji_qs.ji_jobid; /* the seq_number */ while (isdigit((int)*pd)) *(path + len++) = *pd++; *(path + len) = '\0'; } /* END if (spoolasfinalname == FALSE) */ *keeping = 1; } else { /* don't bother keeping output if the user actually wants to discard it */ if ((jobpath != NULL) && (*jobpath != '\0')) { char *ptr; if ((ptr = strchr(jobpath, ':')) != NULL) { jobpath = ptr + 1; } if (!strcmp(jobpath, "/dev/null")) { strcpy(path, "/dev/null"); *keeping = 1; return(path); } } /* put into spool directory unless NO_SPOOL_OUTPUT is defined */ #if NO_SPOOL_OUTPUT == 1 if (spoolasfinalname == FALSE) { /* force all output to user's HOME */ strncpy(path, pjob->ji_grpcache->gc_homedir, sizeof(path)); /* check for $HOME/.pbs_spool */ /* if it's not a directory, just use $HOME us usual */ strncpy(path_alt, path, sizeof(path_alt)); strncat(path_alt, "/.pbs_spool/", sizeof(path_alt)); if 
(setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, TRUE) == -1) { return(NULL); } rcstat = stat(path_alt, &myspooldir); setuid_ext(pbsuser, TRUE); if ((rcstat == 0) && (S_ISDIR(myspooldir.st_mode))) strncpy(path, path_alt, sizeof(path)); else strncat(path, "/", sizeof(path)); *keeping = 1; } /* END if (spoolasfinalname == FALSE) */ #else /* NO_SPOOL_OUTPUT */ if (spoolasfinalname == FALSE) { if ((TNoSpoolDirList[0] != NULL)) { int dindex; char *wdir; wdir = get_job_envvar(pjob, "PBS_O_WORKDIR"); if (LOGLEVEL >= 10) { sprintf(log_buffer, "wdir: %s", wdir); log_ext(-1, id, log_buffer, LOG_DEBUG); } if (wdir != NULL) { /* check if job's work dir matches the no-spool directory list */ if (LOGLEVEL >= 10) log_ext(-1, id, "inside wdir != NULL", LOG_DEBUG); for (dindex = 0;dindex < TMAX_NSDCOUNT;dindex++) { if (TNoSpoolDirList[dindex] == NULL) break; if (!strcasecmp(TNoSpoolDirList[dindex], "$WORKDIR") || !strcmp(TNoSpoolDirList[dindex], "*")) { havehomespool = 1; if (LOGLEVEL >= 10) log_ext(-1, id, "inside !strcasecmp", LOG_DEBUG); strncpy(path, wdir, sizeof(path)); break; } if (!strncmp(TNoSpoolDirList[dindex], wdir, strlen(TNoSpoolDirList[dindex]))) { havehomespool = 1; if (LOGLEVEL >= 10) log_ext(-1, id, "inside !strncmp", LOG_DEBUG); strncpy(path, wdir, sizeof(path)); break; } } /* END for (dindex) */ } /* END if (wdir != NULL) */ } /* END if (TNoSpoolDirList != NULL) */ if (havehomespool == 0) { strncpy(path, path_spool, sizeof(path)); } else { strncat(path, "/", sizeof(path)); } } /* END if (spoolasfinalname == FALSE) */ *keeping = 0; #endif /* NO_SPOOL_OUTPUT */ if (spoolasfinalname == FALSE) { strncat(path, pjob->ji_qs.ji_fileprefix, (sizeof(path) - strlen(path) - 1)); strncat(path, suffix, (sizeof(path) - strlen(path) - 1)); if (LOGLEVEL >= 10) { sprintf(log_buffer, "path: '%s' prefix: '%s' suffix: '%s'", path, pjob->ji_qs.ji_fileprefix, suffix); log_ext(-1, id, log_buffer, LOG_DEBUG); } } /* END if (spoolasfinalname == FALSE) */ } /* END else 
((pjob->ji_wattr[(int)JOB_ATR_keep].at_flags & ...)) */

  return(path);
  }  /* END std_file_name() */


/**
 * open_std_file - open/create either standard output or standard error
 * for the job.
 *
 * The file name comes from std_file_name().  When running as root with an
 * effective uid different from the job's execution uid, the supplementary
 * groups, effective gid and effective uid are switched to the job's before
 * the file is touched, and switched back to pbsuser/pbsgroup afterwards.
 *
 * NOTE:  called by pbs_mom child - cannot log to mom log file - use
 *        log_err to report to syslog
 *
 * @param pjob  job whose stream is being opened
 * @param which which file
 * @param mode  flags handed to open(); O_EXCL (and O_CREAT for an existing,
 *              acceptable spool file) may be cleared here
 * @param exgid gid the existing file is compared against
 *
 * RETURN:  -1 on failure, -2 on timeout, or file descriptor on success
 */

int open_std_file(

  job           *pjob,   /* I */
  enum job_file  which,  /* which file */
  int            mode,   /* file mode */
  gid_t          exgid)  /* gid for file */

  {
  int   fds;
  int   keeping;  /* boolean:  1=TRUE, 0=FALSE */
  char *path;
  int   old_umask = 0;
  char *id = "open_std_file";
  int   changed_to_user = FALSE;  /* set once ids were switched and must be restored */
  int   rc;

  struct stat statbuf;

  if ((path = std_file_name(pjob, which, &keeping)) == NULL)
    {
    log_err(-1, id, "cannot determine filename");
    /* FAILURE - cannot determine filename */

    return(-1);
    }

  /* become user to create file, if we aren't already the user.  In
   * run_pelog setuid etc. are called and then this function is invoked,
   * so doing this again fails and is unnecessary */

  if (LOGLEVEL > 7)
    {
    sprintf(log_buffer, "job %s which = %d getuid() = %d geteuid = %d to euid = %d",
      pjob->ji_qs.ji_jobid, which, getuid(), geteuid(), pjob->ji_qs.ji_un.ji_momt.ji_exuid);

    log_ext(-1, id, log_buffer, LOG_DEBUG);
    }

#ifdef __CYGWIN__
  if (IamRoot() == 1)
#else
  if ((getuid() == 0) && (geteuid() != pjob->ji_qs.ji_un.ji_momt.ji_exuid))
#endif
    {
    /* a setgroups() failure is logged but does not abort the open */

    if (setgroups(pjob->ji_grpcache->gc_ngroup,(gid_t *)pjob->ji_grpcache->gc_groups) != 0)
      {
      snprintf(log_buffer,sizeof(log_buffer),
        "setgroups failed for UID = %lu, error: %s\n",
        (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid,
        strerror(errno));

      log_err(errno,id,log_buffer);
      }

    /* the gid is changed before the uid; failure of either is fatal */

    if (setegid(pjob->ji_qs.ji_un.ji_momt.ji_exgid) != 0)
      {
      snprintf(log_buffer,sizeof(log_buffer),
        "setegid(%lu) failed for UID = %lu, error: %s\n",
        (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exgid,
        (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid,
        strerror(errno));

      log_err(errno,id,log_buffer);

      return(-1);
      }

    if (setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, TRUE) != 0)
      {
      snprintf(log_buffer,sizeof(log_buffer),
        "seteuid(%lu) failed, error: %s\n",
        (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid,
        strerror(errno));

      log_err(errno,id,log_buffer);

      /* undo the gid change made just above */

      setegid(pbsgroup);

      return(-1);
      }

    changed_to_user = TRUE;
    }

  /* these checks are a bit complicated.  If keeping, we do what the user
   * says.  Otherwise, make sure we aren't following a symlink and that the
   * user owns the file without breaking /dev/null. */

  if (keeping)
    {
    mode &= ~O_EXCL;
    }
  else
    {
    if (lstat(path, &statbuf) == 0)
      {
      /* lstat succeeded */

      if (S_ISLNK(statbuf.st_mode))
        {
        log_err(-1, id, "std file is symlink, someone is doing something fishy");

        goto reset_ids_fail;
        }

      /* ownership checks apply to regular files only */

      if (S_ISREG(statbuf.st_mode))
        {
        if (statbuf.st_uid != pjob->ji_qs.ji_un.ji_momt.ji_exuid)
          {
          log_err(-1, id, "std file exists with the wrong owner, someone is doing something fishy");

          goto reset_ids_fail;
          }

        /* group 0 and the job's gid are accepted without further checks */

        if ((statbuf.st_gid != exgid) && (statbuf.st_gid != 0))
          {
          int i;
          int equal = FALSE;

          /* check all of the secondary groups before throwing an error */

          for (i = 0; i < pjob->ji_grpcache->gc_ngroup; i++)
            {
            if (statbuf.st_gid == (gid_t)pjob->ji_grpcache->gc_groups[i])
              equal = TRUE;
            }

          if (equal == FALSE)
            {
#ifdef RESETGROUP
            /* Change the group of the file to be the user's group */

            if (chown(path, pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1)
              {
              log_err(errno, id, "std file exists with the wrong group, someone is doing something fishy, cannot change file group");

              goto reset_ids_fail;
              }

#else
            log_err(-1, id, "std file exists with the wrong group, someone is doing something fishy");

            goto reset_ids_fail;

#endif
            }
          }
        }

      /* seems reasonably safe to append to the existing file */

      /* file exists - do not need to create or open exclusive */

      mode &= ~(O_EXCL | O_CREAT);
      }  /* END if (lstat(path,&statbuf) == 0) */
    else
      {
      /* lstat failed - should we return failure in all cases?
*/ if (errno == EINTR) { sprintf(log_buffer, "cannot stat stdout/stderr file '%s' (timeout)", path); if (LOGLEVEL >= 6) log_err(errno, id, log_buffer); /* fail on timeout */ goto reset_ids_timeout; } else { sprintf(log_buffer, "cannot stat stdout/stderr file '%s' - file does not exist, will create", path); if (LOGLEVEL >= 6) log_ext(errno, id, log_buffer, LOG_DEBUG); } } } /* END else (keeping) */ if (pjob->ji_wattr[JOB_ATR_umask].at_flags & ATR_VFLAG_SET) { old_umask = umask(pjob->ji_wattr[JOB_ATR_umask].at_val.at_long); } /* open file */ fds = open(path, mode, 0666); if (fds == -1) { /* errno can change in functions called between here and the if check below */ int local_errno = errno; sprintf(log_buffer, "cannot open/create stdout/stderr file '%s' (mode: %o, keeping: %s)", path, mode, (keeping == 0) ? "FALSE" : "TRUE"); log_err(local_errno, id, log_buffer); if (local_errno == ENOENT) { char *ptr; char tmpLine[1024]; int still_failed = TRUE; /* attempt to make the directory if it doesn't exist */ if (attempttomakedir == TRUE) { /* don't make the file a directory - make the last slash empty */ char *slash = path; while (strchr(slash + 1, '/') != NULL) slash = strchr(slash + 1, '/'); if (slash != NULL) *slash = '\0'; mkdirtree(path,0755); /* undo the marking of the end of path as NULL */ *slash = '/'; if ((fds = open(path, mode, 0666)) != -1) { /* SUCCESS */ still_failed = FALSE; } } if (still_failed == TRUE) { /* parent directory does not exist - find out what part of subtree exists */ strncpy(tmpLine, path, sizeof(tmpLine)); while ((ptr = strrchr(tmpLine, '/')) != NULL) { *ptr = '\0'; if (lstat(tmpLine, &statbuf) == 0) { /* lstat succeeded */ sprintf(log_buffer, "'%s' exists\n", tmpLine); break; } /* END if (lstat(tmpLine,&statbuf) == 0) */ else { /* lstat failed - should we return failure in all cases? 
*/ if (errno == EINTR) sprintf(log_buffer, "cannot stat stdout/stderr file '%s' (timeout)\n", tmpLine); else sprintf(log_buffer, "cannot stat stdout/stderr file '%s' - file does not exist\n", tmpLine); } } /* END while ((ptr = strrchr(tmpLine,'/')) != NULL) */ } /* END if still_failed == TRUE */ } /* END if (errno == ENOENT) */ } /* END if (fds == -1) */ if (old_umask) { umask(old_umask); } if (changed_to_user) { rc = setuid_ext(pbsuser, TRUE); if (rc != 0) { snprintf(log_buffer,sizeof(log_buffer), "seteuid(%lu) failed, error: %s\n", (unsigned long)pbsuser, strerror(errno)); log_err(errno,id,log_buffer); } setegid(pbsgroup); } if (fds == -1) { /* FAILURE - cannot open file */ if (errno == EINTR) { /* TIMEOUT */ return(-2); } } if (LOGLEVEL >= 4) { if (fds >= 0) { sprintf(log_buffer, "successfully created/opened stdout/stderr file '%s'", path); log_ext(-1, id, log_buffer, LOG_DEBUG); } } return(fds); reset_ids_fail: if (changed_to_user) { setuid_ext(pbsuser, TRUE); setegid(pbsgroup); } return(-1); reset_ids_timeout: if (changed_to_user) { setuid_ext(pbsuser, TRUE); setegid(pbsgroup); } return(-2); } /* END open_std_file() */ /* * find_env_slot - find if the environment variable is already in the table, * If so, replease existing one with new one. */ static int find_env_slot( struct var_table *ptbl, char *pstr) { int i; int len = 1; /* one extra for '=' */ for (i = 0;(*(pstr + i) != '\0') && (*(pstr + i) != '=');++i) ++len; for (i = 0;i < ptbl->v_used;++i) { if (strncmp(ptbl->v_envp[i], pstr, len) == 0) { return(i); } } /* END for (i) */ return(-1); } /* END find_env_slot() */ /* * bld_env_variables - build up the array of environment variables which are * passed to the job. * * Value may be null if total string (name=value) is included in "name". 
*/

/*
 * Returns PBSE_NONE on success.  Returns -1 when name is NULL/empty, when
 * name starts with "BATCH_PARTITION_ID" (deliberately not passed on), or
 * when the string still does not fit after the table was expanded; returns
 * the expand_vtable() error code when expansion itself fails.
 *
 * The string is written at vtable->v_block, which is then advanced past it;
 * an existing entry with the same name is replaced in v_envp, otherwise a
 * new pointer is appended.
 */

int bld_env_variables(

  struct var_table *vtable, /* I (modified) */
  char             *name,   /* I (required) */
  char             *value)  /* I (optional) */

  {
  int amt;
  int i;
  int rc = PBSE_NONE;
  char *id = "bld_env_variables";

  if (vtable->v_used == vtable->v_ensize)
    {
    /* no room for pointer */
    if ((rc = expand_vtable(vtable)) == PBSE_NONE)
      {
      snprintf(log_buffer, sizeof(log_buffer),
        "Successfully expanded environment variables table");

      log_ext(-1, id, log_buffer, LOG_INFO);
      }
    else
      {
      snprintf(log_buffer, sizeof(log_buffer),
        "Error in expanding environment variables table of pointers; err: %d",
        rc);

      log_err(-1, id, log_buffer);

      return rc;
      }
    }

  if ((name == NULL) || (name[0] == '\0'))
    {
    /* FAILURE - name required */

    if (LOGLEVEL >= 7)
      {
      log_err(-1, "bld_env_variables", "invalid name passed");
      }

    return(-1);
    }

  if (LOGLEVEL >= 6)
    {
    char tmpLine[1024];

    snprintf(tmpLine, sizeof(tmpLine), "building var '%s' (value: '%.64s')",
      name,
      (value != NULL) ? value : "NULL");

    log_ext(-1, "bld_env_variables", tmpLine, LOG_DEBUG);
    }

  /*
   * We do not want the BATCH_PARTITION_ID to be passed down to the child.
   * It just needs to be checked for the job submitted by the user, not for
   * any jobs that the job might qsub.
   */

  /* strncmp stops at the end of a short name; memcmp would read past it */

  if (strncmp(name, "BATCH_PARTITION_ID", strlen("BATCH_PARTITION_ID")) == 0)
    {
    return -1;
    }

  amt = strlen(name) + 1;

  if (value != NULL)
    amt += strlen(value) + 1; /* plus 1 for "=" */

  if (amt > vtable->v_bsize)
    {
    /* no room for string */
    if ((rc = expand_vtable(vtable)) == PBSE_NONE)
      {
      snprintf(log_buffer, sizeof(log_buffer),
        "Successfully expanded environment variables table");

      log_ext(-1, id, log_buffer, LOG_INFO);
      }
    else
      {
      snprintf(log_buffer, sizeof(log_buffer),
        "Error in expanding environment variables table; err: %d",
        rc);

      log_err(-1, id, log_buffer);

      return rc;
      }

    /* expand_vtable() grows the block by a fixed amount and only when its
     * own threshold is crossed, so the string may still not fit; refuse it
     * rather than write past the end of the block */

    if (amt > vtable->v_bsize)
      {
      snprintf(log_buffer, sizeof(log_buffer),
        "environment variable '%.64s' needs %d bytes, only %d available after expansion",
        name,
        amt,
        vtable->v_bsize);

      log_err(-1, id, log_buffer);

      return(-1);
      }
    }

  strcpy(vtable->v_block, name);

  if (value != NULL)
    {
    strcat(vtable->v_block, "=");
    strcat(vtable->v_block, value);
    }

  if ((i = find_env_slot(vtable, vtable->v_block)) < 0)
    {
    *(vtable->v_envp + vtable->v_used++) = vtable->v_block;
    }
  else
    {
    *(vtable->v_envp + i) = vtable->v_block;
    }

  vtable->v_block += amt;

  vtable->v_bsize -= amt;

  return rc;
  }  /* END bld_env_variables() */


/* expand_vtable is called when either the array of character pointers
   in vtable was filled or the block of memory used to store the env. variables was full.
While in this function, it checks to see if either one of the other does require the reallocation by checking its threshold */ int expand_vtable( struct var_table *vtable) { int expand_ensize = 0; /* boolean to check on array of pointers */ int expand_bsize = 0; /* boolean to check on the block of memory storage */ int amt = 0; struct var_table tmp_vtable; int rc = PBSE_NONE; char *id = "expand_vtable"; if (vtable->v_ensize - vtable->v_used < EN_THRESHOLD) expand_ensize = 1; if (vtable->v_bsize < B_THRESHOLD) expand_bsize = 1; memset(&tmp_vtable, 0, sizeof(struct var_table)); if (expand_ensize) tmp_vtable.v_ensize = vtable->v_ensize + EN_THRESHOLD; else tmp_vtable.v_ensize = vtable->v_ensize; /* tmp holder for data copying */ tmp_vtable.v_envp = calloc(tmp_vtable.v_ensize, sizeof(char *)); if (!tmp_vtable.v_envp) { sprintf(log_buffer, "PBS: failed to allocate memory for v_envp: %s\n", strerror(errno)); log_err(errno, id, log_buffer); return -1; } tmp_vtable.v_used = vtable->v_used; if (expand_bsize) { amt = EXTRA_VARIABLE_SPACE + (vtable->v_block - vtable->v_block_start) + vtable->v_bsize; tmp_vtable.v_block_start = calloc(1, amt); tmp_vtable.v_block = tmp_vtable.v_block_start; tmp_vtable.v_bsize = amt; if (!tmp_vtable.v_block_start) { sprintf(log_buffer, "PBS: failed to allocate memory for v_bsize: %s\n", strerror(errno)); log_err(errno, id, log_buffer); return -1; } } if ((rc = copy_data(&tmp_vtable, vtable, expand_bsize, expand_ensize) != PBSE_NONE)) { if (tmp_vtable.v_block_start) free(tmp_vtable.v_block_start); if (tmp_vtable.v_envp) free(tmp_vtable.v_envp); } return rc; } int copy_data( struct var_table *tmp_vtable, struct var_table *vtable, int expand_bsize, int expand_ensize) { char *p_next_block; int len_plus_one, i; char *id = "copy_data"; if (!expand_ensize && !expand_bsize ) return PBSE_NONE; if (expand_ensize && (!expand_bsize)) { /* only the pointers have been expanded and therefore copy the existing values to the new storage of pointers */ for (i = 
0; i < vtable->v_used; ++i) *(tmp_vtable->v_envp + i) = *(vtable->v_envp + i); /* free the old storage and assign the new one */ free(vtable->v_envp); vtable->v_envp = tmp_vtable->v_envp; vtable->v_ensize = tmp_vtable->v_ensize; } else if (expand_bsize) { /* block of memory that contains the actual env. variables was reallocated */ p_next_block = tmp_vtable->v_block_start; for (i = 0; i < vtable->v_used; ++i) { len_plus_one = strlen(*(vtable->v_envp + i)) + 1; /* following condition is reached only for a non-null terminated variable */ if (len_plus_one > tmp_vtable->v_bsize) { sprintf(log_buffer, "PBS: failed to copy env var, size: %d space left in buf: %d\n", len_plus_one, tmp_vtable->v_bsize); log_err(errno, id, log_buffer); return -1; } strcpy(p_next_block, *(vtable->v_envp + i)); *(tmp_vtable->v_envp + i) = p_next_block; p_next_block += len_plus_one; tmp_vtable->v_bsize -= len_plus_one; } /*free the old memory block */ vtable->v_bsize = tmp_vtable->v_bsize; vtable->v_block = p_next_block; free(vtable->v_block_start); vtable->v_block_start = tmp_vtable->v_block_start; if (expand_ensize) { /* if the pointers to the env. variables were reallocated adjust vtable->envp and free the old storage of those pointers */ free(vtable->v_envp); vtable->v_envp = tmp_vtable->v_envp; vtable->v_ensize = tmp_vtable->v_ensize; } else { /* copy the new location. Note all memory that had been allocated to tmp_vtable will be freed in the routine where they've been allocated */ for (i = 0; i < vtable->v_used; ++i) *(vtable->v_envp + i) = *(tmp_vtable->v_envp + i); } } return PBSE_NONE; } #ifndef __TOLDGROUP /* * init_groups - build the group list via an LDAP friendly method */ int init_groups( char *pwname, /* I User's name */ int pwgrp, /* I User's group from pw entry */ int groupsize,/* I size of the array, following argument */ int *groups) /* O ptr to group array, list build there */ { /* DJH Jan 2004. The original implementation looped over all groups looking for membership. 
Thats OK for /etc/groups, but thrashes LDAP if you're using that for groups in nsswitch.conf. Since there is an explicit LDAP backend to do initgroups (3) efficiently in nss_ldap (on Linux), lets use initgroups() to figure out the group membership. A little clunky, but not too ugly. */

  /* return -1 on failure */
  /* otherwise returns the value of getgroups(groupsize, groups) taken right
   * after initgroups(); the caller's own group list is put back before
   * returning */

  char id[] = "init_groups";

  extern sigset_t allsigs; /* set up at the start of mom_main */
  sigset_t savedset;

  int n, nsaved;
  gid_t savedgroups[NGROUPS_MAX + 16]; /* room beyond NGROUPS_MAX for the egid added below */
  gid_t momegid;
  int i;

  /* save current group access because we're about to overwrite it */

  nsaved = getgroups(NGROUPS_MAX, savedgroups);

  if (nsaved < 0)
    {
    log_err(errno, id, "getgroups");

    return(-1);
    }

  /* From the Linux man page: It is unspecified whether the effective
     group ID of the calling process is included in the returned
     list. (Thus, an application should also call getegid(2) and add
     or remove the resulting value.) */

  momegid = getegid();

  /* search for duplicates */

  for (i = 0;i < nsaved;i++)
    {
    if (savedgroups[i] == momegid)
      break;
    }

  /* egid not in the saved list - append it */

  if (i >= nsaved)
    savedgroups[nsaved++] = momegid;

  if (pwgrp == 0)
    {
    /* Emulate the original init_groups() behaviour which treated
       gid==0 as a special case */

    struct passwd *pwe = getpwnam_ext(pwname);

    if (pwe == NULL)
      {
      log_err(errno, id, "no such user");

      return(-1);
      }

    pwgrp = pwe->pw_gid;
    }

  if (LOGLEVEL >= 4)
    {
    log_ext(-1, id, "pre-sigprocmask", LOG_DEBUG);
    }

  /* Block signals while we do this or else the signal handler might
     run with strange group access */

  if (sigprocmask(SIG_BLOCK, &allsigs, &savedset) == -1)
    {
    log_err(errno, id, "sigprocmask(BLOCK)");

    return(-1);
    }

  n = 0;

  /* temporarily take on the user's groups so that getgroups() reports them */

  if (initgroups(pwname, pwgrp) < 0)
    {
    log_err(errno, id, "initgroups");

    n = -1;
    }
  else
    {
    n = getgroups(groupsize, (gid_t *)groups);
    }

  if (LOGLEVEL >= 4)
    {
    log_ext(-1, id, "post-initgroups", LOG_DEBUG);
    }

  /* restore state */
  /* failures while restoring are logged only; n is still returned */

  if (setgroups(nsaved, savedgroups) < 0)
    log_err(errno, id, "setgroups");

  if (sigprocmask(SIG_SETMASK, &savedset, NULL) == -1)
    log_err(errno, id, "sigprocmask(SIG_SETMASK)");

  return(n);
  }  /* END init_groups() */

#else /* !__TOLDGROUP */

/*
 * init_groups - read the /etc/group file and build an array of
 * group memberships for user pwname.
 *
 * pwgrp, when non-zero, is stored as the first entry.  Returns the number
 * of entries written to groups, or -1 when more than groupsize entries
 * would be needed.
 */

int init_groups(

  char *pwname,    /* I User's name */
  int   pwgrp,     /* I User's group from pw entry */
  int   groupsize, /* I size of the array, following argument */
  int  *groups)    /* O ptr to group array, list build there */

  {

  struct group *grp;
  int i;
  int n;

  n = 0;

  /* NOTE(review): this first store is not checked against groupsize -
   * presumably callers always pass groupsize >= 1; confirm */

  if (pwgrp != 0)
    *(groups + n++) = pwgrp;

  setgrent();

  while ((grp = getgrent()))
    {
    /* the primary group was already added above */

    if (grp->gr_gid == (gid_t)pwgrp)
      continue;

    for (i = 0;grp->gr_mem[i];i++)
      {
      if (!strcmp(grp->gr_mem[i], pwname))
        {
        if (n == groupsize)
          {
          endgrent();

          return(-1);
          }

        *(groups + n++) = grp->gr_gid;
        }
      }
    }    /* END while (grp) */

  endgrent();

  return(n);
  }  /* END init_groups() */

#endif /* !__TOLDGROUP */


/*
 * catchinter = catch death of writer child and/or shell child of interactive
 * When one dies, kill off the other; there is no mercy in this family.
 *
 * Reaps one child without blocking.  If that child is writerpid, shellpid is
 * killed; for any other non-zero waitpid() result writerpid is killed.  The
 * killed process is then waited for and mom_reader_go is cleared.
 */

static void catchinter(

  int sig)  /* I (not used) */

  {
  int status;
  pid_t pid;

  pid = waitpid(-1, &status, WNOHANG);

  /* no child has changed state yet */

  if (pid == 0)
    {
    return;
    }

  /* NOTE(review): a waitpid() error (-1) also takes the else branch below -
   * confirm that is intended */

  if (pid == writerpid)
    {
    kill(shellpid, SIGKILL);

    wait(&status);
    }
  else
    {
    kill(writerpid, SIGKILL);

    wait(&status);
    }

  mom_reader_go = 0;

  return;
  }  /* END catchinter() */


/*
 * Look for a certain environment variable which has a port# which should
 * be opened on the MS to establish communication for one of the 3 stdio
 * streams.  >=0 return is that valid fd, -1 means no env var found,
 * -2 means malformed env value or failure to connect.
*/ static int search_env_and_open( const char *envname, /* I */ u_long ipaddr) /* I */ { static char *id = "search_env_and_open"; int i, len; len = strlen(envname); for (i = 0;i < vtable.v_used;i++) { if (!strncmp(vtable.v_envp[i], envname, len)) { const char *cp = vtable.v_envp[i] + len; char *cq; int fd, port; if (*cp++ != '=') break; /* empty, ignore it */ port = strtol(cp, &cq, 10); if (*cq) { sprintf(log_buffer, "improper value for %s", envname); log_err(errno, id, log_buffer); return(-2); } #if 0 /* debugging */ log_err(-1, "search_env_and_open attempting open", vtable.v_envp[i]); #endif if ((fd = open_demux(ipaddr, port)) < 0) { sprintf(log_buffer, "failed connect to stdio on %s:%d", vtable.v_envp[i], port); log_err(errno, id, log_buffer); return(-2); } return(fd); } } /* END for (i) */ /* not found */ return(-1); } /* END search_env_and_open() */ int socket_avail_bytes_on_descriptor(int socket) { unsigned avail_bytes; if (ioctl(socket, FIONREAD, &avail_bytes) != -1) return avail_bytes; return 0; } int TMomCheckJobChild( pjobexec_t *TJE, /* I */ int Timeout, /* I (in seconds) */ int *Count, /* O (bytes read) */ int *RC) /* O (return code/errno) */ { int i; fd_set fdset; int rc; int read_size = sizeof(struct startjob_rtn); struct timeval timeout; /* NOTE: assume if anything is on pipe, everything is on pipe (may reasult in hang) */ /* block up to timeout, wait for child to complete indicating success/failure of job launch */ /* read returns the session id or error */ timeout.tv_sec = Timeout; timeout.tv_usec = 0; errno = 0; FD_ZERO(&fdset); FD_SET(TJE->jsmpipe[0], &fdset); rc = select( TJE->jsmpipe[0] + 1, &fdset, (fd_set *)NULL, (fd_set *)NULL, &timeout); if (rc <= 0) { /* TIMEOUT - data not yet available */ return(FAILURE); } if (socket_avail_bytes_on_descriptor(TJE->jsmpipe[0]) < read_size) { return FAILURE; } for (;;) { i = read(TJE->jsmpipe[0], (char *) & TJE->sjr, read_size); if ((i == -1) && (errno == EINTR)) continue; break; } *RC = errno; *Count = i; 
if (LOGLEVEL >= 4) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, (TJE->pjob != NULL) ? ((job *)TJE->pjob)->ji_qs.ji_jobid : "???", "task/session info loaded"); } return(SUCCESS); } /* END TMomCheckJobChild() */ #ifdef USEJOBCREATE /* * Get a job_id from the system. Return job id or -1 if error */ uint64_t get_jobid( char* pbs_jobid) { static char *id = "get_jobid"; uint64_t job_id; #ifndef JOBFAKE job_id = job_create(0, getuid(), 0); #else static long fakejobid = 0x00000000ffffffff; srand(getpid() + time(NULL)); job_id = fakejobid + rand(); #endif /* JOBFAKE */ if (job_id == JOB_FAIL) { if (LOGLEVEL >= 3) { sprintf(log_buffer, "Failed to get system job id for pbs job %s", pbs_jobid); log_err(-1, id, log_buffer); } return(job_id); } if (LOGLEVEL >= 7) { sprintf(log_buffer, "Got system job id = %lx(%ld) for pbs job %s", job_id, job_id, pbs_jobid); log_ext(-1, id, log_buffer, LOG_DEBUG); } return(job_id); } /* END get_jobid() */ #endif /* USEJOBCREATE */ #ifdef ENABLE_CSA /* * Check to see if CSA in installed or turned on. Return 1 if it is good else 0 */ int check_csa_status(enum csa_chk_cmd chk_action) { #ifndef CSAFAKE static char *id = "check_csa_status"; #endif /* CSAFAKE */ int csa_stat = 0; #ifndef CSAFAKE struct csa_check_req ck_req; ck_req.ck_stat.am_id = (chk_action == IS_INSTALLED) ? ACCT_KERN_CSA : ACCT_DMD_WKMG; ck_req.ck_stat.am_status = ACS_OFF; ck_req.ck_stat.am_param = 0; if (csa_check(&ck_req) != 0) { if (errno != ENOSYS) { sprintf(log_buffer,"check_csa_status errno = %d\n", errno); log_err(-1, id, log_buffer); } return 0; } if (chk_action == IS_INSTALLED) { /* since the call to csa_check was successful, we know csa is installed */ csa_stat = 1; } else { csa_stat = (ck_req.ck_stat.am_status != ACS_ON) ? 
0 : 1; } #else csa_stat = 1; #endif /* CSAFAKE */ return(csa_stat); } /* * createWLMRec() Create Workload Management Record * Inputs: job_id - job ID * type - start or termination record * subtype - Subtype of record * prid - project ID * ash - Array session handle * code - account ID * reqid - Request ID * compCode - Request completion code * calls: csa_wracct() * Return 1 if it is good, otherwise 0 * */ int create_WLM_Rec( char* pbs_jobid, uint64_t job_id, int type, int subtype, uint64_t prid, uint64_t ash, int64_t compCode, int64_t reqid) { static char *id = "create_WLM_Rec"; #ifndef CSAFAKE struct csa_wra_req cw; struct wkmgmtbs wkm; #endif /* CSAFAKE */ char rec_type[8]; char sub_type[8]; /* * First, create an workload management record */ if (type == WM_INIT) { strcpy(rec_type, "init"); if (subtype == WM_INIT_START) { strcpy(sub_type, "start"); } else if (subtype == WM_INIT_RESTART) { strcpy(sub_type, "restart"); } else if (subtype == WM_INIT_RERUN) { strcpy(sub_type, "rerun"); } else { sprintf(log_buffer, "WM_INIT bad sub type = %d for pbs job %s", subtype, pbs_jobid); log_err(-1, id, log_buffer); return 0; } } else if (type == WM_TERM) { strcpy(rec_type, "term"); if (subtype == WM_TERM_EXIT) { strcpy(sub_type, "exited"); } else if (subtype == WM_TERM_REQUEUE) { strcpy(sub_type, "requeue"); } else if (subtype == WM_TERM_HOLD) { strcpy(sub_type, "hold"); } else if (subtype == WM_TERM_RERUN) { strcpy(sub_type, "rerun"); } else if (subtype == WM_TERM_MIGRATE) { strcpy(sub_type, "migrate"); } else { sprintf(log_buffer, "WM_TERM bad sub type = %d for pbs job %s", subtype, pbs_jobid); log_err(-1, id, log_buffer); return 0; } } else if (type == WM_RECV) { strcpy(rec_type, "recv"); if (subtype == WM_RECV_NEW) { strcpy(sub_type, "new"); } else { sprintf(log_buffer, "WM_RECV bad sub type = %d for pbs job %s", subtype, pbs_jobid); log_err(-1, id, log_buffer); return 0; } } else { sprintf(log_buffer, "bad record type = %d for pbs job %s", type, pbs_jobid); log_err(-1, 
id, log_buffer); return 0; } #ifdef CSAFAKE if (LOGLEVEL >= 7) { sprintf(log_buffer, "Creating FAKE CSA workload management %s - %s record for: " "job_id = %lx, compCode = %d, pbs job %s", (char*)&rec_type, (char*)&sub_type, job_id, (int)compCode, pbs_jobid); log_ext(-1, id, log_buffer, LOG_DEBUG); } #else if (LOGLEVEL >= 7) { sprintf(log_buffer, "Creating CSA workload management %s - %s record for: " "job_id = %llx, compCode = %d, pbs job %s", &rec_type, &sub_type, job_id, compCode, pbs_jobid); log_ext(-1, id, log_buffer, LOG_DEBUG); } memset(&wkm, 0, sizeof(wkm)); /* fill in header data */ wkm.hdr.ah_magic = ACCT_MAGIC; wkm.hdr.ah_revision = REV_APP; wkm.hdr.ah_type = ACCT_DAEMON_WKMG; if (!getuid() || ! geteuid()) wkm.hdr.ah_flag |= ASU; /* set super user flag */ wkm.hdr.ah_size = sizeof(struct wkmgmtbs); /* fill in rest of needed data */ wkm.type = type; wkm.subtype = subtype; wkm.arrayid = 0; strncpy(&wkm.serv_provider[0], "TORQUE ", sizeof(wkm.serv_provider)); if ((wkm.time = time(NULL)) == (time_t) - 1) { sprintf(log_buffer, "error setting time, errno = %d - %s pbs job = %s", errno, strerror(errno), pbs_jobid); log_err(-1, id, log_buffer); return 0; } if ((wkm.enter_time = time(NULL)) == (time_t) - 1) { sprintf(log_buffer, "error setting INIT enter time, errno = %d - %s pbs job = %s", errno, strerror(errno), pbs_jobid); log_err(-1, id, log_buffer); return 0; } wkm.uid = getuid(); wkm.prid = prid; wkm.ash = ash; wkm.jid = job_id; /* * character fields need to be NULL terminated */ strncpy(&wkm.machname[0], "TORQUE-MACHINE", sizeof(wkm.machname) - 1); strncpy(&wkm.reqname[0], "TORQUE-REQUEST", sizeof(wkm.reqname) - 1); strncpy(&wkm.quename[0], "TORQUE-QUEUE", sizeof(wkm.quename) - 1); wkm.reqid = reqid; if (type == WM_TERM) { wkm.code = compCode; } /* * Fill in the CSA_WRACCT request structure for csa ioctl call */ memset(&cw, 0, sizeof(cw)); cw.wra_did = ACCT_DMD_WKMG; /* daemon ID */ cw.wra_len = sizeof(struct wkmgmtbs); /* length of app record */ cw.wra_jid 
= job_id; /* job Id from job create */ cw.wra_buf = (char *) & wkm; /* pointer to record */ if (csa_wracct(&cw)) { /* EINVAL is okay for a WM_TERM */ if ((type != WM_TERM) && (errno != EINVAL)) { sprintf(log_buffer, "error writing wkm %s record, errno=%d - %s pbs job = %s", &rec_type, errno, strerror(errno), pbs_jobid); log_err(-1, id, log_buffer); return 0; } } #endif /* CSAFAKE */ return 1; } /* * add_wkm_start - Add an start workload management record for a job */ void add_wkm_start( uint64_t job_id, char* pbs_jobid) { static char *id = "add_wkm_start"; if (check_csa_status(IS_UP)) { /* check if we have a valid job id, if not just return */ if (job_id == JOB_FAIL) { if (LOGLEVEL >= 2) { sprintf(log_buffer, "%s called with bad job id = %lx for pbs job %s", id, job_id, pbs_jobid); log_err(-1, id, log_buffer); } return; } /* Add a workload management received record before the start */ if (create_WLM_Rec(pbs_jobid, job_id, WM_RECV, WM_RECV_NEW, 0, 0, 0, 0)) { if (LOGLEVEL >= 7) { sprintf(log_buffer, "Added CSA workload management WM_RECV for job id = %lx for pbs job %s", job_id, pbs_jobid); log_ext(-1, id, log_buffer, LOG_DEBUG); } } else { if (LOGLEVEL >= 2) { sprintf(log_buffer, "Failed to add CSA workload management WM_RECV for job id = %lx for pbs job %s", job_id, pbs_jobid); log_err(-1, id, log_buffer); } return; } if (create_WLM_Rec(pbs_jobid, job_id, WM_INIT, WM_INIT_START, 0, 0, 0, 0)) { if (LOGLEVEL >= 7) { sprintf(log_buffer, "Added CSA workload management WM_INIT for job id = %lx for pbs job %s", job_id, pbs_jobid); log_ext(-1, id, log_buffer, LOG_DEBUG); } } else if (LOGLEVEL >= 2) { sprintf(log_buffer, "Failed to add CSA workload management WM_INIT for job id = %lx for pbs job %s", job_id, pbs_jobid); log_err(-1, id, log_buffer); } } return; } /* END add_wkm_start() */ /* * add_wkm_end - Add an end workload management record for a job */ void add_wkm_end( uint64_t job_id, int64_t comp_code, char* pbs_jobid) { static char *id = "add_wkm_end"; if 
(check_csa_status(IS_UP)) { /* check if we have a valid job id, if not just return */ if (job_id == JOB_FAIL) { if (LOGLEVEL >= 2) { sprintf(log_buffer, "%s called with bad job id = %lx for pbs job %s", id, job_id, pbs_jobid); log_err(-1, id, log_buffer); } return; } if (create_WLM_Rec(pbs_jobid, job_id, WM_TERM, WM_TERM_EXIT, 0, 0, comp_code, 0)) { if (LOGLEVEL >= 7) { sprintf(log_buffer, "Added CSA workload management WM_TERM for job id = %lx for pbs job %s", job_id, pbs_jobid); log_ext(-1, id, log_buffer, LOG_DEBUG); } } else if (LOGLEVEL >= 2) { sprintf(log_buffer, "Failed to add CSA workload management WM_TERM for job id = %lx for pbs job %s", job_id, pbs_jobid); log_err(-1, id, log_buffer); } } return; } /* END add_wkm_end() */ #endif /* ENABLE_CSA */ /* * @param pjob - used to set up the user's environment if desired * @param path_in - the path that will be expanded * @param pathlen - viable space in path * @param path - where to save the new path */ int expand_path( job *pjob, /* I optional */ char *path_in, /* I */ int pathlen, /* I */ char *path) /* I */ { #ifndef HAVE_WORDEXP /* no need for expansion if this isn't defined */ return(SUCCESS); #else char **environ_old = environ; wordexp_t exp; if ((path_in == NULL) || (path == NULL)) { /* must have inputs and outputs */ return(FAILURE); } if (vtable.v_envp == NULL) { if (pjob != NULL) { InitUserEnv(pjob,NULL,NULL,NULL,NULL); *(vtable.v_envp + vtable.v_used) = NULL; environ = vtable.v_envp; } else { /* if there is no environment, there's no way to expand words */ path = path_in; return(SUCCESS); } } else { *(vtable.v_envp + vtable.v_used) = NULL; environ = vtable.v_envp; } /* expand the path */ switch (wordexp(path_in, &exp, WRDE_NOCMD | WRDE_UNDEF)) { case 0: /* success - allow if word count is 1 */ if (exp.we_wordc == 1) { snprintf(path,pathlen,"%s",exp.we_wordv[0]); wordfree(&exp); environ = environ_old; return(SUCCESS); } /* fall through */ case WRDE_NOSPACE: wordfree(&exp); /* fall through */ default: 
environ = environ_old; return(FAILURE); } /* END switch () */ /* not reached */ environ = environ_old; return(FAILURE); #endif /* HAVE_WORD_EXP */ } /* END start_exec.c */