#include "license_pbs.h" /* See here for the software license */
#include <pbs_config.h>   /* the master config generated by configure */

/*
 * This file contains functions for manipulating job arrays.
 *
 * Included functions:
 *
 *   is_array()            determine if jobnum is actually an array identifier
 *   get_array()           return array struct for given "parent id"
 *   array_save()          save array struct to disk
 *   array_get_parent_id() return id of parent job if job belongs to a job array
 *   array_recov()         recover the array struct for a job array at restart
 *   array_delete()        free memory used by struct and delete saved struct on disk
 */

#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h> /* INT_MAX */

/* this macro is for systems like BSD4 that do not have O_SYNC in fcntl.h,
 * but do have O_FSYNC! */

#ifndef O_SYNC
#define O_SYNC O_FSYNC
#endif /* !O_SYNC */

#include <unistd.h>
#include <string.h>

#include "pbs_ifl.h"
#include "log.h"
#include "../lib/Liblog/pbs_log.h"
#include "../lib/Liblog/log_event.h"
#include "../lib/Libifl/lib_ifl.h"
#include "list_link.h"
#include "attribute.h"
#include "server_limits.h"
#include "server.h"
#include "pbs_job.h"
#include "queue.h"
#include "pbs_error.h"
#include "svrfunc.h"
#include "work_task.h"
#include "utils.h"
#include "array.h"
#include "svr_func.h"
#include "job_func.h" /* svr_job_purge */
#include "ji_mutex.h"
#include "mutex_mgr.hpp"
#include "batch_request.h"

extern int array_upgrade(job_array *, int, int, int *);
extern char *get_correct_jobname(const char *jobid);
extern int count_user_queued_jobs(pbs_queue *,char *);
extern void post_modify_arrayreq(batch_request *preq);

/* global data items used */

/* list of job arrays */
extern struct server     server;
struct all_jobs          array_summary;
static struct all_arrays allarrays;

extern char *path_arrays;
extern char *path_jobs;
extern int   LOGLEVEL;
extern char *pbs_o_host;
extern int   array_259_upgrade;

int is_num(const char *);
int array_request_token_count(const char *);
int array_request_parse_token(char *, int *, int *);
int parse_array_request(char *request, tlist_head *tl);
job_array *next_array_check(int *, job_array *);
job_array *get_array_from_hash(hash_map *hm, const char *id);



/*
 * free_array_request_list()
 *
 * Unlinks and frees every array_request_node on the list headed by tl,
 * leaving tl as an empty list. Used for array struct token lists and for the
 * temporary range lists built by the *_array_range() functions.
 *
 * @param tl - head of the list to empty (must have been CLEAR_HEAD'ed)
 */

static void free_array_request_list(

  tlist_head *tl)

  {
  array_request_node *rn;

  for (rn = (array_request_node *)GET_NEXT(*tl);
       rn != NULL;
       rn = (array_request_node *)GET_NEXT(*tl))
    {
    delete_link(&rn->request_tokens_link);
    free(rn);
    }
  } /* END free_array_request_list() */




/*
 * is_array()
 *
 * Search the job array list to determine if id is a job array.
 * An id of the form "arrayid[][N].host" (array dependency count) is reduced
 * to "arrayid[].host" before the lookup.
 *
 * @param id - the job id to check
 * @return TRUE if id names a known job array, FALSE otherwise
 */

int is_array(

  char *id)

  {
  int   rc = FALSE;
  char *bracket_ptr;
  char *end_bracket_ptr;
  char *tmpjobid;
  char  jobid[PBS_MAXSVRJOBID];
  char  temp_jobid[PBS_MAXSVRJOBID];

  tmpjobid = get_correct_jobname(id);

  if (tmpjobid == NULL)
    {
    /* Maybe we should just return ENOMEM? */
    snprintf(jobid, sizeof(jobid), "%s", id);
    }
  else
    {
    snprintf(jobid, sizeof(jobid), "%s", tmpjobid);
    free(tmpjobid);
    }

  /* Check to see if we have an array dependency. If there is an array
   * dependency count then we will have an id of something like
   * arrayid[][1]. We need to take off the [1] so we can compare the array id
   * with an existing array entry. */
  if ((bracket_ptr = strchr(jobid, '[')) == NULL)
    {
    /* No '[' then we do not have an array */
    return(FALSE);
    }

  /* Make sure the next character is ']' */
  if (*(++bracket_ptr) != ']')
    {
    /* If we do not have a ']' then we have bad syntax. */
    return(FALSE);
    }

  if (*(++bracket_ptr) == '[')
    {
    /* we made it to here. That means we have a count inside brackets.
     * Just truncate them for the name comparison */
    end_bracket_ptr = strchr(bracket_ptr, ']');

    if (end_bracket_ptr == NULL)
      {
      /* we do not have a ']' then we have bad syntax. */
      return(FALSE);
      }

    /* advance end_bracket_ptr one. We should be at either '\0' or '.' */
    end_bracket_ptr++;

    /* truncate the string: this makes jobid just the arrayid name */
    *bracket_ptr = '\0';

    /* append the rest of the job id */
    snprintf(temp_jobid, sizeof(temp_jobid), "%s%s", jobid, end_bracket_ptr);
    snprintf(jobid, sizeof(jobid), "%s", temp_jobid);
    }

  pthread_mutex_lock(allarrays.allarrays_mutex);

  if (get_array_from_hash(allarrays.hm, jobid) != NULL)
    rc = TRUE;

  pthread_mutex_unlock(allarrays.allarrays_mutex);

  return(rc);
  } /* END is_array() */





/*
 * get_array()
 *
 * Return a server's array info struct corresponding to an array id.
 *
 * @param id - the array id
 * @return the array, with its ai_mutex locked, or NULL if not found
 */

job_array *get_array(

  char *id)

  {
  job_array *pa;
  char      *tmpjobid;

  tmpjobid = get_correct_jobname(id);

  pthread_mutex_lock(allarrays.allarrays_mutex);

  /* fall back to the id as given if it could not be corrected, rather than
   * handing a NULL id to the hash lookup */
  pa = get_array_from_hash(allarrays.hm, (tmpjobid != NULL) ? tmpjobid : id);

  if (pa != NULL)
    lock_ai_mutex(pa, __func__, NULL, LOGLEVEL);

  pthread_mutex_unlock(allarrays.allarrays_mutex);

  free(tmpjobid);

  return(pa);
  } /* END get_array() */





/*
 * array_save()
 *
 * Save a job array struct to disk: the array_info struct, followed by the
 * count of remaining request tokens, followed by each token node.
 *
 * @param pa - the array to save
 * @return PBSE_NONE (0) if no errors, -1 otherwise (the partial file is
 *         unlinked on write errors)
 */

