30#include "BESInternalError.h"
33#include "DmrppRequestHandler.h"
34#include "CurlHandlePool.h"
35#include "DmrppArray.h"
36#include "DmrppNames.h"
38#include "SuperChunk.h"
// Log-message prefix: evaluates to the string "SuperChunk::<name of the current function>() - ".
// The free functions in this file use it too, so their messages also carry the "SuperChunk::" prefix.
40#define prolog std::string("SuperChunk::").append(__func__).append("() - ")
// Debug context name passed as the first argument of every BESDEBUG() call in this file.
42#define SUPER_CHUNK_MODULE "dmrpp:3"
44using std::stringstream;
// Locked by the start_one_chunk_*compute_thread() functions while they test/update
// chunk_processing_thread_counter and launch a std::async task.
51std::mutex chunk_processing_thread_pool_mtx;
// Starts at zero; incremented each time one of the start_one_chunk_*compute_thread() functions
// launches a task, and compared against DmrppRequestHandler::d_max_compute_threads beforehand.
52atomic_uint chunk_processing_thread_counter(0);
// NOTE(review): not referenced by any code visible in this listing - presumably a configuration key; confirm.
53#define COMPUTE_THREADS "compute_threads"
// Compile-time switch for the '#if DMRPP_ENABLE_THREAD_TIMERS' sections in this file; 0 compiles them out.
55#define DMRPP_ENABLE_THREAD_TIMERS 0
/**
 * @brief Applies the array's filters to one Chunk (when required) and inserts it into the array.
 *
 * NOTE(review): this listing skips several source lines (see the gaps in the embedded line
 * numbers), so only the visible statements are described here.
 *
 * @param chunk The Chunk to process.
 * @param array The DmrppArray whose filters are applied and which receives the chunk.
 * @param constrained_array_shape Shape forwarded unchanged to DmrppArray::insert_chunk().
 */
75void process_one_chunk(shared_ptr<Chunk> chunk, DmrppArray *array,
const vector<unsigned long long> &constrained_array_shape)
77 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl );
// Filtering is skipped for chunks that use the fill value and for arrays that have no filters.
86 if (!chunk->get_uses_fill_value() && !array->is_filters_empty())
87 chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->var()->width());
// The target address is the chunk's position in the array; the source address is a vector of
// array->dimensions() zeros.
89 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
90 vector<unsigned long long> chunk_source_address(array->dimensions(), 0);
// The leading 0 is presumably the dimension at which insertion starts - TODO confirm against insert_chunk().
92 array->insert_chunk(0, &target_element_address, &chunk_source_address,
93 chunk, constrained_array_shape);
96 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"END" << endl );
/**
 * @brief Applies the array's filters to one Chunk (when required) and inserts it via
 * DmrppArray::insert_chunk_unconstrained().
 *
 * NOTE(review): this listing skips some source lines, so only the visible statements are described.
 *
 * @param chunk The Chunk to process.
 * @param chunk_shape Chunk shape forwarded to insert_chunk_unconstrained().
 * @param array The DmrppArray whose filters are applied and which receives the chunk.
 * @param array_shape Array shape forwarded to insert_chunk_unconstrained().
 */
118void process_one_chunk_unconstrained(shared_ptr<Chunk> chunk,
const vector<unsigned long long> &chunk_shape,
119 DmrppArray *array,
const vector<unsigned long long> &array_shape)
121 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl );
// Filtering is skipped for chunks that use the fill value and for arrays that have no filters.
126 if (!chunk->get_uses_fill_value() && !array->is_filters_empty())
127 chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->var()->width());
// The three literal zeros are presumably starting dimension/offset values - TODO confirm against
// insert_chunk_unconstrained(); the last argument is the chunk's position in the array.
129 array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
132 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"END" << endl );
/**
 * @brief Task body handed to std::async by start_one_chunk_compute_thread(): processes one chunk.
 *
 * @param args Owned argument bundle; its chunk, array and array_shape members are passed to
 * process_one_chunk(). parent_thread_id and parent_super_chunk_id are used only for the timer tag.
 * @return bool - the return statement is not visible in this listing.
 */
