/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. 
Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. */ #include /* the master config generated by configure */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "dis.h" #include "libpbs.h" #include "pbs_error.h" #include "server_limits.h" #include "list_link.h" #include "credential.h" #include "attribute.h" #include "resource.h" #include "pbs_job.h" #include "batch_request.h" #include "mom_mach.h" #include "mom_func.h" #include "log.h" #include "rpp.h" #include "resmon.h" #include "net_connect.h" #include "utils.h" #ifdef _CRAY #include #endif #ifdef HAVE_WORDEXP #include extern struct var_table vtable; /* see start_exec.c */ extern char **environ; extern int InitUserEnv( job *pjob, /* I */ task *ptask, /* I (optional) */ char **envp, /* I (optional) */ struct passwd *pwdp, /* I (optional) */ char *shell); /* I (optional) */ extern int mkdirtree( char *dirpath, /* I */ mode_t mode); extern int TTmpDirName(job*, char *); #endif /* HAVE_WORDEXP */ #ifdef NVIDIA_GPUS int setgpumode(char *, int); int resetgpuecc(char *, int, int); extern void mom_server_all_update_gpustat(void); #endif /* NVIDIA_GPUS */ /* External Global Data Items */ extern unsigned int alarm_time; extern unsigned int default_server_port; extern int exiting_tasks; extern tlist_head svr_alljobs; extern char mom_host[]; extern 
char *msg_err_unlink; extern char *path_spool; extern char *path_undeliv; extern attribute_def job_attr_def[]; extern char *msg_jobmod; extern char *msg_manager; extern time_t time_now; extern int resc_access_perm; /* see encode_resc() */ extern int spoolasfinalname; #ifdef NVIDIA_GPUS extern int use_nvidia_gpu; #endif /* NVIDIA_GPUS */ /* in attr_fn_resc.c */ extern char MOMUNameMissing[]; extern int pbs_rm_port; extern char rcp_path[]; extern char rcp_args[]; extern char *TNoSpoolDirList[]; extern char path_checkpoint[]; /* Local Data Items */ static uid_t useruid; static gid_t usergid; static int ngroup; static int *groups; #if NO_SPOOL_OUTPUT == 0 static char *output_retained = "Output retained on that host in: "; #endif /* !NO_SPOOL_OUTPUT */ static char rcperr[MAXPATHLEN]; /* file to contain rcp error */ extern char PBSNodeMsgBuf[1024]; extern int LOGLEVEL; extern int im_compose(int, char *, char *, int, tm_event_t, tm_task_id); extern int mom_open_socket_to_jobs_server(job *, char *, void (*)(int)); extern int TMOMJobGetStartInfo(job *, pjobexec_t **) ; /* prototypes */ char *get_job_envvar(job *, char *); int replace_checkpoint_path(char *); int in_remote_checkpoint_dir(char *); /* loaded in mom_mach.h */ /* END prototypes */ char *get_job_envvar( job *pjob, /* I */ char *variable) /* I */ { char *pc; if (pjob == NULL) { return(NULL); } pc = arst_string( variable, &pjob->ji_wattr[(int)JOB_ATR_variables]); if (pc != NULL) { if ((pc = strchr(pc, (int)'=')) != NULL) pc++; } return(pc); } /* END get_job_envvar() */ /* * fork_to_user - fork mom and go to user's home directory * also sets up the global useruid and usergid in the child * * WARNING: valid only if called when preq points to a cpyfiles structure */ static pid_t fork_to_user( struct batch_request *preq, /* I */ int SetUID, /* I (boolean) */ char *HDir, /* O (job/user home directory) */ char *EMsg) /* I (optional,minsize=1024) */ { char *id = "fork_to_user"; struct group *grpp; pid_t pid; job *pjob; 
struct passwd *pwdp; static int fgrp[NGROUPS_MAX]; char *idir; char *hdir; struct stat sb; /* initialize */ if (EMsg != NULL) EMsg[0] = '\0'; if ((pjob = find_job(preq->rq_ind.rq_cpyfile.rq_jobid)) && (pjob->ji_grpcache != 0) && (preq->rq_ind.rq_cpyfile.rq_dir != CKPT_DIR_IN) && (preq->rq_ind.rq_cpyfile.rq_dir != CKPT_DIR_OUT)) { /* use information cached in the job structure */ useruid = pjob->ji_qs.ji_un.ji_momt.ji_exuid; usergid = pjob->ji_qs.ji_un.ji_momt.ji_exgid; ngroup = pjob->ji_grpcache->gc_ngroup; groups = pjob->ji_grpcache->gc_groups; if ((idir = get_job_envvar(pjob, "PBS_O_INITDIR")) != NULL) { hdir = idir; } else { hdir = pjob->ji_grpcache->gc_homedir; } } else { if ((pwdp = getpwnam_ext(preq->rq_ind.rq_cpyfile.rq_user)) == NULL) { if (MOMUNameMissing[0] == '\0') strncpy(MOMUNameMissing, preq->rq_ind.rq_cpyfile.rq_user, 64); sprintf(log_buffer, "cannot find user '%s' in password file", preq->rq_ind.rq_cpyfile.rq_user); if (EMsg != NULL) strncpy(EMsg, log_buffer, 1024); log_err(errno, id, log_buffer); return(-PBSE_BADUSER); } useruid = pwdp->pw_uid; if (preq->rq_ind.rq_cpyfile.rq_group[0] == '\0') { usergid = pwdp->pw_gid; /* default to login group */ } else if ((grpp = getgrnam(preq->rq_ind.rq_cpyfile.rq_group)) != NULL) { usergid = grpp->gr_gid; } else { sprintf(log_buffer, "cannot find group '%s' for user '%s' in password file", preq->rq_ind.rq_cpyfile.rq_group, preq->rq_ind.rq_cpyfile.rq_user); if (EMsg != NULL) strncpy(EMsg, log_buffer, 1024); log_err(errno, id, log_buffer); return(-PBSE_BADUSER); } ngroup = init_groups(pwdp->pw_name, usergid, NGROUPS_MAX, fgrp); if (ngroup < 0) ngroup = 0; groups = fgrp; /* determine user`s home directory */ if ((pjob != NULL) && ((idir = get_job_envvar(pjob, "PBS_O_INITDIR")) != NULL)) { hdir = idir; } else { hdir = pwdp->pw_dir; } } /* END if ((pjob = find_job(preq->rq_ind.rq_cpyfile.rq_jobid)) && ...) 
*/ if (hdir == NULL) { /* FAILURE */ log_err(PBSE_UNKRESC, id, "cannot determine home directory"); if (EMsg != NULL) strncpy(EMsg, "cannot determine home directory", 1024); return(-PBSE_UNKRESC); } /* check user home directory as root and log failures */ /* NOTE: root may not be able to see user home directory due to root_squash permissions but routine will test and log what is detected. Some issues will be logged by the parent but TORQUE will only fail if the problems persist in the child after the setuid() call */ if (stat(hdir, &sb) != 0) { sprintf(log_buffer, "invalid home directory '%s' specified, errno=%d (%s)", hdir, errno, strerror(errno)); if (LOGLEVEL >= 2) { log_err(errno, id, log_buffer); } if (EMsg != NULL) strncpy(EMsg, log_buffer, 1024); /* NOTE: warn only, root may not be able to stat directory */ /* return(-PBSE_UNKRESC); */ } else if (!S_ISDIR(sb.st_mode)) { sprintf(log_buffer, "invalid home directory '%s' specified, not a directory", hdir); log_err(PBSE_UNKRESC, id, log_buffer); if (EMsg != NULL) strncpy(EMsg, log_buffer, 1024); return(-PBSE_UNKRESC); } if (LOGLEVEL >= 1) { sprintf(log_buffer, "forking to user, uid: %ld gid: %ld homedir: '%s'", (long)useruid, (long)usergid, hdir); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pjob != NULL) ? 
pjob->ji_qs.ji_jobid : "N/A", log_buffer); } if (HDir != NULL) strcpy(HDir, hdir); pid = fork_me(preq->rq_conn); if (pid < 0) { /* fork failed */ sprintf(log_buffer, "forked failed, errno=%d (%s)", errno, strerror(errno)); log_err(-1, id, log_buffer); if (EMsg != NULL) strncpy(EMsg, log_buffer, 1024); return(-PBSE_SYSTEM); } if (pid > 0) { /* parent - note leave connection open */ free_br(preq); return(pid); } /* the child */ /* NOTE: writing to log as child may cause corruption */ #ifdef _CRAY if ((pjob = find_job(preq->rq_ind.rq_cpyfile.rq_jobid)) && (pjob->ji_grpcache != 0)) { /* set account id */ if (pjob->ji_wattr[(int)JOB_ATR_account].at_flags & ATR_VFLAG_SET) { acctid(0, nam2acid(pjob->ji_wattr[(int)JOB_ATR_account].at_val.at_str)); } } #endif /* _CRAY */ /* NOTE: only chdir now if SetUID is TRUE */ if (SetUID == TRUE) { if (setgroups(ngroup,(gid_t *)groups) != 0 || setgid(usergid) != 0) { /* unable to set user groups */ return(-PBSE_BADGRP); } if (setuid_ext(useruid, FALSE) == -1) { /* cannot run as the user */ return(-PBSE_BADUSER); } if (chdir(hdir) == -1) { /* cannot change directory to user home dir (or 'INITDIR' if specified) */ return(-PBSE_UNKRESC); } } #ifdef HAVE_WORDEXP { /* set some useful env variables */ char *envstr; envstr = malloc((strlen("HOME=") + strlen(hdir) + 1) * sizeof(char)); if (envstr == NULL) { sprintf(log_buffer, "malloc failed, errno=%d (%s)", errno, strerror(errno)); log_err(-1, id, log_buffer); if (EMsg != NULL) strncpy(EMsg, log_buffer, 1024); return(-PBSE_SYSTEM); } sprintf(envstr, "HOME=%s", hdir); putenv(envstr); envstr = malloc((strlen("PBS_JOBID=") + strlen(preq->rq_ind.rq_cpyfile.rq_jobid) + 1) * sizeof(char)); if (envstr == NULL) { sprintf(log_buffer, "malloc failed, errno=%d (%s)", errno, strerror(errno)); log_err(-1, id, log_buffer); if (EMsg != NULL) strncpy(EMsg, log_buffer, 1024); return(-PBSE_SYSTEM); } sprintf(envstr, "PBS_JOBID=%s", preq->rq_ind.rq_cpyfile.rq_jobid); putenv(envstr); } #endif /* END HAVE_WORDEXP 
*/ return(pid); } /* END fork_to_user() */ /* * add_bad_list - add bad file message to bad file list */ static void add_bad_list( char **pbl, char *newtext, int nl) { int needed = 0; char *pnew; if (*pbl != NULL) { needed += strlen(*pbl) + strlen(newtext) + nl + 1; pnew = realloc(*pbl, needed); } else { needed += strlen(newtext) + nl + 1; pnew = malloc(needed); if (pnew != NULL) *pnew = '\0'; } if (pnew == NULL) { /* cannot allocate memory - FAILURE */ return; } *pbl = pnew; while (nl--) /* prefix new-lines */ strcat(*pbl, "\n"); strcat(*pbl, newtext); return; } /* END add_bad_list() */ #define RT_BLK_SZ 4096 /* return 0 on failure */ static int return_file( job *pjob, enum job_file which, int sock, int remove_file) { int amt; char buf[RT_BLK_SZ]; int fds; char *filename; struct batch_request *prq; int rc = 0; int seq = 0; filename = std_file_name(pjob, which, &amt); /* amt is place holder */ /* We need to check for NULL which may be returned */ if (filename == NULL) { return(-1); } if (strcmp(filename, "/dev/null") == 0) { return(0); } fds = open(filename, O_RDONLY, 0); if (fds < 0) { return(0); } prq = alloc_br(PBS_BATCH_MvJobFile); if (prq == NULL) { /* no memory */ return(PBSE_SYSTEM); } strcpy(prq->rq_host, mom_host); strcpy(prq->rq_ind.rq_jobfile.rq_jobid, pjob->ji_qs.ji_jobid); while ((amt = read(fds, buf, RT_BLK_SZ)) > 0) { /* prq->rq_ind.rq_jobfile.rq_sequence = seq++; */ /* prq->rq_ind.rq_jobfile.rq_type = (int)which; */ /* prq->rq_ind.rq_jobfile.rq_size = amt; */ /* prq->rq_ind.rq_jobfile.rq_data = buf; */ DIS_tcp_setup(sock); if ((rc = encode_DIS_ReqHdr(sock, PBS_BATCH_MvJobFile, pbs_current_user)) || (rc = encode_DIS_JobFile(sock, seq++, buf, amt, pjob->ji_qs.ji_jobid, which)) || (rc = encode_DIS_ReqExtend(sock, NULL))) { break; } DIS_tcp_wflush(sock); if ((DIS_reply_read(sock, &prq->rq_reply) != 0) || (prq->rq_reply.brp_code != 0)) { close(fds); rc = -1; break; } } /* END while ((amt = read()) */ free_br(prq); close(fds); if (remove_file == TRUE && rc 
== 0) job_unlink_file(pjob, filename); return(rc); } /* END return_file() */ /* * wchost_match - wild card host name match * * return 1 if can"idate" matches master name * 0 if not a match * master name may be wild carded at beginning */ static int wchost_match( const char *can, /* I candidate */ const char *master) /* I */ { const char *pc; const char *pm; /* FORMAT: master = [*], candidate = */ /* start at end and work backwards */ pc = can + strlen(can) - 1; pm = master + strlen(master) - 1; while ((pc > can) && (pm > master)) { if (*pc != *pm) { /* no match */ return(0); } pc--; pm--; } /* comparison of one or both reached the start of the string */ if (pm == master) { if (*pm == '*') { /* reached master wild card, all candidates match */ return(1); } if ((pc == can) && (*pc == *pm)) { /* start of candidate reached, always match */ /* NOTE: this seems wrong (CRI) ie, can=dog.com master=*.bigdog.com */ return(1); } } /* no match */ return(0); } /* END wchost_match() */ /* Determine which local path to copy to if specified using $usecp */ static int told_to_cp( char *host, /* I */ char *oldpath, /* I */ char **newpath) /* O */ { int i; int nh; const char *id = "told_to_cp"; static char newp[MAXPATHLEN + 1]; char linkpath[MAXPATHLEN + 1]; int max_links; extern struct cphosts *pcphosts; for (max_links = 16;max_links > 0;max_links--) { for (nh = 0;nh < cphosts_num;nh++) { if (wchost_match(host, pcphosts[nh].cph_hosts)) { if (LOGLEVEL >= 5) { sprintf(log_buffer, "host '%s' pcphosts[%d].cph_hosts: %s", host, nh, pcphosts[nh].cph_hosts); log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, (char *)id, log_buffer); } i = strlen(pcphosts[nh].cph_from); if (strncmp(pcphosts[nh].cph_from, oldpath, i) == 0) { int nchars, link_size; nchars = snprintf(newp, sizeof(newp), "%s%s", pcphosts[nh].cph_to, oldpath + i); if (nchars >= (int)sizeof(newp)) { snprintf(log_buffer, sizeof(log_buffer), "too long string when transforming path '%s' to '%s%s'\n", oldpath, pcphosts[nh].cph_to, 
oldpath + i); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, (char *)id, log_buffer); return(0); } link_size = readlink((const char *)newp, linkpath, sizeof(linkpath) - 1); if (link_size == -1) { /* * Catching only too many symbolic links, bad buffer * location and insufficient kernel memory cases. */ if (errno == ELOOP || errno == EFAULT || errno == ENOMEM) { snprintf(log_buffer, sizeof(log_buffer), "translation of symbolic link '%s' failed: %s\n", newp, strerror(errno)); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, (char *)id, log_buffer); return(0); } /* * We're done. All other errors (if any) will be * reported in the respective routines. * * At least ENOENT and EINVAL are good error codes: * they correspond to non-existent object or to * object that is not a symbolic link. */ else { *newpath = newp; /* success */ return(1); } } else { linkpath[link_size] = '\0'; { snprintf(log_buffer, sizeof(log_buffer), "translated symbolic link '%s:%s' to '%s:%s'; " "restarting $usecp search\n", host, newp, host, linkpath); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, (char *)id, log_buffer); } oldpath = linkpath; } break; } } if (LOGLEVEL >= 5) { sprintf(log_buffer, "host '%s' and path '%s' does not match usecp[%d] (host '%s' path '%s')\n", host, oldpath, nh, (pcphosts + nh)->cph_hosts, (pcphosts + nh)->cph_from); log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, (char *)id, log_buffer); } } /* END for (nh) */ } /* END for (max_links) */ /* failure */ if (LOGLEVEL >= 3) { sprintf(log_buffer, "no local path matches host '%s' and path '%s' (%d paths checked)\n", host, oldpath, nh); log_ext(-1, (char *)id, log_buffer, LOG_DEBUG); } return(0); } /* END told_to_cp() */ /* * local_or_remote() - is the specified path to a local or remote file * checks to see if there is a hostname which matches this host * * returns: 1 if remote and 0 if local * also updates the path pointer to just the path name if local */ static int local_or_remote( char **path) /* I */ { int 
len; char *pcolon; pcolon = strchr(*path, (int)':'); if (pcolon == NULL) { /* local file */ return(0); } *pcolon = '\0'; len = strlen(*path); if ((strncmp("localhost", *path, 9) == 0) || ((strncmp(mom_host, *path, len) == 0) && ((mom_host[len] == '\0') || (mom_host[len] == '.')))) { /* we have a host match, file is local */ *pcolon = ':'; *path = pcolon + 1; /* local file */ return(0); } else if (told_to_cp(*path, pcolon + 1, path)) { /* path updated in told_to_cp() */ /* local file */ return(0); } /* remote file */ *pcolon = ':'; return(1); } /* END local_or_remote() */ /* * is_file_same() - are two paths pointing to the same file * returns: 1 if are the same * 0 if not the same (or cannot tell) */ static int is_file_same( char *file1, char *file2) { #if defined(HAVE_STRUCT_STAT64) && defined(HAVE_STAT64) && defined(LARGEFILE_WORKS) struct stat64 sb1, sb2; #else struct stat sb1, sb2; #endif #if defined(HAVE_STRUCT_STAT64) && defined(HAVE_STAT64) && defined(LARGEFILE_WORKS) if ((stat64(file1, &sb1) == 0) && (stat64(file2, &sb2) == 0)) #else if ((stat(file1, &sb1) == 0) && (stat(file2, &sb2) == 0)) #endif { if (!memcmp(&sb1.st_dev, &sb2.st_dev, sizeof(dev_t)) && !memcmp(&sb1.st_ino, &sb2.st_ino, sizeof(ino_t))) { /* files are same */ return(1); } } return(0); } /* End of is_file_same() */ /* * is_file_going_to_dir() - is file going to destination directory * returns: 1 if file is going to directory * 0 if not going to the directory or destiation is not a directory (or cannot tell) */ static int is_file_going_to_dir( char *file, char *destdir) { char *id = "is_file_going_to_dir"; #if defined(HAVE_STRUCT_STAT64) && defined(HAVE_STAT64) && defined(LARGEFILE_WORKS) struct stat64 sb1; #else struct stat sb1; #endif #if defined(HAVE_STRUCT_STAT64) && defined(HAVE_STAT64) && defined(LARGEFILE_WORKS) if (stat64(destdir, &sb1) == 0) #else if (stat(destdir, &sb1) == 0) #endif { char *ptr1; char filename[MAXPATHLEN+1]; int complen = 0; /* Make sure the destination is a 
directory */ if (!S_ISDIR(sb1.st_mode)) { /* destination is not a directory */ return(0); } strcpy(filename,file); /* Does directory match the files path? */ ptr1 = strrchr(filename, '/'); if (ptr1 != NULL) { ptr1[0] = '\0'; complen = strlen(destdir); if (destdir[complen - 1] == '/') { /* don't include trailing slash (if any) in comparision */ complen--; } if (memcmp(filename, destdir, complen) == 0) { /* file is going to directory*/ return(1); } } } else if (errno == 2) { /* * This is okay. Probably a file that does not yet exist because * we have not copied it yet */ } else { sprintf(log_buffer, "File %s stat failed, errno = %d", destdir, errno); log_err(-1, id, log_buffer); } return(0); } /* End of is_file_going_to_dir() */ void req_deletejob( struct batch_request *preq) /* I */ { job *pjob; pjobexec_t *TJE = NULL; pjob = find_job(preq->rq_ind.rq_delete.rq_objname); if (pjob != NULL) { if (LOGLEVEL >= 3) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "deleting job"); } /* * We need to clear out the TJE starter slot if this job is in it. This * can occur if we qdel the job while the prologue is running. If we * don't remove ourself here then the TJE slot may remain with data that is * no longer valid, yet we will reuse the slot since it only matches on the * address of the pjob not on a jobid. This can lead to unexpected crashes. */ if (TMOMJobGetStartInfo(pjob, &TJE) == SUCCESS) { memset(TJE, 0, sizeof(pjobexec_t)); } /* assume success? 
*/ mom_deljob(pjob); reply_ack(preq); } else { req_reject(PBSE_UNKJOBID, 0, preq, mom_host, "cannot locate job to delete"); } return; } /* END req_deletejob() */ /* * req_holdjob - checkpoint and terminate job */ void req_holdjob( struct batch_request *preq) { int rc; job *pjob; svrattrl *pal; attribute tmph; /* If checkpoint supported, do it and terminate the job */ /* otherwise, return PBSE_NOSUP */ if ((pjob = find_job(preq->rq_ind.rq_hold.rq_orig.rq_objname)) == NULL) { rc = PBSE_UNKJOBID; } else { /* propagate servers hold state to job */ clear_attr(&tmph, &job_attr_def[(int)JOB_ATR_hold]); if ((pal = (svrattrl *)GET_NEXT(preq->rq_ind.rq_hold.rq_orig.rq_attr)) != NULL) { job_attr_def[(int)JOB_ATR_hold].at_decode( &tmph, pal->al_name, NULL, pal->al_value); } if ((rc = start_checkpoint(pjob,1,preq)) != PBSE_NONE) req_reject(rc,0,preq,mom_host,"cannot checkpoint job"); /* unable to start checkpoint */ } return; } /* END req_holdjob() */ /* * req_checkpointjob - checkpoint and continue job */ void req_checkpointjob( struct batch_request *preq) { int rc; job *pjob; /* If checkpoint supported, do it and terminate the job */ /* otherwise, return PBSE_NOSUP */ pjob = find_job(preq->rq_ind.rq_manager.rq_objname); if (pjob == NULL) { rc = PBSE_UNKJOBID; req_reject(rc, 0, preq, mom_host, "job does not exist on mom"); } else { if ((rc = start_checkpoint(pjob, 0, preq)) != PBSE_NONE) req_reject(rc, 0, preq, mom_host, "cannot checkpoint job"); /* unable to start checkpoint */ } /* note, normally the reply to the server is in start_checkpoint() */ } void req_gpuctrl( struct batch_request *preq) /* I */ { char *id = "req_gpuctrl"; char *mom_node; char *gpuid; int gpumode = -1; int reset_perm = -1; int reset_vol = -1; #ifdef NVIDIA_GPUS int rc = -1; #endif /* NVIDIA_GPUS */ gpuid = preq->rq_ind.rq_gpuctrl.rq_gpuid; gpumode = preq->rq_ind.rq_gpuctrl.rq_gpumode; mom_node = preq->rq_ind.rq_gpuctrl.rq_momnode; reset_perm = preq->rq_ind.rq_gpuctrl.rq_reset_perm; reset_vol = 
preq->rq_ind.rq_gpuctrl.rq_reset_vol; #ifdef NVIDIA_GPUS if (LOGLEVEL >= 7) { sprintf( log_buffer, "GPU control request for node %s gpuid %s mode %d reset_perm %d reset_vol %d", mom_node, gpuid, gpumode, reset_perm, reset_vol); log_ext(-1, id, log_buffer, LOG_INFO); } if (!use_nvidia_gpu) { sprintf( log_buffer, "GPU control requests not active: node %s gpuid %s mode %d reset_perm %d reset_vol %d", mom_node, gpuid, gpumode, reset_perm, reset_vol); if (LOGLEVEL >= 3) { log_ext(-1, id, log_buffer, LOG_INFO); } req_reject(PBSE_NOSUP, 0, preq, NULL, NULL); return; } /* assume success? */ if (gpumode != -1) { rc = setgpumode(gpuid, gpumode); } else if ((reset_perm != -1) || (reset_vol != -1)) { rc = resetgpuecc(gpuid, reset_perm, reset_vol); } if (rc) { reply_ack(preq); /* * if we were successful changing the mode then we need to update the gpu * statuses */ if (gpumode != -1) { mom_server_all_update_gpustat(); } } else { req_reject(PBSE_RMSYSTEM, 0, preq, mom_host, "failed to set gpu status"); } #else sprintf( log_buffer, "GPU control requests not supported: node %s gpuid %s mode %d reset_perm %d reset_vol %d", mom_node, gpuid, gpumode, reset_perm, reset_vol); if (LOGLEVEL >= 3) { log_ext(-1, id, log_buffer, LOG_INFO); } req_reject(PBSE_NOSUP, 0, preq, NULL, NULL); #endif /* NVIDIA_GPUS */ return; } /* END req_deletejob() */ /* * Write text into a job's output file, * Return a PBS error code. */ int message_job( job *pjob, enum job_file jft, /* I */ char *text) { char *pstr = NULL; int len; int fds; int rc; char *id = "message_job"; if (pjob == NULL) { return(PBSE_UNKJOBID); } /* must be Mother Superior for this to make sense */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) { log_err(errno, id, "cannot message job, not mother superior"); return(PBSE_MOMREJECT); } /* to prevent a security issue and also handle root-squashing slightly better, * fork and become the user before doing this. 
*/ if ((rc = fork_me(-1)) == -1) { /* ERROR */ log_err(errno, id, "cannot fork child"); return(PBSE_SYSTEM); } else if (rc > 0) { /* parent */ return(PBSE_NONE); } /* only the child reaches here, become the user for root-squashing as well */ if (become_the_user(pjob) != PBSE_NONE) { /* log_buffer is populated by become_the_user */ log_err(errno, id, log_buffer); exit(PBSE_BADUSER); } len = is_joined(pjob); if (len == -1) jft = StdErr; /* only have stderr open */ else if (len == 1) jft = StdOut; /* only have stdout open */ if ((fds = open_std_file( pjob, jft, O_WRONLY | O_APPEND, pjob->ji_qs.ji_un.ji_momt.ji_exgid)) < 0) { sprintf(log_buffer, "cannot open %s file for job '%s' (msg: '%.64s')", (jft == StdErr) ? "stderr" : "stdout", pjob->ji_qs.ji_jobid, text); log_err(errno, id, log_buffer); exit(PBSE_MOMREJECT); } len = strlen(text); if (text[len - 1] != '\n') { if ((pstr = malloc(len + 2)) == NULL) { exit(PBSE_INTERNAL); } strcpy(pstr, text); pstr[len++] = '\n'; /* append new-line */ text = pstr; } rc = PBSE_NONE; alarm(alarm_time); if (write(fds, text, len) != len) { log_err(errno, id, "unable to write message to job"); rc = PBSE_INTERNAL; } alarm(0); if (close(fds) != 0) { log_err(errno, id, "unable to write message to job"); rc = PBSE_INTERNAL; } if (pstr != NULL) free(pstr); /* SUCCESS */ exit(0); } /* END message_job() */ /* * req_messagejob - Append message to job's output/error file */ void req_messagejob( struct batch_request *preq) { int ret = 0; job *pjob; pjob = find_job(preq->rq_ind.rq_message.rq_jid); if ((preq->rq_ind.rq_message.rq_file == PBS_BATCH_FileOpt_Default) || (preq->rq_ind.rq_message.rq_file & PBS_BATCH_FileOpt_OFlg)) { ret = message_job(pjob, StdOut, preq->rq_ind.rq_message.rq_text); } if ((preq->rq_ind.rq_message.rq_file & PBS_BATCH_FileOpt_EFlg) && (ret == 0)) { ret = message_job(pjob, StdErr, preq->rq_ind.rq_message.rq_text); } if (ret == PBSE_NONE) { /* message request successful */ reply_ack(preq); } else { req_reject(ret, 0, preq, 
mom_host, "cannot add message to job output/error buffer");
    }
  }  /* END req_messagejob() */


