/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. 
Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. */ /* * The entry point function for pbs_daemon. * * Included public functions re: * * main initialization and main loop of pbs_daemon */ #include <pbs_config.h> /* the master config generated by configure */ #include <sys/wait.h> #include <utime.h> #include <sys/types.h> #include <sys/stat.h> #if (PLOCK_DAEMONS & 1) #include <sys/lock.h> #endif /* PLOCK_DAEMONS */ #include <netinet/in.h> #include "pbs_ifl.h" #include <assert.h> #include <ctype.h> #include <errno.h> #include <fcntl.h> #include <netdb.h> #include <stdio.h> #include <string.h> #include <time.h> #include <unistd.h> #include <stdlib.h> #include "list_link.h" #include "work_task.h" #include "log.h" #include "server_limits.h" #include "attribute.h" #include "pbs_job.h" #include "queue.h" #include "server.h" #include "pbs_nodes.h" #include "net_connect.h" #include "credential.h" #include "svrfunc.h" #include "tracking.h" #include "acct.h" #include "sched_cmds.h" #include "rpp.h" #include "dis.h" #include "dis_init.h" #include "batch_request.h" #include "pbs_proto.h" #ifdef USE_HA_THREADS #include <pthread.h> #endif /* USE_HA_THREADS */ #define TSERVER_HA_CHECK_TIME 1 /* 1 second sleep time between checks on the lock file for high availability */ /* external functions called */ extern void job_log_roll(int max_depth); extern int pbsd_init(int); extern void shutdown_ack(); extern int 
update_nodes_file(void);
extern void tcp_settimeout(long);
extern void poll_job_task(struct work_task *);
extern int schedule_jobs(void);
extern int notify_listeners(void);
extern void queue_route(pbs_queue *);
extern void svr_shutdown(int);
extern void acct_close(void);
extern int svr_startjob(job *, struct batch_request *, char *, char *);
extern int RPPConfigure(int, int);
extern void acct_cleanup(long);

/* check_children() is only declared (and only called from main_loop) when
 * the build does not rely on a SIGCHLD handler */
#ifdef NO_SIGCHLD
extern void check_children();
#endif

/* Fallback limits: each macro below is defined here only when no header
 * included earlier has already provided it. */
#ifndef MAX_LINE
#define MAX_LINE 1024
#endif

#ifndef MAX_PATH_LEN
#define MAX_PATH_LEN 256
#endif

#ifndef MAX_NAME
#define MAX_NAME 64
#endif

#ifndef DEF_USPERSECOND
#define DEF_USPERSECOND 1000000
#endif

#ifndef bool_t
#define bool_t unsigned char
#endif

/* true when STR (a char array or pointer, must not be NULL) is the empty string */
#define ISEMPTYSTR(STR) ((STR)[0] == '\0')

/* number of slots in the saved argument vector ArgV[] */
#ifndef MAX_CMD_ARGS
#define MAX_CMD_ARGS 10
#endif

/* lock file helpers: which set is declared depends on USE_HA_THREADS */
#ifdef USE_HA_THREADS
static void lock_out_ha ();
#else
static void lock_out (int,int);
static int try_lock_out (int,int);
#endif

/* external data items */
extern int svr_chngNodesfile;
extern int svr_totnodes;

/* External Functions */
extern int get_svr_attr (int);

/* Local Private Functions */
/* NOTE: only get_port() and daemonize_server() are static; the remaining
 * prototypes in this group have external linkage */
static int get_port (char *, unsigned int *, pbs_net_t *);
static int daemonize_server (int,int *);
int mutex_lock (mutex_t *);
int mutex_unlock (mutex_t *);
int get_file_info (char *,unsigned long *,long *,bool_t *,bool_t *);
int get_full_path (char *,char *,int);
int svr_restart();
void restore_attr_default (struct attribute *);

