XRootD
Loading...
Searching...
No Matches
XrdFrmReqBoss.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d F r m R e q B o s s . c c */
4/* */
5/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdio>
32#include <cstdlib>
33#include <cstring>
34#include <strings.h>
35#include <unistd.h>
36#include <fcntl.h>
37#include <sys/types.h>
38#include <sys/stat.h>
39
40#include "XrdFrc/XrdFrcCID.hh"
41#include "XrdFrc/XrdFrcTrace.hh"
42#include "XrdFrc/XrdFrcUtils.hh"
45#include "XrdNet/XrdNetMsg.hh"
46#include "XrdOuc/XrdOucUtils.hh"
48
49using namespace XrdFrc;
50
51/******************************************************************************/
52/* T h r e a d I n t e r f a c e s */
53/******************************************************************************/
54
55void *mainServerXeq(void *parg)
56{
57 XrdFrmReqBoss *theBoss = (XrdFrmReqBoss *)parg;
58 theBoss->Process();
59 return (void *)0;
60}
61
62/******************************************************************************/
63/* Public: A d d */
64/******************************************************************************/
65
67{
68
69// Complete the request including verifying the priority
70//
71 if (Request.Prty > XrdFrcRequest::maxPrty)
73 else if (Request.Prty < 0)Request.Prty = 0;
74 Request.addTOD = time(0);
75
76// Now add it to the queue
77//
78 rQueue[static_cast<int>(Request.Prty)]->Add(&Request);
79
80// Now wake ourselves up
81//
82 Wakeup(1);
83}
84
85/******************************************************************************/
86/* Public: D e l */
87/******************************************************************************/
88
90{
91 int i;
92
93// Remove all pending requests for this id
94//
95 for (i = 0; i <= XrdFrcRequest::maxPrty; i++) rQueue[i]->Can(&Request);
96}
97
98/******************************************************************************/
99/* Public: P r o c e s s */
100/******************************************************************************/
101
103{
104 EPNAME("Process");
105 XrdFrcRequest myReq;
106 int i, rc, numXfr, numPull;;
107
108// Perform staging in an endless loop
109//
110do{Wakeup(0);
111 do{numXfr = 0;
112 for (i = XrdFrcRequest::maxPrty; i >= 0; i--)
113 {numPull = i+1;
114 while(numPull && (rc = rQueue[i]->Get(&myReq)))
115 {if (myReq.Options & XrdFrcRequest::Register) Register(myReq,i);
116 else {numPull -= XrdFrmXfrQueue::Add(&myReq,rQueue[i],theQ);
117 numXfr++;
118 DEBUG(Persona <<" from Q " << i <<' ' <<numPull <<" left");
119 if (rc < 0) break;
120 }
121 }
122 }
123 } while(numXfr);
124 } while(1);
125}
126
127/******************************************************************************/
128/* Private: R e g i s t e r */
129/******************************************************************************/
130
131void XrdFrmReqBoss::Register(XrdFrcRequest &Req, int qNum)
132{
133 EPNAME("Register");
134 char *eP;
135 int Pid;
136
137// Ignore this request if there is no cluster id or the process if is invalid
138//
139 if (!(*Req.LFN)) return;
140 Pid = strtol(Req.ID, &eP, 10);
141 if (*eP || Pid == 0) return;
142
143// Register this cluster
144//
145 if (CID.Add(Req.iName, Req.LFN, static_cast<time_t>(Req.addTOD), Pid))
146 {DEBUG("Instance=" <<Req.iName <<" cluster=" <<Req.LFN <<" pid=" <<Pid);}
147 else rQueue[qNum]->Del(&Req);
148}
149
150/******************************************************************************/
151/* S t a r t */
152/******************************************************************************/
153
154int XrdFrmReqBoss::Start(char *aPath, int aMode)
155{
156 pthread_t tid;
157 char buff[2048], *qPath;
158 int retc, i;
159
160// Generate the queue directory path
161//
162 if (!(qPath = XrdFrcUtils::makeQDir(aPath, aMode))) return 0;
163
164// Initialize the request queues if all went well
165//
166 for (i = 0; i <= XrdFrcRequest::maxPrty; i++)
167 {sprintf(buff, "%s%sQ.%d", qPath, Persona, i);
168 rQueue[i] = new XrdFrcReqFile(buff, 0);
169 if (!rQueue[i]->Init()) {free(qPath); return 0;}
170 }
171 free(qPath);
172
173// Start the request processing thread
174//
175 if ((retc = XrdSysThread::Run(&tid, mainServerXeq, (void *)this,
176 XRDSYSTHREAD_BIND, Persona)))
177 {sprintf(buff, "create %s request thread", Persona);
178 Say.Emsg("Start", retc, buff);
179 return 0;
180 }
181
182// All done
183//
184 return 1;
185}
186
187/******************************************************************************/
188/* Public: W a k e u p */
189/******************************************************************************/
190
191void XrdFrmReqBoss::Wakeup(int PushIt)
192{
193 static XrdSysMutex rqMutex;
194
195// If this is a PushIt then see if we need to push the binary semaphore
196//
197 if (PushIt) {rqMutex.Lock();
198 if (!isPosted) {rqReady.Post(); isPosted = 1;}
199 rqMutex.UnLock();
200 }
201 else {rqReady.Wait();
202 rqMutex.Lock(); isPosted = 0; rqMutex.UnLock();
203 }
204}
#define DEBUG(x)
#define EPNAME(x)
XrdOucPup XrdCmsParser::Pup & Say
void * mainServerXeq(void *parg)
#define XRDSYSTHREAD_BIND
char LFN[3072]
static const int Register
signed char Prty
static const int maxPrty
long long addTOD
static char * makeQDir(const char *Path, int Mode)
void Del(XrdFrcRequest &Request)
void Add(XrdFrcRequest &Request)
int Start(char *aPath, int aMode)
void Wakeup(int PushIt=1)
static int Add(XrdFrcRequest *rP, XrdFrcReqFile *reqF, int theQ)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
XrdFrcCID CID
Definition XrdFrcCID.cc:56