/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. 
Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. */ #include /* the master config generated by configure */ #include "mom_comm.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if defined(NTOHL_NEEDS_ARPA_INET_H) && defined(HAVE_ARPA_INET_H) #include #endif #include #include "libpbs.h" #include "list_link.h" #include "attribute.h" #include "resource.h" #include "server_limits.h" #include "pbs_job.h" #include "pbs_nodes.h" #include "pbs_error.h" #include "log.h" #include "../lib/Liblog/pbs_log.h" #include "../lib/Liblog/log_event.h" #include "../lib/Libifl/lib_ifl.h" #include "net_connect.h" #include "net_cache.h" #include "dis.h" #include "dis_init.h" #include "mom_func.h" #include "batch_request.h" #include "resmon.h" #include "mom_comm.h" #include "mcom.h" #include "svrfunc.h" #include "u_tree.h" #include "utils.h" #include "../lib/Libnet/lib_net.h" /* get_hostaddr_hostent_af */ #include "mom_server.h" #include "mom_job_func.h" /* mom_job_purge */ #include "tcp.h" /* tcp_chan */ #ifdef PENABLE_LINUX26_CPUSETS #include "pbs_cpuset.h" #endif #define IM_FINISHED 1 #define IM_DONE 0 #define IM_FAILURE -1 /* Global Data Items */ extern int exiting_tasks; extern char *path_jobs; extern char *path_home; extern unsigned int pbs_mom_port; extern unsigned int pbs_rm_port; extern unsigned int pbs_tm_port; extern 
tlist_head svr_newjobs;
extern tlist_head mom_polljobs; /* must have resource limits polled */
extern tlist_head svr_alljobs; /* all jobs under MOM's control */
extern int termin_child;
extern time_t time_now;
extern AvlTree okclients;
extern int port_care;
extern char *path_prologp;
extern char *path_prologuserp;
extern int multi_mom;
extern int maxupdatesbeforesending;

/* status aggregation state defined by this file */
char *stat_string_aggregate = NULL;
unsigned int ssa_index;
unsigned long ssa_size;
resizable_array *received_statuses; /* holds information on nodes whose statuses we've received */
hash_table_t *received_table;
int updates_waiting_to_send = 0;

extern time_t LastServerUpdateTime;
extern int ServerStatUpdateInterval;
extern struct connection svr_conn[];

/*
 * Printable names of the inter-MOM commands.  The table is indexed directly
 * by command number when log messages are built (PMOMCommand[com]); the
 * final NULL entry terminates the table.
 */
const char *PMOMCommand[] =
  {
  "ALL_OKAY",
  "JOIN_JOB",
  "KILL_JOB",
  "SPAWN_TASK",
  "GET_TASKS",
  "SIGNAL_TASK",
  "OBIT_TASK",
  "POLL_JOB",
  "GET_INFO",
  "GET_RESC",
  "ABORT_JOB",
  "GET_TID",
  "RADIX_ALL_OK",
  "JOIN_JOB_RADIX",
  "KILL_JOB_RADIX",
  "ERROR", /* index 15 in this table */
  NULL
  };

char task_fmt[] = "/%010.10ld";
char noglobid[] = "none";
extern int LOGLEVEL;
extern long TJobStartBlockTime;

/* which demux socket a routefd entry refers to */
enum rwhich { invalid, listen_out, listen_err, new_out, new_err};

struct routefd
  {
  enum rwhich r_which; /* Is this the listen out, err or new out and err sockets */
  unsigned short r_fd;
  };

fd_set readset;

