00001 #ifndef __XRDPFC_CACHE_HH__
00002 #define __XRDPFC_CACHE_HH__
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <list>
#include <map>
#include <set>
#include <string>
#include <vector>
00024
00025 #include "Xrd/XrdScheduler.hh"
00026 #include "XrdVersion.hh"
00027 #include "XrdSys/XrdSysPthread.hh"
00028 #include "XrdOuc/XrdOucCache.hh"
00029 #include "XrdOuc/XrdOucCallBack.hh"
00030 #include "XrdCl/XrdClDefaultEnv.hh"
00031
00032 #include "XrdPfcFile.hh"
00033 #include "XrdPfcDecision.hh"
00034
// Forward declarations (global namespace).
class XrdOucStream;
class XrdSysError;
class XrdSysTrace;
class XrdXrootdGStream;

// Forward declaration from the XrdCl namespace.
namespace XrdCl { class Log; }

// Forward declarations of XrdPfc types referred to in this header.
namespace XrdPfc
{
   class File;
   class IO;

   class DataFsState;
}
00052
00053
00054 namespace XrdPfc
00055 {
00056
00057
00059
//! Plain value holder for the cache settings.
//! Every default is given by the in-class initializer next to the member;
//! m_username and the two dir-stats sets start out empty.
struct Configuration
{
   Configuration() {}

   //! True once m_fileUsageMax holds a positive value.
   bool are_file_usage_limits_set()    const { return m_fileUsageMax > 0; }
   //! True once m_purgeColdFilesAge holds a positive value.
   bool is_age_based_purge_in_effect() const { return m_purgeColdFilesAge > 0; }
   //! Always reports false in this version of the header.
   bool is_purge_plugin_set_up()       const { return false; }

   //! Declared here, defined out of line; results are passed back through
   //! the two reference arguments.
   void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu);

   //! Returns the m_dirStats flag.
   bool are_dirstats_enabled() const { return m_dirStats; }

   // --- mode switches ---
   bool m_hdfsmode             = false;
   bool m_allow_xrdpfc_command = false;

   // --- user name and space names ---
   std::string m_username;
   std::string m_data_space = "public";
   std::string m_meta_space = "public";

   // --- disk / file usage limits and purge settings ---
   long long m_diskTotalSpace       = -1;
   long long m_diskUsageLWM         = -1;
   long long m_diskUsageHWM         = -1;
   long long m_fileUsageBaseline    = -1;
   long long m_fileUsageNominal     = -1;
   long long m_fileUsageMax         = -1;   // a value <= 0 counts as "limits not set"
   int       m_purgeInterval        = 300;
   int       m_purgeColdFilesAge    = -1;   // a value <= 0 disables the age-based predicate
   int       m_purgeColdFilesPeriod = -1;
   int       m_accHistorySize       = 20;

   // --- directory statistics ---
   std::set<std::string> m_dirStatsDirs;
   std::set<std::string> m_dirStatsDirGlobs;
   int  m_dirStatsMaxDepth   = -1;
   int  m_dirStatsStoreDepth = -1;
   bool m_dirStats           = false;

   // --- buffers, RAM, write queue, prefetch ---
   long long m_bufferSize          = 1024*1024;
   long long m_RamAbsAvailable     = 0;
   int       m_RamKeepStdBlocks    = 0;
   int       m_wqueue_blocks       = 16;
   int       m_wqueue_threads      = 4;
   int       m_prefetch_max_blocks = 10;

   // --- remaining numeric settings ---
   long long m_hdfsbsize = 128*1024*1024;
   long long m_flushCnt  = 2000;
};
00133
00134
00135
//! String-typed settings held as raw text.
//! Only the two disk-usage marks carry non-empty defaults.
struct TmpConfiguration
{
   std::string m_diskUsageLWM = "0.90";
   std::string m_diskUsageHWM = "0.95";
   std::string m_fileUsageBaseline;
   std::string m_fileUsageNominal;
   std::string m_fileUsageMax;
   std::string m_flushRaw;

