/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. 
Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * This license will be governed by the laws of the Commonwealth of Virginia,
 * without reference to its choice of law rules.
 */

/*
 * NOTE(review): the header names of the system #include directives below
 * are missing in this copy of the file (only the quoted project headers
 * survived) - restore them from the upstream source before building.
 */
#include /* the master config generated by configure */

/* define the following so we get prototype for getsid() */
#define _XOPEN_SOURCE
#define _XOPEN_SOURCE_EXTENDED 1

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#ifdef _AIX
#include
#endif /* _AIX */

#include "dis.h"
#include "dis_init.h"
#include "tm.h"
#include "net_connect.h"
#include "pbs_ifl.h"
#include "../Libnet/lib_net.h"
#include "../Liblog/pbs_log.h" /* print_trace */

/*
** Set up a debug print macro.
** The argument is a complete, parenthesized printf argument list.
** errno is saved and restored around the printf call.
*/
#ifdef DEBUG
#define TM_DBPRT(x) \
  { \
  int err = errno; \
  printf x; \
  errno = err; \
  }
#else
#define TM_DBPRT(x)
#endif

#ifndef MIN
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#endif

/*
** Allocate some string space to hold the values passed in the
** environment from MOM.
*/
static char *tm_jobid = NULL;                 /* PBS_JOBID (or a placeholder set by tm_adopt) */
static int tm_jobid_len = 0;                  /* strlen(tm_jobid) */
static char *tm_jobcookie = NULL;             /* PBS_JOBCOOKIE (or a placeholder set by tm_adopt) */
static int tm_jobcookie_len = 0;              /* strlen(tm_jobcookie) */
static tm_task_id tm_jobtid = TM_NULL_TASK;   /* PBS_TASKNUM */
static tm_node_id tm_jobndid = TM_ERROR_NODE; /* PBS_NODENUM */
static int tm_momport = 0;                    /* PBS_MOMPORT */
static int local_conn = -1;                   /* socket to the local pbs_mom, -1 when closed */
static struct tcp_chan *static_chan = NULL;   /* read channel kept by tm_poll() between calls */

int init_done = 0;            /* set by tm_init()/tm_adopt(), cleared by tm_finalize() */
int *tm_conn = &local_conn;   /* exported pointer to the connection descriptor */
int event_count = 0;          /* number of entries currently in the event hash table */

/*
** Events are the central focus of this library. They are tracked
** in a hash table. Many of the library calls return events.
They ** are recorded and as information is received from MOM's, the ** event is updated and marked so tm_poll() can return it to the user. */ #define EVENT_HASH 128 typedef struct event_info { tm_event_t e_event; /* event number */ tm_node_id e_node; /* destination node */ int e_mtype; /* message type sent */ void *e_info; /* possible returned info */ struct event_info *e_next; /* link to next event */ struct event_info *e_prev; /* link to prev event */ } event_info; static event_info *event_hash[EVENT_HASH]; /* ** Find an event number or return a NULL. */ event_info *find_event( tm_event_t x) { event_info *ep; for (ep = event_hash[x % EVENT_HASH]; ep; ep = ep->e_next) { if (ep->e_event == x) break; } return ep; } /* ** Delete an event. */ void del_event( event_info *ep) { /* unlink event from hash list */ if (ep->e_prev) ep->e_prev->e_next = ep->e_next; else event_hash[ep->e_event % EVENT_HASH] = ep->e_next; if (ep->e_next) ep->e_next->e_prev = ep->e_prev; /* ** Free any memory saved with the event. This depends ** on whay type of event it is. */ switch (ep->e_mtype) { case TM_INIT: case TM_SPAWN: case TM_SIGNAL: case TM_OBIT: case TM_POSTINFO: break; case TM_TASKS: case TM_GETINFO: case TM_RESOURCES: free(ep->e_info); break; default: TM_DBPRT(("del_event: unknown event command %d\n", ep->e_mtype)) break; } free(ep); if (--event_count == 0) { close(local_conn); local_conn = -1; } return; } /* ** Create a new event number. */ tm_event_t new_event(void) { static tm_event_t next_event = TM_NULL_EVENT + 1; event_info *ep; tm_event_t ret; if (next_event == INT_MAX) next_event = TM_NULL_EVENT + 1; for (;;) { ret = next_event++; for (ep = event_hash[ret % EVENT_HASH]; ep; ep = ep->e_next) { if (ep->e_event == ret) break; /* inner loop: this number is in use */ } if (ep == NULL) break; /* this number is not in use */ } return ret; } /* ** Link new event number into the above hash table. 
*/ void add_event( tm_event_t event, tm_node_id node, int type, void *info) { event_info *ep, **head; ep = (event_info *)calloc(1, sizeof(event_info)); assert(ep != NULL); head = &event_hash[event % EVENT_HASH]; ep->e_event = event; ep->e_node = node; ep->e_mtype = type; ep->e_info = info; ep->e_next = *head; ep->e_prev = NULL; if (*head) (*head)->e_prev = ep; *head = ep; event_count++; return; } /* ** Sessions must be tracked by the library so tm_taskid objects ** can be resolved into real tasks on real nodes. ** We will use a hash table. */ #define TASK_HASH 256 typedef struct task_info { char *t_jobid; /* jobid */ tm_task_id t_task; /* task id */ tm_node_id t_node; /* node id */ struct task_info *t_next; /* link to next task */ } task_info; static task_info *task_hash[TASK_HASH]; /* ** Find a task table entry for a given task number or return a NULL. */ task_info *find_task( tm_task_id x) { task_info *tp; for (tp = task_hash[x % TASK_HASH]; tp; tp = tp->t_next) { if (tp->t_task == x) break; } return tp; } /* ** Create a new task entry and link it into the above hash ** table. */ tm_task_id new_task( char *jobid, tm_node_id node, tm_task_id task) { task_info *tp, **head; TM_DBPRT(("%s: jobid=%s node=%d task=%lu\n", __func__, jobid, node, (unsigned long)task)) if (jobid != tm_jobid && strcmp(jobid, tm_jobid) != 0) { TM_DBPRT(("%s: task job %s not my job %s\n", __func__, jobid, tm_jobid)) return TM_NULL_TASK; } if (node == TM_ERROR_NODE) { TM_DBPRT(("%s: called with TM_ERROR_NODE\n", __func__)) return TM_NULL_TASK; } if ((tp = find_task(task)) != NULL) { TM_DBPRT(("%s: task %lu found with node %d should be %d\n", __func__, (unsigned long)task, tp->t_node, node)) return task; } if ((tp = (task_info *)calloc(1, sizeof(task_info))) == NULL) return TM_NULL_TASK; head = &task_hash[task % TASK_HASH]; tp->t_jobid = tm_jobid; tp->t_task = task; tp->t_node = node; tp->t_next = *head; *head = tp; return task; } /* ** Delete a task. === === right now, this is not used. 
===
static void
del_task(x)
    tm_task_id x;
{
    task_info *tp, *prev;

    prev = NULL;
    for (tp=task_hash[x % TASK_HASH]; tp; prev=tp, tp=tp->t_next) {
        if (tp->t_task == x)
            break;
    }
    if (tp) {
        if (prev)
            prev->t_next = tp->t_next;
        else
            task_hash[x % TASK_HASH] = tp->t_next;
        tp->t_next = NULL;
        if (tp->t_jobid != tm_jobid)
            free(tp->t_jobid);
        free(tp);
    }
    return;
}
*/

