XRootD
Loading...
Searching...
No Matches
XrdSysIOEventsPollPort.icc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d S y s I O E v e n t s P o l l E . i c c */
4/* */
5/* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdlib>
32#include <sys/types.h>
33#include <sys/stat.h>
34#include <port.h>
35
36@include "XrdSys/XrdSysE2T.hh"
37
38
39/******************************************************************************/
40/* C l a s s P o l l P o r t */
41/******************************************************************************/
42
43namespace XrdSys
44{
45namespace IOEvents
46{
47class PollPort : public Poller
48{
49public:
50
51static int AllocMem(void **memP, int slots);
52
53 PollPort(port_event_t *ptab, int numfd, int pfd, int pFD[2])
54 : Poller(pFD[0], pFD[1]), pollTab(ptab),
55 pollDfd(pfd), pollMax(numfd)
56 {}
58
59static const int pollER = POLLERR| POLLHUP;
60static const int pollOK = POLLIN | POLLRDNORM | POLLPRI | POLLOUT;
61static const int pollRD = POLLIN | POLLRDNORM | POLLPRI;
62static const int pollWR = POLLOUT;
63
64protected:
65
66 void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg);
67
68timespec_t *BegTO(timespec_t &theTO)
69 {int toval = TmoGet();
70 if (toval < 0) return 0;
71 theTO.tv_sec = toval/1000;
72 theTO.tv_nsec= 0;
73 return &theTO;
74 }
75
76 void Exclude(Channel *cP, bool &isLocked, bool dover=1);
77
78 bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked);
79
80 bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked);
81
82 void Shutdown();
83
84private:
85
86 void Dispatch(Channel *cP, int pollEv);
87 bool Process(int curr);
88
89 port_event_t *pollTab;
90 Channel *cbNow;
91 int cbCurr;
92 int pollDfd;
93 int pollMax;
94 unsigned int numPoll;
95static void *deadChP;
96};
97 void *PollPort::deadChP = 0;
98};
99};
100
101/******************************************************************************/
102/* C l a s s P o l l e r */
103/******************************************************************************/
104/******************************************************************************/
105/* Static: n e w P o l l e r */
106/******************************************************************************/
107
109XrdSys::IOEvents::Poller::newPoller(int pipeFD[2],
110 int &eNum,
111 const char **eTxt)
112
113{
114 static const int allocFD = 170;
115 port_event_t *pp;
116 int pfd;
117
118// reate an event driver
119//
120 if ((pfd = port_create()) < 0)
121 {eNum = errno;
122 if (eTxt) *eTxt = "creating event port";
123 return 0;
124 }
125 fcntl(pfd, F_SETFD, FD_CLOEXEC);
126
127// Add the request side of the pipe fd to the poll set (always fd[0])
128//
129 if (port_associate(pfd, PORT_SOURCE_FD, pipeFD[0], PollPort::pollRD, 0))
130 { eNum = errno;
131 *eTxt = "adding communication pipe";
132 return 0;
133 }
134
135// Allocate the event table
136//
137 if ((eNum = XrdSys::IOEvents::PollPort::AllocMem((void **)&pp, allocFD)))
138 {if (eTxt) *eTxt = "creating port event table";
139 close(pfd);
140 return 0;
141 }
142
143// Create new poll object
144//
145 return (Poller *)new PollPort(pp, allocFD, pfd, pipeFD);
146}
147
148/******************************************************************************/
149/* C l a s s P o l l P o r t */
150/******************************************************************************/
151/******************************************************************************/
152/* A l l o c M e m */
153/******************************************************************************/
154
155int XrdSys::IOEvents::PollPort::AllocMem(void **memP, int slots)
156{
157 int bytes, alignment, pagsz = getpagesize();
158
159// Calculate the size of the poll table and allocate it
160//
161 bytes = slots * sizeof(port_event_t);
162 alignment = (bytes < pagsz ? 1024 : pagsz);
163 if (posix_memalign(memP, alignment, bytes)) return ENOMEM;
164 memset(*memP, 0, bytes);
165 return 0;
166}
167
168/******************************************************************************/
169/* Protected: B e g i n */
170/******************************************************************************/
171
173 int &retcode,
174 const char **eTxt)
175{
176 unsigned int numpolled;
177 int rc;
178 timespec_t toVal;
179 Channel *cP;
180
181// Indicate to the starting thread that all went well
182//
183 retcode = 0;
184 *eTxt = 0;
185 syncsem->Post();
186
187// Now start dispatching channels that are ready. We use the wakePend flag to
188// keep the chatter down when we actually wakeup. There is also a "feature" of
189// poll_getn() that can return an errno of zero upon a timeout, sigh.
190//
191 do {numpolled = 1; errno = 0;
192 do {rc = port_getn(pollDfd, pollTab, pollMax, &numpolled, BegTO(toVal));}
193 while (rc < 0 && errno == EINTR);
194 wakePend = true; numPoll = numpolled;
195 if (rc)
196 {if (errno == ETIME || !errno) CbkTMO();
197 else {int rc = errno;
198 //--------------------------------------------------------------
199 // If we are in a child process and the poll file descriptor
200 // has been closed, there is an immense chance the fork will be
201 // followed by an exec, in which case we don't want to abort
202 //--------------------------------------------------------------
203 if( rc == EBADF && parentPID != getpid() ) return;
204 std::cerr <<"PollP: " <<XrdSysE2T(rc) <<" polling for events" <<std::endl;
205 abort();
206 }
207 }
208 for (int i = 0; i < (int)numpolled; i++)
209 if (pollTab[i].portev_source == PORT_SOURCE_FD)
210 {if ((cP = (Channel *)pollTab[i].portev_user))
211 {cbCurr = i; Dispatch(cP, pollTab[i].portev_events);}
212 else if (!Process(i)) return;
213 }
214 } while(1);
215}
216
217/******************************************************************************/
218/* Private: D i s p a t c h */
219/******************************************************************************/
220
221void XrdSys::IOEvents::PollPort::Dispatch(XrdSys::IOEvents::Channel *cP,
222 int pollEv)
223{
224 const char *eTxt;
225 int eNum, events = 0;
226 bool isLocked = false;
227
228// Make sure this not a dispatch to a dead channel (rare but true)
229//
230 if (cP == (XrdSys::IOEvents::Channel *)&deadChP) return;
231
232// Translate the event to something reasonable
233//
234 if (pollEv & pollER)
235 {eTxt = "polling";
236 eNum = (pollEv & POLLERR ? EPIPE : ECONNRESET); // Error or HUP
237 }
238 else if (pollEv & pollOK)
239 {if (pollEv & pollRD) events |= CallBack::ReadyToRead;
240 if (pollEv & pollWR) events |= CallBack::ReadyToWrite;
241 eNum = 0; eTxt = 0;
242 }
243 else {eTxt = "polling"; eNum = EIO;}
244
245// Execute the callback
246//
247 cbNow = cP;
248 if (!CbkXeq(cP, events, eNum, eTxt)) Exclude(cP, isLocked, 0);
249 else Modify(cP, eNum, &eTxt, isLocked);
250 cbNow = 0;
251}
252
253/******************************************************************************/
254/* Protected: E x c l u d e */
255/******************************************************************************/
256
258 bool &isLocked, bool dover)
259{
260
261// Remove this channel from the poll set. We ignore errors as the descriptor
262// may have been closed prior to this call (though this shouldn't happen).
263//
264 port_dissociate(pollDfd, PORT_SOURCE_FD, cP->GetFD());
265
266// If we need to verify this action, sync with the poller thread (note that the
267// poller thread will not ask for this action unless it wants to deadlock). We
268// may actually deadlock anyway if the channel lock is held. We are allowed to
269// release it if the caller locked it. This will prevent a deadlock.
270//
271 if (dover)
272 {PipeData cmdbuff;
273 if (isLocked)
274 {isLocked = false;
275 UnLockChannel(cP);
276 }
277 cmdbuff.req = PipeData::RmFD;
278 cmdbuff.fd = cP->GetFD();
279 SendCmd(cmdbuff);
280 } else {
281 if (cbNow && cbNow != cP)
282 for (int i = cbCurr+1; i < numPoll; i++)
283 {if (cP == (Channel *)pollTab[i].portev_user)
284 pollTab[i].portev_user = &deadChP;
285 }
286 }
287}
288
289/******************************************************************************/
290/* Protected: I n c l u d e */
291/******************************************************************************/
292
294 int &eNum,
295 const char **eTxt,
296 bool &isLocked)
297{
298 int pEvents = 0, events = cP->GetEvents();
299
300// Establish new event mask
301//
302 if (events & Channel:: readEvents) pEvents = pollRD;
303 if (events & Channel::writeEvents) pEvents |= pollWR;
304
305// Add this fd to the poll set
306//
307 if (port_associate(pollDfd, PORT_SOURCE_FD, cP->GetFD(), pEvents, cP))
308 {eNum = errno;
309 if (eTxt) *eTxt = "adding channel";
310 return false;
311 }
312
313// All went well.
314//
315 return true;
316}
317
318/******************************************************************************/
319/* Protected: M o d i f y */
320/******************************************************************************/
321
323 int &eNum,
324 const char **eTxt,
325 bool &isLocked)
326{
327 int pEvents = 0, events = cP->GetEvents();
328
329// Establish new event mask
330//
331 if (events & Channel:: readEvents) pEvents = pollRD;
332 if (events & Channel::writeEvents) pEvents |= pollWR;
333
334// Associate the fd to the poll set
335//
336 if (port_associate(pollDfd, PORT_SOURCE_FD, cP->GetFD(), pEvents, cP))
337 {eNum = errno;
338 if (eTxt) *eTxt = "modifying poll events";
339 return false;
340 }
341
342// All done
343//
344 return true;
345}
346
347/******************************************************************************/
348/* Private: P r o c e s s */
349/******************************************************************************/
350
351bool XrdSys::IOEvents::PollPort::Process(int curr)
352{
353// Get the pipe request and check out actions of interest.
354//
355 if (GetRequest())
356 { if (reqBuff.req == PipeData::RmFD)
357 {Channel *cP;
358 for (int i = curr+1; i < numPoll; i++)
359 {if (reqBuff.fd == (int)pollTab[i].portev_object)
360 pollTab[i].portev_user = &deadChP;
361 }
362 reqBuff.theSem->Post();
363 }
364 else if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post();
365 return false;
366 }
367 }
368
369// Associate the pipe again and return true
370//
371 port_associate(pollDfd, PORT_SOURCE_FD, reqFD, pollRD, 0);
372 return true;
373}
374
375/******************************************************************************/
376/* Protected: S h u t d o w n */
377/******************************************************************************/
378
380{
381 static XrdSysMutex shutMutex;
382
383// To avoid race conditions, we serialize this code
384//
385 shutMutex.Lock();
386
387// Release the poll table
388//
389 if (pollTab) {free(pollTab); pollTab = 0;}
390
391// Close the epoll file descriptor
392//
393 if (pollDfd >= 0) {close(pollDfd); pollDfd = -1;}
394
395// All done
396//
397 shutMutex.UnLock();
398}
int fcntl(int fd, int cmd,...)
#define close(a)
Definition XrdPosix.hh:43
#define eMsg(x)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:99
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
timespec_t * BegTO(timespec_t &theTO)
PollPort(port_event_t *ptab, int numfd, int pfd, int pFD[2])
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
static int AllocMem(void **memP, int slots)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)