/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. 
The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. */ /* * req_quejob.c * * Functions relating to the Queue Job Batch Request sequence, including * Queue Job, Job Script, Ready to Commit, and Commit. 
* * Included functions are: * req_quejob() * req_jobcredential() * req_jobscript() * req_rdycommit() * req_commit() */ #include /* the master config generated by configure */ #include #include #include #include #include #include #include #include "libpbs.h" #include "server_limits.h" #include "list_link.h" #include "attribute.h" #include "resource.h" #include "server.h" #include "credential.h" #include "batch_request.h" #include "pbs_job.h" #include "queue.h" #include "net_connect.h" #include "pbs_error.h" #include "log.h" #include "../lib/Liblog/pbs_log.h" #include "../lib/Libifl/lib_ifl.h" #include "svrfunc.h" #include "mom_job_func.h" /* mom_job_purge */ #include "mom_func.h" #include "utils.h" #include "pbs_nodes.h" #include "utils.h" /* External Functions Called: */ extern int reply_jid(char *); extern int svr_authorize_jobreq(struct batch_request *, job *); int reply_send_mom(struct batch_request *request); extern void check_state(int); extern void mom_server_all_update_stat(); void send_update_soon(); #ifdef NVIDIA_GPUS extern int use_nvidia_gpu; #endif /* NVIDIA_GPUS */ extern int multi_mom; extern unsigned int pbs_rm_port; /* Global Data Items: */ extern int PBSNodeCheckProlog; extern int internal_state; extern const char *PJobSubState[]; /* sync w/enum job_file TJobFileType[]) */ const char *TJobFileType[] = { "jobscript", "stdin", "stdout", "stderr", "ckpt", NULL }; extern tlist_head svr_alljobs; extern tlist_head svr_newjobs; extern attribute_def job_attr_def[]; extern char *path_jobs; extern char *pbs_o_host; extern char *msg_script_open; extern char *msg_script_write; extern char *msg_init_abt; extern char *msg_jobnew; extern time_t time_now; extern int LOGLEVEL; extern int reject_job_submit; extern char *msg_daemonname; extern int decode_arst_merge(struct pbs_attribute *,const char *,const char *,const char *); /* Private Functions in this file */ static job *locate_new_job(int, char *); #ifdef PNOT static int user_account_verify(char *, char *); 
static char *user_account_default(char *);
static int user_account_read_user(char *);
#endif /* PNOT */

/*
 * req_quejob - Queue Job Batch Request processing routine (MOM side)
 *
 * First message of the Queue Job sequence.  Validates that the request
 * came from a server, locates or allocates the job structure, decodes the
 * request's attribute list into it, and links the job onto svr_newjobs
 * pending the later rdytocommit/commit steps.  Replies with the job id on
 * success; rejects the request (and purges any partially built job) on
 * any failure.
 */

