34#include <BESInternalError.h>
35#include <BESSyntaxUserError.h>
36#include <BESForbiddenError.h>
37#include <BESContextManager.h>
40#define PUGIXML_NO_XPATH
41#define PUGIXML_HEADER_ONLY
46#include "CurlHandlePool.h"
47#include "EffectiveUrlCache.h"
48#include "DmrppRequestHandler.h"
49#include "DmrppNames.h"
54#define prolog std::string("Chunk::").append(__func__).append("() - ")
56#define FLETCHER32_CHECKSUM 4
57#define ACTUALLY_USE_FLETCHER32_CHECKSUM 1
// Header callback: inspects one response header line and, when it is a
// Content-Type header, records the content type on the Chunk passed via `data`.
// Presumably registered with libcurl as the header function — TODO confirm.
//
// buffer : the raw header line (not assumed to be NUL-terminated; a length is used).
// nitems : number of bytes in `buffer`.
// data   : user pointer, reinterpreted as a Chunk*.
//
// NOTE(review): this listing omits interior lines (closing braces and the
// return statement are not visible here).
73size_t chunk_header_callback(
char *buffer,
size_t ,
size_t nitems,
void *data) {
// Copy the header line minus its last two bytes (presumably the trailing CRLF).
// NOTE(review): if nitems < 2 the end pointer precedes `buffer` — confirm the
// caller guarantees at least two bytes.
79 string header(buffer, buffer + nitems - 2);
82 if (header.find(
"Content-Type") != string::npos) {
84 auto c_ptr =
reinterpret_cast<Chunk *
>(data);
// The stored value is everything after the last space in the header line.
85 c_ptr->set_response_content_type(header.substr(header.find_last_of(
' ') + 1));
// Parses an XML error document returned by the data store and reports it.
//
// data_url    : the URL that was being accessed; used only in the messages.
// xml_message : the XML text of the error response.
//
// The document is parsed with pugixml; its root element must be named "Error",
// and the "Code" and "Message" child values are extracted. A code of
// "AccessDenied" produces an "ACCESS DENIED" message, anything else a generic
// "ERROR" message; both are sent to BESDEBUG and VERBOSE.
//
// NOTE(review): this listing omits interior lines — the condition guarding the
// first throw, the declaration of `msg`, the `else`, and whatever is thrown
// after each message is logged are not visible here.
96void process_s3_error_response(
const shared_ptr<http::url> &data_url,
const string &xml_message)
100 pugi::xml_document error;
101 pugi::xml_parse_result result = error.load_string(xml_message.c_str());
103 throw BESInternalError(
"The underlying data store returned an unintelligible error message.", __FILE__, __LINE__);
// The root element must exist and be called "Error".
105 pugi::xml_node err_elmnt = error.document_element();
106 if (!err_elmnt || (strcmp(err_elmnt.name(),
"Error") != 0))
107 throw BESInternalError(
"The underlying data store returned a bogus error message.", __FILE__, __LINE__);
109 string code = err_elmnt.child_value(
"Code");
110 string message = err_elmnt.child_value(
"Message");
116 if (code ==
"AccessDenied") {
118 msg << prolog <<
"ACCESS DENIED - The underlying object store has refused access to: ";
119 msg << data_url->str() <<
" Object Store Message: " << message;
120 BESDEBUG(MODULE, msg.str() << endl);
121 VERBOSE(msg.str() << endl);
// Any other error code: report it generically.
126 msg << prolog <<
"ERROR - The underlying object store returned an error. ";
127 msg <<
"(Tried: " << data_url->str() <<
") Object Store Message: " << message;
128 BESDEBUG(MODULE, msg.str() << endl);
129 VERBOSE(msg.str() << endl);
// Write callback: appends `size * nmemb` bytes from `buffer` to the read
// buffer of the Chunk passed via `data`, and advances the chunk's bytes-read
// count. Presumably registered with libcurl as the write function — TODO confirm.
//
// If the chunk's recorded response content type contains "application/xml",
// the payload is treated as an error document and handed to
// process_s3_error_response() instead of being stored.
//
// NOTE(review): this listing omits interior lines (the `try`, the statements
// after each message is built, and the return statements are not visible).
147size_t chunk_write_data(
void *buffer,
size_t size,
size_t nmemb,
void *data) {
148 BESDEBUG(MODULE, prolog <<
"BEGIN " << endl);
149 size_t nbytes = size * nmemb;
150 auto chunk =
reinterpret_cast<Chunk *
>(data);
153 auto data_url = chunk->get_data_url();
154 BESDEBUG(MODULE, prolog <<
"chunk->get_data_url():" << data_url << endl);
157 BESDEBUG(MODULE, prolog <<
"chunk->get_response_content_type():" << chunk->get_response_content_type() << endl);
158 if (chunk->get_response_content_type().find(
"application/xml") != string::npos) {
// NOTE(review): this constructs the string up to the first NUL byte rather than
// using `nbytes`; if `buffer` is not NUL-terminated this reads past the data —
// confirm, and consider string(ptr, nbytes).
161 string xml_message =
reinterpret_cast<const char *
>(buffer);
// Trim trailing whitespace. NOTE(review): the trim set also contains the
// characters ' ' and '0', so trailing zero digits are removed too — confirm
// whether "\0" was intended.
162 xml_message.erase(xml_message.find_last_not_of(
"\t\n\v\f\r 0") + 1);
167 process_s3_error_response(data_url, xml_message);
174 catch (std::exception &e) {
176 msg << prolog <<
"Caught std::exception when accessing object store data.";
177 msg <<
" (Tried: " << data_url->str() <<
")" <<
" Message: " << e.what();
178 BESDEBUG(MODULE, msg.str() << endl);
187 unsigned long long bytes_read = chunk->get_bytes_read();
// Refuse to write past the end of the chunk's read buffer.
190 if (bytes_read + nbytes > chunk->get_rbuf_size()) {
192 msg << prolog <<
"ERROR! The number of bytes_read: " << bytes_read <<
" plus the number of bytes to read: "
193 << nbytes <<
" is larger than the target buffer size: " << chunk->get_rbuf_size();
194 BESDEBUG(MODULE, msg.str() << endl);
195 DmrppRequestHandler::curl_handle_pool->release_all_handles();
// Append the new bytes after what has already been read.
199 memcpy(chunk->get_rbuf() + bytes_read, buffer, nbytes);
200 chunk->set_bytes_read(bytes_read + nbytes);
202 BESDEBUG(MODULE, prolog <<
"END" << endl);
// Decompresses `src_len` bytes at `src` into the `dest_len`-byte buffer `dest`
// using zlib (inflateInit / inflate / inflateEnd).
//
// Throws BESError (BES_INTERNAL_ERROR) when zlib cannot be initialised, when
// inflate() returns anything other than Z_OK / Z_STREAM_END, or when the
// output buffer fills up (avail_out reaches 0).
//
// NOTE(review): this listing omits interior lines (the z_stream declaration,
// the loop header, and several closing braces are not visible).
217void inflate(
char *dest,
unsigned long long dest_len,
char *src,
unsigned long long src_len) {
221 assert(dest_len > 0);
228 memset(&z_strm, 0,
sizeof(z_strm));
// NOTE(review): zlib's avail_in / avail_out are 32-bit uInt fields; the
// unsigned long long lengths assigned below would be truncated for buffers
// of 4 GiB or more — confirm chunk sizes cannot reach that.
229 z_strm.next_in = (Bytef *) src;
230 z_strm.avail_in = src_len;
231 z_strm.next_out = (Bytef *) dest;
232 z_strm.avail_out = dest_len;
235 if (Z_OK != inflateInit(&z_strm))
236 throw BESError(
"Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
242 status = inflate(&z_strm, Z_SYNC_FLUSH);
// Z_STREAM_END means all input has been decompressed.
245 if (Z_STREAM_END == status)
break;
248 if (Z_OK != status) {
249 stringstream err_msg;
250 err_msg <<
"Failed to inflate data chunk.";
251 char const *err_msg_cstr = z_strm.msg;
253 err_msg <<
" zlib message: " << err_msg_cstr;
254 (void) inflateEnd(&z_strm);
255 throw BESError(err_msg.str(), BES_INTERNAL_ERROR, __FILE__, __LINE__);
// Output space exhausted while the stream is still not finished.
// NOTE(review): inflateEnd() is not called on this path in the visible lines.
262 if (0 == z_strm.avail_out) {
263 throw BESError(
"Data buffer is not big enough for uncompressed data.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// NOTE(review): the next lines use HDF5-internal names (H5MM_realloc,
// HGOTO_ERROR) and variables (outbuf, nalloc, new_outbuf) that are not declared
// in the visible lines; presumably disabled reference code — confirm.
270 if (NULL == (new_outbuf = H5MM_realloc(outbuf, nalloc))) {
271 (void) inflateEnd(&z_strm);
272 HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, 0,
"memory allocation failed for inflate decompression")
277 z_strm.next_out = (
unsigned char*) outbuf + z_strm.total_out;
278 z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
285 (void) inflateEnd(&z_strm);
// Reverses a byte-shuffle of `src` into `dest`.
//
// dest     : output buffer; the visible code writes up to `src_size` bytes.
// src      : shuffled input of `src_size` bytes.
// src_size : number of bytes in `src`.
// width    : element width in bytes; elems = src_size / width.
//
// When width <= 1 or there is at most one element, the data is copied
// unchanged. Otherwise, for each of the `width` byte positions, source bytes
// are written to `dest` with a stride of `width` (see DUFF_GUTS). Any
// `src_size % width` leftover bytes are copied at the end.
//
// NOTE(review): this listing omits interior lines; the unrolled switch/do body
// (apparently a Duff's-device-style loop, judging by the names) is not visible.
313void unshuffle(
char *dest,
const char *src,
unsigned long long src_size,
unsigned long long width) {
314 unsigned long long elems = src_size / width;
// Nothing to unshuffle: plain copy.
317 if (!(width > 1 && elems > 1)) {
318 memcpy(dest,
const_cast<char *
>(src), src_size);
322 char *_src =
const_cast<char *
>(src);
326 for (
unsigned int i = 0; i < width; i++) {
// Number of passes through the 8-way unrolled loop, rounded up.
338 size_t duffs_index = (elems + 7) / 8;
341 assert(0 &&
"This Should never be executed!");
// One step: copy a source byte, then advance the destination by one element.
346#define DUFF_GUTS *_dest = *_src++; _dest += width;
363 }
while (--duffs_index > 0);
// Bytes that do not make up a whole element.
371 size_t leftover = src_size % width;
376 _dest -= (width - 1);
377 memcpy((
void *) _dest, (
void *) _src, leftover);
/**
 * @brief Parse a comma-separated list of unsigned decimal integers.
 *
 * Each comma-delimited token of @p s is converted to an unsigned long long and
 * appended to @p res (existing contents of @p res are kept).
 *
 * Every token must consist solely of decimal digits. std::stoull() on its own
 * stops at the first non-digit and silently returns the prefix (e.g. "2]3"
 * yields 2) and also accepts leading whitespace and signs, so malformed input
 * could be accepted as valid. Tokens are therefore validated before conversion.
 *
 * @param s   Text such as "1,2,3". An empty string is a single empty token.
 * @param res Output vector; values are appended in order. On failure it may
 *            hold the values parsed before the offending token.
 * @throws std::invalid_argument if a token is empty or contains a non-digit.
 * @throws std::out_of_range if a token does not fit in unsigned long long.
 */
static void split_by_comma(const std::string &s, std::vector<unsigned long long> &res)
{
    std::size_t token_start = 0;

    while (true) {
        const std::size_t comma = s.find(',', token_start);
        const std::string token = (comma == std::string::npos)
                                  ? s.substr(token_start)
                                  : s.substr(token_start, comma - token_start);

        // Reject anything std::stoull() would only partially consume.
        if (token.empty() || token.find_first_not_of("0123456789") != std::string::npos)
            throw std::invalid_argument("split_by_comma: '" + token + "' is not an unsigned decimal integer");

        res.push_back(std::stoull(token));

        if (comma == std::string::npos)
            break;
        token_start = comma + 1;
    }
}
402void Chunk::parse_chunk_position_in_array_string(
const string &pia, vector<unsigned long long> &cpia_vect)
404 if (pia.empty())
return;
406 if (!cpia_vect.empty()) cpia_vect.clear();
410 if (pia.find(
'[') == string::npos || pia.find(
']') == string::npos || pia.length() < 3)
411 throw BESInternalError(
"while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
413 if (pia.find_first_not_of(
"[]1234567890,") != string::npos)
414 throw BESInternalError(
"while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
417 split_by_comma(pia.substr(1, pia.length() - 2), cpia_vect);
419 catch(
const std::invalid_argument &e) {
420 throw BESInternalError(
string(
"while parsing a DMR++, chunk position string illegal character(s): ").append(e.what()), __FILE__, __LINE__);
// NOTE(review): the lines below are fragments of several method bodies whose
// signatures are not visible in this listing.

// Fragment: parses the string `pia` into the member d_chunk_position_in_array.
439 parse_chunk_position_in_array_string(pia,d_chunk_position_in_array);
// Fragment: here `pia` is assigned directly to d_chunk_position_in_array
// (presumably the vector overload of set_position_in_array — TODO confirm).
// An empty `pia` leaves the member unchanged.
451 if (pia.empty())
return;
453 if (!d_chunk_position_in_array.empty()) d_chunk_position_in_array.clear();
455 d_chunk_position_in_array = pia;
// Fragment: builds the range argument string from this chunk's offset and size.
466 return curl::get_range_arg_string(d_offset, d_size);
// Fragment of a method body (presumably Chunk::add_tracking_query_param —
// the signature is not visible in this listing).
//
// Reads the S3_TRACKING_CONTEXT value from the BES context manager, tests the
// chunk's data URL against two S3 URL patterns (virtual-host style and path
// style), and appends "<S3_TRACKING_CONTEXT>=<value>" to d_query_marker.
//
// NOTE(review): interior lines are missing; the conditions guarding the
// null-URL check's body and the final append are not visible.
489 if(d_data_url ==
nullptr)
493 string cloudydap_context_value = BESContextManager::TheManager()->
get_context(S3_TRACKING_CONTEXT, found);
511 bool add_tracking =
false;
// Virtual-host style: https://<bucket>.s3[.<region>].amazonaws.com/...
516 string s3_vh_regex_str = R
"(^https?:\/\/([a-z]|[0-9])(([a-z]|[0-9]|\.|-){1,61})([a-z]|[0-9])\.s3((\.|-)us-(east|west)-(1|2))?\.amazonaws\.com\/.*$)";
518 BESRegex s3_vh_regex(s3_vh_regex_str.c_str());
519 int match_result = s3_vh_regex.
match(d_data_url->str().c_str(), d_data_url->str().length());
// Tracking is enabled only when the match length equals the whole URL length.
520 if(match_result>=0) {
521 auto match_length = (
unsigned int) match_result;
522 if (match_length == d_data_url->str().length()) {
524 prolog <<
"FULL MATCH. pattern: " << s3_vh_regex_str <<
" url: " << d_data_url->str() << endl);
525 add_tracking =
true;;
// Path style: https://s3[.<region>].amazonaws.com/<bucket>/...
531 string s3_path_regex_str = R
"(^https?:\/\/s3((\.|-)us-(east|west)-(1|2))?\.amazonaws\.com\/([a-z]|[0-9])(([a-z]|[0-9]|\.|-){1,61})([a-z]|[0-9])\/.*$)";
532 BESRegex s3_path_regex(s3_path_regex_str.c_str());
533 match_result = s3_path_regex.match(d_data_url->str().c_str(), d_data_url->str().length());
534 if(match_result>=0) {
535 auto match_length = (
unsigned int) match_result;
536 if (match_length == d_data_url->str().length()) {
// NOTE(review): this message prints s3_vh_regex_str, but the pattern that was
// just matched is s3_path_regex_str — looks like a copy/paste slip.
538 prolog <<
"FULL MATCH. pattern: " << s3_vh_regex_str <<
" url: " << d_data_url->str() << endl);
539 add_tracking =
true;;
546 d_query_marker.append(S3_TRACKING_CONTEXT).append(
"=").append(cloudydap_context_value);
// Computes a Fletcher-32 style checksum over `_len` bytes at `_data` and
// returns (sum2 << 16) | sum1.
//
// The input is consumed as 16-bit words (len = _len / 2), each formed from
// two bytes with the first byte in the high position. Both running sums are
// folded back to 16 bits at intervals.
//
// NOTE(review): this listing omits interior lines — the return type, the loop
// headers, the sum2 updates and the odd-length guard are not visible.
557checksum_fletcher32(
const void *_data,
size_t _len)
559 const auto *data = (
const uint8_t *)_data;
560 size_t len = _len / 2;
561 uint32_t sum1 = 0, sum2 = 0;
// Work in batches of at most 360 words between folds (presumably to keep the
// 32-bit sums from overflowing — TODO confirm).
570 size_t tlen = len > 360 ? 360 : len;
573 sum1 += (uint32_t)(((uint16_t)data[0]) << 8) | ((uint16_t)data[1]);
// Fold the carries back into the low 16 bits.
577 sum1 = (sum1 & 0xffff) + (sum1 >> 16);
578 sum2 = (sum2 & 0xffff) + (sum2 >> 16);
// A single remaining byte is treated as the high byte of a final word
// (presumably only when _len is odd — the condition is not visible).
583 sum1 += (uint32_t)(((uint16_t)*data) << 8);
585 sum1 = (sum1 & 0xffff) + (sum1 >> 16);
586 sum2 = (sum2 & 0xffff) + (sum2 >> 16);
// Second fold to reduce both sums to 16 bits.
590 sum1 = (sum1 & 0xffff) + (sum1 >> 16);
591 sum2 = (sum2 & 0xffff) + (sum2 >> 16);
593 return ((sum2 << 16) | sum1);
// Applies the chunk's filters to its read buffer.
//
// filters    : filter names; the visible code handles "deflate", "shuffle"
//              and "fletcher32".
// chunk_size : number of elements in the chunk; multiplied by `elem_width`
//              to give a size in bytes.
// elem_width : element width in bytes.
//
// Filters are processed in reverse order (filter_array is walked from rbegin
// to rend). Throws BESInternalError when the Fletcher32 checksum does not
// match or the buffer is too small to contain the checksum. Sets d_is_inflated.
//
// NOTE(review): this listing omits interior lines (how filter_array is built,
// the calls that perform inflate/unshuffle, and the #else/#endif branches).
606void Chunk::filter_chunk(
const string &filters,
unsigned long long chunk_size,
unsigned long long elem_width) {
611 chunk_size *= elem_width;
615 for (
auto i = filter_array.rbegin(), e = filter_array.rend(); i != e; ++i) {
618 if (filter ==
"deflate") {
// Destination buffer for the decompressed bytes.
619 char *dest =
new char[chunk_size];
623#if DMRPP_USE_SUPER_CHUNKS
626 set_rbuf(dest, chunk_size);
634 else if (filter ==
"shuffle"){
639#if DMRPP_USE_SUPER_CHUNKS
650 else if (filter ==
"fletcher32"){
652#if ACTUALLY_USE_FLETCHER32_CHECKSUM
// The checksum covers the buffer minus its last FLETCHER32_CHECKSUM (4) bytes.
// NOTE(review): in the visible order this subtraction happens before the size
// check further down; if get_rbuf_size() < 4 the unsigned value would wrap —
// confirm.
662 uint32_t calc_checksum = checksum_fletcher32((
const void *)
get_rbuf(),
get_rbuf_size() - FLETCHER32_CHECKSUM);
663 if (f_checksum != calc_checksum) {
664 throw BESInternalError(
"Data read from the DMR++ handler did not match the Fletcher32 checksum.",
// Drop the trailing checksum bytes from the logical buffer size.
668 if (d_read_buffer_size > FLETCHER32_CHECKSUM)
669 d_read_buffer_size -= FLETCHER32_CHECKSUM;
671 throw BESInternalError(
"Data filtered with fletcher32 don't include the four-byte checksum.",
676 d_is_inflated =
true;
// Returns the size in bytes of one value of the given libdap type.
//
// The visible cases cover the 8/16/32/64-bit signed and unsigned integers
// (dods_byte_c shares the uint8 case), float32 and float64.
//
// NOTE(review): this listing omits interior lines; the switch header and the
// handling of any other type are not visible.
679static unsigned int get_value_size(libdap::Type type)
682 case libdap::dods_int8_c:
683 return sizeof(int8_t);
685 case libdap::dods_int16_c:
686 return sizeof(int16_t);
688 case libdap::dods_int32_c:
689 return sizeof(int32_t);
691 case libdap::dods_int64_c:
692 return sizeof(int64_t);
694 case libdap::dods_uint8_c:
695 case libdap::dods_byte_c:
696 return sizeof(uint8_t);
698 case libdap::dods_uint16_c:
699 return sizeof(uint16_t);
701 case libdap::dods_uint32_c:
702 return sizeof(uint32_t);
704 case libdap::dods_uint64_c:
705 return sizeof(uint64_t);
707 case libdap::dods_float32_c:
708 return sizeof(float);
710 case libdap::dods_float64_c:
711 return sizeof(double);
// Converts the text `v` to a value of the given libdap type, stores it in the
// matching member of `fv`, and returns a pointer to that member's bytes.
//
// fv   : caller-owned storage; the returned pointer points into it, so it is
//        only valid while `fv` is.
// type : selects the member and the conversion function.
// v    : textual form of the value.
//
// NOTE(review): the std::sto* conversions throw std::invalid_argument /
// std::out_of_range on bad input; no handler is visible in this listing.
// NOTE(review): the 8- and 16-bit cases (and uint32 via stoul) narrow with a
// cast and no range check, so out-of-range text would wrap silently — confirm
// this is acceptable.
// NOTE(review): interior lines are missing; the assignments to fv.f and fv.d
// and the handling of any other type are not visible.
718const char *get_value_ptr(fill_value &fv, libdap::Type type,
const string &v)
721 case libdap::dods_int8_c:
722 fv.int8 = (int8_t)stoi(v);
723 return (
const char *)&fv.int8;
725 case libdap::dods_int16_c:
726 fv.int16 = (int16_t)stoi(v);
727 return (
const char *)&fv.int16;
729 case libdap::dods_int32_c:
730 fv.int32 = (int32_t)stoi(v);
731 return (
const char *)&fv.int32;
733 case libdap::dods_int64_c:
734 fv.int64 = (int64_t)stoll(v);
735 return (
const char *)&fv.int64;
737 case libdap::dods_uint8_c:
738 case libdap::dods_byte_c:
739 fv.uint8 = (uint8_t)stoi(v);
740 return (
const char *)&fv.uint8;
742 case libdap::dods_uint16_c:
743 fv.uint16 = (uint16_t)stoi(v);
744 return (
const char *)&fv.uint16;
746 case libdap::dods_uint32_c:
747 fv.uint32 = (uint32_t)stoul(v);
748 return (
const char *)&fv.uint32;
750 case libdap::dods_uint64_c:
751 fv.uint64 = (uint64_t)stoull(v);
752 return (
const char *)&fv.uint64;
754 case libdap::dods_float32_c:
756 return (
const char *)&fv.f;
758 case libdap::dods_float64_c:
760 return (
const char *)&fv.d;
// Fragment of a method body (presumably Chunk::load_fill_values — the
// signature and the declarations of `fv` and `buffer` are not visible).
//
// Converts d_fill_value to its binary form and copies it repeatedly into the
// buffer, once for every whole value that fits in get_rbuf_size() bytes.
772 const char *value = get_value_ptr(fv, d_fill_value_type, d_fill_value);
773 unsigned int value_size = get_value_size(d_fill_value_type);
775 unsigned long long num_values =
get_rbuf_size() / value_size;
// NOTE(review): `i` is a signed int compared against an unsigned long long
// count; for more than INT_MAX values the index would overflow — confirm
// buffer sizes stay below that.
778 for (
int i = 0; i < num_values; ++i, buffer += value_size) {
779 memcpy(buffer, value, value_size);
// Fragment of a method body (presumably Chunk::read_chunk — the signature is
// not visible in this listing).
//
// The visible lines test d_read_buffer_is_mine and d_uses_fill_value, obtain
// an easy handle for this chunk from the shared curl handle pool, release it
// again on two different paths, and build an error message when the number of
// bytes read differs from the expected size.
//
// NOTE(review): interior lines are missing; the bodies of the first two `if`s,
// the null-handle check guarding the throw, the actual transfer call, and the
// try/catch structure around the two release_handle() calls are not visible.
800 if (d_read_buffer_is_mine)
803 if (d_uses_fill_value) {
807 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(
this);
809 throw BESInternalError(prolog +
"No more libcurl handles.", __FILE__, __LINE__);
813 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
822 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
830 oss <<
"Wrong number of bytes read for chunk; read: " <<
get_bytes_read() <<
", expected: " <<
get_size();
// Fragment of a method body (presumably Chunk::dump — the signature and the
// declaration of `oss` are not visible in this listing).
//
// Writes a bracketed, human-readable description of this chunk to `oss`:
// its address, data URL, offset, size, position in the array, and the
// is_read / is_inflated flags.
// NOTE(review): d_data_url is dereferenced without a visible null check.
848 oss <<
"[ptr='" << (
void *)
this <<
"']";
849 oss <<
"[data_url='" << d_data_url->str() <<
"']";
850 oss <<
"[offset=" << d_offset <<
"]";
851 oss <<
"[size=" << d_size <<
"]";
852 oss <<
"[chunk_position_in_array=(";
// The separator and closing text written inside/after this loop are not visible.
853 for (
unsigned long i = 0; i < d_chunk_position_in_array.size(); i++) {
855 oss << d_chunk_position_in_array[i];
858 oss <<
"[is_read=" << d_is_read <<
"]";
859 oss <<
"[is_inflated=" << d_is_inflated <<
"]";
// Returns a textual description of this chunk, built in a string stream.
// NOTE(review): only the opening of the body is visible in this listing; what
// is written to `oss` and the return statement are not shown.
862string Chunk::to_string()
const {
863 std::ostringstream oss;
// Fragment of a method body (presumably Chunk::get_data_url — the signature
// is not visible in this listing).
//
// Looks up the effective URL for d_data_url in the EffectiveUrlCache and
// returns it. When ENABLE_TRACKING_QUERY_PARAMETER is set and d_query_marker
// is non-empty, a new http::url with the marker appended is returned instead.
//
// NOTE(review): interior lines are missing; what is appended depending on
// whether the URL already contains '?' is not visible.
870 std::shared_ptr<http::EffectiveUrl> effective_url = EffectiveUrlCache::TheCache()->get_effective_url(d_data_url);
871 BESDEBUG(MODULE, prolog <<
"Using data_url: " << effective_url->str() << endl);
873#if ENABLE_TRACKING_QUERY_PARAMETER
876 if (!d_query_marker.empty()) {
877 string url_str = effective_url->str();
// Checks whether the URL already has a query string.
878 if(url_str.find(
'?') != string::npos){
884 url_str += d_query_marker;
885 shared_ptr<http::url> query_marker_url(
new http::url(url_str));
886 return query_marker_url;
890 return effective_url;
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
Base exception class for the BES with basic string message.
error thrown if the BES is not allowed to access the resource requested
exception thrown if internal error encountered
Regular expression matching.
int match(const char *s, int len, int pos=0) const
Does the pattern match.
error thrown if there is a user syntax error in the request or any other user error
static std::vector< std::string > split(const std::string &s, char delim='/', bool skip_empty=true)
Splits the string s into the return vector of tokens using the delimiter delim and skipping empty val...
virtual void set_bytes_read(unsigned long long bytes_read)
Set the size of this Chunk's data block.
virtual void dump(std::ostream &strm) const
virtual char * get_rbuf()
virtual void read_chunk()
void add_tracking_query_param()
Modify this chunk's data URL so that it includes tracking info.
virtual std::string get_curl_range_arg_string()
Returns a curl range argument. The libcurl requires a string argument for range-get activities,...
virtual std::shared_ptr< http::url > get_data_url() const
virtual void set_rbuf_to_size()
Allocates the internal read buffer to be d_size bytes.
virtual unsigned long long get_bytes_read() const
void set_position_in_array(const std::string &pia)
parse the chunk position string
virtual unsigned long long get_rbuf_size() const
virtual unsigned long long get_size() const
void set_read_buffer(char *buf, unsigned long long buf_size, unsigned long long bytes_read=0, bool assume_ownership=true)
Set the target read buffer for this chunk.
virtual void filter_chunk(const std::string &filters, unsigned long long chunk_size, unsigned long long elem_width)
filter data in the chunk
virtual void load_fill_values()
Load the chunk with fill values - temporary implementation.
Bundle a libcurl easy handle with other information.
void read_data()
This is the read_data() method for all transfers.