XRootD
Loading...
Searching...
No Matches
XrdCephBufferAlgSimple.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2//------------------------------------------------------------------------------
3
4#include <sys/types.h>
6
7#include "../XrdCephPosix.hh"
8#include <XrdOuc/XrdOucEnv.hh>
9#include <fcntl.h>
10#include <sys/stat.h>
11#include <iostream>
12#include <thread>
13
14#include "XrdSfs/XrdSfsAio.hh"
15
16
17using namespace XrdCephBuffer;
18
19
20XrdCephBufferAlgSimple::XrdCephBufferAlgSimple(std::unique_ptr<IXrdCephBufferData> buffer,
21 std::unique_ptr<ICephIOAdapter> cephio, int fd,
22 bool useStriperlessReads):
23m_bufferdata(std::move(buffer)), m_cephio(std::move(cephio)), m_fd(fd),
24m_useStriperlessReads(useStriperlessReads) {
25
26}
27
// Destructor: logs a one-line summary of the I/O statistics gathered over this object's
// lifetime, then clears m_fd. No buffered write data is flushed here; presumably callers
// must call flushWriteCache() on final writes before destruction — confirm at call sites.
29 int prec = std::cout.precision();
// Bytes loaded from Ceph into the buffer; read-through ("bypassed") bytes are counted in
// both fromceph and bypassed by read(), so they are subtracted out here.
30 float bytesBuffered = m_stats_bytes_fromceph - m_stats_bytes_bypassed;
// Fraction of buffered bytes that were actually delivered to the client.
// Reported as 1 when nothing was buffered, which also avoids a division by zero.
31 float cacheUseFraction = bytesBuffered > 0 ? (1.*(m_stats_bytes_toclient-m_stats_bytes_bypassed)/bytesBuffered) : 1. ;
32
// NOTE(review): the precision restored on the BUFLOG stream is the one saved from std::cout;
// this assumes BUFLOG writes to a stream whose precision matches std::cout — confirm.
33 BUFLOG("XrdCephBufferAlgSimple::Destructor, fd=" << m_fd
34 << ", retrieved_bytes=" << m_stats_bytes_fromceph
35 << ", bypassed_bytes=" << m_stats_bytes_bypassed
36 << ", delivered_bytes=" << m_stats_bytes_toclient
37 << std::setprecision(4)
38 << ", cache_hit_frac=" << cacheUseFraction << std::setprecision(prec));
// Mark the descriptor as no longer in use by this object.
39 m_fd = -1;
40}
41
42
// read_aio: services an asynchronous read request synchronously, by calling
// read(buf, offset, blen) with the buffer, length and offset taken from aoip->sfsAio.
// Returns -EINVAL when aoip is null (doneRead() is NOT called in that case). Otherwise the
// result of read() — byte count or negative error — is stored in aoip->Result,
// aoip->doneRead() is called, and the same value is returned.
44 // Historically unsupported (it returned -ENOSYS); requests are now served synchronously below.
45 //return -ENOSYS;
46
47 ssize_t rc(-ENOSYS);
48 if (!aoip) {
49 return -EINVAL;
50 }
51
52 volatile void * buf = aoip->sfsAio.aio_buf;
53 size_t blen = aoip->sfsAio.aio_nbytes;
54 off_t offset = aoip->sfsAio.aio_offset;
55
56 // translate the aio read into a simple sync read.
57 // hopefully don't get too many out of sequence reads to affect the caching
58 rc = read(buf, offset, blen);
59
// Completion is signalled for both success and error results.
60 aoip->Result = rc;
61 aoip->doneRead();
62
63 return rc;
64
65}
66
// write_aio: services an asynchronous write request synchronously, by calling
// write(aio_buf, offset, blen) with the buffer, length and offset taken from aoip->sfsAio.
// Returns -EINVAL when aoip is null (doneWrite() is NOT called in that case). Otherwise the
// result of write() — byte count or negative error — is stored in aoip->Result,
// aoip->doneWrite() is called, and the same value is returned.
68 // Historically unsupported (it returned -ENOSYS); requests are now served synchronously below.
69 // return -ENOSYS;
70
71 ssize_t rc(-ENOSYS);
72 if (!aoip) {
73 return -EINVAL;
74 }
75
76 // volatile void * buf = aoip->sfsAio.aio_buf;
77 // size_t blen = aoip->sfsAio.aio_nbytes;
78 // off_t offset = aoip->sfsAio.aio_offset;
79 size_t blen = aoip->sfsAio.aio_nbytes;
80 off_t offset = aoip->sfsAio.aio_offset;
81
// aio_buf is passed directly, cast to the const void* that write() expects.
82 rc = write(const_cast<const void*>(aoip->sfsAio.aio_buf), offset, blen);
// Completion is signalled for both success and error results.
83 aoip->Result = rc;
84 aoip->doneWrite();
85 return rc;
86
87}
88
89
90ssize_t XrdCephBufferAlgSimple::read(volatile void *buf, off_t offset, size_t blen) {
91 // Set a lock for any attempt at a simultaneous operation
92 // Use recursive, as flushCache also calls the lock and don't want to deadlock
93 // No call to flushCache should happen in a read, but be consistent
94 // BUFLOG("XrdCephBufferAlgSimple::read: preLock: " << std::hash<std::thread::id>{}(std::this_thread::get_id()) << " " << offset << " " << blen);
95 const std::lock_guard<std::recursive_mutex> lock(m_data_mutex); //
96 // BUFLOG("XrdCephBufferAlgSimple::read: postLock: " << std::hash<std::thread::id>{}(std::this_thread::get_id()) << " " << offset << " " << blen);
97
98 // BUFLOG("XrdCephBufferAlgSimple::read status:"
99 // << "\n\tRead off/len/end: " << offset << "/" << blen << "/(" << (offset+blen) <<")"
100 // << "\n\tBuffer: start/length/end/cap: " << m_bufferStartingOffset << "/" << m_bufferLength << "/"
101 // << (m_bufferStartingOffset + m_bufferLength) << "/" << m_bufferdata->capacity()
102 // );
103 if (blen == 0) return 0;
104
109 if (blen >= m_bufferdata->capacity()) {
110 //BUFLOG("XrdCephBufferAlgSimple::read: Readthrough cache: fd: " << m_fd
111 // << " " << offset << " " << blen);
112 // larger than cache, so read through, and invalidate the cache anyway
113 m_bufferdata->invalidate();
114 m_bufferLength =0; // ensure cached data is set to zero length
115 // #FIXME JW: const_cast is probably a bit poor.
116
117 ssize_t rc = ceph_posix_maybestriper_pread (m_fd, const_cast<void*>(buf), blen, offset, m_useStriperlessReads);
118 if (rc > 0) {
119 m_stats_bytes_fromceph += rc;
120 m_stats_bytes_toclient += rc;
121 m_stats_bytes_bypassed += rc;
122 }
123 return rc;
124 }
125
126 ssize_t rc(-1);
127 size_t bytesRemaining = blen; // track how many bytes still need to be read
128 off_t offsetDelta = 0;
129 size_t bytesRead = 0;
135 while (bytesRemaining > 0) {
136 // BUFLOG("In loop: " << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining << " " << m_bufferLength);
137
138 bool loadCache = false;
139 // run some checks to see if we need to fill the cache.
140 if (m_bufferLength == 0) {
141 // no data in buffer
142 loadCache = true;
143 } else if (offset < m_bufferStartingOffset) {
144 // offset before any cache data
145 loadCache = true;
146 } else if (offset >= (off_t) (m_bufferStartingOffset + m_bufferLength) ) {
147 // offset is beyond the stored data
148 loadCache = true;
149 } else if ((offset - m_bufferStartingOffset + offsetDelta) >= (off_t)m_bufferLength) {
150 // we have now read to the end of the buffers data
151 loadCache = true;
152 }
153
158 if (loadCache) {
159 // BUFLOG("XrdCephBufferAlgSimple::read: preLock: " << std::hash<std::thread::id>{}(std::this_thread::get_id()) << " " << "Filling the cache");
160 m_bufferdata->invalidate();
161 m_bufferLength =0; // set lengh of data stored to 0
162 rc = m_cephio->read(offset + offsetDelta, m_bufferdata->capacity()); // fill the cache
163 // BUFLOG("LoadCache ReadToCache: " << rc << " " << offset + offsetDelta << " " << m_bufferdata->capacity() );
164 if (rc < 0) {
165 BUFLOG("LoadCache Error: " << rc);
166 return rc;// TODO return correct errors
167 }
168 m_stats_bytes_fromceph += rc;
169 m_bufferStartingOffset = offset + offsetDelta;
170 m_bufferLength = rc;
171 if (rc == 0) {
172 // We should be at the end of file, with nothing more to read, and nothing that could be returned
173 // break out of the loop.
174 break;
175 }
176 }
177
178
179 //now read as much data as possible
180 off_t bufPosition = offset + offsetDelta - m_bufferStartingOffset;
181 rc = m_bufferdata->readBuffer( (void*) &(((char*)buf)[offsetDelta]) , bufPosition , bytesRemaining);
182 // BUFLOG("Fill result: " << offsetDelta << " " << bufPosition << " " << bytesRemaining << " " << rc)
183 if (rc < 0 ) {
184 BUFLOG("Reading from Cache Failed: " << rc << " " << offset << " "
185 << offsetDelta << " " << m_bufferStartingOffset << " "
186 << bufPosition << " "
187 << bytesRemaining );
188 return rc; // TODO return correct errors
189 }
190 if (rc == 0) {
191 // no bytes returned; much be at end of file
192 //BUFLOG("No bytes returned: " << rc << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining);
193 break; // leave the loop even though bytesremaing is probably >=0.
194 //i.e. requested a full buffers worth, but only a fraction of the file is here.
195 }
196 m_stats_bytes_toclient += rc;
197 // BUFLOG("End of loop: " << rc << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining);
198 offsetDelta += rc;
199 bytesRemaining -= rc;
200 bytesRead += rc;
201
202 } // while bytesremaing
203
204 return bytesRead;
205}
206
207ssize_t XrdCephBufferAlgSimple::write (const void *buf, off_t offset, size_t blen) {
208 // Set a lock for any attempt at a simultaneous operation
209 // Use recursive, as flushCache also calls the lock and don't want to deadlock
210 const std::lock_guard<std::recursive_mutex> lock(m_data_mutex);
211
212 // take the data in buf and put it into the cache; when the cache is full, write to underlying storage
213 // remember to flush the cache at the end of operations ...
214 ssize_t rc(-1);
215 ssize_t bytesWrittenToStorage(0);
216
217 if (blen == 0) {
218 return 0; // nothing to write; are we done?
219 }
220
227 off_t expected_offset = (off_t)(m_bufferStartingOffset + m_bufferLength);
228
229 if ((offset != expected_offset) && (m_bufferLength > 0) ) {
230 // for the moment we just log that there is some non expected offset value
231 // TODO, might be dangerous to flush the cache on non-aligned writes ...
232 BUFLOG("Non expected offset: " << rc << " " << offset << " " << expected_offset);
233 // rc = flushWriteCache();
234 // if (rc < 0) {
235 // return rc; // TODO return correct errors
236 // }
237 } // mismatched offset
238
241 if ( (m_bufferStartingOffset % m_bufferdata->capacity()) != 0 ) {
242 BUFLOG(" Non aligned offset?" << m_bufferStartingOffset << " "
243 << m_bufferdata->capacity() << " " << m_bufferStartingOffset % m_bufferdata->capacity() );
244 }
245
246 // Commmented out below. It would be good to pass writes, which are larger than the buffer size,
247 // straight-through. However if the ranges are not well aligned, this could be an issue.
248 // And, what then to do about a possible partial filled buffer?
249
250 // if (blen >= m_bufferdata->capacity()) {
251 // // TODO, might be dangerous to flush the cache on non-aligned writes ...
252 // // flush the cache now, if needed
253 // rc = flushWriteCache();
254 // if (rc < 0) {
255 // return rc; // TODO return correct errors
256 // }
257 // bytesWrittenToStorage += rc;
258
259 // // Size is larger than the buffer; send the write straight through
260 // std::clog << "XrdCephBufferAlgSimple::write: Readthrough cache: fd: " << m_fd
261 // << " " << offset << " " << blen << std::endl;
262 // // larger than cache, so read through, and invalidate the cache anyway
263 // m_bufferdata->invalidate();
264 // m_bufferLength=0;
265 // m_bufferStartingOffset=0;
266 // rc = ceph_posix_pwrite(m_fd, buf, blen, offset);
267 // if (rc < 0) {
268 // return rc; // TODO return correct errors
269 // }
270 // bytesWrittenToStorage += rc;
271 // return rc;
272 // }
273
278 if ((offset != expected_offset) && (m_bufferLength > 0) ) {
279 BUFLOG("Error trying to write out of order: expected at: " << expected_offset
280 << " got offset" << offset << " of len " << blen);
281 return -EINVAL;
282 }
283 if (offset < 0) {
284 BUFLOG("Got a negative offset: " << offset);
285 return -EINVAL;
286 }
287
288
289 size_t bytesRemaining = blen;
290 size_t bytesWritten = 0;
291
295 while (bytesRemaining > 0) {
299 if (m_bufferLength == m_bufferdata->capacity()) {
300 rc = flushWriteCache();
301 if (rc < 0) {
302 return rc;
303 }
304 bytesWrittenToStorage += rc;
305 } // at capacity;
306
307 if (m_bufferLength == 0) {
308 // cache is currently empty, so set the 'reference' to the external offset now
309 m_bufferStartingOffset = offset + bytesWritten;
310 }
311 //add data to the cache from buf, from buf[offsetDelta] to the cache at position m_bufferLength
312 // make sure to write only as many bytes as left in the cache.
313 size_t nBytesToWrite = std::min(bytesRemaining, m_bufferdata->capacity()-m_bufferLength);
314 const void* bufAtOffset = (void*)((char*)buf + bytesWritten); // nasty cast as void* doesn't do arithmetic
315 if (nBytesToWrite == 0) {
316 BUFLOG( "Wanting to write 0 bytes; why is that?");
317 }
318 rc = m_bufferdata->writeBuffer(bufAtOffset, m_bufferLength, nBytesToWrite, 0);
319 if (rc < 0) {
320 BUFLOG( "WriteBuffer step failed: " << rc << " " << m_bufferLength << " " << blen << " " << offset );
321 return rc; // pass the error condidition upwards
322 }
323 if (rc != (ssize_t)nBytesToWrite) {
324 BUFLOG( "WriteBuffer returned unexpected number of bytes: " << rc << " Expected: " << nBytesToWrite << " "
325 << m_bufferLength << " " << blen << " " << offset );
326 return -EBADE; // is bad exchange error best errno here?
327 }
328
329 // lots of repetition here; #TODO try to reduce
330 m_bufferLength += rc;
331 bytesWritten += rc;
332 bytesRemaining -= rc;
333
334 } // while byteRemaining
335
339 if (m_bufferLength == m_bufferdata->capacity()){
340 rc = flushWriteCache();
341 if (rc < 0)
342 {
343 return rc; // TODO return correct errors
344 }
345 bytesWrittenToStorage += rc;
346 } // at capacity;
347
348 //BUFLOG( "WriteBuffer " << bytesWritten << " " << bytesWrittenToStorage << " " << offset << " " << blen << " " );
349 return bytesWritten;
350}
351
352
353
// flushWriteCache: if the buffer holds data, calls
// m_cephio->write(m_bufferStartingOffset, m_bufferLength) to write it out. It then resets
// the buffer state unconditionally: length 0, starting offset 0, buffer data invalidated.
// Returns the value from m_cephio->write() (bytes written, or an error code on failure),
// or 0 when the buffer was already empty.
// NOTE: the buffer is reset even when the write fails, so a failed flush discards the
// buffered data.
355 // Set a lock for any attempt at a simultaneous operation
356 // Recursive because write() already holds this mutex when it calls flushWriteCache(); a plain mutex would deadlock
357 const std::lock_guard<std::recursive_mutex> lock(m_data_mutex); //
358 // BUFLOG("flushWriteCache: " << m_bufferStartingOffset << " " << m_bufferLength);
359 ssize_t rc(-1);
360 if (m_bufferLength == 0) {
361 BUFLOG("Empty buffer to flush: ");
362 rc = 0; // not an issue
363 }
364
365 if (m_bufferLength > 0) {
366 rc = m_cephio->write(m_bufferStartingOffset, m_bufferLength);
367 if (rc < 0) {
368 BUFLOG("WriteBuffer write step failed: " << rc);
369 }
370 } // some bytes to write
371
372 // reset values (done even if the write above failed)
373 m_bufferLength=0;
374 m_bufferStartingOffset=0;
375 m_bufferdata->invalidate();
376 // return bytes written, or errorcode if failure
377 return rc;
378}
379
380
381ssize_t XrdCephBufferAlgSimple::rawRead (void *buf, off_t offset, size_t blen) {
382 return -ENOSYS;
383}
384
385ssize_t XrdCephBufferAlgSimple::rawWrite(void *buf, off_t offset, size_t blen) {
386 return -ENOSYS;
387}
#define BUFLOG(x)
ssize_t ceph_posix_maybestriper_pread(int fd, void *buf, size_t count, off64_t offset, bool allowStriper)
#define write(a, b, c)
Definition XrdPosix.hh:115
#define read(a, b, c)
Definition XrdPosix.hh:82
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
void * aio_buf
Definition XrdSfsAio.hh:47
virtual ssize_t write_aio(XrdSfsAio *aoip) override
possible aio based code
virtual ssize_t rawWrite(void *buff, off_t offset, size_t blen)
virtual const IXrdCephBufferData * buffer() const
virtual ssize_t flushWriteCache() override
remember to flush the cache on final writes
XrdCephBufferAlgSimple(std::unique_ptr< IXrdCephBufferData > buffer, std::unique_ptr< ICephIOAdapter > cephio, int fd, bool useStriperlessReads=true)
virtual ssize_t write(const void *buff, off_t offset, size_t blen) override
write data through the buffer
virtual ssize_t rawRead(void *buff, off_t offset, size_t blen)
virtual ssize_t read(volatile void *buff, off_t offset, size_t blen) override
read data through the buffer
virtual ssize_t read_aio(XrdSfsAio *aoip) override
possible aio based code
ssize_t Result
Definition XrdSfsAio.hh:65
virtual void doneRead()=0
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
virtual void doneWrite()=0
is a simple implementation of IXrdCephBufferData using std::vector<char> representation for the buffe...