distributed_property_map.ipp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. // Copyright (C) 2004-2006 The Trustees of Indiana University.
  2. // Use, modification and distribution is subject to the Boost Software
  3. // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. // Authors: Douglas Gregor
  6. // Nick Edmonds
  7. // Andrew Lumsdaine
  8. #include <boost/assert.hpp>
  9. #include <boost/property_map/parallel/distributed_property_map.hpp>
  10. #include <boost/property_map/parallel/detail/untracked_pair.hpp>
  11. #include <boost/type_traits/is_base_and_derived.hpp>
  12. #include <boost/property_map/parallel/simple_trigger.hpp>
  13. namespace boost { namespace parallel {
  14. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  15. template<typename Reduce>
  16. PBGL_DISTRIB_PMAP
  17. ::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global,
  18. const StorageMap& pm, const Reduce& reduce)
  19. : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver))
  20. {
  21. typedef handle_message<Reduce> Handler;
  22. data->ghost_cells.reset(new ghost_cells_type());
  23. data->reset = &data_t::template do_reset<Reduce>;
  24. data->process_group.replace_handler(Handler(data, reduce));
  25. data->process_group.template get_receiver<Handler>()
  26. ->setup_triggers(data->process_group);
  27. }
  28. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  29. PBGL_DISTRIB_PMAP::~distributed_property_map() { }
  30. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  31. template<typename Reduce>
  32. void
  33. PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce)
  34. {
  35. typedef handle_message<Reduce> Handler;
  36. data->process_group.replace_handler(Handler(data, reduce));
  37. Handler* handler = data->process_group.template get_receiver<Handler>();
  38. BOOST_ASSERT(handler);
  39. handler->setup_triggers(data->process_group);
  40. data->get_default_value = reduce;
  41. data->has_default_resolver = Reduce::non_default_resolver;
  42. int model = data->model;
  43. data->reset = &data_t::template do_reset<Reduce>;
  44. set_consistency_model(model);
  45. }
  46. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  47. void PBGL_DISTRIB_PMAP::prune_ghost_cells() const
  48. {
  49. if (data->max_ghost_cells == 0)
  50. return;
  51. while (data->ghost_cells->size() > data->max_ghost_cells) {
  52. // Evict the last ghost cell
  53. if (data->model & cm_flush) {
  54. // We need to flush values when we evict them.
  55. boost::parallel::detail::untracked_pair<key_type, value_type> const& victim
  56. = data->ghost_cells->back();
  57. send(data->process_group, get(data->global, victim.first).first,
  58. property_map_put, victim);
  59. }
  60. // Actually remove the ghost cell
  61. data->ghost_cells->pop_back();
  62. }
  63. }
  64. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  65. typename PBGL_DISTRIB_PMAP::value_type&
  66. PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const
  67. {
  68. // Index by key
  69. ghost_cells_key_index_type const& key_index
  70. = data->ghost_cells->template get<1>();
  71. // Search for the ghost cell by key, and project back to the sequence
  72. iterator ghost_cell
  73. = data->ghost_cells->template project<0>(key_index.find(key));
  74. if (ghost_cell == data->ghost_cells->end()) {
  75. value_type value;
  76. if (data->has_default_resolver)
  77. // Since we have a default resolver, use it to create a default
  78. // value for this ghost cell.
  79. value = data->get_default_value(key);
  80. else if (request_if_missing)
  81. // Request the actual value of this key from its owner
  82. send_oob_with_reply(data->process_group, get(data->global, key).first,
  83. property_map_get, key, value);
  84. else
  85. value = value_type();
  86. // Create a ghost cell containing the new value
  87. ghost_cell
  88. = data->ghost_cells->push_front(std::make_pair(key, value)).first;
  89. // If we need to, prune the ghost cells
  90. if (data->max_ghost_cells > 0)
  91. prune_ghost_cells();
  92. } else if (data->max_ghost_cells > 0)
  93. // Put this cell at the beginning of the MRU list
  94. data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell);
  95. return const_cast<value_type&>(ghost_cell->second);
  96. }
  97. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  98. template<typename Reduce>
  99. void
  100. PBGL_DISTRIB_PMAP
  101. ::handle_message<Reduce>::operator()(process_id_type source, int tag)
  102. {
  103. BOOST_ASSERT(false);
  104. }
  105. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  106. template<typename Reduce>
  107. void
  108. PBGL_DISTRIB_PMAP::handle_message<Reduce>::
  109. handle_put(int /*source*/, int /*tag*/,
  110. const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context)
  111. {
  112. using boost::get;
  113. shared_ptr<data_t> data(data_ptr);
  114. owner_local_pair p = get(data->global, req.first);
  115. BOOST_ASSERT(p.first == process_id(data->process_group));
  116. detail::maybe_put(data->storage, p.second,
  117. reduce(req.first,
  118. get(data->storage, p.second),
  119. req.second));
  120. }
  121. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  122. template<typename Reduce>
  123. typename PBGL_DISTRIB_PMAP::value_type
  124. PBGL_DISTRIB_PMAP::handle_message<Reduce>::
  125. handle_get(int source, int /*tag*/, const key_type& key,
  126. trigger_receive_context)
  127. {
  128. using boost::get;
  129. shared_ptr<data_t> data(data_ptr);
  130. BOOST_ASSERT(data);
  131. owner_local_pair p = get(data->global, key);
  132. return get(data->storage, p.second);
  133. }
  134. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  135. template<typename Reduce>
  136. void
  137. PBGL_DISTRIB_PMAP::handle_message<Reduce>::
  138. handle_multiget(int source, int tag, const std::vector<key_type>& keys,
  139. trigger_receive_context)
  140. {
  141. shared_ptr<data_t> data(data_ptr);
  142. BOOST_ASSERT(data);
  143. typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value;
  144. std::vector<key_value> results;
  145. std::size_t n = keys.size();
  146. results.reserve(n);
  147. using boost::get;
  148. for (std::size_t i = 0; i < n; ++i) {
  149. local_key_type local_key = get(data->global, keys[i]).second;
  150. results.push_back(key_value(keys[i], get(data->storage, local_key)));
  151. }
  152. send(data->process_group, source, property_map_multiget_reply, results);
  153. }
  154. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  155. template<typename Reduce>
  156. void
  157. PBGL_DISTRIB_PMAP::handle_message<Reduce>::
  158. handle_multiget_reply
  159. (int source, int tag,
  160. const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg,
  161. trigger_receive_context)
  162. {
  163. shared_ptr<data_t> data(data_ptr);
  164. BOOST_ASSERT(data);
  165. // Index by key
  166. ghost_cells_key_index_type const& key_index
  167. = data->ghost_cells->template get<1>();
  168. std::size_t n = msg.size();
  169. for (std::size_t i = 0; i < n; ++i) {
  170. // Search for the ghost cell by key, and project back to the sequence
  171. iterator position
  172. = data->ghost_cells->template project<0>(key_index.find(msg[i].first));
  173. if (position != data->ghost_cells->end())
  174. const_cast<value_type&>(position->second) = msg[i].second;
  175. }
  176. }
  177. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  178. template<typename Reduce>
  179. void
  180. PBGL_DISTRIB_PMAP::handle_message<Reduce>::
  181. handle_multiput
  182. (int source, int tag,
  183. const std::vector<unsafe_pair<local_key_type, value_type> >& values,
  184. trigger_receive_context)
  185. {
  186. using boost::get;
  187. shared_ptr<data_t> data(data_ptr);
  188. BOOST_ASSERT(data);
  189. std::size_t n = values.size();
  190. for (std::size_t i = 0; i < n; ++i) {
  191. local_key_type local_key = values[i].first;
  192. value_type local_value = get(data->storage, local_key);
  193. detail::maybe_put(data->storage, values[i].first,
  194. reduce(values[i].first,
  195. local_value,
  196. values[i].second));
  197. }
  198. }
  199. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  200. template<typename Reduce>
  201. void
  202. PBGL_DISTRIB_PMAP::handle_message<Reduce>::
  203. setup_triggers(process_group_type& pg)
  204. {
  205. using boost::parallel::simple_trigger;
  206. simple_trigger(pg, property_map_put, this, &handle_message::handle_put);
  207. simple_trigger(pg, property_map_get, this, &handle_message::handle_get);
  208. simple_trigger(pg, property_map_multiget, this,
  209. &handle_message::handle_multiget);
  210. simple_trigger(pg, property_map_multiget_reply, this,
  211. &handle_message::handle_multiget_reply);
  212. simple_trigger(pg, property_map_multiput, this,
  213. &handle_message::handle_multiput);
  214. }
  215. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  216. void
  217. PBGL_DISTRIB_PMAP
  218. ::on_synchronize::operator()()
  219. {
  220. int stage=0; // we only get called at the start now
  221. shared_ptr<data_t> data(data_ptr);
  222. BOOST_ASSERT(data);
  223. // Determine in which stage backward consistency messages should be sent.
  224. int backward_stage = -1;
  225. if (data->model & cm_backward) {
  226. if (data->model & cm_flush) backward_stage = 1;
  227. else backward_stage = 0;
  228. }
  229. // Flush results in first stage
  230. if (stage == 0 && data->model & cm_flush)
  231. data->flush();
  232. // Backward consistency
  233. if (stage == backward_stage && !(data->model & (cm_clear | cm_reset)))
  234. data->refresh_ghost_cells();
  235. // Optionally clear results
  236. if (data->model & cm_clear)
  237. data->clear();
  238. // Optionally reset results
  239. if (data->model & cm_reset) {
  240. if (data->reset) ((*data).*data->reset)();
  241. }
  242. }
  243. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  244. void
  245. PBGL_DISTRIB_PMAP::set_consistency_model(int model)
  246. {
  247. data->model = model;
  248. bool need_on_synchronize = (model != cm_forward);
  249. // Backward consistency is a two-stage process.
  250. if (model & cm_backward) {
  251. // For backward consistency to work, we absolutely cannot throw
  252. // away any ghost cells.
  253. data->max_ghost_cells = 0;
  254. }
  255. // attach the on_synchronize handler.
  256. if (need_on_synchronize)
  257. data->process_group.replace_on_synchronize_handler(on_synchronize(data));
  258. }
  259. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  260. void
  261. PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells)
  262. {
  263. if ((data->model & cm_backward) && max_ghost_cells > 0)
  264. boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: "
  265. "cannot limit ghost-cell usage with a backward "
  266. "consistency model"));
  267. if (max_ghost_cells == 1)
  268. // It is not safe to have only 1 ghost cell; the cell() method
  269. // will fail.
  270. max_ghost_cells = 2;
  271. data->max_ghost_cells = max_ghost_cells;
  272. prune_ghost_cells();
  273. }
  274. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  275. void PBGL_DISTRIB_PMAP::clear()
  276. {
  277. data->clear();
  278. }
  279. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  280. void PBGL_DISTRIB_PMAP::data_t::clear()
  281. {
  282. ghost_cells->clear();
  283. }
  284. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  285. void PBGL_DISTRIB_PMAP::reset()
  286. {
  287. if (data->reset) ((*data).*data->reset)();
  288. }
  289. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  290. void PBGL_DISTRIB_PMAP::flush()
  291. {
  292. data->flush();
  293. }
  294. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  295. void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells()
  296. {
  297. using boost::get;
  298. std::vector<std::vector<key_type> > keys;
  299. keys.resize(num_processes(process_group));
  300. // Collect the set of keys for which we will request values
  301. for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
  302. keys[get(global, i->first).first].push_back(i->first);
  303. // Send multiget requests to each of the other processors
  304. typedef typename ProcessGroup::process_size_type process_size_type;
  305. process_size_type n = num_processes(process_group);
  306. process_id_type id = process_id(process_group);
  307. for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) {
  308. if (!keys[p].empty())
  309. send(process_group, p, property_map_multiget, keys[p]);
  310. }
  311. }
  312. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  313. void PBGL_DISTRIB_PMAP::data_t::flush()
  314. {
  315. using boost::get;
  316. int n = num_processes(process_group);
  317. std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values;
  318. values.resize(n);
  319. // Collect all of the flushed values
  320. for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) {
  321. std::pair<int, local_key_type> g = get(global, i->first);
  322. values[g.first].push_back(std::make_pair(g.second, i->second));
  323. }
  324. // Transmit flushed values
  325. for (int p = 0; p < n; ++p) {
  326. if (!values[p].empty())
  327. send(process_group, p, property_map_multiput, values[p]);
  328. }
  329. }
  330. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  331. void PBGL_DISTRIB_PMAP::do_synchronize()
  332. {
  333. if (data->model & cm_backward) {
  334. synchronize(data->process_group);
  335. return;
  336. }
  337. // Request refreshes of the values of our ghost cells
  338. data->refresh_ghost_cells();
  339. // Allows all of the multigets to get to their destinations
  340. synchronize(data->process_group);
  341. // Allows all of the multiget responses to get to their destinations
  342. synchronize(data->process_group);
  343. }
  344. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  345. template<typename Resolver>
  346. void PBGL_DISTRIB_PMAP::data_t::do_reset()
  347. {
  348. Resolver* resolver = get_default_value.template target<Resolver>();
  349. BOOST_ASSERT(resolver);
  350. for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
  351. const_cast<value_type&>(i->second) = (*resolver)(i->first);
  352. }
  353. } } // end namespace boost::parallel