/**
 * @project        The CERN Tape Archive (CTA)
 * @copyright      Copyright © 2023 CERN
 * @license        This program is free software: you can redistribute it and/or modify
 *                 it under the terms of the GNU General Public License as published by
 *                 the Free Software Foundation, either version 3 of the License, or
 *                 (at your option) any later version.
 *
 *                 This program is distributed in the hope that it will be useful,
 *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
 *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *                 GNU General Public License for more details.
 *
 *                 You should have received a copy of the GNU General Public License
 *                 along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "scheduler/PostgresSchedDB/Helpers.hpp"
#include "scheduler/PostgresSchedDB/sql/RetrieveJobSummary.hpp"

#include <algorithm>
#include <ctime>
#include <future>
#include <iterator>
#include <list>
#include <map>
#include <set>
#include <string>

namespace cta::postgresscheddb {

//------------------------------------------------------------------------------
// Helpers::g_tapeStatuses
//------------------------------------------------------------------------------
// Cache of tape statuses, keyed by VID. Accessed in this file only while
// g_retrieveQueueStatisticsMutex is held.
std::map<std::string, Helpers::TapeStatusWithTime> Helpers::g_tapeStatuses;

//------------------------------------------------------------------------------
// Helpers::g_retrieveQueueStatisticsMutex
//------------------------------------------------------------------------------
// Global lock protecting both caches defined in this file.
cta::threading::Mutex Helpers::g_retrieveQueueStatisticsMutex;

//------------------------------------------------------------------------------
// Helpers::g_retrieveQueueStatistics
//------------------------------------------------------------------------------
// Cache of retrieve queue statistics (plus the tape status they were computed
// with), keyed by VID.
std::map<std::string, Helpers::RetrieveQueueStatisticsWithTime> Helpers::g_retrieveQueueStatistics;

//------------------------------------------------------------------------------
// Helpers::selectBestVid4Retrieve()
//------------------------------------------------------------------------------
/**
 * Picks the tape to recall from among a set of candidate VIDs.
 *
 * The retrieve queue statistics of every candidate are taken from the
 * process-wide cache, refreshing missing or stale entries one VID at a time.
 * Tapes in state ACTIVE (REPACKING when isRepack is set) are preferred; tapes
 * in state DISABLED (REPACKING_DISABLED when isRepack is set) are only used
 * when no preferred tape exists. Tapes in any other state are ignored.
 *
 * @param candidateVids  VIDs of the tapes holding a copy of the file.
 * @param catalogue      Catalogue used to look up tape statuses.
 * @param txn            Transaction used to query the retrieve job summary.
 * @param isRepack       Selects the repack flavour of the tape states.
 * @return The VID of the selected tape.
 * @throws NoTapeAvailableForRetrieve if no candidate is in a usable state.
 * @throws cta::exception::Exception if a candidate is missing from the
 *         catalogue or the statistics query returns an unexpected result.
 */
