/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. 
Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. */ /* * req_jobobit.c - functions dealing with a Job Obituary Request (Notice) * and the associated post execution job clean up. */ #include /* the master config generated by configure */ #include #include #include #include #include #include #include #include "libpbs.h" #include "server_limits.h" #include "list_link.h" #include "attribute.h" #include "resource.h" #include "server.h" #include "pbs_job.h" #include "credential.h" #include "batch_request.h" #include "work_task.h" #include "pbs_error.h" #include "log.h" #include "acct.h" #include "net_connect.h" #include "svrfunc.h" #include "sched_cmds.h" #include "queue.h" #include "array.h" #define RESC_USED_BUF 2048 #define JOBMUSTREPORTDEFAULTKEEP 30 /* External Global Data Items */ extern unsigned int pbs_mom_port; extern char *path_spool; extern int server_init_type; extern pbs_net_t pbs_server_addr; extern char *msg_init_abt; extern char *msg_job_end; extern char *msg_job_end_sig; extern char *msg_job_end_stat; extern char *msg_momnoexec1; extern char *msg_momnoexec2; extern char *msg_momjoboverlimit; extern char *msg_obitnojob; extern char *msg_obitnocpy; extern char *msg_obitnodel; extern char server_host[]; extern int svr_do_schedule; extern int listener_command; extern time_t time_now; extern int LOGLEVEL; extern const char *PJobState[]; /* External Functions called */ extern void 
set_resc_assigned(job *, enum batch_op); extern void cleanup_restart_file(job *); /* Local public functions */ void req_jobobit(struct batch_request *); /* * setup_from - setup the "from" name for a standard job file: * output, error, or checkpoint */ static char *setup_from( job *pjob, /* I */ char *suffix) /* I */ { char *from; from = malloc(strlen(pjob->ji_qs.ji_fileprefix) + strlen(suffix) + 1); if (from != NULL) { strcpy(from, pjob->ji_qs.ji_fileprefix); strcat(from, suffix); } return(from); } /* END setup_from() */ /* * setup_cpyfiles - if need be, allocate and initalize a Copy Files * batch request, then append the file pairs */ struct batch_request *setup_cpyfiles( struct batch_request *preq, job *pjob, char *from, /* local (to mom) name */ char *to, /* remote (destination) name */ int direction, /* copy direction */ int tflag) /* 1 if stdout or stderr , 2 if stage out or in*/ { struct rq_cpyfile *pcf; struct rqfpair *pair; if (preq == NULL) { /* allocate and initialize the batch request struct */ preq = alloc_br(PBS_BATCH_CopyFiles); if (preq == NULL) { /* FAILURE */ free(from); if (to != NULL) free(to); log_event( PBSEVENT_ERROR | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "ALERT: cannot allocate memory for preq in setup_cpyfiles"); return(preq); } pcf = &preq->rq_ind.rq_cpyfile; CLEAR_HEAD(pcf->rq_pair); /* copy jobid, owner, exec-user, group names, up to the @host part */ strcpy(pcf->rq_jobid, pjob->ji_qs.ji_jobid); get_jobowner( pjob->ji_wattr[(int)JOB_ATR_job_owner].at_val.at_str, pcf->rq_owner); get_jobowner( pjob->ji_wattr[(int)JOB_ATR_euser].at_val.at_str, pcf->rq_user); if (((pjob->ji_wattr[(int)JOB_ATR_egroup].at_flags & ATR_VFLAG_DEFLT) == 0) && (pjob->ji_wattr[(int)JOB_ATR_egroup].at_val.at_str != 0)) { strcpy(pcf->rq_group, pjob->ji_wattr[(int)JOB_ATR_egroup].at_val.at_str); } else { pcf->rq_group[0] = '\0'; /* default: use login group */ } pcf->rq_dir = direction; } else { /* use the existing request structure */ pcf = 
&preq->rq_ind.rq_cpyfile; } pair = (struct rqfpair *)malloc(sizeof(struct rqfpair)); if (pair == NULL) { /* FAILURE */ free_br(preq); log_event( PBSEVENT_ERROR | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "ALERT: cannot allocate memory for rqfpair in setup_cpyfiles"); return(NULL); } CLEAR_LINK(pair->fp_link); pair->fp_local = from; pair->fp_rmt = to; pair->fp_flag = tflag; append_link(&pcf->rq_pair, &pair->fp_link, pair); return(preq); } /* END setup_cpyfiles() */ static int is_joined( job *pjob, /* I */ enum job_atr ati) /* I */ { char key; attribute *pattr; char *pd; if (ati == JOB_ATR_outpath) key = 'o'; else if (ati == JOB_ATR_errpath) key = 'e'; else { return(0); } pattr = &pjob->ji_wattr[(int)JOB_ATR_join]; if (pattr->at_flags & ATR_VFLAG_SET) { pd = pattr->at_val.at_str; if ((pd != NULL) && (*pd != '\0') && (*pd != 'n')) { /* if not the first letter, and in list - is joined */ if ((*pd != key) && (strchr(pd + 1, (int)key))) { return(1); /* being joined */ } } } return(0); /* either the first or not in list */ } static struct batch_request *return_stdfile( struct batch_request *preq, job *pjob, enum job_atr ati) { if ((pjob->ji_wattr[(int)JOB_ATR_interactive].at_flags) && (pjob->ji_wattr[(int)JOB_ATR_interactive].at_val.at_long)) { return NULL; } if ((pjob->ji_wattr[(int)JOB_ATR_checkpoint_name].at_flags & ATR_VFLAG_SET) == 0) { return NULL; } /* if this file is joined to another then it doesn't have to get copied back */ if (is_joined(pjob, ati)) { return preq; } if (preq == NULL) { preq = alloc_br(PBS_BATCH_ReturnFiles); } strcpy(preq->rq_ind.rq_returnfiles.rq_jobid, pjob->ji_qs.ji_jobid); if (ati == JOB_ATR_errpath) { preq->rq_ind.rq_returnfiles.rq_return_stderr = TRUE; } else if (ati == JOB_ATR_outpath) { preq->rq_ind.rq_returnfiles.rq_return_stdout = TRUE; } return preq; } /* * cpy_stdfile - determine if one of the job's standard files (output or error) * is to be copied, if so set up the Copy Files request. 
*/

/*
 * Result of cpy_stdfile():
 *   NULL  - the job is interactive, or the path attribute is not set
 *           (the latter is logged as "<key> file missing")
 *   preq  - unchanged, when the file is joined to the other one, is in
 *           the job's keep list, or a name buffer cannot be allocated
 *   else  - whatever setup_cpyfiles() returns for the new file pair
 */

static struct batch_request *cpy_stdfile(

  struct batch_request *preq,
  job                  *pjob,
  enum job_atr          ati)  /* JOB_ATR_ output or error path */

  {
  const int  want_stderr = (ati == JOB_ATR_errpath);
  char       key         = want_stderr ? 'e' : 'o';
  char      *suffix      = want_stderr ? JOB_STDERR_SUFFIX : JOB_STDOUT_SUFFIX;

  attribute *interactive = &pjob->ji_wattr[(int)JOB_ATR_interactive];
  attribute *pathattr    = &pjob->ji_wattr[(int)ati];
  attribute *keepattr    = &pjob->ji_wattr[(int)JOB_ATR_keep];

  char      *src;
  char      *dest = NULL;
  size_t     dest_size;

  /* interactive jobs have no output file to send back */

  if (interactive->at_flags && interactive->at_val.at_long)
    {
    return(NULL);
    }

  if (!(pathattr->at_flags & ATR_VFLAG_SET))
    {
    /* FAILURE: This shouldn't be */

    sprintf(log_buffer, "%c file missing", key);

    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      log_buffer);

    return(NULL);
    }

  /* a file joined to the other one is not copied on its own */

  if (is_joined(pjob, ati))
    {
    return(preq);
    }

  /*
   * A file named in the keep list has already been placed in the
   * user's HOME directory by MOM, so there is nothing to copy.
   */

  if ((keepattr->at_flags & ATR_VFLAG_SET) &&
      (strchr(keepattr->at_val.at_str, (int)key)))
    {
    /* SUCCESS */

    return(preq);
    }

  /* destination: a private copy of the supplied path */

  dest_size = strlen(pathattr->at_val.at_str) + 1;

  dest = (char *)malloc(dest_size);

  if (dest == NULL)
    {
    /* FAILURE */

    /* cannot allocate memory for request this one */

    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      "ERROR: cannot allocate 'to' memory in cpy_stdfile");

    return(preq);
    }

  memcpy(dest, pathattr->at_val.at_str, dest_size);  /* includes the NUL */

  /* source: the name MOM uses for the file */

  src = setup_from(pjob, suffix);

  if (src == NULL)
    {
    /* FAILURE */

    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      "ERROR: cannot allocate 'from' memory for from in cpy_stdfile");

    free(dest);

    return(preq);
    }

  /* hand both names over to the copy request */

  return(setup_cpyfiles(preq, pjob, src, dest, STAGE_DIR_OUT, STDJOBFILE));
  }  /* END cpy_stdfile() */