void req_quejob(

  struct batch_request *preq) /* ptr to the decoded request */

  {
  char           basename[PBS_JOBBASE + 1];
  int            created_here = 0;   /* stays 0 here: MOM never originates jobs */
  int            index;
  char          *jid;
  attribute_def *pdef;
  job           *pj;
  svrattrl      *psatl;
  int            rc;
  int            sock = preq->rq_conn;
  int            IsCheckpoint = 0;

  /* set basic (user) level access permission */
  int resc_access_perm = ATR_DFLAG_USWR | ATR_DFLAG_Creat;

  /* optionally refuse new work when the node health check says we are down */
  if (PBSNodeCheckProlog)
    {
    check_state(1);

    if (internal_state & INUSE_DOWN)
      {
      req_reject(PBSE_BADMOMSTATE, 0, preq, NULL, NULL);

      return;
      }
    }

  if (reject_job_submit == TRUE)
    {
    req_reject(-1, 0, preq, NULL, "This mom is configured not to run jobs");

    return;
    }

  if (preq->rq_fromsvr)
    {
    /* from another server - accept the extra attributes */
    resc_access_perm |= ATR_DFLAG_MGWR | ATR_DFLAG_SvWR | ATR_DFLAG_MOM;

    jid = preq->rq_ind.rq_queuejob.rq_jid;
    }
  else
    {
    /* request must be from server - MOM does not accept direct submissions */
    log_err(errno, __func__, (char *)"request not from server");

    req_reject(PBSE_IVALREQ, 0, preq, NULL, "request not received from server");

    return;
    }

  /* does job already exist, check both old and new jobs */

  if ((pj = mom_find_job(jid)) == NULL)
    {
    /* not on svr_alljobs - scan the not-yet-committed list as well */
    pj = (job *)GET_NEXT(svr_newjobs);

    while (pj != NULL)
      {
      if (!strcmp(pj->ji_qs.ji_jobid, jid))
        break;

      pj = (job *)GET_NEXT(pj->ji_alljobs);
      }
    }

  /*
   * New job ...
   *
   * for MOM - rather than make up a hashname, we use the name sent
   * to us by the server as a pbs_attribute (ATTR_hashname).
   *
   * NOTE(review): if the server did not send ATTR_hashname, basename is
   * left uninitialized here and is later strcpy'd into ji_fileprefix on
   * the non-checkpoint path - confirm the server always sends it.
   */

  psatl = (svrattrl *)GET_NEXT(preq->rq_ind.rq_queuejob.rq_attr);

  while (psatl != NULL)
    {
    if (!strcmp(psatl->al_name,ATTR_hashname))
      {
      snprintf(basename, sizeof(basename), "%s", psatl->al_value);

      break;
      }

    psatl = (svrattrl *)GET_NEXT(psatl->al_link);
    }

  if (pj != NULL)
    {
    /* newly queued job already exists */

    if (pj->ji_qs.ji_substate == JOB_SUBSTATE_RUNNING)
      {
      /* FAILURE - job exists and is running */
      log_err(errno, __func__, (char *)"cannot queue new job, job exists and is running");

      req_reject(PBSE_JOBEXIST, 0, preq, NULL, "job is running");

      return;
      }

    /* if checkpointed, then keep old and skip rest of process */

    if (pj->ji_qs.ji_svrflags & JOB_SVFLG_CHECKPOINT_FILE)
      {
      IsCheckpoint = 1;
      }  /* END if (pj->ji_qs.ji_svrflags & JOB_SVFLG_CHECKPOINT_FILE) */
    else
      {
      /* unlink job from svr_alljobs since it will be placed on newjobs */
      delete_link(&pj->ji_alljobs);
      }
    }    /* END if (pj != NULL) */
  else
    {
    /* if not already here, allocate job struct */

    if ((pj = job_alloc()) == NULL)
      {
      /* FAILURE */
      req_reject(PBSE_MEM_MALLOC, 0, preq, NULL, "cannot allocate new job structure");

      return;
      }
    }  /* END else (pj != NULL) */

  if (IsCheckpoint == 0)
    {
    /* fresh (non-restart) job: stamp identity from the server's request */
    strcpy(pj->ji_qs.ji_jobid,jid);
    strcpy(pj->ji_qs.ji_fileprefix,basename);

    pj->ji_modified       = 1;
    pj->ji_qs.ji_svrflags = created_here;
    pj->ji_qs.ji_un_type  = JOB_UNION_TYPE_NEW;

    /* changing the union type overwrites the euid for the job, and if
     * ji_grpcache is set this potentially allows jobs to run as root. Unsetting
     * ji_grpcache fixes this problem --dbeer */
    if (pj->ji_grpcache != NULL)
      {
      free(pj->ji_grpcache);
      pj->ji_grpcache = NULL;
      }
    }

  /* decode attributes from request into job structure */

  psatl = (svrattrl *)GET_NEXT(preq->rq_ind.rq_queuejob.rq_attr);

  while (psatl != NULL)
    {
    if (IsCheckpoint == 1)
      {
      /* on checkpoint-restart only the checkpoint name and the variable
       * list are taken from the request; everything else is kept */
      if (strcmp(psatl->al_name,ATTR_checkpoint_name) &&
          strcmp(psatl->al_name,ATTR_v))
        {
        psatl = (svrattrl *)GET_NEXT(psatl->al_link);

        continue;
        }
      }

    /* identify the pbs_attribute by name */

    index = find_attr(job_attr_def,psatl->al_name,JOB_ATR_LAST);

    if (index < 0)
      {
      /* FAILURE */
      /* didn't recognize the name */
      mom_job_purge(pj); /* CRI - 12/20/2004 */

      reply_badattr(PBSE_NOATTR, 1, psatl, preq);

      return;
      }

    pdef = &job_attr_def[index];

    /* Is pbs_attribute not writeable by manager or by a server? */

    if ((pdef->at_flags & resc_access_perm) == 0)
      {
      /* FAILURE */
      mom_job_purge(pj);

      reply_badattr(PBSE_ATTRRO, 1, psatl, preq);

      return;
      }

    /* decode pbs_attribute - the variable list is merged so values already
     * present (e.g. from a checkpoint) survive */

    if (!strcmp(psatl->al_name,ATTR_v))
      {
      rc = decode_arst_merge(
             &pj->ji_wattr[index],
             psatl->al_name,
             psatl->al_resc,
             psatl->al_value);
      }
    else
      {
      rc = pdef->at_decode(
             &pj->ji_wattr[index],
             psatl->al_name,
             psatl->al_resc,
             psatl->al_value,
             resc_access_perm);
      }

    if (rc != 0)
      {
      /* FAILURE */
      /* all errors are fatal for MOM */
      mom_job_purge(pj);

      reply_badattr(rc, 1, psatl, preq);

      return;
      }

    if (psatl->al_op == DFLT)
      {
      /* mark server-supplied defaults so they can be distinguished from
       * user-specified values */
      if (psatl->al_resc)
        {
        resource     *presc;
        resource_def *prdef;

        prdef = find_resc_def(svr_resc_def,psatl->al_resc,svr_resc_size);

        if (prdef == NULL)
          {
          /* NOTE(review): rc is 0 (last decode succeeded) when we get here,
           * so reply_badattr is called with code 0 - confirm intended */
          mom_job_purge(pj);

          reply_badattr(rc, 1, psatl, preq);

          return;
          }

        presc = find_resc_entry(&pj->ji_wattr[index],prdef);

        if (presc != NULL)
          presc->rs_value.at_flags |= ATR_VFLAG_DEFLT;
        }
      else
        {
        pj->ji_wattr[index].at_flags |= ATR_VFLAG_DEFLT;
        }
      }  /* END if (psatl->al_op == DFLT) */

    psatl = (svrattrl *)GET_NEXT(psatl->al_link);
    }    /* END while (psatl != NULL) */

  if (IsCheckpoint == 1)
    {
    /* checkpoint-restart: reuse the existing job, just re-queue it */
    pj->ji_qs.ji_substate = JOB_SUBSTATE_TRANSIN;

    if (reply_jobid(preq,pj->ji_qs.ji_jobid,BATCH_REPLY_CHOICE_Queue) == 0)
      {
      delete_link(&pj->ji_alljobs);
      append_link(&svr_newjobs,&pj->ji_alljobs,pj);

      if (pj->ji_grpcache != NULL)
        {
        free(pj->ji_grpcache);
        pj->ji_grpcache = NULL;
        }

      pj->ji_qs.ji_un_type = JOB_UNION_TYPE_NEW;
      pj->ji_qs.ji_un.ji_newt.ji_fromsock = sock;
      pj->ji_qs.ji_un.ji_newt.ji_fromaddr = get_connectaddr(sock,FALSE);
      pj->ji_qs.ji_un.ji_newt.ji_scriptsz = 0;

      /* Per Eric R., req_mvjobfile was giving error in open_std_file,
       * showed up as fishy error message
       * (NOTE: ji_grpcache is already NULL here, this second free block
       * is a no-op kept from the original) */

      if (pj->ji_grpcache != NULL)
        {
        free(pj->ji_grpcache);
        pj->ji_grpcache = NULL;
        }
      }
    else
      {
      close_conn(sock, FALSE);
      }

    /* SUCCESS */

    return;
    }

  /* set remaining job structure elements */

  pj->ji_qs.ji_state    = JOB_STATE_TRANSIT;
  pj->ji_qs.ji_substate = JOB_SUBSTATE_TRANSIN;

  pj->ji_wattr[JOB_ATR_mtime].at_val.at_long = (long)time_now;
  pj->ji_wattr[JOB_ATR_mtime].at_flags |= ATR_VFLAG_SET;

  if (pj->ji_grpcache != NULL)
    {
    free(pj->ji_grpcache);
    pj->ji_grpcache = NULL;
    }

  pj->ji_qs.ji_un_type = JOB_UNION_TYPE_NEW;
  pj->ji_qs.ji_un.ji_newt.ji_fromsock = sock;
  pj->ji_qs.ji_un.ji_newt.ji_fromaddr = get_connectaddr(sock,FALSE);
  pj->ji_qs.ji_un.ji_newt.ji_scriptsz = 0;

  /* acknowledge the request with the job id */

  if (reply_jobid(preq, pj->ji_qs.ji_jobid, BATCH_REPLY_CHOICE_Queue) != 0)
    {
    /* reply failed, purge the job and close the connection */
    close_conn(sock, FALSE);

    mom_job_purge(pj);

    return;
    }

  /* link job into server's new jobs list request */

  append_link(&svr_newjobs, &pj->ji_alljobs, pj);

  return;
  }  /* END req_quejob() */

