110 size_t send_size_filled = (size_t)-1;
111 size_t max_num_intersect
112 = (size_t)config->xmdd_bucket_gen->get_intersect_max_num(
113 bucket_gen_state, bucket_type);
115 if (max_num_intersect) {
122 size_t send_buffer_size = 0;
124 =
xmalloc(max_num_intersect *
sizeof(*sends));
126 while ((bucket = config->xmdd_bucket_gen->next(
127 bucket_gen_state, bucket_type)).list) {
135 sends[num_msg].list = isect2send;
136 sends[num_msg].rank = (int)
rank;
148 if (idxlist_sorted != idxlist)
150 for (
size_t rank = send_size_filled+1;
rank < (size_t)comm_size; ++
rank)
153 unsigned char *send_buffer =
xmalloc(send_buffer_size);
155 for (
size_t i = 0; i < num_msg; ++i) {
158 (
int)(send_buffer_size-ofs), &position, comm);
159 send_size[sends[i].rank][
SEND_SIZE] = position;
160 ofs += (size_t)position;
166 result.
buffer = send_buffer;
168 memset(send_size, 0, (
size_t)comm_size *
sizeof (*send_size));
181 size_t tx_num = 0, size_sum = 0;
182 for (
size_t i = 0; i < (size_t)comm_size; ++i)
185 size_sum += (size_t)tx_size;
187 if (counts) counts[tx_num] = sizes[i][
SEND_NUM];
205 bucket_gen_state, bucket_type, idxlist,
206 send_size, comm, comm_size, config);
216typedef int (*
tx_fp)(
void *, int, MPI_Datatype, int, int,
221 unsigned char *
buffer, MPI_Request *requests,
225 for (
size_t i = 0; i <
num_msg; ++i)
229 count, MPI_PACKED, rank, tag, comm, requests + i), comm);
230 ofs += (size_t)count;
237 void *recv_buffer, MPI_Request *requests,
247 void *send_buffer, MPI_Request *requests,
261 size_t num_msg = tx_stat.
num_msg, buf_size = tx_stat.
bytes;
266 for (
size_t i = 0; i < num_msg; ++i)
283#if __GNUC__ >= 11 && __GNUC__ <= 13 && defined MPICH
284#pragma GCC diagnostic push
285#pragma GCC diagnostic ignored "-Wstringop-overread"
286#pragma GCC diagnostic ignored "-Wstringop-overflow"
294 int remote_size,
int comm_size,
297 size_t bgd_size = config->xmdd_bucket_gen->gen_state_size;
298 bgd_size = (bgd_size +
sizeof (
void *) - 1)/
sizeof (
void *) *
sizeof (
void *);
302 = config->xmdd_bucket_gen->init(
303 &bgd, src_idxlist, dst_idxlist, config, comms, NULL,
304 config->xmdd_bucket_gen);
306 =
xmalloc(((
size_t)comm_size + (
size_t)remote_size)
307 * 2 *
sizeof(*send_size_local)),
312 void *send_buffer_local
314 tx_stat_local, recv_size_local,
315 send_size_local, src_idxlist,
316 comms->intra_comm, comm_size, config);
317 void *send_buffer_remote
319 tx_stat_remote, recv_size_remote,
320 send_size_remote, dst_idxlist,
321 comms->inter_comm, remote_size, config);
323 size_t num_req = tx_stat_local[0].
num_msg + tx_stat_remote[0].
num_msg
325 MPI_Request *dir_init_requests
326 =
xmalloc(num_req *
sizeof(*dir_init_requests)
327 + tx_stat_local[0].
bytes + tx_stat_remote[0].
bytes);
328 void *recv_buffer_local = dir_init_requests + num_req,
329 *recv_buffer_remote = ((
unsigned char *)recv_buffer_local
330 + tx_stat_local[0].
bytes);
331 int tag_intra = comms->tag_offset_intra
333 size_t req_ofs = tx_stat_local[0].
num_msg;
336 recv_buffer_local, dir_init_requests,
337 tag_intra, comms->intra_comm);
338 int tag_inter = comms->tag_offset_inter
342 recv_buffer_remote, dir_init_requests + req_ofs,
343 tag_inter, comms->inter_comm);
344 req_ofs += tx_stat_remote[0].
num_msg;
347 send_buffer_local, dir_init_requests + req_ofs,
348 tag_intra, comms->intra_comm);
349 req_ofs += tx_stat_local[1].
num_msg;
352 send_buffer_remote, dir_init_requests + req_ofs,
353 tag_inter, comms->inter_comm);
355 xt_mpi_call(MPI_Waitall((
int)num_req, dir_init_requests,
356 MPI_STATUSES_IGNORE), comms->inter_comm);
357 free(send_buffer_local);
358 free(send_buffer_remote);
362 recv_buffer_local, comms->intra_comm);
366 recv_buffer_remote, comms->inter_comm);
367 free(send_size_local);
368 free(dir_init_requests);
375 const struct isect *restrict src_dst_intersections,
380 size_t total_send_size = 0;
381 for (
int i = 0; i < comm_size; ++i)
382 (
void)(send_size_target[i][
SEND_SIZE] = 0),
383 (
void)(send_size_target[i][
SEND_NUM] = 0);
386 xt_mpi_call(MPI_Pack_size(1, MPI_INT, comm, &rank_pack_size), comm);
388 for (
size_t i = 0; i < num_intersections; ++i)
390 size_t msg_size = (size_t)rank_pack_size
392 size_t target_rank = (size_t)src_dst_intersections[i].rank[target];
394 ++(send_size_target[target_rank][
SEND_NUM]);
395 total_send_size += msg_size;
397 assert(total_send_size <= INT_MAX);
398 return total_send_size;
403 struct
isect *restrict src_dst_intersections,
406 bool isect_idxlist_delete,
MPI_Comm comm,
int comm_size) {
408 size_t total_send_size
410 src_dst_intersections,
412 comm, comm_size, send_size);
414 unsigned char *send_buffer =
xmalloc(total_send_size);
415 qsort(src_dst_intersections, num_intersections,
416 sizeof (src_dst_intersections[0]),
422 target, num_intersections, src_dst_intersections,
423 isect_idxlist_delete,
425 send_buffer, total_send_size, &ofs, comm);
426 return (
struct mmsg_buf){ .num_msg = num_requests,
427 .buffer = send_buffer };
432 void *restrict recv_buffer,
433 int *restrict entry_counts,
436 size_t num_msg = tx_stat.
num_msg;
437 int buf_size = (int)tx_stat.
bytes;
439 size_t num_entries_sent = 0;
440 for (
size_t i = 0; i < num_msg; ++i)
441 num_entries_sent += (
size_t)entry_counts[i];
444 + (
sizeof (
struct Xt_com_list) * num_entries_sent));
447 size_t num_entries = 0;
448 for (
size_t i = 0; i < num_msg; ++i) {
449 size_t num_entries_from_rank = (size_t)entry_counts[i];
450 for (
size_t j = 0; j < num_entries_from_rank; ++j) {
451 xt_mpi_call(MPI_Unpack(recv_buffer, buf_size, &position,
452 &entries[num_entries].
rank,
453 1, MPI_INT, comm), comm);
454 entries[num_entries].list =
459 assert(num_entries == num_entries_sent);
472 int comm_size, remote_size;
473 xt_mpi_call(MPI_Comm_size(comms->inter_comm, &comm_size),
475 xt_mpi_call(MPI_Comm_remote_size(comms->inter_comm, &remote_size),
480 remote_size, comm_size,
485 =
xmalloc(((
size_t)comm_size + (
size_t)remote_size)
486 * 2U *
sizeof(*send_size_local)),
493 struct isect *src_dst_intersections;
494 size_t num_intersections
497 &src_dst_intersections, config);
500 struct mmsg_buf dd_local, dd_remote;
502 =
pack_dist_dirs(num_intersections, src_dst_intersections, send_size_local,
506 =
pack_dist_dirs(num_intersections, src_dst_intersections, send_size_remote,
509 free(src_dst_intersections);
514 comms->intra_comm), comms->intra_comm);
517 comms->inter_comm), comms->inter_comm);
520 int *isect_counts_recv_local
521 =
xmalloc(((
size_t)comm_size + (
size_t)remote_size) *
sizeof (
int)),
522 *isect_counts_recv_remote = isect_counts_recv_local + comm_size;
523 compress_sizes(send_size_local, comm_size, tx_stat_local+1, NULL);
525 isect_counts_recv_local);
526 compress_sizes(send_size_remote, remote_size, tx_stat_remote+1, NULL);
528 isect_counts_recv_remote);
534 assert(num_requests <= INT_MAX);
535 MPI_Request *requests
536 =
xmalloc(num_requests *
sizeof(*requests)
537 + tx_stat_local[0].
bytes + tx_stat_remote[0].
bytes);
538 void *recv_buf_local = requests + num_requests,
539 *recv_buf_remote = (
unsigned char *)recv_buf_local + tx_stat_local[0].
bytes;
540 size_t req_ofs = tx_stat_local[0].
num_msg;
544 recv_buf_local, requests, tag_intra, comms->intra_comm);
548 recv_buf_remote, requests+req_ofs, tag_inter,
550 req_ofs += tx_stat_remote[0].
num_msg;
553 dd_local.
buffer, requests+req_ofs, tag_intra,
555 req_ofs += tx_stat_local[1].
num_msg;
558 dd_remote.
buffer, requests+req_ofs, tag_inter,
560 xt_mpi_call(MPI_Waitall((
int)num_requests, requests, MPI_STATUSES_IGNORE),
564 free(send_size_local);
570 isect_counts_recv_local, comms->intra_comm);
573 isect_counts_recv_remote, comms->inter_comm);
575 free(isect_counts_recv_local);
588 INSTR_DEF(this_instr,
"xt_xmap_dist_dir_intercomm_new")
606 Xt_xmap (*xmap_new)(
int num_src_intersections,
608 int num_dst_intersections,
Add versions of standard API functions that do not return on error.
struct dist_dir_pair dist_dirs
struct Xt_com_list entries[]
struct Xt_config_ xt_default_config
struct Xt_config_ * Xt_config
implementation of configuration object
#define XT_CONFIG_GET_FORCE_NOSORT(config)
#define XT_CONFIG_GET_XMAP_STRIPING(config)
#define XT_CONFIG_BUCKET_DESTROY(config, bucket_gen_state)
struct Xt_xmap_ * Xt_xmap
struct Xt_idxlist_ * Xt_idxlist
Xt_idxlist xt_idxlist_unpack(void *buffer, int buffer_size, int *position, MPI_Comm comm)
Xt_idxlist xt_idxlist_sorted_copy_custom(Xt_idxlist idxlist, Xt_config config)
void xt_idxlist_pack(Xt_idxlist idxlist, void *buffer, int buffer_size, int *position, MPI_Comm comm)
size_t xt_idxlist_get_pack_size(Xt_idxlist idxlist, MPI_Comm comm)
Xt_idxlist xt_idxlist_get_intersection(Xt_idxlist idxlist_src, Xt_idxlist idxlist_dst)
int xt_idxlist_get_sorting(Xt_idxlist idxlist)
void xt_idxlist_delete(Xt_idxlist idxlist)
Provide non-public declarations common to all index lists.
#define xt_idxlist_get_num_indices(idxlist)
MPI_Comm xt_mpi_comm_smart_dup(MPI_Comm comm, int *tag_offset)
void xt_mpi_comm_smart_dedup(MPI_Comm *comm, int tag_offset)
#define xt_mpi_call(call, comm)
@ xt_mpi_tag_xmap_dist_dir_src_send
exchange map declarations
static void exchange_idxlists(struct Xt_com_list **src_intersections, size_t *num_src_intersections, struct Xt_com_list **dst_intersections, size_t *num_dst_intersections, int *stripify, Xt_idxlist src_idxlist_local, Xt_idxlist dst_idxlist_local, MPI_Comm comm, Xt_config config)
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int tag_offset, MPI_Comm comm, int comm_size, Xt_config config)
static struct capbi_result compute_and_pack_bucket_intersections(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, int(*send_size)[SEND_SIZE_ASIZE], void **send_buffer_, MPI_Comm comm, int tag_offset, int comm_size, Xt_config config)
@ Xt_dist_dir_bucket_gen_type_send
@ Xt_dist_dir_bucket_gen_type_recv
Default bucket generator for creation of distributed directories.
int xt_com_list_rank_cmp(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_src_rank(const void *a_, const void *b_)
int xt_xmdd_cmp_isect_dst_rank(const void *a_, const void *b_)
void xt_xmdd_free_dist_dirs(struct dist_dir_pair dist_dirs)
size_t xt_xmap_dist_dir_match_src_dst(const struct dist_dir *src_dist_dir, const struct dist_dir *dst_dist_dir, struct isect **src_dst_intersections, Xt_config config)
void xt_xmap_dist_dir_same_rank_merge(struct dist_dir **dist_dir_results)
size_t xt_xmap_dist_dir_pack_intersections(enum xt_xmdd_direction target, size_t num_intersections, const struct isect *restrict src_dst_intersections, bool isect_idxlist_delete, size_t send_size_asize, size_t send_size_idx, int(*send_size)[send_size_asize], unsigned char *buffer, size_t buf_size, size_t *ofs, MPI_Comm comm)
Utility functions for creation of distributed directories.
static void * create_intersections(void *bucket_gen_state, int bucket_type, struct Xt_xmdd_txstat tx_stat[2], int recv_size[][SEND_SIZE_ASIZE], int send_size[][SEND_SIZE_ASIZE], Xt_idxlist idxlist, MPI_Comm comm, int comm_size, Xt_config config)
static void rank_no_send(size_t rank, int(*restrict send_size)[SEND_SIZE_ASIZE])
static void irecv_intersections(size_t num_msg, const int(*recv_size)[SEND_SIZE_ASIZE], void *recv_buffer, MPI_Request *requests, int tag, MPI_Comm comm)
static struct dist_dir * unpack_dist_dir_results(struct Xt_xmdd_txstat tx_stat, void *restrict recv_buffer, int *restrict entry_counts, MPI_Comm comm)
static void isend_intersections(size_t num_msg, const int(*send_size)[SEND_SIZE_ASIZE], void *send_buffer, MPI_Request *requests, int tag, MPI_Comm comm)
int(* tx_fp)(void *, int, MPI_Datatype, int, int, MPI_Comm, MPI_Request *)
static struct dd_result exchange_idxlists(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, const struct Xt_xmdd_bucket_gen_comms *comms, Xt_config config)
static struct mmsg_buf compute_and_pack_bucket_intersections(void *bucket_gen_state, int bucket_type, Xt_idxlist idxlist, int(*send_size)[SEND_SIZE_ASIZE], MPI_Comm comm, int comm_size, Xt_config config)
static size_t send_size_from_intersections(size_t num_intersections, const struct isect *restrict src_dst_intersections, enum xt_xmdd_direction target, MPI_Comm comm, int comm_size, int(*restrict send_size_target)[SEND_SIZE_ASIZE])
Xt_xmap xt_xmap_dist_dir_intercomm_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm inter_comm, MPI_Comm intra_comm)
static void tx_intersections(size_t num_msg, const int(*sizes)[SEND_SIZE_ASIZE], unsigned char *buffer, MPI_Request *requests, int tag, MPI_Comm comm, tx_fp tx_op)
Xt_xmap xt_xmap_dist_dir_intercomm_custom_new(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm inter_comm, MPI_Comm intra_comm, Xt_config config)
static struct dist_dir * unpack_dist_dir(struct Xt_xmdd_txstat tx_stat, const int(*sizes)[SEND_SIZE_ASIZE], void *buffer, MPI_Comm comm)
static struct mmsg_buf pack_dist_dirs(size_t num_intersections, struct isect *restrict src_dst_intersections, int(*send_size)[SEND_SIZE_ASIZE], enum xt_xmdd_direction target, bool isect_idxlist_delete, MPI_Comm comm, int comm_size)
static struct dd_result generate_distributed_directories(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, const struct Xt_xmdd_bucket_gen_comms *comms, int remote_size, int comm_size, Xt_config config)
static void compress_sizes(int(*restrict sizes)[SEND_SIZE_ASIZE], int comm_size, struct Xt_xmdd_txstat *tx_stat, int *counts)
Xt_xmap xt_xmap_intersection_ext_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)
Xt_xmap xt_xmap_intersection_new(int num_src_intersections, const struct Xt_com_list src_com[num_src_intersections], int num_dst_intersections, const struct Xt_com_list dst_com[num_dst_intersections], Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm)