/*
 *         OpenPBS (Portable Batch System) v2.3 Software License
 *
 * Copyright (c) 1999-2000 Veridian Information Solutions, Inc.
 * All rights reserved.
 *
 * ---------------------------------------------------------------------------
 * For a license to use or redistribute the OpenPBS software under conditions
 * other than those described below, or to purchase support for this software,
 * please contact Veridian Systems, PBS Products Department ("Licensor") at:
 *
 *    www.OpenPBS.org  +1 650 967-4675                  sales@OpenPBS.org
 *                        877 902-4PBS (US toll-free)
 * ---------------------------------------------------------------------------
 *
 * This license covers use of the OpenPBS v2.3 software (the "Software") at
 * your site or location, and, for certain users, redistribution of the
 * Software to other sites and locations.  Use and redistribution of
 * OpenPBS v2.3 in source and binary forms, with or without modification,
 * are permitted provided that all of the following conditions are met.
 * After December 31, 2001, only conditions 3-6 must be met:
 *
 * 1. Commercial and/or non-commercial use of the Software is permitted
 *    provided a current software registration is on file at www.OpenPBS.org.
 *    If use of this software contributes to a publication, product, or
 *    service, proper attribution must be given; see www.OpenPBS.org/credit.html
 *
 * 2. Redistribution in any form is only permitted for non-commercial,
 *    non-profit purposes.  There can be no charge for the Software or any
 *    software incorporating the Software.  Further, there can be no
 *    expectation of revenue generated as a consequence of redistributing
 *    the Software.
 *
 * 3. Any Redistribution of source code must retain the above copyright notice
 *    and the acknowledgment contained in paragraph 6, this list of conditions
 *    and the disclaimer contained in paragraph 7.
 *
 * 4. Any Redistribution in binary form must reproduce the above copyright
 *    notice and the acknowledgment contained in paragraph 6, this list of
 *    conditions and the disclaimer contained in paragraph 7 in the
 *    documentation and/or other materials provided with the distribution.
 *
 * 5. Redistributions in any form must be accompanied by information on how to
 *    obtain complete source code for the OpenPBS software and any
 *    modifications and/or additions to the OpenPBS software.  The source code
 *    must either be included in the distribution or be available for no more
 *    than the cost of distribution plus a nominal fee, and all modifications
 *    and additions to the Software must be freely redistributable by any party
 *    (including Licensor) without restriction.
 *
 * 6. All advertising materials mentioning features or use of the Software must
 *    display the following acknowledgment:
 *
 *    "This product includes software developed by NASA Ames Research Center,
 *    Lawrence Livermore National Laboratory, and Veridian Information
 *    Solutions, Inc.
 *    Visit www.OpenPBS.org for OpenPBS software support, products, and
 *    information."
 *
 * 7. DISCLAIMER OF WARRANTY
 *
 * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.  ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT
 * ARE EXPRESSLY DISCLAIMED.
 *
 * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE U.S.
 * GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * This license will be governed by the laws of the Commonwealth of Virginia,
 * without reference to its choice of law rules.
 */

/*
 * req_quejob.c
 *
 * Functions relating to the Queue Job Batch Request sequence, including
 * Queue Job, Job Script, Ready to Commit, and Commit.
 *
 * Included functions are:
 *   req_quejob()
 *   req_jobcredential()
 *   req_jobscript()
 *   req_rdytocommit()
 *   req_commit()
 */

#include <pbs_config.h>   /* the master config generated by configure */

/* system headers required by the calls in this file */
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>      /* index() */
#include <ctype.h>
#include <errno.h>
#include <unistd.h>

#include "libpbs.h"
#include "server_limits.h"
#include "list_link.h"
#include "attribute.h"
#include "resource.h"
#include "server.h"
#include "credential.h"
#include "batch_request.h"
#include "queue.h"
#include "pbs_job.h"
#include "net_connect.h"
#include "pbs_error.h"
#include "log.h"
#include "svrfunc.h"
#include "csv.h"
#include "array.h"
#include "work_task.h"

extern void job_clone_wt(struct work_task *);
extern int  setup_array_struct(job *pjob);

/* External Functions Called: */

extern int  reply_jid(char *);
extern void start_exec(job *);
extern int  svr_authorize_jobreq(struct batch_request *, job *);
extern int  svr_chkque(job *, pbs_queue *, char *, int, char *);
extern int  job_route(job *);
extern int  node_avail_complex(char *, int *, int *, int *, int *);

/* Global Data Items: */

extern char   *path_spool;
extern struct server server;
extern char    server_name[];
extern int     queue_rank;

extern tlist_head svr_jobarrays;
extern tlist_head svr_jobs_array_sum;

extern const char *PJobSubState[];

/* sync w/ enum job_file (TJobFileType[]) */

const char *TJobFileType[] =
  {
  "jobscript",
  "stdin",
  "stdout",
  "stderr",
  "checkpoint",
  NULL
  };

extern int resc_access_perm;

extern tlist_head svr_alljobs;
extern tlist_head svr_newjobs;
extern attribute_def job_attr_def[];
extern char *path_jobs;
extern char *pbs_o_host;
extern char *msg_script_open;
extern char *msg_script_write;
extern char *msg_init_abt;
extern char *msg_jobnew;
extern time_t time_now;
extern int    LOGLEVEL;
extern char  *msg_daemonname;

/* Private Functions in this file */

static job *locate_new_job(int, char *);

#ifdef PNOT
static int   user_account_verify(char *, char *);
static char *user_account_default(char *);
static int   user_account_read_user(char *);
#endif /* PNOT */

static char *pbs_o_que = "PBS_O_QUEUE=";

/******************************************************************
 * set_nodes_attr - Check to see if the node resource was requested
 *                  on the qsub line.  If not, add a default "procct"
 *                  resource to the Resource_List with a value of 1.
 *                  This makes it so that if procct is set on a queue,
 *                  jobs without a node resource request will still
 *                  be properly routed.
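 *
 * Example (illustrative only): a submission such as
 *     qsub -l walltime=01:00:00 job.sh
 * requests neither "nodes" nor "procs", so this routine decodes a default
 * "procct=1" into the job's Resource_List before the job is routed.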
* Returns: 0 if OK * Non-Zero on failure *****************************************************************/ int set_nodes_attr(job *pjob) { resource *pres; int nodect_set = 0; int rc = 0; char *pname; if(pjob->ji_wattr[JOB_ATR_resource].at_flags & ATR_VFLAG_SET) { pres = (resource *)GET_NEXT(pjob->ji_wattr[JOB_ATR_resource].at_val.at_list); while(pres != NULL) { if(pres->rs_defin != NULL) { pname = pres->rs_defin->rs_name; if(pname == NULL || *pname == 0) { pres = (resource *)GET_NEXT(pres->rs_link); continue; } if(strncmp(pname, "nodes", 5) == 0 || strncmp(pname, "procs", 5) == 0) { nodect_set = 1; break; } } pres = (resource *)GET_NEXT(pres->rs_link); } } if(nodect_set == 0) { int temp_perm; /* neither procs nor nodes were requested. set procct to 1 */ temp_perm = resc_access_perm; resc_access_perm = ATR_DFLAG_WRACC | ATR_DFLAG_MGWR | ATR_DFLAG_RMOMIG; rc = decode_resc(&pjob->ji_wattr[JOB_ATR_resource], "Resource_List", "procct", "1"); resc_access_perm = temp_perm; } return(rc); } /* END set_nodes_attr() */ /* * req_quejob - Queue Job Batch Request processing routine * NOTE: calls svr_chkque() to validate queue access * */ void req_quejob( struct batch_request *preq) /* ptr to the decoded request */ { char *id = "req_quejob"; char basename[PBS_JOBBASE + 1]; int created_here = 0; int attr_index; char *jid; char namebuf[MAXPATHLEN + 1]; attribute_def *pdef; job *pj; svrattrl *psatl; int rc; int sock = preq->rq_conn; int i; char buf[256]; int fds; char jidbuf[PBS_MAXSVRJOBID + 1]; char *pc; pbs_queue *pque; char *qname; attribute tempattr; struct stat stat_buf; char EMsg[1024]; /* set basic (user) level access permission */ resc_access_perm = ATR_DFLAG_USWR | ATR_DFLAG_Creat; /* * if the job id is supplied, the request had better be * from another server */ if (preq->rq_fromsvr) { /* from another server - accept the extra attributes */ resc_access_perm |= ATR_DFLAG_MGWR | ATR_DFLAG_SvWR; jid = preq->rq_ind.rq_queuejob.rq_jid; } else if (preq->rq_ind.rq_queuejob.rq_jid[0] != '\0') { /* FAILURE */ /* a job id is not allowed from a client */ log_err(errno, id, "job id not allowed from client"); req_reject(PBSE_IVALREQ, 0, preq, NULL, "job not allowed from client"); return; } else { /* Create a job id */ char host_server[PBS_MAXSERVERNAME + 1]; int server_suffix = TRUE; created_here = JOB_SVFLG_HERE; memset(host_server, 0, sizeof(host_server)); if ((server.sv_attr[SRV_ATR_display_job_server_suffix].at_flags & ATR_VFLAG_SET) && (server.sv_attr[SRV_ATR_display_job_server_suffix].at_val.at_long == FALSE)) server_suffix = FALSE; if ((server.sv_attr[SRV_ATR_job_suffix_alias].at_flags & ATR_VFLAG_SET) && (server_suffix == TRUE)) { char *svrnm; if (get_fullhostname(pbs_default(), host_server, PBS_MAXSERVERNAME, NULL) == 0) { svrnm = host_server; } else { svrnm = server_name; } snprintf(jidbuf,sizeof(jidbuf),"%d.%s.%s", server.sv_qs.sv_jobidnumber, svrnm, server.sv_attr[SRV_ATR_job_suffix_alias].at_val.at_str); } else if (server.sv_attr[SRV_ATR_job_suffix_alias].at_flags & ATR_VFLAG_SET) { snprintf(jidbuf,sizeof(jidbuf),"%d.%s", server.sv_qs.sv_jobidnumber, server.sv_attr[SRV_ATR_job_suffix_alias].at_val.at_str); } else if (server_suffix == TRUE) { char *svrnm; if (get_fullhostname(pbs_default(), host_server, PBS_MAXSERVERNAME, NULL) == 0) { svrnm = host_server; } else { svrnm = server_name; } snprintf(jidbuf,sizeof(jidbuf),"%d.%s", server.sv_qs.sv_jobidnumber, svrnm); } else { snprintf(jidbuf,sizeof(jidbuf),"%d", server.sv_qs.sv_jobidnumber); } jid = jidbuf; /* having updated sv_jobidnumber, must save 
server struct */ if (++server.sv_qs.sv_jobidnumber > PBS_SEQNUMTOP) server.sv_qs.sv_jobidnumber = 0; /* wrap it */ /* Make the current job number visible in qmgr print server commnad. */ server.sv_attr[(int)SRV_ATR_NextJobNumber].at_val.at_long = server.sv_qs.sv_jobidnumber; server.sv_attr[(int)SRV_ATR_NextJobNumber].at_flags |= ATR_VFLAG_SET | ATR_VFLAG_MODIFY; if (svr_save(&server, SVR_SAVE_QUICK)) { /* FAILURE */ req_reject(PBSE_INTERNAL, 0, preq, NULL, NULL); return; } } /* does job already exist, check both old and new jobs */ if ((pj = find_job(jid)) == NULL) { pj = (job *)GET_NEXT(svr_newjobs); while (pj != NULL) { if (!strcmp(pj->ji_qs.ji_jobid, jid)) break; pj = (job *)GET_NEXT(pj->ji_alljobs); } } if (pj != NULL) { /* server will reject queue request if job already exists */ log_err(errno, id, "cannot queue new job, job already exists"); req_reject(PBSE_JOBEXIST, 0, preq, NULL, NULL); return; } /* find requested queue, is it there? */ qname = preq->rq_ind.rq_queuejob.rq_destin; if ((*qname == '\0') || (*qname == '@')) { /* use default queue */ pque = get_dfltque(); rc = PBSE_QUENODFLT; } else { /* else find the named queue */ pque = find_queuebyname(preq->rq_ind.rq_queuejob.rq_destin); rc = PBSE_UNKQUE; } if (pque == NULL) { /* FAILURE */ log_err(-1, id, "requested queue not found"); req_reject(rc, 0, preq, NULL, "cannot locate queue"); /* not there */ return; } /* * make up job file name, it is based on the jobid, however the * minimun acceptable file name limit is only 14 character in POSIX, * so we may have to "hash" the name slightly */ strncpy(basename, jid, PBS_JOBBASE); basename[PBS_JOBBASE] = '\0'; do { strcpy(namebuf, path_jobs); strcat(namebuf, basename); strcat(namebuf, JOB_FILE_SUFFIX); fds = open(namebuf, O_CREAT | O_EXCL | O_WRONLY, 0600); if (fds < 0) { if (errno == EEXIST) { pc = basename + strlen(basename) - 1; while (!isprint((int)*pc)) { pc--; if (pc <= basename) { /* FAILURE */ log_err(errno, id, "job file is corrupt"); req_reject(PBSE_INTERNAL, 0, preq, NULL, "job file is corrupt"); return; } } (*pc)++; } else { /* FAILURE */ log_err(errno, id, "cannot create job file"); req_reject(PBSE_SYSTEM, 0, preq, NULL, "cannot open new job file"); return; } } } while (fds < 0); close(fds); /* create the job structure */ if ((pj = job_alloc()) == NULL) { /* FAILURE */ log_err(errno, id, "cannot alloc new job"); unlink(namebuf); req_reject(PBSE_SYSTEM, 0, preq, NULL, "cannot alloc new job structure"); return; } strcpy(pj->ji_qs.ji_jobid, jid); strcpy(pj->ji_qs.ji_fileprefix, basename); pj->ji_modified = 1; pj->ji_qs.ji_svrflags = created_here; pj->ji_qs.ji_un_type = JOB_UNION_TYPE_NEW; pj->ji_wattr[JOB_ATR_mailpnts].at_val.at_str = 0; /* decode attributes from request into job structure */ psatl = (svrattrl *)GET_NEXT(preq->rq_ind.rq_queuejob.rq_attr); while (psatl != NULL) { if (psatl->al_atopl.resource) { if (strcmp(psatl->al_atopl.resource, "nodes") == 0) { pj->ji_have_nodes_request = 1; } } /* identify the attribute by name */ attr_index = find_attr(job_attr_def, psatl->al_name, JOB_ATR_LAST); if (attr_index < 0) { /* FAILURE */ /* didn`t recognize the name */ attr_index = JOB_ATR_UNKN; /* keep as "unknown" for now */ } pdef = &job_attr_def[attr_index]; /* Is attribute not writeable by manager or by a server? 
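   (resc_access_perm was initialized above to ATR_DFLAG_USWR | ATR_DFLAG_Creat for
   a client submission, and ATR_DFLAG_MGWR | ATR_DFLAG_SvWR are added only when the
   request arrives from another server, so an attribute definition that carries
   none of the requester's write bits is rejected here with PBSE_ATTRRO.)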
*/ if ((pdef->at_flags & resc_access_perm) == 0) { /* FAILURE */ job_purge(pj); reply_badattr(PBSE_ATTRRO, 1, psatl, preq); return; } /* decode attribute */ rc = pdef->at_decode( &pj->ji_wattr[attr_index], psatl->al_name, psatl->al_resc, psatl->al_value); if (rc != 0) { if (rc == PBSE_UNKRESC) { /* check for RM extension */ /* NYI */ /* unknown resources not allowed in Exec queue */ if (pque->qu_qs.qu_type == QTYPE_Execution) { /* FAILURE */ job_purge(pj); reply_badattr(rc, 1, psatl, preq); return; } } else { /* FAILURE */ /* any other error is fatal */ job_purge(pj); reply_badattr(rc, 1, psatl, preq); return; } } /* END if (rc != 0) */ psatl = (svrattrl *)GET_NEXT(psatl->al_link); } /* END while (psatl != NULL) */ rc = set_nodes_attr(pj); if(rc) { /* just record that we could not set node count */ sprintf(log_buffer, "Could not set default node count. Error not fatal. Will continue submitting job: %d", rc); LOG_EVENT(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, id, log_buffer); } /* perform any at_action routine declared for the attributes */ for (i = 0; i < JOB_ATR_LAST; ++i) { pdef = &job_attr_def[i]; if ((pj->ji_wattr[i].at_flags & ATR_VFLAG_SET) && (pdef->at_action)) { rc = pdef->at_action(&pj->ji_wattr[i], pj, ATR_ACTION_NEW); if (rc) { job_purge(pj); req_reject(rc, i, preq, NULL, "cannot execute attribute action"); return; } } } /* END for (i) */ /* * Now that the attributes have been decoded, we can setup some * additional parameters and perform a few more checks. * * First, set some items based on who created the job... */ if (created_here) { /* created here */ /* check that job has a jobname */ if ((pj->ji_wattr[JOB_ATR_jobname].at_flags & ATR_VFLAG_SET) == 0) { job_attr_def[JOB_ATR_jobname].at_decode( &pj->ji_wattr[JOB_ATR_jobname], NULL, NULL, "none"); } /* check value of priority */ if (pj->ji_wattr[JOB_ATR_priority].at_flags & ATR_VFLAG_SET) { if ((pj->ji_wattr[JOB_ATR_priority].at_val.at_long < -1024) || (pj->ji_wattr[JOB_ATR_priority].at_val.at_long > 1024)) { job_purge(pj); req_reject(PBSE_BADATVAL, 0, preq, NULL, "invalid job priority"); return; } } /* check if a job id was supplied, and if so overwrite the job id */ if (pj->ji_wattr[JOB_ATR_job_id].at_flags & ATR_VFLAG_SET) { char *dot = strchr(pj->ji_qs.ji_jobid,'.'); char tmp_extension[PBS_MAXSVRJOBID + 1]; job *tmpjob; if (dot != NULL) { strcpy(tmp_extension,dot); snprintf(pj->ji_qs.ji_jobid, sizeof(pj->ji_qs.ji_jobid), "%s%s", pj->ji_wattr[JOB_ATR_job_id].at_val.at_str, tmp_extension); } else { strcpy(pj->ji_qs.ji_jobid,pj->ji_wattr[JOB_ATR_job_id].at_val.at_str); } if ((tmpjob = find_job(pj->ji_qs.ji_jobid)) != NULL) { /* not unique, reject job */ job_purge(pj); snprintf(log_buffer,sizeof(log_buffer), "Job with id %s already exists, cannot set job id\n", pj->ji_qs.ji_jobid); req_reject(PBSE_JOBEXIST,0,preq,NULL,log_buffer); return; } } /* set job owner attribute to user@host */ job_attr_def[JOB_ATR_job_owner].at_free( &pj->ji_wattr[JOB_ATR_job_owner]); strcpy(buf, preq->rq_user); strcat(buf, "@"); strcat(buf, preq->rq_host); job_attr_def[JOB_ATR_job_owner].at_decode( &pj->ji_wattr[JOB_ATR_job_owner], NULL, NULL, buf); /* set create time */ pj->ji_wattr[JOB_ATR_ctime].at_val.at_long = (long)time_now; pj->ji_wattr[JOB_ATR_ctime].at_flags |= ATR_VFLAG_SET; /* set hop count = 1 */ pj->ji_wattr[JOB_ATR_hopcount].at_val.at_long = 1; pj->ji_wattr[JOB_ATR_hopcount].at_flags |= ATR_VFLAG_SET; /* Interactive jobs are necessarily not rerunable */ if ((pj->ji_wattr[JOB_ATR_interactive].at_flags & ATR_VFLAG_SET) && 
pj->ji_wattr[JOB_ATR_interactive].at_val.at_long) { pj->ji_wattr[JOB_ATR_rerunable].at_val.at_long = 0; pj->ji_wattr[JOB_ATR_rerunable].at_flags |= ATR_VFLAG_SET; } strcpy(buf, pbs_o_que); strcat(buf, pque->qu_qs.qu_name); if (get_variable(pj, pbs_o_host) == NULL) { strcat(buf, ","); strcat(buf, pbs_o_host); strcat(buf, "="); strcat(buf, preq->rq_host); /* We need to add the hostname to the list of attributes for this job */ clear_attr(&tempattr, &job_attr_def[JOB_ATR_submit_host]); rc = job_attr_def[JOB_ATR_submit_host].at_decode( &tempattr, NULL, NULL, preq->rq_host); if (!rc) { rc = job_attr_def[JOB_ATR_submit_host].at_set( &pj->ji_wattr[JOB_ATR_submit_host], &tempattr, INCR); } if (rc) { sprintf(log_buffer, "failed to add submit_host %s. Minor error", preq->rq_host); log_event( PBSEVENT_JOB | PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); } job_attr_def[JOB_ATR_submit_host].at_free(&tempattr); } /* need to set certain environmental variables per POSIX */ clear_attr(&tempattr, &job_attr_def[JOB_ATR_variables]); job_attr_def[JOB_ATR_variables].at_decode( &tempattr, NULL, NULL, buf); job_attr_def[JOB_ATR_variables].at_set( &pj->ji_wattr[JOB_ATR_variables], &tempattr, INCR); job_attr_def[JOB_ATR_variables].at_free(&tempattr); /* if JOB_ATR_outpath/JOB_ATR_errpath not set, set default */ if (!(pj->ji_wattr[JOB_ATR_outpath].at_flags & ATR_VFLAG_SET) || (((pj->ji_wattr[JOB_ATR_outpath].at_val.at_str[strlen(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str) - 1] == ':')))) { pj->ji_wattr[JOB_ATR_outpath].at_val.at_str = prefix_std_file(pj, (int)'o'); pj->ji_wattr[JOB_ATR_outpath].at_flags |= ATR_VFLAG_SET; } /* * if the output path was specified and ends with a '/' * then append the standard file name */ else if ((pj->ji_wattr[JOB_ATR_outpath].at_flags & ATR_VFLAG_SET) && (((pj->ji_wattr[JOB_ATR_outpath].at_val.at_str[strlen(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str) - 1] == '/')))) { pj->ji_wattr[JOB_ATR_outpath].at_val.at_str[strlen(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str) - 1] = '\0'; replace_attr_string( &pj->ji_wattr[JOB_ATR_outpath], (add_std_filename(pj, pj->ji_wattr[JOB_ATR_outpath].at_val.at_str, (int)'o'))); } else if (pj->ji_wattr[JOB_ATR_outpath].at_flags & ATR_VFLAG_SET) { /* check to see if the path pointed to is a directory. If so apendd the default output file name. 
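     (For example, illustrative only: an output path supplied as
     "somehost:/home/user/outdir", where /home/user/outdir exists as a directory
     on the server host, has the standard output file name appended to it by
     add_std_filename() below.)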
Otherwise we treat the string as a path to a file */ char *ptr; ptr = strchr(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str, ':'); if(ptr) { ptr++; rc = stat(ptr, &stat_buf); } else { rc = stat(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str, &stat_buf); } if(rc == 0) { if(S_ISDIR(stat_buf.st_mode)) { /* strcat(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str, "/"); */ replace_attr_string( &pj->ji_wattr[JOB_ATR_outpath], (add_std_filename(pj, pj->ji_wattr[JOB_ATR_outpath].at_val.at_str, (int)'o'))); } } } if (!(pj->ji_wattr[JOB_ATR_errpath].at_flags & ATR_VFLAG_SET) || (((pj->ji_wattr[JOB_ATR_errpath].at_val.at_str[strlen(pj->ji_wattr[JOB_ATR_errpath].at_val.at_str) - 1] == ':')))) { pj->ji_wattr[JOB_ATR_errpath].at_val.at_str = prefix_std_file(pj, (int)'e'); pj->ji_wattr[JOB_ATR_errpath].at_flags |= ATR_VFLAG_SET; } /* * if the error path was specified and ends with a '/' * then append the standard file name */ else if ((pj->ji_wattr[JOB_ATR_errpath].at_flags & ATR_VFLAG_SET) && (((pj->ji_wattr[JOB_ATR_errpath].at_val.at_str[strlen(pj->ji_wattr[JOB_ATR_errpath].at_val.at_str) - 1] == '/')))) { pj->ji_wattr[JOB_ATR_errpath].at_val.at_str[strlen(pj->ji_wattr[JOB_ATR_errpath].at_val.at_str) - 1] = '\0'; replace_attr_string( &pj->ji_wattr[JOB_ATR_errpath], (add_std_filename(pj, pj->ji_wattr[JOB_ATR_errpath].at_val.at_str, (int)'e'))); } else if (pj->ji_wattr[JOB_ATR_errpath].at_flags & ATR_VFLAG_SET) { /* check to see if the path pointed to is a directory. If so apendd the default output file name. Otherwise we treat the string as a path to a file */ char *ptr; ptr = strchr(pj->ji_wattr[JOB_ATR_errpath].at_val.at_str, ':'); if(ptr) { ptr++; rc = stat(ptr, &stat_buf); } else { rc = stat(pj->ji_wattr[JOB_ATR_errpath].at_val.at_str, &stat_buf); } if(rc == 0) { if(S_ISDIR(stat_buf.st_mode)) { /* strcat(pj->ji_wattr[JOB_ATR_outpath].at_val.at_str, "/"); */ replace_attr_string( &pj->ji_wattr[JOB_ATR_errpath], (add_std_filename(pj, pj->ji_wattr[JOB_ATR_errpath].at_val.at_str, (int)'e'))); } } } if ((pj->ji_wattr[JOB_ATR_outpath].at_val.at_str == NULL) || (pj->ji_wattr[JOB_ATR_errpath].at_val.at_str == NULL)) { job_purge(pj); req_reject(PBSE_NOATTR, 0, preq, NULL, "no output/error file specified"); return; } /* * set any "unspecified" checkpoint with queue default values, if any */ set_chkpt_deflt(pj, pque); /* If queue has checkpoint directory name specified, propagate it to the job. 
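   The precedence applied below is: an explicit dir=<path> value inside the job's
   own JOB_ATR_checkpoint string, then the queue's QE_ATR_checkpoint_dir, then the
   server's SRV_ATR_checkpoint_dir.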
*/ if (!(pj->ji_wattr[JOB_ATR_checkpoint_dir].at_flags & ATR_VFLAG_SET)) { attribute *pattr; char *vp; pattr = &pj->ji_wattr[JOB_ATR_checkpoint]; if ((pattr->at_flags & ATR_VFLAG_SET) && (vp = csv_find_value(pattr->at_val.at_str, "dir"))) { clear_attr( &pj->ji_wattr[JOB_ATR_checkpoint_dir], &job_attr_def[JOB_ATR_checkpoint_dir]); job_attr_def[JOB_ATR_checkpoint_dir].at_decode( &pj->ji_wattr[JOB_ATR_checkpoint_dir], NULL, NULL, vp); } else if ((pque->qu_attr[QE_ATR_checkpoint_dir].at_flags & ATR_VFLAG_SET) && (pque->qu_attr[QE_ATR_checkpoint_dir].at_val.at_str)) { job_attr_def[JOB_ATR_checkpoint_dir].at_set( &pj->ji_wattr[JOB_ATR_checkpoint_dir], &pque->qu_attr[QE_ATR_checkpoint_dir], SET); } else if ((server.sv_attr[SRV_ATR_checkpoint_dir].at_flags & ATR_VFLAG_SET) && (server.sv_attr[SRV_ATR_checkpoint_dir].at_val.at_str)) { job_attr_def[JOB_ATR_checkpoint_dir].at_set( &pj->ji_wattr[JOB_ATR_checkpoint_dir], &server.sv_attr[SRV_ATR_checkpoint_dir], SET); } } #ifdef PNOT /************************************************************* * Argonne National Laboratory Account/Project enforcement patch * Dec 2000, Sep 2004, JP Navarro navarro@mcs.anl.gov * Verify the specified account (project) is valid for the user * If none is specified, assign the user's default if allowed * Else return error: valid user account is required *************************************************************/ if (pj->ji_wattr[JOB_ATR_account].at_flags & ATR_VFLAG_SET) { /* account specified, reject if it's not valid for user */ if (user_account_verify( preq->rq_user, pj->ji_wattr[JOB_ATR_account].at_val.at_str) == 0) { job_purge(pj); req_reject(PBSE_BADACCT, 0, preq, NULL, "invalid account"); return; } } else { /* account not specified, get default value */ job_attr_def[JOB_ATR_account].at_decode( &pj->ji_wattr[JOB_ATR_account], NULL, NULL, (char *)user_account_default(preq->rq_user)); if (pj->ji_wattr[JOB_ATR_account].at_val.at_str == 0) { /* no default found */ job_purge(pj); req_reject(PBSE_BADACCT, 0, preq, NULL, "no default account available"); return; } } #endif /* PNOT */ } /* END if (created_here) */ else { /* job was created elsewhere and moved here */ /* make sure job_owner is set, error if not */ if (!(pj->ji_wattr[JOB_ATR_job_owner].at_flags & ATR_VFLAG_SET)) { job_purge(pj); log_err(errno, "req_quejob", "job owner not set"); req_reject(PBSE_IVALREQ, 0, preq, NULL, "no job owner specified"); return; } /* increment hop count */ if (++pj->ji_wattr[JOB_ATR_hopcount].at_val.at_long > PBS_MAX_HOPCOUNT) { job_purge(pj); req_reject(PBSE_HOPCOUNT, 0, preq, NULL, "max job hop reached"); return; } } /* set up at_server attribute for status */ job_attr_def[JOB_ATR_at_server].at_decode( &pj->ji_wattr[JOB_ATR_at_server], NULL, NULL, server_name); /* * See if the job is qualified to go into the requested queue. * Note, if an execution queue, then ji_qs.ji_un.ji_exect is set up * * svr_chkque() is called way down here because it needs to have the * job structure and attributes already set up. */ /* set ji_is_array_template before calling */ if (pj->ji_wattr[JOB_ATR_job_array_request].at_flags & ATR_VFLAG_SET) { char *oldid; char *hostname; pj->ji_is_array_template = TRUE; /* rewrite jobid to include empty brackets this causes arrays to show up as id[].host in qstat output, and actions applied to id[] are applied to the entire array */ oldid = strdup(pj->ji_qs.ji_jobid); if (oldid == NULL) { /* TODO, return with error if unable to alocate memory! 
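        A minimal handling sketch (an assumption, mirroring the other failure
        paths in this routine) would be:

            job_purge(pj);
            req_reject(PBSE_SYSTEM, 0, preq, NULL, "cannot allocate memory");
            return;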
*/ } hostname = index(oldid, '.'); if (hostname != NULL) { *(hostname++) = '\0'; snprintf(pj->ji_qs.ji_jobid, sizeof(pj->ji_qs.ji_jobid), "%s[].%s", oldid, hostname); } else { snprintf(pj->ji_qs.ji_jobid, sizeof(pj->ji_qs.ji_jobid), "%s[]", oldid); } free(oldid); } if ((rc = svr_chkque(pj, pque, preq->rq_host, MOVE_TYPE_Move, EMsg))) { job_purge(pj); req_reject(rc, 0, preq, NULL, EMsg); return; } /* FIXME: if EMsg[0] != '\0', send a warning email to the user */ strcpy(pj->ji_qs.ji_queue, pque->qu_qs.qu_name); pj->ji_wattr[JOB_ATR_substate].at_val.at_long = JOB_SUBSTATE_TRANSIN; pj->ji_wattr[JOB_ATR_substate].at_flags |= ATR_VFLAG_SET; /* set remaining job structure elements */ pj->ji_qs.ji_state = JOB_STATE_TRANSIT; pj->ji_qs.ji_substate = JOB_SUBSTATE_TRANSIN; pj->ji_wattr[JOB_ATR_mtime].at_val.at_long = (long)time_now; pj->ji_wattr[JOB_ATR_mtime].at_flags |= ATR_VFLAG_SET; pj->ji_qs.ji_un_type = JOB_UNION_TYPE_NEW; pj->ji_qs.ji_un.ji_newt.ji_fromsock = sock; pj->ji_qs.ji_un.ji_newt.ji_fromaddr = get_connectaddr(sock); pj->ji_qs.ji_un.ji_newt.ji_scriptsz = 0; /* acknowledge the request with the job id */ if (reply_jobid(preq, pj->ji_qs.ji_jobid, BATCH_REPLY_CHOICE_Queue) != 0) { /* reply failed, purge the job and close the connection */ close_conn(sock); job_purge(pj); return; } /* link job into server's new jobs list request */ append_link(&svr_newjobs, &pj->ji_alljobs, pj); return; } /* END req_quejob() */ /* * req_jobcredential - receive a set of credentials to be used by the job * * THIS IS JUST A PLACE HOLDER FOR NOW * It does nothing but acknowledge the request */ void req_jobcredential( struct batch_request *preq) /* ptr to the decoded request */ { job *pj; pj = locate_new_job(preq->rq_conn, NULL); if (pj == NULL) { req_reject(PBSE_IVALREQ, 0, preq, NULL, NULL); return; } if (svr_authorize_jobreq(preq, pj) == -1) { req_reject(PBSE_PERM, 0, preq, NULL, "job request not authorized"); return; } reply_ack(preq); return; } /* END req_jobcredential() */ /* * req_jobscript - receive job script section * * Each section is appended to the file */ void req_jobscript( struct batch_request *preq) /* ptr to the decoded request*/ { char *id = "req_jobscript"; int fds; char namebuf[MAXPATHLEN]; job *pj; int filemode = 0600; errno = 0; pj = locate_new_job(preq->rq_conn, preq->rq_ind.rq_jobfile.rq_jobid); if (pj == NULL) { log_err(errno, id, "cannot locate new job"); req_reject(PBSE_IVALREQ, 0, preq, NULL, NULL); return; } /* what is the difference between JOB_SUBSTATE_TRANSIN and TRANSICM? 
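     (In this file, TRANSIN marks a job that req_quejob() has created but whose
     script/data is still arriving, while TRANSICM is set by req_rdytocommit()
     once the job has been saved and is waiting for the final req_commit().)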
*/ if (pj->ji_qs.ji_substate != JOB_SUBSTATE_TRANSIN) { if (errno == 0) { snprintf(log_buffer, sizeof(log_buffer), "job %s in unexpected state '%s'", pj->ji_qs.ji_jobid, PJobSubState[pj->ji_qs.ji_substate]); } else { snprintf(log_buffer, sizeof(log_buffer), "job %s in unexpected state '%s' (errno=%d - %s)", pj->ji_qs.ji_jobid, PJobSubState[pj->ji_qs.ji_substate], errno, strerror(errno)); } log_err(errno, id, log_buffer); req_reject(PBSE_IVALREQ, 0, preq, NULL, log_buffer); return; } if (svr_authorize_jobreq(preq, pj) == -1) { /* FAILURE */ log_err(errno, id, "cannot authorize request"); req_reject(PBSE_PERM, 0, preq, NULL, "cannot receive job script"); return; } strcpy(namebuf, path_jobs); strcat(namebuf, pj->ji_qs.ji_fileprefix); strcat(namebuf, JOB_SCRIPT_SUFFIX); if (pj->ji_qs.ji_un.ji_newt.ji_scriptsz == 0) { /* NOTE: fail is job script already exists */ fds = open(namebuf, O_WRONLY | O_CREAT | O_EXCL | O_Sync, filemode); } else { fds = open(namebuf, O_WRONLY | O_APPEND | O_Sync, filemode); } if (fds < 0) { char tmpLine[1024]; snprintf(tmpLine, sizeof(tmpLine), "cannot open '%s' errno=%d - %s", namebuf, errno, strerror(errno)); /* FAILURE */ /* NOTE: log_err may modify errno */ log_err(errno, id, msg_script_open); req_reject(PBSE_INTERNAL, 0, preq, NULL, tmpLine); return; } if (write( fds, preq->rq_ind.rq_jobfile.rq_data, (unsigned)preq->rq_ind.rq_jobfile.rq_size) != preq->rq_ind.rq_jobfile.rq_size) { /* FAILURE */ log_err(errno, id, msg_script_write); req_reject(PBSE_INTERNAL, 0, preq, NULL, "cannot write job command file"); close(fds); return; } close(fds); pj->ji_qs.ji_un.ji_newt.ji_scriptsz += preq->rq_ind.rq_jobfile.rq_size; /* job has a script file */ pj->ji_qs.ji_svrflags = (pj->ji_qs.ji_svrflags & ~JOB_SVFLG_CHECKPOINT_FILE) | JOB_SVFLG_SCRIPT; /* SUCCESS */ reply_ack(preq); return; } /* END req_jobscript() */ /* the following is for the server only, MOM has her own version below */ /* * req_mvjobfile - receive a job file * This request is used to move a file associated with a job, typically * the standard output or error, between a server and a server or from * a mom back to a server. For a server, the destination is always * within the spool directory. 
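 *
 * The destination file name is path_spool plus the job's ji_fileprefix plus a
 * suffix chosen by rq_type (JOB_STDOUT_SUFFIX, JOB_STDERR_SUFFIX, or
 * JOB_CHECKPOINT_SUFFIX); a request with rq_sequence 0 (re)creates the file and
 * later pieces are appended.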
*/ void req_mvjobfile( /* NOTE: routine for server only - mom code follows this routine */ struct batch_request *preq) /* ptr to the decoded request */ { int fds; char namebuf[MAXPATHLEN]; job *pj; pj = locate_new_job(preq->rq_conn, NULL); if (pj == NULL) pj = find_job(preq->rq_ind.rq_jobfile.rq_jobid); if ((preq->rq_fromsvr == 0) || (pj == NULL)) { snprintf(log_buffer, 1024, "cannot find job %s", preq->rq_ind.rq_jobfile.rq_jobid); log_err(errno, "req_mvjobfile", log_buffer); req_reject(PBSE_IVALREQ, 0, preq, NULL, NULL); return; } strcpy(namebuf, path_spool); strcat(namebuf, pj->ji_qs.ji_fileprefix); switch ((enum job_file)preq->rq_ind.rq_jobfile.rq_type) { case StdOut: strcat(namebuf, JOB_STDOUT_SUFFIX); break; case StdErr: strcat(namebuf, JOB_STDERR_SUFFIX); break; case Checkpoint: strcat(namebuf, JOB_CHECKPOINT_SUFFIX); break; default: log_err(errno, "req_mvjobfile", "unexpected move type"); req_reject(PBSE_IVALREQ, 0, preq, NULL, NULL); return; /*NOTREACHED*/ break; } if (preq->rq_ind.rq_jobfile.rq_sequence == 0) { unlink(namebuf); fds = open(namebuf, O_WRONLY | O_CREAT | O_EXCL | O_Sync, 0600); } else fds = open(namebuf, O_WRONLY | O_APPEND | O_Sync, 0600); if (fds < 0) { log_err(errno, "req_mvjobfile", msg_script_open); req_reject(PBSE_INTERNAL, 0, preq, NULL, NULL); return; } if (write( fds, preq->rq_ind.rq_jobfile.rq_data, (unsigned)preq->rq_ind.rq_jobfile.rq_size) != preq->rq_ind.rq_jobfile.rq_size) { log_err(errno, "req_jobfile", msg_script_write); req_reject(PBSE_SYSTEM, 0, preq, NULL, NULL); close(fds); return; } close(fds); if (LOGLEVEL >= 6) { snprintf(log_buffer, sizeof(log_buffer), "successfully moved file '%s' for job '%s'", namebuf, preq->rq_ind.rq_jobfile.rq_jobid); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? pj->ji_qs.ji_jobid : "NULL", log_buffer); } reply_ack(preq); return; } /* END req_mvjobfile() */ /* * req_rdytocommit - Ready To Commit Batch Request * * Set substate to JOB_SUBSTATE_TRANSICM and * record job to permanent storage, i.e. written to the job save file * (called by both pbs_server and pbs_mom) */ void req_rdytocommit( struct batch_request *preq) /* I */ { job *pj; int sock = preq->rq_conn; int OrigState; int OrigSState; char OrigSChar; long OrigFlags; char *id = "req_rdytocommit"; char namebuf[MAXPATHLEN+1]; pj = locate_new_job(sock, preq->rq_ind.rq_rdytocommit); if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? pj->ji_qs.ji_jobid : "NULL", "ready to commit job"); } if (pj == NULL) { log_err(errno, id, "unknown job id"); req_reject(PBSE_UNKJOBID, 0, preq, NULL, NULL); /* FAILURE */ return; } if (pj->ji_qs.ji_substate != JOB_SUBSTATE_TRANSIN) { log_err(errno, id, "cannot commit job in unexpected state"); req_reject(PBSE_IVALREQ, 0, preq, NULL, NULL); /* FAILURE */ return; } if (svr_authorize_jobreq(preq, pj) == -1) { req_reject(PBSE_PERM, 0, preq, NULL, "cannot authorize jobreq"); /* FAILURE */ return; } OrigState = pj->ji_qs.ji_state; OrigSState = pj->ji_qs.ji_substate; OrigSChar = pj->ji_wattr[(int)JOB_ATR_state].at_val.at_char; OrigFlags = pj->ji_wattr[(int)JOB_ATR_state].at_flags; pj->ji_qs.ji_state = JOB_STATE_TRANSIT; pj->ji_qs.ji_substate = JOB_SUBSTATE_TRANSICM; pj->ji_wattr[(int)JOB_ATR_state].at_val.at_char = 'T'; pj->ji_wattr[(int)JOB_ATR_state].at_flags |= ATR_VFLAG_SET; /* if this is a job array template then we'll delete the .JB file that was created for this job since we are going to save it with a different suffix here. 
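     (The plain .JB file exists because req_quejob() open()s
     path_jobs + fileprefix + JOB_FILE_SUFFIX with O_CREAT | O_EXCL in order to
     reserve the job's file-name prefix.)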
XXX: not sure why the .JB file already exists before we do the SAVEJOB_NEW save below */ if (pj->ji_wattr[(int)JOB_ATR_job_array_request].at_flags & ATR_VFLAG_SET) { pj->ji_is_array_template = TRUE; strcpy(namebuf, path_jobs); strcat(namebuf, pj->ji_qs.ji_fileprefix); strcat(namebuf, JOB_FILE_SUFFIX); unlink(namebuf); } if (job_save(pj, SAVEJOB_NEW) == -1) { char tmpLine[1024]; snprintf(tmpLine, sizeof(tmpLine), "cannot save job - errno=%d - %s", errno, strerror(errno)); log_err(errno, id, tmpLine); /* commit failed, backoff state changes */ pj->ji_qs.ji_state = OrigState; pj->ji_qs.ji_substate = OrigSState; pj->ji_wattr[(int)JOB_ATR_state].at_val.at_char = OrigSChar; pj->ji_wattr[(int)JOB_ATR_state].at_flags = OrigFlags; req_reject(PBSE_SYSTEM, 0, preq, NULL, tmpLine); /* FAILURE */ return; } /* acknowledge the request with the job id */ if (reply_jobid(preq, pj->ji_qs.ji_jobid, BATCH_REPLY_CHOICE_RdytoCom) != 0) { /* reply failed, purge the job and close the connection */ snprintf(log_buffer, sizeof(log_buffer), "cannot report jobid - errno=%d - %s", errno, strerror(errno)); log_err(errno, id, log_buffer); close_conn(sock); job_purge(pj); /* FAILURE */ return; } if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? pj->ji_qs.ji_jobid : "NULL", "ready to commit job completed"); } return; } /* END req_rdytocommit() */ /* * req_commit - commit ownership of job * * Set state of job to JOB_STATE_QUEUED (or Held or Waiting) and * enqueue the job into its destination queue. */ void req_commit( struct batch_request *preq) /* I */ { job *pj; int newstate; int newsub; pbs_queue *pque; int rc; #ifdef AUTORUN_JOBS struct batch_request *preq_run = '\0'; attribute *pattr; int nodes_avail = -1; int dummy; char *spec = NULL; char *rq_destin = NULL; #endif /* AUTORUN_JOBS */ #ifdef QUICKCOMMIT int OrigState; int OrigSState; char OrigSChar; long OrigFlags; char namebuf[MAXPATHLEN+1]; #endif /* QUICKCOMMIT */ pj = locate_new_job(preq->rq_conn, preq->rq_ind.rq_commit); if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? pj->ji_qs.ji_jobid : "NULL", "committing job"); } if (pj == NULL) { req_reject(PBSE_UNKJOBID, 0, preq, NULL, NULL); return; } #ifdef QUICKCOMMIT if (pj->ji_qs.ji_substate != JOB_SUBSTATE_TRANSIN) { log_err(errno, "req_commit", "cannot commit job in unexpected state"); req_reject(PBSE_IVALREQ, 0, preq, NULL, NULL); /* FAILURE */ return; } OrigState = pj->ji_qs.ji_state; OrigSState = pj->ji_qs.ji_substate; OrigSChar = pj->ji_wattr[(int)JOB_ATR_state].at_val.at_char; OrigFlags = pj->ji_wattr[(int)JOB_ATR_state].at_flags; pj->ji_qs.ji_state = JOB_STATE_TRANSIT; pj->ji_qs.ji_substate = JOB_SUBSTATE_TRANSICM; pj->ji_wattr[(int)JOB_ATR_state].at_val.at_char = 'T'; pj->ji_wattr[(int)JOB_ATR_state].at_flags |= ATR_VFLAG_SET; if (pj->ji_wattr[(int)JOB_ATR_job_array_request].at_flags & ATR_VFLAG_SET) { pj->ji_is_array_template = TRUE; strcpy(namebuf, path_jobs); strcat(namebuf, pj->ji_qs.ji_fileprefix); strcat(namebuf, JOB_FILE_SUFFIX); unlink(namebuf); } #endif /* QUICKCOMMIT */ if (pj->ji_qs.ji_substate != JOB_SUBSTATE_TRANSICM) { log_err(errno, "req_commit", "cannot commit job in unexpected state"); req_reject(PBSE_IVALREQ, 0, preq, NULL, NULL); return; } if (svr_authorize_jobreq(preq, pj) == -1) { req_reject(PBSE_PERM, 0, preq, NULL, NULL); if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? 
pj->ji_qs.ji_jobid : "NULL", "no permission to start job"); } return; } /* remove job from the server new job list, set state, and enqueue it */ delete_link(&pj->ji_alljobs); /* job array, setup the array task *** job array under development */ if (pj->ji_is_array_template) { if ((rc = setup_array_struct(pj))) { if (rc == ARRAY_TOO_LARGE) { snprintf(log_buffer,sizeof(log_buffer), "Requested array size too large, limit is %ld", server.sv_attr[SRV_ATR_MaxArraySize].at_val.at_long); req_reject(PBSE_BAD_ARRAY_REQ, 0, preq, NULL, log_buffer); } else if (rc == INVALID_SLOT_LIMIT) { snprintf(log_buffer,sizeof(log_buffer), "Requested slot limit too large, limit is %ld", server.sv_attr[SRV_ATR_MaxSlotLimit].at_val.at_long); req_reject(PBSE_BAD_ARRAY_REQ, 0, preq, NULL, log_buffer); } else { req_reject(PBSE_BAD_ARRAY_REQ, 0, preq, NULL, NULL); } return; } } /* end if (pj->ji_wattr[(int)JOB_ATR_job_array_request].at_flags & ATR_VFLAG_SET) */ svr_evaljobstate(pj, &newstate, &newsub, 1); svr_setjobstate(pj, newstate, newsub); /* set the queue rank attribute */ pj->ji_wattr[(int)JOB_ATR_qrank].at_val.at_long = ++queue_rank; pj->ji_wattr[(int)JOB_ATR_qrank].at_flags |= ATR_VFLAG_SET; if ((rc = svr_enquejob(pj))) { job_purge(pj); req_reject(rc, 0, preq, NULL, NULL); if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? pj->ji_qs.ji_jobid : "NULL", "cannot queue job"); } return; } if (job_save(pj, SAVEJOB_FULL) != 0) { job_purge(pj); req_reject(PBSE_SYSTEM, 0, preq, NULL, NULL); if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? pj->ji_qs.ji_jobid : "NULL", "cannot save job"); } return; } /* * if the job went into a Route (push) queue that has been started, * try once to route it to give immediate feedback as a courtsey * to the user. */ pque = pj->ji_qhdr; if ((preq->rq_fromsvr == 0) && (pque->qu_qs.qu_type == QTYPE_RoutePush) && (pque->qu_attr[(int)QA_ATR_Started].at_val.at_long != 0)) { if ((rc = job_route(pj))) { job_purge(pj); req_reject(rc, 0, preq, NULL, NULL); if (LOGLEVEL >= 6) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? 
pj->ji_qs.ji_jobid : "NULL", "job route job"); } /* FAILURE */ return; } } #ifdef AUTORUN_JOBS /* If we are auto running jobs with start_count = 0 then the * branch_request needs re creation since reply_jobid will free * the passed in one */ pattr = &pj->ji_wattr[(int)JOB_ATR_start_count]; spec = PBS_DEFAULT_NODE; node_avail_complex(spec, &nodes_avail, &dummy, &dummy, &dummy); if ((pattr->at_val.at_long == 0) && (nodes_avail > 0)) { /* Create a new batch request and fill it in */ preq_run = alloc_br(PBS_BATCH_AsyrunJob); preq_run->rq_perm = preq->rq_perm | ATR_DFLAG_OPWR; preq_run->rq_ind.rq_run.rq_resch = 0; preq_run->rq_ind.rq_run.rq_destin = rq_destin; preq_run->rq_fromsvr = preq->rq_fromsvr; preq_run->rq_extsz = preq->rq_extsz; preq_run->rq_noreply = TRUE; /* set for no replies */ memcpy(preq_run->rq_user, preq->rq_user, PBS_MAXUSER + 1); memcpy(preq_run->rq_host, preq->rq_host, PBS_MAXHOSTNAME + 1); memcpy(preq_run->rq_ind.rq_run.rq_jid, preq->rq_ind.rq_rdytocommit, PBS_MAXSVRJOBID + 1); } #endif /* need to format message first, before request goes away */ snprintf(log_buffer, sizeof(log_buffer), msg_jobnew, preq->rq_user, preq->rq_host, pj->ji_wattr[(int)JOB_ATR_job_owner].at_val.at_str, pj->ji_wattr[(int)JOB_ATR_jobname].at_val.at_str, pj->ji_qhdr->qu_qs.qu_name); /* acknowledge the request with the job id */ reply_jobid(preq, pj->ji_qs.ji_jobid, BATCH_REPLY_CHOICE_Commit); /* if job array, setup the cloning work task */ if (pj->ji_is_array_template) { set_task(WORK_Timed, time_now + 1, job_clone_wt, (void*)pj); } LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pj->ji_qs.ji_jobid, log_buffer); if ((pj->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) { /* notify creator where job is */ issue_track(pj); } #ifdef AUTORUN_JOBS /* If we are auto running jobs with start_count = 0 then the * branch_request was re created. Now we run the job if any nodes * are available */ if ((pattr->at_val.at_long == 0) && (nodes_avail > 0)) { if (LOGLEVEL >= 7) { snprintf(log_buffer, sizeof(log_buffer), "Trying to AUTORUN job %s", pj->ji_qs.ji_jobid); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, (pj != NULL) ? pj->ji_qs.ji_jobid : "NULL", log_buffer); } req_runjob(preq_run); } #endif /* AUTORUN_JOBS */ return; } /* END req_commit() */ /* * locate_new_job - locate a "new" job which has been set up req_quejob on * the servers new job list. * * This function is used by the sub-requests which make up the global * "Queue Job Request" to locate the job structure. * * If the jobid is specified (will be for rdytocommit and commit, but not * for script), we search for a matching jobid. * * The job must (also) match the socket specified and the host associated * with the socket unless ji_fromsock == -1, then its a recovery situation. */ /* FIXME: why bother checking for matching sock if a jobid is supplied? Seems * to me that a reconnect immediately invalidates fromsock. */ static job *locate_new_job( int sock, /* I */ char *jobid) /* I (optional) */ { job *pj; pj = (job *)GET_NEXT(svr_newjobs); while (pj != NULL) { if ((pj->ji_qs.ji_un.ji_newt.ji_fromsock == -1) || ((pj->ji_qs.ji_un.ji_newt.ji_fromsock == sock) && (pj->ji_qs.ji_un.ji_newt.ji_fromaddr == get_connectaddr(sock)))) { if ((jobid != NULL) && (*jobid != '\0')) { if (!strncmp(pj->ji_qs.ji_jobid, jobid, PBS_MAXSVRJOBID)) { /* requested job located */ break; } } else if (pj->ji_qs.ji_un.ji_newt.ji_fromsock == -1) { /* empty job slot located */ break; } else { /* matching job slot located */ break; } } /* END if ((pj->ji_qs.ji_un.ji_newt.ji_fromsock == -1) || ...) 
*/ pj = (job *)GET_NEXT(pj->ji_alljobs); } /* END while(pj != NULL) */ /* return job slot located (NULL on FAILURE) */ return(pj); } /* END locate_new_job() */ /* * City Tools patch */ #define UserAcctMax 12 struct { int ActCnt; /* How many projects */ int ActMax; /* Max allowed in this struct */ char ActRaw[80]; /* The raw project data */ char ActDat[80]; /* ActRaw with \0 as necessary */ char *ActAdr[UserAcctMax]; /* Pointers to ActDat items */ } UserAcct = { 0, UserAcctMax, "", "", {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} }; /* UserAcctMax values */ #ifdef PNOT int user_account_verify( char *arguser, char *argaccount) { int i; int rc = 0; /* 0 = failure, 1 = success */ if (!user_account_read_user(arguser)) { sprintf(log_buffer, "user_account_verify(%s, %s) -> USER NOT FOUND", arguser, argaccount); goto user_account_verify_done; } for (i = 0;i < UserAcct.ActCnt;++i) { if (strcmp(argaccount, UserAcct.ActAdr[i]) == 0) { sprintf(log_buffer, "user_account_verify(%s, %s) -> SUCCESS", arguser, argaccount); rc = 1; goto user_account_verify_done; } } /* END for (i) */ sprintf(log_buffer, "user_account_verify(%s, %s) -> FAILED", arguser, argaccount); user_account_verify_done: log_event( PBSEVENT_JOB | PBSEVENT_SECURITY, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); return(rc); } char *user_account_default( char *arguser) { char *rc = 0; /* 0 = failure,
else pointer to the default account string
= success */ if (!user_account_read_user(arguser)) { sprintf(log_buffer, "user_account_default(%s) = USER NOT FOUND", arguser); goto user_account_default_done; } if (UserAcct.ActCnt < 1) { sprintf(log_buffer, "user_account_default(%s) = NO PROJECT FOUND", arguser); goto user_account_default_done; } rc = UserAcct.ActAdr[0]; sprintf(log_buffer, "user_account_default(%s) = %s", arguser, rc); user_account_default_done: log_event( PBSEVENT_JOB | PBSEVENT_SECURITY, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); return(rc); } /* END user_account_default() */ /* * Given a username, returns that user's accounts from the user->account file * Returns 0 if username isn't found */ #define AcctScanUser 0 #define AcctScanAcct 1 #define AcctScanLine -1 #define AcctFile "/usr/local/etc/usertg/project-by-user-torque.map" int user_account_read_user( char *arguser) { char proj_file[] = AcctFile; int fd; char s_buf[128*1024]; int readsize; int scanmode = AcctScanUser; int i, j; char *ci; char *cj; int arguserlen; arguserlen = strlen(arguser); if ((fd = open(proj_file, O_RDONLY, 0)) < 0) { return(0); } if (lseek(fd, (off_t)0, SEEK_SET) != 0) { close(fd); return(0); } readsize = read(fd, s_buf, sizeof(s_buf)); close(fd); if ((readsize < 1) || (readsize > sizeof(s_buf))) { /* bail if not sane */ return(0); } for (i = 0;i < readsize;++i) { /* First, handle comments and whitespace */ if (scanmode == AcctScanLine) { /* Looking for new line */ if (s_buf[i] == '\n') /* Found it */ scanmode = AcctScanUser; continue; } if (isspace(s_buf[i])) /* Skip spaces */ continue; if (s_buf[i] == '#') { /* Comment found */ scanmode = AcctScanLine; continue; } /* Next, handle user and account scanning */ if (scanmode == AcctScanUser) { if ((i + arguserlen) > readsize) /* Past the end */ { return(0); } if (strncmp(&s_buf[i], arguser, arguserlen)) { scanmode = AcctScanLine; /* Not arguser, next line */ continue; } if (isspace(s_buf[i + arguserlen]) || s_buf[i + arguserlen] == ':') { i += arguserlen; /* Is arguser */ scanmode = AcctScanAcct; } else { /* Whatever, ignore it */ scanmode = AcctScanLine; continue; } } else { /* scanmode == AcctScanAcct */ if ((s_buf[i] == ':') || isspace(s_buf[i])) continue; for (j = i;j < readsize;j++) { if (isspace(s_buf[j])) { strncpy(UserAcct.ActRaw, &s_buf[i], j - i); UserAcct.ActRaw[j-i] = '\0'; goto have_account; } } return(0); } } /* END for (i) */ return(0); /* Never found it */ have_account: if (strlen(UserAcct.ActRaw) < 1) /* Nothing found */ { return(0); } strcpy(UserAcct.ActDat, UserAcct.ActRaw); UserAcct.ActCnt = 0; UserAcct.ActMax = UserAcctMax; for (ci = &UserAcct.ActDat[0];*ci != '\0';ci++) { if (isspace(*ci) || (*ci == ',')) continue; for (cj = ci + 1;!isspace(*cj) && (*cj != ',') && (*cj != '\0');cj++) { /* NO-OP */ } *cj = '\0'; UserAcct.ActAdr[UserAcct.ActCnt++] = ci; ci = cj; } return(1); } /* END user_account_read_user() */ #endif /* PNOT */ /* END req_quejob.c() */
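
/*
 * Illustrative sketch (an assumption, derived from the parser in
 * user_account_read_user() above) of the account map file named by AcctFile:
 *
 *     # lines beginning with '#' are comments
 *     alice: projA,projB
 *     bob projC
 *
 * A line starts with the user name, optionally followed by ':'; the next
 * whitespace-delimited token is taken as that user's account list, split on
 * commas, and user_account_default() returns its first entry as the default.
 */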