bes Updated for version 3.20.13
GlobalMetadataStore.cc
1// -*- mode: c++; c-basic-offset:4 -*-
2
3// This file is part of HYrax, A C++ implementation of the OPeNDAP Data
4// Access Protocol.
5
6// Copyright (c) 2018 OPeNDAP, Inc.
7// Author: James Gallagher <jgallagher@opendap.org>
8//
9// This library is free software; you can redistribute it and/or
10// modify it under the terms of the GNU Lesser General Public
11// License as published by the Free Software Foundation; either
12// version 2.1 of the License, or (at your option) any later version.
13//
14// This library is distributed in the hope that it will be useful,
15// but WITHOUT ANY WARRANTY; without even the implied warranty of
16// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17// Lesser General Public License for more details.
18//
19// You should have received a copy of the GNU Lesser General Public
20// License along with this library; if not, write to the Free Software
21// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22//
23// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24
25#include "config.h"
26
27#include <fcntl.h> // for posix_advise
28#include <unistd.h>
29
30#include <cerrno>
31#include <cstring>
32
33#include <iostream>
34#include <string>
35#include <fstream>
36#include <sstream>
37#include <functional>
38#include <libdap/DAS.h>
39#include <memory>
40#include <sys/stat.h>
41
42#include <libdap/DapObj.h>
43#include <libdap/DDS.h>
44#include <libdap/DMR.h>
45#include <libdap/D4ParserSax2.h>
46#include <libdap/XMLWriter.h>
47#include <libdap/BaseTypeFactory.h>
48#include <libdap/D4BaseTypeFactory.h>
49
50#include "PicoSHA2/picosha2.h"
51
52#include "TempFile.h"
53#include "TheBESKeys.h"
54#include "BESUtil.h"
55#include "BESLog.h"
56#include "BESContextManager.h"
57#include "BESDebug.h"
58#include "BESRequestHandler.h"
59#include "BESRequestHandlerList.h"
60#include "BESNotFoundError.h"
61
62#include "BESInternalError.h"
63#include "BESInternalFatalError.h"
64
65#include "GlobalMetadataStore.h"
66
// Context key passed to BESDEBUG() for every debug message emitted by this file.
67#define DEBUG_KEY "metadata_store"
// When 0, store_dap_response() only updates the cache-size bookkeeping if the
// store is not unlimited (see the is_unlimited() test there).
68#define MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED 0
69
// AT_EXIT(x) registers x with atexit() when HAVE_ATEXIT is defined and
// expands to nothing otherwise.
70#ifdef HAVE_ATEXIT
71#define AT_EXIT(x) atexit((x))
72#else
73#define AT_EXIT(x)
74#endif
75
// With this symbol undefined, the '#if SYMETRIC_ADD_RESPONSES' sections of the
// add_responses() and remove_responses() code are compiled out.
85#undef SYMETRIC_ADD_RESPONSES
86
// Log-message prefix of the form "GlobalMetadataStore::<function>() - ".
87#define prolog std::string("GlobalMetadataStore::").append(__func__).append("() - ")
88
89using namespace std;
90using namespace libdap;
91using namespace bes;
92
// Defaults used when the corresponding bes.conf key is not set.
// NOTE(review): the trailing comment says GB, but get_cache_size_from_config()
// stores this value in a variable named 'size_in_megabytes' - confirm the unit.
93static const unsigned int default_cache_size = 20; // 20 GB
94static const string default_cache_prefix = "mds";
95static const string default_cache_dir = ""; // I'm making the default empty so that no key == no caching. jhrg 9.26.16
96static const string default_ledger_name = "mds_ledger.txt";
97
// Names of the configuration keys looked up through TheBESKeys in this file.
98static const string PATH_KEY = "DAP.GlobalMetadataStore.path";
99static const string PREFIX_KEY = "DAP.GlobalMetadataStore.prefix";
100static const string SIZE_KEY = "DAP.GlobalMetadataStore.size";
101static const string LEDGER_KEY = "DAP.GlobalMetadataStore.ledger";
102static const string LOCAL_TIME_KEY = "BES.LogTimeLocal";
103
// Singleton state: the instance pointer starts out null and the store is
// considered enabled until get_instance() finds that the cache is not enabled.
104GlobalMetadataStore *GlobalMetadataStore::d_instance = 0;
105bool GlobalMetadataStore::d_enabled = true;
106
121void GlobalMetadataStore::transfer_bytes(int fd, ostream &os)
122{
123 static const int BUFFER_SIZE = 16*1024;
124
125#if _POSIX_C_SOURCE >= 200112L
126 /* Advise the kernel of our access pattern. */
127 int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
128 if (status != 0)
129 ERROR_LOG(prolog << "Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
130#endif
131
132 char buf[BUFFER_SIZE + 1];
133
134 while(int bytes_read = read(fd, buf, BUFFER_SIZE))
135 {
136 if(bytes_read == -1)
137 throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
138 if (!bytes_read)
139 break;
140
141 os.write(buf, bytes_read);
142 }
143}
144
157void GlobalMetadataStore::insert_xml_base(int fd, ostream &os, const string &xml_base)
158{
159 static const int BUFFER_SIZE = 1024;
160
161#if _POSIX_C_SOURCE >= 200112L
162 /* Advise the kernel of our access pattern. */
163 int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
164 if (status != 0)
165 ERROR_LOG(prolog << "Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
166#endif
167
168 char buf[BUFFER_SIZE + 1];
169 size_t bytes_read = read(fd, buf, BUFFER_SIZE);
170
171 if(bytes_read == (size_t)-1)
172 throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
173
174 if (bytes_read == 0)
175 return;
176
177 // Every valid DMR/++ response in the MDS starts with:
178 // <?xml version="1.0" encoding="ISO‌-8859-1"?>
179 //
180 // and has one of two kinds of <Dataset...> tags
181 // 1: <Dataset xmlns="..." xml:base="file:DMR_1.xml" ... >
182 // 2: <Dataset xmlns="..." ... >
183 //
184 // Assume it is well formed and always includes the prolog,
185 // but might not use <CR> <CRLF> chars
186
187 // transfer the prolog (<?xml version="1.0" encoding="ISO‌-8859-1"?>)
188 size_t i = 0;
189 while (buf[i++] != '>')
190 ; // 'i' now points one char past the xml prolog
191 os.write(buf, i);
192
193 // transfer <Dataset ...> with new value for xml:base
194 size_t s = i; // start of <Dataset ...>
195 size_t j = 0;
196 char xml_base_literal[] = "xml:base";
197 while (i < bytes_read) {
198 if (buf[i] == '>') { // Found end of Dataset; no xml:base was present
199 os.write(buf + s, i - s);
200 os << " xml:base=\"" << xml_base << "\"";
201 break;
202 }
203 else if (j == sizeof(xml_base_literal) - 1) { // found 'xml:base' literal
204 os.write(buf + s, i - s); // This will include all of <Dataset... including 'xml:base'
205 while (buf[i++] != '=')
206 ; // read/discard '="..."'
207 while (buf[i++] != '"')
208 ;
209 while (buf[i++] != '"')
210 ;
211 os << "=\"" << xml_base << "\""; // write the new xml:base value
212 break;
213 }
214 else if (buf[i] == xml_base_literal[j]) {
215 ++j;
216 }
217 else {
218 j = 0;
219 }
220
221 ++i;
222 }
223
224 // transfer the rest
225 os.write(buf + i, bytes_read - i);
226
227 // Now, if the response is more than 1k, use faster code to finish the tx
228 transfer_bytes(fd, os);
229}
230
231unsigned long GlobalMetadataStore::get_cache_size_from_config()
232{
233 bool found;
234 string size;
235 unsigned long size_in_megabytes = default_cache_size;
236 TheBESKeys::TheKeys()->get_value(SIZE_KEY, size, found);
237 if (found) {
238 BESDEBUG(DEBUG_KEY,
239 "GlobalMetadataStore::getCacheSizeFromConfig(): Located BES key " << SIZE_KEY << "=" << size << endl);
240 istringstream iss(size);
241 iss >> size_in_megabytes;
242 }
243
244 return size_in_megabytes;
245}
246
247string GlobalMetadataStore::get_cache_prefix_from_config()
248{
249 bool found;
250 string prefix = default_cache_prefix;
251 TheBESKeys::TheKeys()->get_value(PREFIX_KEY, prefix, found);
252 if (found) {
253 BESDEBUG(DEBUG_KEY,
254 "GlobalMetadataStore::getCachePrefixFromConfig(): Located BES key " << PREFIX_KEY << "=" << prefix << endl);
255 prefix = BESUtil::lowercase(prefix);
256 }
257
258 return prefix;
259}
260
261// If the cache prefix is the empty string, the cache is turned off.
262string GlobalMetadataStore::get_cache_dir_from_config()
263{
264 bool found;
265
266 string cacheDir = default_cache_dir;
267 TheBESKeys::TheKeys()->get_value(PATH_KEY, cacheDir, found);
268 if (found) {
269 BESDEBUG(DEBUG_KEY,
270 "GlobalMetadataStore::getCacheDirFromConfig(): Located BES key " << PATH_KEY<< "=" << cacheDir << endl);
271 }
272
273 return cacheDir;
274}
275
290
308GlobalMetadataStore::get_instance(const string &cache_dir, const string &prefix, unsigned long long size)
309{
310 if (d_enabled && d_instance == 0) {
311 d_instance = new GlobalMetadataStore(cache_dir, prefix, size); // never returns null_ptr
312 d_enabled = d_instance->cache_enabled();
313 if (!d_enabled) {
314 delete d_instance;
315 d_instance = 0;
316
317 BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
318 }
319 else {
320 AT_EXIT(delete_instance);
321
322 BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
323 }
324 }
325
326 BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance(dir,prefix,size) - d_instance: " << d_instance << endl);
327
328 return d_instance;
329}
330
339{
340 if (d_enabled && d_instance == 0) {
341 d_instance = new GlobalMetadataStore(get_cache_dir_from_config(), get_cache_prefix_from_config(),
342 get_cache_size_from_config());
343 d_enabled = d_instance->cache_enabled();
344 if (!d_enabled) {
345 delete d_instance;
346 d_instance = nullptr;
347 BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
348 }
349 else {
350 AT_EXIT(delete_instance);
351
352 BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
353 }
354 }
355
356 BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance() - d_instance: " << (void *) d_instance << endl);
357
358 return d_instance;
359}
361
365void
367{
368 bool found;
369
370 TheBESKeys::TheKeys()->get_value(LEDGER_KEY, d_ledger_name, found);
371 if (found) {
372 BESDEBUG(DEBUG_KEY, "Located BES key " << LEDGER_KEY << "=" << d_ledger_name << endl);
373 }
374 else {
375 d_ledger_name = default_ledger_name;
376 }
377
378 ofstream of(d_ledger_name.c_str(), ios::app);
379
380 // By default, use UTC in the logs.
381 string local_time = "no";
382 TheBESKeys::TheKeys()->get_value(LOCAL_TIME_KEY, local_time, found);
383 d_use_local_time = (local_time == "YES" || local_time == "Yes" || local_time == "yes");
384}
385
402 : BESFileLockingCache(get_cache_dir_from_config(), get_cache_prefix_from_config(), get_cache_size_from_config())
403{
404 initialize();
405}
406
407GlobalMetadataStore::GlobalMetadataStore(const string &cache_dir, const string &prefix,
408 unsigned long long size) : BESFileLockingCache(cache_dir, prefix, size)
409{
410 initialize();
411}
413
418static void dump_time(ostream &os, bool use_local_time)
419{
420 time_t now;
421 time(&now);
422 char buf[sizeof "YYYY-MM-DDTHH:MM:SSzone"];
423 int status = 0;
424
425 // From StackOverflow:
426 // This will work too, if your compiler doesn't support %F or %T:
427 // strftime(buf, sizeof buf, "%Y-%m-%dT%H:%M:%S%Z", gmtime(&now));
428 //
429 // Apologies for the twisted logic - UTC is the default. Override to
430 // local time using BES.LogTimeLocal=yes in bes.conf. jhrg 11/15/17
431 struct tm result;
432 if (!use_local_time) {
433 gmtime_r(&now, &result);
434 status = strftime(buf, sizeof buf, "%FT%T%Z", &result);
435 }
436 else {
437 localtime_r(&now, &result);
438 status = strftime(buf, sizeof buf, "%FT%T%Z", &result);
439 }
440 if (!status)
441 ERROR_LOG(prolog << "Error getting time for Metadata Store ledger.");
442
443 os << buf;
444}
445
449void
451{
452 // open just once, <- done SBL 11.7.19
453
454 int fd; // value-result parameter;
455 if (get_exclusive_lock(d_ledger_name, fd)) {
456 BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Ledger " << d_ledger_name << " write locked." << endl);
457 if (of) {
458 try {
459 dump_time(of, d_use_local_time);
460 of << " " << d_ledger_entry << endl;
461 VERBOSE("MDS Ledger name: '" << d_ledger_name << "', entry: '" << d_ledger_entry + "'.");
462 unlock_and_close(d_ledger_name); // closes fd
463 }
464 catch (...) {
465 unlock_and_close(d_ledger_name);
466 throw;
467 }
468 }
469 else {
470 ERROR_LOG(prolog << "Warning: Metadata store could not write to its ledger file.");
471 unlock_and_close(d_ledger_name);
472 }
473 }
474 else {
475 throw BESInternalError("Could not write lock '" + d_ledger_name, __FILE__, __LINE__);
476 }
477}
478
485string
487{
488 if (name.empty())
489 throw BESInternalError("Empty name passed to the Metadata Store.", __FILE__, __LINE__);
490
491 return picosha2::hash256_hex_string(name[0] == '/' ? name : "/" + name);
492}
493
511{
512 if (d_dds) {
513 D4BaseTypeFactory factory;
514 DMR dmr(&factory, *d_dds);
515 XMLWriter xml;
516 dmr.print_dap4(xml);
517 os << xml.get_doc();
518 }
519 else if (d_dmr) {
520 XMLWriter xml;
521 d_dmr->print_dap4(xml);
522 os << xml.get_doc();
523 }
524 else {
525 throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
526 }
527}
528
531 if (d_dds)
532 d_dds->print(os);
533 else if (d_dmr)
534 d_dmr->getDDS()->print(os);
535 else
536 throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
537}
538
541 if (d_dds)
542 d_dds->print_das(os);
543 else if (d_dmr)
544 d_dmr->getDDS()->print_das(os);
545 else
546 throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
547}
548
// Store one response in the metadata store under the given key.
//
// writer        - functor that writes the response to the stream it is given
// key           - passed to get_cache_file_name() (unmangled) to name the item
// name          - granule name; used only in log messages here
// response_name - label such as "DDS", "DAS" or "DMR"; used only in log messages
//
// Returns true when the item was created and written (the key is then also
// appended to d_ledger_entry). Returns false, after logging an error, when the
// item could not be created but a read lock on it could be obtained, i.e. it
// is already in the store. Throws BESInternalError when the item can neither
// be created nor read-locked, or when the output stream cannot be opened.
// Anything thrown while writing is rethrown after the file has been purged
// and unlocked.
563bool
564GlobalMetadataStore::store_dap_response(StreamDAP &writer, const string &key, const string &name,
565 const string &response_name)
566{
567 BESDEBUG(DEBUG_KEY, __FUNCTION__ << " BEGIN " << key << endl);
568
569 string item_name = get_cache_file_name(key, false /*mangle*/);
570
571 int fd;
572 if (create_and_lock(item_name, fd)) {
573 BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Storing " << item_name << endl);
574
575 // Get an output stream directed at the locked cache file
576 ofstream response(item_name.c_str(), ios::out|ios::app);
// NOTE(review): this throw happens before the try block, so the lock taken by
// create_and_lock() is not released on this path - confirm whether intended.
577 if (!response.is_open())
578 throw BESInternalError("Could not open '" + key + "' to write the response.", __FILE__, __LINE__);
579
580 try {
581 // for the different writers, look at the StreamDAP struct in the class
582 // definition. jhrg 2.27.18
583 writer(response); // different writers can write the DDS, DAS or DMR
// NOTE(review): 'response' is neither flushed nor closed on the success path;
// its destructor runs only when this if-block ends, after update_cache_info()
// and unlock_and_close() below. Confirm that buffered data cannot reach the
// file after the lock is released.
584
585 // Compute/update/maintain the cache size? This is extra work
586 // that might never be used. It also locks the cache...
587 if (!is_unlimited() || MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED) {
588 // This enables the call to update_cache_info() below.
// NOTE(review): the listing jumps from line 588 to 590 here, so one source
// line is not visible - check the repository for the missing statement.
590
591 unsigned long long size = update_cache_info(item_name);
592 if (!is_unlimited() && cache_too_big(size)) update_and_purge(item_name);
593 }
594
595 unlock_and_close(item_name);
596 }
597 catch (...) {
598 // Bummer. There was a problem doing The Stuff. Now we gotta clean up.
599 response.close();
600 this->purge_file(item_name);
601 unlock_and_close(item_name);
602 throw;
603 }
604
605 VERBOSE("Metadata store: Wrote " << response_name << " response for '" << name << "'." << endl);
606 d_ledger_entry.append(" ").append(key);
607
608 return true;
609 }
610 else if (get_read_lock(item_name, fd)) {
611 // We found the key; didn't add this because it was already here
612 BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Found " << item_name << " in the store already." << endl);
613 unlock_and_close(item_name);
614
615 ERROR_LOG(prolog << "Metadata store: unable to store the " << response_name << " response for '" << name << "'." << endl);
616
617 return false;
618 }
619 else {
620 throw BESInternalError("Could neither create or open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
621 }
622}
623
638
650bool
651GlobalMetadataStore::add_responses(DDS *dds, const string &name)
652{
653 // Start the index entry
654 d_ledger_entry = string("add DDS ").append(name);
655
656 // I'm appending the 'dds r' string to the name before hashing so that
657 // the different hashes for the file's DDS, DAS, ..., are all very different.
658 // This will be useful if we use S3 instead of EFS for the Metadata Store.
659 //
660 // The helper() also updates the ledger string.
661 StreamDDS write_the_dds_response(dds);
662 bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
663
664 StreamDAS write_the_das_response(dds);
665 bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
666
667#if SYMETRIC_ADD_RESPONSES
668 StreamDMR write_the_dmr_response(dds);
669 bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
670#endif
671
672 write_ledger(); // write the index line
673
674#if SYMETRIC_ADD_RESPONSES
675 return (stored_dds && stored_das && stored_dmr);
676#else
677 return (stored_dds && stored_das);
678#endif
679}
680
692bool
693GlobalMetadataStore::add_responses(DMR *dmr, const string &name)
694{
695 // Start the index entry
696 d_ledger_entry = string("add DMR ").append(name);
697
698 // I'm appending the 'dds r' string to the name before hashing so that
699 // the different hashes for the file's DDS, DAS, ..., are all very different.
700 // This will be useful if we use S3 instead of EFS for the Metadata Store.
701 //
702 // The helper() also updates the ledger string.
703#if SYMETRIC_ADD_RESPONSES
704 StreamDDS write_the_dds_response(dmr);
705 bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
706
707 StreamDAS write_the_das_response(dmr);
708 bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
709#endif
710
711 StreamDMR write_the_dmr_response(dmr);
712 bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
713
714 write_ledger(); // write the index line
715
716#if SYMETRIC_ADD_RESPONSES
717 return (stored_dds && stored_das && stored_dmr);
718#else
719 return(stored_dmr /* && stored_dmrpp */);
720#endif
721}
723
737GlobalMetadataStore::get_read_lock_helper(const string &name, const string &suffix, const string &object_name)
738{
739 BESDEBUG(DEBUG_KEY, __func__ << "() MDS hashing name '" << name << "', '" << suffix << "'"<< endl);
740
741 if (name.empty())
742 throw BESInternalError("An empty name string was received by "
743 "GlobalMetadataStore::get_read_lock_helper(). That should never happen.", __FILE__, __LINE__);
744
745 string item_name = get_cache_file_name(get_hash(name + suffix), false);
746 int fd;
747 MDSReadLock lock(item_name, get_read_lock(item_name, fd), this);
748 BESDEBUG(DEBUG_KEY, __func__ << "() MDS lock for " << item_name << ": " << lock() << endl);
749
750 if (lock())
751 INFO_LOG(prolog << "MDS Cache hit for '" << name << "' and response " << object_name << endl);
752 else
753 INFO_LOG(prolog << "MDS Cache miss for '" << name << "' and response " << object_name << endl);
754
755 return lock;
756 }
757
778{
779 return get_read_lock_helper(name,"dmr_r","DMR");
780}//end is_dmr_available(string)
781
784{
785 //call get_read_lock_helper
786 MDSReadLock lock = get_read_lock_helper(container.get_relative_name(), "dmr_r", "DMR");
787 if (lock()){
788
789 bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dmr_r");
790
791 if(reload){
792 lock.clearLock();
793 return lock;
794 }//end if
795 else{
796 return lock;
797 }//end else
798
799 }//end if(is locked)
800 else{
801 return lock;
802 }//end else
803
804}//end is_dmr_available(BESContainer)
805
807GlobalMetadataStore::is_dmr_available(const std::string &realName, const std::string &relativeName, const std::string &fileType)
808{
809 //call get_read_lock_helper
810 MDSReadLock lock = get_read_lock_helper(relativeName,"dmr_r","DMR");
811 if (lock()){
812
813 bool reload = is_available_helper(realName, relativeName, fileType, "dmr_r");
814
815 if(reload){
816 lock.clearLock();
817 return lock;
818 }//end if
819 else{
820 return lock;
821 }//end else
822
823 }//end if(is locked)
824 else{
825 return lock;
826 }//end else
827
828}//end is_dmr_available(string, string, string)
829
839{
840 return get_read_lock_helper(name,"dds_r","DDS");
841}//end is_dds_available(string)
842
855{
856 //call get_read_lock_helper
857 MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"dds_r","DDS");
858 if (lock()){
859
860 bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dds_r");
861
862 if(reload){
863 lock.clearLock();
864 return lock;
865 }//end if
866 else{
867 return lock;
868 }//end else
869
870 }//end if(is locked)
871 else{
872 return lock;
873 }//end else
874
875}//end is_dds_available(BESContainer)
876
886{
887 return get_read_lock_helper(name,"das_r","DAS");
888}//end is_das_available(string)
889
892{
893 //return get_read_lock_helper(name, "das_r", "DAS");
894 //call get_read_lock_helper
895 MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"das_r","DAS");
896 if (lock()){
897
898 bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "das_r");
899
900 if(reload){
901 lock.clearLock();
902 return lock;
903 }//end if
904 else{
905 return lock;
906 }//end else
907
908 }//end if(is locked)
909 else{
910 return lock;
911 }//end else
912
913}//end is_das_available(BESContainer)
914
935{
936 return get_read_lock_helper(name,"dmrpp_r","DMR++");
937}//end is_dmrpp_available(string)
938
941{
942 //return get_read_lock_helper(name, "dmrpp_r", "DMR++");
943 //call get_read_lock_helper
944 MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"dmrpp_r","DMR++");
945 if (lock()){
946
947 bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dmrpp_r");
948
949 if(reload){
950 lock.clearLock();
951 return lock;
952 }//end if
953 else{
954 return lock;
955 }//end else
956
957 }//end if(is locked)
958 else{
959 return lock;
960 }//end else
961
962}//end is_dmrpp_available(BESContainer)
963
974bool
975GlobalMetadataStore::is_available_helper(const string &realName, const string &relativeName, const string &fileType, const string &suffix)
976{
977 //use type with find_handler() to get handler
978 BESRequestHandler *besRH = BESRequestHandlerList::TheList()->find_handler(fileType);
979
980 //use handler.get_lmt()
981 time_t file_time = besRH->get_lmt(realName);
982
983 //get the cache time of the handler
984 time_t cache_time = get_cache_lmt(relativeName, suffix);
985
986 //compare file lmt and time of creation of cache
987 if (file_time > cache_time){
988 return true;
989 }//end if(file > cache)
990 else {
991 return false;
992 }//end else
993}
994
1002time_t
1003GlobalMetadataStore::get_cache_lmt(const string &fileName, const string &suffix)
1004{
1005 string item_name = get_cache_file_name(get_hash(fileName + suffix), false);
1006 struct stat statbuf;
1007
1008 if (stat(item_name.c_str(), &statbuf) == -1){
1009 throw BESNotFoundError(strerror(errno), __FILE__, __LINE__);
1010 }//end if(error)
1011
1012 return statbuf.st_mtime;
1013}//end get_cache_lmt()
1014
1017
1025void
1026GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &object_name)
1027{
1028 string item_name = get_cache_file_name(get_hash(name + suffix), false);
1029 int fd; // value-result parameter;
1030 if (get_read_lock(item_name, fd)) {
1031 VERBOSE("Metadata store: Cache hit: read " << object_name << " response for '" << name << "'." << endl);
1032 BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1033 try {
1034 transfer_bytes(fd, os);
1035 unlock_and_close(item_name); // closes fd
1036 }
1037 catch (...) {
1038 unlock_and_close(item_name);
1039 throw;
1040 }
1041 }
1042 else {
1043 throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1044 }
1045}
1046
1055void
1056GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &xml_base,
1057 const string &object_name)
1058{
1059 string item_name = get_cache_file_name(get_hash(name + suffix), false);
1060 int fd; // value-result parameter;
1061 if (get_read_lock(item_name, fd)) {
1062 VERBOSE("Metadata store: Cache hit: read " << object_name << " response for '" << name << "'." << endl);
1063 BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1064 try {
1065 insert_xml_base(fd, os, xml_base);
1066
1067 transfer_bytes(fd, os);
1068 unlock_and_close(item_name); // closes fd
1069 }
1070 catch (...) {
1071 unlock_and_close(item_name);
1072 throw;
1073 }
1074 }
1075 else {
1076 throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1077 }
1078}
1080
1087void
1088GlobalMetadataStore::write_dds_response(const std::string &name, ostream &os)
1089{
1090 write_response_helper(name, os, "dds_r", "DDS");
1091}
1092
1099void
1100GlobalMetadataStore::write_das_response(const std::string &name, ostream &os)
1101{
1102 write_response_helper(name, os, "das_r", "DAS");
1103}
1104
1111void
1112GlobalMetadataStore::write_dmr_response(const std::string &name, ostream &os)
1113{
1114 bool found = false;
1115 string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
1116 if (!found) {
1117#if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1118 write_response_helper(name, os, "dmr_r", "DMR");
1119#else
1120 throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
1121#endif
1122 }
1123 else {
1124 write_response_helper(name, os, "dmr_r", xml_base, "DMR");
1125 }
1126}
1127
1134void
1135GlobalMetadataStore::write_dmrpp_response(const std::string &name, ostream &os)
1136{
1137 bool found = false;
1138 string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
1139 if (!found) {
1140#if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1141 write_response_helper(name, os, "dmrpp_r", "DMR++");
1142#else
1143 throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
1144#endif
1145 }
1146 else {
1147 write_response_helper(name, os, "dmrpp_r", xml_base, "DMR++");
1148 }
1149}
1150
1158bool
1159GlobalMetadataStore::remove_response_helper(const string& name, const string &suffix, const string &object_name)
1160{
1161 string hash = get_hash(name + suffix);
1162 if (unlink(get_cache_file_name(hash, false).c_str()) == 0) {
1163 VERBOSE("Metadata store: Removed " << object_name << " response for '" << hash << "'." << endl);
1164 d_ledger_entry.append(" ").append(hash);
1165 return true;
1166 }
1167 else {
1168 ERROR_LOG(prolog << "Metadata store: unable to remove the " << object_name << " response for '" << name << "' (" << strerror(errno) << ")."<< endl);
1169 }
1170
1171 return false;
1172}
1173
1180bool
1182{
1183 // Start the index entry
1184 d_ledger_entry = string("remove ").append(name);
1185
1186 bool removed_dds = remove_response_helper(name, "dds_r", "DDS");
1187
1188 bool removed_das = remove_response_helper(name, "das_r", "DAS");
1189
1190 bool removed_dmr = remove_response_helper(name, "dmr_r", "DMR");
1191
1192 bool removed_dmrpp = remove_response_helper(name, "dmrpp_r", "DMR++");
1193
1194 write_ledger(); // write the index line
1195
1196#if SYMETRIC_ADD_RESPONSES
1197 return (removed_dds && removed_das && removed_dmr);
1198#else
1199 return (removed_dds || removed_das || removed_dmr || removed_dmrpp);
1200#endif
1201}
1202
1215DMR *
1217{
1218 stringstream oss;
1219 write_dmr_response(name, oss); // throws BESInternalError if not found
1220
1221 D4BaseTypeFactory d4_btf;
1222 unique_ptr<DMR> dmr(new DMR(&d4_btf, "mds"));
1223
1224 D4ParserSax2 parser;
1225 parser.intern(oss.str(), dmr.get());
1226
1227 dmr->set_factory(0);
1228
1229 return dmr.release();
1230}
1231
1253DDS *
1255{
1256 TempFile dds_tmp;
1257 string dds_tmp_name = dds_tmp.create(get_cache_directory(), "mds_dds");
1258
1259 fstream dds_fs(dds_tmp_name.c_str(), std::fstream::out);
1260 try {
1261 write_dds_response(name, dds_fs); // throws BESInternalError if not found
1262 dds_fs.close();
1263 }
1264 catch (...) {
1265 dds_fs.close();
1266 throw;
1267 }
1268
1269 BaseTypeFactory btf;
1270 unique_ptr<DDS> dds(new DDS(&btf));
1271 dds->parse(dds_tmp_name);
1272
1273 TempFile das_tmp;
1274 string das_tmp_name = das_tmp.create(get_cache_directory(), "mds_das");
1275 fstream das_fs(das_tmp_name.c_str(), std::fstream::out);
1276 try {
1277 write_das_response(name, das_fs); // throws BESInternalError if not found
1278 das_fs.close();
1279 }
1280 catch (...) {
1281 das_fs.close();
1282 throw;
1283 }
1284
1285 unique_ptr<DAS> das(new DAS());
1286 das->parse(das_tmp_name);
1287
1288 dds->transfer_attributes(das.get());
1289 dds->set_factory(nullptr);
1290
1291 return dds.release();
1292}
1293
1294
1295void
1296GlobalMetadataStore::parse_das_from_mds(libdap::DAS* das, const std::string &name) {
1297 string suffix = "das_r";
1298 string item_name = get_cache_file_name(get_hash(name + suffix), false);
1299 int fd; // value-result parameter;
1300 if (get_read_lock(item_name, fd)) {
1301 VERBOSE("Metadata store: Cache hit: read " << " response for '" << name << "'." << endl);
1302 BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1303 try {
1304 // Just generate the DAS by parsing from the file
1305 das->parse(item_name);
1306 unlock_and_close(item_name); // closes fd
1307 }
1308 catch (...) {
1309 unlock_and_close(item_name);
1310 throw;
1311 }
1312 }
1313 else {
1314 throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1315 }
1316
1317}
1318
A container is something that holds data. E.G., a netcdf file or a database entry.
Definition: BESContainer.h:65
std::string get_relative_name() const
Get the relative name of the object in this container.
Definition: BESContainer.h:186
std::string get_container_type() const
retrieve the type of data this container holds, such as cedar or netcdf.
Definition: BESContainer.h:232
std::string get_real_name() const
retrieve the real name for this container, such as a file name.
Definition: BESContainer.h:180
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
Implementation of a caching mechanism for compressed data.
virtual void unlock_and_close(const std::string &target)
const std::string get_cache_directory()
bool is_unlimited() const
Is this cache allowed to store as much as it wants?
virtual unsigned long long update_cache_info(const std::string &target)
Update the cache info file to include 'target'.
virtual bool create_and_lock(const std::string &target, int &fd)
Create a file in the cache and lock it for write access.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
virtual bool get_read_lock(const std::string &target, int &fd)
Get a read-only lock on the file if it exists.
virtual bool get_exclusive_lock(const std::string &target, int &fd)
virtual void purge_file(const std::string &file)
Purge a single file from the cache.
virtual bool cache_too_big(unsigned long long current_size) const
look at the cache size; is it too large? Look at the cache size and see if it is too big.
virtual void update_and_purge(const std::string &new_file)
Purge files from the cache.
virtual std::string get_cache_file_name(const std::string &src, bool mangle=true)
exception thrown if internal error encountered
exception thrown if an internal error is found and is fatal to the BES
error thrown if the resource requested cannot be found
virtual BESRequestHandler * find_handler(const std::string &handler_name)
find and return the specified request handler
Represents a specific data type request handler.
virtual time_t get_lmt(const std::string &name)
Get the Last modified time for.
static std::string lowercase(const std::string &s)
Definition: BESUtil.cc:254
void get_value(const std::string &s, std::string &val, bool &found)
Retrieve the value of a given key, if set.
Definition: TheBESKeys.cc:340
static TheBESKeys * TheKeys()
Definition: TheBESKeys.cc:71
Store the DAP metadata responses.
virtual libdap::DDS * get_dds_object(const std::string &name)
Build a DDS object from the cached Response.
virtual void write_dmr_response(const std::string &name, std::ostream &os)
Write the stored DMR response to a stream.
void write_response_helper(const std::string &name, std::ostream &os, const std::string &suffix, const std::string &object_name)
std::string get_hash(const std::string &name)
static void transfer_bytes(int fd, std::ostream &os)
virtual bool remove_responses(const std::string &name)
Remove all cached responses and objects for a granule.
virtual void write_dds_response(const std::string &name, std::ostream &os)
Write the stored DDS response to a stream.
void initialize()
Configure the ledger using LEDGER_KEY and LOCAL_TIME_KEY.
static GlobalMetadataStore * get_instance()
bool store_dap_response(StreamDAP &writer, const std::string &key, const std::string &name, const std::string &response_name)
static void insert_xml_base(int fd, std::ostream &os, const std::string &xml_base)
like transfer_bytes(), but adds the xml:base attribute to the DMR/++
virtual void write_das_response(const std::string &name, std::ostream &os)
Write the stored DAS response to a stream.
bool remove_response_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
virtual MDSReadLock is_dmrpp_available(const std::string &name)
Is the DMR++ response for.
virtual MDSReadLock is_dds_available(const std::string &name)
Is the DDS response for.
virtual libdap::DMR * get_dmr_object(const std::string &name)
Build a DMR object from the cached Response.
MDSReadLock get_read_lock_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
virtual time_t get_cache_lmt(const std::string &fileName, const std::string &suffix)
Get the last modified time for the cached object file.
virtual MDSReadLock is_dmr_available(const std::string &name)
Is the DMR response for.
virtual MDSReadLock is_das_available(const std::string &name)
Is the DAS response for.
virtual bool is_available_helper(const std::string &realName, const std::string &relativeName, const std::string &fileType, const std::string &suffix)
helper function that checks if last modified time is greater than cached file
virtual bool add_responses(libdap::DDS *dds, const std::string &name)
Add the DAP2 metadata responses using a DDS.
virtual void write_dmrpp_response(const std::string &name, std::ostream &os)
Write the stored DMR++ response to a stream.
Get a new temporary file.
Definition: TempFile.h:46
std::string create(const std::string &dir_name="/tmp/hyrax_tmp", const std::string &path_template="opendap")
Create a new temporary file.
Definition: TempFile.cc:170
Unlock and close the MDS item when the ReadLock goes out of scope.
Instantiate with a DDS or DMR and use to write the DAS response.
virtual void operator()(std::ostream &os)
Instantiate with a DDS or DMR and use to write the DDS response.
virtual void operator()(std::ostream &os)
Instantiate with a DDS or DMR and use to write the DMR response.
virtual void operator()(std::ostream &os)
Use an object (DDS or DMR) to write data to the MDS.