// ----------------------------------------------------------------------
// File: test-utils.hh
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------

/************************************************************************
 * quarkdb - a redis-like highly available key-value store              *
 * Copyright (C) 2016 CERN/Switzerland                                  *
 *                                                                      *
 * This program is free software: you can redistribute it and/or modify *
 * it under the terms of the GNU General Public License as published by *
 * the Free Software Foundation, either version 3 of the License, or    *
 * (at your option) any later version.                                  *
 *                                                                      *
 * This program is distributed in the hope that it will be useful,      *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of       *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the         *
 * GNU General Public License for more details.                         *
 *                                                                      *
 * You should have received a copy of the GNU General Public License    *
 * along with this program. If not, see <http://www.gnu.org/licenses/>. *
 ************************************************************************/

#ifndef QUARKDB_TEST_UTILS_HH
#define QUARKDB_TEST_UTILS_HH

// NOTE(review): the targets of several #include directives below, and the
// angle-bracketed template parameter lists / template arguments used later in
// this block (e.g. on `template`, std::vector, std::future, std::promise,
// std::map), appear to have been stripped. Restore them from version control.
#include
#include
#include
#include "Common.hh"
#include "netio/AsioPoller.hh"
#include "raft/RaftState.hh"
#include
#include
#include "config/test-config.hh"
#include "utils/FileUtils.hh"
#include "StateMachine.hh"
#include "raft/RaftJournal.hh"
#include
#include
#include
#include
#include
#include