std::string Helpers::selectBestVid4Retrieve(
  const std::set<std::string>& candidateVids,
  cta::catalogue::Catalogue& catalogue,
  postgresscheddb::Transaction& txn,
  bool isRepack)
{
  // Retrieve stats of the preferred (non-disabled, non-broken/exported) candidates
  std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStats;
  // Retrieve stats of the disabled candidates, kept as a fallback
  std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStatsFallback;

  // The tape-state classification is needed on the three paths below
  // (waited-for entry, cache hit, freshly refreshed entry).
  const auto isPreferred = [isRepack](const common::dataStructures::Tape& tape) {
    return (tape.state == common::dataStructures::Tape::ACTIVE && !isRepack) ||
           (tape.state == common::dataStructures::Tape::REPACKING && isRepack);
  };
  const auto isFallback = [isRepack](const common::dataStructures::Tape& tape) {
    return (tape.state == common::dataStructures::Tape::DISABLED && !isRepack) ||
           (tape.state == common::dataStructures::Tape::REPACKING_DISABLED && isRepack);
  };
  const auto addCandidate = [&](const RetrieveQueueStatisticsWithTime& entry) {
    if (isPreferred(entry.tapeStatus)) {
      candidateVidsStats.emplace_back(entry.stats);
    } else if (isFallback(entry.tapeStatus)) {
      candidateVidsStatsFallback.emplace_back(entry.stats);
    }
  };

  // Take the global lock
  cta::threading::MutexLocker grqsmLock(g_retrieveQueueStatisticsMutex);

  // Ensure the tape status cache contains a fresh entry for every candidate
  bool tapeCacheIsComplete = true;
  for (const auto& v : candidateVids) {
    const auto cachedTape = g_tapeStatuses.find(v);
    if (cachedTape == g_tapeStatuses.end() ||
        time(nullptr) - cachedTape->second.updateTime > c_tapeCacheMaxAge) {
      tapeCacheIsComplete = false;
      break;
    }
  }
  if (!tapeCacheIsComplete) {
    // Remove stale cache entries
    for (auto it = g_tapeStatuses.cbegin(); it != g_tapeStatuses.cend();) {
      const auto timeSinceLastUpdate = time(nullptr) - it->second.updateTime;
      if (timeSinceLastUpdate > c_tapeCacheMaxAge) {
        it = g_tapeStatuses.erase(it);
      } else {
        ++it;
      }
    }
    // Add in all the entries we need for this batch of candidates
    auto tapeStatuses = catalogue.Tape()->getTapesByVid(candidateVids);
    for (auto& ts : tapeStatuses) {
      auto& cachedTape = g_tapeStatuses[ts.first];
      cachedTape.tapeStatus = ts.second;
      cachedTape.updateTime = time(nullptr);
    }
  }

  // Collect the statistics of every candidate, refreshing the cache as needed
  for (const auto& v : candidateVids) {
    // A missing entry and an outdated entry are refreshed the same way.
    bool refreshNeeded = false;

    const auto cached = g_retrieveQueueStatistics.find(v);
    if (cached == g_retrieveQueueStatistics.end()) {
      refreshNeeded = true;
    } else if (cached->second.updating) {
      logUpdateCacheIfNeeded(false, cached->second, "g_retrieveQueueStatistics.at(v).updating");
      // Another thread is refreshing this entry: wait for it. The global lock
      // has to be released meanwhile, so the entry is looked up again
      // afterwards (it may have been flushed in between).
      auto updateFuture = cached->second.updateFuture;
      grqsmLock.unlock();
      updateFuture.wait();
      grqsmLock.lock();
      const auto refreshed = g_retrieveQueueStatistics.find(v);
      if (refreshed == g_retrieveQueueStatistics.end()) {
        refreshNeeded = true;
      } else if (isPreferred(refreshed->second.tapeStatus)) {
        logUpdateCacheIfNeeded(false, refreshed->second,
          "(g_retrieveQueueStatistics.at(v).tapeStatus.state == common::dataStructures::Tape::ACTIVE && !isRepack) "
          "|| (g_retrieveQueueStatistics.at(v).tapeStatus.state == common::dataStructures::Tape::REPACKING && isRepack)");
        candidateVidsStats.emplace_back(refreshed->second.stats);
      } else if (isFallback(refreshed->second.tapeStatus)) {
        logUpdateCacheIfNeeded(false, refreshed->second,
          "(g_retrieveQueueStatistics.at(v).tapeStatus.state == common::dataStructures::Tape::DISABLED && !isRepack) "
          "|| (g_retrieveQueueStatistics.at(v).tapeStatus.state == common::dataStructures::Tape::REPACKING_DISABLED && isRepack)");
        candidateVidsStatsFallback.emplace_back(refreshed->second.stats);
      }
    } else {
      // We have a cache hit, check it's not stale.
      const time_t timeSinceLastUpdate = time(nullptr) - cached->second.updateTime;
      if (timeSinceLastUpdate > c_retrieveQueueCacheMaxAge) {
        logUpdateCacheIfNeeded(false, cached->second,
          "timeSinceLastUpdate ("+std::to_string(timeSinceLastUpdate)+")> c_retrieveQueueCacheMaxAge ("
          + std::to_string(c_retrieveQueueCacheMaxAge)+"), cache needs to be updated");
        refreshNeeded = true;
      } else {
        logUpdateCacheIfNeeded(false, cached->second,
          "Cache is not updated, timeSinceLastUpdate (" + std::to_string(timeSinceLastUpdate) +
          ") <= c_retrieveQueueCacheMaxAge (" + std::to_string(c_retrieveQueueCacheMaxAge) + ")");
        // We're lucky: cache hit (and not stale)
        addCandidate(cached->second);
      }
    }

    if (!refreshNeeded) {
      continue;
    }

    // We need to update the entry in the cache (miss or stale, handled the same way).
    // We just update one vid at a time as doing several in parallel would be quite
    // hairy lock-wise (but give a slight performance boost).

    // Get the tape status BEFORE flagging the entry as updating: this step can
    // throw, and an entry left flagged as updating would never be refreshed again.
    if (g_tapeStatuses.find(v) == g_tapeStatuses.end()) {
      // Corner case: there are two candidate vids and the second one was
      // evicted from the tape cache because it was stale.
      auto tapeStatuses = catalogue.Tape()->getTapesByVid(v);
      if (tapeStatuses.size() != 1) {
        throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): candidate vid not found in the TAPE table.");
      }
      auto& cachedTape = g_tapeStatuses[v];
      cachedTape.tapeStatus = tapeStatuses.begin()->second;
      cachedTape.updateTime = time(nullptr);
    }
    const common::dataStructures::Tape tapeStatus = g_tapeStatuses.at(v).tapeStatus;

    // Flag the entry so that other threads interested in this vid wait on us.
    std::promise<void> updatePromise;
    {
      auto& entry = g_retrieveQueueStatistics[v];
      entry.updating = true;
      entry.updateFuture = updatePromise.get_future();
    }

    // Give other threads a chance to access the cache for other vids.
    // (getRetrieveQueueStatistics() takes the global lock itself when it
    // updates the cache, so it has to be called unlocked.)
    grqsmLock.unlock();
    std::list<SchedulerDatabase::RetrieveQueueStatistics> queuesStats;
    try {
      // Build a minimal service retrieve file queue criteria to query queues
      common::dataStructures::RetrieveFileQueueCriteria rfqc;
      common::dataStructures::TapeFile tf;
      tf.copyNb = 1;
      tf.vid = v;
      rfqc.archiveFile.tapeFiles.push_back(tf);
      queuesStats = Helpers::getRetrieveQueueStatistics(rfqc, {v}, txn);
    } catch (...) {
      // The query failed: clear the updating flag so the next caller retries
      // the refresh instead of trusting this entry forever. Waiters are woken
      // up when updatePromise is destroyed during stack unwinding.
      grqsmLock.lock();
      const auto failed = g_retrieveQueueStatistics.find(v);
      if (failed != g_retrieveQueueStatistics.end()) {
        failed->second.updating = false;
        failed->second.updateFuture = std::shared_future<void>();
      }
      throw;
    }

    // We now have the data we need. Update the cache.
    grqsmLock.lock();
    auto& entry = g_retrieveQueueStatistics[v];
    entry.updating = false;
    entry.updateFuture = std::shared_future<void>();
    // Check size of stats
    if (queuesStats.size() != 1) {
      throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for queueStats.");
    }
    if (queuesStats.front().vid != v) {
      throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in queueStats.");
    }
    entry.stats = queuesStats.front();
    entry.tapeStatus = tapeStatus;
    entry.updateTime = time(nullptr);
    logUpdateCacheIfNeeded(true, entry);
    // Signal to potential waiters
    updatePromise.set_value();
    // Update our own candidate list if needed.
    addCandidate(entry);
  }

  // We now have all the candidates listed (if any).
  if (candidateVidsStats.empty()) {
    if (candidateVidsStatsFallback.empty()) {
      throw NoTapeAvailableForRetrieve("In Helpers::selectBestRetrieveQueue(): no tape available to recall from.");
    }
    // No preferred tape: fall back to the DISABLED ones
    candidateVidsStats.insert(candidateVidsStats.end(),
                              candidateVidsStatsFallback.begin(), candidateVidsStatsFallback.end());
  }

  // Sort the tapes, best first.
  candidateVidsStats.sort(SchedulerDatabase::RetrieveQueueStatistics::leftGreaterThanRight);

  // Get the set of tapes equivalent to the best one (neither less nor greater)
  std::set<std::string> shortSetVids;
  auto& best = candidateVidsStats.front();
  for (auto& s : candidateVidsStats) {
    if (!(s < best) && !(s > best)) {
      shortSetVids.insert(s.vid);
    }
  }

  // If there is only one best tape, we're done
  if (shortSetVids.size() == 1) {
    return *shortSetVids.begin();
  }

  // There are several equivalent entries, choose one among them based on the
  // number of days since epoch. std::set iterates in sorted order, so the
  // index refers to the alphabetically sorted list of vids.
  const time_t secondsSinceEpoch = time(nullptr);
  const uint64_t daysSinceEpoch = secondsSinceEpoch / (60*60*24);
  return *std::next(shortSetVids.begin(), daysSinceEpoch % shortSetVids.size());
}

