thread_pool.hpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. *
  10. * refenced from : https://github.com/progschj/ThreadPool
  11. * see c++ 17 version : https://github.com/jhasse/ThreadPool
  12. *
  13. *
  14. * note :
  15. *
  16. * 1 : when declare an global thread_pool object in dll,when enter the
  17. * constructor to create std::thread,it will blocking forever.
  18. * 2 : when declare an global thread_pool object in dll and when dll is
  19. * released,the code will run into thread_pool destructor,then call
  20. * notify_all in the destructor, but the notify_all calling will
  21. * blocking forever.
  22. *
  23. * one resolve method is add a start and stop function,and move the
  24. * notify_all into the stop inner,and tell user call the start and
  25. * stop function manual.
  26. *
  27. * but in order to keep the interface simple,we don't add stop function,
  28. * you can use "new" "delete" way to avoid above problems,you can delete
  29. * thread_pool pointer object before exit.
  30. *
  31. * std::thread cause deadlock in DLLMain :
  32. * The constructor for the std::thread cannot return until the new thread
  33. * starts executing the thread procedure. When a new thread is created,
  34. * before the thread procedure is invoked, the entry point of each loaded
  35. * DLL is invoked for DLL_THREAD_ATTACH. To do this, the new thread must
  36. * acquire the loader lock. Unfortunately, your existing thread already
  37. * holds the loader lock.
  38. *
  39. */
  40. #ifndef __ASIO2_THREAD_POOL_HPP__
  41. #define __ASIO2_THREAD_POOL_HPP__
  42. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  43. #pragma once
  44. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  45. #include <cstdlib>
  46. #include <vector>
  47. #include <queue>
  48. #include <memory>
  49. #include <thread>
  50. #include <mutex>
  51. #include <condition_variable>
  52. #include <future>
  53. #include <functional>
  54. #include <stdexcept>
  55. namespace asio2
  56. {
  57. class thread_group;
  58. /**
  59. * thread pool interface, this pool is multi thread safed.
  60. * the tasks will be running in random thread.
  61. */
  62. class thread_pool
  63. {
  64. friend class thread_group;
  65. public:
  66. /**
  67. * @brief constructor
  68. */
  69. explicit thread_pool(std::size_t thread_count = std::thread::hardware_concurrency())
  70. {
  71. if (thread_count < static_cast<std::size_t>(1))
  72. thread_count = static_cast<std::size_t>(1);
  73. this->workers_.reserve(thread_count);
  74. for (std::size_t i = 0; i < thread_count; ++i)
  75. {
  76. // emplace_back can use the parameters to construct the std::thread object automictly
  77. // use lambda function as the thread proc function,lambda can has no parameters list
  78. this->workers_.emplace_back([this]() mutable
  79. {
  80. for (;;)
  81. {
  82. std::packaged_task<void()> task;
  83. {
  84. std::unique_lock<std::mutex> lock(this->mtx_);
  85. this->cv_.wait(lock, [this] { return (this->stop_ || !this->tasks_.empty()); });
  86. if (this->stop_ && this->tasks_.empty())
  87. return;
  88. task = std::move(this->tasks_.front());
  89. this->tasks_.pop();
  90. }
  91. task();
  92. }
  93. });
  94. }
  95. }
  96. /**
  97. * @brief destructor
  98. */
  99. ~thread_pool()
  100. {
  101. this->stop();
  102. }
  103. /**
  104. * @brief post a function object into the thread pool, then return immediately,
  105. * the function object will never be executed inside this function. Instead, it will
  106. * be executed asynchronously in the thread pool.
  107. * @param fun - global function,static function,lambda,member function,std::function.
  108. * @return std::future<fun_return_type>
  109. */
  110. template<class Fun, class... Args>
  111. auto post(Fun&& fun, Args&&... args) -> std::future<std::invoke_result_t<Fun, Args...>>
  112. {
  113. using return_type = std::invoke_result_t<Fun, Args...>;
  114. std::packaged_task<return_type()> task(
  115. std::bind(std::forward<Fun>(fun), std::forward<Args>(args)...));
  116. std::future<return_type> future = task.get_future();
  117. {
  118. std::unique_lock<std::mutex> lock(this->mtx_);
  119. // don't allow post after stopping the pool
  120. if (this->stop_)
  121. throw std::runtime_error("post a task into thread pool but the pool is stopped");
  122. this->tasks_.emplace(std::move(task));
  123. }
  124. this->cv_.notify_one();
  125. return future;
  126. }
  127. /**
  128. * @brief get thread count of the thread pool with no lock
  129. */
  130. inline std::size_t thread_count() noexcept
  131. {
  132. return this->workers_.size();
  133. }
  134. /**
  135. * @brief get thread count of the thread pool with lock
  136. */
  137. inline std::size_t get_thread_count() noexcept
  138. {
  139. std::unique_lock<std::mutex> lock(this->mtx_);
  140. return this->workers_.size();
  141. }
  142. /**
  143. * @brief get thread count of the thread pool with no lock, same as thread_count()
  144. */
  145. inline std::size_t pool_size() noexcept
  146. {
  147. return this->workers_.size();
  148. }
  149. /**
  150. * @brief get thread count of the thread pool with lock, same as get_thread_count()
  151. */
  152. inline std::size_t get_pool_size() noexcept
  153. {
  154. // is std container.size() thread safety ?
  155. // @see: the size() function in file: /asio2/base/session_mgr.hpp
  156. std::unique_lock<std::mutex> lock(this->mtx_);
  157. return this->workers_.size();
  158. }
  159. /**
  160. * @brief get remain task size with no lock
  161. */
  162. inline std::size_t task_size() noexcept
  163. {
  164. return this->tasks_.size();
  165. }
  166. /**
  167. * @brief get remain task size with lock
  168. */
  169. inline std::size_t get_task_size() noexcept
  170. {
  171. std::unique_lock<std::mutex> lock(this->mtx_);
  172. return this->tasks_.size();
  173. }
  174. /**
  175. * @brief Determine whether current code is running in the pool's threads.
  176. */
  177. inline bool running_in_threads() noexcept
  178. {
  179. std::unique_lock<std::mutex> lock(this->mtx_);
  180. std::thread::id curr_tid = std::this_thread::get_id();
  181. for (std::thread& thread : this->workers_)
  182. {
  183. if (curr_tid == thread.get_id())
  184. return true;
  185. }
  186. return false;
  187. }
  188. /**
  189. * @brief Determine whether current code is running in the thread by index
  190. */
  191. inline bool running_in_thread(std::size_t index) noexcept
  192. {
  193. std::unique_lock<std::mutex> lock(this->mtx_);
  194. if (!(index < this->workers_.size()))
  195. return false;
  196. return (std::this_thread::get_id() == this->workers_[index].get_id());
  197. }
  198. /**
  199. * @brief Get the thread id of the specified thread index with no lock.
  200. */
  201. inline std::thread::id thread_id(std::size_t index) noexcept
  202. {
  203. return this->workers_[index % this->workers_.size()].get_id();
  204. }
  205. /**
  206. * @brief Get the thread id of the specified thread index with lock.
  207. */
  208. inline std::thread::id get_thread_id(std::size_t index) noexcept
  209. {
  210. std::unique_lock<std::mutex> lock(this->mtx_);
  211. return this->workers_[index % this->workers_.size()].get_id();
  212. }
  213. protected:
  214. /**
  215. * @brief Stop the thread pool and block until all tasks finish executing
  216. */
  217. void stop()
  218. {
  219. {
  220. std::unique_lock<std::mutex> lock(this->mtx_);
  221. this->stop_ = true;
  222. }
  223. this->cv_.notify_all();
  224. for (std::thread& worker : this->workers_)
  225. {
  226. if (worker.joinable())
  227. worker.join();
  228. }
  229. }
  230. private:
  231. /// no copy construct function
  232. thread_pool(const thread_pool&) = delete;
  233. /// no operator equal function
  234. thread_pool& operator=(const thread_pool&) = delete;
  235. protected:
  236. // need to keep track of threads so we can join them
  237. std::vector<std::thread> workers_;
  238. // the task queue
  239. std::queue<std::packaged_task<void()>> tasks_;
  240. // synchronization
  241. std::mutex mtx_;
  242. std::condition_variable cv_;
  243. // flag indicate the pool is stoped
  244. bool stop_ = false;
  245. };
  246. /**
  247. * thread group interface, this group is multi thread safed.
  248. * the task will be running in the specified thread.
  249. */
  250. class thread_group
  251. {
  252. public:
  253. // Avoid conflicts with thread_pool in other namespace
  254. using worker_t = asio2::thread_pool;
  255. /**
  256. * @brief constructor
  257. */
  258. explicit thread_group(std::size_t thread_count = std::thread::hardware_concurrency())
  259. {
  260. if (thread_count < static_cast<std::size_t>(1))
  261. thread_count = static_cast<std::size_t>(1);
  262. this->workers_.reserve(thread_count);
  263. for (std::size_t i = 0; i < thread_count; ++i)
  264. {
  265. this->workers_.emplace_back(new worker_t(static_cast<std::size_t>(1)));
  266. }
  267. }
  268. /**
  269. * @brief destructor
  270. */
  271. ~thread_group()
  272. {
  273. // must block until all threads exited, otherwise maybe cause crash.
  274. // eg:
  275. // asio2::thread_group thpool;
  276. // thpool.post(1, [&thpool]()
  277. // {
  278. // // here, if the thread 0 is deleted already, this function will cause crash.
  279. // thpool.running_in_threads();
  280. // });
  281. for (worker_t* p : this->workers_)
  282. {
  283. p->stop();
  284. }
  285. for (worker_t* p : this->workers_)
  286. {
  287. delete p;
  288. }
  289. this->workers_.clear();
  290. }
  291. /**
  292. * @brief post a function object into the thread group with specified thread index,
  293. * then return immediately, the function object will never be executed inside this
  294. * function. Instead, it will be executed asynchronously in the thread group.
  295. * @param thread_index - which thread to execute the function.
  296. * @param fun - global function,static function,lambda,member function,std::function.
  297. * @return std::future<fun_return_type>
  298. */
  299. template<class IntegerT, class Fun, class... Args,
  300. std::enable_if_t<std::is_integral_v<std::remove_cv_t<std::remove_reference_t<IntegerT>>>, int> = 0>
  301. auto post(IntegerT thread_index, Fun&& fun, Args&&... args) -> std::future<std::invoke_result_t<Fun, Args...>>
  302. {
  303. return this->workers_[thread_index % this->workers_.size()]->post(
  304. std::forward<Fun>(fun), std::forward<Args>(args)...);
  305. }
  306. /**
  307. * @brief get thread count of the thread group with no lock
  308. */
  309. inline std::size_t thread_count() noexcept
  310. {
  311. return this->workers_.size();
  312. }
  313. /**
  314. * @brief get thread count of the thread group with lock
  315. */
  316. inline std::size_t get_thread_count() noexcept
  317. {
  318. return this->workers_.size();
  319. }
  320. /**
  321. * @brief get thread count of the thread group with no lock, same as thread_count()
  322. */
  323. inline std::size_t pool_size() noexcept
  324. {
  325. return this->workers_.size();
  326. }
  327. /**
  328. * @brief get thread count of the thread group with lock, same as get_thread_count()
  329. */
  330. inline std::size_t get_pool_size() noexcept
  331. {
  332. return this->workers_.size();
  333. }
  334. /**
  335. * @brief get remain task size with no lock
  336. */
  337. inline std::size_t task_size() noexcept
  338. {
  339. std::size_t count = 0;
  340. for (worker_t* p : this->workers_)
  341. {
  342. count += p->task_size();
  343. }
  344. return count;
  345. }
  346. /**
  347. * @brief get remain task size with lock
  348. */
  349. inline std::size_t get_task_size() noexcept
  350. {
  351. std::size_t count = 0;
  352. for (worker_t* p : this->workers_)
  353. {
  354. count += p->get_task_size();
  355. }
  356. return count;
  357. }
  358. /**
  359. * @brief get remain task size of the specified thread with no lock
  360. */
  361. inline std::size_t task_size(std::size_t thread_index) noexcept
  362. {
  363. return this->workers_[thread_index % this->workers_.size()]->task_size();
  364. }
  365. /**
  366. * @brief get remain task size of the specified thread with lock
  367. */
  368. inline std::size_t get_task_size(std::size_t thread_index) noexcept
  369. {
  370. return this->workers_[thread_index % this->workers_.size()]->get_task_size();
  371. }
  372. /**
  373. * @brief Determine whether current code is running in the group's threads.
  374. */
  375. inline bool running_in_threads() noexcept
  376. {
  377. for (worker_t* p : this->workers_)
  378. {
  379. if (p->running_in_threads())
  380. return true;
  381. }
  382. return false;
  383. }
  384. /**
  385. * @brief Determine whether current code is running in the thread by index
  386. */
  387. inline bool running_in_thread(std::size_t index) noexcept
  388. {
  389. if (!(index < this->workers_.size()))
  390. return false;
  391. return this->workers_[index]->running_in_thread(0);
  392. }
  393. /**
  394. * @brief Get the thread id of the specified thread index with no lock.
  395. */
  396. inline std::thread::id thread_id(std::size_t index) noexcept
  397. {
  398. return this->workers_[index % this->workers_.size()]->thread_id(0);
  399. }
  400. /**
  401. * @brief Get the thread id of the specified thread index with lock.
  402. */
  403. inline std::thread::id get_thread_id(std::size_t index) noexcept
  404. {
  405. return this->workers_[index % this->workers_.size()]->get_thread_id(0);
  406. }
  407. private:
  408. /// no copy construct function
  409. thread_group(const thread_group&) = delete;
  410. /// no operator equal function
  411. thread_group& operator=(const thread_group&) = delete;
  412. protected:
  413. //
  414. std::vector<worker_t*> workers_;
  415. };
  416. }
  417. #endif // !__ASIO2_THREAD_POOL_HPP__