45#include <libdap/D4Enum.h>
46#include <libdap/D4Attributes.h>
47#include <libdap/D4Maps.h>
48#include <libdap/D4Group.h>
50#include "BESInternalError.h"
53#include "BESStopWatch.h"
55#include "byteswap_compat.h"
56#include "CurlHandlePool.h"
58#include "DmrppArray.h"
59#include "DmrppRequestHandler.h"
60#include "DmrppNames.h"
64#define dmrpp_3 "dmrpp:3"
65#define dmrpp_4 "dmrpp:4"
71#define prolog std::string("DmrppArray::").append(__func__).append("() - ")
77std::mutex transfer_thread_pool_mtx;
78atomic_uint transfer_thread_counter(0);
// Scan `futures` from the front looking for one that can be retired.
// Each future examined is waited on for at most `timeout` milliseconds with
// future::wait_for(); a future that is not reported as timed out is harvested with get().
// An invalid future is treated as finished so that it can be dropped.
// Returns the value of `future_finished`.
// NOTE(review): `debug_prefix` is passed by value; a const reference would avoid a string copy per call.
// NOTE(review): wait_for() on a future whose valid() is false is undefined behaviour —
// confirm the wait below is guarded by `future_is_valid`.
96bool get_next_future(list<std::future<bool>> &futures, atomic_uint &thread_counter,
unsigned long timeout,
string debug_prefix) {
97 bool future_finished =
false;
99 std::chrono::milliseconds timeout_ms (timeout);
102 auto futr = futures.begin();
103 auto fend = futures.end();
104 bool future_is_valid =
true;
// Stop as soon as a future finished, an invalid future was seen, or the list is exhausted.
105 while(!future_finished && future_is_valid && futr != fend){
106 future_is_valid = (*futr).valid();
// Any status other than `timeout` means get() will not block for long.
114 if((*futr).wait_for(timeout_ms) != std::future_status::timeout){
116 bool success = (*futr).get();
117 future_finished =
true;
118 BESDEBUG(dmrpp_3, debug_prefix << prolog <<
"Called future::get() on a ready future."
119 <<
" success: " << (success?
"true":
"false") << endl);
// Error message assembled when the worker reported failure.
122 msg << debug_prefix << prolog <<
"The std::future has failed!";
123 msg <<
" thread_counter: " << thread_counter;
139 BESDEBUG(dmrpp_3, debug_prefix << prolog <<
"future::wait_for() timed out. (timeout: " <<
140 timeout <<
" ms) There are currently " << futures.size() <<
" futures in process."
141 <<
" thread_counter: " << thread_counter << endl);
145 BESDEBUG(dmrpp_3, debug_prefix << prolog <<
"The future was not valid. Dumping... " << endl);
// An invalid future counts as finished so that it is removed below.
146 future_finished =
true;
// Per the debug message, the finished (or invalid) future is erased from the list here.
150 if (futr!=fend && future_finished) {
153 BESDEBUG(dmrpp_3, debug_prefix << prolog <<
"Erased future from futures list. (Erased future was "
154 << (future_is_valid?
"":
"not ") <<
"valid at start.) There are currently " <<
155 futures.size() <<
" futures in process. thread_counter: " << thread_counter << endl);
158 done = future_finished || futures.empty();
161 return future_finished;
// Worker body for one slice ("child chunk") of a larger chunk: reads the child chunk and
// copies the bytes it read into the read buffer of `the_one_chunk`.
// The destination position is the difference between the two chunks' get_offset() values.
// NOTE(review): the sanity checks are assert()s and disappear when NDEBUG is defined.
176bool one_child_chunk_thread_new(unique_ptr<one_child_chunk_args_new> args)
179 args->child_chunk->read_chunk();
// Both buffers must exist and the child must have been read completely.
181 assert(args->the_one_chunk->get_rbuf());
182 assert(args->child_chunk->get_rbuf());
183 assert(args->child_chunk->get_bytes_read() == args->child_chunk->get_size());
196 unsigned long long offset_within_the_one_chunk = args->child_chunk->get_offset() - args->the_one_chunk->get_offset();
198 memcpy(args->the_one_chunk->get_rbuf() + offset_within_the_one_chunk, args->child_chunk->get_rbuf(),
199 args->child_chunk->get_bytes_read());
// Worker body that processes one SuperChunk by calling its read() method.
// When DMRPP_ENABLE_THREAD_TIMERS is set, a timer tagged with this thread's id, the
// parent thread id and the SuperChunk id is started first.
211bool one_super_chunk_transfer_thread(unique_ptr<one_super_chunk_args> args)
214#if DMRPP_ENABLE_THREAD_TIMERS
215 stringstream timer_tag;
216 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
217 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" sc_id: " << args->super_chunk->id();
219 sw.start(timer_tag.str());
222 args->super_chunk->read();
// Worker body that processes one SuperChunk by calling its read_unconstrained() method.
// Mirrors one_super_chunk_transfer_thread(), including the optional timer that is
// compiled in when DMRPP_ENABLE_THREAD_TIMERS is set.
231bool one_super_chunk_unconstrained_transfer_thread(unique_ptr<one_super_chunk_args> args)
234#if DMRPP_ENABLE_THREAD_TIMERS
235 stringstream timer_tag;
236 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
237 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" sc_id: " << args->super_chunk->id();
239 sw.start(timer_tag.str());
242 args->super_chunk->read_unconstrained();
247bool start_one_child_chunk_thread(list<std::future<bool>> &futures, unique_ptr<one_child_chunk_args_new> args) {
249 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
250 if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
251 transfer_thread_counter++;
252 futures.push_back( std::async(std::launch::async, one_child_chunk_thread_new, std::move(args)));
254 BESDEBUG(dmrpp_3, prolog <<
"Got std::future '" << futures.size() <<
255 "' from std::async for " << args->child_chunk->to_string() << endl);
268bool start_super_chunk_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
270 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
271 if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
272 transfer_thread_counter++;
273 futures.push_back(std::async(std::launch::async, one_super_chunk_transfer_thread, std::move(args)));
275 BESDEBUG(dmrpp_3, prolog <<
"Got std::future '" << futures.size() <<
276 "' from std::async for " << args->super_chunk->to_string(
false) << endl);
// Try to launch one_super_chunk_unconstrained_transfer_thread() via std::async.
// A task is started only while transfer_thread_counter is below
// DmrppRequestHandler::d_max_transfer_threads; the test and the increment happen while
// transfer_thread_pool_mtx is held.
// Ownership of `args` moves into the task, so `args` must not be used after the launch
// (the debug message below only reads the counter and the list size).
288bool start_super_chunk_unconstrained_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
290 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
291 if(transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
292 transfer_thread_counter++;
293 futures.push_back(std::async(std::launch::async, one_super_chunk_unconstrained_transfer_thread, std::move(args)));
295 BESDEBUG(dmrpp_3, prolog <<
"Got std::future '" << futures.size() <<
296 "' from std::async, transfer_thread_counter: " << transfer_thread_counter << endl);
// Process every SuperChunk in `super_chunks` using asynchronous tasks (unconstrained read).
// The function alternates between retiring a running task with get_next_future() and
// starting new tasks for queued SuperChunks until a start attempt is refused.
// `array` is handed to each task through one_super_chunk_args.
322void read_super_chunks_unconstrained_concurrent(queue<shared_ptr<SuperChunk>> &super_chunks, DmrppArray *array)
// Futures of the tasks that are currently running.
332 list<future<bool>> futures;
335 bool future_finished =
true;
339 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
343 BESDEBUG(dmrpp_3, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
345 if (!super_chunks.empty()){
// Keep launching until a launch is refused or the queue is empty.
347 bool thread_started =
true;
348 while(thread_started && !super_chunks.empty()) {
349 auto super_chunk = super_chunks.front();
350 BESDEBUG(dmrpp_3, prolog <<
"Starting thread for " << super_chunk->to_string(
false) << endl);
352 auto args = unique_ptr<one_super_chunk_args>(
new one_super_chunk_args(super_chunk, array));
353 thread_started = start_super_chunk_unconstrained_transfer_thread(futures, std::move(args));
355 if (thread_started) {
357 BESDEBUG(dmrpp_3, prolog <<
"STARTED thread for " << super_chunk->to_string(
false) << endl);
360 BESDEBUG(dmrpp_3, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
361 " transfer_thread_counter: " << transfer_thread_counter <<
362 " futures.size(): " << futures.size() << endl);
371 future_finished =
false;
// Drain: call get() on every still-valid future, taking them from the back of the list.
376 while(!futures.empty()){
377 if(futures.back().valid())
378 futures.back().get();
// Process every SuperChunk in `super_chunks` using asynchronous tasks (constrained read).
// Same scheme as read_super_chunks_unconstrained_concurrent(), but tasks are started with
// start_super_chunk_transfer_thread().
// `array` is handed to each task through one_super_chunk_args.
409void read_super_chunks_concurrent(queue<shared_ptr<SuperChunk>> &super_chunks, DmrppArray *array)
// Futures of the tasks that are currently running.
419 list<future<bool>> futures;
422 bool future_finished =
true;
426 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
430 BESDEBUG(dmrpp_3, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
432 if (!super_chunks.empty()){
// Keep launching until a launch is refused or the queue is empty.
434 bool thread_started =
true;
435 while(thread_started && !super_chunks.empty()) {
436 auto super_chunk = super_chunks.front();
437 BESDEBUG(dmrpp_3, prolog <<
"Starting thread for " << super_chunk->to_string(
false) << endl);
439 auto args = unique_ptr<one_super_chunk_args>(
new one_super_chunk_args(super_chunk, array));
440 thread_started = start_super_chunk_transfer_thread(futures, std::move(args));
442 if (thread_started) {
444 BESDEBUG(dmrpp_3, prolog <<
"STARTED thread for " << super_chunk->to_string(
false) << endl);
447 BESDEBUG(dmrpp_3, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
448 " transfer_thread_counter: " << transfer_thread_counter <<
449 " futures.size(): " << futures.size() << endl);
458 future_finished =
false;
// Drain: call get() on every still-valid future, taking them from the back of the list.
463 while(!futures.empty()){
464 if(futures.back().valid())
465 futures.back().get();
/**
 * @brief Compute the linear (row-major) offset of an N-dimensional address.
 *
 * The right-most dimension varies fastest: the offset is the sum over all dimensions of
 * the index multiplied by the product of the extents of the dimensions to its right.
 *
 * @param address_in_target Index in each dimension; must have the same rank as @p target_shape.
 * @param target_shape Extent of each dimension.
 * @return The offset in elements; 0 for rank-0 (empty) input.
 */
static unsigned long long
get_index(const std::vector<unsigned long long> &address_in_target,
          const std::vector<unsigned long long> &target_shape)
{
    assert(address_in_target.size() == target_shape.size());    // ranks must be equal

    // Rank 0 has exactly one position. Without this guard the reverse iterators below
    // would be dereferenced on an empty vector, which is undefined behaviour.
    if (address_in_target.empty() || target_shape.empty()) {
        return 0;
    }

    auto extent = target_shape.rbegin();
    auto index = address_in_target.rbegin();
    const auto index_end = address_in_target.rend();

    // The inner-most dimension contributes its index directly.
    unsigned long long offset = *index;
    unsigned long long stride = *extent;

    for (++index, ++extent; index != index_end; ++index, ++extent) {
        assert(*index < *extent); // index < shape for each outer dimension

        offset += stride * *index;
        stride *= *extent;
    }

    return offset;
}
// Helper that derives a multiplier for dimension `k` from the extents in `shape`.
// Requires a shape of rank greater than one and greater than k + 1 (checked with assert()).
// NOTE(review): the result is an `unsigned long` while the extents are `unsigned long long`;
// on platforms where unsigned long is 32 bits the value can be truncated.
528static unsigned long multiplier(
const vector<unsigned long long> &shape,
unsigned int k)
530 assert(shape.size() > 1);
531 assert(shape.size() > k + 1);
533 vector<unsigned long long>::const_iterator i = shape.begin(), e = shape.end();
535 unsigned long multiplier = *i++;
// Assignment operator: returns immediately on self-assignment, otherwise assigns `rhs`
// to the Array part and then to the DmrppCommon part of this object.
550DmrppArray::operator=(
const DmrppArray &rhs)
552 if (
this == &rhs)
return *
this;
// Assign through a reference to the Array sub-object.
554 dynamic_cast<Array &
>(*this) = rhs;
// Assign through a reference to the DmrppCommon sub-object.
556 dynamic_cast<DmrppCommon &
>(*this) = rhs;
566bool DmrppArray::is_projected()
568 for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
569 if (dimension_size(p,
true) != dimension_size(p,
false))
return true;
583 unsigned long long size = 1;
584 for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
585 size *= dimension_size(dim, constrained);
598 auto dim = dim_begin(), edim = dim_end();
599 vector<unsigned long long> shape;
603 shape.reserve(edim - dim);
605 for (; dim != edim; dim++) {
606 shape.push_back(dimension_size(dim, constrained));
617DmrppArray::dimension DmrppArray::get_dimension(
unsigned int i)
619 assert(i <= (dim_end() - dim_begin()));
620 return *(dim_begin() + i);
// Recursively copy the constrained subset of a contiguous variable from `src_buf` into this
// array's buffer (get_buf()).
//   dim_iter      dimension handled at this recursion level
//   target_index  element position in the destination buffer where the next value is written
//   subset_addr   address built so far (one entry per dimension already visited)
//   array_shape   shape used by get_index() to turn an address into a source element offset
//   src_buf       source bytes
// Values are copied byte-by-byte, bytes_per_elem at a time.
636void DmrppArray::insert_constrained_contiguous(Dim_iter dim_iter,
unsigned long *target_index,
637 vector<unsigned long long> &subset_addr,
638 const vector<unsigned long long> &array_shape,
char *src_buf)
640 BESDEBUG(
"dmrpp",
"DmrppArray::" << __func__ <<
"() - subsetAddress.size(): " << subset_addr.size() << endl);
642 unsigned int bytes_per_elem = prototype()->width();
644 char *dest_buf = get_buf();
// Constraint (start/stop/stride) for the dimension handled at this level.
646 unsigned int start = this->dimension_start(dim_iter,
true);
647 unsigned int stop = this->dimension_stop(dim_iter,
true);
648 unsigned int stride = this->dimension_stride(dim_iter,
true);
// Fast path: with a stride of one the values from start to stop are adjacent in the source,
// so the whole run is copied in one pass.
654 if (dim_iter == dim_end() && stride == 1) {
656 subset_addr.push_back(start);
657 unsigned long long start_index = get_index(subset_addr, array_shape);
658 subset_addr.pop_back();
660 subset_addr.push_back(stop);
661 unsigned long long stop_index = get_index(subset_addr, array_shape);
662 subset_addr.pop_back();
666 for (
unsigned long source_index = start_index; source_index <= stop_index; source_index++) {
667 unsigned long target_byte = *target_index * bytes_per_elem;
668 unsigned long source_byte = source_index * bytes_per_elem;
670 for (
unsigned long i = 0; i < bytes_per_elem; i++) {
671 dest_buf[target_byte++] = src_buf[source_byte++];
// General path: visit every selected index of this dimension.
678 for (
unsigned int myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {
// More dimensions to go: extend the address and recurse.
681 if (dim_iter != dim_end()) {
683 subset_addr.push_back(myDimIndex);
684 insert_constrained_contiguous(dim_iter, target_index, subset_addr, array_shape, src_buf);
685 subset_addr.pop_back();
// NOTE(review): get_index() returns unsigned long long; storing it in an unsigned int
// truncates the offset for arrays with more than UINT_MAX elements.
689 subset_addr.push_back(myDimIndex);
690 unsigned int sourceIndex = get_index(subset_addr, array_shape);
691 subset_addr.pop_back();
694 unsigned long target_byte = *target_index * bytes_per_elem;
695 unsigned long source_byte = sourceIndex * bytes_per_elem;
697 for (
unsigned int i = 0; i < bytes_per_elem; i++) {
698 dest_buf[target_byte++] = src_buf[source_byte++];
// Read a variable that is stored as exactly one chunk.
// The chunk is read directly when transfer threads are disabled or when its size does not
// exceed DmrppRequestHandler::d_contiguous_concurrent_threshold. Otherwise it is split into
// child chunks that are fetched by asynchronous tasks and copied into the one chunk's buffer.
// Afterwards the values are stored in this array: the whole buffer when the array is not
// projected, or the constrained subset via insert_constrained_contiguous().
725void DmrppArray::read_contiguous()
732 throw BESInternalError(
string(
"Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);
737 unsigned long long the_one_chunk_offset = the_one_chunk->get_offset();
738 unsigned long long the_one_chunk_size = the_one_chunk->get_size();
// Serial path.
743 if (!DmrppRequestHandler::d_use_transfer_threads || the_one_chunk_size <= DmrppRequestHandler::d_contiguous_concurrent_threshold) {
745 the_one_chunk->read_chunk();
// Concurrent path: the parent buffer is sized up front so that workers can copy into it.
750 the_one_chunk->set_rbuf_to_size();
// NOTE(review): if MB is an integer constant the division already truncates and floor() is a no-op.
// NOTE(review): num_chunks is 0 when the_one_chunk_size < MB, which would make the division and
// the `num_chunks - 1` loop bound below misbehave — confirm the threshold rules this out.
755 unsigned long long num_chunks = floor(the_one_chunk_size / MB);
756 if (num_chunks >= DmrppRequestHandler::d_max_transfer_threads)
757 num_chunks = DmrppRequestHandler::d_max_transfer_threads;
760 unsigned long long chunk_size = the_one_chunk_size / num_chunks;
761 std::string chunk_byteorder = the_one_chunk->get_byte_order();
// Bytes left over after the even split; they are added to the last child chunk.
765 unsigned long long chunk_remainder = the_one_chunk_size % num_chunks;
767 auto chunk_url = the_one_chunk->get_data_url();
770 queue<shared_ptr<Chunk>> chunks_to_read;
// All child chunks but the last have size chunk_size.
773 unsigned long long chunk_offset = the_one_chunk_offset;
774 for (
unsigned int i = 0; i < num_chunks - 1; i++) {
775 chunks_to_read.push(shared_ptr<Chunk>(
new Chunk(chunk_url, chunk_byteorder, chunk_size, chunk_offset)));
776 chunk_offset += chunk_size;
779 chunks_to_read.push(shared_ptr<Chunk>(
new Chunk(chunk_url, chunk_byteorder, chunk_size + chunk_remainder, chunk_offset)));
// Futures of the tasks that are currently running.
782 list<future<bool>> futures;
785 bool future_finished =
true;
788 if (!futures.empty())
789 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
793 BESDEBUG(dmrpp_3, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
795 if (!chunks_to_read.empty()) {
// Keep launching until a launch is refused or the queue is empty.
797 bool thread_started =
true;
798 while (thread_started && !chunks_to_read.empty()) {
799 auto current_chunk = chunks_to_read.front();
800 BESDEBUG(dmrpp_3, prolog <<
"Starting thread for " << current_chunk->to_string() << endl);
802 auto args = unique_ptr<one_child_chunk_args_new>(
new one_child_chunk_args_new(current_chunk, the_one_chunk));
803 thread_started = start_one_child_chunk_thread(futures, std::move(args));
// A chunk leaves the queue only once its task has really been started.
805 if (thread_started) {
806 chunks_to_read.pop();
807 BESDEBUG(dmrpp_3, prolog <<
"STARTED thread for " << current_chunk->to_string() << endl);
810 BESDEBUG(dmrpp_3, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
811 " transfer_thread_counter: " << transfer_thread_counter <<
812 " futures.size(): " << futures.size() << endl);
820 future_finished =
false;
// Drain: call get() on every still-valid future, taking them from the back of the list.
825 while (!futures.empty()) {
826 if (futures.back().valid())
827 futures.back().get();
836 if (!is_filters_empty()){
// No constraint: take the whole buffer.
841 if (!is_projected()) {
842 reserve_value_capacity(
get_size(
false));
843 val2buf(the_one_chunk->get_rbuf());
// Constrained: copy only the selected elements.
846 vector<unsigned long long> array_shape =
get_shape(
false);
849 reserve_value_capacity(
get_size(
true));
850 unsigned long target_index = 0;
851 vector<unsigned long long> subset;
853 insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, the_one_chunk->get_rbuf());
// Recursively copy the values of one chunk into this array's buffer for an unconstrained read.
//   chunk         source chunk (its read buffer holds the values)
//   dim           dimension handled at this recursion level
//   array_offset  element offset into the destination accumulated so far
//   array_shape   shape of the destination array
//   chunk_offset  element offset into the chunk accumulated so far
//   chunk_shape   shape of a chunk
//   chunk_origin  position of the chunk's first element within the array
// At the last dimension one memcpy() moves the whole inner-most run.
878void DmrppArray::insert_chunk_unconstrained(shared_ptr<Chunk> chunk,
unsigned int dim,
unsigned long long array_offset,
879 const vector<unsigned long long> &array_shape,
880 unsigned long long chunk_offset,
const vector<unsigned long long> &chunk_shape,
881 const vector<unsigned long long> &chunk_origin)
886 dimension thisDim = this->get_dimension(dim);
// Clip the chunk to the end of the array in this dimension.
887 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
888 if ((
unsigned) thisDim.stop < end_element) {
889 end_element = thisDim.stop;
892 unsigned long long chunk_end = end_element - chunk_origin[dim];
894 unsigned int last_dim = chunk_shape.size() - 1;
895 if (dim == last_dim) {
896 unsigned int elem_width = prototype()->width();
898 array_offset += chunk_origin[dim];
// Number of bytes in the (possibly clipped) inner-most run.
901 unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * elem_width;
902 char *source_buffer = chunk->get_rbuf();
903 char *target_buffer = get_buf();
904 memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);
// Not the last dimension: step through this dimension and recurse into the next one.
907 unsigned long mc = multiplier(chunk_shape, dim);
908 unsigned long ma = multiplier(array_shape, dim);
// NOTE(review): chunk_index is unsigned int while chunk_end is unsigned long long.
911 for (
unsigned int chunk_index = 0 ; chunk_index <= chunk_end; ++chunk_index) {
912 unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
913 unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));
916 insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape,
// Read all chunks of a chunked variable when no constraint applies.
// Chunks are grouped into SuperChunks: a chunk is offered to the current SuperChunk and, when
// add_chunk() refuses it, a new SuperChunk is started. The SuperChunks are then processed
// one after another, or concurrently when DmrppRequestHandler::d_use_transfer_threads is set.
933void DmrppArray::read_chunks_unconstrained()
936 throw BESInternalError(
string(
"Expected chunks for variable ") + name(), __FILE__, __LINE__);
// SuperChunk ids have the form "<variable name>-<running number>".
941 unsigned long long sc_count=0;
943 sc_id << name() <<
"-" << sc_count++;
944 queue<shared_ptr<SuperChunk>> super_chunks;
945 auto current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this)) ;
946 super_chunks.push(current_super_chunk);
950 bool added = current_super_chunk->add_chunk(chunk);
// The current SuperChunk did not take the chunk: open a new one and add the chunk there.
952 sc_id.str(std::string());
953 sc_id << name() <<
"-" << sc_count++;
954 current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this));
955 super_chunks.push(current_super_chunk);
// A fresh SuperChunk refusing its first chunk is reported as an error.
956 if (!current_super_chunk->add_chunk(chunk)) {
958 msg << prolog <<
"Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
966 const vector<unsigned long long> array_shape =
get_shape(
true);
970 BESDEBUG(dmrpp_3, prolog <<
"d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ?
"true" :
"false") << endl);
971 BESDEBUG(dmrpp_3, prolog <<
"d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
// Serial processing.
973 if (!DmrppRequestHandler::d_use_transfer_threads) {
974#if DMRPP_ENABLE_THREAD_TIMERS
976 sw.
start(prolog +
"Serial SuperChunk Processing.");
978 while(!super_chunks.empty()) {
979 auto super_chunk = super_chunks.front();
981 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(
true) << endl );
982 super_chunk->read_unconstrained();
// Concurrent processing.
986#if DMRPP_ENABLE_THREAD_TIMERS
987 stringstream timer_name;
988 timer_name << prolog <<
"Concurrent SuperChunk Processing. d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads;
990 sw.
start(timer_name.str());
992 read_super_chunks_unconstrained_concurrent(super_chunks,
this);
1013unsigned long long DmrppArray::get_chunk_start(
const dimension &thisDim,
unsigned int chunk_origin)
1016 unsigned long long first_element_offset = 0;
1017 if ((
unsigned) (thisDim.start) < chunk_origin) {
1019 if (thisDim.stride != 1) {
1021 first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
1023 if (first_element_offset != 0) {
1025 first_element_offset = thisDim.stride - first_element_offset;
1030 first_element_offset = thisDim.start - chunk_origin;
1033 return first_element_offset;
// Recursively decide whether `chunk` holds any element selected by the current constraint.
//   dim                     dimension examined at this recursion level
//   target_element_address  per-dimension address, updated while recursing
//   chunk                   candidate chunk
// A chunk is rejected as soon as one dimension's [start, stop] range misses it; reaching the
// last dimension means the chunk is needed.
1058DmrppArray::find_needed_chunks(
unsigned int dim, vector<unsigned long long> *target_element_address, shared_ptr<Chunk> chunk)
1060 BESDEBUG(dmrpp_3, prolog <<
" BEGIN, dim: " << dim << endl);
1066 const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();
1068 dimension thisDim = this->get_dimension(dim);
// Constraint lies entirely after or entirely before this chunk.
// NOTE(review): origin + shape is one past the chunk's last element, so '>' looks like it
// should be '>=' — confirm.
1071 if ((
unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) ||
1072 (
unsigned) thisDim.stop < chunk_origin[dim]) {
1077 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
// The first selected element falls beyond this chunk.
1080 if (chunk_start > chunk_shape[dim]) {
// Clip the chunk to the constraint's stop in this dimension.
1085 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1086 if ((
unsigned) thisDim.stop < end_element) {
1087 end_element = thisDim.stop;
1090 unsigned long long chunk_end = end_element - chunk_origin[dim];
1092 unsigned int last_dim = chunk_shape.size() - 1;
1093 if (dim == last_dim) {
1094 BESDEBUG(dmrpp_3, prolog <<
" END, This is the last_dim. chunk: " << chunk->to_string() << endl);
// Not the last dimension: try each selected index of this dimension in the next one.
1099 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1100 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1103 auto needed = find_needed_chunks(dim + 1, target_element_address, chunk);
1105 BESDEBUG(dmrpp_3, prolog <<
" END, Found chunk: " << needed->to_string() << endl);
1111 BESDEBUG(dmrpp_3, prolog <<
" END, dim: " << dim << endl);
// Recursively copy the constrained part of one chunk into this array's buffer.
//   target_element_address   address in the constrained result, updated while recursing
//   chunk_element_address    address within the chunk, updated while recursing
//   chunk                    source chunk (its read buffer holds the values)
//   constrained_array_shape  shape of the constrained result, used by get_index()
// At the last dimension a stride of one is copied with a single memcpy(); any other stride
// is copied one element at a time.
1135void DmrppArray::insert_chunk(
1137 vector<unsigned long long> *target_element_address,
1138 vector<unsigned long long> *chunk_element_address,
1139 shared_ptr<Chunk> chunk,
1140 const vector<unsigned long long> &constrained_array_shape){
1146 const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();
1148 dimension thisDim = this->get_dimension(dim);
1151 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
// Clip the chunk to the constraint's stop in this dimension.
1154 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1155 if ((
unsigned) thisDim.stop < end_element) {
1156 end_element = thisDim.stop;
1159 unsigned long long chunk_end = end_element - chunk_origin[dim];
1161 unsigned int last_dim = chunk_shape.size() - 1;
1162 if (dim == last_dim) {
1163 char *source_buffer = chunk->get_rbuf();
1164 char *target_buffer = get_buf();
1165 unsigned int elem_width = prototype()->width();
// Unit stride: the selected elements are adjacent, copy them in one block.
1167 if (thisDim.stride == 1) {
1169 unsigned long long start_element = chunk_origin[dim] + chunk_start;
1171 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
1174 (*target_element_address)[dim] = (start_element - thisDim.start);
1176 (*chunk_element_address)[dim] = chunk_start;
1179 unsigned long long target_char_start_index =
1180 get_index(*target_element_address, constrained_array_shape) * elem_width;
1181 unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1183 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index,
1184 chunk_constrained_inner_dim_bytes);
// Non-unit stride: copy element by element.
1188 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1190 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1193 (*chunk_element_address)[dim] = chunk_index;
// NOTE(review): these byte offsets are unsigned int here but unsigned long long in the
// unit-stride branch above; large arrays would overflow 32 bits.
1196 unsigned int target_char_start_index =
1197 get_index(*target_element_address, constrained_array_shape) * elem_width;
1198 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1200 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
// Not the last dimension: fix this dimension's index and recurse into the next one.
1206 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1207 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1208 (*chunk_element_address)[dim] = chunk_index;
1211 insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape);
// Read the chunks of a chunked variable that are needed for the current constraint.
// find_needed_chunks() selects the chunks; the selected ones are grouped into SuperChunks
// (a new SuperChunk is opened whenever add_chunk() refuses a chunk). An error is raised when
// no chunk matches the request. The SuperChunks are then processed one after another, or
// concurrently when DmrppRequestHandler::d_use_transfer_threads is set.
1222void DmrppArray::read_chunks()
1225 throw BESInternalError(
string(
"Expected chunks for variable ") + name(), __FILE__, __LINE__);
// SuperChunk ids have the form "<variable name>-<running number>".
1229 unsigned long long sc_count=0;
1231 sc_id << name() <<
"-" << sc_count++;
1232 queue<shared_ptr<SuperChunk>> super_chunks;
1233 auto current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this)) ;
1234 super_chunks.push(current_super_chunk);
1241 bool found_needed_chunks =
false;
1243 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
1244 auto needed = find_needed_chunks(0 , &target_element_address, chunk);
1246 found_needed_chunks =
true;
1247 bool added = current_super_chunk->add_chunk(chunk);
// The current SuperChunk did not take the chunk: open a new one and add the chunk there.
1249 sc_id.str(std::string());
1250 sc_id << name() <<
"-" << sc_count++;
1251 current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this));
1252 super_chunks.push(current_super_chunk);
// A fresh SuperChunk refusing its first chunk is reported as an error.
1253 if(!current_super_chunk->add_chunk(chunk)){
1255 msg << prolog <<
"Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
1261 BESDEBUG(dmrpp_3, prolog <<
"found_needed_chunks: " << (found_needed_chunks?
"true":
"false") << endl);
1262 if(!found_needed_chunks){
1263 throw BESInternalError(
"ERROR - Failed to locate any chunks that correspond to the requested data.", __FILE__, __LINE__);
// Make room for the constrained result before any chunk is inserted.
1266 reserve_value_capacity(
get_size(
true));
1268 BESDEBUG(dmrpp_3, prolog <<
"d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ?
"true" :
"false") << endl);
1269 BESDEBUG(dmrpp_3, prolog <<
"d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
1270 BESDEBUG(dmrpp_3, prolog <<
"d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ?
"true" :
"false") << endl);
1271 BESDEBUG(dmrpp_3, prolog <<
"d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);
1272 BESDEBUG(dmrpp_3, prolog <<
"SuperChunks.size(): " << super_chunks.size() << endl);
// Serial processing.
1274 if (!DmrppRequestHandler::d_use_transfer_threads) {
1277#if DMRPP_ENABLE_THREAD_TIMERS
1279 sw.
start(prolog +
"Serial SuperChunk Processing.");
1281 while (!super_chunks.empty()) {
1282 auto super_chunk = super_chunks.front();
1284 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(
true) << endl );
1285 super_chunk->read();
// Concurrent processing.
1289#if DMRPP_ENABLE_THREAD_TIMERS
1290 stringstream timer_name;
1291 timer_name << prolog <<
"Concurrent SuperChunk Processing. d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads;
1293 sw.
start(timer_name.str());
1295 read_super_chunks_concurrent(super_chunks,
this);
1301#ifdef USE_READ_SERIAL
// Serial variant of chunk insertion; only compiled when USE_READ_SERIAL is defined.
// Recursively walks the dimensions, reads the chunk when the last dimension is reached and
// copies the constrained part of it into this array's buffer.
// NOTE(review): this code uses vector<unsigned int> addresses and calls
// get_chunk_start(dim, chunk_origin), while get_index() takes vector<unsigned long long> and
// get_chunk_start() takes (const dimension &, unsigned int) — it presumably no longer
// compiles when USE_READ_SERIAL is defined; confirm before enabling.
1323void DmrppArray::insert_chunk_serial(
unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
1326 BESDEBUG(
"dmrpp", __func__ <<
" dim: "<< dim <<
" BEGIN "<< endl);
1332 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
1334 dimension thisDim = this->get_dimension(dim);
// Constraint lies entirely after or entirely before this chunk.
1337 if ((
unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (
unsigned) thisDim.stop < chunk_origin[dim]) {
1342 unsigned int first_element_offset = get_chunk_start(dim, chunk_origin);
// The first selected element falls beyond this chunk.
1345 if (first_element_offset > chunk_shape[dim]) {
// Clip the chunk to the constraint's stop in this dimension.
1350 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1351 if ((
unsigned) thisDim.stop < end_element) {
1352 end_element = thisDim.stop;
1355 unsigned long long chunk_start = first_element_offset;
1356 unsigned long long chunk_end = end_element - chunk_origin[dim];
1357 vector<unsigned int> constrained_array_shape =
get_shape(
true);
1359 unsigned int last_dim = chunk_shape.size() - 1;
1360 if (dim == last_dim) {
// The data is fetched only once the recursion has shown that the chunk is needed.
1362 chunk->read_chunk();
1366 char *source_buffer = chunk->get_rbuf();
1367 char *target_buffer = get_buf();
1368 unsigned int elem_width = prototype()->width();
// Unit stride: copy the adjacent elements in one block.
1370 if (thisDim.stride == 1) {
1372 unsigned long long start_element = chunk_origin[dim] + first_element_offset;
1374 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
1377 (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
1379 (*chunk_element_address)[dim] = first_element_offset;
1381 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
1382 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1384 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
// Non-unit stride: copy element by element.
1388 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1390 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1393 (*chunk_element_address)[dim] = chunk_index;
1395 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
1396 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1398 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
// Not the last dimension: fix this dimension's index and recurse into the next one.
1404 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1405 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1406 (*chunk_element_address)[dim] = chunk_index;
1409 insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
// Serial reader for chunked variables; only compiled when USE_READ_SERIAL is defined.
// Reserves room for the constrained result and then hands every chunk of the variable to
// insert_chunk_serial(), starting at dimension 0.
1414void DmrppArray::read_chunks_serial()
1416 BESDEBUG(
"dmrpp", __func__ <<
" for variable '" << name() <<
"' - BEGIN" << endl);
1418 vector<Chunk> &chunk_refs = get_chunk_vec();
1419 if (chunk_refs.size() == 0)
throw BESInternalError(
string(
"Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
1422 reserve_value_capacity(
get_size(
true));
1430 for (
unsigned long i = 0; i < chunk_refs.size(); i++) {
1431 Chunk &chunk = chunk_refs[i];
// The address within the chunk starts at all zeros; the target address starts at the chunk's
// position in the array.
1433 vector<unsigned int> chunk_source_address(dimensions(), 0);
1434 vector<unsigned int> target_element_address = chunk.get_position_in_array();
1437 insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
1442 BESDEBUG(
"dmrpp",
"DmrppArray::"<< __func__ <<
"() for " << name() <<
" END"<< endl);
// Set the send flag of this array; the new state is forwarded to Array::set_send_p().
1447DmrppArray::set_send_p(
bool state)
1452 Array::set_send_p(state);
// Read a String array that is stored as one contiguous chunk: the chunk is read, its buffer
// is interpreted as a C string and the resulting strings are stored with set_value().
// NOTE(review): constructing a std::string from a bare char * relies on the buffer being
// NUL-terminated — confirm the chunk buffer guarantees that.
1462void DmrppArray::read_contiguous_string()
1471 the_one_chunk->read_chunk();
1474 if (!is_filters_empty()){
1480 vector < string > ss;
1481 string s(
reinterpret_cast<char *
>(the_one_chunk->get_rbuf()));
1483 set_value(ss, ss.size());
1509 if (read_p())
return true;
1518 if ((var()->type() == dods_str_c || var()->type() == dods_url_c)) {
1522 throw BESInternalError(
"Subsetting of Sting Arrays is not currently supported.", __FILE__, __LINE__);
1525 throw BESInternalError(
"Only one dimensional String Arrays are currently supported.", __FILE__, __LINE__);
1528 read_contiguous_string();
1533 throw BESInternalError(
"Chunked String Array data is not currently supported.", __FILE__, __LINE__);
1546 if (!is_projected()) {
1547 read_chunks_unconstrained();
1555 int num = this->length();
1556 Type var_type = this->var()->type();
1560 case dods_uint16_c: {
1561 dods_uint16 *local =
reinterpret_cast<dods_uint16*
>(this->get_buf());
1563 *local = bswap_16(*local);
1569 case dods_uint32_c: {
1570 dods_uint32 *local =
reinterpret_cast<dods_uint32*
>(this->get_buf());;
1572 *local = bswap_32(*local);
1578 case dods_uint64_c: {
1579 dods_uint64 *local =
reinterpret_cast<dods_uint64*
>(this->get_buf());;
1581 *local = bswap_64(*local);
// Function object for std::for_each that writes one <Dim> element per array dimension.
// `d_constrained` selects between the constrained size (c_size) and the full size, and
// suppresses the dimension name unless the dimension has use_sdim_for_slice set.
// NOTE(review): std::unary_function was deprecated in C++11 and removed in C++17; the base
// class has to go before this file can be built as C++17.
1597class PrintD4ArrayDimXMLWriter :
public unary_function<Array::dimension &, void> {
1604 PrintD4ArrayDimXMLWriter(XMLWriter &xml,
bool c) :
1605 xml(xml), d_constrained(c)
// Write the <Dim> element for dimension `d`; throws InternalErr when libxml2 reports an error.
1609 void operator()(Array::dimension &d)
1615 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar *)
"Dim") < 0)
1616 throw InternalErr(__FILE__, __LINE__,
"Could not write Dim element");
// Prefer the fully qualified name of the shared dimension when there is one.
1618 string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
1621 if (!d_constrained && !name.empty()) {
1622 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"name",
1623 (
const xmlChar *) name.c_str()) < 0)
1624 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1626 else if (d.use_sdim_for_slice) {
1627 assert(!name.empty());
1628 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"name",
1629 (
const xmlChar *) name.c_str()) < 0)
1630 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
// NOTE(review): the error text below says "name" although the attribute written is "size".
1634 size << (d_constrained ? d.c_size : d.size);
1635 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"size",
1636 (
const xmlChar *) size.str().c_str()) < 0)
1637 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1640 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
1641 throw InternalErr(__FILE__, __LINE__,
"Could not end Dim element");
// Function object for std::for_each that prints each member variable of a constructor type
// by calling its print_dap4() with the stored writer and constrained flag.
// NOTE(review): std::unary_function was deprecated in C++11 and removed in C++17.
1645class PrintD4ConstructorVarXMLWriter :
public unary_function<BaseType *, void> {
1649 PrintD4ConstructorVarXMLWriter(XMLWriter &xml,
bool c) :
1650 xml(xml), d_constrained(c)
1654 void operator()(BaseType *btp)
1656 btp->print_dap4(xml, d_constrained);
// Function object for std::for_each over the D4Map objects of an array; it is constructed
// with the XMLWriter to print to and is invoked once per map.
// NOTE(review): std::unary_function was deprecated in C++11 and removed in C++17.
1660class PrintD4MapXMLWriter :
public unary_function<D4Map *, void> {
1664 PrintD4MapXMLWriter(XMLWriter &xml) :
1669 void operator()(D4Map *m)
// Serializes this array as a DAP4 XML element on `xml`.
// For constrained output, an array for which send_p() is false writes nothing.
1701 if (constrained && !send_p())
return;
// Open an element whose tag is the type name of the array's element variable, var().
// NOTE(review): the error text uses this object's type_name() rather than
// var()->type_name(), so it may name a different type than the tag that failed — confirm.
1703 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar *) var()->type_name().c_str()) < 0)
1704 throw InternalErr(__FILE__, __LINE__,
"Could not write " + type_name() +
" element");
// The name attribute is written only when the array has a non-empty name.
1706 if (!name().empty())
1707 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"name", (
const xmlChar *) name().c_str()) <
1709 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
// Enum element type: also write an "enum" attribute holding the enumeration's
// name, prefixed with the FQN() of the group reached through
// enumeration()->parent()->parent() when the enumeration has a parent.
1712 if (var()->type() == dods_enum_c) {
1713 D4Enum *e =
static_cast<D4Enum *
>(var());
1714 string path = e->enumeration()->name();
1715 if (e->enumeration()->parent()) {
1717 path =
static_cast<D4Group *
>(e->enumeration()->parent()->parent())->FQN() + path;
1719 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"enum", (
const xmlChar *) path.c_str()) < 0)
1720 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for enum");
// Constructor element type: print every member variable of the prototype,
// forwarding the constrained flag.
1723 if (prototype()->is_constructor_type()) {
1724 Constructor &c =
static_cast<Constructor &
>(*prototype());
1725 for_each(c.var_begin(), c.var_end(), PrintD4ConstructorVarXMLWriter(xml, constrained));
// One Dim element per dimension, then the attributes, then the maps.
1730 for_each(dim_begin(), dim_end(), PrintD4ArrayDimXMLWriter(xml, constrained));
1732 attributes()->print_dap4(xml);
1734 for_each(maps()->map_begin(), maps()->map_end(), PrintD4MapXMLWriter(xml));
// Value encoding, selected by the element type.
1751 switch (var()->type()) {
// Numeric element types (the float cases are the last labels of this group):
// buf2val() fills `values` and returns a size; the buffer and that size are
// passed to Base64::encode, producing a single encoded string.
1765 case dods_float32_c:
1766 case dods_float64_c: {
1767 u_int8_t *values = 0;
1769 size_t size = buf2val(
reinterpret_cast<void **
>(&values));
1770 string encoded = base64::Base64::encode(values, size);
// String-like element types: buf2val() is called only for the buffer it fills
// (its return value is unused); each of the length() elements is then copied
// into `str` and base64-encoded on its own.
1786 buf2val(
reinterpret_cast<void **
>(&values));
1788 for (
int i = 0; i < length(); ++i) {
1789 str = (*(
static_cast<string *
> (values) + i));
1790 string encoded = base64::Base64::encode(
reinterpret_cast<const u_int8_t *
>(str.c_str()), str.size());
// Presumably the switch's fallback for an unhandled element type.
// NOTE(review): the message names Vector::val2buf, while the code here calls
// buf2val(); it looks copied from elsewhere — confirm and correct the text.
1803 throw InternalErr(__FILE__, __LINE__,
"Vector::val2buf: bad type");
// Close the element opened at the top.
1806 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
1807 throw InternalErr(__FILE__, __LINE__,
"Could not end " + type_name() +
" element");
// Writes a debugging description of this object to `strm`.
// Output uses the BESIndent left margin and is const with respect to the object.
1810void DmrppArray::dump(ostream &strm)
const
// Header line: "DmrppArray::<function name>(<address of this>)".
1812 strm << BESIndent::LMarg <<
"DmrppArray::" << __func__ <<
"(" << (
void *)
this <<
")" << endl;
// Everything until UnIndent() is printed one indent level deeper.
1813 BESIndent::Indent();
// State held by the DmrppCommon part of this object.
1814 DmrppCommon::dump(strm);
// The array's values are not printed; a fixed "----" placeholder is written instead.
1816 strm << BESIndent::LMarg <<
"value: " <<
"----" << endl;
1817 BESIndent::UnIndent();
static bool IsSet(const std::string &flagName)
See if the debug context flagName is set to true.
Exception thrown if an internal error is encountered.
virtual bool start(std::string name)
virtual std::vector< unsigned long long > get_shape(bool constrained)
Get the array shape.
virtual unsigned long long get_size(bool constrained=false)
Return the total number of elements in this Array.
bool read() override
Read data for the array.
virtual void print_dap4(libdap::XMLWriter &writer, bool constrained=false)
Shadow libdap::Array::print_dap4() - optionally prints DMR++ chunk information.
static std::string d_ns_prefix
The XML namespace prefix to use.
virtual bool twiddle_bytes() const
Returns true if this object utilizes shuffle compression.
static bool d_print_chunks
If true, print_dap4() prints chunk elements.
virtual bool is_compact_layout() const
Returns true if this object utilizes COMPACT layout.
virtual void load_attributes(libdap::BaseType *btp)
Load the attribute information for this variable.
void print_compact_element(libdap::XMLWriter &xml, const std::string &name_space="", const std::string &encoded="")
Print the Compact base64-encoded information.
virtual bool get_chunks_loaded() const
Have the chunks been loaded?
virtual size_t get_chunks_size() const
Use this when the number of chunks is needed.
void print_chunks_element(libdap::XMLWriter &xml, const std::string &name_space="")
Print the Chunk information.
virtual const std::vector< std::shared_ptr< Chunk > > & get_immutable_chunks() const
A const reference to the vector of chunks.
virtual bool get_uses_fill_value() const
virtual const std::vector< unsigned long long > & get_chunk_dimension_sizes() const
The chunk dimension sizes held in a const vector.
virtual void load_chunks(libdap::BaseType *btp)
Load chunk information for this variable.
virtual unsigned long long get_chunk_size_in_elements() const
Get the number of elements in this chunk.
virtual std::string get_filters() const
Return the names of all the filters in the order they were applied.
virtual bool get_attributes_loaded() const
Have the attributes been loaded?