123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988 |
- #pragma once
- //stl
- #include <iostream>
- #include <optional>
- //asio2
- #include <asio2/asio2.hpp>
- //boost
- #include <boost/property_tree/ptree.hpp>
- #include <boost/property_tree/xml_parser.hpp>
- //robotics
- #include "utils.hpp"
- #include "function_traits.hpp"
- #include "logger.hpp"
- #include "serialization.hpp"
- #include "delegates.hpp"
- #include "thread_pool.hpp"
- #include "linq.hpp"
- #include "config.hpp"
- #include "timer.hpp"
- namespace robotics {
- namespace v3 {
- template<typename _Type>
- class match_role {
- public:
- /**
- * @brief 默认构造
- */
- explicit match_role() {}
- /**
- * @brief 构造
- * @param session
- */
- explicit match_role(std::shared_ptr<_Type> const& session) :
- session_(session) {
- }
- /**
- * @brief 解包
- * @tparam Iterator
- * @param begin
- * @param end
- * @return
- */
- template <typename Iterator>
- std::pair<Iterator, bool> operator()(Iterator begin, Iterator end) const {
- Iterator p = begin;
- while (p != end) {
- //判断协议头
- if (*p != 0x02) {
- asio2::error_code ec;
- session_->socket().close(ec);
- break;
- }
- //获取数据长度
- size_t size = size_t(std::uint8_t(*(++p))) << 24;
- size += size_t(std::uint8_t(*(++p))) << 16;
- size += size_t(std::uint8_t(*(++p))) << 8;
- size += size_t(std::uint8_t(*(++p))) << 0;
- if (end - begin >= size) {
- if (*(begin + size - 1) == 0x03) {
- return std::pair(begin + size, true);
- }
- else {
- asio2::error_code ec;
- session_->socket().close(ec);
- }
- }
- break;
- }
- return std::pair(begin, false);
- }
- /**
- * @brief 初始化
- * @param session
- */
- void init(std::shared_ptr<asio2::tcp_session>& session) {
- session_ = session;
- }
- private:
- /**
- * @brief 客户端
- */
- std::shared_ptr<_Type> session_;
- };
- }
- }
- template<> struct asio::is_match_condition<robotics::v3::match_role<asio2::tcp_client>> : public std::true_type {};
- template<> struct asio::is_match_condition<robotics::v3::match_role<asio2::tcp_session>> : public std::true_type {};
- namespace robotics {
- namespace v3 {
- /**
- * @brief 消息类型
- */
- enum class nexus_net_msg_type_enum {
- /**
- * @brief 身份认证
- */
- AUTHENTICATE = 1,
- /**
- * @brief 身份认证响应
- */
- REPAUTHENTICATE,
- /**
- * @brief 订阅
- */
- SUBSCRIBE,
- /**
- * @brief 订阅响应
- */
- REPSUBSCRIBE,
- /**
- * @brief 删除订阅
- */
- REMOVESUBSCRIBE,
- /**
- * @brief 发布
- */
- PUBLISHER,
- /**
- * @brief 发布响应
- */
- REPPUBLISHER,
- /**
- * @brief 心跳
- */
- HEARTBEAT,
- /**
- * @brief 详情
- */
- DETAILS,
- /**
- * @brief 详情响应
- */
- REPDETAILS
- };
- /**
- * @brief 消息
- */
- class nexus_net_message {
- public:
- /**
- * @brief 构造
- */
- nexus_net_message() {}
- /**
- * @brief 构造
- * @param sv
- */
- nexus_net_message(std::string_view sv) {
- set_data(sv);
- }
- /**
- * @brief 构造
- * @param data
- */
- nexus_net_message(std::vector<std::uint8_t>const& data) {
- set_data(data);
- }
- /**
- * @brief 设置
- * @param sv
- */
- void set_data(std::string_view sv) {
- set_data(std::vector<std::uint8_t>(sv.begin(), sv.end()));
- }
- /**
- * @brief 设置
- * @param data
- */
- void set_data(std::vector<std::uint8_t>const& data) {
- //消息类型
- msg_type = data[5];
- //消息ID
- size_t size = data[6];
- size_t begin = 7;
- size_t end = begin + size;
- msg_id.assign(data.begin() + begin, data.begin() + end);
- //路由
- size = data[end];
- begin = end + 1;
- end = begin + size;
- route.assign(data.begin() + begin, data.begin() + end);
- //路由列表
- size = data[end] << 8;
- size += data[++end];
- begin = end + 1;
- end = begin + size;
- if (size > 0) {
- std::string str_routes(data.begin() + begin, data.begin() + end);
- routes = v3::utils::split(str_routes, 0x0d);
- }
- //参数
- size = (data[end] << 24);
- size += (data[++end] << 16);
- size += (data[++end] << 8);
- size += data[++end];
- begin = end + 1;
- end = begin + size;
- args.add(std::vector<std::uint8_t>(data.begin() + begin, data.begin() + end));
- }
- /**
- * @brief 获取
- * @return
- */
- std::vector<std::uint8_t> get_data() {
- std::vector<std::uint8_t> result;
- //协议头
- result.push_back(0x02);
- //字节数量
- result.push_back(0x00);
- result.push_back(0x00);
- result.push_back(0x00);
- result.push_back(0x00);
- //消息类型
- result.push_back(msg_type);
- //消息ID
- result.push_back(msg_id.size());
- result.insert(result.end(), msg_id.begin(), msg_id.end());
- //路由
- result.push_back(route.size());
- result.insert(result.end(), route.begin(), route.end());
- //路由列表
- std::vector<std::uint8_t> vec_routes;
- for (size_t i = 0; i < routes.size(); ++i) {
- if (i + 1 == routes.size()) {
- vec_routes.insert(vec_routes.end(), routes[i].begin(), routes[i].end());
- }
- else {
- vec_routes.insert(vec_routes.end(), routes[i].begin(), routes[i].end());
- vec_routes.push_back(0x0d);
- }
- }
- result.push_back(vec_routes.size() >> 8);
- result.push_back(vec_routes.size() >> 0);
- result.insert(result.end(), vec_routes.begin(), vec_routes.end());
- //参数
- std::vector<std::uint8_t> vec_args = args.data();
- result.push_back(vec_args.size() >> 24);
- result.push_back(vec_args.size() >> 16);
- result.push_back(vec_args.size() >> 8);
- result.push_back(vec_args.size() >> 0);
- result.insert(result.end(), vec_args.begin(), vec_args.end());
- //协议尾
- result.push_back(0x03);
- result[1] = (result.size() >> 24);
- result[2] = (result.size() >> 16);
- result[3] = (result.size() >> 8);
- result[4] = (result.size() >> 0);
- return result;
- }
- public:
- /**
- * @brief 消息类型
- */
- std::uint8_t msg_type = 0;
- /**
- * @brief 消息ID
- */
- std::string msg_id;
- /**
- * @brief 路由
- */
- std::string route;
- /**
- * @brief 路由列表
- */
- std::vector<std::string> routes;
- /**
- * @brief 参数
- */
- v3::archive::stream args;
- };
- /**
- * @brief 日志类型
- */
- struct nexus_net_logger_config_info {
- /**
- * @brief 日志类型
- */
- int type = 0;
- /**
- * @brief 启用
- */
- bool enable = false;
- /**
- * @brief 备注
- */
- std::string remarks;
- };
- /**
- * @brief 父节点配置
- */
- struct nexus_net_client_config_info {
- /**
- * @brief IP地址
- */
- std::string ip;
- /**
- * @brief 端口
- */
- int port = 0;
- /**
- * @brief 启用
- */
- bool enable = false;
- /**
- * @brief 备注
- */
- std::string remarks;
- };
- /**
- * @brief 订阅配置
- */
- struct nexus_net_subscribe_config_info {
- /**
- * @brief KEY
- */
- std::string key;
- /**
- * @brief 路由
- */
- std::string route;
- /**
- * @brief 启用
- */
- bool enable = false;
- /**
- * @brief 备注
- */
- std::string remarks;
- };
- /**
- * @brief 发布配置
- */
- struct nexus_net_publisher_config_info {
- /**
- * @brief KEY
- */
- std::string key;
- /**
- * @brief 节点
- */
- std::string route;
- /**
- * @brief 启用
- */
- bool enable = false;
- /**
- * @brief 备注
- */
- std::string remarks;
- };
- /**
- * @brief 用户信息
- */
- struct nexus_net_user_config_info {
- /**
- * @brief 名称
- */
- std::string name;
- /**
- * @brief 代码
- */
- std::string code;
- /**
- * @brief 密码
- */
- std::string password;
- /**
- * @brief 启用
- */
- bool enable = false;
- /**
- * @brief 备注
- */
- std::string remarks;
- };
- /**
- * @brief 配置
- */
- struct nexus_net_config_info {
- /**
- * @brief 身份
- */
- nexus_net_user_config_info user;
- /**
- * @brief 日志
- */
- std::vector<nexus_net_logger_config_info> loggers;
- /**
- * @brief 客户端
- */
- nexus_net_client_config_info client;
- /**
- * @brief 订阅
- */
- std::map<std::string, nexus_net_subscribe_config_info> subscribes;
- /**
- * @brief 发布
- */
- std::map<std::string, nexus_net_publisher_config_info> publishers;
- };
- /**
- * @brief nexusnet配置
- */
- class nexus_net_config {
- public:
- static nexus_net_config_info& read() {
- static std::shared_ptr<nexus_net_config_info> result;
- static std::mutex mutex;
- std::lock_guard<std::mutex> locker(mutex);
- if (result)
- return *result;
- result.reset(new nexus_net_config_info());
- boost::property_tree::ptree root;
- read_xml(v3::config::read<std::string>("NEXUS_NET", "PATH", "./config/nexus_net_config.xml"), root);
- //身份信息
- for (auto& it : root.get_child("root.user")) {
- if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
- continue;
- result->user.name = it.second.get<std::string>("<xmlattr>.name", "admin");
- result->user.code = it.second.get<std::string>("<xmlattr>.code", "admin");
- result->user.password = it.second.get<std::string>("<xmlattr>.password", "123456");
- result->user.enable = it.second.get<bool>("<xmlattr>.enable", true);
- result->user.remarks = it.second.get<std::string>("<xmlattr>.remarks", "身份信息");
- break;
- }
- //日志
- for (auto& it : root.get_child("root.logger")) {
- if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
- continue;
- nexus_net_logger_config_info item;
- item.type = it.second.get<int>("<xmlattr>.type", 0);
- item.enable = it.second.get<bool>("<xmlattr>.enable", true);
- item.remarks = it.second.get<std::string>("<xmlattr>.remarks", "日志");
- result->loggers.push_back(item);
- }
- //客户端
- for (auto& it : root.get_child("root.client")) {
- if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
- continue;
- result->client.ip = it.second.get<std::string>("<xmlattr>.ip", "127.0.0.1");
- result->client.port = it.second.get<int>("<xmlattr>.port", 20001);
- result->client.enable = it.second.get<bool>("<xmlattr>.enable", false);
- result->client.remarks = it.second.get<std::string>("<xmlattr>.remarks", "客户");
- break;
- }
- //订阅
- for (auto& it : root.get_child("root.subscribe")) {
- if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
- continue;
- nexus_net_subscribe_config_info item;
- item.key = it.second.get<std::string>("<xmlattr>.key", "");
- item.route = it.second.get<std::string>("<xmlattr>.route", "");
- item.enable = it.second.get<bool>("<xmlattr>.enable", false);
- item.remarks = it.second.get<std::string>("<xmlattr>.remarks", "白名单");
- result->subscribes[item.key] = item;
- }
- //发布
- for (auto& it : root.get_child("root.publisher")) {
- if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
- continue;
- nexus_net_publisher_config_info item;
- item.key = it.second.get<std::string>("<xmlattr>.key", "");
- item.route = it.second.get<std::string>("<xmlattr>.route", "");
- item.enable = it.second.get<bool>("<xmlattr>.enable", false);
- item.remarks = it.second.get<std::string>("<xmlattr>.remarks", "白名单");
- result->publishers[item.key] = item;
- }
- return *result;
- }
- static void logger(bool success, bool is_out, nexus_net_message value) {
- static std::map<std::uint8_t, std::string> g_msg_type = {
- {(std::uint8_t)nexus_net_msg_type_enum::AUTHENTICATE, "身份认证"},
- {(std::uint8_t)nexus_net_msg_type_enum::REPAUTHENTICATE, "身份认证响应"},
- {(std::uint8_t)nexus_net_msg_type_enum::SUBSCRIBE, "订阅"},
- {(std::uint8_t)nexus_net_msg_type_enum::REPSUBSCRIBE, "订阅响应"},
- {(std::uint8_t)nexus_net_msg_type_enum::REMOVESUBSCRIBE, "删除订阅"},
- {(std::uint8_t)nexus_net_msg_type_enum::PUBLISHER, "发布"},
- {(std::uint8_t)nexus_net_msg_type_enum::REPPUBLISHER, "发布响应"},
- {(std::uint8_t)nexus_net_msg_type_enum::HEARTBEAT, "心跳"},
- {(std::uint8_t)nexus_net_msg_type_enum::DETAILS, "详情"},
- {(std::uint8_t)nexus_net_msg_type_enum::REPDETAILS, "详情响应"}
- };
- auto config = read();
- auto it = std::find_if(config.loggers.begin(), config.loggers.end(), [=](auto& it) {
- return it.type == value.msg_type && it.enable; });
- if (it != config.loggers.end()) {
- if (success) {
- LOG_INFO << (is_out ? "接收成功" : "发送成功") << ",消息类型:" << g_msg_type[value.msg_type] << " 消息ID:" << value.msg_id << " 事件:" << value.route << " 路由:" << value.routes;
- }
- else {
- LOG_ERROR << (is_out ? "接收失败" : "发送失败") << ",消息类型:" << g_msg_type[value.msg_type] << " 消息ID:" << value.msg_id << " 事件:" << value.route << " 路由:" << value.routes;
- }
- }
- }
- };
- /**
- * @brief 客户端
- */
- class nexus_net_tcp_client {
- public:
- nexus_net_tcp_client() :
- client_(new asio2::tcp_client()) {
- client_->bind_connect(&nexus_net_tcp_client::on_connect, this);
- client_->bind_disconnect(&nexus_net_tcp_client::on_disconnect, this);
- client_->bind_recv(&nexus_net_tcp_client::on_recv, this);
- }
- void start(std::string const& ip, int port) {
- client_->async_start(ip, port, match_role<asio2::tcp_client>(client_));
- }
- void stop() {
- client_->stop();
- }
- void send(nexus_net_message& data, std::function<void()> const& fn) {
- client_->async_send(data.get_data(), fn);
- }
- v3::delegates<bool, std::string const&> connect_event;
- v3::delegates<> disconnect_event;
- v3::delegates<nexus_net_message const&> recv_event;
- private:
- void on_connect() {
- connect_event(!asio2::get_last_error(), asio2::get_last_error_msg());
- }
- void on_disconnect() {
- disconnect_event();
- }
- void on_recv(std::string_view sv) {
- recv_event(nexus_net_message(sv));
- }
- private:
- std::shared_ptr<asio2::tcp_client> client_;
- };
- /**
- * @brief 客户端
- */
- class nexus_net_client : public v3::thread_pool<nexus_net_client> {
- private:
- class message_manage_gc;
- /**
- * @brief 消息管理
- */
- class message_manage {
- private:
- friend class message_manage_gc;
- typedef std::map<std::string, std::pair<std::shared_ptr<v3::event_wait_for>, nexus_net_message>> message_list_type;
- public:
- /**
- * @brief 添加
- * @param msg_id
- * @return
- */
- std::shared_ptr<v3::event_wait_for> add(std::string const& msg_id) {
- std::lock_guard<std::mutex> locker(mutex_);
- msg_list_[msg_id].first.reset(new v3::event_wait_for);
- return msg_list_[msg_id].first;
- }
- /**
- * @brief 获取数据
- * @param msg_id
- * @return
- */
- nexus_net_message& get_data(std::string const& msg_id) {
- std::lock_guard<std::mutex> locker(mutex_);
- return msg_list_[msg_id].second;
- }
- /**
- * @brief 设置数据
- * @param msg_id
- * @param data
- */
- void set_data(std::string const& msg_id, nexus_net_message const& data) {
- std::lock_guard<std::mutex> locker(mutex_);
- msg_list_[msg_id].second = data;
- msg_list_[msg_id].first->notify();
- }
- /**
- * @brief 查找数据
- * @param msg_id
- * @return
- */
- bool find(std::string const& msg_id) {
- std::lock_guard<std::mutex> locker(mutex_);
- return msg_list_.find(msg_id) != msg_list_.end();
- }
- /**
- * @brief 移出
- * @param msg_id
- */
- void remove(std::string const& msg_id) {
- std::lock_guard<std::mutex> locker(mutex_);
- msg_list_.erase(msg_id);
- }
- private:
- message_list_type msg_list_;
- std::mutex mutex_;
- };
- /**
- * @brief 消息管理GC
- */
- class message_manage_gc {
- public:
- /**
- * @brief 构造
- * @param data
- * @param msg_id
- */
- message_manage_gc(message_manage& data, std::string& msg_id) :
- message_list_(data),
- msg_id_(msg_id) {
- wait_ = message_list_.add(msg_id_);
- }
- /**
- * @brief 析构
- */
- ~message_manage_gc() {
- message_list_.remove(msg_id_);
- }
- //添加
- bool loop(int millisecond) {
- return wait_->wait(millisecond);
- }
- /**
- * @brief 设置数据
- * @return
- */
- nexus_net_message& get_data() {
- return message_list_.get_data(msg_id_);
- }
- private:
- std::string& msg_id_;
- message_manage& message_list_;
- std::shared_ptr<v3::event_wait_for> wait_;
- };
- public:
- /**
- * @brief 单例
- * @return
- */
- static nexus_net_client* instance() {
- static nexus_net_client g_nexus_net_client;
- return &g_nexus_net_client;
- }
- /**
- * @brief 析构
- */
- ~nexus_net_client() {
- stop();
- }
- /**
- * @brief 启动
- */
- void start() {
- auto config = nexus_net_config::read();
- if (config.client.enable) {
- client_.start(config.client.ip, config.client.port);
- }
- }
- /**
- * @brief 停止
- */
- void stop() {
- client_.stop();
- }
- /**
- * @brief 呼叫者消息
- * @return
- */
- static std::optional<nexus_net_message> caller_message() {
- nexus_net_client* client = instance();
- std::lock_guard<std::mutex> locker(client->caller_mutex_);
- if (client->caller_message_list_.contains(std::this_thread::get_id())) {
- return client->caller_message_list_[std::this_thread::get_id()];
- }
- else {
- return std::nullopt;
- }
- }
- /**
- * @brief 添加订阅
- * @tparam _Fn
- * @tparam _This
- * @param key
- * @param fn
- * @param ths
- * @return
- */
- template<typename _Fn, typename _This>
- bool subscribe(std::string const& key, _Fn&& fn, _This&& ths) {
- if (nexus_net_config::read().subscribes.contains(key)) {
- functions_.bind(nexus_net_config::read().subscribes[key].route, std::forward<_Fn>(fn), std::forward<_This>(ths));
- nexus_net_message msg;
- msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::SUBSCRIBE;
- msg.route = nexus_net_config::read().subscribes[key].route;
- subscribes_.push_back(msg);
- return true;
- }
- return false;
- }
- /**
- * @brief 自定义订阅
- * @tparam _Fn
- * @tparam _This
- * @param route
- * @param fn
- * @param ths
- */
- template<typename _Fn, typename _This>
- void custom_subscribe(std::string const& route, _Fn&& fn, _This&& ths) {
- functions_.bind(route, std::forward<_Fn>(fn), std::forward<_This>(ths));
- nexus_net_message msg;
- msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::SUBSCRIBE;
- msg.route = route;
- subscribes_.push_back(msg);
- }
- /**
- * @brief 发布
- * @tparam _Ret
- * @tparam ..._Args
- * @param key
- * @param ...args
- * @return
- */
- template<typename _Ret, typename ..._Args>
- _Ret publisher(std::string const& key, _Args&&...args) {
- return publisher<_Ret>(2000, key, std::forward<_Args>(args)...);
- }
- /**
- * @brief 发布
- * @tparam _Ret
- * @tparam ..._Args
- * @param timeout
- * @param route
- * @param ...args
- * @return
- */
- template<typename _Ret, typename ..._Args>
- _Ret publisher(int timeout, std::string const& key, _Args&&...args) {
- auto& config = nexus_net_config::read();
- if (!config.publishers.contains(key))
- throw std::runtime_error("key不存在!");
- nexus_net_message msg;
- msg.route = config.publishers[key].route;
- msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::PUBLISHER;
- msg.args << std::make_tuple(std::forward<_Args>(args)...);
- if constexpr (!std::is_same<void, _Ret>::value) {
- msg.msg_id = v3::utils::uuid();
- message_manage_gc gc(message_manage_, msg.msg_id);
- send(msg);
- if (!gc.loop(timeout))
- throw std::runtime_error("请求超时");
- _Ret result;
- gc.get_data().args >> result;
- return result;
- }
- else {
- send(msg);
- }
- }
- /**
- * @brief 自定义发布
- * @tparam _Ret
- * @tparam ..._Args
- * @param key 配置KEY
- * @param format 值
- * @param ...args 参数列表
- * @return
- */
- template<typename _Ret, typename ..._Args>
- _Ret custom_publisher(std::string const& key, std::string const& format, _Args&&...args) {
- return custom_publisher<_Ret>(2000, key, format, std::forward<_Args>(args)...);
- }
- /**
- * @brief 自定义发布
- * @tparam _Ret
- * @tparam ..._Args
- * @param timeout 超时时间
- * @param key 配置KEY
- * @param format 值
- * @param ...args 参数列表
- * @return
- */
- template<typename _Ret, typename ..._Args>
- _Ret custom_publisher(int timeout, std::string const& key, std::string const& format, _Args&&...args) {
- auto& config = nexus_net_config::read();
- if (!config.publishers.contains(key))
- throw std::runtime_error("key不存在!");
- nexus_net_message msg;
- msg.route = v3::utils::format(config.publishers[key].route, format);
- msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::PUBLISHER;
- msg.args << std::make_tuple(std::forward<_Args>(args)...);
- if constexpr (!std::is_same<void, _Ret>::value) {
- msg.msg_id = v3::utils::uuid();
- message_manage_gc gc(message_manage_, msg.msg_id);
- send(msg);
- if (!gc.loop(timeout))
- throw std::runtime_error("请求超时");
- _Ret result;
- gc.get_data().args >> result;
- return result;
- }
- else {
- send(msg);
- }
- }
- /**
- * @brief 授权状态
- */
- v3::delegates<bool, std::string const&, std::string const&> authorized_event;
- /**
- * @brief 连接断开
- */
- v3::delegates<> disconnect_event;
- /**
- * @brief 连接状态
- */
- v3::delegates<bool> connect_event;
- private:
- /**
- * @brief 构造
- */
- nexus_net_client() :
- v3::thread_pool<nexus_net_client>(5) {
- client_.connect_event.bind(&nexus_net_client::on_connect, this);
- client_.disconnect_event.bind(&nexus_net_client::on_disconnect, this);
- client_.recv_event.bind(&nexus_net_client::do_work_nexus_net_message, this);
- timer_.timeout_event.bind(&nexus_net_client::on_timeout, this);
- timer_.start(10000);
- }
- /**
- * @brief 发送
- * @param data
- */
- void send(nexus_net_message data) {
- if (data.msg_type != (std::uint8_t)nexus_net_msg_type_enum::REPPUBLISHER) {
- auto config = nexus_net_config::read();
- data.routes.push_back(config.user.code);
- if (data.msg_type == (std::uint8_t)nexus_net_msg_type_enum::SUBSCRIBE) {
- data.route = fmt::format("{}/{}/{}", parent_code_, config.user.code, data.route);
- }
- }
- else if (data.msg_type == (std::uint8_t)nexus_net_msg_type_enum::REPPUBLISHER && !data.routes.empty()) {
- data.routes.pop_back();
- }
- client_.send(data, [=]() {
- nexus_net_config::logger(!asio2::get_last_error(), false, data); });
- }
- /**
- * @brief 身份认证
- */
- void authenticate() {
- auto config = nexus_net_config::read();
- if (config.user.enable) {
- nexus_net_message data;
- data.msg_type = (std::uint8_t)nexus_net_msg_type_enum::AUTHENTICATE;
- data.args << config.user.code << config.user.password << config.user.name << 1;
- send(data);
- }
- }
- /**
- * @brief 心跳
- */
- void heartbeat() {
- if (!is_heartbeat_)
- return;
- nexus_net_message msg;
- msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::HEARTBEAT;
- send(msg);
- }
- /**
- * @brief 订阅
- */
- void subscribe() {
- for (auto& it : subscribes_) {
- send(it);
- }
- }
- private:
- void on_connect(bool success, std::string const& msg) {
- if (success) {
- connect_event(true);
- authenticate();
- }
- else {
- connect_event(false);
- }
- }
- void on_disconnect() {
- disconnect_event();
- is_heartbeat_ = false;
- }
- void do_work_nexus_net_message(nexus_net_message data) {
- nexus_net_config::logger(true, true, data);
-
- try {
- switch ((nexus_net_msg_type_enum)data.msg_type) {
- case nexus_net_msg_type_enum::REPAUTHENTICATE: on_authenticate(data); break;
- case nexus_net_msg_type_enum::PUBLISHER: on_publisher(data); break;
- case nexus_net_msg_type_enum::REPPUBLISHER: on_reppublisher(data); break;
- case nexus_net_msg_type_enum::REPSUBSCRIBE: on_repsubscribe(data); break;
- }
- }
- catch (std::exception const& ec) {
- LOG_ERROR << ec;
- }
- }
- void on_timeout() {
- heartbeat();
- }
- private:
- void on_repsubscribe(nexus_net_message data) {
- bool success = false;
- try {
- data.args >> success;
- }
- catch (std::exception const& ec) {
- LOG_ERROR << ec;
- }
- LOG_INFO << "订阅状态:" << (success ? "成功" : "失败");
- }
- /**
- * @brief 授权响应
- * @param data
- */
- void on_authenticate(nexus_net_message data) {
- std::string node_name;
- std::string message;
- data.args >> is_heartbeat_ >> parent_code_ >> parent_name_ >> node_name >> message;
- if (is_heartbeat_) {
- authorized_event(true, parent_code_, parent_name_);
- subscribe();
- }
- else {
- authorized_event(false, "", "");
- }
- }
- /**
- * @brief 发布
- * @param data
- */
- void on_publisher(nexus_net_message data) {
- REGISTER_ASYNC_FUNC(on_publisher, data);
- if (3 != std::count(data.route.begin(), data.route.end(), '/')) {
- return;
- }
- std::string key = data.route.substr(data.route.find('/', data.route.find('/') + 1) + 1);
- {
- std::lock_guard<std::mutex> locker(caller_mutex_);
- caller_message_list_[std::this_thread::get_id()] = data;
- }
- data.args = functions_.invoke(key, data.args);
- {
- std::lock_guard<std::mutex> locker(caller_mutex_);
- caller_message_list_.erase(std::this_thread::get_id());
- }
- if (!data.args.empty()) {
- data.msg_type = (std::uint8_t)nexus_net_msg_type_enum::REPPUBLISHER;
- send(data);
- }
- }
- /**
- * @brief 发布响应
- * @param data
- */
- void on_reppublisher(nexus_net_message data) {
- if (message_manage_.find(data.msg_id)) {
- message_manage_.set_data(data.msg_id, data);
- }
- }
- private:
- v3::timer timer_;
- std::string parent_code_;
- std::string parent_name_;
- bool is_heartbeat_ = false;
- nexus_net_tcp_client client_;
- std::mutex caller_mutex_;
- std::map<std::thread::id, nexus_net_message> caller_message_list_;
- message_manage message_manage_;
- std::vector<nexus_net_message> subscribes_;
- v3::archive::function_manage<std::string> functions_;
- };
- }
- }
|