/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. 
Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. */ /* * The entry point function for pbs_daemon. * * Included public functions re: * * main initialization and main loop of pbs_daemon */ #include /* the master config generated by configure */ #include #include #if (PLOCK_DAEMONS & 1) #include #endif /* PLOCK_DAEMONS */ #include "pbs_ifl.h" #include #include #include #include #include #include #include #include /* getpid */ #include #include #include "list_link.h" #include "work_task.h" #include "log.h" #include "../lib/Liblog/pbs_log.h" #include "../lib/Liblog/log_event.h" #include "../lib/Liblog/chk_file_sec.h" #include "../lib/Libifl/lib_ifl.h" #include "server_limits.h" #include "attribute.h" #include "pbs_job.h" #include "queue.h" #include "server.h" #include "pbs_nodes.h" #include "net_connect.h" #include "credential.h" #include "svrfunc.h" #include "tracking.h" #include "acct.h" #include "sched_cmds.h" #include "dis.h" #include "dis_init.h" #include "batch_request.h" #include "pbs_proto.h" #include "u_tree.h" #include "utils.h" #include "threadpool.h" #include "../lib/Libutils/u_lock_ctl.h" /* lock_init */ #include "svr_func.h" /* get_svr_attr_* */ #include "../lib/Libifl/lib_ifl.h" /* get_port_from_server_name_file */ #include "node_manager.h" /* svr_is_request */ #include "net_connect.h" /* set_localhost_name */ #include "../lib/Libnet/lib_net.h" /* start_listener_addrinfo */ 
#include "process_request.h" /* process_request */ #include "net_connect.h" /* set_localhost_name */ #include "tcp.h" /* tcp_chan */ #include "ji_mutex.h" #include "job_route.h" /* queue_route */ #include "exiting_jobs.h" #define TASK_CHECK_INTERVAL 10 #define HELLO_WAIT_TIME 600 #define HELLO_INTERVAL 10 #define TSERVER_HA_CHECK_TIME 1 /* 1 second sleep time between checks on the lock file for high availability */ #define UPDATE_TIMEOUT_INTERVAL 10 #define UPDATE_LOGLEVEL_INTERVAL 10 /* external functions called */ extern char *msg_startup3; extern void job_log_roll(int max_depth); extern int pbsd_init(int); extern void shutdown_ack(); extern void tcp_settimeout(long); extern void poll_job_task(struct work_task *); extern int schedule_jobs(void); extern int notify_listeners(void); extern void svr_shutdown(int); extern void acct_close(void); extern int svr_startjob(job *, struct batch_request **, char *, char *); extern int RPPConfigure(int, int); extern void acct_cleanup(long); void stream_eof(int, u_long, uint16_t, int); extern void scheduler_close(); #ifndef MAX_PATH_LEN #define MAX_PATH_LEN 256 #endif #ifndef MAX_NAME #define MAX_NAME 64 #endif #ifndef DEF_USPERSECOND #define DEF_USPERSECOND 1000000 #endif #ifndef bool_t #define bool_t unsigned char #endif #define ISEMPTYSTR(STR) ((STR)[0] == '\0') static void lock_out_ha(); /* external data items */ extern hello_container failures; extern int svr_chngNodesfile; extern int svr_totnodes; extern struct all_jobs alljobs; extern int run_change_logs; extern pthread_mutex_t *poll_job_task_mutex; extern int max_poll_job_tasks; /* External Functions */ extern int recov_svr_attr (int); extern void change_logs_handler(int); extern void change_logs(); /* ssize_t write_neverblocking_socket(int, const void *, ssize_t); */ /* Local Private Functions */ static int get_port (char *, unsigned int *, pbs_net_t *); static int daemonize_server (int, pid_t *); int mutex_lock (mutex_t *); int mutex_unlock (mutex_t *); int 
svr_restart(); void restore_attr_default (struct pbs_attribute *); /* Global Data Items */ int mom_hierarchy_retry_time = NODE_COMM_RETRY_TIME; time_t last_task_check_time = 0; int disable_timeout_check = FALSE; int lockfds = -1; int ForceCreation = FALSE; int high_availability_mode = FALSE; char *acct_file = NULL; char *log_file = NULL; char *job_log_file = NULL; char *path_home = (char *)PBS_SERVER_HOME; char *path_acct; char path_log[MAXPATHLEN + 1]; char *path_priv = NULL; char *path_arrays; char *path_credentials; char *path_jobs; char *path_queues; char *path_spool; char *path_svrdb = NULL; char *path_svrdb_new; char *path_svrlog; char *path_track; char *path_nodes; char *path_mom_hierarchy; char *path_nodes_new; char *path_nodestate; char *path_nodenote; char *path_nodenote_new; char *path_checkpoint; char *path_jobinfo_log; extern char *msg_daemonname; extern char *msg_info_server; /* Server information message */ const char *pbs_o_host = "PBS_O_HOST"; pbs_net_t pbs_mom_addr; unsigned int pbs_mom_port = 0; unsigned int pbs_rm_port; pbs_net_t pbs_scheduler_addr; unsigned int pbs_scheduler_port; extern pbs_net_t pbs_server_addr; unsigned int pbs_server_port_dis; bool auto_send_hierarchy = true; mom_hierarchy_t *mh; listener_connection listener_conns[MAXLISTENERS]; int queue_rank = 0; int a_opt_init = -1; int wait_for_moms_hierarchy = FALSE; int route_retry_interval = 10; /* time in seconds to check routing queues */ /* info useful when analyzing core file */ char Torque_Info_Version[] = PACKAGE_VERSION; char Torque_Info_Version_Revision[] = GIT_HASH; char Torque_Info_Component[] = "pbs_server"; char Torque_Info_SysVersion[BUF_SIZE]; /* HA global data items */ long HALockCheckTime = 0; long HALockUpdateTime = 0; char HALockFile[MAXPATHLEN+1]; mutex_t EUIDMutex; /* prevents thread from trying to lock the file from a different euid */ int HALockFD; /* END HA global data items */ struct server server; /* the server structure */ char server_host[PBS_MAXHOSTNAME + 
1]; /* host_name */
char *mom_host = server_host;
int server_init_type = RECOV_WARM;
char server_name[PBS_MAXSERVERNAME + 1]; /* host_name[:service|port] */
int svr_do_schedule = SCH_SCHEDULE_NULL;
pthread_mutex_t *svr_do_schedule_mutex;
pthread_mutex_t *check_tasks_mutex;
pthread_mutex_t *reroute_job_mutex;
extern int listener_command;
extern hello_container hellos;
extern hello_container failures;
pthread_mutex_t *listener_command_mutex;
tlist_head svr_newnodes; /* list of newly created nodes */
pthread_mutex_t task_list_timed_mutex;
pid_t sid;

char *plogenv = NULL;
int LOGLEVEL = 0;
int DEBUGMODE = 0;
int TDoBackground = 1; /* background daemon */

char *ProgName;
char *NodeSuffix = NULL;

int MultiMomMode = 0;

int allow_any_mom = FALSE;
int array_259_upgrade = FALSE;

char server_localhost[PBS_MAXHOSTNAME + 1];
size_t localhost_len = PBS_MAXHOSTNAME;

/*
 * need_y_response - on create/clean initialization that would delete
 * information, obtain the operator approval first.
 *
 * type selects the prompt: RECOV_CREATE prints the create-mode warning,
 * any other value prints msg_startup3 formatted as a "Cold" start.
 *
 * Returns only after the operator answers 'y' or 'Y'. On EOF, an empty
 * line, 'n' or 'N' an abort message is printed and the process exits
 * with status 0. Any other answer re-prompts. The answer is kept in a
 * static variable, so once a positive value has been read later calls
 * return immediately without prompting again.
 */

static void need_y_response(

  int type)  /* I */

  {
  static int answ = -2;
  int        c;

  if (answ > 0)
    {
    return; /* already received a response */
    }

  fflush(stdin);

  if (type == RECOV_CREATE)
    {
    printf("\nYou have selected to start pbs_server in create mode.");
    printf("\nIf the server database exists it will be overwritten.");
    printf("\ndo you wish to continue y/(n)?");
    }
  else
    printf(msg_startup3, msg_daemonname, server_name, "Cold", "jobs may");

  while (1)
    {
    /* the first character of the line is the answer */
    answ = getchar();

    c = answ;

    /* discard the remainder of the input line */
    while ((c != '\n') && (c != EOF))
      c = getchar();

    switch (answ)
      {
      case 'y':
      case 'Y':

        return;

        /*NOTREACHED*/

        break;

      case EOF:
      case '\n':
      case 'n':
      case 'N':

        printf("PBS server %s initialization aborted\n", server_name);

        exit(0);

        /*NOTREACHED*/

        break;
      }

    printf("y(es) or n(o) please:\n");
    }

  return;
  }  /* END need_y_response() */




