/* Copyright 2008 Cluster Resources */
/**
 * checkpoint.c
 *
 * Support for job checkpoint and restart.
 *
 * Note that there are two coexisting systems for implementing
 * checkpoint and restart. The older one is based on machine
 * dependent code in the mom_mach.c module. The newer one uses
 * the BLCR system modules. Which of them the code below uses is
 * controlled by the variable checkpoint_system_type, which can have
 * the values CST_NONE, CST_MACH_DEP or CST_BLCR. This variable
 * is set by calling mom_does_checkpoint at system startup time.
 *
 * The CST_MACH_DEP system uses a directory based scheme for
 * taking a checkpoint and produces a file for each task. The
 * restart code iterates over the directory and restarts one
 * task for each file it finds. The entire set of files must
 * be written for the checkpoint to be valid. To ensure this,
 * an existing checkpoint directory is renamed with a .old
 * extension while a new checkpoint is being written. If something
 * fails during this process, recovery code will notice the
 * .old directory, deduce that the current directory is
 * invalid, delete it, and rename the backup back to the current name.
 *
 * In the case of the CST_BLCR system, BLCR is MPI aware and
 * takes care of doing the checkpoints for each task.
 * (A task, as defined in Torque, is a process group
 * executing on a node.)
 * So checkpoint and restart for BLCR are done on the head
 * node task. A directory can be specified where the checkpoint
 * image will reside, but there is only one file per checkpoint.
 * BLCR also differs in that multiple checkpoint images are
 * allowed to exist on the disk; when the restart is performed,
 * the checkpoint name stored in the job decides which of them is
 * used. An operator can point the job at a previous image by
 * changing this name with the qalter command.
* */ #include /* the master config generated by configure */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "dis.h" #include "libpbs.h" #include "pbs_error.h" #include "attribute.h" #include "server_limits.h" #include "pbs_job.h" #include "batch_request.h" #include "attribute.h" #include "resmon.h" #include "mom_mach.h" #include "mom_func.h" #include "log.h" #include "mcom.h" #include "net_connect.h" #include "resource.h" #include "csv.h" #include "svrfunc.h" #include "pbs_ifl.h" extern int exiting_tasks; extern int LOGLEVEL; extern int lockfds; extern int ForceServerUpdate; extern char TORQUE_JData[]; extern int task_recov(job *pjob); extern char *path_spool; extern char *path_jobs; extern char *TRemChkptDirList[]; #ifdef USEJOBCREATE extern uint64_t get_jobid(char *); #endif /* USEJOBCREATE */ #ifdef ENABLE_CSA extern void add_wkm_start(uint64_t, char *); #endif /* ENABLE_CSA */ int checkpoint_system_type = CST_NONE; char path_checkpoint[MAXPATHLEN + 1]; /* BLCR variables */ char checkpoint_script_name[MAXPATHLEN + 1]; char restart_script_name[MAXPATHLEN + 1]; char checkpoint_run_exe_name[MAXPATHLEN + 1]; int default_checkpoint_interval = 10; /* minutes */ extern char *mk_dirs(char *); extern int mom_open_socket_to_jobs_server(job *, char *, void (*)(int)); extern void set_attr(struct attrl **, char *, char *); extern int write_nodes_to_file(job *); extern int write_gpus_to_file(job *); int create_missing_files(job *pjob); /* The following is used for building command line args * and makes sure that at least something is generated * for each arg so that the script gets a consistent * command line. */ #define SET_ARG(x) (((x) == NULL) || (*(x) == 0))?"-":(x) /** * mom_checkpoint_job_is_checkpointable * * @param pjob Pointer to job structure. 
* @see TMomFinalizeChild */ int mom_checkpoint_job_is_checkpointable(job *pjob) { attribute *pattr; int rc; pattr = &pjob->ji_wattr[(int)JOB_ATR_checkpoint]; rc = checkpoint_system_type != CST_NONE && checkpoint_script_name[0] != 0 && (pattr->at_flags & ATR_VFLAG_SET) && ((csv_find_string(pattr->at_val.at_str, "c") != NULL) || (csv_find_string(pattr->at_val.at_str, "s") != NULL) || (csv_find_string(pattr->at_val.at_str, "enabled") != NULL) || (csv_find_string(pattr->at_val.at_str, "shutdown") != NULL) || (csv_find_string(pattr->at_val.at_str, "periodic") != NULL)); return(rc); } /** * mom_checkpoint_execute_job * * This routine is called from the newly created child process. * It is required for the BLCR system because the job must run as * a child of the cr_run program. * * @param pjob Pointer to job structure. * @see TMomFinalizeChild */ int mom_checkpoint_execute_job(job *pjob, char *shell, char *arg[], struct var_table *vtable) { static char *id = "mom_checkpoint_execute_job"; /* Launch job executable with cr_run command so that cr_checkpoint command will work. */ /* shuffle up the existing args */ arg[5] = arg[4]; arg[4] = arg[3]; arg[3] = arg[2]; arg[2] = arg[1]; /* replace first arg with shell name note, this func is called from a child process that exits after the executable is launched, so we don't have to worry about freeing this malloc later */ arg[1] = malloc(strlen(shell) + 1); if (arg[1] == NULL) { log_err(errno,id,"cannot alloc env"); return(-1); } strcpy(arg[1], shell); arg[0] = checkpoint_run_exe_name; if (LOGLEVEL >= 10) { char cmd[MAXPATHLEN + 1]; int i; strcpy(cmd,arg[0]); for (i = 1; arg[i] != NULL; i++) { strcat(cmd," "); strcat(cmd,arg[i]); } strcat(cmd,")"); log_buffer[0] = '\0'; sprintf(log_buffer, "execing checkpoint command (%s)\n", cmd); log_ext(-1, id, log_buffer, LOG_DEBUG); } execve(checkpoint_run_exe_name, arg, vtable->v_envp); return (0); } /** * mom_checkpoint_init * * This routine is called from the mom startup code. 
* @see setup_program_environment
 *
 * Sets checkpoint_system_type from mom_does_checkpoint().  When no path
 * was supplied with -C, path_checkpoint is set to the directory returned
 * by mk_dirs("checkpoint/").  For BLCR the directory is made mode 01777
 * if it is not already.  Unless DEBUG or NO_SECURITY_CHECK is defined,
 * the directory is then checked with chk_file_sec (BLCR allows group and
 * other write access, the machine-dependent system does not).
 *
 * @return 0 on success, non-zero if the default directory path could not
 *         be built or the directory fails chk_file_sec.
 */

