00001 #ifndef __XRDPFC_FILE_HH__
00002 #define __XRDPFC_FILE_HH__
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
#include "XrdCl/XrdClXRootDResponses.hh"
#include "XrdCl/XrdClDefaultEnv.hh"

#include "XrdOuc/XrdOucCache.hh"
#include "XrdOuc/XrdOucIOVec.hh"

#include "XrdPfcInfo.hh"
#include "XrdPfcStats.hh"

#include <list>
#include <map>
#include <set>
#include <string>
#include <vector>
00033
// Forward declarations. This header only needs these names for pointers,
// references and parameter types, so the full definitions are not required.

// Global-scope types.
class XrdJob;
class XrdOucIOVec;

// XrdCl client library.
namespace XrdCl { class Log; }

// Types belonging to the proxy file cache itself.
namespace XrdPfc
{
   // Classes (BlockResponseHandler and DirectResponseHandler are defined
   // further down in this header).
   class IO;
   class BlockResponseHandler;
   class DirectResponseHandler;

   // Helper containers appearing in the vector-read (VRead*) signatures of File.
   struct ReadVBlockListRAM;
   struct ReadVBlockListDisk;
   struct ReadVChunkListRAM;
   struct ReadVChunkListDisk;
}
00053
00054
00055 namespace XrdPfc
00056 {
00057
00058 class File;
00059
00060 class Block
00061 {
00062 public:
00063 File *m_file;
00064 IO *m_io;
00065
00066 char *m_buff;
00067 long long m_offset;
00068 int m_size;
00069 int m_refcnt;
00070 int m_errno;
00071 bool m_downloaded;
00072 bool m_prefetch;
00073
00074 Block(File *f, IO *io, char *buf, long long off, int size, bool m_prefetch) :
00075 m_file(f), m_io(io), m_buff(buf), m_offset(off), m_size(size),
00076 m_refcnt(0), m_errno(0), m_downloaded(false), m_prefetch(m_prefetch)
00077 {}
00078
00079 char* get_buff() { return m_buff; }
00080 int get_size() { return m_size; }
00081 long long get_offset() { return m_offset; }
00082
00083 IO* get_io() const { return m_io; }
00084
00085 bool is_finished() { return m_downloaded || m_errno != 0; }
00086 bool is_ok() { return m_downloaded; }
00087 bool is_failed() { return m_errno != 0; }
00088
00089 void set_downloaded() { m_downloaded = true; }
00090 void set_error(int err) { m_errno = err; }
00091
00092 void reset_error_and_set_io(IO *io)
00093 {
00094 m_errno = 0;
00095 m_io = io;
00096 }
00097 };
00098
00099
00100
00101 class BlockResponseHandler : public XrdOucCacheIOCB
00102 {
00103 public:
00104 Block *m_block;
00105 bool m_for_prefetch;
00106
00107 BlockResponseHandler(Block *b, bool prefetch) :
00108 m_block(b), m_for_prefetch(prefetch) {}
00109
00110 virtual void Done(int result);
00111 };
00112
00113
00114
00115 class DirectResponseHandler : public XrdOucCacheIOCB
00116 {
00117 public:
00118 XrdSysCondVar m_cond;
00119 int m_to_wait;
00120 int m_errno;
00121
00122 DirectResponseHandler(int to_wait) : m_cond(0), m_to_wait(to_wait), m_errno(0) {}
00123
00124 bool is_finished() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0; }
00125 bool is_ok() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0 && m_errno == 0; }
00126 bool is_failed() { XrdSysCondVarHelper _lck(m_cond); return m_errno != 0; }
00127
00128 virtual void Done(int result);
00129 };
00130
00131
00132
00133 class File
00134 {
00135 public:
00136
00138
00139 File(const std::string &path, long long offset, long long fileSize);
00140
00141
00143
00144 static File* FileOpen(const std::string &path, long long offset, long long fileSize);
00145
00146
00148
00149 ~File();
00150
00152 void BlockRemovedFromWriteQ(Block*);
00153
00155 void BlocksRemovedFromWriteQ(std::list<Block*>&);
00156
00158 bool Open();
00159
00161 int ReadV(IO *io, const XrdOucIOVec *readV, int n);
00162
00164 int Read (IO *io, char* buff, long long offset, int size);
00165
00166
00168
00169 bool isOpen() const { return m_is_open; }
00170
00171
00174
00175 bool ioActive(IO *io);
00176
00177
00180
00181 void RequestSyncOfDetachStats();
00182
00183
00186
00187 bool FinalizeSyncBeforeExit();
00188
00189
00191
00192 void Sync();
00193
00194
00195 void ProcessBlockResponse(BlockResponseHandler* brh, int res);
00196 void WriteBlockToDisk(Block* b);
00197
00198 void Prefetch();
00199
00200 float GetPrefetchScore() const;
00201
00203 const char* lPath() const;
00204
00205 std::string& GetLocalPath() { return m_filename; }
00206
00207 XrdSysError* GetLog();
00208 XrdSysTrace* GetTrace();
00209
00210 long long GetFileSize() { return m_file_size; }
00211
00212 void AddIO(IO *io);
00213 int GetPrefetchCountOnIO(IO *io);
00214 void StopPrefetchingOnIO(IO *io);
00215 void RemoveIO(IO *io);
00216
00217 Stats DeltaStatsFromLastCall();
00218
00219 const Info::AStat* GetLastAccessStats() const { return m_cfi.GetLastAccessStats(); }
00220 size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); }
00221 int GetBlockSize() const { return m_cfi.GetBufferSize(); }
00222 int GetNBlocks() const { return m_cfi.GetSizeInBits(); }
00223 int GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); }
00224
00225
00226 int get_ref_cnt() { return m_ref_cnt; }
00227 int inc_ref_cnt() { return ++m_ref_cnt; }
00228 int dec_ref_cnt() { return --m_ref_cnt; }
00229
00230 void initiate_emergency_shutdown();
00231 bool is_in_emergency_shutdown() { return m_in_shutdown; }
00232
00233 private:
00234 enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
00235
00236 int m_ref_cnt;
00237
00238 bool m_is_open;
00239 bool m_in_shutdown;
00240
00241 XrdOssDF *m_data_file;
00242 XrdOssDF *m_info_file;
00243 Info m_cfi;
00244
00245 std::string m_filename;
00246 long long m_offset;
00247 long long m_file_size;
00248
00249
00250
00251 struct IODetails
00252 {
00253 time_t m_attach_time;
00254 int m_active_prefetches;
00255 bool m_allow_prefetching;
00256 bool m_ioactive_false_reported;
00257
00258 IODetails(time_t at) :
00259 m_attach_time (at),
00260 m_active_prefetches (0),
00261 m_allow_prefetching (true),
00262 m_ioactive_false_reported (false)
00263 {}
00264 };
00265
00266 typedef std::map<IO*, IODetails> IoMap_t;
00267 typedef IoMap_t::iterator IoMap_i;
00268
00269 IoMap_t m_io_map;
00270 IoMap_i m_current_io;
00271 int m_ios_in_detach;
00272
00273
00274 std::vector<int> m_writes_during_sync;
00275 int m_non_flushed_cnt;
00276 bool m_in_sync;
00277
00278 typedef std::list<int> IntList_t;
00279 typedef IntList_t::iterator IntList_i;
00280
00281 typedef std::list<Block*> BlockList_t;
00282 typedef BlockList_t::iterator BlockList_i;
00283
00284 typedef std::map<int, Block*> BlockMap_t;
00285 typedef BlockMap_t::iterator BlockMap_i;
00286
00287 typedef std::set<Block*> BlockSet_t;
00288 typedef BlockSet_t::iterator BlockSet_i;
00289
00290
00291 BlockMap_t m_block_map;
00292
00293 XrdSysCondVar m_state_cond;
00294
00295 Stats m_stats;
00296 Stats m_last_stats;
00297
00298 PrefetchState_e m_prefetch_state;
00299
00300 int m_prefetch_read_cnt;
00301 int m_prefetch_hit_cnt;
00302 float m_prefetch_score;
00303
00304 bool m_detach_time_logged;
00305
00306 static const char *m_traceID;
00307
00308 bool overlap(int blk,
00309 long long blk_size,
00310 long long req_off,
00311 int req_size,
00312
00313 long long &off,
00314 long long &blk_off,
00315 long long &size);
00316
00317
00318 Block* PrepareBlockRequest(int i, IO *io, bool prefetch);
00319
00320 void ProcessBlockRequest (Block *b, bool prefetch);
00321 void ProcessBlockRequests(BlockList_t& blks, bool prefetch);
00322
00323 int RequestBlocksDirect(IO *io, DirectResponseHandler *handler, IntList_t& blocks,
00324 char* buff, long long req_off, long long req_size);
00325
00326 int ReadBlocksFromDisk(IntList_t& blocks,
00327 char* req_buf, long long req_off, long long req_size);
00328
00329
00330 bool VReadValidate (const XrdOucIOVec *readV, int n);
00331 void VReadPreProcess (IO *io, const XrdOucIOVec *readV, int n,
00332 BlockList_t& blks_to_request,
00333 ReadVBlockListRAM& blks_to_process,
00334 ReadVBlockListDisk& blks_on_disk,
00335 std::vector<XrdOucIOVec>& chunkVec);
00336 int VReadFromDisk (const XrdOucIOVec *readV, int n,
00337 ReadVBlockListDisk& blks_on_disk);
00338 int VReadProcessBlocks(IO *io, const XrdOucIOVec *readV, int n,
00339 std::vector<ReadVChunkListRAM>& blks_to_process,
00340 std::vector<ReadVChunkListRAM>& blks_processed,
00341 long long& bytes_hit,
00342 long long& bytes_missed);
00343
00344 long long BufferSize();
00345
00346 void inc_ref_count(Block*);
00347 void dec_ref_count(Block*);
00348 void free_block(Block*);
00349
00350 bool select_current_io_or_disable_prefetching(bool skip_current);
00351
00352 int offsetIdx(int idx);
00353 };
00354
00355 }
00356
00357 #endif