/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. 
Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
* EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* This license will be governed by the laws of the Commonwealth of Virginia,
* without reference to its choice of law rules.
*/

#include <pbs_config.h>   /* the master config generated by configure */
#include "node_manager.h"

#include <string>
#include <sstream>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
#include <time.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <stdarg.h>
#include <assert.h>
#if defined(NTOHL_NEEDS_ARPA_INET_H) && defined(HAVE_ARPA_INET_H)
#include <arpa/inet.h>
#endif
#include <vector>

#include "portability.h"
#include "libpbs.h"
#include "server_limits.h"
#include "list_link.h"
#include "attribute.h"
#include "resource.h"
#include "server.h"
#include "net_connect.h"
#include "batch_request.h"
#include "work_task.h"
#include "svrfunc.h"
#include "pbs_job.h"
#include "log.h"
#include "../lib/Liblog/pbs_log.h"
#include "../lib/Liblog/log_event.h"
#include "pbs_nodes.h"
#include "dis.h"
#include "dis_init.h"
#include "resmon.h"
#include "mcom.h"
#include "utils.h"
#include "u_tree.h"
#include "threadpool.h"
#include "node_func.h" /* find_nodebyname */
#include "../lib/Libutils/u_lock_ctl.h" /* lock_node, unlock_node */
#include "../lib/Libnet/lib_net.h" /* socket_read_flush */
#include "svr_func.h" /* get_svr_attr_* */
#include "alps_functions.h"
#include "login_nodes.h"
#include "svr_connect.h" /* svr_disconnect_sock */
#include "net_cache.h"
#include "ji_mutex.h"
#include "alps_constants.h"
#include "mutex_mgr.hpp"

/* evaluates true when STR is neither NULL nor the empty string */
#define IS_VALID_STR(STR)  (((STR) != NULL) && ((STR)[0] != '\0'))

extern int LOGLEVEL;

#if !defined(H_ERRNO_DECLARED) && !defined(_AIX)
/*extern int h_errno;*/
#endif

/* node bookkeeping defined by this file */
int svr_totnodes = 0; /* total number nodes defined */
int svr_clnodes  = 0; /* number of cluster nodes */
int svr_chngNodesfile = 0; /* 1 signals want nodes file update */
int gpu_mode_rqstd = -1;  /* default gpu mode requested */
int gpu_err_reset = FALSE;    /* was a gpu errcount reset requested */
/* on server shutdown, (qmgr mods)  */

all_nodes               allnodes;

static int              num_addrnote_tasks = 0; /* number of outstanding send_cluster_addrs tasks */

/* guards num_addrnote_tasks; allocated on first use by setup_notification() */
pthread_mutex_t        *addrnote_mutex = NULL;

/* declared extern here; the definitions are not in this part of the file */
extern int              server_init_type;
extern int              has_nodes;

extern int create_a_gpusubnode(struct pbsnode *);
int        is_gpustat_get(struct pbsnode *np, char **str_ptr);

extern int ctnodes(char *);

extern char            *path_home;
extern char            *path_nodes;
extern char            *path_nodes_new;
extern char            *path_nodestate;
extern char            *path_nodenote;
extern char            *path_nodenote_new;
extern char             server_name[];

extern struct server    server;
extern tlist_head       svr_newnodes;
extern attribute_def    node_attr_def[];   /* node attributes defs */
extern int              SvrNodeCt;
extern struct all_jobs  alljobs;

extern int              multi_mom;

/* modes passed as the 'free' argument of hasppn() and the 'skip' argument of can_reshuffle() */
#define SKIP_NONE       0
#define SKIP_EXCLUSIVE  1
#define SKIP_ANYINUSE   2
#define SKIP_NONE_REUSE 3

#ifndef MAX_BM
#define MAX_BM   64
#endif

