/*****************************************************************************\
 *  msg_aggr.c - Message Aggregator for sending messages to the
 *               slurmctld, if a reply is expected this also will wait
 *               and get that reply when received.
 *****************************************************************************
 *  Copyright (C) 2015 Bull S. A. S.
 *		Bull, Rue Jean Jaures, B.P.68, 78340, Les Clayes-sous-Bois.
 *  Copyright (C) 2015 SchedMD LLC.
 *  Written by Martin Perry
 *             Danny Auble
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version. If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
\*****************************************************************************/
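/*
 * Overview: a collector node enables aggregation with msg_aggr_sender_init(),
 * which starts _msg_aggregation_sender() as the sender thread.  Messages
 * queued with msg_aggr_add_msg() or msg_aggr_add_comp() are collected into
 * windows and forwarded as a single MESSAGE_COMPOSITE RPC to the next
 * collector node (or the controller).  When the composite response arrives,
 * msg_aggr_resp() wakes any caller that asked to wait for its reply and
 * invokes its response callback.  msg_aggr_sender_fini() stops the sender
 * thread at shutdown.
 */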

#include <pthread.h>

#include "slurm/slurm.h"

#include "src/common/macros.h"
#include "src/common/msg_aggr.h"
#include "src/common/read_config.h"
#include "src/common/slurm_auth.h"
#include "src/common/slurm_route.h"
#include "src/common/slurm_protocol_interface.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/slurmd/slurmd/slurmd.h"

typedef struct {
	pthread_mutex_t	aggr_mutex;
	pthread_cond_t	cond;
	uint32_t	debug_flags;
	bool		max_msgs;
	uint64_t	max_msg_cnt;
	List		msg_aggr_list;
	List		msg_list;
	pthread_mutex_t	mutex;
	slurm_addr_t	node_addr;
	bool		running;
	pthread_t	thread_id;
	uint64_t	window;
} msg_collection_type_t;

typedef struct {
	uint16_t msg_index;
	void (*resp_callback) (slurm_msg_t *msg);
	pthread_cond_t wait_cond;
} msg_aggr_t;

/*
 * Message collection data & controls
 */
static msg_collection_type_t msg_collection;

static void _msg_aggr_free(void *x)
{
	msg_aggr_t *object = (msg_aggr_t *)x;

	if (object) {
		slurm_cond_destroy(&object->wait_cond);
		xfree(object);
	}
}

static msg_aggr_t *_handle_msg_aggr_ret(uint32_t msg_index, bool locked)
{
	msg_aggr_t *msg_aggr;
	ListIterator itr;

	if (!locked)
		slurm_mutex_lock(&msg_collection.aggr_mutex);

	itr = list_iterator_create(msg_collection.msg_aggr_list);
	while ((msg_aggr = list_next(itr))) {
		/* just remove them all */
		if (!msg_index) {
			/* make sure we don't wait any longer */
			slurm_cond_signal(&msg_aggr->wait_cond);
			list_remove(itr);
		} else if (msg_aggr->msg_index == msg_index) {
			list_remove(itr);
			break;
		}
	}
	list_iterator_destroy(itr);

	if (!locked)
		slurm_mutex_unlock(&msg_collection.aggr_mutex);

	return msg_aggr;
}

static int _send_to_backup_collector(slurm_msg_t *msg, int rc)
{
	slurm_addr_t *next_dest = NULL;

	if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE) {
		info("%s: primary %s, getting backup",
		     __func__, rc ? "can't be reached" : "is null");
	}

	if ((next_dest = route_g_next_collector_backup())) {
		int rc2 = SLURM_SUCCESS;

		if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE) {
			char addrbuf[100];
			slurm_print_slurm_addr(next_dest, addrbuf, 32);
			info("%s: *next_dest is %s", __func__, addrbuf);
		}
		memcpy(&msg->address, next_dest, sizeof(slurm_addr_t));
		rc = slurm_send_recv_rc_msg_only_one(msg, &rc2, 0);
		if (rc2 != SLURM_SUCCESS && !rc)
			rc = rc2;
	}

	if (!next_dest || (rc != SLURM_SUCCESS)) {
		int rc2 = SLURM_SUCCESS;

		if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE)
			info("%s: backup %s, sending msg to controller",
			     __func__, rc ? "can't be reached" : "is null");
		rc = slurm_send_recv_controller_rc_msg(msg, &rc2,
						       working_cluster_rec);
		if (rc2 != SLURM_SUCCESS && !rc)
			rc = rc2;
	}

	return rc;
}

/*
 * Send a msg to the next msg aggregation collector node.  If the primary
 * collector is unavailable or returns an error, try the backup collector.
 * If the backup collector is unavailable or returns an error, send the msg
 * directly to the controller.
 */
static int _send_to_next_collector(slurm_msg_t *msg)
{
	slurm_addr_t *next_dest = NULL;
	bool i_am_collector;
	int rc = SLURM_SUCCESS;
	int rc2 = SLURM_SUCCESS;

	if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE)
		info("msg aggr: send_to_next_collector: getting primary next "
		     "collector");
	if ((next_dest = route_g_next_collector(&i_am_collector))) {
		if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE) {
			char addrbuf[100];
			slurm_print_slurm_addr(next_dest, addrbuf, 32);
			info("msg aggr: send_to_next_collector: *next_dest is "
			     "%s", addrbuf);
		}
		memcpy(&msg->address, next_dest, sizeof(slurm_addr_t));
		rc = slurm_send_recv_rc_msg_only_one(msg, &rc2, 0);
		if (rc2 != SLURM_SUCCESS && !rc)
			rc = rc2;
	}

	if (!next_dest || (rc != SLURM_SUCCESS))
		rc = _send_to_backup_collector(msg, rc);

	return rc;
}

/*
 * _msg_aggregation_sender()
 *
 *  Start and terminate message collection windows.
 *  Send collected msgs to next collector node or final destination
 *  at window expiration.
 */
static void *_msg_aggregation_sender(void *arg)
{
	struct timeval now;
	struct timespec timeout;
	slurm_msg_t msg;
	composite_msg_t cmp;

	slurm_mutex_lock(&msg_collection.mutex);

	msg_collection.running = 1;
	slurm_cond_signal(&msg_collection.cond);

	while (msg_collection.running) {
		/* Wait for a new msg to be collected */
		slurm_cond_wait(&msg_collection.cond, &msg_collection.mutex);

		if (!msg_collection.running &&
		    !list_count(msg_collection.msg_list))
			break;

		/* A msg has been collected; start new window */
		gettimeofday(&now, NULL);
		timeout.tv_sec = now.tv_sec +
			(msg_collection.window / MSEC_IN_SEC);
		timeout.tv_nsec = (now.tv_usec * NSEC_IN_USEC) +
			(NSEC_IN_MSEC * (msg_collection.window % MSEC_IN_SEC));
		timeout.tv_sec += timeout.tv_nsec / NSEC_IN_SEC;
		timeout.tv_nsec %= NSEC_IN_SEC;

		slurm_cond_timedwait(&msg_collection.cond,
				     &msg_collection.mutex, &timeout);

		if (!msg_collection.running &&
		    !list_count(msg_collection.msg_list))
			break;

		msg_collection.max_msgs = true;

		/* Msg collection window has expired and message collection
		 * is suspended; now build and send composite msg */
		memset(&msg, 0, sizeof(slurm_msg_t));
		memset(&cmp, 0, sizeof(composite_msg_t));

		memcpy(&cmp.sender, &msg_collection.node_addr,
		       sizeof(slurm_addr_t));
		cmp.msg_list = msg_collection.msg_list;

		msg_collection.msg_list =
			list_create(slurm_free_comp_msg_list);
		msg_collection.max_msgs = false;

		slurm_msg_t_init(&msg);
		msg.msg_type = MESSAGE_COMPOSITE;
		msg.protocol_version = SLURM_PROTOCOL_VERSION;
		msg.data = &cmp;

		if (_send_to_next_collector(&msg) != SLURM_SUCCESS) {
			error("_msg_aggregation_engine: Unable to send "
			      "composite msg: %m");
		}

		FREE_NULL_LIST(cmp.msg_list);

		/* Resume message collection */
		slurm_cond_broadcast(&msg_collection.cond);
	}

	slurm_mutex_unlock(&msg_collection.mutex);
	return NULL;
}
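/*
 * Example of how window and max_msg_cnt interact (values are illustrative):
 * with msg_aggr_sender_init(host, port, 200, 64), the first message queued
 * by msg_aggr_add_msg() wakes the sender thread and opens a 200 ms window;
 * the composite message is built and sent when slurm_cond_timedwait()
 * expires, or earlier if a 64th message arrives and sets max_msgs, which
 * signals the condition variable before the window elapses.
 */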
/*
 * Initialize the message collection state and start the aggregation sender
 * thread.  Does nothing if the sender is already running or if
 * max_msg_cnt <= 1 (aggregation disabled).
 */
extern void msg_aggr_sender_init(char *host, uint16_t port, uint64_t window,
				 uint64_t max_msg_cnt)
{
	if (msg_collection.running || (max_msg_cnt <= 1))
		return;

	memset(&msg_collection, 0, sizeof(msg_collection_type_t));

	slurm_mutex_init(&msg_collection.aggr_mutex);
	slurm_mutex_init(&msg_collection.mutex);

	slurm_mutex_lock(&msg_collection.mutex);
	slurm_mutex_lock(&msg_collection.aggr_mutex);
	slurm_cond_init(&msg_collection.cond, NULL);
	slurm_set_addr(&msg_collection.node_addr, port, host);
	msg_collection.window = window;
	msg_collection.max_msg_cnt = max_msg_cnt;
	msg_collection.msg_aggr_list = list_create(_msg_aggr_free);
	msg_collection.msg_list = list_create(slurm_free_comp_msg_list);
	msg_collection.max_msgs = false;
	msg_collection.debug_flags = slurm_get_debug_flags();
	slurm_mutex_unlock(&msg_collection.aggr_mutex);

	slurm_thread_create(&msg_collection.thread_id,
			    &_msg_aggregation_sender, NULL);

	/* wait for thread to start */
	slurm_cond_wait(&msg_collection.cond, &msg_collection.mutex);

	slurm_mutex_unlock(&msg_collection.mutex);
}

/*
 * Update the collection window and message limit on a reconfigure.
 * Aggregation cannot be enabled by a reconfigure if the sender thread is
 * not already running; that requires a restart.
 */
extern void msg_aggr_sender_reconfig(uint64_t window, uint64_t max_msg_cnt)
{
	if (msg_collection.running) {
		slurm_mutex_lock(&msg_collection.mutex);
		msg_collection.window = window;
		msg_collection.max_msg_cnt = max_msg_cnt;
		msg_collection.debug_flags = slurm_get_debug_flags();
		slurm_mutex_unlock(&msg_collection.mutex);
	} else if (max_msg_cnt > 1) {
		error("can't start the msg_aggr on a reconfig, "
		      "a restart is needed");
	}
}

/*
 * Stop the sender thread, wake any threads still waiting for a response and
 * free all collection state.
 */
extern void msg_aggr_sender_fini(void)
{
	if (!msg_collection.running)
		return;
	msg_collection.running = 0;
	slurm_mutex_lock(&msg_collection.mutex);

	slurm_cond_signal(&msg_collection.cond);
	slurm_mutex_unlock(&msg_collection.mutex);

	pthread_join(msg_collection.thread_id, NULL);
	msg_collection.thread_id = (pthread_t) 0;

	slurm_cond_destroy(&msg_collection.cond);
	/* signal and clear the waiting list */
	slurm_mutex_lock(&msg_collection.aggr_mutex);
	_handle_msg_aggr_ret(0, 1);
	FREE_NULL_LIST(msg_collection.msg_aggr_list);
	slurm_mutex_unlock(&msg_collection.aggr_mutex);
	FREE_NULL_LIST(msg_collection.msg_list);

	slurm_mutex_destroy(&msg_collection.mutex);
}

/*
 * Add a message to the current collection window; ownership of msg passes
 * to the collection list.  The first message queued opens a new window and
 * reaching max_msg_cnt closes it early.  If wait is true, block until
 * msg_aggr_resp() matches the reply from slurmctld or the message timeout
 * expires; resp_callback (if set) is invoked on the matched response unless
 * it is a plain RESPONSE_SLURM_RC.
 */
extern void msg_aggr_add_msg(slurm_msg_t *msg, bool wait,
			     void (*resp_callback) (slurm_msg_t *msg))
{
	int count;
	static uint16_t msg_index = 1;
	static uint32_t wait_count = 0;

	if (!msg_collection.running)
		return;

	slurm_mutex_lock(&msg_collection.mutex);
	if (msg_collection.max_msgs == true) {
		slurm_cond_wait(&msg_collection.cond, &msg_collection.mutex);
	}

	msg->msg_index = msg_index++;

	/* Add msg to message collection */
	list_append(msg_collection.msg_list, msg);

	count = list_count(msg_collection.msg_list);

	/* First msg in collection; initiate new window */
	if (count == 1)
		slurm_cond_signal(&msg_collection.cond);

	/* Max msgs reached; terminate window */
	if (count >= msg_collection.max_msg_cnt) {
		msg_collection.max_msgs = true;
		slurm_cond_signal(&msg_collection.cond);
	}
	slurm_mutex_unlock(&msg_collection.mutex);

	if (wait) {
		msg_aggr_t *msg_aggr = xmalloc(sizeof(msg_aggr_t));
		uint16_t msg_timeout;
		struct timeval now;
		struct timespec timeout;

		msg_aggr->msg_index = msg->msg_index;
		msg_aggr->resp_callback = resp_callback;
		slurm_cond_init(&msg_aggr->wait_cond, NULL);

		slurm_mutex_lock(&msg_collection.aggr_mutex);

		list_append(msg_collection.msg_aggr_list, msg_aggr);

		msg_timeout = slurm_get_msg_timeout();
		gettimeofday(&now, NULL);
		timeout.tv_sec = now.tv_sec + msg_timeout;
		timeout.tv_nsec = now.tv_usec * 1000;

		wait_count++;

		if (pthread_cond_timedwait(&msg_aggr->wait_cond,
					   &msg_collection.aggr_mutex,
					   &timeout) == ETIMEDOUT)
			_handle_msg_aggr_ret(msg_aggr->msg_index, 1);
		wait_count--;
		slurm_mutex_unlock(&msg_collection.aggr_mutex);

		if (!msg_collection.running && !wait_count)
			slurm_mutex_destroy(&msg_collection.aggr_mutex);

		_msg_aggr_free(msg_aggr);
	}
}

/*
 * Build a slurm_msg_t from an unpacked message header, auth credential and
 * buffer, then add it to the current collection window without waiting.
 */
extern void msg_aggr_add_comp(Buf buffer, void *auth_cred, header_t *header)
{
	slurm_msg_t *msg;

	if (!msg_collection.running)
		return;

	msg = xmalloc_nz(sizeof(slurm_msg_t));
	slurm_msg_t_init(msg);

	msg->protocol_version = header->version;
	msg->msg_type = header->msg_type;
	msg->flags = header->flags;

	msg->auth_cred = auth_cred;

	msg->data = buffer;
	msg->data_size = remaining_buf(buffer);

	msg_aggr_add_msg(msg, 0, NULL);
}

/*
 * Process a composite response from slurmctld: wake the threads waiting in
 * msg_aggr_add_msg() for each matched reply and forward any nested composite
 * responses on to the collector node that sent them.
 */
extern void msg_aggr_resp(slurm_msg_t *msg)
{
	slurm_msg_t *next_msg;
	composite_msg_t *comp_msg;
	msg_aggr_t *msg_aggr;
	ListIterator itr;

	comp_msg = (composite_msg_t *)msg->data;
	itr = list_iterator_create(comp_msg->msg_list);

	if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE)
		info("msg_aggr_resp: processing composite msg_list...");
	while ((next_msg = list_next(itr))) {
		switch (next_msg->msg_type) {
		case RESPONSE_NODE_REGISTRATION:
		case RESPONSE_SLURM_RC:
			/* signal sending thread that slurmctld received this
			 * msg */
			if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE)
				info("msg_aggr_resp: response found for "
				     "index %u signaling sending thread",
				     next_msg->msg_index);
			slurm_mutex_lock(&msg_collection.aggr_mutex);
			if (!(msg_aggr = _handle_msg_aggr_ret(
				      next_msg->msg_index, 1))) {
				debug2("msg_aggr_resp: error: unable to "
				       "locate aggr message struct for job %u",
				       next_msg->msg_index);
				slurm_mutex_unlock(&msg_collection.aggr_mutex);
				continue;
			}
			if (msg_aggr->resp_callback &&
			    (next_msg->msg_type != RESPONSE_SLURM_RC))
				(*(msg_aggr->resp_callback))(next_msg);
			slurm_cond_signal(&msg_aggr->wait_cond);
			slurm_mutex_unlock(&msg_collection.aggr_mutex);

			break;
		case RESPONSE_MESSAGE_COMPOSITE:
			comp_msg = (composite_msg_t *)next_msg->data;
			/* set up the address here for the next node */
			memcpy(&next_msg->address, &comp_msg->sender,
			       sizeof(slurm_addr_t));

			if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE) {
				char addrbuf[100];
				slurm_print_slurm_addr(&next_msg->address,
						       addrbuf, 32);
				info("msg_aggr_resp: composite response msg "
				     "found for %s", addrbuf);
			}

			slurm_send_only_node_msg(next_msg);

			break;
		default:
			error("_rpc_composite_resp: invalid msg type in "
			      "composite msg_list");
			break;
		}
	}
	list_iterator_destroy(itr);
	if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE)
		info("msg aggr: _rpc_composite_resp: finished processing "
		     "composite msg_list...");
}
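
/*
 * Illustrative usage sketch, compiled out so it does not affect the build.
 * The host name, port, window/limit values and the RPC type below are
 * placeholders chosen for the example, not values taken from slurmd.
 */
#if 0
static void _msg_aggr_usage_example(void)
{
	slurm_msg_t *req;

	/* Start the sender thread: 200 ms collection window, flushed
	 * early once 64 messages have been collected. */
	msg_aggr_sender_init("collector_host", 6818, 200, 64);

	/* Queue an RPC for aggregation.  The message must be heap
	 * allocated because ownership passes to the collection list,
	 * which is destroyed with slurm_free_comp_msg_list(). */
	req = xmalloc(sizeof(slurm_msg_t));
	slurm_msg_t_init(req);
	req->msg_type = MESSAGE_EPILOG_COMPLETE;	/* placeholder type */

	/* With wait = true this blocks until msg_aggr_resp() matches the
	 * reply from slurmctld or the message timeout expires; no
	 * response callback is registered here. */
	msg_aggr_add_msg(req, true, NULL);

	/* Stop the sender thread and release all collection state. */
	msg_aggr_sender_fini();
}
#endif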