   TmpConfiguration() {}
};
00150
00151
00152
//! Destructive, in-place tokenizer working on a private heap copy of a string.
//!
//! Tokens are returned as pointers into the copy; delimiters following a token
//! are overwritten with NUL. Tokenizing is done with strspn()/strcspn() rather
//! than strtok_r(), because get_reminder() and get_reminder_with_delim() read
//! the scan position directly and POSIX does not specify what strtok_r() keeps
//! in its save pointer.
//!
//! The object owns the copy, so it cannot be copied (that would free the buffer
//! twice); it can be moved.
struct SplitParser
{
   char       *str;    //!< owned copy of the input (strdup), released with free()
   const char *delim;  //!< delimiter character set; not owned, must outlive the parser
   char       *state;  //!< first character not yet scanned; 0 until a token was requested
   bool        first;  //!< true until get_token() has been called

   SplitParser(const std::string &s, const char *d) :
      str(strdup(s.c_str())), delim(d), state(0), first(true)
   {}
   ~SplitParser() { free(str); }

   SplitParser(const SplitParser&)            = delete;
   SplitParser& operator=(const SplitParser&) = delete;

   //! Takes over the buffer of @p o; @p o is left without a buffer.
   SplitParser(SplitParser &&o) noexcept :
      str(o.str), delim(o.delim), state(o.state), first(o.first)
   {
      o.str   = 0;
      o.state = 0;
   }

   //! Scan one token starting at @p from and advance `state` past it.
   //! Leading delimiters are skipped. Returns 0 when no token is left; in that
   //! case `state` points at the terminating NUL (or is 0 if there is no buffer).
   char* scan_token(char *from)
   {
      if (from == 0) { state = 0; return 0; }  // no buffer: strdup failed or moved-from

      char *tok = from + strspn(from, delim);
      if (*tok == 0) { state = tok; return 0; }

      char *end = tok + strcspn(tok, delim);
      if (*end != 0) *end++ = 0;               // terminate the token, step over the delimiter
      state = end;
      return tok;
   }

   //! Return the next token, or 0 when the input is exhausted.
   char* get_token()
   {
      char *from = first ? str : state;
      first = false;
      return scan_token(from);
   }

   //! Return the unparsed rest of the string including the delimiter that
   //! preceded it (the delimiter is restored as delim[0]).
   //! If nothing has been parsed yet the whole string is returned. If there is
   //! no delimiter position in front of the rest (empty input, or the last
   //! token ran up to the end of the string) the bare rest is returned and the
   //! buffer is left untouched.
   char* get_reminder_with_delim()
   {
      if (first) { return str; }

      // Only write over a byte that is a delimiter slot: either the NUL we put
      // there, or delim[0] itself (already restored, or skipped as a leading run).
      if (state != 0 && state > str && (state[-1] == 0 || state[-1] == delim[0]))
      {
         *(state - 1) = delim[0];
         return state - 1;
      }
      return state;
   }

   //! Return the unparsed rest of the string (whole string if nothing was parsed).
   char *get_reminder()
   {
      return first ? str : state;
   }

