// ----------------------------------------------------------------------
// File: quarkdb-server.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "utils/Macros.hh"
#include "utils/FileUtils.hh"
#include "utils/InFlightTracker.hh"
#include "Configuration.hh"
#include "QuarkDBNode.hh"
#include "netio/AsioPoller.hh"
#include "ShardDirectory.hh"
#include "StateMachine.hh"
#include "EventFD.hh"
#include "../deps/CLI11.hpp"
#include "utils/Uuid.hh"
#include "utils/DirectoryIterator.hh"
#include <unistd.h>
#include <dirent.h>
#include <cerrno>
#include <csignal>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
namespace quarkdb {
// Process-wide "are we still accepting requests" flag. handle_sigint switches
// it off; runServer loops on it to decide when to start shutting down, and
// reads its in-flight count for the shutdown log message.
InFlightTracker inFlightTracker;
// Wake-up channel between the signal handler and runServer: handle_sigint
// calls notify(), runServer blocks in wait().
EventFD shutdownFD;
//------------------------------------------------------------------------------
// Signal handler for shutdown requests - runServer installs it for both
// SIGINT and SIGTERM, and the reaction is the same for either one.
//------------------------------------------------------------------------------
static void handle_sigint(int sig) {
  (void) sig;

  // Clear the flag before waking the waiter: runServer re-checks
  // isAcceptingRequests() as soon as its wait() returns.
  inFlightTracker.setAcceptingRequests(false);
  shutdownFD.notify();
}
void createTestCluster(const std::string &optConfigurationDir, int port) {
quarkdb::mkpath_or_die(optConfigurationDir, 0755);
std::string clusterID = generateUuid();
std::vector nodes;
for(size_t i = 0; i < 3; i++) {
nodes.emplace_back("localhost", port+i);
}
for(size_t i = 0; i < 3; i++) {
std::string dataDir = SSTR(optConfigurationDir << "data-" << port);
std::string configDir = SSTR(optConfigurationDir << "config-" << port);
std::ostringstream ss;
ss << "xrd.port " << port << std::endl;
ss << "xrd.protocol redis:" << port << "libXrdQuarkDB.so" << std::endl;
ss << "redis.mode raft" << std::endl;
ss << "redis.database " << dataDir << std::endl;
ss << "redis.myself localhost:" << port << std::endl;
write_file_or_die(configDir, ss.str());
std::unique_ptr shardDirectory;
quarkdb::Status st;
shardDirectory.reset(quarkdb::ShardDirectory::create(dataDir, clusterID, "default", nodes, 0, FsyncPolicy::kSyncImportantUpdates, {}, st));
port++;
}
}
int runCluster(const std::string &execPath, const std::string &optConfigurationDir, int port) {
std::string err;
if(!quarkdb::directoryExists(optConfigurationDir, err)) {
createTestCluster(optConfigurationDir, port);
}
//----------------------------------------------------------------------------
// Locate configuraiton files
//----------------------------------------------------------------------------
std::vector configurationFiles;
quarkdb::DirectoryIterator iter(optConfigurationDir);
struct dirent *entry;
while( (entry = iter.next()) ) {
if(entry->d_type != DT_DIR && StringUtils::startsWith(entry->d_name, "config-")) {
configurationFiles.emplace_back(SSTR(optConfigurationDir << entry->d_name));
}
}
//----------------------------------------------------------------------------
// Create new tmux session..
//----------------------------------------------------------------------------
std::string sessionId = generateUuid();
(void) !system(SSTR("tmux -2 new-session -d -s \"" << sessionId << "\"").c_str());
for(size_t i = 0; i < configurationFiles.size(); i++) {
if(i != 0) {
(void) !system("tmux split-window -v");
}
(void) !system(SSTR("tmux send-keys \"" << execPath << " --configuration " << configurationFiles[i] << " \" C-m").c_str());
}
(void) !system("tmux select-layout even-vertical");
execlp("tmux", "tmux", "-2", "attach-session", "-d", (char*) NULL);
return errno;
}
int runServer(const std::string &optConfiguration) {
//----------------------------------------------------------------------------
// Read configuration file, check validity..
//----------------------------------------------------------------------------
Configuration configuration;
bool success = Configuration::fromFile(optConfiguration, configuration);
if(!success) return 1;
if(configuration.getMode() != Mode::raft) {
qdb_throw("standalone mode not supported in quarkdb-server yet, sorry");
}
//----------------------------------------------------------------------------
// Let's get this party started
//----------------------------------------------------------------------------
std::unique_ptr node(new QuarkDBNode(configuration, defaultTimeouts));
std::unique_ptr poller(new AsioPoller(configuration.getMyself().port, 10, node.get()));
signal(SIGINT, handle_sigint);
signal(SIGTERM, handle_sigint);
while(inFlightTracker.isAcceptingRequests()) {
shutdownFD.wait();
}
//----------------------------------------------------------------------------
// Time to shut down
//----------------------------------------------------------------------------
qdb_event("Received request to shut down. Waiting until all requests in flight (" << inFlightTracker.getInFlight() << ") have been processed..");
poller.reset();
node.reset();
qdb_event("SHUTTING DOWN");
return 0;
}
}
//------------------------------------------------------------------------------
// CLI11 validator, described as "PATH": accepts an option value only if
// quarkdb::fileExists approves of it. Following the validator convention used
// here, an empty string means "valid", anything else is the error message.
//------------------------------------------------------------------------------
struct FileExistenceValidator : public CLI::Validator {
  FileExistenceValidator() : Validator("PATH") {
    func_ = [](const std::string &path) -> std::string {
      std::string err;

      if(quarkdb::fileExists(path, err)) {
        return std::string();
      }

      return SSTR("Path '" << path << "' does not exist, or is not a file.");
    };
  }
};
//------------------------------------------------------------------------------
// Entry point. Depending on the command line, either:
//   --configuration-dir <dir> : launch a local tmux test cluster, or
//   --configuration <file>    : run a single server node.
// If both are given, --configuration-dir takes precedence.
//------------------------------------------------------------------------------
int main(int argc, char** argv) {
  //----------------------------------------------------------------------------
  // Setup variables
  //----------------------------------------------------------------------------
  CLI::App app("QuarkDB is a distributed datastore with a redis-like API. quarkdb-server is the main server executable.");
  FileExistenceValidator fileExistenceValidator;

  std::string optConfiguration;
  std::string optConfigurationDir;
  int clusterPort = 4444;

  //----------------------------------------------------------------------------
  // Setup options
  //----------------------------------------------------------------------------
  app.add_option("--configuration", optConfiguration, "Path to configuration file")
    ->check(fileExistenceValidator);

  app.add_option("--configuration-dir", optConfigurationDir, "Path to configuration directory to launch local test cluster - requires to have tmux installed.");
  app.add_option("--test-cluster-port", clusterPort, "The port to use when creating a local test cluster -- ignored if cluster configuration already existed.");

  //----------------------------------------------------------------------------
  // Parse..
  //----------------------------------------------------------------------------
  try {
    app.parse(argc, argv);
  } catch (const CLI::ParseError &e) {
    return app.exit(e);
  }

  const bool haveConfiguration = !optConfiguration.empty();
  const bool haveConfigurationDir = !optConfigurationDir.empty();

  if(!haveConfiguration && !haveConfigurationDir) {
    std::cerr << "Either --configuration or --configuration-dir must be specified." << std::endl;
    return 1;
  }

  //----------------------------------------------------------------------------
  // Run cluster
  //----------------------------------------------------------------------------
  if(haveConfigurationDir) {
    // The cluster code concatenates file names right after this path, so make
    // sure it ends with a separator.
    if(optConfigurationDir.back() != '/') {
      optConfigurationDir += "/";
    }

    return quarkdb::runCluster(argv[0], optConfigurationDir, clusterPort);
  }

  //----------------------------------------------------------------------------
  // Run server.
  //----------------------------------------------------------------------------
  if(haveConfiguration) {
    return quarkdb::runServer(optConfiguration);
  }

  qdb_throw("should never reach here");
}