/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. 
Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. */ #include /* the master config generated by configure */ #include #include #include #include #include #include #include #include #include #include #include #include #include #if defined(NTOHL_NEEDS_ARPA_INET_H) && defined(HAVE_ARPA_INET_H) #include #endif #include "portability.h" #include "libpbs.h" #include "server_limits.h" #include "list_link.h" #include "attribute.h" #include "resource.h" #include "server.h" #include "net_connect.h" #include "batch_request.h" #include "work_task.h" #include "svrfunc.h" #include "pbs_job.h" #include "log.h" #include "pbs_nodes.h" #include "rpp.h" #include "dis.h" #include "dis_init.h" #include "resmon.h" #include "mcom.h" #include "utils.h" #define IS_VALID_STR(STR) (((STR) != NULL) && ((STR)[0] != '\0')) extern void DIS_rpp_reset(void); extern int LOGLEVEL; extern int allow_any_mom; #if !defined(H_ERRNO_DECLARED) && !defined(_AIX) extern int h_errno; #endif int svr_totnodes = 0; /* total number nodes defined */ int svr_clnodes = 0; /* number of cluster nodes */ int svr_tsnodes = 0; /* number of time shared nodes */ int svr_chngNodesfile = 0; /* 1 signals want nodes file update */ int gpu_mode_rqstd = -1; /* default gpu mode requested */ #ifdef NVIDIA_GPUS int gpu_err_reset = FALSE; /* was a gpu errcount reset requested */ #endif /* NVIDIA_GPUS */ /* on server shutdown, (qmgr mods) */ struct pbsnode 
**pbsndlist = NULL; struct pbsnode **pbsndmast = NULL; static int svr_numnodes = 0; /* number of nodes currently available (global!!! - set in node_spec) */ static int svr_numcfgnodes = 0; /* number of nodes currently configured (global!!! - set in node_spec) */ static int exclusive; /* node allocation type */ static FILE *nstatef = NULL; static int num_addrnote_tasks = 0; /* number of outstanding send_cluster_addrs tasks */ extern int server_init_type; extern int has_nodes; extern time_t time_now; #ifdef NVIDIA_GPUS extern int create_a_gpusubnode(struct pbsnode *); #endif /* NVIDIA_GPUS */ extern int ctnodes(char *); extern char *path_home; extern char *path_nodes; extern char *path_nodes_new; extern char *path_nodestate; extern char *path_nodenote; extern char *path_nodenote_new; extern unsigned int pbs_mom_port; extern char server_name[]; extern struct server server; extern tlist_head svr_newnodes; extern attribute_def node_attr_def[]; /* node attributes defs */ extern int SvrNodeCt; #define SKIP_NONE 0 #define SKIP_EXCLUSIVE 1 #define SKIP_ANYINUSE 2 #define SKIP_NONE_REUSE 3 #ifndef MAX_BM #define MAX_BM 64 #endif int hasprop(struct pbsnode *, struct prop *); void send_cluster_addrs(struct work_task *); int add_cluster_addrs(int); int is_compose(int, int); int add_job_to_node(struct pbsnode *,struct pbssubn *,short,job *,int); int node_satisfies_request(struct pbsnode *,char *); int reserve_node(struct pbsnode *,short,job *,char *,struct howl **); int build_host_list(struct howl **,struct pbssubn *,struct pbsnode *); int procs_available(int proc_ct); #ifdef NVIDIA_GPUS int gpu_entry_by_id(struct pbsnode *,char *, int); #endif /* NVIDIA_GPUS */ /* GBS - I put this in since it's used in the server to mom communication for resource manager information. The server opens rpp sockets for pinging. I just used those for the resource manager queries. 
*/

/*
 * funcs_dis - point the global DIS I/O function pointers at the RPP
 * (reliable packet protocol) primitives; the RPP equivalent of
 * DIS_tcp_funcs().  Idempotent: the pointers are only swapped when
 * dis_getc is not already rpp_getc.
 */

static void funcs_dis(void) /* The equivalent of DIS_tcp_funcs() */

  {
  if (dis_getc != rpp_getc)
    {
    dis_getc = (int (*)(int))rpp_getc;
    dis_puts = (int (*)(int, const char *, size_t))rpp_write;
    dis_gets = (int (*)(int, char *, size_t))rpp_read;
    disr_commit = (int (*)(int, int))rpp_rcommit;
    disw_commit = (int (*)(int, int))rpp_wcommit;
    }

  return;
  }

/*#define setup_dis(x) funcs_dis() */ /* RPP doesn't need reset */
/*#define close_dis(x) rpp_close(x) */
/*#define flush_dis(x) rpp_flush(x) */

/**
 ** Modified by Tom Proett for PBS.
 */

tree *ipaddrs = NULL;   /* tree of ip addrs */
tree *streams = NULL;   /* tree of stream numbers */

/**
 * specialized version of tfind for looking in the ipaddrs tree
 * @param key - the ip address of the node we are searching for
 * @return a pointer to the pbsnode, or NULL if not found
 */

struct pbsnode *tfind_addr(

  const u_long key)

  {
  return tfind(key, &ipaddrs);
  }

/* update_node_state - central location for updating node state */
/* NOTE: called each time a node is marked down, each time a MOM reports node */
/* status, and when pbs_server sends hello/cluster_addrs */

void update_node_state(

  struct pbsnode *np,       /* I (modified) */
  int             newstate) /* I (one of INUSE_*) */

  {
  char *id = "update_node_state";

  struct pbssubn *sp;

#ifdef ALT_CLSTR_ADDR
  int ret;
  int send_addrs = FALSE;
#endif

  /*
   * LOGLEVEL >= 4 logs all state changes
   *          >= 2 logs down->(busy|free) changes
   *          (busy|free)->down changes are always logged
   */

  if (LOGLEVEL >= 3)
    {
    sprintf(log_buffer, "adjusting state for node %s - state=%d, newstate=%d",
      (np->nd_name != NULL) ?
np->nd_name : "NULL", np->nd_state, newstate); log_record( PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, id, log_buffer); } log_buffer[0] = '\0'; #ifdef ALT_CLSTR_ADDR /* * If coming out of DOWN or UNKNOWN states * then we want to send IS_CLUSTER_ADDRS message */ if ((np->nd_state & INUSE_DOWN) || (np->nd_state & INUSE_UNKNOWN)) { if (!(newstate & INUSE_DOWN)) { send_addrs = TRUE; } } #endif if (newstate & INUSE_DOWN) { if (!(np->nd_state & INUSE_DOWN)) { sprintf(log_buffer, "node %s marked down", (np->nd_name != NULL) ? np->nd_name : "NULL"); np->nd_state |= INUSE_DOWN; np->nd_state &= ~INUSE_UNKNOWN; /* mark all subnodes down */ for (sp = np->nd_psn;sp != NULL;sp = sp->next) { sp->inuse |= INUSE_DOWN; } } /* ignoring the obvious possibility of a "down,busy" node */ } /* END if (newstate & INUSE_DOWN) */ else if (newstate & INUSE_BUSY) { if ((!(np->nd_state & INUSE_BUSY) && (LOGLEVEL >= 4)) || ((np->nd_state & INUSE_DOWN) && (LOGLEVEL >= 2))) { sprintf(log_buffer, "node %s marked busy", (np->nd_name != NULL) ? np->nd_name : "NULL"); } np->nd_state |= INUSE_BUSY; np->nd_state &= ~INUSE_UNKNOWN; if (np->nd_state & INUSE_DOWN) { np->nd_state &= ~INUSE_DOWN; /* clear down on all subnodes */ for (sp = np->nd_psn;sp != NULL;sp = sp->next) { sp->inuse &= ~INUSE_DOWN; } } } /* END else if (newstate & INUSE_BUSY) */ else if (newstate == INUSE_FREE) { if (((np->nd_state & INUSE_DOWN) && (LOGLEVEL >= 2)) || ((np->nd_state & INUSE_BUSY) && (LOGLEVEL >= 4))) { sprintf(log_buffer, "node %s marked free", (np->nd_name != NULL) ? 
np->nd_name : "NULL"); } np->nd_state &= ~INUSE_BUSY; np->nd_state &= ~INUSE_UNKNOWN; #ifdef BROKENVNODECHECKS if ((np->nd_state & INUSE_JOB) || (np->nd_state & INUSE_JOBSHARE) || (np->nd_nsn != np->nd_nsnfree)) { int snjcount; /* total number of jobs assigned to nodes */ int snjacount; /* number of subnodes with job array associated with them */ int nsn_free; int SNIsAllocated; /* boolean */ struct jobinfo *jp; struct jobinfo *jpprev; char tmpLine[1024]; /* count jobs on all subnodes */ snjcount = 0; snjacount = 0; /* initially set free subnode count to config subnode count */ nsn_free = np->nd_nsn; for (sp = np->nd_psn;sp != NULL;sp = sp->next) { if (sp->jobs != NULL) { SNIsAllocated = 0; /* mark subnode allocated only after job detected */ snjacount++; sp->inuse &= ~(INUSE_JOB | INUSE_JOBSHARE); /* look for and remove duplicate job entries in subnode job list */ jpprev = NULL; for (jp = sp->jobs;jp != NULL;jp = jp->next) { if (jp->job != NULL) { if ((jp->job->ji_qs.ji_state != JOB_STATE_RUNNING) || (jp->job->ji_qs.ji_substate == JOB_SUBSTATE_SUSPEND) || (jp->job->ji_wattr[JOB_ATR_state].at_val.at_char == 'S')) { /* only count suspended and running jobs */ continue; } snjcount++; if (SNIsAllocated == 0) { SNIsAllocated = 1; nsn_free--; } } if ((jpprev != NULL) && (jpprev->job == jp->job)) { /* duplicate job entry detected */ sprintf(tmpLine, "ALERT: duplicate entry for job '%s' detected on node %s (clearing entry)", (jp->job != NULL) ? jp->job->ji_qs.ji_jobid : "???", np->nd_name); log_record( PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, id, tmpLine); jpprev->next = jp->next; free(jp); break; } jpprev = jp; } /* END for (jp) */ } /* END if (sp->jobs != NULL) */ } /* END for (sp) */ if (snjcount == 0) { /* node has no jobs but is in allocated state - free subnodes */ np->nd_nsnfree = np->nd_nsn; sprintf(log_buffer, "job allocation released on node %s - node marked free", (np->nd_name != NULL) ? 
np->nd_name : "NULL"); if (snjacount > 0) { strcat(log_buffer, " - subnode job array is corrupt"); } np->nd_state &= ~(INUSE_JOB | INUSE_JOBSHARE); } else { if (np->nd_nsnfree != nsn_free) { sprintf(log_buffer, "subnode allocation adjusted on node %s (%d -> %d)", (np->nd_name != NULL) ? np->nd_name : "NULL", np->nd_nsnfree, nsn_free); np->nd_nsnfree = nsn_free; /* what is the exact meaning of JOBSHARE? */ np->nd_state &= ~INUSE_JOBSHARE; } else { /* subnode availability values are correct */ if (LOGLEVEL >= 7) { sprintf(log_buffer, "subnode allocation correct on node %s (%d free, %d configured)", (np->nd_name != NULL) ? np->nd_name : "NULL", np->nd_nsnfree, np->nd_nsn); } } if (np->nd_nsnfree > 0) { /* if any sub-nodes are free, job cannot be in job-exclusive */ np->nd_state &= ~INUSE_JOB; if (LOGLEVEL >= 3) { if (log_buffer[0] == '\0') sprintf(log_buffer, "unset job-exclusive state for node %s in state %d (%d free, %d configured)", (np->nd_name != NULL) ? np->nd_name : "NULL", np->nd_state, np->nd_nsnfree, np->nd_nsn); else strcat(log_buffer, "(unset job-exclusive state)"); } } } /* END else (snjcount == 0) */ } /* END if ((np->nd_state & INUSE_JOB) || ...) */ else { /* skipping subnode allocation check */ if (LOGLEVEL >= 7) { sprintf(log_buffer, "skipping subnode allocation test for node %s in state %d (%d free, %d configured)\n", (np->nd_name != NULL) ? 
np->nd_name : "NULL", np->nd_state, np->nd_nsnfree, np->nd_nsn); } } #endif /* BROKENVNODECHECKS */ if (np->nd_state & INUSE_DOWN) { np->nd_state &= ~INUSE_DOWN; /* clear down on all subnodes */ for (sp = np->nd_psn;sp != NULL;sp = sp->next) { sp->inuse &= ~INUSE_DOWN; } } } /* END else if (newstate == INUSE_FREE) */ if (newstate & INUSE_UNKNOWN) { np->nd_state |= INUSE_UNKNOWN; } if ((LOGLEVEL >= 2) && (log_buffer[0] != '\0')) { log_record( PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, id, log_buffer); } #ifdef ALT_CLSTR_ADDR if (send_addrs) { /* send the cluster addrs */ ret = is_compose(np->nd_stream, IS_CLUSTER_ADDRS); if (ret == DIS_SUCCESS) { ret = add_cluster_addrs(np->nd_stream); } if (ret == DIS_SUCCESS) { ret = rpp_flush(np->nd_stream); } if ((ret == DIS_SUCCESS) && (LOGLEVEL >= 3)) { sprintf(log_buffer, "sent cluster-addrs to node %s\n", np->nd_name); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); } if (ret != DIS_SUCCESS) { /* a DIS write error has occurred */ if (LOGLEVEL >= 1) { DBPRT(("%s: error processing node %s\n", id, np->nd_name)) } sprintf(log_buffer, "%s %d to %s", dis_emsg[ret], errno, np->nd_name); log_err(-1, id, log_buffer); rpp_close(np->nd_stream); update_node_state(np, INUSE_DOWN); } } /* END send_addrs */ #endif return; } /* END update_node_state() */ /* * find_job_by_node - return a job structure by looking for a jobid in a * specific node struct * * probably only useful as a test to see if a job exists on a given node * and it's much faster than find_job() */ job *find_job_by_node( struct pbsnode *pnode, /* I */ char *jobid) /* I */ { struct pbssubn *np; struct jobinfo *jp; struct job *pjob = NULL; char *at; if ((at = strchr(jobid, (int)'@')) != NULL) * at = '\0'; /* strip off @server_name */ /* for each subnode on node ... */ for (np = pnode->nd_psn;np != NULL;np = np->next) { /* for each jobinfo on subnode on node ... 
*/ for (jp = np->jobs;jp != NULL;jp = jp->next) { if ((jp->job != NULL) && (jp->job->ji_qs.ji_jobid != NULL) && (strcmp(jobid, jp->job->ji_qs.ji_jobid) == 0)) { /* desired job located on node */ pjob = jp->job; break; } } } /* END for (np) */ if (at != NULL) *at = '@'; /* restore @server_name */ return(pjob); } /* END find_job_by_node() */ /* * sync_node_jobs() - determine if a MOM has a stale job and possibly delete it * * This function is called every time we get a node stat from the pbs_mom. * * @see is_stat_get() */ void sync_node_jobs( struct pbsnode *np, /* I */ char *jobstring_in) /* I (space delimited list of jobs 'seen' by mom) */ { char *id = "sync_node_jobs"; char *joblist; char *jobidstr; struct batch_request *preq; int conn; struct job *pjob; if ((jobstring_in == NULL) || (!isdigit(*jobstring_in))) { /* NO-OP */ return; } if (np->nd_state & INUSE_DELETED) { /* should never happen */ return; } /* FORMAT [ ]... */ joblist = strdup(jobstring_in); if (joblist == NULL) { /* FAILURE - cannot alloc memory */ sprintf(log_buffer,"cannot alloc memory for %s", jobstring_in); log_err(-1,id,log_buffer); return; } jobidstr = strtok(joblist, " "); while ((jobidstr != NULL) && isdigit(*jobidstr)) { if (strstr(jobidstr, server_name) != NULL) { if (find_job_by_node(np, jobidstr) == NULL) { pjob = find_job(jobidstr); if (pjob != NULL) { /* job exists, but doesn't currently have resources assigned to this node */ /* double check the job struct because we could be in the middle of moving the job around because of data staging, suspend, or rerun */ if (pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str == NULL) { pjob = NULL; } else if (strstr(pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str, np->nd_name) == NULL) { pjob = NULL; } } if (pjob == NULL) { /* job is reported by mom but server has no record of job */ sprintf(log_buffer, "stray job %s found on %s", jobidstr, np->nd_name); log_err(-1, id, log_buffer); /* NOTE: node is actively reporting so should not be 
deleted and np->nd_addrs[] should not be NULL */ conn = svr_connect( np->nd_addrs[0], pbs_mom_port, process_Dreply, ToServerDIS); if (conn >= 0) { if ((preq = alloc_br(PBS_BATCH_DeleteJob)) == NULL) { log_err(-1, id, "unable to allocate DeleteJob request - big trouble!"); svr_disconnect(conn); } else { strcpy(preq->rq_ind.rq_delete.rq_objname, jobidstr); if (issue_Drequest(conn, preq, release_req, 0) != 0) { /* release_req will free preq and close connection if successful */ free_br(preq); svr_disconnect(conn); } } } DIS_rpp_reset(); } } } jobidstr = strtok(NULL, " "); } /* END while ((jobidstr != NULL) && ...) */ /* SUCCESS */ free(joblist); return; } /* END sync_node_jobs() */ /* * update_job_data() - update job with values passed through "jobdata" * * This function is called every time we get a "jobdata" status from the pbs_mom. * * @see is_stat_get() */ void update_job_data( struct pbsnode *np, /* I */ char *jobstring_in) /* I (changed attributes sent by mom) */ { char *id = "update_job_data"; char *jobdata; char *jobidstr; char *attr_name; char *attr_value; struct job *pjob; if ((jobstring_in == NULL) || (!isdigit(*jobstring_in))) { /* NO-OP */ return; } if (np->nd_state & INUSE_DELETED) { /* should never happen */ return; } /* FORMAT :,... 
*/ jobdata = strdup(jobstring_in); jobidstr = strtok(jobdata, ":"); if ((jobidstr != NULL) && isdigit(*jobidstr)) { if (strstr(jobidstr, server_name) != NULL) { pjob = find_job_by_node(np, jobidstr); if (pjob == NULL) { pjob = find_job(jobidstr); } if (pjob != NULL) { int bad; svrattrl tA; /* job exists, so get the attributes and update them */ attr_name = strtok(NULL, "="); while (attr_name != NULL) { attr_value = strtok(NULL, ","); if (LOGLEVEL >= 9) { sprintf(log_buffer, "Mom sent changed attribute %s value %s for job %s", attr_name, attr_value, pjob->ji_qs.ji_jobid); log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } memset(&tA, 0, sizeof(tA)); tA.al_name = attr_name; tA.al_resc = ""; tA.al_value = attr_value; tA.al_op = SET; modify_job_attr( pjob, &tA, /* I: ATTR_sched_hint - svrattrl */ ATR_DFLAG_MGWR | ATR_DFLAG_SvWR, &bad); attr_name = strtok(NULL, "="); } } else if (pjob == NULL) { /* job is reported by mom but server has no record of job */ sprintf(log_buffer, "stray job %s reported on %s", jobidstr, np->nd_name); log_err(-1, id, log_buffer); } } } free(jobdata); } /* END update_job_data() */ /* * send_cluster_addrs - sends IS_CLUSTER_ADDRS messages to a set of nodes * called from a work task, all nodes will eventually * be sent the current list of IPs. */ void send_cluster_addrs( struct work_task *ptask) { char id[] = "send_cluster_addrs"; static int startcount = 0; struct pbsnode *np; new_node *nnew; int i, ret; num_addrnote_tasks--; if (num_addrnote_tasks > 0) { /* new nodes are still being added... don't bother yet or start over */ DBPRT(("%s: not sending addrs yet, %d tasks exist\n", id, num_addrnote_tasks)); startcount = 0; return; } for (i = startcount;i < svr_totnodes;i++) { if (i - startcount > 50) { /* only ping 50 nodes at a time, ping next batch later */ break; } np = pbsndmast[i]; /* Don't bother with nodes that we don't currently have a connection, * otherwise we'll get bogged down. 
The skipped nodes will get the
     * updated info when they reconnect. */

    if ((np == NULL) || (np->nd_state & INUSE_DELETED) || (np->nd_stream < 0))
      continue;

    /* compose an IS_CLUSTER_ADDRS message carrying the current node
     * address list and flush it out on the node's RPP stream */

    ret = is_compose(np->nd_stream, IS_CLUSTER_ADDRS);

    if (ret == DIS_SUCCESS)
      {
      if (add_cluster_addrs(np->nd_stream) == DIS_SUCCESS)
        {
        if (rpp_flush(np->nd_stream) == DIS_SUCCESS)
          {
          sprintf(log_buffer, "successful addr to node %s\n",
            np->nd_name);

          log_record(
            PBSEVENT_SYSTEM,
            PBS_EVENTCLASS_SERVER,
            id,
            log_buffer);

          continue;
          }
        }

      /* NOTE(review): any failure in add_cluster_addrs()/rpp_flush() is
       * reported below as DIS_NOCOMMIT regardless of the actual cause */

      ret = DIS_NOCOMMIT;
      }

    /* ping unsuccessful, mark node down, clear stream */

    update_node_state(np, INUSE_DOWN);

    sprintf(log_buffer, "%s %d to %s",
      dis_emsg[ret],
      errno,
      np->nd_name);

    log_err(-1, id, log_buffer);

    rpp_close(np->nd_stream);

    tdelete((u_long)np->nd_stream, &streams);

    np->nd_stream = -1;
    }  /* END for (i) */

  startcount = i;

  /* only ping nodes once (disable new task) */

  if (startcount < svr_totnodes)
    {
    /* continue outstanding pings after checking for other requests */

    set_task(WORK_Timed, time_now, send_cluster_addrs, NULL);
    }
  else
    {
    /* all nodes have new addr list, so clear the new nodes */

    while ((nnew = (new_node *)GET_NEXT(svr_newnodes)) != NULL)
      {
      np = find_nodebyname(nnew->nn_name);

      if (np != NULL)
        {
        /* the node was held offline until all moms received the new
         * address list - bring it back online now */

        np->nd_state &= ~INUSE_OFFLINE;
        }

      delete_link(&nnew->nn_link);
      }

    /* reset startcount, as we've sent the updates for all servers */

    startcount = 0;
    }
  }  /* END send_cluster_addrs */