/* external functions */
extern struct radix_buf **allocate_sister_list(int radix);
extern int add_host_to_sister_list(char *, unsigned short , struct radix_buf *);
extern void free_sisterlist(struct radix_buf **list, int radix);
extern int open_demux(u_long addr, int port);
extern int timeval_subtract( struct timeval *result, struct timeval *x, struct timeval *y);
int start_process(task *, char **, char **);
int allocate_demux_sockets(job *pjob, int flag);
extern int TMomFinalizeJob1(job *, pjobexec_t *, int *);
extern int TMomFinalizeJob2(pjobexec_t *, int *);
extern int TMomFinalizeJob3(pjobexec_t *, int, int, int *);
extern int TMOMJobGetStartInfo(job *, pjobexec_t **) ;
extern
int TMomCheckJobChild(pjobexec_t *, int, int *, int *); extern void job_nodes(job *); extern void sister_job_nodes( job *pjob, char *radix_hosts, char *radix_ports ); extern int tlist(tree *, char *, int); extern int TMakeTmpDir(job *, char *); extern int exec_job_on_ms(job *pjob); u_long gettime(resource *); u_long getsize(resource *); #ifdef NVIDIA_GPUS int setup_gpus_for_job(job *pjob); #endif /* NVIDIA_GPUS */ #ifdef PENABLE_LINUX26_CPUSETS int use_cpusets(job *); #ifndef NUMA_SUPPORT void create_cpuset_reservation_if_needed(job &pjob); #endif #endif /* PENABLE_LINUX26_CPUSETS */ /* END external functions */ int get_reply_stream(job *); int get_radix_reply_stream(job *); int run_prologue_scripts(job *pjob); char *cat_dirs(char *root, char *base); char *get_local_script_path(job *pjob, char *base); void *im_demux_thread(void *threadArg); void fork_demux(job *pjob); /* ** Save the critical information associated with a task to disk. */ int task_save( task *ptask) /* I */ { job *pjob; int fds; int i; int TaskID = 0; char namebuf[MAXPATHLEN + 1]; char portname[MAXPATHLEN + 1]; int openflags; if (ptask == NULL) { log_err(PBSE_BAD_PARAMETER, __func__, "NULL input pointer"); return(PBSE_BAD_PARAMETER); } pjob = ptask->ti_job; if (pjob == NULL) { log_err(PBSE_BAD_PARAMETER, __func__, "NULL pointer to owning job"); return(PBSE_BAD_PARAMETER); } strncpy(namebuf, path_jobs, sizeof(namebuf) - 1); /* job directory path */ strncat(namebuf, pjob->ji_qs.ji_fileprefix, sizeof(namebuf) - 1); /*TODO: think about stncats third arguments*/ if (multi_mom) { sprintf(portname, "%d", pbs_rm_port); /*TODO: do we have actually snprintf*/ /*snprintf(portname, sizeof(portname), "%d", pbs_rm_port);*/ strncat(namebuf, portname, sizeof(namebuf) - 1); } strncat(namebuf, JOB_TASKDIR_SUFFIX, sizeof(namebuf) - 1); openflags = O_WRONLY | O_CREAT | O_Sync; if (LOGLEVEL >= 6) { sprintf(log_buffer, "saving task in %s", namebuf); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_SERVER, __func__, log_buffer); } 
#if defined(HAVE_OPEN64) && defined(LARGEFILE_WORKS) fds = open64(namebuf, openflags, 0600); #else fds = open(namebuf, openflags, 0600); #endif if (fds < 0) { log_err(errno, __func__, "error on open"); return(-1); } TaskID = ptask->ti_qs.ti_task; /* adjust task ID if it is adopted... */ if (IS_ADOPTED_TASK(ptask->ti_qs.ti_task)) { TaskID = ptask->ti_qs.ti_task % TM_ADOPTED_TASKID_BASE; } #ifdef HAVE_LSEEK64 if (lseek64(fds, (off_t)(TaskID*sizeof(ptask->ti_qs)), SEEK_SET) < 0) #else if (lseek(fds, (off_t)(TaskID*sizeof(ptask->ti_qs)), SEEK_SET) < 0) #endif { log_err(errno, __func__, "lseek"); close(fds); return(-1); } /* NOTE: to avoid partial write failures in fs full situations, */ /* attempt write of empty buffer, if success, then write actual task? */ /* (NYI) */ /* just write the "critical" base structure to the file */ while ((i = write_ac_socket(fds,&ptask->ti_qs,sizeof(ptask->ti_qs)) ) != sizeof(ptask->ti_qs)) { if ((i < 0) && (errno == EINTR)) { /* retry the write */ #ifdef HAVE_LSEEK64 if (lseek64(fds, (off_t)(TaskID*sizeof(ptask->ti_qs)), SEEK_SET) < 0) #else if (lseek(fds, (off_t)(TaskID*sizeof(ptask->ti_qs)), SEEK_SET) < 0) #endif { log_err(errno, __func__, "lseek"); close(fds); return(-1); } continue; } log_err(errno, __func__, "quickwrite"); close(fds); return(-1); } /* END while (i = write()) */ /* SUCCESS */ close(fds); return(0); } /* END task_save() */ /* ** Allocate an event and link it to the given nodeent entry. */ eventent *event_alloc( int command, hnodent *pnode, tm_event_t event, tm_task_id taskid) { static tm_event_t eventnum = TM_NULL_EVENT + 1; eventent *ep; ep = (eventent *)calloc(1, sizeof(eventent)); assert(ep); memset(ep, 0, sizeof(eventent)); ep->ee_command = command; ep->ee_event = (event == TM_NULL_EVENT) ? 
eventnum++ : event; ep->ee_taskid = taskid; ep->ee_parent_event = -1; ep->ee_forward.fe_node = TM_ERROR_NODE; ep->ee_forward.fe_event = TM_ERROR_EVENT; ep->ee_forward.fe_taskid = TM_NULL_TASK; ep->ee_argv = NULL; ep->ee_envp = NULL; CLEAR_LINK(ep->ee_next); append_link(&pnode->hn_events, &ep->ee_next, ep); return(ep); } /* END event_alloc() */ /* Forward declaration */ static int adoptSession(pid_t sid, pid_t pid, char *id, int command, char *cookie); /* * Create a new task if the current number is less then * the tasks per node limit. */ task *pbs_task_create( job *pjob, tm_task_id taskid) { task *ptask; pbs_attribute *at; resource_def *rd; resource *pres; u_long tasks; /* DJH 27 feb 2002. Check that we aren't about to run into the */ /* task IDs that we use to label adopted tasks. */ if ((taskid == TM_NULL_TASK) && (pjob->ji_taskid >= TM_ADOPTED_TASKID_BASE)) { sprintf(log_buffer, "Ran into reserved task IDs on job %s", pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buffer); return(NULL); } for (ptask = (task *)GET_NEXT(pjob->ji_tasks), tasks = 0; ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask), tasks++) /* NO-OP, counting */; at = &pjob->ji_wattr[JOB_ATR_resource]; rd = find_resc_def(svr_resc_def, "taskspn", svr_resc_size); /* NYI: are checks like this necessary? Before this was an assert, which shouldn't * be in production code, so I figure this is better than that --dbeer */ if (rd == NULL) { log_err(-1, __func__, "No tasks per node resource definition? TORQUE is very broken!"); return(NULL); } pres = find_resc_entry(at, rd); if (pres != NULL) { if (tasks >= (unsigned long)pres->rs_value.at_val.at_long) { return(NULL); } } ptask = (task *)calloc(1, sizeof(task)); if (ptask == NULL) { log_err(ENOMEM, __func__, "No memory to allocate task! 
IMMINENT FAILURE"); return(NULL); } /* initialize task */ ptask->ti_job = pjob; CLEAR_LINK(ptask->ti_jobtask); append_link(&pjob->ji_tasks, &ptask->ti_jobtask, ptask); ptask->ti_flags = 0; ptask->ti_register = TM_NULL_EVENT; CLEAR_HEAD(ptask->ti_obits); CLEAR_HEAD(ptask->ti_info); memset(ptask->ti_qs.ti_parentjobid, 0, sizeof(ptask->ti_qs.ti_parentjobid)); ptask->ti_qs.ti_parentnode = TM_ERROR_NODE; ptask->ti_qs.ti_parenttask = TM_NULL_TASK; ptask->ti_qs.ti_task = ((taskid == TM_NULL_TASK) ? pjob->ji_taskid++ : taskid); ptask->ti_qs.ti_status = TI_STATE_EMBRYO; ptask->ti_qs.ti_sid = 0; ptask->ti_qs.ti_exitstat = 0; memset(ptask->ti_qs.ti_u.ti_hold, 0, sizeof(ptask->ti_qs.ti_u.ti_hold)); /* SUCCESS */ return(ptask); } /* END pbs_task_create() */ task *task_find( job *pjob, tm_task_id taskid) { task *ptask; for (ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { if (ptask->ti_qs.ti_task == taskid) break; } return(ptask); } task *task_check( job *pjob, tm_task_id taskid) { task *ptask; if (taskid == TM_NULL_TASK) { /* don't bother with the error messages */ return(NULL); } ptask = task_find(pjob, taskid); if (ptask == NULL) { sprintf(log_buffer, "%s requesting task %ld not found", pjob->ji_qs.ji_jobid, (long)taskid); log_err(-1, __func__, log_buffer); return(NULL); } if ((ptask->ti_chan == NULL) || (ptask->ti_chan->sock < 0)) { sprintf(log_buffer, "cannot tm_reply to task %ld", (long)taskid); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return(NULL); } return(ptask); } /* END task_check() */ /* ** task_recov() ** Recover (read in) the tasks from their save files for a job. ** ** This function is only needed upon MOM start up. 
*/ int task_recov( job *pjob) { int fds; task *pt; char namebuf[MAXPATHLEN]; taskfix task_save; tm_task_id tid; if (multi_mom) { snprintf(namebuf, sizeof(namebuf), "%s%s%d%s", path_jobs, pjob->ji_qs.ji_fileprefix, pbs_rm_port, JOB_TASKDIR_SUFFIX); } else { snprintf(namebuf, sizeof(namebuf), "%s%s%s", path_jobs, pjob->ji_qs.ji_fileprefix, JOB_TASKDIR_SUFFIX); } #if defined(HAVE_OPEN64) && defined(LARGEFILE_WORKS) fds = open64(namebuf, O_RDONLY, 0); #else fds = open(namebuf, O_RDONLY, 0); #endif if (fds < 0) { log_err(errno, __func__, "open of task file"); unlink(namebuf); return -1; } /* read in task quick save sub-structure */ while (read_ac_socket(fds, (char *)&task_save, sizeof(task_save)) == sizeof(task_save)) { tid = TM_NULL_TASK; if (IS_ADOPTED_TASK(task_save.ti_task)) { /* * Set the high water mark for adopted task ids. Its * "+1" due to the post-increment when we generate the * task ids. */ pjob->maxAdoptedTaskId = MAX(pjob->maxAdoptedTaskId, (int)(task_save.ti_task + 1)); tid = task_save.ti_task; } if ((pt = pbs_task_create(pjob, tid)) == NULL) { log_err(errno, __func__, "cannot create task"); close(fds); return -1; } pt->ti_qs = task_save; } /* END while read */ close(fds); return(0); } /* END task_recov() */ /* ** Send a reply message to a user proc over a TCP stream. */ int tm_reply( struct tcp_chan *chan, int com, tm_event_t event) { int ret; ret = diswsi(chan, TM_PROTOCOL); if (ret == DIS_SUCCESS) { ret = diswsi(chan, TM_PROTOCOL_VER); if (ret == DIS_SUCCESS) { ret = diswsi(chan, com); if (ret == DIS_SUCCESS) ret = diswsi(chan, event); } } if (ret != DIS_SUCCESS) { snprintf(log_buffer,sizeof(log_buffer), "error sending tm reply: %s\n", dis_emsg[ret]); log_err(-1,__func__,log_buffer); } return(ret); } /* tm_reply() */ /* ** Start a standard inter-MOM message. 
*/ int im_compose( struct tcp_chan *chan, char *jobid, char *cookie, int command, tm_event_t event, tm_task_id taskid) { int ret; if (chan->sock < 0) { return(DIS_EOF); } if ((ret = diswsi(chan, IM_PROTOCOL)) != DIS_SUCCESS) { } else if ((ret = diswsi(chan, IM_PROTOCOL_VER)) != DIS_SUCCESS) { } else if ((ret = diswus(chan, pbs_rm_port)) != DIS_SUCCESS) { } else if ((ret = diswst(chan, jobid)) != DIS_SUCCESS) { } else if ((ret = diswst(chan, cookie)) != DIS_SUCCESS) { } else if ((ret = diswsi(chan, command)) != DIS_SUCCESS) { } else if ((ret = diswsi(chan, event)) != DIS_SUCCESS) { } else { ret = diswsi(chan, taskid); } if (ret != DIS_SUCCESS) { snprintf(log_buffer,sizeof(log_buffer), "send error %s\n", dis_emsg[ret]); log_err(-1, __func__, log_buffer); } return(ret); } /* END im_compose() */ /** * Send a message (command = com) to all the other MOMs in the job -> pjob. * * @see scan_for_exiting() - parent - report to sisters upon job completion * @see examine_all_polled_jobs() - parent - poll job status info * @see exec_bail() - parent - abort parallel job * * @see start_exec() - peer - opens connections to sisters at parallel job start * * @return 0 on FAILURE or number of sister mom's successfully contacted on SUCCESS */ int send_sisters( job *pjob, /* I */ int com, /* I (command to send to all sisters) */ int using_radix, /* I (TRUE if this job has a job radix, false otherwise */ std::set *sisters_contacted) { int i; int num; int ret = PBSE_NONE; int local_socket; struct tcp_chan *local_chan = NULL; int job_radix; int loop_limit; eventent *ep; char *cookie; resend_momcomm *mc; if (LOGLEVEL >= 4) { sprintf(log_buffer, "sending command %s for job %s (%d)", PMOMCommand[com], pjob->ji_qs.ji_jobid, com); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, __func__, log_buffer); } if (!(pjob->ji_wattr[JOB_ATR_Cookie].at_flags & ATR_VFLAG_SET)) { /* cookie not set - return FAILURE */ snprintf(log_buffer, sizeof(log_buffer), "cookie not set in send_sisters for job %s", 
pjob->ji_qs.ji_jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, __func__, log_buffer); return(0); } cookie = pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str; num = 0; if (com == IM_ABORT_JOB) { snprintf(log_buffer, sizeof(log_buffer), "sending ABORT to sisters for job %s", pjob->ji_qs.ji_jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, __func__, log_buffer); } if (using_radix == TRUE) { job_radix = 0; loop_limit = pjob->ji_numsisternodes; } else { /* set so that loop doesn't fail on this condition */ job_radix = -1; loop_limit = pjob->ji_numnodes; } /* walk thru node list, contact each mom */ for (i = 0; i < loop_limit && job_radix < pjob->ji_radix; i++) { hnodent *np; char *host_addr = NULL; unsigned short af_family; int local_errno; int addr_len; if (sisters_contacted != NULL) { if (sisters_contacted->find(i) == sisters_contacted->end()) continue; } if ((using_radix == TRUE) && (pjob->ji_qs.ji_svrflags & JOB_SVFLG_INTERMEDIATE_MOM)) { np = &pjob->ji_sisters[i]; if ((i == 0) || (i == 1)) /* ji_sisters[0] is the superior node to me and */ continue; /* ji_sisters[1] is me. We skip them */ /* we need to record the addresses of sister nodes for later */ get_hostaddr_hostent_af(&local_errno, np->hn_host, &af_family, &host_addr, &addr_len); memmove(&np->sock_addr.sin_addr, host_addr, addr_len); free(host_addr); np->sock_addr.sin_port = htons(np->hn_port); np->sock_addr.sin_family = af_family; job_radix++; } else { np = &pjob->ji_hosts[i]; if (np->hn_node == pjob->ji_nodeid) /* this is me*/ continue; } if (np->hn_sister != SISTER_OKAY) /* sister is gone? */ { snprintf(log_buffer, 1024, "%s: sister #%d (%s) is not ok (%d)", __func__, i, (np->hn_host != NULL) ? 
np->hn_host : "NULL", np->hn_sister); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } ep = event_alloc(com, np, TM_NULL_EVENT, TM_NULL_TASK); if (ep == NULL) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "cannot alloc event object in send_sisters"); continue; } local_socket = tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr,sizeof(np->sock_addr)); if (IS_VALID_STREAM(local_socket) == FALSE) { if ((mc = (resend_momcomm *)calloc(1, sizeof(resend_momcomm))) != NULL) { mc->mc_type = COMPOSE_REPLY; mc->mc_struct = create_compose_reply_info(pjob->ji_qs.ji_jobid, cookie, np, com, TM_NULL_EVENT, TM_NULL_TASK); if (mc->mc_struct == NULL) free(mc); else add_to_resend_things(mc); } snprintf(log_buffer, sizeof(log_buffer), "%s: cannot open tcp connection to sister #%d (%s)", __func__, i, (np->hn_host != NULL) ? np->hn_host : "NULL"); log_record(PBSEVENT_ERROR,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); continue; } if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { } else if ((ret = im_compose(local_chan,pjob->ji_qs.ji_jobid,cookie,com,ep->ee_event,TM_NULL_TASK)) == DIS_SUCCESS) { if ((ret = DIS_tcp_wflush(local_chan)) != DIS_SUCCESS) { sprintf(log_buffer, "%s:DIS_tcp_wflush failed", __func__); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid,log_buffer); } } close(local_socket); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); if (ret != DIS_SUCCESS) { if ((mc = (resend_momcomm *)calloc(1, sizeof(resend_momcomm))) != NULL) { mc->mc_type = COMPOSE_REPLY; mc->mc_struct = create_compose_reply_info(pjob->ji_qs.ji_jobid, cookie, np, com, TM_NULL_EVENT, TM_NULL_TASK); if (mc->mc_struct == NULL) free(mc); else add_to_resend_things(mc); } snprintf(log_buffer, sizeof(log_buffer), "%s: cannot compose message to sister #%d (%s) - %d", __func__, i, (np->hn_host != NULL) ? 
np->hn_host : "NULL", ret); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); np->hn_sister = SISTER_EOF; } else { np->hn_sister = SISTER_OKAY; num++; } } /* END for (i) */ return(num); } /* END send_sisters() */ /** * Send a message (command = com) to Mother Superior in the job -> pjob. * * @see examine_all_polled_jobs() - parent - poll job status info * @see exec_bail() - parent - abort parallel job * * @see start_exec() - peer - opens connections to sisters at parallel job start * * @return 0 on FAILURE or number of sister mom's successfully contacted on SUCCESS */ int send_ms( job *pjob, /* I */ int com) /* I (command to send to all sisters) */ { int num; int ret = PBSE_NONE; int local_socket; struct tcp_chan *local_chan = NULL; char *cookie; resend_momcomm *mc; hnodent *np; if (LOGLEVEL >= 4) { sprintf(log_buffer, "sending command %s for job %s (%d)", PMOMCommand[com], pjob->ji_qs.ji_jobid, com); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, __func__, log_buffer); } if (!(pjob->ji_wattr[JOB_ATR_Cookie].at_flags & ATR_VFLAG_SET)) { /* cookie not set - return FAILURE */ snprintf(log_buffer, sizeof(log_buffer), "cookie not set in send_ms for job %s", pjob->ji_qs.ji_jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, __func__, log_buffer); return(0); } cookie = pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str; num = 0; if (com == IM_ABORT_JOB) { snprintf(log_buffer, sizeof(log_buffer), "sending ABORT to mother superior for job %s", pjob->ji_qs.ji_jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, __func__, log_buffer); } if (am_i_mother_superior(*pjob) == true) return(1); /* walk thru node list, contact each mom */ np = &pjob->ji_hosts[0]; if (np->hn_sister != SISTER_OKAY) /* sister is gone? */ { snprintf(log_buffer, 1024, "%s: sister (%s) is not ok (%d)", __func__, (np->hn_host != NULL) ? 
np->hn_host : "NULL", np->hn_sister); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return(0); } local_socket = tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr,sizeof(np->sock_addr)); if (IS_VALID_STREAM(local_socket) == FALSE) { if ((mc = (resend_momcomm *)calloc(1, sizeof(resend_momcomm))) != NULL) { mc->mc_type = COMPOSE_REPLY; mc->mc_struct = create_compose_reply_info(pjob->ji_qs.ji_jobid, cookie, np, com, TM_NULL_EVENT, TM_NULL_TASK); if (mc->mc_struct == NULL) free(mc); else add_to_resend_things(mc); } snprintf(log_buffer, sizeof(log_buffer), "%s: cannot open tcp connection to sister (%s)", __func__, (np->hn_host != NULL) ? np->hn_host : "NULL"); log_record(PBSEVENT_ERROR,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); return(0); } if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { } else if ((ret = im_compose(local_chan,pjob->ji_qs.ji_jobid,cookie,com,0,TM_NULL_TASK)) == DIS_SUCCESS) { if ((ret = DIS_tcp_wflush(local_chan)) != DIS_SUCCESS) { sprintf(log_buffer, "%s:DIS_tcp_wflush failed", __func__); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid,log_buffer); } } close(local_socket); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); if (ret != DIS_SUCCESS) { if ((mc = (resend_momcomm *)calloc(1, sizeof(resend_momcomm))) != NULL) { mc->mc_type = COMPOSE_REPLY; mc->mc_struct = create_compose_reply_info(pjob->ji_qs.ji_jobid, cookie, np, com, TM_NULL_EVENT, TM_NULL_TASK); if (mc->mc_struct == NULL) free(mc); else add_to_resend_things(mc); } snprintf(log_buffer, sizeof(log_buffer), "%s: cannot compose message to sister (%s) - %d", __func__,(np->hn_host != NULL) ? np->hn_host : "NULL", ret); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); np->hn_sister = SISTER_EOF; } else { np->hn_sister = SISTER_OKAY; num++; } return(num); } /* END send_ms() */ /** * Check to see which node a stream is coming from. Return a NULL * if it is not assigned to this job. 
Return a nodeent pointer if * it is. */ hnodent *find_node( job *pjob, int stream, tm_node_id nodeid) { int i; unsigned long connecting_ipaddr; unsigned long node_ipaddr; struct sockaddr_in *connecting_addr; struct sockaddr s_addr; struct sockaddr connecting_stack_addr; vnodent *vp; hnodent *hp; socklen_t len = sizeof(s_addr); if (getpeername(stream,&connecting_stack_addr,&len) != 0) { log_err(errno, __func__, "Couldn't find connecting information for this stream"); return(NULL); } connecting_addr = (struct sockaddr_in *)&connecting_stack_addr; for (vp = pjob->ji_vnods, i = 0;i < pjob->ji_numvnod;vp++, i++) { if (vp->vn_node == nodeid) break; } /* END for (vp) */ if (i == pjob->ji_numvnod) { sprintf(log_buffer, "node %d not found", nodeid); log_err(-1, __func__, log_buffer); return(NULL); } hp = vp->vn_host; node_ipaddr = ntohl(hp->sock_addr.sin_addr.s_addr); connecting_ipaddr = ntohl(connecting_addr->sin_addr.s_addr); if (node_ipaddr != connecting_ipaddr) { sprintf(log_buffer, "stream id %d does not match %d to node %d (stream=%lu node=%lu)", stream, hp->hn_stream, nodeid, connecting_ipaddr, node_ipaddr); log_err(-1, __func__, log_buffer); hp = NULL; } return(hp); } /* END find_node() */ /** * An error has been encountered starting a job. * * Format a message to all the sisterhood to get rid of their copy * of the job. There should be no processes running at this point. */ void job_start_error( job *pjob, /* I */ int code, /* I */ char *nodename) /* I */ { static char abortjobid[PBS_MAXSVRJOBID + 1]; static int abortcount = -1; pbs_attribute *pattr; char tmpLine[1024]; if (abortcount == -1) { abortjobid[0] = '\0'; } sprintf(log_buffer, "job_start_error from node %s in %s", nodename, __func__); log_err(code, pjob->ji_qs.ji_jobid, log_buffer); if (!strcmp(abortjobid, pjob->ji_qs.ji_jobid)) { if (abortcount >= 16) { /* abort is not working, do not send sisters again */ sprintf(log_buffer, "abort attempted 16 times in %s. 
ignoring abort request from node %s", __func__, nodename); log_err(code, pjob->ji_qs.ji_jobid, log_buffer); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); exec_bail(pjob, JOB_EXEC_RETRY); return; } abortcount++; } else { snprintf(abortjobid, sizeof(abortjobid), "%s", pjob->ji_qs.ji_jobid); abortcount = 1; } /* annotate job with failed node info */ snprintf(tmpLine, sizeof(tmpLine), "REJHOST=%s", nodename); pattr = &pjob->ji_wattr[JOB_ATR_sched_hint]; job_attr_def[JOB_ATR_sched_hint].at_free(pattr); job_attr_def[JOB_ATR_sched_hint].at_decode( pattr, NULL, NULL, tmpLine, 0); pjob->ji_wattr[JOB_ATR_errpath].at_flags = (ATR_VFLAG_SET | ATR_VFLAG_MODIFY | ATR_VFLAG_SEND); /* NOTE: is there a way to force the updated 'sched_hint' info to pbs_server before the obit to avoid a race condition? */ /* Perhaps, pbs_mom could register job and perform 'exec_bail' after next job status query from pbs_server? */ /* NOTE: exec_bail() will issue 'send_sisters(pjob,IM_ABORT_JOB);' */ snprintf(log_buffer, sizeof(log_buffer), "job %s failing on startup", pjob->ji_qs.ji_jobid); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); exec_bail(pjob, JOB_EXEC_RETRY); return; } /* END job_start_error() */ /* ** Free calloc'ed array (used in SPAWN) */ void arrayfree( char **array) /* I - freed */ { int i; for (i = 0;array[i];i++) free(array[i]); free(array); return; } /** * Deal with events hooked to a node where a stream has gone * south or we are going away. * * @see term_job() - parent - terminate job * @see im_eof() - parent - inter-MOM end of file detected */ void node_bailout( job *pjob, /* I */ hnodent *np) /* I */ { task *ptask; eventent *ep; int i; unsigned int momport = 0; ep = (eventent *)GET_NEXT(np->hn_events); while (ep != NULL) { switch (ep->ee_command) { case IM_JOIN_JOB: { /* * I'm MS and a node has failed to respond to the * call. 
Maybe in the future the user can specify * the job can start with a range of nodes so * one (or more) missing can be tolerated. Not * for now. */ sprintf(log_buffer, "%s join_job failed from node %s %d - recovery attempted)", pjob->ji_qs.ji_jobid, np->hn_host, np->hn_node); log_err(-1, __func__, log_buffer); job_start_error(pjob, PBSE_SISCOMM, np->hn_host); break; } case IM_ABORT_JOB: case IM_KILL_JOB: /* ** The job is already in the process of being killed ** but somebody has dropped off the face of the ** earth. Just check to see if everybody has ** been heard from in some form or another and ** set JOB_SUBSTATE_EXITING if so. */ sprintf(log_buffer, "%s: received KILL/ABORT request for job %s from node %s", __func__, pjob->ji_qs.ji_jobid, np->hn_host); log_err(-1, __func__, log_buffer); for (i = 1;i < pjob->ji_numnodes;i++) { if (pjob->ji_hosts[i].hn_sister == SISTER_OKAY) break; } /* END for (i) */ if (i == pjob->ji_numnodes) { /* all dead */ pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); exiting_tasks = 1; } break; case IM_SPAWN_TASK: case IM_GET_TASKS: case IM_SIGNAL_TASK: case IM_OBIT_TASK: case IM_GET_INFO: case IM_GET_RESC: /* ** A user attempt failed, inform process. */ if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: REQUEST %d %s\n", __func__, ep->ee_command, pjob->ji_qs.ji_jobid); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); } ptask = task_check(pjob, ep->ee_taskid); if (ptask == NULL) break; tm_reply(ptask->ti_chan, TM_ERROR, ep->ee_event); diswsi(ptask->ti_chan, TM_ESYSTEM); DIS_tcp_wflush(ptask->ti_chan); break; case IM_POLL_JOB: /* ** I must be Mother Superior for the job and ** this is an error reply to a poll request. 
*/ #ifdef __TRR /* roadrunner */ sprintf(log_buffer, "%s POLL failed from node %s %d - recovery attempted - job will not be killed)", pjob->ji_qs.ji_jobid, np->hn_host, np->hn_node); log_err(-1, __func__, log_buffer); #else /* __TRR */ /* we should be more patient - how do we recover this connection? (NYI) */ /* if job pbs_attribute fault_tolerant is not set or set to false then kill the job */ if ((pjob->ji_wattr[JOB_ATR_fault_tolerant].at_flags & ATR_VFLAG_SET) && pjob->ji_wattr[JOB_ATR_fault_tolerant].at_val.at_long) { sprintf(log_buffer, "%s POLL failed from node %s %d - job is fault tolerant - job will not be killed)", pjob->ji_qs.ji_jobid, np->hn_host, np->hn_node); } else { sprintf(log_buffer, "%s POLL failed from node %s %d - recovery not attempted - job will be killed)", pjob->ji_qs.ji_jobid, np->hn_host, np->hn_node); pjob->ji_nodekill = np->hn_node; } log_err(-1, __func__, log_buffer); #endif /* __TRR */ break; case IM_GET_TID: /* ** A request to Mother Superior to get ** a TID has failed. */ arrayfree(ep->ee_argv); arrayfree(ep->ee_envp); ptask = task_check(pjob, ep->ee_forward.fe_taskid); if (ptask == NULL) break; tm_reply(ptask->ti_chan, TM_ERROR, ep->ee_forward.fe_event); diswsi(ptask->ti_chan, TM_ESYSTEM); DIS_tcp_wflush(ptask->ti_chan); break; default: sprintf(log_buffer, "unknown command %d saved", ep->ee_command); log_err(-1, __func__, log_buffer); break; } /* END switch (ep->ee_command) */ delete_link(&ep->ee_next); free(ep); ep = (eventent *)GET_NEXT(np->hn_events); } /* END while (ep != NULL) */ return; } /* END node_bailout() */ /** terminate job - terminate all node events of all types contained by nodes in job nodelist */ void term_job( job *pjob) /* I */ { hnodent *np; int num; for (num = 0, np = pjob->ji_hosts;num < pjob->ji_numnodes;num++, np++) { np->hn_sister = SISTER_EOF; node_bailout(pjob, np); } /* END for (num) */ return; } /* END term_job() */ /* * Check to be sure this is a connection from Mother Superior on * a good port. 
A good port is a privileged port if port_care is TRUE. * If pjob is NULL, only check that it is on a privileged port. * Check to make sure I am not Mother Superior (talking to myself). * Set the stream in ji_nodes[0] if needed. * @return false if the connection is not privileged or not from mother superior, true otherwise */ bool connection_from_ms( struct tcp_chan *chan, job *pjob, struct sockaddr_in *source_addr) { struct sockaddr_in *addr; unsigned long ipaddr_connect; hnodent *np; unsigned long ipaddr_ms; addr = source_addr; ipaddr_connect = ntohl(addr->sin_addr.s_addr); if ((port_care != FALSE) && (ntohs(addr->sin_port) >= IPPORT_RESERVED)) { sprintf(log_buffer, "non-privileged connection from %s", netaddr(addr)); log_err(-1, __func__, log_buffer); close(chan->sock); chan->sock = -1; return(false); } if (pjob == NULL) { return(true); } np = pjob->ji_hosts; if (pjob->ji_hosts == NULL) { log_err(PBSE_BAD_PARAMETER, __func__, "NULL ptr to job host management stuff"); return(false); } ipaddr_ms = ntohl(((struct sockaddr_in *)(&np->sock_addr))->sin_addr.s_addr); /* make sure the ip addresses match */ if (ipaddr_ms != ipaddr_connect) return(false); if (am_i_mother_superior(*pjob) == true) { log_err(-1, __func__, "Mother Superior talking to herself"); return(false); } return(true); } /* END connection_from_ms() */ u_long resc_used( job *pjob, const char *name, u_long(*func)(resource *)) { pbs_attribute *at; resource_def *rd; resource *pres; u_long val = 0L; at = &pjob->ji_wattr[JOB_ATR_resc_used]; if (at == NULL) { return(0); } rd = find_resc_def(svr_resc_def, name, svr_resc_size); if (rd == NULL) return 0; pres = find_resc_entry(at, rd); if (pres == NULL) return 0; val = func(pres); DBPRT(("resc_used: %s %lu\n", name, val)) return(val); } /* ** Find named info for a task. 
*/ infoent *task_findinfo( task *ptask, char *name) { infoent *ip; for (ip = (infoent *)GET_NEXT(ptask->ti_info); ip; ip = (infoent *)GET_NEXT(ip->ie_next)) { if (strcmp(ip->ie_name, name) == 0) break; } return ip; } /* ** Save named info with a task. */ void task_saveinfo( task *ptask, char *name, void *info, size_t len) { infoent *ip; if ((ip = task_findinfo(ptask, name)) == NULL) { /* new name */ ip = (infoent *)calloc(1, sizeof(infoent)); assert(ip); CLEAR_LINK(ip->ie_next); append_link(&ptask->ti_info, &ip->ie_next, ip); ip->ie_name = name; } else { /* replace name with new info */ free(ip->ie_info); } ip->ie_info = info; ip->ie_len = len; return; } /* END task_saveinfo() */ /* ** Generate a resource string for a job. */ char *resc_string( job *pjob) { pbs_attribute *at; attribute_def *ad; svrattrl *pal; tlist_head lhead; int len, used, tot; char *res_str; const char *ch; const char *getuname(); int resc_access_perm = ATR_DFLAG_USRD; char *tmpResStr; ch = getuname(); len = strlen(ch); tot = len * 2; used = 0; res_str = (char *)calloc(1, tot); if (res_str == NULL) { /* FAILURE - cannot alloc memory */ return(NULL); } strcpy(res_str, ch); used += len; res_str[used++] = ':'; at = &pjob->ji_wattr[JOB_ATR_resource]; if (at->at_type != ATR_TYPE_RESC) { /* SUCCESS */ res_str[used] = '\0'; return(res_str); } ad = &job_attr_def[JOB_ATR_resource]; CLEAR_HEAD(lhead); ad->at_encode( at, &lhead, ad->at_name, NULL, ATR_ENCODE_CLIENT, resc_access_perm); attrl_fixlink(&lhead); for (pal = (svrattrl *)GET_NEXT(lhead); pal; pal = (svrattrl *)GET_NEXT(pal->al_link)) { while (used + pal->al_rescln + pal->al_valln > tot) { tot *= 2; tmpResStr = (char *)calloc(1, tot); if (tmpResStr == NULL) { /* FAILURE - cannot alloc memory */ free(res_str); return(NULL); } strcat(tmpResStr, res_str); free(res_str); res_str = tmpResStr; } strcpy(&res_str[used], pal->al_resc); used += (pal->al_rescln - 1); res_str[used++] = '='; strcpy(&res_str[used], pal->al_value); used += (pal->al_valln - 1); 
res_str[used++] = ','; } free_attrlist(&lhead); res_str[--used] = '\0'; /* SUCCESS */ return(res_str); } /* END resc_string() */ /* create the list of sisters to contact for this radix group * and send the job along */ int contact_sisters( job *pjob, tm_event_t event, int sister_count, char *radix_hosts, char *radix_ports) { int index; int j; int i; int mom_radix; hnodent *np; struct radix_buf **sister_list; int ret; tlist_head phead; pbs_attribute *pattr; char *host_addr = NULL; int addr_len; int local_errno; unsigned short af_family; /* we have to have a sister count of 2 or more for this to work */ if (sister_count <= 2) { return(-1); } mom_radix = pjob->ji_radix; CLEAR_HEAD(phead); pattr = pjob->ji_wattr; /* prepare the attributes to go out on the wire. at_encode does this */ for (i = 0;i < JOB_ATR_LAST;i++) { (job_attr_def + i)->at_encode( pattr + i, &phead, (job_attr_def + i)->at_name, NULL, ATR_ENCODE_MOM, ATR_DFLAG_ACCESS); } /* END for (i) */ attrl_fixlink(&phead); /* NYI: this code performs unnecessary steps. Fix later */ /* We have to put this job into the proper queues. 
These queues are filled in req_quejob and req_commit on Mother Superior for non-job_radix jobs */ append_link(&svr_newjobs, &pjob->ji_alljobs, pjob); /* from req_quejob */ delete_link(&pjob->ji_alljobs); /* from req_commit */ append_link(&svr_alljobs, &pjob->ji_alljobs, pjob); /* from req_commit */ /* initialize the nodes for every sister in this job only the first mom_radix+1 entries will be used for communication */ sister_job_nodes(pjob, radix_hosts, radix_ports); /* we now need to create the list of sisters to send to our intermediate MOMs in our job_radix */ sister_list = allocate_sister_list(mom_radix+1); /* We need to get the address and port of the MOM who called us (pjob->ji_sister[0]) so we can contact her when we call back later */ np = &pjob->ji_sisters[0]; ret = get_hostaddr_hostent_af(&local_errno, np->hn_host, &af_family, &host_addr, &addr_len); memmove(&np->sock_addr.sin_addr, host_addr, addr_len); np->sock_addr.sin_port = htons(np->hn_port); np->sock_addr.sin_family = af_family; /* Set this MOM as the first entry for everyone in the job_radix. This is how the children will know who called them. */ index = 1; for (j = 0; j <= mom_radix && j < sister_count-1; j++) { np = &pjob->ji_sisters[index]; add_host_to_sister_list(np->hn_host, np->hn_port, sister_list[j]); ret = get_hostaddr_hostent_af(&local_errno, np->hn_host, &af_family, &host_addr, &addr_len); memmove(&np->sock_addr.sin_addr, host_addr, addr_len); np->sock_addr.sin_port = htons(np->hn_port); np->sock_addr.sin_family = af_family; index++; } free(host_addr); free_sisterlist(sister_list, mom_radix+1); sister_list = allocate_sister_list(mom_radix); /* Add this node as the first node in each sister_list */ np = &pjob->ji_sisters[1]; for (i = 0; i < mom_radix; i++) { add_host_to_sister_list(np->hn_host, np->hn_port, sister_list[i]); } index = 2; /* index 2 is the first child node. 
*/ do { for (j = 0; j < mom_radix && index < sister_count; j++) { /* Generate a list of sisters divided in to 'mom_radix' number of lists. For example an exec_host list of host1+host2+host3+host4+host5+host6+host7 would create sister lists on a mom_radix of 3 like the following host1+host4+host7 host2+host5 host3+host6 */ np = &pjob->ji_sisters[index]; add_host_to_sister_list(np->hn_host, np->hn_port, sister_list[j]); index++; } } while (index < sister_count); pjob->ji_sisters[1].hn_node = 1; /* This will also identify us an an intermediate node later */ /* we go to pjob->ji_sisters[1] because we do not want to include the parent node that sent the IM_JOIN_JOB_RADIX request as a sister to lower MOMs */ ret = open_tcp_stream_to_sisters(pjob, IM_JOIN_JOB_RADIX, event, mom_radix, &pjob->ji_sisters[1], sister_list, &phead, INTERMEDIATE_MOM); free_sisterlist(sister_list, mom_radix); free_attrlist(&phead); return(ret); } /* END contact_sisters */ void send_im_error( int err, int reply, job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask) { int socket; int i; int rc = DIS_SUCCESS; struct tcp_chan *local_chan = NULL; if (reply) { for (i = 0; i < 5; i++) { if ((socket = get_reply_stream(pjob)) < 0) { rc = DIS_INVALID; if (socket == PERMANENT_SOCKET_FAIL) break; } else if ((local_chan = DIS_tcp_setup(socket)) == NULL) { } else if ((rc = im_compose(local_chan,pjob->ji_qs.ji_jobid,cookie,IM_ERROR,event,fromtask)) != DIS_SUCCESS) { } else if ((rc = diswsi(local_chan,err)) != DIS_SUCCESS) { } else rc = DIS_tcp_wflush(local_chan); if (socket != -1) close(socket); if (local_chan != NULL) { DIS_tcp_cleanup(local_chan); local_chan = NULL; } if (rc == DIS_SUCCESS) break; } if (rc != DIS_SUCCESS) { resend_momcomm *mc; if ((mc = (resend_momcomm *)calloc(1, sizeof(resend_momcomm))) != NULL) { mc->mc_type = COMPOSE_REPLY; mc->mc_struct = create_compose_reply_info(pjob->ji_qs.ji_jobid, cookie, pjob->ji_hosts, IM_ERROR, TM_NULL_EVENT, TM_NULL_TASK); if (mc->mc_struct == NULL) 
free(mc); else add_to_resend_things(mc); } snprintf(log_buffer, sizeof(log_buffer), "Could not send error on event %d for job %s", event, pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buffer); } } } /* END send_im_error() */ int reply_to_join_job_as_sister( job *pjob, struct sockaddr_in *addr, char *cookie, tm_event_t event, int fromtask, int job_radix) { int socket; int retry_count; int ret = DIS_SUCCESS; int command; struct tcp_chan *local_chan = NULL; if (job_radix) command = IM_RADIX_ALL_OK; else command = IM_ALL_OKAY; for (retry_count = 0; retry_count < 5; retry_count++) { if (job_radix) socket = get_radix_reply_stream(pjob); else socket = get_reply_stream(pjob); if (socket < 0) { ret = PBSE_SOCKET_FAULT; if (socket == PERMANENT_SOCKET_FAIL) break; } else if ((local_chan = DIS_tcp_setup(socket)) == NULL) { } else if ((ret = im_compose(local_chan, pjob->ji_qs.ji_jobid, cookie, command, event, fromtask)) != DIS_SUCCESS) { } else ret = DIS_tcp_wflush(local_chan); if (socket >= 0) close(socket); if (local_chan != NULL) { DIS_tcp_cleanup(local_chan); local_chan = NULL; } if (ret == DIS_SUCCESS) { /* SUCCESS */ if (LOGLEVEL >= 8) log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "Successfully sent join job reply"); break; } usleep(10); } /* END for 5 retries */ if (ret != DIS_SUCCESS) { resend_momcomm *mc = (resend_momcomm *)calloc(1, sizeof(resend_momcomm)); im_compose_info *ici; if (mc != NULL) { ici = create_compose_reply_info(pjob->ji_qs.ji_jobid, cookie, pjob->ji_hosts, command, event, fromtask); if (ici != NULL) { mc->mc_type = COMPOSE_REPLY; mc->mc_struct = ici; add_to_resend_things(mc); } else free(mc); } /* FAILURE */ if ((ret >= 0) && (ret <= DIS_INVALID)) { snprintf(log_buffer,sizeof(log_buffer), "Couldn't send join job reply for job %s to %s - %s will try later", pjob->ji_qs.ji_jobid, netaddr(addr), dis_emsg[ret]); } else { snprintf(log_buffer,sizeof(log_buffer), "Couldn't send join job reply for job %s to %s will try later", 
pjob->ji_qs.ji_jobid,
        netaddr(addr));
      }

    log_err(-1, __func__, log_buffer);
    }

  /* DIS_SUCCESS when the reply was flushed, otherwise the last error */
  return(ret);
  }  /* END reply_to_join_job_as_sister() */




/*
** Sender is mother superior sending a job structure to me.
** I am going to become a member of a job.
**
** auxiliary info (
**  localnode id int;
**  number of nodes int;
**  stdout port int;
**  stderr port int;
**  nodeid 0 int;
**  ...
**  nodeid n-1 int;
**  jobattrs attrl;
** )
**
** NOTE(review): this function reads the local node id and the node
** count, then (for radix joins) the host list, port list and sister
** count, then the attribute list.  The stdout/stderr ports and the
** per-node ids listed above are not read here - confirm whether the
** layout description is still current.
**
** Returns IM_FAILURE when the request cannot be read from chan; the
** remaining exits are reached further down in the function.
*/

int im_join_job_as_sister(

  struct tcp_chan    *chan,
  char               *jobid,     /* I */
  struct sockaddr_in *addr,
  char               *cookie,    /* I */
  tm_event_t          event,
  int                 fromtask,
  int                 command,   /* I */
  int                 job_radix)

  {
  hnodent        *np = NULL;
  attribute_def  *pdef;
  job            *pjob;
  tlist_head      lhead;        /* attribute list decoded from the request */
  svrattrl       *psatl;
  int             ret;
  int             nodeid;       /* id this MOM has within the job */
  int             index;
  int             nodenum;      /* number of nodes in the job */
  int             rc;
  int             sister_count = 0;
  int             resc_access_perm;
  char            basename[50]; /* receives the hashname attribute value */
  char            namebuf[MAXPATHLEN];
  char           *radix_hosts = NULL;
  char           *radix_ports = NULL;
  unsigned short  momport = 0;

  /* first field: the node id assigned to this MOM */
  nodeid = disrsi(chan, &ret);

  if (ret != DIS_SUCCESS)
    {
    sprintf(log_buffer,"join_job request for job %s failed - %s (nodeid)",
      jobid,
      dis_emsg[ret]);

    log_err(-1, __func__, log_buffer);

    return(IM_FAILURE);
    }

  /* second field: how many nodes take part in the job */
  nodenum = disrsi(chan, &ret);

  if (ret != DIS_SUCCESS)
    {
    sprintf(log_buffer, "join_job request from node %d for job %s failed - %s (nodenum)",
      nodeid,
      jobid,
      dis_emsg[ret]);

    log_err(-1, __func__, log_buffer);

    return(IM_FAILURE);
    }

  if (LOGLEVEL >= 3)
    {
    sprintf(log_buffer, "%s: JOIN_JOB %s node %d",
      __func__,
      jobid,
      nodeid);

    log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer);
    }

  /* does job already exist?
*/ ret = get_job_struct(&pjob, jobid, command, chan, addr, nodeid); if (ret != PBSE_NONE) { if (ret == PBSE_DISPROTO) { return(IM_FAILURE); } else { return(IM_DONE); } } pjob->ji_numnodes = nodenum; /* XXX */ /* insert block based on radix */ if (job_radix == TRUE) { /* Get the nodes for this radix */ radix_hosts = disrst(chan, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer, "%s: join_job_radix request to node %d for job %s failed - %s (radix_hosts)", __func__, nodeid, jobid, dis_emsg[ret]); log_err(-1, __func__, log_buffer); if (radix_hosts != NULL) free(radix_hosts); return(IM_FAILURE); } radix_ports = disrst(chan, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer, "%s: join_job_radix request to node %d for job %s failed - %s (radix_ports)", __func__, nodeid, jobid, dis_emsg[ret]); log_err(-1, __func__, log_buffer); if (radix_ports != NULL) free(radix_ports); free(radix_hosts); return(IM_FAILURE); } sister_count = disrsi(chan, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer, "%s: join_job_radix request to node %d for job %s failed - %s (radix_ports)", __func__, nodeid, jobid, dis_emsg[ret]); log_err(-1, __func__, log_buffer); free(radix_hosts); free(radix_ports); return(IM_FAILURE); } } /* END if job_radix == TRUE */ CLEAR_HEAD(lhead); if (decode_DIS_svrattrl(chan, &lhead) != DIS_SUCCESS) { sprintf(log_buffer, "%s: join_job request to node %d for job %s failed - %s (decode)", __func__, nodeid, jobid, dis_emsg[ret]); log_err(-1, __func__, log_buffer); if (radix_hosts != NULL) free(radix_hosts); if (radix_ports != NULL) free(radix_ports); return(IM_FAILURE); } /* Get the hashname from the pbs_attribute. 
*/ psatl = (svrattrl *)GET_NEXT(lhead); while (psatl) { if (!strcmp(psatl->al_name, ATTR_hashname)) { snprintf(basename, sizeof(basename), "%s", psatl->al_value); break; } psatl = (svrattrl *)GET_NEXT(psatl->al_link); } snprintf(pjob->ji_qs.ji_jobid, sizeof(pjob->ji_qs.ji_jobid), "%s", jobid); snprintf(pjob->ji_qs.ji_fileprefix, sizeof(pjob->ji_qs.ji_fileprefix), "%s", basename); pjob->ji_modified = 1; pjob->ji_nodeid = nodeid; pjob->ji_qs.ji_svrflags = 0; if ((job_radix == TRUE) && (sister_count > 2)) { pjob->ji_qs.ji_svrflags |= JOB_SVFLG_INTERMEDIATE_MOM; } pjob->ji_qs.ji_un_type = JOB_UNION_TYPE_MOM; /* decode attributes from request into job structure */ rc = 0; resc_access_perm = READ_WRITE; for (psatl = (svrattrl *)GET_NEXT(lhead); psatl; psatl = (svrattrl *)GET_NEXT(psatl->al_link)) { /* identify the pbs_attribute by name */ index = find_attr(job_attr_def, psatl->al_name, JOB_ATR_LAST); if (index < 0) { /* didn`t recognize the name */ rc = PBSE_NOATTR; break; } pdef = &job_attr_def[index]; /* decode pbs_attribute */ if ((rc = pdef->at_decode(&pjob->ji_wattr[index], psatl->al_name, psatl->al_resc, psatl->al_value,resc_access_perm)) != PBSE_NONE) break; } /* END for (psatl) */ free_attrlist(&lhead); if (rc != 0) { if (LOGLEVEL >= 6) { sprintf(log_buffer, "%s:error %d received in joinjob - purging job", __func__, rc); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); } send_im_error(rc,1,pjob,cookie,event,fromtask); mom_job_purge(pjob); if (radix_hosts != NULL) free(radix_hosts); if (radix_ports != NULL) free(radix_ports); return(IM_DONE); } job_nodes(pjob); /* set remaining job structure elements */ pjob->ji_qs.ji_state = JOB_STATE_TRANSIT; pjob->ji_qs.ji_substate = JOB_SUBSTATE_PRERUN; pjob->ji_qs.ji_stime = time_now; pjob->ji_wattr[JOB_ATR_mtime].at_val.at_long = (long)time_now; pjob->ji_wattr[JOB_ATR_mtime].at_flags |= ATR_VFLAG_SET; /* check_pwd is setting up ji_un as type MOM * pjob->ji_qs.ji_un_type = JOB_UNION_TYPE_NEW; * 
pjob->ji_qs.ji_un.ji_newt.ji_fromsock = -1;
   * pjob->ji_qs.ji_un.ji_newt.ji_fromaddr = addr->sin_addr.s_addr;
   * pjob->ji_qs.ji_un.ji_newt.ji_scriptsz = 0;
   **/

  /* Each setup step below follows the same failure pattern: report the
   * error to the requester with send_im_error(), purge the job, free
   * the radix strings and finish the request with IM_DONE. */

  if (check_pwd(pjob) == NULL)
    {
    /* log_buffer populated in check_pwd() */
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer);

    send_im_error(PBSE_BADUSER,1,pjob,cookie,event,fromtask);

    mom_job_purge(pjob);

    if (radix_hosts != NULL)
      free(radix_hosts);

    if (radix_ports != NULL)
      free(radix_ports);

    return(IM_DONE);
    }

  /* should we make a tmpdir? */
  if (TTmpDirName(pjob, namebuf, sizeof(namebuf)))
    {
    if (TMakeTmpDir(pjob, namebuf) != PBSE_NONE)
      {
      log_event(
        PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        pjob->ji_qs.ji_jobid,
        "cannot create tmp dir");

      /* NOTE(review): a tmp dir failure is reported to the requester as
       * PBSE_BADUSER - confirm that this code is the intended one */
      send_im_error(PBSE_BADUSER,1,pjob,cookie,event,fromtask);

      mom_job_purge(pjob);

      if (radix_hosts != NULL)
        free(radix_hosts);

      if (radix_ports != NULL)
        free(radix_ports);

      return(IM_DONE);
      }
    }

