Yet Another eXchange Tool 0.11.3
Loading...
Searching...
No Matches
xt_exchanger_mix_isend_irecv.c
Go to the documentation of this file.
1
12/*
13 * Keywords:
14 * Maintainer: Jörg Behrens <behrens@dkrz.de>
15 * Moritz Hanke <hanke@dkrz.de>
16 * Thomas Jahns <jahns@dkrz.de>
17 * URL: https://dkrz-sw.gitlab-pages.dkrz.de/yaxt/
18 *
19 * Redistribution and use in source and binary forms, with or without
20 * modification, are permitted provided that the following conditions are
21 * met:
22 *
23 * Redistributions of source code must retain the above copyright notice,
24 * this list of conditions and the following disclaimer.
25 *
26 * Redistributions in binary form must reproduce the above copyright
27 * notice, this list of conditions and the following disclaimer in the
28 * documentation and/or other materials provided with the distribution.
29 *
30 * Neither the name of the DKRZ GmbH nor the names of its contributors
31 * may be used to endorse or promote products derived from this software
32 * without specific prior written permission.
33 *
34 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
35 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
36 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
37 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
38 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
39 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
40 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
41 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
42 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
43 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
44 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45 */
46#ifdef HAVE_CONFIG_H
47#include <config.h>
48#endif
49
50#include <assert.h>
51#include <mpi.h>
52#ifdef _OPENMP
53#include <omp.h>
54#endif
55
56#include "core/core.h"
57#include "core/ppm_xfuncs.h"
58#include "xt_config_internal.h"
59#include "xt/xt_mpi.h"
60#include "xt/xt_request_msgs.h"
62#include "xt_mpi_internal.h"
63#include "xt_redist_internal.h"
64#include "xt_exchanger.h"
66
67/* unfortunately GCC 11 to 13 cannot handle the literal constants used for
68 * MPI_STATUSES_IGNORE by MPICH */
69#if __GNUC__ >= 11 && __GNUC__ <= 13 && defined MPICH
70#pragma GCC diagnostic push
71#pragma GCC diagnostic ignored "-Wstringop-overread"
72#pragma GCC diagnostic ignored "-Wstringop-overflow"
73#endif
74
75static Xt_exchanger
77 MPI_Comm newComm, int new_tag_offset);
79static void
81 const void *src_data,
82 void *dst_data);
83#ifdef _OPENMP
84static void
85xt_exchanger_mix_isend_irecv_s_exchange_omp(Xt_exchanger exchanger,
86 const void *src_data,
87 void *dst_data);
88#endif
89static void
91 Xt_exchanger exchanger, const void * src_data, void * dst_data,
92 Xt_request *request);
93#ifdef _OPENMP
94static void
95xt_exchanger_mix_isend_irecv_a_exchange_omp(
96 Xt_exchanger exchanger, const void * src_data, void * dst_data,
97 Xt_request *request);
98#endif
101
102static int
104 enum xt_msg_direction direction,
105 int *restrict *ranks);
106
107static MPI_Datatype
109 int rank,
110 enum xt_msg_direction direction,
111 bool do_dup);
112
113const struct xt_exchanger_vtable
123#ifdef _OPENMP
124 ,
125xt_exchanger_mix_isend_irecv_auto_omp_vtable = {
128 .s_exchange = xt_exchanger_mix_isend_irecv_s_exchange_omp,
129 .a_exchange = xt_exchanger_mix_isend_irecv_a_exchange_omp,
133}
134#endif
135;
136
138
/* Direction test for a stored message: evaluates to 1 when the rank field
 * is negative and to 0 otherwise.  Receive messages get the sign bit of
 * their rank set when the exchanger is built (rank |= ~INT_MAX), so a
 * negative rank marks a receive.  The result is compared against
 * SEND/RECV elsewhere in this file -- presumably SEND == 0 and RECV == 1
 * in enum xt_msg_direction; confirm against its definition. */