/*
 * setup_notification - Sets up the mechanism for notifying
 *                      other members of the server's node
 *                      pool that a new node was added manually
 *                      via qmgr.
Actual notification occurs some
 *                      time later through the send_cluster_addrs mechanism
 */

void setup_notification(char *pname)

  {
  struct pbsnode *pnode;
  new_node       *nnew;

  if (pname != NULL)
    {
    pnode = find_nodebyname(pname);

    assert(pnode != NULL);

    /* call it offline until after all nodes get the new ipaddr */

    pnode->nd_state |= INUSE_OFFLINE;

    nnew = malloc(sizeof(new_node));

    if (nnew == NULL)
      {
      return;
      }

    CLEAR_LINK(nnew->nn_link);

    /* NOTE(review): the strdup() result is not checked - a failed
     * allocation would leave nn_name NULL; confirm the consumers in
     * send_cluster_addrs tolerate that */

    nnew->nn_name = strdup(pname);

    append_link(&svr_newnodes, &nnew->nn_link, nnew);
    }

  /* schedule the broadcast of the updated address list; multiple queued
   * additions collapse into one broadcast via num_addrnote_tasks, which
   * send_cluster_addrs decrements before doing any work */

  set_task(
    WORK_Timed,
    time_now + 5,
    send_cluster_addrs,
    NULL);

  num_addrnote_tasks++;

  return;
  }

/*
 * is_stat_get - process an IS_STATUS message arriving from a mom.
 *
 * Reads status strings off the node's RPP stream and folds them into a
 * temporary arst attribute; "state=", "uname", "message", "jobdata=",
 * "jobs=" and "ncpus=" strings receive special handling.
 *
 * @param np - the node whose status is being read (modified)
 * @return DIS_SUCCESS on success, a DIS_* error code otherwise
 */

int is_stat_get(

  struct pbsnode *np) /* I (modified) */

  {
  char *id = "is_stat_get";

  int        stream = np->nd_stream;
  int        rc;
  char      *ret_info;
  attribute  temp;
  char       date_attrib[100];
  int        msg_error = 0;

  extern int TConnGetSelectErrno();
  extern int TConnGetReadErrno();

  if (LOGLEVEL >= 3)
    {
    /* NOTE(review): np is tested for NULL here, but nd_stream was
     * already dereferenced above - the check is dead code */

    sprintf(log_buffer, "received status from node %s",
      (np != NULL) ? np->nd_name : "NULL");

    log_record(
      PBSEVENT_SCHED,
      PBS_EVENTCLASS_REQUEST,
      id,
      log_buffer);
    }

  if (stream < 0)
    {
    return(DIS_EOF);
    }

  /*
   * Before filling the "temp" attribute, initialize it.
   * The second and third parameter to decode_arst are never
   * used, so just leave them empty. (GBS)
   */

  memset(&temp, 0, sizeof(temp));
  memset(date_attrib, 0, 100);

  rc = DIS_SUCCESS;

  if (decode_arst(&temp, NULL, NULL, NULL))
    {
    DBPRT(("is_stat_get: cannot initialize attribute\n"));

    rpp_eom(stream);

    return(DIS_NOCOMMIT);
    }

  /* switch the DIS layer over to the RPP primitives before reading */

  funcs_dis();

  while (((ret_info = disrst(stream, &rc)) != NULL) && (rc == DIS_SUCCESS))
    {
    /* add the info to the "temp" attribute */

    if (decode_arst(&temp, NULL, NULL, ret_info))
      {
      DBPRT(("is_stat_get: cannot add attributes\n"));

      free_arst(&temp);

      free(ret_info);

      rpp_eom(stream);

      return(DIS_NOCOMMIT);
      }

    if (!strncmp(ret_info, "state", 5))
      {
      /* MOM currently never sends multiple states - bad assumption for the future?
*/ if (!strncmp(ret_info, "state=down", 10)) { update_node_state(np, INUSE_DOWN); } else if (!strncmp(ret_info, "state=busy", 10)) { update_node_state(np, INUSE_BUSY); } else if (!strncmp(ret_info, "state=free", 10)) { update_node_state(np, INUSE_FREE); } else { sprintf(log_buffer, "unknown %s from node %s", ret_info, (np->nd_name != NULL) ? np->nd_name : "NULL"); log_err(-1, id, log_buffer); update_node_state(np, INUSE_UNKNOWN); } } else if(!strncmp(ret_info, "uname", 5) && allow_any_mom) { /* for any mom mode if an address did not succeed at gethostbyaddr it was given the hex value of its ip address */ if(!strncmp(np->nd_name, "0x", 2)) { char *cp; char node_name[PBS_MAXHOSTNAME + 1]; int count; cp = strchr(ret_info, ' '); count = 0; do { cp++; node_name[count] = *cp; count++; }while(*cp != ' ' && count < PBS_MAXHOSTNAME); node_name[count-1] = 0; cp = strdup(node_name); free(np->nd_name); np->nd_name = cp; np->nd_first = init_prop(np->nd_name); np->nd_last = np->nd_first; np->nd_f_st = init_prop(np->nd_name); np->nd_l_st = np->nd_f_st; } } else if (!strncmp(ret_info, "me", 2)) /* shorter str compare than "message" */ { if (!strncmp(ret_info, "message=ERROR", 13)) { msg_error = 1; } } else if (server.sv_attr[(int)SRV_ATR_MomJobSync].at_val.at_long && !strncmp(ret_info, "jobdata=", 8)) { /* update job attributes based on what the MOM gives us */ update_job_data(np, ret_info + strlen("jobdata=")); } else if (server.sv_attr[(int)SRV_ATR_MomJobSync].at_val.at_long && !strncmp(ret_info, "jobs=", 5)) { /* walk job list reported by mom */ sync_node_jobs(np, ret_info + strlen("jobs=")); } else if (server.sv_attr[(int)SRV_ATR_AutoNodeNP].at_val.at_long) { int str_res; str_res = strncmp(ret_info, "ncpus=", 6); if(str_res == 0) { struct attribute nattr; /* first we decode ret_info into nattr... */ if ((node_attr_def + ND_ATR_np)->at_decode(&nattr, ATTR_NODE_np, NULL, ret_info + 6) == 0) { /* ... and if MOM's ncpus is different than our np... 
*/ if (nattr.at_val.at_long != np->nd_nsn) { /* ... then we do the defined magic to create new subnodes */ (node_attr_def + ND_ATR_np)->at_action(&nattr, (void *)np, ATR_ACTION_ALTER); update_nodes_file(); } } } } else if(server.sv_attr[(int)SRV_ATR_NPDefault].at_val.at_long) { struct pbsnode *pnode; int i; long max_np; long nsnfreediff; max_np = server.sv_attr[(int)SRV_ATR_NPDefault].at_val.at_long; for(i = 0; i < svr_totnodes; i++) { pnode = pbsndlist[i]; nsnfreediff = pnode->nd_nsn - pnode->nd_nsnfree; pnode->nd_nsn = max_np; pnode->nd_nsnfree = max_np - nsnfreediff; } } free(ret_info); } /* END while (rc != DIS_EOD) */ /* clear the transmission */ rpp_eom(stream); /* DIS_EOD is the only valid final value of rc, check it */ if (rc != DIS_EOD) { update_node_state(np, INUSE_UNKNOWN); free_arst(&temp); return(rc); } if (msg_error && server.sv_attr[(int)SRV_ATR_DownOnError].at_val.at_long) { update_node_state(np, INUSE_DOWN); } /* it's nice to know when the last update happened */ sprintf(date_attrib, "rectime=%ld", (long)time_now); if (decode_arst(&temp, NULL, NULL, date_attrib)) { DBPRT(("is_stat_get: cannot add date_attrib\n")); update_node_state(np, INUSE_UNKNOWN); free_arst(&temp); return(DIS_NOCOMMIT); } /* insert the information from "temp" into np */ if (node_status_list(&temp, np, ATR_ACTION_ALTER)) { DBPRT(("is_stat_get: cannot set node status list\n")); update_node_state(np, INUSE_UNKNOWN); return(DIS_NOCOMMIT); } /* NOTE: node state adjusted in update_node_state() */ return(DIS_SUCCESS); } /* END is_stat_get() */ #ifdef NVIDIA_GPUS /* * Function to check if there is a job assigned to this gpu */ int count_gpu_jobs( char *mom_node, int gpuid) { job *pjob; extern tlist_head svr_alljobs; char *gpu_str; char *found_str; char tmp_str[PBS_MAXHOSTNAME + 5]; char num_str[6]; int job_count = 0; if ((pjob = (job *)GET_NEXT(svr_alljobs)) != NULL) { for (;pjob != NULL;pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { /* * Does this job have this gpuid assigned? 
skip non running jobs * if so, return TRUE */ if ((pjob->ji_qs.ji_state == JOB_STATE_RUNNING) && (pjob->ji_wattr[JOB_ATR_exec_gpus].at_flags & ATR_VFLAG_SET) != 0) { gpu_str = pjob->ji_wattr[JOB_ATR_exec_gpus].at_val.at_str; if (gpu_str != NULL) { strcpy (tmp_str, mom_node); strcat (tmp_str, "-gpu/"); sprintf (num_str, "%d", gpuid); strcat (tmp_str, num_str); /* look thru the string and see if it has this host and gpuid. * exec_gpus string should be in format of * -gpu/[+-gpu/...] */ found_str = strstr (gpu_str, tmp_str); if (found_str != NULL) { job_count++; } } } } } /* END for (pjob) */ return(job_count); } #endif /* NVIDIA_GPUS */ #ifdef NVIDIA_GPUS /* * Function to process gpu status messages received from the mom */ int is_gpustat_get( struct pbsnode *np) /* I (modified) */ { char *id = "is_gpustat_get"; int stream = np->nd_stream; int rc; char *ret_info; attribute temp; char *gpuid; int gpuidx = -1; char gpuinfo[2048]; int need_delimiter; int reportedgpucnt = 0; int startgpucnt = 0; int drv_ver; extern int TConnGetSelectErrno(); extern int TConnGetReadErrno(); if (LOGLEVEL >= 3) { sprintf(log_buffer, "received gpu status from node %s", (np != NULL) ? np->nd_name : "NULL"); log_record( PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, id, log_buffer); } if (stream < 0) { return(DIS_EOF); } /* save current gpu count for node */ startgpucnt = np->nd_ngpus; /* * Before filling the "temp" attribute, initialize it. * The second and third parameter to decode_arst are never * used, so just leave them empty. 
(GBS) */ memset(&temp, 0, sizeof(temp)); memset(gpuinfo, 0, 2048); rc = DIS_SUCCESS; if (decode_arst(&temp, NULL, NULL, NULL)) { DBPRT(("is_stat_get: cannot initialize attribute\n")); rpp_eom(stream); return(DIS_NOCOMMIT); } funcs_dis(); while (((ret_info = disrst(stream, &rc)) != NULL) && (rc == DIS_SUCCESS)) { /* add the info to the "temp" attribute */ /* get timestamp */ if(!strncmp(ret_info, "timestamp=", 10)) { if (decode_arst(&temp, NULL, NULL, ret_info)) { DBPRT(("is_gpustat_get: cannot add attributes\n")); free_arst(&temp); free(ret_info); rpp_eom(stream); return(DIS_NOCOMMIT); } free(ret_info); continue; } /* get driver version, if there is one */ if(!strncmp(ret_info, "driver_ver=", 11)) { if (decode_arst(&temp, NULL, NULL, ret_info)) { DBPRT(("is_gpustat_get: cannot add attributes\n")); free_arst(&temp); free(ret_info); rpp_eom(stream); return(DIS_NOCOMMIT); } drv_ver = atoi(ret_info + 11); free(ret_info); continue; } /* gpuid must come before the rest or we will be in trouble */ if(!strncmp(ret_info, "gpuid=", 6)) { if (strlen(gpuinfo) > 0) { if (decode_arst(&temp, NULL, NULL, gpuinfo)) { DBPRT(("is_gpustat_get: cannot add attributes\n")); free_arst(&temp); free(ret_info); rpp_eom(stream); return(DIS_NOCOMMIT); } memset(gpuinfo, 0, 2048); } gpuid = &ret_info[6]; /* * Get this gpus index, if it does not yet exist then find an empty entry. * We need to allow for the gpu status results being returned in * different orders since the nvidia order may change upon mom's reboot */ gpuidx = gpu_entry_by_id(np, gpuid, TRUE); if (gpuidx == -1) { /* * Failure - we could not get / create a nd_gpusn entry for this gpu, * log an error message. 
*/ if (LOGLEVEL >= 3) { sprintf (log_buffer, "Failed to get/create entry for gpu %s on node %s\n", gpuid, np->nd_name); log_ext(-1, id, log_buffer, LOG_DEBUG); } free_arst(&temp); free(ret_info); return(DIS_SUCCESS); } sprintf(gpuinfo, "gpu[%d]=gpu_id=%s;", gpuidx, gpuid); need_delimiter = FALSE; reportedgpucnt++; np->nd_gpusn[gpuidx].driver_ver = drv_ver; /* mark that this gpu node is not virtual */ np->nd_gpus_real = TRUE; /* * if we have not filled in the gpu_id returned by the mom node * then fill it in */ if ((gpuidx >= 0) && (np->nd_gpusn[gpuidx].gpuid == NULL)) { np->nd_gpusn[gpuidx].gpuid = strdup(gpuid); } } else { if (need_delimiter) { strcat(gpuinfo, ";"); } strcat(gpuinfo, ret_info); need_delimiter = TRUE; } /* check current gpu mode and determine gpu state */ if (!memcmp(ret_info, "gpu_mode=", 9)) { if ((!memcmp(ret_info+9, "Normal", 6)) || (!memcmp(ret_info+9, "Default", 7))) { np->nd_gpusn[gpuidx].mode = gpu_normal; if (count_gpu_jobs(np->nd_name, gpuidx) > 0) { np->nd_gpusn[gpuidx].state = gpu_shared; } else { np->nd_gpusn[gpuidx].inuse = 0; np->nd_gpusn[gpuidx].state = gpu_unallocated; } } else if ((!memcmp(ret_info+9, "Exclusive", 9)) || (!memcmp(ret_info+9, "Exclusive_Thread", 16))) { np->nd_gpusn[gpuidx].mode = gpu_exclusive_thread; if (count_gpu_jobs(np->nd_name, gpuidx) > 0) { np->nd_gpusn[gpuidx].state = gpu_exclusive; } else { np->nd_gpusn[gpuidx].inuse = 0; np->nd_gpusn[gpuidx].state = gpu_unallocated; } } else if (!memcmp(ret_info+9, "Exclusive_Process", 17)) { np->nd_gpusn[gpuidx].mode = gpu_exclusive_process; if (count_gpu_jobs(np->nd_name, gpuidx) > 0) { np->nd_gpusn[gpuidx].state = gpu_exclusive; } else { np->nd_gpusn[gpuidx].inuse = 0; np->nd_gpusn[gpuidx].state = gpu_unallocated; } } else if (!memcmp(ret_info+9, "Prohibited", 10)) { np->nd_gpusn[gpuidx].mode = gpu_prohibited; np->nd_gpusn[gpuidx].state = gpu_unavailable; } else { /* unknown mode, default to prohibited */ np->nd_gpusn[gpuidx].mode = gpu_prohibited; 
np->nd_gpusn[gpuidx].state = gpu_unavailable; if (LOGLEVEL >= 3) { sprintf(log_buffer, "GPU %s has unknown mode on node %s", gpuid, np->nd_name); log_ext(-1, id, log_buffer, LOG_DEBUG); } } /* add gpu_mode so it gets added to the attribute */ if (need_delimiter) { strcat(gpuinfo, ";"); } switch (np->nd_gpusn[gpuidx].state) { case gpu_unallocated: strcat (gpuinfo, "gpu_state=Unallocated"); break; case gpu_shared: strcat (gpuinfo, "gpu_state=Shared"); break; case gpu_exclusive: strcat (gpuinfo, "gpu_state=Exclusive"); break; case gpu_unavailable: strcat (gpuinfo, "gpu_state=Unavailable"); break; } } free(ret_info); } /* end of while disrst */ if (strlen(gpuinfo) > 0) { if (decode_arst(&temp, NULL, NULL, gpuinfo)) { DBPRT(("is_gpustat_get: cannot add attributes\n")); free_arst(&temp); rpp_eom(stream); return(DIS_NOCOMMIT); } } /* maintain the gpu count, if it has changed we need to update the nodes file */ if (reportedgpucnt != startgpucnt) { np->nd_ngpus = reportedgpucnt; /* update the nodes file */ update_nodes_file(); } node_gpustatus_list(&temp, np, ATR_ACTION_ALTER); return(DIS_SUCCESS); } /* END is_gpustat_get() */ #endif /* NVIDIA_GPUS */ /* ** Start a standard inter-server message. 
*/