namespace quarkdb {

//------------------------------------------------------------------------------
// Forward declarations
//------------------------------------------------------------------------------
class Shard;
class RaftGroup;
class ShardDirectory;
class RaftJournal;
class RaftDispatcher;
class RaftLease;
class RaftDirector;
class RaftCommitTracker;
class RaftConfig;
class RaftTrimmer;
class QuarkDBNode;
class RaftContactDetails;
class Publisher;

//------------------------------------------------------------------------------
// Polling assertions.
//
// RETRY_ASSERT_{TRUE,EQ,NE}_3(..., retry, waitInterval) evaluate the condition
// up to `retry` times. Before each evaluation they sleep `waitInterval`
// milliseconds.
//
// On the first success they log through qdb_info and stop. If every attempt
// fails, they fall through to the matching gtest ASSERT_*. That assertion
// evaluates the operands once more and, being fatal, returns from the
// enclosing function.
//
// NOTE(review): known quirks, intentionally left unchanged here:
//  - On failure `nretries` ends at retry + 1, because the loop condition
//    post-increments it. The reported retry count is therefore one too high.
//  - `retry` is not parenthesized inside the macros, so pass a simple
//    expression. NUMBER_OF_RETRIES is already parenthesized.
//  - The expansion is a bare `{ ... }` block, not `do { ... } while(0)`.
//    As a result `if(x) RETRY_ASSERT_TRUE(c); else ...` does not compile.
//  - The EQ/NE success log message lacks the closing quote after the
//    condition.
//------------------------------------------------------------------------------
#define RETRY_ASSERT_TRUE_3(cond, retry, waitInterval) { \
  bool ok = false; \
  size_t nretries = 0; \
  while(nretries++ < retry) { \
    std::this_thread::sleep_for(std::chrono::milliseconds(waitInterval)); \
    if((cond)) { \
      qdb_info("Condition '" << #cond << "' is true after " << nretries << " attempts"); \
      ok = true; \
      break; \
    } \
  } \
  if(!ok) { ASSERT_TRUE(cond) << " - failure after " << nretries << " retries "; } \
}

#define RETRY_ASSERT_EQ_3(cond1, cond2, retry, waitInterval) { \
  bool ok = false; \
  size_t nretries = 0; \
  while(nretries++ < retry) { \
    std::this_thread::sleep_for(std::chrono::milliseconds(waitInterval)); \
    if( (cond1) == (cond2) ) { \
      qdb_info("Condition '" << #cond1 << " == " << #cond2 << " is true after " << nretries << " attempts"); \
      ok = true; \
      break; \
    } \
  } \
  if(!ok) { ASSERT_EQ(cond1, cond2) << " - failure after " << nretries << " retries "; } \
}

#define RETRY_ASSERT_NE_3(cond1, cond2, retry, waitInterval) { \
  bool ok = false; \
  size_t nretries = 0; \
  while(nretries++ < retry) { \
    std::this_thread::sleep_for(std::chrono::milliseconds(waitInterval)); \
    if( (cond1) != (cond2) ) { \
      qdb_info("Condition '" << #cond1 << " != " << #cond2 << " is true after " << nretries << " attempts"); \
      ok = true; \
      break; \
    } \
  } \
  if(!ok) { ASSERT_NE(cond1, cond2) << " - failure after " << nretries << " retries "; } \
}

// Busy-poll variants: up to 100000 attempts with a 0 ms wait between them.
#define RETRY_ASSERT_TRUE_SPIN(cond) RETRY_ASSERT_TRUE_3(cond, 100000, 0)
#define RETRY_ASSERT_EQ_SPIN(cond1, cond2) RETRY_ASSERT_EQ_3(cond1, cond2, 100000, 0)

// The attempt count is derived from timeouts().getLow(). These macros
// therefore only compile where a `timeouts()` member is in scope, e.g. inside
// a TestCluster-based fixture.
#define NUMBER_OF_RETRIES ( (size_t) timeouts().getLow().count() * 10) // retry every 10ms
#define RETRY_ASSERT_TRUE(cond) RETRY_ASSERT_TRUE_3(cond, NUMBER_OF_RETRIES, 10)
#define RETRY_ASSERT_EQ(cond1, cond2) RETRY_ASSERT_EQ_3(cond1, cond2, NUMBER_OF_RETRIES, 10)
#define RETRY_ASSERT_NE(cond1, cond2) RETRY_ASSERT_NE_3(cond1, cond2, NUMBER_OF_RETRIES, 10)

// 100*60*20 = 120000 attempts x 10 ms = 20 minutes of sleeping, not counting
// the time spent evaluating `cond`.
#define RETRY_ASSERT_TRUE_20MIN(cond) RETRY_ASSERT_TRUE_3(cond, 100*60*20, 10)

// Shared request list used by tests. It is declared extern here and defined
// in some other translation unit.
extern std::vector testreqs;

// Build a RedisRequest / vector inline as a single macro argument.
// This is necessary because C macros are dumb and don't understand universal
// initialization with brackets {}. The preprocessor splits macro arguments on
// commas even inside braces, so ASSERT_EQ(x, {"a", "b"}) breaks, while
// ASSERT_EQ(x, make_req("a", "b")) works because parentheses protect the
// commas.
template
RedisRequest make_req(Args... args) {
  return RedisRequest { args... };
}

template
std::vector make_vec(Args... args) {
  return std::vector { args... };
}

// Yes, passing a callback to QClient to convert it into a future is really
// stupid, since QClient supports futures natively. This is used to test that
// qclient callbacks work as they should.
//
// The underlying std::promise is single-shot. A second handleResponse() call
// (set_value) throws std::future_error, and so does a second getFuture() call.
class TrivialQCallback : public qclient::QCallback {
public:
  TrivialQCallback() {}
  virtual ~TrivialQCallback() {}

  // Fulfils the promise with the reply QClient delivered.
  virtual void handleResponse(qclient::redisReplyPtr &&reply) override {
    promise.set_value(std::move(reply));
  }

  // Returns the future tied to the promise. Call this at most once.
  std::future getFuture() {
    return promise.get_future();
  }

private:
  std::promise promise;
};

// gtest global environment shared by the test binary. SetUp/TearDown and the
// other non-inline members are defined out of line.
class GlobalEnv : public testing::Environment {
public:
  virtual void SetUp() override;
  virtual void TearDown() override;

