bes (updated for version 3.20.13)
SuperChunk.cc
// -*- mode: c++; c-basic-offset:4 -*-

// This file is part of the BES

// Copyright (c) 2018 OPeNDAP, Inc.
// Author: Nathan Potter<ndp@opendap.org>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
//
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
#include "config.h"

#include <atomic>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <vector>

#include "BESInternalError.h"
#include "BESDebug.h"

#include "DmrppRequestHandler.h"
#include "CurlHandlePool.h"
#include "DmrppArray.h"
#include "DmrppNames.h"
#include "Chunk.h"
#include "SuperChunk.h"

#define prolog std::string("SuperChunk::").append(__func__).append("() - ")

#define SUPER_CHUNK_MODULE "dmrpp:3"

// NOTE: This listing uses several std names unqualified; the using-declarations
// below (an assumption about the original file's full set) make that compile.
using std::atomic_uint;
using std::endl;
using std::future;
using std::list;
using std::ostream;
using std::ostringstream;
using std::queue;
using std::shared_ptr;
using std::string;
using std::stringstream;
using std::unique_ptr;
using std::vector;
namespace dmrpp {

// ThreadPool state variables.
std::mutex chunk_processing_thread_pool_mtx; // mutex for critical section
atomic_uint chunk_processing_thread_counter(0);
#define COMPUTE_THREADS "compute_threads"

#define DMRPP_ENABLE_THREAD_TIMERS 0
void process_one_chunk(shared_ptr<Chunk> chunk, DmrppArray *array, const vector<unsigned long long> &constrained_array_shape)
{
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );

    // TODO If this is part of a SuperChunk, hasn't the data been read by SuperChunk::retrieve_data()?
    //  If so, calling read() here is not needed. Same question below. jhrg 5/7/22
    chunk->read_chunk();

    if (array) {
        // If this chunk used/uses hdf5 fill values, do not attempt to deflate, etc., its
        // values since the fill value code makes the chunks 'fully formed.' jhrg 5/16/22
        if (!chunk->get_uses_fill_value() && !array->is_filters_empty())
            chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->var()->width());

        vector<unsigned long long> target_element_address = chunk->get_position_in_array();
        vector<unsigned long long> chunk_source_address(array->dimensions(), 0);

        array->insert_chunk(0, &target_element_address, &chunk_source_address,
                            chunk, constrained_array_shape);
    }

    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
}

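/**
 * @brief The unconstrained variant of process_one_chunk().
 *
 * With no constraint in play, the chunk's values are copied using the full
 * chunk and array shapes.
 *
 * @param chunk The Chunk to process.
 * @param chunk_shape The size, in elements, of each of the chunk's dimensions.
 * @param array The DmrppArray that receives the values; if null, the chunk is only read.
 * @param array_shape The size, in elements, of each of the array's dimensions.
 */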
void process_one_chunk_unconstrained(shared_ptr<Chunk> chunk, const vector<unsigned long long> &chunk_shape,
                                     DmrppArray *array, const vector<unsigned long long> &array_shape)
{
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );

    chunk->read_chunk();

    if (array) {
        if (!chunk->get_uses_fill_value() && !array->is_filters_empty())
            chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->var()->width());

        array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
    }

    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
}

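/**
 * @brief A worker, run via std::async(), that processes a single Chunk.
 *
 * @param args The Chunk, target array, and constrained array shape, along with
 * parent thread/SuperChunk ids used for debugging and timing.
 * @return true once the chunk has been processed.
 */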
bool one_chunk_compute_thread(unique_ptr<one_chunk_args> args)
{
#if DMRPP_ENABLE_THREAD_TIMERS
    stringstream timer_tag;
    timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
        " parent_tid: 0x" << std::hex << args->parent_thread_id << " parent_sc: " << args->parent_super_chunk_id;
    BESStopWatch sw(COMPUTE_THREADS);
    sw.start(timer_tag.str());
#endif

    process_one_chunk(args->chunk, args->array, args->array_shape);
    return true;
}

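/**
 * @brief The unconstrained variant of one_chunk_compute_thread().
 *
 * @param args The Chunk, chunk shape, target array, and array shape, along with
 * parent thread/SuperChunk ids used for debugging and timing.
 * @return true once the chunk has been processed.
 */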
bool one_chunk_unconstrained_compute_thread(unique_ptr<one_chunk_unconstrained_args> args)
{
#if DMRPP_ENABLE_THREAD_TIMERS
    stringstream timer_tag;
    timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
        " parent_tid: 0x" << std::hex << args->parent_thread_id << " parent_sc: " << args->parent_super_chunk_id;
    BESStopWatch sw(COMPUTE_THREADS);
    sw.start(timer_tag.str());
#endif

    process_one_chunk_unconstrained(args->chunk, args->chunk_shape, args->array, args->array_shape);
    return true;
}

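/**
 * @brief Try to start a new chunk-processing thread using std::async().
 *
 * A thread is started only if chunk_processing_thread_counter is below
 * DmrppRequestHandler::d_max_compute_threads; the counter and the futures list
 * are modified while holding chunk_processing_thread_pool_mtx.
 *
 * @param futures The list of futures for outstanding threads; the new future is appended on success.
 * @param args The thread arguments; ownership is transferred to the thread on success.
 * @return true if the thread was started, false if the thread pool was full.
 */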
bool start_one_chunk_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_args> args) {
    bool retval = false;
    std::unique_lock<std::mutex> lck(chunk_processing_thread_pool_mtx);
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << " chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
    if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
        chunk_processing_thread_counter++;
        futures.push_back(std::async(std::launch::async, one_chunk_compute_thread, std::move(args)));
        retval = true;
        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Got std::future '" << futures.size() <<
            "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
    }
    return retval;
}

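/**
 * @brief The unconstrained variant of start_one_chunk_compute_thread().
 *
 * @param futures The list of futures for outstanding threads; the new future is appended on success.
 * @param args The thread arguments; ownership is transferred to the thread on success.
 * @return true if the thread was started, false if the thread pool was full.
 */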
bool start_one_chunk_unconstrained_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_unconstrained_args> args) {
    bool retval = false;
    std::unique_lock<std::mutex> lck(chunk_processing_thread_pool_mtx);
    if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
        futures.push_back(std::async(std::launch::async, one_chunk_unconstrained_compute_thread, std::move(args)));
        chunk_processing_thread_counter++;
        retval = true;
        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Got std::future '" << futures.size() <<
            "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
    }
    return retval;
}

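/**
 * @brief Process a queue of Chunks concurrently using the compute thread pool.
 *
 * Loops until every Chunk has been processed: completed futures are reaped
 * with get_next_future() (which decrements chunk_processing_thread_counter)
 * and new worker threads are started as pool slots open up. If an exception
 * is thrown, all outstanding futures are completed before it is re-thrown.
 *
 * @param super_chunk_id An identifier used in debugging output.
 * @param chunks The queue of Chunks to process.
 * @param array The DmrppArray that receives the values.
 * @param constrained_array_shape The shape of the array with constraints applied.
 */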
void process_chunks_concurrent(
        const string &super_chunk_id,
        queue<shared_ptr<Chunk>> &chunks,
        DmrppArray *array,
        const vector<unsigned long long> &constrained_array_shape ){

    // We maintain a list of futures to track our parallel activities.
    list<future<bool>> futures;
    try {
        bool done = false;
        bool future_finished = true;
        while (!done) {

            if (!futures.empty())
                future_finished = get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

            // If future_finished is true this means that the chunk_processing_thread_counter has been decremented,
            // because future::get() was called or a call to future::valid() returned false.
            BESDEBUG(SUPER_CHUNK_MODULE, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);

            if (!chunks.empty()) {
                // Next we try to add a new Chunk compute thread if we can - there might be room.
                bool thread_started = true;
                while (thread_started && !chunks.empty()) {
                    auto chunk = chunks.front();
                    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Starting thread for " << chunk->to_string() << endl);

                    auto args = unique_ptr<one_chunk_args>(new one_chunk_args(super_chunk_id, chunk, array, constrained_array_shape));
                    thread_started = start_one_chunk_compute_thread(futures, std::move(args));

                    if (thread_started) {
                        chunks.pop();
                        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "STARTED thread for " << chunk->to_string() << endl);
                    } else {
                        // The thread did not start, so ownership of the arguments was not passed to the thread.
                        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Thread not started; args deleted, Chunk remains in queue. " <<
                            "chunk_processing_thread_counter: " << chunk_processing_thread_counter << " futures.size(): " << futures.size() << endl);
                    }
                }
            }
            else {
                // No more Chunks and no futures means we're done here.
                if (futures.empty())
                    done = true;
            }
            future_finished = false;
        }
    }
    catch (...) {
        // Complete all of the futures, otherwise we'll have threads out there using up resources.
        while (!futures.empty()) {
            if (futures.back().valid())
                futures.back().get();
            futures.pop_back();
        }
        // Re-throw the exception.
        throw;
    }
}

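/**
 * @brief The unconstrained variant of process_chunks_concurrent().
 *
 * @param super_chunk_id An identifier used in debugging output.
 * @param chunks The queue of Chunks to process.
 * @param chunk_shape The size, in elements, of each of the chunk's dimensions.
 * @param array The DmrppArray that receives the values.
 * @param array_shape The size, in elements, of each of the array's dimensions.
 */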
void process_chunks_unconstrained_concurrent(
        const string &super_chunk_id,
        queue<shared_ptr<Chunk>> &chunks,
        const vector<unsigned long long> &chunk_shape,
        DmrppArray *array,
        const vector<unsigned long long> &array_shape){

    // We maintain a list of futures to track our parallel activities.
    list<future<bool>> futures;
    try {
        bool done = false;
        bool future_finished = true;
        while (!done) {

            if (!futures.empty())
                future_finished = get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

            // If future_finished is true this means that the chunk_processing_thread_counter has been decremented,
            // because future::get() was called or a call to future::valid() returned false.
            BESDEBUG(SUPER_CHUNK_MODULE, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);

            if (!chunks.empty()) {
                // Next we try to add a new Chunk compute thread if we can - there might be room.
                bool thread_started = true;
                while (thread_started && !chunks.empty()) {
                    auto chunk = chunks.front();
                    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Starting thread for " << chunk->to_string() << endl);

                    auto args = unique_ptr<one_chunk_unconstrained_args>(
                        new one_chunk_unconstrained_args(super_chunk_id, chunk, array, array_shape, chunk_shape) );
                    thread_started = start_one_chunk_unconstrained_compute_thread(futures, std::move(args));

                    if (thread_started) {
                        chunks.pop();
                        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "STARTED thread for " << chunk->to_string() << endl);
                    } else {
                        // The thread did not start, so ownership of the arguments was not passed to the thread.
                        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Thread not started; args deleted, Chunk remains in queue." <<
                            " chunk_processing_thread_counter: " << chunk_processing_thread_counter <<
                            " futures.size(): " << futures.size() << endl);
                    }
                }
            }
            else {
                // No more Chunks and no futures means we're done here.
                if (futures.empty())
                    done = true;
            }
            future_finished = false;
        }
    }
    catch (...) {
        // Complete all of the futures, otherwise we'll have threads out there using up resources.
        while (!futures.empty()) {
            if (futures.back().valid())
                futures.back().get();
            futures.pop_back();
        }
        // Re-throw the exception.
        throw;
    }
}

//#####################################################################################################################
//#####################################################################################################################
//#####################################################################################################################
//
// SuperChunk Code Begins Here
//
// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

// TODO There are (at least) two ways to handle 'fill value chunks.' The code can group
//  them all together as one big SuperChunk or store each FV chunk in its own SuperChunk.
//  (Of course, there are alternatives...) Using one SuperChunk is probably faster but
//  will require more work on the SuperChunk code. I think we should postpone that for now
//  to focus on getting the values correct (because that problem has yet to be solved).
//  I will add a ticket to return to this code and make that modification. jhrg 5/7/22
//
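
/**
 * @brief Attempts to add a new Chunk to this SuperChunk.
 *
 * The first Chunk added establishes the SuperChunk's offset, size, data URL,
 * and fill-value state. A later Chunk is added only if it does not use fill
 * values and is contiguous with the bytes this SuperChunk already spans.
 *
 * @param candidate_chunk The Chunk to add.
 * @return true if the Chunk was added, false otherwise.
 */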
bool SuperChunk::add_chunk(const std::shared_ptr<Chunk> candidate_chunk) {
    bool chunk_was_added = false;
    if (d_chunks.empty()) {
        d_chunks.push_back(candidate_chunk);
        d_offset = candidate_chunk->get_offset();
        d_size = candidate_chunk->get_size();
        // When get_uses_fill_value() is true, get_data_url() returns a shared_ptr
        // initialized to nullptr, so d_data_url is set to nullptr in that case. jhrg 5/7/22
        d_uses_fill_value = candidate_chunk->get_uses_fill_value();
        if (!d_uses_fill_value)
            d_data_url = candidate_chunk->get_data_url();
        else
            d_data_url = nullptr;
        chunk_was_added = true;
    }
    // For now, if a chunk uses fill values, it gets its own SuperChunk. jhrg 5/7/22
    else if (!candidate_chunk->get_uses_fill_value() && is_contiguous(candidate_chunk)) {
        this->d_chunks.push_back(candidate_chunk);
        d_size += candidate_chunk->get_size();
        chunk_was_added = true;
    }
    return chunk_was_added;
}
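
// An illustrative sketch (not part of this file; the constructor arguments and the
// names make_id, target_array, and chunks_to_read are hypothetical): a caller such
// as DmrppArray::read() can group chunks by appending to a SuperChunk until
// add_chunk() rejects one, then starting a new SuperChunk with the rejected chunk.
//
//     vector<shared_ptr<SuperChunk>> super_chunks;
//     auto current = std::make_shared<SuperChunk>(make_id(), target_array);
//     for (const auto &chunk : chunks_to_read) {
//         if (!current->add_chunk(chunk)) {
//             super_chunks.push_back(current);
//             current = std::make_shared<SuperChunk>(make_id(), target_array);
//             if (!current->add_chunk(chunk))
//                 throw BESInternalError("Could not add a Chunk to a new SuperChunk.", __FILE__, __LINE__);
//         }
//     }
//     super_chunks.push_back(current);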
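
/**
 * @brief Determine if a candidate Chunk is contiguous with this SuperChunk.
 *
 * A Chunk is contiguous when it shares the SuperChunk's data URL and its
 * offset is exactly d_offset + d_size, i.e., it begins where this SuperChunk ends.
 *
 * @param candidate_chunk The Chunk to test.
 * @return true if the Chunk is contiguous with this SuperChunk.
 */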
bool SuperChunk::is_contiguous(const std::shared_ptr<Chunk> candidate_chunk) {
    // Are the URLs the same?
    bool contiguous = candidate_chunk->get_data_url()->str() == d_data_url->str();
    if (contiguous) {
        // If the URLs match then see if the locations are matching.
        contiguous = (d_offset + d_size) == candidate_chunk->get_offset();
    }
    return contiguous;
}

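/**
 * @brief Point each child Chunk's read buffer at its slice of d_read_buffer.
 *
 * Assigns every child Chunk a non-owning view into the SuperChunk's read
 * buffer so that a single data transfer fills all of the Chunks. Throws
 * BESInternalError if the computed index runs past the SuperChunk's size.
 */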
void SuperChunk::map_chunks_to_buffer()
{
    unsigned long long bindex = 0;
    for (const auto &chunk : d_chunks) {
        chunk->set_read_buffer(d_read_buffer + bindex, chunk->get_size(), 0, false);
        bindex += chunk->get_size();
        if (bindex > d_size) {
            stringstream msg;
            msg << "ERROR: The computed buffer index, " << bindex << ", is larger than the expected size of the SuperChunk. ";
            msg << "d_size: " << d_size;
            throw BESInternalError(msg.str(), __FILE__, __LINE__);
        }
    }
}

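/**
 * @brief Read the SuperChunk's entire byte range with a single request.
 *
 * Builds one temporary Chunk spanning the whole SuperChunk (d_offset, d_size)
 * and reads it into d_read_buffer using a libcurl handle from the
 * CurlHandlePool. Throws BESInternalError if no handle is available or if the
 * number of bytes read differs from d_size.
 */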
void SuperChunk::read_aggregate_bytes()
{
    // Since we already have a good infrastructure for reading Chunks, we just make a big-ol-Chunk to
    // use for grabbing bytes. Then, once read, we'll use the child Chunks to do the dirty work of inflating
    // and moving the results into the DmrppCommon object.
    Chunk chunk(d_data_url, "NOT_USED", d_size, d_offset);

    chunk.set_read_buffer(d_read_buffer, d_size, 0, false);

    dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(&chunk);
    if (!handle)
        throw BESInternalError(prolog + "No more libcurl handles.", __FILE__, __LINE__);

    try {
        handle->read_data(); // throws if error
        DmrppRequestHandler::curl_handle_pool->release_handle(handle);
    }
    catch (...) {
        DmrppRequestHandler::curl_handle_pool->release_handle(handle);
        throw;
    }

    // If the expected byte count was not read, it's an error.
    if (d_size != chunk.get_bytes_read()) {
        ostringstream oss;
        oss << "Wrong number of bytes read for chunk; read: " << chunk.get_bytes_read() << ", expected: " << d_size;
        throw BESInternalError(oss.str(), __FILE__, __LINE__);
    }

    d_is_read = true;
}

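/**
 * @brief Read the single fill-value Chunk held by this SuperChunk.
 *
 * A fill-value SuperChunk holds exactly one child Chunk whose values are
 * synthesized from the fill value, so Chunk::read_chunk() is used in place of
 * a data transfer.
 */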
void SuperChunk::read_fill_value_chunk()
{
    if (d_chunks.size() != 1)
        throw BESInternalError("Found a SuperChunk with uses_fill_value true but more than one child chunk.", __FILE__, __LINE__);

    d_chunks.front()->read_chunk();
}

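/**
 * @brief Cause the SuperChunk and all of its subordinate Chunks to be read.
 *
 * Allocates the shared read buffer, maps the child Chunks onto it, and reads
 * the data, either via the fill-value machinery or as one aggregate
 * byte-range request. A no-op if the SuperChunk has already been read.
 */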
void SuperChunk::retrieve_data() {
    // TODO I think this code should set d_is_read. It sets it for the Chunk, which may be redundant. jhrg 5/9/22
    if (d_is_read) {
        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "SuperChunk (" << (void **) this << ") has already been read! Returning." << endl);
        return;
    }

    // TODO Move this into read_aggregate_bytes(), move map_chunks_to_buffer()
    //  after read_aggregate_bytes() and modify map_chunks_to_buffer() to set
    //  the chunk size and read state so the last for loop can be removed.
    //  jhrg 5/6/22
    if (!d_read_buffer) {
        // Allocate memory for the SuperChunk receive buffer;
        // the memory is released in the destructor.
        d_read_buffer = new char[d_size];
    }

    // Massage the chunks so that their read/receive/intern data buffer
    // points to the correct section of the d_read_buffer memory.
    // "Slice it up!"
    map_chunks_to_buffer();

    // Read the bytes from the target URL. (pthreads, maybe depends on size...)
    // Use one (or possibly more) thread(s) depending on d_size
    // and utilize our friend cURL to stuff the bytes into d_read_buffer.
    //
    // TODO Replace or improve this way of handling fill value chunks. jhrg 5/7/22
    if (d_uses_fill_value)
        read_fill_value_chunk();
    else
        read_aggregate_bytes();

    // TODO Check if Chunk::read() sets these. jhrg 5/9/22
    // Set each Chunk's read state to true, and set each Chunk's byte count to
    // the expected size for the chunk - because upstream events have assured
    // this to be true.
    for (const auto &chunk : d_chunks) {
        chunk->set_is_read(true);
        chunk->set_bytes_read(chunk->get_size());
    }
}

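/**
 * @brief Reads the SuperChunk, inflates/de-shuffles the subordinate chunks as
 * required and copies the values into the parent DmrppArray.
 *
 * The chunks are processed serially or concurrently, depending on
 * DmrppRequestHandler::d_use_compute_threads.
 */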
void SuperChunk::process_child_chunks() {
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );

    vector<unsigned long long> constrained_array_shape = d_parent_array->get_shape(true);
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ? "true" : "false") << endl);
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);

    if (!DmrppRequestHandler::d_use_compute_threads) {
#if DMRPP_ENABLE_THREAD_TIMERS
        BESStopWatch sw(SUPER_CHUNK_MODULE);
        sw.start(prolog + "Serial Chunk Processing. id: " + d_id);
#endif
        for (const auto &chunk : get_chunks()) {
            process_one_chunk(chunk, d_parent_array, constrained_array_shape);
        }
    }
    else {
#if DMRPP_ENABLE_THREAD_TIMERS
        stringstream timer_name;
        timer_name << prolog << "Concurrent Chunk Processing. id: " << d_id;
        BESStopWatch sw(SUPER_CHUNK_MODULE);
        sw.start(timer_name.str());
#endif
        queue<shared_ptr<Chunk>> chunks_to_process;
        for (const auto &chunk : get_chunks())
            chunks_to_process.push(chunk);

        process_chunks_concurrent(d_id, chunks_to_process, d_parent_array, constrained_array_shape);
    }
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
}

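/**
 * @brief The variant of process_child_chunks() used when the array is not constrained.
 *
 * Reads the SuperChunk, inflates/de-shuffles the subordinate chunks as
 * required and copies the values into the parent DmrppArray.
 */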
void SuperChunk::process_child_chunks_unconstrained() {
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );

    // The size, in elements, of each of the array's dimensions.
    const vector<unsigned long long> array_shape = d_parent_array->get_shape(true);
    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned long long> chunk_shape = d_parent_array->get_chunk_dimension_sizes();

    if (!DmrppRequestHandler::d_use_compute_threads) {
#if DMRPP_ENABLE_THREAD_TIMERS
        BESStopWatch sw(SUPER_CHUNK_MODULE);
        sw.start(prolog + "Serial Chunk Processing. sc_id: " + d_id );
#endif
        for (auto &chunk : get_chunks()) {
            process_one_chunk_unconstrained(chunk, chunk_shape, d_parent_array, array_shape);
        }
    }
    else {
#if DMRPP_ENABLE_THREAD_TIMERS
        stringstream timer_name;
        timer_name << prolog << "Concurrent Chunk Processing. sc_id: " << d_id;
        BESStopWatch sw(SUPER_CHUNK_MODULE);
        sw.start(timer_name.str());
#endif
        queue<shared_ptr<Chunk>> chunks_to_process;
        for (auto &chunk : get_chunks())
            chunks_to_process.push(chunk);

        process_chunks_unconstrained_concurrent(d_id, chunks_to_process, chunk_shape, d_parent_array, array_shape);
    }

}

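/**
 * @brief Makes a string representation of the SuperChunk.
 *
 * @param verbose If true, include the string representations of the child Chunks.
 * @return The string representation of this SuperChunk.
 */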
string SuperChunk::to_string(bool verbose /*=false*/) const {
    stringstream msg;
    msg << "[SuperChunk: " << (void **) this;
    msg << " offset: " << d_offset;
    msg << " size: " << d_size;
    msg << " chunk_count: " << d_chunks.size();
    //msg << " parent: " << d_parent->name();
    msg << "]";
    if (verbose) {
        msg << endl;
        for (auto chunk : d_chunks) {
            msg << chunk->to_string() << endl;
        }
    }
    return msg.str();
}

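/**
 * @brief Writes the to_string() output to the stream strm.
 *
 * @param strm The output stream.
 */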
void SuperChunk::dump(ostream &strm) const {
    strm << to_string(false);
}

} // namespace dmrpp