/*
 * is_compose - begin a standard inter-server (IS) message on an RPP stream.
 *
 * Resets the DIS state for the stream and writes the protocol number,
 * protocol version, and command code.
 *
 * Returns DIS_SUCCESS, DIS_EOF for a bad stream, or the DIS error code
 * from the failed write.
 */

int is_compose(

  int stream,  /* I */
  int command) /* I */

  {
  int ret;

  if (stream < 0)
    {
    return(DIS_EOF);
    }

  DIS_rpp_reset();

  ret = diswsi(stream, IS_PROTOCOL);

  if (ret != DIS_SUCCESS)
    goto done;

  ret = diswsi(stream, IS_PROTOCOL_VER);

  if (ret != DIS_SUCCESS)
    goto done;

  ret = diswsi(stream, command);

  if (ret != DIS_SUCCESS)
    goto done;

  return(DIS_SUCCESS);

done:

  DBPRT(("is_compose: send error %s\n", dis_emsg[ret]))

  return(ret);
  }  /* END is_compose() */



/* EOF on a stream received (either stream or addr must be specified) */
/* mark node down and remove associated streams */
/* NOTE:  pass in stream = -1 if you wish the stream to be optional */

void stream_eof(

  int    stream, /* I (optional) */
  u_long addr,   /* I (optional) */
  int    ret)    /* I (ignored) */

  {
  static char     id[] = "stream_eof";

  struct pbsnode *np;

  rpp_close(stream);

  np = NULL;

  if (stream >= 0)
    {
    /* find who the stream belongs to and mark down */

    np = tfind((u_long)stream, &streams);
    }

  if ((np == NULL) && (addr != 0))
    {
    /* stream lookup failed (or no stream given) - fall back to IP address */

    np = tfind((u_long)addr, &ipaddrs);
    }

  if (np == NULL)
    {
    /* cannot locate node - nothing to mark down */

    return;
    }

  sprintf(log_buffer, "connection to %s is bad, remote service may be down, message may be corrupt, or connection may have been dropped remotely (%s). setting node state to down",
          np->nd_name,
          dis_emsg[ret]);

  log_err(-1, id, log_buffer);

  /* mark node and all subnodes as down */

  update_node_state(np, INUSE_DOWN);

  /* remove stream from list of valid connections */

  if (np->nd_stream >= 0)
    {
    tdelete((u_long)np->nd_stream, &streams);

    np->nd_stream = -1;
    }

  return;
  }  /* END stream_eof() */



/*
 * Send a ping to any node that is in an unknown state.
 * If wt_parm1 is NULL, set up a worktask to ping again.
 *
 * This shouldn't be called anymore...
 * This is mostly only used for opening the socket
 * connection to the node.
*/

#define TNODE_PINGCOUNT     256
#define TNODE_PINGRETRYTIME   3

void ping_nodes(

  struct work_task *ptask)  /* I (optional) */

  {
  static char          *id = "ping_nodes";

  struct pbsnode       *np;

  struct sockaddr_in   *addr;

  int                   i, ret, com;

  extern int            pbs_rm_port;

  /* startcount persists across calls so successive invocations walk
   * through the node list in TNODE_PINGCOUNT-sized batches */

  static int            startcount = 0;

  extern int RPPConfigure(int, int);
  extern int RPPReset(void);

  if (LOGLEVEL >= 6)
    {
    log_record(
      PBSEVENT_SCHED,
      PBS_EVENTCLASS_REQUEST,
      id,
      "starting");
    }

  sprintf(log_buffer, "ping attempting to contact %d nodes",
          (svr_totnodes - startcount > TNODE_PINGCOUNT) ?
          TNODE_PINGCOUNT :
          (svr_totnodes - startcount < 0) ?
          svr_totnodes :
          svr_totnodes - startcount);  /* phew! */

  log_record(
    PBSEVENT_SCHED,
    PBS_EVENTCLASS_REQUEST,
    id,
    log_buffer);

  /* change RPP to report node state quickly */

  RPPConfigure(2, 2);  /* (timeout,retry) retry must be at least 2 */

  for (i = startcount;i < svr_totnodes;i++)
    {
    if (i - startcount > TNODE_PINGCOUNT)
      {
      /* only ping TNODE_PINGCOUNT nodes at a time, ping next batch later */

      break;
      }

    np = pbsndmast[i];

    if (np->nd_state & (INUSE_DELETED | INUSE_OFFLINE))
      continue;

    if ((np->nd_state & INUSE_NEEDS_HELLO_PING) == 0)
      continue;

    if (np->nd_stream < 0)
      {
      /* nodes are down until proven otherwise */

      update_node_state(np, INUSE_DOWN);

      /* open new stream */

      np->nd_stream = rpp_open(np->nd_name, pbs_rm_port, NULL);

      if (np->nd_stream == -1)
        {
        sprintf(log_buffer, "rpp_open to %s",
                np->nd_name);

        log_err(errno, id, log_buffer);

        continue;
        }

      /* remember which node owns this stream for later lookups */

      tinsert((u_long)np->nd_stream, np, &streams);
      }  /* END if (np->nd_stream < 0) */

    if (LOGLEVEL >= 6)
      {
      sprintf(log_buffer, "sending ping to %s (new stream %d)",
              np->nd_name,
              np->nd_stream);

      log_record(
        PBSEVENT_SCHED,
        PBS_EVENTCLASS_REQUEST,
        id,
        log_buffer);
      }

    /* nodes are down until proven otherwise */

    com = IS_HELLO;

    ret = is_compose(np->nd_stream, com);

    if (ret == DIS_SUCCESS)
      {
      if (rpp_flush(np->nd_stream) == 0)
        {
        sprintf(log_buffer, "successful ping to node %s (stream %d)",
                np->nd_name,
                np->nd_stream);

        log_record(
          PBSEVENT_SCHED,
          PBS_EVENTCLASS_REQUEST,
          id,
          log_buffer);

        continue;
        }

      ret = DIS_NOCOMMIT;
      }

    /* ping unsuccessful, mark node down, clear stream */

    update_node_state(np, INUSE_DOWN);

    addr = rpp_getaddr(np->nd_stream);

    sprintf(log_buffer, "%s %d to %s(%s)",
            dis_emsg[ret],
            errno,
            np->nd_name,
            netaddr(addr));

    log_err(-1, id, log_buffer);

    rpp_close(np->nd_stream);

    tdelete((u_long)np->nd_stream, &streams);

    np->nd_stream = -1;
    }  /* END for (i) */

  RPPReset();

  startcount = i;

  /* only ping nodes once (disable new task) */

  if (startcount < svr_totnodes)
    {
    /* continue outstanding pings in TNODE_PINGRETRYTIME seconds */

    set_task(WORK_Timed, time_now + TNODE_PINGRETRYTIME, ping_nodes, NULL);
    }

  return;
  }  /* END ping_nodes() */



/* add_cluster_addrs - add the IPaddr of every node to the stream */

int add_cluster_addrs(

  int stream)  /* I */

  {
  char id[] = "add_cluster_addrs";

  int i, j, ret;

  struct pbsnode *np;

  /* should we cache this response and send it as a single string? */

  for (i = 0;i < svr_totnodes;i++)
    {
    np = pbsndmast[i];

    if (np->nd_state & INUSE_DELETED)
      continue;

    if (LOGLEVEL == 7)  /* higher loglevel gets more info below */
      {
      sprintf(log_buffer, "adding node[%d] %s to hello response",
              i,
              np->nd_name);

      log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, id, log_buffer);
      }

    /* a node may have multiple interfaces - write every address */

    for (j = 0;np->nd_addrs[j];j++)
      {
      u_long ipaddr = np->nd_addrs[j];

      if (LOGLEVEL >= 8)
        {
        sprintf(log_buffer, "adding node[%d] interface[%d] %s to hello response",
                i,
                j,
                netaddr_pbs_net_t(ipaddr));

        log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, id, log_buffer);
        }

      ret = diswul(stream, ipaddr);

      if (ret != DIS_SUCCESS)
        {
        /* FAILURE */

        return(ret);
        }
      }  /* END for (j) */
    }    /* END for (i) */

  return(DIS_SUCCESS);
  }  /* END add_cluster_addrs */



/*
 * Mark any nodes that haven't checked in as down.
 * This should be used rather than the ping_nodes task.  If
 * the node isn't down then it checks to see that the
 * last update hasn't been too long ago.
*/

void check_nodes(

  struct work_task *ptask)  /* I (modified) */

  {
  static char     id[] = "check_nodes";

  struct pbsnode *np;

  int             i, chk_len;

  /* load min refresh interval (seconds) from the server attribute */

  chk_len = server.sv_attr[(int)SRV_ATR_check_rate].at_val.at_long;

  if (LOGLEVEL >= 5)
    {
    sprintf(log_buffer, "verifying nodes are active (min_refresh = %d seconds)",
            chk_len);

    log_event(
      PBSEVENT_ADMIN,
      PBS_EVENTCLASS_SERVER,
      id,
      log_buffer);
    }

  /* evaluate all nodes - any node silent for longer than chk_len is down */

  for (i = 0;i < svr_totnodes;i++)
    {
    np = pbsndmast[i];

    if (np->nd_state & (INUSE_DELETED | INUSE_DOWN))
      continue;

    if (np->nd_lastupdate < (time_now - chk_len))
      {
      if (LOGLEVEL >= 0)
        {
        sprintf(log_buffer, "node %s not detected in %ld seconds, marking node down",
                np->nd_name,
                (long int)(time_now - np->nd_lastupdate));

        log_event(
          PBSEVENT_ADMIN,
          PBS_EVENTCLASS_SERVER,
          id,
          log_buffer);
        }

      update_node_state(np, (INUSE_DOWN));
      }
    }  /* END for (i = 0) */

  /* re-arm ourselves unless invoked with an explicit parameter */

  if (ptask->wt_parm1 == NULL)
    {
    set_task(
      WORK_Timed,
      time_now + chk_len,
      check_nodes,
      NULL);
    }

  return;
  }  /* END check_nodes() */

