/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * * 1. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 4, this list of conditions * and the disclaimer contained in paragraph 5. * * 2. Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 4, this list of * conditions and the disclaimer contained in paragraph 5 in the * documentation and/or other materials provided with the distribution. * * 3. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 4. 
All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 5. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. 
*/ #include /* the master config generated by configure */ #include #include #include #include #ifdef USEJOBCREATE #ifndef JOBFAKE extern "C" { #include } #endif /* JOBFAKE */ #endif /* USEJOBCREATE */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if IBM_SP2==2 /* IBM SP with PSSP 3.1 */ #include #endif /* IBM SP */ #include "libpbs.h" #include "portability.h" #include "list_link.h" #include "server_limits.h" #include "attribute.h" #include "resource.h" #include "resmon.h" #include "pbs_job.h" #include "log.h" #include "../lib/Liblog/pbs_log.h" #include "../lib/Liblog/log_event.h" #include "../lib/Libifl/lib_ifl.h" #include "mom_mach.h" #include "mom_func.h" #include "pbs_error.h" #include "svrfunc.h" #include "net_connect.h" #include "net_cache.h" #include "dis.h" #include "batch_request.h" #include "md5.h" #include "mcom.h" #include "resource.h" #include "utils.h" #include "mom_comm.h" #include "../lib/Libnet/lib_net.h" /* socket_avail_bytes_on_descriptor */ #include "alps_constants.h" #include "alps_functions.h" #include "tcp.h" /* tcp_chan */ #include "dynamic_string.h" #include "mom_memory.h" #include "node_internals.hpp" #ifdef ENABLE_CPA #include "pbs_cpa.h" #endif #ifdef PENABLE_LINUX26_CPUSETS #include "pbs_cpuset.h" #endif #ifdef HAVE_WORDEXP #include #endif /* HAVE_WORDEXP */ #ifdef ENABLE_CSA #ifndef CSAFAKE #include "csa_api.h" #include "csaacct.h" #else typedef enum { WM_INFO = 1, /* Informational */ WM_RECV, /* Request received */ WM_INIT, /* Request initiated */ WM_SPOOL, /* Request output spooled */ WM_TERM /* Request terminated */ } wm_type; /* * Subtypes for workload management accounting record type WM_RECV. */ #define WM_RECV_NEW 1 /* New request */ /* * Subtypes for workload management accounting record type WM_INIT. 
*/
#define WM_INIT_START 1 /* Request started for first time */
#define WM_INIT_RESTART 2 /* Request restarted */
#define WM_INIT_RERUN 3 /* Request rerun */
/*
 * Subtypes for workload management accounting record type WM_TERM.
 */
#define WM_TERM_EXIT 1 /* Request exited */
#define WM_TERM_REQUEUE 2 /* Request requeued */
#define WM_TERM_HOLD 3 /* Request checkpointed and held */
#define WM_TERM_RERUN 4 /* Request will be rerun */
#define WM_TERM_MIGRATE 5 /* Request will be migrated */
#endif /* CSAFAKE */
#endif /* ENABLE_CSA */

/* When NOPOSIXMEMLOCK is defined, _POSIX_MEMLOCK is forced off for the rest
 * of this translation unit. */
#ifdef NOPOSIXMEMLOCK
#undef _POSIX_MEMLOCK
#endif /* NOPOSIXMEMLOCK */

/* Slack used when sizing the job environment table (see InitUserEnv()):
 * EXTRA_VARIABLE_SPACE extra bytes of string storage and EXTRA_ENV_PTRS
 * extra pointer slots beyond the entries that are counted up front. */
#define EXTRA_VARIABLE_SPACE 5120
#define EXTRA_ENV_PTRS 32
#define MAX_JOB_ARGS 64

/* Global Variables */
/* The extern declarations below refer to objects defined in other
 * translation units. */
extern node_internals internal_layout;
extern int exec_with_exec;
extern int attempttomakedir;
extern int spoolasfinalname;
extern int num_var_env;
extern char **environ;
extern int exiting_tasks;
extern int lockfds;
extern tlist_head mom_polljobs;
extern char *path_jobs;
extern char *path_prolog;
extern char *path_prologuser;
extern char *path_prologp;
extern char *path_prologuserp;
extern char *path_spool;
extern char *path_aux;
extern char *apbasil_path;
extern char *apbasil_protocol;
extern gid_t pbsgroup;
extern uid_t pbsuser;
extern time_t time_now;
extern unsigned int pbs_rm_port;
extern u_long localaddr;
extern char *nodefile_suffix;
extern char *submithost_suffix;
extern char DEFAULT_UMASK[];
extern char PRE_EXEC[];
extern int LOGLEVEL;
extern int EXTPWDRETRY;
extern long TJobStartBlockTime;
extern int multi_mom;
/* NOTE(review): pbs_rm_port is declared a second time here; redundant but
 * harmless (repeated identical extern declarations are legal). */
extern unsigned int pbs_rm_port;
extern char path_checkpoint[];
extern char jobstarter_exe_name[];
extern char mom_host[];
extern int jobstarter_set;
/* The next two objects are defined (not merely declared) in this file. */
int mom_reader_go; /* see catchinter() & mom_writer() */
struct var_table vtable; /* for building up job's environ */
extern char tmpdir_basename[]; /* for TMPDIR */
extern int src_login_batch;
extern int src_login_interactive;
extern int is_login_node;
#ifdef NUMA_SUPPORT
extern int num_node_boards;
extern nodeboard node_boards[MAX_NODE_BOARDS];
extern int numa_index;
#endif

/* Local Variables */
static int script_in; /* script file, will be stdin */
static pid_t writerpid; /* writer side of interactive job */
static pid_t shellpid; /* shell part of interactive job */

/* sync w/variables_else[] */
/* Each enumerator is the index of the matching name in variables_else[];
 * the two lists must stay in the same order. tveLAST equals the number of
 * names (the NULL terminator is not counted). */
enum TVarElseEnum { tveHome = 0, tveLogName, tveJobName, tveJobID, tveQueue, tveShell, tveUser, tveJobCookie, tveNodeNum, tveTaskNum, tveMOMPort, tveNodeFile, tveNumNodes, tveTmpDir, tveVerID, tveNumNodesStr, tveNumPpn, tveGpuFile, tveNprocs, tveWallTime, tveMicFile, tveOffloadDevices, tveLAST };

/* Names of environment variables whose values are computed per job
 * (InitUserEnv() looks them up as variables_else[tveXxx]). */
static const char *variables_else[] = /* variables to add, value computed */
{
"HOME",
"LOGNAME",
"PBS_JOBNAME",
"PBS_JOBID",
"PBS_QUEUE",
"SHELL",
"USER",
"PBS_JOBCOOKIE",
"PBS_NODENUM",
"PBS_TASKNUM",
"PBS_MOMPORT",
"PBS_NODEFILE",
"PBS_NNODES", /* number of nodes specified by size */
"TMPDIR",
"PBS_VERSION",
"PBS_NUM_NODES", /* number of nodes specified by nodes string */
"PBS_NUM_PPN", /* ppn value specified by nodes string */
"PBS_GPUFILE", /* file containing which GPUs to access */
"PBS_NP", /* number of processors requested */
"PBS_WALLTIME", /* requested or default walltime */
"PBS_MICFILE", /* file containing which MICs to access */
"OFFLOAD_DEVICES",/* indices of MICs for the job */
NULL
};

/* Count of computed variables; InitUserEnv() adds it when sizing the
 * environment pointer table. */
static int num_var_else = tveLAST;

/* prototypes */
static void starter_return(int, int, int, struct startjob_rtn *);
static void catchinter(int);
int expand_vtable(struct var_table *vtable);
int copy_data(struct var_table *tmp_vtable, struct var_table *vtable, int expand_bsize, int expand_ensize);
#define EN_THRESHOLD 100
#define B_THRESHOLD 2048
/* NOTE(review): identical redefinition of EXTRA_VARIABLE_SPACE from above;
 * legal because the replacement list is the same. */