/* forward declarations */
int handle_complete_first_time(job *pjob);
int is_compute_node(char *node_id);
int hasprop(struct pbsnode *, struct prop *);
int add_job_to_node(struct pbsnode *,struct pbssubn *,short,job *);
int node_satisfies_request(struct pbsnode *,char *);
int reserve_node(struct pbsnode *,short,job *,char *,struct howl **);
int build_host_list(struct howl **,struct pbssubn *,struct pbsnode *);
int procs_available(int proc_ct);
void check_nodes(struct work_task *);
int gpu_entry_by_id(struct pbsnode *,char *, int);
job *get_job_from_jobinfo(struct jobinfo *,struct
pbsnode *); /* marks a stream as finished being serviced */ pthread_mutex_t *node_state_mutex = NULL; /** ** Modified by Tom Proett <proett@nas.nasa.gov> for PBS. */ AvlTree ipaddrs = NULL; /** * specialized version of tfind for looking in the ipadders tree * @param key - the node we are searching for * @return a pointer to the pbsnode */ struct pbsnode *tfind_addr( const u_long key, uint16_t port, char *job_momname) { struct pbsnode *pn = AVL_find(key,port,ipaddrs); if (pn == NULL) return(NULL); lock_node(pn, __func__, "pn", LOGLEVEL); if (pn->num_node_boards == 0) return(pn); else { char *dash = NULL; char *plus = NULL; char *tmp = job_momname; struct pbsnode *numa = NULL; int index; plus = strchr(tmp,'+'); if (plus != NULL) *plus = '\0'; while ((tmp = strchr(tmp,'-')) != NULL) { dash = tmp; tmp++; } if (dash == NULL) { /* node has numa nodes but no dashes in exec host?? */ log_err(-1, __func__, "Numa node but there's no dash in exec_host?"); return(pn); } index = atoi(dash+1); numa = AVL_find(index, pn->nd_mom_port, pn->node_boards); unlock_node(pn, __func__, "pn->numa", LOGLEVEL); lock_node(numa, __func__, "numa", LOGLEVEL); if (plus != NULL) *plus = '+'; return(numa); } } /* END tfind_addr() */ /* update_node_state - central location for updating node state */ /* NOTE: called each time a node is marked down, each time a MOM reports node */ /* status, and when pbs_server sends hello/cluster_addrs */ void update_node_state( struct pbsnode *np, /* I (modified) */ int newstate) /* I (one of INUSE_*) */ { char log_buf[LOCAL_LOG_BUF_SIZE]; /* No need to do anything if newstate == oldstate */ if (np->nd_state == newstate) return; /* * LOGLEVEL >= 4 logs all state changes * >= 2 logs down->(busy|free) changes * (busy|free)->down changes are always logged */ if (LOGLEVEL >= 4) { sprintf(log_buf, "adjusting state for node %s - state=%d, newstate=%d", (np->nd_name != NULL) ? 
np->nd_name : "NULL", np->nd_state, newstate); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); } log_buf[0] = '\0'; if (newstate & INUSE_DOWN) { if (!(np->nd_state & INUSE_DOWN)) { sprintf(log_buf, "node %s marked down", (np->nd_name != NULL) ? np->nd_name : "NULL"); np->nd_state |= INUSE_DOWN; np->nd_state &= ~INUSE_UNKNOWN; } /* ignoring the obvious possibility of a "down,busy" node */ } /* END if (newstate & INUSE_DOWN) */ else if (newstate & INUSE_BUSY) { if ((!(np->nd_state & INUSE_BUSY) && (LOGLEVEL >= 4)) || ((np->nd_state & INUSE_DOWN) && (LOGLEVEL >= 2))) { sprintf(log_buf, "node %s marked busy", (np->nd_name != NULL) ? np->nd_name : "NULL"); } np->nd_state |= INUSE_BUSY; np->nd_state &= ~INUSE_UNKNOWN; if (np->nd_state & INUSE_DOWN) { np->nd_state &= ~INUSE_DOWN; } } /* END else if (newstate & INUSE_BUSY) */ else if (newstate == INUSE_FREE) { if (((np->nd_state & INUSE_DOWN) && (LOGLEVEL >= 2)) || ((np->nd_state & INUSE_BUSY) && (LOGLEVEL >= 4))) { sprintf(log_buf, "node %s marked free", (np->nd_name != NULL) ? np->nd_name : "NULL"); } np->nd_state &= ~INUSE_BUSY; np->nd_state &= ~INUSE_UNKNOWN; if (np->nd_state & INUSE_DOWN) { np->nd_state &= ~INUSE_DOWN; } } /* END else if (newstate == INUSE_FREE) */ if (newstate & INUSE_UNKNOWN) { np->nd_state |= INUSE_UNKNOWN; } if ((LOGLEVEL >= 2) && (log_buf[0] != '\0')) { log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); } return; } /* END update_node_state() */ int check_node_for_job( struct pbsnode *pnode, char *jobid) { for (int i = 0; i < (int)pnode->nd_job_usages.size(); i++) { job_usage_info *jui = pnode->nd_job_usages[i]; if (!strcmp(jobid, jui->jobid)) return(TRUE); } /* not found */ return(FALSE); } /* END check_node_for_job() */ /* * record_reported_time() * * @pre-cond: vp must be a character pointer to a job id * @post-cond: the job's last reported time is updated. 
This is used to safeguard against * deleting jobs pre-maturely * * @param vp - the jobid of the job * */ void *record_reported_time( void *vp) { char *job_and_node = (char *)vp; if (job_and_node != NULL) { char *jobid = job_and_node; char *node_id; char *colon = strchr(job_and_node, ':'); if (colon != NULL) { *colon = '\0'; node_id = colon + 1; job *pjob = svr_find_job(jobid, TRUE); if (pjob != NULL) { mutex_mgr job_mutex(pjob->ji_mutex, true); if (pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str != NULL) { if (!strncmp(node_id, pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str, strlen(node_id))) pjob->ji_last_reported_time = time(NULL); } } } free(job_and_node); } return(NULL); } /* END record_reported_time() */ /* * is_job_on_node - return TRUE if this jobid is present on pnode */ int is_job_on_node( struct pbsnode *pnode, /* I */ char *jobid) /* I */ { struct pbsnode *numa; int present = FALSE; char *at; int i; if ((at = strchr(jobid, (int)'@')) != NULL) *at = '\0'; /* strip off @server_name */ if (pnode->num_node_boards > 0) { /* check each subnode on each numa node for the job */ for (i = 0; i < pnode->num_node_boards; i++) { numa = AVL_find(i,pnode->nd_mom_port,pnode->node_boards); lock_node(numa, __func__, "before check_node_for_job numa", LOGLEVEL); present = check_node_for_job(pnode,jobid); unlock_node(numa, __func__, "after check_node_for_job numa", LOGLEVEL); /* leave loop if we found the job */ if (present != FALSE) break; } /* END for each numa node */ } else { present = check_node_for_job(pnode, jobid); if (present == TRUE) { char *job_and_node = (char *)calloc(1, strlen(jobid) + 2 + strlen(pnode->nd_name)); sprintf(job_and_node, "%s:%s", jobid, pnode->nd_name); enqueue_threadpool_request(record_reported_time, job_and_node); } } if (at != NULL) *at = '@'; /* restore @server_name */ return(present); } /* END is_job_on_node() */ /* If nodes have similiar names this will make sure the name is an exact match. * Not just found inside another name. * i.e. 
Machines by the name of gpu, gpuati, gpunvidia. If searching for gpu... * List format is similiar to: gpuati+gpu/1+gpunvidia/4+gpu/5 */ int node_in_exechostlist( char *node_name, char *node_ehl) { int rc = FALSE; char *cur_pos = node_ehl; char *new_pos = cur_pos; int name_len = strlen(node_name); while (1) { if ((new_pos = strstr(cur_pos, node_name)) == NULL) break; else if (new_pos == node_ehl) { if ((new_pos+name_len == NULL) || (*(new_pos+name_len) == '+') || (*(new_pos+name_len) == '/')) { rc = TRUE; break; } } else if (*(new_pos-1) == '+') { if ((new_pos+name_len == NULL) || (*(new_pos+name_len) == '+') || (*(new_pos+name_len) == '/')) { rc = TRUE; break; } } cur_pos = new_pos+1; } return(rc); } /* END node_in_exechostlist() */ int kill_job_on_mom( char *jobid, struct pbsnode *pnode) { batch_request *preq; int rc = -1; int conn; int local_errno = 0; char log_buf[LOCAL_LOG_BUF_SIZE]; /* job is reported by mom but server has no record of job */ sprintf(log_buf, "stray job %s found on %s", jobid, pnode->nd_name); log_err(-1, __func__, log_buf); conn = svr_connect(pnode->nd_addrs[0], pnode->nd_mom_port, &local_errno, pnode, NULL, ToServerDIS); if (conn >= 0) { if ((preq = alloc_br(PBS_BATCH_SignalJob)) == NULL) { log_err(-1, __func__, "unable to allocate SignalJob request-trouble!"); svr_disconnect(conn); } else { snprintf(preq->rq_ind.rq_signal.rq_jid, sizeof(preq->rq_ind.rq_signal.rq_jid), "%s", jobid); snprintf(preq->rq_ind.rq_signal.rq_signame, sizeof(preq->rq_ind.rq_signal.rq_signame), "SIGKILL"); preq->rq_extra = strdup(SYNC_KILL); unlock_node(pnode, __func__, NULL, LOGLEVEL); rc = issue_Drequest(conn, preq); free_br(preq); lock_node(pnode, __func__, NULL, LOGLEVEL); } } return(rc); } /* END kill_job_on_mom() */ pthread_mutex_t jobsKilledMutex = PTHREAD_MUTEX_INITIALIZER; std::vector<std::string> jobsKilled; #define JOB_SYNC_TIMEOUT 60 //Once a kill job has been sent to a MOM, don't send another for five minutes. 
/* * Delayed task to remove a killed job from the list in * case it needs to be removed again. */ void remove_job_from_already_killed_list(struct work_task *pwt) { std::string *pJobID = (std::string *)pwt->wt_parm1; free(pwt->wt_mutex); free(pwt); if(pJobID == NULL) return; pthread_mutex_lock(&jobsKilledMutex); for(std::vector<std::string>::iterator i = jobsKilled.begin();i != jobsKilled.end();i++) { if((*i).compare(*pJobID) == 0) { jobsKilled.erase(i); if(i == jobsKilled.end()) { break; } } } pthread_mutex_unlock(&jobsKilledMutex); delete pJobID; } /* * If a job is not supposed to be on a node and we have * not sent a kill to that job in the last 5 minutes * then the job should be killed. */ bool job_should_be_killed( char *jobid, struct pbsnode *pnode) { bool should_be_on_node = true; bool should_kill_job = false; job *pjob; if (strstr(jobid, server_name) != NULL) { if ((is_job_on_node(pnode, jobid)) == FALSE) { /* must lock the job before the node */ unlock_node(pnode, __func__, NULL, LOGLEVEL); pjob = svr_find_job(jobid, TRUE); lock_node(pnode, __func__, NULL, LOGLEVEL); if (pjob != NULL) { /* job exists, but doesn't currently have resources assigned * to this node double check the job struct because we * could be in the middle of moving the job around because * of data staging, suspend, or rerun */ mutex_mgr job_mgr(pjob->ji_mutex,true); if (pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str == NULL) { should_be_on_node = false; } else if (node_in_exechostlist(pnode->nd_name, pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str) == FALSE) { should_be_on_node = false; } } else should_be_on_node = false; } } if(!should_be_on_node) { bool jobAlreadyKilled = false; //Job should not be on the node, see if we have already sent a kill for this job. 
pthread_mutex_lock(&jobsKilledMutex); for(std::vector<std::string>::iterator i = jobsKilled.begin();(i != jobsKilled.end())&&(jobAlreadyKilled == false);i++) { if((*i).compare(jobid) == 0) { jobAlreadyKilled = true; } } if(!jobAlreadyKilled) { should_kill_job = true; } pthread_mutex_unlock(&jobsKilledMutex); } return(should_kill_job); } /* END job_should_be_on_node() */ void *finish_job( void *vp) { char *jobid = (char *)vp; job *pjob; if (jobid == NULL) return(NULL); else if ((pjob = svr_find_job(jobid, TRUE)) == NULL) { free(jobid); return(NULL); } mutex_mgr job_mgr(pjob->ji_mutex,true); job_mgr.set_lock_on_exit(false); free(jobid); free_nodes(pjob); svr_setjobstate(pjob, JOB_STATE_COMPLETE, JOB_SUBSTATE_COMPLETE, FALSE); handle_complete_first_time(pjob); return(NULL); } /* END finish_job() */ int remove_jobs_that_have_disappeared( struct pbsnode *pnode, resizable_array *reported_ms_jobs, time_t timestamp) { int iter = -1; char log_buf[LOCAL_LOG_BUF_SIZE]; char *jobid; while ((jobid = (char *)pop_thing(pnode->nd_ms_jobs)) != NULL) { job *pjob; /* locking priority is job before node */ unlock_node(pnode, __func__, NULL, LOGLEVEL); pjob = svr_find_job(jobid, TRUE); lock_node(pnode, __func__, NULL, LOGLEVEL); if (pjob == NULL) { free(jobid); continue; } mutex_mgr job_mgr(pjob->ji_mutex,true); /* 45 seconds is typically the time between intervals for each update * from the mom. Add this in case a stale update is processed and the job * hadn't started at the time the update was sent */ if ((pjob->ji_qs.ji_state >= JOB_STATE_EXITING) || (pjob->ji_qs.ji_substate < JOB_SUBSTATE_RUNNING) || (pjob->ji_wattr[JOB_ATR_start_time].at_val.at_long > timestamp - 45)) { free(jobid); job_mgr.unlock(); continue; } job_mgr.unlock(); /* mom didn't report this job - it has exited unexpectedly */ snprintf(log_buf, sizeof(log_buf), "Server thinks job %s is on node %s, but the mom doesn't. 
Terminating job", jobid, pnode->nd_name); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buf); enqueue_threadpool_request(finish_job, jobid); } /* now replace the old resizable array with the ones currently reported */ while ((jobid = (char *)next_thing(reported_ms_jobs, &iter)) != NULL) insert_thing(pnode->nd_ms_jobs, jobid); return(PBSE_NONE); } /* END remove_jobs_that_have_disappeared() */ /* * match_exact_jobid() */ int match_exact_jobid( const char *jobs, const char *jobid) { char *joblist = strdup(jobs); char *jobidstr = NULL; jobidstr = threadsafe_tokenizer(&joblist, (char *)" "); while (jobidstr != NULL) { if (strcmp(jobid, jobidstr) == 0) return(1); jobidstr = threadsafe_tokenizer(&joblist, " "); } return(0); } /* * sync_node_jobs_with_moms() - remove any jobs in the pbsnode (np) that was not * reported by the mom that it's currently running in its status update. */ void sync_node_jobs_with_moms( struct pbsnode *np, /* I */ const char *jobs_in_mom) /* I */ { std::vector<std::string> jobsRemoveFromNode; int removealljobs = (strlen(jobs_in_mom) == 0); for (int i = 0; i < (int)np->nd_job_usages.size(); i++) { int removejob = 0; job_usage_info *jui = np->nd_job_usages[i]; char *jobid = jui->jobid; if (!removealljobs) { char *p = strstr((char *)jobs_in_mom, jobid); /* job is in the node but not in mom */ if (!p) removejob = 1; else if (!(match_exact_jobid(jobs_in_mom, jobid))) removejob = 1; } if (removejob || removealljobs) { unlock_node(np, __func__, NULL, LOGLEVEL); job *pjob = svr_find_job(jobid, TRUE); lock_node(np, __func__, NULL, LOGLEVEL); if (pjob) unlock_ji_mutex(pjob, __func__, NULL, LOGLEVEL); else jobsRemoveFromNode.push_back(std::string (jobid)); } } char log_buf[LOCAL_LOG_BUF_SIZE + 1]; for (std::vector<std::string>::iterator it = jobsRemoveFromNode.begin(); it != jobsRemoveFromNode.end(); it++) { snprintf(log_buf, sizeof(log_buf), "Job %s was not reported in %s update status. 
Freeing job from node.", (*it).c_str(), np->nd_name); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf); remove_job_from_node(np, (*it).c_str()); } } /* end of sync_node_jobs_with_moms */ /* * sync_node_jobs() - determine if a MOM has a stale job and possibly delete it * * This function is called every time we get a node stat from the pbs_mom. * * NOTE: changed to be processed in a thread so that processing here doesn't hinder * the server's ability to reply to the status * * @see is_stat_get() */ void *sync_node_jobs( void *vp) { struct pbsnode *np; sync_job_info *sji = (sync_job_info *)vp; char *raw_input; char *node_id; char *jobstring_in; char *joblist; char *jobidstr; long job_sync_timeout = JOB_SYNC_TIMEOUT; char *jobs_in_mom = NULL; if (vp == NULL) return(NULL); raw_input = sji->input; /* raw_input's format is: * node name:<JOBID>[ <JOBID>]... */ if ((jobstring_in = strchr(raw_input, ':')) != NULL) { node_id = raw_input; *jobstring_in = '\0'; jobstring_in++; } else { /* bad input */ free(raw_input); free(sji); return(NULL); } if ((np = find_nodebyname(node_id)) == NULL) { free(raw_input); free(sji); return(NULL); } /* FORMAT <JOBID>[ <JOBID>]... */ jobs_in_mom = strdup(jobstring_in); joblist = jobstring_in; jobidstr = threadsafe_tokenizer(&joblist, (char *)" "); get_svr_attr_l(SRV_ATR_job_sync_timeout, &job_sync_timeout); while ((jobidstr != NULL) && (isdigit(*jobidstr)) != FALSE) { if (job_should_be_killed(jobidstr, np)) { if (kill_job_on_mom(jobidstr, np) == PBSE_NONE) { pthread_mutex_lock(&jobsKilledMutex); std::string str(jobidstr); jobsKilled.push_back(str); pthread_mutex_unlock(&jobsKilledMutex); set_task(WORK_Timed, time(NULL) + job_sync_timeout, remove_job_from_already_killed_list, (void *)new std::string(jobidstr), FALSE); } } jobidstr = threadsafe_tokenizer(&joblist, " "); } /* END while ((jobidstr != NULL) && ...) 
*/ /* SUCCESS */ free(raw_input); free(sji); if (jobs_in_mom) sync_node_jobs_with_moms(np, jobs_in_mom); unlock_node(np, __func__, NULL, LOGLEVEL); if (jobs_in_mom) free(jobs_in_mom); return(NULL); } /* END sync_node_jobs() */ /* * setup_notification - Sets up the mechanism for notifying * other members of the server's node * pool that a new node was added manually * via qmgr. Actual notification occurs some * time later through the send_cluster_addrs mechanism */ void setup_notification( char *pname) /* I */ { struct pbsnode *pnode; new_node *nnew; if (pname != NULL) { if ((pnode = find_nodebyname(pname)) != NULL) { /* call it offline until after all nodes get the new ipaddr */ pnode->nd_state |= INUSE_OFFLINE; nnew = (new_node *)calloc(1, sizeof(new_node)); if (nnew == NULL) { unlock_node(pnode, __func__, "nnew == NULL", LOGLEVEL); return; } CLEAR_LINK(nnew->nn_link); nnew->nn_name = strdup(pname); append_link(&svr_newnodes, &nnew->nn_link, nnew); unlock_node(pnode, __func__, "nnew != NULL", LOGLEVEL); } } if (addrnote_mutex == NULL) { addrnote_mutex = (pthread_mutex_t *)calloc(1, sizeof(pthread_mutex_t)); pthread_mutex_init(addrnote_mutex,NULL); } pthread_mutex_lock(addrnote_mutex); num_addrnote_tasks++; pthread_mutex_unlock(addrnote_mutex); return; } /* END setup_notification() */ /* ** reset gpu data in case mom reconnects with changed gpus. ** If we have real gpus, not virtual ones, then clear out gpu_status, ** gpus count and remove gpu subnodes. 
*/ void clear_nvidia_gpus( struct pbsnode *np) /* I */ { pbs_attribute temp; if ((np->nd_gpus_real) && (np->nd_ngpus > 0)) { /* delete gpusubnodes by freeing it */ free(np->nd_gpusn); np->nd_gpusn = NULL; /* reset # of gpus, etc */ np->nd_ngpus = 0; np->nd_ngpus_free = 0; /* unset "gpu_status" node attribute */ memset(&temp, 0, sizeof(temp)); if (decode_arst(&temp, NULL, NULL, NULL, 0)) { log_err(-1, __func__, "clear_nvidia_gpus: cannot initialize attribute\n"); return; } node_gpustatus_list(&temp, np, ATR_ACTION_ALTER); } return; } /* END clear_nvidia_gpus() */ /* EOF on a stream received (either stream or addr must be specified) */ /* mark node down */ /* NOTE: pass in stream = -1 if you wish the stream to be optional */ void stream_eof( int stream, /* I (optional) */ u_long addr, /* I (optional) */ uint16_t port, /* I (optional) */ int ret) /* I (ignored) */ { char log_buf[LOCAL_LOG_BUF_SIZE]; enum conn_type cntype = ToServerDIS; int conn; int my_err = 0; struct pbsnode *np = NULL; if (LOGLEVEL >= 6) { sprintf(log_buf, "stream: %d, addr: %ld, port %d", stream, addr, port); LOG_EVENT(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf); } if (addr != 0) { np = AVL_find(addr, port, ipaddrs); } if (np == NULL) { /* cannot locate node */ return; } /* Before we mark this node down see if we can connect */ lock_node(np, __func__, "parent", LOGLEVEL); conn = svr_connect(addr, port, &my_err, np, NULL, cntype); if(conn >= 0) { unlock_node(np, __func__, "parent", LOGLEVEL); svr_disconnect(conn); return; } sprintf(log_buf, "connection to %s is no longer valid, connection may have been closed remotely, remote service may be down, or message may be corrupt (%s). 
setting node state to down", np->nd_name, dis_emsg[ret]); log_err(-1, __func__, log_buf); /* mark node and all subnodes as down */ if (np->num_node_boards > 0) { int i; struct pbsnode *pnode; for (i = 0; i < np->num_node_boards; i++) { pnode = AVL_find(i,np->nd_mom_port,np->node_boards); lock_node(pnode, __func__, "subs", LOGLEVEL); update_node_state(pnode,INUSE_DOWN); unlock_node(pnode, __func__, "subs", LOGLEVEL); } } else update_node_state(np, INUSE_DOWN); unlock_node(np, __func__, "parent", LOGLEVEL); return; } /* END stream_eof() */ /* * wrapper task that check_nodes places in the thread pool's queue */ void *check_nodes_work( void *vp) { work_task *ptask = (struct work_task *)vp; struct pbsnode *np = NULL; long chk_len = 300; char log_buf[LOCAL_LOG_BUF_SIZE]; time_t time_now = time(NULL); node_iterator iter; /* load min refresh interval */ get_svr_attr_l(SRV_ATR_check_rate, &chk_len); if (LOGLEVEL >= 5) { sprintf(log_buf, "verifying nodes are active (min_refresh = %d seconds)", (int)chk_len); log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, log_buf); } /* evaluate all nodes */ reinitialize_node_iterator(&iter); while ((np = next_node(&allnodes,np,&iter)) != NULL) { if (!(np->nd_state & INUSE_DOWN)) { if (np->nd_lastupdate < (time_now - chk_len)) { if (LOGLEVEL >= 6) { sprintf(log_buf, "node %s not detected in %ld seconds, contacting mom", np->nd_name, (long int)(time_now - np->nd_lastupdate)); log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, log_buf); } if (LOGLEVEL >= 0) { sprintf(log_buf, "node %s not detected in %ld seconds, marking node down", np->nd_name, (long int)(time_now - np->nd_lastupdate)); log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, log_buf); } update_node_state(np, (INUSE_DOWN)); /* The node is up. 
Do not mark the node down, but schedule a check_nodes */ } } } /* END for each node */ if (ptask->wt_parm1 == NULL) { set_task(WORK_Timed, time_now + chk_len, check_nodes, (char *)NULL,FALSE); } /* since this is done via threading, we now free the task here */ free(ptask->wt_mutex); free(ptask); return(NULL); } /* check_nodes_work() */ /* * Mark any nodes that haven't checked in as down. * If the node isn't down then it checks to see that the * last update hasn't been too long ago. */ void check_nodes( struct work_task *ptask) /* I (modified) */ { int rc = enqueue_threadpool_request(check_nodes_work,ptask); if (rc) { log_err(rc, __func__, "Unable to enqueue check nodes task into the threadpool"); } } /* END check_nodes() */ void *write_node_state_work( void *vp) { struct pbsnode *np; static char *fmt = (char *)"%s %d\n"; static FILE *nstatef = NULL; int iter = -1; int savemask; pthread_mutex_lock(node_state_mutex); if (LOGLEVEL >= 5) { DBPRT(("write_node_state_work: entered\n")) } /* don't store volatile states like down and unknown */ savemask = INUSE_OFFLINE | INUSE_RESERVE; if (nstatef != NULL) { fseek(nstatef, 0L, SEEK_SET); /* rewind and clear */ if (ftruncate(fileno(nstatef), (off_t)0) != 0) { log_err(errno, __func__, "could not truncate file"); pthread_mutex_unlock(node_state_mutex); return(NULL); } } else { /* need to open for first time, temporary-move to pbsd_init */ if ((nstatef = fopen(path_nodestate, "w+")) == NULL) { log_err(errno, __func__, "could not open file"); pthread_mutex_unlock(node_state_mutex); return(NULL); } } /* ** The only state that carries forward is if the ** node has been marked offline. 
*/ while ((np = next_host(&allnodes,&iter,NULL)) != NULL) { if (np->nd_state & INUSE_OFFLINE) { fprintf(nstatef, fmt, np->nd_name, np->nd_state & savemask); } unlock_node(np, __func__, NULL, LOGLEVEL); } /* END for each node */ if (fflush(nstatef) != 0) { log_err(errno, __func__, "failed saving node state to disk"); } fclose(nstatef); nstatef = NULL; pthread_mutex_unlock(node_state_mutex); return(NULL); } /* END write_node_state_work() */ void write_node_state(void) { int rc = enqueue_threadpool_request(write_node_state_work,NULL); if (rc) { log_err(rc, __func__, "Unable to enqueue write_node_state_work task into the threadpool"); } } /* END write_node_state() */ /* Create a new node_note file then overwrite the previous one. * * The note file could get up to: * (# of nodes) * (2 + MAX_NODE_NAME + MAX_NOTE) bytes in size */ int write_node_note(void) { struct pbsnode *np; int iter = -1; FILE *nin; if (LOGLEVEL >= 2) { DBPRT(("%s: entered\n", __func__)) } if ((nin = fopen(path_nodenote_new, "w")) == NULL) goto err1; if (svr_totnodes == 0) { log_event( PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, "Server has empty nodes list"); fclose(nin); return(-1); } /* for each node ... 
*/ while ((np = next_host(&allnodes, &iter, NULL)) != NULL) { /* write node name followed by its note string */ if ((np->nd_note != NULL) && (np->nd_note[0] != '\0')) { fprintf(nin, "%s %s\n", np->nd_name, np->nd_note); } unlock_node(np, __func__, NULL, LOGLEVEL); } fflush(nin); if (ferror(nin)) { fclose(nin); goto err1; } fclose(nin); if (rename(path_nodenote_new, path_nodenote) != 0) { log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, (char *)"replacing old node note file failed"); return(-1); } return(PBSE_NONE); err1: log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, (char *)"Node note file update failed"); return(-1); } /* END write_node_note() */ /* * free_prop - free list of prop structures created by proplist() */ static void free_prop( struct prop *prop) { struct prop *pp; for (pp = prop;pp != NULL;pp = prop) { prop = pp->next; free(pp->name); free(pp); } /* END for (pp) */ return; } /* END free_prop() */ void *node_unreserve_work( void *vp) { resource_t handle = *((resource_t *)vp); struct pbsnode *np; int iter = -1; /* clear old reserve */ while ((np = next_host(&allnodes,&iter,NULL)) != NULL) { if (handle == RESOURCE_T_ALL) np->nd_np_to_be_used = 0; unlock_node(np, "node_unreserve_work", NULL, LOGLEVEL); } return(NULL); } /* END node_unreserve_work() */ /* * unreserve - unreserve nodes * * If handle is set to a existing resource_t, then release all nodes * associated with that handle, otherwise, (this is dangerous) * if handle == RESOURCE_T_ALL, release all nodes period. */ void node_unreserve( resource_t handle) { int rc = enqueue_threadpool_request(node_unreserve_work,NULL); if (rc) { log_err(rc, __func__, "Unable to enqueue node_unreserve task into the threadpool"); } } /* END node_unreserve() */ /* ** Look through the property list and make sure that all ** those marked are contained in the node. 
*/ int hasprop( struct pbsnode *pnode, struct prop *props) { struct prop *need; for (need = props;need;need = need->next) { struct prop *pp; if (need->mark == 0) /* not marked, skip */ continue; for (pp = pnode->nd_first;pp != NULL;pp = pp->next) { if (strcmp(pp->name, need->name) == 0) break; /* found it */ } if (pp == NULL) { return(0); } } return(1); } /* END hasprop() */ /* * see if node has the number of processors required * if free == SKIP_NONE, check against total number of processors, else * if free != SKIP_NONE, check against number free * * Return 1 if possible, 0 if not */ static int hasppn( struct pbsnode *pnode, /* I */ int node_req, /* I */ int free) /* I */ { if ((free != SKIP_NONE) && (free != SKIP_NONE_REUSE) && (pnode->nd_slots.get_number_free() >= node_req)) { return(1); } if ((free == SKIP_NONE) && (pnode->nd_slots.get_total_execution_slots() >= node_req)) { return(1); } return(0); } /* END hasppn() */ /* ** Count how many gpus are available for use on this node */ static int gpu_count( struct pbsnode *pnode, /* I */ int freeonly) /* I */ { int count = 0; char log_buf[LOCAL_LOG_BUF_SIZE]; if ((pnode->nd_state & INUSE_OFFLINE) || (pnode->nd_state & INUSE_UNKNOWN) || (pnode->nd_state & INUSE_DOWN)) { if (LOGLEVEL >= 7) { sprintf(log_buf, "Counted %d gpus %s on node %s that was skipped", count, (freeonly? "free":"available"), pnode->nd_name); log_ext(-1, __func__, log_buf, LOG_DEBUG); } return (count); } if (pnode->nd_gpus_real) { int j; for (j = 0; j < pnode->nd_ngpus; j++) { struct gpusubn *gn = pnode->nd_gpusn + j; /* always ignore unavailable gpus */ if (gn->state == gpu_unavailable) continue; if (!freeonly) { count++; } else if ((gn->state == gpu_unallocated) || ((gn->state == gpu_shared) && (gpu_mode_rqstd == gpu_normal))) { count++;; } } } else { /* virtual gpus */ if (freeonly) { count = pnode->nd_ngpus_free; } else { count = pnode->nd_ngpus; } } if (LOGLEVEL >= 7) { sprintf(log_buf, "Counted %d gpus %s on node %s", count, (freeonly? 
"free":"available"), pnode->nd_name); log_ext(-1, __func__, log_buf, LOG_DEBUG); } return(count); } /* END gpu_count() */ /* ** get gpu index for this gpuid */ int gpu_entry_by_id( struct pbsnode *pnode, /* I */ char *gpuid, int get_empty) { if (pnode->nd_gpus_real) { int j; for (j = 0; j < pnode->nd_ngpus; j++) { struct gpusubn *gn = pnode->nd_gpusn + j; if ((gn->gpuid != NULL) && (strcmp(gpuid, gn->gpuid) == 0)) { return(j); } } } /* * we did not find the entry. if get_empty is set then look for an empty * slot. If none is found then try to add a new entry to nd_gpusn */ if (get_empty) { int j; for (j = 0; j < pnode->nd_ngpus; j++) { struct gpusubn *gn = pnode->nd_gpusn + j; if (gn->gpuid == NULL) { return(j); } } create_a_gpusubnode(pnode); return (pnode->nd_ngpus - 1); } return (-1); } /* END gpu_entry_by_id() */ /* * checks if a node is ok for to reshuffle * * All parameters are exactly the same as search * @param pnode - the node we're looking at * * @return TRUE if the node is reshuffleable for search's purposes */ int can_reshuffle( struct pbsnode *pnode, struct prop *glorf, int skip, int vpreq, int gpureq, int pass) { char log_buf[LOCAL_LOG_BUF_SIZE]; if (pnode->nd_ntype == NTYPE_CLUSTER) { if (pnode->nd_flag != thinking) { /* only shuffle nodes which have been selected above */ return(FALSE); } if (pnode->nd_state & pass) return(FALSE); if (LOGLEVEL >= 6) { sprintf(log_buf, "search(2): starting eval gpus on node %s need %d(%d) mode %d has %d free %d skip %d", pnode->nd_name, gpureq, pnode->nd_ngpus_needed, gpu_mode_rqstd, pnode->nd_ngpus, gpu_count(pnode, TRUE), skip); log_ext(-1, __func__, log_buf, LOG_DEBUG); } if ((skip == SKIP_EXCLUSIVE) && (vpreq < pnode->nd_slots.get_number_free()) && (gpureq < gpu_count(pnode, TRUE))) return(FALSE); if ((skip == SKIP_ANYINUSE) && (vpreq < pnode->nd_slots.get_number_free()) && (gpureq < gpu_count(pnode, TRUE))) return(FALSE); if (!hasprop(pnode, glorf)) return(FALSE); } else return(FALSE); return(TRUE); } /* 
can_reshuffle() */ /* ** Parse a number in a spec. ** Return 0 if okay, 1 if no number exists, -1 on error */ static int number( char **ptr, int *num) { char holder[80]; int i = 0; char *str = *ptr; char log_buf[LOCAL_LOG_BUF_SIZE]; while (isdigit(*str) && (unsigned int)(i + 1) < sizeof holder) holder[i++] = *str++; if (i == 0) { return(1); } holder[i] = '\0'; if ((i = atoi(holder)) <= 0) { sprintf(log_buf, "zero illegal"); return(-1); } *ptr = str; *num = i; return(0); } /* END number() */ /* ** Check string to see if it is a legal property name. ** If not, return 1. ** *prop set to static char array containing the properity, ** must be copied. */ static int property( char **ptr, char **prop) { char *str = *ptr; char *dest = *prop; int i = 0; char log_buf[LOCAL_LOG_BUF_SIZE]; long cray_enabled = FALSE; get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled); if (!isalpha(*str)) { if ((cray_enabled == FALSE) || (is_compute_node(str) == FALSE)) { sprintf(log_buf, "first character of property (%s) not a letter", str); return(1); } } while (isalnum(*str) || *str == '-' || *str == '.' || *str == '=' || *str == '_') dest[i++] = *str++; dest[i] = '\0'; /* skip over "/vp_number" */ if (*str == '/') { do { str++; } while (isdigit(*str)); } *ptr = str; return(0); } /* END property() */ /* ** Create a property list from a string. ** Return 0 if all is well, 1 otherwise. 
*/ int proplist( char **str, struct prop **plist, int *node_req, int *gpu_req, int *mic_req) { struct prop *pp; char name_storage[80]; char *pname; char *pequal; int have_gpus = FALSE; char log_buf[LOCAL_LOG_BUF_SIZE]; *node_req = 1; /* default to 1 processor per node */ pname = name_storage; *pname = '\0'; for (;;) { if (property(str, &pname)) { return(1); } if (*pname == '\0') break; if ((pequal = strchr(pname, (int)'=')) != NULL) { /* special property */ /* identify the special property and place its value */ /* into node_req */ *pequal = '\0'; if (strcmp(pname, "ppn") == 0) { pequal++; if ((number(&pequal, node_req) != 0) || (*pequal != '\0')) { return(1); } } else if (strcmp(pname, "mics") == 0) { pequal++; if ((number(&pequal, mic_req) != PBSE_NONE) || (*pequal != '\0')) { return(1); } } else if (strcmp(pname, "gpus") == 0) { pequal++; if ((number(&pequal, gpu_req) != 0) || (*pequal != '\0')) { return(1); } have_gpus = TRUE; gpu_err_reset = FALSE; /* default to no */ /* default value if no other gets specified */ gpu_mode_rqstd = gpu_exclusive_thread; } else { return(1); /* not recognized - error */ } } else if (have_gpus && (!strcasecmp(pname, "exclusive_thread"))) { gpu_mode_rqstd = gpu_exclusive_thread; } else if (have_gpus && (!strcasecmp(pname, "exclusive"))) { gpu_mode_rqstd = gpu_exclusive_thread; } else if (have_gpus && (!strcasecmp(pname, "exclusive_process"))) { gpu_mode_rqstd = gpu_exclusive_process; } else if (have_gpus && (!strcasecmp(pname, "default"))) { gpu_mode_rqstd = gpu_normal; } else if (have_gpus && (!strcasecmp(pname, "shared"))) { gpu_mode_rqstd = gpu_normal; } else if (have_gpus && (!strcasecmp(pname, "reseterr"))) { gpu_err_reset = TRUE; } else { pp = (struct prop *)calloc(1, sizeof(struct prop)); pp->mark = 1; pp->name = strdup(pname); pp->next = *plist; *plist = pp; } if ((have_gpus) && (LOGLEVEL >= 7)) { sprintf(log_buf, "proplist: set needed gpu mode to %d", gpu_mode_rqstd); log_ext(-1, __func__, log_buf, LOG_DEBUG); } if (**str 
!= ':') break; (*str)++; } /* END for(;;) */ return(PBSE_NONE); } /* END proplist() */ /* ** Add the "global" spec to every sub-spec in "spec". ** RETURNS: allocated string buffer (must be freed externally) */ static char *mod_spec( char *spec, /* I */ char *global) /* I */ { char *line; char *cp; int len; int nsubspec; nsubspec = 1; for (cp = spec;*cp != '\0';cp++) { if (*cp == '+') { nsubspec++; } } len = strlen(global); line = (char *)calloc(1, nsubspec * (len + 1) + strlen(spec) + 1); if (line == NULL) { /* FAILURE */ return(NULL); } cp = line; while (*spec) { if (*spec == '+') { *cp++ = ':'; strcpy(cp, global); cp += len; } *cp++ = *spec++; } *cp++ = ':'; strcpy(cp, global); return(line); } /* END mod_spec() */ /* * Test a procs specification. * * Return >0 - number of procs counted in the spec if it works, * 0 - if it cannot be satisfied now, * -1 - if it can never be satisfied. * */ int procs_available( int proc_ct) { int iter = -1; int procs_avail = 0; struct pbsnode *pnode; if (proc_ct > svr_clnodes) { /* user requested more processors than are available on the system*/ return(-1); } while ((pnode = next_host(&allnodes,&iter,NULL)) != NULL) { procs_avail += pnode->nd_slots.get_number_free(); unlock_node(pnode, "procs_available", NULL, LOGLEVEL); } if (proc_ct > procs_avail) { return(0); } return(procs_avail); } /* END procs_available() */ int node_is_spec_acceptable( struct pbsnode *pnode, single_spec_data *spec, char *ProcBMStr, int *eligible_nodes) { struct prop *prop = spec->prop; int ppn_req = spec->ppn; int gpu_req = spec->gpu; int mic_req = spec->mic; int gpu_free; int np_free; int mic_free; #ifdef GEOMETRY_REQUESTS if (IS_VALID_STR(ProcBMStr)) { if (pnode->nd_state != INUSE_FREE) return(FALSE); if (node_satisfies_request(pnode, ProcBMStr) == FALSE) return(FALSE); } #endif /* NYI: check if these are necessary */ pnode->nd_flag = okay; if (pnode->nd_ntype != NTYPE_CLUSTER) return(FALSE); /* make sure that the node has properties */ if (hasprop(pnode, 
prop) == FALSE) return(FALSE); if ((hasppn(pnode, ppn_req, SKIP_NONE) == FALSE) || (gpu_count(pnode, FALSE) < gpu_req) || (pnode->nd_nmics < mic_req)) return(FALSE); (*eligible_nodes)++; if ((pnode->nd_state & (INUSE_OFFLINE | INUSE_DOWN | INUSE_RESERVE | INUSE_JOB)) != 0) return(FALSE); gpu_free = gpu_count(pnode, TRUE) - pnode->nd_ngpus_to_be_used; np_free = pnode->nd_slots.get_number_free() - pnode->nd_np_to_be_used; mic_free = pnode->nd_nmics_free - pnode->nd_nmics_to_be_used; if ((ppn_req > np_free) || (gpu_req > gpu_free) || (mic_req > mic_free)) return(FALSE); return(TRUE); } /* END node_is_spec_acceptable() */ int parse_req_data( complete_spec_data *all_reqs) { int i; int j = 0; long cray_enabled = FALSE; single_spec_data *req; get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled); all_reqs->total_nodes = 0; for (i = 0; i < all_reqs->num_reqs; i++) { req = all_reqs->reqs + i; req->nodes = 1; req->gpu = 0; req->ppn = 1; req->prop = NULL; if ((cray_enabled == FALSE) || (is_compute_node(all_reqs->req_start[i]) == FALSE)) { if ((j = number(&(all_reqs->req_start[i]), &(req->nodes))) == -1) return(j); } if (j == 0) { /* there was a number */ if (*(all_reqs->req_start[i]) != '\0') { if (*(all_reqs->req_start[i]) == ':') all_reqs->req_start[i]++; if (proplist(&(all_reqs->req_start[i]), &(req->prop), &(req->ppn), &(req->gpu), &(req->mic))) return(-1); } } else { if (*(all_reqs->req_start[i]) != '\0') { if (proplist(&(all_reqs->req_start[i]), &(req->prop), &(req->ppn), &(req->gpu), &(req->mic))) return(-1); } } all_reqs->total_nodes += req->nodes; } return(PBSE_NONE); } /* END parse_req_data() */ /* * builds the node_job_add_info struct that will be used by set_nodes * instead of looping over different nodes. 
*/ int save_node_for_adding( node_job_add_info *naji, struct pbsnode *pnode, single_spec_data *req, char *first_node_name, int is_external_node, int req_rank) { node_job_add_info *to_add; node_job_add_info *old_next; node_job_add_info *cur_naji; bool first = false; if ((first_node_name[0] != '\0') && (!strcmp(first_node_name, pnode->nd_name))) { pnode->nd_order = 0; first = true; req_rank = 0; } else pnode->nd_order = 1; if (naji->node_name[0] == '\0') { /* first */ snprintf(naji->node_name, sizeof(naji->node_name), "%s", pnode->nd_name); naji->ppn_needed = req->ppn; naji->gpu_needed = req->gpu; naji->mic_needed = req->mic; naji->is_external = is_external_node; naji->req_rank = req_rank; } else { /* second */ if ((to_add = (node_job_add_info *)calloc(1, sizeof(node_job_add_info))) == NULL) { log_err(ENOMEM, __func__, "Cannot allocate memory!"); return(ENOMEM); } if (first == true) { /* move the first element here and place me first */ memcpy(to_add, naji, sizeof(node_job_add_info)); snprintf(naji->node_name, sizeof(naji->node_name), "%s", pnode->nd_name); naji->ppn_needed = req->ppn; naji->gpu_needed = req->gpu; naji->mic_needed = req->mic; naji->is_external = is_external_node; naji->req_rank = req_rank; } else { /* initialize to_add */ snprintf(to_add->node_name, sizeof(to_add->node_name), "%s", pnode->nd_name); to_add->ppn_needed = req->ppn; to_add->gpu_needed = req->gpu; to_add->mic_needed = req->mic; to_add->is_external = is_external_node; to_add->req_rank = req_rank; } /* fix pointers, NOTE: works even if old_next == NULL */ cur_naji = naji; old_next = cur_naji->next; while(old_next != NULL) { if(to_add->req_rank <= old_next->req_rank) { cur_naji->next = to_add; to_add->next = old_next; to_add = NULL; break; } cur_naji = old_next; old_next = cur_naji->next; } if(to_add != NULL) { cur_naji->next = to_add; to_add->next = NULL; } } /* count off the number we have reserved */ pnode->nd_np_to_be_used += req->ppn; pnode->nd_ngpus_to_be_used += req->gpu; 
return(PBSE_NONE); } /* END save_node_for_adding */ /* * if there is a node being requested, the spec should look like * node_name[:ppn=X][+]... * otherwise it should look like: * <NUM_NODES>[:ppn=X][+]... * * If a specific node is being requested first, copy just the * name into first_node_name */ void set_first_node_name( char *spec_param, /* I */ char *first_node_name) /* O */ { int i; int len; if (isdigit(spec_param[0]) == TRUE) { first_node_name[0] = '\0'; } else { len = strlen(spec_param); for (i = 0; i < len; i++) { /* a ':' means you've moved on to ppn and a + means its the next req */ if ((spec_param[i] == ':') || (spec_param[i] == '+') || (spec_param[i] == '|')) break; else first_node_name[i] = spec_param[i]; } /* make sure you NULL terminate */ first_node_name[i] = '\0'; } } /* END set_first_node_name() */ int is_reserved_property( char *prop) { if ((strncmp(prop, "ppn", strlen("ppn")) == 0) || (strncmp(prop, "gpus", strlen("gpus") == 0)) || (strncasecmp(prop, "exclusive_thread", strlen("exclusive_thread")) == 0) || (strncasecmp(prop, "exclusive", strlen("exclusive")) == 0) || (strncasecmp(prop, "exclusive_process", strlen("exclusive_process")) == 0) || (strncasecmp(prop, "default", strlen("default")) == 0) || (strncasecmp(prop, "shared", strlen("shared")) == 0) || (strncasecmp(prop, "reseterr", strlen("reseterr")) == 0)) return(TRUE); else return(FALSE); } /* END is_reserved_property() */ int is_compute_node( char *node_id) { struct pbsnode *pnode; int rc = FALSE; char *colon; char *plus; if ((colon = strchr(node_id, ':')) != NULL) { if ((!strcmp(colon + 1, "external")) || (!strcmp(colon + 1, alps_reporter_feature)) || (!strcmp(colon + 1, alps_starter_feature))) { return(rc); } else *colon = '\0'; } if ((plus = strchr(node_id, '+')) != NULL) *plus = '\0'; if ((pnode = find_nodebyname(node_id)) != NULL) { rc = TRUE; unlock_node(pnode, __func__, NULL, LOGLEVEL); } if (colon != NULL) *colon = ':'; if (plus != NULL) *plus = '+'; return(rc); } /* END 
is_compute_node() */ void release_node_allocation( node_job_add_info *naji) { node_job_add_info *current = NULL; struct pbsnode *pnode = NULL; current = naji; while (current != NULL) { if ((pnode = find_nodebyname(current->node_name)) != NULL) { pnode->nd_np_to_be_used -= current->ppn_needed; pnode->nd_ngpus_to_be_used -= current->gpu_needed; pnode->nd_nmics_to_be_used -= current->mic_needed; unlock_node(pnode, __func__, NULL, LOGLEVEL); } current = current->next; } } /* END release_node_allocation() */ int check_for_node_type( complete_spec_data *all_reqs, enum node_types nt) { single_spec_data *req; int i; int found_type = FALSE; struct pbsnode *pnode; struct pbsnode *reporter = alps_reporter; struct prop *p; if (reporter == NULL) { /* this shouldn't be possible */ log_err(-1, __func__, "Checking for node types with a non-cray enabled pbs_server??"); return(-1); } for (i = 0; i < all_reqs->num_reqs; i++) { req = all_reqs->reqs + i; for (p = req->prop; p != NULL; p = p->next) { if ((!strcmp(p->name, "cray_compute")) || (!strcmp(p->name, alps_starter_feature))) continue; lock_node(reporter, __func__, NULL, LOGLEVEL); pnode = find_node_in_allnodes(&(reporter->alps_subnodes), p->name); unlock_node(reporter, __func__, NULL, LOGLEVEL); if (pnode != NULL) { unlock_node(pnode, __func__, NULL, LOGLEVEL); if (nt == ND_TYPE_CRAY) { found_type = TRUE; break; } } else if (nt != ND_TYPE_CRAY) { int login = FALSE; pnode = find_nodebyname(p->name); if (pnode != NULL) { if (pnode->nd_is_alps_login == TRUE) login = TRUE; unlock_node(pnode, __func__, NULL, LOGLEVEL); if (nt == ND_TYPE_EXTERNAL) { if (login == FALSE) found_type = TRUE; } else if (nt == ND_TYPE_LOGIN) if (login == TRUE) found_type = TRUE; break; } } } if (found_type == TRUE) break; } return(found_type); } /* END check_for_node_type() */ enum job_types find_job_type( complete_spec_data *all_reqs) { enum job_types jt = JOB_TYPE_cray; if (check_for_node_type(all_reqs, ND_TYPE_CRAY) == TRUE) { if 
(check_for_node_type(all_reqs, ND_TYPE_EXTERNAL) == TRUE) jt = JOB_TYPE_heterogeneous; else jt = JOB_TYPE_cray; } else if (check_for_node_type(all_reqs, ND_TYPE_EXTERNAL) == TRUE) { jt = JOB_TYPE_normal; } else if (check_for_node_type(all_reqs, ND_TYPE_LOGIN) == TRUE) jt = JOB_TYPE_login; return(jt); } /* END find_job_type() */ int add_login_node_if_needed( char **first_node_name_ptr, char *login_prop, node_job_add_info *naji) { char *first_node_name = *first_node_name_ptr; struct pbsnode *login = find_nodebyname(first_node_name); int need_to_add_login = FALSE; int rc = PBSE_NONE; int dummy1; int dummy2; int dummy3; struct prop *prop = NULL; single_spec_data req; if (login == NULL) need_to_add_login = TRUE; else { if (login->nd_is_alps_login == FALSE) need_to_add_login = TRUE; unlock_node(login, __func__, NULL, LOGLEVEL); } if (need_to_add_login == TRUE) { if (login_prop != NULL) { proplist(&login_prop, &prop, &dummy1, &dummy2, &dummy3); } if ((login = get_next_login_node(prop)) == NULL) rc = -1; else { if (naji != NULL) { /* add to naji */ req.nodes = 1; req.ppn = 1; req.gpu = 0; req.mic = 0; req.prop = NULL; save_node_for_adding(naji, login, &req, login->nd_name, FALSE, -1); strcpy(*first_node_name_ptr, login->nd_name); } rc = PBSE_NONE; unlock_node(login, __func__, NULL, LOGLEVEL); } if (prop != NULL) free_prop(prop); } return(rc); } /* END add_login_node_if_needed() */ int node_is_external( struct pbsnode *pnode) { int is_external = FALSE; /* all logins have nd_is_alps_login set to true. * all cray computes have their parent pointer set to alps_reporter. * if neither of these are found, it must be an external node */ if ((pnode->nd_is_alps_login == FALSE) && (pnode->parent == NULL)) is_external = TRUE; return(is_external); } /* END node_is_external() */ /* * Test a node specification. * * Return >0 - number of nodes counted in the spec if it works, * 0 - if it cannot be satisfied, * -1 - if it can never be satisfied. * Okay to bail early if "early" is true. 
* VPs selected are marked "thinking" */ int node_spec( char *spec_param, /* I */ int early, /* I (boolean) */ int exactmatch, /* I (boolean) - NOT USED */ char *ProcBMStr, /* I */ char *FailNode, /* O (optional,minsize=1024) */ node_job_add_info *naji, /* O (optional) */ char *EMsg, /* O (optional,minsize=1024) */ char *login_prop, /* I (optional) */ alps_req_data **ard_array, /* O (optional) */ int *num_reqs, /* O (optional) */ enum job_types &job_type) { struct pbsnode *pnode; char first_node_name[PBS_MAXHOSTNAME + 1]; char *first_name_ptr; node_iterator iter; char log_buf[LOCAL_LOG_BUF_SIZE]; char *globs; char *cp; char *hold; int i; int num; int rc; int eligible_nodes = 0; complete_spec_data all_reqs; char *spec; char *plus; long cray_enabled = FALSE; int num_alps_reqs = 0; if (EMsg != NULL) EMsg[0] = '\0'; if (FailNode != NULL) FailNode[0] = '\0'; if (LOGLEVEL >= 6) { sprintf(log_buf, "entered spec=%.4000s", spec_param); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); DBPRT(("%s\n", log_buf)); } job_type = JOB_TYPE_normal; set_first_node_name(spec_param, first_node_name); get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled); spec = strdup(spec_param); if (spec == NULL) { /* FAILURE */ sprintf(log_buf, "cannot alloc memory"); if (LOGLEVEL >= 1) { log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); } if (EMsg != NULL) { snprintf(EMsg, 1024, "%s", log_buf); } return(-1); } if ((globs = strchr(spec, '#')) != NULL) { *globs++ = '\0'; globs = strdup(globs); while ((cp = strrchr(globs, '#')) != NULL) { *cp++ = '\0'; hold = mod_spec(spec, cp); free(spec); spec = hold; } hold = mod_spec(spec, globs); free(spec); spec = hold; free(globs); } /* END if ((globs = strchr(spec,'#')) != NULL) */ all_reqs.num_reqs = 1; plus = spec; /* count number of reqs */ while (*plus != '\0') { if ((*plus == '+') || (*plus == '|')) all_reqs.num_reqs++; plus++; } /* allocate space in all_reqs */ all_reqs.reqs = (single_spec_data 
*)calloc(all_reqs.num_reqs, sizeof(single_spec_data)); all_reqs.req_start = (char **)calloc(all_reqs.num_reqs, sizeof(char *)); if ((all_reqs.reqs == NULL) || (all_reqs.req_start == NULL)) { if (all_reqs.reqs != NULL) free(all_reqs.reqs); else if (all_reqs.req_start != NULL) free(all_reqs.req_start); log_err(ENOMEM, __func__, "Cannot allocate memory!"); free(spec); return(-1); } /* set up pointers for reqs */ plus = spec; i = 0; all_reqs.req_start[i] = spec; i++; while (*plus != '\0') { /* make the '+' NULL and advance past it */ if (*plus == '|') num_alps_reqs++; if ((*plus == '|') || (*plus == '+')) { all_reqs.reqs[i].req_id = num_alps_reqs; *plus = '\0'; plus++; /* advance past "nodes=" */ if (!strncmp(plus, "nodes=", strlen("nodes="))) plus += strlen("nodes="); all_reqs.req_start[i] = plus; i++; } else plus++; } /* now parse each spec into the data */ if ((rc = parse_req_data(&all_reqs)) != PBSE_NONE) { /* FAILURE */ for (i = 0; i < all_reqs.num_reqs; i++) free_prop(all_reqs.reqs[i].prop); free(all_reqs.reqs); free(all_reqs.req_start); free(spec); return(rc); } num = all_reqs.total_nodes; #ifndef CRAY_MOAB_PASSTHRU /* If we restart pbs_server while the cray is down, pbs_server won't know about * the computes. Don't perform this check for this case. 
*/ if ((cray_enabled != TRUE) || (alps_reporter == NULL) || (alps_reporter->alps_subnodes.ra->num != 0)) { if (num > svr_clnodes) { /* FAILURE */ free(spec); sprintf(log_buf, "job allocation request exceeds available cluster nodes, %d requested, %d available", num, svr_clnodes); if (LOGLEVEL >= 6) { log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); } if (EMsg != NULL) { snprintf(EMsg, 1024, "%s", log_buf); } return(-1); } } #endif if (LOGLEVEL >= 6) { sprintf(log_buf, "job allocation debug: %d requested, %d svr_clnodes, %d svr_totnodes", num, svr_clnodes, svr_totnodes); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); DBPRT(("%s\n", log_buf)); } if (cray_enabled == TRUE) { job_type = find_job_type(&all_reqs); first_name_ptr = first_node_name; if ((job_type == JOB_TYPE_cray) || (job_type == JOB_TYPE_heterogeneous)) { /* naji == NULL indicates that this is a qsub not a run command, * we only need to assign the login when the job is run */ if (naji != NULL) { if (add_login_node_if_needed(&first_name_ptr, login_prop, naji) != PBSE_NONE) { snprintf(log_buf, sizeof(log_buf), "Couldn't find an acceptable login node for spec '%s' with feature request '%s'", spec_param, (login_prop != NULL) ? 
login_prop : "null"); log_err(-1, __func__, log_buf); free(spec); return(-1); } } } if ((num_alps_reqs > 0) && (ard_array != NULL) && (job_type == JOB_TYPE_cray)) { *ard_array = (alps_req_data *)calloc(num_alps_reqs + 1, sizeof(alps_req_data)); for (i = 0; i <= num_alps_reqs; i++) (*ard_array)[i].node_list = get_dynamic_string(-1, NULL); *num_reqs = num_alps_reqs + 1; } } reinitialize_node_iterator(&iter); pnode = NULL; /* iterate over all nodes */ while ((pnode = next_node(&allnodes,pnode,&iter)) != NULL) { /* check each req against this node to see if it satisfies it */ for (i = 0; i < all_reqs.num_reqs; i++) { single_spec_data *req = all_reqs.reqs + i; if (req->nodes > 0) { if (node_is_spec_acceptable(pnode, req, ProcBMStr, &eligible_nodes) == TRUE) { if (naji != NULL) { /* for heterogeneous jobs on the cray, record the external * nodes in a separate attribute */ if ((job_type == JOB_TYPE_heterogeneous) && (node_is_external(pnode) == TRUE)) save_node_for_adding(naji, pnode, req, first_node_name, TRUE, i+1); else save_node_for_adding(naji, pnode, req, first_node_name, FALSE, i+1); if ((num_alps_reqs > 0) && (ard_array != NULL) && (*ard_array != NULL)) { if ((*ard_array)[req->req_id].node_list->used != 0) append_char_to_dynamic_string((*ard_array)[req->req_id].node_list, ','); append_dynamic_string((*ard_array)[req->req_id].node_list, pnode->nd_name); if (req->ppn > (*ard_array)[req->req_id].ppn) (*ard_array)[req->req_id].ppn = req->ppn; } } /* decrement needed nodes */ all_reqs.total_nodes--; req->nodes--; /* are all reqs satisfied? */ if (all_reqs.total_nodes == 0) break; } } } /* are all reqs satisfied? 
*/ if (all_reqs.total_nodes == 0) { unlock_node(pnode, __func__, NULL, LOGLEVEL); break; } } /* END for each node */ for (i = 0; i < all_reqs.num_reqs; i++) if (all_reqs.reqs[i].prop != NULL) free_prop(all_reqs.reqs[i].prop); free(all_reqs.reqs); free(all_reqs.req_start); free(spec); /* If we restart pbs_server while the cray is down, pbs_server won't know about * the computes. Don't perform this check for this case. */ if ((cray_enabled != TRUE) || (alps_reporter == NULL) || (alps_reporter->alps_subnodes.ra->num != 0)) { #ifndef CRAY_MOAB_PASSTHRU if (eligible_nodes < num) { if ((SvrNodeCt == 0) || (SvrNodeCt < num)) { /* sufficient eligible nodes do not exist */ /* FAILURE */ sprintf(log_buf, "job requesting nodes that will never be available - spec = %s", spec_param); log_err(-1, __func__, log_buf); if (naji != NULL) release_node_allocation(naji); return(-1); } } #endif } if (all_reqs.total_nodes > 0) { /* nodes not currently available */ /* FAILURE */ sprintf(log_buf, "job allocation request exceeds currently available cluster nodes, %d requested, %d available", num, num - all_reqs.total_nodes); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); if (EMsg != NULL) { snprintf(EMsg, MAXLINE, "%s", log_buf); } if (naji != NULL) release_node_allocation(naji); return(0); } /* END if (all_reqs.total_nodes > 0) */ /* SUCCESS - spec is ok */ if (LOGLEVEL >= 6) { sprintf(log_buf, "job allocation debug(3): returning %d requested", num); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); DBPRT(("%s\n", log_buf)); } return(num); } /* END node_spec() */ #ifdef GEOMETRY_REQUESTS /** * get_bitmap * * @param pjob (I) - the job whose bitmap is be retrieved * @param ProcBMPtr (O) - the ptr to the string where the bitmap will be stored * @param ProcBMSize (I) - the size of the string ProcBMPtr points to * @return FAILURE if there is no specified bitmap or either pjob or ProcBMStrPtr are NULL * @return SUCCESS otherwise */ int get_bitmap( job 
*pjob, /* I */ int ProcBMSize, /* I */ char *ProcBMPtr) /* O */ { resource *presc; resource_def *prd; char LocalBM[MAX_BM]; if ((pjob == NULL) || (ProcBMPtr == NULL)) { return(FAILURE); } LocalBM[0] = '\0'; /* read the bitmap from the resource list */ prd = find_resc_def(svr_resc_def,"procs_bitmap",svr_resc_size); presc = find_resc_entry(&pjob->ji_wattr[JOB_ATR_resource],prd); if ((presc != NULL) && (presc->rs_value.at_flags & ATR_VFLAG_SET)) { snprintf(LocalBM,sizeof(LocalBM),"%s",presc->rs_value.at_val.at_str); } else { /* fail if there was no bitmap given */ return(FAILURE); } if (LocalBM[0] == '\0') { /* fail if there was no bitmap given */ return(FAILURE); } else { snprintf(ProcBMPtr,sizeof(LocalBM),"%s",LocalBM); return(SUCCESS); } } /* end get_bitmap() */ /** * node_satisfies_request * * @param pnode (I) - the node to check for validity * @param ProcBMStr (I) - the bitmap of procs requested * @return TRUE - if the node satisfies the bitmap, FALSE otherwise * @return BM_ERROR if the bitmap isn't valid * * NOTE: must always be called by a thread already locking the pnode's mutex */ int node_satisfies_request( struct pbsnode *pnode, /* I */ char *ProcBMStr) /* I */ { int BMLen; int BMIndex; if (IS_VALID_STR(ProcBMStr) == FALSE) return(BM_ERROR); /* nodes are exclusive when we're using bitmaps */ if (pnode->nd_state != INUSE_FREE) return(FALSE); BMLen = strlen(ProcBMStr); /* process in reverse because ProcBMStr[0] referes to core index 0 */ BMIndex = BMLen-1; /* check if the requested processors are available on this node */ for (int i = 0; i < pnode->nd_slots.get_total_execution_slots() && BMIndex >= 0; i++) { /* don't check cores that aren't requested */ if (ProcBMStr[BMIndex--] != '1') continue; if (pnode->nd_slots.is_occupied(i) == true) return(FALSE); } if (BMIndex >= 0) { /* this means we didn't finish checking the string - * the node doesn't have enough processors */ return(FALSE); } /* passed all checks, we're good */ return(TRUE); } /* END 
node_satisfies_request() */ /** * reserve_node * * @param pnode - node to reserve * @param pjob - the job to be added to the node * @param hlistptr - a pointer to the host list */ job_reservation_info *reserve_node( struct pbsnode *pnode, /* I/O */ job *pjob, /* I */ char *ProcBMStr) /* I */ { if ((pnode == NULL) || (pjob == NULL) || (ProcBMStr == NULL)) { return(NULL); } int BMIndex = strlen(ProcBMStr) - 1; job_reservation_info *node_info = (job_reservation_info *)calloc(1, sizeof(job_reservation_info)); /* now reserve each node */ for (int i = 0; i < pnode->nd_slots.get_total_execution_slots() && BMIndex >= 0; i++) { /* ignore unrequested cores */ if (ProcBMStr[BMIndex--] != '1') continue; pnode->nd_slots.reserve_execution_slot(i, node_info->est); } if (BMIndex >= 0) { /* failure */ free(node_info); return(NULL); } job_usage_info *jui = new job_usage_info(pjob->ji_qs.ji_jobid); jui->est = node_info->est; snprintf(node_info->node_name, sizeof(node_info->node_name), "%s", pnode->nd_name); node_info->port = pnode->nd_mom_rm_port; pnode->nd_job_usages.push_back(jui); /* mark the node as exclusive */ pnode->nd_state = INUSE_JOB; return(node_info); } #endif /* GEOMETRY_REQUESTS */ /** * adds this job to the node's list of jobs * checks to be sure not to add duplicates * * conditionally updates the subnode's state * decrements the amount of needed nodes * * @param pnode - the node that the job is running on * @param nd_psn - the subnode (processor) that the job is running on * @param newstate - the state nodes are transitioning to when used * @param pjob - the job that is going to be run */ int add_job_to_node( struct pbsnode *pnode, /* I/O */ struct pbssubn *snp, /* I/O */ short newstate, /* I */ job *pjob) /* I */ { struct jobinfo *jp; char log_buf[LOCAL_LOG_BUF_SIZE]; /* NOTE: search existing job array. 
add job only if job not already in place */ if (LOGLEVEL >= 5) { sprintf(log_buf, "allocated node %s/%d to job %s (nsnfree=%d)", pnode->nd_name, snp->index, pjob->ji_qs.ji_jobid, pnode->nd_slots.get_number_free()); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); DBPRT(("%s\n", log_buf)); } for (jp = snp->jobs;jp != NULL;jp = jp->next) { if (!(strcmp(jp->jobid, pjob->ji_qs.ji_jobid))) break; } if (jp == NULL) { /* add job to front of subnode job array */ jp = (struct jobinfo *)calloc(1, sizeof(struct jobinfo)); jp->next = snp->jobs; snp->jobs = jp; strcpy(jp->jobid, pjob->ji_qs.ji_jobid); /* if no free VPs, set node state */ if ((pnode->nd_slots.get_number_free() <= 0) || (pjob->ji_wattr[JOB_ATR_node_exclusive].at_val.at_long == TRUE)) pnode->nd_state = newstate; if (snp->inuse == INUSE_FREE) { snp->inuse = newstate; } } /* decrement the amount of nodes needed */ --pnode->nd_np_to_be_used; return(SUCCESS); } /* END add_job_to_node() */ int add_job_to_gpu_subnode( struct pbsnode *pnode, struct gpusubn *gn, job *pjob) { if (!pnode->nd_gpus_real) { /* update the gpu subnode */ strcpy(gn->jobid, pjob->ji_qs.ji_jobid); gn->inuse = TRUE; /* update the main node */ pnode->nd_ngpus_free--; } gn->job_count++; pnode->nd_ngpus_to_be_used--; return(PBSE_NONE); } /* END add_job_to_gpu_subnode() */ int add_job_to_mic( struct pbsnode *pnode, int index, job *pjob) { int rc = -1; if (pnode->nd_micjobs[index].jobid[0] == '\0') { strcpy(pnode->nd_micjobs[index].jobid, pjob->ji_qs.ji_jobid); pnode->nd_nmics_free--; pnode->nd_nmics_to_be_used--; rc = PBSE_NONE; } return(rc); } /* END add_job_to_mic() */ int remove_job_from_nodes_mics( struct pbsnode *pnode, job *pjob) { short i; for (i = 0; i < pnode->nd_nmics; i++) { if (!strcmp(pnode->nd_micjobs[i].jobid, pjob->ji_qs.ji_jobid)) pnode->nd_micjobs[i].jobid[0] = '\0'; } return(PBSE_NONE); } /* END remove_job_from_nodes_mics() */ /** * builds the host list (hlist) * * @param pnode - the node being added to the host 
list * @param hlist - the host list being built */ int build_host_list( struct howl **hlistptr, /* O */ struct pbssubn *snp, /* I */ struct pbsnode *pnode) /* I */ { struct howl *curr; struct howl *prev; struct howl *hp; /* initialize the pointers */ curr = (struct howl *)calloc(1, sizeof(struct howl)); curr->order = pnode->nd_order; curr->name = pnode->nd_name; curr->index = snp->index; curr->port = pnode->nd_mom_rm_port; /* find the proper place in the list */ for (prev = NULL, hp = *hlistptr;hp;prev = hp, hp = hp->next) { if (curr->order <= hp->order) break; } /* END for (prev) */ /* set the correct pointers in the list */ curr->next = hp; if (prev == NULL) *hlistptr = curr; else prev->next = curr; return(SUCCESS); } int add_gpu_to_hostlist( struct howl **hlistptr, struct gpusubn *gn, struct pbsnode *pnode) { struct howl *curr; struct howl *prev; struct howl *hp; char *gpu_name; static char *gpu = (char *)"gpu"; /* create gpu_name */ gpu_name = (char *)calloc(1, strlen(pnode->nd_name) + strlen(gpu) + 2); sprintf(gpu_name, "%s-%s", pnode->nd_name, gpu); /* initialize the pointers */ curr = (struct howl *)calloc(1, sizeof(struct howl)); curr->order = pnode->nd_order; curr->name = gpu_name; curr->index = gn->index; curr->port = pnode->nd_mom_rm_port; /* find the proper place in the list */ for (prev = NULL, hp = *hlistptr;hp;prev = hp, hp = hp->next) { if (curr->order <= hp->order) break; } /* END for (prev) */ /* set the correct pointers in the list */ curr->next = hp; if (prev == NULL) *hlistptr = curr; else prev->next = curr; return(SUCCESS); } /* END add_gpu_to_hostlist() */ /* * checks the gpus of pnode and places them in gpu_list as necessary */ int place_gpus_in_hostlist( struct pbsnode *pnode, job *pjob, node_job_add_info *naji, struct howl **gpu_list) { int j; struct gpusubn *gn; char log_buf[LOCAL_LOG_BUF_SIZE]; /* place the gpus in the hostlist as well */ for (j = 0; j < pnode->nd_ngpus && naji->gpu_needed > 0; j++) { sprintf(log_buf, "node: %s j %d 
ngpus %d need %d", pnode->nd_name, j, pnode->nd_ngpus, pnode->nd_ngpus_needed); if (LOGLEVEL >= 7) { log_ext(-1, __func__, log_buf, LOG_DEBUG); } DBPRT(("%s\n", log_buf)); gn = pnode->nd_gpusn + j; if (pnode->nd_gpus_real) { if ((gn->state == gpu_unavailable) || (gn->state == gpu_exclusive) || ((((int)gn->mode == gpu_normal)) && (gpu_mode_rqstd != gpu_normal) && (gn->state != gpu_unallocated))) continue; } else { if ((gn->state == gpu_unavailable) || (gn->inuse == TRUE)) continue; } if ((gn->state == gpu_unavailable) || ((gn->state == gpu_exclusive) && pnode->nd_gpus_real) || ((pnode->nd_gpus_real) && ((int)gn->mode == gpu_normal) && ((gpu_mode_rqstd != gpu_normal) && (gn->state != gpu_unallocated))) || ((!pnode->nd_gpus_real) && (gn->inuse == TRUE))) continue; add_job_to_gpu_subnode(pnode,gn,pjob); naji->gpu_needed--; sprintf(log_buf, "ADDING gpu %s/%d to exec_gpus still need %d", pnode->nd_name, j, pnode->nd_ngpus_needed); if (LOGLEVEL >= 7) { log_ext(-1, __func__, log_buf, LOG_DEBUG); } DBPRT(("%s\n", log_buf)); add_gpu_to_hostlist(gpu_list,gn,pnode); /* * If this a real gpu in exclusive/single job mode, or a gpu in default * mode and the job requested an exclusive mode then we change state * to exclusive so we cannot assign another job to it */ if ((pnode->nd_gpus_real) && ((gn->mode == gpu_exclusive_thread) || (gn->mode == gpu_exclusive_process) || ((gn->mode == gpu_normal) && ((gpu_mode_rqstd == gpu_exclusive_thread) || (gpu_mode_rqstd == gpu_exclusive_process))))) { gn->state = gpu_exclusive; sprintf(log_buf, "Setting gpu %s/%d to state EXCLUSIVE for job %s", pnode->nd_name, j, pjob->ji_qs.ji_jobid); if (LOGLEVEL >= 7) { log_ext(-1, __func__, log_buf, LOG_DEBUG); } } /* * If this a real gpu in shared/default job mode and the state is * unallocated then we change state to shared so only other shared jobs * can use it */ if ((pnode->nd_gpus_real) && (gn->mode == gpu_normal) && (gpu_mode_rqstd == gpu_normal) && (gn->state == gpu_unallocated)) { gn->state = 
gpu_shared; sprintf(log_buf, "Setting gpu %s/%d to state SHARED for job %s", pnode->nd_name, j, pjob->ji_qs.ji_jobid); if (LOGLEVEL >= 7) { log_ext(-1, __func__, log_buf, LOG_DEBUG); } } } return(PBSE_NONE); } /* END place_gpus_in_hostlist() */ int add_mic_to_list( struct howl **mic_list, struct pbsnode *pnode, int index) { struct howl *curr; struct howl *prev; struct howl *hp; char *name; static char *mic = (char *)"mic"; /* create gpu_name */ name = (char *)calloc(1, strlen(pnode->nd_name) + strlen(mic) + 2); sprintf(name, "%s-%s", pnode->nd_name, mic); /* initialize the pointers */ curr = (struct howl *)calloc(1, sizeof(struct howl)); curr->order = pnode->nd_order; curr->name = name; curr->index = index; curr->port = pnode->nd_mom_rm_port; /* find the proper place in the list */ for (prev = NULL, hp = *mic_list; hp != NULL; prev = hp, hp = hp->next) { if (curr->order <= hp->order) break; } /* END for (prev) */ /* set the correct pointers in the list */ curr->next = hp; if (prev == NULL) *mic_list = curr; else prev->next = curr; return(PBSE_NONE); } /* END add_mic_to_list() */ int place_mics_in_hostlist( struct pbsnode *pnode, job *pjob, node_job_add_info *naji, struct howl **mic_list) { int i; for (i = 0; i < pnode->nd_nmics && naji->mic_needed > 0; i++) { if (add_job_to_mic(pnode, i, pjob) == PBSE_NONE) { naji->mic_needed--; add_mic_to_list(mic_list, pnode, i); } } return(PBSE_NONE); } /* END place_mics_in_hostlist() */ /* * checks the subnodes of pnode and places them in the host list * as necessary */ job_reservation_info *place_subnodes_in_hostlist( job *pjob, struct pbsnode *pnode, node_job_add_info *naji, char *ProcBMStr) { #ifdef GEOMETRY_REQUESTS if (IS_VALID_STR(ProcBMStr)) { job_reservation_info *node_info = reserve_node(pnode, pjob, ProcBMStr); if (node_info != NULL) { // nodes are used exclusively for GEOMETRY_REQUESTS pnode->nd_np_to_be_used = 0; naji->ppn_needed = 0; } return(node_info); } #endif job_reservation_info *node_info = 
(job_reservation_info *)calloc(1, sizeof(job_reservation_info)); if (pnode->nd_slots.reserve_execution_slots(naji->ppn_needed, node_info->est) == PBSE_NONE) { /* SUCCESS */ pnode->nd_np_to_be_used -= naji->ppn_needed; naji->ppn_needed = 0; node_info->port = pnode->nd_mom_rm_port; job_usage_info *jui = new job_usage_info(pjob->ji_qs.ji_jobid); jui->est = node_info->est; snprintf(node_info->node_name, sizeof(node_info->node_name), "%s", pnode->nd_name); pnode->nd_job_usages.push_back(jui); if ((pnode->nd_slots.get_number_free() <= 0) || (pjob->ji_wattr[JOB_ATR_node_exclusive].at_val.at_long == TRUE)) pnode->nd_state |= INUSE_JOB; } else { free(node_info); node_info = NULL; } return(node_info); } /* END place_subnodes_in_hostlist() */ /* * takes a struct howl and translates it to a string that will * become a job pbs_attribute (exec_hosts, exec_gpus, exec_ports) * NOTE: frees list (the struct howl) */ int translate_howl_to_string( struct howl *list, char *EMsg, int *NCount, char **str_ptr, char **portstr_ptr, int port) { struct howl *hp; struct howl *next; size_t len = 1; int count = 1; char *str; char *end; char *portlist = NULL; char *endport; for (hp = list;hp != NULL;hp = hp->next) { len += (strlen(hp->name) + 8); count++; } if ((str = (char *)calloc(1, len + 1)) == NULL) { log_err(ENOMEM, __func__, "Cannot allocate memory!"); if (EMsg != NULL) sprintf(EMsg,"no nodes can be allocated to job"); return(PBSE_RESCUNAV); } *str = '\0'; if (port == TRUE) { /* port list will have a string of sister port addresses */ if ((portlist = (char *)calloc(1, (count * PBS_MAXPORTNUM) + count)) == NULL) { log_err(ENOMEM, __func__, "Cannot allocate memory!"); if (EMsg != NULL) sprintf(EMsg,"no nodes can be allocated to job"); free(str); return(PBSE_RESCUNAV); } *portlist = '\0'; } /* now copy in name+name+... 
*/ *NCount = 0; for (hp = list,end = str,endport = portlist; hp != NULL; hp = next) { (*NCount)++; sprintf(end, "%s/%d+", hp->name, hp->index); end += strlen(end); if (port == TRUE) { sprintf(endport, "%d+", hp->port); endport += strlen(endport); } next = hp->next; free(hp); } /* strip trailing '+' and assign pointers */ str[strlen(str) - 1] = '\0'; *str_ptr = str; if (port == TRUE) { portlist[strlen(portlist) - 1] = '\0'; *portstr_ptr = portlist; } return(PBSE_NONE); } /* END translate_howl_to_string() */ int translate_job_reservation_info_to_string( std::vector<job_reservation_info *> &host_info, int *NCount, std::stringstream &exec_host_output, std::stringstream *exec_port_output) { bool first = true; for (int hi_index = 0; hi_index < (int)host_info.size(); hi_index++) { job_reservation_info *jri = host_info[hi_index]; int jri_index; int jri_iterator = -1; (*NCount)++; while ((jri_index = jri->est.get_next_occupied_index(jri_iterator)) != -1) { if (first == false) { exec_host_output << "+"; if (exec_port_output != NULL) *exec_port_output << "+"; } first = false; exec_host_output << jri->node_name << "/" << jri_index; if (exec_port_output != NULL) *exec_port_output << jri->port; } } return(PBSE_NONE); } /* END translate_job_reservation_info_to_string() */ /* * free the struct that holds the information for where the job * will be placed **/ void free_naji( node_job_add_info *naji) { node_job_add_info *current = NULL; node_job_add_info *tmp = NULL; node_job_add_info *first = naji; current = naji; while (current != NULL) { tmp = current; current = current->next; free(tmp); if (current == first) break; } } /* END free_naji() */ /* * external nodes refers only to nodes outside of the cray * for jobs that also have cray compute nodes */ int record_external_node( job *pjob, struct pbsnode *pnode) { char *external_nodes; unsigned int len; if (pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str == NULL) { pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str = 
strdup(pnode->nd_name); pjob->ji_wattr[JOB_ATR_external_nodes].at_flags |= ATR_VFLAG_SET; } else { len = strlen(pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str) + strlen(pnode->nd_name) + 2; external_nodes = (char *)calloc(1, len); snprintf(external_nodes, len, "%s+%s", pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str, pnode->nd_name); free(pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str); pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str = external_nodes; } return(PBSE_NONE); } /* END record_external_node() */ /* * builds the hostlist based on the nodes=... part of the request */ int build_hostlist_nodes_req( job *pjob, /* M */ char *EMsg, /* O */ char *spec, /* I */ short newstate, /* I */ std::vector<job_reservation_info *> &host_info, /* O */ struct howl **gpu_list, /* O */ struct howl **mic_list, /* O */ node_job_add_info *naji, /* I - freed */ char *ProcBMStr) /* I */ { struct pbsnode *pnode = NULL; node_job_add_info *current; char log_buf[LOCAL_LOG_BUF_SIZE]; bool failure = false; current = naji; while (current != NULL) { if ((pnode = find_nodebyname(current->node_name)) != NULL) { if (failure == true) { /* just remove the marked request from the node */ pnode->nd_np_to_be_used -= current->ppn_needed; pnode->nd_ngpus_to_be_used -= current->gpu_needed; pnode->nd_nmics_to_be_used -= current->mic_needed; } else { job_reservation_info *host_single = place_subnodes_in_hostlist(pjob, pnode, current, ProcBMStr); if (host_single != NULL) { host_info.push_back(host_single); place_gpus_in_hostlist(pnode, pjob, current, gpu_list); place_mics_in_hostlist(pnode, pjob, current, mic_list); if (current->is_external == TRUE) { record_external_node(pjob, pnode); } } /* NOTE: continue through the loop if failure is true just to clean up amounts needed */ if ((naji->gpu_needed > 0) || (naji->ppn_needed > 0) || (naji->mic_needed > 0)) { failure = true; /* remove any remaining things marked on the node */ pnode->nd_np_to_be_used -= current->ppn_needed; 
pnode->nd_ngpus_to_be_used -= current->gpu_needed; pnode->nd_nmics_to_be_used -= current->mic_needed; } } unlock_node(pnode, __func__, NULL, LOGLEVEL); } current = current->next; } /* END processing reserved nodes */ free_naji(naji); if (failure == true) { /* did not satisfy the request */ if (EMsg != NULL) { sprintf(log_buf, "could not locate requested gpu resources '%.4000s' (node_spec failed) %s", spec, EMsg); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buf); } return(PBSE_RESCUNAV); } return(PBSE_NONE); } /* END build_hostlist_nodes_req() */ int build_hostlist_procs_req( job *pjob, /* M */ int procs, /* I */ short newstate, /* I */ std::vector<job_reservation_info *> &host_info) /* O */ { int procs_needed; node_iterator iter; struct pbsnode *pnode = NULL; /* did we have a request for procs? Do those now */ if (procs > 0) { /* check to see if a -l nodes request was made */ if (pjob->ji_have_nodes_request) { procs_needed = procs; } else { /* the qsub request used -l procs only. No -l nodes=x was given in the qsub request. TORQUE allocates 1 node by default if a -l nodes specification is not given. 
*/ if (procs > 1) { procs_needed = procs - 1; } else procs_needed = 1; } reinitialize_node_iterator(&iter); while ((pnode = next_node(&allnodes,pnode,&iter)) != NULL) { int execution_slots_free = pnode->nd_slots.get_number_free(); if (execution_slots_free > 0) { job_reservation_info *node_info = (job_reservation_info *)calloc(1, sizeof(job_reservation_info)); if (pnode->nd_slots.reserve_execution_slots(execution_slots_free, node_info->est) == PBSE_NONE) { procs_needed -= execution_slots_free; host_info.push_back(node_info); node_info->port = pnode->nd_mom_rm_port; } } } /* END for each node */ } /* if (procs > 0) */ return(PBSE_NONE); } /* END build_hostlist_procs_req() */ int add_to_ms_list( char *node_name, job *pjob) { struct pbsnode *pnode = find_nodebyname(node_name); if (pnode != NULL) { insert_thing(pnode->nd_ms_jobs, strdup(pjob->ji_qs.ji_jobid)); unlock_node(pnode, __func__, NULL, LOGLEVEL); } return(PBSE_NONE); } /* END add_to_ms_list() */ void free_alps_req_data_array( alps_req_data *ard_array, int num_reqs) { int i; for (i = 0; i < num_reqs; i++) free_dynamic_string(ard_array[i].node_list); free(ard_array); } /* END free_alps_req_data_array() */ int add_multi_reqs_to_job( job *pjob, int num_reqs, alps_req_data *ard_array) { int i; dynamic_string *attr_str; char buf[MAXLINE]; if (ard_array == NULL) return(PBSE_NONE); attr_str = ard_array[0].node_list; for (i = 0; i < num_reqs; i++) { if (i != 0) { append_char_to_dynamic_string(attr_str, '|'); append_dynamic_string(attr_str, ard_array[i].node_list->str); } snprintf(buf, sizeof(buf), "*%d", ard_array[i].ppn); append_dynamic_string(attr_str, buf); } if (pjob->ji_wattr[JOB_ATR_multi_req_alps].at_val.at_str != NULL) free(pjob->ji_wattr[JOB_ATR_multi_req_alps].at_val.at_str); pjob->ji_wattr[JOB_ATR_multi_req_alps].at_val.at_str = strdup(attr_str->str); pjob->ji_wattr[JOB_ATR_multi_req_alps].at_flags |= ATR_VFLAG_SET; return(PBSE_NONE); } /* END add_multi_reqs_to_job() */ int free_hostinfo( 
std::vector<job_reservation_info *> &host_info) /* O */ { for (unsigned int i = 0; i < host_info.size(); i++) { job_reservation_info *host_single = host_info[i]; if (host_single != NULL) free(host_single); } return(PBSE_NONE); } /* * set_nodes() - Call node_spec() to allocate nodes then set them inuse. * Build list of allocated nodes to pass back in rtnlist. * Return: PBS error code */ int set_nodes( job *pjob, /* I */ char *spec, /* I */ int procs, /* I */ char **rtnlist, /* O */ char **rtnportlist, /* O */ char *FailHost, /* O (optional,minsize=1024) */ char *EMsg) /* O (optional,minsize=1024) */ { std::vector<job_reservation_info *> host_info; std::stringstream exec_hosts; std::stringstream exec_ports; /* job_reservation_info *gpu_info = NULL; job_reservation_info *mic_info = NULL; */ struct howl *gpu_list = NULL; struct howl *mic_list = NULL; int i; int rc; int NCount = 0; short newstate; char *login_prop = NULL; char *gpu_str = NULL; char *mic_str = NULL; char ProcBMStr[MAX_BM]; char log_buf[LOCAL_LOG_BUF_SIZE]; node_job_add_info *naji = NULL; alps_req_data *ard_array = NULL; int num_reqs = 0; long cray_enabled = FALSE; enum job_types job_type; int gpu_flags = 0; if (FailHost != NULL) FailHost[0] = '\0'; if (EMsg != NULL) EMsg[0] = '\0'; if (LOGLEVEL >= 3) { sprintf(log_buf, "allocating nodes for job %s with node expression '%.4000s'", pjob->ji_qs.ji_jobid, spec); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); } ProcBMStr[0] = '\0'; #ifdef GEOMETRY_REQUESTS get_bitmap(pjob,sizeof(ProcBMStr),ProcBMStr); #endif /* GEOMETRY_REQUESTS */ naji = (node_job_add_info *)calloc(1, sizeof(node_job_add_info)); if (pjob->ji_wattr[JOB_ATR_login_prop].at_flags & ATR_VFLAG_SET) login_prop = pjob->ji_wattr[JOB_ATR_login_prop].at_val.at_str; /* allocate nodes */ if ((i = node_spec(spec, 1, 1, ProcBMStr, FailHost, naji, EMsg, login_prop, &ard_array, &num_reqs, job_type)) == 0) { /* no resources located, request failed */ if (EMsg != NULL) { 
sprintf(log_buf, "could not locate requested resources '%.4000s' (node_spec failed) %s", spec, EMsg); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buf); } free_naji(naji); free_alps_req_data_array(ard_array, num_reqs); return(PBSE_RESCUNAV); } else if (i < 0) { /* request failed, corrupt request */ log_err(PBSE_UNKNODE, __func__, "request failed, corrupt request"); free_naji(naji); free_alps_req_data_array(ard_array, num_reqs); return(PBSE_UNKNODE); } get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled); if (cray_enabled == TRUE) { // JOB_TYPE_normal means no component from the Cray will be used if ((job_type != JOB_TYPE_normal) && (naji->next != NULL)) { pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str = strdup(naji->node_name); pjob->ji_wattr[JOB_ATR_login_node_id].at_flags = ATR_VFLAG_SET; } } newstate = INUSE_JOB; if ((rc = build_hostlist_nodes_req(pjob, EMsg, spec, newstate, host_info, &gpu_list, &mic_list, naji, ProcBMStr)) != PBSE_NONE) { free_nodes(pjob); free_alps_req_data_array(ard_array, num_reqs); free_hostinfo(host_info); return(rc); } if ((rc = build_hostlist_procs_req(pjob, procs, newstate, host_info)) != PBSE_NONE) { free_nodes(pjob); free_alps_req_data_array(ard_array, num_reqs); free_hostinfo(host_info); return(rc); } if (host_info.empty() == true) { if (LOGLEVEL >= 1) { sprintf(log_buf, "no nodes can be allocated to job %s", pjob->ji_qs.ji_jobid); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); } if (EMsg != NULL) sprintf(EMsg, "no nodes can be allocated to job"); free_alps_req_data_array(ard_array, num_reqs); free_hostinfo(host_info); return(PBSE_RESCUNAV); } /* END if (host_info.size() == 0) */ pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HasNodes; /* indicate has nodes */ /* build list of allocated nodes, gpus, and ports */ rc = translate_job_reservation_info_to_string(host_info, &NCount, exec_hosts, &exec_ports); if (rc != PBSE_NONE) { free_alps_req_data_array(ard_array, num_reqs); 
free_hostinfo(host_info); return(rc); } *rtnlist = strdup(exec_hosts.str().c_str()); *rtnportlist = strdup(exec_ports.str().c_str()); // JOB_TYPE_normal means no component from the Cray will be used if ((cray_enabled == TRUE) && (job_type != JOB_TYPE_normal)) { char *plus = strchr(*rtnlist, '+'); /* only do this if there's more than one host in the host list */ if (plus != NULL) { char *to_free = *rtnlist; *plus = '\0'; *rtnlist = strdup(plus + 1); free(to_free); } } if (mic_list != NULL) { if ((rc = translate_howl_to_string(mic_list, EMsg, &NCount, &mic_str, NULL, FALSE)) != PBSE_NONE) { free_hostinfo(host_info); return(rc); } job_attr_def[JOB_ATR_exec_mics].at_free( &pjob->ji_wattr[JOB_ATR_exec_mics]); job_attr_def[JOB_ATR_exec_mics].at_decode( &pjob->ji_wattr[JOB_ATR_exec_mics], NULL, NULL, mic_str, 0); free(mic_str); } if (gpu_list != NULL) { if ((rc = translate_howl_to_string(gpu_list, EMsg, &NCount, &gpu_str, NULL, FALSE)) != PBSE_NONE) { free_alps_req_data_array(ard_array, num_reqs); free_hostinfo(host_info); return(rc); } job_attr_def[JOB_ATR_exec_gpus].at_free( &pjob->ji_wattr[JOB_ATR_exec_gpus]); job_attr_def[JOB_ATR_exec_gpus].at_decode( &pjob->ji_wattr[JOB_ATR_exec_gpus], NULL, NULL, gpu_str, 0); /* O */ free(gpu_str); if (gpu_mode_rqstd != -1) gpu_flags = gpu_mode_rqstd; if (gpu_err_reset) gpu_flags += 1000; if (gpu_flags >= 0) { pjob->ji_wattr[JOB_ATR_gpu_flags].at_val.at_long = gpu_flags; pjob->ji_wattr[JOB_ATR_gpu_flags].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY; if (LOGLEVEL >= 7) { sprintf(log_buf, "setting gpu_flags for job %s to %d %ld", pjob->ji_qs.ji_jobid, gpu_flags, pjob->ji_wattr[JOB_ATR_gpu_flags].at_val.at_long); log_ext(-1, __func__, log_buf, LOG_DEBUG); } } } if (LOGLEVEL >= 3) { snprintf(log_buf, sizeof(log_buf), "job %s allocated %d nodes (nodelist=%.4000s)", pjob->ji_qs.ji_jobid, NCount, *rtnlist); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); } add_multi_reqs_to_job(pjob, num_reqs, ard_array); 
free_alps_req_data_array(ard_array, num_reqs); free_hostinfo(host_info); /* SUCCESS */ return(PBSE_NONE); } /* END set_nodes() */ /* count the number of requested processors in a node spec * return processors requested on success * return -1 on error */ int procs_requested( char *spec) { char *str; char *globs; char *cp; char *hold; int num_nodes = 0; int num_procs = 1; int total_procs = 0; int num_gpus = 0; int num_mics = 0; int i; struct prop *prop = NULL; char *tmp_spec; char log_buf[LOCAL_LOG_BUF_SIZE]; tmp_spec = strdup(spec); if (tmp_spec == NULL) { /* FAILURE */ sprintf(log_buf,"cannot alloc memory"); if (LOGLEVEL >= 1) { log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); } return(-2); } /* Check to see if we have a global modifier */ if ((globs = strchr(tmp_spec, '#')) != NULL) { *globs++ = '\0'; globs = strdup(globs); while ((cp = strrchr(globs, '#')) != NULL) { *cp++ = '\0'; hold = mod_spec(spec, cp); free(tmp_spec); tmp_spec = hold; } hold = mod_spec(tmp_spec, globs); free(tmp_spec); tmp_spec = hold; free(globs); } /* END if ((globs = strchr(spec,'#')) != NULL) */ str = tmp_spec; do { if ((i = number(&str, &num_nodes)) == -1) { free(tmp_spec); /* Bad string syntax. 
Fail */ return(-1); } if (i == 0) { /* number exists */ if (*str == ':') { /* there are properties */ str++; if (proplist(&str, &prop, &num_procs, &num_gpus, &num_mics)) { free(tmp_spec); return(-1); } } } else { /* no number */ num_nodes = 1; if (proplist(&str, &prop, &num_procs, &num_gpus, &num_mics)) { /* must be a prop list with no number in front */ free(tmp_spec); return(-1); } } total_procs += num_procs * num_nodes; } while(*str++ == '+'); free(tmp_spec); return(total_procs); } /* END procs_requested() */ /* * node_avail_complex - * *navail is set to number available * *nalloc is set to number allocated * *nresvd is set to number reserved * *ndown is set to number down/offline * return -1 on failure */ int node_avail_complex( char *spec, /* I - node spec */ int *navail, /* O - number available */ int *nalloc, /* O - number allocated */ int *nresvd, /* O - number reserved */ int *ndown) /* O - number down */ { int ret; enum job_types job_type; ret = node_spec(spec, 1, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, job_type); *navail = ret; *nalloc = 0; *nresvd = 0; *ndown = 0; return(ret); } /* END node_avail_complex() */ /* * node_avail - report if nodes requested are available * * Return 0 when no error in request and * *navail is set to number available * *nalloc is set to number allocated * *nresvd is set to number reserved * *ndown is set to number down/offline * !=0 error number when error in request */ int node_avail( char *spec, /* I - node spec */ int *navail, /* O - number available */ int *nalloc, /* O - number allocated */ int *nresvd, /* O - number reserved */ int *ndown) /* O - number down */ { int j; int holdnum; struct pbsnode *pn; char *pc; struct prop *prop = NULL; register int xavail; register int xalloc; register int xresvd; register int xdown; int node_req = 1; int gpu_req = 0; int mic_req = 0; node_iterator iter; if (spec == NULL) { log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, "no spec"); return(RM_ERR_NOPARAM); } pc = spec; 
if ((strchr(spec, (int)'+') == NULL) && (number(&pc, &holdnum) == 1)) { /* A simple node spec - reply with numbers of avaiable, */ /* allocated, reserved, and down nodes that match the */ /* the spec, null or simple number means all */ xavail = 0; xalloc = 0; xresvd = 0; xdown = 0; /* find number of a specific type of node */ if (*pc) { if (proplist(&pc, &prop, &node_req, &gpu_req, &mic_req)) { return(RM_ERR_BADPARAM); } } reinitialize_node_iterator(&iter); pn = NULL; while ((pn = next_node(&allnodes, pn, &iter)) != NULL) { if ((pn->nd_ntype == NTYPE_CLUSTER) && hasprop(pn, prop)) { if (pn->nd_state & (INUSE_OFFLINE | INUSE_DOWN)) ++xdown; else if (hasppn(pn, node_req, SKIP_ANYINUSE)) ++xavail; else if (hasppn(pn, node_req, SKIP_NONE)) { /* node has enough processors, are they busy or reserved? */ j = pn->nd_slots.get_total_execution_slots() - pn->nd_np_to_be_used; if (j >= node_req) ++xresvd; else ++xalloc; } } } /* END for each node */ free_prop(prop); *navail = xavail; *nalloc = xalloc; *nresvd = xresvd; *ndown = xdown; return(0); } else if (number(&pc, &holdnum) == -1) { /* invalid spec */ return(RM_ERR_BADPARAM); } /* not a simple spec - determine if supplied complex */ /* node spec can be satisified from avail nodes */ /* navail set to >0 if can be satified now */ /* 0 if not now but possible */ /* -l if never possible */ node_avail_complex(spec, navail, nalloc, nresvd, ndown); return(0); } /* END node_avail() */ /* * node_reserve - Reserve nodes * * Returns: >0 - reservation succeeded, number of nodes reserved * 0 - None or partial reservation * -1 - requested reservation impossible */ int node_reserve( char *nspec, /* In - a node specification */ resource_t tag) /* In/Out - tag for resource if reserved */ { struct pbsnode *pnode; int ret_val; node_iterator iter; char log_buf[LOCAL_LOG_BUF_SIZE]; node_job_add_info *naji = NULL; enum job_types job_type; DBPRT(("%s: entered\n", __func__)) if ((nspec == NULL) || (*nspec == '\0')) { log_event(PBSEVENT_ADMIN, 
PBS_EVENTCLASS_SERVER, __func__, "no spec"); return(-1); } naji = (node_job_add_info *)calloc(1, sizeof(node_job_add_info)); if ((ret_val = node_spec(nspec, 0, 0, NULL, NULL, naji, NULL, NULL, NULL, NULL, job_type)) >= 0) { /* ** Zero or more of the needed Nodes are available to be ** reserved. */ reinitialize_node_iterator(&iter); pnode = NULL; while ((pnode = next_node(&allnodes,pnode,&iter)) != NULL) { if (pnode->nd_flag != thinking) { continue; /* skip this one */ } if (pnode->nd_np_to_be_used == pnode->nd_slots.get_total_execution_slots()) pnode->nd_state |= INUSE_RESERVE; } /* END for each node */ } else { /* could never satisfy the reservation */ snprintf(log_buf, sizeof(log_buf), "can never reserve %s", nspec); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); } free_naji(naji); return(ret_val); } /* END node_reserve() */ char *get_next_exec_host( char **current) { char *name_ptr = *current; char *plus; char *slash; if (name_ptr != NULL) { if ((plus = strchr(name_ptr, '+')) != NULL) { *current = plus + 1; *plus = '\0'; } else *current = NULL; if ((slash = strchr(name_ptr, '/')) != NULL) *slash = '\0'; } return(name_ptr); } /* END get_next_exec_host() */ int remove_job_from_nodes_gpus( struct pbsnode *pnode, job *pjob) { struct gpusubn *gn; char *gpu_str = NULL; int i; char log_buf[LOCAL_LOG_BUF_SIZE]; char tmp_str[PBS_MAXHOSTNAME + 10]; char num_str[6]; if (pjob->ji_wattr[JOB_ATR_exec_gpus].at_flags & ATR_VFLAG_SET) gpu_str = pjob->ji_wattr[JOB_ATR_exec_gpus].at_val.at_str; if (gpu_str != NULL) { /* reset gpu nodes */ for (i = 0; i < pnode->nd_ngpus; i++) { gn = pnode->nd_gpusn + i; if (pnode->nd_gpus_real) { /* reset real gpu nodes */ strcpy (tmp_str, pnode->nd_name); strcat (tmp_str, "-gpu/"); sprintf (num_str, "%d", i); strcat (tmp_str, num_str); /* look thru the string and see if it has this host and gpuid. * exec_gpus string should be in format of * <hostname>-gpu/<index>[+<hostname>-gpu/<index>...] 
* * if we are using the gpu node exclusively or if shared mode and * this is last job assigned to this gpu then set it's state * unallocated so its available for a new job. Takes time to get the * gpu status report from the moms. */ if (strstr(gpu_str, tmp_str) != NULL) { gn->job_count--; if ((gn->mode == gpu_exclusive_thread) || (gn->mode == gpu_exclusive_process) || ((gn->mode == gpu_normal) && (gn->job_count == 0))) { gn->state = gpu_unallocated; if (LOGLEVEL >= 7) { sprintf(log_buf, "freeing node %s gpu %d for job %s", pnode->nd_name, i, pjob->ji_qs.ji_jobid); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); } } } } else { if (!strcmp(gn->jobid, pjob->ji_qs.ji_jobid)) { gn->inuse = FALSE; memset(gn->jobid, 0, sizeof(gn->jobid)); pnode->nd_ngpus_free++; } } } } return(PBSE_NONE); } /* END remove_job_from_nodes_gpus() */ int remove_job_from_node( struct pbsnode *pnode, const char *jobid) { char log_buf[LOCAL_LOG_BUF_SIZE]; for (int i = 0; i < (int)pnode->nd_job_usages.size(); i++) { job_usage_info *jui = pnode->nd_job_usages[i]; if (!strcmp(jui->jobid, jobid)) { pnode->nd_slots.unreserve_execution_slots(jui->est); pnode->nd_job_usages.erase(pnode->nd_job_usages.begin() + i); if (LOGLEVEL >= 6) { sprintf(log_buf, "increased execution slot free count to %d of %d\n", pnode->nd_slots.get_number_free(), pnode->nd_slots.get_total_execution_slots()); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); } pnode->nd_state &= ~INUSE_JOB; delete jui; break; } } return(PBSE_NONE); } /* END remove_job_from_node() */ /* * free_nodes - free nodes allocated to a job */ void free_nodes( job *pjob) /* I (modified) */ { struct pbsnode *pnode; char log_buf[LOCAL_LOG_BUF_SIZE]; char *exec_hosts = NULL; char *host_ptr = NULL; char *hostname; char *previous_hostname = NULL; if (LOGLEVEL >= 3) { sprintf(log_buf, "freeing nodes for job %s", pjob->ji_qs.ji_jobid); log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf); } if 
(pjob->ji_wattr[JOB_ATR_exec_host].at_flags & ATR_VFLAG_SET) { if (pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str != NULL) { exec_hosts = strdup(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str); host_ptr = exec_hosts; } } while ((hostname = get_next_exec_host(&host_ptr)) != NULL) { if ((previous_hostname) != NULL) { if (!strcmp(hostname, previous_hostname)) continue; } previous_hostname = hostname; if ((pnode = find_nodebyname(hostname)) != NULL) { remove_job_from_node(pnode, pjob->ji_qs.ji_jobid); remove_job_from_nodes_gpus(pnode, pjob); remove_job_from_nodes_mics(pnode, pjob); unlock_node(pnode, __func__, NULL, LOGLEVEL); } } free(exec_hosts); if (pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str != NULL) { if ((pnode = find_nodebyname(pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str)) != NULL) { remove_job_from_node(pnode, pjob->ji_qs.ji_jobid); unlock_node(pnode, __func__, NULL, LOGLEVEL); } } pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_HasNodes; return; } /* END free_nodes() */ struct pbsnode *get_compute_node( char *node_name) { struct pbsnode *ar = alps_reporter; struct pbsnode *compute_node = NULL; unsigned int i; unsigned int len = strlen(node_name); for (i = 0; i < len; i++) { if (isdigit(node_name[i]) == FALSE) { /* found a non-numeric character - not a compute node */ return(NULL); } } lock_node(ar, __func__, NULL, LOGLEVEL); compute_node = create_alps_subnode(ar, node_name); unlock_node(ar, __func__, NULL, LOGLEVEL); return(compute_node); } /* END get_compute_node() */ /* * set_one_old - set a named node as allocated to a job */ void set_one_old( char *name, job *pjob) { int index; struct pbsnode *pnode; char *pc; long cray_enabled = FALSE; if ((pc = strchr(name, (int)'/'))) { index = atoi(pc + 1); *pc = '\0'; } else { index = 0; } get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled); pnode = find_nodebyname(name); if (cray_enabled == TRUE) { if (pnode == NULL) pnode = get_compute_node(name); if (pnode != NULL) { if (pnode->parent == alps_reporter) { 
while (index >= pnode->nd_slots.get_total_execution_slots()) { add_execution_slot(pnode); } } } } if (pnode != NULL) { bool found = false; /* Mark node as being IN USE ... */ for (int i = 0; i < (int)pnode->nd_job_usages.size(); i++) { job_usage_info *jui = pnode->nd_job_usages[i]; if (!strcmp(jui->jobid, pjob->ji_qs.ji_jobid)) { found = true; while (index >= jui->est.get_total_execution_slots()) jui->est.add_execution_slot(); jui->est.mark_as_used(index); pnode->nd_slots.mark_as_used(index); } } if (found == false) { job_usage_info *jui = new job_usage_info(pjob->ji_qs.ji_jobid); while (index >= jui->est.get_total_execution_slots()) jui->est.add_execution_slot(); jui->est.mark_as_used(index); pnode->nd_slots.mark_as_used(index); pnode->nd_job_usages.push_back(jui); } if (pnode->nd_slots.get_number_free() <= 0) pnode->nd_state |= INUSE_JOB; unlock_node(pnode, __func__, NULL, LOGLEVEL); } return; } /* END set_one_old() */ /* * set_old_nodes - set "old" nodes as in use - called from pbsd_init() * when recovering a job in the running state. 
*/

