34#include <BESInternalError.h>
35#include <BESSyntaxUserError.h>
36#include <BESForbiddenError.h>
37#include <BESContextManager.h>
40#define PUGIXML_NO_XPATH
41#define PUGIXML_HEADER_ONLY
46#include "CurlHandlePool.h"
47#include "EffectiveUrlCache.h"
48#include "DmrppRequestHandler.h"
49#include "DmrppNames.h"
54#define prolog std::string("Chunk::").append(__func__).append("() - ")
56#define FLETCHER32_CHECKSUM 4
57#define ACTUALLY_USE_FLETCHER32_CHECKSUM 1
/**
 * @brief Header callback that records a response's Content-Type on a Chunk.
 *
 * Each call receives one header line. When the line contains "Content-Type",
 * the text after the last space is stored on the Chunk via
 * set_response_content_type().
 *
 * NOTE: this listing has gaps (the embedded source numbering jumps), so the
 * end of the function, including its return statement, is not visible here.
 *
 * @param buffer Header bytes; the range [buffer, buffer + nitems - 2) is copied.
 * @param nitems Length used to delimit buffer. The unnamed second parameter is unused.
 * @param data Opaque pointer that is cast to a Chunk *.
 */
73size_t chunk_header_callback(
char *buffer,
size_t ,
size_t nitems,
void *data) {
// Copy the header minus its last two bytes (presumably the trailing CR LF - confirm).
// NOTE(review): when nitems < 2 the end pointer precedes 'buffer'; confirm this cannot happen.
79 string header(buffer, buffer + nitems - 2);
82 if (header.find(
"Content-Type") != string::npos) {
// 'data' is assumed to be the Chunk registered with the transfer - TODO confirm at the call site.
84 auto c_ptr =
reinterpret_cast<Chunk *
>(data);
// Everything after the last space in the header line is taken as the content type value.
85 c_ptr->set_response_content_type(header.substr(header.find_last_of(
' ') + 1));
/**
 * @brief Interpret an XML error document returned for a data URL and log it.
 *
 * The message is converted with xml2json() and parsed into 'd', and also
 * parsed with pugixml. The root element must be named "Error"; its "Code"
 * and "Message" children are read. A code of "AccessDenied" produces an
 * "ACCESS DENIED" log message; the other visible branch produces a generic
 * error log message.
 *
 * NOTE: lines are missing from this listing (the embedded numbering jumps).
 * The declarations of 'd' and 'msg', the guard for the first throw, and
 * whatever follows each logging branch (presumably a throw - confirm) are
 * not visible here.
 *
 * @param data_url URL that was being accessed; used only in log text here.
 * @param xml_message The XML error body to interpret.
 * @throws BESInternalError when the XML cannot be parsed or its root element
 *         is not "Error".
 */
96void process_s3_error_response(
const shared_ptr<http::url> &data_url,
const string &xml_message)
99 string json_message = xml2json(xml_message.c_str());
101 d.Parse(json_message.c_str());
107 pugi::xml_document error;
108 pugi::xml_parse_result result = error.load_string(xml_message.c_str());
// The condition guarding this throw is on a missing line; presumably it tests 'result'.
110 throw BESInternalError(
"The underlying data store returned an unintelligible error message.", __FILE__, __LINE__);
112 pugi::xml_node err_elmnt = error.document_element();
// Reject documents with no root element or a root not named "Error".
113 if (!err_elmnt || (strcmp(err_elmnt.name(),
"Error") != 0))
114 throw BESInternalError(
"The underlying data store returned a bogus error message.", __FILE__, __LINE__);
116 string code = err_elmnt.child_value(
"Code");
117 string message = err_elmnt.child_value(
"Message");
123 if (code ==
"AccessDenied") {
125 msg << prolog <<
"ACCESS DENIED - The underlying object store has refused access to: ";
126 msg << data_url->str() <<
" Object Store Message: " << message;
127 BESDEBUG(MODULE, msg.str() << endl);
128 VERBOSE(msg.str() << endl);
// Remaining visible lines: generic message for an error code other than "AccessDenied".
133 msg << prolog <<
"ERROR - The underlying object store returned an error. ";
134 msg <<
"(Tried: " << data_url->str() <<
") Object Store Message: " << message;
135 BESDEBUG(MODULE, msg.str() << endl);
136 VERBOSE(msg.str() << endl);
/**
 * @brief Write callback that appends received bytes to a Chunk's read buffer.
 *
 * If the Chunk's recorded response content type contains "application/xml",
 * the payload is treated as an error document and handed to
 * process_s3_error_response(). Otherwise size * nmemb bytes are copied to the
 * Chunk's buffer at the current bytes-read offset and the count is advanced.
 *
 * NOTE: lines are missing from this listing (the embedded numbering jumps).
 * The try block opening, the declaration of 'msg', the statements that
 * follow each error message, and the return statement are not visible here.
 *
 * @param buffer Incoming data.
 * @param size Multiplied with nmemb to get the byte count.
 * @param nmemb Multiplied with size to get the byte count.
 * @param data Opaque pointer that is cast to a Chunk *.
 */
154size_t chunk_write_data(
void *buffer,
size_t size,
size_t nmemb,
void *data) {
155 BESDEBUG(MODULE, prolog <<
"BEGIN " << endl);
156 size_t nbytes = size * nmemb;
157 auto chunk =
reinterpret_cast<Chunk *
>(data);
160 auto data_url = chunk->get_data_url();
161 BESDEBUG(MODULE, prolog <<
"chunk->get_data_url():" << data_url << endl);
164 BESDEBUG(MODULE, prolog <<
"chunk->get_response_content_type():" << chunk->get_response_content_type() << endl);
// An XML content type is handled as an error document rather than chunk data.
165 if (chunk->get_response_content_type().find(
"application/xml") != string::npos) {
// NOTE(review): constructing a string from a bare char pointer reads up to the first NUL,
// and 'nbytes' is not used to bound it. libcurl does not NUL-terminate write-callback data;
// confirm whether string(ptr, nbytes) was intended.
168 string xml_message =
reinterpret_cast<const char *
>(buffer);
// Trim trailing characters drawn from the set below.
// NOTE(review): the set contains the digit '0' (and a space); possibly "\0" was intended - confirm.
169 xml_message.erase(xml_message.find_last_not_of(
"\t\n\v\f\r 0") + 1);
174 process_s3_error_response(data_url, xml_message);
181 catch (std::exception &e) {
183 msg << prolog <<
"Caught std::exception when accessing object store data.";
184 msg <<
" (Tried: " << data_url->str() <<
")" <<
" Message: " << e.what();
185 BESDEBUG(MODULE, msg.str() << endl);
194 unsigned long long bytes_read = chunk->get_bytes_read();
// Refuse to write past the end of the Chunk's read buffer.
197 if (bytes_read + nbytes > chunk->get_rbuf_size()) {
199 msg << prolog <<
"ERROR! The number of bytes_read: " << bytes_read <<
" plus the number of bytes to read: "
200 << nbytes <<
" is larger than the target buffer size: " << chunk->get_rbuf_size();
201 BESDEBUG(MODULE, msg.str() << endl);
// All pooled curl handles are released on overflow; what follows is on missing lines.
202 DmrppRequestHandler::curl_handle_pool->release_all_handles();
// Append the new bytes after those already received, then advance the count.
206 memcpy(chunk->get_rbuf() + bytes_read, buffer, nbytes);
207 chunk->set_bytes_read(bytes_read + nbytes);
209 BESDEBUG(MODULE, prolog <<
"END" << endl);
/**
 * @brief Decompress zlib-deflated bytes from src into dest.
 *
 * A z_stream is pointed at src and dest, initialised with inflateInit(), and
 * driven with the zlib inflate() call using Z_SYNC_FLUSH until it reports
 * Z_STREAM_END. The state is released with inflateEnd().
 *
 * NOTE: lines are missing from this listing (the embedded numbering jumps).
 * The declarations of 'z_strm' and 'status' and the loop header are not
 * visible here.
 *
 * @param dest Output buffer.
 * @param dest_len Size of dest; asserted to be greater than zero.
 * @param src Compressed input.
 * @param src_len Size of src.
 * @throws BESError (BES_INTERNAL_ERROR) when zlib initialisation or inflation
 *         fails, or when dest fills before the stream ends.
 */
224void inflate(
char *dest,
unsigned long long dest_len,
char *src,
unsigned long long src_len) {
228 assert(dest_len > 0);
235 memset(&z_strm, 0,
sizeof(z_strm));
// NOTE(review): the lengths are unsigned long long; zlib's avail_in/avail_out are
// narrower (uInt), so very large sizes would be truncated here - confirm expected sizes.
236 z_strm.next_in = (Bytef *) src;
237 z_strm.avail_in = src_len;
238 z_strm.next_out = (Bytef *) dest;
239 z_strm.avail_out = dest_len;
242 if (Z_OK != inflateInit(&z_strm))
243 throw BESError(
"Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
249 status = inflate(&z_strm, Z_SYNC_FLUSH);
// Z_STREAM_END means all input was consumed and all output produced.
252 if (Z_STREAM_END == status)
break;
255 if (Z_OK != status) {
256 stringstream err_msg;
257 err_msg <<
"Failed to inflate data chunk.";
258 char *err_msg_cstr = z_strm.msg;
260 err_msg <<
" zlib message: " << err_msg_cstr;
// Release the zlib state before throwing.
261 (void) inflateEnd(&z_strm);
262 throw BESError(err_msg.str(), BES_INTERNAL_ERROR, __FILE__, __LINE__);
// The output buffer is full but the stream has not ended.
// NOTE(review): unlike the error path above, no inflateEnd() precedes this throw in the
// visible lines, so the zlib state may be leaked - confirm.
269 if (0 == z_strm.avail_out) {
270 throw BESError(
"Data buffer is not big enough for uncompressed data.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// NOTE(review): the next lines use names not declared in the visible code (new_outbuf,
// outbuf, nalloc, H5MM_realloc, HGOTO_ERROR); presumably a disabled or reference
// fragment whose preprocessor guards are on missing lines - confirm.
277 if (NULL == (new_outbuf = H5MM_realloc(outbuf, nalloc))) {
278 (void) inflateEnd(&z_strm);
279 HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, 0,
"memory allocation failed for inflate decompression")
284 z_strm.next_out = (
unsigned char*) outbuf + z_strm.total_out;
285 z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
292 (void) inflateEnd(&z_strm);
/**
 * @brief Reverse a byte shuffle of src into dest.
 *
 * With elems = src_size / width: when width or elems is not greater than 1,
 * src is copied to dest unchanged. Otherwise, for each i below width,
 * consecutive source bytes are written to dest at a stride of width (see
 * DUFF_GUTS). Any src_size % width leftover bytes are then copied verbatim.
 *
 * NOTE: lines are missing from this listing (the embedded numbering jumps).
 * The declaration of '_dest', the switch/case body of the unrolled copy and
 * several braces are not visible here.
 *
 * @param dest Output buffer; presumably at least src_size bytes - confirm.
 * @param src Shuffled input.
 * @param src_size Number of bytes in src.
 * @param width Presumably bytes per element - confirm.
 */
320void unshuffle(
char *dest,
const char *src,
unsigned long long src_size,
unsigned long long width) {
// NOTE(review): this division runs before any check on width; width == 0 would divide by zero.
321 unsigned long long elems = src_size / width;
// Nothing to reorder unless both width and elems exceed 1; copy straight through.
324 if (!(width > 1 && elems > 1)) {
325 memcpy(dest,
const_cast<char *
>(src), src_size);
329 char *_src =
const_cast<char *
>(src);
// NOTE(review): 'i' is unsigned int while 'width' is unsigned long long.
333 for (
unsigned int i = 0; i < width; i++) {
// Number of passes for a copy unrolled eight ways, rounded up. The names suggest a
// Duff's-device style loop; its switch is not visible here.
345 size_t duffs_index = (elems + 7) / 8;
348 assert(0 &&
"This Should never be executed!");
// One unrolled step: store the next source byte, then advance dest by a whole element.
353#define DUFF_GUTS *_dest = *_src++; _dest += width;
370 }
while (--duffs_index > 0);
// Bytes beyond the last whole element are copied without reordering.
378 size_t leftover = src_size % width;
383 _dest -= (width - 1);
384 memcpy((
void *) _dest, (
void *) _src, leftover);
/**
 * @brief Parse a comma-separated list of unsigned integers and append them to a vector.
 *
 * Every token between commas (and the final token after the last comma) is
 * converted with std::stoull and appended to res in order. Values parsed
 * before a bad token remain in res when an exception is thrown.
 *
 * Each token must be consumed entirely by the conversion. std::stoull on its
 * own stops at the first non-numeric character and would silently accept
 * input such as "1]2" as 1.
 *
 * @param s The text to split, e.g. "0,128,256".
 * @param res Receives the parsed values; existing content is kept.
 * @throws std::invalid_argument when a token is empty, does not start with a
 *         number, or has characters left over after the number.
 * @throws std::out_of_range when a token does not fit in unsigned long long.
 */
static void split_by_comma(const std::string &s, std::vector<unsigned long long> &res)
{
    const char delimiter = ',';
    std::size_t pos_start = 0;

    while (true) {
        const std::size_t pos_end = s.find(delimiter, pos_start);
        const bool last_token = (pos_end == std::string::npos);
        const std::string token = last_token ? s.substr(pos_start)
                                             : s.substr(pos_start, pos_end - pos_start);

        // std::stoull throws std::invalid_argument for empty or non-numeric tokens.
        std::size_t consumed = 0;
        const unsigned long long value = std::stoull(token, &consumed);

        // Reject a token that only starts with a number.
        if (consumed != token.size())
            throw std::invalid_argument("split_by_comma: unexpected characters in '" + token + "'");

        res.push_back(value);

        if (last_token)
            break;
        pos_start = pos_end + 1;
    }
}
/**
 * @brief Parse a chunk position string such as "[0,128]" into a vector.
 *
 * An empty string leaves cpia_vect untouched. Otherwise cpia_vect is cleared,
 * the string is validated, and the text between the first and last character
 * is split on commas into cpia_vect by split_by_comma().
 *
 * NOTE: lines are missing from this listing (the embedded numbering jumps).
 * The try block opening and the loop around the istringstream lines are not
 * visible here.
 *
 * @param pia The position string; must contain '[' and ']', be at least three
 *            characters long and use only the characters "[]1234567890,".
 * @param cpia_vect Receives the parsed position values.
 * @throws BESInternalError when the string is malformed, contains an illegal
 *         character, or a std::invalid_argument is raised while converting.
 */
409void Chunk::parse_chunk_position_in_array_string(
const string &pia, vector<unsigned long long> &cpia_vect)
411 if (pia.empty())
return;
413 if (!cpia_vect.empty()) cpia_vect.clear();
// Both brackets must appear somewhere; their positions are not checked here.
417 if (pia.find(
'[') == string::npos || pia.find(
']') == string::npos || pia.length() < 3)
418 throw BESInternalError(
"while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
// Only digits, commas and square brackets are allowed.
420 if (pia.find_first_not_of(
"[]1234567890,") != string::npos)
421 throw BESInternalError(
"while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
// NOTE(review): the istringstream lines look like an alternative parser to the
// split_by_comma() call below; whether they are compiled in depends on preprocessor
// lines missing from this listing - confirm.
425 istringstream iss(pia.substr(1, pia.length() - 2));
431 cpia_vect.push_back(i);
// Drop the first and last character (expected to be the brackets) and split the rest.
436 split_by_comma(pia.substr(1, pia.length() - 2), cpia_vect);
// NOTE(review): only std::invalid_argument is translated in the visible code.
438 catch(std::invalid_argument &e) {
439 throw BESInternalError(
string(
"while parsing a DMR++, chunk position string illegal character(s): ").append(e.what()), __FILE__, __LINE__);
460 if (pia.empty())
return;
462 if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
466 if (pia.find(
'[') == string::npos || pia.find(
']') == string::npos || pia.length() < 3)
467 throw BESInternalError(
"while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
469 if (pia.find_first_not_of(
"[]1234567890,") != string::npos)
470 throw BESInternalError(
"while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
473 istringstream iss(pia.substr(1, pia.length() - 2));
479 d_chunk_position_in_array.push_back(i);
483 parse_chunk_position_in_array_string(pia,d_chunk_position_in_array);
495 if (pia.empty())
return;
497 if (!d_chunk_position_in_array.empty()) d_chunk_position_in_array.clear();
499 d_chunk_position_in_array = pia;
510 return curl::get_range_arg_string(d_offset, d_size);
529 if(d_data_url ==
nullptr)
533 string cloudydap_context_value = BESContextManager::TheManager()->
get_context(S3_TRACKING_CONTEXT, found);
551 bool add_tracking =
false;
556 string s3_vh_regex_str = R
"(^https?:\/\/([a-z]|[0-9])(([a-z]|[0-9]|\.|-){1,61})([a-z]|[0-9])\.s3((\.|-)us-(east|west)-(1|2))?\.amazonaws\.com\/.*$)";
558 BESRegex s3_vh_regex(s3_vh_regex_str.c_str());
559 int match_result = s3_vh_regex.
match(d_data_url->str().c_str(), d_data_url->str().length());
560 if(match_result>=0) {
561 auto match_length = (
unsigned int) match_result;
562 if (match_length == d_data_url->str().length()) {
564 prolog <<
"FULL MATCH. pattern: " << s3_vh_regex_str <<
" url: " << d_data_url->str() << endl);
565 add_tracking =
true;;
571 string s3_path_regex_str = R
"(^https?:\/\/s3((\.|-)us-(east|west)-(1|2))?\.amazonaws\.com\/([a-z]|[0-9])(([a-z]|[0-9]|\.|-){1,61})([a-z]|[0-9])\/.*$)";
572 BESRegex s3_path_regex(s3_path_regex_str.c_str());
573 match_result = s3_path_regex.match(d_data_url->str().c_str(), d_data_url->str().length());
574 if(match_result>=0) {
575 auto match_length = (
unsigned int) match_result;
576 if (match_length == d_data_url->str().length()) {
578 prolog <<
"FULL MATCH. pattern: " << s3_vh_regex_str <<
" url: " << d_data_url->str() << endl);
579 add_tracking =
true;;
586 d_query_marker.append(S3_TRACKING_CONTEXT).append(
"=").append(cloudydap_context_value);
/**
 * @brief Compute a Fletcher-32 style checksum over a byte buffer.
 *
 * The buffer is read as _len / 2 sixteen-bit words, each built with the first
 * byte as the high byte. Two running sums are kept and folded back to 16 bits
 * with (sum & 0xffff) + (sum >> 16). The result has sum2 in the upper 16 bits
 * and sum1 in the lower 16 bits.
 *
 * NOTE: lines are missing from this listing (the embedded numbering jumps).
 * The return-type line, the loop headers, the sum2 accumulation and the
 * condition guarding the single-byte step are not visible here.
 *
 * @param _data Start of the data to checksum.
 * @param _len Number of bytes at _data.
 * @return (sum2 << 16) | sum1.
 */
597checksum_fletcher32(
const void *_data,
size_t _len)
599 const auto *data = (
const uint8_t *)_data;
// Number of whole 16-bit words.
600 size_t len = _len / 2;
601 uint32_t sum1 = 0, sum2 = 0;
// Work in batches of at most 360 words; presumably so the 32-bit sums cannot
// overflow before they are folded - confirm.
610 size_t tlen = len > 360 ? 360 : len;
// Add one word: data[0] is the high byte, data[1] the low byte.
613 sum1 += (uint32_t)(((uint16_t)data[0]) << 8) | ((uint16_t)data[1]);
// Fold the carries back into the low 16 bits.
617 sum1 = (sum1 & 0xffff) + (sum1 >> 16);
618 sum2 = (sum2 & 0xffff) + (sum2 >> 16);
// A single byte taken as the high byte of a word; presumably the trailing byte of an
// odd-length buffer (the guarding condition is on a missing line) - confirm.
623 sum1 += (uint32_t)(((uint16_t)*data) << 8);
625 sum1 = (sum1 & 0xffff) + (sum1 >> 16);
626 sum2 = (sum2 & 0xffff) + (sum2 >> 16);
// Final fold so both sums fit in 16 bits before they are combined.
630 sum1 = (sum1 & 0xffff) + (sum1 >> 16);
631 sum2 = (sum2 & 0xffff) + (sum2 >> 16);
633 return ((sum2 << 16) | sum1);
/**
 * @brief Undo the filters applied to this chunk's data, selected by flags.
 *
 * The visible lines scale chunk_size by elem_width, allocate a destination
 * buffer of that size and install it with set_rbuf(), verify a Fletcher32
 * checksum against the read buffer, shrink d_read_buffer_size by the checksum
 * size, and finally set d_is_inflated.
 *
 * NOTE: lines are missing from this listing (the embedded numbering jumps).
 * The conditions that test 'deflate', 'shuffle' and 'fletcher32', the calls
 * that do the decompression and unshuffling, and the #else/#endif lines are
 * not visible here.
 *
 * @param deflate Presumably selects zlib decompression - confirm.
 * @param shuffle Presumably selects byte unshuffling - confirm.
 * @param fletcher32 Presumably selects checksum verification - confirm.
 * @param chunk_size Chunk size in elements; multiplied by elem_width here.
 * @param elem_width Multiplier applied to chunk_size; presumably bytes per element.
 * @throws BESInternalError when the checksum does not match, or when the
 *         buffer is too small to hold the checksum.
 */
648void Chunk::inflate_chunk(
bool deflate,
bool shuffle,
bool fletcher32,
unsigned long long chunk_size,
649 unsigned long long elem_width) {
// From here on chunk_size is element count times elem_width.
664 chunk_size *= elem_width;
// Raw allocation; ownership presumably passes to this Chunk via set_rbuf() below - confirm.
667 char *dest =
new char[chunk_size];
671#if DMRPP_USE_SUPER_CHUNKS
674 set_rbuf(dest, chunk_size);
688#if DMRPP_USE_SUPER_CHUNKS
702#if ACTUALLY_USE_FLETCHER32_CHECKSUM
705#pragma GCC diagnostic push
706#pragma GCC diagnostic ignored "-Wcast-align"
710#pragma GCC diagnostic pop
// The checksum is computed over the buffer excluding its last FLETCHER32_CHECKSUM bytes.
715 if (f_checksum != checksum_fletcher32((
const void *)
get_rbuf(),
get_rbuf_size() - FLETCHER32_CHECKSUM)) {
716 throw BESInternalError(
"Data read from the DMR++ handler did not match the Fletcher32 checksum.",
// Drop the checksum bytes from the logical buffer size.
720 if (d_read_buffer_size > FLETCHER32_CHECKSUM)
721 d_read_buffer_size -= FLETCHER32_CHECKSUM;
723 throw BESInternalError(
"Data filtered with fletcher32 don't include the four-byte checksum.",
728 d_is_inflated =
true;
// NOTE(review): the remaining lines use names not declared in the visible code (os, i,
// chunk_buf_size, prototype()); presumably disabled debug output whose preprocessor
// guards are on missing lines - confirm.
733 dods_float32 *vals = (dods_float32 *)
get_rbuf();
735 (*os) << std::fixed << std::setfill(
'_') << std::setw(10) << std::setprecision(0);
736 (*os) <<
"DmrppArray::"<< __func__ <<
"() - Chunk[" << i <<
"]: " << endl;
737 for(
unsigned long long k=0; k< chunk_buf_size/prototype()->width(); k++) {
738 (*os) << vals[k] <<
", " << ((k==0)|((k+1)%10)?
"":
"\n");
/**
 * @brief Undo the filters named in 'filters' on this chunk's data.
 *
 * The filter list is walked in reverse order. The visible branches handle
 * the names "deflate", "shuffle" and "fletcher32". After the loop,
 * d_is_inflated is set.
 *
 * NOTE: lines are missing from this listing (the embedded numbering jumps).
 * The construction of 'filter_array', the declaration of 'filter', the calls
 * that do the decompression and unshuffling, and the #else/#endif lines are
 * not visible here.
 *
 * @param filters Names of the filters that were applied; presumably split
 *                into filter_array on a missing line - confirm the separator.
 * @param chunk_size Chunk size in elements; multiplied by elem_width here.
 * @param elem_width Multiplier applied to chunk_size; presumably bytes per element.
 * @throws BESInternalError when the checksum does not match, or when the
 *         buffer is too small to hold the checksum.
 */
755void Chunk::filter_chunk(
const string &filters,
unsigned long long chunk_size,
unsigned long long elem_width) {
// From here on chunk_size is element count times elem_width.
760 chunk_size *= elem_width;
// Reverse iteration: the filter applied last is undone first.
764 for (
auto i = filter_array.rbegin(), e = filter_array.rend(); i != e; ++i){
767 if (filter ==
"deflate"){
// Raw allocation; ownership presumably passes to this Chunk via set_rbuf() below - confirm.
768 char *dest =
new char[chunk_size];
772#if DMRPP_USE_SUPER_CHUNKS
775 set_rbuf(dest, chunk_size);
783 else if (filter ==
"shuffle"){
788#if DMRPP_USE_SUPER_CHUNKS
799 else if (filter ==
"fletcher32"){
801#if ACTUALLY_USE_FLETCHER32_CHECKSUM
804#pragma GCC diagnostic push
805#pragma GCC diagnostic ignored "-Wcast-align"
809#pragma GCC diagnostic pop
// The checksum is computed over the buffer excluding its last FLETCHER32_CHECKSUM bytes.
814 uint32_t calc_checksum = checksum_fletcher32((
const void *)
get_rbuf(),
get_rbuf_size() - FLETCHER32_CHECKSUM);
815 if (f_checksum != calc_checksum) {
816 throw BESInternalError(
"Data read from the DMR++ handler did not match the Fletcher32 checksum.",
// Drop the checksum bytes from the logical buffer size.
820 if (d_read_buffer_size > FLETCHER32_CHECKSUM)
821 d_read_buffer_size -= FLETCHER32_CHECKSUM;
823 throw BESInternalError(
"Data filtered with fletcher32 don't include the four-byte checksum.",
828 d_is_inflated =
true;
842 BESDEBUG(MODULE, prolog <<
"Already been read! Returning." << endl);
848 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(
this);
850 throw BESInternalError(prolog +
"No more libcurl handles.", __FILE__, __LINE__);
854 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
857 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
864 oss <<
"Wrong number of bytes read for chunk; read: " <<
get_bytes_read() <<
", expected: " <<
get_size();
882 oss <<
"[ptr='" << (
void *)
this <<
"']";
883 oss <<
"[data_url='" << d_data_url->str() <<
"']";
884 oss <<
"[offset=" << d_offset <<
"]";
885 oss <<
"[size=" << d_size <<
"]";
886 oss <<
"[chunk_position_in_array=(";
887 for (
unsigned long i = 0; i < d_chunk_position_in_array.size(); i++) {
889 oss << d_chunk_position_in_array[i];
892 oss <<
"[is_read=" << d_is_read <<
"]";
893 oss <<
"[is_inflated=" << d_is_inflated <<
"]";
/**
 * @brief Return a string describing this Chunk.
 *
 * NOTE: only the first line of the body is visible in this listing (the
 * embedded numbering jumps), so how the stream is filled and what is
 * returned cannot be confirmed from here.
 */
896string Chunk::to_string()
const {
// Stream that presumably accumulates the text to return - confirm.
897 std::ostringstream oss;
905 std::shared_ptr<http::EffectiveUrl> effective_url = EffectiveUrlCache::TheCache()->get_effective_url(d_data_url);
906 BESDEBUG(MODULE, prolog <<
"Using data_url: " << effective_url->str() << endl);
910 if (!d_query_marker.empty()) {
911 string url_str = effective_url->str();
912 if(url_str.find(
"?") != string::npos){
918 url_str += d_query_marker;
919 shared_ptr<http::url> query_marker_url(
new http::url(url_str));
920 return query_marker_url;
923 return effective_url;
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
static std::ostream * GetStrm()
return the debug stream
Abstract exception class for the BES with basic string message.
error thrown if the BES is not allowed to access the resource requested
exception thrown if internal error encountered
Regular expression matching.
int match(const char *s, int len, int pos=0) const
Does the pattern match.
error thrown if there is a user syntax error in the request or any other user error
static std::vector< std::string > split(const std::string &s, char delim='/', bool skip_empty=true)
Splits the string s into the return vector of tokens using the delimiter delim and skipping empty val...
virtual void dump(std::ostream &strm) const
virtual char * get_rbuf()
virtual void read_chunk()
void add_tracking_query_param()
Modify this chunk's data URL so that it includes tracking info.
virtual std::string get_curl_range_arg_string()
Returns a curl range argument. libcurl requires a string argument for range-GET requests,...
virtual std::shared_ptr< http::url > get_data_url() const
Get the data url for this Chunk's data block.
virtual void set_rbuf_to_size()
Allocates the internal read buffer to be d_size bytes.
virtual unsigned long long get_bytes_read() const
Get the number of bytes read so far for this Chunk.
void set_position_in_array(const std::string &pia)
parse the chunk position string
virtual unsigned long long get_rbuf_size() const
virtual unsigned long long get_size() const
Get the size of this Chunk's data block on disk.
void set_read_buffer(char *buf, unsigned long long buf_size, unsigned long long bytes_read=0, bool assume_ownership=true)
Set the target read buffer for this chunk.
virtual void filter_chunk(const std::string &filters, unsigned long long chunk_size, unsigned long long elem_width)
filter data in the chunk
Bundle a libcurl easy handle with other information.
void read_data()
This is the read_data() method for all transfers.
GenericValue< UTF8<> > Value
GenericValue with UTF8 encoding.
GenericDocument< UTF8<> > Document
GenericDocument with UTF8 encoding.