// Handles the outcome of the connection step and starts the websocket handshake.
// Writes ec.message() to std::cerr, then requests the handshake on derived().ws()
// towards etpServerHost:etpServerPort / etpServerTarget with the request headers set below.
// @param ec Error code received for the connection attempt.
// NOTE(review): presumably the message is only reported when ec is set — confirm.
33 void on_connect(boost::system::error_code ec) {
35 std::cerr <<
"on_connect : " << ec.message() << std::endl;
// BOOST_VERSION 107000 corresponds to Boost 1.70.0. Here the request decorator is
// passed directly as an argument of async_handshake_ex.
38#if BOOST_VERSION < 107000
40 derived().ws().async_handshake_ex(responseType,
41 etpServerHost +
":" + etpServerPort, etpServerTarget,
// Request decorator: adds the handshake header fields to the upgrade request.
// NOTE(review): the lambda captures by reference; it relies on this session
// still being alive when the decorator runs — confirm.
42 [&](websocket::request_type& m)
// Advertise the etp12.energistics.org sub-protocol.
44 m.insert(boost::beast::http::field::sec_websocket_protocol,
"etp12.energistics.org");
45 m.insert(boost::beast::http::field::authorization, etpServerAuthorization);
// Proxy credentials are only added when a proxy host is set and the session is not TLS.
46 if (!proxyHost.empty() && !isTls()) {
47 m.insert(boost::beast::http::field::proxy_authorization, proxyAuthorization);
49 m.insert(
"etp-encoding",
"binary");
// Append every additional header field as a name/value pair.
50 for (
const auto& mapEntry : additionalHandshakeHeaderFields_) {
51 m.insert(mapEntry.first, mapEntry.second);
// Completion handler arguments: the on_handshake member, a shared pointer to this
// session, and a placeholder for the first handler argument.
55 &AbstractClientSessionCRTP::on_handshake,
56 std::static_pointer_cast<AbstractClientSessionCRTP>(shared_from_this()),
57 std::placeholders::_1));
// Here the same request decorator is installed as a stream option before the handshake is requested.
59 derived().ws().set_option(websocket::stream_base::decorator(
60 [&](websocket::request_type& m)
// Advertise the etp12.energistics.org sub-protocol.
62 m.insert(boost::beast::http::field::sec_websocket_protocol,
"etp12.energistics.org");
63 m.insert(boost::beast::http::field::authorization, etpServerAuthorization);
// Proxy credentials are only added when a proxy host is set and the session is not TLS.
64 if (!proxyHost.empty() && !isTls()) {
65 m.insert(boost::beast::http::field::proxy_authorization, proxyAuthorization);
67 m.insert(
"etp-encoding",
"binary");
// Append every additional header field as a name/value pair.
68 for (
const auto& mapEntry : additionalHandshakeHeaderFields_) {
69 m.insert(mapEntry.first, mapEntry.second);
// Request the websocket handshake; the server response is stored in responseType.
74 derived().ws().async_handshake(responseType,
75 etpServerHost +
":" + etpServerPort, etpServerTarget,
// Completion handler arguments: the on_handshake member, a shared pointer to this
// session, and a placeholder for the first handler argument.
77 &AbstractClientSessionCRTP::on_handshake,
78 std::static_pointer_cast<AbstractClientSessionCRTP>(shared_from_this()),
79 std::placeholders::_1));
// Requests an asynchronous close of the derived session's websocket with the "normal" close code.
// AbstractSession::on_close and a placeholder for the first handler argument are part
// of the completion handler.
88 derived().ws().async_close(websocket::close_code::normal,
90 &AbstractSession::on_close,
92 std::placeholders::_1));
// Requests an asynchronous read on the derived session's websocket.
// When webSocketSessionClosed is set, "CLOSED : NOTHING MORE TO DO" is logged.
// NOTE(review): presumably no read is requested in that case — confirm.
95 FETPAPI_DLL_IMPORT_OR_EXPORT
void do_read()
97 if (webSocketSessionClosed) {
98 fesapi_log(
"CLOSED : NOTHING MORE TO DO");
103 derived().ws().async_read(
// Completion handler arguments: AbstractSession::on_read with placeholders for the
// first and second handler arguments.
106 &AbstractSession::on_read,
108 std::placeholders::_1,
109 std::placeholders::_2));
// Handles the outcome of the websocket handshake.
// Reports ec.message() and a troubleshooting hint on std::cerr, checks the
// Sec-WebSocket-Protocol field of the server response, flags the session as
// connected and open, then sends requestSession.
// @param ec Error code received for the handshake.
// NOTE(review): presumably the diagnostics are only emitted when ec is set, and the
// function stops there on failure — confirm.
112 void on_handshake(boost::system::error_code ec)
115 std::cerr <<
"on_handshake : " << ec.message() << std::endl;
116 std::cerr <<
"Sometimes some ETP server require a trailing slash at the end of their URL. Did you also check your optional \"data-partition-id\" additional Header Field?" << std::endl;
// The response must carry a Sec-WebSocket-Protocol field equal to "etp12.energistics.org";
// otherwise a message is written to std::cerr.
120 if (!responseType.count(boost::beast::http::field::sec_websocket_protocol) ||
121 responseType[boost::beast::http::field::sec_websocket_protocol] !=
"etp12.energistics.org")
122 std::cerr <<
"The client MUST specify the Sec-Websocket-Protocol header value of etp12.energistics.org, and the server MUST reply with the same" << std::endl;
// Record that the connection succeeded and that the websocket session is open.
124 successfulConnection =
true;
125 webSocketSessionClosed =
false;
// NOTE(review): the meaning of the arguments 0 and 0x02 is defined by send() — confirm there.
127 send(requestSession, 0, 0x02);
// Stores the maximum websocket message payload size and applies it to the derived
// session's websocket through read_message_max.
// @param value New maximum size, stored in maxWebSocketMessagePayloadSize.
// NOTE(review): value is signed while Beast's read_message_max takes an unsigned size;
// presumably callers never pass a negative value — confirm.
131 void setMaxWebSocketMessagePayloadSize(int64_t value)
final {
132 maxWebSocketMessagePayloadSize = value;
133 derived().ws().read_message_max(value);
// Makes the ClientSession constructors available in this class
// (NOTE(review): assumes ClientSession is a base class of this one — confirm).
137 using ClientSession::ClientSession;
140 Derived& derived() {
return static_cast<Derived&
>(*this); }
// Writes the message at the front of sendingQueue to the websocket.
// Each queue entry is read as a tuple: element 0 is the message id, element 1 the
// data to write, and element 2 the value registered in specificProtocolHandlers.
// specificProtocolHandlersMutex is locked for the rest of the enclosing scope.
143 const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
144 if (sendingQueue.empty()) {
145 fesapi_log(
"The sending queue is empty.");
// The front message counts as sendable when its id is not yet registered in specificProtocolHandlers.
149 bool previousSentMessageCompleted = specificProtocolHandlers.find(std::get<0>(sendingQueue.front())) == specificProtocolHandlers.end();
151 if (!previousSentMessageCompleted) {
152 fesapi_log(
"Cannot send Message id :", std::to_string(std::get<0>(sendingQueue.front())),
"because the previous message has not finished to be sent.");
155 fesapi_log(
"Sending Message id :", std::to_string(std::get<0>(sendingQueue.front())));
// The buffer refers to the data stored in the queue's front entry (no copy is made here).
// NOTE(review): that entry must stay alive and unmodified until the write completes — confirm.
157 derived().ws().async_write(
158 boost::asio::buffer(std::get<1>(sendingQueue.front())),
// Completion handler arguments: AbstractSession::on_write with placeholders for the
// first and second handler arguments.
160 &AbstractSession::on_write,
162 std::placeholders::_1,
163 std::placeholders::_2));
// Register element 2 of the front entry under the message id.
166 specificProtocolHandlers[std::get<0>(sendingQueue.front())] = std::get<2>(sendingQueue.front());