FETPAPI 0.6.0.0
This project provides C++ classes which facilitate the developement of ETP1.2 clients and servers.
Loading...
Searching...
No Matches
AbstractSession.h
1/*-----------------------------------------------------------------------
2Licensed to the Apache Software Foundation (ASF) under one
3or more contributor license agreements. See the NOTICE file
4distributed with this work for additional information
5regarding copyright ownership. The ASF licenses this file
6to you under the Apache License, Version 2.0 (the
7"License"; you may not use this file except in compliance
8with the License. You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing,
13software distributed under the License is distributed on an
14"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15KIND, either express or implied. See the License for the
16specific language governing permissions and limitations
17under the License.
18-----------------------------------------------------------------------*/
19#pragma once
20
21#include <iostream>
22#include <mutex>
23#include <queue>
24#include <unordered_map>
25#include <utility>
26
27#include <boost/asio.hpp>
28#include <boost/beast/core.hpp>
29#include <boost/beast/websocket.hpp>
30#include <boost/uuid/uuid_io.hpp>
31#include <boost/uuid/nil_generator.hpp>
32
33#include "EtpHelpers.h"
34#include "protocolHandlers/CoreHandlers.h"
35#include "protocolHandlers/DiscoveryHandlers.h"
36#include "protocolHandlers/StoreHandlers.h"
37#include "protocolHandlers/StoreNotificationHandlers.h"
38#include "protocolHandlers/DataArrayHandlers.h"
39#include "protocolHandlers/TransactionHandlers.h"
40#include "protocolHandlers/DataspaceHandlers.h"
41#include "protocolHandlers/CoreOSDUHandlers.h"
42#include "protocolHandlers/StoreOSDUHandlers.h"
43#include "protocolHandlers/DataspaceOSDUHandlers.h"
44
45namespace beast = boost::beast; // from <boost/beast.hpp>
46namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
47using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
48
49namespace ETP_NS
50{
51 class AbstractSession : public std::enable_shared_from_this<AbstractSession>
52 {
53 public:
54
55 virtual ~AbstractSession() = default;
56
61 const boost::uuids::uuid& getIdentifier() {
62 std::lock_guard<std::mutex> lock(identifierMutex);
63 return identifier;
64 }
65
69 void setTimeOut(double timeOut) {
70 _timeOut = timeOut;
71 }
72
76 double getTimeOut() const {
77 return _timeOut;
78 }
79
83 FETPAPI_DLL_IMPORT_OR_EXPORT void setCoreProtocolHandlers(std::shared_ptr<CoreHandlers> coreHandlers) {
84 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Core), coreHandlers);
85 }
86
90 FETPAPI_DLL_IMPORT_OR_EXPORT void setDiscoveryProtocolHandlers(std::shared_ptr<DiscoveryHandlers> discoveryHandlers) {
91 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Discovery), discoveryHandlers);
92 }
93
97 FETPAPI_DLL_IMPORT_OR_EXPORT void setStoreProtocolHandlers(std::shared_ptr<StoreHandlers> storeHandlers) {
98 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Store), storeHandlers);
99 }
100
104 FETPAPI_DLL_IMPORT_OR_EXPORT void setCoreOSDUProtocolHandlers(std::shared_ptr<CoreOSDUHandlers> coreOSDUHandlers) {
105 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::CoreOSDU), coreOSDUHandlers);
106 }
107
111 FETPAPI_DLL_IMPORT_OR_EXPORT void setStoreOSDUProtocolHandlers(std::shared_ptr<StoreOSDUHandlers> storeOSDUHandlers) {
112 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::StoreOSDU), storeOSDUHandlers);
113 }
114
118 FETPAPI_DLL_IMPORT_OR_EXPORT void setStoreNotificationProtocolHandlers(std::shared_ptr<ETP_NS::StoreNotificationHandlers> storeNotificationHandlers) {
119 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::StoreNotification), storeNotificationHandlers);
120 }
121
125 FETPAPI_DLL_IMPORT_OR_EXPORT void setDataArrayProtocolHandlers(std::shared_ptr<DataArrayHandlers> dataArrayHandlers) {
126 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::DataArray), dataArrayHandlers);
127 }
128
132 FETPAPI_DLL_IMPORT_OR_EXPORT void setTransactionProtocolHandlers(std::shared_ptr<TransactionHandlers> transactionHandlers) {
133 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Transaction), transactionHandlers);
134 }
135
139 FETPAPI_DLL_IMPORT_OR_EXPORT void setDataspaceProtocolHandlers(std::shared_ptr<DataspaceHandlers> dataspaceHandlers) {
140 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Dataspace), dataspaceHandlers);
141 }
142
146 FETPAPI_DLL_IMPORT_OR_EXPORT void setDataspaceOSDUProtocolHandlers(std::shared_ptr<DataspaceOSDUHandlers> dataspaceOSDUHandlers) {
147 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::DataspaceOSDU), dataspaceOSDUHandlers);
148 }
149
161 int64_t send(std::shared_ptr<EtpMessage> message,
162 int64_t correlationId = 0, int32_t messageFlags = 0, const std::vector<int64_t>& msgIdAliases = {})
163 {
164 return sendWithSpecificHandler(message, protocolHandlers.at(message->messageHeader.protocol), correlationId, messageFlags, msgIdAliases);
165 }
166
178 int64_t sendAndBlock(std::shared_ptr<EtpMessage> message,
179 int64_t correlationId = 0, int32_t messageFlags = 0, const std::vector<int64_t>& msgIdAliases = {})
180 {
181 int64_t msgId = send(message, correlationId, messageFlags, msgIdAliases);
182 // The correlationId of the first message MUST be set to 0 and the correlationId of all successive
183 // messages in the same multipart request or notification MUST be set to the messageId of the first
184 // message of the multipart request or notification.
185 // If the request message is itself multipart, the correlationId of each message of the multipart
186 // response MUST be set to the messageId of the FIRST message in the multipart request.
187
188 auto t_start = std::chrono::high_resolution_clock::now();
189 while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
190 if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
191 throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
192 }
193 }
194
195 return msgId;
196 }
197
209 int64_t sendWithSpecificHandler(std::shared_ptr<EtpMessage> message, std::shared_ptr<ETP_NS::ProtocolHandlers> specificHandler,
210 int64_t correlationId = 0, int32_t messageFlags = 0, const std::vector<int64_t>& msgIdAliases = {})
211 {
212 message->messageHeader.correlationId = correlationId;
213 message->messageHeader.messageFlags = messageFlags;
214 message->messageHeader.messageId = messageId.fetch_add(2);
215
216 const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
217
218 // Push the message into the queue
219 sendingQueue.push(std::make_tuple(message, specificHandler, msgIdAliases));
220 fesapi_log(message->to_string());
221
222 if (message->messageHeader.protocol == static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::CoreOSDU)
223 && message->messageHeader.messageType == Energistics::Etp::v12::Protocol::CoreOSDU::ResumeSession::messageTypeId) {
224 // Handle previous messages which have not been processed but written in the previous sendingQueue.
225 std::queue< std::tuple<std::shared_ptr<EtpMessage>, std::shared_ptr<ETP_NS::ProtocolHandlers>, std::vector<int64_t>> > queueWithPreviousInformation;
226 // Put ResumeSession at first position whatever opens
227 queueWithPreviousInformation.push(std::make_tuple(message, specificHandler, msgIdAliases));
228 // Reorder message according to their id
229 int64_t previouslyDuplicatedId = -1;
230 for (size_t i = 0; i < specificProtocolHandlers.size(); ++i) {
231 int64_t lowestMsgId = (std::numeric_limits<int64_t>::max)();
232 for (auto keyVal : specificProtocolHandlers) {
233 if (keyVal.first > previouslyDuplicatedId && keyVal.first < lowestMsgId) {
234 lowestMsgId = keyVal.first;
235 }
236 }
237 previouslyDuplicatedId = lowestMsgId;
238 auto val = specificProtocolHandlers[lowestMsgId];
239 std::get<0>(val)->messageHeader.messageId = messageId.fetch_add(2);
240 std::get<2>(val).push_back(lowestMsgId);
241 queueWithPreviousInformation.push(val);
242 fesapi_log(std::get<0>(val)->to_string());
243 }
244 // The only remaining item must be the resumeSession which is already at the first position of queueWithPreviousInformation
245 while (sendingQueue.size() != 1) {
246 std::get<0>(sendingQueue.front())->messageHeader.messageId = messageId.fetch_add(2);
247 queueWithPreviousInformation.push(sendingQueue.front());
248 fesapi_log(std::get<0>(sendingQueue.front())->to_string());
249 sendingQueue.pop();
250 }
251 if (queueWithPreviousInformation.size() > 1) {
252 std::swap(sendingQueue, queueWithPreviousInformation);
253 }
254 do_write();
255 }
256 else if (sendingQueue.size() == 1) {
257 // Send the message directly if the sending queue was empty.
258 do_write();
259 }
260
261 return message->messageHeader.messageId;
262 }
263
275 int64_t sendWithSpecificHandlerAndBlock(std::shared_ptr<EtpMessage> message, std::shared_ptr<ETP_NS::ProtocolHandlers> specificHandler,
276 int64_t correlationId = 0, int32_t messageFlags = 0, const std::vector<int64_t>& msgIdAliases = {})
277 {
278 const int64_t msgId = sendWithSpecificHandler(message, specificHandler, correlationId, messageFlags, msgIdAliases);
279 // The correlationId of the first message MUST be set to 0 and the correlationId of all successive
280 // messages in the same multipart request or notification MUST be set to the messageId of the first
281 // message of the multipart request or notification.
282 // If the request message is itself multipart, the correlationId of each message of the multipart
283 // response MUST be set to the messageId of the FIRST message in the multipart request.
284
285 const auto t_start = std::chrono::high_resolution_clock::now();
286 while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
287 if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
288 throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
289 }
290 }
291
292 // If the message has not been answered correctly
294 // Wait for a reconnection
295 while (isEtpSessionClosed() && !isCloseRequested()) {}
296 // Check if reconnection is successfull
297 if (isEtpSessionClosed()) {
298 throw std::runtime_error("The ETP session could not be opened in order to send again the message.");
299 }
300 else {
301 return sendWithSpecificHandlerAndBlock(message, specificHandler, correlationId, messageFlags);
302 }
303 }
304
305 return msgId;
306 }
307
311 virtual void do_close() = 0;
312
316 virtual void do_read() = 0;
317
321 FETPAPI_DLL_IMPORT_OR_EXPORT void on_read(boost::system::error_code ec, std::size_t bytes_transferred);
322
323 void on_close(boost::system::error_code ec) {
324 if(ec) {
325 std::cerr << "on_close : " << ec.message() << std::endl;
326 }
327 if (!etpSessionClosed) {
328 std::cerr << "Websocket session is going to be closed BUT ETP SESSION HAS NOT BEEN CLOSED YET!!!" << std::endl;
329 }
330
331 // If we get here then the connection is closed gracefully
332 webSocketSessionClosed = true;
333 }
334
338 FETPAPI_DLL_IMPORT_OR_EXPORT bool isCloseRequested() const { return isCloseRequested_; }
339
343 FETPAPI_DLL_IMPORT_OR_EXPORT bool isWebSocketSessionClosed() const { return webSocketSessionClosed; }
344
348 FETPAPI_DLL_IMPORT_OR_EXPORT bool isMessageStillProcessing(int64_t msgId) {
349 std::scoped_lock lock(sendingQueueMutex, specificProtocolHandlersMutex);
350 return (!sendingQueue.empty() && std::get<0>(sendingQueue.front())->messageHeader.messageId <= msgId) || specificProtocolHandlers.count(msgId) > 0;
351 }
352
353 virtual void setMaxWebSocketMessagePayloadSize(uint64_t value) = 0;
354 uint64_t getMaxWebSocketMessagePayloadSize() const { return maxWebSocketMessagePayloadSize; }
355
356 /****************
357 ***** CORE ******
358 ****************/
359
365 FETPAPI_DLL_IMPORT_OR_EXPORT void close() {
366 isCloseRequested_ = true;
367 sendingQueueMutex.lock();
368 specificProtocolHandlersMutex.lock();
369 if (specificProtocolHandlers.empty() && sendingQueue.empty()) {
370 etpSessionClosed = true;
371 sendingQueueMutex.unlock();
372 specificProtocolHandlersMutex.unlock();
373 send(std::make_shared<Energistics::Etp::v12::Protocol::Core::CloseSession>(), 0, 0x02);
374 }
375 else {
376 sendingQueueMutex.unlock();
377 specificProtocolHandlersMutex.unlock();
378 }
379 }
380
386 FETPAPI_DLL_IMPORT_OR_EXPORT void closeAndBlock() {
387 close();
388 auto t_start = std::chrono::high_resolution_clock::now();
389 while (!webSocketSessionClosed) {
390 if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
391 throw std::runtime_error("Time out waiting for closing");
392 }
393 }
394 }
395
399 FETPAPI_DLL_IMPORT_OR_EXPORT bool isEtpSessionClosed() const { return webSocketSessionClosed || etpSessionClosed; }
400
401 /****************
402 *** DATASPACE ***
403 ****************/
404
415 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::Dataspace> getDataspaces(int64_t storeLastWriteFilter = -1);
416
425 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> putDataspaces(const std::map<std::string, Energistics::Etp::v12::Datatypes::Object::Dataspace>& dataspaces);
426
435 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> deleteDataspaces(const std::map<std::string, std::string>& dataspaceUris);
436
437 /*********************
438 *** DATASPACE OSDU ***
439 **********************/
440
449 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::Dataspace> getDataspaceInfo(const std::map<std::string, std::string>& dataspaceUris);
450
462 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> lockDataspaces(const std::map<std::string, std::string>& dataspaceUris, bool lock);
463
473 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> copyDataspacesContent(const std::map<std::string, std::string>& sourceDataspaceUris, const std::string& targetDataspaceUri);
474
484 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> copyToDataspace(const std::map<std::string, std::string>& sourceDataobjectUris, const std::string& targetDataspaceUri);
485
486 /****************
487 *** DISCOVERY ***
488 ****************/
489
505 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::Resource> getResources(
507 const Energistics::Etp::v12::Datatypes::Object::ContextScopeKind& scope,
508 int64_t storeLastWriteFilter = -1,
509 bool countObjects = false);
510
523 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::DeletedResource> getDeletedResources(
524 const std::string& dataspaceUri,
525 int64_t deleteTimeFilter = -1,
526 const std::vector<std::string>& dataObjectTypes = {});
527
528 /****************
529 ***** STORE *****
530 ****************/
531
540 FETPAPI_DLL_IMPORT_OR_EXPORT std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject> getDataObjects(const std::map<std::string, std::string>& uris);
541
550 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> putDataObjects(const std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject>& dataObjects);
551
560 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> deleteDataObjects(const std::map<std::string, std::string>& uris);
561
562 /*********************
563 ***** STORE OSDU *****
564 **********************/
565
580 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> copyDataObjectsByValue(const std::string& sourceDataobjectUri, int32_t sourcesDepth = 0, const std::vector<std::string>& dataObjectTypes = {});
581
582 /****************
583 ** TRANSACTION **
584 ****************/
585
595 FETPAPI_DLL_IMPORT_OR_EXPORT std::string startTransaction(std::vector<std::string> dataspaceUris = {}, bool readOnly = false);
596
604 FETPAPI_DLL_IMPORT_OR_EXPORT std::string rollbackTransaction();
605
606 /*
607 * A customer sends to a store to cancel a transaction. The store MUST disregard any requests or data sent
608 * with that transaction. The current transaction (the one being canceled) MUST NOT change the state of
609 * the store.
610 * It actually sends a message and blocks the current thread until a response has been received from the store.
611 * @return Failure message or empty string if success
612 */
613 FETPAPI_DLL_IMPORT_OR_EXPORT std::string commitTransaction();
614
615 /***************************/
616 // LOGGING
617 /***************************/
618
622 void setVerbose(bool verbose) {
623 _verbose = verbose;
624 }
625
626 //terminating log
627 void fesapi_log() { std::cout << std::endl; }
628
629 template<typename First, typename ...Rest>
630 void fesapi_log(First && first, Rest && ...rest)
631 {
632 if (_verbose) {
633 std::cout << std::forward<First>(first) << " ";
634 fesapi_log(std::forward<Rest>(rest)...);
635 }
636 }
637
638 protected:
639 beast::flat_buffer receivedBuffer;
641 std::unordered_map<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type, std::shared_ptr<ETP_NS::ProtocolHandlers>> protocolHandlers;
643 std::unordered_map<int64_t, std::tuple<std::shared_ptr<EtpMessage>, std::shared_ptr<ETP_NS::ProtocolHandlers>, std::vector<int64_t>>> specificProtocolHandlers;
644 std::mutex specificProtocolHandlersMutex;
650 uint64_t maxWebSocketMessagePayloadSize{ 16000000 };
652 std::atomic<bool> webSocketSessionClosed{ true };
654 std::atomic<bool> etpSessionClosed{ true };
656 std::atomic<double> _timeOut{ 30000 };
658 std::atomic<bool> _verbose{ false };
660 std::queue< std::tuple<std::shared_ptr<EtpMessage>, std::shared_ptr<ETP_NS::ProtocolHandlers>, std::vector<int64_t> > > sendingQueue;
661 std::mutex sendingQueueMutex;
663 std::atomic<int64_t> messageId;
665 boost::uuids::uuid identifier{ boost::uuids::nil_uuid() };
666 std::mutex identifierMutex;
668 bool isCloseRequested_{ false };
669 size_t reconnectionTryCount_ = 0;
670
671 AbstractSession() = default;
672
673 virtual boost::asio::io_context& getIoContext() = 0;
674
675 void flushReceivingBuffer() {
676 receivedBuffer.consume(receivedBuffer.size());
677 }
678
679 void setEtpSessionClosed(bool etpSessionClosed_) {
680 etpSessionClosed = etpSessionClosed_;
681 reconnectionTryCount_ = 0;
682 }
683
687 virtual void do_write() = 0;
688
693 Energistics::Etp::v12::Datatypes::MessageHeader decodeMessageHeader(avro::DecoderPtr decoder);
694
695 std::shared_ptr<ETP_NS::CoreHandlers> getCoreProtocolHandlers() {
696 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Core));
697 return it == protocolHandlers.end()
698 ? nullptr
699 : std::dynamic_pointer_cast<CoreHandlers>(it->second);
700 }
701
702 std::shared_ptr<ETP_NS::DiscoveryHandlers> getDiscoveryProtocolHandlers() {
703 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Discovery));
704 return it == protocolHandlers.end()
705 ? nullptr
706 : std::dynamic_pointer_cast<DiscoveryHandlers>(it->second);
707 }
708
709 std::shared_ptr<ETP_NS::StoreHandlers> getStoreProtocolHandlers() {
710 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Store));
711 return it == protocolHandlers.end()
712 ? nullptr
713 : std::dynamic_pointer_cast<StoreHandlers>(it->second);
714 }
715
716 std::shared_ptr<ETP_NS::StoreNotificationHandlers> getStoreNotificationProtocolHandlers() {
717 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::StoreNotification));
718 return it == protocolHandlers.end()
719 ? nullptr
720 : std::dynamic_pointer_cast<StoreNotificationHandlers>(it->second);
721 }
722
723 std::shared_ptr<ETP_NS::DataArrayHandlers> getDataArrayProtocolHandlers() {
724 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::DataArray));
725 return it == protocolHandlers.end()
726 ? nullptr
727 : std::dynamic_pointer_cast<DataArrayHandlers>(it->second);
728 }
729
730 std::shared_ptr<ETP_NS::TransactionHandlers> getTransactionProtocolHandlers() {
731 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Transaction));
732 return it == protocolHandlers.end()
733 ? nullptr
734 : std::dynamic_pointer_cast<TransactionHandlers>(it->second);
735 }
736
737 std::shared_ptr<ETP_NS::DataspaceHandlers> getDataspaceProtocolHandlers() {
738 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Dataspace));
739 return it == protocolHandlers.end()
740 ? nullptr
741 : std::dynamic_pointer_cast<DataspaceHandlers>(it->second);
742 }
743
744 std::shared_ptr<ETP_NS::CoreOSDUHandlers> getCoreOSDUProtocolHandlers() {
745 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::CoreOSDU));
746 return it == protocolHandlers.end()
747 ? nullptr
748 : std::dynamic_pointer_cast<CoreOSDUHandlers>(it->second);
749 }
750
751 std::shared_ptr<ETP_NS::StoreOSDUHandlers> getStoreOSDUProtocolHandlers() {
752 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::StoreOSDU));
753 return it == protocolHandlers.end()
754 ? nullptr
755 : std::dynamic_pointer_cast<StoreOSDUHandlers>(it->second);
756 }
757
758 std::shared_ptr<ETP_NS::DataspaceOSDUHandlers> getDataspaceOSDUProtocolHandlers() {
759 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::DataspaceOSDU));
760 return it == protocolHandlers.end()
761 ? nullptr
762 : std::dynamic_pointer_cast<DataspaceOSDUHandlers>(it->second);
763 }
764
765 void setProtocolHandlers(std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type protocolId, std::shared_ptr<ProtocolHandlers> coreHandlers) {
766 // Verify that we don't modify handlers which could be in use on the io context thread
768 throw std::logic_error("You cannot set some protocol handlers once the session is running.");
769 }
770
771 protocolHandlers[protocolId] = coreHandlers;
772 }
773
774 friend void CoreHandlers::decodeMessageBody(const Energistics::Etp::v12::Datatypes::MessageHeader& mh, avro::DecoderPtr d);
775 friend void CoreHandlers::on_ProtocolException(const Energistics::Etp::v12::Protocol::Core::ProtocolException& msg, int64_t correlationId);
776 friend void CoreOSDUHandlers::decodeMessageBody(const Energistics::Etp::v12::Datatypes::MessageHeader& mh, avro::DecoderPtr d);
777 };
778}
FETPAPI_DLL_IMPORT_OR_EXPORT std::string rollbackTransaction()
FETPAPI_DLL_IMPORT_OR_EXPORT void setStoreOSDUProtocolHandlers(std::shared_ptr< StoreOSDUHandlers > storeOSDUHandlers)
Definition AbstractSession.h:111
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > deleteDataObjects(const std::map< std::string, std::string > &uris)
double getTimeOut() const
Definition AbstractSession.h:76
int64_t sendWithSpecificHandler(std::shared_ptr< EtpMessage > message, std::shared_ptr< ETP_NS::ProtocolHandlers > specificHandler, int64_t correlationId=0, int32_t messageFlags=0, const std::vector< int64_t > &msgIdAliases={})
Definition AbstractSession.h:209
FETPAPI_DLL_IMPORT_OR_EXPORT void setTransactionProtocolHandlers(std::shared_ptr< TransactionHandlers > transactionHandlers)
Definition AbstractSession.h:132
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< Energistics::Etp::v12::Datatypes::Object::Dataspace > getDataspaces(int64_t storeLastWriteFilter=-1)
FETPAPI_DLL_IMPORT_OR_EXPORT void setDiscoveryProtocolHandlers(std::shared_ptr< DiscoveryHandlers > discoveryHandlers)
Definition AbstractSession.h:90
FETPAPI_DLL_IMPORT_OR_EXPORT std::string startTransaction(std::vector< std::string > dataspaceUris={}, bool readOnly=false)
FETPAPI_DLL_IMPORT_OR_EXPORT bool isWebSocketSessionClosed() const
Definition AbstractSession.h:343
int64_t send(std::shared_ptr< EtpMessage > message, int64_t correlationId=0, int32_t messageFlags=0, const std::vector< int64_t > &msgIdAliases={})
Definition AbstractSession.h:161
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > putDataspaces(const std::map< std::string, Energistics::Etp::v12::Datatypes::Object::Dataspace > &dataspaces)
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > deleteDataspaces(const std::map< std::string, std::string > &dataspaceUris)
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > lockDataspaces(const std::map< std::string, std::string > &dataspaceUris, bool lock)
int64_t sendWithSpecificHandlerAndBlock(std::shared_ptr< EtpMessage > message, std::shared_ptr< ETP_NS::ProtocolHandlers > specificHandler, int64_t correlationId=0, int32_t messageFlags=0, const std::vector< int64_t > &msgIdAliases={})
Definition AbstractSession.h:275
FETPAPI_DLL_IMPORT_OR_EXPORT void setStoreNotificationProtocolHandlers(std::shared_ptr< ETP_NS::StoreNotificationHandlers > storeNotificationHandlers)
Definition AbstractSession.h:118
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > copyToDataspace(const std::map< std::string, std::string > &sourceDataobjectUris, const std::string &targetDataspaceUri)
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > copyDataObjectsByValue(const std::string &sourceDataobjectUri, int32_t sourcesDepth=0, const std::vector< std::string > &dataObjectTypes={})
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > putDataObjects(const std::map< std::string, Energistics::Etp::v12::Datatypes::Object::DataObject > &dataObjects)
void setTimeOut(double timeOut)
Definition AbstractSession.h:69
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< Energistics::Etp::v12::Datatypes::Object::Resource > getResources(const Energistics::Etp::v12::Datatypes::Object::ContextInfo &context, const Energistics::Etp::v12::Datatypes::Object::ContextScopeKind &scope, int64_t storeLastWriteFilter=-1, bool countObjects=false)
FETPAPI_DLL_IMPORT_OR_EXPORT std::map< std::string, Energistics::Etp::v12::Datatypes::Object::DataObject > getDataObjects(const std::map< std::string, std::string > &uris)
virtual void do_close()=0
FETPAPI_DLL_IMPORT_OR_EXPORT void on_read(boost::system::error_code ec, std::size_t bytes_transferred)
FETPAPI_DLL_IMPORT_OR_EXPORT void closeAndBlock()
Definition AbstractSession.h:386
void setVerbose(bool verbose)
Definition AbstractSession.h:622
FETPAPI_DLL_IMPORT_OR_EXPORT void setDataspaceProtocolHandlers(std::shared_ptr< DataspaceHandlers > dataspaceHandlers)
Definition AbstractSession.h:139
FETPAPI_DLL_IMPORT_OR_EXPORT void setDataspaceOSDUProtocolHandlers(std::shared_ptr< DataspaceOSDUHandlers > dataspaceOSDUHandlers)
Definition AbstractSession.h:146
const boost::uuids::uuid & getIdentifier()
Definition AbstractSession.h:61
virtual void do_read()=0
FETPAPI_DLL_IMPORT_OR_EXPORT void setCoreProtocolHandlers(std::shared_ptr< CoreHandlers > coreHandlers)
Definition AbstractSession.h:83
FETPAPI_DLL_IMPORT_OR_EXPORT void setDataArrayProtocolHandlers(std::shared_ptr< DataArrayHandlers > dataArrayHandlers)
Definition AbstractSession.h:125
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< Energistics::Etp::v12::Datatypes::Object::Dataspace > getDataspaceInfo(const std::map< std::string, std::string > &dataspaceUris)
FETPAPI_DLL_IMPORT_OR_EXPORT void close()
Definition AbstractSession.h:365
FETPAPI_DLL_IMPORT_OR_EXPORT void setCoreOSDUProtocolHandlers(std::shared_ptr< CoreOSDUHandlers > coreOSDUHandlers)
Definition AbstractSession.h:104
FETPAPI_DLL_IMPORT_OR_EXPORT bool isMessageStillProcessing(int64_t msgId)
Definition AbstractSession.h:348
FETPAPI_DLL_IMPORT_OR_EXPORT bool isEtpSessionClosed() const
Definition AbstractSession.h:399
FETPAPI_DLL_IMPORT_OR_EXPORT bool isCloseRequested() const
Definition AbstractSession.h:338
FETPAPI_DLL_IMPORT_OR_EXPORT void setStoreProtocolHandlers(std::shared_ptr< StoreHandlers > storeHandlers)
Definition AbstractSession.h:97
int64_t sendAndBlock(std::shared_ptr< EtpMessage > message, int64_t correlationId=0, int32_t messageFlags=0, const std::vector< int64_t > &msgIdAliases={})
Definition AbstractSession.h:178
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > copyDataspacesContent(const std::map< std::string, std::string > &sourceDataspaceUris, const std::string &targetDataspaceUri)
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< Energistics::Etp::v12::Datatypes::Object::DeletedResource > getDeletedResources(const std::string &dataspaceUri, int64_t deleteTimeFilter=-1, const std::vector< std::string > &dataObjectTypes={})