/*
 * cpy_stage - set up a Copy Files request to include files specified by the
 * user to be staged out (also used for stage-in).
 * "stage_out" is a resource that may or may not
 * exist on a host.
 * If such exists, the files are listed one per string as
 * "local_name@remote_host:remote_name".
*/

/*
 * Each entry of the attribute's string array is split at its first '@':
 * the text before it becomes the local ("from") name and the text after
 * it the remote ("to") name.  Entries without an '@' (or NULL entries)
 * are skipped.  The attribute strings themselves are never modified.
 *
 * preq      - request to append to, or NULL to have one allocated by
 *             setup_cpyfiles()
 * ati       - job attribute holding the file list
 * direction - stored as the request's copy direction by setup_cpyfiles()
 *
 * Returns the request holding all pairs added so far.  preq is returned
 * unchanged when the attribute is not set, when its array is missing, or
 * when a name buffer cannot be allocated (pairs already appended stay in
 * the request).
 */

struct batch_request *cpy_stage(

  struct batch_request *preq,
  job                  *pjob,
  enum job_atr          ati,        /* JOB_ATR_stageout */
  int                   direction)  /* 1 = , 2 = */

  {
  int                   i;
  char                 *from;
  attribute            *pattr;

  struct array_strings *parst;

  char                 *plocal;
  char                 *prmt;
  char                 *to;
  size_t                local_len;

  pattr = &pjob->ji_wattr[(int)ati];

  if ((pattr->at_flags & ATR_VFLAG_SET) == 0)
    {
    /* nothing to stage */

    return(preq);
    }

  /* at last, we know we have files to stage out/in */

  parst = pattr->at_val.at_arst;

  if (parst == NULL)
    {
    /* FAILURE */

    snprintf(log_buffer, LOG_BUF_SIZE, "cannot copy stage file for job %s",
      pjob->ji_qs.ji_jobid);

    log_event(
      PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      log_buffer);

    return(preq);
    }

  for (i = 0;i < parst->as_usedptr;++i)
    {
    plocal = parst->as_string[i];

    if (plocal == NULL)
      {
      continue;
      }

    prmt = strchr(plocal, (int)'@');

    if (prmt == NULL)
      {
      continue;
      }

    /*
     * Copy the local part by length instead of temporarily writing a
     * NUL over the '@': an early return can then never leave the job's
     * attribute string truncated.
     */

    local_len = (size_t)(prmt - plocal);

    from = malloc(local_len + 1);

    if (from == NULL)
      {
      log_event(
        PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        pjob->ji_qs.ji_jobid,
        "ALERT: cannot allocate memory in cpy_stage");

      return(preq);
      }

    memcpy(from, plocal, local_len);

    from[local_len] = '\0';

    to = malloc(strlen(prmt + 1) + 1);

    if (to == NULL)
      {
      log_event(
        PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        pjob->ji_qs.ji_jobid,
        "ALERT: cannot allocate memory in cpy_stage");

      free(from);

      return(preq);
      }

    strcpy(to, prmt + 1);

    /* from and to are handed over to setup_cpyfiles() */

    preq = setup_cpyfiles(preq, pjob, from, to, direction, STAGEFILE);
    }  /* END for (i) */

  return(preq);
  }  /* END cpy_stage() */




/*
 * mom_comm - if needed, open a connection with the MOM under which
 * the job was running.  The connection is typically set up by
 * req_jobobit() using the connection already established by MOM.
 * However, on server recovery there will be no pre-established connection.
 *
 * If a connection is needed and cannot be setup, set up a work-task
 * entry and try again later.
*/

/*
 * Returns the job's MOM connection handle (ji_momhandle) when one exists
 * or could be opened, and -1 after scheduling 'func' to be retried for
 * this job PBS_NET_RETRY_TIME seconds from time_now.
 */

int mom_comm(

  job *pjob,
  void (*func)(struct work_task *))

  {
  unsigned int      port_ignored;  /* filled by parse_servername(), not used */

  struct work_task *retry;

  if (pjob->ji_momhandle >= 0)
    {
    /* a connection is already in place */

    return(pjob->ji_momhandle);
    }

  /* need to make connection, called from pbsd_init() */

  if (pjob->ji_qs.ji_un.ji_exect.ji_momaddr == 0)
    {
    /* no address recorded - derive it from the exec_host attribute */

    char *mom_host = parse_servername(
                       pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str,
                       &port_ignored);

    pjob->ji_qs.ji_un.ji_exect.ji_momaddr = get_hostaddr(mom_host);
    }

  pjob->ji_momhandle = svr_connect(
                         pjob->ji_qs.ji_un.ji_exect.ji_momaddr,
                         pbs_mom_port,
                         process_Dreply,
                         ToServerDIS);

  if (pjob->ji_momhandle >= 0)
    {
    return(pjob->ji_momhandle);
    }

  /* FAILURE */

  if (LOGLEVEL >= 2)
    {
    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      "cannot establish connection with mom for clean-up - will retry later");
    }

  retry = set_task(
            WORK_Timed,
            (long)(time_now + PBS_NET_RETRY_TIME),
            func,
            (void *)pjob);

  if (retry != NULL)
    {
    /* linking the task to the job lets it be removed if the job goes away */

    append_link(&pjob->ji_svrtask, &retry->wt_linkobj, retry);
    }

  return(-1);
  }  /* END mom_comm() */




/*
 * rel_resc - release resources assigned to the job
 *
 * Frees the job's nodes, decrements the assigned-resource counts and
 * flags both the server and the listener with SCH_SCHEDULE_TERM.
 */

void rel_resc(

  job *pjob)  /* I (modified) */

  {
  free_nodes(pjob);

  /* take the job's usage back out of the used svr/que attributes */

  set_resc_assigned(pjob, DECR);

  /* request a scheduling cycle */

  svr_do_schedule  = SCH_SCHEDULE_TERM;
  listener_command = SCH_SCHEDULE_TERM;
  }  /* END rel_resc() */




