00001 //------------------------------------------------------------------------------ 00002 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN) 00003 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>, 00004 // Michal Simon <michal.simon@cern.ch> 00005 //------------------------------------------------------------------------------ 00006 // This file is part of the XRootD software suite. 00007 // 00008 // XRootD is free software: you can redistribute it and/or modify 00009 // it under the terms of the GNU Lesser General Public License as published by 00010 // the Free Software Foundation, either version 3 of the License, or 00011 // (at your option) any later version. 00012 // 00013 // XRootD is distributed in the hope that it will be useful, 00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 // GNU General Public License for more details. 00017 // 00018 // You should have received a copy of the GNU Lesser General Public License 00019 // along with XRootD. If not, see <http://www.gnu.org/licenses/>. 00020 // 00021 // In applying this licence, CERN does not waive the privileges and immunities 00022 // granted to it by virtue of its status as an Intergovernmental Organization 00023 // or submit itself to any jurisdiction. 00024 //------------------------------------------------------------------------------ 00025 00026 #ifndef __XRD_CL_OPERATIONS_HH__ 00027 #define __XRD_CL_OPERATIONS_HH__ 00028 00029 #include <memory> 00030 #include <stdexcept> 00031 #include <sstream> 00032 #include <tuple> 00033 #include <future> 00034 #include "XrdCl/XrdClXRootDResponses.hh" 00035 #include "XrdCl/XrdClOperationHandlers.hh" 00036 #include "XrdClArg.hh" 00037 #include "XrdSys/XrdSysPthread.hh" 00038 00039 namespace XrdCl 00040 { 00041 00042 template<bool HasHndl> class Operation; 00043 00044 class Pipeline; 00045 00046 00047 //---------------------------------------------------------------------------- 00049 //---------------------------------------------------------------------------- 00050 typedef std::function<Operation<true>*(const XRootDStatus&)> rcvry_func; 00051 00052 //---------------------------------------------------------------------------- 00055 //---------------------------------------------------------------------------- 00056 class PipelineHandler: public ResponseHandler 00057 { 00058 template<bool> friend class Operation; 00059 00060 public: 00061 00062 //------------------------------------------------------------------------ 00067 //------------------------------------------------------------------------ 00068 PipelineHandler( ResponseHandler *handler, 00069 rcvry_func &&recovery ); 00070 00071 //------------------------------------------------------------------------ 00073 //------------------------------------------------------------------------ 00074 PipelineHandler( rcvry_func &&recovery ) : recovery( std::move( recovery ) ) 00075 { 00076 } 00077 00078 //------------------------------------------------------------------------ 00080 //------------------------------------------------------------------------ 00081 void HandleResponseWithHosts( XRootDStatus *status, AnyObject *response, 00082 HostList *hostList ); 00083 00084 //------------------------------------------------------------------------ 00086 //------------------------------------------------------------------------ 00087 void HandleResponse( XRootDStatus *status, AnyObject *response ); 00088 00089 //------------------------------------------------------------------------ 00091 //------------------------------------------------------------------------ 00092 ~PipelineHandler() 00093 { 00094 } 00095 00096 //------------------------------------------------------------------------ 00100 //------------------------------------------------------------------------ 00101 void AddOperation( Operation<true> *operation ); 00102 00103 //------------------------------------------------------------------------ 00110 //------------------------------------------------------------------------ 00111 void Assign( std::promise<XRootDStatus> prms, 00112 std::function<void(const XRootDStatus&)> final ); 00113 00114 private: 00115 00116 //------------------------------------------------------------------------ 00118 //------------------------------------------------------------------------ 00119 void HandleResponseImpl( XRootDStatus *status, AnyObject *response, 00120 HostList *hostList = nullptr ); 00121 00122 inline void dealloc( XRootDStatus *status, AnyObject *response, 00123 HostList *hostList ) 00124 { 00125 delete status; 00126 delete response; 00127 delete hostList; 00128 } 00129 00130 //------------------------------------------------------------------------ 00132 //------------------------------------------------------------------------ 00133 std::unique_ptr<ResponseHandler> responseHandler; 00134 00135 //------------------------------------------------------------------------ 00137 //------------------------------------------------------------------------ 00138 std::unique_ptr<Operation<true>> nextOperation; 00139 00140 //------------------------------------------------------------------------ 00142 //------------------------------------------------------------------------ 00143 std::promise<XRootDStatus> prms; 00144 00145 //------------------------------------------------------------------------ 00148 //------------------------------------------------------------------------ 00149 std::function<void(const XRootDStatus&)> final; 00150 00151 //------------------------------------------------------------------------ 00153 //------------------------------------------------------------------------ 00154 rcvry_func recovery; 00155 }; 00156 00157 //---------------------------------------------------------------------------- 00163 //---------------------------------------------------------------------------- 00164 template<bool HasHndl> 00165 class Operation 00166 { 00167 // Declare friendship between templates 00168 template<bool> 00169 friend class Operation; 00170 00171 friend std::future<XRootDStatus> Async( Pipeline ); 00172 00173 friend class Pipeline; 00174 friend class PipelineHandler; 00175 00176 public: 00177 00178 //------------------------------------------------------------------------ 00180 //------------------------------------------------------------------------ 00181 Operation() : valid( true ) 00182 { 00183 } 00184 00185 //------------------------------------------------------------------------ 00187 //------------------------------------------------------------------------ 00188 template<bool from> 00189 Operation( Operation<from> && op ) : 00190 handler( std::move( op.handler ) ), valid( true ) 00191 { 00192 if( !op.valid ) throw std::invalid_argument( "Cannot construct " 00193 "Operation from an invalid Operation!" ); 00194 op.valid = false; 00195 } 00196 00197 //------------------------------------------------------------------------ 00199 //------------------------------------------------------------------------ 00200 virtual ~Operation() 00201 { 00202 } 00203 00204 //------------------------------------------------------------------------ 00206 //------------------------------------------------------------------------ 00207 virtual std::string ToString() = 0; 00208 00209 //------------------------------------------------------------------------ 00213 //------------------------------------------------------------------------ 00214 virtual Operation<HasHndl>* Move() = 0; 00215 00216 //------------------------------------------------------------------------ 00221 //------------------------------------------------------------------------ 00222 virtual Operation<true>* ToHandled() = 0; 00223 00224 protected: 00225 00226 //------------------------------------------------------------------------ 00236 //------------------------------------------------------------------------ 00237 void Run( std::promise<XRootDStatus> prms, 00238 std::function<void(const XRootDStatus&)> final ) 00239 { 00240 static_assert(HasHndl, "Only an operation that has a handler can be assigned to workflow"); 00241 handler->Assign( std::move( prms ), std::move( final ) ); 00242 XRootDStatus st = RunImpl(); 00243 if( st.IsOK() ) handler.release(); 00244 else 00245 ForceHandler( st ); 00246 } 00247 00248 //------------------------------------------------------------------------ 00255 //------------------------------------------------------------------------ 00256 virtual XRootDStatus RunImpl() = 0; 00257 00258 //------------------------------------------------------------------------ 00265 //------------------------------------------------------------------------ 00266 void ForceHandler( const XRootDStatus &status ) 00267 { 00268 handler->HandleResponse( new XRootDStatus( status ), nullptr ); 00269 // HandleResponse already freed the memory so we have to 00270 // release the unique pointer 00271 handler.release(); 00272 } 00273 00274 //------------------------------------------------------------------------ 00278 //------------------------------------------------------------------------ 00279 void AddOperation( Operation<true> *op ) 00280 { 00281 if( handler ) 00282 handler->AddOperation( op ); 00283 } 00284 00285 //------------------------------------------------------------------------ 00287 //------------------------------------------------------------------------ 00288 std::unique_ptr<PipelineHandler> handler; 00289 00290 //------------------------------------------------------------------------ 00292 //------------------------------------------------------------------------ 00293 bool valid; 00294 }; 00295 00296 //---------------------------------------------------------------------------- 00302 //---------------------------------------------------------------------------- 00303 class Pipeline 00304 { 00305 template<bool> friend class ParallelOperation; 00306 friend std::future<XRootDStatus> Async( Pipeline ); 00307 00308 public: 00309 00310 //------------------------------------------------------------------------ 00312 //------------------------------------------------------------------------ 00313 Pipeline( Operation<true> *op ) : 00314 operation( op->Move() ) 00315 { 00316 00317 } 00318 00319 //------------------------------------------------------------------------ 00321 //------------------------------------------------------------------------ 00322 Pipeline( Operation<true> &op ) : 00323 operation( op.Move() ) 00324 { 00325 00326 } 00327 00328 //------------------------------------------------------------------------ 00330 //------------------------------------------------------------------------ 00331 Pipeline( Operation<true> &&op ) : 00332 operation( op.Move() ) 00333 { 00334 00335 } 00336 00337 Pipeline( Operation<false> *op ) : 00338 operation( op->ToHandled() ) 00339 { 00340 00341 } 00342 00343 //------------------------------------------------------------------------ 00345 //------------------------------------------------------------------------ 00346 Pipeline( Operation<false> &op ) : 00347 operation( op.ToHandled() ) 00348 { 00349 00350 } 00351 00352 //------------------------------------------------------------------------ 00354 //------------------------------------------------------------------------ 00355 Pipeline( Operation<false> &&op ) : 00356 operation( op.ToHandled() ) 00357 { 00358 00359 } 00360 00361 Pipeline( Pipeline &&pipe ) : 00362 operation( std::move( pipe.operation ) ) 00363 { 00364 00365 } 00366 00367 //------------------------------------------------------------------------ 00369 //------------------------------------------------------------------------ 00370 Pipeline& operator=( Pipeline &&pipe ) 00371 { 00372 operation = std::move( pipe.operation ); 00373 return *this; 00374 } 00375 00376 //------------------------------------------------------------------------ 00380 //------------------------------------------------------------------------ 00381 operator Operation<true>&() 00382 { 00383 if( !bool( operation ) ) throw std::logic_error( "Invalid pipeline." ); 00384 return *operation.get(); 00385 } 00386 00387 //------------------------------------------------------------------------ 00391 //------------------------------------------------------------------------ 00392 operator bool() 00393 { 00394 return bool( operation ); 00395 } 00396 00397 private: 00398 00399 //------------------------------------------------------------------------ 00404 //------------------------------------------------------------------------ 00405 Operation<true>* operator->() 00406 { 00407 return operation.get(); 00408 } 00409 00410 //------------------------------------------------------------------------ 00416 //------------------------------------------------------------------------ 00417 void Run( std::function<void(const XRootDStatus&)> final = nullptr ) 00418 { 00419 if( ftr.valid() ) 00420 throw std::logic_error( "Pipeline is already running" ); 00421 00422 // a promise that the pipe will have a result 00423 std::promise<XRootDStatus> prms; 00424 ftr = prms.get_future(); 00425 operation->Run( std::move( prms ), std::move( final ) ); 00426 } 00427 00428 //------------------------------------------------------------------------ 00430 //------------------------------------------------------------------------ 00431 std::unique_ptr<Operation<true>> operation; 00432 00433 //------------------------------------------------------------------------ 00435 //------------------------------------------------------------------------ 00436 std::future<XRootDStatus> ftr; 00437 00438 }; 00439 00440 //---------------------------------------------------------------------------- 00446 //---------------------------------------------------------------------------- 00447 inline std::future<XRootDStatus> Async( Pipeline pipeline ) 00448 { 00449 pipeline.Run(); 00450 return std::move( pipeline.ftr ); 00451 } 00452 00453 //---------------------------------------------------------------------------- 00460 //---------------------------------------------------------------------------- 00461 inline XRootDStatus WaitFor( Pipeline pipeline ) 00462 { 00463 return Async( std::move( pipeline ) ).get(); 00464 } 00465 00466 //---------------------------------------------------------------------------- 00473 //---------------------------------------------------------------------------- 00474 template<template<bool> class Derived, bool HasHndl, typename HdlrFactory, typename ... Args> 00475 class ConcreteOperation: public Operation<HasHndl> 00476 { 00477 template<template<bool> class, bool, typename, typename ...> 00478 friend class ConcreteOperation; 00479 00480 public: 00481 00482 //------------------------------------------------------------------------ 00486 //------------------------------------------------------------------------ 00487 ConcreteOperation( Args&&... args ) : args( std::tuple<Args...>( std::move( args )... ) ) 00488 { 00489 static_assert( !HasHndl, "It is only possible to construct operation without handler" ); 00490 } 00491 00492 //------------------------------------------------------------------------ 00498 //------------------------------------------------------------------------ 00499 template<bool from> 00500 ConcreteOperation( ConcreteOperation<Derived, from, HdlrFactory, Args...> && op ) : 00501 Operation<HasHndl>( std::move( op ) ), args( std::move( op.args ) ) 00502 { 00503 00504 } 00505 00506 //------------------------------------------------------------------------ 00514 //------------------------------------------------------------------------ 00515 template<typename Hdlr> 00516 Derived<true> operator>>( Hdlr &&hdlr ) 00517 { 00518 return this->StreamImpl( HdlrFactory::Create( hdlr ) ); 00519 } 00520 00521 //------------------------------------------------------------------------ 00527 //------------------------------------------------------------------------ 00528 Derived<true> operator|( Operation<true> &op ) 00529 { 00530 return PipeImpl( *this, op ); 00531 } 00532 00533 //------------------------------------------------------------------------ 00539 //------------------------------------------------------------------------ 00540 Derived<true> operator|( Operation<true> &&op ) 00541 { 00542 return PipeImpl( *this, op ); 00543 } 00544 00545 //------------------------------------------------------------------------ 00551 //------------------------------------------------------------------------ 00552 Derived<true> operator|( Operation<false> &op ) 00553 { 00554 return PipeImpl( *this, op ); 00555 } 00556 00557 //------------------------------------------------------------------------ 00563 //------------------------------------------------------------------------ 00564 Derived<true> operator|( Operation<false> &&op ) 00565 { 00566 return PipeImpl( *this, op ); 00567 } 00568 00569 //------------------------------------------------------------------------ 00571 //------------------------------------------------------------------------ 00572 Derived<HasHndl> Recovery( rcvry_func recovery ) 00573 { 00574 this->recovery = std::move( recovery ); 00575 return Transform<HasHndl>(); 00576 } 00577 00578 //------------------------------------------------------------------------ 00582 //------------------------------------------------------------------------ 00583 inline Operation<HasHndl>* Move() 00584 { 00585 Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this ); 00586 return new Derived<HasHndl>( std::move( *me ) ); 00587 } 00588 00589 //------------------------------------------------------------------------ 00593 //------------------------------------------------------------------------ 00594 inline Operation<true>* ToHandled() 00595 { 00596 this->handler.reset( new PipelineHandler( std::move( this->recovery ) ) ); 00597 Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this ); 00598 return new Derived<true>( std::move( *me ) ); 00599 } 00600 00601 protected: 00602 00603 //------------------------------------------------------------------------ 00607 //------------------------------------------------------------------------ 00608 template<bool to> 00609 inline Derived<to> Transform() 00610 { 00611 Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this ); 00612 return Derived<to>( std::move( *me ) ); 00613 } 00614 00615 //------------------------------------------------------------------------ 00621 //------------------------------------------------------------------------ 00622 inline Derived<true> StreamImpl( ResponseHandler *handler ) 00623 { 00624 static_assert( !HasHndl, "Operator >> is available only for operation without handler" ); 00625 this->handler.reset( new PipelineHandler( handler, std::move( this->recovery ) ) ); 00626 return Transform<true>(); 00627 } 00628 00629 //------------------------------------------------------------------------ 00636 //------------------------------------------------------------------------ 00637 inline static 00638 Derived<true> PipeImpl( ConcreteOperation<Derived, true, HdlrFactory, 00639 Args...> &me, Operation<true> &op ) 00640 { 00641 me.AddOperation( op.Move() ); 00642 return me.template Transform<true>(); 00643 } 00644 00645 //------------------------------------------------------------------------ 00652 //------------------------------------------------------------------------ 00653 inline static 00654 Derived<true> PipeImpl( ConcreteOperation<Derived, true, HdlrFactory, 00655 Args...> &me, Operation<false> &op ) 00656 { 00657 me.AddOperation( op.ToHandled() ); 00658 return me.template Transform<true>(); 00659 } 00660 00661 //------------------------------------------------------------------------ 00668 //------------------------------------------------------------------------ 00669 inline static 00670 Derived<true> PipeImpl( ConcreteOperation<Derived, false, HdlrFactory, 00671 Args...> &me, Operation<true> &op ) 00672 { 00673 me.handler.reset( new PipelineHandler( std::move( me.recovery ) ) ); 00674 me.AddOperation( op.Move() ); 00675 return me.template Transform<true>(); 00676 } 00677 00678 //------------------------------------------------------------------------ 00685 //------------------------------------------------------------------------ 00686 inline static 00687 Derived<true> PipeImpl( ConcreteOperation<Derived, false, HdlrFactory, 00688 Args...> &me, Operation<false> &op ) 00689 { 00690 me.handler.reset( new PipelineHandler( std::move( me.recovery ) ) ); 00691 me.AddOperation( op.ToHandled() ); 00692 return me.template Transform<true>(); 00693 } 00694 00695 //------------------------------------------------------------------------ 00697 //------------------------------------------------------------------------ 00698 std::tuple<Args...> args; 00699 00700 //------------------------------------------------------------------------ 00702 //------------------------------------------------------------------------ 00703 00704 rcvry_func recovery; 00705 }; 00706 } 00707 00708 #endif // __XRD_CL_OPERATIONS_HH__