XRootD
Loading...
Searching...
No Matches
XrdPfcIOFile.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19#include "XrdPfcIOFile.hh"
20#include "XrdPfcStats.hh"
21#include "XrdPfcTrace.hh"
22
23#include "XrdOss/XrdOss.hh"
25#include "XrdSys/XrdSysError.hh"
26
27#include "XrdOuc/XrdOucEnv.hh"
29
30#include <cstdio>
31#include <fcntl.h>
32
33using namespace XrdPfc;
34
35//______________________________________________________________________________
37 IO(io, cache),
38 m_file(0)
39{
40 m_file = Cache::GetInstance().GetFile(GetFilename(), this);
41}
42
43//______________________________________________________________________________
45{
46 // called from Detach() if no sync is needed or
47 // from Cache's sync thread
48 TRACEIO(Debug, "~IOFile() " << this);
49}
50
51//______________________________________________________________________________
52int IOFile::Fstat(struct stat &sbuff)
53{
54 // This is only called during Cache::Attach / Cache::GetFile() for file creation.
55 // Should really be separate name but one needs to change virtual interface
56 // so initialStat() becomes virtual in the base-class.
57 // Also, IOFileBlock should be ditched.
58 if ( ! m_file) {
59 return initialStat(sbuff);
60 }
61
62 return m_file->Fstat(sbuff);
63}
64
65//______________________________________________________________________________
66long long IOFile::FSize()
67{
68 return m_file->GetFileSize();
69}
70
71//______________________________________________________________________________
72int IOFile::initialStat(struct stat &sbuff)
73{
74 // Get stat to determine file-size.
75 // Called indirectly from the constructor, via Cache::GetFile().
76 // Either read it from cinfo file or obtain it from the remote IO.
77
78 static const char* trace_pfx = "initialStat ";
79
80 std::string fname = GetFilename();
81 if (m_cache.GetOss()->Stat(fname.c_str(), &sbuff) == XrdOssOK)
82 {
83 long long file_size = m_cache.DetermineFullFileSize(fname + Info::s_infoExtension);
84 if (file_size >= 0)
85 {
86 sbuff.st_size = file_size;
87 TRACEIO(Info, trace_pfx << "successfully read size " << sbuff.st_size << " from info file");
88 return 0;
89 }
90 TRACEIO(Error, trace_pfx << "failed reading from info file " << XrdSysE2T(-file_size));
91 }
92
93 int res = GetInput()->Fstat(sbuff);
94 TRACEIO(Debug, trace_pfx << "stat from client res = " << res << ", size = " << sbuff.st_size);
95
96 return res;
97}
98
99//______________________________________________________________________________
101{
102 IO::Update(iocp);
103 m_file->ioUpdated(this);
104}
105
106//______________________________________________________________________________
108{
110 return m_file->ioActive(this);
111}
112
113//______________________________________________________________________________
115{
116 // Effectively a destructor.
117
118 TRACE(Debug, "DetachFinalize() " << this);
119
120 m_file->RequestSyncOfDetachStats();
121 Cache::GetInstance().ReleaseFile(m_file, this);
122
123 if (( ! m_error_counts.empty() || m_incomplete_count > 0) && XRD_TRACE What >= TRACE_Error) {
124 char info[1024];
125 int pos = 0, cap = 1024;
126 bool truncated = false;
127 for (auto [err, cnt] : m_error_counts) {
128 int len = snprintf(&info[pos], cap, " ( %d : %d)", err, cnt);
129 if (len >= cap) {
130 truncated = true;
131 break;
132 }
133 pos += len;
134 cap -= len;
135 }
136 TRACE(Error, "DetachFinalize() " << this << " n_incomplete_reads=" << m_incomplete_count
137 << ", block (error : count) report:" << info << (truncated ? " - truncated" : ""));
138 }
139
140 delete this;
141}
142
143
144//==============================================================================
145// Read and pgRead - sync / async and helpers
146//==============================================================================
147
148//______________________________________________________________________________
149int IOFile::Read(char *buff, long long off, int size)
150{
152
153 auto *rh = new ReadReqRHCond(ObtainReadSid(), nullptr);
154
155 TRACEIO(Dump, "Read() sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
156
157 rh->m_cond.Lock();
158 int retval = ReadBegin(buff, off, size, rh);
159 if (retval == -EWOULDBLOCK)
160 {
161 rh->m_cond.Wait();
162 retval = rh->m_retval;
163 }
164 rh->m_cond.UnLock();
165
166 return ReadEnd(retval, rh);
167}
168
169//______________________________________________________________________________
170void IOFile::Read(XrdOucCacheIOCB &iocb, char *buff, long long off, int size)
171{
172 struct ZHandler : ReadReqRH
173 { using ReadReqRH::ReadReqRH;
174 IOFile *m_io = nullptr;
175
176 void Done(int result) override {
177 m_io->ReadEnd(result, this);
178 }
179 };
180
182
183 auto *rh = new ZHandler(ObtainReadSid(), &iocb);
184 rh->m_io = this;
185
186 TRACEIO(Dump, "Read() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
187
188 int retval = ReadBegin(buff, off, size, rh);
189 if (retval != -EWOULDBLOCK)
190 {
191 rh->Done(retval);
192 }
193}
194
195//______________________________________________________________________________
196void IOFile::pgRead(XrdOucCacheIOCB &iocb, char *buff, long long off, int size,
197 std::vector<uint32_t> &csvec, uint64_t opts, int *csfix)
198{
199 struct ZHandler : ReadReqRH
200 { using ReadReqRH::ReadReqRH;
201 IOFile *m_io = nullptr;
202 std::function<void (int)> m_lambda {0};
203
204 void Done(int result) override {
205 if (m_lambda) m_lambda(result);
206 m_io->ReadEnd(result, this);
207 }
208 };
209
211
212 auto *rh = new ZHandler(ObtainReadSid(), &iocb);
213 rh->m_io = this;
214
215 TRACEIO(Dump, "pgRead() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
216
218 rh->m_lambda = [=, &csvec](int result) {
219 if (result > 0)
220 XrdOucPgrwUtils::csCalc((const char *)buff, (ssize_t)off, (size_t)result, csvec);
221 };
222
223 int retval = ReadBegin(buff, off, size, rh);
224 if (retval != -EWOULDBLOCK)
225 {
226 rh->Done(retval);
227 }
228}
229
230//______________________________________________________________________________
231int IOFile::ReadBegin(char *buff, long long off, int size, ReadReqRH *rh)
232{
233 // protect from reads over the file size
234 if (off >= FSize()) {
235 size = 0;
236 return 0;
237 }
238 if (off < 0) {
239 return -EINVAL;
240 }
241 if (off + size > FSize()) {
242 size = FSize() - off;
243 }
244 rh->m_expected_size = size;
245
246 return m_file->Read(this, buff, off, size, rh);
247}
248
249//______________________________________________________________________________
250int IOFile::ReadEnd(int retval, ReadReqRH *rh)
251{
252 TRACEIO(Dump, "ReadEnd() " << (rh->m_iocb ? "a" : "") << "sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " retval: " << retval << " expected_size: " << rh->m_expected_size);
253
254 if (retval < 0) {
255 TRACEIO(Debug, "ReadEnd() error in File::Read(), exit status=" << retval << ", error=" << XrdSysE2T(-retval) << " sid: " << Xrd::hex1 << rh->m_seq_id);
256 } else if (retval < rh->m_expected_size) {
257 TRACEIO(Debug, "ReadEnd() bytes missed " << rh->m_expected_size - retval << " sid: " << Xrd::hex1 << rh->m_seq_id);
258 }
259 if (rh->m_iocb)
260 rh->m_iocb->Done(retval);
261
262 delete rh;
263
265
266 return retval;
267}
268
269
270//==============================================================================
271// ReadV
272//==============================================================================
273
274//______________________________________________________________________________
275int IOFile::ReadV(const XrdOucIOVec *readV, int n)
276{
278
279 auto *rh = new ReadReqRHCond(ObtainReadSid(), nullptr);
280
281 TRACEIO(Dump, "ReadV() sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " n_chunks: " << n);
282
283 rh->m_cond.Lock();
284 int retval = ReadVBegin(readV, n, rh);
285 if (retval == -EWOULDBLOCK)
286 {
287 rh->m_cond.Wait();
288 retval = rh->m_retval;
289 }
290 rh->m_cond.UnLock();
291 return ReadVEnd(retval, rh);
292}
293
294//______________________________________________________________________________
295void IOFile::ReadV(XrdOucCacheIOCB &iocb, const XrdOucIOVec *readV, int n)
296{
297 struct ZHandler : ReadReqRH
298 { using ReadReqRH::ReadReqRH;
299 IOFile *m_io = nullptr;
300
301 void Done(int result) override { m_io-> ReadVEnd(result, this); }
302 };
303
305
306 auto *rh = new ZHandler(ObtainReadSid(), &iocb);
307 rh->m_io = this;
308
309 TRACEIO(Dump, "ReadV() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " n_chunks: " << n);
310
311 int retval = ReadVBegin(readV, n, rh);
312 if (retval != -EWOULDBLOCK)
313 {
314 rh->Done(retval);
315 }
316}
317
318//______________________________________________________________________________
319int IOFile::ReadVBegin(const XrdOucIOVec *readV, int n, ReadReqRH *rh)
320{
321 long long file_size = FSize();
322 for (int i = 0; i < n; ++i)
323 {
324 const XrdOucIOVec &vr = readV[i];
325 if (vr.offset < 0 || vr.offset >= file_size ||
326 vr.offset + vr.size > file_size)
327 {
328 return -EINVAL;
329 }
330 rh->m_expected_size += vr.size;
331 }
332 rh->m_n_chunks = n;
333
334 return m_file->ReadV(this, readV, n, rh);
335}
336
337//______________________________________________________________________________
338int IOFile::ReadVEnd(int retval, ReadReqRH *rh)
339{
340 TRACEIO(Dump, "ReadVEnd() " << (rh->m_iocb ? "a" : "") << "sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id <<
341 " retval: " << retval << " n_chunks: " << rh->m_n_chunks << " expected_size: " << rh->m_expected_size);
342
343 if (retval < 0) {
344 TRACEIO(Debug, "ReadVEnd() error in File::ReadV(), exit status=" << retval << ", error=" << XrdSysE2T(-retval));
345 } else if (retval < rh->m_expected_size) {
346 TRACEIO(Debug, "ReadVEnd() bytes missed " << rh->m_expected_size - retval);
347 }
348 if (rh->m_iocb)
349 rh->m_iocb->Done(retval);
350
351 delete rh;
352
354
355 return retval;
356}
#define XrdOssOK
Definition XrdOss.hh:50
#define TRACE_Error
Definition XrdPfcTrace.hh:7
#define TRACEIO(act, x)
#define stat(a, b)
Definition XrdPosix.hh:101
#define XRD_TRACE
bool Debug
struct myOpts opts
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int Fstat(struct stat &sbuff)
static const uint64_t forceCS
XrdOucCacheIO()
Construct and Destructor.
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:152
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition XrdPfc.cc:925
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition XrdPfc.cc:389
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
void ReleaseFile(File *, IO *)
Definition XrdPfc.cc:471
XrdOss * GetOss() const
Definition XrdPfc.hh:268
void Update(XrdOucCacheIO &iocp) override
void DetachFinalize() override
Abstract virtual method of XrdPfc::IO Called to destruct the IO object after it is no longer used.
int Read(char *buff, long long off, int size) override
Pass Read request to the corresponding File object.
int Fstat(struct stat &sbuff) override
bool ioActive() override
Abstract virtual method of XrdPfc::IO Called to check if destruction needs to be done in a separate t...
void pgRead(XrdOucCacheIOCB &iocb, char *buff, long long off, int size, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0) override
int ReadV(const XrdOucIOVec *readV, int n) override
Pass ReadV request to the corresponding File object.
long long FSize() override
IOFile(XrdOucCacheIO *io, Cache &cache)
IO(XrdOucCacheIO *io, Cache &cache)
Definition XrdPfcIO.cc:7
int m_incomplete_count
Definition XrdPfcIO.hh:88
std::map< int, int > m_error_counts
Definition XrdPfcIO.hh:89
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:31
Cache & m_cache
reference to Cache object
Definition XrdPfcIO.hh:50
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:70
const char * RefreshLocation()
Definition XrdPfcIO.hh:55
void Update(XrdOucCacheIO &iocp) override
Definition XrdPfcIO.cc:17
unsigned short ObtainReadSid()
Definition XrdPfcIO.hh:57
std::string GetFilename()
Definition XrdPfcIO.cc:36
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:41
static const char * s_infoExtension
long long offset
XrdOucCacheIOCB * m_iocb
Definition XrdPfcFile.hh:54
unsigned short m_seq_id
Definition XrdPfcFile.hh:53
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition XrdPfcFile.hh:56