72static const char filename[] =
"xt_redist_collection.c";
84 const void *
const *src_data,
85 void *
const *dst_data);
89 const void *
const *src_data,
90 void *
const *dst_data,
95 const void *src_data,
void *dst_data);
99 const void *src_data,
void *dst_data,
112 int *restrict *ranks);
160 int *restrict in_ranks[num_redists],
161 const size_t num_ranks[num_redists],
163 MPI_Datatype *component_dt,
166 size_t rank_pos[num_redists];
167 for (
size_t j = 0; j < num_redists; ++j)
171 for (
size_t i = 0; i < nmsgs; ++i) {
172 int min_rank = INT_MAX;
173 for (
size_t j = 0; j < num_redists; ++j)
174 if (rank_pos[j] < num_ranks[j] && in_ranks[j][rank_pos[j]] < min_rank)
175 min_rank = in_ranks[j][rank_pos[j]];
177 for (
size_t j = 0; j < num_redists; ++j)
178 component_dt[i * num_redists + j] =
179 (rank_pos[j] < num_ranks[j] && in_ranks[j][rank_pos[j]] == min_rank)
183 out_ranks[i] = min_rank;
184 for (
size_t j = 0; j < num_redists; ++j)
186 += (rank_pos[j] < num_ranks[j] && in_ranks[j][rank_pos[j]] == min_rank);
196 unsigned num_redists)
199 size_t num_displ = cache_size * num_redists;
201 =
xmalloc(2 * num_displ *
sizeof (*q));
202 for (
size_t i = 0; i < 2 * num_displ; i += num_redists)
211 for (
size_t i = 0; i < cache_size; ++i)
229 size_t nmsg = nmsg_recv + nmsg_send,
232 ranks_size =
nmsg *
sizeof (
int),
234 own_size = header_size + all_component_dt_size + ranks_size;
247 =
xmalloc(own_size + team_data_size);
250 redist_coll->
nmsg[
SEND] = (unsigned)nmsg_send;
251 redist_coll->
nmsg[
RECV] = (unsigned)nmsg_recv;
260 = (
unsigned char *)redist_coll + own_size;
275 Xt_abort(
comm,
"ERROR: invalid number of redists passed",
276 "xt_redist_collection_custom_new", __LINE__);
278 Xt_abort(
comm,
"ERROR: invalid cache size in xt_redist_collection_new",
282 size_t num_ranks[2][num_redists_];
283 int *restrict ranks[2][num_redists_];
296 redist_coll->
comm = new_comm;
318 const int ranks[num_messages],
319 const MPI_Datatype *component_dt,
328 block_lengths[i] = 1;
329 for (
size_t i = 0; i < num_messages; ++i) {
330 redist_msgs[i].datatype
333 block_lengths, ddt_list,
comm);
334 redist_msgs[i].rank = ranks[i];
343 MPI_Aint base_addr, offset;
344 base_addr = (MPI_Aint)(intptr_t)(
const void *)data[0];
345 displacements[0] = 0;
347 offset = (MPI_Aint)(intptr_t)(
const void *)data[i];
348 displacements[i] = offset - base_addr;
356 const MPI_Aint (*cached_displacements)[2][
num_redists],
360 cached_displacements[i][
SEND][0] == (MPI_Aint)0 &&
361 cached_displacements[i][
RECV][0] == (MPI_Aint)0; ++i) {
362 bool mismatch =
false;
363 for (
size_t d = 0; d < 2; ++d)
365 mismatch |= (displacements[d][j] != cached_displacements[i][d][j]);
366 if (!mismatch)
return i;
379 unsigned nmsg_send = redist_coll->
nmsg[
SEND],
380 nmsg_recv = redist_coll->
nmsg[
RECV];
385 redist_coll->
msg_ranks, all_component_dt,
386 displacements[
SEND], msgs, &ddt_list, comm);
389 all_component_dt + nmsg_send * num_redists,
391 msgs + nmsg_send, &ddt_list, comm);
395 return exchanger_new((
int)nmsg_send, (
int)nmsg_recv,
396 msgs, msgs + nmsg_send, comm, redist_coll->
tag_offset,
403 const void *
const * src_data,
const void *
const * dst_data,
404 unsigned num_redists)
406 MPI_Aint displacements[2][num_redists];
407 unsigned nmsg_send = redist_coll->
nmsg[
SEND],
408 nmsg_recv = redist_coll->
nmsg[
RECV];
422 (
const MPI_Aint (*)[2][num_redists])
423 cache->displacements,
426 if (cache_index != cache_size)
427 return cache->exchangers[cache_index];
429 cache_index = cache->token;
430 cache->token = (cache_index + 1) % cache_size;
431 memcpy(cache->displacements + cache_index * 2 * num_redists,
434 if (cache->exchangers[cache_index] != NULL)
437 size_t nmsg = (size_t)nmsg_send + nmsg_recv;
444 cache->exchangers[cache_index] = exchanger;
456 const void *
const *src_data,
457 void *
const *dst_data)
462 Xt_abort(redist_coll->
comm,
"ERROR: wrong number of arrays in "
463 "redist_collection_s_exchange",
filename, __LINE__);
468 (
const void *
const (*))dst_data,
479 const void *
const *src_data,
void *
const *dst_data,
485 Xt_abort(redist_coll->
comm,
"ERROR: wrong number of arrays in "
486 "redist_collection_a_exchange",
filename, __LINE__);
491 (
const void *
const (*))dst_data,
503 const MPI_Datatype *component_dt_orig,
504 MPI_Datatype *component_dt_copy,
507 for (
size_t i = 0; i < num_component_dt; ++i)
509 MPI_Datatype orig_dt = component_dt_orig[i];
510 if (orig_dt != MPI_DATATYPE_NULL)
513 component_dt_copy[i] = orig_dt;
524 unsigned nmsg_send = redist_coll->
nmsg[
SEND],
525 nmsg_recv = redist_coll->
nmsg[
RECV];
526 size_t nmsg = (size_t)nmsg_recv + nmsg_send;
531 if ((
unsigned char *)team_share > (
unsigned char *)redist_coll
532 && (
unsigned char *)team_share <
543 nmsg_send = redist_coll->
nmsg[
SEND],
544 nmsg_recv = redist_coll->
nmsg[
RECV];
545 size_t nmsg = (size_t)nmsg_recv + nmsg_send;
551 redist_copy->
comm = copy_comm;
555 sizeof (*redist_copy->
msg_ranks) * nmsg);
559 unsigned cache_size = redist_coll->
cache_size;
568 for (
size_t i = 0; i < num_dt; ++i)
569 if (all_component_dt[i] != MPI_DATATYPE_NULL)
586 if ((
unsigned char *)team_share > (
unsigned char *)redist_coll
587 && (
unsigned char *)team_share
602 return (
int)(
xrc(redist)->
nmsg[direction]);
612 Xt_abort(redist_coll->
comm,
"ERROR: datatype retrieval is not"
613 " supported for this xt_redist type (Xt_redist_collection)",
616 return MPI_DATATYPE_NULL;
621 const void *src_data,
void *dst_data)
628 Xt_abort(redist_coll->
comm,
"ERROR: s_exchange1 is not implemented for"
629 " this xt_redist type (Xt_redist_collection)",
filename, __LINE__);
634 const void *src_data,
void *dst_data,
642 Xt_abort(redist_coll->
comm,
"ERROR: a_exchange1 is not implemented for"
643 " this xt_redist type (Xt_redist_collection)",
filename, __LINE__);
649 int *restrict *ranks)
652 unsigned nmsg_direction = redist_coll->
nmsg[direction],
653 nmsg_send = redist_coll->
nmsg[
SEND];
654 size_t nmsg = (size_t)nmsg_direction + redist_coll->
nmsg[!direction];
/* The mask below evaluates to nmsg_send when direction is 0 (the unsigned
 * subtraction wraps to all ones) and to 0 when direction is 1. */
