#include "license_pbs.h" /* See here for the software license */ /* * The entry point function for MOM. */ #include /* the master config generated by configure */ #include "mom_main.h" #include #include #include #include #include #include #ifdef _CRAY #include #endif /* _CRAY */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if (PLOCK_DAEMONS & 4) #include #endif /* PLOCK_DAEMONS */ #include #include #include #ifdef _CRAY #include #include #include #endif /* _CRAY */ #include #include #include #include #include "libpbs.h" #include "pbs_ifl.h" #include "server_limits.h" #include "list_link.h" #include "attribute.h" #include "resource.h" #include "pbs_job.h" #include "mom_mach.h" #include "mom_func.h" #include "mom_comm.h" #include "svrfunc.h" #include "pbs_error.h" #include "log.h" #include "../lib/Liblog/pbs_log.h" #include "../lib/Liblog/log_event.h" #include "../lib/Liblog/chk_file_sec.h" #include "../lib/Liblog/setup_env.h" #include "../lib/Libnet/lib_net.h" /* socket_avail_bytes_on_descriptor */ #include "../lib/Libifl/lib_ifl.h" #include "net_connect.h" #include "dis.h" #include "dis_init.h" #include "resmon.h" #include "pbs_nodes.h" #include "dis.h" #include "csv.h" #include "utils.h" #include "u_tree.h" #ifdef PENABLE_LINUX26_CPUSETS #include "pbs_cpuset.h" #include "node_internals.hpp" #endif #include "threadpool.h" #include "mom_hierarchy.h" #include "../lib/Libutils/u_lock_ctl.h" /* lock_init */ #include "mom_server.h" #include "mom_job_func.h" /* mom_job_purge */ #include "net_cache.h" #include "mom_job_cleanup.h" #include "mcom.h" #include "mom_server_lib.h" /* shutdown_to_server */ #ifdef NOPOSIXMEMLOCK #undef _POSIX_MEMLOCK #endif /* NOPOSIXMEMLOCK */ #ifdef _POSIX_MEMLOCK #include #endif /* _POSIX_MEMLOCK */ #ifdef HAVE_SYS_STATVFS_H #include #endif #define MAX_UPDATES_BEFORE_SENDING 20 #define PMOMTCPTIMEOUT 60 /* duration in seconds mom TCP requests will block */ 
#define TCP_READ_PROTO_TIMEOUT 2 #define DEFAULT_JOB_EXIT_WAIT_TIME 600 #define MAX_JOIN_WAIT_TIME 600 #define RESEND_WAIT_TIME 300 /* Global Data Items */ int MOMIsLocked = 0; int MOMIsPLocked = 0; int ServerStatUpdateInterval = DEFAULT_SERVER_STAT_UPDATES; int CheckPollTime = CHECK_POLL_TIME; int ForceServerUpdate = 0; int verbositylevel = 0; double cputfactor = 1.00; unsigned int default_server_port = 0; int exiting_tasks = 0; float ideal_load_val = -1.0; int internal_state = 0; /* info useful when analyzing core file */ char Torque_Info_Version[] = PACKAGE_VERSION; char Torque_Info_Version_Revision[] = GIT_HASH; char Torque_Info_Component[] = "pbs_mom"; char Torque_Info_SysVersion[BUF_SIZE]; /* mom data items */ #ifdef NUMA_SUPPORT int num_node_boards; nodeboard node_boards[MAX_NODE_BOARDS]; int numa_index; #else char path_meminfo[MAX_LINE]; #endif extern pthread_mutex_t log_mutex; int thread_unlink_calls = FALSE; /* by default, enforce these policies */ int ignwalltime = 0; int ignmem = 0; int igncput = 0; int ignvmem = 0; /* end policies */ int spoolasfinalname = 0; int maxupdatesbeforesending = MAX_UPDATES_BEFORE_SENDING; char *apbasil_path = NULL; char *apbasil_protocol = NULL; int reject_job_submit = 0; int attempttomakedir = 0; int reduceprologchecks; int lockfds = -1; int multi_mom = 0; time_t loopcnt; /* used for MD5 calc */ float max_load_val = -1.0; int hostname_specified = 0; char mom_host[PBS_MAXHOSTNAME + 1]; char mom_alias[PBS_MAXHOSTNAME + 1]; char TMOMRejectConn[MAXLINE]; /* most recent rejected connection */ char mom_short_name[PBS_MAXHOSTNAME + 1]; int num_var_env; bool received_cluster_addrs; time_t requested_cluster_addrs; time_t first_update_time = 0; char *path_epilog; char *path_epilogp; char *path_epiloguser; char *path_epiloguserp; char *path_epilogpdel; char *path_jobs; char *path_prolog; char *path_prologp; char *path_prologuser; char *path_prologuserp; char *path_mom_hierarchy; char *path_spool; char *path_undeliv; char *path_aux; 
char *path_home = (char *)PBS_SERVER_HOME;
char *mom_home;
extern dynamic_string *mom_status;
extern int multi_mom;
char *path_layout;
extern char *msg_daemonname; /* for logs */
extern char *msg_info_mom; /* Mom information message */
gid_t pbsgroup;
uid_t pbsuser;
unsigned int pbs_mom_port = 0;
unsigned int pbs_rm_port = 0;
tlist_head mom_polljobs; /* jobs that must have resource limits polled */
tlist_head svr_newjobs; /* jobs being sent to MOM */
tlist_head svr_alljobs; /* all jobs under MOM's control */
tlist_head mom_varattrs; /* variable attributes */
int termin_child = 0; /* boolean - one or more children need to be terminated this iteration */
int exec_with_exec = 0;
time_t time_now = 0;
time_t last_poll_time = 0;
extern tlist_head svr_requests;
extern struct var_table vtable; /* see start_exec.c */
double wallfactor = 1.00;
long log_file_max_size = 0;
long log_file_roll_depth = 1;
int job_oom_score_adjust = 0; /* no oom score adjust by default */
int mom_oom_immunize = 1; /* make pbs_mom processes immune? NOTE(review): the
                             initializer is 1, yet this comment used to read
                             "no by default" - confirm the intended default */
int job_exit_wait_time = DEFAULT_JOB_EXIT_WAIT_TIME;
int max_join_job_wait_time = MAX_JOIN_WAIT_TIME;
int resend_join_job_wait_time = RESEND_WAIT_TIME;
int mom_hierarchy_retry_time = NODE_COMM_RETRY_TIME;
time_t last_log_check;
char *nodefile_suffix = NULL; /* suffix to append to each host listed in job host file */
char *submithost_suffix = NULL; /* suffix to append to submithost for interactive jobs */
char *TNoSpoolDirList[TMAX_NSDCOUNT];
char *TRemChkptDirList[TMAX_RCDCOUNT];
char JobsToResend[MAX_RESEND_JOBS][PBS_MAXSVRJOBID+1];
resizable_array *exiting_job_list;
resizable_array *things_to_resend;
char *AllocParCmd = NULL; /* (alloc) */
int src_login_batch = TRUE;
int src_login_interactive = TRUE;
mom_hierarchy_t *mh;
char jobstarter_exe_name[MAXPATHLEN + 1];
int jobstarter_set = 0;
#ifdef PENABLE_LINUX26_CPUSETS
node_internals internal_layout;
hwloc_topology_t topology = NULL; /* system topology */
int memory_pressure_threshold = 0; /* 0: off, >0: check and kill */
short memory_pressure_duration = 0; /* 0: off, >0: check and kill */
int MOMConfigUseSMT = 1; /* 0: off, 1: on */
#endif
int is_reporter_mom = FALSE;
int is_login_node = FALSE;

/* externs */

char *server_alias = NULL;
extern unsigned int pe_alarm_time;
extern long MaxConnectTimeout;
extern resizable_array *received_statuses;
extern hash_table_t *received_table;
time_t pbs_tcp_timeout = PMOMTCPTIMEOUT;
char tmpdir_basename[MAXPATHLEN]; /* for $TMPDIR */
char rcp_path[MAXPATHLEN];
char rcp_args[MAXPATHLEN];
char xauth_path[MAXPATHLEN];
time_t LastServerUpdateTime = 0; /* NOTE: all servers updated together */
int UpdateFailCount = 0;
time_t MOMStartTime = 0;
int MOMPrologTimeoutCount;
int MOMPrologFailureCount;
char MOMConfigVersion[64];
char MOMUNameMissing[64];
int MOMConfigDownOnError = 0;
int MOMConfigRestart = 0;
int MOMConfigRReconfig = 0;
long system_ncpus = 0;
char *auto_ideal_load = NULL;
char *auto_max_load = NULL;
#ifdef NVIDIA_GPUS
int MOMNvidiaDriverVersion = 0;
int use_nvidia_gpu = TRUE;
#endif /* NVIDIA_GPUS */

#define TMAX_JE 64
pjobexec_t TMOMStartInfo[TMAX_JE];

/* prototypes */

void sort_paths();
void resend_things();
extern void add_resc_def(char *, char *);
extern void mom_server_all_diag(std::stringstream &output);
extern void mom_server_all_init(void);
extern void mom_server_all_update_stat(void);
extern void mom_server_all_update_gpustat(void);
extern int mark_for_resend(job *);
extern int mom_server_add(const char *name);
extern int mom_server_count;
extern int post_epilogue(job *, int);
extern int mom_checkpoint_init(void);
extern void mom_checkpoint_check_periodic_timer(job *pjob);
extern void mom_checkpoint_set_directory_path(const char *str);
#ifdef NVIDIA_GPUS
#ifdef NVML_API
extern int init_nvidia_nvml();
extern int shut_nvidia_nvml();
#endif /* NVML_API */
extern int check_nvidia_setup();
#endif /* NVIDIA_GPUS */
int send_join_job_to_a_sister(job *pjob, int stream, eventent *ep, tlist_head phead, int node_id);
void prepare_child_tasks_for_delete();
static void mom_lock(int fds, int op);
#ifdef NUMA_SUPPORT
int setup_nodeboards();
#else
#ifdef PENABLE_LINUX26_CPUSETS
void recover_cpuset_reservation(job &pjob);
#endif
#endif /* NUMA_SUPPORT */

/* Local Data Items */

static char *log_file = NULL;

/* values taken by mom_run_state; MOM_RUN_STATE_LAST is the final enumerator */
enum PMOMStateEnum
  {
  MOM_RUN_STATE_RUNNING,
  MOM_RUN_STATE_EXIT,
  MOM_RUN_STATE_KILLALL,
  MOM_RUN_STATE_RESTART,
  MOM_RUN_STATE_LAST
  };

static enum PMOMStateEnum mom_run_state;
static int recover = JOB_RECOV_RUNNING;
static int recover_set = FALSE;
static int call_hup = 0;
static int nconfig;
static char *path_log;

/* singly linked list node holding one struct config by value */
struct config_list
  {
  struct config c;
  struct config_list *c_link;
  };

/* NOTE: must adjust RM_NPARM in resmom.h to be larger than number of parameters specified below */

/* handlers for the config directives listed in special[] below */
static unsigned long setxauthpath(const char *);
static unsigned long setrcpcmd(const char *);
static unsigned long setpbsclient(const char *);
static unsigned long configversion(const char *);
static unsigned long cputmult(const char *);
static unsigned long setallocparcmd(const char *);
static unsigned long setidealload(const char *);
static unsigned long setignwalltime(const char *);
static unsigned long setignmem(const char *);
static unsigned long setigncput(const char *);
static unsigned long setignvmem(const char *);
static unsigned long setlogevent(const char *);
static unsigned long setloglevel(const char *);
static unsigned long setumask(const char *);
static unsigned long setpreexec(const char *);
static unsigned long setmaxload(const char *);
static unsigned long setenablemomrestart(const char *);
static unsigned long prologalarm(const char *);
static unsigned long restricted(const char *);
static unsigned long jobstartblocktime(const char *);
static unsigned long usecp(const char *);
static unsigned long wallmult(const char *);
static unsigned long setpbsserver(const char *);
static unsigned long setnodecheckscript(const char *);
unsigned long setnodecheckinterval(const char *);
static unsigned long settimeout(const char *);
extern unsigned long mom_checkpoint_set_checkpoint_interval(const char *);
extern unsigned long mom_checkpoint_set_checkpoint_script(const char *);
extern unsigned long mom_checkpoint_set_restart_script(const char *);
extern unsigned long mom_checkpoint_set_checkpoint_run_exe_name(const char *);
static unsigned long setdownonerror(const char *);
static unsigned long setstatusupdatetime(const char *);
static unsigned long setcheckpolltime(const char *);
static unsigned long settmpdir(const char *);
static unsigned long setlogfilemaxsize(const char *);
static unsigned long setlogfilerolldepth(const char *);
static unsigned long setlogfilesuffix(const char *);
static unsigned long setlogdirectory(const char *);
static unsigned long setlogkeepdays(const char *);
static unsigned long setvarattr(const char *);
static unsigned long setautoidealload(const char *);
static unsigned long setautomaxload(const char *);
static unsigned long setnodefilesuffix(const char *);
static unsigned long setnospooldirlist(const char *);
static unsigned long setmomhost(const char *);
static unsigned long setrreconfig(const char *);
static unsigned long setsourceloginbatch(const char *);
static unsigned long setsourcelogininteractive(const char *);
static unsigned long setspoolasfinalname(const char *);
static unsigned long setattempttomakedir(const char *);
static unsigned long setremchkptdirlist(const char *);
static unsigned long setmaxconnecttimeout(const char *);
unsigned long aliasservername(const char *);
unsigned long jobstarter(const char *value);
#ifdef PENABLE_LINUX26_CPUSETS
static unsigned long setusesmt(const char *);
static unsigned long setmempressthr(const char *);
static unsigned long setmempressdur(const char *);
#endif
static unsigned long setreduceprologchecks(const char *);
static unsigned long setextpwdretry(const char *);
static unsigned long setexecwithexec(const char *);
static unsigned long setmaxupdatesbeforesending(const char *);
static unsigned long setthreadunlinkcalls(const char *);
static unsigned long setapbasilpath(const char *);
static unsigned long setapbasilprotocol(const char *);
static unsigned long setreportermom(const char *);
static unsigned long setloginnode(const char *);
static unsigned long setrejectjobsubmission(const char *);
static unsigned long setjoboomscoreadjust(const char *);
static unsigned long setmomoomimmunize(const char *);
unsigned long setjobexitwaittime(const char *);
unsigned long setmaxjoinjobwaittime(const char *);
unsigned long setresendjoinjobwaittime(const char *);
unsigned long setmomhierarchyretrytime(const char *);

/*
 * Table pairing each config directive name with its handler function.
 * The list ends with a { NULL, NULL } entry; reqgres() scans it (bounded by
 * RM_NPARM) to recognise directive names.
 */
static struct specials
  {
  const char *name;
  u_long(*handler)(const char *);
  } special[] = {
  { "alloc_par_cmd", setallocparcmd },
  { "auto_ideal_load", setautoidealload },
  { "auto_max_load", setautomaxload },
  { "xauthpath", setxauthpath },
  { "rcpcmd", setrcpcmd },
  { "rcp_cmd", setrcpcmd },
  { "pbsclient", setpbsclient },
  { "configversion", configversion },
  { "cputmult", cputmult },
  { "ideal_load", setidealload },
  { "ignwalltime", setignwalltime },
  { "ignmem", setignmem },
  { "igncput", setigncput },
  { "ignvmem", setignvmem },
  { "logevent", setlogevent },
  { "loglevel", setloglevel },
  { "max_load", setmaxload },
  { "enablemomrestart", setenablemomrestart },
  { "prologalarm", prologalarm },
  { "restricted", restricted },
  { "jobstartblocktime", jobstartblocktime },
  { "usecp", usecp },
  { "wallmult", wallmult },
  { "clienthost", setpbsserver }, /* deprecated - use pbsserver */
  { "pbsserver", setpbsserver },
  { "node_check_script", setnodecheckscript },
  { "node_check_interval", setnodecheckinterval },
  { "timeout", settimeout },
  { "checkpoint_interval", mom_checkpoint_set_checkpoint_interval },
  { "checkpoint_script", mom_checkpoint_set_checkpoint_script },
  { "restart_script", mom_checkpoint_set_restart_script },
  { "checkpoint_run_exe", mom_checkpoint_set_checkpoint_run_exe_name },
  { "down_on_error", setdownonerror },
  { "status_update_time", setstatusupdatetime },
  { "check_poll_time", setcheckpolltime },
  { "tmpdir", settmpdir },
  { "log_directory", setlogdirectory },
  { "log_file_max_size", setlogfilemaxsize },
  { "log_file_roll_depth", setlogfilerolldepth },
  { "log_file_suffix", setlogfilesuffix },
  { "log_keep_days", setlogkeepdays },
  { "varattr", setvarattr },
  { "nodefile_suffix", setnodefilesuffix },
  { "nospool_dir_list", setnospooldirlist },
  { "mom_host", setmomhost },
  { "remote_reconfig", setrreconfig },
  { "job_output_file_umask", setumask },
  { "preexec", setpreexec },
  { "source_login_batch", setsourceloginbatch },
  { "source_login_interactive", setsourcelogininteractive },
  { "spool_as_final_name", setspoolasfinalname },
  { "remote_checkpoint_dirs", setremchkptdirlist },
  { "max_conn_timeout_micro_sec", setmaxconnecttimeout },
  { "alias_server_name", aliasservername },
  { "job_starter", jobstarter},
#ifdef PENABLE_LINUX26_CPUSETS
  { "use_smt", setusesmt },
  { "memory_pressure_threshold", setmempressthr },
  { "memory_pressure_duration", setmempressdur },
#endif
  { "reduce_prolog_checks", setreduceprologchecks},
  { "thread_unlink_calls", setthreadunlinkcalls },
  { "attempt_to_make_dir", setattempttomakedir },
  { "ext_pwd_retry", setextpwdretry },
  { "exec_with_exec", setexecwithexec },
  { "max_updates_before_sending", setmaxupdatesbeforesending },
  { "apbasil_path", setapbasilpath },
  { "reject_job_submission", setrejectjobsubmission },
  { "apbasil_protocol", setapbasilprotocol },
  { "reporter_mom", setreportermom },
  { "login_node", setloginnode },
  { "job_oom_score_adjust", setjoboomscoreadjust },
  { "mom_oom_immunize", setmomoomimmunize },
  { "job_exit_wait_time", setjobexitwaittime },
  { "max_join_job_wait_time", setmaxjoinjobwaittime},
  { "resend_join_job_wait_time", setresendjoinjobwaittime},
  { "mom_hierarchy_retry_time", setmomhierarchyretrytime},
  { NULL, NULL }
  };

/* resource query functions referenced by common_config[] below */
static const char *arch(struct rm_attribute *);
static const char *opsys(struct rm_attribute *);
static const char *requname(struct rm_attribute *);
static const char *validuser(struct rm_attribute *);
static const char *reqmsg(struct rm_attribute *);
const char *reqgres(struct rm_attribute *);
static const char *reqstate(struct rm_attribute *);
static const char *getjoblist(struct rm_attribute *);
static const char *reqvarattr(struct rm_attribute *);
/* static const char *nullproc(struct rm_attribute *); */

/*
 * Resource names answered by functions in this file; dependent() searches this
 * table first. The list ends with an entry whose name is NULL.
 */
struct config common_config[] =
  {
  { "arch", {arch} }, /* machine architecture */
  { "opsys", {opsys} }, /* operating system */
  { "uname", {requname} }, /* user name ??? */
  { "validuser", {validuser} }, /* valid user ??? */
  { "message", {reqmsg} }, /* message ??? */
  { "gres", {reqgres} }, /* generic resource (licenses...) */
  { "state", {reqstate} }, /* state of pbs_mom */
  { "jobs", {getjoblist} }, /* job list this pbs_mom */
  { "varattr", {reqvarattr} }, /* ??? */
  { NULL, {NULL} }
  };

int LOGLEVEL = 0; /* valid values (0 - 10) */
int LOGKEEPDAYS = 0; /* days each log file should be kept before deleting */
int EXTPWDRETRY = 3; /* # of times to try external pwd check */
int DEBUGMODE = 0;
int DOBACKGROUND = 1;
char DEFAULT_UMASK[1024];
char PRE_EXEC[1024];
long TJobStartBlockTime = 5; /* seconds to wait for job to launch before backgrounding */
long TJobStartTimeout = 300; /* seconds to wait for job to launch before purging */
char *ret_string; /* shared result buffer; checkret() grows it */
long ret_size; /* current size of ret_string, as maintained by checkret() */
struct config *config_array = NULL;
struct config_list *config_list = NULL;
sigset_t allsigs;
int rm_errno;
unsigned int reqnum = 0; /* the packet number */
#ifndef NOPRIVPORTS
int port_care = TRUE; /* secure connecting ports */
#else
int port_care = FALSE; /* configured without priv ports, don't care about them */
#endif
uid_t uid = 0; /* uid we are running with */
unsigned int alarm_time = 10; /* time before alarm */
extern AvlTree okclients; /* accept connections from */
char **maskclient = NULL; /* wildcard connections */
int mask_num = 0;
int mask_max = 0;
u_long localaddr = 0;
char extra_parm[] = "extra parameter(s)";
char no_parm[] = "required parameter not found";
char varattr_delimiter[] = ";";
int cphosts_num = 0;
struct cphosts *pcphosts = NULL;
static int config_file_specified = 0;
static char config_file[_POSIX_PATH_MAX] = "config";
char PBSNodeMsgBuf[1024];
char PBSNodeCheckPath[1024];
int PBSNodeCheckInterval = 1;
int PBSNodeCheckProlog = 0;
int PBSNodeCheckEpilog = 0;
static time_t MOMExeTime = 0;

/* sync w/#define JOB_SUBSTATE_XXX (in include/pbs_job.h)*/
/* "SUBSTATEnn" entries are placeholders that keep later names at their index */

const char *PJobSubState[] =
  {
  "TRANSIN", /* Transit in, wait for commit */
  "TRANSICM", /* Transit in, wait for commit */
  "TRNOUT", /* transiting job outbound */
  "TRNOUTCM", /* transiting outbound, rdy to commit */
  "SUBSTATE04",
  "SUBSTATE05",
  "SUBSTATE06",
  "SUBSTATE07",
  "SUBSTATE08",
  "SUBSTATE09",
  "QUEUED", /* job queued and ready for selection */
  "PRESTAGEIN", /* job queued, has files to stage in */
  "SUBSTATE12",
  "SYNCRES", /* job waiting on sync start ready */
  "STAGEIN", /* job staging in files then wait */
  "STAGEGO", /* job staging in files and then run */
  "STAGECMP", /* job stage in complete */
  "SUBSTATE17",
  "SUBSTATE18",
  "SUBSTATE19",
  "HELD", /* job held - user or operator */
  "SYNCHOLD", /* job held - waiting on sync regist */
  "DEPNHOLD", /* job held - waiting on dependency */
  "SUBSTATE23",
  "SUBSTATE24",
  "SUBSTATE25",
  "SUBSTATE26",
  "SUBSTATE27",
  "SUBSTATE28",
  "SUBSTATE29",
  "WAITING", /* job waiting on execution time */
  "SUBSTATE31",
  "SUBSTATE32",
  "SUBSTATE33",
  "SUBSTATE34",
  "SUBSTATE35",
  "SUBSTATE36",
  "STAGEFAIL", /* job held - file stage in failed */
  "SUBSTATE38",
  "SUBSTATE39",
  "PRERUN", /* job sent to MOM to run */
  "STARTING", /* final job start initiated */
  "RUNNING", /* job running */
  "SUSPEND", /* job suspended, CRAY only */
  "SUBSTATE44",
  "SUBSTATE45",
  "SUBSTATE46",
  "SUBSTATE47",
  "SUBSTATE48",
  "WAIT_SISTER_KILL_CONFIRM",
  "EXITING", /* Start of job exiting processing */
  "EXIT_WAIT", /* Waiting for a response from other moms */
  "STAGEOUT", /* job staging out (other) files */
  "STAGEDEL", /* job deleting staged out files */
  "EXITED", /* job exit processing completed */
  "ABORT", /* job is being aborted by server */
  "NOTERM_REQUE",
  "PREOBIT", /* preobit job status */
  "OBIT", /* (MOM) job obit notice sent */
  "COMPLETED",
  "RERUN", /* job is rerun, recover output stage */
  "RERUN1", /* job is rerun, stageout phase */
  "RERUN2", /* job is rerun, delete files stage */
  "RERUN3", /* job is rerun, mom delete job */
  "RETSTD", /* job has checkpoint file, return stdout / stderr files to server
             * spool dir so that job can be restarted */
  NULL
  };

