/*
 *         OpenPBS (Portable Batch System) v2.3 Software License
 *
 * Copyright (c) 1999-2000 Veridian Information Solutions, Inc.
 * All rights reserved.
 *
 * ---------------------------------------------------------------------------
 * For a license to use or redistribute the OpenPBS software under conditions
 * other than those described below, or to purchase support for this software,
 * please contact Veridian Systems, PBS Products Department ("Licensor") at:
 *
 *   www.OpenPBS.org   +1 650 967-4675   sales@OpenPBS.org
 *                      877 902-4PBS (US toll-free)
 * ---------------------------------------------------------------------------
 *
 * This license covers use of the OpenPBS v2.3 software (the "Software") at
 * your site or location, and, for certain users, redistribution of the
 * Software to other sites and locations.  Use and redistribution of
 * OpenPBS v2.3 in source and binary forms, with or without modification,
 * are permitted provided that all of the following conditions are met.
 * After December 31, 2001, only conditions 3-6 must be met:
 *
 * 1. Commercial and/or non-commercial use of the Software is permitted
 *    provided a current software registration is on file at www.OpenPBS.org.
 *    If use of this software contributes to a publication, product, or
 *    service, proper attribution must be given; see www.OpenPBS.org/credit.html
 *
 * 2. Redistribution in any form is only permitted for non-commercial,
 *    non-profit purposes.  There can be no charge for the Software or any
 *    software incorporating the Software.  Further, there can be no
 *    expectation of revenue generated as a consequence of redistributing
 *    the Software.
 *
 * 3. Any Redistribution of source code must retain the above copyright notice
 *    and the acknowledgment contained in paragraph 6, this list of conditions
 *    and the disclaimer contained in paragraph 7.
 *
 * 4. Any Redistribution in binary form must reproduce the above copyright
 *    notice and the acknowledgment contained in paragraph 6, this list of
 *    conditions and the disclaimer contained in paragraph 7 in the
 *    documentation and/or other materials provided with the distribution.
 *
 * 5. Redistributions in any form must be accompanied by information on how to
 *    obtain complete source code for the OpenPBS software and any
 *    modifications and/or additions to the OpenPBS software.  The source code
 *    must either be included in the distribution or be available for no more
 *    than the cost of distribution plus a nominal fee, and all modifications
 *    and additions to the Software must be freely redistributable by any party
 *    (including Licensor) without restriction.
 *
 * 6. All advertising materials mentioning features or use of the Software must
 *    display the following acknowledgment:
 *
 *    "This product includes software developed by NASA Ames Research Center,
 *    Lawrence Livermore National Laboratory, and Veridian Information
 *    Solutions, Inc.
 *    Visit www.OpenPBS.org for OpenPBS software support,
 *    products, and information."
 *
 * 7. DISCLAIMER OF WARRANTY
 *
 * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.  ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT
 * ARE EXPRESSLY DISCLAIMED.
 *
 * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE
 * U.S. GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * This license will be governed by the laws of the Commonwealth of Virginia,
 * without reference to its choice of law rules.
 */

/*
 * Functions which provide basic operation on the job structure
 *
 * Included public functions are:
 *
 *   job_abt         abort (remove from server) a job
 *   job_alloc       allocate job struct and initialize defaults
 *   job_free        free space allocated to the job structure and its
 *                   children structures
 *   svr_job_purge   purge job from server
 *
 *   job_clone       clones a job (for use with job_arrays)
 *   job_clone_wt    work task for cloning a job
 *
 * Included private function:
 *
 *   job_init_wattr()  initialize job working pbs_attribute array to "unspecified"
 *
 * NOTE: for multi-threaded TORQUE, all functions in here except svr_find_job
 * assume that the caller holds any relevant mutexes
 */

/* NOTE: the system header names below were lost in extraction; this list is a
 * best-effort reconstruction from the calls made in this file */
#include <pbs_config.h>   /* the master config generated by configure */
#include "job_func.h"

#include <sys/types.h>
#include <sys/param.h>
#include <sys/stat.h>
#include <time.h>
#include <signal.h>
#ifndef SIGKILL
#include <signal.h>
#endif
#if __STDC__ != 1
#include <memory.h>
#endif
#include <dirent.h>
#include <fcntl.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "pbs_ifl.h"
#include "list_link.h"
#include "work_task.h"
#include "attribute.h"
#include "resource.h"
#include "server_limits.h"
#include "server.h"
#include "queue.h"
#include "batch_request.h"
#include "pbs_job.h"
#include "log.h"
#include "../lib/Liblog/pbs_log.h"
#include "../lib/Liblog/log_event.h"
#include "../lib/Libifl/lib_ifl.h"
#include "pbs_error.h"
#include "svrfunc.h"
#include "acct.h"
#include "net_connect.h"
#include "portability.h"
#include "array.h"
#include "pbs_job.h"
#include "resizable_array.h"
#include "dynamic_string.h"
#include "svr_func.h"      /* get_svr_attr_* */
#include "issue_request.h" /* release_req */
#include "ji_mutex.h"
#include "user_info.h"
#include "mutex_mgr.hpp"
#include "job_route.h"     /* job_route */

#ifndef TRUE
#define TRUE  1
#define FALSE 0
#endif

#define MAXLINE 1024

extern int LOGLEVEL;

int conn_qsub(char *, long, char *);

/* External functions */

extern void                  cleanup_restart_file(job *);
extern struct batch_request *setup_cpyfiles(struct batch_request *, job *, char *, char *, int, int);
extern int                   job_log_open(char *, char *);
extern int                   log_job_record(const char *buf);
extern void                  check_job_log(struct work_task *ptask);
int                          issue_signal(job **, const char *, void (*)(batch_request *), void *, char *);

/* Local Private Functions */

static void job_init_wattr(job *);

/* Global Data items */

struct all_jobs        alljobs;
extern struct all_jobs array_summary;
int                    check_job_log_started = 0;

extern struct server   server;
extern int             queue_rank;
extern char           *path_arrays;
extern char           *msg_abt_err;
extern char           *path_jobs;
extern char           *path_spool;
extern char           *path_aux;
extern char            server_name[];
extern int             LOGLEVEL;

extern char           *path_checkpoint;
extern char           *path_jobinfo_log;
extern char           *log_file;
extern char           *job_log_file;

static void send_qsub_delmsg(

  job        *pjob,  /* I */
  const char *text)  /* I */

  {
  char          *phost;
  pbs_attribute *pattri;
  int            qsub_sock;

  phost = arst_string((char *)"PBS_O_HOST", &pjob->ji_wattr[JOB_ATR_variables]);

  if ((phost == NULL) ||
      ((phost = strchr(phost, '=')) == NULL))
    {
    return;
    }

  pattri = &pjob->ji_wattr[JOB_ATR_interactive];

  qsub_sock = conn_qsub(phost + 1, pattri->at_val.at_long, NULL);

  if (qsub_sock < 0)
    {
    return;
    }

  if (write_ac_socket(qsub_sock, "PBS: ", 5) == -1)
    {
    close(qsub_sock);
    return;
    }

  if (write_ac_socket(qsub_sock, text, strlen(text)) == -1)
    {
    close(qsub_sock);
    return;
    }

  close(qsub_sock);

  return;
  }  /* END send_qsub_delmsg() */

/*
 * remtree - remove a tree (or single file)
 *
 * returns  0 on success
 *         -1 on failure
 */

static int remtree(

  char *dirname)

  {
  DIR           *dir;
  struct dirent *pdir;
  char           namebuf[MAXPATHLEN];
  char           log_buf[LOCAL_LOG_BUF_SIZE];
  char          *filnam;
  int            i;
  int            rtnv = 0;

#if defined(HAVE_STRUCT_STAT64) && defined(HAVE_STAT64) && defined(LARGEFILE_WORKS)
  struct stat64  sb;
#else
  struct stat    sb;
#endif

#if defined(HAVE_STRUCT_STAT64) && defined(HAVE_STAT64) && defined(LARGEFILE_WORKS)
  if (lstat64(dirname, &sb) == -1)
#else
  if (lstat(dirname, &sb) == -1)
#endif
    {
    if (errno != ENOENT)
      log_err(errno, __func__, "stat");

    return(-1);
    }

  if (S_ISDIR(sb.st_mode))
    {
    if ((dir = opendir(dirname)) == NULL)
      {
      if (errno != ENOENT)
        log_err(errno, __func__, "opendir");

      return(-1);
      }

    snprintf(namebuf, sizeof(namebuf), "%s/", dirname);

    i = strlen(namebuf);

    filnam = &namebuf[i];

    while ((pdir = readdir(dir)) != NULL)
      {
      if ((pdir->d_name[0] == '.') &&
          ((pdir->d_name[1] == '\0') || (pdir->d_name[1] == '.')))
        continue;

      strcpy(filnam, pdir->d_name);

#if defined(HAVE_STRUCT_STAT64) && defined(HAVE_STAT64) && defined(LARGEFILE_WORKS)
      if (lstat64(namebuf, &sb) == -1)
#else
      if (lstat(namebuf, &sb) == -1)
#endif
        {
        log_err(errno, __func__, "stat");

        rtnv = -1;

        continue;
        }

      if (S_ISDIR(sb.st_mode))
        {
        rtnv = remtree(namebuf);
        }
      else if (unlink(namebuf) < 0)
        {
        if (errno != ENOENT)
          {
          sprintf(log_buf, "unlink failed on %s", namebuf);
          log_err(errno, __func__, log_buf);

          rtnv = -1;
          }
        }
      else if (LOGLEVEL >= 7)
        {
        sprintf(log_buf, "unlink(1) succeeded on %s", namebuf);
        log_ext(-1, __func__, log_buf, LOG_DEBUG);
        }
      }  /* END while ((pdir = readdir(dir)) != NULL) */

    closedir(dir);

    if (rmdir(dirname) < 0)
      {
      if ((errno != ENOENT) && (errno != EINVAL))
        {
        sprintf(log_buf, "rmdir failed on %s", dirname);
        log_err(errno, __func__, log_buf);

        rtnv = -1;
        }
      }
    else if (LOGLEVEL >= 7)
      {
      sprintf(log_buf, "rmdir succeeded on %s", dirname);
      log_ext(-1, __func__, log_buf, LOG_DEBUG);
      }
    }
  else if (unlink(dirname) < 0)
    {
    sprintf(log_buf, "unlink failed on %s", dirname);
    log_err(errno, __func__, log_buf);

    rtnv = -1;
    }
  else if (LOGLEVEL >= 7)
    {
    sprintf(log_buf, "unlink(2) succeeded on %s", dirname);
    log_ext(-1, __func__, log_buf, LOG_DEBUG);
    }

  return(rtnv);
  }  /* END remtree() */
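/*
 * Usage sketch for remtree() (illustrative only; the variables are assumed
 * to be in scope and are not part of this file). This mirrors the call made
 * later in svr_job_purge() to remove a job's checkpoint directory:
 */
#if 0
  char ckpdir[MAXPATHLEN + 1];

  snprintf(ckpdir, sizeof(ckpdir), "%s%s%s",
    path_checkpoint, job_fileprefix, JOB_CHECKPOINT_SUFFIX);

  /* removes the directory and everything beneath it, or a single file */
  if (remtree(ckpdir) < 0)
    log_err(errno, __func__, "failed to remove checkpoint tree");
#endif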
/*
 * job_abt - abort a job
 *
 * The job is removed from the system and a mail message is sent
 * to the job owner.
 */

/* NOTE: this routine is called under the following conditions:
 * 1) by req_deletejob whenever deleting a job that is not running,
 *    not transiting, not exiting and does not have a checkpoint
 *    file on the mom.
 * 2) by req_deletearray whenever deleting a job that is not running,
 *    not transiting, not in prerun, not exiting and does not have a
 *    checkpoint file on the mom.
 * 3) by close_quejob when the server fails to enqueue the job.
 * 4) by array_delete_wt for prerun jobs that hang around too long and
 *    do not have a checkpoint file on the mom.
 * 5) by pbsd_init when recovering jobs.
 * 6) by svr_movejob when done routing jobs around.
 * 7) by queue_route when trying to route any "ready" jobs in a specific queue.
 * 8) by req_shutdown when trying to shutdown.
 * 9) by req_register when the request operation is JOB_DEPEND_OP_DELETE.
 */

int job_abt(

  struct job **pjobp, /* I (modified/freed) */
  const char  *text)  /* I (optional) */

  {
  char log_buf[LOCAL_LOG_BUF_SIZE];
  int  old_state;
  int  old_substate;
  int  rc = 0;
  char job_id[PBS_MAXSVRJOBID+1];
  long job_atr_hold;
  int  job_exit_status;
  job *pjob;

  if (pjobp == NULL)
    {
    rc = PBSE_BAD_PARAMETER;
    log_err(rc, __func__, "NULL input pointer to pointer to job");
    return(rc);
    }

  pjob = *pjobp;

  if (pjob == NULL)
    {
    rc = PBSE_BAD_PARAMETER;
    log_err(rc, __func__, "NULL input pointer to job");
    return(rc);
    }

  mutex_mgr pjob_mutex = mutex_mgr(pjob->ji_mutex, true);

  strcpy(job_id, pjob->ji_qs.ji_jobid);

  /* save old state and update state to Exiting */
  old_state = pjob->ji_qs.ji_state;
  old_substate = pjob->ji_qs.ji_substate;

  if (LOGLEVEL >= 6)
    LOG_EVENT(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, pjob->ji_qs.ji_jobid);

  /* notify user of abort if notification was requested */
  if (text != NULL)
    {
    /* req_delete sends own mail and acct record */
    account_record(PBS_ACCT_ABT, pjob, "");
    svr_mailowner(pjob, MAIL_ABORT, MAIL_NORMAL, text);

    if ((pjob->ji_qs.ji_state == JOB_STATE_QUEUED) &&
        ((pjob->ji_wattr[JOB_ATR_interactive].at_flags & ATR_VFLAG_SET) &&
         pjob->ji_wattr[JOB_ATR_interactive].at_val.at_long))
      {
      /* interactive and not yet running... send a note to qsub */
      send_qsub_delmsg(pjob, text);
      }
    }

  if (old_state == JOB_STATE_RUNNING)
    {
    svr_setjobstate(pjob, JOB_STATE_RUNNING, JOB_SUBSTATE_ABORT, FALSE);

    if ((rc = issue_signal(&pjob, "SIGKILL", free_br, NULL, NULL)) != 0)
      {
      if (pjob != NULL)
        {
        sprintf(log_buf, msg_abt_err, pjob->ji_qs.ji_jobid, old_substate);
        log_err(-1, __func__, log_buf);

        if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0)
          {
          /* notify creator that job is exited */
          pjob->ji_wattr[JOB_ATR_state].at_val.at_char = 'E';

          issue_track(pjob);
          }

        if (pjob->ji_wattr[JOB_ATR_depend].at_flags & ATR_VFLAG_SET)
          {
          if (depend_on_term(pjob) == PBSE_JOBNOTFOUND)
            {
            pjob = NULL;
            pjob_mutex.set_lock_on_exit(false);
            }
          }

        /* update internal array bookkeeping values */
        if ((pjob != NULL) &&
            (pjob->ji_arraystructid[0] != '\0') &&
            (pjob->ji_is_array_template == FALSE))
          {
          job_array *pa = get_jobs_array(&pjob);

          if (pjob != NULL)
            {
            job_atr_hold = pjob->ji_wattr[JOB_ATR_hold].at_val.at_long;
            job_exit_status = pjob->ji_qs.ji_un.ji_exect.ji_exitstat;
            pjob_mutex.unlock();
            update_array_values(pa, old_state, aeTerminate, job_id, job_atr_hold, job_exit_status);
            unlock_ai_mutex(pa, __func__, "1", LOGLEVEL);
            pjob = svr_find_job(job_id, TRUE);
            }
          }

        if (pjob != NULL)
          {
          svr_job_purge(pjob);
          pjob = NULL;
          }

        *pjobp = NULL;
        }
      }
    }
  else if ((old_state == JOB_STATE_TRANSIT) &&
           (old_substate == JOB_SUBSTATE_TRNOUT))
    {
    /* I don't know of a case where this could happen */
    sprintf(log_buf, msg_abt_err, pjob->ji_qs.ji_jobid, old_substate);
    log_err(-1, __func__, log_buf);
    }
  else
    {
    svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_ABORT, FALSE);

    if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0)
      {
      /* notify creator that job is exited */
      issue_track(pjob);
      }

    if (pjob->ji_wattr[JOB_ATR_depend].at_flags & ATR_VFLAG_SET)
      {
      strcpy(job_id, pjob->ji_qs.ji_jobid);

      if (depend_on_term(pjob) == PBSE_JOBNOTFOUND)
        {
        pjob = NULL;
        pjob_mutex.set_lock_on_exit(false);
        }

      /* pjob_mutex managed mutex already points to pjob->ji_mutex.
       * Nothing to do */
      }

    /* update internal array bookkeeping values */
    if ((pjob != NULL) &&
        (pjob->ji_arraystructid[0] != '\0') &&
        (pjob->ji_is_array_template == FALSE))
      {
      job_array *pa = get_jobs_array(&pjob);

      if (pjob != NULL)
        {
        job_atr_hold = pjob->ji_wattr[JOB_ATR_hold].at_val.at_long;
        job_exit_status = pjob->ji_qs.ji_un.ji_exect.ji_exitstat;
        pjob_mutex.unlock();
        update_array_values(pa, old_state, aeTerminate, job_id, job_atr_hold, job_exit_status);
        unlock_ai_mutex(pa, __func__, (char *)"1", LOGLEVEL);
        pjob = svr_find_job(job_id, TRUE);
        }
      }

    if (pjob != NULL)
      {
      svr_job_purge(pjob);
      pjob = NULL;
      }

    *pjobp = NULL;
    }

  if (pjob == NULL)
    pjob_mutex.set_lock_on_exit(false);

  return(rc);
  }  /* END job_abt() */

/*
 * conn_qsub - connect to the qsub that submitted this interactive job
 *
 * returns >= 0 on SUCCESS, < 0 on FAILURE
 * (this was moved from resmom/mom_inter.c)
 */

int conn_qsub(

  char *hostname, /* I */
  long  port,     /* I */
  char *EMsg)     /* O (optional,minsize=1024) */

  {
  pbs_net_t hostaddr;
  int       s;
  int       flags;
  int       local_errno = 0;

  if (hostname == NULL)
    {
    log_err(PBSE_BAD_PARAMETER, __func__, "NULL input hostname pointer");
    return(PBSE_BAD_PARAMETER);
    }

  if (EMsg != NULL)
    EMsg[0] = '\0';

  if ((hostaddr = get_hostaddr(&local_errno, hostname)) == (pbs_net_t)0)
    {
#if !defined(H_ERRNO_DECLARED) && !defined(_AIX)
    extern int h_errno;
#endif

    /* FAILURE */

    if (EMsg != NULL)
      {
      snprintf(EMsg, 1024, "cannot get address for host '%s', h_errno=%d",
        hostname,
        h_errno);
      }

    return(-1);
    }

  s = client_to_svr(hostaddr, (unsigned int)port, 0, EMsg);

  /* NOTE: client_to_svr() can return 0 for SUCCESS */

  /* assume SUCCESS requires s > 0 (USC) was 'if (s >= 0)' */
  /* above comment not enabled */

  if (s < 0)
    {
    /* FAILURE */

    return(-1);
    }

  /* SUCCESS */

  /* this socket should be blocking */

  flags = fcntl(s, F_GETFL);

  flags &= ~O_NONBLOCK;

  fcntl(s, F_SETFL, flags);

  return(s);
  }  /* END conn_qsub() */
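/*
 * Locking-contract sketch for job_abt() above (illustrative only, not called
 * anywhere in this file): the caller passes in a job whose ji_mutex is held;
 * on return the job may have been purged and *pjobp set to NULL, so the
 * pointer must not be used again.
 */
#if 0
  job *pjob = svr_find_job(jobid, TRUE); /* returns with ji_mutex held */

  if (pjob != NULL)
    job_abt(&pjob, "job aborted");       /* may purge job and NULL out pjob */
#endif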
/*
 * job_alloc - allocate space for a job structure and initialize working
 * pbs_attribute to "unset"
 *
 * Returns: pointer to structure or NULL if space not available.
 *
 * @see job_init_wattr() - child
 */

job *job_alloc(void)

  {
  job *pj = (job *)calloc(1, sizeof(job));

  if (pj == NULL)
    {
    log_err(errno, __func__, "no memory");
    return(NULL);
    }

  pj->ji_mutex = (pthread_mutex_t *)calloc(1, sizeof(pthread_mutex_t));
  pthread_mutex_init(pj->ji_mutex, NULL);
  lock_ji_mutex(pj, __func__, NULL, LOGLEVEL);

  pj->ji_qs.qs_version = PBS_QS_VERSION;

  CLEAR_HEAD(pj->ji_rejectdest);
  pj->ji_is_array_template = FALSE;

  pj->ji_momhandle = -1;  /* mark mom connection invalid */

  /* set the working attributes to "unspecified" */
  job_init_wattr(pj);

  return(pj);
  }  /* END job_alloc() */

/*
 * job_free - free job structure and its various sub-structures
 */

void job_free(

  job *pj,          /* I (modified) */
  int  use_recycle)

  {
  int       i;
  badplace *bp;
  char      log_buf[LOCAL_LOG_BUF_SIZE];

  if (pj == NULL)
    {
    return;
    }

  if (LOGLEVEL >= 8)
    {
    sprintf(log_buf, "freeing job");

    log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, pj->ji_qs.ji_jobid, log_buf);
    }

  if (pj->ji_cray_clone != NULL)
    {
    lock_ji_mutex(pj->ji_cray_clone, __func__, NULL, LOGLEVEL);
    job_free(pj->ji_cray_clone, TRUE);
    }

  if (pj->ji_external_clone != NULL)
    {
    lock_ji_mutex(pj->ji_external_clone, __func__, NULL, LOGLEVEL);
    job_free(pj->ji_external_clone, TRUE);
    }

  /* remove any calloc working pbs_attribute space */
  for (i = 0; i < JOB_ATR_LAST; i++)
    {
    job_attr_def[i].at_free(&pj->ji_wattr[i]);
    }

  i = -1;

  /* free any bad destination structs */
  bp = (badplace *)GET_NEXT(pj->ji_rejectdest);

  while (bp != NULL)
    {
    delete_link(&bp->bp_link);

    free(bp);

    bp = (badplace *)GET_NEXT(pj->ji_rejectdest);
    }

  /* move to the recycling structure - deleting right away can cause a race
   * condition where two threads are pending on the same job. Thread 1 gets
   * the lock and then deletes the job, but thread 2 gets the job's lock as
   * the job is freed, causing segfaults. We use the recycler and the
   * ji_being_recycled flag to solve this problem --dbeer */

  remove_job(&alljobs, pj); // Remove this from the alljobs array.

  if (use_recycle)
    {
    insert_into_recycler(pj);
    sprintf(log_buf, "1: jobid = %s", pj->ji_qs.ji_jobid);
    unlock_ji_mutex(pj, __func__, log_buf, LOGLEVEL);
    }
  else
    {
    sprintf(log_buf, "2: jobid = %s", pj->ji_qs.ji_jobid);
    unlock_ji_mutex(pj, __func__, log_buf, LOGLEVEL);
    pthread_mutex_destroy(pj->ji_mutex);
    memset(pj, 254, sizeof(job)); /* TODO: remove magic number */
    free(pj);
    }

  return;
  }  /* END job_free() */

/*static*/ job *copy_job(

  job *parent)

  {
  job *pnewjob;
  int  i;

  if (parent == NULL)
    {
    log_err(errno, __func__, "null parent to copy");
    return(NULL);
    }

  if ((pnewjob = job_alloc()) == NULL)
    {
    log_err(errno, __func__, "no memory");
    return(NULL);
    }

  job_init_wattr(pnewjob);

  /* new job structure is allocated, now we need to copy the old job, but
   * modify based on taskid */
  CLEAR_HEAD(pnewjob->ji_rejectdest);
  pnewjob->ji_modified = 1;   /* struct changed, needs to be saved */

  /* copy the fixed size quick save information */
  memcpy(&pnewjob->ji_qs, &parent->ji_qs, sizeof(struct jobfix));

  /* copy job attributes. some of these are going to have to be modified */
  for (i = 0; i < JOB_ATR_LAST; i++)
    {
    if (parent->ji_wattr[i].at_flags & ATR_VFLAG_SET)
      {
      job_attr_def[i].at_set(
        &(pnewjob->ji_wattr[i]),
        &(parent->ji_wattr[i]),
        SET);
      }
    }

  return(pnewjob);
  }  /* END copy_job() */
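/*
 * Sketch of the two job_free() modes above (illustrative only): recycling
 * keeps pj->ji_mutex allocated so a thread still blocked on it wakes on a
 * poisoned-but-valid structure; the non-recycle path destroys the mutex and
 * is only safe when no other thread can still reference the job.
 */
#if 0
  job_free(pj, TRUE);  /* hand the struct to the recycler, then unlock */
  job_free(pj, FALSE); /* unlock, destroy mutex, poison memory, free() */
#endif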
/*
 * job_clone - create a clone of a job for use with job arrays
 */

job *job_clone(

  job       *template_job, /* I */ /* job to clone */
  job_array *pa,           /* I */ /* array which the job is a part of */
  int        taskid)       /* I */

  {
  char           log_buf[LOCAL_LOG_BUF_SIZE];
  job           *pnewjob;
  pbs_attribute  tempattr;
  char          *oldid;
  char          *hostname;
  char          *bracket;
  char          *tmpstr;
  char           basename[PBS_JOBBASE+1];
  char           namebuf[MAXPATHLEN + 1];
  char           buf[256];
  int            fds;
  int            i;
  int            slen;
  int            release_mutex = FALSE;

  if (template_job == NULL)
    {
    log_err(PBSE_BAD_PARAMETER, __func__, "template_job* is NULL");
    return(NULL);
    }

  if (LOGLEVEL >= 7)
    {
    sprintf(log_buf, "taskid %d jobid %s ", taskid, template_job->ji_qs.ji_jobid);
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
    }

  if (taskid > PBS_MAXJOBARRAY)
    {
    log_err(-1, __func__, "taskid out of range");

    return(NULL);
    }

  if ((pnewjob = job_alloc()) == NULL)
    {
    log_err(errno, __func__, "no memory");

    return(NULL);
    }

  job_init_wattr(pnewjob);

  /* new job structure is allocated, now we need to copy the old job, but
   * modify based on taskid */
  CLEAR_HEAD(pnewjob->ji_rejectdest);
  pnewjob->ji_modified = 1;   /* struct changed, needs to be saved */

  /* copy the fixed size quick save information */
  memcpy(&pnewjob->ji_qs, &template_job->ji_qs, sizeof(struct jobfix));

  /* find the job id for the cloned job */
  if ((oldid = strdup(template_job->ji_qs.ji_jobid)) == NULL)
    {
    log_err(ENOMEM, __func__, "no memory");
    job_free(pnewjob, FALSE);

    return(NULL);
    }

  bracket  = index(oldid, '[');
  hostname = index(oldid, '.');

  if (bracket != NULL)
    {
    *bracket = '\0';
    }

  if (hostname != NULL)
    {
    hostname++;
    snprintf(pnewjob->ji_qs.ji_jobid, sizeof(pnewjob->ji_qs.ji_jobid),
      "%s[%d].%s",
      oldid, taskid, hostname);
    }
  else
    {
    snprintf(pnewjob->ji_qs.ji_jobid, sizeof(pnewjob->ji_qs.ji_jobid),
      "%s[%d]",
      oldid, taskid);
    }

  /* update the job filename
   * We could optimize the sub-jobs to all use the same file. We would need a
   * way to track the number of tasks still using the job file so we know
   * when to delete it.
   */

  /* make up new job file name, it is based on the new jobid */
  if (hostname != NULL)
    snprintf(basename, sizeof(basename), "%s-%d.%s", oldid, taskid, hostname);
  else
    snprintf(basename, sizeof(basename), "%s-%d", oldid, taskid);

  free(oldid);

  do
    {
    snprintf(namebuf, sizeof(namebuf), "%s%s%s",
      path_jobs, basename, JOB_FILE_SUFFIX);

    fds = open(namebuf, O_CREAT | O_EXCL | O_WRONLY, 0600);

    if (fds < 0)
      {
      if (errno == EEXIST)
        {
        job_free(pnewjob, FALSE);

        return((job *)1); /* TODO: what is this magic for */
        }
      else
        {
        /* FAILURE */
        log_err(errno, __func__, "cannot create job file");
        job_free(pnewjob, FALSE);

        return(NULL);
        }
      }
    } while (fds < 0);

  close(fds);

  strcpy(pnewjob->ji_qs.ji_fileprefix, basename);

  /* copy job attributes. some of these are going to have to be modified */
  for (i = 0; i < JOB_ATR_LAST; i++)
    {
    if ((template_job->ji_wattr[i].at_flags & ATR_VFLAG_SET) &&
        (i != JOB_ATR_job_array_request))
      {
      if ((i == JOB_ATR_errpath) ||
          (i == JOB_ATR_outpath) ||
          (i == JOB_ATR_jobname))
        {
        /* modify the errpath and outpath */
        slen = strlen(template_job->ji_wattr[i].at_val.at_str);
        tmpstr = (char *)calloc(sizeof(char), (slen + PBS_MAXJOBARRAYLEN + 1));

        sprintf(tmpstr, "%s-%d", template_job->ji_wattr[i].at_val.at_str, taskid);

        clear_attr(&tempattr, &job_attr_def[i]);

        job_attr_def[i].at_decode(&tempattr, NULL, NULL, tmpstr, ATR_DFLAG_ACCESS);

        job_attr_def[i].at_set(&pnewjob->ji_wattr[i], &tempattr, SET);

        job_attr_def[i].at_free(&tempattr);

        free(tmpstr);
        }
      else
        {
        job_attr_def[i].at_set(
          &(pnewjob->ji_wattr[i]),
          &(template_job->ji_wattr[i]),
          SET);
        }
      }
    }

  /* put a system hold on the job. we'll take the hold off once the
   * entire array is cloned. We don't want any of the jobs to run and
   * complete before the whole thing is cloned. This is in case we run into
   * a problem during setting up the array and want to abort before any of
   * the jobs run */
  pnewjob->ji_wattr[JOB_ATR_hold].at_val.at_long |= HOLD_a;
  pnewjob->ji_wattr[JOB_ATR_hold].at_flags |= ATR_VFLAG_SET;

  /* set JOB_ATR_job_array_id */
  pnewjob->ji_wattr[JOB_ATR_job_array_id].at_val.at_long = taskid;
  pnewjob->ji_wattr[JOB_ATR_job_array_id].at_flags |= ATR_VFLAG_SET;

  /* set PBS_ARRAYID environment variable */
  clear_attr(&tempattr, &job_attr_def[JOB_ATR_variables]);

  sprintf(buf, "PBS_ARRAYID=%d", taskid);

  job_attr_def[JOB_ATR_variables].at_decode(&tempattr, NULL, NULL, buf, 0);

  job_attr_def[JOB_ATR_variables].at_set(
    &pnewjob->ji_wattr[JOB_ATR_variables], &tempattr, INCR);

  job_attr_def[JOB_ATR_variables].at_free(&tempattr);

  /* we need to put the cloned job into the array */
  if (pa == NULL)
    {
    release_mutex = TRUE;

    pa = get_array(template_job->ji_qs.ji_jobid);

    if (pa == NULL)
      {
      job_free(pnewjob, FALSE);

      return(NULL);
      }
    }

  pa->job_ids[taskid] = strdup(pnewjob->ji_qs.ji_jobid);

  strcpy(pnewjob->ji_arraystructid, pa->ai_qs.parent_id);

  if (release_mutex == TRUE)
    {
    unlock_ai_mutex(pa, __func__, "1", LOGLEVEL);
    }

  return(pnewjob);
  }  /* END job_clone() */

#ifndef CLONE_BATCH_SIZE
#define CLONE_BATCH_SIZE 256
#endif /* CLONE_BATCH_SIZE */
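/*
 * Naming example for job_clone() above (illustrative values): cloning task 3
 * of the array template "123[].napali" produces
 *
 *   ji_jobid       "123[3].napali"
 *   ji_fileprefix  "123-3.napali"
 *
 * and appends PBS_ARRAYID=3 to the clone's variable list.
 */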
/*
 * job_clone_wt - worktask to clone jobs for job array
 */

void *job_clone_wt(

  void *cloned_id)

  {
  job                *template_job;
  job                *pjob;
  job                *pjobclone;
  char               *jobid;
  int                 i;
  int                 prev_index = -1;
  int                 actual_job_count = 0;
  int                 newstate;
  int                 newsub;
  int                 rc;
  char                namebuf[MAXPATHLEN];
  char                arrayid[PBS_MAXSVRJOBID + 1];
  job_array          *pa;
  struct pbs_queue   *pque;
  char                log_buf[LOCAL_LOG_BUF_SIZE];
  array_request_node *rn;
  int                 start;
  int                 end;

  jobid = (char *)cloned_id;

  if (jobid == NULL)
    {
    log_err(ENOMEM, __func__, "Can't malloc");
    return(NULL);
    }

  /* don't call get_jobs_array because the template job isn't part of the array */
  if (((template_job = svr_find_job(jobid, TRUE)) == NULL) ||
      ((pa = get_jobs_array(&template_job)) == NULL))
    {
    free(jobid);

    if (template_job != NULL)
      unlock_ji_mutex(template_job, __func__, "1", LOGLEVEL);

    return(NULL);
    }

  mutex_mgr array_mgr(pa->ai_mutex, true);
  mutex_mgr template_job_mgr(template_job->ji_mutex, true);

  strcpy(arrayid, pa->ai_qs.parent_id);

  free(jobid);

  snprintf(namebuf, sizeof(namebuf), "%s%s.AR",
    path_jobs, template_job->ji_qs.ji_fileprefix);

  template_job_mgr.unlock();

  while ((rn = (array_request_node *)GET_NEXT(pa->request_tokens)) != NULL)
    {
    start = rn->start;
    end = rn->end;

    for (i = start; i <= end; i++)
      {
      if (pa->job_ids[i] != NULL)
        {
        /* This job already exists. This can happen when trying to recover a
         * job array that wasn't fully cloned. */
        rn->start++;
        continue;
        }

      template_job_mgr.lock();
      pjobclone = job_clone(template_job, pa, i);
      template_job_mgr.unlock();

      if (pjobclone == NULL)
        {
        log_err(-1, __func__, "unable to clone job in job_clone_wt");
        continue;
        }
      else if (pjobclone == (job *)1)
        {
        /* this happens if we attempted to clone an existing job */
        rn->start++;
        continue;
        }

      mutex_mgr clone_mgr(pjobclone->ji_mutex, true);

      svr_evaljobstate(*pjobclone, newstate, newsub, 1);

      /* do this so that svr_setjobstate() doesn't alter sv_jobstates,
       * these are set later in svr_enquejob() */
      pjobclone->ji_qs.ji_state = newstate;
      pjobclone->ji_qs.ji_substate = newsub;

      svr_setjobstate(pjobclone, newstate, newsub, FALSE);

      pjobclone->ji_wattr[JOB_ATR_qrank].at_val.at_long = ++queue_rank;
      pjobclone->ji_wattr[JOB_ATR_qrank].at_flags |= ATR_VFLAG_SET;

      array_mgr.unlock();

      if ((rc = svr_enquejob(pjobclone, FALSE, prev_index, false)))
        {
        /* XXX need more robust error handling */
        clone_mgr.set_lock_on_exit(false);

        if (rc != PBSE_JOB_RECYCLED)
          {
          svr_job_purge(pjobclone);
          }

        if ((pa = get_array(arrayid)) == NULL)
          return(NULL);

        array_mgr.mark_as_locked();

        continue;
        }

      if ((pa = get_jobs_array(&pjobclone)) == NULL)
        {
        if (pjobclone == NULL)
          clone_mgr.set_lock_on_exit(false);

        return(NULL);
        }

      array_mgr.mark_as_locked();

      if (job_save(pjobclone, SAVEJOB_FULL, 0) != 0)
        {
        /* XXX need more robust error handling */
        array_mgr.unlock();

        svr_job_purge(pjobclone);

        if ((pa = get_array(arrayid)) == NULL)
          return(NULL);

        array_mgr.mark_as_locked();

        continue;
        }

      prev_index = get_jobs_index(&alljobs, pjobclone);

      pa->ai_qs.num_cloned++;

      rn->start++;

      /* index below 0 means the job no longer exists */
      if (prev_index < 0)
        clone_mgr.set_lock_on_exit(false);
      }  /* END for (i) */

    if (rn->start > rn->end)
      {
      delete_link(&rn->request_tokens_link);
      free(rn);
      }
    }  /* END while (loop) */

  array_save(pa);

  /* scan over all the jobs in the array and unset the hold */
  for (i = 0; i < pa->ai_qs.array_size; i++)
    {
    if (pa->job_ids[i] == NULL)
      continue;

    actual_job_count++;

    pjob = svr_find_job(pa->job_ids[i], TRUE);

    if (pjob == NULL)
      {
      free(pa->job_ids[i]);
      pa->job_ids[i] = NULL;
      }
    else
      {
      mutex_mgr job_mutex(pjob->ji_mutex, true);
      long      moab_compatible = FALSE;

      get_svr_attr_l(SRV_ATR_MoabArrayCompatible, &moab_compatible);

      pjob->ji_wattr[JOB_ATR_hold].at_val.at_long &= ~HOLD_a;

      if (moab_compatible != FALSE)
        {
        /* if configured and necessary, apply a slot limit hold to all
         * jobs above the slot limit threshold */
        if ((pa->ai_qs.slot_limit != NO_SLOT_LIMIT) &&
            (actual_job_count > pa->ai_qs.slot_limit))
          {
          pjob->ji_wattr[JOB_ATR_hold].at_val.at_long |= HOLD_l;
          }
        }

      if (pjob->ji_wattr[JOB_ATR_hold].at_val.at_long == 0)
        {
        pjob->ji_wattr[JOB_ATR_hold].at_flags &= ~ATR_VFLAG_SET;
        }
      else
        {
        pjob->ji_wattr[JOB_ATR_hold].at_flags |= ATR_VFLAG_SET;
        }

      pjob->ji_modified = TRUE;

      svr_evaljobstate(*pjob, newstate, newsub, 1);
      svr_setjobstate(pjob, newstate, newsub, FALSE);

      /*
       * if the job went into a Route (push) queue that has been started,
       * try once to route it to give immediate feedback as a courtesy
       * to the user.
       */
      if ((pque = get_jobs_queue(&pjob)) != NULL)
        {
        mutex_mgr pque_mutex = mutex_mgr(pque->qu_mutex, true);

        if ((pque->qu_qs.qu_type == QTYPE_RoutePush) &&
            (pque->qu_attr[QA_ATR_Started].at_val.at_long != 0))
          {
          /* job_route expects the queue to be unlocked */
          pque_mutex.unlock();

          if ((rc = job_route(pjob)))
            {
            if (LOGLEVEL >= 6)
              {
              snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot route job %s",
                pjob->ji_qs.ji_jobid);
              log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf);
              }

            svr_job_purge(pjob);

            pque_mutex.unlock();

            continue;
            }
          }
        }

      pjob->ji_commit_done = 1;
      }
    }

  unlock_ai_mutex(pa, __func__, "3", LOGLEVEL);

  return(NULL);
  }  /* END job_clone_wt */

/*
 * job_init_wattr - initialize job working pbs_attribute array
 * set the types and the "unspecified value" flag
 */

static void job_init_wattr(

  job *pj)

  {
  int i;

  for (i = 0; i < JOB_ATR_LAST; i++)
    {
    clear_attr(&pj->ji_wattr[i], &job_attr_def[i]);
    }

  return;
  }  /* END job_init_wattr() */

/*
 * cpy_checkpoint - set up a Copy Files request to transfer checkpoint files
 */

struct batch_request *cpy_checkpoint(

  struct batch_request *preq,
  job                  *pjob,
  enum job_atr          ati,  /* JOB_ATR_checkpoint_name or JOB_ATR_restart_name */
  int                   direction)

  {
  char           momfile[MAXPATHLEN+1];
  char           serverfile[MAXPATHLEN+1];
  char          *from = NULL;
  char          *to = NULL;
  char           log_buf[LOCAL_LOG_BUF_SIZE];
  pbs_attribute *pattr;
  mode_t         saveumask = 0;

  if (pjob == NULL)
    {
    log_err(PBSE_BAD_PARAMETER, __func__, "null input job pointer");
    return(NULL);
    }

  pattr = &pjob->ji_wattr[ati];

  if ((pattr->at_flags & ATR_VFLAG_SET) == 0)
    {
    /* no file to transfer */
    return(preq);
    }

  /* build up the name used for SERVER file */
  snprintf(serverfile, sizeof(serverfile), "%s%s%s",
    path_checkpoint, pjob->ji_qs.ji_fileprefix, JOB_CHECKPOINT_SUFFIX);

  /*
   * We need to make sure the job's checkpoint directory exists. If it does
   * not we need to add it since this is the first time we are copying a
   * checkpoint file for this job.
   */
  saveumask = umask(0000);

  if ((mkdir(serverfile, 01777) == -1) && (errno != EEXIST))
    {
    log_err(errno, "cpy_checkpoint", (char *)"Failed to create jobs checkpoint directory");
    }

  umask(saveumask);

  if (sizeof(serverfile) - strlen(serverfile) >
      strlen(pjob->ji_wattr[JOB_ATR_checkpoint_name].at_val.at_str) + 1)
    {
    strcat(serverfile, "/");
    strcat(serverfile, pjob->ji_wattr[JOB_ATR_checkpoint_name].at_val.at_str);
    }

  /* build up the name used for MOM file */
  if (pjob->ji_wattr[JOB_ATR_checkpoint_dir].at_flags & ATR_VFLAG_SET)
    {
    snprintf(momfile, sizeof(momfile), "%s/%s%s/%s",
      pjob->ji_wattr[JOB_ATR_checkpoint_dir].at_val.at_str,
      pjob->ji_qs.ji_fileprefix,
      JOB_CHECKPOINT_SUFFIX,
      pattr->at_val.at_str);
    }
  else
    {
    /* if not specified, the mom's path may not be the same */
    snprintf(momfile, sizeof(momfile), "%s/%s%s/%s",
      MOM_DEFAULT_CHECKPOINT_DIR,
      pjob->ji_qs.ji_fileprefix,
      JOB_CHECKPOINT_SUFFIX,
      pjob->ji_wattr[JOB_ATR_checkpoint_name].at_val.at_str);

    if (LOGLEVEL >= 7)
      {
      sprintf(log_buf, "Job has NO checkpoint dir specified, using file %s", momfile);
      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf);
      }
    }

  if (direction == CKPT_DIR_OUT)
    {
    if (LOGLEVEL >= 7)
      {
      sprintf(log_buf, "Requesting checkpoint copy from MOM (%s) to SERVER (%s)",
        momfile, serverfile);
      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf);
      }
    }
  else
    {
    if (LOGLEVEL >= 7)
      {
      sprintf(log_buf, "Requesting checkpoint copy from SERVER (%s) to MOM (%s)",
        serverfile, momfile);
      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf);
      }
    }

  to = (char *)calloc(1, strlen(serverfile) + strlen(server_name) + 2);

  if (to == NULL)
    {
    /* FAILURE - cannot allocate memory for request */
    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      (char *)"ERROR: cannot allocate 'to' memory in cpy_checkpoint");

    return(preq);
    }

  strcpy(to, server_name);
  strcat(to, (char *)":");
  strcat(to, serverfile);

  from = (char *)calloc(1, strlen(momfile) + 1);

  if (from == NULL)
    {
    /* FAILURE */
    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      (char *)"ERROR: cannot allocate 'from' memory for from in cpy_checkpoint");

    free(to);

    return(preq);
    }

  strcpy(from, momfile);

  if (LOGLEVEL >= 7)
    {
    sprintf(log_buf, "Checkpoint copy from (%s) to (%s)", from, to);
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf);
    }

  preq = setup_cpyfiles(preq, pjob, from, to, direction, JOBCKPFILE);

  return(preq);
  }  /* END cpy_checkpoint() */
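/*
 * Path example for cpy_checkpoint() above (illustrative values; the ".CK"
 * suffix stands in for JOB_CHECKPOINT_SUFFIX): for file prefix "123.napali"
 * and checkpoint name "ckpt.t1", direction CKPT_DIR_OUT requests a copy from
 * the mom's "<checkpoint_dir>/123.napali.CK/ckpt.t1" to the server's
 * "<server_name>:<path_checkpoint>123.napali.CK/ckpt.t1".
 */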
/*
 * remove_checkpoint() - request that mom delete checkpoint file for a job
 * used when the job is to be purged after file has been transferred
 */

void remove_checkpoint(

  job **pjob_ptr)  /* I */

  {
  struct batch_request *preq = NULL;
  char                  log_buf[LOCAL_LOG_BUF_SIZE];
  job                  *pjob = NULL;

  if (pjob_ptr == NULL)
    {
    log_err(PBSE_BAD_PARAMETER, __func__, "null input pointer to job pointer");
    return;
    }

  pjob = *pjob_ptr;

  if (pjob == NULL)
    {
    log_err(PBSE_BAD_PARAMETER, __func__, "null input job pointer");
    return;
    }

  preq = cpy_checkpoint(preq, pjob, JOB_ATR_checkpoint_name, CKPT_DIR_IN);

  if (preq != NULL)
    {
    /* have files to delete */
    sprintf(log_buf, "Removing checkpoint file (%s/%s)",
      (*pjob_ptr)->ji_wattr[JOB_ATR_checkpoint_dir].at_val.at_str,
      (*pjob_ptr)->ji_wattr[JOB_ATR_checkpoint_name].at_val.at_str);

    log_ext(-1, __func__, log_buf, LOG_DEBUG);

    /* change the request type from copy to delete */
    preq->rq_type = PBS_BATCH_DelFiles;
    preq->rq_extra = NULL;

    /* The preq is freed in relay_to_mom (failure)
     * or in issue_Drequest (success) */
    if (relay_to_mom(&pjob, preq, NULL) == PBSE_NONE)
      {
      if (pjob != NULL)
        pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_CHECKPOINT_COPIED;
      }
    else
      {
      /* log that we were unable to remove the files */
      log_event(
        PBSEVENT_JOB,
        PBS_EVENTCLASS_FILE,
        pjob->ji_qs.ji_jobid,
        (char *)"unable to remove checkpoint file for job");
      }

    free_br(preq);
    }

  return;
  }  /* END remove_checkpoint() */

/*
 * cleanup_restart_file() - request that mom cleanup checkpoint restart file
 * for a job. used when the job has completed or put on hold or deleted
 */

void cleanup_restart_file(

  job *pjob)  /* I */

  {
  if (pjob == NULL)
    {
    log_err(PBSE_BAD_PARAMETER, __func__, "null input job pointer");
    return;
    }

  /* checkpoint restart file cleanup was successful */

  /* pjob->ji_qs.ji_svrflags |= JOB_SVFLG_CHECKPOINT_COPIED; */

  /* clear restart_name pbs_attribute since purging job will clean it up */
  pjob->ji_wattr[JOB_ATR_restart_name].at_flags &= ~ATR_VFLAG_SET;

  pjob->ji_modified = 1;

  job_save(pjob, SAVEJOB_FULL, 0);

  return;
  }  /* END cleanup_restart_file() */

int record_jobinfo(

  job *pjob)

  {
  pbs_attribute          *pattr;
  int                     i;
  int                     rc;
  dynamic_string         *buffer;
  char                    job_script_buf[(MAXPATHLEN << 4) + 1];
  char                    namebuf[MAXPATHLEN + 1];
  int                     fd;
  size_t                  bytes_read = 0;
  extern pthread_mutex_t  job_log_mutex;
  long                    record_job_script = FALSE;

  if (pjob == NULL)
    {
    rc = PBSE_BAD_PARAMETER;
    log_err(rc, __func__, "null input job pointer");
    return(rc);
    }

  pthread_mutex_lock(&job_log_mutex);

  if ((rc = job_log_open(job_log_file, path_jobinfo_log)) < 0)
    {
    pthread_mutex_unlock(&job_log_mutex);
    log_err(rc, __func__, "Could not open job log ");
    return(rc);
    }

  pthread_mutex_unlock(&job_log_mutex);

  if ((buffer = get_dynamic_string(MAXLINE << 3, NULL)) == NULL)
    {
    log_err(ENOMEM, __func__, "Can't allocate memory");
    return(-1);
    }

  /* NOTE: the XML tag literals below are a reconstruction; the originals
   * were eaten as markup when this file was extracted */
  append_dynamic_string(buffer, "<Jobinfo>\n");
  append_dynamic_string(buffer, "\t<Job_Id>");
  append_dynamic_string(buffer, pjob->ji_qs.ji_jobid);
  append_dynamic_string(buffer, "</Job_Id>\n");

#if 0
  if ((rc = log_job_record(buffer->str)) != PBSE_NONE)
    {
    log_err(rc, __func__, "log_job_record failed");
    free_dynamic_string(buffer);
    return(rc);
    }
#endif

  for (i = 0; i < JOB_ATR_LAST; i++)
    {
#if 0
    clear_dynamic_string(buffer);
#endif
    pattr = &(pjob->ji_wattr[i]);

    if (pattr->at_flags & ATR_VFLAG_SET)
      {
      if (!strcmp(job_attr_def[i].at_name, "depend"))
        {
        /* we don't want this pbs_attribute in our log - the dependencies
         * will show on the submit_args pbs_attribute */
        continue;
        }

      append_dynamic_string(buffer, "\t<");
      append_dynamic_string(buffer, job_attr_def[i].at_name);
      append_dynamic_string(buffer, ">");

      if (pattr->at_type == ATR_TYPE_RESC)
        append_dynamic_string(buffer, "\n");

      rc = attr_to_str(buffer, job_attr_def + i, pjob->ji_wattr[i], 1);

      if (pattr->at_type == ATR_TYPE_RESC)
        append_dynamic_string(buffer, "\t");

      append_dynamic_string(buffer, "</");
      append_dynamic_string(buffer, job_attr_def[i].at_name);
      append_dynamic_string(buffer, ">\n");

#if 0
      if ((rc = log_job_record(buffer->str)) != PBSE_NONE)
        {
        log_err(rc, __func__, "log_job_record failed recording attributes");
        free_dynamic_string(buffer);
        return(rc);
        }
#endif
      }
    }

  get_svr_attr_l(SRV_ATR_RecordJobScript, &record_job_script);

  if (record_job_script)
    {
    /* This is for Baylor. We will make it a server parameter eventually.
     * Write the contents of the script to our log file */

    append_dynamic_string(buffer, "\t<job_script>");

    snprintf(namebuf, sizeof(namebuf), "%s%s%s",
      path_jobs, pjob->ji_qs.ji_fileprefix, JOB_SCRIPT_SUFFIX);

    if ((fd = open(namebuf, O_RDONLY)) >= 0)
      {
      memset(job_script_buf, 0, sizeof(job_script_buf));

      while ((bytes_read = read_ac_socket(fd, job_script_buf, sizeof(job_script_buf) - 1)) > 0)
        {
        rc = append_dynamic_string(buffer, job_script_buf);
        memset(job_script_buf, 0, sizeof(job_script_buf));
        }

      close(fd);
      }
    else
      {
      append_dynamic_string(buffer, "unable to open script file\n");
      }

    append_dynamic_string(buffer, "\t</job_script>\n");

#if 0
    if ((rc = log_job_record(buffer->str)) != PBSE_NONE)
      {
      free_dynamic_string(buffer);
      log_err(rc, __func__, "log_job_record failed");
      return(rc);
      }
#endif
    }

#if 0
  clear_dynamic_string(buffer);
#endif

  if ((rc = append_dynamic_string(buffer, "</Jobinfo>\n")) != PBSE_NONE)
    {
    log_err(rc, __func__, "");
    free_dynamic_string(buffer);
    return(rc);
    }

  rc = log_job_record(buffer->str);

  free_dynamic_string(buffer);

  return(rc);
  }  /* END record_jobinfo() */
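/*
 * Shape of one record written by record_jobinfo() above (illustrative,
 * abbreviated; which attribute elements appear depends on what is set on
 * the job):
 *
 *   <Jobinfo>
 *       <Job_Id>123.napali</Job_Id>
 *       <job_state>C</job_state>
 *       <job_script>#!/bin/sh
 *       ...
 *       </job_script>
 *   </Jobinfo>
 */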
/*
 * svr_job_purge - purge job from system
 *
 * pjob->ji_mutex comes into this function locked but
 * will always exit unlocked.
 *
 * The job is dequeued; the job control file, script file and any spooled
 * output files are unlinked, and the job structure is freed.
 */

int svr_job_purge(

  job *pjob,            /* I (modified) */
  int  leaveSpoolFiles) /* I */

  {
  int           rc = PBSE_NONE;
  char          log_buf[LOCAL_LOG_BUF_SIZE];
  char          namebuf[MAXPATHLEN + 1];
  extern char  *msg_err_purgejob;
  time_t        time_now = time(NULL);
  long          record_job_info = FALSE;
  char          job_id[PBS_MAXSVRJOBID+1];
  char          job_fileprefix[PBS_JOBBASE+1];
  int           job_substate;
  int           job_is_array_template;
  unsigned int  job_has_checkpoint_file;
  int           job_has_arraystruct;
  int           do_delete_array = FALSE;
  job_array    *pa = NULL;
  char          array_id[PBS_MAXSVRJOBID+1];

  if (pjob == NULL)
    {
    rc = PBSE_BAD_PARAMETER;
    log_err(rc, __func__, "null input job pointer fail");
    return(rc);
    }

  mutex_mgr pjob_mutex = mutex_mgr(pjob->ji_mutex, true);

  strcpy(job_id, pjob->ji_qs.ji_jobid);
  strcpy(job_fileprefix, pjob->ji_qs.ji_fileprefix);
  job_substate = pjob->ji_qs.ji_substate;
  job_is_array_template = pjob->ji_is_array_template;
  job_has_arraystruct = ((pjob->ji_arraystructid[0] == '\0') ? FALSE : TRUE);
  job_has_checkpoint_file = pjob->ji_wattr[JOB_ATR_checkpoint_name].at_flags;

  if (LOGLEVEL >= 10)
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, pjob->ji_qs.ji_jobid);

  /* check to see if we are keeping a log of all jobs completed */
  get_svr_attr_l(SRV_ATR_RecordJobInfo, &record_job_info);

  if (record_job_info)
    {
    record_jobinfo(pjob);

    /* start a task to monitor job log roll over if it is not already started */
    if (check_job_log_started == 0)
      {
      set_task(WORK_Timed, time_now + 10, check_job_log, NULL, FALSE);

      check_job_log_started = 1;
      }
    }

  if ((job_has_arraystruct == FALSE) || (job_is_array_template == TRUE))
    if (remove_job(&array_summary, pjob) == PBSE_JOB_RECYCLED)
      {
      /* PBSE_JOB_RECYCLED means the job is gone */
      pjob_mutex.set_lock_on_exit(false);
      return(PBSE_NONE);
      }

  /* if part of job array then remove from array's job list */
  if ((job_has_arraystruct == TRUE) &&
      (job_is_array_template == FALSE))
    {
    /* pa->ai_mutex will come out locked after the call to get_jobs_array */
    pa = get_jobs_array(&pjob);

    if (pjob != NULL)
      {
      if (pa != NULL)
        {
        if (pa->job_ids != NULL)
          {
          free(pa->job_ids[pjob->ji_wattr[JOB_ATR_job_array_id].at_val.at_long]);
          pa->job_ids[pjob->ji_wattr[JOB_ATR_job_array_id].at_val.at_long] = NULL;
          }

        /* if there are no more jobs in the array,
         * then we can clean that up too */
        pa->ai_qs.num_purged++;

        if (pa->ai_qs.num_purged == pa->ai_qs.num_jobs)
          {
          /* array_delete will unlock pa->ai_mutex */
          strcpy(array_id, pjob->ji_arraystructid);
          do_delete_array = TRUE;
          unlock_ai_mutex(pa, __func__, "1a", LOGLEVEL);
          }
        else
          {
          array_save(pa);
          unlock_ai_mutex(pa, __func__, "1", LOGLEVEL);
          }
        }
      }
    else
      {
      pjob_mutex.set_lock_on_exit(false);
      return(PBSE_JOBNOTFOUND);
      }
    }

  if ((job_substate != JOB_SUBSTATE_TRANSIN) &&
      (job_substate != JOB_SUBSTATE_TRANSICM))
    {
    int need_deque = !pjob->ji_cold_restart;

    /* jobs that are being deleted after a cold restart
     * haven't been queued */
    if (need_deque == TRUE)
      {
      /* set the state to complete so that svr_dequejob() will function properly */
      pjob->ji_qs.ji_state = JOB_STATE_COMPLETE;

      rc = svr_dequejob(pjob, FALSE);
      }

    if (rc != PBSE_JOBNOTFOUND)
      {
      /* we came out of svr_dequejob with pjob locked. Our pointer is still
       * good. job_free will unlock the mutex for us */
      if (pjob->ji_being_recycled == FALSE)
        {
        job_free(pjob, TRUE);
        pjob_mutex.set_lock_on_exit(false);
        }
      else
        pjob_mutex.unlock();
      }
    }
  else
    {
    job_free(pjob, TRUE);
    pjob_mutex.set_lock_on_exit(false);
    }

  /* delete the script file */
  if ((job_has_arraystruct == FALSE) || (job_is_array_template == TRUE))
    {
    /* delete script file */
    snprintf(namebuf, sizeof(namebuf), "%s%s%s",
      path_jobs, job_fileprefix, JOB_SCRIPT_SUFFIX);

    if (unlink(namebuf) < 0)
      {
      if (errno != ENOENT)
        log_err(errno, __func__, msg_err_purgejob);
      }
    else if (LOGLEVEL >= 6)
      {
      sprintf(log_buf, "removed job script");
      log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, job_id, log_buf);
      }
    }

  if (!leaveSpoolFiles)
    {
    /* delete any spooled stdout */
    snprintf(namebuf, sizeof(namebuf), "%s%s%s",
      path_spool, job_fileprefix, JOB_STDOUT_SUFFIX);

    if (unlink(namebuf) < 0)
      {
      if (errno != ENOENT)
        log_err(errno, __func__, msg_err_purgejob);
      }
    else if (LOGLEVEL >= 6)
      {
      sprintf(log_buf, "removed job stdout");
      log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, job_id, log_buf);
      }

    /* delete any spooled stderr ($TRQ_HOME/spool) */
    snprintf(namebuf, sizeof(namebuf), "%s%s%s",
      path_spool, job_fileprefix, JOB_STDERR_SUFFIX);

    if (unlink(namebuf) < 0)
      {
      if (errno != ENOENT)
        log_err(errno, __func__, msg_err_purgejob);
      }
    else if (LOGLEVEL >= 6)
      {
      sprintf(log_buf, "removed job stderr");
      log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, job_id, log_buf);
      }
    }

  /* delete checkpoint file directory if there is one */
  if (job_has_checkpoint_file & ATR_VFLAG_SET)
    {
    snprintf(namebuf, sizeof(namebuf), "%s%s%s",
      path_checkpoint, job_fileprefix, JOB_CHECKPOINT_SUFFIX);

    if (remtree(namebuf) < 0)
      {
      if (errno != ENOENT)
        log_err(errno, __func__, msg_err_purgejob);
      }
    else if (LOGLEVEL >= 6)
      {
      sprintf(log_buf, "removed job checkpoint");
      log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, job_id, log_buf);
      }
    }

  if (job_is_array_template == TRUE)
    {
    snprintf(namebuf, sizeof(namebuf), "%s%s%s",
      path_jobs, job_fileprefix, JOB_FILE_TMP_SUFFIX);
    }
  else
    {
    snprintf(namebuf, sizeof(namebuf), "%s%s%s",
      path_jobs, job_fileprefix, JOB_FILE_SUFFIX);
    }

  if (unlink(namebuf) < 0)
    {
    if (errno != ENOENT)
      log_err(errno, __func__, msg_err_purgejob);
    }
  else if (LOGLEVEL >= 6)
    {
    sprintf(log_buf, "removed job file");
    log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, job_id, log_buf);
    }

  if (do_delete_array == TRUE)
    {
    pa = get_array(array_id);

    if (pa != NULL)
      array_delete(pa);
    }

  return(PBSE_NONE);
  }  /* END svr_job_purge() */
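/*
 * Cleanup summary for svr_job_purge() above, restating the unlink calls:
 * the script file (JOB_SCRIPT_SUFFIX) is removed only for non-array jobs
 * and array templates (array sub-jobs share the template's script); unless
 * leaveSpoolFiles is set, spooled stdout/stderr (JOB_STDOUT_SUFFIX /
 * JOB_STDERR_SUFFIX) are removed; and the checkpoint tree
 * (JOB_CHECKPOINT_SUFFIX) and job file (JOB_FILE_SUFFIX, or
 * JOB_FILE_TMP_SUFFIX for array templates) are removed as applicable.
 */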
/*
 * Always access the array in this way in order to avoid deadlock
 *
 * @return this job's array struct with its mutex locked
 */

job_array *get_jobs_array(

  job **pjob_ptr)

  {
  char       log_buf[LOCAL_LOG_BUF_SIZE];
  char       jobid[PBS_MAXSVRJOBID + 1];
  char       arrayid[PBS_MAXSVRJOBID + 1];
  job       *pjob = NULL;
  job_array *pa = NULL;

  if (pjob_ptr == NULL)
    {
    log_err(PBSE_BAD_PARAMETER, __func__, "NULL input pointer to pointer");
    return(NULL);
    }

  pjob = *pjob_ptr;

  if (pjob == NULL)
    {
    log_err(PBSE_BAD_PARAMETER, __func__, "NULL input job pointer");
    return(NULL);
    }

  mutex_mgr job_mutex(pjob->ji_mutex, true);
  job_mutex.set_lock_on_exit(false);

  strcpy(jobid, pjob->ji_qs.ji_jobid);

  if (pjob->ji_arraystructid[0] != '\0')
    {
    strcpy(arrayid, pjob->ji_arraystructid);

    job_mutex.unlock();

    if (LOGLEVEL >= 7)
      {
      sprintf(log_buf, "Locking ai_mutex, %s", pjob->ji_qs.ji_jobid);
      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
      }

    pa = get_array(arrayid);

    pjob = svr_find_job(jobid, TRUE);

    if (pjob == NULL)
      {
      if (pa != NULL)
        {
        unlock_ai_mutex(pa, __func__, "1", LOGLEVEL);
        pa = NULL;
        }

      *pjob_ptr = NULL;
      }
    }

  return(pa);
  }  /* END get_jobs_array() */

struct pbs_queue *get_jobs_queue(

  job **pjob_ptr)

  {
  job       *pjob = NULL;
  pbs_queue *pque;

  if (LOGLEVEL >= 10)
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, "enter");

  if (pjob_ptr == NULL)
    {
    log_err(PBSE_BAD_PARAMETER, __func__, "NULL input pointer to pointer");
    return(NULL);
    }

  pjob = *pjob_ptr;

  if (pjob == NULL)
    {
    log_err(PBSE_BAD_PARAMETER, __func__, "NULL input job pointer");
    return(NULL);
    }

  pque = lock_queue_with_job_held(pjob->ji_qhdr, pjob_ptr);

  if (LOGLEVEL >= 10)
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, "exit");

  return(pque);
  }  /* END get_jobs_queue() */

/*static*/ int hostname_in_externals(

  char *hostname,
  char *externals)

  {
  char *ptr = strstr(externals, hostname);
  int   found = FALSE;
  char *end_word;

  if (ptr != NULL)
    {
    end_word = ptr + strlen(hostname);

    if ((ptr > externals) && (*(ptr - 1) != '+'))
      found = FALSE;
    else if ((*end_word != '\0') && (*end_word != '+'))
      found = FALSE;
    else
      found = TRUE;
    }

  return(found);
  }  /* END hostname_in_externals() */

/*static*/ int fix_external_exec_hosts(

  job *pjob)  /* the external sub-job */

  {
  dynamic_string *external_execs;
  char           *exec_host = pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str;
  char           *externals = pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str;
  char           *exec_ptr;
  char           *plus;
  char           *slash;

  if ((exec_host == NULL) || (externals == NULL))
    return(PBSE_BAD_PARAMETER);

  /* wipe out the current destination */
  pjob->ji_qs.ji_destin[0] = '\0';

  external_execs = get_dynamic_string(-1, NULL);

  exec_ptr = exec_host;

  while (exec_ptr != NULL)
    {
    /* remove the extra parts after the hostname and get the point we'll
     * advance to. exec_host strings are in the format
     * hostname/index[+hostname/index[+...]] */
    if ((plus = strchr(exec_ptr, '+')) != NULL)
      {
      *plus = '\0';
      plus++;
      }

    if ((slash = strchr(exec_ptr, '/')) != NULL)
      *slash = '\0';

    /* if we find a match, copy in that exec host entry */
    if (hostname_in_externals(exec_ptr, externals) == TRUE)
      {
      /* capture the first external as my mother superior */
      if (pjob->ji_qs.ji_destin[0] == '\0')
        snprintf(pjob->ji_qs.ji_destin, sizeof(pjob->ji_qs.ji_destin), "%s", exec_ptr);

      if (slash != NULL)
        *slash = '/';

      /* delimit with + */
      if (external_execs->used != 0)
        append_dynamic_string(external_execs, "+");

      append_dynamic_string(external_execs, exec_ptr);
      }

    exec_ptr = plus;
    }

  /* also remove the login attributes */
  if (pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str != NULL)
    {
    free(pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str);
    pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str = NULL;
    pjob->ji_wattr[JOB_ATR_login_node_id].at_flags &= ~ATR_VFLAG_SET;
    }

  if (pjob->ji_wattr[JOB_ATR_login_prop].at_val.at_str != NULL)
    {
    free(pjob->ji_wattr[JOB_ATR_login_prop].at_val.at_str);
    pjob->ji_wattr[JOB_ATR_login_prop].at_val.at_str = NULL;
    pjob->ji_wattr[JOB_ATR_login_prop].at_flags &= ~ATR_VFLAG_SET;
    }

  free(exec_host);

  pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str = strdup(external_execs->str);

  free_dynamic_string(external_execs);

  return(PBSE_NONE);
  }  /* END fix_external_exec_hosts() */
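/*
 * Matching example for hostname_in_externals() above (illustrative values):
 * against the externals list "login1+login2", "login1" and "login2" match,
 * but "login" and "ogin2" do not, because a match must be bounded by '+' or
 * by the ends of the string.
 */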
/*static*/ int fix_cray_exec_hosts(

  job *pjob)

  {
  char *external = pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str;
  char *exec = pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str;
  char *new_exec = (char *)calloc(1, strlen(exec) + 1);
  char *exec_ptr = exec;
  char *plus;
  char *slash;

  while (exec_ptr != NULL)
    {
    if ((plus = strchr(exec_ptr, '+')) != NULL)
      {
      *plus = '\0';
      plus++;
      }

    if ((slash = strchr(exec_ptr, '/')) != NULL)
      *slash = '\0';

    /* if the hostname isn't in the externals, copy it in */
    if (hostname_in_externals(exec_ptr, external) == FALSE)
      {
      if (slash != NULL)
        *slash = '/';

      if (*new_exec != '\0')
        strcat(new_exec, "+");

      strcat(new_exec, exec_ptr);
      }

    exec_ptr = plus;
    }

  free(exec);

  pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str = new_exec;

  return(PBSE_NONE);
  }  /* END fix_cray_exec_hosts() */

/*static*/ int change_external_job_name(

  job *pjob)

  {
  char  tmp_jobid[PBS_MAXSVRJOBID + 1];
  char *dot = strchr(pjob->ji_qs.ji_jobid, '.');

  if (dot != NULL)
    *dot = '\0';

  snprintf(tmp_jobid, sizeof(tmp_jobid), "%s-0.%s",
    pjob->ji_qs.ji_jobid, dot + 1);

  strcpy(pjob->ji_qs.ji_jobid, tmp_jobid);

  return(PBSE_NONE);
  }  /* END change_external_job_name() */

int split_job(

  job *pjob)

  {
  job *external;
  job *cray;

  if (pjob->ji_external_clone == NULL)
    {
    external = copy_job(pjob);
    fix_external_exec_hosts(external);
    change_external_job_name(external);
    external->ji_parent_job = pjob;
    pjob->ji_external_clone = external;
    unlock_ji_mutex(external, __func__, NULL, LOGLEVEL);
    }

  if (pjob->ji_cray_clone == NULL)
    {
    cray = copy_job(pjob);
    fix_cray_exec_hosts(cray);
    cray->ji_parent_job = pjob;
    pjob->ji_cray_clone = cray;
    unlock_ji_mutex(cray, __func__, NULL, LOGLEVEL);
    }

  return(PBSE_NONE);
  }  /* END split_job() */

/* END job_func.c */