| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423 | //// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)//// Distributed under the Boost Software License, Version 1.0. (See accompanying// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)//// Official repository: https://github.com/boostorg/beast//#ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP#define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP#include <boost/beast/core/buffer_traits.hpp>#include <boost/beast/websocket/teardown.hpp>#include <boost/beast/websocket/detail/mask.hpp>#include <boost/beast/websocket/impl/stream_impl.hpp>#include <boost/beast/core/async_base.hpp>#include <boost/beast/core/buffers_prefix.hpp>#include <boost/beast/core/buffers_suffix.hpp>#include <boost/beast/core/flat_static_buffer.hpp>#include <boost/beast/core/read_size.hpp>#include <boost/beast/core/stream_traits.hpp>#include <boost/beast/core/detail/bind_continuation.hpp>#include <boost/beast/core/detail/buffer.hpp>#include <boost/beast/core/detail/clamp.hpp>#include <boost/beast/core/detail/config.hpp>#include <boost/asio/coroutine.hpp>#include <boost/assert.hpp>#include <boost/config.hpp>#include <boost/optional.hpp>#include <boost/throw_exception.hpp>#include <algorithm>#include <limits>#include <memory>namespace boost {namespace beast {namespace websocket {/*  Read some message data into a buffer sequence.    Also reads and handles control frames.*/template<class NextLayer, bool deflateSupported>template<class Handler, class MutableBufferSequence>class stream<NextLayer, deflateSupported>::read_some_op    : public beast::async_base<        Handler, beast::executor_type<stream>>    , public asio::coroutine{    boost::weak_ptr<impl_type> wp_;    MutableBufferSequence bs_;    buffers_suffix<MutableBufferSequence> cb_;    std::size_t bytes_written_ = 0;    error_code result_;    close_code code_;    bool did_read_ = false;public:    static constexpr int id = 1; // for soft_mutex    template<class Handler_>    read_some_op(        Handler_&& h,        boost::shared_ptr<impl_type> const& sp,        MutableBufferSequence const& bs)        : async_base<            Handler, beast::executor_type<stream>>(                std::forward<Handler_>(h),                    sp->stream().get_executor())        , wp_(sp)        , bs_(bs)        , cb_(bs)        , code_(close_code::none)    {        (*this)({}, 0, false);    }    void operator()(        error_code ec = {},        std::size_t bytes_transferred = 0,        bool cont = true)    {        using beast::detail::clamp;        auto sp = wp_.lock();        if(! sp)        {            BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);            bytes_written_ = 0;            return this->complete(cont, ec, bytes_written_);        }        auto& impl = *sp;        BOOST_ASIO_CORO_REENTER(*this)        {            impl.update_timer(this->get_executor());        acquire_read_lock:            // Acquire the read lock            if(! impl.rd_block.try_lock(this))            {            do_suspend:                BOOST_ASIO_CORO_YIELD                {                    BOOST_ASIO_HANDLER_LOCATION((                        __FILE__, __LINE__,                        "websocket::async_read_some"));                    this->set_allowed_cancellation(net::cancellation_type::all);                    impl.op_r_rd.emplace(std::move(*this), net::cancellation_type::all);                }                if (ec)                    return this->complete(cont, ec, bytes_written_);                this->set_allowed_cancellation(net::cancellation_type::terminal);                impl.rd_block.lock(this);                BOOST_ASIO_CORO_YIELD                {                    BOOST_ASIO_HANDLER_LOCATION((                        __FILE__, __LINE__,                        "websocket::async_read_some"));                    const auto ex = this->get_immediate_executor();                    net::dispatch(ex, std::move(*this));                }                BOOST_ASSERT(impl.rd_block.is_locked(this));                BOOST_ASSERT(!ec);                if(impl.check_stop_now(ec))                {                    // Issue 2264 - There is no guarantee that the next                    // error will be operation_aborted.                    // The error could be a result of the peer resetting the                     // connection                    // BOOST_ASSERT(ec == net::error::operation_aborted);                    goto upcall;                }                // VFALCO Should never get here                // The only way to get read blocked is if                // a `close_op` wrote a close frame                BOOST_ASSERT(impl.wr_close);                BOOST_ASSERT(impl.status_ != status::open);                BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);                goto upcall;            }            else            {                // Make sure the stream is not closed                if( impl.status_ == status::closed ||                    impl.status_ == status::failed)                {                    BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);                    goto upcall;                }            }            // if status_ == status::closing, we want to suspend            // the read operation until the close completes,            // then finish the read with operation_aborted.        loop:            BOOST_ASSERT(impl.rd_block.is_locked(this));            // See if we need to read a frame header. This            // condition is structured to give the decompressor            // a chance to emit the final empty deflate block            //            if(impl.rd_remain == 0 &&                (! impl.rd_fh.fin || impl.rd_done))            {                // Read frame header                while(! impl.parse_fh(                    impl.rd_fh, impl.rd_buf, result_))                {                    if(result_)                    {                        // _Fail the WebSocket Connection_                        if(result_ == error::message_too_big)                            code_ = close_code::too_big;                        else                            code_ = close_code::protocol_error;                        goto close;                    }                    BOOST_ASSERT(impl.rd_block.is_locked(this));                    BOOST_ASIO_CORO_YIELD                    {                        BOOST_ASIO_HANDLER_LOCATION((                            __FILE__, __LINE__,                            "websocket::async_read_some"));                        impl.stream().async_read_some(                            impl.rd_buf.prepare(read_size(                                impl.rd_buf, impl.rd_buf.max_size())),                                    std::move(*this));                    }                    BOOST_ASSERT(impl.rd_block.is_locked(this));                    impl.rd_buf.commit(bytes_transferred);                    if(impl.check_stop_now(ec))                        goto upcall;                    impl.reset_idle();                    // Allow a close operation                    // to acquire the read block                    impl.rd_block.unlock(this);                    if( impl.op_r_close.maybe_invoke())                    {                        // Suspend                        BOOST_ASSERT(impl.rd_block.is_locked());                        goto do_suspend;                    }                    // Acquire read block                    impl.rd_block.lock(this);                }                // Immediately apply the mask to the portion                // of the buffer holding payload data.                if(impl.rd_fh.len > 0 && impl.rd_fh.mask)                    detail::mask_inplace(buffers_prefix(                        clamp(impl.rd_fh.len),                            impl.rd_buf.data()),                                impl.rd_key);                if(detail::is_control(impl.rd_fh.op))                {                    // Clear this otherwise the next                    // frame will be considered final.                    impl.rd_fh.fin = false;                    // Handle ping frame                    if(impl.rd_fh.op == detail::opcode::ping)                    {                        if(impl.ctrl_cb)                        {                            if(! cont)                            {                                BOOST_ASIO_CORO_YIELD                                {                                    BOOST_ASIO_HANDLER_LOCATION((                                        __FILE__, __LINE__,                                        "websocket::async_read_some"));                                    const auto ex = this->get_immediate_executor();                                    net::dispatch(ex, std::move(*this));                                }                                BOOST_ASSERT(cont);                                // VFALCO call check_stop_now() here?                            }                        }                        {                            auto const b = buffers_prefix(                                clamp(impl.rd_fh.len),                                    impl.rd_buf.data());                            auto const len = buffer_bytes(b);                            BOOST_ASSERT(len == impl.rd_fh.len);                            ping_data payload;                            detail::read_ping(payload, b);                            impl.rd_buf.consume(len);                            // Ignore ping when closing                            if(impl.status_ == status::closing)                                goto loop;                            if(impl.ctrl_cb)                                impl.ctrl_cb(                                    frame_type::ping, to_string_view(payload));                            impl.rd_fb.clear();                            impl.template write_ping<                                flat_static_buffer_base>(impl.rd_fb,                                    detail::opcode::pong, payload);                        }                        // Allow a close operation                        // to acquire the read block                        impl.rd_block.unlock(this);                        impl.op_r_close.maybe_invoke();                        // Acquire the write lock                        if(! impl.wr_block.try_lock(this))                        {                            BOOST_ASIO_CORO_YIELD                            {                                BOOST_ASIO_HANDLER_LOCATION((                                    __FILE__, __LINE__,                                    "websocket::async_read_some"));                                impl.op_rd.emplace(std::move(*this));                            }                            if (ec)                                return this->complete(cont, ec, bytes_written_);                            impl.wr_block.lock(this);                            BOOST_ASIO_CORO_YIELD                            {                                BOOST_ASIO_HANDLER_LOCATION((                                    __FILE__, __LINE__,                                    "websocket::async_read_some"));                                const auto ex = this->get_immediate_executor();                                net::dispatch(ex, std::move(*this));                            }                            BOOST_ASSERT(impl.wr_block.is_locked(this));                            if(impl.check_stop_now(ec))                                goto upcall;                        }                        // Send pong                        BOOST_ASSERT(impl.wr_block.is_locked(this));                        BOOST_ASIO_CORO_YIELD                        {                            BOOST_ASIO_HANDLER_LOCATION((                                __FILE__, __LINE__,                                "websocket::async_read_some"));                            net::async_write(                                impl.stream(), net::const_buffer(impl.rd_fb.data()),                                beast::detail::bind_continuation(std::move(*this)));                        }                        BOOST_ASSERT(impl.wr_block.is_locked(this));                        if(impl.check_stop_now(ec))                            goto upcall;                        impl.wr_block.unlock(this);                        impl.op_close.maybe_invoke()                            || impl.op_idle_ping.maybe_invoke()                            || impl.op_ping.maybe_invoke()                            || impl.op_wr.maybe_invoke();                        goto acquire_read_lock;                    }                    // Handle pong frame                    if(impl.rd_fh.op == detail::opcode::pong)                    {                        // Ignore pong when closing                        if(! impl.wr_close && impl.ctrl_cb)                        {                            if(! cont)                            {                                BOOST_ASIO_CORO_YIELD                                {                                    BOOST_ASIO_HANDLER_LOCATION((                                        __FILE__, __LINE__,                                        "websocket::async_read_some"));                                    const auto ex = this->get_immediate_executor();                                    net::dispatch(ex, std::move(*this));                                }                                BOOST_ASSERT(cont);                            }                        }                        auto const cb = buffers_prefix(clamp(                            impl.rd_fh.len), impl.rd_buf.data());                        auto const len = buffer_bytes(cb);                        BOOST_ASSERT(len == impl.rd_fh.len);                        ping_data payload;                        detail::read_ping(payload, cb);                        impl.rd_buf.consume(len);                        // Ignore pong when closing                        if(! impl.wr_close && impl.ctrl_cb)                            impl.ctrl_cb(frame_type::pong, to_string_view(payload));                        goto loop;                    }                    // Handle close frame                    BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);                    {                        if(impl.ctrl_cb)                        {                            if(! cont)                            {                                BOOST_ASIO_CORO_YIELD                                {                                    BOOST_ASIO_HANDLER_LOCATION((                                        __FILE__, __LINE__,                                        "websocket::async_read_some"));                                    const auto ex = this->get_immediate_executor();                                    net::dispatch(ex, std::move(*this));                                }                                BOOST_ASSERT(cont);                            }                        }                        auto const cb = buffers_prefix(clamp(                            impl.rd_fh.len), impl.rd_buf.data());                        auto const len = buffer_bytes(cb);                        BOOST_ASSERT(len == impl.rd_fh.len);                        BOOST_ASSERT(! impl.rd_close);                        impl.rd_close = true;                        close_reason cr;                        detail::read_close(cr, cb, result_);                        if(result_)                        {                            // _Fail the WebSocket Connection_                            code_ = close_code::protocol_error;                            goto close;                        }                        impl.cr = cr;                        impl.rd_buf.consume(len);                        if(impl.ctrl_cb)                            impl.ctrl_cb(frame_type::close,                                to_string_view(impl.cr.reason));                        // See if we are already closing                        if(impl.status_ == status::closing)                        {                            // _Close the WebSocket Connection_                            BOOST_ASSERT(impl.wr_close);                            code_ = close_code::none;                            result_ = error::closed;                            goto close;                        }                        // _Start the WebSocket Closing Handshake_                        code_ = cr.code == close_code::none ?                            close_code::normal :                            static_cast<close_code>(cr.code);                        result_ = error::closed;                        goto close;                    }                }                if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)                {                    // Empty non-final frame                    goto loop;                }                impl.rd_done = false;            }            if(! impl.rd_deflated())            {                if(impl.rd_remain > 0)                {                    if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >                        (std::min)(clamp(impl.rd_remain),                            buffer_bytes(cb_)))                    {                        // Fill the read buffer first, otherwise we                        // get fewer bytes at the cost of one I/O.                        BOOST_ASIO_CORO_YIELD                        {                            BOOST_ASIO_HANDLER_LOCATION((                                __FILE__, __LINE__,                                "websocket::async_read_some"));                            impl.stream().async_read_some(                                impl.rd_buf.prepare(read_size(                                    impl.rd_buf, impl.rd_buf.max_size())),                                        std::move(*this));                        }                        impl.rd_buf.commit(bytes_transferred);                        if(impl.check_stop_now(ec))                            goto upcall;                        impl.reset_idle();                        if(impl.rd_fh.mask)                            detail::mask_inplace(buffers_prefix(clamp(                                impl.rd_remain), impl.rd_buf.data()),                                    impl.rd_key);                    }                    if(impl.rd_buf.size() > 0)                    {                        // Copy from the read buffer.                        // The mask was already applied.                        bytes_transferred = net::buffer_copy(cb_,                            impl.rd_buf.data(), clamp(impl.rd_remain));                        auto const mb = buffers_prefix(                            bytes_transferred, cb_);                        impl.rd_remain -= bytes_transferred;                        if(impl.rd_op == detail::opcode::text)                        {                            if(! impl.rd_utf8.write(mb) ||                                (impl.rd_remain == 0 && impl.rd_fh.fin &&                                    ! impl.rd_utf8.finish()))                            {                                // _Fail the WebSocket Connection_                                code_ = close_code::bad_payload;                                result_ = error::bad_frame_payload;                                goto close;                            }                        }                        bytes_written_ += bytes_transferred;                        impl.rd_size += bytes_transferred;                        impl.rd_buf.consume(bytes_transferred);                    }                    else                    {                        // Read into caller's buffer                        BOOST_ASSERT(impl.rd_remain > 0);                        BOOST_ASSERT(buffer_bytes(cb_) > 0);                        BOOST_ASSERT(buffer_bytes(buffers_prefix(                            clamp(impl.rd_remain), cb_)) > 0);                        BOOST_ASIO_CORO_YIELD                        {                            BOOST_ASIO_HANDLER_LOCATION((                                __FILE__, __LINE__,                                "websocket::async_read_some"));                            impl.stream().async_read_some(buffers_prefix(                                clamp(impl.rd_remain), cb_), std::move(*this));                        }                        if(impl.check_stop_now(ec))                            goto upcall;                        impl.reset_idle();                        BOOST_ASSERT(bytes_transferred > 0);                        auto const mb = buffers_prefix(                            bytes_transferred, cb_);                        impl.rd_remain -= bytes_transferred;                        if(impl.rd_fh.mask)                            detail::mask_inplace(mb, impl.rd_key);                        if(impl.rd_op == detail::opcode::text)                        {                            if(! impl.rd_utf8.write(mb) ||                                (impl.rd_remain == 0 && impl.rd_fh.fin &&                                    ! impl.rd_utf8.finish()))                            {                                // _Fail the WebSocket Connection_                                code_ = close_code::bad_payload;                                result_ = error::bad_frame_payload;                                goto close;                            }                        }                        bytes_written_ += bytes_transferred;                        impl.rd_size += bytes_transferred;                    }                }                BOOST_ASSERT( ! impl.rd_done );                if( impl.rd_remain == 0 && impl.rd_fh.fin )                    impl.rd_done = true;            }            else            {                // Read compressed message frame payload:                // inflate even if rd_fh_.len == 0, otherwise we                // never emit the end-of-stream deflate block.                while(buffer_bytes(cb_) > 0)                {                    if( impl.rd_remain > 0 &&                        impl.rd_buf.size() == 0 &&                        ! did_read_)                    {                        // read new                        BOOST_ASIO_CORO_YIELD                        {                            BOOST_ASIO_HANDLER_LOCATION((                                __FILE__, __LINE__,                                "websocket::async_read_some"));                            impl.stream().async_read_some(                                impl.rd_buf.prepare(read_size(                                    impl.rd_buf, impl.rd_buf.max_size())),                                        std::move(*this));                        }                        if(impl.check_stop_now(ec))                            goto upcall;                        impl.reset_idle();                        BOOST_ASSERT(bytes_transferred > 0);                        impl.rd_buf.commit(bytes_transferred);                        if(impl.rd_fh.mask)                            detail::mask_inplace(                                buffers_prefix(clamp(impl.rd_remain),                                    impl.rd_buf.data()), impl.rd_key);                        did_read_ = true;                    }                    zlib::z_params zs;                    {                        auto const out = buffers_front(cb_);                        zs.next_out = out.data();                        zs.avail_out = out.size();                        BOOST_ASSERT(zs.avail_out > 0);                    }                    // boolean to track the end of the message.                    bool fin = false;                    if(impl.rd_remain > 0)                    {                        if(impl.rd_buf.size() > 0)                        {                            // use what's there                            auto const in = buffers_prefix(                                clamp(impl.rd_remain), buffers_front(                                    impl.rd_buf.data()));                            zs.avail_in = in.size();                            zs.next_in = in.data();                        }                        else                        {                            break;                        }                    }                    else if(impl.rd_fh.fin)                    {                        // append the empty block codes                        static std::uint8_t constexpr                            empty_block[4] = { 0x00, 0x00, 0xff, 0xff };                        zs.next_in = empty_block;                        zs.avail_in = sizeof(empty_block);                        fin = true;                    }                    else                    {                        break;                    }                    impl.inflate(zs, zlib::Flush::sync, ec);                    if(impl.check_stop_now(ec))                        goto upcall;                    if(fin && zs.total_out == 0) {                        impl.do_context_takeover_read(impl.role);                        impl.rd_done = true;                        break;                    }                    if(impl.rd_msg_max && beast::detail::sum_exceeds(                        impl.rd_size, zs.total_out, impl.rd_msg_max))                    {                        // _Fail the WebSocket Connection_                        code_ = close_code::too_big;                        result_ = error::message_too_big;                        goto close;                    }                    cb_.consume(zs.total_out);                    impl.rd_size += zs.total_out;                    if (! fin) {                        impl.rd_remain -= zs.total_in;                        impl.rd_buf.consume(zs.total_in);                    }                    bytes_written_ += zs.total_out;                }                if(impl.rd_op == detail::opcode::text)                {                    // check utf8                    if(! impl.rd_utf8.write(                        buffers_prefix(bytes_written_, bs_)) || (                            impl.rd_done && ! impl.rd_utf8.finish()))                    {                        // _Fail the WebSocket Connection_                        code_ = close_code::bad_payload;                        result_ = error::bad_frame_payload;                        goto close;                    }                }            }            goto upcall;        close:            // Acquire the write lock            if(! impl.wr_block.try_lock(this))            {                BOOST_ASIO_CORO_YIELD                {                    BOOST_ASIO_HANDLER_LOCATION((                        __FILE__, __LINE__,                        "websocket::async_read_some"));                    impl.op_rd.emplace(std::move(*this));                }                if (ec)                    return this->complete(cont, ec, bytes_written_);                impl.wr_block.lock(this);                BOOST_ASIO_CORO_YIELD                {                    BOOST_ASIO_HANDLER_LOCATION((                        __FILE__, __LINE__,                        "websocket::async_read_some"));                    const auto ex = this->get_immediate_executor();                    net::dispatch(ex, std::move(*this));                }                BOOST_ASSERT(impl.wr_block.is_locked(this));                if(impl.check_stop_now(ec))                    goto upcall;            }            impl.change_status(status::closing);            if(! impl.wr_close)            {                impl.wr_close = true;                // Serialize close frame                impl.rd_fb.clear();                impl.template write_close<                    flat_static_buffer_base>(                        impl.rd_fb, code_);                // Send close frame                BOOST_ASSERT(impl.wr_block.is_locked(this));                BOOST_ASIO_CORO_YIELD                {                    BOOST_ASIO_HANDLER_LOCATION((                        __FILE__, __LINE__,                        "websocket::async_read_some"));                    net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()),                        beast::detail::bind_continuation(std::move(*this)));                }                BOOST_ASSERT(impl.wr_block.is_locked(this));                if(impl.check_stop_now(ec))                    goto upcall;            }            // Teardown            using beast::websocket::async_teardown;            BOOST_ASSERT(impl.wr_block.is_locked(this));            BOOST_ASIO_CORO_YIELD            {                BOOST_ASIO_HANDLER_LOCATION((                    __FILE__, __LINE__,                    "websocket::async_read_some"));                async_teardown(impl.role, impl.stream(),                    beast::detail::bind_continuation(std::move(*this)));            }            BOOST_ASSERT(impl.wr_block.is_locked(this));            if(ec == net::error::eof)            {                // Rationale:                // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error                ec = {};            }            if(! ec)            {                BOOST_BEAST_ASSIGN_EC(ec, result_);            }            if(ec && ec != error::closed)                impl.change_status(status::failed);            else                impl.change_status(status::closed);            impl.close();        upcall:            impl.rd_block.try_unlock(this);            impl.op_r_close.maybe_invoke();            if(impl.wr_block.try_unlock(this))                impl.op_close.maybe_invoke()                    || impl.op_idle_ping.maybe_invoke()                    || impl.op_ping.maybe_invoke()                    || impl.op_wr.maybe_invoke();            this->complete(cont, ec, bytes_written_);        }    }};//------------------------------------------------------------------------------template<class NextLayer, bool deflateSupported>template<class Handler,  class DynamicBuffer>class stream<NextLayer, deflateSupported>::read_op    : public beast::async_base<        Handler, beast::executor_type<stream>>    , public asio::coroutine{    boost::weak_ptr<impl_type> wp_;    DynamicBuffer& b_;    std::size_t limit_;    std::size_t bytes_written_ = 0;    bool some_;public:    template<class Handler_>    read_op(        Handler_&& h,        boost::shared_ptr<impl_type> const& sp,        DynamicBuffer& b,        std::size_t limit,        bool some)        : async_base<Handler,            beast::executor_type<stream>>(                std::forward<Handler_>(h),                    sp->stream().get_executor())        , wp_(sp)        , b_(b)        , limit_(limit ? limit : (            std::numeric_limits<std::size_t>::max)())        , some_(some)    {        (*this)({}, 0, false);    }    void operator()(        error_code ec = {},        std::size_t bytes_transferred = 0,        bool cont = true)    {        using beast::detail::clamp;        auto sp = wp_.lock();        if(! sp)        {            BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);            bytes_written_ = 0;            return this->complete(cont, ec, bytes_written_);        }        auto& impl = *sp;        using mutable_buffers_type = typename            DynamicBuffer::mutable_buffers_type;        BOOST_ASIO_CORO_REENTER(*this)        {            do            {                // VFALCO TODO use boost::beast::bind_continuation                BOOST_ASIO_CORO_YIELD                {                    auto mb = beast::detail::dynamic_buffer_prepare(b_,                        clamp(impl.read_size_hint_db(b_), limit_),                            ec, error::buffer_overflow);                    if(impl.check_stop_now(ec))                        goto upcall;                    BOOST_ASIO_HANDLER_LOCATION((                        __FILE__, __LINE__,                        "websocket::async_read"));                    read_some_op<read_op, mutable_buffers_type>(                        std::move(*this), sp, *mb);                }                b_.commit(bytes_transferred);                bytes_written_ += bytes_transferred;                if(ec)                    goto upcall;            }            while(! some_ && ! impl.rd_done);        upcall:            this->complete(cont, ec, bytes_written_);        }    }};template<class NextLayer, bool deflateSupported>struct stream<NextLayer, deflateSupported>::    run_read_some_op{    boost::shared_ptr<impl_type> const& self;    using executor_type = typename stream::executor_type;    executor_type    get_executor() const noexcept    {        return self->stream().get_executor();    }    template<        class ReadHandler,        class MutableBufferSequence>    void    operator()(        ReadHandler&& h,        MutableBufferSequence const& b)    {        // If you get an error on the following line it means        // that your handler does not meet the documented type        // requirements for the handler.        static_assert(            beast::detail::is_invocable<ReadHandler,                void(error_code, std::size_t)>::value,            "ReadHandler type requirements not met");        read_some_op<            typename std::decay<ReadHandler>::type,            MutableBufferSequence>(                std::forward<ReadHandler>(h),                self,                b);    }};template<class NextLayer, bool deflateSupported>struct stream<NextLayer, deflateSupported>::    run_read_op{    boost::shared_ptr<impl_type> const& self;    using executor_type = typename stream::executor_type;    executor_type    get_executor() const noexcept    {        return self->stream().get_executor();    }    template<        class ReadHandler,        class DynamicBuffer>    void    operator()(        ReadHandler&& h,        DynamicBuffer* b,        std::size_t limit,        bool some)    {        // If you get an error on the following line it means        // that your handler does not meet the documented type        // requirements for the handler.        static_assert(            beast::detail::is_invocable<ReadHandler,                void(error_code, std::size_t)>::value,            "ReadHandler type requirements not met");        read_op<            typename std::decay<ReadHandler>::type,            DynamicBuffer>(                std::forward<ReadHandler>(h),                self,                *b,                limit,                some);    }};//------------------------------------------------------------------------------template<class NextLayer, bool deflateSupported>template<class DynamicBuffer>std::size_tstream<NextLayer, deflateSupported>::read(DynamicBuffer& buffer){    static_assert(is_sync_stream<next_layer_type>::value,        "SyncStream type requirements not met");    static_assert(        net::is_dynamic_buffer<DynamicBuffer>::value,        "DynamicBuffer type requirements not met");    error_code ec;    auto const bytes_written = read(buffer, ec);    if(ec)        BOOST_THROW_EXCEPTION(system_error{ec});    return bytes_written;}template<class NextLayer, bool deflateSupported>template<class DynamicBuffer>std::size_tstream<NextLayer, deflateSupported>::read(DynamicBuffer& buffer, error_code& ec){    static_assert(is_sync_stream<next_layer_type>::value,        "SyncStream type requirements not met");    static_assert(        net::is_dynamic_buffer<DynamicBuffer>::value,        "DynamicBuffer type requirements not met");    std::size_t bytes_written = 0;    do    {        bytes_written += read_some(buffer, 0, ec);        if(ec)            return bytes_written;    }    while(! is_message_done());    return bytes_written;}template<class NextLayer, bool deflateSupported>template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>BOOST_BEAST_ASYNC_RESULT2(ReadHandler)stream<NextLayer, deflateSupported>::async_read(DynamicBuffer& buffer, ReadHandler&& handler){    static_assert(is_async_stream<next_layer_type>::value,        "AsyncStream type requirements not met");    static_assert(        net::is_dynamic_buffer<DynamicBuffer>::value,        "DynamicBuffer type requirements not met");    return net::async_initiate<        ReadHandler,        void(error_code, std::size_t)>(            run_read_op{impl_},            handler,            &buffer,            0,            false);}//------------------------------------------------------------------------------template<class NextLayer, bool deflateSupported>template<class DynamicBuffer>std::size_tstream<NextLayer, deflateSupported>::read_some(    DynamicBuffer& buffer,    std::size_t limit){    static_assert(is_sync_stream<next_layer_type>::value,        "SyncStream type requirements not met");    static_assert(        net::is_dynamic_buffer<DynamicBuffer>::value,        "DynamicBuffer type requirements not met");    error_code ec;    auto const bytes_written =        read_some(buffer, limit, ec);    if(ec)        BOOST_THROW_EXCEPTION(system_error{ec});    return bytes_written;}template<class NextLayer, bool deflateSupported>template<class DynamicBuffer>std::size_tstream<NextLayer, deflateSupported>::read_some(    DynamicBuffer& buffer,    std::size_t limit,    error_code& ec){    static_assert(is_sync_stream<next_layer_type>::value,        "SyncStream type requirements not met");    static_assert(        net::is_dynamic_buffer<DynamicBuffer>::value,        "DynamicBuffer type requirements not met");    using beast::detail::clamp;    if(! limit)        limit = (std::numeric_limits<std::size_t>::max)();    auto const size =        clamp(impl_->read_size_hint_db(buffer), limit);    BOOST_ASSERT(size > 0);    auto mb = beast::detail::dynamic_buffer_prepare(        buffer, size, ec, error::buffer_overflow);    if(impl_->check_stop_now(ec))        return 0;    auto const bytes_written = read_some(*mb, ec);    buffer.commit(bytes_written);    return bytes_written;}template<class NextLayer, bool deflateSupported>template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>BOOST_BEAST_ASYNC_RESULT2(ReadHandler)stream<NextLayer, deflateSupported>::async_read_some(    DynamicBuffer& buffer,    std::size_t limit,    ReadHandler&& handler){    static_assert(is_async_stream<next_layer_type>::value,        "AsyncStream type requirements not met");    static_assert(        net::is_dynamic_buffer<DynamicBuffer>::value,        "DynamicBuffer type requirements not met");    return net::async_initiate<        ReadHandler,        void(error_code, std::size_t)>(            run_read_op{impl_},            handler,            &buffer,            limit,            true);}//------------------------------------------------------------------------------template<class NextLayer, bool deflateSupported>template<class MutableBufferSequence>std::size_tstream<NextLayer, deflateSupported>::read_some(    MutableBufferSequence const& buffers){    static_assert(is_sync_stream<next_layer_type>::value,        "SyncStream type requirements not met");    static_assert(net::is_mutable_buffer_sequence<            MutableBufferSequence>::value,        "MutableBufferSequence type requirements not met");    error_code ec;    auto const bytes_written = read_some(buffers, ec);    if(ec)        BOOST_THROW_EXCEPTION(system_error{ec});    return bytes_written;}template<class NextLayer, bool deflateSupported>template<class MutableBufferSequence>std::size_tstream<NextLayer, deflateSupported>::read_some(    MutableBufferSequence const& buffers,    error_code& ec){    static_assert(is_sync_stream<next_layer_type>::value,        "SyncStream type requirements not met");    static_assert(net::is_mutable_buffer_sequence<            MutableBufferSequence>::value,        "MutableBufferSequence type requirements not met");    using beast::detail::clamp;    auto& impl = *impl_;    close_code code{};    std::size_t bytes_written = 0;    ec = {};    // Make sure the stream is open    if(impl.check_stop_now(ec))        return bytes_written;loop:    // See if we need to read a frame header. This    // condition is structured to give the decompressor    // a chance to emit the final empty deflate block    //    if(impl.rd_remain == 0 && (        ! impl.rd_fh.fin || impl.rd_done))    {        // Read frame header        error_code result;        while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))        {            if(result)            {                // _Fail the WebSocket Connection_                if(result == error::message_too_big)                    code = close_code::too_big;                else                    code = close_code::protocol_error;                do_fail(code, result, ec);                return bytes_written;            }            auto const bytes_transferred =                impl.stream().read_some(                    impl.rd_buf.prepare(read_size(                        impl.rd_buf, impl.rd_buf.max_size())),                    ec);            impl.rd_buf.commit(bytes_transferred);            if(impl.check_stop_now(ec))                return bytes_written;        }        // Immediately apply the mask to the portion        // of the buffer holding payload data.        if(impl.rd_fh.len > 0 && impl.rd_fh.mask)            detail::mask_inplace(buffers_prefix(                clamp(impl.rd_fh.len), impl.rd_buf.data()),                    impl.rd_key);        if(detail::is_control(impl.rd_fh.op))        {            // Get control frame payload            auto const b = buffers_prefix(                clamp(impl.rd_fh.len), impl.rd_buf.data());            auto const len = buffer_bytes(b);            BOOST_ASSERT(len == impl.rd_fh.len);            // Clear this otherwise the next            // frame will be considered final.            impl.rd_fh.fin = false;            // Handle ping frame            if(impl.rd_fh.op == detail::opcode::ping)            {                ping_data payload;                detail::read_ping(payload, b);                impl.rd_buf.consume(len);                if(impl.wr_close)                {                    // Ignore ping when closing                    goto loop;                }                if(impl.ctrl_cb)                    impl.ctrl_cb(frame_type::ping, to_string_view(payload));                detail::frame_buffer fb;                impl.template write_ping<flat_static_buffer_base>(fb,                    detail::opcode::pong, payload);                net::write(impl.stream(), fb.data(), ec);                if(impl.check_stop_now(ec))                    return bytes_written;                goto loop;            }            // Handle pong frame            if(impl.rd_fh.op == detail::opcode::pong)            {                ping_data payload;                detail::read_ping(payload, b);                impl.rd_buf.consume(len);                if(impl.ctrl_cb)                    impl.ctrl_cb(frame_type::pong, to_string_view(payload));                goto loop;            }            // Handle close frame            BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);            {                BOOST_ASSERT(! impl.rd_close);                impl.rd_close = true;                close_reason cr;                detail::read_close(cr, b, result);                if(result)                {                    // _Fail the WebSocket Connection_                    do_fail(close_code::protocol_error,                        result, ec);                    return bytes_written;                }                impl.cr = cr;                impl.rd_buf.consume(len);                if(impl.ctrl_cb)                    impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason));                BOOST_ASSERT(! impl.wr_close);                // _Start the WebSocket Closing Handshake_                do_fail(                    cr.code == close_code::none ?                        close_code::normal :                        static_cast<close_code>(cr.code),                    error::closed, ec);                return bytes_written;            }        }        if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)        {            // Empty non-final frame            goto loop;        }        impl.rd_done = false;    }    else    {        ec = {};    }    if(! impl.rd_deflated())    {        if(impl.rd_remain > 0)        {            if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >                (std::min)(clamp(impl.rd_remain),                    buffer_bytes(buffers)))            {                // Fill the read buffer first, otherwise we                // get fewer bytes at the cost of one I/O.                impl.rd_buf.commit(impl.stream().read_some(                    impl.rd_buf.prepare(read_size(impl.rd_buf,                        impl.rd_buf.max_size())), ec));                if(impl.check_stop_now(ec))                    return bytes_written;                if(impl.rd_fh.mask)                    detail::mask_inplace(                        buffers_prefix(clamp(impl.rd_remain),                            impl.rd_buf.data()), impl.rd_key);            }            if(impl.rd_buf.size() > 0)            {                // Copy from the read buffer.                // The mask was already applied.                auto const bytes_transferred = net::buffer_copy(                    buffers, impl.rd_buf.data(),                        clamp(impl.rd_remain));                auto const mb = buffers_prefix(                    bytes_transferred, buffers);                impl.rd_remain -= bytes_transferred;                if(impl.rd_op == detail::opcode::text)                {                    if(! impl.rd_utf8.write(mb) ||                        (impl.rd_remain == 0 && impl.rd_fh.fin &&                            ! impl.rd_utf8.finish()))                    {                        // _Fail the WebSocket Connection_                        do_fail(close_code::bad_payload,                            error::bad_frame_payload, ec);                        return bytes_written;                    }                }                bytes_written += bytes_transferred;                impl.rd_size += bytes_transferred;                impl.rd_buf.consume(bytes_transferred);            }            else            {                // Read into caller's buffer                BOOST_ASSERT(impl.rd_remain > 0);                BOOST_ASSERT(buffer_bytes(buffers) > 0);                BOOST_ASSERT(buffer_bytes(buffers_prefix(                    clamp(impl.rd_remain), buffers)) > 0);                auto const bytes_transferred =                    impl.stream().read_some(buffers_prefix(                        clamp(impl.rd_remain), buffers), ec);                // VFALCO What if some bytes were written?                if(impl.check_stop_now(ec))                    return bytes_written;                BOOST_ASSERT(bytes_transferred > 0);                auto const mb = buffers_prefix(                    bytes_transferred, buffers);                impl.rd_remain -= bytes_transferred;                if(impl.rd_fh.mask)                    detail::mask_inplace(mb, impl.rd_key);                if(impl.rd_op == detail::opcode::text)                {                    if(! impl.rd_utf8.write(mb) ||                        (impl.rd_remain == 0 && impl.rd_fh.fin &&                            ! impl.rd_utf8.finish()))                    {                        // _Fail the WebSocket Connection_                        do_fail(close_code::bad_payload,                            error::bad_frame_payload, ec);                        return bytes_written;                    }                }                bytes_written += bytes_transferred;                impl.rd_size += bytes_transferred;            }        }        BOOST_ASSERT( ! impl.rd_done );        if( impl.rd_remain == 0 && impl.rd_fh.fin )            impl.rd_done = true;    }    else    {        // Read compressed message frame payload:        // inflate even if rd_fh_.len == 0, otherwise we        // never emit the end-of-stream deflate block.        //        bool did_read = false;        buffers_suffix<MutableBufferSequence> cb(buffers);        while(buffer_bytes(cb) > 0)        {            zlib::z_params zs;            {                auto const out = beast::buffers_front(cb);                zs.next_out = out.data();                zs.avail_out = out.size();                BOOST_ASSERT(zs.avail_out > 0);            }            // boolean to track the end of the message.            bool fin = false;            if(impl.rd_remain > 0)            {                if(impl.rd_buf.size() > 0)                {                    // use what's there                    auto const in = buffers_prefix(                        clamp(impl.rd_remain), beast::buffers_front(                            impl.rd_buf.data()));                    zs.avail_in = in.size();                    zs.next_in = in.data();                }                else if(! did_read)                {                    // read new                    auto const bytes_transferred =                        impl.stream().read_some(                            impl.rd_buf.prepare(read_size(                                impl.rd_buf, impl.rd_buf.max_size())),                            ec);                    if(impl.check_stop_now(ec))                        return bytes_written;                    BOOST_ASSERT(bytes_transferred > 0);                    impl.rd_buf.commit(bytes_transferred);                    if(impl.rd_fh.mask)                        detail::mask_inplace(                            buffers_prefix(clamp(impl.rd_remain),                                impl.rd_buf.data()), impl.rd_key);                    auto const in = buffers_prefix(                        clamp(impl.rd_remain), buffers_front(                            impl.rd_buf.data()));                    zs.avail_in = in.size();                    zs.next_in = in.data();                    did_read = true;                }                else                {                    break;                }            }            else if(impl.rd_fh.fin)            {                // append the empty block codes                static std::uint8_t constexpr                    empty_block[4] = { 0x00, 0x00, 0xff, 0xff };                zs.next_in = empty_block;                zs.avail_in = sizeof(empty_block);                fin = true;            }            else            {                break;            }            impl.inflate(zs, zlib::Flush::sync, ec);            if(impl.check_stop_now(ec))                return bytes_written;            if (fin && zs.total_out == 0) {                impl.do_context_takeover_read(impl.role);                impl.rd_done = true;                break;            }            if(impl.rd_msg_max && beast::detail::sum_exceeds(                impl.rd_size, zs.total_out, impl.rd_msg_max))            {                do_fail(close_code::too_big,                    error::message_too_big, ec);                return bytes_written;            }            cb.consume(zs.total_out);            impl.rd_size += zs.total_out;            if (! fin) {                impl.rd_remain -= zs.total_in;                impl.rd_buf.consume(zs.total_in);            }            bytes_written += zs.total_out;        }        if(impl.rd_op == detail::opcode::text)        {            // check utf8            if(! impl.rd_utf8.write(beast::buffers_prefix(                bytes_written, buffers)) || (                    impl.rd_done && ! impl.rd_utf8.finish()))            {                // _Fail the WebSocket Connection_                do_fail(close_code::bad_payload,                    error::bad_frame_payload, ec);                return bytes_written;            }        }    }    return bytes_written;}template<class NextLayer, bool deflateSupported>template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>BOOST_BEAST_ASYNC_RESULT2(ReadHandler)stream<NextLayer, deflateSupported>::async_read_some(    MutableBufferSequence const& buffers,    ReadHandler&& handler){    static_assert(is_async_stream<next_layer_type>::value,        "AsyncStream type requirements not met");    static_assert(net::is_mutable_buffer_sequence<            MutableBufferSequence>::value,        "MutableBufferSequence type requirements not met");    return net::async_initiate<        ReadHandler,        void(error_code, std::size_t)>(            run_read_some_op{impl_},            handler,            buffers);}} // websocket} // beast} // boost#endif
 |