/*
 * TJobAttr - printable job attribute names, terminated by a NULL entry.
 *
 * In this file the table is indexed with a job attribute index by
 * req_modifyjob() when it logs a modified attribute; that code only reads
 * the table for indices up to JOB_ATR_checkpoint_name and prints "Unkn"
 * beyond it.
 *
 * NOTE(review): the order presumably has to mirror the JOB_ATR_* enum,
 * including the entries that exist only under USEJOBCREATE and
 * NVIDIA_GPUS - confirm against the enum when adding attributes.  The
 * numeric comments below are the original authors' position markers.
 */

const char *TJobAttr[] = {
  "jobname",      /* this set appears first as they show */
  "job_owner",    /* in a basic job status display       */
  "resc_used",
  "state",
  "in_queue",
  "at_server",    /* 5 */
  "account",      /* the bulk of the attributes are in   */
  "chkpnt",       /* alphabetic order for no good reason */
  "ctime",
  "depend",
  "errpath",
  "exec_host",
  "exectime",
  "grouplst",
  "hold",
  "interactive",
  "join",
  "keep",
  "mailpnts",
  "mailuser",
  "mtime",         /* 20 */
  "outpath",
  "priority",
  "qtime",
  "rerunable",
  "resource",
  "session_id",
  "shell",
  "stagein",
  "stageout",
  "substate",
  "userlst",
  "variables",     /* 32 */
  "euser",         /* execution user name for MOM */
  "egroup",        /* execution group name for MOM */
  "hashname",      /* job name hashed into 14 characters */
  "hopcount",
  "qrank",
  "queuetype",
  "schedhint",     /* 39 */
  "security",
  "comment",
  "cookie",
  "altid",
  "etime",
  "exitstat",
  "forwardx11",
  "submitargs",
  "jobarrayid",
  "jobarrayreq",
  "umask",         /* 50 */
  "start_time",
  "start_count",
  "chkptdir",
  "chkptname",
  "chkpttime",
  "restartstat",
  "restartname",
  "faulttol",
  "comp_time",
  "reported",      /* 60 */
  "job_type",
  "inter_cmd",
  "proxy_user",
#ifdef USEJOBCREATE
  "pagg_id",
#endif /* USEJOBCREATE */
  "job_id",
#ifdef NVIDIA_GPUS
  "gpu_flags",
#endif  /* NVIDIA_GPUS */
  NULL
  };