/* Global Data Items */
int high_availability_mode = FALSE; /* set to TRUE by the --ha command line option */
char *acct_file = NULL;             /* set from the -A command line option */
char *log_file = NULL;              /* set from the -L command line option */
char *job_log_file = NULL;
char *path_home = PBS_SERVER_HOME;  /* overridden by the -d command line option */
char *path_acct;
char path_log[MAXPATHLEN + 1];
char *path_priv = NULL;
char *path_arrays;
char *path_credentials;
char *path_jobs;
char *path_queues;
char *path_spool;
char *path_svrdb = NULL;
char *path_svrdb_new;
char *path_svrlog;
char *path_track;
char *path_nodes;
char *path_nodes_new;
char *path_nodestate;
char *path_nodenote;
char *path_nodenote_new;
char
*path_checkpoint; char *path_jobinfo_log; char *ArgV[MAX_CMD_ARGS]; extern char *msg_daemonname; extern char *msg_info_server; /* Server information message */ extern int pbs_errno; char *pbs_o_host = "PBS_O_HOST"; pbs_net_t pbs_mom_addr; unsigned int pbs_mom_port = 0; unsigned int pbs_rm_port; pbs_net_t pbs_scheduler_addr; unsigned int pbs_scheduler_port; extern pbs_net_t pbs_server_addr; unsigned int pbs_server_port_dis; listener_connection listener_conns[MAXLISTENERS]; int queue_rank = 0; int a_opt_init = -1; /* HA global data items */ long HALockCheckTime = 0; long HALockUpdateTime = 0; char HALockFile[MAXPATHLEN+1]; char OriginalPath[MAXPATHLEN+1]; mutex_t EUIDMutex; /* prevents thread from trying to lock the file from a different euid */ int HALockFD; /* END HA global data items */ struct server server; /* the server structure */ char server_host[PBS_MAXHOSTNAME + 1]; /* host_name */ char *mom_host = server_host; int server_init_type = RECOV_WARM; char server_name[PBS_MAXSERVERNAME + 1]; /* host_name[:service|port] */ int svr_delay_entry = 0; int svr_do_schedule = SCH_SCHEDULE_NULL; extern int listener_command; tlist_head svr_queues; /* list of queues */ tlist_head svr_alljobs; /* list of all jobs in server */ tlist_head svr_jobs_array_sum; /* list of jobs in server, arrays summarized as single "placeholder" job */ tlist_head svr_newjobs; /* list of incoming new jobs */ tlist_head svr_newnodes; /* list of newly created nodes */ tlist_head svr_jobarrays; /* list of all job arrays */ tlist_head task_list_immed; tlist_head task_list_timed; tlist_head task_list_event; pid_t sid; time_t time_now = 0; char *plogenv = NULL; int LOGLEVEL = 0; int DEBUGMODE = 0; int TDoBackground = 1; /* background daemon */ int TForceUpdate = 0; /* (boolean) */ char *ProgName; char *NodeSuffix = NULL; int allow_any_mom = FALSE; int array_259_upgrade = FALSE; void DIS_rpp_reset(void) { if (dis_getc != rpp_getc) { dis_getc = rpp_getc; dis_puts = (int (*)(int, const char *, 
size_t))rpp_write; dis_gets = (int (*)(int, char *, size_t))rpp_read; disr_skip = (int (*)(int, size_t))rpp_skip; disr_commit = rpp_rcommit; disw_commit = rpp_wcommit; } return; } /* END DIS_rpp_reset() */ /** * Read a RPP message from a stream. * * NOTE: Only one kind of message is expected -- Inter Server requests from MOM's. * * @param stream (I) */ void do_rpp( int stream) /* I */ { static char id[] = "do_rpp"; int ret, proto, version; void is_request(int, int, int *); void stream_eof(int, u_long, int); if (LOGLEVEL >= 4) { sprintf(log_buffer, "rpp request received on stream %d", stream); log_record( PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, id, log_buffer); } DIS_rpp_reset(); proto = disrsi(stream, &ret); if (ret != DIS_SUCCESS) { /* FAILURE */ /* This error case may be associated with IP communication * problems such as may happen with multi-homed servers. */ if (LOGLEVEL >= 1) { struct pbsnode *node; extern tree *streams; /* tree of stream numbers */ node = tfind((u_long)stream, &streams); sprintf(log_buffer, "corrupt rpp request received on stream %d (node: \"%s\", %s) - invalid protocol - rc=%d (%s)", stream, (node != NULL) ? 
node->nd_name : "NULL", netaddr(rpp_getaddr(stream)), ret, dis_emsg[ret]); log_record( PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, id, log_buffer); } stream_eof(stream, 0, ret); return; } /* END if (ret != DIS_SUCCESS) */ version = disrsi(stream, &ret); if (ret != DIS_SUCCESS) { if (LOGLEVEL >= 1) { sprintf(log_buffer, "corrupt rpp request received on stream %d - invalid version - rc=%d (%s)", stream, ret, dis_emsg[ret]); log_record( PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, id, log_buffer); } stream_eof(stream, 0, ret); return; } switch (proto) { case IS_PROTOCOL: if (LOGLEVEL >= 6) { sprintf(log_buffer, "inter-server request received"); log_record( PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, id, log_buffer); } is_request(stream, version, NULL); break; default: if (LOGLEVEL >= 6) { sprintf(log_buffer, "unknown request protocol received (%d)\n", proto); log_err(errno, id, log_buffer); } rpp_close(stream); break; } /* END switch(proto) */ return; } /* END do_rpp() */ void rpp_request( int fd) /* not used */ { static char id[] = "rpp_request"; int stream; for (;;) { if ((stream = rpp_poll()) == -1) { log_err(errno, id, "rpp_poll"); break; } if (stream == -2) break; do_rpp(stream); } return; } /* END rpp_request() */ void clear_listeners(void) /* I */ { int i; for (i = 0;i < MAXLISTENERS; i++) { listener_conns[i].address = 0; listener_conns[i].port = 0; listener_conns[i].sock = -1; listener_conns[i].first_time = 1; } return; } /* END clear_listeners */ int add_listener( pbs_net_t l_addr, /* I */ unsigned int l_port) /* I */ { int i; for (i = 0;i < MAXLISTENERS; i++) { if (listener_conns[i].address == 0) { listener_conns[i].address = l_addr; listener_conns[i].port = l_port; listener_conns[i].sock = -1; return (0); } } return (-1); } /* END add_listener */ int PBSShowUsage( char *EMsg) /* I (optional) */ { fprintf(stderr, "Usage: %s\n", ProgName); fprintf(stderr, " -A <INT> \\\\ Alarm Time\n"); fprintf(stderr, " -a <BOOL> \\\\ Scheduling\n"); fprintf(stderr, " -d <PATH> \\\\ 
Homedir\n"); fprintf(stderr, " -D \\\\ Debugmode\n"); fprintf(stderr, " -f \\\\ Force Overwrite Serverdb\n"); fprintf(stderr, " -h \\\\ Print Usage\n"); fprintf(stderr, " -H <HOST> \\\\ Daemon Hostname\n"); fprintf(stderr, " -L <PATH> \\\\ Logfile\n"); fprintf(stderr, " -l <PORT> \\\\ Listener Port\n"); fprintf(stderr, " -M <PORT> \\\\ MOM Port\n"); fprintf(stderr, " -p <PORT> \\\\ Server Port\n"); fprintf(stderr, " -R <PORT> \\\\ RM Port\n"); fprintf(stderr, " -S <PORT> \\\\ Scheduler Port\n"); fprintf(stderr, " -t <TYPE> \\\\ Startup Type (hot, warm, cold, create)\n"); fprintf(stderr, " -v \\\\ Version\n"); fprintf(stderr, " --ha \\\\ High Availability MODE\n"); fprintf(stderr, " --help \\\\ Print Usage\n"); fprintf(stderr, " --version \\\\ Version\n"); if (EMsg != NULL) { fprintf(stderr, " %s\n", EMsg); } return(0); } /* END PBSShowUsage() */ /** * parse_command_line * * parse the parameters from the command line */ void parse_command_line(int argc, char *argv[]) { extern int optind; extern char *optarg; char *pc = NULL; int c; int i; char EMsg[1024]; char *servicename = NULL; pbs_net_t def_pbs_server_addr; pbs_net_t listener_addr; unsigned int listener_port; static struct { char *it_name; int it_type; } init_name_type[] = { { "hot", RECOV_HOT }, { "warm", RECOV_WARM }, { "cold", RECOV_COLD }, { "create", RECOV_CREATE }, { "", RECOV_Invalid } }; while ((c = getopt(argc, argv, "A:a:d:DefhH:L:l:mM:p:R:S:t:uv-:")) != -1) { switch (c) { case '-': if ((optarg == NULL) || (optarg[0] == '\0')) { PBSShowUsage("empty command line arg"); exit(1); } if (!strcmp(optarg, "version")) { fprintf(stderr, "version: %s\n", PACKAGE_VERSION); exit(0); } if (!strcmp(optarg, "about")) { printf("package: %s\n", PACKAGE_STRING); printf("sourcedir: %s\n", PBS_SOURCE_DIR); printf("configure: %s\n", PBS_CONFIG_ARGS); printf("buildcflags: %s\n", PBS_CFLAGS); printf("buildhost: %s\n", PBS_BUILD_HOST); printf("builddate: %s\n", PBS_BUILD_DATE); printf("builddir: %s\n", PBS_BUILD_DIR); 
printf("builduser: %s\n", PBS_BUILD_USER); printf("installdir: %s\n", PBS_INSTALL_DIR); printf("serverhome: %s\n", PBS_SERVER_HOME); printf("version: %s\n", PACKAGE_VERSION); exit(0); } if (!strcmp(optarg, "help")) { PBSShowUsage(NULL); exit(0); } if (!strcasecmp(optarg, "ha")) /* High Availability */ { high_availability_mode = TRUE; break; } PBSShowUsage("invalid command line arg"); exit(1); /*NOTREACHED*/ break; case 'a': if (decode_b( &server.sv_attr[(int)SRV_ATR_scheduling], NULL, NULL, optarg) != 0) { (void)fprintf(stderr, "%s: bad -a option\n", argv[0]); exit(1); } a_opt_init = server.sv_attr[(int)SRV_ATR_scheduling].at_val.at_long; break; case 'd': path_home = optarg; break; case 'D': TDoBackground = 0; break; case 'e': allow_any_mom = TRUE; break; case 'f': TForceUpdate = 1; break; case 'h': PBSShowUsage(NULL); exit(0); break; case 'H': /* overwrite locally detected hostname with specified hostname */ /* (used for multi-homed hosts) */ strncpy(server_host, optarg, PBS_MAXHOSTNAME); if (get_fullhostname(server_host, server_host, PBS_MAXHOSTNAME, EMsg) == -1) { /* FAILURE */ if (EMsg[0] != '\0') { char tmpLine[1024]; snprintf(tmpLine, sizeof(tmpLine), "unable to determine full hostname for specified server host '%s' - %s", server_host, EMsg); log_err(-1, "pbsd_main", tmpLine); } else { log_err(-1, "pbsd_main", "unable to determine full server hostname"); } exit(1); } strcpy(server_name, server_host); def_pbs_server_addr = pbs_server_addr; pbs_server_addr = get_hostaddr(server_host); if (pbs_mom_addr == def_pbs_server_addr) pbs_mom_addr = pbs_server_addr; if (pbs_scheduler_addr == def_pbs_server_addr) pbs_scheduler_addr = pbs_server_addr; if ((servicename != NULL) && (strlen(servicename) > 0)) { if (strlen(server_name) + strlen(servicename) + 1 > (size_t)PBS_MAXSERVERNAME) { fprintf(stderr, "%s: -h host too long\n", argv[0]); exit(1); } strcat(server_name, ":"); strcat(server_name, servicename); if ((pbs_server_port_dis = atoi(servicename)) == 0) { 
fprintf(stderr, "%s: host: %s, port: %s, max: %i\n", argv[0], server_name, servicename, PBS_MAXSERVERNAME); fprintf(stderr, "%s: -h host invalid\n", argv[0]); exit(1); } } /* END if (strlen(servicename) > 0) */ break; case 'l': clear_listeners(); if (get_port( optarg, &listener_port, &listener_addr)) { fprintf(stderr, "%s: bad -l %s\n", argv[0], optarg); exit(1); } if (add_listener(listener_addr, listener_port) < 0) { fprintf(stderr, "%s: failed to add listener %s\n", argv[0], optarg); exit(1); } break; case 'p': servicename = optarg; if (strlen(server_name) + strlen(servicename) + 1 > (size_t)PBS_MAXSERVERNAME) { fprintf(stderr, "%s: -p host:port too long\n", argv[0]); exit(1); } strcat(server_name, ":"); strcat(server_name, servicename); if ((pbs_server_port_dis = atoi(servicename)) == 0) { fprintf(stderr, "%s: -p host:port invalid\n", argv[0]); exit(1); } break; case 't': for (i = RECOV_HOT;i < RECOV_Invalid;i++) { if (strcmp(optarg, init_name_type[i].it_name) == 0) { server_init_type = init_name_type[i].it_type; break; } } /* END for (i) */ if (i == RECOV_Invalid) { fprintf(stderr, "%s -t bad recovery type\n", argv[0]); exit(1); } break; case 'A': acct_file = optarg; break; case 'L': log_file = optarg; break; case 'M': if (get_port(optarg, &pbs_mom_port, &pbs_mom_addr)) { fprintf(stderr, "%s: bad -M %s\n", argv[0], optarg); exit(1); } if (isalpha((int)*optarg)) { if ((pc = strchr(optarg, (int)':')) != NULL) * pc = '\0'; mom_host = optarg; } break; case 'R': if ((pbs_rm_port = atoi(optarg)) == 0) { fprintf(stderr, "%s: bad -R %s\n", argv[0], optarg); exit(1); } break; case 'S': /* FORMAT: ??? 
*/ if (get_port( optarg, &pbs_scheduler_port, &pbs_scheduler_addr)) { fprintf(stderr, "%s: bad -S %s\n", argv[0], optarg); exit(1); } break; case 'u': array_259_upgrade = TRUE; break; case 'v': fprintf(stderr, "version: %s\n", PACKAGE_VERSION); exit(0); break; default: PBSShowUsage("invalid command line arg"); exit(1); break; } /* END switch (c) */ } /* END while (c) */ if (optind < argc) { fprintf(stderr, "%s: invalid operand\n", argv[0]); exit(1); } } /* * next_task - look for the next work task to perform: * 1. If svr_delay_entry is set, then a delayed task is ready so * find and process it. * 2. All items on the immediate list, then * 3. All items on the timed task list which have expired times * * Returns: amount of time till next task */ static time_t next_task() { time_t delay; struct work_task *nxt; struct work_task *ptask; time_t tilwhen = server.sv_attr[(int)SRV_ATR_scheduler_iteration].at_val.at_long; time_now = time((time_t *)0); if (svr_delay_entry) { ptask = (struct work_task *)GET_NEXT(task_list_event); while (ptask != NULL) { nxt = (struct work_task *)GET_NEXT(ptask->wt_linkall); if (ptask->wt_type == WORK_Deferred_Cmp) dispatch_task(ptask); ptask = nxt; } svr_delay_entry = 0; } while ((ptask = (struct work_task *)GET_NEXT(task_list_immed)) != NULL) dispatch_task(ptask); while ((ptask = (struct work_task *)GET_NEXT(task_list_timed)) != NULL) { if ((delay = ptask->wt_event - time_now) > 0) { if (tilwhen > delay) tilwhen = delay; break; } else { dispatch_task(ptask); /* will delete link */ } } /* should the scheduler be run? If so, adjust the delay time */ if ((delay = server.sv_next_schedule - time_now) <= 0) { svr_do_schedule = SCH_SCHEDULE_TIME; listener_command = SCH_SCHEDULE_TIME; } else if (delay < tilwhen) tilwhen = delay; return(tilwhen); } /* END next_task() */ /* * start_hot_jobs - place any job which is state QUEUED and has the * HOT start flag set into execution. * * Returns the number of jobs to be hot started. 
*/ static int start_hot_jobs(void) { int ct = 0; job *pjob; for (pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { if ((pjob->ji_qs.ji_substate == JOB_SUBSTATE_QUEUED) && (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HOTSTART)) { log_event( PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, "attempting to hot start job"); svr_startjob(pjob, NULL, NULL, NULL); ct++; } } return(ct); } /* END start_hot_jobs() */ void main_loop(void) { int c; long *state; time_t waittime; pbs_queue *pque; job *pjob; time_t last_jobstat_time; int when; void ping_nodes(struct work_task *); void check_nodes(struct work_task *); void check_log(struct work_task *); void check_job_log(struct work_task *); void check_acct_log(struct work_task *); extern char *msg_startup2; /* log message */ last_jobstat_time = time_now; server.sv_started = time(&time_now); /* time server started */ /* record the fact that we are up and running */ sprintf(log_buffer, msg_startup2, sid, LOGLEVEL); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); #ifdef NO_SIGCHLD log_record( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, "Server NOT using signal handler for SIGCHLD"); #endif /* do not check nodes immediately as they will initially be marked down unless they have already reported in */ when = server.sv_attr[(int)SRV_ATR_check_rate].at_val.at_long + time_now; if (svr_totnodes > 1024) { /* for large systems, give newly reported nodes more time before being marked down while pbs_moms are intialy reporting in */ set_task(WORK_Timed, when + svr_totnodes / 12, check_nodes, NULL); } else { set_task(WORK_Timed, when, check_nodes, NULL); } /* Just check the nodes with check_nodes above and don't ping anymore. 
*/ set_task(WORK_Timed, time_now + 5, ping_nodes, NULL); set_task(WORK_Timed, time_now + 5, check_log, NULL); set_task(WORK_Timed,time_now + 10,check_acct_log,NULL); /* * Now at last, we are ready to do some batch work. The * following section constitutes the "main" loop of the server */ state = &server.sv_attr[(int)SRV_ATR_State].at_val.at_long; DIS_tcp_settimeout(server.sv_attr[(int)SRV_ATR_tcp_timeout].at_val.at_long); if (server_init_type == RECOV_HOT) *state = SV_STATE_HOT; else *state = SV_STATE_RUN; DBPRT(("pbs_server is up\n")); while (*state != SV_STATE_DOWN) { /* first process any task whose time delay has expired */ if (server.sv_attr[(int)SRV_ATR_PollJobs].at_val.at_long) waittime = MIN(next_task(), JobStatRate - (time_now - last_jobstat_time)); else waittime = next_task(); waittime = MAX(1, waittime); if (*state == SV_STATE_RUN) { /* In normal Run state */ /* if time or event says to run scheduler, do it */ if ((svr_do_schedule != SCH_SCHEDULE_NULL) && server.sv_attr[(int)SRV_ATR_scheduling].at_val.at_long) { server.sv_next_schedule = time_now + server.sv_attr[(int)SRV_ATR_scheduler_iteration].at_val.at_long; schedule_jobs(); notify_listeners(); } } else if (*state == SV_STATE_HOT) { /* Are there HOT jobs to rerun */ /* only try every _CYCLE seconds */ c = 0; if (time_now > server.sv_hotcycle + SVR_HOT_CYCLE) { server.sv_hotcycle = time_now + SVR_HOT_CYCLE; c = start_hot_jobs(); } /* If more than _LIMIT seconds since start, stop */ if ((c == 0) || (time_now > server.sv_started + SVR_HOT_LIMIT)) { server_init_type = RECOV_WARM; *state = SV_STATE_RUN; } } /* any jobs to route today */ for (pque = (pbs_queue *)GET_NEXT(svr_queues); pque != NULL; pque = (pbs_queue *)GET_NEXT(pque->qu_link)) { if (pque->qu_qs.qu_type == QTYPE_RoutePush) queue_route(pque); } #ifdef NO_SIGCHLD /* if we have any jobs, check if any child jobs have completed */ if (server.sv_qs.sv_numjobs > 0) { check_children(); } #endif /* touch the rpp streams that need to send */ 
rpp_request(0); /* wait for a request and process it */ if (wait_request(waittime, state) != 0) { log_err(-1, msg_daemonname, "wait_request failed"); } /* qmgr can dynamically set the loglevel specification * we use the new value if PBSLOGLEVEL was not specified */ if (plogenv == NULL) /* If no specification of loglevel from env */ LOGLEVEL = server.sv_attr[(int)SRV_ATR_LogLevel].at_val.at_long; /* any running jobs need a status update? */ if (server.sv_attr[(int)SRV_ATR_PollJobs].at_val.at_long && (last_jobstat_time + JobStatRate <= time_now)) { struct work_task *ptask; for (pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { if (pjob->ji_qs.ji_substate == JOB_SUBSTATE_RUNNING) { /* spread these out over the next JobStatRate seconds */ when = pjob->ji_wattr[(int)JOB_ATR_qrank].at_val.at_long % JobStatRate; ptask = set_task(WORK_Timed, when + time_now, poll_job_task, pjob); if (ptask != NULL) { append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask); } } } last_jobstat_time = time_now; } /* END if (should poll jobs now) */ if (*state == SV_STATE_SHUTSIG) svr_shutdown(SHUT_SIG); /* caught sig */ /* * if in process of shuting down and all running jobs * and all children are done, change state to DOWN */ if ((*state > SV_STATE_RUN) && (server.sv_jobstates[JOB_STATE_RUNNING] == 0) && (server.sv_jobstates[JOB_STATE_EXITING] == 0) && (GET_NEXT(task_list_event) == NULL)) { *state = SV_STATE_DOWN; } } /* END while (*state != SV_STATE_DOWN) */ svr_save(&server, SVR_SAVE_FULL); /* final recording of server */ track_save(NULL); /* save tracking data */ /* save any jobs that need saving */ for (pjob = (job *)GET_NEXT(svr_alljobs); pjob != NULL; pjob = (job *)GET_NEXT(pjob->ji_alljobs)) { if (pjob->ji_modified) job_save(pjob, SAVEJOB_FULL); } if (svr_chngNodesfile) { /*nodes created/deleted, or props changed and*/ /*update in req_manager failed; try again */ update_nodes_file(); } } /** * initialize_globals * * Set the intial state 
of global variables. */ void initialize_globals(void) { strcpy(pbs_current_user, "PBS_Server"); msg_daemonname = strdup(pbs_current_user); } /** * set_globals_from_environment * * Set the intial state of global variables based on * the program environment variables. */ void set_globals_from_environment(void) { char *ptr; /* initialize service port numbers for self, Scheduler, and MOM */ if ((ptr = getenv("PBS_BATCH_SERVICE_PORT")) != NULL) { pbs_server_port_dis = (int)strtol(ptr, NULL, 10); } if ((ptr = getenv("PBS_SCHEDULER_SERVICE_PORT")) != NULL) { pbs_scheduler_port = (int)strtol(ptr, NULL, 10); } if ((ptr = getenv("PBS_MOM_SERVICE_PORT")) != NULL) { pbs_mom_port = (int)strtol(ptr, NULL, 10); } if ((ptr = getenv("PBS_MANAGER_SERVICE_PORT")) != NULL) { pbs_rm_port = (int)strtol(ptr, NULL, 10); } if ((plogenv = getenv("PBSLOGLEVEL")) != NULL) { /* Note the plogenv is global and is tested in main_loop */ LOGLEVEL = (int)strtol(plogenv, NULL, 10); } if ((ptr = getenv("PBSDEBUG")) != NULL) { DEBUGMODE = 1; TDoBackground = 0; } return; } /* END set_globals_from_environment() */ /* * main - the initialization and main loop of pbs_daemon */ int main( int argc, /* I */ char *argv[]) /* I */ { int i; int lockfds = -1; int rppfd; /* fd to receive is HELLO's */ int privfd; /* fd to send is messages */ uint tryport; char lockfile[MAXPATHLEN + 1]; char *pc = NULL; char *pathPtr = NULL; char EMsg[MAX_LINE]; char tmpLine[MAX_LINE]; extern char *msg_svrdown; /* log message */ extern char *msg_startup1; /* log message */ ProgName = argv[0]; memset(&server, 0, sizeof(struct server)); initialize_globals(); time_now = time((time_t *)0); set_globals_from_environment(); /* set standard umask */ umask(022); /* save argv and the path for later use */ for (i = 0;i < argc;i++) { ArgV[i] = (char *)malloc(sizeof(char) * (strlen(argv[i])+1)); if (ArgV[i] == NULL) { printf("ERROR: failed to allocate memory to save argv, shutting down\n"); exit(-1); } strcpy(ArgV[i],argv[i]); } /* save the 
path before we go into the background. If we don't do this * we can't restart the server because the path will change */ pathPtr = getenv("PATH"); snprintf(OriginalPath,sizeof(OriginalPath),"%s",pathPtr); /* close files for security purposes */ /* do the following before getuid() and geteuid() can trigger nss_ldap into opening a socket to the LDAP server */ i = sysconf(_SC_OPEN_MAX); while (--i > 2) close(i); /* close any file desc left open by parent */ /* find out the name of this machine (hostname) */ EMsg[0] = '\0'; if ((gethostname(server_host, PBS_MAXHOSTNAME) == -1) || (get_fullhostname(server_host, server_host, PBS_MAXHOSTNAME, EMsg) == -1)) { snprintf(tmpLine, sizeof(tmpLine), "unable to determine local server hostname %c %s", EMsg[0] ? '-' : ' ', EMsg); log_err(-1, "pbsd_main", tmpLine); exit(1); /* FAILURE - shutdown */ } strcpy(server_name, server_host); /* by default server = host */ pbs_server_addr = get_hostaddr(server_host); pbs_mom_addr = pbs_server_addr; /* assume on same host */ pbs_scheduler_addr = pbs_server_addr; /* assume on same host */ /* The following port numbers might have been initialized in set_globals_from_environment() above. 
*/ if (pbs_server_port_dis <= 0) pbs_server_port_dis = get_svrport(PBS_BATCH_SERVICE_NAME, "tcp", PBS_BATCH_SERVICE_PORT_DIS); if (pbs_scheduler_port <= 0) pbs_scheduler_port = get_svrport(PBS_SCHEDULER_SERVICE_NAME, "tcp", PBS_SCHEDULER_SERVICE_PORT); if (pbs_mom_port <= 0) pbs_mom_port = get_svrport(PBS_MOM_SERVICE_NAME, "tcp", PBS_MOM_SERVICE_PORT); if (pbs_rm_port <= 0) pbs_rm_port = get_svrport(PBS_MANAGER_SERVICE_NAME, "tcp", PBS_MANAGER_SERVICE_PORT); parse_command_line(argc, argv); /* if we are not running with real and effective uid of 0, forget it */ if (IamRoot() == 0) { return(1); } /* * Read in server attributes so they are available to be used * Attributes will not be read in on a pbs_server -t create */ if (get_svr_attr(server_init_type) == -1) { fprintf(stderr,"%s: failed to get server attributes\n", ProgName); return(1); } /* * make sure no other server is running with this home directory. * If server lockfile attribute has been set use it. * If not use default location for it */ if ((server.sv_attr[(int)SRV_ATR_lockfile].at_flags & ATR_VFLAG_SET) && (server.sv_attr[(int)SRV_ATR_lockfile].at_val.at_str)) { char *LockfilePtr = server.sv_attr[(int)SRV_ATR_lockfile].at_val.at_str; /* check if an absolute path is specified or not */ if (LockfilePtr[0] == '/') { snprintf(lockfile,sizeof(lockfile),"%s",LockfilePtr); } else { snprintf(lockfile,sizeof(lockfile),"%s/%s/%s", path_home, PBS_SVR_PRIVATE, LockfilePtr); } } else { sprintf(lockfile,"%s/%s/server.lock", path_home, PBS_SVR_PRIVATE); } #ifndef USE_HA_THREADS if ((lockfds = open(lockfile, O_CREAT | O_TRUNC | O_WRONLY, 0600)) < 0) { sprintf(log_buffer, "%s: unable to open lock file '%s'", msg_daemonname, lockfile); fprintf(stderr, "%s\n", log_buffer); log_err(errno, msg_daemonname, log_buffer); exit(2); } #endif /* !USE_HA_THREADS */ /* HA EVENTS MUST HAPPEN HERE */ strcpy(HALockFile,lockfile); HALockCheckTime = server.sv_attr[(int)SRV_ATR_LockfileCheckTime].at_val.at_long; HALockUpdateTime = 
server.sv_attr[(int)SRV_ATR_LockfileUpdateTime].at_val.at_long; /* apply HA defaults */ if (HALockCheckTime == 0) HALockCheckTime = PBS_LOCKFILE_CHECK_TIME; if (HALockUpdateTime == 0) HALockUpdateTime = PBS_LOCKFILE_UPDATE_TIME; if ((pc = getenv("PBSDEBUG")) != NULL) { DEBUGMODE = 1; TDoBackground = 0; } #ifdef DISABLE_DAEMONS TDoBackground = 0; #endif /* handle running in the background or not if we're debugging */ if(high_availability_mode) { if (daemonize_server(TDoBackground,&sid) == FAILURE) { exit(2); } } #ifdef OS_LOSES_FD_OVER_FORK /* NOTE: file descriptors may be lost across forks in SLES 10 SP1 */ #ifndef USE_HA_THREADS close(lockfds); if ((lockfds = open(lockfile, O_CREAT | O_TRUNC | O_WRONLY, 0600)) < 0) { sprintf(log_buffer, "%s: unable to open lock file '%s'", msg_daemonname, lockfile); fprintf(stderr, "%s\n",log_buffer); log_err(errno, msg_daemonname, log_buffer); exit(2); } #endif /* !USE_HA_THREADS */ /* no file descriptor was held if we're using ha threads */ #endif /* OS_LOSES_FD_OVER_FORK */ #ifdef USE_HA_THREADS if (high_availability_mode) { lock_out_ha(); } else { if ((lockfds = open(lockfile,O_CREAT|O_TRUNC|O_WRONLY,0600)) < 0) { sprintf(log_buffer,"%s: unable to open lock file '%s'", msg_daemonname, lockfile); fprintf(stderr,"%s\n",log_buffer); log_err(errno,msg_daemonname,log_buffer); exit(2); } } #else if (high_availability_mode) { /* This will allow multiple instance of the pbs_server to be * running. This must be done before setting up the client * sockets interface, reading the config file, and contacting * the compute nodes. */ while (try_lock_out(lockfds, F_WRLCK)) sleep(TSERVER_HA_CHECK_TIME); /* Relinquish */ } /* END if (high_availability_mode) */ else { lock_out(lockfds, F_WRLCK); } #endif /* * Open the log file so we can start recording events * * set log_event_mask to point to the log_event attribute value so * it controls which events are logged. 
*/ log_event_mask = &server.sv_attr[SRV_ATR_log_events].at_val.at_long; sprintf(path_log, "%s/%s", path_home, PBS_LOGFILES); log_open(log_file, path_log); sprintf(log_buffer, msg_startup1, server_name, server_init_type); log_event( PBSEVENT_SYSTEM | PBSEVENT_ADMIN | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); /* initialize the server objects and perform specified recovery */ /* will be left in the server's private directory */ /* NOTE: env cleared in pbsd_init() */ if (pbsd_init(server_init_type) != 0) { log_err(-1, msg_daemonname, "pbsd_init failed"); exit(3); } /* initialize the network interface */ sprintf(log_buffer, "Using ports Server:%d Scheduler:%d MOM:%d (server: '%s')", pbs_server_port_dis, pbs_scheduler_port, pbs_mom_port, server_host); log_event( PBSEVENT_SYSTEM | PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); if (init_network(pbs_server_port_dis, process_request) != 0) { perror("pbs_server: network"); log_err(-1, msg_daemonname, "init_network failed dis"); exit(3); } if (init_network(0, process_request) != 0) { perror("pbs_server: unix domain socket"); log_err(-1, msg_daemonname, "init_network failed unix domain socket"); exit(3); } /* handle running in the background or not if we're debugging */ if(!high_availability_mode) { if (daemonize_server(TDoBackground,&sid) == FAILURE) { exit(2); } } sprintf(log_buffer, "%ld\n", (long)sid); if (!high_availability_mode) { if (write(lockfds, log_buffer, strlen(log_buffer)) != (ssize_t)strlen(log_buffer)) { log_err(errno, msg_daemonname, "failed to write pid to lockfile"); exit(-1); } } #if (PLOCK_DAEMONS & 1) plock(PROCLOCK); #endif if ((rppfd = rpp_bind(pbs_server_port_dis)) == -1) { log_err(errno, msg_daemonname, "rpp_bind"); exit(1); } rpp_fd = -1; /* force rpp_bind() to get another socket */ tryport = IPPORT_RESERVED; privfd = -1; while (--tryport > 0) { if ((privfd = rpp_bind(tryport)) != -1) break; if ((errno != EADDRINUSE) && (errno != EADDRNOTAVAIL)) break; } 
if (privfd == -1) { log_err(errno, msg_daemonname, "no privileged ports"); exit(1); } if (LOGLEVEL >= 5) { log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, "creating rpp and private interfaces"); } add_conn(rppfd, Primary, (pbs_net_t)0, 0, PBS_SOCK_INET, rpp_request); add_conn(privfd, Primary, (pbs_net_t)0, 0, PBS_SOCK_INET, rpp_request); /*==========*/ main_loop(); /*==========*/ RPPConfigure(1, 0); /* help rpp_shutdown go a bit faster */ rpp_shutdown(); shutdown_ack(); net_close(-1); /* close all network connections */ #ifdef ENABLE_UNIX_SOCKETS unlink(TSOCK_PATH); #endif /* END ENABLE_UNIX_SOCKETS */ log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, msg_svrdown); acct_close(); log_close(1); job_log_close(1); exit(0); } /* END main() */ void check_job_log( struct work_task *ptask) /* I */ { long depth = 1; /* remove logs older than LogKeepDays */ if ((server.sv_attr[(int)SRV_ATR_JobLogKeepDays].at_flags & ATR_VFLAG_SET) != 0) { snprintf(log_buffer,sizeof(log_buffer),"checking for old job logs in dir '%s' (older than %ld days)", path_svrlog, server.sv_attr[(int)SRV_ATR_JobLogKeepDays].at_val.at_long); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); if (log_remove_old(path_jobinfo_log,server.sv_attr[(int)SRV_ATR_JobLogKeepDays].at_val.at_long * SECS_PER_DAY) != 0) { log_err(-1,"check_job_log","failure occurred when checking for old job logs"); } } if ((server.sv_attr[(int)SRV_ATR_JobLogFileMaxSize].at_flags & ATR_VFLAG_SET) != 0) { if ((job_log_size() >= server.sv_attr[(int)SRV_ATR_JobLogFileMaxSize].at_val.at_long) && (server.sv_attr[(int)SRV_ATR_JobLogFileMaxSize].at_val.at_long > 0)) { log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, "Rolling job log file"); if ((server.sv_attr[(int)SRV_ATR_JobLogFileRollDepth].at_flags & ATR_VFLAG_SET) != 0) { depth = 
server.sv_attr[(int)SRV_ATR_JobLogFileRollDepth].at_val.at_long; } if ((depth >= INT_MAX) || (depth < 1)) { log_err(-1, "check_job_log", "job log roll cancelled, logfile depth is out of range"); } else { job_log_roll(depth); } } } /* periodically record the version and loglevel */ sprintf(log_buffer, msg_info_server, server.sv_attr[(int)SRV_ATR_version].at_val.at_str, LOGLEVEL); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); set_task(WORK_Timed, time_now + PBS_LOG_CHECK_RATE, check_job_log, NULL); return; } /* END check_job_log */ void check_log( struct work_task *ptask) /* I */ { long depth = 1; /* remove logs older than LogKeepDays */ if ((server.sv_attr[(int)SRV_ATR_LogKeepDays].at_flags & ATR_VFLAG_SET) != 0) { snprintf(log_buffer,sizeof(log_buffer),"checking for old pbs_server logs in dir '%s' (older than %ld days)", path_svrlog, server.sv_attr[(int)SRV_ATR_LogKeepDays].at_val.at_long); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); if (log_remove_old(path_svrlog,server.sv_attr[(int)SRV_ATR_LogKeepDays].at_val.at_long * SECS_PER_DAY) != 0) { log_err(-1,"check_log","failure occurred when checking for old pbs_server logs"); } } if ((server.sv_attr[(int)SRV_ATR_LogFileMaxSize].at_flags & ATR_VFLAG_SET) != 0) { if ((log_size() >= server.sv_attr[(int)SRV_ATR_LogFileMaxSize].at_val.at_long) && (server.sv_attr[(int)SRV_ATR_LogFileMaxSize].at_val.at_long > 0)) { log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, "Rolling log file"); if ((server.sv_attr[(int)SRV_ATR_LogFileRollDepth].at_flags & ATR_VFLAG_SET) != 0) { depth = server.sv_attr[(int)SRV_ATR_LogFileRollDepth].at_val.at_long; } if ((depth >= INT_MAX) || (depth < 1)) { log_err(-1, "check_log", "log roll cancelled, logfile depth is out of range"); } else { log_roll(depth); } } } /* periodically record the version and loglevel */ sprintf(log_buffer, msg_info_server, 
server.sv_attr[(int)SRV_ATR_version].at_val.at_str, LOGLEVEL); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); set_task(WORK_Timed, time_now + PBS_LOG_CHECK_RATE, check_log, NULL); return; } /* END check_log */ void check_acct_log( struct work_task *ptask) /* I */ { if (((server.sv_attr[(int)SRV_ATR_AcctKeepDays].at_flags & ATR_VFLAG_SET) != 0) && (server.sv_attr[(int)SRV_ATR_AcctKeepDays].at_val.at_long >= 0)) { sprintf(log_buffer,"Checking accounting files - keep days = %ld", server.sv_attr[(int)SRV_ATR_AcctKeepDays].at_val.at_long); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buffer); acct_cleanup(server.sv_attr[(int)SRV_ATR_AcctKeepDays].at_val.at_long); } set_task(WORK_Timed,time_now + PBS_ACCT_CHECK_RATE,check_acct_log,NULL); return; } /* END check_acct_log */ /* * get_port - parse host:port for -M and -S and -l option * Returns into *port and *addr if and only if that part is specified * Both port and addr are returned in HOST byte order. 
* Function return: 0=ok, -1=error */ static int get_port( char *arg, /* "host", "port", ":port", or "host:port" */ unsigned int *port, /* RETURN: new port iff one given */ pbs_net_t *addr) /* RETURN: daemon's address iff host given */ { char *name; if (*arg == ':') ++arg; if (isdigit(*arg)) { /* port only specified */ *port = (unsigned int)atoi(arg); } else { name = parse_servername(arg, port); if (name == NULL) { /* FAILURE */ return(-1); } *addr = get_hostaddr(name); } if ((*port <= 0) || (*addr == 0)) { /* FAILURE */ return(-1); } return(0); } /* END get_port() */ /** * This function will extract the directory portion of * the given path and copy it into the Dir parameter * * @param FullPath (I) * @param Dir (O) * @param DirSize */ char *extract_dir( char *FullPath, char *Dir, int DirSize) { char *ptr; if ((FullPath == NULL) || (Dir == NULL)) { return(NULL); } Dir[0] = '\0'; snprintf(Dir, DirSize,"%s",FullPath); ptr = strrchr(Dir,'/'); if (ptr != NULL) { *ptr = '\0'; } return(Dir); } /* END extract_dir() */ /** * Checks to see if HALockFile is defined, exists, and can be properly * opened, etc. 
*
 * @return TRUE if HALockFile is valid for use
 *
 * What the code below actually checks: the directory part of lockfile (as
 * produced by extract_dir()) must be stat()-able, and one of these must
 * hold: it is owned by getuid() and has at least one owner permission bit,
 * it belongs to getgid() and has at least one group permission bit, or it
 * has at least one "other" permission bit.  The lock file itself is not
 * opened or stat()ed here.
 *
 * NOTE(review): the permission tests pass when ANY of the r/w/x bits is set,
 * although the inline comment asks for read/write/exec - confirm intent.
 */