/*
 * Details for set_old_nodes():
 *
 * The job's exec_host string is copied and split on '+'. The entries are
 * handed to set_one_old() from last to first (the copy is cut at the last
 * '+' each time), with the first entry handled after the loop. When the
 * server is cray enabled and the job has a login node id, that node is
 * recorded for the job as well.
 *
 * An attribute whose flag is set but whose string is NULL is skipped, in
 * the same way free_nodes() treats a NULL exec_host string. If the copy of
 * exec_host cannot be allocated the function returns without doing
 * anything.
 *
 * NOTE: set_one_old() overwrites a '/' in the string it is given; the login
 * node id string is passed directly, not as a copy.
 */

void set_old_nodes(

  job *pjob)  /* I (modified) */

  {
  char *old;
  char *po;
  long  cray_enabled = FALSE;

  if ((pjob->ji_wattr[JOB_ATR_exec_host].at_flags & ATR_VFLAG_SET) &&
      (pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str != NULL))
    {
    old = strdup(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str);

    if (old == NULL)
      {
      /* FAILURE - cannot alloc memory */

      return;
      }

    /* peel entries off the end of the list, one '+' at a time */
    while ((po = strrchr(old, (int)'+')) != NULL)
      {
      *po++ = '\0';

      set_one_old(po, pjob);
      }

    /* what is left is the first entry */
    set_one_old(old, pjob);

    free(old);
    }  /* END if pjobs exec host is set */

  /* record the job on the alps_login if cray_enabled */
  get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled);

  if ((cray_enabled == TRUE) &&
      (pjob->ji_wattr[JOB_ATR_login_node_id].at_flags & ATR_VFLAG_SET) &&
      (pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str != NULL))
    {
    set_one_old(pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str, pjob);
    }

  return;
  }  /* END set_old_nodes() */