int array_save(

  job_array *pa)

  {
  int                 fds;
  char                namebuf[MAXPATHLEN];
  array_request_node *rn;
  int                 num_tokens = 0;

  snprintf(namebuf, sizeof(namebuf), "%s%s%s",
    path_arrays, pa->ai_qs.fileprefix, ARRAY_FILE_SUFFIX);

  fds = open(namebuf, O_SYNC | O_TRUNC | O_WRONLY | O_CREAT, 0600);

  if (fds < 0)
    {
    return(-1);
    }

  if (write_ac_socket(fds, &(pa->ai_qs), sizeof(struct array_info)) == -1)
    {
    unlink(namebuf);
    close(fds);
    return(-1);
    }

  /* count number of request tokens left */
  for (rn = (array_request_node *)GET_NEXT(pa->request_tokens);
       rn != NULL;
       rn = (array_request_node *)GET_NEXT(rn->request_tokens_link))
    {
    num_tokens++;
    }

  if (write_ac_socket(fds, &num_tokens, sizeof(num_tokens)) == -1)
    {
    unlink(namebuf);
    close(fds);
    return(-1);
    }

  for (rn = (array_request_node *)GET_NEXT(pa->request_tokens);
       rn != NULL;
       rn = (array_request_node *)GET_NEXT(rn->request_tokens_link))
    {
    if (write_ac_socket(fds, rn, sizeof(array_request_node)) == -1)
      {
      unlink(namebuf);
      close(fds);
      return(-1);
      }
    }

  close(fds);

  return(PBSE_NONE);
  } /* END array_save() */





/*
 * array_get_parent_id()
 *
 * If a job belongs to an array, this writes the id of the parent job into
 * parent_id: everything up to and including the '[', then everything from
 * the matching ']' on (e.g. "12[3].host" -> "12[].host").
 * Returns the job id unchanged if it is not an array sub-job.
 *
 * @param job_id    - I: the job id
 * @param parent_id - O: caller-supplied buffer, assumed large enough to
 *                    hold job_id (not checked here)
 */

void array_get_parent_id(

  char *job_id,
  char *parent_id)

  {
  char *c;
  char *pid;
  int   bracket = 0;

  c = job_id;
  *parent_id = '\0';
  pid = parent_id;

  /* copy up to the '[' */
  while (!bracket && *c != '\0')
    {
    if (*c == '[')
      {
      bracket = 1;
      }

    *pid = *c;
    c++;
    pid++;
    }

  /* skip until the closing bracket */
  while (*c != ']' && *c != '\0')
    {
    c++;
    }

  /* copy the rest of the id */
  *pid = '\0';
  strcat(pid, c);
  } /* END array_get_parent_id() */





/*
 * read_and_convert_259_array()
 *
 * Reads a version 3 array struct (the format put on disk by 2.5.9 and 3.0.3)
 * from fd and converts it into pa->ai_qs at ARRAY_QS_STRUCT_VERSION.
 *
 * fd is owned by the caller and is NOT closed here (closing it here as well
 * as in the caller used to close it twice). The converted struct is also NOT
 * saved here: array_save() truncates the array file that the caller is still
 * reading the request tokens from, so the caller re-saves once it is done.
 *
 * @param fd   - open descriptor positioned at the start of the array file
 * @param pa   - O: array whose ai_qs is filled in
 * @param path - file name, used for logging only
 * @return PBSE_NONE on success, PBSE_SYSTEM or PBSE_BAD_ARRAY_DATA on error
 */

int read_and_convert_259_array(

  int        fd,
  job_array *pa,
  char      *path)

  {
  char           log_buf[LOCAL_LOG_BUF_SIZE];
  int            len;
  job_array_259 *pa_259; /* This is for a backward compatibility problem put into 2.5.9 and 3.0.3 */

  /* allocate the storage for the struct */
  pa_259 = (job_array_259 *)calloc(1, sizeof(job_array_259));

  if (pa_259 == NULL)
    {
    return(PBSE_SYSTEM);
    }

  len = read_ac_socket(fd, &(pa_259->ai_qs), sizeof(pa_259->ai_qs));

  /* a short read means a truncated file; don't convert partial data */
  if (len < (int)sizeof(pa_259->ai_qs))
    {
    snprintf(log_buf, sizeof(log_buf), "error reading %s", path);
    log_err(errno, "read_and_convert_259_array", log_buf);
    free(pa_259);
    return(PBSE_BAD_ARRAY_DATA);
    }

  if (pa_259->ai_qs.struct_version == ARRAY_QS_STRUCT_VERSION)
    {
    snprintf(log_buf, sizeof(log_buf),
      "Already at array structure version 4. Restart pbs_server without -u option");
    log_err(errno, "read_and_convert_259_array", log_buf);
    free(pa_259);
    return(PBSE_BAD_ARRAY_DATA);
    }

  if (pa_259->ai_qs.struct_version != 3)
    {
    snprintf(log_buf, sizeof(log_buf), "Cannot upgrade array version %d to %d",
      pa_259->ai_qs.struct_version, ARRAY_QS_STRUCT_VERSION);
    log_err(errno, "read_and_convert_259_array", log_buf);
    free(pa_259);
    return(PBSE_BAD_ARRAY_DATA);
    }

  pa->ai_qs.struct_version = ARRAY_QS_STRUCT_VERSION;
  pa->ai_qs.array_size     = pa_259->ai_qs.array_size;
  pa->ai_qs.num_jobs       = pa_259->ai_qs.num_jobs;
  pa->ai_qs.slot_limit     = pa_259->ai_qs.slot_limit;
  pa->ai_qs.jobs_running   = pa_259->ai_qs.jobs_running;
  pa->ai_qs.jobs_done      = pa_259->ai_qs.jobs_done;
  pa->ai_qs.num_cloned     = pa_259->ai_qs.num_cloned;
  pa->ai_qs.num_started    = pa_259->ai_qs.num_started;
  pa->ai_qs.num_failed     = pa_259->ai_qs.num_failed;
  pa->ai_qs.num_successful = pa_259->ai_qs.num_successful;
  pa->ai_qs.num_purged     = pa_259->ai_qs.num_purged;

  pa->ai_qs.deps           = pa_259->ai_qs.deps;

  snprintf(pa->ai_qs.owner, sizeof(pa->ai_qs.owner), "%s", pa_259->ai_qs.owner);
  snprintf(pa->ai_qs.parent_id, sizeof(pa->ai_qs.parent_id), "%s", pa_259->ai_qs.parent_id);
  snprintf(pa->ai_qs.fileprefix, sizeof(pa->ai_qs.fileprefix), "%s", pa_259->ai_qs.fileprefix);
  snprintf(pa->ai_qs.submit_host, sizeof(pa->ai_qs.submit_host), "%s", pa_259->ai_qs.submit_host);

  free(pa_259);

  return(PBSE_NONE);
  } /* END read_and_convert_259_array() */





/*
 * array_recov()
 *
 * Reads in an array struct saved to disk and inserts it into the server's
 * list of arrays. Upgrades older on-disk formats and re-saves them in the
 * current format once the whole file has been read.
 *
 * @param path   - path of the saved array file
 * @param new_pa - O: the recovered array (with ai_mutex locked) on success,
 *                 NULL otherwise
 * @return PBSE_NONE on success, an error code otherwise
 */

