// ---------------------------------------------------------------------- // File: com_cp.cc // Author: Andreas-Joachim Peters - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ /*----------------------------------------------------------------------------*/ #include #include "common/StringTokenizer.hh" #include "console/ConsoleMain.hh" #include "common/Path.hh" #include "common/StringConversion.hh" #include "XrdPosix/XrdPosixXrootd.hh" #include "XrdOuc/XrdOucEnv.hh" #include "XrdCl/XrdClURL.hh" #include "XrdCl/XrdClFileSystem.hh" /*----------------------------------------------------------------------------*/ int com_cp_usage() { fprintf(stdout, "Usage: cp [--async] [--atomic] [--rate=] [--streams=] [--depth=] [--checksum] [--no-overwrite|-k] [--preserve|-p] [--recursive|-r|-R] [-s|--silent] [-a] [-n] [-S] [-d[=][] \n"); fprintf(stdout, "'[eos] cp ..' provides copy functionality to EOS.\n"); fprintf(stdout, " | can be root:///, a local path /tmp/../ or an eos path /eos/ in the connected instance\n"); fprintf(stdout, "Options:\n"); fprintf(stdout, " --atomic : run an atomic upload where files are only visible with the target name when their are completely uploaded [ adds ?eos.atomic=1 to the target URL ]\n"); fprintf(stdout, " --rate : limit the cp rate to \n"); fprintf(stdout, " --streams : use <#> parallel streams\n"); fprintf(stdout, " --depth : depth for recursive copy\n"); fprintf(stdout, " --checksum : output the checksums\n"); fprintf(stdout, " -a : append to the target, don't truncate\n"); fprintf(stdout, " -p : create destination directory\n"); fprintf(stdout, " -n : hide progress bar\n"); fprintf(stdout, " -S : print summary\n"); fprintf(stdout, " -d | --debug : enable debug information (optional =1|2|3)\n"); fprintf(stdout, " -s | --silent : no output outside error messages\n"); fprintf(stdout, " -k | --no-overwrite : disable overwriting of files\n"); fprintf(stdout, " -P | --preserve : preserves file creation and modification time from the source\n"); fprintf(stdout, " -r | -R | --recursive : copy source location recursively\n"); fprintf(stdout, "\n"); fprintf(stdout, "Remark: \n"); fprintf(stdout, " If you deal with directories always add a '/' in the end of source or target paths e.g. if the target should be a directory and not a file put a '/' in the end. To copy a directory hierarchy use '-r' and source and target directories terminated with '/' !\n"); fprintf(stdout, "\n"); fprintf(stdout, "Examples: \n"); fprintf(stdout, " eos cp /var/data/myfile /eos/foo/user/data/ : copy 'myfile' to /eos/foo/user/data/myfile\n"); fprintf(stdout, " eos cp /var/data/ /eos/foo/user/data/ : copy all plain files in /var/data to /eos/foo/user/data/\n"); fprintf(stdout, " eos cp -r /var/data/ /eos/foo/user/data/ : copy the full hierarchy from /var/data/ to /eos/foo/user/data/ => empty directories won't show up on the target!\n"); fprintf(stdout, " eos cp -r --checksum --silent /var/data/ /eos/foo/user/data/ : copy the full hierarchy and just printout the checksum information for each file copied!\n"); fprintf(stdout, "\nS3:\n"); fprintf(stdout, " URLs have to be written as:\n"); fprintf(stdout, " as3://// as implemented in ROOT\n"); fprintf(stdout, " or as3:/ with environment variable S3_HOSTNAME set\n"); fprintf(stdout, " and as3:....?s3.id=&s3.key=\n\n"); fprintf(stdout, " The access id can be defined in 3 ways:\n"); fprintf(stdout, " env S3_ACCESS_ID= [as used in ROOT ]\n"); fprintf(stdout, " env S3_ACCESS_KEY_ID= [as used in libs3 ]\n"); fprintf(stdout, " ?s3.id= [as used in EOS transfers ]\n"); fprintf(stdout, "\n"); fprintf(stdout, " The access key can be defined in 3 ways:\n"); fprintf(stdout, " env S3_ACCESS_KEY= [as used in ROOT ]\n"); fprintf(stdout, " env S3_SECRET_ACCESS_KEY= [as used in libs3 ]\n"); fprintf(stdout, " ?s3.key= [as used in EOS transfers ]\n"); fprintf(stdout, "\n"); fprintf(stdout, " If and are using S3, we are using the same credentials on both ends and the target credentials will overwrite source credentials!\n"); return (EINVAL); } /* Helper types */ enum Protocol { HTTP, HTTPS, GSIFTP, S3, AS3, XROOT, EOS, LOCAL, UNKNOWN }; struct File_t { XrdOucString name; XrdOucString opaque; Protocol protocol; timespec atime; timespec mtime; unsigned long long size; File_t() : name(""), opaque(""), protocol(Protocol::UNKNOWN), size(0) { } }; /* Helper functions */ int run_eos_command(const char* cmdline, std::vector& result); int run_command(const char* cmdline, std::vector& result); const char* absolute_path(const char* path); bool is_dir(const char* path, Protocol protocol, struct stat* buf = NULL); XrdOucString process_symlink(XrdOucString path); const char* setup_s3_environment(XrdOucString path, XrdOucString opaque); std::string eos_roles_opaque(); int do_stat(const char* path, Protocol protocol, struct stat& buf); int check_protocol_tool(const char* path); Protocol get_protocol(XrdOucString path); const char* protocol_to_string(Protocol protocol); int parse_debug_level(XrdOucString option); /* eos cp command */ int com_cp(char* argin) { XrdOucString rate = ""; XrdOucString streams = "0"; XrdOucString atomic = ""; std::vector source_find_list; std::vector source_basepath_list; std::vector source_list; File_t target; bool target_is_stdout; bool target_is_dir = false; bool recursive = false; bool summary = false; bool noprogress = false; bool append = false; bool makeparent = false; bool debug = false; int debug_level = 0; bool checksums = false; bool silent = false; bool nooverwrite = false; bool preserve = false; unsigned long long copysize = 0; unsigned long long copiedsize = 0; unsigned long depth = 0; struct timeval start_time, end_time; struct timezone tz; int files_copied = 0; int retc = 0; // Check if this is an 'async' command XrdOucString sarg = argin; // ---------------------------------------------------------------------------- // Parse arguments // ---------------------------------------------------------------------------- eos::common::StringTokenizer subtokenizer(argin); subtokenizer.GetLine(); do { XrdOucString option = subtokenizer.GetToken(); if (!option.length()) { break; } if (option.beginswith("--rate=")) { rate = option; rate.replace("--rate=", ""); } else if (option.beginswith("--streams=")) { streams = option; streams.replace("--streams=", ""); } else if ((option == "--recursive") || (option == "-R") || (option == "-r")) { recursive = true; } else if (option == "-n") { noprogress = true; } else if (option == "-a") { append = true; } else if (option == "-p") { makeparent = true; } else if (option == "-S") { summary = true; } else if ((option == "-s") || (option == "--silent")) { silent = true; } else if ((option == "-k") || (option == "--no-overwrite")) { nooverwrite = true; } else if (option == "--checksum") { checksums = true; } else if ((option.beginswith("-d")) || (option.beginswith("--debug"))) { if ((debug_level = parse_debug_level(option)) < 0) { return com_cp_usage(); } debug = true; } else if ((option == "--preserve") || (option == "-P")) { preserve = true; } else if (option == "--atomic") { atomic = "&eos.atomic=1"; } else if (option.beginswith("--depth=")) { option.replace("--depth=", ""); try { depth = std::stoul(option.c_str()); } catch (...) { fprintf(stderr, "error: invalid value for =%s", option.c_str()); return com_cp_usage(); } } else if (option.beginswith("-")) { return com_cp_usage(); } else { if ((!option.beginswith("/eos/")) || (!option.beginswith("root:/"))) { // Do this since tokenizer sealed the path when extracting the token! eos::common::StringConversion::UnsealXrdPath(option); } source_find_list.emplace_back(option.c_str()); break; } } while (true); if (silent || !hasterminal) { noprogress = true; } if (recursive) { makeparent = true; } // Store list of source locations + target destination XrdOucString nextarg = subtokenizer.GetToken(); XrdOucString lastarg = subtokenizer.GetToken(); while (lastarg.length()) { source_find_list.emplace_back(nextarg.c_str()); nextarg = lastarg; lastarg = subtokenizer.GetToken(); } target.name = nextarg; if (!target.name.length()) { fprintf(stderr, "warning: no target specified. Please view 'eos cp --help'.\n"); global_retc = 0; return 0; } // -------------------------------------------------------------------------- // Expand source list into final list to copy. // This means interpreting the '*' character in file names // and traversing directories for the recursive flag. // Every source path also has an associated base path, // which will get appended to the target. // -------------------------------------------------------------------------- for (size_t i = 0; i < source_find_list.size(); i++) { std::vector files; XrdOucString source = source_find_list[i]; XrdOucString source_opaque; XrdOucString basepath = ""; Protocol protocol; std::string sprotocol = ""; int opos = source.find("?"); bool wildcard = false; files.clear(); // Extract opaque info if (opos != STR_NPOS) { source_opaque = source; source_opaque.erase(0, opos + 1); source.erase(opos); } // Identify protocol protocol = get_protocol(source.c_str()); if (protocol == Protocol::UNKNOWN) { fprintf(stderr, "warning: %s -- protocol not recognized. Skipping path..", source.c_str()); continue; } // Convert local to absolute path const char* abs_path = absolute_path(source.c_str()); source = abs_path; free((char*)abs_path); // Check if source is a directory if (!source.endswith("/") && is_dir(source.c_str(), protocol, NULL)) { source.append("/"); } // Extract file name and parent path const char* filepath = source.c_str(); // URLs need different processing in order to extract the path if ((protocol != Protocol::EOS) && (protocol != Protocol::LOCAL)) { XrdOucString sprot, hostport; filepath = eos::common::StringConversion::ParseUrl(source.c_str(), sprot, hostport); if (!filepath) { fprintf(stderr, "error: cannot process file=%s [protocol=%s]\n", source.c_str(), protocol_to_string(protocol)); continue; } } eos::common::Path cPath(filepath); basepath = cPath.GetParentPath(); if ((source.find("*") != STR_NPOS) || (source.endswith("/"))) { std::string cmdtext; if ((protocol != Protocol::EOS) && (protocol != Protocol::LOCAL)) { fprintf(stderr, "error: %s -- path expansion not implemented for %s protocol." " Skipping path..\n", source.c_str(), protocol_to_string(protocol)); continue; } // Get all paths matching wildcard if (source.find("*") != STR_NPOS) { // Will use 'ls -lF' combined with grep to identify matches // ls -l[F|p] | awk 'NF == 9 {print $9}' [ | egrep "" ] // Note: eos::common::Path removes trailing '/'! XrdOucString basename = cPath.GetName(); if (source.endswith("/")) { basename.append("/"); } // Wildcards are supported only in the basename if (basename.find("*") == STR_NPOS) { fprintf(stderr, "warning: %s -- wildcards not supported outside basename." " Skipping path..\n", source.c_str()); continue; } XrdOucString match = basename.c_str(); wildcard = true; if (!match.beginswith("*")) { match.insert("^", 0); } if (!match.endswith("*")) { match.append("$"); } match.replace("*", ".*"); // Construct command text cmdtext = "ls -l"; cmdtext += (protocol == Protocol::EOS) ? "F " : "p "; cmdtext += basepath.c_str(); cmdtext += " | awk '{out=$9; for (i=10; i<=NF; i++) {out=out\" \"$i}; print out}' | egrep \""; cmdtext += match.c_str(); cmdtext += "\""; } else if (source.endswith("/")) { // Get all files within directory // Will use 'find' to identify files // local file: find [-maxdepth ] -follow -type f // eos file: find -f [--maxdepth ] if (!recursive) { fprintf(stderr, "warning: omitting directory %s\n", source.c_str()); continue; } // Enclose source path in quotes, as the path may contain whitespace stringstream ss; ss.clear(); ss << std::quoted(source.c_str()); source = ss.str().c_str(); // Capture only last directory // This will end up appended to the target std::string smaxdepth = " "; if (depth != 0) { smaxdepth = " -maxdepth "; smaxdepth += std::to_string(depth); smaxdepth += " "; if (protocol == Protocol::EOS) { smaxdepth.insert(1, "-"); } } cmdtext = "find "; if (protocol == Protocol::EOS) { cmdtext += "-f"; cmdtext += smaxdepth.c_str(); cmdtext += source.c_str(); } else { cmdtext += source.c_str(); cmdtext += smaxdepth.c_str(); cmdtext += "-follow -type f"; } } cmdtext += " 2> /dev/null"; if (debug) { fprintf(stderr, "[eos-cp] running: %s\n", cmdtext.c_str()); } int rc = (protocol == Protocol::EOS) ? run_eos_command(cmdtext.c_str(), files) : run_command(cmdtext.c_str(), files); if (rc && !files.size()) { fprintf(stderr, "warning: could not expand source: %s\n", source.c_str()); global_retc = rc; return -1; } } else { files.emplace_back(source.c_str()); } for (auto& file : files) { // Check if path expansion discovered a symlink if (file.find(" -> ") != STR_NPOS) { file = process_symlink(file.c_str()); } if (wildcard) { file.insert(basepath.c_str(), 0); source_find_list.emplace_back(file.c_str()); continue; } if (debug) { fprintf(stderr, "[eos-cp] Copy list: %s\n", file.c_str()); } File_t source_file; source_file.name = file.c_str(); source_file.opaque = source_opaque.c_str(); source_file.protocol = protocol; source_list.emplace_back(source_file); source_basepath_list.emplace_back(basepath.c_str()); } } // Check if there is any file in the list if (source_list.empty()) { fprintf(stderr, "warning: found zero files to copy!\n"); global_retc = 0; return 0; } // -------------------------------------------------------------------------- // Process target path // -------------------------------------------------------------------------- bool target_exists; struct stat target_stat; target.protocol = get_protocol(target.name.c_str()); // Make sure executable to reach target exists if (check_protocol_tool(target.name.c_str())) { return -1; } // Handle opaque information for target if (target.protocol != Protocol::LOCAL) { int qpos = target.name.find("?"); if (qpos != STR_NPOS) { target.opaque = target.name.c_str(); target.opaque.keep(qpos + 1); target.name.erase(qpos); } // Seal the target name if (target.protocol == Protocol::EOS) { eos::common::StringConversion::SealXrdPath(target.name); } } // Detect whether target is stdout const char* abs_path = absolute_path(target.name.c_str()); target.name = abs_path; free((char*)abs_path); target_is_stdout = (target.name == "-"); if (!target_is_stdout) { // Detect whether target is a directory int stat_rc = do_stat(target.name.c_str(), target.protocol, target_stat); target_exists = (stat_rc == 0); target_is_dir = is_dir(target.name.c_str(), target.protocol, &target_stat); // If multiple source files target must be a directory if (source_list.size() > 1) { // Target doesn't exist, mark it as directory if (!target_exists) { target_is_dir = true; } // Target is not a directory if (!target_is_dir) { fprintf(stderr, "error: target must be a directory\n"); global_retc = EINVAL; return -1; } } // Target doesn't exist but name suggests should be a directory if (!target_exists && target.name.endswith("/")) { target_is_dir = true; } // If target is a directory then the name should also reflect this if (target_is_dir && !target.name.endswith("/")) { target.name.append("/"); } // Check rights to create target directory if (target_is_dir && !target_exists) { if (!makeparent) { fprintf(stderr, "error: target must be created. Please try with " "create flag '-p' or see 'eos cp --help' for more info.\n"); global_retc = EINVAL; return -1; } } // Create target directory tree for EOS or local path if (makeparent) { if ((target.protocol == Protocol::EOS) || (target.protocol == Protocol::LOCAL)) { XrdOucString mktarget; if (target.name.endswith("/")) { mktarget = target.name.c_str(); } else { eos::common::Path cTarget(target.name.c_str()); mktarget = cTarget.GetParentPath(); } std::string cmdtext = "mkdir -p "; if (target.protocol == Protocol::LOCAL) { cmdtext += "--mode 755 "; } cmdtext += mktarget.c_str(); std::vector tmp; int rc = (target.protocol == Protocol::EOS) ? run_eos_command(cmdtext.c_str(), tmp) : run_command(cmdtext.c_str(), tmp); if (rc) { fprintf(stderr, "error: failed to create target directory : %s\n", mktarget.c_str()); global_retc = rc; return -1; } } } } else { // Disable all output for stdout target silent = true; noprogress = true; } // Set up environment for S3 target if ((target.protocol == Protocol::AS3) || (target.protocol == Protocol::S3)) { const char* url = setup_s3_environment(target.name, target.opaque); if (url == NULL) { return -1; } target.name = url; } // Expand '/eos/' shortcut for EOS protocol if ((target.protocol == Protocol::EOS) && (target.name.beginswith("/eos/"))) { if (!serveruri.endswith("/")) { target.name.insert("/", 0); } target.name.insert(serveruri.c_str(), 0); } if (debug) { fprintf(stderr, "[eos-cp] # of source files: %lu\n", source_list.size()); fprintf(stderr, "[eos-cp] Setting target %s [protocol=%s]\n", target.name.c_str(), protocol_to_string(target.protocol)); } // -------------------------------------------------------------------------- // Compute size for each source path // -------------------------------------------------------------------------- // As needed, check whether tools to access these protocols can be found bool s3_tool = false; bool http_tool = false; bool gsiftp_tool = false; for (auto& source : source_list) { bool statok = false; struct stat buf; source.atime.tv_nsec = source.mtime.tv_nsec = 0; switch (source.protocol) { // ------------------------------------------ // EOS, XRoot or local file // ------------------------------------------ case Protocol::EOS: case Protocol::XROOT: case Protocol::LOCAL: if (!do_stat(source.name.c_str(), source.protocol, buf)) { // For symbolic links, EOS stat returns the size of the link. // Ignore the size attribute in this case if (source.protocol != Protocol::LOCAL && !S_ISREG(buf.st_mode)) { source.size = 0; if (debug || !silent) { fprintf(stderr, "warning: disable size check for path=%s [EOS symbolic link]\n", source.name.c_str()); } } else { copysize += buf.st_size; source.size = (unsigned long long) buf.st_size; } // Store the a/m-time source.atime.tv_sec = buf.st_atime; source.mtime.tv_sec = buf.st_mtime; statok = true; } break; // ------------------------------------------ // S3 file // ------------------------------------------ case Protocol::AS3: case Protocol::S3: { if (!s3_tool) { if (check_protocol_tool(source.name.c_str())) { return -1; } s3_tool = true; } const char* url = setup_s3_environment(source.name, source.opaque); if (url == NULL) { return -1; } XrdOucString s3env = "env S3_ACCESS_KEY_ID="; s3env += getenv("S3_ACCESS_KEY_ID"); s3env += " S3_HOSTNAME="; s3env += getenv("S3_HOSTNAME"); s3env += " S3_SECRET_ACCESS_KEY="; s3env += getenv("S3_SECRET_ACCESS_KEY"); // Execute 's3' command to retrieve size XrdOucString cmdtext = "bash -c \""; cmdtext += s3env; cmdtext += " s3 head "; cmdtext += url; cmdtext += " | grep Content-Length | awk '{print \\$2}' 2> /dev/null\""; if (debug) { fprintf(stderr, "[eos-cp] running %s\n", cmdtext.c_str()); } long long size = eos::common::StringConversion::LongLongFromShellCmd( cmdtext.c_str()); if ((!size) || (size == LLONG_MAX)) { fprintf(stderr, "error: path=%s cannot obtain size of S3 source file " "or file size is 0!\n", source.name.c_str()); global_retc = EIO; return -1; } copysize += size; source.size = (unsigned long long) size; source.atime.tv_sec = source.mtime.tv_sec = 0; statok = true; break; } // ------------------------------------------ // HTTP(S) & GSIFTP file // ------------------------------------------ case Protocol::GSIFTP: case Protocol::HTTP: case Protocol::HTTPS: if ((source.protocol == Protocol::HTTP || source.protocol == Protocol::HTTPS) && (!http_tool)) { if (check_protocol_tool(source.name.c_str())) { return -1; } http_tool = true; } else if ((source.protocol == Protocol::GSIFTP) && (!gsiftp_tool)) { if (check_protocol_tool(source.name.c_str())) { return -1; } gsiftp_tool = true; } source.size = 0; source.atime.tv_sec = source.mtime.tv_sec = 0; if (debug || !silent) { fprintf(stderr, "warning: disabling size check for path=%s [protocol=%s]\n", source.name.c_str(), protocol_to_string(source.protocol)); } statok = true; break; default: break; } if (!statok) { fprintf(stderr, "error: cannot get file size of path=%s [protocol=%s]\n", source.name.c_str(), protocol_to_string(source.protocol)); global_retc = EINVAL; return -1; } if (debug) { fprintf(stderr, "[eos-cp] path=%s size=%llu [protocol=%s]\n", source.name.c_str(), source.size, protocol_to_string(source.protocol)); } } if (debug || (!silent && source_list.size() > 1)) { XrdOucString ssize; fprintf(stderr, "[eos-cp] going to copy %lu files and %s\n", source_list.size(), eos::common::StringConversion::GetReadableSizeString(ssize, copysize, "B")); } // Mark start timestamp gettimeofday(&start_time, &tz); // -------------------------------------------------------------------------- // Create 'eoscp' command for each source path // and effectively perform the copy operation // -------------------------------------------------------------------------- int file_idx = -1; retc = 0; for (auto& source : source_list) { XrdOucString dest = target.name.c_str(); // Processed target path + original target opaque info XrdOucString target_path = ""; // Temporary file upload flag bool temporary_file = false; file_idx++; //------------------------------------ // Process destination path //------------------------------------ // Append source suffix to destination // The source suffix: = if (target_is_dir) { XrdOucString source_suffix = source.name.c_str(); int pos = source_suffix.find(source_basepath_list[file_idx].c_str()); if (pos == STR_NPOS) { fprintf(stderr, "error: could not identify source suffix for path=%s\n", source.name.c_str()); global_retc = EINVAL; return -1; } pos += source_basepath_list[file_idx].length(); source_suffix.keep(pos); dest += source_suffix.c_str(); } // Check that source and destination are different if (!strcmp(source.name.c_str(), dest.c_str())) { fprintf(stderr, "warning: source and target are the same path=%s. Skipping path..\n", source.name.c_str()); continue; } // Add opaque info to destination if (target.opaque.length()) { dest += "?"; dest += target.opaque.c_str(); } target_path = dest.c_str(); // Continue processing for non STDOUT targets if (!target_is_stdout) { // Check if destination exists if (nooverwrite) { if ((target.protocol == Protocol::LOCAL) || (target.protocol == Protocol::EOS)) { struct stat tmp; if (!do_stat(dest.c_str(), target.protocol, tmp)) { fprintf(stderr, "warning: target=%s exists, but --no-overwrite " "flag specified\n", dest.c_str()); retc |= EEXIST; continue; } } } // Handle EOS specific opaque info if ((target.protocol == Protocol::EOS) || (target.protocol == Protocol::XROOT)) { char opaque[1024]; std::string roles = eos_roles_opaque(); snprintf(opaque, sizeof(opaque) - 1, "%ceos.targetsize=%llu&eos.bookingsize=%llu&eos.app=%s%s%s%s", (target.opaque.length()) ? '&' : '?', source.size, source.size, getenv("EOSAPP") ? getenv("EOSAPP") : "eoscp", atomic.c_str(), roles.size() ? "&" : "", roles.size() ? roles.c_str() : ""); dest.append(opaque); } // Protocols for EOS, XRoot and local targets are supported directly // S3 targets will be uploaded via STDIN & STDOUT pipes // Remaining protocols will be copied to a temporary file if ((target.protocol == Protocol::HTTP) || (target.protocol == Protocol::HTTPS) || (target.protocol == Protocol::GSIFTP)) { char tmp_name[] = "/tmp/com_cp.XXXXXX"; int tmp_fd = mkstemp(tmp_name); if (tmp_fd == -1) { fprintf(stderr, "error: failed to create temporary file " "while preparing copy for path=%s [protocol=%s]\n", dest.c_str(), protocol_to_string(target.protocol)); global_retc = errno; return -1; } close(tmp_fd); temporary_file = true; dest = tmp_name; } } //------------------------------------ // Process source path //------------------------------------ // Expand '/eos/' shortcut for EOS protocol if ((source.protocol == Protocol::EOS) && (source.name.beginswith("/eos/"))) { if (!serveruri.endswith("/")) { source.name.insert("/", 0); } source.name.insert(serveruri.c_str(), 0); } // Add opaque info to source if (source.opaque.length()) { source.name += "?"; source.name += source.opaque.c_str(); } if (debug) { fprintf(stderr, "\n[eos-cp] copying %s to %s\n", source.name.c_str(), target_path.c_str()); } //------------------------------------ // Prepare STDIN and STDOUT pipes //------------------------------------ XrdOucString transfersize = ""; // used for STDIN pipes to specify the target size to eoscp XrdOucString cmdtext = ""; bool rstdin = false; bool rstdout = false; if ((source.protocol == Protocol::EOS) || (source.protocol == Protocol::XROOT)) { std::string roles = eos_roles_opaque(); source.name += (source.opaque.length()) ? "&" : "?"; source.name += "eos.app="; source.name += getenv("EOSAPP") ? getenv("EOSAPP") : "eoscp"; source.name += roles.size() ? "&" : ""; source.name += roles.size() ? roles.c_str() : ""; } else if ((source.protocol != Protocol::LOCAL) && (source.protocol != Protocol::UNKNOWN)) { bool old_noprogress = noprogress; noprogress = true; XrdOucString safesource = source.name.c_str(); while (safesource.replace("'", "\\'")) {} safesource.replace("as3:", "", 0, 3); XrdOucString tool = ""; if (source.protocol == Protocol::HTTP) { tool = "curl "; } if (source.protocol == Protocol::HTTPS) { tool = "curl -k "; } if (source.protocol == Protocol::GSIFTP) { tool = "globus-url-copy "; } if ((source.protocol == Protocol::AS3) || (source.protocol == Protocol::S3)) { tool = "s3 get "; noprogress = old_noprogress; } cmdtext += tool; cmdtext += "$'"; cmdtext += safesource; cmdtext += "'"; if (source.protocol == Protocol::GSIFTP) { cmdtext += " -"; } cmdtext += " | "; rstdin = true; } if ((source.protocol == Protocol::AS3) || (source.protocol == Protocol::S3) || (target.protocol == Protocol::AS3) || (target.protocol == Protocol::S3)) { char ts[1024]; snprintf(ts, sizeof(ts) - 1, "%llu ", source.size); transfersize = ts; } if ((target.protocol == Protocol::AS3) || (target.protocol == Protocol::S3)) { rstdout = true; } //------------------------------------ // Prepare eoscp transaction name //------------------------------------ XrdOucString safename = source.name.c_str(); int qpos = safename.rfind("?"); if (qpos != STR_NPOS) { safename.erase(qpos); } if (source.protocol != Protocol::LOCAL) { XrdOucString sprot, hostport; const char* url = eos::common::StringConversion::ParseUrl(safename.c_str(), sprot, hostport); if (url) { std::string surl = url; safename = surl.c_str(); } } safename = eos::common::Path(safename.c_str()).GetName();; eos::common::StringConversion::SealXrdPath(safename); safename.replace("'", "\\'"); //------------------------------------ // Construct 'eoscp' command //------------------------------------ cmdtext += "eoscp "; if (append) { cmdtext += "-a "; } if (debug_level) { cmdtext += (debug_level == 1) ? "-v " : "-d "; } if (!summary) { cmdtext += "-s "; } if (makeparent) { cmdtext += "-p "; } if (noprogress) { cmdtext += "-n "; } if (nooverwrite) { cmdtext += "-x "; } if (transfersize.length()) { cmdtext += "-T "; cmdtext += transfersize; cmdtext += " "; } if (rate.length()) { cmdtext += "-t "; cmdtext += rate.c_str(); cmdtext += " "; } cmdtext += "-N $'"; cmdtext += safename.c_str(); cmdtext += "' "; if (rstdin) { cmdtext += "- "; } else { XrdOucString safesource = source.name.c_str(); safesource.replace("'", "\\'"); cmdtext += "$'"; cmdtext += safesource; cmdtext += "' "; } if (rstdout) { cmdtext += "-"; } else { XrdOucString safedest = dest.c_str(); safedest.replace("'", "\\'"); cmdtext += "$'"; cmdtext += safedest; cmdtext += "'"; } if ((target.protocol == Protocol::AS3) || (target.protocol == Protocol::S3)) { // s3 can upload via STDIN XrdOucString s3dest = dest.c_str(); s3dest.replace("as3:", "", 0, 3); cmdtext += " | s3 put "; cmdtext += s3dest.c_str(); cmdtext += " contentLength="; cmdtext += transfersize.c_str(); cmdtext += " > /dev/null"; } if (debug) { fprintf(stderr, "[eos-cp] running: %s\n", cmdtext.c_str()); } int lrc = system(cmdtext.c_str()); // Check if we got a CONTROL-C if (lrc == EINTR) { fprintf(stderr, "\n"); break; } if (WEXITSTATUS(lrc)) { fprintf(stderr, "error: failed copying path=%s\n", target_path.c_str()); retc |= lrc; continue; } //------------------------------------ // Check target size //------------------------------------ if (((target.protocol == Protocol::EOS) || (target.protocol == Protocol::XROOT) || (target.protocol == Protocol::LOCAL)) && (!target_is_stdout)) { struct stat buf; if (!do_stat(target_path.c_str(), target.protocol, buf)) { if ((!source.size) || (buf.st_size == (off_t)(append ? target_stat.st_size + source.size : (off_t) source.size) ) ) { // Preserve creation and modification timestamps if ((preserve) && (source.atime.tv_sec > 0) && (source.mtime.tv_sec > 0)) { bool updateok; if (target.protocol == Protocol::LOCAL) { struct timeval times[2]; times[0].tv_sec = source.atime.tv_sec; times[0].tv_usec = source.atime.tv_nsec / 1000; times[1].tv_sec = source.mtime.tv_sec; times[1].tv_usec = source.mtime.tv_nsec / 1000; updateok = (utimes(target_path.c_str(), times) == 0); } else { char update[1024]; auto roles = eos_roles_opaque(); sprintf(update, "%ceos.app=%s%s%s&mgm.pcmd=utimes" "&tv1_sec=%llu&tv1_nsec=%llu" "&tv2_sec=%llu&tv2_nsec=%llu", (target.opaque.length()) ? '&' : '?', getenv("EOSAPP") ? getenv("EOSAPP") : "eoscp", roles.size() ? "&" : "", roles.size() ? roles.c_str() : "", (unsigned long long) source.atime.tv_sec, (unsigned long long) source.atime.tv_nsec, (unsigned long long) source.mtime.tv_sec, (unsigned long long) source.mtime.tv_nsec); XrdOucString request = target_path.c_str(); request += update; char value[4096]; value[0] = 0; long long update_rc = XrdPosixXrootd::QueryOpaque(request.c_str(), value, 4096); updateok = (update_rc >= 0); // Parse the stat output if (updateok) { char tag[1024]; int tmp_retc; int items = sscanf(value, "%1023s retc=%d", tag, &tmp_retc); updateok = ((items == 2) && (strcmp(tag, "utimes:") == 0)); } } if (!updateok) { fprintf(stderr, "warning: creation/modification time " "could not be preserved for path=%s\n", target_path.c_str()); } } // Verify checksum if ((checksums) && (target.protocol != Protocol::LOCAL)) { XrdOucString address = serveruri.c_str(); address += "//dummy"; XrdCl::URL url(address.c_str()); if (!url.IsValid()) { fprintf(stderr, "error: invalid file system URL=%s " "[attempting checksum]\n", url.GetURL().c_str()); global_retc = EINVAL; return -1; } auto* fs = new XrdCl::FileSystem(url); if (!fs) { fprintf(stderr, "error: failed to get new FS object " "[attempting checksum]\n"); global_retc = EINVAL; return -1; } XrdCl::Buffer arg; XrdCl::Buffer* response = nullptr; XrdCl::XRootDStatus status; std::string query_path = dest.c_str(); std::string::size_type pos = query_path.rfind("//"); if (pos != std::string::npos) { query_path.erase(0, pos + 1); } arg.FromString(query_path); status = fs->Query(XrdCl::QueryCode::Checksum, arg, response); if (status.IsOK()) { XrdOucString xsum = response->GetBuffer(); xsum.replace("eos ", ""); fprintf(stdout, "path=%s size=%llu checksum=%s\n", source.name.c_str(), source.size, xsum.c_str()); } else { fprintf(stdout, "warning: failed getting checksum for path=%s size=%llu\n", source.name.c_str(), source.size); } delete response; delete fs; } } else { XrdOucString ssize1, ssize2; fprintf(stderr, "error: file size difference between source and target file " "source=%s [%s] target=%s [%s]\n", source.name.c_str(), eos::common::StringConversion::GetReadableSizeString(ssize1, source.size, "B"), target_path.c_str(), eos::common::StringConversion::GetReadableSizeString(ssize2, (unsigned long long) buf.st_size, "B")); lrc |= 0xffff00; } } else { fprintf(stderr, "error: target file not created source=%s target=%s\n", source.name.c_str(), target_path.c_str()); lrc |= 0xffff00; } } // Attempt to upload temporary file if (temporary_file) { if (target.protocol == Protocol::GSIFTP) { cmdtext = "globus-url-copy file://"; cmdtext += dest.c_str(); cmdtext += " "; cmdtext += target_path.c_str(); if (silent || noprogress) { cmdtext += " >& /dev/null"; } if (debug) { fprintf(stderr, "[eos-cp] running: %s\n", cmdtext.c_str()); } int rc = system(cmdtext.c_str()); if (WEXITSTATUS(rc)) { fprintf(stderr, "error: failed to upload %s [protocol=gsiftp]\n", target_path.c_str()); lrc |= 0xffff00; } } if ((target.protocol == Protocol::HTTP) || (target.protocol == Protocol::HTTPS)) { fprintf(stderr, "error: file uploads not supported for %s protocol [path=%s]\n", protocol_to_string(target.protocol), target_path.c_str()); lrc |= 0xffff00; } // Clean-up the temporary file unlink(dest.c_str()); } if (!WEXITSTATUS(lrc)) { files_copied++; copiedsize += source.size; } retc |= lrc; } // Mark end timestamp gettimeofday(&end_time, &tz); if (debug || !silent) { float time_elapsed = (float)(((end_time.tv_sec - start_time.tv_sec) * 1000000 + (end_time.tv_usec - start_time.tv_usec)) / 1000000.0); unsigned long long copyrate = (copiedsize / time_elapsed); XrdOucString ssize1, ssize2; fprintf(stderr, "%s[eos-cp] copied %d/%d files and %s in %.02f seconds with %s\n", (retc) ? "#WARNING " : "", files_copied, (int) source_list.size(), eos::common::StringConversion::GetReadableSizeString(ssize1, copiedsize, "B"), time_elapsed, eos::common::StringConversion::GetReadableSizeString(ssize2, copyrate, "B/s")); } global_retc = WEXITSTATUS(retc); return global_retc; } // ---------------------------------------------------------------------------- // Helper functions implementation // ---------------------------------------------------------------------------- /** * Convenience function to be used by 'eos cp' to query EOS for file names. * The output of the command is placed into the result vector. * @param cmdline the eos command to be executed * @param result reference to the result vector * @return error code of the command */ int run_eos_command(const char* cmdline, std::vector& result) { XrdOucString cmd = "eos -b "; if (user_role.length() && group_role.length()) { cmd += "--role "; cmd += user_role; cmd += " "; cmd += group_role; cmd += " "; } cmd += cmdline; return run_command(cmd.c_str(), result); } /** * Convenience function to be used by 'eos cp' to execute a command. * The output of the command is placed into the result vector. * @param cmdline the bash command to be executed * @param result reference to the result vector * @return error code of the command */ int run_command(const char* cmdline, std::vector& result) { FILE* fp = popen(cmdline, "r"); char line[4096]; int rc; if (!fp) { fprintf(stderr, "error: failed executing command %s\n", cmdline); return errno; } while (fgets(line, sizeof(line), fp)) { int size = strlen(line); if (line[size - 1] == '\n') { line[size - 1] = '\0'; } result.emplace_back(line); } rc = pclose(fp); return WEXITSTATUS(rc); } /** * Converts from local to absolute path. * This function makes the distinction between local or EOS paths. * Any other protocol will be left untouched. * Function is aware of interactive eos shell environment. * Local files will have the 'file:' prefix removed. * @param path the given path * @return abspath the absolute path */ const char* absolute_path(const char* path) { Protocol protocol = get_protocol(path); if (protocol != Protocol::EOS && protocol != Protocol::LOCAL) { return strdup(path); } if (strcmp(path, "-") == 0) { return strdup(path); } XrdOucString spath = path; if (protocol == Protocol::LOCAL && spath.beginswith("file:")) { spath.erase(0, 5); } if (!spath.beginswith("/")) { XrdOucString abspath = ""; if (interactive) { // Construct absolute path within eos shell abspath.insert(gPwd.c_str(), 0); } else { // Construct absolute path within regular shell abspath.insert("/", 0); abspath.insert(getenv("PWD"), 0); } spath.insert(abspath.c_str(), 0); } // Note: eos::common::Path expects an absolute path! // Note: eos::common::Path removes trailing '/'! std::string trailing_slash = ""; if ((spath.endswith("/")) && (!spath.endswith("/./")) && (!spath.endswith("/../"))) { trailing_slash = "/"; } // Sanitize '.' and '..' entries spath = eos::common::Path(spath.c_str()).GetFullPath().c_str(); spath += trailing_slash.c_str(); return strdup(spath.c_str()); } /** * Given a symlink path of the following format 'link -> target', * will return the name of the 'link'. * @param path the path to check * @return path the processed symlink name */ XrdOucString process_symlink(XrdOucString path) { int pos = path.find(" -> "); if (pos != STR_NPOS) { path.erase(pos); } return path; } /** * Will check whether the given path is a directory or not. * For local and EOS protocols, stat information is used. * The stat structure may be passed, otherwise it is constructed. * Function is aware of interactive eos shell environment. * @param path the path to check * @param protocol the protocol to access the path * @param buf stat structure * @return true if directory, false otherwise */ bool is_dir(const char* path, Protocol protocol, struct stat* buf) { if (protocol != Protocol::EOS && protocol != Protocol::LOCAL) { XrdOucString spath = path; return spath.endswith("/"); } int rc = 0; struct stat tmpbuf {}; if (buf == nullptr) { buf = &tmpbuf; const char* abs_path = absolute_path(path); rc = do_stat(abs_path, protocol, *buf); free(const_cast(abs_path)); } return (rc == 0) ? S_ISDIR(buf->st_mode) : false; } /** * Returns eos roles opaque info from the global user variables. * @return roles opaque info containing eos roles */ std::string eos_roles_opaque() { std::string roles; if (user_role.length() && group_role.length()) { roles = "eos.ruid="; roles += user_role.c_str(); roles += "&eos.rgid="; roles += group_role.c_str(); return roles; } return roles; } /** * Perform stat on a given path. * Function makes the distinction between local or EOS paths. * @param path the path to stat * @param protocol the protocol to access the path * @param buf stat structure to fill * @return rc stat error code */ int do_stat(const char* path, Protocol protocol, struct stat& buf) { const char* abs_path = absolute_path(path); int rc = -1; if (protocol == Protocol::EOS || protocol == Protocol::XROOT) { // Stat EOS file XrdOucString url = abs_path; std::string roles = eos_roles_opaque(); // Expand '/eos/' shortcut for EOS protocol if (url.beginswith("/eos/")) { url = serveruri.c_str(); url += (!url.endswith("/")) ? "/" : ""; url += abs_path; } if (!roles.empty()) { url += (url.find("?") == STR_NPOS) ? "?" : "&"; url += roles.c_str(); } rc = XrdPosixXrootd::Stat(url.c_str(), &buf); } else if (protocol == Protocol::LOCAL) { // Stat local file rc = stat(abs_path, &buf); } free((char*)abs_path); return rc; } /** * Given an S3 path, will parse and remove the opaque info. * The following environment variables are set: * S3_ACCESS_KEY_ID
* S3_SECRET_ACCESS_KEY
* S3_HOSTNAME
* @param path the S3 path * @param opaque the opaque info to parse for S3 info * @return url the S3 url */ const char* setup_s3_environment(XrdOucString path, XrdOucString opaque) { XrdOucString sprot, hostport; XrdOucString url = eos::common::StringConversion::ParseUrl(path.c_str(), sprot, hostport); if (!url.length()) { fprintf(stderr, "error: could not parse S3 url=%s", path.c_str()); global_retc = EINVAL; return 0; } if (opaque.length()) { XrdOucEnv env(opaque.c_str()); // Extract opaque S3 tags if present if (env.Get("s3.id")) { setenv("S3_ACCESS_KEY_ID", env.Get("s3.id"), 1); } if (env.Get("s3.key")) { setenv("S3_SECRET_ACCESS_KEY", env.Get("s3.key"), 1); } } if (hostport.length()) { setenv("S3_HOSTNAME", hostport.c_str(), 1); } // Apply the ROOT compatibility environment variables if (getenv("S3_ACCESS_ID")) { setenv("S3_ACCESS_KEY_ID", getenv("S3_ACCESS_ID"), 1); } if (getenv("S3_ACCESS_KEY")) { setenv("S3_SECRET_ACCESS_KEY", getenv("S3_ACCESS_KEY"), 1); } // Check S3 environment if ((!getenv("S3_HOSTNAME")) || (!getenv("S3_ACCESS_KEY_ID")) || (!getenv("S3_SECRET_ACCESS_KEY"))) { fprintf(stderr, "error: S3 environment not set up for %s\n", path.c_str()); fprintf(stderr, "You have to set the following environment variables: " "S3_ACCESS_KEY_ID or S3_ACCESS_ID\n" "S3_SECRET_ACCESS_KEY or S3_ACCESS_KEY\n" "S3_HOSTNAME (or use path with URI)"); global_retc = EINVAL; return 0; } return url.c_str(); } /** * Check if required tools are available to access the given path. * @param path the path to access */ int check_protocol_tool(const char* path) { Protocol protocol = get_protocol(path); std::string tool = ""; char cmd[128]; if (protocol == Protocol::HTTP || protocol == Protocol::HTTPS) { tool = "curl"; } else if (protocol == Protocol::AS3 || protocol == Protocol::S3) { tool = "s3"; } else if (protocol == Protocol::GSIFTP) { tool = "globus-url-copy"; } else { return 0; } sprintf(cmd, "which %s >& /dev/null", tool.c_str()); int rc = system(cmd); if (WEXITSTATUS(rc)) { fprintf(stderr, "error: %s executable not found in PATH\n", tool.c_str()); if (tool == "s3") { fprintf(stderr, " error: please install S3 executable from libs3\n"); } global_retc = WEXITSTATUS(rc); } return WEXITSTATUS(rc); } /** * Returns the protocol for a given path. * Function is aware of interactive eos shell environment. */ Protocol get_protocol(XrdOucString path) { if (path.beginswith("/eos/")) { return Protocol::EOS; } else if (path.beginswith("http://")) { return Protocol::HTTP; } else if (path.beginswith("https://")) { return Protocol::HTTPS; } else if (path.beginswith("gsiftp://")) { return Protocol::GSIFTP; } else if (path.beginswith("root://")) { return Protocol::XROOT; } else if (path.beginswith("as3:")) { return Protocol::AS3; } else if (path.beginswith("s3://")) { return Protocol::S3; } else if (path.beginswith("file:")) { return Protocol::LOCAL; } else if (path.beginswith("/") || (path.find(":/") == STR_NPOS)) { return (interactive) ? Protocol::EOS : Protocol::LOCAL; } return Protocol::UNKNOWN; } /** * Returns a string representation of the protocol. */ const char* protocol_to_string(Protocol protocol) { if (protocol == Protocol::EOS) { return "eos"; } else if (protocol == Protocol::HTTP) { return "http"; } else if (protocol == Protocol::HTTPS) { return "https"; } else if (protocol == Protocol::GSIFTP) { return "gsiftp"; } else if (protocol == Protocol::XROOT) { return "root"; } else if (protocol == Protocol::AS3) { return "as3"; } else if (protocol == Protocol::S3) { return "s3"; } else if (protocol == Protocol::LOCAL) { return "local"; } return "unknown"; } /** * Parse and returns debug level from option string or -1 if invalid. * Option format: -d[=][1|2|3] */ int parse_debug_level(XrdOucString option) { if (option.beginswith("-d")) { option.erase(0, 2); } else if (option.beginswith("--debug")) { option.erase(0, 7); } if (option.length() && ((option[0] == ' ') || (option[0] == '='))) { option.erase(0, 1); } if (!option.length()) { return 0; } int level = 0; try { level = std::stoul(option.c_str()); } catch (...) { } if (level < 1 || level > 3) { fprintf(stderr, "error: invalid value for =%s\n", option.c_str()); return -1; } return level - 1; }