/*
 * get_job_from_job_usage_info - look up the job named in a usage record
 *
 * The node is unlocked before svr_find_job() is called and locked again
 * afterwards, so the node is not held during the job lookup (presumably to
 * respect lock ordering between jobs and nodes - confirm). The node's
 * contents may therefore have changed by the time this returns.
 *
 * @param jui   - usage record supplying the job id
 * @param pnode - node held by the caller; unlocked and re-locked here
 * @return whatever svr_find_job() returns for jui->jobid
 */

job *get_job_from_job_usage_info(

  job_usage_info *jui,
  struct pbsnode *pnode)

  {
  job *pjob;

  unlock_node(pnode, __func__, NULL, LOGLEVEL);
  pjob = svr_find_job(jui->jobid, TRUE);
  lock_node(pnode, __func__, NULL, LOGLEVEL);

  return(pjob);
  }  /* END get_job_from_job_usage_info() */




/*
 * get_job_from_jobinfo - look up the job named in a jobinfo entry
 *
 * Same locking pattern as get_job_from_job_usage_info(): the node is
 * unlocked around the svr_find_job() call and locked again before
 * returning.
 *
 * @param jp    - jobinfo entry supplying the job id
 * @param pnode - node held by the caller; unlocked and re-locked here
 * @return whatever svr_find_job() returns for jp->jobid
 */

job *get_job_from_jobinfo(

  struct jobinfo *jp,
  struct pbsnode *pnode)

  {
  job *pjob;

  unlock_node(pnode, __func__, NULL, LOGLEVEL);
  pjob = svr_find_job(jp->jobid, TRUE);
  lock_node(pnode, __func__, NULL, LOGLEVEL);

  return(pjob);
  }  /* END get_job_from_jobinfo() */

/* END node_manager.c */