/*
 * req_jobcredential - receive a set of credentials to be used by the job
 *
 * THIS IS JUST A PLACE HOLDER FOR NOW
 * It does nothing but acknowledge the request (after verifying that a
 * matching new job exists on this connection).
 */

void req_jobcredential(

  struct batch_request *preq) /* ptr to the decoded request */

  {
  job *pj;

  pj = locate_new_job(preq->rq_conn, NULL);

  if (pj == NULL)
    {
    req_reject(PBSE_IVALREQ, 0, preq, NULL, NULL);

    return;
    }

  reply_ack(preq);

  return;
  }  /* END req_jobcredential() */

/*
 *
req_jobscript - receive job script section
 *
 * Each section is appended to the job's script file under path_jobs;
 * the first section creates the file (O_EXCL, fails if it already
 * exists), later sections append.  A checkpointed job already has its
 * script, so the data is acknowledged and discarded.
 */

void req_jobscript(

  struct batch_request *preq) /* ptr to the decoded request*/

  {
  int   fds;
  char  namebuf[MAXPATHLEN];
  job  *pj;
  int   filemode = 0700;   /* script must be executable by owner only */

  extern char mom_host[];

  errno = 0;

  pj = locate_new_job(preq->rq_conn, preq->rq_ind.rq_jobfile.rq_jobid);

  if (pj == NULL)
    {
    log_err(errno, __func__, (char *)"cannot locate new job");

    req_reject(PBSE_IVALREQ, 0, preq, NULL, NULL);

    return;
    }

  /* what is the difference between JOB_SUBSTATE_TRANSIN and TRANSICM? */

  if (pj->ji_qs.ji_substate != JOB_SUBSTATE_TRANSIN)
    {
    /* script may only arrive while the job is still in transfer-in */
    if (errno == 0)
      {
      sprintf(log_buffer, "job %s in unexpected state '%s'",
        pj->ji_qs.ji_jobid,
        PJobSubState[pj->ji_qs.ji_substate]);
      }
    else
      {
      sprintf(log_buffer, "job %s in unexpected state '%s' (errno=%d - %s)",
        pj->ji_qs.ji_jobid,
        PJobSubState[pj->ji_qs.ji_substate],
        errno,
        strerror(errno));
      }

    log_err(errno, __func__, log_buffer);

    req_reject(PBSE_IVALREQ, 0, preq, mom_host, log_buffer);

    return;
    }

  /* mom - if job has been checkpointed, discard script,already have it */

  if (pj->ji_qs.ji_svrflags & JOB_SVFLG_CHECKPOINT_FILE)
    {
    /* SUCCESS - do nothing, ignore script */
    reply_ack(preq);

    return;
    }

  /* build the script path; with multiple MOMs per host the RM port is
   * folded into the name to keep them distinct */

  if (multi_mom)
    {
    snprintf(namebuf, sizeof(namebuf), "%s%s%d%s",
      path_jobs,
      pj->ji_qs.ji_fileprefix,
      pbs_rm_port,
      JOB_SCRIPT_SUFFIX);
    }
  else
    {
    snprintf(namebuf, sizeof(namebuf), "%s%s%s",
      path_jobs,
      pj->ji_qs.ji_fileprefix,
      JOB_SCRIPT_SUFFIX);
    }

  /* NOTE(review): O_Sync is presumably a project macro mapping to the
   * platform's O_SYNC - confirm in the project headers */

  if (pj->ji_qs.ji_un.ji_newt.ji_scriptsz == 0)
    {
    /* NOTE: fail is job script already exists */
    fds = open(namebuf, O_WRONLY | O_CREAT | O_EXCL | O_Sync, filemode);
    }
  else
    {
    fds = open(namebuf, O_WRONLY | O_APPEND | O_Sync, filemode);
    }

  if (fds < 0)
    {
    char tmpLine[1024];

    snprintf(tmpLine, sizeof(tmpLine), "cannot open '%s' errno=%d - %s",
      namebuf,
      errno,
      strerror(errno));

    /* FAILURE */
    /* NOTE: log_err may modify errno */

    log_err(errno, __func__, msg_script_open);

    req_reject(PBSE_INTERNAL, 0, preq, mom_host, tmpLine);

    return;
    }

  if (write_ac_socket(
        fds,
        preq->rq_ind.rq_jobfile.rq_data,
        (unsigned)preq->rq_ind.rq_jobfile.rq_size) != preq->rq_ind.rq_jobfile.rq_size)
    {
    /* FAILURE - short write or error */
    log_err(errno, __func__, msg_script_write);

    req_reject(PBSE_INTERNAL, 0, preq, mom_host, "cannot write job command file");

    close(fds);

    return;
    }

  close(fds);

  /* track total bytes received so the next section appends */
  pj->ji_qs.ji_un.ji_newt.ji_scriptsz += preq->rq_ind.rq_jobfile.rq_size;

  /* job has a script file */

  pj->ji_qs.ji_svrflags =
    (pj->ji_qs.ji_svrflags & ~JOB_SVFLG_CHECKPOINT_FILE) | JOB_SVFLG_SCRIPT;

  /* SUCCESS */

  reply_ack(preq);

  return;
  }  /* END req_jobscript() */