int is_ha_lock_file_valid(
    char *lockfile)
{
    char        LockDir[MAX_PATH_LEN];
    char        ErrorString[MAX_LINE];
    char        id[] = "is_ha_lock_file_valid";
    struct stat Stat;
    bool_t      GoodPermissions = FALSE;

    if ((lockfile == NULL) || (lockfile[0] == '\0')) {
        return(FALSE);
    }

    extract_dir(lockfile, LockDir, sizeof(LockDir));

    if (stat(LockDir, &Stat) != 0) {
        /* stat failed; keep errno before any other call can change it */
        int SavedErrno = errno;

        /* format into our own buffer: the string returned by strerror()
         * must not be written to */
        snprintf(ErrorString, sizeof(ErrorString),
                 "could not stat the lockfile dir '%s': %s",
                 LockDir,
                 strerror(SavedErrno));

        log_err(SavedErrno, id, ErrorString);

        return(FALSE);
    }

    /* directory must be owned by the TORQUE user and must have
     * read/write/exec permissions */
    if ((Stat.st_uid == getuid()) &&
        (Stat.st_mode & (S_IRUSR | S_IWUSR | S_IXUSR))) {
        /* we can write to this directory */
        GoodPermissions = TRUE;
    } else if ((Stat.st_gid == getgid()) &&
               (Stat.st_mode & (S_IRGRP | S_IWGRP | S_IXGRP))) {
        GoodPermissions = TRUE;
    } else if (Stat.st_mode & (S_IROTH | S_IWOTH | S_IXOTH)) {
        GoodPermissions = TRUE;
    }

    if (GoodPermissions == FALSE) {
        log_err(-1, id, "could not obtain the needed permissions for the lock file");
    }

    return(GoodPermissions);
} /* END is_ha_lock_file_valid() */

