00001 //------------------------------------------------------------------------------ 00002 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN) 00003 // Author: Lukasz Janyst <ljanyst@cern.ch> 00004 //------------------------------------------------------------------------------ 00005 // XRootD is free software: you can redistribute it and/or modify 00006 // it under the terms of the GNU Lesser General Public License as published by 00007 // the Free Software Foundation, either version 3 of the License, or 00008 // (at your option) any later version. 00009 // 00010 // XRootD is distributed in the hope that it will be useful, 00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00013 // GNU General Public License for more details. 00014 // 00015 // You should have received a copy of the GNU Lesser General Public License 00016 // along with XRootD. If not, see <http://www.gnu.org/licenses/>. 00017 //------------------------------------------------------------------------------ 00018 00019 #ifndef __XRD_CL_STREAM_HH__ 00020 #define __XRD_CL_STREAM_HH__ 00021 00022 #include "XrdCl/XrdClPoller.hh" 00023 #include "XrdCl/XrdClStatus.hh" 00024 #include "XrdCl/XrdClURL.hh" 00025 #include "XrdCl/XrdClPostMasterInterfaces.hh" 00026 #include "XrdCl/XrdClChannelHandlerList.hh" 00027 #include "XrdCl/XrdClJobManager.hh" 00028 #include "XrdCl/XrdClInQueue.hh" 00029 #include "XrdCl/XrdClUtils.hh" 00030 00031 #include "XrdSys/XrdSysPthread.hh" 00032 #include "XrdNet/XrdNetAddr.hh" 00033 #include <list> 00034 #include <vector> 00035 #include <functional> 00036 00037 namespace XrdCl 00038 { 00039 class Message; 00040 class Channel; 00041 class TransportHandler; 00042 class TaskManager; 00043 struct SubStreamData; 00044 00045 //---------------------------------------------------------------------------- 00047 //---------------------------------------------------------------------------- 00048 class Stream 00049 { 00050 public: 00051 //------------------------------------------------------------------------ 00053 //------------------------------------------------------------------------ 00054 enum StreamStatus 00055 { 00056 Disconnected = 0, 00057 Connected = 1, 00058 Connecting = 2, 00059 Error = 3 00060 }; 00061 00062 //------------------------------------------------------------------------ 00064 //------------------------------------------------------------------------ 00065 Stream( const URL *url ); 00066 00067 //------------------------------------------------------------------------ 00069 //------------------------------------------------------------------------ 00070 ~Stream(); 00071 00072 //------------------------------------------------------------------------ 00074 //------------------------------------------------------------------------ 00075 XRootDStatus Initialize(); 00076 00077 //------------------------------------------------------------------------ 00079 //------------------------------------------------------------------------ 00080 XRootDStatus Send( Message *msg, 00081 OutgoingMsgHandler *handler, 00082 bool stateful, 00083 time_t expires ); 00084 00085 //------------------------------------------------------------------------ 00087 //------------------------------------------------------------------------ 00088 void SetTransport( TransportHandler *transport ) 00089 { 00090 pTransport = transport; 00091 } 00092 00093 //------------------------------------------------------------------------ 00095 //------------------------------------------------------------------------ 00096 void SetPoller( Poller *poller ) 00097 { 00098 pPoller = poller; 00099 } 00100 00101 //------------------------------------------------------------------------ 00103 //------------------------------------------------------------------------ 00104 void SetIncomingQueue( InQueue *incomingQueue ) 00105 { 00106 pIncomingQueue = incomingQueue; 00107 delete pQueueIncMsgJob; 00108 pQueueIncMsgJob = new QueueIncMsgJob( incomingQueue ); 00109 } 00110 00111 //------------------------------------------------------------------------ 00113 //------------------------------------------------------------------------ 00114 void SetChannelData( AnyObject *channelData ) 00115 { 00116 pChannelData = channelData; 00117 } 00118 00119 //------------------------------------------------------------------------ 00121 //------------------------------------------------------------------------ 00122 void SetTaskManager( TaskManager *taskManager ) 00123 { 00124 pTaskManager = taskManager; 00125 } 00126 00127 //------------------------------------------------------------------------ 00129 //------------------------------------------------------------------------ 00130 void SetJobManager( JobManager *jobManager ) 00131 { 00132 pJobManager = jobManager; 00133 } 00134 00135 //------------------------------------------------------------------------ 00139 //------------------------------------------------------------------------ 00140 XRootDStatus EnableLink( PathID &path ); 00141 00142 //------------------------------------------------------------------------ 00144 //------------------------------------------------------------------------ 00145 void Disconnect( bool force = false ); 00146 00147 //------------------------------------------------------------------------ 00150 //------------------------------------------------------------------------ 00151 void Tick( time_t now ); 00152 00153 //------------------------------------------------------------------------ 00155 //------------------------------------------------------------------------ 00156 const URL *GetURL() const 00157 { 00158 return pUrl; 00159 } 00160 00161 //------------------------------------------------------------------------ 00163 //------------------------------------------------------------------------ 00164 void ForceConnect(); 00165 00166 //------------------------------------------------------------------------ 00168 //------------------------------------------------------------------------ 00169 const std::string &GetName() const 00170 { 00171 return pStreamName; 00172 } 00173 00174 //------------------------------------------------------------------------ 00176 //------------------------------------------------------------------------ 00177 void DisableIfEmpty( uint16_t subStream ); 00178 00179 //------------------------------------------------------------------------ 00181 //------------------------------------------------------------------------ 00182 void OnIncoming( uint16_t subStream, 00183 Message *msg, 00184 uint32_t bytesReceived ); 00185 00186 //------------------------------------------------------------------------ 00187 // Call when one of the sockets is ready to accept a new message 00188 //------------------------------------------------------------------------ 00189 std::pair<Message *, OutgoingMsgHandler *> 00190 OnReadyToWrite( uint16_t subStream ); 00191 00192 //------------------------------------------------------------------------ 00193 // Call when a message is written to the socket 00194 //------------------------------------------------------------------------ 00195 void OnMessageSent( uint16_t subStream, 00196 Message *msg, 00197 uint32_t bytesSent ); 00198 00199 //------------------------------------------------------------------------ 00201 //------------------------------------------------------------------------ 00202 void OnConnect( uint16_t subStream ); 00203 00204 //------------------------------------------------------------------------ 00206 //------------------------------------------------------------------------ 00207 void OnConnectError( uint16_t subStream, XRootDStatus status ); 00208 00209 //------------------------------------------------------------------------ 00211 //------------------------------------------------------------------------ 00212 void OnError( uint16_t subStream, XRootDStatus status ); 00213 00214 //------------------------------------------------------------------------ 00216 //------------------------------------------------------------------------ 00217 void ForceError( XRootDStatus status ); 00218 00219 //------------------------------------------------------------------------ 00221 //------------------------------------------------------------------------ 00222 void OnReadTimeout( uint16_t subStream, bool &isBroken ); 00223 00224 //------------------------------------------------------------------------ 00226 //------------------------------------------------------------------------ 00227 void OnWriteTimeout( uint16_t subStream ); 00228 00229 //------------------------------------------------------------------------ 00231 //------------------------------------------------------------------------ 00232 void RegisterEventHandler( ChannelEventHandler *handler ); 00233 00234 //------------------------------------------------------------------------ 00236 //------------------------------------------------------------------------ 00237 void RemoveEventHandler( ChannelEventHandler *handler ); 00238 00239 //------------------------------------------------------------------------ 00248 //------------------------------------------------------------------------ 00249 std::pair<IncomingMsgHandler *, bool> 00250 InstallIncHandler( Message *msg, uint16_t stream ); 00251 00252 //------------------------------------------------------------------------ 00254 //------------------------------------------------------------------------ 00255 void SetOnDataConnectHandler( std::shared_ptr<Job> &onConnJob ) 00256 { 00257 pOnDataConnJob = onConnJob; 00258 } 00259 00260 private: 00261 00262 //------------------------------------------------------------------------ 00263 // Job queuing the incoming messages 00264 //------------------------------------------------------------------------ 00265 class QueueIncMsgJob: public Job 00266 { 00267 public: 00268 QueueIncMsgJob( InQueue *queue ): pQueue( queue ) {}; 00269 virtual ~QueueIncMsgJob() {}; 00270 virtual void Run( void *arg ) 00271 { 00272 Message *msg = (Message *)arg; 00273 pQueue->AddMessage( msg ); 00274 } 00275 private: 00276 InQueue *pQueue; 00277 }; 00278 00279 //------------------------------------------------------------------------ 00280 // Job handling the incoming messages 00281 //------------------------------------------------------------------------ 00282 class HandleIncMsgJob: public Job 00283 { 00284 public: 00285 HandleIncMsgJob( IncomingMsgHandler *handler ): pHandler( handler ) {}; 00286 virtual ~HandleIncMsgJob() {}; 00287 virtual void Run( void *arg ) 00288 { 00289 Message *msg = (Message *)arg; 00290 pHandler->Process( msg ); 00291 delete this; 00292 } 00293 private: 00294 IncomingMsgHandler *pHandler; 00295 }; 00296 00297 //------------------------------------------------------------------------ 00299 //------------------------------------------------------------------------ 00300 void OnFatalError( uint16_t subStream, 00301 XRootDStatus status, 00302 XrdSysMutexHelper &lock ); 00303 00304 //------------------------------------------------------------------------ 00306 //------------------------------------------------------------------------ 00307 void MonitorDisconnection( XRootDStatus status ); 00308 00309 //------------------------------------------------------------------------ 00311 //------------------------------------------------------------------------ 00312 XRootDStatus RequestClose( Message *resp ); 00313 00314 typedef std::vector<SubStreamData*> SubStreamList; 00315 00316 //------------------------------------------------------------------------ 00317 // Data members 00318 //------------------------------------------------------------------------ 00319 const URL *pUrl; 00320 std::string pStreamName; 00321 TransportHandler *pTransport; 00322 Poller *pPoller; 00323 TaskManager *pTaskManager; 00324 JobManager *pJobManager; 00325 XrdSysRecMutex pMutex; 00326 InQueue *pIncomingQueue; 00327 AnyObject *pChannelData; 00328 uint32_t pLastStreamError; 00329 XRootDStatus pLastFatalError; 00330 uint16_t pStreamErrorWindow; 00331 uint16_t pConnectionCount; 00332 uint16_t pConnectionRetry; 00333 time_t pConnectionInitTime; 00334 uint16_t pConnectionWindow; 00335 SubStreamList pSubStreams; 00336 std::vector<XrdNetAddr> pAddresses; 00337 Utils::AddressType pAddressType; 00338 ChannelHandlerList pChannelEvHandlers; 00339 uint64_t pSessionId; 00340 00341 //------------------------------------------------------------------------ 00342 // Jobs 00343 //------------------------------------------------------------------------ 00344 QueueIncMsgJob *pQueueIncMsgJob; 00345 00346 //------------------------------------------------------------------------ 00347 // Monitoring info 00348 //------------------------------------------------------------------------ 00349 timeval pConnectionStarted; 00350 timeval pConnectionDone; 00351 uint64_t pBytesSent; 00352 uint64_t pBytesReceived; 00353 00354 //------------------------------------------------------------------------ 00355 // Data stream on-connect handler 00356 //------------------------------------------------------------------------ 00357 std::shared_ptr<Job> pOnDataConnJob; 00358 }; 00359 } 00360 00361 #endif // __XRD_CL_STREAM_HH__