   //! Split the whole string and append all tokens to @p argv.
   //! Does nothing and returns 0 if get_token() was already used on this object.
   //! `first` is not modified by this call.
   //! @return number of tokens appended.
   int fill_argv(std::vector<char*> &argv)
   {
      if (!first || str == 0) return 0;

      // Upper bound on the token count, based on occurrences of delim[0].
      int n_delims = 0;
      for (const char *p = str; *p; ++p)
      {
         if (*p == delim[0]) ++n_delims;
      }
      argv.reserve(n_delims + 1);

      int argc = 0;
      for (char *tok = scan_token(str); tok != 0; tok = scan_token(state))
      {
         argv.push_back(tok);
         ++argc;
      }
      return argc;
   }
};
00199
00200 struct PathTokenizer : private SplitParser
00201 {
00202 std::vector<const char*> m_dirs;
00203 const char *m_reminder;
00204 int m_n_dirs;
00205
00206 PathTokenizer(const std::string &path, int max_depth, bool parse_as_lfn) :
00207 SplitParser(path, "/"),
00208 m_reminder (0)
00209 {
00210
00211
00212
00213 m_dirs.reserve(max_depth);
00214
00215 char *t;
00216 for (int i = 0; i < max_depth; ++i)
00217 {
00218 t = get_token();
00219 if (t == 0) break;
00220 m_dirs.emplace_back(t);
00221 }
00222 if (parse_as_lfn && (t == 0 || * get_reminder() == 0))
00223 {
00224 m_reminder = m_dirs.back();
00225 m_dirs.pop_back();
00226 }
00227 else
00228 {
00229 m_reminder = get_reminder();
00230 }
00231 m_n_dirs = (int) m_dirs.size();
00232 }
00233
00234 int get_n_dirs()
00235 {
00236 return m_n_dirs;
00237 }
00238
00239 const char *get_dir(int pos)
00240 {
00241 if (pos >= m_n_dirs) return 0;
00242 return m_dirs[pos];
00243 }
00244
00245 std::string make_path()
00246 {
00247 std::string res;
00248 for (std::vector<const char*>::iterator i = m_dirs.begin(); i != m_dirs.end(); ++i)
00249 {
00250 res += "/";
00251 res += *i;
00252 }
00253 if (m_reminder != 0)
00254 {
00255 res += "/";
00256 res += m_reminder;
00257 }
00258 return res;
00259 }
00260
00261 void deboog()
00262 {
00263 printf("PathTokenizer::deboog size=%d\n", m_n_dirs);
00264 for (int i = 0; i < m_n_dirs; ++i)
00265 {
00266 printf(" %2d: %s\n", i, m_dirs[i]);
00267 }
00268 printf(" rem: %s\n", m_reminder);
00269 }
00270 };
00271
00272
00273
00274
00275
00276
00277
00279
00280 class Cache : public XrdOucCache
00281 {
00282 public:
00283
00285
00286 Cache(XrdSysLogger *logger, XrdOucEnv *env);
00287
00288
00290
00291 using XrdOucCache::Attach;
00292
00293 virtual XrdOucCacheIO *Attach(XrdOucCacheIO *, int Options = 0);
00294
00295
00296
00297
00298 virtual int LocalFilePath(const char *url, char *buff=0, int blen=0,
00299 LFP_Reason why=ForAccess, bool forall=false);
00300
00301
00302
00303 virtual int Prepare(const char *url, int oflags, mode_t mode);
00304
00305
00306 virtual int Stat(const char *url, struct stat &sbuff);
00307
00308
00309 virtual int Unlink(const char *url);
00310
00311
00317
00318 bool Decide(XrdOucCacheIO*);
00319
00320
00322
00323 const Configuration& RefConfiguration() const { return m_configuration; }
00324
00325
00332
00333 bool Config(const char *config_filename, const char *parameters);
00334
00335
00337
00338 static Cache &CreateInstance(XrdSysLogger *logger, XrdOucEnv *env);
00339
00340
00342
00343 static Cache &GetInstance();
00344
00345
00347
00348 static bool VCheck(XrdVersionInfo &urVersion) { return true; }
00349
00350
00352
00353 void ResourceMonitorHeartBeat();
00354
00355
00357
00358 void Purge();
00359
00360
00362
00363 int UnlinkUnlessOpen(const std::string& f_name);
00364
00365
00367
00368 void AddWriteTask(Block* b, bool from_read);
00369
00370
00373
00374 void RemoveWriteQEntriesFor(File *f);
00375
00376
00378
00379 void ProcessWriteTasks();
00380
00381 char* RequestRAM(long long size);
00382 void ReleaseRAM(char* buf, long long size);
00383
00384 void RegisterPrefetchFile(File*);
00385 void DeRegisterPrefetchFile(File*);
00386
00387 File* GetNextFileToPrefetch();
00388
00389 void Prefetch();
00390
00391 XrdOss* GetOss() const { return m_oss; }
00392
00393 bool IsFileActiveOrPurgeProtected(const std::string&);
00394
00395 File* GetFile(const std::string&, IO*, long long off = 0, long long filesize = 0);
00396
00397 void ReleaseFile(File*, IO*);
00398
00399 void ScheduleFileSync(File* f) { schedule_file_sync(f, false, false); }
00400
00401 void FileSyncDone(File*, bool high_debug);
00402
00403 XrdSysError* GetLog() { return &m_log; }
00404 XrdSysTrace* GetTrace() { return m_trace; }
00405
00406 XrdXrootdGStream* GetGStream() { return m_gstream; }
00407
00408 void ExecuteCommandUrl(const std::string& command_url);
00409
00410 static XrdScheduler *schedP;
00411
00412 private:
00413 bool ConfigParameters(std::string, XrdOucStream&, TmpConfiguration &tmpc);
00414 bool ConfigXeq(char *, XrdOucStream &);
00415 bool xdlib(XrdOucStream &);
00416 bool xtrace(XrdOucStream &);
00417
00418 bool cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name);
00419
00420 int UnlinkCommon(const std::string& f_name, bool fail_if_open);
00421
00422 static Cache *m_instance;
00423
00424 XrdOucEnv *m_env;
00425 XrdSysError m_log;
00426 XrdSysTrace *m_trace;
00427 const char *m_traceID;
00428
00429 XrdOucCacheStats m_ouc_stats;
00430 XrdOss *m_oss;
00431
00432 XrdXrootdGStream *m_gstream;
00433
00434 std::vector<XrdPfc::Decision*> m_decisionpoints;
00435
00436 Configuration m_configuration;
00437
00438 XrdSysCondVar m_prefetch_condVar;
00439 bool m_prefetch_enabled;
00440
00441 XrdSysMutex m_RAM_mutex;
00442 long long m_RAM_used;
00443 long long m_RAM_write_queue;
00444 std::list<char*> m_RAM_std_blocks;
00445 int m_RAM_std_size;
00446
00447 bool m_isClient;
00448
00449 struct WriteQ
00450 {
00451 WriteQ() : condVar(0), writes_between_purges(0), size(0) {}
00452
00453 XrdSysCondVar condVar;
00454 std::list<Block*> queue;
00455 long long writes_between_purges;
00456 int size;
00457 };
00458
00459 WriteQ m_writeQ;
00460
00461
00462 typedef std::map<std::string, File*> ActiveMap_t;
00463 typedef ActiveMap_t::iterator ActiveMap_i;
00464 typedef std::multimap<std::string, XrdPfc::Stats> StatsMMap_t;
00465 typedef StatsMMap_t::iterator StatsMMap_i;
00466 typedef std::set<std::string> FNameSet_t;
00467
00468 ActiveMap_t m_active;
00469 StatsMMap_t m_closed_files_stats;
00470 FNameSet_t m_purge_delay_set;
00471 bool m_in_purge;
00472 XrdSysCondVar m_active_cond;
00473
00474 void inc_ref_cnt(File*, bool lock, bool high_debug);
00475 void dec_ref_cnt(File*, bool high_debug);
00476
00477 void schedule_file_sync(File*, bool ref_cnt_already_set, bool high_debug);
00478
00479
00480 typedef std::vector<File*> PrefetchList;
00481 PrefetchList m_prefetchList;
00482
00483
00484
00485
00486 enum ScanAndPurgeThreadState_e { SPTS_Idle, SPTS_Scan, SPTS_Purge, SPTS_Done };
00487
00488 XrdSysCondVar m_stats_n_purge_cond;
00489
00490 DataFsState *m_fs_state;
00491
00492 int m_last_scan_duration;
00493 int m_last_purge_duration;
00494 ScanAndPurgeThreadState_e m_spt_state;
00495
00496 void copy_out_active_stats_and_update_data_fs_state();
00497 };
00498
00499 }
00500
00501 #endif