int array_recov(

  char       *path,
  job_array **new_pa)

  {
  job_array          *pa;
  array_request_node *rn;
  char                log_buf[LOCAL_LOG_BUF_SIZE];
  int                 fd;
  int                 old_version;
  int                 num_tokens;
  int                 i;
  int                 len;
  int                 rc;

  *new_pa = NULL;

  old_version = ARRAY_QS_STRUCT_VERSION;

  /* allocate the storage for the struct */
  pa = (job_array *)calloc(1, sizeof(job_array));

  if (pa == NULL)
    {
    return(PBSE_SYSTEM);
    }

  /* initialize the linked list nodes */
  CLEAR_HEAD(pa->request_tokens);

  fd = open(path, O_RDONLY, 0);

  if (fd < 0)
    {
    free(pa);
    return(PBSE_SYSTEM);
    }

  if (array_259_upgrade)
    {
    rc = read_and_convert_259_array(fd, pa, path);

    if (rc != PBSE_NONE)
      {
      free(pa);
      close(fd);
      return(rc);
      }
    }
  else
    {
    /* read the file into the struct previously allocated. */
    len = read_ac_socket(fd, &(pa->ai_qs), sizeof(pa->ai_qs));

    if ((len < 0) ||
        ((len < (int)sizeof(pa->ai_qs)) &&
         (pa->ai_qs.struct_version == ARRAY_QS_STRUCT_VERSION)))
      {
      snprintf(log_buf, sizeof(log_buf), "error reading %s", path);
      log_err(errno, __func__, log_buf);
      free(pa);
      close(fd);
      return(PBSE_SYSTEM);
      }

    if (pa->ai_qs.struct_version != ARRAY_QS_STRUCT_VERSION)
      {
      rc = array_upgrade(pa, fd, pa->ai_qs.struct_version, &old_version);

      if (rc)
        {
        snprintf(log_buf, sizeof(log_buf), "Cannot upgrade array version %d to %d",
          pa->ai_qs.struct_version, ARRAY_QS_STRUCT_VERSION);
        log_err(errno, __func__, log_buf);
        free(pa);
        close(fd);
        return(rc);
        }
      }
    }

  pa->job_ids = (char **)calloc(pa->ai_qs.array_size, sizeof(char *));

  if (pa->job_ids == NULL)
    {
    free(pa);
    close(fd);
    return(PBSE_SYSTEM);
    }

  /* check to see if there is any additional info saved in the array file:
   * array request tokens that haven't been fully processed */
  if (old_version > 1)
    {
    if (read_ac_socket(fd, &num_tokens, sizeof(int)) != sizeof(int))
      {
      snprintf(log_buf, sizeof(log_buf), "error reading token count from %s", path);
      log_err(errno, __func__, log_buf);

      free(pa->job_ids);
      free(pa);
      close(fd);
      return(PBSE_SYSTEM);
      }

    for (i = 0; i < num_tokens; i++)
      {
      rn = (array_request_node *)calloc(1, sizeof(array_request_node));

      if ((rn == NULL) ||
          (read_ac_socket(fd, rn, sizeof(array_request_node)) != sizeof(array_request_node)))
        {
        snprintf(log_buf, sizeof(log_buf), "error reading array_request_node from %s", path);
        log_err(errno, __func__, log_buf);

        free(rn);
        free_array_request_list(&pa->request_tokens);
        free(pa->job_ids);
        free(pa);
        close(fd);
        return(PBSE_SYSTEM);
        }

      CLEAR_LINK(rn->request_tokens_link);

      append_link(&pa->request_tokens, &rn->request_tokens_link, (void *)rn);
      }
    }

  close(fd);

  CLEAR_HEAD(pa->ai_qs.deps);

  /* resave the array struct if the version on disk is older than the
   * current. This happens only now, after the file has been fully read,
   * because array_save() truncates the file. */
  if ((array_259_upgrade) ||
      (old_version != ARRAY_QS_STRUCT_VERSION))
    {
    array_save(pa);
    }

  pa->ai_mutex = (pthread_mutex_t *)calloc(1, sizeof(pthread_mutex_t));

  if (pa->ai_mutex == NULL)
    {
    free_array_request_list(&pa->request_tokens);
    free(pa->job_ids);
    free(pa);
    return(PBSE_SYSTEM);
    }

  pthread_mutex_init(pa->ai_mutex, NULL);

  lock_ai_mutex(pa, __func__, NULL, LOGLEVEL);

  /* link the struct into the servers list of job arrays */
  insert_array(pa);

  *new_pa = pa;

  return(PBSE_NONE);
  } /* END array_recov() */





/*
 * array_delete()
 *
 * Delete a job array struct from memory and disk. This is used when the
 * number of jobs that belong to the array becomes zero. Expects pa's
 * ai_mutex to be locked on entry; the mutex is unlocked and freed here.
 * Also purges the array's "template" (parent) job if it can be found.
 *
 * @param pa - the array to delete (freed on return)
 * @return PBSE_NONE
 */

int array_delete(

  job_array *pa)

  {
  int                      i;
  int                      saved_errno;
  char                     path[MAXPATHLEN + 1];
  char                     log_buf[LOCAL_LOG_BUF_SIZE];
  struct array_depend     *pdep;
  struct array_depend_job *pdj;

  /* first thing to do is take this out of the servers list of all arrays */
  remove_array(pa);

  /* unlock the mutex and free it */
  unlock_ai_mutex(pa, __func__, "1", LOGLEVEL);
  free(pa->ai_mutex);

  /* delete the on disk copy of the struct */
  snprintf(path, sizeof(path), "%s%s%s",
    path_arrays, pa->ai_qs.fileprefix, ARRAY_FILE_SUFFIX);

  if (unlink(path))
    {
    saved_errno = errno;
    snprintf(log_buf, sizeof(log_buf), "unable to delete %s", path);
    log_err(saved_errno, "array_delete", log_buf);
    }

  /* clear array request linked list */
  free_array_request_list(&pa->request_tokens);

  /* free the memory for the job pointers; job_ids may not have been
   * allocated yet when called from setup_array_struct() error paths */
  if (pa->job_ids != NULL)
    {
    for (i = 0; i < pa->ai_qs.array_size; i++)
      free(pa->job_ids[i]);

    free(pa->job_ids);
    }

  /* free the dependencies, if any */
  for (pdep = (struct array_depend *)GET_NEXT(pa->ai_qs.deps);
       pdep != NULL;
       pdep = (struct array_depend *)GET_NEXT(pa->ai_qs.deps))
    {
    delete_link(&pdep->dp_link);

    for (pdj = (struct array_depend_job *)GET_NEXT(pdep->dp_jobs);
         pdj != NULL;
         pdj = (struct array_depend_job *)GET_NEXT(pdep->dp_jobs))
      {
      delete_link(&pdj->dc_link);
      free(pdj);
      }

    free(pdep);
    }

  /* purge the "template" job, this also deletes the shared script file for the array */
  if (pa->ai_qs.parent_id[0] != '\0')
    {
    job *pjob;

    if ((pjob = svr_find_job(pa->ai_qs.parent_id, FALSE)) != NULL)
      svr_job_purge(pjob);
    }

  /* free the memory allocated for the struct */
  free(pa);

  return(PBSE_NONE);
  } /* END array_delete() */




/*
 * set_slot_limit()
 * sets how many jobs can be run from this array at once
 *
 * Any '%' characters in request are overwritten with '\0' so the remaining
 * range can be parsed. Without an explicit limit the server's max slot limit
 * (or NO_SLOT_LIMIT) is used.
 *
 * @param request - the string array request (modified in place)
 * @param pa - the array to receive a slot limit
 *
 * @return PBSE_NONE on SUCCESS, INVALID_SLOT_LIMIT if the requested limit
 *         exceeds the server's max slot limit
 */

