/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. 
Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. */ #include /* the master config generated by configure */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if defined(NTOHL_NEEDS_ARPA_INET_H) && defined(HAVE_ARPA_INET_H) #include #endif #include "libpbs.h" #include "pbs_ifl.h" #include "list_link.h" #include "attribute.h" #include "resource.h" #include "server_limits.h" #include "pbs_job.h" #include "pbs_nodes.h" #include "pbs_error.h" #include "log.h" #include "net_connect.h" #include "rpp.h" #include "dis.h" #include "dis_init.h" #include "mom_func.h" #include "batch_request.h" #include "resmon.h" #include "mcom.h" #include "svrfunc.h" #ifdef PENABLE_LINUX26_CPUSETS #include "pbs_cpuset.h" #endif /* Global Data Items */ extern int exiting_tasks; extern char mom_host[]; extern char *path_jobs; extern char *path_home; extern int pbs_errno; extern unsigned int pbs_mom_port; extern unsigned int pbs_rm_port; extern unsigned int pbs_tm_port; extern tlist_head mom_polljobs; /* must have resource limits polled */ extern tlist_head svr_alljobs; /* all jobs under MOM's control */ extern int termin_child; extern time_t time_now; extern tree *okclients; /* accept connections from */ extern int port_care; extern char *path_prologp; extern char *path_prologuserp; 
/*
 * Printable names for the inter-MOM command codes, indexed by command
 * value and terminated by a NULL entry.  im_request() indexes this table
 * with MIN(command,IM_MAX), so any code past the last named command is
 * reported as "ERROR" (presumably IM_MAX is 12 - confirm against the IM_*
 * definitions, which are not visible here).
 */
const char *PMOMCommand[] =
  {
  "ALL_OKAY",
  "JOIN_JOB",
  "KILL_JOB",
  "SPAWN_TASK",
  "GET_TASKS",
  "SIGNAL_TASK",
  "OBIT_TASK",
  "POLL_JOB",
  "GET_INFO",
  "GET_RESC",
  "ABORT_JOB",
  "GET_TID",
  "ERROR",     /* 12+ */
  NULL
  };

/* NOTE(review): no user of task_fmt or noglobid is visible in this part of the file */
char task_fmt[] = "/%010.10ld";
char noglobid[] = "none";

/* logging verbosity; checked throughout this file before emitting optional log records */
extern int LOGLEVEL;
extern long  TJobStartBlockTime;

/* external functions */

extern void exec_bail(job *, int);
extern int TMomFinalizeJob1(job *, pjobexec_t *, int *);
extern int TMomFinalizeJob2(pjobexec_t *, int *);
extern int TMomFinalizeJob3(pjobexec_t *, int, int, int *);
extern int TMOMJobGetStartInfo(job *, pjobexec_t **) ;
extern int TMomCheckJobChild(pjobexec_t *, int, int *, int *);
extern void job_nodes(job *);
extern int tlist(tree *, char *, int);
extern void DIS_tcp_funcs();
extern int TTmpDirName(job *, char *);
extern int TMakeTmpDir(job *, char *);
extern void mom_server_close_stream(int stream);

/* declared here without 'extern'; the definitions are not in the visible part of the file */
char *cat_dirs(char *root, char *base);
char *get_local_script_path(job *pjob, char *base);

#ifdef NVIDIA_GPUS
extern int setup_gpus_for_job(job *pjob);
#endif  /* NVIDIA_GPUS */

#ifdef PENABLE_LINUX26_CPUSETS
extern int use_cpusets(job *);
#endif /* PENABLE_LINUX26_CPUSETS */

/* END external functions */