/*
 * req_mvjobfile - move the specified job standard files
 * This is MOM's version. The files are owned by the user and placed
 * in either the spool area or the user's home directory depending
 * on the compile option, see std_file_name().
 */

void req_mvjobfile(

  struct batch_request *preq)  /* I */

  {
  int            fds;
  enum job_file  jft;
  int            oflag;
  job           *pj;

  struct passwd *pwd;

  jft = (enum job_file)preq->rq_ind.rq_jobfile.rq_type;

  /* first chunk truncates any existing file, later chunks append */
  if (preq->rq_ind.rq_jobfile.rq_sequence == 0)
    oflag = O_CREAT | O_WRONLY | O_TRUNC;
  else
    oflag = O_CREAT | O_WRONLY | O_APPEND;

  /* the job may still be on the new-job list, or already committed */
  pj = locate_new_job(preq->rq_conn, NULL);

  if (pj == NULL)
    pj = mom_find_job(preq->rq_ind.rq_jobfile.rq_jobid);

  if (pj == NULL)
    {
    snprintf(log_buffer, 1024, "cannot find job %s for move of %s file",
      preq->rq_ind.rq_jobfile.rq_jobid,
      TJobFileType[jft]);

    log_err(-1, __func__, log_buffer);

    req_reject(PBSE_UNKJOBID, 0, preq, NULL, NULL);

    return;
    }

  /* make sure the user/group cache is populated before resolving paths */
  if ((pj->ji_grpcache == NULL) &&
      (check_pwd(pj) == NULL))
    {
    req_reject(PBSE_UNKJOBID, 0, preq, NULL, NULL);

    return;
    }

  if ((pwd = getpwnam_ext(pj->ji_wattr[JOB_ATR_euser].at_val.at_str)) == NULL)
    {
    /* FAILURE */
    req_reject(PBSE_MOMREJECT, 0, preq, NULL, "password lookup failed");

    return;
    }

  if ((fds = open_std_file(pj, jft, oflag, pwd->pw_gid)) < 0)
    {
    int   keeping = 1;
    char *path = std_file_name(pj, jft, &keeping);

    snprintf(log_buffer,sizeof(log_buffer),
      "Cannot create file %s",
      path);

    req_reject(PBSE_SYSTEM, 0, preq, NULL, log_buffer);

    return;
    }

  if
(write_ac_socket(
        fds,
        preq->rq_ind.rq_jobfile.rq_data,
        preq->rq_ind.rq_jobfile.rq_size) != preq->rq_ind.rq_jobfile.rq_size)
    {
    /* short write or write error */
    req_reject(PBSE_SYSTEM, 0, preq, NULL, "cannot create file");
    }
  else
    {
    if (LOGLEVEL >= 6)
      {
      sprintf(log_buffer, "successfully moved %s file for job '%s'",
        TJobFileType[jft],
        preq->rq_ind.rq_jobfile.rq_jobid);

      log_record(
        PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        pj->ji_qs.ji_jobid,
        log_buffer);
      }

    reply_ack(preq);
    }

  close(fds);

  return;
  }  /* END req_mvjobfile() */