#ifdef NVIDIA_GPUS
/*
 * reset gpu data in case mom reconnects with changed gpus.
 * If we have real gpus, not virtual ones, then clear out gpu_status,
 * gpus count and remove gpu subnodes.
*/ void clear_nvidia_gpus( struct pbsnode *np) /* I */ { static char id[] = "clear_nvidia_gpus"; attribute temp; if ((np->nd_gpus_real) && (np->nd_ngpus > 0)) { /* delete gpusubnodes by freeing it */ free(np->nd_gpusn); np->nd_gpusn = NULL; /* reset # of gpus, etc */ np->nd_ngpus = 0; np->nd_ngpus_free = 0; /* unset "gpu_status" node attribute */ memset(&temp, 0, sizeof(temp)); if (decode_arst(&temp, NULL, NULL, NULL)) { log_err(-1, id, "clear_nvidia_gpus: cannot initialize attribute\n"); return; } node_gpustatus_list(&temp, np, ATR_ACTION_ALTER); } return; } /* END clear_nvidia_gpus() */ #endif /* NVIDIA_GPUS */ /* sync w/#define IS_XXX */ const char *PBSServerCmds2[] = { "NULL", "HELLO", "CLUSTER_ADDRS", "UPDATE", "STATUS", "GPU_STATUS", NULL }; /* * Input is coming from the pbs_mom over a DIS rpp stream. * Read the stream to get a Inter-Server request. */ void is_request( int stream, /* I */ int version, /* I */ int *cmdp) /* O (optional) */ { static char id[] = "is_request"; int command = 0; int ret = DIS_SUCCESS; int i, err; char nodename[PBS_MAXHOSTNAME]; int perm = ATR_DFLAG_MGRD | ATR_DFLAG_MGWR; struct hostent *hp; unsigned long ipaddr; unsigned long tmpaddr; struct sockaddr_in *addr; struct pbsnode *node; struct pbssubn *sp; if (cmdp != NULL) *cmdp = 0; if (LOGLEVEL >= 4) { sprintf(log_buffer, "message received from stream %d (version %d)", stream, version); log_event( PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, id, log_buffer); } addr = rpp_getaddr(stream); if (version != IS_PROTOCOL_VER) { sprintf(log_buffer, "protocol version %d unknown from %s", version, netaddr(addr)); log_err(-1, id, log_buffer); rpp_close(stream); return; } /* check that machine is known */ if (LOGLEVEL >= 3) { sprintf(log_buffer, "message received from stream %s", netaddr(addr)); log_event( PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, id, log_buffer); } if ((node = tfind((u_long)stream, &streams)) != NULL) goto found; ipaddr = ntohl(addr->sin_addr.s_addr); if ((node = tfind(ipaddr, 
&ipaddrs)) != NULL) { if (node->nd_stream >= 0) { if (LOGLEVEL >= 3) { sprintf(log_buffer, "stream %d from node %s already open on %d (marking node state 'unknown')", stream, node->nd_name, node->nd_stream); log_event( PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, id, log_buffer); } rpp_close(stream); rpp_close(node->nd_stream); tdelete((u_long)node->nd_stream, &streams); if (node->nd_state & INUSE_OFFLINE) { node->nd_state = (INUSE_UNKNOWN | INUSE_OFFLINE); } else { node->nd_state = INUSE_UNKNOWN; } node->nd_stream = -1; /* do a ping in 5 seconds */ /* set_task(WORK_Timed,time_now + 5, ping_nodes, node); */ return; } /* END if (node->nd_stream >= 0) */ node->nd_stream = stream; tinsert((u_long)stream, node, &streams); goto found; } /* END if ((node = tfind(ipaddr,&ipaddrs)) != NULL) */ else if (allow_any_mom) { { hp = gethostbyaddr((void *)&ipaddr, sizeof(ipaddr), AF_INET); if(hp != NULL) { strncpy(nodename, hp->h_name, PBS_MAXHOSTNAME); err = create_partial_pbs_node(nodename, ipaddr, perm); } else { tmpaddr = ntohl(addr->sin_addr.s_addr); sprintf(nodename, "0x%lX", tmpaddr); err = create_partial_pbs_node(nodename, ipaddr, perm); } if(err == PBSE_NONE) { node = tfind(ipaddr, &ipaddrs); goto found; } } } /* node not listed in trusted ipaddrs list */ sprintf(log_buffer, "bad attempt to connect from %s (address not trusted - check entry in server_priv/nodes)", netaddr(addr)); if (LOGLEVEL >= 2) { log_record( PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, id, log_buffer); } log_err(-1, id, log_buffer); rpp_close(stream); return; found: command = disrsi(stream, &ret); if (ret != DIS_SUCCESS) goto err; if (cmdp != NULL) *cmdp = command; if (LOGLEVEL >= 3) { sprintf(log_buffer, "message %s (%d) received from mom on host %s (%s) (stream %d)", PBSServerCmds2[command], command, node->nd_name, netaddr(addr), stream); log_event( PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, id, log_buffer); } switch (command) { case IS_NULL: /* a ping from server */ DBPRT(("%s: IS_NULL\n", id)) break; case 
IS_HELLO: if (LOGLEVEL >= 1) { sprintf(log_buffer, "HELLO received from %s", node->nd_name); log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, id, log_buffer); } #ifdef NVIDIA_GPUS /* reset gpu data in case mom reconnects with changed gpus */ clear_nvidia_gpus(node); #endif /* NVIDIA_GPUS */ #ifndef ALT_CLSTR_ADDR ret = is_compose(stream, IS_CLUSTER_ADDRS); if (ret != DIS_SUCCESS) goto err; if (add_cluster_addrs(stream) != DIS_SUCCESS) goto err; /* NOTE: re-enabled rpp_flush/disabled rpp_eom (CRI) */ ret = rpp_flush(stream); if (ret != DIS_SUCCESS) goto err; if (LOGLEVEL >= 3) { sprintf(log_buffer, "sending cluster-addrs to node %s\n", node->nd_name); log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, id, log_buffer); } /* rpp_eom(stream); */ /* CLUSTER_ADDRS successful */ #endif node->nd_state &= ~(INUSE_NEEDS_HELLO_PING); break; case IS_UPDATE: DBPRT(("%s: IS_UPDATE\n", id)) i = disrui(stream, &ret); if (ret != DIS_SUCCESS) { if (LOGLEVEL >= 1) { sprintf(log_buffer, "IS_UPDATE error %d on node %s\n", ret, node->nd_name); log_err(ret, id, log_buffer); } goto err; } DBPRT(("%s: IS_UPDATE %s 0x%x\n", id, node->nd_name, i)) update_node_state(node, i); break; case IS_STATUS: /* pbs_server brought up pbs_mom brought up they send IS_HELLO to each other pbs_mom sends IS_STATUS message to pbs_server (replying to IS_HELLO) pbs_server sends IS_CLUSTER_ADDRS message to pbs_mom (replying to IS_HELLO) pbs_mom uses IS_CLUSTER_ADDRS message to authorize contacts from sisters */ if (LOGLEVEL >= 2) { sprintf(log_buffer, "IS_STATUS received from %s", node->nd_name); log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, id, log_buffer); } ret = is_stat_get(node); if (ret != DIS_SUCCESS) { if (LOGLEVEL >= 1) { sprintf(log_buffer, "IS_STATUS error %d on node %s", ret, node->nd_name); log_err(ret, id, log_buffer); } goto err; } node->nd_lastupdate = time_now; if (LOGLEVEL >= 9) { sprintf(log_buffer, "node '%s' is at state '0x%x'\n", node->nd_name, node->nd_state); log_event(PBSEVENT_ADMIN, 
PBS_EVENTCLASS_SERVER, id, log_buffer); } for (sp = node->nd_psn;sp != NULL;sp = sp->next) { if (!(node->nd_state & INUSE_OFFLINE) && (sp->inuse & INUSE_OFFLINE)) { /* this doesn't seem to ever happen */ if (LOGLEVEL >= 2) { sprintf(log_buffer, "sync'ing subnode state '%s' with node state on node %s\n", "offline", node->nd_name); log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, id, log_buffer); } sp->inuse &= ~INUSE_OFFLINE; } sp->inuse &= ~INUSE_DOWN; } break; case IS_GPU_STATUS: /* pbs_server brought up pbs_mom brought up they send IS_HELLO to each other pbs_mom sends IS_STATUS followed by IS_GPU_STATUS message to pbs_server (replying to IS_HELLO) pbs_server sends IS_CLUSTER_ADDRS message to pbs_mom (replying to IS_HELLO) pbs_mom uses IS_CLUSTER_ADDRS message to authorize contacts from sisters */ #ifdef NVIDIA_GPUS if (LOGLEVEL >= 2) { sprintf(log_buffer, "IS_GPU_STATUS received from %s", node->nd_name); log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, id, log_buffer); } ret = is_gpustat_get(node); if (ret != DIS_SUCCESS) { if (LOGLEVEL >= 1) { sprintf(log_buffer, "IS_GPU_STATUS error %d on node %s", ret, node->nd_name); log_err(ret, id, log_buffer); } goto err; } node->nd_lastupdate = time_now; #else if (LOGLEVEL >= 2) { sprintf(log_buffer, "Not configured: IS_GPU_STATUS received from %s", node->nd_name); log_err(ret, id, log_buffer); } #endif /* NVIDIA_GPUS */ break; default: sprintf(log_buffer, "unknown command %d sent from %s", command, node->nd_name); log_err(-1, id, log_buffer); goto err; break; } /* END switch (command) */ rpp_eom(stream); return; err: /* a DIS write error has occurred */ if (LOGLEVEL >= 1) { DBPRT(("%s: error processing node %s\n", id, node->nd_name)) } sprintf(log_buffer, "%s from %s(%s)", dis_emsg[ret], node->nd_name, netaddr(addr)); log_err(-1, id, log_buffer); rpp_close(stream); update_node_state(node, INUSE_DOWN); return; } /* END is_request() */ void write_node_state(void) { struct pbsnode *np; static char *fmt = "%s %d\n"; int i; 
int savemask; if (LOGLEVEL >= 5) { DBPRT(("write_node_state: entered\n")) } /* don't store volatile states like down and unknown */ savemask = INUSE_OFFLINE | INUSE_RESERVE; if (nstatef != NULL) { fseek(nstatef, 0L, SEEK_SET); /* rewind and clear */ if (ftruncate(fileno(nstatef), (off_t)0) != 0) { log_err(errno, "write_node_state", "could not truncate file"); return; } } else { /* need to open for first time, temporary-move to pbsd_init */ if ((nstatef = fopen(path_nodestate, "w+")) == NULL) { log_err( errno, "write_node_state", "could not open file"); return; } } /* ** The only state that carries forward is if the ** node has been marked offline. */ for (i = 0;i < svr_totnodes;i++) { np = pbsndmast[i]; if (np->nd_state & INUSE_DELETED) continue; if (np->nd_state & INUSE_OFFLINE) { fprintf(nstatef, fmt, np->nd_name, np->nd_state & savemask); } } /* END for (i) */ if (fflush(nstatef) != 0) { log_err(errno, "write_node_state", "failed saving node state to disk"); } return; } /* END write_node_state() */ /* Create a new node_note file then overwrite the previous one. * * The note file could get up to: * (# of nodes) * (2 + MAX_NODE_NAME + MAX_NOTE) bytes in size */ int write_node_note(void) { #ifndef NDEBUG static char id[] = "write_node_note"; #endif struct pbsnode *np; int i; FILE *nin; if (LOGLEVEL >= 2) { DBPRT(("%s: entered\n", id)) } if ((nin = fopen(path_nodenote_new, "w")) == NULL) goto err1; if ((svr_totnodes == 0) || (pbsndmast == NULL)) { log_event( PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, "node_note", "Server has empty nodes list"); fclose(nin); return(-1); } /* for each node ... 
*/ for (i = 0;i < svr_totnodes;++i) { np = pbsndmast[i]; if (np->nd_state & INUSE_DELETED) continue; /* write node name followed by its note string */ if (np->nd_note != NULL && np->nd_note != '\0') { fprintf(nin, "%s %s\n", np->nd_name, np->nd_note); } } fflush(nin); if (ferror(nin)) { fclose(nin); goto err1; } fclose(nin); if (rename(path_nodenote_new, path_nodenote) != 0) { log_event( PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, "node_note", "replacing old node note file failed"); return(-1); } return(0); err1: log_event( PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, "node_note", "Node note file update failed"); return(-1); } /* END write_node_note() */ /* * free_prop - free list of prop structures created by proplist() */ static void free_prop( struct prop *prop) { struct prop *pp; for (pp = prop;pp != NULL;pp = prop) { prop = pp->next; free(pp->name); free(pp); } /* END for (pp) */ return; } /* END free_prop() */ /* * unreserve - unreserve nodes * * If handle is set to a existing resource_t, then release all nodes * associated with that handle, otherwise, (this is dangerous) * if handle == RESOURCE_T_ALL, release all nodes period. */ void node_unreserve( resource_t handle) { struct pbsnode *np; struct pbssubn *sp; int i; /* clear old reserve */ for (i = 0;i < svr_totnodes;i++) { np = pbsndlist[i]; if (np->nd_state & INUSE_DELETED) continue; for (sp = np->nd_psn;sp;sp = sp->next) { if (sp->inuse & INUSE_RESERVE) { if ((handle == RESOURCE_T_ALL) || (handle == sp->allocto)) { np->nd_nsnfree++; sp->inuse &= ~INUSE_RESERVE; np->nd_state &= ~INUSE_RESERVE; } } } } return; } /* END node_unreserve() */ /* ** Look through the property list and make sure that all ** those marked are contained in the node. 
*/ int hasprop( struct pbsnode *pnode, struct prop *props) { struct prop *need; for (need = props;need;need = need->next) { struct prop *pp; if (need->mark == 0) /* not marked, skip */ continue; for (pp = pnode->nd_first;pp != NULL;pp = pp->next) { if (strcmp(pp->name, need->name) == 0) break; /* found it */ } if (pp == NULL) { return(0); } } return(1); } /* END hasprop() */ /* * see if node has the number of processors required * if free == SKIP_NONE, check against total number of processors, else * if free != SKIP_NONE, check against number free * * Return 1 if possible, 0 if not */ static int hasppn( struct pbsnode *pnode, /* I */ int node_req, /* I */ int free) /* I */ { if ((free != SKIP_NONE) && (free != SKIP_NONE_REUSE) && (pnode->nd_nsnfree >= node_req)) { return(1); } if ((free == SKIP_NONE) && (pnode->nd_nsn >= node_req)) { return(1); } return(0); } /* END hasppn() */ /* ** Mark the properties of a node that match the marked ** properties given. */ static void mark( struct pbsnode *pnode, /* I */ struct prop *props) { struct prop *set, *pp; for (pp = pnode->nd_first;pp != NULL;pp = pp->next) { pp->mark = 0; for (set = props;set;set = set->next) { if (set->mark == 0) continue; if (strcmp(pp->name, set->name) == 0) { pp->mark = 1; break; } } } return; } /* END mark() */ /* ** Count how many gpus are available for use on this node */ static int gpu_count( struct pbsnode *pnode, /* I */ int freeonly) /* I */ { static char id[] = "gpu_count"; int count = 0; if ((pnode->nd_state & INUSE_DELETED) || (pnode->nd_state & INUSE_OFFLINE) || (pnode->nd_state & INUSE_UNKNOWN) || (pnode->nd_state & INUSE_DOWN)) { if (LOGLEVEL >= 7) { sprintf(log_buffer, "Counted %d gpus %s on node %s that was skipped", count, (freeonly? 
"free":"available"), pnode->nd_name); log_ext(-1, id, log_buffer, LOG_DEBUG); } return (count); } #ifdef NVIDIA_GPUS if (pnode->nd_gpus_real) { int j; for (j = 0; j < pnode->nd_ngpus; j++) { struct gpusubn *gn = pnode->nd_gpusn + j; /* always ignore unavailable gpus */ if (gn->state == gpu_unavailable) continue; if (!freeonly) { count++; } else if ((gn->state == gpu_unallocated) || ((gn->state == gpu_shared) && (gpu_mode_rqstd == gpu_normal))) { count++;; } } } else #endif /* NVIDIA_GPUS */ { /* virtual gpus */ if (freeonly) { count = pnode->nd_ngpus_free; } else { count = pnode->nd_ngpus; } } if (LOGLEVEL >= 7) { sprintf(log_buffer, "Counted %d gpus %s on node %s", count, (freeonly? "free":"available"), pnode->nd_name); log_ext(-1, id, log_buffer, LOG_DEBUG); } return (count); } /* END gpu_count() */ #ifdef NVIDIA_GPUS /* ** get gpu index for this gpuid */ int gpu_entry_by_id( struct pbsnode *pnode, /* I */ char *gpuid, int get_empty) { if (pnode->nd_gpus_real) { int j; for (j = 0; j < pnode->nd_ngpus; j++) { struct gpusubn *gn = pnode->nd_gpusn + j; if ((gn->gpuid != NULL) && (strcmp(gpuid, gn->gpuid) == 0)) { return(j); } } } /* * we did not find the entry. if get_empty is set then look for an empty * slot. If none is found then try to add a new entry to nd_gpusn */ if (get_empty) { int j; for (j = 0; j < pnode->nd_ngpus; j++) { struct gpusubn *gn = pnode->nd_gpusn + j; if (gn->gpuid == NULL) { return(j); } } create_a_gpusubnode(pnode); return (pnode->nd_ngpus - 1); } return (-1); } /* END gpu_entry_by_id() */ #endif /* NVIDIA_GPUS */ #define RECURSIVE_LIMIT 3 /* ** Search for a node which contains properties glorf and the requirements. ** skip indicates which nodes to pass over for this search. ** Don't do any recursive calls deeper than RECURSIVE_LIMIT. 
** RETURN:  0 = failure, 1 = SUCCESS
*/

static int search(

  struct prop *glorf,  /* properties */
  int          vpreq,  /* VPs needed */
  int          gpureq, /* GPUs needed */
  int          skip,
  int          order,
  int          depth)

  {
  static int  pass = INUSE_OFFLINE | INUSE_DOWN | INUSE_RESERVE | INUSE_UNKNOWN | INUSE_DELETED;

  static char id[] = "search";

  struct pbsnode *pnode;

  int found;
  int i;

  /* bail out of deep recursion (depth incremented per recursive call) */

  if (++depth == RECURSIVE_LIMIT)
    {
    return(0);
    }

  /* look for nodes we haven't picked already */

  for (i = 0;i < svr_totnodes;i++)
    {
    pnode = pbsndlist[i];

    if (pnode->nd_state & INUSE_DELETED)
      continue;

    if (pnode->nd_ntype == NTYPE_CLUSTER)
      {
      if (pnode->nd_flag != okay)
        {
        if ((skip != SKIP_NONE_REUSE) || (pnode->nd_flag != thinking))
          {
          /* allow node re-use if SKIP_NONE_REUSE is set */

          continue;
          }
        }

      /* FIXME: this is rejecting job submits?
      if (pnode->nd_state & pass)
        continue;
      */

      if (!hasprop(pnode, glorf))
        continue;

      if (LOGLEVEL >= 7)
        {
        sprintf(log_buffer,
                "search: starting eval gpus on node %s need %d(%d) mode %d has %d free %d skip %d depth %d",
                pnode->nd_name,
                gpureq,
                pnode->nd_ngpus_needed,
                gpu_mode_rqstd,
                pnode->nd_ngpus,
                gpu_count(pnode, TRUE),
                skip,
                depth);

        log_ext(-1, id, log_buffer, LOG_DEBUG);
        }

      /* check VP/GPU availability according to the skip policy */

      if ((skip == SKIP_NONE) || (skip == SKIP_NONE_REUSE))
        {
        if ((vpreq > pnode->nd_nsn) ||
            (gpureq > gpu_count(pnode, FALSE)))
          continue;
        }
      else if ((skip == SKIP_ANYINUSE) &&
               ((pnode->nd_state & INUSE_SUBNODE_MASK) ||
                (vpreq > pnode->nd_nsnfree) ||
                (gpureq > gpu_count(pnode, TRUE))))
        {
        continue;
        }
      else if ((skip == SKIP_EXCLUSIVE) &&
               ((pnode->nd_state & INUSE_SUBNODE_MASK) ||
                (vpreq > (pnode->nd_nsnfree + pnode->nd_nsnshared)) ||
                (gpureq > gpu_count(pnode, TRUE))))
        {
        continue;
        }

      /* NOTE:  allow node re-use if SKIP_NONE_REUSE by ignoring 'thinking' above */

      pnode->nd_flag = thinking;

      mark(pnode, glorf);

      pnode->nd_needed = vpreq;
      pnode->nd_ngpus_needed = gpureq;
      pnode->nd_order  = order;

      /* SUCCESS */

      if (LOGLEVEL >= 7)
        {
        sprintf(log_buffer,
                "search: successful gpus on node %s need %d(%d) mode %d has %d free %d skip %d depth %d",
                pnode->nd_name,
                gpureq,
                pnode->nd_ngpus_needed,
                gpu_mode_rqstd,
                pnode->nd_ngpus,
                gpu_count(pnode, TRUE),
                skip,
                depth);

        log_ext(-1, id, log_buffer, LOG_DEBUG);
        }

      return(1);
      }
    }

  if (glorf == NULL)  /* no property */
    {
    /* FAILURE */

    return(0);   /* can't retry */
    }

  /* try re-shuffling the nodes to get what we want */

  for (i = 0;i < svr_totnodes;i++)
    {
    pnode = pbsndlist[i];

    if (pnode->nd_state & INUSE_DELETED)
      continue;

    if (pnode->nd_ntype == NTYPE_CLUSTER)
      {
      if (pnode->nd_flag != thinking)
        {
        /* only shuffle nodes which have been selected above */

        continue;
        }

      if (pnode->nd_state & pass)
        continue;

      if (LOGLEVEL >= 7)
        {
        sprintf(log_buffer,
                "search(2): starting eval gpus on node %s need %d(%d) mode %d has %d free %d skip %d depth %d",
                pnode->nd_name,
                gpureq,
                pnode->nd_ngpus_needed,
                gpu_mode_rqstd,
                pnode->nd_ngpus,
                gpu_count(pnode, TRUE),
                skip,
                depth);

        log_ext(-1, id, log_buffer, LOG_DEBUG);
        }

      if ((skip == SKIP_EXCLUSIVE) &&
          (vpreq < pnode->nd_nsnfree) &&
          (gpureq < gpu_count(pnode, TRUE)))
        continue;

      if ((skip == SKIP_ANYINUSE) &&
          (vpreq < (pnode->nd_nsnfree + pnode->nd_nsnshared)) &&
          (gpureq < gpu_count(pnode, TRUE)))
        continue;

      if (!hasprop(pnode, glorf))
        continue;

      /* temporarily give this node's allocation away and see if its
       * requirements can be satisfied elsewhere */

      pnode->nd_flag = conflict;

      /* Ben Webb patch (CRI 10/06/03) */

      found = search(
                pnode->nd_first,
                pnode->nd_needed,
                pnode->nd_ngpus_needed,
                skip,
                pnode->nd_order,
                depth);

      pnode->nd_flag = thinking;

      if (found)
        {
        mark(pnode, glorf);

        pnode->nd_needed = vpreq;
        pnode->nd_ngpus_needed = gpureq;
        pnode->nd_order  = order;

        /* SUCCESS */

        if (LOGLEVEL >= 7)
          {
          sprintf(log_buffer,
                  "search(2): successful gpus on node %s need %d(%d) mode %d has %d free %d skip %d depth %d",
                  pnode->nd_name,
                  gpureq,
                  pnode->nd_ngpus_needed,
                  gpu_mode_rqstd,
                  pnode->nd_ngpus,
                  gpu_count(pnode, TRUE),
                  skip,
                  depth);

          log_ext(-1, id, log_buffer, LOG_DEBUG);
          }

        return(1);
        }
      }
    }    /* END for (i) */

  /* FAILURE */

  /* not found */

  return(0);
  }  /* END search() */



/*
 * Parse a number in a spec.
** Return 0 if okay, 1 if no number exists, -1 on error */ static int number( char **ptr, int *num) { char holder[80]; int i = 0; char *str = *ptr; while (isdigit(*str) && (unsigned int)(i + 1) < sizeof holder) holder[i++] = *str++; if (i == 0) { return(1); } holder[i] = '\0'; if ((i = atoi(holder)) <= 0) { sprintf(log_buffer, "zero illegal"); return(-1); } *ptr = str; *num = i; return(0); } /* END number() */ /* ** Check string to see if it is a legal property name. ** If not, return 1. ** *prop set to static char array containing the properity, ** must be copied. */ static int property( char **ptr, char **prop) { static char name[80]; char* str = *ptr; int i = 0; if (!isalpha(*str)) { sprintf(log_buffer, "first character of property (%s) not a letter", str); return(1); } while (isalnum(*str) || *str == '-' || *str == '.' || *str == '=' || *str == '_') name[i++] = *str++; name[i] = '\0'; *prop = (i == 0) ? NULL : name; /* skip over "/vp_number" */ if (*str == '/') { do { str++; } while (isdigit(*str)); } *ptr = str; return(0); } /* END property() */ /* ** Create a property list from a string. ** Return 0 if all is well, 1 otherwise. 
*/ static int proplist( char **str, struct prop **plist, int *node_req, int *gpu_req) { struct prop *pp; char *pname; char *pequal; #ifdef NVIDIA_GPUS static char id[] = "proplist"; int have_gpus = FALSE; #endif /* NVIDIA_GPUS */ *node_req = 1; /* default to 1 processor per node */ for (;;) { if (property(str, &pname)) { return(1); } if (pname == NULL) break; if ((pequal = strchr(pname, (int)'=')) != NULL) { /* special property */ /* identify the special property and place its value */ /* into node_req */ *pequal = '\0'; if (strcmp(pname, "ppn") == 0) { pequal++; if ((number(&pequal, node_req) != 0) || (*pequal != '\0')) { return(1); } } else if(strcmp(pname, "procs") == 0) { pequal++; if ((number(&pequal, node_req) != 0) || (*pequal != '\0')) { return(1); } } else if (strcmp(pname, "gpus") == 0) { pequal++; if ((number(&pequal, gpu_req) != 0) || (*pequal != '\0')) { return(1); } #ifdef NVIDIA_GPUS have_gpus = TRUE; gpu_err_reset = FALSE; /* default to no */ #endif /* NVIDIA_GPUS */ /* default value if no other gets specified */ gpu_mode_rqstd = gpu_exclusive_thread; } else { return(1); /* not recognized - error */ } } #ifdef NVIDIA_GPUS else if (have_gpus && (!strcasecmp(pname, "exclusive_thread"))) { gpu_mode_rqstd = gpu_exclusive_thread; } else if (have_gpus && (!strcasecmp(pname, "exclusive"))) { gpu_mode_rqstd = gpu_exclusive_thread; } else if (have_gpus && (!strcasecmp(pname, "exclusive_process"))) { gpu_mode_rqstd = gpu_exclusive_process; } else if (have_gpus && (!strcasecmp(pname, "default"))) { gpu_mode_rqstd = gpu_normal; } else if (have_gpus && (!strcasecmp(pname, "shared"))) { gpu_mode_rqstd = gpu_normal; } else if (have_gpus && (!strcasecmp(pname, "reseterr"))) { gpu_err_reset = TRUE; } #endif /* NVIDIA_GPUS */ else { pp = (struct prop *)malloc(sizeof(struct prop)); pp->mark = 1; pp->name = strdup(pname); pp->next = *plist; *plist = pp; } #ifdef NVIDIA_GPUS if ((have_gpus) && (LOGLEVEL >= 7)) { sprintf(log_buffer, "proplist: set needed gpu mode to %d", 
gpu_mode_rqstd); log_ext(-1, id, log_buffer, LOG_DEBUG); } #endif /* NVIDIA_GPUS */ if (**str != ':') break; (*str)++; } /* END for(;;) */ return 0; } /* END proplist() */ /* * Evaluate one element in a node spec. * * Return 1 if it can be satisfied * 0 if it cannot be completly satisfied. (not used now) * -1 if error - can never be satisfied. */ static int listelem( char **str, int order) { int num = 1; int i, hit; int ret = -1; struct prop *prop = NULL; struct pbsnode *pnode; int node_req = 1; int gpu_req = 0; if ((i = number(str, &num)) == -1) /* get number */ { /* FAILURE */ return(ret); } if (i == 0) { /* number exists */ if (**str == ':') { /* there are properties */ (*str)++; if (proplist(str, &prop, &node_req, &gpu_req)) { return(ret); } } } else { /* no number */ if (proplist(str, &prop, &node_req, &gpu_req)) { /* must be a prop list with no number in front */ return(ret); } } /* count number of nodes with the requested property */ hit = 0; for (i = 0;i < svr_totnodes;i++) { pnode = pbsndlist[i]; if (pnode->nd_state & INUSE_DELETED) continue; if (pnode->nd_ntype == NTYPE_CLUSTER) { if ((hasprop(pnode, prop)) && (hasppn(pnode, node_req, SKIP_NONE)) && (gpu_count(pnode, FALSE) >= gpu_req)) hit++; if (hit == num) { break; /* found enough */ } } } /* END for (i) */ if (hit < num) { /* request exceeds configured nodes */ if ((SvrNodeCt == 0) || (SvrNodeCt < num)) { /* request exceeds server resources_available */ /* request can never be satisfied */ goto done; } } /* ** Find an initial set of nodes to satisfy the request. ** Go ahead and use any nodes no matter what state they are in. 
*/

  /* NOTE:  SKIP_NONE_REUSE will not mark nodes as inuse, ie allow node re-use */

  /* NOTE(review): this is the tail of listelem() - its declaration and the
   * locals prop/node_req/gpu_req/order/num/i/ret are above this chunk */

  for (i = 0;i < num;i++)
    {
    if (SvrNodeCt == 0)
      {
      if (search(prop, node_req, gpu_req, SKIP_NONE, order, 0))
        continue;
      }
    else
      {
      if (search(prop, node_req, gpu_req, SKIP_NONE_REUSE, order, 0))
        continue;
      }

    /* no candidate node found for this element - can never be satisfied */

    goto done;
    }

  ret = 1;