/*
** Save the critical information associated with a task to disk.
*/ int task_save( task *ptask) /* I */ { static char id[] = "task_save"; job *pjob = ptask->ti_job; int fds; int i; int TaskID = 0; char namebuf[MAXPATHLEN]; int openflags; strcpy(namebuf, path_jobs); /* job directory path */ strcat(namebuf, pjob->ji_qs.ji_fileprefix); strcat(namebuf, JOB_TASKDIR_SUFFIX); openflags = O_WRONLY | O_CREAT | O_Sync; if (LOGLEVEL >= 6) { sprintf(log_buffer, "saving task in %s", namebuf); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_SERVER, id, log_buffer); } #if defined(HAVE_OPEN64) && defined(LARGEFILE_WORKS) fds = open64(namebuf, openflags, 0600); #else fds = open(namebuf, openflags, 0600); #endif if (fds < 0) { log_err(errno, id, "error on open"); return(-1); } TaskID = ptask->ti_qs.ti_task; /* adjust task ID if it is adopted... */ if (IS_ADOPTED_TASK(ptask->ti_qs.ti_task)) { TaskID = ptask->ti_qs.ti_task % TM_ADOPTED_TASKID_BASE; } #ifdef HAVE_LSEEK64 if (lseek64(fds, (off_t)(TaskID*sizeof(ptask->ti_qs)), SEEK_SET) < 0) #else if (lseek(fds, (off_t)(TaskID*sizeof(ptask->ti_qs)), SEEK_SET) < 0) #endif { log_err(errno, id, "lseek"); close(fds); return(-1); } /* NOTE: to avoid partial write failures in fs full situations, */ /* attempt write of empty buffer, if success, then write actual task? */ /* (NYI) */ /* just write the "critical" base structure to the file */ while ((i = write( fds, (char *) & ptask->ti_qs, sizeof(ptask->ti_qs))) != sizeof(ptask->ti_qs)) { if ((i < 0) && (errno == EINTR)) { /* retry the write */ #ifdef HAVE_LSEEK64 if (lseek64(fds, (off_t)(TaskID*sizeof(ptask->ti_qs)), SEEK_SET) < 0) #else if (lseek(fds, (off_t)(TaskID*sizeof(ptask->ti_qs)), SEEK_SET) < 0) #endif { log_err(errno, id, "lseek"); close(fds); return(-1); } continue; } log_err(errno, id, "quickwrite"); close(fds); return(-1); } /* END while (i = write()) */ /* SUCCESS */ close(fds); return(0); } /* END task_save() */ /* ** Allocate an event and link it to the given nodeent entry. 
*/ eventent *event_alloc( int command, hnodent *pnode, tm_event_t event, tm_task_id taskid) { static tm_event_t eventnum = TM_NULL_EVENT + 1; eventent *ep; ep = (eventent *)malloc(sizeof(eventent)); assert(ep); ep->ee_command = command; ep->ee_event = (event == TM_NULL_EVENT) ? eventnum++ : event; ep->ee_taskid = taskid; ep->ee_forward.fe_node = TM_ERROR_NODE; ep->ee_forward.fe_event = TM_ERROR_EVENT; ep->ee_forward.fe_taskid = TM_NULL_TASK; ep->ee_argv = NULL; ep->ee_envp = NULL; CLEAR_LINK(ep->ee_next); append_link(&pnode->hn_events, &ep->ee_next, ep); return(ep); } /* END event_alloc() */ /* Forward declaration */ static int adoptSession(pid_t sid, char *id, int command, char *cookie); /* * Create a new task if the current number is less then * the tasks per node limit. */ task *pbs_task_create( job *pjob, tm_task_id taskid) { static char id[] = "pbs_task_create"; task *ptask; attribute *at; resource_def *rd; resource *pres; u_long tasks; /* DJH 27 feb 2002. Check that we aren't about to run into the */ /* task IDs that we use to label adopted tasks. 
*/ if ((taskid == TM_NULL_TASK) && (pjob->ji_taskid >= TM_ADOPTED_TASKID_BASE)) { sprintf(log_buffer, "Ran into reserved task IDs on job %s", pjob->ji_qs.ji_jobid); log_err(-1, id, log_buffer); return NULL; } for (ptask = (task *)GET_NEXT(pjob->ji_tasks), tasks = 0; ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask), tasks++); at = &pjob->ji_wattr[(int)JOB_ATR_resource]; rd = find_resc_def(svr_resc_def, "taskspn", svr_resc_size); assert(rd != NULL); pres = find_resc_entry(at, rd); if (pres != NULL) { if (tasks >= (unsigned long)pres->rs_value.at_val.at_long) { return(NULL); } } ptask = (task *)calloc(1, sizeof(task)); assert(ptask); /* initialize task */ ptask->ti_job = pjob; CLEAR_LINK(ptask->ti_jobtask); append_link(&pjob->ji_tasks, &ptask->ti_jobtask, ptask); ptask->ti_fd = -1; ptask->ti_flags = 0; ptask->ti_register = TM_NULL_EVENT; CLEAR_HEAD(ptask->ti_obits); CLEAR_HEAD(ptask->ti_info); memset(ptask->ti_qs.ti_parentjobid, 0, sizeof(ptask->ti_qs.ti_parentjobid)); ptask->ti_qs.ti_parentnode = TM_ERROR_NODE; ptask->ti_qs.ti_parenttask = TM_NULL_TASK; ptask->ti_qs.ti_task = ((taskid == TM_NULL_TASK) ? 
pjob->ji_taskid++ : taskid); ptask->ti_qs.ti_status = TI_STATE_EMBRYO; ptask->ti_qs.ti_sid = 0; ptask->ti_qs.ti_exitstat = 0; memset(ptask->ti_qs.ti_u.ti_hold, 0, sizeof(ptask->ti_qs.ti_u.ti_hold)); /* SUCCESS */ return(ptask); } /* END pbs_task_create() */ task *task_find( job *pjob, tm_task_id taskid) { task *ptask; for (ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { if (ptask->ti_qs.ti_task == taskid) break; } return(ptask); } task *task_check( job *pjob, tm_task_id taskid) { static char id[] = "task_check"; task *ptask; if (taskid == TM_NULL_TASK) { /* don't bother with the error messages */ return(NULL); } ptask = task_find(pjob, taskid); if (ptask == NULL) { sprintf(log_buffer, "%s requesting task %ld not found", pjob->ji_qs.ji_jobid, (long)taskid); log_err(-1, id, log_buffer); return(NULL); } if (ptask->ti_fd < 0) { sprintf(log_buffer, "cannot tm_reply to task %ld", (long)taskid); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return(NULL); } return(ptask); } /* END task_check() */ /* ** task_recov() ** Recover (read in) the tasks from their save files for a job. ** ** This function is only needed upon MOM start up. */ int task_recov( job *pjob) { static char id[] = "task_recov"; int fds; task *pt; char namebuf[MAXPATHLEN]; struct taskfix task_save; tm_task_id tid; strcpy(namebuf, path_jobs); /* job directory path */ strcat(namebuf, pjob->ji_qs.ji_fileprefix); strcat(namebuf, JOB_TASKDIR_SUFFIX); #if defined(HAVE_OPEN64) && defined(LARGEFILE_WORKS) fds = open64(namebuf, O_RDONLY, 0); #else fds = open(namebuf, O_RDONLY, 0); #endif if (fds < 0) { log_err(errno, id, "open of task file"); unlink(namebuf); return -1; } /* read in task quick save sub-structure */ while (read(fds, (char *)&task_save, sizeof(task_save)) == sizeof(task_save)) { tid = TM_NULL_TASK; if (IS_ADOPTED_TASK(task_save.ti_task)) { /* * Set the high water mark for adopted task ids. 
Its * "+1" due to the post-increment when we generate the * task ids. */ pjob->maxAdoptedTaskId = MAX(pjob->maxAdoptedTaskId, (int)(task_save.ti_task + 1)); tid = task_save.ti_task; } if ((pt = pbs_task_create(pjob, tid)) == NULL) { log_err(errno, id, "cannot create task"); close(fds); return -1; } pt->ti_qs = task_save; } /* END while read */ close(fds); return(0); } /* END task_recov() */ /* ** Send a reply message to a user proc over a TCP stream. */ int tm_reply( int stream, int com, tm_event_t event) { int ret; DIS_tcp_funcs(); ret = diswsi(stream, TM_PROTOCOL); if (ret != DIS_SUCCESS) goto done; ret = diswsi(stream, TM_PROTOCOL_VER); if (ret != DIS_SUCCESS) goto done; ret = diswsi(stream, com); if (ret != DIS_SUCCESS) goto done; ret = diswsi(stream, event); if (ret != DIS_SUCCESS) goto done; return(DIS_SUCCESS); done: DBPRT(("tm_reply: send error %s\n", dis_emsg[ret])) return(ret); } /* tm_reply() */ /* ** Start a standard inter-MOM message. */ int im_compose( int stream, char *jobid, char *cookie, int command, tm_event_t event, tm_task_id taskid) { int ret; if (stream < 0) { return(DIS_EOF); } DIS_rpp_reset(); ret = diswsi(stream, IM_PROTOCOL); if (ret != DIS_SUCCESS) goto err; ret = diswsi(stream, IM_PROTOCOL_VER); if (ret != DIS_SUCCESS) goto err; ret = diswst(stream, jobid); if (ret != DIS_SUCCESS) goto err; ret = diswst(stream, cookie); if (ret != DIS_SUCCESS) goto err; ret = diswsi(stream, command); if (ret != DIS_SUCCESS) goto err; ret = diswsi(stream, event); if (ret != DIS_SUCCESS) goto err; ret = diswsi(stream, taskid); if (ret != DIS_SUCCESS) goto err; return(DIS_SUCCESS); err: DBPRT(("im_compose: send error %s\n", dis_emsg[ret])) return(ret); } /* END im_compose() */ /** * Send a message (command = com) to all the other MOMs in the job -> pjob. 
* * @see scan_for_exiting() - parent - report to sisters upon job completion * @see examine_all_polled_jobs() - parent - poll job status info * @see exec_bail() - parent - abort parallel job * * @see start_exec() - peer - opens connections to sisters at parallel job start * * @return 0 on FAILURE or number of sister mom's successfully contacted on SUCCESS */ int send_sisters( job *pjob, /* I */ int com) /* I (command to send to all sisters) */ { char *id = "send_sisters"; int i, num, ret; eventent *ep; char *cookie; if (LOGLEVEL >= 4) { sprintf(log_buffer, "sending command %s for job %s (%d)", PMOMCommand[com], pjob->ji_qs.ji_jobid, com); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, id, log_buffer); } if (!(pjob->ji_wattr[(int)JOB_ATR_Cookie].at_flags & ATR_VFLAG_SET)) { /* cookie not set - return FAILURE */ return(0); } cookie = pjob->ji_wattr[(int)JOB_ATR_Cookie].at_val.at_str; num = 0; if (com == IM_ABORT_JOB) { snprintf(log_buffer, 1024, "sending ABORT to sisters for job %s", pjob->ji_qs.ji_jobid); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, id, log_buffer); } /* walk thru node list, contact each mom */ for (i = 0;i < pjob->ji_numnodes;i++) { hnodent *np = &pjob->ji_hosts[i]; if (np->hn_node == pjob->ji_nodeid) /* this is me */ continue; if (np->hn_sister != SISTER_OKAY) /* sister is gone? */ { snprintf(log_buffer, 1024, "%s: sister #%d (%s) is not ok (%d)", id, i, (np->hn_host != NULL) ? np->hn_host : "NULL", np->hn_sister); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); /* garrick commented out continue statement below */ /* continue; */ } if (np->hn_stream == -1) { char EMsg[1024]; EMsg[0] = 0; np->hn_stream = rpp_open(np->hn_host, pbs_rm_port, EMsg); if (np->hn_stream == -1) { snprintf(log_buffer, 1024, "%s: cannot open rpp connection to sister #%d (%s) - %s", id, i, (np->hn_host != NULL) ? 
np->hn_host : "NULL", EMsg); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); if(LOGLEVEL >= 6) { if(EMsg[0] != 0) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, EMsg); } } continue; } } ep = event_alloc(com, np, TM_NULL_EVENT, TM_NULL_TASK); if (ep == NULL) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "cannot alloc event object in send_sisters"); continue; } ret = im_compose( np->hn_stream, pjob->ji_qs.ji_jobid, cookie, com, ep->ee_event, TM_NULL_TASK); if (ret != DIS_SUCCESS) { snprintf(log_buffer, 1024, "%s: cannot compose message to sister #%d (%s) - %d", id, i, (np->hn_host != NULL) ? np->hn_host : "NULL", ret); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); rpp_close(np->hn_stream); np->hn_stream = -1; np->hn_sister = SISTER_EOF; continue; } ret = rpp_flush(np->hn_stream); if (ret == -1) { snprintf(log_buffer, 1024, "%s: cannot flush message to sister #%d (%s)", id, i, (np->hn_host != NULL) ? np->hn_host : "NULL"); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); rpp_close(np->hn_stream); np->hn_stream = -1; np->hn_sister = SISTER_EOF; continue; } np->hn_sister = SISTER_OKAY; num++; } /* END for (i) */ return(num); } /* END send_sisters() */ #define SEND_ERR(err) \ if (reply) { \ im_compose(stream,jobid,cookie,IM_ERROR,event,fromtask); \ diswsi(stream,err); \ } /** * Check to see which node a stream is coming from. Return a NULL * if it is not assigned to this job. Return a nodeent pointer if * it is. 
*/ hnodent *find_node( job *pjob, int stream, tm_node_id nodeid) { static char id[] = "find_node"; int i; vnodent *vp; hnodent *hp; struct sockaddr_in *stream_addr; struct sockaddr_in *node_addr; for (vp = pjob->ji_vnods, i = 0;i < pjob->ji_numvnod;vp++, i++) { if (vp->vn_node == nodeid) break; } /* END for (vp) */ if (i == pjob->ji_numvnod) { sprintf(log_buffer, "node %d not found", nodeid); log_err(-1, id, log_buffer); return(NULL); } hp = vp->vn_host; stream_addr = rpp_getaddr(stream); node_addr = rpp_getaddr(hp->hn_stream); if (stream_addr == NULL) { /* caller didn't have a stream */ /* if node is not me and no stream open, open one */ if (pjob->ji_nodeid != hp->hn_node && node_addr == NULL) hp->hn_stream = rpp_open(hp->hn_host, pbs_rm_port, NULL); return(hp); } /* No stream recorded in the node info, save this one. */ if (node_addr == NULL) { hp->hn_stream = stream; return(hp); } /* ** At this point, both the input stream and the recorded ** stream for the node are good. If they are the same ** index, we are done. */ if (hp->hn_stream == stream) { return(hp); } /* ** The node struct has a different stream number saved ** then the one passed in (supposedly from the same node). ** Check to see if stream recorded in the node struct ** and the one passed in have the same IP address. If ** they do (only a possibly different port number), ** we are fine. Otherwise, a mixup has happened. */ /* if (memcmp( &stream_addr->sin_addr, &node_addr->sin_addr, sizeof(node_addr->sin_addr)) != 0) { */ if (stream_addr->sin_addr.s_addr != node_addr->sin_addr.s_addr) { /* FAILURE */ char *addr1; char *addr2; addr1 = strdup(netaddr(stream_addr)); addr2 = strdup(netaddr(node_addr)); sprintf(log_buffer, "stream id %d does not match %d to node %d (stream=%s node=%s)", stream, hp->hn_stream, nodeid, (addr1 != NULL) ? addr1 : "", (addr2 != NULL) ? 
addr2 : ""); log_err(-1, id, log_buffer); free(addr1); free(addr2); return(NULL); } return(hp); } /* END find_node() */ /** * An error has been encountered starting a job. * * Format a message to all the sisterhood to get rid of their copy * of the job. There should be no processes running at this point. */ void job_start_error( job *pjob, /* I */ int code, /* I */ char *nodename) /* I */ { static char id[] = "job_start_error"; static char abortjobid[64]; static int abortcount = -1; attribute *pattr; char tmpLine[1024]; if (abortcount == -1) { abortjobid[0] = '\0'; } sprintf(log_buffer, "job_start_error from node %s in %s", nodename, id); log_err(code, pjob->ji_qs.ji_jobid, log_buffer); if (!strcmp(abortjobid, pjob->ji_qs.ji_jobid)) { if (abortcount >= 16) { /* abort is not working, do not send sisters again */ sprintf(log_buffer, "abort attempted 16 times in %s. ignoring abort request from node %s", id, nodename); log_err(code, pjob->ji_qs.ji_jobid, log_buffer); exec_bail(pjob, JOB_EXEC_RETRY); return; } abortcount++; } else { strcpy(abortjobid, pjob->ji_qs.ji_jobid); abortcount = 1; } /* annotate job with failed node info */ snprintf(tmpLine, sizeof(tmpLine), "REJHOST=%s", nodename); pattr = &pjob->ji_wattr[(int)JOB_ATR_sched_hint]; job_attr_def[(int)JOB_ATR_sched_hint].at_free(pattr); job_attr_def[(int)JOB_ATR_sched_hint].at_decode( pattr, NULL, NULL, tmpLine); pjob->ji_wattr[(int)JOB_ATR_errpath].at_flags = (ATR_VFLAG_SET | ATR_VFLAG_MODIFY | ATR_VFLAG_SEND); /* NOTE: is there a way to force the updated 'sched_hint' info to pbs_server before the obit to avoid a race condition? */ /* Perhaps, pbs_mom could register job and perform 'exec_bail' after next job status query from pbs_server? 
*/ /* NOTE: exec_bail() will issue 'send_sisters(pjob,IM_ABORT_JOB);' */ exec_bail(pjob, JOB_EXEC_RETRY); return; } /* END job_start_error() */ /* ** Free malloc'ed array (used in SPAWN) */ void arrayfree( char **array) /* I - freed */ { int i; for (i = 0;array[i];i++) free(array[i]); free(array); return; } /** * Deal with events hooked to a node where a stream has gone * south or we are going away. * * @see term_job() - parent - terminate job * @see im_eof() - parent - inter-MOM end of file detected */ void node_bailout( job *pjob, /* I */ hnodent *np) /* I */ { static char id[] = "node_bailout"; task *ptask; eventent *ep; int i; ep = (eventent *)GET_NEXT(np->hn_events); while (ep != NULL) { switch (ep->ee_command) { case IM_JOIN_JOB: /* * I'm MS and a node has failed to respond to the * call. Maybe in the future the user can specify * the job can start with a range of nodes so * one (or more) missing can be tolerated. Not * for now. */ sprintf(log_buffer, "%s join_job failed from node %s %d - recovery attempted)", pjob->ji_qs.ji_jobid, np->hn_host, np->hn_node); log_err(-1, id, log_buffer); DBPRT(("%s: JOIN_JOB %s\n", id, pjob->ji_qs.ji_jobid)) job_start_error(pjob, PBSE_SISCOMM, np->hn_host); break; case IM_ABORT_JOB: case IM_KILL_JOB: /* ** The job is already in the process of being killed ** but somebody has dropped off the face of the ** earth. Just check to see if everybody has ** been heard from in some form or another and ** set JOB_SUBSTATE_EXITING if so. 
*/ sprintf(log_buffer, "%s: received KILL/ABORT request for job %s from node %s", id, pjob->ji_qs.ji_jobid, np->hn_host); log_err(-1, id, log_buffer); for (i = 1;i < pjob->ji_numnodes;i++) { if (pjob->ji_hosts[i].hn_sister == SISTER_OKAY) break; } /* END for (i) */ if (i == pjob->ji_numnodes) { /* all dead */ pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; job_save(pjob, SAVEJOB_QUICK); exiting_tasks = 1; } break; case IM_SPAWN_TASK: case IM_GET_TASKS: case IM_SIGNAL_TASK: case IM_OBIT_TASK: case IM_GET_INFO: case IM_GET_RESC: /* ** A user attempt failed, inform process. */ DBPRT(("%s: REQUEST %d %s\n", id, ep->ee_command, pjob->ji_qs.ji_jobid)) ptask = task_check(pjob, ep->ee_taskid); if (ptask == NULL) break; tm_reply(ptask->ti_fd, TM_ERROR, ep->ee_event); diswsi(ptask->ti_fd, TM_ESYSTEM); DIS_tcp_wflush(ptask->ti_fd); break; case IM_POLL_JOB: /* ** I must be Mother Superior for the job and ** this is an error reply to a poll request. */ #ifdef __TRR /* roadrunner */ sprintf(log_buffer, "%s POLL failed from node %s %d - recovery attempted - job will not be killed)", pjob->ji_qs.ji_jobid, np->hn_host, np->hn_node); log_err(-1, id, log_buffer); #else /* __TRR */ /* we should be more patient - how do we recover this connection? (NYI) */ /* if job attribute fault_tolerant is not set or set to false then kill the job */ if ((pjob->ji_wattr[(int)JOB_ATR_fault_tolerant].at_flags & ATR_VFLAG_SET) && pjob->ji_wattr[(int)JOB_ATR_fault_tolerant].at_val.at_long) { sprintf(log_buffer, "%s POLL failed from node %s %d - job is fault tolerant - job will not be killed)", pjob->ji_qs.ji_jobid, np->hn_host, np->hn_node); } else { sprintf(log_buffer, "%s POLL failed from node %s %d - recovery not attempted - job will be killed)", pjob->ji_qs.ji_jobid, np->hn_host, np->hn_node); pjob->ji_nodekill = np->hn_node; } log_err(-1, id, log_buffer); #endif /* __TRR */ break; case IM_GET_TID: /* ** A request to Mother Superior to get ** a TID has failed. 
*/ DBPRT(("%s: GET_TID %s\n", id, pjob->ji_qs.ji_jobid)) arrayfree(ep->ee_argv); arrayfree(ep->ee_envp); ptask = task_check(pjob, ep->ee_forward.fe_taskid); if (ptask == NULL) break; tm_reply( ptask->ti_fd, TM_ERROR, ep->ee_forward.fe_event); diswsi(ptask->ti_fd, TM_ESYSTEM); DIS_tcp_wflush(ptask->ti_fd); break; default: sprintf(log_buffer, "unknown command %d saved", ep->ee_command); log_err(-1, id, log_buffer); break; } /* END switch (ep->ee_command) */ delete_link(&ep->ee_next); free(ep); ep = (eventent *)GET_NEXT(np->hn_events); } /* END while (ep != NULL) */ return; } /* END node_bailout() */ /** terminate job - terminate all node events of all types contained by nodes in job nodelist */ void term_job( job *pjob) /* I */ { hnodent *np; int num; for (num = 0, np = pjob->ji_hosts;num < pjob->ji_numnodes;num++, np++) { if (np->hn_stream >= 0) { rpp_close(np->hn_stream); np->hn_stream = -1; np->hn_sister = SISTER_EOF; } node_bailout(pjob, np); } /* END for (num) */ return; } /* END term_job() */ /* * Handle a stream that needs to be closed. * May be either from another MOM, or the server. * * @see im_request() - parent * @see do_rpp() - parent */ void im_eof( int stream, /* I */ int ret) /* I */ { static char id[] = "im_eof"; int num = -1; job *pjob = NULL; hnodent *np = NULL; struct sockaddr_in *addr; addr = rpp_getaddr(stream); sprintf(log_buffer, "%s from addr %s", dis_emsg[ret], netaddr(addr)); log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); rpp_close(stream); mom_server_close_stream(stream); /* ** Search though all the jobs looking for this stream. ** We want to find if any events are being waited for ** from the "dead" stream and do something with them. 
*/ for (pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { for (num = 0, np = pjob->ji_hosts;num < pjob->ji_numnodes;num++, np++) { if (np->hn_stream == stream) { np->hn_stream = -1; np->hn_sister = SISTER_EOF; break; } } /* END for (num) */ if (num < pjob->ji_numnodes) /* found it */ break; } /* END for (pjob) */ if (pjob == NULL) { return; } /* matching job located - connection has failed - close all connections to MOM */ node_bailout(pjob, np); /* ** If dead stream is num = 0, I'm a regular node and my connection to ** Mother Superior is gone... kill job. */ if (num == 0) { sprintf(log_buffer, "job %s lost connection to MS on %s", pjob->ji_qs.ji_jobid, np->hn_host); log_err(-1, id, log_buffer); /* don't just give up, maybe mother superior is just being restarted, do not set exiting_tasks -gs */ /* disabled - USC 2/11/2005 */ /* * exiting_tasks = 1; * * pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; */ } return; } /* END im_eof() */ /* * Check to be sure this is a connection from Mother Superior on * a good port. * Check to make sure I am not Mother Superior (talking to myself). * Set the stream in ji_nodes[0] if needed. * Return TRUE on error, FALSE if okay. */ int check_ms( int stream, /* I */ job *pjob) /* I */ { static char id[] = "check_ms"; struct sockaddr_in *addr; hnodent *np; addr = rpp_getaddr(stream); if ((port_care != 0) && (ntohs(addr->sin_port) >= IPPORT_RESERVED)) { sprintf(log_buffer, "non-privileged connection from %s", netaddr(addr)); log_err(-1, id, log_buffer); rpp_close(stream); return(TRUE); } if (pjob == NULL) { return(FALSE); } if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) { log_err(-1, id, "Mother Superior talking to herself"); rpp_eom(stream); return(TRUE); } /* ** This should be mother superior calling. ** We always have a stream open to her at node 0. 
*/ np = &pjob->ji_hosts[0]; /* MS entry */ if (stream != np->hn_stream) { if (np->hn_stream != -1) { sprintf(log_buffer, "MS reset from %d to %d (%s)", np->hn_stream, stream, netaddr(addr)); log_err(-1, id, log_buffer); } np->hn_stream = stream; } return(FALSE); } /* END check_ms() */ u_long resc_used( job *pjob, char *name, u_long(*func)(resource *)) { attribute *at; resource_def *rd; resource *pres; u_long val = 0L; at = &pjob->ji_wattr[(int)JOB_ATR_resc_used]; if (at == NULL) { return(0); } rd = find_resc_def(svr_resc_def, name, svr_resc_size); if (rd == NULL) return 0; pres = find_resc_entry(at, rd); if (pres == NULL) return 0; val = func(pres); DBPRT(("resc_used: %s %lu\n", name, val)) return(val); } /* ** Find named info for a task. */ infoent * task_findinfo(task *ptask, char *name) { infoent *ip; for (ip = (infoent *)GET_NEXT(ptask->ti_info); ip; ip = (infoent *)GET_NEXT(ip->ie_next)) { if (strcmp(ip->ie_name, name) == 0) break; } return ip; } /* ** Save named info with a task. */ void task_saveinfo( task *ptask, char *name, void *info, size_t len) { infoent *ip; if ((ip = task_findinfo(ptask, name)) == NULL) { /* new name */ ip = (infoent *)malloc(sizeof(infoent)); assert(ip); CLEAR_LINK(ip->ie_next); append_link(&ptask->ti_info, &ip->ie_next, ip); ip->ie_name = name; } else { /* replace name with new info */ free(ip->ie_info); } ip->ie_info = info; ip->ie_len = len; return; } /* END task_saveinfo() */ /* ** Generate a resource string for a job. 
*/ char *resc_string( job *pjob) { attribute *at; attribute_def *ad; svrattrl *pal; tlist_head lhead; int len, used, tot; char *res_str, *ch; char *getuname(); extern int resc_access_perm; char *tmpResStr; ch = getuname(); len = strlen(ch); tot = len * 2; used = 0; res_str = (char *)malloc(tot); if (res_str == NULL) { /* FAILURE - cannot alloc memory */ return(NULL); } strcpy(res_str, ch); used += len; res_str[used++] = ':'; at = &pjob->ji_wattr[(int)JOB_ATR_resource]; if (at->at_type != ATR_TYPE_RESC) { /* SUCCESS */ res_str[used] = '\0'; return(res_str); } ad = &job_attr_def[(int)JOB_ATR_resource]; resc_access_perm = ATR_DFLAG_USRD; CLEAR_HEAD(lhead); ad->at_encode( at, &lhead, ad->at_name, NULL, ATR_ENCODE_CLIENT); attrl_fixlink(&lhead); for (pal = (svrattrl *)GET_NEXT(lhead); pal; pal = (svrattrl *)GET_NEXT(pal->al_link)) { while (used + pal->al_rescln + pal->al_valln > tot) { tot *= 2; tmpResStr = realloc(res_str,tot); if (tmpResStr == NULL) { /* FAILURE - cannot alloc memory */ free(res_str); return(NULL); } res_str = tmpResStr; } strcpy(&res_str[used], pal->al_resc); used += (pal->al_rescln - 1); res_str[used++] = '='; strcpy(&res_str[used], pal->al_value); used += (pal->al_valln - 1); res_str[used++] = ','; } free_attrlist(&lhead); res_str[--used] = '\0'; /* SUCCESS */ return(res_str); } /* END resc_string() */ /** * Input is coming from another MOM over a DIS rpp stream. * Read the stream to get a Inter-MOM request. * * request ( * jobid string * cookie string * command int * event int * task int * ) * * Format the reply and write it back. 
* * @see im_eof() - child - called if failure occurs */ void im_request( int stream, /* I */ int version) /* I */ { char *id = "im_request"; int command = 0; int event_com = 0, ret; char *jobid = NULL; char *cookie = NULL; char *oreo; char basename[50]; char namebuf[MAXPATHLEN]; job *pjob; task *ptask; hnodent *np; eventent *ep = NULL; infoent *ip; int kill_done = 0; struct sockaddr_in *addr; u_long ipaddr; int i, j, errcode, nodeidx = 0; int reply; int exitval; tm_node_id nodeid; tm_task_id fromtask, event_task = 0, taskid; int nodenum, index; int num; int sig; char **argv = NULL, **envp = NULL, *cp, *globid; char *name; void *info; size_t len; tm_event_t event; fwdevent efwd; tlist_head lhead; svrattrl *psatl; attribute_def *pdef; struct passwd *check_pwd(); extern int resc_access_perm; int start_process(task *, char **, char **); u_long gettime(resource *); u_long getsize(resource *); memset(&efwd, 0, sizeof(efwd)); if (version != IM_PROTOCOL_VER) { sprintf(log_buffer, "protocol version %d unknown", version); log_err(-1, id, log_buffer); rpp_close(stream); return; } /* check that machine is known */ addr = rpp_getaddr(stream); ipaddr = ntohl(addr->sin_addr.s_addr); if (LOGLEVEL >= 3) { sprintf(log_buffer, "connect from %s", netaddr(addr)); log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); } if (tfind(ipaddr, &okclients) == NULL) { char tmpLine[1024]; tmpLine[0] = '\0'; tlist(okclients, tmpLine, 1024); sprintf(log_buffer, "bad connect from %s - unauthorized (okclients: %s)", netaddr(addr), tmpLine); log_err(-1, id, log_buffer); rpp_close(stream); return; } jobid = disrst(stream, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer,"request for failed - %s (jobid)", dis_emsg[ret]); log_err(-1,id,log_buffer); goto err; } cookie = disrst(stream, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer,"request for job %s failed - %s (cookie)", jobid, dis_emsg[ret]); log_err(-1,id,log_buffer); goto err; } command = disrsi(stream, &ret); if (ret != 
DIS_SUCCESS) { sprintf(log_buffer,"request for job %s failed - %s (command)", jobid, dis_emsg[ret]); log_err(-1,id,log_buffer); goto err; } event = disrsi(stream, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer,"%s request for job %s failed - %s (event)", PMOMCommand[MIN(command,IM_MAX)], jobid, dis_emsg[ret]); log_err(-1,id,log_buffer); goto err; } fromtask = disrsi(stream, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer,"%s request for job %s failed - %s (fromtask)", PMOMCommand[MIN(command,IM_MAX)], jobid, dis_emsg[ret]); log_err(-1,id,log_buffer); goto err; } if (LOGLEVEL >= 3) { sprintf(log_buffer,"received request '%s' for job %s from %s", PMOMCommand[MIN(command,IM_MAX)], jobid, netaddr(addr)); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); DBPRT(("%s\n", log_buffer)); } switch (command) { case IM_JOIN_JOB: /* ** Sender is mother superior sending a job structure to me. ** I am going to become a member of a job. ** ** auxiliary info ( ** localnode id int; ** number of nodes int; ** stdout port int; ** stderr port int; ** nodeid 0 int; ** ... ** nodeid n-1 int; ** jobattrs attrl; ** ) */ reply = 1; if (check_ms(stream, NULL)) goto fini; nodeid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer,"join_job request for job %s failed - %s (nodeid)", jobid, dis_emsg[ret]); log_err(-1, id, log_buffer); goto err; } nodenum = disrsi(stream, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer, "join_job request from node %d for job %s failed - %s (nodenum)", nodeid, jobid, dis_emsg[ret]); log_err(-1, id, log_buffer); goto err; } if (LOGLEVEL >= 3) { sprintf(log_buffer, "%s: JOIN_JOB %s node %d", id, jobid, nodeid); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } np = NULL; /* does job already exist? 
*/ pjob = find_job(jobid); if (pjob != NULL) { /* job already exists locally */ if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_PRERUN) { if (LOGLEVEL >= 3) { /* if peer mom times out, MS will send new join request for same job */ sprintf(log_buffer, "WARNING: duplicate JOIN request %s from %s (purging previous pjob)", PMOMCommand[MIN(command,IM_MAX)], netaddr(addr)); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } job_purge(pjob); } else { if (LOGLEVEL >= 0) { sprintf(log_buffer, "ERROR: received request '%s' from %s (job already exists locally)", PMOMCommand[MIN(command,IM_MAX)], netaddr(addr)); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } /* should local job be purged, ie 'job_purge(pjob);' ? */ SEND_ERR(PBSE_JOBEXIST) goto done; } } /* END if (pjob != NULL) */ if ((pjob = job_alloc()) == NULL) { /* out of memory */ log_err(-1, id, "insufficient memory to create job"); SEND_ERR(PBSE_SYSTEM) goto done; } pjob->ji_stdout = disrsi(stream, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer, "%s: join_job request to node %d for job %s failed - %s (stdout)", id, nodeid, jobid, dis_emsg[ret]); log_err(-1, id, log_buffer); goto err; } pjob->ji_stderr = disrsi(stream, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer, "%s: join_job request to node %d for job %s failed - %s (stderr)", id, nodeid, jobid, dis_emsg[ret]); log_err(-1, id, log_buffer); goto err; } pjob->ji_numnodes = nodenum; /* XXX */ CLEAR_HEAD(lhead); if (decode_DIS_svrattrl(stream, &lhead) != DIS_SUCCESS) { sprintf(log_buffer, "%s: join_job request to node %d for job %s failed - %s (decode)", id, nodeid, jobid, dis_emsg[ret]); log_err(-1, id, log_buffer); goto err; } /* Get the hashname from the attribute. 
*/ psatl = (svrattrl *)GET_NEXT(lhead); while (psatl) { if (!strcmp(psatl->al_name, ATTR_hashname)) { strcpy(basename, psatl->al_value); break; } psatl = (svrattrl *)GET_NEXT(psatl->al_link); } strcpy(pjob->ji_qs.ji_jobid, jobid); strcpy(pjob->ji_qs.ji_fileprefix, basename); pjob->ji_modified = 1; pjob->ji_nodeid = nodeid; pjob->ji_qs.ji_svrflags = 0; pjob->ji_qs.ji_un_type = JOB_UNION_TYPE_MOM; /* decode attributes from request into job structure */ errcode = 0; resc_access_perm = READ_WRITE; for (psatl = (svrattrl *)GET_NEXT(lhead); psatl; psatl = (svrattrl *)GET_NEXT(psatl->al_link)) { /* identify the attribute by name */ index = find_attr(job_attr_def, psatl->al_name, JOB_ATR_LAST); if (index < 0) { /* didn`t recognize the name */ errcode = PBSE_NOATTR; break; } pdef = &job_attr_def[index]; /* decode attribute */ errcode = pdef->at_decode( &pjob->ji_wattr[index], psatl->al_name, psatl->al_resc, psatl->al_value); if (errcode != 0) break; } /* END for (psatl) */ free_attrlist(&lhead); if (errcode != 0) { if (LOGLEVEL >= 6) { sprintf(log_buffer, "error %d received in joinjob - purging job", errcode); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } job_purge(pjob); SEND_ERR(errcode) goto done; } job_nodes(pjob); /* set remaining job structure elements */ pjob->ji_qs.ji_state = JOB_STATE_TRANSIT; pjob->ji_qs.ji_substate = JOB_SUBSTATE_PRERUN; pjob->ji_qs.ji_stime = time_now; pjob->ji_wattr[(int)JOB_ATR_mtime].at_val.at_long = (long)time_now; pjob->ji_wattr[(int)JOB_ATR_mtime].at_flags |= ATR_VFLAG_SET; /* check_pwd is setting up ji_un as type MOM pjob->ji_qs.ji_un_type = JOB_UNION_TYPE_NEW; pjob->ji_qs.ji_un.ji_newt.ji_fromsock = -1; pjob->ji_qs.ji_un.ji_newt.ji_fromaddr = addr->sin_addr.s_addr; pjob->ji_qs.ji_un.ji_newt.ji_scriptsz = 0; */ if (check_pwd(pjob) == NULL) { /* log_buffer populated in check_pwd() */ LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); job_purge(pjob); SEND_ERR(PBSE_BADUSER) 
goto done; } /* should we make a tmpdir? */ if (TTmpDirName(pjob, namebuf)) { if (!TMakeTmpDir(pjob, namebuf)) { LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "cannot create tmp dir"); job_purge(pjob); SEND_ERR(PBSE_BADUSER) goto done; } } #ifdef PENABLE_LINUX26_CPUSETS if (use_cpusets(pjob) == TRUE) { sprintf(log_buffer, "about to create cpuset for job %s.\n", pjob->ji_qs.ji_jobid); log_ext(-1, id, log_buffer, LOG_INFO); if (create_jobset(pjob) == FAILURE) { sprintf(log_buffer, "Could not create cpuset for job %s.\n", pjob->ji_qs.ji_jobid); log_err(-1, id, log_buffer); } } #endif /* (PENABLE_LINUX26_CPUSETS) */ /* run local prolog */ if ((j = run_pelog( PE_PROLOG, path_prologp, pjob, PE_IO_TYPE_ASIS)) != 0) { DBPRT(("cannot run local prolog '%s': %s (rc: %d)\n", path_prologp, log_buffer, j)); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); job_purge(pjob); SEND_ERR(PBSE_SYSTEM) goto done; } /* run user prolog */ if ((j = run_pelog( PE_PROLOGUSER, path_prologuserp, pjob, PE_IO_TYPE_ASIS)) != 0) { DBPRT(("cannot run local user prolog '%s': %s (rc: %d)\n", path_prologuserp, log_buffer, j)); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); job_purge(pjob); SEND_ERR(PBSE_SYSTEM) goto done; } #if IBM_SP2==2 /* IBM SP with PSSP 3.1 */ if (load_sp_switch(pjob) != 0) { job_purge(pjob); log_err(-1, id, "cannot load sp switch table"); SEND_ERR(PBSE_SYSTEM) goto done; } #endif /* IBM SP */ #ifdef NVIDIA_GPUS if (setup_gpus_for_job(pjob) == -1) { job_purge(pjob); log_err(-1, id, "cannot set up gpus"); SEND_ERR(PBSE_SYSTEM) goto done; } #endif /* NVIDIA_GPUS */ job_save(pjob, SAVEJOB_FULL); sprintf(log_buffer, "JOIN JOB as node %d", nodeid); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); /* ** if certain resource limits require that the job usage be ** polled, we link the job to mom_polljobs. 
** ** NOTE: we overload the job field ji_jobque for this as it ** is not used otherwise by MOM */ if (mom_do_poll(pjob)) append_link(&mom_polljobs, &pjob->ji_jobque, pjob); append_link(&svr_alljobs, &pjob->ji_alljobs, pjob); ret = im_compose( stream, jobid, cookie, IM_ALL_OKAY, event, fromtask); goto done; /*NOTREACHED*/ break; /* END IM_JOIN_JOB */ case IM_ALL_OKAY: case IM_ERROR: reply = 0; break; default: reply = 1; break; } /* END switch (command) */ np = NULL; /* ** Check if job already exists. */ if ((pjob = find_job(jobid)) == NULL) { if (LOGLEVEL >= 0) { sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' (job does not exist locally)", PMOMCommand[MIN(command,IM_MAX)], netaddr(addr), jobid); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } SEND_ERR(PBSE_JOBEXIST) goto done; } /* check cookie */ if (!(pjob->ji_wattr[(int)JOB_ATR_Cookie].at_flags & ATR_VFLAG_SET)) { if (LOGLEVEL >= 0) { sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' (job has no cookie)", PMOMCommand[MIN(command,IM_MAX)], netaddr(addr), jobid); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } SEND_ERR(PBSE_BADSTATE) goto done; } oreo = pjob->ji_wattr[(int)JOB_ATR_Cookie].at_val.at_str; if (strcmp(oreo, cookie) != 0) { if (LOGLEVEL >= 0) { sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' (job has corrupt cookie - '%s' != '%s')", PMOMCommand[MIN(command,IM_MAX)], netaddr(addr), jobid, oreo, cookie); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } SEND_ERR(PBSE_BADSTATE) goto done; } /* ** This is some processing needed that is common between ** both kinds of reply. 
*/ if (reply == 0) { for (nodeidx = 0;nodeidx < pjob->ji_numnodes;nodeidx++) { np = &pjob->ji_hosts[nodeidx]; if (np->hn_stream == stream) break; } if (nodeidx == pjob->ji_numnodes) { sprintf(log_buffer, "stream %d not found", stream); log_err(-1, id, log_buffer); goto err; } ep = (eventent *)GET_NEXT(np->hn_events); while (ep != NULL) { if ((ep->ee_event == event) && (ep->ee_taskid == fromtask)) break; ep = (eventent *)GET_NEXT(ep->ee_next); } if (ep == NULL) { sprintf(log_buffer, "event %d taskid %ld not found", event, (long)fromtask); log_err(-1, id, log_buffer); goto err; } efwd = ep->ee_forward; event_com = ep->ee_command; event_task = ep->ee_taskid; argv = ep->ee_argv; envp = ep->ee_envp; delete_link(&ep->ee_next); free(ep); } /* END if (reply == 0) */ switch (command) { case IM_KILL_JOB: /* ** Sender is (must be) mom superior commanding me to kill a ** job which I should be a part of. ** Send a signal and set the jobstate to begin the ** kill. We wait for all tasks to exit before sending ** an obit to mother superior. ** ** auxiliary info ( ** none; ** ) */ if (check_ms(stream, pjob)) goto fini; /* ** Send the jobs a signal but we have to wait to ** do a reply to mother superior until the procs ** die and are reaped. */ reply = 0; kill_job(pjob, SIGKILL, id, "kill_job message received"); pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; pjob->ji_obit = event; job_save(pjob, SAVEJOB_QUICK); exiting_tasks = 1; break; case IM_SPAWN_TASK: /* ** Sender is a MOM in a job that wants to start a task. ** I am MOM on the node that is to run the task. ** ** auxiliary info ( ** parent node tm_node_id ** task id tm_task_id ** global id string ** argv 0 string ** ... ** argv n string ** null ** envp 0 string ** ... 
** envp m string ** ) */ nodeid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; if ((np = find_node(pjob, stream, nodeid)) == NULL) { SEND_ERR(PBSE_BADHOST) break; } taskid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; globid = disrst(stream, &ret); if (ret != DIS_SUCCESS) goto err; if (LOGLEVEL >= 3) { sprintf(log_buffer, "INFO: received request '%s' from %s for job '%s' (spawning task on node '%d' with taskid=%d, globid='%s'", PMOMCommand[MIN(command,IM_MAX)], netaddr(addr), jobid, nodeid, taskid, globid); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } if (pjob->ji_globid == NULL) { pjob->ji_globid = globid; } else if (strcmp(pjob->ji_globid, noglobid) == 0) { free(pjob->ji_globid); pjob->ji_globid = globid; } else if (strcmp(pjob->ji_globid, globid) != 0) { DBPRT(("%s: globid job %s received %s\n", id, pjob->ji_globid, globid)) free(globid); } num = 4; argv = (char **)calloc(sizeof(char **), num); assert(argv); for (i = 0;;i++) { if ((cp = disrst(stream, &ret)) == NULL) break; if (ret != DIS_SUCCESS) break; if (*cp == '\0') { free(cp); break; } if (i == num - 1) { char **tmpArgV; num *= 2; tmpArgV = (char **)realloc(argv,num * sizeof(char **)); if (tmpArgV == NULL) goto err; argv = tmpArgV; } argv[i] = cp; } /* END for (i) */ argv[i] = NULL; if (ret != DIS_SUCCESS) { arrayfree(argv); goto err; } num = 8; envp = (char **)calloc(sizeof(char **), num); assert(envp); for (i = 0;;i++) { if ((cp = disrst(stream, &ret)) == NULL) break; if (ret != DIS_SUCCESS) break; if (*cp == '\0') { free(cp); break; } if (i == num - 1) { num *= 2; envp = (char **)realloc(envp, num * sizeof(char **)); assert(envp); } envp[i] = cp; } /* END for (i) */ envp[i] = NULL; if (ret != DIS_EOD) { arrayfree(argv); arrayfree(envp); goto err; } /* ** do the spawn */ ret = DIS_SUCCESS; if ((ptask = pbs_task_create(pjob, taskid)) == NULL) { if (LOGLEVEL >= 0) { sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' (cannot create task)", 
PMOMCommand[MIN(command,IM_MAX)], netaddr(addr), jobid); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } SEND_ERR(PBSE_SYSTEM); arrayfree(argv); arrayfree(envp); break; } strcpy(ptask->ti_qs.ti_parentjobid, jobid); ptask->ti_qs.ti_parentnode = nodeid; ptask->ti_qs.ti_parenttask = fromtask; if (LOGLEVEL >= 6) { LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, "saving task (IM_SPAWN_TASK)"); } if (task_save(ptask) == -1) { if (LOGLEVEL >= 0) { sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' (cannot save task)", PMOMCommand[MIN(command,IM_MAX)], netaddr(addr), jobid); LOG_EVENT( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, jobid, log_buffer); } SEND_ERR(PBSE_SYSTEM) arrayfree(argv); arrayfree(envp); break; } if (start_process(ptask, argv, envp) == -1) { if (LOGLEVEL >= 0) { sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' (cannot start task)", PMOMCommand[MIN(command,IM_MAX)], netaddr(addr), jobid); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } SEND_ERR(TM_ESYSTEM) arrayfree(argv); arrayfree(envp); break; } ret = im_compose( stream, jobid, cookie, IM_ALL_OKAY, event, fromtask); if (ret != DIS_SUCCESS) { /* SUCCESS but cannot reply */ if (LOGLEVEL >= 0) { sprintf(log_buffer, "ALERT: received request '%s' from %s for job '%s' (task successfully started but reply failed)", PMOMCommand[MIN(command,IM_MAX)], netaddr(addr), jobid); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } arrayfree(argv); arrayfree(envp); break; } /* SUCCESS */ ret = diswsi(stream, ptask->ti_qs.ti_task); arrayfree(argv); arrayfree(envp); break; case IM_GET_TASKS: /* ** Sender is MOM which controls a task that wants to get ** the list of tasks running here. 
** ** auxiliary info ( ** sending node tm_node_id; ** ) */ nodeid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; DBPRT(("%s: GET_TASKS %s from node %d\n", id, jobid, nodeid)) if ((np = find_node(pjob, stream, nodeid)) == NULL) { SEND_ERR(PBSE_BADHOST) break; } ret = im_compose( stream, jobid, cookie, IM_ALL_OKAY, event, fromtask); if (ret != DIS_SUCCESS) break; for (ptask = (task *)GET_NEXT(pjob->ji_tasks);ptask != NULL;ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { ret = diswsi(stream, ptask->ti_qs.ti_task); if (ret != DIS_SUCCESS) break; } break; case IM_SIGNAL_TASK: /* ** Sender is MOM sending a task and signal to ** deliver. If taskid is 0, signal all tasks. ** ** auxiliary info ( ** sending node tm_node_id; ** taskid tm_task_id; ** signal int; ** ) */ nodeid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; if ((np = find_node(pjob, stream, nodeid)) == NULL) { SEND_ERR(PBSE_BADHOST) break; } taskid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; sig = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; if (taskid == 0) { DBPRT(("%s: SIGNAL_TASK %s from node %d all tasks signal %d\n", id, jobid, nodeid, sig)) for ( ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { kill_task(ptask, sig, 0); } /* if STOPing all tasks, we're obviously suspending the job */ if (sig == SIGSTOP) { pjob->ji_qs.ji_substate = JOB_SUBSTATE_SUSPEND; pjob->ji_qs.ji_svrflags |= JOB_SVFLG_Suspend; } else if (sig == SIGCONT) { pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING; pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_Suspend; } } else { DBPRT(("%s: SIGNAL_TASK %s from node %d task %d signal %d\n", id, jobid, nodeid, taskid, sig)) ptask = task_find(pjob, taskid); if (ptask == NULL) { SEND_ERR(PBSE_JOBEXIST) break; } sprintf(log_buffer, "%s: SIGNAL_TASK %s from node %d task %d signal %d", id, jobid, nodeid, taskid, sig); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); kill_task(ptask, sig, 0); } 
ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY, event, fromtask); break; case IM_OBIT_TASK: /* ** Sender is MOM sending a request to monitor a ** task for exit. ** ** auxiliary info ( ** sending node tm_node_id; ** taskid tm_task_id; ** ) */ nodeid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; if ((np = find_node(pjob, stream, nodeid)) == NULL) { SEND_ERR(PBSE_BADHOST) break; } taskid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; ptask = task_find(pjob, taskid); if (ptask == NULL) { SEND_ERR(PBSE_JOBEXIST) break; } DBPRT(("%s: OBIT_TASK %s from node %d task %d\n", id, jobid, nodeid, taskid)) if (ptask->ti_qs.ti_status >= TI_STATE_EXITED) { ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY, event, fromtask); if (ret != DIS_SUCCESS) break; ret = diswsi(stream, ptask->ti_qs.ti_exitstat); } else { /* save obit request with task */ obitent *op = (obitent *)malloc(sizeof(obitent)); assert(op); CLEAR_LINK(op->oe_next); append_link(&ptask->ti_obits, &op->oe_next, op); op->oe_info.fe_node = nodeid; op->oe_info.fe_event = event; op->oe_info.fe_taskid = fromtask; reply = 0; } break; case IM_GET_INFO: /* ** Sender is MOM sending a task and name to lookup ** for info to report back. 
** ** auxiliary info ( ** sending node tm_node_id; ** taskid tm_task_id; ** name string; ** ) */ nodeid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; if ((np = find_node(pjob, stream, nodeid)) == NULL) { SEND_ERR(PBSE_BADHOST) break; } taskid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; ptask = task_find(pjob, taskid); if (ptask == NULL) { SEND_ERR(PBSE_JOBEXIST) break; } name = disrst(stream, &ret); if (ret != DIS_SUCCESS) goto err; DBPRT(("%s: GET_INFO %s from node %d task %d name %s\n", id, jobid, nodeid, taskid, name)) if ((ip = task_findinfo(ptask, name)) == NULL) { SEND_ERR(PBSE_JOBEXIST) break; } ret = im_compose( stream, jobid, cookie, IM_ALL_OKAY, event, fromtask); if (ret != DIS_SUCCESS) break; ret = diswcs(stream, ip->ie_info, ip->ie_len); break; case IM_GET_RESC: /* ** Sender is MOM requesting resource info to ** report back its client. ** ** auxiliary info ( ** sending node tm_node_id; ** ) */ nodeid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; if ((np = find_node(pjob, stream, nodeid)) == NULL) { SEND_ERR(PBSE_BADHOST) break; } DBPRT(("%s: GET_RESC %s from node %d\n", id, jobid, nodeid)) info = resc_string(pjob); ret = im_compose( stream, jobid, cookie, IM_ALL_OKAY, event, fromtask); if (ret != DIS_SUCCESS) break; ret = diswst(stream, info); free(info); break; case IM_POLL_JOB: /* ** Sender is (must be) mom superior commanding me to send ** information for a job which I should be a part of. ** ** auxiliary info ( ** none; ** ) */ if (check_ms(stream, pjob)) goto fini; DBPRT(("%s: POLL_JOB %s\n", id, jobid)) ret = im_compose( stream, jobid, cookie, IM_ALL_OKAY, event, fromtask); if (ret != DIS_SUCCESS) break; if (pjob->ji_qs.ji_state == JOB_STATE_TRANSIT) { /* first poll, set job to running */ pjob->ji_qs.ji_state = JOB_STATE_RUNNING; pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING; job_save(pjob, SAVEJOB_QUICK); } /* Now comes a recomendation for killing the job. 
*/ exitval = (pjob->ji_qs.ji_svrflags & (JOB_SVFLG_OVERLMT1 | JOB_SVFLG_OVERLMT2)) ? 1 : 0; ret = diswsi(stream, exitval); if (ret != DIS_SUCCESS) break; /* get fresh resource usage */ mom_set_use(pjob); /* ** Send the information tallied for the job. */ ret = diswul(stream, resc_used(pjob, "cput", gettime)); if (ret != DIS_SUCCESS) break; ret = diswul(stream, resc_used(pjob, "mem", getsize)); if (ret != DIS_SUCCESS) break; ret = diswul(stream, resc_used(pjob, "vmem", getsize)); break; case IM_ABORT_JOB: /* ** Sender is (must be) mom superior commanding me to ** abort a JOIN_JOB request. ** ** auxiliary info ( ** none; ** ) */ if (check_ms(stream, pjob)) { if (LOGLEVEL >= 0) { sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' (requestor is not parent)", PMOMCommand[MIN(command,IM_MAX)], netaddr(addr), jobid); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } goto fini; } if (LOGLEVEL >= 2) { sprintf(log_buffer, "%s: received KILL/ABORT request for job %s from node %s", id, jobid, netaddr(addr)); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } reply = 0; job_purge(pjob); break; case IM_GET_TID: /* * I must be mom superior getting a request from a * sub-mom to get a TID. * * auxiliary info ( * none; * ) */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) { log_err(-1, id, "got GET_TID and I'm not MS"); goto err; } DBPRT(("%s: GET_TID %s\n", id, jobid)) /* DJH 27 Feb 2002 */ if (IS_ADOPTED_TASK(pjob->ji_taskid)) { log_err(-1, id, "Ran into reserved task ids"); goto err; } ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY, event, fromtask); if (ret != DIS_SUCCESS) break; ret = diswsi(stream, pjob->ji_taskid++); break; case IM_ALL_OKAY: /* this is a REPLY */ /* ** Sender is another MOM telling me that a request has ** completed successfully. */ switch (event_com) { case IM_JOIN_JOB: /* ** Sender is one of the sisterhood saying she ** got the job structure sent and she accepts it. ** I'm mother superior. 
** ** auxiliary info ( ** none; ** ) */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) { log_err(-1, id, "got JOIN_JOB OKAY and I'm not MS"); goto err; } for (i = 0;i < pjob->ji_numnodes;i++) { np = &pjob->ji_hosts[i]; if ((ep = (eventent *)GET_NEXT(np->hn_events)) != NULL) break; } /* END for (i) */ if (ep == NULL) { pjobexec_t *TJE = NULL; int SC; int RC; int Count; /* no events remaining, all moms have reported in, launch job locally */ if (LOGLEVEL >= 2) { LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "all sisters have reported in, launching job locally"); } TMOMJobGetStartInfo(NULL, &TJE); if (TMomFinalizeJob1(pjob, TJE, &SC) == FAILURE) { /* FAILURE (or at least do not continue) */ if (SC != 0) { memset(TJE, 0, sizeof(pjobexec_t)); exec_bail(pjob, SC); } break; } /* TMomFinalizeJob2() blocks until job is fully launched */ if (TMomFinalizeJob2(TJE, &SC) == FAILURE) { if (SC != 0) { memset(TJE, 0, sizeof(pjobexec_t)); exec_bail(pjob, SC); } break; } /* block, wait for child to complete indicating success/failure of job launch */ if (TMomCheckJobChild(TJE, TJobStartBlockTime, &Count, &RC) == FAILURE) { if (LOGLEVEL >= 3) { sprintf(log_buffer, "job not ready after %ld second timeout, MOM will check later", TJobStartBlockTime); log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } break; } /* NOTE: TMomFinalizeJob3() populates SC */ if (TMomFinalizeJob3(TJE, Count, RC, &SC) == FAILURE) { sprintf(log_buffer, "ALERT: job failed phase 3 start"); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); memset(TJE, 0, sizeof(pjobexec_t)); exec_bail(pjob, SC); break; } /* SUCCESS: MOM returns */ memset(TJE, 0, sizeof(pjobexec_t)); if (LOGLEVEL >= 3) { sprintf(log_buffer, "job successfully started"); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } } /* END if (ep == NULL) */ else { if (LOGLEVEL >= 4) { sprintf(log_buffer, "joinjob response received 
from node %s, (still waiting for %s)", netaddr(addr), np->hn_host); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } } break; case IM_KILL_JOB: /* ** Sender is sending a response that a job ** which needs to die has been given the ax. ** I'm mother superior. ** ** auxiliary info ( ** cput ulong; ** mem ulong; ** vmem ulong; ** ) */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) { log_err(-1, id, "got KILL_JOB OKAY and I'm not MS"); goto err; } if (LOGLEVEL >= 2) { sprintf(log_buffer, "KILL_JOB acknowledgement received"); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } if (pjob->ji_resources != NULL) { pjob->ji_resources[nodeidx - 1].nr_cput = disrul(stream, &ret); if (ret != DIS_SUCCESS) goto err; pjob->ji_resources[nodeidx - 1].nr_mem = disrul(stream, &ret); if (ret != DIS_SUCCESS) goto err; pjob->ji_resources[nodeidx - 1].nr_vmem = disrul(stream, &ret); if (ret != DIS_SUCCESS) goto err; DBPRT(("%s: %s FINAL from %d cpu %lu sec mem %lu kb vmem %ld kb\n", id, jobid, nodeidx, pjob->ji_resources[nodeidx - 1].nr_cput, pjob->ji_resources[nodeidx - 1].nr_mem, pjob->ji_resources[nodeidx - 1].nr_vmem)) } /* END if (pjob_ji_resources != NULL) */ /* don't close stream in case other jobs use it */ np->hn_sister = SISTER_KILLDONE; for (i = 1;i < pjob->ji_numnodes;i++) { if (pjob->ji_hosts[i].hn_sister == SISTER_OKAY) break; } if (i == pjob->ji_numnodes) { /* all dead */ DBPRT(("%s: ALL DONE, set EXITING job %s\n", id, jobid)) pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; job_save(pjob, SAVEJOB_QUICK); exiting_tasks = 1; } kill_done = 1; break; case IM_SPAWN_TASK: /* ** Sender is MOM responding to a "spawn_task" ** request. 
** ** auxiliary info ( ** task id tm_task_id; ** ) */ taskid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; DBPRT(("%s: SPAWN_TASK %s OKAY task %d\n", id, jobid, taskid)) ptask = task_check(pjob, event_task); if (ptask == NULL) break; tm_reply(ptask->ti_fd, TM_OKAY, event); diswsi(ptask->ti_fd, taskid); DIS_tcp_wflush(ptask->ti_fd); break; case IM_GET_TASKS: /* ** Sender is MOM giving a list of tasks which she ** has started for this job. ** ** auxiliary info ( ** task id tm_task_id; ** ... ** task id tm_task_id; ** ) */ DBPRT(("%s: GET_TASKS %s OKAY\n", id, jobid)) ptask = task_check(pjob, event_task); if (ptask == NULL) break; tm_reply(ptask->ti_fd, TM_OKAY, event); for (;;) { DIS_rpp_reset(); taskid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) { if (ret == DIS_EOD) break; goto err; } DIS_tcp_funcs(); diswsi(ptask->ti_fd, taskid); } DIS_tcp_funcs(); diswsi(ptask->ti_fd, TM_NULL_TASK); DIS_tcp_wflush(ptask->ti_fd); break; case IM_SIGNAL_TASK: /* ** Sender is MOM with a good signal to report. ** ** auxiliary info ( ** none; ** ) */ DBPRT(("%s: SIGNAL_TASK %s OKAY %d\n", id, jobid, event_task)) ptask = task_check(pjob, event_task); if (ptask == NULL) break; tm_reply(ptask->ti_fd, TM_OKAY, event); DIS_tcp_wflush(ptask->ti_fd); break; case IM_OBIT_TASK: /* ** Sender is MOM with a death report. ** ** auxiliary info ( ** exit value int; ** ) */ exitval = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; DBPRT(("%s: OBIT_TASK %s OKAY %d exit val %d\n", id, jobid, event_task, exitval)) ptask = task_check(pjob, event_task); if (ptask == NULL) break; tm_reply(ptask->ti_fd, TM_OKAY, event); diswsi(ptask->ti_fd, exitval); DIS_tcp_wflush(ptask->ti_fd); break; case IM_GET_INFO: /* ** Sender is MOM with a named info to report. 
** ** auxiliary info ( ** info counted string; ** ) */ info = disrcs(stream, &len, &ret); if (ret != DIS_SUCCESS) goto err; DBPRT(("%s: GET_INFO %s OKAY %d\n", id, jobid, event_task)) ptask = task_check(pjob, event_task); if (ptask == NULL) { free(info); break; } tm_reply(ptask->ti_fd, TM_OKAY, event); diswcs(ptask->ti_fd, info, len); DIS_tcp_wflush(ptask->ti_fd); break; case IM_GET_RESC: /* ** Sender is MOM with a resource info to report. ** ** auxiliary info ( ** info counted string; ** ) */ info = disrst(stream, &ret); if (ret != DIS_SUCCESS) goto err; DBPRT(("%s: GET_RESC %s OKAY %d\n", id, jobid, event_task)) ptask = task_check(pjob, event_task); if (ptask == NULL) { free(info); break; } tm_reply(ptask->ti_fd, TM_OKAY, event); diswst(ptask->ti_fd, info); DIS_tcp_wflush(ptask->ti_fd); break; case IM_POLL_JOB: /* ** I must be Mother Superior for the job and ** this is a reply with job resources to ** tally up. ** ** auxiliary info ( ** recommendation int; ** cput u_long; ** mem u_long; ** vmem u_long; ** ) */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) { log_err(-1, id, "got POLL_JOB and I'm not MS"); goto err; } exitval = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; pjob->ji_resources[nodeidx - 1].nr_cput = disrul(stream, &ret); if (ret != DIS_SUCCESS) goto err; pjob->ji_resources[nodeidx - 1].nr_mem = disrul(stream, &ret); if (ret != DIS_SUCCESS) goto err; pjob->ji_resources[nodeidx - 1].nr_vmem = disrul(stream, &ret); if (ret != DIS_SUCCESS) goto err; DBPRT(("%s: POLL_JOB %s OKAY kill %d cpu=%lu mem=%lu vmem=%lu\n", id, jobid, exitval, pjob->ji_resources[nodeidx - 1].nr_cput, pjob->ji_resources[nodeidx - 1].nr_mem, pjob->ji_resources[nodeidx - 1].nr_vmem)) if (exitval != 0) { if (LOGLEVEL >= 2) { sprintf(log_buffer, "non-zero exit status reported from node %s, aborting job", np->hn_host); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } pjob->ji_nodekill = np->hn_node; } break; case IM_GET_TID: /* ** 
Sender must be Mother Superior with a TID. ** I will either do the spawn or forward the SPAWN ** to the final destination. ** ** auxiliary info ( ** task id tm_task_id; ** ) */ if (check_ms(stream, pjob)) goto fini; taskid = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; /* ** Check to see if I need to forward the taskid ** to another MOM. */ if (pjob->ji_nodeid != efwd.fe_node) { np = find_node(pjob, -1, efwd.fe_node); if (np == NULL) goto done; ep = event_alloc( IM_SPAWN_TASK, np, efwd.fe_event, efwd.fe_taskid); ret = im_compose( np->hn_stream, jobid, cookie, IM_SPAWN_TASK, efwd.fe_event, efwd.fe_taskid); if (ret != DIS_SUCCESS) goto done; ret = diswsi(np->hn_stream, pjob->ji_nodeid); if (ret != DIS_SUCCESS) goto done; ret = diswsi(np->hn_stream, taskid); if (ret != DIS_SUCCESS) goto done; ret = diswst(np->hn_stream, pjob->ji_globid); if (ret != DIS_SUCCESS) goto done; for (i = 0;argv[i];i++) { ret = diswst(np->hn_stream, argv[i]); if (ret != DIS_SUCCESS) goto done; } ret = diswst(np->hn_stream, ""); if (ret != DIS_SUCCESS) goto done; for (i = 0;envp[i];i++) { ret = diswst(np->hn_stream, envp[i]); if (ret != DIS_SUCCESS) goto done; } ret = (rpp_flush(np->hn_stream) == -1) ? DIS_NOCOMMIT : DIS_SUCCESS; arrayfree(argv); arrayfree(envp); break; } /* END if (pjob->ji_nodeid != efwd.fe_node) */ /* It's me, do the spawn */ ret = 0; if ((ptask = pbs_task_create(pjob, taskid)) != NULL) { strcpy(ptask->ti_qs.ti_parentjobid, jobid); ptask->ti_qs.ti_parentnode = efwd.fe_node; ptask->ti_qs.ti_parenttask = efwd.fe_taskid; if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "saving task (IM_GET_TID)"); } if (task_save(ptask) != -1) ret = start_process(ptask, argv, envp); } arrayfree(argv); arrayfree(envp); taskid = ptask->ti_qs.ti_task; ptask = task_check(pjob, efwd.fe_taskid); if (ptask == NULL) break; tm_reply( ptask->ti_fd, (ret == -1) ? TM_ERROR : TM_OKAY, efwd.fe_event); diswsi( ptask->ti_fd, (int)(ret == -1 ? 
TM_ESYSTEM : taskid)); DIS_tcp_wflush(ptask->ti_fd); break; default: sprintf(log_buffer, "unknown request type %d saved", event_com); log_err(-1, id, log_buffer); break; } /* END switch (event_com) */ break; case IM_ERROR: /* this is a REPLY */ /* ** Sender is responding to a request with an error code. ** ** auxiliary info ( ** error value int; ** ) */ errcode = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; switch (event_com) { case IM_JOIN_JOB: /* ** A MOM has rejected a request to join a job. ** We need to send a ABORT_JOB to all the sisterhood ** and fail the job start to server. ** I'm mother superior. */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) { log_err(-1, id, "JOIN_JOB ERROR and I'm not MS"); goto err; } DBPRT(("%s: JOIN_JOB %s returned ERROR %d\n", id, jobid, errcode)) job_start_error(pjob, errcode, netaddr(addr)); break; case IM_ABORT_JOB: case IM_KILL_JOB: /* ** Job cleanup failed on a sister. ** Wait for everybody to respond then finish. ** I'm mother superior. */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) { /* log_err(-1,id,"KILL_JOB %s ERROR and I'm not MS"); */ log_err(-1, id, "KILL_JOB ERROR and I'm not MS"); goto err; } if (LOGLEVEL >= 1) { char tmpLine[1024]; sprintf(tmpLine, "KILL/ABORT request for job %s returned error %d\n", jobid, errcode); log_err(errcode, id, tmpLine); } np->hn_sister = errcode ? errcode : SISTER_KILLDONE; for (i = 1;i < pjob->ji_numnodes;i++) { if (pjob->ji_hosts[i].hn_sister == SISTER_OKAY) break; } if (i == pjob->ji_numnodes) { /* all dead */ pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; job_save(pjob, SAVEJOB_QUICK); exiting_tasks = 1; } break; case IM_SPAWN_TASK: case IM_GET_TASKS: case IM_SIGNAL_TASK: case IM_OBIT_TASK: case IM_GET_INFO: /* A user attempt failed, inform process. 
*/ DBPRT(("%s: REQUEST %d %s returned ERROR %d\n", id, event_com, jobid, errcode)) ptask = task_check(pjob, event_task); if (ptask == NULL) break; tm_reply(ptask->ti_fd, TM_ERROR, event); diswsi(ptask->ti_fd, errcode); DIS_tcp_wflush(ptask->ti_fd); break; case IM_POLL_JOB: /* ** I must be Mother Superior for the job and ** this is an error reply to a poll request. */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) { /* log_err(-1,id,"POLL_JOB %s ERROR and I'm not MS"); */ log_err(-1, id, "POLL_JOB ERROR and I'm not MS"); goto err; } DBPRT(("%s: POLL_JOB %s returned ERROR %d\n", id, jobid, errcode)) np->hn_sister = errcode ? errcode : SISTER_BADPOLL; break; case IM_GET_TID: /* ** Sender must be Mother Superior failing to ** send a TID. ** Send a fail to the task which called SPAWN. */ if (check_ms(stream, pjob)) goto fini; DBPRT(("%s: GET_TID %s returned ERROR %d\n", id, jobid, errcode)) arrayfree(argv); arrayfree(envp); ptask = task_check(pjob, efwd.fe_taskid); if (ptask == NULL) break; tm_reply(ptask->ti_fd, TM_ERROR, efwd.fe_event); diswsi(ptask->ti_fd, errcode); DIS_tcp_wflush(ptask->ti_fd); break; default: sprintf(log_buffer, "unknown command %d error", event_com); log_err(-1, id, log_buffer); goto err; /*NOTREACHED*/ break; } /* END switch(event_com) */ break; default: sprintf(log_buffer, "unknown command %d sent", command); log_err(-1, id, log_buffer); goto err; /*NOTREACHED*/ break; } /* END switch (Command) */ done: rpp_eom(stream); if (reply) { /* check if write worked */ if ((ret != DIS_SUCCESS) || (rpp_flush(stream) == -1)) { log_err(errno, id, "rpp_flush"); rpp_close(stream); kill_done = 0; if ((np != NULL) && (np->hn_stream == stream)) np->hn_stream = -1; } } if(kill_done) { rpp_close(stream); } goto fini; err: /* ** We come here if we got a DIS read error or a protocol ** element is missing. The likely case is the remote ** host has gone down. */ sprintf(log_buffer, "error sending command %d to job %s", command, jobid ? 
jobid : "unknown"); log_err(-1, id, log_buffer); im_eof(stream, ret); fini: if (jobid != NULL) free(jobid); if (cookie != NULL) free(cookie); return; } /* END im_request() */ void tm_eof( int fd) { job *pjob; task *ptask; char *id = "tm_eof"; /* ** Search though all the jobs looking for this fd. */ for ( pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { for ( ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { if (ptask->ti_fd == fd) { if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "matching task located, marking interface closed"); } ptask->ti_fd = -1; return; } } } if (LOGLEVEL >= 1) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_SERVER, id, "no matching task found"); } return; } /* END tm_eof() */ /* ** Input is coming from a process running on this host which ** should be part of one of the jobs I am part of. The i/o ** will take place using DIS over a tcp fd. ** ** Read the stream to get a task manager request. Format the reply ** and write it back. ** ** read ( ** jobid string ** cookie string ** command int ** event int ** from taskid int ** ) ** ** ** tm_requests only use tcp. No rpp. 
*/