int mom_checkpoint_init(void)

  {
  static char id[] = "mom_checkpoint_init";
  int         c = 0;
  char       *path_checkpt_tmp;

  checkpoint_system_type = mom_does_checkpoint(); /* {CST_NONE, CST_MACH_DEP, CST_BLCR} */

  if (path_checkpoint[0] == '\0')   /* if not -C option */
    {
    /* mk_dirs mallocs the string it returns so this string must be freed */

    path_checkpt_tmp = mk_dirs("checkpoint/");

    if (path_checkpt_tmp == NULL)
      {
      /* never hand a NULL pointer to "%s" below */

      log_err(errno, id, "cannot build default checkpoint directory path");

      return(-1);
      }

    snprintf(path_checkpoint, sizeof(path_checkpoint), "%s", path_checkpt_tmp);

    free(path_checkpt_tmp);
    }

  if (checkpoint_system_type == CST_BLCR)
    {
    /* set permissions on default checkpoint path, if needed */

    struct stat sb;

    if ((stat(path_checkpoint, &sb) == 0) &&
        ((sb.st_mode & 01777) != 01777))
      {
      if (chmod(path_checkpoint, 01777) == -1)
        {
        log_err(errno, id, "cannot set permissions on checkpoint directory");
        }
      }
    }

#if !defined(DEBUG) && !defined(NO_SECURITY_CHECK)

  if (checkpoint_system_type == CST_BLCR)
    {
    c = chk_file_sec(path_checkpoint, 1, 1, 0, 1, NULL);
    }
  else
    {
    c = chk_file_sec(path_checkpoint, 1, 1, S_IWGRP | S_IWOTH, 1, NULL);
    }

#endif  /* not DEBUG and not NO_SECURITY_CHECK */

  return(c);
  }  /* END mom_checkpoint_init() */