done:

  free_prop(prop);

  return(ret);
  }  /* END listelem() */




/*
** Add the "global" spec to every sub-spec in "spec".
** RETURNS:  allocated string buffer (must be freed externally)
*/

static char *mod_spec(

  char *spec,    /* I */
  char *global)  /* I */

  {
  char *line;
  char *cp;
  int   len;
  int   nsubspec;

  /* count the sub-specs ('+'-separated pieces); each one gets ":global" */

  nsubspec = 1;

  for (cp = spec;*cp != '\0';cp++)
    {
    if (*cp == '+')
      {
      nsubspec++;
      }
    }

  len = strlen(global);

  line = malloc(nsubspec * (len + 1) + strlen(spec) + 1);

  if (line == NULL)
    {
    /* FAILURE */

    return(NULL);
    }

  /* copy spec, inserting ":global" before every '+' and at the end */

  cp = line;

  while (*spec)
    {
    if (*spec == '+')
      {
      *cp++ = ':';

      strcpy(cp, global);

      cp += len;
      }

    *cp++ = *spec++;
    }

  *cp++ = ':';

  strcpy(cp, global);

  return(line);
  }  /* END mod_spec() */




/* cntjons - count jobs on (shared) nodes */

static int cntjons(

  struct pbsnode *pn)  /* I */

  {
  struct pbssubn *psn;
  int ct = 0;
  int n;

  struct jobinfo *pj;

  /* walk every subnode (VP) and count every job entry attached to it */

  psn = pn->nd_psn;

  for (n = 0;n < pn->nd_nsn;++n)
    {
    pj = psn->jobs;

    while (pj)
      {
      ++ct;

      pj = pj->next;
      }

    psn = psn->next;
    }

  return(ct);
  }  /* END cntjons() */




/*
 * nodecmp - compare two nodes for sorting (qsort comparator)
 *  For "exclusive", depending on setting of node_order attribute:
 *   pack:    put free node with fewest non-zero free VPs in node first
 *   scatter: put free node with most free VPs first
 *  For "shared", put current shared with fewest jobs first,
 *   then free nodes, and others last
 */

#define BIG_NUM 32768 /* used only in nodecmp() */

static int nodecmp(

  const void *aa,  /* I - struct pbsnode ** */
  const void *bb)  /* I - struct pbsnode ** */

  {
  struct pbsnode *a = *(struct pbsnode **)aa;
  struct pbsnode *b = *(struct pbsnode **)bb;

  int aprim, bprim;

  /* exclusive is global */

  if (exclusive)
    {
    /* best is free */

    if (server.sv_attr[(int)SRV_ATR_NodePack].at_val.at_long)
      {
      /* pack - fill up nodes first (nodes with no free VPs sort last) */

      aprim = (a->nd_nsnfree > 0) ? a->nd_nsnfree : BIG_NUM;
      bprim = (b->nd_nsnfree > 0) ? b->nd_nsnfree : BIG_NUM;
      }
    else
      {
      /* scatter - spread among nodes first (fewest VPs in use first) */

      aprim = a->nd_nsn - a->nd_nsnfree;
      bprim = b->nd_nsn - b->nd_nsnfree;
      }
    }
  else
    {
    /* best is shared with fewest jobs, then free (5), then others (1000) */

    aprim = (a->nd_state == INUSE_JOBSHARE) ?
            cntjons(a) :
            ((a->nd_state == INUSE_FREE) ? 5 : 1000);

    bprim = (b->nd_state == INUSE_JOBSHARE) ?
            cntjons(b) :
            ((b->nd_state == INUSE_FREE) ? 5 : 1000);
    }

  if (aprim == bprim)
    {
    /* tie-break on number of node properties */

    return(a->nd_nprops - b->nd_nprops);
    }

  return (aprim - bprim);
  }  /* END nodecmp() */




/*
 * MSNPrintF - bounded, appending sprintf: writes into *BPtr, then advances
 * *BPtr and shrinks *BSpace so callers can chain calls into one buffer.
 * Returns SUCCESS, or FAILURE on bad args, format error, or truncation.
 */

int MSNPrintF(

  char **BPtr,    /* I/O - current write position (advanced on success) */
  int   *BSpace,  /* I/O - remaining space (reduced on success) */
  char  *Format,  /* I - printf-style format string */
  ...)            /* I */

  {
  int len;

  va_list Args;

  if ((BPtr == NULL) ||
      (BSpace == NULL) ||
      (Format == NULL) ||
      (*BSpace <= 0))
    {
    return(FAILURE);
    }

  va_start(Args,Format);

  len = vsnprintf(*BPtr,*BSpace,Format,Args);

  va_end(Args);

  if (len <= 0)
    {
    return(FAILURE);
    }

  if (len >= *BSpace)
    {
    /* truncation occurred due to attempted overflow! */

    /* do not place BPtr past the end of the buffer:
     * it is too dangerous (calling function could dereference it
     * to check for empty string, etc.)! */

    *BPtr += (*BSpace) - 1;

    *BSpace = 0;

    return(FAILURE);
    }

  *BPtr += len;

  *BSpace -= len;

  return(SUCCESS);
  }  /* END MSNPrintF() */




/*
 * Test a procs specification.
 *
 * Return >0 - number of procs counted in the spec if it works,
 *         0 - if it cannot be satisfied now,
 *        -1 - if it can never be satisfied.
 */

int procs_available(int proc_ct)

  {
  int i;
  int procs_avail = 0;

  struct pbsnode *pnode;

  if(proc_ct > svr_clnodes)
    {
    /* user requested more processors than are available on the system */

    return(-1);
    }

  /* NOTE(review): sums nd_nsnfree over every non-deleted node; down or
   * offline nodes are NOT excluded here - confirm that is intended */

  for (i = 0;i < svr_totnodes;i++)
    {
    pnode = pbsndlist[i];

    if (pnode->nd_state & INUSE_DELETED)
      continue;

    procs_avail += pnode->nd_nsnfree;
    }

  if(proc_ct > procs_avail)
    {
    return(0);
    }

  return(procs_avail);
  }  /* END procs_available() */




/*
 * Test a node specification.
 * Return >0 - number of nodes counted in the spec if it works,
 *         0 - if it cannot be satisfied,
 *        -1 - if it can never be satisfied.
 * Okay to bail early if "early" is true.
 * VPs selected are marked "thinking"
 */

