/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. */ /* * svr_stat.c * * Functions relating to the Status Job, Status Queue, and * Status Server Batch Requests. */ #include /* the master config generated by configure */ #include "req_stat.h" #include #include #include #include "libpbs.h" #include #include #include "server_limits.h" #include "list_link.h" #include "attribute.h" #include "server.h" #include "queue.h" #include "credential.h" #include "batch_request.h" #include "pbs_job.h" #include "work_task.h" #include "pbs_error.h" #include "svrfunc.h" #include "net_connect.h" #include "pbs_nodes.h" #include "../lib/Liblog/pbs_log.h" #include "../lib/Liblog/log_event.h" #include "u_tree.h" #include "array.h" #include "queue.h" #include "node_func.h" /* find_nodebyname */ #include "issue_request.h" /* issue_Drequest */ #include "../lib/Libutils/u_lock_ctl.h" /* lock_node, unlock_node */ #include "svr_connect.h" /* svr_connect */ #include "queue_func.h" /* find_queuebyname */ #include "reply_send.h" /* reply_send_svr */ #include "svr_func.h" /* get_svr_attr_* */ #include "alps_functions.h" #include "node_manager.h" /* tfind_addr */ #include "ji_mutex.h" #include "mutex_mgr.hpp" #include "unistd.h" #include "log.h" /* Global Data Items: */ extern struct all_jobs alljobs; extern struct all_jobs array_summary; extern struct server server; extern char server_name[]; extern attribute_def svr_attr_def[]; extern attribute_def que_attr_def[]; extern attribute_def job_attr_def[]; extern attribute_def node_attr_def[]; /* node attributes defs */ extern int pbs_mom_port; extern char *msg_init_norerun; extern int LOGLEVEL; extern pthread_mutex_t *netrates_mutex; pthread_mutex_t *poll_job_task_mutex; int max_poll_job_tasks; int current_poll_job_tasks = 0; /* Extern Functions */ int status_job(job *, struct batch_request *, svrattrl *, tlist_head *, int *); int status_attrib(svrattrl *, attribute_def *, pbs_attribute *, int, int, tlist_head *, int *, int); extern int status_nodeattrib(svrattrl *, attribute_def *, struct pbsnode *, int, int, tlist_head *, int*); extern int hasprop(struct pbsnode *, struct prop *); extern void rel_resc(job*); /* The following private support functions are included */ static void update_state_ct(pbs_attribute *, int *, char *); static int status_que(pbs_queue *, struct batch_request *, tlist_head *); int status_node(struct pbsnode *, struct batch_request *, int *, tlist_head *); static void req_stat_job_step2(struct stat_cntl *); #ifndef TMAX_JOB #define TMAX_JOB 999999999 #endif /* TMAX_JOB */ /** * req_stat_job - service the Status Job Request * * This request processes the request for status of a single job or * the set of jobs at a destination. * If SRV_ATR_PollJobs is not set or false (default), this takes three * steps because of running jobs being known to MOM: * 1. validate and setup the request (done here). * 2. for each candidate job which is running and for which there is no * current status, ask MOM for an update. * 3. form the reply for each candidate job and return it to the client. * If SRV_ATR_PollJobs is true, then we skip step 2. */ enum TJobStatTypeEnum { tjstNONE = 0, tjstJob, tjstQueue, tjstServer, tjstTruncatedQueue, tjstTruncatedServer, tjstSummarizeArraysServer, tjstSummarizeArraysQueue, tjstArray, tjstLAST }; int req_stat_job( struct batch_request *preq) /* ptr to the decoded request */ { struct stat_cntl *cntl; /* see svrfunc.h */ char *name; job *pjob = NULL; pbs_queue *pque = NULL; int rc = PBSE_NONE; long poll_jobs = 0; char log_buf[LOCAL_LOG_BUF_SIZE]; enum TJobStatTypeEnum type = tjstNONE; /* * first, validate the name of the requested object, either * a job, a queue, or the whole server. */ if (LOGLEVEL >= 7) { sprintf(log_buf, "note"); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf); } /* FORMAT: name = { | | '' } */ name = preq->rq_ind.rq_status.rq_id; if (preq->rq_extend != NULL) { /* evaluate pbs_job_stat() 'extension' field */ if (!strncasecmp(preq->rq_extend, "truncated", strlen("truncated"))) { /* truncate response by 'max_report' */ type = tjstTruncatedServer; } else if (!strncasecmp(preq->rq_extend, "summarize_arrays", strlen("summarize_arrays"))) { type = tjstSummarizeArraysServer; } } /* END if (preq->rq_extend != NULL) */ if (isdigit((int)*name)) { /* status a single job */ if (is_array(name)) { if (type != tjstSummarizeArraysServer) { type = tjstArray; } } else { type = tjstJob; if ((pjob = svr_find_job(name, FALSE)) == NULL) { rc = PBSE_UNKJOBID; } else unlock_ji_mutex(pjob, __func__, "1", LOGLEVEL); } } else if (isalpha(name[0])) { if (type == tjstNONE) type = tjstQueue; else if (type == tjstSummarizeArraysServer) type = tjstSummarizeArraysQueue; else type = tjstTruncatedQueue; /* if found, this mutex is released later */ if ((pque = find_queuebyname(name)) == NULL) { rc = PBSE_UNKQUE; } } else if ((*name == '\0') || (*name == '@')) { /* status all jobs at server */ if (type == tjstNONE) type = tjstServer; } else { rc = PBSE_IVALREQ; } if (rc != 0) { /* is invalid - an error */ req_reject(rc, 0, preq, NULL, NULL); return(rc); } preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_Status; CLEAR_HEAD(preq->rq_reply.brp_un.brp_status); cntl = (struct stat_cntl *)calloc(1, sizeof(struct stat_cntl)); if (cntl == NULL) { if (pque != NULL) unlock_queue(pque, "req_stat_job", (char *)"no memory cntl", LOGLEVEL); req_reject(PBSE_SYSTEM, 0, preq, NULL, NULL); return(PBSE_SYSTEM); } if ((type == tjstTruncatedQueue) || (type == tjstTruncatedServer)) { if (pque != NULL) { unlock_queue(pque, __func__, "", LOGLEVEL); pque = NULL; } } cntl->sc_type = (int)type; cntl->sc_conn = -1; cntl->sc_pque = pque; cntl->sc_origrq = preq; cntl->sc_post = req_stat_job_step2; cntl->sc_jobid[0] = '\0'; /* cause "start from beginning" */ get_svr_attr_l(SRV_ATR_PollJobs, &poll_jobs); if (poll_jobs) cntl->sc_post = 0; /* we're not going to make clients wait */ req_stat_job_step2(cntl); /* go to step 2, see if running is current */ if (pque != NULL) unlock_queue(pque, "req_stat_job", (char *)"success", LOGLEVEL); free(cntl); return(PBSE_NONE); } /* END req_stat_job() */ /* * req_stat_job_step2 - continue with statusing of jobs * * This is re-entered after sending status requests to MOM. * * NOTE: because server job array and queue job arrays are basic linked * lists, this routine will report jobs in jobid order. * * if truncated listed are desired, should handle this by walking * all queues and reporting max_report jobs/queue * * Note, the funny initialization/advance of pjob in the "while" loop * comes from the fact we want to look at the "next" job on re-entry. * * @see req_stat_job() - parent * @see status_job() - child - build job record */ static void req_stat_job_step2( struct stat_cntl *cntl) /* I/O (free'd on return) */ { svrattrl *pal; job *pjob = NULL; struct batch_request *preq; struct batch_reply *preply; int rc = 0; enum TJobStatTypeEnum type; pbs_queue *pque = NULL; int exec_only = 0; int bad = 0; long DTime; /* delta time - only report full pbs_attribute list if J->MTime > DTime */ static svrattrl *dpal = NULL; int job_array_index = 0; job_array *pa = NULL; char log_buf[LOCAL_LOG_BUF_SIZE]; int iter; time_t time_now = time(NULL); long poll_jobs = 0; char job_id[PBS_MAXSVRJOBID+1]; int job_substate = -1; time_t job_momstattime = -1; preq = cntl->sc_origrq; type = (enum TJobStatTypeEnum)cntl->sc_type; preply = &preq->rq_reply; /* See pbs_server_attributes(1B) for details on "poll_jobs" behaviour */ if (dpal == NULL) { /* build 'delta' pbs_attribute list */ svrattrl *tpal; tlist_head dalist; int aindex; int atrlist[] = { JOB_ATR_jobname, JOB_ATR_resc_used, JOB_ATR_LAST }; CLEAR_LINK(dalist); for (aindex = 0;atrlist[aindex] != JOB_ATR_LAST;aindex++) { if ((tpal = attrlist_create("", "", 23)) == NULL) { return; } tpal->al_valln = atrlist[aindex]; if (dpal == NULL) dpal = tpal; append_link(&dalist, &tpal->al_link, tpal); } } /* END if (dpal == NULL) */ if (type == tjstArray) { pa = get_array(preq->rq_ind.rq_status.rq_id); if (pa == NULL) { req_reject(PBSE_UNKARRAYID, 0, preq, NULL, "unable to find array"); return; } } iter = -1; get_svr_attr_l(SRV_ATR_PollJobs, &poll_jobs); if (!poll_jobs) { /* polljobs not set - indicates we may need to obtain fresh data from MOM */ if (cntl->sc_jobid[0] == '\0') pjob = NULL; else pjob = svr_find_job(cntl->sc_jobid, FALSE); while (1) { if (pjob == NULL) { /* start from the first job */ if (type == tjstJob) { pjob = svr_find_job(preq->rq_ind.rq_status.rq_id, FALSE); } else if (type == tjstQueue) { pjob = next_job(cntl->sc_pque->qu_jobs,&iter); } else if (type == tjstArray) { job_array_index = 0; /* increment job_array_index until we find a non-null pointer or hit the end */ while (job_array_index < pa->ai_qs.array_size) { if (pa->job_ids[job_array_index] != NULL) { if ((pjob = svr_find_job(pa->job_ids[job_array_index], FALSE)) != NULL) { unlock_ji_mutex(pjob, __func__, "2", LOGLEVEL); break; } } job_array_index++; } } else { pjob = next_job(&alljobs,&iter); } } /* END if (pjob == NULL) */ else { strcpy(job_id, pjob->ji_qs.ji_jobid); unlock_ji_mutex(pjob, __func__, "3", LOGLEVEL); if (type == tjstJob) break; if (type == tjstQueue) pjob = next_job(cntl->sc_pque->qu_jobs,&iter); else if (type == tjstArray) { pjob = NULL; /* increment job_array_index until we find a non-null pointer or hit the end */ while (++job_array_index < pa->ai_qs.array_size) { if (pa->job_ids[job_array_index] != NULL) { if ((pjob = svr_find_job(pa->job_ids[job_array_index], FALSE)) != NULL) { unlock_ji_mutex(pjob, __func__, "3", LOGLEVEL); break; } } } } else pjob = next_job(&alljobs,&iter); } if (pjob == NULL) break; strcpy(job_id, pjob->ji_qs.ji_jobid); job_substate = pjob->ji_qs.ji_substate; job_momstattime = pjob->ji_momstat; strcpy(cntl->sc_jobid, job_id); unlock_ji_mutex(pjob, __func__, "4", LOGLEVEL); pjob = NULL; /* PBS_RESTAT_JOB defaults to 30 seconds */ if ((job_substate == JOB_SUBSTATE_RUNNING) && ((time_now - job_momstattime) > JobStatRate)) { /* go to MOM for status */ if ((rc = stat_to_mom(job_id, cntl)) == PBSE_MEM_MALLOC) break; if (rc != 0) { pjob = svr_find_job(job_id, FALSE); rc = 0; continue; } if (pa != NULL) unlock_ai_mutex(pa, __func__, "1", LOGLEVEL); return; /* will pick up after mom replies */ } } /* END while(1) */ if (rc != 0) { if (pa != NULL) unlock_ai_mutex(pa, __func__, "2", LOGLEVEL); reply_free(preply); req_reject(rc, 0, preq, NULL, "cannot get update from mom"); return; } } /* END if (!server.sv_attr[SRV_ATR_PollJobs].at_val.at_long) */ /* * now ready for part 3, building the status reply, * loop through again */ if ((type == tjstSummarizeArraysQueue) || (type == tjstSummarizeArraysServer)) { /* No array can be owned for these options */ update_array_statuses(); } if (type == tjstJob) pjob = svr_find_job(preq->rq_ind.rq_status.rq_id, FALSE); else if (type == tjstQueue) pjob = next_job(cntl->sc_pque->qu_jobs,&iter); else if (type == tjstSummarizeArraysQueue) pjob = next_job(cntl->sc_pque->qu_jobs_array_sum,&iter); else if (type == tjstSummarizeArraysServer) pjob = next_job(&array_summary,&iter); else if (type == tjstArray) { job_array_index = -1; pjob = NULL; /* increment job_array_index until we find a non-null pointer or hit the end */ while (++job_array_index < pa->ai_qs.array_size) { if (pa->job_ids[job_array_index] != NULL) { if ((pjob = svr_find_job(pa->job_ids[job_array_index], FALSE)) != NULL) { break; } } } } else pjob = next_job(&alljobs,&iter); DTime = 0; if (preq->rq_extend != NULL) { char *ptr; /* FORMAT: { EXECQONLY | DELTA: } */ if (strstr(preq->rq_extend, EXECQUEONLY)) exec_only = 1; ptr = strstr(preq->rq_extend, "DELTA:"); if (ptr != NULL) { ptr += strlen("delta:"); DTime = strtol(ptr, NULL, 10); } } if ((type == tjstTruncatedServer) || (type == tjstTruncatedQueue)) { long sentJobCounter; long qjcounter; long qmaxreport; int iter = -1; /* loop through all queues */ while ((pque = next_queue(&svr_queues,&iter)) != NULL) { qjcounter = 0; if ((exec_only == 1) && (pque->qu_qs.qu_type != QTYPE_Execution)) { /* ignore routing queues */ unlock_queue(pque, __func__, "ignore queue", LOGLEVEL); continue; } if (((pque->qu_attr[QA_ATR_MaxReport].at_flags & ATR_VFLAG_SET) != 0) && (pque->qu_attr[QA_ATR_MaxReport].at_val.at_long >= 0)) { qmaxreport = pque->qu_attr[QA_ATR_MaxReport].at_val.at_long; } else { qmaxreport = TMAX_JOB; } if (LOGLEVEL >= 5) { sprintf(log_buf,"giving scheduler up to %ld idle jobs in queue %s\n", qmaxreport, pque->qu_qs.qu_name); log_event(PBSEVENT_SYSTEM,PBS_EVENTCLASS_QUEUE,pque->qu_qs.qu_name,log_buf); } sentJobCounter = 0; /* loop through jobs in queue */ if (pjob != NULL) unlock_ji_mutex(pjob, __func__, "5", LOGLEVEL); iter = -1; while ((pjob = next_job(pque->qu_jobs,&iter)) != NULL) { if ((qjcounter >= qmaxreport) && (pjob->ji_qs.ji_state == JOB_STATE_QUEUED)) { /* max_report of queued jobs reached for queue */ unlock_ji_mutex(pjob, __func__, "6", LOGLEVEL); continue; } pal = (svrattrl *)GET_NEXT(preq->rq_ind.rq_status.rq_attr); rc = status_job( pjob, preq, (pjob->ji_wattr[JOB_ATR_mtime].at_val.at_long >= DTime) ? pal : dpal, &preply->brp_un.brp_status, &bad); if ((rc != 0) && (rc != PBSE_PERM)) { req_reject(rc, bad, preq, NULL, NULL); if (pa != NULL) { unlock_ai_mutex(pa, __func__, "1", LOGLEVEL); } unlock_ji_mutex(pjob, __func__, "7", LOGLEVEL); unlock_queue(pque, __func__, "perm", LOGLEVEL); return; } sentJobCounter++; if (pjob->ji_qs.ji_state == JOB_STATE_QUEUED) qjcounter++; unlock_ji_mutex(pjob, __func__, "8", LOGLEVEL); } /* END foreach (pjob from pque) */ if (LOGLEVEL >= 5) { sprintf(log_buf,"sent scheduler %ld total jobs for queue %s\n", sentJobCounter, pque->qu_qs.qu_name); log_event(PBSEVENT_SYSTEM,PBS_EVENTCLASS_QUEUE,pque->qu_qs.qu_name,log_buf); } unlock_queue(pque, __func__, "end while", LOGLEVEL); } /* END for (pque) */ if (pa != NULL) unlock_ai_mutex(pa, __func__, "1", LOGLEVEL); reply_send_svr(preq); return; } /* END if ((type == tjstTruncatedServer) || ...) */ while (pjob != NULL) { /* go ahead and build the status reply for this job */ if (exec_only) { if (cntl->sc_pque != NULL) { if (cntl->sc_pque->qu_qs.qu_type != QTYPE_Execution) goto nextjob; } else { if (pa != NULL) pthread_mutex_unlock(pa->ai_mutex); pque = get_jobs_queue(&pjob); if (pa != NULL) pthread_mutex_lock(pa->ai_mutex); if ((pjob == NULL) || (pque == NULL)) goto nextjob; mutex_mgr pque_mutex = mutex_mgr(pque->qu_mutex, true); if (pque->qu_qs.qu_type != QTYPE_Execution) { goto nextjob; } } } pal = (svrattrl *)GET_NEXT(preq->rq_ind.rq_status.rq_attr); rc = status_job( pjob, preq, pal, &preply->brp_un.brp_status, &bad); if ((rc != 0) && (rc != PBSE_PERM)) { if (pa != NULL) { unlock_ai_mutex(pa, __func__, "1", LOGLEVEL); } unlock_ji_mutex(pjob, __func__, "9", LOGLEVEL); req_reject(rc, bad, preq, NULL, NULL); return; } /* get next job */ nextjob: if (pjob != NULL) unlock_ji_mutex(pjob, __func__, "10", LOGLEVEL); if (type == tjstJob) break; if (type == tjstQueue) pjob = next_job(cntl->sc_pque->qu_jobs,&iter); else if (type == tjstSummarizeArraysQueue) pjob = next_job(cntl->sc_pque->qu_jobs_array_sum,&iter); else if (type == tjstSummarizeArraysServer) pjob = next_job(&array_summary,&iter); else if (type == tjstArray) { pjob = NULL; /* increment job_array_index until we find a non-null pointer or hit the end */ while (++job_array_index < pa->ai_qs.array_size) { if (pa->job_ids[job_array_index] != NULL) { if ((pjob = svr_find_job(pa->job_ids[job_array_index], FALSE)) != NULL) { break; } } } } else pjob = next_job(&alljobs,&iter); rc = 0; } /* END while (pjob != NULL) */ if (pa != NULL) { unlock_ai_mutex(pa, __func__, "1", LOGLEVEL); } reply_send_svr(preq); if (LOGLEVEL >= 7) { log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, "req_statjob", "Successfully returned the status of queued jobs\n"); } return; } /* END req_stat_job_step2() */ /* * stat_to_mom - send a Job Status to MOM to obtain latest status. * Used by req_stat_job()/stat_step_2() * * Returns PBSE_SYSTEM if out of memory, PBSE_NORELYMOM if the MOM * is down, offline, or deleted. Otherwise returns result of MOM * contact request. * * NOTE: called by qstat if poll_jobs == False */ int stat_to_mom( char *job_id, struct stat_cntl *cntl) /* M */ { struct batch_request *newrq; int rc = PBSE_NONE; unsigned long addr; char log_buf[LOCAL_LOG_BUF_SIZE+1]; struct pbsnode *node; int handle = -1; unsigned long job_momaddr = -1; unsigned short job_momport = -1; char *job_momname = NULL; job *pjob = NULL; if ((pjob = svr_find_job(job_id, FALSE)) == NULL) return(PBSE_JOBNOTFOUND); mutex_mgr job_mutex(pjob->ji_mutex, true); if ((pjob->ji_qs.ji_un.ji_exect.ji_momaddr == 0) || (!pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str)) { job_mutex.unlock(); snprintf(log_buf, sizeof(log_buf), "Job %s missing MOM's information. Skipping statting on this job", pjob->ji_qs.ji_jobid); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf); return PBSE_BAD_PARAMETER; } job_momaddr = pjob->ji_qs.ji_un.ji_exect.ji_momaddr; job_momport = pjob->ji_qs.ji_un.ji_exect.ji_momport; job_momname = strdup(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str); job_mutex.unlock(); if (job_momname == NULL) return PBSE_MEM_MALLOC; if ((newrq = alloc_br(PBS_BATCH_StatusJob)) == NULL) { free(job_momname); return PBSE_MEM_MALLOC; } if (cntl->sc_type == 1) snprintf(newrq->rq_ind.rq_status.rq_id, sizeof(newrq->rq_ind.rq_status.rq_id), "%s", job_id); else newrq->rq_ind.rq_status.rq_id[0] = '\0'; /* get stat of all */ CLEAR_HEAD(newrq->rq_ind.rq_status.rq_attr); /* if MOM is down just return stale information */ addr = job_momaddr; node = tfind_addr(addr,job_momport,job_momname); free(job_momname); if (node == NULL) return PBSE_UNKNODE; if (node->nd_state & INUSE_DOWN) { if (LOGLEVEL >= 6) { snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "node '%s' is allocated to job but in state 'down'", node->nd_name); log_event(PBSEVENT_SYSTEM,PBS_EVENTCLASS_JOB,job_id,log_buf); } unlock_node(node, __func__, "no rely mom", LOGLEVEL); free_br(newrq); return PBSE_NORELYMOM; } /* get connection to MOM */ unlock_node(node, __func__, "before svr_connect", LOGLEVEL); handle = svr_connect(job_momaddr, job_momport, &rc, NULL, NULL, ToServerDIS); if (handle >= 0) { if ((rc = issue_Drequest(handle, newrq)) == PBSE_NONE) { stat_update(newrq, cntl); } } else rc = PBSE_CONNECT; if (rc == PBSE_SYSTEM) rc = PBSE_MEM_MALLOC; free_br(newrq); return(rc); } /* END stat_to_mom() */ /* * stat_update - take reply to status request from MOM and update job status */ void stat_update( struct batch_request *preq, struct stat_cntl *cntl) { job *pjob; struct batch_reply *preply; struct brp_status *pstatus; svrattrl *sattrl; int oldsid; int bad = 0; time_t time_now = time(NULL); char *msg_ptr = NULL; char log_buf[LOCAL_LOG_BUF_SIZE]; preply = &preq->rq_reply; if (preply->brp_un.brp_txt.brp_str != NULL) { msg_ptr = strstr(preply->brp_un.brp_txt.brp_str, PBS_MSG_EQUAL); if (msg_ptr != NULL) msg_ptr += strlen(PBS_MSG_EQUAL); } if (preply->brp_choice == BATCH_REPLY_CHOICE_Status) { pstatus = (struct brp_status *)GET_NEXT(preply->brp_un.brp_status); while (pstatus != NULL) { if ((pjob = svr_find_job(pstatus->brp_objname, FALSE)) != NULL) { mutex_mgr job_mutex(pjob->ji_mutex, true); sattrl = (svrattrl *)GET_NEXT(pstatus->brp_attr); oldsid = pjob->ji_wattr[JOB_ATR_session_id].at_val.at_long; modify_job_attr( pjob, sattrl, ATR_DFLAG_MGWR | ATR_DFLAG_SvWR, &bad); if (oldsid != pjob->ji_wattr[JOB_ATR_session_id].at_val.at_long) { /* first save since running job (or the sid has changed), */ /* must save session id */ job_save(pjob, SAVEJOB_FULL, 0); } #ifdef USESAVEDRESOURCES else { /* save so we can recover resources used */ job_save(pjob, SAVEJOB_FULL, 0); } #endif /* USESAVEDRESOURCES */ pjob->ji_momstat = time_now; } pstatus = (struct brp_status *)GET_NEXT(pstatus->brp_stlink); } /* END while (pstatus != NULL) */ } /* END if (preply->brp_choice == BATCH_REPLY_CHOICE_Status) */ else if ((preply->brp_choice == BATCH_REPLY_CHOICE_Text) && (preply->brp_code == PBSE_UNKJOBID) && (msg_ptr != NULL) && (!strcmp(msg_ptr, preq->rq_ind.rq_status.rq_id))) { /* we sent a stat request, but mom says it doesn't know anything about the job */ if ((pjob = svr_find_job(preq->rq_ind.rq_status.rq_id, FALSE)) != NULL) { /* job really isn't running any more - mom doesn't know anything about it this can happen if a diskless node reboots and the mom_priv/jobs directory is cleared, set its state to queued so job_abt doesn't think it is still running */ mutex_mgr job_mutex(pjob->ji_mutex, true); unsigned long delta = time(NULL) - pjob->ji_last_reported_time; if (delta > JOB_REPORTED_ABORT_DELTA) { snprintf(log_buf, sizeof(log_buf), "mother superior no longer recognizes %s as a valid job, aborting. Last reported time was %ld", preq->rq_ind.rq_status.rq_id, pjob->ji_last_reported_time); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf); svr_setjobstate(pjob, JOB_STATE_QUEUED, JOB_SUBSTATE_ABORT, FALSE); rel_resc(pjob); job_mutex.set_lock_on_exit(false); job_abt(&pjob, "Job does not exist on node"); /* TODO, if the job is rerunnable we should set its state back to queued */ } else { snprintf(log_buf, sizeof(log_buf), "Unknown job message from mother superior appears to be in error, reported %d seconds ago", (int)delta); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf); } } } else { snprintf(log_buf, sizeof(log_buf), "Poll job request failed for job %s", preq->rq_ind.rq_status.rq_id); log_err(preply->brp_code, __func__, log_buf); } cntl->sc_conn = -1; if (cntl->sc_post) cntl->sc_post(cntl); /* continue where we left off */ /* If sc_post has a value it is: * req_stat_job_step2 * if so, it expects cntl to be free'd after the call */ free(cntl); /* a bit of a kludge but its saves an extra func */ return; } /* END stat_update() */ /* * stat_mom_job - status a single job running under a MOM * This is used after sending a job to mom to get the session id */ void stat_mom_job( char *job_id) { struct stat_cntl *cntl; cntl = (struct stat_cntl *)calloc(1, sizeof(struct stat_cntl)); if (cntl == NULL) { return; } cntl->sc_type = 1; cntl->sc_conn = -1; cntl->sc_pque = (pbs_queue *)0; cntl->sc_origrq = (struct batch_request *)0; cntl->sc_post = 0; /* tell stat_update() to free cntl */ cntl->sc_jobid[0] = '\0'; /* cause "start from beginning" */ if (stat_to_mom(job_id, cntl) != 0) { free(cntl); } /* if not an error, cntl free'd in stat_update() */ return; } /* END stat_mom_job() */ /** * poll _job_task * * The invocation of this routine is triggered from * the pbs_server main_loop code. The check of * SRV_ATR_PollJobs appears to be redundant. */ void poll_job_task( struct work_task *ptask) { char *job_id = (char *)ptask->wt_parm1; job *pjob; time_t time_now = time(NULL); long poll_jobs = 0; int job_state = -1; char log_buf[LOCAL_LOG_BUF_SIZE]; if (job_id != NULL) { pjob = svr_find_job(job_id, FALSE); if (pjob != NULL) { mutex_mgr job_mutex(pjob->ji_mutex, true); job_state = pjob->ji_qs.ji_state; job_mutex.unlock(); get_svr_attr_l(SRV_ATR_PollJobs, &poll_jobs); if ((poll_jobs) && (job_state == JOB_STATE_RUNNING)) { /* we need to throttle the number of outstanding threads are doing job polling. This prevents a problem where pbs_server gets hung waiting on I/O from the mom */ pthread_mutex_lock(poll_job_task_mutex); if (current_poll_job_tasks < max_poll_job_tasks) { if ((pjob->ji_qs.ji_un.ji_exect.ji_momaddr == 0) || (!pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str)) { pthread_mutex_unlock(poll_job_task_mutex); snprintf(log_buf, sizeof(log_buf), "Job %s missing MOM's information. Skipping polling on this job", pjob->ji_qs.ji_jobid); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf); } else { current_poll_job_tasks++; pthread_mutex_unlock(poll_job_task_mutex); stat_mom_job(job_id); pthread_mutex_lock(poll_job_task_mutex); current_poll_job_tasks--; } } pthread_mutex_unlock(poll_job_task_mutex); /* add another task */ set_task(WORK_Timed, time_now + JobStatRate, poll_job_task, strdup(job_id), FALSE); } } free(job_id); } free(ptask->wt_mutex); free(ptask); } /* END poll_job_task() */ /* * req_stat_que - service the Status Queue Request * * This request processes the request for status of a single queue or * the set of queues at a destinaion. */ int req_stat_que( struct batch_request *preq) { char *name; pbs_queue *pque = NULL; struct batch_reply *preply; int rc = 0; int type = 0; char log_buf[LOCAL_LOG_BUF_SIZE+1]; /* * first, validate the name of the requested object, either * a queue, or null for all queues */ name = preq->rq_ind.rq_status.rq_id; if ((*name == '\0') || (*name == '@')) { type = 1; } else { pque = find_queuebyname(name); if (pque == NULL) { rc = PBSE_UNKQUE; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot locate queue %s", name); req_reject(rc, 0, preq, NULL, log_buf); return rc; } } preply = &preq->rq_reply; preply->brp_choice = BATCH_REPLY_CHOICE_Status; CLEAR_HEAD(preply->brp_un.brp_status); if (type == 0) { /* get status of the named queue */ mutex_mgr pque_mutex = mutex_mgr(pque->qu_mutex, true); rc = status_que(pque, preq, &preply->brp_un.brp_status); /* pque_qu_mutex will be unlocked in the destructor when we leave this scope */ } else { /* pque == NULL before next_queue */ int iter = -1; /* get status of all queues */ while ((pque = next_queue(&svr_queues,&iter)) != NULL) { mutex_mgr pque_mutex = mutex_mgr(pque->qu_mutex, true); rc = status_que(pque, preq, &preply->brp_un.brp_status); if (rc != 0) { if (rc != PBSE_PERM) { break; } rc = 0; } } } if (rc != PBSE_NONE) { reply_free(preply); req_reject(PBSE_NOATTR, rc, preq, NULL, "status_queue failed"); } else { reply_send_svr(preq); } return rc; } /* END req_stat_que() */ /* * status_que - Build the status reply for a single queue. */ static int status_que( pbs_queue *pque, /* ptr to que to status */ struct batch_request *preq, tlist_head *pstathd) /* head of list to append status to */ { struct brp_status *pstat; svrattrl *pal; int rc = PBSE_NONE; int bad = 0; if ((preq->rq_perm & ATR_DFLAG_RDACC) == 0) { return(PBSE_PERM); } /* ok going to do status, update count and state counts from qu_qs */ pque->qu_attr[QA_ATR_TotalJobs].at_val.at_long = pque->qu_numjobs; pque->qu_attr[QA_ATR_TotalJobs].at_flags |= ATR_VFLAG_SET; update_state_ct( &pque->qu_attr[QA_ATR_JobsByState], pque->qu_njstate, pque->qu_jobstbuf); /* allocate status sub-structure and fill in header portion */ pstat = (struct brp_status *)calloc(1, sizeof(struct brp_status)); if (pstat == NULL) { return(PBSE_SYSTEM); } memset(pstat, 0, sizeof(struct brp_status)); pstat->brp_objtype = MGR_OBJ_QUEUE; strcpy(pstat->brp_objname, pque->qu_qs.qu_name); CLEAR_LINK(pstat->brp_stlink); CLEAR_HEAD(pstat->brp_attr); append_link(pstathd, &pstat->brp_stlink, pstat); /* add attributes to the status reply */ pal = (svrattrl *)GET_NEXT(preq->rq_ind.rq_status.rq_attr); if ((rc = status_attrib( pal, que_attr_def, pque->qu_attr, QA_ATR_LAST, preq->rq_perm, &pstat->brp_attr, &bad, 1)) != PBSE_NONE) /* IsOwner == TRUE */ { return(rc); } return(PBSE_NONE); } /* END status_que() */ /* instead of getting the status on a node with numa nodes, report * the status of all the numa nodes * * @param pnode - the node to report on * @param preq - the batch request * @param pstathd - the list to add this response to * * @return - 0 on SUCCESS, error code otherwise */ int get_numa_statuses( struct pbsnode *pnode, /* ptr to node receiving status query */ struct batch_request *preq, int *bad, /* O */ tlist_head *pstathd) /* head of list to append status to */ { int i; int rc = 0; struct pbsnode *pn; if (pnode->num_node_boards == 0) { /* no numa nodes, just return the status for this node */ rc = status_node(pnode, preq, bad, pstathd); return(rc); } for (i = 0; i < pnode->num_node_boards; i++) { pn = AVL_find(i,pnode->nd_mom_port,pnode->node_boards); if (pn == NULL) continue; lock_node(pn, __func__, NULL, LOGLEVEL); rc = status_node(pn, preq, bad, pstathd); unlock_node(pn, __func__, NULL, LOGLEVEL); if (rc != PBSE_NONE) { return(rc); } } return(rc); } /* END get_numa_statuses() */ /* * req_stat_node - service the Status Node Request * * This request processes the request for status of a single node or * set of nodes at a destination. */ int req_stat_node( struct batch_request *preq) { char *name; int rc = PBSE_NONE; int type = 0; int bad = 0; struct pbsnode *pnode = NULL; struct batch_reply *preply; struct prop props; svrattrl *pal; /* * first, check that the server indeed has a list of nodes * and if it does, validate the name of the requested object-- * either name is that of a specific node, or name[0] is null/@ * meaning request is for all nodes in the server's jurisdiction */ if (LOGLEVEL >= 6) { log_record( PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, "entered"); } if (svr_totnodes <= 0) { rc = PBSE_NONODES; req_reject(rc, 0, preq, NULL, "node list is empty - check 'server_priv/nodes' file"); return rc; } name = preq->rq_ind.rq_status.rq_id; if ((*name == '\0') || (*name == '@')) { type = 1; } else if ((*name == ':') && (*(name + 1) != '\0')) { if (!strcmp(name + 1, "ALL")) { type = 1; /* psuedo-group for all nodes */ } else { type = 2; props.name = name + 1; props.mark = 1; props.next = NULL; } } preply = &preq->rq_reply; preply->brp_choice = BATCH_REPLY_CHOICE_Status; CLEAR_HEAD(preply->brp_un.brp_status); if (type == 0) { /* get status of the named node */ pnode = find_nodebyname(name); if (pnode == NULL) { rc = PBSE_UNKNODE; req_reject(rc, 0, preq, NULL, "cannot locate specified node"); return(rc); } /* get the status on all of the numa nodes */ if (pnode->nd_is_alps_reporter == TRUE) rc = get_alps_statuses(pnode, preq, &bad, &preply->brp_un.brp_status); else rc = get_numa_statuses(pnode, preq, &bad, &preply->brp_un.brp_status); unlock_node(pnode, __func__, "type == 0", LOGLEVEL); } else { /* get status of all or several nodes */ int iter = -1; while ((pnode = next_host(&allnodes,&iter,NULL)) != NULL) { if ((type == 2) && (!hasprop(pnode, &props))) { unlock_node(pnode, __func__, "type != 0, next_host", LOGLEVEL); continue; } /* get the status on all of the numa nodes */ if (pnode->nd_is_alps_reporter == TRUE) rc = get_alps_statuses(pnode, preq, &bad, &preply->brp_un.brp_status); else rc = get_numa_statuses(pnode, preq, &bad, &preply->brp_un.brp_status); if (rc != PBSE_NONE) { unlock_node(pnode, __func__, "type != 0, rc != 0, get_numa_statuses", LOGLEVEL); break; } unlock_node(pnode, __func__, "type != 0, rc == 0, get_numa_statuses", LOGLEVEL); } } if (rc == PBSE_NONE) { /* SUCCESS */ reply_send_svr(preq); } else { if (rc != PBSE_UNKNODEATR) { req_reject(rc, 0, preq, NULL, NULL); } else { pal = (svrattrl *)GET_NEXT(preq->rq_ind.rq_status.rq_attr); reply_badattr(rc, bad, pal, preq); } } return(rc); } /* END req_stat_node() */ /* * status_node - Build the status reply for a single node. */ int status_node( struct pbsnode *pnode, /* ptr to node receiving status query */ struct batch_request *preq, int *bad, /* O */ tlist_head *pstathd) /* head of list to append status to */ { int rc = 0; struct brp_status *pstat; svrattrl *pal; if ((preq->rq_perm & ATR_DFLAG_RDACC) == 0) { return(PBSE_PERM); } /* allocate status sub-structure and fill in header portion */ pstat = (struct brp_status *)calloc(1, sizeof(struct brp_status)); if (pstat == NULL) { return(PBSE_SYSTEM); } pstat->brp_objtype = MGR_OBJ_NODE; strncpy(pstat->brp_objname, pnode->nd_name, sizeof(pstat->brp_objname)-1); CLEAR_LINK(pstat->brp_stlink); CLEAR_HEAD(pstat->brp_attr); /*add this new brp_status structure to the list hanging off*/ /*the request's reply substructure */ append_link(pstathd, &pstat->brp_stlink, pstat); /*point to the list of node-attributes about which we want status*/ /*hang that status information from the brp_attr field for this */ /*brp_status structure */ *bad = 0; /*global variable*/ if (preq->rq_ind.rq_status.rq_attr.ll_struct != NULL) pal = (svrattrl *)GET_NEXT(preq->rq_ind.rq_status.rq_attr); else pal = NULL; rc = status_nodeattrib( pal, node_attr_def, pnode, ND_ATR_LAST, preq->rq_perm, &pstat->brp_attr, bad); return(rc); } /* END status_node() */ /* * req_stat_svr - service the Status Server Request * * This request processes the request for status of the Server */ int req_stat_svr( struct batch_request *preq) /* ptr to the decoded request */ { svrattrl *pal; struct batch_reply *preply; struct brp_status *pstat; int bad = 0; char nc_buf[128]; int numjobs; int netrates[3]; memset(netrates, 0, sizeof(netrates)); /* update count and state counts from sv_numjobs and sv_jobstates */ lock_sv_qs_mutex(server.sv_qs_mutex, __func__); numjobs = server.sv_qs.sv_numjobs; unlock_sv_qs_mutex(server.sv_qs_mutex, __func__); pthread_mutex_lock(server.sv_attr_mutex); server.sv_attr[SRV_ATR_TotalJobs].at_val.at_long = numjobs; server.sv_attr[SRV_ATR_TotalJobs].at_flags |= ATR_VFLAG_SET; pthread_mutex_lock(server.sv_jobstates_mutex); update_state_ct( &server.sv_attr[SRV_ATR_JobsByState], server.sv_jobstates, server.sv_jobstbuf); pthread_mutex_unlock(server.sv_jobstates_mutex); netcounter_get(netrates); snprintf(nc_buf, 127, "%d %d %d", netrates[0], netrates[1], netrates[2]); if (server.sv_attr[SRV_ATR_NetCounter].at_val.at_str != NULL) free(server.sv_attr[SRV_ATR_NetCounter].at_val.at_str); server.sv_attr[SRV_ATR_NetCounter].at_val.at_str = strdup(nc_buf); if (server.sv_attr[SRV_ATR_NetCounter].at_val.at_str != NULL) server.sv_attr[SRV_ATR_NetCounter].at_flags |= ATR_VFLAG_SET; pthread_mutex_unlock(server.sv_attr_mutex); /* allocate a reply structure and a status sub-structure */ preply = &preq->rq_reply; preply->brp_choice = BATCH_REPLY_CHOICE_Status; CLEAR_HEAD(preply->brp_un.brp_status); pstat = (struct brp_status *)calloc(1, sizeof(struct brp_status)); if (pstat == NULL) { reply_free(preply); req_reject(PBSE_SYSTEM, 0, preq, NULL, NULL); pthread_mutex_unlock(server.sv_attr_mutex); return(PBSE_SYSTEM); } CLEAR_LINK(pstat->brp_stlink); strcpy(pstat->brp_objname, server_name); pstat->brp_objtype = MGR_OBJ_SERVER; CLEAR_HEAD(pstat->brp_attr); append_link(&preply->brp_un.brp_status, &pstat->brp_stlink, pstat); /* add attributes to the status reply */ pal = (svrattrl *)GET_NEXT(preq->rq_ind.rq_status.rq_attr); if (status_attrib( pal, svr_attr_def, server.sv_attr, SRV_ATR_LAST, preq->rq_perm, &pstat->brp_attr, &bad, 1)) /* IsOwner == TRUE */ { reply_badattr(PBSE_NOATTR, bad, pal, preq); } else { reply_send_svr(preq); } return(PBSE_NONE); } /* END req_stat_svr() */ /* DIAGTODO: write req_stat_diag() */ /* * update-state_ct - update the count of jobs per state (in queue and server * attributes. */ static void update_state_ct( pbs_attribute *pattr, int *ct_array, char *buf) { static const char *statename[] = { "Transit", "Queued", "Held", "Waiting", "Running", "Exiting", "Complete" }; int index; buf[0] = '\0'; for (index = 0; index < PBS_NUMJOBSTATE; index++) { sprintf(buf + strlen(buf), "%s:%d ", statename[index], *(ct_array + index)); } pattr->at_val.at_str = buf; pattr->at_flags |= ATR_VFLAG_SET; } /* END update_state_ct() */