/*
 * tm_request - read one task manager request from fd and act on it.
 *
 *   fd      - TCP descriptor; indexes svr_conn[] and is used for all
 *             DIS reads and writes done here
 *   version - protocol version sent by the client; anything other than
 *             TM_PROTOCOL_VER is refused
 *
 * Returns 0 when the request was handled (either a reply was flushed to
 * fd, or the request was forwarded / queued and the reply is deferred).
 * Returns -1, after close_conn(fd), when the request is refused or the
 * reply could not be written.
 *
 * jobid and cookie are allocated by disrst() and freed on every exit path.
 */

int tm_request(

    int fd,
    int version)

{
    char        *id = "tm_request";
    int          command, reply = 0;
    int          ret = DIS_SUCCESS;
    char        *jobid = NULL;
    char        *cookie = NULL;
    char        *oreo;
    job         *pjob;
    task        *ptask;
    vnodent     *pnode;
    hnodent     *phost;
    int          i, event, numele;
    size_t       len;
    long         ipadd;
    char       **argv, **envp;
    char        *name, *info;
    eventent    *ep;
    infoent     *ip;
    int          signum;
    int          vnodenum;
    int          prev_error = 0;
    tm_node_id   nodeid;
    tm_task_id   taskid = 0, fromtask;
    attribute   *at;

    extern u_long localaddr;

    extern struct connection svr_conn[];

    int start_process(task *ptask, char **argv, char **envp);

    if (svr_conn[fd].cn_addr != localaddr) {
        sprintf(log_buffer, "non-local connect");

        goto err;
    }

    if (version != TM_PROTOCOL_VER) {
        sprintf(log_buffer, "bad protocol version %d", version);

        goto err;
    }

    /* request header: jobid, cookie, command, event, from-task */

    jobid = disrst(fd, &ret);

    if (ret != DIS_SUCCESS)
        goto err;

    cookie = disrst(fd, &ret);

    if (ret != DIS_SUCCESS)
        goto err;

    command = disrsi(fd, &ret);

    if (ret != DIS_SUCCESS)
        goto err;

    event = disrsi(fd, &ret);

    if (ret != DIS_SUCCESS)
        goto err;

    fromtask = disrui(fd, &ret);

    if (ret != DIS_SUCCESS)
        goto err;

    DBPRT(("%s: job %s cookie %s task %d com %d event %d\n",
           id, jobid, cookie, fromtask, command, event))

    /*
     * Allow a non-PBS process to be adopted
     * by PBS for resource accounting and possibly management
     * purposes.  Note that this circumvents much of the protocol,
     * cookie checks etc.  See adoptSession() for more info
     * DJH 26 Feb 2002.  Distinguish between jobid and altid
     * adoptions - see adoptSession()
     */

    if ((command == TM_ADOPT_ALTID) || (command == TM_ADOPT_JOBID)) {
        pid_t  sid;
        char  *adopt_id = NULL;   /* job id or alt id to adopt into */
        int    adoptStatus;

        reply = TRUE;

        /* Read the session id and alt/job id from tm_adopt() */

        sid = disrsi(fd, &ret);

        if (ret != DIS_SUCCESS)
            goto err;

        adopt_id = disrst(fd, &ret);

        if (ret != DIS_SUCCESS) {
            if (adopt_id)
                free(adopt_id);

            goto err;
        }

        /* Got all the info.  Try to adopt the session */

        adoptStatus = adoptSession(sid, adopt_id, command, cookie);

        if (adopt_id)
            free(adopt_id);

        /* Let the tm_adopt() call know if it was adopted or not.
           This is synchronous - doesn't use the event stuff. */

        DIS_tcp_funcs(); /* do I really need this? */

        ret = diswsi(fd, adoptStatus);

        if (ret != DIS_SUCCESS)
            goto err;

        goto done;
    }

    /* verify the jobid is known and the cookie matches */

    if ((pjob = find_job(jobid)) == NULL) {
        /* jobid is unvalidated client data here: bound what goes into
           the fixed-size log_buffer */
        sprintf(log_buffer, "job %.128s not found", jobid);

        goto err;
    }

    if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING) {
        sprintf(log_buffer, "job %s not running", jobid);

        goto err;
    }

    at = &pjob->ji_wattr[(int)JOB_ATR_Cookie];

    if (!(at->at_flags & ATR_VFLAG_SET)) {
        sprintf(log_buffer, "job %s has no cookie", jobid);

        goto err;
    }

    oreo = at->at_val.at_str;

    if (strcmp(oreo, cookie) != 0) {
        /* the received cookie is unvalidated client data: bound it */
        sprintf(log_buffer, "job %s cookie %s message %.128s",
                jobid, oreo, cookie);

        goto err;
    }

    /* verify this taskid is my baby */

    ptask = task_find(pjob, fromtask);

    if (ptask == NULL) {
        /* not found */

        /* the error reply composed below must reach the client, so mark
           it for flushing at done: */
        reply = TRUE;

        sprintf(log_buffer, "task %d in job %s not found", fromtask, jobid);

        log_err(-1, id, log_buffer);

        ret = tm_reply(fd, TM_ERROR, event);

        if (ret != DIS_SUCCESS)
            goto done;

        ret = diswsi(fd, TM_ENOTFOUND);

        if (ret != DIS_SUCCESS)
            goto done;

        prev_error = 1;

        /*
         * ANUPBS - DBS 21/10/02
         * Leave now: everything below dereferences ptask.
         */

        goto done;
    } else if ((ptask->ti_fd != -1) && (ptask->ti_fd != fd)) {
        /* someone is already connected, create a new task for the new conn */

        ptask = pbs_task_create(pjob, TM_NULL_TASK);

        if (ptask == NULL) {
            /* err: logs log_buffer, so give it something meaningful */
            sprintf(log_buffer, "cannot create task for job %s", jobid);

            goto err;
        }

        strcpy(ptask->ti_qs.ti_parentjobid, jobid);

        ptask->ti_qs.ti_parentnode = pjob->ji_nodeid;
        ptask->ti_qs.ti_parenttask = fromtask;

        /* the initial connection is "from" task 1, we set this to not
           confuse the new connection with the old */

        fromtask = ptask->ti_qs.ti_task;

        if (LOGLEVEL >= 6) {
            log_record(
                PBSEVENT_ERROR,
                PBS_EVENTCLASS_JOB,
                pjob->ji_qs.ji_jobid,
                "saving task (additional connection)");
        }

        if (task_save(ptask) == -1) {
            sprintf(log_buffer, "cannot save task for job %s", jobid);

            goto err;
        }
    }

    svr_conn[fd].cn_oncl = tm_eof;

    ptask->ti_fd = fd;

    reply = TRUE;

    /* set no timeout so connection is not closed for being idle */

    svr_conn[fd].cn_authen |= PBS_NET_CONN_NOTIMEOUT;

    /* commands that do not carry a node number */

    switch (command) {
    case TM_INIT:

        /*
        ** A request to initialize.  Must be the first
        ** thing we see from a task to do psched requests.
        */

        DBPRT(("%s: INIT %s\n", id, jobid))

        if (prev_error)
            goto done;

        ret = tm_reply(fd, TM_OKAY, event);

        if (ret != DIS_SUCCESS)
            goto done;

        vnodenum = pjob->ji_numvnod;

        ret = diswui(fd, vnodenum); /* num nodes */

        if (ret != DIS_SUCCESS)
            goto done;

        pnode = pjob->ji_vnods;

        for (i = 0; i < vnodenum; i++) {
            ret = diswsi(fd, pnode[i].vn_node);

            if (ret != DIS_SUCCESS)
                goto done;
        }

        ret = diswst(fd, ptask->ti_qs.ti_parentjobid); /* dad job */

        if (ret != DIS_SUCCESS)
            goto done;

        ret = diswsi(fd, ptask->ti_qs.ti_parentnode); /* dad node */

        if (ret != DIS_SUCCESS)
            goto done;

        ret = diswsi(fd, ptask->ti_qs.ti_parenttask); /* dad task */

        if (ret != DIS_SUCCESS)
            goto done;

        ptask->ti_flags |= TI_FLAGS_INIT;

        goto done;

    case TM_POSTINFO:

        /*
        ** Post named info for a task.
        **
        ** read (
        **  name  string;
        **  info  counted string;
        ** )
        */

        name = disrst(fd, &ret);

        if (ret != DIS_SUCCESS)
            goto err;

        info = disrcs(fd, &len, &ret);

        if (ret != DIS_SUCCESS) {
            free(name);

            goto err;
        }

        DBPRT(("%s: POSTINFO %s task %d sent info %s:%s(%d)\n",
               id, jobid, fromtask, name, info, (int)len))

        if (prev_error) {
            free(name);
            free(info);

            goto done;
        }

        task_saveinfo(ptask, name, info, len);

        ret = tm_reply(fd, TM_OKAY, event);

        goto done;

    case TM_REGISTER:

        sprintf(log_buffer, "REGISTER - NOT IMPLEMENTED %s", jobid);

        tm_reply(fd, TM_ERROR, event);

        diswsi(fd, TM_ENOTIMPLEMENTED);

        DIS_tcp_wflush(fd);

        goto err;

    default:

        /* NO-OP */

        break;
    }  /* END switch(command) */

    /*
    ** All requests beside TM_INIT and TM_POSTINFO
    ** require a node number where the action will take place.
    ** Read that and check that it is legal.
    **
    ** read (
    **  node number  int
    ** )
    */

    nodeid = disrui(fd, &ret);

    if (ret != DIS_SUCCESS)
        goto err;

    pnode = pjob->ji_vnods;

    for (i = 0; i < pjob->ji_numvnod; i++, pnode++) {
        if (pnode->vn_node == nodeid)
            break;
    }

    if (i == pjob->ji_numvnod) {
        sprintf(log_buffer, "node %d in job %s not found", nodeid, jobid);

        log_err(-1, id, log_buffer);

        ret = tm_reply(fd, TM_ERROR, event);

        if (ret != DIS_SUCCESS)
            goto done;

        ret = diswsi(fd, TM_ENOTFOUND);

        if (ret != DIS_SUCCESS)
            goto done;

        prev_error = 1;

        /* pnode is one past the end of ji_vnods[] here and must not be
           dereferenced; every case below bails out on prev_error before
           it touches phost */
        phost = NULL;
    } else {
        phost = pnode->vn_host;
    }

    switch (command) {
    case TM_TASKS:

        /*
        ** A request to read the list of tasks that a
        ** particular node has charge of.
        */

        DBPRT(("%s: TASKS %s on node %d\n", id, jobid, nodeid))

        if (prev_error)
            goto done;

        if (pjob->ji_nodeid != nodeid) {
            /* not me */

            ep = event_alloc(IM_GET_TASKS, phost, event, fromtask);

            if (phost->hn_stream == -1) {
                phost->hn_stream = rpp_open(phost->hn_host, pbs_rm_port, NULL);
            }

            ret = im_compose(
                      phost->hn_stream,
                      jobid,
                      cookie,
                      IM_GET_TASKS,
                      event,
                      fromtask);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = diswui(phost->hn_stream, pjob->ji_nodeid); /* XXX */

            if (ret != DIS_SUCCESS)
                goto done;

            ret = (rpp_flush(phost->hn_stream) == -1) ?
                  DIS_NOCOMMIT : DIS_SUCCESS;

            if (ret != DIS_SUCCESS)
                goto done;

            reply = FALSE;

            goto done;
        }  /* END if (pjob->ji_nodeid != nodeid) */

        ret = tm_reply(fd, TM_OKAY, event);

        if (ret != DIS_SUCCESS)
            goto done;

        for (ptask = (task *)GET_NEXT(pjob->ji_tasks);
             ptask;
             ptask = (task *)GET_NEXT(ptask->ti_jobtask)) {
            ret = diswui(fd, ptask->ti_qs.ti_task);

            if (ret != DIS_SUCCESS)
                goto done;
        }

        ret = diswui(fd, TM_NULL_TASK);

        break;

    case TM_SPAWN:

        /*
        ** Spawn a task on the requested node.
        **
        ** read (
        **  argc        int;
        **  arg 0       string;
        **  ...
        **  arg argc-1  string;
        **  env 0       string;
        **  ...
        **  env m       string;
        ** )
        */

        DBPRT(("%s: SPAWN %s on node %d\n", id, jobid, nodeid))

        numele = disrui(fd, &ret);

        if (ret != DIS_SUCCESS)
            goto done;

        argv = (char **)calloc(numele + 1, sizeof(*argv));

        assert(argv);

        for (i = 0; i < numele; i++) {
            argv[i] = disrst(fd, &ret);

            if (ret != DIS_SUCCESS) {
                arrayfree(argv);

                goto done;
            }
        }

        argv[i] = NULL;

        numele = 4;

        envp = (char **)calloc(numele, sizeof(*envp));

        assert(envp);

        for (i = 0;; i++) {
            char *env;

            env = disrst(fd, &ret);

            if ((ret != DIS_SUCCESS) && (ret != DIS_EOD)) {
                arrayfree(argv);
                arrayfree(envp);

                goto done;
            }

            if (env == NULL)
                break;

            if (*env == '\0') {
                free(env);

                break;
            }

            /*
            ** Need to remember extra slot for NULL
            ** at the end.  Thanks to Pete Wyckoff
            ** for finding this.
            */

            if (i == numele - 2) {
                numele *= 2;

                envp = (char **)realloc(envp, numele * sizeof(*envp));

                assert(envp);
            }

            envp[i] = env;

            envp[i + 1] = NULL;
        }

        /* tack on PBS_VNODENUM */

        envp[i] = malloc(1024 * sizeof(char));

        if (envp[i] == NULL) {
            log_record(
                PBSEVENT_ERROR,
                PBS_EVENTCLASS_JOB,
                pjob->ji_qs.ji_jobid,
                "cannot alloc env memory)");

            arrayfree(argv);
            arrayfree(envp);

            goto done;
        }

        sprintf(envp[i], "PBS_VNODENUM=%d", nodeid);

        i++;

        envp[i] = NULL;

        ret = DIS_SUCCESS;

        if (prev_error) {
            arrayfree(argv);
            arrayfree(envp);

            goto done;
        }

        /*
        ** If I'm Mother Superior and the spawn happens on
        ** me, just do it.
        */

        if ((pjob->ji_nodeid == 0) && (pjob->ji_nodeid == nodeid)) {
            /* XXX */

            i = TM_ERROR;

            ptask = pbs_task_create(pjob, TM_NULL_TASK);

            if (ptask != NULL) {
                strcpy(ptask->ti_qs.ti_parentjobid, jobid);

                ptask->ti_qs.ti_parentnode = pjob->ji_nodeid;
                ptask->ti_qs.ti_parenttask = fromtask;

                if (LOGLEVEL >= 6) {
                    log_record(
                        PBSEVENT_ERROR,
                        PBS_EVENTCLASS_JOB,
                        pjob->ji_qs.ji_jobid,
                        "saving task (TM_SPAWN)");
                }

                if (task_save(ptask) != -1) {
                    ret = start_process(ptask, argv, envp);

                    if (ret != -1)
                        i = TM_OKAY;
                }
            }

            arrayfree(argv);
            arrayfree(envp);

            ret = tm_reply(fd, i, event);

            if (ret != DIS_SUCCESS)
                goto done;

            /* i stays TM_ERROR whenever ptask is NULL, so ptask is only
               read on the success side */
            ret = diswsi(
                      fd,
                      ((i == TM_ERROR) ? TM_ESYSTEM : ptask->ti_qs.ti_task));

            goto done;
        }  /* END if ((pjob->ji_nodeid == 0) && (pjob->ji_nodeid == nodeid)) */

        /*
        ** If I'm a regular mom and the destination is not
        ** MS, just send a GET_TID to MS.
        */

        if ((pjob->ji_nodeid != 0) &&
            (nodeid != pjob->ji_vnods[0].vn_node)) {
            /* XXX */

            pnode = &pjob->ji_vnods[0];

            phost = pnode->vn_host;

            ep = event_alloc(
                     IM_GET_TID,
                     pnode->vn_host,
                     TM_NULL_EVENT,
                     TM_NULL_TASK);

            /* argv and envp are handed to the event here and are not
               freed by this function on this path */
            ep->ee_argv = argv;
            ep->ee_envp = envp;
            ep->ee_forward.fe_node = nodeid;
            ep->ee_forward.fe_event = event;
            ep->ee_forward.fe_taskid = fromtask;

            ret = im_compose(
                      phost->hn_stream,
                      jobid,
                      cookie,
                      IM_GET_TID,
                      ep->ee_event,
                      TM_NULL_TASK);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = (rpp_flush(phost->hn_stream) == -1) ?
                  DIS_NOCOMMIT : DIS_SUCCESS;

            if (ret != DIS_SUCCESS)
                goto done;

            reply = FALSE;

            goto done;
        }  /* END if ((pjob->ji_nodeid != 0) && ...) */

        /*
        ** If I am MS, generate the TID now, otherwise
        ** we are sending to MS who will do it when she gets
        ** the SPAWN.
        */

        taskid = (pjob->ji_nodeid == 0) ? pjob->ji_taskid++ : TM_NULL_TASK;

        ep = event_alloc(IM_SPAWN_TASK, phost, event, fromtask);

        job_save(pjob, SAVEJOB_FULL);

        if (phost->hn_stream == -1) {
            phost->hn_stream = rpp_open(phost->hn_host, pbs_rm_port, NULL);
        }

        /*
         * Compose the whole IM_SPAWN_TASK message, stopping at the first
         * failure.  argv and envp are released afterwards whether or not
         * the send worked.
         */

        ret = im_compose(
                  phost->hn_stream,
                  jobid,
                  cookie,
                  IM_SPAWN_TASK,
                  event,
                  fromtask);

        if (ret == DIS_SUCCESS)
            ret = diswui(phost->hn_stream, pjob->ji_nodeid);

        if (ret == DIS_SUCCESS)
            ret = diswui(phost->hn_stream, taskid);

        if (ret == DIS_SUCCESS)
            ret = diswst(phost->hn_stream, pjob->ji_globid);

        for (i = 0; (ret == DIS_SUCCESS) && (argv[i] != NULL); i++)
            ret = diswst(phost->hn_stream, argv[i]);

        /* an empty string separates the arguments from the environment */
        if (ret == DIS_SUCCESS)
            ret = diswst(phost->hn_stream, "");

        for (i = 0; (ret == DIS_SUCCESS) && (envp[i] != NULL); i++)
            ret = diswst(phost->hn_stream, envp[i]);

        if (ret == DIS_SUCCESS) {
            ret = (rpp_flush(phost->hn_stream) == -1) ?
                  DIS_NOCOMMIT : DIS_SUCCESS;
        }

        arrayfree(argv);
        arrayfree(envp);

        if (ret != DIS_SUCCESS)
            goto done;

        reply = FALSE;

        break;

    case TM_SIGNAL:

        /*
        ** Send a signal to the specified task.
        **
        ** read (
        **  to task  int
        **  signal   int
        ** )
        */

        taskid = disrui(fd, &ret);

        if (ret != DIS_SUCCESS)
            goto err;

        signum = disrui(fd, &ret);

        if (ret != DIS_SUCCESS)
            goto err;

        DBPRT(("%s: SIGNAL %s on node %d task %d sig %d\n",
               id, jobid, nodeid, taskid, signum))

        if (prev_error)
            goto done;

        if (pjob->ji_nodeid != nodeid) {
            /* not me XXX */

            ep = event_alloc(IM_SIGNAL_TASK, phost, event, fromtask);

            if (phost->hn_stream == -1) {
                phost->hn_stream = rpp_open(phost->hn_host, pbs_rm_port, NULL);
            }

            ret = im_compose(phost->hn_stream, jobid, cookie,
                             IM_SIGNAL_TASK, event, fromtask);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = diswui(phost->hn_stream, pjob->ji_nodeid); /* XXX */

            if (ret != DIS_SUCCESS)
                goto done;

            ret = diswsi(phost->hn_stream, taskid);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = diswsi(phost->hn_stream, signum);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = (rpp_flush(phost->hn_stream) == -1) ?
                  DIS_NOCOMMIT : DIS_SUCCESS;

            if (ret != DIS_SUCCESS)
                goto done;

            reply = FALSE;

            goto done;
        }  /* END if (pjob->ji_nodeid != nodeid) */

        /*
        ** Task should be here... look for it.
        */

        if ((ptask = task_find(pjob, taskid)) == NULL) {
            ret = tm_reply(fd, TM_ERROR, event);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = diswsi(fd, TM_ENOTFOUND);

            break;
        }

        if (LOGLEVEL >= 3) {
            sprintf(log_buffer,
                    "%s: TM_SIGNAL %s from node %d task %d signal %d",
                    id, jobid, nodeid, taskid, signum);

            LOG_EVENT(
                PBSEVENT_JOB,
                PBS_EVENTCLASS_JOB,
                jobid,
                log_buffer);
        }

        kill_task(ptask, signum, 0);

        ret = tm_reply(fd, TM_OKAY, event);

        break;

    case TM_OBIT:

        /*
        ** Register an obit request for the specified task.
        **
        ** read (
        **  task to watch  int
        ** )
        */

        taskid = disrui(fd, &ret);

        if (ret != DIS_SUCCESS)
            goto err;

        DBPRT(("%s: OBIT %s on node %d task %d\n",
               id, jobid, nodeid, taskid))

        if (prev_error)
            goto done;

        if (pjob->ji_nodeid != nodeid) {
            /* not me */

            ep = event_alloc(IM_OBIT_TASK, phost, event, fromtask);

            if (phost->hn_stream == -1) {
                phost->hn_stream = rpp_open(phost->hn_host, pbs_rm_port, NULL);
            }

            ret = im_compose(
                      phost->hn_stream,
                      jobid,
                      cookie,
                      IM_OBIT_TASK,
                      event,
                      fromtask);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = diswui(phost->hn_stream, pjob->ji_nodeid);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = diswsi(phost->hn_stream, taskid);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = (rpp_flush(phost->hn_stream) == -1) ?
                  DIS_NOCOMMIT : DIS_SUCCESS;

            if (ret != DIS_SUCCESS)
                goto done;

            reply = FALSE;

            goto done;
        }

        /*
        ** Task should be here... look for it.
        */

        if ((ptask = task_find(pjob, taskid)) == NULL) {
            ret = tm_reply(fd, TM_ERROR, event);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = diswsi(fd, TM_ENOTFOUND);

            break;
        }

        if (ptask->ti_qs.ti_status >= TI_STATE_EXITED) {
            /* already exited: answer with the exit status right away */

            ret = tm_reply(fd, TM_OKAY, event);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = diswsi(fd, ptask->ti_qs.ti_exitstat);
        } else {
            /* still running: queue an obit entry on the task; no reply
               is sent from here */

            obitent *op = (obitent *)malloc(sizeof(*op));

            assert(op);

            CLEAR_LINK(op->oe_next);

            append_link(&ptask->ti_obits, &op->oe_next, op);

            op->oe_info.fe_node = nodeid;
            op->oe_info.fe_event = event;
            op->oe_info.fe_taskid = fromtask;

            reply = 0;
        }

        break;

    case TM_GETINFO:

        /*
        ** Get named info for a specified task.
        **
        ** read (
        **  task  int
        **  name  string
        ** )
        */

        taskid = disrui(fd, &ret);

        if (ret != DIS_SUCCESS)
            goto err;

        name = disrst(fd, &ret);

        if (ret != DIS_SUCCESS)
            goto err;

        DBPRT(("%s: GETINFO %s from node %d task %d name %s\n",
               id, jobid, nodeid, taskid, name))

        if (prev_error) {
            free(name);

            goto done;
        }

        if (pjob->ji_nodeid != nodeid) {
            /* not me */

            ep = event_alloc(
                     IM_GET_INFO,
                     phost,
                     event,
                     fromtask);

            if (phost->hn_stream == -1) {
                phost->hn_stream = rpp_open(phost->hn_host, pbs_rm_port, NULL);
            }

            ret = im_compose(
                      phost->hn_stream,
                      jobid,
                      cookie,
                      IM_GET_INFO,
                      event,
                      fromtask);

            if (ret == DIS_SUCCESS)
                ret = diswui(phost->hn_stream, pjob->ji_nodeid);

            if (ret == DIS_SUCCESS)
                ret = diswsi(phost->hn_stream, taskid);

            if (ret == DIS_SUCCESS)
                ret = diswst(phost->hn_stream, name);

            free(name);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = (rpp_flush(phost->hn_stream) == -1) ?
                  DIS_NOCOMMIT : DIS_SUCCESS;

            if (ret != DIS_SUCCESS)
                goto done;

            reply = FALSE;

            goto done;
        }  /* END if (pjob->ji_nodeid != nodeid) */

        /*
        ** Task should be here... look for it.
        */

        ip = NULL;

        if ((ptask = task_find(pjob, taskid)) != NULL)
            ip = task_findinfo(ptask, name);

        /* name was only needed for the lookup; the forwarding path above
           frees it as well */
        free(name);

        if (ip != NULL) {
            ret = tm_reply(fd, TM_OKAY, event);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = diswcs(fd, ip->ie_info, ip->ie_len);

            break;
        }

        ret = tm_reply(fd, TM_ERROR, event);

        if (ret != DIS_SUCCESS)
            goto done;

        ret = diswsi(fd, TM_ENOTFOUND);

        break;

    case TM_RESOURCES:

        /* get resource string for a node */

        DBPRT(("%s: RESOURCES %s for node %d task %d\n",
               id, jobid, nodeid, taskid))

        if (prev_error)
            goto done;

        if (pjob->ji_nodeid != nodeid) {
            /* not me XXX */

            ep = event_alloc(IM_GET_RESC, phost, event, fromtask);

            if (phost->hn_stream == -1) {
                phost->hn_stream = rpp_open(phost->hn_host, pbs_rm_port, NULL);
            }

            ret = im_compose(
                      phost->hn_stream,
                      jobid,
                      cookie,
                      IM_GET_RESC,
                      event,
                      fromtask);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = diswui(phost->hn_stream, pjob->ji_nodeid);

            if (ret != DIS_SUCCESS)
                goto done;

            ret = (rpp_flush(phost->hn_stream) == -1) ?
                  DIS_NOCOMMIT : DIS_SUCCESS;

            if (ret != DIS_SUCCESS)
                goto done;

            reply = FALSE;

            goto done;
        }  /* END if (pjob->ji_nodeid != nodeid) */

        info = resc_string(pjob);

        ret = tm_reply(fd, TM_OKAY, event);

        if (ret == DIS_SUCCESS)
            ret = diswst(fd, info);

        /* released on the failure path too */
        free(info);

        break;

    default:

        sprintf(log_buffer, "unknown command %d", command);

        tm_reply(fd, TM_ERROR, event);

        diswsi(fd, TM_EUNKNOWNCMD);

        DIS_tcp_wflush(fd);

        goto err;
    }  /* END switch (command) */

