XRootD
Loading...
Searching...
No Matches
XrdPfcFile.hh
Go to the documentation of this file.
1#ifndef __XRDPFC_FILE_HH__
2#define __XRDPFC_FILE_HH__
3//----------------------------------------------------------------------------------
4// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
5// Author: Alja Mrak-Tadel, Matevz Tadel
6//----------------------------------------------------------------------------------
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//----------------------------------------------------------------------------------
20
21#include "XrdPfcTypes.hh"
22#include "XrdPfcInfo.hh"
23#include "XrdPfcStats.hh"
24
25#include "XrdOuc/XrdOucCache.hh"
26#include "XrdOuc/XrdOucIOVec.hh"
27
28#include <functional>
29#include <list>
30#include <map>
31#include <set>
32#include <string>
33
34class XrdJob;
35class XrdOucIOVec;
36
37namespace XrdPfc
38{
39class File;
42class IO;
43
44struct ReadVBlockListRAM;
45struct ReadVChunkListRAM;
46struct ReadVBlockListDisk;
47struct ReadVChunkListDisk;
48
50{
52 int m_n_chunks = 0; // Only set for ReadV().
53 unsigned short m_seq_id;
54 XrdOucCacheIOCB *m_iocb; // External callback passed into IO::Read().
55
56 ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb) :
57 m_seq_id(sid), m_iocb(iocb)
58 {}
59};
60
61// -------------------------------------------------------------
62
64{
66 ReadReqRH *m_rh; // Internal callback created in IO::Read().
67
68 long long m_bytes_read = 0;
69 int m_error_cond = 0; // to be set to -errno
72
74 bool m_sync_done = false;
75 bool m_direct_done = true;
76
78 m_io(io), m_rh(rh)
79 {}
80
82
83 bool is_complete() const { return m_n_chunk_reqs == 0 && m_sync_done && m_direct_done; }
85};
86
87// -------------------------------------------------------------
88
90{
92 char *m_buf; // Where to place the data chunk.
93 long long m_off; // Offset *within* the corresponding block.
94 int m_size; // Size of the data chunk.
95
96 ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size) :
97 m_read_req(rreq), m_buf(buf), m_off(off), m_size(size)
98 {}
99};
100
101using vChunkRequest_t = std::vector<ChunkRequest>;
102using vChunkRequest_i = std::vector<ChunkRequest>::iterator;
103
104// ================================================================
105
// Block: one buffered block belonging to a File, together with its state.
// It records the buffer and offset it was created with, the IO and requestor
// identity it currently serves, and an error code; the block counts as
// finished once it is either marked downloaded or has a non-zero error.
106class Block
107{
108public:
110 IO *m_io; // IO that handled current request, used for == / != comparisons only
111 void *m_req_id; // Identity of requestor -- used for stats.
112
113 char *m_buff;
114 long long m_offset;
118 int m_errno; // stores negative errno
124
126
 // NOTE(review): the 'm_prefetch' argument carries a member-style prefix even
 // though it is a constructor parameter -- confirm the naming is intentional.
127 Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize,
128 bool m_prefetch, bool cks_net) :
129 m_file(f), m_io(io), m_req_id(rid),
130 m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize),
133 {}
134
 // Read-only accessors for the buffer, sizes and offset.
135 char* get_buff() const { return m_buff; }
136 int get_size() const { return m_size; }
137 int get_req_size() const { return m_req_size; }
138 long long get_offset() const { return m_offset; }
139
 // Read-only accessors for the owning file and the current IO / requestor.
140 File* get_file() const { return m_file; }
141 IO* get_io() const { return m_io; }
142 void* get_req_id() const { return m_req_id; }
143
 // State queries: finished means downloaded or failed; failed means an
 // error code has been recorded (m_errno != 0).
144 bool is_finished() const { return m_downloaded || m_errno != 0; }
145 bool is_ok() const { return m_downloaded; }
146 bool is_failed() const { return m_errno != 0; }
147
 // State setters; set_error() stores the value as given.
148 void set_downloaded() { m_downloaded = true; }
149 void set_error(int err) { m_errno = err; }
150 int get_error() const { return m_errno; }
151
 // Clears the recorded error and rebinds the block to another IO and
 // requestor identity.
152 void reset_error_and_set_io(IO *io, void *rid)
153 {
154 m_errno = 0;
155 m_io = io;
156 m_req_id = rid;
157 }
158
 // Checksum queries: has_cksums() is true when the checksum vector is
 // non-empty.
