XRootD
XrdTpcMultistream.cc

#ifdef XRD_CHUNK_RESP

#include "XrdTpcTPC.hh"
#include "XrdTpcState.hh"
#include "XrdTpcCurlMulti.hh"

#include "XrdSys/XrdSysError.hh"

#include <curl/curl.h>

#include <algorithm>
#include <sstream>
#include <stdexcept>


using namespace TPC;

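// Thrown when the libcurl multi-handle cannot be initialized.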
class CurlHandlerSetupError : public std::runtime_error {
public:
    CurlHandlerSetupError(const std::string &msg) :
        std::runtime_error(msg)
    {}

    virtual ~CurlHandlerSetupError() throw () {}
};

namespace {
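// Wraps a libcurl multi-handle together with the pool of easy handles owned by
// the per-stream State objects.  Idle handles sit in m_avail_handles and move
// to m_active_handles while a chunk transfer is running; completed transfers
// are harvested via FinishCurlXfer, which aggregates byte counts, error codes,
// and HTTP status codes across all streams.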
class MultiCurlHandler {
public:
    MultiCurlHandler(std::vector<State*> &states, XrdSysError &log) :
        m_handle(curl_multi_init()),
        m_states(states),
        m_log(log),
        m_bytes_transferred(0),
        m_error_code(0),
        m_status_code(0)
    {
        if (m_handle == NULL) {
            throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle");
        }
        m_avail_handles.reserve(states.size());
        m_active_handles.reserve(states.size());
        for (std::vector<State*>::const_iterator state_iter = states.begin();
             state_iter != states.end();
             state_iter++) {
            m_avail_handles.push_back((*state_iter)->GetHandle());
        }
    }

    ~MultiCurlHandler()
    {
        if (!m_handle) {return;}
        for (std::vector<CURL *>::const_iterator it = m_active_handles.begin();
             it != m_active_handles.end();
             it++) {
            curl_multi_remove_handle(m_handle, *it);
        }
        curl_multi_cleanup(m_handle);
    }

    MultiCurlHandler(const MultiCurlHandler &) = delete;

    CURLM *Get() const {return m_handle;}

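    // Detach a completed easy handle from the multi-handle, fold its byte
    // count, error code, and status code into the aggregate totals, reset the
    // owning State for reuse, and return the handle to the idle pool.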
    void FinishCurlXfer(CURL *curl) {
        CURLMcode mres = curl_multi_remove_handle(m_handle, curl);
        if (mres) {
            std::stringstream ss;
            ss << "Failed to remove transfer from set: "
               << curl_multi_strerror(mres);
            throw std::runtime_error(ss.str());
        }
        for (std::vector<State*>::iterator state_iter = m_states.begin();
             state_iter != m_states.end();
             state_iter++) {
            if (curl == (*state_iter)->GetHandle()) {
                m_bytes_transferred += (*state_iter)->BytesTransferred();
                int error_code = (*state_iter)->GetErrorCode();
                if (error_code && !m_error_code) {
                    m_error_code = error_code;
                    m_error_message = (*state_iter)->GetErrorMessage();
                }
                int status_code = (*state_iter)->GetStatusCode();
                if (status_code >= 400 && !m_status_code) {
                    m_status_code = status_code;
                    m_error_message = (*state_iter)->GetErrorMessage();
                }
                (*state_iter)->ResetAfterRequest();
                break;
            }
        }
        for (std::vector<CURL *>::iterator iter = m_active_handles.begin();
             iter != m_active_handles.end();
             ++iter)
        {
            if (*iter == curl) {
                m_active_handles.erase(iter);
                break;
            }
        }
        m_avail_handles.push_back(curl);
    }

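    // Keep launching chunk transfers of up to block_size bytes, starting at
    // current_offset, until either the whole content has been assigned or no
    // further transfer can be started (no idle handle or no free buffer).
    // Returns the offset of the first byte not yet assigned to a transfer.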
    off_t StartTransfers(off_t current_offset, off_t content_length, size_t block_size,
                         int &running_handles) {
        bool started_new_xfer = false;
        do {
            size_t xfer_size = std::min(content_length - current_offset, static_cast<off_t>(block_size));
            if (xfer_size == 0) {return current_offset;}
            if (!(started_new_xfer = StartTransfer(current_offset, xfer_size))) {
                // In this case, we need to start new transfers but weren't able to.
                if (running_handles == 0) {
                    if (!CanStartTransfer(true)) {
                        m_log.Emsg("StartTransfers", "Unable to start transfers.");
                    }
                }
                break;
            } else {
                running_handles += 1;
            }
            current_offset += xfer_size;
        } while (true);
        return current_offset;
    }

    int Flush() {
        int last_error = 0;
        for (std::vector<State*>::iterator state_it = m_states.begin();
             state_it != m_states.end();
             state_it++)
        {
            int error = (*state_it)->Flush();
            if (error) {last_error = error;}
        }
        return last_error;
    }

    off_t BytesTransferred() const {
        return m_bytes_transferred;
    }

    int GetStatusCode() const {
        return m_status_code;
    }

    int GetErrorCode() const {
        return m_error_code;
    }

    void SetErrorCode(int error_code) {
        m_error_code = error_code;
    }

    std::string GetErrorMessage() const {
        return m_error_message;
    }

    void SetErrorMessage(const std::string &error_msg) {
        m_error_message = error_msg;
    }

private:

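    // Find an idle handle, point its State at the [offset, offset + size)
    // range, and attach it to the multi-handle.  Returns false if no transfer
    // could be started.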
    bool StartTransfer(off_t offset, size_t size) {
        if (!CanStartTransfer(false)) {return false;}
        for (std::vector<CURL*>::const_iterator handle_it = m_avail_handles.begin();
             handle_it != m_avail_handles.end();
             handle_it++) {
            for (std::vector<State*>::iterator state_it = m_states.begin();
                 state_it != m_states.end();
                 state_it++) {
                if ((*state_it)->GetHandle() == *handle_it) { // This state object represents an idle handle.
                    (*state_it)->SetTransferParameters(offset, size);
                    ActivateHandle(**state_it);
                    return true;
                }
            }
        }
        return false;
    }

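    // Add the state's easy handle to the multi-handle and move it from the
    // idle pool to the active pool.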
    void ActivateHandle(State &state) {
        CURL *curl = state.GetHandle();
        m_active_handles.push_back(curl);
        CURLMcode mres;
        mres = curl_multi_add_handle(m_handle, curl);
        if (mres) {
            std::stringstream ss;
            ss << "Failed to add transfer to libcurl multi-handle: "
               << curl_multi_strerror(mres);
            throw std::runtime_error(ss.str());
        }
        for (auto iter = m_avail_handles.begin();
             iter != m_avail_handles.end();
             ++iter)
        {
            if (*iter == curl) {
                m_avail_handles.erase(iter);
                break;
            }
        }
    }

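    // A new chunk transfer can start only if an idle handle exists and at
    // least one buffer is still available after reserving buffers for
    // transfers that have been activated but have not yet received their
    // first response.  When log_reason is true, the refusal is logged.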
    bool CanStartTransfer(bool log_reason) const {
        size_t idle_handles = m_avail_handles.size();
        size_t transfer_in_progress = 0;
        for (std::vector<State*>::const_iterator state_iter = m_states.begin();
             state_iter != m_states.end();
             state_iter++) {
            for (std::vector<CURL*>::const_iterator handle_iter = m_active_handles.begin();
                 handle_iter != m_active_handles.end();
                 handle_iter++) {
                if (*handle_iter == (*state_iter)->GetHandle()) {
                    transfer_in_progress += (*state_iter)->BodyTransferInProgress();
                    break;
                }
            }
        }
        if (!idle_handles) {
            if (log_reason) {
                m_log.Emsg("CanStartTransfer", "Unable to start transfers as no idle CURL handles are available.");
            }
            return false;
        }
        ssize_t available_buffers = m_states[0]->AvailableBuffers();
        // To be conservative, set aside buffers for any transfers that have been activated
        // but don't have their first responses back yet.
        available_buffers -= (m_active_handles.size() - transfer_in_progress);
        if (log_reason && (available_buffers == 0)) {
            std::stringstream ss;
            ss << "Unable to start transfers as no buffers are available. Available buffers: "
               << m_states[0]->AvailableBuffers() << ", Active curl handles: " << m_active_handles.size()
               << ", Transfers in progress: " << transfer_in_progress;
            m_log.Emsg("CanStartTransfer", ss.str().c_str());
            if (m_states[0]->AvailableBuffers() == 0) {
                m_states[0]->DumpBuffers();
            }
        }
        return available_buffers > 0;
    }

    CURLM *m_handle;
    std::vector<CURL *> m_avail_handles;
    std::vector<CURL *> m_active_handles;
    std::vector<State*> &m_states;
    XrdSysError &m_log;
    off_t m_bytes_transferred;
    int m_error_code;
    int m_status_code;
    std::string m_error_message;
};
}


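// Drive a multi-stream transfer: determine the total transfer size, clone the
// initial State once per stream (times the pipelining multiplier), then run
// the libcurl multi-handle loop, assigning block-sized chunks to idle handles,
// sending periodic performance markers, detecting stalls, and finally sending
// the terminating chunked response to the client.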
int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
                                       size_t streams, std::vector<State*> &handles,
                                       std::vector<ManagedCurlHandle> &curl_handles, TPCLogRecord &rec)
{
    int result;
    bool success;
    CURL *curl = state.GetHandle();
    if ((result = DetermineXferSize(curl, req, state, success, rec)) || !success) {
        return result;
    }
    off_t content_size = state.GetContentLength();
    off_t current_offset = 0;

    state.ResetAfterRequest();

    size_t concurrency = streams * m_pipelining_multiplier;

    handles.reserve(concurrency);
    handles.push_back(new State());
    handles[0]->Move(state);
    for (size_t idx = 1; idx < concurrency; idx++) {
        handles.push_back(handles[0]->Duplicate());
        curl_handles.emplace_back(handles.back()->GetHandle());
    }

    // Notify the packet marking manager that the transfer will start after this point
    rec.pmarkManager.startTransfer();

    // Create the multi-handle and add the current transfer to it.
    MultiCurlHandler mch(handles, m_log);
    CURLM *multi_handle = mch.Get();

#ifdef USE_PIPELINING
    curl_multi_setopt(multi_handle, CURLMOPT_PIPELINING, 1);
    curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, streams);
#endif

    // Start the response to the client prior to the first call to curl_multi_perform
    int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
    if (retval) {
        logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
                         "Failed to send the initial response to the TPC client");
        return retval;
    } else {
        logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
                         "Initial transfer response sent to the TPC client");
    }

    // Start assigning transfers
    int running_handles = 0;
    current_offset = mch.StartTransfers(current_offset, content_size, m_block_size, running_handles);

    // Transfer loop: use curl to actually run the transfer, but periodically
    // interrupt things to send back performance updates to the client.
    time_t last_marker = 0;
    // Track the time since the transfer last made progress
    off_t last_advance_bytes = 0;
    time_t last_advance_time = time(NULL);
    time_t transfer_start = last_advance_time;
    CURLcode res = static_cast<CURLcode>(-1);
    CURLMcode mres = CURLM_OK;
    do {
        time_t now = time(NULL);
        time_t next_marker = last_marker + m_marker_period;
        if (now >= next_marker) {
            if (current_offset > last_advance_bytes) {
                last_advance_bytes = current_offset;
                last_advance_time = now;
            }
            if (SendPerfMarker(req, rec, handles, current_offset)) {
                logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
                                 "Failed to send a perf marker to the TPC client");
                return -1;
            }
            int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
            if (now > last_advance_time + timeout) {
                const char *log_prefix = rec.log_prefix.c_str();
                bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;

                mch.SetErrorCode(10);
                std::stringstream ss;
                ss << "Transfer failed because no bytes have been "
                   << (tpc_pull ? "received from the source (pull mode) in "
                                : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
                mch.SetErrorMessage(ss.str());
                break;
            }
            last_marker = now;
        }

        mres = curl_multi_perform(multi_handle, &running_handles);
        if (mres == CURLM_CALL_MULTI_PERFORM) {
            // curl_multi_perform should be called again immediately. On newer
            // versions of curl, this is no longer used.
            continue;
        } else if (mres != CURLM_OK) {
            break;
        }

        rec.pmarkManager.beginPMarks();


        // Harvest any messages, looking for CURLMSG_DONE.
        CURLMsg *msg;
        do {
            int msgq = 0;
            msg = curl_multi_info_read(multi_handle, &msgq);
            if (msg && (msg->msg == CURLMSG_DONE)) {
                CURL *easy_handle = msg->easy_handle;
                res = msg->data.result;
                mch.FinishCurlXfer(easy_handle);
                // If any requests fail, cut off the entire transfer.
                if (res != CURLE_OK) {
                    break;
                }
            }
        } while (msg);
        if (res != static_cast<CURLcode>(-1) && res != CURLE_OK) {
            std::stringstream ss;
            ss << "Breaking loop due to failed curl transfer: " << curl_easy_strerror(res);
            logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_CURL_FAILURE",
                             ss.str());
            break;
        }

        if (running_handles < static_cast<int>(concurrency)) {
            // Issue new transfers if there is still pending work to do.
            // Otherwise, continue running until there are no handles left.
            if (current_offset != content_size) {
                current_offset = mch.StartTransfers(current_offset, content_size,
                                                    m_block_size, running_handles);
                if (!running_handles) {
                    std::stringstream ss;
                    ss << "No handles are able to run. Streams=" << streams << ", concurrency="
                       << concurrency;

                    logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE", ss.str());
                }
            } else if (running_handles == 0) {
                logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE",
                                 "Unable to start new transfers; breaking loop.");
                break;
            }
        }

        int64_t max_sleep_time = next_marker - time(NULL);
        if (max_sleep_time <= 0) {
            continue;
        }
        int fd_count;
#ifdef HAVE_CURL_MULTI_WAIT
        mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000,
                               &fd_count);
#else
        mres = curl_multi_wait_impl(multi_handle, max_sleep_time*1000,
                                    &fd_count);
#endif
        if (mres != CURLM_OK) {
            break;
        }
    } while (running_handles);

    if (mres != CURLM_OK) {
        std::stringstream ss;
        ss << "Internal libcurl multi-handle error: "
           << curl_multi_strerror(mres);
        logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", ss.str());
        throw std::runtime_error(ss.str());
    }

    // Harvest any messages, looking for CURLMSG_DONE.
    CURLMsg *msg;
    do {
        int msgq = 0;
        msg = curl_multi_info_read(multi_handle, &msgq);
        if (msg && (msg->msg == CURLMSG_DONE)) {
            CURL *easy_handle = msg->easy_handle;
            mch.FinishCurlXfer(easy_handle);
            if (res == CURLE_OK || res == static_cast<CURLcode>(-1))
                res = msg->data.result; // Transfer result will be examined below.
        }
    } while (msg);

    if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
        logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
                         "Internal state error in libcurl");
        throw std::runtime_error("Internal state error in libcurl");
    }

    mch.Flush();

    rec.bytes_transferred = mch.BytesTransferred();
    rec.tpc_status = mch.GetStatusCode();

    // Generate the final response back to the client.
    std::stringstream ss;
    success = false;
    if (mch.GetStatusCode() >= 400) {
        std::string err = mch.GetErrorMessage();
        std::stringstream ss2;
        ss2 << "Remote side failed with status code " << mch.GetStatusCode();
        if (!err.empty()) {
            std::replace(err.begin(), err.end(), '\n', ' ');
            ss2 << "; error message: \"" << err << "\"";
        }
        logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
        ss << generateClientErr(ss2, rec);
    } else if (mch.GetErrorCode()) {
        std::string err = mch.GetErrorMessage();
        if (err.empty()) {err = "(no error message provided)";}
        else {std::replace(err.begin(), err.end(), '\n', ' ');}
        std::stringstream ss2;
        ss2 << "Error when interacting with local filesystem: " << err;
        logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
        ss << generateClientErr(ss2, rec);
    } else if (res != CURLE_OK) {
        std::stringstream ss2;
        ss2 << "Request failed when processing";
        std::stringstream ss3;
        ss3 << ss2.str() << ": " << curl_easy_strerror(res);
        logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss3.str());
        ss << generateClientErr(ss2, rec, res);
    } else if (current_offset != content_size) {
        std::stringstream ss2;
        ss2 << "Internal logic error led to early abort; current offset is "
            << current_offset << " while full size is " << content_size;
        logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
        ss << generateClientErr(ss2, rec);
    } else {
        if (!handles[0]->Finalize()) {
            std::stringstream ss2;
            ss2 << "Failed to finalize and close file handle.";
            ss << generateClientErr(ss2, rec);
            logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
                             ss2.str());
        } else {
            ss << "success: Created";
            success = true;
        }
    }

    if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
        logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
                         "Failed to send last update to remote client");
        return retval;
    } else if (success) {
        logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
        rec.status = 0;
    }
    return req.ChunkResp(NULL, 0);
}


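// Entry point for the multi-stream code path.  Owns the State objects created
// by RunCurlWithStreamsImpl and deletes them on every exit path; setup
// failures are reported with a simple HTTP error response, while runtime
// errors are reported through the already-started chunked response.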
int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
                                   size_t streams, TPCLogRecord &rec)
{
    std::vector<ManagedCurlHandle> curl_handles;
    std::vector<State*> handles;
    std::stringstream err_ss;
    try {
        int retval = RunCurlWithStreamsImpl(req, state, streams, handles, curl_handles, rec);
        for (std::vector<State*>::iterator state_iter = handles.begin();
             state_iter != handles.end();
             state_iter++) {
            delete *state_iter;
        }
        return retval;
    } catch (CurlHandlerSetupError &e) {
        for (std::vector<State*>::iterator state_iter = handles.begin();
             state_iter != handles.end();
             state_iter++) {
            delete *state_iter;
        }

        rec.status = 500;
        logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
        std::stringstream ss;
        ss << e.what();
        err_ss << generateClientErr(ss, rec);
        return req.SendSimpleResp(rec.status, NULL, NULL, e.what(), 0);
    } catch (std::runtime_error &e) {
        for (std::vector<State*>::iterator state_iter = handles.begin();
             state_iter != handles.end();
             state_iter++) {
            delete *state_iter;
        }

        logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
        std::stringstream ss;
        ss << e.what();
        err_ss << generateClientErr(ss, rec);
        int retval;
        if ((retval = req.ChunkResp(err_ss.str().c_str(), 0))) {
            return retval;
        }
        return req.ChunkResp(NULL, 0);
    }
}

#endif // XRD_CHUNK_RESP
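The transfer loop above is built around libcurl's multi interface: repeatedly call curl_multi_perform to make progress, harvest completed transfers with curl_multi_info_read, and block in curl_multi_wait until there is more work. The following is a minimal, self-contained sketch of that drive/harvest pattern with all XRootD types stripped out; the URL and the one-second wait are placeholder choices, not values taken from this file.

// build: g++ multi_sketch.cc -lcurl
#include <curl/curl.h>
#include <cstdio>

int main() {
    curl_global_init(CURL_GLOBAL_DEFAULT);
    CURLM *multi = curl_multi_init();
    CURL *easy = curl_easy_init();
    curl_easy_setopt(easy, CURLOPT_URL, "https://example.org/");  // placeholder URL
    curl_multi_add_handle(multi, easy);

    int running = 0;
    do {
        // Drive: let libcurl make as much progress as it can without blocking.
        CURLMcode mres = curl_multi_perform(multi, &running);
        if (mres == CURLM_CALL_MULTI_PERFORM) continue;  // legacy: call again immediately
        if (mres != CURLM_OK) break;

        // Harvest: collect completed transfers, as the CURLMSG_DONE loops above do.
        int msgs_left = 0;
        CURLMsg *msg;
        while ((msg = curl_multi_info_read(multi, &msgs_left))) {
            if (msg->msg == CURLMSG_DONE) {
                std::printf("transfer finished: %s\n",
                            curl_easy_strerror(msg->data.result));
                curl_multi_remove_handle(multi, msg->easy_handle);
            }
        }

        // Wait: block until there is socket activity or the timeout expires
        // (the real code caps this wait at the next perf-marker deadline).
        if (running) {
            int numfds = 0;
            curl_multi_wait(multi, NULL, 0, 1000 /* ms, placeholder */, &numfds);
        }
    } while (running);

    curl_easy_cleanup(easy);
    curl_multi_cleanup(multi);
    curl_global_cleanup();
    return 0;
}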