//------------------------------------------------------------------------------
// Helpers::logUpdateCacheIfNeeded()
//------------------------------------------------------------------------------
/**
 * Appends one line describing a cache access to the file named by
 * HELPERS_CACHE_UPDATE_LOGGING_FILE. Does nothing unless the code is compiled
 * with HELPERS_CACHE_UPDATE_LOGGING defined.
 *
 * @param entryCreation  Value written in the entryCreation field of the line.
 * @param tapeStatistic  Cache entry whose vid, state and filesQueued are logged.
 * @param message        Free text written in the message field of the line.
 */
void Helpers::logUpdateCacheIfNeeded(
  const bool entryCreation,
  const RetrieveQueueStatisticsWithTime &tapeStatistic,
  const std::string &message)
{
#ifdef HELPERS_CACHE_UPDATE_LOGGING
  std::ofstream logFile(HELPERS_CACHE_UPDATE_LOGGING_FILE, std::ofstream::app);
  std::time_t end_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
  // Chomp newline in the end
  std::string date = std::ctime(&end_time);
  date.erase(std::remove(date.begin(), date.end(), '\n'), date.end());
  logFile << date << " pid=" << ::getpid() << " tid=" << syscall(SYS_gettid)
          << " message=" << message << " entryCreation="<< entryCreation
          <<" vid=" << tapeStatistic.tapeStatus.vid
          << " state=" << common::dataStructures::Tape::stateToString(tapeStatistic.tapeStatus.state)
          << " filesQueued=" << tapeStatistic.stats.filesQueued << std::endl;
#endif // HELPERS_CACHE_UPDATE_LOGGING
}