/*========================================================================*/
/* Routines called from the config file parsing to set various variables.
*/ /*========================================================================*/ void mom_checkpoint_set_directory_path( char *str) { char *cp; strcpy(path_checkpoint, str); cp = &path_checkpoint[strlen(path_checkpoint)]; if (*cp != '/') { *cp++ = '/'; *cp++ = 0; } } unsigned long mom_checkpoint_set_checkpoint_interval(char *value) /* I */ { log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "checkpoint_interval", value); default_checkpoint_interval = atoi(value); return(1); } /* END set_checkpoint_script() */ unsigned long mom_checkpoint_set_checkpoint_script(char *value) /* I */ { struct stat sbuf; log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "checkpoint_script", value); if ((stat(value, &sbuf) == -1) || !(sbuf.st_mode & S_IXUSR)) { /* file does not exist or is not executable */ return(0); /* error */ } strncpy(checkpoint_script_name, value, sizeof(checkpoint_script_name)); return(1); } /* END set_checkpoint_script() */ unsigned long mom_checkpoint_set_restart_script(char *value) /* I */ { struct stat sbuf; log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "restart_script", value); if ((stat(value, &sbuf) == -1) || !(sbuf.st_mode & S_IXUSR)) { /* file does not exist or is not executable */ return(0); /* error */ } strncpy(restart_script_name, value, sizeof(restart_script_name)); return(1); } /* END set_restart_script() */ unsigned long mom_checkpoint_set_checkpoint_run_exe_name(char *value) /* I */ { struct stat sbuf; log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "checkpoint_run_exe", value); if ((stat(value, &sbuf) == -1) || !(sbuf.st_mode & S_IXUSR)) { /* file does not exist or is not executable */ return(0); /* error */ } strncpy(checkpoint_run_exe_name, value, sizeof(checkpoint_run_exe_name)); return(1); } /* END set_checkpoint_run_exe() */ /* * get_jobs_default_checkpoint_dir - Fills in jobs default checkpoint path. 
*/ void get_jobs_default_checkpoint_dir( char *prefix, /* I */ char *defaultpath) /* O */ { strcpy(defaultpath, path_checkpoint); strcat(defaultpath, prefix); strcat(defaultpath, JOB_CHECKPOINT_SUFFIX); return; } /* END get_jobs_default_checkpoint_dir() */ /* ** Get jobs checkpoint directory, add jobs directory if needed. */ void get_chkpt_dir_to_use( job *pjob, char *chkpt_dir) { /* * Append jobs directory, if needed. */ char job_dir[MAXPATHLEN+1]; if ((!(pjob->ji_wattr[(int)JOB_ATR_checkpoint_dir].at_flags & ATR_VFLAG_SET)) || (checkpoint_system_type != CST_BLCR)) { /* No dir specified, use the default job checkpoint directory e.g. /var/spool/torque/checkpoint/42.host.domain.CK */ get_jobs_default_checkpoint_dir(pjob->ji_qs.ji_fileprefix, chkpt_dir); } else { sprintf(job_dir,"%s%s", pjob->ji_qs.ji_fileprefix, JOB_CHECKPOINT_SUFFIX); strcpy(chkpt_dir, pjob->ji_wattr[(int)JOB_ATR_checkpoint_dir].at_val.at_str); if ((strlen(pjob->ji_wattr[(int)JOB_ATR_checkpoint_dir].at_val.at_str) <= strlen(job_dir)) || (strcmp(job_dir, &pjob->ji_wattr[(int)JOB_ATR_checkpoint_dir].at_val.at_str[strlen(pjob->ji_wattr[(int)JOB_ATR_checkpoint_dir].at_val.at_str) - strlen(job_dir)]))) { if (chkpt_dir[strlen(chkpt_dir) - 1] != '/') { strcat(chkpt_dir, "/"); } strcat(chkpt_dir, job_dir); } } return; } /* END get_chkpt_dir_to_use() */ /* * replace_checkpoint_path - Replaces MOM_DEFAULT_CHECKPOINT_DIR in string * with the default checkpoint path. Returns 1 if path was changed else 0. 
*/ int replace_checkpoint_path( char *path) /* I */ { char *id = "replace_checkpoint_path"; char *ptr1; char *ptr2; char tmppath[MAXPATHLEN+1]; int len; int rtnval = 0; memcpy(tmppath, path, strlen(path)); ptr1 = strstr(path, MOM_DEFAULT_CHECKPOINT_DIR); ptr2 = strstr(tmppath, MOM_DEFAULT_CHECKPOINT_DIR); if (ptr1 != NULL) { ptr1++; ptr1 = strchr(ptr1,'$'); ptr1++; len = strlen(path_checkpoint); memcpy(ptr2, path_checkpoint, len); ptr2 += len; if ((path_checkpoint[strlen(path_checkpoint) - 1] == '/') && (ptr1[0] == '/')) { ptr1++; } strcpy(ptr2, ptr1); strcpy(path, tmppath); sprintf(log_buffer,"Converted filename is (%s)\n", path); log_ext(-1, id, log_buffer, LOG_DEBUG); rtnval = 1; } return (rtnval); } /* END replace_checkpoint_path() */ /* * in_remote_checkpoint_dir - Checks if path is in the remote checkpoint * directories list TRemChkptDirList. If it is then returns TRUE else FALSE * */ int in_remote_checkpoint_dir( char *ckpt_path) /* I */ { char *id = "in_remote_checkpoint_dir"; int dindex; /* * Is the checkpoint directory in the TRemChkptDirList */ for (dindex = 0;dindex < TMAX_RCDCOUNT;dindex++) { if (TRemChkptDirList[dindex] == NULL) { if ((LOGLEVEL >= 10) && (dindex == 0)) { sprintf(log_buffer, "NO remote checkpoint directories configured"); log_ext(-1, id, log_buffer, LOG_DEBUG); } break; } if (!strncasecmp(TRemChkptDirList[dindex], ckpt_path, strlen(TRemChkptDirList[dindex])) || !strcmp(TRemChkptDirList[dindex], "*")) { if (LOGLEVEL >= 10) { sprintf(log_buffer, "Checkpoint file %s matched in remote directory %s\n", ckpt_path, TRemChkptDirList[dindex]); log_ext(-1, id, log_buffer, LOG_DEBUG); } return (TRUE); } } /* END for (dindex) */ return (FALSE); } /* END in_remote_checkpoint_dir() */ /** * delete_blcr_checkpoint_files * * This routine is called to remove a checkpoint file / directory * * @param pjob Pointer to the job structure */ void delete_blcr_checkpoint_files( job *pjob) { static char id[] = "delete_blcr_checkpoint_files"; char 
namebuf[MAXPATHLEN+1]; if ((pjob->ji_wattr[(int)JOB_ATR_checkpoint_dir].at_flags & ATR_VFLAG_SET) == 0) { if (LOGLEVEL > 7) { sprintf(log_buffer, "No checkpoint directory specified for %s\n", pjob->ji_qs.ji_jobid); log_ext(-1, id, log_buffer, LOG_DEBUG); } return; } if (pjob->ji_wattr[(int)JOB_ATR_checkpoint_name].at_flags & ATR_VFLAG_SET) { /* * Get jobs checkpoint directory. */ get_chkpt_dir_to_use(pjob, namebuf); /* * we need to clean up the checkpoint job directory * but not if it is in a remotely mounted directory */ if (!in_remote_checkpoint_dir(namebuf)) { if (LOGLEVEL >= 7) { sprintf(log_buffer, "remtree for checkpoint %s\n",namebuf); log_ext(-1, id, log_buffer, LOG_DEBUG); } remtree(namebuf); } } return; } /* END delete_blcr_checkpoint_files() */ void get_blcr_chkpt( job_file_delete_info *jfdi, /* I */ char *chkpt_dir) /* O */ { char job_dir[MAXPATHLEN+1]; sprintf(job_dir,"%s%s", jfdi->prefix, JOB_CHECKPOINT_SUFFIX); strcpy(chkpt_dir, jfdi->checkpoint_dir); if ((strlen(jfdi->checkpoint_dir) <= strlen(job_dir)) || (strcmp(job_dir, &jfdi->checkpoint_dir[strlen(jfdi->checkpoint_dir) - strlen(job_dir)]))) { if (chkpt_dir[strlen(chkpt_dir) - 1] != '/') { strcat(chkpt_dir, "/"); } strcat(chkpt_dir, job_dir); } } /* END get_blcr_chkpt() */ void delete_blcr_files( job_file_delete_info *jfdi) { static char id[] = "delete_blcr_files"; char namebuf[MAXPATHLEN+1]; if (jfdi->checkpoint_dir == NULL) { if (LOGLEVEL > 7) { sprintf(log_buffer, "No checkpoint directory specified for %s\n", jfdi->jobid); log_ext(-1, id, log_buffer, LOG_DEBUG); } return; } /* Get jobs checkpoint directory. 
*/ get_blcr_chkpt(jfdi, namebuf); /* * we need to clean up the checkpoint job directory * but not if it is in a remotely mounted directory */ if (!in_remote_checkpoint_dir(namebuf)) { if (LOGLEVEL >= 7) { sprintf(log_buffer, "remtree for checkpoint %s\n", namebuf); log_ext(-1, id, log_buffer, LOG_DEBUG); } remtree(namebuf); } } /* END delete_blcr_files() */ /** * mom_checkpoint_delete_files * * This routine is called from the job_purge routine * which cleans up all files related to a job. * * @param pjob Pointer to the job structure * @see job_purge */ void mom_checkpoint_delete_files( job_file_delete_info *jfdi) { char namebuf[MAXPATHLEN+1]; if (checkpoint_system_type == CST_MACH_DEP) { /* delete any checkpoint file */ get_jobs_default_checkpoint_dir(jfdi->prefix, namebuf); remtree(namebuf); } else if (checkpoint_system_type == CST_BLCR) { delete_blcr_files(jfdi); } return; } /* END mom_checkpoint_delete_files() */ /** * mom_checkpoint_recover * * This routine is called from init_abort_jobs which in turn is called * on mom startup. The purpose is to recover jobs listed in the mom_priv/jobs * directory. * * This routine does not actually start the job. This happens in start_exec.c. * It's purpose is to remove a partially completed checkpoint directory, * signified by the name suffix of ".old". * * @param pjob Pointer to job data structure * @see init_abort_jobs */ void mom_checkpoint_recover( job *pjob) { char path[MAXPATHLEN + 1]; char oldp[MAXPATHLEN + 1]; struct stat statbuf; if (checkpoint_system_type == CST_MACH_DEP) { /* ** Check to see if a checkpoint.old dir exists. ** If so, remove the regular checkpoint dir ** and rename the old to the regular name. 
*/ get_jobs_default_checkpoint_dir(pjob->ji_qs.ji_fileprefix, path); strncpy(oldp, path, MAXPATHLEN); strncat(oldp, ".old", MAXPATHLEN); if (stat(oldp, &statbuf) == 0) { remtree(path); if (rename(oldp, path) == -1) remtree(oldp); } } return; } /** * mom_checkpoint_check_periodic_timer * * This routine is called from the main loop routine examine_all_running_jobs. * Each job that is checkpointable will have timer variables set up. * This routine checks the timer variables and if set and it is time * to do a checkpoint, fires the code that starts a checkpoint. * * @param pjob Pointer to the job structure * @see examine_all_running_jobs * @see main_loop */ void mom_checkpoint_check_periodic_timer( job *pjob) { resource *prwall; extern int start_checkpoint(); int rc; static resource_def *rdwall; /* see if need to checkpoint any job */ if (pjob->ji_checkpoint_time != 0) /* ji_checkpoint_time gets set below */ { if (rdwall == NULL) { rdwall = find_resc_def(svr_resc_def, "walltime", svr_resc_size); } if (rdwall != NULL) { prwall = find_resc_entry( &pjob->ji_wattr[(int)JOB_ATR_resc_used], rdwall); /* resource definition cput set in startup */ if (prwall && (prwall->rs_value.at_val.at_long >= pjob->ji_checkpoint_next)) { pjob->ji_checkpoint_next = prwall->rs_value.at_val.at_long + pjob->ji_checkpoint_time; if ((rc = start_checkpoint(pjob, 0, 0)) != PBSE_NONE) { sprintf(log_buffer, "Checkpoint failed, error %d", rc); message_job(pjob, StdErr, log_buffer); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } } } } return; } /* END mom_checkpoint_check_periodic_timer() */ /** * blcr_checkpoint_job * * This routine lauches the checkpoint script for a BLCR * checkpoint system. * currently only supports single process job, so a BLCR job will * only have one task associated with the job. 
* * @see start_checkpoint() - parent * * @returns PBSE_NONE if no error */ #define MAX_CONN_RETRY 3 int blcr_checkpoint_job( job *pjob, /* I */ int abort, /* I */ struct batch_request *preq) /* may be null */ { char *id = "blcr_checkpoint_job"; char sid[20]; char *arg[20]; char buf[1024]; char **ap; FILE *fs; char *cmd; int rc; int request_type = 0; char err_buf[4098]; char line[1028]; char namebuf[MAXPATHLEN+1]; int conn = -1; int err; int conn_fail = 0; struct attrl *attrib = NULL; time_t epoch; assert(pjob != NULL); assert(pjob->ji_wattr[(int)JOB_ATR_checkpoint_dir].at_val.at_str != NULL); /* * Get jobs checkpoint directory. */ get_chkpt_dir_to_use(pjob, namebuf); /* Make sure that the specified directory exists. */ if (mkdir(namebuf, 0755) == 0) { /* Change the owner of the checkpoint directory to be the user */ if (chown(namebuf, pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) { log_err(errno, id, "cannot change checkpoint directory owner"); } } /* if a checkpoint script is defined launch it */ if (checkpoint_script_name[0] == '\0') { log_err(PBSE_RMEXIST, id, "No checkpoint script defined"); if (preq != NULL) { req_reject(PBSE_RMEXIST,PBS_CHECKPOINT_MIGRATE,preq,NULL,NULL); } exit(PBSE_RMEXIST); } /* Checkpoint successful (assumed) */ pjob->ji_qs.ji_svrflags |= JOB_SVFLG_CHECKPOINT_FILE; job_save(pjob,SAVEJOB_FULL); /* to save resources_used so far */ sprintf(log_buffer,"checkpointed to %s / %s at %ld", namebuf, pjob->ji_wattr[(int)JOB_ATR_checkpoint_name].at_val.at_str, pjob->ji_wattr[(int)JOB_ATR_checkpoint_time].at_val.at_long); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); sprintf(sid,"%ld", pjob->ji_wattr[(int)JOB_ATR_session_id].at_val.at_long); arg[0] = checkpoint_script_name; arg[1] = sid; arg[2] = SET_ARG(pjob->ji_qs.ji_jobid); arg[3] = SET_ARG(pjob->ji_wattr[(int)JOB_ATR_euser].at_val.at_str); arg[4] = SET_ARG(pjob->ji_wattr[(int)JOB_ATR_egroup].at_val.at_str); arg[5] = 
SET_ARG(namebuf); arg[6] = SET_ARG(pjob->ji_wattr[(int)JOB_ATR_checkpoint_name].at_val.at_str); arg[7] = (abort) ? "15" /*abort*/ : "0" /*run/continue*/; arg[8] = SET_ARG(csv_find_value(pjob->ji_wattr[(int)JOB_ATR_checkpoint].at_val.at_str, "depth")); arg[9] = NULL; /* XXX this should be fixed to make sure there is no chance of a buffer overrun */ strcpy(buf,"checkpoint args:"); cmd = buf + strlen("checkpoint args: "); /* this extra space compared to above is intentional... */ for (ap = arg; *ap; ap++) { strcat(buf, " "); strcat(buf, *ap); } strcat(buf, " 2>&1 1>/dev/null"); log_ext(-1, id, buf, LOG_DEBUG); if (preq != NULL) { request_type = preq->rq_type; reply_ack(preq); } /* execv(arg[0], arg); */ /* change execv to popen so we can grab the stderr */ fs = popen(cmd, "r"); /* create a read pipe for the command */ rc = 0; if (fs == NULL) { sprintf(buf, "error executing checkpoint script"); log_err(errno, id, buf); rc = -1; } else { err_buf[0] = '\0'; while (fgets(line, 1024, fs) != NULL && strlen(err_buf) + strlen(line) + 1 < 4098) { strcat(err_buf, line); } rc = pclose(fs); if (rc != -1) { rc = WEXITSTATUS(rc); } if (rc != 0) { sprintf(buf, "checkpoint script returned value %d\n", rc); log_err(-1, id, buf); } } if (rc != 0) { /* * If the checkpoint script did not return data for the err_buf, * fill it in so we can show that something went wrong */ if (strlen(err_buf) == 0) { sprintf(err_buf,"Checkpoint script failed with return value of %d", rc); } /* checkpoint script returned a non-zero value. 
We assume the checkpoint failed */ /* remove checkpoint directory that was created for this checkpoint attempt */ sprintf(buf, "Checkpoint failed for job %s, removing checkpoint directory\n", pjob->ji_qs.ji_jobid); log_ext(-1, id, buf, LOG_DEBUG); delete_blcr_checkpoint_files(pjob); /* open a connection to the server */ while ((conn < 0) && (conn_fail < MAX_CONN_RETRY)) { conn = pbs_connect(pjob->ji_wattr[(int)JOB_ATR_at_server].at_val.at_str); if (conn < 0) { conn_fail++; sleep(1); if (conn_fail == MAX_CONN_RETRY) { sprintf(log_buffer,"Job %s failed %d times to get connection to %s", pjob->ji_qs.ji_jobid, MAX_CONN_RETRY, pjob->ji_wattr[(int)JOB_ATR_at_server].at_val.at_str); log_err(-1, id, log_buffer); } } } set_attr(&attrib, ATTR_comment, err_buf); err = pbs_alterjob(conn, pjob->ji_qs.ji_jobid, attrib, NULL); if (err != 0) { sprintf(buf, "pbs_alterjob requested on job %s failed (%d-%s)\n", pjob->ji_qs.ji_jobid, err, pbs_strerror(err)); log_err(-1, id, buf); if (err == PBSE_UNKJOBID) { /* TODO: GB - can the job exit while waiting for the checkpoint script to exit?? call log_err */ pbs_disconnect(conn); goto done; } } if (abort != 0) { /* * we need to tell the server to release the hold (abort is non-zero * which means we are trying to hold the job) */ /* * send release job request, the job will still be running, * so it shouldn't have any holds set so we will send "uos" * to clear all holds */ pbs_rlsjob(conn, pjob->ji_qs.ji_jobid, "uos", NULL); } /* END if (abort != 0) */ pbs_disconnect(conn); } /* END if (rc != 0) */ else { char timestr[80]; /* checkpoint script returned a zero value. 
We assume the checkpoint suceeded */ /* open a connection to the server */ while ((conn < 0) && (conn_fail < MAX_CONN_RETRY)) { conn = pbs_connect(pjob->ji_wattr[(int)JOB_ATR_at_server].at_val.at_str); if (conn < 0) { conn_fail++; sleep(1); if (conn_fail == MAX_CONN_RETRY) { sprintf(log_buffer,"Job %s failed %d times to get connection to %s", pjob->ji_qs.ji_jobid, MAX_CONN_RETRY, pjob->ji_wattr[(int)JOB_ATR_at_server].at_val.at_str); log_err(-1, id, log_buffer); } } } sprintf(timestr,"%ld", (long)pjob->ji_wattr[(int)JOB_ATR_checkpoint_time].at_val.at_long); epoch = (time_t)pjob->ji_wattr[(int)JOB_ATR_checkpoint_time].at_val.at_long; sprintf(err_buf,"Job %s was checkpointed and %s to %s/%s at %s", pjob->ji_qs.ji_jobid, (request_type == PBS_BATCH_HoldJob) ? "terminated" : "continued", namebuf, pjob->ji_wattr[(int)JOB_ATR_checkpoint_name].at_val.at_str, ctime(&epoch)); set_attr(&attrib, ATTR_comment, err_buf); set_attr(&attrib, ATTR_checkpoint_name, pjob->ji_wattr[(int)JOB_ATR_checkpoint_name].at_val.at_str); set_attr(&attrib, ATTR_checkpoint_time, timestr); err = pbs_alterjob(conn, pjob->ji_qs.ji_jobid, attrib, (request_type == PBS_BATCH_HoldJob) ? CHECKPOINTHOLD : CHECKPOINTCONT); if (err != 0) { sprintf(buf, "pbs_alterjob requested on job %s failed (%d-%s)\n", pjob->ji_qs.ji_jobid, err, pbs_strerror(err)); log_err(-1, id, buf); if (err == PBSE_UNKJOBID) { /* TODO: GB - can the job exit while waiting for the checkpoint script to exit?? call log_err */ pbs_disconnect(conn); /* * If we get an unknown jobid after succesfully doing a non-hold * checkpoint, then it is most likely the result of a periodic * checkpoint for a job that had a qdel -p done, so we get rid of * any local checkpoint / restart files */ if (request_type == 0) { delete_blcr_checkpoint_files(pjob); } goto done; } } pbs_disconnect(conn); if (rc == 0) { /* Normally, this is an empty routine and does nothing. 
*/ rc = site_mom_postchk(pjob,abort); } } done: exit (rc); } /* END blcr_checkpoint_job() */ /* * Checkpoint the job. * * If abort is TRUE, kill it too. Return a PBS error code. */ int mom_checkpoint_job( job *pjob, /* I */ int abort) /* I */ { int hasold = 0; int sesid = -1; int ckerr; struct stat statbuf; char path[MAXPATHLEN + 1]; char oldp[MAXPATHLEN + 1]; char file[MAXPATHLEN + 1]; task *ptask; assert(pjob != NULL); get_jobs_default_checkpoint_dir(pjob->ji_qs.ji_fileprefix, path); if (stat(path, &statbuf) == 0) { strcpy(oldp, path); /* file already exists, rename it */ strcat(oldp, ".old"); if (rename(path, oldp) < 0) { return(errno); } hasold = 1; } mkdir(path, 0755); strcpy(file, path); #ifdef _CRAY /* * if job is suspended and if is set, resume job first, * this is so job will be "Q"ueued and then back into "R"unning * when restarted. */ if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_Suspend) && abort) { for (ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { sesid = ptask->ti_qs.ti_sid; if (ptask->ti_qs.ti_status != TI_STATE_RUNNING) continue; /* XXX: What to do if some resume work and others don't? 
*/ if ((ckerr = resume(C_JOB, sesid)) == 0) { post_resume(pjob, ckerr); } else { sprintf(log_buffer, "checkpoint failed: errno=%d sid=%d", errno, sesid); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return(errno); } } } #endif /* _CRAY */ for (ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { sesid = ptask->ti_qs.ti_sid; if (ptask->ti_qs.ti_status != TI_STATE_RUNNING) continue; if (mach_checkpoint(ptask, file, abort) == -1) goto fail; } /* Checkpoint successful */ pjob->ji_qs.ji_svrflags |= JOB_SVFLG_CHECKPOINT_FILE; job_save(pjob, SAVEJOB_FULL); /* to save resources_used so far */ sprintf(log_buffer, "checkpointed to %s", path); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); if (hasold) remtree(oldp); return(PBSE_NONE); fail: /* A checkpoint has failed. Log and return error. */ ckerr = errno; sprintf(log_buffer, "checkpoint failed:errno=%d sid=%d", errno, sesid); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); /* ** See if any checkpoints worked and abort is set. ** If so, we need to restart these tasks so the whole job is ** still running. This has to wait until we reap the ** aborted task(s). */ if (abort) { return(PBSE_CKPSHORT); } /* Clean up files */ remtree(path); if (hasold) { if (rename(oldp, path) == -1) pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_CHECKPOINT_FILE; } if (ckerr == EAGAIN) { return(PBSE_CKPBSY); } return(ckerr); } /* END mom_checkpoint_job() */ /* * post_checkpoint - post processor for start_checkpoint() * * @see scan_for_terminated() - parent * * Called from scan_for_terminated() when found in ji_mompost; * * This sets the "has checkpoint image" bit in the job. 
* * job is referenced by parent after calling this routine - do not 'purge' * job from inside this routine */ void post_checkpoint( job *pjob, /* I - may be purged */ int ev) /* I */ { char path[MAXPATHLEN + 1]; DIR *dir; struct dirent *pdir; tm_task_id tid; task *ptask; int abort = pjob->ji_flags & MOM_CHECKPOINT_ACTIVE; exiting_tasks = 1; /* make sure we call scan_for_exiting() */ pjob->ji_flags &= ~MOM_CHECKPOINT_ACTIVE; if (ev == 0) { pjob->ji_qs.ji_svrflags |= JOB_SVFLG_CHECKPOINT_FILE; return; } /* since checkpointing failed, clear out checkpoint name and time */ pjob->ji_wattr[(int)JOB_ATR_checkpoint_name].at_flags = 0; pjob->ji_wattr[(int)JOB_ATR_checkpoint_time].at_flags = 0; /* ** If we get here, an error happened. Only try to recover ** if we had abort set. */ if (abort == 0) { return; } /* ** Set a flag for scan_for_exiting() to be able to ** deal with a failed checkpoint rather than doing ** the usual processing. */ pjob->ji_flags |= MOM_CHECKPOINT_POST; /* ** Set the TI_FLAGS_CHECKPOINT flag for each task that ** was checkpointed and aborted. 
*/ get_chkpt_dir_to_use(pjob, path); dir = opendir(path); if (dir == NULL) { return; } while ((pdir = readdir(dir)) != NULL) { if (pdir->d_name[0] == '.') continue; tid = atoi(pdir->d_name); if (tid == 0) continue; ptask = task_find(pjob,tid); if (ptask == NULL) continue; ptask->ti_flags |= TI_FLAGS_CHECKPOINT; } /* END while ((pdir = readdir(dir)) != NULL) */ closedir(dir); return; } /* END post_checkpoint() */ /** * start_checkpoint - start a checkpoint going * * checkpoint done from a child because it takes a while * * @see blcr_checkpoint() - child * @see start_checkpoint() - parent */ int start_checkpoint( job *pjob, int abort, /* I - boolean - 0 or 1 */ struct batch_request *preq) /* may be null */ { pid_t pid; char *id = "start_checkpoint"; int rc = PBSE_NONE; char name_buffer[MAXPATHLEN + 1]; time_t time_now; time_now = time((time_t *)0); switch (checkpoint_system_type) { case CST_MACH_DEP: /* NO-OP */ break; case CST_BLCR: /* Build the name of the checkpoint file before forking to the child because * we want this name to persist and this won't work if we are the child. * Notice that the ATR_VFLAG_SEND bit is not set. We don't want this to go * to the pbs_server until the checkpoint has completed successfully. */ sprintf(name_buffer,"ckpt.%s.%d", pjob->ji_qs.ji_jobid, (int)time_now); decode_str(&pjob->ji_wattr[(int)JOB_ATR_checkpoint_name], NULL, NULL, name_buffer); pjob->ji_wattr[(int)JOB_ATR_checkpoint_name].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY; /* Set the checkpoint time so can determine if the checkpoint is recent */ pjob->ji_wattr[(int)JOB_ATR_checkpoint_time].at_val.at_long = (long)time_now; pjob->ji_wattr[(int)JOB_ATR_checkpoint_time].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY; /* For BLCR, there must be a directory name in the job attributes. */ if (!(pjob->ji_wattr[(int)JOB_ATR_checkpoint_dir].at_flags & ATR_VFLAG_SET)) { /* No dir specified, use the default job checkpoint directory e.g. 
/var/spool/torque/checkpoint/42.host.domain.CK */ get_jobs_default_checkpoint_dir(pjob->ji_qs.ji_fileprefix, name_buffer); decode_str(&pjob->ji_wattr[(int)JOB_ATR_checkpoint_dir],NULL,NULL,name_buffer); } break; case CST_NONE: default: return(PBSE_NOSUP); /* no checkpoint, reject request */ /*NOTREACHED*/ break; } /* now set up as child of MOM */ pid = fork_me((preq == NULL) ? -1 : preq->rq_conn); if (pid > 0) { /* parent */ /* MOM_CHECKPOINT_ACTIVE prevents scan_for_exiting from triggering obits while job is checkpointing. */ pjob->ji_flags |= MOM_CHECKPOINT_ACTIVE; pjob->ji_momsubt = pid; /* record pid in job for when child terminates */ /* Set the address of a function to execute in scan_for_terminated */ pjob->ji_mompost = (int (*)())post_checkpoint; if (preq) free_br(preq); /* child will send reply */ } else if (pid < 0) { /* error on fork */ log_err(errno, id, "cannot fork child process for checkpoint"); return(PBSE_SYSTEM); } else { /* child - does the checkpoint */ switch (checkpoint_system_type) { case CST_MACH_DEP: rc = mom_checkpoint_job(pjob,abort); break; case CST_BLCR: /* we don't return from here, so we can do checkpoint in this process id */ rc = blcr_checkpoint_job(pjob,abort,preq); break; } if (rc == PBSE_NONE) { rc = site_mom_postchk(pjob,abort); /* Normally, this is an empty routine and does nothing. */ } if (preq != NULL) { /* rc may be 0, req_reject is used to pass auxcode */ req_reject(rc,PBS_CHECKPOINT_MIGRATE,preq,NULL,NULL); /* BAD reject is used to send OK??? */ } exit(rc); /* zero exit tells main checkpoint ok */ } return(PBSE_NONE); /* parent return */ } /* END start_checkpoint() */ /* ** Restart each task which has exited and has TI_FLAGS_CHECKPOINT turned on. ** If all tasks have been restarted, turn off MOM_CHECKPOINT_POST. 
*/ void checkpoint_partial( job *pjob) { static char id[] = "checkpoint_partial"; char namebuf[MAXPATHLEN]; task *ptask; int texit = 0; assert(pjob != NULL); get_jobs_default_checkpoint_dir(pjob->ji_qs.ji_fileprefix, namebuf); for (ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { /* ** See if the task was marked as one of those that did ** actually checkpoint. */ if ((ptask->ti_flags & TI_FLAGS_CHECKPOINT) == 0) continue; texit++; /* ** Now see if it was reaped. We don't want to ** fool with it until we see it die. */ if (ptask->ti_qs.ti_status != TI_STATE_EXITED) continue; texit--; if (mach_restart(ptask, namebuf) == -1) { pjob->ji_flags &= ~MOM_CHECKPOINT_POST; kill_job(pjob, SIGKILL, id, "failed to restart"); return; } ptask->ti_qs.ti_status = TI_STATE_RUNNING; ptask->ti_flags &= ~TI_FLAGS_CHECKPOINT; task_save(ptask); } if (texit == 0) { char oldname[MAXPATHLEN]; struct stat statbuf; /* ** All tasks should now be running. ** Turn off MOM_CHECKPOINT_POST flag so job is back to where ** it was before the bad checkpoint attempt. */ pjob->ji_flags &= ~MOM_CHECKPOINT_POST; /* ** Get rid of incomplete checkpoint directory and ** move old checkpoint dir back to regular if it exists. 
*/ remtree(namebuf); strcpy(oldname, namebuf); strcat(oldname, ".old"); if (stat(oldname, &statbuf) == 0) { if (rename(oldname, namebuf) == -1) pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_CHECKPOINT_FILE; } } } /* END checkpoint_partial() */ /* BLCR version of restart */ int blcr_restart_job( job *pjob) { char *id = "blcr_restart_job"; int pid; char sid[20]; char *arg[20]; extern char restart_script_name[MAXPATHLEN + 1]; task *ptask; char buf[1024]; char namebuf[MAXPATHLEN + 1]; char restartfile[MAXPATHLEN + 1]; char script_buf[MAXPATHLEN + 1]; char **ap; #ifdef USEJOBCREATE uint64_t job_id; #endif /* USEJOBCREATE */ /* if a restart script is defined launch it */ if (restart_script_name[0] == '\0') { log_err(PBSE_RMEXIST, id, "No restart script defined"); return(PBSE_RMEXIST); } /* BLCR is not for parallel jobs, there can only be one task in the job. */ ptask = (task *) GET_NEXT(pjob->ji_tasks); if (ptask == NULL) { /* turns out if we are restarting a complete job then ptask will be null and we need to create a task We'll just create one task*/ if ((ptask = pbs_task_create(pjob, TM_NULL_TASK)) == NULL) { log_err(PBSE_RMNOPARAM, id, "Job has no tasks"); return(PBSE_RMNOPARAM); } strcpy(ptask->ti_qs.ti_parentjobid, pjob->ji_qs.ji_jobid); ptask->ti_qs.ti_parentnode = 0; ptask->ti_qs.ti_parenttask = 0; ptask->ti_qs.ti_task = 0; } #ifdef USEJOBCREATE /* * Get a job id from the system */ job_id = get_jobid(pjob->ji_qs.ji_jobid); pjob->ji_wattr[(int)JOB_ATR_pagg_id].at_val.at_ll = job_id; pjob->ji_wattr[(int)JOB_ATR_pagg_id].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY; #endif /* USEJOBCREATE */ /* launch the script and return success */ pid = fork(); if (pid < 0) { /* fork failed */ return(PBSE_RMSYSTEM); } else if (pid > 0) { /* parent */ ptask->ti_qs.ti_sid = pid; /* Apparently torque doesn't do anything with the session ID that we pass back here... 
*/ ptask->ti_qs.ti_status = TI_STATE_RUNNING; task_save(ptask); return(PBSE_NONE); } else if (pid == 0) { /* child: execv the script */ /* if there are missing .OU or .ER files create them, they were probably empty and server didn't send them */ /* TODO: check return value? */ create_missing_files(pjob); /* * Get jobs checkpoint directory. */ get_chkpt_dir_to_use(pjob, namebuf); /* Change the owner of the .SC to be the user */ strcpy(script_buf, path_jobs); strcat(script_buf, pjob->ji_qs.ji_fileprefix); strcat(script_buf, JOB_SCRIPT_SUFFIX); if (chown(script_buf, pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) { sprintf(log_buffer,"cannot change owner for file %s", script_buf); log_err(errno, id, log_buffer); } strcpy(restartfile, namebuf); strcat(restartfile, "/"); strcat(restartfile, pjob->ji_wattr[(int)JOB_ATR_checkpoint_name].at_val.at_str); /* Change the owner of the checkpoint restart file to be the user */ if (chown(restartfile, pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) { log_err(errno, id, "cannot change checkpoint restart file owner"); } sprintf(sid, "%ld", pjob->ji_wattr[(int)JOB_ATR_session_id].at_val.at_long); arg[0] = restart_script_name; arg[1] = sid; arg[2] = SET_ARG(pjob->ji_qs.ji_jobid); arg[3] = SET_ARG(pjob->ji_wattr[(int)JOB_ATR_euser].at_val.at_str); arg[4] = SET_ARG(pjob->ji_wattr[(int)JOB_ATR_egroup].at_val.at_str); arg[5] = SET_ARG(namebuf); arg[6] = SET_ARG(pjob->ji_wattr[(int)JOB_ATR_checkpoint_name].at_val.at_str); arg[7] = NULL; strcpy(buf, "restart args:"); for (ap = arg;*ap;ap++) { strcat(buf, " "); strcat(buf, *ap); } log_ext(-1, id, buf, LOG_DEBUG); log_close(0); if (lockfds >= 0) { close(lockfds); lockfds = -1; } net_close(-1); /* set us up with a new session */ pid = set_job(pjob, NULL); if (pid < 0) { perror("setsid"); exit(-1); } #ifdef ENABLE_CSA /* * Add a workload management start record */ add_wkm_start(job_id, pjob->ji_qs.ji_jobid); #endif /* ENABLE_CSA */ 
execv(arg[0], arg); } /* END if (pid == 0) */ return(PBSE_NONE); } /* END blcr_restart_job() */ /* start each task based on task checkpoint records located job-specific checkpoint directory */ int mom_restart_job(job *pjob) { static char id[] = "mom_restart_job"; char path[MAXPATHLEN]; int i; char namebuf[MAXPATHLEN]; char *filnam; DIR *dir; struct dirent *pdir; tm_task_id taskid; task *ptask; int tcount = 0; long mach_restart(task *, char *path); get_jobs_default_checkpoint_dir(pjob->ji_qs.ji_fileprefix, namebuf); if ((dir = opendir(path)) == NULL) { sprintf(log_buffer, "opendir %s", path); log_err(errno, id, log_buffer); return(PBSE_RMEXIST); } strcpy(namebuf, path); strcat(namebuf, "/"); i = strlen(namebuf); filnam = &namebuf[i]; while ((pdir = readdir(dir)) != NULL) { if (strlen(pdir->d_name) <= 2) continue; if ((taskid = (tm_task_id)atoi(pdir->d_name)) == 0) { sprintf(log_buffer, "%s: garbled filename %s", pjob->ji_qs.ji_jobid, pdir->d_name); goto fail; } if ((ptask = task_find(pjob, taskid)) == NULL) { sprintf(log_buffer, "%s: task %d not found", pjob->ji_qs.ji_jobid, (int)taskid); goto fail; } strcpy(filnam, pdir->d_name); if (mach_restart(ptask, namebuf) == -1) { sprintf(log_buffer, "%s: task %d failed from file %s", pjob->ji_qs.ji_jobid, (int)taskid, namebuf); goto fail; } ptask->ti_qs.ti_status = TI_STATE_RUNNING; if (LOGLEVEL >= 6) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "task set to running (mom_restart_job)"); } task_save(ptask); tcount++; } closedir(dir); sprintf(log_buffer, "Restarted %d tasks", tcount); LOG_EVENT( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); return(PBSE_NONE); fail: log_err(errno, id, log_buffer); closedir(dir); return(PBSE_RMEXIST); } /* END mom_restart_job() */ /** * mom_checkpoint_init_job_periodic_timer * * The routine is called from TMomFinalizeJob1 in start_exec.c. * This code initializes checkpoint related variables in the job struct. 
* * @param pjob Pointer to job structure * @see TMomFinalizeJob1 */ void mom_checkpoint_init_job_periodic_timer(job *pjob) { attribute *pattr; char *vp; /* Should we set up the job for periodic checkpoint? */ pattr = &pjob->ji_wattr[(int)JOB_ATR_checkpoint]; if ((pattr->at_flags & ATR_VFLAG_SET) && (csv_find_string(pattr->at_val.at_str, "c") || csv_find_string(pattr->at_val.at_str, "periodic"))) { /* Yes, what is the interval in minutes. */ if ((vp = csv_find_value(pattr->at_val.at_str, "c")) || (vp = csv_find_value(pattr->at_val.at_str, "interval"))) { /* has checkpoint time (in minutes), convert to seconds */ pjob->ji_checkpoint_time = atoi(vp) * 60; pjob->ji_checkpoint_next = pjob->ji_checkpoint_time; } else { /* pick a default number of minutes */ pjob->ji_checkpoint_time = default_checkpoint_interval * 60; pjob->ji_checkpoint_next = pjob->ji_checkpoint_time; } } } /** * mom_checkpoint_job_has_checkpoint * * The routine is called from TMomFinalizeJob1 in start_exec.c. * It checks to see if the job has a checkpoint file to restart from. * * @param pjob Pointer to job structure * @see TMomFinalizeJob1 */ int mom_checkpoint_job_has_checkpoint( job *pjob) { /* Has the job has been checkpointed? */ switch (checkpoint_system_type) { case CST_MACH_DEP: if (pjob->ji_qs.ji_svrflags & (JOB_SVFLG_CHECKPOINT_FILE | JOB_SVFLG_CHECKPOINT_MIGRATEABLE)) { char buf[MAXPATHLEN + 2]; struct stat sb; /* Does the checkpoint directory exist? */ if (pjob->ji_wattr[(int)JOB_ATR_checkpoint_dir].at_flags & ATR_VFLAG_SET) { /* The job has a checkpoint directory specified, use it. */ strcpy(buf, pjob->ji_wattr[(int)JOB_ATR_checkpoint_dir].at_val.at_str); } else { /* Otherwise, use the default job checkpoint directory /var/spool/torque/checkpoint/42.host.domain.CK */ get_jobs_default_checkpoint_dir(pjob->ji_qs.ji_fileprefix, buf); } if (stat(buf, &sb) != 0) /* stat(buf) tests if the checkpoint directory exists */ { /* We thought there was a checkpoint but the directory was not there. 
*/ pjob->ji_qs.ji_svrflags &= ~(JOB_SVFLG_CHECKPOINT_FILE | JOB_SVFLG_CHECKPOINT_MIGRATEABLE); break; } } if (LOGLEVEL >= 7) { log_ext(-1, "mom_checkpoint_job_has_checkpoint", "TRUE", LOG_DEBUG); } return(TRUE); /* Yes, there is a checkpoint. */ break; case CST_BLCR: if (pjob->ji_wattr[(int)JOB_ATR_checkpoint_name].at_flags & ATR_VFLAG_SET) { if (LOGLEVEL >= 7) { log_ext(-1, "mom_checkpoint_job_has_checkpoint", "TRUE", LOG_DEBUG); } return(TRUE); } break; } if (LOGLEVEL >= 7) { log_ext(-1, "mom_checkpoint_job_has_checkpoint", "FALSE", LOG_DEBUG); } return(FALSE); /* No checkpoint attribute on job. */ } /** * mom_checkpoint_start_restart * * The routine is called from TMomFinalizeJob1 in start_exec.c. * This code initializes checkpoint related variables in the job struct. * If there is a checkpoint file, the job is restarted from this image. * * @param pjob Pointer to job structure * @see TMomFinalizeJob1 */ int mom_checkpoint_start_restart( job *pjob) { /* static char *id = "mom_checkpoint_start_restart"; */ int rc = PBSE_NONE; /* At this point we believe that there is a checkpoint image, try to restart it. */ switch (checkpoint_system_type) { case CST_MACH_DEP: /* perform any site required setup before restart, normally empty and does nothing */ if ((rc = site_mom_prerst(pjob)) != PBSE_NONE) { return(rc); } rc = mom_restart_job(pjob); /* Iterate over files in checkpoint dir, restarting all files found. 
*/ break; case CST_BLCR: /* NOTE: partition creation handled in blcr_restart_job() */ /* make sure we recreate the nodes file, if needed */ if (pjob->ji_flags & MOM_HAS_NODEFILE) { if (write_nodes_to_file(pjob) == -1) { return(FAILURE); } if (write_gpus_to_file(pjob) == -1) { return(FAILURE); } } /* perform any site required setup before restart, normally empty and does nothing */ if ((rc = site_mom_prerst(pjob)) != PBSE_NONE) { return(rc); } rc = blcr_restart_job(pjob); break; case CST_NONE: default: return(PBSE_NOSUP); /*NOTREACHED*/ break; } return(rc); } /* END mom_checkpoint_start_restart() */ /* this file creates missing stderr/stdout files before restarting the checkpointed job. This was designed for BLCR checkpointing. empty .OU or .ER files are not retained by the server, so if we are restarting a checkpointed job then they will not get sent back out to use. the blcr restart command will expect these files to exist, even if empty. If any expected files are missing we create them here */ /* TODO: this needs to be modified to work with user .pbs_spool directories */ int create_missing_files(job *pjob) { int should_have_stderr; int should_have_stdout; attribute *pattr; char *pstr; char *namebuf; int bufsize; int files_created = 0; int fd; should_have_stderr = TRUE; should_have_stdout = TRUE; pattr = &pjob->ji_wattr[(int)JOB_ATR_join]; if (pattr->at_flags & ATR_VFLAG_SET) { pstr = pattr->at_val.at_str; if ((pstr != NULL) && (*pstr != '\0') && (*pstr != 'n')) { /* if not the first letter, and in list - is joined */ if ((*pstr != 'e') && (strchr(pstr + 1, (int)'e'))) { should_have_stderr = FALSE; /* being joined */ } else if ((*pstr != 'o') && (strchr(pstr + 1, (int)'o'))) { should_have_stdout = FALSE; } } } if (should_have_stdout) { bufsize = strlen(pjob->ji_qs.ji_fileprefix) + strlen(path_spool) + strlen(JOB_STDOUT_SUFFIX) + 1; namebuf = malloc(bufsize * sizeof(char)); if (namebuf == NULL) { return -1; } strcpy(namebuf, path_spool); strcat(namebuf, 
pjob->ji_qs.ji_fileprefix); strcat(namebuf, JOB_STDOUT_SUFFIX); if (access(namebuf, F_OK) != 0) { if ((fd = creat(namebuf, S_IRUSR | S_IWUSR)) > 0) { if (fchown(fd, pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) { log_err(errno, "create_missing_files", "cannot change file owner"); } close(fd); ++files_created; } else { /* couldn't create the file, why could this happen, TODO: what should we do? */ } } free(namebuf); } if (should_have_stderr) { bufsize = strlen(pjob->ji_qs.ji_fileprefix) + strlen(path_spool) + strlen(JOB_STDOUT_SUFFIX) + 1; namebuf = malloc(bufsize * sizeof(char)); if (namebuf == NULL) { return -1; } strcpy(namebuf, path_spool); strcat(namebuf, pjob->ji_qs.ji_fileprefix); strcat(namebuf, JOB_STDERR_SUFFIX); if (access(namebuf, F_OK) != 0) { if ((fd = creat(namebuf, S_IRUSR | S_IWUSR)) > 0) { if (fchown(fd, pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) { log_err(errno, "create_missing_files", "cannot change file ownership"); } close(fd); ++files_created; } else { /* couldn't create the file, why could this happen, TODO: what should we do? */ } } free(namebuf); } return files_created; }