done:

    free(jobid);
    free(cookie);

    if (reply) {
        DBPRT(("%s: REPLY %s\n", id, dis_emsg[ret]))

        if ((ret != DIS_SUCCESS) || (DIS_tcp_wflush(fd) == -1)) {
            sprintf(log_buffer, "comm failed %s", dis_emsg[ret]);

            log_err(errno, id, log_buffer);

            close_conn(fd);

            return(-1);
        }
    }

    return(0);

err:

    if (ret != DIS_SUCCESS)
        sprintf(log_buffer, "bad header %s", dis_emsg[ret]);

    log_err(errno, id, log_buffer);

    ipadd = svr_conn[fd].cn_addr;

    sprintf(log_buffer, "message refused from port %d addr %ld.%ld.%ld.%ld",
            svr_conn[fd].cn_port,
            (ipadd & 0xff000000) >> 24,
            (ipadd & 0x00ff0000) >> 16,
            (ipadd & 0x0000ff00) >> 8,
            (ipadd & 0x000000ff));

    /* actually emit the message that was just formatted */
    log_err(-1, id, log_buffer);

    close_conn(fd);

    if (jobid)
        free(jobid);

    if (cookie)
        free(cookie);

    return(-1);
}  /* END tm_request() */


/*
 * adoptSession --
 *
 * Find a job that corresponds to a given alternative task management
 * id or job id and create a new task in it to monitor the usage of a
 * given session id.
 *
 * Result:
 * Returns TM_OKAY if the session id was adopted, and TM_ERROR if
 * it wasn't. The job identified by jobid or altid (eg rmsResourceId)
 * gets a new task, and that task has its session id set to monitor
 * sid. Various special values are set in the task to ensure that
 * PBS only monitors the new task, and doesn't attempt to control it.
 *
 * Side effects:
 * Saves the new task with task_save().
* Forces a mom_get_sample() * * */ static int adoptSession(pid_t sid, char *id, int command, char *cookie) { job *pjob; task *ptask; /* extern int next_sample_time; */ /* extern time_t time_resc_updated; */ /* Find the job that has this job/alt id */ for (pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { if (command == TM_ADOPT_JOBID) { if (strcmp(id, pjob->ji_qs.ji_jobid) == 0) break; } else { if (strcmp(id, pjob->ji_altid) == 0) break; } } if (pjob == NULL) { /* Didn't find a job with this resource id. Complain. */ (void)sprintf(log_buffer, "Adoption rejected: no job with id %1.30s", id); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, "adoptSession()", log_buffer); return TM_ERROR; } /* * Check cookie if we can (why bother!) Should check for * correct cookie first, it might be available! */ if (strcmp(cookie, "ADOPT COOKIE")) { char *oreo; attribute *at = &pjob->ji_wattr[(int)JOB_ATR_Cookie]; if (!(at->at_flags & ATR_VFLAG_SET)) { sprintf(log_buffer, "Adoption rejected: job %s has no cookie", pjob->ji_qs.ji_jobid); DBPRT(("%s\n", log_buffer)); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return TM_ERROR; } oreo = at->at_val.at_str; if (strcmp(oreo, cookie) != 0) { sprintf(log_buffer, "Adoption rejected: job %s cookie %s message %s", pjob->ji_qs.ji_jobid, oreo, cookie); DBPRT(("%s\n", log_buffer)); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return TM_ERROR; } } else { /* sprintf(log_buffer, "job %s cookie %s message %s", jobid, oreo, cookie); DBPRT(("%s\n", log_buffer)); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); */ DBPRT(("Trusting ADOPT cookie for job %s\n", pjob->ji_qs.ji_jobid)); } /* JMB--set task ID in such a way that makes it obvious this is an adopted task */ if (pjob->maxAdoptedTaskId == TM_NULL_TASK) { pjob->maxAdoptedTaskId = TM_ADOPTED_TASKID_BASE; } /* * DJH 27 Feb 2002. 
* Now create a task to monitor that sid. Use a task id that isn't * going to collide with the ones given to non-adopted tasks. */ ptask = pbs_task_create(pjob, (pjob->ji_taskid - 1) + TM_ADOPTED_TASKID_BASE); pjob->ji_taskid++; /* ti_parenttask not used but avoiding using TM_NULL_TASK as it means 'top level shell' to scan_for_exiting() */ ptask->ti_qs.ti_parenttask = TM_NULL_TASK + 1; ptask->ti_qs.ti_sid = sid; ptask->ti_qs.ti_status = TI_STATE_RUNNING; (void)task_save(ptask); /* Mark the job as running if we need to. This is copied from start_process() */ if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING) { pjob->ji_qs.ji_state = JOB_STATE_RUNNING; pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING; job_save(pjob, SAVEJOB_QUICK); } if (mom_get_sample() == PBSE_NONE) { /* time_resc_updated = time_now; */ (void)mom_set_use(pjob); } /* next_sample_time = 45; */ (void)sprintf(log_buffer, "Task adopted. id=%1.30s, sid = %d", id, sid); DBPRT(("%s\n", log_buffer)); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return TM_OKAY; } /* * cat_dirs -- * * Concatenate root and base into a new string and return the result * * Result: * if root is null only the base value is returned. Otherwise a string * with the root followed by base is returned. If memory cannot be * allocated NULL is returned. * * Side effects: * If a non-null value is returned the new string will need * to later be freed * * */ char *cat_dirs(char *root, char *base) { char *pn; int len = 0; if(root) len = strlen(root); len += strlen(base); pn = malloc(len+2); if(!pn) { return(NULL); } if(root) { strcpy(pn, root); strcat(pn, base); } else strcpy(pn, base); return(pn); } /* * get_local_script_path -- * * takes a path given by base and prepends * the PBS_O_WORKDIR if base is a relative path. That is, if * base does not begin with '/'. Otherwise it returns * the value of base. cat_dirs allocates memory for the new * string so this has to be freed. 
* * If a null string is returned it is because cat_dirs could * not allocate memory */ char *get_local_script_path(job *pjob, char *base) { char *wdir; size_t len; char *pn = NULL; /* see if base is an absolute path*/ if(base[0] != '/') { /* base is not an absolute path. Prepend it with the working directory */ wdir = get_job_envvar(pjob, "PBS_O_WORKDIR"); len = strlen(wdir); if(wdir[len-1] != '/') strcat(wdir, "/"); pn = cat_dirs(wdir, base); } else pn = cat_dirs(NULL, base); /* cat_dirs will allocate memory to hold our string */ return(pn); } /* END mom_comm.c */