/**
 * Starts the WebSocket opening handshake on the derived session's stream.
 * Judging by its name, this is the continuation run after the SSL handshake step.
 *
 * The visible code writes ec.message() to std::cerr, registers a request decorator
 * that adds the ETP-related HTTP header fields, and launches the asynchronous
 * handshake towards "etpServerHost:etpServerPort" with target etpServerTarget.
 *
 * NOTE(review): this excerpt has gaps (braces, the condition guarding the error
 * report, the `std::bind(` openers and the #else/#endif lines are not visible),
 * so the exact branch structure must be confirmed against the full file.
 *
 * @param ec Error code whose message is reported on std::cerr.
 */
33 void on_ssl_handshake(boost::system::error_code ec) {
// Report the error text on stderr.
// NOTE(review): presumably only executed when `ec` is set - the guarding line is not visible here.
35 std::cerr <<
"ERROR at Websocket connection : " << ec.message() << std::endl;
// BOOST_VERSION 107000 corresponds to Boost 1.70.0: older versions take the
// request decorator directly as an argument of async_handshake_ex.
39#if BOOST_VERSION < 107000
// Arguments: response object, "host:port" string, request target, decorator, completion handler.
41 derived().ws()->async_handshake_ex(responseType,
42 etpServerHost +
":" + etpServerPort, etpServerTarget,
// Decorator called with the HTTP upgrade request so that extra header fields can be inserted.
// NOTE(review): `[&]` is used; only data members appear to be referenced in the visible body.
43 [&](websocket::request_type& m)
// Sec-WebSocket-Protocol header field.
45 m.insert(boost::beast::http::field::sec_websocket_protocol,
"etp12.energistics.org");
// Authorization header field, taken from etpServerAuthorization.
46 m.insert(boost::beast::http::field::authorization, etpServerAuthorization);
// Proxy-Authorization is only inserted when a proxy host is set and isTls() is false.
47 if (!proxyHost.empty() && !isTls()) {
48 m.insert(boost::beast::http::field::proxy_authorization, proxyAuthorization);
// Custom "etp-encoding" header field with the value "binary".
50 m.insert(
"etp-encoding",
"binary");
// Append every extra header field (name, value) stored in additionalHandshakeHeaderFields_.
51 for (
const auto& mapEntry : additionalHandshakeHeaderFields_) {
52 m.insert(mapEntry.first, mapEntry.second);
// Completion handler: ClientSession::on_handshake, bound to a shared pointer to this
// object (cast to AbstractClientSessionCRTP) and receiving one argument (placeholder _1).
56 &ClientSession::on_handshake,
57 std::static_pointer_cast<AbstractClientSessionCRTP>(shared_from_this()),
58 std::placeholders::_1));
// NOTE(review): presumably the #else branch (Boost 1.70 and later) - the #else line is not visible here.
// The same decorator is registered as a stream option instead of being passed to the handshake call.
// NOTE(review): this decorator is stored in the stream and also uses `[&]` - confirm the
// session outlives any use of it.
60 derived().ws()->set_option(websocket::stream_base::decorator(
61 [&](websocket::request_type& m)
// Sec-WebSocket-Protocol header field.
63 m.insert(boost::beast::http::field::sec_websocket_protocol,
"etp12.energistics.org");
// Authorization header field, taken from etpServerAuthorization.
64 m.insert(boost::beast::http::field::authorization, etpServerAuthorization);
// Proxy-Authorization is only inserted when a proxy host is set and isTls() is false.
65 if (!proxyHost.empty() && !isTls()) {
66 m.insert(boost::beast::http::field::proxy_authorization, proxyAuthorization);
// Custom "etp-encoding" header field with the value "binary".
68 m.insert(
"etp-encoding",
"binary");
// Append every extra header field (name, value) stored in additionalHandshakeHeaderFields_.
69 for (
const auto& mapEntry : additionalHandshakeHeaderFields_) {
70 m.insert(mapEntry.first, mapEntry.second);
// Launch the asynchronous handshake: response object, "host:port" string, request target, completion handler.
75 derived().ws()->async_handshake(responseType,
76 etpServerHost +
":" + etpServerPort, etpServerTarget,
// Completion handler: ClientSession::on_handshake, bound to a shared pointer to this
// object (cast to AbstractClientSessionCRTP) and receiving one argument (placeholder _1).
78 &ClientSession::on_handshake,
79 std::static_pointer_cast<AbstractClientSessionCRTP>(shared_from_this()),
80 std::placeholders::_1));
// Asynchronously close the derived session's WebSocket stream with the "normal" close code.
// The completion handler is AbstractSession::on_close and receives one argument (placeholder _1).
// NOTE(review): the enclosing function header, the `std::bind(` opener and the bound
// object argument are not visible in this excerpt.
89 derived().ws()->async_close(websocket::close_code::normal,
91 &AbstractSession::on_close,
93 std::placeholders::_1));
/**
 * Starts an asynchronous read on the derived session's WebSocket stream.
 *
 * When webSocketSessionClosed is set, "CLOSED : NOTHING MORE TO DO" is logged.
 * NOTE(review): presumably the function then returns without reading - the
 * corresponding line is not visible in this excerpt.
 *
 * The completion handler receives two arguments (placeholders _1 and _2).
 * NOTE(review): the destination buffer and the bound member function are on
 * lines missing from this excerpt.
 */
96 FETPAPI_DLL_IMPORT_OR_EXPORT
void do_read()
// Log when the session is flagged as closed.
98 if (webSocketSessionClosed) {
99 fesapi_log(
"CLOSED : NOTHING MORE TO DO");
// Launch the asynchronous read on the stream returned by derived().ws().
104 derived().ws()->async_read(
109 std::placeholders::_1,
110 std::placeholders::_2));
/**
 * Sets the maximum WebSocket message payload size.
 *
 * The value is stored in maxWebSocketMessagePayloadSize and also forwarded to the
 * derived session's stream through read_message_max(), so both the member and the
 * stream setting are updated together.
 *
 * Declared `final`: this overrides a virtual function and cannot be overridden further.
 *
 * @param value The new maximum size (unit presumably bytes - confirm).
 */
113 void setMaxWebSocketMessagePayloadSize(uint64_t value)
final {
114 maxWebSocketMessagePayloadSize = value;
115 derived().ws()->read_message_max(value);
// Inherit the constructors of ClientSession so this class does not have to redeclare them.
119 using ClientSession::ClientSession;
/**
 * Gives access to this object as its Derived type (static downcast of *this).
 * The cast is unchecked: it is only valid if the dynamic type of *this really is Derived.
 *
 * @return A reference to *this typed as Derived.
 */
122 Derived& derived() {
return static_cast<Derived&
>(*this); }
// Sends the message at the front of sendingQueue over the derived session's WebSocket stream.
// NOTE(review): the enclosing function header, several braces/returns and the else branches
// are not visible in this excerpt, so the exact control flow must be confirmed against the full file.

// Nothing to send: only a log entry is produced for an empty queue.
// NOTE(review): sendingQueue is read here and below without sendingQueueMutex being
// visibly held - confirm the caller takes it.
125 if (sendingQueue.empty()) {
126 fesapi_log(
"The sending queue is empty.");
// Front queue entry: a tuple whose element 0 is the message to send
// (elements 1 and 2 are stored alongside it in specificProtocolHandlers further down).
130 auto& front = sendingQueue.front();
// Hold specificProtocolHandlersMutex until the end of the enclosing scope.
131 const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
// True when no entry is registered in specificProtocolHandlers under this message id.
132 bool previousSentMessageCompleted = specificProtocolHandlers.find(std::get<0>(front)->messageHeader.messageId) == specificProtocolHandlers.end();
// An entry already exists for this message id: log that the message cannot be sent yet.
134 if (!previousSentMessageCompleted) {
135 fesapi_log(
"Cannot send Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId),
"because the previous message has not finished to be sent.");
138 fesapi_log(
"Sending Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId));
// Encode the message header and body; the result is used through a pointer-like handle.
140 auto avroBytes = std::get<0>(front)->encodeHeaderAndBody();
// Only messages strictly smaller than maxWebSocketMessagePayloadSize are written.
143 if (avroBytes->size() < maxWebSocketMessagePayloadSize) {
144 derived().ws()->async_write(
// boost::asio::buffer does not own the bytes it refers to.
145 boost::asio::buffer(*avroBytes),
// The handler captures `self` (shared_from_this()) and avroBytes by value, so both
// the session and the encoded bytes are still referenced when the handler runs.
146 [
this, self{ this->shared_from_this() }, avroBytes](boost::system::error_code ec, std::size_t)
// Report the write error text on stderr.
// NOTE(review): presumably only executed when `ec` is set - the guarding line is not visible here.
151 std::cerr <<
"on_write : " << ec.message() << std::endl;
// NOTE(review): same mutex as the one locked before the async_write call; std::mutex is not
// recursive, so this relies on the handler not running while that outer lock is still held
// by the same thread - confirm.
155 const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
// Register the sent message and the two remaining tuple elements under the message id.
156 auto& front = sendingQueue.front();
157 auto nextMessage = std::get<0>(front);
158 specificProtocolHandlers[nextMessage->messageHeader.messageId] =
159 std::make_tuple(nextMessage, std::get<1>(front), std::get<2>(front));
// Take sendingQueueMutex; the statements executed under this lock are not visible in this excerpt.
163 const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
// NOTE(review): presumably the else branch of the size check above (message not smaller than the maximum).
170 throw std::invalid_argument(
"You cannot send a message which is too big. Please use message part or chunk or whatever else.");