/* sync w/#define IS_XXX */

const char *PBSServerCmds[] =
  {
  "NULL",
  "HELLO",
  "CLUSTER_ADDRS",
  "UPDATE",
  "STATUS",
  "GPU_STATUS",
  NULL
  };

/*
** These routines are in the "dependent" code.
*/ extern void dep_initialize(void); extern void dep_cleanup(void); /* External Functions */ extern void catch_child(int); extern void init_abort_jobs(int); extern void scan_for_exiting(); extern void scan_for_terminated(); extern int TMomCheckJobChild(pjobexec_t *, int, int *, int *); extern int TMomFinalizeJob3(pjobexec_t *, int, int, int *); extern void check_state(int); /* Local public functions */ static void stop_me(int); static void PBSAdjustLogLevel(int); int TMOMScanForStarting(void); unsigned long getsize(resource *); unsigned long gettime(resource *); /* Local private functions */ void check_log(void); const char *nullproc( struct rm_attribute *attrib) { log_err(-1, __func__, "should not be called"); return(NULL); } /* END nullproc() */ static const char *arch( struct rm_attribute *attrib) /* I */ { struct config *cp; if (attrib != NULL) { log_err(-1, __func__, extra_parm); rm_errno = RM_ERR_BADPARAM; return(NULL); } if (config_array == NULL) { return(PBS_MACH); } /* locate arch string */ for (cp = config_array;cp->c_name != NULL;cp++) { if (cp->c_u.c_value == NULL) continue; if (strcmp(cp->c_name, "arch")) continue; return(cp->c_u.c_value); } /* END for (cp) */ return(PBS_MACH); } /* END arch() */ static const char *opsys( struct rm_attribute *attrib) /* I */ { struct config *cp; if (attrib != NULL) { log_err(-1, __func__, extra_parm); rm_errno = RM_ERR_BADPARAM; return(NULL); } if (config_array == NULL) { return(PBS_MACH); } /* locate opsys string */ for (cp = config_array;cp->c_name != NULL;cp++) { if (cp->c_u.c_value == NULL) continue; if (strcmp(cp->c_name, "opsys")) continue; return(cp->c_u.c_value); } /* END for (cp) */ return(PBS_MACH); } /* END opsys() */ const char * getuname(void) { struct utsname n; static char *name = NULL; if (name == NULL) { if (uname(&n) == -1) { return(NULL); } sprintf(ret_string, "%s %s %s %s %s", n.sysname, n.nodename, n.release, n.version, n.machine); name = strdup(ret_string); } /* END if (name == NULL) */ 
return(name); } /* END getuname() */ static const char *reqmsg( struct rm_attribute *attrib) { if (attrib != NULL) { log_err(-1, __func__, extra_parm); rm_errno = RM_ERR_BADPARAM; return(NULL); } return(PBSNodeMsgBuf); } /* END reqmsg() */ static const char *getjoblist( struct rm_attribute *attrib) /* I */ { static char *list = NULL; static int listlen = 0; job *pjob; int firstjob = 1; #ifdef NUMA_SUPPORT char mom_check_name[PBS_MAXSERVERNAME]; char *dot; #endif if (list == NULL) { if ((list = (char *)calloc(BUFSIZ + 50, sizeof(char)))==NULL) { /* FAILURE - cannot alloc memory */ fprintf(stderr,"ERROR: could not calloc!\n"); /* since memory cannot be allocated, report no jobs */ return (" "); } listlen = BUFSIZ; } *list = '\0'; /* reset the list */ if ((pjob = (job *)GET_NEXT(svr_alljobs)) == NULL) { /* no jobs - return space character */ return(" "); } #ifdef NUMA_SUPPORT /* initialize the name to check for for this numa mom */ strcpy(mom_check_name,mom_host); if ((dot = strchr(mom_check_name,'.')) != NULL) *dot = '\0'; sprintf(mom_check_name + strlen(mom_check_name),"-%d/",numa_index); #endif for (;pjob != NULL;pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { #ifdef NUMA_SUPPORT /* skip over jobs that aren't on this node */ if (strstr(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str,mom_check_name) == NULL) continue; #endif if (!firstjob) strcat(list, " "); strcat(list, pjob->ji_qs.ji_jobid); if ((int)strlen(list) >= listlen) { int new_list_len = listlen + BUFSIZ; char *tmpList; tmpList = (char *)realloc(list, new_list_len); if (tmpList == NULL) { /* FAILURE - cannot alloc memory */ fprintf(stderr,"ERROR: could not realloc!\n"); /* since memory cannot be allocated, report no jobs */ return(" "); } memset(tmpList + listlen, 0, new_list_len - listlen); list = tmpList; listlen = new_list_len; } firstjob = 0; } /* END for (pjob) */ if (list[0] == '\0') { /* no jobs - return space character */ strcat(list, " "); } return(list); } /* END getjoblist() */ #define TMAX_VARBUF 
65536 static const char *reqvarattr( struct rm_attribute *attrib) /* I */ { static char *list = NULL; char *child_spot; static int listlen = 0; struct varattr *pva; int fd, len, child_len; int first_line; FILE *child; char *ptr; char *ptr2; char tmpBuf[TMAX_VARBUF + 1]; if (list == NULL) { list = (char *)calloc(BUFSIZ + 1024, sizeof(char)); if (list == NULL) { /* FAILURE - cannot alloc memory */ log_err(errno, __func__, "cannot alloc memory"); return((char *)" "); } listlen = BUFSIZ; } *list = '\0'; /* reset the list */ if ((pva = (struct varattr *)GET_NEXT(mom_varattrs)) == NULL) { return((char *)" "); } for (;pva != NULL;pva = (struct varattr *)GET_NEXT(pva->va_link)) { /* loop for each $varattr parameter */ if ((pva->va_lasttime == 0) || (time_now >= (pva->va_ttl + pva->va_lasttime))) { if ((pva->va_ttl == -1) && (pva->va_lasttime != 0)) { if (pva->va_value[0] != '\0') { if (*list != '\0') strcat(list, varattr_delimiter); strcat(list, pva->va_value); } if ((int)strlen(list) >= listlen) { listlen += BUFSIZ; list = (char *)realloc(list, listlen); if (list == NULL) { log_err(errno, __func__, "cannot alloc memory"); return((char *)" "); } } continue; /* ttl of -1 is only run once */ } /* TTL is satisfied, reload value */ pva->va_lasttime = time_now; if (pva->va_value == NULL) pva->va_value = (char *)calloc(TMAX_VARBUF, sizeof(char)); /* execute script and get a new value */ if ((child = popen(pva->va_cmd, "r")) == NULL) { sprintf(pva->va_value, "error: %d %s", errno, strerror(errno)); } else { fd = fileno(child); child_spot = tmpBuf; child_len = 0; child_spot[0] = '\0'; while (child_len < TMAX_VARBUF) { len = read_ac_socket(fd, child_spot, TMAX_VARBUF - child_len); if ((len <= 0) && (errno != EINTR)) break; else if (len < 0) continue; child_len += len; child_spot += len; } if (len == -1) { /* FAILURE - cannot read var script output */ log_err(errno, __func__, "pipe read"); sprintf(pva->va_value, "? 
%d", RM_ERR_SYSTEM); pclose(child); continue; } /* SUCCESS */ pclose(child); tmpBuf[child_len] = '\0'; /* Transfer returned data into var value field */ first_line = TRUE; ptr = strtok(tmpBuf,"\n;"); ptr2 = pva->va_value; ptr2[0] = '\0'; /* * OUTPUT FORMAT: Take what script gives us. * Script should output 1 or more lines of Name=value1+value2+... */ while (ptr != NULL) { if (!first_line) strcat(ptr2,varattr_delimiter); strcat(ptr2,ptr); first_line = FALSE; ptr = strtok(NULL,"\n;"); } /* END while (ptr != NULL) */ } /* END else ((child = popen(pva->va_cmd,"r")) == NULL) */ } /* END if ((pva->va_lasttime == 0) || ...) */ if (pva->va_value[0] != '\0') { if (*list != '\0') strcat(list, varattr_delimiter); strcat(list, pva->va_value); } if ((int)strlen(list) >= listlen) { listlen += BUFSIZ; list = (char *)realloc(list, listlen); if (list == NULL) { log_err(errno, __func__, "cannot alloc memory"); return((char *)" "); } } } /* END for (pva) */ if (list[0] == '\0') strcat(list, " "); return(list); } /* END reqvarattr() */ const char *reqgres( struct rm_attribute *attrib) /* I (ignored) */ { struct config *cp; static char GResBuf[1024]; char tmpLine[1024]; int sindex; if (attrib != NULL) { log_err(-1, __func__, extra_parm); rm_errno = RM_ERR_BADPARAM; return(NULL); } /* build gres string */ /* FORMAT: :[+:]... 
*/ GResBuf[0] = '\0'; if (config_array == NULL) { return(GResBuf); } for (cp = config_array; cp->c_name != NULL; cp++) { if (cp->c_u.c_value == NULL) continue; /* verify parameter is not special */ for (sindex = 0;sindex < RM_NPARM;sindex++) { if (special[sindex].name == NULL) break; if (!strcmp(special[sindex].name, cp->c_name)) break; } /* END for (sindex) */ if ((sindex < RM_NPARM) && (special[sindex].name != NULL) && (!strcmp(special[sindex].name, cp->c_name))) { /* specified parameter is special parameter */ continue; } /* verify parameter is not common */ /* Coverity reports (correctly) that RM_NPARM == 32, but common_config * only has 10 elements, so this loop's upper bound is dangerous. * However, the final element in common_config has c_name == NULL, * so the first test inside this loop safely breaks. */ for (sindex = 0;sindex < RM_NPARM;sindex++) { if (common_config[sindex].c_name == NULL) break; if (!strcmp(common_config[sindex].c_name, cp->c_name)) break; } /* END for (sindex) */ if ((sindex < RM_NPARM) && (common_config[sindex].c_name != NULL) && !strcmp(common_config[sindex].c_name, cp->c_name) && strcmp(common_config[sindex].c_name, "gres")) { /* specified parameter is common parameter */ continue; } if (!strncmp(cp->c_name, "size", strlen("size"))) continue; if (GResBuf[0] != '\0') { strncat(GResBuf, "+", sizeof(GResBuf) - 1 - strlen(GResBuf)); } snprintf(tmpLine, 1024, "%s:%s", cp->c_name, cp->c_u.c_value); strncat(GResBuf, tmpLine, (sizeof(GResBuf) - strlen(GResBuf) - 1)); } /* END for (cp) */ return(GResBuf); } /* END reqgres() */ static const char *reqstate( struct rm_attribute *attrib) /* I (ignored) */ { static char state[1024]; if ((internal_state & INUSE_DOWN) && (MOMConfigDownOnError != 0)) strcpy(state, "down"); else if (internal_state & INUSE_BUSY) strcpy(state, "busy"); else strcpy(state, "free"); return(state); } /* END reqstate() */ static const char *requname( struct rm_attribute *attrib) { const char *cp; if (attrib != NULL) { 
log_err(-1, __func__, extra_parm); rm_errno = RM_ERR_BADPARAM; return(NULL); } cp = getuname(); return(cp); } /* END requname() */ static const char *validuser( struct rm_attribute *attrib) { struct passwd *p; if ((attrib == NULL) || (attrib->a_value == NULL)) { log_err(-1, __func__, no_parm); rm_errno = RM_ERR_NOPARAM; return(NULL); } p = getpwnam_ext(attrib->a_value); if (p != NULL) { return("yes"); } return("no"); } /* END validuser() */ const char *loadave( struct rm_attribute *attrib) { #ifdef NUMA_SUPPORT /* the hardware provides no way to get information on specific cores, * so there's no way to obtain the load average of a numa node */ return(NULL); #else static char ret_string[20]; double la; if (attrib) { log_err(-1, __func__, extra_parm); rm_errno = RM_ERR_BADPARAM; return(NULL); } if (get_la(&la) != 0) { rm_errno = RM_ERR_SYSTEM; return(NULL); } sprintf(ret_string, "%.2f", la); return(ret_string); #endif /* NUMA_SUPPORT */ } /* END loadave() */ /* ** Search the array of resources read from the config files. */ struct config *rm_search( struct config *where, /* I */ const char *what) /* I */ { struct config *cp; if (where == NULL || what == NULL) { return(NULL); } for (cp = where;cp->c_name != NULL;cp++) { if (strcmp(cp->c_name, what) == 0) { return(cp); } } /* END for (cp) */ return(NULL); } /* END rm_search() */ /* ** Search the various resource lists. 
*/ const char *dependent( const char *res, /* I */ struct rm_attribute *attr) /* I */ { struct config *ap; extern struct config standard_config[]; extern struct config dependent_config[]; ap = rm_search(common_config, res); if (ap != NULL) { return(ap->c_u.c_func(attr)); } ap = rm_search(standard_config, res); if (ap != NULL) { return(ap->c_u.c_func(attr)); } ap = rm_search(dependent_config, res); if (ap != NULL) { return(ap->c_u.c_func(attr)); } rm_errno = RM_ERR_UNKNOWN; return(NULL); } /* END dependent() */ /* ** Initialize standard resource array */ void initialize(void) { log_record(PBSEVENT_SYSTEM, 0, __func__, "independent"); dep_initialize(); return; } void cleanup(void) { dep_cleanup(); return; } /* ** Clean up after a signal. */ void die( int sig) { if (sig > 0) { sprintf(log_buffer, "caught signal %d", sig); log_record(PBSEVENT_SYSTEM, 0, __func__, log_buffer); } else { log_record(PBSEVENT_SYSTEM, 0, __func__, "abnormal termination"); } cleanup(); #if defined(NVIDIA_GPUS) && defined(NVML_API) shut_nvidia_nvml(); #endif /* NVIDIA_GPUS and NVML_API */ log_close(1); exit(1); } /* END die() */ /* ** Check for fatal memory allocation error. */ void memcheck( const char *buf) { if (buf != NULL) { return; } log_err(-1, "memcheck", "memory allocation failed"); die(0); return; } /* END memcheck() */ /* ** Check the ret_string buffer to make sure that there is ** enought room starting at *spot to hold len characters more. ** If not, realloc the buffer and make *spot point to ** the corresponding place that it used to point to in ** the old buffer. 
*/

void checkret(

  char **spot,
  long   len)

  {
  char *hold;
  long  offset;

  if ((*spot - ret_string) < (ret_size - len))
    {
    /* more than len bytes remain after *spot, nothing to do */

    return;
    }

  /* remember where *spot sits before realloc() may move the buffer, so the
   * old (possibly freed) pointer is never used in arithmetic afterwards */

  offset = *spot - ret_string;

  ret_size += len * 2;  /* new buf size */

  sprintf(log_buffer, "size increased to %ld", ret_size);

  log_record(PBSEVENT_SYSTEM, 0, __func__, log_buffer);

  hold = (char *)realloc(ret_string, ret_size); /* new buf */

  memcheck(hold);

  *spot = hold + offset;  /* new spot in buf */

  ret_string = hold;

  return;
  }  /* END checkret() */




/*
 * Return a pointer to the first non-whitespace character of str, or to
 * its terminating NUL when the string holds nothing but whitespace.
 */

char *skipwhite(

  char *str)

  {
  /* <ctype.h> functions need an unsigned char value */

  while ((*str != '\0') && isspace((unsigned char)*str))
    str++;

  return(str);
  }




/*
 * Copy the leading token of str (alphanumerics, ':' and '_') into tok and
 * NUL-terminate it.  Returns a pointer to the first character of str that
 * was not copied.
 *
 * NOTE: tok is not bounded here; the caller must supply a buffer large
 * enough for the longest token it can be handed.
 */

char *tokcpy(

  char *str,
  char *tok)

  {
  for (;*str != '\0';str++, tok++)
    {
    if (!isalnum((unsigned char)*str) && (*str != ':') && (*str != '_'))
      break;

    *tok = *str;
    }

  *tok = '\0';

  return(str);
  }  /* END tokcpy() */




/*
 * Strip trailing whitespace (newline included) from str, in place.
 */

void rmnl(

  char *str)

  {
  size_t len;

  if (str == NULL)
    return;

  len = strlen(str);

  /* the length test keeps an empty or all-blank string from being indexed
   * before str[0] */

  while ((len > 0) && isspace((unsigned char)str[len - 1]))
    {
    str[--len] = '\0';
    }

  return;
  }




/*
 * Parse a boolean config option value.
 * Return 1 (true), 0 (false), -1 (error).
 *
 * Only the first character is examined for values starting with
 * t/T/y/Y/1 (true) or f/F/n/N/0 (false); values starting with o/O must be
 * exactly "on" or "off" (case-insensitive).
 */

static int setbool(

  const char *value) /* I */

  {
  int enable = -1;

  if (value != NULL)
    {
    switch (value[0])
      {

      case 't':
      case 'T':
      case 'y':
      case 'Y':
      case '1':

        enable = 1;

        break;

      case 'f':
      case 'F':
      case 'n':
      case 'N':
      case '0':

        enable = 0;

        break;

      case 'o':
      case 'O':

        if ((strcasecmp(value,"on") == 0))
          enable = 1;
        else if ((strcasecmp(value,"off") == 0))
          enable = 0;

        break;
      }
    }

  return(enable);
  }




/*
 * Look up name and insert its address (host byte order) into okclients.
 *
 * Returns the address, or 0 when the lookup fails.
 */

u_long addclient(

  const char *name)  /* I */

  {
  struct addrinfo *addr_info;
  struct in_addr   saddr;
  u_long           ipaddr;

  /* FIXME: must be able to retry failed lookups later */

  if (pbs_getaddrinfo(name, NULL, &addr_info) != 0)
    {
    snprintf(log_buffer, sizeof(log_buffer), "host %s not found", name);

    log_err(-1, __func__, log_buffer);

    return(0);
    }

  saddr = ((struct sockaddr_in *)addr_info->ai_addr)->sin_addr;

  ipaddr = ntohl(saddr.s_addr);

  okclients = AVL_insert(ipaddr, 0, NULL, okclients);

  return(ipaddr);
  }  /* END addclient() */




static u_long setpbsclient(

  const char *value)  /* I */

  {
u_long rc; if ((value == NULL) || (value[0] == '\0')) { /* FAILURE */ return(1); } rc = addclient(value); if (rc != 0) { /* FAILURE */ return(1); } return(0); } /* END setpbsclient() */ static u_long setjoboomscoreadjust( const char *value) /* I */ { int v = 0; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); v = atoi(value); /* check for allowed value range */ if( v >= -17 && v <= 15 ) { job_oom_score_adjust = v; /* ok */ return 1; } else { log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, "value is out of valid range <-17,15>. Using defaults"); /* error */ return 0; } } /* END setjoboomscoreadjust() */ static u_long setmomoomimmunize( const char *Value) /* I */ { int enable = -1; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, Value); if (Value == NULL) { /* FAILURE */ return(0); } /* accept various forms of "true", "yes", and "1" */ switch (Value[0]) { case 't': case 'T': case 'y': case 'Y': case '1': enable = 1; break; case 'f': case 'F': case 'n': case 'N': case '0': enable = 0; break; } if (enable != -1) { mom_oom_immunize = enable; } return(1); } /* END setmomoomimmunize() */ /* FIXME: we need to handle a non-default port number */ static u_long setpbsserver( const char *value) /* I */ { if ((value == NULL) || (*value == '\0')) { return(1); /* FAILURE - nothing specified */ } log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); return(mom_server_add(value)); } /* END setpbsserver() */ static u_long settmpdir( const char *Value) { log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, Value); if (*Value != '/') { log_err(-1, __func__, "tmpdir must be a full path"); return(0); } snprintf(tmpdir_basename, sizeof(tmpdir_basename), "%s", Value); return(1); } static u_long setexecwithexec( const char *value) { static const char *id = "setexecwithexec"; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, value); if (!strncasecmp(value, "t", 1) || (value[0] == '1') || !strcasecmp(value, "on") ) 
exec_with_exec = 1; else exec_with_exec = 0; return(1); } /* END setexecwithexec() */ static u_long setxauthpath( const char *Value) { log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, Value); if (*Value != '/') { log_err(-1, __func__, "xauthpath must be a full path"); return(0); } snprintf(xauth_path, sizeof(xauth_path), "%s", Value); return(1); } static u_long setrcpcmd( const char *Value) /* I */ { static char *ptr; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, Value); if (*Value != '/') { log_err(-1, __func__, "rcpcmd must be a full path"); /* FAILURE */ return(0); } snprintf(rcp_path, sizeof(rcp_path), "%s", Value); strcpy(rcp_args, ""); if ((ptr = strchr(rcp_path, ' ')) != NULL) { *ptr = '\0'; if (*(ptr + 1) != '\0') { snprintf(rcp_args, sizeof(rcp_args), "%s", ptr + 1); } } /* SUCCESS */ return(1); } /* END setrcpcmd() */ static u_long setlogevent( const char *value) { char *bad; *log_event_mask = strtol(value, &bad, 0); if ((*bad == '\0') || isspace((int)*bad)) { return(1); } return(0); } /* END setlogevent() */ /* NOTE: maskclient is global */ static u_long restricted( const char *name) { char **tmpMaskClient; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, name); if (mask_max == 0) { if ((maskclient = (char **)calloc(4, sizeof(char *))) == NULL) { /* FAILURE - cannot alloc memory */ log_err(errno, __func__, "cannot alloc memory"); return(-1); } mask_max = 4; } maskclient[mask_num] = strdup(name); if (maskclient[mask_num] == NULL) { /* FAILURE - cannot alloc memory */ log_err(errno, __func__, "cannot alloc memory"); return(-1); } mask_num++; if (mask_num == mask_max) { mask_max *= 2; tmpMaskClient = (char **)realloc( maskclient, mask_max * sizeof(char *)); if (tmpMaskClient == NULL) { /* FAILURE - cannot alloc memory */ log_err(errno, __func__, "cannot alloc memory"); return(-1); } maskclient = tmpMaskClient; } /* SUCCESS */ return(1); } /* END restricted() */ static u_long configversion( const char *Value) /* I */ { 
log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, Value); if (Value == NULL) { /* FAILURE */ return(0); } snprintf(MOMConfigVersion, sizeof(MOMConfigVersion), "%s", Value); /* SUCCESS */ return(1); } /* END configversion() */ static u_long setdownonerror( const char *value) /* I */ { int enable; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "down_on_error", value); if ((enable = setbool(value)) != -1) MOMConfigDownOnError = enable; return(1); } /* END setdownonerror() */ static u_long setenablemomrestart( const char *value) /* I */ { int enable; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "enablemomrestart", value); if ((enable = setbool(value)) != -1) MOMConfigRestart = enable; return(1); } /* END setenablemomrestart() */ static u_long cputmult( const char *value) /* I */ { log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); if ((cputfactor = atof(value)) == 0.0) { return(0); /* error */ } return(1); } /* END cputmult() */ static u_long wallmult( const char *value) { double tmpD; if (value == NULL) { /* FAILURE */ return(0); } log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); tmpD = atof(value); if ((tmpD == 0.0) && (value[0] != '\0')) { /* FAILURE */ return(0); } /* SUCCESS */ wallfactor = tmpD; return(1); } /* END wallmult() */ static u_long usecp( const char *value) /* I */ { char *pnxt; static int cphosts_max = 0; struct cphosts *newp = NULL; /* FORMAT: : */ /* * HvB and Willem added this for logging purpose */ log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); if (cphosts_max == 0) { pcphosts = (struct cphosts *)calloc(2, sizeof(struct cphosts)); if (pcphosts == NULL) { sprintf(log_buffer, "%s: out of memory while allocating pcphosts", __func__); log_err(-1, __func__, log_buffer); return(0); } cphosts_max = 2; } else if (cphosts_max == cphosts_num) { newp = (struct cphosts *)realloc( pcphosts, (cphosts_max + 2) * sizeof(struct cphosts)); if (newp == NULL) { /* FAILURE */ 
sprintf(log_buffer,"%s: out of memory while reallocating pcphosts", __func__); log_err(-1, __func__, log_buffer); return(0); } pcphosts = newp; cphosts_max += 2; } pnxt = strchr((char *)value, (int)':'); if (pnxt == NULL) { /* request failed */ sprintf(log_buffer, "invalid host specification: %s", value); log_err(-1, __func__, log_buffer); return(0); } *pnxt++ = '\0'; pcphosts[cphosts_num].cph_hosts = strdup(value); if (pcphosts[cphosts_num].cph_hosts == NULL) { /* FAILURE */ sprintf(log_buffer, "%s: out of memory in strdup(cph_hosts)", __func__); log_err(-1, __func__, log_buffer); return(0); } value = pnxt; /* now ptr to path */ while (!isspace(*pnxt)) { if (*pnxt == '\0') { sprintf(log_buffer, "invalid '%s' specification %s: " "missing destination path", __func__, value); log_err(-1, __func__, log_buffer); free(pcphosts[cphosts_num].cph_hosts); return(0); } pnxt++; } *pnxt++ = '\0'; pcphosts[cphosts_num].cph_from = strdup(value); if (pcphosts[cphosts_num].cph_from == NULL) { sprintf(log_buffer, "%s: out of memory in strdup(cph_from)", __func__); log_err(-1, __func__, log_buffer); free(pcphosts[cphosts_num].cph_hosts); return(0); } pcphosts[cphosts_num].cph_to = strdup(skipwhite(pnxt)); if (pcphosts[cphosts_num].cph_to == NULL) { sprintf(log_buffer, "%s: out of memory in strdup(cph_to)", __func__); log_err(-1, __func__, log_buffer); free(pcphosts[cphosts_num].cph_hosts); free(pcphosts[cphosts_num].cph_from); return(0); } cphosts_num++; return(1); } /* END usecp() */ static unsigned long prologalarm( const char *value) /* I */ { int i; log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "prologalarm", value); i = (int)atoi(value); if (i <= 0) { return(0); /* error */ } pe_alarm_time = (unsigned int)i; return(1); } /* END prologalarm() */ static unsigned long setloglevel( const char *value) /* I */ { int i; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); i = (int)atoi(value); if (i < 0) { return(0); /* error */ } LOGLEVEL = (unsigned int)i; 
return(1); } /* END setloglevel() */ static unsigned long setmaxupdatesbeforesending( const char *value) { int i; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); i = (int)atoi(value); if (i < 0) return(0); /* error */ maxupdatesbeforesending = i; /* SUCCESS */ return(1); } /* END setmaxupdatesbeforesending() */ static unsigned long setumask( const char *value) /* I */ { log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "setumask", value); snprintf(DEFAULT_UMASK, sizeof(DEFAULT_UMASK), "%s", value); return(1); } /* END setumask() */ static unsigned long setpreexec( const char *value) /* I */ { log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "setpreexec", value); snprintf(PRE_EXEC, sizeof(PRE_EXEC), "%s", value); #if SHELL_USE_ARGV == 0 log_err(0, __func__, "pbs_mom not configured with enable-shell-user-argv option"); #endif return(1); } /* END setpreexec() */ static unsigned long setsourceloginbatch( const char *value) /* I */ { int enable; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "source_login_batch", value); if ((enable = setbool(value)) != -1) src_login_batch = enable; return(1); } /* END setsourceloginbatch() */ static unsigned long setsourcelogininteractive( const char *value) /* I */ { int enable; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "source_login_interactive", value); if ((enable = setbool(value)) != -1) src_login_interactive = enable; return(1); } /* END setsourcelogininteractive() */ static unsigned long jobstartblocktime( const char *value) /* I */ { int i; log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "startblocktime", value); i = (int)strtol(value, NULL, 10); if ((i < 0) || ((i == 0) && (value[0] != '0'))) { return(0); /* error */ } TJobStartBlockTime = i; return(1); } /* END jobstartblocktime() */ static unsigned long setstatusupdatetime( const char *value) /* I */ { int i; log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "setstateuspdatetime", value); i = (int)strtol(value, NULL, 10); if (i < 
1) { return(0); /* error */ } ServerStatUpdateInterval = (unsigned int)i; return(1); } /* END setstatusupdatetime() */ static unsigned long setcheckpolltime( const char *value) /* I */ { int i; log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "setcheckpolltime", value); i = (int)strtol(value, NULL, 10); if (i < 1) { return(0); /* error */ } CheckPollTime = (unsigned int)i; return(1); } /* END setcheckpolltime() */ /* ** Add static resource or shell escape line from config file. ** This is a support routine for read_config(). */ static void add_static( char *str, /* I */ char *file, /* I */ int linenum) /* I */ { int i; char name[50]; struct config_list *cp; str = tokcpy(str, name); /* resource name */ str = skipwhite(str); /* resource value */ /* FORMAT: [!] */ if (*str == '!') /* shell escape command */ { /* remove trailing newline */ rmnl(str); } else { /* get the value */ i = strlen(str); while (--i) { /* strip trailing blanks */ if (!isspace((int)*(str + i))) break; *(str + i) = '\0'; } } cp = (struct config_list *)calloc(1, sizeof(struct config_list)); memcheck((char *)cp); cp->c_link = config_list; cp->c.c_name = strdup(name); memcheck(cp->c.c_name); cp->c.c_u.c_value = strdup(str); memcheck(cp->c.c_u.c_value); sprintf(log_buffer, "%s[%d] add name %s value %s", file, linenum, name, str); log_record(PBSEVENT_DEBUG, 0, "add_static", log_buffer); config_list = cp; return; } /* END add_static() */ static unsigned long setidealload( const char *value) { float val; val = atof(value); log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "ideal_load", value); if (val < 0.0) { return(0); /* error */ } ideal_load_val = val; if (max_load_val < 0.0) max_load_val = val; /* set a default */ return(1); } /* END setidealload() */ static unsigned long setignwalltime( const char *value) /* I */ { int enable; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "ignwalltime", value); if ((enable = setbool(value)) != -1) ignwalltime = enable; return(1); } /* END setignwalltime() 
*/ static unsigned long setignmem( const char *value) /* I */ { int enable; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "ignmem", value); if ((enable = setbool(value)) != -1) ignmem = enable; return(1); } /* END setignmem() */ static unsigned long setigncput( const char *value) /* I */ { int enable; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "igncput", value); if ((enable = setbool(value)) != -1) igncput = enable; return(1); } static unsigned long setignvmem( const char *value) /* I */ { int enable; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "ignvmem", value); if ((enable = setbool(value)) != -1) ignvmem = enable; return(1); } /* END setignvmem() */ static unsigned long setautoidealload( const char *value) { log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "auto_ideal_load", value); auto_ideal_load = strdup(value); /* add_static(auto_ideal_load,"config",0); nconfig++; */ return(1); } /* END setautoidealload() */ static unsigned long setallocparcmd( const char *value) /* I */ { log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "allocparcmd", value); AllocParCmd = strdup(value); return(1); } /* END setallocparcmd() */ static unsigned long setautomaxload( const char *value) { log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "auto_max_load", value); auto_max_load = strdup(value); /* add_static(auto_ideal_load,"config",0); nconfig++; */ return(1); } /* END setautomaxload() */ static unsigned long setmaxconnecttimeout( const char *value) /* I */ { MaxConnectTimeout = strtol(value, NULL, 10); if (MaxConnectTimeout < 0) { MaxConnectTimeout = 10000; return(0); } return(1); } static unsigned long setreduceprologchecks( const char *value) { log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "reduceprologchecks", value); if (!strncasecmp(value,"t",1) || (value[0] == '1') || !strcasecmp(value,"on") ) reduceprologchecks = TRUE; else reduceprologchecks = FALSE; return(1); } /* END setreduceprologchecks() */ static unsigned long setnodecheckscript( 
const char *value) { struct stat sbuf; log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "node_check_script", value); if ((stat(value, &sbuf) == -1) || !(sbuf.st_mode & S_IXUSR)) { /* FAILURE */ /* file does not exist or is not executable */ return(0); } snprintf(PBSNodeCheckPath, sizeof(PBSNodeCheckPath), "%s", value); /* SUCCESS */ return(1); } /* END setnodecheckscript() */ unsigned long setnodecheckinterval( const char *value) { log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "node_check_interval", value); PBSNodeCheckInterval = (int)strtol(value, NULL, 10); if (strstr(value, "jobstart")) PBSNodeCheckProlog = 1; if (strstr(value, "jobend")) PBSNodeCheckEpilog = 1; return(1); } /* END setnodecheckinterval() */ static unsigned long settimeout( const char *value) { log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "timeout", value); DIS_tcp_settimeout(strtol(value, NULL, 10)); return(1); } /* END settimeout() */ static unsigned long setmaxload( const char *value) /* I */ { float val; val = atof(value); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "max_load", value); if (val < 0.0) { return(0); /* error */ } max_load_val = val; if (ideal_load_val < 0.0) ideal_load_val = val; return(1); } /* END max_load() */ static unsigned long setlogfilemaxsize( const char *value) /* I */ { log_file_max_size = strtol(value, NULL, 10); if (log_file_max_size < 0) { log_file_max_size = 0; return(0); } return(1); } static unsigned long setlogfilerolldepth( const char *value) /* I */ { log_file_roll_depth = strtol(value, NULL, 10); if (log_file_roll_depth < 1) { log_file_roll_depth = 1; return(0); } return(1); } static unsigned long setlogdirectory( const char *value) /* I */ { path_log = strdup(value); return(1); } static unsigned long setlogfilesuffix( const char *value) /* I */ { log_init(value, NULL); return(1); } static unsigned long setlogkeepdays( const char *value) /* I */ { int i; i = (int)atoi(value); if (i < 0) { return(0); /* error */ } LOGKEEPDAYS = i; 
return(1); } static unsigned long setextpwdretry( const char *value) /* I */ { int i; i = (int)atoi(value); if (i < 0) { return(0); /* error */ } EXTPWDRETRY = i; return(1); } /* END setextpwdretry() */ static u_long setvarattr( const char *value) /* I */ { struct varattr *pva; const char *ptr; pva = (struct varattr *)calloc(1, sizeof(struct varattr)); if (pva == NULL) { /* FAILURE */ log_err(errno, __func__, "no memory"); return(0); } CLEAR_LINK(pva->va_link); /* FORMAT: */ /* extract TTL */ ptr = value; pva->va_ttl = strtol(ptr, NULL, 10); /* step forward to end of TTL */ while ((!isspace(*ptr)) && (*ptr != '\0')) ptr++; if (*ptr == '\0') { free(pva); return(0); } /* skip white space */ while (isspace(*ptr)) ptr++; if (*ptr == '\0') { free(pva); return(0); } /* preserve command and args */ pva->va_cmd = strdup(ptr); append_link(&mom_varattrs, &pva->va_link, pva); /* SUCCESS */ return(1); } /* END setvarattr() */ static unsigned long setthreadunlinkcalls( const char *value) /* I */ { log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "threadunlinkcalls", value); if (!strncasecmp(value,"t",1) || (value[0] == '1') || !strcasecmp(value,"on") ) thread_unlink_calls = TRUE; else thread_unlink_calls = FALSE; return(1); } /* END setthreadunlinkcalls() */ static unsigned long setnodefilesuffix( const char *value) /* I */ { char *ptr; char *cpy = strdup(value); if(cpy == NULL) { return (-1); } ptr = strtok(cpy, ","); nodefile_suffix = strdup(ptr); ptr = strtok(NULL, ","); if (ptr != NULL) submithost_suffix = strdup(ptr); /* SUCCESS */ free(cpy); return(1); } /* END setnodexfilesuffix() */ static unsigned long setmomhost( const char *value) /* I */ { hostname_specified = 1; snprintf(mom_host, sizeof(mom_host), "%s", value); /* SUCCESS */ return(1); } /* END setmomhost() */ static u_long setrreconfig( const char *value) /* I */ { int enable; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "remote_reconfig", value); if ((enable = setbool(value)) != -1) MOMConfigRReconfig = 
enable; return(1); } /* END setrreconfig() */ static unsigned long setnospooldirlist( const char *value) /* I */ { char *TokPtr; char *ptr; int index = 0; char tmpLine[1024]; ptr = strtok_r((char *)value, " \t\n:,", &TokPtr); while (ptr != NULL) { TNoSpoolDirList[index] = strdup(ptr); snprintf(tmpLine, sizeof(tmpLine), "added NoSpoolDir[%d] '%s'", index, ptr); log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "setnospooldirlist", tmpLine); index++; if (index >= TMAX_NSDCOUNT) break; ptr = strtok_r(NULL, " \t\n:,", &TokPtr); } /* END while (ptr != NULL) */ /* SUCCESS */ return(1); } /* END setnospooldirlist() */ unsigned long aliasservername(const char *value) { log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); if (value) { server_alias = (char *)calloc(1, strlen(value)+1); if (server_alias) { strcpy(server_alias, value); } } return(1); } /* END aliasservername() */ static unsigned long setspoolasfinalname( const char *value) /* I */ { int enable; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "spool_as_final_name", value); if ((enable = setbool(value)) != -1) spoolasfinalname = enable; return(1); } /* END setspoolasfinalname() */ static unsigned long setapbasilpath( const char *value) { log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "apbasil_path", value); if (value != NULL) { if (value[0] == '/') apbasil_path = strdup(value); else { snprintf(log_buffer, sizeof(log_buffer), "Path must be an absolute path, but is %s", value); log_err(-1, __func__, log_buffer); } } return(1); } /* END setapbasilpath() */ static unsigned long setapbasilprotocol( const char *value) { log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "apbasil_protocol", value); if (value != NULL) { /* only versions 1.0 to 1.3 are supported for now. 
update later */ if ((strlen(value) != 3) || (value[0] != '1') || (value[1] != '.') || ((value[2] != '0') && (value[2] != '1') && (value[2] != '2') && (value[2] != '3'))) { snprintf(log_buffer, sizeof(log_buffer), "Value must be 1.[0-3] but is %s", value); log_err(-1, __func__, log_buffer); } else apbasil_protocol = strdup(value); } return(1); } /* END setapbasilprotocol() */ static unsigned long setreportermom( const char *value) { log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); if (value != NULL) { if ((value[0] == 't') || (value[0] == 'T') || (value[0] == 'y') || (value[0] == 'Y')) is_reporter_mom = TRUE; else is_reporter_mom = FALSE; } return(1); } /* END setreportermom() */ static unsigned long setloginnode( const char *value) { log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); if (value != NULL) { if ((value[0] == 't') || (value[0] == 'T') || (value[0] == 'y') || (value[0] == 'Y')) is_login_node = TRUE; else is_login_node = FALSE; } return(1); } /* END setloginnode() */ unsigned long setjobexitwaittime( const char *value) { int tmp; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); if (value != NULL) { tmp = strtol(value, NULL, 10); if (tmp != 0) job_exit_wait_time = tmp; } return(1); } /* END setjobexitwaittime() */ unsigned long setmaxjoinjobwaittime( const char *value) { int tmp; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); if (value != NULL) { tmp = strtol(value, NULL, 10); if (tmp != 0) max_join_job_wait_time = tmp; } return(1); } /* END setmaxjoinjobwaittime() */ unsigned long setresendjoinjobwaittime( const char *value) { int tmp; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); if (value != NULL) { tmp = strtol(value, NULL, 10); if (tmp != 0) resend_join_job_wait_time = tmp; } return(1); } /* END setresendjoinjobwaittime() */ unsigned long setmomhierarchyretrytime( const char *value) { int tmp; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, 
__func__, value); if (value != NULL) { tmp = strtol(value, NULL, 10); if (tmp != 0) mom_hierarchy_retry_time = tmp; } return(1); } static unsigned long setrejectjobsubmission( const char *value) { log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, value); if (!strncasecmp(value,"t",1) || (value[0] == '1') || (!strcasecmp(value,"on"))) reject_job_submit = TRUE; else reject_job_submit = FALSE; return(1); } /* END setrejectjobsubmission() */ static unsigned long setattempttomakedir( const char *value) { log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "attempttomakedir", value); if (!strncasecmp(value,"t",1) || (value[0] == '1') || !strcasecmp(value,"on") ) attempttomakedir = TRUE; else attempttomakedir = 0; return(1); } /* END setattempttomakedir() */ /******************************************************** * jobstarter - set the name of the job starter program * to the value given in the mom config file * * Return: 1 success * 0 file named by value does not exist. fail *******************************************************/ unsigned long jobstarter(const char *value) /* I */ { struct stat sbuf; log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "checkpoint_run_exe", value); if ((stat(value, &sbuf) == -1) || !(sbuf.st_mode & S_IXUSR)) { /* file does not exist or is not executable */ return(0); /* error */ } snprintf(jobstarter_exe_name, sizeof(jobstarter_exe_name), "%s", value); jobstarter_set = 1; return(1); } /* END jobstarter() */ static unsigned long setremchkptdirlist( const char *value) /* I */ { char *TokPtr; char *ptr; int index = 0; char tmpLine[1024]; while ((TRemChkptDirList[index] != NULL) && (index < TMAX_RCDCOUNT)) { index++; } if (index >= TMAX_RCDCOUNT) return (1); ptr = strtok_r((char *)value, " \t\n:,", &TokPtr); while (ptr != NULL) { TRemChkptDirList[index] = strdup(ptr); snprintf(tmpLine, sizeof(tmpLine), "added RemChkptDir[%d] '%s'", index, ptr); log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "setremchkptdirlist", tmpLine); 
index++; if (index >= TMAX_RCDCOUNT) break; ptr = strtok_r(NULL, " \t\n:,", &TokPtr); } /* END while (ptr != NULL) */ /* SUCCESS */ return (1); } /* END setremchkptdirlist() */ void check_log(void) { last_log_check = time_now; /* periodically record the version and loglevel */ sprintf(log_buffer, msg_info_mom, PACKAGE_VERSION, LOGLEVEL); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); if (LOGKEEPDAYS > 0) { /* remove logs older than log_keep_days */ snprintf(log_buffer,sizeof(log_buffer),"checking for old pbs_mom logs in dir '%s' (older than %d days)", path_log, LOGKEEPDAYS); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); if (log_remove_old(path_log,(LOGKEEPDAYS * SECS_PER_DAY)) != 0) { log_err(-1,"check_log", "failure occurred when checking for old pbs_mom logs"); } } if (log_file_max_size <= 0) { return; } if (log_size() >= log_file_max_size) { log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, "Rolling log file"); log_roll(log_file_roll_depth); } return; } /* END check_log() */ /* ** Open and read the config file. Save information in a linked ** list. After reading the file, create an array, copy the list ** elements to the array and free the list. */ /* NOTE: add new mom config parameters to 'special[]' */ int read_config( char *file) /* I */ { FILE *conf; struct stat sb; struct config_list *cp; struct config *ap; char line[120]; char name[50]; char *str; char *ptr; int linenum; int i; int IgnConfig = 0; int rc; int n, list_len; char *server_list_ptr; char *tp; if (LOGLEVEL >= 3) { sprintf(log_buffer, "updating configuration using file '%s'", (file != NULL) ? 
file : "NULL"); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, log_buffer); } for (i = 0;i < mask_num;i++) { free(maskclient[i]); } mask_num = 0; if (file == NULL) file = config_file; rc = 0; if (file[0] == '\0') { log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, "ALERT: no config file specified"); IgnConfig = 1; /* no config file */ } if ((IgnConfig == 0) && (stat(file, &sb) == -1)) { IgnConfig = 1; if (config_file_specified != 0) { /* file specified and not there, return failure */ log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, "ALERT: cannot open config file - no file"); rc = 1; } else { /* "config" file not located, return success */ if (LOGLEVEL >= 3) { sprintf(log_buffer, "cannot open file '%s'", file); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, log_buffer); } rc = 0; } } /* END if ((IgnConfig == 0) && (stat(file,&sb) == -1)) */ if (IgnConfig == 0) { #if !defined(DEBUG) && !defined(NO_SECURITY_CHECK) if (chk_file_sec(file, 0, 0, S_IWGRP | S_IWOTH, 1, NULL)) { /* not authorized to access specified file, return failure */ log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, "ALERT: cannot open config file - permissions"); IgnConfig = 1; rc = 1; } #endif /* NO_SECURITY_CHECK */ } /* END if (ignConfig == 0) */ if (IgnConfig == 0) { if ((conf = fopen(file, "r")) == NULL) { sprintf(log_buffer, "fopen: %s", file); log_err(errno, __func__, log_buffer); IgnConfig = 1; rc = 1; } } /* END if (IgnConfig == 0) */ if (IgnConfig == 0) { nconfig = 0; linenum = 0; memset(line, 0, sizeof(line)); memset(name, 0, sizeof(name)); while (fgets(line, sizeof(line) - 1, conf)) { linenum++; if (line[0] == '#') /* comment */ { memset(line, 0, sizeof(line)); continue; } if ((ptr = strchr(line, '#')) != NULL) { /* allow inline comments */ *ptr = '\0'; } str = skipwhite(line); /* pass over initial whitespace */ if (*str == '\0') { memset(line, 0, sizeof(line)); continue; } if (LOGLEVEL >= 6) { sprintf(log_buffer, 
"processing config line '%.64s'", str); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, log_buffer); } if (*str == '$') { /* special command */ str = tokcpy(++str, name); /* resource name */ for (i = 0;special[i].name;i++) { if (strcasecmp(name, special[i].name) == 0) break; } /* END for (i) */ if (special[i].name == NULL) { /* didn't find it */ sprintf(log_buffer, "special command name %s not found (ignoring line)", name); log_err(-1, __func__, log_buffer); memset(line, 0, sizeof(line)); continue; } str = skipwhite(str); /* command param */ rmnl(str); if (special[i].handler(str) == 0) { sprintf(log_buffer, "%s[%d] special command %s failed with %s", file, linenum, name, str); log_err(-1, __func__, log_buffer); } memset(line, 0, sizeof(line)); continue; } add_static(str, file, linenum); nconfig++; memset(line, 0, sizeof(line)); } /* END while (fgets()) */ /* ** Create a new array. */ if (config_array != NULL) { for (ap = config_array;ap->c_name != NULL;ap++) { free((void *)ap->c_name); free((void *)ap->c_u.c_value); } free(config_array); } config_array = (struct config *)calloc(nconfig + 1, sizeof(struct config)); memcheck((const char *)config_array); /* ** Copy in the new information saved from the file. */ for (i = 0, ap = config_array;i < nconfig;i++, ap++) { *ap = config_list->c; cp = config_list->c_link; free(config_list); /* don't free name and value strings */ config_list = cp; /* they carry over from the list */ } ap->c_name = NULL; /* one extra */ fclose(conf); } /* END if (IgnConfig == 0) */ if (mom_server_count == 0) { /* No server names in torque/mom_priv/config. Get names from torque/server_name. 
*/ server_list_ptr = pbs_get_server_list(); list_len = csv_length(server_list_ptr); for (n = 0; n < list_len; n++) { tp = csv_nth(server_list_ptr, n); if (tp) { /* Trim any leading space */ while(isspace(*tp)) tp++; setpbsserver(tp); } } } return(rc); } /* END read_config() */ #ifdef PENABLE_LINUX26_CPUSETS static u_long setusesmt( const char *value) /* I */ { int enable; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "use_smt", value); if ((enable = setbool(value)) != -1) MOMConfigUseSMT = enable; return(1); } /* END setusesmt() */ static u_long setmempressthr( const char *value) { long long val; log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "memory_pressure_threshold", value); if ((val = atoll(value)) < 0) return(0); /* error, leave as is */ if (val > INT_MAX) val = INT_MAX; memory_pressure_threshold = (int)val; return(1); } static u_long setmempressdur( const char *value) { int val; val = atoi(value); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, "memory_pressure_duration", value); if ((val = atoi(value)) < 0) return(0); /* error, leave as is */ if (val > SHRT_MAX) val = SHRT_MAX; memory_pressure_duration = (short)val; return(1); } #endif /* ** Get an rm_attribute structure from a string. If a NULL is passed ** for the string, use the previously remembered string. */ struct rm_attribute *momgetattr( char *str) /* I */ { static char cookie[] = "tag:"; /* rm_attribute to ignore */ static char *hold = NULL; static char qual[80] = ""; static char valu[4096] = ""; static struct rm_attribute attr = { qual, valu }; int level, i; if (str == NULL) /* if NULL is passed, use prev value */ str = hold; if (str == NULL) return(NULL); /* FORMAT: ??? 
*/ do { str = skipwhite(str); if (*str++ != '[') { return(NULL); } str = skipwhite(str); /* copy qualifier */ str = tokcpy(str, qual); str = skipwhite(str); if (*str++ != '=') { return(NULL); } level = 0; for (i = 0;*str;str++, i++) { if (*str == '[') { level++; } else if (*str == ']') { if (level == 0) break; level--; } valu[i] = *str; } if (*str++ != ']') { return(NULL); } valu[i] = '\0'; if (LOGLEVEL >= 7) { sprintf(log_buffer, "found %s = %s", qual, valu); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); } } while (strncmp(qual, cookie, sizeof(cookie) - 1) == 0); hold = str; if (LOGLEVEL >= 5) { sprintf(log_buffer, "passing back %s = %s", qual, valu); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); } return(&attr); } /* END momgetattr() */ /* ** Check the request against the format of the line read from ** the config file. If it is a static value, there should be ** no params. If it is a shell escape, the parameters (if any) ** should match the command line for the system call. */ char *conf_res( char *resline, /* I */ struct rm_attribute *attr) /* I */ { char *name[RM_NPARM]; char *value[RM_NPARM]; int used[RM_NPARM]; /* (boolean) */ char param[80]; char *d; int i; int fd; int len; FILE *child; char *child_spot; int child_len; if (resline == NULL) { return((char *)""); } if (resline[0] != '!') { /* static value */ if (attr != NULL) { sprintf(ret_string, "? %d", RM_ERR_BADPARAM); return(ret_string); } return(resline); } /* ** From here on we are going to put together a shell command ** to do the requestor's bidding. Parameter substitution ** is the first step. 
*/ for (i = 0; i < RM_NPARM - 1; i++) { /* remember params */ if (attr == NULL) { /* FAILURE */ break; } name[i] = strdup(attr->a_qualifier); memcheck(name[i]); value[i] = strdup(attr->a_value); memcheck(value[i]); used[i] = 0; attr = momgetattr(NULL); } /* END for (i) */ if (attr != NULL) { /* too many params */ log_err(-1, __func__, "too many params"); sprintf(ret_string, "? %d", RM_ERR_BADPARAM); goto done; } name[i] = '\0'; for (d = ret_string, resline++;*resline;) { /* scan command */ if (*resline == '%') { /* possible token */ char *hold; hold = tokcpy(resline + 1, param); for (i = 0;name[i];i++) { if (strcmp(param, name[i]) == 0) break; } if (name[i]) { /* found a match */ char *x = value[i]; while (*x) { *d++ = *x++; } resline = hold; used[i] = 1; } else { *d++ = *resline++; } } else { *d++ = *resline++; } } for (i = 0;name[i];i++) { if (!used[i]) { /* parameter sent but not used */ log_err(-1, __func__, "unused parameters"); sprintf(ret_string, "? %d", RM_ERR_BADPARAM); goto done; } } /* END for (i) */ *d = '\0'; DBPRT(("command: %s\n", ret_string)) if ((child = popen(ret_string, "r")) == NULL) { log_err(errno, __func__, "popen"); sprintf(ret_string, "? %d", RM_ERR_SYSTEM); goto done; } fd = fileno(child); child_spot = ret_string; child_len = 0; child_spot[0] = '\0'; retryread: while ((len = read_ac_socket(fd, child_spot, ret_size - child_len)) > 0) { for (i = 0;i < len;i++) { if (child_spot[i] == '\n') break; } if (i < len) { /* found newline */ child_len += i + 1; break; } child_len += len; child_spot += len; checkret(&child_spot, len); } if (len == -1) { if (errno == EINTR) { goto retryread; } log_err(errno, __func__, "pipe read"); sprintf(ret_string, "? 
%d", RM_ERR_SYSTEM); fclose(child); goto done; } pclose(child); if (child_len > 0) ret_string[child_len - 1] = '\0'; /* hack off newline */ done: for (i = 0;name[i] != NULL;i++) { /* free up params */ free(name[i]); free(value[i]); } /* END for (i) */ return(ret_string); } /* END conf_res() */ static void catch_abort( int sig) { struct rlimit rlimit; /* * Reset ourselves to the default signal handler to try and * prevent recursive core dumps. */ struct sigaction act; sigemptyset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = SIG_DFL; sigaction(SIGSEGV, &act, NULL); sigaction(SIGBUS, &act, NULL); sigaction(SIGFPE, &act, NULL); sigaction(SIGILL, &act, NULL); sigaction(SIGTRAP, &act, NULL); sigaction(SIGSYS, &act, NULL); log_err(sig, "mom_main", "Caught fatal core signal"); rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; setrlimit(RLIMIT_CORE, &rlimit); abort(); return; } /* END catch_abort() */ static void catch_hup( int sig) { sprintf(log_buffer, "caught signal %d", sig); log_record(PBSEVENT_SYSTEM, 0, "catch_hup", "reset"); call_hup = 1; return; } /* END catch_hup() */ /* * Do a restart of resmom. * Read the last seen config file and * Clean up and reinit the dependent code. */ static void process_hup(void) { call_hup = 0; log_record(PBSEVENT_SYSTEM, 0, __func__, "reset"); pthread_mutex_lock(&log_mutex); log_close(1); log_open(log_file, path_log); pthread_mutex_unlock(&log_mutex); log_file_max_size = 0; log_file_roll_depth = 1; #ifdef PENABLE_LINUX26_CPUSETS memory_pressure_threshold = 0; memory_pressure_duration = 0; #endif clear_servers(); read_config(NULL); check_log(); cleanup(); initialize(); return; } /* END process_hup() */ /* ** Got an alarm call. 
*/ void toolong( int sig) { if (LOGLEVEL >= 1) DBPRT(("alarm call\n")) return; } /* END toolong() */ #ifdef DEBUG void log_verbose( char *id, char *buf, int len) { int i; char *cp; len = MIN(len, 50); cp = log_buffer; for (i = 0;i < len;i++) { int c = buf[i]; if (isprint(c)) { *cp++ = c; } else { sprintf(cp, "(%d)", c); cp += strlen(cp); } } *cp = '\0'; log_record(PBSEVENT_DEBUG, 0, id, log_buffer); return; } /* END log_verbose() */ #else #define log_verbose(a, b, c) #endif /* ** See if an IP address matches any names stored as "restricted" ** access hosts. Return 0 if a name matches, 1 if not. */ int bad_restrict( u_long ipadd) { struct hostent *host; struct in_addr in; int i, len1, len2; char *cp1, *cp2; in.s_addr = htonl(ipadd); if ((host = gethostbyaddr( (void *) & in, sizeof(struct in_addr), AF_INET)) == NULL) { return(1); } len1 = strlen(host->h_name) - 1; for (i = 0;i < mask_num;i++) { len2 = strlen(maskclient[i]) - 1; if (len1 < len2) continue; cp1 = (char *)&host->h_name[len1]; cp2 = &maskclient[i][len2]; /* check case insensitve */ while ((len2 >= 0) && (tolower(*cp1) == tolower(*cp2))) { cp1--; cp2--; len2--; } /* END while () */ if (((len2 == 0) && (*cp2 == '*')) || (len2 == -1)) { return(0); } } /* END for (i) */ return(1); } /* END bad_restrict() */ /* * mom_lock - lock out other MOMs from this directory. */ static void mom_lock( int fds, int op) /* F_WRLCK or F_UNLCK */ { struct flock flock; flock.l_type = op; flock.l_whence = SEEK_SET; flock.l_start = 0; flock.l_len = 0; /* whole file */ if (fcntl(fds, F_SETLK, &flock) < 0) { char tmpPath[256]; tmpPath[0] = '\0'; if (getcwd(tmpPath, sizeof(tmpPath)) == NULL) tmpPath[0] = '\0'; sprintf(log_buffer, "cannot lock '%s/mom.lock' - another mom running", (tmpPath[0] != '\0') ? 
tmpPath : "$MOM_HOME"); log_err(errno, msg_daemonname, log_buffer); fprintf(stderr, "%s\n", log_buffer); exit(1); } return; } /* END mom_lock() */ int could_be_mic_or_gpu_file( const char *jobid) { int len = strlen(jobid); const char *start = jobid + len - 3; if (*start == 'g') { if ((start[1] == 'p') && (start[2] == 'u')) return(TRUE); } else if (*start == 'm') { if ((start[1] == 'i') && (start[2] == 'c')) return(TRUE); } return(FALSE); } /* END could_be_mic_or_gpu_file() */ void cleanup_aux() { struct dirent *pdirent; DIR *auxdir; char namebuf[MAXLINE]; unsigned int len; auxdir = opendir(path_aux); if (auxdir != NULL) { while ((pdirent = readdir(auxdir)) != NULL) { if (pdirent->d_name[0] == '.') continue; if (could_be_mic_or_gpu_file(pdirent->d_name) == TRUE) continue; if (mom_find_job(pdirent->d_name) == NULL) { /* this job doesn't exist */ snprintf(namebuf, sizeof(namebuf), "%s/%s", path_aux, pdirent->d_name); unlink(namebuf); len = strlen(namebuf); if (sizeof(namebuf) - 3 > len) { strcpy(namebuf + len, "gpu"); unlink(namebuf); strcpy(namebuf + len, "mic"); unlink(namebuf); } } } closedir(auxdir); } } /* END cleanup_aux() */ int process_clear_job_request( std::stringstream &output, char *curr) { char *ptr = NULL; job *pjob = NULL; job *pjobnext = NULL; if ((*curr == '=') && ((*curr) + 1 != '\0')) { ptr = curr + 1; } /* purge job if local */ if (ptr == NULL) { output << "invalid clearjob request"; } else { std::string tmpLine; if (!strcasecmp(ptr, "all")) { if ((pjob = (job *)GET_NEXT(svr_alljobs)) != NULL) { while (pjob != NULL) { tmpLine.append("clearing job "); tmpLine.append(pjob->ji_qs.ji_jobid); log_record(PBSEVENT_SYSTEM, 0, __func__, tmpLine.c_str()); pjobnext = (job *)GET_NEXT(pjob->ji_alljobs); mom_job_purge(pjob); pjob = pjobnext; output << tmpLine << "\n"; } } output << "clear completed"; } else if ((pjob = mom_find_job(ptr)) != NULL) { output << "clearing job "; output << pjob->ji_qs.ji_jobid; log_record(PBSEVENT_SYSTEM, 0, __func__, 
tmpLine.c_str()); mom_job_purge(pjob); output << tmpLine; } } return(PBSE_NONE); } /* END process_clear_job_request() */ void add_diag_header( std::stringstream &output) { output << "\nHost: " << mom_short_name << "/" << mom_host << " Version: "; output << PACKAGE_VERSION << " PID: " << getpid() << "\n"; } /* END add_diag_header() */ void add_diag_home_dir( std::stringstream &output) { output << "HomeDirectory: "; if (mom_home != NULL) output << mom_home; else output << "N/A"; output << "\n"; } /* END add_diag_home_dir() */ void add_diag_remaining_spool_space( std::stringstream &output) { struct statvfs VFSStat; if (statvfs((const char *)path_spool, &VFSStat) < 0) { output << "ALERT: cannot stat stdout/stderr spool directory '" << path_spool; output << "' (errno=" << errno << ")" << strerror(errno) << "\n"; } else { if (VFSStat.f_bavail > 0) { if (verbositylevel >= 1) { output << "stdout/stderr spool directory: '" << path_spool << "' (" << VFSStat.f_bavail; output << "blocks available)\n"; } } else { output << "ALERT: stdout/stderr spool directory '" << path_spool << "' is full.\nFree space then restart the pbs_mom.\n"; } } } /* END add_diag_remaining_spool_space() */ void add_diag_config_version( std::stringstream &output) { if (MOMConfigVersion[0] != '\0') output << "ConfigVersion: " << MOMConfigVersion << "\n"; } /* END add_diag_config_version() */ void add_diag_syslog_status( std::stringstream &output) { #if SYSLOG output << "NOTE: syslog enabled\n"; #else /* SYSLOG */ output << "NOTE: syslog not enabled (use 'configure --enable-syslog' to enable)\n"; #endif /* SYSLOG */ } void add_diag_health_check_script_status( std::stringstream &output) { if (PBSNodeCheckPath[0] != '\0') { output << "Node Health Check Script: " << PBSNodeCheckPath << " ("; output << PBSNodeCheckInterval * ServerStatUpdateInterval << " second update interval)\n"; } } /* END add_diag_health_check_script_status() */ void add_diag_active_time( std::stringstream &output) { output << "MOM active: 
" << (long)time(NULL) - MOMStartTime << " seconds\n"; } /* END add_diag_active_time() */ void add_diag_check_poll_time( std::stringstream &output) { output << "Check Poll Time: " << CheckPollTime << " seconds\n"; } void add_diag_status_update_interval( std::stringstream &output) { output << "Server Update Interval: " << ServerStatUpdateInterval << " seconds\n"; } void add_diag_mom_message( std::stringstream &output) { if (PBSNodeMsgBuf[0] != '\0') { output << "MOM Message: " << PBSNodeMsgBuf << " (use 'momctl -q clearmsg' to clear)\n"; } } void add_diag_mom_name_error_if_needed( std::stringstream &output) { if (MOMUNameMissing[0] != '\0') { output << "WARNING: passwd file is corrupt (job requests user '" << MOMUNameMissing; output << "' - not found in local passwd file)\n"; } } void add_diag_prolog_timeout_count( std::stringstream &output) { if (MOMPrologTimeoutCount > 0) { output << "WARNING: " << MOMPrologTimeoutCount << " prolog timeouts (" << pe_alarm_time; output << " seconds) detected since start up - increase $prologalarm or investigate prolog\n"; } } void add_diag_prolog_failure_count( std::stringstream &output) { if (MOMPrologFailureCount > 0) { output << "WARNING: " << MOMPrologFailureCount; output << " prolog failures detected since start up - investigate prolog\n"; } } void add_diag_log_level( std::stringstream &output) { output << "LogLevel: " << LOGLEVEL << " (use SIGUSR1/SIGUSR2 to adjust)\n"; } void add_diag_communication_model( std::stringstream &output) { output << "Communication Model: TCP\n"; } void add_diag_mom_locking_information( std::stringstream &output) { if ((MOMIsLocked == 1) || (MOMIsPLocked == 1) || (verbositylevel >= 4)) { output << "MemLocked: "; if (MOMIsLocked == 0) output << "FALSE"; else output << "TRUE"; if (MOMIsLocked == 1) output << " (mlock)"; if (MOMIsPLocked == 1) output << " (plocked)"; output << "\n"; } } void add_diag_tcp_timeout( std::stringstream &output) { output << "TCP Timeout: " << pbs_tcp_timeout << " seconds\n"; 
} void add_diag_prolog_epilog_info( std::stringstream &output) { struct stat s; int prologfound = 0; if (stat(path_prolog, &s) != -1) { output << "Prolog: " << path_prolog << " (enabled)\n"; prologfound = 1; } else if (verbositylevel >= 2) { output << "Prolog: " << path_prolog << " (disabled)\n"; } if (stat(path_prologp, &s) != -1) { output << "Parallel Prolog: " << path_prologp << " (enabled)\n"; prologfound = 1; } if (prologfound == 1) { output << "Prolog Alarm Time: " << pe_alarm_time << " seconds\n"; } } void add_diag_alarm_time( std::stringstream &output) { int rc = alarm(alarm_time); alarm(rc); output << "Alarm Time: " << rc << " of " << alarm_time << " seconds\n"; } void add_diag_okclient_list( std::stringstream &output) { long max_len = 1024; long final_len = 0; char *tmp_line = (char *)calloc(1, max_len + 1); if (tmp_line != NULL) { int ret = AVL_list(okclients, &tmp_line, &final_len, &max_len); output << "Trusted Client List: " << tmp_line << ": " << ret << "\n"; free(tmp_line); } else { output << "Trusted Client Could not be retrieved\n"; } } void add_diag_copy_command( std::stringstream &output) { output << "Copy Command: " << rcp_path << " " << rcp_args << "\n"; } void add_diag_jobs_session_ids( std::stringstream &output, job *pjob) { bool first = true; task *ptask; output << " sidlist="; for (ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { /* only check on tasks that we think should still be around */ if (ptask->ti_qs.ti_status != TI_STATE_RUNNING) continue; /* NOTE: on linux systems, the session master should have pid == sessionid */ if (first == true) output << ptask->ti_qs.ti_sid; else output << "," << ptask->ti_qs.ti_sid; } /* END for (task) */ } void add_diag_jobs_memory_info( std::stringstream &output, job *pjob) { resource *pres; const char *resname; unsigned long resvalue; /* Selected resources used by job (on this node) */ if (verbositylevel >= 2) { if 
(pjob->ji_wattr[JOB_ATR_resc_used].at_type != ATR_TYPE_RESC) { output << "\n"; return; } pres = (resource *)GET_NEXT(pjob->ji_wattr[JOB_ATR_resc_used].at_val.at_list); for (;pres != NULL;pres = (resource *)GET_NEXT(pres->rs_link)) { if (pres->rs_defin == NULL) { continue; } resname = pres->rs_defin->rs_name; if (!(strcmp(resname, "mem"))) resvalue = getsize(pres); else if (!(strcmp(resname, "vmem"))) resvalue = getsize(pres); else if (!(strcmp(resname, "cput"))) resvalue = gettime(pres); else continue; output << " " << resname << "=" << resvalue; } #ifdef PENABLE_LINUX26_CPUSETS /* report current memory pressure */ output << " mempressure=" << pjob->ji_mempressure_curr; #endif } } void add_diag_jobs_batch_env_vars( std::stringstream &output, job *pjob) { if (verbositylevel >= 4) { char *VPtr = get_job_envvar(pjob, "BATCH_PARTITION_ID"); if (VPtr != NULL) output << " BATCH_PARTITION_ID=" << VPtr; VPtr = get_job_envvar(pjob, "BATCH_ALLOC_COOKIE"); if (VPtr != NULL) output << " BATCH_ALLOC_COOKIE=" << VPtr; } /* END if (verbositylevel >= 4) */ } int diag_jobs_count_vnodes( job *pjob) { int vnindex; int numvnodes = 0; /* count the CPUs assigned to this mom */ for (vnindex = 0; vnindex < pjob->ji_numvnod; vnindex++) { if (!strcmp(pjob->ji_vnods[vnindex].vn_host->hn_host, mom_alias)) { numvnodes++; } } return(numvnodes); } void add_diag_jobid_and_state( std::stringstream &output, job *pjob) { output << "job[" << pjob->ji_qs.ji_jobid << "] state=" << PJobSubState[pjob->ji_qs.ji_substate]; } void add_diag_job_entry( std::stringstream &output, int *numvnodes, job *pjob) { *numvnodes += diag_jobs_count_vnodes(pjob); add_diag_jobid_and_state(output, pjob); add_diag_jobs_memory_info(output, pjob); add_diag_jobs_session_ids(output, pjob); add_diag_jobs_batch_env_vars(output, pjob); output << "\n"; } /* END add_diag_job_entry() */ void add_diag_new_jobs( std::stringstream &output) { job *pjob; if ((pjob = (job *)GET_NEXT(svr_newjobs)) != NULL) { while (pjob != NULL) { output << 
"job[" << pjob->ji_qs.ji_jobid << "] state=NEW\n"; pjob = (job *)GET_NEXT(pjob->ji_alljobs); } } } void add_diag_job_list( std::stringstream &output) { job *pjob; if ((pjob = (job *)GET_NEXT(svr_alljobs)) == NULL) { output << "NOTE: no local jobs detected\n"; } else { int numvnodes = 0; for (;pjob != NULL;pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { add_diag_job_entry(output, &numvnodes, pjob); } /* END for (pjob) */ output << "Assigned CPU Count: " << numvnodes << "\n"; } /* END else ((pjob = (job *)GET_NEXT(svr_alljobs)) == NULL) */ add_diag_new_jobs(output); } /* END add_diag_job_list() */ void add_diag_var_attrs( std::stringstream &output) { struct varattr *pva; if ((pva = (struct varattr *)GET_NEXT(mom_varattrs)) != NULL) { output << "Varattrs:\n"; while (pva != NULL) { output << " ttl=" << pva->va_ttl << " last=" << ctime(&pva->va_lasttime); output << " cmd=" << pva->va_cmd << "\n value="; if (pva->va_value != NULL) output << pva->va_value; else output << "NULL"; output << "\n\n", pva = (struct varattr *)GET_NEXT(pva->va_link); } } } /* * set_verbosity_level() * * parses the string to determine what verbosity level this diag output should be * * @pre-cond: name must be a valid string * @post-cond: global variable verbositylevel is set */ void set_verbosity_level( const char *name) { const char *ptr; ptr = name + strlen("diag"); verbositylevel = (int)strtol(ptr, NULL, 10); } int process_diag_request( std::stringstream &output, const char *name) { output.str(""); set_verbosity_level(name); add_diag_header(output); mom_server_all_diag(output); add_diag_home_dir(output); #ifdef HAVE_SYS_STATVFS_H add_diag_remaining_spool_space(output); #endif /* HAVE_SYS_STATVFS_H */ add_diag_config_version(output); if (verbositylevel >= 3) { add_diag_syslog_status(output); add_diag_health_check_script_status(output); } add_diag_active_time(output); if (verbositylevel >= 1) { add_diag_check_poll_time(output); add_diag_status_update_interval(output); } add_diag_mom_message(output); 
  add_diag_mom_name_error_if_needed(output);
  add_diag_prolog_timeout_count(output);
  add_diag_prolog_failure_count(output);
  add_diag_log_level(output);

  /* connection and configuration details are only shown at verbosity >= 1 */
  if (verbositylevel >= 1)
    {
    add_diag_communication_model(output);
    add_diag_mom_locking_information(output);
    add_diag_tcp_timeout(output);
    add_diag_prolog_epilog_info(output);

    if (verbositylevel >= 2)
      {
      add_diag_alarm_time(output);
      }

    add_diag_okclient_list(output);
    add_diag_copy_command(output);
    }

  /* job and varattr listings close the report */
  add_diag_job_list(output);
  add_diag_var_attrs(output);

  output << "\ndiagnostics complete\n";

  log_record(PBSEVENT_SYSTEM, 0, __func__, "internal diagnostics complete");

  return(PBSE_NONE);
  } /* END process_diag_request() */


/*
 * clear_rm_messages()
 * clears any stored messages for this mom: empties PBSNodeMsgBuf,
 * writes the confirmation text to output and logs the action
 * @pre-condition: output must be a valid stringstream
 */
void clear_rm_messages(

  std::stringstream &output)

  {
  /* this is where the messages are stored */
  PBSNodeMsgBuf[0] = '\0';

  output << "messages cleared";

  log_record(PBSEVENT_SYSTEM, 0, __func__, "messages cleared");
  } /* END clear_rm_messages() */


/*
 * force_status_update()
 * forces an update to pbs_server
 * @see mom_server_all_update_stat() - this calls send_update() to
 * determine whether or not to send an update.
*/ void force_status_update( std::stringstream &output) { /* make it seem like we've never sent an update to pbs_server */ LastServerUpdateTime = 0; output << "cycle forced"; log_record(PBSEVENT_SYSTEM, 0, __func__, "reporting cycle forced"); } /* END force_status_update() */ /* * set_report_status_update_interval() * @pre-cond: curr points to a valid character pointer */ void set_report_status_update_interval( std::stringstream &output, char *curr) { if ((*curr == '=') && ((*curr) + 1 != '\0')) { setstatusupdatetime(curr + 1); } output << "status_update_time=" << ServerStatUpdateInterval; } /* END set_report_status_update_interval() */ /* * set_report_check_poll_time() * @pre-cond: curr points to a valid character pointer */ void set_report_check_poll_time( std::stringstream &output, char *curr) { if ((*curr == '=') && ((*curr) + 1 != '\0')) { setcheckpolltime(curr + 1); } output << "check_poll_time=" << CheckPollTime; } /* set_report_check_poll_time() */ /* * set_report_job_start_block_time() * @pre-cond: curr points to a valid character pointer */ void set_report_job_start_block_time( std::stringstream &output, char *curr) { if ((*curr == '=') && ((*curr) + 1 != '\0')) { jobstartblocktime(curr + 1); } output << "jobstartblocktime=" << TJobStartBlockTime; } /* END set_report_job_start_block_time() */ /* * set_report_log_level() * @pre-cond: curr points to a valid character pointer */ void set_report_log_level( std::stringstream &output, char *curr) { if ((*curr == '=') && ((*curr) + 1 != '\0')) { setloglevel(curr + 1); } output << "loglevel=" << LOGLEVEL; } /* END set_report_log_level() */ /* * set_report_down_on_error() * @pre-cond: curr points to a valid character pointer */ void set_report_down_on_error( std::stringstream &output, char *curr) { if ((*curr == '=') && ((*curr) + 1 != '\0')) { setdownonerror(curr + 1); } output << "down_on_error=" << MOMConfigDownOnError; } /* END set_report_down_on_error() */ /* * set_report_mom_rolling_restart() * @pre-cond: 
curr points to a valid character pointer */ void set_report_mom_rolling_restart( std::stringstream &output, char *curr) { if ((*curr == '=') && ((*curr) + 1 != '\0')) { setenablemomrestart(curr + 1); } output << "enablemomrestart=" << MOMConfigRestart; } /* END set_report_mom_rolling_restart() */ void set_report_rcpcmd( std::stringstream &output, char *curr) { if ((*curr == '=') && ((*curr) + 1 != '\0')) { setrcpcmd(curr + 1); } output << "rcpcmd=" << rcp_path << " " << rcp_args; } /* END set_report_rcpcmd() */ #ifdef PENABLE_LINUX26_CPUSETS void set_report_memory_pressure_threshold( std::stringstream &output, char *curr) { if ((*curr == '=') && ((*curr) + 1 != '\0')) setmempressthr(curr + 1); output << "memory_pressure_threshold=" << memory_pressure_threshold; } /* END set_report_memory_pressure_threshold() */ void set_report_memory_pressure_duration( std::stringstream &output, char *curr) { if ((*curr == '=') && ((*curr) + 1 != '\0')) setmempressdur(curr + 1); output << "memory_pressure_duration=" << memory_pressure_duration; } /* END set_report_memory_pressure_duration() */ #endif void report_version( std::stringstream &output) { output << "version=" << PACKAGE_VERSION; } /* END report_version() */ void report_config_version( std::stringstream &output) { output << "configversion=" << MOMConfigVersion; } /* END report_config_version() */ /* * report_other_configured_attribute() * @pre-cond: name points to a valid str * @pre-cond: curr points to a valid str */ void report_other_configured_attribute( std::stringstream &output, char *name, char *curr, char *cp, bool restrictrm) { struct config *ap = rm_search(config_array, name); struct rm_attribute *attr = momgetattr(curr); if (LOGLEVEL >= 3) { snprintf(log_buffer, sizeof(log_buffer), "setting alarm in %s", __func__); log_record(PBSEVENT_SYSTEM, 0, __func__, log_buffer); } alarm(alarm_time); if ((ap != NULL) && !restrictrm) { /* static */ output << cp << "=" << conf_res(ap->c_u.c_value, attr); } else { /* check 
dependent code */ log_buffer[0] = '\0'; const char *value = dependent(name, attr); if (value != NULL) { output << cp << "=" << value; } else { /* not found anywhere */ output << cp << "=? " << rm_errno; } } alarm(0); } /* END report_other_configured_attribute() */ /* * process_rm_cmd_request() * * @pre-condition: chan must have a pointer to a valid socket from which * @pre-condition: num_queries indicates the number of queries the rm * will execute * the request can be read and to which the response can be written. */ int process_rm_cmd_request( struct tcp_chan *chan, /* I */ int num_queries, /* I */ bool restrictrm) { int ret; char name[100]; std::stringstream output; /* query resource data */ reqnum++; ret = diswsi(chan, RM_RSP_OK); if (ret != DIS_SUCCESS) { sprintf(log_buffer, "write request response failed: %s", dis_emsg[ret]); return(ret); } for (int query_index = 0; query_index < num_queries; query_index++) { char *cp = disrst(chan, &ret); if (ret == DIS_EOD) { if (cp != NULL) { free(cp); cp = NULL; } break; } if (ret != DIS_SUCCESS) { sprintf(log_buffer, "problem with request line: %s", dis_emsg[ret]); return(ret); } char *curr = skipwhite(cp); curr = tokcpy(curr, name); if (name[0] == '\0') { /* no name */ output << cp; output << "=? 
"; output << RM_ERR_UNKNOWN; } else { if (!strncasecmp(name, "clearjob", strlen("clearjob"))) { process_clear_job_request(output, curr); } else if (!strncasecmp(name, "clearmsg", strlen("clearmsg"))) { /* clear rm messages */ clear_rm_messages(output); } else if (!strncasecmp(name, "cycle", strlen("cycle"))) { force_status_update(output); } else if (!strncasecmp(name, "status_update_time", strlen("status_update_time"))) { set_report_status_update_interval(output, curr); } else if (!strncasecmp(name, "check_poll_time", strlen("check_poll_time"))) { set_report_check_poll_time(output, curr); } else if (!strncasecmp(name, "jobstartblocktime", strlen("jobstartblocktime"))) { set_report_job_start_block_time(output, curr); } else if (!strncasecmp(name, "loglevel", strlen("loglevel"))) { set_report_log_level(output, curr); } else if (!strncasecmp(name, "down_on_error", strlen("down_on_error"))) { set_report_down_on_error(output, curr); } else if (!strncasecmp(name, "enablemomrestart", strlen("enablemomrestart"))) { set_report_mom_rolling_restart(output, curr); } else if (!strncasecmp(name, "rcpcmd", strlen("rcpcmd"))) { set_report_rcpcmd(output, curr); } #ifdef PENABLE_LINUX26_CPUSETS else if (!strncasecmp(name, "memory_pressure_threshold", strlen("memory_pressure_threshold"))) { set_report_memory_pressure_threshold(output, curr); } else if (!strncasecmp(name, "memory_pressure_duration", strlen("memory_pressure_duration"))) { set_report_memory_pressure_duration(output, curr); } #endif else if (!strncasecmp(name, "version", strlen("version"))) { report_version(output); } else if ((!strncasecmp(name, "configversion", strlen("configversion"))) && (MOMConfigVersion[0] != '\0')) { report_config_version(output); } else if (!strncasecmp(name, "diag", strlen("diag"))) { ret = process_diag_request(output, name); } else { report_other_configured_attribute(output, name, curr, cp, restrictrm); } } /* END (name[0] == '\0') */ free(cp); ret = diswst(chan, output.str().c_str()); if (ret 
!= DIS_SUCCESS)
      {
      sprintf(log_buffer, "write string failed %s",
              dis_emsg[ret]);

      return(ret);
      }

    /* push this query's reply out before reading the next query */
    if (DIS_tcp_wflush(chan) == -1)
      {
      log_err(errno, __func__, "flush");

      return(-1);
      }
    }    /* END for each query */

  return(ret);
  } /* END process_rm_cmd_request() */


