// Reset the session identifier to the nil UUID returned by boost::uuids::nil_uuid().
62 identifier = boost::uuids::nil_uuid();
// Subscription records (SubscriptionInfo) indexed by an int64_t key.
// NOTE(review): presumably the key is the id of the message that created the subscription — confirm against callers.
85 std::unordered_map<int64_t, Energistics::Etp::v12::Datatypes::Object::SubscriptionInfo>
subscriptions;
// Each statement below registers one handler object under the numeric value of an
// Energistics::Etp::v12::Datatypes::Protocol enumerator; the static_cast converts the
// enumerator to the enum's underlying integer type expected by setProtocolHandlers.
// NOTE(review): the enclosing function headers are not visible in this excerpt.

// Core protocol handlers.
91 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Core), coreHandlers);
// Discovery protocol handlers.
98 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Discovery), discoveryHandlers);
// Store protocol handlers.
105 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Store), storeHandlers);
// StoreNotification protocol handlers.
112 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::StoreNotification), storeNotificationHandlers);
// DataArray protocol handlers.
119 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::DataArray), dataArrayHandlers);
// Transaction protocol handlers.
126 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Transaction), transactionHandlers);
// Dataspace protocol handlers.
133 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Dataspace), dataspaceHandlers);
// DataspaceOSDU protocol handlers.
140 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::DataspaceOSDU), dataspaceOsduHandlers);
// Send the message body `mb`.
// @param mb             message body to send; T is a template parameter.
// @param correlationId  defaults to 0.
// @param messageFlags   defaults to 0.
// @return an int64_t.
// NOTE(review): the body is not visible in this excerpt; presumably the return value is the id
// assigned to the message, since sendAndBlock stores it in a variable named msgId — confirm.
152 template<
typename T> int64_t
send(
const T & mb, int64_t correlationId = 0, int32_t messageFlags = 0)
// Send the message body `mb` through send() and then wait, bounded by _timeOut.
// @param mb             message body forwarded to send().
// @param correlationId  forwarded to send(); defaults to 0.
// @param messageFlags   forwarded to send(); defaults to 0.
// @throws std::runtime_error when the elapsed time exceeds _timeOut.
// NOTE(review): the waiting loop and its exit condition are not visible in this excerpt.
166 template<
typename T>
void sendAndBlock(
const T & mb, int64_t correlationId = 0, int32_t messageFlags = 0)
// Keep the value returned by send() so it can be reported in the timeout error.
168 int64_t msgId =
send(mb, correlationId, messageFlags);
// Reference instant for the timeout measurement.
170 auto t_start = std::chrono::high_resolution_clock::now();
// Elapsed time is measured in milliseconds (std::milli) and compared with _timeOut.
172 if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
173 throw std::runtime_error(
"Time out waiting for a response of message id " + std::to_string(msgId));
// Encode the message body `mb`, attach `specificHandler` to the encoded item and push it
// onto the sending queue.
// @param mb               message body; its protocolId and messageTypeId are logged.
// @param specificHandler  handler stored in the third element of the queued tuple.
// @param correlationId    forwarded to encode(); defaults to 0.
// @param messageFlags     forwarded to encode(); defaults to 0.
// @return the first element of the encoded tuple (the message id produced by encode()).
187 template<
typename T> int64_t
sendWithSpecificHandler(
const T & mb, std::shared_ptr<ETP_NS::ProtocolHandlers> specificHandler, int64_t correlationId = 0, int32_t messageFlags = 0)
// Encoding happens before the queue mutex is taken.
190 auto queueItem = encode(mb, correlationId, messageFlags);
// Hold sendingQueueMutex for the remainder of the enclosing scope.
192 const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
// encode() leaves the handler slot null; fill it with the caller's handler.
194 std::get<2>(queueItem) = specificHandler;
// The queue receives a copy; queueItem is still read below for logging and the return value.
196 sendingQueue.push(queueItem);
// Trace the header of the message that was just queued.
197 fesapi_log(
"*************************************************");
198 fesapi_log(
"Message Header put in the queue : ");
199 fesapi_log(
"protocol :", std::to_string(mb.protocolId));
200 fesapi_log(
"type :" , std::to_string(mb.messageTypeId));
201 fesapi_log(
"id :" , std::to_string(std::get<0>(queueItem)));
202 fesapi_log(
"correlation id :" , std::to_string(correlationId));
203 fesapi_log(
"flags :" , std::to_string(messageFlags));
204 fesapi_log(
"Whole message size :" , std::to_string(std::get<1>(queueItem).size()) ,
"bytes.");
205 fesapi_log(
"*************************************************");
// True when the item just pushed is the only one in the queue.
// NOTE(review): the body of this branch is not visible here; presumably it starts the write — confirm.
208 if (sendingQueue.size() == 1) {
212 return std::get<0>(queueItem);
// Read-completion callback (declaration only; defined elsewhere).
// @param ec                 error code of the read operation.
// @param bytes_transferred  byte count reported for the read operation.
228 FETPAPI_DLL_IMPORT_OR_EXPORT
void on_read(boost::system::error_code ec, std::size_t bytes_transferred);
// Write-completion callback. The byte-count argument is unnamed and unused.
// NOTE(review): the condition guarding the error report and the code that follows the lock
// are not visible in this excerpt.
230 void on_write(boost::system::error_code ec, std::size_t) {
// Report the write error message on standard error.
232 std::cerr <<
"on_write : " << ec.message() << std::endl;
// Take sendingQueueMutex for the rest of the enclosing scope.
236 const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
// Close-completion callback: marks the websocket session as closed.
// NOTE(review): the condition guarding the first error report is not visible in this excerpt.
242 void on_close(boost::system::error_code ec) {
// Report the close error message on standard error.
244 std::cerr <<
"on_close : " << ec.message() << std::endl;
// Warn when the websocket is closing although the ETP session flag is still open.
246 if (!etpSessionClosed) {
247 std::cerr <<
"Websocket session is going to be closed BUT ETP SESSION HAS NOT BEEN CLOSED YET!!!" << std::endl;
251 webSocketSessionClosed =
true;
// Body of a predicate on msgId (its header is not visible in this excerpt).
// Both mutexes are held while the two containers are inspected; they are acquired in the
// order sendingQueueMutex, then specificProtocolHandlersMutex.
263 const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
264 const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
// True when the front of the sending queue has an id <= msgId, or when a specific
// handler is still registered under msgId.
265 return (!sendingQueue.empty() && std::get<0>(sendingQueue.front()) <= msgId) || specificProtocolHandlers.count(msgId) > 0;
// Pure virtual setter taking the new maximum websocket message payload size as an int64_t;
// implemented by derived classes.
// NOTE(review): the unit is presumably bytes, as encode() compares this limit with a byte count — confirm.
269 virtual void setMaxWebSocketMessagePayloadSize(int64_t value) = 0;
270 int64_t getMaxWebSocketMessagePayloadSize()
const {
return maxWebSocketMessagePayloadSize; }
// Request the closing of the session.
// Sets isCloseRequested, then, with both mutexes held, marks the ETP session as closed
// when no specific handler is registered and the sending queue is empty.
// NOTE(review): the statements after each unlock pair are not visible in this excerpt.
// NOTE(review): the mutexes are locked and unlocked manually; an exception thrown between
// lock() and unlock() would leave them locked — consider RAII guards.
// NOTE(review): isCloseRequested is a plain bool; confirm it is never read concurrently from another thread.
281 FETPAPI_DLL_IMPORT_OR_EXPORT
void close() {
282 isCloseRequested =
true;
// Acquisition order: sendingQueueMutex, then specificProtocolHandlersMutex.
283 sendingQueueMutex.lock();
284 specificProtocolHandlersMutex.lock();
// Nothing queued and no specific handler registered: flag the ETP session as closed.
285 if (specificProtocolHandlers.empty() && sendingQueue.empty()) {
286 etpSessionClosed =
true;
287 sendingQueueMutex.unlock();
288 specificProtocolHandlersMutex.unlock();
// Second unlock pair (its enclosing branch is not visible in this excerpt).
292 sendingQueueMutex.unlock();
293 specificProtocolHandlersMutex.unlock();
// Timeout guard used while waiting for closing: throws std::runtime_error once the time
// elapsed since t_start, measured in milliseconds, exceeds _timeOut.
// NOTE(review): the enclosing function header and the waiting loop are not visible in this excerpt.
304 auto t_start = std::chrono::high_resolution_clock::now();
306 if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
307 throw std::runtime_error(
"Time out waiting for closing");
315 FETPAPI_DLL_IMPORT_OR_EXPORT
bool isEtpSessionClosed()
const {
return webSocketSessionClosed || etpSessionClosed; }
317 void setEtpSessionClosed(
bool etpSessionClosed_) { etpSessionClosed = etpSessionClosed_; }
// Dataspace-related operations (declarations only; defined elsewhere).
// NOTE(review): the meaning of the map keys and of the returned strings is not visible in
// this excerpt; confirm against the implementation before relying on it.

// Returns a vector of Dataspace. storeLastWriteFilter defaults to -1.
// NOTE(review): presumably -1 disables the filter — confirm.
333 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::Dataspace>
getDataspaces(int64_t storeLastWriteFilter = -1);
// Takes a string -> Dataspace map and returns a vector of strings.
343 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
putDataspaces(
const std::map<std::string, Energistics::Etp::v12::Datatypes::Object::Dataspace>& dataspaces);
// Takes a string -> string map of dataspace URIs and returns a vector of strings.
353 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
deleteDataspaces(
const std::map<std::string, std::string>& dataspaceUris);
// Takes a string -> string map of dataspace URIs and returns a vector of Dataspace.
367 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::Dataspace>
getDataspaceInfo(
const std::map<std::string, std::string>& dataspaceUris);
// Takes a map of source dataspace URIs plus one target dataspace URI; returns a vector of strings.
378 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
copyDataspacesContent(
const std::map<std::string, std::string>& sourceDataspaceUris,
const std::string& targetDataspaceUri);
// Takes a map of dataspace URIs and a boolean `lock`; returns a vector of strings.
// NOTE(review): presumably lock == false unlocks — confirm.
389 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
lockDataspaces(
const std::map<std::string, std::string>& dataspaceUris,
bool lock);
// Takes a map of source URIs plus one target dataspace URI; returns a vector of strings.
400 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
copyToDataspace(
const std::map<std::string, std::string>& sourceUris,
const std::string& targetDataspaceUri);
// Resource discovery operations (declarations only; defined elsewhere).

// Returns a vector of Resource.
// NOTE(review): the parameter list starts before `scope`; the leading parameter(s) are not
// visible in this excerpt.
// storeLastWriteFilter defaults to -1 and countObjects defaults to false.
421 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::Resource>
getResources(
423 const Energistics::Etp::v12::Datatypes::Object::ContextScopeKind& scope,
424 int64_t storeLastWriteFilter = -1,
425 bool countObjects =
false);
// Returns a vector of DeletedResource for the given dataspace URI.
// deleteTimeFilter defaults to -1 and dataObjectTypes defaults to an empty vector.
// NOTE(review): presumably -1 and the empty vector both mean "no filtering" — confirm.
439 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::DeletedResource>
getDeletedResources(
440 const std::string& dataspaceUri,
441 int64_t deleteTimeFilter = -1,
442 const std::vector<std::string>& dataObjectTypes = {});
// Data object store operations (declarations only; defined elsewhere).
// NOTE(review): the meaning of the map keys and of the returned strings is not visible in
// this excerpt; confirm against the implementation.

// Takes a string -> string map of URIs and returns a string -> DataObject map.
456 FETPAPI_DLL_IMPORT_OR_EXPORT std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject>
getDataObjects(
const std::map<std::string, std::string>& uris);
// Takes a string -> DataObject map and returns a vector of strings.
466 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
putDataObjects(
const std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject>& dataObjects);
// Takes a string -> string map of URIs and returns a vector of strings.
476 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
deleteDataObjects(
const std::map<std::string, std::string>& uris);
// Transaction operations (declarations only; defined elsewhere).

// Starts a transaction and returns a string.
// dataspaceUris is taken by value and defaults to an empty vector; readOnly defaults to false.
// NOTE(review): the content of the returned string is not visible in this excerpt — confirm.
490 FETPAPI_DLL_IMPORT_OR_EXPORT std::string
startTransaction(std::vector<std::string> dataspaceUris = {},
bool readOnly =
false);
// Commits a transaction and returns a string.
// NOTE(review): the content of the returned string is not visible in this excerpt — confirm.
506 FETPAPI_DLL_IMPORT_OR_EXPORT std::string commitTransaction();
/**
 * Zero-argument overload of fesapi_log: it is the call reached once the variadic overload
 * has consumed all of its arguments. Ends the current line on standard output and flushes it.
 */
void fesapi_log()
{
	std::cout << std::endl;
}
// Variadic logging helper: writes `first` followed by a single space to standard output,
// then recurses on the remaining arguments; the zero-argument overload ends the line.
// Arguments are perfectly forwarded.
// NOTE(review): the braces and any condition guarding the output are not visible in this excerpt.
522 template<
typename First,
typename ...Rest>
523 void fesapi_log(First && first, Rest && ...rest)
526 std::cout << std::forward<First>(first) <<
" ";
527 fesapi_log(std::forward<Rest>(rest)...);
// Buffer emptied by flushReceivingBuffer().
// NOTE(review): presumably filled by the websocket read operation — confirm.
532 boost::beast::flat_buffer receivedBuffer;
// Handlers registered per protocol, keyed by the underlying integer value of the Protocol enum.
534 std::unordered_map<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type, std::shared_ptr<ETP_NS::ProtocolHandlers>> protocolHandlers;
// Handlers keyed by an int64_t message id.
536 std::unordered_map<int64_t, std::shared_ptr<ETP_NS::ProtocolHandlers>> specificProtocolHandlers;
// Mutex locked around accesses to specificProtocolHandlers.
537 std::mutex specificProtocolHandlersMutex;
// Upper bound compared with the encoded byte count in encode(); defaults to 16000000.
543 int64_t maxWebSocketMessagePayloadSize{ 16000000 };
// Starts as true; set to true again in on_close().
545 std::atomic<bool> webSocketSessionClosed{
true };
// Starts as true; written by close() and setEtpSessionClosed().
547 std::atomic<bool> etpSessionClosed{
true };
// Timeout compared against elapsed times expressed in milliseconds; defaults to 30000.
549 std::atomic<double> _timeOut{ 30000 };
// Verbosity flag; defaults to false.
551 std::atomic<bool> _verbose{
false };
// Queue of messages to send: (message id, encoded bytes, optional specific handler).
553 std::queue< std::tuple<int64_t, std::vector<uint8_t>, std::shared_ptr<ETP_NS::ProtocolHandlers>> > sendingQueue;
// Mutex locked around accesses to sendingQueue.
554 std::mutex sendingQueueMutex;
// Counter used by encode() to allocate message ids; advanced by 2 for each encoded message.
// NOTE(review): no in-class initializer here — confirm that every derived class sets it before the first encode().
556 std::atomic<int64_t> messageId;
// Session identifier.
// NOTE(review): no in-class initializer here; elsewhere in this file it is assigned boost::uuids::nil_uuid().
558 boost::uuids::uuid identifier;
// Set to true by close(); defaults to false.
560 bool isCloseRequested{
false };
// Defaulted default constructor: members take the in-class initializers declared in this class.
562 AbstractSession() =
default;
// Pure virtual accessor returning a reference to a boost::asio::io_context; implemented by derived classes.
564 virtual boost::asio::io_context& getIoContext() = 0;
// Discard everything currently held in receivedBuffer by consuming its full size.
566 void flushReceivingBuffer() {
567 receivedBuffer.consume(receivedBuffer.size());
// Pure virtual write operation; implemented by derived classes.
// NOTE(review): presumably sends the message at the front of sendingQueue — confirm in the implementations.
573 virtual void do_write() = 0;
// Build a message header for `mb`, Avro-encode header and body, and return the result as a
// tuple (message id, encoded bytes, null handler).
// @param mb             message body; provides protocolId and messageTypeId for the header.
// @param correlationId  copied into the header; defaults to 0.
// @param messageFlags   copied into the header; defaults to 0.
// @return - (id, bytes, nullptr) when the encoded size is below maxWebSocketMessagePayloadSize;
//         - otherwise, when correlationId != 0, the encoding of a protocol exception (code 17)
//           sent with the same correlationId and flags 0x02;
//         - otherwise (-1, empty vector, nullptr).
// NOTE(review): the declaration of `mh` and the binding of the encoder to `out` are not
// visible in this excerpt.
581 template<
typename T> std::tuple<int64_t, std::vector<uint8_t>, std::shared_ptr<ETP_NS::ProtocolHandlers>> encode(
const T & mb, int64_t correlationId = 0, int32_t messageFlags = 0)
585 mh.protocol = mb.protocolId;
586 mh.messageType = mb.messageTypeId;
587 mh.correlationId = correlationId;
// fetch_add returns the previous counter value, so ids advance by 2 per encoded message,
// including messages later rejected as too big.
588 mh.messageId = messageId.fetch_add(2);
589 mh.messageFlags = messageFlags;
591 avro::OutputStreamPtr out = avro::memoryOutputStream();
592 avro::EncoderPtr e = avro::binaryEncoder();
// Header first, then body.
594 avro::encode(*e, mh);
595 avro::encode(*e, mb);
597 const int64_t byteCount = e->byteCount();
// Strictly below the limit: return the encoded bytes.
599 if (byteCount < maxWebSocketMessagePayloadSize) {
600 return std::make_tuple(mh.messageId, *avro::snapshot(*out).get(),
nullptr);
// Too big and sent as a response: replace it with a protocol exception, which is encoded
// through a recursive call and therefore gets its own message id.
604 if (correlationId != 0) {
605 return encode(EtpHelpers::buildSingleMessageProtocolException(17,
"I try to send you a too big message response of protocol "
606 + std::to_string(mb.protocolId) +
" and type id " + std::to_string(mb.messageTypeId) +
" and size " + std::to_string(byteCount)
607 +
" bytes according to our negotiated size capability which is " + std::to_string(maxWebSocketMessagePayloadSize) +
" bytes."), correlationId, 0x02);
// Too big and not a response: signal the failure with id -1 and an empty payload.
610 return std::make_tuple(-1, std::vector<uint8_t>{},
nullptr);
// Look up the handlers registered under Protocol::Core and downcast them to CoreHandlers.
// std::dynamic_pointer_cast yields a null pointer when the stored object is not a CoreHandlers.
// NOTE(review): the value returned when no entry is registered is not visible in this excerpt.
615 std::shared_ptr<ETP_NS::CoreHandlers> getCoreProtocolHandlers() {
616 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Core));
617 return it == protocolHandlers.end()
619 : std::dynamic_pointer_cast<CoreHandlers>(it->second);
// Look up the handlers registered under Protocol::Discovery and downcast them to DiscoveryHandlers.
// std::dynamic_pointer_cast yields a null pointer when the stored object is not a DiscoveryHandlers.
// NOTE(review): the value returned when no entry is registered is not visible in this excerpt.
622 std::shared_ptr<ETP_NS::DiscoveryHandlers> getDiscoveryProtocolHandlers() {
623 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Discovery));
624 return it == protocolHandlers.end()
626 : std::dynamic_pointer_cast<DiscoveryHandlers>(it->second);
// Look up the handlers registered under Protocol::Store and downcast them to StoreHandlers.
// std::dynamic_pointer_cast yields a null pointer when the stored object is not a StoreHandlers.
// NOTE(review): the value returned when no entry is registered is not visible in this excerpt.
629 std::shared_ptr<ETP_NS::StoreHandlers> getStoreProtocolHandlers() {
630 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Store));
631 return it == protocolHandlers.end()
633 : std::dynamic_pointer_cast<StoreHandlers>(it->second);
// Look up the handlers registered under Protocol::StoreNotification and downcast them to
// StoreNotificationHandlers.
// std::dynamic_pointer_cast yields a null pointer when the stored object is not a StoreNotificationHandlers.
// NOTE(review): the value returned when no entry is registered is not visible in this excerpt.
636 std::shared_ptr<ETP_NS::StoreNotificationHandlers> getStoreNotificationProtocolHandlers() {
637 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::StoreNotification));
638 return it == protocolHandlers.end()
640 : std::dynamic_pointer_cast<StoreNotificationHandlers>(it->second);
// Look up the handlers registered under Protocol::DataArray and downcast them to DataArrayHandlers.
// std::dynamic_pointer_cast yields a null pointer when the stored object is not a DataArrayHandlers.
// NOTE(review): the value returned when no entry is registered is not visible in this excerpt.
643 std::shared_ptr<ETP_NS::DataArrayHandlers> getDataArrayProtocolHandlers() {
644 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::DataArray));
645 return it == protocolHandlers.end()
647 : std::dynamic_pointer_cast<DataArrayHandlers>(it->second);
// Look up the handlers registered under Protocol::Transaction and downcast them to TransactionHandlers.
// std::dynamic_pointer_cast yields a null pointer when the stored object is not a TransactionHandlers.
// NOTE(review): the value returned when no entry is registered is not visible in this excerpt.
650 std::shared_ptr<ETP_NS::TransactionHandlers> getTransactionProtocolHandlers() {
651 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Transaction));
652 return it == protocolHandlers.end()
654 : std::dynamic_pointer_cast<TransactionHandlers>(it->second);
// Look up the handlers registered under Protocol::Dataspace and downcast them to DataspaceHandlers.
// std::dynamic_pointer_cast yields a null pointer when the stored object is not a DataspaceHandlers.
// NOTE(review): the value returned when no entry is registered is not visible in this excerpt.
657 std::shared_ptr<ETP_NS::DataspaceHandlers> getDataspaceProtocolHandlers() {
658 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Dataspace));
659 return it == protocolHandlers.end()
661 : std::dynamic_pointer_cast<DataspaceHandlers>(it->second);
// Look up the handlers registered under Protocol::DataspaceOSDU and downcast them to DataspaceOSDUHandlers.
// std::dynamic_pointer_cast yields a null pointer when the stored object is not a DataspaceOSDUHandlers.
// NOTE(review): the value returned when no entry is registered is not visible in this excerpt.
664 std::shared_ptr<ETP_NS::DataspaceOSDUHandlers> getDataspaceOSDUProtocolHandlers() {
665 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::DataspaceOSDU));
666 return it == protocolHandlers.end()
668 : std::dynamic_pointer_cast<DataspaceOSDUHandlers>(it->second);
// Register `coreHandlers` as the handlers of the protocol identified by `protocolId`,
// replacing any handlers previously stored under that id.
// @param protocolId    underlying integer value of a Protocol enumerator.
// @param coreHandlers  handlers to store; despite its name the parameter accepts any ProtocolHandlers.
// @throws std::logic_error with the message below.
// NOTE(review): the condition guarding the throw is not visible in this excerpt; judging by
// the message it presumably checks that the session is not already running — confirm.
671 void setProtocolHandlers(std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type protocolId, std::shared_ptr<ProtocolHandlers> coreHandlers) {
674 throw std::logic_error(
"You cannot set some protocol handlers once the session is running.");
677 protocolHandlers[protocolId] = coreHandlers;