fastcgi++
|
00001 00002 /*************************************************************************** 00003 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org] * 00004 * * 00005 * This file is part of fastcgi++. * 00006 * * 00007 * fastcgi++ is free software: you can redistribute it and/or modify it * 00008 * under the terms of the GNU Lesser General Public License as published * 00009 * by the Free Software Foundation, either version 3 of the License, or (at * 00010 * your option) any later version. * 00011 * * 00012 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT * 00013 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * 00014 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public * 00015 * License for more details. * 00016 * * 00017 * You should have received a copy of the GNU Lesser General Public License * 00018 * along with fastcgi++. If not, see <http://www.gnu.org/licenses/>. * 00019 ****************************************************************************/ 00020 00021 00022 #ifndef MANAGER_HPP 00023 #define MANAGER_HPP 00024 00025 #include <map> 00026 #include <string> 00027 #include <queue> 00028 #include <algorithm> 00029 #include <cstring> 00030 00031 #include <boost/bind.hpp> 00032 #include <boost/shared_ptr.hpp> 00033 #include <boost/thread.hpp> 00034 #include <boost/thread/shared_mutex.hpp> 00035 00036 #include <signal.h> 00037 00038 #include <fastcgi++/protocol.hpp> 00039 #include <fastcgi++/transceiver.hpp> 00040 00042 namespace Fastcgipp 00043 { 00045 00055 class ManagerPar 00056 { 00057 public: 00059 00070 ManagerPar(int fd, const boost::function<void(Protocol::FullId, Message)>& sendMessage_, bool doSetupSignals); 00071 00072 ~ManagerPar() { instance=0; } 00073 00075 00084 void stop(); 00085 00087 00095 void terminate(); 00096 00098 00105 void setupSignals(); 00106 00108 size_t getMessagesSize() const { return messages.size(); } 00109 00110 protected: 00112 Transceiver transceiver; 00113 00115 00119 class Tasks: public std::queue<Protocol::FullId>, public boost::mutex {}; 00121 00124 Tasks tasks; 00125 00127 std::queue<Message> messages; 00128 00130 00138 void localHandler(Protocol::FullId id); 00139 00141 bool asleep; 00143 boost::mutex sleepMutex; 00144 00146 00149 bool stopBool; 00151 boost::mutex stopMutex; 00153 00156 bool terminateBool; 00158 boost::mutex terminateMutex; 00159 00160 private: 00162 static void signalHandler(int signum); 00164 static ManagerPar* instance; 00165 }; 00166 00168 00178 template<class T> class Manager: public ManagerPar 00179 { 00180 public: 00182 00192 Manager(int fd=0, bool doSetupSignals=true): ManagerPar(fd, boost::bind(&Manager::push, boost::ref(*this), _1, _2), doSetupSignals) {} 00193 00195 00202 void handler(); 00203 00205 00220 void push(Protocol::FullId id, Message message); 00221 00223 size_t getRequestsSize() const { return requests.size(); } 00224 private: 00226 00230 class Requests: public std::map<Protocol::FullId, boost::shared_ptr<T> >, public boost::shared_mutex {}; 00232 00236 Requests requests; 00237 }; 00238 } 00239 00240 template<class T> void Fastcgipp::Manager<T>::push(Protocol::FullId id, Message message) 00241 { 00242 using namespace std; 00243 using namespace Protocol; 00244 using namespace boost; 00245 00246 if(id.fcgiId) 00247 { 00248 shared_lock<shared_mutex> reqReadLock(requests); 00249 typename Requests::iterator it(requests.find(id)); 00250 if(it!=requests.end()) 00251 { 00252 lock_guard<mutex> mesLock(it->second->messages); 00253 it->second->messages.push(message); 00254 lock_guard<mutex> tasksLock(tasks); 00255 tasks.push(id); 00256 } 00257 else if(!message.type) 00258 { 00259 Header& header=*(Header*)message.data.get(); 00260 if(header.getType()==BEGIN_REQUEST) 00261 { 00262 BeginRequest& body=*(BeginRequest*)(message.data.get()+sizeof(Header)); 00263 00264 reqReadLock.unlock(); 00265 unique_lock<shared_mutex> reqWriteLock(requests); 00266 00267 boost::shared_ptr<T>& request = requests[id]; 00268 request.reset(new T); 00269 request->set(id, transceiver, body.getRole(), !body.getKeepConn(), boost::bind(&Manager::push, boost::ref(*this), id, _1)); 00270 } 00271 else 00272 return; 00273 } 00274 } 00275 else 00276 { 00277 messages.push(message); 00278 tasks.push(id); 00279 } 00280 00281 lock_guard<mutex> sleepLock(sleepMutex); 00282 if(asleep) 00283 transceiver.wake(); 00284 } 00285 00286 template<class T> void Fastcgipp::Manager<T>::handler() 00287 { 00288 using namespace std; 00289 using namespace boost; 00290 00291 while(1) 00292 {{ 00293 { 00294 lock_guard<mutex> stopLock(stopMutex); 00295 if(stopBool) 00296 { 00297 stopBool=false; 00298 return; 00299 } 00300 } 00301 00302 bool sleep=transceiver.handler(); 00303 00304 { 00305 lock_guard<mutex> terminateLock(terminateMutex); 00306 if(terminateBool) 00307 { 00308 shared_lock<shared_mutex> requestsLock(requests); 00309 if(requests.empty() && sleep) 00310 { 00311 terminateBool=false; 00312 return; 00313 } 00314 } 00315 } 00316 00317 unique_lock<mutex> tasksLock(tasks); 00318 unique_lock<mutex> sleepLock(sleepMutex); 00319 00320 if(tasks.empty()) 00321 { 00322 tasksLock.unlock(); 00323 00324 asleep=true; 00325 sleepLock.unlock(); 00326 00327 if(sleep) transceiver.sleep(); 00328 00329 sleepLock.lock(); 00330 asleep=false; 00331 sleepLock.unlock(); 00332 00333 continue; 00334 } 00335 00336 sleepLock.unlock(); 00337 00338 Protocol::FullId id=tasks.front(); 00339 tasks.pop(); 00340 tasksLock.unlock(); 00341 00342 if(id.fcgiId==0) 00343 localHandler(id); 00344 else 00345 { 00346 shared_lock<shared_mutex> reqReadLock(requests); 00347 typename map<Protocol::FullId, boost::shared_ptr<T> >::iterator it(requests.find(id)); 00348 if(it!=requests.end() && it->second->handler()) 00349 { 00350 reqReadLock.unlock(); 00351 unique_lock<shared_mutex> reqWriteLock(requests); 00352 00353 requests.erase(it); 00354 } 00355 } 00356 }} 00357 } 00358 00359 #endif