/*
 * req_modifyjob - service the Modify Job Request
 *
 * This request modifies a job's attributes.
*/ void req_modifyjob( struct batch_request *preq) /* I */ { int bad = 0; int i; attribute newattr[(int)JOB_ATR_LAST]; attribute *pattr; job *pjob; svrattrl *plist; int rc; char tmpLine[1024]; pjob = find_job(preq->rq_ind.rq_modify.rq_objname); if (pjob == NULL) { sprintf(tmpLine, "modify job failed, unknown job %s", preq->rq_ind.rq_modify.rq_objname); req_reject(PBSE_UNKJOBID, 0, preq, mom_host, tmpLine); return; } if (LOGLEVEL >= 3) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "modifying job"); } plist = (svrattrl *)GET_NEXT(preq->rq_ind.rq_modify.rq_attr); if (plist == NULL) { /* nothing to do */ reply_ack(preq); /* SUCCESS */ return; } /* modify the jobs attributes */ bad = 0; pattr = pjob->ji_wattr; /* call attr_atomic_set to decode and set a copy of the attributes */ rc = attr_atomic_set( plist, pattr, newattr, job_attr_def, JOB_ATR_LAST, -1, ATR_DFLAG_MGWR | ATR_DFLAG_MOM, &bad); if (rc != 0) { /* FAILURE - leave old values, free the new ones */ for (i = 0;i < JOB_ATR_LAST;i++) { job_attr_def[i].at_free(newattr + i); } /* cannot set attributes, return FAILURE */ req_reject(rc, 0, preq, mom_host, "cannot set attributes"); return; } /* OK, now copy the new values into the job attribute array */ for (i = 0;i < JOB_ATR_LAST;i++) { if (!(newattr[i].at_flags & ATR_VFLAG_MODIFY)) continue; if (LOGLEVEL >= 5) { char tmpLine[1024]; strcpy(tmpLine, "???"); if (newattr[i].at_type == ATR_TYPE_STR) { if (newattr[i].at_val.at_str != NULL) strncpy(tmpLine, newattr[i].at_val.at_str, sizeof(tmpLine)); } else if (newattr[i].at_type == ATR_TYPE_LONG) { sprintf(tmpLine, "%ld", newattr[i].at_val.at_long); } else if (newattr[i].at_type == ATR_TYPE_CHAR) { sprintf(tmpLine, "%c", newattr[i].at_val.at_char); } else if (newattr[i].at_type == ATR_TYPE_RESC) { sprintf(tmpLine, "%s", "RESC"); } else if (newattr[i].at_type == ATR_TYPE_ARST) { if (i == JOB_ATR_variables) { char *tmpPtr; tmpLine[0] = '\0'; tmpPtr = arst_string( "", &newattr[i]); if (tmpPtr != NULL) 
strncpy(tmpLine, tmpPtr, sizeof(tmpLine)); } else { sprintf(tmpLine, "%s", "ARST"); } } sprintf(log_buffer, "modifying type %d attribute %s of job (value: '%s')", newattr[i].at_type, (i <= JOB_ATR_checkpoint_name) ? TJobAttr[i] : "Unkn", tmpLine); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pjob != NULL) ? pjob->ji_qs.ji_jobid : "N/A", log_buffer); } /* END if (LOGLEVEL >= 5) */ if (job_attr_def[i].at_action != NULL) job_attr_def[i].at_action(&newattr[i], pjob, ATR_ACTION_ALTER); job_attr_def[i].at_free(pattr + i); if ((newattr[i].at_type == ATR_TYPE_LIST) || (newattr[i].at_type == ATR_TYPE_RESC)) { list_move(&newattr[i].at_val.at_list, &(pattr + i)->at_val.at_list); } #ifdef TNOT else if (newattr[i].at_type == ATR_TYPE_ARST) { set_arst(&(pattr + i)->at_val, newattr[i], SET); /* set_arst(patr,&temp,INCR) */ } #endif /* TNOT */ else { *(pattr + i) = newattr[i]; } (pattr + i)->at_flags = newattr[i].at_flags; } /* END for (i) */ /* note, the newattr[] attributes are on the stack, they go away auto */ if (rc == 0) { rc = mom_set_limits(pjob, SET_LIMIT_ALTER); } if (rc != 0) { req_reject(rc, bad, preq, mom_host, "cannot set limits"); return; } job_save(pjob, SAVEJOB_FULL); sprintf(log_buffer, msg_manager, msg_jobmod, preq->rq_user, preq->rq_host); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); /* SUCCESS */ reply_ack(preq); return; } /* END req_modifyjob() */ void req_shutdown( struct batch_request *preq) /* I */ { req_reject(PBSE_NOSUP, 0, preq, NULL, NULL); return; } #ifdef _CRAY /* * cray_susp_resum - special cray suspend/resume function */ static void cray_susp_resum( job *pjob, int which, struct batch_request *preq) { int i; int ct; task *ptask; pid_t pid; long sess; int sock; sock = preq->rq_conn; pid = fork_me(sock); if (pid > 0) { /* record pid in job for when child terminates */ pjob->ji_momsubt = pid; if (which == 1) { pjob->ji_mompost = post_suspend; /* save stop time for adjusting walltime */ pjob->ji_momstat = time_now; } 
else { pjob->ji_mompost = post_resume; } free_br(preq); return; } else if (pid == -1) { /* fork failed - still the main mom */ log_err(-1, id, "cannot fork child for cray suspend"); req_reject(PBSE_SYSTEM, errno, preq, NULL, NULL); return; } /* child of MOM, cannot update job struct */ for (ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { sess = ptask->ti_qs.ti_sid; for (ct = 0;ct < 3;ct++) { i = (which == 1) ? suspend(C_JOB, sess) : resume(C_JOB, sess); if (i == 0) break; if ((errno != EAGAIN) && (errno != EINTR)) break; } if (i == -1) { /* error */ req_reject(PBSE_SYSTEM, errno, preq, NULL, NULL); exit(1); } } reply_ack(preq); exit(0); } /* END cray_susp_resum() */ #endif /* _CRAY */ /* send a signal to all tasks on sisters */ int sigalltasks_sisters( job *pjob, int signum) { #ifndef NDEBUG char id[] = "sigalltasks_sisters"; #endif char *cookie; eventent *ep; int i; cookie = pjob->ji_wattr[(int)JOB_ATR_Cookie].at_val.at_str; for (i = 0;i < pjob->ji_numnodes;i++) { int ret; hnodent *np = &pjob->ji_hosts[i]; if (np->hn_node == pjob->ji_nodeid) /* this is me */ continue; DBPRT(("%s: sending sig%d to all tasks on sister %s\n", id, signum, np->hn_host)); if (np->hn_stream == -1) np->hn_stream = rpp_open(np->hn_host, pbs_rm_port, NULL); ep = event_alloc(IM_SIGNAL_TASK, np, TM_NULL_EVENT, TM_NULL_TASK); if (np->hn_stream == -1) { np->hn_stream = rpp_open(np->hn_host, pbs_rm_port, NULL); } ret = im_compose(np->hn_stream, pjob->ji_qs.ji_jobid, cookie, IM_SIGNAL_TASK, ep->ee_event, TM_NULL_TASK); if (ret != DIS_SUCCESS) { return ret; } ret = diswui(np->hn_stream, pjob->ji_nodeid); /* XXX */ if (ret != DIS_SUCCESS) { return ret; } ret = diswsi(np->hn_stream, TM_NULL_TASK); if (ret != DIS_SUCCESS) { return ret; } ret = diswsi(np->hn_stream, signum); if (ret != DIS_SUCCESS) { return ret; } ret = rpp_flush(np->hn_stream); if (ret != DIS_SUCCESS) { return ret; } } return(0); } /* END sigalltasks_sisters() */ static void 
resume_suspend(

  job                  *pjob,
  int                   susp,  /* I (0=FALSE, 1=TRUE) */
  struct batch_request *preq)