static int node_spec(

  char *spec,        /* I */
  int   early,       /* I (boolean) */
  int   exactmatch,  /* I (boolean) - NOT USED */
  char *ProcBMStr,   /* I */
  char *FailNode,    /* O (optional,minsize=1024) */
  char *EMsg)        /* O (optional,minsize=1024) */

  {
  static char id[] = "node_spec";

  struct pbsnode *pnode;
  struct pbssubn *snp;
  char *str, *globs, *cp, *hold;
  int i, num;
  int rv;
  static char shared[] = "shared";

  extern int PNodeStateToString(int, char *, int);

  if (EMsg != NULL)
    EMsg[0] = '\0';

  if (FailNode != NULL)
    FailNode[0] = '\0';

  if (LOGLEVEL >= 6)
    {
    sprintf(log_buffer, "entered spec=%.4000s",
            spec);

    log_record(
      PBSEVENT_SCHED,
      PBS_EVENTCLASS_REQUEST,
      id,
      log_buffer);

    DBPRT(("%s\n", log_buffer));
    }

  exclusive = 1; /* by default, nodes (VPs) are requested exclusively */

  /* work on a private copy - the spec string is rewritten below */

  spec = strdup(spec);

  if (spec == NULL)
    {
    /* FAILURE */

    sprintf(log_buffer,"cannot alloc memory");

    if (LOGLEVEL >= 1)
      {
      log_record(
        PBSEVENT_SCHED,
        PBS_EVENTCLASS_REQUEST,
        id,
        log_buffer);
      }

    if (EMsg != NULL)
      {
      /* NOTE(review): strncpy() does not guarantee NUL-termination if the
       * source is >= 1024 bytes - callers treat EMsg as a C string */

      strncpy(EMsg,log_buffer,1024);
      }

    return(-1);
    }

  /* fold any global modifiers ("...#prop#shared") into every sub-spec */

  if ((globs = strchr(spec, '#')) != NULL)
    {
    *globs++ = '\0';

    /* NOTE(review): strdup() return not checked - NULL deref below on OOM */

    globs = strdup(globs);

    while ((cp = strrchr(globs, '#')) != NULL)
      {
      *cp++ = '\0';

      if (strcmp(cp, shared) != 0)
        {
        hold = mod_spec(spec, cp);

        free(spec);

        spec = hold;
        }
      else
        {
        exclusive = 0;
        }
      }

    if (strcmp(globs, shared) != 0)
      {
      hold = mod_spec(spec, globs);

      free(spec);

      spec = hold;
      }
    else
      {
      exclusive = 0;
      }

    free(globs);
    }  /* END if ((globs = strchr(spec,'#')) != NULL) */

  str = spec;

  num = ctnodes(str);

#ifndef CRAY_MOAB_PASSTHRU

  if (num > svr_clnodes)
    {
    /* FAILURE */

    free(spec);

    sprintf(log_buffer, "job allocation request exceeds available cluster nodes, %d requested, %d available",
            num,
            svr_clnodes);

    if (LOGLEVEL >= 6)
      {
      log_record(
        PBSEVENT_SCHED,
        PBS_EVENTCLASS_REQUEST,
        id,
        log_buffer);
      }

    if (EMsg != NULL)
      {
      strncpy(EMsg, log_buffer, 1024);
      }

    return(-1);
    }

#endif

  if (LOGLEVEL >= 6)
    {
    sprintf(log_buffer, "job allocation debug: %d requested, %d svr_clnodes, %d svr_totnodes",
            num,
            svr_clnodes,
            svr_totnodes);

    log_record(
      PBSEVENT_SCHED,
      PBS_EVENTCLASS_REQUEST,
      id,
      log_buffer);

    DBPRT(("%s\n", log_buffer));
    }

  /*
   * if SRV_ATR_NodePack set (true or false), then
   * sort nodes by state, number of VPs and number of attributes;
   * otherwise, leave unsorted
   */

  if (server.sv_attr[(int)SRV_ATR_NodePack].at_flags & ATR_VFLAG_SET)
    {
    qsort(pbsndlist, svr_totnodes, sizeof(struct pbsnode *), nodecmp);
    }

  /* reset subnodes (VPs) to okay */

  svr_numnodes = 0;
  svr_numcfgnodes = 0;

  for (i = 0;i < svr_totnodes;i++)
    {
    pnode = pbsndlist[i];

    if (pnode->nd_state & INUSE_DELETED)
      continue;

#ifdef GEOMETRY_REQUESTS
    /* must be dedicated node use for cpusets */

    if (IS_VALID_STR(ProcBMStr))
      {
      if (pnode->nd_state != INUSE_FREE)
        continue;

      if (node_satisfies_request(pnode,ProcBMStr) == FALSE)
        continue;
      }
#endif /* GEOMETRY_REQUESTS */

    if (LOGLEVEL >= 6)
      {
      DBPRT(("%s: %s nsn %d, nsnfree %d, nsnshared %d\n",
             id,
             pnode->nd_name,
             pnode->nd_nsn,
             pnode->nd_nsnfree,
             pnode->nd_nsnshared))
      }

    pnode->nd_flag = okay;
    pnode->nd_needed = 0;

    for (snp = pnode->nd_psn;snp != NULL;snp = snp->next)
      {
      snp->flag = okay;

      if (LOGLEVEL >= 6)
        {
        DBPRT(("%s: %s/%d inuse 0x%x nprops %d\n",
               id,
               pnode->nd_name,
               snp->index,
               snp->inuse,
               pnode->nd_nprops))
        }
      }

    if (pnode->nd_ntype == NTYPE_CLUSTER)
      {
      /* configured node located */

      svr_numcfgnodes++;

      if ((pnode->nd_state & (INUSE_OFFLINE | INUSE_DOWN | INUSE_RESERVE | INUSE_JOB)) == 0)
        {
        /* NOTE:  checking if node is not just up, but free */

        /* available node located */

        svr_numnodes++;
        }
      }
    }    /* END for (i = 0) */

  /*
   * Make first pass at finding nodes to allocate.
   * process each subspec (piece between '+'s)
   */

  for (i = 1;;i++)
    {
    if ((rv = listelem(&str, i)) <= 0)
      {
      free(spec);

      return(rv);
      }

    if (*str != '+')
      break;

    str++;
    }  /* END for (i) */

  i = (int) * str;

  free(spec);

  if (i != 0) /* garbled list */
    {
    /* FAILURE */

    sprintf(log_buffer, "job allocation request is corrupt");

    if (LOGLEVEL >= 6)
      {
      log_record(
        PBSEVENT_SCHED,
        PBS_EVENTCLASS_REQUEST,
        id,
        log_buffer);
      }

    if (EMsg != NULL)
      {
      strncpy(EMsg, log_buffer, 1024);
      }

    return(-1);
    }

#ifndef CRAY_MOAB_PASSTHRU

  if ((num > svr_numnodes) && early) /* temp fail, not available */
    {
    /* FAILURE */

    sprintf(log_buffer, "job allocation request exceeds currently available cluster nodes, %d requested, %d available",
            num,
            svr_numnodes);

    if (LOGLEVEL >= 6)
      {
      log_record(
        PBSEVENT_SCHED,
        PBS_EVENTCLASS_REQUEST,
        id,
        log_buffer);
      }

    if (EMsg != NULL)
      {
      strncpy(EMsg, log_buffer, 1024);
      }

    return(0);
    }  /* END if ((num > svr_numnodes) && early) */

#endif

  if (LOGLEVEL >= 6)
    {
    sprintf(log_buffer, "job allocation debug(2): %d requested, %d svr_numnodes",
            num,
            svr_numnodes);

    log_record(
      PBSEVENT_SCHED,
      PBS_EVENTCLASS_REQUEST,
      id,
      log_buffer);

    DBPRT(("%s\n", log_buffer));
    }

  /*
   * At this point we know the spec is legal.
   * Here we find a replacement for any nodes chosen above
   * that are already inuse.
   */

  for (i = 0;i < svr_totnodes;i++)
    {
    pnode = pbsndlist[i];

    if (pnode->nd_state & INUSE_DELETED)
      continue;

    if (pnode->nd_ntype != NTYPE_CLUSTER)
      {
      /* node is ok */

      continue;
      }

    if (pnode->nd_flag != thinking)  /* thinking is global */
      {
      /* node is ok */

      continue;
      }

    if (LOGLEVEL >= 7)
      {
      sprintf(log_buffer, "starting eval gpus on node %s need %d free %d",
              pnode->nd_name,
              pnode->nd_ngpus_needed,
              gpu_count(pnode, TRUE));

      log_ext(-1, id, log_buffer, LOG_DEBUG);
      }

    if (pnode->nd_state == INUSE_FREE)
      {
      if (pnode->nd_needed <= pnode->nd_nsnfree)
        {
        if (pnode->nd_ngpus_needed <= gpu_count(pnode, TRUE))
          {
          /* adequate virtual nodes and gpus available - node is ok */

          if (LOGLEVEL >= 7)
            {
            sprintf(log_buffer, "adequate virtual nodes and gpus available - node is ok");

            log_ext(-1, id, log_buffer, LOG_DEBUG);
            }

          continue;
          }
        }

      if (!exclusive &&
          (pnode->nd_needed < pnode->nd_nsnfree + pnode->nd_nsnshared))
        {
        if (pnode->nd_ngpus_needed <= gpu_count(pnode, TRUE))
          {
          /* shared node - node is ok */

          continue;
          }
        }
      }
    else
      {
      if (!exclusive &&
          (pnode->nd_needed <= pnode->nd_nsnfree + pnode->nd_nsnshared))
        {
        /* shared node - node is ok */

        continue;
        }
      }

    /* otherwise find replacement node */

    /* Ben Webb search patch applied (CRI 10/03/03) */

    pnode->nd_flag = okay;

    if (search(
          pnode->nd_first,
          pnode->nd_needed,
          pnode->nd_ngpus_needed,
          (exclusive != 0) ? SKIP_ANYINUSE : SKIP_EXCLUSIVE,
          pnode->nd_order,
          0))
      {
      /* node is ok */

      continue;
      }

    if (early != 0)
      {
      /* FAILURE */

      if (LOGLEVEL >= 7)
        {
        sprintf(log_buffer, "failure early");

        log_ext(-1, id, log_buffer, LOG_DEBUG);

        DBPRT(("%s\n", log_buffer));
        }

      /* specified node not available and replacement cannot be located */

      if (pnode->nd_needed > pnode->nd_nsnfree)
        {
        char JobList[1024];

        struct pbssubn *np;
        struct jobinfo *jp;

        char *BPtr;
        int   BSpace;

        int   nindex;

        JobList[0] = '\0';

        BPtr = JobList;
        BSpace = sizeof(JobList);

        /* scheduler and pbs_server disagree on np availability - report current allocation */

        /* show allocating jobs */

        /* examine all subnodes in node */

        nindex = 0;

        for (np = pnode->nd_psn;np != NULL;np = np->next)
          {
          /* examine all jobs allocated to subnode */

          for (jp = np->jobs;jp != NULL;jp = jp->next)
            {
            MSNPrintF(&BPtr, &BSpace, "%s%s:%d",
                      (JobList[0] != '\0') ? "," : "",
                      (jp->job != NULL) ? jp->job->ji_qs.ji_jobid : "???",
                      nindex);
            }

          nindex++;
          }  /* END for (np) */

        snprintf(log_buffer, sizeof(log_buffer), "cannot allocate node '%s' to job - node not currently available (nps needed/free: %d/%d, gpus needed/free: %d/%d, joblist: %s)",
                 pnode->nd_name,
                 pnode->nd_needed,
                 pnode->nd_nsnfree,
                 pnode->nd_ngpus_needed,
                 gpu_count(pnode, TRUE),
                 JobList);

#ifdef BROKENVNODECHECKS
        /* NOTE:  hack - should be moved to update node state */

        if (JobList[0] == '\0')
          {
          pnode->nd_nsnfree = pnode->nd_nsn;
          pnode->nd_ngpus_free = pnode->nd_ngpus;
          }
#endif
        }
      else
        {
        char NodeState[1024];

        PNodeStateToString(pnode->nd_state, NodeState, sizeof(NodeState));

        sprintf(log_buffer, "cannot allocate node '%s' to job - node not currently available (state: %s)",
                pnode->nd_name,
                NodeState);
        }

      if (LOGLEVEL >= 6)
        {
        log_record(
          PBSEVENT_SCHED,
          PBS_EVENTCLASS_REQUEST,
          id,
          log_buffer);
        }

      if (EMsg != NULL)
        {
        strncpy(EMsg, log_buffer, 1024);
        }

      if (FailNode != NULL)
        strncpy(FailNode, pnode->nd_name, 1024);

      return(0);
      }  /* END if (early != 0) */

    num = 0;
    }  /* END for (i) */

  /* SUCCESS - spec is ok */

  if (LOGLEVEL >= 6)
    {
    sprintf(log_buffer, "job allocation debug(3): returning %d requested",
            num);

    log_record(
      PBSEVENT_SCHED,
      PBS_EVENTCLASS_REQUEST,
      id,
      log_buffer);

    DBPRT(("%s\n", log_buffer));
    }

  return(num);
  }  /* END node_spec() */




#ifdef GEOMETRY_REQUESTS

/**
 * get_bitmap
 *
 * @param pjob (I) - the job whose bitmap is to be retrieved
 * @param ProcBMSize (I) - the size of the string ProcBMPtr points to
 * @param ProcBMPtr (O) - the ptr to the string where the bitmap will be stored
 * @return FAILURE if there is no specified bitmap or either pjob or ProcBMPtr are NULL
 * @return SUCCESS otherwise
 */

int get_bitmap(

  job  *pjob,        /* I */
  int   ProcBMSize,  /* I */
  char *ProcBMPtr)   /* O */

  {
  resource     *presc;
  resource_def *prd;

  char LocalBM[MAX_BM];

  if ((pjob == NULL) || (ProcBMPtr == NULL))
    {
    return(FAILURE);
    }

  LocalBM[0] = '\0';

  /* read the bitmap from the resource list */

  prd = find_resc_def(svr_resc_def,"procs_bitmap",svr_resc_size);

  presc = find_resc_entry(&pjob->ji_wattr[(int)JOB_ATR_resource],prd);

  if ((presc != NULL) && (presc->rs_value.at_flags & ATR_VFLAG_SET))
    {
    snprintf(LocalBM,sizeof(LocalBM),"%s",presc->rs_value.at_val.at_str);
    }
  else
    {
    /* fail if there was no bitmap given */

    return(FAILURE);
    }

  if (LocalBM[0] == '\0')
    {
    /* fail if there was no bitmap given */

    return(FAILURE);
    }
  else
    {
    /* NOTE(review): the copy below is bounded by sizeof(LocalBM) (MAX_BM),
     * not by the caller-supplied ProcBMSize - if the caller's buffer is
     * smaller than MAX_BM this can overflow it; should use ProcBMSize */

    snprintf(ProcBMPtr,sizeof(LocalBM),"%s",LocalBM);

    return(SUCCESS);
    }
  }  /* end get_bitmap() */




/**
 * node_satisfies_request
 *
 * @param pnode (I) - the node to check for validity
 * @param ProcBMStr (I) - the bitmap of procs requested
 * @return TRUE - if the node satisfies the bitmap, FALSE otherwise
 * @return BM_ERROR if the bitmap isn't valid
 */

int node_satisfies_request(

  struct pbsnode *pnode,      /* I */
  char           *ProcBMStr)  /* I */

  {
  int BMLen;
  int BMIndex;

  struct pbssubn *snp;

  if (IS_VALID_STR(ProcBMStr) == FALSE)
    return(BM_ERROR);

  /* nodes are exclusive when we're using bitmaps */

  if (pnode->nd_state != INUSE_FREE)
    return(FALSE);

  BMLen = strlen(ProcBMStr);

  /* process in reverse because
     ProcBMStr[0] refers to core index 0 */

  BMIndex = BMLen-1;

  /* check if the requested processors are available on this node */

  for (snp = pnode->nd_psn;snp && BMIndex >= 0;snp = snp->next)
    {
    /* don't check cores that aren't requested */

    if (ProcBMStr[BMIndex--] != '1')
      continue;

    /* cannot use this node, one of the requested cores is busy */

    if (snp->inuse != INUSE_FREE)
      return(FALSE);
    }

  if (BMIndex >= 0)
    {
    /* this means we didn't finish checking the string -
     * the node doesn't have enough processors */

    return(FALSE);
    }

  /* passed all checks, we're good */

  return(TRUE);
  }  /* END node_satisfies_request() */




/**
 * reserve_node
 *
 * @param pnode - node to reserve
 * @param newstate - the state the reserved subnodes transition to
 * @param pjob - the job to be added to the node
 * @param ProcBMStr - the requested-core bitmap (LSB = core 0)
 * @param hlistptr - a pointer to the host list
 */

int reserve_node(

  struct pbsnode  *pnode,      /* I/O */
  short            newstate,   /* I */
  job             *pjob,       /* I */
  char            *ProcBMStr,  /* I */
  struct howl    **hlistptr)   /* O */

  {
  int BMLen;
  int BMIndex;

  struct pbssubn *snp;

  if ((pnode == NULL) || (pjob == NULL) || (hlistptr == NULL))
    {
    return(FAILURE);
    }

  BMLen = strlen(ProcBMStr);
  BMIndex = BMLen-1;

  /* now reserve each node */

  for (snp = pnode->nd_psn;snp && BMIndex >= 0;snp = snp->next)
    {
    /* ignore unrequested cores */

    if (ProcBMStr[BMIndex--] != '1')
      continue;

    add_job_to_node(pnode,snp,INUSE_JOB,pjob,exclusive);

    build_host_list(hlistptr,snp,pnode);
    }

  /* mark the node as exclusive */

  pnode->nd_state = INUSE_JOB;

  return(SUCCESS);
  }

#endif /* GEOMETRY_REQUESTS */




/**
 * adds this job to the node's list of jobs
 * checks to be sure not to add duplicates
 *
 * conditionally updates the subnode's state
 * decrements the amount of needed nodes
 *
 * @param pnode - the node that the job is running on
 * @param snp - the subnode (processor) that the job is running on
 * @param newstate - the state nodes are transitioning to when used
 * @param pjob - the job that is going to be run
 * @param exclusive - TRUE if jobs are given exclusive node use, FALSE otherwise
 */

int add_job_to_node(

  struct pbsnode *pnode,      /* I/O */
  struct pbssubn *snp,        /* I/O */
  short           newstate,   /* I */
  job            *pjob,       /* I */
  int             exclusive)  /* I */

  {
  char *id = "add_job_to_node";

  struct jobinfo *jp;

  /* NOTE:  search existing job array.  add job only if job not already in place */

  if (LOGLEVEL >= 5)
    {
    sprintf(log_buffer, "allocated node %s/%d to job %s (nsnfree=%d)",
            pnode->nd_name,
            snp->index,
            pjob->ji_qs.ji_jobid,
            pnode->nd_nsnfree);

    log_record(
      PBSEVENT_SCHED,
      PBS_EVENTCLASS_REQUEST,
      id,
      log_buffer);

    DBPRT(("%s\n", log_buffer));
    }

  for (jp = snp->jobs;jp != NULL;jp = jp->next)
    {
    if (jp->job == pjob)
      break;
    }

  if (jp == NULL)
    {
    /* add job to front of subnode job array */

    /* NOTE(review): malloc() return not checked - NULL deref on OOM */

    jp = (struct jobinfo *)malloc(sizeof(struct jobinfo));

    jp->next = snp->jobs;

    snp->jobs = jp;

    jp->job = pjob;

    pnode->nd_nsnfree--;  /* reduce free count */

    /* if no free VPs, set node state */

    if (pnode->nd_nsnfree <= 0)
      pnode->nd_state = newstate;

    if (snp->inuse == INUSE_FREE)
      {
      snp->inuse = newstate;

      if (!exclusive)
        pnode->nd_nsnshared++;
      }
    }

  /* decrement the amount of nodes needed */

  --pnode->nd_needed;

  return(SUCCESS);
  }  /* END add_job_to_node() */