139#define MSG_DIR(msg) ((msg).rank < 0)
140
149
152 Xt_config config)
153{
154 (void)config;
156 size_t header_size = sizeof (*exchanger),
157 body_size = sizeof (struct Xt_redist_msg) * nmsg;
158 exchanger = xmalloc(header_size + body_size);
159 exchanger->n = (int)nmsg;
160#ifdef _OPENMP
161 int mthread_mode = xt_config_get_redist_mthread_mode(config);
162 if (mthread_mode == XT_MT_OPENMP)
163 exchanger->vtable = &xt_exchanger_mix_isend_irecv_auto_omp_vtable;
164 else
165#endif
167 return exchanger;
168}
169
/**
 * Compute the key by which messages are ordered in the mixed send/recv
 * list.
 *
 * The sign bit of @a r (used in this file as a direction flag) is masked
 * off first.  A rank that does not exceed @a comm_rank is then shifted up
 * by @a comm_size; a rank greater than @a comm_rank is returned as is.
 *
 * @param r          rank, possibly carrying a flag in its sign bit
 * @param comm_rank  rank of the calling process in the communicator
 * @param comm_size  offset added to ranks less than or equal to
 *                   @a comm_rank (the caller passes the communicator size)
 * @return the adjusted, non-negative rank used for comparisons
 */
static inline int
adjusted_rank(int r, int comm_rank, int comm_size)
{
  const int plain_rank = r & INT_MAX;
  if (plain_rank > comm_rank)
    return plain_rank;
  return plain_rank + comm_size;
}
176
/* Parameter macros for the quicksort template included at the end of this
 * group.  They request a file-local (static) sort over
 * struct Xt_redist_msg elements; given the suffix below, the generated
 * function is presumably the xt_quicksort_redist_msg_mix() called when an
 * exchanger is constructed -- confirm in xt_quicksort_base.h. */
177#define XT_SORTFUNC_DECL static
178#define SORT_TYPE struct Xt_redist_msg
179#define SORT_TYPE_SUFFIX redist_msg_mix
/* Less-than / less-or-equal compare the adjusted_rank() of both elements;
 * the index arguments i and j are not used by these comparisons. */
180#define SORT_TYPE_CMP_LT(u,v,i,j) \
181 (adjusted_rank((u).rank, comm_rank, comm_size) \
182 < adjusted_rank((v).rank, comm_rank, comm_size))
183#define SORT_TYPE_CMP_LE(u,v,i,j) \
184 (adjusted_rank((u).rank, comm_rank, comm_size) \
185 <= adjusted_rank((v).rank, comm_rank, comm_size))
/* Equality only masks off the sign bit of the rank, so two messages that
 * name the same rank compare equal regardless of that bit. */
186#define SORT_TYPE_CMP_EQ(u,v,i,j) \
187 (((u).rank & INT_MAX) == ((v).rank & INT_MAX))
/* comm_rank and comm_size are threaded through the sort as extra
 * arguments because the comparison macros above reference them. */
188#define XT_SORT_EXTRA_ARGS_DECL , int comm_rank, int comm_size
189#define XT_SORT_EXTRA_ARGS_PASS , comm_rank, comm_size
/* No extra arguments are declared or passed for the vecswap hooks. */
190#define XT_SORT_VECSWAP_EXTRA_ARGS_DECL
191#define XT_SORT_VECSWAP_EXTRA_ARGS_PASS
192
193#include "xt_quicksort_base.h"
194
195
198 const struct Xt_redist_msg *send_msgs,
199 const struct Xt_redist_msg *recv_msgs,
200 MPI_Comm comm, int tag_offset,
201 Xt_config config)
202{
208 assert((nsend >= 0) & (nrecv >= 0));
209 size_t nmsg = (size_t)nsend + (size_t)nrecv;
212 exchanger->comm = comm;
213 exchanger->tag_offset = tag_offset;
214 struct Xt_redist_msg *restrict msgs = exchanger->msgs;
215 bool dt_dup = !(config->flags & exch_no_dt_dup);
216 xt_redist_msgs_strided_copy((size_t)nsend, send_msgs, sizeof (send_msgs[0]),
217 msgs, sizeof (msgs[0]), comm, dt_dup);
218 xt_redist_msgs_strided_copy((size_t)nrecv, recv_msgs, sizeof (recv_msgs[0]),
219 msgs+nsend, sizeof (msgs[0]), comm,
220 dt_dup);
221 for (size_t i = 0; i < (size_t)nrecv; ++i)
222 msgs[i + (size_t)nsend].rank |= ~INT_MAX;
223
224 {
225 int comm_size, comm_rank, is_inter;
226 xt_mpi_call(MPI_Comm_rank(comm, &comm_rank), comm);
227 xt_mpi_call(MPI_Comm_test_inter(comm, &is_inter), comm);
228 int (*get_comm_size)(MPI_Comm, int *)
229 = is_inter ? MPI_Comm_remote_size : MPI_Comm_size;
230 xt_mpi_call(get_comm_size(comm, &comm_size), comm);
231 xt_quicksort_redist_msg_mix(msgs, nmsg, comm_rank, comm_size);
232 }
233
234 for (size_t i = 1; i < nmsg; ++i) {
235
236 if ((msgs[i-1].rank & INT_MAX) == (msgs[i].rank & INT_MAX)
237 && MSG_DIR(msgs[i]) == SEND) {
238
239 struct Xt_redist_msg temp = msgs[i-1];
240 msgs[i-1] = msgs[i];
241 msgs[i] = temp;
242 i++;
243 }
244 }
245
246 return (Xt_exchanger)exchanger;
247}
248
249static Xt_exchanger
251 MPI_Comm new_comm, int new_tag_offset)
252{
253 Xt_exchanger_mix_isend_irecv exchanger_msr =
255 size_t nmsg = (size_t)exchanger_msr->n;
256 /* fixme: needs to use custom config */
257 Xt_exchanger_mix_isend_irecv exchanger_copy
259 exchanger_copy->comm = new_comm;
260 exchanger_copy->tag_offset = new_tag_offset;
261 exchanger_copy->vtable = exchanger_msr->vtable;
262 struct Xt_redist_msg *restrict new_msgs = exchanger_copy->msgs,
263 *restrict orig_msgs = exchanger_msr->msgs;
264 xt_redist_msgs_strided_copy(nmsg, orig_msgs, sizeof (*orig_msgs),
265 new_msgs, sizeof (*new_msgs),
266 new_comm, true);
267 return (Xt_exchanger)exchanger_copy;
268}
269
/* Capacity of the MPI_Request array that the synchronous exchange keeps in
 * automatic storage; when an exchanger has more messages than this, the
 * request array is obtained with xmalloc and freed after the wait. */