/*
** Process a request for the resource monitor.  The i/o
** will take place using DIS over a tcp fd or an rpp stream.
**
** Returns PBSE_NONE after a handled command and DIS_EOD for a close
** request or after any failure (the "bad" exit logs the refusal).
*/

int rm_request(

  struct tcp_chan *chan,
  int              version)

  {
  char                output[BUFSIZ << 2];
  int                 len;
  int                 command;
  int                 ret;
  bool                restrictrm = false;
  char               *body;
  int                 sindex;

  unsigned long       ipadd;
  u_short             port;

  extern struct connection svr_conn[];

  int                 NotTrusted = 0;

  char               *tmp;
  int                 num_queries = 0;

  errno = 0;
  log_buffer[0] = '\0';

  /* peer address and port as recorded for this socket */
  ipadd = svr_conn[chan->sock].cn_addr;
  port = svr_conn[chan->sock].cn_port;

  if (version != RM_PROTOCOL_VER)
    {
    sprintf(log_buffer, "protocol version %d unknown",
            version);

    goto bad;
    }

  /* a peer on a non-reserved port (when port_care is set) or one for which
   * the okclients lookup returns 0 must pass bad_restrict(); it is then
   * served in restricted mode */
  if (((port_care != FALSE) && (port >= IPPORT_RESERVED)) ||
      (AVL_is_in_tree_no_port_compare(ipadd, 0, okclients) == 0 ))
    {
    if (bad_restrict(ipadd))
      {
      sprintf(log_buffer, "bad attempt to connect - unauthorized (port: %d)",
              port);

      NotTrusted = 1;

      goto bad;
      }

    restrictrm = true;
    }

  /* looks okay, find out how many queries and what command it is */
  num_queries = disrsi(chan, &ret);

  /* command is only read (and only used below) when the first read succeeded */
  if (ret == DIS_SUCCESS)
    command = disrsi(chan, &ret);

  if (ret != DIS_SUCCESS)
    {
    sprintf(log_buffer, "no command %s",
            dis_emsg[ret]);

    goto bad;
    }

  switch (command)
    {
    case RM_CMD_CLOSE:  /* no response to this */

      return DIS_EOD;

      /*NOTREACHED*/

      break;

    case RM_CMD_REQUEST:

      if (process_rm_cmd_request(chan, num_queries, restrictrm) != DIS_SUCCESS)
        goto bad;

      break;

    case RM_CMD_CONFIG:

      {
      char *ptr;

      /* remote reconfiguration must be enabled and the peer unrestricted */
      if (MOMConfigRReconfig == FALSE)
        {
        log_err(-1, __func__, "remote reconfiguration disabled, ignoring request");

        goto bad;
        }

      if (restrictrm)
        {
        log_err(-1, __func__, "restricted configure attempt");

        goto bad;
        }

      log_record(PBSEVENT_SYSTEM, 0, __func__, "configure");

      body = disrst(chan, &ret);

      /* FORMAT:  FILE:<FILENAME> or <FILEDATA> (NYI) */

      if (ret == DIS_EOD)
        {
        /* no file
specified, use default */ body = NULL; } else if (ret != DIS_SUCCESS) { sprintf(log_buffer, "problem with config body %s", dis_emsg[ret]); goto bad; } else { FILE *fp; if ((ptr = strstr(body, "CONFIG:")) != NULL) { ptr += strlen("CONFIG:"); /* overwrite config with data and clear body */ if ((fp = fopen(config_file, "w+")) == NULL) { sprintf(log_buffer, "cannot open config file %s", config_file); goto bad; } if (fwrite(ptr, sizeof(char), strlen(ptr) + 1, fp) < (strlen(ptr) + 1)) { fclose(fp); sprintf(log_buffer, "cannot write config file %s", config_file); goto bad; } fclose(fp); free(body); body = NULL; } } clear_servers(); len = read_config(body); free(body); ret = diswsi(chan, len ? RM_RSP_ERROR : RM_RSP_OK); if (ret != DIS_SUCCESS) { sprintf(log_buffer, "write config response failed %s", dis_emsg[ret]); goto bad; } if (DIS_tcp_wflush(chan) == -1) { log_err(errno, __func__, "flush"); goto bad; } } /* END (case RM_CMD_CONFIG) */ break; case RM_CMD_SHUTDOWN: if (restrictrm) { log_err(-1, __func__, "restricted shutdown attempt"); goto bad; } log_record(PBSEVENT_SYSTEM, 0, __func__, "shutdown"); ret = diswsi(chan, RM_RSP_OK); if (ret != DIS_SUCCESS) { sprintf(log_buffer, "write shutdown response failed %s", dis_emsg[ret]); log_err(-1, __func__, log_buffer); } DIS_tcp_wflush(chan); close_conn(chan->sock, FALSE); DIS_tcp_cleanup(chan); mom_lock(lockfds, F_UNLCK); close(lockfds); mom_run_state = MOM_RUN_STATE_EXIT; internal_state = INUSE_DOWN; for (sindex = 0; sindex < PBS_MAXSERVER; sindex++) { if (mom_servers[sindex].pbs_servername[0] != '\0') shutdown_to_server(sindex); } cleanup(); #if defined(NVIDIA_GPUS) && defined(NVML_API) shut_nvidia_nvml(); #endif /* NVIDIA_GPUS and NVML_API */ log_close(1); exit(0); /*NOTREACHED*/ break; default: sprintf(log_buffer, "unknown command %d", command); log_err(-1, __func__, log_buffer); ret = diswsi(chan, RM_RSP_ERROR); if (ret != DIS_SUCCESS) { sprintf(log_buffer, "write default response failed %s", dis_emsg[ret]); goto bad; } 
      /* send the "unknown command" text back to the requester */
      ret = diswst(chan, log_buffer);

      if (ret != DIS_SUCCESS)
        {
        sprintf(log_buffer, "write string failed %s",
                dis_emsg[ret]);

        goto bad;
        }

      break;
    }  /* END switch(command) */

  return PBSE_NONE;

bad:

  /* common failure exit: record who was refused and append that to
   * whatever message is already in log_buffer */
  tmp = netaddr_pbs_net_t(ipadd);

  sprintf(output, "\n\tmessage refused from port %d addr %s",
          port,
          tmp);

  sprintf(TMOMRejectConn, "%s:%d  %s",
          tmp,
          port,
          (NotTrusted == 1) ? "(server not authorized)" : "(request corrupt)");

  free(tmp);

  snprintf(log_buffer + strlen(log_buffer), sizeof(log_buffer) - strlen(log_buffer), "%s", output);

  log_err(errno, __func__, log_buffer);

  return DIS_EOD;
  }  /* END rm_request() */