#ifdef PENABLE_LINUX26_CPUSETS
#ifndef NUMA_SUPPORT

  if (use_cpusets(pjob) == TRUE)
    {
    sprintf(log_buffer, "about to create cpuset for job %s.\n",
      pjob->ji_qs.ji_jobid);

    log_ext(-1, __func__, log_buffer, LOG_INFO);

    create_cpuset_reservation_if_needed(*pjob);

    /* a cpuset failure is only logged; the join continues */
    if (create_job_cpuset(pjob) == FAILURE)
      {
      sprintf(log_buffer, "Could not create cpuset for job %s.\n",
        pjob->ji_qs.ji_jobid);

      log_err(-1, __func__, log_buffer);
      }
    }

#endif  /* ndef NUMA_SUPPORT */
#endif  /* (PENABLE_LINUX26_CPUSETS) */

  ret = run_prologue_scripts(pjob);

  if (ret != PBSE_NONE)
    {
    /* the prologue result is passed to the requester unchanged */
    send_im_error(ret, 1, pjob, cookie, event, fromtask);

    mom_job_purge(pjob);

    if (radix_hosts != NULL)
      free(radix_hosts);

    if (radix_ports != NULL)
      free(radix_ports);

    return(IM_DONE);
    }

#if IBM_SP2==2  /* IBM SP with PSSP 3.1 */

  if (load_sp_switch(pjob) != 0)
    {
    send_im_error(PBSE_SYSTEM,1,pjob,cookie,event,fromtask);

    log_err(-1, __func__, "cannot load sp switch table");

    mom_job_purge(pjob);

    if (radix_hosts != NULL)
      free(radix_hosts);

    if (radix_ports != NULL)
      free(radix_ports);

    return(IM_DONE);
    }

#endif /* IBM SP */

  /* when multi_mom is set the job is saved under pbs_rm_port instead of
   * the default momport of 0 */
  if (multi_mom)
    {
    momport =
pbs_rm_port; } job_save(pjob, SAVEJOB_FULL, momport); sprintf(log_buffer, "JOIN JOB as node %d", nodeid); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); if ((job_radix == TRUE) && (sister_count > 2)) { /* handle the case where we're contacting multiple nodes */ if ((pjob->ji_wattr[JOB_ATR_job_radix].at_flags & ATR_VFLAG_SET) && (pjob->ji_wattr[JOB_ATR_job_radix].at_val.at_long != 0)) { pjob->ji_radix = pjob->ji_wattr[JOB_ATR_job_radix].at_val.at_long; } pjob->ji_im_nodeid = 1; /* this will identify us as an intermediate node later */ if (allocate_demux_sockets(pjob, INTERMEDIATE_MOM)) { free(radix_hosts); free(radix_ports); return(IM_DONE); } contact_sisters(pjob,event,sister_count,radix_hosts,radix_ports); pjob->ji_intermediate_join_event = event; job_save(pjob,SAVEJOB_FULL,momport); free(radix_ports); free(radix_hosts); return(IM_DONE); } else { unsigned short af_family; char *host_addr = NULL; int addr_len; int local_errno; /* handle the single contact case */ if (job_radix == TRUE) { sister_job_nodes(pjob, radix_hosts, radix_ports); free(radix_ports); radix_ports = NULL; np = &pjob->ji_sisters[0]; if (np != NULL) { ret = get_hostaddr_hostent_af(&local_errno, np->hn_host, &af_family, &host_addr, &addr_len); memmove(&np->sock_addr.sin_addr, host_addr, addr_len); free(host_addr); np->sock_addr.sin_port = htons(np->hn_port); np->sock_addr.sin_family = af_family; } /* This is a leaf node in the job radix hierarchy. pjob->ji_radix needs to be set to non-zero for later in tm_spawn calls. */ pjob->ji_radix = 2; } } /* ** if certain resource limits require that the job usage be ** polled, we link the job to mom_polljobs. 
**
  ** NOTE: we overload the job field ji_jobque for this as it
  ** is not used otherwise by MOM
  */

  if (mom_do_poll(pjob))
    append_link(&mom_polljobs, &pjob->ji_jobque, pjob);

  append_link(&svr_alljobs, &pjob->ji_alljobs, pjob);

  /* establish a connection and write the reply back */
  /* NOTE(review): when the reply fails, ret keeps the value left by the
   * last call that assigned it earlier in this function, and that value
   * is what gets returned - confirm that this is intended */
  if ((reply_to_join_job_as_sister(pjob, addr, cookie, event, fromtask, job_radix)) == DIS_SUCCESS)
    ret = IM_DONE;

  if (radix_ports != NULL)
    free(radix_ports);

  if (radix_hosts != NULL)
    free(radix_hosts);

  return(ret);
  }  /* END im_join_job_as_sister() */




/*
 * takes care of killing of a job for which I'm not the mother superior
 *
 * Sender is (must be) mom superior
 * commanding me to kill a job which I should be a
 * part of.  Send a signal and set the jobstate to begin
 * the kill.  We wait for all tasks to exit before sending
 * an obit to mother superior.  If I have spawned no tasks
 * no obit will be sent.
 *
 * @param pjob    - the job we're killing
 * @param event   - event of the kill request, remembered in pjob->ji_obit
 * @param momport - port passed to job_save(); replaced by pbs_rm_port
 *                  when multi_mom is set
 * @param radix   - TRUE if this is a job radix request, FALSE otherwise
 */

void im_kill_job_as_sister(

  job          *pjob,     /* M */
  tm_event_t    event,    /* I */
  unsigned int  momport,  /* I */
  int           radix)    /* I */

  {
  /* If we are an intermediate mom we need to tell our radix
     the job has been killed */
  if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_INTERMEDIATE_MOM) &&
      (radix == TRUE))
    {
    /* remember how many sisters were sent the kill request */
    pjob->ji_outstanding = send_sisters(pjob, IM_KILL_JOB_RADIX, TRUE);
    }

  /*
   * Send the jobs a signal but we have to wait to
   * do a reply to mother superior until the procs
   * die and are reaped.
*/ kill_job(pjob, SIGKILL, __func__, "kill_job message received"); pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; pjob->ji_obit = event; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_INTERMEDIATE_MOM) && (radix == TRUE)) { exiting_tasks = 0; } else exiting_tasks = 1; /* Setting this to 1 will cause scan_for_exiting to execute */ } /* END im_kill_job_as_sister() */ /* * im_spawn_task * * Sender is a MOM in a job that wants to start a task. * I am MOM on the node that is to run the task. * * auxiliary info ( * parent node tm_node_id * task id tm_task_id * global id string * argv 0 string * ... * argv n string * null * envp 0 string * ... * envp m string * ) */ int im_spawn_task( struct tcp_chan *chan, char *cookie, /* I */ tm_event_t event, /* I */ struct sockaddr_in *addr, /* I */ tm_task_id fromtask, /* I */ job *pjob) /* M */ { int ret; int taskid; int num; int i; int local_socket; struct tcp_chan *local_chan = NULL; tm_node_id nodeid; char *globid = NULL; char *cp; char *jobid = pjob->ji_qs.ji_jobid; char **argv; char **envp; task *ptask; nodeid = disrsi(chan, &ret); if (ret == DIS_SUCCESS) { if (find_node(pjob, chan->sock, nodeid) == NULL) { send_im_error(PBSE_BADHOST,1,pjob,cookie,event,fromtask); return(IM_DONE); } taskid = disrsi(chan, &ret); if (ret == DIS_SUCCESS) { globid = disrst(chan, &ret); } } if (ret != DIS_SUCCESS) { if (globid != NULL) free(globid); return(IM_FAILURE); } if (LOGLEVEL >= 3) { sprintf(log_buffer, "INFO: received request '%s' from %s for job '%s' (spawning task on node '%d' with taskid=%d, globid='%s'", PMOMCommand[IM_SPAWN_TASK], netaddr(addr), jobid, nodeid, taskid, globid); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } if ((pjob->ji_globid[0] == '\0') || (strcmp(pjob->ji_globid, noglobid) == 0)) { snprintf(pjob->ji_globid, sizeof(pjob->ji_globid), "%s", globid); } else if (strcmp(pjob->ji_globid, globid) != 0) { DBPRT(("%s: globid job %s 
received %s\n", __func__, pjob->ji_globid, globid)) } free(globid); globid = NULL; num = 4; argv = (char **)calloc(sizeof(char *), num); if (argv == NULL) return(IM_FAILURE); for (i = 0;;i++) { if ((cp = disrst(chan, &ret)) == NULL) break; if (ret != DIS_SUCCESS) break; if (*cp == '\0') { free(cp); break; } if (i == num - 1) { char **tmpArgV; num *= 2; tmpArgV = (char **)realloc(argv,num * sizeof(char *)); if (tmpArgV == NULL) { free(cp); return(IM_FAILURE); } argv = tmpArgV; } argv[i] = cp; } /* END for (i) */ argv[i] = NULL; if (ret != DIS_SUCCESS) { arrayfree(argv); return(IM_FAILURE); } num = 8; envp = (char **)calloc(sizeof(char *), num); assert(envp); for (i = 0;;i++) { if ((cp = disrst(chan, &ret)) == NULL) break; if (ret != DIS_SUCCESS) break; if (*cp == '\0') { free(cp); break; } if (i == num - 1) { char **tmp = (char **)calloc(num * 2, sizeof(char *)); if (tmp == NULL) { if (envp != NULL) free(envp); arrayfree(argv); free(cp); return(ENOMEM); } else { memcpy(tmp, envp, sizeof(char *) * num); free(envp); envp = tmp; num *= 2; } } envp[i] = cp; } /* END for (i) */ envp[i] = NULL; if ((ret != DIS_EOD) && (ret != DIS_EOF)) { arrayfree(argv); arrayfree(envp); return(IM_FAILURE); } /* do the spawn */ ret = DIS_SUCCESS; if ((ptask = pbs_task_create(pjob, taskid)) == NULL) { if (LOGLEVEL >= 0) { sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' (cannot create task)", PMOMCommand[IM_SPAWN_TASK], netaddr(addr), jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } send_im_error(PBSE_SYSTEM,1,pjob,cookie,event,fromtask); } else { strcpy(ptask->ti_qs.ti_parentjobid, jobid); ptask->ti_qs.ti_parentnode = nodeid; ptask->ti_qs.ti_parenttask = fromtask; if (LOGLEVEL >= 6) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, "saving task (IM_SPAWN_TASK)"); } if (task_save(ptask) == -1) { if (LOGLEVEL >= 0) { sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' (cannot save task)", PMOMCommand[IM_SPAWN_TASK], 
netaddr(addr), jobid); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, jobid, log_buffer); } send_im_error(PBSE_SYSTEM,1,pjob,cookie,event,fromtask); } else { if (start_process(ptask, argv, envp) == -1) { if (LOGLEVEL >= 0) { sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' (cannot start task)", PMOMCommand[IM_SPAWN_TASK], netaddr(addr), jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } send_im_error(TM_ESYSTEM,1,pjob,cookie,event,fromtask); } else { if ((local_socket = get_reply_stream(pjob)) < 0) { } else if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { } else if ((ret = im_compose(local_chan,jobid,cookie,IM_ALL_OKAY,event,fromtask)) != DIS_SUCCESS) { } else if ((ret = diswsi(local_chan, ptask->ti_qs.ti_task)) != DIS_SUCCESS) { } else ret = DIS_tcp_wflush(local_chan); if (local_socket != -1) close(local_socket); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); if (ret != DIS_SUCCESS) { /* SUCCESS but cannot send response message */ resend_momcomm *mc; spawn_task_info *st; if ((mc = (resend_momcomm *)calloc(1, sizeof(resend_momcomm))) != NULL) { mc->mc_type = COMPOSE_REPLY; st = (spawn_task_info *)calloc(1, sizeof(spawn_task_info)); if (st != NULL) { st->ici = create_compose_reply_info(pjob->ji_qs.ji_jobid, cookie, pjob->ji_hosts, IM_SPAWN_TASK, TM_NULL_EVENT, TM_NULL_TASK); if (st->ici == NULL) { free(st); free(mc); } else { st->ti_task = ptask->ti_qs.ti_task; mc->mc_struct = st; add_to_resend_things(mc); } } else free(mc); } if (LOGLEVEL >= 0) { sprintf(log_buffer, "ALERT: received request '%s' from %s for job '%s' (task successfully started but send response failed)", PMOMCommand[IM_SPAWN_TASK], netaddr(addr), jobid); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } } /* END SUCCESS but can't send response */ } /* END SUCCESS i.e. 
started process successfully */ } } arrayfree(argv); arrayfree(envp); return(IM_DONE); } /* END im_spawn_task() */ /* ** Sender is MOM sending a task and signal to ** deliver. If taskid is 0, signal all tasks. ** ** auxiliary info ( ** sending node tm_node_id; ** taskid tm_task_id; ** signal int; ** ) */ int im_signal_task( struct tcp_chan *chan, job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask) { int nodeid; int taskid; int ret; int sig; int socket; struct tcp_chan *local_chan = NULL; char *jobid = pjob->ji_qs.ji_jobid; task *ptask = NULL; /* first read all of the data */ nodeid = disrsi(chan, &ret); if (ret == DIS_SUCCESS) { taskid = disrsi(chan, &ret); if (ret == DIS_SUCCESS) { sig = disrsi(chan, &ret); } } if (ret != DIS_SUCCESS) return(IM_FAILURE); if (find_node(pjob, chan->sock, nodeid) == NULL) { send_im_error(PBSE_BADHOST,1,pjob,cookie,event,fromtask); return(IM_DONE); } if (taskid == 0) { snprintf(log_buffer,sizeof(log_buffer), "%s: SIGNAL_TASK %s from node %d all tasks signal %d\n", __func__, jobid, nodeid, sig); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); for (ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { kill_task(ptask, sig, 0); } /* if STOPing all tasks, we're obviously suspending the job */ if (sig == SIGSTOP) { pjob->ji_qs.ji_substate = JOB_SUBSTATE_SUSPEND; pjob->ji_qs.ji_svrflags |= JOB_SVFLG_Suspend; } else if (sig == SIGCONT) { pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING; pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_Suspend; } } else { snprintf(log_buffer,sizeof(log_buffer), "%s: SIGNAL_TASK %s from node %d task %d signal %d\n", __func__, jobid, nodeid, taskid, sig); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); if ((ptask = task_find(pjob, taskid))) kill_task(ptask, sig, 0); } if ((socket = get_reply_stream(pjob)) < 0) { } else if ((local_chan = DIS_tcp_setup(socket)) == NULL) { } else if ((ret = im_compose(local_chan, jobid, cookie, IM_ALL_OKAY, 
event, fromtask)) != DIS_SUCCESS) { } else DIS_tcp_wflush(local_chan); if (socket != -1) close(socket); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); if (ret != DIS_SUCCESS) { resend_momcomm *mc = (resend_momcomm *)calloc(1, sizeof(resend_momcomm)); if (mc != NULL) { mc->mc_struct = create_compose_reply_info(jobid, cookie, pjob->ji_hosts, IM_SIGNAL_TASK, event, fromtask); if (mc->mc_struct == NULL) free(mc); else { mc->mc_type = COMPOSE_REPLY; add_to_resend_things(mc); } } } return(IM_DONE); } /* END im_signal_task() */ /* ** Sender is MOM sending a request to monitor a ** task for exit. ** ** auxiliary info ( ** sending node tm_node_id; ** taskid tm_task_id; ** ) */ int im_obit_task( struct tcp_chan *chan, job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask) { int nodeid; int taskid; int ret; int local_socket; struct tcp_chan *local_chan = NULL; char *jobid = pjob->ji_qs.ji_jobid; task *ptask = NULL; nodeid = disrsi(chan, &ret); if (ret == DIS_SUCCESS) { taskid = disrsi(chan, &ret); } if (ret != DIS_SUCCESS) return(IM_FAILURE); if (find_node(pjob, chan->sock, nodeid) == NULL) { send_im_error(PBSE_BADHOST,1,pjob,cookie,event,fromtask); return(IM_DONE); } ptask = task_find(pjob, taskid); if (ptask == NULL) { send_im_error(PBSE_JOBEXIST,1,pjob,cookie,event,fromtask); return(IM_DONE); } snprintf(log_buffer,sizeof(log_buffer), "%s: OBIT_TASK %s from node %d task %d\n", __func__, jobid, nodeid, taskid); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); if (ptask->ti_qs.ti_status >= TI_STATE_EXITED) { local_socket = get_reply_stream(pjob); if (IS_VALID_STREAM(local_socket)) { if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { } else if ((ret = im_compose(local_chan, jobid, cookie, IM_ALL_OKAY, event, fromtask)) != DIS_SUCCESS) { } else if ((ret = diswsi(local_chan, ptask->ti_qs.ti_exitstat)) != DIS_SUCCESS) { } else ret = DIS_tcp_wflush(local_chan); close(local_socket); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); if (ret != 
DIS_SUCCESS)
        {
        /* the exit status could not be delivered: queue it, together
         * with the reply header data, for a later resend */
        resend_momcomm *mc = (resend_momcomm *)calloc(1, sizeof(resend_momcomm));
        obit_task_info *ot;

        if (mc != NULL)
          {
          if ((ot = (obit_task_info *)calloc(1, sizeof(obit_task_info))) == NULL)
            {
            free(mc);
            }
          else
            {
            ot->ici = create_compose_reply_info(jobid, cookie, pjob->ji_hosts, IM_OBIT_TASK, event, fromtask);

            if (ot->ici == NULL)
              {
              free(ot);
              free(mc);
              }
            else
              {
              mc->mc_type = OBIT_TASK_REPLY;

              ot->ti_exitstat = ptask->ti_qs.ti_exitstat;

              mc->mc_struct = ot;

              add_to_resend_things(mc);
              }
            }
          }
        }
      }
    }
  else
    {
    /* save obit request with task */
    obitent *op = (obitent *)calloc(1, sizeof(obitent));

    if (op == NULL)
      {
      /* the request is dropped; only the allocation failure is logged */
      log_err(ENOMEM, __func__, "Cannot allocate memory for the obit entry");
      }
    else
      {
      CLEAR_LINK(op->oe_next);

      append_link(&ptask->ti_obits, &op->oe_next, op);

      /* remember who asked so the obit can be addressed to them */
      op->oe_info.fe_node   = nodeid;
      op->oe_info.fe_event  = event;
      op->oe_info.fe_taskid = fromtask;
      }
    }

  return(IM_DONE);
  }  /* END im_obit_task() */




