/*****************************************************************************\ * task_cray_aries.c - Library for task pre-launch and post_termination * functions on a Cray/Aries system ***************************************************************************** * Copyright (C) 2013 SchedMD LLC * Copyright 2013 Hewlett Packard Enterprise Development LP * * This file is part of Slurm, a resource management program. * For details, see . * Please also read the included file: DISCLAIMER. * * Slurm is free software; you can redistribute it and/or modify it under * the terms of the GNU General Public License as published by the Free * Software Foundation; either version 2 of the License, or (at your option) * any later version. * * In addition, as a special exception, the copyright holders give permission * to link the code of portions of this program with the OpenSSL library under * certain conditions as described in each individual source file, and * distribute linked combinations including the two. You must obey the GNU * General Public License in all respects for all of the code used other than * OpenSSL. If you modify file(s) with this exception, you may extend this * exception to your version of the file(s), but you are not obligated to do * so. If you do not wish to do so, delete this exception statement from your * version. If you delete this exception statement from all source files in * the program, then also delete it here. * * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License along * with Slurm; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. \*****************************************************************************/ #include "config.h" #define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #ifdef HAVE_NUMA # include #endif #include "slurm/slurm_errno.h" #include "src/common/slurm_xlator.h" #include "src/slurmd/slurmstepd/slurmstepd_job.h" #ifdef HAVE_NATIVE_CRAY # include "alpscomm_cn.h" #endif /* These are defined here so when we link with something other than * the slurmctld we will have these symbols defined. They will get * overwritten when linking with the slurmctld. */ #if defined (__APPLE__) extern slurm_conf_t slurm_conf __attribute__((weak_import)); #else slurm_conf_t slurm_conf; #endif /* * These variables are required by the generic plugin interface. If they * are not found in the plugin, the plugin loader will ignore it. * * plugin_name - a string giving a human-readable description of the * plugin. There is no maximum length, but the symbol must refer to * a valid string. * * plugin_type - a string suggesting the type of the plugin or its * applicability to a particular form of data or method of data handling. * If the low-level plugin API is used, the contents of this string are * unimportant and may be anything. Slurm uses the higher-level plugin * interface which requires this string to be of the form * * / * * where is a description of the intended application of * the plugin (e.g., "task" for task control) and is a description * of how this plugin satisfies that application. Slurm will only load * a task plugin if the plugin_type string has a prefix of "task/". * * plugin_version - an unsigned 32-bit integer containing the Slurm version * (major.minor.micro combined into a single number). */ const char plugin_name[] = "task Cray/Aries plugin"; const char plugin_type[] = "task/cray_aries"; const uint32_t plugin_version = SLURM_VERSION_NUMBER; #ifdef HAVE_NATIVE_CRAY #ifdef HAVE_NUMA // TODO: Remove this prototype once the prototype appears in numa.h. unsigned int numa_bitmask_weight(const struct bitmask *bmp); #endif static void _alpsc_debug(const char *file, int line, const char *func, int rc, int expected_rc, const char *alpsc_func, char *err_msg); static int _make_status_file(stepd_step_rec_t *job); static int _check_status_file(stepd_step_rec_t *job, stepd_step_task_info_t *task); static int _get_numa_nodes(char *path, int *cnt, int **numa_array); static int _get_cpu_masks(int num_numa_nodes, int32_t *numa_array, cpu_set_t **cpuMasks); static int _update_num_steps(int val); static int _step_prologue(void); static int _step_epilogue(void); static int track_status = 1; // A directory on the compute node where temporary files will be kept #define TASK_CRAY_RUN_DIR "/var/run/task_cray" // The spool directory used by libalpslli // If it doesn't exist, skip exit status recording #define LLI_SPOOL_DIR "/var/opt/cray/alps/spool" // Filename to write status information to // This file consists of job->node_tasks + 1 bytes. Each byte will // be either 1 or 0, indicating that that particular event has occured. // The first byte indicates the starting LLI message, and the next bytes // indicate the exiting LLI messages for each task #define LLI_STATUS_FILE LLI_SPOOL_DIR"/status%"PRIu64 // Size of buffer which is guaranteed to hold an LLI_STATUS_FILE #define LLI_STATUS_FILE_BUF_SIZE 128 // Offset within status file to write to, different for each task #define LLI_STATUS_OFFS_ENV "ALPS_LLI_STATUS_OFFSET" // Application rank environment variable for PMI #define ALPS_APP_PE_ENV "ALPS_APP_PE" // Environment variable telling PMI not to fork #define PMI_NO_FORK_ENV "PMI_NO_FORK" // Environment variable providing the apid using a common name #define ALPS_APP_ID_ENV "ALPS_APP_ID" // File containing the number of currently running Slurm steps #define NUM_STEPS_FILE TASK_CRAY_RUN_DIR"/slurm_num_steps" #define _ALPSC_DEBUG(f) _alpsc_debug(THIS_FILE, __LINE__, __func__, \ rc, 1, f, err_msg); #define CRAY_ERR(fmt, ...) error("(%s: %d: %s) "fmt, THIS_FILE, __LINE__, \ __func__, ##__VA_ARGS__); #endif /* * init() is called when the plugin is loaded, before any other functions * are called. Put global initialization here. */ extern int init (void) { debug("%s loaded.", plugin_name); char *task_cgroup = xstrstr(slurm_conf.task_plugin, "cgroup"); char *task_cray = xstrstr(slurm_conf.task_plugin, "cray"); if (!task_cgroup || !task_cray || (task_cgroup < task_cray)) { fatal("task/cgroup must be used with, and listed after, " "task/cray in TaskPlugin"); } #ifdef HAVE_NATIVE_CRAY int rc; struct stat st; // Create the run directory errno = 0; rc = mkdir(TASK_CRAY_RUN_DIR, 0755); if ((rc == -1) && (errno != EEXIST)) { CRAY_ERR("Couldn't create %s: %m", TASK_CRAY_RUN_DIR); return SLURM_ERROR; } // Determine whether to track app status with LLI rc = stat(LLI_SPOOL_DIR, &st); if (rc == -1) { debug("stat %s failed, disabling exit status tracking: %m", LLI_SPOOL_DIR); track_status = 0; } else { track_status = 1; } #endif return SLURM_SUCCESS; } /* * fini() is called when the plugin is removed. Clear any allocated * storage here. */ extern int fini (void) { return SLURM_SUCCESS; } /* * task_p_slurmd_batch_request() */ extern int task_p_slurmd_batch_request (batch_job_launch_msg_t *req) { debug("%s: %u", __func__, req->job_id); return SLURM_SUCCESS; } /* * task_p_slurmd_launch_request() */ extern int task_p_slurmd_launch_request (launch_tasks_request_msg_t *req, uint32_t node_id) { return SLURM_SUCCESS; } /* * task_p_slurmd_suspend_job() */ extern int task_p_slurmd_suspend_job (uint32_t job_id) { DEF_TIMERS; START_TIMER; debug("task_p_slurmd_suspend_job: %u", job_id); #ifdef HAVE_NATIVE_CRAY _step_epilogue(); #endif END_TIMER; if (slurm_conf.debug_flags & DEBUG_FLAG_TIME_CRAY) INFO_LINE("call took: %s", TIME_STR); return SLURM_SUCCESS; } /* * task_p_slurmd_resume_job() */ extern int task_p_slurmd_resume_job (uint32_t job_id) { DEF_TIMERS; START_TIMER; debug("task_p_slurmd_resume_job: %u", job_id); #ifdef HAVE_NATIVE_CRAY _step_prologue(); #endif END_TIMER; if (slurm_conf.debug_flags & DEBUG_FLAG_TIME_CRAY) INFO_LINE("call took: %s", TIME_STR); return SLURM_SUCCESS; } /* * task_p_pre_setuid() is called before setting the UID for the * user to launch his jobs. Use this to create the CPUSET directory * and set the owner appropriately. */ extern int task_p_pre_setuid (stepd_step_rec_t *job) { DEF_TIMERS; START_TIMER; debug("%s: %ps", __func__, &job->step_id); #ifdef HAVE_NATIVE_CRAY if (!job->batch) _step_prologue(); #endif END_TIMER; if (slurm_conf.debug_flags & DEBUG_FLAG_TIME_CRAY) INFO_LINE("call took: %s", TIME_STR); return SLURM_SUCCESS; } /* * task_p_pre_launch() is called prior to exec of application task. * It is followed by TaskProlog program (from slurm.conf) and * --task-prolog (from srun command line). */ extern int task_p_pre_launch (stepd_step_rec_t *job) { #ifdef HAVE_NATIVE_CRAY int rc; uint64_t apid; uint32_t jobid; uint32_t taskid; uint32_t offset = 0; DEF_TIMERS; START_TIMER; if (job->het_job_id && (job->het_job_id != NO_VAL)) jobid = job->het_job_id; else jobid = job->step_id.job_id; if (job->het_job_task_offset != NO_VAL) offset = job->het_job_task_offset; taskid = offset + job->task[job->envtp->localid]->gtid; apid = SLURM_ID_HASH(jobid, job->step_id.step_id); debug2("%s: %ps, apid %"PRIu64", task %u", __func__, &job->step_id, apid, taskid); /* * Send the rank to the application's PMI layer via an environment * variable. */ rc = env_array_overwrite_fmt(&job->env, ALPS_APP_PE_ENV, "%u", taskid); if (rc == 0) { CRAY_ERR("Failed to set env variable %s", ALPS_APP_PE_ENV); return SLURM_ERROR; } /* * Set the PMI_NO_FORK environment variable. */ rc = env_array_overwrite(&job->env, PMI_NO_FORK_ENV, "1"); if (rc == 0) { CRAY_ERR("Failed to set env variable %s", PMI_NO_FORK_ENV); return SLURM_ERROR; } /* * Notify the task which offset to use */ rc = env_array_overwrite_fmt(&job->env, LLI_STATUS_OFFS_ENV, "%d", job->envtp->localid + 1); if (rc == 0) { CRAY_ERR("Failed to set env variable %s", LLI_STATUS_OFFS_ENV); return SLURM_ERROR; } /* * Set the ALPS_APP_ID environment variable for use by * Cray tools. */ rc = env_array_overwrite_fmt(&job->env, ALPS_APP_ID_ENV, "%"PRIu64, apid); if (rc == 0) { CRAY_ERR("Failed to set env variable %s", ALPS_APP_ID_ENV); } END_TIMER; if (slurm_conf.debug_flags & DEBUG_FLAG_TIME_CRAY) INFO_LINE("call took: %s", TIME_STR); #endif return SLURM_SUCCESS; } /* * task_p_pre_launch_priv() is called prior to exec of application task. * in privileged mode, just after slurm_spank_task_init_privileged */ extern int task_p_pre_launch_priv(stepd_step_rec_t *job, pid_t pid) { int rc = SLURM_SUCCESS; DEF_TIMERS; START_TIMER; #ifdef HAVE_NATIVE_CRAY debug("%s: %ps", __func__, &job->step_id); if (track_status) { rc = _make_status_file(job); } #endif END_TIMER; if (slurm_conf.debug_flags & DEBUG_FLAG_TIME_CRAY) INFO_LINE("call took: %s", TIME_STR); return rc; } /* * task_term() is called after termination of application task. * It is preceded by --task-epilog (from srun command line) * followed by TaskEpilog program (from slurm.conf). */ extern int task_p_post_term (stepd_step_rec_t *job, stepd_step_task_info_t *task) { int rc = SLURM_SUCCESS; DEF_TIMERS; START_TIMER; #ifdef HAVE_NATIVE_CRAY debug("%s: %ps, task %d", __func__, &job->step_id, task->id); if (track_status) { rc = _check_status_file(job, task); } #endif END_TIMER; if (slurm_conf.debug_flags & DEBUG_FLAG_TIME_CRAY) INFO_LINE("call took: %s", TIME_STR); return rc; } /* * task_p_post_step() is called after termination of the step * (all the tasks) */ extern int task_p_post_step (stepd_step_rec_t *job) { #ifdef HAVE_NATIVE_CRAY char llifile[LLI_STATUS_FILE_BUF_SIZE]; int rc, cnt; char *err_msg = NULL, path[PATH_MAX]; int32_t *numa_nodes; cpu_set_t *cpuMasks; uint64_t apid; uint32_t jobid; DEF_TIMERS; START_TIMER; if (job->het_job_id && (job->het_job_id != NO_VAL)) jobid = job->het_job_id; else jobid = job->step_id.job_id; if (track_status) { apid = SLURM_ID_HASH(jobid, job->step_id.step_id); // Get the lli file name snprintf(llifile, sizeof(llifile), LLI_STATUS_FILE, apid); // Unlink the file errno = 0; rc = unlink(llifile); if ((rc == -1) && (errno != ENOENT)) { CRAY_ERR("unlink(%s) failed: %m", llifile); } else if (rc == 0) { debug("Unlinked %s", llifile); } // Unlink the backwards compatibility symlink if (apid != SLURM_ID_HASH_LEGACY(apid)) { snprintf(llifile, sizeof(llifile), LLI_STATUS_FILE, SLURM_ID_HASH_LEGACY(apid)); rc = unlink(llifile); if ((rc == -1) && (errno != ENOENT)) { CRAY_ERR("unlink(%s) failed: %m", llifile); } else if (rc == 0) { debug("Unlinked %s", llifile); } } } /* * Compact Memory * * Determine which NUMA nodes and CPUS an application is using. It will * be used to compact the memory. * * You'll find the information in the following location. * For a normal job step: * /dev/cpuset/slurm/uid_/job_/step_/ * * For a batch job step (only on the head node and only for batch jobs): * /dev/cpuset/slurm/uid_/job_/step_batch/ * * NUMA node: mems (or cpuset.mems) */ if (job->step_id.step_id == SLURM_BATCH_SCRIPT) { // Batch Job Step rc = snprintf(path, sizeof(path), "/dev/cpuset/slurm/uid_%d/job_%" PRIu32 "/step_batch", job->uid, jobid); if (rc < 0) { CRAY_ERR("snprintf failed. Return code: %d", rc); return SLURM_ERROR; } } else if (job->step_id.step_id == SLURM_EXTERN_CONT) { // Container for PAM to use for externally launched processes rc = snprintf(path, sizeof(path), "/dev/cpuset/slurm/uid_%d/job_%" PRIu32 "/step_extern", job->uid, jobid); if (rc < 0) { CRAY_ERR("snprintf failed. Return code: %d", rc); return SLURM_ERROR; } } else { // Normal Job Step /* Only run epilogue on non-batch steps */ _step_epilogue(); rc = snprintf(path, sizeof(path), "/dev/cpuset/slurm/uid_%d/job_%" PRIu32 "/step_%" PRIu32, job->uid, jobid, job->step_id.step_id); if (rc < 0) { CRAY_ERR("snprintf failed. Return code: %d", rc); return SLURM_ERROR; } } rc = _get_numa_nodes(path, &cnt, &numa_nodes); if (rc < 0) { /* Failure common due to race condition in releasing cgroups */ debug("%s: _get_numa_nodes failed. Return code: %d", __func__, rc); return SLURM_ERROR; } rc = _get_cpu_masks(cnt, numa_nodes, &cpuMasks); if (rc < 0) { CRAY_ERR("_get_cpu_masks failed. Return code: %d", rc); xfree(numa_nodes); return SLURM_ERROR; } /* * Compact Memory * The last argument which is a path to the cpuset directory has to be * NULL because the CPUSET directory has already been cleaned up. */ rc = alpsc_compact_mem(&err_msg, cnt, numa_nodes, cpuMasks, NULL); _ALPSC_DEBUG("alpsc_compact_mem"); xfree(numa_nodes); xfree(cpuMasks); if (rc != 1) return SLURM_ERROR; END_TIMER; if (slurm_conf.debug_flags & DEBUG_FLAG_TIME_CRAY) INFO_LINE("call took: %s", TIME_STR); #endif return SLURM_SUCCESS; } #ifdef HAVE_NATIVE_CRAY /* * Print the results of an alpscomm call */ static void _alpsc_debug(const char *file, int line, const char *func, int rc, int expected_rc, const char *alpsc_func, char *err_msg) { if (rc != expected_rc) { error("(%s: %d: %s) %s failed: %s", file, line, func, alpsc_func, err_msg ? err_msg : "No error message present"); } else if (err_msg) { info("%s: %s", alpsc_func, err_msg); } else log_flag(TASK, "Called %s", alpsc_func); free(err_msg); } /* * If it wasn't created already, make the LLI_STATUS_FILE with given owner * and group, permissions 644, with given size */ static int _make_status_file(stepd_step_rec_t *job) { char llifile[LLI_STATUS_FILE_BUF_SIZE]; char oldllifile[LLI_STATUS_FILE_BUF_SIZE]; int rv, fd; uint32_t jobid; uint64_t apid; if (job->het_job_id && (job->het_job_id != NO_VAL)) jobid = job->het_job_id; else jobid = job->step_id.job_id; apid = SLURM_ID_HASH(jobid, job->step_id.step_id); // Get the lli file name snprintf(llifile, sizeof(llifile), LLI_STATUS_FILE, apid); // Make the file errno = 0; fd = open(llifile, O_CREAT|O_EXCL|O_WRONLY, 0644); if (fd == -1) { // Another task_p_pre_launch_priv already created it, ignore if (errno == EEXIST) { return SLURM_SUCCESS; } CRAY_ERR("creat(%s) failed: %m", llifile); return SLURM_ERROR; } // Resize it rv = ftruncate(fd, job->node_tasks + 1); if (rv == -1) { CRAY_ERR("ftruncate(%s) failed: %m", llifile); TEMP_FAILURE_RETRY(close(fd)); return SLURM_ERROR; } // Change owner/group so app can write to it rv = fchown(fd, job->uid, job->gid); if (rv == -1) { CRAY_ERR("chown(%s) failed: %m", llifile); TEMP_FAILURE_RETRY(close(fd)); return SLURM_ERROR; } debug("Created file %s", llifile); TEMP_FAILURE_RETRY(close(fd)); // Create a backwards compatibility link if (apid != SLURM_ID_HASH_LEGACY(apid)) { snprintf(oldllifile, sizeof(oldllifile), LLI_STATUS_FILE, SLURM_ID_HASH_LEGACY(apid)); rv = symlink(llifile, oldllifile); if (rv == -1) { CRAY_ERR("symlink(%s, %s) failed: %m", llifile, oldllifile); return SLURM_ERROR; } } return SLURM_SUCCESS; } /* * Check the status file for the exit of the given local task id * and terminate the job step if an improper exit is found */ static int _check_status_file(stepd_step_rec_t *job, stepd_step_task_info_t *task) { char llifile[LLI_STATUS_FILE_BUF_SIZE]; char status; int rv, fd; uint32_t jobid; uint32_t taskid; uint32_t offset = 0; // We only need to special case termination with exit(0) // srun already handles abnormal exit conditions fine if (!WIFEXITED(task->estatus) || (WEXITSTATUS(task->estatus) != 0)) return SLURM_SUCCESS; if (job->het_job_id && (job->het_job_id != NO_VAL)) jobid = job->het_job_id; else jobid = job->step_id.job_id; if (job->het_job_task_offset != NO_VAL) offset = job->het_job_task_offset; taskid = offset + task->gtid; // Get the lli file name snprintf(llifile, sizeof(llifile), LLI_STATUS_FILE, SLURM_ID_HASH(jobid, job->step_id.step_id)); // Open the lli file. fd = open(llifile, O_RDONLY); if (fd == -1) { // There's a timing issue for large jobs; this file could // already be cleaned up by the time we get here. // However, this is during a normal cleanup so no big deal. debug("open(%s) failed: %m", llifile); return SLURM_SUCCESS; } // Read the first byte (indicates starting) rv = read(fd, &status, sizeof(status)); if (rv == -1) { CRAY_ERR("read failed: %m"); return SLURM_ERROR; } // If the first byte is 0, we either aren't an MPI app or // it didn't make it past pmi_init, in any case, return success if (status == 0) { TEMP_FAILURE_RETRY(close(fd)); return SLURM_SUCCESS; } // Seek to the correct offset rv = lseek(fd, task->id + 1, SEEK_SET); if (rv == -1) { CRAY_ERR("lseek failed: %m"); TEMP_FAILURE_RETRY(close(fd)); return SLURM_ERROR; } // Read the exiting byte rv = read(fd, &status, sizeof(status)); TEMP_FAILURE_RETRY(close(fd)); if (rv == -1) { CRAY_ERR("read failed: %m"); return SLURM_SUCCESS; } // Check the result if (status == 0) { if (task->killed_by_cmd) { // We've been killed by request. User already knows return SLURM_SUCCESS; } verbose("%ps task %u exited without calling PMI_Finalize()", &job->step_id, taskid); } return SLURM_SUCCESS; } /* * Function: _get_numa_nodes * Description: * Returns a count of the NUMA nodes that the application is running on. * * Returns an array of NUMA nodes that the application is running on. * * * IN char* path -- The path to the directory containing the files containing * information about NUMA nodes. * * OUT *cnt -- The number of NUMA nodes in the array * OUT **numa_array -- An integer array containing the NUMA nodes. * This array must be xfreed by the caller. * * RETURN * 0 on success and -1 on failure. */ static int _get_numa_nodes(char *path, int *cnt, int32_t **numa_array) { bool cpuset_prefix_set = true; char *cpuset_prefix = "cpuset."; struct bitmask *bm; int i, index, rc = 0; int lsz; size_t sz; char buffer[PATH_MAX]; FILE *f = NULL; char *lin = NULL; again: rc = snprintf(buffer, sizeof(buffer), "%s/%s%s", path, cpuset_prefix, "mems"); if (rc < 0) CRAY_ERR("snprintf failed. Return code: %d", rc); f = fopen(buffer, "r"); if (f == NULL) { if (cpuset_prefix_set) { cpuset_prefix_set = false; cpuset_prefix = ""; goto again; } /* Failure common due to race condition in releasing cgroups */ debug("%s: Failed to open file %s: %m", __func__, buffer); return SLURM_ERROR; } lsz = getline(&lin, &sz, f); if (lsz > 0) { if (lin[strlen(lin) - 1] == '\n') { lin[strlen(lin) - 1] = '\0'; } bm = numa_parse_nodestring(lin); if (bm == NULL) { CRAY_ERR("Error numa_parse_nodestring:" " Invalid node string: %s", lin); free(lin); return SLURM_ERROR; } } else { debug("%s: Reading %s failed", __func__, buffer); return SLURM_ERROR; } free(lin); *cnt = numa_bitmask_weight(bm); if (*cnt == 0) { CRAY_ERR("No NUMA Nodes found"); return -1; } log_flag(TASK, "Bitmask %#lx size: %lu sizeof(*(bm->maskp)): %zu weight: %u", *(bm->maskp), bm->size, sizeof(*(bm->maskp)), *cnt); *numa_array = xmalloc(*cnt * sizeof(int32_t)); index = 0; for (i = 0; i < bm->size; i++) { if (*(bm->maskp) & ((long unsigned) 1 << i)) { log_flag(TASK, "(%s: %d: %s) NUMA Node %d is present", THIS_FILE, __LINE__, __func__, i); (*numa_array)[index++] = i; } } numa_free_nodemask(bm); return 0; } /* * Function: _get_cpu_masks * Description: * * Returns cpuMasks which contains an array of a cpu_set_t cpumask one per * NUMA node id within the numaNodes array; the cpumask identifies * which CPUs are within that NUMA node. * * It does the following. * 0. Uses the cpuset.mems file to determine the total number of Numa Nodes * and their individual index numbers. * 1. Uses numa_node_to_cpus to get the bitmask of CPUs for each Numa Node. * 2. Obtains the bitmask of CPUs for the cpuset from the cpuset.cpus file. * 3. Bitwise-ANDs the bitmasks from steps #1 and #2 to obtain the CPUs * allowed per Numa Node bitmask. * * IN int num_numa_nodes -- Number of NUMA nodes in numa_array * IN int32_t *numa_array -- Array of NUMA nodes length num_numa_nodes * OUT cpu_set_t **cpuMasks -- An array of cpu_set_t's one per NUMA node * The caller must free *cpuMasks via xfree(). * RETURN * 0 on success and -1 on failure. */ #define NUM_INTS_TO_HOLD_ALL_CPUS \ (numa_all_cpus_ptr->size / (sizeof(unsigned long) * 8)) static int _get_cpu_masks(int num_numa_nodes, int32_t *numa_array, cpu_set_t **cpuMasks) { struct bitmask **remaining_numa_node_cpus = NULL, *collective; unsigned long **numa_node_cpus = NULL; int i, j, at_least_one_cpu = 0, rc = 0; cpu_set_t *cpusetptr; char *bitmask_str = NULL; if (numa_available()) { CRAY_ERR("Libnuma not available"); return -1; } /* * numa_node_cpus: The CPUs available to the NUMA node. * numa_all_cpus_ptr: all CPUs on which the calling task may execute. * remaining_numa_node_cpus: Bitwise-AND of the above two to get all of * the CPUs that the task can run on in this * NUMA node. * collective: Collects all of the CPUs as a precaution. */ remaining_numa_node_cpus = xmalloc(num_numa_nodes * sizeof(struct bitmask *)); collective = numa_allocate_cpumask(); numa_node_cpus = xmalloc(num_numa_nodes * sizeof(unsigned long*)); for (i = 0; i < num_numa_nodes; i++) { remaining_numa_node_cpus[i] = numa_allocate_cpumask(); numa_node_cpus[i] = xmalloc(sizeof(unsigned long) * NUM_INTS_TO_HOLD_ALL_CPUS); rc = numa_node_to_cpus(numa_array[i], numa_node_cpus[i], NUM_INTS_TO_HOLD_ALL_CPUS); if (rc) { CRAY_ERR("numa_node_to_cpus failed: Return code %d", rc); } for (j = 0; j < NUM_INTS_TO_HOLD_ALL_CPUS; j++) { (remaining_numa_node_cpus[i]->maskp[j]) = (numa_node_cpus[i][j]) & (numa_all_cpus_ptr->maskp[j]); collective->maskp[j] |= (remaining_numa_node_cpus[i]->maskp[j]); } } /* * Ensure that we have not masked off all of the CPUs. * If we have, just re-enable them all. Better to clear them all than * none of them. */ for (j = 0; j < collective->size; j++) { if (numa_bitmask_isbitset(collective, j)) { at_least_one_cpu = 1; } } if (!at_least_one_cpu) { for (i = 0; i < num_numa_nodes; i++) { for (j = 0; j < (remaining_numa_node_cpus[i]->size / (sizeof(unsigned long) * 8)); j++) { (remaining_numa_node_cpus[i]->maskp[j]) = (numa_all_cpus_ptr->maskp[j]); } } } if (slurm_conf.debug_flags & DEBUG_FLAG_TASK) { bitmask_str = NULL; for (i = 0; i < num_numa_nodes; i++) { for (j = 0; j < NUM_INTS_TO_HOLD_ALL_CPUS; j++) { xstrfmtcat(bitmask_str, "%6lx ", numa_node_cpus[i][j]); } } info("%sBitmask: Allowed CPUs for NUMA Node", bitmask_str); xfree(bitmask_str); bitmask_str = NULL; for (i = 0; i < num_numa_nodes; i++) { for (j = 0; j < NUM_INTS_TO_HOLD_ALL_CPUS; j++) { xstrfmtcat(bitmask_str, "%6lx ", numa_all_cpus_ptr->maskp[j]); } } info("%sBitmask: Allowed CPUs for cpuset", bitmask_str); xfree(bitmask_str); bitmask_str = NULL; for (i = 0; i < num_numa_nodes; i++) { for (j = 0; j < NUM_INTS_TO_HOLD_ALL_CPUS; j++) { xstrfmtcat(bitmask_str, "%6lx ", remaining_numa_node_cpus[i]-> maskp[j]); } } info("%sBitmask: Allowed CPUs between cpuset and NUMA Node", bitmask_str); xfree(bitmask_str); } // Convert bitmasks to cpu_set_t types cpusetptr = xmalloc(num_numa_nodes * sizeof(cpu_set_t)); for (i = 0; i < num_numa_nodes; i++) { CPU_ZERO(&cpusetptr[i]); for (j = 0; j < remaining_numa_node_cpus[i]->size; j++) { if (numa_bitmask_isbitset(remaining_numa_node_cpus[i], j)) { CPU_SET(j, &cpusetptr[i]); } } log_flag(TASK, "CPU_COUNT() of set: %d", CPU_COUNT(&cpusetptr[i])); } *cpuMasks = cpusetptr; // Freeing Everything numa_free_cpumask(collective); for (i = 0; i < num_numa_nodes; i++) { xfree(numa_node_cpus[i]); numa_free_cpumask(remaining_numa_node_cpus[i]); } xfree(numa_node_cpus); xfree(numa_node_cpus); xfree(remaining_numa_node_cpus); return 0; } /* * Update the number of running steps on the node * Set val to 1 to increment and -1 to decrement the value * Returns the new value, or -1 on error */ static int _update_num_steps(int val) { int rc, fd, num_steps = 0; ssize_t size; off_t offset; struct flock lock; // Sanity check the argument if (val != 1 && val != -1) { CRAY_ERR("invalid val %d", val); return -1; } // Open the file fd = open(NUM_STEPS_FILE, O_RDWR | O_CREAT, 0644); if (fd == -1) { CRAY_ERR("open failed: %m"); return -1; } // Exclusive lock on the first byte of the file // Automatically released when the file descriptor is closed lock.l_type = F_WRLCK; lock.l_whence = SEEK_SET; lock.l_start = 0; lock.l_len = sizeof(int); rc = fcntl(fd, F_SETLKW, &lock); if (rc == -1) { CRAY_ERR("fcntl failed: %m"); TEMP_FAILURE_RETRY(close(fd)); return -1; } // Read the value size = read(fd, &num_steps, sizeof(int)); if (size == -1) { CRAY_ERR("read failed: %m"); TEMP_FAILURE_RETRY(close(fd)); return -1; } else if (size == 0) { // Value doesn't exist, must be the first step num_steps = 0; } // Increment or decrement and check result num_steps += val; if (num_steps < 0) { CRAY_ERR("Invalid step count (%d) on the node", num_steps); TEMP_FAILURE_RETRY(close(fd)); return 0; } // Write the new value offset = lseek(fd, 0, SEEK_SET); if (offset == -1) { CRAY_ERR("fseek failed: %m"); TEMP_FAILURE_RETRY(close(fd)); return -1; } size = write(fd, &num_steps, sizeof(int)); if (size < sizeof(int)) { CRAY_ERR("write failed: %m"); TEMP_FAILURE_RETRY(close(fd)); return -1; } log_flag(TASK, "Wrote %d steps to %s", num_steps, NUM_STEPS_FILE); TEMP_FAILURE_RETRY(close(fd)); return num_steps; } /* * Runs Cray-specific step prologue commands * Returns SLURM_ERROR or SLURM_SUCCESS */ static int _step_prologue(void) { int num_steps, rc; char *err_msg; num_steps = _update_num_steps(1); if (num_steps == -1) { return SLURM_ERROR; } rc = alpsc_node_app_prologue(&err_msg); _ALPSC_DEBUG("alpsc_node_app_prologue"); if (rc != 1) { return SLURM_ERROR; } return SLURM_SUCCESS; } /* * Runs Cray-specific step epilogue commands * Returns SLURM_ERROR or SLURM_SUCCESS */ static int _step_epilogue(void) { int num_steps, rc; char *err_msg; // Note the step is done num_steps = _update_num_steps(-1); if (num_steps == -1) { return SLURM_ERROR; } // If we're the last step, run the app epilogue if (num_steps == 0) { rc = alpsc_node_app_epilogue(&err_msg); _ALPSC_DEBUG("alpsc_node_app_epilogue"); if (rc != 1) { return SLURM_ERROR; } } else log_flag(TASK, "Skipping epilogue, %d other steps running", num_steps); return SLURM_SUCCESS; } #endif /* * Keep track a of a pid. */ extern int task_p_add_pid (pid_t pid) { return SLURM_SUCCESS; }