350 {
351 if (action == "Open")
352 {
353 std::string url;
356 uint16_t timeout;
357 std::tie(url, flags, mode, timeout) = GetOpenArgs();
358
359 std::string lmetric;
361 {
362 metric.ios["OpenW::n"]++;
363 }
364 else
365 {
366 metric.ios["OpenR::n"]++;
367 }
368
369 metric.ios["Open::n"]++;
370
371 mytimer_t timer;
372
373 if (!simulate)
375 [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
376 {
377 metric.addIos("Open", "e", HandleStatus(s, orgststr, "Open"));
378 metric.addDelays("Open", "tmeas", timer.elapsed());
379 ending.reset();
380 closing.reset();
381 });
382 else
383 {
384 ending.reset();
385 closing.reset();
386 }
387 }
388 else if (action == "Close")
389 {
390 uint16_t timeout = GetCloseArgs();
391 mytimer_t timer;
392
393 if (closing)
394 {
395 auto& sem = closing->get();
396 closing.reset();
397 sem.Wait();
398 }
399
400 metric.ios["Close::n"]++;
401
402 if (!simulate)
404 [this, orgststr{ orgststr }, ending, timer, &metric](XRootDStatus& s) mutable
405 {
406 metric.addIos("Close", "e", HandleStatus(s, orgststr, "Close"));
407 metric.addDelays("Close", "tmeas", timer.elapsed());
408 ending.reset();
409 });
410 else
411 {
412 ending.reset();
413 }
414 }
415 else if (action == "Stat")
416 {
417 bool force;
418 uint16_t timeout;
419 std::tie(force, timeout) = GetStatArgs();
420 metric.ios["Stat::n"]++;
421 mytimer_t timer;
422
423 if (!simulate)
425 [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s, StatInfo& r) mutable
426 {
427 metric.addIos("Stat", "e", HandleStatus(s, orgststr, "Stat"));
428 metric.addDelays("Stat", "tmeas", timer.elapsed());
429 ending.reset();
430 closing.reset();
431 });
432 else
433 {
434 ending.reset();
435 closing.reset();
436 }
437 }
438 else if (action == "Read")
439 {
440 uint64_t offset;
441 buffer_t buffer;
442 uint16_t timeout;
443 std::tie(offset, buffer, timeout) = GetReadArgs();
444 metric.ios["Read::n"]++;
445 metric.ios["Read::b"] += buffer->size();
446 if ((offset + buffer->size()) > metric.ios["Read::o"])
447 metric.ios["Read::o"] = offset + buffer->size();
448
449 mytimer_t timer;
450 if (!simulate)
451 Async(
Read(file, offset, buffer->size(), buffer->data(), timeout) >>
452 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s,
453 ChunkInfo& r) mutable
454 {
455 metric.addIos("Read", "e", HandleStatus(s, orgststr, "Read"));
456 metric.addDelays("Read", "tmeas", timer.elapsed());
457 buffer.reset();
458 ending.reset();
459 closing.reset();
460 });
461 else
462 {
463 buffer.reset();
464 ending.reset();
465 closing.reset();
466 }
467 }
468 else if (action == "PgRead")
469 {
470 uint64_t offset;
471 buffer_t buffer;
472 uint16_t timeout;
473 std::tie(offset, buffer, timeout) = GetPgReadArgs();
474 metric.ios["PgRead::n"]++;
475 metric.ios["PgRead::b"] += buffer->size();
476 if ((offset + buffer->size()) > metric.ios["Read::o"])
477 metric.ios["Read::o"] = offset + buffer->size();
478 mytimer_t timer;
479 if (!simulate)
480 Async(
PgRead(file, offset, buffer->size(), buffer->data(), timeout) >>
481 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s,
482 PageInfo& r) mutable
483 {
484 metric.addIos("PgRead", "e", HandleStatus(s, orgststr, "PgRead"));
485 metric.addDelays("PgRead", "tmeas", timer.elapsed());
486 buffer.reset();
487 ending.reset();
488 closing.reset();
489 });
490 else
491 {
492 buffer.reset();
493 ending.reset();
494 closing.reset();
495 }
496 }
497 else if (action == "Write")
498 {
499 uint64_t offset;
500 buffer_t buffer;
501 uint16_t timeout;
502 std::tie(offset, buffer, timeout) = GetWriteArgs();
503 metric.ios["Write::n"]++;
504 metric.ios["Write::b"] += buffer->size();
505 if ((offset + buffer->size()) > metric.ios["Write::o"])
506 metric.ios["Write::o"] = offset + buffer->size();
507 mytimer_t timer;
508
509 if (!simulate)
511 Write(file, offset, buffer->size(), buffer->data(), timeout) >>
512 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
513 {
514 metric.addIos("Write", "e", HandleStatus(s, orgststr, "Write"));
515 metric.addDelays("Write", "tmeas", timer.elapsed());
516 buffer.reset();
517 ending.reset();
518 closing.reset();
519 });
520 else
521 {
522 buffer.reset();
523 ending.reset();
524 closing.reset();
525 }
526 }
527 else if (action == "PgWrite")
528 {
529 uint64_t offset;
530 buffer_t buffer;
531 uint16_t timeout;
532 std::tie(offset, buffer, timeout) = GetPgWriteArgs();
533 metric.ios["PgWrite::n"]++;
534 metric.ios["PgWrite::b"] += buffer->size();
535 if ((offset + buffer->size()) > metric.ios["Write::o"])
536 metric.ios["Write::o"] = offset + buffer->size();
537 mytimer_t timer;
538 if (!simulate)
540 PgWrite(file, offset, buffer->size(), buffer->data(), timeout) >>
541 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
542 {
543 metric.addIos("PgWrite", "e", HandleStatus(s, orgststr, "PgWrite"));
544 metric.addDelays("PgWrite", "tmeas", timer.elapsed());
545 buffer.reset();
546 ending.reset();
547 closing.reset();
548 });
549 else
550 {
551 buffer.reset();
552 ending.reset();
553 closing.reset();
554 }
555 }
556 else if (action == "Sync")
557 {
558 uint16_t timeout = GetSyncArgs();
559 metric.ios["Sync::n"]++;
560 mytimer_t timer;
561 if (!simulate)
563 [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
564 {
565 metric.addIos("Sync", "e", HandleStatus(s, orgststr, "Sync"));
566 metric.addDelays("Sync", "tmeas", timer.elapsed());
567 ending.reset();
568 closing.reset();
569 });
570 else
571 {
572 ending.reset();
573 closing.reset();
574 }
575 }
576 else if (action == "Truncate")
577 {
578 uint64_t size;
579 uint16_t timeout;
580 std::tie(size, timeout) = GetTruncateArgs();
581 metric.ios["Truncate::n"]++;
582 if (size > metric.ios["Truncate::o"])
583 metric.ios["Truncate::o"] = size;
584
585 mytimer_t timer;
586 if (!simulate)
588 [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
589 {
590 metric.addIos("Truncate", "e", HandleStatus(s, orgststr, "Truncate"));
591 metric.addDelays("Truncate", "tmeas", timer.elapsed());
592 ending.reset();
593 closing.reset();
594 });
595 else
596 {
597 ending.reset();
598 closing.reset();
599 }
600 }
601 else if (action == "VectorRead")
602 {
604 uint16_t timeout;
605 std::vector<buffer_t> buffers;
606 std::tie(chunks, timeout, buffers) = GetVectorReadArgs();
607 metric.ios["VectorRead::n"]++;
608 for (auto& ch : chunks)
609 {
610 metric.ios["VectorRead::b"] += ch.GetLength();
611 if ((ch.GetOffset() + ch.GetLength()) > metric.ios["Read::o"])
612 metric.ios["Read::o"] = ch.GetOffset() + ch.GetLength();
613 }
614
615 mytimer_t timer;
616 if (!simulate)
619 [this, orgststr{ orgststr }, buffers, ending, closing, timer, &metric](XRootDStatus& s, VectorReadInfo& r) mutable
620 {
621 metric.addIos("VectorRead", "e", HandleStatus(s, orgststr, "VectorRead"));
622 metric.addDelays("VectorRead", "tmeas", timer.elapsed());
623 buffers.clear();
624 ending.reset();
625 closing.reset();
626 });
627 else
628 {
629 buffers.clear();
630 ending.reset();
631 closing.reset();
632 }
633 }
634 else if (action == "VectorWrite")
635 {
637 uint16_t timeout;
638 std::vector<buffer_t> buffers;
639 std::tie(chunks, timeout, buffers) = GetVectorWriteArgs();
640 metric.ios["VectorWrite::n"]++;
641 for (auto& ch : chunks)
642 {
643 metric.ios["VectorWrite::b"] += ch.GetLength();
644 if ((ch.GetOffset() + ch.GetLength()) > metric.ios["Write::o"])
645 metric.ios["Write::o"] = ch.GetOffset() + ch.GetLength();
646 }
647 mytimer_t timer;
648 if (!simulate)
650 [this, orgststr{ orgststr }, buffers, ending, closing, timer, &metric](XRootDStatus& s) mutable
651 {
652 metric.addIos("VectorWrite", "e", HandleStatus(s, orgststr, "VectorWrite"));
653 metric.addDelays("VectorWrite", "tmeas", timer.elapsed());
654 buffers.clear();
655 ending.reset();
656 closing.reset();
657 });
658 else
659 {
660 buffers.clear();
661 ending.reset();
662 closing.reset();
663 }
664 }
665 else
666 {
668 }
669 }
static Log * GetLog()
Get default log.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
VectorWriteImpl< false > VectorWrite(Ctx< File > file, Arg< ChunkList > chunks, uint16_t timeout=0)
Factory for creating VectorWriteImpl objects.
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
SyncImpl< false > Sync(Ctx< File > file, uint16_t timeout=0)
Factory for creating SyncImpl objects.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
TruncateImpl< false > Truncate(Ctx< File > file, Arg< uint64_t > size, uint16_t timeout)
PgReadImpl< false > PgRead(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating PgReadImpl objects.
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
PgWriteImpl< false > PgWrite(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, Arg< std::vector< uint32_t > > cksums, uint16_t timeout=0)
Factory for creating PgWriteImpl objects.
std::vector< ChunkInfo > ChunkList
List of chunks.
VectorReadImpl< false > VectorRead(Ctx< File > file, Arg< ChunkList > chunks, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating VectorReadImpl objects.
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating OpenImpl objects.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
Flags
Open flags, may be or'd when appropriate.
@ Write
Open only for writing.
@ Update
Open for reading and writing.