#define EXTRA_VARIABLE_SPACE 5120
#ifdef PENABLE_LINUX26_CPUSETS
extern int use_cpusets(job *);
#endif /* PENABLE_LINUX26_CPUSETS */
int TMomFinalizeJob1(job *, pjobexec_t *, int *);
int TMomFinalizeJob2(pjobexec_t *, int *);
int TMomFinalizeJob3(pjobexec_t *, int, int, int *);
int expand_path(job *,char *,int,char *);
int TMomFinalizeChild(pjobexec_t *); int TMomCheckJobChild(pjobexec_t *, int, int *, int *); int InitUserEnv(job *,task *,char **,struct passwd *pwdp,char *); int mkdirtree(char *,mode_t); struct radix_buf **allocate_sister_list(int radix); int add_host_to_sister_list(char *, unsigned short , struct radix_buf *); void free_sisterlist(struct radix_buf **list, int radix); int allocate_demux_sockets(job *pjob, int flag); static int search_env_and_open(const char *, u_long); extern int TMOMJobGetStartInfo(job *, pjobexec_t **); extern int mom_reader(int, int); extern int mom_writer(int, int); extern int x11_create_display(int, char *, char *phost, int pport, char *homedir, char *x11authstr); extern int blcr_restart_job(job *pjob, char *file); extern int mom_checkpoint_job_is_checkpointable(job *pjob); extern int mom_checkpoint_job_has_checkpoint(job *pjob); extern int mom_checkpoint_execute_job(job *pjob, char *shell, char *arg[], struct var_table *vtable); extern void mom_checkpoint_init_job_periodic_timer(job *pjob); extern int mom_checkpoint_start_restart(job *pjob); extern void get_chkpt_dir_to_use(job *pjob, char *chkpt_dir, int chkpt_dir_size); extern char *cat_dirs(char *root, char *base); extern char *get_local_script_path(job *pjob, char *base); #ifdef NVIDIA_GPUS extern int setup_gpus_for_job(job *pjob); extern int use_nvidia_gpu; #endif /* NVIDIA_GPUS */ int exec_job_on_ms(job *pjob); /* END prototypes */ #ifdef USEJOBCREATE uint64_t get_jobid(char *); #endif /* USEJOBCREATE */ #ifdef ENABLE_CSA void add_wkm_start(uint64_t, char *); void add_wkm_end(uint64_t, int64_t, char *); /* valid commands for csaswitch checking */ enum csa_chk_cmd { IS_INSTALLED = 0, IS_UP = 1 }; #endif /* ENABLE_CSA */ int create_command( char *cmdbuf, int cmdbuf_size, char **argv) { int i; snprintf(cmdbuf, cmdbuf_size, "%s", argv[0]); for (i = 1; argv[i] != NULL; i++) { if (cmdbuf_size - strlen(cmdbuf) > strlen(argv[i]) + 1) { strcat(cmdbuf, " "); strcat(cmdbuf, argv[i]); } else 
return(-1); } if (cmdbuf_size - strlen(cmdbuf) > 1) strcat(cmdbuf, ")"); else return(-1); return(PBSE_NONE); } /* END create_command() */ /* * no_hang() - interrupt handler for alarm() around attempt to connect * to qsub for interactive jobs. If qsub hung or suspended or if the * network is fouled up, mom cannot afford to wait forever. */ static void no_hang( int sig) /* I (not used) */ { log_event(PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, " ", "alarm timed-out connect to qsub"); return; } /* END no_hang() */ struct passwd *check_pwd( job *pjob) /* I (modified) */ { int retryCount; struct passwd *pwdp = NULL; struct group *grpp; char *ptr; /* NOTE: should cache entire pwd object (NYI) */ ptr = pjob->ji_wattr[JOB_ATR_euser].at_val.at_str; if (ptr == NULL) { /* FAILURE */ sprintf(log_buffer, "no user specified for job"); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); return(NULL); } /* we will retry if needed just to cover temporary problems */ for (retryCount = 0;retryCount < EXTPWDRETRY;retryCount++) { pwdp = getpwnam_ext(ptr); if (pwdp != NULL) break; sleep(1); } if (pwdp == NULL) { /* FAILURE */ sprintf(log_buffer, "no password entry for user %s", ptr); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); return(NULL); } #ifdef __CYGWIN__ if (IamUserByName(ptr) == 0) return(NULL); #endif /* __CYGWIN__ */ if (pjob->ji_grpcache != NULL) { /* SUCCESS */ /* group cache previously loaded and cached */ return(pwdp); } pjob->ji_qs.ji_un_type = JOB_UNION_TYPE_MOM; pjob->ji_qs.ji_un.ji_momt.ji_exuid = pwdp->pw_uid; pjob->ji_grpcache = (struct grpcache *)calloc(1, sizeof(struct grpcache) + strlen(pwdp->pw_dir) + 1); if (pjob->ji_grpcache == NULL) { /* FAILURE */ sprintf(log_buffer, "calloc failed"); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); return(NULL); } strcpy(pjob->ji_grpcache->gc_homedir, pwdp->pw_dir); /* get the group and supplimentary under which the job is to be run */ if 
((pjob->ji_wattr[JOB_ATR_egroup].at_flags & (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) == ATR_VFLAG_SET) { /* execution group specified and not default of login group */ /* NOTE: ideally egroup should be groupname, not groupid, but pbs_server * code will send a group ID over in some instances, so we should try * to work with a groupid if provided */ grpp = getgrnam(pjob->ji_wattr[JOB_ATR_egroup].at_val.at_str); if (grpp != NULL) { pjob->ji_qs.ji_un.ji_momt.ji_exgid = grpp->gr_gid; } else { int tmpGID; /* check to see if we were given a groupid (group names cannot start * with a number) */ tmpGID = (int)strtol(pjob->ji_wattr[JOB_ATR_egroup].at_val.at_str,NULL,10); if (tmpGID != 0) { pjob->ji_qs.ji_un.ji_momt.ji_exgid = tmpGID; } else { /* FAILURE */ sprintf(log_buffer, "no group entry for group %s, user=%s, errno=%d (%s)", pjob->ji_wattr[JOB_ATR_egroup].at_val.at_str, ptr, errno, strerror(errno)); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); return(NULL); } } /* END if (grpp != NULL) */ } else { /* if no group specified, default to login group */ pjob->ji_qs.ji_un.ji_momt.ji_exgid = pwdp->pw_gid; } if ((pjob->ji_grpcache->gc_ngroup = init_groups( pwdp->pw_name, pjob->ji_qs.ji_un.ji_momt.ji_exgid, NGROUPS_MAX, pjob->ji_grpcache->gc_groups)) < 0) { /* FAILURE */ sprintf(log_buffer, "too many group entries"); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); return(NULL); } /* perform site specific check on validatity of account */ if (site_mom_chkuser(pjob)) { /* FAILURE */ sprintf(log_buffer, "site_mom_chkuser failed"); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); return(NULL); } /* SUCCESS */ return(pwdp); } /* END check_pwd() */ /** * @see send_sisters() - child - send ABORT request to sisters * @see start_exec() - parent */ void exec_bail( job *pjob, /* I */ int code, /* I */ std::set *sisters_contacted) { int nodecount; unsigned short momport = 0; char log_buffer[LOG_BUF_SIZE]; sprintf(log_buffer, "bailing on 
job %s code %d", pjob->ji_qs.ji_jobid, code); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); if (am_i_mother_superior(*pjob) == true) { nodecount = send_sisters(pjob, IM_ABORT_JOB, FALSE, sisters_contacted); if (nodecount != pjob->ji_numnodes - 1) { sprintf(log_buffer, "%s: sent %d ABORT requests, should be %d", __func__, nodecount, pjob->ji_numnodes - 1); log_err(-1, __func__, log_buffer); } /* inform non-MS nodes that job is aborting */ pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; pjob->ji_qs.ji_un.ji_momt.ji_exitstat = code; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); exiting_tasks = 1; if (pjob->ji_stdout > 0) close(pjob->ji_stdout); if (pjob->ji_stderr > 0) close(pjob->ji_stderr); } else { send_ms(pjob, IM_ABORT_JOB); } return; } /* END exec_bail() */ /* * becomes the user for pjob * * @param pjob - the job whose user we should become * @return PBSE_BADUSER on failure */ int become_the_user( job *pjob) { log_buffer[0] = '\0'; if (setgroups(pjob->ji_grpcache->gc_ngroup, (gid_t *)pjob->ji_grpcache->gc_groups) != PBSE_NONE) { snprintf(log_buffer,sizeof(log_buffer), "PBS: setgroups for UID = %lu failed: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); } else if (setgid(pjob->ji_qs.ji_un.ji_momt.ji_exgid) != PBSE_NONE) { snprintf(log_buffer,sizeof(log_buffer), "PBS: setgid to %lu for UID = %lu failed: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exgid, (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); } else if (setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, FALSE) < 0) { snprintf(log_buffer,sizeof(log_buffer), "PBS: setuid to %lu failed: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); } if (log_buffer[0] != '\0') return(PBSE_BADUSER); else return(PBSE_NONE); } /* END become_the_user() */ int become_the_user_sjr( job *pjob, int write, int read, struct startjob_rtn *sjr) { if (become_the_user(pjob) != PBSE_NONE) { if 
(write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, __func__, log_buffer); starter_return(write, read, JOB_EXEC_FAIL2, sjr); } #ifdef _CRAY setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, TRUE); /* cray kludge */ #endif /* CRAY */ return(PBSE_NONE); } /* END become_the_user_sjr() */ #define RETRY 10 int open_demux( u_long addr, /* I */ int port) /* I */ { int sock; int i; struct sockaddr_in saddr; struct sockaddr_in remote; torque_socklen_t slen; unsigned short local_port; memset(&remote, 0, sizeof(remote)); remote.sin_addr.s_addr = addr; remote.sin_port = htons((unsigned short)port); remote.sin_family = AF_INET; if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { sprintf(log_buffer, "%s: socket %s", __func__, netaddr(&remote)); log_err(errno, __func__, log_buffer); return(-1); } for (i = 0;i < RETRY;i++) { if (connect(sock, (struct sockaddr *)&remote, sizeof(remote)) == 0) { /* success */ if (LOGLEVEL >= 6) { slen = sizeof(saddr); if (getsockname(sock, (struct sockaddr *)&saddr, &slen) == -1) return(sock); local_port = (int)ntohs(saddr.sin_port); sprintf(log_buffer, "%s: local port: %d", __func__, local_port); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, __func__, log_buffer); } return(sock); } sprintf(log_buffer, "connect failed: addr: %ld port: %d", addr, port); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, __func__, log_buffer); switch (errno) { case EINTR: case ETIMEDOUT: case ECONNRESET: sleep(2); continue; /*NOTREACHED*/ break; case EADDRINUSE: case ECONNREFUSED: sprintf(log_buffer, "%s: cannot connect to %s", __func__, netaddr(&remote)); log_err(errno, __func__, log_buffer); sleep(2); continue; /*NOTREACHED*/ break; default: /* NO-OP */ break; } /* END switch (errno) */ break; } /* END for (i) */ sprintf(log_buffer, "%s: connect %s", __func__, netaddr(&remote)); log_err(errno, __func__, log_buffer); close(sock); return(-1); } /* END open_demux() */ /* * open_pty - open slave side of master/slave pty */ static int 
open_pty( job *pjob) /* I */ { char *name; int pts; /* Open the slave pty as the controlling tty */ name = pjob->ji_wattr[JOB_ATR_outpath].at_val.at_str; if ((pts = open(name, O_RDWR, 0600)) < 0) { snprintf(log_buffer, sizeof(log_buffer), "cannot open slave, file name: %s", name); log_err(errno, "open_pty", log_buffer); } else { FDMOVE(pts); if(pts < 0) { log_err(errno, "open_pty", "cannot move pts file."); return -1; } fchmod(pts, 0620); if (fchown(pts, pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) { close(pts); log_err(errno, "open_pty", "cannot change slave's owner"); return -1; } #ifdef SETCONTROLLINGTTY #if defined(_CRAY) ioctl(0, TCCLRCTTY, 0); ioctl(pts, TCSETCTTY, 0); /* make controlling */ #elif defined(TCSETCTTY) ioctl(pts, TCSETCTTY, 0); /* make controlling */ #elif defined(TIOCSCTTY) ioctl(pts, TIOCSCTTY, 0); #endif #endif /* SETCONTROLLINGTTY */ } return(pts); } /* END open_pty() */ /* * is_joined - determine if standard out and stardard error are joined together * (-j option) and if so which is first * Returns: 0 - no join, separate files * +1 - joined as stdout * -1 - joined as stderr */ int is_joined( job *pjob) /* I */ { pbs_attribute *pattr; pattr = &pjob->ji_wattr[JOB_ATR_join]; if ((pattr->at_flags & ATR_VFLAG_SET) && (pattr->at_val.at_str[0] != 'n')) { if ((pattr->at_val.at_str[0] == 'o') && (strchr(pattr->at_val.at_str, (int)'e') != 0)) { return(1); } if ((pattr->at_val.at_str[0] == 'e') && (strchr(pattr->at_val.at_str, (int)'e') != 0)) { return(-1); } } return(0); } /* END is_joined() */ /* * open_std_out_err - open standard out and err to files */ static int open_std_out_err( job *pjob, /* I */ int timeout) /* I (optional,>0 to set) */ { int i; int file_out = -2; int file_err = -2; int filemode = O_CREAT | O_WRONLY | O_APPEND | O_EXCL; /* if std out/err joined (set and != "n"), which file is first */ i = is_joined(pjob); if (timeout > 0) { alarm(timeout); } if (i == 1) { file_out = open_std_file( pjob, 
StdOut, filemode, pjob->ji_qs.ji_un.ji_momt.ji_exgid); file_err = (file_out < 0)?-1:dup(file_out); } else if (i == -1) { file_err = open_std_file( pjob, StdErr, filemode, pjob->ji_qs.ji_un.ji_momt.ji_exgid); file_out = (file_err < 0)?-1:dup(file_err); } if (file_out == -2) file_out = open_std_file( pjob, StdOut, filemode, pjob->ji_qs.ji_un.ji_momt.ji_exgid); if (file_err == -2) file_err = open_std_file( pjob, StdErr, filemode, pjob->ji_qs.ji_un.ji_momt.ji_exgid); alarm(0); /* disable alarm */ if ((file_out < 0) || (file_err < 0)) { /* FAILURE - cannot load files */ if ((file_out == -2) || (file_err == -2)) { /* timeout occurred */ char *path; int keeping; path = std_file_name(pjob, StdOut, &keeping); sprintf(log_buffer, "unable to stat/open file '%s' within %d seconds - check filesystem", (path != NULL) ? path : "???", timeout); log_err(errno, __func__, log_buffer); } else { log_err(errno, __func__, "unable to open standard output/error"); } return(-1); } /* END if ((file_out < 0) || (file_err < 0)) */ FDMOVE(file_out); /* make sure descriptor > 2 */ FDMOVE(file_err); /* so don't clobber stdin/out/err */ if((file_out == -1)||(file_err == -1)) { log_err(errno, __func__, "unable to open standard output/error"); return -1; } if (file_out != 1) { close(1); if (dup(file_out) >= 0) { close(file_out); } } if (file_err != 2) { close(2); if (dup(file_err) >= 0) { close(file_err); } } return(PBSE_NONE); } /* END open_std_out_err() */ int mkdirtree( char *dirpath, /* I */ mode_t mode) /* I */ { char *part; int rc = 0; mode_t oldmask = 0; char *path = NULL; if (*dirpath != '/') { rc = -1; goto done; } /* make a copy to scribble NULLs on */ if ((path = strdup(dirpath)) == NULL) { rc = -1; goto done; } oldmask = umask(0000); part = strtok(path, "/"); if (part == NULL) { rc = -1; goto done; } *(part - 1) = '/'; /* leading '/' */ while ((part = strtok(NULL, "/")) != NULL) { if (mkdir(path, mode) == -1) { if (errno != EEXIST) { rc = errno; goto done; } } *(part - 1) = '/'; } /* 
very last component */ if (mkdir(path, mode) == -1) { if (errno != EEXIST) { rc = errno; goto done; } } done: if (oldmask != 0) umask(oldmask); if (path != NULL) free(path); return(rc); } /* END mkdirtree() */ /* If our config allows it, construct tmpdir path */ int TTmpDirName( job *pjob, /* I */ char *tmpdir, /* O */ int tmpdir_size) { if (tmpdir_basename[0] == '/') { snprintf(tmpdir, tmpdir_size, "%s/%s", tmpdir_basename, pjob->ji_qs.ji_jobid); } else { *tmpdir = '\0'; } return(*tmpdir != '\0'); /* return "true" if tmpdir is set */ } /* END TTmpDirName() */ /* * @return PBSE_NONE on success, PBSE_... otherwise */ int TMakeTmpDir( job *pjob, /* I */ char *tmpdir) /* I */ { int rc; int retval; struct stat sb; #ifdef TOP_TEMPDIR_ONLY mode_t oldmask = 0; #endif if ((setegid(pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) || (setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, TRUE) == -1)) { return(PBSE_BADUSER); } #ifdef TOP_TEMPDIR_ONLY errno = 0; oldmask = umask(0000); retval = mkdir(tmpdir,0755); if (oldmask != 0) umask(oldmask); if (retval == -1) retval = errno; #else retval = mkdirtree(tmpdir, 0755); #endif if (retval == 0) { /* We made it, it's ours */ pjob->ji_flags |= MOM_HAS_TMPDIR; } else { /* log the first error */ log_err(errno, __func__, strerror(errno)); rc = stat(tmpdir, &sb); if (rc) rc = errno; switch (rc) { case ENOENT: sprintf(log_buffer, "Unable to make job transient directory: %s", tmpdir); retval = PBSE_CANTCREATETMPDIR; break; case 0: if (S_ISDIR(sb.st_mode)) { if (sb.st_uid == pjob->ji_qs.ji_un.ji_momt.ji_exuid) { retval = PBSE_NONE; /* owned by the job, allowed */ } else { sprintf(log_buffer, "Job transient tmpdir %s already exists, owned by %d", tmpdir, sb.st_uid); retval = PBSE_TMPDIFFOWNER; } } else { sprintf(log_buffer, "Job transient tmpdir %s exists, but is not a directory", tmpdir); retval = PBSE_TMPNOTDIR; } break; default: sprintf(log_buffer, "Cannot name job tmp directory %s (on stat)", tmpdir); return(PBSE_TMPNONAME); break; } } /* END if 
(retval == 0) */ setuid_ext(pbsuser, TRUE); setegid(pbsgroup); if (retval != 0) log_err(retval, __func__, log_buffer); return(retval); } /* END TMakeTmpDir() */ /* * get_mic_indices * * parses the exec_mics string and places the absolute indices in a * list in buf * * the exec_mics string in NUMA is in the format: * --mic/[+...] * e.g.: 'slesmic-0-mic/1+slesmic-0-mic/0' * * non-numa is in the format: * -mic/[+...] * * @param pjob - the job whose exec_mic string we're parsing * @param buf - where the list of absolute mic indices goes * @param buf_size - maximum size that can be written into buf */ void get_mic_indices( job *pjob, char *buf, int buf_size) { char *mic_str; char *tok; char *slash; #ifdef NUMA_SUPPORT char *dash1; char *dash2; int numa_index = -1; #endif int numa_offset = 0; int mic_index; if (buf == NULL) return; fprintf(stderr,"in %s val = %d\n",__func__,JOB_ATR_exec_mics); mic_str = strdup(pjob->ji_wattr[JOB_ATR_exec_mics].at_val.at_str); buf[0] = '\0'; if (mic_str != NULL) { tok = strtok(mic_str, "+"); while (tok != NULL) { numa_offset = 0; #ifdef NUMA_SUPPORT dash1 = strrchr(tok, '-'); if (dash1 != NULL) { *dash1 = '\0'; dash2 = strrchr(tok, '-'); if (dash2 != NULL) { numa_index = strtol(dash2+1, NULL, 10); if (numa_index < num_node_boards) numa_offset = node_boards[numa_index].mic_start_index; } *dash1 = '-'; } #endif if ((slash = strchr(tok, '/')) != NULL) { mic_index = strtol(slash+1, NULL, 10) + numa_offset; if (buf[0] != '\0') snprintf(buf + strlen(buf), buf_size - strlen(buf), ",%d", mic_index); else snprintf(buf, buf_size, "%d", mic_index); } tok = strtok(NULL, "+"); } } } /* END get_mic_indices() */ /* Sets up env for a user process, used by TMomFinalizeJob1, start_process, * and file copies */ int InitUserEnv( job *pjob, /* I */ task *ptask, /* I (optional) */ char **envp, /* I (optional) */ struct passwd *pwdp, /* I (optional) */ char *shell) /* I (optional) */ { struct array_strings *vstrs; int j = 0; int ebsize = 0; char buf[MAXPATHLEN 
+ 2]; int usertmpdir = 0; int num_nodes = 1; int num_ppn = 1; pbs_attribute *pattr; resource *presc; resource_def *prd; if (pjob == NULL) { sprintf(log_buffer, "passed a NULL pjob!"); log_err(errno, __func__, log_buffer); return(-1); } /* initialize vtable */ if (envp != NULL) { for (j = 0, ebsize = 0;envp[j] != NULL;j++) ebsize += strlen(envp[j]); } if (LOGLEVEL >= 10) { sprintf(log_buffer, "creating env buffer, count: %d size: %d", j, ebsize); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } vstrs = pjob->ji_wattr[JOB_ATR_variables].at_val.at_arst; vtable.v_bsize = ebsize + EXTRA_VARIABLE_SPACE + (vstrs != NULL ? (vstrs->as_next - vstrs->as_buf) : 0); vtable.v_block_start = (char *)calloc(1, vtable.v_bsize); if (vtable.v_block_start == NULL) { sprintf(log_buffer, "PBS: failed to init env, calloc: %s\n", strerror(errno)); log_err(errno, __func__, log_buffer); return(-1); } vtable.v_block = vtable.v_block_start; vtable.v_ensize = num_var_else + num_var_env + j + EXTRA_ENV_PTRS + (vstrs != NULL ? vstrs->as_usedptr : 0); vtable.v_used = 0; vtable.v_envp = (char **)calloc(vtable.v_ensize, sizeof(char *)); if (vtable.v_envp == NULL) { sprintf(log_buffer, "PBS: failed to init env, calloc: %s\n", strerror(errno)); log_err(errno, __func__, log_buffer); return(-1); } /* First variables from the local environment */ for (j = 0;j < num_var_env;++j) if (bld_env_variables(&vtable, environ[j], NULL) != PBSE_NONE) return -1; if (LOGLEVEL >= 10) { sprintf(log_buffer, "local env added, count: %d", j); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } /* Next, the variables passed with the job. 
They may */ /* be overwritten with new correct values for this job */ if (vstrs != NULL) { for (j = 0;j < vstrs->as_usedptr;++j) { bld_env_variables(&vtable, vstrs->as_string[j], NULL); if (!strncmp( vstrs->as_string[j], variables_else[tveTmpDir], strlen(variables_else[tveTmpDir]))) usertmpdir = 1; } if (LOGLEVEL >= 10) { sprintf(log_buffer, "job env added, count: %d", j); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } } /* END if (vstrs != NULL) */ /* HOME */ if (pjob->ji_grpcache != NULL) bld_env_variables(&vtable, variables_else[tveHome], pjob->ji_grpcache->gc_homedir); /* LOGNAME */ if (pwdp != NULL) bld_env_variables(&vtable, variables_else[tveLogName], pwdp->pw_name); /* PBS_JOBNAME */ bld_env_variables(&vtable, variables_else[tveJobName], pjob->ji_wattr[JOB_ATR_jobname].at_val.at_str); /* PBS_JOBID */ bld_env_variables(&vtable, variables_else[tveJobID], pjob->ji_qs.ji_jobid); /* PBS_QUEUE */ bld_env_variables(&vtable, variables_else[tveQueue], pjob->ji_wattr[JOB_ATR_in_queue].at_val.at_str); /* SHELL */ if (shell != NULL) bld_env_variables(&vtable, variables_else[tveShell], shell); /* USER, for compatability */ if (pwdp != NULL) bld_env_variables(&vtable, variables_else[tveUser], pwdp->pw_name); /* PBS_JOBCOOKIE */ bld_env_variables(&vtable, variables_else[tveJobCookie], pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str); /* PBS_NODENUM */ sprintf(buf, "%d", pjob->ji_nodeid); bld_env_variables(&vtable, variables_else[tveNodeNum], buf); /* PBS_TASKNUM */ if (ptask != NULL) { sprintf(buf, "%d", ptask->ti_qs.ti_task); bld_env_variables(&vtable, variables_else[tveTaskNum], buf); } /* PBS_MOMPORT */ sprintf(buf, "%d", pbs_rm_port); bld_env_variables(&vtable, variables_else[tveMOMPort], buf); /* PBS_NODEFILE */ if (pjob->ji_flags & MOM_HAS_NODEFILE) { sprintf(buf, "%s/%s", path_aux, pjob->ji_qs.ji_jobid); bld_env_variables(&vtable, variables_else[tveNodeFile], buf); /* add the gpu file as well */ sprintf(buf, "%s/%sgpu", path_aux, pjob->ji_qs.ji_jobid); 
bld_env_variables(&vtable, variables_else[tveGpuFile], buf); sprintf(buf, "%s/%smic", path_aux, pjob->ji_qs.ji_jobid); bld_env_variables(&vtable, variables_else[tveMicFile], buf); } if (pjob->ji_wattr[JOB_ATR_exec_mics].at_val.at_str != NULL) { get_mic_indices(pjob, buf, sizeof(buf)); bld_env_variables(&vtable, variables_else[tveOffloadDevices], buf); } /* PBS_WALLTIME */ pattr = &pjob->ji_wattr[JOB_ATR_resource]; prd = find_resc_def(svr_resc_def, "walltime", svr_resc_size); if ((presc = find_resc_entry(pattr, prd)) != NULL) { sprintf(buf, "%ld", presc->rs_value.at_val.at_long); bld_env_variables(&vtable, variables_else[tveWallTime], buf); } /* PBS_NNODES */ pattr = &pjob->ji_wattr[JOB_ATR_resource]; prd = find_resc_def(svr_resc_def, "size", svr_resc_size); presc = find_resc_entry(pattr, prd); if (presc != NULL) { sprintf(buf, "%ld", presc->rs_value.at_val.at_long); bld_env_variables(&vtable, variables_else[tveNumNodes], buf); } /* PBS_NUM_NODES and PBS_NPPN */ prd = find_resc_def(svr_resc_def,"nodes",svr_resc_size); presc = find_resc_entry(pattr,prd); if (presc != NULL) { const char *ppn_str = "ppn="; char *tmp; char *other_reqs; if (presc->rs_value.at_val.at_str != NULL) { num_nodes = strtol(presc->rs_value.at_val.at_str, NULL, 10); if (num_nodes != 0) { if ((tmp = strstr(presc->rs_value.at_val.at_str,ppn_str)) != NULL) { tmp += strlen(ppn_str); num_ppn = strtol(tmp, NULL, 10); } other_reqs = presc->rs_value.at_val.at_str; while ((other_reqs = strchr(other_reqs, '+')) != NULL) { other_reqs += 1; num_nodes += strtol(other_reqs, &other_reqs, 10); } } } } /* these values have been initialized to 1, and will always be in the * environment */ sprintf(buf,"%d",num_nodes); bld_env_variables(&vtable,variables_else[tveNumNodesStr],buf); if (LOGLEVEL >= 3) { snprintf(log_buffer, sizeof(log_buffer), "Added %s=%s to environment", variables_else[tveNumNodesStr], buf); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } sprintf(buf,"%d",num_ppn); 
bld_env_variables(&vtable,variables_else[tveNumPpn],buf); if (LOGLEVEL >= 3) { snprintf(log_buffer, sizeof(log_buffer), "Added %s=%s to environment", variables_else[tveNumPpn], buf); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } /* setup TMPDIR */ if ((!usertmpdir) && (TTmpDirName(pjob, buf, sizeof(buf)))) bld_env_variables(&vtable, variables_else[tveTmpDir], buf); /* PBS_VERSION */ sprintf(buf, "TORQUE-%s", PACKAGE_VERSION); bld_env_variables(&vtable, variables_else[tveVerID], buf); /* passed-in environment for tasks */ if (envp != NULL) { for (j = 0;envp[j];j++) bld_env_variables(&vtable, envp[j], NULL); } return(PBSE_NONE); } /* END InitUserEnv() */ /** * mom_jobstarter_execute_job * * This routine is called from the newly created child process. * * @param pjob Pointer to job structure. * @see TMomFinalizeChild */ int mom_jobstarter_execute_job( job *pjob, char *shell, char *arg[], struct var_table *vtable) { /* Launch job executable with cr_run command so that cr_checkpoint command will work. */ /* shuffle up the existing args */ arg[5] = arg[4]; arg[4] = arg[3]; arg[3] = arg[2]; arg[2] = arg[1]; /* replace first arg with shell name note, this func is called from a child process that exits after the executable is launched, so we don't have to worry about freeing this calloc later */ arg[1] = (char *)calloc(1, strlen(shell) + 1); if (arg[1] == NULL) { log_err(errno,__func__, "cannot alloc env"); return(-1); } strcpy(arg[1], shell); arg[0] = jobstarter_exe_name; if (LOGLEVEL >= 10) { char cmd[MAXPATHLEN + 1]; create_command(cmd, sizeof(cmd), arg); sprintf(log_buffer, "execing jobstarter command (%s)\n", cmd); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } execve(jobstarter_exe_name, arg, vtable->v_envp); return(PBSE_NONE); } /* END mom_jobstarter_execute_job() */ /* For intermediate moms when job_radix is set. 
* open a stream to each sister mom in this radix group * and send an IM_JOIN_JOB_RADIX request with all the sister * and radix host information */ int open_tcp_stream_to_sisters( job *pjob, int com, tm_event_t parent_event, int mom_radix, hnodent *hosts, /* This is really an array of hnodent */ struct radix_buf **sister_list, tlist_head *phead, int flag) { int rc = DIS_SUCCESS; int i; hnodent *np; int stream; eventent *ep; svrattrl *psatl; struct tcp_chan *chan = NULL; np = hosts; pjob->ji_outstanding = 0; /* the sister lists have been made. Now contact the intermediate moms as designated by mom_radix */ for (i = 1; i <= mom_radix; i++) { np++; log_buffer[0] = '\0'; if (sister_list[i-1]->count < 2) { continue; } pjob->ji_outstanding++; stream = tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr,sizeof(np->sock_addr)); if (IS_VALID_STREAM(stream) == FALSE) { pjob->ji_nodekill = i; if (log_buffer[0] != '\0') { sprintf(log_buffer, "tcp_connect_sockaddr failed on %s - job id %s", np->hn_host, pjob->ji_qs.ji_jobid); } log_err(errno, __func__, log_buffer); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); exec_bail(pjob, JOB_EXEC_FAIL1); return(PBSE_SISCOMM); } ep = event_alloc(com, np, TM_NULL_EVENT, TM_NULL_TASK); ep->ee_parent_event = parent_event; sprintf(log_buffer, "event %d to host %s: com: %d", ep->ee_event, np->hn_host,com); log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_JOB, __func__, log_buffer); if ((chan = DIS_tcp_setup(stream)) == NULL) { close(stream); sprintf(log_buffer, "failed to allocate channel: %s", pjob->ji_qs.ji_jobid); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); exec_bail(pjob, JOB_EXEC_FAIL1); return(PBSE_SISCOMM); } else if ((rc = im_compose(chan, pjob->ji_qs.ji_jobid, pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str, com, ep->ee_event, TM_NULL_TASK)) != DIS_SUCCESS) { } /* nodeid of receiver */ else if ((rc = diswsi(chan, i)) != DIS_SUCCESS) { } /* number of nodes */ else if ((rc = diswsi(chan, 
pjob->ji_numnodes)) != DIS_SUCCESS) { } /* out port number */ else if ((flag == MOTHER_SUPERIOR) && ((rc = diswsi(chan, pjob->ji_portout)) != DIS_SUCCESS)) { } /* err port number */ else if ((flag == MOTHER_SUPERIOR) && ((rc = diswsi(chan, pjob->ji_porterr)) != DIS_SUCCESS)) { } /* out port number */ else if ((flag != MOTHER_SUPERIOR) && ((rc = diswsi(chan, pjob->ji_im_portout)) != DIS_SUCCESS)) { } /* err port number */ else if ((flag != MOTHER_SUPERIOR) && ((rc = diswsi(chan, pjob->ji_im_porterr)) != DIS_SUCCESS)) { } /* sisters for this intermediate mom */ else if ((rc = diswst(chan, sister_list[i-1]->host_list)) != DIS_SUCCESS) { } /* sisters for this intermediate mom */ else if ((rc = diswst(chan, sister_list[i-1]->port_list)) != DIS_SUCCESS) { } /* how many sisters in this radix group */ else if ((rc = diswsi(chan, sister_list[i-1]->count)) != DIS_SUCCESS) { } else { /* write jobattrs */ psatl = (svrattrl *)GET_NEXT(*phead); if ((rc = encode_DIS_svrattrl(chan, psatl)) == DIS_SUCCESS) DIS_tcp_wflush(chan); } if (chan != NULL) DIS_tcp_cleanup(chan); close(stream); } return(PBSE_NONE); } /* end open_tcp_stream_to_sisters */ void free_sisterlist( struct radix_buf **list, int radix) { int i; if (list == NULL) return; for (i = 0; i < radix; i++) { if (list[i]) { if (list[i]->host_list) { free(list[i]->host_list); } if (list[i]->port_list) { free(list[i]->port_list); } free(list[i]); } } free(list); } /* END free_sisterlist() */ struct radix_buf **allocate_sister_list( int radix) { struct radix_buf **sister_list; int i; /* create sister lists to send out to intermediate moms */ if ((sister_list = (struct radix_buf **)calloc((size_t)radix, sizeof(struct radix_buf *))) == NULL) { log_err(ENOMEM,__func__, ""); return(NULL); } for (i = 0; i < radix; i++) { if ((sister_list[i] = (struct radix_buf *)calloc(1, sizeof(struct radix_buf))) == NULL) { free(sister_list); log_err(ENOMEM,__func__, ""); return(NULL); } if ((sister_list[i]->host_list = (char *)calloc(1, 
THE_LIST_SIZE)) == NULL) { free(sister_list); log_err(ENOMEM,__func__, ""); return(NULL); } memset(sister_list[i]->host_list, 0, THE_LIST_SIZE); sister_list[i]->current_string_len = 0; sister_list[i]->max_string_len = THE_LIST_SIZE; sister_list[i]->port_list = (char *)calloc(1, THE_LIST_SIZE); assert(sister_list[i]->port_list); memset(sister_list[i]->port_list, 0, THE_LIST_SIZE); sister_list[i]->current_port_str_len = 0; sister_list[i]->max_port_str_len = THE_LIST_SIZE; sister_list[i]->count = 0; } return(sister_list); } /* END allocate_sister_list() */ /* * Used by MOM superior to start the shell process. * perform all server level pre-job tasks, collect information * create parent-child pipes * * @see mom_set_use() - child */ int TMomFinalizeJob1( job *pjob, /* I (modified) */ pjobexec_t *TJE, /* O */ int *SC) /* O */ { int i; int rc; pbs_attribute *pattr; pbs_attribute *pattri; #ifndef MOM_FORCENODEFILE resource_def *prd; resource *presc; #endif #ifndef NUMA_SUPPORT torque_socklen_t slen; struct sockaddr_in saddr; #endif /* ndef NUMA_SUPPORT */ char buf[MAXPATHLEN + 2]; time_t time_now; struct stat sb; *SC = 0; time_now = time(0); if (TJE == NULL) { sprintf(log_buffer, "bad param in %s", __func__); *SC = JOB_EXEC_RETRY; return(FAILURE); } /* initialize job exec struct */ memset(TJE, 0, sizeof(pjobexec_t)); TJE->ptc = -1; TJE->pjob = (void *)pjob; /* prepare job environment */ #ifndef NUMA_SUPPORT if (pjob->ji_numnodes > 1) { /* ** Get port numbers from file decriptors in job struct. The ** sockets are stored there so they can be closed later as ** Main MOM will not need them after the job is going. 
*/ slen = sizeof(saddr); if (getsockname(pjob->ji_stdout, (struct sockaddr *)&saddr, &slen) == -1) { sprintf(log_buffer, "getsockname on stdout"); *SC = JOB_EXEC_RETRY; return(FAILURE); } TJE->port_out = (int)ntohs(saddr.sin_port); slen = sizeof(saddr); if (getsockname(pjob->ji_stderr, (struct sockaddr *)&saddr, &slen) == -1) { sprintf(log_buffer, "getsockname on stderr"); *SC = JOB_EXEC_RETRY; return(FAILURE); } TJE->port_err = (int)ntohs(saddr.sin_port); } else #endif /* ndef NUMA_SUPPORT */ { TJE->port_out = -1; TJE->port_err = -1; } /* did the job request nodes? will need to setup node file */ pattr = &pjob->ji_wattr[JOB_ATR_resource]; #ifdef MOM_FORCENODEFILE pjob->ji_flags |= MOM_HAS_NODEFILE; #else /* MOM_FORCENODEFILE */ prd = find_resc_def(svr_resc_def, "neednodes", svr_resc_size); presc = find_resc_entry(pattr, prd); if (presc != NULL) pjob->ji_flags |= MOM_HAS_NODEFILE; #endif /* MOM_FORCENODEFILE */ /* * get the password entry for the user under which the job is to be run * we do this now to save a few things in the job structure */ if ((TJE->pwdp = (void *)check_pwd(pjob)) == NULL) { log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); *SC = JOB_EXEC_FAIL1; return(FAILURE); } #if IBM_SP2==2 /* IBM SP with PSSP 3.1 */ /* load IBM SP switch table */ if (load_sp_switch(pjob) != 0) { log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); *SC = JOB_EXEC_RETRY; return(FAILURE); } #endif /* IBM SP */ /* Starting job */ mom_checkpoint_init_job_periodic_timer(pjob); if (mom_checkpoint_job_has_checkpoint(pjob)) { rc = mom_checkpoint_start_restart(pjob); if (rc == PBSE_NONE) { /* SUCCESS */ log_ext(-1, __func__, "Restart succeeded", LOG_DEBUG); /* reset mtime so walltime will not include held time */ /* update to time now minus the time already used */ /* unless it is suspended, see request.c/req_signal() */ /* check time on the file not the directory */ get_chkpt_dir_to_use(pjob, buf, sizeof(buf)); if 
(sizeof(buf) - strlen(buf) > strlen(pjob->ji_wattr[JOB_ATR_checkpoint_name].at_val.at_str) + 1) { strcat(buf, "/"); strcat(buf, pjob->ji_wattr[JOB_ATR_checkpoint_name].at_val.at_str); } stat(buf, &sb); if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_Suspend) == 0) { if ((pjob->ji_qs.ji_stime == 0) && (pjob->ji_wattr[JOB_ATR_start_time].at_flags & ATR_VFLAG_SET)) { pjob->ji_qs.ji_stime = (time_t)pjob->ji_wattr[JOB_ATR_start_time].at_val.at_long; } pjob->ji_qs.ji_stime = time_now - (sb.st_mtime - pjob->ji_qs.ji_stime); pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING; if (mom_get_sample() != PBSE_NONE) mom_set_use(pjob); } else { pjob->ji_qs.ji_substate = JOB_SUBSTATE_SUSPEND; } *SC = 0; return(FAILURE); } else { /* FAILURE */ log_err(-1, __func__, "Restart failed"); /* retry for any kind of changable thing */ if ((errno == EAGAIN) || #ifdef ERFLOCK (errno == ERFLOCK) || #endif #ifdef EQUSR (errno == EQUSR) || #endif #ifdef EQGRP (errno == EQGRP) || #endif #ifdef EQACT (errno == EQACT) || #endif #ifdef ENOSDS (errno == ENOSDS) || #endif (errno == ENOMEM) || (errno == ENOLCK) || (errno == ENOSPC) || (errno == ENFILE) || (errno == EDEADLK) || (errno == EBUSY)) { pjob->ji_qs.ji_un.ji_momt.ji_exitstat = JOB_EXEC_RETRY; *SC = JOB_EXEC_RETRY; } else { pjob->ji_qs.ji_un.ji_momt.ji_exitstat = JOB_EXEC_BADRESRT; *SC = JOB_EXEC_FAIL1; } pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; exiting_tasks = 1; sprintf(log_buffer, "Restart failed, error %d (%s)", errno, pbs_strerror(errno)); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return(FAILURE); } } /* end of if mom_checkpoint_job_has_checkpoint(pjob) */ /* * if certain resource limits require that the job usage be * polled or it is a multinode job, we link the job to mom_polljobs. 
* * NOTE: we overload the job field ji_jobque for this as it * is not used otherwise by MOM */ if ((pjob->ji_numnodes > 1) || (mom_do_poll(pjob) != 0)) append_link(&mom_polljobs, &pjob->ji_jobque, pjob); pattri = &pjob->ji_wattr[JOB_ATR_interactive]; if ((pattri->at_flags & ATR_VFLAG_SET) && (pattri->at_val.at_long != 0)) { TJE->is_interactive = TRUE; } else { TJE->is_interactive = FALSE; } if (TJE->is_interactive == TRUE) { /* * open a master pty, need to do it here before we fork, * to save the slave name in the master's job structure */ if ((TJE->ptc = open_master(&TJE->ptc_name)) < 0) { log_err(errno, __func__, "cannot open master pty"); *SC = JOB_EXEC_RETRY; return(FAILURE); } FDMOVE(TJE->ptc) /* save pty name in job output/error file name */ pattr = &pjob->ji_wattr[JOB_ATR_outpath]; job_attr_def[JOB_ATR_outpath].at_free(pattr); job_attr_def[JOB_ATR_outpath].at_decode( pattr, NULL, NULL, TJE->ptc_name, 0); pjob->ji_wattr[JOB_ATR_outpath].at_flags = (ATR_VFLAG_SET | ATR_VFLAG_MODIFY | ATR_VFLAG_SEND); pattr = &pjob->ji_wattr[JOB_ATR_errpath]; job_attr_def[JOB_ATR_errpath].at_free(pattr); job_attr_def[JOB_ATR_errpath].at_decode( pattr, NULL, NULL, TJE->ptc_name, 0); pjob->ji_wattr[JOB_ATR_errpath].at_flags = (ATR_VFLAG_SET | ATR_VFLAG_MODIFY | ATR_VFLAG_SEND); } /* END if (TJE->is_interactive == TRUE) */ #if SHELL_USE_ARGV == 0 #if SHELL_INVOKE == 1 if (TJE->is_interactive == FALSE) { /* need a pipe on which to write the shell script */ /* file name to the input of the shell */ if (pipe(TJE->pipe_script) == -1) { sprintf(log_buffer, "Failed to create shell name pipe, errno = %d (%s)", errno, strerror(errno)); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); *SC = JOB_EXEC_RETRY; return(FAILURE); } } /* END if (TJE->is_interactive == FALSE) */ #endif /* SHELL_INVOKE */ #endif /* !SHELL_USE_ARGV */ /* create pipes between MOM and the job starter */ /* fork the job starter which will become the job */ if ((pipe(TJE->mjspipe) == -1) || 
(pipe(TJE->jsmpipe) == -1)) { i = -1; } else { i = 0; /* make sure pipe file descriptors are above 2 */ if (TJE->jsmpipe[1] < 3) { TJE->upfds = fcntl(TJE->jsmpipe[1], F_DUPFD, 3); close(TJE->jsmpipe[1]); TJE->jsmpipe[1] = 0; } else { TJE->upfds = TJE->jsmpipe[1]; } if (TJE->mjspipe[0] < 3) { TJE->downfds = fcntl(TJE->mjspipe[0], F_DUPFD, 3); close(TJE->mjspipe[0]); TJE->mjspipe[0] = 0; } else { TJE->downfds = TJE->mjspipe[0]; } } if ((i == -1) || (TJE->upfds < 3) || (TJE->downfds < 3)) { sprintf(log_buffer, "cannot create communication pipe"); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); *SC = JOB_EXEC_RETRY; return(FAILURE); } if ((TJE->ptask = (void *)pbs_task_create(pjob, TM_NULL_TASK)) == NULL) { sprintf(log_buffer, "cannot create job task"); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); *SC = JOB_EXEC_RETRY; return(FAILURE); } pjob->ji_qs.ji_substate = JOB_SUBSTATE_STARTING; pjob->ji_qs.ji_stime = time_now; return(SUCCESS); } /* END TMomFinalizeJob1() */ /* fork child/prolog */ int TMomFinalizeJob2( pjobexec_t *TJE, /* I */ int *SC) /* O */ { char buf[MAXPATHLEN + 2]; pid_t cpid; #if SHELL_USE_ARGV == 0 #if SHELL_INVOKE == 1 int i; int j; #endif /* SHELL_INVOKE */ #endif /* !SHELL_USE_ARGV */ job *pjob; pjob = (job *)TJE->pjob; if (LOGLEVEL >= 4) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "about to fork child which will become job"); } /* fork the child that will become the job. 
*/ if ((cpid = fork_me(-1)) < 0) { /* fork failed */ sprintf(log_buffer, "fork kf job '%s' failed in (errno=%d, '%s')", pjob->ji_qs.ji_jobid, errno, strerror(errno)); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); *SC = JOB_EXEC_RETRY; return(FAILURE); } if (cpid == 0) { /* CHILD: handle child activities */ TMomFinalizeChild(TJE); /*NOTREACHED*/ } /* parent */ close(TJE->upfds); close(TJE->downfds); if (TJE->ptc >= 0) close(TJE->ptc); if (multi_mom) { snprintf(buf, sizeof(buf), "%s%s%d%s", path_jobs, pjob->ji_qs.ji_fileprefix, pbs_rm_port, JOB_SCRIPT_SUFFIX); } else snprintf(buf, sizeof(buf), "%s%s%s", path_jobs, pjob->ji_qs.ji_fileprefix, JOB_SCRIPT_SUFFIX); if (chown( buf, pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) { } #if SHELL_USE_ARGV == 0 #if SHELL_INVOKE == 1 if (TJE->is_interactive == FALSE) { int k; if (exec_with_exec) { if (strlen(buf)+5 <= sizeof(buf)) { memmove(buf + 5, buf, strlen(buf) + 1); strncpy(buf, "exec ", 5); } } /* pass name of shell script on pipe */ /* will be stdin of shell */ close(TJE->pipe_script[0]); /* Did the user submit arguments with the -F option in qsub? 
*/ if (pjob->ji_wattr[JOB_ATR_arguments].at_flags & ATR_VFLAG_SET) { if (sizeof(buf) - strlen(buf) > strlen(pjob->ji_wattr[JOB_ATR_arguments].at_val.at_str) + 1) { strcat(buf, " "); strcat(buf, pjob->ji_wattr[JOB_ATR_arguments].at_val.at_str); } } if (sizeof(buf) - strlen(buf) > 1) strcat(buf, "\n"); /* setup above */ i = strlen(buf); j = 0; while (j < i) { if ((k = write_ac_socket(TJE->pipe_script[1], buf + j, i - j)) < 0) { if (errno == EINTR) continue; break; } j += k; } close(TJE->pipe_script[1]); } /* END if (TJE->is_interactive == FALSE) */ #endif /* SHELL_INVOKE */ #endif /* !SHELL_USE_ARGV */ /* SUCCESS: parent returns */ if (LOGLEVEL >= 3) { sprintf(log_buffer, "phase 2 of job launch successfully completed"); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } *SC = 0; return(SUCCESS); } /* END TMomFinalizeJob2() */ int determine_umask( int uid) /* I */ { int UMaskVal = 0077; struct passwd *pwdp; FILE *fp; char retdata[20]; char command[100]; if (DEFAULT_UMASK[0] != '\0') { if (!strcasecmp(DEFAULT_UMASK, "userdefault")) { /* apply user default */ /* do we inherit umask when we do setuid(), NO */ /* we want to try and determine what the users umask is */ /* then we return its value so it can be set correctly */ if ((pwdp = getpwuid(uid)) == NULL) { sprintf(log_buffer, "FAILED to get password structure for uid %d", uid); log_err(-1, __func__, log_buffer); } else { sprintf(command, "/bin/su - %s -c umask", pwdp->pw_name); if ((fp = popen(command, "r")) != NULL) { if (fgets(retdata, 20, fp) != NULL) { /* set the umask value from returned data */ UMaskVal = strtol(retdata, NULL, 8); } pclose(fp); } } } else { UMaskVal = (int)strtol(DEFAULT_UMASK, NULL, 0); } /* make sure that we have access to the file when we move the spooled file */ UMaskVal = UMaskVal & 0377; if (LOGLEVEL > 7) { sprintf(log_buffer, "Using $job_output_file_umask value of %o", UMaskVal); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } } return(UMaskVal); } /* 
END determine_umask() */ #ifdef PENABLE_LINUX26_CPUSETS /** * indicates whether or not cpusets should be used to run this job * * @param pjob - the job to check * @return TRUE if either GEOMETRY_REQUESTS is undefined or a request was made * else FALSE */ int use_cpusets( job *pjob) /* I */ { #ifdef ALWAYS_USE_CPUSETS return(TRUE); #else #ifdef GEOMETRY_REQUESTS resource *presc; resource_def *prd; if (pjob == NULL) return(FALSE); prd = find_resc_def(svr_resc_def,"procs_bitmap",svr_resc_size); presc = find_resc_entry(&pjob->ji_wattr[JOB_ATR_resource],prd); /* don't create a cpuset unless one was specifically requested */ if ((presc == NULL) || (presc->rs_value.at_flags & ATR_VFLAG_SET) == FALSE) { return(FALSE); } else return(TRUE); #else return(TRUE); #endif /* GEOMETRY_REQUESTS */ #endif /* ALWAYS_USE_CPUSETS */ } /* END use_cpusets() */ #endif /* PENABLE_LINUX26_CPUSETS */ /* * writes an exec attr to a file * receives strings in the format: -gpu/[+-gpu/...] * and prints them in the format: -gpu[\n-gpu...] 
* Currently this is only supported for JOB_ATR_exec_gpus and JOB_ATR_exec_mics * * @param file - the file to print to * @param pjob - the job whose gpu string will be printed * @return PBSE_NONE if success, error code otherwise */ int write_attr_to_file( job *pjob, int index, const char *suffix) { char filename[MAXPATHLEN]; FILE *file; char *attr_str; char *worker; char *plus; char *slash; char *next; char *curr; /* if there are no gpus, do nothing */ if ((pjob->ji_wattr[index].at_flags & ATR_VFLAG_SET) == 0) return(PBSE_NONE); attr_str = pjob->ji_wattr[index].at_val.at_str; if (attr_str == NULL) return(PBSE_NONE); /* open the file just like $PBS_NODEFILE */ sprintf(filename, "%s/%s%s", path_aux, pjob->ji_qs.ji_jobid, suffix); if ((file = fopen(filename, "w")) == NULL) { sprintf(log_buffer, "cannot open %s", filename); log_err(errno, __func__, log_buffer); return(-1); } if (fchmod(fileno(file), 0644) == -1) { sprintf(log_buffer, "cannot chmod %s", filename); log_err(errno, __func__, log_buffer); fclose(file); return(-1); } if ((worker = strdup(attr_str)) == NULL) { fclose(file); log_err(ENOMEM,__func__, "Couldn't allocate a string to work with? EPIC FAIL"); return(ENOMEM); } curr = worker; while (curr != NULL) { plus = strchr(curr,'+'); if (plus == NULL) { /* we have reached the last string */ next = NULL; } else { /* there is more, set up things to print this one */ next = plus + 1; *plus = '\0'; } /* remove the slash replace it with a dash */ slash = strchr(curr,'/'); if (slash == NULL) { /* should never happen, but we'll attempt to handle it anyway */ fprintf(file,"%s\n",curr); } else { *slash = '\0'; fprintf(file,"%s%s\n",curr,slash+1); } /* advance to the next chunk */ curr = next; } /* SUCCESS */ free(worker); fclose(file); return(PBSE_NONE); } /* END write_attr_to_file() */ /* * writes the exec_host str to a file * receives strings in the format: /[+/...] 
* and prints them out to the file * * @param file - the file to print to * @param pjob - the job whose host string will be printed * @return PBSE_NONE if success, error code otherwise */ int write_nodes_to_file( job *pjob) /* I */ { char filename[MAXPATHLEN]; int j; int vnodenum; FILE *file; char *BPtr; sprintf(filename, "%s/%s", path_aux, pjob->ji_qs.ji_jobid); if ((file = fopen(filename, "w")) == NULL) { sprintf(log_buffer, "cannot open %s", filename); log_err(errno, __func__, log_buffer); exit(1); } /* ** The file must be owned by root and readable by ** the user. We take the easy way out and make ** it readable by anyone. */ if (fchmod(fileno(file), 0644) == -1) { sprintf(log_buffer, "cannot chmod %s", filename); log_err(errno, __func__, log_buffer); fclose(file); exit(1); } /* NOTE: if BEOWULF_JOB_MAP is set, populate node file with this info */ BPtr = get_job_envvar(pjob, "BEOWULF_JOB_MAP"); if (BPtr != NULL) { char tmpBuffer[1000000]; char *ptr; /* FORMAT: [:]... */ snprintf(tmpBuffer, sizeof(tmpBuffer), "%s", BPtr); ptr = strtok(tmpBuffer, ":"); while (ptr != NULL) { if (nodefile_suffix != NULL) { fprintf(file, "%s%s\n", ptr, nodefile_suffix); } else { fprintf(file, "%s\n", ptr); } ptr = strtok(NULL, ":"); } } else { vnodenum = pjob->ji_numvnod; for (j = 0;j < vnodenum;j++) { vnodent *vp = &pjob->ji_vnods[j]; #ifdef NUMA_SUPPORT /* make sure that the nodefile has actual hostnames, not * numa names */ char *dash = NULL; char *tmp = vp->vn_host->hn_host; while ((tmp = strchr(tmp,'-')) != NULL) { /* just advance to the last dash */ dash = tmp; tmp++; } if (dash != NULL) { *dash = '\0'; } else { /* this should never happen */ log_err(-1, __func__, "Numa enabled but no dash in hostname?"); } #endif /* def NUMA_SUPPORT */ if (nodefile_suffix != NULL) { fprintf(file, "%s%s\n", vp->vn_host->hn_host, nodefile_suffix); } else { fprintf(file, "%s\n", vp->vn_host->hn_host); } #ifdef NUMA_SUPPORT if (dash != NULL) { *dash = '-'; } #endif /* def NUMA_SUPPORT */ } /* END 
for (j) */ } fclose(file); return(PBSE_NONE); } /* END write_nodes_to_file() */ void take_care_of_nodes_file( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE) { if (pjob->ji_flags & MOM_HAS_NODEFILE) { if (write_nodes_to_file(pjob) == -1) starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, sjr); if (write_attr_to_file(pjob, JOB_ATR_exec_gpus, "gpu") == -1) starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, sjr); if (write_attr_to_file(pjob, JOB_ATR_exec_mics, "mic") == -1) starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, sjr); #ifdef NVIDIA_GPUS if ((use_nvidia_gpu) && setup_gpus_for_job(pjob) == -1) starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, sjr); #endif /* NVIDIA_GPUS */ } /* END if (pjob->ji_flags & MOM_HAS_NODEFILE) */ if (LOGLEVEL >= 10) log_ext(-1, __func__, "node file created", LOG_DEBUG); } /* END take_care_of_nodes_file() */ void handle_cpuset_creation( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE) { #ifdef PENABLE_LINUX26_CPUSETS if (use_cpusets(pjob) == TRUE) { if (LOGLEVEL >= 6) { sprintf(log_buffer, "about to create cpuset for job %s.\n", pjob->ji_qs.ji_jobid); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } if (create_job_cpuset(pjob) == FAILURE) { /* FAILURE */ sprintf(log_buffer, "Could not create cpuset for job %s.\n", pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, sjr); } } #endif /* END PENABLE_LINUX26_CPUSETS */ } /* END handle_cpuset_creation() */ void handle_reservation( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE) { long long pagg = 0; int j; char *rsv_id = NULL; resource *pres; int use_nppn = TRUE; int nppcu = APBASIL_DEFAULT_NPPCU_VALUE; /* default */ sjr->sj_session = setsid(); if (is_login_node == TRUE) { #ifdef USEJOBCREATE /* Get a jobid from the system */ pagg = get_jobid(pjob->ji_qs.ji_jobid); #else pagg = sjr->sj_session; #endif /* USEJOBCREATE */ sjr->sj_jobid = pagg; pjob->ji_wattr[JOB_ATR_pagg_id].at_val.at_ll 
= pagg; pjob->ji_wattr[JOB_ATR_pagg_id].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY; } /* set up the job session (update sjr) */ memcpy(TJE->sjr, sjr, sizeof(struct startjob_rtn)); if (is_login_node == TRUE) { char *exec_str; int mppdepth = 0; char *mppnodes = NULL; if (pjob->ji_wattr[JOB_ATR_multi_req_alps].at_val.at_str != NULL) exec_str = pjob->ji_wattr[JOB_ATR_multi_req_alps].at_val.at_str; else exec_str = pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str; pres = find_resc_entry( &pjob->ji_wattr[JOB_ATR_resource], find_resc_def(svr_resc_def, "procs", svr_resc_size)); if ((pres != NULL) && (pres->rs_value.at_val.at_long != 0)) use_nppn = FALSE; pres = find_resc_entry( &pjob->ji_wattr[JOB_ATR_resource], find_resc_def(svr_resc_def, "mppdepth", svr_resc_size)); if ((pres != NULL) && (pres->rs_value.at_val.at_long != 0)) mppdepth = pres->rs_value.at_val.at_long; /* get nppcu value from job if it exists */ if ((pjob->ji_wattr[JOB_ATR_nppcu].at_flags & ATR_VFLAG_SET)) nppcu = pjob->ji_wattr[JOB_ATR_nppcu].at_val.at_long; /* get the mppnodes if it exists */ pres = find_resc_entry( &pjob->ji_wattr[JOB_ATR_resource], find_resc_def(svr_resc_def, "mppnodes", svr_resc_size)); if ((pres != NULL) && (pres->rs_value.at_val.at_str != NULL)) { mppnodes = strdup(pres->rs_value.at_val.at_str); } j = create_alps_reservation(exec_str, pjob->ji_wattr[JOB_ATR_job_owner].at_val.at_str, pjob->ji_qs.ji_jobid, apbasil_path, apbasil_protocol, pagg, use_nppn, nppcu, mppdepth, &rsv_id, mppnodes); if(mppnodes != NULL) free(mppnodes); if (rsv_id != NULL) { sjr->sj_rsvid = atoi(rsv_id); free(rsv_id); } if (j < 0) { snprintf(log_buffer, sizeof(log_buffer), "Couldn't create the reservation for job %s", pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, sjr); } } } /* END handle_reservation() */ void handle_prologs( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE) { char *path_prologuserjob; int rc; resource *presc; if ((rc = 
run_pelog(PE_PROLOG, path_prolog, pjob, PE_IO_TYPE_ASIS, FALSE)) != PBSE_NONE) { log_err(-1, __func__, "prolog failed"); if ((TJE->is_interactive == FALSE) && (rc != 1)) { starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, sjr); } else { starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, sjr); } } if (LOGLEVEL >= 10) log_ext(-1, __func__, "prolog complete", LOG_DEBUG); /* run user prolog */ if ((rc = run_pelog(PE_PROLOGUSER, path_prologuser, pjob, PE_IO_TYPE_ASIS, FALSE)) != PBSE_NONE) { log_err(-1, __func__, "user prolog failed"); if ((TJE->is_interactive == FALSE) && (rc != 1)) { starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, sjr); } else { starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, sjr); } } if (LOGLEVEL >= 10) log_ext(-1, __func__, "user prolog complete", LOG_DEBUG); presc = find_resc_entry( &pjob->ji_wattr[JOB_ATR_resource], find_resc_def(svr_resc_def, "prologue", svr_resc_size)); if ((presc != NULL)) { if ((presc->rs_value.at_flags & ATR_VFLAG_SET) && (presc->rs_value.at_val.at_str != NULL)) { path_prologuserjob = get_local_script_path(pjob, presc->rs_value.at_val.at_str); if (path_prologuserjob) { if ((rc = run_pelog(PE_PROLOGUSERJOB, path_prologuserjob, pjob, PE_IO_TYPE_ASIS, FALSE)) != PBSE_NONE) { log_err(-1, __func__, "batch job local user prolog failed"); free(path_prologuserjob); if ((TJE->is_interactive == FALSE) && (rc != 1)) { starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, sjr); } else { starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, sjr); } } free(path_prologuserjob); if (LOGLEVEL >= 10) log_ext(-1, __func__, "job prolog complete", LOG_DEBUG); } } } } /* END handle_prologs() */ int start_interactive_session( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE, int *pts_ptr, int *qsub_sock_ptr, char *qsubhostname, int qsubhostname_size) { struct sigaction act; int pport = 0; char *phost; char *termtype; char EMsg[MAXLINE]; /*************************************************************/ /* We 
have an "interactive" job, connect the standard */ /* streams to a socket connected to qsub. */ /*************************************************************/ sigemptyset(&act.sa_mask); #ifdef SA_INTERRUPT act.sa_flags = SA_INTERRUPT; #else act.sa_flags = 0; #endif /* SA_INTERRUPT */ act.sa_handler = no_hang; sigaction(SIGALRM, &act, NULL); /* only giving ourselves 5 seconds to connect to qsub * and get term settings */ alarm(5); /* once we connect to qsub and open a pty, the user can send us * a ctrl-c. It is important that we block this until we exec() * the user's shell or we exit and the job gets stuck */ act.sa_handler = SIG_IGN; sigaction(SIGINT, &act, (struct sigaction *)0); /* Set environment to reflect interactive */ bld_env_variables(&vtable, "PBS_ENVIRONMENT", "PBS_INTERACTIVE"); /* get host where qsub resides */ phost = arst_string("PBS_O_HOST", &pjob->ji_wattr[JOB_ATR_variables]); pport = pjob->ji_wattr[JOB_ATR_interactive].at_val.at_long; if ((phost == NULL) || ((phost = strchr(phost, '=')) == NULL)) { log_err(-1, __func__, "PBS_O_HOST not set"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, sjr); } phost++; if (submithost_suffix != NULL) { snprintf(qsubhostname, qsubhostname_size, "%s%s", phost, submithost_suffix); } else { snprintf(qsubhostname, qsubhostname_size, "%s", phost); } if ((*qsub_sock_ptr = conn_qsub(qsubhostname, pport, EMsg)) < 0) { snprintf(log_buffer, sizeof(log_buffer), "cannot open interactive qsub socket to host %s:%d - '%s' - check routing tables/multi-homed host issues", qsubhostname, pport, EMsg); log_err(errno, __func__, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, sjr); } FDMOVE(*qsub_sock_ptr); /* send jobid as validation to qsub */ if ((*qsub_sock_ptr < 0)|| (write_ac_socket(*qsub_sock_ptr, pjob->ji_qs.ji_jobid, PBS_MAXSVRJOBID + 1) != PBS_MAXSVRJOBID + 1)) { log_err(errno, __func__, "cannot write jobid"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, sjr); } /* receive 
terminal type and window size */ if ((termtype = rcvttype(*qsub_sock_ptr)) == NULL) { log_err(errno, __func__, "cannot get termtype"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, sjr); } bld_env_variables(&vtable, termtype, NULL); *(vtable.v_envp + vtable.v_used) = NULL; /* null term */ if (rcvwinsize(*qsub_sock_ptr) == -1) { log_err(errno, __func__, "cannot get winsize"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, sjr); } /* turn off alarm set around qsub connect activities */ alarm(0); act.sa_handler = SIG_DFL; act.sa_flags = 0; sigaction(SIGALRM, &act, NULL); /* open the slave pty as the controlling tty */ if ((*pts_ptr = open_pty(pjob)) < 0) { log_err(errno, __func__, "cannot open slave"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, sjr); } return(PBSE_NONE); } /* END start_interactive_session() */ void start_interactive_reader( struct sigaction *act, pjobexec_t *TJE, int pts, int qsub_sock) { sigaction(SIGTERM, act, NULL); close(pts); close(TJE->upfds); close(TJE->downfds); close(1); close(2); sigemptyset(&act->sa_mask); act->sa_flags = SA_NOCLDSTOP; act->sa_handler = catchinter; sigaction(SIGCHLD, act, NULL); mom_reader_go = 1; mom_reader(qsub_sock, TJE->ptc); } /* END start_interactive_reader() */ void setup_interactive_job( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE, int *pts_ptr, int *qsub_sock_ptr, char *qsubhostname, int qsubhostname_size) { struct sigaction act; int writerpid; int shellpid; handle_reservation(pjob, sjr, TJE); start_interactive_session(pjob, sjr, TJE, pts_ptr, qsub_sock_ptr, qsubhostname, qsubhostname_size); memset(&act, 0, sizeof(act)); sigemptyset(&act.sa_mask); act.sa_handler = SIG_IGN; /* setup to ignore SIGTERM */ writerpid = fork(); if (writerpid == 0) { /* child is "writer" process */ sigaction(SIGTERM, &act, NULL); close(TJE->upfds); close(TJE->downfds); close(*pts_ptr); mom_writer(*qsub_sock_ptr, TJE->ptc); shutdown(*qsub_sock_ptr, 2); exit(0); } else if (writerpid > 0) { /* ** 
parent -- it first runs the prolog then forks ** again. the child becomes the job while the ** parent becomes the reader. */ close(1); close(2); dup2(*pts_ptr, 1); dup2(*pts_ptr, 2); fflush(stdout); fflush(stderr); set_termcc(*pts_ptr); /* set terminal control char */ setwinsize(*pts_ptr); /* set window size to qsub's */ /* run prolog - interactive job */ handle_prologs(pjob, sjr, TJE); #ifdef ENABLE_CSA /* Add a workload management start record */ add_wkm_start(sjr->sj_jobid, pjob->ji_qs.ji_jobid); #endif /* ENABLE_CSA */ shellpid = fork(); if (shellpid == 0) { /*********************************************/ /* child - this will be the interactive job */ /* i/o is to slave tty */ /*********************************************/ close(0); dup2(*pts_ptr, 0); fflush(stdin); close(TJE->ptc); /* close master side */ close(*pts_ptr); /* dup'ed above */ close(*qsub_sock_ptr); /* continue setting up and exec-ing shell */ } else { if (shellpid > 0) { /* fork, parent is "reader" process */ start_interactive_reader(&act, TJE, *pts_ptr, *qsub_sock_ptr); } else { log_err(errno, __func__, "can't fork reader"); } /* make sure qsub gets EOF */ shutdown(*qsub_sock_ptr, 2); /* change pty back to available after job is done */ if (chmod(TJE->ptc_name, 0666) != 0) { log_err(errno, __func__, "can't chmod 0666 to change pty back to available"); } if (chown(TJE->ptc_name, 0, 0) == -1) { log_err(errno, __func__, "can't chown pty"); } exit(0); } } /* END if (writerpid > 0) */ else { /* FAILURE - fork failed */ log_err(errno, __func__, "cannot fork nanny"); /* change pty back to available */ if (chmod(TJE->ptc_name, 0666) != 0) { log_err(errno, __func__, "can't chmod 0666 to change pty back to available"); } if (chown(TJE->ptc_name, 0, 0) == -1) { log_err(errno, __func__, "can't chown ptc"); } starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, sjr); } } /* END setup_interactive_job() */ void set_job_script_as_stdin( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE) { #if 
(SHELL_USE_ARGV != 1) && (SHELL_INVOKE != 1) char portname[MAXLINE]; char buf[MAXPATHLEN + 2]; #endif #if SHELL_USE_ARGV == 1 /* connect stdin to /dev/null and feed the name of * the script on the command line */ script_in = open("/dev/null", O_RDONLY, 0); #elif SHELL_INVOKE == 1 /* if passing script file name as input to shell */ close(TJE->pipe_script[1]); script_in = TJE->pipe_script[0]; #else /* SHELL_USE_ARGV || SHELL_INVOKE */ /* if passing script itself as input to shell */ strcpy(buf, path_jobs); strcat(buf, pjob->ji_qs.ji_fileprefix); if (multi_mom) { sprintf(portname,"%d",pbs_rm_port); strcat(buf,portname); } strcat(buf, JOB_SCRIPT_SUFFIX); if ((script_in = open(buf, O_RDONLY, 0)) < 0) { if (errno == ENOENT) script_in = open("/dev/null", O_RDONLY, 0); } #endif /* SHELL_USE_ARGV */ if (LOGLEVEL >= 10) log_ext(-1, __func__, "opening script", LOG_DEBUG); if (script_in < 0) { log_err(errno, __func__, "unable to open script"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL1, sjr); } FDMOVE(script_in); /* make sure descriptor > 2 */ if (script_in > 0) { close(0); if (dup(script_in) > 0) { close(script_in); } } } /* END set_job_script_as_stdin() */ void setup_batch_job( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE, int *pts_ptr, int *qsub_sock_ptr) { /*************************************************************/ /* We have a "normal" batch job, connect the standard */ /* streams to files */ /*************************************************************/ /* set Environment to reflect batch */ bld_env_variables(&vtable, "PBS_ENVIRONMENT", "PBS_BATCH"); bld_env_variables(&vtable, "ENVIRONMENT", "BATCH"); set_job_script_as_stdin(pjob, sjr, TJE); /* NOTE: set arg2 to 5 to enable file open timeout check */ if (open_std_out_err(pjob, 0) == -1) { log_err(-1, __func__, "unable to open stdout/stderr descriptors"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_STDOUTFAIL, sjr); } if (LOGLEVEL >= 10) log_ext(-1, __func__, "stdout/stderr opened", 
LOG_DEBUG); handle_reservation(pjob, sjr, TJE); /* run prolog - standard batch job */ handle_prologs(pjob, sjr, TJE); #ifdef ENABLE_CSA /* Add a workload management start record */ add_wkm_start(sjr->sj_jobid, pjob->ji_qs.ji_jobid); #endif /* ENABLE_CSA */ } /* END setup_batch_job() */ void attempt_to_set_limits( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE) { int i; int j; if ((i = mom_set_limits(pjob, SET_LIMIT_SET)) != PBSE_NONE) { if (log_buffer[0] != '\0') { /* report error to user via stderr file */ if (write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); } if (i == PBSE_RESCUNAV) { /* resource temp unavailable */ if (TJE->is_interactive == TRUE) j = JOB_EXEC_FAIL2; else j = JOB_EXEC_RETRY; } else { j = JOB_EXEC_FAIL2; } if (log_buffer[0] != '\0') { log_err(errno, __func__, log_buffer); } else { log_err(errno, __func__, "mom_set_limits failed"); } starter_return(TJE->upfds, TJE->downfds, j, sjr); /* exits */ } /* END if (mom_set_limits() == 0) */ } /* END attempt_to_set_limits() */ void go_to_init_dir( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE, struct passwd *pwdp) { /* cwd to PBS_O_INITDIR if specified, otherwise User's Home */ char *idir = get_job_envvar(pjob, "PBS_O_INITDIR"); if (idir == NULL) idir = pwdp->pw_dir; if (chdir(idir) == -1) { sprintf(log_buffer, "PBS: chdir to '%.256s' failed: %s\n", idir, strerror(errno)); if (write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, __func__, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, sjr); } } /* END go_to_init_dir() */ int setup_x11_forwarding( job *pjob, char *qsubhostname, pjobexec_t *TJE) { int pport = pjob->ji_wattr[JOB_ATR_interactive].at_val.at_long; if ((TJE->is_interactive == TRUE) && pjob->ji_wattr[JOB_ATR_forwardx11].at_val.at_str) { char display[512]; if (x11_create_display( 1, /* use localhost only */ display, /* output */ qsubhostname, pport, pjob->ji_grpcache->gc_homedir, 
pjob->ji_wattr[JOB_ATR_forwardx11].at_val.at_str) >= 0) { bld_env_variables(&vtable, "DISPLAY", display); } else { sprintf(log_buffer, "PBS: X11 forwarding init failed\n"); if (write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); return(-1); } } return(PBSE_NONE); } /* END setup_x11_forwarding() */ void restore_SIGINT() { struct sigaction act; /* restore SIGINT so that the child shell can use ctrl-c */ sigemptyset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = SIG_DFL; sigaction(SIGINT, &act, (struct sigaction *)0); } /* END restore_SIGINT() */ /* * setup_interactive_command_if_present * Checks for an interactive command set on an interactive job. If it has * been set there, add it to the arguments to the job's shell in the form * -c * * @param pjob - the job we conditionally set this up for * @param sjr * @param TJE - job execution information * @param arg - the argv that will be supplied to the job's shell * @param aindex_ptr - a pointer to the number of arguments to the shell */ void setup_interactive_command_if_present( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE, char **arg, int *aindex_ptr) { int aindex = *aindex_ptr; if ((pjob->ji_wattr[JOB_ATR_inter_cmd].at_flags & ATR_VFLAG_SET) != 0) { arg[aindex] = (char *)calloc(1, strlen("-c") + 1); if (arg[aindex] == NULL) { log_err(errno, __func__, "cannot alloc env"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, sjr); } strcpy(arg[aindex], "-c"); arg[aindex + 1] = NULL; aindex++; arg[aindex] = (char *)calloc(1, strlen(pjob->ji_wattr[JOB_ATR_inter_cmd].at_val.at_str) + 1); if (arg[aindex] == NULL) { log_err(errno, __func__, "cannot alloc env"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, sjr); } log_ext(-1, __func__, log_buffer, LOG_DEBUG); strcpy(arg[aindex], pjob->ji_wattr[JOB_ATR_inter_cmd].at_val.at_str); arg[aindex + 1] = NULL; aindex++; } *aindex_ptr = aindex; } /* END setup_interactive_command_if_present() */ void launch_the_job_normally( char *shell, 
char **arg, char **job_env) { if (LOGLEVEL >= 10) { char cmd[MAXLINE]; create_command(cmd, sizeof(cmd), arg); sprintf(log_buffer, "execing command (%s) args (%s)\n", shell, cmd); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } execve(shell, arg, job_env); } /* END launch_the_job_normally() */ void add_preexec_if_needed( char **arg, int *aindex_ptr, struct startjob_rtn *sjr, pjobexec_t *TJE) { int aindex = *aindex_ptr; if (PRE_EXEC[0] != '\0') { arg[aindex] = strdup(PRE_EXEC); if (arg[aindex] == NULL) { log_err(errno, __func__, "cannot alloc env"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, sjr); } arg[aindex + 1] = NULL; aindex++; *aindex_ptr = aindex; } /* END if (PRE_EXEC[0] != '\0') */ } /* END add_preexec_if_needed() */ void launch_the_demux( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE, char **shell_ptr) { /* child does demux */ const char *demux = DEMUX; char *shellname; char *arg[MAX_JOB_ARGS]; int aindex; /* setup descriptors 3 and 4 */ /* pjob->ji_stdout and pjob->ji_stderr were opened * in allocate_demux_sockets when we started this job */ dup2(pjob->ji_stdout, 3); if (pjob->ji_stdout > 3) close(pjob->ji_stdout); dup2(pjob->ji_stderr, 4); if (pjob->ji_stderr > 4) close(pjob->ji_stderr); /* construct argv array */ shellname = strrchr((char *)demux, '/'); if (shellname != NULL) ++shellname; /* go past last '/' */ else shellname = *shell_ptr; aindex = 0; arg[aindex] = (char *)calloc(1, strlen(shellname) + 1); if (arg[aindex] == NULL) { log_err(errno, __func__, "cannot alloc env"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, sjr); } strcpy(arg[aindex], shellname); arg[aindex + 1] = NULL; aindex++; add_preexec_if_needed(arg, &aindex, sjr, TJE); execve(demux, arg, vtable.v_envp); /* reached only if execve fails */ *shell_ptr = (char *)demux; /* for fprintf below */ } /* END launch_the_demux() */ void source_login_shells_or_not( job *pjob, struct startjob_rtn *sjr, pjobexec_t *TJE, char **arg, int *aindex_ptr, char *shellname) 
{ int aindex = *aindex_ptr; if (((TJE->is_interactive == TRUE) && (src_login_interactive == FALSE)) || ((TJE->is_interactive != TRUE) && (src_login_batch == FALSE))) { arg[aindex] = (char *)calloc(1, strlen(shellname) + 1); if (arg[aindex] == NULL) { log_err(errno, __func__, "cannot alloc env"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, sjr); } strcpy(arg[aindex], shellname); if (LOGLEVEL >= 7) { sprintf(log_buffer, "bypass sourcing of login files for job %s", pjob->ji_qs.ji_jobid); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } } else { arg[aindex] = (char *)calloc(1, strlen(shellname) + 2); if (arg[aindex] == NULL) { log_err(errno, __func__, "cannot alloc env"); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, sjr); } /* specifying '-' indicates this is a 'login' shell */ strcpy(arg[aindex], "-"); strcat(arg[aindex], shellname); } arg[aindex + 1] = NULL; aindex++; *aindex_ptr = aindex; } /* END source_login_shells_or_not() */ /* child portion of job launch executed as user - called by TMomFinalize2() */ /* will execute run_pelog() * issues setuid to pjob->ji_qs.ji_un.ji_momt.ji_exuid */ int TMomFinalizeChild( pjobexec_t *TJE) /* I */ { int aindex; char *arg[MAX_JOB_ARGS]; char buf[MAXPATHLEN + 2]; pid_t cpid; int j; int vnodenum; char qsubhostname[MAXLINE]; char log_buf[LOG_BUF_SIZE]; int pts; int qsub_sock; char *shell; char *shellname; char *idir; struct startjob_rtn sjr = { 0, 0, 0, 0}; job *pjob; task *ptask; struct passwd *pwdp; proc_stat_t *ps = NULL; pjob = (job *)TJE->pjob; ptask = (task *)TJE->ptask; pwdp = (struct passwd *)TJE->pwdp; if (pwdp == NULL) { snprintf(log_buf, sizeof(log_buf), "TMomFinalizeChild - Running job with no password entry? 
job id %s", pjob->ji_qs.ji_jobid); log_err(PBSE_BADUSER, __func__, log_buf); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, &sjr); exit(-254); } /*******************************************/ /* */ /* The child process - will become the job */ /* */ /*******************************************/ /* NOTE: This child is launched on the mother superior node. It does not have access to stdout/stderr, failure messages will route to syslog via log_err() */ if (LOGLEVEL >= 6) log_ext(-1, __func__, "starting", LOG_DEBUG); if (lockfds >= 0) { close(lockfds); lockfds = -1; } close(TJE->jsmpipe[0]); close(TJE->mjspipe[1]); /* find which shell to use, one specified or the login shell */ shell = set_shell(pjob, pwdp); /* in the machine dependent section */ if (LOGLEVEL >= 6) log_ext(-1, __func__, "shell initialized", LOG_DEBUG); /* Setup user env */ if (InitUserEnv(pjob, ptask, NULL, pwdp, shell) < 0) { snprintf(log_buf, sizeof(log_buf),"TMomFinalizeChild - failed to setup user env. job id %s", pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buf); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_RETRY, &sjr); } if (LOGLEVEL >= 6) log_ext(-1, __func__, "env initialized", LOG_DEBUG); /* Create the job's nodefile */ vnodenum = pjob->ji_numvnod; take_care_of_nodes_file(pjob, &sjr, TJE); sprintf(buf, "%d", 0); /* Set PBS_VNODENUM */ bld_env_variables(&vtable, "PBS_VNODENUM", buf); /* PBS_NP */ sprintf(buf, "%d", vnodenum); bld_env_variables(&vtable, variables_else[tveNprocs], buf); handle_cpuset_creation(pjob, &sjr, TJE); #ifdef ENABLE_CPA /* Cray CPA setup */ if ((j = CPACreatePartition(pjob, &vtable)) != 0) { snprintf(log_buf, sizeof(log_buf),"TMomFinalizeChild - CPACreatePartition failed. 
job id %s", pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buf); starter_return(TJE->upfds, TJE->downfds, j, &sjr); /* exits */ /*NOTREACHED*/ exit(1); } #endif /* END ENABLE_CPA */ /* specific system related variables */ j = set_mach_vars(pjob, &vtable); if (j != 0) { snprintf(log_buf, sizeof(log_buf),"TMomFinalizeChild - failed to set mach vars. job id %s", pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buf); starter_return(TJE->upfds, TJE->downfds, j, &sjr); /* exits */ /*NOTREACHED*/ exit(1); } if (LOGLEVEL >= 6) log_ext(-1, __func__, "system vars set", LOG_DEBUG); umask(determine_umask(pjob->ji_qs.ji_un.ji_momt.ji_exuid)); if (TJE->is_interactive == TRUE) { setup_interactive_job(pjob, &sjr, TJE, &pts, &qsub_sock, qsubhostname, sizeof(qsubhostname)); } /* END if (TJE->is_interactive == TRUE) */ else { setup_batch_job(pjob, &sjr, TJE, &pts, &qsub_sock); } /* END else (TJE->is_interactive == TRUE) */ /***********************************************************************/ /* Set resource limits */ /* Both normal batch and interactive job come through here */ /* */ /* output fds to the user are setup at this point, so write() all */ /* errors (with a \n) directly to the user on fd 2 and fscync(2) it */ /***********************************************************************/ pjob->ji_wattr[JOB_ATR_session_id].at_val.at_long = sjr.sj_session; pjob->ji_wattr[JOB_ATR_session_id].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY | ATR_VFLAG_SEND; ps = get_proc_stat((int)sjr.sj_session); if(ps != NULL) { pjob->ji_wattr[JOB_ATR_system_start_time].at_val.at_long = ps->start_time; pjob->ji_wattr[JOB_ATR_system_start_time].at_flags |= ATR_VFLAG_SET; } #ifdef PENABLE_LINUX26_CPUSETS /* Move this mom process into the cpuset so the job will start in it. 
*/ if (use_cpusets(pjob) == TRUE) { if (LOGLEVEL >= 6) { sprintf(log_buffer, "about to move to cpuset of job %s", pjob->ji_qs.ji_jobid); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } move_to_job_cpuset(getpid(), pjob); } #endif /* (PENABLE_LINUX26_CPUSETS) */ if (site_job_setup(pjob) != 0) { /* FAILURE */ sprintf(log_buffer, "PBS: site specific job setup failed\n"); if (write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, __func__, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); /* exits */ /*NOTREACHED*/ } if (LOGLEVEL >= 6) { snprintf(log_buf, sizeof(log_buf),"setting system limits. job id %s", pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buf); } log_buffer[0] = '\0'; attempt_to_set_limits(pjob, &sjr, TJE); endpwent(); if (LOGLEVEL >= 6) { snprintf(log_buf, sizeof(log_buf),"system limits set. job id %s", pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buf); } if ((idir = get_job_envvar(pjob, "PBS_O_ROOTDIR")) != NULL) { if (chroot(idir) == -1) { sprintf(log_buffer, "TMomFinalizeChild - PBS: chroot to '%.256s' failed: %s\n", idir, strerror(errno)); if (write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, __func__, log_buffer); starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_FAIL2, &sjr); /*NOTREACHED*/ exit(-1); } } /* become the user, execv the shell and become the real job */ if (LOGLEVEL >= 6) { snprintf(log_buf, sizeof(log_buf), "setting user/group credentials to %d/%d, job id %s", pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid, pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buf ); } /* NOTE: must set groups before setting the user because not all users can * call setgid and setgroups, even if its their group, see setgid's man page */ become_the_user_sjr(pjob, TJE->upfds, TJE->downfds, &sjr); go_to_init_dir(pjob, &sjr, TJE, pwdp); if (LOGLEVEL >= 6) { snprintf(log_buf, sizeof(log_buf), "job id %s: initial directory set to %s\n", 
pjob->ji_qs.ji_jobid, idir != NULL ? idir : pwdp->pw_dir); log_err(-1, __func__, log_buf); } /* X11 forwarding init */ setup_x11_forwarding(pjob, qsubhostname, TJE); /* NULL terminate the envp array, This is MUST DO */ *(vtable.v_envp + vtable.v_used) = NULL; /* tell mom we are going */ if (LOGLEVEL >= 6) { snprintf(log_buf, sizeof(log_buf), "forking child: job id %s", pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buf); } starter_return(TJE->upfds, TJE->downfds, JOB_EXEC_OK, &sjr); log_close(0); /* FIXME: this is useless, right? */ if ((pjob->ji_numnodes == 1) || ((cpid = fork()) > 0)) { /* parent does the shell */ /* close sockets that child uses */ if (pjob->ji_stdout >= 0) close(pjob->ji_stdout); if (pjob->ji_stderr >= 0) close(pjob->ji_stderr); /* construct argv array */ shellname = strrchr(shell, '/'); if (shellname != NULL) ++shellname; /* go past last '/' */ else shellname = shell; aindex = 0; /* determine whether or not we bypass the sourcing of login shells */ source_login_shells_or_not(pjob, &sjr, TJE, arg, &aindex, shellname); add_preexec_if_needed(arg, &aindex, &sjr, TJE); #if SHELL_USE_ARGV == 1 /* Put the script's arguments on the command line (see configure option --enable-shell-use-argv). */ if (TJE->is_interactive == FALSE) { arg[aindex] = (char *)calloc(1, strlen(path_jobs) + strlen(pjob->ji_qs.ji_fileprefix) + strlen(JOB_SCRIPT_SUFFIX) + 6); if (arg[aindex] == NULL) { snprintf(log_buf, sizeof(log_buf), "cannot alloc env. 
job id %s", pjob->ji_qs.ji_jobid); log_err(errno, __func__, log_buf); starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ exit(-1); } if (exec_with_exec) { strcpy(arg[aindex], "exec "); strcat(arg[aindex], path_jobs); } else strcpy(arg[aindex], path_jobs); strcat(arg[aindex], pjob->ji_qs.ji_fileprefix); strcat(arg[aindex], JOB_SCRIPT_SUFFIX); arg[aindex + 1] = NULL; aindex++; } #endif /* SHELL_USE_ARGV */ if (TJE->is_interactive == TRUE) { char **ArgV = arg; restore_SIGINT(); /* if the user specified command(s) using -x in qsub then take care of it */ setup_interactive_command_if_present(pjob, &sjr, TJE, ArgV, &aindex); } if (jobstarter_set) { if (mom_jobstarter_execute_job(pjob, shell, arg, &vtable) == -1) { starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ exit(-1); } } if (mom_checkpoint_job_is_checkpointable(pjob)) { if (mom_checkpoint_execute_job(pjob, shell, arg, &vtable) == -1) { starter_return(TJE->upfds,TJE->downfds,JOB_EXEC_FAIL2,&sjr); /*NOTREACHED*/ exit(-1); } } else { char **ArgV = arg; launch_the_job_normally(shell, ArgV, vtable.v_envp); } } /* END if ((pjob->ji_numnodes == 1) || ...) 
*/ else if (cpid == 0) { launch_the_demux(pjob, &sjr, TJE, &shell); } /* END else if (cpid == 0) */ /* NOTE: the code above all calls exec, so if we have reached here it is in error */ sprintf(log_buffer, "PBS: exec of shell '%.256s' failed\n", shell); if (write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); if (strlen(shell) == 0) { snprintf(log_buffer,sizeof(log_buffer), "user \"%s\" may not have a shell defined on node \"%s\"\n", pwdp->pw_name, mom_host); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); } else if (strstr(shell, "/bin/false") != NULL) { snprintf(log_buffer,sizeof(log_buffer), "user \"%s\" has shell \"/bin/false\" on node \"%s\"\n", pwdp->pw_name, mom_host); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); } else { struct stat buf; if (stat(shell, &buf) != 0) { DBPRT(("stat of shell \"%s\" failed with error %d\n", shell, errno)); } else if (S_ISREG(buf.st_mode) == 0) { DBPRT(("shell \"%s\" is not a file\n", shell)); } else if ((buf.st_mode & S_IXUSR) != 0) { DBPRT(("shell \"%s\" is not executable by user \"%s\"\n", shell, pwdp->pw_name)); } } exit(254); /* should never, ever get here */ } /* END TMomFinalizeChild() */ /* Child has already reported in via pipe (info in TJE->sjr) which was created in TMomFinalizeJob2->TMomFinalizeChild. Perform final job tasks. 
Change pjob substate from JOB_SUBSTATE_PRERUN to JOB_SUBSTATE_RUNNING */ int TMomFinalizeJob3( pjobexec_t *TJE, /* I (modified) */ int ReadSize, /* I (bytes read from child pipe) */ int ReadErrno, /* I (errno value from read) */ int *SC) /* O (return code) */ { struct startjob_rtn sjr; job *pjob; task *ptask; unsigned int momport = 0; proc_stat_t *ps = NULL; pjob = (job *)TJE->pjob; ptask = (task *)TJE->ptask; if (pjob == NULL) { log_err(-1, __func__, "This function needs a valid job pointer"); return(FAILURE); } /* sjr populated in TMomFinalizeJob2() */ memcpy(&sjr, TJE->sjr, sizeof(sjr)); close(TJE->jsmpipe[0]); if (ReadSize != sizeof(sjr)) { /* FAILURE */ sprintf(log_buffer, "read of pipe for sid failed for job %s (%d of %d bytes)", pjob->ji_qs.ji_jobid, ReadSize, (int)sizeof(sjr)); log_err(ReadErrno, __func__, log_buffer); sprintf(log_buffer, "start failed, improper sid"); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); close(TJE->mjspipe[1]); *SC = JOB_EXEC_RETRY; return(FAILURE); } /* send back as an acknowledgement that MOM got it */ if (write_ac_socket(TJE->mjspipe[1], &sjr, sizeof(sjr)) == -1) { } close(TJE->mjspipe[1]); if (LOGLEVEL >= 3) { #ifdef USEJOBCREATE sprintf(log_buffer, "Job %s read start return code=%d session=%ld jobid=%lx", pjob->ji_qs.ji_jobid, sjr.sj_code, (long)sjr.sj_session, sjr.sj_jobid); #else sprintf(log_buffer, "Job %s read start return code=%d session=%ld", pjob->ji_qs.ji_jobid, sjr.sj_code, (long)sjr.sj_session); #endif /* USEJOBCREATE */ log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); } if (sjr.sj_code != 0) { char tmpLine[MAXLINE]; /* FAILURE */ tmpLine[0] = '\0'; switch (sjr.sj_code) { case JOB_EXEC_OK: /* 0 */ strcpy(tmpLine, "no failure"); break; case JOB_EXEC_FAIL1: /* -1 */ strcpy(tmpLine, "job exec failure, before files staged, no retry"); break; case JOB_EXEC_FAIL2: /* -2 */ strcpy(tmpLine, "job exec failure, after files staged, no retry"); break; case JOB_EXEC_RETRY: /* -3 */ 
strcpy(tmpLine, "job exec failure, retry will be attempted"); if (sjr.sj_session < 0) { /* NOTE: push sjr.sj_sid into job pbs_attribute X to be used by encode_used */ ptask->ti_qs.ti_sid = sjr.sj_session; } break; case JOB_EXEC_STDOUTFAIL: /* -9 */ strcpy(tmpLine,"job exec failure, could not open/create stdout/stderr files, no retry"); break; default: sprintf(tmpLine, "job exec failure, code=%d", sjr.sj_code); break; } /* END switch (sjr.sj_code) */ sprintf(log_buffer, "job not started, %s %s (see syslog for more information)", (sjr.sj_code == JOB_EXEC_RETRY) ? "Retry" : "Failure", tmpLine); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); *SC = sjr.sj_code; return(FAILURE); } /* END if (sjr.sj_code < 0) */ /* pjob modified */ set_globid(pjob, &sjr); ptask->ti_qs.ti_sid = sjr.sj_session; ptask->ti_qs.ti_status = TI_STATE_RUNNING; strcpy(ptask->ti_qs.ti_parentjobid, pjob->ji_qs.ji_jobid); if (LOGLEVEL >= 6) { log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "saving task (TMomFinalizeJob3)"); } if (task_save(ptask) == -1) { /* FAILURE */ sprintf(log_buffer, "Task save failed"); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); *SC = JOB_EXEC_RETRY; return(FAILURE); } if (pjob->ji_numnodes > 1) { /* ** Put port numbers into job struct and close sockets. ** The job uses them to talk to demux, but main MOM ** doesn't need them. The port numbers are stored ** here for use in start_process(), to connect to ** pbs_demux. */ close(pjob->ji_stdout); pjob->ji_stdout = TJE->port_out; close(pjob->ji_stderr); pjob->ji_stderr = TJE->port_err; } /* return from the starter indicated the job is a go ... 
*/ /* record the start time and session/process id */ pjob->ji_wattr[JOB_ATR_session_id].at_val.at_long = sjr.sj_session; pjob->ji_wattr[JOB_ATR_session_id].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY | ATR_VFLAG_SEND; ps = get_proc_stat((int)sjr.sj_session); if(ps != NULL) { pjob->ji_wattr[JOB_ATR_system_start_time].at_val.at_long = ps->start_time; pjob->ji_wattr[JOB_ATR_system_start_time].at_flags |= ATR_VFLAG_SET; } if (is_login_node == TRUE) { if (sjr.sj_rsvid != 0) { char buf[MAXLINE]; sprintf(buf, "%d", sjr.sj_rsvid); pjob->ji_wattr[JOB_ATR_reservation_id].at_val.at_str = strdup(buf); pjob->ji_wattr[JOB_ATR_reservation_id].at_flags = ATR_VFLAG_SET; pjob->ji_wattr[JOB_ATR_pagg_id].at_val.at_ll = sjr.sj_jobid; pjob->ji_wattr[JOB_ATR_pagg_id].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY; } } #ifdef USEJOBCREATE pjob->ji_wattr[JOB_ATR_pagg_id].at_val.at_ll = sjr.sj_jobid; pjob->ji_wattr[JOB_ATR_pagg_id].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY; #endif /* USEJOBCREATE */ pjob->ji_qs.ji_state = JOB_STATE_RUNNING; pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING; pjob->ji_qs.ji_stime = time_now; /* changed from SAVEJOB_QUICK to SAVEJOB_FULL (USC - 2/5/2005) */ if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_FULL,momport); sprintf(log_buffer, "job %s started, pid = %ld", pjob->ji_qs.ji_jobid, (long)sjr.sj_session); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); return(SUCCESS); } /* END TMomFinalizeJob3() */ /** * Start a process for a spawn request. This will be different from * a job's initial shell task in that the environment will be specified * and no interactive code need be included. 
* * NOTE: Called for sisters after mother superior receives IM_SPAWN_TASK request */ int start_process( task *ptask, /* I */ char **argv, /* I */ char **envp) /* I */ { char *idir; job *pjob = ptask->ti_job; pid_t pid; int pipes[2]; int kid_read; int kid_write; int parent_read; int parent_write; int pts; int i; int j; int fd0; int fd1; int fd2; u_long ipaddr; unsigned int momport = 0; struct startjob_rtn sjr = { 0, 0, 0, 0 }; if (pipe(pipes) == -1) { return(-1); } if (pipes[1] < 3) { kid_write = fcntl(pipes[1], F_DUPFD, 3); close(pipes[1]); } else { kid_write = pipes[1]; } parent_read = pipes[0]; if (pipe(pipes) == -1) { return(-1); } if (pipes[0] < 3) { kid_read = fcntl(pipes[0], F_DUPFD, 3); close(pipes[0]); } else { kid_read = pipes[0]; } parent_write = pipes[1]; if ((kid_read < 0) || (kid_write < 0)) { log_err(-1, __func__, log_buffer); return(-1); } /* ** Get ipaddr to Mother Superior. */ if (am_i_mother_superior(*pjob) == true) { ipaddr = htonl(localaddr); } else if (pjob->ji_radix > 1) { ipaddr = pjob->ji_sisters[0].sock_addr.sin_addr.s_addr; } else { ipaddr = pjob->ji_hosts[0].sock_addr.sin_addr.s_addr; } /* A restarted mom will not have called this yet, but it is needed * to spawn tasks (ji_grpcache). */ if (!check_pwd(pjob)) { log_err(-1, __func__, log_buffer); return(-1); } /* ** Begin a new process for the fledgling task. 
*/ if ((pid = fork_me(-1)) == -1) { /* fork failed */ return(-1); } if (pid != 0) { /* parent */ int gotsuccess = 0; close(kid_read); close(kid_write); /* read sid */ for (;;) { i = read_ac_socket(parent_read, (char *) & sjr, sizeof(sjr)); if ((i == -1) && (errno == EINTR)) continue; if ((i == sizeof(sjr)) && (sjr.sj_code == 0) && !gotsuccess) { gotsuccess = 1; if (write_ac_socket(parent_write, &sjr, sizeof(sjr)) == -1) { } continue; } if (gotsuccess) { i = sizeof(sjr); } break; } /* END for(;;) */ j = errno; close(parent_read); if (i != sizeof(sjr)) { sprintf(log_buffer, "read of pipe for sid job %s got %d not %ld (errno: %d, %s)", pjob->ji_qs.ji_jobid, i, (long)sizeof(sjr), j, strerror(j)); log_err(j, __func__, log_buffer); close(parent_write); return(-1); } close(parent_write); DBPRT(("%s: read start return %d %ld\n", __func__, sjr.sj_code, (long)sjr.sj_session)) if (sjr.sj_code < 0) { char tmpLine[MAXLINE]; tmpLine[0] = '\0'; switch (sjr.sj_code) { case JOB_EXEC_OK: /* 0 */ /* NO-OP */ break; case JOB_EXEC_FAIL1: /* -1 */ case JOB_EXEC_STDOUTFAIL: /* -9 */ strcpy(tmpLine, "stdio setup failed"); break; case JOB_EXEC_FAIL2: /* -2 */ strcpy(tmpLine, "env setup or user dir problem"); break; case JOB_EXEC_RETRY: /* -3 */ strcpy(tmpLine, "unable to set limits, retry will be attempted"); break; case JOB_EXEC_CMDFAIL: /* -8 */ strcpy(tmpLine, "command exec failed"); break; default: sprintf(tmpLine, "code=%d", sjr.sj_code); break; } /* END switch (sjr.sj_code) */ sprintf(log_buffer, "task not started, '%s', %s (see syslog)", argv[0], tmpLine); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return(-1); } /* END if (sjr.sj_code < 0) */ set_globid(pjob, &sjr); ptask->ti_qs.ti_sid = sjr.sj_session; ptask->ti_qs.ti_status = TI_STATE_RUNNING; if (LOGLEVEL >= 6) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "task set to running/saving task (start_process)"); } task_save(ptask); if (pjob->ji_qs.ji_substate != 
JOB_SUBSTATE_RUNNING) { pjob->ji_qs.ji_state = JOB_STATE_RUNNING; pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK,momport); } sprintf(log_buffer, "%s: task started, tid %d, sid %ld, cmd %s", __func__, ptask->ti_qs.ti_task, (long)ptask->ti_qs.ti_sid, argv[0]); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return(0); } /* END else if (pid != 0) */ /************************************************/ /* The child process - will become the TASK */ /************************************************/ if (LOGLEVEL >= 6) log_ext(-1, __func__, "child starting", LOG_DEBUG); if (lockfds >= 0) { close(lockfds); lockfds = -1; } close(parent_read); close(parent_write); /* set up the environmental variables to be given to the job */ /* NOTE: use log_err beyond this point to write messages to syslog */ if (InitUserEnv(pjob, ptask, envp, NULL, NULL) < 0) { log_err(errno, __func__, "failed to setup user env"); starter_return(kid_write, kid_read, JOB_EXEC_RETRY, &sjr); /*NOTREACHED*/ exit(1); } if (LOGLEVEL >= 10) log_ext(-1, __func__, "user env initialized", LOG_DEBUG); if (set_mach_vars(pjob, &vtable) != 0) { strcpy(log_buffer, "PBS: machine dependent environment variable setup failed\n"); log_err(errno, __func__, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if (LOGLEVEL >= 10) log_ext(-1, __func__, "mach vars set", LOG_DEBUG); umask(077); /* set environment to reflect batch */ bld_env_variables(&vtable, "PBS_ENVIRONMENT", "PBS_BATCH"); bld_env_variables(&vtable, "ENVIRONMENT", "BATCH"); /* Set limits for the child */ if (mom_set_limits(pjob, SET_LIMIT_SET) != PBSE_NONE) { strcpy(log_buffer, "PBS: resource limits setup failed\n"); log_err(errno, __func__, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } /* NULL terminate the envp array, This is MUST DO */ *(vtable.v_envp + 
vtable.v_used) = NULL; /* ** Set up stdin. */ /* look through env for a port# on MS we should use for stdin */ if ((fd0 = search_env_and_open("MPIEXEC_STDIN_PORT", ipaddr)) == -2) { log_err(errno, __func__, "cannot locate MPIEXEC_STDIN_PORT"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if ((fd0 < 0) && ((fd0 = search_env_and_open("TM_STDIN_PORT", ipaddr)) == -2)) { log_err(errno, __func__, "cannot locate TM_STDIN_PORT"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } /* use /dev/null if no env var found */ if ((fd0 < 0) && (fd0 = open("/dev/null", O_RDONLY)) == -1) { log_err(errno, __func__, "could not open dev/null"); close(0); } else { dup2(fd0, 0); if (fd0 > 0) close(fd0); } /* look through env for a port# on MS we should use for stdout/err */ if ((fd1 = search_env_and_open("MPIEXEC_STDOUT_PORT", ipaddr)) == -2) { log_err(errno, __func__, "cannot locate MPIEXEC_STDOUT_PORT"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if (fd1 < 0) { if ((fd1 = search_env_and_open("TM_STDOUT_PORT", ipaddr)) == -2) { log_err(errno, __func__, "cannot locate TM_STDOUT_PORT"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } } if ((fd2 = search_env_and_open("MPIEXEC_STDERR_PORT", ipaddr)) == -2) { log_err(errno, __func__, "cannot locate MPIEXEC_STDERR_PORT"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if (fd2 < 0) { if ((fd2 = search_env_and_open("TM_STDERR_PORT", ipaddr)) == -2) { log_err(errno, __func__, "cannot locate TM_STDERR_PORT"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } } if (LOGLEVEL >= 10) log_ext(-1, __func__, "MPI/TM variables set", LOG_DEBUG); #ifdef PENABLE_LINUX26_CPUSETS if (use_cpusets(pjob) == TRUE) { int j; /* FIXME: vnodenum needs to be stored in the task struct so that we don't * have to fish it out here. 
*/ for (j = 0;j < vtable.v_used;j++) { if (!strncmp(vtable.v_envp[j], "PBS_VNODENUM=", strlen("PBS_VNODENUM="))) { if (LOGLEVEL >= 6) { sprintf(log_buffer, "about to move to cpuset for job %s.\n", pjob->ji_qs.ji_jobid); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } /* Move this mom process into the cpuset so the job will start in it. */ move_to_job_cpuset(getpid(), pjob); } } } #endif /* (PENABLE_LINUX26_CPUSETS) */ if (pjob->ji_numnodes > 1) { /* ** Open sockets to demux proc for stdout and stderr. */ if ((fd1 < 0) && ((fd1 = open_demux(ipaddr, pjob->ji_portout)) == -1)) { log_err(errno, __func__, "cannot open mux stdout port"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } dup2(fd1, 1); if (fd1 > 1) close(fd1); if ((fd2 < 0) && ((fd2 = open_demux(ipaddr, pjob->ji_porterr)) == -1)) { log_err(errno, __func__, "cannot open mux stderr port"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } dup2(fd2, 2); if (fd2 > 2) close(fd2); /* never send cookie - PW mpiexec patch */ /* if (write_ac_socket(1,pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str, strlen(pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str)) == -1) {} if (write_ac_socket(2,pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str, strlen(pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str)) == -1) {} */ } else if ((pjob->ji_wattr[JOB_ATR_interactive].at_flags & ATR_VFLAG_SET) && (pjob->ji_wattr[JOB_ATR_interactive].at_val.at_long > 0)) { /* interactive job, single node, write to pty */ pts = -1; if ((fd1 < 0) || (fd2 < 0)) { if ((pts = open_pty(pjob)) < 0) { log_err(errno, __func__, "cannot open slave pty"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); /*NOTREACHED*/ exit(1); } if (fd1 < 0) fd1 = pts; if (fd2 < 0) fd2 = pts; } dup2(fd1, 1); dup2(fd2, 2); if (fd1 != pts) close(fd1); if (fd2 != pts) close(fd2); } else { /* This code block may be dead (may never be run). 
This is due to the fact that * start_process() is only called by a sister who has received a IM_SPAWN_TASK, * but the below comment states that the code only runs for a "single node" job, * and all sisters are part of a multi-node job. */ /* normal batch job, single node, write straight to files */ pts = -1; if ((fd1 < 0) || (fd2 < 0)) { if (open_std_out_err(pjob, -1) == -1) { log_err(errno, __func__, "cannot open job stderr/stdout files"); starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr); } } if (fd1 >= 0) { close(1); dup2(fd1, 1); if (fd1 > 1) close(fd1); } if (fd2 >= 0) { close(2); dup2(fd2, 2); if (fd2 > 2) close(fd2); } } /* END else */ /******************************************************* * At this point, output fds are setup for the job, * any further error messages should be written * directly to fd 2, with a \n, and ended with fsync(2) *******************************************************/ sjr.sj_session = setsid(); ptask->ti_qs.ti_sid = sjr.sj_session; proc_stat_t *ps = get_proc_stat((int)sjr.sj_session); if(ps != NULL) { pjob->ji_wattr[JOB_ATR_system_start_time].at_val.at_long = ps->start_time; pjob->ji_wattr[JOB_ATR_system_start_time].at_flags |= ATR_VFLAG_SET; } log_buffer[0] = '\0'; if ((i = mom_set_limits(pjob, SET_LIMIT_SET)) != PBSE_NONE) { if (log_buffer[0] != '\0') { /* report error to user via stderr file */ if (write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); } sprintf(log_buffer, "PBS: unable to set limits, err=%d\n", i); if (write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); if (i == PBSE_RESCUNAV) /* resource temp unavailable */ j = JOB_EXEC_RETRY; else j = JOB_EXEC_FAIL2; log_err(errno, __func__, log_buffer); starter_return(kid_write, kid_read, j, &sjr); } if ((idir = get_job_envvar(pjob, "PBS_O_ROOTDIR")) != NULL) { if (chroot(idir) == -1) { sprintf(log_buffer, "PBS: chroot to %.256s failed: %s\n", idir, strerror(errno)); if (write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) 
{ } fsync(2); log_err(errno, __func__, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_FAIL2, &sjr); } } /* become the user and execv the shell and become the real job */ /* NOTE: must set groups before setting the user because not all users can * call setgid and setgroups, even if its their group, see setgid's man page */ become_the_user_sjr(pjob, kid_write, kid_read, &sjr); /* cwd to PBS_O_INITDIR if specified, otherwise User's Home */ if ((idir = get_job_envvar(pjob, "PBS_O_INITDIR")) != NULL) { /* in start_process() executed as user */ if (chdir(idir) == -1) { sprintf(log_buffer, "PBS: chdir to %.256s failed: %s\n", idir, strerror(errno)); if (write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, __func__, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_FAIL2, &sjr); } } else { /* in start_process() executed as user */ if (chdir(pjob->ji_grpcache->gc_homedir) == -1) { sprintf(log_buffer, "PBS: chdir to %.256s failed: %s\n", pjob->ji_grpcache->gc_homedir, strerror(errno)); if (write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, __func__, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_FAIL2, &sjr); } } if (LOGLEVEL >= 10) log_ext(-1, __func__, "done - writing pipe and exec'ing", LOG_DEBUG); starter_return( kid_write, kid_read, JOB_EXEC_OK, &sjr); fcntl(kid_write, F_SETFD, FD_CLOEXEC); environ = vtable.v_envp; if (jobstarter_set) { char **argv_jobstarter; int argc = 0; int i; /* count argc - argv is always null terminated */ while (argv[argc] != NULL) argc++; /* add one for the jobstarter argument and one for NULL */ argc += 2; argv_jobstarter = (char **)calloc(argc, sizeof(char *)); argv_jobstarter[0] = jobstarter_exe_name; for (i = 1; i < argc - 1; i++) argv_jobstarter[i] = argv[i-1]; execvp(jobstarter_exe_name, argv_jobstarter); /* only reached on failure */ free(argv_jobstarter); } else execvp(argv[0], argv); /* only reached if execvp() fails */ 
sprintf(log_buffer, "PBS: %.256s: %s\n", argv[0], strerror(errno)); if (write_ac_socket(2, log_buffer, strlen(log_buffer)) == -1) { } fsync(2); log_err(errno, __func__, log_buffer); starter_return(kid_write, kid_read, JOB_EXEC_CMDFAIL, &sjr); exit(254); } /* END start_process() */ /* ** Free the ji_hosts and ji_vnods arrays for a job. If any events are ** attached to an array element, free them as well. */ void nodes_free( job *pj) /* I */ { void arrayfree (char **); hnodent *np; if (pj->ji_vnods != NULL) { free(pj->ji_vnods); pj->ji_vnods = NULL; } if (pj->ji_hosts != NULL) { for (np = pj->ji_hosts;np->hn_node != TM_ERROR_NODE;np++) { eventent *ep = (eventent *)GET_NEXT(np->hn_events); if (np->hn_host) free(np->hn_host); /* don't close stream incase another job uses it */ while (ep) { if (ep->ee_argv) arrayfree(ep->ee_argv); if (ep->ee_envp) arrayfree(ep->ee_envp); delete_link(&ep->ee_next); free(ep); ep = (eventent *)GET_NEXT(np->hn_events); } /* END while (ep) */ } /* END for (np) */ free(pj->ji_hosts); pj->ji_hosts = NULL; } /* END if (pj->ji_hosts != NULL) */ return; } /* END nodes_free() */ /* For job_radix we create a sister list for each member of the radix. * That is if the job_radix is three then there will be three sister lists. * If the job_radix is four there will be four sister lists. */ int add_host_to_sister_list( char *hostname, unsigned short port, struct radix_buf *list) { char *tmp; char *cp; char stringNum[10]; /* Have we used up all of our buffer. 
If so allocate more */ if ((int)(strlen(hostname)+list->current_string_len + 1) >= list->max_string_len) { /* This is a long list and we need to make more room */ tmp = (char *)realloc(list->host_list, list->max_string_len + THE_LIST_SIZE); assert(tmp != NULL); list->max_string_len += THE_LIST_SIZE; list->host_list = tmp; } if (list->host_list[0] != 0) { cp = &list->host_list[strlen(list->host_list)]; *cp = '+'; cp++; *cp = 0; strcat(list->host_list, hostname); cp = &list->port_list[strlen(list->port_list)]; *cp = '+'; cp++; *cp = 0; sprintf(stringNum, "%hu", port); strcat(list->port_list, stringNum); } else { strcpy(list->host_list, hostname); sprintf(stringNum, "%hu", port); strcpy(list->port_list, stringNum); } list->count++; return(0); } /* END add_host_to_sister_list() */ /** * Generate array hosts & vnodes for a job from the exec_host pbs_attribute. * Call nodes_free() just in case we have seen this job before. * Parse exec_host first to count the number of nodes and allocate * an array of nodeent's. Then, parse it again to get the hostname * of each node and init the other fields of each nodeent element. * The final element will have the ne_node field set to TM_ERROR_NODE. 
* * @see start_exec() - parent */ void job_nodes( job *pjob) /* I */ { int i; int j; int nhosts; int nodenum; int ix; char *cp = NULL; char *nodestr = NULL; char *portstr = NULL; hnodent *hp = NULL; vnodent *np = NULL; nodes_free(pjob); nodenum = 1; if (pjob->ji_wattr[JOB_ATR_exec_host].at_flags & ATR_VFLAG_SET) { nodestr = pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str; portstr = pjob->ji_wattr[JOB_ATR_exec_port].at_val.at_str; if (nodestr != NULL) { /* count how many nodes there are by counting the number of '+' * characters in the string */ for (cp = nodestr;*cp;cp++) { if (*cp == '+') nodenum++; } } } else { log_err(-1, __func__, "Cannot parse the nodes for a job without exec hosts being set"); return; } pjob->ji_hosts = (hnodent *)calloc(nodenum + 1, sizeof(hnodent)); pjob->ji_vnods = (vnodent *)calloc(nodenum + 1, sizeof(vnodent)); if ((pjob->ji_hosts == NULL) || (pjob->ji_vnods == NULL)) { log_err(-1,__func__,"Out of memory, system failure!\n"); return; } pjob->ji_numvnod = nodenum; nhosts = 0; np = pjob->ji_vnods; for (i = 0;i < nodenum;i++, np++) { char *dp; char nodename[MAXPATHLEN + 1]; char *portptr; char portnumber[MAXPORTLEN+1]; int portcount; struct addrinfo *addr_info; ix = 0; for (cp = nodestr, dp = nodename;*cp;cp++, dp++) { if (*cp == '/') { ix = atoi(cp + 1); while ((*cp != '\0') && (*cp != '+')) ++cp; if (*cp == '\0') { nodestr = cp; break; } } if (*cp == '+') { nodestr = cp + 1; break; } *dp = *cp; } *dp = '\0'; /* see if we already have this host - we only need to check the * last host as of 4.0.0 because each node can only be in the list * a maximum of one time */ if ((nhosts > 0) && (strcmp(nodename, pjob->ji_hosts[nhosts - 1].hn_host) == 0)) j = nhosts - 1; else { /* we need a new host */ j = nhosts; } /* Get the port number for this host */ for (cp = portstr, portptr = portnumber, portcount = 0; portcount < (MAXPORTLEN+1) && *cp; cp++, portptr++, portcount++) { if (*cp == '+') { portstr = cp + 1; break; } *portptr = *cp; } *portptr = 
'\0'; hp = &pjob->ji_hosts[j]; if (j == nhosts) { /* need to add host to hn_host */ hp->hn_node = nhosts++; hp->hn_sister = SISTER_OKAY; hp->hn_host = strdup(nodename); hp->hn_port = atoi(portnumber); CLEAR_HEAD(hp->hn_events); /* set up the socket address information */ if (pbs_getaddrinfo(nodename, NULL, &addr_info) == 0) { hp->sock_addr.sin_addr = ((struct sockaddr_in *)addr_info->ai_addr)->sin_addr; hp->sock_addr.sin_family = AF_INET; hp->sock_addr.sin_port = htons(hp->hn_port); } } np->vn_node = i; /* make up node id */ np->vn_host = &pjob->ji_hosts[j]; np->vn_index = ix; if (LOGLEVEL >= 4) { sprintf(log_buffer, "%d: %s/%d", np->vn_node, np->vn_host->hn_host, np->vn_index); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); } } /* END for (i) */ np->vn_node = TM_ERROR_NODE; pjob->ji_hosts[nhosts].hn_node = TM_ERROR_NODE; #ifdef NUMA_SUPPORT pjob->ji_numnodes = 1; #else if (is_login_node == FALSE) pjob->ji_numnodes = nhosts; else pjob->ji_numnodes = 1; #endif /* NUMA_SUPPORT */ pjob->ji_numvnod = nodenum; if (LOGLEVEL >= 2) { sprintf(log_buffer, "job: %s numnodes=%d numvnod=%d", pjob->ji_qs.ji_jobid, nhosts, nodenum); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); } return; } /* END job_nodes() */ /** * This is similar to job_nodes only this is done for * intermediate moms when a job_radix has been set. * The intermediate moms need to know what sisters they * are in charge of. sister_job_nodes parses the list of * sisters given to the intermediate mom from its parent * and adds the names to its copy of the job structure. * To clarify, intermediate moms will have a slightly different * copy of a job than the rest of the sisters because they * will keep track of the sisters in their radix. 
* * @see start_exec() - parent */ void sister_job_nodes( job *pjob, char *radix_hosts, char *radix_ports ) /* I */ { int i; int j; int nhosts; int nodenum; int portnum; int ix; char *cp = NULL; char *nodestr = NULL; char *portstr = NULL; hnodent *hp = NULL; vnodent *np = NULL; /* nodes_free(pjob); We may need to do a sister_nodes_free later */ nodenum = 1; if (radix_hosts == NULL) return; if (radix_ports == NULL) return; nodestr = radix_hosts; /* count the number of nodes */ for(cp = nodestr; *cp; cp++) { if (*cp == '+') { nodenum++; } } portstr = radix_ports; /* count the number of ports. It must be equal to * number of hosts */ portnum = 1; for(cp = portstr; *cp; cp++) { if (*cp == '+') { portnum++; } } if (portnum != nodenum) { return; } pjob->ji_sisters = (hnodent *)calloc(nodenum + 1, sizeof(hnodent)); if (pjob->ji_sisters == NULL) return; pjob->ji_sister_vnods = (vnodent *)calloc(nodenum + 1, sizeof(vnodent)); if (pjob->ji_sister_vnods == NULL) return; nhosts = 0; np = pjob->ji_sister_vnods; for (i = 0;i < nodenum;i++, np++) { char *dp, nodename[MAXPATHLEN + 1]; char *portptr, portnumber[MAXPORTLEN + 1]; int portcount; ix = 0; for (cp = nodestr, dp = nodename;*cp;cp++, dp++) { if (*cp == '/') { ix = atoi(cp + 1); while ((*cp != '\0') && (*cp != '+')) ++cp; if (*cp == '\0') { nodestr = cp; break; } } if (*cp == '+') { nodestr = cp + 1; break; } *dp = *cp; } *dp = '\0'; /* see if we already have this host */ for (j = 0;j < nhosts;++j) { if (strcmp(nodename, pjob->ji_sisters[j].hn_host) == 0) break; } /* Get the port number for this host */ for(cp = portstr, portptr = portnumber, portcount = 0;portcount < (MAXPORTLEN+1)&& *cp; cp++, portptr++, portcount++) { if (*cp == '+') { portstr = cp + 1; break; } *portptr = *cp; } *portptr = 0; hp = &pjob->ji_sisters[j]; if (j == nhosts) { /* need to add host to tn_host */ hp->hn_node = nhosts++; hp->hn_sister = SISTER_OKAY; hp->hn_host = strdup(nodename); hp->hn_port = atoi(portnumber); CLEAR_HEAD(hp->hn_events); } 
np->vn_node = i; /* make up node id */ np->vn_host = &pjob->ji_sisters[j]; np->vn_index = ix; if (LOGLEVEL >= 4) { sprintf(log_buffer, "%d: %s/%d", np->vn_node, np->vn_host->hn_host, np->vn_index); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); } } /* END for (i) */ np->vn_node = TM_ERROR_NODE; pjob->ji_sisters[nhosts].hn_node = TM_ERROR_NODE; pjob->ji_numsisternodes = nhosts; pjob->ji_numsistervnod = nodenum; if (LOGLEVEL >= 2) { sprintf(log_buffer, "job: %s numnodes=%d numvnod=%d", pjob->ji_qs.ji_jobid, nhosts, nodenum); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); } return; } /* END sister_job_nodes() */ int send_join_job_to_a_sister( job *pjob, int stream, eventent *ep, tlist_head phead, int node_id) { tcp_chan *chan = DIS_tcp_setup(stream); int ret = DIS_NOMALLOC; if (chan != NULL) { if ((ret = im_compose(chan, pjob->ji_qs.ji_jobid, pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str, IM_JOIN_JOB, ep->ee_event, TM_NULL_TASK)) != DIS_SUCCESS) { } else if ((ret = diswsi(chan, node_id)) != DIS_SUCCESS) { } else if ((ret = diswsi(chan, pjob->ji_numnodes)) != DIS_SUCCESS) { } else if ((ret = diswsi(chan, pjob->ji_portout)) != DIS_SUCCESS) { } else if ((ret = diswsi(chan, pjob->ji_porterr)) != DIS_SUCCESS) { } else { svrattrl *psatl = (svrattrl *)GET_NEXT(phead); if ((ret = encode_DIS_svrattrl(chan, psatl)) == DIS_SUCCESS) { ret = DIS_tcp_wflush(chan); } } DIS_tcp_cleanup(chan); } return(ret); } /* END send_join_job_to_a_sister() */ int send_join_job_to_sisters( job *pjob, int nodenum, tlist_head phead) { int i; int retry_count; int stream; int ret = PBSE_NONE; eventent *ep; hnodent *np; int send_failed_size = nodenum * sizeof(int); int *send_failed = (int *)calloc(nodenum, sizeof(int)); int unsent_count = nodenum - 1; bool permanent_fail = false; std::set sisters_contacted; errno = 0; memset(send_failed, -1, send_failed_size); for (i = 1; i < nodenum; i++) send_failed[i] = i; for (retry_count = 0; retry_count < 5 && 
permanent_fail == false; retry_count++) { if (unsent_count == 0) break; for (i = 1; i < nodenum; i++) { if (send_failed[i] == DIS_SUCCESS) continue; np = &pjob->ji_hosts[i]; log_buffer[0] = '\0'; ret = -1; stream = tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr,sizeof(np->sock_addr)); if (IS_VALID_STREAM(stream)) { ep = event_alloc(IM_JOIN_JOB, np, TM_NULL_EVENT, TM_NULL_TASK); ret = send_join_job_to_a_sister(pjob, stream, ep, phead, i); close(stream); } else if (stream == PERMANENT_SOCKET_FAIL) { permanent_fail = true; break; } if (ret == DIS_SUCCESS) { sisters_contacted.insert(i); send_failed[i] = DIS_SUCCESS; unsent_count--; } } /* END for each node */ } /* END for 5 retries */ if (unsent_count > 0) { snprintf(log_buffer, sizeof(log_buffer), "Failed to send join job to %d of %d sisters.", unsent_count, nodenum - 1); if (LOGLEVEL >= 3) { /* append the names of the sisters that failed */ int len; dynamic_string *ds = get_dynamic_string(-1, NULL); for (i = 1; i < nodenum; i++) { if (send_failed[i] != DIS_SUCCESS) { if (ds->used != 0) append_dynamic_string(ds, ", "); append_dynamic_string(ds, pjob->ji_hosts[i].hn_host); } } len = strlen(log_buffer); snprintf(log_buffer + len, sizeof(log_buffer) - len, " The list of nodes it failed to contact was %s", ds->str); free_dynamic_string(ds); } log_err(errno, __func__, log_buffer); exec_bail(pjob, JOB_EXEC_RETRY, &sisters_contacted); ret = PBSE_CANTCONTACTSISTERS; } else ret = PBSE_NONE; free(send_failed); return(ret); } /* END send_join_job_to_sisters() */ int generate_cookie( job *pjob) { char *tt; extern time_t loopcnt; MD5_CTX c; int i; char log_buffer[LOG_BUF_SIZE]; if (!(pjob->ji_wattr[JOB_ATR_Cookie].at_flags & ATR_VFLAG_SET)) { if ((tt = (char *)calloc(1, JOB_COOKIE_SIZE)) == NULL) { log_err(ENOMEM, __func__, "cannot alloc memory"); sprintf(log_buffer, "memory allocation failure. 
job id %s", pjob->ji_qs.ji_jobid); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); exec_bail(pjob, JOB_EXEC_FAIL1); return(-1); } pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str = tt; pjob->ji_wattr[JOB_ATR_Cookie].at_flags |= ATR_VFLAG_SET; loopcnt++; MD5Init(&c); MD5Update(&c, (unsigned char *)&loopcnt, sizeof(loopcnt)); MD5Update(&c, (unsigned char *)pjob, sizeof(job)); MD5Final(&c); for (i = 0;i < 16;i++) { sprintf(&tt[i * 2], "%02X", c.digest[i]); } DBPRT(("===== MD5 %s\n", tt)) } /* END if () */ return(PBSE_NONE); } /* END generate_cookie */ #ifdef PENABLE_LINUX26_CPUSETS #ifndef NUMA_SUPPORT void create_cpuset_reservation_if_needed( job &pjob) { /* create the cpuset reservation before forking - only do this for cpuset builds that * aren't built for NUMA and don't have a geometry request */ resource *presc = NULL; resource_def *prd = NULL; prd = find_resc_def(svr_resc_def, "procs_bitmap", svr_resc_size); presc = find_resc_entry(&pjob.ji_wattr[JOB_ATR_resource],prd); if ((presc == NULL) || (presc->rs_value.at_flags & ATR_VFLAG_SET) == FALSE) { /* this means there is no geometry request */ long long mem_requested = get_memory_requested_in_kb(pjob); int cpu_count = get_cpu_count_requested_on_this_node(pjob); // make sure the memory is evenly set over the job. 
double mem_pcnt = ((double)cpu_count) / pjob.ji_numvnod; mem_requested = mem_requested * mem_pcnt; internal_layout.reserve(cpu_count, mem_requested, pjob.ji_qs.ji_jobid); } } void recover_cpuset_reservation( job &pjob) { /* recover the cpuset reservation of running jobs * only do this for cpuset builds that aren't built for NUMA and don't * have a geometry request */ resource *presc = NULL; resource_def *prd = NULL; prd = find_resc_def(svr_resc_def, "procs_bitmap", svr_resc_size); presc = find_resc_entry(&pjob.ji_wattr[JOB_ATR_resource],prd); if ((presc == NULL) || (presc->rs_value.at_flags & ATR_VFLAG_SET) == FALSE) { /* this means there is no geometry request */ long long mem_requested = get_memory_requested_in_kb(pjob); int cpu_count = get_cpu_count_requested_on_this_node(pjob); // make sure the memory is evenly set over the job. double mem_pcnt = ((double)cpu_count) / pjob.ji_numvnod; mem_requested = mem_requested * mem_pcnt; internal_layout.recover_reservation(cpu_count, mem_requested, pjob.ji_qs.ji_jobid); } } #endif #endif /** * Start execution of a job. * * @see req_commit() - parent - handle 'commit' request from pbs_server * * @see TMomFinalizeJob1() - child * @see TMomFinalizeJob2() - child * @see TMomFinalizeJob3() - child * * @see start_process() - peer - called for sisters after receiving IM_SPAWN_TASK request * * NOTE: prior to calling start_exec, job is newly allocated, and added to * svr_alljobs w/pjob->ji_qs.ji_state = JOB_STATE_RUNNING and * pjob->ji_qs.ji_substate = JOB_SUBSTATE_PRERUN. * * NOTE: For parallel jobs, this routine will open connections to each sister * mom and will send a join_job request along each connection. 
*/ int start_exec( job *pjob) /* I (modified) */ { int nodenum; int ret; #ifndef NUMA_SUPPORT int i; int j; int local_errno; int addr_len; hnodent *np; pbs_attribute *pattr; tlist_head phead; int mom_radix = 0; struct radix_buf **sister_list; struct timeval start_time; struct timezone tz; struct timeval tv; struct timeval *tv_attr; struct timeval result; #endif /* ndef NUMA_SUPPORT */ char tmpdir[MAXPATHLEN]; /* Step 1.0 Generate Cookie */ if (generate_cookie(pjob) != PBSE_NONE) { /* couldn't allocate memory */ return(PBSE_MEM_MALLOC); } /* Step 2.0 Initialize Job */ /* update nodes info w/in job based on exec_hosts pbs_attribute */ job_nodes(pjob); /* start_exec only executed on mother superior */ pjob->ji_nodeid = 0; /* I'm MS */ nodenum = pjob->ji_numnodes; /* Step 3.0 Validate/Initialize Environment */ /* check creds early because we need the uid/gid for TMakeTmpDir() */ if (!check_pwd(pjob)) { sprintf(log_buffer, "bad credentials: job id %s", pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buffer); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); exec_bail(pjob, JOB_EXEC_FAIL1); return(PBSE_BADUSER); } /* should we make a tmpdir? 
*/ if (TTmpDirName(pjob, tmpdir, sizeof(tmpdir))) { if ((ret = TMakeTmpDir(pjob, tmpdir)) != PBSE_NONE) { snprintf(log_buffer, sizeof(log_buffer), "cannot create temp dir '%s', job id %s", tmpdir, pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buffer); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); exec_bail(pjob, JOB_EXEC_FAIL1); return(ret); } } #ifdef PENABLE_LINUX26_CPUSETS #ifndef NUMA_SUPPORT create_cpuset_reservation_if_needed(*pjob); #endif #endif /* if nodecount > 1, return once joins are sent, if nodecount == 1, return once job is started */ #ifndef NUMA_SUPPORT if ((pjob->ji_wattr[JOB_ATR_job_radix].at_flags & ATR_VFLAG_SET) && (pjob->ji_wattr[JOB_ATR_job_radix].at_val.at_long != 0)) { /* parallel job */ mom_radix = pjob->ji_wattr[JOB_ATR_job_radix].at_val.at_long; } pjob->ji_radix = mom_radix; /* this starts tracking total run time for the MOM */ pattr = &pjob->ji_wattr[JOB_ATR_total_runtime]; if (gettimeofday(&start_time, &tz) == 0) { pattr->at_val.at_timeval.tv_sec = start_time.tv_sec; pattr->at_val.at_timeval.tv_usec = start_time.tv_usec; } /* If the job_radix pbs_attribute has been set then nodenum must be at least one more than mom_radix or there is no point in doing a radix */ if ((mom_radix > 0) && ((mom_radix + 1) <= nodenum) && (is_login_node == FALSE)) { pjob->ji_resources = (noderes *)calloc(nodenum - 1, sizeof(noderes)); assert(pjob->ji_resources != NULL); pjob->ji_resources[0].nr_cput = 0; pjob->ji_resources[0].nr_mem = 0; pjob->ji_resources[0].nr_vmem = 0; pjob->ji_joins_sent = time(NULL); if ((ret = allocate_demux_sockets(pjob, MOTHER_SUPERIOR)) != PBSE_NONE) return(ret); CLEAR_HEAD(phead); pattr = pjob->ji_wattr; /* prepare the attributes to go out on the wire. 
at_encode does this */ for (i = 0;i < JOB_ATR_LAST;i++) { (job_attr_def + i)->at_encode( pattr + i, &phead, (job_attr_def + i)->at_name, NULL, ATR_ENCODE_MOM, ATR_DFLAG_ACCESS); } /* END for (i) */ attrl_fixlink(&phead); pjob->ji_sisters = NULL; pjob->ji_numsisternodes = 0; /* Parse nodes into the radix */ /* First mother superior needs to keep track of the sisters that will be in her first job_radix level */ /* create list of sisters for mother superiors radix. * This list will include mother superior and a list * of hosts equal to the size of the job_radix. */ sister_list = allocate_sister_list(mom_radix+1); for (i = 0; i <= mom_radix; i++) { char *host_addr = NULL; unsigned short af_family; np = &pjob->ji_hosts[i]; add_host_to_sister_list(np->hn_host, np->hn_port, sister_list[0]); ret = get_hostaddr_hostent_af(&local_errno, np->hn_host, &af_family, &host_addr, &addr_len); memmove(&np->sock_addr.sin_addr, host_addr, addr_len); np->sock_addr.sin_port = htons(np->hn_port); np->sock_addr.sin_family = af_family; free(host_addr); } sister_job_nodes(pjob, sister_list[0]->host_list, sister_list[0]->port_list); free_sisterlist(sister_list, mom_radix + 1); /* The first element in the sister list will be the originator of the IM_JOIN_JOB_RADIX request. When the IM_OK_REPLY is received back the intermediate mothers need to know who called them so they can reply. This will always be the first sister in the list */ /* now allocate sister list for all the sisters */ sister_list = allocate_sister_list(mom_radix); np = &pjob->ji_hosts[0]; /* This is mother superior. Mother superior will be the first sister in the list */ for (j = 0; j < mom_radix; j++) { add_host_to_sister_list(np->hn_host, np->hn_port, sister_list[j]); } i = 1; /* Mother superior was the first entry, now start with the sisters */ do { for (j = 0; j < mom_radix && i < nodenum; j++) { /* Generate a list of sisters divided in to 'mom_radix' number of lists. 
For example an exec_host list of host1+host2+host3+host4+host5+host6+host7 would create sister lists on a mom_radix of 3 like the following host1+host4+host7 host2+host5 host3+host6 */ np = &pjob->ji_hosts[i]; add_host_to_sister_list(np->hn_host, np->hn_port, sister_list[j]); i++; } } while (i < nodenum); /* the sister lists have been made. Now contact the intermediate moms as designated by mom_radix */ open_tcp_stream_to_sisters( pjob, IM_JOIN_JOB_RADIX, -1, mom_radix, pjob->ji_hosts, sister_list, &phead, MOTHER_SUPERIOR); free_attrlist(&phead); free_sisterlist(sister_list, mom_radix); } else if ((nodenum > 1) && (is_login_node == FALSE)) { /* Step 4.0A Send Join Request to Sisters */ /* parallel job */ pjob->ji_resources = (noderes *)calloc(nodenum - 1, sizeof(noderes)); assert(pjob->ji_resources != NULL); /* open a pair of sockets for pbs_demux used later */ if ((ret = allocate_demux_sockets(pjob, MOTHER_SUPERIOR)) != PBSE_NONE) { /* can't gather stdout/err for the job - FAIL */ return(ret); } CLEAR_HEAD(phead); pattr = pjob->ji_wattr; for (i = 0;i < JOB_ATR_LAST;i++) { (job_attr_def + i)->at_encode( pattr + i, &phead, (job_attr_def + i)->at_name, NULL, ATR_ENCODE_MOM, ATR_DFLAG_ACCESS); } /* END for (i) */ attrl_fixlink(&phead); if ((ret = send_join_job_to_sisters(pjob, nodenum, phead)) != DIS_SUCCESS) { /* couldn't contact all of the sisters, we've already bailed */ free_attrlist(&phead); return(ret); } pjob->ji_joins_sent = time(NULL); /* We made it to here. 
That means all of the sisters responded and we can now start the job */ if (LOGLEVEL >= 6) { if (gettimeofday(&tv, &tz) == 0) { tv_attr = &pjob->ji_wattr[JOB_ATR_total_runtime].at_val.at_timeval; timeval_subtract(&result, &tv, tv_attr); sprintf(log_buffer, "%s: total wire-up time for job %ld.%ld", __func__, result.tv_sec, result.tv_usec); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); } } free_attrlist(&phead); /* The job will execute on mother superior when all sister nodes have replied */ } /* END if (nodenum > 1) */ else #endif /* ndef NUMA_SUPPORT */ { /* Step 4.0B Launch Serial Task Locally */ /* serial job */ /* single node job - no sisters */ pjob->ji_porterr = -1; pjob->ji_portout = -1; pjob->ji_stdout = -1; pjob->ji_stderr = -1; if (exec_job_on_ms(pjob) == PBSE_NONE) { /* SUCCESS */ if (LOGLEVEL >= 3) { sprintf(log_buffer,"%s:job %s reported successful start on %d node(s)", __func__, pjob->ji_qs.ji_jobid, nodenum); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } } else { if (LOGLEVEL >= 3) { sprintf(log_buffer,"%s:job %s reported failure to start of %d node(s)", __func__, pjob->ji_qs.ji_jobid, nodenum); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } } } /* end else if mom_radix > 0 */ return(PBSE_NONE); } /* END start_exec() */ /* * fork_me - fork mom, close all other connections and set default signal actions */ pid_t fork_me( int conn) /* I */ { struct sigaction act; pid_t pid; fflush(stdout); fflush(stderr); pid = fork(); if (pid == 0) { /* now the child */ /* Turn off alarm if it should happen to be on */ alarm(0); /* Reset signal actions for most to SIG_DFL */ sigemptyset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = SIG_DFL; sigaction(SIGCHLD, &act, (struct sigaction *)0); #ifdef _CRAY sigaction(WJSIGNAL, &act, (struct sigaction *)0); #endif /* _CRAY */ sigaction(SIGHUP, &act, (struct sigaction *)0); sigaction(SIGINT, &act, (struct sigaction *)0); 
sigaction(SIGTERM, &act, (struct sigaction *)0); /* reset signal mask */ sigprocmask(SIG_SETMASK, &act.sa_mask, NULL); mom_close_poll(); /* NOTE: close logfile, lockfile, and connection to server (NYI) */ if (lockfds >= 0) { close(lockfds); lockfds = -1; } log_close(0); net_close(conn); /* close all but for the current */ /* release mlock; it seems to be inherited even though the * man page claims otherwise */ #ifdef _POSIX_MEMLOCK munlockall(); #endif /* _POSIX_MEMLOCK */ } else if (pid < 0) { log_err(errno, "fork_me", "fork failed"); } return(pid); } /* END fork_me() */ /* * starter_return - return starter value, * exit if negative */ void starter_return( int upfds, /* I */ int downfds, /* I */ int code, /* I */ struct startjob_rtn *sjrtn) /* I */ { struct startjob_rtn ack; int i; int alarmsecs = 0; char rsv_id[MAXLINE]; sjrtn->sj_code = code; if (write_ac_socket(upfds, (char *)sjrtn, sizeof(*sjrtn)) == -1) { } if (code < 0) close(upfds); /* * Wait for acknowledgement. Need to allow for a timeout. If it takes a while * to start the job which includes running prologues then the mom we are * communicating with may have timed out and gone into a recheck mode using * TMOMScanForStarting(). This works fine until someone qdels the job before * this mom can reply. In this case TMOMScanForStarting() won't see this job * anymore and will not read what we just wrote and we will hang forever on * the read. If we timed out we should probably exit instead of continuing on * and trying to start the job which was deleted. 
*/ do { alarm(120); i = read_ac_socket(downfds, &ack, sizeof(ack)); alarmsecs = alarm(0); if ((i == -1) && (errno != EINTR)) { break; } /* check for case where job has been qdel'd */ if ((i == -1) && (errno == EINTR) && (alarmsecs == 0) && (code == 0)) { close(downfds); exit(0); } } while (i < 0); close(downfds); if (code < 0) { /* for login nodes, release the reservation if one has been made */ if ((is_login_node == TRUE) && (sjrtn->sj_rsvid != 0)) { snprintf(rsv_id, sizeof(rsv_id), "%d", sjrtn->sj_rsvid); destroy_alps_reservation(rsv_id, apbasil_path, apbasil_protocol, 1); } exit(254); } return; } /* END starter_return() */ /* * remove_leading_hostname * * removes the leading hostname from the output path in jobpath * the path is stored as : * this retrieves just the path * * @param jobpath - the altered path, I/O * @return SUCCESS if hostname is removed, FALSE otherwise */ int remove_leading_hostname( char **jobpath) /* I / O */ { char *ptr; if ((jobpath == NULL) || (*jobpath == NULL)) { return(FAILURE); } ptr = strchr(*jobpath,':'); if (ptr == NULL) { return(FAILURE); } /* SUCCESS, move past the ':' and return the rest */ ptr++; *jobpath = ptr; return(SUCCESS); } /* END remove_leading_hostname() */ /* * std_file_name - generate the fully qualified path/name for a * job standard stream * * called by TMomFinalizeJob2 fork_me TMomFinalizeChild open_std_out_err open_std_file std_file_name */ char *std_file_name( job *pjob, /* I */ enum job_file which, /* I */ int *keeping) /* O (0 is no keep, 1 is keep) */ { static char path[MAXPATHLEN + 1]; char key; int len; char *pd; const char *suffix; char *jobpath = NULL; #ifdef QSUB_KEEP_NO_OVERRIDE char *pt; char endpath[MAXPATHLEN + 1]; #endif #if NO_SPOOL_OUTPUT == 0 int havehomespool = 0; extern char *TNoSpoolDirList[]; #else /* NO_SPOOL_OUTPUT */ struct stat myspooldir; static char path_alt[MAXPATHLEN + 1]; int rcstat; #endif /* NO_SPOOL_OUTPUT */ if (LOGLEVEL >= 5) { sprintf(log_buffer, "getting %s file name", (which 
== StdOut) ? "stdout" : "stderr"); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } if (keeping == NULL) { /* FAILURE */ return(NULL); } if ((pjob->ji_wattr[JOB_ATR_interactive].at_flags & ATR_VFLAG_SET) && (pjob->ji_wattr[JOB_ATR_interactive].at_val.at_long > 0)) { /* interactive job, name of pty is in outpath */ *keeping = 0; return(pjob->ji_wattr[JOB_ATR_outpath].at_val.at_str); } if (pjob->ji_grpcache == NULL) { /* FAILURE - ji_grpcache required for gc_homedir information */ return(NULL); } switch (which) { case StdOut: key = 'o'; suffix = JOB_STDOUT_SUFFIX; if (pjob->ji_wattr[JOB_ATR_outpath].at_flags & ATR_VFLAG_SET) { jobpath = pjob->ji_wattr[JOB_ATR_outpath].at_val.at_str; if (spoolasfinalname == TRUE) { remove_leading_hostname(&jobpath); if (expand_path(pjob, jobpath, sizeof(path), path) != SUCCESS) { return(NULL); } } } break; case StdErr: key = 'e'; suffix = JOB_STDERR_SUFFIX; if (pjob->ji_wattr[JOB_ATR_errpath].at_flags & ATR_VFLAG_SET) { jobpath = pjob->ji_wattr[JOB_ATR_errpath].at_val.at_str; if (spoolasfinalname == TRUE) { remove_leading_hostname(&jobpath); if (expand_path(pjob,jobpath,sizeof(path),path) != SUCCESS) { return(NULL); } } } break; case Checkpoint: default: key = '\001'; /* should never be found */ suffix = JOB_CHECKPOINT_SUFFIX; break; } /* END switch (which) */ /* everything that changes the path here is ignored if spoolasfinalname is set * to true. spoolasfinalname specifies directly spooling as the output file */ /* Is file to be kept?, if so, place the stderr/stdout files in the * path specified by the user. The path must be local to the execution node, * if a hostname is supplied, it will be stripped off. The only suppported * environment variable is $HOME--any other will cause files to be placed in * the home directory (the default location). 
*/ if ((pjob->ji_wattr[JOB_ATR_keep].at_flags & ATR_VFLAG_SET) && (strchr(pjob->ji_wattr[JOB_ATR_keep].at_val.at_str, key))) { /* yes, it is to be kept */ if (spoolasfinalname == FALSE) { strcpy(path, pjob->ji_grpcache->gc_homedir); pd = strrchr(pjob->ji_wattr[JOB_ATR_jobname].at_val.at_str, '/'); if (pd == NULL) { pd = pjob->ji_wattr[JOB_ATR_jobname].at_val.at_str; strcat(path, "/"); } #ifdef QSUB_KEEP_NO_OVERRIDE /* don't do for checkpoint file names, only StdErr and StdOut */ if (strcmp(suffix, JOB_CHECKPOINT_SUFFIX) != 0) { pt = strstr(jobpath, "$HOME"); if (pt != NULL) { strcpy(endpath, pt + 5); strcpy(pt, pjob->ji_grpcache->gc_homedir); strcat(jobpath, endpath); } if ((strstr(jobpath, pd) == NULL) && (strchr(jobpath, '$') == NULL)) { if (jobpath[strlen(jobpath) - 1] != '/') { strcat(jobpath, "/"); } pt = strchr(jobpath, ':'); if (pt == NULL) { strcpy(path, jobpath); } else { strcpy(path, pt + 1); } } } #endif strcat(path, pd); /* start with the job name */ len = strlen(path); *(path + len++) = '.'; /* the dot */ *(path + len++) = key; /* the letter */ pd = pjob->ji_qs.ji_jobid; /* the seq_number */ while (isdigit((int)*pd)) *(path + len++) = *pd++; *(path + len) = '\0'; } /* END if (spoolasfinalname == FALSE) */ *keeping = 1; } else { /* don't bother keeping output if the user actually wants to discard it */ if ((jobpath != NULL) && (*jobpath != '\0')) { char *ptr; if ((ptr = strchr(jobpath, ':')) != NULL) { jobpath = ptr + 1; } if (!strcmp(jobpath, "/dev/null")) { strcpy(path, "/dev/null"); *keeping = 1; return(path); } } /* put into spool directory unless NO_SPOOL_OUTPUT is defined */ #if NO_SPOOL_OUTPUT == 1 if (spoolasfinalname == FALSE) { /* force all output to user's HOME */ snprintf(path, sizeof(path), "%s", pjob->ji_grpcache->gc_homedir); /* check for $HOME/.pbs_spool */ /* if it's not a directory, just use $HOME us usual */ snprintf(path_alt, sizeof(path_alt), "%s/.pbs_spool/", path); if (setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, TRUE) == -1) { 
return(NULL); } rcstat = stat(path_alt, &myspooldir); setuid_ext(pbsuser, TRUE); if ((rcstat == 0) && (S_ISDIR(myspooldir.st_mode))) snprintf(path, sizeof(path), "%s", path_alt); else strncat(path, "/", sizeof(path) - 1); *keeping = 1; } /* END if (spoolasfinalname == FALSE) */ #else /* NO_SPOOL_OUTPUT */ if (spoolasfinalname == FALSE) { if ((TNoSpoolDirList[0] != NULL)) { int dindex; char *wdir; wdir = get_job_envvar(pjob, "PBS_O_WORKDIR"); if (LOGLEVEL >= 10) { sprintf(log_buffer, "wdir: %s", wdir); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } if (wdir != NULL) { /* check if job's work dir matches the no-spool directory list */ if (LOGLEVEL >= 10) log_ext(-1, __func__, "inside wdir != NULL", LOG_DEBUG); for (dindex = 0;dindex < TMAX_NSDCOUNT;dindex++) { if (TNoSpoolDirList[dindex] == NULL) break; if (!strcasecmp(TNoSpoolDirList[dindex], "$WORKDIR") || !strcmp(TNoSpoolDirList[dindex], "*")) { havehomespool = 1; if (LOGLEVEL >= 10) log_ext(-1, __func__, "inside !strcasecmp", LOG_DEBUG); snprintf(path, sizeof(path), "%s", wdir); break; } if (!strncmp(TNoSpoolDirList[dindex], wdir, strlen(TNoSpoolDirList[dindex]))) { havehomespool = 1; if (LOGLEVEL >= 10) log_ext(-1, __func__, "inside !strncmp", LOG_DEBUG); snprintf(path, sizeof(path), "%s", wdir); break; } } /* END for (dindex) */ } /* END if (wdir != NULL) */ } /* END if (TNoSpoolDirList != NULL) */ if (havehomespool == 0) { snprintf(path, sizeof(path), "%s", path_spool); } else { strncat(path, "/", sizeof(path) - strlen(path) - 1); } } /* END if (spoolasfinalname == FALSE) */ *keeping = 0; #endif /* NO_SPOOL_OUTPUT */ if (spoolasfinalname == FALSE) { strncat(path, pjob->ji_qs.ji_fileprefix, (sizeof(path) - strlen(path) - 1)); strncat(path, suffix, (sizeof(path) - strlen(path) - 1)); if (LOGLEVEL >= 10) { sprintf(log_buffer, "path: '%s' prefix: '%s' suffix: '%s'", path, pjob->ji_qs.ji_fileprefix, suffix); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } } /* END if (spoolasfinalname == FALSE) */ } /* END else 
((pjob->ji_wattr[JOB_ATR_keep].at_flags & ...)) */ return(path); } /* END std_file_name() */ /** * open_std_file - open/create either standard output or standard error * for the job. * * NOTE: called by pbs_mom child - cannot log to mom log file - use * log_err to report to syslog * * RETURN: -1 on failure, -2 on timeout, or file descriptor on success */ int open_std_file( job *pjob, /* I */ enum job_file which, /* which file */ int mode, /* file mode */ gid_t exgid) /* gid for file */ { int fds; int keeping; /* boolean: 1=TRUE, 0=FALSE */ char *path; int old_umask = 0; char local_buf[LOCAL_LOG_BUF_SIZE + 1]; int changed_to_user = FALSE; int rc; struct stat statbuf; if ((path = std_file_name(pjob, which, &keeping)) == NULL) { log_err(-1, __func__, "cannot determine filename"); /* FAILURE - cannot determine filename */ return(-1); } /* become user to create file, if we aren't already the user. In * run_pelog setuid etc. are called and the this function is invoked, * so doing this again fails and is unnecessary */ if (LOGLEVEL > 7) { sprintf(log_buffer, "job %s which = %d getuid() = %d geteuid = %d to euid = %d", pjob->ji_qs.ji_jobid, which, getuid(), geteuid(), pjob->ji_qs.ji_un.ji_momt.ji_exuid); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } #ifdef __CYGWIN__ if (IamRoot() == 1) #else if ((getuid() == 0) && (geteuid() != pjob->ji_qs.ji_un.ji_momt.ji_exuid)) #endif { if (setgroups(pjob->ji_grpcache->gc_ngroup,(gid_t *)pjob->ji_grpcache->gc_groups) != PBSE_NONE) { snprintf(log_buffer,sizeof(log_buffer), "setgroups failed for UID = %lu, error: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); log_err(errno, __func__, log_buffer); } if (setegid(pjob->ji_qs.ji_un.ji_momt.ji_exgid) != PBSE_NONE) { snprintf(log_buffer, sizeof(log_buffer), "setegid(%lu) failed for UID = %lu, error: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exgid, (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); log_err(errno, __func__, log_buffer); 
return(-1); } if (setuid_ext(pjob->ji_qs.ji_un.ji_momt.ji_exuid, TRUE) != PBSE_NONE) { snprintf(log_buffer, sizeof(log_buffer), "seteuid(%lu) failed, error: %s\n", (unsigned long)pjob->ji_qs.ji_un.ji_momt.ji_exuid, strerror(errno)); log_err(errno, __func__, log_buffer); setegid(pbsgroup); return(-1); } changed_to_user = TRUE; } /* these checks are a bit complicated. If keeping, we do what the user * says. Otherwise, make sure we aren't following a symlink and that the user owns the file without breaking /dev/null. */ if (keeping) { mode &= ~O_EXCL; } else { if (lstat(path, &statbuf) == 0) { /* lstat succeeded */ if (S_ISLNK(statbuf.st_mode)) { log_err(-1, __func__, "std file is symlink, someone is doing something fishy"); goto reset_ids_fail; } if (S_ISREG(statbuf.st_mode)) { if (statbuf.st_uid != pjob->ji_qs.ji_un.ji_momt.ji_exuid) { snprintf(local_buf, LOCAL_LOG_BUF_SIZE,"std file exists with the wrong owner, userid: %d owner: %d. someone is doing something fishy", pjob->ji_qs.ji_un.ji_momt.ji_exuid, statbuf.st_uid); log_err(-1, __func__, local_buf); goto reset_ids_fail; } if ((statbuf.st_gid != exgid) && (statbuf.st_gid != 0)) { int i; int equal = FALSE; /* check all of the secondary groups before throwing an error */ for (i = 0; i < pjob->ji_grpcache->gc_ngroup; i++) { if (statbuf.st_gid == (gid_t)pjob->ji_grpcache->gc_groups[i]) equal = TRUE; } if (equal == FALSE) { #ifdef RESETGROUP /* Change the group of the file to be the user's group */ if (chown(path, pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) { log_err(errno, __func__, "std file exists with the wrong group, someone is doing something fishy, cannot change file group"); goto reset_ids_fail; } #else log_err(-1, __func__, "std file exists with the wrong group, someone is doing something fishy"); goto reset_ids_fail; #endif } } } /* seems reasonably safe to append to the existing file */ /* file exists - do not need to create or open exclusive */ mode &= ~(O_EXCL | 
O_CREAT); } /* END if (lstat(path,&statbuf) == 0) */ else { /* lstat failed - should we return failure in all cases? */ if (errno == EINTR) { sprintf(log_buffer, "cannot stat stdout/stderr file '%s' (timeout)", path); if (LOGLEVEL >= 6) log_err(errno, __func__, log_buffer); /* fail on timeout */ goto reset_ids_timeout; } else { sprintf(log_buffer, "cannot stat stdout/stderr file '%s' - file does not exist, will create", path); if (LOGLEVEL >= 6) log_ext(errno, __func__, log_buffer, LOG_DEBUG); } } } /* END else (keeping) */ if (pjob->ji_wattr[JOB_ATR_umask].at_flags & ATR_VFLAG_SET) { old_umask = umask(pjob->ji_wattr[JOB_ATR_umask].at_val.at_long); } /* open file */ fds = open(path, mode, 0666); if (fds == -1) { /* errno can change in functions called between here and the if check below */ int local_errno = errno; sprintf(log_buffer, "cannot open/create stdout/stderr file '%s' (mode: %o, keeping: %s)", path, mode, (keeping == 0) ? "FALSE" : "TRUE"); log_err(local_errno, __func__, log_buffer); if (local_errno == ENOENT) { char *ptr; char tmpLine[MAXLINE]; int still_failed = TRUE; /* attempt to make the directory if it doesn't exist */ if (attempttomakedir == TRUE) { /* don't make the file a directory - make the last slash empty */ char *slash = path; while (strchr(slash + 1, '/') != NULL) slash = strchr(slash + 1, '/'); *slash = '\0'; mkdirtree(path,0755); /* undo the marking of the end of path as NULL */ *slash = '/'; if ((fds = open(path, mode, 0666)) != -1) { /* SUCCESS */ still_failed = FALSE; } } if (still_failed == TRUE) { /* parent directory does not exist - find out what part of subtree exists */ snprintf(tmpLine, sizeof(tmpLine), "%s", path); while ((ptr = strrchr(tmpLine, '/')) != NULL) { *ptr = '\0'; if (lstat(tmpLine, &statbuf) == 0) { /* lstat succeeded */ sprintf(log_buffer, "'%s' exists\n", tmpLine); break; } /* END if (lstat(tmpLine,&statbuf) == 0) */ else { /* lstat failed - should we return failure in all cases? 
*/ if (errno == EINTR) sprintf(log_buffer, "cannot stat stdout/stderr file '%s' (timeout)\n", tmpLine); else sprintf(log_buffer, "cannot stat stdout/stderr file '%s' - file does not exist\n", tmpLine); } } /* END while ((ptr = strrchr(tmpLine,'/')) != NULL) */ } /* END if (still_failed == TRUE) */ } /* END if (errno == ENOENT) */ } /* END if (fds == -1) */ if (old_umask) { umask(old_umask); } if (changed_to_user) { rc = setuid_ext(pbsuser, TRUE); if (rc != 0) { snprintf(log_buffer,sizeof(log_buffer), "seteuid(%lu) failed, error: %s\n", (unsigned long)pbsuser, strerror(errno)); log_err(errno, __func__, log_buffer); } setegid(pbsgroup); } if (fds == -1) { /* FAILURE - cannot open file */ if (errno == EINTR) { /* TIMEOUT */ return(-2); } } if (LOGLEVEL >= 4) { if (fds >= 0) { sprintf(log_buffer, "successfully created/opened stdout/stderr file '%s'", path); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } } return(fds); reset_ids_fail: if (changed_to_user) { setuid_ext(pbsuser, TRUE); setegid(pbsgroup); } return(-1); reset_ids_timeout: if (changed_to_user) { setuid_ext(pbsuser, TRUE); setegid(pbsgroup); } return(-2); } /* END open_std_file() */ /* * find_env_slot - find if the environment variable is already in the table, * If so, replease existing one with new one. */ static int find_env_slot( struct var_table *ptbl, char *pstr) { int i; int len = 1; /* one extra for '=' */ for (i = 0;(*(pstr + i) != '\0') && (*(pstr + i) != '=');++i) ++len; for (i = 0;i < ptbl->v_used;++i) { if (strncmp(ptbl->v_envp[i], pstr, len) == 0) { return(i); } } /* END for (i) */ return(-1); } /* END find_env_slot() */ /* * bld_env_variables - build up the array of environment variables which are * passed to the job. * * Value may be null if total string (name=value) is included in "name". 
*/ int bld_env_variables( struct var_table *vtable, /* I (modified) */ const char *name, /* I (required) */ const char *value) /* I (optional) */ { int amt; int i; int rc = PBSE_NONE; if (vtable->v_used == vtable->v_ensize) { /* no room for pointers to the strings */ if ((rc = expand_vtable(vtable)) == PBSE_NONE) { snprintf(log_buffer, sizeof(log_buffer), "Successfully expanded environment variables table"); log_ext(-1, __func__, log_buffer, LOG_INFO); } else { snprintf(log_buffer, sizeof(log_buffer), "Error in expanding environment variables table of pointers; err: %d", rc); log_err(-1, __func__, log_buffer); return rc; } } if ((name == NULL) || (name[0] == '\0')) { log_err(-1, "bld_env_variables", "invalid name passed"); if (LOGLEVEL >= 7) { log_err(-1, "bld_env_variables", "invalid name passed"); } return PBSE_BAD_PARAMETER; } if (LOGLEVEL >= 6) { char tmpLine[MAXLINE]; snprintf(tmpLine, sizeof(tmpLine), "building var '%s' (value: '%.64s')", name, (value != NULL) ? value : "NULL"); log_ext(-1, "bld_env_variables", tmpLine, LOG_DEBUG); } /* * We do not want the BATCH_PARTITION_ID to be passed down to the child. * It just needs to be checked for the job submitted by the user, not for * any jobs that the job might qsub. 
*/ if (memcmp(name,"BATCH_PARTITION_ID",strlen("BATCH_PARTITION_ID")) == 0) { return(PBSE_NONE); } amt = strlen(name) + 1; if (value != NULL) amt += strlen(value) + 1; /* plus 1 for "=" */ if (amt > vtable->v_bsize) { /* no room for string */ if ((rc = expand_vtable(vtable)) == PBSE_NONE) { snprintf(log_buffer, sizeof(log_buffer), "Successfully expanded environment variables table"); log_ext(-1, __func__, log_buffer, LOG_INFO); } else { snprintf(log_buffer, sizeof(log_buffer), "Error in expanding environment variables table; err: %d", rc); log_err(-1, __func__, log_buffer); return rc; } } strcpy(vtable->v_block, name); if (value != NULL) { strcat(vtable->v_block, "="); strcat(vtable->v_block, value); } if ((i = find_env_slot(vtable, vtable->v_block)) < 0) { *(vtable->v_envp + vtable->v_used++) = vtable->v_block; } else { *(vtable->v_envp + i) = vtable->v_block; } vtable->v_block += amt; vtable->v_bsize -= amt; return PBSE_NONE; } /* END bld_env_variables() */ /* expand_vtable is called when either the array of character pointers in vtable was filled or the block of memory used to store the env. variables was full. 
While in this function, it checks to see if either one of the other does require the reallocation by checking its threshold */ int expand_vtable( struct var_table *vtable) { int expand_ensize = 0; /* boolean to check on array of pointers */ int expand_bsize = 0; /* boolean to check on the block of memory storage */ int amt = 0; struct var_table tmp_vtable; int rc = PBSE_NONE; if (vtable->v_ensize - vtable->v_used < EN_THRESHOLD) expand_ensize = 1; if (vtable->v_bsize < B_THRESHOLD) expand_bsize = 1; memset(&tmp_vtable, 0, sizeof(struct var_table)); if (expand_ensize) tmp_vtable.v_ensize = vtable->v_ensize + EN_THRESHOLD; else tmp_vtable.v_ensize = vtable->v_ensize; /* tmp holder for data copying */ tmp_vtable.v_envp = (char **)calloc(tmp_vtable.v_ensize, sizeof(char *)); if (!tmp_vtable.v_envp) { sprintf(log_buffer, "PBS: failed to allocate memory for v_envp: %s\n", strerror(errno)); log_err(errno, __func__, log_buffer); return -1; } tmp_vtable.v_used = vtable->v_used; if (expand_bsize) { amt = EXTRA_VARIABLE_SPACE + (vtable->v_block - vtable->v_block_start) + vtable->v_bsize; tmp_vtable.v_block_start = (char *)calloc(1, amt); tmp_vtable.v_block = tmp_vtable.v_block_start; tmp_vtable.v_bsize = amt; if (!tmp_vtable.v_block_start) { sprintf(log_buffer, "PBS: failed to allocate memory for v_bsize: %s\n", strerror(errno)); log_err(errno, __func__, log_buffer); return -1; } } if ((rc = copy_data(&tmp_vtable, vtable, expand_bsize, expand_ensize) != PBSE_NONE)) { if (tmp_vtable.v_block_start) free(tmp_vtable.v_block_start); if (tmp_vtable.v_envp) free(tmp_vtable.v_envp); } return(rc); } /* END expand_vtable() */ int copy_data( struct var_table *tmp_vtable, struct var_table *vtable, int expand_bsize, int expand_ensize) { char *p_next_block; int len_plus_one; int i; if (!expand_ensize && !expand_bsize ) return(PBSE_NONE); if (expand_ensize && (!expand_bsize)) { /* only the pointers have been expanded and therefore copy the existing values to the new storage of pointers */ 
for (i = 0; i < vtable->v_used; ++i) *(tmp_vtable->v_envp + i) = *(vtable->v_envp + i); /* free the old storage and assign the new one */ free(vtable->v_envp); vtable->v_envp = tmp_vtable->v_envp; vtable->v_ensize = tmp_vtable->v_ensize; } else if (expand_bsize) { /* block of memory that contains the actual env. variables was reallocated */ p_next_block = tmp_vtable->v_block_start; for (i = 0; i < vtable->v_used; ++i) { len_plus_one = strlen(*(vtable->v_envp + i)) + 1; /* following condition is reached only for a non-null terminated variable */ if (len_plus_one > tmp_vtable->v_bsize) { sprintf(log_buffer, "PBS: failed to copy env var, size: %d space left in buf: %d\n", len_plus_one, tmp_vtable->v_bsize); log_err(errno, __func__, log_buffer); return(-1); } strcpy(p_next_block, *(vtable->v_envp + i)); *(tmp_vtable->v_envp + i) = p_next_block; p_next_block += len_plus_one; tmp_vtable->v_bsize -= len_plus_one; } /*free the old memory block */ vtable->v_bsize = tmp_vtable->v_bsize; vtable->v_block = p_next_block; free(vtable->v_block_start); vtable->v_block_start = tmp_vtable->v_block_start; if (expand_ensize) { /* if the pointers to the env. variables were reallocated adjust vtable->envp and free the old storage of those pointers */ free(vtable->v_envp); vtable->v_envp = tmp_vtable->v_envp; vtable->v_ensize = tmp_vtable->v_ensize; } else { /* copy the new location. Note all memory that had been allocated to tmp_vtable will be freed in the routine where they've been allocated */ for (i = 0; i < vtable->v_used; ++i) *(vtable->v_envp + i) = *(tmp_vtable->v_envp + i); } } return(PBSE_NONE); } /* END copy_data() */ #ifndef __TOLDGROUP /* * init_groups - build the group list via an LDAP friendly method */ int init_groups( char *pwname, /* I User's name */ int pwgrp, /* I User's group from pw entry */ int groupsize,/* I size of the array, following argument */ int *groups) /* O ptr to group array, list build there */ { /* DJH Jan 2004. 
The original implementation looped over all groups looking for membership. Thats OK for /etc/groups, but thrashes LDAP if you're using that for groups in nsswitch.conf. Since there is an explicit LDAP backend to do initgroups (3) efficiently in nss_ldap (on Linux), lets use initgroups() to figure out the group membership. A little clunky, but not too ugly. */ /* return -1 on failure */ extern sigset_t allsigs; /* set up at the start of mom_main */ sigset_t savedset; int n; int nsaved; gid_t savedgroups[NGROUPS_MAX + 16]; /* plus one for the egid below */ gid_t momegid; int i; /* save current group access because we're about to overwrite it */ nsaved = getgroups(NGROUPS_MAX, savedgroups); if (nsaved < 0) { log_err(errno, __func__, "getgroups"); return(-1); } /* From the Linux man page: It is unspecified whether the effective group ID of the calling process is included in the returned list. (Thus, an application should also call getegid(2) and add or remove the resulting value.) */ momegid = getegid(); /* search for duplicates */ for (i = 0;i < nsaved;i++) { if (savedgroups[i] == momegid) break; } if (i >= nsaved) savedgroups[nsaved++] = momegid; if (pwgrp == 0) { /* Emulate the original init_groups() behaviour which treated gid==0 as a special case */ struct passwd *pwe = getpwnam_ext(pwname); if (pwe == NULL) { log_err(errno, __func__, "no such user"); return(-1); } pwgrp = pwe->pw_gid; } if (LOGLEVEL >= 4) { log_ext(-1, __func__, "pre-sigprocmask", LOG_DEBUG); } /* Block signals while we do this or else the signal handler might run with strange group access */ if (sigprocmask(SIG_BLOCK, &allsigs, &savedset) == -1) { log_err(errno, __func__, "sigprocmask(BLOCK)"); return(-1); } n = 0; if (initgroups(pwname, pwgrp) < 0) { log_err(errno, __func__, "initgroups"); n = -1; } else { n = getgroups(groupsize, (gid_t *)groups); } if (LOGLEVEL >= 4) { log_ext(-1, __func__, "post-initgroups", LOG_DEBUG); } /* restore state */ if (setgroups(nsaved, savedgroups) < 0) 
log_err(errno, __func__, "setgroups"); if (sigprocmask(SIG_SETMASK, &savedset, NULL) == -1) log_err(errno, __func__, "sigprocmask(SIG_SETMASK)"); return(n); } /* END init_groups() */ #else /* !__TOLDGROUP */ /* * init_groups - read the /etc/group file and build an array of * group memberships for user pwname. */ int init_groups( char *pwname, /* I User's name */ int pwgrp, /* I User's group from pw entry */ int groupsize, /* I size of the array, following argument */ int *groups) /* O ptr to group array, list build there */ { struct group *grp; int i; int n; n = 0; if (pwgrp != 0) *(groups + n++) = pwgrp; setgrent(); while ((grp = getgrent())) { if (grp->gr_gid == (gid_t)pwgrp) continue; for (i = 0;grp->gr_mem[i];i++) { if (!strcmp(grp->gr_mem[i], pwname)) { if (n == groupsize) { endgrent(); return(-1); } *(groups + n++) = grp->gr_gid; } } } /* END while (grp) */ endgrent(); return(n); } /* END init_groups() */ #endif /* !__TOLDGROUP */ /* * catchinter = catch death of writer child and/or shell child of interactive * When one dies, kill off the other; there is no mercy in this family. */ static void catchinter( int sig) /* I (not used) */ { int status; pid_t pid; pid = waitpid(-1, &status, WNOHANG); if (pid == 0) { return; } if (pid == writerpid) { kill(shellpid, SIGKILL); wait(&status); } else { kill(writerpid, SIGKILL); wait(&status); } mom_reader_go = 0; return; } /* END catchinter() */ /* * Look for a certain environment variable which has a port# which should * be opened on the MS to establish communication for one of the 3 stdio * streams. >=0 return is that valid fd, -1 means no env var found, * -2 means malformed env value or failure to connect. 
*/ static int search_env_and_open( const char *envname, /* I */ u_long ipaddr) /* I */ { int i; int len; len = strlen(envname); for (i = 0;i < vtable.v_used;i++) { if (!strncmp(vtable.v_envp[i], envname, len)) { const char *cp = vtable.v_envp[i] + len; char *cq; int fd, port; if (*cp++ != '=') break; /* empty, ignore it */ port = strtol(cp, &cq, 10); if (*cq) { sprintf(log_buffer, "improper value for %s", envname); log_err(errno, __func__, log_buffer); return(-2); } if ((fd = open_demux(ipaddr, port)) < 0) { sprintf(log_buffer, "failed connect to stdio on %s:%d", vtable.v_envp[i], port); log_err(errno, __func__, log_buffer); return(-2); } return(fd); } } /* END for (i) */ /* not found */ return(-1); } /* END search_env_and_open() */ int TMomCheckJobChild( pjobexec_t *TJE, /* I */ int Timeout, /* I (in seconds) */ int *Count, /* O (bytes read) */ int *RC) /* O (return code/errno) */ { int i; fd_set fdset; int rc; int read_size = sizeof(struct startjob_rtn); struct timeval timeout; /* NOTE: assume if anything is on pipe, everything is on pipe (may reasult in hang) */ /* block up to timeout, wait for child to complete indicating success/failure of job launch */ /* read returns the session id or error */ timeout.tv_sec = Timeout; timeout.tv_usec = 0; errno = 0; FD_ZERO(&fdset); FD_SET(TJE->jsmpipe[0], &fdset); rc = select( TJE->jsmpipe[0] + 1, &fdset, (fd_set *)NULL, (fd_set *)NULL, &timeout); if (rc <= 0) { /* TIMEOUT - data not yet available */ return(FAILURE); } if (socket_avail_bytes_on_descriptor(TJE->jsmpipe[0]) < read_size) { return FAILURE; } for (;;) { i = read_ac_socket(TJE->jsmpipe[0], (char *) &TJE->sjr, read_size); if ((i == -1) && (errno == EINTR)) continue; break; } *RC = errno; *Count = i; if (LOGLEVEL >= 4) { log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, (TJE->pjob != NULL) ? ((job *)TJE->pjob)->ji_qs.ji_jobid : "???", "task/session info loaded"); } return(SUCCESS); } /* END TMomCheckJobChild() */ #ifdef USEJOBCREATE /* * Get a job_id from the system. 
Return job id or -1 if error */ uint64_t get_jobid( char* pbs_jobid) { uint64_t job_id; #ifndef JOBFAKE job_id = job_create(0, getuid(), 0); #else static long fakejobid = 0x00000000ffffffff; job_id = fakejobid + rand(); #endif /* JOBFAKE */ if (job_id == JOB_FAIL) { if (LOGLEVEL >= 3) { sprintf(log_buffer, "Failed to get system job id for pbs job %s", pbs_jobid); log_err(-1, __func__, log_buffer); } return(job_id); } if (LOGLEVEL >= 7) { sprintf(log_buffer, "Got system job id = %lx(%ld) for pbs job %s", job_id, job_id, pbs_jobid); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } return(job_id); } /* END get_jobid() */ #endif /* USEJOBCREATE */ #ifdef ENABLE_CSA /* * Check to see if CSA in installed or turned on. Return 1 if it is good else 0 */ int check_csa_status( enum csa_chk_cmd chk_action) { #ifndef CSAFAKE #endif /* CSAFAKE */ int csa_stat = 0; #ifndef CSAFAKE struct csa_check_req ck_req; ck_req.ck_stat.am_id = (chk_action == IS_INSTALLED) ? ACCT_KERN_CSA : ACCT_DMD_WKMG; ck_req.ck_stat.am_status = ACS_OFF; ck_req.ck_stat.am_param = 0; if (csa_check(&ck_req) != 0) { if (errno != ENOSYS) { sprintf(log_buffer, "check_csa_status errno = %d\n", errno); log_err(-1, __func__, log_buffer); } return 0; } if (chk_action == IS_INSTALLED) { /* since the call to csa_check was successful, we know csa is installed */ csa_stat = 1; } else { csa_stat = (ck_req.ck_stat.am_status != ACS_ON) ? 
0 : 1; } #else csa_stat = 1; #endif /* CSAFAKE */ return(csa_stat); } /* * createWLMRec() Create Workload Management Record * Inputs: job_id - job ID * type - start or termination record * subtype - Subtype of record * prid - project ID * ash - Array session handle * code - account ID * reqid - Request ID * compCode - Request completion code * calls: csa_wracct() * Return 1 if it is good, otherwise 0 * */ int create_WLM_Rec( char *pbs_jobid, uint64_t job_id, int type, int subtype, uint64_t prid, uint64_t ash, int64_t compCode, int64_t reqid) { #ifndef CSAFAKE struct csa_wra_req cw; struct wkmgmtbs wkm; #endif /* CSAFAKE */ char rec_type[8]; char sub_type[8]; /* * First, create an workload management record */ if (type == WM_INIT) { strcpy(rec_type, "init"); if (subtype == WM_INIT_START) { strcpy(sub_type, "start"); } else if (subtype == WM_INIT_RESTART) { strcpy(sub_type, "restart"); } else if (subtype == WM_INIT_RERUN) { strcpy(sub_type, "rerun"); } else { sprintf(log_buffer, "WM_INIT bad sub type = %d for pbs job %s", subtype, pbs_jobid); log_err(-1, __func__, log_buffer); return 0; } } else if (type == WM_TERM) { strcpy(rec_type, "term"); if (subtype == WM_TERM_EXIT) { strcpy(sub_type, "exited"); } else if (subtype == WM_TERM_REQUEUE) { strcpy(sub_type, "requeue"); } else if (subtype == WM_TERM_HOLD) { strcpy(sub_type, "hold"); } else if (subtype == WM_TERM_RERUN) { strcpy(sub_type, "rerun"); } else if (subtype == WM_TERM_MIGRATE) { strcpy(sub_type, "migrate"); } else { sprintf(log_buffer, "WM_TERM bad sub type = %d for pbs job %s", subtype, pbs_jobid); log_err(-1, __func__, log_buffer); return 0; } } else if (type == WM_RECV) { strcpy(rec_type, "recv"); if (subtype == WM_RECV_NEW) { strcpy(sub_type, "new"); } else { sprintf(log_buffer, "WM_RECV bad sub type = %d for pbs job %s", subtype, pbs_jobid); log_err(-1, __func__, log_buffer); return 0; } } else { sprintf(log_buffer, "bad record type = %d for pbs job %s", type, pbs_jobid); log_err(-1, __func__, 
log_buffer); return 0; } #ifdef CSAFAKE if (LOGLEVEL >= 7) { sprintf(log_buffer, "Creating FAKE CSA workload management %s - %s record for: " "job_id = %lx, compCode = %d, pbs job %s", (char*)&rec_type, (char*)&sub_type, job_id, (int)compCode, pbs_jobid); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } #else if (LOGLEVEL >= 7) { sprintf(log_buffer, "Creating CSA workload management %s - %s record for: " "job_id = %llx, compCode = %d, pbs job %s", &rec_type, &sub_type, job_id, compCode, pbs_jobid); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } memset(&wkm, 0, sizeof(wkm)); /* fill in header data */ wkm.hdr.ah_magic = ACCT_MAGIC; wkm.hdr.ah_revision = REV_APP; wkm.hdr.ah_type = ACCT_DAEMON_WKMG; if (!getuid() || ! geteuid()) wkm.hdr.ah_flag |= ASU; /* set super user flag */ wkm.hdr.ah_size = sizeof(struct wkmgmtbs); /* fill in rest of needed data */ wkm.type = type; wkm.subtype = subtype; wkm.arrayid = 0; snprintf(wkm.serv_provider, sizeof(wkm.serv_provider), "TORQUE "); if ((wkm.time = time(NULL)) == (time_t) - 1) { sprintf(log_buffer, "error setting time, errno = %d - %s pbs job = %s", errno, strerror(errno), pbs_jobid); log_err(-1, __func__, log_buffer); return 0; } if ((wkm.enter_time = time(NULL)) == (time_t) - 1) { sprintf(log_buffer, "error setting INIT enter time, errno = %d - %s pbs job = %s", errno, strerror(errno), pbs_jobid); log_err(-1, __func__, log_buffer); return 0; } wkm.uid = getuid(); wkm.prid = prid; wkm.ash = ash; wkm.jid = job_id; /* * character fields need to be NULL terminated */ snprintf(wkm.machname, sizeof(wkm.machname), "TORQUE-MACHINE"); snprintf(wkm.reqname, sizeof(wkm.reqname), "TORQUE-REQUEST"); snprintf(wkm.quename, sizeof(wkm.quename), "TORQUE-QUEUE"); wkm.reqid = reqid; if (type == WM_TERM) { wkm.code = compCode; } /* * Fill in the CSA_WRACCT request structure for csa ioctl call */ memset(&cw, 0, sizeof(cw)); cw.wra_did = ACCT_DMD_WKMG; /* daemon ID */ cw.wra_len = sizeof(struct wkmgmtbs); /* length of app record */ cw.wra_jid = 
job_id; /* job Id from job create */ cw.wra_buf = (char *) & wkm; /* pointer to record */ if (csa_wracct(&cw)) { /* EINVAL is okay for a WM_TERM */ if ((type != WM_TERM) && (errno != EINVAL)) { sprintf(log_buffer, "error writing wkm %s record, errno=%d - %s pbs job = %s", &rec_type, errno, strerror(errno), pbs_jobid); log_err(-1, __func__, log_buffer); return 0; } } #endif /* CSAFAKE */ return 1; } /* * add_wkm_start - Add an start workload management record for a job */ void add_wkm_start( uint64_t job_id, char* pbs_jobid) { if (check_csa_status(IS_UP)) { /* check if we have a valid job id, if not just return */ if (job_id == JOB_FAIL) { if (LOGLEVEL >= 2) { sprintf(log_buffer, "%s called with bad job id = %lx for pbs job %s", __func__, job_id, pbs_jobid); log_err(-1, __func__, log_buffer); } return; } /* Add a workload management received record before the start */ if (create_WLM_Rec(pbs_jobid, job_id, WM_RECV, WM_RECV_NEW, 0, 0, 0, 0)) { if (LOGLEVEL >= 7) { sprintf(log_buffer, "Added CSA workload management WM_RECV for job id = %lx for pbs job %s", job_id, pbs_jobid); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } } else { if (LOGLEVEL >= 2) { sprintf(log_buffer, "Failed to add CSA workload management WM_RECV for job id = %lx for pbs job %s", job_id, pbs_jobid); log_err(-1, __func__, log_buffer); } return; } if (create_WLM_Rec(pbs_jobid, job_id, WM_INIT, WM_INIT_START, 0, 0, 0, 0)) { if (LOGLEVEL >= 7) { sprintf(log_buffer, "Added CSA workload management WM_INIT for job id = %lx for pbs job %s", job_id, pbs_jobid); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } } else if (LOGLEVEL >= 2) { sprintf(log_buffer, "Failed to add CSA workload management WM_INIT for job id = %lx for pbs job %s", job_id, pbs_jobid); log_err(-1, __func__, log_buffer); } } /* END if (check_csa_status(IS_UP)) */ return; } /* END add_wkm_start() */ /* * add_wkm_end - Add an end workload management record for a job */ void add_wkm_end( uint64_t job_id, int64_t comp_code, char *pbs_jobid) { 
if (check_csa_status(IS_UP)) { /* check if we have a valid job id, if not just return */ if (job_id == JOB_FAIL) { if (LOGLEVEL >= 2) { sprintf(log_buffer, "%s called with bad job id = %lx for pbs job %s", __func__, job_id, pbs_jobid); log_err(-1, __func__, log_buffer); } return; } if (create_WLM_Rec(pbs_jobid, job_id, WM_TERM, WM_TERM_EXIT, 0, 0, comp_code, 0)) { if (LOGLEVEL >= 7) { sprintf(log_buffer, "Added CSA workload management WM_TERM for job id = %lx for pbs job %s", job_id, pbs_jobid); log_ext(-1, __func__, log_buffer, LOG_DEBUG); } } else if (LOGLEVEL >= 2) { sprintf(log_buffer, "Failed to add CSA workload management WM_TERM for job id = %lx for pbs job %s", job_id, pbs_jobid); log_err(-1, __func__, log_buffer); } } return; } /* END add_wkm_end() */ #endif /* ENABLE_CSA */ /* * @param pjob - used to set up the user's environment if desired * @param path_in - the path that will be expanded * @param pathlen - viable space in path * @param path - where to save the new path */ int expand_path( job *pjob, /* I optional */ char *path_in, /* I */ int pathlen, /* I */ char *path) /* I */ { #ifndef HAVE_WORDEXP /* no need for expansion if this isn't defined */ return(SUCCESS); #else char **environ_old = environ; wordexp_t exp; if ((path_in == NULL) || (path == NULL)) { /* must have inputs and outputs */ return(FAILURE); } if (vtable.v_envp == NULL) { if (pjob != NULL) { InitUserEnv(pjob,NULL,NULL,NULL,NULL); *(vtable.v_envp + vtable.v_used) = NULL; environ = vtable.v_envp; } else { /* if there is no environment, there's no way to expand words */ path = path_in; return(SUCCESS); } } else { *(vtable.v_envp + vtable.v_used) = NULL; environ = vtable.v_envp; } /* expand the path */ switch (wordexp(path_in, &exp, WRDE_NOCMD | WRDE_UNDEF)) { case 0: /* success - allow if word count is 1 */ if (exp.we_wordc == 1) { snprintf(path,pathlen,"%s",exp.we_wordv[0]); wordfree(&exp); environ = environ_old; return(SUCCESS); } /* fall through */ case WRDE_NOSPACE: wordfree(&exp); 
/* fall through */ default: environ = environ_old; break; } /* END switch () */ /* not reached */ environ = environ_old; return(FAILURE); #endif /* HAVE_WORD_EXP */ } /* END expand_path() */ /* exec_job_on_ms starts the execution of a job on the mother superior node */ int exec_job_on_ms( job *pjob) { pjobexec_t *TJE; int Count; int RC; int SC; char log_buffer[LOG_BUF_SIZE]; TMOMJobGetStartInfo(NULL, &TJE); if (TMomFinalizeJob1(pjob, TJE, &SC) == FAILURE) { /* FAILURE (or at least do not continue) */ if (SC != 0) { memset(TJE, 0, sizeof(pjobexec_t)); sprintf(log_buffer, "job %s failed after TMomFinalizeJob1", pjob->ji_qs.ji_jobid); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); exec_bail(pjob, SC); } return(SC); } /* TMomFinalizeJob2() blocks until job is fully launched */ if (TMomFinalizeJob2(TJE, &SC) == FAILURE) { if (SC != 0) { memset(TJE, 0, sizeof(pjobexec_t)); sprintf(log_buffer, "job %s failed after TMomFinalizeJob2", pjob->ji_qs.ji_jobid); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); exec_bail(pjob, SC); } return(SC); } /* block, wait for child to complete indicating success/failure of job launch */ if (TMomCheckJobChild(TJE, TJobStartBlockTime, &Count, &RC) == FAILURE) { if (LOGLEVEL >= 3) { sprintf(log_buffer, "job not ready after %ld second timeout, MOM will check later", TJobStartBlockTime); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } return(SC); } /* NOTE: TMomFinalizeJob3() populates SC */ if (TMomFinalizeJob3(TJE, Count, RC, &SC) == FAILURE) { sprintf(log_buffer, "ALERT: job failed phase 3 start - jobid %s", pjob->ji_qs.ji_jobid); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); memset(TJE, 0, sizeof(pjobexec_t)); exec_bail(pjob, SC); return(SC); } /* SUCCESS: MOM returns */ memset(TJE, 0, sizeof(pjobexec_t)); if (LOGLEVEL >= 3) { sprintf(log_buffer, "%s:job successfully started", __func__); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, 
pjob->ji_qs.ji_jobid, log_buffer); } return(PBSE_NONE); } /* END exec_job_on_ms() */ int allocate_demux_sockets( job *pjob, int flag) { int i; int ret; int socks[2]; int ports[2]; struct sockaddr_in saddr; torque_socklen_t slen; char log_buffer[LOG_BUF_SIZE]; /* Open two sockets for use by demux program later. */ for (i = 0;i < 2;i++) socks[i] = -1; for (i = 0;i < 2;i++) { if ((socks[i] = socket(AF_INET, SOCK_STREAM, 0)) == -1) break; memset(&saddr, '\0', sizeof(saddr)); saddr.sin_addr.s_addr = INADDR_ANY; saddr.sin_family = AF_INET; if (bind( socks[i], (struct sockaddr *)&saddr, sizeof(saddr)) == -1) break; slen = sizeof(saddr); if (getsockname(socks[i], (struct sockaddr *)&saddr, &slen) == -1) break; ports[i] = (int)ntohs(saddr.sin_port); } /* END for (i) */ if (i < 2) { /* ERROR: cannot open sockets for stdout and stderr */ for (i = 0;i < 2;i++) { if (socks[i] != -1) close(socks[i]); } /* command sisters to abort job and continue */ log_err(errno, __func__, "stdout/err socket"); sprintf(log_buffer, "failed to open stdout/stderr - job id %s", pjob->ji_qs.ji_jobid); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); exec_bail(pjob, JOB_EXEC_FAIL1); ret = PBSE_CANTOPENSOCKET; goto done; } if (flag == MOTHER_SUPERIOR) { pjob->ji_stdout = socks[0]; pjob->ji_stderr = socks[1]; pjob->ji_portout = ports[0]; pjob->ji_porterr = ports[1]; } else { pjob->ji_im_stdout = socks[0]; pjob->ji_im_stderr = socks[1]; pjob->ji_im_portout = ports[0]; pjob->ji_im_porterr = ports[1]; } if (LOGLEVEL >= 3) { sprintf(log_buffer, "%s: stdout: %d:%d stderr: %d:%d", __func__, socks[0], ports[0], socks[1], ports[1]); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } ret = PBSE_NONE; done: return(ret); } /* END allocate_demux_sockets() */ /* END start_exec.c */