{
  /*
   * resume_suspend - suspend (susp == 1) or resume (otherwise) a job.
   *
   * Sends SIGSTOP or SIGCONT to every task in TI_STATE_RUNNING and, for
   * multi-node jobs, asks the sisters to do the same through
   * sigalltasks_sisters().  If any of that fails, the opposite signal is
   * sent everywhere to restore the previous state and the request is
   * rejected with PBSE_SYSTEM.  On success the job's substate, Suspend
   * flag and (on resume) start time are updated and the request is
   * acknowledged.
   */

  static char *id = "resume_suspend";

  task *tp;
  task *top;
  char *jobid = pjob->ji_qs.ji_jobid;
  int   stat = 0;
  int   savederr = 0;
  int   signum;

  signum = (susp == 1) ? SIGSTOP : SIGCONT;

  if (LOGLEVEL >= 2) {
    sprintf(log_buffer, "%s: %s job",
      id,
      (susp == 1) ? "suspending" : "resuming");

    log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer);
  }

  /*
     Once upon a time, suspend/resume signals weren't propagated to sisters.
     Verily, the parallel jobs weren't correctly suspended or resumed.

     Then it was decided that SIGTSTP would be used instead of SIGSTOP so that
     specially patched MPI launchers could suspend the sisters.  To keep
     serial jobs suspending, a delayed SIGSTOP was added after the SIGTSTP.
     Of course, non-MPI parallel jobs were still not suspended properly.

     Now we are sending SIGTSTP only to the top-level task on MS, then SIGSTOP
     to all tasks and giving MOM the ability to propagate suspend/resume
     signals to sisters.

     The End.
  */

  /* NOTE: format {suspend[:X]|resume[:X]} should be supported to allow
           job state change AND custom suspend/resume signal (NYI) */

  if (susp == 1) {
    top = (task *)GET_NEXT(pjob->ji_tasks);

    /* a job with an empty task list has no top-level task to warn */

    if (top != NULL) {
      kill_task(top, SIGTSTP, 0);

      MUSleep(50000);
    }
  }

  for (tp = (task *)GET_NEXT(pjob->ji_tasks);
       tp != NULL;
       tp = (task *)GET_NEXT(tp->ti_jobtask)) {
    if (tp->ti_qs.ti_status != TI_STATE_RUNNING)
      continue;

    DBPRT(("%s: inspecting %d from node %d\n",
      id,
      tp->ti_qs.ti_task,
      tp->ti_qs.ti_parentnode));

    stat = kill_task(tp, signum, 0);

    if (stat < 0) {
      /* couldn't send signal, don't signal more tasks */

      savederr = errno;

      break;
    }
  }  /* END for (tp) */

  if ((stat >= 0) && (pjob->ji_numnodes > 1)) {
    /*
     * sigalltasks_sisters() returns 0 on success and otherwise whatever
     * non-DIS_SUCCESS code it received, so any non-zero result means at
     * least one sister was not signalled; normalise it to the negative
     * "failed" value tested below.
     */

    if (sigalltasks_sisters(pjob, signum) != 0) {
      savederr = errno;

      stat = -1;
    }
  }

  if (stat < 0) {
    /* We couldn't signal all the tasks, signal them back to their old state */

    if (LOGLEVEL >= 1) {
      sprintf(log_buffer, "cannot send signal %s to tasks of job in %s (errno=%d %s) - attempt aborted",
        (susp == 1) ? "SIGSTOP" : "SIGCONT",
        id,
        savederr,
        pbs_strerror(savederr));

      log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, jobid, log_buffer);
    }

    signum = (susp == 1) ? SIGCONT : SIGSTOP;

    for (tp = (task *)GET_NEXT(pjob->ji_tasks);
         tp != NULL;
         tp = (task *)GET_NEXT(tp->ti_jobtask)) {
      if (tp->ti_qs.ti_status != TI_STATE_RUNNING)
        continue;

      kill_task(tp, signum, 0);
    }

    if (pjob->ji_numnodes > 1) {
      /* best effort: the result of the rollback itself is not checked */

      sigalltasks_sisters(pjob, signum);
    }

    /* report suspend/resume failure */

    req_reject(PBSE_SYSTEM, savederr, preq, NULL, NULL);

    return;
  }  /* END if (stat < 0) */

  /* signals sent to all tasks, now adjust job state */

  if (susp == 1) {
    /* Successfully suspended, let's update status */

    /* This is needed for calculating correct walltime */

    pjob->ji_momstat = time_now;

    pjob->ji_qs.ji_substate = JOB_SUBSTATE_SUSPEND;

    pjob->ji_qs.ji_svrflags |= JOB_SVFLG_Suspend;

    if (LOGLEVEL >= 1) {
      log_record(
        PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        jobid,
        "job suspended - adjusted job state");
    }
  } else {
    pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING;

    /* The tasks have been resumed.  ji_momstat holds the time the job was
       suspended; use it to shift the job's start time forward by the length
       of the suspension so that walltime is computed correctly elsewhere */

    if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_Suspend) {
      /* If it's suspended, update the start time */

      pjob->ji_qs.ji_stime = pjob->ji_qs.ji_stime - pjob->ji_momstat + time_now;
    }

    /* clear the Suspend bit */

    pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_Suspend;

    if (LOGLEVEL >= 1) {
      log_record(
        PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        jobid,
        "job resumed - adjusted job state");
    }
  }  /* END else (susp != 0) */

  /* acknowledge change to pbs_server */

  reply_ack(preq);

  /* SUCCESS */

  return;
}  /* END resume_suspend() */