159 bool req_cksum_net() const { return m_req_cksum_net; }
160 bool has_cksums() const { return ! m_cksum_vec.empty(); }
164};
165
166using BlockList_t = std::list<Block*>;
167using BlockList_i = std::list<Block*>::iterator;
168
169// ================================================================
170
172{
173public:
175
177
178 void Done(int result) override;
179};
180
181// ----------------------------------------------------------------
182
184{
185public:
191 int m_errno = 0;
192
193 DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait) :
194 m_file(file), m_read_req(rreq), m_to_wait(to_wait)
195 {}
196
197 void Done(int result) override;
198};
199
200// ================================================================
201
// File: per-file object of the XrdPfc cache.
// The public interface covers reading (Read / ReadV), attaching and detaching
// IO objects (AddIO / RemoveIO), syncing, prefetching and a handful of
// accessors. Constructor, destructor, Open() and Close() are private;
// instances are obtained through the static FileOpen().
202class File
203{
204 friend class Cache;
207public:
208 // Constructor, destructor, Open() and Close() are private.
209
 //! Static constructor that also does Open. Returns null ptr if Open fails.
211 static File* FileOpen(const std::string &path, long long offset, long long fileSize);
212
215
 //! Handle removal of a set of blocks from Cache's write queue.
217 void BlocksRemovedFromWriteQ(std::list<Block*>&);
218
 //! Normal read.
220 int Read(IO *io, char* buff, long long offset, int size, ReadReqRH *rh);
221
 //! Vector read.
223 int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh);
224
225 //----------------------------------------------------------------------
 //! Notification from IO that it has been updated (remote open).
227 //----------------------------------------------------------------------
228 void ioUpdated(IO *io);
229
230 //----------------------------------------------------------------------
 //! Initiate close. Return true if still IO active.
 //! Used in XrdPosixXrootd::Close()
233 //----------------------------------------------------------------------
234 bool ioActive(IO *io);
235
236 //----------------------------------------------------------------------
239 //----------------------------------------------------------------------
241
242 //----------------------------------------------------------------------
245 //----------------------------------------------------------------------
247
248 //----------------------------------------------------------------------
 //! Sync file cache info and output data with disk.
250 //----------------------------------------------------------------------
251 void Sync();
252
253 void WriteBlockToDisk(Block* b);
254
255 void Prefetch();
256
257 float GetPrefetchScore() const;
258
 //! Log path.
260 const char* lPath() const;
261
262 const std::string& GetLocalPath() const { return m_filename; }
263
266
267 long long GetFileSize() const { return m_file_size; }
268
269 void AddIO(IO *io);
272 void RemoveIO(IO *io);
273
 // Accessors; most of them forward to the cache-file info object m_cfi.
274 std::string GetRemoteLocations() const;
275 const Info::AStat* GetLastAccessStats() const { return m_cfi.GetLastAccessStats(); }
276 size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); }
277 int GetBlockSize() const { return m_cfi.GetBufferSize(); }
278 int GetNBlocks() const { return m_cfi.GetNBlocks(); }
279 int GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); }
280 long long GetPrefetchedBytes() const { return m_prefetch_bytes; }
281 const Stats& RefStats() const { return m_stats; }
282
283 int Fstat(struct stat &sbuff);
284
285 // These three methods are called under Cache's m_active lock
286 int get_ref_cnt() const { return m_ref_cnt; }
287 int inc_ref_cnt() { return ++m_ref_cnt; }
288 int dec_ref_cnt() { return --m_ref_cnt; }
289
290 long long initiate_emergency_shutdown();
291 bool is_in_emergency_shutdown() const { return m_in_shutdown; }
292
293private:
295 File(const std::string &path, long long offset, long long fileSize);
296
298 ~File();
299
301 void Close();
302
304 bool Open();
305
306 static const char *m_traceID;
307
308 int m_ref_cnt;
309
310 XrdOssDF *m_data_file;
311 XrdOssDF *m_info_file;
312 Info m_cfi;
313
314 const std::string m_filename;
315 const long long m_offset;
316 const long long m_file_size;
317
318 // IO objects attached to this file.
319
320 using IoSet_t = std::set<IO*>;
321 using IoSet_i = IoSet_t::iterator;
322
323 IoSet_t m_io_set;
324 IoSet_i m_current_io;
325 int m_ios_in_detach;
326
327 // FSync
328
329 std::vector<int> m_writes_during_sync;
330 int m_non_flushed_cnt;
331 bool m_in_sync;
332 bool m_detach_time_logged;
333 bool m_in_shutdown;
334
335 // Block state and management
336
337 using IntList_t = std::list<int>;
338 using IntList_i = IntList_t::iterator;
339
340 using BlockMap_t = std::map<int, Block*>;
341 using BlockMap_i = BlockMap_t::iterator;
342
343 BlockMap_t m_block_map;
344 XrdSysCondVar m_state_cond;
345 long long m_block_size;
346 int m_num_blocks;
347
348 // Stats and ResourceMonitor interface
349
350 Stats m_stats;
351 Stats m_delta_stats;
352 long long m_st_blocks;
353 long long m_resmon_report_threshold;
354 int m_resmon_token;
355
356 void check_delta_stats();
357 void report_and_merge_delta_stats();
358
359 std::set<std::string> m_remote_locations;
360 void insert_remote_location(const std::string &loc);
361
362 // Prefetch
363
364 enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
365
366 PrefetchState_e m_prefetch_state;
367
368 long long m_prefetch_bytes;
369 int m_prefetch_read_cnt;
370 int m_prefetch_hit_cnt;
371 float m_prefetch_score; // cached
372
 // The counters below refresh the cached score whenever they change by a
 // non-zero amount. The score is hits / reads; a zero read count yields 0
 // rather than dividing by zero and caching inf/NaN.
