Browse Source

#缓存队列

zxs 4 days ago
parent
commit
3487c0bd98
2 changed files with 86 additions and 49 deletions
  1. 80 45
      robot/robotics/cache_queue.hpp
  2. 6 4
      robot/robotics/thread_pool.hpp

+ 80 - 45
robot/robotics/cache_queue.hpp

@@ -13,61 +13,83 @@
 #include <fstream>
 #include <filesystem>
 //robotics
-#include "linq.hpp"
+#include "thread_pool.hpp"
+#include "serialization.hpp"
+#include "event_wait_for.hpp"
 
 namespace robotics {
     namespace v3 {
-        template<typename _Type>
-        class cache_queue : public v3::linq<_Type> {
-        public:
+		template<typename _Type>
+		class cache_queue : public v3::thread_pool<cache_queue<_Type>> {
+		public:
 			/**
-			 * @brief 构造
-			 * @param name
-			 * @param values
+			 * @brief
 			 */
-			cache_queue(std::string const& name) :
-				v3::linq<_Type>(source_data_),
-				file_name_(name) {
+			cache_queue(std::string const& file_name) :
+				v3::thread_pool<cache_queue<_Type>>(),
+				file_name_(file_name) {
+				load();
 			}
 			/**
-			 * @brief 析构
+			 * @brief
+			 */
+			cache_queue() :
+				v3::thread_pool<cache_queue<_Type>>() {
+			}
+			/**
+			 * @brief
+			 */
+			void set_file_name(std::string const& file_name) {
+				file_name_ = file_name;
+			}
+			/**
+			 * @brief
 			 */
 			~cache_queue() {
+				quit_ = true;
+				empty_wait_.notify();
 			}
 			/**
 			 * @brief 添加
-			 * @param value 
+			 * @param value
 			 */
-			void push(_Type const&value) {
-				this->append(value);
-				save();
+			void push(_Type const& value) {
+				std::lock_guard<std::mutex> locker(mutex_);
+				work_queue_.push_back(value);
+				save(work_queue_);
+				empty_wait_.notify();
 			}
 			/**
-			 * @brief 移除
+			 * @brief
 			 * @return
 			 */
 			_Type pop() {
-				_Type value;
-				{
-					std::lock_guard<std::mutex> locker(this->mutex_);
-					value = source_data_.front();
-					source_data_.erase(source_data_.begin());
-				}
-				this->reset();
-				save();
-				return value;
+				do {
+					{
+						std::lock_guard<std::mutex> locker(mutex_);
+						if (!work_queue_.empty()) {
+							_Type value = work_queue_.front();
+							work_queue_.erase(work_queue_.begin());
+							save(work_queue_);
+							return value;
+						}
+					}
+					empty_wait_.wait(0xffffff);
+				} while (!quit_);
+				return {};
 			}
+		private:
 			/**
 			 * @brief 加载
 			 */
-			bool load() {
+			void load() {
 				std::filesystem::path directory("data");
 				if (!std::filesystem::exists(directory)) {
 					std::filesystem::create_directory(directory);
 				}
 				std::fstream file("data/" + file_name_, std::ios::binary | std::ios::in);
 				if (!file) {
-					return false;
+					return;
 				}
 
 				// 获取文件大小
@@ -84,41 +106,54 @@ namespace robotics {
 					while (!stream.empty()) {
 						_Type value;
 						stream >> value;
-						this->append(value);
+						push(value);
 					}
-					return true;
 				}
-				else {
-					return false;
+			}
+			/**
+			 * @brief 保存
+			 * @param value
+			 */
+			void save(std::vector<_Type> const& value) {
+				if (!is_save_) {
+					work_queue_tmp_ = value;
+					save();
 				}
 			}
 			/**
 			 * @brief 保存
 			 */
-			bool save() {
+			void save() {
+				REGISTER_ASYNC_FUNC(save);
+				is_save_ = true;
 				std::filesystem::path directory("data");
 				if (!std::filesystem::exists(directory)) {
 					std::filesystem::create_directory(directory);
 				}
 				v3::archive::stream stream;
-				auto datas = source_data_;
-				for (auto& it : datas) {
+				for (auto& it : work_queue_tmp_) {
 					stream << it;
 				}
 				std::fstream file("data/" + file_name_, std::ios::binary | std::ios::out);
-				if (!file) {
-					return false;
+				if (file) {
+					std::vector<std::uint8_t> data = stream.data();
+					if (!data.empty()) {
+						// 写入数据
+						file.write(reinterpret_cast<const char*>(&data[0]), data.size());
+					}
+					// 关闭文件
+					file.close();
 				}
-				std::vector<std::uint8_t> data = stream.data();
-				// 写入数据
-				file.write(reinterpret_cast<const char*>(&data[0]), data.size());
-				// 关闭文件
-				file.close();
-				return true;
+				is_save_ = false;
 			}
 		private:
-			std::vector<_Type> source_data_;
-			std::string file_name_;
-        };
+			bool			   quit_ = false;
+			std::atomic<bool>  is_save_ = false;
+			std::mutex		   mutex_;
+			std::string		   file_name_;
+			v3::event_wait_for empty_wait_;
+			std::vector<_Type> work_queue_;
+			std::vector<_Type> work_queue_tmp_;
+		};
     }
 }

+ 6 - 4
robot/robotics/thread_pool.hpp

@@ -8,8 +8,10 @@
 *
 */
 #pragma once
-//stl
-#include <asio.hpp>
+//boost
+#include <boost/bind.hpp>
+#include <boost/asio.hpp>
+#include <boost/thread.hpp>
 #ifdef REGISTER_ASYNC_FUNC
 #else
 #define PRIVATE_ARGS_GLUE(x, y) x y
@@ -93,12 +95,12 @@ namespace robotics {
 			bool submit(_Fn&& fn, _This&& ths, _Args&&... args) {
 				if (__size__ == 0 || __thread_pool__.executor().running_in_this_thread())
 					return true;
-				asio::post(__thread_pool__, std::bind(std::forward<_Fn>(fn), std::forward<_This>(ths), std::forward<_Args>(args)...));
+				boost::asio::post(__thread_pool__, std::bind(std::forward<_Fn>(fn), std::forward<_This>(ths), std::forward<_Args>(args)...));
 				return false;
 			}
 		private:
 			std::uint32_t __size__ = 1;
-			asio::thread_pool __thread_pool__;
+			boost::asio::thread_pool __thread_pool__;
 		};
 	}
 }