51 class AbstractSession :
public std::enable_shared_from_this<AbstractSession>
55 virtual ~AbstractSession() =
default;
62 std::lock_guard<std::mutex> lock(identifierMutex);
84 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Core), coreHandlers);
91 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Discovery), discoveryHandlers);
98 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Store), storeHandlers);
105 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::CoreOSDU), coreOSDUHandlers);
112 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::StoreOSDU), storeOSDUHandlers);
119 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::StoreNotification), storeNotificationHandlers);
126 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::DataArray), dataArrayHandlers);
133 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Transaction), transactionHandlers);
140 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Dataspace), dataspaceHandlers);
147 setProtocolHandlers(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::DataspaceOSDU), dataspaceOSDUHandlers);
161 int64_t
send(std::shared_ptr<EtpMessage> message,
162 int64_t correlationId = 0, int32_t messageFlags = 0,
const std::vector<int64_t>& msgIdAliases = {})
164 return sendWithSpecificHandler(message, protocolHandlers.at(message->messageHeader.protocol), correlationId, messageFlags, msgIdAliases);
179 int64_t correlationId = 0, int32_t messageFlags = 0,
const std::vector<int64_t>& msgIdAliases = {})
181 int64_t msgId = send(message, correlationId, messageFlags, msgIdAliases);
188 auto t_start = std::chrono::high_resolution_clock::now();
189 while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
190 if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
191 throw std::runtime_error(
"Time out waiting for a response of message id " + std::to_string(msgId));
209 int64_t
sendWithSpecificHandler(std::shared_ptr<EtpMessage> message, std::shared_ptr<ETP_NS::ProtocolHandlers> specificHandler,
210 int64_t correlationId = 0, int32_t messageFlags = 0,
const std::vector<int64_t>& msgIdAliases = {})
212 message->messageHeader.correlationId = correlationId;
213 message->messageHeader.messageFlags = messageFlags;
214 message->messageHeader.messageId = messageId.fetch_add(2);
216 const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
219 sendingQueue.push(std::make_tuple(message, specificHandler, msgIdAliases));
220 fesapi_log(message->to_string());
222 if (message->messageHeader.protocol ==
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::CoreOSDU)
223 && message->messageHeader.messageType == Energistics::Etp::v12::Protocol::CoreOSDU::ResumeSession::messageTypeId) {
225 std::queue< std::tuple<std::shared_ptr<EtpMessage>, std::shared_ptr<ETP_NS::ProtocolHandlers>, std::vector<int64_t>> > queueWithPreviousInformation;
227 queueWithPreviousInformation.push(std::make_tuple(message, specificHandler, msgIdAliases));
229 int64_t previouslyDuplicatedId = -1;
230 for (
size_t i = 0; i < specificProtocolHandlers.size(); ++i) {
231 int64_t lowestMsgId = (std::numeric_limits<int64_t>::max)();
232 for (
auto keyVal : specificProtocolHandlers) {
233 if (keyVal.first > previouslyDuplicatedId && keyVal.first < lowestMsgId) {
234 lowestMsgId = keyVal.first;
237 previouslyDuplicatedId = lowestMsgId;
238 auto val = specificProtocolHandlers[lowestMsgId];
239 std::get<0>(val)->messageHeader.messageId = messageId.fetch_add(2);
240 std::get<2>(val).push_back(lowestMsgId);
241 queueWithPreviousInformation.push(val);
242 fesapi_log(std::get<0>(val)->to_string());
245 while (sendingQueue.size() != 1) {
246 std::get<0>(sendingQueue.front())->messageHeader.messageId = messageId.fetch_add(2);
247 queueWithPreviousInformation.push(sendingQueue.front());
248 fesapi_log(std::get<0>(sendingQueue.front())->to_string());
251 if (queueWithPreviousInformation.size() > 1) {
252 std::swap(sendingQueue, queueWithPreviousInformation);
256 else if (sendingQueue.size() == 1) {
261 return message->messageHeader.messageId;
276 int64_t correlationId = 0, int32_t messageFlags = 0,
const std::vector<int64_t>& msgIdAliases = {})
278 const int64_t msgId = sendWithSpecificHandler(message, specificHandler, correlationId, messageFlags, msgIdAliases);
285 const auto t_start = std::chrono::high_resolution_clock::now();
286 while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
287 if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
288 throw std::runtime_error(
"Time out waiting for a response of message id " + std::to_string(msgId));
298 throw std::runtime_error(
"The ETP session could not be opened in order to send again the message.");
321 FETPAPI_DLL_IMPORT_OR_EXPORT
void on_read(boost::system::error_code ec, std::size_t bytes_transferred);
323 void on_close(boost::system::error_code ec) {
325 std::cerr <<
"on_close : " << ec.message() << std::endl;
327 if (!etpSessionClosed) {
328 std::cerr <<
"Websocket session is going to be closed BUT ETP SESSION HAS NOT BEEN CLOSED YET!!!" << std::endl;
332 webSocketSessionClosed =
true;
349 std::scoped_lock lock(sendingQueueMutex, specificProtocolHandlersMutex);
350 return (!sendingQueue.empty() && std::get<0>(sendingQueue.front())->messageHeader.messageId <= msgId) || specificProtocolHandlers.count(msgId) > 0;
353 virtual void setMaxWebSocketMessagePayloadSize(uint64_t value) = 0;
354 uint64_t getMaxWebSocketMessagePayloadSize()
const {
return maxWebSocketMessagePayloadSize; }
365 FETPAPI_DLL_IMPORT_OR_EXPORT
void close() {
366 isCloseRequested_ =
true;
367 sendingQueueMutex.lock();
368 specificProtocolHandlersMutex.lock();
369 if (specificProtocolHandlers.empty() && sendingQueue.empty()) {
370 etpSessionClosed =
true;
371 sendingQueueMutex.unlock();
372 specificProtocolHandlersMutex.unlock();
373 send(std::make_shared<Energistics::Etp::v12::Protocol::Core::CloseSession>(), 0, 0x02);
376 sendingQueueMutex.unlock();
377 specificProtocolHandlersMutex.unlock();
388 auto t_start = std::chrono::high_resolution_clock::now();
389 while (!webSocketSessionClosed) {
390 if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
391 throw std::runtime_error(
"Time out waiting for closing");
399 FETPAPI_DLL_IMPORT_OR_EXPORT
bool isEtpSessionClosed()
const {
return webSocketSessionClosed || etpSessionClosed; }
415 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::Dataspace>
getDataspaces(int64_t storeLastWriteFilter = -1);
425 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
putDataspaces(
const std::map<std::string, Energistics::Etp::v12::Datatypes::Object::Dataspace>& dataspaces);
435 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
deleteDataspaces(
const std::map<std::string, std::string>& dataspaceUris);
449 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::Dataspace>
getDataspaceInfo(
const std::map<std::string, std::string>& dataspaceUris);
462 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
lockDataspaces(
const std::map<std::string, std::string>& dataspaceUris,
bool lock);
473 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
copyDataspacesContent(
const std::map<std::string, std::string>& sourceDataspaceUris,
const std::string& targetDataspaceUri);
484 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
copyToDataspace(
const std::map<std::string, std::string>& sourceDataobjectUris,
const std::string& targetDataspaceUri);
505 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::Resource>
getResources(
507 const Energistics::Etp::v12::Datatypes::Object::ContextScopeKind& scope,
508 int64_t storeLastWriteFilter = -1,
509 bool countObjects =
false);
523 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<Energistics::Etp::v12::Datatypes::Object::DeletedResource>
getDeletedResources(
524 const std::string& dataspaceUri,
525 int64_t deleteTimeFilter = -1,
526 const std::vector<std::string>& dataObjectTypes = {});
540 FETPAPI_DLL_IMPORT_OR_EXPORT std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject>
getDataObjects(
const std::map<std::string, std::string>& uris);
550 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
putDataObjects(
const std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject>& dataObjects);
560 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
deleteDataObjects(
const std::map<std::string, std::string>& uris);
580 FETPAPI_DLL_IMPORT_OR_EXPORT std::vector<std::string>
copyDataObjectsByValue(
const std::string& sourceDataobjectUri, int32_t sourcesDepth = 0,
const std::vector<std::string>& dataObjectTypes = {});
595 FETPAPI_DLL_IMPORT_OR_EXPORT std::string
startTransaction(std::vector<std::string> dataspaceUris = {},
bool readOnly =
false);
613 FETPAPI_DLL_IMPORT_OR_EXPORT std::string commitTransaction();
627 void fesapi_log() { std::cout << std::endl; }
629 template<
typename First,
typename ...Rest>
630 void fesapi_log(First && first, Rest && ...rest)
633 std::cout << std::forward<First>(first) <<
" ";
634 fesapi_log(std::forward<Rest>(rest)...);
639 beast::flat_buffer receivedBuffer;
641 std::unordered_map<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type, std::shared_ptr<ETP_NS::ProtocolHandlers>> protocolHandlers;
643 std::unordered_map<int64_t, std::tuple<std::shared_ptr<EtpMessage>, std::shared_ptr<ETP_NS::ProtocolHandlers>, std::vector<int64_t>>> specificProtocolHandlers;
644 std::mutex specificProtocolHandlersMutex;
650 uint64_t maxWebSocketMessagePayloadSize{ 16000000 };
652 std::atomic<bool> webSocketSessionClosed{
true };
654 std::atomic<bool> etpSessionClosed{
true };
656 std::atomic<double> _timeOut{ 30000 };
658 std::atomic<bool> _verbose{
false };
660 std::queue< std::tuple<std::shared_ptr<EtpMessage>, std::shared_ptr<ETP_NS::ProtocolHandlers>, std::vector<int64_t> > > sendingQueue;
661 std::mutex sendingQueueMutex;
663 std::atomic<int64_t> messageId;
665 boost::uuids::uuid identifier{ boost::uuids::nil_uuid() };
666 std::mutex identifierMutex;
668 bool isCloseRequested_{
false };
669 size_t reconnectionTryCount_ = 0;
671 AbstractSession() =
default;
673 virtual boost::asio::io_context& getIoContext() = 0;
675 void flushReceivingBuffer() {
676 receivedBuffer.consume(receivedBuffer.size());
679 void setEtpSessionClosed(
bool etpSessionClosed_) {
680 etpSessionClosed = etpSessionClosed_;
681 reconnectionTryCount_ = 0;
687 virtual void do_write() = 0;
693 Energistics::Etp::v12::Datatypes::MessageHeader decodeMessageHeader(avro::DecoderPtr decoder);
695 std::shared_ptr<ETP_NS::CoreHandlers> getCoreProtocolHandlers() {
696 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Core));
697 return it == protocolHandlers.end()
699 : std::dynamic_pointer_cast<CoreHandlers>(it->second);
702 std::shared_ptr<ETP_NS::DiscoveryHandlers> getDiscoveryProtocolHandlers() {
703 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Discovery));
704 return it == protocolHandlers.end()
706 : std::dynamic_pointer_cast<DiscoveryHandlers>(it->second);
709 std::shared_ptr<ETP_NS::StoreHandlers> getStoreProtocolHandlers() {
710 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Store));
711 return it == protocolHandlers.end()
713 : std::dynamic_pointer_cast<StoreHandlers>(it->second);
716 std::shared_ptr<ETP_NS::StoreNotificationHandlers> getStoreNotificationProtocolHandlers() {
717 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::StoreNotification));
718 return it == protocolHandlers.end()
720 : std::dynamic_pointer_cast<StoreNotificationHandlers>(it->second);
723 std::shared_ptr<ETP_NS::DataArrayHandlers> getDataArrayProtocolHandlers() {
724 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::DataArray));
725 return it == protocolHandlers.end()
727 : std::dynamic_pointer_cast<DataArrayHandlers>(it->second);
730 std::shared_ptr<ETP_NS::TransactionHandlers> getTransactionProtocolHandlers() {
731 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Transaction));
732 return it == protocolHandlers.end()
734 : std::dynamic_pointer_cast<TransactionHandlers>(it->second);
737 std::shared_ptr<ETP_NS::DataspaceHandlers> getDataspaceProtocolHandlers() {
738 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::Dataspace));
739 return it == protocolHandlers.end()
741 : std::dynamic_pointer_cast<DataspaceHandlers>(it->second);
744 std::shared_ptr<ETP_NS::CoreOSDUHandlers> getCoreOSDUProtocolHandlers() {
745 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::CoreOSDU));
746 return it == protocolHandlers.end()
748 : std::dynamic_pointer_cast<CoreOSDUHandlers>(it->second);
751 std::shared_ptr<ETP_NS::StoreOSDUHandlers> getStoreOSDUProtocolHandlers() {
752 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::StoreOSDU));
753 return it == protocolHandlers.end()
755 : std::dynamic_pointer_cast<StoreOSDUHandlers>(it->second);
758 std::shared_ptr<ETP_NS::DataspaceOSDUHandlers> getDataspaceOSDUProtocolHandlers() {
759 auto it = protocolHandlers.find(
static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type
>(Energistics::Etp::v12::Datatypes::Protocol::DataspaceOSDU));
760 return it == protocolHandlers.end()
762 : std::dynamic_pointer_cast<DataspaceOSDUHandlers>(it->second);
765 void setProtocolHandlers(std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type protocolId, std::shared_ptr<ProtocolHandlers> coreHandlers) {
768 throw std::logic_error(
"You cannot set some protocol handlers once the session is running.");
771 protocolHandlers[protocolId] = coreHandlers;
774 friend void CoreHandlers::decodeMessageBody(
const Energistics::Etp::v12::Datatypes::MessageHeader& mh, avro::DecoderPtr d);
775 friend void CoreHandlers::on_ProtocolException(
const Energistics::Etp::v12::Protocol::Core::ProtocolException& msg, int64_t correlationId);
776 friend void CoreOSDUHandlers::decodeMessageBody(
const Energistics::Etp::v12::Datatypes::MessageHeader& mh, avro::DecoderPtr d);