int set_slot_limit(

  char      *request, /* I */
  job_array *pa)      /* O */

  {
  char *pcnt;
  long  max_limit;

  /* check for a max slot limit */
  if (get_svr_attr_l(SRV_ATR_MaxSlotLimit, &max_limit) != PBSE_NONE)
    max_limit = NO_SLOT_LIMIT;

  if ((pcnt = strchr(request, '%')) != NULL)
    {
    /* remove '%' from the request, or else it can't be parsed */
    while (*pcnt == '%')
      {
      *pcnt = '\0';
      pcnt++;
      }

    /* read the number if one is given */
    if (strlen(pcnt) > 0)
      {
      pa->ai_qs.slot_limit = atoi(pcnt);

      if ((max_limit != NO_SLOT_LIMIT) &&
          (max_limit < pa->ai_qs.slot_limit))
        {
        return(INVALID_SLOT_LIMIT);
        }
      }
    else
      {
      pa->ai_qs.slot_limit = max_limit;
      }
    }
  else
    {
    pa->ai_qs.slot_limit = max_limit;
    }

  return(PBSE_NONE);
  } /* END set_slot_limit() */




/*
 * setup_array_struct()
 *
 * Creates the job_array struct for a newly submitted array job: parses the
 * array request, applies slot and size limits, saves the struct to disk and
 * inserts it into the server's list of arrays.
 *
 * @param pjob - the array's parent ("template") job
 * @return PBSE_NONE on success; RM_ERR_BADPARAM, 1 (job save failed),
 *         INVALID_SLOT_LIMIT, ARRAY_TOO_LARGE, PBSE_MEM_MALLOC or 2 (bad
 *         tokens in the array request) on failure
 */

int setup_array_struct(

  job *pjob)

  {
  job_array          *pa;
  array_request_node *rn;
  int                 bad_token_count;
  int                 array_size;
  int                 rc;
  char                log_buf[LOCAL_LOG_BUF_SIZE];
  long                max_array_size;

  if (pjob == NULL)
    return(RM_ERR_BADPARAM);

  pa = (job_array *)calloc(1, sizeof(job_array));

  if (pa == NULL)
    return(PBSE_MEM_MALLOC);

  pa->ai_qs.struct_version = ARRAY_QS_STRUCT_VERSION;

  snprintf(pa->ai_qs.parent_id, sizeof(pa->ai_qs.parent_id), "%s", pjob->ji_qs.ji_jobid);
  snprintf(pa->ai_qs.fileprefix, sizeof(pa->ai_qs.fileprefix), "%s", pjob->ji_qs.ji_fileprefix);
  snprintf(pa->ai_qs.owner, sizeof(pa->ai_qs.owner), "%s", pjob->ji_wattr[JOB_ATR_job_owner].at_val.at_str);
  snprintf(pa->ai_qs.submit_host, sizeof(pa->ai_qs.submit_host), "%s", get_variable(pjob, pbs_o_host));

  pa->ai_qs.num_cloned = 0;
  CLEAR_HEAD(pa->request_tokens);

  pa->ai_mutex = (pthread_mutex_t *)calloc(1, sizeof(pthread_mutex_t));

  if (pa->ai_mutex == NULL)
    {
    free(pa);
    return(PBSE_MEM_MALLOC);
    }

  pthread_mutex_init(pa->ai_mutex, NULL);

  mutex_mgr pa_mutex = mutex_mgr(pa->ai_mutex);

  if (job_save(pjob, SAVEJOB_FULL, 0) != 0)
    {
    /* the array is deleted in svr_job_purge */
    pa_mutex.unlock();
    /* the mutex is freed below, so the manager must not touch it on exit */
    pa_mutex.set_lock_on_exit(false);

    /* Does job array need to be removed? */

    if (LOGLEVEL >= 6)
      {
      log_record(
        PBSEVENT_JOB,
        PBS_EVENTCLASS_JOB,
        pjob->ji_qs.ji_jobid,
        (char *)"cannot save job");
      }

    svr_job_purge(pjob);

    free(pa->ai_mutex);
    free(pa);

    return(1);
    }

  if ((rc = set_slot_limit(pjob->ji_wattr[JOB_ATR_job_array_request].at_val.at_str, pa)))
    {
    long max_limit = 0;

    get_svr_attr_l(SRV_ATR_MaxSlotLimit, &max_limit);

    snprintf(log_buf, sizeof(log_buf),
      "Array %s requested a slot limit above the max limit %ld, rejecting\n",
      pa->ai_qs.parent_id,
      max_limit);

    log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, pa->ai_qs.parent_id, log_buf);

    array_delete(pa);
    pa_mutex.set_lock_on_exit(false);

    return(INVALID_SLOT_LIMIT);
    }

  pa->ai_qs.jobs_running = 0;
  pa->ai_qs.num_started = 0;
  pa->ai_qs.num_failed = 0;
  pa->ai_qs.num_successful = 0;

  bad_token_count = parse_array_request(
                      pjob->ji_wattr[JOB_ATR_job_array_request].at_val.at_str,
                      &(pa->request_tokens));

  /* get the number of elements that should be allocated in the array */
  array_size = 0;
  pa->ai_qs.num_jobs = 0;

  for (rn = (array_request_node *)GET_NEXT(pa->request_tokens);
       rn != NULL;
       rn = (array_request_node *)GET_NEXT(rn->request_tokens_link))
    {
    if (rn->end > array_size)
      array_size = rn->end;

    /* calculate the actual number of jobs (different from array size) */
    pa->ai_qs.num_jobs += rn->end - rn->start + 1;
    }

  /* size of array is the biggest index + 1 */
  array_size++;

  if (get_svr_attr_l(SRV_ATR_MaxArraySize, &max_array_size) == PBSE_NONE)
    {
    if (max_array_size < pa->ai_qs.num_jobs)
      {
      array_delete(pa);
      pa_mutex.set_lock_on_exit(false);

      return(ARRAY_TOO_LARGE);
      }
    }

  /* initialize the array */
  pa->job_ids = (char **)calloc(array_size, sizeof(char *));

  if (pa->job_ids == NULL)
    {
    snprintf(log_buf, sizeof(log_buf), "Failed to alloc job_ids: job %s", pjob->ji_qs.ji_jobid);
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);

    /* array_delete() unlocks and frees the mutex; the manager must not
     * unlock the freed mutex on exit */
    array_delete(pa);
    pa_mutex.set_lock_on_exit(false);

    return(PBSE_MEM_MALLOC);
    }

  /* remember array_size */
  pa->ai_qs.array_size = array_size;

  CLEAR_HEAD(pa->ai_qs.deps);

  array_save(pa);

  if (bad_token_count > 0)
    {
    array_delete(pa);
    pa_mutex.set_lock_on_exit(false);

    return(2);
    }

  snprintf(pjob->ji_arraystructid, sizeof(pjob->ji_arraystructid), "%s", pa->ai_qs.parent_id);

  insert_array(pa);

  return(PBSE_NONE);
  } /* END setup_array_struct() */




/*
 * is_num()
 *
 * @param str - string to check
 * @return TRUE if str is non-empty and consists only of the digits 0-9
 */

