/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. 
Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. */ /* * req_quejob.c * * Functions relating to the Queue Job Batch Request sequence, including * Queue Job, Job Script, Ready to Commit, and Commit. * * Included functions are: * req_quejob() * req_jobcredential() * req_jobscript() * req_rdycommit() * req_commit() */ #include /* the master config generated by configure */ #include #include #include #include #include #include #include #include #include #include "libpbs.h" #include "server_limits.h" #include "list_link.h" #include "attribute.h" #include "resource.h" #include "server.h" #include "credential.h" #include "batch_request.h" #include "queue.h" #include "pbs_job.h" #include "net_connect.h" #include "pbs_error.h" #include "log.h" #include "../lib/Liblog/pbs_log.h" #include "../lib/Liblog/log_event.h" #include "../lib/Libifl/lib_ifl.h" #include "svrfunc.h" #include "csv.h" #include "array.h" #include "queue_func.h" /* get_dfltque, find_queuebyname */ #include "svr_func.h" /* get_svr_attr_* */ #include "array_func.h" /* setup_array_struct */ #include "threadpool.h" #include "job_func.h" /* svr_job_purge */ #include "pbs_nodes.h" #include "../lib/Libutils/u_lock_ctl.h" /* lock_node, unlock_node */ #include "ji_mutex.h" #include "user_info.h" #include "work_task.h" #include "req_runjob.h" #include "mutex_mgr.hpp" /* External Functions Called: */ extern int reply_jid(char *); extern int 
svr_authorize_jobreq(struct batch_request *, job *);
extern int  svr_chkque(job *, pbs_queue *, char *, int, char *);
extern int  job_route(job *);
extern int  node_avail_complex(char *, int *, int *, int *, int*);
extern void set_chkpt_deflt(job *, pbs_queue *);
void *job_clone_wt(void *);

/* Global Data Items: */

extern struct all_jobs  newjobs;
extern char            *path_spool;
extern struct server    server;
extern char             server_name[];
extern int              queue_rank;
extern const char      *PJobSubState[];

/* Printable names of the job file types, NULL-terminated.
 * The order must stay in sync with enum job_file. */
const char *TJobFileType[] =
  {
  "jobscript",
  "stdin",
  "stdout",
  "stderr",
  "checkpoint",
  NULL
  };

extern attribute_def job_attr_def[];
extern char *path_jobs;
extern char *pbs_o_host;
extern char *msg_script_open;
extern char *msg_script_write;
extern char *msg_init_abt;
extern char *msg_jobnew;
extern int   LOGLEVEL;
extern char *msg_daemonname;

/* Private Functions in this file */

static job *locate_new_job(int, char *);

#ifdef PNOT
/* Argonne account/project enforcement helpers (only built with PNOT) */
static int   user_account_verify(char *, char *);
static char *user_account_default(char *);
static int   user_account_read_user(char *);
#endif /* PNOT */

/* Prefix of the "PBS_O_QUEUE=<queue>" entry that req_quejob() adds to a
 * newly created job's variable list. */
static const char *pbs_o_que = "PBS_O_QUEUE=";