/*
** Sender is MOM sending a task and name to lookup
** for info to report back.
**
** auxiliary info (
** sending node tm_node_id;
** taskid  tm_task_id;
** name  string;
** )
**
** Returns IM_FAILURE when the request cannot be read, IM_DONE otherwise.
** An unknown sender is answered with PBSE_BADHOST, an unknown task
** with PBSE_JOBEXIST.
*/

int im_get_info(

  struct tcp_chan *chan,
  job             *pjob,
  char            *cookie,
  tm_event_t       event,
  tm_task_id       fromtask)

  {
  int              nodeid;
  int              taskid;
  int              ret;
  int              local_socket;

  struct tcp_chan *local_chan = NULL;
  char            *jobid = pjob->ji_qs.ji_jobid;
  char            *name = NULL;   /* requested info name; freed on every exit */
  task            *ptask = NULL;
  infoent         *ip;

  nodeid = disrsi(chan, &ret);

  if (ret == DIS_SUCCESS)
    {
    taskid = disrsi(chan, &ret);

    if (ret == DIS_SUCCESS)
      name = disrst(chan, &ret);
    }

  if (ret != DIS_SUCCESS)
    {
    if (name != NULL)
      free(name);

    return(IM_FAILURE);
    }

  /* the sender has to be one of the nodes of this job */
  if (find_node(pjob, chan->sock, nodeid) == NULL)
    {
    send_im_error(PBSE_BADHOST,1,pjob,cookie,event,fromtask);

    free(name);

    return(IM_DONE);
    }

  ptask = task_find(pjob, taskid);

  if (ptask == NULL)
    {
    send_im_error(PBSE_JOBEXIST,1,pjob,cookie,event,fromtask);

    free(name);

    return(IM_DONE);
    }

  snprintf(log_buffer,sizeof(log_buffer),
    "%s: GET_INFO %s from node %d task %d name %s\n",
    __func__, jobid, nodeid, taskid,
name); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); if ((ip = task_findinfo(ptask, name)) == NULL) { send_im_error(PBSE_JOBEXIST,1,pjob,cookie,event,fromtask); free(name); return(IM_DONE); } free(name); local_socket = get_reply_stream(pjob); if (IS_VALID_STREAM(local_socket)) { if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { } else if ((ret = im_compose(local_chan,jobid,cookie,IM_ALL_OKAY,event,fromtask)) != DIS_SUCCESS) { } else if ((ret = diswcs(local_chan, (const char *)ip->ie_info, ip->ie_len)) != DIS_SUCCESS) { } else DIS_tcp_wflush(local_chan); close(local_socket); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); } return(IM_DONE); } /* END im_get_info() */ /* * im_get_resc_as_sister * * Sender is MOM requesting resource info to * report back its client. * */ int im_get_resc_as_sister( struct tcp_chan *chan, job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask) { int nodeid; int ret; int i; int local_socket; struct tcp_chan *local_chan = NULL; char *jobid = pjob->ji_qs.ji_jobid; char *info = NULL; vnodent *vp; nodeid = disrsi(chan, &ret); if (ret != DIS_SUCCESS) return(IM_FAILURE); /* find the node using just the id */ for (vp = pjob->ji_vnods, i = 0;i < pjob->ji_numvnod;vp++, i++) { if (vp->vn_node == nodeid) break; } /* END for (vp) */ if (i == pjob->ji_numvnod) { sprintf(log_buffer, "node %d not found", nodeid); log_err(-1, __func__, log_buffer); send_im_error(PBSE_BADHOST,1,pjob,cookie,event,fromtask); return(IM_DONE); } snprintf(log_buffer,sizeof(log_buffer), "%s: GET_RESC %s from node %d\n", __func__, jobid, nodeid); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); info = resc_string(pjob); local_socket = get_reply_stream(pjob); if (IS_VALID_STREAM(local_socket)) { if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { } else if ((ret = im_compose(local_chan,jobid,cookie,IM_ALL_OKAY,event,fromtask)) != DIS_SUCCESS) { } else if ((ret = diswst(local_chan, info)) != DIS_SUCCESS) { } else 
DIS_tcp_wflush(local_chan); close(local_socket); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); } if (info != NULL) free(info); return(IM_DONE); } /* END im_get_resc_as_sister() */ /* * get_reply_stream * * NOTE: assumes pjob isn't NULL * NOTE: this stream needs to be closed * * @return the open stream to mother superior, or -1 on error */ int get_reply_stream( job *pjob) /* I */ { hnodent *np = pjob->ji_hosts; return (tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr,sizeof(np->sock_addr))); } /* END get_reply_stream() */ /* * get_radix_reply_stream * * NOTE: assumes pjob isn't NULL * NOTE: this stream needs to be closed * * @return the open stream to mother superior or intermmediate mom * on a job radix, or -1 on error */ int get_radix_reply_stream( job *pjob) /* I */ { hnodent *np = pjob->ji_sisters; return (tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr,sizeof(np->sock_addr))); } /* END get_radix_reply_stream() */ /* * im_poll_job_as_sister * * @parent im_request * */ int im_poll_job_as_sister( job *pjob, /* I */ char *cookie, /* I */ tm_event_t event, /* I */ tm_task_id fromtask) /* I */ { int should_kill_job = FALSE; int ret; int local_socket; struct tcp_chan *local_chan = NULL; unsigned int momport = 0; char *jobid = pjob->ji_qs.ji_jobid; if (LOGLEVEL >= 3) { snprintf(log_buffer,sizeof(log_buffer), "%s: POLL_JOB %s - %d\n", __func__, jobid, event); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } local_socket = get_reply_stream(pjob); if (IS_VALID_STREAM(local_socket)) { if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { close(local_socket); return(IM_DONE); } else if ((ret = im_compose(local_chan,jobid,cookie,IM_ALL_OKAY,event,fromtask)) != DIS_SUCCESS) { DIS_tcp_close(local_chan); return(IM_DONE); } } else return(IM_DONE); if (pjob->ji_qs.ji_state == JOB_STATE_TRANSIT) { /* first poll, set job to running */ pjob->ji_qs.ji_state = JOB_STATE_RUNNING; pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING; if (multi_mom) { 
momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); } /* recommend to mother superior that the job be killed */ if (pjob->ji_qs.ji_svrflags & (JOB_SVFLG_OVERLMT1 | JOB_SVFLG_OVERLMT2)) { should_kill_job = TRUE; } if ((ret = diswsi(local_chan, should_kill_job)) == DIS_SUCCESS) { /* get fresh resource usage */ mom_set_use(pjob); /* ** Send the information tallied for the job. */ if ((ret = diswul(local_chan, resc_used(pjob, "cput", gettime))) != DIS_SUCCESS) { } else if ((ret = diswul(local_chan, resc_used(pjob, "mem", getsize))) != DIS_SUCCESS) { } else if ((ret = diswul(local_chan, resc_used(pjob, "vmem", getsize))) != DIS_SUCCESS) { } else { DIS_tcp_wflush(local_chan); } } DIS_tcp_close(local_chan); return(IM_DONE); } /* END im_poll_job_as_sister() */ /* * im_abort_job * * Sender is (must be) mom superior commanding me to * abort a JOIN_JOB request. * */ int im_abort_job( job *pjob, /* I */ struct sockaddr_in *addr, /* I */ char *cookie, /* I */ tm_event_t event, /* I */ tm_task_id fromtask) /* I */ { char *jobid = pjob->ji_qs.ji_jobid; if (LOGLEVEL >= 2) { sprintf(log_buffer, "%s: received KILL/ABORT request for job %s from node %s", __func__, jobid, netaddr(addr)); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } mom_job_purge(pjob); return(IM_DONE); } /* END im_abort_job() */ /* * im_get_tid * I must be mom superior getting a request from a * sub-mom to get a TID. 
* */

/*
 * Hands out the next task id of the job: when a reply connection can be
 * set up and the IM_ALL_OKAY header is composed, the current value of
 * pjob->ji_taskid is written to it and the counter is advanced by one.
 *
 * @return IM_FAILURE when this MOM is not mother superior for the job or
 *         the task id counter has reached the adopted-task range,
 *         IM_DONE otherwise (even if the reply could not be sent)
 */

int im_get_tid(

  job        *pjob,     /* I */
  char       *cookie,   /* I */
  tm_event_t  event,    /* I */
  tm_task_id  fromtask) /* I */

  {
  struct tcp_chan *reply_chan = NULL;
  char            *id = pjob->ji_qs.ji_jobid;
  int              reply_sock;

  if (am_i_mother_superior(*pjob) == false)
    {
    log_err(-1, __func__, "got GET_TID and I'm not MS");

    return(IM_FAILURE);
    }

  if (LOGLEVEL >= 7)
    {
    snprintf(log_buffer,sizeof(log_buffer), "%s: GET_TID %s\n", __func__, id);

    log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,id,log_buffer);
    }

  /* DJH 27 Feb 2002 */
  if (IS_ADOPTED_TASK(pjob->ji_taskid))
    {
    log_err(-1, __func__, "Ran into reserved task ids");

    return(IM_FAILURE);
    }

  reply_sock = get_reply_stream(pjob);

  if (IS_VALID_STREAM(reply_sock))
    {
    reply_chan = DIS_tcp_setup(reply_sock);

    if (reply_chan != NULL)
      {
      if (im_compose(reply_chan, id, cookie, IM_ALL_OKAY, event, fromtask) == DIS_SUCCESS)
        {
        /* the id is consumed whether or not the write succeeds */
        diswsi(reply_chan, pjob->ji_taskid++);

        DIS_tcp_wflush(reply_chan);
        }
      }

    close(reply_sock);

    if (reply_chan != NULL)
      DIS_tcp_cleanup(reply_chan);
    }

  return(IM_DONE);
  } /* END im_get_tid() */




/*
 * Sender is one of the sisterhood saying she
 * got the job structure sent and she accepts it.
 * I'm mother superior.
*
* auxiliary info (
*   none;
* )
*/

/*
 * handle_im_join_job_response
 *
 * Looks for any host of the job that still has an outstanding event. If
 * none is left, every sister has answered and the job is started locally
 * with exec_job_on_ms(); otherwise the response is only logged.
 *
 * @param chan - channel the response arrived on (not read here)
 * @param pjob - job the response belongs to
 * @param addr - address of the responding MOM, used for logging only
 * @return IM_FAILURE if this MOM is not mother superior for the job,
 *         IM_DONE otherwise
 */

int handle_im_join_job_response(

  struct tcp_chan    *chan,
  job                *pjob,  /* M */
  struct sockaddr_in *addr)

  {
  int       i;
  hnodent  *np = NULL;
  eventent *ep = NULL;

  if (am_i_mother_superior(*pjob) == false)
    {
    log_err(-1, __func__, "got JOIN_JOB OKAY and I'm not MS");

    return(IM_FAILURE);
    }

  /* This is an O(N) algorithm. We should do a countdown instead */
  for (i = 0;i < pjob->ji_numnodes;i++)
    {
    np = &pjob->ji_hosts[i];

    if ((ep = (eventent *)GET_NEXT(np->hn_events)) != NULL)
      break;
    }

  if (ep != NULL)
    {
    /* received a join job response, but not ready to launch;
     * np is the first host that still has an event pending */
    if (LOGLEVEL >= 4)
      {
      snprintf(log_buffer, sizeof(log_buffer),
        "%s:joinjob response received from node %s, (still waiting for %s)",
        __func__, netaddr(addr), np->hn_host);

      log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer);
      }

    return(IM_DONE);
    }

  if (LOGLEVEL >= 6)
    {
    struct timeval now;
    struct timeval elapsed;

    if (gettimeofday(&now, NULL) == 0)
      {
      timeval_subtract(&elapsed, &now, &pjob->ji_wattr[JOB_ATR_total_runtime].at_val.at_timeval);

      /* microseconds must be zero padded, otherwise 1s 5000us reads as "1.5000" */
      snprintf(log_buffer, sizeof(log_buffer),
        "%s: total wire-up time for job %ld.%06ld",
        __func__, (long)elapsed.tv_sec, (long)elapsed.tv_usec);

      log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer);
      }
    }

  /* no events remaining, all moms have reported in, launch job locally */
  if (LOGLEVEL >= 2)
    {
    log_event(
      PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pjob->ji_qs.ji_jobid,
      "im_request: all sisters have reported in, launching job locally");
    }

  exec_job_on_ms(pjob);

  return(IM_DONE);
  } /* END handle_im_join_job_response() */