int is_num(

  const char *str)

  {
  int i;
  int len;

  len = strlen(str);

  if (len == 0)
    {
    return(FALSE);
    }

  for (i = 0; i < len; i++)
    {
    if (str[i] < '0' || str[i] > '9')
      {
      return(FALSE);
      }
    }

  return(TRUE);
  } /* END is_num() */




/*
 * array_request_token_count()
 *
 * @param str - the array request string
 * @return the number of ','-separated tokens (0 for an empty string)
 */

int array_request_token_count(

  const char *str)

  {
  int token_count = 1;
  int len = strlen(str);
  int i;

  if (len == 0)
    {
    return(0);
    }

  for (i = 0; i < len; i++)
    {
    if (str[i] == ',')
      {
      token_count++;
      }
    }

  return(token_count);
  } /* END array_request_token_count() */




/*
 * array_request_parse_token()
 *
 * Parses a single request token, either "N" or "N-M".
 * The token is temporarily split at the '-' but is restored before return.
 *
 * @param str   - I: the token
 * @param start - O: first index of the range (-1 if invalid)
 * @param end   - O: last index of the range (-1 if invalid)
 * @return the number of task ids in the range, 0 if the token is invalid
 */

int array_request_parse_token(

  char *str,   /* I */
  int  *start, /* O */
  int  *end)   /* O */

  {
  int   num_ids;
  long  start_l;
  long  end_l;
  char *idx;
  char *ridx;

  if ((NULL == str) || (NULL == start) || (NULL == end))
    {
    return(0);
    }

  idx  = strchr(str, '-');
  ridx = strrchr(str, '-');

  if (idx == NULL)
    {
    /* token is not a range, parse it as a single task id */
    if (!is_num(str))
      {
      start_l = -1;
      end_l   = -1;
      }
    else
      {
      /* token is a number, set start_l and end_l to the value */
      start_l = strtol(str, NULL, 10);
      end_l   = start_l;
      }
    }
  else if (idx == ridx)
    {
    /* first and last '-' are the same character, this is a range */
    *idx = '\0';
    idx++;

    /* check for an invalid range */
    if ((!is_num(str)) || (!is_num(idx)))
      {
      start_l = -1;
      end_l   = -1;
      }
    else
      {
      /* both sides of the range were numbers so we set start_l and end_l;
       * we will check later to make sure that the range is "good" */
      start_l = strtol(str, NULL, 10);
      end_l   = strtol(idx, NULL, 10);
      }
    }
  else
    {
    /* multiple '-' characters, this can't be a good range */
    start_l = -1;
    end_l   = -1;
    }

  /* restore the string so this function is non-destructive to the token */
  if ((idx != NULL) && (idx == ridx + 1))
    {
    idx--;
    *idx = '-';
    }

  /* make sure the start or end of the range is not out of the range for
   * job array task IDs, and make sure that end_l is not less than start_l
   * (it is OK for end_l to == start_l) */
  if ((start_l < 0) ||
      (start_l >= INT_MAX) ||
      (end_l < 0) ||
      (end_l >= INT_MAX) ||
      (start_l > PBS_MAXJOBARRAY) ||
      (end_l > PBS_MAXJOBARRAY) ||
      (end_l < start_l))
    {
    *start = -1;
    *end   = -1;
    num_ids = 0;
    }
  else
    {
    /* calculate the number of task IDs in the range, and cast start_l and
     * end_l to ints (they were confirmed to be >= 0 and < INT_MAX, so this
     * will not under/overflow) */
    num_ids = end_l - start_l + 1;
    *start  = (int)start_l;
    *end    = (int)end_l;
    }

  return(num_ids);
  } /* END array_request_parse_token() */




/*
 * parse_array_request()
 *
 * Parses a ','-separated array request into array_request_nodes, inserted
 * into tl sorted by start index. Tokens that are invalid or overlap a
 * neighbouring range are counted as bad.
 *
 * @param request - the array request string (not modified)
 * @param tl      - O: list that receives the parsed ranges
 * @return the number of bad tokens (>0 also for NULL/empty input or
 *         allocation failure)
 */

int parse_array_request(

  char       *request,
  tlist_head *tl)

  {
  char               *temp_str;
  int                 num_tokens;
  char              **tokens;
  int                 i;
  int                 j;
  int                 num_elements;
  int                 start;
  int                 end;
  int                 num_bad_tokens;
  int                 searching;
  array_request_node *rn;
  array_request_node *rn2;

  if ((request == NULL) ||
      (request[0] == '\0') ||
      (tl == NULL))
    {
    return(1); /* return "bad_token_count" as greater than 0 so caller knows there are problems */
    }

  temp_str = strdup(request);

  if (temp_str == NULL)
    return(1);

  num_tokens = array_request_token_count(request);
  num_bad_tokens = 0;

  tokens = (char **)calloc(num_tokens, sizeof(char *));

  if (tokens == NULL)
    {
    free(temp_str);
    return(1);
    }

  j = num_tokens - 1;

  /* start from back and scan backwards setting pointers to tokens and changing ',' to '\0' */
  for (i = strlen(temp_str) - 1; i >= 0; i--)
    {
    if (temp_str[i] == ',')
      {
      tokens[j--] = &temp_str[i+1];
      temp_str[i] = '\0';
      }
    else if (i == 0)
      {
      tokens[0] = temp_str;
      }
    }

  for (i = 0; i < num_tokens; i++)
    {
    num_elements = array_request_parse_token(tokens[i], &start, &end);

    if (num_elements == 0)
      {
      num_bad_tokens++;
      continue;
      }

    rn = (array_request_node *)calloc(1, sizeof(array_request_node));

    if (rn == NULL)
      {
      num_bad_tokens++;
      continue;
      }

    rn->start = start;
    rn->end = end;
    CLEAR_LINK(rn->request_tokens_link);

    /* insert sorted by start index */
    rn2 = (array_request_node *)GET_NEXT(*tl);
    searching = TRUE;

    while (searching)
      {
      if (rn2 == NULL)
        {
        append_link(tl, &rn->request_tokens_link, (void *)rn);
        searching = FALSE;
        }
      else if (rn->start < rn2->start)
        {
        insert_link(&rn2->request_tokens_link, &rn->request_tokens_link, (void *)rn, LINK_INSET_BEFORE);
        searching = FALSE;
        }
      else
        {
        rn2 = (array_request_node *)GET_NEXT(rn2->request_tokens_link);
        }
      }

    /* overlapping neighbours make the request bad */
    rn2 = (array_request_node *)GET_PRIOR(rn->request_tokens_link);

    if (rn2 != NULL && rn2->end >= rn->start)
      {
      num_bad_tokens++;
      }

    rn2 = (array_request_node *)GET_NEXT(rn->request_tokens_link);

    if (rn2 != NULL && rn2->start <= rn->end)
      {
      num_bad_tokens++;
      }
    }

  free(tokens);
  free(temp_str);

  return(num_bad_tokens);
  } /* END parse_array_request() */




/*
 * delete_array_range()
 *
 * deletes a range from a specific array
 *
 * @param pa - the array whose jobs are deleted (ai_mutex locked on entry;
 *             temporarily released around each attempt_delete())
 * @param range_str - the user-given range to delete, of the form "...=range"
 * @return - the number of jobs skipped, -1 if range error
 */