/*
 * tcp_read_proto_version() - read the protocol number and, when that
 * succeeds, the protocol version from chan.
 *
 * The protocol number is read with pbs_tcp_timeout temporarily set to
 * TCP_READ_PROTO_TIMEOUT.  Returns the DIS status of the last read.
 */
int tcp_read_proto_version(

  struct tcp_chan *chan,
  int             *proto,
  int             *version)

  {
  int    rc = DIS_SUCCESS;
  time_t tmpT;

  tmpT = pbs_tcp_timeout;

  pbs_tcp_timeout = TCP_READ_PROTO_TIMEOUT;

  *proto = disrsi(chan, &rc);

  if (tmpT > 0)
    {
    /* restore */

    pbs_tcp_timeout = tmpT;
    }
  else
    {
    /* initialize */

    pbs_tcp_timeout = PMOMTCPTIMEOUT;
    }

  switch (rc)
    {
    case DIS_SUCCESS:  /* worked */

      break;

    case DIS_EOF:   /* closed */
    case DIS_EOD:   /* still open */

      break;

    default:

      sprintf(log_buffer, "no protocol number: %s (errno = %d)",
              dis_emsg[rc], errno);

      log_err(rc, __func__, log_buffer);

      break;
    }  /* END switch (rc) */

  if (rc == PBSE_NONE)
    {
    *version = disrsi(chan, &rc);

    if (rc != DIS_SUCCESS)
      {
      sprintf(log_buffer, "End of messages on socket %d", chan->sock);
      log_record(PBSEVENT_DEBUG, PBS_EVENTCLASS_REQUEST, __func__, log_buffer);
      }
    }

  return(rc);
  }