270enum { max_on_stack_req = 16 };
271
273
274 Xt_exchanger_mix_isend_irecv exchanger_msr =
276
277 size_t nmsg = (size_t)exchanger_msr->n;
278 struct Xt_redist_msg *restrict msgs = exchanger_msr->msgs;
279
280 xt_redist_msgs_strided_destruct(nmsg, msgs, exchanger_msr->comm,
281 sizeof (*msgs));
282 free(exchanger_msr);
283}
284
285static inline void
287 const struct Xt_redist_msg *restrict msgs,
288 const void *src_data, void *dst_data,
289 MPI_Request *requests,
290 MPI_Comm comm, int tag_offset)
291{
292 for (size_t i = 0; i < nmsg; ++i) {
293 typedef int (*ifp)(void *buf, int count, MPI_Datatype datatype, int dest,
294 int tag, MPI_Comm comm, MPI_Request *request);
295 ifp op = MSG_DIR(msgs[i]) == SEND ? (ifp)MPI_Isend : (ifp)MPI_Irecv;
296 void *data = MSG_DIR(msgs[i]) == SEND
297 ? (void *)(intptr_t)src_data : dst_data;
298 xt_mpi_call(op(data, 1, msgs[i].datatype, msgs[i].rank & INT_MAX,
299 tag_offset + xt_mpi_tag_exchange_msg,
300 comm, requests+i), comm);
301 }
302}
303
304
306 const void * src_data,
307 void * dst_data) {
308
309 Xt_exchanger_mix_isend_irecv exchanger_msr =
311
312 if (exchanger_msr->n > 0) {
313 size_t nmsg = (size_t)exchanger_msr->n;
314 MPI_Request req_buf[max_on_stack_req];
315 MPI_Request *requests
316 = nmsg <= max_on_stack_req
317 ? req_buf : xmalloc(nmsg * sizeof (*requests));
318 redist_msgs_to_req(nmsg, exchanger_msr->msgs,
319 src_data, dst_data, requests,
320 exchanger_msr->comm, exchanger_msr->tag_offset);
321 xt_mpi_call(MPI_Waitall((int)nmsg, requests, MPI_STATUSES_IGNORE),
322 exchanger_msr->comm);
323 if (requests != req_buf)
324 free(requests);
325 }
326}
327
329 Xt_exchanger exchanger, const void * src_data, void * dst_data,
330 Xt_request *request) {
331
332 Xt_exchanger_mix_isend_irecv exchanger_msr =
334
335 if (exchanger_msr->n > 0) {
336 size_t nmsg = (size_t)exchanger_msr->n;
337 struct Xt_config_ conf = xt_default_config;
339 Xt_request requests
340 = xt_request_msgs_alloc((int)nmsg, exchanger_msr->comm, &conf);
341 MPI_Request *requests_
342 = xt_request_msgs_get_req_ptr(requests);
343 redist_msgs_to_req(nmsg, exchanger_msr->msgs,
344 src_data, dst_data, requests_,
345 exchanger_msr->comm, exchanger_msr->tag_offset);
346 *request = requests;
347 } else
348 *request = XT_REQUEST_NULL;
349}
350
351#ifdef _OPENMP
352static void
353xt_exchanger_mix_isend_irecv_a_exchange_mt(Xt_exchanger exchanger,
354 const void * src_data,
355 void * dst_data,
356 Xt_exchanger_omp_share shared_req)
357{
358 MPI_Request *requests
360 Xt_exchanger_mix_isend_irecv exchanger_msr =
362 size_t num_threads = (size_t)omp_get_num_threads(),
363 tid = (size_t)omp_get_thread_num();
364 size_t nmsg = (size_t)exchanger_msr->n,
365 start = (nmsg * tid) / num_threads,
366 nmsg_ = (nmsg * (tid+1)) / num_threads - start;
367 redist_msgs_to_req(nmsg_, exchanger_msr->msgs+start,
368 src_data, dst_data, requests+start,
369 exchanger_msr->comm, exchanger_msr->tag_offset);
370}
371
372static void
373xt_exchanger_mix_isend_irecv_a_exchange_omp(Xt_exchanger exchanger,
374 const void *src_data,
375 void *dst_data,
376 Xt_request *request)
377{
378 Xt_exchanger_omp_share shared_req
380#pragma omp parallel
381 xt_exchanger_mix_isend_irecv_a_exchange_mt(exchanger, src_data, dst_data,
382 shared_req);
383 *request = (Xt_request)shared_req;
384}
385
386static void
387xt_exchanger_mix_isend_irecv_s_exchange_mt(Xt_exchanger exchanger,
388 const void * src_data,
389 void * dst_data,
390 Xt_exchanger_omp_share shared_req)
391{
392 MPI_Request *requests
394 Xt_exchanger_mix_isend_irecv exchanger_msr =
396 size_t num_threads = (size_t)omp_get_num_threads(),
397 tid = (size_t)omp_get_thread_num();
398 size_t nmsg = (size_t)exchanger_msr->n,
399 start = (nmsg * tid) / num_threads,
400 nmsg_ = (nmsg * (tid+1)) / num_threads - start;
401 redist_msgs_to_req(nmsg_, exchanger_msr->msgs+start,
402 src_data, dst_data, requests+start,
403 exchanger_msr->comm, exchanger_msr->tag_offset);
404 xt_mpi_call(MPI_Waitall((int)nmsg_, requests+start,
405 MPI_STATUSES_IGNORE), exchanger_msr->comm);
406}
407
408static void
409xt_exchanger_mix_isend_irecv_s_exchange_omp(Xt_exchanger exchanger,
410 const void * src_data,
411 void * dst_data)
412{
413 Xt_exchanger_omp_share shared_req
415#pragma omp parallel
416 xt_exchanger_mix_isend_irecv_s_exchange_mt(exchanger, src_data, dst_data,
417 shared_req);
418 free(shared_req);
419}
420#endif
421
424{
425 struct Xt_config_ conf = xt_default_config;
427 Xt_exchanger_mix_isend_irecv exchanger_msr =
429 return (Xt_exchanger_omp_share)xt_request_msgs_alloc(exchanger_msr->n,
430 exchanger_msr->comm,
431 &conf);
432}
433
434static int
436 enum xt_msg_direction direction,
437 int *restrict *ranks)
438{
439 Xt_exchanger_mix_isend_irecv exchanger_msr =
441 size_t nmsg = 0, nmsg_all = (size_t)exchanger_msr->n;
442 const struct Xt_redist_msg *restrict msgs = exchanger_msr->msgs;
443 for (size_t i = 0; i < nmsg_all; ++i)
444 nmsg += MSG_DIR(msgs[i]) == direction;
445 int *restrict ranks_ = *ranks;
446 if (!ranks_)
447 ranks_ = *ranks = xmalloc(nmsg * sizeof (*ranks_));
448 for (size_t i = 0, j = (size_t)-1; i < nmsg_all; ++i)
449 if (MSG_DIR(msgs[i]) == direction)
450 ranks_[++j] = msgs[i].rank & INT_MAX;
451 return (int)nmsg;
452}
453
454static MPI_Datatype
456 int rank,
457 enum xt_msg_direction direction,
458 bool do_dup)
459{
460 Xt_exchanger_mix_isend_irecv exchanger_msr =
462 size_t nmsg = (size_t)exchanger_msr->n;
463 struct Xt_redist_msg *restrict msgs = exchanger_msr->msgs;
464 MPI_Datatype datatype_copy = MPI_DATATYPE_NULL;
465 if (direction == RECV) rank |= ~INT_MAX;
466 for (size_t i = 0; i < nmsg; ++i)
467 if (msgs[i].rank == rank) {
468 if (do_dup)
469 xt_mpi_call(MPI_Type_dup(msgs[i].datatype, &datatype_copy),
470 exchanger_msr->comm);
471 else
472 datatype_copy = msgs[i].datatype;
473 break;
474 }
475 return datatype_copy;
476}
477
478/*
479 * Local Variables:
480 * c-basic-offset: 2
481 * coding: utf-8
482 * indent-tabs-mode: nil
483 * show-trailing-whitespace: t
484 * require-trailing-newline: t
485 * End:
486 */
int MPI_Comm
Definition core.h:64
add versions of standard API functions not returning on error
#define xmalloc(size)
Definition ppm_xfuncs.h:70
const struct xt_exchanger_vtable * vtable
MPI_Datatype datatype
Definition xt_redist.h:69
Xt_exchanger(* copy)(Xt_exchanger, MPI_Comm, int)
int MPI_Type_dup(MPI_Datatype oldtype, MPI_Datatype *newtype)
struct Xt_config_ xt_default_config
Definition xt_config.c:204
void xt_config_set_redist_mthread_mode(Xt_config config, int mode)
Definition xt_config.c:347
@ XT_MT_OPENMP
Definition xt_config.h:142
@ XT_MT_NONE
Definition xt_config.h:140
int xt_config_get_redist_mthread_mode(Xt_config config)
Definition xt_config.c:340
implementation of configuration object
@ exch_no_dt_dup
exchanging of data based on information provided by redist's
struct Xt_exchanger_omp_share_ * Xt_exchanger_omp_share
#define MSG_DIR(msg)
static int adjusted_rank(int r, int comm_rank, int comm_size)
static Xt_exchanger_mix_isend_irecv xt_exchanger_mix_isend_irecv_alloc(size_t nmsg, Xt_config config)
Xt_exchanger xt_exchanger_mix_isend_irecv_new(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset, Xt_config config)
static MPI_Datatype xt_exchanger_mix_isend_irecv_get_MPI_Datatype(Xt_exchanger exchanger, int rank, enum xt_msg_direction direction, bool do_dup)
struct Xt_exchanger_mix_isend_irecv_ * Xt_exchanger_mix_isend_irecv
static void xt_exchanger_mix_isend_irecv_a_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data, Xt_request *request)
static void xt_exchanger_mix_isend_irecv_delete(Xt_exchanger exchanger)
static int xt_exchanger_mix_isend_irecv_get_msg_ranks(Xt_exchanger exchanger, enum xt_msg_direction direction, int *restrict *ranks)
static void xt_exchanger_mix_isend_irecv_s_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data)
static Xt_exchanger xt_exchanger_mix_isend_irecv_copy(Xt_exchanger exchanger, MPI_Comm newComm, int new_tag_offset)
static void redist_msgs_to_req(size_t nmsg, const struct Xt_redist_msg *restrict msgs, const void *src_data, void *dst_data, MPI_Request *requests, MPI_Comm comm, int tag_offset)
static Xt_exchanger_omp_share xt_exchanger_mix_isend_irecv_create_omp_share(Xt_exchanger exchanger)
const struct xt_exchanger_vtable xt_exchanger_mix_isend_irecv_vtable
utility routines for MPI
#define xt_mpi_call(call, comm)
Definition xt_mpi.h:68
@ xt_mpi_tag_exchange_msg
macros to create quicksort implementations
redistribution of data, non-public declarations
xt_msg_direction
PPM_DSO_INTERNAL void xt_redist_msgs_strided_copy(size_t n, const struct Xt_redist_msg *restrict src, size_t src_stride, struct Xt_redist_msg *restrict dst, size_t dst_stride, MPI_Comm comm, bool dt_dup)
PPM_DSO_INTERNAL void xt_redist_msgs_strided_destruct(size_t n, struct Xt_redist_msg *msgs, MPI_Comm comm, size_t ofs_stride)
struct Xt_request_ * Xt_request
Definition xt_request.h:51
#define XT_REQUEST_NULL
Definition xt_request.h:52
MPI_Request * xt_request_msgs_get_req_ptr(Xt_request request)
Xt_request xt_request_msgs_alloc(int n, MPI_Comm comm, Xt_config config)
internal interfaces for xt_request_msgs