/*
 * on_job_exit - continue post-execution processing of a job that terminated.
 *
 * This function is called by pbsd_init() on recovery, by req_jobobit()
 * on job termination and by itself (via a work task).  The clue to where
 * we are is the job substate and the type of the work task entry it is
 * called with.  If the work task entry type is Work_Immed, then this is
 * the first time in for the job substate.  Otherwise it is with the reply
 * given by MOM.
 *
 * NOTE:
 * On the initial work task (WORK_Immed), the wt_parm1 is a job pointer.
* On a call-back work task (WORK_Deferred_Reply) generated by * send_request(), the wt_parm1 is pointing to the request; and the * rq_extra field in the request points to the job. */ extern tlist_head svr_alljobs; void on_job_exit( struct work_task *ptask) /* I */ { int handle = -1; job *pjob; job *pj; struct batch_request *preq; int IsFaked = 0; int KeepSeconds = 0; int PurgeIt = FALSE; int MustReport = FALSE; pbs_queue *pque; char namebuf[MAXPATHLEN + 1]; char *namebuf2; int spool_file_exists; int rc = 0; extern void remove_job_delete_nanny(struct job *); if (ptask->wt_type != WORK_Deferred_Reply) { preq = NULL; pjob = (job *)ptask->wt_parm1; } else { preq = (struct batch_request *)ptask->wt_parm1; pjob = (job *)preq->rq_extra; } /* make sure the job is actually still there */ pj = (job *)GET_NEXT(svr_alljobs); while (pj != NULL) { if (pjob == pj) break; pj = (job *)GET_NEXT(pj->ji_alljobs); } /* if the job doesn't exist, just exit */ if (pj == NULL) { sprintf(log_buffer, "on_job_exit called with INVALID pjob: %p", (void *)pjob); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,"NULL",log_buffer); return; } else { sprintf(log_buffer, "on_job_exit valid pjob: %p (substate=%d)", (void *)pjob, pjob->ji_qs.ji_substate); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,"NULL",log_buffer); } /* * we don't need a handle if we are complete. 
On starting up we will NOT have * a connection to mom, but still want to try and process the completed job */ if ((pjob->ji_qs.ji_substate != JOB_SUBSTATE_COMPLETE) && ((handle = mom_comm(pjob,on_job_exit)) < 0)) { /* FAILURE - cannot connect to mom */ return; } /* MOM has killed everything it can kill, so we can stop the nanny */ remove_job_delete_nanny(pjob); switch (pjob->ji_qs.ji_substate) { case JOB_SUBSTATE_EXITING: case JOB_SUBSTATE_ABORT: if (LOGLEVEL >= 2) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "JOB_SUBSTATE_EXITING"); } /* see if job has any dependencies */ if (pjob->ji_wattr[(int)JOB_ATR_depend].at_flags & ATR_VFLAG_SET) { depend_on_term(pjob); } svr_setjobstate( pjob, JOB_STATE_EXITING, JOB_SUBSTATE_RETURNSTD); ptask->wt_type = WORK_Immed; /* NO BREAK, fall into stage out processing */ case JOB_SUBSTATE_RETURNSTD: /* this is a new substate to TORQUE 2.4.0. The purpose is to provide a * stage of the job exiting process that allows us to transfer stderr and * stdout files from the mom's spool directory back to the server spool * directory. This will only be done if the job has been checkpointed, * and keep_completed is a positive value. This is so that a completed * job can be restarted from a checkpoint file. 
*/ if (ptask->wt_type != WORK_Deferred_Reply) { /* this is the very first call, have mom return the files */ /* if job has been checkpointed and KeepSeconds > 0, copy the stderr * and stdout files back so that we can restart a completed * checkpointed job return_stdfile will only setup this request if * the job has a checkpoint file and the file is not joined to another * file */ KeepSeconds = 0; if ((pque = pjob->ji_qhdr) && (pque->qu_attr != NULL)) { KeepSeconds = attr_ifelse_long( &pque->qu_attr[(int)QE_ATR_KeepCompleted], &server.sv_attr[(int)SRV_ATR_KeepCompleted], 0); } if (KeepSeconds > 0) { strcpy(namebuf, path_spool); strcat(namebuf, pjob->ji_qs.ji_fileprefix); strcat(namebuf, JOB_STDOUT_SUFFIX); /* allocate space for the string name plus ".SAV" */ namebuf2 = malloc((strlen(namebuf) + 5) * sizeof(char)); if (pjob->ji_qs.ji_un.ji_exect.ji_momaddr != pbs_server_addr) { preq = return_stdfile(preq, pjob, JOB_ATR_outpath); } else if (access(namebuf, F_OK) == 0) { strcpy(namebuf2, namebuf); strcat(namebuf2, ".SAV"); if (link(namebuf, namebuf2) == -1) { LOG_EVENT( PBSEVENT_ERROR | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "Link(1) in on_job_exit failed"); } } namebuf[strlen(namebuf) - strlen(JOB_STDOUT_SUFFIX)] = '\0'; strcat(namebuf, JOB_STDERR_SUFFIX); if (pjob->ji_qs.ji_un.ji_exect.ji_momaddr != pbs_server_addr) { preq = return_stdfile(preq, pjob, JOB_ATR_errpath); } else if (access(namebuf, F_OK) == 0) { strcpy(namebuf2, namebuf); strcat(namebuf2, ".SAV"); if (link(namebuf, namebuf2) == -1) { LOG_EVENT( PBSEVENT_ERROR | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "Link(2) in on_job_exit failed"); } } free(namebuf2); } if (preq != NULL) { preq->rq_extra = (void *)pjob; if (issue_Drequest(handle, preq, on_job_exit, 0) == 0) { /* success, we'll come back after mom replies */ return; } /* set up as if mom returned error, if we fall through to * here then we want to hit the error processing below * because something bad happened 
*/ IsFaked = 1; preq->rq_reply.brp_code = PBSE_MOMREJECT; preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL; preq->rq_reply.brp_un.brp_txt.brp_txtlen = 0; } else { /* we don't need to return files to the server spool, move on to see if we need to delete files */ svr_setjobstate( pjob, JOB_STATE_EXITING, JOB_SUBSTATE_STAGEOUT); if (LOGLEVEL >= 6) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "no spool files to return"); } ptask = set_task(WORK_Immed, 0, on_job_exit, pjob); if (ptask != NULL) { append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask); } return; } } /* here we have a reply (maybe faked) from MOM about the request */ if (preq->rq_reply.brp_code != 0) { if (LOGLEVEL >= 3) { snprintf(log_buffer, LOG_BUF_SIZE, "request to return spool files failed on node '%s' for job %s%s", pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str, pjob->ji_qs.ji_jobid, (IsFaked == 1) ? "*" : ""); log_event( PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } } /* end if (preq->rq_reply.brp_code != 0) */ /* * files (generally) moved ok, move on to the next phase by * "faking" the immediate work task and falling through to * the next case. */ free_br(preq); preq = NULL; svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_STAGEOUT); ptask->wt_type = WORK_Immed; /* NO BREAK -- FALL INTO NEXT CASE */ case JOB_SUBSTATE_STAGEOUT: if (LOGLEVEL >= 4) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "JOB_SUBSTATE_STAGEOUT"); } IsFaked = 0; if (ptask->wt_type != WORK_Deferred_Reply) { /* this is the very first call, have mom copy files */ /* first check the standard files: output & error */ preq = cpy_stdfile(preq, pjob, JOB_ATR_outpath); preq = cpy_stdfile(preq, pjob, JOB_ATR_errpath); /* are there any stage-out files ? 
*/ preq = cpy_stage(preq, pjob, JOB_ATR_stageout, STAGE_DIR_OUT); if (preq != NULL) { /* have files to copy */ if (LOGLEVEL >= 4) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "about to copy stdout/stderr/stageout files"); } preq->rq_extra = (void *)pjob; if (issue_Drequest(handle, preq, on_job_exit, 0) == 0) { /* request sucessfully sent, we'll come back to this function when mom replies */ return; } /* FAILURE */ if (LOGLEVEL >= 1) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "copy request failed"); } /* set up as if mom returned error */ IsFaked = 1; preq->rq_reply.brp_code = PBSE_MOMREJECT; preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL; preq->rq_reply.brp_un.brp_txt.brp_txtlen = 0; /* we will "fall" into the post reply side */ } else { /* no files to copy, go to next step */ svr_setjobstate( pjob, JOB_STATE_EXITING, JOB_SUBSTATE_STAGEDEL); if (LOGLEVEL >= 4) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "no files to copy"); } ptask = set_task(WORK_Immed, 0, on_job_exit, pjob); if (ptask != NULL) { append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask); } return; } } /* END if (ptask->wt_type != WORK_Deferred_Reply) */ /* here we have a reply (maybe faked) from MOM about the copy */ if (preq->rq_reply.brp_code != 0) { int bad; svrattrl tA; /* error from MOM */ snprintf(log_buffer, LOG_BUF_SIZE, msg_obitnocpy, pjob->ji_qs.ji_jobid, pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str); log_event( PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); if (LOGLEVEL >= 3) { snprintf(log_buffer, LOG_BUF_SIZE, "request to copy stageout files failed on node '%s' for job %s%s", pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str, pjob->ji_qs.ji_jobid, (IsFaked == 1) ? 
"*" : ""); log_event( PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } if (preq->rq_reply.brp_choice == BATCH_REPLY_CHOICE_Text) { strncat( log_buffer, preq->rq_reply.brp_un.brp_txt.brp_str, LOG_BUF_SIZE - strlen(log_buffer) - 1); } svr_mailowner(pjob, MAIL_OTHER, MAIL_FORCE, log_buffer); memset(&tA, 0, sizeof(tA)); tA.al_name = "sched_hint"; tA.al_resc = ""; tA.al_value = log_buffer; tA.al_op = SET; modify_job_attr( pjob, &tA, /* I: ATTR_sched_hint - svrattrl */ ATR_DFLAG_MGWR | ATR_DFLAG_SvWR, &bad); } /* END if (preq->rq_reply.brp_code != 0) */ /* * files (generally) copied ok, move on to the next phase by * "faking" the immediate work task. */ /* check to see if we have saved the spool files, that means the mom and server are sharing this spool directory. pbs_server should take ownership of these files and rename them see JOB_SUBSTATE_RETURNSTD above*/ strcpy(namebuf, path_spool); strcat(namebuf, pjob->ji_qs.ji_fileprefix); strcat(namebuf, JOB_STDOUT_SUFFIX); /* allocate space for the string name plus ".SAV" */ namebuf2 = malloc((strlen(namebuf) + 5) * sizeof(char)); strcpy(namebuf2, namebuf); strcat(namebuf2, ".SAV"); spool_file_exists = access(namebuf2, F_OK); if (spool_file_exists == 0) { if (link(namebuf2, namebuf) == -1) { LOG_EVENT( PBSEVENT_ERROR | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "Link(3) in on_job_exit failed"); } unlink(namebuf2); } namebuf[strlen(namebuf) - strlen(JOB_STDOUT_SUFFIX)] = '\0'; strcat(namebuf, JOB_STDERR_SUFFIX); strcpy(namebuf2, namebuf); strcat(namebuf2, ".SAV"); spool_file_exists = access(namebuf2, F_OK); if (spool_file_exists == 0) { if (link(namebuf2, namebuf) == -1) { LOG_EVENT( PBSEVENT_ERROR | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "Link(4) in on_job_exit failed"); } unlink(namebuf2); } free(namebuf2); free_br(preq); preq = NULL; svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_STAGEDEL); ptask->wt_type = WORK_Immed; /* 
NO BREAK - FALL INTO THE NEXT CASE */ case JOB_SUBSTATE_STAGEDEL: if (LOGLEVEL >= 4) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "JOB_SUBSTATE_STAGEDEL"); } if (ptask->wt_type != WORK_Deferred_Reply) { /* first time in */ /* Build list of files which were staged-in so they can * can be deleted. */ preq = cpy_stage(preq, pjob, JOB_ATR_stagein, 0); if (preq != NULL) { /* have files to delete */ /* change the request type from copy to delete */ preq->rq_type = PBS_BATCH_DelFiles; preq->rq_extra = (void *)pjob; if (issue_Drequest(handle, preq, on_job_exit, 0) == 0) { /* request issued, we'll come back when mom replies */ return; } /* FAILURE */ if (LOGLEVEL >= 2) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "cannot issue file delete request for staged files"); } IsFaked = 1; /* set up as if mom returned error since the issue_Drequest failed */ preq->rq_reply.brp_code = PBSE_MOMREJECT; preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL; /* we will "fall" into the post reply side */ } else { /* preq == NULL, no files to delete */ svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_EXITED); ptask = set_task(WORK_Immed, 0, on_job_exit, pjob); if (ptask) { append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask); } return; } } /* After MOM replied (maybe faked) to Delete Files request */ if (preq->rq_reply.brp_code != 0) { /* an error occurred */ snprintf(log_buffer, LOG_BUF_SIZE, msg_obitnodel, pjob->ji_qs.ji_jobid, pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str); log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); if (LOGLEVEL >= 3) { snprintf(log_buffer, LOG_BUF_SIZE, "request to remove stage-in files failed on node '%s' for job %s%s", pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str, pjob->ji_qs.ji_jobid, (IsFaked == 1) ? 
"*" : ""); log_event( PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } if (preq->rq_reply.brp_choice == BATCH_REPLY_CHOICE_Text) { strncat(log_buffer, preq->rq_reply.brp_un.brp_txt.brp_str, LOG_BUF_SIZE - strlen(log_buffer) - 1); } svr_mailowner(pjob, MAIL_OTHER, MAIL_FORCE, log_buffer); } free_br(preq); preq = NULL; svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_EXITED); ptask->wt_type = WORK_Immed; /* NO BREAK, FALL INTO NEXT CASE */ case JOB_SUBSTATE_EXITED: if (LOGLEVEL >= 4) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "JOB_SUBSTATE_EXITED"); } /* tell mom to delete the job, send final track and purge it */ preq = alloc_br(PBS_BATCH_DeleteJob); if (preq != NULL) { strcpy(preq->rq_ind.rq_delete.rq_objname, pjob->ji_qs.ji_jobid); rc = issue_Drequest(handle, preq, release_req, 0); if (rc != 0) { snprintf(log_buffer, LOG_BUF_SIZE, "DeleteJob issue_Drequest failure, rc = %d", rc); log_event( PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } /* release_req will free preq and close connection */ } /* NOTE: we never check if MOM actually deleted the job */ rel_resc(pjob); /* free any resc assigned to the job */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) issue_track(pjob); /* see if restarted job failed */ if (pjob->ji_wattr[(int)JOB_ATR_checkpoint_restart_status].at_flags & ATR_VFLAG_SET) { char *pfailtype = NULL; char *pfailure = NULL; long *hold_val = 0; char errMsg[21]; strncpy(errMsg, pjob->ji_wattr[(int)JOB_ATR_checkpoint_restart_status].at_val.at_str, 20); pfailtype = strtok(errMsg," "); if (pfailtype != NULL) pfailure = strtok(NULL," "); if (pfailure != NULL) { if (memcmp(pfailure,"failure",7) == 0) { if (memcmp(pfailtype,"Temporary",9) == 0) { /* reque job */ svr_setjobstate(pjob, JOB_STATE_QUEUED, JOB_SUBSTATE_QUEUED); if (LOGLEVEL >= 4) { sprintf(log_buffer, "Requeueing job after checkpoint restart failure: 
%s", pjob->ji_wattr[(int)JOB_ATR_checkpoint_restart_status].at_val.at_str); log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } return; } /* * If we are deleting job after a failure then the first character * of the comment should no longer be uppercase */ else if (isupper(*pfailtype)) { /* put job on hold */ hold_val = &pjob->ji_wattr[(int)JOB_ATR_hold].at_val.at_long; *hold_val |= HOLD_s; pjob->ji_wattr[(int)JOB_ATR_hold].at_flags |= ATR_VFLAG_SET; pjob->ji_modified = 1; svr_setjobstate(pjob, JOB_STATE_HELD, JOB_SUBSTATE_HELD); if (LOGLEVEL >= 4) { sprintf(log_buffer, "Placing job on hold after checkpoint restart failure: %s", pjob->ji_wattr[(int)JOB_ATR_checkpoint_restart_status].at_val.at_str); log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } return; } } } } svr_setjobstate(pjob, JOB_STATE_COMPLETE, JOB_SUBSTATE_COMPLETE); if ((pque = pjob->ji_qhdr) && (pque != NULL)) { pque->qu_numcompleted++; } ptask->wt_type = WORK_Immed; /* NO BREAK, FALL INTO NEXT CASE */ case JOB_SUBSTATE_COMPLETE: if ((LOGLEVEL >= 4) && (ptask->wt_type == WORK_Immed)) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "JOB_SUBSTATE_COMPLETE"); } if ((pque = pjob->ji_qhdr) && (pque->qu_attr != NULL)) { KeepSeconds = attr_ifelse_long( &pque->qu_attr[(int)QE_ATR_KeepCompleted], &server.sv_attr[(int)SRV_ATR_KeepCompleted], 0); } if (ptask->wt_type == WORK_Immed) { /* first time in */ if (((server.sv_attr[(int)SRV_ATR_JobMustReport].at_flags & ATR_VFLAG_SET) != 0) && (server.sv_attr[(int)SRV_ATR_JobMustReport].at_val.at_long > 0)) { MustReport = TRUE; pjob->ji_wattr[(int)JOB_ATR_reported].at_val.at_long = 0; pjob->ji_wattr[(int)JOB_ATR_reported].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY; job_save(pjob,SAVEJOB_FULL); } } else if (((pjob->ji_wattr[(int)JOB_ATR_reported].at_flags & ATR_VFLAG_SET) != 0) && (pjob->ji_wattr[(int)JOB_ATR_reported].at_val.at_long == 0)) { MustReport = TRUE; } if (KeepSeconds <= 0) { if 
(MustReport) { /* * If job must report is set and keep_completed is 0, * default to JOBMUSTREPORTDEFAULTKEEP seconds */ KeepSeconds = JOBMUSTREPORTDEFAULTKEEP; } else { job_purge(pjob); break; } } if (ptask->wt_type == WORK_Immed) { /* is it first time in or server restart recovery */ if ((handle == -1) && (pjob->ji_wattr[(int)JOB_ATR_comp_time].at_flags & ATR_VFLAG_SET)) { /* * server restart - if we already have a completion_time then we * better be restarting. * use the comp_time to determine task invocation time */ ptask = set_task(WORK_Timed, pjob->ji_wattr[(int)JOB_ATR_comp_time].at_val.at_long + KeepSeconds, on_job_exit, pjob); } else { /* First time in - Set the job completion time */ pjob->ji_wattr[(int)JOB_ATR_comp_time].at_val.at_long = (long)time(NULL); pjob->ji_wattr[(int)JOB_ATR_comp_time].at_flags |= ATR_VFLAG_SET; ptask = set_task(WORK_Timed, time_now + KeepSeconds, on_job_exit, pjob); job_save(pjob, SAVEJOB_FULL); } if (ptask != NULL) { /* insure that work task will be removed if job goes away */ append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask); } } else { /* job has been around long enough */ /* * If the jobs must report attribute is set * then skip the job if it has not yet reported to the scheduler. 
*/ PurgeIt = TRUE; if (((pjob->ji_wattr[(int)JOB_ATR_reported].at_flags & ATR_VFLAG_SET) != 0) && (pjob->ji_wattr[(int)JOB_ATR_reported].at_val.at_long == 0)) { if (LOGLEVEL >= 7) { sprintf(log_buffer, "Bypassing job %s waiting for purge completed command", pjob->ji_qs.ji_jobid); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } ptask = set_task(WORK_Timed,time_now + JOBMUSTREPORTDEFAULTKEEP,on_job_exit,pjob); if (ptask != NULL) { /* insure that work task will be removed if job goes away */ append_link(&pjob->ji_svrtask,&ptask->wt_linkobj,ptask); } PurgeIt = FALSE; } if (PurgeIt) { job_purge(pjob); } } break; } /* END switch (pjob->ji_qs.ji_substate) */ return; } /* END on_job_exit() */ /* * on_job_rerun - Handle the clean up of jobs being rerun. This gets * messy if the job is being executed on another host. Then the * "standard" files must be copied to the server for safe keeping. * * The basic flow is very much like that of on_job_exit(). * The substate will already set to JOB_SUBSTATE_RERUN and the * JOB_SVFLG_HASRUN bit set in ji_svrflags. 
*/

/*
 * on_job_rerun() is a work-task callback that dispatches on
 * pjob->ji_qs.ji_substate and falls through the cases in order:
 *
 *   JOB_SUBSTATE_RERUN  - issue PBS_BATCH_Rerun so MOM sends the job's
 *                         files back to the server
 *   JOB_SUBSTATE_RERUN1 - issue the stage-out copy request, if any
 *   JOB_SUBSTATE_RERUN2 - issue PBS_BATCH_DelFiles for stage-in files, if any
 *   JOB_SUBSTATE_RERUN3 - issue PBS_BATCH_DeleteJob, release the job's
 *                         resources and set the state chosen by
 *                         svr_evaljobstate()
 *
 * ptask - when wt_type is WORK_Deferred_Reply, wt_parm1 is the batch request
 *         whose reply is being processed and the job is taken from its
 *         rq_extra; for any other type wt_parm1 is the job itself.
 *
 * Each stage has a "request" half (run when the task is not a deferred
 * reply) and a "post reply" half.  After a stage completes, ptask->wt_type
 * is overwritten with WORK_Immed so the next case runs its request half.
 * A failure to issue a request is handled by faking an error reply
 * (IsFaked = 1), which is flagged with a trailing "*" in the log message.
 */

void on_job_rerun(

  struct work_task *ptask)

  {
  int   handle;
  int   newstate;
  int   newsubst;
  int   rc = 0;
  job  *pjob;

  struct batch_request *preq;

  int   IsFaked;

  if (ptask->wt_type != WORK_Deferred_Reply)
    {
    preq = NULL;

    pjob = (job *)ptask->wt_parm1;
    }
  else
    {
    preq = (struct batch_request *)ptask->wt_parm1;

    pjob = (job *)preq->rq_extra;
    }

  /* NOTE(review): presumably mom_comm() returns a connection handle to the */
  /* job's MOM and re-schedules this function itself on failure - confirm   */

  if ((handle = mom_comm(pjob, on_job_rerun)) < 0)
    {
    return;
    }

  switch (pjob->ji_qs.ji_substate)
    {

    case JOB_SUBSTATE_RERUN:

      IsFaked = 0;

      if (ptask->wt_type != WORK_Deferred_Reply)
        {
        if (pjob->ji_qs.ji_un.ji_exect.ji_momaddr == pbs_server_addr)
          {
          /* files don't need to be moved, go to next step */

          svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_RERUN1);

          ptask = set_task(WORK_Immed, 0, on_job_rerun, pjob);

          if (ptask)
            {
            append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);
            }

          return;
          }

        /* here is where we have to save the files */
        /* ask mom to send them back to the server */
        /* mom deletes her copy if returned ok */

        preq = alloc_br(PBS_BATCH_Rerun);

        if (preq == NULL)
          {
          /* NOTE(review): no task is re-queued here, so the job stays in */
          /* JOB_SUBSTATE_RERUN until something else triggers this again  */

          return;
          }

        strcpy(preq->rq_ind.rq_rerun, pjob->ji_qs.ji_jobid);

        preq->rq_extra = (void *)pjob;

        if (issue_Drequest(handle, preq, on_job_rerun, 0) == 0)
          {
          /* request ok, will come back when its done */

          return;
          }

        /* cannot issue request to mom, set up as if mom returned error */

        IsFaked = 1;

        preq->rq_reply.brp_code = 1;

        /* we will "fall" into the post reply side */
        }

      /* We get here if MOM replied (may be faked above)  */
      /* to the rerun (return files) request issued above */

      if (preq->rq_reply.brp_code != 0)
        {
        /* error */

        if (LOGLEVEL >= 3)
          {
          snprintf(log_buffer, LOG_BUF_SIZE, "request to save output files failed on node '%s' for job %s%s",
            pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str,
            pjob->ji_qs.ji_jobid,
            (IsFaked == 1) ? "*" : "");

          log_event(
            PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
            PBS_EVENTCLASS_JOB,
            pjob->ji_qs.ji_jobid,
            log_buffer);
          }

        /* for now, just log it */

        snprintf(log_buffer, LOG_BUF_SIZE, msg_obitnocpy,
          pjob->ji_qs.ji_jobid,
          pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str);

        log_event(
          PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
          PBS_EVENTCLASS_JOB,
          pjob->ji_qs.ji_jobid,
          log_buffer);
        }

      svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_RERUN1);

      /* make the next case take its "request" half */

      ptask->wt_type = WORK_Immed;

      free_br(preq);

      preq = NULL;

      /* NO BREAK, FALL THROUGH TO NEXT CASE, including the request */

    case JOB_SUBSTATE_RERUN1:

      IsFaked = 0;

      if (ptask->wt_type != WORK_Deferred_Reply)
        {
        /* this is the very first call, have mom copy files */
        /* are there any stage-out files to process? */

        preq = cpy_stage(preq, pjob, JOB_ATR_stageout, STAGE_DIR_OUT);

        if (preq != NULL)
          {
          /* have files to copy */

          preq->rq_extra = (void *)pjob;

          if (issue_Drequest(handle, preq, on_job_rerun, 0) == 0)
            {
            return; /* come back when mom replies */
            }

          /* set up as if mom returned error */

          IsFaked = 1;

          preq->rq_reply.brp_code = PBSE_MOMREJECT;

          preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL;

          preq->rq_reply.brp_un.brp_txt.brp_txtlen = 0;

          /* we will "fall" into the post reply side */
          }
        else
          {
          /* no files to copy, any to delete? */

          svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_RERUN2);

          ptask = set_task(WORK_Immed, 0, on_job_rerun, pjob);

          if (ptask)
            {
            append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);
            }

          return;
          }
        }

      /* here we have a reply (maybe faked) from MOM about the copy */

      if (preq->rq_reply.brp_code != 0)
        {
        /* error from MOM */

        if (LOGLEVEL >= 3)
          {
          snprintf(log_buffer, LOG_BUF_SIZE, "request to save stageout files failed on node '%s' for job %s%s",
            pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str,
            pjob->ji_qs.ji_jobid,
            (IsFaked == TRUE) ? "*" : "");

          log_event(
            PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
            PBS_EVENTCLASS_JOB,
            pjob->ji_qs.ji_jobid,
            log_buffer);
          }

        snprintf(log_buffer, LOG_BUF_SIZE, msg_obitnocpy,
          pjob->ji_qs.ji_jobid,
          pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str);

        log_event(
          PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
          PBS_EVENTCLASS_JOB,
          pjob->ji_qs.ji_jobid,
          log_buffer);

        /* the text MOM returned is appended after logging, so it only */
        /* appears in the mail sent to the job owner                   */

        if (preq->rq_reply.brp_choice == BATCH_REPLY_CHOICE_Text)
          {
          strncat(
            log_buffer,
            preq->rq_reply.brp_un.brp_txt.brp_str,
            LOG_BUF_SIZE - strlen(log_buffer) - 1);
          }

        svr_mailowner(pjob, MAIL_OTHER, MAIL_FORCE, log_buffer);
        }

      /*
       * files (generally) copied ok, move on to the next phase by
       * "faking" the immediate work task.
       */

      free_br(preq);

      preq = NULL;

      svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_RERUN2);

      ptask->wt_type = WORK_Immed;

      /* NO BREAK - FALL INTO THE NEXT CASE */

    case JOB_SUBSTATE_RERUN2:

      IsFaked = 0;

      if (ptask->wt_type != WORK_Deferred_Reply)
        {
        /* here is where we delete any stage-in files */

        preq = cpy_stage(preq, pjob, JOB_ATR_stagein, 0);

        if (preq != NULL)
          {
          /* reuse the request built by cpy_stage() as a delete-files request */

          preq->rq_type = PBS_BATCH_DelFiles;

          preq->rq_extra = (void *)pjob;

          if (issue_Drequest(handle, preq, on_job_rerun, 0) == 0)
            {
            return;
            }

          /* error on sending request */

          IsFaked = 1;

          preq->rq_reply.brp_code = 1;

          /* we will "fall" into the post reply side */
          }
        else
          {
          svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_RERUN3);

          ptask = set_task(WORK_Immed, 0, on_job_rerun, pjob);

          if (ptask)
            {
            append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);
            }

          return;
          }
        }

      /* post reply side for delete file request to MOM */

      if (preq->rq_reply.brp_code != 0)
        {
        /* error */

        if (LOGLEVEL >= 3)
          {
          snprintf(log_buffer, LOG_BUF_SIZE, "request to delete stagein files failed on node '%s' for job %s%s",
            pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str,
            pjob->ji_qs.ji_jobid,
            (IsFaked == TRUE) ? "*" : "");

          log_event(
            PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
            PBS_EVENTCLASS_JOB,
            pjob->ji_qs.ji_jobid,
            log_buffer);
          }

        /* for now, just log it */

        snprintf(log_buffer, LOG_BUF_SIZE, msg_obitnocpy,
          pjob->ji_qs.ji_jobid,
          pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str);

        log_event(
          PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
          PBS_EVENTCLASS_JOB,
          pjob->ji_qs.ji_jobid,
          log_buffer);
        }

      free_br(preq);

      preq = NULL;

      svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_RERUN3);

      ptask->wt_type = WORK_Immed;

      /* NO BREAK, FALL THROUGH TO NEXT CASE */

    case JOB_SUBSTATE_RERUN3:

      /* need to have MOM delete her copy of the job */

      preq = alloc_br(PBS_BATCH_DeleteJob);

      if (preq != NULL)
        {
        strcpy(preq->rq_ind.rq_delete.rq_objname, pjob->ji_qs.ji_jobid);

        preq->rq_extra = (void *)pjob;

        rc = issue_Drequest(handle, preq, release_req, 0);

        if (rc != 0)
          {
          /* NOTE(review): preq is only logged about here, not freed;   */
          /* the earlier stages call free_br() after a failed issue, so */
          /* this path may leak the request - confirm against           */
          /* issue_Drequest()                                           */

          snprintf(log_buffer, LOG_BUF_SIZE, "DeleteJob issue_Drequest failure, rc = %d",
            rc);

          log_event(
            PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
            PBS_EVENTCLASS_JOB,
            pjob->ji_qs.ji_jobid,
            log_buffer);
          }

        /* release_req will free preq and close connection */
        }

      /* the job is re-queued below whether or not the delete was issued */

      rel_resc(pjob); /* free resc assigned to job */

      /* Now re-queue the job */

      if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HOTSTART) == 0)
        {
        /* in case of server shutdown, don't clear exec_host */
        /* or session_id will use it on hotstart when next comes up */

        job_attr_def[(int)JOB_ATR_exec_host].at_free(
          &pjob->ji_wattr[(int)JOB_ATR_exec_host]);

        job_attr_def[(int)JOB_ATR_session_id].at_free(
          &pjob->ji_wattr[(int)JOB_ATR_session_id]);

        job_attr_def[(int)JOB_ATR_exec_gpus].at_free(
          &pjob->ji_wattr[JOB_ATR_exec_gpus]);
        }

      pjob->ji_modified = 1; /* force full job save */

      pjob->ji_momhandle = -1;

      pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_StagedIn;

      svr_evaljobstate(pjob, &newstate, &newsubst, 0);

      svr_setjobstate(pjob, newstate, newsubst);

      break;
    }  /* END switch (pjob->ji_qs.ji_substate) */

  return;
  }  /* END on_job_rerun() */