  // Initialize a *clean* Shard Directory. The connection to the dbs is cached,
  // because even if rocksdb is local, it takes a long time to open
  // (often 50+ ms). The cache presumably lives in shardDirCache and is keyed
  // by path; TODO confirm in the .cc file.
  ShardDirectory* getShardDirectory(const std::string &path, RaftClusterID clusterID, const std::vector &nodes);

  // Presumably drops the cached ShardDirectory connections; TODO confirm.
  void clearConnectionCache();

  // Root directory for on-disk test state.
  const std::string testdir = "/tmp/quarkdb-tests";

  // Identity of simulated server `id`. The fixtures below use ids 0..9.
  static RaftServer server(int id);

private:
  std::map shardDirCache;
};

// The GlobalEnv instance used by the tests. It is defined elsewhere.
extern GlobalEnv &commonState;

// Includes everything needed to simulate a single raft-enabled server.
// Everything is initialized lazily, so if you only want to test the journal,
// for example, this is possible too. Just don't call e.g. group()->director(),
// and you won't have to worry about raft messing up your variables and terms
// due to timeouts.
class TestNode {
public:
  // NOTE(review): several angle-bracketed template arguments in this block
  // (std::vector, std::unique_ptr, std::map) appear to have been stripped, as
  // have the template parameter lists of TestCluster's member templates.
  // Restore them from version control.
  TestNode(RaftServer myself, RaftClusterID clusterID, RaftTimeouts timeouts, const std::vector &nodes);
  ~TestNode();

  // Accessors for this node's components; they are defined out of line.
  QuarkDBNode* quarkdbNode();
  ShardDirectory* shardDirectory();
  Shard* shard();
  RaftGroup* group();
  AsioPoller *poller();
  qclient::QClient *tunnel();

  // Build qclient connection settings, presumably targeting this node.
  qclient::Options makeNoRedirectOptions();
  std::unique_ptr makeQClientHandshake();

  // Identity and membership. These are presumably backed by myselfSrv and
  // initialNodes; TODO confirm.
  RaftServer myself();
  std::vector nodes();
  qclient::Members members();

  void spinup();
  void spindown();
  void killTunnel();

private:
  RaftServer myselfSrv;
  RaftClusterID clusterID;
  std::vector initialNodes;

  // These start out null. They are presumably populated on first use, given
  // the lazy initialization described for this class; TODO confirm in the
  // .cc file.
  QuarkDBNode *qdbnodeptr = nullptr;
  ShardDirectory *sharddirptr = nullptr;
  AsioPoller *pollerptr = nullptr;
  qclient::QClient *tunnelptr = nullptr;
};

// Contains everything needed to simulate a cluster with an arbitrary number of nodes.
// Everything is initialized lazily, including the nodes of the cluster themselves.
class TestCluster {
public:
  // `nodes` lists every server the test may use.
  // `initialActiveNodes` presumably caps how many of them start as cluster
  // members, with -1 meaning all. For example, TestCluster10Nodes1Initial
  // passes 1. The overload without `timeouts` presumably falls back to
  // default timeouts.
  TestCluster(RaftTimeouts timeouts, RaftClusterID clusterID, const std::vector &nodes, int initialActiveNodes = -1);
  TestCluster(RaftClusterID clusterID, const std::vector &nodes, int initialActiveNodes = -1);
  ~TestCluster();

  // Per-node component accessors. `id` selects the node (default: node 0).
  ShardDirectory* shardDirectory(int id = 0);
  StateMachine* stateMachine(int id = 0);
  RaftJournal* journal(int id = 0);
  RaftDispatcher *dispatcher(int id = 0);
  RaftState *state(int id = 0);
  AsioPoller *poller(int id = 0);
  RaftServer myself(int id = 0);
  RaftDirector *director(int id = 0);
  qclient::QClient *tunnel(int id = 0);
  RaftHeartbeatTracker *heartbeatTracker(int id = 0);
  RaftLease *lease(int id = 0);
  RaftCommitTracker *commitTracker(int id = 0);
  RaftConfig *raftconfig(int id = 0);
  RaftTrimmer* trimmer(int id = 0);
  const RaftContactDetails* contactDetails(int id = 0);
  Publisher* publisher(int id = 0);