/*
 * Sender is sending a response that a job
 * which needs to die has been given the ax.
 * I'm mother superior.
* * auxiliary info ( * cput ulong; * mem ulong; * vmem ulong; * ) */ int handle_im_kill_job_response( struct tcp_chan *chan, job *pjob, hnodent *np, int event_com, int nodeidx) { int ret; int i; unsigned int momport = 0; char *jobid = pjob->ji_qs.ji_jobid; if (am_i_mother_superior(*pjob) == false) { log_err(-1, __func__, "got KILL_JOB OKAY and I'm not MS"); return(IM_FAILURE); } if (LOGLEVEL >= 2) { sprintf(log_buffer, "%s:KILL_JOB acknowledgement received", __func__); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); } if (pjob->ji_resources != NULL) { pjob->ji_resources[nodeidx - 1].nr_cput = disrul(chan, &ret); if (ret == DIS_SUCCESS) { pjob->ji_resources[nodeidx - 1].nr_mem = disrul(chan, &ret); if (ret == DIS_SUCCESS) pjob->ji_resources[nodeidx - 1].nr_vmem = disrul(chan, &ret); } if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: %s FINAL from %d cpu %lu sec mem %lu kb vmem %ld kb\n", __func__, jobid, nodeidx, pjob->ji_resources[nodeidx - 1].nr_cput, pjob->ji_resources[nodeidx - 1].nr_mem, pjob->ji_resources[nodeidx - 1].nr_vmem); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } } /* END if (pjob_ji_resources != NULL) */ np->hn_sister = SISTER_KILLDONE; /* We are changing this node from SISTER_OKAY which was set in send_sisters() */ for (i = 1; i < pjob->ji_numnodes; i++) { /* if we get through this loop without finding a * hn_sister set to SISTER_OKAY then we know * all sisters have reported in */ if (pjob->ji_hosts[i].hn_sister == SISTER_OKAY) break; } if (i == pjob->ji_numnodes) { /* all dead */ if (LOGLEVEL >= 3) { snprintf(log_buffer,sizeof(log_buffer), "%s: ALL DONE, set EXITING job %s\n", __func__, jobid); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); exiting_tasks = 1; } return(IM_DONE); } /* END handle_im_kill_job_response() */ /* * Sender is MOM 
responding to a "spawn_task" * request. * * auxiliary info ( * task id tm_task_id; * ) */ int handle_im_spawn_task_response( struct tcp_chan *chan, job *pjob, tm_task_id event_task, tm_event_t event) { int taskid; int ret; task *ptask; taskid = disrsi(chan, &ret); if (ret != DIS_SUCCESS) return(IM_FAILURE); if (LOGLEVEL >= 5) { sprintf(log_buffer, "%s: SPAWN_TASK %s OKAY task %d\n", __func__, pjob->ji_qs.ji_jobid, taskid); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); } ptask = task_check(pjob, event_task); if (ptask != NULL) { tm_reply(ptask->ti_chan, TM_OKAY, event); diswsi(ptask->ti_chan, taskid); DIS_tcp_wflush(ptask->ti_chan); } return(IM_DONE); } /* END handle_im_spawn_task_response() */ /* * Sender is MOM with a good signal to report. * * auxiliary info ( * none; * ) */ int handle_im_signal_task_response( job *pjob, /* M */ tm_task_id event_task, /* I */ tm_event_t event) /* I */ { task *ptask; if (LOGLEVEL >= 5) { sprintf(log_buffer, "%s: SIGNAL_TASK %s OKAY %d\n", __func__, pjob->ji_qs.ji_jobid, event_task); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); } ptask = task_check(pjob, event_task); if (ptask != NULL) { tm_reply(ptask->ti_chan, TM_OKAY, event); DIS_tcp_wflush(ptask->ti_chan); } return(IM_DONE); } /* END handle_im_signal_task_response() */ /* * Sender is MOM giving a list of tasks which she * has started for this job. * * auxiliary info ( * task id tm_task_id; * ... 
* task id tm_task_id; * ) */ int handle_im_get_tasks_response( struct tcp_chan *chan, job *pjob, /* M */ tm_task_id event_task, /* I */ tm_event_t event) /* I */ { task *ptask; int taskid; int ret; if (LOGLEVEL >= 5) { sprintf(log_buffer, "%s: GET_TASKS %s OKAY \n", __func__, pjob->ji_qs.ji_jobid); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); } ptask = task_check(pjob, event_task); if (ptask == NULL) return(IM_DONE); tm_reply(ptask->ti_chan, TM_OKAY, event); /* read each task and write it */ for (;;) { /* DIS_rpp_reset(); */ taskid = disrsi(chan, &ret); if (ret != DIS_SUCCESS) { if ((ret == DIS_EOD) || (ret == DIS_EOF)) break; return(IM_FAILURE); } diswsi(ptask->ti_chan, taskid); } diswsi(ptask->ti_chan, TM_NULL_TASK); DIS_tcp_wflush(ptask->ti_chan); return(IM_DONE); } /* handle_im_get_tasks_response() */ /* * Sender is MOM with a death report. * * auxiliary info ( * exit value int; * ) */ int handle_im_obit_task_response( struct tcp_chan *chan, job *pjob, /* I */ tm_task_id event_task, /* I */ tm_event_t event) /* I */ { int exitval; int ret; char *jobid = pjob->ji_qs.ji_jobid; task *ptask; exitval = disrsi(chan, &ret); if (ret != DIS_SUCCESS) return(IM_FAILURE); if (LOGLEVEL >= 5) { sprintf(log_buffer, "%s: OBIT_TASK %s OKAY %d exit val %d\n", __func__, jobid, event_task, exitval); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } ptask = task_check(pjob, event_task); if (ptask != NULL) { if (is_ptask_corrupt(ptask->ti_chan)) return(IM_FAILURE); tm_reply(ptask->ti_chan, TM_OKAY, event); diswsi(ptask->ti_chan, exitval); DIS_tcp_wflush(ptask->ti_chan); } return(IM_DONE); } /* END handle_im_obit_task_response() */ /* * Sender is MOM with a named info to report. 
* * auxiliary info ( * info counted string; * ) */ int handle_im_get_info_response( struct tcp_chan *chan, job *pjob, /* I */ tm_task_id event_task, /* I */ tm_event_t event) /* I */ { char *info = NULL; char *jobid = pjob->ji_qs.ji_jobid; int ret; size_t len; task *ptask; info = disrcs(chan, &len, &ret); if (ret != DIS_SUCCESS) { if (info != NULL) free(info); return(IM_FAILURE); } if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: GET_INFO %s OKAY %d\n", __func__, jobid, event_task); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } ptask = task_check(pjob, event_task); if (ptask != NULL) { tm_reply(ptask->ti_chan, TM_OKAY, event); diswcs(ptask->ti_chan, info, len); DIS_tcp_wflush(ptask->ti_chan); } free(info); return(IM_DONE); } /* END handle_im_get_info_response() */ /* * Sender is MOM with a resource info to report. * * auxiliary info ( * info counted string; * ) */ int handle_im_get_resc_response( struct tcp_chan *chan, job *pjob, /* I */ tm_task_id event_task, /* I */ tm_event_t event) /* I */ { int ret; char *info = disrst(chan, &ret); task *ptask; if (ret != DIS_SUCCESS) { if (info != NULL) free(info); return(IM_FAILURE); } if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: GET_RESC %s OKAY %d\n", __func__, pjob->ji_qs.ji_jobid, event_task); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); } ptask = task_check(pjob, event_task); if (ptask != NULL) { tm_reply(ptask->ti_chan, TM_OKAY, event); if ((ret = diswst(ptask->ti_chan, info)) == DIS_SUCCESS) { DIS_tcp_wflush(ptask->ti_chan); } } free(info); return(IM_DONE); } /* END handle_im_get_resc_response() */ /* * I must be Mother Superior for the job and * this is a reply with job resources to * tally up. 
* * @pre-cond: chan must be a valid tcp_chan pointer * @pre-cond: this node should be mother superior for pjob * @post-cond: the resources used by this sister will be overwritten with the new resources reported * @post-cond: if a non-zero exit status is reported by this sister it will be marked as the node * to report a failed exit status * @return: IM_FAILURE if this node isn't the mother superior or IM_DONE otherwise * * auxiliary info ( * recommendation int; * cput u_long; * mem u_long; * vmem u_long; * ) */ int handle_im_poll_job_response( struct tcp_chan *chan, job &pjob, /* I */ int nodeidx, /* I */ hnodent *np) /* I */ { int exitval; int ret; if (am_i_mother_superior(pjob) == false) { log_err(-1, __func__, "got POLL_JOB and I'm not MS"); return(IM_FAILURE); } exitval = disrsi(chan, &ret); if (ret == DIS_SUCCESS) { pjob.ji_resources[nodeidx - 1].nr_cput = disrul(chan, &ret); if (ret == DIS_SUCCESS) { pjob.ji_resources[nodeidx - 1].nr_mem = disrul(chan, &ret); if (ret == DIS_SUCCESS) pjob.ji_resources[nodeidx - 1].nr_vmem = disrul(chan, &ret); } } if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: POLL_JOB %s OKAY kill %d cpu=%lu mem=%lu vmem=%lu\n", __func__, pjob.ji_qs.ji_jobid, exitval, pjob.ji_resources[nodeidx - 1].nr_cput, pjob.ji_resources[nodeidx - 1].nr_mem, pjob.ji_resources[nodeidx - 1].nr_vmem); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob.ji_qs.ji_jobid,log_buffer); } if (exitval != 0) { if (LOGLEVEL >= 2) { sprintf(log_buffer, "non-zero exit status reported from node %s, aborting job", np->hn_host); log_record(PBSEVENT_ERROR,PBS_EVENTCLASS_JOB,pjob.ji_qs.ji_jobid,log_buffer); } pjob.ji_nodekill = np->hn_node; } return(IM_DONE); } /* END handle_im_poll_job_response() */ /* * Sender must be Mother Superior with a TID. * I will either do the spawn or forward the SPAWN * to the final destination. 
* * auxiliary info ( * task id tm_task_id; * ) */ int handle_im_get_tid_response( struct tcp_chan *chan, job *pjob, /* I */ char *cookie, /* I */ char **argv, /* M */ char **envp, /* M */ fwdevent *efwd) /* I */ { int taskid; int ret; int i; int local_socket; struct tcp_chan *local_chan = NULL; char *jobid = pjob->ji_qs.ji_jobid; hnodent *np; task *ptask; taskid = disrsi(chan, &ret); if (ret != DIS_SUCCESS) return(IM_FAILURE); /* Check to see if I need to forward the taskid to another MOM. */ if (LOGLEVEL >= 5) { sprintf(log_buffer, "%s: GET_TID %s OKAY task %d\n", __func__, jobid, taskid); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } if (pjob->ji_nodeid != efwd->fe_node) { np = find_node(pjob, -1, efwd->fe_node); if (np == NULL) return(IM_DONE); event_alloc(IM_SPAWN_TASK,np,efwd->fe_event,efwd->fe_taskid); local_socket = tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr,sizeof(np->sock_addr)); if (local_socket < 0) return(IM_DONE); if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { close(local_socket); return IM_DONE; } ret = im_compose(local_chan,jobid,cookie,IM_SPAWN_TASK,efwd->fe_event,efwd->fe_taskid); if (ret == DIS_SUCCESS) { if ((ret = diswsi(local_chan, pjob->ji_nodeid)) == DIS_SUCCESS) { if ((ret = diswsi(local_chan, taskid)) == DIS_SUCCESS) { if ((ret = diswst(local_chan, pjob->ji_globid)) == DIS_SUCCESS) { for (i = 0;argv[i];i++) { ret = diswst(local_chan, argv[i]); if (ret != DIS_SUCCESS) break; } if (ret == DIS_SUCCESS) { if ((ret = diswst(local_chan, "")) == DIS_SUCCESS) { for (i = 0;envp[i];i++) { ret = diswst(local_chan, envp[i]); if (ret != DIS_SUCCESS) break; } if (ret == DIS_SUCCESS) DIS_tcp_wflush(local_chan); } } } } } } DIS_tcp_close(local_chan); if (ret != DIS_SUCCESS) return(IM_DONE); arrayfree(argv); arrayfree(envp); return(IM_DONE); } /* END if (pjob->ji_nodeid != efwd->fe_node) */ /* It's me, do the spawn */ ret = 0; if ((ptask = pbs_task_create(pjob, taskid)) != NULL) { strcpy(ptask->ti_qs.ti_parentjobid, 
jobid); ptask->ti_qs.ti_parentnode = efwd->fe_node; ptask->ti_qs.ti_parenttask = efwd->fe_taskid; if (LOGLEVEL >= 6) { log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,"saving task (IM_GET_TID)"); } if (task_save(ptask) != -1) ret = start_process(ptask, argv, envp); } arrayfree(argv); arrayfree(envp); taskid = ptask->ti_qs.ti_task; ptask = task_check(pjob, efwd->fe_taskid); if (ptask != NULL) { tm_reply(ptask->ti_chan, (ret == -1) ? TM_ERROR : TM_OKAY, efwd->fe_event); diswsi( ptask->ti_chan, (int)(ret == -1 ? TM_ESYSTEM : taskid)); DIS_tcp_wflush(ptask->ti_chan); } return(IM_DONE); } /* END handle_im_get_tid_response() */ int send_im_error_addr( int err, struct sockaddr_in *si, int mom_mgr_sock, char *jobid, char *cookie, tm_event_t event, tm_task_id fromtask) { int rc = PBSE_NONE; int cntr = 0; int sock = 0; struct tcp_chan *local_chan = NULL; /* char log_buf[LOCAL_LOG_BUF_SIZE]; */ si->sin_port = htons(mom_mgr_sock); for (cntr = 0; cntr < 5; cntr++) { if ((sock = tcp_connect_sockaddr((struct sockaddr *)si,sizeof(struct sockaddr))) < 0) { rc = PBSE_SOCKET_FAULT; if (sock == PERMANENT_SOCKET_FAIL) break; continue; } if ((local_chan = DIS_tcp_setup(sock)) == NULL) { rc = PBSE_MEM_MALLOC; } else if ((rc = im_compose(local_chan,jobid,cookie,IM_ERROR,event,fromtask)) != DIS_SUCCESS) { } else if ((rc = diswsi(local_chan,err)) != DIS_SUCCESS) { } else if ((rc = DIS_tcp_wflush(local_chan)) != DIS_SUCCESS) { } if (local_chan != NULL) DIS_tcp_cleanup(local_chan); close(sock); if (rc == DIS_SUCCESS) break; } /* if (rc != DIS_SUCCESS) { resend_momcomm *mc; if ((mc = calloc(1, sizeof(resend_momcomm))) != NULL) { mc->mc_type = COMPOSE_REPLY; mc->mc_struct = create_compose_reply_info(jobid, cookie, si, IM_ERROR, TM_NULL_EVENT, TM_NULL_TASK); if (mc->mc_struct == NULL) free(mc); else add_to_resend_things(mc); } snprintf(log_buf,sizeof(log_buf), "Could not send error on event %d for job %s", event, pjob->ji_qs.ji_jobid); log_err(-1,__func__,log_buf); } */ return 
rc; } /* * create_contact_list() * * Mother superior has received an abort from a sister, and it is going to notify all * other sisters that they should abort. This function adds all moms other than this * one to the contact list. * * @pre-cond: contacting_address must be a valid pointer to the address of the mom * that sent the abort * @post-cond: sister_list will be populated with the indices of the nodes that * should be contacted. */ void create_contact_list( job &pjob, std::set &sister_list, struct sockaddr_in *contacting_address) { unsigned long ipaddr_connect = 0; if (contacting_address != NULL) ipaddr_connect = ntohl(contacting_address->sin_addr.s_addr); for (int i = 1; i < pjob.ji_numnodes; i++) { hnodent *np; if (pjob.ji_qs.ji_svrflags & JOB_SVFLG_INTERMEDIATE_MOM) np = &pjob.ji_sisters[i]; else np = &pjob.ji_hosts[i]; unsigned long node_addr = ntohl(np->sock_addr.sin_addr.s_addr); if (node_addr != ipaddr_connect) sister_list.insert(i); } } /** * Input is coming from another MOM over a DIS rpp stream. * Read the stream to get a Inter-MOM request. * * request ( * jobid string * cookie string * command int * event int * task int * ) * * Replies now need to be sent in a new connection (for now) due to * threading issues. 
* * @see im_eof() - child - called if failure occurs */ void im_request( struct tcp_chan *chan, int version, /* I */ struct sockaddr_in *pSockAddr) /* I */ { int command = 0; int event_com = 0; int ret; char *jobid = NULL; char *cookie = NULL; char *oreo; job *pjob; task *ptask; hnodent *np = NULL; eventent *ep = NULL; u_long ipaddr; int i; int errcode; int nodeidx = 0; int check_for_event = 0; tm_node_id nodeid; tm_task_id fromtask; int event_task = 0; char **argv = NULL; char **envp = NULL; tm_event_t event = 0; fwdevent efwd; unsigned short sender_port = -1; unsigned int momport = 0; char log_buffer[LOCAL_LOG_BUF_SIZE+1]; int awaiting_replies = 0; int local_socket; struct tcp_chan *local_chan = NULL; struct passwd *check_pwd(); u_long gettime(resource *); u_long getsize(resource *); memset(&efwd, 0, sizeof(efwd)); if (version != IM_PROTOCOL_VER) { close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; sprintf(log_buffer, "protocol version %d unknown", version); log_err(-1, __func__, log_buffer); goto im_req_finish; } ipaddr = ntohl(pSockAddr->sin_addr.s_addr); /* NYI: check that this connection comes from a privileged port */ if (LOGLEVEL >= 3) { sprintf(log_buffer, "connect from %s", netaddr(pSockAddr)); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, log_buffer); } if (AVL_is_in_tree_no_port_compare(ipaddr, 0, okclients) == 0 ) { long max_len = 1024; long final_len = 0; char *tmp_line = (char *)calloc(1, max_len + 1); close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; if (tmp_line != NULL) ret = AVL_list(okclients, &tmp_line, &final_len, &max_len); else ret = -1; if (ret == 0) { snprintf(log_buffer, LOCAL_LOG_BUF_SIZE, "bad connect from %s - unauthorized (okclients: %s)", netaddr(pSockAddr), tmp_line); } else snprintf(log_buffer, LOCAL_LOG_BUF_SIZE, "bad connect from %s - unauthorized (could not get ok clients %d)", netaddr(pSockAddr), ret); if (tmp_line != NULL) 
free(tmp_line); log_err(-1, __func__, log_buffer); goto im_req_finish; } /* For sequential reads of values, the error condition is handled after * the data sequence is read. On failure to read a value, the * following of the values are not read */ sender_port = disrus(chan,&ret); if (ret == DIS_SUCCESS) jobid = disrst(chan, &ret); if (ret == DIS_SUCCESS) cookie = disrst(chan, &ret); if (ret == DIS_SUCCESS) command = disrsi(chan, &ret); if (ret == DIS_SUCCESS) event = disrsi(chan, &ret); if (ret == DIS_SUCCESS) fromtask = disrsi(chan, &ret); if (ret != DIS_SUCCESS) { close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; sprintf(log_buffer,"request for job %s failed - %s (command)", jobid, dis_emsg[ret]); log_err(-1,__func__,log_buffer); goto err; } if (LOGLEVEL >= 3) { sprintf(log_buffer, "%s:rec req '%s' (%d) for job %s from %s ev %d task %d cookie %s", __func__, PMOMCommand[MIN(command,IM_MAX)], command, jobid, netaddr(pSockAddr), event, fromtask, cookie); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } switch (command) { case IM_JOIN_JOB: { if (connection_from_ms(chan, NULL, pSockAddr) == true) { ret = im_join_job_as_sister(chan,jobid,pSockAddr,cookie,event,fromtask,command,FALSE); } else ret = IM_FAILURE; close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; if (ret == IM_FAILURE) goto err; else goto im_req_finish; break; /* END IM_JOIN_JOB */ } case IM_JOIN_JOB_RADIX: { if (connection_from_ms(chan, NULL, pSockAddr) == true) { ret = im_join_job_as_sister(chan,jobid,pSockAddr,cookie,event,fromtask,command,TRUE); } else ret = IM_FAILURE; close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; if (ret == IM_FAILURE) goto err; else goto im_req_finish; break; /* END IM_JOIN_JOB_RADIX */ } default: break; } /* END switch (command) */ /* ** Check if job already exists. 
*/ pjob = mom_find_job(jobid); if (pjob == NULL) { /* Without this if, the systems will play ping pong with the request */ if (command == IM_ERROR) { sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' (job does not exist locally). No response (no ping pong)", PMOMCommand[MIN(command,IM_MAX)], netaddr(pSockAddr), jobid); close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; goto err; } send_im_error_addr(PBSE_UNKJOBID,pSockAddr,sender_port,jobid,cookie,event,fromtask); sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' cookie '%s' event '%d' (job does not exist locally).", PMOMCommand[MIN(command,IM_MAX)], netaddr(pSockAddr), jobid, cookie, event); close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); goto im_req_finish; } /* check cookie */ if (!(pjob->ji_wattr[JOB_ATR_Cookie].at_flags & ATR_VFLAG_SET)) { sprintf(log_buffer, "ERROR: received request '%s' from %s for job '%s' (job has no cookie)", PMOMCommand[MIN(command,IM_MAX)], netaddr(pSockAddr), jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); send_im_error(PBSE_BADSTATE,1,pjob,cookie,event,fromtask); close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; goto err; } oreo = pjob->ji_wattr[JOB_ATR_Cookie].at_val.at_str; if (strcmp(oreo, cookie) != 0) { snprintf(log_buffer, sizeof(log_buffer), "Bad cookie: job id %s", pjob->ji_qs.ji_jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); /* multiple versions of the same job are out there, kill it */ exec_bail(pjob, JOB_EXEC_FAIL1); if (LOGLEVEL >= 0) { snprintf(log_buffer, LOCAL_LOG_BUF_SIZE, "ERROR: received request '%s' from %s for job '%s' (job has corrupt cookie - '%s' != '%s')", PMOMCommand[MIN(command,IM_MAX)], netaddr(pSockAddr), jobid, oreo, cookie); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, 
log_buffer); } send_im_error(PBSE_BADSTATE, 1, pjob, cookie, event, fromtask); close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; goto im_req_finish; } /* Only run this if the command is NOT one of the following: * IM_ALL_OKAY, IM_ERROR or IM_RADIX_ALL_OK */ check_for_event = (command == IM_ALL_OKAY) || (command == IM_ERROR) || (command == IM_RADIX_ALL_OK); if (check_for_event == TRUE) { for (nodeidx = 0;nodeidx < pjob->ji_numnodes;nodeidx++) { if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_INTERMEDIATE_MOM) np = &pjob->ji_sisters[nodeidx]; else np = &pjob->ji_hosts[nodeidx]; if ((ntohl(np->sock_addr.sin_addr.s_addr) == ipaddr) && (htons(sender_port) == np->sock_addr.sin_port)) break; } if (nodeidx == pjob->ji_numnodes) { close_conn(chan->sock, FALSE); sprintf(log_buffer, "stream %d not found", chan->sock); log_err(-1, __func__, log_buffer); goto err; } ep = (eventent *)GET_NEXT(np->hn_events); while (ep != NULL) { if ((ep->ee_event == event) && (ep->ee_taskid == fromtask)) break; ep = (eventent *)GET_NEXT(ep->ee_next); } if ((ep == NULL) && (command != IM_ERROR)) { close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; sprintf(log_buffer, "event %d taskid %ld not found", event, (long)fromtask); log_err(-1, __func__, log_buffer); goto err; } if (ep != NULL) { efwd = ep->ee_forward; event_com = ep->ee_command; event_task = ep->ee_taskid; argv = ep->ee_argv; envp = ep->ee_envp; delete_link(&ep->ee_next); free(ep); } } /* END if (check_for_event == TRUE) */ switch (command) { case IM_KILL_JOB: { if (connection_from_ms(chan, pjob, pSockAddr) == true) { im_kill_job_as_sister(pjob,event,momport,FALSE); } close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; break; } case IM_KILL_JOB_RADIX: { im_kill_job_as_sister(pjob,event,momport,TRUE); close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; goto im_req_finish; break; } case 
IM_SPAWN_TASK: { ret = im_spawn_task(chan,cookie,event,pSockAddr,fromtask,pjob); close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; if (ret == IM_FAILURE) { log_err(-1, __func__, "im_spawn_task error"); goto err; } break; } case IM_SIGNAL_TASK: { ret = im_signal_task(chan,pjob,cookie,event,fromtask); if (ret == IM_FAILURE) { log_err(-1, __func__, "im_signal_task error"); goto err; } break; } case IM_OBIT_TASK: { ret = im_obit_task(chan,pjob,cookie,event,fromtask); if (ret == IM_FAILURE) { log_err(-1, __func__, "im_obit_task error"); goto err; } break; } case IM_GET_INFO: { ret = im_get_info(chan,pjob,cookie,event,fromtask); if (ret == IM_FAILURE) { log_err(-1, __func__, "im_get_info error"); goto err; } break; } case IM_GET_RESC: { ret = im_get_resc_as_sister(chan,pjob,cookie,event,fromtask); if (ret == IM_FAILURE) { log_err(-1, __func__, "im_get_resc_as_sister error"); goto err; } break; } case IM_POLL_JOB: { /* check the validity of our connection */ if (connection_from_ms(chan, pjob, pSockAddr) == false) { close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; log_err(-1, __func__, "IM_POLL_JOB request came from a node other than mother superior"); goto err; } /* im_poll_job_as_sister will create a new connection and send an IM_ALL_OKAY message which will then be processed by the sending MOM (Mother superior for regular jobs or intermediate MOM/Mother Superior for Job Radix */ im_poll_job_as_sister(pjob,cookie,event,fromtask); break; } case IM_ABORT_JOB: { /* check if the abort is from mother superior or a sister node */ if (connection_from_ms(chan, pjob, pSockAddr) == false) { /* it is valid to receive an abort from a sister */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_JOB_ABORTED) == 0) { std::set sisters; create_contact_list(*pjob, sisters, pSockAddr); pjob->ji_qs.ji_svrflags |= JOB_SVFLG_JOB_ABORTED; exec_bail(pjob, JOB_EXEC_RETRY, &sisters); sprintf(log_buffer, "%s sent an abort. 
Killing job %s", netaddr(pSockAddr), pjob->ji_qs.ji_jobid); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); } } else im_abort_job(pjob,pSockAddr,cookie,event,fromtask); close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; break; } case IM_GET_TID: { if ((ret = im_get_tid(pjob,cookie,event,fromtask)) == IM_FAILURE) { close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; log_err(-1, __func__, "im_get_tid error"); goto err; } break; } case IM_ALL_OKAY: /* this is a response message */ { /* Sender is another MOM telling me that a request has completed successfully */ svr_conn[chan->sock].cn_stay_open = FALSE; switch (event_com) { case IM_JOIN_JOB: { ret = handle_im_join_job_response(chan, pjob, pSockAddr); close_conn(chan->sock, FALSE); chan->sock = -1; if (ret == IM_FAILURE) { log_err(-1, __func__, "handle_im_join_job_response error"); goto err; } break; } case IM_KILL_JOB: ret = handle_im_kill_job_response(chan, pjob, np, event_com, nodeidx); close_conn(chan->sock, FALSE); chan->sock = -1; if (ret == IM_FAILURE) { log_err(-1, __func__, "handle_im_kill_job_response error"); goto err; } break; case IM_SPAWN_TASK: ret = handle_im_spawn_task_response(chan,pjob,event_task,event); close_conn(chan->sock, FALSE); chan->sock = -1; if (ret == IM_FAILURE) { log_err(-1, __func__, "handle_im_spawn_task_response error"); goto err; } break; case IM_GET_TASKS: ret = handle_im_get_tasks_response(chan,pjob,event_task,event); close_conn(chan->sock, FALSE); chan->sock = -1; if (ret == IM_FAILURE) { log_err(-1, __func__, "handle_im_get_tasks_response error"); goto err; } break; case IM_SIGNAL_TASK: ret = handle_im_signal_task_response(pjob,event_task,event); close_conn(chan->sock, FALSE); chan->sock = -1; if (ret == IM_FAILURE) { log_err(-1, __func__, "handle_im_signal_task_response error"); goto err; } break; case IM_OBIT_TASK: ret = handle_im_obit_task_response(chan,pjob,event_task,event); 
close_conn(chan->sock, FALSE); chan->sock = -1; if (ret == IM_FAILURE) { log_err(-1, __func__, "handle_im_obit_task_response error"); goto err; } break; case IM_GET_INFO: ret = handle_im_get_info_response(chan,pjob,event_task,event); close_conn(chan->sock, FALSE); chan->sock = -1; if (ret == IM_FAILURE) { log_err(-1, __func__, "handle_im_get_info_response error"); goto err; } break; case IM_GET_RESC: ret = handle_im_get_resc_response(chan,pjob,event_task,event); close_conn(chan->sock, FALSE); chan->sock = -1; if (ret == IM_FAILURE) { log_err(-1, __func__, "handle_im_get_resc_response error"); goto err; } break; case IM_POLL_JOB: ret = handle_im_poll_job_response(chan, *pjob, nodeidx, np); if (ret == IM_FAILURE) { close_conn(chan->sock, FALSE); chan->sock = -1; log_err(-1, __func__, "handle_im_poll_job_response error"); goto err; } close_conn(chan->sock, FALSE); chan->sock = -1; break; case IM_GET_TID: if (connection_from_ms(chan, NULL, pSockAddr) == true) { ret = handle_im_get_tid_response(chan,pjob,cookie,argv,envp,&efwd); } else ret = IM_FAILURE; svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; switch (ret) { case IM_FINISHED: goto im_req_finish; break; case IM_DONE: break; case IM_FAILURE: { log_err(-1, __func__, "IM_FAILURE in IM_GET_TID handle response"); goto err; } } break; default: snprintf(log_buffer,LOCAL_LOG_BUF_SIZE, "%s: job %s received event_com %d event %d. (IM_ALL_OKAY) No handler!!!\n", __func__, jobid, command, event_com); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); break; } /* END switch (event_com) */ break; } case IM_RADIX_ALL_OK: { /* ** Sender is an intermediate MOM or leaf within the radix ** of the current MOM. The sending intermediate mom ** has received the job structure sent plus all of ** her children have also received the job structure ** and accepted it. ** ** This node can be the mother superior ** or an intermediate MOM. 
** ** auxiliary info ( ** none; ** ) */ switch (event_com) { case IM_JOIN_JOB_RADIX: { close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; pjob = mom_find_job(jobid); if (pjob != NULL) { if (((pjob->ji_qs.ji_svrflags & JOB_SVFLG_INTERMEDIATE_MOM) == 0) && (am_i_mother_superior(*pjob) == false)) { log_err(-1, __func__, "got JOIN_JOB OKAY and I'm not an intermediate MOM or Mother Superior"); goto err; } if (pjob->ji_outstanding > 0) { pjob->ji_outstanding--; } if (pjob->ji_outstanding == 0) { if (LOGLEVEL >= 5) { struct timeval tv, *tv_attr, result; struct timezone tz; if (gettimeofday(&tv, &tz) == 0) { tv_attr = &pjob->ji_wattr[JOB_ATR_total_runtime].at_val.at_timeval; timeval_subtract(&result, &tv, tv_attr); sprintf(log_buffer, "%s: job_radix total wire-up time for job %ld.%ld", __func__, result.tv_sec, result.tv_usec); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } } /* All sisters in our job radix have reported in */ if (pjob->ji_im_nodeid == 1) { sprintf(log_buffer, "%s: all sisters for intermediate mom %s reported in", __func__, pjob->ji_sisters[0].hn_host); } else { sprintf(log_buffer, "%s: all sisters for Mother Superior %s reported in", __func__, pjob->ji_hosts[0].hn_host); } if (LOGLEVEL >= 2) { log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } /* no events remaining, all moms have reported in, * launch job report back to parent MOM unless I am * Mother superior and then execute job */ if (pjob->ji_im_nodeid == 1) { sprintf(log_buffer, "%s:all sisters for intermediate mom %s reported in", __func__, pjob->ji_sisters[0].hn_host); if (LOGLEVEL >= 2) { log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } /* 0 is always the intermediate mom that called us */ np = &pjob->ji_sisters[0]; log_buffer[0] = 0; /* at this point stream is the connection between * the intermediate mom and her sister. 
we need to * finish this stream and then we are going to either * open a new connection to the mom that called us or * reuse an existing connection */ /*rpp_eom(stream);*/ local_socket = tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr,sizeof(np->sock_addr)); if (IS_VALID_STREAM(local_socket) == FALSE) { pjob->ji_nodekill = pjob->ji_nodeid; if (log_buffer[0] != '\0') { sprintf(log_buffer, "tcp_connect_sockaddr failed on %s - job id %s", np->hn_host, pjob->ji_qs.ji_jobid); } log_err(errno, __func__, log_buffer); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); exec_bail(pjob, JOB_EXEC_FAIL1); } if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { } else if ((ret = im_compose(local_chan, jobid, cookie, IM_RADIX_ALL_OK, pjob->ji_intermediate_join_event, TM_NULL_TASK)) != DIS_SUCCESS) { } else DIS_tcp_wflush(local_chan); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); close(local_socket); /* We need to open our intermediate demux here */ fork_demux(pjob); } else { /* I am Mother Superior. Start job execution */ if (LOGLEVEL >= 2) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "im_request:all sisters have reported in, launching job locally"); } exec_job_on_ms(pjob); } } /* SUCCESS: MOM returns */ /* END if (pjob->ji_outstanding == 0) */ else { if (LOGLEVEL >= 4) { sprintf(log_buffer, "%s:joinjob response received from node %s", __func__, netaddr(pSockAddr)); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } } } else { if (LOGLEVEL >= 0) { snprintf(log_buffer,LOCAL_LOG_BUF_SIZE, "ERROR: received request '%s' from %s for job '%s' (job does not exist locally):IM_RADIX_ALL_OK", PMOMCommand[MIN(command,IM_MAX)], netaddr(pSockAddr), jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } } break; } case IM_KILL_JOB_RADIX: { /* ** Sender is sending a response that a job ** which needs to die has been given the ax. ** I'm mother superior. 
** ** auxiliary info ( ** cput ulong; ** mem ulong; ** vmem ulong; ** ) */ long cput; long mem; long vmem; if (am_i_mother_superior(*pjob) == true) { if (LOGLEVEL >= 2) { sprintf(log_buffer, "KILL_JOB_RADIX acknowledgement received"); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } cput = disrul(chan, &ret); if (ret == DIS_SUCCESS) mem = disrul(chan, &ret); if (ret == DIS_SUCCESS) vmem = disrul(chan, &ret); if (ret == DIS_SUCCESS) nodeid = disrsi(chan, &ret); if (ret != DIS_SUCCESS) { close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; log_err(-1, __func__, "Count not read cput||mem||vmem||nodeid"); goto err; } np = &pjob->ji_hosts[nodeid]; if (pjob->ji_resources != NULL) { pjob->ji_resources[nodeid - 1].nr_cput = cput; pjob->ji_resources[nodeid - 1].nr_mem = mem; pjob->ji_resources[nodeid - 1].nr_vmem = vmem; if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: %s FINAL from %d cpu %lu sec mem %lu kb vmem %ld kb\n", __func__, jobid, nodeid, pjob->ji_resources[nodeid - 1].nr_cput, pjob->ji_resources[nodeid - 1].nr_mem, pjob->ji_resources[nodeid - 1].nr_vmem); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } } /* END if (pjob_ji_resources != NULL) */ /* don't close stream in case other jobs use it */ np->hn_sister = SISTER_KILLDONE; for (i = 1; i < pjob->ji_radix + 1;i++) { if (pjob->ji_hosts[i].hn_sister == SISTER_OKAY) break; } if (i == pjob->ji_radix + 1 ) { /* all dead */ if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: ALL DONE, set EXITING job %s\n", __func__, jobid); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); exiting_tasks = 1; } } else if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_INTERMEDIATE_MOM) { if (LOGLEVEL >= 2) { sprintf(log_buffer, "KILL_JOB_RADIX acknowledgement received"); 
log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } cput = disrul(chan, &ret); if (ret == DIS_SUCCESS) mem = disrul(chan, &ret); if (ret == DIS_SUCCESS) vmem = disrul(chan, &ret); if (ret == DIS_SUCCESS) nodeid = disrsi(chan, &ret); if (ret != DIS_SUCCESS) { close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; log_err(-1, __func__, "Count not read cput||mem||vmem||nodeid"); goto err; } np = &pjob->ji_sisters[nodeid+1]; /* yes this is klugey but the sisters are off by one on the index */ if (pjob->ji_resources != NULL) { pjob->ji_resources[nodeid - 1].nr_cput = cput; pjob->ji_resources[nodeid - 1].nr_mem = mem; pjob->ji_resources[nodeid - 1].nr_vmem = vmem; if (LOGLEVEL >= 7) { snprintf(log_buffer,LOCAL_LOG_BUF_SIZE, "%s: %s FINAL from %d cpu %lu sec mem %lu kb vmem %ld kb\n", __func__, jobid, nodeid, pjob->ji_resources[nodeid - 1].nr_cput, pjob->ji_resources[nodeid - 1].nr_mem, pjob->ji_resources[nodeid - 1].nr_vmem); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } } /* END if (pjob_ji_resources != NULL) */ /* don't close stream in case other jobs use it */ np->hn_sister = SISTER_KILLDONE; if (pjob->ji_outstanding > 0) { pjob->ji_outstanding--; } if (pjob->ji_outstanding == 0) { /* all dead */ if (LOGLEVEL >= 7) { snprintf(log_buffer,LOCAL_LOG_BUF_SIZE, "%s: ALL DONE, set EXITING job %s\n", __func__, jobid); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); exiting_tasks = 1; } } else { close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; log_err(-1, __func__, "KILL_JOB_RADIX OK received on a leaf node"); goto err; } close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; break; } default: snprintf(log_buffer,LOCAL_LOG_BUF_SIZE, "%s: job %s received event_com %d event %d. 
(IM_RADIX_ALL_OK) No handler!!!\n", __func__, jobid, command, event_com); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); break; } break; } case IM_ERROR: /* this is a REPLY */ { /* ** Sender is responding to a request with an error code. ** ** auxiliary info ( ** error value int; ** ) */ errcode = disrsi(chan, &ret); snprintf(log_buffer, LOCAL_LOG_BUF_SIZE, "Error response received from client %s (%d) jobid %s", netaddr(pSockAddr), sender_port, jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); if ((errcode == PBSE_UNKJOBID) && (event_com == TM_NULL_EVENT)) event_com = IM_KILL_JOB; if (ret != DIS_SUCCESS) { close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; log_err(-1, __func__, "Could not read error code"); goto err; } /* What's the purpose of this? *MUTSU* */ if (event_com == IM_GET_TID) { if (connection_from_ms(chan, pjob, pSockAddr) == false) { close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; log_err(-1, __func__, "IM_GET_TID close_conn"); goto err; } } close_conn(chan->sock, FALSE); svr_conn[chan->sock].cn_stay_open = FALSE; chan->sock = -1; switch (event_com) { case IM_JOIN_JOB: /* ** A MOM has rejected a request to join a job. ** We need to send a ABORT_JOB to all the sisterhood ** and fail the job start to server. ** I'm mother superior. */ if (am_i_mother_superior(*pjob) == false) { log_err(-1, __func__, "JOIN_JOB ERROR and I'm not MS"); goto err; } job_start_error(pjob, errcode, netaddr(pSockAddr)); break; case IM_ABORT_JOB: case IM_KILL_JOB: /* ** Job cleanup failed on a sister. ** Wait for everybody to respond then finish. ** I'm mother superior. 
*/ if (am_i_mother_superior(*pjob) == false) { log_err(-1, __func__, "KILL_JOB ERROR and I'm not MS"); goto err; } snprintf(log_buffer, sizeof(log_buffer), "KILL/ABORT (job %s) request returned error %d from %s (%d total nodes)\n", jobid, errcode, netaddr(pSockAddr), pjob->ji_numnodes); log_err(errcode, __func__, log_buffer); np->hn_sister = errcode ? errcode : SISTER_KILLDONE; for (i = 1;i < pjob->ji_numnodes;i++) { if (pjob->ji_hosts[i].hn_sister == SISTER_OKAY) { snprintf(log_buffer, sizeof(log_buffer), "KILL (job %s) still awaiting response from at least %s(%s)", jobid, pjob->ji_hosts[i].hn_host, netaddr(&pjob->ji_hosts[i].sock_addr)); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); awaiting_replies += 1; break; } } if (awaiting_replies == 0) { snprintf(log_buffer, LOCAL_LOG_BUF_SIZE, "Job %s has received replies from all moms, setting substate to exiting", jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); /* all dead */ pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); exiting_tasks = 1; } break; case IM_SPAWN_TASK: case IM_GET_TASKS: case IM_SIGNAL_TASK: case IM_OBIT_TASK: case IM_GET_INFO: /* A user attempt failed, inform process. */ if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: REQUEST %d %s returned ERROR %d\n", __func__, event_com, jobid, errcode); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } ptask = task_check(pjob, event_task); if (ptask == NULL) break; tm_reply(ptask->ti_chan, TM_ERROR, event); diswsi(ptask->ti_chan, errcode); DIS_tcp_wflush(ptask->ti_chan); break; case IM_POLL_JOB: /* ** I must be Mother Superior for the job and ** this is an error reply to a poll request. 
*/ if (am_i_mother_superior(*pjob) == false) { log_err(-1, __func__, "POLL_JOB ERROR and I'm not MS"); goto err; } if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: POLL_JOB %s returned ERROR %d\n", __func__, jobid, errcode); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } np->hn_sister = errcode ? errcode : SISTER_BADPOLL; break; case IM_GET_TID: /* ** Sender must be Mother Superior failing to ** send a TID. ** Send a fail to the task which called SPAWN. */ if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: GET_TID %s returned ERROR %d\n", __func__, jobid, errcode); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } arrayfree(argv); arrayfree(envp); ptask = task_check(pjob, efwd.fe_taskid); if (ptask == NULL) break; /* When is this connection created? *MUTSU* */ tm_reply(ptask->ti_chan, TM_ERROR, efwd.fe_event); diswsi(ptask->ti_chan, errcode); DIS_tcp_wflush(ptask->ti_chan); /* Does it need to be closed here? *MUTSU* */ break; default: snprintf(log_buffer,sizeof(log_buffer), "%s: job %s received event_com %d event %d. (IM_ERROR) No handler!!!\n", __func__, jobid, command, event_com); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); goto err; break; } /* END switch(event_com) */ break; } default: { sprintf(log_buffer, "unknown command %d sent", command); log_err(-1, __func__, log_buffer); goto err; break; } } /* END switch (Command) */ goto im_req_finish; err: /* ** We come here if we got a DIS read error or a protocol ** element is missing. The likely case is the remote ** host has gone down. */ snprintf(log_buffer, LOCAL_LOG_BUF_SIZE, "error processing command %d event_com %d for job %s from %s:(%d)", command, event_com, jobid ? 
jobid : "unknown", netaddr(pSockAddr), sender_port); log_err(-1, __func__, log_buffer); im_req_finish: if (jobid != NULL) free(jobid); if (cookie != NULL) free(cookie); return; } /* END im_request() */ void tm_eof( int fd) { job *pjob; task *ptask; /* ** Search though all the jobs looking for this fd. */ for (pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { for (ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { if (ptask->ti_chan == NULL) continue; if (ptask->ti_chan->sock == fd) { if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "matching task located, marking interface closed"); } return; } } } if (LOGLEVEL >= 1) { log_record(PBSEVENT_JOB, PBS_EVENTCLASS_SERVER, __func__, "no matching task found"); } return; } /* END tm_eof() */ /* * tm_request_init * * A request to initialize. Must be the first thing we see from a * task to do psched requests */ void tm_request_init( job *pjob, task *ptask, int *ret, int event, int prev_error) { int vnodenum; int i; vnodent *pnode; if (prev_error == PBSE_NONE) { if ((*ret = tm_reply(ptask->ti_chan, TM_OKAY, event)) == DIS_SUCCESS) { vnodenum = pjob->ji_numvnod; if ((*ret = diswui(ptask->ti_chan, vnodenum)) == DIS_SUCCESS) { pnode = pjob->ji_vnods; for (i = 0;i < vnodenum;i++) { *ret = diswsi(ptask->ti_chan, pnode[i].vn_node); if (*ret != DIS_SUCCESS) break; } if (*ret == DIS_SUCCESS) { if ((*ret = diswst(ptask->ti_chan, ptask->ti_qs.ti_parentjobid)) == DIS_SUCCESS) { if ((*ret = diswsi(ptask->ti_chan, ptask->ti_qs.ti_parentnode)) == DIS_SUCCESS) { if ((*ret = diswsi(ptask->ti_chan, ptask->ti_qs.ti_parenttask)) == DIS_SUCCESS) { ptask->ti_flags |= TI_FLAGS_INIT; } } } } } } } /* done */ } /* END tm_request_init() */ /* * Post named info for a task. 
* * read ( * name string; * info counted string; * ) */ int tm_postinfo( char *name, /* O */ char *info, /* O */ char *jobid, /* I */ int fromtask, /* I */ int prev_error, /* I */ int event, /* I */ int *ret, /* O */ task *ptask, /* M */ size_t *len) /* I */ { name = disrst(ptask->ti_chan, ret); if (*ret == DIS_SUCCESS) info = disrcs(ptask->ti_chan, len, ret); if (*ret != DIS_SUCCESS) { if (name != NULL) free(name); return(TM_ERROR); } if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: POSTINFO %s task %d sent info %s:%s(%d)\n", __func__, jobid, fromtask, name, info, (int)*len); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } if (prev_error) { free(name); free(info); return(TM_DONE); } task_saveinfo(ptask, name, info, *len); *ret = tm_reply(ptask->ti_chan, TM_OKAY, event); return(TM_DONE); } /* END tm_postinfo() */ /* * tm_spawn_request * * Spawn a task on the requested node. * * read ( * argc int; * arg 0 string; * ... * arg argc-1 string; * env 0 string; * ... 
* env m string; * ) */ int tm_spawn_request( struct tcp_chan *chan, job *pjob, /* I */ int prev_error, /* I */ int event, /* I */ char *cookie, /* I */ int *reply_ptr, /* O */ int *ret, /* O */ tm_task_id fromtask, /* I */ hnodent *phost, /* M */ int nodeid) /* I */ { char **argv = NULL; char **envp = NULL; char *jobid = pjob->ji_qs.ji_jobid; int local_socket; struct tcp_chan *local_chan = NULL; int numele; int i; unsigned int momport = 0; vnodent *pnode; tm_task_id taskid; task *ptask; eventent *ep; if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: SPAWN %s on node %d\n", __func__, jobid, nodeid); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } numele = disrui(chan, ret); if (*ret != DIS_SUCCESS) return(TM_DONE); argv = (char **)calloc(numele + 1, sizeof(char *)); if (argv == NULL) { log_err(ENOMEM, __func__, "No memory available, cannot calloc!"); return(TM_ERROR); } for (i = 0;i < numele;i++) { argv[i] = disrst(chan, ret); if (*ret != DIS_SUCCESS) { arrayfree(argv); return(TM_DONE); } } argv[i] = NULL; numele = 4; envp = (char **)calloc(numele, sizeof(char *)); if (envp == NULL) { log_err(ENOMEM, __func__, "No memory available, cannot calloc!"); arrayfree(argv); return(TM_ERROR); } for (i = 0;;i++) { char *env; env = disrst(chan, ret); if ((*ret != DIS_SUCCESS) && (*ret != DIS_EOD) && (*ret != DIS_EOF)) { arrayfree(argv); arrayfree(envp); return(TM_DONE); } if (env == NULL) break; if (*env == '\0') { free(env); break; } /* * Need to remember extra slot for NULL * at the end. Thanks to Pete Wyckoff * for finding this. 
*/ if (i == numele - 2) { numele *= 2; envp = (char **)realloc(envp, numele * sizeof(char *)); assert(envp); } envp[i] = env; envp[i+1] = NULL; } /* tack on PBS_VNODENUM */ envp[i] = (char *)calloc(MAXLINE, sizeof(char)); if (envp[i] == NULL) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "cannot alloc env memory)"); arrayfree(argv); arrayfree(envp); return(TM_DONE); } sprintf(envp[i], "PBS_VNODENUM=%d", nodeid); i++; envp[i] = NULL; *ret = DIS_SUCCESS; if (prev_error) { arrayfree(argv); arrayfree(envp); return(TM_DONE); } /* * If I'm Mother Suerior and the spawn happens on * me, just do it. */ #ifndef NUMA_SUPPORT if ((pjob->ji_nodeid == 0) && (pjob->ji_nodeid == nodeid)) #endif /* ndef NUMA_SUPPORT */ { /* XXX */ i = TM_ERROR; ptask = pbs_task_create(pjob, TM_NULL_TASK); if (ptask != NULL) { strcpy(ptask->ti_qs.ti_parentjobid, jobid); ptask->ti_qs.ti_parentnode = pjob->ji_nodeid; ptask->ti_qs.ti_parenttask = fromtask; if (LOGLEVEL >= 6) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "saving task (TM_SPAWN)"); } if (task_save(ptask) != -1) { *ret = start_process(ptask, argv, envp); if (*ret != -1) i = TM_OKAY; } } arrayfree(argv); arrayfree(envp); *ret = tm_reply(chan, i, event); if (*ret != DIS_SUCCESS) return(TM_DONE); *ret = diswsi(chan, ((i == TM_ERROR) ? TM_ESYSTEM : ptask->ti_qs.ti_task)); return(TM_DONE); } /* END if ((pjob->ji_nodeid == 0) && (pjob->ji_nodeid == nodeid)) */ /* * If I'm a regular mom and the destination is not * MS, just send a GET_TID to MS. 
*/ #ifndef NUMA_SUPPORT else if ((pjob->ji_nodeid != 0) && (nodeid != pjob->ji_vnods[0].vn_node)) #endif /* ndef NUMA_SUPPORT */ { /* XXX */ pnode = &pjob->ji_vnods[0]; phost = pnode->vn_host; ep = event_alloc(IM_GET_TID,pnode->vn_host,TM_NULL_EVENT,TM_NULL_TASK); ep->ee_argv = argv; ep->ee_envp = envp; ep->ee_forward.fe_node = nodeid; ep->ee_forward.fe_event = event; ep->ee_forward.fe_taskid = fromtask; local_socket = tcp_connect_sockaddr((struct sockaddr *)&phost->sock_addr,sizeof(phost->sock_addr)); if (local_socket < 0) return(TM_DONE); else if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { } else if ((*ret = im_compose(local_chan,jobid,cookie,IM_GET_TID,ep->ee_event,TM_NULL_TASK)) != DIS_SUCCESS) { } else { DIS_tcp_wflush(local_chan); *reply_ptr = FALSE; } close(local_socket); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); return(TM_DONE); } /* END else if ((pjob->ji_nodeid != 0) && ...) */ /* * If I am MS, generate the TID now, otherwise * we are sending to MS who will do it when she gets * the SPAWN. */ taskid = (pjob->ji_nodeid == 0) ? 
pjob->ji_taskid++ : TM_NULL_TASK; event_alloc(IM_SPAWN_TASK, phost, event, fromtask); if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_FULL, momport); local_socket = tcp_connect_sockaddr((struct sockaddr *)&phost->sock_addr,sizeof(phost->sock_addr)); if (IS_VALID_STREAM(local_socket) == FALSE) { arrayfree(argv); arrayfree(envp); return(TM_DONE); } if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { } else if ((*ret = im_compose(local_chan,jobid,cookie,IM_SPAWN_TASK,event,fromtask)) != DIS_SUCCESS) { } else { if (*ret == DIS_SUCCESS) { if ((*ret = diswui(local_chan, pjob->ji_nodeid)) == DIS_SUCCESS) { if ((*ret = diswui(local_chan, taskid)) == DIS_SUCCESS) { if ((*ret = diswst(local_chan, pjob->ji_globid)) == DIS_SUCCESS) { for (i = 0;argv[i];i++) { *ret = diswst(local_chan, argv[i]); if (*ret != DIS_SUCCESS) break; } if (*ret == DIS_SUCCESS) { if ((*ret = diswst(local_chan, "")) == DIS_SUCCESS) { for (i = 0;envp[i];i++) { *ret = diswst(local_chan, envp[i]); if (*ret != DIS_SUCCESS) break; } if (*ret == DIS_SUCCESS) *ret = DIS_tcp_wflush(local_chan); *reply_ptr = FALSE; } } } } } } } if (*ret != DIS_SUCCESS) { snprintf(log_buffer,sizeof(log_buffer), "Unable to send IM_SPAWN_TASK request to node %s for job %s", phost->hn_host, pjob->ji_qs.ji_jobid); log_err(-1, __func__, log_buffer); /* NYI: shouldn't we kill the job here instead of letting it run forever?? */ } close(local_socket); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); if (argv != NULL) arrayfree(argv); if (envp != NULL) arrayfree(envp); return(TM_DONE); } /* END tm_spawn_request() */ /* * tm_tasks_request * * A request to read the list of tasks that a * particular node has charge of. 
*/ int tm_tasks_request( struct tcp_chan *chan, job *pjob, /* I */ int prev_error, /* I */ int event, /* I */ char *cookie, /* I */ int *reply_ptr, /* O */ int *ret, /* O */ tm_task_id fromtask, /* I */ hnodent *phost, /* M */ int nodeid) /* I */ { char *jobid = pjob->ji_qs.ji_jobid; task *ptask; #ifndef NUMA_SUPPORT int local_socket; struct tcp_chan *local_chan = NULL; #endif if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: TASKS %s on node %d\n", __func__, jobid, nodeid); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } if (prev_error) return(TM_DONE); #ifndef NUMA_SUPPORT /* for numa, this is always the correct mom */ if (pjob->ji_nodeid != nodeid) { /* not me */ event_alloc(IM_GET_TASKS, phost, event, fromtask); local_socket = tcp_connect_sockaddr((struct sockaddr *)&phost->sock_addr,sizeof(phost->sock_addr)); if (IS_VALID_STREAM(local_socket) == FALSE) return(TM_DONE); if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { } else if ((*ret = im_compose(local_chan,jobid,cookie,IM_GET_TASKS,event,fromtask)) != DIS_SUCCESS) { } else if ((*ret = diswui(local_chan, pjob->ji_nodeid)) != DIS_SUCCESS) { } else if ((*ret = DIS_tcp_wflush(local_chan)) == DIS_SUCCESS) *reply_ptr = FALSE; close(local_socket); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); return(TM_DONE); } /* END if (pjob->ji_nodeid != nodeid) */ #endif /* ndef NUMA_SUPPORT */ *ret = tm_reply(chan, TM_OKAY, event); if (*ret != DIS_SUCCESS) return(TM_DONE); for (ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { *ret = diswui(chan, ptask->ti_qs.ti_task); if (*ret != DIS_SUCCESS) return(TM_DONE); } *ret = diswui(chan, TM_NULL_TASK); return(TM_DONE); } /* END tm_tasks_requests() */ /* * tm_signal_request * * Send a signal to the specified task. 
*
 * read (
 * to task int
 * signal int
 * )
 *
 * Returns TM_ERROR when either value cannot be read.  Nothing else is
 * done when prev_error is set.
 *
 * Without NUMA_SUPPORT, a request for another node is passed on: an
 * IM_SIGNAL_TASK event is allocated and an IM_SIGNAL_TASK message with
 * our node id, the task id and the signal is written to 'phost';
 * *reply_ptr is set to FALSE once all of it has been written.
 *
 * Otherwise the task is looked up in this job.  An unknown task is
 * answered with TM_ERROR + TM_ENOTFOUND; a known one is passed to
 * kill_task() and answered with TM_OKAY.
 */

int tm_signal_request(

  struct tcp_chan *chan,
  job             *pjob,       /* I */
  int              prev_error, /* I */
  int              event,      /* I */
  char            *cookie,     /* I */
  tm_task_id       fromtask,   /* I */
  int             *ret,        /* O */
  int             *reply_ptr,  /* O */
  hnodent         *phost,      /* M */
  int              nodeid)     /* I */

  {
  int   taskid;
  int   signum;
  char *jobid = pjob->ji_qs.ji_jobid;
  task *target;

#ifndef NUMA_SUPPORT
  int              local_socket;

  struct tcp_chan *local_chan = NULL;
#endif

  taskid = disrui(chan, ret);

  if (*ret != DIS_SUCCESS)
    return(TM_ERROR);

  signum = disrui(chan, ret);

  if (*ret != DIS_SUCCESS)
    return(TM_ERROR);

  if (LOGLEVEL >= 7)
    {
    snprintf(log_buffer,sizeof(log_buffer),
      "%s: SIGNAL %s on node %d task %d sig %d\n",
      __func__,
      jobid,
      nodeid,
      taskid,
      signum);

    log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer);
    }

  if (prev_error)
    return(TM_DONE);

#ifndef NUMA_SUPPORT
  if (pjob->ji_nodeid != nodeid)
    {
    /* not me XXX */

    event_alloc(IM_SIGNAL_TASK, phost, event, fromtask);

    local_socket = tcp_connect_sockaddr((struct sockaddr *)&phost->sock_addr,sizeof(phost->sock_addr));

    if (IS_VALID_STREAM(local_socket) == FALSE)
      return(TM_DONE);

    local_chan = DIS_tcp_setup(local_socket);

    if (local_chan != NULL)
      {
      /* each write is attempted only if everything before it succeeded */
      *ret = im_compose(local_chan, jobid, cookie, IM_SIGNAL_TASK, event, fromtask);

      if (*ret == DIS_SUCCESS)
        *ret = diswui(local_chan, pjob->ji_nodeid);

      if (*ret == DIS_SUCCESS)
        *ret = diswsi(local_chan, taskid);

      if (*ret == DIS_SUCCESS)
        *ret = diswsi(local_chan, signum);

      if (*ret == DIS_SUCCESS)
        {
        DIS_tcp_wflush(local_chan);

        *reply_ptr = FALSE;
        }
      }

    close(local_socket);

    if (local_chan != NULL)
      DIS_tcp_cleanup(local_chan);

    return(TM_DONE);
    }  /* END if (pjob->ji_nodeid != nodeid) */

#endif /* ndef NUMA_SUPPORT */

  /* Task should be here... look for it. */

  target = task_find(pjob, taskid);

  if (target == NULL)
    {
    *ret = tm_reply(chan, TM_ERROR, event);

    if (*ret == DIS_SUCCESS)
      *ret = diswsi(chan, TM_ENOTFOUND);

    return(TM_DONE);
    }

  if (LOGLEVEL >= 3)
    {
    sprintf(log_buffer, "%s: TM_SIGNAL %s from node %d task %d signal %d",
      __func__,
      jobid,
      nodeid,
      taskid,
      signum);

    log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer);
    }

  kill_task(target, signum, 0);

  *ret = tm_reply(chan, TM_OKAY, event);

  return(TM_DONE);
  }  /* END tm_signal_request() */




