XRootD
Loading...
Searching...
No Matches
XrdCephBulkAioRead.cc
Go to the documentation of this file.
2
3
4bulkAioRead::bulkAioRead(librados::IoCtx* ct, logfunc_pointer logwrapper, CephFileRef* fileref) {
13 context = ct;
14 file_ref = fileref;
15 log_func = logwrapper;
16}
17
24
29 operations.clear();
30 buffers.clear();
31}
32
33int bulkAioRead::addRequest(size_t obj_idx, char* out_buf, size_t size, off64_t offset) {
50
51 try{
52 auto &op_data = operations[obj_idx];
53 //When we start using C++17, the next two lines can be merged
54 buffers.emplace_back(out_buf);
55 auto &buf = buffers.back();
56 op_data.ceph_read_op.read(offset, size, &buf.bl, &buf.rc);
57 } catch (std::bad_alloc&) {
58 log_func((char*)"Memory allocation failed while reading file %s", file_ref->name.c_str());
59 return -ENOMEM;
60 }
61 return 0;
62}
63
73
74 for (auto &op_data: operations) {
75 size_t obj_idx = op_data.first;
76 //16 bytes for object hex number, 1 for dot and 1 for null-terminator
77 char object_suffix[18];
78 int sp_bytes_written;
79 sp_bytes_written = snprintf(object_suffix, sizeof(object_suffix), ".%016zx", obj_idx);
80 if (sp_bytes_written >= (int) sizeof(object_suffix)) {
81 log_func((char*)"Can not fit object suffix into buffer for file %s -- too big\n", file_ref->name.c_str());
82 return -EFBIG;
83 }
84
85 std::string obj_name;
86 try {
87 obj_name = file_ref->name + std::string(object_suffix);
88 } catch (std::bad_alloc&) {
89 log_func((char*)"Can not create object string for file %s)", file_ref->name.c_str());
90 return -ENOMEM;
91 }
92 context->aio_operate(obj_name, op_data.second.cmpl.use(), &op_data.second.ceph_read_op, 0);
93 }
94
95 for (auto &op_data: operations) {
96 op_data.second.cmpl.wait_for_complete();
97 int rval = op_data.second.cmpl.get_return_value();
98 /*
99 * Optimization is possible here: cancel all remaining read operations after the failure.
100 * One way to do so is the following: add context as an argument to the `use` method of CmplPtr.
101 * Then inside the class this pointer can be saved and used by the destructor to call
102 * `aio_cancel` (and probably `wait_for_complete`) before releasing the completion.
103 * Though one need to clarify whether it is necessary to cal `wait_for_complete` after
104 * `aio_cancel` (i.e. may the status variable/bufferlist still be written to or not).
105 */
106 if (rval < 0) {
107 log_func((char*)"Read of the object %ld for file %s failed", op_data.first, file_ref->name.c_str());
108 return rval;
109 }
110 }
111 return 0;
112}
113
125
126 ssize_t res = 0;
127 for (ReadOpData &op_data: buffers) {
128 if (op_data.rc < 0) {
129 //Is it possible to get here?
130 log_func((char*)"One of the reads failed with rc %d", op_data.rc);
131 return op_data.rc;
132 }
133 op_data.bl.begin().copy(op_data.bl.length(), op_data.out_buf);
134 res += op_data.bl.length();
135 }
136 //We should clear used completions to allow new operations
137 clear();
138 return res;
139}
140
141int bulkAioRead::read(void* out_buf, size_t req_size, off64_t offset) {
157
158 if (req_size == 0) {
159 log_func((char*)"Zero-length read request for file %s, probably client error", file_ref->name.c_str());
160 return 0;
161 }
162
163 char* const buf_start_ptr = (char*) out_buf;
164
165 size_t object_size = file_ref->objectSize;
166 //The amount of bytes that is yet to be read
167 size_t to_read = req_size;
168 //block means ceph object here
169 size_t start_block = offset / object_size;
170 size_t buf_pos = 0;
171 size_t chunk_start = offset % object_size;
172
173 while (to_read > 0) {
174 size_t chunk_len = std::min(to_read, object_size - chunk_start);
175
176 if (buf_pos >= req_size) {
177 log_func((char*)"Internal bug! Attempt to read %lu data for block (%lu, %lu) of file %s\n", buf_pos, offset, req_size, file_ref->name.c_str());
178 return -EINVAL;
179 }
180
181 int rc = addRequest(start_block, buf_start_ptr + buf_pos, chunk_len, chunk_start);
182 if (rc < 0) {
183 log_func((char*)"Unable to submit async read request, rc=%d\n", rc);
184 return rc;
185 }
186
187 buf_pos += chunk_len;
188
189 start_block++;
190 chunk_start = 0;
191 if (chunk_len > to_read) {
192 log_func((char*)"Internal bug! Read %lu bytes, more than expected %lu bytes for block (%lu, %lu) of file %s\n", chunk_len, to_read, offset, req_size, file_ref->name.c_str());
193 return -EINVAL;
194 }
195 to_read = to_read - chunk_len;
196 }
197 return 0;
198}
static void logwrapper(char *format, va_list argp)
Definition XrdCephOss.cc:62
int read(void *out_buf, size_t size, off64_t offset)
bulkAioRead(librados::IoCtx *ct, logfunc_pointer ptr, CephFileRef *fileref)
int submit_and_wait_for_complete()