//------------------------------------------------------------------------------
// Helpers::getRetrieveQueueStatistics()
//------------------------------------------------------------------------------
/**
 * Returns the retrieve queue statistics of the tapes holding a copy of a file.
 *
 * For every tape file of the criteria whose vid is in vidsToConsider, the
 * JobsToTransferForUser job summary is queried. When no summary row exists,
 * zeroed statistics are returned for the vid; otherwise the row values are
 * returned and also pushed to the statistics cache.
 *
 * @param criteria        Its archiveFile.tapeFiles list gives the tapes to look at.
 * @param vidsToConsider  Tape files on other vids are skipped.
 * @param txn             Transaction used to run the query.
 * @return One statistics element per considered tape file.
 */
std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueStatistics(
  const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria,
  const std::set<std::string> &vidsToConsider,
  postgresscheddb::Transaction &txn)
{
  std::list<SchedulerDatabase::RetrieveQueueStatistics> ret;

  for (auto &tf : criteria.archiveFile.tapeFiles) {
    if (!vidsToConsider.count(tf.vid)) {
      continue;
    }

    rdbms::Rset summary = cta::postgresscheddb::sql::RetrieveJobSummaryRow::selectVid(
      tf.vid,
      common::dataStructures::JobQueueType::JobsToTransferForUser,
      txn
    );

    if (!summary.next()) {
      // Nothing queued for this tape: report an empty queue
      ret.push_back(SchedulerDatabase::RetrieveQueueStatistics());
      auto &emptyStats = ret.back();
      emptyStats.vid = tf.vid;
      emptyStats.bytesQueued = 0;
      emptyStats.currentPriority = 0;
      emptyStats.filesQueued = 0;
      continue;
    }

    cta::postgresscheddb::sql::RetrieveJobSummaryRow rjs(summary);
    ret.push_back(SchedulerDatabase::RetrieveQueueStatistics());
    auto &stats = ret.back();
    stats.vid = rjs.vid;
    stats.currentPriority = rjs.priority;
    stats.bytesQueued = rjs.jobsTotalSize;
    stats.filesQueued = rjs.jobsCount;
    // Takes g_retrieveQueueStatisticsMutex: callers must not hold it.
    updateRetrieveQueueStatisticsCache(rjs.vid, rjs.jobsCount, rjs.jobsTotalSize, rjs.priority);
  }

  return ret;
}