/*
 * Register an obit request for the specified task.
 *
 * read (
 * task to watch int
 * )
 *
 * Returns TM_ERROR when the task id cannot be read or when the obit
 * record cannot be allocated.  Nothing else is done when prev_error
 * is set.
 *
 * Without NUMA_SUPPORT, a request for another node is passed on: an
 * IM_OBIT_TASK event is allocated and an IM_OBIT_TASK message with our
 * node id and the task id is written to 'phost'; *reply_ptr is set to
 * FALSE once all of it has been written.  A failure to compose the
 * message header is logged.
 *
 * Otherwise the task is looked up in this job:
 *   - unknown task: TM_ERROR + TM_ENOTFOUND is sent;
 *   - status at or beyond TI_STATE_EXITED: TM_OKAY + the exit status
 *     is sent;
 *   - any other status: an obit record holding the requesting node,
 *     event and task is appended to the task's ti_obits list and
 *     *reply_ptr is set to FALSE.
 */

int tm_obit_request(

  struct tcp_chan *chan,
  job             *pjob,       /* I */
  int              prev_error, /* I */
  int              event,      /* I */
  char            *cookie,     /* I */
  int             *reply_ptr,  /* O */
  int             *ret,        /* O */
  tm_task_id       fromtask,   /* I */
  hnodent         *phost,      /* M */
  int              nodeid)     /* I */

  {
  int      taskid;
  char    *jobid = pjob->ji_qs.ji_jobid;
  task    *watched;
  obitent *pending;

#ifndef NUMA_SUPPORT
  int              local_socket;

  struct tcp_chan *local_chan = NULL;
#endif

  taskid = disrui(chan, ret);

  if (*ret != DIS_SUCCESS)
    return(TM_ERROR);

  if (LOGLEVEL >= 7)
    {
    snprintf(log_buffer,sizeof(log_buffer),
      "%s: OBIT %s on node %d task %d\n",
      __func__,
      jobid,
      nodeid,
      taskid);

    log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer);
    }

  if (prev_error)
    return(TM_DONE);