141bool one_chunk_compute_thread(unique_ptr<one_chunk_args> args)
// Optional timing (compiled only when DMRPP_ENABLE_THREAD_TIMERS is non-zero): the tag records this
// thread's id, the parent thread id and the parent SuperChunk id. The declaration of 'sw' is not
// visible in this listing.
143#if DMRPP_ENABLE_THREAD_TIMERS
144 stringstream timer_tag;
145 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
146 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" parent_sc: " << args->parent_super_chunk_id;
148 sw.start(timer_tag.str());
151 process_one_chunk(args->chunk, args->array, args->array_shape);
/**
 * @brief Task body handed to std::async by start_one_chunk_unconstrained_compute_thread():
 * processes one chunk with process_one_chunk_unconstrained().
 *
 * @param args Owned argument bundle; its chunk, chunk_shape, array and array_shape members are
 * passed on. parent_thread_id and parent_super_chunk_id are used only for the timer tag.
 * @return bool - the return statement is not visible in this listing.
 */
160bool one_chunk_unconstrained_compute_thread(unique_ptr<one_chunk_unconstrained_args> args)
// Optional timing (compiled only when DMRPP_ENABLE_THREAD_TIMERS is non-zero). The declaration of
// 'sw' is not visible in this listing.
162#if DMRPP_ENABLE_THREAD_TIMERS
163 stringstream timer_tag;
164 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
165 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" parent_sc: " << args->parent_super_chunk_id ;
167 sw.start(timer_tag.str());
170 process_one_chunk_unconstrained(args->chunk, args->chunk_shape, args->array, args->array_shape);
/**
 * @brief Launches one_chunk_compute_thread() through std::async when the thread limit allows it.
 *
 * The check and the launch happen while chunk_processing_thread_pool_mtx is held. When
 * chunk_processing_thread_counter is below DmrppRequestHandler::d_max_compute_threads the counter
 * is incremented and the future returned by std::async(std::launch::async, ...) is appended to
 * @p futures; @p args is moved into the task.
 *
 * @param futures List that receives the future of the newly started task.
 * @param args Arguments for the task; ownership is transferred when a task is started.
 * @return bool - the caller in this file stores it as 'thread_started'; the return statement
 * itself is not visible in this listing.
 */