  // The cluster's raft timeouts, presumably clusterTimeouts. Inside
  // TestCluster-based fixtures, this is the `timeouts()` that the
  // NUMBER_OF_RETRIES / RETRY_ASSERT_* macros expand to.
  RaftTimeouts timeouts();

  // Issue a manual vote, preceded by a pre-vote test. Ensure the pre-vote and
  // the vote match, and that the pre-vote does not advance raft state.
  RaftVoteResponse issueManualVote(const RaftVoteRequest &votereq, int id = 0);

  qclient::Options makeNoRedirectOptions(int id = 0);
  std::unique_ptr makeQClientHandshake(int id = 0);
  void killTunnel(int id = 0);

  // manage node state
  void spinup(int id);
  void spindown(int id);

  // In some tests, the latency of opening rocksdb can kill us, since by the
  // time the db is open raft starts timing out.
  // This function will prepare a node, so that spinning it up later is instant.
  void prepare(int id);

  // Returns node `id`. Its configuration comes from the `nodes` list passed to
  // the constructor, unless `srv` is set, in which case `srv` is used.
  TestNode* node(int id = 0, const RaftServer &srv = {});
  std::vector nodes(int id = 0);
  qclient::Members members(int id = 0);
  RaftClusterID clusterID();
  size_t getClusterSize() const;

  // Returns true iff, on every listed node, looking up `key` in the state
  // machine succeeds and yields exactly `value`.
  template
  bool checkValueConsensus(const std::string &key, const std::string &value, const Args... args) {
    std::vector arguments = { args... };

    for(size_t i = 0; i < arguments.size(); i++) {
      std::string tmp;
      rocksdb::Status st = stateMachine(arguments[i])->get(key, tmp);
      if(!st.ok()) return false;
      if(tmp != value) return false;
    }

    return true;
  }

  // Check whether the given nodes have reached full consensus. This means:
  // - State consensus
  // - All journals are the same size, with all entries committed
  // - All state machines have already applied all entries in the journal
  // The reference point is the last log index of the first listed node
  // (getLogSize() - 1). The success log message calls that value
  // "journal size".
  template
  bool checkFullConsensus(const Args... args) {
    if(!checkStateConsensusQuiet(args...)) return false;

    std::vector arguments = { args... };
    LogIndex targetEntry = journal(arguments[0])->getLogSize() - 1;

    // Ensure all journals and state machines are at 'targetEntry'
    for(size_t i = 0; i < arguments.size(); i++) {
      if(journal(arguments[i])->getLogSize()-1 != targetEntry) return false;
      if(journal(arguments[i])->getCommitIndex() != targetEntry) return false;
      if(stateMachine(arguments[i])->getLastApplied() != targetEntry) return false;
    }

    qdb_info("Achieved full consensus with journal size " << targetEntry);
    return true;
  }

  template
  bool crossCheckJournals(const Args... args) {
    // Check journal contents, validate they're equal across all nodes.
    // If one journal was trimmed further than some other, we only check the
    // entries which exist across all journals.
    //
    // The upper bound comes from the first node's journal only. For an index
    // past the end of another journal, that node's fetch() presumably fails,
    // which makes the check return false.
    std::vector arguments = { args... };

    LogIndex endingPoint = journal(arguments[0])->getLogSize();
    LogIndex startingPoint = journal(arguments[0])->getLogStart();

    for(size_t i = 0; i < arguments.size(); i++) {
      startingPoint = std::max(startingPoint, journal(arguments[i])->getLogStart());
    }

    qdb_info("Cross-checking journals from entry #" << startingPoint << " to #" << endingPoint - 1);

    for(LogIndex index = startingPoint; index < endingPoint; index++) {
      RaftEntry entry;
      rocksdb::Status st = journal(arguments[0])->fetch(index, entry);
      if(!st.ok()) return false;

      if(!validateSingleEntry(index, entry.term, entry.request, args...)) {
        return false;
      }
    }

    qdb_info("Journal cross-checking successful!");
    return true;
  }