/*
** The nodes are tracked in an array.
*/
static tm_node_id *node_table = NULL;

/*
** localmom() - make a connection to the local pbs_mom
**
** The connection will remain open as long as there is an
** outstanding event.
**
** Returns the open descriptor (also stored in local_conn), or -1 when
** "localhost" cannot be resolved, no socket can be created, connect()
** fails with a non-retryable error, or all five attempts fail.
*/
#define PBS_NET_RC_FATAL -1
#define PBS_NET_RC_RETRY -2

static int localmom(void)
{
    static int            have_addr = 0;
    static struct in_addr hostaddr;
    struct addrinfo      *addr_info;
    struct sockaddr_in    remote;
    struct linger         ltime;
    int                   sock;
    int                   i;

    if (local_conn >= 0)
        return(local_conn); /* already have open connection */

    memset(&remote, 0, sizeof(remote));

    if (have_addr == 0) {
        /* lookup "localhost" and save address */
        if (pbs_getaddrinfo("localhost", NULL, &addr_info) != 0) {
            TM_DBPRT(("tm_init: localhost not found\n"))
            return(-1);
        }

        /* NOTE(review): assumes the first result is an AF_INET address - confirm */
        hostaddr = ((struct sockaddr_in *)addr_info->ai_addr)->sin_addr;
        have_addr = 1;
    }

    for (i = 0; i < 5; i++) {
        /* get socket */
        sock = socket(AF_INET, SOCK_STREAM, 0);
        if (sock < 0)
            return(-1);

#ifndef HAVE_POLL
        if (sock >= FD_SETSIZE) {
            close(sock);
            return(-1);
        }
#endif

        /* make sure data goes out */
        ltime.l_onoff  = 1;
        ltime.l_linger = 5;
        setsockopt(sock, SOL_SOCKET, SO_LINGER, &ltime, sizeof(ltime));

        /* connect to specified local pbs_mom and port */
        remote.sin_addr   = hostaddr;
        remote.sin_port   = htons((unsigned short)tm_momport);
        remote.sin_family = AF_INET;

        if (connect(sock, (struct sockaddr *)&remote, sizeof(remote)) < 0) {
            switch (errno) {
            case EINTR:
            case EADDRINUSE:
            case ETIMEDOUT:
            case ECONNREFUSED:
                /* transient failure: wait a moment and try again */
                close(sock);
                sleep(1);
                continue;

            default:
                close(sock);
                return(-1);
            }
        }

        local_conn = sock;
        break;
    } /* END for (i) */

    return(local_conn);
} /* END localmom() */

/*
** startcom() - send request header to local pbs_mom.
** If required, make connection to her.
**
** On success DIS_SUCCESS is returned and *pchan holds a channel on which
** the caller writes the request body; the caller releases it with
** DIS_tcp_cleanup().  On failure *pchan is left untouched, the connection
** is closed, and a non-zero value is returned: -1 when no connection or
** channel could be set up, otherwise the DIS error from the failed write.
*/
static int startcom(
    int               com,
    tm_event_t        event,
    struct tcp_chan **pchan)
{
    int              ret = DIS_SUCCESS;
    struct tcp_chan *chan = NULL;

