/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
* EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* This license will be governed by the laws of the Commonwealth of Virginia,
* without reference to its choice of law rules.
*/

/*
 * req_runjob.c - functions dealing with a Run Job Request
 */

#include <pbs_config.h>   /* the master config generated by configure */

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/socket.h>

#include "libpbs.h"
#include "server_limits.h"
#include "list_link.h"
#include "attribute.h"
#include "resource.h"
#include "server.h"
#include "credential.h"
#include "batch_request.h"
#include "pbs_job.h"
#include "pbs_nodes.h"
#include "queue.h"
#include "work_task.h"
#include "pbs_error.h"
#include "log.h"
#include "../lib/Liblog/pbs_log.h"
#include "../lib/Liblog/log_event.h"
#include "acct.h"
#include "dis.h"
#include "svrfunc.h"
#include "net_connect.h"
#include "pbs_proto.h"
#include "array.h"
#include "threadpool.h"
#include "node_func.h" /* find_nodebyname */
#include "../lib/Libutils/u_lock_ctl.h" /* unlock_node */
#include "svr_func.h" /* get_svr_attr_* */
#include "req_stat.h" /* stat_mom_job */
#include "ji_mutex.h"
#include "mutex_mgr.hpp"

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif /* HAVE_NETINET_IN_H */

/* External Functions Called: */

extern int  send_job_work(char *job_id, char *, int, int *, struct batch_request *);
extern void set_resc_assigned(job *, enum batch_op);
extern struct batch_request *cpy_stage(struct batch_request *, job *, enum job_atr, int);
void        stream_eof(int, u_long, uint16_t, int);
extern int  job_set_wait(pbs_attribute *, void *, int);
extern int  LOGLEVEL;

/* Public Functions in this file */

int  svr_startjob(job *, struct batch_request **, char *, char *);

/* Private Functions local to this file */

int  svr_stagein(job **, struct batch_request **, int, int);
int  svr_strtjob2(job **, struct batch_request *);
job *chk_job_torun(struct batch_request *, int);
int  assign_hosts(job *, char *, int, char *, char *);

/* Global Data Items: */

extern pbs_net_t        pbs_mom_addr;
extern int              pbs_mom_port;
extern struct server    server;
extern char             server_host[PBS_MAXHOSTNAME + 1];
extern char             server_name[PBS_MAXSERVERNAME + 1];
extern char            *msg_badexit;
extern char            *msg_jobrun;
extern char            *msg_manager;
extern char            *msg_stageinfail;
extern int              scheduler_jobct;
extern int              scheduler_sock;
extern pthread_mutex_t *scheduler_sock_jobct_mutex;
extern int              svr_totnodes; /* non-zero if using nodes */
extern const char      *PJobSubState[];
extern unsigned int     pbs_rm_port;
extern char            *msg_err_malloc;

pthread_mutex_t *dispatch_mutex = NULL;
long             DispatchTime[20];
job             *DispatchJob[20];
char            *DispatchNode[20];

extern job *chk_job_request(char *, struct batch_request *);
extern struct batch_request *cpy_checkpoint(struct batch_request *, job *, enum job_atr, int);
void poll_job_task(work_task *);
int  kill_job_on_mom(char *jobid, struct pbsnode *pnode);



/*
 * check_and_run_job - verify array slot limits and dispatch the job to its
 * MOM via svr_startjob(); run directly for synchronous requests or as a
 * threadpool task for asynchronous ones.
 */

void *check_and_run_job(

  void *vp)

  {
  batch_request *preq = (batch_request *)vp;
  job           *pjob;
  int           *rc_ptr = (int *)calloc(1, sizeof(int));
  char           failhost[MAXLINE];
  char           emsg[MAXLINE];
  long           job_atr_hold;
  int            job_exit_status;
  int            job_state;
  char
job_id[PBS_MAXSVRJOBID+1]; struct badplace *bp; char log_buf[LOCAL_LOG_BUF_SIZE + 1]; *rc_ptr = PBSE_NONE; pjob = svr_find_job(preq->rq_ind.rq_run.rq_jid, FALSE); if (pjob == NULL) { req_reject(PBSE_JOBNOTFOUND, 0, preq, NULL, "Job unexpectedly deleted"); *rc_ptr = PBSE_JOBNOTFOUND; return(rc_ptr); } mutex_mgr job_mutex(pjob->ji_mutex, true); strcpy(job_id, pjob->ji_qs.ji_jobid); /* if the job is part of an array, check the slot limit */ if ((pjob->ji_arraystructid[0] != '\0') && (pjob->ji_is_array_template == FALSE)) { job_array *pa = get_jobs_array(&pjob); if (pjob == NULL) { job_mutex.set_lock_on_exit(false); req_reject(PBSE_JOBNOTFOUND, 0, preq, NULL, "Job unexpectedly deleted"); *rc_ptr = PBSE_JOBNOTFOUND; return(rc_ptr); } if (pa != NULL) { mutex_mgr array_mutex(pa->ai_mutex, true); if ((pa->ai_qs.slot_limit < 0) || (pa->ai_qs.slot_limit > pa->ai_qs.jobs_running)) { job_atr_hold = pjob->ji_wattr[JOB_ATR_hold].at_val.at_long; job_exit_status = pjob->ji_qs.ji_un.ji_exect.ji_exitstat; job_state = pjob->ji_qs.ji_state; job_mutex.unlock(); update_array_values(pa,job_state,aeRun, job_id, job_atr_hold, job_exit_status); if ((pjob = svr_find_job(job_id, FALSE)) == NULL) { req_reject(PBSE_JOBNOTFOUND, 0, preq, NULL, "Job deleted while updating array values"); *rc_ptr = PBSE_JOBNOTFOUND; return(rc_ptr); } else job_mutex.mark_as_locked(); } else { snprintf(log_buf,sizeof(log_buf), "Cannot run job. Array slot limit is %d and there are already %d jobs running\n", pa->ai_qs.slot_limit, pa->ai_qs.jobs_running); free_nodes(pjob); req_reject(PBSE_IVALREQ, 0, preq, NULL, log_buf); *rc_ptr = PBSE_IVALREQ; return(rc_ptr); } } } /* NOTE: nodes assigned to job in svr_startjob() */ *rc_ptr = svr_startjob(pjob, &preq, failhost, emsg); if ((*rc_ptr != 0) && (preq != NULL)) { free_nodes(pjob); /* if the job has a non-empty rejectdest list, pass the first host into req_reject() */ if ((bp = (badplace *)GET_NEXT(pjob->ji_rejectdest)) != NULL) { req_reject(*rc_ptr, 0, preq, bp->bp_dest, "could not contact host"); } else { req_reject(*rc_ptr, 0, preq, failhost, emsg); } } return(rc_ptr); } /* END check_and_run_job() */ /* * req_runjob - service the Run Job and Async Run Job Requests * * This request forces a job into execution. Client must be privileged. 
*/ int req_runjob( batch_request *preq) /* I (modified) */ { job *pjob; int rc = PBSE_NONE; int setneednodes; int *rc_ptr; char log_buf[LOCAL_LOG_BUF_SIZE + 1]; /* chk_job_torun will extract job id and assign hostlist if specified */ if (preq == NULL) return(PBSE_UNKJOBID); if (getenv("TORQUEAUTONN")) setneednodes = 1; else setneednodes = 0; if ((pjob = chk_job_torun(preq, setneednodes)) == NULL) { /* FAILURE - chk_job_torun performs req_reject internally */ return(PBSE_UNKJOBID); } /* we don't currently allow running of an entire job array */ if (strstr(pjob->ji_qs.ji_jobid,"[]") != NULL) { unlock_ji_mutex(pjob, __func__, "1", LOGLEVEL); req_reject(PBSE_IVALREQ, 0, preq, NULL, "cannot run a job array"); return(PBSE_IVALREQ); } pthread_mutex_lock(scheduler_sock_jobct_mutex); if (preq->rq_conn == scheduler_sock) ++scheduler_jobct; /* see scheduler_close() */ pthread_mutex_unlock(scheduler_sock_jobct_mutex); sprintf(log_buf, msg_manager, msg_jobrun, preq->rq_user, preq->rq_host); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buf); /* If async run, reply now; otherwise reply is handled in */ /* post_sendmom or post_stagein */ unlock_ji_mutex(pjob, __func__, "2", LOGLEVEL); if (preq->rq_type == PBS_BATCH_AsyrunJob) { reply_ack(preq); preq->rq_noreply = TRUE; enqueue_threadpool_request(check_and_run_job, preq); } else { rc_ptr = (int *)check_and_run_job(preq); rc = *rc_ptr; free(rc_ptr); } return(rc); } /* END req_runjob() */ /* * is_checkpoint_restart - Is this the restart of a checkpoint job */ static int is_checkpoint_restart( job *pjob) /* I */ { #if 0 if ((pjob->ji_wattr[JOB_ATR_checkpoint_name].at_flags & ATR_VFLAG_SET) == 0) { return(FALSE); } #else if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHECKPOINT_FILE) == 0) { return(FALSE); } #endif return(TRUE); } /* END is_checkpoint_restart() */ /* * post_checkpointsend - process reply from MOM to checkpoint copy request */ void post_checkpointsend( batch_request *preq) { int code; job *pjob; bool preq_free_done = FALSE; pbs_attribute *pwait; char log_buf[LOCAL_LOG_BUF_SIZE]; time_t time_now = time(NULL); if (preq == NULL) return; code = preq->rq_reply.brp_code; pjob = svr_find_job((char *)preq->rq_extra, FALSE); free(preq->rq_extra); if (pjob != NULL) { mutex_mgr job_mutex(pjob->ji_mutex, true); if (code != 0) { /* copy failed - hold job */ free_nodes(pjob); pwait = &pjob->ji_wattr[JOB_ATR_exectime]; if ((pwait->at_flags & ATR_VFLAG_SET) == 0) { pwait->at_val.at_long = time_now + PBS_STAGEFAIL_WAIT; pwait->at_flags |= ATR_VFLAG_SET; job_set_wait(pwait, pjob, 0); } svr_setjobstate(pjob, JOB_STATE_WAITING, JOB_SUBSTATE_STAGEFAIL, FALSE); if (preq->rq_reply.brp_choice == BATCH_REPLY_CHOICE_Text) { sprintf(log_buf, "Failed to copy checkpoint file to mom - %s", preq->rq_reply.brp_un.brp_txt.brp_str); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buf); /* NYI */ svr_mailowner( pjob, MAIL_CHKPTCOPY, MAIL_FORCE, preq->rq_reply.brp_un.brp_txt.brp_str); } } else { /* checkpoint copy was successful */ pjob->ji_qs.ji_svrflags |= JOB_SVFLG_CHECKPOINT_COPIED; /* set restart_name pbs_attribute to the checkpoint_name we just copied */ job_attr_def[JOB_ATR_restart_name].at_set( &pjob->ji_wattr[JOB_ATR_restart_name], &pjob->ji_wattr[JOB_ATR_checkpoint_name], SET); pjob->ji_modified = 1; job_save(pjob, SAVEJOB_FULL, 0); /* continue to start job running */ svr_strtjob2(&pjob, preq); preq_free_done = TRUE; } if (pjob == NULL) job_mutex.set_lock_on_exit(false); } /* END if (pjob != NULL) */ if (!preq_free_done) free_br(preq); 
/* close connection and release request */ return; } /* END post_checkpointsend() */ /* * svr_send_checkpoint - direct MOM to copy in the checkpoint files for a job */ int svr_send_checkpoint( job **pjob_ptr, /* I */ struct batch_request **preq, /* I */ int state, /* I */ int substate) /* I */ { struct batch_request *momreq = 0; int rc; char *tmp_jobid = NULL; char jobid[PBS_MAXSVRJOBID + 1]; job *pjob = *pjob_ptr; momreq = cpy_checkpoint(momreq, pjob, JOB_ATR_checkpoint_name, CKPT_DIR_IN); if (momreq == NULL) { /* no files to send, go directly to sending job to mom */ rc = svr_strtjob2(&pjob, *preq); *preq = NULL; return(rc); } /* save job id for post_checkpointsend */ if ((tmp_jobid = strdup(pjob->ji_qs.ji_jobid)) == NULL) { free_br(momreq); return(PBSE_SYSTEM); } momreq->rq_extra = tmp_jobid; /* The momreq is freed in relay_to_mom (failure) * or in issue_Drequest (success) */ if ((rc = relay_to_mom(&pjob, momreq, NULL)) == PBSE_NONE) { jobid[0] = '\0'; if (pjob != NULL) { svr_setjobstate(pjob, state, substate, FALSE); strcpy(jobid, pjob->ji_qs.ji_jobid); unlock_ji_mutex(pjob, __func__, "1", LOGLEVEL); } post_checkpointsend(momreq); if (jobid[0] != '\0') *pjob_ptr = svr_find_job(jobid, FALSE); /* * checkpoint copy started ok - reply to client as copy may * take too long to wait. */ if (*preq != NULL) { reply_ack(*preq); *preq = NULL; } } else { free_br(momreq); free(tmp_jobid); } return(rc); } /* END svr_send_checkpoint() */ /* * req_stagein - service the Stage In Files for a Job Request * * This request causes MOM to start staging in files. * Client must be privileged. */ int req_stagein( struct batch_request *preq) /* I */ { job *pjob; int rc = PBSE_NONE; int setneednodes; if (getenv("TORQUEAUTONN")) setneednodes = 1; else setneednodes = 0; if ((pjob = chk_job_torun(preq, setneednodes)) == NULL) { return(PBSE_UNKJOBID); } if ((pjob->ji_wattr[JOB_ATR_stagein].at_flags & ATR_VFLAG_SET) == 0) { log_err(-1, "req_stagein", "stage-in information not set"); req_reject(PBSE_IVALREQ, 0, preq, NULL, NULL); unlock_ji_mutex(pjob, __func__, "1", LOGLEVEL); return(PBSE_IVALREQ); } if ((rc = svr_stagein( &pjob, &preq, JOB_STATE_QUEUED, JOB_SUBSTATE_STAGEIN))) { free_nodes(pjob); req_reject(rc, 0, preq, NULL, NULL); } unlock_ji_mutex(pjob, __func__, "2", LOGLEVEL); return(rc); } /* END req_stagein() */ /* * post_stagein - process reply from MOM to stage-in request */ void post_stagein( batch_request *preq) { int code; int newstate; int newsub; job *pjob; pbs_attribute *pwait; time_t time_now = time(NULL); /* preq handled previously */ if (preq == NULL) return; code = preq->rq_reply.brp_code; pjob = svr_find_job((char *)preq->rq_extra, FALSE); free(preq->rq_extra); if (pjob != NULL) { mutex_mgr job_mutex(pjob->ji_mutex, true); if (code != 0) { /* stage in failed - hold job */ free_nodes(pjob); pwait = &pjob->ji_wattr[JOB_ATR_exectime]; if ((pwait->at_flags & ATR_VFLAG_SET) == 0) { pwait->at_val.at_long = time_now + PBS_STAGEFAIL_WAIT; pwait->at_flags |= ATR_VFLAG_SET; job_set_wait(pwait, pjob, 0); } svr_setjobstate(pjob, JOB_STATE_WAITING, JOB_SUBSTATE_STAGEFAIL, FALSE); if (preq->rq_reply.brp_choice == BATCH_REPLY_CHOICE_Text) { /* set job comment */ /* NYI */ svr_mailowner( pjob, MAIL_STAGEIN, MAIL_FORCE, preq->rq_reply.brp_un.brp_txt.brp_str); } } else { /* stage in was successful */ pjob->ji_qs.ji_svrflags |= JOB_SVFLG_StagedIn; if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_STAGEGO) { if (is_checkpoint_restart(pjob)) { /* need to copy checkpoint file to mom before running */ svr_send_checkpoint( 
          &pjob,
          &preq,
          JOB_STATE_RUNNING,
          JOB_SUBSTATE_CHKPTGO);
        }
      else
        {
        /* continue to start job running */
        svr_strtjob2(&pjob, preq);

        /* svr_strtjob2() calls finish_sendmom(), which frees preq in its
         * call to reply_send_svr(); set preq to NULL now to avoid a
         * double free */
        preq = NULL;
        }
      }
    else
      {
      svr_evaljobstate(*pjob, newstate, newsub, 0);
      svr_setjobstate(pjob, newstate, newsub, FALSE);
      }
    }

  if (pjob == NULL)
    job_mutex.set_lock_on_exit(false);
  }  /* END if (pjob != NULL) */

  if (preq)
    free_br(preq);

  /* close connection and release request */

  return;
  }  /* END post_stagein() */



/*
 * svr_stagein - direct MOM to stage in the requested files for a job
 */

int svr_stagein(

  job                  **pjob_ptr, /* I */
  struct batch_request **preq,     /* I */
  int                    state,    /* I */
  int                    substate) /* I */

  {
  job                  *pjob = *pjob_ptr;
  struct batch_request *momreq = 0;
  int                   rc;
  char                 *tmp_jobid = NULL;
  char                  jobid[PBS_MAXSVRJOBID + 1];

  momreq = cpy_stage(momreq, pjob, JOB_ATR_stagein, STAGE_DIR_IN);

  if (momreq == NULL)
    {
    /* no files to stage, go directly to sending job to mom */
    rc = svr_strtjob2(&pjob, *preq);
    *preq = NULL;

    return(rc);
    }

  /* have files to stage in */

  /* save job id for post_stagein */
  if ((tmp_jobid = strdup(pjob->ji_qs.ji_jobid)) == NULL)
    {
    free_br(momreq);
    return(PBSE_SYSTEM);
    }

  momreq->rq_extra = tmp_jobid;

  /* The momreq is freed in relay_to_mom (failure)
   * or in issue_Drequest (success) */
  if ((rc = relay_to_mom(&pjob, momreq, NULL)) == PBSE_NONE)
    {
    jobid[0] = '\0';

    if (pjob != NULL)
      {
      strcpy(jobid, pjob->ji_qs.ji_jobid);
      svr_setjobstate(pjob, state, substate, FALSE);
      unlock_ji_mutex(pjob, __func__, "1", LOGLEVEL);
      }

    post_stagein(momreq);

    if (jobid[0] != '\0')
      *pjob_ptr = svr_find_job(jobid, FALSE);

    /*
     * stage-in started ok - reply to client as copy may
     * take too long to wait.
     */

    if (*preq != NULL)
      reply_ack(*preq);
    }
  else
    {
    free_br(momreq);
    free(tmp_jobid);
    }

  return(rc);
  }  /* END svr_stagein() */

#ifdef BOEING
/*
 * contacts each mom and verifies that it is up by opening a tcp connection
 * to it.
 *
 * NOTE: this is only done for boeing.
 */

int verify_moms_up(

  job *pjob)

  {
  int                 sock;
  int                 nodenum;
  struct addrinfo    *addr_info;
  char               *nodestr = NULL;
  char               *cp;
  char               *hostlist;
  char               *hostlist_ptr;
  struct sockaddr_in *sai;
  struct sockaddr_in  saddr;
  badplace           *bp;
  char                log_buf[LOCAL_LOG_BUF_SIZE];

  /* NOTE:  Copy the nodes into a temp string because threadsafe_tokenizer() is destructive. */

  hostlist = strdup(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str);
  hostlist_ptr = hostlist;

  if (hostlist == NULL)
    {
    sprintf(log_buf, "could not allocate temporary buffer (strdup failed) -- skipping TCP connect check");
    log_err(errno, __func__, log_buf);
    }
  else
    {
    /* Get the first host. */
    nodestr = threadsafe_tokenizer(&hostlist_ptr, "+");
    }

  while (nodestr != NULL)
    {
    /* truncate from trailing slash on (if one exists). */
    if ((cp = strchr(nodestr, '/')) != NULL)
      {
      cp[0] = '\0';
      }

    /* Lookup IP address of host.
*/ if ((sai = get_cached_addrinfo(nodestr)) == NULL) { sprintf(log_buf, "could not contact %s (no address info, errno: %d (%s))", nodestr, errno, pbs_strerror(errno)); if (FailHost != NULL) snprintf(FailHost, 1024, "%s", nodestr); if (EMsg != NULL) snprintf(EMsg, 1024, "%s", log_buf); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buf); /* Add this host to the reject destination list for the job */ if ((bp = (badplace *)calloc(1, sizeof(badplace))) == NULL) { log_err(ENOMEM, __func__, msg_err_malloc); return(PBSE_SYSTEM); } CLEAR_LINK(bp->bp_link); strcpy(bp->bp_dest, nodestr); append_link(&pjob->ji_rejectdest, &bp->bp_link, bp); /* FAILURE - cannot lookup master compute host */ return(PBSE_RESCUNAV); } /* open a socket. */ /* NOTE: should change to PF_* */ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { sprintf(log_buf, "could not contact %s (cannot create socket, errno: %d (%s))", nodestr, errno, pbs_strerror(errno)); if (FailHost != NULL) snprintf(FailHost, 1024, "%s", nodestr); if (EMsg != NULL) snprintf(EMsg, 1024, "%s", log_buf); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buf); /* Add this host to the reject destination list for the job */ if ((bp = (badplace *)calloc(1, sizeof(badplace))) == NULL) { /* FAILURE - cannot allocate memory */ log_err(errno, __func__, msg_err_malloc); return(PBSE_RESCUNAV); } CLEAR_LINK(bp->bp_link); strcpy(bp->bp_dest, nodestr); append_link(&pjob->ji_rejectdest, &bp->bp_link, bp); /* FAILURE - cannot create socket for master compute host */ return(PBSE_RESCUNAV); } /* Set the host information. */ memcpy(&saddr, sai, sizeof(saddr)); saddr.sin_family = AF_INET; saddr.sin_port = htons(pjob->ji_qs.ji_un.ji_exect.ji_mom_rmport); /* Connect to the host. */ if (connect(sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { close(sock); sprintf(log_buf, "could not contact %s (connect failed, errno: %d (%s))", nodestr, errno, pbs_strerror(errno)); if (FailHost != NULL) snprintf(FailHost, 1024, "%s", nodestr); if (EMsg != NULL) snprintf(EMsg, 1024, "%s", log_buf); log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buf); /* Add this host to the reject list for the job */ if ((bp = (badplace *)calloc(1, sizeof(badplace))) == NULL) { /* FAILURE - cannot allocate memory */ log_err(errno, __func__, msg_err_malloc); return(PBSE_RESCUNAV); } CLEAR_LINK(bp->bp_link); strcpy(bp->bp_dest, nodestr); append_link(&pjob->ji_rejectdest, &bp->bp_link, bp); /* FAILURE - cannot connect to master compute host */ return(PBSE_RESCUNAV); } /* clean up and get next host. 
*/ close(sock); nodestr = threadsafe_tokenizer(&hostlist_ptr, "+"); } /* END while (nodestr != NULL) */ /* SUCCESS */ if (hostlist != NULL) free(hostlist); return(PBSE_NONE); } /* END verify_moms_up() */ #endif /* * svr_startjob - place a job into running state by shipping it to MOM * called by req_runjob() */ int svr_startjob( job *pjob, /* I job to run (modified) */ struct batch_request **preq, /* I Run Job batch request (optional) */ char *FailHost, /* O (optional,minsize=1024) */ char *EMsg) /* O (optional,minsize=1024) */ { int f; int rc; long cray_enabled = FALSE; if (FailHost != NULL) FailHost[0] = '\0'; if (EMsg != NULL) EMsg[0] = '\0'; /* if not already setup, transfer the control/script file basename */ /* into an pbs_attribute accessible by MOM */ if (!(pjob->ji_wattr[JOB_ATR_hashname].at_flags & ATR_VFLAG_SET)) { if (job_attr_def[JOB_ATR_hashname].at_decode( &pjob->ji_wattr[JOB_ATR_hashname], NULL, NULL, pjob->ji_qs.ji_fileprefix, 0)) { return(PBSE_SYSTEM); } } /* if exec_host already set and either (hot start or checkpoint) */ /* then use the host(s) listed in exec_host */ /* NOTE: qrun hostlist assigned in req_runjob() */ rc = 0; f = pjob->ji_wattr[JOB_ATR_exec_host].at_flags & ATR_VFLAG_SET; if ((f != 0) && ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HOTSTART) || (pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHECKPOINT_FILE)) && ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HasNodes) == 0)) { rc = assign_hosts( /* inside svr_startjob() */ pjob, pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str, 0, FailHost, EMsg); } else if (f == 0) { /* exec_host not already set, get hosts and set it */ rc = assign_hosts( pjob, NULL, 1, FailHost, EMsg); /* inside svr_startjob() */ } if (rc != 0) { /* FAILURE */ return(rc); } get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled); /* copy the server nppcu value to the job */ if ((cray_enabled == TRUE) && (!(pjob->ji_wattr[JOB_ATR_nppcu].at_flags & ATR_VFLAG_SET))) { long svr_nppcu_value = 0; char buf[128]; /* get server nppcu here */ get_svr_attr_l(SRV_ATR_nppcu, &svr_nppcu_value); sprintf(buf, "%ld", svr_nppcu_value); if (job_attr_def[JOB_ATR_nppcu].at_decode( &pjob->ji_wattr[JOB_ATR_nppcu], NULL, NULL, buf, 0)) { return(PBSE_SYSTEM); } } #ifdef BOEING if ((rc = verify_moms_up(pjob)) != PBSE_NONE) return(rc); #endif /* END BOEING */ /* Next, are there files to be staged-in? 
*/ if ((pjob->ji_wattr[JOB_ATR_stagein].at_flags & ATR_VFLAG_SET) && (pjob->ji_qs.ji_substate != JOB_SUBSTATE_STAGECMP)) { /* yes, we do that first; then start the job */ rc = svr_stagein( &pjob, preq, JOB_STATE_RUNNING, JOB_SUBSTATE_STAGEGO); /* note, the positive acknowledgment is done by svr_stagein */ } else if (is_checkpoint_restart(pjob)) { /* Checkpoint file copy needed, start copy */ rc = svr_send_checkpoint( &pjob, preq, JOB_STATE_RUNNING, JOB_SUBSTATE_CHKPTGO); } else { /* No stage-in or already done, start job executing */ rc = svr_strtjob2(&pjob, *preq); *preq = NULL; } return(rc); } /* END svr_startjob() */ int send_job_to_mom( job **pjob_ptr, /* M */ batch_request *preq, /* I */ job *parent_job) /* I - for heterogeneous jobs only */ { job *pjob = *pjob_ptr; int old_state; int old_subst; long job_timeout = 0; long tcp_timeout = 0; unsigned long job_momaddr = -1; char job_id[PBS_MAXSVRJOBID+1]; int my_err = 0; int external = FALSE; char tmpLine[MAXLINE]; char *mail_text = NULL; if (parent_job != NULL) external = pjob == parent_job->ji_external_clone; if (preq == NULL) { return(PBSE_BAD_PARAMETER); } old_state = pjob->ji_qs.ji_state; old_subst = pjob->ji_qs.ji_substate; svr_setjobstate(pjob, JOB_STATE_RUNNING, JOB_SUBSTATE_PRERUN, FALSE); /* if job start timeout pbs_attribute is set use its value */ get_svr_attr_l(SRV_ATR_tcp_timeout, &tcp_timeout); if ((get_svr_attr_l(SRV_ATR_JobStartTimeout, &job_timeout) == PBSE_NONE) && (job_timeout > 0)) { DIS_tcp_settimeout(job_timeout); } job_momaddr = pjob->ji_qs.ji_un.ji_exect.ji_momaddr; strcpy(job_id, pjob->ji_qs.ji_jobid); unlock_ji_mutex(pjob, __func__, "1", LOGLEVEL); *pjob_ptr = NULL; pjob = NULL; if ((preq != NULL) && (preq->rq_reply.brp_un.brp_txt.brp_str != NULL)) mail_text = strdup(preq->rq_reply.brp_un.brp_txt.brp_str); if (send_job_work(job_id, NULL, MOVE_TYPE_Exec, &my_err, preq) == PBSE_NONE) { /* SUCCESS */ DIS_tcp_settimeout(tcp_timeout); if (parent_job == NULL) { if ((pjob = svr_find_job(job_id, TRUE)) != NULL) { *pjob_ptr = pjob; } } if (pjob != NULL) { svr_mailowner( pjob, MAIL_BEGIN, MAIL_NORMAL, mail_text); } if (mail_text != NULL) free(mail_text); return(PBSE_NONE); } else { DIS_tcp_settimeout(tcp_timeout); if (parent_job == NULL) pjob = svr_find_job(job_id, TRUE); else { if (external == TRUE) pjob = parent_job->ji_external_clone; else pjob = parent_job->ji_cray_clone; } if (pjob != NULL) { sprintf(tmpLine, "unable to run job, send to MOM '%lu' failed", job_momaddr); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, job_id, tmpLine); *pjob_ptr = pjob; pjob->ji_qs.ji_destin[0] = '\0'; svr_setjobstate(pjob, old_state, old_subst, FALSE); } if (mail_text != NULL) free(mail_text); return(my_err); } } /* END send_job_to_mom() */ /* * kills the job on the mom and requeues the job */ int requeue_job( job *pjob) { struct pbsnode *pnode = find_nodebyname(pjob->ji_qs.ji_destin); int retry = 0; int rc = -1; if (pnode == NULL) return(-1); /* setting this makes it so that the obit will be ignored by pbs_server */ pjob->ji_qs.ji_un.ji_exect.ji_momaddr = 0; while ((rc != PBSE_NONE) && (retry < 3)) { rc = kill_job_on_mom(pjob->ji_qs.ji_jobid, pnode); retry++; } /* set the job's state to queued */ svr_setjobstate(pjob, JOB_STATE_QUEUED, JOB_SUBSTATE_QUEUED, FALSE); pjob->ji_qs.ji_destin[0] = '\0'; return(rc); } /* END requeue_job() */ int handle_heterogeneous_job_launch( job *pjob, batch_request *preq) { job *external_clone = pjob->ji_external_clone; job *cray_clone = pjob->ji_cray_clone; int both_running = FALSE; int rc = PBSE_NONE; 
  batch_request *external_preq;
  batch_request *cray_preq;

  unlock_ji_mutex(pjob, __func__, NULL, LOGLEVEL);
  lock_ji_mutex(external_clone, __func__, NULL, LOGLEVEL);
  lock_ji_mutex(cray_clone, __func__, NULL, LOGLEVEL);

  /* clone the batch requests to avoid double frees */
  external_preq = duplicate_request(preq);
  cray_preq = duplicate_request(preq);

  /* client doesn't need a response from these */
  external_preq->rq_noreply = TRUE;
  cray_preq->rq_noreply = TRUE;

  if ((rc = send_job_to_mom(&external_clone, external_preq, pjob)) == PBSE_NONE)
    {
    if ((rc = send_job_to_mom(&cray_clone, cray_preq, pjob)) != PBSE_NONE)
      {
      /* requeue the external job */
      requeue_job(external_clone);
      }
    else
      both_running = TRUE;
    }
  else
    free_br(cray_preq);

  if (cray_clone != NULL)
    unlock_ji_mutex(cray_clone, __func__, NULL, LOGLEVEL);

  if (external_clone != NULL)
    unlock_ji_mutex(external_clone, __func__, NULL, LOGLEVEL);

  lock_ji_mutex(pjob, __func__, NULL, LOGLEVEL);

  if (both_running == TRUE)
    {
    svr_setjobstate(pjob, JOB_STATE_RUNNING, JOB_SUBSTATE_RUNNING, FALSE);
    reply_ack(preq);
    }
  else
    {
    req_reject(rc, 0, preq, NULL, NULL);
    }

  return(rc);
  }  /* END handle_heterogeneous_job_launch() */



/* PATH
    req_runjob()
     > svr_startjob()
        svr_strtjob2()
         send_job()       - svr_movejob.c
          svr_connect()
          PBSD_queuejob()
*/

int svr_strtjob2(

  job                  **pjob_ptr, /* I */
  struct batch_request  *preq)     /* I (modified - report status) */

  {
  job             *pjob = *pjob_ptr;
  pbs_attribute   *pattr;
  struct timeval   start_time;
  struct timezone  tz;
  int              rc = PBSE_NONE;
  long             cray_enabled = FALSE;

  pattr = &pjob->ji_wattr[JOB_ATR_start_time];

  if ((pjob->ji_wattr[JOB_ATR_restart_name].at_flags & ATR_VFLAG_SET) == 0)
    {
    pattr->at_val.at_long = time(NULL);
    pattr->at_flags |= ATR_VFLAG_SET;
    }

  pattr = &pjob->ji_wattr[JOB_ATR_start_count];
  pattr->at_val.at_long++;
  pattr->at_flags |= ATR_VFLAG_SET;

  /* This marks the start of total run time from the server's perspective */
  pattr = &pjob->ji_wattr[JOB_ATR_total_runtime];

  if (gettimeofday(&start_time, &tz) == 0)
    {
    pattr->at_val.at_timeval.tv_sec = start_time.tv_sec;
    pattr->at_val.at_timeval.tv_usec = start_time.tv_usec;
    }

  get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled);

  /* check if this is a special heterogeneous job */
  if ((cray_enabled == TRUE) &&
      (pjob->ji_external_clone != NULL))
    rc = handle_heterogeneous_job_launch(pjob, preq);
  else
    rc = send_job_to_mom(&pjob, preq, NULL);

  return(rc);
  }  /* END svr_strtjob2() */



/*
 * finish_sendmom - process the result of the attempt to send a job to its
 * MOM: ack or reject the original run request and set the job state
 * accordingly.
 */

void finish_sendmom(

  char                 *job_id,
  struct batch_request *preq,
  long                  start_time,
  char                 *node_name,
  int                   status,
  int                   mom_err)

  {
  pbs_net_t  addr;
  int        newstate;
  int        newsub;
  char       log_buf[LOCAL_LOG_BUF_SIZE+1];
  time_t     time_now = time(NULL);
  job       *pjob;

  if ((pjob = svr_find_job(job_id, TRUE)) == NULL)
    {
    if (preq != NULL)
      req_reject(PBSE_JOBNOTFOUND, 0, preq, node_name, log_buf);

    return;
    }

  mutex_mgr job_mutex(pjob->ji_mutex, true);

  if (LOGLEVEL >= 6)
    {
    log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, job_id, "entering finish_sendmom");
    }

  if (LOGLEVEL >= 1)
    {
    sprintf(log_buf, "child reported %s for job after %ld seconds (dest=%s), rc=%d",
      (status == 0) ? "success" : "failure",
      time_now - start_time,
      (node_name != NULL) ? node_name : "???",
      status);

    log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, job_id, log_buf);
    }

  switch (status)
    {
    case LOCUTION_SUCCESS:

      /* send to MOM went ok */

      pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_HOTSTART;

      if (preq != NULL)
        reply_ack(preq);

      /* record start time for accounting */
      pjob->ji_qs.ji_stime = time_now;

      /* update resource usage attributes */
      set_resc_assigned(pjob, INCR);

      if ((pjob->ji_qs.ji_substate == JOB_SUBSTATE_PRERUN) ||
          (pjob->ji_qs.ji_substate == JOB_SUBSTATE_TRNOUTCM))
        {
        /* may be EXITING if job finished first */
        svr_setjobstate(pjob, JOB_STATE_RUNNING, JOB_SUBSTATE_RUNNING, FALSE);

        /* above saves job structure */
        }

      if ((pjob->ji_wattr[JOB_ATR_restart_name].at_flags & ATR_VFLAG_SET) == 0)
        {
        pjob->ji_wattr[JOB_ATR_start_time].at_val.at_long = time(NULL);
        pjob->ji_wattr[JOB_ATR_start_time].at_flags |= ATR_VFLAG_SET;
        }

      /* accounting log for start or restart */
      if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHECKPOINT_FILE)
        account_record(PBS_ACCT_RESTRT, pjob, "Restart from checkpoint");
      else
        account_jobstr(pjob);

      /* if any dependencies, see if action required */
      if (pjob->ji_wattr[JOB_ATR_depend].at_flags & ATR_VFLAG_SET)
        depend_on_exec(pjob);

      /* set up the poll task */
      set_task(WORK_Timed, time_now + JobStatRate, poll_job_task, strdup(job_id), FALSE);

      break;

    case LOCUTION_REQUEUE:

      /* NOTE: connection to mom timed out.  Mark node down */

      addr = pjob->ji_qs.ji_un.ji_exect.ji_momaddr;
      stream_eof(-1, addr, pjob->ji_qs.ji_un.ji_exect.ji_momport, 0);

      /* send failed, requeue the job */
      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, job_id, "unable to run job, MOM rejected/timeout");

      free_nodes(pjob);

      if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_ABORT)
        {
        if (preq != NULL)
          {
          if (mom_err != PBSE_NONE)
            req_reject(mom_err, 0, preq, node_name, "connection to mom timed out");
          else
            req_reject(PBSE_MOMREJECT, 0, preq, node_name, "connection to mom timed out");
          }

        svr_evaljobstate(*pjob, newstate, newsub, 1);
        svr_setjobstate(pjob, newstate, newsub, FALSE);
        }
      else
        {
        if (preq != NULL)
          {
          if (mom_err != PBSE_NONE)
            req_reject(mom_err, 0, preq, node_name, "job was aborted by mom");
          else
            req_reject(PBSE_BADSTATE, 0, preq, node_name, "job was aborted by mom");
          }
        }

      break;

    case LOCUTION_FAIL:   /* commit failed */

    default:

      {
      int JobOK = FALSE;

      /* send failed, requeue the job */
      sprintf(log_buf, "unable to run job, MOM rejected/rc=%d", status);

      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, job_id, log_buf);

      free_nodes(pjob);

      if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_ABORT)
        {
        if (preq != NULL)
          {
          char tmpLine[MAXLINE];

          if (preq->rq_reply.brp_code == PBSE_JOBEXIST)
            {
            /* job already running, start request failed but return success since
             * desired behavior (job is running) is accomplished */

            JobOK = TRUE;
            }
          else
            {
            sprintf(tmpLine, "cannot send job to %s, state=%s",
              (node_name != NULL) ?
node_name : "mom", PJobSubState[pjob->ji_qs.ji_substate]); if (mom_err != PBSE_NONE) req_reject(mom_err, 0, preq, node_name, tmpLine); else req_reject(PBSE_MOMREJECT, 0, preq, node_name, tmpLine); } } if (JobOK == TRUE) { /* do not re-establish accounting - completed first time job was started */ pjob->ji_momstat = 0; /* update mom-based job status */ job_mutex.unlock(); stat_mom_job(job_id); } else { svr_evaljobstate(*pjob, newstate, newsub, 1); svr_setjobstate(pjob, newstate, newsub, FALSE); } } else if (preq != NULL) { if (mom_err != PBSE_NONE) req_reject(mom_err, 0, preq, node_name, "send failed - abort"); else req_reject(PBSE_BADSTATE, 0, preq, node_name, "send failed - abort"); } break; } } /* END switch (status) */ } /* END finish_sendmom() */ /* * chk_job_torun - check state and past execution host of a job for which * files are about to be staged in or for a job that is about to be run. * Returns pointer to job if all is ok, else returns NULL. */ job *chk_job_torun( batch_request *preq, /* I */ int setnn) /* I */ { job *pjob; struct rq_runjob *prun; pbs_queue *pque; int rc; char EMsg[1024]; char FailHost[1024]; char *exec_host; char *ptr; prun = &preq->rq_ind.rq_run; if ((pjob = chk_job_request(prun->rq_jid, preq)) == 0) { /* FAILURE */ return(NULL); } mutex_mgr job_mutex(pjob->ji_mutex, true); if ((pjob->ji_qs.ji_state == JOB_STATE_TRANSIT) || (pjob->ji_qs.ji_state == JOB_STATE_EXITING) || (pjob->ji_qs.ji_substate == JOB_SUBSTATE_STAGEGO) || (pjob->ji_qs.ji_substate == JOB_SUBSTATE_PRERUN) || (pjob->ji_qs.ji_substate == JOB_SUBSTATE_RUNNING)) { /* FAILURE - job already started */ req_reject(PBSE_BADSTATE, 0, preq, NULL, "job already running"); return(NULL); } if (preq->rq_type == PBS_BATCH_StageIn) { if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_STAGEIN) { /* FAILURE */ req_reject(PBSE_BADSTATE, 0, preq, NULL, NULL); return(NULL); } } if ((preq->rq_perm & (ATR_DFLAG_MGWR | ATR_DFLAG_OPWR)) == 0) { /* FAILURE - run request not authorized */ req_reject(PBSE_PERM, 0, preq, NULL, NULL); return(NULL); } if ((pque = get_jobs_queue(&pjob)) != NULL) { mutex_mgr pque_mutex = mutex_mgr(pque->qu_mutex, true); if (pque->qu_qs.qu_type != QTYPE_Execution || pque->qu_attr[QA_ATR_Started].at_val.at_long == 0) { if (pque->qu_attr[QA_ATR_Started].at_val.at_long == 0) snprintf(EMsg, sizeof(EMsg), "attempt to start job in non-started queue"); else /* FAILURE - job must be in execution queue */ snprintf(EMsg, sizeof(EMsg), "attempt to start job in non-execution queue"); log_err(-1, __func__, EMsg); req_reject(PBSE_IVALREQ, 0, preq, NULL, EMsg); return(NULL); } } else if (pjob == NULL) { req_reject(PBSE_JOBNOTFOUND, 0, preq, NULL, "job vanished while trying to lock queue."); return(NULL); } /* where to execute the job */ #ifdef ENABLE_BLCR if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_StagedIn) #else if (pjob->ji_qs.ji_svrflags & (JOB_SVFLG_CHECKPOINT_FILE | JOB_SVFLG_StagedIn)) #endif { /* job has been checkpointed or files already staged in */ /* in this case, exec_host must be already set */ if (prun->rq_destin && *prun->rq_destin) /* If a destination has been specified */ { /* specified destination must match exec_host */ if ((exec_host = strdup(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str)) == NULL) { req_reject(PBSE_RMSYSTEM, 0, preq, NULL, "Cannot allocate memory"); return(NULL); } if ((ptr = strchr(exec_host, '/'))) * ptr = 0; /* For some reason, node name has "/0" on the end (i.e. "node0001/0"). 
*/ if (strcmp(prun->rq_destin, exec_host) != 0) { /* FAILURE */ free(exec_host); if (pjob->ji_qs.ji_svrflags & (JOB_SVFLG_CHECKPOINT_FILE)) req_reject(PBSE_EXECTHERE, 0, preq, NULL, "allocated nodes must match checkpoint location"); else req_reject(PBSE_EXECTHERE, 0, preq, NULL, "allocated nodes must match input file stagein location"); return(NULL); } free(exec_host); } if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HasNodes) == 0) { /* re-reserve nodes and leave exec_host as is */ if ((rc = assign_hosts( /* inside chk_job_torun() */ pjob, pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str, 0, FailHost, EMsg)) != 0) /* O */ { req_reject(PBSE_EXECTHERE, 0, preq, FailHost, EMsg); return(NULL); } } } /* END if (pjob->ji_qs.ji_svrflags & (JOB_SVFLG_CHECKPOINT_FILE|JOB_SVFLG_StagedIn)) */ else { /* make sure exec gpus is clear */ if (((pjob->ji_wattr[JOB_ATR_exec_gpus].at_flags & ATR_VFLAG_SET) != 0) && (pjob->ji_wattr[JOB_ATR_exec_gpus].at_val.at_str != NULL)) { job_attr_def[JOB_ATR_exec_gpus].at_free( &pjob->ji_wattr[JOB_ATR_exec_gpus]); } /* job has not run before or need not run there again */ /* reserve nodes and set new exec_host */ if ((prun->rq_destin == NULL) || (prun->rq_destin[0] == '\0')) { /* it is possible for the scheduler to pass a hostlist using the rq_extend * field--we should use it as the given list as an alternative to rq_destin */ rc = assign_hosts(pjob, preq->rq_extend, 1, FailHost, EMsg); /* inside chk_job_torun() */ } else { rc = assign_hosts(pjob, prun->rq_destin, 1, FailHost, EMsg); /* inside chk_job_torun() */ } if (rc != 0) { /* FAILURE - cannot assign correct hosts */ req_reject(rc, 0, preq, FailHost, EMsg); return(NULL); } } if (setnn == 1) { #ifdef TDEV /* what should neednodes be set to? */ resource_def *DRes; /* resource definition */ resource *JRes; /* resource on job */ pbs_attribute *Attr; /* 'neednodes' pbs_attribute */ Attr = &pjob->ji_wattr[JOB_ATR_resource]; DRes = find_resc_def(svr_resc_def, "neednodes", svr_resc_size); JRes = find_resc_entry(Attr, DRes); if ((JRes == NULL) || ((JRes->rs_value.at_flags & ATR_VFLAG_SET) == 0)) { /* resource does not exist or value is not set */ if (JRes == NULL) { JRes = add_resource_entry(Attr, DRes); } if (JRes != NULL) { if (DRes->rs_defin->rs_set( &JRes->rs_value, &DRes->rs_value, SET) == 0) { JRes->rs_value.at_flags |= ATR_VFLAG_SET; } } } #endif /* TDEV */ } /* END if (setnn == 1) */ job_mutex.set_lock_on_exit(false); return(pjob); } /* END chk_job_torun() */ /* * set_job_exec_info - The first host in list is the host * of Mother Superior. Find the mom manager and service ports * from allnodes and then set the pjob mom ports accordingly */ int set_job_exec_info( job *pjob, char *list) { char ms[PBS_MAXHOSTNAME]; char *ptr; int i; struct pbsnode *pnode; struct in_addr hostaddr; if (list == NULL) { return(PBSE_UNKNODEATR); } memset(ms, 0, PBS_MAXHOSTNAME); ptr = list; /* get the first name in list. 
     This is Mother Superior */

  for (i = 0; ptr && (*ptr != '/') && (i < PBS_MAXHOSTNAME); i++)
    {
    ms[i] = *ptr;
    ptr++;
    }

  pnode = find_nodebyname(ms);

  if (pnode != NULL)
    {
    pjob->ji_qs.ji_un.ji_exect.ji_momport = pnode->nd_mom_port;
    pjob->ji_qs.ji_un.ji_exect.ji_mom_rmport = pnode->nd_mom_rm_port;
    memcpy(&hostaddr, &pnode->nd_sock_addr.sin_addr, sizeof(hostaddr));
    pjob->ji_qs.ji_un.ji_exect.ji_momaddr = ntohl(hostaddr.s_addr);

    unlock_node(pnode, __func__, NULL, LOGLEVEL);

    return(PBSE_NONE);
    }

  return(PBSE_UNKNODEATR);
  }  /* END set_job_exec_info() */



/*
 * get_correct_spec_string - put back the gpu mode (recorded in the job's
 * "neednodes" resource) that Moab strips from the ":gpus=" portions of the
 * node spec.  Returns a newly allocated copy of the spec.
 */

char *get_correct_spec_string(

  char *given,
  job  *pjob)

  {
  char       mode[20];
  char      *mode_string;
  char      *request;
  char      *correct_spec = NULL;
  char      *outer_plus;
  char      *plus;
  char      *one_req;
  int        num_gpu_reqs;
  char      *gpu_req;
  int        len;
  resource  *pres = NULL;
  char       log_buf[LOCAL_LOG_BUF_SIZE];

  /* check to see if there is a gpus request.  If so, Moab stripped the
   * gpu mode from the request (if one existed) and we need to put it
   * back */

  mode_string = strstr(given, ":gpus=");

  if (mode_string != NULL)
    {
    /* look up the job's "neednodes" resource, which still holds the
     * original spec */
    pres = find_resc_entry(
             &pjob->ji_wattr[(int)JOB_ATR_resource],
             find_resc_def(svr_resc_def, "neednodes", svr_resc_size));

    if (pres != NULL)
      {
      /* determine # of gpu requests in spec, we found 1 in given up above */
      num_gpu_reqs = 1;
      gpu_req = mode_string;

      while ((gpu_req = strstr(gpu_req + 1, ":gpus=")) != NULL)
        num_gpu_reqs++;

      /* assign gpu mode that was in "neednodes" */
      request = pres->rs_value.at_val.at_str;

      if ((request != NULL) &&
          (request[0] != 0))
        {
        if (!(gpu_req = strstr(request, ":gpus=")))
          {
          correct_spec = strdup(given);
          return(correct_spec);
          }

        mode_string = gpu_req + strlen(":gpus=");

        while (isdigit(*mode_string))
          mode_string++;

        if (*mode_string == ':')
          {
          if (LOGLEVEL >= 7)
            {
            sprintf(log_buf, "%s: job has %d gpu requests in node spec '%s'",
              __func__, num_gpu_reqs, given);

            log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf);
            }

          if ((outer_plus = strchr(mode_string, '+')) != NULL)
            *outer_plus = '\0';

          /*
           * The neednodes original value may have non gpu things in it, so we
           * can not rely on the requested gpu mode being the first item in
           * the string after the gpus=x:.
*/ if (strstr(mode_string, "exclusive_thread")) { strcpy(mode, ":exclusive_thread"); } else if (strstr(mode_string, "exclusive_process")) { strcpy(mode, ":exclusive_process"); } else if (strstr(mode_string, "exclusive")) { strcpy(mode, ":exclusive"); } else if (strstr(mode_string, "default")) { strcpy(mode, ":default"); } else if (strstr(mode_string, "shared")) { strcpy(mode, ":shared"); } else { strcpy(mode, ""); } if (outer_plus != NULL) *outer_plus = '+'; /* now using the actual length of requested gpu mode */ len = strlen(given) + 1 + (num_gpu_reqs * strlen(mode)); if ((correct_spec = (char *)calloc(1, len)) != NULL) { one_req = given; while (one_req != NULL) { if ((plus = strchr(one_req, '+')) != NULL) { *plus = '\0'; } strcat(correct_spec, one_req); if (strstr(one_req, ":gpus") != NULL) strcat(correct_spec, mode); if (plus != NULL) { strcat(correct_spec, "+"); one_req = plus + 1; } else one_req = NULL; } } if ((LOGLEVEL >= 7) && (correct_spec != NULL) && (correct_spec[0] != '\0')) { sprintf(log_buf, "%s: job gets adjusted gpu node spec of '%s'", __func__, correct_spec); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buf); } } } } } if (correct_spec == NULL) correct_spec = strdup(given); return(correct_spec); } /* get_correct_spec_string() */ /* * assign_hosts - assign hosts (nodes) to job by the following rules: * 1. use nodes that are "given"; from exec_host when required by * checkpoint-restart or file stage-in, or from run command. * 2. use nodes that match user's resource request. * 3. use default (local system or a single node). */ int assign_hosts( job *pjob, /* I (modified) */ char *given, /* I (optional) list of requested hosts */ int set_exec_host, /* I (boolean) */ char *FailHost, /* O (optional,minsize=1024) */ char *EMsg) /* O (optional,minsize=1024) */ { unsigned int dummy; char *list = NULL; char *portlist = NULL; char *hosttoalloc = NULL; resource *pres; int rc = 0; int procs=0; extern char *mom_host; char log_buf[LOCAL_LOG_BUF_SIZE]; char *def_node = NULL; char *to_free = NULL; long cray_enabled = FALSE; if (EMsg != NULL) EMsg[0] = '\0'; if (FailHost != NULL) FailHost[0] = '\0'; #ifdef __TREQSCHED if ((given == NULL) || (given[0] == '\0')) { /* scheduler must specify node allocation for all jobs */ return(PBSE_UNKNODEATR); } #endif /* __TREQSCHED */ if ((given != NULL) && (given[0] != '\0')) { hosttoalloc = get_correct_spec_string(given, pjob); to_free = hosttoalloc; } else { /* Build our host list from what is in the job attrs */ pres = find_resc_entry( &pjob->ji_wattr[JOB_ATR_resource], find_resc_def(svr_resc_def, "neednodes", svr_resc_size)); if (pres != NULL) { /* assign what was in "neednodes" */ hosttoalloc = pres->rs_value.at_val.at_str; if ((hosttoalloc == NULL) || (hosttoalloc[0] == '\0')) { return(PBSE_UNKNODEATR); } } pres = find_resc_entry( &pjob->ji_wattr[JOB_ATR_resource], find_resc_def(svr_resc_def, "procs", svr_resc_size)); if (pres != NULL) { /* assign what was in "neednodes" */ procs = pres->rs_value.at_val.at_long; if ((hosttoalloc == NULL) || (hosttoalloc[0] == '\0')) { return(PBSE_UNKNODEATR); } } } get_svr_attr_str(SRV_ATR_DefNode, &def_node); if (hosttoalloc != NULL) { /* NO-OP */ } else if (svr_totnodes == 0) { /* assign "local" */ if (def_node != NULL) { hosttoalloc = def_node; } else { hosttoalloc = mom_host; } } else if (def_node != NULL) { /* alloc server default_node */ hosttoalloc = def_node; } else { /* fall back to 1 cluster node */ hosttoalloc = strdup(PBS_DEFAULT_NODE); to_free = hosttoalloc; } /* do we need to 
allocate the (cluster) node(s)? */ if (svr_totnodes != 0) { rc = set_nodes(pjob, (char *)hosttoalloc, procs, &list, &portlist, FailHost, EMsg); set_exec_host = 1; /* maybe new VPs, must set */ hosttoalloc = list; } if (rc == 0) { /* set_nodes succeeded */ char *tmp; if (set_exec_host != 0) { job_attr_def[JOB_ATR_exec_host].at_free( &pjob->ji_wattr[JOB_ATR_exec_host]); job_attr_def[JOB_ATR_exec_host].at_decode( &pjob->ji_wattr[JOB_ATR_exec_host], NULL, NULL, hosttoalloc, 0); /* O */ job_attr_def[JOB_ATR_exec_port].at_free( &pjob->ji_wattr[JOB_ATR_exec_port]); job_attr_def[JOB_ATR_exec_port].at_decode( &pjob->ji_wattr[JOB_ATR_exec_port], NULL, NULL, portlist, 0); /* O */ pjob->ji_modified = 1; } else { /* leave exec_host alone and reuse old IP address */ hosttoalloc = pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str; portlist = pjob->ji_wattr[JOB_ATR_exec_port].at_val.at_str; } get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled); if ((cray_enabled == TRUE) && (pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str != NULL)) tmp = parse_servername(pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str, &dummy); else tmp = parse_servername((char *)hosttoalloc, &dummy); snprintf(pjob->ji_qs.ji_destin, sizeof(pjob->ji_qs.ji_destin), "%s", tmp); free(tmp); if ((cray_enabled == TRUE) && (pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str != NULL)) { split_job(pjob); rc = set_job_exec_info(pjob->ji_external_clone, pjob->ji_external_clone->ji_qs.ji_destin); rc = set_job_exec_info(pjob->ji_cray_clone, pjob->ji_qs.ji_destin); } else { rc = set_job_exec_info(pjob, pjob->ji_qs.ji_destin); } if (rc != 0) { free_nodes(pjob); if (list != NULL) free(list); sprintf(log_buf, "ALERT: job cannot allocate node '%s' (could not determine IP address for node)", pjob->ji_qs.ji_destin); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buf); if (to_free != NULL) free(to_free); return(PBSE_BADHOST); } } /* END if (rc == 0) */ if (list != NULL) free(list); if (portlist != NULL) free(portlist); if (to_free != NULL) free(to_free); return(rc); } /* END assign_hosts() */ /* END req_runjob.c */
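

/*
 * Illustrative only: a minimal client-side sketch of issuing the Run Job
 * request that req_runjob() above services, using the public PBS IFL calls
 * pbs_connect(), pbs_runjob() and pbs_disconnect() from pbs_ifl.h.  The job
 * id is a placeholder and error handling is reduced to simple checks; the
 * block is kept inside "#if 0" so it is never compiled into the server.
 */
#if 0
#include <stdio.h>
#include "pbs_ifl.h"

int main(void)
  {
  int   conn;
  int   rc;
  char *jobid = "123.example-server";   /* placeholder job id */

  /* connect to the default pbs_server (NULL = use the configured default) */
  if ((conn = pbs_connect(NULL)) <= 0)
    {
    fprintf(stderr, "cannot connect to server\n");
    return(1);
    }

  /* ask the server to run the job now; a NULL location lets the server
   * pick nodes itself (see assign_hosts() above) */
  rc = pbs_runjob(conn, jobid, NULL, NULL);

  if (rc != 0)
    fprintf(stderr, "pbs_runjob failed, rc=%d\n", rc);

  pbs_disconnect(conn);

  return(rc);
  }
#endif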