373 void inc_prefetch_read_cnt(int prc) { if (prc) { m_prefetch_read_cnt += prc; calc_prefetch_score(); } }
374 void inc_prefetch_hit_cnt (int phc) { if (phc) { m_prefetch_hit_cnt += phc; calc_prefetch_score(); } }
375 void calc_prefetch_score() { m_prefetch_score = m_prefetch_read_cnt != 0 ? float(m_prefetch_hit_cnt) / m_prefetch_read_cnt : 0.0f; }
376
377 // Helpers
378
379 bool overlap(int blk, // block to query
380 long long blk_size, //
381 long long req_off, // offset of user request
382 int req_size, // size of user request
383 // output:
384 long long &off, // offset in user buffer
385 long long &blk_off, // offset in block
386 int &size);
387
388 // Read & ReadV
389
390 Block* PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch);
391
392 void ProcessBlockRequest (Block *b);
393 void ProcessBlockRequests(BlockList_t& blks);
394
395 void RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size);
396
397 int ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size);
398
399 int ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
400 ReadReqRH *rh, const char *tpfx);
401
402 void ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond);
403 void ProcessBlockError(Block *b, ReadRequest *rreq);
404 void ProcessBlockSuccess(Block *b, ChunkRequest &creq);
405 void FinalizeReadRequest(ReadRequest *rreq);
406
407 void ProcessBlockResponse(Block *b, int res);
408
409 // Block management
410
411 void inc_ref_count(Block* b);
412 void dec_ref_count(Block* b, int count = 1);
413 void free_block(Block*);
414
415 bool select_current_io_or_disable_prefetching(bool skip_current);
416
417 int offsetIdx(int idx) const;
418};
419
420//------------------------------------------------------------------------------
421
422inline void File::inc_ref_count(Block* b)
423{
424 // Adds one reference to the block; this method is always called under lock.
425 b->m_refcnt += 1;
426}
427
428//------------------------------------------------------------------------------
429
430inline void File::dec_ref_count(Block* b, int count)
431{
432 // Drops 'count' references from a finished block; always called under lock.
433 assert(b->is_finished());
434 b->m_refcnt -= count;
435 assert(b->m_refcnt >= 0);
436
437 // References remain: the block stays alive.
438 if (b->m_refcnt != 0) return;
439
440 free_block(b);
441}
442
443}
444
445#endif
#define stat(a, b)
Definition XrdPosix.hh:101
XrdOucString File
void Done(int result) override
int get_size() const
int get_error() const
int get_n_cksum_errors()
int * ptr_n_cksum_errors()
Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize, bool m_prefetch, bool cks_net)
IO * get_io() const
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
bool is_finished() const
bool is_ok() const
void set_error(int err)
void * get_req_id() const
void set_downloaded()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
bool is_failed() const
long long m_offset
File * get_file() const
vCkSum_t m_cksum_vec
void reset_error_and_set_io(IO *io, void *rid)
int get_req_size() const
void Done(int result) override
DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait)
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
XrdSysTrace * GetTrace()
void WriteBlockToDisk(Block *b)
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
friend class BlockResponseHandler
XrdSysError * GetLog()
int GetNBlocks() const
void StopPrefetchingOnIO(IO *io)
std::string GetRemoteLocations() const
size_t GetAccessCnt() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
long long GetPrefetchedBytes() const
int GetBlockSize() const
int GetNDownloadedBlocks() const
const Info::AStat * GetLastAccessStats() const
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
int inc_ref_cnt()
int GetPrefetchCountOnIO(IO *io)
const Stats & RefStats() const
void Sync()
Sync file cache info and output data with disk.
int dec_ref_cnt()
int get_ref_cnt()
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
long long GetFileSize() const
const std::string & GetLocalPath() const
void RemoveIO(IO *io)
friend class Cache
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
bool is_in_emergency_shutdown()
bool ioActive(IO *io)
Initiate close. Returns true if the IO is still active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:16
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:41
Statistics of cache utilisation by a File object.
std::list< Block * > BlockList_t
std::vector< ChunkRequest >::iterator vChunkRequest_i
std::vector< ChunkRequest > vChunkRequest_t
std::vector< uint32_t > vCkSum_t
std::list< Block * >::iterator BlockList_i
ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size)
Definition XrdPfcFile.hh:96
ReadRequest * m_read_req
Definition XrdPfcFile.hh:91
Access statistics.
Definition XrdPfcInfo.hh:57
XrdOucCacheIOCB * m_iocb
Definition XrdPfcFile.hh:54
unsigned short m_seq_id
Definition XrdPfcFile.hh:53
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition XrdPfcFile.hh:56
void update_error_cond(int ec)
Definition XrdPfcFile.hh:81
ReadRequest(IO *io, ReadReqRH *rh)
Definition XrdPfcFile.hh:77
bool is_complete() const
Definition XrdPfcFile.hh:83
int return_value() const
Definition XrdPfcFile.hh:84
long long m_bytes_read
Definition XrdPfcFile.hh:68