DmrppArray.cc
// -*- mode: c++; c-basic-offset:4 -*-

// This file is part of the BES

// Copyright (c) 2016 OPeNDAP, Inc.
// Author: James Gallagher <jgallagher@opendap.org>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
//
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.

#include "config.h"

#include <string>
#include <sstream>
#include <vector>
#include <memory>
#include <queue>
#include <iterator>
#include <thread>
#include <future> // std::async, std::future
#include <chrono> // std::chrono::milliseconds

#include <cstring>
#include <cassert>
#include <cerrno>

#include <pthread.h>
#include <cmath>

#include <unistd.h>

#include <libdap/D4Enum.h>
#include <libdap/D4Attributes.h>
#include <libdap/D4Maps.h>
#include <libdap/D4Group.h>

#include "BESInternalError.h"
#include "BESDebug.h"
#include "BESLog.h"
#include "BESStopWatch.h"

#include "byteswap_compat.h"
#include "CurlHandlePool.h"
#include "Chunk.h"
#include "DmrppArray.h"
#include "DmrppRequestHandler.h"
#include "DmrppNames.h"
#include "Base64.h"

// Used with BESDEBUG
#define dmrpp_3 "dmrpp:3"
#define dmrpp_4 "dmrpp:4"

using namespace libdap;
using namespace std;

#define MB (1024*1024)
#define prolog std::string("DmrppArray::").append(__func__).append("() - ")

namespace dmrpp {


// Transfer Thread Pool state variables.
std::mutex transfer_thread_pool_mtx; // mutex for critical section
atomic_uint transfer_thread_counter(0);

bool get_next_future(list<std::future<bool>> &futures, atomic_uint &thread_counter, unsigned long timeout, string debug_prefix) {
    bool future_finished = false;
    bool done = false;
    std::chrono::milliseconds timeout_ms (timeout);

    while(!done){
        auto futr = futures.begin();
        auto fend = futures.end();
        bool future_is_valid = true;
        while(!future_finished && future_is_valid && futr != fend){
            future_is_valid = (*futr).valid();
            if(future_is_valid){
                // What happens if wait_for() always returns future_status::timeout for a stuck thread?
                // If that were to happen, the loop would run forever. However, we assume that these
                // threads are never 'stuck.' We assume that their computations always complete, either
                // with success or failure. For the transfer threads, timeouts will stop them if nothing
                // else does and for the decompression threads, the worst case is a segmentation fault.
                // jhrg 2/5/21
                if((*futr).wait_for(timeout_ms) != std::future_status::timeout){
                    try {
                        bool success = (*futr).get();
                        future_finished = true;
                        BESDEBUG(dmrpp_3, debug_prefix << prolog << "Called future::get() on a ready future."
                            << " success: " << (success?"true":"false") << endl);
                        if(!success){
                            stringstream msg;
                            msg << debug_prefix << prolog << "The std::future has failed!";
                            msg << " thread_counter: " << thread_counter;
                            throw BESInternalError(msg.str(), __FILE__, __LINE__);
                        }
                    }
                    catch(...){
                        // TODO I had to add this to make the thread counting work when there are errors.
                        //  But I think it's primitive because it trashes everything - there's
                        //  surely a way to handle the situation on a per-thread basis and maybe even
                        //  retry?
                        futures.clear();
                        thread_counter=0;
                        throw;
                    }
                }
                else {
                    futr++;
                    BESDEBUG(dmrpp_3, debug_prefix << prolog << "future::wait_for() timed out. (timeout: " <<
                        timeout << " ms) There are currently " << futures.size() << " futures in process."
                        << " thread_counter: " << thread_counter << endl);
                }
            }
            else {
                BESDEBUG(dmrpp_3, debug_prefix << prolog << "The future was not valid. Dumping... " << endl);
                future_finished = true;
            }
        }

        if (futr!=fend && future_finished) {
            futures.erase(futr);
            thread_counter--;
            BESDEBUG(dmrpp_3, debug_prefix << prolog << "Erased future from futures list. (Erased future was "
                << (future_is_valid?"":"not ") << "valid at start.) There are currently " <<
                futures.size() << " futures in process. thread_counter: " << thread_counter << endl);
        }

        done = future_finished || futures.empty();
    }

    return future_finished;
}
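
// Usage sketch (editorial note, mirroring the pattern used by the read_*
// functions below): callers keep a list of futures, retiring finished ones
// with get_next_future() and launching new work while the transfer thread
// pool has room. Here 'work_queue' is a stand-in for the queue of chunks or
// SuperChunks used by those functions:
//
//     list<future<bool>> futures;
//     while (!work_queue.empty() || !futures.empty()) {
//         if (!futures.empty())
//             get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
//         // ... start more transfer threads while transfer_thread_counter
//         // is below DmrppRequestHandler::d_max_transfer_threads ...
//     }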


bool one_child_chunk_thread_new(unique_ptr<one_child_chunk_args_new> args)
{

    args->child_chunk->read_chunk();

    assert(args->the_one_chunk->get_rbuf());
    assert(args->child_chunk->get_rbuf());
    assert(args->child_chunk->get_bytes_read() == args->child_chunk->get_size());

    // the_one_chunk offset \/
    // the_one_chunk:       mmmmmmmmmmmmmmmm
    // child chunks:        1111222233334444 (there are four child chunks)
    // child offsets:       ^   ^   ^   ^
    // For this example, child_1_offset - the_one_chunk_offset == 0 (that's always true),
    // child_2_offset - the_one_chunk_offset == 4, child_3_offset - the_one_chunk_offset == 8,
    // and child_4_offset - the_one_chunk_offset == 12.
    // Those are the starting locations within the data buffer of the_one_chunk
    // where each child chunk should be written.
    // Note: all the offset values start at the beginning of the file.

    unsigned long long offset_within_the_one_chunk = args->child_chunk->get_offset() - args->the_one_chunk->get_offset();

    memcpy(args->the_one_chunk->get_rbuf() + offset_within_the_one_chunk, args->child_chunk->get_rbuf(),
           args->child_chunk->get_bytes_read());

    return true;
}
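
// Worked example (editorial note): if the_one_chunk->get_offset() is 1000 and
// a child chunk has offset 1500 with 500 bytes read, then
// offset_within_the_one_chunk == 1500 - 1000 == 500, so the child's bytes are
// copied into bytes [500, 1000) of the_one_chunk's read buffer.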


bool one_super_chunk_transfer_thread(unique_ptr<one_super_chunk_args> args)
{

#if DMRPP_ENABLE_THREAD_TIMERS
    stringstream timer_tag;
    timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
        " parent_tid: 0x" << std::hex << args->parent_thread_id << " sc_id: " << args->super_chunk->id();
    BESStopWatch sw(TRANSFER_THREADS);
    sw.start(timer_tag.str());
#endif

    args->super_chunk->read();
    return true;
}

bool one_super_chunk_unconstrained_transfer_thread(unique_ptr<one_super_chunk_args> args)
{

#if DMRPP_ENABLE_THREAD_TIMERS
    stringstream timer_tag;
    timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
        " parent_tid: 0x" << std::hex << args->parent_thread_id << " sc_id: " << args->super_chunk->id();
    BESStopWatch sw(TRANSFER_THREADS);
    sw.start(timer_tag.str());
#endif

    args->super_chunk->read_unconstrained();
    return true;
}

bool start_one_child_chunk_thread(list<std::future<bool>> &futures, unique_ptr<one_child_chunk_args_new> args) {
    bool retval = false;
    std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
    if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
        transfer_thread_counter++;
        // Capture the chunk description before std::move() empties args.
        string chunk_info = args->child_chunk->to_string();
        futures.push_back(std::async(std::launch::async, one_child_chunk_thread_new, std::move(args)));
        retval = true;
        BESDEBUG(dmrpp_3, prolog << "Got std::future '" << futures.size() <<
            "' from std::async for " << chunk_info << endl);
    }
    return retval;
}

bool start_super_chunk_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
    bool retval = false;
    std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
    if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
        transfer_thread_counter++;
        // Capture the SuperChunk description before std::move() empties args.
        string sc_info = args->super_chunk->to_string(false);
        futures.push_back(std::async(std::launch::async, one_super_chunk_transfer_thread, std::move(args)));
        retval = true;
        BESDEBUG(dmrpp_3, prolog << "Got std::future '" << futures.size() <<
            "' from std::async for " << sc_info << endl);
    }
    return retval;
}

bool start_super_chunk_unconstrained_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
    bool retval = false;
    std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
    if(transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
        transfer_thread_counter++;
        futures.push_back(std::async(std::launch::async, one_super_chunk_unconstrained_transfer_thread, std::move(args)));
        retval = true;
        BESDEBUG(dmrpp_3, prolog << "Got std::future '" << futures.size() <<
            "' from std::async, transfer_thread_counter: " << transfer_thread_counter << endl);
    }
    return retval;
}


void read_super_chunks_unconstrained_concurrent(queue<shared_ptr<SuperChunk>> &super_chunks, DmrppArray *array)
{
    BESStopWatch sw;
    if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + " name: "+array->name(), "");

    // Parallel version based on read_chunks_unconstrained(). There is
    // substantial duplication of code between the two; consolidating them
    // is deferred.

    // We maintain a list of futures to track our parallel activities.
    list<future<bool>> futures;
    try {
        bool done = false;
        bool future_finished = true;
        while (!done) {

            if(!futures.empty())
                future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

            // If future_finished is true this means that the transfer_thread_counter has been decremented,
            // because future::get() was called or a call to future::valid() returned false.
            BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);

            if (!super_chunks.empty()){
                // Next we try to add a new Chunk compute thread if we can - there might be room.
                bool thread_started = true;
                while(thread_started && !super_chunks.empty()) {
                    auto super_chunk = super_chunks.front();
                    BESDEBUG(dmrpp_3, prolog << "Starting thread for " << super_chunk->to_string(false) << endl);

                    auto args = unique_ptr<one_super_chunk_args>(new one_super_chunk_args(super_chunk, array));
                    thread_started = start_super_chunk_unconstrained_transfer_thread(futures, std::move(args));

                    if (thread_started) {
                        super_chunks.pop();
                        BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << super_chunk->to_string(false) << endl);
                    } else {
                        // Thread did not start, ownership of the arguments was not passed to the thread.
                        BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, Chunk remains in queue." <<
                            " transfer_thread_counter: " << transfer_thread_counter <<
                            " futures.size(): " << futures.size() << endl);
                    }
                }
            }
            else {
                // No more Chunks and no futures means we're done here.
                if(futures.empty())
                    done = true;
            }
            future_finished = false;
        }
    }
    catch (...) {
        // Complete all the futures, otherwise we'll have threads out there using up resources
        while(!futures.empty()){
            if(futures.back().valid())
                futures.back().get();
            futures.pop_back();
        }
        // re-throw the exception
        throw;
    }
}



void read_super_chunks_concurrent(queue<shared_ptr<SuperChunk>> &super_chunks, DmrppArray *array)
{
    BESStopWatch sw;
    if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + " name: "+array->name(), "");

    // Parallel version based on read_chunks(). There is substantial
    // duplication of code between this and read_super_chunks_unconstrained_concurrent();
    // consolidating the two is deferred.

    // We maintain a list of futures to track our parallel activities.
    list<future<bool>> futures;
    try {
        bool done = false;
        bool future_finished = true;
        while (!done) {

            if(!futures.empty())
                future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

            // If future_finished is true this means that the transfer_thread_counter has been decremented,
            // because future::get() was called or a call to future::valid() returned false.
            BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);

            if (!super_chunks.empty()){
                // Next we try to add a new Chunk compute thread if we can - there might be room.
                bool thread_started = true;
                while(thread_started && !super_chunks.empty()) {
                    auto super_chunk = super_chunks.front();
                    BESDEBUG(dmrpp_3, prolog << "Starting thread for " << super_chunk->to_string(false) << endl);

                    auto args = unique_ptr<one_super_chunk_args>(new one_super_chunk_args(super_chunk, array));
                    thread_started = start_super_chunk_transfer_thread(futures, std::move(args));

                    if (thread_started) {
                        super_chunks.pop();
                        BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << super_chunk->to_string(false) << endl);
                    } else {
                        // Thread did not start, ownership of the arguments was not passed to the thread.
                        BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, Chunk remains in queue." <<
                            " transfer_thread_counter: " << transfer_thread_counter <<
                            " futures.size(): " << futures.size() << endl);
                    }
                }
            }
            else {
                // No more Chunks and no futures means we're done here.
                if(futures.empty())
                    done = true;
            }
            future_finished = false;
        }
    }
    catch (...) {
        // Complete all the futures, otherwise we'll have threads out there using up resources
        while(!futures.empty()){
            if(futures.back().valid())
                futures.back().get();
            futures.pop_back();
        }
        // re-throw the exception
        throw;
    }
}

static unsigned long long
get_index(const vector<unsigned long long> &address_in_target, const vector<unsigned long long> &target_shape)
{
    assert(address_in_target.size() == target_shape.size()); // ranks must be equal

    auto shape_index = target_shape.rbegin();
    auto index = address_in_target.rbegin(), index_end = address_in_target.rend();

    unsigned long long multiplier_var = *shape_index++;
    unsigned long long offset = *index++;

    while (index != index_end) {
        assert(*index < *shape_index); // index < shape for each dim

        offset += multiplier_var * *index++;
        multiplier_var *= *shape_index++;
    }

    return offset;
}

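// Worked example (editorial note): get_index() computes the usual row-major
// linear offset. For target_shape = {4, 3, 2} and address_in_target = {1, 2, 1}:
//
//     offset = 1*(3*2) + 2*2 + 1 == 11
//
// i.e., get_index({1, 2, 1}, {4, 3, 2}) returns 11.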

static unsigned long multiplier(const vector<unsigned long long> &shape, unsigned int k)
{
    assert(shape.size() > 1);
    assert(shape.size() > k + 1);

    vector<unsigned long long>::const_iterator i = shape.begin(), e = shape.end();
    advance(i, k + 1);
    unsigned long multiplier = *i++;
    while (i != e) {
        multiplier *= *i++;
    }

    return multiplier;
}

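// Worked example (editorial note): multiplier() returns the number of elements
// spanned by a unit step in dimension k, i.e., the product of the dimension
// sizes to the right of k. For shape = {10, 20, 30}:
//
//     multiplier(shape, 0) == 20 * 30 == 600
//     multiplier(shape, 1) == 30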
//#####################################################################################################################
//
// DmrppArray code begins here.
//
// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

DmrppArray &
DmrppArray::operator=(const DmrppArray &rhs)
{
    if (this == &rhs) return *this;

    dynamic_cast<Array &>(*this) = rhs; // run Array::operator=

    dynamic_cast<DmrppCommon &>(*this) = rhs;
    // Removed DmrppCommon::m_duplicate_common(rhs); jhrg 11/12/21

    return *this;
}

bool DmrppArray::is_projected()
{
    for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
        if (dimension_size(p, true) != dimension_size(p, false)) return true;

    return false;
}

unsigned long long DmrppArray::get_size(bool constrained)
{
    // number of array elements in the constrained array
    unsigned long long size = 1;
    for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
        size *= dimension_size(dim, constrained);
    }
    return size;
}

vector<unsigned long long> DmrppArray::get_shape(bool constrained)
{
    auto dim = dim_begin(), edim = dim_end();
    vector<unsigned long long> shape;

    // For a 3d array, this method took 14ms without reserve(), 5ms with
    // (when called many times).
    shape.reserve(edim - dim);

    for (; dim != edim; dim++) {
        shape.push_back(dimension_size(dim, constrained));
    }

    return shape;
}

DmrppArray::dimension DmrppArray::get_dimension(unsigned int i)
{
    assert(i < (dim_end() - dim_begin())); // i must be a valid dimension index
    return *(dim_begin() + i);
}

void DmrppArray::insert_constrained_contiguous(Dim_iter dim_iter, unsigned long *target_index,
                                               vector<unsigned long long> &subset_addr,
                                               const vector<unsigned long long> &array_shape, char /*Chunk*/*src_buf)
{
    BESDEBUG("dmrpp", "DmrppArray::" << __func__ << "() - subsetAddress.size(): " << subset_addr.size() << endl);

    unsigned int bytes_per_elem = prototype()->width();

    char *dest_buf = get_buf();

    unsigned int start = this->dimension_start(dim_iter, true);
    unsigned int stop = this->dimension_stop(dim_iter, true);
    unsigned int stride = this->dimension_stride(dim_iter, true);

    dim_iter++;

    // The end case for the recursion is dim_iter == dim_end(); stride == 1 is an optimization.
    // See the else clause for the general case.
    if (dim_iter == dim_end() && stride == 1) {
        // For the start and stop indexes of the subset, get the matching indexes in the whole array.
        subset_addr.push_back(start);
        unsigned long long start_index = get_index(subset_addr, array_shape);
        subset_addr.pop_back();

        subset_addr.push_back(stop);
        unsigned long long stop_index = get_index(subset_addr, array_shape);
        subset_addr.pop_back();

        // Copy data block from start_index to stop_index
        // TODO Replace this loop with a call to std::memcpy()
        for (unsigned long source_index = start_index; source_index <= stop_index; source_index++) {
            unsigned long target_byte = *target_index * bytes_per_elem;
            unsigned long source_byte = source_index * bytes_per_elem;
            // Copy a single value.
            for (unsigned long i = 0; i < bytes_per_elem; i++) {
                dest_buf[target_byte++] = src_buf[source_byte++];
            }
            (*target_index)++;
        }

    }
    else {
        for (unsigned int myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {

            // Is it the last dimension?
            if (dim_iter != dim_end()) {
                // Nope! Then we recurse to the last dimension to read stuff
                subset_addr.push_back(myDimIndex);
                insert_constrained_contiguous(dim_iter, target_index, subset_addr, array_shape, src_buf);
                subset_addr.pop_back();
            }
            else {
                // We are at the last (innermost) dimension, so it's time to copy values.
                subset_addr.push_back(myDimIndex);
                unsigned int sourceIndex = get_index(subset_addr, array_shape);
                subset_addr.pop_back();

                // Copy a single value.
                unsigned long target_byte = *target_index * bytes_per_elem;
                unsigned long source_byte = sourceIndex * bytes_per_elem;

                for (unsigned int i = 0; i < bytes_per_elem; i++) {
                    dest_buf[target_byte++] = src_buf[source_byte++];
                }
                (*target_index)++;
            }
        }
    }
}

void DmrppArray::read_contiguous()
{
    BESStopWatch sw;
    if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + " name: "+name(), "");

    // Get the single chunk that makes up this CONTIGUOUS variable.
    if (get_chunks_size() != 1)
        throw BESInternalError(string("Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);

    // This is the original chunk for this 'contiguous' variable.
    auto the_one_chunk = get_immutable_chunks()[0];

    unsigned long long the_one_chunk_offset = the_one_chunk->get_offset();
    unsigned long long the_one_chunk_size = the_one_chunk->get_size();

    // We only want to read in the Chunk concurrently if:
    // - Concurrent transfers are enabled (DmrppRequestHandler::d_use_transfer_threads)
    // - The variable's size is above the threshold value held in DmrppRequestHandler::d_contiguous_concurrent_threshold
    if (!DmrppRequestHandler::d_use_transfer_threads || the_one_chunk_size <= DmrppRequestHandler::d_contiguous_concurrent_threshold) {
        // Read the the_one_chunk as is. This is the non-parallel I/O case
        the_one_chunk->read_chunk();
    }
    else {
        // Allocate memory for 'the_one_chunk' so the transfer threads can transfer data
        // from the child chunks to it.
        the_one_chunk->set_rbuf_to_size();

        // The number of child chunks is determined by the size of the data.
        // If the size of the_one_chunk is 3 MB, then 3 chunks will be made. We will round down
        // when necessary and handle the remainder later on (3.3MB = 3 chunks, 4.2MB = 4 chunks, etc.)
        unsigned long long num_chunks = floor(the_one_chunk_size / MB);
        if (num_chunks >= DmrppRequestHandler::d_max_transfer_threads)
            num_chunks = DmrppRequestHandler::d_max_transfer_threads;

        // Use the original chunk's size and offset to evenly split it into smaller chunks
        unsigned long long chunk_size = the_one_chunk_size / num_chunks;
        std::string chunk_byteorder = the_one_chunk->get_byte_order();

        // If the size of the_one_chunk is not evenly divisible by num_chunks, capture
        // the remainder here and increase the size of the last chunk by this number of bytes.
        unsigned long long chunk_remainder = the_one_chunk_size % num_chunks;

        auto chunk_url = the_one_chunk->get_data_url();

        // Set up a queue to break up the original the_one_chunk and keep track of the pieces
        queue<shared_ptr<Chunk>> chunks_to_read;

        // Make the Chunk objects
        unsigned long long chunk_offset = the_one_chunk_offset;
        for (unsigned int i = 0; i < num_chunks - 1; i++) {
            chunks_to_read.push(shared_ptr<Chunk>(new Chunk(chunk_url, chunk_byteorder, chunk_size, chunk_offset)));
            chunk_offset += chunk_size;
        }
        // Make the remainder Chunk, see above for details.
        chunks_to_read.push(shared_ptr<Chunk>(new Chunk(chunk_url, chunk_byteorder, chunk_size + chunk_remainder, chunk_offset)));
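
        // Worked example (editorial note): a 10,485,760-byte (10 MB) variable
        // with d_max_transfer_threads == 4 gives num_chunks == 4 and four
        // 2,621,440-byte child chunks. A 4,404,019-byte variable gives
        // num_chunks == 4, chunk_size == 1,101,004, and a 3-byte remainder
        // that is added to the last child chunk.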

        // We maintain a list of futures to track our parallel activities.
        list<future<bool>> futures;
        try {
            bool done = false;
            bool future_finished = true;
            while (!done) {

                if (!futures.empty())
                    future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

                // If future_finished is true this means that the transfer_thread_counter has been decremented,
                // because future::get() was called or a call to future::valid() returned false.
                BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);

                if (!chunks_to_read.empty()) {
                    // Next we try to add a new Chunk compute thread if we can - there might be room.
                    bool thread_started = true;
                    while (thread_started && !chunks_to_read.empty()) {
                        auto current_chunk = chunks_to_read.front();
                        BESDEBUG(dmrpp_3, prolog << "Starting thread for " << current_chunk->to_string() << endl);

                        auto args = unique_ptr<one_child_chunk_args_new>(new one_child_chunk_args_new(current_chunk, the_one_chunk));
                        thread_started = start_one_child_chunk_thread(futures, std::move(args));

                        if (thread_started) {
                            chunks_to_read.pop();
                            BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << current_chunk->to_string() << endl);
                        } else {
                            // Thread did not start, ownership of the arguments was not passed to the thread.
                            BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, Chunk remains in queue." <<
                                " transfer_thread_counter: " << transfer_thread_counter <<
                                " futures.size(): " << futures.size() << endl);
                        }
                    }
                } else {
                    // No more Chunks and no futures means we're done here.
                    if (futures.empty())
                        done = true;
                }
                future_finished = false;
            }
        }
        catch (...) {
            // Complete all the futures, otherwise we'll have threads out there using up resources
            while (!futures.empty()) {
                if (futures.back().valid())
                    futures.back().get();
                futures.pop_back();
            }
            // re-throw the exception
            throw;
        }
    }

    // Now that the_one_chunk has been read, we do what is necessary...
    if (!is_filters_empty()){
        the_one_chunk->filter_chunk(get_filters(), get_chunk_size_in_elements(), var()->width());
    }

    // The 'the_one_chunk' now holds the data values. Transfer it to the Array.
    if (!is_projected()) { // if there is no projection constraint
        reserve_value_capacity(get_size(false));
        val2buf(the_one_chunk->get_rbuf()); // yes, it's not type-safe
    }
    else { // apply the constraint
        vector<unsigned long long> array_shape = get_shape(false);

        // Reserve space in this array for the constrained size of the data request
        reserve_value_capacity(get_size(true));
        unsigned long target_index = 0;
        vector<unsigned long long> subset;

        insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, the_one_chunk->get_rbuf());
    }

    set_read_p(true);
}

void DmrppArray::insert_chunk_unconstrained(shared_ptr<Chunk> chunk, unsigned int dim, unsigned long long array_offset,
                                            const vector<unsigned long long> &array_shape,
                                            unsigned long long chunk_offset, const vector<unsigned long long> &chunk_shape,
                                            const vector<unsigned long long> &chunk_origin)
{
    // Now we figure out the correct last element. It's possible that a
    // chunk 'extends beyond' the Array bounds. Here 'end_element' is the
    // last element of the destination array.
    dimension thisDim = this->get_dimension(dim);
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    unsigned long long chunk_end = end_element - chunk_origin[dim];

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        unsigned int elem_width = prototype()->width();

        array_offset += chunk_origin[dim];

        // Compute how much we are going to copy
        unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * elem_width;
        char *source_buffer = chunk->get_rbuf();
        char *target_buffer = get_buf();
        memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);
    }
    else {
        unsigned long mc = multiplier(chunk_shape, dim);
        unsigned long ma = multiplier(array_shape, dim);

        // Not the last dimension, so we continue to proceed down the Recursion Branch.
        for (unsigned int chunk_index = 0 /*chunk_start*/; chunk_index <= chunk_end; ++chunk_index) {
            unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
            unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));

            // Re-entry here:
            insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape,
                                       chunk_origin);
        }
    }
}

void DmrppArray::read_chunks_unconstrained()
{
    if (get_chunks_size() < 2)
        throw BESInternalError(string("Expected two or more chunks for variable ") + name(), __FILE__, __LINE__);

    // Find all the required chunks to read. I used a queue to preserve the chunk order, which
    // made using a debugger easier. However, order does not matter, AFAIK.

    unsigned long long sc_count=0;
    stringstream sc_id;
    sc_id << name() << "-" << sc_count++;
    queue<shared_ptr<SuperChunk>> super_chunks;
    auto current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(),this)) ;
    super_chunks.push(current_super_chunk);

    // Make the SuperChunks using all the chunks.
    for(const auto& chunk: get_immutable_chunks()) {
        bool added = current_super_chunk->add_chunk(chunk);
        if (!added) {
            sc_id.str(std::string());
            sc_id << name() << "-" << sc_count++;
            current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(),this));
            super_chunks.push(current_super_chunk);
            if (!current_super_chunk->add_chunk(chunk)) {
                stringstream msg ;
                msg << prolog << "Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
                throw BESInternalError(msg.str(), __FILE__, __LINE__);
            }
        }
    }

    reserve_value_capacity(get_size());
    // The size, in elements, of each of the array's dimensions
    const vector<unsigned long long> array_shape = get_shape(true);
    // The size, in elements, of each of the chunk's dimensions
    const vector<unsigned long long> chunk_shape = get_chunk_dimension_sizes();

    BESDEBUG(dmrpp_3, prolog << "d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ? "true" : "false") << endl);
    BESDEBUG(dmrpp_3, prolog << "d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);

    if (!DmrppRequestHandler::d_use_transfer_threads) { // Serial transfers
#if DMRPP_ENABLE_THREAD_TIMERS
        BESStopWatch sw(dmrpp_3);
        sw.start(prolog + "Serial SuperChunk Processing.");
#endif
        while(!super_chunks.empty()) {
            auto super_chunk = super_chunks.front();
            super_chunks.pop();
            BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(true) << endl );
            super_chunk->read_unconstrained();
        }
    }
    else { // Parallel transfers
#if DMRPP_ENABLE_THREAD_TIMERS
        stringstream timer_name;
        timer_name << prolog << "Concurrent SuperChunk Processing. d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads;
        BESStopWatch sw(dmrpp_3);
        sw.start(timer_name.str());
#endif
        read_super_chunks_unconstrained_concurrent(super_chunks, this);
    }
    set_read_p(true);
}



unsigned long long DmrppArray::get_chunk_start(const dimension &thisDim, unsigned int chunk_origin)
{
    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned long long first_element_offset = 0; // start with 0
    if ((unsigned) (thisDim.start) < chunk_origin) {
        // If the start is behind this chunk, then it's special.
        if (thisDim.stride != 1) {
            // And if the stride isn't 1, we have to figure out where to begin in this chunk.
            first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
            // If it's zero, great!
            if (first_element_offset != 0) {
                // otherwise, adjust to get the correct first element.
                first_element_offset = thisDim.stride - first_element_offset;
            }
        }
    }
    else {
        first_element_offset = thisDim.start - chunk_origin;
    }

    return first_element_offset;
}

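// Worked example (editorial note): with a constraint of start == 3 and
// stride == 4, the access pattern touches global indexes 3, 7, 11, ...
// For a chunk whose origin in this dimension is 10: (10 - 3) % 4 == 3, so
// the first element used in this chunk is at local offset 4 - 3 == 1
// (global index 11).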
shared_ptr<Chunk>
DmrppArray::find_needed_chunks(unsigned int dim, vector<unsigned long long> *target_element_address, shared_ptr<Chunk> chunk)
{
    BESDEBUG(dmrpp_3, prolog << " BEGIN, dim: " << dim << endl);

    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned long long> &chunk_shape = get_chunk_dimension_sizes();

    // The chunk's origin point a.k.a. its "position in array".
    const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();

    dimension thisDim = this->get_dimension(dim);

    // Do we even want this chunk?
    if ((unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) ||
        (unsigned) thisDim.stop < chunk_origin[dim]) {
        return nullptr; // No. No, we do not. Skip this chunk.
    }

    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);

    // Is the next point to be sent in this chunk at all? If no, return.
    if (chunk_start > chunk_shape[dim]) {
        return nullptr;
    }

    // Now we figure out the correct last element, based on the subset expression
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    unsigned long long chunk_end = end_element - chunk_origin[dim];

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        BESDEBUG(dmrpp_3, prolog << " END, This is the last_dim. chunk: " << chunk->to_string() << endl);
        return chunk;
    }
    else {
        // Not the last dimension, so we continue to proceed down the Recursion Branch.
        for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
            (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;

            // Re-entry here:
            auto needed = find_needed_chunks(dim + 1, target_element_address, chunk);
            if (needed){
                BESDEBUG(dmrpp_3, prolog << " END, Found chunk: " << needed->to_string() << endl);
                return needed;
            }

        }
    }
    BESDEBUG(dmrpp_3, prolog << " END, dim: " << dim << endl);

    return nullptr;
}

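// Worked example (editorial note): suppose dimension 'dim' is constrained
// with start == 5 and stop == 9, and chunks are 4 elements wide in that
// dimension. A chunk with origin 0 covers elements [0, 3]; since
// thisDim.start (5) > 0 + 4, it is rejected. A chunk with origin 4 covers
// [4, 7], overlaps the subset, and the recursion continues into the next
// dimension.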
void DmrppArray::insert_chunk(
    unsigned int dim,
    vector<unsigned long long> *target_element_address,
    vector<unsigned long long> *chunk_element_address,
    shared_ptr<Chunk> chunk,
    const vector<unsigned long long> &constrained_array_shape){

    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned long long> &chunk_shape = get_chunk_dimension_sizes();

    // The chunk's origin point a.k.a. its "position in array".
    const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();

    dimension thisDim = this->get_dimension(dim);

    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);

    // Now we figure out the correct last element, based on the subset expression
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    unsigned long long chunk_end = end_element - chunk_origin[dim];

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        char *source_buffer = chunk->get_rbuf();
        char *target_buffer = get_buf();
        unsigned int elem_width = prototype()->width();

        if (thisDim.stride == 1) {
            // The start element in this array
            unsigned long long start_element = chunk_origin[dim] + chunk_start;
            // Compute how much we are going to copy
            unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;

            // Compute where we need to put it.
            (*target_element_address)[dim] = (start_element - thisDim.start); // / thisDim.stride;
            // Compute where we are going to read it from
            (*chunk_element_address)[dim] = chunk_start;

            // See below re get_index()
            unsigned long long target_char_start_index =
                get_index(*target_element_address, constrained_array_shape) * elem_width;
            unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

            memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index,
                   chunk_constrained_inner_dim_bytes);
        }
        else {
            // Stride != 1
            for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
                // Compute where we need to put it.
                (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;

                // Compute where we are going to read it from
                (*chunk_element_address)[dim] = chunk_index;

                // These calls to get_index() can be removed as with the insert...unconstrained() code.
                unsigned int target_char_start_index =
                    get_index(*target_element_address, constrained_array_shape) * elem_width;
                unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

                memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
            }
        }
    }
    else {
        // Not the last dimension, so we continue to proceed down the Recursion Branch.
        for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
            (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
            (*chunk_element_address)[dim] = chunk_index;

            // Re-entry here:
            insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape);
        }
    }
}

void DmrppArray::read_chunks()
{
    if (get_chunks_size() < 2)
        throw BESInternalError(string("Expected two or more chunks for variable ") + name(), __FILE__, __LINE__);

    // Find all the required chunks to read. I used a queue to preserve the chunk order, which
    // made using a debugger easier. However, order does not matter, AFAIK.
    unsigned long long sc_count=0;
    stringstream sc_id;
    sc_id << name() << "-" << sc_count++;
    queue<shared_ptr<SuperChunk>> super_chunks;
    auto current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(), this)) ;
    super_chunks.push(current_super_chunk);

    // TODO We know that non-contiguous chunks may be forward or backward in the file from
    //  the current offset. When an add_chunk() call fails, prior to making a new SuperChunk
    //  we might want to try adding the rejected Chunk to the other existing SuperChunks to see
    //  if it's contiguous there.
    // Find the required Chunks and put them into SuperChunks.
    bool found_needed_chunks = false;
    for(const auto& chunk: get_immutable_chunks()){
        vector<unsigned long long> target_element_address = chunk->get_position_in_array();
        auto needed = find_needed_chunks(0 /* dimension */, &target_element_address, chunk);
        if (needed){
            found_needed_chunks = true;
            bool added = current_super_chunk->add_chunk(chunk);
            if(!added){
                sc_id.str(std::string()); // Clears the stringstream.
                sc_id << name() << "-" << sc_count++;
                current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(),this));
                super_chunks.push(current_super_chunk);
                if(!current_super_chunk->add_chunk(chunk)){
                    stringstream msg ;
                    msg << prolog << "Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
                    throw BESInternalError(msg.str(), __FILE__, __LINE__);
                }
            }
        }
    }
    BESDEBUG(dmrpp_3, prolog << "found_needed_chunks: " << (found_needed_chunks?"true":"false") << endl);
    if(!found_needed_chunks){ // Ouch! Something went horribly wrong...
        throw BESInternalError("ERROR - Failed to locate any chunks that correspond to the requested data.", __FILE__, __LINE__);
    }

    reserve_value_capacity(get_size(true));

    BESDEBUG(dmrpp_3, prolog << "d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ? "true" : "false") << endl);
    BESDEBUG(dmrpp_3, prolog << "d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
    BESDEBUG(dmrpp_3, prolog << "d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ? "true" : "false") << endl);
    BESDEBUG(dmrpp_3, prolog << "d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);
    BESDEBUG(dmrpp_3, prolog << "SuperChunks.size(): " << super_chunks.size() << endl);

    if (!DmrppRequestHandler::d_use_transfer_threads) {
        // This version is the 'serial' version of the code. It reads a chunk, inserts it,
        // reads the next one, and so on.
#if DMRPP_ENABLE_THREAD_TIMERS
        BESStopWatch sw(dmrpp_3);
        sw.start(prolog + "Serial SuperChunk Processing.");
#endif
        while (!super_chunks.empty()) {
            auto super_chunk = super_chunks.front();
            super_chunks.pop();
            BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(true) << endl );
            super_chunk->read();
        }
    }
    else {
#if DMRPP_ENABLE_THREAD_TIMERS
        stringstream timer_name;
        timer_name << prolog << "Concurrent SuperChunk Processing. d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads;
        BESStopWatch sw(dmrpp_3);
        sw.start(timer_name.str());
#endif
        read_super_chunks_concurrent(super_chunks, this);
    }
    set_read_p(true);
}


#ifdef USE_READ_SERIAL
void DmrppArray::insert_chunk_serial(unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
                                     Chunk *chunk)
{
    BESDEBUG("dmrpp", __func__ << " dim: "<< dim << " BEGIN "<< endl);

    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();

    // The chunk's origin point a.k.a. its "position in array".
    const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();

    dimension thisDim = this->get_dimension(dim);

    // Do we even want this chunk?
    if ((unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (unsigned) thisDim.stop < chunk_origin[dim]) {
        return; // No. No, we do not. Skip this.
    }

    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned int first_element_offset = get_chunk_start(dim, chunk_origin);

    // Is the next point to be sent in this chunk at all? If no, return.
    if (first_element_offset > chunk_shape[dim]) {
        return;
    }

    // Now we figure out the correct last element, based on the subset expression
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    unsigned long long chunk_start = first_element_offset; //start_element - chunk_origin[dim];
    unsigned long long chunk_end = end_element - chunk_origin[dim];
    vector<unsigned int> constrained_array_shape = get_shape(true);

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        // Read and Process chunk
        chunk->read_chunk();

        chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());

        char *source_buffer = chunk->get_rbuf();
        char *target_buffer = get_buf();
        unsigned int elem_width = prototype()->width();

        if (thisDim.stride == 1) {
            // The start element in this array
            unsigned long long start_element = chunk_origin[dim] + first_element_offset;
            // Compute how much we are going to copy
            unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;

            // Compute where we need to put it.
            (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
            // Compute where we are going to read it from
            (*chunk_element_address)[dim] = first_element_offset;

            unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
            unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

            memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
        }
        else {
            // Stride != 1
            for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
                // Compute where we need to put it.
                (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;

                // Compute where we are going to read it from
                (*chunk_element_address)[dim] = chunk_index;

                unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
                unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

                memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
            }
        }
    }
    else {
        // Not the last dimension, so we continue to proceed down the Recursion Branch.
        for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
            (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
            (*chunk_element_address)[dim] = chunk_index;

            // Re-entry here:
            insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
        }
    }
}

void DmrppArray::read_chunks_serial()
{
    BESDEBUG("dmrpp", __func__ << " for variable '" << name() << "' - BEGIN" << endl);

    vector<Chunk> &chunk_refs = get_chunk_vec();
    if (chunk_refs.size() == 0) throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);

    // Allocate target memory.
    reserve_value_capacity(get_size(true));

    /*
     * Find the chunks to be read, make curl_easy handles for them, and
     * stuff them into our curl_multi handle. This is a recursive activity
     * which utilizes the same code that copies the data from the chunk to
     * the variables.
     */
    for (unsigned long i = 0; i < chunk_refs.size(); i++) {
        Chunk &chunk = chunk_refs[i];

        vector<unsigned int> chunk_source_address(dimensions(), 0);
        vector<unsigned int> target_element_address = chunk.get_position_in_array();

        // Recursive insertion operation.
        insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
    }

    set_read_p(true);

    BESDEBUG("dmrpp", "DmrppArray::"<< __func__ << "() for " << name() << " END"<< endl);
}
#endif

void
DmrppArray::set_send_p(bool state)
{
    if (!get_attributes_loaded())
        load_attributes(this);

    Array::set_send_p(state);
}

void DmrppArray::read_contiguous_string()
{
    BESStopWatch sw;
    if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + " name: "+name(), "");

    // This is the original chunk for this 'contiguous' variable.
    auto the_one_chunk = get_immutable_chunks()[0];

    // Read the the_one_chunk as is. This is the non-parallel I/O case
    the_one_chunk->read_chunk();

    // Now that the_one_chunk has been read, we do what is necessary...
    if (!is_filters_empty()){
        the_one_chunk->filter_chunk(get_filters(), get_chunk_size_in_elements(), var()->width());
    }

    // FIXME This part will only work if the array contains a single element. See below.
    //  jhrg 3/3/22
    vector<string> ss; // Prepare for the general case
    string s(reinterpret_cast<char *>(the_one_chunk->get_rbuf()));
    ss.push_back(s);
    set_value(ss, ss.size());

    set_read_p(true);
}

bool DmrppArray::read()
{
    // If the chunks are not loaded, load them now. NB: load_chunks()
    // reads data for HDF5 COMPACT storage, so read_p() will be true
    // (but it does not read any other data). Thus, call load_chunks()
    // before testing read_p() to cover that case. jhrg 11/15/21
    // String Arrays that use COMPACT storage appear to work. jhrg 3/3/22
    if (!get_chunks_loaded())
        load_chunks(this);

    if (read_p()) return true;

    // FIXME Strings are a special case and, currently, we do not have enough
    //  information in the DMR++ to cover most cases that can be present in HDF5
    //  files. In addition, the way libdap stores string data means that we need
    //  to build c++ string objects from the raw data we read from the source
    //  data file. Thus, the code for strings (and URLs) is a special case.
    //  Currently, we can process only arrays with one element. jhrg 3/3/22

    if ((var()->type() == dods_str_c || var()->type() == dods_url_c)) {
        // FIXME Add support for both of these things once the DMR++ has the needed
        //  information. jhrg 3/3/22
        if (is_projected())
            throw BESInternalError("Subsetting of String Arrays is not currently supported.", __FILE__, __LINE__);

        if (length() != 1)
            throw BESInternalError("Only one dimensional String Arrays are currently supported.", __FILE__, __LINE__);

        if (get_chunks_size() == 1) {
            read_contiguous_string(); // Throws on various errors
        }
        else { // Handle the more complex case where the data is chunked.
            //read_chunks_unconstrained();
            // FIXME Yup, fix this, too. jhrg 3/3/22
            throw BESInternalError("Chunked String Array data is not currently supported.", __FILE__, __LINE__);
        }

        // exit here for strings; we only 'twiddle' bytes for integer data.
        return true;
    }

    // Single chunk and 'contiguous' are the same for this code.

    if (get_chunks_size() == 1) {
        read_contiguous(); // Throws on various errors
    }
    else { // Handle the more complex case where the data is chunked.
        if (!is_projected()) {
            read_chunks_unconstrained();
        }
        else {
            read_chunks();
        }
    }

    if (this->twiddle_bytes()) {
        int num = this->length();
        Type var_type = this->var()->type();

        switch (var_type) {
            case dods_int16_c:
            case dods_uint16_c: {
                dods_uint16 *local = reinterpret_cast<dods_uint16*>(this->get_buf());
                while (num--) {
                    *local = bswap_16(*local);
                    local++;
                }
                break;
            }
            case dods_int32_c:
            case dods_uint32_c: {
                dods_uint32 *local = reinterpret_cast<dods_uint32*>(this->get_buf());
                while (num--) {
                    *local = bswap_32(*local);
                    local++;
                }
                break;
            }
            case dods_int64_c:
            case dods_uint64_c: {
                dods_uint64 *local = reinterpret_cast<dods_uint64*>(this->get_buf());
                while (num--) {
                    *local = bswap_64(*local);
                    local++;
                }
                break;
            }
            default: break; // Do nothing for all other types.
        }
    }

    return true;
}

class PrintD4ArrayDimXMLWriter : public unary_function<Array::dimension &, void> {
    XMLWriter &xml;
    // Was this variable constrained using local/direct slicing? i.e., is d_local_constraint set?
    // If so, don't use shared dimensions; instead emit Dim elements that are anonymous.
    bool d_constrained;
public:

    PrintD4ArrayDimXMLWriter(XMLWriter &xml, bool c) :
        xml(xml), d_constrained(c)
    {
    }

    void operator()(Array::dimension &d)
    {
        // This duplicates code in D4Dimensions (where D4Dimension::print_dap4() is defined
        // because of the need to print the constrained size of a dimension). I think that
        // the constraint information has to be kept here and not in the dimension (since they
        // are shared dims). Could hack print_dap4() to take the constrained size, however.
        if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar *) "Dim") < 0)
            throw InternalErr(__FILE__, __LINE__, "Could not write Dim element");

        string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
        // If there is a name, there must be a Dimension (named dimension) in scope
        // so write its name but not its size.
        if (!d_constrained && !name.empty()) {
            if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name",
                                            (const xmlChar *) name.c_str()) < 0)
                throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
        }
        else if (d.use_sdim_for_slice) {
            assert(!name.empty());
            if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name",
                                            (const xmlChar *) name.c_str()) < 0)
                throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
        }
        else {
            ostringstream size;
            size << (d_constrained ? d.c_size : d.size);
            if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "size",
                                            (const xmlChar *) size.str().c_str()) < 0)
                throw InternalErr(__FILE__, __LINE__, "Could not write attribute for size");
        }

        if (xmlTextWriterEndElement(xml.get_writer()) < 0)
            throw InternalErr(__FILE__, __LINE__, "Could not end Dim element");
    }
};

1644
1645class PrintD4ConstructorVarXMLWriter : public unary_function<BaseType *, void> {
1646 XMLWriter &xml;
1647 bool d_constrained;
1648public:
1649 PrintD4ConstructorVarXMLWriter(XMLWriter &xml, bool c) :
1650 xml(xml), d_constrained(c)
1651 {
1652 }
1653
1654 void operator()(BaseType *btp)
1655 {
1656 btp->print_dap4(xml, d_constrained);
1657 }
1658};
1659
1660class PrintD4MapXMLWriter : public unary_function<D4Map *, void> {
1661 XMLWriter &xml;
1662
1663public:
1664 PrintD4MapXMLWriter(XMLWriter &xml) :
1665 xml(xml)
1666 {
1667 }
1668
1669 void operator()(D4Map *m)
1670 {
1671 m->print_dap4(xml);
1672 }
1673};
1675
void DmrppArray::print_dap4(XMLWriter &xml, bool constrained /*false*/)
{
    if (constrained && !send_p()) return;

    if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar *) var()->type_name().c_str()) < 0)
        throw InternalErr(__FILE__, __LINE__, "Could not write " + type_name() + " element");

    if (!name().empty())
        if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name", (const xmlChar *) name().c_str()) < 0)
            throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");

    // Hack job... Copied from D4Enum::print_xml_writer. jhrg 11/12/13
    if (var()->type() == dods_enum_c) {
        D4Enum *e = static_cast<D4Enum *>(var());
        string path = e->enumeration()->name();
        if (e->enumeration()->parent()) {
            // print the FQN for the enum def; D4Group::FQN() includes the trailing '/'
            path = static_cast<D4Group *>(e->enumeration()->parent()->parent())->FQN() + path;
        }
        if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "enum", (const xmlChar *) path.c_str()) < 0)
            throw InternalErr(__FILE__, __LINE__, "Could not write attribute for enum");
    }

    if (prototype()->is_constructor_type()) {
        Constructor &c = static_cast<Constructor &>(*prototype());
        for_each(c.var_begin(), c.var_end(), PrintD4ConstructorVarXMLWriter(xml, constrained));
        // bind2nd(mem_fun_ref(&BaseType::print_dap4), xml));
    }

    // Drop the local_constraint which is per-array and use a per-dimension one instead
    for_each(dim_begin(), dim_end(), PrintD4ArrayDimXMLWriter(xml, constrained));

    attributes()->print_dap4(xml);

    for_each(maps()->map_begin(), maps()->map_end(), PrintD4MapXMLWriter(xml));

    // Only print the chunks' info if present. This is the code added to libdap::Array::print_dap4().
    // jhrg 5/10/18
    // Update: print the <chunks> element even if the chunks_size value is zero since this
    // might be a variable with all fill values. jhrg 4/24/22
    // Editorial note: the next two lines were dropped from this listing and are
    // restored here from the linked symbol documentation.
    if (DmrppCommon::d_print_chunks && (get_chunks_size() > 0 || get_uses_fill_value()))
        print_chunks_element(xml, DmrppCommon::d_ns_prefix);

    // If this variable uses the COMPACT layout, encode the values for
    // the array using base64. Note that strings are a special case; each
    // element of the array is a string and is encoded in its own base64
    // xml element. So, while an array of 10 int32 will be encoded in a
    // single base64 element, an array of 10 strings will use 10 base64
    // elements. This is because the size of each string's value is different.
    // Not so for an int32.
    if (DmrppCommon::d_print_chunks && is_compact_layout() && read_p()) {
        switch (var()->type()) {
            case dods_byte_c:
            case dods_char_c:
            case dods_int8_c:
            case dods_uint8_c:
            case dods_int16_c:
            case dods_uint16_c:
            case dods_int32_c:
            case dods_uint32_c:
            case dods_int64_c:
            case dods_uint64_c:

            case dods_enum_c:

            case dods_float32_c:
            case dods_float64_c: {
                u_int8_t *values = 0;
                try {
                    size_t size = buf2val(reinterpret_cast<void **>(&values));
                    string encoded = base64::Base64::encode(values, size);
                    // Editorial note: this call was dropped from the listing; restored
                    // from the linked symbol documentation.
                    print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
                    delete[] values;
                }
                catch (...) {
                    delete[] values;
                    throw;
                }
                break;
            }

            case dods_str_c:
            case dods_url_c: {
                string *values = 0;
                try {
                    // discard the return value of buf2val()
                    buf2val(reinterpret_cast<void **>(&values));
                    string str;
                    for (int i = 0; i < length(); ++i) {
                        str = (*(static_cast<string *> (values) + i));
                        string encoded = base64::Base64::encode(reinterpret_cast<const u_int8_t *>(str.c_str()), str.size());
                        // Editorial note: this call was dropped from the listing; restored
                        // from the linked symbol documentation.
                        print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
                    }
                    delete[] values;
                }
                catch (...) {
                    delete[] values;
                    throw;
                }
                break;
            }

            default:
                throw InternalErr(__FILE__, __LINE__, "Vector::val2buf: bad type");
        }
    }
    if (xmlTextWriterEndElement(xml.get_writer()) < 0)
        throw InternalErr(__FILE__, __LINE__, "Could not end " + type_name() + " element");
}

1809
1810void DmrppArray::dump(ostream &strm) const
1811{
1812 strm << BESIndent::LMarg << "DmrppArray::" << __func__ << "(" << (void *) this << ")" << endl;
1813 BESIndent::Indent();
1814 DmrppCommon::dump(strm);
1815 Array::dump(strm);
1816 strm << BESIndent::LMarg << "value: " << "----" << /*d_buf <<*/endl;
1817 BESIndent::UnIndent();
1818}
1819
1820} // namespace dmrpp