    if (localmom() == -1)
        return(-1);

    chan = DIS_tcp_setup(local_conn);
    if (chan == NULL) {
        /*
        ** No DIS call has failed here, so ret is still DIS_SUCCESS;
        ** report the failure explicitly instead of returning it.
        */
        TM_DBPRT(("startcom: cannot set up channel\n"))
        close(local_conn);
        local_conn = -1;
        return(-1);
    }

    ret = diswsi(chan, TM_PROTOCOL);
    if (ret != DIS_SUCCESS)
        goto done;

    ret = diswsi(chan, TM_PROTOCOL_VER);
    if (ret != DIS_SUCCESS)
        goto done;

    ret = diswcs(chan, tm_jobid, tm_jobid_len);
    if (ret != DIS_SUCCESS)
        goto done;

    ret = diswcs(chan, tm_jobcookie, tm_jobcookie_len);
    if (ret != DIS_SUCCESS)
        goto done;

    ret = diswsi(chan, com);
    if (ret != DIS_SUCCESS)
        goto done;

    ret = diswsi(chan, event);
    if (ret != DIS_SUCCESS)
        goto done;

    ret = diswui(chan, tm_jobtid);
    if (ret != DIS_SUCCESS)
        goto done;

    *pchan = chan;
    return(DIS_SUCCESS);

done:
    TM_DBPRT(("startcom: send error %s\n", dis_emsg[ret]))
    DIS_tcp_close(chan);
    local_conn = -1;
    return(ret);
} /* END startcom() */