/**
 * Try to release a lock on the given file
 *
 * @param LockFile (I) Name of the file to unlock
 * @param LockFD (I) file descriptor to unlock [modified]
 *
 * @return SUCCESS if the lock is released, FAILURE otherwise.
*/ int release_file_lock( char *Lockfile, int *LockFD) { struct flock flock; int fds; if ((Lockfile == NULL) || (LockFD == NULL)) { return(FAILURE); } if (ISEMPTYSTR(Lockfile)) { return(FAILURE); } if (*LockFD > 0) { fds = *LockFD; } else { fds = open(Lockfile,O_CREAT|O_TRUNC|O_WRONLY,0600); } if (fds < 0) { /* could not open lock file */ return(FAILURE); } flock.l_type = F_UNLCK; flock.l_whence = SEEK_SET; flock.l_start = 0; flock.l_len = 0; if (fcntl(fds,F_SETLK,&flock) != 0) { close(fds); return(FAILURE); } close(fds); *LockFD = 0; return(SUCCESS); } /* END release_file_lock() */ /** * Try to acquire a lock on the given file * * @param LockFile (I) the name of the file to lock. * @param LockFD (I/O) File descriptor for the lock file. * @param FileType (I) For logging (type of file lock) * * @return SUCCESS if the lock is acquired, FAILURE otherwise */ int acquire_file_lock( char *LockFile, int *LockFD, char *FileType) { struct flock flock; int fds; char id[] = "acquire_file_lock"; if ((LockFile == NULL) || (LockFD == NULL) || (FileType == NULL)) { return(FAILURE); } if (LockFile[0] == '\0') { sprintf(log_buffer,"ALERT: empty %s lock filename\n", FileType); log_err(-1,id,log_buffer); return(FAILURE); } fds = open(LockFile,O_CREAT|O_RDWR,0600); if (fds < 0) { /* could not open lock file */ sprintf(log_buffer,"ALERT: could not open %s lock file '%s' (errno: %d:%s)\n", FileType, LockFile, errno, strerror(errno)); log_err(errno,id,log_buffer); return(FAILURE); } flock.l_type = F_WRLCK; flock.l_whence = SEEK_SET; flock.l_start = 0; flock.l_len = 0; flock.l_pid = getpid(); if (fcntl(fds,F_SETLK,&flock) != 0) { close(fds); sprintf(log_buffer,"ALERT could not create lock on file '%s' (errno: %d:%s)\n", LockFile, errno, strerror(errno)); log_err(errno,id,log_buffer); return(FAILURE); } /* don't close file; closing would lose the lock */ *LockFD = fds; return(SUCCESS); } /* END acquire_file_lock() */ /** *"Touch" the lockfile so that if locks are failing * other processes 
can see we still have possession * of the file. Also, we need to check that we can still * acccess the file. * * @param Arg (I) * * @return FAILURE if we don't have possession of the lock file anymore */ void *update_ha_lock_thread( void *Arg) /* I */ { char EMsg[MAX_LINE]; int LocalErrno = 0; int rc = 0; struct stat statbuf; struct utimbuf timebuf; static long LastModifyTime = 0; char id[] = "update_ha_lock_thread"; if (ISEMPTYSTR(HALockFile)) { /* locking HA not enabled */ return(NULL); } EMsg[0] = '\0'; while (TRUE) { usleep(DEF_USPERSECOND * HALockUpdateTime); rc = 0; LocalErrno = 0; mutex_lock(&EUIDMutex); errno = 0; if (stat(HALockFile,&statbuf) == 0) { /* check to make sure that no other process has modified this file * since the last time we did */ if ((LastModifyTime > 0) && (LastModifyTime != statbuf.st_mtime)) { snprintf(EMsg,sizeof(EMsg),"update time changed unexpectedly"); rc = -1; } else { /* no one has touched this file since we last did--continue */ LastModifyTime = time(NULL); timebuf.actime = LastModifyTime; timebuf.modtime = LastModifyTime; errno = 0; rc = utime(HALockFile,&timebuf); LocalErrno = errno; } } else { LocalErrno = errno; rc = -1; snprintf(EMsg,sizeof(EMsg),"could not stat file"); } mutex_unlock(&EUIDMutex); /* NOTE: HALockFile is emptied out when we delete the file during shutdown */ if ((rc == -1) && !ISEMPTYSTR(HALockFile)) { char *ErrorString; /* error occurred--immediate shutdown needed */ if (LocalErrno != 0) { ErrorString = strerror(LocalErrno); sprintf(log_buffer,"could not update HA lock file '%s' in heartbeat thread (%s - errno %d:%s)", HALockFile, EMsg, LocalErrno, ErrorString); log_err(LocalErrno,id,log_buffer); } else { sprintf(log_buffer,"could not update HA lock file '%s' in heartbeat thread (%s)", HALockFile, EMsg); log_err(-1,id,log_buffer); } /* restart pbs_server */ svr_restart(); } } /* END while (TRUE) */ /* NOTREACHED */ return(NULL); } /* END update_ha_lock_thread() */ int start_update_ha_lock_thread() { #ifndef 
USE_HA_THREADS /* not compiled with threads */ log_err(-1,"start_update_ha_lock_thread", "WARNING: cannot create HA update thread - pthreads not enabled\n"); return(FAILURE); #else /* USE_HA_THREADS is defined */ pthread_t HALockThread; pthread_attr_t HALockThreadAttr; int rc; int fds; char smallBuf[MAX_LINE]; char id[] = "start_update_ha_lock_thread"; /* write the pid to the lockfile for correctness */ fds = open(HALockFile,O_TRUNC|O_WRONLY,0600); if (fds < 0) { log_err(-1,id,"Couldn't write the pid to the lockfile\n"); return(FAILURE); } snprintf(smallBuf,sizeof(smallBuf),"%ld\n",(long)sid); if (write(fds,smallBuf,strlen(smallBuf)) != (ssize_t)strlen(smallBuf)) { log_err(-1,id,"Couldn't write the pid to the lockfile\n"); return(FAILURE); } /* we don't need an open handle on the lockfile, just correct update times */ close(fds); pthread_attr_init(&HALockThreadAttr); rc = pthread_create(&HALockThread,&HALockThreadAttr,update_ha_lock_thread,NULL); if (rc != 0) { /* error creating thread */ log_err(-1,id,"Could not create HA Lock Thread\n"); return(FAILURE); } log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, "HA Lock update thread is now created\n"); #endif /* ifndef USE_HA_THREADS */ return(SUCCESS); } /* END start_update_ha_lock_thread() */ int mutex_lock( mutex_t *Mutex) /* I */ { #ifdef USE_HA_THREADS if (pthread_mutex_lock(Mutex) != 0) { log_err(-1,"mutex_lock","ALERT: cannot lock mutex!\n"); return(FAILURE); } #endif /* ifdef USE_HA_THREADS */ return(SUCCESS); } /* END mutex_lock() */ int mutex_unlock( mutex_t *Mutex) /* I */ { #ifdef USE_HA_THREADS if (pthread_mutex_unlock(Mutex) != 0) { log_err(-1,"mutex_unlock","ALERT: cannot unlock mutex!\n"); return(FAILURE); } #endif /* ifdef USE_HA_THREADS */ return(SUCCESS); } /* END mutex_unlock() */ #ifdef USE_HA_THREADS /* * * lock_out_ha - lock out using moab style high availability * */ static void lock_out_ha() { bool_t UseFLock = TRUE; bool_t FilePossession = FALSE; bool_t FileIsMissing = FALSE; char 
MutexLockFile[MAX_NAME]; char id[] = "lock_out_ha"; int MutexLockFD = -1; int NumChecks = 0; struct stat StatBuf; snprintf(MutexLockFile,sizeof(MutexLockFile),"%s.mutex", HALockFile); time_now = time(NULL); while (!FilePossession) { if (NumChecks > 0) { usleep(DEF_USPERSECOND * HALockCheckTime); time_now = time(NULL); UseFLock = FALSE; if (MutexLockFD > 0) close(MutexLockFD); } NumChecks++; UseFLock = is_ha_lock_file_valid(HALockFile); if (UseFLock == TRUE) { /* try to get a filesystem lock on the "mutex" file */ while (acquire_file_lock(MutexLockFile,&MutexLockFD,"HA") == FAILURE) { strcpy(log_buffer,"Could not acquire HA flock--trying again in 1 second\n"); usleep(DEF_USPERSECOND); } } /* check if file lock exists */ if (stat(HALockFile,&StatBuf) == 0) { /* file DOES exist--check time */ FileIsMissing = FALSE; if ((time_now - StatBuf.st_mtime) < HALockCheckTime) { /* someone else probably has the lock */ continue; } /* update the file to mark it as ours */ utime(HALockFile,NULL); FilePossession = TRUE; } else { /* file doesn't exist--wait required amount of time and check again */ if (FileIsMissing == FALSE) { FileIsMissing = TRUE; /* if we don't have a mutex to protect file creation * race conditions, we need to wait and check again: * otherwise we can safely create it immediately */ if (UseFLock == FALSE) continue; } /* this is not the first time the file has been missing--we are * probably safe to create it */ HALockFD = open(HALockFile,O_CREAT|O_EXCL|O_RDONLY,0600); if (HALockFD < 0) { sprintf(log_buffer,"could not create HA lock file '%s'--errno %d:%s", HALockFile, errno, strerror(errno)); continue; } FilePossession = TRUE; } if (FilePossession == TRUE) { /* start heartbeat thread */ start_update_ha_lock_thread(); } if (UseFLock == TRUE) close(MutexLockFD); /* unlock file mutex */ } /* END while (!FilePossession) */ /* we have the file lock--go ahead and log this fact */ log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, "high availability file lock 
obtained"); } /* END lock_out_ha() */ #endif /* USE_HA_THREADS */ /** * daemonize_server() * figures out, based on the mode, whether or not to run in the background and does so * * @param DoBackground - (I) indicates whether or not we should run in the background * @param sid - (O) set to the correct pid * @return success unless we could not run in the background and we're supposed to */ static int daemonize_server( int DoBackground, /* I */ int *sid) /* O */ { int pid; FILE *dummyfile; char id[] = "daemonize_server"; if (!DoBackground) { /* handle foreground (i.e. debug mode) */ *sid = getpid(); setvbuf(stdout,NULL,_IOLBF,0); setvbuf(stderr,NULL,_IOLBF,0); return(SUCCESS); } /* run pbs_server in the background */ /* fork to disconnect from terminal */ if ((pid = fork()) == -1) { log_err(errno,id,"cannot fork into background"); return(FAILURE); } if (pid != 0) { /* exit if parent */ log_event( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, "INFO: parent is exiting"); exit(0); } /* NOTE: setsid() disconnects from controlling-terminal */ if ((*sid = setsid()) == -1) { log_err(errno,id,"Could not disconnect from controlling terminal"); return(FAILURE); } /* disconnect stdin,stdout,stderr */ fclose(stdin); fclose(stdout); fclose(stderr); dummyfile = fopen("/dev/null","r"); assert((dummyfile != 0) && (fileno(dummyfile) == 0)); dummyfile = fopen("/dev/null","w"); assert((dummyfile != 0) && (fileno(dummyfile) == 1)); dummyfile = fopen("/dev/null","w"); assert((dummyfile != 0) && (fileno(dummyfile) == 2)); if ((pid = fork()) == -1) { log_err(errno,id,"cannot fork into background"); return(FAILURE); } if (pid != 0) { /* exit if parent */ log_event( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, "INFO: parent is exiting"); exit(0); } /* update the sid (pid written to the lock file) so that * the correct pid is present */ *sid = getpid(); log_event( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, "INFO: child process in background"); return(SUCCESS); } /* END daemonize_server() */ 
#ifndef USE_HA_THREADS /* * lock_out - lock out other daemons from this directory. */ static void lock_out( int fds, int op) /* F_WRLCK or F_UNLCK */ { if (try_lock_out(fds,op)) { strcpy(log_buffer,"pbs_server: another server running\n"); log_err(errno,msg_daemonname,log_buffer); fprintf(stderr,"%s", log_buffer); exit(1); } } /** * * Try to lock * * @return Zero on success, one on failure * */ static int try_lock_out( int fds, int op) /* F_WRLCK or F_UNLCK */ { struct flock flock; flock.l_type = op; flock.l_whence = SEEK_SET; flock.l_start = 0; flock.l_len = 0; return(fcntl(fds,F_SETLK,&flock) != 0); } #endif /* !USE_HA_THREADS */ /** * gets attributes for the specified file/directory * * @param FileName (I) * @param ModifyTime (O) [optional] * @param FileSize (O) [optional] * @param IsExe (O) [optional] * @param IsDir (O) [optional] */ int get_file_info( char *FileName, /* I */ unsigned long *ModifyTime, /* O (optional */ long *FileSize, /* O (optional */ bool_t *IsExe, /* O (optional */ bool_t *IsDir) /* O (optional */ { int rc; char *ptr; char *id = "get_file_info"; struct stat sbuf; if (IsExe != NULL) *IsExe = FALSE; if (ModifyTime != NULL) *ModifyTime = 0; if (FileSize != NULL) *FileSize = 0; if (IsDir != NULL) *IsDir = FALSE; if ((FileName == NULL) || (FileName[0] == '\0')) { return(FAILURE); } /* FORMAT: <FILENAME>[ <ARG>]... 
*/ /* NOTE: mask off, then restore possible args */ ptr = strchr(FileName,' '); if (ptr != NULL) *ptr = '\0'; rc = stat(FileName,&sbuf); if (rc == -1) { sprintf(log_buffer,"INFO: cannot stat file '%s', errno: %d (%s)\n", FileName, errno, strerror(errno)); log_err(errno,id,log_buffer); return(FAILURE); } if (ModifyTime != NULL) { *ModifyTime = (unsigned long)sbuf.st_mtime; } if (FileSize != NULL) { *FileSize = (long)sbuf.st_size; } if (IsExe != NULL) { if (sbuf.st_mode & S_IXUSR) *IsExe = TRUE; else *IsExe = FALSE; } if (IsDir != NULL) { if (sbuf.st_mode & S_IFDIR) *IsDir = TRUE; else *IsDir = FALSE; } return(SUCCESS); } /* end get_file_info() */ /** * gets the full path for command * * @return SUCCESS if the path is found, FAILURE otherwise */ int get_full_path( char *Cmd, /* I */ char *GoodCmd, /* O */ int GoodCmdLen) /* O */ { char *TokPtr = NULL; char *Delims = ":;"; /* windows and unix path deliminators */ char *PathLocation; char tmpPath[MAX_LINE]; bool_t IsExe = FALSE; bool_t IsDir = FALSE; if (Cmd[0] == '/') { /* absolute path specified */ if (get_file_info(Cmd,NULL,NULL,&IsExe,&IsDir) == FAILURE) { return(FAILURE); } if ((IsExe == FALSE) && (IsDir == FALSE)) { return(FAILURE); } snprintf(GoodCmd,GoodCmdLen,"%s",Cmd); return(SUCCESS); } PathLocation = strtok_r(OriginalPath,Delims,&TokPtr); while (PathLocation != NULL) { if (strlen(PathLocation) <= 0) { PathLocation = strtok_r(NULL,Delims,&TokPtr); continue; } if (PathLocation[strlen(PathLocation) - 1] == '/') { sprintf(tmpPath,"%s%s", PathLocation, Cmd); } else { sprintf(tmpPath,"%s/%s", PathLocation, Cmd); } if (get_file_info(tmpPath,NULL,NULL,&IsExe,NULL) == FAILURE) { PathLocation = strtok_r(NULL,Delims,&TokPtr); continue; } if (IsExe == FALSE) { PathLocation = strtok_r(NULL,Delims,&TokPtr); continue; } snprintf(GoodCmd,GoodCmdLen,"%s",tmpPath); return(SUCCESS); } /* END while (PathLocation != NULL) */ return(FAILURE); } /* END get_full_path() */ /** * * Restarts the pbs_server * */ int svr_restart() { 
int rc; char FullCmd[MAX_LINE]; char *id = "svr_restart"; if (get_full_path( ArgV[0], FullCmd, sizeof(FullCmd)) == FAILURE) { sprintf(log_buffer,"ALERT: cannot locate full path for '%s'\n", ArgV[0]); log_err(-1,id,log_buffer); exit(-10); } /* shut down network connections and rpp */ RPPConfigure(1,0); /* help rpp_shutdown go a bit faster */ rpp_shutdown(); net_close(-1); /* close all network connections */ /* copying FullCmd to ArV[0] is necessary for multiple restarts because * the path changes when we run pbs_server in the background. */ if (strcmp(FullCmd,ArgV[0]) != 0) { free(ArgV[0]); ArgV[0] = malloc(sizeof(char) * (strlen(FullCmd) + 1)); if (ArgV[0] == NULL) { /* could not malloc */ log_err(errno,id,"ERROR: cannot allocate memory for full command, cannot restart\n"); exit(-10); } strcpy(ArgV[0],FullCmd); } sprintf(log_buffer,"INFO: about to exec '%s'\n",ArgV[0]); log_event( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, id, log_buffer); log_close(1); if ((rc = execv(FullCmd,ArgV)) == -1) { /* exec failed */ exit(-10); } /* NOT REACHED */ exit(0); return(SUCCESS); } /* END svr_restart() */ /** * * restores this attribute to its default where supported/possible */ void restore_attr_default( struct attribute *attr) /* I */ { int index; index = (int)(attr - server.sv_attr); attr->at_flags &= ~ATR_VFLAG_SET; switch (index) { case SRV_ATR_log_events: server.sv_attr[(int)SRV_ATR_log_events].at_val.at_long = PBSEVENT_MASK; break; case SRV_ATR_tcp_timeout: server.sv_attr[(int)SRV_ATR_tcp_timeout].at_val.at_long = PBS_TCPTIMEOUT; break; case SRV_ATR_JobStatRate: server.sv_attr[(int)SRV_ATR_JobStatRate].at_val.at_long = PBS_RESTAT_JOB; break; case SRV_ATR_PollJobs: server.sv_attr[(int)SRV_ATR_PollJobs].at_val.at_long = PBS_POLLJOBS; break; case SRV_ATR_LogLevel: server.sv_attr[(int)SRV_ATR_LogLevel].at_val.at_long = 0; break; default: /* should never get here, but if we do then reset the flags so the user knows * that the value hasn't been cleared */ attr->at_flags |= 
ATR_VFLAG_SET; break; } } /* END restore_attr_default() */ /* END pbsd_main.c */