/*
 * wait_for_send - recall req_jobobit after delay when race
(condition)
 * goes to an Obit from MOM rather than to the SIGCHLD of the send_job()
 * child that sent the job to MOM.
 */

static void wait_for_send(

  struct work_task *ptask)  /* I - task whose wt_parm1 is the obit request */

  {
  struct batch_request *obit = (struct batch_request *)ptask->wt_parm1;

  req_jobobit(obit);
  }  /* END wait_for_send() */




/*
 * setrerun - flag a job for rerun when its rerunable attribute is non-zero.
 *
 * Returns 0 after setting the substate to JOB_SUBSTATE_RERUN.  Otherwise
 * the owner is mailed msg_init_abt, the substate is left alone and 1 is
 * returned.
 */

static int setrerun(

  job *pjob)

  {
  if (pjob->ji_wattr[(int)JOB_ATR_rerunable].at_val.at_long == 0)
    {
    /* FAILURE - not rerunnable */

    svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_init_abt);

    return(1);
    }

  /* SUCCESS - job is rerunnable */

  pjob->ji_qs.ji_substate = JOB_SUBSTATE_RERUN;

  return(0);
  }  /* END setrerun() */




/**
 * Appends "\n<name>[.<resource>]=<value>" to acctbuf for every entry of
 * the attribute list except those named ATTR_session.
 *
 * acctbuf must already hold a NUL-terminated string; it is treated as
 * RESC_USED_BUF bytes long and an entry that does not fit is left out.
 *
 * Returns TRUE when the list held at least one non-session entry (even
 * one that did not fit), FALSE otherwise.
 */

