//------------------------------------------------------------------------------ // File: EnvironmentReader.cc // Author: Georgios Bitzes - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "EnvironmentReader.hh" //------------------------------------------------------------------------------ //! Constructor - launch a thread pool with the specified number of threads //------------------------------------------------------------------------------ EnvironmentReader::EnvironmentReader(size_t nthreads) { //---------------------------------------------------------------------------- // Start up our thread pool. //---------------------------------------------------------------------------- for (size_t i = 0; i < nthreads; i++) { threads.emplace_back(&EnvironmentReader::worker, this); } //---------------------------------------------------------------------------- // Wait until all threads have been properly spawned - this allows us to // assume all threads are active in the destructor. //---------------------------------------------------------------------------- while (threadsAlive != nthreads); } void EnvironmentReader::inject(pid_t pid, const Environment& env, std::chrono::milliseconds artificialDelay) { SimulatedResponse simulated; simulated.env = env; simulated.artificialDelay = artificialDelay; std::lock_guard lock(injectionMtx); injections[pid] = simulated; } void EnvironmentReader::removeInjection(pid_t pid) { std::lock_guard lock(injectionMtx); injections.erase(pid); } void EnvironmentReader::fillFromInjection(pid_t pid, Environment& env) { SimulatedResponse response; { std::lock_guard lock(injectionMtx); auto it = injections.find(pid); if (it == injections.end()) { return; } response = it->second; } std::this_thread::sleep_for(response.artificialDelay); env = response.env; } EnvironmentReader::~EnvironmentReader() { //---------------------------------------------------------------------------- // Spin until all threads are done, and join. //---------------------------------------------------------------------------- shutdown = true; while (threadsAlive != 0) { queueCV.notify_all(); } for (size_t i = 0; i < threads.size(); i++) { threads[i].join(); } } //------------------------------------------------------------------------------ // Each worker loops on the queue, waiting for pending requests to fulfill. //------------------------------------------------------------------------------ void EnvironmentReader::worker() { threadsAlive++; std::unique_lock lock(mtx); while (!shutdown) { //------------------------------------------------------------------------ // Is there an item for me to process? //------------------------------------------------------------------------ if (!requestQueue.empty()) { QueuedRequest request = std::move(requestQueue.front()); requestQueue.pop(); lock.unlock(); //------------------------------------------------------------------------ // Yes, I have work to do. Start timing how long it takes to receive // a response from the kernel. //------------------------------------------------------------------------ std::chrono::high_resolution_clock::time_point startTime = std::chrono::high_resolution_clock::now(); Environment env; //------------------------------------------------------------------------ // Provide simulated or real response? //------------------------------------------------------------------------ if (injections.empty()) { //---------------------------------------------------------------------- // Real response, read environment. If a (temporary) kernel deadlock // occurs, it will be at this point. Provide simulated or real response? //---------------------------------------------------------------------- env.fromFile(SSTR("/proc/" << request.pid << "/environ")); } else { //---------------------------------------------------------------------- // Simulation //---------------------------------------------------------------------- fillFromInjection(request.pid, env); } //---------------------------------------------------------------------- // Measure how long it took, issue warning if too high. //---------------------------------------------------------------------- std::chrono::high_resolution_clock::time_point endTime = std::chrono::high_resolution_clock::now(); std::chrono::milliseconds duration = std::chrono::duration_cast(endTime - startTime); if (duration.count() > 5) { eos_static_notice("Reading /proc/%d/environ took %dms (uid=%d)", request.pid, duration.count(), request.uid); } //------------------------------------------------------------------------ // It's over, it's done. Give back result. //------------------------------------------------------------------------ lock.lock(); auto it = pendingRequests.find(request.pid); if (it == pendingRequests.end()) { eos_static_crit("EnvironmentReader queue corruption, unable to find entry for pid %d", request.pid); } else { pendingRequests.erase(it); } request.promise.set_value(env); //------------------------------------------------------------------------ // Process next item in the queue, no waiting. //------------------------------------------------------------------------ } else { //------------------------------------------------------------------------ // No work to do, sleep. //------------------------------------------------------------------------ queueCV.wait_for(lock, std::chrono::seconds(1)); } } threadsAlive--; } //------------------------------------------------------------------------------ // Request to retrieve the environmnet variables for the given pid. // // Returns a FutureEnvironment object, which _might_ be kernel-deadlocked, // and must be waited-for with a timeout. //------------------------------------------------------------------------------ FutureEnvironment EnvironmentReader::stageRequest(pid_t pid, uid_t uid) { std::unique_lock lock(mtx); eos_static_debug("Staging request to read environment of pid %d for %d", pid, uid); //---------------------------------------------------------------------------- //! Check: Is this request already pending? If so, give back the same //! response, connected to the same promise object. //---------------------------------------------------------------------------- auto it = pendingRequests.find(pid); if (it != pendingRequests.end()) { eos_static_debug("Request to read environment for pid %d already staged", pid); return it->second; } //---------------------------------------------------------------------------- //! Nope, stage it //---------------------------------------------------------------------------- QueuedRequest request; FutureEnvironment response; request.pid = pid; request.uid = uid; response.contents = request.promise.get_future(); response.queuedSince = std::chrono::high_resolution_clock::now(); pendingRequests[pid] = response; requestQueue.push(std::move(request)); eos_static_debug("Queueing request to read environment for pid %d, notifying workers", pid); queueCV.notify_all(); return response; }