123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599 |
- #ifndef __ASIO2_KCP_STREAM_CP_HPP__
- #define __ASIO2_KCP_STREAM_CP_HPP__
- #if defined(_MSC_VER) && (_MSC_VER >= 1200)
- #pragma once
- #endif
- #include <asio2/base/iopool.hpp>
- #include <asio2/base/define.hpp>
- #include <asio2/base/listener.hpp>
- #include <asio2/base/session_mgr.hpp>
- #include <asio2/base/detail/object.hpp>
- #include <asio2/base/detail/allocator.hpp>
- #include <asio2/base/detail/util.hpp>
- #include <asio2/base/detail/buffer_wrap.hpp>
- #include <asio2/udp/detail/kcp_util.hpp>
- namespace asio2::detail
- {
- ASIO2_CLASS_FORWARD_DECLARE_UDP_BASE;
- ASIO2_CLASS_FORWARD_DECLARE_UDP_CLIENT;
- ASIO2_CLASS_FORWARD_DECLARE_UDP_SERVER;
- ASIO2_CLASS_FORWARD_DECLARE_UDP_SESSION;
-
- template<class derived_t, class args_t>
- class kcp_stream_cp
- {
- friend derived_t;
- ASIO2_CLASS_FRIEND_DECLARE_UDP_BASE;
- ASIO2_CLASS_FRIEND_DECLARE_UDP_CLIENT;
- ASIO2_CLASS_FRIEND_DECLARE_UDP_SERVER;
- ASIO2_CLASS_FRIEND_DECLARE_UDP_SESSION;
- public:
-
- kcp_stream_cp(derived_t& d, asio::io_context& ioc)
- : derive(d), kcp_timer_(ioc)
- {
- }
-
- ~kcp_stream_cp() noexcept
- {
- if (this->kcp_)
- {
- kcp::ikcp_release(this->kcp_);
- this->kcp_ = nullptr;
- }
- }
-
- inline void set_illegal_response_handler(std::function<void(std::string_view)> fn) noexcept
- {
- this->illegal_response_handler_ = std::move(fn);
- }
- protected:
- void _kcp_start(std::shared_ptr<derived_t> this_ptr, std::uint32_t conv)
- {
-
- kcp::ikcpcb* old = this->kcp_;
- struct old_kcp_destructor
- {
- old_kcp_destructor(kcp::ikcpcb* p) : p_(p) {}
- ~old_kcp_destructor()
- {
- if (p_)
- kcp::ikcp_release(p_);
- }
- kcp::ikcpcb* p_ = nullptr;
- } old_kcp_destructor_guard(old);
- ASIO2_ASSERT(conv != 0);
- this->kcp_ = kcp::ikcp_create(conv, (void*)this);
- this->kcp_->output = &kcp_stream_cp<derived_t, args_t>::_kcp_output;
- if (old)
- {
-
- kcp::ikcp_setmtu(this->kcp_, old->mtu);
-
- kcp::ikcp_wndsize(this->kcp_, old->snd_wnd, old->rcv_wnd);
-
- kcp::ikcp_nodelay(this->kcp_, old->nodelay, old->interval, old->fastresend, old->nocwnd);
- }
- else
- {
- kcp::ikcp_nodelay(this->kcp_, 1, 10, 2, 1);
- kcp::ikcp_wndsize(this->kcp_, 128, 512);
- }
-
-
- asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
- [this, this_ptr = std::move(this_ptr)]() mutable
- {
- this->_post_kcp_timer(std::move(this_ptr));
- }));
- }
- void _kcp_stop()
- {
- error_code ec_ignore{};
-
- if (this->send_fin_)
- this->_kcp_send_hdr(kcp::make_kcphdr_fin(0), ec_ignore);
- detail::cancel_timer(this->kcp_timer_);
- }
- inline void _kcp_reset()
- {
- kcp::ikcp_reset(this->kcp_);
- }
- protected:
- inline std::size_t _kcp_send_hdr(kcp::kcphdr hdr, error_code& ec)
- {
- std::string msg = kcp::to_string(hdr);
- std::size_t sent_bytes = 0;
- #if defined(_DEBUG) || defined(DEBUG)
- ASIO2_ASSERT(derive.post_send_counter_.load() == 0);
- derive.post_send_counter_++;
- #endif
- if constexpr (args_t::is_session)
- sent_bytes = derive.stream().send_to(asio::buffer(msg), derive.remote_endpoint_, 0, ec);
- else
- sent_bytes = derive.stream().send(asio::buffer(msg), 0, ec);
- #if defined(_DEBUG) || defined(DEBUG)
- derive.post_send_counter_--;
- #endif
- return sent_bytes;
- }
- inline std::size_t _kcp_send_syn(std::uint32_t seq, error_code& ec)
- {
- kcp::kcphdr syn = kcp::make_kcphdr_syn(derive.kcp_conv_, seq);
- return this->_kcp_send_hdr(syn, ec);
- }
- inline std::size_t _kcp_send_synack(kcp::kcphdr syn, error_code& ec)
- {
-
- kcp::kcphdr synack = kcp::make_kcphdr_synack(syn.th_ack, syn.th_seq);
- return this->_kcp_send_hdr(synack, ec);
- }
- template<class Data, class Callback>
- inline bool _kcp_send(Data& data, Callback&& callback)
- {
- #if defined(ASIO2_ENABLE_LOG)
- static_assert(decltype(tallocator_)::storage_size == 168);
- #endif
- auto buffer = asio::buffer(data);
- #if defined(_DEBUG) || defined(DEBUG)
- ASIO2_ASSERT(derive.post_send_counter_.load() == 0);
- derive.post_send_counter_++;
- #endif
- int ret = kcp::ikcp_send(this->kcp_, (const char *)buffer.data(), (int)buffer.size());
- #if defined(_DEBUG) || defined(DEBUG)
- derive.post_send_counter_--;
- #endif
- switch (ret)
- {
- case 0: set_last_error(error_code{} ); break;
- case -1: set_last_error(asio::error::invalid_argument ); break;
- case -2: set_last_error(asio::error::no_memory ); break;
- default: set_last_error(asio::error::operation_not_supported); break;
- }
- if (ret == 0)
- {
- kcp::ikcp_flush(this->kcp_);
- }
- callback(get_last_error(), ret < 0 ? 0 : buffer.size());
- return (ret == 0);
- }
- void _post_kcp_timer(std::shared_ptr<derived_t> this_ptr)
- {
- std::uint32_t clock1 = static_cast<std::uint32_t>(std::chrono::duration_cast<
- std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count());
- std::uint32_t clock2 = kcp::ikcp_check(this->kcp_, clock1);
- this->kcp_timer_.expires_after(std::chrono::milliseconds(clock2 - clock1));
- this->kcp_timer_.async_wait(make_allocator(this->tallocator_,
- [this, this_ptr = std::move(this_ptr)](const error_code & ec) mutable
- {
- this->_handle_kcp_timer(ec, std::move(this_ptr));
- }));
- }
- void _handle_kcp_timer(const error_code & ec, std::shared_ptr<derived_t> this_ptr)
- {
- if (ec == asio::error::operation_aborted) return;
- std::uint32_t clock = static_cast<std::uint32_t>(std::chrono::duration_cast<
- std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count());
- kcp::ikcp_update(this->kcp_, clock);
- if (this->kcp_->state == (kcp::IUINT32)-1)
- {
- if (derive.state_ == state_t::started)
- {
- derive._do_disconnect(asio::error::network_reset, std::move(this_ptr));
- }
- return;
- }
- if (derive.is_started())
- this->_post_kcp_timer(std::move(this_ptr));
- }
- template<typename C>
- void _kcp_recv(
- std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs, std::string_view data)
- {
- auto& buffer = derive.buffer();
- int len = kcp::ikcp_input(this->kcp_, (const char *)data.data(), (long)data.size());
- buffer.consume(buffer.size());
- if (len != 0)
- {
- set_last_error(asio::error::message_size);
- this->_call_illegal_data_callback(data);
- return;
- }
- for (;;)
- {
- len = kcp::ikcp_recv(this->kcp_, (char *)buffer.prepare(
- buffer.pre_size()).data(), (int)buffer.pre_size());
- if (len >= 0)
- {
- buffer.commit(len);
- derive._fire_recv(this_ptr, ecs, std::string_view(static_cast
- <std::string_view::const_pointer>(buffer.data().data()), len));
- buffer.consume(len);
- }
- else if (len == -3)
- {
- buffer.pre_size((std::min)(buffer.pre_size() * 2, buffer.max_size()));
- }
- else
- {
- break;
- }
- }
- kcp::ikcp_flush(this->kcp_);
- }
- template<typename C>
- inline void _kcp_handle_recv(
- error_code ec, std::string_view data,
- std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs)
- {
-
-
- if (data.size() > kcp::kcphdr::required_size())
- {
- this->_kcp_recv(this_ptr, ecs, data);
- }
- else if (data.size() == kcp::kcphdr::required_size())
- {
-
-
-
-
-
-
- if (kcp::is_kcphdr_syn(data))
- {
- ASIO2_ASSERT(this->kcp_);
- if (this->kcp_)
- {
- kcp::kcphdr syn = kcp::to_kcphdr(data);
- std::uint32_t conv = syn.th_ack;
- if (conv == 0)
- {
- conv = this->kcp_->conv;
- syn.th_ack = conv;
- }
-
-
-
- #if 0
-
- this->send_fin_ = false;
- this->_kcp_stop();
- this->_kcp_start(this_ptr, conv);
- #else
- this->_kcp_reset();
- #endif
- this->send_fin_ = true;
-
- this->_kcp_send_synack(syn, ec);
- if (ec)
- {
- derive._do_disconnect(ec, this_ptr);
- }
- }
- else
- {
- derive._do_disconnect(asio::error::operation_aborted, this_ptr);
- }
- }
- else if (kcp::is_kcphdr_synack(data, 0, true))
- {
- ASIO2_ASSERT(this->kcp_);
- }
- else if (kcp::is_kcphdr_ack(data, 0, true))
- {
- ASIO2_ASSERT(this->kcp_);
- }
- else if (kcp::is_kcphdr_fin(data))
- {
- ASIO2_ASSERT(this->kcp_);
- this->send_fin_ = false;
- derive._do_disconnect(asio::error::connection_reset, this_ptr);
- }
- else
- {
- this->_call_illegal_data_callback(data);
- }
- }
- else
- {
- this->_call_illegal_data_callback(data);
- }
- }
- template<typename C, typename DeferEvent>
- void _session_post_handshake(
- std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
- {
- error_code ec;
- std::string& data = *(derive.first_data_);
-
- kcp::kcphdr syn = kcp::to_kcphdr(data);
- std::uint32_t conv = syn.th_ack;
- if (conv == 0)
- {
- conv = derive.kcp_conv_;
- syn.th_ack = conv;
- }
-
- this->_kcp_send_synack(syn, ec);
- if (ec)
- {
- derive._do_disconnect(ec, std::move(this_ptr), std::move(chain));
- return;
- }
- this->_kcp_start(this_ptr, conv);
- this->_handle_handshake(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- template<typename C, typename DeferEvent>
- void _client_post_handshake(
- std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
- {
- error_code ec;
-
- std::uint32_t seq = static_cast<std::uint32_t>(
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch()).count());
- this->_kcp_send_syn(seq, ec);
- if (ec)
- {
- derive._do_disconnect(ec, std::move(this_ptr), defer_event(chain.move_guard()));
- return;
- }
-
-
- std::shared_ptr<detail::safe_timer> timer =
- mktimer(derive.io_->context(), std::chrono::milliseconds(500),
- [this, this_ptr, seq](error_code ec) mutable
- {
- if (ec == asio::error::operation_aborted)
- return false;
- this->_kcp_send_syn(seq, ec);
- if (ec)
- {
- set_last_error(ec);
- if (derive.state_ == state_t::started)
- {
- derive._do_disconnect(ec, std::move(this_ptr));
- }
- return false;
- }
-
-
- return true;
- });
- #if defined(_DEBUG) || defined(DEBUG)
- ASIO2_ASSERT(derive.post_recv_counter_.load() == 0);
- derive.post_recv_counter_++;
- #endif
-
- derive.socket().async_receive(derive.buffer().prepare(derive.buffer().pre_size()),
- make_allocator(derive.rallocator(),
- [this, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain),
- seq, timer = std::move(timer)]
- (const error_code & ec, std::size_t bytes_recvd) mutable
- {
- #if defined(_DEBUG) || defined(DEBUG)
- derive.post_recv_counter_--;
- #endif
- ASIO2_ASSERT(derive.io_->running_in_this_thread());
- timer->cancel();
- if (ec)
- {
-
-
-
- this->_handle_handshake(
- derive.connect_timeout_timer_ ? ec : asio::error::timed_out,
- std::move(this_ptr), std::move(ecs), std::move(chain));
- return;
- }
- derive.buffer().commit(bytes_recvd);
- std::string_view data = std::string_view(static_cast<std::string_view::const_pointer>
- (derive.buffer().data().data()), bytes_recvd);
-
- if (kcp::is_kcphdr_synack(data, seq))
- {
- kcp::kcphdr hdr = kcp::to_kcphdr(data);
- std::uint32_t conv = hdr.th_seq;
- if (derive.kcp_conv_ != 0)
- {
- ASIO2_ASSERT(derive.kcp_conv_ == conv);
- }
- this->_kcp_start(this_ptr, conv);
- this->_handle_handshake(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- else
- {
- this->_handle_handshake(asio::error::address_family_not_supported,
- std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- derive.buffer().consume(bytes_recvd);
- }));
- }
- template<typename C, typename DeferEvent>
- void _post_handshake(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
- {
- if constexpr (args_t::is_session)
- {
- this->_session_post_handshake(std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- else
- {
- this->_client_post_handshake(std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- }
- template<typename C, typename DeferEvent>
- void _handle_handshake(
- const error_code& ec, std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs,
- DeferEvent chain)
- {
- set_last_error(ec);
- if constexpr (args_t::is_session)
- {
- derive._fire_handshake(this_ptr);
- if (ec)
- {
- derive._do_disconnect(ec, std::move(this_ptr), std::move(chain));
- return;
- }
- derive._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- else
- {
- derive._fire_handshake(this_ptr);
- derive._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- }
- static int _kcp_output(const char *buf, int len, kcp::ikcpcb *kcp, void *user)
- {
- std::ignore = kcp;
- kcp_stream_cp * zhis = ((kcp_stream_cp*)user);
- derived_t & derive = zhis->derive;
- error_code ec;
- if constexpr (args_t::is_session)
- derive.stream().send_to(asio::buffer(buf, len), derive.remote_endpoint_, 0, ec);
- else
- derive.stream().send(asio::buffer(buf, len), 0, ec);
- return 0;
- }
- inline void _call_illegal_data_callback(std::string_view data)
- {
- if (this->illegal_response_handler_)
- {
- this->illegal_response_handler_(data);
- }
- }
- protected:
- derived_t & derive;
-
- kcp::ikcpcb * kcp_ = nullptr;
-
- bool send_fin_ = true;
- asio::steady_timer kcp_timer_;
- handler_memory<std::true_type, allocator_fixed_size_op<168>> tallocator_;
- std::function<void(std::string_view)> illegal_response_handler_;
- };
- }
- #endif
|