/*
 * add_job_to_gpu_subnode - attach a job to a gpu subnode and decrement the
 * node's count of gpus still needed for the current allocation pass.
 */

int add_job_to_gpu_subnode(

  struct pbsnode *pnode,  /* I/O */
  struct gpusubn *gn,     /* I/O */
  job            *pjob)   /* I */

  {
#ifdef NVIDIA_GPUS
  if (!pnode->nd_gpus_real)
#endif  /* NVIDIA_GPUS */
    {
    /* update the gpu subnode */

    gn->pjob = pjob;
    gn->inuse = TRUE;

    /* update the main node */

    pnode->nd_ngpus_free--;
    }

  /* this seems to be a temporary variable needed for node selection */

  pnode->nd_ngpus_needed--;

  return(PBSE_NONE);
  }  /* END add_job_to_gpu_subnode() */




/**
 * builds the host list (hlist), keeping it ordered by nd_order
 *
 * @param hlistptr - the host list being built
 * @param snp - the subnode being added to the host list
 * @param pnode - the node being added to the host list
 */

int build_host_list(

  struct howl   **hlistptr,  /* O */
  struct pbssubn *snp,       /* I */
  struct pbsnode *pnode)     /* I */

  {
  struct howl *curr;
  struct howl *prev;
  struct howl *hp;

  /* initialize the pointers */

  /* NOTE(review): malloc() return not checked - NULL deref on OOM */

  curr = (struct howl *)malloc(sizeof(struct howl));
  curr->order = pnode->nd_order;
  curr->name  = pnode->nd_name;
  curr->index = snp->index;

  /* find the proper place in the list */

  for (prev = NULL, hp = *hlistptr;hp;prev = hp, hp = hp->next)
    {
    if (curr->order <= hp->order)
      break;
    }  /* END for (prev) */

  /* set the correct pointers in the list */

  curr->next = hp;

  if (prev == NULL)
    *hlistptr = curr;
  else
    prev->next = curr;

  return(SUCCESS);
  }  /* END build_host_list() */




/*
 * add_gpu_to_hostlist - like build_host_list() but for a gpu subnode;
 * synthesizes a "<nodename>-gpu" entry name (heap-allocated).
 */

int add_gpu_to_hostlist(

  struct howl   **hlistptr,  /* O */
  struct gpusubn *gn,        /* I */
  struct pbsnode *pnode)     /* I */

  {
  struct howl *curr;
  struct howl *prev;
  struct howl *hp;
  char        *gpu_name;
  static char *gpu = "gpu";

  /* create gpu_name */

  /* NOTE(review): neither malloc() below is checked for NULL */

  gpu_name = malloc(strlen(pnode->nd_name) + strlen(gpu) + 2);

  sprintf(gpu_name, "%s-%s",
          pnode->nd_name,
          gpu);

  /* initialize the pointers */

  curr = (struct howl *)malloc(sizeof(struct howl));
  curr->order = pnode->nd_order;
  curr->name  = gpu_name;
  curr->index = gn->index;

  /* find the proper place in the list */

  for (prev = NULL, hp = *hlistptr;hp;prev = hp, hp = hp->next)
    {
    if (curr->order <= hp->order)
      break;
    }  /* END for (prev) */

  /* set the correct pointers in the list */

  curr->next = hp;

  if (prev == NULL)
    *hlistptr = curr;
  else
    prev->next = curr;

  return(SUCCESS);
  }  /* END add_gpu_to_hostlist() */




/*
 * set_nodes() - Call node_spec() to allocate nodes then set them inuse.
 * Build list of allocated nodes to pass back in rtnlist.
 * Return: PBS error code
 */

int set_nodes(

  job   *pjob,      /* I */
  char  *spec,      /* I */
  int    procs,     /* I */
  char **rtnlist,   /* O */
  char  *FailHost,  /* O (optional,minsize=1024) */
  char  *EMsg)      /* O (optional,minsize=1024) */

  {
  struct howl *hp;
  struct howl *hlist;
  struct howl *gpu_list;
  struct howl *nxt;

  int i;
  int j;
  int procs_needed = 0;

  short newstate;

  int NCount;

  static char id[] = "set_nodes";

  struct pbsnode *pnode;
  struct pbssubn *snp;

  char *nodelist;
  char *gpu_str = NULL;

  struct gpusubn *gn;

  char ProcBMStr[MAX_BM];

#ifdef NVIDIA_GPUS
  int gpu_flags = 0;
#endif  /* NVIDIA_GPUS */

  if (FailHost != NULL)
    FailHost[0] = '\0';

  if (EMsg != NULL)
    EMsg[0] = '\0';

  if (LOGLEVEL >= 3)
    {
    sprintf(log_buffer, "allocating nodes for job %s with node expression '%.4000s'",
            pjob->ji_qs.ji_jobid,
            spec);

    log_record(
      PBSEVENT_SCHED,
      PBS_EVENTCLASS_REQUEST,
      id,
      log_buffer);
    }

  ProcBMStr[0] = '\0';

#ifdef GEOMETRY_REQUESTS
  get_bitmap(pjob,sizeof(ProcBMStr),ProcBMStr);
#endif /* GEOMETRY_REQUESTS */

  /* allocate nodes */

  if ((i = node_spec(spec, 1, 1, ProcBMStr, FailHost, EMsg)) == 0) /* check spec */
    {
    /* no resources located, request failed */

    if (EMsg != NULL)
      {
      sprintf(log_buffer, "could not locate requested resources '%.4000s' (node_spec failed) %s",
              spec,
              EMsg);

      log_record(
        PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        pjob->ji_qs.ji_jobid,
        log_buffer);
      }

    return(PBSE_RESCUNAV);
    }

  if (i < 0)
    {
    /* request failed, corrupt request */

    log_err(PBSE_UNKNODE, id, "request failed, corrupt request");

    return(PBSE_UNKNODE);
    }

  /* i indicates number of matching nodes */

  if (exclusive)  /* exclusive is global */
    svr_numnodes -= i;

  hlist = NULL;
  gpu_list = NULL;

  newstate = exclusive ? INUSE_JOB : INUSE_JOBSHARE;

  for (i = 0;i < svr_totnodes;i++)
    {
    pnode = pbsndlist[i];

    if (pnode->nd_state & INUSE_DELETED)
      continue;

    if (pnode->nd_flag != thinking)
      {
      /* node is not considered/eligible for job - see search() */

      /* skip node */

      continue;
      }

    /* within the node, check each subnode */

#ifdef GEOMETRY_REQUESTS
    if (ProcBMStr[0] != '\0')
      {
      /* check node here, a request was given */

      if (node_satisfies_request(pnode,ProcBMStr) == TRUE)
        {
        reserve_node(pnode,newstate,pjob,ProcBMStr,&hlist);
        }

      continue;
      }
#endif /* GEOMETRY_REQUESTS */

    /* place the gpus in the gpulist */

    for (j = 0; j < pnode->nd_ngpus && pnode->nd_ngpus_needed > 0; j++)
      {
      sprintf(log_buffer, "node: %s j %d ngpus %d need %d",
              pnode->nd_name,
              j,
              pnode->nd_ngpus,
              pnode->nd_ngpus_needed);

      if (LOGLEVEL >= 7)
        {
        log_ext(-1, id, log_buffer, LOG_DEBUG);
        }

      DBPRT(("%s\n", log_buffer));

      gn = pnode->nd_gpusn + j;

      /* skip gpus that cannot be assigned to this job */

      if ((gn->state == gpu_unavailable) ||
#ifdef NVIDIA_GPUS
          ((gn->state == gpu_exclusive) && pnode->nd_gpus_real) ||
          (pnode->nd_gpus_real &&
           ((int)gn->mode == gpu_normal) &&
           ((gpu_mode_rqstd != gpu_normal) && (gn->state != gpu_unallocated))) ||
          ((!pnode->nd_gpus_real) && (gn->inuse == TRUE)))
#else
          (gn->inuse == TRUE))
#endif  /* NVIDIA_GPUS */
        continue;

      add_job_to_gpu_subnode(pnode,gn,pjob);

      sprintf(log_buffer, "ADDING gpu %s/%d to exec_gpus still need %d",
              pnode->nd_name,
              j,
              pnode->nd_ngpus_needed);

      if (LOGLEVEL >= 7)
        {
        log_ext(-1, id, log_buffer, LOG_DEBUG);
        }

      DBPRT(("%s\n", log_buffer));

      add_gpu_to_hostlist(&gpu_list,gn,pnode);

#ifdef NVIDIA_GPUS
      /*
       * If this a real gpu in exclusive/single job mode, or a gpu in default
       * mode and the job requested an exclusive mode then we change state
       * to exclusive so we cannot assign another job to it
       */

      if ((pnode->nd_gpus_real) &&
          ((gn->mode == gpu_exclusive_thread) ||
           (gn->mode == gpu_exclusive_process) ||
           ((gn->mode == gpu_normal) &&
            ((gpu_mode_rqstd == gpu_exclusive_thread) ||
             (gpu_mode_rqstd == gpu_exclusive_process)))))
        {
        gn->state = gpu_exclusive;

        sprintf(log_buffer, "Setting gpu %s/%d to state EXCLUSIVE for job %s",
                pnode->nd_name,
                j,
                pjob->ji_qs.ji_jobid);

        if (LOGLEVEL >= 7)
          {
          log_ext(-1, id, log_buffer, LOG_DEBUG);
          }
        }

      /*
       * If this a real gpu in shared/default job mode and the state is
       * unallocated then we change state to shared so only other shared jobs
       * can use it
       */

      if ((pnode->nd_gpus_real) &&
          (gn->mode == gpu_normal) &&
          (gpu_mode_rqstd == gpu_normal) &&
          (gn->state == gpu_unallocated))
        {
        gn->state = gpu_shared;

        sprintf(log_buffer, "Setting gpu %s/%d to state SHARED for job %s",
                pnode->nd_name,
                j,
                pjob->ji_qs.ji_jobid);

        if (LOGLEVEL >= 7)
          {
          log_ext(-1, id, log_buffer, LOG_DEBUG);
          }
        }
#endif  /* NVIDIA_GPUS */
      }

    /* make sure we found gpus to use, if job needed them */

    if (pnode->nd_ngpus_needed > 0)
      {
      /* no resources located, request failed */

      /* NOTE(review): the hlist/gpu_list entries allocated so far are not
       * freed on this early return - memory leak */

      if (EMsg != NULL)
        {
        sprintf(log_buffer, "could not locate requested gpu resources '%.4000s' (node_spec failed) %s",
                spec,
                EMsg);

        log_record(
          PBSEVENT_JOB,
          PBS_EVENTCLASS_JOB,
          pjob->ji_qs.ji_jobid,
          log_buffer);
        }

      return(PBSE_RESCUNAV);
      }

    for (snp = pnode->nd_psn;snp && pnode->nd_needed;snp = snp->next)
      {
      if (exclusive)
        {
        if (snp->inuse != INUSE_FREE)
          continue;
        }
      else
        {
        if ((snp->inuse != INUSE_FREE) &&
            (snp->inuse != INUSE_JOBSHARE))
          continue;
        }

      /* Mark subnode as being IN USE */

      add_job_to_node(pnode,snp,newstate,pjob,exclusive);

      build_host_list(&hlist,snp,pnode);
      }  /* END for (snp) */
    }    /* END for (i) */

  /* did we have a request for procs? Do those now */

  if(procs > 0)
    {
    /* check to see if a -l nodes request was made */

    if(pjob->ji_have_nodes_request)
      {
      procs_needed = procs;
      }
    else
      {
      /* the qsub request used -l procs only.  No -l nodes=x
         was given in the qsub request.  TORQUE allocates 1 node
         by default if a -l nodes specification is not given. */

      if(procs > 1)
        {
        procs_needed = procs - 1;
        }
      else
        procs_needed = 1;
      }

    for (i = 0;i < svr_totnodes;i++)
      {
      pnode = pbsndlist[i];

      if (pnode->nd_state & INUSE_DELETED)
        continue;

      for (snp = pnode->nd_psn;snp && procs_needed > 0;snp = snp->next)
        {
        if(exclusive)
          {
          if(snp->inuse != INUSE_FREE)
            continue;
          }
        else
          {
          if ((snp->inuse != INUSE_FREE) &&
              (snp->inuse != INUSE_JOBSHARE))
            continue;
          }

        /* Mark subnode as being IN USE */

        pnode->nd_needed++; /* we do this because add_job_to_node will decrement it */

        /* We need to set the node to thinking. */

        pnode->nd_flag = thinking;

        add_job_to_node(pnode,snp,newstate,pjob,exclusive);

        build_host_list(&hlist,snp,pnode);

        procs_needed--;
        }  /* END for (snp) */
      }
    }

  if (hlist == NULL)
    {
    if (LOGLEVEL >= 1)
      {
      sprintf(log_buffer, "no nodes can be allocated to job %s",
              pjob->ji_qs.ji_jobid);

      log_record(
        PBSEVENT_SCHED,
        PBS_EVENTCLASS_REQUEST,
        id,
        log_buffer);
      }

    if (EMsg != NULL)
      sprintf(EMsg, "no nodes can be allocated to job");

    return(PBSE_RESCUNAV);
    }  /* END if (hlist == NULL) */

  pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HasNodes;  /* indicate has nodes */

  /* build list of allocated nodes */

  i = 1;  /* first, size list */

  for (hp = hlist;hp != NULL;hp = hp->next)
    {
    i += (strlen(hp->name) + 6);
    }

  nodelist = malloc(++i);

  /* allocate the gpu list */

  i = 1;

  for (hp = gpu_list; hp != NULL; hp = hp->next)
    i += strlen(hp->name) + 6;

  /* NOTE(review): gpu_str allocation failure is silently ignored below
   * (the gpu list is then simply never attached to the job) */

  if ( i > 1)
    gpu_str = malloc(i+1);

  if (nodelist == NULL)
    {
    sprintf(log_buffer, "no nodes can be allocated to job %s - no memory",
            pjob->ji_qs.ji_jobid);

    log_record(
      PBSEVENT_SCHED,
      PBS_EVENTCLASS_REQUEST,
      id,
      log_buffer);

    if (EMsg != NULL)
      sprintf(EMsg,"no nodes can be allocated to job");

    return(PBSE_RESCUNAV);
    }

  *nodelist = '\0';

  /* now copy in name+name+... */

  NCount = 0;

  for (hp = hlist;hp;hp = nxt)
    {
    NCount++;

    sprintf(nodelist + strlen(nodelist), "%s/%d+",
            hp->name,
            hp->index);

    nxt = hp->next;

    free(hp);
    }

  /* now do the same for the gpu_str, if necessary
   * add the gpu_str directly to the job */

  if (gpu_str != NULL)
    {
    gpu_str[0] = '\0';

    for (hp = gpu_list; hp != NULL; hp = nxt)
      {
      sprintf(gpu_str + strlen(gpu_str), "%s/%d+",
              hp->name,
              hp->index);

      nxt = hp->next;

      free(hp);
      }

    /* strip trailing '+' */

    gpu_str[strlen(gpu_str) - 1] = '\0';

    job_attr_def[JOB_ATR_exec_gpus].at_free(
      &pjob->ji_wattr[JOB_ATR_exec_gpus]);

    job_attr_def[JOB_ATR_exec_gpus].at_decode(
      &pjob->ji_wattr[JOB_ATR_exec_gpus],
      NULL,
      NULL,
      gpu_str);  /* O */

    free(gpu_str);
    }

  *(nodelist + strlen(nodelist) - 1) = '\0'; /* strip trailing + */

  *rtnlist = nodelist;

#ifdef NVIDIA_GPUS
  /* if we have exec_gpus then fill in the extra gpu_flags */

  if ((pjob->ji_wattr[JOB_ATR_exec_gpus].at_flags & ATR_VFLAG_SET) != 0)
    {
    if (gpu_mode_rqstd != -1)
      gpu_flags = gpu_mode_rqstd;

    if (gpu_err_reset)
      gpu_flags += 1000;

    if (gpu_flags >= 0)
      {
      pjob->ji_wattr[(int)JOB_ATR_gpu_flags].at_val.at_long = gpu_flags;
      pjob->ji_wattr[(int)JOB_ATR_gpu_flags].at_flags =
        ATR_VFLAG_SET | ATR_VFLAG_MODIFY;

      if (LOGLEVEL >= 7)
        {
        sprintf(log_buffer, "setting gpu_flags for job %s to %d %ld",
                pjob->ji_qs.ji_jobid,
                gpu_flags,
                pjob->ji_wattr[(int)JOB_ATR_gpu_flags].at_val.at_long);

        log_ext(-1, id, log_buffer, LOG_DEBUG);
        }

      job_save(pjob,SAVEJOB_FULL);
      }
    }
#endif  /* NVIDIA_GPUS */

  if (LOGLEVEL >= 3)
    {
    snprintf(log_buffer, sizeof(log_buffer), "job %s allocated %d nodes (nodelist=%.4000s)",
             pjob->ji_qs.ji_jobid,
             NCount,
             nodelist);

    log_record(
      PBSEVENT_SCHED,
      PBS_EVENTCLASS_REQUEST,
      id,
      log_buffer);
    }

  /* SUCCESS */

  return(PBSE_NONE);
  }  /* END set_nodes() */




/* count the number of requested processors in a node spec
 * return processors requested on success
 * return -1 on error */