/*
** Initialize the Task Manager interface.
*/ #ifdef __cplusplus extern "C" { #endif int tm_init( void *info, /* in, currently unused */ struct tm_roots *roots) /* out */ { tm_event_t nevent, revent; char *env, *hold; int err; int nerr = 0; struct tcp_chan *chan = NULL; if (init_done) { return(TM_BADINIT); } if ((tm_jobid = getenv("PBS_JOBID")) == NULL) { return(TM_EBADENVIRONMENT); } tm_jobid_len = strlen(tm_jobid); if ((tm_jobcookie = getenv("PBS_JOBCOOKIE")) == NULL) return TM_EBADENVIRONMENT; tm_jobcookie_len = strlen(tm_jobcookie); if ((env = getenv("PBS_NODENUM")) == NULL) return TM_EBADENVIRONMENT; tm_jobndid = (tm_node_id)strtol(env, &hold, 10); if (env == hold) return TM_EBADENVIRONMENT; if ((env = getenv("PBS_TASKNUM")) == NULL) return TM_EBADENVIRONMENT; if ((tm_jobtid = atoi(env)) == 0) return TM_EBADENVIRONMENT; if ((env = getenv("PBS_MOMPORT")) == NULL) return TM_EBADENVIRONMENT; if ((tm_momport = atoi(env)) == 0) return TM_EBADENVIRONMENT; init_done = 1; nevent = new_event(); /* * send the following request: * header (tm_init) * int node number * int task number */ if (startcom(TM_INIT, nevent, &chan) != DIS_SUCCESS) return TM_ESYSTEM; DIS_tcp_wflush(chan); DIS_tcp_cleanup(chan); add_event(nevent, TM_ERROR_NODE, TM_INIT, (void *)roots); while (TRUE) { if ((err = tm_poll(TM_NULL_EVENT, &revent, 1, &nerr)) != TM_SUCCESS) return err; if (event_count == 0) break; } return nerr; } /* ** Copy out node info. No communication with pbs_mom is needed. */ int tm_nodeinfo( tm_node_id **list, int *nnodes) { tm_node_id *np; int i; int n = 0; if (!init_done) { return (TM_BADINIT); } if (node_table == NULL) { return (TM_ESYSTEM); } for (np = node_table; *np != TM_ERROR_NODE; np++) n++; /* how many nodes */ if ((np = (tm_node_id *)calloc(n,sizeof(tm_node_id))) == NULL) { /* FAILURE - cannot alloc memory */ return(TM_ERROR); } for (i = 0; i < n; i++) np[i] = node_table[i]; *list = np; *nnodes = i; return(TM_SUCCESS); } /* END tm_nodeinfo() */ /* ** Starts [0] with environment at . 
*/ int tm_spawn( int argc, /* in */ char **argv, /* in */ char **envp, /* in */ tm_node_id where, /* in */ tm_task_id *tid, /* out */ tm_event_t *event) /* out */ { int rc = TM_SUCCESS; char *cp; int i; struct tcp_chan *chan = NULL; /* NOTE: init_done is global */ if (!init_done) { return(TM_BADINIT); } if ((argc <= 0) || (argv == NULL) || (argv[0] == NULL) || (*argv[0] == '\0')) { return(TM_ENOTFOUND); } *event = new_event(); if (startcom(TM_SPAWN, *event, &chan) != DIS_SUCCESS) { return(TM_ENOTCONNECTED); } if (diswsi(chan, where) != DIS_SUCCESS) /* send where */ { rc = TM_ENOTCONNECTED; goto tm_spawn_cleanup; } if (diswsi(chan, argc) != DIS_SUCCESS) /* send argc */ { rc = TM_ENOTCONNECTED; goto tm_spawn_cleanup; } /* send argv strings across */ for (i = 0;i < argc;i++) { cp = argv[i]; if (diswcs(chan, cp, strlen(cp)) != DIS_SUCCESS) { rc = TM_ENOTCONNECTED; goto tm_spawn_cleanup; } } /* send envp strings across */ if (getenv("PBSDEBUG") != NULL) { if (diswcs(chan, "PBSDEBUG=1", strlen("PBSDEBUG=1")) != DIS_SUCCESS) { rc = TM_ENOTCONNECTED; goto tm_spawn_cleanup; } } if (envp != NULL) { for (i = 0;(cp = envp[i]) != NULL;i++) { if (diswcs(chan, cp, strlen(cp)) != DIS_SUCCESS) { rc = TM_ENOTCONNECTED; goto tm_spawn_cleanup; } } } if (diswcs(chan, "", 0) != DIS_SUCCESS) { rc = TM_ENOTCONNECTED; goto tm_spawn_cleanup; } DIS_tcp_wflush(chan); add_event(*event, where, TM_SPAWN, (void *)tid); tm_spawn_cleanup: if (chan != NULL) DIS_tcp_cleanup(chan); return rc; } /* END tm_spawn() */ /* ** Sends a signal to all the process groups in the task ** signified by the handle, . 
*/ int tm_kill( tm_task_id tid, /* in */ int sig, /* in */ tm_event_t *event) /* out */ { int rc = TM_SUCCESS; task_info *tp; struct tcp_chan *chan = NULL; if (!init_done) { rc = TM_BADINIT; goto tm_kill_cleanup; } if ((tp = find_task(tid)) == NULL) { rc = TM_ENOTFOUND; goto tm_kill_cleanup; } *event = new_event(); if (startcom(TM_SIGNAL, *event, &chan) != DIS_SUCCESS) { rc = TM_ENOTCONNECTED; goto tm_kill_cleanup; } if (diswsi(chan, tp->t_node) != DIS_SUCCESS) { rc = TM_ENOTCONNECTED; goto tm_kill_cleanup; } if (diswsi(chan, tid) != DIS_SUCCESS) { rc = TM_ENOTCONNECTED; goto tm_kill_cleanup; } if (diswsi(chan, sig) != DIS_SUCCESS) { rc = TM_ENOTCONNECTED; goto tm_kill_cleanup; } DIS_tcp_wflush(chan); add_event(*event, tp->t_node, TM_SIGNAL, NULL); tm_kill_cleanup: if (chan != NULL) DIS_tcp_cleanup(chan); return rc; } /* ** Returns an event that can be used to learn when a task ** dies. */ int tm_obit( tm_task_id tid, /* in */ int *obitval, /* out */ tm_event_t *event) /* out */ { int rc = TM_SUCCESS; task_info *tp; struct tcp_chan *chan = NULL; if (!init_done) { rc = TM_BADINIT; goto tm_obit_cleanup; } if ((tp = find_task(tid)) == NULL) { rc = TM_ENOTFOUND; goto tm_obit_cleanup; } *event = new_event(); if (startcom(TM_OBIT, *event, &chan) != DIS_SUCCESS) { rc = TM_ESYSTEM; goto tm_obit_cleanup; } if (diswsi(chan, tp->t_node) != DIS_SUCCESS) { rc = TM_ESYSTEM; goto tm_obit_cleanup; } if (diswsi(chan, tid) != DIS_SUCCESS) { rc = TM_ESYSTEM; goto tm_obit_cleanup; } DIS_tcp_wflush(chan); add_event(*event, tp->t_node, TM_OBIT, (void *)obitval); tm_obit_cleanup: if (chan != NULL) DIS_tcp_cleanup(chan); return rc; } struct taskhold { tm_task_id *list; int size; int *ntasks; }; /* ** Makes a request for the list of tasks on . If ** is a valid node number, it returns the event that the list of ** tasks on is available. 
*/ int tm_taskinfo( tm_node_id node, /* in */ tm_task_id *tid_list, /* out */ int list_size, /* in */ int *ntasks, /* out */ tm_event_t *event) /* out */ { struct taskhold *thold; struct tcp_chan *chan = NULL; if (!init_done) return TM_BADINIT; if (tid_list == NULL || list_size == 0 || ntasks == NULL) return TM_EBADENVIRONMENT; *event = new_event(); if (startcom(TM_TASKS, *event, &chan) != DIS_SUCCESS) return TM_ESYSTEM; if (diswsi(chan, node) != DIS_SUCCESS) { DIS_tcp_cleanup(chan); return TM_ESYSTEM; } DIS_tcp_wflush(chan); DIS_tcp_cleanup(chan); thold = (struct taskhold *)calloc(1, sizeof(struct taskhold)); assert(thold != NULL); thold->list = tid_list; thold->size = list_size; thold->ntasks = ntasks; add_event(*event, node, TM_TASKS, (void *)thold); return TM_SUCCESS; } /* ** Returns the job-relative node number that holds or held . In ** case of an error, it returns TM_ERROR_NODE. */ int tm_atnode( tm_task_id tid, /* in */ tm_node_id *node) /* out */ { task_info *tp; if (!init_done) return TM_BADINIT; if ((tp = find_task(tid)) == NULL) return TM_ENOTFOUND; *node = tp->t_node; return TM_SUCCESS; } struct reschold { char *resc; int len; }; /* ** Makes a request for a string specifying the resources ** available on . If is a valid node number, it ** returns the event that the string specifying the resources on ** is available. It returns ERROR_EVENT otherwise. 
*/ int tm_rescinfo( tm_node_id node, /* in */ char *resource, /* out */ int len, /* in */ tm_event_t *event) /* out */ { struct reschold *rhold; struct tcp_chan *chan = NULL; if (!init_done) return TM_BADINIT; if (resource == NULL || len == 0) return TM_EBADENVIRONMENT; *event = new_event(); if (startcom(TM_RESOURCES, *event, &chan) != DIS_SUCCESS) return TM_ESYSTEM; if (diswsi(chan, node) != DIS_SUCCESS) { DIS_tcp_cleanup(chan); return TM_ESYSTEM; } DIS_tcp_wflush(chan); DIS_tcp_cleanup(chan); rhold = (struct reschold *)calloc(1, sizeof(struct reschold)); assert(rhold != NULL); rhold->resc = resource; rhold->len = len; add_event(*event, node, TM_RESOURCES, (void *)rhold); return TM_SUCCESS; } /* END tm_rescinfo() */ /* ** Posts the first of a copy of * within MOM on ** this node, and associated with this task. If is ** non-NULL, it returns the event that the effort to post * ** is complete. It returns ERROR_EVENT otherwise. */ int tm_publish( char *name, /* in */ void *info, /* in */ int len, /* in */ tm_event_t *event) /* out */ { int rc = TM_SUCCESS; struct tcp_chan *chan = NULL; if (!init_done) return TM_BADINIT; *event = new_event(); if (startcom(TM_POSTINFO, *event, &chan) != DIS_SUCCESS) return TM_ESYSTEM; if (diswst(chan, name) != DIS_SUCCESS) { rc = TM_ESYSTEM; goto tm_publish_cleanup; } if (diswcs(chan, (char *)info, len) != DIS_SUCCESS) { rc = TM_ESYSTEM; goto tm_publish_cleanup; } DIS_tcp_wflush(chan); add_event(*event, TM_ERROR_NODE, TM_POSTINFO, NULL); tm_publish_cleanup: if (chan != NULL) DIS_tcp_cleanup(chan); return rc; } /* tm_publish() */ struct infohold { void *info; int len; int *info_len; }; /* ** Makes a request for a copy of the info posted by . If ** is a valid task, it returns the event that the ** string specifying the info posted by is available. 
*/ int tm_subscribe( tm_task_id tid, /* in */ char *name, /* in */ void *info, /* out */ int len, /* in */ int *info_len,/* out */ tm_event_t *event) /* out */ { int rc = TM_SUCCESS; task_info *tp; struct tcp_chan *chan = NULL; struct infohold *ihold; if (!init_done) { rc = TM_BADINIT; goto tm_subscribe_cleanup; } if ((tp = find_task(tid)) == NULL) { rc = TM_ENOTFOUND; goto tm_subscribe_cleanup; } *event = new_event(); if (startcom(TM_GETINFO, *event, &chan) != DIS_SUCCESS) { rc = TM_ESYSTEM; goto tm_subscribe_cleanup; } if (diswsi(chan, tp->t_node) != DIS_SUCCESS) { rc = TM_ESYSTEM; goto tm_subscribe_cleanup; } if (diswsi(chan, tid) != DIS_SUCCESS) { rc = TM_ESYSTEM; goto tm_subscribe_cleanup; } if (diswst(chan, name) != DIS_SUCCESS) { rc = TM_ESYSTEM; goto tm_subscribe_cleanup; } DIS_tcp_wflush(chan); ihold = (struct infohold *)calloc(1, sizeof(struct infohold)); assert(ihold != NULL); ihold->info = info; ihold->len = len; ihold->info_len = info_len; add_event(*event, tp->t_node, TM_GETINFO, (void *)ihold); tm_subscribe_cleanup: if (chan != NULL) DIS_tcp_cleanup(chan); return rc; } /* ** tm_finalize() - close out task manager interface ** ** This function should be the last one called. It is illegal to call ** any other task manager function following this one. All events are ** freed and any connection to the task manager (pbs_mom) is closed. ** This call is synchronous. */ int tm_finalize(void) { event_info *e; int i = 0; if (!init_done) return TM_BADINIT; while (event_count && (i < EVENT_HASH)) { while ((e = event_hash[i]) != NULL) { del_event(e); } ++i; /* check next slot in hash table */ } init_done = 0; return TM_SUCCESS; /* what else */ } /* ** tm_notify() - set the signal to be sent on event arrival. */ int tm_notify(int tm_signal) { if (!init_done) return TM_BADINIT; return TM_ENOTIMPLEMENTED; } /* ** tm_alloc() - make a request for additional resources. 
*/ int tm_alloc(char *resources, tm_event_t *event) { if (!init_done) return TM_BADINIT; return TM_ENOTIMPLEMENTED; } /* ** tm_dealloc() - drop a node from the job. */ int tm_dealloc(tm_node_id node, tm_event_t *event) { if (!init_done) return TM_BADINIT; return TM_ENOTIMPLEMENTED; } /* ** tm_create_event() - create a persistent event. */ int tm_create_event(tm_event_t *event) { if (!init_done) return TM_BADINIT; return TM_ENOTIMPLEMENTED; } /* ** tm_destroy_event() - destroy a persistent event. */ int tm_destroy_event(tm_event_t *event) { if (!init_done) return TM_BADINIT; return TM_ENOTIMPLEMENTED; } /* ** tm_register() - link a persistent event with action requests ** from the task manager. */ int tm_register(tm_whattodo_t *what, tm_event_t *event) { if (!init_done) return TM_BADINIT; return TM_ENOTIMPLEMENTED; } #define FOREVER 2592000 /* ** tm_poll - poll to see if an event has been completed. ** ** If "poll_event" is a valid event handle, see if it is completed; ** else if "poll_event" is the null event, check for the first event that ** is completed. ** ** result_event is set to the completed event or the null event. ** ** If wait is non_zero, wait for "poll_event" to be completed. ** ** If an error ocurs, set tm_errno non-zero. 
*/ int tm_poll( tm_event_t poll_event, tm_event_t *result_event, int wait, int *tm_errno) { int num, i; int ret, mtype, nnodes; int prot, protver; int *obitvalp; event_info *ep = NULL; tm_task_id tid, *tidp; tm_event_t nevent; tm_node_id node; char *jobid = NULL; char *info = NULL; struct tm_roots *roots; struct taskhold *thold; struct infohold *ihold; struct reschold *rhold; extern time_t pbs_tcp_timeout; if (!init_done) { return(TM_BADINIT); } if (result_event == NULL) return(TM_EBADENVIRONMENT); *result_event = TM_ERROR_EVENT; if (poll_event != TM_NULL_EVENT) return(TM_ENOTIMPLEMENTED); if (tm_errno == NULL) return(TM_EBADENVIRONMENT); if (event_count == 0) { TM_DBPRT(("%s: no events waiting\n", __func__)) return(TM_ENOTFOUND); } if (local_conn < 0) { TM_DBPRT(("%s: INTERNAL ERROR %d events but no connection (%d)\n", __func__, event_count, local_conn)) if (static_chan != NULL) { DIS_tcp_cleanup(static_chan); static_chan = NULL; } return(TM_ENOTCONNECTED); } if ((static_chan == NULL) && ((static_chan = DIS_tcp_setup(local_conn)) == NULL)) { TM_DBPRT(("%s: Error allocating memory for sock buffer %d", __func__, PBSE_MEM_MALLOC)) return TM_BADINIT; } /* ** Setup tcp dis routines with a wait value appropriate for ** the value of wait the user set. */ pbs_tcp_timeout = wait ? FOREVER : 1; prot = disrsi(static_chan, &ret); if (ret == DIS_EOD) { *result_event = TM_NULL_EVENT; DIS_tcp_cleanup(static_chan); static_chan = NULL; return TM_SUCCESS; } else if (ret != DIS_SUCCESS) { TM_DBPRT(("%s: protocol number dis error %d\n", __func__, ret)) goto tm_poll_error; } if (prot != TM_PROTOCOL) { TM_DBPRT(("%s: bad protocol number %d\n", __func__, prot)) goto tm_poll_error; } /* ** We have seen the start of a message. Set the timeout value ** so we wait for the remaining data of a message. 
*/ pbs_tcp_timeout = FOREVER; protver = disrsi(static_chan, &ret); if (ret != DIS_SUCCESS) { TM_DBPRT(("%s: protocol version dis error %d\n", __func__, ret)) goto tm_poll_error; } if (protver != TM_PROTOCOL_VER) { TM_DBPRT(("%s: bad protocol version %d\n", __func__, protver)) goto tm_poll_error; } mtype = disrsi(static_chan, &ret); if (ret != DIS_SUCCESS) { TM_DBPRT(("%s: mtype dis error %d\n", __func__, ret)) goto tm_poll_error; } nevent = disrsi(static_chan, &ret); if (ret != DIS_SUCCESS) { TM_DBPRT(("%s: event dis error %d\n", __func__, ret)) goto tm_poll_error; } *result_event = nevent; TM_DBPRT(("%s: got event %d return %d\n", __func__, nevent, mtype)) if ((ep = find_event(nevent)) == NULL) { TM_DBPRT(("%s: No event found for number %d\n", __func__, nevent)); DIS_tcp_close(static_chan); static_chan = NULL; local_conn = -1; return TM_ENOEVENT; } if (mtype == TM_ERROR) /* problem, read error num */ { *tm_errno = disrsi(static_chan, &ret); TM_DBPRT(("%s: event %d error %d\n", __func__, nevent, *tm_errno)); goto tm_poll_done; } *tm_errno = TM_SUCCESS; switch (ep->e_mtype) { /* ** auxiliary info ( ** number of nodes int; ** nodeid[0] int; ** ... 
** nodeid[n-1] int; ** parent jobid string; ** parent nodeid int; ** parent taskid int; ** ) */ case TM_INIT: nnodes = disrsi(static_chan, &ret); if (ret != DIS_SUCCESS) { TM_DBPRT(("%s: INIT failed nnodes\n", __func__)) goto tm_poll_error; } node_table = (tm_node_id *)calloc(nnodes + 1, sizeof(tm_node_id)); if (node_table == NULL) { perror("Memory allocation failed"); goto tm_poll_error; } TM_DBPRT(("%s: INIT nodes %d\n", __func__, nnodes)) for (i = 0; i < nnodes; i++) { node_table[i] = disrsi(static_chan, &ret); if (ret != DIS_SUCCESS) { TM_DBPRT(("%s: INIT failed nodeid %d\n", __func__, i)) goto tm_poll_error; } } node_table[nnodes] = TM_ERROR_NODE; jobid = disrst(static_chan, &ret); if (ret != DIS_SUCCESS) { TM_DBPRT(("%s: INIT failed jobid\n", __func__)) goto tm_poll_error; } TM_DBPRT(("%s: INIT daddy jobid %s\n", __func__, jobid)) node = disrsi(static_chan, &ret); if (ret != DIS_SUCCESS) { TM_DBPRT(("%s: INIT failed parent nodeid\n", __func__)) goto tm_poll_error; } TM_DBPRT(("%s: INIT daddy node %d\n", __func__, node)) tid = disrsi(static_chan, &ret); if (ret != DIS_SUCCESS) { TM_DBPRT(("%s: INIT failed parent taskid\n", __func__)) goto tm_poll_error; } TM_DBPRT(("%s: INIT daddy tid %lu\n", __func__, (unsigned long)tid)) roots = (struct tm_roots *)ep->e_info; roots->tm_parent = new_task(jobid, node, tid); roots->tm_me = new_task(tm_jobid, tm_jobndid, tm_jobtid); roots->tm_nnodes = nnodes; roots->tm_ntasks = 0; /* TODO */ roots->tm_taskpoolid = -1; /* what? 
*/ roots->tm_tasklist = NULL; /* TODO */ break; case TM_TASKS: thold = (struct taskhold *)ep->e_info; tidp = thold->list; num = thold->size; for (i = 0;; i++) { tid = disrsi(static_chan, &ret); if (tid == TM_NULL_TASK) break; if (ret != DIS_SUCCESS) goto tm_poll_error; if (i < num) { tidp[i] = new_task(tm_jobid, ep->e_node, tid); } } if (i < num) tidp[i] = TM_NULL_TASK; *(thold->ntasks) = i; break; case TM_SPAWN: tid = disrsi(static_chan, &ret); if (ret != DIS_SUCCESS) { TM_DBPRT(("%s: SPAWN failed tid\n", __func__)) goto tm_poll_error; } tidp = (tm_task_id *)ep->e_info; *tidp = new_task(tm_jobid, ep->e_node, tid); break; case TM_SIGNAL: break; case TM_OBIT: obitvalp = (int *)ep->e_info; *obitvalp = disrsi(static_chan, &ret); if (ret != DIS_SUCCESS) { TM_DBPRT(("%s: OBIT failed obitval\n", __func__)) goto tm_poll_error; } break; case TM_POSTINFO: break; case TM_GETINFO: ihold = (struct infohold *)ep->e_info; info = disrcs(static_chan, (size_t *)ihold->info_len, &ret); if (ret != DIS_SUCCESS) { if (info != NULL) free(info); TM_DBPRT(("%s: GETINFO failed info\n", __func__)) break; } memcpy(ihold->info, info, MIN(*ihold->info_len, ihold->len)); free(info); break; case TM_RESOURCES: rhold = (struct reschold *)ep->e_info; info = disrst(static_chan, &ret); if (ret != DIS_SUCCESS) { if (info != NULL) free(info); break; } snprintf(rhold->resc, rhold->len, "%s", info); free(info); break; default: TM_DBPRT(("%s: unknown event command %d\n", __func__, ep->e_mtype)) goto tm_poll_error; } DIS_tcp_wflush(static_chan); tm_poll_done: if (jobid != NULL) free(jobid); del_event(ep); if (tcp_chan_has_data(static_chan) == FALSE) { DIS_tcp_cleanup(static_chan); static_chan = NULL; } return TM_SUCCESS; tm_poll_error: if (jobid != NULL) free(jobid); if (ep) del_event(ep); close(local_conn); DIS_tcp_cleanup(static_chan); static_chan = NULL; local_conn = -1; return TM_ENOTCONNECTED; } /* * tm_adopt() -- * * When PBS is used in conjuction with an alternative (MPI) task * spawning/management 
system (AMS) (like Quadrics RMS or SGI array
 * services), only the script task on the mother superior node will
 * be parented by (or even known to) a PBS MOM. Unless the AMS is
 * PBS-(tm-)aware, all other tasks will be parented (and to varying
 * extents managed) by the AMS. This means that PBS cannot track
 * task resource usage (unless the AMS provides such info) nor
 * manage (suspend, resume, signal, clean up, ...) the task (unless
 * the AMS provides such functionality). For example pvmrun and
 * some mpiruns simply use rsh to start remote processes - no AMS
 * tracking or management facilities are available.
 *
 * This function allows any task (session) to be adopted into a PBS
 * job. It is used by:
 *  - "adopter" (which is in turn used by our pvmrun)
 *  - our rmsloader wrapper (a home-brew replacement for RMS'
 *    rmsloader that does some work and then exec()s the real
 *    rmsloader) to tell PBS to adopt its session id (which
 *    (hopefully) is also the session id for all its child
 *    processes).
 *  - anumpirun on SGI Altix systems
 *
 * Call this instead of tm_init() to ask the local pbs_mom to
 * adopt a session (i.e. create a new task corresponding to the
 * session id). Note that this may subvert all of the cookie stuff
 * in PBS as the AMS task starter may not have any PBS cookie info
 * (eg rmsloader)
 *
 * Arguments:
 *  char *id      AMS altid (eg RMS resource id) or PBS_JOBID
 *                (depending on adoptCmd) of the job that will adopt
 *                sid. This is how pbs_mom works out which job will
 *                adopt the sid.
 *  int adoptCmd  either TM_ADOPT_JOBID or TM_ADOPT_ALTID if task
 *                id is AMS altid
 *  pid_t pid     process id of process to be adopted (always self?)
 *
 * Assumption:
 *  If TM_ADOPT_ALTID is used to identify tasks to be adopted, PBS
 *  must be configured to work with one and only one alternative task
 *  spawning/management system that uses its own task identifiers.
 *
 * Result:
 *  Returns TM_SUCCESS if the session was successfully adopted by
 *  the mom. Returns TM_ENOTFOUND if the mom couldn't find a job
 *  with the given RMS resource id. Returns TM_ESYSTEM or
 *  TM_ENOTCONNECTED if there was some sort of comms error talking
 *  to the mom. Returns TM_BADINIT if the tm interface is already
 *  initialized, TM_EBADENVIRONMENT if id is NULL and TM_EUNKNOWNCMD
 *  if adoptCmd is neither of the two adopt commands.
 *
 * Side effects:
 *  Sets the tm_* globals to fake values if tm_init() has never
 *  been called. This mainly just prevents segfaults etc when
 *  these values are written to local_conn - the mom ignores most
 *  of them for this special adopt case
 *
 */
