FETPAPI 0.6.0.0
This project provides C++ classes which facilitate the developement of ETP1.2 clients and servers.
Loading...
Searching...
No Matches
ClientSession.h
1/*-----------------------------------------------------------------------
2Licensed to the Apache Software Foundation (ASF) under one
3or more contributor license agreements. See the NOTICE file
4distributed with this work for additional information
5regarding copyright ownership. The ASF licenses this file
6to you under the Apache License, Version 2.0 (the
7"License"; you may not use this file except in compliance
8with the License. You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing,
13software distributed under the License is distributed on an
14"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15KIND, either express or implied. See the License for the
16specific language governing permissions and limitations
17under the License.
18-----------------------------------------------------------------------*/
19#pragma once
20
21#include "AbstractSession.h"
22
23#include "InitializationParameters.h"
24
25namespace ETP_NS
26{
27 class ClientSession : public ETP_NS::AbstractSession
28 {
29 public:
30
31 virtual ~ClientSession() = default;
32
33 boost::asio::io_context& getIoContext() {
34 return ioc;
35 }
36
37 const std::string& getEtpServerHost() const { return etpServerHost; }
38 const std::string& getEtpServerPort() const { return etpServerPort; }
39 const std::string& getEtpServerTarget() const { return etpServerTarget; }
40 const std::string& getEtpServerAuthorization() const { return etpServerAuthorization; }
41 const std::string& getProxyHost() const { return proxyHost; }
42 const std::string& getProxyPort() const { return proxyPort; }
43 const std::string& getProxyAuthorization() const { return proxyAuthorization; }
44
51 void run() {
52 // Look up the domain name before to run the session
53 // It is important to do this before to run the io context. Otherwise running the io context would return immediately if nothing has to be done.
54 resolver.async_resolve(
55 proxyHost.empty() ? etpServerHost : proxyHost,
56 proxyHost.empty() ? etpServerPort : proxyPort,
57 std::bind(
58 &ClientSession::on_resolve,
59 std::static_pointer_cast<ClientSession>(shared_from_this()),
60 std::placeholders::_1,
61 std::placeholders::_2));
62
63 // Run the io_context to perform the resolver and all other binding functions
64 // Run will return only when there will no more be any uncomplete operations (such as a reading operation for example)
65 getIoContext().run();
66
67 // Try to reconnect up to 10 times
68 if (!isCloseRequested_ && reconnectionTryCount_ < 10) {
69 ++reconnectionTryCount_;
70 std::cerr << "Session " << getIdentifier() << " has been disconnected, trying to reconnect... " << reconnectionTryCount_ << "/10" << std::endl;
71 getIoContext().restart();
72 run();
73 }
74
75 if (!isCloseRequested_ && reconnectionTryCount_ >= 10) {
76 std::cerr << "Could not reconnect after 10 retries... Give up and close" << reconnectionTryCount_ << "/10" << std::endl;
77 isCloseRequested_ = true;
78 }
79 }
80
81 void on_resolve(boost::system::error_code ec, tcp::resolver::results_type results) {
82 if (ec) {
83 std::cerr << "on_resolve : " << ec.message() << std::endl;
84 }
85
86 asyncConnect(results);
87 }
88
89 virtual void asyncConnect(const tcp::resolver::results_type& results) = 0;
90
91 virtual bool isTls() const = 0;
92
93 void on_handshake(boost::system::error_code ec)
94 {
95 if (ec) {
96 std::cerr << "on WS handshake, error code number : " << ec.value() << std::endl;
97 std::cerr << "on WS handshake, error message : " << ec.message() << std::endl;
98 std::cerr << "on WS handshake, error category : " << ec.category().name() << std::endl;
99 std::cerr << "Sometimes some ETP server require a trailing slash at the end of their URL. Did you also check your optional \"data-partition-id\" additional Header Field? Has your token expired?" << std::endl;
100 return;
101 }
102
103 if (!responseType.count(boost::beast::http::field::sec_websocket_protocol) ||
104 responseType[boost::beast::http::field::sec_websocket_protocol] != "etp12.energistics.org")
105 std::cerr << "The client MUST specify the Sec-Websocket-Protocol header value of etp12.energistics.org, and the server MUST reply with the same" << std::endl;
106
107 fesapi_log("Now connected to Websocket");
108 webSocketSessionClosed = false;
109
110 if (reconnectionTryCount_ > 0) {
111 auto resumeSession = std::make_shared<Energistics::Etp::v12::Protocol::CoreOSDU::ResumeSession>();
112 resumeSession->applicationName = requestSession->applicationName;
113 resumeSession->applicationVersion = requestSession->applicationVersion;
114 resumeSession->clientInstanceId = requestSession->clientInstanceId;
115 std::copy(identifier.begin(), identifier.end(), resumeSession->sessionId.array.begin());
116 send(resumeSession, 0, 0x02);
117 }
118 else {
119 send(requestSession, 0, 0x02);
120 }
121 do_read();
122 }
123
124 protected:
125 boost::asio::io_context ioc;
126 tcp::resolver resolver;
127 std::string etpServerHost;
128 std::string etpServerPort;
129 std::string etpServerTarget;
130 std::string etpServerAuthorization;
131 std::string proxyHost;
132 std::string proxyPort;
133 std::string proxyAuthorization;
134 std::map<std::string, std::string> additionalHandshakeHeaderFields_;
135 websocket::response_type responseType; // In order to check handshake sec_websocket_protocol
136 std::shared_ptr<Energistics::Etp::v12::Protocol::Core::RequestSession> requestSession;
137
144 ClientSession(
145 InitializationParameters const* initializationParams, const std::string& target, const std::string& etpServerAuth, const std::string& proxyAuth = "") :
146 ioc(),
147 resolver(ioc),
148 etpServerHost(initializationParams->getEtpServerHost()),
149 etpServerPort(std::to_string(initializationParams->getEtpServerPort())),
150 etpServerTarget(target),
151 etpServerAuthorization(etpServerAuth),
152 proxyHost(initializationParams->getProxyHost()),
153 proxyPort(std::to_string(initializationParams->getProxyPort())),
154 proxyAuthorization(proxyAuth)
155 {
156 messageId = 2; // The client side of the connection MUST use ONLY non-zero even-numbered messageIds.
157
158 initializationParams->postSessionCreationOperation(this);
159
160 // Build the request session
161 requestSession = std::make_shared<Energistics::Etp::v12::Protocol::Core::RequestSession>();
162 requestSession->applicationName = initializationParams->getApplicationName();
163 requestSession->applicationVersion = initializationParams->getApplicationVersion();
164
165 std::copy(initializationParams->getInstanceId().begin(), initializationParams->getInstanceId().end(), requestSession->clientInstanceId.array.begin());
166
167 requestSession->requestedProtocols = initializationParams->makeSupportedProtocols();
168 requestSession->supportedDataObjects = initializationParams->makeSupportedDataObjects();
169 requestSession->supportedFormats.push_back("xml");
170 requestSession->currentDateTime = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
171
172 auto caps = initializationParams->makeEndpointCapabilities();
173 if (!caps.empty()) {
174 requestSession->endpointCapabilities = caps;
175 }
176
177 maxWebSocketMessagePayloadSize = initializationParams->getMaxWebSocketMessagePayloadSize();
178 }
179 };
180}
int64_t send(std::shared_ptr< EtpMessage > message, int64_t correlationId=0, int32_t messageFlags=0, const std::vector< int64_t > &msgIdAliases={})
Definition AbstractSession.h:161
const boost::uuids::uuid & getIdentifier()
Definition AbstractSession.h:61
virtual void do_read()=0
void run()
Definition ClientSession.h:51