#ifndef NUMA_SUPPORT
  if (pjob->ji_nodeid != nodeid)
    {
    /* not me */

    event_alloc(IM_OBIT_TASK, phost, event, fromtask);

    local_socket = tcp_connect_sockaddr((struct sockaddr *)&phost->sock_addr,sizeof(phost->sock_addr));

    if (IS_VALID_STREAM(local_socket) == FALSE)
      return(TM_DONE);

    local_chan = DIS_tcp_setup(local_socket);

    if (local_chan != NULL)
      {
      *ret = im_compose(local_chan,jobid,cookie,IM_OBIT_TASK,event,fromtask);

      if (*ret != DIS_SUCCESS)
        {
        /* only a failed header is reported; later write errors are not */
        snprintf(log_buffer,sizeof(log_buffer),
          "Could not pass along obit for job %s to host %s",
          pjob->ji_qs.ji_jobid,
          phost->hn_host);

        log_err(-1, __func__, log_buffer);
        }
      else
        {
        *ret = diswui(local_chan, pjob->ji_nodeid);

        if (*ret == DIS_SUCCESS)
          *ret = diswsi(local_chan, taskid);

        if (*ret == DIS_SUCCESS)
          {
          DIS_tcp_wflush(local_chan);

          *reply_ptr = FALSE;
          }
        }
      }

    close(local_socket);

    if (local_chan != NULL)
      DIS_tcp_cleanup(local_chan);

    return(TM_DONE);
    }

#endif /* ndef NUMA_SUPPORT */

  /* Task should be here... look for it. */

  watched = task_find(pjob, taskid);

  if (watched == NULL)
    {
    *ret = tm_reply(chan, TM_ERROR, event);

    if (*ret == DIS_SUCCESS)
      *ret = diswsi(chan, TM_ENOTFOUND);

    return(TM_DONE);
    }

  if (watched->ti_qs.ti_status >= TI_STATE_EXITED)
    {
    /* the exit status is already known - answer immediately */
    *ret = tm_reply(chan, TM_OKAY, event);

    if (*ret == DIS_SUCCESS)
      *ret = diswsi(chan, watched->ti_qs.ti_exitstat);

    return(TM_DONE);
    }

  /* not exited yet: queue the request on the task, no reply for now */
  pending = (obitent *)calloc(1, sizeof(obitent));

  if (pending == NULL)
    {
    log_err(ENOMEM, __func__, "No memory! Cannot calloc!");

    return(TM_ERROR);
    }

  CLEAR_LINK(pending->oe_next);

  append_link(&watched->ti_obits, &pending->oe_next, pending);

  pending->oe_info.fe_node = nodeid;
  pending->oe_info.fe_event = event;
  pending->oe_info.fe_taskid = fromtask;

  *reply_ptr = FALSE;

  return(TM_DONE);
  }  /* END tm_obit_request() */




/*
 * tm_getinfo_request
 *
 * Get named info for a specified task.
* * read ( * task int * name string * ) */ int tm_getinfo_request( struct tcp_chan *chan, job *pjob, /* I */ int prev_error, /* I */ int event, /* I */ char *cookie, /* I */ int *reply_ptr, /* O */ int *ret, /* O */ tm_task_id fromtask, /* I */ hnodent *phost, /* M */ int nodeid) /* I */ { int taskid; char *jobid = pjob->ji_qs.ji_jobid; char *name; task *ptask; infoent *ip; #ifndef NUMA_SUPPORT int local_socket; struct tcp_chan *local_chan = NULL; #endif taskid = disrui(chan, ret); if (*ret == DIS_SUCCESS) { name = disrst(chan, ret); } if (*ret != DIS_SUCCESS) return(TM_ERROR); if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: GETINFO %s from node %d task %d name %s\n", __func__, jobid, nodeid, taskid, name); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } if (prev_error) { free(name); return(TM_DONE); } #ifndef NUMA_SUPPORT if (pjob->ji_nodeid != nodeid) { /* not me */ event_alloc(IM_GET_INFO,phost,event,fromtask); local_socket = tcp_connect_sockaddr((struct sockaddr *)&phost->sock_addr,sizeof(phost->sock_addr)); if (IS_VALID_STREAM(local_socket) == FALSE) { free(name); return(TM_ERROR); } if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { } else if ((*ret = im_compose(local_chan,jobid,cookie,IM_GET_INFO,event,fromtask)) == DIS_SUCCESS) { if ((*ret = diswui(local_chan, pjob->ji_nodeid)) == DIS_SUCCESS) { if ((*ret = diswsi(local_chan, taskid)) == DIS_SUCCESS) { *ret = diswst(local_chan, name); DIS_tcp_wflush(local_chan); *reply_ptr = FALSE; } } } close(local_socket); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); free(name); return(TM_DONE); } /* END if (pjob->ji_nodeid != nodeid) */ #endif /* ndef NUMA_SUPPORT */ /* Task should be here... look for it. 
*/ if ((ptask = task_find(pjob, taskid)) != NULL) { if ((ip = task_findinfo(ptask, name)) != NULL) { *ret = tm_reply(chan, TM_OKAY, event); if (*ret == DIS_SUCCESS) *ret = diswcs(chan, (const char *)ip->ie_info, ip->ie_len); free(name); return(TM_DONE); } } *ret = tm_reply(chan, TM_ERROR, event); if (*ret == DIS_SUCCESS) *ret = diswsi(chan, TM_ENOTFOUND); free(name); return(TM_DONE); } /* END tm_getinfo_request() */ /* * tm_resources_request * * get resource string for a node */ int tm_resources_request( struct tcp_chan *chan, job *pjob, /* I */ int prev_error, /* I */ int event, /* I */ char *cookie, /* I */ int *reply_ptr, /* O */ int *ret, /* O */ tm_task_id fromtask, /* I */ hnodent *phost, /* M */ int nodeid) /* I */ { char *jobid = pjob->ji_qs.ji_jobid; char *info = NULL; #ifndef NUMA_SUPPORT int local_socket; struct tcp_chan *local_chan = NULL; #endif if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "%s: RESOURCES %s for node %d task %d\n", __func__, jobid, nodeid, fromtask); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer); } if (prev_error) return(TM_DONE); #ifndef NUMA_SUPPORT if (pjob->ji_nodeid != nodeid) { /* not me XXX */ event_alloc(IM_GET_RESC, phost, event, fromtask); local_socket = tcp_connect_sockaddr((struct sockaddr *)&phost->sock_addr,sizeof(phost->sock_addr)); if (IS_VALID_STREAM(local_socket) == FALSE) return(TM_DONE); if ((local_chan = DIS_tcp_setup(local_socket)) == NULL) { } else if ((*ret = im_compose(local_chan,jobid,cookie,IM_GET_RESC,event,fromtask)) == DIS_SUCCESS) { if ((*ret = diswui(local_chan, pjob->ji_nodeid)) == DIS_SUCCESS) { DIS_tcp_wflush(local_chan); *reply_ptr = FALSE; } } close(local_socket); if (local_chan != NULL) DIS_tcp_cleanup(local_chan); return(TM_DONE); } /* END if (pjob->ji_nodeid != nodeid) */ #endif /* ndef NUMA_SUPPORT */ info = resc_string(pjob); *ret = tm_reply(chan, TM_OKAY, event); if (*ret == DIS_SUCCESS) *ret = diswst(chan, info); if (info != NULL) free(info); return(TM_DONE); } /* 
END tm_resources_request() */




/*
** Input is coming from a process running on this host which
** should be part of one of the jobs I am part of. The i/o
** will take place using DIS over a tcp fd.
**
** Read the stream to get a task manager request. Format the reply
** and write it back.
**
** read (
**   jobid         string
**   cookie        string
**   command       int
**   event         int
**   from taskid   int
** )
**
** tm_requests only use tcp. No rpp.
**
** Return values:
**   1          a TM_ADOPT_ALTID / TM_ADOPT_JOBID request was answered
**   PBSE_NONE  the request was handled (or needed no reply)
**   -1         the reply could not be written/flushed; the task's
**              connection has been closed
**   DIS_EOD    the request was refused (err path); chan->sock is closed
**
** jobid and cookie are allocated while reading the header and released on
** every return path.
*/

int tm_request(

  struct tcp_chan *chan,
  int              version)

{
  int            command, reply = 0;
  int            ret = DIS_SUCCESS;
  int            rc = DIS_SUCCESS;
  char          *jobid = NULL;
  char          *cookie = NULL;
  char          *oreo;
  job           *pjob;
  task          *ptask = NULL;
  vnodent       *pnode;
  hnodent       *phost;
  int            i;
  int            event;
  size_t         len = -1;
  long           ipadd;
  char          *name = NULL;
  char          *info = NULL;
  int            prev_error = 0;
  tm_node_id     nodeid;
  tm_task_id     fromtask;
  pbs_attribute *at;
  pid_t          pid;

  extern u_long            localaddr;
  extern struct connection svr_conn[];

  if (svr_conn[chan->sock].cn_addr != localaddr)
  {
    snprintf(log_buffer, sizeof(log_buffer), "non-local connect");

    goto err;
  }

  if (version != TM_PROTOCOL_VER)
  {
    snprintf(log_buffer, sizeof(log_buffer), "bad protocol version %d", version);

    goto err;
  }

  /* read the request header; each field is read only if the previous
   * one was read successfully */
  jobid = disrst(chan, &ret);

  if (ret == DIS_SUCCESS)
  {
    cookie = disrst(chan, &ret);

    if (ret == DIS_SUCCESS)
    {
      command = disrsi(chan, &ret);

      if (ret == DIS_SUCCESS)
      {
        event = disrsi(chan, &ret);

        if (ret == DIS_SUCCESS)
          fromtask = disrui(chan, &ret);
      }
    }
  }

  if (ret != DIS_SUCCESS)
    goto err;

  if (LOGLEVEL >= 7)
  {
    snprintf(log_buffer,sizeof(log_buffer),
      "%s: job %s cookie %s task %d com %d event %d\n",
      __func__, jobid, cookie, fromtask, command, event);

    log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,jobid,log_buffer);
  }

  /*
   * Allow a non-PBS process to be adopted
   * by PBS for resource accounting and possibly management
   * purposes. Note that this circumvents much of the protocol,
   * cookie checks etc. See adoptSession() for more info
   * DJH 26 Feb 2002. Distinguish between jobid and altid
   * adoptions - see adoptSession()
   */

  if ((command == TM_ADOPT_ALTID) || (command == TM_ADOPT_JOBID))
  {
    pid_t  sid;
    char  *id = NULL;
    int    adoptStatus;

    reply = TRUE;

    /* Read the session id and alt/job id from tm_adopt() */
    sid = disrsi(chan, &ret);

    if (ret == DIS_SUCCESS)
    {
      pid = disrsi(chan, &ret);

      if (ret == DIS_SUCCESS)
      {
        id = disrst(chan, &ret);
      }
    }

    if (ret != DIS_SUCCESS)
    {
      free(id);

      goto err;
    }

    /* Got all the info. Try to adopt the session */
    adoptStatus = adoptSession(sid, pid, id, command, cookie);

    free(id);

    /* Let the tm_adopt() call know if it was adopted or not.
       This is synchronous - doesn't use the event stuff.*/
    if ((ret = diswsi(chan, adoptStatus)) == DIS_SUCCESS)
      ret = DIS_tcp_wflush(chan);

    if (ret != DIS_SUCCESS)
      goto err;

    svr_conn[chan->sock].cn_stay_open = FALSE;

    free(jobid);
    free(cookie);

    return(1);
  }

  /* verify the jobid is known and the cookie matches */
  if ((pjob = mom_find_job(jobid)) == NULL)
  {
    /* jobid comes from the client: keep the message bounded */
    snprintf(log_buffer, sizeof(log_buffer), "job %s not found", jobid);

    tm_reply(chan, TM_ERROR, event);
    DIS_tcp_wflush(chan);

    goto err;
  }

  if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING)
  {
    snprintf(log_buffer, sizeof(log_buffer), "job %s not running", jobid);

    goto err;
  }

  at = &pjob->ji_wattr[JOB_ATR_Cookie];

  if (!(at->at_flags & ATR_VFLAG_SET))
  {
    snprintf(log_buffer, sizeof(log_buffer), "job %s has no cookie", jobid);

    goto err;
  }

  oreo = at->at_val.at_str;

  if (strcmp(oreo, cookie) != 0)
  {
    /* cookie comes from the client: keep the message bounded */
    snprintf(log_buffer, sizeof(log_buffer),
      "job %s cookie %s message %s",
      jobid, oreo, cookie);

    goto err;
  }

  /* verify this taskid is my baby */
  ptask = task_find(pjob, fromtask);

  if (ptask == NULL)
  {
    /* not found */
    snprintf(log_buffer, sizeof(log_buffer),
      "task %d in job %s not found",
      fromtask, jobid);

    log_err(-1, __func__, log_buffer);

    ret = tm_reply(chan, TM_ERROR, event);

    if (ret != DIS_SUCCESS)
      goto tm_req_finish;

    ret = diswsi(chan, TM_ENOTFOUND);

    if (ret != DIS_SUCCESS)
      goto tm_req_finish;

    prev_error = 1;

    /*
     * ANUPBS - DBS 21/10/02
     * This line added to avoid segfault. Code path can fall thru
     * here and deref ptask! Problem uncovered by adopt? Problem
     * noticed in code with multiple pbs_dsh and prun (adopt)
     *
     * reply is still 0 here, so tm_req_finish does not touch ptask.
     */
    goto tm_req_finish;
  }
  else if ((ptask->ti_chan != NULL) && (ptask->ti_chan->sock != chan->sock))
  {
    /* someone is already connected, create a new task for the new conn */
    ptask = pbs_task_create(pjob, TM_NULL_TASK);

    if (ptask == NULL)
      goto err;

    snprintf(ptask->ti_qs.ti_parentjobid, sizeof(ptask->ti_qs.ti_parentjobid),
      "%s", jobid);

    ptask->ti_qs.ti_parentnode = pjob->ji_nodeid;
    ptask->ti_qs.ti_parenttask = fromtask;

    /* the initial connection is "from" task 1, we set this to not
       confuse the new connection with the old */
    fromtask = ptask->ti_qs.ti_task;

    if (LOGLEVEL >= 6)
    {
      log_record(
        PBSEVENT_ERROR,
        PBS_EVENTCLASS_JOB,
        pjob->ji_qs.ji_jobid,
        "saving task (additional connection)");
    }

    if (task_save(ptask) == -1)
      goto err;
  }

  svr_conn[chan->sock].cn_oncl = tm_eof;

  if ((ptask->ti_chan != NULL) && (ptask->ti_chan != chan))
    DIS_tcp_cleanup(ptask->ti_chan);

  ptask->ti_chan = chan;

  reply = TRUE;

  /* set no timeout so connection is not closed for being idle */
  svr_conn[chan->sock].cn_authen |= PBS_NET_CONN_NOTIMEOUT;

  /* commands that do not carry a node number */
  switch (command)
  {
    case TM_INIT:

      tm_request_init(pjob,ptask,&ret,event,prev_error);

      goto tm_req_finish;

      /*NOTREACHED*/

      break;

    case TM_POSTINFO:

      rc = tm_postinfo(name,info,jobid,fromtask,prev_error,event,&ret,ptask,&len);

      goto tm_req_finish;

      /*NOTREACHED*/

      break;

    case TM_FINALIZE:

      DIS_tcp_wflush(ptask->ti_chan);
      DIS_tcp_close(ptask->ti_chan);

      reply = 0;

      goto tm_req_finish;

      break;

    case TM_REGISTER:

      snprintf(log_buffer, sizeof(log_buffer),
        "REGISTER - NOT IMPLEMENTED %s", jobid);

      tm_reply(ptask->ti_chan, TM_ERROR, event);

      diswsi(ptask->ti_chan, TM_ENOTIMPLEMENTED);

      DIS_tcp_wflush(ptask->ti_chan);

      goto err;

      /*NOTREACHED*/

      break;

    default:

      /* NO-OP */

      break;
  }  /* END switch(command) */

  /*
  ** All requests beside TM_INIT and TM_POSTINFO
  ** require a node number where the action will take place.
  ** Read that and check that it is legal.
  **
  ** read (
  **   node number   int
  ** )
  */

  nodeid = disrui(ptask->ti_chan, &ret);

  if (ret != DIS_SUCCESS)
    goto err;

  pnode = pjob->ji_vnods;

  for (i = 0;i < pjob->ji_numvnod;i++, pnode++)
  {
    if (pnode->vn_node == nodeid)
      break;
  }

  if (i == pjob->ji_numvnod)
  {
    snprintf(log_buffer, sizeof(log_buffer),
      "node %d in job %s not found",
      nodeid, jobid);

    log_err(-1, __func__, log_buffer);

    ret = tm_reply(ptask->ti_chan, TM_ERROR, event);

    if (ret == DIS_SUCCESS)
      ret = diswsi(ptask->ti_chan, TM_ENOTFOUND);

    if (ret != DIS_SUCCESS)
      goto tm_req_finish;

    prev_error = 1;
  }

  /* When the node was not found, pnode points one element past the end of
   * ji_vnods and must not be dereferenced. The handlers are still called
   * (with prev_error set) so they can drain their arguments from the stream.
   * NOTE(review): handlers are expected not to use phost when prev_error is
   * set - confirm for tm_tasks_request/tm_spawn_request/tm_signal_request. */
  phost = (i < pjob->ji_numvnod) ? pnode->vn_host : NULL;

  switch (command)
  {
    case TM_TASKS:

      rc = tm_tasks_request(ptask->ti_chan,pjob,prev_error,event,cookie,&reply,&ret,fromtask,phost,nodeid);

      break;

    case TM_SPAWN:

      rc = tm_spawn_request(ptask->ti_chan,pjob,prev_error,event,cookie,&reply,&ret,fromtask,phost,nodeid);

      break;

    case TM_SIGNAL:

      rc = tm_signal_request(ptask->ti_chan,pjob,prev_error,event,cookie,fromtask,&ret,&reply,phost,nodeid);

      break;

    case TM_OBIT:

      rc = tm_obit_request(ptask->ti_chan,pjob,prev_error,event,cookie,&reply,&ret,fromtask,phost,nodeid);

      break;

    case TM_GETINFO:

      rc = tm_getinfo_request(ptask->ti_chan,pjob,prev_error,event,cookie,&reply,&ret,fromtask,phost,nodeid);

      break;

    case TM_RESOURCES:

      rc = tm_resources_request(ptask->ti_chan,pjob,prev_error,event,cookie,&reply,&ret,fromtask,phost,nodeid);

      break;

    default:

      snprintf(log_buffer, sizeof(log_buffer), "unknown command %d", command);

      tm_reply(ptask->ti_chan, TM_ERROR, event);

      diswsi(ptask->ti_chan, TM_EUNKNOWNCMD);

      DIS_tcp_wflush(ptask->ti_chan);

      rc = TM_ERROR;

      /*NOTREACHED*/

      break;
  }  /* END switch (command) */

  if (rc == TM_ERROR)
    goto err;