int tm_adopt(
    char  *id,
    int    adoptCmd,
    pid_t  pid)
{
    int              rc = TM_SUCCESS;
    int              status, ret;
    pid_t            sid;
    char            *env;
    struct tcp_chan *chan = NULL;

    sid = getsid(pid);

    /* Must be the only call to tm and must only be called once */
    if (init_done)
        return TM_BADINIT;

    /* id is measured with strlen() below */
    if (id == NULL)
        return TM_EBADENVIRONMENT;

    init_done = 1;

    /* Fabricate the tm state as best we can - not really needed */
    if ((tm_jobid = getenv("PBS_JOBID")) == NULL)
        tm_jobid = (char *)"ADOPT JOB";
    tm_jobid_len = strlen(tm_jobid);

    if ((tm_jobcookie = getenv("PBS_JOBCOOKIE")) == NULL)
        tm_jobcookie = (char *)"ADOPT COOKIE";
    tm_jobcookie_len = strlen(tm_jobcookie);

    /* We dont have the (right) node id or task id */
    tm_jobndid = 0;
    tm_jobtid = 0;

    /* Fallback is system default MOM port if not known */
    if ((env = getenv("PBS_MOMPORT")) == NULL || (tm_momport = atoi(env)) == 0)
        tm_momport = PBS_MANAGER_SERVICE_PORT;

    /* DJH 27 Feb 2002. two kinds of adoption now */
    if (adoptCmd != TM_ADOPT_ALTID && adoptCmd != TM_ADOPT_JOBID)
        return TM_EUNKNOWNCMD;

    if (startcom(adoptCmd, TM_NULL_EVENT, &chan) != DIS_SUCCESS)
        return TM_ESYSTEM;

    /* send session id */
    if (diswsi(chan, sid) != DIS_SUCCESS) {
        rc = TM_ENOTCONNECTED;
        goto tm_adopt_cleanup;
    }

    /* write the pid so the adopted process can be part of the cpuset if needed */
    if (diswsi(chan, pid) != DIS_SUCCESS) {
        rc = TM_ENOTCONNECTED;
        goto tm_adopt_cleanup;
    }

    /* send job or alternative id */
    if (diswcs(chan, id, strlen(id)) != DIS_SUCCESS) {
        rc = TM_ENOTCONNECTED;
        goto tm_adopt_cleanup;
    }

    DIS_tcp_wflush(chan);

    /* The mom should now attempt to adopt the task and will send back a
       status flag to indicate whether it was successful or not. */
    status = disrsi(chan, &ret);

    if (ret != DIS_SUCCESS) {
        rc = TM_ENOTCONNECTED;
        goto tm_adopt_cleanup;
    }

    /* Don't allow any more tm_* calls in this process. As well as
       closing an unused socket it also prevents any problems related
       to the fact that all adopted processes have a fake task id which
       might break the tm mechanism */
    tm_finalize();

    /* Since we're not using events, tm_finalize won't actually
       close the socket, so do it here. */
    if (local_conn > -1) {
        close(local_conn);
        local_conn = -1;
    }

    DIS_tcp_cleanup(chan);

    return (status == TM_OKAY ? TM_SUCCESS : TM_ENOTFOUND);

tm_adopt_cleanup:
    if (chan != NULL)
        DIS_tcp_cleanup(chan);

    /* no event was registered for this request, so nothing else will
       ever close the socket startcom() opened - do it here as well */
    if (local_conn > -1) {
        close(local_conn);
        local_conn = -1;
    }

    return rc;
}

#ifdef __cplusplus
}
#endif