#include "license_pbs.h" /* See here for the software license */ #include /* the master config generated by configure */ #include "catch_child.h" #include #include #include #include #include #include #include #include #include "dis.h" #include "libpbs.h" #include "portability.h" #include #include #include #include "list_link.h" #include "server_limits.h" #include "attribute.h" #include "resource.h" #include "pbs_job.h" #include "net_cache.h" #include "log.h" #include "../lib/Liblog/pbs_log.h" #include "../lib/Liblog/log_event.h" #include "credential.h" #include "batch_request.h" #include "net_connect.h" #include "svrfunc.h" #include "mom_mach.h" #include "mom_func.h" #include "mom_comm.h" /* im_compose */ #include "pbs_error.h" #include "pbs_proto.h" #include "../lib/Libifl/lib_ifl.h" /* pbs_disconnect_socket */ #include "../server/svr_connect.h" /* svr_disconnect_sock */ #include "mom_job_func.h" /* mom_job_purge */ #include "mom_job_cleanup.h" #ifdef ENABLE_CPA #include "pbs_cpa.h" #endif #ifdef PENABLE_LINUX26_CPUSETS #include "pbs_cpuset.h" #endif #define DIS_REPLY_READ_RETRY 10 /* External Functions */ /* External Globals */ extern char *path_epilog; extern char *path_epiloguser; extern char *path_epilogp; extern char *path_epiloguserp; extern char *path_jobs; extern unsigned int default_server_port; extern tlist_head svr_alljobs; extern tlist_head mom_polljobs; extern int exiting_tasks; extern char *msg_daemonname; extern int termin_child; extern char *path_aux; extern int multi_mom; extern int pbs_rm_port; extern int is_login_node; extern int LOGLEVEL; extern char *PJobSubState[]; extern int PBSNodeCheckProlog; extern int PBSNodeCheckEpilog; extern char *PMOMCommand[]; /* external prototypes */ int release_job_reservation(job *pjob); u_long resc_used(job *, const char *, u_long(*f) (resource *)); void preobit_preparation (job *); void *obit_reply (void *); extern u_long addclient (const char *); extern void encode_used (job *, int, tlist_head *); extern void encode_flagged_attrs (job *, int, tlist_head *); extern void job_nodes (job *); extern int task_recov (job *); extern void mom_server_all_update_stat(void); extern void check_state(int); extern int mark_for_resend (job *); extern void checkpoint_partial(job *pjob); extern void mom_checkpoint_recover(job *pjob); extern void clear_down_mom_servers(); extern int is_mom_server_down(pbs_net_t); extern void set_mom_server_down(pbs_net_t); extern int no_mom_servers_down(); extern char *get_local_script_path(job *pjob, char *base); u_long gettime(resource *); u_long getsize(resource *); /* END external prototypes */ void exit_mom_job(job *pjob, int mom_radix); /* * catch_child() - the signal handler for SIGCHLD. * * To keep the signal handler simple for * SIGCHLD - just indicate there was one. */ void catch_child( int sig) { termin_child = 1; return; } /* END catch_child() */ hnodent *get_node( job *pjob, tm_node_id nodeid) { int i; vnodent *vp = pjob->ji_vnods; for (i = 0;i < pjob->ji_numvnod;i++, vp++) { if (vp->vn_node == nodeid) { return(vp->vn_host); } } return(NULL); } /* END get_node() */ int send_task_obit_response( job *pjob, hnodent *pnode, char *cookie, obitent *pobit, int exitstat) { int i; int ret; int stream; struct tcp_chan *chan = NULL; for (i = 0; i < 5; i++) { ret = -1; stream = tcp_connect_sockaddr((struct sockaddr *)&pnode->sock_addr,sizeof(pnode->sock_addr)); if (IS_VALID_STREAM(stream)) { if ((chan = DIS_tcp_setup(stream)) != NULL) { if ((ret = im_compose( chan, pjob->ji_qs.ji_jobid, cookie, IM_ALL_OKAY, pobit->oe_info.fe_event, pobit->oe_info.fe_taskid)) != DIS_SUCCESS) { } else if ((ret = diswsi(chan, exitstat)) != DIS_SUCCESS) { } else { ret = DIS_tcp_wflush(chan); } DIS_tcp_close(chan); } else { close(stream); } } else if (stream == PERMANENT_SOCKET_FAIL) break;; if (ret == DIS_SUCCESS) break; usleep(10); } return(ret); } /* END send_task_obit_response() */ /** * For all jobs in MOM * ignore job if job's pbs_server is down * for all tasks in job * ignore task if task state is not exiting * if task is master, send kill to all sisters * process TM client obits * if I am sister, do sister stuff and continue * kill_job * execute preobit_preparation() * set job substate to JOB_SUBSTATE_PREOBIT * * @see main_loop() - parent * @see scan_for_terminated() * @see post_epilog() * @see preobit_preparation() - registered to handle response to preobit * @see send_sisters() - child * @see kill_job() - child * * Obit Overview: * - main_loop() * - scan_for_terminated() * uses waitpid() to detect completed children * First Pass: catches SIGCHLD of job executable to identify when job * tasks terminate, issues kill_task(), and marks job task ti_status * as TI_STATE_EXITED which is detected and processed inside of * scan_for_exiting() * Second Pass: catches SIGCHLD for job epilog child and exec's * job's ji_mompost (post_epilog) * * - scan_for_exiting() * called after scan_for_terminated and looks at jobs to identify which * have exiting tasks. Sends kill to all sisters via send_sisters(), * sets job substate to JOB_SUBSTATE_EXITING, issues kill_job, and * then sets job substate to JOB_SUBSTATE_PREOBIT. This routine then * creates the preobit message and sends it to pbs_server. * calls preobit_preparation() * * - preobit_preparation() -- see header for function * * - post_epilog() * sends obit to pbs_server and registers obit_reply() as connection handler * * - obit_reply() * sets job substate to EXITED * END OF JOB LIFECYCLE * * when job completes and process id goes away scan_for_terminated() * * OVERALL FLOW: * - scan_for_terminating() - PHASE I * - KILL TASK * - scan_for_exiting() * - KILL SISTERS * - SEND PREOBIT TO PBS_SERVER * - preobit_preparation() - FORK AND EXEC EPILOG * - scan_for_terminating() - PHASE II * - post_epilog() * - SEND OBIT TO PBS_SERVER * - obit_reply() * * STATE TRANSITIONS: * JOB_SUBSTATE_RUNNING (42) * JOB_SUBSTATE_EXITING (50) - scan_for_exiting() * JOB_SUBSTATE_PREOBIT (57) - scan_for_exiting() * JOB_SUBSTATE_OBIT (58) - preobit_preparation() */ void scan_for_exiting(void) { int found_one = 0; job *nextjob; job *pjob; task *ptask; obitent *pobit; #ifndef NUMA_SUPPORT char *cookie; #endif task *task_find(job *, tm_task_id); unsigned int momport = 0; static int ForceObit = -1; /* boolean - if TRUE, ObitsAllowed will be enforced */ static int ObitsAllowed = 1; int mom_radix = 0; int NumSisters; /* ** Look through the jobs. Each one has it's tasks examined ** and if the job is EXITING, it meets it's fate depending ** on whether this is the Mother Superior or not. */ if (LOGLEVEL >= 3) { log_record( PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, __func__, "searching for exiting jobs"); } if (ForceObit == -1) { /* NOTE: Allow sites to locally specify obit groupings larger than 1. */ /* Remove after 6/1/2008 if no further obit issues are encountered */ char *ptr; if ((ptr = getenv("TORQUEFORCESEND")) != NULL) { int tmpI; tmpI = (int)strtol(ptr, NULL, 10); if (tmpI > 0) ObitsAllowed = tmpI; ForceObit = 1; } else { ForceObit = 1; } } /* END if (ForceObit == -1) */ clear_down_mom_servers(); /* do not change this from the nextjob formal. In some cases pjob has * been freed by the time that the loop comes around */ for (pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = nextjob) { nextjob = (job *)GET_NEXT(pjob->ji_alljobs); /* * Bypass job if it is for a server that we know is down */ if (is_mom_server_down(pjob->ji_qs.ji_un.ji_momt.ji_svraddr)) { if (LOGLEVEL >= 3) { snprintf(log_buffer, sizeof(log_buffer), "not checking job %s - server is down", pjob->ji_qs.ji_jobid); log_record( PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, __func__, log_buffer); } continue; } /* ** If a checkpoint with aborts is active, ** skip it. We don't want to report any obits ** until we know that the whole thing worked. */ if (pjob->ji_flags & MOM_CHECKPOINT_ACTIVE) { continue; } /* ** If the job has had an error doing a checkpoint with ** abort, the MOM_CHECKPOINT_POST flag will be on. */ if (pjob->ji_flags & MOM_CHECKPOINT_POST) { checkpoint_partial(pjob); continue; } if (!(pjob->ji_wattr[JOB_ATR_Cookie].at_flags & ATR_VFLAG_SET)) { continue; } #ifndef NUMA_SUPPORT cookie = pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str; #endif /* ** Check each EXITED task. They transition to DEAD here. */ if (LOGLEVEL >= 6) { log_record( PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, __func__, "working on a job"); } for (ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { if (ptask->ti_qs.ti_status != TI_STATE_EXITED) continue; /* ** Check if it is the top shell. */ if (ptask->ti_qs.ti_parenttask == TM_NULL_TASK) { /* master task is in state TI_STATE_EXITED */ if ((pjob->ji_qs.ji_un.ji_momt.ji_exitstat != JOB_EXEC_OVERLIMIT_MEM) && (pjob->ji_qs.ji_un.ji_momt.ji_exitstat != JOB_EXEC_OVERLIMIT_WT) && (pjob->ji_qs.ji_un.ji_momt.ji_exitstat != JOB_EXEC_OVERLIMIT_CPUT)) { /* do not over-write JOB_EXEC_OVERLIMIT */ pjob->ji_qs.ji_un.ji_momt.ji_exitstat = ptask->ti_qs.ti_exitstat; } log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "job was terminated"); mom_radix = pjob->ji_wattr[JOB_ATR_job_radix].at_val.at_long; if (mom_radix < 2) { NumSisters = send_sisters(pjob, IM_KILL_JOB, FALSE); if (NumSisters > 0) { pjob->ji_qs.ji_substate = JOB_SUBSTATE_MOM_WAIT; pjob->ji_kill_started = time(NULL); } } else { NumSisters = 1; /* We use this for later */ if (pjob->ji_sampletim == 0) { pjob->ji_sampletim = time(NULL); if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_INTERMEDIATE_MOM) == 0) { /* only call send_sisters with radix == TRUE if this * is mother superior intermediate moms already called * this in im_request IM_KILL_JOB_RADIX */ NumSisters = send_sisters(pjob, IM_KILL_JOB_RADIX, TRUE); pjob->ji_outstanding = NumSisters; } } else { time_t time_now; time_now = time(NULL); if (time_now - pjob->ji_sampletim > 5) { if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_INTERMEDIATE_MOM) == 0) { /* only call send_sisters with radix == TRUE if this is * mother superior intermediate moms already called this * in im_request IM_KILL_JOB_RADIX */ NumSisters = send_sisters(pjob, IM_KILL_JOB_RADIX, TRUE); pjob->ji_outstanding = NumSisters; } } } } if (NumSisters == 0) { /* no sisters contacted - should be a serial job */ if (LOGLEVEL >= 3) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "no sisters contacted - setting job substate to EXITING"); } pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); } else if (LOGLEVEL >= 3) { snprintf(log_buffer, sizeof(log_buffer), "%s: master task has exited - sent kill job request to %d sisters", __func__, NumSisters); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); /* job is waiting for the reply from other sisters * before it exits */ pjob->ji_qs.ji_substate = JOB_SUBSTATE_MOM_WAIT; pjob->ji_kill_started = time(NULL); } } /* END if (ptask->ti_qs.ti_parenttask == TM_NULL_TASK) */ /* ** process any TM client obits waiting. */ while ((pobit = (obitent *)GET_NEXT(ptask->ti_obits)) != NULL) { #ifndef NUMA_SUPPORT hnodent *pnode; pnode = get_node(pjob, pobit->oe_info.fe_node); /* see if this is me or another MOM */ /* for NUMA, we are always the mother superior and the correct * node for everything to happen */ if ((pnode != NULL) && (pjob->ji_nodeid == pnode->hn_node)) #endif /* ndef NUMA_SUPPORT */ { task *tmp_task; /* send event to local child */ tmp_task = task_find(pjob, pobit->oe_info.fe_taskid); if ((tmp_task != NULL) && (tmp_task->ti_chan != NULL)) { tm_reply(tmp_task->ti_chan, IM_ALL_OKAY, pobit->oe_info.fe_event); diswsi(tmp_task->ti_chan, ptask->ti_qs.ti_exitstat); DIS_tcp_wflush(tmp_task->ti_chan); } } #ifndef NUMA_SUPPORT else { /* ** Send a response over to MOM ** whose child sent the request. */ send_task_obit_response(pjob, pnode, cookie, pobit, ptask->ti_qs.ti_exitstat); } #endif /* ndef NUMA_SUPPORT */ delete_link(&pobit->oe_next); free(pobit); } /* END while (pobit) */ if (ptask->ti_chan != NULL) { DIS_tcp_cleanup(ptask->ti_chan); ptask->ti_chan = NULL; } ptask->ti_qs.ti_status = TI_STATE_DEAD; if (LOGLEVEL >= 3) { log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "task is dead"); } task_save(ptask); } /* END for (ptask) */ /* If we are an intermediate mom we need to see if everyone has checked in */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_INTERMEDIATE_MOM)) { if ((pjob->ji_qs.ji_substate != JOB_SUBSTATE_EXITING) && (pjob->ji_qs.ji_substate != JOB_SUBSTATE_NOTERM_REQUE)) { if (LOGLEVEL >= 3) { snprintf(log_buffer, sizeof(log_buffer), "%s:intermediate mom has not received reply from all siblings", __func__); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } continue; } } /* ** Look to see if the job has terminated. If it is ** in any state other than EXITING continue on. */ if ((pjob->ji_qs.ji_substate != JOB_SUBSTATE_EXITING) && (pjob->ji_qs.ji_substate != JOB_SUBSTATE_EXIT_WAIT) && (pjob->ji_qs.ji_substate != JOB_SUBSTATE_NOTERM_REQUE)) { if (LOGLEVEL >= 3) { snprintf(log_buffer, sizeof(log_buffer), "%s:job is in non-exiting substate %s, no obit sent at this time", __func__, PJobSubState[pjob->ji_qs.ji_substate]); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_EXITED) { /* This is quasi odd. If we are in an EXITED substate then we already sent the obit to the server and it replied. But we have not received a PBS_BATCH_DeleteJob request from the server. If we have tasks to complete continue. But if there are no tasks left to run we need to delete the job.*/ ptask = (task *)GET_NEXT(pjob->ji_tasks); if (ptask == NULL) mom_deljob(pjob); } continue; } /* ** Look to see if I am a regular sister. If so, ** check to see if there is an obit event to ** send back to mother superior. ** Otherwise, I need to wait for her to send a KILL_JOB ** so I can send the obit (unless she died). */ #ifndef NUMA_SUPPORT if ((am_i_mother_superior(*pjob) == false) && ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_INTERMEDIATE_MOM) == 0)) { mom_radix = pjob->ji_wattr[JOB_ATR_job_radix].at_val.at_long; exit_mom_job(pjob, mom_radix); continue; } /* Are we an intermediate mom? If so kill our job and tell the mom who called us */ if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_INTERMEDIATE_MOM) { if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_EXITING) { mom_radix = pjob->ji_wattr[JOB_ATR_job_radix].at_val.at_long; exit_mom_job(pjob, mom_radix); } else { kill_job(pjob, SIGKILL, __func__, "kill_job message received"); pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; /*pjob->ji_obit = event;*/ if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); exiting_tasks = 1; /* Setting this to 1 will cause scan_for_exiting to execute */ } continue; } #endif /* NUMA_SUPPORT */ /* * At this point, we know we are Mother Superior for this * job which is EXITING. Time for it to die. */ pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_Suspend; if ((pjob->ji_qs.ji_substate != JOB_SUBSTATE_NOTERM_REQUE) && (pjob->ji_qs.ji_substate != JOB_SUBSTATE_EXIT_WAIT)) kill_job(pjob, SIGKILL, __func__, "local task termination detected"); else { ptask = (task *)GET_NEXT(pjob->ji_tasks); while (ptask != NULL) { if (ptask->ti_qs.ti_status == TI_STATE_RUNNING) { if (LOGLEVEL >= 4) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "kill_job found a task to kill"); } if (pjob->ji_qs.ji_un.ji_momt.ji_exitstat != 0) ptask->ti_qs.ti_exitstat = pjob->ji_qs.ji_un.ji_momt.ji_exitstat; else ptask->ti_qs.ti_exitstat = 0; /* assume successful completion */ ptask->ti_qs.ti_status = TI_STATE_EXITED; task_save(ptask); } ptask = (task *)GET_NEXT(ptask->ti_jobtask); } /* END while (ptask != NULL) */ } #ifdef ENABLE_CPA if (CPADestroyPartition(pjob) != 0) continue; #endif delete_link(&pjob->ji_jobque); /* unlink for poll list */ /* * + Open connection to the Server (for the Job Obituary) * + Set the connection to call obit_reply when the reply * arrives. * + fork child process, parent looks for more terminated jobs. * Child: * + Run the epilogue script (if one) * + Send the Job Obit Request (notice). */ if (LOGLEVEL >= 6) { log_record( PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, __func__, "calling mom_open_socket_to_jobs_server"); } /* epilogues are now run by preobit reply, which happens after the fork */ pjob->ji_qs.ji_substate = JOB_SUBSTATE_PREOBIT; preobit_preparation(pjob); if (found_one++ >= ObitsAllowed) { /* do not exceed max obits per iteration limit */ break; } } /* END for (pjob) */ if ((pjob == NULL) && (no_mom_servers_down())) { /* search finished */ exiting_tasks = 0; /* went through all jobs */ } return; } /* END scan_for_exiting() */ int run_epilogues( job *pjob, int i_am_ms, int deletejob) { char *path_epiloguserjob; resource *presc; int io_type = PE_IO_TYPE_STD; int rc; if ((pjob->ji_wattr[JOB_ATR_interactive].at_flags & ATR_VFLAG_SET) && pjob->ji_wattr[JOB_ATR_interactive].at_val.at_long) { io_type = PE_IO_TYPE_NULL; } if (i_am_ms == TRUE) { presc = find_resc_entry(&pjob->ji_wattr[JOB_ATR_resource], find_resc_def(svr_resc_def, "epilogue", svr_resc_size)); if ((presc != NULL)) { if ((presc->rs_value.at_flags & ATR_VFLAG_SET) && (presc->rs_value.at_val.at_str != NULL)) { path_epiloguserjob = get_local_script_path(pjob, presc->rs_value.at_val.at_str); if (path_epiloguserjob) { if (run_pelog(PE_EPILOGUSERJOB, path_epiloguserjob, pjob, io_type, deletejob) != 0) { log_err(-1, __func__, "user local epilog failed"); } free(path_epiloguserjob); } } } if (run_pelog(PE_EPILOGUSER, path_epiloguser, pjob, io_type, deletejob) != 0) log_err(-1, __func__, "user epilog failed - interactive job"); if ((rc = run_pelog(PE_EPILOG, path_epilog, pjob, io_type, deletejob)) != 0) { sprintf(log_buffer, "system epilog failed w/rc=%d", rc); log_err(-1, __func__, log_buffer); } } else { if (run_pelog(PE_EPILOGUSER, path_epiloguserp, pjob, io_type, deletejob) != 0) log_err(-1, __func__, "user epilog failed - interactive job"); if (run_pelog(PE_EPILOG, path_epilogp, pjob, io_type, deletejob) != 0) { log_err(-1, __func__, "parallel epilog failed"); } } return(PBSE_NONE); } /* run_epilogues() */ /** * Send obit to server. * * @see scan_for_terminated() - calls post_epilog() via ji_mompost job pbs_attribute * @see mom_open_socket_to_jobs_server() - child * @see obit_reply() - registered handler for obit connection * * @see scan_for_exiting() for Obit overview */ int post_epilogue( job *pjob, /* I */ int ev) /* I exit value (only used to determine if retrying obit) */ { int sock; int resc_access_perm; struct batch_request *preq; struct tcp_chan *chan = NULL; if (LOGLEVEL >= 2) { sprintf(log_buffer, "preparing obit message for job %s", pjob->ji_qs.ji_jobid); log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_REQUEST, __func__, log_buffer); } /* This is the child code */ /* open new connection - register obit_reply as handler */ sock = mom_open_socket_to_jobs_server(pjob, __func__, obit_reply); if (sock < 0) { /* FAILURE */ if ((errno == EINTR) || (errno == ETIMEDOUT) || (errno == EINPROGRESS)) { /* transient failure - server/network up but busy... retry */ int retrycount; for (retrycount = 0;retrycount < 2;retrycount++) { sock = mom_open_socket_to_jobs_server(pjob, __func__, obit_reply); if (sock >= 0) break; } /* END for (retrycount) */ } if (sock < 0) { /* We are trying to send obit, but failed - where is this retried? * Answer: In the main_loop examine_all_jobs_to_resend() tries * every so often to send the obit. This would work for recovered * jobs also. */ if (ev != MOM_OBIT_RETRY) { mark_for_resend(pjob); } return(1); } } /* send the job obiturary notice to the server */ preq = alloc_br(PBS_BATCH_JobObit); if (preq == NULL) { /* FAILURE */ sprintf(log_buffer, "cannot allocate memory for obit message"); log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_REQUEST, __func__, log_buffer); close_conn(sock, FALSE); return(1); } strcpy(preq->rq_ind.rq_jobobit.rq_jid, pjob->ji_qs.ji_jobid); if (pjob->ji_job_is_being_rerun == TRUE) { pjob->ji_qs.ji_un.ji_momt.ji_exitstat = 0; } preq->rq_ind.rq_jobobit.rq_status = pjob->ji_qs.ji_un.ji_momt.ji_exitstat; CLEAR_HEAD(preq->rq_ind.rq_jobobit.rq_attr); resc_access_perm = ATR_DFLAG_RDACC; encode_used(pjob, resc_access_perm, &preq->rq_ind.rq_jobobit.rq_attr); encode_flagged_attrs(pjob, resc_access_perm, &preq->rq_ind.rq_jobobit.rq_attr); if ((chan = DIS_tcp_setup(sock)) == NULL) { } else if (encode_DIS_ReqHdr(chan, PBS_BATCH_JobObit, pbs_current_user) || encode_DIS_JobObit(chan, preq) || encode_DIS_ReqExtend(chan, 0)) { /* FAILURE */ sprintf(log_buffer, "cannot create obit message for job %s", pjob->ji_qs.ji_jobid); log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_REQUEST, __func__, log_buffer); close_conn(chan->sock, FALSE); DIS_tcp_cleanup(chan); free_br(preq); return(1); } if (chan != NULL) { DIS_tcp_wflush(chan); DIS_tcp_cleanup(chan); } free_br(preq); /* SUCCESS */ /* FYI: socket gets closed and pjob->ji_momhandle is unset in obit_reply, the reply handler */ log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "obit sent to server"); return(0); } /* END post_epilogue() */ /** * preobit_preparation * * @see scan_for_exiting() - parent * @see mom_deljob() - child * @see run_pelog() - child * * This function is run from scan_for_exiting(). * It will fork: * - the child will run the epilogues and release the ALPS reservation if this is a login node. * - the parent will mark this job as ready to send the obit and mark post_epilogue as the * next step for its processing. * * @pre-cond: pjob must be a valid job * @post-cond: a child process will be running the epilogues and releasing the ALPS * reservation. this process will know the job needs its obit sent. */ void preobit_preparation( job *pjob) /* I */ { pid_t cpid; exiting_job_info *eji; log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, __func__, "top"); cpid = fork_me(-1); if (cpid < 0) { /* FAILURE */ log_record( PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "fork failed in preobit_preparation"); return; } if (cpid > 0) { /* parent - mark that job epilog subtask has been launched */ /* NOTE: pjob->ji_mompost will be executed in scan_for_terminated() */ eji = (exiting_job_info *)calloc(1, sizeof(exiting_job_info)); strcpy(eji->jobid, pjob->ji_qs.ji_jobid); eji->obit_sent = time(NULL); insert_thing(exiting_job_list, eji); pjob->ji_qs.ji_substate = JOB_SUBSTATE_OBIT; pjob->ji_momsubt = cpid; pjob->ji_mompost = post_epilogue; pjob->ji_momhandle = -1; if (LOGLEVEL >= 2) { snprintf(log_buffer, sizeof(log_buffer), "epilog subtask created with pid %d - substate set to JOB_SUBSTATE_OBIT - registered post_epilogue", cpid); log_record(PBSEVENT_DEBUG,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); } return; } /* child - just run epilogues */ run_epilogues(pjob, TRUE, FALSE); /* for cray, release the reservation now so that the job isn't reported * as finished until the reservation is kaput. This is important for the * cray because nodes are job exclusive so a lingering reservation causes * a job failure. */ if (is_login_node == TRUE) release_job_reservation(pjob); exit(0); } /* END preobit_preparation() */ /* * obit_reply * * This function is a message handler that is hooked to a server connection. * The connection is established in post_epilogue(). * * A socket connection to the server is opened, a job obiturary notice * message is sent to the server, and then at some later time, the server * sends back a reply and we end up here. * * On success, this routine sets the job's substate to EXITED * * @see post_epilogue() - registers obit_reply via add_conn() */ void *obit_reply( void *new_sock) /* I */ { int irtn; job *nxjob; job *pjob; pbs_attribute *pattr; unsigned int momport = 0; char tmp_line[MAXLINE]; struct batch_request *preq; int x; /* dummy */ int sock = *(int *)new_sock; struct tcp_chan *chan = NULL; int count = 0; /* read and decode the reply */ if ((preq = alloc_br(PBS_BATCH_JobObit)) == NULL) return(NULL); CLEAR_HEAD(preq->rq_ind.rq_jobobit.rq_attr); if ((chan = DIS_tcp_setup(sock)) == NULL) { free_br(preq); return(NULL); } /* make sure errno isn't stale */ errno = 0; while ((irtn = DIS_reply_read(chan, &preq->rq_reply)) && (errno == EINTR) && (count < DIS_REPLY_READ_RETRY)) count++; DIS_tcp_cleanup(chan); if (irtn != 0) { /* NOTE: irtn is of type DIS_* in include/dis.h, see dis_emsg[] */ sprintf(log_buffer, "DIS_reply_read failed, rc=%d sock=%d", irtn, sock); log_err(errno, __func__, log_buffer); preq->rq_reply.brp_code = -1; } /* find the job associated with the reply by the socket number */ /* saved in the job structure, ji_momhandle */ pjob = (job *)GET_NEXT(svr_alljobs); while (pjob != NULL) { nxjob = (job *)GET_NEXT(pjob->ji_alljobs); if ((pjob->ji_qs.ji_substate == JOB_SUBSTATE_OBIT) && (pjob->ji_momhandle == sock)) { /* Clear out destination so we know job is not on mom any more */ pjob->ji_qs.ji_destin[0] = '\0'; switch (preq->rq_reply.brp_code) { case PBSE_NONE: /* normal ack, mark job as exited */ pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITED; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); if (LOGLEVEL >= 4) { log_event( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "job obit acknowledge received - substate set to JOB_SUBSTATE_EXITED"); } break; case PBSE_ALRDYEXIT: /* have already told the server before recovery */ /* the server will contact us to continue */ if (LOGLEVEL >= 7) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "setting already exited job substate to EXITED"); } pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITED; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); break; case PBSE_CLEANEDOUT: /* all jobs discarded by server, discard job */ pattr = &pjob->ji_wattr[JOB_ATR_interactive]; if (((pattr->at_flags & ATR_VFLAG_SET) == 0) || (pattr->at_val.at_long == 0)) { /* do this if not interactive */ job_unlink_file(pjob, std_file_name(pjob, StdOut, &x)); job_unlink_file(pjob, std_file_name(pjob, StdErr, &x)); job_unlink_file(pjob, std_file_name(pjob, Checkpoint, &x)); } mom_deljob(pjob); break; case - 1: /* FIXME - causes epilogue to be run twice! */ pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; exiting_tasks = 1; break; default: { switch (preq->rq_reply.brp_code) { case PBSE_BADSTATE: sprintf(tmp_line, "server rejected job obit - unexpected job state"); break; case PBSE_SYSTEM: sprintf(tmp_line, "server rejected job obit - server not ready for job completion"); break; default: sprintf(tmp_line, "server rejected job obit - %d", preq->rq_reply.brp_code); break; } /* END switch (preq->rq_reply.brp_code) */ log_ext(-1,__func__,tmp_line,LOG_ALERT); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, tmp_line); } /* END BLOCK */ mom_deljob(pjob); break; } /* END switch (preq->rq_reply.brp_code) */ break; } /* END if (...) */ else { if (pjob->ji_momhandle == sock) { if (preq->rq_reply.brp_code == PBSE_UNKJOBID) { sprintf(tmp_line, "Unknown job id on server. Setting to exited and deleting"); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, tmp_line); pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITED; /* This means the server has no idea what this job is * and it should be deleted!!! */ mom_deljob(pjob); } else if (preq->rq_reply.brp_code == PBSE_ALRDYEXIT) { sprintf(tmp_line, "Job already in exit state on server. Setting to exited"); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, tmp_line); pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITED; } /* Commenting for now. The mom's are way to chatty right now */ /* else { sprintf(tmp_line, "Current state is: %d code (%d) sock (%d) - unknown Job state/request", pjob->ji_qs.ji_substate, preq->rq_reply.brp_code, sock); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, tmp_line); } */ } } pjob = nxjob; } /* END while (pjob != NULL) */ if (pjob == NULL) { log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_REQUEST, __func__, "Job not found for obit reply"); } free_br(preq); /* MUTSU - Should these two commands be moved to the top? */ pbs_disconnect_socket(sock); close_conn(sock, FALSE); if (PBSNodeCheckEpilog) { check_state(1); mom_server_all_update_stat(); } return(NULL); } /* END obit_reply() */ int has_exec_host_and_port( job *pjob) { char *hosts; char *ports; int has_exec_info = TRUE; if (((pjob->ji_wattr[JOB_ATR_exec_host].at_flags & ATR_VFLAG_SET) == FALSE) || ((pjob->ji_wattr[JOB_ATR_exec_port].at_flags & ATR_VFLAG_SET) == FALSE)) { has_exec_info = FALSE; } else { hosts = pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str; ports = pjob->ji_wattr[JOB_ATR_exec_port].at_val.at_str; if ((hosts == NULL) || (ports == NULL)) has_exec_info = FALSE; } return(has_exec_info); } /* END has_exec_host_and_port() */ /* * init_abort_jobs - on mom initialization, recover all running jobs. * * Called on initialization * If the -p option was given (default) (recover = JOB_RECOV_RUNNING), Mom will allow the jobs * to continue to run. She depends on detecting when they terminate * via the slow poll method rather than SIGCHLD. * * If the -r option was given (recover = JOB_RECOV_TERM_REQUE), MOM is * recovering on a running system and the session id of the jobs should be valid; * the job processes are killed and the job is re-queued * * If -q was given (recover = JOB_RECOV_RQUE), it is assumed that the whole * system, not just MOM, is coming up, the session ids are not valid; * so no attempt is made to kill the job processes. But the jobs are * terminated and requeued. * * If the -P option was given (recover == JOB_RECOV_DELETE), no attempt is * made to recover the jobs. The jobs are deleted from the queue. */ void init_abort_jobs( int recover) /* I (boolean) */ { DIR *dir; int i; int rc; #ifndef NUMA_SUPPORT int j; int sisters; int mom_radix = 0; #endif /* ndef NUMA_SUPPORT */ struct dirent *pdirent; job *pj; const char *job_suffix = JOB_FILE_SUFFIX; int job_suf_len = strlen(job_suffix); char *psuffix; unsigned int momport = 0; if (LOGLEVEL >= 6) { sprintf(log_buffer, "%s: recover=%d", __func__, recover); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); } dir = opendir(path_jobs); if (dir == NULL) { sprintf(log_buffer, "cannot open job directory '%s'", path_jobs); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); exit(1); } while ((pdirent = readdir(dir)) != NULL) { if ((i = strlen(pdirent->d_name)) <= job_suf_len) continue; psuffix = pdirent->d_name + i - job_suf_len; if (strcmp(psuffix, job_suffix)) continue; pj = job_recov(pdirent->d_name); if (pj == NULL) { sprintf(log_buffer, "%s: NULL job pointer", __func__); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); continue; } if (has_exec_host_and_port(pj) == FALSE) { /* if you don't have the exec information, you can't recover the job */ mom_deljob(pj); continue; } /* PW: mpiexec patch - set the globid so mom does not coredump in response to tm_spawn */ set_globid(pj, NULL); append_link(&svr_alljobs, &pj->ji_alljobs, pj); job_nodes(pj); rc = task_recov(pj); if (LOGLEVEL >= 2) { sprintf(log_buffer, "task recovery %s for job %s, rc=%d", (rc == 0) ? "succeeded" : "failed", pj->ji_qs.ji_jobid, rc); log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, __func__, log_buffer); } mom_checkpoint_recover(pj); /* * make sure we trust connections from sisters in case we get an * IM request before we get the real addr list from server. * Note: this only works after the job_nodes() call above. */ #ifndef NUMA_SUPPORT for (j = 0;j < pj->ji_numnodes;j++) { if (LOGLEVEL >= 6) { sprintf(log_buffer, "%s: adding client %s", __func__, pj->ji_hosts[j].hn_host); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); } addclient(pj->ji_hosts[j].hn_host); } /* END for (j) */ #endif /* ndef NUMA_SUPPORT */ if (LOGLEVEL >= 4) { sprintf(log_buffer, "successfully recovered job %s", pj->ji_qs.ji_jobid); log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, __func__, log_buffer); } /* code moved to here because even when we're canceling jobs, if there is a * user epilogue we'll attempt to become the user, so if ji_grpcache is * NULL then we'll get a crash */ if (pj->ji_grpcache == NULL) { DBPRT(("init_abort_jobs: setting grpcache for job %s\n", pj->ji_qs.ji_jobid)); if (check_pwd(pj) == NULL) { /* somehow a job that was legally executing (had a password entry) * no longer has a password entry?? */ snprintf(log_buffer, sizeof(log_buffer), "job %s no longer has valid password entry - deleting", pj->ji_qs.ji_jobid); log_err(-1, __func__, log_buffer); mom_deljob(pj); continue; } } if ((recover != JOB_RECOV_RUNNING) && (recover != JOB_RECOV_DELETE) && ((pj->ji_qs.ji_substate == JOB_SUBSTATE_RUNNING) || (pj->ji_qs.ji_substate == JOB_SUBSTATE_PRERUN) || (pj->ji_qs.ji_substate == JOB_SUBSTATE_SUSPEND) || (pj->ji_qs.ji_substate == JOB_SUBSTATE_EXITED) || (pj->ji_qs.ji_substate == JOB_SUBSTATE_NOTERM_REQUE) || (pj->ji_qs.ji_substate == JOB_SUBSTATE_EXITING))) { if (LOGLEVEL >= 2) { sprintf(log_buffer, "job %s recovered in active state %s (full recover not enabled)", pj->ji_qs.ji_jobid, PJobSubState[pj->ji_qs.ji_substate]); log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, __func__, log_buffer); } if (recover == JOB_RECOV_TERM_REQUE) /* -r option was used to start mom */ { kill_job(pj, SIGKILL, __func__, "recover is non-zero"); } /* ** Check to see if I am Mother Superior. The ** JOB_SVFLG_HERE flag is overloaded for MOM ** for this purpose. ** If I'm an ordinary sister, just throw the job ** away. If I am MS, send a KILL_JOB request to ** any sisters that happen to still be alive. */ if (am_i_mother_superior(*pj) == false) { if (LOGLEVEL >= 2) { sprintf(log_buffer, "local host is not mother-superior, deleting job %s", pj->ji_qs.ji_jobid); log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, __func__, log_buffer); } mom_deljob(pj); continue; } if (LOGLEVEL >= 2) { sprintf(log_buffer, "setting job state to exiting for job %s in state %s", pj->ji_qs.ji_jobid, PJobSubState[pj->ji_qs.ji_substate]); log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, __func__, log_buffer); } /* set exit status to: * JOB_EXEC_INITABT - init abort and no checkpoint * JOB_EXEC_INITRST - init and checkpoint, no mig * JOB_EXEC_INITRMG - init and checkpoint, migrate * to indicate recovery abort */ if (pj->ji_qs.ji_svrflags & (JOB_SVFLG_CHECKPOINT_FILE | JOB_SVFLG_CHECKPOINT_MIGRATEABLE)) { #if PBS_CHKPT_MIGRATE pj->ji_qs.ji_un.ji_momt.ji_exitstat = JOB_EXEC_INITRMG; #else pj->ji_qs.ji_un.ji_momt.ji_exitstat = JOB_EXEC_INITRST; #endif } else { pj->ji_qs.ji_un.ji_momt.ji_exitstat = JOB_EXEC_INITABT; } #ifndef NUMA_SUPPORT sisters = pj->ji_numnodes - 1; /* ** A sisterhood exists... send a KILL request. */ if (sisters > 0) { DBPRT(("init_abort_jobs: Sending to sisters\n")) pj->ji_resources = (noderes *)calloc(sisters, sizeof(noderes)); mom_radix = pj->ji_wattr[JOB_ATR_job_radix].at_val.at_long; if (mom_radix) { send_sisters(pj, IM_KILL_JOB_RADIX, TRUE); } else { send_sisters(pj, IM_KILL_JOB, FALSE); } /* job is waiting for the reply from other sisters * before it exits */ pj->ji_qs.ji_substate = JOB_SUBSTATE_MOM_WAIT; pj->ji_kill_started = time(NULL); continue; } #endif /* ndef NUMA_SUPPORT */ /* If mom was initialized with a -r any running processes have already been killed. We set substate to JOB_SUBSTATE_NOTERM_REQUE so scan_for_exiting will not try to kill the running processes for this job */ pj->ji_qs.ji_substate = JOB_SUBSTATE_NOTERM_REQUE; if (multi_mom) { momport = pbs_rm_port; } job_save(pj, SAVEJOB_QUICK, momport); exiting_tasks = 1; } /* END if ((recover != 2) && ...) */ else if (recover == JOB_RECOV_RUNNING || recover == JOB_RECOV_DELETE) { /* * add: 8/11/03 David.Singleton@anu.edu.au * * Lots of job structure components need to be * initialized if we are leaving this job * running, this is just a few. * Modified to accomodate JOB_RECOV_DELETE option * 01/13/2009 Ken Nielson knielson@adaptivecomputing.com */ if (LOGLEVEL >= 2 && recover == JOB_RECOV_RUNNING) { sprintf(log_buffer, "attempting to recover job %s in state %s", pj->ji_qs.ji_jobid, PJobSubState[pj->ji_qs.ji_substate]); log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, __func__, log_buffer); } #ifndef NUMA_SUPPORT sisters = pj->ji_numnodes - 1; if (sisters > 0) pj->ji_resources = (noderes *)calloc(sisters, sizeof(noderes)); if ((sisters > 0) && (recover == JOB_RECOV_RUNNING)) append_link(&mom_polljobs, &pj->ji_jobque, pj); #endif /* ndef NUMA_SUPPORT */ } } /* while ((pdirent = readdir(dir)) != NULL) */ closedir(dir); return; } /* END init_abort_jobs() */ /* * mom_deljob - delete the job entry, MOM no longer knows about the job */ void mom_deljob( job *pjob) /* I (modified) */ { #ifdef _CRAY /* remove any temporary directories */ rmtmpdir(pjob->ji_qs.ji_jobid); #endif /* _CRAY */ if (LOGLEVEL >= 3) { sprintf(log_buffer, "deleting job %s in state %s", pjob->ji_qs.ji_jobid, PJobSubState[pjob->ji_qs.ji_substate]); log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, __func__, log_buffer); } mom_job_purge(pjob); return; } /* END mom_deljob() */ int needs_and_ready_for_reply( job *pjob) { int needs_and_ready = FALSE; task *ptask; if (pjob->ji_obit == TM_NULL_EVENT) { /* No event waiting for sending info to MS - we don't need a reply */ if (LOGLEVEL >= 3) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "obit method not specified for job - no obit sent"); } } else { /* Are any tasks running? If so we're not ready */ ptask = (task *)GET_NEXT(pjob->ji_tasks); while (ptask != NULL) { if (ptask->ti_qs.ti_status == TI_STATE_RUNNING) break; ptask = (task *)GET_NEXT(ptask->ti_jobtask); } /* Still somebody there so don't send it yet. */ if (ptask != NULL) { if (LOGLEVEL >= 3) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "one or more running tasks found - no obit sent"); } } else needs_and_ready = TRUE; } return(needs_and_ready); } /* END needs_and_ready_for_reply() */ int send_job_obit_to_ms( job *pjob, int mom_radix) { int stream = -1; int i; int rc = PBSE_NONE; char *cookie = pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str; u_long cput = resc_used(pjob, "cput", gettime); u_long mem = resc_used(pjob, "mem", getsize); u_long vmem = resc_used(pjob, "vmem", getsize); int command; tm_event_t event; hnodent *np; struct tcp_chan *chan = NULL; if (mom_radix) np = pjob->ji_sisters; else np = pjob->ji_hosts; /* no entry for Mother Superior?? */ if (np == NULL) return(-1); if (mom_radix < 2) { command = IM_ALL_OKAY; event = pjob->ji_obit; } else { command = IM_RADIX_ALL_OK; event = pjob->ji_obit; } if (LOGLEVEL >= 3) { sprintf(log_buffer, "sending command %s", PMOMCommand[command]); log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, __func__, log_buffer); } for (i = 0; i < 5; i++) { stream = tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr,sizeof(np->sock_addr)); if (IS_VALID_STREAM(stream)) { if ((chan = DIS_tcp_setup(stream)) == NULL) { } /* write the resources used for this job */ else if ((rc = im_compose(chan,pjob->ji_qs.ji_jobid,cookie,command,event,TM_NULL_TASK)) == DIS_SUCCESS) { if ((rc = diswul(chan, cput)) == DIS_SUCCESS) { if ((rc = diswul(chan, mem)) == DIS_SUCCESS) { if ((rc = diswul(chan, vmem)) == DIS_SUCCESS) { if (mom_radix >= 2) { rc = diswsi(chan, pjob->ji_nodeid); } if (rc == DIS_SUCCESS) rc = DIS_tcp_wflush(chan); if (rc == DIS_SUCCESS) { /* Don't wait for a reply from Mother Superior since this could lead to a live lock. That is Mother Superior is waiting for a read from us and we are waiting on this read */ /* SUCCESS - no more retries needed */ if (LOGLEVEL >= 6) { sprintf(log_buffer, "%s: all tasks complete - purging job as sister: %d", __func__, rc); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } close(chan->sock); DIS_tcp_cleanup(chan); break; } } } } } if (chan != NULL) DIS_tcp_cleanup(chan); close(stream); } /* END work on a valid stream */ else if (stream == PERMANENT_SOCKET_FAIL) break; usleep(10); } /* END retry loop */ /* If I cannot contact mother superior, kill this job */ if (rc != PBSE_NONE) { resend_momcomm *mc = (resend_momcomm *)calloc(1, sizeof(resend_momcomm)); killjob_reply_info *kj = (killjob_reply_info *)calloc(1, sizeof(killjob_reply_info)); if (mc == NULL) { if (kj != NULL) free(kj); return(ENOMEM); } else if (kj == NULL) { if (mc != NULL) free(mc); return(ENOMEM); } mc->mc_type = KILLJOB_REPLY; mc->mc_struct = kj; kj->ici = create_compose_reply_info(pjob->ji_qs.ji_jobid, cookie, np, command, event, TM_NULL_TASK); if (kj->ici == NULL) { free(mc); free(kj); } else { kj->mem = mem; kj->vmem = vmem; kj->cputime = cput; if (mom_radix >= 2) kj->node_id = pjob->ji_nodeid; else kj->node_id = -1; add_to_resend_things(mc); } if (LOGLEVEL >= 3) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "couldn't contact mother superior - no obit sent - job will be purged"); } if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_NOTERM_REQUE) { kill_job(pjob, SIGKILL, __func__, "couldn't contact mother superior - no obit sent"); } } return(rc); } /* END send_job_obit_to_ms() */ void exit_mom_job( job *pjob, int mom_radix) { if (LOGLEVEL >= 6) { snprintf(log_buffer, sizeof(log_buffer), "I'm not mother superior for job %s", pjob->ji_qs.ji_jobid); log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, __func__, log_buffer); } /* should we do anything yet? */ if (needs_and_ready_for_reply(pjob) == FALSE) return; run_epilogues(pjob, FALSE, FALSE); send_job_obit_to_ms(pjob, mom_radix); mom_job_purge(pjob); } /* END exit_mom_job() */ /* END catch_child() */