FETPAPI 0.6.0.0
This project provides C++ classes which facilitate the development of ETP1.2 clients and servers.
Loading...
Searching...
No Matches
AbstractClientSessionCRTP.h
1/*-----------------------------------------------------------------------
2Licensed to the Apache Software Foundation (ASF) under one
3or more contributor license agreements. See the NOTICE file
4distributed with this work for additional information
5regarding copyright ownership. The ASF licenses this file
6to you under the Apache License, Version 2.0 (the
7"License"; you may not use this file except in compliance
8with the License. You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing,
13software distributed under the License is distributed on an
14"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15KIND, either express or implied. See the License for the
16specific language governing permissions and limitations
17under the License.
18-----------------------------------------------------------------------*/
19#pragma once
20
21#include "ClientSession.h"
22
23namespace ETP_NS
24{
25 // This uses the Curiously Recurring Template Pattern so that the same code works with both SSL streams and regular sockets.
26 template<class Derived>
27 class AbstractClientSessionCRTP : public ETP_NS::ClientSession
28 {
29 public:
30
31 virtual ~AbstractClientSessionCRTP() = default;
32
33 void on_ssl_handshake(boost::system::error_code ec) {
34 if (ec) {
35 std::cerr << "ERROR at Websocket connection : " << ec.message() << std::endl;
36 return;
37 }
38
39#if BOOST_VERSION < 107000
40 // Perform the websocket handshake
41 derived().ws()->async_handshake_ex(responseType,
42 etpServerHost + ":" + etpServerPort, etpServerTarget,
43 [&](websocket::request_type& m)
44 {
45 m.insert(boost::beast::http::field::sec_websocket_protocol, "etp12.energistics.org");
46 m.insert(boost::beast::http::field::authorization, etpServerAuthorization);
47 if (!proxyHost.empty() && !isTls()) {
48 m.insert(boost::beast::http::field::proxy_authorization, proxyAuthorization);
49 }
50 m.insert("etp-encoding", "binary");
51 for (const auto& mapEntry : additionalHandshakeHeaderFields_) {
52 m.insert(mapEntry.first, mapEntry.second);
53 }
54 },
55 std::bind(
56 &ClientSession::on_handshake,
57 std::static_pointer_cast<AbstractClientSessionCRTP>(shared_from_this()),
58 std::placeholders::_1));
59#else
60 derived().ws()->set_option(websocket::stream_base::decorator(
61 [&](websocket::request_type& m)
62 {
63 m.insert(boost::beast::http::field::sec_websocket_protocol, "etp12.energistics.org");
64 m.insert(boost::beast::http::field::authorization, etpServerAuthorization);
65 if (!proxyHost.empty() && !isTls()) {
66 m.insert(boost::beast::http::field::proxy_authorization, proxyAuthorization);
67 }
68 m.insert("etp-encoding", "binary");
69 for (const auto& mapEntry : additionalHandshakeHeaderFields_) {
70 m.insert(mapEntry.first, mapEntry.second);
71 }
72 })
73 );
74 // Perform the websocket handshake
75 derived().ws()->async_handshake(responseType,
76 etpServerHost + ":" + etpServerPort, etpServerTarget,
77 std::bind(
78 &ClientSession::on_handshake,
79 std::static_pointer_cast<AbstractClientSessionCRTP>(shared_from_this()),
80 std::placeholders::_1));
81#endif
82 }
83
88 FETPAPI_DLL_IMPORT_OR_EXPORT void do_close() {
89 derived().ws()->async_close(websocket::close_code::normal,
90 std::bind(
91 &AbstractSession::on_close,
92 shared_from_this(),
93 std::placeholders::_1));
94 }
95
96 FETPAPI_DLL_IMPORT_OR_EXPORT void do_read()
97 {
98 if (webSocketSessionClosed) {
99 fesapi_log("CLOSED : NOTHING MORE TO DO");
100 return;
101 }
102
103 // Read a message into our buffer
104 derived().ws()->async_read(
105 receivedBuffer,
106 std::bind(
108 shared_from_this(),
109 std::placeholders::_1,
110 std::placeholders::_2));
111 }
112
113 void setMaxWebSocketMessagePayloadSize(uint64_t value) final {
114 maxWebSocketMessagePayloadSize = value;
115 derived().ws()->read_message_max(value);
116 }
117
118 protected:
119 using ClientSession::ClientSession;
120
121 // Access the derived class, this is part of the Curiously Recurring Template Pattern idiom.
122 Derived& derived() { return static_cast<Derived&>(*this); }
123
124 void do_write() {
125 if (sendingQueue.empty()) {
126 fesapi_log("The sending queue is empty.");
127 return;
128 }
129
130 auto& front = sendingQueue.front();
131 const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
132 bool previousSentMessageCompleted = specificProtocolHandlers.find(std::get<0>(front)->messageHeader.messageId) == specificProtocolHandlers.end();
133
134 if (!previousSentMessageCompleted) {
135 fesapi_log("Cannot send Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId), "because the previous message has not finished to be sent.");
136 }
137 else {
138 fesapi_log("Sending Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId));
139
140 auto avroBytes = std::get<0>(front)->encodeHeaderAndBody();
141
142 //asio::buffer is a non-owning view. We must keep the underlying storage alive until the I/O completes.
143 if (avroBytes->size() < maxWebSocketMessagePayloadSize) {
144 derived().ws()->async_write(
145 boost::asio::buffer(*avroBytes),
146 [this, self{ this->shared_from_this() }, avroBytes](boost::system::error_code ec, std::size_t)
147 ->void
148 {
149
150 if (ec) {
151 std::cerr << "on_write : " << ec.message() << std::endl;
152 }
153 else {
154 // Register the handler to respond to the sent message
155 const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
156 auto& front = sendingQueue.front();
157 auto nextMessage = std::get<0>(front);
158 specificProtocolHandlers[nextMessage->messageHeader.messageId] =
159 std::make_tuple(nextMessage, std::get<1>(front), std::get<2>(front));
160 }
161
162 // Remove the sent message from the queue
163 const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
164 sendingQueue.pop();
165
166 do_write();
167 });
168 }
169 else {
170 throw std::invalid_argument("You cannot send a message which is too big. Please use message part or chunk or whatever else.");
171 }
172 }
173 }
174 };
175}
Definition AbstractClientSessionCRTP.h:28
FETPAPI_DLL_IMPORT_OR_EXPORT void do_close()
Definition AbstractClientSessionCRTP.h:88
FETPAPI_DLL_IMPORT_OR_EXPORT void do_read()
Definition AbstractClientSessionCRTP.h:96
FETPAPI_DLL_IMPORT_OR_EXPORT void on_read(boost::system::error_code ec, std::size_t bytes_transferred)