Yet Another eXchange Tool 0.11.3
Loading...
Searching...
No Matches
xt_exchanger_simple_base.c
Go to the documentation of this file.
1
12/*
13 * Keywords:
14 * Maintainer: Jörg Behrens <behrens@dkrz.de>
15 * Moritz Hanke <hanke@dkrz.de>
16 * Thomas Jahns <jahns@dkrz.de>
17 * URL: https://dkrz-sw.gitlab-pages.dkrz.de/yaxt/
18 *
19 * Redistribution and use in source and binary forms, with or without
20 * modification, are permitted provided that the following conditions are
21 * met:
22 *
23 * Redistributions of source code must retain the above copyright notice,
24 * this list of conditions and the following disclaimer.
25 *
26 * Redistributions in binary form must reproduce the above copyright
27 * notice, this list of conditions and the following disclaimer in the
28 * documentation and/or other materials provided with the distribution.
29 *
30 * Neither the name of the DKRZ GmbH nor the names of its contributors
31 * may be used to endorse or promote products derived from this software
32 * without specific prior written permission.
33 *
34 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45 */
46#ifdef HAVE_CONFIG_H
47#include <config.h>
48#endif
49
50#include <assert.h>
51#include <mpi.h>
52#include <stdbool.h>
53
54#include "core/core.h"
55#include "core/ppm_xfuncs.h"
56#include "xt/xt_mpi.h"
57#include "xt_config_internal.h"
58#include "xt_mpi_internal.h"
59#include "xt_redist_internal.h"
60#include "xt_exchanger.h"
62
63static const char filename[] = "xt_exchanger_simple_base.c";
64
65static Xt_exchanger
67 MPI_Comm newComm, int new_tag_offset);
70 const void * src_data,
71 void * dst_data);
73 const void * src_data,
74 void * dst_data,
75 Xt_request *request);
76static int
78 enum xt_msg_direction direction,
79 int *restrict *ranks);
80
81static MPI_Datatype
83 int rank,
84 enum xt_msg_direction direction,
85 bool do_dup);
86
89
99
101
115
118{
119 Xt_exchanger_simple_base exchanger;
120 size_t header_size = sizeof(*exchanger),
121 body_size = nmsg * sizeof (exchanger->msgs[0]);
122 exchanger = xmalloc(header_size + body_size);
124 return exchanger;
125}
126
127static inline int
128adjusted_rank(int r, int comm_rank, int comm_size)
129{
130 return r + (r <= comm_rank ? comm_size : 0);
131}
132
133#define XT_SORTFUNC_DECL static
134#define SORT_TYPE struct Xt_redist_msg
135#define SORT_TYPE_SUFFIX redist_msg
136#define SORT_TYPE_CMP_LT(u,v,i,j) \
137 (adjusted_rank((u).rank, comm_rank, comm_size) \
138 < adjusted_rank((v).rank, comm_rank, comm_size))
139#define SORT_TYPE_CMP_LE(u,v,i,j) \
140 (adjusted_rank((u).rank, comm_rank, comm_size) \
141 <= adjusted_rank((v).rank, comm_rank, comm_size))
142#define SORT_TYPE_CMP_EQ(u,v,i,j) (((u).rank) == ((v).rank))
143#define XT_SORT_EXTRA_ARGS_DECL , int comm_rank, int comm_size
144#define XT_SORT_EXTRA_ARGS_PASS , comm_rank, comm_size
145#define XT_SORT_VECSWAP_EXTRA_ARGS_DECL
146#define XT_SORT_VECSWAP_EXTRA_ARGS_PASS
147
148#include "xt_quicksort_base.h"
149
152 int nsend, int nrecv,
153 const struct Xt_redist_msg *send_msgs,
154 const struct Xt_redist_msg *recv_msgs,
155 MPI_Comm comm, int tag_offset,
158 xt_simple_create_omp_share_func create_omp_share_func,
159 Xt_config config)
160{
166 if (s_func == 0)
167 Xt_abort(comm, "ERROR(xt_exchanger_simple_base_new): invalid synchronous "
168 "exchange function pointer", filename, __LINE__);
169
170 assert((nsend >= 0) & (nrecv >= 0));
171 size_t nmsg = (size_t)nsend + (size_t)nrecv;
174 exchanger->comm = comm;
175 exchanger->tag_offset = tag_offset;
176 exchanger->nmsg[SEND] = nsend;
177 exchanger->nmsg[RECV] = nrecv;
178 bool dt_dup = !(config->flags & exch_no_dt_dup);
179 xt_redist_msgs_strided_copy((size_t)nsend, send_msgs, sizeof (send_msgs[0]),
180 exchanger->msgs, sizeof (exchanger->msgs[0]),
181 comm, dt_dup);
182 xt_redist_msgs_strided_copy((size_t)nrecv, recv_msgs, sizeof (recv_msgs[0]),
183 exchanger->msgs + nsend,
184 sizeof (exchanger->msgs[0]),
185 comm, dt_dup);
186 exchanger->s_func = s_func;
187 exchanger->a_func = a_func;
188 exchanger->create_omp_share_func = create_omp_share_func;
189
190 {
191 int comm_size, comm_rank, is_inter;
192 xt_mpi_call(MPI_Comm_rank(comm, &comm_rank), comm);
193 xt_mpi_call(MPI_Comm_test_inter(comm, &is_inter), comm);
194 int (*get_comm_size)(MPI_Comm, int *)
195 = is_inter ? MPI_Comm_remote_size : MPI_Comm_size;
196 xt_mpi_call(get_comm_size(comm, &comm_size), comm);
197 /* In order to avoid congestion of messages, the order of send and
198 * receive messages is changed. This is done by sorting the
199 * messages according to the rank of the respective message
200 * partner. Before the sorting to ranks that are smaller or equal
201 * to the local rank the size of the communicator is added.
202 *
203 * example: process 5 is supposed to communicate with
204 * processes: 9, 5, 2, 6, 1
205 * 1. add comm_size(10): 9, 15, 12, 6, 11
206 * 2. sort: 6, 9, 11, 12, 15
207 * 3. subtrace comm_size again -> final order: 6, 9, 1, 2, 5
208 */
209 xt_quicksort_redist_msg(exchanger->msgs, (size_t)nsend,
210 comm_rank, comm_size);
211 xt_quicksort_redist_msg(exchanger->msgs + (size_t)nsend, (size_t)nrecv,
212 comm_rank, comm_size);
213 }
214
215 return (Xt_exchanger)exchanger;
216}
217
218static Xt_exchanger
220 MPI_Comm new_comm, int new_tag_offset)
221{
222 Xt_exchanger_simple_base exchanger_sb =
223 (Xt_exchanger_simple_base)exchanger;
224 int nsend = exchanger_sb->nmsg[SEND],
225 nrecv = exchanger_sb->nmsg[RECV];
226 size_t nmsg = (size_t)nsend + (size_t)nrecv;
227 Xt_exchanger_simple_base exchanger_copy
229 exchanger_copy->nmsg[SEND] = nsend;
230 exchanger_copy->nmsg[RECV] = nrecv;
231 exchanger_copy->s_func = exchanger_sb->s_func;
232 exchanger_copy->a_func = exchanger_sb->a_func;
233 exchanger_copy->create_omp_share_func = exchanger_sb->create_omp_share_func;
234 struct Xt_redist_msg *restrict new_msgs = exchanger_copy->msgs,
235 *restrict orig_msgs = exchanger_sb->msgs;
236 xt_redist_msgs_strided_copy(nmsg, orig_msgs, sizeof (*orig_msgs),
237 new_msgs, sizeof (*new_msgs),
238 new_comm, true);
239 exchanger_copy->comm = new_comm;
240 exchanger_copy->tag_offset = new_tag_offset;
241 return (Xt_exchanger)exchanger_copy;
242}
243
244
246
247 Xt_exchanger_simple_base exchanger_sb =
248 (Xt_exchanger_simple_base)exchanger;
249
250 size_t nmsg = (size_t)exchanger_sb->nmsg[SEND] + (size_t)exchanger_sb->nmsg[RECV];
251 struct Xt_redist_msg *restrict msgs = exchanger_sb->msgs;
252 xt_redist_msgs_strided_destruct(nmsg, msgs, exchanger_sb->comm,
253 sizeof (msgs[0]));
254 free(exchanger_sb);
255}
256
258 const void * src_data,
259 void * dst_data) {
260
261 Xt_exchanger_simple_base exchanger_sb =
262 (Xt_exchanger_simple_base)exchanger;
263
264 int nsend = exchanger_sb->nmsg[SEND];
265 exchanger_sb->s_func(src_data, dst_data, nsend,
266 exchanger_sb->nmsg[RECV], exchanger_sb->msgs,
267 exchanger_sb->msgs + (size_t)nsend,
268 exchanger_sb->tag_offset, exchanger_sb->comm);
269}
270
272 const void * src_data,
273 void * dst_data,
274 Xt_request *request) {
275
276 Xt_exchanger_simple_base exchanger_sb =
277 (Xt_exchanger_simple_base)exchanger;
278
279 if (exchanger_sb->a_func == NULL)
280 Xt_abort(exchanger_sb->comm, "ERROR(xt_exchanger_simple_base_a_exchange): "
281 "asynchronous exchange function not defined for current exchanger",
282 filename, __LINE__);
283
284 int nsend = exchanger_sb->nmsg[SEND];
285 exchanger_sb->a_func(src_data, dst_data, nsend,
286 exchanger_sb->nmsg[RECV], exchanger_sb->msgs,
287 exchanger_sb->msgs + (size_t)nsend,
288 exchanger_sb->tag_offset, exchanger_sb->comm, request);
289}
290
291static MPI_Datatype
293 int rank,
294 enum xt_msg_direction direction,
295 bool do_dup)
296{
297 Xt_exchanger_simple_base exchanger_sb =
298 (Xt_exchanger_simple_base)exchanger;
299 size_t nsend = (size_t)exchanger_sb->nmsg[SEND],
300 nmsg = (size_t)exchanger_sb->nmsg[direction],
301 ofs = direction == SEND ? 0 : nsend;
302 struct Xt_redist_msg *restrict msgs = exchanger_sb->msgs + ofs;
303 MPI_Datatype datatype_copy = MPI_DATATYPE_NULL;
304 for (size_t i = 0; i < nmsg; ++i)
305 if (msgs[i].rank == rank) {
306 if (do_dup)
307 xt_mpi_call(MPI_Type_dup(msgs[i].datatype, &datatype_copy),
308 exchanger_sb->comm);
309 else
310 datatype_copy = msgs[i].datatype;
311 break;
312 }
313 return datatype_copy;
314}
315
316static int
318 enum xt_msg_direction direction,
319 int *restrict *ranks)
320{
321 Xt_exchanger_simple_base exchanger_sb = (Xt_exchanger_simple_base)exchanger;
322 size_t nmsg = (size_t)exchanger_sb->nmsg[direction];
323 struct Xt_redist_msg *restrict msgs = exchanger_sb->msgs
324 + (direction == RECV ? (size_t)exchanger_sb->nmsg[SEND] : 0);
325 int *restrict ranks_ = *ranks;
326 if (!ranks_)
327 ranks_ = *ranks = xmalloc(nmsg * sizeof (*ranks_));
328 for (size_t i = 0; i < nmsg; ++i)
329 ranks_[i] = msgs[i].rank;
330 return (int)nmsg;
331}
332
335{
336 Xt_exchanger_simple_base exchanger_sb =
337 (Xt_exchanger_simple_base)exchanger;
338 int nsend = exchanger_sb->nmsg[SEND];
339 return exchanger_sb->create_omp_share_func(
340 nsend, exchanger_sb->nmsg[RECV],
341 exchanger_sb->msgs, exchanger_sb->msgs + nsend, exchanger_sb->comm);
342}
343
344/*
345 * Local Variables:
346 * c-basic-offset: 2
347 * coding: utf-8
348 * indent-tabs-mode: nil
349 * show-trailing-whitespace: t
350 * require-trailing-newline: t
351 * End:
352 */
int MPI_Comm
Definition core.h:64
add versions of standard API functions not returning on error
#define xmalloc(size)
Definition ppm_xfuncs.h:70
xt_simple_a_exchange_func a_func
xt_simple_s_exchange_func s_func
xt_simple_create_omp_share_func create_omp_share_func
const struct xt_exchanger_vtable * vtable
MPI_Datatype datatype
Definition xt_redist.h:69
Xt_exchanger(* copy)(Xt_exchanger, MPI_Comm, int)
int MPI_Type_dup(MPI_Datatype oldtype, MPI_Datatype *newtype)
implementation of configuration object
@ exch_no_dt_dup
exchanging of data based on information provided by redist's
struct Xt_exchanger_omp_share_ * Xt_exchanger_omp_share
const struct xt_exchanger_vtable xt_exchanger_simple_base_vtable
static void xt_exchanger_simple_base_a_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data, Xt_request *request)
static int adjusted_rank(int r, int comm_rank, int comm_size)
static const char filename[]
Xt_exchanger xt_exchanger_simple_base_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset, xt_simple_s_exchange_func s_func, xt_simple_a_exchange_func a_func, xt_simple_create_omp_share_func create_omp_share_func, Xt_config config)
static Xt_exchanger_simple_base xt_exchanger_simple_base_alloc(size_t nmsg)
static int xt_exchanger_simple_base_get_msg_ranks(Xt_exchanger exchanger, enum xt_msg_direction direction, int *restrict *ranks)
static void xt_exchanger_simple_base_delete(Xt_exchanger exchanger)
static Xt_exchanger xt_exchanger_simple_base_copy(Xt_exchanger exchanger, MPI_Comm newComm, int new_tag_offset)
static void xt_exchanger_simple_base_s_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data)
struct Xt_exchanger_simple_base_ * Xt_exchanger_simple_base
static Xt_exchanger_omp_share xt_exchanger_simple_base_create_omp_share(Xt_exchanger exchanger)
static MPI_Datatype xt_exchanger_simple_base_get_MPI_Datatype(Xt_exchanger exchanger, int rank, enum xt_msg_direction direction, bool do_dup)
void(* xt_simple_s_exchange_func)(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm)
void(* xt_simple_a_exchange_func)(const void *src_data, void *dst_data, int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, int tag_offset, MPI_Comm comm, Xt_request *request)
Xt_exchanger_omp_share(* xt_simple_create_omp_share_func)(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm)
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition xt_mpi.h:68
macros to create quicksort implementations
redistribution of data, non-public declarations
xt_msg_direction
PPM_DSO_INTERNAL void xt_redist_msgs_strided_copy(size_t n, const struct Xt_redist_msg *restrict src, size_t src_stride, struct Xt_redist_msg *restrict dst, size_t dst_stride, MPI_Comm comm, bool dt_dup)
PPM_DSO_INTERNAL void xt_redist_msgs_strided_destruct(size_t n, struct Xt_redist_msg *msgs, MPI_Comm comm, size_t ofs_stride)