  // Returns true iff every listed node can fetch entry `index` and that entry
  // holds `request`. The term is compared only when `term` is non-negative;
  // pass a negative term to skip the term check.
  template
  bool validateSingleEntry(LogIndex index, RaftTerm term, const RedisRequest &request, const Args... args) {
    std::vector arguments = { args... };

    for(size_t i = 0; i < arguments.size(); i++) {
      RaftEntry entry;
      rocksdb::Status st = journal(arguments[i])->fetch(index, entry);
      if(!st.ok()) return false;

      if(entry.request != request) {
        return false;
      }

      if(term >= 0 && entry.term != term) {
        return false;
      }
    }

    return true;
  }

  // Captures one RaftState snapshot per listed node into `snapshots`, which is
  // resized to the number of ids. It then requires all of the following:
  //  - every pair of consecutive snapshots has non-empty, identical leaders
  //    and identical terms;
  //  - exactly one node is LEADER and all the others are FOLLOWER.
  // On success it logs the term and leader, unless `quiet` is set.
  template
  bool checkStateConsensusWithSnapshots(bool quiet, std::vector &snapshots, const Args... args) {
    std::vector arguments = { args... };

    snapshots.resize(arguments.size());

    for(size_t i = 0; i < arguments.size(); i++) {
      snapshots[i] = state(arguments[i])->getSnapshot();
    }

    for(size_t i = 1; i < snapshots.size(); i++) {
      if(snapshots[i]->leader.empty() || snapshots[i-1]->leader.empty()) return false;
      if(snapshots[i]->term != snapshots[i-1]->term) return false;
      if(snapshots[i]->leader != snapshots[i-1]->leader) return false;
    }

    // Exactly one should be leader, others followers
    size_t leaders = 0;
    size_t followers = 0;
    for(size_t i = 0; i < snapshots.size(); i++) {
      if(snapshots[i]->status == RaftStatus::LEADER) {
        leaders++;
      }
      else if(snapshots[i]->status == RaftStatus::FOLLOWER) {
        followers++;
      }
    }

    if(leaders != 1u) {
      return false;
    }

    if(followers != snapshots.size() - 1) {
      return false;
    }

    if(!quiet) {
      qdb_info("Achieved state consensus for term " << snapshots[0]->term << " with leader " << snapshots[0]->leader.toString());
    }

    return true;
  }

  // checkStateConsensusWithSnapshots with logging, discarding the snapshots.
  template
  bool checkStateConsensus(const Args... args) {
    std::vector snapshots;
    return checkStateConsensusWithSnapshots(false, snapshots, args...);
  }

  // checkStateConsensusWithSnapshots without logging, discarding the snapshots.
  template
  bool checkStateConsensusQuiet(const Args... args) {
    std::vector snapshots;
    return checkStateConsensusWithSnapshots(true, snapshots, args...);
  }

  // Presumably: map a server to its node id, and find the current leader(s).
  // These are defined out of line.
  int getServerID(const RaftServer &srv);
  std::vector retrieveLeaders();
  int getLeaderID();
  qclient::SubscriptionOptions reasonableSubscriptionOptions(bool pushtypes = false);

private:
  std::string rocksdbPath(int id = 0);

  RaftClusterID clusterid;
  RaftTimeouts clusterTimeouts;

  // The list of nodes which are initially part of the cluster.
  std::vector initialNodes;

  // The complete list of nodes we're playing with. Not all of them may be
  // officially part of the cluster, at first.
  std::vector allNodes;

