25#ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
26#define __XRD_CL_XROOTD_MSG_HANDLER_HH__
63 class LocalFileHandler;
99 case EntryRedirect:
return "Redirected from: " + fromstr +
" to: "
103 "Falling back to virtual redirector: " + tostr;
107 case EntryWait:
return "Waited at server request. Resending: "
111 return "Failed at: " + fromstr +
", retrying at: " + tostr;
136 std::shared_ptr<SIDManager> sidMgr,
139 pResponseHandler( respHandler ),
141 pEffectiveDataServerUrl( 0 ),
143 pLFileHandler( lFileHandler ),
145 pRedirectAsAnswer( false ),
146 pOksofarAsAnswer( false ),
147 pHasLoadBalancer( false ),
148 pHasSessionId( false ),
151 pRedirectCounter( 0 ),
152 pNotAuthorizedCounter( 0 ),
155 pAsyncChunkIndex( 0 ),
157 pPgWrtCksumBuff( 4 ),
158 pPgWrtCurrentPageOffset( 0 ),
159 pPgWrtCurrentPageNb( 0 ),
161 pOtherRawStarted( false ),
163 pFollowMetalink( false ),
167 pAggregatedWaitTime( 0 ),
171 pTimeoutFence( false ),
173 pDirListStarted( false ),
174 pDirListWithStat( false ),
182 pHasSessionId =
true;
185 log->
Debug(
ExDbgMsg,
"[%s] MsgHandler created: 0x%x (message: %s ).",
194 ntohl( pgrdreq->
rlen ) ) );
210 DumpRedirectTraceBack();
214 delete pEffectiveDataServerUrl;
216 pRequest =
reinterpret_cast<Message*
>( 0xDEADBEEF );
218 pPostMaster =
reinterpret_cast<PostMaster*
>( 0xDEADBEEF );
220 pChunkList =
reinterpret_cast<ChunkList*
>( 0xDEADBEEF );
221 pEffectiveDataServerUrl =
reinterpret_cast<URL*
>( 0xDEADBEEF );
235 virtual uint16_t
Examine( std::shared_ptr<Message> &msg );
254 virtual uint16_t
GetSid()
const;
276 uint32_t &bytesRead );
296 virtual bool IsRaw()
const;
309 uint32_t &bytesWritten );
323 pExpiration = expiration;
340 pRedirectAsAnswer = redirectAsAnswer;
349 pOksofarAsAnswer = oksofarAsAnswer;
367 pLoadBalancer = loadBalancer;
368 pHasLoadBalancer =
true;
376 pHosts.reset( hostList );
384 pChunkList = chunkList;
386 pBodyReader->SetChunkList( chunkList );
388 pChunkStatus.resize( chunkList->size() );
390 pChunkStatus.clear();
395 pCrc32cDigests = std::move( crc32cDigests );
411 pRedirectCounter = redirectCounter;
416 pFollowMetalink = followMetalink;
421 pStateful = stateful;
446 void HandleResponse();
463 Status ParseXAttrResponse(
char *data,
size_t len,
AnyObject *&response );
469 Status RewriteRequestRedirect(
const URL &newUrl );
474 Status RewriteRequestWait();
479 void UpdateTriedCGI(uint32_t errNo=0);
484 void SwitchOnRefreshFlag();
490 void HandleRspOrQueue();
495 void HandleLocalRedirect(
URL *url );
513 bool OmitWait(
Message &request,
const URL &url );
522 bool RetriableErrorResponse(
const Status &status );
527 void DumpRedirectTraceBack();
537 Status ReadFromBuffer(
char *&buffer,
size_t &buflen, T& result );
547 Status ReadFromBuffer(
char *&buffer,
size_t &buflen, std::string &result );
558 Status ReadFromBuffer(
char *&buffer,
size_t &buflen,
size_t size,
559 std::string &result );
566 ChunkStatus(): sizeError( false ), done( false ) {}
571 typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
573 static const size_t CksumSize =
sizeof( uint32_t );
575 static const size_t MaxSslErrRetry = 3;
577 inline static size_t NbPgPerRsp( uint64_t offset, uint32_t dlen )
587 if( _1stpg + CksumSize > dlen )
588 _1stpg = dlen - CksumSize;
589 dlen -= _1stpg + CksumSize;
591 pgcnt += dlen / PageWithCksum;
592 if( dlen % PageWithCksum )
598 std::shared_ptr<Message> pResponse;
599 std::vector<std::shared_ptr<Message>> pPartialResps;
600 ResponseHandler *pResponseHandler;
602 URL *pEffectiveDataServerUrl;
603 PostMaster *pPostMaster;
604 std::shared_ptr<SIDManager> pSidMgr;
605 LocalFileHandler *pLFileHandler;
606 XRootDStatus pStatus;
609 bool pRedirectAsAnswer;
610 bool pOksofarAsAnswer;
611 std::unique_ptr<HostList> pHosts;
612 bool pHasLoadBalancer;
613 HostInfo pLoadBalancer;
615 std::string pRedirectUrl;
617 std::vector<uint32_t> pCrc32cDigests;
619 std::vector<ChunkStatus> pChunkStatus;
620 uint16_t pRedirectCounter;
621 uint16_t pNotAuthorizedCounter;
623 uint32_t pAsyncOffset;
624 uint32_t pAsyncChunkIndex;
626 std::unique_ptr<AsyncPageReader> pPageReader;
627 std::unique_ptr<AsyncRawReaderIntfc> pBodyReader;
629 Buffer pPgWrtCksumBuff;
630 uint32_t pPgWrtCurrentPageOffset;
631 uint32_t pPgWrtCurrentPageNb;
633 bool pOtherRawStarted;
635 bool pFollowMetalink;
638 int pAggregatedWaitTime;
640 std::unique_ptr<RedirectEntry> pRdirEntry;
641 RedirectTraceBack pRedirectTraceBack;
650 std::atomic<bool> pTimeoutFence;
657 bool pDirListStarted;
658 bool pDirListWithStat;
Object for discarding data.
Object for reading out data from the kXR_read response.
Object for reading out data from the VectorRead response.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
The message representation used throughout the system.
const std::string & GetDescription() const
Get the description of the message.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
StreamEvent
Events that may have occurred to the stream.
A hub for dispatching and receiving messages.
Handle an async response.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
std::string GetLocation() const
Get location (protocol://host:port/path)
bool IsValid() const
Is the url valid.
Handle/Process/Forward XRootD messages.
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
virtual uint16_t Examine(std::shared_ptr< Message > &msg)
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
void SetFollowMetalink(bool followMetalink)
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
void SetHostList(HostList *hostList)
Set host list.
void SetCrc32cDigests(std::vector< uint32_t > &&crc32cDigests)
const Message * GetRequest() const
Get the request pointer.
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status)
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
XRootDMsgHandler(Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
virtual void OnStatusReady(const Message *message, XRootDStatus status)
The requested action has been performed and the status is available.
~XRootDMsgHandler()
Destructor.
void WaitDone(time_t now)
void SetStateful(bool stateful)
virtual bool IsRaw() const
Are we a raw writer or not?
time_t GetExpiration()
Get a timestamp after which we give up.
void SetOksofarAsAnswer(bool oksofarAsAnswer)
void SetKernelBuffer(XrdSys::KernelBuffer *kbuff)
Set the kernel buffer.
virtual void Process()
Process the message if it was "taken" by the examine action.
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
void SetRedirectAsAnswer(bool redirectAsAnswer)
virtual uint16_t InspectStatusRsp()
virtual uint16_t GetSid() const
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
std::vector< HostInfo > HostList
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
RedirectEntry(const URL &from, const URL &to, Type type)
std::string ToString(bool prevok=true)
Procedure execution status.