47#include "BESInternalError.h"
53#include "BESFileLockingCache.h"
57#define LOCK "cache-lock"
58#define LOCK_STATUS "cache-lock-status"
60#define CACHE_CONTROL "cache_control"
62#define prolog std::string("BESFileLockingCache::").append(__func__).append("() - ")
67static const unsigned long long BYTES_PER_MEG = 1048576ULL;
71static const unsigned long long MAX_CACHE_SIZE_IN_MEGABYTES = (1ULL << 44);
94BESFileLockingCache::BESFileLockingCache(
const string &cache_dir,
const string &prefix,
unsigned long long size) :
95 d_cache_dir(cache_dir), d_prefix(prefix), d_max_cache_size_in_bytes(size), d_target_size(0), d_cache_info(
""),
98 m_initialize_cache_info();
118 d_cache_dir = cache_dir;
120 d_max_cache_size_in_bytes = size;
122 m_initialize_cache_info();
125static inline string get_errno()
127 char *s_err = strerror(errno);
131 return "Unknown error.";
138static inline struct flock *lock(
int type)
140 static struct flock lock;
142 lock.l_whence = SEEK_SET;
145 lock.l_pid = getpid();
150inline void BESFileLockingCache::m_record_descriptor(
const string &file,
int fd)
152 BESDEBUG(LOCK, prolog <<
"Recording descriptor: " << file <<
", " << fd << endl);
154 d_locks.insert(std::pair<string, int>(file, fd));
157inline int BESFileLockingCache::m_remove_descriptor(
const string &file)
159 BESDEBUG(LOCK, prolog <<
"d_locks size: " << d_locks.size() << endl);
161 FilesAndLockDescriptors::iterator i = d_locks.find(file);
162 if (i == d_locks.end())
return -1;
167 BESDEBUG(LOCK, prolog <<
"Found file descriptor [" << fd <<
"] for file: " << file << endl);
172#if USE_GET_SHARED_LOCK
173inline int BESFileLockingCache::m_find_descriptor(
const string &file)
175 BESDEBUG(LOCK, prolog <<
"d_locks size: " << d_locks.size() << endl);
177 FilesAndLockDescriptors::iterator i = d_locks.find(file);
178 if (i == d_locks.end())
return -1;
180 BESDEBUG(LOCK, prolog <<
"Found file descriptor [" << i->second <<
"] for file: " << file << endl);
191static string lockStatus(
const int fd)
193 struct flock lock_query;
195 lock_query.l_type = F_WRLCK;
196 lock_query.l_start = 0;
197 lock_query.l_whence = SEEK_SET;
198 lock_query.l_len = 0;
199 lock_query.l_pid = 0;
201 int ret = fcntl(fd, F_GETLK, &lock_query);
206 ss <<
"fnctl(" << fd <<
",F_GETLK, &lock) returned: " << ret <<
" errno[" << errno <<
"]: "
207 << strerror(errno) << endl;
210 ss <<
"fnctl(" << fd <<
",F_GETLK, &lock) returned: " << ret << endl;
213 ss <<
"lock_info.l_len: " << lock_query.l_len << endl;
214 ss <<
"lock_info.l_pid: " << lock_query.l_pid << endl;
215 ss <<
"lock_info.l_start: " << lock_query.l_start << endl;
218 switch (lock_query.l_type) {
231 ss <<
"lock_info.l_type: " << type << endl;
232 ss <<
"lock_info.l_whence: " << lock_query.l_whence << endl;
242static void unlock(
int fd)
244 if (fcntl(fd, F_SETLK, lock(F_UNLCK)) == -1) {
245 throw BESInternalError(prolog +
"An error occurred trying to unlock the file: " + get_errno(), __FILE__, __LINE__);
248 BESDEBUG(LOCK_STATUS, prolog <<
"lock status: " << lockStatus(fd) << endl);
250 if (close(fd) == -1)
throw BESInternalError(prolog +
"Could not close the (just) unlocked file.", __FILE__, __LINE__);
252 BESDEBUG(LOCK, prolog <<
"File Closed. fd: " << fd << endl);
274bool BESFileLockingCache::m_check_ctor_params()
287 BESDEBUG(CACHE, prolog <<
"BEGIN" << endl);
289 if (d_cache_dir.empty()) {
290 BESDEBUG(CACHE, prolog <<
"The cache directory was not specified. CACHE IS DISABLED." << endl);
297 int status = mkdir(d_cache_dir.c_str(), 0775);
300 if (status == -1 && errno != EEXIST) {
301 string err = prolog +
"The cache directory " + d_cache_dir +
" could not be created: " + strerror(errno);
302 throw BESError(err, BES_SYNTAX_USER_ERROR, __FILE__, __LINE__);
305 if (d_prefix.empty()) {
306 string err = prolog +
"The cache file prefix was not specified, must not be empty";
307 throw BESError(err, BES_SYNTAX_USER_ERROR, __FILE__, __LINE__);
317 if (d_max_cache_size_in_bytes < 0) {
318 string err =
"The cache size was not specified, must be greater than zero";
319 throw BESError(err, BES_SYNTAX_USER_ERROR, __FILE__, __LINE__);
324 "BESFileLockingCache::" << __func__ <<
"() -" <<
325 " d_cache_dir: " << d_cache_dir <<
326 " d_prefix: " << d_prefix <<
327 " d_max_cache_size_in_bytes: " << d_max_cache_size_in_bytes << endl);
342static bool createLockedFile(
const string &file_name,
int &ref_fd)
344 BESDEBUG(LOCK, prolog <<
"BEGIN file: " << file_name <<endl);
347 if ((fd = open(file_name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666)) < 0) {
357 struct flock *l = lock(F_WRLCK);
359 if (fcntl(fd, F_SETLKW, l) == -1) {
362 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error for '" << file_name <<
"': " << get_errno();
366 BESDEBUG(LOCK, prolog <<
"END file: " << file_name <<endl);
382bool BESFileLockingCache::m_initialize_cache_info()
384 BESDEBUG(CACHE, prolog <<
"BEGIN" << endl);
388 d_max_cache_size_in_bytes = min(d_max_cache_size_in_bytes, MAX_CACHE_SIZE_IN_MEGABYTES);
389 d_max_cache_size_in_bytes *= BYTES_PER_MEG;
390 d_target_size = d_max_cache_size_in_bytes * 0.8;
392 BESDEBUG(CACHE, prolog <<
"d_max_cache_size_in_bytes: "
393 << d_max_cache_size_in_bytes <<
" d_target_size: "<<d_target_size<< endl);
395 bool status = m_check_ctor_params();
399 BESDEBUG(CACHE, prolog <<
"d_cache_info: " << d_cache_info << endl);
403 if (createLockedFile(d_cache_info, d_cache_info_fd)) {
405 unsigned long long size = 0;
406 if (write(d_cache_info_fd, &size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
407 throw BESInternalError(prolog +
"Could not write size info to the cache info file `" + d_cache_info +
"`",
415 if ((d_cache_info_fd = open(d_cache_info.c_str(), O_RDWR)) == -1) {
416 throw BESInternalError(prolog +
"Failed to open cache info file: " + d_cache_info +
" errno: " + get_errno(), __FILE__, __LINE__);
420 BESDEBUG(CACHE,prolog <<
"d_cache_info_fd: " << d_cache_info_fd << endl);
423 BESDEBUG(CACHE,prolog <<
"END [" <<
"CACHE IS " << (
cache_enabled()?
"ENABLED]":
"DISABLED]") << endl);
428static const string chars_excluded_from_filenames =
"<>=,/()\\\"\':? []()$";
448 BESDEBUG(CACHE, prolog <<
"src: '" << src <<
"' mangle: "<< mangle << endl);
453 string::size_type pos = target.find_first_of(chars_excluded_from_filenames);
454 while (pos != string::npos) {
455 target.replace(pos, 1,
"#", 1);
456 pos = target.find_first_of(chars_excluded_from_filenames);
460 if (target.length() > 254) {
462 msg << prolog <<
"Cache filename is longer than 254 characters (name length: ";
463 msg << target.length() <<
", name: " << target;
469 BESDEBUG(CACHE, prolog <<
"target: '" << target <<
"'" << endl);
474#if USE_GET_SHARED_LOCK
488static bool getSharedLock(
const string &file_name,
int &ref_fd)
490 BESDEBUG(LOCK, prolog <<
"Acquiring cache read lock for " << file_name <<endl);
493 if ((fd = open(file_name.c_str(), O_RDONLY)) < 0) {
503 struct flock *l = lock(F_RDLCK);
504 if (fcntl(fd, F_SETLKW, l) == -1) {
507 oss << prolog <<
"cache process: " << l->l_pid <<
" triggered a locking error for '" << file_name <<
"': " << get_errno();
511 BESDEBUG(LOCK, prolog <<
"SUCCESS Read Lock Acquired For " << file_name <<endl);
543#if USE_GET_SHARED_LOCK
544 status = getSharedLock(target, fd);
546 if (status) m_record_descriptor(target, fd);
548 fd = m_find_descriptor(target);
550 if ((fd == -1) && (fd = open(target.c_str(), O_RDONLY)) < 0) {
562 struct flock *l = lock(F_RDLCK);
563 if (fcntl(fd, F_SETLKW, l) == -1) {
567 m_record_descriptor(target, fd);
596 bool status = createLockedFile(target, fd);
598 BESDEBUG(LOCK, prolog <<
"target: " << target <<
" (status: " << status <<
", fd: " << fd <<
")" << endl);
600 if (status) m_record_descriptor(target, fd);
625 lock.l_type = F_RDLCK;
626 lock.l_whence = SEEK_SET;
629 lock.l_pid = getpid();
631 if (fcntl(fd, F_SETLKW, &lock) == -1) {
635 BESDEBUG(LOCK_STATUS, prolog <<
"lock status: " << lockStatus(fd) << endl);
648 BESDEBUG(LOCK, prolog <<
"d_cache_info_fd: " << d_cache_info_fd << endl);
650 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_WRLCK)) == -1) {
651 throw BESInternalError(
"An error occurred trying to lock the cache-control file" + get_errno(), __FILE__,
655 BESDEBUG(LOCK_STATUS, prolog <<
"lock status: " << lockStatus(d_cache_info_fd) << endl);
663 BESDEBUG(LOCK, prolog <<
"d_cache_info_fd: " << d_cache_info_fd << endl);
665 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_RDLCK)) == -1) {
666 throw BESInternalError(prolog +
"An error occurred trying to lock the cache-control file" + get_errno(), __FILE__,
670 BESDEBUG(LOCK_STATUS, prolog <<
"lock status: " << lockStatus(d_cache_info_fd) << endl);
680 BESDEBUG(LOCK, prolog <<
"d_cache_info_fd: " << d_cache_info_fd << endl);
682 if (fcntl(d_cache_info_fd, F_SETLK, lock(F_UNLCK)) == -1) {
683 throw BESInternalError(prolog +
"An error occurred trying to unlock the cache-control file" + get_errno(), __FILE__,
687 BESDEBUG(LOCK_STATUS, prolog <<
"lock status: " << lockStatus(d_cache_info_fd) << endl);
707 BESDEBUG(LOCK, prolog <<
"BEGIN file: " << file_name << endl);
709 int fd = m_remove_descriptor(file_name);
712 fd = m_remove_descriptor(file_name);
715 BESDEBUG(LOCK_STATUS, prolog <<
"lock status: " << lockStatus(d_cache_info_fd) << endl);
716 BESDEBUG(LOCK, prolog <<
"END file: "<< file_name<< endl);
731 unsigned long long current_size;
735 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
736 throw BESInternalError(prolog +
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
739 if (read(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
740 throw BESInternalError(prolog +
"Could not get read size info from the cache info file!", __FILE__, __LINE__);
743 int statret = stat(target.c_str(), &buf);
745 current_size += buf.st_size;
747 throw BESInternalError(prolog +
"Could not read the size of the new file: " + target +
" : " + get_errno(), __FILE__,
750 BESDEBUG(CACHE, prolog <<
"cache size updated to: " << current_size << endl);
752 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
753 throw BESInternalError(prolog +
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
755 if (write(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
756 throw BESInternalError(prolog +
"Could not write size info from the cache info file!", __FILE__, __LINE__);
774 return current_size > d_max_cache_size_in_bytes;
786 unsigned long long current_size;
790 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
791 throw BESInternalError(prolog +
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
793 if (read(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
794 throw BESInternalError(prolog +
"Could not get read size info from the cache info file!", __FILE__, __LINE__);
808 return e1.time < e2.time;
812unsigned long long BESFileLockingCache::m_collect_cache_dir_info(CacheFiles &contents)
814 DIR *dip = opendir(d_cache_dir.c_str());
815 if (!dip)
throw BESInternalError(prolog +
"Unable to open cache directory " + d_cache_dir, __FILE__, __LINE__);
818 vector<string> files;
821 while ((dit = readdir(dip)) != NULL) {
822 string dirEntry = dit->d_name;
823 if (dirEntry.compare(0, d_prefix.length(), d_prefix) == 0 && dirEntry != d_cache_info) {
824 files.push_back(d_cache_dir +
"/" + dirEntry);
830 unsigned long long current_size = 0;
832 for (vector<string>::iterator file = files.begin(); file != files.end(); ++file) {
833 if (stat(file->c_str(), &buf) == 0) {
834 current_size += buf.st_size;
837 entry.size = buf.st_size;
838 entry.time = buf.st_atime;
842 throw BESInternalError(
"Zero-byte file found in cache. " + *file, __FILE__, __LINE__);
844 contents.push_back(entry);
849 contents.sort(entry_op);
872 BESDEBUG(LOCK, prolog <<
"BEGIN filename: " << file_name <<endl);
875 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
881 throw BESInternalError(prolog +
"errno: " + get_errno(), __FILE__, __LINE__);
885 struct flock *l = lock(F_WRLCK);
886 if (fcntl(fd, F_SETLK, l) == -1) {
890 BESDEBUG(LOCK,prolog <<
"exit (false): " << file_name <<
" by: " << l->l_pid << endl);
897 oss << prolog <<
"cache process: " << l->l_pid <<
" triggered a locking error for '" << file_name <<
"': " << get_errno();
905 m_record_descriptor(file_name,fd);
907 BESDEBUG(LOCK, prolog <<
"END filename: " << file_name <<endl);
929 BESDEBUG(CACHE, prolog <<
"Starting the purge" << endl);
932 BESDEBUG(CACHE, prolog <<
"Size is set to unlimited, so no need to purge." << endl);
940 unsigned long long computed_size = m_collect_cache_dir_info(contents);
942 if (BESISDEBUG(
"cache_contents" )) {
943 BESDEBUG(CACHE,
"BEFORE Purge " << computed_size/BYTES_PER_MEG << endl );
944 CacheFiles::iterator ti = contents.begin();
945 CacheFiles::iterator te = contents.end();
946 for (; ti != te; ti++) {
947 BESDEBUG(CACHE, (*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
951 BESDEBUG(CACHE, prolog <<
"Current and target size (in MB) "
952 << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl);
959 CacheFiles::iterator i = contents.begin();
960 while (i != contents.end() && computed_size > d_target_size) {
966 BESDEBUG(CACHE, prolog <<
"purge: " << i->name <<
" removed." << endl);
968 if (unlink(i->name.c_str()) != 0)
970 prolog +
"Unable to purge the file " + i->name +
" from the cache: " + get_errno(), __FILE__,
974 computed_size -= i->size;
978 BESDEBUG(CACHE,prolog <<
"Current and target size (in MB) "
979 << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl);
983 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
984 throw BESInternalError(prolog +
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
986 if (write(d_cache_info_fd, &computed_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
987 throw BESInternalError(prolog +
"Could not write size info to the cache info file!", __FILE__, __LINE__);
989 if (BESISDEBUG(
"cache_contents" )) {
991 computed_size = m_collect_cache_dir_info(contents);
992 BESDEBUG(CACHE,
"AFTER Purge " << computed_size/BYTES_PER_MEG << endl );
993 CacheFiles::iterator ti = contents.begin();
994 CacheFiles::iterator te = contents.end();
995 for (; ti != te; ti++) {
996 BESDEBUG(CACHE, (*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
1026 BESDEBUG(LOCK, prolog <<
"BEGIN filename: " << file_name <<endl);
1029 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
1037 msg << prolog <<
"FAILED to open file: " << file_name <<
" errno: " << get_errno();
1038 BESDEBUG(LOCK, msg.str() << endl);
1044 struct flock *l = lock(F_WRLCK);
1045 if (fcntl(fd, F_SETLKW, l) == -1) {
1048 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error for '" << file_name <<
"': " << get_errno();
1053 m_record_descriptor(file_name,fd);
1056 BESDEBUG(LOCK, prolog <<
"END filename: " << file_name <<endl);
1072 BESDEBUG(CACHE, prolog <<
"Starting the purge" << endl);
1081 unsigned long long size = 0;
1083 if (stat(file.c_str(), &buf) == 0) {
1087 BESDEBUG(CACHE, prolog <<
"file: " << file <<
" removed." << endl);
1089 if (unlink(file.c_str()) != 0)
1090 throw BESInternalError(prolog +
"Unable to purge the file " + file +
" from the cache: " + get_errno(), __FILE__,
1097 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
1098 throw BESInternalError(prolog +
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
1100 if (write(d_cache_info_fd, &cache_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
1101 throw BESInternalError(prolog +
"Could not write size info to the cache info file!", __FILE__, __LINE__);
1125 return (stat(dir.c_str(), &buf) == 0) && (buf.st_mode & S_IFDIR);
1138 strm << BESIndent::LMarg << prolog <<
"(" << (
void *)
this <<
")" << endl;
1139 BESIndent::Indent();
1140 strm << BESIndent::LMarg <<
"cache dir: " << d_cache_dir << endl;
1141 strm << BESIndent::LMarg <<
"prefix: " << d_prefix << endl;
1142 strm << BESIndent::LMarg <<
"size (bytes): " << d_max_cache_size_in_bytes << endl;
1143 BESIndent::UnIndent();
Base exception class for the BES with basic string message.
virtual void dump(std::ostream &strm) const
Dumps information about this object.
virtual void unlock_cache()
virtual unsigned long long get_cache_size()
Get the cache size.
bool cache_enabled() const
void initialize(const std::string &cache_dir, const std::string &prefix, unsigned long long size)
Initialize an instance of FileLockingCache.
virtual void unlock_and_close(const std::string &target)
const std::string get_cache_directory()
bool is_unlimited() const
Is this cache allowed to store as much as it wants?
virtual unsigned long long update_cache_info(const std::string &target)
Update the cache info file to include 'target'.
virtual void lock_cache_write()
void enable()
Enable the cache.
virtual bool create_and_lock(const std::string &target, int &fd)
Create a file in the cache and lock it for write access.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
virtual bool get_read_lock(const std::string &target, int &fd)
Get a read-only lock on the file if it exists.
static bool dir_exists(const std::string &dir)
const std::string get_cache_file_prefix()
virtual void lock_cache_read()
virtual bool get_exclusive_lock(const std::string &target, int &fd)
void disable()
Disable the cache.
virtual bool get_exclusive_lock_nb(const std::string &target, int &fd)
virtual void purge_file(const std::string &file)
Purge a single file from the cache.
virtual bool cache_too_big(unsigned long long current_size) const
Look at the cache size and see if it is too big.
virtual void update_and_purge(const std::string &new_file)
Purge files from the cache.
virtual std::string get_cache_file_name(const std::string &src, bool mangle=true)
Exception thrown if an internal error is encountered.
static std::string assemblePath(const std::string &firstPart, const std::string &secondPart, bool leadingSlash=false, bool trailingSlash=false)
Assemble path fragments making sure that they are separated by a single '/' character.