657 + (((unsigned)direction-1) & nmsg_send);
658 if (nmsg_direction > 0) {
659 int *ranks_ = *ranks;
661 ranks_ = *ranks =
xmalloc(nmsg_direction *
sizeof (*ranks_));
/* Copy nmsg_direction ints. The element size has to be that of the
 * destination element (*ranks_, an int), matching the allocation above.
 * sizeof (*ranks) is the size of an int * (ranks is int *restrict *) and
 * would copy past the end of the buffer wherever pointers are wider than
 * int. */
662 memcpy(ranks_, ranks_orig, nmsg_direction *
sizeof (*ranks_));
664 return (
int)nmsg_direction;
673 return redist_coll->
comm;
add versions of standard API functions not returning on error
#define xcalloc(nmemb, size)
Xt_exchanger_new exchanger_new
void * exchanger_team_share
const struct xt_redist_vtable * vtable
MPI_Datatype all_component_dt[]
struct exchanger_cache cache
Xt_exchanger * exchangers
Xt_redist(* copy)(Xt_redist)
int MPI_Type_free(MPI_Datatype *datatype)
int MPI_Type_dup(MPI_Datatype oldtype, MPI_Datatype *newtype)
Xt_exchanger_new xt_config_get_exchange_new_by_comm(Xt_config config, MPI_Comm comm)
struct Xt_config_ xt_default_config
implementation of configuration object
void xt_exchanger_s_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data)
void xt_exchanger_a_exchange(Xt_exchanger exchanger, const void *src_data, void *dst_data, Xt_request *request)
void xt_exchanger_delete(Xt_exchanger exchanger)
exchanging of data based on information provided by redists
PPM_DSO_INTERNAL void xt_exchanger_new_team_share_destroy(Xt_exchanger_new exchanger_new, void *share)
PPM_DSO_INTERNAL void xt_exchanger_new_team_share_default_init(Xt_exchanger_new exchanger_new, void *share)
Xt_exchanger(* Xt_exchanger_new)(int nsend, int nrecv, const struct Xt_redist_msg *send_msgs, const struct Xt_redist_msg *recv_msgs, MPI_Comm comm, int tag_offset, Xt_config config)
PPM_DSO_INTERNAL size_t xt_exchanger_new_team_get_share_size(Xt_exchanger_new exchanger_new)
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
#define xt_mpi_call(call, comm)
void Xt_mpi_ddt_cache_free(struct Xt_mpiddt_list *ddt_list, MPI_Comm comm)
#define Xt_mpiddt_empty_list
MPI_Datatype xt_redist_get_MPI_Datatype(Xt_redist redist, int rank, enum xt_msg_direction direction, bool do_dup)
MPI_Datatype xt_create_compound_datatype(size_t count, const MPI_Aint displacements[count], const MPI_Datatype datatypes[count], const int block_lengths[count], struct Xt_mpiddt_list *ddt_list, MPI_Comm comm)
void xt_redist_check_comms(Xt_redist *redists, int num_redists, MPI_Comm comm)
unsigned xt_redist_agg_msg_count(size_t num_redists, enum xt_msg_direction direction, const Xt_redist redists[num_redists], size_t num_ranks[num_redists], int *restrict ranks[num_redists], Xt_config config)
static void init_cache(struct exchanger_cache *cache, size_t cache_size, unsigned num_redists)
static size_t lookup_cache_index(unsigned num_redists, const MPI_Aint displacements[2][num_redists], const MPI_Aint(*cached_displacements)[2][num_redists], size_t cache_size)
static void copy_component_dt(size_t num_component_dt, const MPI_Datatype *component_dt_orig, MPI_Datatype *component_dt_copy, MPI_Comm comm)
Xt_redist xt_redist_collection_new(Xt_redist *redists, int num_redists, int cache_size, MPI_Comm comm)
static const char filename[]
static Xt_exchanger create_exchanger(struct Xt_redist_collection_ *redist_coll, size_t num_redists, MPI_Aint displacements[2][num_redists], struct Xt_redist_msg *msgs)
static MPI_Comm redist_collection_get_MPI_Comm(Xt_redist redist)
static const struct xt_redist_vtable redist_collection_vtable
struct Xt_redist_collection_ * Xt_redist_collection
static void compute_displ(const void *const *data, unsigned num_redists, MPI_Aint displacements[num_redists])
static Xt_redist_collection alloc_redist_coll(size_t num_redists, size_t nmsg_send, size_t nmsg_recv, Xt_config config, MPI_Comm comm)
static void redist_collection_a_exchange1(Xt_redist redist, const void *src_data, void *dst_data, Xt_request *request)
static Xt_exchanger get_exchanger(struct Xt_redist_collection_ *redist_coll, const void *const *src_data, const void *const *dst_data, unsigned num_redists)
static void redist_collection_a_exchange(Xt_redist redist, int num_src_arrays, const void *const *src_data, void *const *dst_data, Xt_request *request)
static void align_component_dt(unsigned num_redists, unsigned nmsgs, const Xt_redist *redists, int *restrict in_ranks[num_redists], const size_t num_ranks[num_redists], int *out_ranks, MPI_Datatype *component_dt, enum xt_msg_direction direction)
static Xt_redist redist_collection_copy(Xt_redist redist)
static void redist_collection_s_exchange1(Xt_redist redist, const void *src_data, void *dst_data)
static void redist_collection_delete(Xt_redist redist)
static int redist_collection_get_msg_ranks(Xt_redist redist, enum xt_msg_direction direction, int *restrict *ranks)
static void destruct_cache(struct exchanger_cache *cache, size_t cache_size)
static Xt_redist redist_collection_custom_copy(Xt_redist redist, Xt_config config)
static void redist_collection_s_exchange(Xt_redist redist, int num_src_arrays, const void *const *src_data, void *const *dst_data)
@ redist_msg_stack_alloc_lim
static int redist_collection_get_num_msg(Xt_redist redist, enum xt_msg_direction direction)
static void free_component_dt(size_t num_dt, MPI_Datatype *all_component_dt, MPI_Comm comm)
static Xt_redist_collection xrc(void *redist)
Xt_redist xt_redist_collection_custom_new(Xt_redist *redists, int num_redists, int cache_size, MPI_Comm comm, Xt_config config)
static void create_all_dt_for_dir(unsigned num_messages, size_t num_redists, const int ranks[num_messages], const MPI_Datatype *component_dt, const MPI_Aint displacements[num_redists], struct Xt_redist_msg redist_msgs[num_messages], struct Xt_mpiddt_list *ddt_list, MPI_Comm comm)
static MPI_Datatype redist_collection_get_MPI_Datatype(Xt_redist redist, int rank, enum xt_msg_direction direction, bool do_dup)
@ DEFFAULT_DATATYPE_CACHE_SIZE
redistribution of data, non-public declarations