int get_used(

  svrattrl *patlist,  /* I */
  char     *acctbuf)  /* O */

  {
  int       found = FALSE;
  int       room;
  int       wanted;
  char     *tail;
  svrattrl *entry;

  room = RESC_USED_BUF - strlen(acctbuf);

  /* everything is appended at the current end of the string */

  tail = acctbuf + strlen(acctbuf);

  for (entry = patlist;
       entry != NULL;
       entry = (svrattrl *)GET_NEXT(entry->al_link))
    {
    if (strcmp(entry->al_name, ATTR_session) == 0)
      {
      continue;
      }

    found = TRUE;

    /* space reserved for this entry (slightly more than is written) */

    wanted = strlen(entry->al_name) + strlen(entry->al_value) + 3;

    if (entry->al_resc)
      {
      wanted += strlen(entry->al_resc) + 3;
      }

    if (wanted >= room)
      {
      /* does not fit, leave it out but keep looking at later entries */

      continue;
      }

    if (entry->al_resc)
      {
      tail += snprintf(tail, (size_t)room, "\n%s.%s=%s",
                entry->al_name,
                entry->al_resc,
                entry->al_value);
      }
    else
      {
      tail += snprintf(tail, (size_t)room, "\n%s=%s",
                entry->al_name,
                entry->al_value);
      }

    room -= wanted;
    }  /* END for (entry) */

  return(found);
  }  /* END get_used() */




