XRootD
Loading...
Searching...
No Matches
XrdXrootdAioPgrw.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d X r o o t d A i o P g r w . c c */
4/* */
5/* (c) 2021 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <unistd.h>
32#include <arpa/inet.h>
33
34#include "Xrd/XrdBuffer.hh"
42
43#define TRACELINK reqP
44#define ID ID()
45
46/******************************************************************************/
47/* G l o b a l S t a t i c s */
48/******************************************************************************/
49
51
52namespace XrdXrootd
53{
55}
56
57using namespace XrdXrootd;
58
59/******************************************************************************/
60/* L o c a l S t a t i c s */
61/******************************************************************************/
62
63const char *XrdXrootdAioPgrw::TraceID = "PgrwBuff";
64
65namespace
66{
67XrdSysMutex fqMutex;
68XrdXrootdAioBuff *fqFirst = 0;
69int numFree = 0;
70
71static const int csLen = sizeof(uint32_t);
72
73static const int maxKeep = 64; // 4MB in the pocket
74}
75
76/******************************************************************************/
77/* C o n s t r u c t o r */
78/******************************************************************************/
79
81 : XrdXrootdAioBuff(this, tP, bP)
82{
83 char *buff = bP->buff;
84 uint32_t *csV = csVec;
85
86// Fill out the iovec
87//
88 for (int i = 1; i <= acsSZ<<1; i+= 2)
89 {ioVec[i ].iov_base = csV;
90 ioVec[i ].iov_len = csLen;
91 ioVec[i+1].iov_base = buff;
92 ioVec[i+1].iov_len = XrdProto::kXR_pgPageSZ;
93 csV++;
95 }
96
97// Complete initialization
98//
99 Result = 0;
100 iovReset = 0;
101 cksVec = csVec;
102 TIdent = "AioPgrw";
103}
104
105/******************************************************************************/
106/* D e s t r u c t o r */
107/******************************************************************************/
108
110{
111// Recycle the buffer if we have one
112//
113 if (buffP) BPool->Release(buffP);
114}
115
116/******************************************************************************/
117/* A l l o c */
118/******************************************************************************/
119
121{
122 XrdXrootdAioBuff *aiobuff;
123
124// Obtain a preallocated aio object
125//
126 fqMutex.Lock();
127 if ((aiobuff = fqFirst))
128 {fqFirst = aiobuff->next;
129 numFree--;
130 }
131 fqMutex.UnLock();
132
133// If we have no object, create a new one. Otherwise initialize n old one
134//
135 if (!aiobuff)
136 {XrdBuffer *bP = BPool->Obtain(aioSZ);
137 if (!bP) return 0;
138 aiobuff = new XrdXrootdAioPgrw(arp, bP);
139 } else {
140 aiobuff->Result = 0;
141 aiobuff->cksVec = aiobuff->pgrwP->csVec;
142 aiobuff->pgrwP->reqP = arp;
143 }
144
145// Update aio counters
146//
147 arp->urProtocol()->aioUpdate(1);
148
149// All done
150//
151 return aiobuff->pgrwP;
152}
153
154/******************************************************************************/
155/* i o v 4 R e c v */
156/******************************************************************************/
157
158struct iovec *XrdXrootdAioPgrw::iov4Recv(int &iovNum)
159{
160// Readjust ioVec as needed
161//
162 if (aioSZ != (int)sfsAio.aio_nbytes)
163 {int fLen, lLen;
164 csNum = XrdOucPgrwUtils::csNum(sfsAio.aio_offset, sfsAio.aio_nbytes,
165 fLen, lLen);
166 ioVec[2].iov_len = fLen;
167 if (csNum > 1 && lLen != XrdProto::kXR_pgPageSZ)
168 {iovReset = csNum<<1;
169 ioVec[iovReset].iov_len = lLen;
170 }
171 } else csNum = acsSZ;
172
173// Return the iovec reception args
174//
175 iovNum = (csNum<<1);
176 return &ioVec[1];
177}
178
179/******************************************************************************/
180/* i o v 4 S e n d */
181/******************************************************************************/
182
183struct iovec *XrdXrootdAioPgrw::iov4Send(int &iovNum, int &iovLen, bool cs2net)
184{
185 int fLen, lLen;
186
187// Recalculate the iovec values for first and last read and summary values
188//
189 if (Result > 0)
190 {csNum = XrdOucPgrwUtils::csNum(sfsAio.aio_offset, Result, fLen, lLen);
191 iovNum = (csNum<<1) + 1;
192 iovLen = Result + (csNum * sizeof(uint32_t));
193 ioVec[2].iov_len = fLen;
194 if (csNum > 1 && lLen != XrdProto::kXR_pgPageSZ)
195 {iovReset = csNum<<1;
196 ioVec[iovReset].iov_len = lLen;
197 }
198 } else csNum = 0;
199
200// Convert checksums to net order if so requested
201//
202 if (cs2net) for (int i = 0; i < csNum; i++) csVec[i] = htonl(csVec[i]);
203
204// Return the iovec
205//
206 return ioVec;
207}
208
209/******************************************************************************/
210/* R e c y c l e */
211/******************************************************************************/
212
214{
215// Do some tracing
216//
217 TRACE(FSAIO, " Recycle " <<sfsAio.aio_nbytes<<'@'
218 <<sfsAio.aio_offset<<" numF="<<numFree);
219
220// Update aio counters
221//
222 reqP->urProtocol()->aioUpdate(-1);
223
224// Place the object on the free queue if possible
225//
226 fqMutex.Lock();
227 if (numFree >= maxKeep)
228 {fqMutex.UnLock();
229 delete this;
230 } else {
231 next = fqFirst;
232 fqFirst = this;
233 numFree++;
234 fqMutex.UnLock();
235 }
236}
237
238/******************************************************************************/
239/* S e t u p 2 R e c v */
240/******************************************************************************/
241
242int XrdXrootdAioPgrw::Setup2Recv(off_t offs, int dlen, const char *&eMsg)
243{
245
246// Reset any truncated segement in the iov vector
247//
248 if (iovReset)
249 {ioVec[iovReset].iov_len = XrdProto::kXR_pgPageSZ;
250 iovReset = 0;
251 }
252
253// Get the layout for the iovec
254//
255 if (!(csNum = XrdOucPgrwUtils::recvLayout(layout, offs, dlen, aioSZ)))
256 {eMsg = layout.eWhy;
257 return 0;
258 }
259 eMsg = 0;
260
261// Set the length of the first and last segments. Note that our iovec has
262// an extra leading element meant for writing to the network.
263//
264 ioVec[2].iov_len = layout.fLen;
265 if (csNum > 1 && layout.lLen < XrdProto::kXR_pgPageSZ)
266 {iovReset = csNum<<1;
267 ioVec[iovReset].iov_len = layout.lLen;
268 }
269
270// Setup for actual writing of the data
271//
272 sfsAio.aio_buf = ioVec[2].iov_base = buffP->buff + layout.bOffset;
273 sfsAio.aio_nbytes = layout.dataLen;
274 sfsAio.aio_offset = offs;
275
276// Return how much we can read from the socket
277//
278 return layout.sockLen;
279}
280
281/******************************************************************************/
282/* S e t u p 2 S e n d */
283/******************************************************************************/
284
285int XrdXrootdAioPgrw::Setup2Send(off_t offs, int dlen, const char *&eMsg)
286{
288
289// Reset any truncated segement in the iov vector
290//
291 if (iovReset)
292 {ioVec[iovReset].iov_len = XrdProto::kXR_pgPageSZ;
293 iovReset = 0;
294 }
295
296// Get the layout for the iovec
297//
298 if (!(csNum = XrdOucPgrwUtils::sendLayout(layout, offs, dlen, aioSZ)))
299 {eMsg = layout.eWhy;
300 return 0;
301 }
302 eMsg = 0;
303
304// Set the length of the first and last segments. Note that our iovec has
305// an extra leading element meant for writing to the network.
306//
307 ioVec[2].iov_len = layout.fLen;
308 if (csNum > 1 && layout.lLen < XrdProto::kXR_pgPageSZ)
309 {iovReset = csNum<<1;
310 ioVec[iovReset].iov_len = layout.lLen;
311 }
312
313// Setup for actual writing of the data
314//
315 sfsAio.aio_buf = ioVec[2].iov_base = buffP->buff + layout.bOffset;
316 sfsAio.aio_nbytes = layout.dataLen;
317 sfsAio.aio_offset = offs;
318
319// Return how much we can write to the socket
320//
321 return layout.dataLen;
322}
XrdOucTrace * XrdXrootdTrace
#define eMsg(x)
#define TRACE(act, x)
Definition XrdTrace.hh:63
char * buff
Definition XrdBuffer.hh:45
off_t bOffset
Buffer offset to apply iov[1].iov_base.
int dataLen
Total number of filesys bytes the iovec will handle.
static int sendLayout(Layout &layout, off_t offs, int dlen, int bsz=0)
int fLen
Length to use for iov[1].iov_len.
int sockLen
Total number of network bytes the iovec will handle.
const char * eWhy
Reason for failure when zero is returned.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
int lLen
Length to use for iov[csnum*2-1].iov_len)
static int recvLayout(Layout &layout, off_t offs, int dlen, int bsz=0)
Compute the layout for an iovec that receives network bytes applying.
uint32_t * cksVec
Definition XrdSfsAio.hh:63
ssize_t Result
Definition XrdSfsAio.hh:65
const char * TIdent
Definition XrdSfsAio.hh:67
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
XrdXrootdAioBuff * next
XrdXrootdAioPgrw *const pgrwP
XrdXrootdAioTask * reqP
XrdXrootdAioBuff(XrdXrootdAioTask *tP, XrdBuffer *bP)
int Setup2Send(off_t offs, int dlen, const char *&eMsg)
struct iovec * iov4Send(int &iovNum, int &iovLen, bool cs2net=false)
static const int acsSZ
static const int aioSZ
XrdXrootdAioPgrw(XrdXrootdAioTask *tP, XrdBuffer *bP)
struct iovec * iov4Recv(int &iovNum)
void Recycle() override
static XrdXrootdAioPgrw * Alloc(XrdXrootdAioTask *arp)
int Setup2Recv(off_t offs, int dlen, const char *&eMsg)
XrdXrootdProtocol * urProtocol()
static const int kXR_pgPageSZ
Definition XProtocol.hh:494
XrdBuffManager * BPool