/*
 * process_pbs_server_port - read and dispatch one message from a socket.
 *
 * A DIS tcp channel is set up on sock, the protocol type is peeked and
 * the message is handed to process_request() (PBS_BATCH_PROT_TYPE) or
 * svr_is_request() (IS_PROTOCOL, after reading the version). Any other
 * protocol value is classified here: 0 is treated as a closed or timed
 * out connection, anything else as bad data.
 *
 * @param sock              (I) connected socket descriptor
 * @param is_scheduler_port (I) non-zero suppresses the "close detected"
 *                              log entry
 * @param args              (I) passed through to svr_is_request()
 *
 * @return PBSE_MEM_MALLOC if the channel could not be set up, otherwise
 *         the dispatched handler's result or one of PBSE_SOCKET_CLOSE,
 *         PBSE_TIMEOUT, PBSE_SOCKET_DATA. The channel is cleaned up
 *         before returning.
 */

int process_pbs_server_port(

  int   sock,
  int   is_scheduler_port,
  long *args)

  {
  int              proto_type;
  int              rc = PBSE_NONE;
  int              version;
  char             log_buf[LOCAL_LOG_BUF_SIZE];

  struct tcp_chan *chan = NULL;

  if ((chan = DIS_tcp_setup(sock)) == NULL)
    {
    return(PBSE_MEM_MALLOC);
    }

  proto_type = disrui_peek(chan,&rc);

  switch (proto_type)
    {
    case PBS_BATCH_PROT_TYPE:

      rc = process_request(chan);

      break;

    case IS_PROTOCOL:

      version = disrsi(chan, &rc);

      if (rc != DIS_SUCCESS)
        {
        log_err(-1, __func__, "Cannot read version - skipping this request.\n");
        rc = PBSE_SOCKET_CLOSE;
        break;
        }

      rc = svr_is_request(chan, version, args);

      break;

    default:
      {
      struct sockaddr     s_addr;
      struct sockaddr_in *addr;
      socklen_t           len = sizeof(s_addr);

      /* the peer address is only needed for the log messages below */
      if (getpeername(sock, &s_addr, &len) == 0)
        {
        addr = (struct sockaddr_in *)&s_addr;

        if (proto_type == 0)
          {
          /*
           * Don't log error if close is on scheduler port.  Scheduler is
           * responsible for closing the connection
           */
          if (!is_scheduler_port)
            {
            if (LOGLEVEL >= 8)
              {
              snprintf(log_buf, sizeof(log_buf),
                "proto_type: %d: Socket (%d) close detected from %s",
                proto_type, sock, netaddr(addr));
              log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
              }
            }

          /* a pending timeout flag on the channel is consumed and reported */
          if (chan->IsTimeout)
            {
            chan->IsTimeout = 0;
            rc = PBSE_TIMEOUT;
            }
          else
            rc = PBSE_SOCKET_CLOSE;
          }
        else
          {
          snprintf(log_buf,sizeof(log_buf),
            "Socket (%d) Unknown protocol %d from %s",
            sock, proto_type, netaddr(addr));
          log_err(-1, __func__, log_buf);
          rc = PBSE_SOCKET_DATA;
          }
        }
      else
        rc = PBSE_SOCKET_CLOSE;

      break;
      }
    }

  if (chan != NULL)
    DIS_tcp_cleanup(chan);

  return(rc);
  }  /* END process_pbs_server_port() */




/*
 * start_process_pbs_server_port - thread entry point for one connection.
 *
 * new_sock points to an array of longs whose first element is the socket
 * descriptor; the whole array is also passed on as args. Messages are
 * processed until process_pbs_server_port() returns one of the terminal
 * codes tested in the loop condition, then the argument block is freed
 * and the connection is closed.
 *
 * @param new_sock (I) argument block; freed by this function
 * @return always NULL
 */

void *start_process_pbs_server_port(

  void *new_sock)

  {
  long *args = (long *)new_sock;
  int   sock;
  int   rc = PBSE_NONE;

  sock = (int)args[0];

  while ((rc != PBSE_SOCKET_DATA) &&
         (rc != PBSE_SOCKET_INFORMATION) &&
         (rc != PBSE_INTERNAL) &&
         (rc != PBSE_SYSTEM) &&
         (rc != PBSE_MEM_MALLOC) &&
         (rc != PBSE_SOCKET_CLOSE))
    {
    netcounter_incr();

    rc = process_pbs_server_port(sock, FALSE, args);
    }

  free(new_sock);

  close_conn(sock, FALSE);

  /* Thread exit */
  return(NULL);
  }




/*
 * clear_listeners - reset every slot of listener_conns to the unused
 * state (address 0, port 0, sock -1, first_time 1).
 */