int delete_array_range(

  job_array *pa,
  char      *range_str)

  {
  tlist_head          tl;
  array_request_node *rn;
  job                *pjob;
  char               *range;
  int                 i;
  int                 num_skipped = 0;
  int                 num_deleted = 0;
  int                 deleted;
  int                 running;

  /* get just the numeric range specified, '=' should
   * always be there since we put it there in qdel */
  if ((range = strchr(range_str, '=')) == NULL)
    return(-1);

  range++; /* move past the '=' */

  CLEAR_HEAD(tl);

  if (parse_array_request(range, &tl) > 0)
    {
    /* don't delete jobs if range error, but release any tokens that parsed */
    free_array_request_list(&tl);
    return(-1);
    }

  for (rn = (array_request_node *)GET_NEXT(tl);
       rn != NULL;
       rn = (array_request_node *)GET_NEXT(rn->request_tokens_link))
    {
    for (i = rn->start; i <= rn->end; i++)
      {
      /* don't stomp on other memory: the bounds check must come before
       * job_ids[i] is read */
      if ((i >= pa->ai_qs.array_size) ||
          (pa->job_ids[i] == NULL))
        continue;

      if ((pjob = svr_find_job(pa->job_ids[i], FALSE)) == NULL)
        {
        free(pa->job_ids[i]);
        pa->job_ids[i] = NULL;
        }
      else
        {
        mutex_mgr pjob_mutex = mutex_mgr(pjob->ji_mutex, true);

        if (pjob->ji_qs.ji_state >= JOB_STATE_EXITING)
          {
          /* invalid state for request, skip */
          continue;
          }

        running = (pjob->ji_qs.ji_state == JOB_STATE_RUNNING);

        pthread_mutex_unlock(pa->ai_mutex);
        deleted = attempt_delete(pjob);
        /* we come out of attempt_delete unlocked */
        pjob_mutex.set_lock_on_exit(false);

        if (deleted == FALSE)
          {
          /* if the job was deleted, this mutex would be taken care of elsewhere.
           * When it fails, release it here */
          num_skipped++;
          }
        else if (running == FALSE)
          {
          /* running jobs will increase the deleted count when their obit is reported */
          num_deleted++;
          }

        pthread_mutex_lock(pa->ai_mutex);
        }
      }
    }

  /* release mem */
  free_array_request_list(&tl);

  pa->ai_qs.num_failed += num_deleted;

  return(num_skipped);
  } /* END delete_array_range() */




/*
 * first_job_index()
 *
 * @param pa - the array
 * @return the index of the first job in the array, -1 if there is none
 */

int first_job_index(

  job_array *pa)

  {
  int i;

  for (i = 0; i < pa->ai_qs.array_size; i++)
    {
    if (pa->job_ids[i] != NULL)
      return(i);
    }

  return(-1);
  } /* END first_job_index() */




/*
 * delete_whole_array()
 *
 * iterates over the array and deletes the whole thing
 * @param pa - the array to be deleted (ai_mutex locked on entry;
 *             temporarily released around each attempt_delete())
 * @return - the number of jobs skipped, NO_JOBS_IN_ARRAY if no job was found
 */

int delete_whole_array(

  job_array *pa) /* I */

  {
  int  i;
  int  num_skipped = 0;
  int  num_jobs = 0;
  int  num_deleted = 0;
  int  deleted;
  int  running;

  job *pjob;

  for (i = 0; i < pa->ai_qs.array_size; i++)
    {
    if (pa->job_ids[i] == NULL)
      continue;

    if ((pjob = svr_find_job(pa->job_ids[i], FALSE)) == NULL)
      {
      free(pa->job_ids[i]);
      pa->job_ids[i] = NULL;
      }
    else
      {
      mutex_mgr pjob_mutex = mutex_mgr(pjob->ji_mutex, true);

      num_jobs++;

      if (pjob->ji_qs.ji_state >= JOB_STATE_EXITING)
        {
        /* invalid state for request, skip */
        continue;
        }

      running = (pjob->ji_qs.ji_state == JOB_STATE_RUNNING);

      pthread_mutex_unlock(pa->ai_mutex);
      deleted = attempt_delete(pjob);
      /* we come out of attempt_delete unlocked */
      pjob_mutex.set_lock_on_exit(false);

      if (deleted == FALSE)
        {
        /* if the job was deleted, this mutex would be taken care of elsewhere.
         * When it fails, release it here */
        num_skipped++;
        }
      else if (running == FALSE)
        {
        /* running jobs will increase the deleted count when their obit is reported */
        num_deleted++;
        }

      pthread_mutex_lock(pa->ai_mutex);
      }
    }

  pa->ai_qs.num_failed += num_deleted;

  if (num_jobs == 0)
    return(NO_JOBS_IN_ARRAY);

  return(num_skipped);
  } /* END delete_whole_array() */




/*
 * hold_array_range()
 *
 * holds just a specified range from an array
 * @param pa - the array to be acted on
 * @param range_str - string specifying the range, of the form "...=range"
 * @param temphold - the hold attribute to apply
 * @return PBSE_NONE on success, PBSE_IVALREQ on a bad range
 */

int hold_array_range(

  job_array     *pa,         /* O */
  char          *range_str,  /* I */
  pbs_attribute *temphold)   /* I */

  {
  tlist_head          tl;
  int                 i;
  job                *pjob;
  array_request_node *rn;

  char *range = strchr(range_str, '=');

  if (range == NULL)
    return(PBSE_IVALREQ);

  range++; /* move past the '=' */

  CLEAR_HEAD(tl);

  if (parse_array_request(range, &tl) > 0)
    {
    /* don't hold the jobs if range error */
    free_array_request_list(&tl);
    return(PBSE_IVALREQ);
    }

  /* hold just that range from the array */
  for (rn = (array_request_node *)GET_NEXT(tl);
       rn != NULL;
       rn = (array_request_node *)GET_NEXT(rn->request_tokens_link))
    {
    for (i = rn->start; i <= rn->end; i++)
      {
      /* don't stomp on other memory */
      if ((i >= pa->ai_qs.array_size) ||
          (pa->job_ids[i] == NULL))
        continue;

      if ((pjob = svr_find_job(pa->job_ids[i], FALSE)) == NULL)
        {
        free(pa->job_ids[i]);
        pa->job_ids[i] = NULL;
        }
      else
        {
        hold_job(temphold, pjob);
        unlock_ji_mutex(pjob, __func__, NULL, LOGLEVEL);
        }
      }
    }

  /* release mem */
  free_array_request_list(&tl);

  return(PBSE_NONE);
  } /* END hold_array_range() */




/*
 * release_array_range()
 *
 * releases the holds on a specified range from an array
 * @param pa - the array to be acted on
 * @param preq - the release request
 * @param range_str - string specifying the range, of the form "...=range"
 * @return PBSE_NONE on success, PBSE_IVALREQ on a bad range, or the first
 *         error returned by release_job()
 */

