12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805 |
- #ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
- #define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
- #ifndef BOOST_GRAPH_USE_MPI
- #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
- #endif
- #define SEND_OOB_BSEND // When defined, send_oob() uses MPI_Bsend whenever mpi_process_group::message_buffer_size() is non-zero.
- #include <boost/optional.hpp>
- #include <boost/shared_ptr.hpp>
- #include <boost/weak_ptr.hpp>
- #include <utility>
- #include <memory>
- #include <boost/function/function1.hpp>
- #include <boost/function/function2.hpp>
- #include <boost/function/function0.hpp>
- #include <boost/mpi.hpp>
- #include <boost/property_map/parallel/process_group.hpp>
- #include <boost/serialization/vector.hpp>
- #include <boost/utility/enable_if.hpp>
- namespace boost { namespace graph { namespace distributed {
- // Category tag for the MPI process group; derives virtually from
- // boost::parallel::linear_process_group_tag.
- struct mpi_process_group_tag
-   : virtual boost::parallel::linear_process_group_tag
- {
- };
- class mpi_process_group // Process group built on Boost.MPI; implementation state is held via the shared_ptr member impl_.
- {
- struct impl; // Implementation type; only forward-declared here.
- public:
- // Number of distinct tag values per block; the radix used by encode_tag()/decode_tag().
- static const int max_tags = 256;
- // Callback invoked with the (source, tag) of a message.
- typedef function<void(int source, int tag)> receiver_type;
- // Nullary callback type used for on-synchronize events.
- typedef function0<void> on_synchronize_event_type;
- // Tag type selecting the constructor that builds an empty process group.
- struct create_empty {};
- // Buffer type taken from Boost.MPI's packed output primitive.
- typedef boost::mpi::packed_oprimitive::buffer_type buffer_type;
- // Integer type used to identify a process.
- typedef int process_id_type;
- // Integer type used for process counts.
- typedef int process_size_type;
- // Communicator type this process group is built on.
- typedef boost::mpi::communicator communicator_type;
- // Category tag: derives virtually from both the BSP tag and mpi_process_group_tag.
- struct communication_category
- : virtual boost::parallel::bsp_process_group_tag,
- virtual mpi_process_group_tag { };
- 
- // Describes one message: its source, tag, and an offset/byte-count pair.
- struct message_header {
- // Process the message came from.
- process_id_type source;
- // Message tag.
- int tag;
- // NOTE(review): presumably the payload's starting offset inside outgoing_messages::buffer -- confirm in the .ipp.
- std::size_t offset;
- // Payload size in bytes (per the field name).
- std::size_t bytes;
- // Serializes all four fields in declaration order; the version argument is unused.
- template <class Archive>
- void serialize(Archive& ar, int)
- {
- ar & source & tag & offset & bytes;
- }
- };
- // A batch of messages: the per-message headers plus one shared byte buffer.
- struct outgoing_messages {
- outgoing_messages() {}
- ~outgoing_messages() {}
- std::vector<message_header> headers;
- buffer_type buffer;
- // Serializes the header list followed by the buffer; the version argument is unused.
- template <class Archive>
- void serialize(Archive& ar, int)
- {
- ar & headers & buffer;
- }
- // Exchanges headers and buffer with x, member by member.
- void swap(outgoing_messages& x)
- {
- headers.swap(x.headers);
- buffer.swap(x.buffer);
- }
- };
- private:
- // Abstract, non-copyable base for trigger handlers; stores the tag it was created with.
- class trigger_base : boost::noncopyable
- {
- public:
- explicit trigger_base(int tag) : tag_(tag) { }
- // Returns the tag passed at construction.
- int tag() const { return tag_; }
- virtual ~trigger_base() { }
- // Handles a message for this trigger; implemented by the launcher subclasses below.
- virtual void
- receive(mpi_process_group const& pg, int source, int tag,
- trigger_receive_context context, int block=-1) const = 0;
- protected:
- // Tag this trigger was registered with.
- int tag_;
- };
- // Launcher keeping a reference to the owning process group and a mutable copy of the handler.
- template<typename Type, typename Handler>
- class trigger_launcher : public trigger_base
- {
- public:
- explicit trigger_launcher(mpi_process_group& self, int tag,
- const Handler& handler)
- : trigger_base(tag), self(self), handler(handler)
- {}
- void
- receive(mpi_process_group const& pg, int source, int tag,
- trigger_receive_context context, int block=-1) const;
- private:
- mpi_process_group& self;
- mutable Handler handler;
- };
- // Same stored state as trigger_launcher; NOTE(review): presumably also sends a reply (used by trigger_with_reply) -- confirm in the .ipp.
- template<typename Type, typename Handler>
- class reply_trigger_launcher : public trigger_base
- {
- public:
- explicit reply_trigger_launcher(mpi_process_group& self, int tag,
- const Handler& handler)
- : trigger_base(tag), self(self), handler(handler)
- {}
- void
- receive(mpi_process_group const& pg, int source, int tag,
- trigger_receive_context context, int block=-1) const;
- private:
- mpi_process_group& self;
- mutable Handler handler;
- };
- template<typename Type, typename Handler> // Launcher that stores only the handler; the `self` constructor argument is accepted but unused.
- class global_trigger_launcher : public trigger_base
- {
- public:
- explicit global_trigger_launcher(mpi_process_group& self, int tag,
- const Handler& handler)
- : trigger_base(tag), handler(handler)
- {
- }
- void
- receive(mpi_process_group const& pg, int source, int tag,
- trigger_receive_context context, int block=-1) const;
- private:
- mutable Handler handler;
- 
- 
- };
- template<typename Type, typename Handler> // Launcher that also records a buffer size and calls prepare_receive() from its constructor.
- class global_irecv_trigger_launcher : public trigger_base
- {
- public:
- explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag,
- const Handler& handler, int sz)
- : trigger_base(tag), handler(handler), buffer_size(sz)
- {
- prepare_receive(self,tag);
- }
- void
- receive(mpi_process_group const& pg, int source, int tag,
- trigger_receive_context context, int block=-1) const;
- private:
- void prepare_receive(mpi_process_group const& pg, int tag, bool force=false) const;
- Handler handler; // NOTE(review): unlike the other launchers, handler is not declared mutable here.
- int buffer_size;
- 
- 
- };
- public:
- // Constructs a process group from parent_comm (a default-constructed communicator if omitted); defined out of line.
- mpi_process_group(communicator_type parent_comm = communicator_type());
- // As above, with explicit num_headers and buffer_size arguments; defined out of line.
- mpi_process_group( std::size_t num_headers, std::size_t buffer_size,
- communicator_type parent_comm = communicator_type());
- // Constructs from an existing group plus a receive handler; defined out of line.
- mpi_process_group(const mpi_process_group& other,
- const receiver_type& handler,
- bool out_of_band_receive = false);
- // Constructs from an existing group using the attach_distributed_object tag; defined out of line.
- mpi_process_group(const mpi_process_group& other,
- attach_distributed_object,
- bool out_of_band_receive = false);
- // Empty group: impl_ and block_num stay null. NOTE(review): rank and size are left uninitialized by this constructor.
- explicit mpi_process_group(create_empty) {}
- // Destructor; defined out of line.
- ~mpi_process_group();
- // Replaces the receive handler; defined out of line.
- void replace_handler(const receiver_type& handler,
- bool out_of_band_receive = false);
- // Declaration only; defined outside this section.
- void make_distributed_object();
- // Replaces the on-synchronize handler; the default argument 0 yields an empty boost::function.
- void
- replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0);
- // Returns *block_num, or 0 when block_num is null.
- int my_block_number() const { return block_num? *block_num : 0; }
- // Packs (block_num, tag) into one integer: block_num * max_tags + tag.
- int encode_tag(int block_num, int tag) const
- { return block_num * max_tags + tag; }
- // Inverse of encode_tag(): returns (encoded_tag / max_tags, encoded_tag % max_tags).
- std::pair<int, int> decode_tag(int encoded_tag) const
- { return std::make_pair(encoded_tag / max_tags, encoded_tag % max_tags); }
- 
- 
- 
- // Declaration only; NOTE(review): presumably returns the number of the newly allocated block -- confirm in the .ipp.
- int allocate_block(bool out_of_band_receive = false);
- // Declaration only; NOTE(review): presumably invokes a receive handler if one is installed and reports whether it did -- confirm.
- bool maybe_emit_receive(int process, int encoded_tag) const;
- // Declaration only; takes the same (process, encoded_tag) pair as maybe_emit_receive().
- bool emit_receive(int process, int encoded_tag) const;
- // Declaration only; NOTE(review): presumably calls the blocks' on_synchronize handlers -- confirm.
- void emit_on_synchronize() const;
- // Declaration only; returns a pointer to a Receiver.
- template<typename Receiver>
- Receiver* get_receiver();
- template<typename T> // The send_impl/receive_impl overloads below are selected by an mpl::true_/mpl::false_ tag argument.
- void
- send_impl(int dest, int tag, const T& value,
- mpl::true_ ) const;
- template<typename T>
- void
- send_impl(int dest, int tag, const T& value,
- mpl::false_ ) const;
- template<typename T> // Participates in overload resolution only when T is not an MPI datatype (disable_if).
- typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
- array_send_impl(int dest, int tag, const T values[], std::size_t n) const;
- template<typename T>
- bool
- receive_impl(int source, int tag, T& value,
- mpl::true_ ) const;
- template<typename T>
- bool
- receive_impl(int source, int tag, T& value,
- mpl::false_ ) const;
- // Array receive, enabled only when T is not an MPI datatype; n is passed by reference.
- template<typename T>
- typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
- array_receive_impl(int source, int tag, T* values, std::size_t& n) const;
- optional<std::pair<mpi_process_group::process_id_type, int> > probe() const; // Declaration only; returns an optional (process id, int) pair.
- void synchronize() const;
- operator bool() { return bool(impl_); } // True when impl_ is set. NOTE(review): non-const and non-explicit, so it cannot be used on a const group.
- mpi_process_group base() const;
- // Registers a handler for `tag`; defined out of line.
- template<typename Type, typename Handler>
- void trigger(int tag, const Handler& handler);
- // Registers a handler for `tag`; NOTE(review): presumably the handler's result is sent back to the sender -- confirm.
- template<typename Type, typename Handler>
- void trigger_with_reply(int tag, const Handler& handler);
- template<typename Type, typename Handler>
- void global_trigger(int tag, const Handler& handler, std::size_t buffer_size=0);
- // Declaration only; send_oob() calls poll() with default arguments while waiting for MPI_Isend to complete.
- optional<std::pair<int, int> >
- poll(bool wait = false, int block = -1, bool synchronizing = false) const;
- // Declaration only; returns a trigger_receive_context.
- trigger_receive_context trigger_context() const;
- // Declaration only; takes the source process and a batch by non-const reference.
- void receive_batch(process_id_type source, outgoing_messages& batch) const;
- 
- 
- 
- // Returns the communicator and tag to use for (tag, block); send_oob() passes both straight to MPI.
- std::pair<boost::mpi::communicator, int>
- actual_communicator_and_tag(int tag, int block) const;
- 
- // Static setter paired with message_buffer_size(); defined out of line.
- static void set_message_buffer_size(std::size_t s);
- // send_oob() takes the MPI_Bsend path when this returns non-zero (and SEND_OOB_BSEND is defined).
- static std::size_t message_buffer_size();
- static int old_buffer_size;
- static void* old_buffer;
- private:
- void install_trigger(int tag, int block,
- shared_ptr<trigger_base> const& launcher);
- void poll_requests(int block=-1) const;
- 
- // Batch sending helpers; all declared here and defined outside this section.
- void maybe_send_batch(process_id_type dest) const;
- 
- void send_batch(process_id_type dest, outgoing_messages& batch) const;
- void send_batch(process_id_type dest) const;
- void pack_headers() const;
- // Batch receiving helpers; all declared here and defined outside this section.
- void process_batch(process_id_type source) const;
- void receive_batch(boost::mpi::status& status) const;
- 
- 
- // Reserved status-message values, 126 through 128.
- enum status_messages {
- // First value of the reserved range.
- msg_reserved_first = 126,
- // Same value as msg_reserved_first.
- msg_batch = 126,
- 
- // NOTE(review): presumably marks a batch sent separately because of its size -- confirm.
- msg_large_batch = 127,
- 
- // NOTE(review): presumably exchanged during synchronize() -- confirm.
- msg_synchronizing = 128,
- // Last value of the reserved range; equals msg_synchronizing.
- msg_reserved_last = 128
- };
- // Per-block state: a receive handler, an on-synchronize handler, and the trigger list.
- struct block_type
- {
- block_type() { }
- // Receive handler for this block.
- receiver_type on_receive;
- // On-synchronize handler for this block.
- on_synchronize_event_type on_synchronize;
- 
- 
- // Triggers held by shared_ptr; NOTE(review): presumably indexed by tag -- confirm.
- std::vector<shared_ptr<trigger_base> > triggers;
- };
- // Blocks are referenced through raw pointers; their ownership is not visible in this section.
- typedef std::vector<block_type*> blocks_type;
- // Iterator over blocks_type.
- typedef blocks_type::iterator block_iterator;
- // Forward declaration; defined elsewhere.
- struct deallocate_block;
- // Static byte buffer; NOTE(review): presumably the buffer attached for MPI_Bsend -- confirm.
- static std::vector<char> message_buffer;
- public:
- // Shared implementation object; operator bool() tests whether it is set.
- shared_ptr<impl> impl_;
- // Block number of this group, if any (see my_block_number()).
- shared_ptr<int> block_num;
- // Value returned by process_id().
- int rank;
- // Value returned by num_processes().
- int size;
- };
- // Returns the rank stored in the given process group.
- inline mpi_process_group::process_id_type
- process_id(const mpi_process_group& pg)
- {
-   const mpi_process_group::process_id_type own_rank = pg.rank;
-   return own_rank;
- }
- // Returns the process count stored in the given process group.
- inline mpi_process_group::process_size_type
- num_processes(const mpi_process_group& pg)
- {
-   const mpi_process_group::process_size_type group_size = pg.size;
-   return group_size;
- }
- mpi_process_group::communicator_type communicator(const mpi_process_group& pg); // Declaration only: returns a communicator_type for pg; defined outside this section.
- template<typename T> // Declaration only: sends a single value to process `dest` under `tag` through `pg`.
- void
- send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
- int tag, const T& value);
- template<typename InputIterator> // Declaration only: overload of send() taking the range given by `first` and `last`.
- void
- send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
- int tag, InputIterator first, InputIterator last);
- // Pointer-range overload: forwards to the send() overload that takes a
- // start pointer and an element count (last - first).
- template<typename T>
- inline void
- send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
-      int tag, T* first, T* last)
- {
-   send(pg, dest, tag, first, last - first);
- }
- // Const-pointer-range overload: forwards to the send() overload that takes
- // a start pointer and an element count (last - first).
- template<typename T>
- inline void
- send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
-      int tag, const T* first, const T* last)
- {
-   send(pg, dest, tag, first, last - first);
- }
- template<typename T> // Declaration only: receive into `value` by tag; returns a process id (presumably the sender's -- confirm).
- mpi_process_group::process_id_type
- receive(const mpi_process_group& pg, int tag, T& value);
- template<typename T> // Declaration only: as above, but the caller also names the `source` process.
- mpi_process_group::process_id_type
- receive(const mpi_process_group& pg,
- mpi_process_group::process_id_type source, int tag, T& value);
- optional<std::pair<mpi_process_group::process_id_type, int> > // Declaration only: free-function counterpart of mpi_process_group::probe().
- probe(const mpi_process_group& pg);
- void synchronize(const mpi_process_group& pg); // Declaration only: free-function counterpart of mpi_process_group::synchronize().
- template<typename T, typename BinaryOperation> // Declaration only: takes input pointers first/last, output pointer out, and bin_op; returns T*.
- T*
- all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
- BinaryOperation bin_op);
- template<typename T, typename BinaryOperation> // Declaration only: same parameter list and return type as all_reduce().
- T*
- scan(const mpi_process_group& pg, T* first, T* last, T* out,
- BinaryOperation bin_op);
- template<typename InputIterator, typename T> // Declaration only: takes an input range and writes into the vector `out`.
- void
- all_gather(const mpi_process_group& pg,
- InputIterator first, InputIterator last, std::vector<T>& out);
- template<typename InputIterator> // Declaration only: returns a new mpi_process_group built from pg and the given range.
- mpi_process_group
- process_subgroup(const mpi_process_group& pg,
- InputIterator first, InputIterator last);
- template<typename T> // Declaration only: `val` is passed by non-const reference; `root` names a process id.
- void
- broadcast(const mpi_process_group& pg, T& val,
- mpi_process_group::process_id_type root);
- // Free-function swap for outgoing_messages: exchanges the header lists
- // and then the buffers of x and y.
- inline void
- swap(mpi_process_group::outgoing_messages& x,
-      mpi_process_group::outgoing_messages& y)
- {
-   x.headers.swap(y.headers);
-   x.buffer.swap(y.buffer);
- }
- // Out-of-band send of one value whose type is an MPI datatype.
- //
- // The communicator and tag come from pg.actual_communicator_and_tag(tag, block).
- // With SEND_OOB_BSEND defined and a non-zero message buffer size the value
- // goes out through MPI_Bsend and the function returns at once. Otherwise it
- // is sent with MPI_Isend, and pg.poll() is called between MPI_Test checks
- // until the request completes.
- template<typename T>
- typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
- send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
-          int tag, const T& value, int block=-1)
- {
-   using boost::mpi::get_mpi_datatype;
-
-   std::pair<boost::mpi::communicator, int> route
-     = pg.actual_communicator_and_tag(tag, block);
-
-   // The MPI C API takes a non-const buffer pointer even for sends.
-   T* const payload = const_cast<T*>(&value);
-
- #ifdef SEND_OOB_BSEND
-   if (mpi_process_group::message_buffer_size() != 0) {
-     MPI_Bsend(payload, 1, get_mpi_datatype<T>(value), dest,
-               route.second, route.first);
-     return;
-   }
- #endif
-
-   MPI_Request pending;
-   MPI_Isend(payload, 1, get_mpi_datatype<T>(value), dest,
-             route.second, route.first, &pending);
-
-   // Poll the process group at least once, then again after every
-   // unsuccessful completion test.
-   for (;;) {
-     pg.poll();
-     int finished = 0;
-     MPI_Test(&pending, &finished, MPI_STATUS_IGNORE);
-     if (finished)
-       break;
-   }
- }
- // Out-of-band send of one value whose type is NOT an MPI datatype.
- //
- // The value is first serialized into a packed_oarchive built on the
- // communicator returned by pg.actual_communicator_and_tag(tag, block); the
- // packed bytes are then sent as MPI_PACKED. With SEND_OOB_BSEND defined and
- // a non-zero message buffer size the bytes go out through MPI_Bsend and the
- // function returns at once. Otherwise they are sent with MPI_Isend, and
- // pg.poll() is called between MPI_Test checks until the request completes.
- template<typename T>
- typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
- send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
-          int tag, const T& value, int block=-1)
- {
-   using boost::mpi::packed_oarchive;
-
-   std::pair<boost::mpi::communicator, int> route
-     = pg.actual_communicator_and_tag(tag, block);
-
-   packed_oarchive archive(route.first);
-   archive << value;
-   std::size_t packed_bytes = archive.size();
-
- #ifdef SEND_OOB_BSEND
-   if (mpi_process_group::message_buffer_size() != 0) {
-     MPI_Bsend(const_cast<void*>(archive.address()), packed_bytes, MPI_PACKED,
-               dest, route.second, route.first);
-     return;
-   }
- #endif
-
-   MPI_Request pending;
-   MPI_Isend(const_cast<void*>(archive.address()), packed_bytes, MPI_PACKED,
-             dest, route.second, route.first, &pending);
-
-   // Poll the process group at least once, then again after every
-   // unsuccessful completion test.
-   for (;;) {
-     pg.poll();
-     int finished = 0;
-     MPI_Test(&pending, &finished, MPI_STATUS_IGNORE);
-     if (finished)
-       break;
-   }
- }
- template<typename T> // Declaration only: out-of-band receive, enabled when T is an MPI datatype.
- typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
- receive_oob(const mpi_process_group& pg,
- mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
- template<typename T> // Declaration only: out-of-band receive, enabled when T is not an MPI datatype.
- typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
- receive_oob(const mpi_process_group& pg,
- mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
- template<typename SendT, typename ReplyT> // Declaration only: enabled when ReplyT is an MPI datatype; reply_value is passed by non-const reference.
- typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
- send_oob_with_reply(const mpi_process_group& pg,
- mpi_process_group::process_id_type dest,
- int tag, const SendT& send_value, ReplyT& reply_value,
- int block = -1);
- template<typename SendT, typename ReplyT> // Declaration only: enabled when ReplyT is not an MPI datatype.
- typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
- send_oob_with_reply(const mpi_process_group& pg,
- mpi_process_group::process_id_type dest,
- int tag, const SendT& send_value, ReplyT& reply_value,
- int block = -1);
- } } }
- BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header) // Declares message_header as bitwise serializable to Boost.Serialization.
- namespace boost { namespace mpi {
- // Tells Boost.MPI to treat mpi_process_group::message_header as an MPI
- // datatype: the trait specialization derives from mpl::true_.
- template<>
- struct is_mpi_datatype<boost::graph::distributed::mpi_process_group::message_header>
-   : mpl::true_
- {
- };
- } }
- BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable) // Sets the Boost.Serialization implementation level of outgoing_messages to object_serializable.
- BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never) // Sets the Boost.Serialization tracking level of outgoing_messages to track_never.
- #include <boost/graph/distributed/detail/mpi_process_group.ipp>
- #endif
|