123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- /*
- * Copyright (c) 2017-2023 zhllxt
- *
- * author : zhllxt
- * email : 37792738@qq.com
- *
- * Distributed under the Boost Software License, Version 1.0. (See accompanying
- * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
- */
- #ifndef __ASIO2_POST_COMPONENT_HPP__
- #define __ASIO2_POST_COMPONENT_HPP__
- #if defined(_MSC_VER) && (_MSC_VER >= 1200)
- #pragma once
- #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
- #include <set>
- #include <future>
- #include <functional>
- #include <asio2/base/iopool.hpp>
- #include <asio2/base/detail/allocator.hpp>
- namespace asio2::detail
- {
- template<class derived_t, class args_t = void>
- class post_cp
- {
- public:
- /**
- * @brief constructor
- */
- post_cp() {}
- /**
- * @brief destructor
- */
- ~post_cp() = default;
- public:
- /**
- * @brief Submits a completion token or function object for execution.
- * This function submits an object for execution using the object's associated
- * executor. The function object is queued for execution, and is never called
- * from the current thread prior to returning from <tt>post()</tt>.
- */
- template<typename Function>
- inline derived_t & post(Function&& fn)
- {
- derived_t& derive = static_cast<derived_t&>(*this);
- // if use call post, but the user callback "fn" has't hold the session_ptr,
- // it maybe cause crash, so we need hold the session_ptr again at here.
- // if the session_ptr is already destroyed, the selfptr() will cause crash.
- asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
- [p = derive.selfptr(), fn = std::forward<Function>(fn)]() mutable
- {
- detail::ignore_unused(p);
- fn();
- }));
- return derive;
- }
- /**
- * @brief Submits a completion token or function object for execution after specified delay time.
- * This function submits an object for execution using the object's associated
- * executor. The function object is queued for execution, and is never called
- * from the current thread prior to returning from <tt>post()</tt>.
- */
- template<typename Function, typename Rep, typename Period>
- inline derived_t & post(Function&& fn, std::chrono::duration<Rep, Period> delay)
- {
- derived_t& derive = static_cast<derived_t&>(*this);
- // note : need test.
- // has't check where the server or client is stopped, if the server is stopping, but the
- // iopool's wait_for_io_context_stopped() has't compelete and just at sleep, then user
- // call post but don't call stop_all_timed_tasks, it maybe cause the
- // wait_for_io_context_stopped() can't compelete forever,and the server.stop or client.stop
- // never compeleted.
- if (delay > std::chrono::duration_cast<
- std::chrono::duration<Rep, Period>>((asio::steady_timer::duration::max)()))
- delay = std::chrono::duration_cast<std::chrono::duration<Rep, Period>>(
- (asio::steady_timer::duration::max)());
- asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
- [this, &derive, p = derive.selfptr(), fn = std::forward<Function>(fn), delay]() mutable
- {
- std::unique_ptr<asio::steady_timer> timer = std::make_unique<
- asio::steady_timer>(derive.io_->context());
- this->timed_tasks_.emplace(timer.get());
- derive.io_->timers().emplace(timer.get());
- timer->expires_after(delay);
- timer->async_wait(
- [this, &derive, p = std::move(p), timer = std::move(timer), fn = std::move(fn)]
- (const error_code& ec) mutable
- {
- ASIO2_ASSERT((!ec) || ec == asio::error::operation_aborted);
- derive.io_->timers().erase(timer.get());
- detail::ignore_unused(p);
- set_last_error(ec);
- #if defined(ASIO2_ENABLE_TIMER_CALLBACK_WHEN_ERROR)
- fn();
- #else
- if (!ec)
- {
- fn();
- }
- #endif
- this->timed_tasks_.erase(timer.get());
- });
- }));
- return derive;
- }
- /**
- * @brief Submits a completion token or function object for execution.
- * This function submits an object for execution using the object's associated
- * executor. The function object is queued for execution, and is never called
- * from the current thread prior to returning from <tt>post()</tt>.
- * note : Never call future's waiting function(eg:wait,get) in a communication(eg:on_recv)
- * thread, it will cause dead lock;
- */
- template<typename Function, typename Allocator>
- inline auto post(Function&& fn, asio::use_future_t<Allocator>) ->
- std::future<std::invoke_result_t<Function>>
- {
- using return_type = std::invoke_result_t<Function>;
- derived_t& derive = static_cast<derived_t&>(*this);
- std::packaged_task<return_type()> task(std::forward<Function>(fn));
- std::future<return_type> future = task.get_future();
- asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
- [p = derive.selfptr(), t = std::move(task)]() mutable
- {
- detail::ignore_unused(p);
- t();
- }));
- return future;
- }
- /**
- * @brief Submits a completion token or function object for execution after specified delay time.
- * This function submits an object for execution using the object's associated
- * executor. The function object is queued for execution, and is never called
- * from the current thread prior to returning from <tt>post()</tt>.
- * note : Never call future's waiting function(eg:wait,get) in a communication(eg:on_recv)
- * thread, it will cause dead lock;
- */
- template<typename Function, typename Rep, typename Period, typename Allocator>
- inline auto post(Function&& fn, std::chrono::duration<Rep, Period> delay, asio::use_future_t<Allocator>)
- -> std::future<std::invoke_result_t<Function>>
- {
- using return_type = std::invoke_result_t<Function>;
- derived_t& derive = static_cast<derived_t&>(*this);
- if (delay > std::chrono::duration_cast<
- std::chrono::duration<Rep, Period>>((asio::steady_timer::duration::max)()))
- delay = std::chrono::duration_cast<std::chrono::duration<Rep, Period>>(
- (asio::steady_timer::duration::max)());
- std::packaged_task<return_type()> task(std::forward<Function>(fn));
- std::future<return_type> future = task.get_future();
- asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
- [this, &derive, p = derive.selfptr(), t = std::move(task), delay]() mutable
- {
- std::unique_ptr<asio::steady_timer> timer = std::make_unique<
- asio::steady_timer>(derive.io_->context());
- this->timed_tasks_.emplace(timer.get());
- derive.io_->timers().emplace(timer.get());
- timer->expires_after(delay);
- timer->async_wait(
- [this, &derive, p = std::move(p), timer = std::move(timer), t = std::move(t)]
- (const error_code& ec) mutable
- {
- ASIO2_ASSERT((!ec) || ec == asio::error::operation_aborted);
- derive.io_->timers().erase(timer.get());
- detail::ignore_unused(p);
- set_last_error(ec);
- #if defined(ASIO2_ENABLE_TIMER_CALLBACK_WHEN_ERROR)
- t();
- #else
- if (!ec)
- {
- t();
- }
- #endif
- this->timed_tasks_.erase(timer.get());
- });
- }));
- return future;
- }
- /**
- * @brief Submits a completion token or function object for execution.
- * This function submits an object for execution using the object's associated
- * executor. The function object is queued for execution. if current thread is
- * the io_context's thread, the function will be executed immediately, otherwise
- * the task will be asynchronously post to io_context to execute.
- */
- template<typename Function>
- inline derived_t & dispatch(Function&& fn)
- {
- derived_t& derive = static_cast<derived_t&>(*this);
- asio::dispatch(derive.io_->context(), make_allocator(derive.wallocator(),
- [p = derive.selfptr(), fn = std::forward<Function>(fn)]() mutable
- {
- detail::ignore_unused(p);
- fn();
- }));
- return derive;
- }
- /**
- * @brief Submits a completion token or function object for execution.
- * This function submits an object for execution using the object's associated
- * executor. The function object is queued for execution. if current thread is
- * the io_context's thread, the function will be executed immediately, otherwise
- * the task will be asynchronously post to io_context to execute.
- * note : Never call future's waiting function(eg:wait,get) in a communication(eg:on_recv)
- * thread, it will cause dead lock;
- */
- template<typename Function, typename Allocator>
- inline auto dispatch(Function&& fn, asio::use_future_t<Allocator>) ->
- std::future<std::invoke_result_t<Function>>
- {
- using return_type = std::invoke_result_t<Function>;
- derived_t& derive = static_cast<derived_t&>(*this);
- std::packaged_task<return_type()> task(std::forward<Function>(fn));
- std::future<return_type> future = task.get_future();
- // Make sure we run on the io_context thread
- asio::dispatch(derive.io_->context(), make_allocator(derive.wallocator(),
- [p = derive.selfptr(), t = std::move(task)]() mutable
- {
- detail::ignore_unused(p);
- t();
- }));
- return future;
- }
- /**
- * @brief Stop all timed events which you posted with a delay duration.
- */
- inline derived_t& stop_all_timed_events()
- {
- derived_t& derive = static_cast<derived_t&>(*this);
- asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
- [this, p = derive.selfptr()]() mutable
- {
- detail::ignore_unused(p);
- for (asio::steady_timer* timer : this->timed_tasks_)
- {
- detail::cancel_timer(*timer);
- }
- }));
- return derive;
- }
- /**
- * @brief Stop all timed tasks which you posted with a delay duration.
- * This function is the same as stop_all_timed_events.
- */
- inline derived_t& stop_all_timed_tasks()
- {
- derived_t& derive = static_cast<derived_t&>(*this);
- return derive.stop_all_timed_events();
- }
- protected:
- /**
- * @brief Stop all timed events which you posted with a delay duration.
- * Use dispatch instead of post, this function is used for inner.
- */
- inline derived_t& _dispatch_stop_all_timed_events()
- {
- derived_t& derive = static_cast<derived_t&>(*this);
- asio::dispatch(derive.io_->context(), make_allocator(derive.wallocator(),
- [this, p = derive.selfptr()]() mutable
- {
- detail::ignore_unused(p);
- for (asio::steady_timer* timer : this->timed_tasks_)
- {
- detail::cancel_timer(*timer);
- }
- }));
- return derive;
- }
- protected:
- /// Used to exit the timer tasks when component is ready to stop.
- std::set<asio::steady_timer*> timed_tasks_;
- };
- }
- #endif // !__ASIO2_POST_COMPONENT_HPP__
|