/*****************************************************************************\ ** pmi2.c - PMI2 client(task) command handling ***************************************************************************** * Copyright (C) 2011-2012 National University of Defense Technology. * Written by Hongjia Cao . * All rights reserved. * * This file is part of Slurm, a resource management program. * For details, see . * Please also read the included file: DISCLAIMER. * * Slurm is free software; you can redistribute it and/or modify it under * the terms of the GNU General Public License as published by the Free * Software Foundation; either version 2 of the License, or (at your option) * any later version. * * In addition, as a special exception, the copyright holders give permission * to link the code of portions of this program with the OpenSSL library under * certain conditions as described in each individual source file, and * distribute linked combinations including the two. You must obey the GNU * General Public License in all respects for all of the code used other than * OpenSSL. If you modify file(s) with this exception, you may extend this * exception to your version of the file(s), but you are not obligated to do * so. If you do not wish to do so, delete this exception statement from your * version. If you delete this exception statement from all source files in * the program, then also delete it here. * * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License along * with Slurm; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. \*****************************************************************************/ #if defined(__FreeBSD__) #include /* AF_INET */ #endif #include #include #include #include #include "src/common/slurm_xlator.h" #include "src/common/log.h" #include "pmi.h" #include "client.h" #include "spawn.h" #include "kvs.h" #include "info.h" #include "setup.h" #include "agent.h" #include "nameserv.h" #include "ring.h" /* PMI2 command handlers */ static int _handle_fullinit(int fd, int lrank, client_req_t *req); static int _handle_finalize(int fd, int lrank, client_req_t *req); static int _handle_abort(int fd, int lrank, client_req_t *req); static int _handle_job_getid(int fd, int lrank, client_req_t *req); static int _handle_job_connect(int fd, int lrank, client_req_t *req); static int _handle_job_disconnect(int fd, int lrank, client_req_t *req); static int _handle_ring(int fd, int lrank, client_req_t *req); static int _handle_kvs_put(int fd, int lrank, client_req_t *req); static int _handle_kvs_fence(int fd, int lrank, client_req_t *req); static int _handle_kvs_get(int fd, int lrank, client_req_t *req); static int _handle_info_getnodeattr(int fd, int lrank, client_req_t *req); static int _handle_info_putnodeattr(int fd, int lrank, client_req_t *req); static int _handle_info_getjobattr(int fd, int lrank, client_req_t *req); static int _handle_name_publish(int fd, int lrank, client_req_t *req); static int _handle_name_unpublish(int fd, int lrank, client_req_t *req); static int _handle_name_lookup(int fd, int lrank, client_req_t *req); static int _handle_spawn(int fd, int lrank, client_req_t *req); static struct { char *cmd; int (*handler)(int fd, int lrank, client_req_t *req); } pmi2_cmd_handlers[] = { { FULLINIT_CMD, _handle_fullinit }, { FINALIZE_CMD, _handle_finalize }, { ABORT_CMD, _handle_abort }, { JOBGETID_CMD, _handle_job_getid }, { JOBCONNECT_CMD, _handle_job_connect }, { JOBDISCONNECT_CMD, _handle_job_disconnect }, { RING_CMD, _handle_ring }, { KVSPUT_CMD, _handle_kvs_put }, { KVSFENCE_CMD, _handle_kvs_fence }, { KVSGET_CMD, _handle_kvs_get }, { GETNODEATTR_CMD, _handle_info_getnodeattr }, { PUTNODEATTR_CMD, _handle_info_putnodeattr }, { GETJOBATTR_CMD, _handle_info_getjobattr }, { NAMEPUBLISH_CMD, _handle_name_publish }, { NAMEUNPUBLISH_CMD, _handle_name_unpublish }, { NAMELOOKUP_CMD, _handle_name_lookup }, { SPAWN_CMD, _handle_spawn }, { NULL, NULL}, }; static int _handle_fullinit(int fd, int lrank, client_req_t *req) { int pmi_jobid, pmi_rank; bool threaded; int found, rc = PMI2_SUCCESS; client_resp_t *resp; debug3("mpi/pmi2: _handle_fullinit"); client_req_parse_body(req); found = client_req_get_int(req, PMIJOBID_KEY, &pmi_jobid); if (! found) { error(PMIJOBID_KEY" missing in fullinit command"); rc = PMI2_ERR_INVALID_ARG; goto response; } found = client_req_get_int(req, PMIRANK_KEY, &pmi_rank); if (! found) { error(PMIRANK_KEY" missing in fullinit command"); rc = PMI2_ERR_INVALID_ARG; goto response; } found = client_req_get_bool(req, THREADED_KEY, &threaded); if (! found) { error(THREADED_KEY" missing in fullinit command"); rc = PMI2_ERR_INVALID_ARG; goto response; } /* TODO: use threaded */ response: resp = client_resp_new(); /* what's the difference between DEBUGGED and VERBOSE? */ /* TODO: APPNUM */ client_resp_append(resp, CMD_KEY"="FULLINITRESP_CMD";" RC_KEY"=%d;" PMIVERSION_KEY"=%d;" PMISUBVER_KEY"=%d;" RANK_KEY"=%d;" SIZE_KEY"=%d;" APPNUM_KEY"=-1;" DEBUGGED_KEY"="FALSE_VAL";" PMIVERBOSE_KEY"=%s;", rc, PMI20_VERSION, PMI20_SUBVERSION, job_info.gtids[lrank], job_info.ntasks, (job_info.pmi_debugged ? TRUE_VAL : FALSE_VAL)); if (job_info.spawner_jobid) { client_resp_append(resp, SPAWNERJOBID_KEY"=%s;", job_info.spawner_jobid); } rc = client_resp_send(resp, fd); client_resp_free(resp); debug3("mpi/pmi2: fullinit done"); return rc; } static int _handle_finalize(int fd, int lrank, client_req_t *req) { client_resp_t *resp; int rc = 0; resp = client_resp_new(); client_resp_append(resp, CMD_KEY"="FINALIZERESP_CMD";" RC_KEY"=%d;", rc); rc = client_resp_send(resp, fd); client_resp_free(resp); /* shutdown the PMI fd */ shutdown(fd, SHUT_RDWR); close(fd); task_finalize(lrank); return rc; } static int _handle_abort(int fd, int lrank, client_req_t *req) { int rc = SLURM_SUCCESS; bool is_world = false; debug3("mpi/pmi2: in _handle_abort"); client_req_parse_body(req); client_req_get_bool(req, ISWORLD_KEY, &is_world); /* no response needed. just cancel the job step if required */ if (is_world) { slurm_kill_job_step(job_info.step_id.job_id, job_info.step_id.step_id, SIGKILL); } return rc; } static int _handle_job_getid(int fd, int lrank, client_req_t *req) { int rc = SLURM_SUCCESS; client_resp_t *resp; debug3("mpi/pmi2: in _handle_job_getid"); resp = client_resp_new(); client_resp_append(resp, CMD_KEY"="JOBGETIDRESP_CMD";" RC_KEY"=0;" JOBID_KEY"=%s;", job_info.pmi_jobid); rc = client_resp_send(resp, fd); client_resp_free(resp); debug3("mpi/pmi2: out _handle_job_getid"); return rc; } static int _handle_job_connect(int fd, int lrank, client_req_t *req) { int rc = SLURM_SUCCESS; error("mpi/pmi2: job connect not implemented for now"); return rc; } static int _handle_job_disconnect(int fd, int lrank, client_req_t *req) { int rc = SLURM_SUCCESS; error("mpi/pmi2: job disconnect not implemented for now"); return rc; } static int _handle_ring(int fd, int lrank, client_req_t *req) { int rc = SLURM_SUCCESS; int count = 0; char *left = NULL; char *right = NULL; debug3("mpi/pmi2: in _handle_ring"); /* extract left, right, and count values from ring payload */ client_req_parse_body(req); client_req_get_int(req, RING_COUNT_KEY, &count); client_req_get_str(req, RING_LEFT_KEY, &left); client_req_get_str(req, RING_RIGHT_KEY, &right); /* compute ring_id, we list all application tasks first, * followed by stepds, so here we just use the application * process rank */ int ring_id = lrank; rc = pmix_ring_in(ring_id, count, left, right); xfree(left); xfree(right); /* the repsonse is sent back to client from the pmix_ring_out call */ debug3("mpi/pmi2: out _handle_ring"); return rc; } static int _handle_kvs_put(int fd, int lrank, client_req_t *req) { int rc = SLURM_SUCCESS; client_resp_t *resp; char *key = NULL, *val = NULL; debug3("mpi/pmi2: in _handle_kvs_put"); client_req_parse_body(req); client_req_get_str(req, KEY_KEY, &key); client_req_get_str(req, VALUE_KEY, &val); /* no need to add k-v to hash. just get it ready to be up-forward */ rc = temp_kvs_add(key, val); xfree(key); xfree(val); resp = client_resp_new(); client_resp_append(resp, CMD_KEY"="KVSPUTRESP_CMD";" RC_KEY"=%d;", rc); rc = client_resp_send(resp, fd); client_resp_free(resp); debug3("mpi/pmi2: out _handle_kvs_put"); return rc; } static int _handle_kvs_fence(int fd, int lrank, client_req_t *req) { int rc = 0; debug3("mpi/pmi2: in _handle_kvs_fence, from task %d", job_info.gtids[lrank]); if (tasks_to_wait == 0 && children_to_wait == 0) { tasks_to_wait = job_info.ltasks; children_to_wait = tree_info.num_children; } tasks_to_wait --; /* mutex protection is not required */ if (tasks_to_wait == 0 && children_to_wait == 0) { rc = temp_kvs_send(); if (rc != SLURM_SUCCESS) { error("mpi/pmi2: %d failed to send temp kvs to %s", __LINE__, tree_info.parent_node ?: "srun"); send_kvs_fence_resp_to_clients( rc, "mpi/pmi2: failed to send temp kvs"); /* cancel the step to avoid tasks hang */ slurm_kill_job_step(job_info.step_id.job_id, job_info.step_id.step_id, SIGKILL); } else { waiting_kvs_resp = 1; } } debug3("mpi/pmi2: out _handle_kvs_fence, tasks_to_wait=%d, " "children_to_wait=%d", tasks_to_wait, children_to_wait); return rc; } static int _handle_kvs_get(int fd, int lrank, client_req_t *req) { int rc; client_resp_t *resp; char *key = NULL, *val; debug3("mpi/pmi2: in _handle_kvs_get"); client_req_parse_body(req); client_req_get_str(req, KEY_KEY, &key); val = kvs_get(key); xfree(key); resp = client_resp_new(); if (val != NULL) { client_resp_append(resp, CMD_KEY"="KVSGETRESP_CMD";" RC_KEY"=0;" FOUND_KEY"="TRUE_VAL";" VALUE_KEY"=%s;", val); } else { client_resp_append(resp, CMD_KEY"="KVSGETRESP_CMD";" RC_KEY"=0;" FOUND_KEY"="FALSE_VAL";"); } rc = client_resp_send(resp, fd); client_resp_free(resp); debug3("mpi/pmi2: out _handle_kvs_get"); return rc; } static int _handle_info_getnodeattr(int fd, int lrank, client_req_t *req) { int rc = 0; client_resp_t *resp; char *key = NULL, *val; bool wait = false; debug3("mpi/pmi2: in _handle_info_getnodeattr from lrank %d", lrank); client_req_parse_body(req); client_req_get_str(req, KEY_KEY, &key); client_req_get_bool(req, WAIT_KEY, &wait); val = node_attr_get(key); if (val != NULL || (! wait)) { resp = client_resp_new(); client_resp_append(resp, CMD_KEY"="GETNODEATTRRESP_CMD";" RC_KEY"=0;" ); if (val == NULL) { client_resp_append(resp, FOUND_KEY"="FALSE_VAL";" ); } else { client_resp_append(resp, FOUND_KEY"="TRUE_VAL";" VALUE_KEY"=%s;", val); } rc = client_resp_send(resp, fd); client_resp_free(resp); } else { rc = enqueue_nag_req(fd, lrank, key); } xfree(key); debug3("mpi/pmi2: out _handle_info_getnodeattr"); return rc; } static int _handle_info_putnodeattr(int fd, int lrank, client_req_t *req) { char *key, *val; client_resp_t *resp; int rc = 0; debug3("mpi/pmi2: in _handle_info_putnodeattr"); client_req_parse_body(req); client_req_get_str(req, KEY_KEY, &key); client_req_get_str(req, VALUE_KEY, &val); rc = node_attr_put(key, val); xfree(key); xfree(val); resp = client_resp_new(); client_resp_append(resp, CMD_KEY"="PUTNODEATTRRESP_CMD";" RC_KEY"=%d;", rc); rc = client_resp_send(resp, fd); client_resp_free(resp); debug3("mpi/pmi2: out _handle_info_putnodeattr"); return rc; } static int _handle_info_getjobattr(int fd, int lrank, client_req_t *req) { char *key = NULL, *val; client_resp_t *resp; int rc; debug3("mpi/pmi2: in _handle_info_getjobattr"); client_req_parse_body(req); client_req_get_str(req, KEY_KEY, &key); val = job_attr_get(key); xfree(key); resp = client_resp_new(); client_resp_append(resp, CMD_KEY"="GETJOBATTRRESP_CMD";" RC_KEY"=0;"); if (val != NULL) { client_resp_append(resp, FOUND_KEY"="TRUE_VAL";" VALUE_KEY"=%s;", val); } else { client_resp_append(resp, FOUND_KEY"="FALSE_VAL";"); } rc = client_resp_send(resp, fd); client_resp_free(resp); debug3("mpi/pmi2: out _handle_info_getjobattr"); return rc; } static int _handle_name_publish(int fd, int lrank, client_req_t *req) { int rc; client_resp_t *resp; char *name = NULL, *port = NULL; debug3("mpi/pmi2: in _handle_publish_name"); client_req_parse_body(req); client_req_get_str(req, NAME_KEY, &name); client_req_get_str(req, PORT_KEY, &port); rc = name_publish_up(name, port); xfree(name); xfree(port); resp = client_resp_new(); client_resp_append(resp, CMD_KEY"="NAMEPUBLISHRESP_CMD";" RC_KEY"=%d;", rc); rc = client_resp_send(resp, fd); client_resp_free(resp); debug3("mpi/pmi2: out _handle_publish_name"); return rc; } static int _handle_name_unpublish(int fd, int lrank, client_req_t *req) { int rc; client_resp_t *resp; char *name = NULL; debug3("mpi/pmi2: in _handle_unpublish_name"); client_req_parse_body(req); client_req_get_str(req, NAME_KEY, &name); rc = name_unpublish_up(name); xfree(name); resp = client_resp_new(); client_resp_append(resp, CMD_KEY"="NAMEUNPUBLISHRESP_CMD";" RC_KEY"=%d;", rc); rc = client_resp_send(resp, fd); client_resp_free(resp); debug3("mpi/pmi2: out _handle_unpublish_name"); return rc; } static int _handle_name_lookup(int fd, int lrank, client_req_t *req) { int rc; client_resp_t *resp; char *name = NULL, *port = NULL; debug3("mpi/pmi2: in _handle_lookup_name"); client_req_parse_body(req); client_req_get_str(req, NAME_KEY, &name); port = name_lookup_up(name); resp = client_resp_new(); client_resp_append(resp, CMD_KEY"="NAMELOOKUPRESP_CMD";"); if (port == NULL) { client_resp_append(resp, RC_KEY"=1;"); } else { client_resp_append(resp, RC_KEY"=0;"VALUE_KEY"=%s;", port); } rc = client_resp_send(resp, fd); client_resp_free(resp); xfree(name); xfree(port); debug3("mpi/pmi2: out _handle_lookup_name"); return rc; } static int _handle_spawn(int fd, int lrank, client_req_t *req) { int rc; spawn_req_t *spawn_req = NULL; spawn_resp_t *spawn_resp = NULL; client_resp_t *task_resp; debug3("mpi/pmi2: in _handle_spawn"); client_req_parse_body(req); spawn_req = client_req_parse_spawn_req(req); if (spawn_req == NULL) { task_resp = client_resp_new(); client_resp_append(task_resp, CMD_KEY"="SPAWNRESP_CMD";" RC_KEY"=%d;" ERRMSG_KEY"=invalid command;", PMI2_ERR_INVALID_ARGS); client_resp_send(task_resp, fd); client_resp_free(task_resp); return SLURM_ERROR; } /* a resp will be send back from srun. * this will not be forwarded to the tasks */ rc = spawn_req_send_to_srun(spawn_req, &spawn_resp); if (spawn_resp->rc != SLURM_SUCCESS) { task_resp = client_resp_new(); client_resp_append(task_resp, CMD_KEY"="SPAWNRESP_CMD";" RC_KEY"=%d;" ERRMSG_KEY"=spawn failed;", spawn_resp->rc); client_resp_send(task_resp, fd); client_resp_free(task_resp); spawn_req_free(spawn_req); spawn_resp_free(spawn_resp); debug("mpi/pmi2: spawn failed"); return SLURM_ERROR; } debug3("mpi/pmi2: spawn request sent to srun"); spawn_psr_enqueue(spawn_resp->seq, fd, lrank, NULL); spawn_req_free(spawn_req); spawn_resp_free(spawn_resp); debug3("mpi/pmi2: out _handle_spawn"); return rc; } /**************************************************/ extern int handle_pmi2_cmd(int fd, int lrank) { int i, len; char len_buf[7], *buf = NULL; client_req_t *req = NULL; int rc = SLURM_SUCCESS; debug3("mpi/pmi2: in handle_pmi2_cmd"); safe_read(fd, len_buf, 6); len_buf[6] = '\0'; len = atoi(len_buf); buf = xmalloc(len + 1); safe_read(fd, buf, len); buf[len] = '\0'; debug2("mpi/pmi2: got client request: %s %s", len_buf, buf); if (!len) { /* * This is an invalid request. * * The most likely cause of an invalid client request is a * second PMI2_Init call from the client end. This arrives * first as a "cmd=init" call. Ideally, we'd capture that * request, and respond with "cmd=response_to_init" with the rc * field set to PMI2_ERR_INIT and expect the client to cleanup * and die correctly. * * However - Slurm's libpmi2 has historically ignored the rc * value and immediately sends the FULLINIT_CMD regardless, and * then waits for a response to that. Rather than construct * two successive error messages, this call will send back * "cmd=finalize-response" back that will trigger the desired * error handling paths, and then tears down the connection * for good measure. */ _handle_finalize(fd, 0, NULL); xfree(buf); return SLURM_ERROR; } req = client_req_init(len, buf); if (req == NULL) { error("mpi/pmi2: invalid client request"); return SLURM_ERROR; } i = 0; while (pmi2_cmd_handlers[i].cmd != NULL) { if (!xstrcmp(req->cmd, pmi2_cmd_handlers[i].cmd)) break; i ++; } if (pmi2_cmd_handlers[i].cmd == NULL) { error("mpi/pmi2: invalid pmi2 command received: '%s'", req->cmd); rc = SLURM_ERROR; } else { rc = pmi2_cmd_handlers[i].handler(fd, lrank, req); } client_req_free(req); debug3("mpi/pmi2: out handle_pmi2_cmd"); return rc; rwfail: xfree(buf); return SLURM_ERROR; }