00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #ifndef __XRD_CL_PARALLELOPERATION_HH__
00027 #define __XRD_CL_PARALLELOPERATION_HH__
00028
00029 #include "XrdCl/XrdClOperations.hh"
00030 #include "XrdCl/XrdClOperationHandlers.hh"
00031
00032 #include <atomic>
00033
00034 namespace XrdCl
00035 {
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055 struct PolicyExecutor
00056 {
00057 virtual ~PolicyExecutor()
00058 {
00059 }
00060
00061 virtual bool Examine( const XrdCl::XRootDStatus &status ) = 0;
00062 };
00063
00064
00070
00071 template<bool HasHndl>
00072 class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
00073 {
00074 template<bool> friend class ParallelOperation;
00075
00076 public:
00077
00078
00080
00081 template<bool from>
00082 ParallelOperation( ParallelOperation<from> &&obj ) :
00083 ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
00084 pipelines( std::move( obj.pipelines ) ),
00085 policy( std::move( obj.policy ) )
00086 {
00087 }
00088
00089
00095
00096 template<class Container>
00097 ParallelOperation( Container &&container )
00098 {
00099 static_assert( !HasHndl, "Constructor is available only operation without handler");
00100
00101 pipelines.reserve( container.size() );
00102 auto begin = std::make_move_iterator( container.begin() );
00103 auto end = std::make_move_iterator( container.end() );
00104 std::copy( begin, end, std::back_inserter( pipelines ) );
00105 container.clear();
00106 }
00107
00108
00110
00111 std::string ToString()
00112 {
00113 std::ostringstream oss;
00114 oss << "Parallel(";
00115 for( size_t i = 0; i < pipelines.size(); i++ )
00116 {
00117 oss << pipelines[i]->ToString();
00118 if( i + 1 != pipelines.size() )
00119 {
00120 oss << " && ";
00121 }
00122 }
00123 oss << ")";
00124 return oss.str();
00125 }
00126
00127
00132
00133 ParallelOperation<HasHndl> All()
00134 {
00135 policy.reset( new AllPolicy() );
00136 return std::move( *this );
00137 }
00138
00139
00144
00145 ParallelOperation<HasHndl> Any()
00146 {
00147 policy.reset( new AnyPolicy( pipelines.size() ) );
00148 return std::move( *this );
00149 }
00150
00151
00152
00156
00157 ParallelOperation<HasHndl> Some( size_t threshold )
00158 {
00159 policy.reset( new SomePolicy( pipelines.size(), threshold ) );
00160 return std::move( *this );
00161 }
00162
00163
00169
00170 ParallelOperation<HasHndl> AtLeast( size_t threshold )
00171 {
00172 policy.reset( new AtLeastPolicy( pipelines.size(), threshold ) );
00173 return std::move( *this );
00174 }
00175
00176 private:
00177
00178
00183
00184 struct AllPolicy : public PolicyExecutor
00185 {
00186 bool Examine( const XrdCl::XRootDStatus &status )
00187 {
00188 if( status.IsOK() ) return false;
00189
00190 return true;
00191 }
00192 };
00193
00194
00199
00200 struct AnyPolicy : public PolicyExecutor
00201 {
00202 AnyPolicy( size_t size) : cnt( size )
00203 {
00204 }
00205
00206 bool Examine( const XrdCl::XRootDStatus &status )
00207 {
00208
00209 size_t nb = cnt.fetch_sub( 1 );
00210
00211 if( status.IsOK() ) return true;
00212
00213 if( nb == 1 ) return true;
00214
00215 return false;
00216 }
00217
00218 private:
00219 std::atomic<size_t> cnt;
00220 };
00221
00222
00227
00228 struct SomePolicy : PolicyExecutor
00229 {
00230 SomePolicy( size_t size, size_t threshold ) : cnt( size ), succeeded( 0 ), threshold( threshold )
00231 {
00232 }
00233
00234 bool Examine( const XrdCl::XRootDStatus &status )
00235 {
00236
00237 size_t nb = cnt.fetch_sub( 1 );
00238 if( status.IsOK() )
00239 {
00240 size_t s = succeeded.fetch_add( 1 );
00241 if( s + 1 == threshold ) return true;
00242
00243 return false;
00244 }
00245
00246 if( nb == threshold ) return true;
00247
00248 return false;
00249 }
00250
00251 private:
00252 std::atomic<size_t> cnt;
00253 std::atomic<size_t> succeeded;
00254 const size_t threshold;
00255 };
00256
00257
00263
00264 struct AtLeastPolicy : PolicyExecutor
00265 {
00266 AtLeastPolicy( size_t size, size_t threshold ) : cnt( size ), threshold( threshold )
00267 {
00268 }
00269
00270 bool Examine( const XrdCl::XRootDStatus &status )
00271 {
00272
00273 size_t nb = cnt.fetch_sub( 1 );
00274
00275 if( status.IsOK() ) return false;
00276 if( nb == threshold ) return true;
00277
00278 return false;
00279 }
00280
00281 private:
00282 std::atomic<size_t> cnt;
00283 const size_t threshold;
00284 };
00285
00286
00291
00292 struct Ctx
00293 {
00294
00298
00299 Ctx( PipelineHandler *handler, PolicyExecutor *policy ): handler( handler ),
00300 policy( policy )
00301 {
00302 }
00303
00304
00306
00307 ~Ctx()
00308 {
00309 Handle( XRootDStatus() );
00310 }
00311
00312
00317
00318 inline void Examine( const XRootDStatus &st )
00319 {
00320 if( policy->Examine( st ) )
00321 Handle( st );
00322 }
00323
00324
00329
00330 inline void Handle( const XRootDStatus &st )
00331 {
00332 PipelineHandler* hdlr = handler.exchange( nullptr );
00333 if( hdlr )
00334 hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
00335 }
00336
00337
00339
00340 std::atomic<PipelineHandler*> handler;
00341
00342
00344
00345 std::unique_ptr<PolicyExecutor> policy;
00346 };
00347
00348
00354
00355 XRootDStatus RunImpl()
00356 {
00357
00358 if( !policy ) policy.reset( new AllPolicy() );
00359
00360 std::shared_ptr<Ctx> ctx =
00361 std::make_shared<Ctx>( this->handler.release(), policy.release() );
00362
00363 try
00364 {
00365 for( size_t i = 0; i < pipelines.size(); ++i )
00366 {
00367 pipelines[i].Run( [ctx]( const XRootDStatus &st ){ ctx->Examine( st ); } );
00368 }
00369 }
00370 catch( const PipelineException& ex )
00371 {
00372 return ex.GetError();
00373 }
00374 catch( const std::exception& ex )
00375 {
00376 return XRootDStatus( stError, ex.what() );
00377 }
00378
00379 return XRootDStatus();
00380 }
00381
00382 std::vector<Pipeline> pipelines;
00383 std::unique_ptr<PolicyExecutor> policy;
00384 };
00385
00386
00388
00389 template<class Container>
00390 inline ParallelOperation<false> Parallel( Container &container )
00391 {
00392 return ParallelOperation<false>( container );
00393 }
00394
00395
00397
00398 inline void PipesToVec( std::vector<Pipeline>& )
00399 {
00400
00401 }
00402
00403
00404
00405
00406
00407 template<typename ... Others>
00408 inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
00409 Others&... others );
00410
00411 template<typename ... Others>
00412 inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
00413 Others&... others );
00414
00415 template<typename ... Others>
00416 inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
00417 Others&... others );
00418
00419
00420
00421
00422 template<typename ... Others>
00423 void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
00424 Others&... others )
00425 {
00426 v.emplace_back( operation );
00427 PipesToVec( v, others... );
00428 }
00429
00430 template<typename ... Others>
00431 void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
00432 Others&... others )
00433 {
00434 v.emplace_back( operation );
00435 PipesToVec( v, others... );
00436 }
00437
00438 template<typename ... Others>
00439 void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
00440 Others&... others )
00441 {
00442 v.emplace_back( std::move( pipeline ) );
00443 PipesToVec( v, others... );
00444 }
00445
00446
00451
00452 template<typename ... Operations>
00453 inline ParallelOperation<false> Parallel( Operations&& ... operations )
00454 {
00455 constexpr size_t size = sizeof...( operations );
00456 std::vector<Pipeline> v;
00457 v.reserve( size );
00458 PipesToVec( v, operations... );
00459 return Parallel( v );
00460 }
00461 }
00462
00463 #endif // __XRD_CL_OPERATIONS_HH__