int procs_requested(char *spec)

  {
  char *id = "procs_requested";

  char *str, *globs, *cp, *hold;
  int num_nodes = 0, num_procs = 0,
      total_procs = 0, num_gpus = 0;
  int i;

  static char shared[] = "shared";

  struct prop *prop = NULL;
  char *tmp_spec;

  /* work on a private copy of the spec */

  tmp_spec = strdup(spec);

  if (tmp_spec == NULL)
    {
    /* FAILURE */

    sprintf(log_buffer,"cannot alloc memory");

    if (LOGLEVEL >= 1)
      {
      log_record(
        PBSEVENT_SCHED,
        PBS_EVENTCLASS_REQUEST,
        id,
        log_buffer);
      }

    return(-2);
    }

  /* Check to see if we have a global modifier */

  if ((globs = strchr(tmp_spec, '#')) != NULL)
    {
    *globs++ = '\0';

    globs = strdup(globs);

    while ((cp = strrchr(globs, '#')) != NULL)
      {
      *cp++ = '\0';

      if (strcmp(cp, shared) != 0)
        {
        /* NOTE(review): this passes the caller's ORIGINAL 'spec' (which
         * still contains the '#...' suffix) to mod_spec() instead of the
         * working copy 'tmp_spec' - compare the same loop in node_spec();
         * this looks like a bug */

        hold = mod_spec(spec, cp);

        free(tmp_spec);

        tmp_spec = hold;
        }
      else
        {
        exclusive = 0;
        }
      }

    if (strcmp(globs, shared) != 0)
      {
      hold = mod_spec(tmp_spec, globs);

      free(tmp_spec);

      tmp_spec = hold;
      }
    else
      {
      exclusive = 0;
      }

    free(globs);
    }  /* END if ((globs = strchr(spec,'#')) != NULL) */

  str = tmp_spec;

  /* walk each '+'-separated subspec, accumulating procs * nodes */

  do
    {
    if((i = number(&str, &num_nodes)) == -1 )
      {
      /* Bad string syntax. Fail */

      /* NOTE(review): tmp_spec (and any prop list) leak on the -1 returns
       * in this loop */

      return(-1);
      }

    if (i == 0)
      {
      /* number exists */

      if (*str == ':')
        {
        /* there are properties */

        str++;

        if (proplist(&str, &prop, &num_procs, &num_gpus))
          {
          return(-1);
          }
        }
      }
    else
      {
      /* no number */

      num_nodes = 1;

      if (proplist(&str, &prop, &num_procs, &num_gpus))
        {
        /* must be a prop list with no number in front */

        return(-1);
        }
      }

    total_procs += num_procs * num_nodes;
    } while(*str++ == '+');

  free(tmp_spec);

  return(total_procs);
  }




/*
 * node_avail_complex -
 *  *navail is set to number available
 *  *nalloc is set to number allocated
 *  *nresvd is set to number reserved
 *  *ndown  is set to number down/offline
 * return -1 on failure
 */

int node_avail_complex(

  char *spec,    /* I - node spec */
  int  *navail,  /* O - number available */
  int  *nalloc,  /* O - number allocated */
  int  *nresvd,  /* O - number reserved */
  int  *ndown)   /* O - number down */

  {
  int holdnum;
  int ret;

  /* save/restore svr_numnodes around node_spec(), which rewrites it */

  holdnum = svr_numnodes;

  ret = node_spec(spec, 1, 0, NULL, NULL, NULL);

  svr_numnodes = holdnum;

  *navail = ret;
  *nalloc = 0;
  *nresvd = 0;
  *ndown  = 0;

  return(ret);
  }  /* END node_avail_complex() */




/*
 * node_avail - report if nodes requested are available
 * Does NOT even consider Time Shared Nodes
 *
 * Return 0 when no error in request and
 *  *navail is set to number available
 *  *nalloc is set to number allocated
 *  *nresvd is set to number reserved
 *  *ndown  is set to number down/offline
 * !=0 error number when error in request
 */

int node_avail(

  char *spec,    /* I - node spec */
  int  *navail,  /* O - number available */
  int  *nalloc,  /* O - number allocated */
  int  *nresvd,  /* O - number reserved */
  int  *ndown)   /* O - number down */

  {
  char *id = "node_avail";

  int i;
  int j;
  int holdnum;

  struct pbsnode *pn;
  struct pbssubn *psn;

  char *pc;

  struct prop *prop = NULL;

  register int xavail;
  register int xalloc;
  register int xresvd;
  register int xdown;

  int node_req = 1;
  int gpu_req = 0;

  /*int gpu_mode = gpu_exclusive_thread;*/

  if (spec == NULL)
    {
    log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, id, "no spec");

    return(RM_ERR_NOPARAM);
    }

  pc = spec;

  if ((strchr(spec, (int)'+') == NULL) && (number(&pc, &holdnum) == 1))
    {
    /* A simple node spec - reply with numbers of available,  */
    /* allocated, reserved, and down nodes that match the     */
    /* the spec, null or simple number means all              */

    xavail = 0;
    xalloc = 0;
    xresvd = 0;
    xdown  = 0;

    /* find number of a specific type of node */

    if (*pc)
      {
      if (proplist(&pc, &prop, &node_req, &gpu_req))
        {
        return(RM_ERR_BADPARAM);
        }
      }

    for (i = 0;i < svr_totnodes;i++)
      {
      pn = pbsndlist[i];

      if (pn->nd_state & INUSE_DELETED)
        continue;

      if ((pn->nd_ntype == NTYPE_CLUSTER) && hasprop(pn, prop))
        {
        if (pn->nd_state & (INUSE_OFFLINE | INUSE_DOWN))
          ++xdown;
        else if (hasppn(pn, node_req, SKIP_ANYINUSE))
          ++xavail;
        else if (hasppn(pn, node_req, SKIP_NONE))
          {
          /* node has enough processors, are they busy or reserved? */

          j = 0;

          for (psn = pn->nd_psn;psn;psn = psn->next)
            {
            if (psn->inuse & INUSE_RESERVE)
              j++;
            }

          if (j >= node_req)
            ++xresvd;
          else
            ++xalloc;
          }
        }
      }    /* END for (i) */

    free_prop(prop);

    *navail = xavail;
    *nalloc = xalloc;
    *nresvd = xresvd;
    *ndown  = xdown;

    return(0);
    }
  else if (number(&pc, &holdnum) == -1)
    {
    /* invalid spec */

    return(RM_ERR_BADPARAM);
    }

  /* not a simple spec - determine if supplied complex */
  /* node spec can be satisfied from avail nodes       */
  /* navail set to >0 if can be satisfied now          */
  /*                0 if not now but possible          */
  /*               -1 if never possible                */

  node_avail_complex(spec, navail, nalloc, nresvd, ndown);

  return(0);
  }  /* END node_avail() */




/*
 * node_reserve - Reserve nodes
 * Cannot reserve Time Shared Nodes
 *
 * Returns: >0 - reservation succeeded, number of nodes reserved
 *           0 - None or partial reservation
 *          -1 - requested reservation impossible
 */

int node_reserve(

  char       *nspec,  /* In     - a node specification */
  resource_t  tag)    /* In/Out - tag for resource if reserved */

  {
  static char id[] = "node_reserve";

  int nrd;

  struct pbsnode *pnode;
  struct pbssubn *snp;

  int ret_val;
  int i;

  DBPRT(("%s: entered\n", id))

  if ((nspec == NULL) || (*nspec == '\0'))
    {
    log_event(
      PBSEVENT_ADMIN,
      PBS_EVENTCLASS_SERVER,
      id,
      "no spec");

    return(-1);
    }

  if ((ret_val = node_spec(nspec, 0, 0, NULL, NULL, NULL)) >= 0)
    {
    /*
    ** Zero or more of the needed Nodes are available to be
    ** reserved.
    */

    for (i = 0;i < svr_totnodes;i++)
      {
      pnode = pbsndlist[i];

      if (pnode->nd_state & INUSE_DELETED)
        continue;

      if (pnode->nd_flag != thinking)
        continue;   /* skip this one */

      nrd = 0;

      for (snp = pnode->nd_psn;snp && pnode->nd_needed;snp = snp->next)
        {
        if (snp->inuse == INUSE_FREE)
          {
          DBPRT(("hold %s/%d\n", pnode->nd_name, snp->index))

          snp->inuse |= INUSE_RESERVE;
          snp->allocto = tag;

          pnode->nd_nsnfree--;  /* in reserve, not reached? */

          --pnode->nd_needed;

          ++nrd;
          }
        }

      if (nrd == pnode->nd_nsn)
        pnode->nd_state = INUSE_RESERVE;
      }
    }
  else
    {
    /* could never satisfy the reservation */

    snprintf(log_buffer, sizeof(log_buffer), "can never reserve %s",
             nspec);

    log_record(
      PBSEVENT_SCHED,
      PBS_EVENTCLASS_REQUEST,
      id,
      log_buffer);
    }

  return(ret_val);
  }  /* END node_reserve() */




/*
 * is_ts_node - does the nodestr specify a single time share node?
 * 0 - yes
 * 1 - no, not a ts node or more than one node (name will not match)
 */

int is_ts_node(

  char *nodestr)  /* I */

  {
  struct pbsnode *np;
  int i;

  for (i = 0;i < svr_totnodes;i++)
    {
    np = pbsndmast[i];

    if (((np->nd_state & INUSE_DELETED) == 0) &&
        (np->nd_ntype == NTYPE_TIMESHARED))
      {
      if (!strcmp(nodestr, np->nd_name))
        {
        return(0);
        }
      }
    }

  return(1);
  }  /* END is_ts_node() */




/*
 * find_ts_node - find first up time-shared node
 *
 * returns name of node or null
 */

char *find_ts_node(void)

  {
  struct pbsnode *np;
  int i;

  for (i = 0;i < svr_totnodes;i++)
    {
    np = pbsndmast[i];

    if ((np->nd_ntype == NTYPE_TIMESHARED) &&
        ((np->nd_state & (INUSE_DOWN | INUSE_DELETED | INUSE_OFFLINE)) == 0))
      {
      return(np->nd_name);
      }
    }

  return(NULL);
  }  /* END find_ts_node() */




/*
 * free_nodes - free nodes allocated to a job
 */

void free_nodes(

  job *pjob)  /* I (modified) */

  {
  static char id[] = "free_nodes";

  struct pbssubn *np;
  struct pbsnode *pnode;
  struct jobinfo *jp, *prev;

  int i;
  int j;

  char *gpu_str = NULL;

#ifdef NVIDIA_GPUS
  char tmp_str[PBS_MAXHOSTNAME + 5];
  char num_str[6];
#endif  /* NVIDIA_GPUS */

  if (LOGLEVEL >= 3)
    {
    sprintf(log_buffer, "freeing nodes for job %s",
            pjob->ji_qs.ji_jobid);

    log_record(
      PBSEVENT_SCHED,
      PBS_EVENTCLASS_REQUEST,
      id,
      log_buffer);
    }

  if ((pjob->ji_wattr[JOB_ATR_exec_gpus].at_flags & ATR_VFLAG_SET) != 0)
    {
    gpu_str = pjob->ji_wattr[JOB_ATR_exec_gpus].at_val.at_str;
    }

  /* examine all nodes in cluster */

  for (i = 0;i < svr_totnodes;i++)
    {
    pnode = pbsndlist[i];

    if (pnode->nd_state & INUSE_DELETED)
      continue;

    if (gpu_str != NULL)
      {
      /* reset gpu nodes */

      for (j = 0; j < pnode->nd_ngpus; j++)
        {
        struct gpusubn *gn = pnode->nd_gpusn + j;

#ifdef NVIDIA_GPUS
        if (pnode->nd_gpus_real)
          {
          /* reset real gpu nodes */

          strcpy (tmp_str, pnode->nd_name);
          strcat (tmp_str, "-gpu/");
          sprintf (num_str, "%d", j);
          strcat (tmp_str, num_str);

          /* look thru the string and see if it has this host and gpuid.
           * exec_gpus string should be in format of
           * <hostname>-gpu/<index>[+<hostname>-gpu/<index>...]
           *
           * if we are using the gpu node exclusively or if shared mode and
           * this is last job assigned to this gpu then set it's state
           * unallocated so its available for a new job. Takes time to get the
           * gpu status report from the moms.
           */

          if (strstr (gpu_str, tmp_str) != NULL)
            {
            if ((gn->mode == gpu_exclusive_thread) ||
                (gn->mode == gpu_exclusive_process) ||
                ((gn->mode == gpu_normal) &&
                 (count_gpu_jobs(pnode->nd_name, j) == 0)))
              {
              gn->state = gpu_unallocated;

              if (LOGLEVEL >= 7)
                {
                sprintf(log_buffer, "freeing node %s gpu %d for job %s",
                        pnode->nd_name,
                        j,
                        pjob->ji_qs.ji_jobid);

                log_record(
                  PBSEVENT_SCHED,
                  PBS_EVENTCLASS_REQUEST,
                  id,
                  log_buffer);
                }
              }
            }
          }
        else
#endif  /* NVIDIA_GPUS */
          {
          if (gn->pjob == pjob)
            {
            gn->inuse = FALSE;
            gn->pjob = NULL;

            pnode->nd_ngpus_free++;
            }
          }
        }
      }

    /* examine all subnodes in node */

    for (np = pnode->nd_psn;np != NULL;np = np->next)
      {
      /* examine all jobs allocated to subnode */

      /* NOTE(review): jp is freed inside this loop while the increment
       * expression reads jp->next - verify the (unseen) remainder of this
       * loop body breaks or restarts before the next iteration */

      for (prev = NULL, jp = np->jobs;jp != NULL;prev = jp, jp = jp->next)
        {
        if (jp->job != pjob)
          continue;

        if (LOGLEVEL >= 4)
          {
          sprintf(log_buffer, "freeing node %s/%d from job %s (nsnfree=%d)",
                  pnode->nd_name,
                  np->index,
                  pjob->ji_qs.ji_jobid,
                  pnode->nd_nsnfree);

          log_record(
            PBSEVENT_SCHED,
            PBS_EVENTCLASS_REQUEST,
            id,
            log_buffer);
          }

        if (prev == NULL)
          np->jobs = jp->next;
        else
          prev->next = jp->next;

        free(jp);

        pnode->nd_nsnfree++; /* up count of free */

        if (LOGLEVEL >= 6)
          {
          sprintf(log_buffer, "increased sub-node free count to %d of %d\n",
                  pnode->nd_nsnfree,
                  pnode->nd_nsn);

          log_record(
            PBSEVENT_SCHED,
            PBS_EVENTCLASS_REQUEST,
            id,
            log_buffer);
          }

        pnode->nd_state &= ~(INUSE_JOB | INUSE_JOBSHARE);

        /* if no jobs are associated
with subnode, mark subnode as free */ if (np->jobs == NULL) { if (np->inuse & INUSE_JOBSHARE) pnode->nd_nsnshared--; /* adjust node state (turn off job/job-exclusive) */ np->inuse &= ~(INUSE_JOB | INUSE_JOBSHARE); } break; } /* END for (prev) */ } /* END for (np) */ } /* END for each node */ pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_HasNodes; return; } /* END free_nodes() */ /* * set_one_old - set a named node as allocated to a job */ static void set_one_old( char *name, job *pjob, int shared) /* how used flag, either INUSE_JOB or INUSE_JOBSHARE */ { int i; int index; struct pbsnode *pnode; struct pbssubn *snp; struct jobinfo *jp; char *pc; if ((pc = strchr(name, (int)'/'))) { index = atoi(pc + 1); *pc = '\0'; } else { index = 0; } for (i = 0;i < svr_totnodes;i++) { pnode = pbsndmast[i]; if (strcmp(name, pnode->nd_name) == 0) { /* Mark node as being IN USE ... */ if (pnode->nd_ntype == NTYPE_CLUSTER) { for (snp = pnode->nd_psn;snp;snp = snp->next) { if (snp->index == index) { snp->inuse = shared; jp = (struct jobinfo *)malloc(sizeof(struct jobinfo)); /* NOTE: should report failure if jp == NULL (NYI) */ if (jp != NULL) { jp->next = snp->jobs; snp->jobs = jp; jp->job = pjob; } if (--pnode->nd_nsnfree <= 0) pnode->nd_state |= shared; return; } } /* END for (snp) */ } } } /* END for (i) */ return; } /* END set_one_old() */ /* * set_old_nodes - set "old" nodes as in use - called from pbsd_init() * when recovering a job in the running state. */ void set_old_nodes( job *pjob) /* I (modified) */ { char *old; char *po; resource *presc; int shared = INUSE_JOB; if ((pbsndmast != NULL) && (pjob->ji_wattr[(int)JOB_ATR_exec_host].at_flags & ATR_VFLAG_SET)) { /* are the nodes being used shared? 
Look in "neednodes" */ presc = find_resc_entry( &pjob->ji_wattr[(int)JOB_ATR_resource], find_resc_def(svr_resc_def, "neednodes", svr_resc_size)); if ((presc != NULL) && (presc->rs_value.at_flags & ATR_VFLAG_SET)) { if ((po = strchr(presc->rs_value.at_val.at_str, '#'))) { if (strstr(++po, "shared") != NULL) shared = INUSE_JOBSHARE; } } old = strdup(pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str); if (old == NULL) { /* FAILURE - cannot alloc memory */ return; } while ((po = strrchr(old, (int)'+')) != NULL) { *po++ = '\0'; set_one_old(po, pjob, shared); } set_one_old(old, pjob, shared); free(old); } /* END if ((pbsndmast != NULL) && ...) */ return; } /* END set_old_nodes() */ /* END node_manager.c */