// -*- C++ -*-

// Copyright (C) 2004-2008 The Trustees of Indiana University.
// Copyright (C) 2007 Douglas Gregor <doug.gregor@gmail.com>
// Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com>

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Andrew Lumsdaine
//           Matthias Troyer

//#define PBGL_PROCESS_GROUP_DEBUG
#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif

#include <boost/assert.hpp>
#include <algorithm>
#include <boost/graph/parallel/detail/untracked_pair.hpp>
#include <numeric>
#include <iterator>
#include <functional>
#include <vector>
#include <queue>
#include <stack>
#include <list>
#include <map>
#include <boost/graph/distributed/detail/tag_allocator.hpp>
#include <stdio.h>
#include <stdlib.h>  // for abort()
#include <iostream>  // std::cerr is used unconditionally in array_receive_impl

// #define PBGL_PROCESS_GROUP_DEBUG

#ifdef PBGL_PROCESS_GROUP_DEBUG
#  include <iostream>
#endif

namespace boost { namespace graph { namespace distributed {

struct mpi_process_group::impl
{
  typedef mpi_process_group::message_header message_header;
  typedef mpi_process_group::outgoing_messages outgoing_messages;

  /**
   * Stores the incoming messages from a particular processor.
   *
   * @todo Evaluate whether we should use a deque instead, which
   * could reduce the cost of "receiving" messages and allow us to
   * deallocate memory earlier, but increases the time spent in the
   * synchronization step.
   */
  struct incoming_messages {
    incoming_messages();
    ~incoming_messages() {}

    std::vector<message_header> headers;
    buffer_type buffer;
    std::vector<std::vector<message_header>::iterator> next_header;
  };

  struct batch_request {
    MPI_Request request;
    buffer_type buffer;
  };

  // Send once we have a certain number of messages or bytes in the buffer.
  // These numbers need to be tuned; we keep them small at first for testing.
  std::size_t batch_header_number;
  std::size_t batch_buffer_size;
  std::size_t batch_message_size;

  /**
   * The actual MPI communicator used to transmit data.
   */
  boost::mpi::communicator comm;

  /**
   * The MPI communicator used to transmit out-of-band replies.
   */
  boost::mpi::communicator oob_reply_comm;

  /// Outgoing message information, indexed by destination processor.
  std::vector<outgoing_messages> outgoing;

  /// Incoming message information, indexed by source processor.
  std::vector<incoming_messages> incoming;

  /// The numbers of processors that have entered each synchronization stage
  std::vector<int> processors_synchronizing_stage;

  /// The synchronization stage of a processor
  std::vector<int> synchronizing_stage;

  /// Number of processors still sending messages
  std::vector<int> synchronizing_unfinished;

  /// Number of batches sent since the last synchronization stage
  std::vector<int> number_sent_batches;

  /// Number of batches received minus the number of expected batches
  std::vector<int> number_received_batches;
  /// The context of the currently-executing trigger, or @c trc_none
  /// if no trigger is executing.
  trigger_receive_context trigger_context;

  /// Non-zero indicates that we're processing batches.
  /// Increment this when processing batches,
  /// decrement it when you're done.
  int processing_batches;
  /**
   * Contains all of the active blocks corresponding to attached
   * distributed data structures.
   */
  blocks_type blocks;

  /// Whether we are currently synchronizing
  bool synchronizing;

  /// The MPI requests for posted sends of oob messages
  std::vector<MPI_Request> requests;

  /// The MPI buffers for posted irecvs of oob messages
  std::map<int, buffer_type> buffers;

  /// Queue for message batches received while already processing messages
  std::queue<std::pair<int, outgoing_messages> > new_batches;

  /// Maximum encountered size of the new_batches queue
  std::size_t max_received;
  /// The MPI requests and buffers for batches being sent
  std::list<batch_request> sent_batches;

  /// Maximum encountered size of the sent_batches list
  std::size_t max_sent;
  /// Pre-allocated requests in a pool
  std::vector<batch_request> batch_pool;

  /// A stack controlling which batches are available
  std::stack<std::size_t> free_batches;

  void free_sent_batches();

  // Tag allocator
  detail::tag_allocator allocated_tags;

  impl(std::size_t num_headers, std::size_t buffers_size,
       communicator_type parent_comm);
  ~impl();

private:
  void set_batch_size(std::size_t header_num, std::size_t buffer_sz);
};

inline trigger_receive_context mpi_process_group::trigger_context() const
{
  return impl_->trigger_context;
}

template<typename T>
void
mpi_process_group::send_impl(int dest, int tag, const T& value,
                             mpl::true_ /*is_mpi_datatype*/) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer);
  oa << value;

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif

  maybe_send_batch(dest);
}

template<typename T>
void
mpi_process_group::send_impl(int dest, int tag, const T& value,
                             mpl::false_ /*is_mpi_datatype*/) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  // Serialize into the buffer
  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  out << value;

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);

  maybe_send_batch(dest);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
}

template<typename T>
inline void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T& value)
{
  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value,
               boost::mpi::is_mpi_datatype<T>());
}

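// Usage sketch (illustrative only; `pg`, the ranks, and `my_tag` are
// assumptions, not part of this file). Sends are buffered and delivered
// at the next synchronize(), so a typical point-to-point exchange is:
//
//   mpi_process_group pg;
//   const int my_tag = 0;
//   if (process_id(pg) == 0) send(pg, /*dest=*/1, my_tag, 17);
//   synchronize(pg);
//   int x;
//   if (process_id(pg) == 1) receive(pg, /*source=*/0, my_tag, x);
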
template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T values[], std::size_t n)
{
  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
               boost::serialization::make_array(values, n),
               boost::mpl::true_());
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
mpi_process_group::
array_send_impl(int dest, int tag, const T values[], std::size_t n) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  // Serialize into the buffer
  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  out << n;
  for (std::size_t i = 0; i < n; ++i)
    out << values[i];

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);

  maybe_send_batch(dest);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T values[], std::size_t n)
{
  pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
                     values, n);
}

template<typename InputIterator>
void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, InputIterator first, InputIterator last)
{
  typedef typename std::iterator_traits<InputIterator>::value_type value_type;
  std::vector<value_type> values(first, last);
  if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0);
  else send(pg, dest, tag, &values[0], values.size());
}

template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::true_ /*is_mpi_datatype*/) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "
            << tag << std::endl;
#endif
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Unpack the data
  if (header->bytes > 0) {
    boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer,
                                   archive::no_header, header->offset);
    ia >> value;
  }

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}

template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::false_ /*is_mpi_datatype*/) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
                                 archive::no_header, header->offset);
  in >> value;

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
mpi_process_group::
array_receive_impl(int source, int tag, T* values, std::size_t& n) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
                                 archive::no_header, header->offset);
  std::size_t num_sent;
  in >> num_sent;
  if (num_sent > n)
    std::cerr << "ERROR: Have " << num_sent << " items but only space for "
              << n << " items\n";

  // Deserialize only as many items as the caller has room for; discard
  // any excess instead of writing past the end of the array.
  for (std::size_t i = 0; i < num_sent; ++i) {
    if (i < n)
      in >> values[i];
    else {
      T discarded;
      in >> discarded;
    }
  }
  n = num_sent;

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}

// Construct triggers
template<typename Type, typename Handler>
void mpi_process_group::trigger(int tag, const Handler& handler)
{
  BOOST_ASSERT(block_num);
  install_trigger(tag, my_block_number(), shared_ptr<trigger_base>(
    new trigger_launcher<Type, Handler>(*this, tag, handler)));
}

template<typename Type, typename Handler>
void mpi_process_group::trigger_with_reply(int tag, const Handler& handler)
{
  BOOST_ASSERT(block_num);
  install_trigger(tag, my_block_number(), shared_ptr<trigger_base>(
    new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));
}

template<typename Type, typename Handler>
void mpi_process_group::global_trigger(int tag, const Handler& handler,
                                       std::size_t sz)
{
  if (sz == 0) // normal trigger
    install_trigger(tag, 0, shared_ptr<trigger_base>(
      new global_trigger_launcher<Type, Handler>(*this, tag, handler)));
  else // trigger with irecv
    install_trigger(tag, 0, shared_ptr<trigger_base>(
      new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,
                                                       sz)));
}

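// Usage sketch (illustrative only; the handler type and `my_tag` are
// assumptions). A trigger installs a callback that fires when a message
// with the given tag arrives; the handler for trigger() is invoked as
// handler(source, tag, data, context), matching trigger_launcher below:
//
//   struct handle_int {
//     void operator()(int source, int tag, const int& data,
//                     trigger_receive_context context) const
//     { /* react to `data` sent by `source` */ }
//   };
//   ...
//   pg.trigger<int>(my_tag, handle_int());
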
namespace detail {

template<typename Type>
void do_oob_receive(mpi_process_group const& self,
                    int source, int tag, Type& data,
                    mpl::true_ /*is_mpi_datatype*/)
{
  using boost::mpi::get_mpi_datatype;

  //self.impl_->comm.recv(source,tag,data);
  MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag,
           self.impl_->comm, MPI_STATUS_IGNORE);
}

template<typename Type>
void do_oob_receive(mpi_process_group const& self,
                    int source, int tag, Type& data,
                    mpl::false_ /*is_mpi_datatype*/)
{
  //  self.impl_->comm.recv(source,tag,data);

  // Receive the size of the data packet
  boost::mpi::status status;
  status = self.impl_->comm.probe(source, tag);

#if BOOST_VERSION >= 103600
  int size = status.count<boost::mpi::packed>().get();
#else
  int size;
  MPI_Status& mpi_status = status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
#endif

  // Receive the packed data itself
  boost::mpi::packed_iarchive in(self.impl_->comm);
  in.resize(size);
  MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);

  // Deserialize the data
  in >> data;
}

template<typename Type>
void do_oob_receive(mpi_process_group const& self,
                    int source, int tag, Type& data)
{
  do_oob_receive(self, source, tag, data,
                 boost::mpi::is_mpi_datatype<Type>());
}

} // namespace detail

template<typename Type, typename Handler>
void
mpi_process_group::trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band ? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block)
            << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self, source, realtag, data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(source, tag, data, context);
}

template<typename Type, typename Handler>
void
mpi_process_group::reply_trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band ? "OOB reply trigger" : "Reply trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block)
            << std::endl;
#endif
  BOOST_ASSERT(context == trc_out_of_band);

  boost::parallel::detail::untracked_pair<int, Type> data;

  // Receive the message directly off the wire
  int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block,
                                tag);
  detail::do_oob_receive(self, source, realtag, data);

  // Pass the message off to the handler and send the result back to
  // the source.
  send_oob(self, source, data.first,
           handler(source, tag, data.second, context), -2);
}

template<typename Type, typename Handler>
void
mpi_process_group::global_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band ? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block)
            << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self, source, realtag, data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}

template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band ? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block)
            << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    return;
  }
  BOOST_ASSERT(context == trc_irecv_out_of_band);

  // force posting of new MPI_Irecv, even though buffer is already allocated
  boost::mpi::packed_iarchive ia(self.impl_->comm, self.impl_->buffers[tag]);
  ia >> data;

  // Start a new receive
  prepare_receive(self, tag, true);

  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}

template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
prepare_receive(mpi_process_group const& self, int tag, bool force) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "Posting Irecv for trigger receive with tag " << tag
            << std::endl;
#endif
  if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) {
    self.impl_->buffers[tag].resize(buffer_size);
    force = true;
  }
  BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size())
               >= buffer_size);

  //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >);
  if (force) {
    self.impl_->requests.push_back(MPI_Request());
    MPI_Request* request = &self.impl_->requests.back();
    MPI_Irecv(&self.impl_->buffers[tag].front(), buffer_size,
              MPI_PACKED, MPI_ANY_SOURCE, tag, self.impl_->comm, request);
  }
}

template<typename T>
inline mpi_process_group::process_id_type
receive(const mpi_process_group& pg, int tag, T& value)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                        value, boost::mpi::is_mpi_datatype<T>()))
      return source;
  }
  BOOST_ASSERT(false);
  abort();
}

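// Usage sketch (illustrative only; assumes a prior synchronize() has
// delivered a message with `my_tag` from some process). This any-source
// overload scans all incoming buffers and returns the rank it received
// from, aborting if nothing matched:
//
//   std::string s;
//   mpi_process_group::process_id_type who = receive(pg, my_tag, s);
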
template<typename T>
typename
  enable_if<boost::mpi::is_mpi_datatype<T>,
            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    bool result =
      pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      boost::serialization::make_array(values, n),
                      boost::mpl::true_());
    if (result)
      return std::make_pair(source, n);
  }
  BOOST_ASSERT(false);
  abort();
}

template<typename T>
typename
  disable_if<boost::mpi::is_mpi_datatype<T>,
             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                              values, n))
      return std::make_pair(source, n);
  }
  BOOST_ASSERT(false);
  abort();
}

template<typename T>
mpi_process_group::process_id_type
receive(const mpi_process_group& pg,
        mpi_process_group::process_id_type source, int tag, T& value)
{
  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      value, boost::mpi::is_mpi_datatype<T>()))
    return source;
  else {
    fprintf(stderr,
            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
            process_id(pg), source, tag, pg.my_block_number());
    BOOST_ASSERT(false);
    abort();
  }
}

template<typename T>
typename
  enable_if<boost::mpi::is_mpi_datatype<T>,
            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[],
        std::size_t n)
{
  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      boost::serialization::make_array(values, n),
                      boost::mpl::true_()))
    return std::make_pair(source, n);
  else {
    fprintf(stderr,
            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
            process_id(pg), source, tag, pg.my_block_number());
    BOOST_ASSERT(false);
    abort();
  }
}

template<typename T>
typename
  disable_if<boost::mpi::is_mpi_datatype<T>,
             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[],
        std::size_t n)
{
  pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                        values, n);

  return std::make_pair(source, n);
}

template<typename T, typename BinaryOperation>
T*
all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
           BinaryOperation bin_op)
{
  synchronize(pg);

  bool inplace = first == out;

  if (inplace) out = new T[last - first];

  boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),
                                                  boost::mpi::comm_attach),
                         first, last - first, out, bin_op);

  if (inplace) {
    std::copy(out, out + (last - first), first);
    delete[] out;
    return last;
  }

  return out;
}

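// Usage sketch (illustrative only): summing one value per process across
// the whole group, so every process ends up with the same total.
//
//   int local = process_id(pg), sum = 0;
//   all_reduce(pg, &local, &local + 1, &sum, std::plus<int>());
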
template<typename T>
void
broadcast(const mpi_process_group& pg, T& val,
          mpi_process_group::process_id_type root)
{
  // Broadcast the value from the root to all other processes
  boost::mpi::communicator comm(communicator(pg), boost::mpi::comm_attach);
  boost::mpi::broadcast(comm, val, root);
}

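// Usage sketch (illustrative only): process 0 chooses a value and every
// other process receives a copy of it.
//
//   int seed = 0;
//   if (process_id(pg) == 0) seed = 42;
//   broadcast(pg, seed, /*root=*/0);
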
template<typename T, typename BinaryOperation>
T*
scan(const mpi_process_group& pg, T* first, T* last, T* out,
     BinaryOperation bin_op)
{
  synchronize(pg);

  bool inplace = first == out;

  if (inplace) out = new T[last - first];

  boost::mpi::scan(communicator(pg), first, last - first, out, bin_op);

  if (inplace) {
    std::copy(out, out + (last - first), first);
    delete[] out;
    return last;
  }

  return out;
}

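// Usage sketch (illustrative only): an inclusive prefix sum over the
// ranks, so process i receives 0 + 1 + ... + i.
//
//   int local = process_id(pg), prefix = 0;
//   scan(pg, &local, &local + 1, &prefix, std::plus<int>());
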
template<typename InputIterator, typename T>
void
all_gather(const mpi_process_group& pg, InputIterator first,
           InputIterator last, std::vector<T>& out)
{
  synchronize(pg);

  // Stick a copy of the local values into a vector, so we can broadcast it
  std::vector<T> local_values(first, last);

  // Collect the number of values stored in each process
  int size = local_values.size();
  std::vector<int> sizes(num_processes(pg));
  int result = MPI_Allgather(&size, 1, MPI_INT,
                             &sizes[0], 1, MPI_INT,
                             communicator(pg));
  BOOST_ASSERT(result == MPI_SUCCESS);
  (void)result;

  // Adjust sizes based on the number of bytes
  //
  //   std::transform(sizes.begin(), sizes.end(), sizes.begin(),
  //                  std::bind2nd(std::multiplies<int>(), sizeof(T)));
  //
  // std::bind2nd has been removed in C++17, so we use a plain loop.
  for (std::size_t i = 0, n = sizes.size(); i < n; ++i)
  {
    sizes[i] *= sizeof(T);
  }

  // Compute displacements
  std::vector<int> displacements;
  displacements.reserve(sizes.size() + 1);
  displacements.push_back(0);
  std::partial_sum(sizes.begin(), sizes.end(),
                   std::back_inserter(displacements));

  // Gather all of the values
  out.resize(displacements.back() / sizeof(T));
  if (!out.empty()) {
    result = MPI_Allgatherv(local_values.empty() ? (void*)&local_values
                            /* local results */ : (void*)&local_values[0],
                            local_values.size() * sizeof(T),
                            MPI_BYTE,
                            &out[0], &sizes[0], &displacements[0], MPI_BYTE,
                            communicator(pg));
  }
  BOOST_ASSERT(result == MPI_SUCCESS);
}

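// Usage sketch (illustrative only): gathering every process's rank into
// a single vector that is identical on all processes afterwards.
//
//   std::vector<int> all_ranks;
//   int me = process_id(pg);
//   all_gather(pg, &me, &me + 1, all_ranks);
//   // all_ranks.size() == num_processes(pg)
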
template<typename InputIterator>
mpi_process_group
process_subgroup(const mpi_process_group& pg,
                 InputIterator first, InputIterator last)
{
/*
  boost::mpi::group current_group = communicator(pg).group();
  boost::mpi::group new_group = current_group.include(first,last);
  boost::mpi::communicator new_comm(communicator(pg),new_group);
  return mpi_process_group(new_comm);
*/
  std::vector<int> ranks(first, last);

  MPI_Group current_group;
  int result = MPI_Comm_group(communicator(pg), &current_group);
  BOOST_ASSERT(result == MPI_SUCCESS);
  (void)result;

  MPI_Group new_group;
  result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group);
  BOOST_ASSERT(result == MPI_SUCCESS);

  MPI_Comm new_comm;
  result = MPI_Comm_create(communicator(pg), new_group, &new_comm);
  BOOST_ASSERT(result == MPI_SUCCESS);

  result = MPI_Group_free(&new_group);
  BOOST_ASSERT(result == MPI_SUCCESS);
  result = MPI_Group_free(&current_group);
  BOOST_ASSERT(result == MPI_SUCCESS);

  if (new_comm != MPI_COMM_NULL) {
    mpi_process_group result_pg(boost::mpi::communicator(new_comm,
                                                         boost::mpi::comm_attach));
    result = MPI_Comm_free(&new_comm);
    BOOST_ASSERT(result == MPI_SUCCESS);
    return result_pg;
  } else {
    return mpi_process_group(mpi_process_group::create_empty());
  }
}

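// Usage sketch (illustrative only): building a subgroup of the
// even-ranked processes. MPI_Comm_create is collective, so every process
// in `pg` must make this call; processes outside the rank list get the
// empty group back.
//
//   std::vector<int> even;
//   for (int r = 0; r < num_processes(pg); r += 2) even.push_back(r);
//   mpi_process_group evens = process_subgroup(pg, even.begin(), even.end());
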
template<typename Receiver>
Receiver* mpi_process_group::get_receiver()
{
  return impl_->blocks[my_block_number()]->on_receive
           .template target<Receiver>();
}

template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value,
            int block)
{
  using boost::mpi::get_mpi_datatype;

  // Determine the actual message we expect to receive, and which
  // communicator it will come by.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  // Post a non-blocking receive that waits until we complete this request.
  MPI_Request request;
  MPI_Irecv(&value, 1, get_mpi_datatype<T>(value),
            source, actual.second, actual.first, &request);

  int done = 0;
  do {
    MPI_Test(&request, &done, MPI_STATUS_IGNORE);
    if (!done)
      pg.poll(/*wait=*/false, block);
  } while (!done);
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value,
            int block)
{
  // Determine the actual message we expect to receive, and which
  // communicator it will come by.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  boost::optional<boost::mpi::status> status;
  do {
    status = actual.first.iprobe(source, actual.second);
    if (!status)
      pg.poll();
  } while (!status);

  //actual.first.recv(status->source(), status->tag(),value);

  // Allocate the receive buffer
  boost::mpi::packed_iarchive in(actual.first);

#if BOOST_VERSION >= 103600
  in.resize(status->count<boost::mpi::packed>().get());
#else
  int size;
  MPI_Status mpi_status = *status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
  in.resize(size);
#endif

  // Receive the message data
  MPI_Recv(in.address(), in.size(), MPI_PACKED,
           status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);

  // Unpack the message data
  in >> value;
}

template<typename SendT, typename ReplyT>
typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block)
{
  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
  send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair(
             (int)reply_tag, send_value), block);
  receive_oob(pg, dest, reply_tag, reply_value);
}

template<typename SendT, typename ReplyT>
typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block)
{
  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
  send_oob(pg, dest, tag,
           boost::parallel::detail::make_untracked_pair((int)reply_tag,
                                                        send_value), block);
  receive_oob(pg, dest, reply_tag, reply_value);
}

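// Usage sketch (illustrative only; assumes the destination has installed
// a trigger_with_reply() handler for `query_tag`): send a request
// out-of-band and block until the handler's return value comes back.
//
//   int request = 7, answer = 0;
//   send_oob_with_reply(pg, /*dest=*/1, query_tag, request, answer);
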
} } } // end namespace boost::graph::distributed