61 ntohl(*(static_cast<unsigned int *>(static_cast<void *>(x))))
70 XrdXrootdTransit::TranStack(
"TranStack",
71 "transit protocol anchor");
89 xp->
Init(rsltP, linkP, seceP, nameP, protP);
98 const struct iovec *ioV,
int ioN,
int ioL)
105 {
TRACE(REQ,
"Unable to find request for " <<lP->
ID <<
" sid=" <<*theSID);
111 return tP->
bridge->AttnCont(tP, rcode, ioV, ioN, ioL);
119 const struct iovec *ioV,
int ioN,
int ioL)
132 && (!ioN ||
XRD_GETNUM(ioV[0].iov_base) == 0))
139 rc =
Send(rcode, ioV, ioN, ioL);
143 if (rc >= 0 && !runWait)
144 {
if (runDone) runStatus.store(0, std::memory_order_release);
164 if (runStatus.fetch_add(1, std::memory_order_acq_rel))
return false;
168 Link->setProtocol(realProt);
172 sprintf(buff,
"%s disconnection", pName);
177 TranStack.Push(&TranLink);
185bool XrdXrootdTransit::Fail(
int ecode,
const char *etext)
196int XrdXrootdTransit::Fatal(
int rc)
198 XrdXrootd::Bridge::Context rInfo(
Link,
Request.header.streamid,
201 return (respObj->Error(rInfo, runError, runEText) ? rc : -1);
211 TranStack.Set(qMax, qTTL);
233 runStatus.store(0, std::memory_order_release);
257 strncpy(uname, nameP,
sizeof(uname)-1);
258 uname[
sizeof(uname)-1] = 0;
309 who = (seceP && seceP->
name ? seceP->
name :
"nobody");
337 if (rc >= 0)
Link->Enable();
338 else if (rc != -EINPROGRESS)
Link->Close();
362do{rc = realProt->Process((reInvoke ? 0 : lp));
363 if (rc >= 0 && runStatus.load(std::memory_order_acquire))
364 {reInvoke = (rc == 0);
365 if (runError) rc = Fatal(rc);
366 else {runDone =
false;
369 {
if (runWait) rc = -EINPROGRESS;
370 if (!runDone)
return rc;
371 runStatus.store(0, std::memory_order_release);
374 }
else reInvoke =
false;
375 }
while(rc >= 0 && reInvoke);
379 runStatus.store(0, std::memory_order_release);
383 return (rc ? rc : 1);
395 runStatus.fetch_add(1, std::memory_order_acq_rel);
402 TRACEP(
EMSG,
"WARNING: Recycle is canceling wait job; the wait job might already be running during recycle.");
403 Sched->Cancel(&waitJob);
408 if (realProt) realProt->Recycle(lp, consec, reason);
416 if (runArgs) {free(runArgs); runArgs = 0;}
424 TranStack.Push(&TranLink);
434 static char eText[] =
"Insufficent memory to re-issue request";
435 static struct iovec ioV[] = {{(
char *)&eCode,
sizeof(eCode)},
436 {(
char *)&eText,
sizeof(eText)}};
441 TRACEP(REQ,
"Bridge redrive runStatus="<<runStatus.load(std::memory_order_acquire)
442 <<
" runError="<<runError
443 <<
" runWait="<<runWait<<
" runWTot="<<runWTot);
459 if (!runALen || RunCopy(runArgs, runALen)) {
461 TRACEP(REQ,
"Bridge redrive Process2 rc="<<rc
462 <<
" runError="<<runError<<
" runWait="<<runWait);
463 if (rc == 0 && !runWait && !runError) {
464 rc = realProt->Process(NULL);
465 TRACEP(REQ,
"Bridge redrive callback rc="<<rc
466 <<
" runStatus="<<runStatus.load(std::memory_order_acquire));
468 }
while((rc == 0) && !runError && !runWait);
474 if (rc >= 0 && runWait)
return;
479 runStatus.store(0, std::memory_order_release);
484 if (rc < 0)
Link->Close();
492#define KXR_INDEX(x) x-kXR_auth
500 memset(rTab, 0,
sizeof(rTab));
531bool XrdXrootdTransit::ReqWrite(
char *xdataP,
int xdataL)
541 {
Resume = 0; wBuff = xdataP; wBLen = xdataL;
566 if (runStatus.fetch_add(1, std::memory_order_acq_rel))
567 {
TRACEP(REQ,
"Bridge request failed due to re-entry");
578 if (
Request.header.requestid & 0x8000
581 {
TRACEP(REQ,
"Unsupported bridge request");
589 {
TRACEP(REQ,
"Invalid request data length");
597 <<
" dlen=" <<
Request.header.dlen <<
" blen=" <<xdataL);
608 {movLen = (xdataL <
Request.header.dlen ? xdataL :
Request.header.dlen);
609 if (!RunCopy(xdataP, movLen))
return true;
610 if (!runArgs || movLen > runABsz)
611 {
if (runArgs) free(runArgs);
612 if (!(runArgs = (
char *)malloc(movLen)))
613 {
TRACEP(REQ,
"Failed to allocate memory");
618 memcpy(runArgs, xdataP, movLen); runALen = movLen;
637bool XrdXrootdTransit::RunCopy(
char *buffP,
int buffL)
645 {Fail(
kXR_ArgTooLong,
"Request argument too long");
return false;}
651 memcpy(
argp->buff, buffP, buffL);
652 argp->buff[buffL] = 0;
674 eMsg = (ioN < 2 ?
"" : (
const char *)ioV[1].iov_base);
675 if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
676 aOK = respObj->Error(rInfo, rc,
eMsg);
679 if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
680 aOK = (ioN ? respObj->Data(rInfo, ioV, ioN, ioL,
true)
681 : respObj->Done(rInfo));
684 aOK = respObj->Data(rInfo, ioV, ioN, ioL,
false);
688 if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
690 aOK = respObj->Redir(rInfo,rc,(
const char *)ioV[1].iov_base);
693 return Wait(rInfo, ioV, ioN, ioL);
697 return WaitResp(rInfo, ioV, ioN, ioL);
699 default:
if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
701 "internal logic error");
707 return (aOK ? 0 : -1);
716 offset, dlen, fdnum);
721 return (respObj->File(sfInfo, dlen) ? 0 : -1);
730 sfvec, sfvnum, dlen);
735 return (respObj->File(sfInfo, dlen) ? 0 : -1);
743 const struct iovec *ioV,
int ioN,
int ioL)
750 eMsg = (ioN < 2 ?
"reason unknown" : (
const char *)ioV[1].iov_base);
755 {
int wtime = runWait;
757 return (respObj->
Wait(rInfo, wtime,
eMsg) ? 0 : -1);
762 if (runWTot >= runWMax)
770 if (runWait > runWMax) runWait = runWMax;
774 if (runWCall && !(respObj->Wait(rInfo, runWait,
eMsg)))
return -1;
778 TRACEP(REQ,
"Bridge delaying request " <<runWait <<
" sec (" <<
eMsg <<
")");
779 Sched->Schedule((
XrdJob *)&waitJob, time(0)+runWait);
788 const struct iovec *ioV,
int ioN,
int ioL)
790 XrdXrootdTransPend *trP;
797 eMsg = (ioN < 2 ?
"reason unknown" : (
const char *)ioV[1].iov_base);
798 TRACEP(REQ,
"Bridge waiting for resp; sid=" <<rInfo.
sID.num
799 <<
" wt=" <<wTime <<
" (" <<
eMsg <<
")");
808 trP =
new XrdXrootdTransPend(
Link,
this, &
Request);
struct ClientRequestHdr header
struct ClientLoginRequest login
XrdOucTrace * XrdXrootdTrace
void Release(XrdBuffer *bp)
XrdBuffer * Obtain(int bsz)
friend class XrdScheduler
XrdJob(const char *desc="")
XrdProtocol * setProtocol(XrdProtocol *pp, bool runit=false, bool push=false)
void setID(const char *userid, int procid)
XrdNetAddrInfo * AddrInfo()
char * ID
Pointer to the client's link identity.
void armBridge()
Mark this link as an in-memory communications bridge (internal use only).
void setProtName(const char *name)
const char * Host() const
unsigned int Inst() const
bool isIPType(IPType ipType) const
static void Sanitize(char *instr, char subc='_')
void Schedule(XrdJob *jp)
char prot[XrdSecPROTOIDSIZE]
Auth protocol used (e.g. krb5)
char * name
Entity's name.
char * host
Entity's host name dnr dependent.
static XrdXrootdStats * SI
static XrdSysError & eDest
static unsigned int getSID()
XrdXrootdMonitor::User Monitor
int(XrdXrootdProtocol::* Resume)()
static XrdScheduler * Sched
int Process(XrdLink *lp) override
void Recycle(XrdLink *lp, int consec, const char *reason) override
XrdXrootdResponse Response
static XrdBuffManager * BPool
static XrdSfsFileSystem * osFS
static XrdXrootdTransPend * Remove(XrdLink *lP, short sid)
union XrdXrootdTransPend::@371012140333040222300212162025004307132302363251 Pend
XrdXrootdTransit * bridge
static void Clear(XrdXrootdTransit *trP)
bool Run(const char *xreqP, char *xdataP=0, int xdataL=0)
Inject an xrootd request into the protocol stack.
static const char * ReqTable()
Initialize the valid request table.
void Redrive()
Redrive a request after a wait.
int Send(int rcode, const struct iovec *ioVec, int ioNum, int ioLen)
Handle request data response.
void Recycle(XrdLink *lp, int consec, const char *reason)
Handle link shutdown.
static void Init(XrdScheduler *schedP, int qMax, int qTTL)
Perform one-time initialization.
static XrdXrootdTransit * Alloc(XrdXrootd::Bridge::Result *respP, XrdLink *linkP, XrdSecEntity *seceP, const char *nameP, const char *protP)
Get a new transit object.
static int Attn(XrdLink *lP, short *theSID, int rcode, const struct iovec *ioVec, int ioNum, int ioLen)
Handle attention response (i.e. async response)
XrdXrootdTransit()
Constructor & Destructor.
void Proceed()
Resume processing after a waitresp completion.
bool Disc()
Handle dismantlement.
int Process(XrdLink *lp)
Handle link activation (replaces parent activation).
union XrdXrootd::Bridge::Context::@216053020250347016153077031152206173164054152024 sID
associated request stream ID
virtual bool Wait(Bridge::Context &info, int wtime, const char *wtext)
static const int uIPv4
ucap: Supports read redirects