bes Updated for version 3.20.10
ArrayJoinExistingAggregation.cc
1
2// This file is part of the "NcML Module" project, a BES module designed
3// to allow NcML files to be used to be used as a wrapper to add
4// AIS to existing datasets of any format.
5//
6// Copyright (c) 2010 OPeNDAP, Inc.
7// Author: Michael Johnson <m.johnson@opendap.org>
8//
9// For more information, please also see the main website: http://opendap.org/
10//
11// This library is free software; you can redistribute it and/or
12// modify it under the terms of the GNU Lesser General Public
13// License as published by the Free Software Foundation; either
14// version 2.1 of the License, or (at your option) any later version.
15//
16// This library is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19// Lesser General Public License for more details.
20//
21// You should have received a copy of the GNU Lesser General Public
22// License along with this library; if not, write to the Free Software
23// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24//
25// Please see the files COPYING and COPYRIGHT for more information on the GLPL.
26//
27// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
29
30#include "config.h"
31
32#include <sstream>
33
34#include <libdap/Marshaller.h>
35
36#include "BESDebug.h"
37#include "BESStopWatch.h"
38
39#include "ArrayJoinExistingAggregation.h"
40
41#include "AggregationException.h" // agg_util
42#include "AggregationUtil.h" // agg_util
43#include "NCMLDebug.h"
44
45static const string DEBUG_CHANNEL(NCML_MODULE_DBG_CHANNEL_2);
46static const bool PRINT_CONSTRAINTS = false;
47
48// Timeouts are now handled in/by the BES framework in BESInterface.
49// jhrg 12/29/15
50#undef USE_LOCAL_TIMEOUT_SCHEME
51
52namespace agg_util {
53
55 const AMDList& memberDatasets, std::auto_ptr<ArrayGetterInterface>& arrayGetter, const Dimension& joinDim) :
56 ArrayAggregationBase(granuleTemplate, memberDatasets, arrayGetter), _joinDim(joinDim)
57{
58 BESDEBUG_FUNC(DEBUG_CHANNEL, "Making the aggregated outer dimension be: " + joinDim.toString() + "\n");
59
60 // We created the array with the given granule prototype, but the resulting
61 // outer dimension size must be calculated according to the
62 // value in the passed in dimension object.
63 libdap::Array::dimension& rOuterDim = *(dim_begin());
64 NCML_ASSERT_MSG(rOuterDim.name == joinDim.name, "The outer dimension name of this is not the expected "
65 "outer dimension name! Broken precondition: This ctor cannot be called "
66 "without this being true!");
67 rOuterDim.size = joinDim.size;
68 // Force it to recompute constraints since we changed size.
69 reset_constraint();
70
71 ostringstream oss;
73 if (PRINT_CONSTRAINTS) {
74 // constraints as well to ensure reset worked.
76 }
77 BESDEBUG_FUNC(DEBUG_CHANNEL, "Constrained Dims after set are: " + oss.str());
78}
79
81 ArrayAggregationBase(rhs), _joinDim(rhs._joinDim)
82{
83 duplicate(rhs);
84}
85
86/* virtual */
87ArrayJoinExistingAggregation::~ArrayJoinExistingAggregation()
88{
89 cleanup();
90}
91
92ArrayJoinExistingAggregation&
93ArrayJoinExistingAggregation::operator=(const ArrayJoinExistingAggregation& rhs)
94{
95 if (this != &rhs) {
96 cleanup();
97 ArrayAggregationBase::operator=(rhs);
98 duplicate(rhs);
99 }
100 return *this;
101}
102
103/* virtual */
104ArrayJoinExistingAggregation*
106{
107 return new ArrayJoinExistingAggregation(*this);
108}
109
110// Set this to 0 to get the old behavior where the entire response
111// (for this variable) is built in memory and then sent to the client.
112#define PIPELINING 1
113
114/* virtual */
115// begin modifying here for the double buffering
116// see notes about how this was written marked with '***'
117// Following this method is an older version of serialize that
118// provides no new functionality but does get run instead of the
119// more general implementation in libdap::Array.
120bool ArrayJoinExistingAggregation::serialize(libdap::ConstraintEvaluator &eval, libdap::DDS &dds, libdap::Marshaller &m,
121 bool ce_eval)
122{
123 BESStopWatch sw;
124 if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start("ArrayJoinExistingAggregation::serialize", "");
125
126 // *** This serialize() implementation was made by starting with a simple version that
127 // *** tested read_p(), calling read() if needed and tsting send_p() and is_in_selection(),
128 // *** returning true if the data did not need to be sent. I moved that test here.
129
130 // Only continue if we are supposed to serialize this object at all.
131 if (!(send_p() || is_in_selection())) {
132 BESDEBUG_FUNC(DEBUG_CHANNEL, "Object not in output, skipping... name=" << name() << endl);
133 return true;
134 }
135
136 // *** Add status so that we can do our magic _or_ pass off the call to libdap
137 // *** and collect the result either way.
138 bool status = false;
139
140 if (!read_p()) {
141 // *** copy lines from AggregationBase::read() into here in place
142 // *** of the call to read()
143
144 if (PRINT_CONSTRAINTS) {
145 BESDEBUG_FUNC(DEBUG_CHANNEL, "Constraints on this Array are:" << endl);
146 printConstraints(*this);
147 }
148
149 // call subclass impl
151
152 if (PRINT_CONSTRAINTS) {
153 BESDEBUG_FUNC(DEBUG_CHANNEL, "After transfer, constraints on the member template Array are: " << endl);
155 }
156
157 // *** Inserted code from readConstrainedGranuleArraysAndAggregateDataHook here
158
159 // outer one is the first in iteration
160 const Array::dimension& outerDim = *(dim_begin());
161 BESDEBUG("ncml",
162 "Aggregating datasets array with outer dimension constraints: " << " start=" << outerDim.start << " stride=" << outerDim.stride << " stop=" << outerDim.stop << endl);
163
164 try {
165#if PIPELINING
166 // assumes the constraints are already set properly on this
167 m.put_vector_start(length());
168#else
169 reserve_value_capacity();
170#endif
171
172 // Start the iteration state for the granule.
173 const AMDList& datasets = getDatasetList(); // the list
174 NCML_ASSERT(!datasets.empty());
175 int currDatasetIndex = 0; // index into datasets
176 const AggMemberDataset* pCurrDataset = (datasets[currDatasetIndex]).get();
177
178 int outerDimIndexOfCurrDatasetHead = 0;
179 int currDatasetSize = int(pCurrDataset->getCachedDimensionSize(_joinDim.name));
180 bool currDatasetWasRead = false;
181
182 // where in this output array we are writing next
183 unsigned int nextOutputBufferElementIndex = 0;
184
185 // Traverse the outer dimension constraints,
186 // Keeping track of which dataset we need to
187 // be inside for the given values of the constraint.
188 for (int outerDimIndex = outerDim.start; outerDimIndex <= outerDim.stop && outerDimIndex < outerDim.size;
189 outerDimIndex += outerDim.stride) {
190 // Figure out where the given outer index maps into in local granule space
191 int localGranuleIndex = outerDimIndex - outerDimIndexOfCurrDatasetHead;
192
193 // if this is beyond the dataset end, move state to the next dataset
194 // and try again until we're in the proper interval, with proper dataset.
195 while (localGranuleIndex >= currDatasetSize) {
196 localGranuleIndex -= currDatasetSize;
197 outerDimIndexOfCurrDatasetHead += currDatasetSize;
198 ++currDatasetIndex;
199 NCML_ASSERT(currDatasetIndex < int(datasets.size()));
200 pCurrDataset = datasets[currDatasetIndex].get();
201 currDatasetSize = pCurrDataset->getCachedDimensionSize(_joinDim.name);
202 currDatasetWasRead = false;
203
204 BESDEBUG_FUNC(DEBUG_CHANNEL,
205 "The constraint traversal passed a granule boundary " << "on the outer dimension and is stepping forward into " << "granule index=" << currDatasetIndex << endl);
206 }
207
208 // If we haven't read in this granule yet (we passed a boundary)
209 // then do it now. Map constraints into the local granule space.
210 if (!currDatasetWasRead) {
211 BESDEBUG_FUNC(DEBUG_CHANNEL,
212 " Current granule dataset was traversed but not yet " "read and copied into output. Mapping constraints " "and calling read()..." << endl);
213
214 // Set up a constraint object for the actual granule read
215 // so that it only loads the data values in which we are
216 // interested.
217 Array& granuleConstraintTemplate = getGranuleTemplateArray();
218
219 // The inner dim constraints were set up in the containing read() call.
220 // The outer dim was left open for us to fix now...
221 Array::Dim_iter outerDimIt = granuleConstraintTemplate.dim_begin();
222
223 // modify the outerdim size to match the dataset we need to
224 // load. The inners MUST match so we can let those get
225 //checked later...
226 outerDimIt->size = currDatasetSize;
227 outerDimIt->c_size = currDatasetSize; // this will get recalc below?
228
229 // find the mapped endpoint
230 // Basically, the fullspace endpoint mapped to local offset,
231 // clamped into the local granule size.
232 int granuleStopIndex = std::min(outerDim.stop - outerDimIndexOfCurrDatasetHead,
233 currDatasetSize - 1);
234
235 // we must clamp the stride to the interval of the
236 // dataset in order to avoid an exception in
237 // add_constraint on stride being larger than dataset.
238 int clampedStride = std::min(outerDim.stride, currDatasetSize);
239 // mapped endpoint clamped within this granule
240 granuleConstraintTemplate.add_constraint(outerDimIt, localGranuleIndex, clampedStride,
241 granuleStopIndex);
242#if USE_LOCAL_TIMEOUT_SCHEME
243 dds.timeout_on();
244#endif
245#if 0
246 // Do the constrained read and copy it into this output buffer
247 agg_util::AggregationUtil::addDatasetArrayDataToAggregationOutputArray(*this,// into the output buffer of this object
248 nextOutputBufferElementIndex,// into the next open slice
249 getGranuleTemplateArray(),// constraints we just setup
250 name(),// aggvar name
251 const_cast<AggMemberDataset&>(*pCurrDataset),// Dataset who's DDS should be searched
252 getArrayGetterInterface(), DEBUG_CHANNEL);
253#endif
254
256 getGranuleTemplateArray(), name(), const_cast<AggMemberDataset&>(*pCurrDataset),
257 getArrayGetterInterface(), DEBUG_CHANNEL);
258#if USE_LOCAL_TIMEOUT_SCHEME
259 dds.timeout_off();
260#endif
261
262#if PIPELINING
263 m.put_vector_part(pDatasetArray->get_buf(), getGranuleTemplateArray().length(), var()->width(),
264 var()->type());
265#else
266 this->set_value_slice_from_row_major_vector(*pDatasetArray, nextOutputBufferElementIndex);
267#endif
268
269 pDatasetArray->clear_local_data();
270
271 // Jump output buffer index forward by the amount we added.
272 nextOutputBufferElementIndex += getGranuleTemplateArray().length();
273 currDatasetWasRead = true;
274
275 BESDEBUG_FUNC(DEBUG_CHANNEL,
276 " The granule index " << currDatasetIndex << " was read with constraints and copied into the aggregation output." << endl);
277 } // !currDatasetWasRead
278 } // for loop over outerDim
279 } // end of try
280 catch (AggregationException& ex) {
281 THROW_NCML_PARSE_ERROR(-1, ex.what());
282 }
283
284 // *** end of code inserted from readConstrainedGranuleArraysAndAggregateDataHook
285
286#if PIPELINING
287 m.put_vector_end();
288 status = true;
289#else
290 set_read_p(true);
291 status = libdap::Array::serialize(eval, dds, m, ce_eval);
292#endif
293 }
294 else {
295 status = libdap::Array::serialize(eval, dds, m, ce_eval);
296 }
297
298 return status;
299}
300
302// Private Impl Below
303
304void ArrayJoinExistingAggregation::duplicate(const ArrayJoinExistingAggregation& rhs)
305{
306 _joinDim = rhs._joinDim;
307}
308
309void ArrayJoinExistingAggregation::cleanup() throw ()
310{
311}
312
313/* virtual */
315{
316 // transfer the constraints from this object into the subArray template
317 // skipping our first dim which is the join dim we need to do specially every
318 // granule in the read hook.
320 *this, // from this
321 true, // skip first dim in the copy since we handle it special
322 true, // also skip it in the toArray for the same reason.
323 true, // print debug
324 DEBUG_CHANNEL); // on this channel
325}
326
328{
329 BESStopWatch sw;
330 if (BESDebug::IsSet(TIMING_LOG_KEY))
331 sw.start("ArrayJoinExistingAggregation::readConstrainedGranuleArraysAndAggregateDataHook", "");
332
333 // outer one is the first in iteration
334 const Array::dimension& outerDim = *(dim_begin());
335 BESDEBUG("ncml",
336 "Aggregating datasets array with outer dimension constraints: " << " start=" << outerDim.start << " stride=" << outerDim.stride << " stop=" << outerDim.stop << endl);
337
338 try {
339 // assumes the constraints are already set properly on this
340 reserve_value_capacity();
341
342 // Start the iteration state for the granule.
343 const AMDList& datasets = getDatasetList(); // the list
344 NCML_ASSERT(!datasets.empty());
345 int currDatasetIndex = 0; // index into datasets
346 const AggMemberDataset* pCurrDataset = (datasets[currDatasetIndex]).get();
347
348 int outerDimIndexOfCurrDatasetHead = 0;
349 int currDatasetSize = int(pCurrDataset->getCachedDimensionSize(_joinDim.name));
350 bool currDatasetWasRead = false;
351
352 // where in this output array we are writing next
353 unsigned int nextOutputBufferElementIndex = 0;
354
355 // Traverse the outer dimension constraints,
356 // Keeping track of which dataset we need to
357 // be inside for the given values of the constraint.
358 for (int outerDimIndex = outerDim.start; outerDimIndex <= outerDim.stop && outerDimIndex < outerDim.size;
359 outerDimIndex += outerDim.stride) {
360 // Figure out where the given outer index maps into in local granule space
361 int localGranuleIndex = outerDimIndex - outerDimIndexOfCurrDatasetHead;
362
363 // if this is beyond the dataset end, move state to the next dataset
364 // and try again until we're in the proper interval, with proper dataset.
365 while (localGranuleIndex >= currDatasetSize) {
366 localGranuleIndex -= currDatasetSize;
367 outerDimIndexOfCurrDatasetHead += currDatasetSize;
368 ++currDatasetIndex;
369 NCML_ASSERT(currDatasetIndex < int(datasets.size()));
370 pCurrDataset = datasets[currDatasetIndex].get();
371 currDatasetSize = pCurrDataset->getCachedDimensionSize(_joinDim.name);
372 currDatasetWasRead = false;
373
374 BESDEBUG_FUNC(DEBUG_CHANNEL,
375 "The constraint traversal passed a granule boundary " << "on the outer dimension and is stepping forward into " << "granule index=" << currDatasetIndex << endl);
376 }
377
378 // If we haven't read in this granule yet (we passed a boundary)
379 // then do it now. Map constraints into the local granule space.
380 if (!currDatasetWasRead) {
381 BESDEBUG_FUNC(DEBUG_CHANNEL,
382 " Current granule dataset was traversed but not yet " "read and copied into output. Mapping constraints " "and calling read()..." << endl);
383
384 // Set up a constraint object for the actual granule read
385 // so that it only loads the data values in which we are
386 // interested.
387 Array& granuleConstraintTemplate = getGranuleTemplateArray();
388
389 // The inner dim constraints were set up in the containing read() call.
390 // The outer dim was left open for us to fix now...
391 Array::Dim_iter outerDimIt = granuleConstraintTemplate.dim_begin();
392
393 // modify the outerdim size to match the dataset we need to
394 // load. The inners MUST match so we can let those get
395 //checked later...
396 outerDimIt->size = currDatasetSize;
397 outerDimIt->c_size = currDatasetSize; // this will get recalc below?
398
399 // find the mapped endpoint
400 // Basically, the fullspace endpoint mapped to local offset,
401 // clamped into the local granule size.
402 int granuleStopIndex = std::min(outerDim.stop - outerDimIndexOfCurrDatasetHead, currDatasetSize - 1);
403
404 // we must clamp the stride to the interval of the
405 // dataset in order to avoid an exception in
406 // add_constraint on stride being larger than dataset.
407 int clampedStride = std::min(outerDim.stride, currDatasetSize);
408 // mapped endpoint clamped within this granule
409 granuleConstraintTemplate.add_constraint(outerDimIt, localGranuleIndex, clampedStride, granuleStopIndex);
410
411 // Do the constrained read and copy it into this output buffer
412 agg_util::AggregationUtil::addDatasetArrayDataToAggregationOutputArray(*this, // into the output buffer of this object
413 nextOutputBufferElementIndex, // into the next open slice
414 getGranuleTemplateArray(), // constraints we just setup
415 name(), // aggvar name
416 const_cast<AggMemberDataset&>(*pCurrDataset), // Dataset who's DDS should be searched
417 getArrayGetterInterface(), DEBUG_CHANNEL);
418
419 // Jump output buffer index forward by the amount we added.
420 nextOutputBufferElementIndex += getGranuleTemplateArray().length();
421 currDatasetWasRead = true;
422
423 BESDEBUG_FUNC(DEBUG_CHANNEL,
424 " The granule index " << currDatasetIndex << " was read with constraints and copied into the aggregation output." << endl);
425 } // !currDatasetWasRead
426 } // for loop over outerDim
427 } // try
428
429 catch (AggregationException& ex) {
430 THROW_NCML_PARSE_ERROR(-1, ex.what());
431 }
432
433}
434
435} // namespace agg_util
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
Definition: BESDebug.h:168
virtual bool start(std::string name)
Definition: BESStopWatch.cc:67
virtual unsigned int getCachedDimensionSize(const std::string &dimName) const =0
static void addDatasetArrayDataToAggregationOutputArray(libdap::Array &oOutputArray, unsigned int atIndex, const libdap::Array &constrainedTemplateArray, const string &varName, AggMemberDataset &dataset, const ArrayGetterInterface &arrayGetter, const string &debugChannel)
static void transferArrayConstraints(libdap::Array *pToArray, const libdap::Array &fromArray, bool skipFirstFromDim, bool skipFirstToDim, bool printDebug=false, const std::string &debugChannel="agg_util")
static void printDimensions(std::ostream &os, const libdap::Array &fromArray)
static libdap::Array * readDatasetArrayDataForAggregation(const libdap::Array &constrainedTemplateArray, const std::string &varName, AggMemberDataset &dataset, const ArrayGetterInterface &arrayGetter, const std::string &debugChannel)
static void printConstraints(std::ostream &os, const libdap::Array &fromArray)
const AMDList & getDatasetList() const
void printConstraints(const Array &fromArray)
const ArrayGetterInterface & getArrayGetterInterface() const
ArrayJoinExistingAggregation(const libdap::Array &granuleTemplate, const AMDList &memberDatasets, std::auto_ptr< ArrayGetterInterface > &arrayGetter, const Dimension &joinDim)
virtual ArrayJoinExistingAggregation * ptr_duplicate()
Helper class for temporarily hijacking an existing dhi to load a DDX response for one particular file...
std::string toString() const
Definition: Dimension.cc:55