/*
 * req_rdytocommit - Ready To Commit Batch Request
 *
 * Set substate to JOB_SUBSTATE_TRANSICM and
 * record job to permanent storage, i.e. written to the job save file
 * (called by both pbs_server and pbs_mom).
 *
 * On save failure the job's previous state/substate and state
 * pbs_attribute are restored before rejecting the request.
 */

void req_rdytocommit(

  struct batch_request *preq)  /* I */

  {
  job      *pj;
  int       sock = preq->rq_conn;
  int       OrigState;     /* saved for rollback on save failure */
  int       OrigSState;
  char      OrigSChar;
  long      OrigFlags;
  uint16_t  momport = 0;

  pj = locate_new_job(sock, preq->rq_ind.rq_rdytocommit);

  if (LOGLEVEL >= 6)
    {
    log_record(
      PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      (pj != NULL) ? pj->ji_qs.ji_jobid : "NULL",
      "ready to commit job");
    }

  if (pj == NULL)
    {
    /* FAILURE */
    log_err(errno, "req_rdytocommit", (char *)"unknown job id");

    req_reject(PBSE_UNKJOBID, 0, preq, NULL, NULL);

    return;
    }

  if (pj->ji_qs.ji_substate != JOB_SUBSTATE_TRANSIN)
    {
    /* FAILURE */
    log_err(errno, "req_rdytocommit", (char *)"cannot commit job in unexpected state");

    req_reject(PBSE_IVALREQ, 0, preq, NULL, NULL);

    return;
    }

  /* snapshot current state so a failed save can be rolled back */
  OrigState  = pj->ji_qs.ji_state;
  OrigSState = pj->ji_qs.ji_substate;
  OrigSChar  = pj->ji_wattr[JOB_ATR_state].at_val.at_char;
  OrigFlags  = pj->ji_wattr[JOB_ATR_state].at_flags;

  pj->ji_qs.ji_state    = JOB_STATE_TRANSIT;
  pj->ji_qs.ji_substate = JOB_SUBSTATE_TRANSICM;

  pj->ji_wattr[JOB_ATR_state].at_val.at_char = 'T';
  pj->ji_wattr[JOB_ATR_state].at_flags |= ATR_VFLAG_SET;

  if (multi_mom)
    {
    momport = pbs_rm_port;
    }

  if (job_save(pj, SAVEJOB_NEW, momport) == -1)
    {
    /* FAILURE */
    char tmpLine[MAXLINE];

    sprintf(tmpLine, "cannot save job - errno=%d - %s",
      errno,
      strerror(errno));

    log_err(errno, "req_rdytocommit", tmpLine);

    /* commit failed, backoff state changes */

    pj->ji_qs.ji_state    = OrigState;
    pj->ji_qs.ji_substate = OrigSState;

    pj->ji_wattr[JOB_ATR_state].at_val.at_char = OrigSChar;
    pj->ji_wattr[JOB_ATR_state].at_flags       = OrigFlags;

    req_reject(PBSE_SYSTEM, 0, preq, NULL, tmpLine);

    return;
    }

  /* acknowledge the request with the job id */

  if (reply_jobid(preq, pj->ji_qs.ji_jobid, BATCH_REPLY_CHOICE_RdytoCom) != 0)
    {
    /* FAILURE */
    /* reply failed, purge the job and close the connection */

    sprintf(log_buffer, "cannot report jobid - errno=%d - %s",
      errno,
      strerror(errno));

    log_err(errno, "req_rdytocommit", log_buffer);

    close_conn(sock, FALSE);

    mom_job_purge(pj);

    return;
    }

  if (LOGLEVEL >= 6)
    {
    log_record(
      PBSEVENT_JOB,
      PBS_EVENTCLASS_JOB,
      pj->ji_qs.ji_jobid,
      "ready to commit job completed");
    }

  return;
  }  /* END req_rdytocommit() */

