/*
 * OpenPBS (Portable Batch System) v2.3 Software License
 *
 * Copyright (c) 1999-2000 Veridian Information Solutions, Inc.
 * All rights reserved.
 *
 * ---------------------------------------------------------------------------
 * For a license to use or redistribute the OpenPBS software under conditions
 * other than those described below, or to purchase support for this software,
 * please contact Veridian Systems, PBS Products Department ("Licensor") at:
 *
 *   www.OpenPBS.org   +1 650 967-4675   sales@OpenPBS.org
 *                     877 902-4PBS (US toll-free)
 * ---------------------------------------------------------------------------
 *
 * This license covers use of the OpenPBS v2.3 software (the "Software") at
 * your site or location, and, for certain users, redistribution of the
 * Software to other sites and locations.  Use and redistribution of
 * OpenPBS v2.3 in source and binary forms, with or without modification,
 * are permitted provided that all of the following conditions are met.
 * After December 31, 2001, only conditions 3-6 must be met:
 *
 * 1. Commercial and/or non-commercial use of the Software is permitted
 *    provided a current software registration is on file at www.OpenPBS.org.
 *    If use of this software contributes to a publication, product, or
 *    service, proper attribution must be given; see www.OpenPBS.org/credit.html
 *
 * 2. Redistribution in any form is only permitted for non-commercial,
 *    non-profit purposes.  There can be no charge for the Software or any
 *    software incorporating the Software.  Further, there can be no
 *    expectation of revenue generated as a consequence of redistributing
 *    the Software.
 *
 * 3. Any Redistribution of source code must retain the above copyright notice
 *    and the acknowledgment contained in paragraph 6, this list of conditions
 *    and the disclaimer contained in paragraph 7.
 *
 * 4. Any Redistribution in binary form must reproduce the above copyright
 *    notice and the acknowledgment contained in paragraph 6, this list of
 *    conditions and the disclaimer contained in paragraph 7 in the
 *    documentation and/or other materials provided with the distribution.
 *
 * 5. Redistributions in any form must be accompanied by information on how to
 *    obtain complete source code for the OpenPBS software and any
 *    modifications and/or additions to the OpenPBS software.  The source code
 *    must either be included in the distribution or be available for no more
 *    than the cost of distribution plus a nominal fee, and all modifications
 *    and additions to the Software must be freely redistributable by any party
 *    (including Licensor) without restriction.
 *
 * 6. All advertising materials mentioning features or use of the Software must
 *    display the following acknowledgment:
 *
 *    "This product includes software developed by NASA Ames Research Center,
 *     Lawrence Livermore National Laboratory, and Veridian Information
 *     Solutions, Inc.
 *     Visit www.OpenPBS.org for OpenPBS software support,
 *     products, and information."
 *
 * 7. DISCLAIMER OF WARRANTY
 *
 * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.  ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT
 * ARE EXPRESSLY DISCLAIMED.
 *
 * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE
 * U.S.
 * GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * This license will be governed by the laws of the Commonwealth of Virginia,
 * without reference to its choice of law rules.
 */

/*
 * req_jobobit.c - functions dealing with a Job Obituary Request (Notice)
 * and the associated post execution job clean up.
 */

#include <pbs_config.h> /* the master config generated by configure */

/* NOTE: the system header names were lost in this copy of the file; the set
 * below is an assumption, reconstructed from the calls made in the code that
 * follows (snprintf, calloc, strdup, isupper, time, access/link/unlink,
 * pthread_mutex_lock, gettimeofday, MAXPATHLEN) */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/param.h>

#include "libpbs.h"
#include "server_limits.h"
#include "list_link.h"
#include "attribute.h"
#include "resource.h"
#include "server.h"
#include "pbs_job.h"
#include "credential.h"
#include "batch_request.h"
#include "work_task.h"
#include "pbs_error.h"
#include "log.h"
#include "../lib/Liblog/pbs_log.h"
#include "../lib/Liblog/log_event.h"
#include "acct.h"
#include "net_connect.h"
#include "svrfunc.h"
#include "sched_cmds.h"
#include "queue.h"
#include "array.h"
#include "issue_request.h" /* issue_request */
#include "utils.h"
#include "svr_func.h" /* get_svr_attr_* */
#include "req_jobobit.h" /* req_jobobit */
#include "svr_connect.h" /* svr_connect */
#include "job_func.h" /* svr_job_purge */
#include "ji_mutex.h"
#include "mutex_mgr.hpp"
#include "../lib/Libutils/u_lock_ctl.h"
#include "exiting_jobs.h"
#include "track_alps_reservations.h"

#define RESC_USED_BUF            2048
#define JOBMUSTREPORTDEFAULTKEEP 30

/* External Global Data Items */

extern struct all_jobs  alljobs;
extern unsigned int     pbs_mom_port;
extern unsigned int     pbs_rm_port;
extern char            *path_spool;
extern int              server_init_type;
extern pbs_net_t        pbs_server_addr;
extern char            *msg_init_abt;
extern char            *msg_job_end;
extern char            *msg_job_end_sig;
extern char            *msg_job_end_stat;
extern char            *msg_momnoexec1;
extern char            *msg_momnoexec2;
extern const char      *msg_momjobovermemlimit;
extern const char      *msg_momjoboverwalltimelimit;
extern const char      *msg_momjobovercputlimit;
extern char            *msg_obitnojob;
extern char            *msg_obitnocpy;
extern char            *msg_obitnodel;
extern char             server_host[];
extern int              svr_do_schedule;
extern pthread_mutex_t *svr_do_schedule_mutex;
extern pthread_mutex_t *listener_command_mutex;
extern int              listener_command;
extern int              LOGLEVEL;
extern const char      *PJobState[];

/* External Functions called */

int timeval_subtract(struct timeval *, struct timeval *, struct timeval *);
extern void set_resc_assigned(job *, enum batch_op);
extern void cleanup_restart_file(job *);
void on_job_exit(batch_request *preq, char *jobid);
int kill_job_on_mom(char *jobid, struct pbsnode *pnode);
void handle_complete_second_time(struct work_task *ptask);
void *on_job_exit_task(struct work_task *vp);

/*
 * setup_from - setup the "from" name for a standard job file:
 * output, error, or checkpoint
 */

char *setup_from(

  job        *pjob,   /* I */
  const char *suffix) /* I */

  {
  char *from;

  from = (char *)calloc(1, strlen(pjob->ji_qs.ji_fileprefix) + strlen(suffix) + 1);

  if (from != NULL)
    {
    strcpy(from, pjob->ji_qs.ji_fileprefix);
    strcat(from, suffix);
    }

  return(from);
  }  /* END setup_from() */
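/*
 * Illustrative sketch (not part of the original source): setup_from() just
 * concatenates the job's file prefix with a suffix and returns a heap buffer
 * that the caller owns.  The ".OU" literal below is a hypothetical example
 * suffix, not necessarily the real JOB_STDOUT_SUFFIX value.
 */
#if 0
/* given a locked job *pjob with file prefix "123.server" */
char *from = setup_from(pjob, ".OU");   /* yields "123.server.OU" */

if (from != NULL)
  {
  /* hand the name off (setup_cpyfiles() takes ownership) or free it */
  free(from);
  }
#endif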
/*
 * setup_cpyfiles - if need be, allocate and initialize a Copy Files
 * batch request, then append the file pairs
 */

struct batch_request *setup_cpyfiles(

  struct batch_request *preq,
  job                  *pjob,
  char                 *from,      /* local (to mom) name */
  char                 *to,        /* remote (destination) name */
  int                   direction, /* copy direction */
  int                   tflag)     /* 1 if stdout or stderr, 2 if stage out or in */

  {
  struct rq_cpyfile *pcf;
  struct rqfpair    *pair;

  if (preq == NULL)
    {
    /* allocate and initialize the batch request struct */

    preq = alloc_br(PBS_BATCH_CopyFiles);

    if (preq == NULL)
      {
      /* FAILURE */

      free(from);

      if (to != NULL)
        free(to);

      log_event(
        PBSEVENT_ERROR | PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        pjob->ji_qs.ji_jobid,
        "ALERT: cannot allocate memory for preq in setup_cpyfiles");

      return(preq);
      }

    pcf = &preq->rq_ind.rq_cpyfile;

    CLEAR_HEAD(pcf->rq_pair);

    /* copy jobid, owner, exec-user, group names, up to the @host part */

    strcpy(pcf->rq_jobid, pjob->ji_qs.ji_jobid);

    get_jobowner(pjob->ji_wattr[JOB_ATR_job_owner].at_val.at_str, pcf->rq_owner);
    get_jobowner(pjob->ji_wattr[JOB_ATR_euser].at_val.at_str, pcf->rq_user);

    if (((pjob->ji_wattr[JOB_ATR_egroup].at_flags & ATR_VFLAG_DEFLT) == 0) &&
        (pjob->ji_wattr[JOB_ATR_egroup].at_val.at_str != NULL))
      {
      snprintf(pcf->rq_group, sizeof(pcf->rq_group), "%s",
        pjob->ji_wattr[JOB_ATR_egroup].at_val.at_str);
      }
    else
      {
      pcf->rq_group[0] = '\0'; /* default: use login group */
      }

    pcf->rq_dir = direction;
    }
  else
    {
    /* use the existing request structure */

    pcf = &preq->rq_ind.rq_cpyfile;
    }

  pair = (struct rqfpair *)calloc(1, sizeof(struct rqfpair));

  if (pair == NULL)
    {
    /* FAILURE */

    free_br(preq);

    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      "ALERT: cannot allocate memory for rqfpair in setup_cpyfiles");

    return(NULL);
    }

  CLEAR_LINK(pair->fp_link);

  pair->fp_local = from;
  pair->fp_rmt   = to;
  pair->fp_flag  = tflag;

  append_link(&pcf->rq_pair, &pair->fp_link, pair);

  return(preq);
  }  /* END setup_cpyfiles() */

int is_joined(

  job          *pjob, /* I */
  enum job_atr  ati)  /* I */

  {
  char           key;
  pbs_attribute *pattr;
  char          *pd;

  if (ati == JOB_ATR_outpath)
    key = 'o';
  else if (ati == JOB_ATR_errpath)
    key = 'e';
  else
    {
    return(0);
    }

  pattr = &pjob->ji_wattr[JOB_ATR_join];

  if (pattr->at_flags & ATR_VFLAG_SET)
    {
    pd = pattr->at_val.at_str;

    if ((pd != NULL) && (*pd != '\0') && (*pd != 'n'))
      {
      /* if not the first letter, and in list - is joined */

      if ((*pd != key) && (strchr(pd + 1, (int)key)))
        {
        return(1); /* being joined */
        }
      }
    }

  return(0); /* either the first or not in list */
  }

struct batch_request *return_stdfile(

  struct batch_request *preq,
  job                  *pjob,
  enum job_atr          ati)

  {
  if ((pjob->ji_wattr[JOB_ATR_interactive].at_flags) &&
      (pjob->ji_wattr[JOB_ATR_interactive].at_val.at_long))
    {
    return(NULL);
    }

  if ((pjob->ji_wattr[JOB_ATR_checkpoint_name].at_flags & ATR_VFLAG_SET) == 0)
    {
    return(NULL);
    }

  /* if this file is joined to another then it doesn't have to get copied back */

  if (is_joined(pjob, ati))
    {
    return(preq);
    }

  if (preq == NULL)
    preq = alloc_br(PBS_BATCH_ReturnFiles);

  strcpy(preq->rq_ind.rq_returnfiles.rq_jobid, pjob->ji_qs.ji_jobid);

  if (ati == JOB_ATR_errpath)
    {
    preq->rq_ind.rq_returnfiles.rq_return_stderr = TRUE;
    }
  else if (ati == JOB_ATR_outpath)
    {
    preq->rq_ind.rq_returnfiles.rq_return_stdout = TRUE;
    }

  return(preq);
  }  /* END return_stdfile() */
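/*
 * Illustrative sketch (not part of the original source): with qsub -j oe the
 * join attribute holds "oe", so stderr ('e', not first in the list) is
 * reported as joined into stdout and never copied separately, while stdout
 * ('o', the first letter) remains the primary file.  The job setup here is
 * hypothetical.
 */
#if 0
pjob->ji_wattr[JOB_ATR_join].at_val.at_str = strdup("oe");
pjob->ji_wattr[JOB_ATR_join].at_flags |= ATR_VFLAG_SET;

is_joined(pjob, JOB_ATR_errpath); /* returns 1 - 'e' follows 'o' */
is_joined(pjob, JOB_ATR_outpath); /* returns 0 - 'o' is the primary file */
#endif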
/*
 * cpy_stdfile - determine if one of the job's standard files (output or error)
 * is to be copied, if so set up the Copy Files request.
 */

struct batch_request *cpy_stdfile(

  struct batch_request *preq,
  job                  *pjob,
  enum job_atr          ati) /* JOB_ATR_ output or error path */

  {
  char          *from;
  char           key;
  pbs_attribute *jkpattr;
  pbs_attribute *pathattr = &pjob->ji_wattr[ati];
  const char    *suffix;
  char          *to = NULL;
  char           log_buf[LOCAL_LOG_BUF_SIZE];

  if ((pjob->ji_wattr[JOB_ATR_interactive].at_flags) &&
      (pjob->ji_wattr[JOB_ATR_interactive].at_val.at_long))
    {
    /* the job is interactive, don't bother to return output file */

    return(NULL);
    }

  /* set up depending on which file */

  if (ati == JOB_ATR_errpath)
    {
    key    = 'e';
    suffix = JOB_STDERR_SUFFIX;
    }
  else
    {
    key    = 'o';
    suffix = JOB_STDOUT_SUFFIX;
    }

  if ((pathattr->at_flags & ATR_VFLAG_SET) == 0)
    {
    /* FAILURE: This shouldn't be */

    sprintf(log_buf, "%c file missing", key);

    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      log_buf);

    return(NULL);
    }

  /* Is the file joined to another, if so don't copy it */

  if (is_joined(pjob, ati))
    {
    return(preq);
    }

  /*
   * If the job has a keep file pbs_attribute, and the specified file is in
   * the keep list, MOM has already placed the file in the user's HOME
   * directory.  It doesn't need to be copied.
   */

  jkpattr = &pjob->ji_wattr[JOB_ATR_keep];

  if ((jkpattr->at_flags & ATR_VFLAG_SET) &&
      (strchr(jkpattr->at_val.at_str, (int)key)))
    {
    /* SUCCESS */

    return(preq);
    }

  /* go with the supplied name */

  to = (char *)calloc(1, strlen(pathattr->at_val.at_str) + 1);

  if (to == NULL)
    {
    /* FAILURE - cannot allocate memory for this request */

    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      "ERROR: cannot allocate 'to' memory in cpy_stdfile");

    return(preq);
    }

  strcpy(to, pathattr->at_val.at_str);

  /* build up the name used by MOM as the from name */

  from = setup_from(pjob, suffix);

  if (from == NULL)
    {
    /* FAILURE */

    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      "ERROR: cannot allocate 'from' memory in cpy_stdfile");

    free(to);

    return(preq);
    }

  /* now set names into the batch request */

  return(setup_cpyfiles(preq, pjob, from, to, STAGE_DIR_OUT, STDJOBFILE));
  }  /* END cpy_stdfile() */
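/*
 * Illustrative sketch (not part of the original source): both standard files
 * can be appended to a single Copy Files request; a NULL result after both
 * calls means nothing needs to be copied (interactive job, joined streams,
 * or kept files).
 */
#if 0
struct batch_request *preq = NULL;

preq = cpy_stdfile(preq, pjob, JOB_ATR_outpath); /* may allocate the request */
preq = cpy_stdfile(preq, pjob, JOB_ATR_errpath); /* appends to the same one  */

if (preq == NULL)
  ; /* no stdout/stderr copy work for this job */
#endif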
/*
 * cpy_stage - set up a Copy Files request to include files specified by the
 * user to be staged out (also used for stage-in).
 * "stage_out" is a resource that may or may not exist on a host.
 * If such exists, the files are listed one per string as
 * "local_name@remote_host:remote_name".
 */

struct batch_request *cpy_stage(

  struct batch_request *preq,
  job                  *pjob,
  enum job_atr          ati,       /* JOB_ATR_stageout */
  int                   direction) /* copy direction, e.g. STAGE_DIR_OUT */

  {
  int                   i;
  char                 *from;
  pbs_attribute        *pattr;
  struct array_strings *parst;
  char                 *plocal;
  char                 *prmt;
  char                 *to;
  char                  log_buf[LOCAL_LOG_BUF_SIZE + 1];

  pattr = &pjob->ji_wattr[ati];

  if (pattr->at_flags & ATR_VFLAG_SET)
    {
    /* at last, we know we have files to stage out/in */

    parst = pattr->at_val.at_arst;

    if (parst == NULL)
      {
      /* FAILURE */

      snprintf(log_buf, sizeof(log_buf),
        "cannot copy stage file for job %s", pjob->ji_qs.ji_jobid);

      log_event(
        PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        pjob->ji_qs.ji_jobid,
        log_buf);

      return(preq);
      }

    for (i = 0; i < parst->as_usedptr; ++i)
      {
      plocal = parst->as_string[i];

      prmt = strchr(plocal, (int)'@');

      if (prmt != NULL)
        {
        *prmt = '\0';

        from = (char *)calloc(1, strlen(plocal) + 1);

        if (from == NULL)
          {
          log_event(
            PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
            PBS_EVENTCLASS_JOB,
            pjob->ji_qs.ji_jobid,
            "ALERT: cannot allocate memory in cpy_stage");

          return(preq);
          }

        strcpy(from, plocal);

        *prmt = '@'; /* restore the @ */

        to = (char *)calloc(1, strlen(prmt + 1) + 1);

        if (to == NULL)
          {
          log_event(
            PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
            PBS_EVENTCLASS_JOB,
            pjob->ji_qs.ji_jobid,
            "ALERT: cannot allocate memory in cpy_stage");

          free(from);

          return(preq);
          }

        strcpy(to, prmt + 1);

        preq = setup_cpyfiles(preq, pjob, from, to, direction, STAGEFILE);
        }
      }
    }

  return(preq);
  }  /* END cpy_stage() */
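/*
 * Illustrative sketch (not part of the original source): the '@' split done
 * above, applied to a hypothetical stageout entry.  Everything before the
 * '@' is the MOM-local path; everything after it is "host:remote_path".
 */
#if 0
char  spec[] = "results.dat@archive.example.com:/home/user/results.dat";
char *at     = strchr(spec, '@');

*at = '\0';
/* from = "results.dat"                                  (fp_local) */
/* to   = "archive.example.com:/home/user/results.dat"  (fp_rmt)   */
#endif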
/*
 * mom_comm - if needed, open a connection with the MOM under which
 * the job was running.  The connection is typically set up by
 * req_jobobit() using the connection already established by MOM.
 * However, on server recovery there will be no pre-established connection.
 *
 * If a connection is needed and cannot be set up, set up a work-task
 * entry and try again later.
 */

int mom_comm(

  job *pjob,
  void *(*func)(struct work_task *vp))

  {
  unsigned int dummy;
  time_t       time_now     = time(NULL);
  int          local_errno  = 0;
  int          handle       = -1;
  long         cray_enabled = FALSE;
  char         jobid[PBS_MAXSVRJOBID + 1];
  char         log_buf[LOCAL_LOG_BUF_SIZE];

  if (LOGLEVEL >= 6)
    {
    snprintf(log_buf, sizeof(log_buf), "%s", pjob->ji_qs.ji_jobid);
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
    }

  /* need to make connection, called from pbsd_init() */

  if (pjob->ji_qs.ji_un.ji_exect.ji_momaddr == 0)
    {
    char *tmp;

    get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled);

    if ((cray_enabled == TRUE) &&
        (pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str != NULL))
      tmp = parse_servername(pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str, &dummy);
    else
      tmp = parse_servername(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str, &dummy);

    pjob->ji_qs.ji_un.ji_exect.ji_momaddr = get_hostaddr(&local_errno, tmp);

    free(tmp);
    }

  strcpy(jobid, pjob->ji_qs.ji_jobid);

  unlock_ji_mutex(pjob, __func__, NULL, LOGLEVEL);

  handle = svr_connect(
             pjob->ji_qs.ji_un.ji_exect.ji_momaddr,
             pjob->ji_qs.ji_un.ji_exect.ji_momport,
             &local_errno,
             NULL,
             NULL,
             ToServerDIS);

  if ((pjob = svr_find_job(jobid, TRUE)) == NULL)
    {
    return(-1 * PBSE_JOB_RECYCLED);
    }

  if (handle < 0)
    {
    /* FAILURE */

    if (LOGLEVEL >= 2)
      {
      log_event(
        PBSEVENT_ERROR | PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        pjob->ji_qs.ji_jobid,
        "cannot establish connection with mom for clean-up - will retry later");
      }

    set_task(
      WORK_Timed,
      time_now + PBS_NET_RETRY_TIME,
      (void (*)(struct work_task *))func,
      strdup(jobid),
      FALSE);

    return(-1);
    }

  return(handle);
  }  /* END mom_comm() */

/*
 * rel_resc - release resources assigned to the job
 */

void rel_resc(

  job *pjob) /* I (modified) */

  {
  long cray_enabled = FALSE;

  get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled);

  if ((cray_enabled == TRUE) &&
      (pjob->ji_wattr[JOB_ATR_reservation_id].at_val.at_str != NULL))
    {
    remove_alps_reservation(pjob->ji_wattr[JOB_ATR_reservation_id].at_val.at_str);
    }

  free_nodes(pjob);

  /* remove the resources used by the job from the used svr/que attr */

  set_resc_assigned(pjob, DECR);

  /* mark that scheduler should be called */

  pthread_mutex_lock(svr_do_schedule_mutex);
  svr_do_schedule = SCH_SCHEDULE_TERM;
  pthread_mutex_unlock(svr_do_schedule_mutex);

  pthread_mutex_lock(listener_command_mutex);
  listener_command = SCH_SCHEDULE_TERM;
  pthread_mutex_unlock(listener_command_mutex);

  return;
  }  /* END rel_resc() */

int check_if_checkpoint_restart_failed(

  job *pjob)

  {
  char *pfailtype = NULL;
  char *pfailure  = NULL;
  long *hold_val  = NULL;
  char  errMsg[MAXPATHLEN];
  char *err_ptr;
  char  log_buf[LOCAL_LOG_BUF_SIZE + 1];

  snprintf(errMsg, sizeof(errMsg), "%s",
    pjob->ji_wattr[JOB_ATR_checkpoint_restart_status].at_val.at_str);

  err_ptr = errMsg;

  if ((pfailtype = threadsafe_tokenizer(&err_ptr, " ")) != NULL)
    pfailure = threadsafe_tokenizer(&err_ptr, " ");

  if (pfailure != NULL)
    {
    if (memcmp(pfailure, "failure", 7) == 0)
      {
      if (memcmp(pfailtype, "Temporary", 9) == 0)
        {
        /* requeue job */

        svr_setjobstate(pjob, JOB_STATE_QUEUED, JOB_SUBSTATE_QUEUED, FALSE);

        if (LOGLEVEL >= 4)
          {
          sprintf(log_buf, "Requeueing job after checkpoint restart failure: %s",
            pjob->ji_wattr[JOB_ATR_checkpoint_restart_status].at_val.at_str);

          log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf);
          }

        return(TRUE);
        }

      /*
       * If we are deleting job after a failure then the first character
       * of the comment should no longer be uppercase
       */
      else if (isupper(*pfailtype))
        {
        /* put job on hold */

        hold_val = &pjob->ji_wattr[JOB_ATR_hold].at_val.at_long;

        *hold_val |= HOLD_s;
        pjob->ji_wattr[JOB_ATR_hold].at_flags |= ATR_VFLAG_SET;
        pjob->ji_modified = 1;

        svr_setjobstate(pjob, JOB_STATE_HELD, JOB_SUBSTATE_HELD, FALSE);

        if (LOGLEVEL >= 4)
          {
          sprintf(log_buf, "Placing job on hold after checkpoint restart failure: %s",
            pjob->ji_wattr[JOB_ATR_checkpoint_restart_status].at_val.at_str);

          log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf);
          }

        return(TRUE);
        }
      }
    }

  return(FALSE);
  }  /* END check_if_checkpoint_restart_failed() */

int handle_exiting_or_abort_substate(

  job *pjob)

  {
  char log_buf[LOCAL_LOG_BUF_SIZE + 1];

  if (pjob == NULL)
    {
    log_err(PBSE_BAD_PARAMETER, __func__, "NULL input job pointer");
    return(PBSE_BAD_PARAMETER);
    }

  mutex_mgr job_mutex(pjob->ji_mutex, true);

  if (LOGLEVEL >= 2)
    {
    sprintf(log_buf, "%s; JOB_SUBSTATE_EXITING", pjob->ji_qs.ji_jobid);
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
    }

  /* see if job has any dependencies */

  if (pjob->ji_wattr[JOB_ATR_depend].at_flags & ATR_VFLAG_SET)
    {
    if (depend_on_term(pjob) == PBSE_JOBNOTFOUND)
      {
      job_mutex.set_lock_on_exit(false);
      return(PBSE_JOBNOTFOUND);
      }
    }

  svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_RETURNSTD, FALSE);

  return(PBSE_NONE);
  }  /* END handle_exiting_or_abort_substate() */

int handle_returnstd(

  job                  *pjob,
  struct batch_request *preq,
  int                   type)

  {
  int           rc = PBSE_NONE;
  int           KeepSeconds = 0;
  int           IsFaked = 0;
  char         *namebuf2;
  char          namebuf[MAXPATHLEN + 1];
  char          log_buf[LOCAL_LOG_BUF_SIZE + 1];
  pbs_queue    *pque;
  int           handle = -1;
  char          job_id[PBS_MAXSVRJOBID + 1];
  char          job_fileprefix[PBS_JOBBASE + 1];
  unsigned long job_momaddr;
  char         *job_momname = NULL;

  mutex_mgr job_mutex(pjob->ji_mutex, true);

  if (LOGLEVEL >= 10)
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, pjob->ji_qs.ji_jobid);

  strcpy(job_id, pjob->ji_qs.ji_jobid);
  strcpy(job_fileprefix, pjob->ji_qs.ji_fileprefix);
  job_momaddr = pjob->ji_qs.ji_un.ji_exect.ji_momaddr;

  if (pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str == NULL)
    {
    rc = PBSE_JOB_FILE_CORRUPT;
    goto handle_returnstd_cleanup;
    }

  job_momname = strdup(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str);

  if (job_momname == NULL)
    {
    rc = PBSE_MEM_MALLOC;
    goto handle_returnstd_cleanup;
    }

  if (type != WORK_Deferred_Reply)
    {
    /* this is the very first call, have mom return the files */

    /* if job has been checkpointed and KeepSeconds > 0, copy the stderr
     * and stdout files back so that we can restart a completed
     * checkpointed job.  return_stdfile will only setup this request if
     * the job has a checkpoint file and the file is not joined to another
     * file */

    pque = get_jobs_queue(&pjob);

    if (pque != NULL)
      {
      mutex_mgr pque_mutex = mutex_mgr(pque->qu_mutex, true);

      pthread_mutex_lock(server.sv_attr_mutex);

      KeepSeconds = attr_ifelse_long(
                      &pque->qu_attr[QE_ATR_KeepCompleted],
                      &server.sv_attr[SRV_ATR_KeepCompleted],
                      0);

      pthread_mutex_unlock(server.sv_attr_mutex);
      }
    else if (pjob == NULL)
      {
      job_mutex.set_lock_on_exit(false);
      rc = PBSE_JOBNOTFOUND;
      log_err(rc, __func__, "Job lost while acquiring queue 2");
      goto handle_returnstd_cleanup;
      }

    if (KeepSeconds > 0)
      {
      snprintf(namebuf, sizeof(namebuf), "%s%s%s",
        path_spool, job_fileprefix, JOB_STDOUT_SUFFIX);

      /* allocate space for the string name plus ".SAV" */
      namebuf2 = (char *)calloc((strlen(namebuf) + 5), sizeof(char));

      if (job_momaddr != pbs_server_addr)
        {
        preq = return_stdfile(preq, pjob, JOB_ATR_outpath);
        }
      else if (access(namebuf, F_OK) == 0)
        {
        strcpy(namebuf2, namebuf);
        strcat(namebuf2, ".SAV");

        if (link(namebuf, namebuf2) == -1)
          {
          log_event(
            PBSEVENT_ERROR | PBSEVENT_SECURITY,
            PBS_EVENTCLASS_JOB,
            job_id,
            "Link(1) in on_job_exit failed");
failed"); } } namebuf[strlen(namebuf) - strlen(JOB_STDOUT_SUFFIX)] = '\0'; strcat(namebuf, JOB_STDERR_SUFFIX); if (job_momaddr != pbs_server_addr) { preq = return_stdfile(preq, pjob, JOB_ATR_errpath); } else if (access(namebuf, F_OK) == 0) { strcpy(namebuf2, namebuf); strcat(namebuf2, ".SAV"); if (link(namebuf, namebuf2) == -1) { log_event( PBSEVENT_ERROR | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB, job_id, "Link(2) in on_job_exit failed"); } } free(namebuf2); } if (preq != NULL) { if (preq->rq_extra != NULL) free(preq->rq_extra); preq->rq_extra = strdup(job_id); if ((handle = mom_comm(pjob, on_job_exit_task)) < 0) { job_mutex.unlock(); rc = PBSE_CONNECT; log_err(rc, __func__, "Job can not make connection to mom"); goto handle_returnstd_cleanup; } else { job_mutex.unlock(); if ((rc = issue_Drequest(handle, preq)) != PBSE_NONE) { /* set up as if mom returned error, if we fall through to * here then we want to hit the error processing below * because something bad happened */ IsFaked = 1; preq->rq_reply.brp_code = PBSE_MOMREJECT; preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL; preq->rq_reply.brp_un.brp_txt.brp_txtlen = 0; } } } else { job_mutex.unlock(); /* we don't need to return files to the server spool, * move on to see if we need to delete files */ if (LOGLEVEL >= 6) { log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, job_id, "no spool files to return"); } } } else job_mutex.unlock(); /* this check is added to allow the case where no files need to be returned to function smoothly */ if (preq != NULL) { /* here we have a reply (maybe faked) from MOM about the request */ if (preq->rq_reply.brp_code != 0) { if (LOGLEVEL >= 3) { snprintf(log_buf, sizeof(log_buf), "request to return spool files failed on node '%s' for job %s%s", job_momname, job_id, (IsFaked == 1) ? "*" : ""); log_event( PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, job_id, log_buf); } } /* end if (preq->rq_reply.brp_code != 0) */ /* * files (generally) moved ok, move on to the next phase by * "faking" the immediate work task and falling through to * the next case. */ free_br(preq); } if ((pjob = svr_find_job(job_id, TRUE)) != NULL) { job_mutex.mark_as_locked(); svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_STAGEOUT, FALSE); } handle_returnstd_cleanup: if (job_momname != NULL) free(job_momname); return(rc); } /* END handle_returnstd() */ /* * handle_stageout() * * asks the mom to copy back any relevant files - stdout, stderr, and stageout files * @pre-cond: pjob must point to a valid job that is exiting * @post-cond: the job will be done with the stageout portion of it exiting. 
/*
 * handle_stageout()
 *
 * asks the mom to copy back any relevant files - stdout, stderr, and stageout files
 * @pre-cond:  pjob must point to a valid job that is exiting
 * @post-cond: the job will be done with the stageout portion of its exiting
 * @return: PBSE_NONE on success
 */

int handle_stageout(

  job           *pjob,
  int            type,
  batch_request *preq)

  {
  int   rc = PBSE_NONE;
  int   IsFaked = 0;
  int   spool_file_exists;
  char  log_buf[LOCAL_LOG_BUF_SIZE + 1];
  char  namebuf[MAXPATHLEN + 1];
  char *namebuf2;
  int   handle = -1;
  char  job_id[PBS_MAXSVRJOBID + 1];
  char *job_momname = NULL;
  char  job_fileprefix[PBS_JOBBASE + 1];

  mutex_mgr job_mutex(pjob->ji_mutex, true);

  if (LOGLEVEL >= 10)
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, pjob->ji_qs.ji_jobid);

  snprintf(job_id, sizeof(job_id), "%s", pjob->ji_qs.ji_jobid);
  snprintf(job_fileprefix, sizeof(job_fileprefix), "%s", pjob->ji_qs.ji_fileprefix);

  if (pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str != NULL)
    job_momname = strdup(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str);

  if (job_momname == NULL)
    {
    rc = PBSE_MEM_MALLOC;
    goto handle_stageout_cleanup;
    }

  if (LOGLEVEL >= 4)
    {
    snprintf(log_buf, sizeof(log_buf), "JOB_SUBSTATE_STAGE_OUT: %s", job_id);
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
    }

  if (type != WORK_Deferred_Reply)
    {
    /* this is the very first call, have mom copy files */

    /* first check the standard files: output & error */
    preq = cpy_stdfile(preq, pjob, JOB_ATR_outpath);
    preq = cpy_stdfile(preq, pjob, JOB_ATR_errpath);

    /* are there any stage-out files? */
    preq = cpy_stage(preq, pjob, JOB_ATR_stageout, STAGE_DIR_OUT);

    if (preq != NULL)
      {
      /* have files to copy */

      if (LOGLEVEL >= 4)
        {
        log_event(
          PBSEVENT_JOB,
          PBS_EVENTCLASS_JOB,
          job_id,
          "about to copy stdout/stderr/stageout files");
        }

      preq->rq_extra = strdup(job_id);

      if ((handle = mom_comm(pjob, on_job_exit_task)) < 0) /* Error */
        {
        rc = PBSE_CONNECT;
        goto handle_stageout_cleanup;
        }
      else
        {
        job_mutex.unlock();

        if ((rc = issue_Drequest(handle, preq)) != PBSE_NONE)
          {
          /* FAILURE */

          if (LOGLEVEL >= 1)
            {
            log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, job_id, "copy request failed");
            }

          /* set up as if mom returned error */

          IsFaked = 1;

          preq->rq_reply.brp_code = PBSE_MOMREJECT;
          preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL;
          preq->rq_reply.brp_un.brp_txt.brp_txtlen = 0;
          }
        }
      }
    else
      {
      job_mutex.unlock();

      /* no files to copy, go to next step */

      if (LOGLEVEL >= 4)
        log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, job_id, "no files to copy");
      }
    } /* END if (type != WORK_Deferred_Reply) */
  else
    job_mutex.unlock();

  /* place this check so that we just fall through when a file needs to be copied */

  if (preq != NULL)
    {
    /* here we have a reply (maybe faked) from MOM about the copy */

    if (preq->rq_reply.brp_code != 0)
      {
      int      bad;
      svrattrl tA;

      /* error from MOM */

      snprintf(log_buf, sizeof(log_buf), msg_obitnocpy, job_id, job_momname);

      log_event(
        PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        __func__,
        log_buf);

      if (LOGLEVEL >= 3)
        {
        snprintf(log_buf, sizeof(log_buf),
          "request to copy stageout files failed on node '%s' for job %s%s",
          job_momname, job_id, (IsFaked == 1) ? "*" : "");
"*" : ""); log_event( PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, job_id, log_buf); } if (preq->rq_reply.brp_choice == BATCH_REPLY_CHOICE_Text) { safe_strncat(log_buf, preq->rq_reply.brp_un.brp_txt.brp_str, sizeof(log_buf) - strlen(log_buf) - 1); } if ((pjob = svr_find_job(job_id, TRUE)) == NULL) { rc = PBSE_JOBNOTFOUND; goto handle_stageout_cleanup; } else job_mutex.mark_as_locked(); svr_mailowner(pjob, MAIL_OTHER, MAIL_FORCE, log_buf); memset(&tA, 0, sizeof(tA)); tA.al_name = (char *)"sched_hint"; tA.al_resc = (char *)""; tA.al_value = log_buf; tA.al_op = SET; modify_job_attr( pjob, &tA, /* I: ATTR_sched_hint - svrattrl */ ATR_DFLAG_MGWR | ATR_DFLAG_SvWR, &bad); job_mutex.unlock(); } /* END if (preq->rq_reply.brp_code != 0) */ /* * files (generally) copied ok, move on to the next phase by * "faking" the immediate work task. */ /* check to see if we have saved the spool files, that means the mom and server are sharing this spool directory. pbs_server should take ownership of these files and rename them see JOB_SUBSTATE_RETURNSTD above*/ snprintf(namebuf, sizeof(namebuf), "%s%s%s", path_spool, job_fileprefix, JOB_STDOUT_SUFFIX); /* allocate space for the string name plus ".SAV" */ namebuf2 = (char *)calloc((strlen(namebuf) + 5), sizeof(char)); strcpy(namebuf2, namebuf); strcat(namebuf2, ".SAV"); spool_file_exists = access(namebuf2, F_OK); if (spool_file_exists == 0) { if (link(namebuf2, namebuf) == -1) { snprintf(log_buf, sizeof(log_buf), "%s, pathname=%s, Link(3) failed", job_id, namebuf2); log_event( PBSEVENT_ERROR | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB, __func__, log_buf); } unlink(namebuf2); } namebuf[strlen(namebuf) - strlen(JOB_STDOUT_SUFFIX)] = '\0'; strcat(namebuf, JOB_STDERR_SUFFIX); strcpy(namebuf2, namebuf); strcat(namebuf2, ".SAV"); spool_file_exists = access(namebuf2, F_OK); if (spool_file_exists == 0) { if (link(namebuf2, namebuf) == -1) { snprintf(log_buf, sizeof(log_buf), "%s, pathname=%s, Link(4) failed", job_id, namebuf2); log_event( PBSEVENT_ERROR | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB, __func__, log_buf); } unlink(namebuf2); } free(namebuf2); if (preq->rq_extra != NULL) { free(preq->rq_extra); preq->rq_extra = NULL; } free_br(preq); preq = NULL; } /* END if preq != NULL */ if ((pjob = svr_find_job(job_id, TRUE)) != NULL) { job_mutex.mark_as_locked(); svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_STAGEDEL, FALSE); } handle_stageout_cleanup: if ((preq != NULL) && (preq->rq_extra != NULL)) { free(preq->rq_extra); preq->rq_extra = NULL; } if (job_momname != NULL) free(job_momname); return(rc); } /* END handle_stageout() */ int handle_stagedel( job *pjob, int type, batch_request *preq) { int rc = PBSE_NONE; int IsFaked = 0; char log_buf[LOCAL_LOG_BUF_SIZE+1]; int handle = -1; char job_id[PBS_MAXSVRJOBID+1]; char *job_momname = NULL; mutex_mgr job_mutex(pjob->ji_mutex, true); if (LOGLEVEL >= 10) log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, pjob->ji_qs.ji_jobid); strcpy(job_id, pjob->ji_qs.ji_jobid); if(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str != NULL) { job_momname = strdup(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str); } if (LOGLEVEL >= 4) { snprintf(log_buf, sizeof(log_buf), "JOB_SUBSTATE_STAGEDEL: %s", job_id); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,__func__, log_buf); } if (type != WORK_Deferred_Reply) { /* first time in */ /* Build list of files which were staged-in so they can * can be deleted. 
int handle_stagedel(

  job           *pjob,
  int            type,
  batch_request *preq)

  {
  int   rc = PBSE_NONE;
  int   IsFaked = 0;
  char  log_buf[LOCAL_LOG_BUF_SIZE + 1];
  int   handle = -1;
  char  job_id[PBS_MAXSVRJOBID + 1];
  char *job_momname = NULL;

  mutex_mgr job_mutex(pjob->ji_mutex, true);

  if (LOGLEVEL >= 10)
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, pjob->ji_qs.ji_jobid);

  strcpy(job_id, pjob->ji_qs.ji_jobid);

  if (pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str != NULL)
    {
    job_momname = strdup(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str);
    }

  if (LOGLEVEL >= 4)
    {
    snprintf(log_buf, sizeof(log_buf), "JOB_SUBSTATE_STAGEDEL: %s", job_id);
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
    }

  if (type != WORK_Deferred_Reply)
    {
    /* first time in */

    /* Build the list of files which were staged-in so they can be deleted. */

    preq = cpy_stage(preq, pjob, JOB_ATR_stagein, 0);

    if (preq != NULL)
      {
      /* have files to delete */

      /* change the request type from copy to delete */

      preq->rq_type = PBS_BATCH_DelFiles;
      preq->rq_extra = strdup(job_id);

      if ((handle = mom_comm(pjob, on_job_exit_task)) < 0)
        {
        rc = PBSE_CONNECT;
        goto handle_stagedel_cleanup;
        }
      else
        {
        job_mutex.unlock();

        if (issue_Drequest(handle, preq) != PBSE_NONE)
          {
          if (LOGLEVEL >= 2)
            {
            log_event(
              PBSEVENT_JOB,
              PBS_EVENTCLASS_JOB,
              job_id,
              "cannot issue file delete request for staged files");
            }

          IsFaked = 1;

          /* set up as if mom returned error since the issue_Drequest failed */

          preq->rq_reply.brp_code = PBSE_MOMREJECT;
          preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL;
          }
        }
      }
    else
      job_mutex.unlock();
    }
  else
    job_mutex.unlock();

  /* place if here so that jobs without staged files just fall through */

  if (preq != NULL)
    {
    /* After MOM replied (maybe faked) to Delete Files request */

    if (preq->rq_reply.brp_code != 0)
      {
      /* an error occurred */

      snprintf(log_buf, sizeof(log_buf), msg_obitnodel,
        job_id,
        (job_momname != NULL) ? job_momname : "(no host)");

      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, job_id, log_buf);

      if (LOGLEVEL >= 3)
        {
        snprintf(log_buf, sizeof(log_buf),
          "request to remove stage-in files failed on node '%s' for job %s%s",
          (job_momname != NULL) ? job_momname : "(no host)",
          job_id,
          (IsFaked == 1) ? "*" : "");

        log_event(
          PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
          PBS_EVENTCLASS_JOB,
          job_id,
          log_buf);
        }

      if (preq->rq_reply.brp_choice == BATCH_REPLY_CHOICE_Text)
        {
        safe_strncat(log_buf,
          preq->rq_reply.brp_un.brp_txt.brp_str,
          sizeof(log_buf) - strlen(log_buf) - 1);
        }

      if ((pjob = svr_find_job(job_id, TRUE)) == NULL)
        {
        rc = PBSE_JOBNOTFOUND;
        goto handle_stagedel_cleanup;
        }

      svr_mailowner(pjob, MAIL_OTHER, MAIL_FORCE, log_buf);

      unlock_ji_mutex(pjob, __func__, "5", LOGLEVEL);
      }

    free_br(preq);
    }

  if ((pjob = svr_find_job(job_id, TRUE)) != NULL)
    {
    job_mutex.mark_as_locked();
    svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_EXITED, FALSE);
    }

handle_stagedel_cleanup:

  if (job_momname != NULL)
    free(job_momname);

  return(rc);
  }  /* END handle_stagedel() */

int handle_exited(

  job *pjob)

  {
  batch_request *preq;
  pbs_queue     *pque;
  char           log_buf[LOCAL_LOG_BUF_SIZE + 1];
  int            rc;
  int            handle = -1;
  char           job_id[PBS_MAXSVRJOBID + 1];

  mutex_mgr job_mutex(pjob->ji_mutex, true);

  strcpy(job_id, pjob->ji_qs.ji_jobid);

  if (LOGLEVEL >= 4)
    {
    snprintf(log_buf, sizeof(log_buf), "%s; JOB_SUBSTATE_EXITED", job_id);
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
    }

  /* tell mom to delete the job, send final track and purge it */

  preq = alloc_br(PBS_BATCH_DeleteJob);

  if (preq != NULL)
    {
    strcpy(preq->rq_ind.rq_delete.rq_objname, job_id);

    if ((handle = mom_comm(pjob, on_job_exit_task)) < 0)
      return(PBSE_CONNECT);
    else
      {
      job_mutex.unlock();

      if ((rc = issue_Drequest(handle, preq)) != PBSE_NONE)
        {
        snprintf(log_buf, sizeof(log_buf),
          "DeleteJob issue_Drequest failure, rc = %d", rc);

        log_event(
          PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
          PBS_EVENTCLASS_JOB,
          job_id,
          log_buf);
        }

      free_br(preq);
      }
    }
  else
    job_mutex.unlock();

  preq = NULL;

  if ((pjob = svr_find_job(job_id, TRUE)) == NULL)
    return(PBSE_JOBNOTFOUND);
  else
    job_mutex.mark_as_locked();

  rel_resc(pjob); /* free any resc assigned to the job */

  if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0)
    issue_track(pjob);

  /* see if restarted job failed */

  if (pjob->ji_wattr[JOB_ATR_checkpoint_restart_status].at_flags & ATR_VFLAG_SET)
    {
    if (check_if_checkpoint_restart_failed(pjob) == TRUE)
      {
      return(-1);
      }
    }

  svr_setjobstate(pjob, JOB_STATE_COMPLETE, JOB_SUBSTATE_COMPLETE, FALSE);
  pque = get_jobs_queue(&pjob);

  if (pque != NULL)
    {
    unlock_queue(pque, __func__, NULL, LOGLEVEL);
    }
  else if (pjob == NULL)
    {
    log_err(PBSE_JOBNOTFOUND, __func__, "Job lost while acquiring queue 3");
    return(PBSE_JOBNOTFOUND);
    }

  return(PBSE_NONE);
  }  /* END handle_exited() */

int handle_complete_subjob(

  job *pjob)

  {
  job *parent_job = pjob->ji_parent_job;
  job *other_subjob;
  int  rc = PBSE_NONE;
  int  complete_parent = FALSE;

  unlock_ji_mutex(pjob, __func__, NULL, LOGLEVEL);
  lock_ji_mutex(parent_job, __func__, NULL, LOGLEVEL);

  if (parent_job->ji_being_recycled == FALSE)
    {
    if (parent_job->ji_cray_clone == pjob)
      other_subjob = parent_job->ji_external_clone;
    else
      other_subjob = parent_job->ji_cray_clone;

    lock_ji_mutex(other_subjob, __func__, NULL, LOGLEVEL);

    if ((other_subjob->ji_being_recycled == TRUE) ||
        (other_subjob->ji_qs.ji_state == JOB_STATE_COMPLETE))
      complete_parent = TRUE;

    unlock_ji_mutex(other_subjob, __func__, NULL, LOGLEVEL);

    if (complete_parent == TRUE)
      {
      if (parent_job->ji_qs.ji_state == JOB_STATE_COMPLETE)
        {
        /* ready to finish - delete the parent job */
        svr_job_purge(parent_job);
        }
      else
        {
        svr_setjobstate(parent_job, JOB_STATE_COMPLETE, JOB_SUBSTATE_COMPLETE, FALSE);

        parent_job->ji_wattr[JOB_ATR_comp_time].at_val.at_long = (long)time(NULL);
        parent_job->ji_wattr[JOB_ATR_comp_time].at_flags |= ATR_VFLAG_SET;

        rel_resc(parent_job);

        handle_complete_first_time(parent_job);
        }
      }
    }

  unlock_ji_mutex(parent_job, __func__, NULL, LOGLEVEL);

  return(rc);
  }  /* END handle_complete_subjob() */

int handle_complete_first_time(

  job *pjob)

  {
  int        rc = PBSE_NONE;
  pbs_queue *pque;
  int        KeepSeconds = 0;
  time_t     time_now = time(NULL);
  char       log_buf[LOCAL_LOG_BUF_SIZE + 1];
  long       must_report = FALSE;
  int        job_complete = 0;

  if (LOGLEVEL >= 10)
    LOG_EVENT(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, pjob->ji_qs.ji_jobid);

  /* first time in */

  if (LOGLEVEL >= 4)
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid,
      "JOB_SUBSTATE_COMPLETE");

  remove_job_from_exiting_list(&pjob);

  if (pjob == NULL)
    {
    /* let the caller know the job is gone */
    log_err(PBSE_JOBNOTFOUND, __func__, "Job lost while removing job from exiting list.");
    return(PBSE_JOBNOTFOUND);
    }

  pque = get_jobs_queue(&pjob);

  if (pque != NULL)
    {
    mutex_mgr pque_mutex = mutex_mgr(pque->qu_mutex, true);

    pthread_mutex_lock(server.sv_attr_mutex);

    KeepSeconds = attr_ifelse_long(
                    &pque->qu_attr[QE_ATR_KeepCompleted],
                    &server.sv_attr[SRV_ATR_KeepCompleted],
                    0);

    pthread_mutex_unlock(server.sv_attr_mutex);
    }
  else if (pjob == NULL)
    {
    /* let the caller know the job is gone */
    log_err(PBSE_JOBNOTFOUND, __func__, "Job lost while acquiring queue 4");
    return(PBSE_JOBNOTFOUND);
    }

  if ((get_svr_attr_l(SRV_ATR_JobMustReport, &must_report) == PBSE_NONE) &&
      (must_report > 0))
    {
    pjob->ji_wattr[JOB_ATR_reported].at_val.at_long = 0;
    pjob->ji_wattr[JOB_ATR_reported].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY;

    job_save(pjob, SAVEJOB_FULL, 0);

    /* If job must report is set and keep_completed is 0, default to
     * JOBMUSTREPORTDEFAULTKEEP seconds */

    if (KeepSeconds <= 0)
      KeepSeconds = JOBMUSTREPORTDEFAULTKEEP;
    }

  if (KeepSeconds <= 0)
    {
    rc = svr_job_purge(pjob);
    return(rc);
    }

  job_complete = (pjob->ji_qs.ji_substate == JOB_SUBSTATE_COMPLETE) ? 1 : 0;

  if ((job_complete == 1) &&
      (pjob->ji_wattr[JOB_ATR_comp_time].at_flags & ATR_VFLAG_SET))
    {
    /*
     * server restart - if we already have a completion_time then we
     * better be restarting.
     * use the comp_time to determine task invocation time
     */

    if (LOGLEVEL >= 7)
      {
      sprintf(log_buf, "calling on_job_exit from %s: rc = -1", __func__);
      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf);
      }

    set_task(
      WORK_Timed,
      pjob->ji_wattr[JOB_ATR_comp_time].at_val.at_long + KeepSeconds,
      handle_complete_second_time,
      strdup(pjob->ji_qs.ji_jobid),
      FALSE);
    }
  else
    {
    struct timeval  tv;
    struct timeval *tv_attr;
    struct timeval  result;
    struct timezone tz;

    pjob->ji_wattr[JOB_ATR_comp_time].at_val.at_long = (long)time(NULL);
    pjob->ji_wattr[JOB_ATR_comp_time].at_flags |= ATR_VFLAG_SET;

    if (LOGLEVEL >= 7)
      {
      sprintf(log_buf, "calling on_job_exit from %s", __func__);
      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf);
      }

    set_task(
      WORK_Timed,
      time_now + KeepSeconds,
      handle_complete_second_time,
      strdup(pjob->ji_qs.ji_jobid),
      FALSE);

    if (gettimeofday(&tv, &tz) == 0)
      {
      tv_attr = &pjob->ji_wattr[JOB_ATR_total_runtime].at_val.at_timeval;

      timeval_subtract(&result, &tv, tv_attr);

      pjob->ji_wattr[JOB_ATR_total_runtime].at_val.at_timeval.tv_sec  = result.tv_sec;
      pjob->ji_wattr[JOB_ATR_total_runtime].at_val.at_timeval.tv_usec = result.tv_usec;
      pjob->ji_wattr[JOB_ATR_total_runtime].at_flags |= ATR_VFLAG_SET;
      }
    else
      {
      pjob->ji_wattr[JOB_ATR_total_runtime].at_val.at_timeval.tv_sec  = 0;
      pjob->ji_wattr[JOB_ATR_total_runtime].at_val.at_timeval.tv_usec = 0;
      }

    job_save(pjob, SAVEJOB_FULL, 0);
    }

  unlock_ji_mutex(pjob, __func__, "2", LOGLEVEL);

  return(FALSE);
  }  /* END handle_complete_first_time() */

void handle_complete_second_time(

  struct work_task *ptask)

  {
  char    log_buf[LOCAL_LOG_BUF_SIZE + 1];
  time_t  time_now = time(NULL);
  char   *job_id = (char *)ptask->wt_parm1;
  job    *pjob;

  free(ptask->wt_mutex);
  free(ptask);

  if (job_id == NULL)
    return;

  pjob = svr_find_job(job_id, TRUE);

  free(job_id);

  if (pjob == NULL)
    return;

  mutex_mgr job_mutex(pjob->ji_mutex, true);

  if (LOGLEVEL >= 10)
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, pjob->ji_qs.ji_jobid);

  if (((pjob->ji_wattr[JOB_ATR_reported].at_flags & ATR_VFLAG_SET) != 0) &&
      (pjob->ji_wattr[JOB_ATR_reported].at_val.at_long == 0))
    {
    /* the job must report pbs_attribute but hasn't:
     * skip the job if it has not yet reported to the scheduler. */

    if (LOGLEVEL >= 7)
      {
      sprintf(log_buf, "Bypassing job %s waiting for purge completed command",
        pjob->ji_qs.ji_jobid);

      log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf);
      }

    if (LOGLEVEL >= 7)
      {
      sprintf(log_buf, "calling on_job_exit from %s", __func__);
      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf);
      }

    set_task(
      WORK_Timed,
      time_now + JOBMUSTREPORTDEFAULTKEEP,
      handle_complete_second_time,
      strdup(pjob->ji_qs.ji_jobid),
      FALSE);
    }
  else
    {
    /* under rare circumstances, a job can have a clean up task but have been
     * re-run by a scheduler.  Ensure the job is ready to get deleted before
     * purging */

    if (pjob->ji_qs.ji_state != JOB_STATE_COMPLETE)
      {
      snprintf(log_buf, sizeof(log_buf),
        "Job %s has removal task but is in non-completed state %s",
        pjob->ji_qs.ji_jobid,
        PJobState[pjob->ji_qs.ji_state]);

      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
      }
    else
      {
      svr_job_purge(pjob);
      job_mutex.set_lock_on_exit(false);
      }
    }

  return;
  }  /* END handle_complete_second_time() */
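/*
 * Illustrative sketch (not part of the original source): with a hypothetical
 * keep_completed value of 300 seconds, handle_complete_first_time() leaves
 * the job visible in the COMPLETE state and schedules the purge for
 * completion time + 300, exactly as the set_task() calls above do.
 */
#if 0
set_task(WORK_Timed,
         time(NULL) + 300,                /* KeepSeconds = 300 */
         handle_complete_second_time,
         strdup(pjob->ji_qs.ji_jobid),
         FALSE);
#endif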
/*
 * on_job_exit - continue post-execution processing of a job that terminated.
 *
 * This function is called by pbsd_init() on recovery, by req_jobobit()
 * on job termination, and by itself (via a work task).  The clue to where
 * we are is the job substate and the type of the work task entry it is
 * called with.  If the work task entry type is WORK_Immed, then this is
 * the first time in for the job substate.  Otherwise it is with the reply
 * given by MOM.
 *
 * NOTE:
 * On the initial work task (WORK_Immed), the wt_parm1 is a job pointer.
 * On a call-back work task (WORK_Deferred_Reply) generated by
 * send_request(), the wt_parm1 is pointing to the request; and the
 * rq_extra field in the request points to the job.
 */

void on_job_exit(

  batch_request *preq,   /* I */
  char          *job_id) /* I */

  {
  int   rc = PBSE_NONE;
  job  *pjob;
  int   type = WORK_Deferred_Reply;
  char  log_buf[LOCAL_LOG_BUF_SIZE];

  if (preq == NULL)
    type = WORK_Immed;
  else
    {
    job_id = (char *)preq->rq_extra;
    preq->rq_extra = NULL;
    }

  /* check for calloc errors */
  if (job_id == NULL)
    {
    log_err(ENOMEM, __func__, "Cannot allocate memory!");
    return;
    }

  /* make sure the job is actually still there */
  pjob = svr_find_job(job_id, TRUE);

  /* if the job doesn't exist, just exit */
  if (pjob == NULL)
    {
    sprintf(log_buf, "%s called with INVALID jobid: %s", __func__, job_id);
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, "NULL", log_buf);

    free(job_id);

    return;
    }

  mutex_mgr job_mutex(pjob->ji_mutex, true);
  job_mutex.set_lock_on_exit(false);

  sprintf(log_buf, "%s valid pjob: %s (substate=%d)",
    __func__, job_id, pjob->ji_qs.ji_substate);

  log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, job_id, log_buf);

  /* NOTE: pjob is unlocked in all error cases */
  /* MOM has killed everything it can kill, so we can stop the nanny */

  switch (pjob->ji_qs.ji_substate)
    {
    case JOB_SUBSTATE_EXITING:
    case JOB_SUBSTATE_ABORT:

      rc = handle_exiting_or_abort_substate(pjob);
      pjob = NULL;

      /* NO BREAK, fall into stage out processing */

    case JOB_SUBSTATE_RETURNSTD:

      /* this is a new substate to TORQUE 2.4.0.  The purpose is to provide a
       * stage of the job exiting process that allows us to transfer stderr and
       * stdout files from the mom's spool directory back to the server spool
       * directory.  This will only be done if the job has been checkpointed,
       * and keep_completed is a positive value.  This is so that a completed
       * job can be restarted from a checkpoint file.
       */

      if ((pjob == NULL) && ((pjob = svr_find_job(job_id, TRUE)) == NULL))
        break;

      if ((rc = handle_returnstd(pjob, preq, type)) != PBSE_NONE)
        break;

      preq = NULL;
      pjob = NULL;

    case JOB_SUBSTATE_STAGEOUT:

      if ((pjob == NULL) && ((pjob = svr_find_job(job_id, TRUE)) == NULL))
        break;

      if ((rc = handle_stageout(pjob, type, preq)) != PBSE_NONE)
        {
        snprintf(log_buf, sizeof(log_buf), "handle_stageout failed: %d", rc);
        log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
        break;
        }

      preq = NULL;
      pjob = NULL;

    case JOB_SUBSTATE_STAGEDEL:

      if ((pjob == NULL) && ((pjob = svr_find_job(job_id, TRUE)) == NULL))
        {
        snprintf(log_buf, sizeof(log_buf), "could not find job: %s", job_id);
        log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
        break;
        }

      if ((rc = handle_stagedel(pjob, type, preq)) != PBSE_NONE)
        {
        snprintf(log_buf, sizeof(log_buf), "handle_stagedel failed: %d", rc);
        log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
        break;
        }

      preq = NULL;
      pjob = NULL;

    case JOB_SUBSTATE_EXITED:

      if ((pjob == NULL) && ((pjob = svr_find_job(job_id, TRUE)) == NULL))
        {
        snprintf(log_buf, sizeof(log_buf), "could not find job: %s", job_id);
        log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
        break;
        }

      rc = handle_exited(pjob);

      if ((rc == PBSE_JOBNOTFOUND) || (rc == PBSE_CONNECT))
        {
        snprintf(log_buf, sizeof(log_buf),
          "handle_exited failed: %s, rc = %d", job_id, rc);
        log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
        break;
        }

      type = rc;
      pjob = NULL;

    case JOB_SUBSTATE_COMPLETE:

      if ((pjob == NULL) && ((pjob = svr_find_job(job_id, TRUE)) == NULL))
        {
        snprintf(log_buf, sizeof(log_buf), "could not find job: %s", job_id);
        log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
        break;
        }

      if (pjob->ji_parent_job != NULL)
        {
        handle_complete_subjob(pjob);
        }
      else if (type == WORK_Immed) /* WORK_Immed == PBSE_NONE.... */
        handle_complete_first_time(pjob);
      else
        {
        struct work_task *ptask =
          (struct work_task *)calloc(1, sizeof(struct work_task));

        if (ptask == NULL)
          return;

        if ((ptask->wt_mutex =
              (pthread_mutex_t *)calloc(1, sizeof(pthread_mutex_t))) == NULL)
          {
          free(ptask);
          return;
          }

        if ((ptask->wt_parm1 = strdup(pjob->ji_qs.ji_jobid)) == NULL)
          {
          free(ptask->wt_mutex);
          free(ptask);
          return;
          }

        handle_complete_second_time(ptask);
        }

      break;

    default:

      job_mutex.unlock();

      break;
    } /* END switch (pjob->ji_qs.ji_substate) */

  if (job_id != NULL)
    free(job_id);

  return;
  }  /* END on_job_exit() */

void *on_job_exit_task(

  struct work_task *vp)

  {
  char              log_buf[LOCAL_LOG_BUF_SIZE];
  struct work_task *ptask = (struct work_task *)vp;
  char             *jobid = (char *)ptask->wt_parm1;

  free(ptask->wt_mutex);
  free(ptask);

  if (LOGLEVEL >= 10)
    {
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, "We scheduled a task");
    }

  if (jobid != NULL)
    {
    if (LOGLEVEL >= 10)
      {
      snprintf(log_buf, sizeof(log_buf), "%s", jobid);
      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
      }

    on_job_exit(NULL, jobid);
    }

  return(NULL);
  }  /* END on_job_exit_task() */

void *on_job_rerun_task(

  struct work_task *vp)

  {
  struct work_task *ptask = (struct work_task *)vp;
  char             *jobid = (char *)ptask->wt_parm1;

  free(ptask->wt_mutex);
  free(ptask);

  if (jobid != NULL)
    {
    on_job_rerun(NULL, jobid);
    }

  return(NULL);
  }  /* END on_job_rerun_task() */
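/*
 * Illustrative sketch (not part of the original source): the two ways
 * on_job_exit() is entered, per the NOTE above.  The jobid string and the
 * preq_from_mom variable are hypothetical; a real caller gets them from the
 * work task or MOM's reply.
 */
#if 0
/* first pass - no MOM reply yet, so type becomes WORK_Immed */
on_job_exit(NULL, strdup("123.server"));

/* reply pass - MOM's reply arrives in a batch_request whose rq_extra
 * carries the jobid stashed before issue_Drequest(); type becomes
 * WORK_Deferred_Reply */
on_job_exit(preq_from_mom, NULL);
#endif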
/*
 * on_job_rerun - Handle the clean up of jobs being rerun.  This gets
 * messy if the job is being executed on another host.  Then the
 * "standard" files must be copied to the server for safe keeping.
 *
 * The basic flow is very much like that of on_job_exit().
 * The substate will already be set to JOB_SUBSTATE_RERUN and the
 * JOB_SVFLG_HASRUN bit set in ji_svrflags.
 */

void on_job_rerun(

  batch_request *preq,
  char          *job_id)

  {
  int   handle;
  int   newstate;
  int   newsubst;
  int   rc = 0;
  job  *pjob;
  int   reply_type = TRUE;
  int   IsFaked;
  char  log_buf[LOCAL_LOG_BUF_SIZE + 1];

  if (preq != NULL)
    {
    job_id = (char *)preq->rq_extra;
    preq->rq_extra = NULL;
    }
  else
    reply_type = FALSE;

  /* check for memory allocation */
  if (job_id == NULL)
    {
    log_err(ENOMEM, __func__, "Cannot allocate memory");

    if (preq != NULL)
      free_br(preq);

    return;
    }

  pjob = svr_find_job(job_id, TRUE);

  free(job_id);

  /* the job has already exited */
  if (pjob == NULL)
    {
    if (preq != NULL)
      free_br(preq);

    return;
    }

  mutex_mgr job_mutex(pjob->ji_mutex, true);

  if ((handle = mom_comm(pjob, on_job_rerun_task)) < 0)
    {
    if (preq != NULL)
      free_br(preq);

    return;
    }

  switch (pjob->ji_qs.ji_substate)
    {
    case JOB_SUBSTATE_RERUN:

      IsFaked = 0;

      if (pjob->ji_qs.ji_un.ji_exect.ji_momaddr == pbs_server_addr)
        {
        /* files don't need to be moved, go to next step */

        svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_RERUN1, FALSE);

        reply_type = FALSE;
        }
      else
        {
        if (reply_type == FALSE)
          {
          /* here is where we have to save the files */
          /* ask mom to send them back to the server */
          /* mom deletes her copy if returned ok */

          if ((preq = alloc_br(PBS_BATCH_Rerun)) == NULL)
            return;

          strcpy(preq->rq_ind.rq_rerun, pjob->ji_qs.ji_jobid);

          preq->rq_extra = strdup(pjob->ji_qs.ji_jobid);

          job_id = strdup(pjob->ji_qs.ji_jobid);

          job_mutex.unlock();

          if (issue_Drequest(handle, preq) != PBSE_NONE)
            {
            /* cannot issue request to mom, set up as if mom returned error */

            IsFaked = 1;

            preq->rq_reply.brp_code = 1;

            /* we will "fall" into the post reply side */
            }

          pjob = svr_find_job(job_id, TRUE);

          if (pjob == NULL)
            {
            snprintf(log_buf, sizeof(log_buf),
              "Job %s removed during call to issue_Drequest", job_id);
            log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);

            free(job_id);

            return;
            }

          job_mutex.mark_as_locked();

          free(job_id);
          }

        /* We get here if MOM replied (may be faked above) */
        /* to the rerun (return files) request issued above */

        if (preq->rq_reply.brp_code != PBSE_NONE)
          {
          /* error */

          if (LOGLEVEL >= 3)
            {
            snprintf(log_buf, sizeof(log_buf),
              "request to save output files failed on node '%s' for job %s%s",
              pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str,
              pjob->ji_qs.ji_jobid,
              (IsFaked == 1) ? "*" : "");

            log_event(
              PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
              PBS_EVENTCLASS_JOB,
              pjob->ji_qs.ji_jobid,
              log_buf);
            }

          /* for now, just log it */

          snprintf(log_buf, sizeof(log_buf), msg_obitnocpy,
            pjob->ji_qs.ji_jobid,
            pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str);

          log_event(
            PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
            PBS_EVENTCLASS_JOB,
            __func__,
            log_buf);
          }

        svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_RERUN1, FALSE);

        reply_type = FALSE;

        free_br(preq);

        preq = NULL;
        }

      /* NO BREAK, FALL THROUGH TO NEXT CASE, including the request */

    case JOB_SUBSTATE_RERUN1:

      IsFaked = 0;

      if (reply_type == FALSE)
        {
        /* this is the very first call, have mom copy files */
        /* are there any stage-out files to process? */
        preq = cpy_stage(preq, pjob, JOB_ATR_stageout, STAGE_DIR_OUT);

        if (preq != NULL)
          {
          /* have files to copy */

          preq->rq_extra = strdup(pjob->ji_qs.ji_jobid);

          if (issue_Drequest(handle, preq) != PBSE_NONE)
            {
            /* set up as if mom returned error */

            IsFaked = 1;

            preq->rq_reply.brp_code = PBSE_MOMREJECT;
            preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL;
            preq->rq_reply.brp_un.brp_txt.brp_txtlen = 0;

            /* we will "fall" into the post reply side */
            }

          /* here we have a reply (maybe faked) from MOM about the copy */

          if (preq->rq_reply.brp_code != 0)
            {
            /* error from MOM */

            if (LOGLEVEL >= 3)
              {
              snprintf(log_buf, sizeof(log_buf),
                "request to save stageout files failed on node '%s' for job %s%s",
                pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str,
                pjob->ji_qs.ji_jobid,
                (IsFaked == TRUE) ? "*" : "");
"*" : ""); log_event( PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf); } /* for now, just log it */ snprintf(log_buf, sizeof(log_buf), msg_obitnocpy, pjob->ji_qs.ji_jobid, pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str); log_event( PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf); } free_br(preq); preq = NULL; svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_RERUN3, FALSE); reply_type = FALSE; } else { svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_RERUN3, FALSE); } } /* NO BREAK, FALL THROUGH TO NEXT CASE */ case JOB_SUBSTATE_RERUN3: /* need to have MOM delete her copy of the job */ preq = alloc_br(PBS_BATCH_DeleteJob); if (preq != NULL) { strcpy(preq->rq_ind.rq_delete.rq_objname, pjob->ji_qs.ji_jobid); job_id = strdup(pjob->ji_qs.ji_jobid); job_mutex.unlock(); rc = issue_Drequest(handle, preq); free_br(preq); if (rc != 0) { snprintf(log_buf, sizeof(log_buf), "DeleteJob issue_Drequest failure, rc = %d", rc); log_event( PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf); } if ((pjob = svr_find_job(job_id, TRUE)) == NULL) { snprintf(log_buf, sizeof(log_buf), "Job %s removed during call to issue_Drequest", job_id); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf); free(job_id); return; } else job_mutex.mark_as_locked(); free(job_id); } rel_resc(pjob); /* free resc assigned to job */ /* Now re-queue the job */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HOTSTART) == 0) { /* in case of server shutdown, don't clear exec_host */ /* or session_id will use it on hotstart when next comes up */ job_attr_def[JOB_ATR_exec_host].at_free( &pjob->ji_wattr[JOB_ATR_exec_host]); job_attr_def[JOB_ATR_session_id].at_free( &pjob->ji_wattr[JOB_ATR_session_id]); job_attr_def[JOB_ATR_exec_gpus].at_free( &pjob->ji_wattr[JOB_ATR_exec_gpus]); } pjob->ji_modified = 1; /* force full job save */ pjob->ji_momhandle = -1; pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_StagedIn; svr_evaljobstate(*pjob, newstate, newsubst, 0); svr_setjobstate(pjob, newstate, newsubst, FALSE); break; } /* END switch (pjob->ji_qs.ji_substate) */ return; } /* END on_job_rerun() */ /* * wait_for_send - recall req_jobobit after delay when race (condition) * goes to an Obit from MOM rather than to the SIGCHLD of the send_job() * child that sent the job to MOM. */ void wait_for_send( struct work_task *ptask) { batch_request *preq = (batch_request *)get_remove_batch_request((char *)ptask->wt_parm1); if (preq != NULL) req_jobobit(preq); return; } int setrerun( job *pjob, const char *text) { if (pjob->ji_wattr[JOB_ATR_rerunable].at_val.at_long) { /* job is rerunnable */ pjob->ji_qs.ji_substate = JOB_SUBSTATE_RERUN; /* SUCCESS */ return(PBSE_NONE); } /* FAILURE */ svr_mailowner_with_message(pjob, MAIL_ABORT, MAIL_FORCE, msg_init_abt,text); return(1); } /* END setrerun() */ /** * Gets the latest stored used resource information (cput, mem, walltime, etc.) * about the given job. 
/**
 * Gets the latest stored used resource information (cput, mem, walltime, etc.)
 * about the given job.
 */

int get_used(

  svrattrl *patlist, /* I */
  char     *acctbuf) /* O */

  {
  int retval = FALSE;
  int amt;
  int need;

  amt = RESC_USED_BUF - strlen(acctbuf);

  while (patlist != NULL)
    {
    if (strcmp(patlist->al_name, ATTR_session) == 0)
      {
      patlist = (svrattrl *)GET_NEXT(patlist->al_link);

      continue;
      }

    need = strlen(patlist->al_name) + strlen(patlist->al_value) + 3;

    if (patlist->al_resc)
      {
      need += strlen(patlist->al_resc) + 3;
      }

    if (need < amt)
      {
      strcat(acctbuf, "\n");
      strcat(acctbuf, patlist->al_name);

      if (patlist->al_resc)
        {
        strcat(acctbuf, ".");
        strcat(acctbuf, patlist->al_resc);
        }

      strcat(acctbuf, "=");
      strcat(acctbuf, patlist->al_value);

      amt -= need;
      }

    retval = TRUE;

    patlist = (svrattrl *)GET_NEXT(patlist->al_link);
    }

  return(retval);
  }  /* END get_used() */

/**
 * Encodes the used resource information (cput, mem, walltime, etc.)
 * about the given job.
 */

#ifdef USESAVEDRESOURCES
void encode_job_used(

  job        *pjob,  /* I */
  tlist_head *phead) /* O */

  {
  pbs_attribute *at;
  attribute_def *ad;
  resource      *rs;

  at = &pjob->ji_wattr[JOB_ATR_resc_used];
  ad = &job_attr_def[JOB_ATR_resc_used];

  if ((at->at_flags & ATR_VFLAG_SET) == 0)
    {
    return;
    }

  for (rs = (resource *)GET_NEXT(at->at_val.at_list);
       rs != NULL;
       rs = (resource *)GET_NEXT(rs->rs_link))
    {
    resource_def *rd = rs->rs_defin;
    pbs_attribute val;
    int           rc;

    val = rs->rs_value; /* copy resource pbs_attribute */

    rc = rd->rs_encode(
           &val,
           phead,
           ad->at_name,
           rd->rs_name,
           ATR_ENCODE_CLIENT,
           ATR_DFLAG_ACCESS);

    if (rc < 0)
      break;
    } /* END for (rs) */

  return;
  }  /* END encode_job_used() */
#endif /* USESAVEDRESOURCES */
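/*
 * Illustrative sketch (not part of the original source): the comment
 * replacement performed by add_comment_to_parent() below - the old comment
 * string is freed and a freshly strdup'd one installed.  The values are
 * hypothetical.
 */
#if 0
/* the cray sub-job exited with status 2 */
add_comment_to_parent(parent_job, TRUE, 2);
/* parent comment becomes:
 *   "Job terminated because the cray sub-job exited with code '2'" */
#endif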
/*
 * add_comment_to_parent - records a comment on the parent job noting which
 * sub-job exited with a non-zero exit status. It is only called when one
 * sub-job exited with an error; the second argument says whether it was the
 * cray sub-job or the external sub-job.
 *
 * @param parent_job - the job that needs the comment added
 * @param cray_subjob_exited_nonzero - TRUE if the cray sub-job exited with
 * an error, FALSE if the external sub-job exited with an error
 * @param exit_status - the exit status in question
 * @return PBSE_NONE on success or ENOMEM if memory can't be allocated
 */

int add_comment_to_parent(

  job *parent_job,
  int  cray_subjob_exited_nonzero,
  int  exit_status)

  {
  char  comment_buf[MAXLINE*2];
  char *comment;

  if (cray_subjob_exited_nonzero == TRUE)
    snprintf(comment_buf, sizeof(comment_buf),
      "Job terminated because the cray sub-job exited with code '%d'",
      exit_status);
  else
    snprintf(comment_buf, sizeof(comment_buf),
      "Job terminated because the external sub-job exited with code '%d'",
      exit_status);

  if ((comment = strdup(comment_buf)) == NULL)
    return(ENOMEM);

  if (parent_job->ji_wattr[JOB_ATR_Comment].at_val.at_str != NULL)
    free(parent_job->ji_wattr[JOB_ATR_Comment].at_val.at_str);

  parent_job->ji_wattr[JOB_ATR_Comment].at_val.at_str = comment;
  parent_job->ji_wattr[JOB_ATR_Comment].at_flags |= ATR_VFLAG_SET;

  return(PBSE_NONE);
  }  /* END add_comment_to_parent() */

int handle_subjob_exit_status(

  job *pjob)

  {
  job            *parent_job;
  job            *other_subjob;
  int             exit_status = pjob->ji_qs.ji_un.ji_exect.ji_exitstat;
  int             rc = PBSE_NONE;
  char            jobid[PBS_MAXSVRJOBID + 1];
  char            other_jobid[PBS_MAXSVRJOBID + 1];
  char            log_buf[LOCAL_LOG_BUF_SIZE];
  struct pbsnode *pnode = NULL;
  int             cray_exited_nonzero = FALSE;

  strcpy(jobid, pjob->ji_qs.ji_jobid);

  parent_job = pjob->ji_parent_job;
  unlock_ji_mutex(pjob, __func__, NULL, LOGLEVEL);

  lock_ji_mutex(parent_job, __func__, NULL, LOGLEVEL);

  if (parent_job->ji_qs.ji_un.ji_exect.ji_exitstat == 0)
    {
    parent_job->ji_qs.ji_un.ji_exect.ji_exitstat = exit_status;

    if (exit_status != 0)
      {
      if (parent_job->ji_cray_clone != pjob)
        {
        other_subjob = parent_job->ji_cray_clone;
        }
      else
        {
        cray_exited_nonzero = TRUE;
        other_subjob = parent_job->ji_external_clone;
        }

      add_comment_to_parent(parent_job, cray_exited_nonzero, exit_status);

      unlock_ji_mutex(parent_job, __func__, NULL, LOGLEVEL);
      lock_ji_mutex(other_subjob, __func__, NULL, LOGLEVEL);

      if (other_subjob->ji_qs.ji_state <= JOB_STATE_RUNNING)
        {
        strcpy(other_jobid, other_subjob->ji_qs.ji_jobid);
        pnode = find_nodebyname(other_subjob->ji_qs.ji_destin);
        }

      unlock_ji_mutex(other_subjob, __func__, NULL, LOGLEVEL);

      if (pnode != NULL)
        {
        snprintf(log_buf, sizeof(log_buf),
          "Sub-job %s exited with a non-zero exit status, canceling job %s",
          jobid, other_jobid);

        log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, other_jobid, log_buf);

        kill_job_on_mom(other_jobid, pnode);

        unlock_node(pnode, __func__, NULL, LOGLEVEL);
        }
      }
    else
      unlock_ji_mutex(parent_job, __func__, NULL, LOGLEVEL);
    }
  else
    unlock_ji_mutex(parent_job, __func__, NULL, LOGLEVEL);

  return(rc);
  }  /* END handle_subjob_exit_status() */
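/*
 * Illustrative sketch (not part of the original source): models the policy
 * handle_subjob_exit_status() above applies to heterogeneous jobs -- the
 * parent keeps the first reported exit status, and a non-zero status from
 * one sub-job triggers cancellation of its sibling. The struct and
 * functions are hypothetical stand-ins, without the locking the real code
 * needs. Compile standalone with -DJOBOBIT_EXAMPLE_SUBJOB.
 */
#ifdef JOBOBIT_EXAMPLE_SUBJOB
#include <stdio.h>

struct toy_parent
  {
  int exitstat; /* 0 until a sub-job reports */
  };

static void toy_subjob_exited(struct toy_parent *p, int status, const char *which)
  {
  if (p->exitstat != 0)
    return; /* a status was already recorded; later reports are ignored */

  p->exitstat = status;

  if (status != 0)
    printf("%s sub-job failed (%d): cancel the sibling sub-job\n", which, status);
  }

int main(void)
  {
  struct toy_parent p = { 0 };

  toy_subjob_exited(&p, 271, "cray");     /* first non-zero status wins */
  toy_subjob_exited(&p, 0,   "external"); /* ignored: already recorded  */

  printf("parent exit status: %d\n", p.exitstat);

  return 0;
  }
#endif /* JOBOBIT_EXAMPLE_SUBJOB */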
int rerun_job(

  job  *pjob,
  int   newstate,
  int   newsubst,
  char *acctbuf)

  {
  int rc = PBSE_NONE;

#ifdef RERUNUSAGE
  char *pc;
#endif

  /* Rerunning the job: if not checkpointed, clear resources_used and requeue it */

  if ((pjob->ji_qs.ji_svrflags &
       (JOB_SVFLG_CHECKPOINT_FILE | JOB_SVFLG_CHECKPOINT_MIGRATEABLE)) == 0)
    {
    job_attr_def[JOB_ATR_resc_used].at_free(&pjob->ji_wattr[JOB_ATR_resc_used]);
    }
  else if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHECKPOINT_FILE)
    {
    /* non-migratable checkpoint (cray): leave the files where they are */
    /* and just requeue the job                                         */

    rel_resc(pjob);

    pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HASRUN;

    svr_evaljobstate(*pjob, newstate, newsubst, 1);
    svr_setjobstate(pjob, newstate, newsubst, FALSE);

    close_conn(pjob->ji_momhandle, FALSE);

    pjob->ji_momhandle = -1;

    unlock_ji_mutex(pjob, __func__, "8", LOGLEVEL);

    return(PBSE_SYSTEM);
    }

  svr_setjobstate(pjob, JOB_STATE_EXITING, pjob->ji_qs.ji_substate, FALSE);

  set_task(WORK_Immed, 0, (void (*)(struct work_task *))on_job_rerun_task,
    strdup(pjob->ji_qs.ji_jobid), FALSE);

  if (LOGLEVEL >= 4)
    {
    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      "on_job_rerun task assigned to job");
    }

#ifdef RERUNUSAGE
  /* replace new-lines with blanks for the accounting record */

  for (pc = acctbuf; *pc; ++pc)
    {
    if (*pc == '\n')
      *pc = ' ';
    }

  /* record accounting */

  account_jobend(pjob, acctbuf);
#endif /* RERUNUSAGE */

  /* remove checkpoint restart file if there is one */

  if (pjob->ji_wattr[JOB_ATR_restart_name].at_flags & ATR_VFLAG_SET)
    {
    cleanup_restart_file(pjob);
    }

  /* "on_job_rerun()" will be dispatched out of the main loop */

  return(rc);
  }  /* END rerun_job() */

int handle_rerunning_heterogeneous_jobs(

  job  *pjob,
  int   newstate,
  int   newsubst,
  char *acctbuf)

  {
  job *parent_job = pjob->ji_parent_job;
  job *other_subjob;
  int  rc = PBSE_NONE;

  if ((rc = rerun_job(pjob, newstate, newsubst, acctbuf)) == PBSE_NONE)
    {
    unlock_ji_mutex(pjob, __func__, NULL, LOGLEVEL);
    lock_ji_mutex(parent_job, __func__, NULL, LOGLEVEL);

    if (parent_job->ji_external_clone == pjob)
      other_subjob = parent_job->ji_cray_clone;
    else
      other_subjob = parent_job->ji_external_clone;

    unlock_ji_mutex(parent_job, __func__, NULL, LOGLEVEL);
    lock_ji_mutex(other_subjob, __func__, NULL, LOGLEVEL);

    if ((rc = rerun_job(other_subjob, newstate, newsubst, acctbuf)) == PBSE_NONE)
      {
      unlock_ji_mutex(other_subjob, __func__, NULL, LOGLEVEL);
      lock_ji_mutex(parent_job, __func__, NULL, LOGLEVEL);

      if ((rc = rerun_job(parent_job, newstate, newsubst, acctbuf)) == PBSE_NONE)
        unlock_ji_mutex(parent_job, __func__, NULL, LOGLEVEL);
      }
    }

  return(rc);
  }  /* END handle_rerunning_heterogeneous_jobs() */

int end_of_job_accounting(

  job  *pjob,
  char *acctbuf,
  int   accttail)

  {
  char *pc;
  long  events = 0;

  /* replace new-lines with blanks for the log message */

  for (pc = acctbuf; *pc; ++pc)
    {
    if (*pc == '\n')
      *pc = ' ';
    }

  /* record accounting, and maybe in the log as well */

  account_jobend(pjob, acctbuf);

  get_svr_attr_l(SRV_ATR_log_events, &events);

  if (events & PBSEVENT_JOB_USAGE)
    {
    /* log events set to record usage */
    log_event(PBSEVENT_JOB_USAGE | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, acctbuf);
    }
  else
    {
    /* no usage in the log; truncate the message */
    *(acctbuf + accttail) = '\0';

    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, acctbuf);
    }

  return(PBSE_NONE);
  }  /* END end_of_job_accounting() */

int handle_terminating_array_subjob(

  job *pjob)

  {
  job_array *pa;
  long       job_atr_hold;
  int        job_exit_status;
  char       job_id[PBS_MAXSVRJOBID+1];

  /* decrease array running job count */

  if ((pjob->ji_arraystructid[0] != '\0') &&
      (pjob->ji_is_array_template == FALSE))
    {
    pa = get_jobs_array(&pjob);

    if (pjob == NULL)
      return(PBSE_UNKJOBID);

    if (pa != NULL)
      {
      job_atr_hold = pjob->ji_wattr[JOB_ATR_hold].at_val.at_long;
      job_exit_status = pjob->ji_qs.ji_un.ji_exect.ji_exitstat;
      snprintf(job_id, sizeof(job_id), "%s", pjob->ji_qs.ji_jobid);
      unlock_ji_mutex(pjob, __func__, "7", LOGLEVEL);

      update_array_values(pa, JOB_STATE_RUNNING, aeTerminate,
        job_id, job_atr_hold, job_exit_status);

      unlock_ai_mutex(pa, __func__, "1", LOGLEVEL);

      pjob = svr_find_job(job_id, TRUE);

      if (pjob == NULL)
        return(PBSE_UNKJOBID);
      }
    }

  return(PBSE_NONE);
  }  /* END handle_terminating_array_subjob() */
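/*
 * Illustrative sketch (not part of the original source): the rerun path in
 * handle_rerunning_heterogeneous_jobs() above never holds two job mutexes
 * at once -- each record (sub-job, parent, other sub-job) is unlocked
 * before the next is locked, which avoids deadlock against threads that
 * visit the same records in a different order. The mutexes below are
 * stand-ins for ji_mutex. Compile standalone with -DJOBOBIT_EXAMPLE_LOCKS
 * and -pthread.
 */
#ifdef JOBOBIT_EXAMPLE_LOCKS
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t job_a = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t job_b = PTHREAD_MUTEX_INITIALIZER;

static void visit_in_turn(const char *who)
  {
  pthread_mutex_lock(&job_a);
  printf("%s: working on job A\n", who);
  pthread_mutex_unlock(&job_a); /* release before taking the next lock */

  pthread_mutex_lock(&job_b);
  printf("%s: working on job B\n", who);
  pthread_mutex_unlock(&job_b);
  }

int main(void)
  {
  /* safe even if other threads visit B before A, since at most one
   * job mutex is ever held at a time */
  visit_in_turn("main");

  return 0;
  }
#endif /* JOBOBIT_EXAMPLE_LOCKS */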
int handle_terminating_job(

  job        *pjob,
  int         alreadymailed,
  const char *mailbuf)

  {
  int rc = PBSE_NONE;

  /* The job is terminating (not being rerun): */
  /* update its state and send mail to the owner */

  svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_EXITING, FALSE);

  record_job_as_exiting(pjob);

  if (alreadymailed == 0)
    svr_mailowner(pjob, MAIL_END, MAIL_NORMAL, mailbuf);

  if ((rc = handle_terminating_array_subjob(pjob)) != PBSE_NONE)
    return(rc);

  if (LOGLEVEL >= 4)
    {
    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      "on_job_exit task assigned to job");
    }

  /* remove checkpoint restart file if there is one */

  if (pjob->ji_wattr[JOB_ATR_restart_name].at_flags & ATR_VFLAG_SET)
    {
    cleanup_restart_file(pjob);
    }

  return(rc);
  }  /* END handle_terminating_job() */

int update_substate_from_exit_status(

  job        *pjob,
  int        *alreadymailed,
  const char *text)

  {
  long automatic_requeue = -1000;
  int  exitstatus = pjob->ji_qs.ji_un.ji_exect.ji_exitstat;
  char log_buf[LOCAL_LOG_BUF_SIZE+1];
  int  rc = PBSE_NONE;
  int  newstate;
  int  newsubst;

  /* Was there a special exit status from MOM? */

  get_svr_attr_l(SRV_ATR_AutomaticRequeueExitCode, &automatic_requeue);

  if (exitstatus == automatic_requeue)
    {
    pjob->ji_qs.ji_substate = JOB_SUBSTATE_RERUN1;
    }
  else if (exitstatus < 0)
    {
    /* negative exit status is special */

    switch (exitstatus)
      {

      case JOB_EXEC_OVERLIMIT_MEM:

        /* the job exceeded a memory resource limit */
        svr_mailowner_with_message(pjob, MAIL_ABORT, MAIL_FORCE, msg_momjobovermemlimit, text);
        *alreadymailed = 1;

        break;

      case JOB_EXEC_OVERLIMIT_WT:

        /* the job exceeded its walltime limit */
        svr_mailowner_with_message(pjob, MAIL_ABORT, MAIL_FORCE, msg_momjoboverwalltimelimit, text);
        *alreadymailed = 1;

        break;

      case JOB_EXEC_OVERLIMIT_CPUT:

        /* the job exceeded its cpu time limit */
        svr_mailowner_with_message(pjob, MAIL_ABORT, MAIL_FORCE, msg_momjobovercputlimit, text);
        *alreadymailed = 1;

        break;

      case JOB_EXEC_FAIL1:

      default:

        /* MOM rejected the job with a fatal error; abort the job */
        svr_mailowner_with_message(pjob, MAIL_ABORT, MAIL_FORCE, msg_momnoexec1, text);
        *alreadymailed = 1;

        break;

      case JOB_EXEC_FAIL2:

        /* MOM rejected the job after the files were set up; abort the job */
        svr_mailowner_with_message(pjob, MAIL_ABORT, MAIL_FORCE, msg_momnoexec2, text);
        *alreadymailed = 1;

        break;

      case JOB_EXEC_INITABT:

        /* MOM aborted the job during her initialization */
        *alreadymailed = setrerun(pjob, text);
        pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HASRUN;

        break;

      case JOB_EXEC_RETRY:

        /* MOM rejected the job but said to retry it */

        if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HASRUN)
          {
          /* has run before; treat this as another rerun */
          *alreadymailed = setrerun(pjob, text);
          }
        else
          {
          /* transient failure detected: have MOM remove the job files */
          /* without saving them, and requeue the job */
          pjob->ji_qs.ji_substate = JOB_SUBSTATE_RERUN1;
          }

        break;

      case JOB_EXEC_BADRESRT:

        /* MOM could not restart the job; set up for rerun */
        *alreadymailed = setrerun(pjob, text);
        pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_CHECKPOINT_FILE;

        break;

      case JOB_EXEC_INITRST:

        /* MOM aborted the job on init; the job has a checkpoint file. */
        /* Requeue it, and that's all. */
        if (LOGLEVEL >= 1)
          {
          log_event(
            PBSEVENT_JOB_USAGE | PBSEVENT_JOB,
            PBS_EVENTCLASS_JOB,
            pjob->ji_qs.ji_jobid,
            "received JOB_EXEC_INITRST, setting job CHECKPOINT_FILE flag");
          }

        rel_resc(pjob);

        pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HASRUN | JOB_SVFLG_CHECKPOINT_FILE;

        svr_evaljobstate(*pjob, newstate, newsubst, 1);
        svr_setjobstate(pjob, newstate, newsubst, FALSE);

        close_conn(pjob->ji_momhandle, FALSE);

        pjob->ji_momhandle = -1;

        return(PBSE_SYSTEM);

        /*NOTREACHED*/

        break;

      case JOB_EXEC_INITRMG:

        /* MOM aborted the job on init; the job has a migratable checkpoint. */
        /* Must recover the output and checkpoint file, then do end-of-job.  */
        *alreadymailed = setrerun(pjob, text);

        pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HASRUN | JOB_SVFLG_CHECKPOINT_MIGRATEABLE;

        break;
      }  /* END switch (exitstatus) */
    }    /* END if (exitstatus < 0) */

  if (LOGLEVEL >= 2)
    {
    sprintf(log_buf, "job exit status %d handled", exitstatus);

    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      log_buf);
    }

  return(rc);
  }  /* END update_substate_from_exit_status() */
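/*
 * Illustrative sketch (not part of the original source): the negative
 * JOB_EXEC_* codes handled in update_substate_from_exit_status() above boil
 * down to two outcomes -- abort the job and mail the owner, or requeue it
 * for another attempt. The enum values here are hypothetical stand-ins, not
 * the real JOB_EXEC_* constants. Compile standalone with
 * -DJOBOBIT_EXAMPLE_EXITCODES.
 */
#ifdef JOBOBIT_EXAMPLE_EXITCODES
#include <stdio.h>

enum toy_mom_code
  {
  TOY_FAIL          = -1, /* fatal rejection by MOM      */
  TOY_RETRY         = -2, /* transient failure, retry    */
  TOY_OVER_WALLTIME = -3  /* walltime resource limit hit */
  };

static const char *toy_outcome(int code)
  {
  switch (code)
    {
    case TOY_RETRY:         return "requeue for another run";
    case TOY_OVER_WALLTIME: return "abort and mail the owner";
    case TOY_FAIL:
    default:                return "abort and mail the owner";
    }
  }

int main(void)
  {
  printf("%d -> %s\n", TOY_RETRY, toy_outcome(TOY_RETRY));
  printf("%d -> %s\n", TOY_FAIL,  toy_outcome(TOY_FAIL));

  return 0;
  }
#endif /* JOBOBIT_EXAMPLE_EXITCODES */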
/*
 * req_jobobit - process the Job Obituary Notice (request) from MOM.
 * This notice is sent from MOM when a job terminates.
 */

int req_jobobit(

  batch_request *preq) /* I */

  {
  int               alreadymailed = 0;
  int               rc = PBSE_NONE;
  char              acctbuf[RESC_USED_BUF];
  int               accttail;
  int               exitstatus;
  char              mailbuf[RESC_USED_BUF];
  int               local_errno = 0;
  char             *tmp = NULL;
  job              *pjob;
  char              job_id[PBS_MAXSVRJOBID+1];
  struct work_task *ptask;
  svrattrl         *patlist;
  unsigned int      dummy;
  char              log_buf[LOCAL_LOG_BUF_SIZE+1];
  time_t            time_now = time(NULL);
  pbs_net_t         mom_addr;
  int               rerunning_job = FALSE;
  int               have_resc_used = FALSE;

  /* This will be needed later for logging after preq is freed. */
  strcpy(job_id, preq->rq_ind.rq_jobobit.rq_jid);

  tmp = parse_servername(preq->rq_host, &dummy);
  mom_addr = get_hostaddr(&local_errno, tmp);

  pjob = svr_find_job(job_id, TRUE);

  if ((pjob == NULL) ||
      (pjob->ji_qs.ji_un.ji_exect.ji_momaddr != mom_addr))
    {
    /* not found, or the obit came from the wrong node */

    if ((server_init_type == RECOV_COLD) ||
        (server_init_type == RECOV_CREATE))
      {
      /* tell MOM the job was blown away */

      sprintf(log_buf, msg_obitnojob, preq->rq_host, PBSE_CLEANEDOUT);
      rc = PBSE_CLEANEDOUT;

      req_reject(PBSE_CLEANEDOUT, 0, preq, NULL, NULL);
      }
    else if (pjob != NULL)
      {
      snprintf(log_buf, sizeof(log_buf),
        "Received obit for job %s from mom %s, but mom address doesn't match",
        pjob->ji_qs.ji_jobid, preq->rq_host);

      req_reject(PBSE_UNKJOBID, 0, preq, NULL, NULL);
      }
    else
      {
      sprintf(log_buf, msg_obitnojob, preq->rq_host, PBSE_UNKJOBID);
      rc = PBSE_UNKJOBID;

      req_reject(PBSE_UNKJOBID, 0, preq, NULL, NULL);
      }

    log_err(rc, job_id, log_buf);

    if (pjob != NULL)
      unlock_ji_mutex(pjob, __func__, "1", LOGLEVEL);

    free(tmp);

    return(rc);
    }  /* END if ((pjob == NULL) || ...) */

  free(tmp);

  mutex_mgr job_mutex(pjob->ji_mutex, true);

  if (pjob->ji_qs.ji_state == JOB_STATE_COMPLETE)
    {
    /* MOM apparently missed the earlier update, so the obit was resent;
     * just acknowledge it again */
    reply_ack(preq);

    return(PBSE_BADSTATE);
    }
  else if (pjob->ji_qs.ji_state != JOB_STATE_RUNNING)
    {
    if (pjob->ji_qs.ji_state == JOB_STATE_EXITING)
      {
      /* already in exit processing; just make sure it's in the exiting
       * jobs list. No need to check the return code because if it is
       * already present then we have no problem. */
      record_job_as_exiting(pjob);

      rc = PBSE_ALRDYEXIT;
      }
    else
      {
      /* not running and not exiting - bad news (possible data staging issue) */
      /* NOTE: was logged w/msg_obitnojob */

      sprintf(log_buf,
        "obit received for job %s from host %s with bad state (state: %s)",
        job_id, preq->rq_host, PJobState[pjob->ji_qs.ji_state]);

      log_err(PBSE_BADSTATE, job_id, log_buf);

      rc = PBSE_BADSTATE;
      }

    req_reject(rc, 0, preq, NULL, NULL);

    return(rc);
    }  /* END if (pjob->ji_qs.ji_state != JOB_STATE_RUNNING) */

  if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_PRERUN)
    {
    /* have hit a race condition: the send_job child's SIGCHLD */
    /* has not yet been reaped. Must wait for it.               */

    ptask = set_task(WORK_Timed, time_now + 1, wait_for_send, (void *)preq, FALSE);

    if (ptask == NULL)
      req_reject(PBSE_SYSTEM, 0, preq, NULL, NULL);

    /* In the else case, the request is answered after the callback;
     * the connection is left open so wait_for_send(), which re-enters
     * this function, can write the reply to that socket. */

    return(PBSE_SYSTEM);
    }

  /*
   * save the exit state, update the resources used, and reply to MOM.
   *
   * Note: the log/mail message about resources used must be built BEFORE
   * replying to MOM. The reply frees the pbs_attribute list, so it
   * cannot be used after the call to reply_ack().
   */

  exitstatus = preq->rq_ind.rq_jobobit.rq_status;

  pjob->ji_qs.ji_un.ji_exect.ji_exitstat = exitstatus;

  pjob->ji_wattr[JOB_ATR_exitstat].at_val.at_long = exitstatus;
  pjob->ji_wattr[JOB_ATR_exitstat].at_flags |= ATR_VFLAG_SET;

  if (pjob->ji_parent_job != NULL)
    handle_subjob_exit_status(pjob);

  patlist = (svrattrl *)GET_NEXT(preq->rq_ind.rq_jobobit.rq_attr);

  /* Encode the final resources_used into the job (useful for keep_completed) */

  if (LOGLEVEL >= 2)
    {
    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      job_id,
      (char *)"obit received - updating final job usage info");
    }

  modify_job_attr(
    pjob,
    patlist,
    ATR_DFLAG_MGWR | ATR_DFLAG_SvWR,
    &rc);

  sprintf(acctbuf, msg_job_end_stat, pjob->ji_qs.ji_un.ji_exect.ji_exitstat);

  if (exitstatus >= 10000)
    {
    /* exit statuses >= 10000 encode "killed by signal (status - 10000)" */
    sprintf(mailbuf, msg_job_end_sig, exitstatus - 10000);
    }
  else
    {
    mailbuf[0] = '\0';
    }

  accttail = strlen(acctbuf);

  have_resc_used = get_used(patlist, acctbuf);

#ifdef USESAVEDRESOURCES
  /* if we don't have resources from the obit, use what the job already had */

  if (!have_resc_used)
    {
    struct batch_request *tmppreq;

    if (LOGLEVEL >= 7)
      {
      log_event(
        PBSEVENT_ERROR | PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        job_id,
        "No resources used found");
      }

    tmppreq = alloc_br(PBS_BATCH_JobObit);

    if (tmppreq == NULL)
      {
      /* FAILURE */

      sprintf(log_buf, "cannot allocate memory for temp obit message");
      log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_REQUEST, __func__, log_buf);

      req_reject(PBSE_SYSTEM, 0, preq, NULL, NULL);

      return(PBSE_SYSTEM);
      }

    CLEAR_HEAD(tmppreq->rq_ind.rq_jobobit.rq_attr);

    encode_job_used(pjob, &tmppreq->rq_ind.rq_jobobit.rq_attr);

    patlist = (svrattrl *)GET_NEXT(tmppreq->rq_ind.rq_jobobit.rq_attr);

    have_resc_used = get_used(patlist, acctbuf);

    free_br(tmppreq);
    }
#else
  if (!have_resc_used)
    {
    /* keep newer versions of gcc from complaining about a
     * 'variable set but not used' condition */
    }
#endif /* USESAVEDRESOURCES */

  safe_strncat(mailbuf, acctbuf, sizeof(mailbuf) - strlen(mailbuf) - 1);

  reply_ack(preq);

  /* clear the suspended flag if it was set */

  pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_Suspend;

  if ((rc = update_substate_from_exit_status(pjob, &alreadymailed, mailbuf)) != PBSE_NONE)
    return(rc);
  /* What do we now do with the job? */

  if ((pjob->ji_qs.ji_substate != JOB_SUBSTATE_RERUN) &&
      (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RERUN1))
    {
    end_of_job_accounting(pjob, acctbuf, accttail);

    if ((rc = handle_terminating_job(pjob, alreadymailed, mailbuf)) != PBSE_NONE)
      return(rc);
    }
  else
    {
    rerunning_job = TRUE;

    /* if this is a heterogeneous sub-job, handle it appropriately */

    if (pjob->ji_parent_job != NULL)
      {
      rc = handle_rerunning_heterogeneous_jobs(pjob,
             pjob->ji_qs.ji_state, pjob->ji_qs.ji_substate, acctbuf);

      job_mutex.set_lock_on_exit(false);

      return(rc);
      }
    else
      {
      if ((rc = rerun_job(pjob, pjob->ji_qs.ji_state,
                          pjob->ji_qs.ji_substate, acctbuf)) != PBSE_NONE)
        {
        job_mutex.set_lock_on_exit(false);

        return(rc);
        }
      }
    }  /* END else */

  if (LOGLEVEL >= 4)
    {
    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      job_id,
      "req_jobobit completed");
    }

  if (rerunning_job == FALSE)
    {
    if (LOGLEVEL >= 7)
      {
      sprintf(log_buf, "calling on_job_exit from %s", __func__);
      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, job_id, log_buf);
      }

    set_task(WORK_Immed, 0, (void (*)(struct work_task *))on_job_exit_task, strdup(job_id), 0);
    }

  return(PBSE_NONE);
  }  /* END req_jobobit() */

/* END req_jobobit.c */
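/*
 * Illustrative sketch (not part of the original source): req_jobobit()
 * treats an exit status >= 10000 as "terminated by signal (status - 10000)"
 * when composing the owner's mail (the msg_job_end_sig branch above). The
 * constant name below is a hypothetical stand-in for the literal used in
 * the code. Compile standalone with -DJOBOBIT_EXAMPLE_SIGNAL.
 */
#ifdef JOBOBIT_EXAMPLE_SIGNAL
#include <stdio.h>

#define TOY_SIGNAL_OFFSET 10000 /* mirrors the literal in req_jobobit() */

int main(void)
  {
  int exitstatus = 10009; /* e.g. a job killed by signal 9 (SIGKILL) */

  if (exitstatus >= TOY_SIGNAL_OFFSET)
    printf("job terminated by signal %d\n", exitstatus - TOY_SIGNAL_OFFSET);
  else
    printf("job exited normally with status %d\n", exitstatus);

  return 0;
  }
#endif /* JOBOBIT_EXAMPLE_SIGNAL */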