43const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
49const char *File::m_traceID =
"File";
53File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
57 m_cfi(
Cache::GetInstance().GetTrace(),
Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
60 m_file_size(iFileSize),
61 m_current_io(m_io_set.end()),
65 m_detach_time_logged(false),
71 m_prefetch_state(kOff),
73 m_prefetch_read_cnt(0),
74 m_prefetch_hit_cnt(0),
101 m_info_file->Close();
103 m_info_file =
nullptr;
109 m_data_file->Close();
111 m_data_file =
nullptr;
114 if (m_resmon_token >= 0)
119 if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
122 if (sr == 0 && s.st_blocks != m_st_blocks) {
125 m_st_blocks = s.st_blocks;
133 TRACEF(
Debug,
"Close() finished, prefetch score = " << m_prefetch_score);
138File*
File::FileOpen(
const std::string &path,
long long offset,
long long fileSize)
140 File *file =
new File(path, offset, fileSize);
166 m_in_shutdown =
true;
168 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
170 m_prefetch_state = kStopped;
171 cache()->DeRegisterPrefetchFile(
this);
174 report_and_merge_delta_stats();
181void File::check_delta_stats()
186 report_and_merge_delta_stats();
189void File::report_and_merge_delta_stats()
193 m_data_file->
Fstat(&s);
196 long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
198 long long st_blocks_to_report = std::min((
long long) s.st_blocks, max_st_blocks_to_report);
200 m_st_blocks = st_blocks_to_report;
202 m_stats.
AddUp(m_delta_stats);
203 m_delta_stats.
Reset();
210 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
218 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
222 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
234 insert_remote_location(loc);
250 IoSet_i mi = m_io_set.find(io);
252 if (mi != m_io_set.end())
257 ", active_reads " << n_active_reads <<
258 ", active_prefetches " << io->m_active_prefetches <<
259 ", allow_prefetching " << io->m_allow_prefetching <<
260 ", ios_in_detach " << m_ios_in_detach);
262 "\tio_map.size() " << m_io_set.size() <<
263 ", block_map.size() " << m_block_map.size() <<
", file");
265 insert_remote_location(loc);
267 io->m_allow_prefetching =
false;
268 io->m_in_detach =
true;
271 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
273 if ( ! select_current_io_or_disable_prefetching(
false) )
275 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
282 bool io_active_result;
284 if (n_active_reads > 0)
286 io_active_result =
true;
288 else if (m_io_set.size() - m_ios_in_detach == 1)
290 io_active_result = ! m_block_map.empty();
294 io_active_result = io->m_active_prefetches > 0;
297 if ( ! io_active_result)
302 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
304 return io_active_result;
308 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
319 m_detach_time_logged =
false;
328 if ( ! m_in_shutdown)
330 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
332 report_and_merge_delta_stats();
333 m_cfi.WriteIOStatDetach(m_stats);
334 m_detach_time_logged =
true;
336 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
340 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
352 time_t now = time(0);
357 IoSet_i mi = m_io_set.find(io);
359 if (mi == m_io_set.end())
362 io->m_attach_time = now;
363 m_delta_stats.IoAttach();
365 insert_remote_location(loc);
367 if (m_prefetch_state == kStopped)
369 m_prefetch_state = kOn;
370 cache()->RegisterPrefetchFile(
this);
375 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
378 m_state_cond.UnLock();
389 time_t now = time(0);
393 IoSet_i mi = m_io_set.find(io);
395 if (mi != m_io_set.end())
397 if (mi == m_current_io)
402 m_delta_stats.IoDetach(now - io->m_attach_time);
406 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
408 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
409 m_prefetch_state = kStopped;
410 cache()->DeRegisterPrefetchFile(
this);
415 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
418 m_state_cond.UnLock();
427 static const char *tpfx =
"Open() ";
429 TRACEF(Dump, tpfx <<
"entered");
440 struct stat data_stat, info_stat;
444 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
445 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
448 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
449 myEnv.
Put(
"oss.asize", size_str);
461 m_data_file = myOss.
newFile(myUser);
462 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
466 delete m_data_file; m_data_file = 0;
470 myEnv.
Put(
"oss.asize",
"64k");
476 m_data_file->Close();
delete m_data_file; m_data_file = 0;
480 m_info_file = myOss.
newFile(myUser);
481 if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
485 delete m_info_file; m_info_file = 0;
486 m_data_file->Close();
delete m_data_file; m_data_file = 0;
490 bool initialize_info_file =
true;
492 if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
494 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
495 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
496 ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() <<
")");
499 if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
501 initialize_info_file =
false;
503 TRACEF(Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
504 m_cfi.ResetAllAccessStats();
505 m_data_file->Ftruncate(0);
510 if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.
get_cs_Chk())
515 TRACEF(Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
516 initialize_info_file =
true;
517 m_cfi.ResetAllAccessStats();
518 m_data_file->Ftruncate(0);
526 if (initialize_info_file)
528 m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.
m_bufferSize, m_file_size);
530 m_cfi.ResetNoCkSumTime();
531 m_cfi.Write(m_info_file, ifn.c_str());
532 m_info_file->Fsync();
533 cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
534 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
" num blocks = " << m_cfi.GetNBlocks());
538 if (futimens(m_info_file->getFD(), NULL)) {
543 m_cfi.WriteIOStatAttach();
545 m_block_size = m_cfi.GetBufferSize();
546 m_num_blocks = m_cfi.GetNBlocks();
547 m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped;
549 m_data_file->
Fstat(&data_stat);
550 m_st_blocks = data_stat.st_blocks;
553 constexpr long long MB = 1024 * 1024;
554 m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
558 m_state_cond.UnLock();
574 if ((res = m_data_file->Fstat(&sbuff)))
return res;
576 sbuff.st_size = m_file_size;
578 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
589bool File::overlap(
int blk,
598 const long long beg = blk * blk_size;
599 const long long end = beg + blk_size;
600 const long long req_end = req_off + req_size;
602 if (req_off < end && req_end > beg)
604 const long long ovlp_beg = std::max(beg, req_off);
605 const long long ovlp_end = std::min(end, req_end);
607 off = ovlp_beg - req_off;
608 blk_off = ovlp_beg - beg;
609 size = (int) (ovlp_end - ovlp_beg);
611 assert(size <= blk_size);
622Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
630 const long long off = i * m_block_size;
631 const int last_block = m_num_blocks - 1;
632 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
634 int blk_size, req_size;
635 if (i == last_block) {
636 blk_size = req_size = m_file_size - off;
637 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
639 blk_size = req_size = m_block_size;
643 char *buf = cache()->RequestRAM(req_size);
647 b =
new (std::nothrow) Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
657 m_prefetch_state = kHold;
658 cache()->DeRegisterPrefetchFile(
this);
663 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
670void File::ProcessBlockRequest(
Block *b)
678 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
680 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
696 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
698 ProcessBlockRequest(*bi);
704void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
706 int n_chunks = ioVec.size();
709 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
710 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
720 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
725int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
727 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
729 long long rs = m_data_file->ReadV(ioVec.data(), (
int) ioVec.size());
733 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
737 if (rs != expected_size)
739 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
758 if (m_in_shutdown || io->m_in_detach)
760 m_state_cond.UnLock();
761 return m_in_shutdown ? -ENOENT : -EBADF;
766 if (m_cfi.IsComplete())
768 m_state_cond.UnLock();
769 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
772 m_delta_stats.AddBytesHit(ret);
778 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
780 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
787 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
791 if (m_in_shutdown || io->m_in_detach)
793 m_state_cond.UnLock();
794 return m_in_shutdown ? -ENOENT : -EBADF;
799 if (m_cfi.IsComplete())
801 m_state_cond.UnLock();
802 int ret = m_data_file->ReadV(
const_cast<XrdOucIOVec*
>(readV), readVnum);
805 m_delta_stats.AddBytesHit(ret);
811 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
816int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
828 int prefetch_cnt = 0;
833 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
835 std::vector<XrdOucIOVec> iovec_disk;
836 std::vector<XrdOucIOVec> iovec_direct;
837 int iovec_disk_total = 0;
838 int iovec_direct_total = 0;
840 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
847 const int idx_first = iUserOff / m_block_size;
848 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
850 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
852 enum LastBlock_e { LB_other, LB_disk, LB_direct };
854 LastBlock_e lbe = LB_other;
856 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
859 BlockMap_i bi = m_block_map.find(block_idx);
866 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
869 if (bi != m_block_map.end())
871 inc_ref_count(bi->second);
872 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
874 if (bi->second->is_finished())
878 assert(bi->second->is_ok());
880 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
882 if (bi->second->m_prefetch)
888 read_req =
new ReadRequest(io, rh);
893 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
900 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
902 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
905 iovec_disk.back().size += size;
907 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
908 iovec_disk_total += size;
910 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
919 read_req =
new ReadRequest(io, rh);
922 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
925 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
927 blks_to_request.push_back(b);
929 b->
m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
936 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
938 iovec_direct_total += size;
945 iovec_direct.back().size += size;
947 long long in_offset = block_idx * m_block_size + blk_off;
948 char *out_pos = iUserBuff + off;
955 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
964 inc_prefetch_hit_cnt(prefetch_cnt);
966 m_state_cond.UnLock();
969 if ( ! blks_to_request.empty())
971 ProcessBlockRequests(blks_to_request);
972 blks_to_request.clear();
976 if ( ! iovec_direct.empty())
978 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
980 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
985 long long bytes_read = 0;
989 if ( ! blks_ready.empty())
991 for (
auto &bvi : blks_ready)
993 for (
auto &cr : bvi.second)
995 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
996 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
997 bytes_read += cr.m_size;
1003 if ( ! iovec_disk.empty())
1005 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1006 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
1021 m_state_cond.Lock();
1023 for (
auto &bvi : blks_ready)
1024 dec_ref_count(bvi.first, (
int) bvi.second.size());
1037 m_delta_stats.AddReadStats(read_req->
m_stats);
1038 check_delta_stats();
1039 m_state_cond.UnLock();
1047 m_state_cond.UnLock();
1048 return -EWOULDBLOCK;
1053 m_delta_stats.m_BytesHit += bytes_read;
1054 check_delta_stats();
1055 m_state_cond.UnLock();
1059 return error_cond ? error_cond : bytes_read;
1071 long long offset = b->
m_offset - m_offset;
1075 if (m_cfi.IsCkSumCache())
1079 retval = m_data_file->pgWrite(b->
get_buff(), offset, size, 0, 0);
1081 retval = m_data_file->Write(b->
get_buff(), offset, size);
1086 TRACEF(
Error,
"WriteToDisk() write error " << retval);
1088 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
1098 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
1101 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1103 bool schedule_sync =
false;
1107 m_cfi.SetBitWritten(blk_idx);
1111 m_cfi.SetBitPrefetch(blk_idx);
1115 m_cfi.ResetCkSumNet();
1124 m_writes_during_sync.push_back(blk_idx);
1128 m_cfi.SetBitSynced(blk_idx);
1129 ++m_non_flushed_cnt;
1130 if ((m_cfi.IsComplete() || m_non_flushed_cnt >=
Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1133 schedule_sync =
true;
1135 m_non_flushed_cnt = 0;
1142 cache()->ScheduleFileSync(
this);
1152 int ret = m_data_file->Fsync();
1153 bool errorp =
false;
1159 report_and_merge_delta_stats();
1160 loc_stats = m_stats;
1162 m_cfi.WriteIOStat(loc_stats);
1163 m_cfi.Write(m_info_file, m_filename.c_str());
1164 int cret = m_info_file->Fsync();
1167 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1173 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1179 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1186 m_writes_during_sync.clear();
1192 int written_while_in_sync;
1193 bool resync =
false;
1196 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1198 m_cfi.SetBitSynced(*i);
1200 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1201 m_writes_during_sync.clear();
1205 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1210 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1221void File::free_block(
Block* b)
1224 int i = b->
m_offset / m_block_size;
1225 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1226 size_t ret = m_block_map.erase(i);
1230 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1240 m_prefetch_state = kOn;
1241 cache()->RegisterPrefetchFile(
this);
1247bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1251 int io_size = (int) m_io_set.size();
1256 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1259 m_current_io = m_io_set.begin();
1262 else if (io_size > 1)
1264 IoSet_i mi = m_current_io;
1265 if (skip_current && mi != m_io_set.end()) ++mi;
1267 for (
int i = 0; i < io_size; ++i)
1269 if (mi == m_io_set.end()) mi = m_io_set.begin();
1271 if ((*mi)->m_allow_prefetching)
1283 m_current_io = m_io_set.end();
1284 m_prefetch_state = kStopped;
1285 cache()->DeRegisterPrefetchFile(
this);
1293void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1299 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1301 m_state_cond.Lock();
1314 m_state_cond.UnLock();
1317 FinalizeReadRequest(rreq);
1344 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1347 m_state_cond.Lock();
1352 rreq->m_stats.m_BytesMissed += creq.
m_size;
1354 rreq->m_stats.m_BytesHit += creq.
m_size;
1356 --rreq->m_n_chunk_reqs;
1359 inc_prefetch_hit_cnt(1);
1363 bool rreq_complete = rreq->is_complete();
1365 m_state_cond.UnLock();
1368 FinalizeReadRequest(rreq);
1376 XrdSysCondVarHelper _lck(m_state_cond);
1377 m_delta_stats.AddReadStats(rreq->
m_stats);
1378 check_delta_stats();
1385void File::ProcessBlockResponse(
Block *b,
int res)
1387 static const char* tpfx =
"ProcessBlockResponse ";
1389 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1391 if (res >= 0 && res != b->
get_size())
1395 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1399 m_state_cond.Lock();
1405 IoSet_i mi = m_io_set.find(io);
1406 if (mi != m_io_set.end())
1408 --io->m_active_prefetches;
1411 if (res < 0 && io->m_allow_prefetching)
1413 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1414 io->m_allow_prefetching =
false;
1417 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1419 if ( ! select_current_io_or_disable_prefetching(
false) )
1421 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1427 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1430 m_state_cond.UnLock();
1444 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1445 if ( ! m_in_shutdown)
1451 cache()->AddWriteTask(b,
true);
1458 m_state_cond.UnLock();
1460 for (
auto &creq : creqs_to_notify)
1462 ProcessBlockSuccess(b, creq);
1471 <<
", io=" << b->
get_io() <<
", error=" << res);
1476 <<
", io=" << b->
get_io() <<
" incomplete, got " << res <<
" expected " << b->
get_size());
1477#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1488 std::list<ReadRequest*> rreqs_to_complete;
1497 ProcessBlockError(b, rreq);
1500 rreqs_to_complete.push_back(rreq);
1505 creqs_to_keep.push_back(creq);
1509 bool reissue =
false;
1510 if ( ! creqs_to_keep.empty())
1512 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1514 TRACEF(
Debug,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1515 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1522 m_state_cond.UnLock();
1524 for (
auto rreq : rreqs_to_complete)
1525 FinalizeReadRequest(rreq);
1528 ProcessBlockRequest(b);
1536 return m_filename.c_str();
1541int File::offsetIdx(
int iIdx)
const
1543 return iIdx - m_offset/m_block_size;
1557 TRACEF(DumpXL,
"Prefetch() entering.");
1561 if (m_prefetch_state != kOn)
1566 if ( ! select_current_io_or_disable_prefetching(
true) )
1568 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1573 for (
int f = 0; f < m_num_blocks; ++f)
1575 if ( ! m_cfi.TestBitWritten(f))
1577 int f_act = f + m_offset / m_block_size;
1579 BlockMap_i bi = m_block_map.find(f_act);
1580 if (bi == m_block_map.end())
1582 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1585 TRACEF(Dump,
"Prefetch take block " << f_act);
1589 inc_prefetch_read_cnt(1);
1594 TRACEF(Warning,
"Prefetch allocation failed for block " << f_act);
1603 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1604 m_prefetch_state = kComplete;
1605 cache()->DeRegisterPrefetchFile(
this);
1609 (*m_current_io)->m_active_prefetches += (int) blks.size();
1613 if ( ! blks.empty())
1615 ProcessBlockRequests(blks);
1624 return m_prefetch_score;
1637void File::insert_remote_location(
const std::string &loc)
1641 size_t p = loc.find_first_of(
'@');
1642 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1649 if ( ! m_remote_locations.empty())
1653 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1657 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1660 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1662 s +=
'"'; s += *i; s +=
'"';
1663 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
const char * XrdSysE2T(int errcode)
virtual int Fstat(struct stat *buf)
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void * get_req_id() const
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static ResourceMonitor & ResMon()
static Cache & GetInstance()
Singleton access.
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void Sync()
Sync file cache info and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
bool ioActive(IO *io)
Initiate close. Returns true if the IO is still active. Used in XrdPosixXrootd::Close().
Base cache-io class that implements some XrdOucCacheIO abstract methods.
bool register_incomplete_read()
XrdOucCacheIO * GetInput()
bool register_block_error(int res)
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
static const char * s_infoExtension
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
long long BytesReadAndWritten() const
long long m_BytesHit
number of bytes served from disk
std::list< Block * > BlockList_t
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
prefetch buffer size, default 1MB
std::string m_meta_space
oss space for metadata files (cinfo)
std::string m_username
username passed to oss plugin
void update_error_cond(int ec)