00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
00026 #define __XRD_CL_XROOTD_MSG_HANDLER_HH__
00027
00028 #include "XrdCl/XrdClPostMasterInterfaces.hh"
00029 #include "XrdCl/XrdClXRootDResponses.hh"
00030 #include "XrdCl/XrdClDefaultEnv.hh"
00031 #include "XrdCl/XrdClMessage.hh"
00032 #include "XProtocol/XProtocol.hh"
00033 #include "XrdCl/XrdClLog.hh"
00034 #include "XrdCl/XrdClConstants.hh"
00035
00036 #include "XrdSys/XrdSysPthread.hh"
00037
00038 #include <sys/uio.h>
00039
00040 #include <list>
00041 #include <memory>
00042
00043 #if __cplusplus >= 201103L
00044 #include <atomic>
00045 #endif
00046
00047 namespace XrdCl
00048 {
00049 class PostMaster;
00050 class SIDManager;
00051 class URL;
00052 class LocalFileHandler;
00053 class Socket;
00054
00055
00056
00057
00058 struct RedirectEntry
00059 {
00060 enum Type
00061 {
00062 EntryRedirect,
00063 EntryRedirectOnWait,
00064 EntryRetry,
00065 EntryWait
00066 };
00067
00068 RedirectEntry( const URL &from, const URL &to, Type type ) :
00069 from( from ), to( to ), type( type )
00070 {
00071
00072 }
00073
00074 URL from;
00075 URL to;
00076 Type type;
00077 XRootDStatus status;
00078
00079 std::string ToString( bool prevok = true )
00080 {
00081 const std::string tostr = to.GetLocation();
00082 const std::string fromstr = from.GetLocation();
00083
00084 if( prevok )
00085 {
00086 switch( type )
00087 {
00088 case EntryRedirect: return "Redirected from: " + fromstr + " to: "
00089 + tostr;
00090
00091 case EntryRedirectOnWait: return "Server responded with wait. "
00092 "Falling back to virtual redirector: " + tostr;
00093
00094 case EntryRetry: return "Retrying: " + tostr;
00095
00096 case EntryWait: return "Waited at server request. Resending: "
00097 + tostr;
00098 }
00099 }
00100 return "Failed at: " + fromstr + ", retrying at: " + tostr;
00101 }
00102 };
00103
00104
00106
00107 class XRootDMsgHandler: public IncomingMsgHandler,
00108 public OutgoingMsgHandler
00109 {
00110 friend class HandleRspJob;
00111
00112 public:
00113
00122
00123 XRootDMsgHandler( Message *msg,
00124 ResponseHandler *respHandler,
00125 const URL *url,
00126 std::shared_ptr<SIDManager> sidMgr,
00127 LocalFileHandler *lFileHandler):
00128 pRequest( msg ),
00129 pResponse( 0 ),
00130 pResponseHandler( respHandler ),
00131 pUrl( *url ),
00132 pEffectiveDataServerUrl( 0 ),
00133 pSidMgr( sidMgr ),
00134 pLFileHandler( lFileHandler ),
00135 pExpiration( 0 ),
00136 pRedirectAsAnswer( false ),
00137 pOksofarAsAnswer( false ),
00138 pHosts( 0 ),
00139 pHasLoadBalancer( false ),
00140 pHasSessionId( false ),
00141 pChunkList( 0 ),
00142 pRedirectCounter( 0 ),
00143 pNotAuthorizedCounter( 0 ),
00144
00145 pAsyncOffset( 0 ),
00146 pAsyncChunkIndex( 0 ),
00147 pAsyncReadSize( 0 ),
00148 pAsyncReadBuffer( 0 ),
00149 pAsyncMsgSize( 0 ),
00150
00151 pReadRawStarted( false ),
00152 pReadRawCurrentOffset( 0 ),
00153
00154 pReadVRawMsgOffset( 0 ),
00155 pReadVRawChunkHeaderDone( false ),
00156 pReadVRawChunkHeaderStarted( false ),
00157 pReadVRawSizeError( false ),
00158 pReadVRawChunkIndex( 0 ),
00159 pReadVRawMsgDiscard( false ),
00160
00161 pOtherRawStarted( false ),
00162
00163 pFollowMetalink( false ),
00164
00165 pStateful( false ),
00166
00167 pAggregatedWaitTime( 0 ),
00168
00169 pMsgInFly( false ),
00170
00171 pTimeoutFence( false ),
00172
00173 pDirListStarted( false ),
00174 pDirListWithStat( false ),
00175
00176 pCV( 0 )
00177
00178 {
00179 pPostMaster = DefaultEnv::GetPostMaster();
00180 if( msg->GetSessionId() )
00181 pHasSessionId = true;
00182 memset( &pReadVRawChunkHeader, 0, sizeof( readahead_list ) );
00183
00184 Log *log = DefaultEnv::GetLog();
00185 log->Debug( ExDbgMsg, "[%s] MsgHandler created: 0x%x (message: %s ).",
00186 pUrl.GetHostId().c_str(), this,
00187 pRequest->GetDescription().c_str() );
00188 }
00189
00190
00192
00193 ~XRootDMsgHandler()
00194 {
00195 DumpRedirectTraceBack();
00196
00197 if( !pHasSessionId )
00198 delete pRequest;
00199 delete pResponse;
00200 std::vector<Message *>::iterator it;
00201 for( it = pPartialResps.begin(); it != pPartialResps.end(); ++it )
00202 delete *it;
00203
00204 delete pEffectiveDataServerUrl;
00205
00206 pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
00207 pResponse = reinterpret_cast<Message*>( 0xDEADBEEF );
00208 pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
00209 pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
00210 pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
00211 pHosts = reinterpret_cast<HostList*>( 0xDEADBEEF );
00212 pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
00213 pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
00214
00215 Log *log = DefaultEnv::GetLog();
00216 log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: 0x%x.",
00217 pUrl.GetHostId().c_str(), this );
00218 }
00219
00220
00226
00227 virtual uint16_t Examine( Message *msg );
00228
00229
00233
00234 virtual uint16_t GetSid() const;
00235
00236
00240
00241 virtual void Process( Message *msg );
00242
00243
00253
00254 virtual Status ReadMessageBody( Message *msg,
00255 Socket *socket,
00256 uint32_t &bytesRead );
00257
00258
00264
00265 virtual uint8_t OnStreamEvent( StreamEvent event,
00266 XRootDStatus status );
00267
00268
00270
00271 virtual void OnStatusReady( const Message *message,
00272 XRootDStatus status );
00273
00274
00276
00277 virtual bool IsRaw() const;
00278
00279
00288
00289 Status WriteMessageBody( Socket *socket,
00290 uint32_t &bytesWritten );
00291
00292
00296
00297 void WaitDone( time_t now );
00298
00299
00301
00302 void SetExpiration( time_t expiration )
00303 {
00304 pExpiration = expiration;
00305 }
00306
00307
00310
00311 void SetRedirectAsAnswer( bool redirectAsAnswer )
00312 {
00313 pRedirectAsAnswer = redirectAsAnswer;
00314 }
00315
00316
00319
00320 void SetOksofarAsAnswer( bool oksofarAsAnswer )
00321 {
00322 pOksofarAsAnswer = oksofarAsAnswer;
00323 }
00324
00325
00327
00328 const Message *GetRequest() const
00329 {
00330 return pRequest;
00331 }
00332
00333
00335
00336 void SetLoadBalancer( const HostInfo &loadBalancer )
00337 {
00338 if( !loadBalancer.url.IsValid() )
00339 return;
00340 pLoadBalancer = loadBalancer;
00341 pHasLoadBalancer = true;
00342 }
00343
00344
00346
00347 void SetHostList( HostList *hostList )
00348 {
00349 delete pHosts;
00350 pHosts = hostList;
00351 }
00352
00353
00355
00356 void SetChunkList( ChunkList *chunkList )
00357 {
00358 pChunkList = chunkList;
00359 if( chunkList )
00360 pChunkStatus.resize( chunkList->size() );
00361 else
00362 pChunkStatus.clear();
00363 }
00364
00365
00367
00368 void SetRedirectCounter( uint16_t redirectCounter )
00369 {
00370 pRedirectCounter = redirectCounter;
00371 }
00372
00373 void SetFollowMetalink( bool followMetalink )
00374 {
00375 pFollowMetalink = followMetalink;
00376 }
00377
00378 void SetStateful( bool stateful )
00379 {
00380 pStateful = stateful;
00381 }
00382
00383
00385
00386 void TakeDownTimeoutFence();
00387
00388 private:
00389
00390
00392
00393 Status ReadRawRead( Message *msg,
00394 Socket *socket,
00395 uint32_t &bytesRead );
00396
00397
00399
00400 Status ReadRawReadV( Message *msg,
00401 Socket *socket,
00402 uint32_t &bytesRead );
00403
00404
00406
00407 Status ReadRawOther( Message *msg,
00408 Socket *socket,
00409 uint32_t &bytesRead );
00410
00411
00414
00415 Status ReadAsync( Socket *socket, uint32_t &btesRead );
00416
00417
00419
00420 void HandleError( XRootDStatus status, Message *msg = 0 );
00421
00422
00424
00425 Status RetryAtServer( const URL &url, RedirectEntry::Type entryType );
00426
00427
00429
00430 void HandleResponse();
00431
00432
00434
00435 XRootDStatus *ProcessStatus();
00436
00437
00440
00441 Status ParseResponse( AnyObject *&response );
00442
00443
00446
00447 Status ParseXAttrResponse( char *data, size_t len, AnyObject *&response );
00448
00449
00452
00453 Status RewriteRequestRedirect( const URL &newUrl );
00454
00455
00457
00458 Status RewriteRequestWait();
00459
00460
00462
00463 Status PostProcessReadV( VectorReadInfo *vReadInfo );
00464
00465
00467
00468 Status UnPackReadVResponse( Message *msg );
00469
00470
00472
00473 void UpdateTriedCGI(uint32_t errNo=0);
00474
00475
00477
00478 void SwitchOnRefreshFlag();
00479
00480
00483
00484 void HandleRspOrQueue();
00485
00486
00488
00489 void HandleLocalRedirect( URL *url );
00490
00491
00496
00497 bool IsRetriable( Message *request );
00498
00499
00506
00507 bool OmitWait( Message *request, const URL &url );
00508
00509
00515
00516 bool RetriableErrorResponse( const Status &status );
00517
00518
00520
00521 void DumpRedirectTraceBack();
00522
00529
00530 template<typename T>
00531 Status ReadFromBuffer( char *&buffer, size_t &buflen, T& result );
00532
00533
00540
00541 Status ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result );
00542
00543
00551
00552 Status ReadFromBuffer( char *&buffer, size_t &buflen, size_t size,
00553 std::string &result );
00554
00555
00556
00557
00558 struct ChunkStatus
00559 {
00560 ChunkStatus(): sizeError( false ), done( false ) {}
00561 bool sizeError;
00562 bool done;
00563 };
00564
00565 typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
00566
00567 Message *pRequest;
00568 Message *pResponse;
00569 std::vector<Message *> pPartialResps;
00570 ResponseHandler *pResponseHandler;
00571 URL pUrl;
00572 URL *pEffectiveDataServerUrl;
00573 PostMaster *pPostMaster;
00574 std::shared_ptr<SIDManager> pSidMgr;
00575 LocalFileHandler *pLFileHandler;
00576 XRootDStatus pStatus;
00577 Status pLastError;
00578 time_t pExpiration;
00579 bool pRedirectAsAnswer;
00580 bool pOksofarAsAnswer;
00581 HostList *pHosts;
00582 bool pHasLoadBalancer;
00583 HostInfo pLoadBalancer;
00584 bool pHasSessionId;
00585 std::string pRedirectUrl;
00586 ChunkList *pChunkList;
00587 std::vector<ChunkStatus> pChunkStatus;
00588 uint16_t pRedirectCounter;
00589 uint16_t pNotAuthorizedCounter;
00590
00591 uint32_t pAsyncOffset;
00592 uint32_t pAsyncChunkIndex;
00593 uint32_t pAsyncReadSize;
00594 char* pAsyncReadBuffer;
00595 uint32_t pAsyncMsgSize;
00596
00597 bool pReadRawStarted;
00598 uint32_t pReadRawCurrentOffset;
00599
00600 uint32_t pReadVRawMsgOffset;
00601 bool pReadVRawChunkHeaderDone;
00602 bool pReadVRawChunkHeaderStarted;
00603 bool pReadVRawSizeError;
00604 int32_t pReadVRawChunkIndex;
00605 readahead_list pReadVRawChunkHeader;
00606 bool pReadVRawMsgDiscard;
00607
00608 bool pOtherRawStarted;
00609
00610 bool pFollowMetalink;
00611
00612 bool pStateful;
00613 int pAggregatedWaitTime;
00614
00615 std::unique_ptr<RedirectEntry> pRdirEntry;
00616 RedirectTraceBack pRedirectTraceBack;
00617
00618 bool pMsgInFly;
00619
00620
00621
00622
00623
00624
00625 #if __cplusplus >= 201103L
00626 std::atomic<bool> pTimeoutFence;
00627 #else
00628 bool pTimeoutFence;
00629 #endif
00630
00631
00632
00633
00634
00635
00636 bool pDirListStarted;
00637 bool pDirListWithStat;
00638
00639
00640
00641
00642
00643 XrdSysCondVar pCV;
00644 };
00645 }
00646
00647 #endif // __XRD_CL_XROOTD_MSG_HANDLER_HH__