/******************************************************************
 * set_nodes_attr - Check to see if the nodes or procs resource was
 *                  requested on the qsub line. If neither was, add a
 *                  procct=1 entry to the job's Resource_List. This
 *                  makes it so that if procct is set on a queue,
 *                  jobs without a node resource request will still
 *                  be properly routed.
* Returns: 0 if OK * Non-Zero on failure *****************************************************************/ int set_nodes_attr( job *pjob) { resource *pres; int nodect_set = 0; int rc = 0; const char *pname; if (pjob->ji_wattr[JOB_ATR_resource].at_flags & ATR_VFLAG_SET) { pres = (resource *)GET_NEXT(pjob->ji_wattr[JOB_ATR_resource].at_val.at_list); while (pres != NULL) { if (pres->rs_defin != NULL) { pname = pres->rs_defin->rs_name; if (pname == NULL || *pname == 0) { pres = (resource *)GET_NEXT(pres->rs_link); continue; } if ((strncmp(pname, "nodes", 5) == 0) || (strncmp(pname, "procs", 5) == 0)) { nodect_set = 1; break; } } pres = (resource *)GET_NEXT(pres->rs_link); } } if (nodect_set == 0) { int resc_access_perm = ATR_DFLAG_WRACC | ATR_DFLAG_MGWR | ATR_DFLAG_RMOMIG; /* neither procs nor nodes were requested. set procct to 1 */ rc = decode_resc(&pjob->ji_wattr[JOB_ATR_resource], "Resource_List", "procct", "1", resc_access_perm); } return(rc); } /* END set_nodes_attr() */ /* * sum_select_mem_request * * parses the select statement for the total memory requested by the job * if there is no select statement, this does nothing */ void sum_select_mem_request( job * pj) { char select[] = "select"; char mem_str[] = "mem="; char memval_str[MAXPATHLEN]; char *end; char *current; char *clause_end; int mem_str_len = strlen(mem_str); int multiplier = 1; char log_buf[LOCAL_LOG_BUF_SIZE]; unsigned long mem_total = 0; if ((!(pj->ji_wattr[JOB_ATR_submit_args].at_flags & ATR_VFLAG_SET)) || (pj->ji_wattr[JOB_ATR_submit_args].at_val.at_str == NULL)) return; current = strstr(pj->ji_wattr[JOB_ATR_submit_args].at_val.at_str,select); if (current != NULL) { /* comma delimits different options for -W */ end = strchr(current,','); /* make current the last character if no comma */ if (end == NULL) end = current + strlen(current); if (isdigit(*(current + 1 + strlen(select)))) multiplier = atoi(current + strlen(select) + 1); clause_end = strchr(current,'+'); current = 
strstr(current,mem_str); /* find each mem request */ while ((current != NULL) && (current < end - mem_str_len)) { unsigned long tmp; /* make sure we have the right number of "tasks" for this mem * request */ if (clause_end != NULL) { if (current > clause_end) { if (isdigit(*(clause_end+1))) multiplier = atoi(clause_end+1); else multiplier = 1; clause_end = strchr(clause_end+1,'+'); } } current += mem_str_len; tmp = atoi(current); /* advance past the digits to the units */ while ((current != NULL) && (isdigit(*current))) current++; if (current == NULL) { /* no units, assume kb */ mem_total += tmp; break; } /* if not kb, convert */ switch (*current) { case 'k': case 'K': /* do nothing, we're converting to kb */ break; case 'm': case 'M': tmp = tmp << 10; break; case 'g': case 'G': tmp = tmp << 20; break; case 't': case 'T': tmp = tmp << 30; break; default: snprintf(log_buf,sizeof(log_buf), "WARNING: Unknown unit %cb used in memory request\n", *current); log_err(-1, __func__, log_buf); break; } mem_total += tmp * multiplier; current = strstr(current,mem_str); } } /* set the memory requirement so we can enfore it where applicable */ if (mem_total != 0) { resource *mem_rc = find_resc_entry( &pj->ji_wattr[JOB_ATR_resource], find_resc_def(svr_resc_def,"mem",svr_resc_size)); if (mem_rc != NULL) { mem_rc->rs_value.at_val.at_size.atsv_num = mem_total; mem_rc->rs_value.at_val.at_size.atsv_shift = 10; } else { snprintf(memval_str,sizeof(memval_str), "%lukb", mem_total); decode_resc(&pj->ji_wattr[JOB_ATR_resource], ATTR_l, "mem", memval_str, ATR_DFLAG_ACCESS); } } } /* END sum_select_mem_request() */ /* * req_quejob - Queue Job Batch Request processing routine * NOTE: calls svr_chkque() to validate queue access * */ int req_quejob( struct batch_request *preq, char **pjob_id) { int created_here = 0; int attr_index; int sock = preq->rq_conn; int rc = PBSE_NONE; /* set basic (user) level access permission */ int resc_access_perm = ATR_DFLAG_USWR | ATR_DFLAG_Creat; int i; int fds; 
char *jid; char *pc; char *qname; char jidbuf[PBS_MAXSVRJOBID + 1]; char basename[PBS_JOBBASE + 1]; char namebuf[MAXPATHLEN + 1]; char buf[256]; char EMsg[MAXPATHLEN]; char log_buf[LOCAL_LOG_BUF_SIZE]; time_t time_now = time(NULL); job *pj; attribute_def *pdef; svrattrl *psatl; pbs_queue *pque; pbs_attribute tempattr; char *alias = NULL; int jobid_number; struct stat stat_buf; get_svr_attr_str(SRV_ATR_job_suffix_alias, &alias); /* * if the job id is supplied, the request had better be * from another server */ if (preq->rq_fromsvr) { /* from another server - accept the extra attributes */ resc_access_perm |= ATR_DFLAG_MGWR | ATR_DFLAG_SvWR; jid = preq->rq_ind.rq_queuejob.rq_jid; } else if (preq->rq_ind.rq_queuejob.rq_jid[0] != '\0') { /* FAILURE */ /* a job id is not allowed from a client */ rc = PBSE_CLIENT_INVALID; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "job id not allowed from client"); log_err(rc, __func__, log_buf); req_reject(PBSE_IVALREQ, 0, preq, NULL, log_buf); return rc; } else { /* Create a job id */ char host_server[PBS_MAXSERVERNAME + 1]; long server_suffix = TRUE; created_here = JOB_SVFLG_HERE; memset(host_server, 0, sizeof(host_server)); get_svr_attr_l(SRV_ATR_display_job_server_suffix, &server_suffix); lock_sv_qs_mutex(server.sv_qs_mutex, __func__); if ((alias != NULL) && (server_suffix == TRUE)) { char *svrnm; if (get_fullhostname(pbs_default(), host_server, PBS_MAXSERVERNAME, NULL) == 0) { svrnm = host_server; } else { svrnm = server_name; } snprintf(jidbuf,sizeof(jidbuf),"%d.%s.%s", server.sv_qs.sv_jobidnumber, svrnm, alias); } else if (alias != NULL) { snprintf(jidbuf,sizeof(jidbuf),"%d.%s", server.sv_qs.sv_jobidnumber, alias); } else if (server_suffix == TRUE) { char *svrnm; if (get_fullhostname(pbs_default(), host_server, PBS_MAXSERVERNAME, NULL) == 0) { svrnm = host_server; } else { svrnm = server_name; } snprintf(jidbuf, sizeof(jidbuf), "%d.%s", server.sv_qs.sv_jobidnumber, svrnm); } else { snprintf(jidbuf, sizeof(jidbuf), "%d", 
server.sv_qs.sv_jobidnumber); } jid = jidbuf; /* having updated sv_jobidnumber, must save server struct */ if (++server.sv_qs.sv_jobidnumber > PBS_SEQNUMTOP) server.sv_qs.sv_jobidnumber = 0; /* wrap it */ jobid_number = server.sv_qs.sv_jobidnumber; unlock_sv_qs_mutex(server.sv_qs_mutex, __func__); /* Make the current job number visible in qmgr print server commnad. */ pthread_mutex_lock(server.sv_attr_mutex); server.sv_attr[SRV_ATR_NextJobNumber].at_val.at_long = jobid_number; server.sv_attr[SRV_ATR_NextJobNumber].at_flags |= ATR_VFLAG_SET | ATR_VFLAG_MODIFY; pthread_mutex_unlock(server.sv_attr_mutex); /* Change - do not fail if the server can't be saved - the job file is saved */ svr_save(&server, SVR_SAVE_QUICK); } /* does job already exist, check both old and new jobs */ if ((pj = svr_find_job(jid, FALSE)) == NULL) { int iter = -1; while ((pj = next_job(&newjobs,&iter)) != NULL) { if (!strcmp(pj->ji_qs.ji_jobid, jid)) break; unlock_ji_mutex(pj, __func__, "1", LOGLEVEL); } } if (pj != NULL) { /* server will reject queue request if job already exists */ rc = PBSE_JOBEXIST; log_err(rc, __func__, "cannot queue new job, job already exists"); req_reject(rc, 0, preq, NULL, NULL); unlock_ji_mutex(pj, __func__, "2", LOGLEVEL); return rc; } /* find requested queue, is it there? */ qname = preq->rq_ind.rq_queuejob.rq_destin; if ((*qname == '\0') || (*qname == '@')) { /* use default queue */ pque = get_dfltque(); rc = PBSE_QUENODFLT; } else { /* else find the named queue */ pque = find_queuebyname(qname); rc = PBSE_UNKQUE; } if (pque == NULL) { /* FAILURE */ snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "requested queue not found"); log_err(-1, __func__, log_buf); req_reject(rc, 0, preq, NULL, log_buf); /* not there */ return rc; } mutex_mgr que_mgr(pque->qu_mutex, true); /* unlock the queue. 
We validated that it was there, now let someone else use it until we need it */ sprintf(log_buf, "Just validated queue"); que_mgr.unlock(); /* * make up job file name, it is based on the jobid, however the * minimun acceptable file name limit is only 14 character in POSIX, * so we may have to "hash" the name slightly */ snprintf(basename, sizeof(basename), "%s", jid); do { snprintf(namebuf, sizeof(namebuf), "%s%s%s", path_jobs, basename, JOB_FILE_SUFFIX); fds = open(namebuf, O_CREAT | O_EXCL | O_WRONLY, 0600); if (fds < 0) { if (errno == EEXIST) { pc = basename + strlen(basename) - 1; while (!isprint((int)*pc)) { pc--; if (pc <= basename) { /* FAILURE */ rc = PBSE_JOB_FILE_CORRUPT; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "job file %s corrupt (%d - %s)", namebuf, errno, strerror(errno)); log_err(rc, __func__, log_buf); req_reject(rc, 0, preq, NULL, log_buf); return(rc); } } (*pc)++; } else { /* FAILURE */ rc = PBSE_CAN_NOT_SAVE_FILE; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot create job file %s (%d - %s)", namebuf, errno, strerror(errno)); log_err(rc, __func__, log_buf); req_reject(PBSE_SYSTEM, 0, preq, NULL, log_buf); return(rc); } } } while (fds < 0); close(fds); /* create the job structure */ if ((pj = job_alloc()) == NULL) { /* FAILURE */ rc = PBSE_MEM_MALLOC; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot alloc memory for new job %s - (%d - %s)", namebuf, errno, strerror(errno)); log_err(rc, __func__, log_buf); unlink(namebuf); req_reject(PBSE_SYSTEM, 0, preq, NULL, log_buf); return(rc); } mutex_mgr job_mutex(pj->ji_mutex, true); strcpy(pj->ji_qs.ji_jobid, jid); strcpy(pj->ji_qs.ji_fileprefix, basename); pj->ji_modified = 1; pj->ji_qs.ji_svrflags = created_here; pj->ji_qs.ji_un_type = JOB_UNION_TYPE_NEW; pj->ji_wattr[JOB_ATR_mailpnts].at_val.at_str = 0; /* decode attributes from request into job structure */ psatl = (svrattrl *)GET_NEXT(preq->rq_ind.rq_queuejob.rq_attr); while (psatl != NULL) { if (psatl->al_atopl.resource) { if 
(strcmp(psatl->al_atopl.resource, "nodes") == 0) { pj->ji_have_nodes_request = 1; } } /* identify the pbs_attribute by name */ attr_index = find_attr(job_attr_def, psatl->al_name, JOB_ATR_LAST); if (attr_index < 0) { /* FAILURE */ /* didn`t recognize the name */ attr_index = JOB_ATR_UNKN; /* keep as "unknown" for now */ } pdef = &job_attr_def[attr_index]; /* Is pbs_attribute not writeable by manager or by a server? */ if ((pdef->at_flags & resc_access_perm) == 0) { /* FAILURE */ rc = PBSE_ATTRRO; svr_job_purge(pj); job_mutex.set_lock_on_exit(false); reply_badattr(rc, 1, psatl, preq); return(rc); } /* special gpu case * if both ncpus and gpus specified, add resource for gpus */ if ((strcmp(psatl->al_name,ATTR_l) == 0) && (strcmp(psatl->al_resc,"ncpus") == 0) && ((pc = strstr(psatl->al_value,":gpus=")) != NULL)) { /* save off gpu resource list then add new resource_list entry for it */ char *gpuval; gpuval = strdup(pc+6); (*pc) = '\0'; if (gpuval != NULL) { rc = pdef->at_decode( &pj->ji_wattr[attr_index], psatl->al_name, "gpus", gpuval, ATR_DFLAG_ACCESS); free(gpuval); if (rc != 0) { /* FAILURE */ /* any other error is fatal */ svr_job_purge(pj); job_mutex.set_lock_on_exit(false); reply_badattr(rc, 1, psatl, preq); return(rc); } } pdef = &job_attr_def[attr_index]; } /* decode pbs_attribute */ rc = pdef->at_decode( &pj->ji_wattr[attr_index], psatl->al_name, psatl->al_resc, psatl->al_value, resc_access_perm); if (rc != 0) { if (rc == PBSE_UNKRESC) { /* check for RM extension */ /* NYI */ /* unknown resources not allowed in Exec queue */ if (pque->qu_qs.qu_type == QTYPE_Execution) { /* FAILURE */ svr_job_purge(pj); job_mutex.set_lock_on_exit(false); reply_badattr(rc, 1, psatl, preq); return(rc); } } else { /* FAILURE */ /* any other error is fatal */ svr_job_purge(pj); job_mutex.set_lock_on_exit(false); reply_badattr(rc, 1, psatl, preq); return(rc); } } /* END if (rc != 0) */ psatl = (svrattrl *)GET_NEXT(psatl->al_link); } /* END while (psatl != NULL) */ rc = 
set_nodes_attr(pj); if (rc) { /* just record that we could not set node count */ sprintf(log_buf, "Could not set default node count. Error not fatal. Will continue submitting job: %d", rc); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf); } /* perform any at_action routine declared for the attributes */ for (i = 0; i < JOB_ATR_LAST; ++i) { pdef = &job_attr_def[i]; if ((pj->ji_wattr[i].at_flags & ATR_VFLAG_SET) && (pdef->at_action)) { rc = pdef->at_action(&pj->ji_wattr[i], pj, ATR_ACTION_NEW); if (rc) { svr_job_purge(pj); job_mutex.set_lock_on_exit(false); req_reject(rc, i, preq, NULL, "cannot execute attribute action"); return(rc); } } } /* END for (i) */ sum_select_mem_request(pj); /* * Now that the attributes have been decoded, we can setup some * additional parameters and perform a few more checks. * * First, set some items based on who created the job... */ if (created_here) { /* created here */ /* check that job has a jobname */ if ((pj->ji_wattr[JOB_ATR_jobname].at_flags & ATR_VFLAG_SET) == 0) { job_attr_def[JOB_ATR_jobname].at_decode( &pj->ji_wattr[JOB_ATR_jobname], NULL, NULL, "none", resc_access_perm); } /* check value of priority */ if (pj->ji_wattr[JOB_ATR_priority].at_flags & ATR_VFLAG_SET) { if ((pj->ji_wattr[JOB_ATR_priority].at_val.at_long < -1024) || (pj->ji_wattr[JOB_ATR_priority].at_val.at_long > 1024)) { rc = PBSE_BADATVAL; svr_job_purge(pj); job_mutex.set_lock_on_exit(false); req_reject(rc, 0, preq, NULL, "invalid job priority"); return rc; } } /* check if a job id was supplied, and if so overwrite the job id */ if (pj->ji_wattr[JOB_ATR_job_id].at_flags & ATR_VFLAG_SET) { char *dot = strchr(pj->ji_qs.ji_jobid,'.'); char tmp_job_id[PBS_MAXSVRJOBID + 1]; job *tmpjob; if (dot != NULL) { snprintf(tmp_job_id, sizeof(tmp_job_id), "%s%s", pj->ji_wattr[JOB_ATR_job_id].at_val.at_str, dot); } else { strcpy(tmp_job_id, pj->ji_wattr[JOB_ATR_job_id].at_val.at_str); } /* make sure the job id doesn't already exist */ if ((tmpjob = 
svr_find_job(tmp_job_id, FALSE)) != NULL) { unlock_ji_mutex(tmpjob, __func__, "3", LOGLEVEL); /* not unique, reject job */ svr_job_purge(pj); job_mutex.set_lock_on_exit(false); rc = PBSE_JOBEXIST; snprintf(log_buf,sizeof(log_buf), "Job with id %s already exists, cannot set job id\n", pj->ji_qs.ji_jobid); req_reject(rc,0,preq,NULL,log_buf); return rc; } else { /* now change the job id */ strcpy(pj->ji_qs.ji_jobid, tmp_job_id); } } /* set job owner attribute to user@host */ job_attr_def[JOB_ATR_job_owner].at_free( &pj->ji_wattr[JOB_ATR_job_owner]); snprintf(buf, sizeof(buf), "%s@%s", preq->rq_user, preq->rq_host); job_attr_def[JOB_ATR_job_owner].at_decode( &pj->ji_wattr[JOB_ATR_job_owner], NULL, NULL, buf, resc_access_perm); /* set create time */ pj->ji_wattr[JOB_ATR_ctime].at_val.at_long = (long)time_now; pj->ji_wattr[JOB_ATR_ctime].at_flags |= ATR_VFLAG_SET; /* set hop count = 1 */ pj->ji_wattr[JOB_ATR_hopcount].at_val.at_long = 1; pj->ji_wattr[JOB_ATR_hopcount].at_flags |= ATR_VFLAG_SET; /* Interactive jobs are necessarily not rerunable */ if ((pj->ji_wattr[JOB_ATR_interactive].at_flags & ATR_VFLAG_SET) && pj->ji_wattr[JOB_ATR_interactive].at_val.at_long) { pj->ji_wattr[JOB_ATR_rerunable].at_val.at_long = 0; pj->ji_wattr[JOB_ATR_rerunable].at_flags |= ATR_VFLAG_SET; } snprintf(buf, sizeof(buf), "%s%s", pbs_o_que, pque->qu_qs.qu_name); if (get_variable(pj, pbs_o_host) == NULL) { strcat(buf, ","); strcat(buf, pbs_o_host); strcat(buf, "="); strcat(buf, preq->rq_host); /* The user did not send the PBS_O_HOST name with the job. We need to adde it here from the information we gathered from the incoming connection. */ clear_attr(&tempattr, &job_attr_def[JOB_ATR_submit_host]); rc = job_attr_def[JOB_ATR_submit_host].at_decode( &tempattr, NULL, NULL, preq->rq_host, 0); if (!rc) { rc = job_attr_def[JOB_ATR_submit_host].at_set( &pj->ji_wattr[JOB_ATR_submit_host], &tempattr, SET); } if (rc) { sprintf(log_buf, "failed to add submit_host %s. 
Minor error", preq->rq_host); log_event( PBSEVENT_JOB | PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buf); } job_attr_def[JOB_ATR_submit_host].at_free(&tempattr); } /* need to set certain environmental variables per POSIX */ clear_attr(&tempattr, &job_attr_def[JOB_ATR_variables]); job_attr_def[JOB_ATR_variables].at_decode( &tempattr, NULL, NULL, buf, resc_access_perm); job_attr_def[JOB_ATR_variables].at_set( &pj->ji_wattr[JOB_ATR_variables], &tempattr, INCR); job_attr_def[JOB_ATR_variables].at_free(&tempattr); /* if JOB_ATR_outpath/JOB_ATR_errpath not set, set default */ if (!(pj->ji_wattr[JOB_ATR_outpath].at_flags & ATR_VFLAG_SET) || (((pj->ji_wattr[JOB_ATR_outpath].at_val.at_str[strlen(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str) - 1] == ':')))) { dynamic_string *ds = get_dynamic_string(-1, NULL); pj->ji_wattr[JOB_ATR_outpath].at_val.at_str = prefix_std_file(pj, ds, (int)'o'); pj->ji_wattr[JOB_ATR_outpath].at_flags |= ATR_VFLAG_SET; /* don't call free_dynamic_string() */ free(ds); } /* * if the output path was specified and ends with a '/' * then append the standard file name */ else if ((pj->ji_wattr[JOB_ATR_outpath].at_flags & ATR_VFLAG_SET) && (((pj->ji_wattr[JOB_ATR_outpath].at_val.at_str[strlen(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str) - 1] == '/')))) { dynamic_string *ds = get_dynamic_string(-1, NULL); pj->ji_wattr[JOB_ATR_outpath].at_val.at_str[strlen(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str) - 1] = '\0'; replace_attr_string( &pj->ji_wattr[JOB_ATR_outpath], (add_std_filename(pj, pj->ji_wattr[JOB_ATR_outpath].at_val.at_str, (int)'o', ds))); /* don't call free_dynamic_string() */ free(ds); } else if (pj->ji_wattr[JOB_ATR_outpath].at_flags & ATR_VFLAG_SET) { /* check to see if the path pointed to is a directory. If so apendd the default output file name. 
Otherwise we treat the string as a path to a file */ char *ptr; ptr = strchr(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str, ':'); if (ptr) { ptr++; rc = stat(ptr, &stat_buf); } else { rc = stat(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str, &stat_buf); } if (rc == 0) { if (S_ISDIR(stat_buf.st_mode)) { dynamic_string *ds = get_dynamic_string(-1, NULL); /* strcat(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str, "/"); */ replace_attr_string( &pj->ji_wattr[JOB_ATR_outpath], (add_std_filename(pj, pj->ji_wattr[JOB_ATR_outpath].at_val.at_str, (int)'o', ds))); /* don't call free_dynamic_string() */ free(ds); } } } if (!(pj->ji_wattr[JOB_ATR_errpath].at_flags & ATR_VFLAG_SET) || (((pj->ji_wattr[JOB_ATR_errpath].at_val.at_str[strlen(pj->ji_wattr[JOB_ATR_errpath].at_val.at_str) - 1] == ':')))) { dynamic_string *ds = get_dynamic_string(-1, NULL); pj->ji_wattr[JOB_ATR_errpath].at_val.at_str = prefix_std_file(pj, ds, (int)'e'); pj->ji_wattr[JOB_ATR_errpath].at_flags |= ATR_VFLAG_SET; /* don't call free_dynamic_string() */ free(ds); } /* * if the error path was specified and ends with a '/' * then append the standard file name */ else if ((pj->ji_wattr[JOB_ATR_errpath].at_flags & ATR_VFLAG_SET) && (((pj->ji_wattr[JOB_ATR_errpath].at_val.at_str[strlen(pj->ji_wattr[JOB_ATR_errpath].at_val.at_str) - 1] == '/')))) { dynamic_string *ds = get_dynamic_string(-1, NULL); pj->ji_wattr[JOB_ATR_errpath].at_val.at_str[strlen(pj->ji_wattr[JOB_ATR_errpath].at_val.at_str) - 1] = '\0'; replace_attr_string(&pj->ji_wattr[JOB_ATR_errpath], (add_std_filename(pj, pj->ji_wattr[JOB_ATR_errpath].at_val.at_str, (int)'e', ds))); /* don't call free_dynamic_string() */ free(ds); } else if (pj->ji_wattr[JOB_ATR_errpath].at_flags & ATR_VFLAG_SET) { /* check to see if the path pointed to is a directory. If so apendd the default output file name. 
Otherwise we treat the string as a path to a file */ char *ptr; ptr = strchr(pj->ji_wattr[JOB_ATR_errpath].at_val.at_str, ':'); if (ptr) { ptr++; rc = stat(ptr, &stat_buf); } else { rc = stat(pj->ji_wattr[JOB_ATR_errpath].at_val.at_str, &stat_buf); } if (rc == 0) { if (S_ISDIR(stat_buf.st_mode)) { dynamic_string *ds = get_dynamic_string(-1, NULL); /* strcat(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str, "/"); */ replace_attr_string(&pj->ji_wattr[JOB_ATR_errpath], (add_std_filename(pj, pj->ji_wattr[JOB_ATR_errpath].at_val.at_str, (int)'e', ds))); /* don't call free_dynamic_string() */ free(ds); } } } if ((pj->ji_wattr[JOB_ATR_outpath].at_val.at_str == NULL) || (pj->ji_wattr[JOB_ATR_errpath].at_val.at_str == NULL)) { rc = PBSE_NOATTR; svr_job_purge(pj); job_mutex.set_lock_on_exit(false); req_reject(rc, 0, preq, NULL, "no output/error file specified"); return rc; } /* * set any "unspecified" checkpoint with queue default values, if any */ que_mgr.lock(); set_chkpt_deflt(pj, pque); que_mgr.unlock(); /* If queue has checkpoint directory name specified, propagate it to the job. 
*/ if (!(pj->ji_wattr[JOB_ATR_checkpoint_dir].at_flags & ATR_VFLAG_SET)) { pbs_attribute *pattr; char *vp; pattr = &pj->ji_wattr[JOB_ATR_checkpoint]; if ((pattr->at_flags & ATR_VFLAG_SET) && (vp = csv_find_value(pattr->at_val.at_str, "dir"))) { clear_attr( &pj->ji_wattr[JOB_ATR_checkpoint_dir], &job_attr_def[JOB_ATR_checkpoint_dir]); job_attr_def[JOB_ATR_checkpoint_dir].at_decode( &pj->ji_wattr[JOB_ATR_checkpoint_dir], NULL, NULL, vp, resc_access_perm); } else if ((pque->qu_attr[QE_ATR_checkpoint_dir].at_flags & ATR_VFLAG_SET) && (pque->qu_attr[QE_ATR_checkpoint_dir].at_val.at_str)) { job_attr_def[JOB_ATR_checkpoint_dir].at_set( &pj->ji_wattr[JOB_ATR_checkpoint_dir], &pque->qu_attr[QE_ATR_checkpoint_dir], SET); } else { pthread_mutex_lock(server.sv_attr_mutex); if ((server.sv_attr[SRV_ATR_checkpoint_dir].at_flags & ATR_VFLAG_SET) && (server.sv_attr[SRV_ATR_checkpoint_dir].at_val.at_str)) { job_attr_def[JOB_ATR_checkpoint_dir].at_set( &pj->ji_wattr[JOB_ATR_checkpoint_dir], &server.sv_attr[SRV_ATR_checkpoint_dir], SET); } pthread_mutex_unlock(server.sv_attr_mutex); } } #ifdef PNOT /************************************************************* * Argonne National Laboratory Account/Project enforcement patch * Dec 2000, Sep 2004, JP Navarro navarro@mcs.anl.gov * Verify the specified account (project) is valid for the user * If none is specified, assign the user's default if allowed * Else return error: valid user account is required *************************************************************/ if (pj->ji_wattr[JOB_ATR_account].at_flags & ATR_VFLAG_SET) { /* account specified, reject if it's not valid for user */ if (user_account_verify( preq->rq_user, pj->ji_wattr[JOB_ATR_account].at_val.at_str) == 0) { rc = PBSE_BADACCT; svr_job_purge(pj); job_mutex.set_lock_on_exit(false); req_reject(rc, 0, preq, NULL, "invalid account"); return rc; } } else { /* account not specified, get default value */ job_attr_def[JOB_ATR_account].at_decode( &pj->ji_wattr[JOB_ATR_account], NULL, 
NULL, (char *)user_account_default(preq->rq_user), resc_access_perm); if (pj->ji_wattr[JOB_ATR_account].at_val.at_str == 0) { /* no default found */ rc = PBSE_BADACCT; svr_job_purge(pj); job_mutex.set_lock_on_exit(false); req_reject(rc, 0, preq, NULL, "no default account available"); return rc; } } #endif /* PNOT */ } /* END if (created_here) */ else { /* job was created elsewhere and moved here */ /* make sure job_owner is set, error if not */ if (!(pj->ji_wattr[JOB_ATR_job_owner].at_flags & ATR_VFLAG_SET)) { rc = PBSE_IVALREQ; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "no job owner specified"); svr_job_purge(pj); job_mutex.set_lock_on_exit(false); log_err(rc, __func__, log_buf); req_reject(rc, 0, preq, NULL, log_buf); return rc; } /* increment hop count */ if (++pj->ji_wattr[JOB_ATR_hopcount].at_val.at_long > PBS_MAX_HOPCOUNT) { rc = PBSE_HOPCOUNT; svr_job_purge(pj); job_mutex.set_lock_on_exit(false); req_reject(rc, 0, preq, NULL, "max job hop reached"); return rc; } } /* set up at_server pbs_attribute for status */ job_attr_def[JOB_ATR_at_server].at_decode( &pj->ji_wattr[JOB_ATR_at_server], NULL, NULL, server_name, resc_access_perm); /* make sure its okay to submit this job */ if (can_queue_new_job(pj->ji_wattr[JOB_ATR_job_owner].at_val.at_str, pj) == FALSE) { long max_queuable; get_svr_attr_l(SRV_ATR_MaxUserQueuable, &max_queuable); snprintf(log_buf, sizeof(log_buf), "Job %s violates the global server limit of %ld jobs queued per user", pj->ji_qs.ji_jobid, max_queuable); svr_job_purge(pj); job_mutex.set_lock_on_exit(false); req_reject(PBSE_MAXQUED, 0, preq, NULL, log_buf); return(PBSE_MAXQUED); } else { increment_queued_jobs(&users, pj->ji_wattr[JOB_ATR_job_owner].at_val.at_str, pj); } /* * See if the job is qualified to go into the requested queue. * Note, if an execution queue, then ji_qs.ji_un.ji_exect is set up * * svr_chkque() is called way down here because it needs to have the * job structure and attributes already set up. 
*/ /* set ji_is_array_template before calling */ if (pj->ji_wattr[JOB_ATR_job_array_request].at_flags & ATR_VFLAG_SET) { char *oldid; char *hostname; pj->ji_is_array_template = TRUE; /* rewrite jobid to include empty brackets this causes arrays to show up as id[].host in qstat output, and actions applied to id[] are applied to the entire array */ oldid = strdup(pj->ji_qs.ji_jobid); if (oldid == NULL) { /* TODO, return with error if unable to alocate memory! */ } hostname = index(oldid, '.'); if (hostname != NULL) { *(hostname++) = '\0'; snprintf(pj->ji_qs.ji_jobid, PBS_MAXSVRJOBID, "%s[].%s", oldid, hostname); } else snprintf(pj->ji_qs.ji_jobid, sizeof(pj->ji_qs.ji_jobid), "%s[]", oldid); free(oldid); } que_mgr.lock(); if ((rc = svr_chkque(pj, pque, preq->rq_host, MOVE_TYPE_Move, EMsg))) { que_mgr.unlock(); svr_job_purge(pj); job_mutex.set_lock_on_exit(false); req_reject(rc, 0, preq, NULL, EMsg); return(rc); } que_mgr.unlock(); /* FIXME: if EMsg[0] != '\0', send a warning email to the user */ strcpy(pj->ji_qs.ji_queue, pque->qu_qs.qu_name); pj->ji_wattr[JOB_ATR_substate].at_val.at_long = JOB_SUBSTATE_TRANSIN; pj->ji_wattr[JOB_ATR_substate].at_flags |= ATR_VFLAG_SET; /* set remaining job structure elements */ pj->ji_qs.ji_state = JOB_STATE_TRANSIT; pj->ji_qs.ji_substate = JOB_SUBSTATE_TRANSIN; pj->ji_wattr[JOB_ATR_mtime].at_val.at_long = (long)time_now; pj->ji_wattr[JOB_ATR_mtime].at_flags |= ATR_VFLAG_SET; pj->ji_qs.ji_un_type = JOB_UNION_TYPE_NEW; pj->ji_qs.ji_un.ji_newt.ji_fromsock = sock; pj->ji_qs.ji_un.ji_newt.ji_fromaddr = get_connectaddr(sock,TRUE); pj->ji_qs.ji_un.ji_newt.ji_scriptsz = 0; /* acknowledge the request with the job id */ if (reply_jobid(preq, pj->ji_qs.ji_jobid, BATCH_REPLY_CHOICE_Queue) != 0) { /* reply failed, purge the job and close the connection */ rc = PBSE_SOCKET_WRITE; /* Re-write reply_jobid to return the error */ svr_job_purge(pj); job_mutex.set_lock_on_exit(false); return(rc); } job_save(pj, SAVEJOB_NEW, 0); /* link job into server's 
new jobs list request */ insert_job(&newjobs,pj); *pjob_id = strdup(pj->ji_qs.ji_jobid); return(rc); } /* END req_quejob() */ /* * req_jobcredential - receive a set of credentials to be used by the job * * THIS IS JUST A PLACE HOLDER FOR NOW * It does nothing but acknowledge the request */ int req_jobcredential( struct batch_request *preq) { int rc = PBSE_NONE; job *pj; pj = locate_new_job(preq->rq_conn, NULL); if (pj == NULL) { rc = PBSE_IVALREQ; req_reject(rc, 0, preq, NULL, NULL); return rc; } if (svr_authorize_jobreq(preq, pj) == -1) { rc = PBSE_PERM; req_reject(rc, 0, preq, NULL, "job request not authorized"); unlock_ji_mutex(pj, __func__, "1", LOGLEVEL); return rc; } reply_ack(preq); unlock_ji_mutex(pj, __func__, "2", LOGLEVEL); return rc; } /* END req_jobcredential() */ /* * req_jobscript - receive job script section * * Each section is appended to the file */ int req_jobscript( struct batch_request *preq) { int fds; char namebuf[MAXPATHLEN]; job *pj; int filemode = 0600; char log_buf[LOCAL_LOG_BUF_SIZE]; int rc = PBSE_NONE; errno = 0; pj = locate_new_job(preq->rq_conn, preq->rq_ind.rq_jobfile.rq_jobid); if (pj == NULL) { rc = PBSE_IVALREQ; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot locate new job %s (%d - %s)", preq->rq_ind.rq_jobfile.rq_jobid, errno, strerror(errno)); log_err(rc, __func__, log_buf); req_reject(rc, 0, preq, NULL, log_buf); return(rc); } /* what is the difference between JOB_SUBSTATE_TRANSIN and TRANSICM? 
*/ if (pj->ji_qs.ji_substate != JOB_SUBSTATE_TRANSIN) { rc = PBSE_IVALREQ; if (errno == 0) { snprintf(log_buf, sizeof(log_buf), "job %s in unexpected state '%s'", pj->ji_qs.ji_jobid, PJobSubState[pj->ji_qs.ji_substate]); } else { snprintf(log_buf, sizeof(log_buf), "job %s in unexpected state '%s' (errno=%d - %s)", pj->ji_qs.ji_jobid, PJobSubState[pj->ji_qs.ji_substate], errno, strerror(errno)); } log_err(rc, __func__, log_buf); req_reject(rc, 0, preq, NULL, log_buf); unlock_ji_mutex(pj, __func__, "1", LOGLEVEL); return(rc); } if (svr_authorize_jobreq(preq, pj) == -1) { rc = PBSE_PERM; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot authorize request %s (%d-%s)", preq->rq_ind.rq_jobfile.rq_jobid, errno, strerror(errno)); log_err(rc, __func__, log_buf); req_reject(rc, 0, preq, NULL, log_buf); unlock_ji_mutex(pj, __func__, "2", LOGLEVEL); return rc; } snprintf(namebuf, sizeof(namebuf), "%s%s%s", path_jobs, pj->ji_qs.ji_fileprefix, JOB_SCRIPT_SUFFIX); if (pj->ji_qs.ji_un.ji_newt.ji_scriptsz == 0) { /* NOTE: fail is job script already exists */ fds = open(namebuf, O_WRONLY | O_CREAT | O_EXCL | O_Sync, filemode); } else { fds = open(namebuf, O_WRONLY | O_APPEND | O_Sync, filemode); } if (fds < 0) { rc = PBSE_CAN_NOT_OPEN_FILE; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot open '%s' errno=%d - %s (%s)", namebuf, errno, strerror(errno), msg_script_open); log_err(rc, __func__, log_buf); req_reject(rc, 0, preq, NULL, log_buf); unlock_ji_mutex(pj, __func__, "3", LOGLEVEL); return rc; } if (write_ac_socket( fds, preq->rq_ind.rq_jobfile.rq_data, (unsigned)preq->rq_ind.rq_jobfile.rq_size) != preq->rq_ind.rq_jobfile.rq_size) { rc = PBSE_CAN_NOT_WRITE_FILE; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot write to file %s (%d-%s) %s", namebuf, errno, strerror(errno), msg_script_write); log_err(rc, __func__, log_buf); req_reject(PBSE_INTERNAL, 0, preq, NULL, log_buf); close(fds); unlock_ji_mutex(pj, __func__, "4", LOGLEVEL); return rc; } close(fds); pj->ji_qs.ji_un.ji_newt.ji_scriptsz 
+= preq->rq_ind.rq_jobfile.rq_size; /* job has a script file */ pj->ji_qs.ji_svrflags = (pj->ji_qs.ji_svrflags & ~JOB_SVFLG_CHECKPOINT_FILE) | JOB_SVFLG_SCRIPT; /* SUCCESS */ unlock_ji_mutex(pj, __func__, "5", LOGLEVEL); reply_ack(preq); return(rc); } /* END req_jobscript() */ /* the following is for the server only, MOM has her own version below */ /* * req_mvjobfile - receive a job file * This request is used to move a file associated with a job, typically * the standard output or error, between a server and a server or from * a mom back to a server. For a server, the destination is always * within the spool directory. */ /* NOTE: routine for server only - mom code follows this routine */ int req_mvjobfile( struct batch_request *preq) /* ptr to the decoded request */ { int fds; char namebuf[MAXPATHLEN]; job *pj; char log_buf[LOCAL_LOG_BUF_SIZE]; int rc = PBSE_NONE; pj = locate_new_job(preq->rq_conn, NULL); if (pj == NULL) pj = svr_find_job(preq->rq_ind.rq_jobfile.rq_jobid, FALSE); if ((preq->rq_fromsvr == 0) || (pj == NULL)) { rc = PBSE_IVALREQ; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot find job %s - (%d-%s)", preq->rq_ind.rq_jobfile.rq_jobid, errno, strerror(errno)); log_err(rc, __func__, log_buf); req_reject(rc, 0, preq, NULL, NULL); if (pj != NULL) unlock_ji_mutex(pj, __func__, "1", LOGLEVEL); return(rc); } mutex_mgr job_mutex(pj->ji_mutex, true); snprintf(namebuf, sizeof(namebuf), "%s%s", path_spool, pj->ji_qs.ji_fileprefix); switch ((enum job_file)preq->rq_ind.rq_jobfile.rq_type) { case StdOut: strcat(namebuf, JOB_STDOUT_SUFFIX); break; case StdErr: strcat(namebuf, JOB_STDERR_SUFFIX); break; case Checkpoint: strcat(namebuf, JOB_CHECKPOINT_SUFFIX); break; default: rc = PBSE_IVALREQ; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "unexpected move type %s - (%d-%s)", pj->ji_qs.ji_jobid, errno, strerror(errno)); log_err(rc, __func__, log_buf); req_reject(rc, 0, preq, NULL, log_buf); return(rc); break; } if (preq->rq_ind.rq_jobfile.rq_sequence == 0) { unlink(namebuf); 
fds = open(namebuf, O_WRONLY | O_CREAT | O_EXCL | O_Sync, 0600); } else fds = open(namebuf, O_WRONLY | O_APPEND | O_Sync, 0600); if (fds < 0) { rc = PBSE_CAN_NOT_OPEN_FILE; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot open file %s for %s - (%d-%s) - %s", namebuf, pj->ji_qs.ji_jobid, errno, strerror(errno), msg_script_open); log_err(errno, __func__, log_buf); req_reject(rc, 0, preq, NULL, log_buf); return(rc); } if (write_ac_socket( fds, preq->rq_ind.rq_jobfile.rq_data, (unsigned)preq->rq_ind.rq_jobfile.rq_size) != preq->rq_ind.rq_jobfile.rq_size) { rc = PBSE_CAN_NOT_WRITE_FILE; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot write to file %s for %s - (%d-%s) - %s", namebuf, pj->ji_qs.ji_jobid, errno, strerror(errno), msg_script_write); log_err(rc, "req_jobfile", log_buf); req_reject(PBSE_SYSTEM, 0, preq, NULL, log_buf); close(fds); return(rc); } close(fds); if (LOGLEVEL >= 6) { snprintf(log_buf, sizeof(log_buf), "successfully moved file '%s' for job '%s'", namebuf, preq->rq_ind.rq_jobfile.rq_jobid); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, preq->rq_ind.rq_jobfile.rq_jobid, log_buf); } reply_ack(preq); return(rc); } /* END req_mvjobfile() */ /* * req_rdytocommit - Ready To Commit Batch Request * * Set substate to JOB_SUBSTATE_TRANSICM and e record job to permanent storage, i.e. written to the job save file * (called by both pbs_server and pbs_mom) */ int req_rdytocommit( batch_request *preq) /* I */ { job *pj; int sock = preq->rq_conn; int OrigState; int OrigSState; char OrigSChar; long OrigFlags; char namebuf[MAXPATHLEN+1]; char jobid[PBS_MAXSVRJOBID + 1]; char log_buf[LOCAL_LOG_BUF_SIZE]; int rc = PBSE_NONE; pj = locate_new_job(sock, preq->rq_ind.rq_rdytocommit); if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? 
pj->ji_qs.ji_jobid : "NULL", "ready to commit job"); } if (pj == NULL) { rc = PBSE_UNKJOBID; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "unknown job id %s (%d-%s)", preq->rq_ind.rq_rdytocommit, errno, strerror(errno)); log_err(rc, __func__, log_buf); req_reject(rc, 0, preq, NULL, log_buf); return(rc); } mutex_mgr job_mutex(pj->ji_mutex, true); if (pj->ji_qs.ji_substate != JOB_SUBSTATE_TRANSIN) { rc = PBSE_IVALREQ; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot commit job in unexpected state %s (%d-%s)", preq->rq_ind.rq_rdytocommit, errno, strerror(errno)); log_err(rc, __func__, log_buf); req_reject(rc, 0, preq, NULL, log_buf); return(rc); } if (svr_authorize_jobreq(preq, pj) == -1) { rc = PBSE_PERM; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot authorize job req %s", preq->rq_ind.rq_rdytocommit); req_reject(rc, 0, preq, NULL, log_buf); return(rc); } OrigState = pj->ji_qs.ji_state; OrigSState = pj->ji_qs.ji_substate; OrigSChar = pj->ji_wattr[JOB_ATR_state].at_val.at_char; OrigFlags = pj->ji_wattr[JOB_ATR_state].at_flags; pj->ji_qs.ji_state = JOB_STATE_TRANSIT; pj->ji_qs.ji_substate = JOB_SUBSTATE_TRANSICM; pj->ji_wattr[JOB_ATR_state].at_val.at_char = 'T'; pj->ji_wattr[JOB_ATR_state].at_flags |= ATR_VFLAG_SET; /* if this is a job array template then we'll delete the .JB file that was created for this job since we are going to save it with a different suffix here. 
XXX: not sure why the .JB file already exists before we do the SAVEJOB_NEW save below */ if (pj->ji_wattr[JOB_ATR_job_array_request].at_flags & ATR_VFLAG_SET) { pj->ji_is_array_template = TRUE; snprintf(namebuf, sizeof(namebuf), "%s%s%s", path_jobs, pj->ji_qs.ji_fileprefix, JOB_FILE_SUFFIX); unlink(namebuf); } if (job_save(pj, SAVEJOB_FULL, 0) == -1) { rc = PBSE_CAN_NOT_WRITE_FILE; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot save job - %s - (%d - %s)", pj->ji_qs.ji_jobid, errno, strerror(errno)); log_err(rc, __func__, log_buf); /* commit failed, backoff state changes */ pj->ji_qs.ji_state = OrigState; pj->ji_qs.ji_substate = OrigSState; pj->ji_wattr[JOB_ATR_state].at_val.at_char = OrigSChar; pj->ji_wattr[JOB_ATR_state].at_flags = OrigFlags; req_reject(rc, 0, preq, NULL, log_buf); return(rc); } /* acknowledge the request with the job id */ strcpy(jobid, pj->ji_qs.ji_jobid); /* unlock now to prevent a potential deadlock */ job_mutex.unlock(); if (reply_jobid(preq, jobid, BATCH_REPLY_CHOICE_RdytoCom) != 0) { rc = PBSE_SOCKET_WRITE; /* reply_jobid need to return an accurate code */ /* reply failed, purge the job and close the connection */ snprintf(log_buf, sizeof(log_buf), "cannot report jobid - %s - (%d - %s)", jobid, errno, strerror(errno)); log_err(rc, __func__, log_buf); if ((pj = svr_find_job(jobid, FALSE)) != NULL) svr_job_purge(pj); return(rc); } if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, "ready to commit job completed"); } return(rc); } /* END req_rdytocommit() */ int set_interactive_job_roaming_policy( job *pjob) { long interactive_roaming = FALSE; long cray_enabled = FALSE; struct pbsnode *pnode; char *submit_node_id; char *dot; char log_buf[LOCAL_LOG_BUF_SIZE]; int rc = PBSE_NONE; get_svr_attr_l(SRV_ATR_InteractiveJobsCanRoam, &interactive_roaming); get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled); if (cray_enabled == TRUE) { if (pjob->ji_wattr[JOB_ATR_interactive].at_val.at_long) { if (interactive_roaming == FALSE) { 
submit_node_id = strdup(pjob->ji_wattr[JOB_ATR_submit_host].at_val.at_str); if ((pnode = find_nodebyname(submit_node_id)) == NULL) { if ((dot = strchr(submit_node_id, '.')) != NULL) { *dot = '\0'; pnode = find_nodebyname(submit_node_id); } } if (pnode != NULL) { pjob->ji_wattr[JOB_ATR_login_prop].at_flags |= ATR_VFLAG_SET; pjob->ji_wattr[JOB_ATR_login_prop].at_val.at_str = submit_node_id; unlock_node(pnode, __func__, NULL, LOGLEVEL); } else { snprintf(log_buf, sizeof(log_buf), "Couldn't determine which login node is %s", pjob->ji_wattr[JOB_ATR_submit_host].at_val.at_str); log_err(PBSE_UNKNODE, __func__, log_buf); rc = -1; } } } } return(rc); } /* END set_interactive_job_roaming_policy() */ /* * req_commit - commit ownership of job * * Set state of job to JOB_STATE_QUEUED (or Held or Waiting) and * enqueue the job into its destination queue. */ int req_commit( struct batch_request *preq) /* I */ { int rc = PBSE_NONE; job *pj; int newstate; int newsub; pbs_queue *pque; char log_buf[LOCAL_LOG_BUF_SIZE] = {0}; #ifdef AUTORUN_JOBS struct batch_request *preq_run = NULL; pbs_attribute *pattr; int nodes_avail = -1; int dummy; char spec[MAXPATHLEN]; char *rq_destin = NULL; #endif /* AUTORUN_JOBS */ #ifdef QUICKCOMMIT int OrigState; int OrigSState; char OrigSChar; long OrigFlags; char namebuf[MAXPATHLEN+1]; #endif /* QUICKCOMMIT */ pj = locate_new_job(preq->rq_conn, preq->rq_ind.rq_commit); if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? 
pj->ji_qs.ji_jobid : "NULL", "committing job"); } if (pj == NULL) { rc = PBSE_UNKJOBID; req_reject(rc, 0, preq, NULL, NULL); return(rc); } if (LOGLEVEL >= 10) LOG_EVENT(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, pj->ji_qs.ji_jobid); mutex_mgr job_mutex = mutex_mgr(pj->ji_mutex, true); #ifdef QUICKCOMMIT if (pj->ji_qs.ji_substate != JOB_SUBSTATE_TRANSIN) { rc = PBSE_IVALREQ; snprint(log_buf, LOCAL_LOG_BUF_SIZE, "cannot commit job in unexpected state (%d - %s)", errno, strerror(errno)); log_err(rc, __func__, log_buf); req_reject(PBSE_IVALREQ, 0, preq, NULL, log_buf); job_mutex.unlock(); return(rc); } OrigState = pj->ji_qs.ji_state; OrigSState = pj->ji_qs.ji_substate; OrigSChar = pj->ji_wattr[JOB_ATR_state].at_val.at_char; OrigFlags = pj->ji_wattr[JOB_ATR_state].at_flags; pj->ji_qs.ji_state = JOB_STATE_TRANSIT; pj->ji_qs.ji_substate = JOB_SUBSTATE_TRANSICM; pj->ji_wattr[JOB_ATR_state].at_val.at_char = 'T'; pj->ji_wattr[JOB_ATR_state].at_flags |= ATR_VFLAG_SET; if (pj->ji_wattr[JOB_ATR_job_array_request].at_flags & ATR_VFLAG_SET) { pj->ji_is_array_template = TRUE; snprintf(namebuf, sizeof(namebuf), "%s%s%s", path_jobs, pj->ji_qs.ji_fileprefix, JOB_FILE_SUFFIX); unlink(namebuf); } #endif /* QUICKCOMMIT */ if (pj->ji_qs.ji_substate != JOB_SUBSTATE_TRANSICM) { rc = PBSE_IVALREQ; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot commit job in unexpected state (%d-%s)", errno, strerror(errno)); log_err(rc, __func__, "cannot commit job in unexpected state"); req_reject(rc, 0, preq, NULL, NULL); job_mutex.unlock(); return(rc); } if (svr_authorize_jobreq(preq, pj) == -1) { rc = PBSE_PERM; snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "no permission to start job %s", pj->ji_qs.ji_jobid); req_reject(rc, 0, preq, NULL, log_buf); if (LOGLEVEL >= 6) log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pj->ji_qs.ji_jobid, log_buf); job_mutex.unlock(); return(rc); } /* remove job from the server new job list, set state, and enqueue it */ remove_job(&newjobs,pj); /* job array, setup the array task *** 
job array under development */ if (pj->ji_is_array_template) { long max_size = 0; long max_slot = 0; if ((rc = setup_array_struct(pj))) { if (rc == ARRAY_TOO_LARGE) { get_svr_attr_l(SRV_ATR_MaxArraySize, &max_size); snprintf(log_buf,sizeof(log_buf), "Requested array size too large, limit is %ld", max_size); req_reject(PBSE_BAD_ARRAY_REQ, 0, preq, NULL, log_buf); } else if (rc == INVALID_SLOT_LIMIT) { get_svr_attr_l(SRV_ATR_MaxSlotLimit, &max_slot); snprintf(log_buf,sizeof(log_buf), "Requested slot limit invalid, limit is %ld", max_slot); req_reject(PBSE_BAD_ARRAY_REQ, 0, preq, NULL, log_buf); } else { req_reject(PBSE_BAD_ARRAY_REQ, 0, preq, NULL, NULL); } job_mutex.unlock(); return(PBSE_BAD_ARRAY_REQ); } } /* end if (pj->ji_is_array_template) */ svr_evaljobstate(*pj, newstate, newsub, 1); svr_setjobstate(pj, newstate, newsub, FALSE); set_interactive_job_roaming_policy(pj); /* set the queue rank attribute */ pj->ji_wattr[JOB_ATR_qrank].at_val.at_long = ++queue_rank; pj->ji_wattr[JOB_ATR_qrank].at_flags |= ATR_VFLAG_SET; if ((rc = svr_enquejob(pj, FALSE, -1, false)) != PBSE_NONE) { if (rc != PBSE_JOB_RECYCLED) { if (LOGLEVEL >= 6) { snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot queue job %s", pj->ji_qs.ji_jobid); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pj->ji_qs.ji_jobid, log_buf); } svr_job_purge(pj); } else job_mutex.unlock(); req_reject(rc, 0, preq, NULL, log_buf); return(rc); } /* * if the job went into a Route (push) queue that has been started, * try once to route it to give immediate feedback as a courtsey * to the user. 
*/ if ((pque = get_jobs_queue(&pj)) != NULL) { mutex_mgr pque_mutex = mutex_mgr(pque->qu_mutex,true); if ((preq->rq_fromsvr == 0) && (pque->qu_qs.qu_type == QTYPE_RoutePush) && (pque->qu_attr[QA_ATR_Started].at_val.at_long != 0)) { /* job_route expects the queue to be unlocked */ pque_mutex.unlock(); if ((rc = job_route(pj))) { if (LOGLEVEL >= 6) { snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot route job %s", pj->ji_qs.ji_jobid); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pj->ji_qs.ji_jobid, log_buf); } decrement_queued_jobs(&users, pj->ji_wattr[JOB_ATR_job_owner].at_val.at_str); svr_job_purge(pj); req_reject(rc, 0, preq, NULL, log_buf); return(rc); } } if (job_save(pj, SAVEJOB_FULL, 0) != 0) { rc = PBSE_CAN_NOT_SAVE_FILE; if (LOGLEVEL >= 6) { snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "cannot save job %s", pj->ji_qs.ji_jobid); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pj->ji_qs.ji_jobid, log_buf); } svr_job_purge(pj); req_reject(rc, 0, preq, NULL, log_buf); return(rc); } /* this needs to be done if there are routing queues. 
queue_route checks to see if req_commit is done routing the job with this flag */ pj->ji_commit_done = 1; /* need to format message first, before request goes away - moved here because we have the mutex */ snprintf(log_buf, sizeof(log_buf), msg_jobnew, preq->rq_user, preq->rq_host, pj->ji_wattr[JOB_ATR_job_owner].at_val.at_str, pj->ji_wattr[JOB_ATR_jobname].at_val.at_str, pque->qu_qs.qu_name); } #ifdef AUTORUN_JOBS /* If we are auto running jobs with start_count = 0 then the * branch_request needs re creation since reply_jobid will free * the passed in one */ pattr = &pj->ji_wattr[JOB_ATR_start_count]; snprintf(spec, sizeof(spec), PBS_DEFAULT_NODE); node_avail_complex(spec, &nodes_avail, &dummy, &dummy, &dummy); if ((pattr->at_val.at_long == 0) && (nodes_avail > 0)) { /* Create a new batch request and fill it in */ preq_run = alloc_br(PBS_BATCH_RunJob); preq_run->rq_perm = preq->rq_perm | ATR_DFLAG_OPWR; preq_run->rq_ind.rq_run.rq_resch = 0; preq_run->rq_ind.rq_run.rq_destin = rq_destin; preq_run->rq_fromsvr = preq->rq_fromsvr; preq_run->rq_extsz = preq->rq_extsz; preq_run->rq_noreply = TRUE; /* set for no replies */ strcpy(preq_run->rq_user, preq->rq_user); strcpy(preq_run->rq_host, preq->rq_host); strcpy(preq_run->rq_ind.rq_run.rq_jid, preq->rq_ind.rq_rdytocommit); } #endif /* acknowledge the request with the job id */ reply_jobid(preq, pj->ji_qs.ji_jobid, BATCH_REPLY_CHOICE_Commit); /* if job array, setup the cloning work task */ if (pj->ji_is_array_template) { sprintf(log_buf, "threading job_clone_wt: job id %s", pj->ji_qs.ji_jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf); enqueue_threadpool_request(job_clone_wt, strdup(pj->ji_qs.ji_jobid)); } sprintf(log_buf, "job_id: %s", pj->ji_qs.ji_jobid); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,__func__,log_buf); if ((pj->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) { /* notify creator where job is */ issue_track(pj); } job_mutex.unlock(); #ifdef AUTORUN_JOBS /* If we are auto running jobs with 
start_count = 0 then the * branch_request was re created. Now we run the job if any nodes * are available */ if ((pattr->at_val.at_long == 0) && (nodes_avail > 0)) { if (LOGLEVEL >= 7) { snprintf(log_buf, sizeof(log_buf), "Trying to AUTORUN job %s", pj->ji_qs.ji_jobid); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? pj->ji_qs.ji_jobid : "NULL", log_buf); } req_runjob(preq_run); } #endif /* AUTORUN_JOBS */ return(rc); } /* END req_commit() */ /* * locate_new_job - locate a "new" job which has been set up req_quejob on * the servers new job list. * * This function is used by the sub-requests which make up the global * "Queue Job Request" to locate the job structure. * * If the jobid is specified (will be for rdytocommit and commit, but not * for script), we search for a matching jobid. * * The job must (also) match the socket specified and the host associated * with the socket unless ji_fromsock == -1, then its a recovery situation. */ /* FIXME: why bother checking for matching sock if a jobid is supplied? Seems * to me that a reconnect immediately invalidates fromsock. 
*/ static job *locate_new_job( int sock, /* I */ char *jobid) /* I (optional) */ { job *pj; int iter = -1; while ((pj = next_job(&newjobs,&iter)) != NULL) { if ((jobid != NULL) && (*jobid != '\0')) { if (!strncmp(pj->ji_qs.ji_jobid, jobid, PBS_MAXSVRJOBID)) { /* requested job located */ break; } } else if (pj->ji_qs.ji_un.ji_newt.ji_fromsock == -1) { /* empty job slot located */ break; } else { /* matching job slot located */ break; } unlock_ji_mutex(pj, __func__, "1", LOGLEVEL); } /* END while(pj != NULL) */ /* return job slot located (NULL on FAILURE) */ return(pj); } /* END locate_new_job() */ /* * City Tools patch */ #define UserAcctMax 12 typedef struct _UserAcct { int ActCnt; /* How many projects */ int ActMax; /* Max allowed in this struct */ char ActRaw[80]; /* The raw project data */ char ActDat[80]; /* ActRaw with \0 as necessary */ char *ActAdr[UserAcctMax]; /* Pointers to ActDat items */ } UserAcct_t; UserAcct_t UserAcct = { 0, UserAcctMax, "", "", {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} }; /* UserAcctMax values */ #ifdef PNOT int user_account_verify( char *arguser, char *argaccount) { int i; int rc = 0; /* 0 = failure, 1 = success */ char log_buf[LOCAL_LOG_BUF_SIZE]; if (!user_account_read_user(arguser)) { sprintf(log_buf, "user_account_verify(%s, %s) -> USER NOT FOUND", arguser, argaccount); goto user_account_verify_done; } for (i = 0;i < UserAcct.ActCnt;++i) { if (strcmp(argaccount, UserAcct.ActAdr[i]) == 0) { sprintf(log_buf, "user_account_verify(%s, %s) -> SUCCESS", arguser, argaccount); rc = 1; goto user_account_verify_done; } } /* END for (i) */ sprintf(log_buf, "user_account_verify(%s, %s) -> FAILED", arguser, argaccount); user_account_verify_done: log_event( PBSEVENT_JOB | PBSEVENT_SECURITY, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buf); return(rc); } char *user_account_default( char *arguser) { char *rc = 0; /* 0 = failure,
= success */ char log_buf[LOCAL_LOG_BUF_SIZE]; if (!user_account_read_user(arguser)) { sprintf(log_buf, "user_account_default(%s) = USER NOT FOUND", arguser); goto user_account_default_done; } if (UserAcct.ActCnt < 1) { sprintf(log_buf, "user_account_default(%s) = NO PROJECT FOUND", arguser); goto user_account_default_done; } rc = UserAcct.ActAdr[0]; sprintf(log_buf, "user_account_default(%s) = %s", arguser, rc); user_account_default_done: log_event( PBSEVENT_JOB | PBSEVENT_SECURITY, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buf); return(rc); } /* END user_account_default() */ /* * Given a username, returns that user's accounts from the user->account file * Returns 0 if username isn't found */ #define AcctScanUser 0 #define AcctScanAcct 1 #define AcctScanLine -1 #define AcctFile "/usr/local/etc/usertg/project-by-user-torque.map" int user_account_read_user( char *arguser) { char proj_file[] = AcctFile; int fd; char s_buf[128*1024]; int readsize; int scanmode = AcctScanUser; int i, j; char *ci; char *cj; int arguserlen; arguserlen = strlen(arguser); if ((fd = open(proj_file, O_RDONLY, 0)) < 0) { return(0); } if (lseek(fd, (off_t)0, SEEK_SET) != 0) { close(fd); return(0); } readsize = read_ac_socket(fd, s_buf, sizeof(s_buf)); close(fd); if ((readsize < 1) || (readsize > sizeof(s_buf))) { /* bail if not sane */ return(0); } for (i = 0;i < readsize;++i) { /* First, handle comments and whitespace */ if (scanmode == AcctScanLine) { /* Looking for new line */ if (s_buf[i] == '\n') /* Found it */ scanmode = AcctScanUser; continue; } if (isspace(s_buf[i])) /* Skip spaces */ continue; if (s_buf[i] == '#') { /* Comment found */ scanmode = AcctScanLine; continue; } /* Next, handle user and account scanning */ if (scanmode == AcctScanUser) { if ((i + arguserlen) > readsize) /* Past the end */ { return(0); } if (strncmp(&s_buf[i], arguser, arguserlen)) { scanmode = AcctScanLine; /* Not arguser, next line */ continue; } if (isspace(s_buf[i + arguserlen]) || s_buf[i + arguserlen] 
== ':') { i += arguserlen; /* Is arguser */ scanmode = AcctScanAcct; } else { /* Whatever, ignore it */ scanmode = AcctScanLine; continue; } } else { /* scanmode == AcctScanAcct */ if ((s_buf[i] == ':') || isspace(s_buf[i])) continue; for (j = i;j < readsize;j++) { if (isspace(s_buf[j])) { snprintf(UserAcct.ActRaw, sizeof(UserAcct.ActRaw), "%s", &s_buf[i]); UserAcct.ActRaw[j-i] = '\0'; goto have_account; } } return(0); } } /* END for (i) */ return(0); /* Never found it */ have_account: if (strlen(UserAcct.ActRaw) < 1) /* Nothing found */ { return(0); } strcpy(UserAcct.ActDat, UserAcct.ActRaw); UserAcct.ActCnt = 0; UserAcct.ActMax = UserAcctMax; for (ci = &UserAcct.ActDat[0];*ci != '\0';ci++) { if (isspace(*ci) || (*ci == ',')) continue; for (cj = ci + 1;!isspace(*cj) && (*cj != ',') && (*cj != '\0');cj++) { /* NO-OP */ } *cj = '\0'; UserAcct.ActAdr[UserAcct.ActCnt++] = ci; ci = cj; } return(1); } /* END user_account_read_user() */ #endif /* PNOT */ /* END req_quejob.c() */