  // Presumably node id -> TestNode, created lazily by node(); TODO confirm.
  std::map testnodes;
};

// Given a test cluster, shut down the leader on regular intervals.
class ClusterDestabilizer { public: ClusterDestabilizer(TestCluster *testCluster); ~ClusterDestabilizer(); void main(ThreadAssistant &assistant); private: TestCluster *mTestCluster; AssistedThread mThread; }; // Convenience classes. Want to run tests on a simulated cluster of 3 nodes? // Inherit your test fixture from here. class TestCluster3Nodes : public TestCluster { public: TestCluster3Nodes() : TestCluster("a9b9e979-5428-42e9-8a52-f675c39fdf80", { GlobalEnv::server(0), GlobalEnv::server(1), GlobalEnv::server(2) }) { }; }; // Just like the above, but with relaxed raft timeouts class TestCluster3NodesRelaxedTimeouts : public TestCluster { public: TestCluster3NodesRelaxedTimeouts() : TestCluster(relaxedTimeouts, "a9b9e979-5428-42e9-8a52-f675c39fdf80", { GlobalEnv::server(0), GlobalEnv::server(1), GlobalEnv::server(2) }) { }; }; class TestCluster5Nodes : public TestCluster { public: TestCluster5Nodes() : TestCluster("a9b9e979-5428-42e9-8a52-f675c39fdf80", { GlobalEnv::server(0), GlobalEnv::server(1), GlobalEnv::server(2), GlobalEnv::server(3), GlobalEnv::server(4) }) { }; }; // A fixture which provides up to 10 raft nodes, but is initialized with // just a single one. 
class TestCluster10Nodes1Initial : public TestCluster { public: TestCluster10Nodes1Initial() : TestCluster("a9b9e979-5428-42e9-8a52-f675c39fdf80", { GlobalEnv::server(0), GlobalEnv::server(1), GlobalEnv::server(2), GlobalEnv::server(3), GlobalEnv::server(4), GlobalEnv::server(5), GlobalEnv::server(6), GlobalEnv::server(7), GlobalEnv::server(8), GlobalEnv::server(9) }, 1) { }; }; class TestCluster3NodesFixture : public TestCluster3Nodes, public ::testing::Test {}; class TestCluster3NodesRelaxedTimeoutsFixture : public TestCluster3NodesRelaxedTimeouts, public ::testing::Test {}; class TestCluster5NodesFixture : public TestCluster5Nodes, public ::testing::Test {}; class TestCluster10Nodes1InitialFixture : public TestCluster10Nodes1Initial, public ::testing::Test {}; class SocketListener { private: int s; struct sockaddr_in remote; public: SocketListener(int port) { struct addrinfo hints, *servinfo, *p; int rv, yes = 1; memset(&hints, 0, sizeof hints); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; if ((rv = getaddrinfo(NULL, std::to_string(port).c_str(), &hints, &servinfo)) != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); exit(1); } // loop through all the results and bind to the first we can for(p = servinfo; p != NULL; p = p->ai_next) { if ((s = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { perror("server: socket"); continue; } if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes,sizeof(int)) == -1) { perror("setsockopt"); exit(1); } if (bind(s, p->ai_addr, p->ai_addrlen) == -1) { close(s); perror("server: bind"); continue; } break; } freeaddrinfo(servinfo); // all done with this structure if (p == NULL) { fprintf(stderr, "server: failed to bind\n"); exit(1); } if (listen(s, 10) == -1) { perror("listen"); exit(1); } } ~SocketListener() { ::shutdown(s, SHUT_RDWR); close(s); } int accept() { socklen_t remoteSize = sizeof(remote); return ::accept(s, (struct sockaddr *)&remote, &remoteSize); } }; 
// Test helper for toggling packet filtering on a single port.
// Both member functions are defined out of line. Judging by the name, they
// presumably add or remove iptables rules that drop / accept traffic on
// `port`, and return true on success. TODO confirm against the .cc file.
class IptablesHelper {
public:
  bool singleDropPackets(int port);
  bool singleAcceptPackets(int port);
};

} // namespace quarkdb

#endif // QUARKDB_TEST_UTILS_HH