#ifndef _RHEO_MPI_SCATTER_INIT_H
#define _RHEO_MPI_SCATTER_INIT_H
#include "rheolef/compiler.h"
#include "rheolef/distributed.h"
#include "rheolef/scatter_message.h"

#include "rheolef/msg_sort_with_permutation.h"
#include "rheolef/msg_to_context.h"
#include "rheolef/msg_from_context_pattern.h"
#include "rheolef/msg_from_context_indices.h"
#include "rheolef/msg_local_context.h"
#include "rheolef/msg_local_optimize.h"

#include "rheolef/msg_util.h"
#include <boost/functional.hpp>
#include <boost/iterator/transform_iterator.hpp>
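
// mpi_scatter_init: builds the "from" (send) and "to" (receive) scatter
// contexts (cf. scatter_message.h) that let a distributed container
// exchange the entries listed in the global index set idx[0:nidx[ with
// their owner processes, as defined by the partition table
// ownership[0:nproc+1[. Index lists are exchanged with non-blocking
// messages; the purely local part is split off and, when contiguous,
// reduced to a plain block copy.
//
// Hypothetical call site (a sketch, not part of this header): assuming
// std::vector<std::size_t> containers idx, idy and a partition table
// "part" with part[p] <= part[p+1] for all p:
//
//   mpi_scatter_init (idx.size(), idx.begin(), idy.size(), idy.begin(),
//                     idy_maxval, part.begin(), tag, comm, from, to);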
namespace rheolef {

template <
    class Message,
    class Size,
    class SizeRandomIterator1,
    class SizeRandomIterator2,
    class SizeRandomIterator3,
    class Tag>
void
mpi_scatter_init (
    Size nidx, SizeRandomIterator1 idx,
    Size nidy, SizeRandomIterator2 idy, Size idy_maxval,
    SizeRandomIterator3 ownership, Tag tag,
    const distributor::communicator_type& comm,
    Message& from, Message& to)
{
    typedef typename Message::size_type size_type;
    size_type my_proc = comm.rank();
    size_type nproc   = comm.size();
    // 1) count the messages to send: msg_size[iproc] indices go to iproc
    std::vector<size_type> msg_size(nproc, 0);
    std::vector<size_type> msg_mark(nproc, 0);
    std::vector<size_type> owner (nidx);
    size_type send_nproc = 0;
    size_type iproc = 0;
    for (size_type i = 0; i < nidx; i++) {
      // idx is sorted: the owner search resumes where it last stopped
      for (; iproc < nproc; iproc++) {
        if (idx[i] >= ownership[iproc] && idx[i] < ownership[iproc+1]) {
          owner[i] = iproc;
          msg_size [iproc]++;
          if (!msg_mark[iproc]) { msg_mark[iproc] = 1; send_nproc++; }
          break;
        }
      }
      check_macro (iproc != nproc, "bad stash data: idx["<<i<<"]="<<idx[i]
        <<" out of range [0:"<<ownership[nproc]<<"[");
    }
    // 2) do not send a message to myself: the local part is handled apart
    size_type my_msg_size = msg_size [my_proc];
    if (my_msg_size != 0) {
      msg_size [my_proc] = 0;
      msg_mark [my_proc] = 0;
      send_nproc--;
    }
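    // Illustration (assumed values): with nproc = 3 and
    // ownership = {0, 4, 8, 10}, process 0 owns global indices [0:4[,
    // process 1 owns [4:8[ and process 2 owns [8:10[; an index
    // idx[i] = 5 is routed to owner[i] = 1 and counted in msg_size[1].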
    // 3) number of messages to receive: sum msg_mark over all processes
    std::vector<size_type> work(nproc);
    mpi::all_reduce (
        comm,
        msg_mark.begin().operator->(),
        nproc,
        work.begin().operator->(),
        std::plus<size_type>());
    size_type receive_nproc = work [my_proc];
    // 4) maximum size of the messages to receive: max msg_size over all
    mpi::all_reduce (
        comm,
        msg_size.begin().operator->(),
        nproc,
        work.begin().operator->(),
        mpi::maximum<size_type>());
    size_type receive_max_size = work [my_proc];
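    // After these two reductions, each process knows how many messages
    // it will receive (receive_nproc) and the size of the largest one
    // addressed to it (receive_max_size), but not yet who the senders
    // are: that information only comes with the messages themselves.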
    // 5) post the receives for the lists of indices
    std::list<std::pair<size_type,mpi::request> > receive_waits;
    std::vector<size_type> receive_data (receive_nproc*receive_max_size);
    for (size_type i_receive = 0; i_receive < receive_nproc; i_receive++) {
      mpi::request i_req = comm.irecv (
        mpi::any_source,
        tag,
        receive_data.begin().operator->() + i_receive*receive_max_size,
        receive_max_size);
      receive_waits.push_back (std::make_pair(i_receive, i_req));
    }
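    // The receive buffer is deliberately over-allocated: one slot of
    // receive_max_size entries per expected message, since neither the
    // senders nor the exact sizes are known before the waits complete.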
    // 6) post the sends: the idx list, grouped by increasing owner
    std::vector<size_type> send_data (nidx);
    std::copy (idx, idx+nidx, send_data.begin());
    std::list<std::pair<size_type,mpi::request> > send_waits;
    size_type i_send  = 0;
    size_type i_start = 0;
    for (size_type iproc = 0; iproc < nproc; iproc++) {
      size_type i_msg_size = msg_size[iproc];
      if (i_msg_size == 0) continue;
      mpi::request i_req = comm.isend (
        iproc,
        tag,
        send_data.begin().operator->() + i_start,
        i_msg_size);
      send_waits.push_back(std::make_pair(i_send,i_req));
      i_send++;
      i_start += i_msg_size;
    }
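    // Because idx is sorted and each owner holds a contiguous index
    // range, send_data is already grouped by destination: every isend
    // posts the contiguous slice [i_start, i_start + i_msg_size[.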
    // 7) wait on receives and collect (source, size) of each message
    // note: assumes size_type == std::size_t, so that the pair lists
    // above match the request_iterator typedef below
    typedef boost::transform_iterator<select2nd<size_t,mpi::request>,
        std::list<std::pair<size_t,mpi::request> >::iterator>
      request_iterator;
    std::vector<size_type> receive_size (receive_nproc);
    std::vector<size_type> receive_proc (receive_nproc);
    size_type receive_total_size = 0;
    while (receive_waits.size() != 0) {
      typedef size_type data_type; // exchanged data are of type size_type
      request_iterator iter_r_waits (receive_waits.begin(), select2nd<size_t,mpi::request>());
      request_iterator last_r_waits (receive_waits.end(),   select2nd<size_t,mpi::request>());
      std::pair<mpi::status,request_iterator> pair_status = mpi::wait_any (iter_r_waits, last_r_waits);
      boost::optional<int> i_msg_size_opt = pair_status.first.count<data_type>();
      check_macro (i_msg_size_opt, "receive wait failed");
      int iproc = pair_status.first.source();
      check_macro (iproc >= 0, "receive: source iproc = "<<iproc<<" < 0 !");
      size_type i_msg_size = (size_t) i_msg_size_opt.get();
      std::list<std::pair<size_t,mpi::request> >::iterator i_pair_ptr = pair_status.second.base();
      size_type i_receive = (*i_pair_ptr).first;
      receive_proc [i_receive] = iproc;
      receive_size [i_receive] = i_msg_size;
      receive_total_size += i_msg_size;
      receive_waits.erase (i_pair_ptr);
    }
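    // Each completed receive reports its sender and effective length
    // through mpi::status; the transform_iterator adapts the
    // (slot, request) pairs so that mpi::wait_any sees a plain range of
    // requests, and base() recovers the list element to erase.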
    // 8) allocate the "to" (receive) scatter context
    to.resize (receive_total_size, receive_nproc);
    // 9) sort the received messages by increasing processor number
    std::vector<size_type> perm(receive_nproc);
    for (size_type k = 0; k < receive_nproc; k++) perm[k] = k; // identity
    sort_with_permutation (
        receive_proc.begin().operator->(),
        perm.begin().operator->(),
        receive_nproc);
    // 10) move the received indices into the "to" scatter context
    msg_to_context (
        perm.begin(), perm.end(),
        receive_proc.begin(), receive_size.begin(), receive_data.begin(),
        receive_max_size, ownership[my_proc] /* istart */,
        to.procs().begin(), to.starts().begin(), to.indices().begin());
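    // Passing ownership[my_proc] as istart lets msg_to_context shift
    // the received global indices to the local numbering of "to".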
    // 11) allocate the "from" (send) scatter context
    from.resize(nidy, send_nproc);
    // 12) compute its pattern and indices from owner[] and idy[]
    std::vector<size_type> proc2from_proc(nproc);
    msg_from_context_pattern (
        msg_size.begin(), msg_size.end(),
        from.procs().begin(),
        from.starts().begin(),
        proc2from_proc.begin());
    std::vector<size_type> start(send_nproc+1);
    std::copy (from.starts().begin(), from.starts().end(), start.begin());
    msg_from_context_indices (
        owner.begin(), owner.end(),
        idy,
        proc2from_proc.begin(),
        my_proc,
        idy_maxval,
        start.begin(),
        from.indices().begin());
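    // The "from" side needs no further communication: owner[i] already
    // gives the destination of idx[i], proc2from_proc renumbers
    // processor ids into message slots, and idy supplies the local
    // source positions.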
    // 13) wait on all sends to complete
    request_iterator iter_s_waits (send_waits.begin(), select2nd<size_t,mpi::request>());
    request_iterator last_s_waits (send_waits.end(),   select2nd<size_t,mpi::request>());
    mpi::wait_all (iter_s_waits, last_s_waits);
    // 14) compute the local scatter context: entries owned by my_proc
    size_type n_local = my_msg_size;
    from.local_slots.resize(n_local);
    to.local_slots.resize(n_local);
    msg_local_context (
        idx, idx+nidx, idy, idy_maxval,
        ownership[my_proc] /* istart */, ownership[my_proc+1] /* ilast */,
        to.local_slots.begin(), to.local_slots.end(),
        from.local_slots.begin());
    // 15) optimize the local scatter into a block copy when possible
    bool has_opt = msg_local_optimize (
        to.local_slots.begin(),
        to.local_slots.end(),
        from.local_slots.begin());
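    // Worked example (assumed values): if to.local_slots = {7,8,9} and
    // from.local_slots = {2,3,4}, both runs are consecutive, so
    // msg_local_optimize returns true and the local scatter degenerates
    // into a single block copy of length n_local = 3, encoded below.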
    if (has_opt && n_local != 0) {
      to.local_is_copy       = true;
      to.local_copy_start    = to.local_slots[0];
      to.local_copy_length   = n_local;
      from.local_is_copy     = true;
      from.local_copy_start  = from.local_slots[0];
      from.local_copy_length = n_local;
    }
}
} // namespace rheolef
#endif // _RHEO_MPI_SCATTER_INIT_H