void clear_listeners(void)   /* I */

  {
  int i;

  for (i = 0;i < MAXLISTENERS; i++)
    {
    listener_conns[i].address = 0;
    listener_conns[i].port = 0;
    listener_conns[i].sock = -1;
listener_conns[i].first_time = 1; } return; } /* END clear_listeners */ int add_listener( pbs_net_t l_addr, /* I */ unsigned int l_port) /* I */ { int i; for (i = 0;i < MAXLISTENERS; i++) { if (listener_conns[i].address == 0) { listener_conns[i].address = l_addr; listener_conns[i].port = l_port; listener_conns[i].sock = -1; return (0); } } return (-1); } /* END add_listener */ int PBSShowUsage( const char *EMsg) /* I (optional) */ { fprintf(stderr, "Usage: %s\n", ProgName); fprintf(stderr, " -A \\\\ Path to accounting file\n"); fprintf(stderr, " -a \\\\ Scheduling\n"); fprintf(stderr, " -c \\\\ Wait for mom hierarchy\n"); fprintf(stderr, " -D \\\\ Debugmode\n"); fprintf(stderr, " -d \\\\ Homedir\n"); fprintf(stderr, " -e \\\\ Enable any mom\n"); fprintf(stderr, " -f \\\\ Force Overwrite Serverdb\n"); fprintf(stderr, " -h \\\\ Print Usage\n"); fprintf(stderr, " -H \\\\ Daemon Hostname\n"); fprintf(stderr, " -L \\\\ Logfile\n"); fprintf(stderr, " -l \\\\ Listener Port\n"); fprintf(stderr, " -M \\\\ MOM Port\n"); fprintf(stderr, " -p \\\\ Server Port\n"); fprintf(stderr, " -R \\\\ RM Port\n"); fprintf(stderr, " -S \\\\ Scheduler Port\n"); fprintf(stderr, " -t \\\\ Startup Type (hot, warm, cold, create)\n"); fprintf(stderr, " -v \\\\ Version\n"); fprintf(stderr, " --about \\\\ Print information about pbs_server\n"); fprintf(stderr, " --ha \\\\ High Availability MODE\n"); fprintf(stderr, " --help \\\\ Print Usage\n"); fprintf(stderr, " --version \\\\ Version and commit\n"); if (EMsg != NULL) { fprintf(stderr, " %s\n", EMsg); } return(0); } /* END PBSShowUsage() */ /** * parse_command_line * * parse the parameters from the command line */ void parse_command_line( int argc, char *argv[]) { extern int optind; extern char *optarg; char *pc = NULL; int c; int i; int local_errno = 0; char EMsg[1024]; char *servicename = NULL; pbs_net_t def_pbs_server_addr; pbs_net_t listener_addr; unsigned int listener_port; static struct { const char *it_name; int it_type; } init_name_type[] 
= { { "hot", RECOV_HOT }, { "warm", RECOV_WARM }, { "cold", RECOV_COLD }, { "create", RECOV_CREATE }, { "", RECOV_Invalid } }; ForceCreation = FALSE; while ((c = getopt(argc, argv, "A:a:cd:DefhH:L:l:mM:np:R:S:t:uv-:")) != -1) { switch (c) { case '-': if ((optarg == NULL) || (optarg[0] == '\0')) { PBSShowUsage("empty command line arg"); exit(1); } if (!strcmp(optarg, "version")) { fprintf(stderr, "Version: %s\nCommit: %s \n", PACKAGE_VERSION, GIT_HASH); exit(0); } if (!strcmp(optarg, "about")) { printf("Package: %s\n", PACKAGE_STRING); printf("Sourcedir: %s\n", PBS_SOURCE_DIR); printf("Configure: %s\n", PBS_CONFIG_ARGS); printf("Buildcflags: %s\n", PBS_CFLAGS); printf("Buildhost: %s\n", PBS_BUILD_HOST); printf("Builddate: %s\n", PBS_BUILD_DATE); printf("Builddir: %s\n", PBS_BUILD_DIR); printf("Builduser: %s\n", PBS_BUILD_USER); printf("Installdir: %s\n", PBS_INSTALL_DIR); printf("Serverhome: %s\n", PBS_SERVER_HOME); printf("Version: %s\n", PACKAGE_VERSION); printf("Commit: %s\n", GIT_HASH); exit(0); } if (!strcmp(optarg, "help")) { PBSShowUsage(NULL); exit(0); } if (!strcasecmp(optarg, "ha")) /* High Availability */ { high_availability_mode = TRUE; break; } PBSShowUsage("invalid command line arg"); exit(1); /*NOTREACHED*/ break; case 'a': pthread_mutex_lock(server.sv_attr_mutex); if (decode_b( &server.sv_attr[SRV_ATR_scheduling], NULL, NULL, optarg, 0) != 0) { (void)fprintf(stderr, "%s: bad -a option\n", argv[0]); exit(1); } a_opt_init = server.sv_attr[SRV_ATR_scheduling].at_val.at_long; pthread_mutex_unlock(server.sv_attr_mutex); break; case 'c': wait_for_moms_hierarchy = TRUE; break; case 'd': path_home = optarg; break; case 'D': TDoBackground = 0; break; case 'e': allow_any_mom = TRUE; break; case 'f': ForceCreation = TRUE; break; case 'h': PBSShowUsage(NULL); exit(0); break; case 'H': /* overwrite locally detected hostname with specified hostname */ /* (used for multi-homed hosts) */ snprintf(server_host, PBS_MAXHOSTNAME, "%s", optarg); if 
(get_fullhostname(server_host, server_host, PBS_MAXHOSTNAME, EMsg) == -1) { /* FAILURE */ if (EMsg[0] != '\0') { char tmpLine[1024]; snprintf(tmpLine, sizeof(tmpLine), "unable to determine full hostname for specified server host '%s' - %s", server_host, EMsg); log_err(-1, __func__, tmpLine); } else { log_err(-1, __func__, "unable to determine full server hostname"); } exit(1); } snprintf(server_name, PBS_MAXSERVERNAME, "%s", server_host); def_pbs_server_addr = pbs_server_addr; pbs_server_addr = get_hostaddr(&local_errno, server_host); if (pbs_mom_addr == def_pbs_server_addr) pbs_mom_addr = pbs_server_addr; if (pbs_scheduler_addr == def_pbs_server_addr) pbs_scheduler_addr = pbs_server_addr; if ((servicename != NULL) && (strlen(servicename) > 0)) { if (strlen(server_name) + strlen(servicename) + 1 > (size_t)PBS_MAXSERVERNAME) { fprintf(stderr, "%s: -h host too long\n", argv[0]); exit(1); } strcat(server_name, ":"); strcat(server_name, servicename); if ((pbs_server_port_dis = atoi(servicename)) == 0) { fprintf(stderr, "%s: host: %s, port: %s, max: %i\n", argv[0], server_name, servicename, PBS_MAXSERVERNAME); fprintf(stderr, "%s: -h host invalid\n", argv[0]); exit(1); } } /* END if (strlen(servicename) > 0) */ break; case 'l': clear_listeners(); if (get_port( optarg, &listener_port, &listener_addr)) { fprintf(stderr, "%s: bad -l %s\n", argv[0], optarg); exit(1); } if (add_listener(listener_addr, listener_port) < 0) { fprintf(stderr, "%s: failed to add listener %s\n", argv[0], optarg); exit(1); } break; case 'p': servicename = optarg; if (strlen(server_name) + strlen(servicename) + 1 > (size_t)PBS_MAXSERVERNAME) { fprintf(stderr, "%s: -p host:port too long\n", argv[0]); exit(1); } strcat(server_name, ":"); strcat(server_name, servicename); if ((pbs_server_port_dis = atoi(servicename)) == 0) { fprintf(stderr, "%s: -p host:port invalid\n", argv[0]); exit(1); } break; case 't': for (i = RECOV_HOT;i < RECOV_Invalid;i++) { if (strcmp(optarg, init_name_type[i].it_name) == 0) 
{ server_init_type = init_name_type[i].it_type; break; } } /* END for (i) */ if (i == RECOV_Invalid) { fprintf(stderr, "%s -t bad recovery type\n", argv[0]); exit(1); } break; case 'A': acct_file = optarg; break; case 'L': log_file = optarg; break; case 'm': MultiMomMode = 1; break; case 'M': if (get_port(optarg, &pbs_mom_port, &pbs_mom_addr)) { fprintf(stderr, "%s: bad -M %s\n", argv[0], optarg); exit(1); } if (isalpha((int)*optarg)) { if ((pc = strchr(optarg, (int)':')) != NULL) * pc = '\0'; mom_host = optarg; } break; case 'n': auto_send_hierarchy = false; break; case 'R': if ((pbs_rm_port = atoi(optarg)) == 0) { fprintf(stderr, "%s: bad -R %s\n", argv[0], optarg); exit(1); } break; case 'S': /* FORMAT: ??? */ if (get_port( optarg, &pbs_scheduler_port, &pbs_scheduler_addr)) { fprintf(stderr, "%s: bad -S %s\n", argv[0], optarg); exit(1); } break; case 'u': array_259_upgrade = TRUE; break; case 'v': fprintf(stderr, "version: %s\n", PACKAGE_VERSION); exit(0); break; default: PBSShowUsage("invalid command line arg"); exit(1); break; } /* END switch (c) */ } /* END while (c) */ if (optind < argc) { fprintf(stderr, "%s: invalid operand\n", argv[0]); exit(1); } } /* Globals to thread the accept port */ pthread_t accept_thread_id = -1; int accept_thread_active = FALSE; int route_thread_active = FALSE; pthread_t route_retry_thread_id = -1; /* * check_tasks - look for the next work task to perform: * 1. All items on the immediate list, then * 2. All items on the timed task list which have expired times * * Returns: amount of time till next task */ void *check_tasks(void *notUsed) { work_task *ptask; int rc = PBSE_NONE; time_t time_now; pthread_mutex_lock(check_tasks_mutex); time_now = time(NULL); last_task_check_time = time_now; while ((ptask = pop_timed_task(time_now)) != NULL) { rc = dispatch_timed_task(ptask); /* will delete link */ /* if dispatch_task does not return PBSE_NONE it is because we have used up our alotment of threads. 
Break for now and come back to this next time through the main_loop */ if (rc != PBSE_NONE) { pthread_mutex_unlock(ptask->wt_mutex); break; } } /* should the scheduler be run? If so, adjust the schedule time */ if (server.sv_next_schedule - time_now <= 0) { pthread_mutex_lock(svr_do_schedule_mutex); svr_do_schedule = SCH_SCHEDULE_TIME; pthread_mutex_unlock(svr_do_schedule_mutex); pthread_mutex_lock(listener_command_mutex); listener_command = SCH_SCHEDULE_TIME; pthread_mutex_unlock(listener_command_mutex); } pthread_mutex_unlock(check_tasks_mutex); return(NULL); } /* END check_tasks() */ /* * start_hot_jobs - place any job which is state QUEUED and has the * HOT start flag set into execution. * * Returns the number of jobs to be hot started. */ static int start_hot_jobs(void) { int ct = 0; job *pjob; int iter = -1; while ((pjob = next_job(&alljobs,&iter)) != NULL) { if ((pjob->ji_qs.ji_substate == JOB_SUBSTATE_QUEUED) && (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HOTSTART)) { log_event( PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, pjob->ji_qs.ji_jobid, (char *)"attempting to hot start job"); svr_startjob(pjob, NULL, NULL, NULL); ct++; } unlock_ji_mutex(pjob, __func__, "1", LOGLEVEL); } return(ct); } /* END start_hot_jobs() */ void send_any_hellos_needed() { hello_info *hi; /* send hierarchy using threadpool */ while ((hi = pop_hello(&hellos)) != NULL) enqueue_threadpool_request(send_hierarchy_threadtask, hi); /* re-insert any failures */ while ((hi = pop_hello(&failures)) != NULL) add_hello_info(&hellos, hi); } /* END send_any_hellos_needed() */ void route_listener_cleanup( void *vp) { route_thread_active = FALSE; } /* END route_listener_cleanup() */ void *handle_queue_routing_retries( void *vp) { pbs_queue *pque; char *queuename = NULL; int iter = -1; int rc; char log_buf[LOCAL_LOG_BUF_SIZE]; pthread_attr_t routing_attr; route_thread_active = TRUE; pthread_cleanup_push(route_listener_cleanup, vp); if (pthread_attr_init(&routing_attr) != 0) { snprintf(log_buf, sizeof(log_buf), 
"pthread_attr_init failed in %s. Will try next iteration", __func__); log_err(-1, msg_daemonname, log_buf); return(NULL); } else if (pthread_attr_setdetachstate(&routing_attr, PTHREAD_CREATE_DETACHED) != 0) { snprintf(log_buf, sizeof(log_buf), "pthread_attr_setdetachstate failed in %s. Will try next iteration", __func__); log_err(-1, msg_daemonname, log_buf); pthread_attr_destroy(&routing_attr); /* we don't care if the succeeds or fails */ return(NULL); } while(1) { while ((pque = next_queue(&svr_queues, &iter)) != NULL) { if (pque->qu_qs.qu_type == QTYPE_RoutePush) { /* NYI. What happens if a queue is deleted */ if (pque->route_retry_thread_id == (pthread_t)-1) { queuename = strdup(pque->qu_qs.qu_name); /* make sure this gets freed inside queue_route */ /* thread not yet started. Let's start the route retry thread for this routing queue */ rc = pthread_create(&pque->route_retry_thread_id, &routing_attr, queue_route, queuename); if (rc != 0) { snprintf(log_buf, sizeof(log_buf), "pthread_attr_init failed: %d in %s. Will try next iteration", rc, __func__); log_err(-1, msg_daemonname, log_buf); free(queuename); /* Just go on to the next queue. do not return NULL here */ } } else { /* the thread was started. Check to see if it is still running */ /* Yes, calling pthread_kill with a 0 signal just let's us know if the thread is running. It does not kill the thread */ if (pthread_kill(pque->route_retry_thread_id, 0) == ESRCH) { queuename = strdup(pque->qu_qs.qu_name); /* make sure this gets freed inside queue_route */ rc = pthread_create(&pque->route_retry_thread_id, &routing_attr, queue_route, queuename); if (rc != 0) { snprintf(log_buf, sizeof(log_buf), "pthread_attr_init failed: %d in %s. Will try next iteration", rc, __func__); log_err(-1, msg_daemonname, log_buf); free(queuename); /* Just go on to the next queue. 
do not return NULL here */ } } } } unlock_queue(pque, __func__, NULL, LOGLEVEL); } sleep(route_retry_interval); } pthread_attr_destroy(&routing_attr); /* we don't care if the succeeds or fails */ pthread_cleanup_pop(1); return(NULL); } /* END handle_queue_routing_retries() */ void *handle_scheduler_contact( void *vp) { schedule_jobs(); notify_listeners(); return(NULL); } /* END handle_scheduler_contact() */ /* * accept_listener_cleanup() */ void accept_listener_cleanup( void *vp) { accept_thread_active = FALSE; } /* END accept_listener_cleanup() */ void *start_accept_listener( void *vp) { char server_name_trimmed[PBS_MAXSERVERNAME + 1]; char *colon_pos = NULL; colon_pos = strchr(server_name, ':'); if (colon_pos == NULL) strcpy(server_name_trimmed, server_name); else strncpy(server_name_trimmed, server_name, colon_pos - server_name); accept_thread_active = TRUE; pthread_cleanup_push(accept_listener_cleanup, vp); start_listener_addrinfo(server_name_trimmed, pbs_server_port_dis, start_process_pbs_server_port); pthread_cleanup_pop(1); return(NULL); } /* END start_accept_listener() */ void start_accept_thread() { pthread_attr_t accept_attr; accept_thread_id = -1; if ((pthread_attr_init(&accept_attr)) != 0) { perror("pthread_attr_init failed. Could not start accept thread"); log_err(-1, msg_daemonname,(char *)"pthread_attr_init failed. Could not start accept thread"); } else if ((pthread_attr_setdetachstate(&accept_attr, PTHREAD_CREATE_DETACHED) != 0)) { perror("pthread_attr_setdetatchedstate failed. Could not start accept thread"); log_err(-1, msg_daemonname,(char *)"pthread_attr_setdetachedstate failed. 
Could not start accept thread"); } else if ((pthread_create(&accept_thread_id, &accept_attr, start_accept_listener, NULL)) != 0) { perror("could not start listener for pbs_server"); log_err(-1, msg_daemonname, (char *)"Failed to start listener for pbs_server"); } } /* END start_accept_thread() */ void start_routing_retry_thread() { pthread_attr_t routing_attr; route_retry_thread_id = -1; if ((pthread_attr_init(&routing_attr)) != 0) { perror("pthread_attr_init failed. Could not start accept thread"); log_err(-1, msg_daemonname, "pthread_attr_init failed. Could not start handle_queue_routing_retries"); } else if ((pthread_attr_setdetachstate(&routing_attr, PTHREAD_CREATE_DETACHED) != 0)) { perror("pthread_attr_setdetatchedstate failed. Could not start accept thread"); log_err(-1, msg_daemonname, "pthread_attr_setdetachedstate failed. Could not start handle_queue_routing_retries"); } else if ((pthread_create(&route_retry_thread_id, &routing_attr, handle_queue_routing_retries, NULL)) != 0) { perror("could not start listener for pbs_server"); log_err(-1, msg_daemonname, "Failed to start handle_queue_routing_retries"); } } /* END start_routing_retry_thread() */ void start_exiting_retry_thread() { pthread_attr_t attr; pthread_t exiting_thread; if (pthread_attr_init(&attr) != 0) { perror("pthread_attr_init failed. Could not start exiting retry thread"); log_err(-1, msg_daemonname,(char *)"pthread_attr_init failed. Could not start inspect_exiting_jobs"); } else if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0) { perror("pthread_attr_setdetatchedstate failed. Could not start exiting retry thread"); log_err(-1, msg_daemonname,(char *)"pthread_attr_setdetachedstate failed. 
Could not start inspect_exiting_jobs"); } else if (pthread_create(&exiting_thread, &attr, inspect_exiting_jobs, NULL) != 0) { perror("could not start exiting job retry thread for pbs_server"); log_err(-1, msg_daemonname, (char *)"Failed to start inspect_exiting_jobs"); } } /* END start_exiting_retry_thread() */ void monitor_accept_thread() { if (accept_thread_active == FALSE) start_accept_thread(); } /* END monitor_accept_thread() */ void monitor_route_retry_thread() { if (route_thread_active == FALSE) start_routing_retry_thread(); } /* END monitor_route_retry_thread() */ void main_loop(void) { int c; long state = SV_STATE_DOWN; time_t waittime = 5; job *pjob; int iter; long when = 0; long timeout = 0; long log = 0; long scheduling = FALSE; long sched_iteration = 0; time_t time_now = time(NULL); time_t try_hellos = 0; time_t update_timeout = 0; time_t update_loglevel = 0; extern char *msg_startup2; /* log message */ char log_buf[LOCAL_LOG_BUF_SIZE]; void check_nodes(struct work_task *); void check_log(struct work_task *); void check_job_log(struct work_task *); void check_acct_log(struct work_task *); server.sv_started = time_now; /* time server started */ /* record the fact that we are up and running */ sprintf(log_buf, msg_startup2, sid, LOGLEVEL); log_event(PBSEVENT_SYSTEM | PBSEVENT_FORCE,PBS_EVENTCLASS_SERVER,msg_daemonname,log_buf); /* do not check nodes immediately as they will initially be marked down unless they have already reported in */ get_svr_attr_l(SRV_ATR_check_rate, &when); when += time_now; if (svr_totnodes > 1024) { /* for large systems, give newly reported nodes more time before being marked down while pbs_moms are initialy reporting in */ set_task(WORK_Timed, when + svr_totnodes / 12, check_nodes, (char *)NULL, FALSE); } else { set_task(WORK_Timed, when, check_nodes, (char *)NULL, FALSE); } /* Just check the nodes with check_nodes above and don't ping anymore. 
*/ set_task(WORK_Timed, time_now + 5, check_log, (char *)NULL, FALSE); set_task(WORK_Timed,time_now + 10,check_acct_log, (char *)NULL, FALSE); /* * Now at last, we are ready to do some batch work. The * following section constitutes the "main" loop of the server */ get_svr_attr_l(SRV_ATR_State, &state); if (server_init_type == RECOV_HOT) state = SV_STATE_HOT; else state = SV_STATE_RUN; set_svr_attr(SRV_ATR_State, &state); if (plogenv == NULL) /* If no specification of loglevel from env */ { get_svr_attr_l(SRV_ATR_LogLevel, &log); LOGLEVEL = log; } #ifdef PBS_VERSION printf("pbs_server is up (svn version - %s, port %d)\n", PBS_VERSION, pbs_server_port_dis); #else printf("pbs_server is up (version - %s, port - %d)\n", VERSION, pbs_server_port_dis); #endif time_now = time(NULL); if (wait_for_moms_hierarchy == TRUE) try_hellos = time_now + HELLO_WAIT_TIME; start_accept_thread(); start_routing_retry_thread(); start_exiting_retry_thread(); while (state != SV_STATE_DOWN) { /* first process any task whose time delay has expired */ time_now = time(NULL); monitor_accept_thread(); monitor_route_retry_thread(); if (run_change_logs == TRUE) change_logs(); if (try_hellos <= time_now) { try_hellos = time_now + HELLO_INTERVAL; send_any_hellos_needed(); } if (time_now - last_task_check_time > TASK_CHECK_INTERVAL) enqueue_threadpool_request(check_tasks, NULL); if ((disable_timeout_check == FALSE) && (time_now > update_timeout)) { update_timeout = time_now + UPDATE_TIMEOUT_INTERVAL; get_svr_attr_l(SRV_ATR_tcp_timeout, &timeout); /* don't allow timeouts to go below 300 seconds - this is a safety * net for an extremely rare error */ if (timeout < 300) { snprintf(log_buf, sizeof(log_buf), "tcp timeout was %ld resetting to 300", timeout); timeout = 300; set_svr_attr(SRV_ATR_tcp_timeout, &timeout); log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buf); } DIS_tcp_settimeout(timeout); } waittime = MAX(1, waittime); if (state == SV_STATE_RUN) { /* if time or event says 
to run scheduler, do it */ get_svr_attr_l(SRV_ATR_scheduling, &scheduling); get_svr_attr_l(SRV_ATR_scheduler_iteration, &sched_iteration); pthread_mutex_lock(svr_do_schedule_mutex); if ((svr_do_schedule != SCH_SCHEDULE_NULL) && scheduling) { pthread_mutex_unlock(svr_do_schedule_mutex); server.sv_next_schedule = time_now + sched_iteration; enqueue_threadpool_request(handle_scheduler_contact, NULL); } else { pthread_mutex_unlock(svr_do_schedule_mutex); } } else if (state == SV_STATE_HOT) { /* Are there HOT jobs to rerun */ /* only try every _CYCLE seconds */ c = 0; if (time_now > server.sv_hotcycle + SVR_HOT_CYCLE) { server.sv_hotcycle = time_now + SVR_HOT_CYCLE; c = start_hot_jobs(); } /* If more than _LIMIT seconds since start, stop */ if ((c == 0) || (time_now > server.sv_started + SVR_HOT_LIMIT)) { server_init_type = RECOV_WARM; state = SV_STATE_RUN; set_svr_attr(SRV_ATR_State, &state); } } /* qmgr can dynamically set the loglevel specification * we use the new value if PBSLOGLEVEL was not specified */ if ((plogenv == NULL) && (time_now > update_loglevel)) { update_loglevel = time_now + UPDATE_LOGLEVEL_INTERVAL; get_svr_attr_l(SRV_ATR_LogLevel, &log); LOGLEVEL = log; } /* * Can we comment this out? Would anything above change the * server state without setting the 'state' variable? */ get_svr_attr_l(SRV_ATR_State, &state); if (state == SV_STATE_SHUTSIG) svr_shutdown(SHUT_SIG); /* caught sig */ /* * if in process of shuting down and all running jobs * and all children are done, change state to DOWN */ if (state > SV_STATE_RUN) { bool change_state = false; pthread_mutex_lock(server.sv_jobstates_mutex); change_state = ((server.sv_jobstates[JOB_STATE_RUNNING] == 0) && (server.sv_jobstates[JOB_STATE_EXITING] == 0)); pthread_mutex_unlock(server.sv_jobstates_mutex); if (change_state == true) { state = SV_STATE_DOWN; set_svr_attr(SRV_ATR_State, &state); /* at this point kill the threadpool */ destroy_request_pool(); } } /* Sleep 1/4 of a second. 
Could probably be increased */ usleep(250000); get_svr_attr_l(SRV_ATR_State, &state); } /* END while (*state != SV_STATE_DOWN) */ if(accept_thread_id != (pthread_t)-1) { pthread_cancel(accept_thread_id); accept_thread_id = (pthread_t)-1; } svr_save(&server, SVR_SAVE_FULL); /* final recording of server */ track_save(NULL); /* save tracking data */ iter = -1; /* save any jobs that need saving */ while ((pjob = next_job(&alljobs, &iter)) != NULL) { if (pjob->ji_modified) job_save(pjob, SAVEJOB_FULL, 0); unlock_ji_mutex(pjob, __func__, "1", LOGLEVEL); } if (svr_chngNodesfile) { /*nodes created/deleted, or props changed and*/ /*update in req_manager failed; try again */ update_nodes_file(NULL); } } /* END main_loop() */ /** * initialize_globals * * Set the intial state of global variables. */ void initialize_globals(void) { strcpy(pbs_current_user, "PBS_Server"); msg_daemonname = strdup(pbs_current_user); server.sv_qs_mutex = (pthread_mutex_t *)calloc(1, sizeof(pthread_mutex_t)); server.sv_attr_mutex = (pthread_mutex_t *)calloc(1, sizeof(pthread_mutex_t)); server.sv_jobstates_mutex = (pthread_mutex_t *)calloc(1, sizeof(pthread_mutex_t)); pthread_mutex_init(server.sv_qs_mutex,NULL); pthread_mutex_init(server.sv_attr_mutex,NULL); pthread_mutex_init(server.sv_jobstates_mutex,NULL); initialize_ruserok_mutex(); lock_init(); } /** * set_globals_from_environment * * Set the intial state of global variables based on * the program environment variables. 
*/ void set_globals_from_environment(void) { char *ptr; /* initialize service port numbers for self, Scheduler, and MOM */ if ((ptr = getenv("PBS_BATCH_SERVICE_PORT")) != NULL) { pbs_server_port_dis = (int)strtol(ptr, NULL, 10); } if ((ptr = getenv("PBS_SCHEDULER_SERVICE_PORT")) != NULL) { pbs_scheduler_port = (int)strtol(ptr, NULL, 10); } if ((ptr = getenv("PBS_MOM_SERVICE_PORT")) != NULL) { pbs_mom_port = (int)strtol(ptr, NULL, 10); } if ((ptr = getenv("PBS_MANAGER_SERVICE_PORT")) != NULL) { pbs_rm_port = (int)strtol(ptr, NULL, 10); } if ((plogenv = getenv("PBSLOGLEVEL")) != NULL) { /* Note the plogenv is global and is tested in main_loop */ LOGLEVEL = (int)strtol(plogenv, NULL, 10); } if (getenv("PBSDEBUG") != NULL) { DEBUGMODE = 1; TDoBackground = 0; } return; } /* END set_globals_from_environment() */ /* * main - the initialization and main loop of pbs_daemon */ int main( int argc, /* I */ char *argv[]) /* I */ { int i; int local_errno = 0; char lockfile[MAXPATHLEN + 1]; char *pc = NULL; char EMsg[MAX_LINE]; char tmpLine[MAX_LINE]; char log_buf[LOCAL_LOG_BUF_SIZE]; unsigned int server_name_file_port = 0; extern char pbs_server_name[]; extern char *msg_svrdown; /* log message */ extern char *msg_startup1; /* log message */ FILE *fp; /* populate a variable useful when analyzing a core file */ memset(Torque_Info_SysVersion, 0, sizeof(Torque_Info_SysVersion)); if ((fp = fopen("/proc/version", "r")) != NULL) { fread(Torque_Info_SysVersion, sizeof(Torque_Info_SysVersion), 1, fp); fclose(fp); } ProgName = argv[0]; srand(get_random_number()); tzset(); /* localtime_r needs this */ memset(&server, 0, sizeof(struct server)); log_init(NULL, NULL); initialize_globals(); set_globals_from_environment(); /* set standard umask */ umask(022); /* save argv and the path for later use */ save_args(argc, argv); /* close files for security purposes */ /* do the following before getuid() and geteuid() can trigger nss_ldap into opening a socket to the LDAP server */ i = 
sysconf(_SC_OPEN_MAX); while (--i > 2) close(i); /* close any file desc left open by parent */ /* find out the name of this machine (hostname) */ EMsg[0] = '\0'; if ((gethostname(server_host, PBS_MAXHOSTNAME) == -1) || (get_fullhostname(server_host, server_host, PBS_MAXHOSTNAME, EMsg) == -1)) { snprintf(tmpLine, sizeof(tmpLine), "unable to determine local server hostname %c %s", EMsg[0] ? '-' : ' ', EMsg); log_err(-1, "pbsd_main", tmpLine); exit(1); /* FAILURE - shutdown */ } snprintf(server_name, PBS_MAXHOSTNAME, "%s", server_host); /* by default server = host */ pbs_server_addr = get_hostaddr(&local_errno, server_host); pbs_mom_addr = pbs_server_addr; /* assume on same host */ pbs_scheduler_addr = pbs_server_addr; /* assume on same host */ get_port_from_server_name_file(&server_name_file_port); if (server_name_file_port != 0) pbs_server_port_dis = server_name_file_port; strcpy(pbs_server_name, server_name); /* The following port numbers might have been initialized in set_globals_from_environment() above. 
*/ if (pbs_server_port_dis <= 0) pbs_server_port_dis = get_svrport((char *)PBS_BATCH_SERVICE_NAME, (char *)"tcp", PBS_BATCH_SERVICE_PORT); if (pbs_scheduler_port <= 0) pbs_scheduler_port = get_svrport((char *)PBS_SCHEDULER_SERVICE_NAME, (char *)"tcp", PBS_SCHEDULER_SERVICE_PORT); if (pbs_mom_port <= 0) pbs_mom_port = get_svrport((char *)PBS_MOM_SERVICE_NAME, (char *)"tcp", PBS_MOM_SERVICE_PORT); if (pbs_rm_port <= 0) pbs_rm_port = get_svrport((char *)PBS_MANAGER_SERVICE_NAME, (char *)"tcp", PBS_MANAGER_SERVICE_PORT); parse_command_line(argc, argv); /* if we are not running with real and effective uid of 0, forget it */ if (IamRoot() == 0) { return(1); } if ((check_network_port(pbs_server_port_dis) != PBSE_NONE) && (!high_availability_mode)) { perror("pbs_server port already bound"); exit(3); } /* With multi-threaded TORQUE we need to ask confirmation for RECOV_CREATE and RECOV_COLD before we daemonize the server */ if (server_init_type == RECOV_CREATE || server_init_type == RECOV_COLD) { if (ForceCreation == FALSE) { need_y_response(server_init_type); /* If we made it to here the response from user was yes. Otherwise we abort in need_y_response */ } } /* * Read in server attributes so they are available to be used * Attributes will not be read in on a pbs_server -t create */ if (recov_svr_attr(server_init_type) == -1) { fprintf(stderr,"%s: failed to get server attributes\n", ProgName); return(1); } /* * make sure no other server is running with this home directory. * If server lockfile pbs_attribute has been set use it. 
* If not use default location for it */ pthread_mutex_lock(server.sv_attr_mutex); if ((server.sv_attr[SRV_ATR_lockfile].at_flags & ATR_VFLAG_SET) && (server.sv_attr[SRV_ATR_lockfile].at_val.at_str)) { char *LockfilePtr = server.sv_attr[SRV_ATR_lockfile].at_val.at_str; /* check if an absolute path is specified or not */ if (LockfilePtr[0] == '/') { snprintf(lockfile,sizeof(lockfile),"%s",LockfilePtr); } else { snprintf(lockfile,sizeof(lockfile),"%s/%s/%s", path_home, PBS_SVR_PRIVATE, LockfilePtr); } } else { snprintf(lockfile,sizeof(lockfile),"%s/%s/server.lock", path_home, PBS_SVR_PRIVATE); } /* HA EVENTS MUST HAPPEN HERE */ snprintf(HALockFile,MAXPATHLEN,"%s", lockfile); HALockCheckTime = server.sv_attr[SRV_ATR_LockfileCheckTime].at_val.at_long; HALockUpdateTime = server.sv_attr[SRV_ATR_LockfileUpdateTime].at_val.at_long; pthread_mutex_unlock(server.sv_attr_mutex); /* apply HA defaults */ if (HALockCheckTime == 0) HALockCheckTime = PBS_LOCKFILE_CHECK_TIME; if (HALockUpdateTime == 0) HALockUpdateTime = PBS_LOCKFILE_UPDATE_TIME; if ((pc = getenv("PBSDEBUG")) != NULL) { DEBUGMODE = 1; TDoBackground = 0; } #ifdef DISABLE_DAEMONS TDoBackground = 0; #endif /* handle running in the background or not if we're debugging */ if (high_availability_mode) { if (daemonize_server(TDoBackground,&sid) == FAILURE) { exit(2); } } if (high_availability_mode) { lock_out_ha(); } else { if ((lockfds = open(lockfile,O_CREAT|O_TRUNC|O_WRONLY,0600)) < 0) { sprintf(log_buf,"%s: unable to open lock file '%s'", msg_daemonname, lockfile); fprintf(stderr,"%s\n",log_buf); log_err(errno,msg_daemonname,log_buf); exit(2); } } /* handle running in the background or not if we're debugging */ if (!high_availability_mode) { if (daemonize_server(TDoBackground,&sid) == FAILURE) { exit(2); } } sprintf(log_buf, "%ld\n", (long)sid); if (!high_availability_mode) { if (write_ac_socket(lockfds, log_buf, strlen(log_buf)) != (ssize_t)strlen(log_buf)) { log_err(errno, msg_daemonname, (char *)"failed to write pid 
to lockfile"); exit(-1); } } /* * Open the log file so we can start recording events * * set log_event_mask to point to the log_event pbs_attribute value so * it controls which events are logged. */ pthread_mutex_lock(server.sv_attr_mutex); log_event_mask = &server.sv_attr[SRV_ATR_log_events].at_val.at_long; pthread_mutex_unlock(server.sv_attr_mutex); sprintf(path_log, "%s/%s", path_home, PBS_LOGFILES); pthread_mutex_lock(&log_mutex); log_open(log_file, path_log); pthread_mutex_unlock(&log_mutex); sprintf(log_buf, msg_startup1, server_name, server_init_type); log_event( PBSEVENT_SYSTEM | PBSEVENT_ADMIN | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buf); set_localhost_name(server_localhost, localhost_len); /* initialize the server objects and perform specified recovery */ /* will be left in the server's private directory */ /* NOTE: env cleared in pbsd_init() */ if (pbsd_init(server_init_type) != PBSE_NONE) { log_err(-1, msg_daemonname, (char *)"pbsd_init failed"); exit(3); } /* initialize the network interface */ sprintf(log_buf, "Using ports Server:%d Scheduler:%d MOM:%d (server: '%s')", pbs_server_port_dis, pbs_scheduler_port, pbs_mom_port, server_host); log_event( PBSEVENT_SYSTEM | PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buf); if (init_network(0, start_process_pbs_server_port) != 0) { perror("pbs_server: unix domain socket"); log_err(-1, msg_daemonname, (char *)"init_network failed unix domain socket"); exit(3); } /* poll_job_task uses a mutex to protect a counter that prevents the number of poll job tasks from consuming all available threads */ poll_job_task_mutex = (pthread_mutex_t *)calloc(1, sizeof(pthread_mutex_t)); if (poll_job_task_mutex == NULL) { perror("pbs_server: failed to initialize poll_job_task_mutex"); log_err(-1, msg_daemonname, (char *)"pbs_server: failed to initialize poll_job_task_mutex"); exit(3); } pthread_mutex_init(poll_job_task_mutex, NULL); max_poll_job_tasks = (int)(request_pool->tp_max_threads * 0.7) 
- 5; if (max_poll_job_tasks <= 0) max_poll_job_tasks = 1; #if (PLOCK_DAEMONS & 1) plock(PROCLOCK); #endif /*==========*/ main_loop(); /*==========*/ shutdown_ack(); net_close(-1); /* close all network connections */ #ifdef ENABLE_UNIX_SOCKETS unlink(TSOCK_PATH); #endif /* END ENABLE_UNIX_SOCKETS */ log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, msg_svrdown); acct_close(); pthread_mutex_lock(&log_mutex); log_close(1); pthread_mutex_unlock(&log_mutex); pthread_mutex_lock(&job_log_mutex); job_log_close(1); pthread_mutex_unlock(&job_log_mutex); exit(0); } /* END main() */ void check_job_log( struct work_task *ptask) /* I */ { long depth = 1; char log_buf[LOCAL_LOG_BUF_SIZE]; time_t time_now = time(NULL); long keep_days = 0; long max_size = 0; long roll_depth = -1; /* remove logs older than LogKeepDays */ get_svr_attr_l(SRV_ATR_JobLogKeepDays, &keep_days); get_svr_attr_l(SRV_ATR_JobLogFileMaxSize, &max_size); if (keep_days != 0) { snprintf(log_buf,sizeof(log_buf),"checking for old job logs in dir '%s' (older than %ld days)", path_svrlog, keep_days); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buf); if (log_remove_old(path_jobinfo_log, keep_days * SECS_PER_DAY) != 0) { log_err(-1,"check_job_log",(char *)"failure occurred when checking for old job logs"); } } if (max_size != 0) { if ((job_log_size() >= max_size) && (max_size > 0)) { get_svr_attr_l(SRV_ATR_JobLogFileRollDepth, &roll_depth); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, (char *)"Rolling job log file"); if (roll_depth != -1) { depth = roll_depth; } if ((depth >= INT_MAX) || (depth < 1)) { log_err(-1, "check_job_log", (char *)"job log roll cancelled, logfile depth is out of range"); } else { job_log_roll(depth); } } } free(ptask->wt_mutex); free(ptask); set_task(WORK_Timed, time_now + PBS_LOG_CHECK_RATE, check_job_log, (char *)NULL, FALSE); } /* END check_job_log */ void check_log( struct 
work_task *ptask) /* I */ { long keep_days =0; long max_size = 0; char log_buf[LOCAL_LOG_BUF_SIZE]; time_t time_now = time(NULL); char *version = NULL; /* remove logs older than LogKeepDays */ if (get_svr_attr_l(SRV_ATR_LogKeepDays, &keep_days) == PBSE_NONE) { snprintf(log_buf,sizeof(log_buf),"checking for old pbs_server logs in dir '%s' (older than %ld days)", path_svrlog, keep_days); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buf); if (log_remove_old(path_svrlog, keep_days * SECS_PER_DAY) != 0) { log_err(-1,"check_log",(char *)"failure occurred when checking for old pbs_server logs"); } } if (get_svr_attr_l(SRV_ATR_LogFileMaxSize, &max_size) == PBSE_NONE) { long roll_depth = 1; if ((log_size() >= max_size) && (max_size > 0)) { log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, (char *)"Rolling log file"); get_svr_attr_l(SRV_ATR_LogFileRollDepth, &roll_depth); if ((roll_depth >= INT_MAX) || (roll_depth < 1)) { log_err(-1, "check_log", (char *)"log roll cancelled, logfile depth is out of range"); } else { log_roll(roll_depth); } } } /* periodically record the version and loglevel */ get_svr_attr_str(SRV_ATR_version, &version); sprintf(log_buf, msg_info_server, version, LOGLEVEL); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buf); free(ptask->wt_mutex); free(ptask); set_task(WORK_Timed, time_now + PBS_LOG_CHECK_RATE, check_log, (char *)NULL, FALSE); return; } /* END check_log */ void check_acct_log( struct work_task *ptask) /* I */ { char log_buf[LOCAL_LOG_BUF_SIZE]; time_t time_now = time(NULL); long keep_days = 0; if ((get_svr_attr_l(SRV_ATR_AcctKeepDays, &keep_days) == PBSE_NONE) && (keep_days >= 0)) { sprintf(log_buf,"Checking accounting files - keep days = %ld", keep_days); log_event( PBSEVENT_SYSTEM | PBSEVENT_FORCE, PBS_EVENTCLASS_SERVER, msg_daemonname, log_buf); acct_cleanup(keep_days); } free(ptask->wt_mutex); free(ptask); 
set_task(WORK_Timed,time_now + PBS_ACCT_CHECK_RATE,check_acct_log,(char *)NULL,FALSE); return; } /* END check_acct_log */ /* * get_port - parse host:port for -M and -S and -l option * Returns into *port and *addr if and only if that part is specified * Both port and addr are returned in HOST byte order. * Function return: 0=ok, -1=error */ static int get_port( char *arg, /* "host", "port", ":port", or "host:port" */ unsigned int *port, /* RETURN: new port iff one given */ pbs_net_t *addr) /* RETURN: daemon's address iff host given */ { char *name; int local_errno = 0; if (*arg == ':') { ++arg; /* port only specified */ *port = (unsigned int)atoi(arg); } else { char *a; for (a = arg; *a && isdigit(*a); a++) {} if ((! *a) && ((*port = (unsigned int)atoi(arg)) > 0) && (*port <= 65535)) { /* port only specified */ } else { name = parse_servername(arg, port); if (name == NULL) { /* FAILURE */ return(-1); } *addr = get_hostaddr(&local_errno, name); free(name); } } if ((*port <= 0) || (*addr == 0)) { /* FAILURE */ return(-1); } return(0); } /* END get_port() */ /** * This function will extract the directory portion of * the given path and copy it into the Dir parameter * * @param FullPath (I) * @param Dir (O) * @param DirSize */ char *extract_dir( char *FullPath, char *Dir, int DirSize) { char *ptr; if ((FullPath == NULL) || (Dir == NULL)) { return(NULL); } Dir[0] = '\0'; snprintf(Dir,DirSize,"%s",FullPath); ptr = strrchr(Dir,'/'); if (ptr != NULL) { *ptr = '\0'; } return(Dir); } /* END extract_dir() */ /** * Checks to see if HALockFile is defined, exists, and can be properly * opened, etc. 
* * @return TRUE if HALockFile is valid for use */ int is_ha_lock_file_valid( char *lockfile) { char LockDir[MAX_PATH_LEN]; struct stat Stat; bool_t GoodPermissions = FALSE; char log_buf[LOCAL_LOG_BUF_SIZE]; if (HALockFile[0] == '\0') { return(FALSE); } extract_dir(HALockFile,LockDir,sizeof(LockDir)); if (stat(LockDir,&Stat) != 0) { char *tmpLine; /* stat failed */ tmpLine = strerror(errno); snprintf(log_buf, sizeof(log_buf), "could not stat the lockfile dir '%s': %s", LockDir, tmpLine); log_err(errno, __func__, log_buf); return(FALSE); } /* directory must be owned by the TORQUE user and must have * read/write/exec permissions */ if ((Stat.st_uid == getuid()) && (Stat.st_mode & (S_IRUSR | S_IWUSR | S_IXUSR))) { /* we can write to this directory */ GoodPermissions = TRUE; } else if ((Stat.st_gid == getgid()) && (Stat.st_mode & (S_IRGRP | S_IWGRP | S_IXGRP))) { GoodPermissions = TRUE; } else if (Stat.st_mode & (S_IROTH | S_IWOTH | S_IXOTH)) { GoodPermissions = TRUE; } if (GoodPermissions == FALSE) { log_err(-1, __func__, "could not obtain the needed permissions for the lock file"); } return(GoodPermissions); } /* END is_ha_lock_file_valid() */ /** * Try to release a lock on the given file * * @param LockFile (I) Name of the file to unlock * @param LockFD (I) file descriptor to unlock [modified] * * @return SUCCESS if the lock is released, FAILURE otherwise. 
*/ int release_file_lock( char *Lockfile, int *LockFD) { struct flock flock; int fds; if ((Lockfile == NULL) || (LockFD == NULL)) { return(FAILURE); } if (ISEMPTYSTR(Lockfile)) { return(FAILURE); } if (*LockFD > 0) { fds = *LockFD; } else { fds = open(Lockfile,O_CREAT|O_TRUNC|O_WRONLY,0600); } if (fds < 0) { /* could not open lock file */ return(FAILURE); } flock.l_type = F_UNLCK; flock.l_whence = SEEK_SET; flock.l_start = 0; flock.l_len = 0; if (fcntl(fds,F_SETLK,&flock) != 0) { close(fds); return(FAILURE); } close(fds); *LockFD = 0; return(SUCCESS); } /* END release_file_lock() */ /** * Try to acquire a lock on the given file * * @param LockFile (I) the name of the file to lock. * @param LockFD (I/O) File descriptor for the lock file. * @param FileType (I) For logging (type of file lock) * * @return SUCCESS if the lock is acquired, FAILURE otherwise */ int acquire_file_lock( char *LockFile, int *LockFD, char *FileType) { struct flock flock; int fds; char log_buf[LOCAL_LOG_BUF_SIZE]; if ((LockFile == NULL) || (LockFD == NULL) || (FileType == NULL)) { return(FAILURE); } if (LockFile[0] == '\0') { sprintf(log_buf,"ALERT: empty %s lock filename\n", FileType); log_err(-1, __func__, log_buf); return(FAILURE); } fds = open(LockFile,O_CREAT|O_RDWR,0600); if (fds < 0) { /* could not open lock file */ sprintf(log_buf,"ALERT: could not open %s lock file '%s' (errno: %d:%s)\n", FileType, LockFile, errno, strerror(errno)); log_err(errno, __func__, log_buf); return(FAILURE); } flock.l_type = F_WRLCK; flock.l_whence = SEEK_SET; flock.l_start = 0; flock.l_len = 0; flock.l_pid = getpid(); if (fcntl(fds,F_SETLK,&flock) != 0) { close(fds); sprintf(log_buf,"ALERT could not create lock on file '%s' (errno: %d:%s)\n", LockFile, errno, strerror(errno)); log_err(errno, __func__, log_buf); return(FAILURE); } /* don't close file; closing would lose the lock */ *LockFD = fds; return(SUCCESS); } /* END acquire_file_lock() */ /** *"Touch" the lockfile so that if locks are failing * other 
processes can see we still have possession * of the file. Also, we need to check that we can still * acccess the file. * * @param Arg (I) * * @return FAILURE if we don't have possession of the lock file anymore */ void *update_ha_lock_thread( void *Arg) /* I */ { char EMsg[MAX_LINE]; int LocalErrno = 0; int rc = 0; struct stat statbuf; struct utimbuf timebuf; static long LastModifyTime = 0; char log_buf[LOCAL_LOG_BUF_SIZE]; if (ISEMPTYSTR(HALockFile)) { /* locking HA not enabled */ return(NULL); } EMsg[0] = '\0'; while (TRUE) { usleep(DEF_USPERSECOND * HALockUpdateTime); rc = 0; LocalErrno = 0; mutex_lock(&EUIDMutex); errno = 0; if (stat(HALockFile,&statbuf) == 0) { /* check to make sure that no other process has modified this file * since the last time we did */ if ((LastModifyTime > 0) && (LastModifyTime != statbuf.st_mtime)) { snprintf(EMsg,sizeof(EMsg),"update time changed unexpectedly"); rc = -1; } else { /* no one has touched this file since we last did--continue */ LastModifyTime = time(NULL); timebuf.actime = LastModifyTime; timebuf.modtime = LastModifyTime; errno = 0; rc = utime(HALockFile,&timebuf); LocalErrno = errno; } } else { LocalErrno = errno; rc = -1; snprintf(EMsg,sizeof(EMsg),"could not stat file"); } mutex_unlock(&EUIDMutex); /* NOTE: HALockFile is emptied out when we delete the file during shutdown */ if ((rc == -1) && !ISEMPTYSTR(HALockFile)) { char *ErrorString; /* error occurred--immediate shutdown needed */ if (LocalErrno != 0) { ErrorString = strerror(LocalErrno); sprintf(log_buf,"could not update HA lock file '%s' in heartbeat thread (%s - errno %d:%s)", HALockFile, EMsg, LocalErrno, ErrorString); log_err(LocalErrno, __func__, log_buf); } else { sprintf(log_buf,"could not update HA lock file '%s' in heartbeat thread (%s)", HALockFile, EMsg); log_err(-1, __func__, log_buf); } /* restart pbs_server */ svr_restart(); } } /* END while (TRUE) */ /* NOTREACHED */ return(NULL); } /* END update_ha_lock_thread() */ int 
start_update_ha_lock_thread() { pthread_t HALockThread; pthread_attr_t HALockThreadAttr; int rc; int fds; char smallBuf[MAX_LINE]; /* write the pid to the lockfile for correctness */ fds = open(HALockFile,O_TRUNC|O_WRONLY,0600); if (fds < 0) { log_err(-1, __func__, "Couldn't write the pid to the lockfile\n"); return(FAILURE); } snprintf(smallBuf,sizeof(smallBuf),"%ld\n",(long)sid); if (write_ac_socket(fds,smallBuf,strlen(smallBuf)) != (ssize_t)strlen(smallBuf)) { log_err(-1, __func__, "Couldn't write the pid to the lockfile\n"); close(fds); return(FAILURE); } /* we don't need an open handle on the lockfile, just correct update times */ close(fds); if ((rc = pthread_attr_init(&HALockThreadAttr)) != 0) { perror("pthread_attr_init failed. Could not start update ha lock thread"); log_err(-1, msg_daemonname,"pthread_attr_init failed. Could not start ha lock thread"); return FAILURE; } rc = pthread_create(&HALockThread,&HALockThreadAttr,update_ha_lock_thread,NULL); if (rc != 0) { /* error creating thread */ log_err(-1, __func__, "Could not create HA Lock Thread\n"); return(FAILURE); } log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, "HA Lock update thread is now created\n"); return(SUCCESS); } /* END start_update_ha_lock_thread() */ int mutex_lock( mutex_t *Mutex) /* I */ { if (pthread_mutex_lock(Mutex) != 0) { log_err(-1,"mutex_lock",(char *)"ALERT: cannot lock mutex!\n"); return(FAILURE); } return(SUCCESS); } /* END mutex_lock() */ int mutex_unlock( mutex_t *Mutex) /* I */ { if (pthread_mutex_unlock(Mutex) != 0) { log_err(-1,"mutex_unlock",(char *)"ALERT: cannot unlock mutex!\n"); return(FAILURE); } return(SUCCESS); } /* END mutex_unlock() */ /* * * lock_out_ha - lock out using moab style high availability * */ static void lock_out_ha() { bool_t UseFLock = TRUE; bool_t FilePossession = FALSE; bool_t FileIsMissing = FALSE; char MutexLockFile[MAX_NAME]; int MutexLockFD = -1; int NumChecks = 0; char log_buf[LOCAL_LOG_BUF_SIZE]; time_t time_now = time(NULL); 
struct stat StatBuf; snprintf(MutexLockFile,sizeof(MutexLockFile),"%s.mutex", HALockFile); while (!FilePossession) { if (NumChecks > 0) { usleep(DEF_USPERSECOND * HALockCheckTime); time_now = time(NULL); UseFLock = FALSE; if (MutexLockFD > 0) close(MutexLockFD); } NumChecks++; UseFLock = is_ha_lock_file_valid(HALockFile); if (UseFLock == TRUE) { /* try to get a filesystem lock on the "mutex" file */ while (acquire_file_lock(MutexLockFile,&MutexLockFD, (char *)"HA") == FAILURE) { sprintf(log_buf,"Could not acquire HA flock--trying again in 1 second\n"); usleep(DEF_USPERSECOND); } } /* check if file lock exists */ if (stat(HALockFile,&StatBuf) == 0) { /* file DOES exist--check time */ FileIsMissing = FALSE; if ((time_now - StatBuf.st_mtime) < HALockCheckTime) { /* someone else probably has the lock */ continue; } /* update the file to mark it as ours */ utime(HALockFile,NULL); FilePossession = TRUE; } else { /* file doesn't exist--wait required amount of time and check again */ if (FileIsMissing == FALSE) { FileIsMissing = TRUE; /* if we don't have a mutex to protect file creation * race conditions, we need to wait and check again: * otherwise we can safely create it immediately */ if (UseFLock == FALSE) continue; } /* this is not the first time the file has been missing--we are * probably safe to create it */ HALockFD = open(HALockFile,O_CREAT|O_EXCL|O_RDONLY,0600); if (HALockFD < 0) { sprintf(log_buf,"could not create HA lock file '%s'--errno %d:%s", HALockFile, errno, strerror(errno)); continue; } FilePossession = TRUE; } if (FilePossession == TRUE) { /* start heartbeat thread */ start_update_ha_lock_thread(); } if (UseFLock == TRUE) close(MutexLockFD); /* unlock file mutex */ } /* END while (!FilePossession) */ /* we have the file lock--go ahead and log this fact */ log_record( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, (char *)"high availability file lock obtained"); } /* END lock_out_ha() */ /** * daemonize_server() * figures out, based on the mode, 
whether or not to run in the background and does so * * @param DoBackground - (I) indicates whether or not we should run in the background * @param sid - (O) set to the correct pid * @return success unless we could not run in the background and we're supposed to */ static int daemonize_server( int DoBackground, /* I */ int *sid) /* O */ { int pid; FILE *dummyfile; if (!DoBackground) { /* handle foreground (i.e. debug mode) */ *sid = getpid(); setvbuf(stdout,NULL,_IOLBF,0); setvbuf(stderr,NULL,_IOLBF,0); return(SUCCESS); } /* run pbs_server in the background */ /* fork to disconnect from terminal */ if ((pid = fork()) == -1) { log_err(errno, __func__, "cannot fork into background"); return(FAILURE); } if (pid != 0) { /* exit if parent */ log_event( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, (char *)"INFO: parent is exiting"); exit(0); } /* NOTE: setsid() disconnects from controlling-terminal */ if ((*sid = setsid()) == -1) { log_err(errno, __func__, "Could not disconnect from controlling terminal"); return(FAILURE); } /* disconnect stdin,stdout,stderr */ fclose(stdin); fclose(stdout); fclose(stderr); dummyfile = fopen("/dev/null","r"); assert((dummyfile != 0) && (fileno(dummyfile) == 0)); dummyfile = fopen("/dev/null","w"); assert((dummyfile != 0) && (fileno(dummyfile) == 1)); dummyfile = fopen("/dev/null","w"); assert((dummyfile != 0) && (fileno(dummyfile) == 2)); if ((pid = fork()) == -1) { log_err(errno, __func__, "cannot fork into background"); return(FAILURE); } if (pid != 0) { /* exit if parent */ log_event( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, (char *)"INFO: parent is exiting"); exit(0); } /* update the sid (pid written to the lock file) so that * the correct pid is present */ *sid = getpid(); log_event( PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, (char *)"INFO: child process in background"); return(SUCCESS); } /* END daemonize_server() */ /** * * Restarts the pbs_server * */ int svr_restart() { int rc; char 
log_buf[LOCAL_LOG_BUF_SIZE]; /* shut down network connections */ net_close(-1); /* close all network connections */ sprintf(log_buf, "INFO: about to exec '%s'\n", ArgV[0]); log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, log_buf); pthread_mutex_lock(&log_mutex); log_close(1); pthread_mutex_unlock(&log_mutex); if ((rc = execv(ArgV[0],ArgV)) == -1) { /* exec failed */ exit(-10); } /* NOT REACHED */ exit(0); return(SUCCESS); } /* END svr_restart() */ /** * * restores this pbs_attribute to its default where supported/possible */ void restore_attr_default( pbs_attribute *attr) /* I */ { int index; index = (int)(attr - server.sv_attr); attr->at_flags &= ~ATR_VFLAG_SET; switch (index) { case SRV_ATR_log_events: server.sv_attr[SRV_ATR_log_events].at_val.at_long = PBSEVENT_MASK; break; case SRV_ATR_tcp_timeout: server.sv_attr[SRV_ATR_tcp_timeout].at_val.at_long = PBS_TCPTIMEOUT; break; case SRV_ATR_JobStatRate: server.sv_attr[SRV_ATR_JobStatRate].at_val.at_long = PBS_RESTAT_JOB; break; case SRV_ATR_PollJobs: server.sv_attr[SRV_ATR_PollJobs].at_val.at_long = PBS_POLLJOBS; break; case SRV_ATR_LogLevel: server.sv_attr[SRV_ATR_LogLevel].at_val.at_long = 0; break; default: /* should never get here, but if we do then reset the flags so the user knows * that the value hasn't been cleared */ attr->at_flags |= ATR_VFLAG_SET; break; } } /* END restore_attr_default() */ int lock_sv_qs_mutex(pthread_mutex_t *sv_qs_mutex, const char *msg_string) { int rc; char log_buf[LOCAL_LOG_BUF_SIZE + 1]; if (LOGLEVEL >= 7) { snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "%s: locking sv_qs_mutex", msg_string); log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, __func__, log_buf); } rc = pthread_mutex_lock(sv_qs_mutex); return(rc); } int unlock_sv_qs_mutex(pthread_mutex_t *sv_qs_mutex, const char *msg_string) { int rc; char log_buf[LOCAL_LOG_BUF_SIZE + 1]; if (LOGLEVEL >= 7) { snprintf(log_buf, LOCAL_LOG_BUF_SIZE, "%s: unlocking sv_qs_mutex", msg_string); log_event(PBSEVENT_SYSTEM, 
PBS_EVENTCLASS_SERVER, __func__, log_buf); } rc = pthread_mutex_unlock(sv_qs_mutex); return(rc); } /* END pbsd_main.c */