FETPAPI 0.3.0.0
This project provides C++ classes which facilitate the developement of ETP1.2 clients and servers.
Loading...
Searching...
No Matches
AbstractSession.h
1/*-----------------------------------------------------------------------
2Licensed to the Apache Software Foundation (ASF) under one
3or more contributor license agreements. See the NOTICE file
4distributed with this work for additional information
5regarding copyright ownership. The ASF licenses this file
6to you under the Apache License, Version 2.0 (the
7"License"; you may not use this file except in compliance
8with the License. You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing,
13software distributed under the License is distributed on an
14"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15KIND, either express or implied. See the License for the
16specific language governing permissions and limitations
17under the License.
18-----------------------------------------------------------------------*/
19#pragma once
20
21#include <iostream>
22#include <mutex>
23#include <queue>
24#include <unordered_map>
25#include <utility>
26
27#include <boost/asio.hpp>
28#include <boost/beast/core.hpp>
29#include <boost/beast/websocket.hpp>
30#include <boost/uuid/uuid_io.hpp>
31#include <boost/uuid/nil_generator.hpp>
32
33#include "../nsDefinitions.h"
34
35#include "EtpHelpers.h"
36#include "ProtocolHandlers/CoreHandlers.h"
37#include "ProtocolHandlers/DiscoveryHandlers.h"
38#include "ProtocolHandlers/StoreHandlers.h"
39#include "ProtocolHandlers/StoreNotificationHandlers.h"
40#include "ProtocolHandlers/DataArrayHandlers.h"
41#include "ProtocolHandlers/TransactionHandlers.h"
42#include "ProtocolHandlers/DataspaceHandlers.h"
43#include "ProtocolHandlers/DataspaceOSDUHandlers.h"
44
45using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
46namespace websocket = boost::beast::websocket; // from <boost/beast/websocket.hpp>
47
48namespace ETP_NS
49{
50 class AbstractSession : public std::enable_shared_from_this<AbstractSession>
51 {
52 public:
53
54 virtual ~AbstractSession() = default;
55
60 const boost::uuids::uuid& getIdentifier() {
61 if (isEtpSessionClosed()) {
62 identifier = boost::uuids::nil_uuid();
63 }
64
65 return identifier;
66 }
67
71 void setTimeOut(double timeOut) {
72 _timeOut = timeOut;
73 }
74
78 double getTimeOut() const {
79 return _timeOut;
80 }
81
85 std::unordered_map<int64_t, Energistics::Etp::v12::Datatypes::Object::SubscriptionInfo> subscriptions;
86
90 FETPAPI_DLL_IMPORT_OR_EXPORT void setCoreProtocolHandlers(std::shared_ptr<CoreHandlers> coreHandlers) {
91 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Core), coreHandlers);
92 }
93
97 FETPAPI_DLL_IMPORT_OR_EXPORT void setDiscoveryProtocolHandlers(std::shared_ptr<DiscoveryHandlers> discoveryHandlers) {
98 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Discovery), discoveryHandlers);
99 }
100
104 FETPAPI_DLL_IMPORT_OR_EXPORT void setStoreProtocolHandlers(std::shared_ptr<StoreHandlers> storeHandlers) {
105 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Store), storeHandlers);
106 }
107
111 FETPAPI_DLL_IMPORT_OR_EXPORT void setStoreNotificationProtocolHandlers(std::shared_ptr<ETP_NS::StoreNotificationHandlers> storeNotificationHandlers) {
112 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::StoreNotification), storeNotificationHandlers);
113 }
114
118 FETPAPI_DLL_IMPORT_OR_EXPORT void setDataArrayProtocolHandlers(std::shared_ptr<DataArrayHandlers> dataArrayHandlers) {
119 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::DataArray), dataArrayHandlers);
120 }
121
125 FETPAPI_DLL_IMPORT_OR_EXPORT void setTransactionProtocolHandlers(std::shared_ptr<TransactionHandlers> transactionHandlers) {
126 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Transaction), transactionHandlers);
127 }
128
132 FETPAPI_DLL_IMPORT_OR_EXPORT void setDataspaceProtocolHandlers(std::shared_ptr<DataspaceHandlers> dataspaceHandlers) {
133 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Dataspace), dataspaceHandlers);
134 }
135
139 FETPAPI_DLL_IMPORT_OR_EXPORT void setDataspaceOSDUProtocolHandlers(std::shared_ptr<DataspaceOSDUHandlers> dataspaceOsduHandlers) {
140 setProtocolHandlers(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::DataspaceOSDU), dataspaceOsduHandlers);
141 }
142
152 template<typename T> int64_t send(const T & mb, int64_t correlationId = 0, int32_t messageFlags = 0)
153 {
154 return sendWithSpecificHandler(mb, protocolHandlers.at(mb.protocolId), correlationId, messageFlags);
155 }
156
166 template<typename T> void sendAndBlock(const T & mb, int64_t correlationId = 0, int32_t messageFlags = 0)
167 {
168 int64_t msgId = send(mb, correlationId, messageFlags);
169
170 auto t_start = std::chrono::high_resolution_clock::now();
171 while (isMessageStillProcessing(msgId)) {
172 if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
173 throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
174 }
175 }
176 }
177
187 template<typename T> int64_t sendWithSpecificHandler(const T & mb, std::shared_ptr<ETP_NS::ProtocolHandlers> specificHandler, int64_t correlationId = 0, int32_t messageFlags = 0)
188 {
189 // Encode the message into AVRO format
190 auto queueItem = encode(mb, correlationId, messageFlags);
191
192 const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
193 // Set the handlers which are going to be called for the response to this sent message
194 std::get<2>(queueItem) = specificHandler;
195 // Push the message into the queue
196 sendingQueue.push(queueItem);
197 fesapi_log("*************************************************");
198 fesapi_log("Message Header put in the queue : ");
199 fesapi_log("protocol :", std::to_string(mb.protocolId));
200 fesapi_log("type :" , std::to_string(mb.messageTypeId));
201 fesapi_log("id :" , std::to_string(std::get<0>(queueItem)));
202 fesapi_log("correlation id :" , std::to_string(correlationId));
203 fesapi_log("flags :" , std::to_string(messageFlags));
204 fesapi_log("Whole message size :" , std::to_string(std::get<1>(queueItem).size()) , "bytes.");
205 fesapi_log("*************************************************");
206
207 // Send the message directly if the sending queue was empty.
208 if (sendingQueue.size() == 1) {
209 do_write();
210 }
211
212 return std::get<0>(queueItem);
213 }
214
218 virtual void do_close() = 0;
219
223 virtual void do_read() = 0;
224
228 FETPAPI_DLL_IMPORT_OR_EXPORT void on_read(boost::system::error_code ec, std::size_t bytes_transferred);
229
230 void on_write(boost::system::error_code ec, std::size_t) {
231 if(ec) {
232 std::cerr << "on_write : " << ec.message() << std::endl;
233 }
234
235 // Remove the sent message from the queue
236 const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
237 sendingQueue.pop();
238
239 do_write();
240 }
241
242 void on_close(boost::system::error_code ec) {
243 if(ec) {
244 std::cerr << "on_close : " << ec.message() << std::endl;
245 }
246 if (!etpSessionClosed) {
247 std::cerr << "Websocket session is going to be closed BUT ETP SESSION HAS NOT BEEN CLOSED YET!!!" << std::endl;
248 }
249
250 // If we get here then the connection is closed gracefully
251 webSocketSessionClosed = true;
252 }
253
257 FETPAPI_DLL_IMPORT_OR_EXPORT bool isWebSocketSessionClosed() const { return webSocketSessionClosed; }
258
262 FETPAPI_DLL_IMPORT_OR_EXPORT bool isMessageStillProcessing(int64_t msgId) {
263 const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
264 const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
265 return (!sendingQueue.empty() && std::get<0>(sendingQueue.front()) <= msgId) || specificProtocolHandlers.count(msgId) > 0;
266 }
267 //FETPAPI_DLL_IMPORT_OR_EXPORT bool isMessageStillProcessing(int64_t msgId) const { return specificProtocolHandlers.count(msgId) > 0; }
268
269 virtual void setMaxWebSocketMessagePayloadSize(int64_t value) = 0;
270 int64_t getMaxWebSocketMessagePayloadSize() const { return maxWebSocketMessagePayloadSize; }
271
272 /****************
273 ***** CORE ******
274 ****************/
275
281 FETPAPI_DLL_IMPORT_OR_EXPORT void close() {
282 isCloseRequested = true;
283 sendingQueueMutex.lock();
284 specificProtocolHandlersMutex.lock();
285 if (specificProtocolHandlers.empty() && sendingQueue.empty()) {
286 etpSessionClosed = true;
287 sendingQueueMutex.unlock();
288 specificProtocolHandlersMutex.unlock();
290 }
291 else {
292 sendingQueueMutex.unlock();
293 specificProtocolHandlersMutex.unlock();
294 }
295 }
296
302 FETPAPI_DLL_IMPORT_OR_EXPORT void closeAndBlock() {
303 close();
304 auto t_start = std::chrono::high_resolution_clock::now();
305 while (!isEtpSessionClosed()) {
306 if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
307 throw std::runtime_error("Time out waiting for closing");
308 }
309 }
310 }
311
315 FETPAPI_DLL_IMPORT_OR_EXPORT bool isEtpSessionClosed() const { return webSocketSessionClosed || etpSessionClosed; }
316
317 void setEtpSessionClosed(bool etpSessionClosed_) { etpSessionClosed = etpSessionClosed_; }
318
319 /****************
320 *** DATASPACE ***
321 ****************/
322
333 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::Dataspace> getDataspaces(int64_t storeLastWriteFilter = -1);
334
343 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> putDataspaces(const std::map<std::string, Energistics::Etp::v12::Datatypes::Object::Dataspace>& dataspaces);
344
353 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> deleteDataspaces(const std::map<std::string, std::string>& dataspaceUris);
354
355 /*********************
356 *** DATASPACE OSDU ***
357 **********************/
358
367 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::Dataspace> getDataspaceInfo(const std::map<std::string, std::string>& dataspaceUris);
368
378 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> copyDataspacesContent(const std::map<std::string, std::string>& sourceDataspaceUris, const std::string& targetDataspaceUri);
379
389 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> lockDataspaces(const std::map<std::string, std::string>& dataspaceUris, bool lock);
390
400 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> copyToDataspace(const std::map<std::string, std::string>& sourceUris, const std::string& targetDataspaceUri);
401
402 /****************
403 *** DISCOVERY ***
404 ****************/
405
421 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::Resource> getResources(
423 const Energistics::Etp::v12::Datatypes::Object::ContextScopeKind& scope,
424 int64_t storeLastWriteFilter = -1,
425 bool countObjects = false);
426
439 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::DeletedResource> getDeletedResources(
440 const std::string& dataspaceUri,
441 int64_t deleteTimeFilter = -1,
442 const std::vector<std::string>& dataObjectTypes = {});
443
444 /****************
445 ***** STORE *****
446 ****************/
447
456 FETPAPI_DLL_IMPORT_OR_EXPORT std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject> getDataObjects(const std::map<std::string, std::string>& uris);
457
466 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> putDataObjects(const std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject>& dataObjects);
467
476 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string> deleteDataObjects(const std::map<std::string, std::string>& uris);
477
478 /****************
479 ** TRANSACTION **
480 ****************/
481
490 FETPAPI_DLL_IMPORT_OR_EXPORT std::string startTransaction(std::vector<std::string> dataspaceUris = {}, bool readOnly = false);
491
498 FETPAPI_DLL_IMPORT_OR_EXPORT std::string rollbackTransaction();
499
500 /*
501 * A customer sends to a store to cancel a transaction. The store MUST disregard any requests or data sent
502 * with that transaction. The current transaction (the one being canceled) MUST NOT change the state of
503 * the store.
504 * It actually sends a message and block the current thread untill a response has been received from the store.
505 */
506 FETPAPI_DLL_IMPORT_OR_EXPORT std::string commitTransaction();
507
508 /***************************/
509 // LOGGING
510 /***************************/
511
515 void setVerbose(bool verbose) {
516 _verbose = verbose;
517 }
518
519 //terminating log
520 void fesapi_log() { std::cout << std::endl; }
521
522 template<typename First, typename ...Rest>
523 void fesapi_log(First && first, Rest && ...rest)
524 {
525 if (_verbose) {
526 std::cout << std::forward<First>(first) << " ";
527 fesapi_log(std::forward<Rest>(rest)...);
528 }
529 }
530
531 protected:
532 boost::beast::flat_buffer receivedBuffer;
534 std::unordered_map<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type, std::shared_ptr<ETP_NS::ProtocolHandlers>> protocolHandlers;
536 std::unordered_map<int64_t, std::shared_ptr<ETP_NS::ProtocolHandlers>> specificProtocolHandlers;
537 std::mutex specificProtocolHandlersMutex;
543 int64_t maxWebSocketMessagePayloadSize{ 16000000 };
545 std::atomic<bool> webSocketSessionClosed{ true };
547 std::atomic<bool> etpSessionClosed{ true };
549 std::atomic<double> _timeOut{ 30000 };
551 std::atomic<bool> _verbose{ false };
553 std::queue< std::tuple<int64_t, std::vector<uint8_t>, std::shared_ptr<ETP_NS::ProtocolHandlers>> > sendingQueue;
554 std::mutex sendingQueueMutex;
556 std::atomic<int64_t> messageId;
558 boost::uuids::uuid identifier;
560 bool isCloseRequested{ false };
561
562 AbstractSession() = default;
563
564 virtual boost::asio::io_context& getIoContext() = 0;
565
566 void flushReceivingBuffer() {
567 receivedBuffer.consume(receivedBuffer.size());
568 }
569
573 virtual void do_write() = 0;
574
579 Energistics::Etp::v12::Datatypes::MessageHeader decodeMessageHeader(avro::DecoderPtr decoder);
580
581 template<typename T> std::tuple<int64_t, std::vector<uint8_t>, std::shared_ptr<ETP_NS::ProtocolHandlers>> encode(const T & mb, int64_t correlationId = 0, int32_t messageFlags = 0)
582 {
583 // Build message header
585 mh.protocol = mb.protocolId;
586 mh.messageType = mb.messageTypeId;
587 mh.correlationId = correlationId;
588 mh.messageId = messageId.fetch_add(2);
589 mh.messageFlags = messageFlags;
590
591 avro::OutputStreamPtr out = avro::memoryOutputStream();
592 avro::EncoderPtr e = avro::binaryEncoder();
593 e->init(*out);
594 avro::encode(*e, mh);
595 avro::encode(*e, mb);
596 e->flush();
597 const int64_t byteCount = e->byteCount();
598
599 if (byteCount < maxWebSocketMessagePayloadSize) {
600 return std::make_tuple(mh.messageId, *avro::snapshot(*out).get(), nullptr);
601 }
602 else {
603 messageId -= 2;
604 if (correlationId != 0) {
605 return encode(EtpHelpers::buildSingleMessageProtocolException(17, "I try to send you a too big message response of protocol "
606 + std::to_string(mb.protocolId) + " and type id " + std::to_string(mb.messageTypeId) + " and size " + std::to_string(byteCount)
607 + " bytes according to our negotiated size capability which is " + std::to_string(maxWebSocketMessagePayloadSize) + " bytes."), correlationId, 0x02);
608 }
609 else {
610 return std::make_tuple(-1, std::vector<uint8_t>{}, nullptr); // You cannot send a message which is too big. Please use message part or chunk or whatever else.
611 }
612 }
613 }
614
615 std::shared_ptr<ETP_NS::CoreHandlers> getCoreProtocolHandlers() {
616 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Core));
617 return it == protocolHandlers.end()
618 ? nullptr
619 : std::dynamic_pointer_cast<CoreHandlers>(it->second);
620 }
621
622 std::shared_ptr<ETP_NS::DiscoveryHandlers> getDiscoveryProtocolHandlers() {
623 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Discovery));
624 return it == protocolHandlers.end()
625 ? nullptr
626 : std::dynamic_pointer_cast<DiscoveryHandlers>(it->second);
627 }
628
629 std::shared_ptr<ETP_NS::StoreHandlers> getStoreProtocolHandlers() {
630 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Store));
631 return it == protocolHandlers.end()
632 ? nullptr
633 : std::dynamic_pointer_cast<StoreHandlers>(it->second);
634 }
635
636 std::shared_ptr<ETP_NS::StoreNotificationHandlers> getStoreNotificationProtocolHandlers() {
637 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::StoreNotification));
638 return it == protocolHandlers.end()
639 ? nullptr
640 : std::dynamic_pointer_cast<StoreNotificationHandlers>(it->second);
641 }
642
643 std::shared_ptr<ETP_NS::DataArrayHandlers> getDataArrayProtocolHandlers() {
644 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::DataArray));
645 return it == protocolHandlers.end()
646 ? nullptr
647 : std::dynamic_pointer_cast<DataArrayHandlers>(it->second);
648 }
649
650 std::shared_ptr<ETP_NS::TransactionHandlers> getTransactionProtocolHandlers() {
651 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Transaction));
652 return it == protocolHandlers.end()
653 ? nullptr
654 : std::dynamic_pointer_cast<TransactionHandlers>(it->second);
655 }
656
657 std::shared_ptr<ETP_NS::DataspaceHandlers> getDataspaceProtocolHandlers() {
658 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Dataspace));
659 return it == protocolHandlers.end()
660 ? nullptr
661 : std::dynamic_pointer_cast<DataspaceHandlers>(it->second);
662 }
663
664 std::shared_ptr<ETP_NS::DataspaceOSDUHandlers> getDataspaceOSDUProtocolHandlers() {
665 auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::DataspaceOSDU));
666 return it == protocolHandlers.end()
667 ? nullptr
668 : std::dynamic_pointer_cast<DataspaceOSDUHandlers>(it->second);
669 }
670
671 void setProtocolHandlers(std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type protocolId, std::shared_ptr<ProtocolHandlers> coreHandlers) {
672 // Verify that we don't modify handlers which could be in use on the io context thread
674 throw std::logic_error("You cannot set some protocol handlers once the session is running.");
675 }
676
677 protocolHandlers[protocolId] = coreHandlers;
678 }
679 };
680}
Definition AbstractSession.h:51
int64_t send(const T &mb, int64_t correlationId=0, int32_t messageFlags=0)
Definition AbstractSession.h:152
FETPAPI_DLL_IMPORT_OR_EXPORT std::string rollbackTransaction()
int64_t sendWithSpecificHandler(const T &mb, std::shared_ptr< ETP_NS::ProtocolHandlers > specificHandler, int64_t correlationId=0, int32_t messageFlags=0)
Definition AbstractSession.h:187
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > deleteDataObjects(const std::map< std::string, std::string > &uris)
double getTimeOut() const
Definition AbstractSession.h:78
FETPAPI_DLL_IMPORT_OR_EXPORT void setTransactionProtocolHandlers(std::shared_ptr< TransactionHandlers > transactionHandlers)
Definition AbstractSession.h:125
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< Energistics::Etp::v12::Datatypes::Object::Dataspace > getDataspaces(int64_t storeLastWriteFilter=-1)
void sendAndBlock(const T &mb, int64_t correlationId=0, int32_t messageFlags=0)
Definition AbstractSession.h:166
FETPAPI_DLL_IMPORT_OR_EXPORT void setDiscoveryProtocolHandlers(std::shared_ptr< DiscoveryHandlers > discoveryHandlers)
Definition AbstractSession.h:97
FETPAPI_DLL_IMPORT_OR_EXPORT std::string startTransaction(std::vector< std::string > dataspaceUris={}, bool readOnly=false)
FETPAPI_DLL_IMPORT_OR_EXPORT bool isWebSocketSessionClosed() const
Definition AbstractSession.h:257
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > putDataspaces(const std::map< std::string, Energistics::Etp::v12::Datatypes::Object::Dataspace > &dataspaces)
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > deleteDataspaces(const std::map< std::string, std::string > &dataspaceUris)
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > lockDataspaces(const std::map< std::string, std::string > &dataspaceUris, bool lock)
FETPAPI_DLL_IMPORT_OR_EXPORT void setStoreNotificationProtocolHandlers(std::shared_ptr< ETP_NS::StoreNotificationHandlers > storeNotificationHandlers)
Definition AbstractSession.h:111
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > putDataObjects(const std::map< std::string, Energistics::Etp::v12::Datatypes::Object::DataObject > &dataObjects)
void setTimeOut(double timeOut)
Definition AbstractSession.h:71
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > copyToDataspace(const std::map< std::string, std::string > &sourceUris, const std::string &targetDataspaceUri)
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< Energistics::Etp::v12::Datatypes::Object::Resource > getResources(const Energistics::Etp::v12::Datatypes::Object::ContextInfo &context, const Energistics::Etp::v12::Datatypes::Object::ContextScopeKind &scope, int64_t storeLastWriteFilter=-1, bool countObjects=false)
FETPAPI_DLL_IMPORT_OR_EXPORT std::map< std::string, Energistics::Etp::v12::Datatypes::Object::DataObject > getDataObjects(const std::map< std::string, std::string > &uris)
virtual void do_close()=0
FETPAPI_DLL_IMPORT_OR_EXPORT void on_read(boost::system::error_code ec, std::size_t bytes_transferred)
FETPAPI_DLL_IMPORT_OR_EXPORT void closeAndBlock()
Definition AbstractSession.h:302
void setVerbose(bool verbose)
Definition AbstractSession.h:515
FETPAPI_DLL_IMPORT_OR_EXPORT void setDataspaceProtocolHandlers(std::shared_ptr< DataspaceHandlers > dataspaceHandlers)
Definition AbstractSession.h:132
const boost::uuids::uuid & getIdentifier()
Definition AbstractSession.h:60
virtual void do_read()=0
FETPAPI_DLL_IMPORT_OR_EXPORT void setCoreProtocolHandlers(std::shared_ptr< CoreHandlers > coreHandlers)
Definition AbstractSession.h:90
FETPAPI_DLL_IMPORT_OR_EXPORT void setDataArrayProtocolHandlers(std::shared_ptr< DataArrayHandlers > dataArrayHandlers)
Definition AbstractSession.h:118
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< Energistics::Etp::v12::Datatypes::Object::Dataspace > getDataspaceInfo(const std::map< std::string, std::string > &dataspaceUris)
FETPAPI_DLL_IMPORT_OR_EXPORT void close()
Definition AbstractSession.h:281
FETPAPI_DLL_IMPORT_OR_EXPORT bool isMessageStillProcessing(int64_t msgId)
Definition AbstractSession.h:262
FETPAPI_DLL_IMPORT_OR_EXPORT bool isEtpSessionClosed() const
Definition AbstractSession.h:315
FETPAPI_DLL_IMPORT_OR_EXPORT void setDataspaceOSDUProtocolHandlers(std::shared_ptr< DataspaceOSDUHandlers > dataspaceOsduHandlers)
Definition AbstractSession.h:139
FETPAPI_DLL_IMPORT_OR_EXPORT void setStoreProtocolHandlers(std::shared_ptr< StoreHandlers > storeHandlers)
Definition AbstractSession.h:104
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< std::string > copyDataspacesContent(const std::map< std::string, std::string > &sourceDataspaceUris, const std::string &targetDataspaceUri)
FETPAPI_DLL_IMPORT_OR_EXPORT std::vector< Energistics::Etp::v12::Datatypes::Object::DeletedResource > getDeletedResources(const std::string &dataspaceUri, int64_t deleteTimeFilter=-1, const std::vector< std::string > &dataObjectTypes={})
std::unordered_map< int64_t, Energistics::Etp::v12::Datatypes::Object::SubscriptionInfo > subscriptions
Definition AbstractSession.h:85