nexus_net_client.hpp 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988
  1. #pragma once
  2. //stl
  3. #include <iostream>
  4. #include <optional>
  5. //asio2
  6. #include <asio2/asio2.hpp>
  7. //boost
  8. #include <boost/property_tree/ptree.hpp>
  9. #include <boost/property_tree/xml_parser.hpp>
  10. //robotics
  11. #include "utils.hpp"
  12. #include "function_traits.hpp"
  13. #include "logger.hpp"
  14. #include "serialization.hpp"
  15. #include "delegates.hpp"
  16. #include "thread_pool.hpp"
  17. #include "linq.hpp"
  18. #include "config.hpp"
  19. #include "timer.hpp"
  20. namespace robotics {
  21. namespace v3 {
  22. template<typename _Type>
  23. class match_role {
  24. public:
  25. /**
  26. * @brief 默认构造
  27. */
  28. explicit match_role() {}
  29. /**
  30. * @brief 构造
  31. * @param session
  32. */
  33. explicit match_role(std::shared_ptr<_Type> const& session) :
  34. session_(session) {
  35. }
  36. /**
  37. * @brief 解包
  38. * @tparam Iterator
  39. * @param begin
  40. * @param end
  41. * @return
  42. */
  43. template <typename Iterator>
  44. std::pair<Iterator, bool> operator()(Iterator begin, Iterator end) const {
  45. Iterator p = begin;
  46. while (p != end) {
  47. //判断协议头
  48. if (*p != 0x02) {
  49. asio2::error_code ec;
  50. session_->socket().close(ec);
  51. break;
  52. }
  53. //获取数据长度
  54. size_t size = size_t(std::uint8_t(*(++p))) << 24;
  55. size += size_t(std::uint8_t(*(++p))) << 16;
  56. size += size_t(std::uint8_t(*(++p))) << 8;
  57. size += size_t(std::uint8_t(*(++p))) << 0;
  58. if (end - begin >= size) {
  59. if (*(begin + size - 1) == 0x03) {
  60. return std::pair(begin + size, true);
  61. }
  62. else {
  63. asio2::error_code ec;
  64. session_->socket().close(ec);
  65. }
  66. }
  67. break;
  68. }
  69. return std::pair(begin, false);
  70. }
  71. /**
  72. * @brief 初始化
  73. * @param session
  74. */
  75. void init(std::shared_ptr<asio2::tcp_session>& session) {
  76. session_ = session;
  77. }
  78. private:
  79. /**
  80. * @brief 客户端
  81. */
  82. std::shared_ptr<_Type> session_;
  83. };
  84. }
  85. }
  86. template<> struct asio::is_match_condition<robotics::v3::match_role<asio2::tcp_client>> : public std::true_type {};
  87. template<> struct asio::is_match_condition<robotics::v3::match_role<asio2::tcp_session>> : public std::true_type {};
  88. namespace robotics {
  89. namespace v3 {
  90. /**
  91. * @brief 消息类型
  92. */
  93. enum class nexus_net_msg_type_enum {
  94. /**
  95. * @brief 身份认证
  96. */
  97. AUTHENTICATE = 1,
  98. /**
  99. * @brief 身份认证响应
  100. */
  101. REPAUTHENTICATE,
  102. /**
  103. * @brief 订阅
  104. */
  105. SUBSCRIBE,
  106. /**
  107. * @brief 订阅响应
  108. */
  109. REPSUBSCRIBE,
  110. /**
  111. * @brief 删除订阅
  112. */
  113. REMOVESUBSCRIBE,
  114. /**
  115. * @brief 发布
  116. */
  117. PUBLISHER,
  118. /**
  119. * @brief 发布响应
  120. */
  121. REPPUBLISHER,
  122. /**
  123. * @brief 心跳
  124. */
  125. HEARTBEAT,
  126. /**
  127. * @brief 详情
  128. */
  129. DETAILS,
  130. /**
  131. * @brief 详情响应
  132. */
  133. REPDETAILS
  134. };
  135. /**
  136. * @brief 消息
  137. */
  138. class nexus_net_message {
  139. public:
  140. /**
  141. * @brief 构造
  142. */
  143. nexus_net_message() {}
  144. /**
  145. * @brief 构造
  146. * @param sv
  147. */
  148. nexus_net_message(std::string_view sv) {
  149. set_data(sv);
  150. }
  151. /**
  152. * @brief 构造
  153. * @param data
  154. */
  155. nexus_net_message(std::vector<std::uint8_t>const& data) {
  156. set_data(data);
  157. }
  158. /**
  159. * @brief 设置
  160. * @param sv
  161. */
  162. void set_data(std::string_view sv) {
  163. set_data(std::vector<std::uint8_t>(sv.begin(), sv.end()));
  164. }
  165. /**
  166. * @brief 设置
  167. * @param data
  168. */
  169. void set_data(std::vector<std::uint8_t>const& data) {
  170. //消息类型
  171. msg_type = data[5];
  172. //消息ID
  173. size_t size = data[6];
  174. size_t begin = 7;
  175. size_t end = begin + size;
  176. msg_id.assign(data.begin() + begin, data.begin() + end);
  177. //路由
  178. size = data[end];
  179. begin = end + 1;
  180. end = begin + size;
  181. route.assign(data.begin() + begin, data.begin() + end);
  182. //路由列表
  183. size = data[end] << 8;
  184. size += data[++end];
  185. begin = end + 1;
  186. end = begin + size;
  187. if (size > 0) {
  188. std::string str_routes(data.begin() + begin, data.begin() + end);
  189. routes = v3::utils::split(str_routes, 0x0d);
  190. }
  191. //参数
  192. size = (data[end] << 24);
  193. size += (data[++end] << 16);
  194. size += (data[++end] << 8);
  195. size += data[++end];
  196. begin = end + 1;
  197. end = begin + size;
  198. args.add(std::vector<std::uint8_t>(data.begin() + begin, data.begin() + end));
  199. }
  200. /**
  201. * @brief 获取
  202. * @return
  203. */
  204. std::vector<std::uint8_t> get_data() {
  205. std::vector<std::uint8_t> result;
  206. //协议头
  207. result.push_back(0x02);
  208. //字节数量
  209. result.push_back(0x00);
  210. result.push_back(0x00);
  211. result.push_back(0x00);
  212. result.push_back(0x00);
  213. //消息类型
  214. result.push_back(msg_type);
  215. //消息ID
  216. result.push_back(msg_id.size());
  217. result.insert(result.end(), msg_id.begin(), msg_id.end());
  218. //路由
  219. result.push_back(route.size());
  220. result.insert(result.end(), route.begin(), route.end());
  221. //路由列表
  222. std::vector<std::uint8_t> vec_routes;
  223. for (size_t i = 0; i < routes.size(); ++i) {
  224. if (i + 1 == routes.size()) {
  225. vec_routes.insert(vec_routes.end(), routes[i].begin(), routes[i].end());
  226. }
  227. else {
  228. vec_routes.insert(vec_routes.end(), routes[i].begin(), routes[i].end());
  229. vec_routes.push_back(0x0d);
  230. }
  231. }
  232. result.push_back(vec_routes.size() >> 8);
  233. result.push_back(vec_routes.size() >> 0);
  234. result.insert(result.end(), vec_routes.begin(), vec_routes.end());
  235. //参数
  236. std::vector<std::uint8_t> vec_args = args.data();
  237. result.push_back(vec_args.size() >> 24);
  238. result.push_back(vec_args.size() >> 16);
  239. result.push_back(vec_args.size() >> 8);
  240. result.push_back(vec_args.size() >> 0);
  241. result.insert(result.end(), vec_args.begin(), vec_args.end());
  242. //协议尾
  243. result.push_back(0x03);
  244. result[1] = (result.size() >> 24);
  245. result[2] = (result.size() >> 16);
  246. result[3] = (result.size() >> 8);
  247. result[4] = (result.size() >> 0);
  248. return result;
  249. }
  250. public:
  251. /**
  252. * @brief 消息类型
  253. */
  254. std::uint8_t msg_type = 0;
  255. /**
  256. * @brief 消息ID
  257. */
  258. std::string msg_id;
  259. /**
  260. * @brief 路由
  261. */
  262. std::string route;
  263. /**
  264. * @brief 路由列表
  265. */
  266. std::vector<std::string> routes;
  267. /**
  268. * @brief 参数
  269. */
  270. v3::archive::stream args;
  271. };
  272. /**
  273. * @brief 日志类型
  274. */
  275. struct nexus_net_logger_config_info {
  276. /**
  277. * @brief 日志类型
  278. */
  279. int type = 0;
  280. /**
  281. * @brief 启用
  282. */
  283. bool enable = false;
  284. /**
  285. * @brief 备注
  286. */
  287. std::string remarks;
  288. };
  289. /**
  290. * @brief 父节点配置
  291. */
  292. struct nexus_net_client_config_info {
  293. /**
  294. * @brief IP地址
  295. */
  296. std::string ip;
  297. /**
  298. * @brief 端口
  299. */
  300. int port = 0;
  301. /**
  302. * @brief 启用
  303. */
  304. bool enable = false;
  305. /**
  306. * @brief 备注
  307. */
  308. std::string remarks;
  309. };
  310. /**
  311. * @brief 订阅配置
  312. */
  313. struct nexus_net_subscribe_config_info {
  314. /**
  315. * @brief KEY
  316. */
  317. std::string key;
  318. /**
  319. * @brief 路由
  320. */
  321. std::string route;
  322. /**
  323. * @brief 启用
  324. */
  325. bool enable = false;
  326. /**
  327. * @brief 备注
  328. */
  329. std::string remarks;
  330. };
  331. /**
  332. * @brief 发布配置
  333. */
  334. struct nexus_net_publisher_config_info {
  335. /**
  336. * @brief KEY
  337. */
  338. std::string key;
  339. /**
  340. * @brief 节点
  341. */
  342. std::string route;
  343. /**
  344. * @brief 启用
  345. */
  346. bool enable = false;
  347. /**
  348. * @brief 备注
  349. */
  350. std::string remarks;
  351. };
  352. /**
  353. * @brief 用户信息
  354. */
  355. struct nexus_net_user_config_info {
  356. /**
  357. * @brief 名称
  358. */
  359. std::string name;
  360. /**
  361. * @brief 代码
  362. */
  363. std::string code;
  364. /**
  365. * @brief 密码
  366. */
  367. std::string password;
  368. /**
  369. * @brief 启用
  370. */
  371. bool enable = false;
  372. /**
  373. * @brief 备注
  374. */
  375. std::string remarks;
  376. };
  377. /**
  378. * @brief 配置
  379. */
  380. struct nexus_net_config_info {
  381. /**
  382. * @brief 身份
  383. */
  384. nexus_net_user_config_info user;
  385. /**
  386. * @brief 日志
  387. */
  388. std::vector<nexus_net_logger_config_info> loggers;
  389. /**
  390. * @brief 客户端
  391. */
  392. nexus_net_client_config_info client;
  393. /**
  394. * @brief 订阅
  395. */
  396. std::map<std::string, nexus_net_subscribe_config_info> subscribes;
  397. /**
  398. * @brief 发布
  399. */
  400. std::map<std::string, nexus_net_publisher_config_info> publishers;
  401. };
  402. /**
  403. * @brief nexusnet配置
  404. */
  405. class nexus_net_config {
  406. public:
  407. static nexus_net_config_info& read() {
  408. static std::shared_ptr<nexus_net_config_info> result;
  409. static std::mutex mutex;
  410. std::lock_guard<std::mutex> locker(mutex);
  411. if (result)
  412. return *result;
  413. result.reset(new nexus_net_config_info());
  414. boost::property_tree::ptree root;
  415. read_xml(v3::config::read<std::string>("NEXUS_NET", "PATH", "./config/nexus_net_config.xml"), root);
  416. //身份信息
  417. for (auto& it : root.get_child("root.user")) {
  418. if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
  419. continue;
  420. result->user.name = it.second.get<std::string>("<xmlattr>.name", "admin");
  421. result->user.code = it.second.get<std::string>("<xmlattr>.code", "admin");
  422. result->user.password = it.second.get<std::string>("<xmlattr>.password", "123456");
  423. result->user.enable = it.second.get<bool>("<xmlattr>.enable", true);
  424. result->user.remarks = it.second.get<std::string>("<xmlattr>.remarks", "身份信息");
  425. break;
  426. }
  427. //日志
  428. for (auto& it : root.get_child("root.logger")) {
  429. if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
  430. continue;
  431. nexus_net_logger_config_info item;
  432. item.type = it.second.get<int>("<xmlattr>.type", 0);
  433. item.enable = it.second.get<bool>("<xmlattr>.enable", true);
  434. item.remarks = it.second.get<std::string>("<xmlattr>.remarks", "日志");
  435. result->loggers.push_back(item);
  436. }
  437. //客户端
  438. for (auto& it : root.get_child("root.client")) {
  439. if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
  440. continue;
  441. result->client.ip = it.second.get<std::string>("<xmlattr>.ip", "127.0.0.1");
  442. result->client.port = it.second.get<int>("<xmlattr>.port", 20001);
  443. result->client.enable = it.second.get<bool>("<xmlattr>.enable", false);
  444. result->client.remarks = it.second.get<std::string>("<xmlattr>.remarks", "客户");
  445. break;
  446. }
  447. //订阅
  448. for (auto& it : root.get_child("root.subscribe")) {
  449. if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
  450. continue;
  451. nexus_net_subscribe_config_info item;
  452. item.key = it.second.get<std::string>("<xmlattr>.key", "");
  453. item.route = it.second.get<std::string>("<xmlattr>.route", "");
  454. item.enable = it.second.get<bool>("<xmlattr>.enable", false);
  455. item.remarks = it.second.get<std::string>("<xmlattr>.remarks", "白名单");
  456. result->subscribes[item.key] = item;
  457. }
  458. //发布
  459. for (auto& it : root.get_child("root.publisher")) {
  460. if (it.first != "item" || !it.second.get<bool>("<xmlattr>.enable", false))
  461. continue;
  462. nexus_net_publisher_config_info item;
  463. item.key = it.second.get<std::string>("<xmlattr>.key", "");
  464. item.route = it.second.get<std::string>("<xmlattr>.route", "");
  465. item.enable = it.second.get<bool>("<xmlattr>.enable", false);
  466. item.remarks = it.second.get<std::string>("<xmlattr>.remarks", "白名单");
  467. result->publishers[item.key] = item;
  468. }
  469. return *result;
  470. }
  471. static void logger(bool success, bool is_out, nexus_net_message value) {
  472. static std::map<std::uint8_t, std::string> g_msg_type = {
  473. {(std::uint8_t)nexus_net_msg_type_enum::AUTHENTICATE, "身份认证"},
  474. {(std::uint8_t)nexus_net_msg_type_enum::REPAUTHENTICATE, "身份认证响应"},
  475. {(std::uint8_t)nexus_net_msg_type_enum::SUBSCRIBE, "订阅"},
  476. {(std::uint8_t)nexus_net_msg_type_enum::REPSUBSCRIBE, "订阅响应"},
  477. {(std::uint8_t)nexus_net_msg_type_enum::REMOVESUBSCRIBE, "删除订阅"},
  478. {(std::uint8_t)nexus_net_msg_type_enum::PUBLISHER, "发布"},
  479. {(std::uint8_t)nexus_net_msg_type_enum::REPPUBLISHER, "发布响应"},
  480. {(std::uint8_t)nexus_net_msg_type_enum::HEARTBEAT, "心跳"},
  481. {(std::uint8_t)nexus_net_msg_type_enum::DETAILS, "详情"},
  482. {(std::uint8_t)nexus_net_msg_type_enum::REPDETAILS, "详情响应"}
  483. };
  484. auto config = read();
  485. auto it = std::find_if(config.loggers.begin(), config.loggers.end(), [=](auto& it) {
  486. return it.type == value.msg_type && it.enable; });
  487. if (it != config.loggers.end()) {
  488. if (success) {
  489. LOG_INFO << (is_out ? "接收成功" : "发送成功") << ",消息类型:" << g_msg_type[value.msg_type] << " 消息ID:" << value.msg_id << " 事件:" << value.route << " 路由:" << value.routes;
  490. }
  491. else {
  492. LOG_ERROR << (is_out ? "接收失败" : "发送失败") << ",消息类型:" << g_msg_type[value.msg_type] << " 消息ID:" << value.msg_id << " 事件:" << value.route << " 路由:" << value.routes;
  493. }
  494. }
  495. }
  496. };
  497. /**
  498. * @brief 客户端
  499. */
  500. class nexus_net_tcp_client {
  501. public:
  502. nexus_net_tcp_client() :
  503. client_(new asio2::tcp_client()) {
  504. client_->bind_connect(&nexus_net_tcp_client::on_connect, this);
  505. client_->bind_disconnect(&nexus_net_tcp_client::on_disconnect, this);
  506. client_->bind_recv(&nexus_net_tcp_client::on_recv, this);
  507. }
  508. void start(std::string const& ip, int port) {
  509. client_->async_start(ip, port, match_role<asio2::tcp_client>(client_));
  510. }
  511. void stop() {
  512. client_->stop();
  513. }
  514. void send(nexus_net_message& data, std::function<void()> const& fn) {
  515. client_->async_send(data.get_data(), fn);
  516. }
  517. v3::delegates<bool, std::string const&> connect_event;
  518. v3::delegates<> disconnect_event;
  519. v3::delegates<nexus_net_message const&> recv_event;
  520. private:
  521. void on_connect() {
  522. connect_event(!asio2::get_last_error(), asio2::get_last_error_msg());
  523. }
  524. void on_disconnect() {
  525. disconnect_event();
  526. }
  527. void on_recv(std::string_view sv) {
  528. recv_event(nexus_net_message(sv));
  529. }
  530. private:
  531. std::shared_ptr<asio2::tcp_client> client_;
  532. };
  533. /**
  534. * @brief 客户端
  535. */
  536. class nexus_net_client : public v3::thread_pool<nexus_net_client> {
  537. private:
  538. class message_manage_gc;
  539. /**
  540. * @brief 消息管理
  541. */
  542. class message_manage {
  543. private:
  544. friend class message_manage_gc;
  545. typedef std::map<std::string, std::pair<std::shared_ptr<v3::event_wait_for>, nexus_net_message>> message_list_type;
  546. public:
  547. /**
  548. * @brief 添加
  549. * @param msg_id
  550. * @return
  551. */
  552. std::shared_ptr<v3::event_wait_for> add(std::string const& msg_id) {
  553. std::lock_guard<std::mutex> locker(mutex_);
  554. msg_list_[msg_id].first.reset(new v3::event_wait_for);
  555. return msg_list_[msg_id].first;
  556. }
  557. /**
  558. * @brief 获取数据
  559. * @param msg_id
  560. * @return
  561. */
  562. nexus_net_message& get_data(std::string const& msg_id) {
  563. std::lock_guard<std::mutex> locker(mutex_);
  564. return msg_list_[msg_id].second;
  565. }
  566. /**
  567. * @brief 设置数据
  568. * @param msg_id
  569. * @param data
  570. */
  571. void set_data(std::string const& msg_id, nexus_net_message const& data) {
  572. std::lock_guard<std::mutex> locker(mutex_);
  573. msg_list_[msg_id].second = data;
  574. msg_list_[msg_id].first->notify();
  575. }
  576. /**
  577. * @brief 查找数据
  578. * @param msg_id
  579. * @return
  580. */
  581. bool find(std::string const& msg_id) {
  582. std::lock_guard<std::mutex> locker(mutex_);
  583. return msg_list_.find(msg_id) != msg_list_.end();
  584. }
  585. /**
  586. * @brief 移出
  587. * @param msg_id
  588. */
  589. void remove(std::string const& msg_id) {
  590. std::lock_guard<std::mutex> locker(mutex_);
  591. msg_list_.erase(msg_id);
  592. }
  593. private:
  594. message_list_type msg_list_;
  595. std::mutex mutex_;
  596. };
  597. /**
  598. * @brief 消息管理GC
  599. */
  600. class message_manage_gc {
  601. public:
  602. /**
  603. * @brief 构造
  604. * @param data
  605. * @param msg_id
  606. */
  607. message_manage_gc(message_manage& data, std::string& msg_id) :
  608. message_list_(data),
  609. msg_id_(msg_id) {
  610. wait_ = message_list_.add(msg_id_);
  611. }
  612. /**
  613. * @brief 析构
  614. */
  615. ~message_manage_gc() {
  616. message_list_.remove(msg_id_);
  617. }
  618. //添加
  619. bool loop(int millisecond) {
  620. return wait_->wait(millisecond);
  621. }
  622. /**
  623. * @brief 设置数据
  624. * @return
  625. */
  626. nexus_net_message& get_data() {
  627. return message_list_.get_data(msg_id_);
  628. }
  629. private:
  630. std::string& msg_id_;
  631. message_manage& message_list_;
  632. std::shared_ptr<v3::event_wait_for> wait_;
  633. };
  634. public:
  635. /**
  636. * @brief 单例
  637. * @return
  638. */
  639. static nexus_net_client* instance() {
  640. static nexus_net_client g_nexus_net_client;
  641. return &g_nexus_net_client;
  642. }
  643. /**
  644. * @brief 析构
  645. */
  646. ~nexus_net_client() {
  647. stop();
  648. }
  649. /**
  650. * @brief 启动
  651. */
  652. void start() {
  653. auto config = nexus_net_config::read();
  654. if (config.client.enable) {
  655. client_.start(config.client.ip, config.client.port);
  656. }
  657. }
  658. /**
  659. * @brief 停止
  660. */
  661. void stop() {
  662. client_.stop();
  663. }
  664. /**
  665. * @brief 呼叫者消息
  666. * @return
  667. */
  668. static std::optional<nexus_net_message> caller_message() {
  669. nexus_net_client* client = instance();
  670. std::lock_guard<std::mutex> locker(client->caller_mutex_);
  671. if (client->caller_message_list_.contains(std::this_thread::get_id())) {
  672. return client->caller_message_list_[std::this_thread::get_id()];
  673. }
  674. else {
  675. return std::nullopt;
  676. }
  677. }
  678. /**
  679. * @brief 添加订阅
  680. * @tparam _Fn
  681. * @tparam _This
  682. * @param key
  683. * @param fn
  684. * @param ths
  685. * @return
  686. */
  687. template<typename _Fn, typename _This>
  688. bool subscribe(std::string const& key, _Fn&& fn, _This&& ths) {
  689. if (nexus_net_config::read().subscribes.contains(key)) {
  690. functions_.bind(nexus_net_config::read().subscribes[key].route, std::forward<_Fn>(fn), std::forward<_This>(ths));
  691. nexus_net_message msg;
  692. msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::SUBSCRIBE;
  693. msg.route = nexus_net_config::read().subscribes[key].route;
  694. subscribes_.push_back(msg);
  695. return true;
  696. }
  697. return false;
  698. }
  699. /**
  700. * @brief 自定义订阅
  701. * @tparam _Fn
  702. * @tparam _This
  703. * @param route
  704. * @param fn
  705. * @param ths
  706. */
  707. template<typename _Fn, typename _This>
  708. void custom_subscribe(std::string const& route, _Fn&& fn, _This&& ths) {
  709. functions_.bind(route, std::forward<_Fn>(fn), std::forward<_This>(ths));
  710. nexus_net_message msg;
  711. msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::SUBSCRIBE;
  712. msg.route = route;
  713. subscribes_.push_back(msg);
  714. }
  715. /**
  716. * @brief 发布
  717. * @tparam _Ret
  718. * @tparam ..._Args
  719. * @param key
  720. * @param ...args
  721. * @return
  722. */
  723. template<typename _Ret, typename ..._Args>
  724. _Ret publisher(std::string const& key, _Args&&...args) {
  725. return publisher<_Ret>(2000, key, std::forward<_Args>(args)...);
  726. }
  727. /**
  728. * @brief 发布
  729. * @tparam _Ret
  730. * @tparam ..._Args
  731. * @param timeout
  732. * @param route
  733. * @param ...args
  734. * @return
  735. */
  736. template<typename _Ret, typename ..._Args>
  737. _Ret publisher(int timeout, std::string const& key, _Args&&...args) {
  738. auto& config = nexus_net_config::read();
  739. if (!config.publishers.contains(key))
  740. throw std::runtime_error("key不存在!");
  741. nexus_net_message msg;
  742. msg.route = config.publishers[key].route;
  743. msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::PUBLISHER;
  744. msg.args << std::make_tuple(std::forward<_Args>(args)...);
  745. if constexpr (!std::is_same<void, _Ret>::value) {
  746. msg.msg_id = v3::utils::uuid();
  747. message_manage_gc gc(message_manage_, msg.msg_id);
  748. send(msg);
  749. if (!gc.loop(timeout))
  750. throw std::runtime_error("请求超时");
  751. _Ret result;
  752. gc.get_data().args >> result;
  753. return result;
  754. }
  755. else {
  756. send(msg);
  757. }
  758. }
  759. /**
  760. * @brief 自定义发布
  761. * @tparam _Ret
  762. * @tparam ..._Args
  763. * @param key 配置KEY
  764. * @param format 值
  765. * @param ...args 参数列表
  766. * @return
  767. */
  768. template<typename _Ret, typename ..._Args>
  769. _Ret custom_publisher(std::string const& key, std::string const& format, _Args&&...args) {
  770. return custom_publisher<_Ret>(2000, key, format, std::forward<_Args>(args)...);
  771. }
  772. /**
  773. * @brief 自定义发布
  774. * @tparam _Ret
  775. * @tparam ..._Args
  776. * @param timeout 超时时间
  777. * @param key 配置KEY
  778. * @param format 值
  779. * @param ...args 参数列表
  780. * @return
  781. */
  782. template<typename _Ret, typename ..._Args>
  783. _Ret custom_publisher(int timeout, std::string const& key, std::string const& format, _Args&&...args) {
  784. auto& config = nexus_net_config::read();
  785. if (!config.publishers.contains(key))
  786. throw std::runtime_error("key不存在!");
  787. nexus_net_message msg;
  788. msg.route = v3::utils::format(config.publishers[key].route, format);
  789. msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::PUBLISHER;
  790. msg.args << std::make_tuple(std::forward<_Args>(args)...);
  791. if constexpr (!std::is_same<void, _Ret>::value) {
  792. msg.msg_id = v3::utils::uuid();
  793. message_manage_gc gc(message_manage_, msg.msg_id);
  794. send(msg);
  795. if (!gc.loop(timeout))
  796. throw std::runtime_error("请求超时");
  797. _Ret result;
  798. gc.get_data().args >> result;
  799. return result;
  800. }
  801. else {
  802. send(msg);
  803. }
  804. }
  805. /**
  806. * @brief 授权状态
  807. */
  808. v3::delegates<bool, std::string const&, std::string const&> authorized_event;
  809. /**
  810. * @brief 连接断开
  811. */
  812. v3::delegates<> disconnect_event;
  813. /**
  814. * @brief 连接状态
  815. */
  816. v3::delegates<bool> connect_event;
  817. private:
  818. /**
  819. * @brief 构造
  820. */
  821. nexus_net_client() :
  822. v3::thread_pool<nexus_net_client>(5) {
  823. client_.connect_event.bind(&nexus_net_client::on_connect, this);
  824. client_.disconnect_event.bind(&nexus_net_client::on_disconnect, this);
  825. client_.recv_event.bind(&nexus_net_client::do_work_nexus_net_message, this);
  826. timer_.timeout_event.bind(&nexus_net_client::on_timeout, this);
  827. timer_.start(10000);
  828. }
  829. /**
  830. * @brief 发送
  831. * @param data
  832. */
  833. void send(nexus_net_message data) {
  834. if (data.msg_type != (std::uint8_t)nexus_net_msg_type_enum::REPPUBLISHER) {
  835. auto config = nexus_net_config::read();
  836. data.routes.push_back(config.user.code);
  837. if (data.msg_type == (std::uint8_t)nexus_net_msg_type_enum::SUBSCRIBE) {
  838. data.route = fmt::format("{}/{}/{}", parent_code_, config.user.code, data.route);
  839. }
  840. }
  841. else if (data.msg_type == (std::uint8_t)nexus_net_msg_type_enum::REPPUBLISHER && !data.routes.empty()) {
  842. data.routes.pop_back();
  843. }
  844. client_.send(data, [=]() {
  845. nexus_net_config::logger(!asio2::get_last_error(), false, data); });
  846. }
  847. /**
  848. * @brief 身份认证
  849. */
  850. void authenticate() {
  851. auto config = nexus_net_config::read();
  852. if (config.user.enable) {
  853. nexus_net_message data;
  854. data.msg_type = (std::uint8_t)nexus_net_msg_type_enum::AUTHENTICATE;
  855. data.args << config.user.code << config.user.password << config.user.name << 1;
  856. send(data);
  857. }
  858. }
  859. /**
  860. * @brief 心跳
  861. */
  862. void heartbeat() {
  863. if (!is_heartbeat_)
  864. return;
  865. nexus_net_message msg;
  866. msg.msg_type = (std::uint8_t)nexus_net_msg_type_enum::HEARTBEAT;
  867. send(msg);
  868. }
  869. /**
  870. * @brief 订阅
  871. */
  872. void subscribe() {
  873. for (auto& it : subscribes_) {
  874. send(it);
  875. }
  876. }
  877. private:
  878. void on_connect(bool success, std::string const& msg) {
  879. if (success) {
  880. connect_event(true);
  881. authenticate();
  882. }
  883. else {
  884. connect_event(false);
  885. }
  886. }
  887. void on_disconnect() {
  888. disconnect_event();
  889. is_heartbeat_ = false;
  890. }
  891. void do_work_nexus_net_message(nexus_net_message data) {
  892. nexus_net_config::logger(true, true, data);
  893. try {
  894. switch ((nexus_net_msg_type_enum)data.msg_type) {
  895. case nexus_net_msg_type_enum::REPAUTHENTICATE: on_authenticate(data); break;
  896. case nexus_net_msg_type_enum::PUBLISHER: on_publisher(data); break;
  897. case nexus_net_msg_type_enum::REPPUBLISHER: on_reppublisher(data); break;
  898. case nexus_net_msg_type_enum::REPSUBSCRIBE: on_repsubscribe(data); break;
  899. }
  900. }
  901. catch (std::exception const& ec) {
  902. LOG_ERROR << ec;
  903. }
  904. }
  905. void on_timeout() {
  906. heartbeat();
  907. }
  908. private:
  909. void on_repsubscribe(nexus_net_message data) {
  910. bool success = false;
  911. try {
  912. data.args >> success;
  913. }
  914. catch (std::exception const& ec) {
  915. LOG_ERROR << ec;
  916. }
  917. LOG_INFO << "订阅状态:" << (success ? "成功" : "失败");
  918. }
  919. /**
  920. * @brief 授权响应
  921. * @param data
  922. */
  923. void on_authenticate(nexus_net_message data) {
  924. std::string node_name;
  925. std::string message;
  926. data.args >> is_heartbeat_ >> parent_code_ >> parent_name_ >> node_name >> message;
  927. if (is_heartbeat_) {
  928. authorized_event(true, parent_code_, parent_name_);
  929. subscribe();
  930. }
  931. else {
  932. authorized_event(false, "", "");
  933. }
  934. }
  935. /**
  936. * @brief 发布
  937. * @param data
  938. */
  939. void on_publisher(nexus_net_message data) {
  940. REGISTER_ASYNC_FUNC(on_publisher, data);
  941. if (3 != std::count(data.route.begin(), data.route.end(), '/')) {
  942. return;
  943. }
  944. std::string key = data.route.substr(data.route.find('/', data.route.find('/') + 1) + 1);
  945. {
  946. std::lock_guard<std::mutex> locker(caller_mutex_);
  947. caller_message_list_[std::this_thread::get_id()] = data;
  948. }
  949. data.args = functions_.invoke(key, data.args);
  950. {
  951. std::lock_guard<std::mutex> locker(caller_mutex_);
  952. caller_message_list_.erase(std::this_thread::get_id());
  953. }
  954. if (!data.args.empty()) {
  955. data.msg_type = (std::uint8_t)nexus_net_msg_type_enum::REPPUBLISHER;
  956. send(data);
  957. }
  958. }
  959. /**
  960. * @brief 发布响应
  961. * @param data
  962. */
  963. void on_reppublisher(nexus_net_message data) {
  964. if (message_manage_.find(data.msg_id)) {
  965. message_manage_.set_data(data.msg_id, data);
  966. }
  967. }
  968. private:
  969. v3::timer timer_;
  970. std::string parent_code_;
  971. std::string parent_name_;
  972. bool is_heartbeat_ = false;
  973. nexus_net_tcp_client client_;
  974. std::mutex caller_mutex_;
  975. std::map<std::thread::id, nexus_net_message> caller_message_list_;
  976. message_manage message_manage_;
  977. std::vector<nexus_net_message> subscribes_;
  978. v3::archive::function_manage<std::string> functions_;
  979. };
  980. }
  981. }