tm_req_finish:

  if (reply)
  {
    if ((ret != DIS_SUCCESS) || (DIS_tcp_wflush(ptask->ti_chan) == -1))
    {
      if (ret >= 0)
      {
        snprintf(log_buffer, sizeof(log_buffer),
          "comm failed %s when performing command %d for job %s",
          dis_emsg[ret], command, jobid);
      }
      else
      {
        snprintf(log_buffer, sizeof(log_buffer),
          "comm failed when performing command %d for job %s",
          command, jobid);
      }

      log_err(errno, __func__, log_buffer);

      if (ptask->ti_chan != NULL)
      {
        close_conn(ptask->ti_chan->sock, FALSE);

        ptask->ti_chan = NULL;
      }

      free(jobid);
      free(cookie);

      return(-1);
    }
  }

  free(jobid);
  free(cookie);

  return(PBSE_NONE);

err:

  if (ret != DIS_SUCCESS)
  {
    if (ret >= 0)
      snprintf(log_buffer, sizeof(log_buffer), "bad header %s", dis_emsg[ret]);
    else
      snprintf(log_buffer, sizeof(log_buffer), "bad header - communication error");
  }

  log_err(errno, __func__, log_buffer);

  ipadd = svr_conn[chan->sock].cn_addr;

  snprintf(log_buffer, sizeof(log_buffer),
    "message refused from port %d addr %ld.%ld.%ld.%ld",
    svr_conn[chan->sock].cn_port,
    (ipadd & 0xff000000) >> 24,
    (ipadd & 0x00ff0000) >> 16,
    (ipadd & 0x0000ff00) >> 8,
    (ipadd & 0x000000ff));

  /* the refusal message used to be formatted and then dropped */
  log_err(-1, __func__, log_buffer);

  close(chan->sock);

  svr_conn[chan->sock].cn_stay_open = FALSE;

  free(jobid);
  free(cookie);

  return DIS_EOD;
}  /* END tm_request() */




/*
 * adoptSession --
 *
 * Find a job that corresponds to a given alternative task management
 * id or job id and create a new task in it to monitor the usage of a
 * given session id.
 *
 * Result:
 * Returns TM_OKAY if the session id was adopted, and TM_ERROR if
 * it wasn't. The job identified by jobid or altid (eg rmsResourceId)
 * gets a new task, and that task has its session id set to monitor
 * sid. Various special values are set in the task to ensure that
 * PBS only monitors the new task, and doesn't attempt to control it.
 *
 * Side effects:
 * Saves the new task with task_save().
* Forces a mom_get_sample() * * */ static int adoptSession( pid_t sid, pid_t pid, char *jobid, int command, char *cookie) { job *pjob = NULL; task *ptask = NULL; unsigned short momport = 0; #ifdef PENABLE_LINUX26_CPUSETS unsigned int len; FILE *fp; char cpuset_path[MAXPATHLEN]; char pid_str[MAXPATHLEN]; #endif /* extern int next_sample_time; */ /* extern time_t time_resc_updated; */ /* Find the job that has this job/alt id */ for (pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { if (command == TM_ADOPT_JOBID) { if (strcmp(jobid, pjob->ji_qs.ji_jobid) == 0) break; else if (strchr(pjob->ji_qs.ji_jobid, '.') == NULL) { char *dot = strchr(jobid, '.'); if (dot != NULL) { *dot = '\0'; if (strcmp(jobid, pjob->ji_qs.ji_jobid) == 0) { *dot = '.'; break; } *dot = '.'; } } } else { if (strcmp(jobid, pjob->ji_altid) == 0) break; } } if (pjob == NULL) { /* Didn't find a job with this resource id. Complain. */ (void)sprintf(log_buffer, "Adoption rejected: no job with id %1.30s", jobid); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, "adoptSession()", log_buffer); return TM_ERROR; } /* * Check cookie if we can (why bother!) Should check for * correct cookie first, it might be available! 
*/ if (strcmp(cookie, "ADOPT COOKIE")) { char *oreo; pbs_attribute *at = &pjob->ji_wattr[JOB_ATR_Cookie]; if (!(at->at_flags & ATR_VFLAG_SET)) { sprintf(log_buffer, "Adoption rejected: job %s has no cookie", pjob->ji_qs.ji_jobid); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return TM_ERROR; } oreo = at->at_val.at_str; if (strcmp(oreo, cookie) != 0) { sprintf(log_buffer, "Adoption rejected: job %s cookie %s message %s", pjob->ji_qs.ji_jobid, oreo, cookie); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return TM_ERROR; } } else { /* sprintf(log_buffer, "job %s cookie %s message %s", jobid, oreo, cookie); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); */ } /* JMB--set task ID in such a way that makes it obvious this is an adopted task */ if (pjob->maxAdoptedTaskId == TM_NULL_TASK) { pjob->maxAdoptedTaskId = TM_ADOPTED_TASKID_BASE; } /* * DJH 27 Feb 2002. * Now create a task to monitor that sid. Use a task id that isn't * going to collide with the ones given to non-adopted tasks. */ if((ptask = pbs_task_create(pjob, (pjob->ji_taskid - 1) + TM_ADOPTED_TASKID_BASE)) == NULL) return TM_ERROR; pjob->ji_taskid++; /* ti_parenttask not used but avoiding using TM_NULL_TASK as it means 'top level shell' to scan_for_exiting() */ ptask->ti_qs.ti_parenttask = TM_NULL_TASK + 1; ptask->ti_qs.ti_sid = sid; ptask->ti_qs.ti_status = TI_STATE_RUNNING; (void)task_save(ptask); /* Mark the job as running if we need to. 
This is copied from start_process() */ if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING) { pjob->ji_qs.ji_state = JOB_STATE_RUNNING; pjob->ji_qs.ji_substate = JOB_SUBSTATE_RUNNING; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); } if (mom_get_sample() == PBSE_NONE) { /* time_resc_updated = time_now; */ (void)mom_set_use(pjob); } #ifdef PENABLE_LINUX26_CPUSETS /* add to the cpuset */ snprintf(cpuset_path,sizeof(cpuset_path), "/dev/cpuset/torque/%s/tasks", pjob->ji_qs.ji_jobid); snprintf(pid_str,sizeof(pid_str),"%d",pid); fp = fopen(cpuset_path,"w"); len = strlen(pid_str); if (fp != NULL) { if (fwrite(pid_str,sizeof(char),len,fp) != len) { snprintf(log_buffer,sizeof(log_buffer), "Unable to add process (%s) to the job's cpuset (%s)\n", pid_str, cpuset_path); log_err(-1, __func__, log_buffer); } fclose(fp); } else { snprintf(log_buffer,sizeof(log_buffer), "Unable to open the cpuset's task file (%s)\n", cpuset_path); log_err(-1, __func__, log_buffer); } #endif /* def PENABLE_LINUX26_CPUSETS */ /* next_sample_time = 45; */ (void)sprintf(log_buffer, "Task adopted. id=%1.30s, sid = %d", jobid, sid); DBPRT(("%s\n", log_buffer)); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return TM_OKAY; } /* * cat_dirs -- * * Concatenate root and base into a new string and return the result * * Result: * if root is null only the base value is returned. Otherwise a string * with the root followed by base is returned. If memory cannot be * allocated NULL is returned. 
*
 * Side effects:
 * If a non-null value is returned the new string will need
 * to later be freed
 *
 * NULL is also returned when base is NULL.
 *
 */

char *cat_dirs(

  char *root,
  char *base)

{
  char   *pn;
  size_t  len = 0;

  if (base == NULL)
    return(NULL);

  if (root != NULL)
    len = strlen(root);

  len += strlen(base);

  /* calloc zero-fills, so the result is always NUL terminated */
  pn = (char *)calloc(1, len + 2);

  if (pn == NULL)
    return(NULL);

  if (root != NULL)
  {
    strcpy(pn, root);
    strcat(pn, base);
  }
  else
  {
    strcpy(pn, base);
  }

  return(pn);
}




/*
 * get_local_script_path --
 *
 * takes a path given by base and prepends
 * the PBS_O_WORKDIR if base is a relative path. That is, if
 * base does not begin with '/'. Otherwise it returns
 * a copy of base. cat_dirs allocates memory for the new
 * string so this has to be freed.
 *
 * The string returned by get_job_envvar() is only read: the '/' separator
 * is added in a private buffer instead of being appended to that string.
 *
 * NULL is returned when base is NULL, when PBS_O_WORKDIR cannot be found
 * for the job, or when memory cannot be allocated.
 */

char *get_local_script_path(

  job  *pjob,
  char *base)

{
  char   *wdir;
  char   *dir_with_sep;
  char   *pn;
  size_t  len;

  if (base == NULL)
    return(NULL);

  /* an absolute path is simply duplicated */
  if (base[0] == '/')
    return(cat_dirs(NULL, base));

  /* base is not an absolute path. Prepend it with the working directory */
  wdir = get_job_envvar(pjob, "PBS_O_WORKDIR");

  if (wdir == NULL)
    return(NULL);

  len = strlen(wdir);

  /* the working directory already ends with a separator */
  if ((len > 0) && (wdir[len - 1] == '/'))
    return(cat_dirs(wdir, base));

  /* build "<wdir>/" in our own storage */
  dir_with_sep = (char *)calloc(1, len + 2);

  if (dir_with_sep == NULL)
    return(NULL);

  strcpy(dir_with_sep, wdir);
  strcat(dir_with_sep, "/");

  pn = cat_dirs(dir_with_sep, base);

  free(dir_with_sep);

  return(pn);
}




/* Get the job info. If the job exists get it.
If not make a new one */ int get_job_struct( job **pjob, char *jobid, int command, struct tcp_chan *chan, struct sockaddr_in *addr, tm_node_id nodeid) { int ret; job *new_job; new_job = mom_find_job(jobid); if (new_job != NULL) { /* job already exists locally */ if (new_job->ji_qs.ji_substate == JOB_SUBSTATE_PRERUN) { if (LOGLEVEL >= 3) { /* if peer mom times out, MS will send new join request for same job */ sprintf(log_buffer, "WARNING: duplicate JOIN request %s from %s (purging previous pjob)", PMOMCommand[MIN(command,IM_MAX)], netaddr(addr)); log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } mom_job_purge(new_job); } else { if (LOGLEVEL >= 0) { sprintf(log_buffer, "ERROR: received request '%s' from %s (job already exists locally)", PMOMCommand[MIN(command,IM_MAX)], netaddr(addr)); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buffer); } /* should local job be purged, ie 'mom_job_purge(pjob);' ? */ ret = PBSE_JOBEXIST; goto done; } } /* END if (pjob != NULL) */ if ((new_job = job_alloc()) == NULL) { /* out of memory */ log_err(-1, __func__, "insufficient memory to create job"); ret = PBSE_SYSTEM; goto done; } new_job->ji_portout = disrsi(chan, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer, "%s: join_job request to node %d for job %s failed - %s (stdout)", __func__, nodeid, jobid, dis_emsg[ret]); log_err(-1, __func__, log_buffer); ret = PBSE_DISPROTO; goto done; } new_job->ji_porterr = disrsi(chan, &ret); if (ret != DIS_SUCCESS) { sprintf(log_buffer, "%s: join_job request to node %d for job %s failed - %s (stderr)", __func__, nodeid, jobid, dis_emsg[ret]); log_err(-1, __func__, log_buffer); ret = PBSE_DISPROTO; goto done; } ret = PBSE_NONE; *pjob = new_job; done: return(ret); } /* END get_job_struct() */ int run_prologue_scripts( job *pjob) { int j; int ret; /* run local prolog */ if ((j = run_pelog(PE_PROLOG, path_prologp, pjob, PE_IO_TYPE_ASIS, FALSE)) != 0) { snprintf(log_buffer,sizeof(log_buffer), "cannot run local prolog '%s': 
%s (rc: %d)\n", path_prologp, log_buffer, j); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); ret = PBSE_SYSTEM; goto done; } /* run user prolog */ if ((j = run_pelog(PE_PROLOGUSER, path_prologuserp, pjob, PE_IO_TYPE_ASIS, FALSE)) != 0) { snprintf(log_buffer,sizeof(log_buffer), "cannot run local user prolog '%s': %s (rc: %d)\n", path_prologuserp, log_buffer, j); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); ret = PBSE_SYSTEM; goto done; } ret = PBSE_NONE; done: return(ret); } /* END run_prologue_scripts() */ #define READ_BUF_SIZE 1024 int readit( int sock, int fd) { int amt; char buf[READ_BUF_SIZE]; size_t ret; if ((amt = recv(sock, buf, READ_BUF_SIZE, 0)) > 0) { ret = send(fd, buf, amt, 0); if (ret == (size_t) -1) { close(sock); close(fd); FD_CLR(sock, &readset); } } else { close(sock); FD_CLR(sock, &readset); } return(amt); } /* END readit() */ void demux_wait( int sig) { wait(0); } void fork_demux( job *pjob) { pid_t cpid; struct timeval timeout; int i; int retries; int maxfd; int n; int newsock; int fd1; int fd2; int im_mom_stdout; int im_mom_stderr; fd_set selset; pid_t parent; u_long ipaddr; struct sigaction act; struct routefd *routem; int open_sockets = 0; int amt_read = 0; int pipes[2]; int pipe_failed = FALSE; char buf[MAXLINE]; if ((maxfd = sysconf(_SC_OPEN_MAX)) < 0) { log_err(errno, __func__, "call to sysconf() failed"); return; } routem = (struct routefd *)calloc(sizeof(struct routefd), maxfd); if (routem == NULL) { fprintf(stderr, "cannot allocate memory in fork_demux"); return; } for (i = 0; i < maxfd; i++) { routem[i].r_which = invalid; routem[i].r_fd = -1; } /* set up signal handler so we know when our child process goes away */ sigemptyset(&act.sa_mask); act.sa_flags = SA_NOCLDSTOP; act.sa_handler = demux_wait; sigaction(SIGCHLD, &act, NULL); im_mom_stdout = dup(pjob->ji_im_stdout); if (im_mom_stdout == -1) { fprintf(stderr, "could not dup stdout in fork_demux"); free(routem); return; } 
close(pjob->ji_im_stdout); im_mom_stderr = dup(pjob->ji_im_stderr); if (im_mom_stdout == -1) { fprintf(stderr, "could not dup stdout in fork_demux"); free(routem); return; } close(pjob->ji_im_stderr); routem[im_mom_stdout].r_which = listen_out; routem[im_mom_stderr].r_which = listen_err; parent = getppid(); /* create pipe so that child can tell us when the demux is opened */ if (pipe(pipes) == -1) { log_err(errno, __func__, "Couldn't create the pipe!"); pipe_failed = TRUE; } cpid = fork_me(-1); if (cpid) { /* parent - wait for the child to confirm the demux was opened */ close(pipes[1]); /* parent doesn't use the write end */ if (pipe_failed == TRUE) sleep(2); else { /* read the pipe and then continue */ while ((read_ac_socket(pipes[0], &buf, sizeof(buf) - 1) == -1) && (errno == EINTR)) ; /* NYI: what should we do if opening the buf failed? !strcmp(buf, "fail") */ close(pipes[0]); } free(routem); close(im_mom_stderr); close(im_mom_stdout); return; } /* child - open the demux and then inform the parent */ close(pipes[0]); /* child doesn't use the read end */ ipaddr = pjob->ji_sisters[0].sock_addr.sin_addr.s_addr; /* maxfd = sysconf(_SC_OPEN_MAX); */ FD_ZERO(&readset); FD_SET(im_mom_stdout, &readset); FD_SET(im_mom_stderr, &readset); if (listen(im_mom_stdout, TORQUE_LISTENQUEUE) < 0) { perror("listen on out"); close(im_mom_stdout); close(im_mom_stderr); if (write_ac_socket(pipes[1], "fail", strlen("fail")) < 0) perror(__func__); close(pipes[1]); _exit(5); } if (listen(im_mom_stderr, TORQUE_LISTENQUEUE) < 0) { perror("listen on err"); close(im_mom_stdout); close(im_mom_stderr); if (write_ac_socket(pipes[1], "fail", strlen("fail")) < 0) perror(__func__); close(pipes[1]); _exit(5); } /* We have our sockets open for listening. 
Now we can connect to those listen ports on our parent MOM */ retries = 0; do { fd1 = open_demux(ipaddr, pjob->ji_portout); if (fd1 >= 0) break; usleep(500000); retries++; } while(retries < 10); if (retries >= 10) { perror("could not open demux to parent"); close(im_mom_stdout); close(im_mom_stderr); if (write_ac_socket(pipes[1], "fail", strlen("fail")) < 0) perror(__func__); close(pipes[1]); _exit(5); } fd2 = open_demux(ipaddr, pjob->ji_porterr); if (fd2 < 0) { perror("cannot open mux stderr port"); close(im_mom_stdout); close(im_mom_stderr); close(fd1); if (write_ac_socket(pipes[1], "fail", strlen("fail")) < 0) perror(__func__); close(pipes[1]); _exit(5); } if (write_ac_socket(pipes[1], "success", strlen("success")) < 0) perror(__func__); close(pipes[1]); while (1) { selset = readset; timeout.tv_usec = 0; timeout.tv_sec = 20; n = select(FD_SETSIZE, &selset, (fd_set *)0, (fd_set *)0, &timeout); if (n == -1) { if (errno == EINTR) { n = 0; } else { perror("fork_demux: select failed\n"); close(im_mom_stdout); close(im_mom_stderr); close(fd1); close(fd2); _exit(1); } } else if (n == 0) { /* NOTE: on TRU64, init process does not have pid==1 */ if (getppid() != parent) { #ifdef DEBUG fprintf(stderr, "%s: Parent has gone, and so will I\n", __func__); #endif /* DEBUG */ break; } } /* END else if (n == 0) */ for (i = 0;(n != 0) && (i < maxfd);++i) { if (FD_ISSET(i, &selset)) { /* this socket has data */ n--; switch (routem[i].r_which) { case listen_out: case listen_err: newsock = accept(i, 0, 0); if (newsock < 0) { perror("accept failed"); close(fd1); close(fd2); close(im_mom_stdout); close(im_mom_stderr); _exit(5); } routem[newsock].r_which = routem[i].r_which == listen_out ? 
new_out : new_err; routem[newsock].r_fd = newsock; open_sockets++; FD_SET(newsock, &readset); break; case new_out: amt_read = readit(i, fd1); if (amt_read <= 0) { routem[i].r_fd = -1; routem[i].r_which = invalid; } break; case new_err: amt_read = readit(i, fd2); if (amt_read <= 0) { routem[i].r_fd = -1; routem[i].r_which = invalid; } break; default: if (routem[i].r_which == invalid) { continue; } perror("internal error"); close(fd1); close(fd2); close(im_mom_stdout); close(im_mom_stderr); _exit(2); /*NOTREACHED*/ break; } } } } /* END while(1) */ close(fd1); close(fd2); close(im_mom_stdout); close(im_mom_stderr); _exit(0); } /* END fork_demux() */ void send_update_soon() { int sindex; int amount_of_time = ServerStatUpdateInterval / 3; /* force an update reasonably soon */ if (time_now - LastServerUpdateTime > amount_of_time) { LastServerUpdateTime = 0; for (sindex = 0; sindex < PBS_MAXSERVER; sindex++) { mom_servers[sindex].MOMLastSendToServerTime = 0; } } else { time_t temp = time_now - ServerStatUpdateInterval + amount_of_time; if (temp < LastServerUpdateTime) { LastServerUpdateTime = temp; for (sindex = 0; sindex < PBS_MAXSERVER; sindex++) { mom_servers[sindex].MOMLastSendToServerTime = temp; } } } } /* END send_update_soon() */ received_node *get_received_node_entry( char *str) { received_node *rn; int index; char *hostname; if (str == NULL) return(NULL); hostname = str + strlen("node="); /* get the old node for this table if present. 
If not, create a new one */ index = get_value_hash(received_table, hostname); if (index == -1) { rn = (received_node *)calloc(1, sizeof(received_node)); if (rn == NULL) { log_err(ENOMEM, __func__, "No memory to allocate for status information\n"); return(NULL); } /* initialize the received node struct */ rn->statuses = get_dynamic_string(MAXLINE,NULL); snprintf(rn->hostname, sizeof(rn->hostname), "%s", hostname); if (rn->statuses == NULL) { log_err(ENOMEM, __func__, "No memory to allocate for status information\n"); free(rn); return(NULL); } rn->hellos_sent = 0; if (LOGLEVEL >= 7) { snprintf(log_buffer,sizeof(log_buffer), "Received first status from mom %s", hostname); log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE, __func__, log_buffer); } /* add the new node to the received status list */ index = insert_thing(received_statuses, rn); if (index == -1) log_err(ENOMEM, __func__, "No memory to resize the received_statuses array...SYSTEM FAILURE\n"); else { add_hash(received_table, index, rn->hostname); send_update_soon(); } } else { rn = (received_node *)received_statuses->slots[index].item; /* make sure we aren't hold 2 statuses for the same node */ clear_dynamic_string(rn->statuses); if (LOGLEVEL >= 10) { snprintf(log_buffer,sizeof(log_buffer), "Received status update from mom %s", hostname); log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE, __func__, log_buffer); } } return(rn); } /* END get_received_node_entry() */ /* * reads the status strings sent from another mom * * @param fds - the stream being read * @param version - the protocol version * @param hostname - the hostname that is sending to us */ int read_status_strings( struct tcp_chan *chan, int version) /* I */ { int rc; char *str; received_node *rn = NULL; if (chan == NULL) { return DIS_INVALID; } /* was mom_port but storage unnecessary */ disrsi(chan,&rc); if (rc == DIS_SUCCESS) { /* was rm_port but no longer needed to be stored */ disrsi(chan,&rc); } if (rc != DIS_SUCCESS) { return(rc); } /* read each 
string */ while (((str = disrst(chan,&rc)) != NULL) && (rc == DIS_SUCCESS)) { /* terminate on end message */ if (!strcmp(str, IS_EOL_MESSAGE)) { free(str); str = NULL; break; } if (!strncmp(str, "node=", strlen("node="))) rn = get_received_node_entry(str); /* place each string into the buffer */ if (rn != NULL) { copy_to_end_of_dynamic_string(rn->statuses, str); } free(str); } if (str != NULL) free(str); if ((rc == DIS_SUCCESS) || (rc == DIS_EOF)) { /* SUCCESS */ write_tcp_reply(chan, IS_PROTOCOL, IS_PROTOCOL_VER, IS_STATUS, PBSE_NONE); updates_waiting_to_send++; if (updates_waiting_to_send >= maxupdatesbeforesending) { if (LOGLEVEL >= 3) { snprintf(log_buffer, sizeof(log_buffer), "Forcing update because I have received %d updates", updates_waiting_to_send); log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE, __func__, log_buffer); } send_update_soon(); } } return(PBSE_NONE); } /* END read_status_strings() */ int is_ptask_corrupt( struct tcp_chan *chan) /* Input */ { char log_buf[LOCAL_LOG_BUF_SIZE]; struct tcpdisbuf *tp = &chan->writebuf; if (tp->tdis_bufsize == 0) { snprintf(log_buf,sizeof(log_buf), "write buffer's tdis_bufsize was unexpectely found with a value of 0"); log_err(-1, __func__, log_buf); return 1; } tp = &chan->readbuf; if (tp->tdis_bufsize == 0) { snprintf(log_buf,sizeof(log_buf), "read buffer's tdis_bufsize was unexpectely found with a value of 0"); log_err(-1, __func__, log_buf); return -1; } return 0; } /* END mom_comm.c */