/*
 * do_tcp() - read one request header from socket and hand the request
 * to the handler for its protocol.
 *
 * Returns PBSE_MEM_MALLOC when the channel cannot be set up,
 * DIS_INVALID from the cleanup exit, and otherwise the rc produced
 * while handling the request.
 */
int do_tcp(

  int                 socket,
  struct sockaddr_in *pSockAddr)

  {
  int                       rc = PBSE_NONE;
  int                       proto = -1;
  int                       version = -1;
  struct tcp_chan          *chan = NULL;

  extern struct connection  svr_conn[];

  if ((chan = DIS_tcp_setup(socket)) == NULL)
    {
    sprintf(log_buffer, "Can not allocate memory for socket buffer");
    log_err(errno, __func__, log_buffer);
    return(PBSE_MEM_MALLOC);
    }

  if ((rc = tcp_read_proto_version(chan, &proto, &version)) != DIS_SUCCESS)
    {
    /* chan is released here, so the cleanup label has nothing left to free */
    DIS_tcp_cleanup(chan);
    chan = NULL;
    goto do_tcp_cleanup;
    }

  switch (proto)
    {
    case RM_PROTOCOL:

      {
      time_t tmpT;

      DBPRT(("%s: got a resource
monitor request\n", __func__)) tmpT = pbs_tcp_timeout; rc = rm_request(chan, version); if (tmpT > 0) { /* restore */ pbs_tcp_timeout = tmpT; } else { /* initialize */ pbs_tcp_timeout = PMOMTCPTIMEOUT; } if (rc == PBSE_NONE) rc = RM_PROTOCOL * -1; svr_conn[chan->sock].cn_stay_open = FALSE; } /* END BLOCK (case RM_PROTOCOL) */ break; case TM_PROTOCOL: DBPRT(("%s: got an internal task manager request\n", __func__)) svr_conn[chan->sock].cn_stay_open = TRUE; rc = tm_request(chan, version); while ((rc == PBSE_NONE) && (tcp_chan_has_data(chan) == TRUE)) { if ((rc = tcp_read_proto_version(chan, &proto, &version)) == DIS_SUCCESS) rc = tm_request(chan, version); } /* chan will be freed by the task that was initiated in tm_request */ break; case IS_PROTOCOL: mom_is_request(chan,version,NULL); break; case IM_PROTOCOL: im_request(chan,version,pSockAddr); if(chan->sock >= 0) { svr_conn[chan->sock].cn_stay_open = FALSE; } break; default: { struct sockaddr_in *addr = NULL; struct sockaddr s_addr; unsigned int len = sizeof(s_addr); if (getpeername(chan->sock, &s_addr, &len) == 0) { addr = (struct sockaddr_in *)&s_addr; DBPRT(("%s: unknown request %d from %s", __func__, proto, netaddr(addr))) } else { DBPRT(("%s: unknown request %d\n", __func__, proto)) } svr_conn[chan->sock].cn_stay_open = FALSE; } goto do_tcp_cleanup; break; } /* END switch (proto) */ /* don't close these connections -- the pointer is saved in * the tasks for MPI jobs */ if ((chan != NULL) && (((chan->sock >= 0) && (svr_conn[chan->sock].cn_stay_open == FALSE)) || ((chan->sock == -1)))) DIS_tcp_cleanup(chan); DBPRT(("%s:%d", __func__, proto)); return(rc); do_tcp_cleanup: if (chan != NULL) DIS_tcp_cleanup(chan); return(DIS_INVALID); } /* END do_tcp() */ void *tcp_request( void *new_sock) { long ipadd; char address[80]; char *tmp; int rc = PBSE_NONE; int avail_bytes = -1; int socket = *(int *)new_sock; int addr = *((int *)new_sock + 1); int port = *((int *)new_sock + 2); struct sockaddr_in sockAddr; extern struct 
connection svr_conn[]; if ((avail_bytes = socket_avail_bytes_on_descriptor(socket)) == 0) { close_conn(socket, FALSE); return(NULL); } memset(&sockAddr,0,sizeof(sockAddr)); sockAddr.sin_addr.s_addr = htonl(addr); sockAddr.sin_family = AF_INET; sockAddr.sin_port = htons(port); ipadd = svr_conn[socket].cn_addr; tmp = netaddr_pbs_net_t(ipadd); sprintf(address, "%s:%d", tmp, svr_conn[socket].cn_port); free(tmp); if (LOGLEVEL >= 6) { sprintf(log_buffer, "%s: fd %d addr %s", __func__, socket, address); log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); } if (AVL_is_in_tree_no_port_compare(ipadd, 0, okclients) == 0) { sprintf(log_buffer, "bad connect from %s", address); log_err(-1, __func__, log_buffer); close_conn(socket, FALSE); return(NULL); } log_buffer[0] = '\0'; rc = RM_PROTOCOL * -1; while (rc == RM_PROTOCOL * -1) rc = do_tcp(socket,&sockAddr); switch (rc) { case PBSE_NONE: if (svr_conn[socket].cn_stay_open == FALSE) close_conn(socket, FALSE); break; case DIS_EOF: DBPRT(("Closing socket %d twice...\n", socket)) case PBSE_MEM_MALLOC: case DIS_EOD: case DIS_INVALID: if (svr_conn[socket].cn_stay_open == FALSE) close_conn(socket, FALSE); break; default: close_conn(socket, FALSE); DBPRT(("Error in connection. Closing %d\n", socket)) break; } DBPRT(("%s:(exit loop for socket %d) processed\n", __func__, socket)) return(NULL); } /* END tcp_request() */ const char *find_signal_name( int sig) { struct sig_tbl *psigt; extern struct sig_tbl sig_tbl[]; for (psigt = sig_tbl; psigt->sig_name != NULL; psigt++) { if (psigt->sig_val == sig) { return(psigt->sig_name); } } return("unknown signal"); } /* * Kill a job. * Call with the job pointer and a signal number. 
* * NOTE: sends a signal to a job, does not purge job record * * @see kill_task() - child * @see scan_for_exiting() - parent */ int kill_job( job *pjob, /* I */ int sig, /* I */ const char *killer_id_name, /* I - process name of calling routine */ const char *why_killed_reason) /* I - reason for killing */ { task *ptask; int ct = 0; sprintf(log_buffer, "%s: sending signal %d, \"%s\" to job %s, reason: %s", killer_id_name, sig, find_signal_name(sig), pjob->ji_qs.ji_jobid, why_killed_reason); if (LOGLEVEL >= 2) { log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); } DBPRT(("%s\n", log_buffer)); DBPRT(("Job - %s Current State %s\n", pjob->ji_qs.ji_jobid, PJobSubState[MAX(0,pjob->ji_qs.ji_substate)])); /* NOTE: should change be made to only execute precancel epilog if * job is active? (NYI) */ /* NOTE: epilog blocks until complete, which may cause issues if * shutdown grace time is enabled. Change model to allow * epilog.precancel to run in background and have kill_task() * executed once it is complete (NYI) */ /* NOTE: this will allow kill_job to return immediately and will * require sigchild harvesting and the kill_task loop to be called * once this signal is received */ /* NOTE: if path_epilogpdel is not set, kill_task should be called * immediately (NYI) */ if (sig == SIGTERM) { if (run_pelog(PE_EPILOGUSER, path_epilogpdel, pjob, PE_IO_TYPE_NULL, FALSE) != 0) { log_err(-1, __func__, "precancel epilog failed"); sprintf(PBSNodeMsgBuf, "ERROR: precancel epilog failed"); } } ptask = (task *)GET_NEXT(pjob->ji_tasks); while (ptask != NULL) { if (ptask->ti_qs.ti_status == TI_STATE_RUNNING) { if (LOGLEVEL >= 4) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "kill_job found a task to kill"); } ct += kill_task(ptask, sig, 0); } ptask = (task *)GET_NEXT(ptask->ti_jobtask); } /* END while (ptask != NULL) */ if (LOGLEVEL >= 6) { sprintf(log_buffer, "kill_job done (killed %d processes)", ct); log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, 
pjob->ji_qs.ji_jobid, log_buffer); } return(ct); } /* END kill_job() */ /* * size decoding routine. * * Accepts a resource pointer and a pointer to the unsigned long integer * to receive the decoded value. It returns the decoded value in kb. * * sizeof(word) = sizeof(int) */ unsigned long getsize( resource *pres) /* I */ { unsigned long value; unsigned long shift; if (pres->rs_value.at_type != ATR_TYPE_SIZE) { return(0); } value = pres->rs_value.at_val.at_size.atsv_num; shift = pres->rs_value.at_val.at_size.atsv_shift; if (pres->rs_value.at_val.at_size.atsv_units == ATR_SV_WORDSZ) { if (value > ULONG_MAX / sizeof(int)) { return(0); } value *= sizeof(int); } if (shift > 10) { shift -= 10; return(value << shift); } shift = 10 - shift; return(value >> shift); } /* * time decoding routine. * * Accepts a resource pointer and a pointer to the unsigned long integer * to receive the decoded value. It returns the decoded value of time * in seconds. */ unsigned long gettime( resource *pres) { if (pres->rs_value.at_type != ATR_TYPE_LONG) { return(0); } if (pres->rs_value.at_val.at_long < 0) { return(0); } return((unsigned long)pres->rs_value.at_val.at_long); } /* END getttime() */ /* log_buffer reports detailed failure reason */ /* return 0: no issues detected */ /* return 1: over limit/child termination request detected */ int job_over_limit( job *pjob) /* I */ { pbs_attribute *attr; pbs_attribute *used; resource *limresc; resource *useresc; struct resource_def *rd; unsigned long total; int index; unsigned long limit; char *units; int rc; #ifndef NUMA_SUPPORT int i; #endif /* ndef NUMA_SUPPORT */ if ((rc = mom_over_limit(pjob)) != PBSE_NONE) { /* mom limits violated, log_buffer populated */ /* no more POLL's */ pjob->ji_nodekill = pjob->ji_nodeid; pjob->ji_qs.ji_un.ji_momt.ji_exitstat = rc; return(1); } #ifndef NUMA_SUPPORT /* cannot perform this check with NUMA, numnodes always is 1 and * you'll never have job stats */ if ((pjob->ji_numnodes == 1) || 
(am_i_mother_superior(*pjob) == false)) { /* no other nodes or not mother superior */ /* SUCCESS */ return(0); } #endif if (pjob->ji_nodekill != TM_ERROR_NODE) { /* one of the sister nodes reports a fatal error */ hnodent *pnode = &pjob->ji_hosts[pjob->ji_nodekill]; if (pnode->hn_sister != 0) { switch (pnode->hn_sister) { case SISTER_KILLDONE: sprintf(log_buffer, "node %d (%s) requested job terminate, '%s' (%d)", pjob->ji_nodekill, pnode->hn_host, "killdone", pnode->hn_sister); break; case SISTER_BADPOLL: sprintf(log_buffer, "node %d (%s) requested job terminate, '%s' (code %d)", pjob->ji_nodekill, pnode->hn_host, "badpoll", pnode->hn_sister); break; case SISTER_EOF: sprintf(log_buffer, "node %d (%s) requested job terminate, '%s' (code %d) - received SISTER_EOF attempting to communicate with sister MOM's", pjob->ji_nodekill, pnode->hn_host, "EOF", pnode->hn_sister); break; default: sprintf(log_buffer, "node %d (%s) requested job terminate, '%s' (code %d) - internal or network failure attempting to communicate with sister MOM's", pjob->ji_nodekill, pnode->hn_host, "EOF", pnode->hn_sister); break; } /* END switch (pnode->hn_sister) */ /* FAILURE */ return(1); } /* END if (pnode->hn_sister != 0) */ } /* END if (pjob->ji_nodekill != TM_ERROR_NODE) */ attr = &pjob->ji_wattr[JOB_ATR_resource]; used = &pjob->ji_wattr[JOB_ATR_resc_used]; /* only enforce cpu time and memory usage */ for (limresc = (resource *)GET_NEXT(attr->at_val.at_list); limresc != NULL; limresc = (resource *)GET_NEXT(limresc->rs_link)) { if ((limresc->rs_value.at_flags & ATR_VFLAG_SET) == 0) continue; rd = limresc->rs_defin; if (!strcmp(rd->rs_name, "cput")) { if (igncput == TRUE) continue; else index = 0; } else if (!strcmp(rd->rs_name, "mem")) { if (ignmem == TRUE) continue; else index = 1; } else continue; useresc = find_resc_entry(used, rd); if (useresc == NULL) continue; if ((useresc->rs_value.at_flags & ATR_VFLAG_SET) == 0) continue; total = (index == 0) ? 
gettime(useresc) : getsize(useresc); #ifndef NUMA_SUPPORT for (i = 0;i < pjob->ji_numnodes - 1;i++) { noderes *nr = &pjob->ji_resources[i]; total += ((index == 0) ? nr->nr_cput : nr->nr_mem); } #endif /* ndef NUMA_SUPPORT */ limit = (index == 0) ? gettime(limresc) : getsize(limresc); if (limit <= total) break; } /* END for (limresc) */ if (limresc == NULL) { /* no limit violation detected, job ok */ return(0); } units = index == 0 ? (char *)"secs" : (char *)"kb"; sprintf(log_buffer, "%s job total %lu %s exceeded limit %lu %s", rd->rs_name, total, units, limit, units); pjob->ji_nodekill = pjob->ji_nodeid; return(1); } /* END job_over_limit() */ void usage( char *prog) /* I */ { fprintf(stderr, "Usage: %s\n", prog); fprintf(stderr, " -a \\\\ Alarm Time\n"); fprintf(stderr, " -c \\\\ Config File\n"); fprintf(stderr, " -C \\\\ Checkpoint Dir\n"); fprintf(stderr, " -d \\\\ Home Dir\n"); fprintf(stderr, " -C \\\\ Checkpoint Dir\n"); fprintf(stderr, " -D \\\\ DEBUG - do not background\n"); fprintf(stderr, " -h \\\\ Print Usage\n"); fprintf(stderr, " -H \\\\ Hostname\n"); fprintf(stderr, " -l \\\\ MOM Log Dir Path\n"); fprintf(stderr, " -L \\\\ Logfile\n"); fprintf(stderr, " -M \\\\ MOM Port\n"); fprintf(stderr, " -p \\\\ Recover Jobs (Default)\n"); fprintf(stderr, " -P \\\\ Purge Jobs\n"); fprintf(stderr, " -q \\\\ Do Not Recover Jobs\n"); fprintf(stderr, " -r \\\\ Recover Jobs (2)\n"); fprintf(stderr, " -R \\\\ RM Port\n"); fprintf(stderr, " -s \\\\ Logfile Suffix\n"); fprintf(stderr, " -S \\\\ Server Port\n"); fprintf(stderr, " -v \\\\ Version\n"); fprintf(stderr, " -x \\\\ Do Not Use Privileged Ports\n"); fprintf(stderr, " --about \\\\ Print Build Information\n"); fprintf(stderr, " --help \\\\ Print Usage\n"); fprintf(stderr, " --version \\\\ Version\n"); } /* END usage() */ /* * MOMGetFileMtime - return the mtime of a file */ time_t MOMGetFileMtime( const char *fpath) { struct stat sbuf; int ret; if ((fpath == NULL) || (*fpath == '\0')) { return(0); } ret = 
stat(fpath, &sbuf); if (ret == 0) { return(sbuf.st_mtime); } return(0); } /* END MOMGetFileMtime */ /* * MOMCheckRestart() - set mom_run_state to restart if appropriate. * this is called when no jobs are running (below * in the main loop, and in mom_job_purge().) */ void MOMCheckRestart(void) { time_t newmtime; /* make sure we're not making a mess in the aux dir */ cleanup_aux(); if ((MOMConfigRestart <= 0) || (MOMExeTime <= 0)) { return; } newmtime = MOMGetFileMtime(ArgV[0]); if ((newmtime > 0) && (newmtime != MOMExeTime)) { if (mom_run_state == MOM_RUN_STATE_RUNNING) mom_run_state = MOM_RUN_STATE_RESTART; sprintf( log_buffer, "%s has changed, initiating re-exec (now: %ld, was: %ld)", ArgV[0], (long int)newmtime, (long int)MOMExeTime); if (LOGLEVEL > 6) { log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); } DBPRT(("%s\n", log_buffer)); } } /* END MOMCheckRestart() */ /* * initialize_globals */ void initialize_globals(void) { char *ptr; /* local tmp variable */ strcpy(pbs_current_user, "pbs_mom"); msg_daemonname = pbs_current_user; time(&MOMStartTime); CLEAR_HEAD(svr_newjobs); CLEAR_HEAD(svr_alljobs); CLEAR_HEAD(mom_polljobs); CLEAR_HEAD(svr_requests); CLEAR_HEAD(mom_varattrs); if (getenv("PBSMOMHOME") != NULL) { path_home = getenv("PBSMOMHOME"); } MOMConfigVersion[0] = '\0'; mom_server_all_init(); pbsgroup = getgid(); pbsuser = getuid(); loopcnt = time(NULL); MOMExeTime = MOMGetFileMtime(ArgV[0]); strcpy(xauth_path, XAUTH_PATH); strcpy(rcp_path, RCP_PATH); strcpy(rcp_args, RCP_ARGS); #ifdef DEFAULT_MOMLOGDIR path_log = strdup(DEFAULT_MOMLOGDIR); #endif #ifdef DEFAULT_MOMLOGSUFFIX log_init(DEFAULT_MOMLOGSUFFIX, NULL); #endif /* get default service port */ ptr = getenv("PBS_MOM_SERVICE_PORT"); if (ptr != NULL) { pbs_mom_port = (int)strtol(ptr, NULL, 10); } if (pbs_mom_port <= 0) { pbs_mom_port = get_svrport( (char *)PBS_MOM_SERVICE_NAME, (char *)"tcp", PBS_MOM_SERVICE_PORT); } ptr = getenv("PBS_BATCH_SERVICE_PORT"); if (ptr != NULL) { 
default_server_port = (int)strtol(ptr, NULL, 10); } if (default_server_port <= 0) { default_server_port = get_svrport( (char *)PBS_BATCH_SERVICE_NAME, (char *)"tcp", PBS_BATCH_SERVICE_PORT); } ptr = getenv("PBS_MANAGER_SERVICE_PORT"); if (ptr != NULL) { pbs_rm_port = (int)strtol(ptr, NULL, 10); } if (pbs_rm_port <= 0) { pbs_rm_port = get_svrport( (char *)PBS_MANAGER_SERVICE_NAME, (char *)"tcp", PBS_MANAGER_SERVICE_PORT); } /* set timeout values for MOM */ MaxConnectTimeout = 10000; /* in microseconds */ memset(JobsToResend,0,sizeof(JobsToResend)); /* set the mom alias name to nothing */ mom_alias[0] = '\0'; lock_init(); } /* END initialize_globals() */ /* * stop_me = signal handler for SIGTERM */ static void stop_me( int sig) /* I */ { const char *dowhat; /* just exit, leaving jobs running */ mom_run_state = MOM_RUN_STATE_EXIT; dowhat = "leaving jobs running, just exiting"; sprintf(log_buffer, "caught signal %d: %s", sig, dowhat); log_record( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); return; } /* END void stop_me() */ /* * PBSAdjustLogLevel */ static void PBSAdjustLogLevel( int sig) /* I */ { if (sig == SIGUSR1) { /* increase log level */ LOGLEVEL = MIN(LOGLEVEL + 1, 10); } else if (sig == SIGUSR2) { /* increase log level */ LOGLEVEL = MAX(LOGLEVEL - 1, 0); } sprintf(log_buffer, "received signal %d: adjusting loglevel to %d", sig, LOGLEVEL); log_record( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); return; } /* END PBSAdjustLogLevel() */ /* * mk_dirs - make the directory names used by MOM */ char *mk_dirs( const char *base) /* I */ { char *pn; int ltop = strlen(path_home); pn = (char *)calloc(1, ltop + strlen(base) + 2); if (pn == NULL) { /* cannot allocate memory */ exit(2); } strcpy(pn, path_home); if (*(path_home + ltop - 1) != '/') strcat(pn, "/"); strcat(pn, base); return(pn); } /* END mk_dirs() */ /* * parse_command_line */ void parse_command_line( int argc, /* I */ char 
*argv[]) /* I */ { extern char *optarg; extern int optind; int errflg; int c; char *ptr; /* local tmp variable */ errflg = 0; while ((c = getopt(argc, argv, "a:A:c:C:d:DhH:l:L:mM:pPqrR:s:S:vwx-:")) != -1) { switch (c) { case '-': if (optarg == NULL) break; if (!strcmp(optarg, "about")) { printf("package: %s\n", PACKAGE_STRING); printf("sourcedir: %s\n", PBS_SOURCE_DIR); printf("configure: %s\n", PBS_CONFIG_ARGS); printf("buildcflags: %s\n", PBS_CFLAGS); printf("buildhost: %s\n", PBS_BUILD_HOST); printf("builddate: %s\n", PBS_BUILD_DATE); printf("builddir: %s\n", PBS_BUILD_DIR); printf("builduser: %s\n", PBS_BUILD_USER); printf("installdir: %s\n", PBS_INSTALL_DIR); printf("serverhome: %s\n", PBS_SERVER_HOME); printf("version: %s\n", PACKAGE_VERSION); exit(0); } else if (!strcmp(optarg, "version")) { printf("Version: %s\nRevision: %s\n", PACKAGE_VERSION, GIT_HASH); exit(0); } else if (!strcmp(optarg, "help")) { usage(argv[0]); exit(0); } else { errflg = 1; } break; case 'a': alarm_time = (int)strtol(optarg, &ptr, 10); if ((alarm_time <= 0) || (*ptr != '\0')) { fprintf(stderr, "%s: bad alarm time\n", optarg); errflg = 1; } break; case 'A': /* mom's alias name - used for multi-mom */ snprintf(mom_alias, sizeof(mom_alias), "%s", optarg); break; case 'c': /* config file */ config_file_specified = 1; snprintf(config_file, sizeof(config_file), "%s", optarg); break; case 'h': usage(argv[0]); /* exits */ exit(0); break; case 'H': /* multihomed host */ hostname_specified = 1; snprintf(mom_host, sizeof(mom_host), "%s", optarg); break; case 'C': mom_checkpoint_set_directory_path(optarg); break; case 'd': /* directory */ path_home = optarg; break; case 'D': /* debug */ DOBACKGROUND = 0; break; case 'l': path_log = strdup(optarg); break; case 'L': log_file = optarg; break; case 'M': pbs_mom_port = (unsigned int)atoi(optarg); if (pbs_mom_port == 0) { fprintf(stderr, "Bad MOM port value %s\n", optarg); exit(1); } break; case 'm': multi_mom = 1; break; case 'p': if (!recover_set) { 
recover = JOB_RECOV_RUNNING; recover_set = TRUE; } else { errflg = 1; } break; case 'P': if ( !recover_set ) { recover = JOB_RECOV_DELETE; recover_set = TRUE; } else { errflg = 1; } break; case 'r': if (!recover_set) { recover = JOB_RECOV_TERM_REQUE; recover_set = TRUE; } else { errflg = 1; } break; case 'q': if (!recover_set) { recover = JOB_RECOV_REQUE; recover_set = TRUE; } else { errflg = 1; } break; case 'R': pbs_rm_port = (unsigned int)atoi(optarg); if (pbs_rm_port == 0) { fprintf(stderr, "Bad RM port value %s\n", optarg); exit(1); } break; case 's': log_init(optarg, NULL); break; case 'S': default_server_port = (unsigned int)atoi(optarg); if (default_server_port == 0) { fprintf(stderr, "Bad Server port value %s\n", optarg); exit(1); } break; case 'v': fprintf(stderr, "version: %s\n", PACKAGE_VERSION); exit(0); break; case 'w': /* wait 10 minutes or until you get the mom hierarchy file from the * server to send your first update */ first_update_time = time(NULL) + 600; break; case 'x': port_care = FALSE; break; case '?': default: errflg = 1; break; } /* END switch(c) */ } /* END while ((c = getopt(argc,argv,"a:c:C:d:Dh:L:M:prR:S:vx-:")) != -1) */ if ((errflg > 0) || (optind != argc)) { usage(argv[0]); /* exits */ exit(1); } return; } /* END parse_command_line() */ /* * verify_mom_hierarchy() * * iterates over the mom hierarchy to prune away paths and or levels * below this node's level and also add all entries in the hierarchy to * the okclients list. * * @pre-cond: parse_mom_hierarchy() must be called before this function. * @post-cond: the old hierarchy will be freed and a new, appropriately * pruned hierarchy will be put in its place. 
* @return true if there were legitimate nodes in the hierarchy, false otherwise */ bool verify_mom_hierarchy() { int paths_iter = -1; mom_hierarchy_t *tmp_hierarchy = initialize_mom_hierarchy(); resizable_array *paths; int path_index = 0; bool legitimate_hierarchy = false; while ((paths = (resizable_array *)next_thing(mh->paths, &paths_iter)) != NULL) { resizable_array *level; int levels_iter = -1; int level_index = 0; bool continue_on_path = true; bool legitimate_path = false; while ((level = (resizable_array *)next_thing(paths, &levels_iter)) != NULL) { node_comm_t *nc; int node_iter = -1; while ((nc = (node_comm_t *)next_thing(level, &node_iter)) != NULL) { if (!strcmp(nc->name, mom_alias)) continue_on_path = false; else { unsigned short rm_port = ntohs(nc->sock_addr.sin_port); unsigned long ipaddr = ntohl(nc->sock_addr.sin_addr.s_addr); okclients = AVL_insert(ipaddr, rm_port, NULL, okclients); } legitimate_hierarchy = true; } if (continue_on_path == true) { node_iter = -1; while ((nc = (node_comm_t *)next_thing(level, &node_iter)) != NULL) { struct addrinfo *addr_info; if (pbs_getaddrinfo(nc->name, NULL, &addr_info) == 0) { add_network_entry(tmp_hierarchy, nc->name, addr_info, ntohs(nc->sock_addr.sin_port), path_index, level_index); // mark that we found legitimate nodes in the hierarchy legitimate_path = true; } } } level_index++; } if (legitimate_path == true) path_index++; } if (legitimate_hierarchy == true) { free_mom_hierarchy(mh); mh = tmp_hierarchy; sort_paths(); } else free_mom_hierarchy(tmp_hierarchy); return(legitimate_hierarchy); } /* END verify_mom_hierarchy() */ /* * read_mom_hierarchy() * * opens the path to mom hierarchy and reads it in, if it exists. * If it doesn't exist, sets things to request the hierarchy from pbs_server. 
 *
 * Always replaces mh with a freshly initialized hierarchy first.
 * received_cluster_addrs ends up false when the file cannot be opened and
 * otherwise takes the result of verify_mom_hierarchy().
 *
 * NOTE(review): the descriptor from open() is not closed in this function;
 * presumably parse_mom_hierarchy() takes ownership of it - confirm.
 */

void read_mom_hierarchy()

  {
  int fds;

  mh = initialize_mom_hierarchy();

  if ((fds = open(path_mom_hierarchy, O_RDONLY, 0)) < 0)
    {
    log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, "No local mom hierarchy file found, will request from server.");

    received_cluster_addrs = false;
    }
  else
    {
    parse_mom_hierarchy(fds);

    // if we read something successfully, we have the cluster addresses and don't
    // need to request them.
    received_cluster_addrs = verify_mom_hierarchy();
    }
  }  /* END read_mom_hierarchy() */




#if PENABLE_LINUX26_CPUSETS

/*
 * recover_internal_layout - rebuild the cpuset reservations of the jobs in
 * svr_alljobs, oldest start time first. Compiled to an empty body when
 * NUMA_SUPPORT is defined.
 */

void recover_internal_layout()

  {
#ifndef NUMA_SUPPORT
  std::list job_list;

  // get a list of jobs in start time order, first to last
  for (job *pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs))
    {
    if (job_list.empty() == true)
      job_list.push_back(pjob);
    else
      {
      std::list::iterator it;
      bool added = false;

      // insertion sort: place pjob in front of the first job that did not
      // start before it
      for (it = job_list.begin(); it != job_list.end(); it++)
        {
        if ((*it)->ji_qs.ji_stime >= pjob->ji_qs.ji_stime)
          {
          job_list.insert(it, pjob);
          added = true;
          break;
          }
        }

      // started after everything already in the list
      if (added == false)
        job_list.push_back(pjob);
      }
    }

  // now, re-create the reservation for each job.
for (std::list::iterator it = job_list.begin(); it != job_list.end(); it++) { recover_cpuset_reservation(*(*it)); } #endif } #endif /** * setup_program_environment */ int setup_program_environment(void) { int c; int hostc = 1; #if !defined(DEBUG) && !defined(DISABLE_DAEMONS) FILE *dummyfile; #endif char logSuffix[MAX_PORT_STRING_LEN]; char momLock[MAX_LOCK_FILE_NAME_LEN]; #ifdef PENABLE_LINUX26_CPUSETS int rc; #endif struct sigaction act; char *ptr; /* local tmp variable */ /* must be started with real and effective uid of 0 */ if (IamRoot() == 0) { return(1); } /* The following is code to reduce security risks */ /* start out with standard umask, system resource limit infinite */ umask(022); if (getenv("PBSLOGLEVEL") != NULL) { LOGLEVEL = (int)strtol(getenv("PBSLOGLEVEL"), NULL, 0); } if (getenv("PBSDEBUG") != NULL) { DEBUGMODE = 1; DOBACKGROUND = 0; } memset(TMOMStartInfo, 0, sizeof(pjobexec_t)*TMAX_JE); /* modify program environment */ if ((num_var_env = setup_env(PBS_ENVIRON)) == -1) { exit(1); } c = getgid(); /* secure suppl. groups */ if (setgroups(1,(gid_t *)&c) != 0) { snprintf(log_buffer, sizeof(log_buffer), "Unable to drop secondary groups. 
Some MAC framework is active?\n"); log_err(errno, __func__, log_buffer); snprintf(log_buffer, sizeof(log_buffer), "setgroups(group = %lu) failed: %s\n", (unsigned long)c, strerror(errno)); log_err(errno, __func__, log_buffer); return(1); } #ifndef DEBUG #ifdef _CRAY limit(C_JOB, 0, L_CPROC, 0); limit(C_JOB, 0, L_CPU, 0); limit(C_JOBPROCS, 0, L_CPU, 0); limit(C_PROC, 0, L_FD, 255); limit(C_JOB, 0, L_FSBLK, 0); limit(C_JOBPROCS, 0, L_FSBLK, 0); limit(C_JOB, 0, L_MEM , 0); limit(C_JOBPROCS, 0, L_MEM , 0); #else /* _CRAY */ { struct rlimit rlimit; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; setrlimit(RLIMIT_CPU, &rlimit); setrlimit(RLIMIT_FSIZE, &rlimit); setrlimit(RLIMIT_DATA, &rlimit); #ifdef RLIMIT_RSS setrlimit(RLIMIT_RSS, &rlimit); #endif /* RLIMIT_RSS */ #ifdef RLIMIT_VMEM setrlimit(RLIMIT_VMEM, &rlimit); #endif /* RLIMIT_VMEM */ } /* END BLOCK */ #endif /* else _CRAY */ #endif /* DEBUG */ /* set up and validate home paths */ c = 0; mom_home = mk_dirs("mom_priv"); path_jobs = mk_dirs("mom_priv/jobs/"); path_epilog = mk_dirs("mom_priv/epilogue"); path_prolog = mk_dirs("mom_priv/prologue"); path_epiloguser = mk_dirs("mom_priv/epilogue.user"); path_prologuser = mk_dirs("mom_priv/prologue.user"); path_epilogp = mk_dirs("mom_priv/epilogue.parallel"); path_prologp = mk_dirs("mom_priv/prologue.parallel"); path_epiloguserp = mk_dirs("mom_priv/epilogue.user.parallel"); path_prologuserp = mk_dirs("mom_priv/prologue.user.parallel"); path_epilogpdel = mk_dirs("mom_priv/epilogue.precancel"); path_layout = mk_dirs("mom_priv/mom.layout"); path_mom_hierarchy = mk_dirs("mom_priv/mom_hierarchy"); #ifndef DEFAULT_MOMLOGDIR if (path_log == NULL) path_log = mk_dirs("mom_logs"); #endif path_spool = mk_dirs("spool/"); path_undeliv = mk_dirs("undelivered/"); #ifdef __CYGWIN__ /* AUX is reserved word in Windows */ path_aux = mk_dirs("auxx/"); #else path_aux = mk_dirs("aux/"); #endif /* __CYGWIN__ */ /* initialize the mom_status */ mom_status = get_dynamic_string(16 
* 1024, NULL); if(mom_status == NULL) { snprintf(log_buffer, sizeof(log_buffer), "get_dynamic_string() failed: %s\n",strerror(errno)); log_err(errno, __func__, log_buffer); return(1); } init_resc_defs(); c |= mom_checkpoint_init(); /* change working directory to mom_priv */ if (chdir(mom_home) == -1) { char tmpLine[1024]; sprintf(tmpLine, "cannot change directory to home '%s'", mom_home); perror(tmpLine); return(1); } #if !defined(DEBUG) && !defined(NO_SECURITY_CHECK) c |= chk_file_sec(path_jobs, 1, 0, S_IWGRP | S_IWOTH, 1, NULL); c |= chk_file_sec(path_aux, 1, 0, S_IWGRP | S_IWOTH, 1, NULL); c |= chk_file_sec(path_spool, 1, 1, S_IWOTH, 0, NULL); c |= chk_file_sec(path_undeliv, 1, 1, S_IWOTH, 0, NULL); c |= chk_file_sec(PBS_ENVIRON, 0, 0, S_IWGRP | S_IWOTH, 0, NULL); if (c) { return(3); } #endif /* not DEBUG and not NO_SECURITY_CHECK */ if (hostname_specified == 0) { hostc = gethostname(mom_host, PBS_MAXHOSTNAME); } if (!multi_mom) { log_init(NULL, mom_host); } else { sprintf(logSuffix, "%d", pbs_mom_port); log_init(logSuffix, mom_host); } /* open log file while std in,out,err still open, forces to fd 4 */ pthread_mutex_lock(&log_mutex); if ((c = log_open(log_file, path_log)) != 0) { pthread_mutex_unlock(&log_mutex); /* use given name */ fprintf(stderr, "pbs_mom: Unable to open logfile\n"); return(1); } pthread_mutex_unlock(&log_mutex); check_log(); /* see if this log should be rolled */ if (!multi_mom) sprintf(momLock,"mom.lock"); else sprintf(momLock, "mom%d.lock", pbs_mom_port); lockfds = open(momLock, O_CREAT | O_WRONLY, 0644); if (lockfds < 0) { sprintf(log_buffer, "pbs_mom: unable to open lock file - errno=%d '%s'\n", errno, strerror(errno)); fprintf(stderr, "%s", log_buffer); return(1); } mom_lock(lockfds, F_WRLCK); /* See if other MOMs are running */ /* initialize the network interface */ if (init_network(pbs_mom_port, mom_process_request) != 0) { c = errno; sprintf(log_buffer, "server port = %u, errno = %d (%s)", pbs_mom_port, c, strerror(c)); if (c == 
EADDRINUSE) strcat(log_buffer, ", already in use"); log_err(-1, msg_daemonname, log_buffer); strcat(log_buffer, "\n"); fprintf(stderr, "%s", log_buffer); return(3); } if (init_network(pbs_rm_port, tcp_request) != 0) { c = errno; sprintf(log_buffer, "resource (tcp) port = %u, errno = %d (%s)", pbs_rm_port, c, strerror(c)); if (c == EADDRINUSE) strcat(log_buffer, ", already in use"); log_err(-1, msg_daemonname, log_buffer); strcat(log_buffer, "\n"); fprintf(stderr, "%s", log_buffer); return(3); } if (read_config(NULL)) { fprintf(stderr, "pbs_mom: cannot load config file '%s'\n", config_file); exit(1); } #ifdef PENABLE_LINUX26_CPUSETS /* load system topology */ if ((hwloc_topology_init(&topology) == -1)) { log_err(-1, msg_daemonname, "Unable to init machine topology"); return(-1); } if ((hwloc_topology_set_flags(topology, HWLOC_TOPOLOGY_FLAG_WHOLE_SYSTEM) != 0)) { log_err(-1, msg_daemonname, "Unable to configure machine topology"); return(-1); } if ((hwloc_topology_load(topology) == -1)) { log_err(-1, msg_daemonname, "Unable to load machine topology"); return(-1); } sprintf(log_buffer, "machine topology contains %d memory nodes, %d cpus", hwloc_get_nbobjs_by_type(topology, HWLOC_OBJ_NODE), hwloc_get_nbobjs_by_type(topology, HWLOC_OBJ_PU)); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, log_buffer); internal_layout = node_internals(); #endif #ifdef NUMA_SUPPORT if ((rc = setup_nodeboards()) != 0) return(rc); #else snprintf(path_meminfo,sizeof(path_meminfo),"%s", "/proc/meminfo"); #endif /* END NUMA_SUPPORT */ #if defined(PENABLE_LINUX26_CPUSETS) /* NOTE: moved to before we go into background so that we can print an error * message if we can't mount the cpuset directory and it isn't mounted */ /* Create the top level torque cpuset if it doesn't already exist. 
*/ if ((rc = init_torque_cpuset()) != 0) return(rc); #endif /* go into the background and become own session/process group */ #if !defined(DEBUG) && !defined(DISABLE_DAEMONS) mom_lock(lockfds, F_UNLCK); /* unlock so child can relock */ if (DOBACKGROUND == 1) { if (fork() > 0) { exit(0); /* parent goes away */ } if (setsid() == -1) { log_err(errno, msg_daemonname, "setsid failed"); return(2); } fclose(stdin); fclose(stdout); fclose(stderr); dummyfile = fopen("/dev/null", "r"); assert((dummyfile != 0) && (fileno(dummyfile) == 0)); dummyfile = fopen("/dev/null", "w"); assert((dummyfile != 0) && (fileno(dummyfile) == 1)); dummyfile = fopen("/dev/null", "w"); assert((dummyfile != 0) && (fileno(dummyfile) == 2)); } /* END if (DOBACKGROUND == 1) */ mom_lock(lockfds, F_WRLCK); /* lock out other MOMs */ #else /* DEBUG */ #if defined(_CRAY) /* CRAY cannot restart checkpointed job if MOM has controlling tty */ sprintf(log_buffer, "/tmp/pbs_mom.%d", getpid()); printf("Debug output will be in %s\n", log_buffer); freopen(log_buffer, "w", stdout); freopen(log_buffer, "w", stderr); ioctl(0, TCCLRCTTY, 0); close(0); #endif /* _CRAY */ setvbuf(stdout, NULL, _IOLBF, 0); setvbuf(stderr, NULL, _IOLBF, 0); #endif /* DEBUG */ /* write MOM's pid into lockfile */ if (ftruncate(lockfds, (off_t)0) != 0) { log_err(errno, msg_daemonname, "failed to truncate lockfile"); return(2); } sprintf(log_buffer, "%ld\n", (long)getpid()); if (write_ac_socket(lockfds, log_buffer, strlen(log_buffer) + 1) != (ssize_t)(strlen(log_buffer) + 1)) { log_err(errno, msg_daemonname, "failed to write to lockfile"); return(2); } #if (PLOCK_DAEMONS & 4) /* lock daemon into memory */ /* NOTE: should reduce maximum stack limit using ulimit() before calling plock */ if (plock(PROCLOCK) == -1) { log_err(errno, msg_daemonname, "failed to lock mom into memory with plock"); } else { MOMIsPLocked = 1; } #endif /* PLOCK_DAEMONS */ sigemptyset(&allsigs); act.sa_mask = allsigs; act.sa_flags = 0; /* ** Signals to be ignored. 
*/ act.sa_handler = SIG_IGN; sigaction(SIGPIPE, &act, NULL); #ifdef SIGINFO sigaction(SIGINFO, &act, NULL); #endif /* SIGINFO */ sigaddset(&allsigs, SIGHUP); /* remember to block these */ sigaddset(&allsigs, SIGINT); /* during critical sections */ sigaddset(&allsigs, SIGTERM); /* so we don't get confused */ sigaddset(&allsigs, SIGCHLD); #ifdef _CRAY sigaddset(&allsigs, WJSIGNAL); #endif act.sa_mask = allsigs; /* ** We want to abort system calls ** and call a function. */ #ifdef SA_INTERRUPT act.sa_flags |= SA_INTERRUPT; /* don't restart system calls */ #endif #ifdef NOSIGCHLDMOM act.sa_handler = SIG_DFL; #else act.sa_handler = catch_child; /* set up to catch death of child */ #endif sigaction(SIGCHLD, &act, NULL); #ifdef _CRAY sigaction(WJSIGNAL, &act, NULL); #endif /* * Catch these signals to ensure we core dump even if * our rlimit for core dumps is set to 0 initially. * * Chris Samuel - VPAC * csamuel@vpac.org - 29th July 2003 * * Now conditional on the PBSCOREDUMP environment variable. 
*/ if (getenv("PBSCOREDUMP")) { act.sa_handler = catch_abort; /* make sure we core dump */ sigaction(SIGSEGV, &act, NULL); sigaction(SIGBUS, &act, NULL); sigaction(SIGFPE, &act, NULL); sigaction(SIGILL, &act, NULL); sigaction(SIGTRAP, &act, NULL); sigaction(SIGSYS, &act, NULL); } act.sa_handler = catch_hup; /* do a restart on SIGHUP */ sigaction(SIGHUP, &act, NULL); act.sa_handler = toolong; /* handle an alarm call */ sigaction(SIGALRM, &act, NULL); act.sa_handler = stop_me; /* shutdown for these */ sigaction(SIGINT, &act, NULL); sigaction(SIGTERM, &act, NULL); act.sa_handler = PBSAdjustLogLevel; sigaction(SIGUSR1, &act, NULL); sigaction(SIGUSR2, &act, NULL); #ifdef SIGXCPU sigaction(SIGXCPU, &act, NULL); #endif #ifdef SIGXFSZ sigaction(SIGXFSZ, &act, NULL); #endif #ifdef SIGCPULIM sigaction(SIGCPULIM, &act, NULL); #endif #ifdef SIGSHUTDN sigaction(SIGSHUTDN, &act, NULL); #endif #ifdef _CRAY /* Special code for CRAY MLS Systems */ if (sysconf(_SC_CRAY_SECURE_SYS)) { struct usrv usrv; if (getusrv(&usrv) < 0) { fprintf(stderr, "cannot get security info\n"); return(1); } usrv.sv_permit = 0; usrv.sv_intcat = 0; usrv.sv_valcat = 0; if (setusrv(&usrv) < 0) { fprintf(stderr, "cannot put security info\n"); return(1); } if (setucat(0) < 0) { fprintf(stderr, "cannot put security cat\n"); return(2); } } #endif /* _CRAY */ /* initialize variables */ if ((hostname_specified != 0) || (hostc == 0)) { strcpy(mom_short_name, mom_host); c = get_fullhostname(mom_host, mom_host, PBS_MAXHOSTNAME, NULL); if (c != 0) { char logbuf[1024]; snprintf(logbuf, 1024, "Unable to get my full hostname for %s error %d", mom_host, c); log_err(-1, msg_daemonname, logbuf); return(-1); } } time_now = time((time_t *)0); ret_size = RETURN_STRING_SIZE; if ((ret_string = (char *)calloc(1, ret_size)) == NULL) { perror("calloc"); exit(1); } localaddr = ntohl(inet_addr("127.0.0.1")); okclients = AVL_insert(localaddr, 0, NULL, okclients); addclient(mom_host); if (gethostname(ret_string, ret_size) == 0) 
addclient(ret_string); /* if no alias is specified, make mom_alias the same as mom_host */ if (mom_alias[0] == '\0') strcpy(mom_alias,mom_short_name); initialize(); /* init RM code */ read_mom_hierarchy(); /* initialize machine-dependent polling routines */ if ((c = mom_open_poll()) != PBSE_NONE) { log_err(c, msg_daemonname, "pre_poll failed"); return(3); } if (mom_get_sample() != PBSE_NONE) { log_err(c, msg_daemonname, "mom_get_sample failed after mom_open_poll"); return(3); } things_to_resend = initialize_resizable_array(10); exiting_job_list = initialize_resizable_array(10); /* recover & abort jobs which were under MOM's control */ log_record( PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, msg_daemonname, "before init_abort_jobs"); init_abort_jobs(recover); #if PENABLE_LINUX26_CPUSETS /* Nuke cpusets that do not correspond to existing jobs */ cleanup_torque_cpuset(); recover_internal_layout(); #endif #ifdef _POSIX_MEMLOCK /* call mlockall() only 1 time, since it seems to leak mem */ if (MOMIsLocked == 0) { int mlockall_return; /* make sure pbs_mom stays in RAM and doesn't get paged out */ mlockall_return = mlockall(MCL_CURRENT | MCL_FUTURE); /* exit iff mlock failed, but ignore function not implemented error */ if ((mlockall_return == -1) && (errno != ENOSYS)) { perror("pbs_mom:mom_main.c:mlockall()"); exit(1); } MOMIsLocked = 1; } #endif /* _POSIX_MEMLOCK */ /* record the fact that we are up and running */ log_record( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, "Is up"); DBPRT(("MOM is up\n")); sprintf(log_buffer, "MOM executable path and mtime at launch: %s %ld", ArgV[0] == NULL ? 
"NULL" : ArgV[0], (long int)MOMExeTime); log_record(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, log_buffer); ptr = getenv("MOMSLEEPTIME"); if (ptr != NULL) { long tmpL; tmpL = strtol(ptr, NULL, 10); sleep(tmpL % (rand() + 1)); } /* END if (ptr != NULL) */ initialize_threadpool(&request_pool,MOM_THREADS,MOM_THREADS,THREAD_INFINITE); requested_cluster_addrs = 0; /* allocate status strings if needed */ received_statuses = initialize_resizable_array(2); received_table = create_hash(101); if ((received_statuses == NULL) || (received_table == NULL)) { log_err(ENOMEM, __func__, "No memory!!!"); return(-1); } srand(get_random_number()); return(PBSE_NONE); } /* END setup_program_environment() */ /* * TMOMJobGetStartInfo * * NOTE: if pjob is NULL, return empty slot, otherwise return slot containing job. */ int TMOMJobGetStartInfo( job *pjob, /* I */ pjobexec_t **TJEP) /* O */ { int index; for (index = 0;index < TMAX_JE;index++) { if (TMOMStartInfo[index].pjob == pjob) { *TJEP = &TMOMStartInfo[index]; return(SUCCESS); } } /* END for (index) */ return(FAILURE); } /* END TMOMJobGetStartInfo() */ /* * TMOMScanForStarting */ int TMOMScanForStarting(void) { job *pjob; job *nextjob; int Count; int RC; int SC; #ifdef MSIC list_link *tmpL; #endif #ifdef MSIC /* NOTE: solaris system is choking on GET_NEXT - isolate */ tmpL = GET_NEXT(svr_alljobs); tmpL = svr_alljobs.ll_next->ll_struct; pjob = (job *)tmpL; #endif /* MSIC */ pjob = (job *)GET_NEXT(svr_alljobs); while (pjob != NULL) { nextjob = (job *)GET_NEXT(pjob->ji_alljobs); if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_STARTING) { pjobexec_t *TJE = NULL; if (LOGLEVEL >= 2) { snprintf(log_buffer, sizeof(log_buffer), "checking job start in %s - examining pipe from child", __func__); log_record( PBSEVENT_JOB | PBSEVENT_FORCE, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } if (TMOMJobGetStartInfo(pjob, &TJE) == FAILURE) { sprintf(log_buffer, "job %s start data lost, server will retry", pjob->ji_qs.ji_jobid); log_record( 
PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); exec_bail(pjob, JOB_EXEC_RETRY); pjob = nextjob; continue; } /* check if job is ready */ if (TMomCheckJobChild(TJE, 1, &Count, &RC) == FAILURE) { long STime; if (LOGLEVEL >= 3) { sprintf(log_buffer, "job %s child not started, will check later", pjob->ji_qs.ji_jobid); log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } /* if job has been in prerun > TJobStartTimeout, purge job */ STime = pjob->ji_wattr[JOB_ATR_mtime].at_val.at_long; if ((STime > 0) && ((time_now - STime) > TJobStartTimeout)) { sprintf(log_buffer, "job %s child not started after %ld seconds, server will retry", pjob->ji_qs.ji_jobid, TJobStartTimeout); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); memset(TJE, 0, sizeof(pjobexec_t)); exec_bail(pjob, JOB_EXEC_RETRY); } } else { /* NOTE: TMomFinalizeJob3() populates SC */ if (TMomFinalizeJob3(TJE, Count, RC, &SC) == FAILURE) { /* no need to log this, TMomFinalizeJob3() already did */ sprintf(log_buffer, "job %s failed after TMomFinalizeJob3", pjob->ji_qs.ji_jobid); log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); memset(TJE, 0, sizeof(pjobexec_t)); exec_bail(pjob, SC); } else { /* job successfully started */ memset(TJE, 0, sizeof(pjobexec_t)); if (LOGLEVEL >= 3) { sprintf(log_buffer, "%s:job %s reported successful start", __func__, pjob->ji_qs.ji_jobid); log_event(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buffer); } } } /* END else (TMomCheckJobChild() == FAILURE) */ } /* END if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_STARTING) */ pjob = nextjob; } /* END while (pjob != NULL) */ return(SUCCESS); } /* END TMOMScanForStarting() */ /** * examine_all_polled_jobs * * check on over limit condition for polled jobs */ void examine_all_polled_jobs(void) { job *pjob; int c; for (pjob = (job *)GET_NEXT(mom_polljobs);pjob; pjob = (job *)GET_NEXT(pjob->ji_jobque)) { if (pjob->ji_qs.ji_substate != 
JOB_SUBSTATE_RUNNING) continue; /* ** Send message to get info from other MOM's ** if I am Mother Superior for the job and ** it is not being killed. */ if ((am_i_mother_superior(*pjob) == true) && (pjob->ji_nodekill == TM_ERROR_NODE)) { /* ** If can't send poll to everybody, the ** time has come to die. */ if (send_sisters(pjob, IM_POLL_JOB, FALSE) != pjob->ji_numnodes - 1) { sprintf(log_buffer, "cannot contact all sisters"); log_record(PBSEVENT_JOB | PBSEVENT_FORCE, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); } } c = pjob->ji_qs.ji_svrflags; if (c & JOB_SVFLG_OVERLMT2) { kill_job(pjob, SIGKILL, __func__, "job is over-limit-2"); continue; } if (c & JOB_SVFLG_OVERLMT1) { kill_job(pjob, SIGTERM, __func__, "job is over-limit-1"); pjob->ji_qs.ji_svrflags |= JOB_SVFLG_OVERLMT2; continue; } if (c & JOB_SVFLG_JOB_ABORTED) { kill_job(pjob, SIGTERM, __func__, "job has been aborted"); continue; } log_buffer[0] = '\0'; if (job_over_limit(pjob) != 0) { log_record( PBSEVENT_JOB | PBSEVENT_FORCE, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, log_buffer); if (am_i_mother_superior(*pjob) == true) { char *kill_msg; kill_msg = (char *)calloc(1, 80 + strlen(log_buffer)); if (kill_msg != NULL) { sprintf(kill_msg,"=>> PBS: job killed: %s\n", log_buffer); message_job(pjob,StdErr,kill_msg); free(kill_msg); } } kill_job(pjob, SIGTERM, __func__, "job is over-limit-0"); pjob->ji_qs.ji_svrflags |= JOB_SVFLG_OVERLMT1; } } /* END for (pjob) */ return; } /* END examine_all_polled_jobs() */ /* * examine_all_running_jobs */ void examine_all_running_jobs(void) { job *pjob; #ifdef _CRAY int c; #endif task *ptask; for (pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { if (pjob->ji_qs.ji_substate != JOB_SUBSTATE_RUNNING) { if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_PRERUN) { if (pjob->ji_examined <= 10) { pjob->ji_examined++; } else { sprintf(log_buffer, "job %s already examined. 
substate=%d", pjob->ji_qs.ji_jobid, pjob->ji_qs.ji_substate); log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, __func__, log_buffer); /* This is the second time through examine_all_running_jobs for this job while in a PRERUN state. Something is wrong. kill the job. */ exec_bail(pjob, JOB_EXEC_INITABT); } } continue; /* This job is not running, skip it. */ } if (am_i_mother_superior(*pjob) == false) continue; /* We are not the Mother Superior for this job, skip it. */ /* update information for my tasks */ mom_set_use(pjob); /* Machine dependent function to compute and set attributes like cput, vmem, etc. */ /* Have all job processes vanished undetected? */ /* double check by sig0 to session pid for each task */ /* But why not use the proc_array? */ if (pjob->ji_flags & MOM_NO_PROC) { pjob->ji_flags &= ~MOM_NO_PROC; for (ptask = (task *)GET_NEXT(pjob->ji_tasks); ptask != NULL; ptask = (task *)GET_NEXT(ptask->ti_jobtask)) { #ifdef _CRAY if (pjob->ji_globid[0] == '\0') break; c = atoi(pjob->ji_globid); if ((kill((pid_t)c, 0) == -1) && (errno == ESRCH)) #else /* not cray */ if ((kill(ptask->ti_qs.ti_sid, 0) == -1) && (errno == ESRCH)) #endif /* not cray */ { if (LOGLEVEL >= 3) { log_event( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "no active process found"); } ptask->ti_qs.ti_exitstat = 0; ptask->ti_qs.ti_status = TI_STATE_EXITED; pjob->ji_qs.ji_un.ji_momt.ji_exitstat = 0; if (LOGLEVEL >= 6) { log_record( PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "saving task (main loop)"); } task_save(ptask); exiting_tasks = 1; } /* END if ((kill == -1) && ...) 
*/ } /* END while (ptask != NULL) */ } /* END if (pjob->ji_flags & MOM_NO_PROC) */ mom_checkpoint_check_periodic_timer(pjob); } /* END for (pjob) */ return; } /* END examine_all_running_jobs() */ /** * examine_all_jobs_to_resend * * tries to resend each of the jobs that hasn't been sent yet */ void examine_all_jobs_to_resend(void) { int jindex; job *pjob; for (jindex=0;jindex < MAX_RESEND_JOBS;jindex++) { /* no job ptrs are stored after a NULL value */ if (JobsToResend[jindex][0] == '\0') break; /* skip dummy job */ if (JobsToResend[jindex][0] == (char)DUMMY_JOB_PTR) continue; if ((pjob = mom_find_job(JobsToResend[jindex])) == NULL) { /* job no longer exists, remove from re-send array */ JobsToResend[jindex][0] = (char)DUMMY_JOB_PTR; JobsToResend[jindex][1] = '\0'; } else if (!post_epilogue(pjob, MOM_OBIT_RETRY)) { if (LOGLEVEL >= 7) log_record(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "job obit resent"); /* sent successfully, make this slot the dummy pointer */ JobsToResend[jindex][0] = (char)DUMMY_JOB_PTR; JobsToResend[jindex][1] = '\0'; } } } /* END examine_all_jobs_to_resend() */ void resend_waiting_joins( job *pjob) { hnodent *np; int i; int stream; eventent *ep; tlist_head phead; CLEAR_HEAD(phead); for (i = 0; i < JOB_ATR_LAST; i++) { (job_attr_def + i)->at_encode(pjob->ji_wattr + i, &phead, (job_attr_def + i)->at_name, NULL, ATR_ENCODE_MOM, ATR_DFLAG_ACCESS); } attrl_fixlink(&phead); for (i = 1; i < pjob->ji_numnodes; i++) { np = &pjob->ji_hosts[i]; if ((ep = (eventent *)GET_NEXT(np->hn_events)) != NULL) { /* we haven't received the reply yet */ stream = tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr, sizeof(np->sock_addr)); if (IS_VALID_STREAM(stream)) { if (send_join_job_to_a_sister(pjob, stream, ep, phead, i) == DIS_SUCCESS) { /* SUCCESS */ snprintf(log_buffer, sizeof(log_buffer), "Successfully re-sent join job request to %s", np->hn_host); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); } close(stream); } } } 
free_attrlist(&phead); } /* END resend_waiting_joins() */ void check_jobs_awaiting_join_job_reply() { job *pjob; time_now = time(NULL); for (pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { if ((pjob->ji_qs.ji_substate == JOB_SUBSTATE_PRERUN) && (pjob->ji_qs.ji_state == JOB_STATE_RUNNING) && (am_i_mother_superior(*pjob) == true)) { /* these jobs have sent out join requests but haven't received all replies */ if (time_now - pjob->ji_joins_sent > max_join_job_wait_time) { exec_bail(pjob, JOB_EXEC_RETRY); } else if ((time_now - pjob->ji_joins_sent > resend_join_job_wait_time) && (pjob->ji_joins_resent == FALSE)) { pjob->ji_joins_resent = TRUE; resend_waiting_joins(pjob); } } } /* END for each job */ } /* END check_jobs_awaiting_join_job_reply() */ void check_jobs_in_mom_wait() { job *pjob; time_now = time(NULL); for (pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_MOM_WAIT) { if ((pjob->ji_kill_started != 0) && (time_now - pjob->ji_kill_started > job_exit_wait_time)) { /* job has exceeded the time to wait for all sisters * to report that the job is killed. Go ahead and finish * it anyway */ snprintf(log_buffer, sizeof(log_buffer), "Job %s has exceeded %d seconds, the time to wait for sisters to confirm it is finished. 
Cleaning up.", pjob->ji_qs.ji_jobid, job_exit_wait_time); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buffer); pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; job_save(pjob, SAVEJOB_QUICK, pbs_rm_port); exiting_tasks = 1; } } } /* END loop over all jobs */ } /* END check_jobs_in_mom_wait() */ void check_exiting_jobs() { exiting_job_info *eji; job *pjob; time_t time_now = time(NULL); while ((eji = (exiting_job_info *)pop_thing(exiting_job_list)) != NULL) { if (time_now - eji->obit_sent < 300) { /* insert this back at the front */ insert_thing_after(exiting_job_list, eji, ALWAYS_EMPTY_INDEX); break; } pjob = mom_find_job(eji->jobid); if (pjob == NULL) { free(eji); } else { post_epilogue(pjob, 0); eji->obit_sent = time_now; insert_thing(exiting_job_list, eji); } } } /* END check_exiting_jobs() */ /* * kill_all_running_jobs */ void kill_all_running_jobs(void) { job *pjob; unsigned int momport = 0; for (pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_RUNNING) { kill_job(pjob, SIGKILL, "kill_all_running_jobs", "mom is terminating with kill jobs flag"); pjob->ji_qs.ji_substate = JOB_SUBSTATE_EXITING; if (multi_mom) { momport = pbs_rm_port; } job_save(pjob, SAVEJOB_QUICK, momport); } else { term_job(pjob); } } /* END for (pjob) */ #ifndef NOSIGCHLDMOM if (termin_child != 0) #endif scan_for_terminated(); if (exiting_tasks) scan_for_exiting(); return; } /* END kill_all_running_jobs() */ /** * mark_for_resend * * used to keep track of jobs whose obits weren't sent correctly * marks them so they can be resent * * @param pjob - the job that should be resent */ int mark_for_resend( job *pjob) /* I */ { int jindex; int rc = PBSE_NONE; if (pjob == NULL) { rc = PBSE_JOBNOTFOUND; return(rc); } for (jindex = 0;jindex < MAX_RESEND_JOBS;jindex++) { if ((JobsToResend[jindex][0] == '\0') || (JobsToResend[jindex][0] == (char)DUMMY_JOB_PTR)) { strcpy(JobsToResend[jindex], 
pjob->ji_qs.ji_jobid); if (LOGLEVEL >= 7) { log_record( PBSEVENT_JOB, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "marking job for resend"); } rc = PBSE_NONE; break; } } return(rc); } /* * This is for a mom starting with the -P option. Set all existing * tasks to TI_STATE_EXITED so they can be cleanup up on the mom * and at the server */ void prepare_child_tasks_for_delete() { job *pJob; for (pJob = (job *)GET_NEXT(svr_alljobs);pJob != NULL;pJob = (job *)GET_NEXT(pJob->ji_alljobs)) { task *pTask; for (pTask = (task *)GET_NEXT(pJob->ji_tasks);pTask != NULL;pTask = (task *)GET_NEXT(pTask->ti_jobtask)) { char buf[128]; extern int exiting_tasks; sprintf(buf, "preparing exited session %d for task %d in job %s for deletion", (int)pTask->ti_qs.ti_sid, pTask->ti_qs.ti_task, pJob->ji_qs.ji_jobid); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, buf); pTask->ti_qs.ti_exitstat = 0; /* actually unknown */ pTask->ti_qs.ti_status = TI_STATE_EXITED; task_save(pTask); exiting_tasks = 1; } } } /** * main_loop * * @see main() - parent */ void main_loop(void) { extern time_t wait_time; double myla; time_t tmpTime; #ifdef USESAVEDRESOURCES int check_dead = TRUE; #endif /* USESAVEDRESOURCES */ mom_run_state = MOM_RUN_STATE_RUNNING; /* mom_run_state is altered by stop_me() or MOMCheckRestart() */ while (mom_run_state == MOM_RUN_STATE_RUNNING) { if (call_hup) { process_hup(); /* Do a restart of resmom */ } dep_main_loop_cycle(); /* Call machine dependent code periodically */ time_now = time(NULL); /* check if loadave means we should be "busy" */ if (max_load_val > 0.0) { get_la(&myla); /* Machine dependent load average computation (for linux read contents of /proc/loadavg) */ /* check if need to update busy state */ check_busy(myla); } /* should we check the log file ?*/ if (time_now >= (last_log_check + PBS_LOG_CHECK_RATE)) { check_log(); /* Possibly do a log_roll */ } /* check for what needs to be sent is now done within */ mom_server_all_update_stat(); /* if needed, update server 
with my state change */ /* can be changed in check_busy(), query_adp(), and update_stat() */ mom_server_all_send_state(); #ifdef USESAVEDRESOURCES /* if -p, must poll tasks inside jobs to look for completion */ if ((check_dead) && (recover == JOB_RECOV_RUNNING)) scan_non_child_tasks(); #endif if (time_now >= (last_poll_time + CheckPollTime)) { last_poll_time = time_now; if (GET_NEXT(svr_alljobs)) { /* There are jobs, update process status from the OS */ if (mom_get_sample() == PBSE_NONE) { /* no errors in getting process status information */ examine_all_running_jobs(); examine_all_polled_jobs(); examine_all_jobs_to_resend(); } } } #ifdef USESAVEDRESOURCES check_dead = FALSE; #endif /* USESAVEDRESOURCES */ #ifndef NOSIGCHLDMOM if (termin_child != 0) /* termin_child is a flag set by the catch_child signal handler */ #endif scan_for_terminated(); /* machine dependent (calls mom_get_sample()???) */ /* if -p, must poll tasks inside jobs to look for completion */ if (recover == JOB_RECOV_RUNNING) scan_non_child_tasks(); if (recover == JOB_RECOV_DELETE) { prepare_child_tasks_for_delete(); /* we can only do this once so set recover back to the default */ recover = JOB_RECOV_RUNNING; } check_jobs_awaiting_join_job_reply(); check_jobs_in_mom_wait(); if (exiting_tasks) scan_for_exiting(); check_exiting_jobs(); TMOMScanForStarting(); /* unblock signals */ if (sigprocmask(SIG_UNBLOCK, &allsigs, NULL) == -1) log_err(errno, __func__, "sigprocmask(UNBLOCK)"); time_now = time((time_t *)0); tmpTime = MIN(wait_time, time_now - (LastServerUpdateTime + ServerStatUpdateInterval)); tmpTime = MIN(tmpTime, time_now - (last_poll_time + CheckPollTime)); tmpTime = MAX(1, tmpTime); if (LastServerUpdateTime == 0) tmpTime = 1; resend_things(); /* wait_request does a select and then calls the connection's cn_func for sockets with data */ if (wait_request(tmpTime, NULL) != 0) { if (errno == EBADF) { init_network(pbs_mom_port, mom_process_request); init_network(pbs_rm_port, tcp_request); } 
log_err(-1, msg_daemonname, "wait_request failed"); } /* block signals while we do things */ if (sigprocmask(SIG_BLOCK, &allsigs, NULL) == -1) log_err(errno, __func__, "sigprocmask(BLOCK)"); if (GET_NEXT(svr_alljobs) == NULL) { MOMCheckRestart(); /* There are no jobs, see if the server needs to be restarted. */ } } /* END while (mom_run_state == MOM_RUN_STATE_RUNNING) */ return; } /* END main_loop() */ /* * restart_mom */ void restart_mom( int argc, char *argv[]) { const char *envstr = "PATH"; int rc; rc = put_env_var(envstr, OriginalPath); if (rc) { sprintf(log_buffer, "put_env_var failed with %d prior to execing myself", rc); log_err(rc, __func__, log_buffer); return; } DBPRT(("Re-execing myself now...\n")); execv(ArgV[0], argv); sprintf(log_buffer, "execing myself failed: %s (%d)", strerror(errno), errno); log_err(errno, __func__, log_buffer); return; } /* END restart_mom() */ #ifdef NUMA_SUPPORT /* * Parses mom.layout for layout of nodeboards. * Initializes nodeset of each nodeboard. * num_node_boards is set to the number of nodeboards. * Return 0 on success, other on error. */ int read_layout_file() { FILE *layout; char line[MAX_LINE]; const char *delims = " \t\n\r="; char *tok = NULL; const char *val = NULL; int i = 0; int empty_line; hwloc_bitmap_t nodeset = NULL; if ((layout = fopen(path_layout, "r")) == NULL) { snprintf(log_buffer,sizeof(log_buffer), "Unable to read the layout file in %s\n", path_layout); log_err(errno, __func__, log_buffer); return(NO_LAYOUT_FILE); } /* parse lines. 
format is: * nodes= * extra key=value pairs are ignored by TORQUE but don't cause a failure */ while (fgets(line, sizeof(line), layout) != NULL) { empty_line = TRUE; /* initialize the end to -1 while start defaults to zero * so that add_mic_status() skips over the nodeboard if not * configured */ node_boards[i].mic_end_index = -1; /* Strip off comments */ if ((tok = strchr(line, '#')) != NULL) *tok = '\0'; tok = strtok(line,delims); /* find the specifications */ while (tok != NULL) { /* do general error checking on each pair, should be in * the format name=val, spacing optional */ val = strtok(NULL,delims); if (val == NULL) { snprintf(log_buffer,sizeof(log_buffer), "Malformed mom.layout file, line:\n%s\n", line); goto failure; } empty_line = FALSE; if (strcmp(tok,"nodes") == 0) { /* Allocate temp nodeset */ if ((nodeset = hwloc_bitmap_alloc()) == NULL) { log_err(errno, __func__, "failed to allocate nodeset"); return(-1); } /* Parse val into nodeset, abort if parsing fails */ if ((node_boards[i].num_nodes = hwloc_bitmap_list_sscanf(nodeset, val)) < 0) { sprintf(log_buffer, "failed to parse mom.layout file token: nodes=%s", val); goto failure; } } else if (strcmp(tok, "mic") == 0) { /* read the mics specified for this node board. 
This is in the form * index1[-index2] specifying a range*/ char *micval = strdup(val); char *start = strtok(micval, "-"); char *end = strtok(NULL, "-"); node_boards[i].mic_start_index = strtol(start, NULL, 10); node_boards[i].mic_end_index = node_boards[i].mic_start_index; if (end != NULL) node_boards[i].mic_end_index = strtol(end, NULL, 10); free(micval); } else if (strcmp(tok,"memsize") == 0) { node_boards[i].memsize = atoi(val); /* default to KB, keeping with TORQUE tradition */ if ((strstr(val,"gb")) || (strstr(val,"GB"))) { node_boards[i].memsize *= 1024 * 1024; } else if ((strstr(val,"mb")) || (strstr(val,"MB"))) { node_boards[i].memsize *= 1024; } } else { /* ignore other stuff for now */ } /* advance token */ tok = strtok(NULL,delims); } /* END while (parsing line) */ if (empty_line == FALSE) { /* Check if we have a nodeset, abort if not */ if (nodeset != NULL) { /* Store nodeset of node_boards[i], may be empty */ if ((node_boards[i].nodeset = hwloc_bitmap_dup(nodeset)) == NULL) { log_err(errno, __func__, "failed to duplicate nodeset"); return(-1); } /* Free temp nodeset */ hwloc_bitmap_free(nodeset); nodeset = NULL; /* Show what we have */ sprintf(log_buffer, "nodeboard %2d: %d NUMA nodes: ", i, node_boards[i].num_nodes); hwloc_bitmap_list_snprintf(log_buffer + strlen(log_buffer), sizeof(log_buffer) - strlen(log_buffer), node_boards[i].nodeset); log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE, __func__, log_buffer); } else { sprintf(log_buffer, "nodeboard %d has no nodeset", i); goto failure; } /* Done with this nodeboard */ i++; } } /* END while (parsing file) */ fclose(layout); num_node_boards = i; snprintf(log_buffer,sizeof(log_buffer), "Setting up this mom to function as %d numa nodes\n", num_node_boards); log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE, __func__, log_buffer); return(PBSE_NONE); failure: log_err(-1, __func__, log_buffer); if (nodeset) hwloc_bitmap_free(nodeset); return(BAD_LAYOUT_FILE); } /* END read_layout_file() */ /* * Parses 
mom.layout, registers procs/mem. Dies if there's a problem. * Currently we allow nodeset and/or cpuset to be empty. * We also do not check if nodesets/cpusets of nodeboards overlap. * * parses mom.layout, registers procs/mem * @return nonzero if there's a problem */ int setup_nodeboards() { hwloc_obj_t obj; hwloc_obj_t pu; int i; int j; int k; int rc; int mempath_len = strlen("/sys/devices/system/node/node999999/meminfo"); /* Read mom.layout, init nodesets */ if ((rc = read_layout_file()) != PBSE_NONE) { log_err(-1, __func__, "Could not read layout file!\n"); exit(rc); } /* * Walk through nodeboards * - set up cpuset and nodeset * - set up meminfo */ for (i = 0; i < num_node_boards; i++) { /* Allocate cpuset for this nodeboard */ if ((node_boards[i].cpuset = hwloc_bitmap_alloc()) == NULL) { log_err(errno, __func__, "failed to allocate cpuset"); exit(-1); } /* Derive cpuset from nodeset */ hwloc_cpuset_from_nodeset_strict(topology, node_boards[i].cpuset, node_boards[i].nodeset); /* * Handle SMT CPUs. * If a system has SMT enabled, there are more than one logical CPU per physical core. * If MOMConfigUseSMT is off, we only want the first logical CPU of a core in the cpuset. * Thus we map the additional logical CPUs out of the cpuset. * To be portable among architectures as much as possible, the only assumption that * is made here is that the CPUs to become mapped out are HWLOC_OBJ_PU objects that * are children of a HWLOC_OBJ_CORE object. * If there are no HWLOC_OBJ_CORE objects in the cpuset, we cannot detect if cpuset members * are physical or logical. Then the cpuset is left as-is. 
*/ if (!MOMConfigUseSMT) { for (obj = hwloc_get_next_obj_inside_cpuset_by_type(topology, node_boards[i].cpuset, HWLOC_OBJ_CORE, NULL); obj; obj = hwloc_get_next_obj_inside_cpuset_by_type(topology, node_boards[i].cpuset, HWLOC_OBJ_CORE, obj)) { j = 1; while ((pu = hwloc_get_obj_inside_cpuset_by_type(topology, obj->cpuset, HWLOC_OBJ_PU, j++)) != NULL) hwloc_bitmap_andnot(node_boards[i].cpuset, node_boards[i].cpuset, pu->cpuset); } } /* Number of CPUs in cpuset */ node_boards[i].num_cpus = hwloc_bitmap_weight(node_boards[i].cpuset); /* * Convert cpuset back to nodeset. * This maps out NUMA nodes, which were given in mom.layout, * but which do not exist for whatever reason. */ hwloc_cpuset_to_nodeset_strict(topology, node_boards[i].cpuset, node_boards[i].nodeset); node_boards[i].num_nodes = hwloc_bitmap_weight(node_boards[i].nodeset); /* Set up meminfo paths */ if (node_boards[i].num_nodes) { if ((node_boards[i].path_meminfo = (char **)calloc(node_boards[i].num_nodes, sizeof(char *))) == NULL) { log_err(errno, __func__, "failed to allocate memory"); exit(-1); } k = 0; hwloc_bitmap_foreach_begin(j, node_boards[i].nodeset) { if ((node_boards[i].path_meminfo[k] = (char *)calloc(1, mempath_len)) == NULL) { log_err(errno, __func__, "failed to allocate memory"); exit(-1); } snprintf(node_boards[i].path_meminfo[k], mempath_len, "/sys/devices/system/node/node%d/meminfo", j); k++; } hwloc_bitmap_foreach_end(); } /* Show what we have */ snprintf(log_buffer, sizeof(log_buffer), "nodeboard %2d: %d cpus (", i, node_boards[i].num_cpus); hwloc_bitmap_list_snprintf(log_buffer + strlen(log_buffer), sizeof(log_buffer) - strlen(log_buffer), node_boards[i].cpuset); snprintf(log_buffer + strlen(log_buffer), sizeof(log_buffer) - strlen(log_buffer), "), %d mems (", node_boards[i].num_nodes); hwloc_bitmap_list_snprintf(log_buffer + strlen(log_buffer), sizeof(log_buffer) - strlen(log_buffer), node_boards[i].nodeset); snprintf(log_buffer + strlen(log_buffer), sizeof(log_buffer) - 
strlen(log_buffer), ")"); log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE, __func__, log_buffer); } /* END for each nodeboard */ return(PBSE_NONE); } /* END setup_nodeboards() */ #endif /* ifdef NUMA_SUPPORT */ /* * * @see main_loop() - child */ int main( int argc, /* I */ char *argv[]) /* I */ { int rc; int tmpFD; FILE *fp; /* populate a variable useful when analyzing a core file */ memset(Torque_Info_SysVersion, 0, sizeof(Torque_Info_SysVersion)); if ((fp = fopen("/proc/version", "r")) != NULL) { fread(Torque_Info_SysVersion, sizeof(Torque_Info_SysVersion), 1, fp); fclose(fp); } tmpFD = sysconf(_SC_OPEN_MAX); /* close any inherited extra files, leaving stdin, stdout, and stderr open */ while (--tmpFD > 2) close(tmpFD); save_args(argc, argv); initialize_globals(); parse_command_line(argc, argv); /* Calls exit on command line error */ if ((rc = setup_program_environment()) != 0) { return(rc); } #ifdef NVIDIA_GPUS #ifdef NVML_API if (!init_nvidia_nvml()) { use_nvidia_gpu = FALSE; } #endif /* NVML_API */ if (!check_nvidia_setup()) { use_nvidia_gpu = FALSE; } if (!use_nvidia_gpu) { sprintf(log_buffer, "Not using Nvidia gpu support even though built with --enable-nvidia-gpus"); log_ext(-1, "main", log_buffer, LOG_DEBUG); } #endif /* NVIDIA_GPUS */ main_loop(); if (mom_run_state == MOM_RUN_STATE_KILLALL) { kill_all_running_jobs(); } /* shutdown mom */ #if defined(NVIDIA_GPUS) && defined(NVML_API) shut_nvidia_nvml(); #endif /* NVIDIA_GPUS and NVML_API */ mom_close_poll(); net_close(-1); /* close all network connections */ if (mom_run_state == MOM_RUN_STATE_RESTART) { sprintf(log_buffer, "Will be restarting: %s", ArgV[0]); log_record(PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); } log_record(PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, "Is down"); log_close(1); if (mom_run_state == MOM_RUN_STATE_RESTART) { restart_mom(argc, argv); } return(0); } /* END main() */ im_compose_info *create_compose_reply_info( 
char *jobid, char *cookie, hnodent *np, int command, tm_event_t event, tm_task_id taskid) { im_compose_info *ici = (im_compose_info *)calloc(1, sizeof(im_compose_info)); if (ici != NULL) { snprintf(ici->jobid, sizeof(ici->jobid), "%s", jobid); snprintf(ici->cookie, sizeof(ici->cookie), "%s", cookie); memcpy(&(ici->np), np, sizeof(ici->np)); ici->command = command; ici->event = event; ici->taskid = taskid; } else log_err(ENOMEM, __func__, "Cannot allocate memory!"); return(ici); } /* END create_compose_reply_info() */ int im_compose_send_info( struct tcp_chan *chan, im_compose_info *ici) { return(im_compose(chan, ici->jobid, ici->cookie, ici->command, ici->event, ici->taskid)); } /* END im_compose_send_info() */ int resend_compose_reply( im_compose_info *ici) { int ret = -1; hnodent *np; int stream; char log_buf[LOCAL_LOG_BUF_SIZE]; struct tcp_chan *chan = NULL; np = &ici->np; stream = tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr, sizeof(np->sock_addr)); if (IS_VALID_STREAM(stream)) { if ((chan = DIS_tcp_setup(stream)) == NULL) { } else if ((ret = im_compose_send_info(chan, ici)) == DIS_SUCCESS) { if ((ret = DIS_tcp_wflush(chan)) == DIS_SUCCESS) { sprintf(log_buf, "re-sent im_compose reply to %s", netaddr(&np->sock_addr)); log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, ici->jobid, log_buf); free(ici); } } close(stream); if (chan != NULL) DIS_tcp_cleanup(chan); } return(ret); } /* END resend_compose_reply() */ int resend_kill_job_reply( killjob_reply_info *kj) { int stream; int ret = -1; hnodent *np; struct tcp_chan *chan = NULL; np = &kj->ici->np; stream = tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr, sizeof(np->sock_addr)); if (IS_VALID_STREAM(stream)) { if ((chan = DIS_tcp_setup(stream)) == NULL) { } else if ((ret = im_compose_send_info(chan, kj->ici)) == DIS_SUCCESS) { if ((ret = diswul(chan, kj->cputime)) == DIS_SUCCESS) { if ((ret = diswul(chan, kj->mem)) == DIS_SUCCESS) { if ((ret = diswul(chan, kj->vmem)) == DIS_SUCCESS) { if (kj->node_id >= 
0) ret = diswsi(chan, kj->node_id); if (ret == DIS_SUCCESS) { if ((ret = DIS_tcp_wflush(chan)) == DIS_SUCCESS) { log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, kj->ici->jobid, "Successfully re-sent kill job reply"); free(kj->ici); free(kj); } } } } } } close(stream); if (chan != NULL) DIS_tcp_cleanup(chan); } return(ret); } /* END resend_kill_job_reply() */ int resend_spawn_task_reply( spawn_task_info *st) { int ret = -1; hnodent *np = &st->ici->np; struct tcp_chan *chan = NULL; int stream = tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr, sizeof(np->sock_addr)); if (IS_VALID_STREAM(stream)) { if ((chan = DIS_tcp_setup(stream)) == NULL) { } else if ((ret = im_compose_send_info(chan, st->ici)) == DIS_SUCCESS) { if ((ret = diswsi(chan, st->ti_task)) == DIS_SUCCESS) { if ((ret = DIS_tcp_wflush(chan)) == DIS_SUCCESS) { if (ret == DIS_SUCCESS) { log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, st->ici->jobid, "Successfully re-sent spawn task reply"); free(st->ici); free(st); } } } } close(stream); if (chan != NULL) DIS_tcp_cleanup(chan); } return(ret); } /* END resend_spawn_task_reply() */ int resend_obit_task_reply( obit_task_info *ot) { int ret = -1; hnodent *np = &ot->ici->np; struct tcp_chan *chan = NULL; int stream = tcp_connect_sockaddr((struct sockaddr *)&np->sock_addr, sizeof(np->sock_addr)); if (IS_VALID_STREAM(stream)) { if ((chan = DIS_tcp_setup(stream)) == NULL) { } else if ((ret = im_compose_send_info(chan, ot->ici)) == DIS_SUCCESS) { if ((ret = diswsi(chan, ot->ti_exitstat)) == DIS_SUCCESS) { if ((ret = DIS_tcp_wflush(chan)) == DIS_SUCCESS) { if (ret == DIS_SUCCESS) { log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, ot->ici->jobid, "Successfully re-sent obit task reply"); free(ot->ici); free(ot); } } } } close(stream); if (chan != NULL) DIS_tcp_cleanup(chan); } return(ret); } /* END resend_obit_task_reply() */ void resend_things() { int iter = -1; int ret; resend_momcomm *mc; im_compose_info *ici; killjob_reply_info *kj; spawn_task_info *st; time_t time_now = 
time(NULL); while ((mc = (resend_momcomm *)next_thing(things_to_resend, &iter)) != NULL) { if (time_now - mc->resend_time < RESEND_INTERVAL) continue; ret = -1; mc->resend_attempts += 1; switch (mc->mc_type) { case COMPOSE_REPLY: ici = (im_compose_info *)mc->mc_struct; ret = resend_compose_reply(ici); break; case KILLJOB_REPLY: kj = (killjob_reply_info *)mc->mc_struct; ret = resend_kill_job_reply(kj); break; case SPAWN_TASK_REPLY: st = (spawn_task_info *)mc->mc_struct; ret = resend_spawn_task_reply(st); break; case OBIT_TASK_REPLY: ret = resend_obit_task_reply((obit_task_info *)mc->mc_struct); break; default: snprintf(log_buffer, sizeof(log_buffer), "I don't recognize send mom communication of type %d", mc->mc_type); log_err(-1, __func__, log_buffer); /* remove the garbage */ ret = DIS_SUCCESS; break; } if ((ret == DIS_SUCCESS) || (mc->resend_attempts > 3)) { remove_thing(things_to_resend, mc); free(mc); } else mc->resend_time = time_now; } /* END for each resendable thing */ } /* END resend_things() */ int add_to_resend_things( resend_momcomm *mc) { mc->resend_time = time(NULL); return(insert_thing(things_to_resend, mc)); } /* END add_to_resend_things() */ /* END mom_main.c */