// /************************************************************************
// * EOS - the CERN Disk Storage System *
// * Copyright (C) 2022 CERN/Switzerland *
// * *
// * This program is free software: you can redistribute it and/or modify *
// * it under the terms of the GNU General Public License as published by *
// * the Free Software Foundation, either version 3 of the License, or *
// * (at your option) any later version. *
// * *
// * This program is distributed in the hope that it will be useful, *
// * but WITHOUT ANY WARRANTY; without even the implied warranty of *
// * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
// * GNU General Public License for more details. *
// * *
// * You should have received a copy of the GNU General Public License *
// * along with this program. If not, see .*
// ************************************************************************
//
#include "common/async/ExecutorMgr.hh"
#include "unit_tests/common/async/FollyExecutorFixture.hh"
#include
TEST(ExecutorMgr, Construction)
{
eos::common::ExecutorMgr mgr("std", 2);
ASSERT_TRUE(mgr.IsThreadPool());
ASSERT_FALSE(mgr.IsFollyExecutor());
eos::common::ExecutorMgr mgr2("folly", 2);
ASSERT_FALSE(mgr2.IsThreadPool());
ASSERT_TRUE(mgr2.IsFollyExecutor());
}
TEST(ExecutorMgr, ThreadPool)
{
eos::common::ExecutorMgr mgr("std", 3, 3);
ASSERT_TRUE(mgr.IsThreadPool());
std::vector> futures;
for (int i = 0; i < 10; i++) {
auto future = mgr.PushTask(
[] {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
return std::this_thread::get_id();
}
);
futures.emplace_back(std::move(future));
}
std::set threadIds;
for (auto && future : futures) {
threadIds.insert(future.getValue());
}
// Check if we have exactly 3 different thread ids
ASSERT_EQ(3, threadIds.size());
}
TEST_F(FollyExecutor_F, IOThreadPoolExecutorTests)
{
eos::common::ExecutorMgr mgr(folly_io_executor);
ASSERT_TRUE(mgr.IsFollyExecutor());
std::vector> futures;
for (int i = 0; i < 10; i++) {
auto future = mgr.PushTask(
[] {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
return std::this_thread::get_id();
}
);
futures.emplace_back(std::move(future));
}
std::set threadIds;
for (auto && future : futures) {
threadIds.insert(future.getValue());
}
// Check if we have exactly 4 different thread ids
ASSERT_EQ(kNumThreads, threadIds.size());
}
TEST_F(FollyExecutor_F, CPUThreadPoolExecutorTests)
{
eos::common::ExecutorMgr mgr(folly_cpu_executor);
ASSERT_TRUE(mgr.IsFollyExecutor());
std::vector> futures;
for (int i = 0; i < 10; i++) {
auto future = mgr.PushTask(
[] {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
return std::this_thread::get_id();
}
);
futures.emplace_back(std::move(future));
}
std::set threadIds;
for (auto && future : futures) {
threadIds.insert(future.getValue());
}
// Check if we have exactly 4 different thread ids
ASSERT_EQ(kNumThreads, threadIds.size());
}
TEST(ExecutorMgr, ThreadPoolShutdown)
{
eos::common::ExecutorMgr mgr("std",2,4);
ASSERT_TRUE(mgr.IsThreadPool());
std::atomic counter{0};
for (int i = 0; i < 100; i++) {
mgr.PushTask(
[&counter] {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
counter++;
}
);
}
// currently the tasks shouldn't complete just yet!
ASSERT_GT(100, counter);
mgr.Shutdown();
ASSERT_EQ(100, counter);
std::cout << "common::ThreadPool executed " << counter << " tasks" << std::endl;
}
TEST_F(FollyExecutor_F, IOThreadPoolShutdown)
{
eos::common::ExecutorMgr mgr(folly_io_executor);
ASSERT_TRUE(mgr.IsFollyExecutor());
std::atomic counter{0};
for (int i = 0; i < 100; i++) {
mgr.PushTask(
[&counter] {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
counter++;
}
);
}
ASSERT_GT(100, counter);
mgr.Shutdown();
// There is no stopping the IOThreadPoolExecutor!!!
ASSERT_EQ(100, counter); // 100 tasks should have been executed
std::cout << "folly::IOThreadPoolExecutor executed " << counter << " tasks" << std::endl;
}
TEST_F(FollyExecutor_F, CPUThreadPoolShutdown)
{
eos::common::ExecutorMgr mgr(folly_cpu_executor);
ASSERT_TRUE(mgr.IsFollyExecutor());
std::atomic counter{0};
for (int i = 0; i < 100; i++) {
mgr.PushTask(
[&counter] {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
counter++;
}
);
}
ASSERT_GT(100, counter);
mgr.Shutdown();
// CPU ThreadPool supports true cancellation!
ASSERT_GT(100, counter);
std::cout << "folly::CPUThreadPoolExecutor executed " << counter << " tasks" << std::endl;
}