|
@@ -0,0 +1,907 @@
|
|
|
+#pragma once
|
|
|
+//stl
|
|
|
+#include <iostream>
|
|
|
+//boost
|
|
|
+#include <boost/property_tree/ptree.hpp>
|
|
|
+#include <boost/property_tree/xml_parser.hpp>
|
|
|
+//robotics
|
|
|
+#include <robotics/utils.hpp>
|
|
|
+#include <robotics/function_traits.hpp>
|
|
|
+#include <robotics/logger.hpp>
|
|
|
+#include <robotics/serialization.hpp>
|
|
|
+#include <robotics/delegates.hpp>
|
|
|
+#include <robotics/thread_pool.hpp>
|
|
|
+#include <robotics/linq.hpp>
|
|
|
+#include <robotics/config.hpp>
|
|
|
+#include <robotics/timer.hpp>
|
|
|
+
|
|
|
+namespace robotics::v3 {
|
|
|
+ template<typename _Type>
|
|
|
+ class match_role {
|
|
|
+ public:
|
|
|
+ /**
|
|
|
+ * @brief 默认构造
|
|
|
+ */
|
|
|
+ explicit match_role() {}
|
|
|
+ /**
|
|
|
+ * @brief 构造
|
|
|
+ * @param session
|
|
|
+ */
|
|
|
+ explicit match_role(std::shared_ptr<_Type> const& session) :
|
|
|
+ session_(session) {
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 解包
|
|
|
+ * @tparam Iterator
|
|
|
+ * @param begin
|
|
|
+ * @param end
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ template <typename Iterator>
|
|
|
+ std::pair<Iterator, bool> operator()(Iterator begin, Iterator end) const {
|
|
|
+ Iterator p = begin;
|
|
|
+ while (p != end) {
|
|
|
+ //判断协议头
|
|
|
+ if (*p != 0x02) {
|
|
|
+ asio2::error_code ec;
|
|
|
+ session_->socket().close(ec);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ //获取数据长度
|
|
|
+ size_t size = size_t(std::uint8_t(*(++p))) << 24;
|
|
|
+ size += size_t(std::uint8_t(*(++p))) << 16;
|
|
|
+ size += size_t(std::uint8_t(*(++p))) << 8;
|
|
|
+ size += size_t(std::uint8_t(*(++p))) << 0;
|
|
|
+
|
|
|
+ if (end - begin >= size) {
|
|
|
+ if (*(begin + size - 1) == 0x03) {
|
|
|
+ return std::pair(begin + size, true);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ asio2::error_code ec;
|
|
|
+ session_->socket().close(ec);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ return std::pair(begin, false);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 初始化
|
|
|
+ * @param session
|
|
|
+ */
|
|
|
+ void init(std::shared_ptr<asio2::tcp_session>& session) {
|
|
|
+ session_ = session;
|
|
|
+ }
|
|
|
+ private:
|
|
|
+ /**
|
|
|
+ * @brief 客户端
|
|
|
+ */
|
|
|
+ std::shared_ptr<_Type> session_;
|
|
|
+ };
|
|
|
+
|
|
|
+ template<> struct asio::is_match_condition<match_role<asio2::tcp_client>> : public std::true_type {};
|
|
|
+
|
|
|
+ template<> struct asio::is_match_condition<match_role<asio2::tcp_session>> : public std::true_type {};
|
|
|
+ /**
|
|
|
+ * @brief 消息类型
|
|
|
+ */
|
|
|
+ enum class nexus_net_msg_type_enum {
|
|
|
+ /**
|
|
|
+ * @brief 身份认证
|
|
|
+ */
|
|
|
+ AUTHENTICATE = 1,
|
|
|
+ /**
|
|
|
+ * @brief 身份认证响应
|
|
|
+ */
|
|
|
+ REPAUTHENTICATE,
|
|
|
+ /**
|
|
|
+ * @brief 订阅
|
|
|
+ */
|
|
|
+ SUBSCRIBE,
|
|
|
+ /**
|
|
|
+ * @brief 删除订阅
|
|
|
+ */
|
|
|
+ REMOVESUBSCRIBE,
|
|
|
+ /**
|
|
|
+ * @brief 发布
|
|
|
+ */
|
|
|
+ PUBLISHER,
|
|
|
+ /**
|
|
|
+ * @brief 发布响应
|
|
|
+ */
|
|
|
+ REPPUBLISHER,
|
|
|
+ /**
|
|
|
+ * @brief 心跳
|
|
|
+ */
|
|
|
+ HEARTBEAT,
|
|
|
+ /**
|
|
|
+ * @brief 详情
|
|
|
+ */
|
|
|
+ DETAILS,
|
|
|
+ /**
|
|
|
+ * @brief 详情响应
|
|
|
+ */
|
|
|
+ REPDETAILS
|
|
|
+ };
|
|
|
+ /**
|
|
|
+ * @brief 消息
|
|
|
+ */
|
|
|
+ class nexus_net_message {
|
|
|
+ public:
|
|
|
+ /**
|
|
|
+ * @brief 构造
|
|
|
+ */
|
|
|
+ nexus_net_message() {}
|
|
|
+ /**
|
|
|
+ * @brief 构造
|
|
|
+ * @param sv
|
|
|
+ */
|
|
|
+ nexus_net_message(std::string_view sv) {
|
|
|
+ set_data(sv);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 构造
|
|
|
+ * @param data
|
|
|
+ */
|
|
|
+ nexus_net_message(std::vector<std::uint8_t>const& data) {
|
|
|
+ set_data(data);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 设置
|
|
|
+ * @param sv
|
|
|
+ */
|
|
|
+ void set_data(std::string_view sv) {
|
|
|
+ set_data(std::vector<std::uint8_t>(sv.begin(), sv.end()));
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 设置
|
|
|
+ * @param data
|
|
|
+ */
|
|
|
+ void set_data(std::vector<std::uint8_t>const& data) {
|
|
|
+ //消息类型
|
|
|
+ msg_type = data[5];
|
|
|
+ //消息ID
|
|
|
+ size_t size = data[6];
|
|
|
+ size_t begin = 7;
|
|
|
+ size_t end = begin + size;
|
|
|
+ msg_id.assign(data.begin() + begin, data.begin() + end);
|
|
|
+ //路由
|
|
|
+ size = data[end];
|
|
|
+ begin = end + 1;
|
|
|
+ end = begin + size;
|
|
|
+ route.assign(data.begin() + begin, data.begin() + end);
|
|
|
+ //路由列表
|
|
|
+ size = data[end] << 8;
|
|
|
+ size += data[++end];
|
|
|
+ begin = end + 1;
|
|
|
+ end = begin + size;
|
|
|
+ if (size > 0) {
|
|
|
+ std::string str_routes(data.begin() + begin, data.begin() + end);
|
|
|
+ routes = v3::utils::split(str_routes, 0x0d);
|
|
|
+ }
|
|
|
+ //参数
|
|
|
+ size = (data[end] << 24);
|
|
|
+ size += (data[++end] << 16);
|
|
|
+ size += (data[++end] << 8);
|
|
|
+ size += data[++end];
|
|
|
+ begin = end + 1;
|
|
|
+ end = begin + size;
|
|
|
+ args.add(std::vector<std::uint8_t>(data.begin() + begin, data.begin() + end));
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 获取
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ std::vector<std::uint8_t> get_data() {
|
|
|
+ std::vector<std::uint8_t> result;
|
|
|
+ //协议头
|
|
|
+ result.push_back(0x02);
|
|
|
+ //字节数量
|
|
|
+ result.push_back(0x00);
|
|
|
+ result.push_back(0x00);
|
|
|
+ result.push_back(0x00);
|
|
|
+ result.push_back(0x00);
|
|
|
+ //消息类型
|
|
|
+ result.push_back(msg_type);
|
|
|
+ //消息ID
|
|
|
+ result.push_back(msg_id.size());
|
|
|
+ result.insert(result.end(), msg_id.begin(), msg_id.end());
|
|
|
+ //路由
|
|
|
+ result.push_back(route.size());
|
|
|
+ result.insert(result.end(), route.begin(), route.end());
|
|
|
+ //路由列表
|
|
|
+ std::vector<std::uint8_t> vec_routes;
|
|
|
+ for (size_t i = 0; i < routes.size(); ++i) {
|
|
|
+ if (i + 1 == routes.size()) {
|
|
|
+ vec_routes.insert(vec_routes.end(), routes[i].begin(), routes[i].end());
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ vec_routes.insert(vec_routes.end(), routes[i].begin(), routes[i].end());
|
|
|
+ vec_routes.push_back(0x0d);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ result.push_back(vec_routes.size() >> 8);
|
|
|
+ result.push_back(vec_routes.size() >> 0);
|
|
|
+ result.insert(result.end(), vec_routes.begin(), vec_routes.end());
|
|
|
+ //参数
|
|
|
+ std::vector<std::uint8_t> vec_args = args.data();
|
|
|
+ result.push_back(vec_args.size() >> 24);
|
|
|
+ result.push_back(vec_args.size() >> 16);
|
|
|
+ result.push_back(vec_args.size() >> 8);
|
|
|
+ result.push_back(vec_args.size() >> 0);
|
|
|
+ result.insert(result.end(), vec_args.begin(), vec_args.end());
|
|
|
+ //协议尾
|
|
|
+ result.push_back(0x03);
|
|
|
+
|
|
|
+ result[1] = (result.size() >> 24);
|
|
|
+ result[2] = (result.size() >> 16);
|
|
|
+ result[3] = (result.size() >> 8);
|
|
|
+ result[4] = (result.size() >> 0);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ public:
|
|
|
+ /**
|
|
|
+ * @brief 消息类型
|
|
|
+ */
|
|
|
+ std::uint8_t msg_type = 0;
|
|
|
+ /**
|
|
|
+ * @brief 消息ID
|
|
|
+ */
|
|
|
+ std::string msg_id;
|
|
|
+ /**
|
|
|
+ * @brief 路由
|
|
|
+ */
|
|
|
+ std::string route;
|
|
|
+ /**
|
|
|
+ * @brief 路由列表
|
|
|
+ */
|
|
|
+ std::vector<std::string> routes;
|
|
|
+ /**
|
|
|
+ * @brief 参数
|
|
|
+ */
|
|
|
+ v3::archive::stream args;
|
|
|
+ };
|
|
|
+ /**
|
|
|
+ * @brief 日志类型
|
|
|
+ */
|
|
|
+ struct nexus_net_logger_config_info {
|
|
|
+ /**
|
|
|
+ * @brief 日志类型
|
|
|
+ */
|
|
|
+ int type = 0;
|
|
|
+ /**
|
|
|
+ * @brief 启用
|
|
|
+ */
|
|
|
+ bool enable = false;
|
|
|
+ /**
|
|
|
+ * @brief 备注
|
|
|
+ */
|
|
|
+ std::string remarks;
|
|
|
+ };
|
|
|
+ /**
|
|
|
+ * @brief 父节点配置
|
|
|
+ */
|
|
|
+ struct nexus_net_client_config_info {
|
|
|
+ /**
|
|
|
+ * @brief IP地址
|
|
|
+ */
|
|
|
+ std::string ip;
|
|
|
+ /**
|
|
|
+ * @brief 端口
|
|
|
+ */
|
|
|
+ int port = 0;
|
|
|
+ /**
|
|
|
+ * @brief 启用
|
|
|
+ */
|
|
|
+ bool enable = false;
|
|
|
+ /**
|
|
|
+ * @brief 备注
|
|
|
+ */
|
|
|
+ std::string remarks;
|
|
|
+ };
|
|
|
+ /**
|
|
|
+ * @brief 订阅配置
|
|
|
+ */
|
|
|
+ struct nexus_net_subscribe_config_info {
|
|
|
+ /**
|
|
|
+ * @brief KEY
|
|
|
+ */
|
|
|
+ std::string key;
|
|
|
+ /**
|
|
|
+ * @brief 路由
|
|
|
+ */
|
|
|
+ std::string route;
|
|
|
+ /**
|
|
|
+ * @brief 启用
|
|
|
+ */
|
|
|
+ bool enable = false;
|
|
|
+ /**
|
|
|
+ * @brief 备注
|
|
|
+ */
|
|
|
+ std::string remarks;
|
|
|
+ };
|
|
|
+ /**
|
|
|
+ * @brief 发布配置
|
|
|
+ */
|
|
|
+ struct nexus_net_publisher_config_info {
|
|
|
+ /**
|
|
|
+ * @brief KEY
|
|
|
+ */
|
|
|
+ std::string key;
|
|
|
+ /**
|
|
|
+ * @brief 节点
|
|
|
+ */
|
|
|
+ std::string route;
|
|
|
+ /**
|
|
|
+ * @brief 启用
|
|
|
+ */
|
|
|
+ bool enable = false;
|
|
|
+ /**
|
|
|
+ * @brief 备注
|
|
|
+ */
|
|
|
+ std::string remarks;
|
|
|
+ };
|
|
|
+ /**
|
|
|
+ * @brief 用户信息
|
|
|
+ */
|
|
|
+ struct nexus_net_user_config_info {
|
|
|
+ /**
|
|
|
+ * @brief 名称
|
|
|
+ */
|
|
|
+ std::string name;
|
|
|
+ /**
|
|
|
+ * @brief 密码
|
|
|
+ */
|
|
|
+ std::string password;
|
|
|
+ /**
|
|
|
+ * @brief 启用
|
|
|
+ */
|
|
|
+ bool enable = false;
|
|
|
+ /**
|
|
|
+ * @brief 备注
|
|
|
+ */
|
|
|
+ std::string remarks;
|
|
|
+ };
|
|
|
+ /**
|
|
|
+ * @brief 配置
|
|
|
+ */
|
|
|
+ struct nexus_net_config_info {
|
|
|
+ /**
|
|
|
+ * @brief 身份
|
|
|
+ */
|
|
|
+ nexus_net_user_config_info user;
|
|
|
+ /**
|
|
|
+ * @brief 日志
|
|
|
+ */
|
|
|
+ std::vector<nexus_net_logger_config_info> loggers;
|
|
|
+ /**
|
|
|
+ * @brief 客户端
|
|
|
+ */
|
|
|
+ nexus_net_client_config_info client;
|
|
|
+ /**
|
|
|
+ * @brief 订阅
|
|
|
+ */
|
|
|
+ std::vector<nexus_net_subscribe_config_info> subscribes;
|
|
|
+ /**
|
|
|
+ * @brief 发布
|
|
|
+ */
|
|
|
+ std::vector<nexus_net_publisher_config_info> publishers;
|
|
|
+ };
|
|
|
+ /**
|
|
|
+ * @brief nexusnet配置
|
|
|
+ */
|
|
|
+ class nexus_net_config {
|
|
|
+ public:
|
|
|
+ static nexus_net_config_info read() {
|
|
|
+ static std::shared_ptr<nexus_net_config_info> result;
|
|
|
+ static std::mutex mutex;
|
|
|
+ std::lock_guard<std::mutex> locker(mutex);
|
|
|
+ if (result)
|
|
|
+ return *result;
|
|
|
+ result.reset(new nexus_net_config_info());
|
|
|
+ boost::property_tree::ptree root;
|
|
|
+ read_xml("config/nexus_net_config.xml", root);
|
|
|
+ //身份信息
|
|
|
+ for (auto& it : root.get_child("root.user")) {
|
|
|
+ if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
|
|
|
+ continue;
|
|
|
+ result->user.name = it.second.get<std::string>("<xmlattr>.name", "admin");
|
|
|
+ result->user.password = it.second.get<std::string>("<xmlattr>.password", "123456");
|
|
|
+ result->user.enable = it.second.get<bool>("<xmlattr>.enable", true);
|
|
|
+ result->user.remarks = it.second.get<std::string>("<xmlattr>.remarks", "身份信息");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ //日志
|
|
|
+ for (auto& it : root.get_child("root.logger")) {
|
|
|
+ if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
|
|
|
+ continue;
|
|
|
+ nexus_net_logger_config_info item;
|
|
|
+ item.type = it.second.get<int>("<xmlattr>.type", 0);
|
|
|
+ item.enable = it.second.get<bool>("<xmlattr>.enable", true);
|
|
|
+ item.remarks = it.second.get<std::string>("<xmlattr>.remarks", "日志");
|
|
|
+ result->loggers.push_back(item);
|
|
|
+ }
|
|
|
+ //客户端
|
|
|
+ for (auto& it : root.get_child("root.client")) {
|
|
|
+ if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
|
|
|
+ continue;
|
|
|
+ result->client.ip = it.second.get<std::string>("<xmlattr>.ip", "0.0.0.0");
|
|
|
+ result->client.port = it.second.get<int>("<xmlattr>.port", 20001);
|
|
|
+ result->client.enable = it.second.get<bool>("<xmlattr>.enable", false);
|
|
|
+ result->client.remarks = it.second.get<std::string>("<xmlattr>.remarks", "客户");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ //订阅
|
|
|
+ for (auto& it : root.get_child("root.subscribe")) {
|
|
|
+ if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
|
|
|
+ continue;
|
|
|
+ nexus_net_subscribe_config_info item;
|
|
|
+ item.key = it.second.get<std::string>("<xmlattr>.key", "");
|
|
|
+ item.route = it.second.get<std::string>("<xmlattr>.route", "");
|
|
|
+ item.enable = it.second.get<bool>("<xmlattr>.enable", false);
|
|
|
+ item.remarks = it.second.get<std::string>("<xmlattr>.remarks", "白名单");
|
|
|
+ result->subscribes.push_back(item);
|
|
|
+ }
|
|
|
+ //发布
|
|
|
+ for (auto& it : root.get_child("root.publisher")) {
|
|
|
+ if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
|
|
|
+ continue;
|
|
|
+ nexus_net_publisher_config_info item;
|
|
|
+ item.key = it.second.get<std::string>("<xmlattr>.key", "");
|
|
|
+ item.route = it.second.get<std::string>("<xmlattr>.route", "");
|
|
|
+ item.enable = it.second.get<bool>("<xmlattr>.enable", false);
|
|
|
+ item.remarks = it.second.get<std::string>("<xmlattr>.remarks", "白名单");
|
|
|
+ result->publishers.push_back(item);
|
|
|
+ }
|
|
|
+ return *result;
|
|
|
+ }
|
|
|
+ static void logger(bool success, bool is_out, nexus_net_message value) {
|
|
|
+ static std::map<std::uint8_t, std::string> g_msg_type = {
|
|
|
+ {(std::uint8_t)nexus_net_msg_type_enum::AUTHENTICATE, "身份认证"},
|
|
|
+ {(std::uint8_t)nexus_net_msg_type_enum::REPAUTHENTICATE, "身份认证响应"},
|
|
|
+ {(std::uint8_t)nexus_net_msg_type_enum::SUBSCRIBE, "订阅"},
|
|
|
+ {(std::uint8_t)nexus_net_msg_type_enum::REMOVESUBSCRIBE, "删除订阅"},
|
|
|
+ {(std::uint8_t)nexus_net_msg_type_enum::PUBLISHER, "发布"},
|
|
|
+ {(std::uint8_t)nexus_net_msg_type_enum::REPPUBLISHER, "发布响应"},
|
|
|
+ {(std::uint8_t)nexus_net_msg_type_enum::HEARTBEAT, "心跳"},
|
|
|
+ {(std::uint8_t)nexus_net_msg_type_enum::DETAILS, "详情"},
|
|
|
+ {(std::uint8_t)nexus_net_msg_type_enum::REPDETAILS, "详情响应"}
|
|
|
+ };
|
|
|
+ auto config = read();
|
|
|
+ auto it = std::find_if(config.loggers.begin(), config.loggers.end(), [=](auto& it) {
|
|
|
+ return it.type == value.msg_type && it.enable; });
|
|
|
+ if (it != config.loggers.end()) {
|
|
|
+ if (success) {
|
|
|
+ LOG_INFO << (is_out ? "接收成功" : "发送成功") << ",消息类型:" << g_msg_type[value.msg_type] << " 消息ID:" << value.msg_id << " 事件:" << value.route << " 路由:" << value.routes;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ LOG_ERROR << (is_out ? "接收失败" : "发送失败") << ",消息类型:" << g_msg_type[value.msg_type] << " 消息ID:" << value.msg_id << " 事件:" << value.route << " 路由:" << value.routes;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ /**
|
|
|
+ * @brief 客户端
|
|
|
+ */
|
|
|
+ class nexus_net_tcp_client {
|
|
|
+ public:
|
|
|
+ nexus_net_tcp_client() :
|
|
|
+ client_(new asio2::tcp_client()) {
|
|
|
+ client_->bind_connect(&nexus_net_tcp_client::on_connect, this);
|
|
|
+ client_->bind_disconnect(&nexus_net_tcp_client::on_disconnect, this);
|
|
|
+ client_->bind_recv(&nexus_net_tcp_client::on_recv, this);
|
|
|
+ }
|
|
|
+ void start(std::string const& ip, int port) {
|
|
|
+ client_->async_start(ip, port, match_role<asio2::tcp_client>(client_));
|
|
|
+ }
|
|
|
+ void stop() {
|
|
|
+ client_->stop();
|
|
|
+ }
|
|
|
+ void send(nexus_net_message& data, std::function<void()> const& fn) {
|
|
|
+ client_->async_send(data.get_data(), fn);
|
|
|
+ }
|
|
|
+ v3::delegates<bool, std::string const&> connect_event;
|
|
|
+ v3::delegates<> disconnect_event;
|
|
|
+ v3::delegates<nexus_net_message const&> recv_event;
|
|
|
+ private:
|
|
|
+ void on_connect() {
|
|
|
+ connect_event(!asio2::get_last_error(), asio2::get_last_error_msg());
|
|
|
+ }
|
|
|
+ void on_disconnect() {
|
|
|
+ disconnect_event();
|
|
|
+ }
|
|
|
+ void on_recv(std::string_view sv) {
|
|
|
+ recv_event(nexus_net_message(sv));
|
|
|
+ }
|
|
|
+ private:
|
|
|
+ std::shared_ptr<asio2::tcp_client> client_;
|
|
|
+ };
|
|
|
+ /**
|
|
|
+ * @brief 客户端
|
|
|
+ */
|
|
|
+ class nexus_net_client : public v3::thread_pool<nexus_net_client> {
|
|
|
+ private:
|
|
|
+ class message_manage_gc;
|
|
|
+ /**
|
|
|
+ * @brief 消息管理
|
|
|
+ */
|
|
|
+ class message_manage {
|
|
|
+ private:
|
|
|
+ friend class message_manage_gc;
|
|
|
+ typedef std::map<std::string, std::pair<std::shared_ptr<v3::event_wait_for>, nexus_net_message>> message_list_type;
|
|
|
+ public:
|
|
|
+ /**
|
|
|
+ * @brief 添加
|
|
|
+ * @param msg_id
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ std::shared_ptr<v3::event_wait_for> add(std::string const& msg_id) {
|
|
|
+ std::lock_guard<std::mutex> locker(mutex_);
|
|
|
+ msg_list_[msg_id].first.reset(new v3::event_wait_for);
|
|
|
+ return msg_list_[msg_id].first;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 获取数据
|
|
|
+ * @param msg_id
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ nexus_net_message& get_data(std::string const& msg_id) {
|
|
|
+ std::lock_guard<std::mutex> locker(mutex_);
|
|
|
+ return msg_list_[msg_id].second;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 设置数据
|
|
|
+ * @param msg_id
|
|
|
+ * @param data
|
|
|
+ */
|
|
|
+ void set_data(std::string const& msg_id, nexus_net_message const& data) {
|
|
|
+ std::lock_guard<std::mutex> locker(mutex_);
|
|
|
+ msg_list_[msg_id].second = data;
|
|
|
+ msg_list_[msg_id].first->notify();
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 查找数据
|
|
|
+ * @param msg_id
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ bool find(std::string const& msg_id) {
|
|
|
+ std::lock_guard<std::mutex> locker(mutex_);
|
|
|
+ return msg_list_.find(msg_id) != msg_list_.end();
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 移出
|
|
|
+ * @param msg_id
|
|
|
+ */
|
|
|
+ void remove(std::string const& msg_id) {
|
|
|
+ std::lock_guard<std::mutex> locker(mutex_);
|
|
|
+ msg_list_.erase(msg_id);
|
|
|
+ }
|
|
|
+ private:
|
|
|
+ message_list_type msg_list_;
|
|
|
+ std::mutex mutex_;
|
|
|
+ };
|
|
|
+ /**
|
|
|
+ * @brief 消息管理GC
|
|
|
+ */
|
|
|
+ class message_manage_gc {
|
|
|
+ public:
|
|
|
+ /**
|
|
|
+ * @brief 构造
|
|
|
+ * @param data
|
|
|
+ * @param msg_id
|
|
|
+ */
|
|
|
+ message_manage_gc(message_manage& data, std::string& msg_id) :
|
|
|
+ message_list_(data),
|
|
|
+ msg_id_(msg_id) {
|
|
|
+ wait_ = message_list_.add(msg_id_);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 析构
|
|
|
+ */
|
|
|
+ ~message_manage_gc() {
|
|
|
+ message_list_.remove(msg_id_);
|
|
|
+ }
|
|
|
+ //添加
|
|
|
+ bool loop(int millisecond) {
|
|
|
+ return wait_->wait(millisecond);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 设置数据
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ nexus_net_message& get_data() {
|
|
|
+ return message_list_.get_data(msg_id_);
|
|
|
+ }
|
|
|
+ private:
|
|
|
+ std::string& msg_id_;
|
|
|
+ message_manage& message_list_;
|
|
|
+ std::shared_ptr<v3::event_wait_for> wait_;
|
|
|
+ };
|
|
|
+ public:
|
|
|
+ /**
|
|
|
+ * @brief 构造
|
|
|
+ */
|
|
|
+ nexus_net_client() :
|
|
|
+ v3::thread_pool<nexus_net_client>(5) {
|
|
|
+ client_.connect_event.bind(&nexus_net_client::on_connect, this);
|
|
|
+ client_.disconnect_event.bind(&nexus_net_client::on_disconnect, this);
|
|
|
+ client_.recv_event.bind(&nexus_net_client::do_work_nexus_net_message, this);
|
|
|
+ timer_.timeout_event.bind(&nexus_net_client::on_timeout, this);
|
|
|
+ timer_.start(5000);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 析构
|
|
|
+ */
|
|
|
+ ~nexus_net_client() {
|
|
|
+ stop();
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 启动
|
|
|
+ */
|
|
|
+ void start() {
|
|
|
+ auto config = nexus_net_config::read();
|
|
|
+ if (config.client.enable) {
|
|
|
+ client_.start(config.client.ip, config.client.port);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 停止
|
|
|
+ */
|
|
|
+ void stop() {
|
|
|
+ client_.stop();
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 获取父节点名称
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ std::string get_parent_name() {
|
|
|
+ return parent_name_;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 订阅
|
|
|
+ * @tparam _Fn
|
|
|
+ * @tparam _This
|
|
|
+ * @param route
|
|
|
+ * @param fn
|
|
|
+ * @param ths
|
|
|
+ */
|
|
|
+ template<typename _Fn, typename _This>
|
|
|
+ void subscribe(std::string const& route, _Fn&& fn, _This&& ths) {
|
|
|
+ functions_.bind(route, std::forward<_Fn>(fn), std::forward<_This>(ths));
|
|
|
+ nexus_net_message msg;
|
|
|
+ msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::SUBSCRIBE;
|
|
|
+ msg.route = route;
|
|
|
+ subscribes_.push_back(msg);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 发布
|
|
|
+ * @tparam _Ret
|
|
|
+ * @tparam ..._Args
|
|
|
+ * @param route
|
|
|
+ * @param ...args
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ template<typename _Ret, typename ..._Args>
|
|
|
+ _Ret publisher(std::string const& route, _Args&&...args) {
|
|
|
+ nexus_net_message msg;
|
|
|
+ msg.route = route;
|
|
|
+ msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::PUBLISHER;
|
|
|
+ msg.args << std::make_tuple(std::forward<_Args>(args)...);
|
|
|
+ if constexpr (!std::is_same<void, _Ret>::value) {
|
|
|
+ msg.msg_id = v3::utils::uuid();
|
|
|
+ message_manage_gc gc(message_manage_, msg.msg_id);
|
|
|
+ send(msg);
|
|
|
+ if (!gc.loop(2000))
|
|
|
+ throw std::exception("请求超时");
|
|
|
+ _Ret result;
|
|
|
+ gc.get_data().args >> result;
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ send(msg);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 发布
|
|
|
+ * @tparam _Ret
|
|
|
+ * @tparam ..._Args
|
|
|
+ * @param timeout
|
|
|
+ * @param route
|
|
|
+ * @param ...args
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ template<typename _Ret, typename ..._Args>
|
|
|
+ _Ret publisher(int timeout, std::string const& route, _Args&&...args) {
|
|
|
+ nexus_net_message msg;
|
|
|
+ msg.route = route;
|
|
|
+ msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::PUBLISHER;
|
|
|
+ msg.args << std::make_tuple(std::forward<_Args>(args)...);
|
|
|
+ if constexpr (!std::is_same<void, _Ret>::value) {
|
|
|
+ msg.msg_id = v3::utils::uuid();
|
|
|
+ message_manage_gc gc(message_manage_, msg.msg_id);
|
|
|
+ send(msg);
|
|
|
+ if (!gc.loop(timeout))
|
|
|
+ throw std::exception("请求超时");
|
|
|
+ _Ret result;
|
|
|
+ gc.get_data().args >> result;
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ send(msg);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 获取节点信息
|
|
|
+ * @param route
|
|
|
+ * @param timeout
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ std::pair<nexus_net_response_details_info, std::string> details(std::string const& route, int timeout = 2000) {
|
|
|
+ std::pair<nexus_net_response_details_info, std::string> result;
|
|
|
+ nexus_net_message msg;
|
|
|
+ msg.route = route;
|
|
|
+ msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::DETAILS;
|
|
|
+ msg.msg_id = v3::utils::uuid();
|
|
|
+ message_manage_gc gc(message_manage_, msg.msg_id);
|
|
|
+ send(msg);
|
|
|
+ if (!gc.loop(timeout))
|
|
|
+ throw std::exception("请求超时");
|
|
|
+ gc.get_data().args >> result.first;
|
|
|
+ result.second = gc.get_data().route;
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 授权状态
|
|
|
+ */
|
|
|
+ v3::delegates<bool> authorized_event;
|
|
|
+ /**
|
|
|
+ * @brief 连接断开
|
|
|
+ */
|
|
|
+ v3::delegates<> disconnect_event;
|
|
|
+ /**
|
|
|
+ * @brief 连接状态
|
|
|
+ */
|
|
|
+ v3::delegates<bool> connect_event;
|
|
|
+ private:
|
|
|
+ void send(nexus_net_message data) {
|
|
|
+ if (data.msg_type != (std::uint8_t)nexus_net_msg_type_enum::REPPUBLISHER) {
|
|
|
+ auto config = nexus_net_config::read();
|
|
|
+ data.routes.push_back(config.user.name);
|
|
|
+ if (data.msg_type == (std::uint8_t)nexus_net_msg_type_enum::SUBSCRIBE) {
|
|
|
+ data.route = fmt::format("{}/{}/{}", parent_name_, config.user.name, data.route);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else if (data.msg_type == (std::uint8_t)nexus_net_msg_type_enum::REPPUBLISHER && !data.routes.empty()) {
|
|
|
+ data.routes.pop_back();
|
|
|
+ }
|
|
|
+ client_.send(data, [=]() {
|
|
|
+ nexus_net_config::logger(!asio2::get_last_error(), false, data); });
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 身份认证
|
|
|
+ */
|
|
|
+ void authenticate() {
|
|
|
+ auto config = nexus_net_config::read();
|
|
|
+ if (config.user.enable) {
|
|
|
+ nexus_net_message data;
|
|
|
+ data.msg_type = (std::uint8_t)nexus_net_msg_type_enum::AUTHENTICATE;
|
|
|
+ data.args << config.user.name << config.user.password;
|
|
|
+ send(data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 心跳
|
|
|
+ */
|
|
|
+ void heartbeat() {
|
|
|
+ if (!is_heartbeat_)
|
|
|
+ return;
|
|
|
+ nexus_net_message msg;
|
|
|
+ msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::HEARTBEAT;
|
|
|
+ send(msg);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 订阅
|
|
|
+ */
|
|
|
+ void subscribe() {
|
|
|
+ for (auto& it : subscribes_) {
|
|
|
+ send(it);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ private:
|
|
|
+ void on_connect(bool success, std::string const& msg) {
|
|
|
+ if (success) {
|
|
|
+ connect_event(true);
|
|
|
+ authenticate();
|
|
|
+ LOG_INFO << "连接成功!";
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ connect_event(false);
|
|
|
+ LOG_ERROR << "连接失败!";
|
|
|
+ }
|
|
|
+ }
|
|
|
+ void on_disconnect() {
|
|
|
+ disconnect_event();
|
|
|
+ is_heartbeat_ = false;
|
|
|
+ LOG_INFO << "连接断开!";
|
|
|
+ }
|
|
|
+ void do_work_nexus_net_message(nexus_net_message data) {
|
|
|
+ nexus_net_config::logger(true, true, data);
|
|
|
+ switch ((nexus_net_msg_type_enum)data.msg_type) {
|
|
|
+ case nexus_net_msg_type_enum::REPAUTHENTICATE: on_authenticate(data); break;
|
|
|
+ case nexus_net_msg_type_enum::PUBLISHER: on_publisher(data); break;
|
|
|
+ case nexus_net_msg_type_enum::REPPUBLISHER: on_reppublisher(data); break;
|
|
|
+ case nexus_net_msg_type_enum::REPDETAILS: on_repdetails(data); break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ void on_timeout() {
|
|
|
+ heartbeat();
|
|
|
+ }
|
|
|
+ private:
|
|
|
+ /**
|
|
|
+ * @brief 授权响应
|
|
|
+ * @param data
|
|
|
+ */
|
|
|
+ void on_authenticate(nexus_net_message data) {
|
|
|
+ try {
|
|
|
+ data.args >> is_heartbeat_ >> parent_name_;
|
|
|
+ if (is_heartbeat_) {
|
|
|
+ authorized_event(true);
|
|
|
+ subscribe();
|
|
|
+ LOG_INFO << "身份验证成功!";
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ authorized_event(false);
|
|
|
+ LOG_ERROR << "身份验证失败!";
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (std::exception const& ec) {
|
|
|
+ LOG_ERROR << ec;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 发布
|
|
|
+ * @param data
|
|
|
+ */
|
|
|
+ void on_publisher(nexus_net_message data) {
|
|
|
+ REGISTER_ASYNC_FUNC(on_publisher, data);
|
|
|
+ if (3 != std::count(data.route.begin(), data.route.end(), '/')) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ std::string key = data.route.substr(data.route.find('/', data.route.find('/') + 1) + 1);
|
|
|
+ data.args = functions_.invoke(key, data.args);
|
|
|
+ if (!data.args.empty()) {
|
|
|
+ data.msg_type = (std::uint8_t)nexus_net_msg_type_enum::REPPUBLISHER;
|
|
|
+ send(data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 发布响应
|
|
|
+ * @param data
|
|
|
+ */
|
|
|
+ void on_reppublisher(nexus_net_message data) {
|
|
|
+ if (message_manage_.find(data.msg_id)) {
|
|
|
+ message_manage_.set_data(data.msg_id, data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * @brief 详情响应
|
|
|
+ * @param data
|
|
|
+ */
|
|
|
+ void on_repdetails(nexus_net_message data) {
|
|
|
+ if (message_manage_.find(data.msg_id)) {
|
|
|
+ message_manage_.set_data(data.msg_id, data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ private:
|
|
|
+ v3::timer timer_;
|
|
|
+ std::thread work_thread_;
|
|
|
+ std::string parent_name_;
|
|
|
+ bool is_heartbeat_ = false;
|
|
|
+ nexus_net_tcp_client client_;
|
|
|
+ message_manage message_manage_;
|
|
|
+ std::vector<nexus_net_message> subscribes_;
|
|
|
+ v3::archive::function_manage<std::string> functions_;
|
|
|
+ };
|
|
|
+
|
|
|
+}
|