/**
 * req_signaljob - issue (kill) a specified signal to a job
 * Signal may be either a numeric string or a signal name
 * with or without the "SIG" prefix.
* * NOTE: process_request() set up as request handler via accept_conn() * * @see process_request->dispatch_request() - parent * @see req_signaljob() in server/req_signal.c - peer */ void req_signaljob( struct batch_request *preq) /* I */ { char id[] = "req_signaljob"; job *pjob; int sig; int numprocs=0; char *sname; struct sig_tbl *psigt; extern struct sig_tbl sig_tbl[]; pjob = find_job(preq->rq_ind.rq_signal.rq_jid); if (pjob == NULL) { req_reject(PBSE_UNKJOBID, 0, preq, NULL, NULL); return; } sname = preq->rq_ind.rq_signal.rq_signame; if (LOGLEVEL >= 3) { sprintf(log_buffer, "signaling job with signal %s", sname); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } if (preq->rq_extend != NULL) { if (!strcasecmp("rerun", preq->rq_extend)) { pjob->ji_job_is_being_rerun = TRUE; } } if (!strcasecmp(sname, SIG_SUSPEND)) { if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING) { req_reject(PBSE_BADSTATE, 0, preq, NULL, NULL); } else { #ifdef _CRAY /* suspend/resume on Cray only */ cray_susp_resum(pjob, 1, preq); #else resume_suspend(pjob, 1, preq); #endif /* _CRAY */ } return; } if (!strcasecmp(sname, SIG_RESUME)) { if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_SUSPEND) { LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "resume request on job that is not suspended"); } #ifdef _CRAY cray_susp_resum(pjob, 0, preq); #else resume_suspend(pjob, 0, preq); #endif /* _CRAY */ return; } /* standard signal received - signal tasks but do not change job state */ if (isdigit((int)*sname)) { sig = atoi(sname); } else { if (!strncasecmp("SIG", sname, 3)) sname += 3; psigt = sig_tbl; while (psigt->sig_name != NULL) { if (!strcasecmp(sname, psigt->sig_name)) break; psigt++; } sig = psigt->sig_val; } if (sig < 0) { req_reject(PBSE_UNKSIG, 0, preq, NULL, NULL); return; } if ((sig == SIGTERM) && (pjob->ji_qs.ji_substate == JOB_SUBSTATE_SUSPEND)) { /* if job is suspended, resume, and then kill - allow job to clean up on sigterm */ kill_job(pjob, 
SIGCONT, id, "job is suspended, resume and kill"); sleep(1); } /* * When kill_job is launched, processes are killed and waitpid() should harvest the process * and takes action to send an obit. If no matching process exists, then an obit may never be * sent due to the current way that TORQUE's state machine works. */ numprocs = kill_job(pjob, sig, id, "killing job"); if ((numprocs == 0) && ((sig == 0)||(sig == SIGKILL)) && (pjob->ji_qs.ji_substate != JOB_SUBSTATE_OBIT)) { /* SIGNUL and no procs found, force job to exiting */ /* force issue of (another) job obit */ sprintf(log_buffer, "job recycled into exiting on SIGNULL/KILL from substate %d", pjob->ji_qs.ji_substate); LOG_EVENT( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; job_save(pjob, SAVEJOB_QUICK); exiting_tasks = 1; } reply_ack(preq); return; } /* END req_signaljob() */ /** * Encodes the used resource information (cput, mem, walltime, etc.) * about the given job. (The data is encoded in preparation for * being sent to the pbs_server.) 
* */ void encode_used( job *pjob, /* I */ tlist_head *phead) /* O */ { unsigned long lnum; int i; attribute *at; attribute_def *ad; resource *rs; at = &pjob->ji_wattr[JOB_ATR_resc_used]; ad = &job_attr_def[JOB_ATR_resc_used]; if ((at->at_flags & ATR_VFLAG_SET) == 0) { return; } for (rs = (resource *)GET_NEXT(at->at_val.at_list); rs != NULL; rs = (resource *)GET_NEXT(rs->rs_link)) { resource_def *rd = rs->rs_defin; attribute val; int rc; if ((rd->rs_flags & resc_access_perm) == 0) continue; val = rs->rs_value; /* copy resource attribute */ /* count up sisterhood too */ if (pjob->ji_resources != NULL) { lnum = 0; if (!strcmp(rd->rs_name, "cput")) { for (i = 0;i < pjob->ji_numnodes - 1;i++) { lnum += pjob->ji_resources[i].nr_cput; } } else if (!strcmp(rd->rs_name, "mem")) { for (i = 0;i < pjob->ji_numnodes - 1;i++) { lnum += pjob->ji_resources[i].nr_mem; } } else if (!strcmp(rd->rs_name, "vmem")) { for (i = 0;i < pjob->ji_numnodes - 1;i++) { lnum += pjob->ji_resources[i].nr_vmem; } } val.at_val.at_long += lnum; } rc = rd->rs_encode( &val, phead, ad->at_name, rd->rs_name, ATR_ENCODE_CLIENT); if (rc < 0) break; } /* END for (rs) */ return; } /* END encode_used() */ void encode_flagged_attrs( job *pjob, /* I */ tlist_head *phead) /* O */ { int index; attribute *at; attribute_def *ad; for (index = 0;(int)index < JOB_ATR_LAST;++index) { at = &pjob->ji_wattr[index]; ad = &job_attr_def[index]; if (at->at_flags & ATR_VFLAG_SEND) { /* turn off "need to send" flag */ at->at_flags &= ~ATR_VFLAG_SEND; if (LOGLEVEL >= 4) { sprintf(log_buffer, "encoding \"send flagged\" attr: %s", ad->at_name); LOG_EVENT( PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } ad->at_encode( at, phead, ad->at_name, NULL, ATR_ENCODE_CLIENT); } } } /* * req_stat_job - return the status of one (if id is specified) or all * jobs (if id is the null string). * * This is usually triggered due to a request from the pbs_server to learn * about this (or all) jobs. 
The server will query the MOM periodically * for this information. It is controlled by the pbs_server attributes * 'job_stat_rate' and 'poll_jobs'. * * This data is different than the occasional status update sent to * the server that tells the server the MOM's general stats (see mom_server_all_update_stat()). */ void req_stat_job( struct batch_request *preq) /* I */ { int all; char *name; job *pjob; struct batch_reply *preply = &preq->rq_reply; struct brp_status *pstat; /* * first, validate the name of the requested object, either * a single job or all jobs */ name = preq->rq_ind.rq_status.rq_id; if ((*name == '\0') || (*name == '@')) { all = 1; pjob = (job *)GET_NEXT(svr_alljobs); } else { all = 0; pjob = find_job(name); if (pjob == NULL) { req_reject(PBSE_UNKJOBID, 0, preq, NULL, NULL); return; } } preply->brp_choice = BATCH_REPLY_CHOICE_Status; CLEAR_HEAD(preply->brp_un.brp_status); /* pass user-client privilege to encode_resc() */ resc_access_perm = preq->rq_perm & ATR_DFLAG_RDACC; for (;pjob;pjob = all ? 
(job *)GET_NEXT(pjob->ji_alljobs) : NULL) { if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) continue; /* not Mother Superior */ if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING) continue; /* allocate reply structure and fill in header portion */ pstat = (struct brp_status *)malloc(sizeof(struct brp_status)); assert(pstat != NULL); CLEAR_LINK(pstat->brp_stlink); pstat->brp_objtype = MGR_OBJ_JOB; strcpy(pstat->brp_objname, pjob->ji_qs.ji_jobid); CLEAR_HEAD(pstat->brp_attr); append_link(&preply->brp_un.brp_status, &pstat->brp_stlink, pstat); encode_used(pjob, &pstat->brp_attr); /* adds resources_used attr */ encode_flagged_attrs(pjob, &pstat->brp_attr); /* adds other flagged attrs */ } reply_send(preq); return; } /* END req_stat_job() */ /* * del_files - delete the files in a copy files or delete files request * * WARNING WARNING WARNING WARNING WARNING WARNING WARNING * * fork_to_user() must be called first so that useruid/gid is set up */ int del_files( struct batch_request *preq, /* I */ char *HDir, /* I (home directory) */ int setuserenv,/* I */ char **pbadfile) /* O */ { char id[] = "del_files"; int AsUser = FALSE; int UID0 = TRUE; int EUID0 = TRUE; struct rqfpair *pair; int rc = 0; char *path; char *pp; char *prmt; struct stat sb; #if NO_SPOOL_OUTPUT == 1 char path_alt[MAXPATHLEN + 1]; int rcstat; struct stat myspooldir; #endif #ifdef HAVE_WORDEXP job *pjob; wordexp_t pathexp; #endif path = malloc(sizeof(char)*(MAXPATHLEN + 1)); if (path==NULL) { add_bad_list(pbadfile,"malloc failed",1); return(-1); } /* * NOTE: may be called as root in TORQUE home dir * or as user in user homedir. Let's determine if we will * be permitted to run setXid()/setgroup calls. */ #ifndef __CYGWIN__ if (getuid() != 0) { #else if (IamUser() == 1) { #endif /* __CYGWIN__ */ UID0 = FALSE; } #ifndef __CYGWIN__ if (geteuid() != 0) { #else if (IamUser() == 1) { #endif /* __CYGWIN__ */ EUID0 = FALSE; } /* * Build up path of file using local name only, then unlink it. 
* The first set of files may have the STDJOBFILE * flag set, which we need to unlink as root, the others as the user. */ if (HDir != NULL) { /* ignore failure */ if (chdir(HDir) == -1) {} } for (pair = (struct rqfpair *)GET_NEXT(preq->rq_ind.rq_cpyfile.rq_pair); pair != NULL; pair = (struct rqfpair *)GET_NEXT(pair->fp_link)) { prmt = pair->fp_rmt; *path = '\0'; if (pair->fp_flag == STDJOBFILE) { /* the job's stdout/stderr */ #if NO_SPOOL_OUTPUT == 0 strncpy(path, path_spool, sizeof(path)); #endif /* !NO_SPOOL_OUTPUT */ } else if (AsUser == FALSE) { if (setgroups(ngroup,(gid_t *)groups) != 0 && UID0 == TRUE) { snprintf(log_buffer,sizeof(log_buffer), "%s: setgroups() for UID = %lu failed: %s", id, (unsigned long)useruid, strerror(errno)); add_bad_list(pbadfile,log_buffer,1); return(-1); } if (setgid(usergid) != 0 && EUID0 == TRUE) { snprintf(log_buffer,sizeof(log_buffer), "%s: setgid(%lu) for UID = %lu failed: %s", id, (unsigned long)usergid, (unsigned long)useruid, strerror(errno)); add_bad_list(pbadfile,log_buffer,1); return(-1); } /* run as the user */ if (setuid_ext(useruid, FALSE) != 0 && EUID0 == TRUE) { snprintf(log_buffer,sizeof(log_buffer), "%s: setuid(%lu) failed: %s", id, (unsigned long)useruid, strerror(errno)); add_bad_list(pbadfile,log_buffer,1); return(-1); } EUID0 = FALSE; UID0 = FALSE; if (HDir != NULL) { /* ignore failure of chdir */ if (chdir(HDir) == -1) {} } #ifdef HAVE_WORDEXP if (setuserenv && (pjob = find_job(preq->rq_ind.rq_cpyfile.rq_jobid)) != NULL) { InitUserEnv(pjob, NULL, NULL, NULL, NULL); *(vtable.v_envp + vtable.v_used) = NULL; environ = vtable.v_envp; } #endif AsUser = TRUE; } strcat(path, pair->fp_local); replace_checkpoint_path(path); if (local_or_remote(&prmt) == 0) { /* local file, do source and destination match? 
*/ /* if so, don't delete it */ if (is_file_same(prmt, path) == 1) { continue; } } #ifdef HAVE_WORDEXP switch (wordexp(path, &pathexp, WRDE_NOCMD | WRDE_UNDEF)) { case 0: break; /* Successful */ case WRDE_NOSPACE: wordfree(&pathexp); /* fall through */ default: sprintf(log_buffer, "*** failed to delete files, expansion of %s failed", path); add_bad_list(pbadfile, log_buffer, 1); return(-1); /*NOTREACHED*/ break; } strcpy(path, pathexp.we_wordv[0]); wordfree(&pathexp); #endif if (stat(path, &sb) == 0) { if (S_ISDIR(sb.st_mode)) { /* have a directory, must append last segment */ /* of source name to it for the unlink */ #if NO_SPOOL_OUTPUT == 1 /* check for ~/.pbs_spool */ /* if it isn't a dir., use $HOME us usual */ strcpy(path_alt, path); strcat(path_alt, "/.pbs_spool/"); rcstat = stat(path_alt, &myspooldir); if ((rcstat == 0) && (S_ISDIR(myspooldir.st_mode)) && ((myspooldir.st_mode & S_IXOTH) == S_IXOTH)) { strcpy(path, path_alt); } else #endif /* NO_SPOOL_OUTPUT */ { strcat(path, "/"); } pp = strrchr(prmt, (int)'/'); if (pp != NULL) { ++pp; } else if ((pp = strrchr(prmt, (int)':'))) { ++pp; } else { pp = prmt; } strcat(path, pp); } } else { sprintf(log_buffer, "cannot stat %s", path); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, id, log_buffer); } /* * This should only be set it we are trying to delete a checkpoint restart * file that is in the moms default checkpoint directory. We change the * path to remove the jobs checkpoint directory not just the checkpoint itself. 
* Do not remove if it is in the remote checkpoint directory list */ if (pair->fp_flag == JOBCKPFILE) { char *ptr; ptr = strrchr(path,'/'); if (ptr != NULL) { ptr[0] = '\0'; } } if (!in_remote_checkpoint_dir(path)) { if (remtree(path) == -1) { if (errno != ENOENT) { sprintf(log_buffer, "Unable to delete file %s for user %s, error = %d %s", path, preq->rq_ind.rq_cpyfile.rq_user, errno, pbs_strerror(errno)); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, id, log_buffer); add_bad_list(pbadfile, log_buffer, 2); rc = errno; } #ifdef DEBUG } else { sprintf(log_buffer, "Deleted file %s for user %s", path, preq->rq_ind.rq_cpyfile.rq_user); LOG_EVENT( PBSEVENT_DEBUG, PBS_EVENTCLASS_FILE, id, log_buffer); #endif /* DEBUG */ } } } return(rc); } /* END del_files() */ void req_rerunjob( struct batch_request *preq) /* I */ { static char *id = "req_rerunjob"; job *pjob; int sock; int rc; int retrycnt = 0; pjob = find_job(preq->rq_ind.rq_rerun); if (pjob == NULL) { req_reject(PBSE_UNKJOBID, 0, preq, NULL, NULL); return; } if (LOGLEVEL >= 3) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "rerunning job"); } /* fork to send files back */ if ((rc = fork_me(preq->rq_conn)) > 0) { free_br(preq); /* parent - note leave connection open */ return; } else if (rc < 0) { req_reject(-rc, 0, preq, NULL, NULL); return; } /* Child process ... for each standard file generate and send a Job Files request(s). * No message handler function is needed because return_file blocks and waits for reply. * This is acceptable because we are a child process, not pbs_mom. 
*/ retry: sock = mom_open_socket_to_jobs_server(pjob, id, NULL); if (sock < 0) { retrycnt++; if (retrycnt < 10) { sleep(1); goto retry; } /* FAILURE */ req_reject(PBSE_NOSERVER, 0, preq, NULL, NULL); exit(0); } if (((rc = return_file(pjob, StdOut, sock, TRUE)) != 0) || ((rc = return_file(pjob, StdErr, sock, TRUE)) != 0) || ((rc = return_file(pjob, Checkpoint, sock, TRUE)) != 0)) { /* FAILURE - cannot report file to server */ LOG_EVENT( PBSEVENT_ERROR, PBS_EVENTCLASS_REQUEST, id, "cannot move output files to server"); req_reject(rc, 0, preq, NULL, NULL); } else { /* SUCCESS */ reply_ack(preq); close(sock); exit(0); } return; } /* END req_rerunjob() */ void req_returnfiles( struct batch_request *preq) { struct job *pjob; int sock; static char *id = "req_returnfiles"; pjob = find_job(preq->rq_ind.rq_returnfiles.rq_jobid); if (pjob != NULL) { retry: sock = mom_open_socket_to_jobs_server(pjob, id, NULL); if (sock < 0) { /* XXX TODO */ sprintf(log_buffer, "mom_open_socket_to_jobs_server FAILED to get socket: %d for job %s", sock, pjob->ji_qs.ji_jobid); log_err(-1, id, log_buffer); sleep(1); goto retry; } if (preq->rq_ind.rq_returnfiles.rq_return_stdout) { return_file(pjob, StdOut, sock, FALSE); } if (preq->rq_ind.rq_returnfiles.rq_return_stderr) { return_file(pjob, StdErr, sock, FALSE); } reply_ack(preq); close(sock); } else { req_reject(PBSE_UNKJOBID, 0, preq, mom_host, "cannot locate job"); } return; } /* * sys_copy - issue system call to copy file * * Check error and retry as required */ static int sys_copy( int rmtflg, /* I */ char *ag2, /* I (is this source or destination?) */ char *ag3, /* I (is this source or destination?) 
*/ int conn) /* I */ { char *ag0; char *ag1; int i; static char *id = "sys_copy"; int loop; int rc; sprintf(rcperr, "%srcperr.%ld", path_spool, (long)getpid()); if (rmtflg == 0) { /* local copy */ ag0 = "/bin/cp"; ag1 = "-rp"; } else { ag0 = rcp_path; ag1 = rcp_args; } if (LOGLEVEL >= 6) { sprintf(log_buffer, "executing copy command: %s %s %s %s", ag0, ag1, ag2, ag3); log_ext(-1, id, log_buffer, LOG_DEBUG); } for (loop = 1;loop < 4;++loop) { if ((rc = fork()) > 0) { /* Parent - wait for copy to complete */ while (((i = wait(&rc)) < 0) && (errno == EINTR)); if (i == -1) { rc = (20000 + errno); /* 200xx is error on wait */ } else if (WIFEXITED(rc)) { if ((rc = WEXITSTATUS(rc)) == 0) { return(rc); /* good, stop now */ } } else if (WIFSTOPPED(rc)) { rc = (30000 + WSTOPSIG(rc)); /* 300xx is stopped */ } else if (WIFSIGNALED(rc)) { rc = (40000 + WTERMSIG(rc)); /* 400xx is signaled */ } } else if (rc < 0) { rc = errno + 10000; /* error on fork (100xx), retry */ } else { int fd; /* child - exec the copy command */ rpp_terminate(); close(conn); /* redirect stderr to make error from rcp available to MOM */ unlink(rcperr); /* likely failing, ignore */ if ((fd = open(rcperr, O_RDWR | O_CREAT | O_EXCL, 0644)) < 0) { sprintf(log_buffer, "can't open %s, error = %d %s", rcperr, errno, pbs_strerror(errno)); log_err(errno, id, log_buffer); exit(12); }; if (fd != 2) { dup2(fd, 2); close(fd); } /* NOTE: arg2 should be source, arg3 should be destination */ execl(ag0, ag0, ag1, ag2, ag3, NULL); /* reached only if execl() fails */ sprintf(log_buffer, "exec of command '%s %s %s %s' failed, errno=%d %s", ag0, ag1, ag2, ag3, errno, pbs_strerror(errno)); log_err(errno, id, log_buffer); exit(13); /* 13, an unlucky number */ } /* END else ((rc = fork()) > 0) */ /* copy did not work, try again */ if ((loop % 2) == 0) sleep(loop / 2 * 3 + 1); } /* END for (loop) */ /* tried a bunch of times, just give up */ sprintf(log_buffer, "command '%s %s %s %s' failed with status=%d, giving up after %d 
attempts", ag0, ag1, ag2, ag3, rc, loop); log_err(-1, id, log_buffer); return(rc); } /* END sys_copy() */ /* * req_cpyfile - process the Copy Files request from the server to dispose * of output from the job. This is done by a child of MOM since it * might take time. * * The supplied PBS means of moving the file is by "rcp". * A site may wish to change this. */ /* NOTE: --------------------------------------------------------------------------- PBS_BATCH_RunJob (received from sched) req_runjob svr_startjob svr_stagein ----------------------> req_cpyfile fork_to_user[1] svr_strtjob2 send_job PBSD_queuejob ----> req_quejob[2] PBSD_jscript -----> req_jobscript PBSD_rdytocmt ----> req_rdytocommit PBSD_commit ------> req_commit[3] [1] job not in svr_alljobs list (pjob == NULL) [2] added to svr_newjobs list [3] deleted from svr_newjobs, added to srv_alljobs therefore it is normal for pjob to be NULL in req_cpyfile. */ void req_cpyfile( struct batch_request *preq) /* I */ { char id[] = "req_cpyfile"; char *arg2 = NULL; char *arg3 = NULL; int bad_files = 0; char *bad_list = NULL; int dir = 0; int from_spool = 0; /* boolean - set if file must be removed from spool after copy */ int len; char localname[MAXPATHLEN + 1]; /* used only for in-bound */ struct rqfpair *pair = NULL; char *prmt; int rc; int rmtflag = 0; #if NO_SPOOL_OUTPUT == 0 char undelname[MAXPATHLEN + 1]; #endif /* !NO_SPOOL_OUTPUT */ #ifdef _CRAY char tmpdirname[MAXPATHLEN + 1]; #endif /* _CRAY */ char localname_alt[MAXPATHLEN + 1]; struct stat myspooldir; int rcstat; char homespool[MAXPATHLEN + 1]; int havehomespool; char EMsg[1024]; char HDir[1024]; job *pjob = NULL; #ifdef HAVE_WORDEXP int madefaketmpdir = 0; int usedfaketmpdir = 0; wordexp_t arg2exp, arg3exp; int arg2index = -1; char faketmpdir[1024]; int wordexperr = 0; #endif /* there is nothing to copy */ if (spoolasfinalname == TRUE) { reply_ack(preq); return; } if (LOGLEVEL >= 3) { pair = (struct rqfpair *)GET_NEXT(preq->rq_ind.rq_cpyfile.rq_pair); if 
((pair != NULL) && (pair->fp_rmt != NULL)) { sprintf(log_buffer, "attempting to copy file '%s'", pair->fp_rmt); } else { sprintf(log_buffer, "copy file request is corrupt"); } LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, preq->rq_ind.rq_cpyfile.rq_jobid, log_buffer); } rc = (int)fork_to_user(preq, TRUE, HDir, EMsg); if (rc < 0) { char tmpLine[1024]; /* FAILURE */ req_reject(-rc, 0, preq, mom_host, EMsg); if ((rc != -PBSE_SYSTEM) && (rc != -PBSE_BADUSER)) { sprintf(tmpLine, "fork_to_user failed with rc=%d '%s' - exiting", rc, EMsg); log_err(errno, id, tmpLine); exit(rc); } sprintf(tmpLine, "fork_to_user failed with rc=%d '%s' - returning failure", rc, EMsg); log_err(errno, id, tmpLine); return; } /* END if (rc < 0) */ if (rc > 0) { /* parent - continue with other tasks */ /* SUCCESS */ return; } /* child */ /* now running as user in the user's home directory */ #if NO_SPOOL_OUTPUT == 1 snprintf(homespool, sizeof(homespool), "%s/.pbs_spool/", HDir); rcstat = stat(homespool, &myspooldir); if ((rcstat == 0) && S_ISDIR(myspooldir.st_mode)) { havehomespool = 1; } else { havehomespool = 0; } #else /* NO_SPOOL_OUTPUT == 1 */ homespool[0] = '\0'; havehomespool = 0; #endif /* END NO_SPOOL_OUTPUT == 1 */ if ((havehomespool == 0) && (TNoSpoolDirList[0] != NULL)) { int dindex; char *wdir; if ((pjob = find_job(preq->rq_ind.rq_cpyfile.rq_jobid)) == NULL) { wdir = NULL; } else { wdir = get_job_envvar(pjob, "PBS_O_WORKDIR"); } if (wdir != NULL) { /* check if job's work dir matches the no-spool directory list */ for (dindex = 0;dindex < TMAX_NSDCOUNT;dindex++) { if (TNoSpoolDirList[dindex] == NULL) break; if (!strcasecmp(TNoSpoolDirList[dindex], "$WORKDIR") || !strcmp(TNoSpoolDirList[dindex], "*")) { havehomespool = 1; strncpy(homespool, wdir, sizeof(homespool)); break; } if (!strncmp(TNoSpoolDirList[dindex], wdir, strlen(TNoSpoolDirList[dindex]))) { havehomespool = 1; strncpy(homespool, wdir, sizeof(homespool)); break; } } /* END for (dindex) */ } /* END if (wdir != NULL) */ } /* 
END if ((havehomespool == 0) && (TNoSpoolDirList != NULL)) */ #ifdef HAVE_WORDEXP faketmpdir[0] = '\0'; if ((pjob = find_job(preq->rq_ind.rq_cpyfile.rq_jobid)) == NULL) { /* This is a stagein which happens before the job struct to sent to MOM * or a checkpoint file coming in. * This limits the available variables we can use. fork_to_user() * has already set PBS_JOBID and HOME for us. Now just fake a TMPDIR * if we need it. */ pjob = job_alloc(); if (pjob == NULL) { /* FAILURE - in child process */ sprintf(log_buffer,"alloc failed with errno=%d - returning failure", errno); log_err(errno,id,log_buffer); bad_files = 1; goto error; } strcpy(pjob->ji_qs.ji_jobid, preq->rq_ind.rq_cpyfile.rq_jobid); if (TTmpDirName(pjob, faketmpdir)) { if (!mkdirtree(faketmpdir, 0755)) { char *envstr; envstr = malloc((strlen("TMPDIR=") + strlen(faketmpdir) + 1) * sizeof(char)); if (envstr == NULL) { /* FAILURE - in child process */ sprintf(log_buffer,"alloc failed with errno=%d - returning failure", errno); log_err(errno,id,log_buffer); bad_files = 1; goto error; } sprintf(envstr, "TMPDIR=%s", faketmpdir); putenv(envstr); madefaketmpdir = 1; } } } else { InitUserEnv(pjob, NULL, NULL, NULL, NULL); *(vtable.v_envp + vtable.v_used) = NULL; environ = vtable.v_envp; } #endif /* END HAVE_WORDEXP */ /* build up cp/rcp command(s), one per file pair */ arg2 = malloc(sizeof(char)*(MAXPATHLEN + 1)); arg3 = malloc(sizeof(char)*(MAXPATHLEN + 1)); if ((arg2==NULL) || (arg3==NULL)) { /* FAILURE - in child process */ sprintf(log_buffer,"alloc failed with errno=%d - returning failure", errno); log_err(errno,id,log_buffer); bad_files = 1; goto error; } dir = preq->rq_ind.rq_cpyfile.rq_dir; for (pair = (struct rqfpair *)GET_NEXT(preq->rq_ind.rq_cpyfile.rq_pair); pair != NULL; pair = (struct rqfpair *)GET_NEXT(pair->fp_link)) { if ((pair->fp_rmt != NULL) && (strstr(pair->fp_rmt, ":/dev/null"))) { /* ignore copies to/from /dev/null */ continue; } from_spool = 0; prmt = pair->fp_rmt; if 
(local_or_remote(&prmt) == 0) { /* destination host is this host, use cp */ rmtflag = 0; } else { /* destination host is another, use (pbs_)rcp */ rmtflag = 1; } /* which way to copy, in or out? */ if ((dir == STAGE_DIR_OUT) || (dir == CKPT_DIR_OUT)) { /* * out bound copy ... * build "from" path name, local to this system */ if (pair->fp_flag == STDJOBFILE) { #if NO_SPOOL_OUTPUT == 0 if (havehomespool == 1) { /* only use spooldir if the job file exists */ strcpy(localname_alt, homespool); strcat(localname_alt, "/"); strcat(localname_alt, pair->fp_local); rcstat = stat(localname_alt, &myspooldir); if ((rcstat == 0) && S_ISREG(myspooldir.st_mode)) { strcpy(localname, localname_alt); } else { /* what should be done here??? */ strcpy(localname, localname_alt); } } else { /* stdout | stderr from MOM's spool area (ie, /var/spool/torque/spool ) */ strcpy(localname, path_spool); strcat(localname, pair->fp_local); /* from location */ from_spool = 1; /* flag as being in spool dir */ } #else strcpy(localname, pair->fp_local); /* from location */ if (havehomespool) { /* only use ~/.pbs_spool if the file actually exists */ strcpy(localname_alt, homespool); strcat(localname_alt, "/"); strcat(localname_alt, pair->fp_local); rcstat = stat(localname_alt, &myspooldir); if ((rcstat == 0) && S_ISREG(myspooldir.st_mode)) { strcpy(localname, localname_alt); } else { /* what should be done here??? */ strcpy(localname, localname_alt); } } #endif /* NO_SPOOL_OUTPUT */ } /* END if (pair->fp_flag == STDJOBFILE) */ else if (pair->fp_flag == JOBCKPFILE) { strncpy(localname, pair->fp_local, sizeof(localname) - 1); /* from location */ replace_checkpoint_path(localname); /* * If the checkpoint directory * is in the the TRemChkptDirList then we do not transfer since directory * is remotely mounted. 
*/ if (in_remote_checkpoint_dir(localname)) { continue; } } /* END if (pair->fp_flag == JOBCKPFILE) */ else { /* user-supplied stage-out file */ strncpy(localname, pair->fp_local, sizeof(localname) - 1); /* from location */ } #if SRFS /* Is this file part of $BIGDIR or $FASTDIR ? */ if (!strncmp(localname, "/BIGDIR", 7)) { sprintf(tmpname, "%s/%s", tmpdirname(var_value("BIGDIR", preq->rq_ind.rq_cpyfile.rq_jobid)), &localname[7]); strcpy(localname, tmpname); } else if (!strncmp(localname, "/FASTDIR", 8)) { sprintf(tmpname, "%s/%s", tmpdirname(var_value("BIGDIR", preq->rq_ind.rq_cpyfile.rq_jobid)), &localname[8]); strcpy(localname, tmpname); } #endif /* SRFS */ strcpy(arg2, localname); /* take (remote) destination name from request */ *arg3 = '\0'; if (rmtflag) { /* using rcp, need to prepend the owner name */ strcat(arg3, preq->rq_ind.rq_cpyfile.rq_owner); strcat(arg3, "@"); } strcat(arg3, prmt); } /* END if (dir == STAGE_DIR_OUT) */ else { /* in bound (stage-in) file */ /* take (remote) source name from request */ strcpy(arg3, pair->fp_local); if (pair->fp_flag == JOBCKPFILE) { char needdir[MAXPATHLEN + 1]; int saveumask; char *ptr; replace_checkpoint_path(arg3); /* * If the checkpoint directory * is in the the TRemChkptDirList then we do not transfer since directory * is remotely mounted. */ if (in_remote_checkpoint_dir(arg3)) { continue; } /* * We may need to create the directory for this inbound checkpoint / * restart file. If the last segment of the path does not exist then * create it. 
*/ strcpy(needdir,arg3); ptr = strrchr(needdir,'/'); if (ptr != NULL) { ptr[0] = '\0'; } saveumask = umask(0000); if ((mkdir(needdir, 01777) == -1) && (errno != EEXIST)) { log_err(errno, id, "Failed to create jobs checkpoint directory"); } umask(saveumask); } /* END if (pair->fp_flag == JOBCKPFILE) */ *arg2 = '\0'; if (rmtflag) { /* using rcp, need to prepend the owner name */ strcat(arg2, preq->rq_ind.rq_cpyfile.rq_owner); strcat(arg2, "@"); } strcat(arg2, prmt); } /* END else (dir == STAGE_DIR_OUT) */ #ifdef HAVE_WORDEXP /* Expand and verify arg2 (source path) */ switch (wordexp(arg2, &arg2exp, WRDE_NOCMD | WRDE_UNDEF)) { case 0: wordexperr = 0; break; /* Successful */ case WRDE_NOSPACE: wordfree(&arg2exp); /* fall through */ default: sprintf(log_buffer, "Failed to expand source path in data staging: %s", arg2); add_bad_list(&bad_list, log_buffer, 2); bad_files = 1; wordexperr = 1; /* ensure we don't attempt a second source file */ goto error; /*NOTREACHED*/ break; } /* END switch () */ /* Expand and verify arg3 (destination path) */ switch (wordexp(arg3, &arg3exp, WRDE_NOCMD | WRDE_UNDEF)) { case 0: /* success - allow if word count is 1 */ if (arg3exp.we_wordc == 1) { strcpy(arg3, arg3exp.we_wordv[0]); wordfree(&arg3exp); wordexperr = 0; break; /* Successful */ } /* fall through */ case WRDE_NOSPACE: wordfree(&arg3exp); /* fall through */ default: sprintf(log_buffer, "Failed to expand destination path in data staging: %s", arg3); add_bad_list(&bad_list, log_buffer, 2); bad_files = 1; wordexperr = 1; /* ensure we don't attempt a second destination file */ goto error; break; } /* END switch () */ /* NOTE: more than one word is only allowed for arg2 (source) */ arg2index = -1; nextword: arg2index++; if (arg2index >= (int)arg2exp.we_wordc) { /* no more words */ wordfree(&arg2exp); continue; } strcpy(arg2, arg2exp.we_wordv[arg2index]); if (dir == STAGE_DIR_OUT) { strcpy(localname, arg2); } /* if we made a fake TMPDIR, and we are using it, don't delete after stagein 
*/ if (madefaketmpdir && (faketmpdir[0] != '\0') && !strncmp(faketmpdir, arg3, strlen(faketmpdir))) { usedfaketmpdir = 1; } else { usedfaketmpdir = 0; } #endif /* HAVE_WORDEXP */ if ((rmtflag == 0) && ((is_file_same(arg2, arg3) == 1) || (is_file_going_to_dir(arg2, arg3) == 1))) { /* * If this is a local file then don't copy it * if source file and destination file are the same file or * if the destination (arg3) is a directory not a file name * and the source file (arg2) is in the destination directory (arg3) */ continue; } if ((rc = sys_copy(rmtflag, arg2, arg3, preq->rq_conn)) != 0) { FILE *fp; /* copy failed */ bad_files = 1; sprintf(log_buffer, "Unable to copy file %s to %s", arg2, arg3); add_bad_list(&bad_list, log_buffer, 2); log_err(-1, id, log_buffer); /* copy message from rcp as well */ if ((fp = fopen(rcperr, "r")) != NULL) { add_bad_list(&bad_list, "*** error from copy", 1); while (fgets(log_buffer, LOG_BUF_SIZE, fp) != NULL) { len = strlen(log_buffer) - 1; if (log_buffer[len] == '\n') log_buffer[len] = '\0'; add_bad_list(&bad_list, log_buffer, 1); } fclose(fp); add_bad_list(&bad_list, "*** end error output", 1); } error: if ((dir == STAGE_DIR_IN) || (dir == CKPT_DIR_IN)) { /* delete the stage_in files that were just copied in */ /* NOTE: running as user in user homedir */ del_files(preq, NULL, 1, &bad_list); #if NO_SPOOL_OUTPUT == 0 } else if (from_spool == 1) { /* copy out of spool */ /* Copying out files and in spool area ... 
*/ /* move to "undelivered" directory */ strncpy(localname, path_spool, sizeof(localname)); strncat(localname, pair->fp_local, (sizeof(localname) - strlen(localname) - 1)); strncpy(undelname, path_undeliv, sizeof(undelname)); strncat(undelname, pair->fp_local, (sizeof(undelname) - strlen(undelname) - 1)); if (rename(localname, undelname) == 0) { add_bad_list(&bad_list, output_retained, 1); add_bad_list(&bad_list, undelname, 0); } else { sprintf(log_buffer, "Unable to rename %s to %s", localname, undelname); log_err(errno, id, log_buffer); } #endif /* !NO_SPOOL_OUTPUT */ } if ((dir == STAGE_DIR_IN) || (dir == CKPT_DIR_IN)) { unlink(rcperr); break; } } /* END if ((rc = sys_copy(rmtflag,arg2,arg3,preq->rq_conn)) != 0) */ else { /* Copy in/out succeeded */ if (LOGLEVEL >= 7) { sprintf(log_buffer,"copy succeeded (%s) from (%s) to (%s)\n", (dir == 0)? "In" : "Out", arg2, arg3); log_ext(-1, id, log_buffer, LOG_DEBUG); } if (dir == STAGE_DIR_OUT) { /* have copied out, ok to remove local one */ if (remtree(localname) < 0) { sprintf(log_buffer, msg_err_unlink, "stage out", localname); log_err(errno, id, log_buffer); add_bad_list(&bad_list, log_buffer, 2); bad_files = 1; } } else if (dir == CKPT_DIR_OUT) { /* * we need to clean up the job checkpoint file * the job directory gets deleted when job is done */ /* * If the checkpoint file * is in the the TRemChkptDirList then we do not delete since directory * is remotely mounted. 
*/

          if (in_remote_checkpoint_dir(localname))
            {
            continue;
            }

          if (LOGLEVEL >= 7)
            {
            sprintf(log_buffer,"removing checkpoint file (%s)\n", localname);

            log_ext(-1, id, log_buffer, LOG_DEBUG);
            }

          /* have copied out, ok to remove local one */

          if (remtree(localname) < 0)
            {
            /* removal failed: log it and record it in the bad-file list */

            sprintf(log_buffer, msg_err_unlink, "checkpoint", localname);

            log_err(errno, id, log_buffer);

            add_bad_list(&bad_list, log_buffer, 2);

            bad_files = 1;
            }
          }
        }

      /* done with the copy error-output file for this pair */

      unlink(rcperr);