int release_array_range(

  job_array            *pa,
  struct batch_request *preq,
  char                 *range_str)

  {
  tlist_head          tl;
  int                 i;
  int                 rc;
  job                *pjob;
  array_request_node *rn;

  char *range = strchr(range_str, '=');

  if (range == NULL)
    return(PBSE_IVALREQ);

  range++; /* move past the '=' */

  CLEAR_HEAD(tl);

  if (parse_array_request(range, &tl) > 0)
    {
    /* don't release the jobs if range error */
    free_array_request_list(&tl);
    return(PBSE_IVALREQ);
    }

  /* release just that range from the array */
  for (rn = (array_request_node *)GET_NEXT(tl);
       rn != NULL;
       rn = (array_request_node *)GET_NEXT(rn->request_tokens_link))
    {
    for (i = rn->start; i <= rn->end; i++)
      {
      /* don't stomp on other memory */
      if ((i >= pa->ai_qs.array_size) ||
          (pa->job_ids[i] == NULL))
        continue;

      if ((pjob = svr_find_job(pa->job_ids[i], FALSE)) == NULL)
        {
        free(pa->job_ids[i]);
        pa->job_ids[i] = NULL;
        }
      else
        {
        mutex_mgr pjob_mutex = mutex_mgr(pjob->ji_mutex, true);

        if ((rc = release_job(preq, pjob)))
          {
          /* don't leak the rest of the parsed range on early exit */
          free_array_request_list(&tl);
          return(rc);
          }
        }
      }
    }

  /* release mem */
  free_array_request_list(&tl);

  return(PBSE_NONE);
  } /* END release_array_range() */




/*
 * modify_array_range()
 *
 * Applies a modify request to every job of pa within range. pa's ai_mutex
 * is released around each modify_job() call and re-acquired through
 * get_jobs_array(); processing stops if the array disappears meanwhile.
 *
 * @return the rc of the last modify_job() call (PBSE_NONE if none were
 *         made), FAILURE on a bad range
 */

int modify_array_range(

  job_array     *pa,              /* I/O */
  char          *range,           /* I */
  svrattrl      *plist,           /* I */
  batch_request *preq,            /* I */
  int            checkpoint_req)  /* I */

  {
  tlist_head          tl;
  int                 i;
  int                 rc = PBSE_NONE;
  int                 array_gone = FALSE;
  job                *pjob;
  array_request_node *rn;

  CLEAR_HEAD(tl);

  if (parse_array_request(range, &tl) > 0)
    {
    /* don't modify the jobs if range error */
    free_array_request_list(&tl);
    return(FAILURE);
    }

  for (rn = (array_request_node *)GET_NEXT(tl);
       (rn != NULL) && (array_gone == FALSE);
       rn = (array_request_node *)GET_NEXT(rn->request_tokens_link))
    {
    for (i = rn->start; i <= rn->end; i++)
      {
      if ((i >= pa->ai_qs.array_size) ||
          (pa->job_ids[i] == NULL))
        continue;

      if ((pjob = svr_find_job(pa->job_ids[i], FALSE)) == NULL)
        {
        free(pa->job_ids[i]);
        pa->job_ids[i] = NULL;
        }
      else
        {
        struct batch_request *array_req = duplicate_request(preq, i);
        mutex_mgr pjob_mutex = mutex_mgr(pjob->ji_mutex, true);

        pthread_mutex_unlock(pa->ai_mutex);
        rc = modify_job((void **)&pjob, plist, array_req, checkpoint_req, NO_MOM_RELAY);

        pa = get_jobs_array(&pjob);

        if (pa == NULL)
          {
          array_gone = TRUE;

          if (pjob == NULL)
            pjob_mutex.set_lock_on_exit(false);

          break;
          }

        if (pjob == NULL)
          {
          pjob_mutex.set_lock_on_exit(false);
          pa->job_ids[i] = NULL;
          }
        }
      }
    }

  /* release mem */
  free_array_request_list(&tl);

  return(rc);
  } /* END modify_array_range() */




/**
 * update_array_values()
 *
 * updates internal bookkeeping values for job arrays
 * @param pa - array to update
 * @param old_state - the job's state before the event
 * @param event - code for what event just happened
 * @param job_id - id of the job the event happened on
 * @param job_atr_hold - the job's hold attribute value
 * @param job_exit_status - the job's exit status (used for aeTerminate)
 */

void update_array_values(

  job_array            *pa,              /* I */
  int                   old_state,       /* I */
  enum ArrayEventsEnum  event,           /* I */
  char                 *job_id,
  long                  job_atr_hold,
  int                   job_exit_status)

  {
  long moab_compatible;

  switch (event)
    {
    case aeQueue:

      /* NYI, nothing needs to be done for this yet */

      break;

    case aeRun:

      if (old_state != JOB_STATE_RUNNING)
        {
        pa->ai_qs.jobs_running++;
        pa->ai_qs.num_started++;
        }

      break;

    case aeTerminate:

      if (old_state == JOB_STATE_RUNNING)
        {
        if (pa->ai_qs.jobs_running > 0)
          pa->ai_qs.jobs_running--;
        }

      if (job_exit_status == 0)
        pa->ai_qs.num_successful++;
      else
        pa->ai_qs.num_failed++;

      pa->ai_qs.jobs_done++;

      array_save(pa);

      /* update slot limit hold if necessary */
      if (get_svr_attr_l(SRV_ATR_MoabArrayCompatible, &moab_compatible) != PBSE_NONE)
        moab_compatible = FALSE;

      /* only need to update if the job wasn't previously held */
      if ((moab_compatible != FALSE) &&
          ((job_atr_hold & HOLD_l) == FALSE))
        {
        int  i;
        int  newstate;
        int  newsub;
        job *pj;

        /* find the first held job and release its hold */
        for (i = 0; i < pa->ai_qs.array_size; i++)
          {
          if (pa->job_ids[i] == NULL)
            continue;

          if (!strcmp(pa->job_ids[i], job_id))
            continue;

          if ((pj = svr_find_job(pa->job_ids[i], TRUE)) == NULL)
            {
            free(pa->job_ids[i]);
            pa->job_ids[i] = NULL;
            }
          else
            {
            mutex_mgr pj_mutex = mutex_mgr(pj->ji_mutex, true);

            if (pj->ji_wattr[JOB_ATR_hold].at_val.at_long & HOLD_l)
              {
              pj->ji_wattr[JOB_ATR_hold].at_val.at_long &= ~HOLD_l;

              if (pj->ji_wattr[JOB_ATR_hold].at_val.at_long == 0)
                {
                pj->ji_wattr[JOB_ATR_hold].at_flags &= ~ATR_VFLAG_SET;
                }

              svr_evaljobstate(*pj, newstate, newsub, 1);
              svr_setjobstate(pj, newstate, newsub, FALSE);
              job_save(pj, SAVEJOB_FULL, 0);

              break;
              }
            }
          }
        }

      break;

    default:

      /* log error? */

      break;
    }

  set_array_depend_holds(pa);
  array_save(pa);
  } /* END update_array_values() */




/*
 * update_array_statuses()
 *
 * Sets the state of the array summary job that is used strictly
 * for qstat displays: RUNNING if any job runs, COMPLETE if all are done,
 * QUEUED otherwise.
 */