/**
 * Encodes the used resource information (cput, mem, walltime, etc.)
 * about the given job.
*/

#ifdef USESAVEDRESOURCES

/*
 * encode_job_used - encode each resource in the job's resources_used
 * attribute (JOB_ATR_resc_used) onto the list headed by phead, using the
 * resource's own rs_encode routine in ATR_ENCODE_CLIENT mode.
 *
 * Nothing is encoded when the attribute is not set; encoding stops at the
 * first resource whose encoder returns a negative value.
 */

void encode_job_used(

  job        *pjob,   /* I */
  tlist_head *phead)  /* O */

  {
  attribute     *at;
  attribute_def *ad;
  resource      *rs;

  at = &pjob->ji_wattr[JOB_ATR_resc_used];
  ad = &job_attr_def[JOB_ATR_resc_used];

  if ((at->at_flags & ATR_VFLAG_SET) == 0)
    {
    return;
    }

  for (rs = (resource *)GET_NEXT(at->at_val.at_list);
       rs != NULL;
       rs = (resource *)GET_NEXT(rs->rs_link))
    {
    resource_def *rd = rs->rs_defin;
    attribute     val;
    int           rc;

    val = rs->rs_value; /* copy resource attribute */

    rc = rd->rs_encode(
           &val,
           phead,
           ad->at_name,
           rd->rs_name,
           ATR_ENCODE_CLIENT);

    if (rc < 0)
      break;
    }  /* END for (rs) */

  return;
  }  /* END encode_job_used() */