#ifdef HAVE_WORDEXP
      /*
       * Jump back to the nextword label (defined earlier in this function)
       * unless word expansion failed.
       */

      if (!wordexperr)
        goto nextword; /* ugh, it's hard to use a real loop when your feature is #ifdef's out */

#endif
      }  /* END for (pair) */

#ifdef HAVE_WORDEXP
  /* remove the fake TMPDIR if one was made and it was not used */

  if (madefaketmpdir && !usedfaketmpdir)
    {
    remtree(faketmpdir);
    }

#endif

  if (bad_files)
    {
    /* at least one failure was recorded: send the accumulated list back and log it */

    reply_text(preq, PBSE_NOCOPYFILE, bad_list);

    log_err(-1,id,bad_list);
    }
  else
    {
    reply_ack(preq);
    }

  /* we are the child, exit not return */

  /* SUCCESS */

  /* NOTE: the exit status is 0 whether or not bad_files was set */

  exit(0);
  }  /* END req_cpyfile() */




/*
 * req_delfile - delete the specified output/staged files
 *
 * Calls fork_to_user() and then acts on its return value:
 *   rc < 0   the request is rejected with code -rc and the text in EMsg,
 *            and the function returns
 *   rc > 0   parent: returns immediately without replying
 *   rc == 0  child: calls del_files(); on a non-zero result replies with
 *            that code and the bad_list text, otherwise acknowledges the
 *            request.  The child always terminates with exit(0) and never
 *            returns to the caller.
 *
 * preq - the batch request being serviced
 */