184bool start_one_chunk_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_args> args) {
186 std::unique_lock<std::mutex> lck (chunk_processing_thread_pool_mtx);
187 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads <<
" chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
188 if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
// NOTE(review): the counter is incremented before std::async() here, whereas
// start_one_chunk_unconstrained_compute_thread() increments it afterwards. If std::async() throws,
// the counter would stay incremented without a task - confirm whether the ordering is intentional.
189 chunk_processing_thread_counter++;
190 futures.push_back(std::async(std::launch::async, one_chunk_compute_thread, std::move(args)));
192 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Got std::future '" << futures.size() <<
193 "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
/**
 * @brief Launches one_chunk_unconstrained_compute_thread() through std::async when the thread
 * limit allows it.
 *
 * The check and the launch happen while chunk_processing_thread_pool_mtx is held. When
 * chunk_processing_thread_counter is below DmrppRequestHandler::d_max_compute_threads the future
 * returned by std::async(std::launch::async, ...) is appended to @p futures and the counter is
 * then incremented; @p args is moved into the task.
 *
 * @param futures List that receives the future of the newly started task.
 * @param args Arguments for the task; ownership is transferred when a task is started.
 * @return bool - the caller in this file stores it as 'thread_started'; the return statement
 * itself is not visible in this listing.
 */
208bool start_one_chunk_unconstrained_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_unconstrained_args> args) {
210 std::unique_lock<std::mutex> lck (chunk_processing_thread_pool_mtx);
211 if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
// The counter is only incremented once std::async() has returned a future.
212 futures.push_back(std::async(std::launch::async, one_chunk_unconstrained_compute_thread, std::move(args)));
213 chunk_processing_thread_counter++;
215 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Got std::future '" << futures.size() <<
216 "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
/**
 * @brief Processes the queued chunks with std::async tasks started through
 * start_one_chunk_compute_thread().
 *
 * NOTE(review): this listing omits several lines of the function (including the parameter line
 * that declares 'array' and the enclosing loop/try structure), so only the visible statements
 * are described.
 *
 * @param super_chunk_id Id stored in each task's argument bundle.
 * @param chunks Queue of chunks still to be processed; the front chunk is used for each new task.
 * @param constrained_array_shape Shape stored in each task's argument bundle.
 */
242void process_chunks_concurrent(
243 const string &super_chunk_id,
244 queue<shared_ptr<Chunk>> &chunks,
246 const vector<unsigned long long> &constrained_array_shape ){
// Futures of the tasks started by this call.
249 list<future<bool>> futures;
252 bool future_finished =
true;
// Presumably waits up to DMRPP_WAIT_FOR_FUTURE_MS for one of the futures to complete and updates
// the shared counter - TODO confirm against get_next_future().
256 future_finished = get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
260 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
262 if (!chunks.empty()){
// Keep starting tasks until the queue is empty or a start attempt is declined.
264 bool thread_started =
true;
265 while(thread_started && !chunks.empty()) {
266 auto chunk = chunks.front();
267 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Starting thread for " << chunk->to_string() << endl);
// The argument bundle is owned by a unique_ptr and moved into the start function.
269 auto args = unique_ptr<one_chunk_args>(
new one_chunk_args(super_chunk_id, chunk, array, constrained_array_shape));
270 thread_started = start_one_chunk_compute_thread(futures, std::move(args));
// Presumably the chunk is popped from the queue when the task started (that line is not visible).
272 if (thread_started) {
274 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"STARTED thread for " << chunk->to_string() << endl);
277 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Thread not started. args deleted, Chunk remains in queue.) " <<
278 "chunk_processing_thread_counter: " << chunk_processing_thread_counter <<
" futures.size(): " << futures.size() << endl);
287 future_finished =
false;
// Drain loop: calls get() on the last future while the list is not empty. Presumably part of an
// error/cleanup path; the statement that removes the element is not visible in this listing.
292 while(!futures.empty()){
293 if(futures.back().valid())
294 futures.back().get();
/**
 * @brief Processes the queued chunks with std::async tasks started through
 * start_one_chunk_unconstrained_compute_thread().
 *
 * NOTE(review): this listing omits several lines of the function (including the parameter line
 * that declares 'array' and the enclosing loop/try structure), so only the visible statements
 * are described.
 *
 * @param super_chunk_id Id stored in each task's argument bundle.
 * @param chunks Queue of chunks still to be processed; the front chunk is used for each new task.
 * @param chunk_shape Chunk shape stored in each task's argument bundle.
 * @param array_shape Array shape stored in each task's argument bundle.
 */
324void process_chunks_unconstrained_concurrent(
325 const string &super_chunk_id,
326 queue<shared_ptr<Chunk>> &chunks,
327 const vector<unsigned long long> &chunk_shape,
329 const vector<unsigned long long> &array_shape){
// Futures of the tasks started by this call.
332 list<future<bool>> futures;
335 bool future_finished =
true;
// Presumably waits up to DMRPP_WAIT_FOR_FUTURE_MS for one of the futures to complete and updates
// the shared counter - TODO confirm against get_next_future().
339 future_finished = get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
343 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
345 if (!chunks.empty()){
// Keep starting tasks until the queue is empty or a start attempt is declined.
347 bool thread_started =
true;
348 while(thread_started && !chunks.empty()) {
349 auto chunk = chunks.front();
350 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Starting thread for " << chunk->to_string() << endl);
// Note the constructor takes array_shape before chunk_shape.
352 auto args = unique_ptr<one_chunk_unconstrained_args>(
353 new one_chunk_unconstrained_args(super_chunk_id, chunk, array, array_shape, chunk_shape) );
354 thread_started = start_one_chunk_unconstrained_compute_thread(futures, std::move(args));
// Presumably the chunk is popped from the queue when the task started (that line is not visible).
356 if (thread_started) {
358 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"STARTED thread for " << chunk->to_string() << endl);
361 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
362 " chunk_processing_thread_counter: " << chunk_processing_thread_counter <<
363 " futures.size(): " << futures.size() << endl);
372 future_finished =
false;
// Drain loop: calls get() on the last future while the list is not empty. Presumably part of an
// error/cleanup path; the statement that removes the element is not visible in this listing.
377 while(!futures.empty()){
378 if(futures.back().valid())
379 futures.back().get();
// Body of the routine that tries to add candidate_chunk to this SuperChunk and reports whether it
// was added. The header line is not part of this listing; presumably SuperChunk::add_chunk().
426 bool chunk_was_added =
false;
// First chunk: it is always accepted and defines the SuperChunk's offset, size and fill-value flag.
427 if(d_chunks.empty()){
428 d_chunks.push_back(candidate_chunk);
429 d_offset = candidate_chunk->get_offset();
430 d_size = candidate_chunk->get_size();
// The data URL is taken from the chunk only when it does not use the fill value. The assignment of
// nullptr below is presumably the else-branch (the 'else' line is not visible in this listing).
432 d_uses_fill_value = candidate_chunk->get_uses_fill_value();
433 if (!d_uses_fill_value)
434 d_data_url = candidate_chunk->get_data_url();
436 d_data_url =
nullptr;
437 chunk_was_added =
true;
// Later chunks are accepted only when they do not use the fill value and is_contiguous() agrees;
// the SuperChunk's size then grows by the chunk's size.
// NOTE(review): is_contiguous() dereferences d_data_url, which is nullptr when the first chunk used
// the fill value, and this condition does not test d_uses_fill_value - confirm that a second chunk
// is never offered to a fill-value SuperChunk.
440 else if(!candidate_chunk->get_uses_fill_value() && is_contiguous(candidate_chunk)){
441 this->d_chunks.push_back(candidate_chunk);
442 d_size += candidate_chunk->get_size();
443 chunk_was_added =
true;
445 return chunk_was_added;
/**
 * @brief Tests whether candidate_chunk continues this SuperChunk.
 *
 * The visible statements compare the candidate's data URL string with d_data_url's string and
 * compare the candidate's offset with d_offset + d_size. The control flow joining the two tests
 * and the return statement are not visible in this listing.
 *
 * NOTE(review): both candidate_chunk->get_data_url() and d_data_url are dereferenced without a
 * null check.
 *
 * @param candidate_chunk The Chunk to test.
 * @return bool - presumably the value of 'contiguous'; confirm.
 */
462bool SuperChunk::is_contiguous(
const std::shared_ptr<Chunk> candidate_chunk) {
// Same data URL?
464 bool contiguous = candidate_chunk->get_data_url()->str() == d_data_url->str();
// Does the candidate start exactly where the bytes currently covered end?
467 contiguous = (d_offset + d_size) == candidate_chunk->get_offset();
/**
 * @brief Points every child chunk's read buffer at its slice of d_read_buffer.
 *
 * Chunks are laid out back to back in d_chunks order: each chunk gets the address
 * d_read_buffer + bindex and bindex advances by the chunk's size. When the running index
 * exceeds d_size an error message is assembled; the declaration of 'msg' and the statement
 * that raises the error are not visible in this listing.
 */
480void SuperChunk::map_chunks_to_buffer()
// Byte offset of the next chunk inside d_read_buffer.
482 unsigned long long bindex = 0;
483 for(
const auto &chunk : d_chunks){
// The trailing arguments (0, false) are passed as-is - TODO confirm their meaning against
// Chunk::set_read_buffer().
484 chunk->set_read_buffer(d_read_buffer + bindex, chunk->get_size(),0,
false);
485 bindex += chunk->get_size();
// Sanity check: the chunks mapped so far must not extend past the SuperChunk's size.
486 if (bindex > d_size) {
488 msg <<
"ERROR The computed buffer index, " << bindex <<
" is larger than expected size of the SuperChunk. ";
489 msg <<
"d_size: " << d_size;
/**
 * @brief Reads the SuperChunk's bytes into d_read_buffer using a temporary Chunk.
 *
 * NOTE(review): the transfer call, the conditions and any try/catch structure are not visible
 * in this listing; only the visible statements are described.
 *
 * @throws BESInternalError with "No more libcurl handles." (condition not visible; presumably
 * when no easy handle could be obtained).
 */
499void SuperChunk::read_aggregate_bytes()
// Temporary Chunk built from d_data_url, d_size and d_offset, reading straight into d_read_buffer.
504 Chunk chunk(d_data_url,
"NOT_USED", d_size, d_offset);
506 chunk.set_read_buffer(d_read_buffer, d_size,0,
false);
508 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(&chunk);
510 throw BESInternalError(prolog +
"No more libcurl handles.", __FILE__, __LINE__);
// The handle is released in two places - presumably once on the normal path and once on an
// error path; confirm against the full source.
514 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
517 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
// The number of bytes read must equal the SuperChunk's size; otherwise an error message is
// assembled (the declaration of 'oss' and the throw are not visible in this listing).
522 if (d_size != chunk.get_bytes_read()) {
524 oss <<
"Wrong number of bytes read for chunk; read: " << chunk.get_bytes_read() <<
", expected: " << d_size;
/**
 * @brief Reads the single child chunk of a fill-value SuperChunk by calling its read_chunk().
 *
 * @throws BESInternalError when d_chunks does not hold exactly one chunk. Unlike the other
 * errors in this file, the message is not prefixed with prolog.
 */
538void SuperChunk::read_fill_value_chunk()
540 if (d_chunks.size() != 1)
541 throw BESInternalError(
"Found a SuperChunk with uses_fill_value true but more than one child chunk.", __FILE__, __LINE__);
543 d_chunks.front()->read_chunk();
// Body of the routine that reads the SuperChunk's data. The header line is not part of this
// listing; presumably SuperChunk::retrieve_data().
// Debug message for the case that the SuperChunk was already read (the guarding condition and the
// early return are not visible in this listing).
552 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"SuperChunk (" << (
void **)
this <<
") has already been read! Returning." << endl);
// Allocate the read buffer of d_size bytes when none exists yet.
560 if (!d_read_buffer) {
563 d_read_buffer =
new char[d_size];
// Hand each child chunk its slice of d_read_buffer.
569 map_chunks_to_buffer();
// Fill-value SuperChunks read their single chunk directly; the read_aggregate_bytes() call is
// presumably the else-branch (the 'else' line is not visible in this listing).
576 if (d_uses_fill_value)
577 read_fill_value_chunk();
579 read_aggregate_bytes();
// Mark every child chunk as read, with bytes_read equal to the chunk's own size.
586 for(
const auto& chunk : d_chunks){
587 chunk->set_is_read(
true);
588 chunk->set_bytes_read(chunk->get_size());
// Body of the routine that processes every child chunk into d_parent_array using the constrained
// array shape. The header line is not part of this listing; presumably
// SuperChunk::process_child_chunks().
597 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl );
// Shape obtained from the parent array with get_shape(true).
600 vector<unsigned long long> constrained_array_shape = d_parent_array->
get_shape(
true);
601 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ?
"true" :
"false") << endl);
602 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);
// Serial path: compute threads are disabled, so each chunk is processed in turn on this thread.
604 if (!DmrppRequestHandler::d_use_compute_threads) {
605#if DMRPP_ENABLE_THREAD_TIMERS
607 sw.
start(prolog+
"Serial Chunk Processing. id: " + d_id);
609 for(
const auto &chunk :get_chunks()){
610 process_one_chunk(chunk,d_parent_array,constrained_array_shape);
// Concurrent path (presumably the else-branch; the 'else' line is not visible in this listing).
614#if DMRPP_ENABLE_THREAD_TIMERS
615 stringstream timer_name;
616 timer_name << prolog <<
"Concurrent Chunk Processing. id: " << d_id;
618 sw.
start(timer_name.str());
// Copy the chunks into a queue and hand it to process_chunks_concurrent().
620 queue<shared_ptr<Chunk>> chunks_to_process;
621 for(
const auto &chunk:get_chunks())
622 chunks_to_process.push(chunk);
624 process_chunks_concurrent(d_id, chunks_to_process, d_parent_array, constrained_array_shape);
626 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"END" << endl );
// Body of the routine that processes every child chunk into d_parent_array through the
// unconstrained insert functions. The header line is not part of this listing; presumably
// SuperChunk::process_child_chunks_unconstrained().
636 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl );
// Shape obtained from the parent array with get_shape(true). The definition of 'chunk_shape',
// used below, is not visible in this listing.
640 const vector<unsigned long long> array_shape = d_parent_array->
get_shape(
true);
// Serial path: compute threads are disabled, so each chunk is processed in turn on this thread.
644 if(!DmrppRequestHandler::d_use_compute_threads){
645#if DMRPP_ENABLE_THREAD_TIMERS
647 sw.
start(prolog +
"Serial Chunk Processing. sc_id: " + d_id );
649 for(
auto &chunk :get_chunks()){
650 process_one_chunk_unconstrained(chunk, chunk_shape, d_parent_array, array_shape);
// Concurrent path (presumably the else-branch; the 'else' line is not visible in this listing).
654#if DMRPP_ENABLE_THREAD_TIMERS
655 stringstream timer_name;
656 timer_name << prolog <<
"Concurrent Chunk Processing. sc_id: " << d_id;
658 sw.
start(timer_name.str());
// Copy the chunks into a queue and hand it to process_chunks_unconstrained_concurrent().
660 queue<shared_ptr<Chunk>> chunks_to_process;
661 for (
auto &chunk:get_chunks())
662 chunks_to_process.push(chunk);
664 process_chunks_unconstrained_concurrent(d_id,chunks_to_process, chunk_shape, d_parent_array, array_shape);
// Body of the routine that builds a textual description of this SuperChunk. The header line, the
// declaration of 'msg' and the return statement are not part of this listing; presumably
// SuperChunk::to_string().
// Summary: address of this object, then offset, size and number of child chunks.
677 msg <<
"[SuperChunk: " << (
void **)
this;
678 msg <<
" offset: " << d_offset;
679 msg <<
" size: " << d_size ;
680 msg <<
" chunk_count: " << d_chunks.size();
// One line per child chunk, using the chunk's own to_string(). Presumably emitted only in a
// verbose mode (the guarding condition is not visible in this listing).
// NOTE(review): 'auto chunk' copies each element of d_chunks on every iteration.
685 for (
auto chunk: d_chunks) {
686 msg << chunk->to_string() << endl;
exception thrown if internal error encountered
virtual bool start(std::string name)
virtual std::vector< unsigned long long > get_shape(bool constrained)
Get the array shape.
virtual const std::vector< unsigned long long > & get_chunk_dimension_sizes() const
The chunk dimension sizes held in a const vector.
virtual void retrieve_data()
Cause the SuperChunk and all of its subordinate Chunks to be read.
virtual bool add_chunk(std::shared_ptr< Chunk > candidate_chunk)
Attempts to add a new Chunk to this SuperChunk.
std::string to_string(bool verbose) const
Makes a string representation of the SuperChunk.
virtual void dump(std::ostream &strm) const
Writes the to_string() output to the stream strm.
virtual void process_child_chunks()
Reads the SuperChunk, inflates/de-shuffles the subordinate chunks as required and copies the values i...
virtual void process_child_chunks_unconstrained()
Reads the SuperChunk, inflates/deshuffles the subordinate chunks as required and copies the values in...