#endif    /* USESAVEDRESOURCES */




/*
 * req_jobobit - process the Job Obituary Notice (request) from MOM.
 * This notice is sent from MOM when a job terminates.
 *
 * preq - the obit request.  It is rejected when the job is unknown, the
 *        obit comes from a host other than the job's MOM, or the job is
 *        not in JOB_STATE_RUNNING.  While the job is still in
 *        JOB_SUBSTATE_PRERUN the request is retried one second later
 *        through wait_for_send().  Otherwise it is acknowledged with
 *        reply_ack() once the exit status and usage have been recorded.
 *
 * After the reply the job is either handed to on_job_exit() (terminating)
 * or on_job_rerun() (substate RERUN/RERUN1) through an immediate work
 * task, or - for jobs holding a non-migratable checkpoint file - simply
 * moved to the state picked by svr_evaljobstate().
 */

void req_jobobit(

  struct batch_request *preq)  /* I */

  {
#ifdef USESAVEDRESOURCES
  char    id[] = "req_jobobit";
#endif    /* USESAVEDRESOURCES */
  int     alreadymailed = 0;
  int     bad;
  char    acctbuf[RESC_USED_BUF];
  int     accttail;
  int     exitstatus;
#ifdef USESAVEDRESOURCES
  int     have_resc_used = FALSE;
#endif
  char    mailbuf[RESC_USED_BUF];
  int     newstate;
  int     newsubst;
  char   *pc;
  job    *pjob;
  char    jobid[PBS_MAXSVRJOBID+1];

  struct work_task *ptask;
  svrattrl         *patlist;
  unsigned int      dummy;

  /* This will be needed later for logging after preq is freed. */

  strcpy(jobid, preq->rq_ind.rq_jobobit.rq_jid);

  pjob = find_job(preq->rq_ind.rq_jobobit.rq_jid);

  if ((pjob == NULL) ||
      (pjob->ji_qs.ji_un.ji_exect.ji_momaddr != get_hostaddr(
         parse_servername(preq->rq_host, &dummy))))
    {
    /* not found or from wrong node */

    /* the message is formatted before req_reject() because it reads preq */

    if ((server_init_type == RECOV_COLD) ||
        (server_init_type == RECOV_CREATE))
      {
      /* tell MOM the job was blown away */

      sprintf(log_buffer, msg_obitnojob,
        preq->rq_host,
        PBSE_CLEANEDOUT);

      req_reject(PBSE_CLEANEDOUT, 0, preq, NULL, NULL);
      }
    else
      {
      sprintf(log_buffer, msg_obitnojob,
        preq->rq_host,
        PBSE_UNKJOBID);

      req_reject(PBSE_UNKJOBID, 0, preq, NULL, NULL);
      }

    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      jobid,
      log_buffer);

    return;
    }  /* END if ((pjob == NULL) || ...) */

  if (pjob->ji_qs.ji_state != JOB_STATE_RUNNING)
    {
    if (pjob->ji_qs.ji_state == JOB_STATE_EXITING)
      {
      /* already in exit processing, ignore this request */

      bad = PBSE_ALRDYEXIT;
      }
    else
      {
      /* not running and not exiting - bad news (possible data staging issue) */

      /* NOTE:  was logged w/msg_obitnojob */

      sprintf(log_buffer, "obit received for job %s from host %s with bad state (state: %s)",
        jobid,
        preq->rq_host,
        PJobState[pjob->ji_qs.ji_state]);

      log_event(
        PBSEVENT_ERROR | PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        jobid,
        log_buffer);

      bad = PBSE_BADSTATE;
      }

    req_reject(
      bad,
      0,
      preq,
      NULL,
      NULL);

    return;
    }  /* END if (pjob->ji_qs.ji_state != JOB_STATE_RUNNING) */

  if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_PRERUN)
    {
    /* have hit a race condition, the send_job child's SIGCHLD */
    /* has not yet been reaped.  Must wait for it. */

    ptask = set_task(WORK_Timed, time_now + 1, wait_for_send, (void *)preq);

    if (ptask == NULL)
      {
      req_reject(PBSE_SYSTEM, 0, preq, NULL, NULL);
      }

    return;
    }

  /*
   * save exit state, update the resources used, and reply to MOM.
   *
   * Note, must make the log/mail message about resources used BEFORE
   * replying to MOM.  The reply will free the attribute list so it
   * cannot be used after the call to reply_ack();
   */

  exitstatus = preq->rq_ind.rq_jobobit.rq_status;

  if (LOGLEVEL >= 6)
    {
    sprintf(log_buffer, "PBS_BATCH_JobObit exit status: %d",
      exitstatus);

    /* second argument is the object class, as in every other call here */

    log_event(
      PBSEVENT_DEBUG,
      PBS_EVENTCLASS_JOB,
      jobid,
      log_buffer);
    }

  pjob->ji_qs.ji_un.ji_exect.ji_exitstat = exitstatus;

  pjob->ji_wattr[(int)JOB_ATR_exitstat].at_val.at_long = exitstatus;

  pjob->ji_wattr[(int)JOB_ATR_exitstat].at_flags |= ATR_VFLAG_SET;

  patlist = (svrattrl *)GET_NEXT(preq->rq_ind.rq_jobobit.rq_attr);

  /* Encode the final resources_used into the job (useful for keep_completed) */

  if (LOGLEVEL >= 2)
    {
    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      jobid,
      "obit received - updating final job usage info");
    }

  modify_job_attr(
    pjob,
    patlist,
    ATR_DFLAG_MGWR | ATR_DFLAG_SvWR,
    &bad);

  sprintf(acctbuf, msg_job_end_stat,
    pjob->ji_qs.ji_un.ji_exect.ji_exitstat);

  if (exitstatus < 10000)
    {
    strcpy(mailbuf, acctbuf);
    }
  else
    {
    sprintf(mailbuf, msg_job_end_sig,
      exitstatus - 10000);
    }

  /* everything get_used() appends lands after this offset in acctbuf */

  accttail = strlen(acctbuf);

#ifdef USESAVEDRESOURCES

  have_resc_used = get_used(patlist, acctbuf);

  /* if we don't have resources from the obit, use what the job already had */

  if (!have_resc_used)
    {
    struct batch_request *tmppreq;

    if (LOGLEVEL >= 7)
      {
      log_event(
        PBSEVENT_ERROR | PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        jobid,
        "No resources used found");
      }

    tmppreq = alloc_br(PBS_BATCH_JobObit);

    if (tmppreq == NULL)
      {
      /* FAILURE */

      /* NOTE(review): preq is neither acknowledged nor rejected on this */
      /* path and the job stays in JOB_STATE_RUNNING - confirm whether a */
      /* req_reject() is wanted here                                     */

      sprintf(log_buffer, "cannot allocate memory for temp obit message");

      LOG_EVENT(
        PBSEVENT_DEBUG,
        PBS_EVENTCLASS_REQUEST,
        id,
        log_buffer);

      return;
      }

    /* the temporary request only serves as a holder for the encoded list */

    CLEAR_HEAD(tmppreq->rq_ind.rq_jobobit.rq_attr);

    encode_job_used(pjob, &tmppreq->rq_ind.rq_jobobit.rq_attr);

    patlist = (svrattrl *)GET_NEXT(tmppreq->rq_ind.rq_jobobit.rq_attr);

    have_resc_used = get_used(patlist, acctbuf);

    free_br(tmppreq);
    }