void req_delfile(

  struct batch_request *preq)  /* I */

  {
  int   rc;
  char *bad_list = NULL;  /* passed by address to del_files(); sent to the requester on failure */

  /* presumably both buffers are filled in by fork_to_user() - TODO confirm */
  char  HDir[1024];       /* handed on to del_files() */
  char  EMsg[1024];       /* handed on to req_reject() on failure */

  rc = (int)fork_to_user(preq, FALSE, HDir, EMsg);

  if (rc < 0)
    {
    /* FAILURE */

    /* the negated return value is used as the reject code */

    req_reject(-rc, 0, preq, mom_host, EMsg);

    return;
    }

  if (rc > 0)
    {
    /* parent */

    /* continue with other tasks */

    /* SUCCESS */

    return;
    }

  /* child */

  /* running as root in TORQUE homedir */
  /* NOTE(review): the line above is the original remark and cannot be confirmed from this function alone - verify against fork_to_user() */

  /* delete the files */

  if ((rc = del_files(preq, HDir, 1, &bad_list)))
    {
    /* FAILURE */

    /* the failure is reported through the reply; the exit status stays 0 */

    reply_text(preq, rc, bad_list);

    exit(0);
    }

  /* SUCCESS */

  reply_ack(preq);

  /* we are the child, exit not return */

  exit(0);
  }  /* END req_delfile() */

/* END requests.c */