/*****************************************************************************\ * acct_gather_profile_influxdb.c - slurm accounting plugin for influxdb * profiling. ***************************************************************************** * Author: Carlos Fenoy Garcia * Copyright (C) 2016 F. Hoffmann - La Roche * * Based on the HDF5 profiling plugin and Elasticsearch job completion plugin. * * Portions Copyright (C) 2013 Bull S. A. S. * Bull, Rue Jean Jaures, B.P.68, 78340, Les Clayes-sous-Bois. * * Portions Copyright (C) 2013 SchedMD LLC. * * This file is part of Slurm, a resource management program. * For details, see . * Please also read the included file: DISCLAIMER. * * Slurm is free software; you can redistribute it and/or modify it under * the terms of the GNU General Public License as published by the Free * Software Foundation; either version 2 of the License, or (at your option) * any later version. * * In addition, as a special exception, the copyright holders give permission * to link the code of portions of this program with the OpenSSL library under * certain conditions as described in each individual source file, and * distribute linked combinations including the two. You must obey the GNU * General Public License in all respects for all of the code used other than * OpenSSL. If you modify file(s) with this exception, you may extend this * exception to your version of the file(s), but you are not obligated to do * so. If you do not wish to do so, delete this exception statement from your * version. If you delete this exception statement from all source files in * the program, then also delete it here. * * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License along * with Slurm; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. * * This file is patterned after jobcomp_linux.c, written by Morris Jette and * Copyright (C) 2002 The Regents of the University of California. \*****************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include "src/common/slurm_xlator.h" #include "src/common/fd.h" #include "src/common/slurm_acct_gather_profile.h" #include "src/common/slurm_protocol_api.h" #include "src/common/slurm_protocol_defs.h" #include "src/common/slurm_time.h" #include "src/common/macros.h" #include "src/slurmd/common/proctrack.h" /* * These variables are required by the generic plugin interface. If they * are not found in the plugin, the plugin loader will ignore it. * * plugin_name - a string giving a human-readable description of the * plugin. There is no maximum length, but the symbol must refer to * a valid string. * * plugin_type - a string suggesting the type of the plugin or its * applicability to a particular form of data or method of data handling. * If the low-level plugin API is used, the contents of this string are * unimportant and may be anything. Slurm uses the higher-level plugin * interface which requires this string to be of the form * * / * * where is a description of the intended application of * the plugin (e.g., "jobacct" for Slurm job completion logging) and * is a description of how this plugin satisfies that application. Slurm will * only load job completion logging plugins if the plugin_type string has a * prefix of "jobacct/". * * plugin_version - an unsigned 32-bit integer containing the Slurm version * (major.minor.micro combined into a single number). */ const char plugin_name[] = "AcctGatherProfile influxdb plugin"; const char plugin_type[] = "acct_gather_profile/influxdb"; const uint32_t plugin_version = SLURM_VERSION_NUMBER; typedef struct { char *host; char *database; uint32_t def; char *password; char *rt_policy; char *username; } slurm_influxdb_conf_t; typedef struct { char ** names; uint32_t *types; size_t size; char * name; } table_t; /* Type for handling HTTP responses */ struct http_response { char *message; size_t size; }; union data_t{ uint64_t u; double d; }; static slurm_influxdb_conf_t influxdb_conf; static uint32_t g_profile_running = ACCT_GATHER_PROFILE_NOT_SET; static stepd_step_rec_t *g_job = NULL; static char *datastr = NULL; static int datastrlen = 0; static table_t *tables = NULL; static size_t tables_max_len = 0; static size_t tables_cur_len = 0; static void _free_tables(void) { int i, j; debug3("%s %s called", plugin_type, __func__); for (i = 0; i < tables_cur_len; i++) { table_t *table = &(tables[i]); for (j = 0; j < tables->size; j++) xfree(table->names[j]); xfree(table->name); xfree(table->names); xfree(table->types); } xfree(tables); } static uint32_t _determine_profile(void) { uint32_t profile; debug3("%s %s called", plugin_type, __func__); xassert(g_job); if (g_profile_running != ACCT_GATHER_PROFILE_NOT_SET) profile = g_profile_running; else if (g_job->profile >= ACCT_GATHER_PROFILE_NONE) profile = g_job->profile; else profile = influxdb_conf.def; return profile; } /* Callback to handle the HTTP response */ static size_t _write_callback(void *contents, size_t size, size_t nmemb, void *userp) { size_t realsize = size * nmemb; struct http_response *mem = (struct http_response *) userp; debug3("%s %s called", plugin_type, __func__); mem->message = xrealloc(mem->message, mem->size + realsize + 1); memcpy(&(mem->message[mem->size]), contents, realsize); mem->size += realsize; mem->message[mem->size] = 0; return realsize; } /* Try to send data to influxdb */ static int _send_data(const char *data) { CURL *curl_handle = NULL; CURLcode res; struct http_response chunk; int rc = SLURM_SUCCESS; long response_code; static int error_cnt = 0; char *url = NULL; size_t length; debug3("%s %s called", plugin_type, __func__); /* * Every compute node which is sampling data will try to establish a * different connection to the influxdb server. In order to reduce the * number of connections, every time a new sampled data comes in, it * is saved in the 'datastr' buffer. Once this buffer is full, then we * try to open the connection and send this buffer, instead of opening * one per sample. */ if (data && ((datastrlen + strlen(data)) <= BUF_SIZE)) { xstrcat(datastr, data); length = strlen(data); datastrlen += length; log_flag(PROFILE, "%s %s: %zu bytes of data added to buffer. New buffer size: %d", plugin_type, __func__, length, datastrlen); return rc; } DEF_TIMERS; START_TIMER; if (curl_global_init(CURL_GLOBAL_ALL) != 0) { error("%s %s: curl_global_init: %m", plugin_type, __func__); rc = SLURM_ERROR; goto cleanup_global_init; } else if ((curl_handle = curl_easy_init()) == NULL) { error("%s %s: curl_easy_init: %m", plugin_type, __func__); rc = SLURM_ERROR; goto cleanup_easy_init; } xstrfmtcat(url, "%s/write?db=%s&rp=%s&precision=s", influxdb_conf.host, influxdb_conf.database, influxdb_conf.rt_policy); chunk.message = xmalloc(1); chunk.size = 0; curl_easy_setopt(curl_handle, CURLOPT_URL, url); if (influxdb_conf.password) curl_easy_setopt(curl_handle, CURLOPT_PASSWORD, influxdb_conf.password); curl_easy_setopt(curl_handle, CURLOPT_POST, 1); curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, datastr); curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE, strlen(datastr)); if (influxdb_conf.username) curl_easy_setopt(curl_handle, CURLOPT_USERNAME, influxdb_conf.username); curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, _write_callback); curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *) &chunk); if ((res = curl_easy_perform(curl_handle)) != CURLE_OK) { if ((error_cnt++ % 100) == 0) error("%s %s: curl_easy_perform failed to send data (discarded). Reason: %s", plugin_type, __func__, curl_easy_strerror(res)); rc = SLURM_ERROR; goto cleanup; } if ((res = curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &response_code)) != CURLE_OK) { error("%s %s: curl_easy_getinfo response code failed: %s", plugin_type, __func__, curl_easy_strerror(res)); rc = SLURM_ERROR; goto cleanup; } /* In general, status codes of the form 2xx indicate success, * 4xx indicate that InfluxDB could not understand the request, and * 5xx indicate that the system is overloaded or significantly impaired. * Errors are returned in JSON. * https://docs.influxdata.com/influxdb/v0.13/concepts/api/ */ if (response_code >= 200 && response_code <= 205) { debug2("%s %s: data write success", plugin_type, __func__); if (error_cnt > 0) error_cnt = 0; } else { rc = SLURM_ERROR; debug2("%s %s: data write failed, response code: %ld", plugin_type, __func__, response_code); if (slurm_conf.debug_flags & DEBUG_FLAG_PROFILE) { /* Strip any trailing newlines. */ while (chunk.message[strlen(chunk.message) - 1] == '\n') chunk.message[strlen(chunk.message) - 1] = '\0'; info("%s %s: JSON response body: %s", plugin_type, __func__, chunk.message); } } cleanup: xfree(chunk.message); xfree(url); cleanup_easy_init: curl_easy_cleanup(curl_handle); cleanup_global_init: curl_global_cleanup(); END_TIMER; log_flag(PROFILE, "%s %s: took %s to send data", plugin_type, __func__, TIME_STR); if (data) { datastr = xstrdup(data); datastrlen = strlen(data); } else { datastr[0] = '\0'; datastrlen = 0; } return rc; } /* * init() is called when the plugin is loaded, before any other functions * are called. Put global initialization here. */ extern int init(void) { debug3("%s %s called", plugin_type, __func__); if (!running_in_slurmstepd()) return SLURM_SUCCESS; datastr = xmalloc(BUF_SIZE); return SLURM_SUCCESS; } extern int fini(void) { debug3("%s %s called", plugin_type, __func__); _free_tables(); xfree(datastr); xfree(influxdb_conf.host); xfree(influxdb_conf.database); xfree(influxdb_conf.password); xfree(influxdb_conf.rt_policy); xfree(influxdb_conf.username); return SLURM_SUCCESS; } extern void acct_gather_profile_p_conf_options(s_p_options_t **full_options, int *full_options_cnt) { debug3("%s %s called", plugin_type, __func__); s_p_options_t options[] = { {"ProfileInfluxDBHost", S_P_STRING}, {"ProfileInfluxDBDatabase", S_P_STRING}, {"ProfileInfluxDBDefault", S_P_STRING}, {"ProfileInfluxDBPass", S_P_STRING}, {"ProfileInfluxDBRTPolicy", S_P_STRING}, {"ProfileInfluxDBUser", S_P_STRING}, {NULL} }; transfer_s_p_options(full_options, options, full_options_cnt); return; } extern void acct_gather_profile_p_conf_set(s_p_hashtbl_t *tbl) { char *tmp = NULL; debug3("%s %s called", plugin_type, __func__); influxdb_conf.def = ACCT_GATHER_PROFILE_ALL; if (tbl) { s_p_get_string(&influxdb_conf.host, "ProfileInfluxDBHost", tbl); if (s_p_get_string(&tmp, "ProfileInfluxDBDefault", tbl)) { influxdb_conf.def = acct_gather_profile_from_string(tmp); if (influxdb_conf.def == ACCT_GATHER_PROFILE_NOT_SET) fatal("ProfileInfluxDBDefault can not be set to %s, please specify a valid option", tmp); xfree(tmp); } s_p_get_string(&influxdb_conf.database, "ProfileInfluxDBDatabase", tbl); s_p_get_string(&influxdb_conf.password, "ProfileInfluxDBPass", tbl); s_p_get_string(&influxdb_conf.rt_policy, "ProfileInfluxDBRTPolicy", tbl); s_p_get_string(&influxdb_conf.username, "ProfileInfluxDBUser", tbl); } if (!influxdb_conf.host) fatal("No ProfileInfluxDBHost in your acct_gather.conf file. This is required to use the %s plugin", plugin_type); if (!influxdb_conf.database) fatal("No ProfileInfluxDBDatabase in your acct_gather.conf file. This is required to use the %s plugin", plugin_type); if (influxdb_conf.password && !influxdb_conf.username) fatal("No ProfileInfluxDBUser in your acct_gather.conf file. This is required if ProfileInfluxDBPass is specified to use the %s plugin", plugin_type); if (!influxdb_conf.rt_policy) fatal("No ProfileInfluxDBRTPolicy in your acct_gather.conf file. This is required to use the %s plugin", plugin_type); debug("%s loaded", plugin_name); } extern void acct_gather_profile_p_get(enum acct_gather_profile_info info_type, void *data) { uint32_t *uint32 = (uint32_t *) data; char **tmp_char = (char **) data; debug3("%s %s called", plugin_type, __func__); switch (info_type) { case ACCT_GATHER_PROFILE_DIR: *tmp_char = xstrdup(influxdb_conf.host); break; case ACCT_GATHER_PROFILE_DEFAULT: *uint32 = influxdb_conf.def; break; case ACCT_GATHER_PROFILE_RUNNING: *uint32 = g_profile_running; break; default: debug2("%s %s: info_type %d invalid", plugin_type, __func__, info_type); } } extern int acct_gather_profile_p_node_step_start(stepd_step_rec_t* job) { int rc = SLURM_SUCCESS; char *profile_str; debug3("%s %s called", plugin_type, __func__); xassert(running_in_slurmstepd()); g_job = job; profile_str = acct_gather_profile_to_string(g_job->profile); debug2("%s %s: option --profile=%s", plugin_type, __func__, profile_str); g_profile_running = _determine_profile(); return rc; } extern int acct_gather_profile_p_child_forked(void) { debug3("%s %s called", plugin_type, __func__); return SLURM_SUCCESS; } extern int acct_gather_profile_p_node_step_end(void) { int rc = SLURM_SUCCESS; debug3("%s %s called", plugin_type, __func__); xassert(running_in_slurmstepd()); return rc; } extern int acct_gather_profile_p_task_start(uint32_t taskid) { int rc = SLURM_SUCCESS; debug3("%s %s called with %d prof", plugin_type, __func__, g_profile_running); xassert(running_in_slurmstepd()); xassert(g_job); xassert(g_profile_running != ACCT_GATHER_PROFILE_NOT_SET); if (g_profile_running <= ACCT_GATHER_PROFILE_NONE) return rc; return rc; } extern int acct_gather_profile_p_task_end(pid_t taskpid) { debug3("%s %s called", plugin_type, __func__); _send_data(NULL); return SLURM_SUCCESS; } extern int64_t acct_gather_profile_p_create_group(const char* name) { debug3("%s %s called", plugin_type, __func__); return 0; } extern int acct_gather_profile_p_create_dataset(const char* name, int64_t parent, acct_gather_profile_dataset_t *dataset) { table_t * table; acct_gather_profile_dataset_t *dataset_loc = dataset; debug3("%s %s called", plugin_type, __func__); if (g_profile_running <= ACCT_GATHER_PROFILE_NONE) return SLURM_ERROR; /* compute the size of the type needed to create the table */ if (tables_cur_len == tables_max_len) { if (tables_max_len == 0) ++tables_max_len; tables_max_len *= 2; tables = xrealloc(tables, tables_max_len * sizeof(table_t)); } table = &(tables[tables_cur_len]); table->name = xstrdup(name); table->size = 0; while (dataset_loc && (dataset_loc->type != PROFILE_FIELD_NOT_SET)) { table->names = xrealloc(table->names, (table->size+1) * sizeof(char *)); table->types = xrealloc(table->types, (table->size+1) * sizeof(char *)); (table->names)[table->size] = xstrdup(dataset_loc->name); switch (dataset_loc->type) { case PROFILE_FIELD_UINT64: table->types[table->size] = PROFILE_FIELD_UINT64; break; case PROFILE_FIELD_DOUBLE: table->types[table->size] = PROFILE_FIELD_DOUBLE; break; case PROFILE_FIELD_NOT_SET: break; } table->size++; dataset_loc++; } ++tables_cur_len; return tables_cur_len - 1; } extern int acct_gather_profile_p_add_sample_data(int table_id, void *data, time_t sample_time) { table_t *table = &tables[table_id]; int i = 0; char *str = NULL; debug3("%s %s called", plugin_type, __func__); for(; i < table->size; i++) { switch (table->types[i]) { case PROFILE_FIELD_UINT64: xstrfmtcat(str, "%s,job=%d,step=%d,task=%s," "host=%s value=%"PRIu64" " "%"PRIu64"\n", table->names[i], g_job->step_id.job_id, g_job->step_id.step_id, table->name, g_job->node_name, ((union data_t*)data)[i].u, (uint64_t)sample_time); break; case PROFILE_FIELD_DOUBLE: xstrfmtcat(str, "%s,job=%d,step=%d,task=%s," "host=%s value=%.2f %"PRIu64"" "\n", table->names[i], g_job->step_id.job_id, g_job->step_id.step_id, table->name, g_job->node_name, ((union data_t*)data)[i].d, (uint64_t)sample_time); break; case PROFILE_FIELD_NOT_SET: break; } } _send_data(str); xfree(str); return SLURM_SUCCESS; } extern void acct_gather_profile_p_conf_values(List *data) { config_key_pair_t *key_pair; debug3("%s %s called", plugin_type, __func__); xassert(*data); key_pair = xmalloc(sizeof(config_key_pair_t)); key_pair->name = xstrdup("ProfileInfluxDBHost"); key_pair->value = xstrdup(influxdb_conf.host); list_append(*data, key_pair); key_pair = xmalloc(sizeof(config_key_pair_t)); key_pair->name = xstrdup("ProfileInfluxDBDatabase"); key_pair->value = xstrdup(influxdb_conf.database); list_append(*data, key_pair); key_pair = xmalloc(sizeof(config_key_pair_t)); key_pair->name = xstrdup("ProfileInfluxDBDefault"); key_pair->value = xstrdup(acct_gather_profile_to_string(influxdb_conf.def)); list_append(*data, key_pair); key_pair = xmalloc(sizeof(config_key_pair_t)); key_pair->name = xstrdup("ProfileInfluxDBPass"); key_pair->value = xstrdup(influxdb_conf.password); list_append(*data, key_pair); key_pair = xmalloc(sizeof(config_key_pair_t)); key_pair->name = xstrdup("ProfileInfluxDBRTPolicy"); key_pair->value = xstrdup(influxdb_conf.rt_policy); list_append(*data, key_pair); key_pair = xmalloc(sizeof(config_key_pair_t)); key_pair->name = xstrdup("ProfileInfluxDBUser"); key_pair->value = xstrdup(influxdb_conf.username); list_append(*data, key_pair); return; } extern bool acct_gather_profile_p_is_active(uint32_t type) { debug3("%s %s called", plugin_type, __func__); if (g_profile_running <= ACCT_GATHER_PROFILE_NONE) return false; return (type == ACCT_GATHER_PROFILE_NOT_SET) || (g_profile_running & type); }