//------------------------------------------------------------------------------
// Helpers::updateRetrieveQueueStatisticsCache()
//------------------------------------------------------------------------------
/**
 * Stores queue statistics for a vid in the cache, creating the entry if needed.
 *
 * For an existing entry only the three counters are overwritten: the cached
 * tape status is left alone (the caller did not check it) and so is the update
 * time, so that a full refresh is still forced after a while. Updating an entry
 * while another thread refreshes it is harmless: cache users wait for that
 * refresh anyway and simply do not profit from this update.
 *
 * @param vid       Tape the statistics belong to.
 * @param files     Number of queued files.
 * @param bytes     Number of queued bytes.
 * @param priority  Current priority of the queue.
 */
void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_t files, uint64_t bytes, uint64_t priority) {
  threading::MutexLocker ml(g_retrieveQueueStatisticsMutex);

  const auto cached = g_retrieveQueueStatistics.find(vid);
  if (cached != g_retrieveQueueStatistics.end()) {
    cached->second.stats.filesQueued = files;
    cached->second.stats.bytesQueued = bytes;
    cached->second.stats.currentPriority = priority;
    logUpdateCacheIfNeeded(false, cached->second);
    return;
  }

  // The entry is missing. We just create it.
  auto& entry = g_retrieveQueueStatistics[vid];
  entry.stats.filesQueued = files;
  entry.stats.bytesQueued = bytes;
  entry.stats.currentPriority = priority;
  entry.stats.vid = vid;
  entry.updating = false;
  entry.updateTime = time(nullptr);
  // Use the cached tape status if we have it, otherwise fake it
  const auto cachedTape = g_tapeStatuses.find(vid);
  if (cachedTape != g_tapeStatuses.end()) {
    entry.tapeStatus = cachedTape->second.tapeStatus;
  } else {
    entry.tapeStatus.state = common::dataStructures::Tape::ACTIVE;
    entry.tapeStatus.full = false;
  }
  logUpdateCacheIfNeeded(true, entry);
}

//------------------------------------------------------------------------------
// Helpers::flushRetrieveQueueStatisticsCacheForVid()
//------------------------------------------------------------------------------
/**
 * Drops the cached queue statistics and the cached tape status of a vid.
 *
 * @param vid  Tape whose cache entries are removed (no-op if none exist).
 */
void Helpers::flushRetrieveQueueStatisticsCacheForVid(const std::string & vid) {
  threading::MutexLocker ml(g_retrieveQueueStatisticsMutex);
  g_retrieveQueueStatistics.erase(vid);
  g_tapeStatuses.erase(vid);
}

} // namespace cta::postgresscheddb