/*
 * reply_sid - send the job's session id back to the server as a text
 * reply of the given choice type; discards any partially built reply
 * already attached to the request.
 */

void reply_sid(

  struct batch_request *preq,  /* M */
  long                  sid,   /* I */
  int                   which) /* I */

  {
  char sid_text[MAXLINE];

  if (preq->rq_reply.brp_choice !=
BATCH_REPLY_CHOICE_NULL) { /* in case another reply was being built up, clean it out */ reply_free(&preq->rq_reply); } sprintf(sid_text,"%lu",sid); preq->rq_reply.brp_choice = which; preq->rq_reply.brp_un.brp_txt.brp_str = strdup(sid_text); preq->rq_reply.brp_un.brp_txt.brp_txtlen = strlen(sid_text); preq->rq_reply.brp_code = 0; preq->rq_reply.brp_auxcode = 0; reply_send_mom(preq); } /* END reply_text() */ /* * req_commit - commit ownership of job * * Set state of job to JOB_STATE_QUEUED (or Held or Waiting) and * enqueue the job into its destination queue. */ void req_commit( struct batch_request *preq) /* I */ { unsigned int momport = 0; int rc; job *pj = locate_new_job(preq->rq_conn, preq->rq_ind.rq_commit); if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? pj->ji_qs.ji_jobid : "NULL", "committing job"); } if (pj == NULL) { req_reject(PBSE_UNKJOBID, 0, preq, NULL, NULL); return; } if (pj->ji_qs.ji_substate != JOB_SUBSTATE_TRANSICM) { log_err(errno, "req_commit", (char *)"cannot commit job in unexpected state"); req_reject(PBSE_IVALREQ, 0, preq, NULL, NULL); return; } /* move job from new job list to "all" job list, set to running state */ delete_link(&pj->ji_alljobs); append_link(&svr_alljobs, &pj->ji_alljobs, pj); /* ** Set JOB_SVFLG_HERE to indicate that this is Mother Superior. */ pj->ji_qs.ji_svrflags |= JOB_SVFLG_HERE; pj->ji_qs.ji_state = JOB_STATE_RUNNING; pj->ji_qs.ji_substate = JOB_SUBSTATE_PRERUN; pj->ji_qs.ji_un_type = JOB_UNION_TYPE_MOM; pj->ji_qs.ji_un.ji_momt.ji_svraddr = get_connectaddr(preq->rq_conn,FALSE); pj->ji_qs.ji_un.ji_momt.ji_exitstat = 0; /* For MOM - start up the job (blocks) */ if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? 
pj->ji_qs.ji_jobid : "NULL", "req_commit:starting job execution"); } rc = start_exec(pj); if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pj->ji_qs.ji_jobid, "req_commit:job execution started"); } /* if start request fails, reply with failure string */ if (pj->ji_qs.ji_substate == JOB_SUBSTATE_EXITING) { char tmpLine[1024]; if ((pj->ji_hosts != NULL) && (pj->ji_nodekill >= 0) && (pj->ji_hosts[pj->ji_nodekill].hn_host != NULL)) { sprintf(tmpLine, "start failed on node %s", pj->ji_hosts[pj->ji_nodekill].hn_host); } else { sprintf(tmpLine, "start failed on unknown node"); } if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? pj->ji_qs.ji_jobid : "NULL", tmpLine); } reply_text(preq, rc, tmpLine); } else { reply_sid(preq, pj->ji_wattr[JOB_ATR_session_id].at_val.at_long,BATCH_REPLY_CHOICE_Text); } if (multi_mom) { momport = pbs_rm_port; } job_save(pj, SAVEJOB_FULL, momport); #ifdef NVIDIA_GPUS /* * Does this job have a gpuid assigned? * if so, then update gpu status */ if ((use_nvidia_gpu) && ((pj->ji_wattr[JOB_ATR_exec_gpus].at_flags & ATR_VFLAG_SET) != 0) && (pj->ji_wattr[JOB_ATR_exec_gpus].at_val.at_str != NULL)) { send_update_soon(); } #endif /* NVIDIA_GPUS */ /* NOTE: we used to flag JOB_ATR_errpath, JOB_ATR_outpath, * JOB_ATR_session_id, and JOB_ATR_altid as modified at this point to make sure * pbs_server got these attr values. This worked fine before TORQUE modified * job launched into an async process. At 2.0.0p6, a new pbs_attribute "SEND" flag * was added to handle this process. */ return; } /* END req_commit() */ /* * locate_new_job - locate a "new" job which has been set up req_quejob on * the servers new job list. * * This function is used by the sub-requests which make up the global * "Queue Job Request" to locate the job structure. * * If the jobid is specified (will be for rdytocommit and commit, but not * for script), we search for a matching jobid. 
* * The job must (also) match the socket specified and the host associated * with the socket unless ji_fromsock == -1, then its a recovery situation. */ /* FIXME: why bother checking for matching sock if a jobid is supplied? Seems * to me that a reconnect immediately invalidates fromsock. */ static job *locate_new_job( int sock, /* I */ char *jobid) /* I (optional) */ { job *pj; pj = (job *)GET_NEXT(svr_newjobs); while (pj != NULL) { if ((pj->ji_qs.ji_un.ji_newt.ji_fromsock == -1) || ((pj->ji_qs.ji_un.ji_newt.ji_fromsock == sock) && (pj->ji_qs.ji_un.ji_newt.ji_fromaddr == get_connectaddr(sock,FALSE)))) { if ((jobid != NULL) && (*jobid != '\0')) { if (!strncmp(pj->ji_qs.ji_jobid, jobid, PBS_MAXSVRJOBID)) { /* requested job located */ break; } } else if (pj->ji_qs.ji_un.ji_newt.ji_fromsock == -1) { /* empty job slot located */ break; } else { /* matching job slot located */ break; } } /* END if ((pj->ji_qs.ji_un.ji_newt.ji_fromsock == -1) || ...) */ pj = (job *)GET_NEXT(pj->ji_alljobs); } /* END while(pj != NULL) */ /* return job slot located (NULL on FAILURE) */ return(pj); } /* END locate_new_job() */ /* END req_quejob.c() */