XRootD
Loading...
Searching...
No Matches
XrdLinkXeq.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d L i n k X e q . c c */
4/* */
5/* (c) 2018 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <limits.h>
31#include <poll.h>
32#include <signal.h>
33#include <cstdio>
34#include <cstring>
35#include <unistd.h>
36#include <sys/types.h>
37#include <sys/uio.h>
38
39#if defined(__linux__) || defined(__GNU__)
40#include <netinet/tcp.h>
41#if !defined(TCP_CORK)
42#undef HAVE_SENDFILE
43#endif
44#endif
45
46#ifdef HAVE_SENDFILE
47
48#if defined(__solaris__) || defined(__linux__) || defined(__GNU__)
49#include <sys/sendfile.h>
50#endif
51
52#endif
53
55#include "XrdSys/XrdSysError.hh"
56#include "XrdSys/XrdSysFD.hh"
58
59#include "Xrd/XrdBuffer.hh"
60#include "Xrd/XrdLink.hh"
61#include "Xrd/XrdLinkCtl.hh"
62#include "Xrd/XrdLinkXeq.hh"
63#include "Xrd/XrdPoll.hh"
64#include "Xrd/XrdScheduler.hh"
65#include "Xrd/XrdSendQ.hh"
66#include "Xrd/XrdTcpMonPin.hh"
67
68#define TRACE_IDENT ID
69#include "Xrd/XrdTrace.hh"
70
71/******************************************************************************/
72/* G l o b a l s */
73/******************************************************************************/
74
75namespace
76{
77int getIovMax()
78{
79int maxiov;
80#ifdef _SC_IOV_MAX
81 if ((maxiov = sysconf(_SC_IOV_MAX)) > 0) return maxiov;
82#endif
83#ifdef IOV_MAX
84 return IOV_MAX;
85#else
86 return 1024;
87#endif
88}
89};
90
91namespace XrdGlobal
92{
93extern XrdSysError Log;
94extern XrdScheduler Sched;
95extern XrdTlsContext *tlsCtx;
97extern int devNull;
98 int maxIOV = getIovMax();
99};
100
101using namespace XrdGlobal;
102
103/******************************************************************************/
104/* S t a t i c s */
105/******************************************************************************/
106
107 const char *XrdLinkXeq::TraceID = "LinkXeq";
108
109 long long XrdLinkXeq::LinkBytesIn = 0;
110 long long XrdLinkXeq::LinkBytesOut = 0;
111 long long XrdLinkXeq::LinkConTime = 0;
112 long long XrdLinkXeq::LinkCountTot = 0;
113 int XrdLinkXeq::LinkCount = 0;
119
120/******************************************************************************/
121/* C o n s t r u c t o r */
122/******************************************************************************/
123
124XrdLinkXeq::XrdLinkXeq() : XrdLink(*this), PollInfo((XrdLink &)*this)
125{
127}
128
130{
131 memcpy(Uname+sizeof(Uname)-7, "anon.0@", 7);
132 strcpy(Lname, "somewhere");
133 ID = &Uname[sizeof(Uname)-5];
134 Comment = ID;
135 sendQ = 0;
136 stallCnt = stallCntTot = 0;
137 tardyCnt = tardyCntTot = 0;
138 SfIntr = 0;
139 isIdle = 0;
141 LockReads= false;
142 KeepFD = false;
143 Protocol = 0;
144 ProtoAlt = 0;
145
146 LinkInfo.Reset();
147 PollInfo.Zorch();
148 ResetLink();
149}
150
151/******************************************************************************/
152/* B a c k l o g */
153/******************************************************************************/
154
156{
158
159// Return backlog information
160//
161 return (sendQ ? sendQ->Backlog() : 0);
162}
163
164/******************************************************************************/
165/* C l i e n t */
166/******************************************************************************/
167
168int XrdLinkXeq::Client(char *nbuf, int nbsz)
169{
170 int ulen;
171
172// Generate full client name
173//
174 if (nbsz <= 0) return 0;
175 ulen = (Lname - ID);
176 if ((ulen + HNlen) >= nbsz) ulen = 0;
177 else {strncpy(nbuf, ID, ulen);
178 strcpy(nbuf+ulen, HostName);
179 ulen += HNlen;
180 }
181 return ulen;
182}
183
184/******************************************************************************/
185/* C l o s e */
186/******************************************************************************/
187
188int XrdLinkXeq::Close(bool defer)
190 int csec, fd, rc = 0;
191
192// If a defer close is requested, we can close the descriptor but we must
193// keep the slot number to prevent a new client getting the same fd number.
194// Linux is peculiar in that any in-progress operations will remain in that
195// state even after the FD is closed unless there is some activity either on
196// the connection or an event occurs that causes an operation restart. We
197// portably solve this problem by issuing a shutdown() on the socket prior
198// closing it. On most platforms, this informs readers that the connection is
199// gone (though not on old (i.e. <= 2.3) versions of Linux, sigh). Also, if
200// nonblocking mode is enabled, we need to do this in a separate thread as
201// a shutdown may block for a pretty long time if lots\ of messages are queued.
202// We will ask the SendQ object to schedule the shutdown for us before it
203// commits suicide.
204// Note that we can hold the opMutex while we also get the wrMutex.
205//
206 if (defer)
207 {if (!sendQ) Shutdown(false);
208 else {TRACEI(DEBUG, "Shutdown FD " <<LinkInfo.FD<<" only via SendQ");
209 LinkInfo.InUse++;
210 LinkInfo.FD = -LinkInfo.FD; // Leave poll version untouched!
211 wrMutex.Lock();
212 sendQ->Terminate(this);
213 sendQ = 0;
214 wrMutex.UnLock();
215 }
216 return 0;
217 }
218
219// If we got here then this is not a deferred close so we just need to check
220// if there is a sendq appendage we need to get rid of.
221//
222 if (sendQ)
223 {wrMutex.Lock();
224 sendQ->Terminate();
225 sendQ = 0;
226 wrMutex.UnLock();
227 }
228
229// Multiple protocols may be bound to this link. If it is in use, defer the
230// actual close until the use count drops to one.
231//
232 while(LinkInfo.InUse > 1)
233 {opHelper.UnLock();
234 TRACEI(DEBUG, "Close FD "<<LinkInfo.FD <<" deferred, use count="
235 <<LinkInfo.InUse);
236 Serialize();
237 opHelper.Lock(&LinkInfo.opMutex);
238 }
239 LinkInfo.InUse--;
240 Instance = 0;
241
242// Add up the statistic for this link
243//
244 syncStats(&csec);
245
246// Cleanup TLS if it is active
247//
248 if (isTLS) tlsIO.Shutdown();
249
250// Clean this link up
251//
252 if (Protocol) {Protocol->Recycle(this, csec, LinkInfo.Etext); Protocol = 0;}
253 if (ProtoAlt) {ProtoAlt->Recycle(this, csec, LinkInfo.Etext); ProtoAlt = 0;}
254 if (LinkInfo.Etext) {free(LinkInfo.Etext); LinkInfo.Etext = 0;}
255 LinkInfo.InUse = 0;
256
257// At this point we can have no lock conflicts, so if someone is waiting for
258// us to terminate let them know about it. Note that we will get the condvar
259// mutex while we hold the opMutex. This is the required order! We will also
260// zero out the pointer to the condvar while holding the opmutex.
261//
262 if (LinkInfo.KillcvP)
266 LinkInfo.KillcvP = 0;
267 }
268
269// Remove ourselves from the poll table and then from the Link table. We may
270// not hold on to the opMutex when we acquire the LTMutex. However, the link
271// table needs to be cleaned up prior to actually closing the socket. So, we
272// do some fancy footwork to prevent multiple closes of this link.
273//
274 fd = abs(LinkInfo.FD);
275 if (PollInfo.FD > 0)
277 PollInfo.FD = -1;
278 opHelper.UnLock();
280 } else opHelper.UnLock();
281
282// Invoke the TCP monitor if it was loaded.
283//
284 if (TcpMonPin && fd > 2)
285 {XrdTcpMonPin::LinkInfo lnkInfo;
286 lnkInfo.tident = ID;
287 lnkInfo.fd = fd;
288 lnkInfo.consec = csec;
289 lnkInfo.bytesIn = BytesInTot;
290 lnkInfo.bytesOut = BytesOutTot;
291 TcpMonPin->Monitor(Addr, lnkInfo, sizeof(lnkInfo));
292 }
293
294// Close the file descriptor if it isn't being shared. Do it as the last
295// thing because closes and accepts and not interlocked.
296//
297 if (fd >= 2) {if (KeepFD) rc = 0;
298 else rc = (close(fd) < 0 ? errno : 0);
299 }
300 if (rc) Log.Emsg("Link", rc, "close", ID);
301 return rc;
302}
303
304/******************************************************************************/
305/* D o I t */
306/******************************************************************************/
307
309{
310 int rc;
311
312// The Process() return code tells us what to do:
313// < 0 -> Stop getting requests,
314// -EINPROGRESS leave link disabled but otherwise all is well
315// -n Error, disable and close the link
316// = 0 -> OK, get next request, if allowed, o/w enable the link
317// > 0 -> Slow link, stop getting requests and enable the link
318//
319 if (Protocol)
320 do {rc = Protocol->Process(this);} while (!rc && Sched.canStick());
321 else {Log.Emsg("Link", "Dispatch on closed link", ID);
322 return;
323 }
324
325// Either re-enable the link and cycle back waiting for a new request, leave
326// disabled, or terminate the connection.
327//
328 if (rc >= 0)
330 else if (rc != -EINPROGRESS) Close();
331}
332
333/******************************************************************************/
334/* g e t P e e r C e r t s */
335/******************************************************************************/
336
338{
339 return (isTLS ? tlsIO.getCerts(true) : 0);
340}
341
342/******************************************************************************/
343/* P e e k */
344/******************************************************************************/
345
346int XrdLinkXeq::Peek(char *Buff, int Blen, int timeout)
347{
348 XrdSysMutexHelper theMutex;
349 struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
350 ssize_t mlen;
351 int retc;
352
353// Lock the read mutex if we need to, the helper will unlock it upon exit
354//
355 if (LockReads) theMutex.Lock(&rdMutex);
356
357// Wait until we can actually read something
358//
359 isIdle = 0;
360 do {retc = poll(&polltab, 1, timeout);} while(retc < 0 && errno == EINTR);
361 if (retc != 1)
362 {if (retc == 0) return 0;
363 return Log.Emsg("Link", -errno, "poll", ID);
364 }
365
366// Verify it is safe to read now
367//
368 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
369 {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents), "polling", ID);
370 return -1;
371 }
372
373// Do the peek.
374//
375 do {mlen = recv(LinkInfo.FD, Buff, Blen, MSG_PEEK);}
376 while(mlen < 0 && errno == EINTR);
377
378// Return the result
379//
380 if (mlen >= 0) return int(mlen);
381 Log.Emsg("Link", errno, "peek on", ID);
382 return -1;
383}
384
385/******************************************************************************/
386/* R e c v */
387/******************************************************************************/
388
389int XrdLinkXeq::Recv(char *Buff, int Blen)
390{
391 ssize_t rlen;
392
393// Note that we will read only as much as is queued. Use Recv() with a
394// timeout to receive as much data as possible.
395//
396 if (LockReads) rdMutex.Lock();
397 isIdle = 0;
398 do {rlen = read(LinkInfo.FD, Buff, Blen);} while(rlen < 0 && errno == EINTR);
399 if (rlen > 0) AtomicAdd(BytesIn, rlen);
400 if (LockReads) rdMutex.UnLock();
401
402 if (rlen >= 0) return int(rlen);
403 if (LinkInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
404 return -1;
405}
406
407/******************************************************************************/
408
409int XrdLinkXeq::Recv(char *Buff, int Blen, int timeout)
410{
411 XrdSysMutexHelper theMutex;
412 struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
413 ssize_t rlen, totlen = 0;
414 int retc;
415
416// Lock the read mutex if we need to, the helper will unlock it upon exit
417//
418 if (LockReads) theMutex.Lock(&rdMutex);
419
420// Wait up to timeout milliseconds for data to arrive
421//
422 isIdle = 0;
423 while(Blen > 0)
424 {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
425 if (retc != 1)
426 {if (retc == 0)
427 {tardyCnt++;
428 if (totlen)
429 {if ((++stallCnt & 0xff) == 1) TRACEI(DEBUG,"read timed out");
430 AtomicAdd(BytesIn, totlen);
431 }
432 return int(totlen);
433 }
434 return (LinkInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
435 }
436
437 // Verify it is safe to read now
438 //
439 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
440 {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents),
441 "polling", ID);
442 return -1;
443 }
444
445 // Read as much data as you can. Note that we will force an error
446 // if we get a zero-length read after poll said it was OK.
447 //
448 do {rlen = recv(LinkInfo.FD, Buff, Blen, 0);}
449 while(rlen < 0 && errno == EINTR);
450 if (rlen <= 0)
451 {if (!rlen) return -ENOMSG;
452 if (LinkInfo.FD > 0) Log.Emsg("Link", -errno, "receive from", ID);
453 return -1;
454 }
455 totlen += rlen; Blen -= rlen; Buff += rlen;
456 }
457
458 AtomicAdd(BytesIn, totlen);
459 return int(totlen);
460}
461
462/******************************************************************************/
463
464int XrdLinkXeq::Recv(const struct iovec *iov, int iocnt, int timeout)
465{
466 XrdSysMutexHelper theMutex;
467 struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
468 int retc, rlen;
469
470// Lock the read mutex if we need to, the helper will unlock it upon exit
471//
472 if (LockReads) theMutex.Lock(&rdMutex);
473
474// Wait up to timeout milliseconds for data to arrive
475//
476 isIdle = 0;
477 do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
478 if (retc != 1)
479 {if (retc == 0)
480 {tardyCnt++;
481 return 0;
482 }
483 return (LinkInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
484 }
485
486// Verify it is safe to read now
487//
488 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
489 {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents), "polling", ID);
490 return -1;
491 }
492
493// If the iocnt is within limits then just go ahead and read once.
494//
495 if (iocnt <= maxIOV)
496 {rlen = RecvIOV(iov, iocnt);
497 if (rlen > 0) {AtomicAdd(BytesIn, rlen);}
498 return rlen;
499 }
500
501// We will have to break this up into allowable segments and we need to add up
502// the bytes in each segment so that we know when to stop reading.
503//
504 int seglen, segcnt = maxIOV, totlen = 0;
505 do {seglen = 0;
506 for (int i = 0; i < segcnt; i++) seglen += iov[i].iov_len;
507 if ((rlen = RecvIOV(iov, segcnt)) < 0) return rlen;
508 totlen += rlen;
509 if (rlen < seglen) break;
510 iov += segcnt;
511 iocnt -= segcnt;
512 if (iocnt <= maxIOV) segcnt = iocnt;
513 } while(iocnt > 0);
514
515// All done
516//
517 AtomicAdd(BytesIn, totlen);
518 return totlen;
519}
520
521/******************************************************************************/
522/* R e c v A l l */
523/******************************************************************************/
524
525int XrdLinkXeq::RecvAll(char *Buff, int Blen, int timeout)
526{
527 struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
528 ssize_t rlen;
529 int retc;
530
531// Check if timeout specified. Notice that the timeout is the max we will
532// for some data. We will wait forever for all the data. Yeah, it's weird.
533//
534 if (timeout >= 0)
535 {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
536 if (retc != 1)
537 {if (!retc) return -ETIMEDOUT;
538 Log.Emsg("Link",errno,"poll",ID);
539 return -1;
540 }
541 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
542 {Log.Emsg("Link",XrdPoll::Poll2Text(polltab.revents),"polling",ID);
543 return -1;
544 }
545 }
546
547// Note that we will block until we receive all he bytes.
548//
549 if (LockReads) rdMutex.Lock();
550 isIdle = 0;
551 do {rlen = recv(LinkInfo.FD, Buff, Blen, MSG_WAITALL);}
552 while(rlen < 0 && errno == EINTR);
553 if (rlen > 0) AtomicAdd(BytesIn, rlen);
554 if (LockReads) rdMutex.UnLock();
555
556 if (int(rlen) == Blen) return Blen;
557 if (!rlen) {TRACEI(DEBUG, "No RecvAll() data; errno=" <<errno);}
558 else if (rlen > 0) Log.Emsg("RecvAll", "Premature end from", ID);
559 else if (LinkInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
560 return -1;
561}
562
563/******************************************************************************/
564/* Protected: R e c v I O V */
565/******************************************************************************/
566
567int XrdLinkXeq::RecvIOV(const struct iovec *iov, int iocnt)
568{
569 ssize_t retc = 0;
570
571// Read the data in. On some version of Unix (e.g., Linux) a readv() may
572// end at any time without reading all the bytes when directed to a socket.
573// We always return the number bytes read (or an error). The caller needs to
574// restart the read at the appropriate place in the iovec when more data arrives.
575//
576 do {retc = readv(LinkInfo.FD, iov, iocnt);}
577 while(retc < 0 && errno == EINTR);
578
579// Check how we completed
580//
581 if (retc < 0) Log.Emsg("Link", errno, "receive from", ID);
582 return retc;
583}
584
585/******************************************************************************/
586/* R e g i s t e r */
587/******************************************************************************/
588
589bool XrdLinkXeq::Register(const char *hName)
590{
591
592// First see if we can register this name with the address object
593//
594 if (!Addr.Register(hName)) return false;
595
596// Make appropriate changes here
597//
598 if (HostName) free(HostName);
599 HostName = strdup(hName);
600 strlcpy(Lname, hName, sizeof(Lname));
601 return true;
602}
603
604/******************************************************************************/
605/* S e n d */
606/******************************************************************************/
607
608int XrdLinkXeq::Send(const char *Buff, int Blen)
609{
610 ssize_t retc = 0, bytesleft = Blen;
611
612// Get a lock
613//
614 wrMutex.Lock();
615 isIdle = 0;
616 AtomicAdd(BytesOut, Blen);
617
618// Do non-blocking writes if we are setup to do so.
619//
620 if (sendQ)
621 {retc = sendQ->Send(Buff, Blen);
622 wrMutex.UnLock();
623 return retc;
624 }
625
626// Write the data out
627//
628 while(bytesleft)
629 {if ((retc = write(LinkInfo.FD, Buff, bytesleft)) < 0)
630 {if (errno == EINTR) continue;
631 else break;
632 }
633 bytesleft -= retc; Buff += retc;
634 }
635
636// All done
637//
638 wrMutex.UnLock();
639 if (retc >= 0) return Blen;
640 Log.Emsg("Link", errno, "send to", ID);
641 return -1;
642}
643
644/******************************************************************************/
645
646int XrdLinkXeq::Send(const struct iovec *iov, int iocnt, int bytes)
647{
648 int retc;
649 static int maxIOV = -1;
650 if (maxIOV == -1) {
651#ifdef _SC_IOV_MAX
652 maxIOV = sysconf(_SC_IOV_MAX);
653 if (maxIOV == -1)
654#endif
655#ifdef IOV_MAX
656 maxIOV = IOV_MAX;
657#else
658 maxIOV = 1024;
659#endif
660 }
661
662// Get a lock and assume we will be successful (statistically we are)
663//
664 wrMutex.Lock();
665 isIdle = 0;
666 AtomicAdd(BytesOut, bytes);
667
668// Do non-blocking writes if we are setup to do so.
669//
670 if (sendQ)
671 {retc = sendQ->Send(iov, iocnt, bytes);
672 wrMutex.UnLock();
673 return retc;
674 }
675
676// If the iocnt is within limits then just go ahead and write this out
677//
678 if (iocnt <= maxIOV)
679 {retc = SendIOV(iov, iocnt, bytes);
680 wrMutex.UnLock();
681 return retc;
682 }
683
684// We will have to break this up into allowable segments
685//
686 int seglen, segcnt = maxIOV, iolen = 0;
687 do {seglen = 0;
688 for (int i = 0; i < segcnt; i++) seglen += iov[i].iov_len;
689 if ((retc = SendIOV(iov, segcnt, seglen)) < 0)
690 {wrMutex.UnLock();
691 return retc;
692 }
693 iolen += retc;
694 iov += segcnt;
695 iocnt -= segcnt;
696 if (iocnt <= maxIOV) segcnt = iocnt;
697 } while(iocnt > 0);
698
699// All done
700//
701 wrMutex.UnLock();
702 return iolen;
703}
704
705/******************************************************************************/
706
707int XrdLinkXeq::Send(const sfVec *sfP, int sfN)
708{
709#if !defined(HAVE_SENDFILE)
710
711 return -1;
712
713#elif defined(__solaris__)
714
715 sendfilevec_t vecSF[XrdOucSFVec::sfMax], *vecSFP = vecSF;
716 size_t xframt, totamt, bytes = 0;
717 ssize_t retc;
718 int i = 0;
719
720// Construct the sendfilev() vector
721//
722 for (i = 0; i < sfN; sfP++, i++)
723 {if (sfP->fdnum < 0)
724 {vecSF[i].sfv_fd = SFV_FD_SELF;
725 vecSF[i].sfv_off = (off_t)sfP->buffer;
726 } else {
727 vecSF[i].sfv_fd = sfP->fdnum;
728 vecSF[i].sfv_off = sfP->offset;
729 }
730 vecSF[i].sfv_flag = 0;
731 vecSF[i].sfv_len = sfP->sendsz;
732 bytes += sfP->sendsz;
733 }
734 totamt = bytes;
735
736// Lock the link, issue sendfilev(), and unlock the link. The documentation
737// is very spotty and inconsistent. We can only retry this operation under
738// very limited conditions.
739//
740 wrMutex.Lock();
741 isIdle = 0;
742do{retc = sendfilev(LinkInfo.FD, vecSFP, sfN, &xframt);
743
744// Check if all went well and return if so (usual case)
745//
746 if (xframt == bytes)
747 {AtomicAdd(BytesOut, bytes);
748 wrMutex.UnLock();
749 return totamt;
750 }
751
752// The only one we will recover from is EINTR. We cannot legally get EAGAIN.
753//
754 if (retc < 0 && errno != EINTR) break;
755
756// Try to resume the transfer
757//
758 if (xframt > 0)
759 {AtomicAdd(BytesOut, xframt); bytes -= xframt; SfIntr++;
760 while(xframt > 0 && sfN)
761 {if ((ssize_t)xframt < (ssize_t)vecSFP->sfv_len)
762 {vecSFP->sfv_off += xframt; vecSFP->sfv_len -= xframt; break;}
763 xframt -= vecSFP->sfv_len; vecSFP++; sfN--;
764 }
765 }
766 } while(sfN > 0);
767
768// See if we can recover without destroying the connection
769//
770 retc = (retc < 0 ? errno : ECANCELED);
771 wrMutex.UnLock();
772 Log.Emsg("Link", retc, "send file to", ID);
773 return -1;
774
775#elif defined(__linux__) || defined(__GNU__)
776
777 static const int setON = 1, setOFF = 0;
778 ssize_t retc = 0, bytesleft;
779 off_t myOffset;
780 int i, xfrbytes = 0, uncork = 1, xIntr = 0;
781
782// lock the link
783//
784 wrMutex.Lock();
785 isIdle = 0;
786
787// In linux we need to cork the socket. On permanent errors we do not uncork
788// the socket because it will be closed in short order.
789//
790 if (setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setON, sizeof(setON)) < 0)
791 {Log.Emsg("Link", errno, "cork socket for", ID);
792 uncork = 0; sfOK = 0;
793 }
794
795// Send the header first
796//
797 for (i = 0; i < sfN; sfP++, i++)
798 {if (sfP->fdnum < 0) retc = sendData(sfP->buffer, sfP->sendsz);
799 else {myOffset = sfP->offset; bytesleft = sfP->sendsz;
800 while(bytesleft
801 && (retc=sendfile(LinkInfo.FD,sfP->fdnum,&myOffset,bytesleft)) > 0)
802 {bytesleft -= retc; xIntr++;}
803 }
804 if (retc < 0 && errno == EINTR) continue;
805 if (retc <= 0) break;
806 xfrbytes += sfP->sendsz;
807 }
808
809// Diagnose any sendfile errors
810//
811 if (retc <= 0)
812 {if (retc == 0) errno = ECANCELED;
813 wrMutex.UnLock();
814 Log.Emsg("Link", errno, "send file to", ID);
815 return -1;
816 }
817
818// Now uncork the socket
819//
820 if (uncork
821 && setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setOFF, sizeof(setOFF)) < 0)
822 Log.Emsg("Link", errno, "uncork socket for", ID);
823
824// All done
825//
826 if (xIntr > sfN) SfIntr += (xIntr - sfN);
827 AtomicAdd(BytesOut, xfrbytes);
828 wrMutex.UnLock();
829 return xfrbytes;
830
831#else
832
833 return -1;
834
835#endif
836}
837
838/******************************************************************************/
839/* Protected: s e n d D a t a */
840/******************************************************************************/
841
842int XrdLinkXeq::sendData(const char *Buff, int Blen)
843{
844 ssize_t retc = 0, bytesleft = Blen;
845
846// Write the data out
847//
848 while(bytesleft)
849 {if ((retc = write(LinkInfo.FD, Buff, bytesleft)) < 0)
850 {if (errno == EINTR) continue;
851 else break;
852 }
853 bytesleft -= retc; Buff += retc;
854 }
855
856// All done
857//
858 return retc;
859}
860
861/******************************************************************************/
862/* Protected: S e n d I O V */
863/******************************************************************************/
864
865int XrdLinkXeq::SendIOV(const struct iovec *iov, int iocnt, int bytes)
866{
867 ssize_t bytesleft, n, retc = 0;
868 const char *Buff;
869
870// Write the data out. On some version of Unix (e.g., Linux) a writev() may
871// end at any time without writing all the bytes when directed to a socket.
872// So, we attempt to resume the writev() using a combination of write() and
873// a writev() continuation. This approach slowly converts a writev() to a
874// series of writes if need be. We must do this inline because we must hold
875// the lock until all the bytes are written or an error occurs.
876//
877 bytesleft = static_cast<ssize_t>(bytes);
878 while(bytesleft)
879 {do {retc = writev(LinkInfo.FD, iov, iocnt);}
880 while(retc < 0 && errno == EINTR);
881 if (retc >= bytesleft || retc < 0) break;
882 bytesleft -= retc;
883 while(retc >= (n = static_cast<ssize_t>(iov->iov_len)))
884 {retc -= n; iov++; iocnt--;}
885 Buff = (const char *)iov->iov_base + retc; n -= retc; iov++; iocnt--;
886 while(n) {if ((retc = write(LinkInfo.FD, Buff, n)) < 0)
887 {if (errno == EINTR) continue;
888 else break;
889 }
890 n -= retc; Buff += retc; bytesleft -= retc;
891 }
892 if (retc < 0 || iocnt < 1) break;
893 }
894
895// All done
896//
897 if (retc >= 0) return bytes;
898 Log.Emsg("Link", errno, "send to", ID);
899 return -1;
900}
901
902/******************************************************************************/
903/* s e t I D */
904/******************************************************************************/
905
906void XrdLinkXeq::setID(const char *userid, int procid)
907{
908 char buff[sizeof(Uname)], *bp, *sp;
909 int ulen;
910
911 snprintf(buff, sizeof(buff), "%s.%d:%d", userid, procid, PollInfo.FD);
912 ulen = strlen(buff);
913 sp = buff + ulen - 1;
914 bp = &Uname[sizeof(Uname)-1];
915 if (ulen > (int)sizeof(Uname)) ulen = sizeof(Uname);
916 *bp = '@'; bp--;
917 while(ulen--) {*bp = *sp; bp--; sp--;}
918 ID = bp+1;
919 Comment = (const char *)ID;
920
921// Update the ID in the TLS socket if enabled
922//
923 if (isTLS) tlsIO.SetTraceID(ID);
924}
925
926/******************************************************************************/
927/* s e t N B */
928/******************************************************************************/
929
931{
932// We don't support non-blocking output except for Linux at the moment
933//
934#if !defined(__linux__)
935 return false;
936#else
937// Trace this request
938//
939 TRACEI(DEBUG,"enabling non-blocking output");
940
941// If we don't already have a sendQ object get one. This is a one-time call
942// so to optimize checking if this object exists we also get the opMutex.'
943//
945 if (!sendQ)
946 {wrMutex.Lock();
947 sendQ = new XrdSendQ(*this, wrMutex);
948 wrMutex.UnLock();
949 }
951 return true;
952#endif
953}
954
955/******************************************************************************/
956/* s e t P r o t o c o l */
957/******************************************************************************/
958
960{
961
962// Set new protocol.
963//
965 XrdProtocol *op = Protocol;
966 if (push) ProtoAlt = Protocol;
967 Protocol = pp;
969 return op;
970}
971
972/******************************************************************************/
973/* s e t P r o t N a m e */
974/******************************************************************************/
975
976void XrdLinkXeq::setProtName(const char *name)
977{
978
979// Set the protocol name.
980//
982 Addr.SetDialect(name);
984}
985
986/******************************************************************************/
987/* s e t T L S */
988/******************************************************************************/
989
990bool XrdLinkXeq::setTLS(bool enable, XrdTlsContext *ctx)
991{ //???
992// static const XrdTlsConnection::RW_Mode rwMode=XrdTlsConnection::TLS_RNB_WBL;
995 const char *eNote;
996 XrdTls::RC rc;
997
998// If we are already in a compatible mode, we are done
999//
1000
1001 if (isTLS == enable) return true;
1002
1003// If this is a shutdown, then do it now.
1004//
1005 if (!enable)
1006 {tlsIO.Shutdown();
1007 isTLS = enable;
1008 Addr.SetTLS(enable);
1009 return true;
1010 }
1011// We want to initialize TLS, do so now.
1012//
1013 if (!ctx) ctx = tlsCtx;
1014 eNote = tlsIO.Init(*ctx, PollInfo.FD, rwMode, hsMode, false, false, ID);
1015
1016// Check for errors
1017//
1018 if (eNote)
1019 {char buff[1024];
1020 snprintf(buff, sizeof(buff), "Unable to enable tls for %s;", ID);
1021 Log.Emsg("LinkXeq", buff, eNote);
1022 return false;
1023 }
1024
1025// Now we need to accept this TLS connection
1026//
1027 std::string eMsg;
1028 rc = tlsIO.Accept(&eMsg);
1029
1030// Diagnose return state
1031//
1032 if (rc != XrdTls::TLS_AOK) Log.Emsg("LinkXeq", eMsg.c_str());
1033 else {isTLS = enable;
1034 Addr.SetTLS(enable);
1035 Log.Emsg("LinkXeq", ID, "connection upgraded to", verTLS());
1036 }
1037 return rc == XrdTls::TLS_AOK;
1038}
1039
1040/******************************************************************************/
1041/* S F E r r o r */
1042/******************************************************************************/
1043
1045{
1046 Log.Emsg("TLS", rc, "send file to", ID);
1047 return -1;
1048}
1049
1050/******************************************************************************/
1051/* S h u t d o w n */
1052/******************************************************************************/
1053
1054void XrdLinkXeq::Shutdown(bool getLock)
1055{
1056 int temp;
1057
1058// Trace the entry
1059//
1060 TRACEI(DEBUG, (getLock ? "Async" : "Sync") <<" link shutdown in progress");
1061
1062// Get the lock if we need too (external entry via another thread)
1063//
1064 if (getLock) LinkInfo.opMutex.Lock();
1065
1066// If there is something to do, do it now
1067//
1068 temp = Instance; Instance = 0;
1069 if (!KeepFD)
1070 {shutdown(PollInfo.FD, SHUT_RDWR);
1071 if (dup2(devNull, PollInfo.FD) < 0)
1072 {Instance = temp;
1073 Log.Emsg("Link", errno, "shutdown FD for", ID);
1074 }
1075 }
1076
1077// All done
1078//
1079 if (getLock) LinkInfo.opMutex.UnLock();
1080}
1081
1082/******************************************************************************/
1083/* S t a t s */
1084/******************************************************************************/
1085
1086int XrdLinkXeq::Stats(char *buff, int blen, bool do_sync)
1087{
1088 static const char statfmt[] = "<stats id=\"link\"><num>%d</num>"
1089 "<maxn>%d</maxn><tot>%lld</tot><in>%lld</in><out>%lld</out>"
1090 "<ctime>%lld</ctime><tmo>%d</tmo><stall>%d</stall>"
1091 "<sfps>%d</sfps></stats>";
1092 int i;
1093
1094// Check if actual length wanted
1095//
1096 if (!buff) return sizeof(statfmt)+17*6;
1097
1098// We must synchronize the statistical counters
1099//
1100 if (do_sync) XrdLinkCtl::SyncAll();
1101
1102// Obtain lock on the stats area and format it
1103//
1105 i = snprintf(buff, blen, statfmt, AtomicGet(LinkCount),
1115 return i;
1116}
1117
1118/******************************************************************************/
1119/* s y n c S t a t s */
1120/******************************************************************************/
1121
1123{
1124 long long tmpLL;
1125 int tmpI4;
1126
1127// If this is dynamic, get the opMutex lock
1128//
1129 if (!ctime) LinkInfo.opMutex.Lock();
1130
1131// Either the caller has the opMutex or this is called out of close. In either
1132// case, we need to get the read and write mutexes; each followed by the stats
1133// mutex. This order is important because we should not hold the stats mutex
1134// for very long and the r/w mutexes may take a long time to acquire. If we
1135// must maintain the link count we need to actually acquire the stats mutex as
1136// we will be doing compound operations. Atomics are still used to keep other
1137// threads from seeing partial results.
1138//
1140
1141 if (ctime)
1142 {*ctime = time(0) - LinkInfo.conTime;
1143 AtomicAdd(LinkConTime, *ctime);
1144 statsMutex.Lock();
1145 if (LinkCount > 0) AtomicDec(LinkCount);
1147 }
1148
1150
1151 tmpLL = AtomicFAZ(BytesIn);
1152 AtomicAdd(LinkBytesIn, tmpLL); AtomicAdd(BytesInTot, tmpLL);
1153 tmpI4 = AtomicFAZ(tardyCnt);
1155 tmpI4 = AtomicFAZ(stallCnt);
1156 AtomicAdd(LinkStalls, tmpI4); AtomicAdd(stallCntTot, tmpI4);
1158
1160 tmpLL = AtomicFAZ(BytesOut);
1162 tmpI4 = AtomicFAZ(SfIntr);
1163 AtomicAdd(LinkSfIntr, tmpI4);
1165
1166// Make sure the protocol updates it's statistics as well
1167//
1168 if (Protocol) Protocol->Stats(0, 0, 1);
1169
1170// All done
1171//
1172 if (!ctime) LinkInfo.opMutex.UnLock();
1173}
1174
1175/******************************************************************************/
1176/* Protected: T L S _ E r r o r */
1177/******************************************************************************/
1178
1179int XrdLinkXeq::TLS_Error(const char *act, XrdTls::RC rc)
1180{
1181 std::string reason = XrdTls::RC2Text(rc);
1182 char msg[512];
1183
1184 snprintf(msg, sizeof(msg), "Unable to %s %s;", act, ID);
1185 Log.Emsg("TLS", msg, reason.c_str());
1186 return -1;
1187}
1188
1189/******************************************************************************/
1190/* T L S _ P e e k */
1191/******************************************************************************/
1192
1193int XrdLinkXeq::TLS_Peek(char *Buff, int Blen, int timeout)
1194{
1195 XrdSysMutexHelper theMutex;
1196 XrdTls::RC retc;
1197 int rc, rlen;
1198
1199// Lock the read mutex if we need to, the helper will unlock it upon exit
1200//
1201 if (LockReads) theMutex.Lock(&rdMutex);
1202
1203// Wait until we can actually read something
1204//
1205 isIdle = 0;
1206 if (timeout)
1207 {rc = Wait4Data(timeout);
1208 if (rc < 1) return rc;
1209 }
1210
1211// Do the peek and if sucessful, the number of bytes available.
1212//
1213 retc = tlsIO.Peek(Buff, Blen, rlen);
1214 if (retc == XrdTls::TLS_AOK) return rlen;
1215
1216// Dianose the TLS error and return failure
1217//
1218 return TLS_Error("peek on", retc);
1219}
1220
1221/******************************************************************************/
1222/* T L S _ R e c v */
1223/******************************************************************************/
1224
1225int XrdLinkXeq::TLS_Recv(char *Buff, int Blen)
1226{
1227 XrdSysMutexHelper theMutex;
1228 XrdTls::RC retc;
1229 int rlen;
1230
1231// Lock the read mutex if we need to, the helper will unlock it upon exit
1232//
1233 if (LockReads) theMutex.Lock(&rdMutex);
1234
1235// Note that we will read only as much as is queued. Use Recv() with a
1236// timeout to receive as much data as possible.
1237//
1238 isIdle = 0;
1239 retc = tlsIO.Read(Buff, Blen, rlen);
1240 if (retc != XrdTls::TLS_AOK) return TLS_Error("receive from", retc);
1241 if (rlen > 0) AtomicAdd(BytesIn, rlen);
1242 return rlen;
1243}
1244
1245/******************************************************************************/
1246
1247int XrdLinkXeq::TLS_Recv(char *Buff, int Blen, int timeout, bool havelock)
1248{
1249 XrdSysMutexHelper theMutex;
1250 XrdTls::RC retc;
1251 int pend, rlen, totlen = 0;
1252
1253// Lock the read mutex if we need to, the helper will unlock it upon exit
1254//
1255 if (LockReads && !havelock) theMutex.Lock(&rdMutex);
1256
1257// Wait up to timeout milliseconds for data to arrive
1258//
1259 isIdle = 0;
1260 while(Blen > 0)
1261 {pend = tlsIO.Pending(true);
1262 if (!pend) pend = Wait4Data(timeout);
1263 if (pend < 1)
1264 {if (pend < 0) return -1;
1265 tardyCnt++;
1266 if (totlen)
1267 {if ((++stallCnt & 0xff) == 1) TRACEI(DEBUG,"read timed out");
1268 AtomicAdd(BytesIn, totlen);
1269 }
1270 return totlen;
1271 }
1272
1273 // Read as much data as you can. Note that we will force an error
1274 // if we get a zero-length read after poll said it was OK. However,
1275 // if we never read anything, then we simply return -ENOMSG to avoid
1276 // generating a "read link error" as clearly there was a hangup.
1277 //
1278 retc = tlsIO.Read(Buff, Blen, rlen);
1279 if (retc != XrdTls::TLS_AOK)
1280 {if (!totlen) return -ENOMSG;
1281 AtomicAdd(BytesIn, totlen);
1282 return TLS_Error("receive from", retc);
1283 }
1284 if (rlen <= 0) break;
1285 totlen += rlen; Blen -= rlen; Buff += rlen;
1286 }
1287
1288 AtomicAdd(BytesIn, totlen);
1289 return totlen;
1290}
1291
1292/******************************************************************************/
1293
1294int XrdLinkXeq::TLS_Recv(const struct iovec *iov, int iocnt, int timeout)
1295{
1296 XrdSysMutexHelper theMutex;
1297 char *Buff;
1298 int Blen, rlen, totlen = 0;
1299
1300// Lock the read mutex if we need to, the helper will unlock it upon exit
1301//
1302 if (LockReads) theMutex.Lock(&rdMutex);
1303
1304// Individually process each element until we can't read any more
1305//
1306 isIdle = 0;
1307 for (int i = 0; i < iocnt; i++)
1308 {Buff = (char *)iov[i].iov_base;
1309 Blen = iov[i].iov_len;
1310 rlen = TLS_Recv(Buff, Blen, timeout, true);
1311 if (rlen <= 0) break;
1312 totlen += rlen;
1313 if (rlen < Blen) break;
1314 }
1315
1316 if (totlen) {AtomicAdd(BytesIn, totlen);}
1317 return totlen;
1318}
1319
1320/******************************************************************************/
1321/* T L S _ R e c v A l l */
1322/******************************************************************************/
1323
1324int XrdLinkXeq::TLS_RecvAll(char *Buff, int Blen, int timeout)
1325{
1326 int retc;
1327
1328// Check if timeout specified. Notice that the timeout is the max we will
1329// wait for some data. We will wait forever for all the data. Yeah, it's weird.
1330//
1331 if (timeout >= 0)
1332 {retc = tlsIO.Pending(true);
1333 if (!retc) retc = Wait4Data(timeout);
1334 if (retc < 1) return (retc ? -1 : -ETIMEDOUT);
1335 }
1336
1337// Note that we will block until we receive all the bytes.
1338//
1339 return TLS_Recv(Buff, Blen, -1);
1340}
1341
1342/******************************************************************************/
1343/* T L S _ S e n d */
1344/******************************************************************************/
1345
1346int XrdLinkXeq::TLS_Send(const char *Buff, int Blen)
1347{
1349 ssize_t bytesleft = Blen;
1350 XrdTls::RC retc;
1351 int byteswritten;
1352
1353// Prepare to send
1354//
1355 isIdle = 0;
1356 AtomicAdd(BytesOut, Blen);
1357
1358// Do non-blocking writes if we are setup to do so.
1359//
1360 if (sendQ) return sendQ->Send(Buff, Blen);
1361
1362// Write the data out
1363//
1364 while(bytesleft)
1365 {retc = tlsIO.Write(Buff, bytesleft, byteswritten);
1366 if (retc != XrdTls::TLS_AOK) return TLS_Error("send to", retc);
1367 bytesleft -= byteswritten; Buff += byteswritten;
1368 }
1369
1370// All done
1371//
1372 return Blen;
1373}
1374
1375/******************************************************************************/
1376
1377int XrdLinkXeq::TLS_Send(const struct iovec *iov, int iocnt, int bytes)
1378{
1380 XrdTls::RC retc;
1381 int byteswritten;
1382
1383// Get a lock and assume we will be successful (statistically we are). Note
1384// that the calling interface gauranteed bytes are not zero.
1385//
1386 isIdle = 0;
1387 AtomicAdd(BytesOut, bytes);
1388
1389// Do non-blocking writes if we are setup to do so.
1390//
1391 if (sendQ) return sendQ->Send(iov, iocnt, bytes);
1392
1393// Write the data out.
1394//
1395 for (int i = 0; i < iocnt; i++)
1396 {ssize_t bytesleft = iov[i].iov_len;
1397 char *Buff = (char *)iov[i].iov_base;
1398 while(bytesleft)
1399 {retc = tlsIO.Write(Buff, bytesleft, byteswritten);
1400 if (retc != XrdTls::TLS_AOK) return TLS_Error("send to", retc);
1401 bytesleft -= byteswritten; Buff += byteswritten;
1402 }
1403 }
1404
1405// All done
1406//
1407 return bytes;
1408}
1409
1410/******************************************************************************/
1411
1412int XrdLinkXeq::TLS_Send(const sfVec *sfP, int sfN)
1413{
1415 int bytes, buffsz, fileFD, retc;
1416 off_t offset;
1417 ssize_t totamt = 0;
1418 char myBuff[65536];
1419
1420// Convert the sendfile to a regular send. The conversion is not particularly
1421// fast and caller are advised to avoid using sendfile on TLS connections.
1422//
1423 isIdle = 0;
1424 for (int i = 0; i < sfN; sfP++, i++)
1425 {if (!(bytes = sfP->sendsz)) continue;
1426 totamt += bytes;
1427 if (sfP->fdnum < 0)
1428 {if (!TLS_Write(sfP->buffer, bytes)) return -1;
1429 continue;
1430 }
1431 offset = sfP->offset;
1432 fileFD = sfP->fdnum;
1433 buffsz = (bytes < (int)sizeof(myBuff) ? bytes : sizeof(myBuff));
1434 do {do {retc = pread(fileFD, myBuff, buffsz, offset);}
1435 while(retc < 0 && errno == EINTR);
1436 if (retc < 0) return SFError(errno);
1437 if (!retc) break;
1438 if (!TLS_Write(myBuff, buffsz)) return -1;
1439 offset += buffsz; bytes -= buffsz; totamt += retc;
1440 } while(bytes > 0);
1441 }
1442
1443// We are done
1444//
1445 AtomicAdd(BytesOut, totamt);
1446 return totamt;
1447}
1448
1449/******************************************************************************/
1450/* Protected: T L S _ W r i t e */
1451/******************************************************************************/
1452
1453bool XrdLinkXeq::TLS_Write(const char *Buff, int Blen)
1454{
1455 XrdTls::RC retc;
1456 int byteswritten;
1457
1458// Write the data out
1459//
1460 while(Blen)
1461 {retc = tlsIO.Write(Buff, Blen, byteswritten);
1462 if (retc != XrdTls::TLS_AOK)
1463 {TLS_Error("write to", retc);
1464 return false;
1465 }
1466 Blen -= byteswritten; Buff += byteswritten;
1467 }
1468
1469// All done
1470//
1471 return true;
1472}
1473
1474/******************************************************************************/
1475/* v e r T L S */
1476/******************************************************************************/
1477
1479{
1480 return tlsIO.Version();
1481}
#define DEBUG(x)
#define close(a)
Definition XrdPosix.hh:43
#define write(a, b, c)
Definition XrdPosix.hh:110
#define writev(a, b, c)
Definition XrdPosix.hh:112
#define readv(a, b, c)
Definition XrdPosix.hh:79
#define read(a, b, c)
Definition XrdPosix.hh:77
#define pread(a, b, c, d)
Definition XrdPosix.hh:75
#define eMsg(x)
#define AtomicFAZ(x)
#define AtomicBeg(Mtx)
#define AtomicDec(x)
#define AtomicGet(x)
#define AtomicEnd(Mtx)
#define AtomicAdd(x, y)
size_t strlcpy(char *dst, const char *src, size_t sz)
#define TRACEI(act, x)
Definition XrdTrace.hh:66
const char * Comment
Definition XrdJob.hh:47
static void SyncAll()
Synchronize statustics for ll links.
static void Unhook(int fd)
Unhook a link from the active table of links.
time_t conTime
XrdSysRecMutex opMutex
XrdSysCondVar * KillcvP
static const char * TraceID
int TLS_Send(const char *Buff, int Blen)
long long BytesOut
int TLS_Error(const char *act, XrdTls::RC rc)
int TLS_Peek(char *Buff, int Blen, int timeout)
int Client(char *buff, int blen)
char Uname[24]
XrdTlsPeerCerts * getPeerCerts()
static int LinkCountMax
XrdLinkInfo LinkInfo
XrdProtocol * ProtoAlt
int Close(bool defer=false)
XrdNetAddr Addr
int TLS_Recv(char *Buff, int Blen)
int sendData(const char *Buff, int Blen)
long long BytesInTot
bool TLS_Write(const char *Buff, int Blen)
int SendIOV(const struct iovec *iov, int iocnt, int bytes)
XrdProtocol * setProtocol(XrdProtocol *pp, bool push)
static long long LinkCountTot
long long BytesOutTot
void Shutdown(bool getLock)
int Peek(char *buff, int blen, int timeout=-1)
static int LinkCount
void Reset()
int Backlog()
XrdSysMutex wrMutex
static int Stats(char *buff, int blen, bool do_sync=false)
XrdSendQ * sendQ
XrdPollInfo PollInfo
void setID(const char *userid, int procid)
int Recv(char *buff, int blen)
static long long LinkBytesIn
int TLS_RecvAll(char *Buff, int Blen, int timeout)
int SFError(int rc)
long long BytesIn
int Send(const char *buff, int blen)
XrdSysMutex rdMutex
const char * verTLS()
bool setNB()
int RecvIOV(const struct iovec *iov, int iocnt)
char Lname[256]
static long long LinkConTime
static int LinkSfIntr
XrdTlsSocket tlsIO
void DoIt()
int RecvAll(char *buff, int blen, int timeout=-1)
XrdProtocol * Protocol
bool Register(const char *hName)
static XrdSysMutex statsMutex
void setProtName(const char *name)
static int LinkStalls
static long long LinkBytesOut
void syncStats(int *ctime=0)
bool setTLS(bool enable, XrdTlsContext *ctx=0)
static int LinkTimeOuts
void SetDialect(const char *dP)
bool Register(const char *hName)
void SetTLS(bool val)
XrdPoll * Poller
virtual int Enable(XrdPollInfo &pInfo)=0
static char * Poll2Text(short events)
Definition XrdPoll.cc:272
static void Detach(XrdPollInfo &pInfo)
Definition XrdPoll.cc:177
virtual void Recycle(XrdLink *lp=0, int consec=0, const char *reason=0)=0
virtual int Stats(char *buff, int blen, int do_sync=0)=0
virtual int Process(XrdLink *lp)=0
void Terminate(XrdLink *lP=0)
Definition XrdSendQ.cc:396
int Send(const char *buff, int blen)
Definition XrdSendQ.cc:230
unsigned int Backlog()
Definition XrdSendQ.hh:46
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Lock(XrdSysMutex *Mutex)
int fd
Socket file descriptor.
long long bytesOut
Bytes written to the socket.
int consec
Seconds connected.
virtual void Monitor(XrdNetAddrInfo &netInfo, LinkInfo &lnkInfo, int liLen)=0
long long bytesIn
Bytes read from the socket.
const char * tident
Pointer to the client's trace identifier.
@ TLS_HS_BLOCK
Always block during handshake.
XrdTls::RC Accept(std::string *eMsg=0)
void Shutdown(SDType=sdImmed)
@ TLS_RBL_WBL
blocking read blocking write
XrdTls::RC Write(const char *buffer, size_t size, int &bytesOut)
const char * Version()
XrdTls::RC Read(char *buffer, size_t size, int &bytesRead)
Read from the TLS connection. If necessary, a handshake will be done.
const char * Init(XrdTlsContext &ctx, int sfd, RW_Mode rwm, HS_Mode hsm, bool isClient, bool serial=true, const char *tid="")
void SetTraceID(const char *tid)
int Pending(bool any=true)
XrdTls::RC Peek(char *buffer, size_t size, int &bytesPeek)
XrdTlsPeerCerts * getCerts(bool ver=true)
static std::string RC2Text(XrdTls::RC rc, bool dbg=false)
Definition XrdTls.cc:127
@ TLS_AOK
All went well, will always be zero.
Definition XrdTls.hh:40
XrdTlsContext * tlsCtx
Definition XrdGlobals.cc:52
XrdTcpMonPin * TcpMonPin
Definition XrdLinkXeq.cc:96
XrdSysError Log
Definition XrdConfig.cc:111
XrdScheduler Sched
Definition XrdLinkCtl.cc:54
int fdnum
File descriptor for data.
int sendsz
Length of data at offset.