void update_array_statuses()

  {
  job_array    *pa;
  job          *pjob;
  int           iter = -1;
  unsigned int  running;
  unsigned int  queued;
  unsigned int  complete;
  char          log_buf[LOCAL_LOG_BUF_SIZE];
  char          jobid[PBS_MAXSVRJOBID + 1];

  while ((pa = next_array(&iter)) != NULL)
    {
    running = pa->ai_qs.jobs_running;
    complete = pa->ai_qs.num_failed + pa->ai_qs.num_successful;
    queued = pa->ai_qs.num_jobs - running - complete;

    if (LOGLEVEL >= 7)
      {
      snprintf(log_buf, sizeof(log_buf), "%s: unlocking ai_mutex", __func__);
      log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
      }

    /* copy the parent id before releasing the array lock */
    snprintf(jobid, sizeof(jobid), "%s", pa->ai_qs.parent_id);
    unlock_ai_mutex(pa, __func__, "1", LOGLEVEL);

    if ((pjob = svr_find_job(jobid, TRUE)) != NULL)
      {
      mutex_mgr pjob_mutex = mutex_mgr(pjob->ji_mutex, true);

      if (running > 0)
        {
        svr_setjobstate(pjob, JOB_STATE_RUNNING, pjob->ji_qs.ji_substate, FALSE);
        }
      else if ((complete > 0) && (queued == 0))
        {
        svr_setjobstate(pjob, JOB_STATE_COMPLETE, pjob->ji_qs.ji_substate, FALSE);
        }
      else
        {
        /* default to just calling the array queued */
        svr_setjobstate(pjob, JOB_STATE_QUEUED, pjob->ji_qs.ji_substate, FALSE);
        }
      }
    } /* END for each array */
  } /* END update_array_statuses() */




/* num_array_jobs()
 *
 * determine the number of jobs in the array from the array request
 *
 * @param req_str - the string of the array request
 * @return - the number of jobs in the array, -1 on error
 */

int num_array_jobs(

  const char *req_str) /* I */

  {
  int   num_jobs = 0;
  int   start;
  int   end;

  char *delim = (char *)",";
  char *ptr;
  char *dash;
  char *tmp_ptr;
  char  tmp_str[MAXPATHLEN];

  if (req_str == NULL)
    return(-1);

  snprintf(tmp_str, sizeof(tmp_str), "%s", req_str);
  tmp_ptr = tmp_str;

  ptr = threadsafe_tokenizer(&tmp_ptr, delim);

  while (ptr != NULL)
    {
    if ((dash = strchr(ptr, '-')) != NULL)
      {
      /* this is a range */
      start = atoi(ptr);
      end   = atoi(dash + 1);

      /* check for invalid range */
      if (end < start)
        return(-1);

      num_jobs += end - start + 1;
      }
    else
      {
      /* just one job */
      num_jobs++;
      }

    ptr = threadsafe_tokenizer(&tmp_ptr, delim);
    }

  return(num_jobs);
  } /* END num_array_jobs */




/*
 * initializes the hash map that stores all job_array pointers,
 * and the mutex that guards it
 */

void initialize_all_arrays_array()

  {
  allarrays.hm = get_hash_map(INITIAL_HASH_MAP_SIZE);

  allarrays.allarrays_mutex = (pthread_mutex_t *)calloc(1, sizeof(pthread_mutex_t));
  pthread_mutex_init(allarrays.allarrays_mutex, NULL);
  } /* END initialize_all_arrays_array() */




/*
 * insert pa into the global array hash map, keyed by its parent id
 *
 * @return the add_to_hash_map() result
 */

int insert_array(

  job_array *pa)

  {
  int rc;

  pthread_mutex_lock(allarrays.allarrays_mutex);

  if ((rc = add_to_hash_map(allarrays.hm, pa, pa->ai_qs.parent_id)) == ENOMEM)
    {
    log_err(rc, __func__, "No memory to resize the array...SYSTEM FAILURE\n");
    }

  pthread_mutex_unlock(allarrays.allarrays_mutex);

  return(rc);
  } /* END insert_array() */




/*
 * remove pa from the global array hash map.
 *
 * pa's ai_mutex is expected to be locked on entry. If the global mutex is
 * busy, pa is unlocked so the global mutex can be taken in lock order, then
 * the array is looked up again by id and relocked.
 *
 * @return the remove_from_hash_map() result, PBSE_NONE if the array is
 *         already gone
 */

int remove_array(

  job_array *pa)

  {
  int  rc;
  char arrayid[PBS_MAXSVRJOBID+1];

  if (pthread_mutex_trylock(allarrays.allarrays_mutex))
    {
    snprintf(arrayid, sizeof(arrayid), "%s", pa->ai_qs.parent_id);

    unlock_ai_mutex(pa, __func__, "1", LOGLEVEL);

    pthread_mutex_lock(allarrays.allarrays_mutex);

    pa = get_array_from_hash(allarrays.hm, arrayid);

    if (pa != NULL)
      lock_ai_mutex(pa, __func__, "2", LOGLEVEL);
    }

  if (pa == NULL)
    rc = PBSE_NONE;
  else
    rc = remove_from_hash_map(allarrays.hm, pa->ai_qs.parent_id);

  pthread_mutex_unlock(allarrays.allarrays_mutex);

  return(rc);
  } /* END remove_array() */




/*
 * next_array()
 *
 * @param iter - hash map iterator (start with -1)
 * @return the next array with its ai_mutex locked, or NULL when done
 */

job_array *next_array(

  int *iter)

  {
  job_array *pa = NULL;

  pthread_mutex_lock(allarrays.allarrays_mutex);

  pa = (job_array *)next_from_hash_map(allarrays.hm, iter);
  if (pa != NULL)
    lock_ai_mutex(pa, __func__, NULL, LOGLEVEL);

  pthread_mutex_unlock(allarrays.allarrays_mutex);

  return(pa);
  } /* END next_array() */




/*
 * next_array_check()
 *
 * Like next_array(), but does not lock the array if it is the one the
 * caller already owns (and therefore already holds locked).
 */

job_array *next_array_check(

  int       *iter,
  job_array *owned)

  {
  job_array *pa = NULL;

  pthread_mutex_lock(allarrays.allarrays_mutex);

  pa = (job_array *)next_from_hash_map(allarrays.hm, iter);
  if ((pa != NULL) &&
      (pa != owned))
    lock_ai_mutex(pa, __func__, NULL, LOGLEVEL);

  pthread_mutex_unlock(allarrays.allarrays_mutex);

  return(pa);
  } /* END next_array_check() */




/*
 * get_array_from_hash()
 *
 * Search for the job array with and without the '.' extension.
 * The caller must hold allarrays.allarrays_mutex.
 *
 * @param hm - the hash map to search
 * @param id - the array id (may be NULL, in which case NULL is returned)
 * @return the array, or NULL if not found (not locked)
 */

job_array *get_array_from_hash(

  hash_map   *hm,
  const char *id)

  {
  char       jobid[PBS_MAXSVRJOBID];
  char      *dot;
  job_array *pJa;

  if (id == NULL)
    return(NULL);

  /* bounded copy: id comes from callers and may exceed the buffer */
  snprintf(jobid, sizeof(jobid), "%s", id);

  pJa = (job_array *)get_from_hash_map(hm, jobid);

  if (pJa != NULL)
    return(pJa);

  if ((dot = strchr(jobid, '.')) != NULL)
    {
    *dot = '\0';
    pJa = (job_array *)get_from_hash_map(hm, jobid);
    }

  return(pJa);
  } /* END get_array_from_hash() */

/* END array_func.c */