#else     /* !USESAVEDRESOURCES */

  /*
   * Without saved resources the usage sent with the obit is the only
   * source; append it so the mail and accounting text built from
   * acctbuf + accttail are not empty.
   */

  (void)get_used(patlist, acctbuf);

#endif    /* USESAVEDRESOURCES */

  strncat(mailbuf, (acctbuf + accttail), RESC_USED_BUF - strlen(mailbuf) - 1);

  mailbuf[RESC_USED_BUF - 1] = '\0';

  /* make sure ji_momhandle is -1 to force new connection to mom */

  pjob->ji_momhandle = -1;

  reply_ack(preq);

  /* clear suspended flag if it was set */

  pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_Suspend;

  /* Was there a special exit status from MOM ? */

  if (exitstatus < 0)
    {
    /* negative exit status is special */

    switch (exitstatus)
      {

      case JOB_EXEC_OVERLIMIT:

        /* the job exceeded some resource limit such as walltime, mem, pmem, cput, etc */

        svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_momjoboverlimit);

        alreadymailed = 1;

        break;

      case JOB_EXEC_FAIL1:

      default:

        /* MOM rejected job with fatal error, abort job */

        svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_momnoexec1);

        alreadymailed = 1;

        break;

      case JOB_EXEC_FAIL2:

        /* MOM reject job after files setup, abort job */

        svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_momnoexec2);

        alreadymailed = 1;

        break;

      case JOB_EXEC_INITABT:

        /* MOM aborted job on her initialization */

        alreadymailed = setrerun(pjob);

        pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HASRUN;

        break;

      case JOB_EXEC_RETRY:

        /* MOM rejected job, but said retry it */

        if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HASRUN)
          {
          /* has run before, treat this as another rerun */

          alreadymailed = setrerun(pjob);
          }
        else
          {
          /* have mom remove job files, not saving them, and requeue job */

          pjob->ji_qs.ji_substate = JOB_SUBSTATE_RERUN1;

          /* transient failure detected */
          }

        break;

      case JOB_EXEC_BADRESRT:

        /* MOM could not restart job, setup for rerun */

        alreadymailed = setrerun(pjob);

        pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_CHECKPOINT_FILE;

        break;

      case JOB_EXEC_INITRST:

        /* MOM abort job on init, job has checkpoint file */
        /* Requeue it, and thats all folks. */

        if (LOGLEVEL >= 1)
          {
          log_event(
            PBSEVENT_JOB_USAGE,
            PBS_EVENTCLASS_JOB,
            pjob->ji_qs.ji_jobid,
            "received JOB_EXEC_INITRST, setting job CHECKPOINT_FLAG flag");
          }

        rel_resc(pjob);

        pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HASRUN | JOB_SVFLG_CHECKPOINT_FILE;

        svr_evaljobstate(pjob, &newstate, &newsubst, 1);

        svr_setjobstate(pjob, newstate, newsubst);

        /* NOTE(review): ji_momhandle was set to -1 above, so this passes */
        /* -1 to svr_disconnect() - confirm that is intended              */

        svr_disconnect(pjob->ji_momhandle);

        return;

        /*NOTREACHED*/

        break;

      case JOB_EXEC_INITRMG:

        /* MOM abort job on init, job has migratable checkpoint */
        /* Must recover output and checkpoint file, do eoj */

        alreadymailed = setrerun(pjob);

        pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HASRUN | JOB_SVFLG_CHECKPOINT_MIGRATEABLE;

        break;
      }  /* END switch (exitstatus) */
    }    /* END if (exitstatus < 0) */

  if (LOGLEVEL >= 2)
    {
    sprintf(log_buffer, "job exit status %d handled",
      exitstatus);

    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      jobid,
      log_buffer);
    }

  /* What do we now do with the job... */

  if ((pjob->ji_qs.ji_substate != JOB_SUBSTATE_RERUN) &&
      (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RERUN1))
    {
    /* If job is terminating (not rerun), */
    /*  update state and send mail */

    svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_EXITING);

    if (alreadymailed == 0)
      {
      svr_mailowner(pjob, MAIL_END, MAIL_NORMAL, mailbuf);
      }

    /* replace new-lines with blanks for log message */

    for (pc = acctbuf;*pc;++pc)
      {
      if (*pc == '\n')
        *pc = ' ';
      }

    /* record accounting and maybe in log */

    account_jobend(pjob, acctbuf);

    if (server.sv_attr[(int)SRV_ATR_log_events].at_val.at_long & PBSEVENT_JOB_USAGE)
      {
      /* log events set to record usage */

      log_event(
        PBSEVENT_JOB_USAGE,
        PBS_EVENTCLASS_JOB,
        pjob->ji_qs.ji_jobid,
        acctbuf);
      }
    else
      {
      /* no usage in log, truncate message */

      *(acctbuf + accttail) = '\0';

      log_event(
        PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        pjob->ji_qs.ji_jobid,
        acctbuf);
      }

    ptask = set_task(WORK_Immed, 0, on_job_exit, (void *)pjob);

    /* decrease array running job count */

    if ((pjob->ji_arraystruct != NULL) &&
        (pjob->ji_is_array_template == FALSE))
      {
      update_array_values(pjob->ji_arraystruct,
        pjob,JOB_STATE_RUNNING,aeTerminate);
      }

    if (ptask != NULL)
      {
      /* link the task to the job, as is done for every job task here */

      append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);

      if (LOGLEVEL >= 4)
        {
        log_event(
          PBSEVENT_ERROR | PBSEVENT_JOB,
          PBS_EVENTCLASS_JOB,
          jobid,
          "on_job_exit task assigned to job");
        }
      }

    /* remove checkpoint restart file if there is one */

    if (pjob->ji_wattr[(int)JOB_ATR_restart_name].at_flags & ATR_VFLAG_SET)
      {
      cleanup_restart_file(pjob);
      }

    /* "on_job_exit()" will be dispatched out of the main loop */
    }
  else
    {
    /* Rerunning job, if not checkpointed, clear "resources_used" and requeue job */

    if ((pjob->ji_qs.ji_svrflags &
         (JOB_SVFLG_CHECKPOINT_FILE | JOB_SVFLG_CHECKPOINT_MIGRATEABLE)) == 0)
      {
      job_attr_def[(int)JOB_ATR_resc_used].at_free(&pjob->ji_wattr[(int)JOB_ATR_resc_used]);
      }
    else if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHECKPOINT_FILE)
      {
      /* non-migratable checkpoint (cray), leave there */
      /* and just requeue the job */

      rel_resc(pjob);

      pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HASRUN;

      svr_evaljobstate(pjob, &newstate, &newsubst, 1);

      svr_setjobstate(pjob, newstate, newsubst);

      /* NOTE(review): ji_momhandle is already -1 at this point - confirm */

      svr_disconnect(pjob->ji_momhandle);

      return;
      }

    /* keep the substate (RERUN or RERUN1) and only move to EXITING */

    svr_setjobstate(
      pjob,
      JOB_STATE_EXITING,
      pjob->ji_qs.ji_substate);

    ptask = set_task(WORK_Immed, 0, on_job_rerun, (void *)pjob);

    if (ptask != NULL)
      {
      append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);

      if (LOGLEVEL >= 4)
        {
        log_event(
          PBSEVENT_ERROR | PBSEVENT_JOB,
          PBS_EVENTCLASS_JOB,
          jobid,
          "on_job_rerun task assigned to job");
        }
      }

#ifdef RERUNUSAGE
    /* replace new-lines with blanks for accounting record */

    for (pc = acctbuf;*pc;++pc)
      {
      if (*pc == '\n')
        *pc = ' ';
      }

    /* record accounting */

    account_jobend(pjob, acctbuf);

#endif    /* RERUNUSAGE */

    /* remove checkpoint restart file if there is one */

    if (pjob->ji_wattr[(int)JOB_ATR_restart_name].at_flags & ATR_VFLAG_SET)
      {
      cleanup_restart_file(pjob);
      }

    /* "on_job_rerun()" will be dispatched out of the main loop */
    }  /* END else */

  if (LOGLEVEL >= 4)
    {
    log_event(
      PBSEVENT_ERROR | PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      jobid,
      "req_jobobit completed");
    }

  return;
  }  /* END req_jobobit() */

/* END req_jobobit.c */