From d1385e48c40ba0ebfd6532110a3cd9551aaf9d1f Mon Sep 17 00:00:00 2001 From: Dave Wickelhaus Date: Sun, 23 Nov 2025 21:41:55 -0500 Subject: [PATCH 01/11] On branch main_websocket_fixes_3 Changes to be committed: modified: src/mtconnect/sink/rest_sink/rest_service.cpp - RestService::createAssetRoutings - added asset, assets commands to the handler routings - added assetsById command to to the idHandler routings - RestService::createSampleRoutings - added code to set the request->parameter("count") parameter to 100 if it's not instantiated. --- src/mtconnect/sink/rest_sink/rest_service.cpp | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/mtconnect/sink/rest_sink/rest_service.cpp b/src/mtconnect/sink/rest_sink/rest_service.cpp index 1f01a8445..d550ed06e 100644 --- a/src/mtconnect/sink/rest_sink/rest_service.cpp +++ b/src/mtconnect/sink/rest_sink/rest_service.cpp @@ -569,12 +569,14 @@ namespace mtconnect { "count={integer:100}&device={string}&pretty={bool:false}&format={string}"); m_server->addRouting({boost::beast::http::verb::get, "/assets?" + qp, handler}) .document("MTConnect assets request", "Returns up to `count` assets"); + m_server->addRouting({boost::beast::http::verb::get, "/{device}/assets?" + qp, handler}) + .document("MTConnect assets request", "Returns up to `count` assets for deivce `device`") + .command("assets"); m_server->addRouting({boost::beast::http::verb::get, "/asset?" + qp, handler}) .document("MTConnect asset request", "Returns up to `count` assets"); - m_server->addRouting({boost::beast::http::verb::get, "/{device}/assets?" + qp, handler}) - .document("MTConnect assets request", "Returns up to `count` assets for deivce `device`"); m_server->addRouting({boost::beast::http::verb::get, "/{device}/asset?" + qp, handler}) - .document("MTConnect asset request", "Returns up to `count` assets for deivce `device`"); + .document("MTConnect asset request", "Returns up to `count` assets for deivce `device`") + .command("asset"); m_server->addRouting({boost::beast::http::verb::get, "/assets/{assetIds}", idHandler}) .document( "MTConnect assets request", @@ -582,7 +584,8 @@ namespace mtconnect { m_server->addRouting({boost::beast::http::verb::get, "/asset/{assetIds}", idHandler}) .document("MTConnect asset request", "Returns a set of assets identified by asset ids `asset` separated by " - "semi-colon (;)"); + "semi-colon (;)") + .command("assetsById"); if (m_server->arePutsAllowed()) { @@ -727,9 +730,13 @@ namespace mtconnect { void RestService::createSampleRoutings() { using namespace rest_sink; + + auto handler = [&](SessionPtr session, RequestPtr request) -> bool { request->m_request = "MTConnectStreams"; + if(!request->parameter("count")) request->m_parameters["count"] =100; + auto interval = request->parameter("interval"); if (interval) { From 888a48acb856a83f4be025f472c867abfe69230f Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Tue, 16 Dec 2025 18:34:57 +0100 Subject: [PATCH 02/11] Updated validation information --- src/mtconnect/validation/observation_validations.hpp | 6 ++++-- test_package/task_test.cpp | 10 +++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/mtconnect/validation/observation_validations.hpp b/src/mtconnect/validation/observation_validations.hpp index f91d98024..52308e458 100644 --- a/src/mtconnect/validation/observation_validations.hpp +++ b/src/mtconnect/validation/observation_validations.hpp @@ -4,7 +4,6 @@ Validation ControlledVocabularies { {{"ACTIVE", {SCHEMA_VERSION(1, 2), 0}}, {"INACTIVE", {SCHEMA_VERSION(1, 2), 0}}}}, {"Alarm", {}}, {"AssetChanged", {}}, - {"AssetAdded", {}}, {"AssetRemoved", {}}, {"Availability", {{"AVAILABLE", {SCHEMA_VERSION(1, 1), 0}}, {"UNAVAILABLE", {SCHEMA_VERSION(1, 1), 0}}}}, @@ -249,4 +248,7 @@ Validation ControlledVocabularies { {"ActivePowerSource", {}}, {"LocationNarrative", {}}, {"Thickness", {}}, - {"LocationSpatialGeographic", {}}}; + {"LocationSpatialGeographic", {}}, + {"PartIndex", {}}, + {"AssociatedAssetId", {}}, + {"AssetAdded", {}}}; \ No newline at end of file diff --git a/test_package/task_test.cpp b/test_package/task_test.cpp index cf72162db..b07042d62 100644 --- a/test_package/task_test.cpp +++ b/test_package/task_test.cpp @@ -1190,13 +1190,13 @@ TEST_F(TaskAssetTest, task_must_have_a_task_state) )DOC"; - + ErrorList errors; entity::XmlParser parser; auto entity = parser.parse(Asset::getRoot(), doc, errors); ASSERT_EQ(1, errors.size()); - + EXPECT_EQ("Task(TaskState): Property TaskState is required and not provided"s, errors.front()->what()); } @@ -1220,13 +1220,13 @@ TEST_F(TaskAssetTest, task_must_have_a_task_type) )DOC"; - + ErrorList errors; entity::XmlParser parser; - + auto entity = parser.parse(Asset::getRoot(), doc, errors); ASSERT_EQ(1, errors.size()); - + EXPECT_EQ("Task(TaskType): Property TaskType is required and not provided"s, errors.front()->what()); } From 7ffde547a6a421634c90f535db1d3248064455e0 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Fri, 2 Jan 2026 16:58:57 +0100 Subject: [PATCH 03/11] Refactored websocket session for testing and added first websocket integration test --- agent_lib/CMakeLists.txt | 1 + src/mtconnect/sink/rest_sink/rest_service.cpp | 4 +- src/mtconnect/sink/rest_sink/routing.hpp | 4 + .../rest_sink/websocket_request_manager.hpp | 301 ++++++++++++ .../sink/rest_sink/websocket_session.hpp | 442 ++++++------------ test_package/CMakeLists.txt | 1 + test_package/agent_test_helper.cpp | 24 + test_package/agent_test_helper.hpp | 173 ++++++- test_package/websockets_rest_sink_test.cpp | 203 ++++++++ 9 files changed, 856 insertions(+), 297 deletions(-) create mode 100644 src/mtconnect/sink/rest_sink/websocket_request_manager.hpp create mode 100644 test_package/websockets_rest_sink_test.cpp diff --git a/agent_lib/CMakeLists.txt b/agent_lib/CMakeLists.txt index 4af4e1b36..637701afb 100644 --- a/agent_lib/CMakeLists.txt +++ b/agent_lib/CMakeLists.txt @@ -288,6 +288,7 @@ set(AGENT_SOURCES "${SOURCE_DIR}/sink/rest_sink/session_impl.hpp" "${SOURCE_DIR}/sink/rest_sink/tls_dector.hpp" "${SOURCE_DIR}/sink/rest_sink/websocket_session.hpp" + "${SOURCE_DIR}/sink/rest_sink/websocket_request_manager.hpp" # src/sink/rest_sink SOURCE_FILES_ONLY diff --git a/src/mtconnect/sink/rest_sink/rest_service.cpp b/src/mtconnect/sink/rest_sink/rest_service.cpp index d550ed06e..62491118e 100644 --- a/src/mtconnect/sink/rest_sink/rest_service.cpp +++ b/src/mtconnect/sink/rest_sink/rest_service.cpp @@ -731,11 +731,11 @@ namespace mtconnect { { using namespace rest_sink; - auto handler = [&](SessionPtr session, RequestPtr request) -> bool { request->m_request = "MTConnectStreams"; - if(!request->parameter("count")) request->m_parameters["count"] =100; + if (!request->parameter("count")) + request->m_parameters["count"] = 100; auto interval = request->parameter("interval"); if (interval) diff --git a/src/mtconnect/sink/rest_sink/routing.hpp b/src/mtconnect/sink/rest_sink/routing.hpp index 16813c01c..f3ee2f37a 100644 --- a/src/mtconnect/sink/rest_sink/routing.hpp +++ b/src/mtconnect/sink/rest_sink/routing.hpp @@ -283,6 +283,10 @@ namespace mtconnect::sink::rest_sink { errors.emplace_back(error); } } + else if (!std::holds_alternative(p.m_default)) + { + request->m_parameters.emplace(make_pair(p.m_name, p.m_default)); + } } if (!errors.empty()) diff --git a/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp b/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp new file mode 100644 index 000000000..16896ffda --- /dev/null +++ b/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp @@ -0,0 +1,301 @@ + +// +// Copyright Copyright 2009-2022, AMT – The Association For Manufacturing Technology (“AMT”) +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include "session.hpp" + +namespace mtconnect::sink::rest_sink { + class WebsocketRequestManager + { + public: + /// @brief Wrapper around a request with additional infomation required for a WebSocket request + struct WebsocketRequest + { + WebsocketRequest(const std::string &id) : m_requestId(id) {} + std::string m_requestId; //! The id of the request + std::optional m_streamBuffer; //! The streambuffer used in responses + Complete m_complete; //! A complete function when the request has finished + bool m_streaming {false}; //! A flag to indicate the request is a streaming request + RequestPtr m_request; //! A pointer to the underlying incoming request + }; + + /// @brief Create a request dispatcher + /// @param httpRequest a copy of the incoming HTTP request + /// @param dispatch the dispatch function to call + WebsocketRequestManager(RequestPtr &&httpRequest, Dispatch dispatch) + : m_httpRequest(std::move(httpRequest)), m_dispatch(dispatch) + {} + + /// @brief Destroy the request manager + ~WebsocketRequestManager() {} + + /// @brief clear the request and the set of requests + void reset() + { + m_httpRequest.reset(); + m_requests.clear(); + } + + /// @brief Set the current request (used for testing). + /// @param request the request that is owned by the manager + void setHttpRequest(RequestPtr &&request) + { + m_httpRequest = std::move(request); + } + + /// @brief Get the current HTTP request + /// @returns a pointer to the HTTP request + RequestPtr getHttpRequest() const + { + return m_httpRequest; + } + + /// @brief Finds the request for a given id + /// @param id the id to search for + /// @returns a pointer to the request structure or null if it is not found + WebsocketRequest *findRequest(const std::string &id) + { + auto it = m_requests.find(id); + if (it != m_requests.end()) + { + return &(it->second); + } + else + { + return nullptr; + } + } + + /// @brief finds or creates a WebSocketRequest structure and return it. + /// @param id the id of the request to create + /// @returns a pointer to the new websocket request or the existing one. + WebsocketRequest *findOrCreateRequest(const std::string &id) + { + auto res = m_requests.emplace(id, id); + return &res.first->second; + } + + /// @brief finds or creates a WebSocketRequest structure and return it. + /// @param id the id of the request to create + /// @returns a pointer to the new websocket request or the existing one. + WebsocketRequest *createRequest(const std::string &id) + { + auto it = m_requests.find(id); + if (it == m_requests.end()) + { + auto res = m_requests.emplace(id, id); + return &res.first->second; + } + else + { + return nullptr; + } + } + + /// @brief Remove a request from the known requests + /// @param id the id of the request to remove + void remove(const std::string &id) { m_requests.erase(id); } + + /// @brief Parse a JSON request buffer and create a new request ptr. + /// @param buffer the text to parse + RequestPtr parse(const std::string &buffer) + { + using namespace rapidjson; + using namespace std; + + Document doc; + doc.Parse(buffer.c_str()); + + RequestPtr request; + + if (doc.HasParseError()) + { + stringstream err; + err << "Websocket Read Error(offset (" << doc.GetErrorOffset() + << ")): " << GetParseError_En(doc.GetParseError()); + LOG(warning) << err.str(); + LOG(warning) << " " << buffer; + auto error = Error::make(Error::ErrorCode::INVALID_REQUEST, err.str()); + throw RestError(error, m_httpRequest->m_accepts, rest_sink::status::bad_request, std::nullopt, + "ERROR"); + } + if (!doc.IsObject()) + { + LOG(warning) << "Websocket Read Error: JSON message does not have a top level object"; + LOG(warning) << " " << buffer; + auto error = Error::make(Error::ErrorCode::INVALID_REQUEST, + "JSON message does not have a top level object"); + throw RestError(error, m_httpRequest->m_accepts, rest_sink::status::bad_request, std::nullopt, + "ERROR"); + } + else + { + // Extract the parameters from the json doc to map them to the REST + // protocol parameters + request = make_unique(*m_httpRequest); + + request->m_verb = boost::beast::http::verb::get; + request->m_parameters.clear(); + +#ifdef GetObject +#define __GOSave__ GetObject +#undef GetObject +#endif + const auto &object = doc.GetObject(); +#ifdef __GOSave__ +#define GetObject __GOSave__ +#endif + + for (auto &it : object) + { + switch (it.value.GetType()) + { + case rapidjson::kNullType: + // Skip nulls + break; + case rapidjson::kFalseType: + request->m_parameters.emplace( + make_pair(string(it.name.GetString()), ParameterValue(false))); + break; + case rapidjson::kTrueType: + request->m_parameters.emplace( + make_pair(string(it.name.GetString()), ParameterValue(true))); + break; + case rapidjson::kObjectType: + break; + case rapidjson::kArrayType: + break; + case rapidjson::kStringType: + request->m_parameters.emplace( + make_pair(it.name.GetString(), ParameterValue(string(it.value.GetString())))); + break; + case rapidjson::kNumberType: + if (it.value.IsInt()) + request->m_parameters.emplace( + make_pair(it.name.GetString(), ParameterValue(it.value.GetInt()))); + else if (it.value.IsUint()) + request->m_parameters.emplace( + make_pair(it.name.GetString(), ParameterValue(uint64_t(it.value.GetUint())))); + else if (it.value.IsInt64()) + request->m_parameters.emplace( + make_pair(it.name.GetString(), ParameterValue(uint64_t(it.value.GetInt64())))); + else if (it.value.IsUint64()) + request->m_parameters.emplace( + make_pair(it.name.GetString(), ParameterValue(it.value.GetUint64()))); + else if (it.value.IsDouble()) + request->m_parameters.emplace( + make_pair(it.name.GetString(), ParameterValue(double(it.value.GetDouble())))); + break; + } + } + } + + return request; + } + + /// @brief dispatch a JSON request buffer for a session + /// @param session A shared pointer to the session that we are dispatching for + /// @param buffer the JSON request string + /// @param outId optional pointer to a string to receive the request id + /// @returns `true` if the dispatch was successful. + bool dispatch(SessionPtr session, const std::string &buffer, + std::string *outId = nullptr) + { + using namespace std; + + auto request = parse(buffer); + if (!request) + { + return false; + } + + if (request->m_parameters.count("id") > 0) + { + auto &v = request->m_parameters["id"]; + string id = visit(overloaded {[](monostate m) { return ""s; }, + [](auto v) { return boost::lexical_cast(v); }}, + v); + request->m_requestId = id; + request->m_parameters.erase("id"); + } + else + { + auto error = InvalidParameterValue::make("id", "", "string", "string", "No id given"); + throw RestError(error, request->m_accepts, rest_sink::status::bad_request, std::nullopt, + "ERROR"); + } + + auto &id = *(request->m_requestId); + if (outId) + *outId = id; + auto res = m_requests.emplace(id, id); + if (!res.second) + { + LOG(error) << "Duplicate request id: " << id; + auto error = InvalidParameterValue::make("id", *request->m_requestId, "string", "string", + "Duplicate id given"); + throw RestError(error, request->m_accepts, rest_sink::status::bad_request, std::nullopt, + "ERROR"); + } + + if (request->m_parameters.count("request") > 0) + { + request->m_command = get(request->m_parameters["request"]); + request->m_parameters.erase("request"); + } + else + { + auto error = + InvalidParameterValue::make("request", "", "string", "string", "No request given"); + throw RestError(error, request->m_accepts, rest_sink::status::bad_request, std::nullopt, + id); + } + + // Check parameters for command + LOG(debug) << "Received request id: " << id; + + res.first->second.m_request = std::move(request); + try + { + return m_dispatch(session, res.first->second.m_request); + } + + catch (RestError &re) + { + re.setRequestId(id); + throw re; + } + + return false; + } + + protected: + RequestPtr m_httpRequest; //! A pointer to the original HTTP request + Dispatch m_dispatch; //! The dispatch function + std::map m_requests; //! The map of requests this class manages + }; +} // namespace mtconnect::sink::rest_sink diff --git a/src/mtconnect/sink/rest_sink/websocket_session.hpp b/src/mtconnect/sink/rest_sink/websocket_session.hpp index f43a425e2..be8b8c407 100644 --- a/src/mtconnect/sink/rest_sink/websocket_session.hpp +++ b/src/mtconnect/sink/rest_sink/websocket_session.hpp @@ -31,22 +31,13 @@ #include "mtconnect/configuration/config_options.hpp" #include "mtconnect/utilities.hpp" #include "session.hpp" +#include "websocket_request_manager.hpp" namespace mtconnect::sink::rest_sink { namespace beast = boost::beast; - struct WebsocketRequest - { - WebsocketRequest(const std::string &id) : m_requestId(id) {} - std::string m_requestId; - std::optional m_streamBuffer; - Complete m_complete; - bool m_streaming {false}; - RequestPtr m_request; - }; - /// @brief A websocket session that provides a pubsub interface using REST parameters - template + template class WebsocketSession : public Session { protected: @@ -62,45 +53,23 @@ namespace mtconnect::sink::rest_sink { }; public: - using RequestMessage = boost::beast::http::request; - - WebsocketSession(RequestPtr &&request, RequestMessage &&msg, Dispatch dispatch, + WebsocketSession(RequestPtr &&request, Dispatch dispatch, ErrorFunction func) - : Session(dispatch, func), m_request(std::move(request)), m_msg(std::move(msg)) + : Session(dispatch, func), + m_requestManager(std::move(request), dispatch) {} /// @brief Session cannot be copied. WebsocketSession(const WebsocketSession &) = delete; ~WebsocketSession() = default; - - /// @brief get this as the `Derived` type - /// @return the subclass + Derived &derived() { return static_cast(*this); } - - void run() override - { - using namespace boost::beast; - - // Set suggested timeout settings for the websocket - derived().stream().set_option( - websocket::stream_base::timeout::suggested(beast::role_type::server)); - - // Set a decorator to change the Server of the handshake - derived().stream().set_option( - websocket::stream_base::decorator([](websocket::response_type &res) { - res.set(http::field::server, GetAgentVersion() + " MTConnectAgent"); - })); - - // Accept the websocket handshake - derived().stream().async_accept( - m_msg, boost::asio::bind_executor(derived().getExecutor(), - beast::bind_front_handler(&WebsocketSession::onAccept, - derived().shared_ptr()))); - } + + auto &getRequestManager() { return m_requestManager; } void close() override { - NAMED_SCOPE("PlainWebsocketSession::close"); + NAMED_SCOPE("WebsocketSession::close"); if (!m_isOpen) return; @@ -113,8 +82,7 @@ namespace mtconnect::sink::rest_sink { ptr = wptr.lock(); } - m_request.reset(); - m_requests.clear(); + m_requestManager.reset(); for (auto obs : m_observers) { auto optr = obs.lock(); @@ -150,11 +118,10 @@ namespace mtconnect::sink::rest_sink { if (requestId) { auto id = *(requestId); - auto it = m_requests.find(id); - if (it != m_requests.end()) + auto req = m_requestManager.findRequest(id); + if (req != nullptr) { - auto &req = it->second; - req.m_streaming = true; + req->m_streaming = true; if (complete) { @@ -177,7 +144,7 @@ namespace mtconnect::sink::rest_sink { { NAMED_SCOPE("WebsocketSession::writeChunk"); - if (!derived().stream().is_open()) + if (!derived().isStreamOpen()) { return; } @@ -205,86 +172,61 @@ namespace mtconnect::sink::rest_sink { } protected: - void onAccept(boost::beast::error_code ec) - { - if (ec) - { - fail(status::internal_server_error, "Error occurred in accpet", ec); - return; - } - - m_isOpen = true; - - derived().stream().async_read( - m_buffer, beast::bind_front_handler(&WebsocketSession::onRead, derived().shared_ptr())); - } - void send(const std::string body, Complete complete, const std::string &requestId) { NAMED_SCOPE("WebsocketSession::send"); - + using namespace std::placeholders; - - auto it = m_requests.find(requestId); - if (it != m_requests.end()) + + auto req = m_requestManager.findRequest(requestId); + if (req != nullptr) { - auto &req = it->second; - req.m_complete = std::move(complete); - req.m_streamBuffer.emplace(); - std::ostream str(&req.m_streamBuffer.value()); - + req->m_complete = std::move(complete); + req->m_streamBuffer.emplace(); + std::ostream str(&req->m_streamBuffer.value()); + str << body; - - auto ref = derived().shared_ptr(); - + LOG(debug) << "writing chunk for ws: " << requestId; - + m_busy = true; - - derived().stream().text(derived().stream().got_text()); - derived().stream().async_write(req.m_streamBuffer->data(), - beast::bind_handler( - [ref, requestId](beast::error_code ec, std::size_t len) { - ref->sent(ec, len, requestId); - }, - _1, _2)); + derived().asyncSend(req); } else { LOG(error) << "Cannot find request for id: " << requestId; } } - + void sent(beast::error_code ec, std::size_t len, const std::string &id) { NAMED_SCOPE("WebsocketSession::sent"); - + if (ec) { return fail(status::bad_request, "Missing request Id", ec); } - + { LOG(trace) << "Waiting for mutex"; std::lock_guard lock(m_mutex); - + LOG(trace) << "sent chunk for ws: " << id; - - auto it = m_requests.find(id); - if (it != m_requests.end()) + + auto req = m_requestManager.findRequest(id); + if (req != nullptr) { - auto &req = it->second; - if (req.m_complete) + if (req->m_complete) { - boost::asio::post(derived().stream().get_executor(), req.m_complete); - req.m_complete = nullptr; + boost::asio::post(derived().getExecutor(), req->m_complete); + req->m_complete = nullptr; } - - if (!req.m_streaming) + + if (!req->m_streaming) { - m_requests.erase(id); + m_requestManager.remove(id); } - + if (m_messageQueue.size() == 0) { m_busy = false; @@ -295,11 +237,11 @@ namespace mtconnect::sink::rest_sink { LOG(error) << "WebsocketSession::sent: Cannot find request for id: " << id; } } - + { LOG(trace) << "Waiting for mutex to send next"; std::lock_guard lock(m_mutex); - + // Check for queued messages if (m_messageQueue.size() > 0) { @@ -310,197 +252,131 @@ namespace mtconnect::sink::rest_sink { } } - RequestPtr parseRequest(const std::string &buffer) - { - using namespace rapidjson; - using namespace std; - Document doc; - doc.Parse(buffer.c_str()); - - RequestPtr request; - - if (doc.HasParseError()) - { - stringstream err; - err << "Websocket Read Error(offset (" << doc.GetErrorOffset() - << ")): " << GetParseError_En(doc.GetParseError()); - LOG(warning) << err.str(); - LOG(warning) << " " << buffer; - auto error = Error::make(Error::ErrorCode::INVALID_REQUEST, err.str()); - throw RestError(error, m_request->m_accepts, rest_sink::status::bad_request, std::nullopt, - "ERROR"); - } - if (!doc.IsObject()) - { - LOG(warning) << "Websocket Read Error: JSON message does not have a top level object"; - LOG(warning) << " " << buffer; - auto error = Error::make(Error::ErrorCode::INVALID_REQUEST, - "JSON message does not have a top level object"); - throw RestError(error, m_request->m_accepts, rest_sink::status::bad_request, std::nullopt, - "ERROR"); - } - else - { - // Extract the parameters from the json doc to map them to the REST - // protocol parameters - request = make_unique(*m_request); - - request->m_verb = beast::http::verb::get; - request->m_parameters.clear(); -#ifdef GetObject -#define __GOSave__ GetObject -#undef GetObject -#endif - - const auto &object = doc.GetObject(); -#ifdef __GOSave__ -#define GetObject __GOSave__ -#endif - - for (auto &it : object) - { - switch (it.value.GetType()) - { - case rapidjson::kNullType: - // Skip nulls - break; - case rapidjson::kFalseType: - request->m_parameters.emplace( - make_pair(string(it.name.GetString()), ParameterValue(false))); - break; - case rapidjson::kTrueType: - request->m_parameters.emplace( - make_pair(string(it.name.GetString()), ParameterValue(true))); - break; - case rapidjson::kObjectType: - break; - case rapidjson::kArrayType: - break; - case rapidjson::kStringType: - request->m_parameters.emplace( - make_pair(it.name.GetString(), ParameterValue(string(it.value.GetString())))); - - break; - case rapidjson::kNumberType: - if (it.value.IsInt()) - request->m_parameters.emplace( - make_pair(it.name.GetString(), ParameterValue(it.value.GetInt()))); - else if (it.value.IsUint()) - request->m_parameters.emplace( - make_pair(it.name.GetString(), ParameterValue(uint64_t(it.value.GetUint())))); - else if (it.value.IsInt64()) - request->m_parameters.emplace( - make_pair(it.name.GetString(), ParameterValue(uint64_t(it.value.GetInt64())))); - else if (it.value.IsUint64()) - request->m_parameters.emplace( - make_pair(it.name.GetString(), ParameterValue(it.value.GetUint64()))); - else if (it.value.IsDouble()) - request->m_parameters.emplace( - make_pair(it.name.GetString(), ParameterValue(double(it.value.GetDouble())))); - - break; - } - } - } - - return request; + protected: + WebsocketRequestManager m_requestManager; + std::mutex m_mutex; + std::atomic_bool m_busy {false}; + std::deque m_messageQueue; + bool m_isOpen {false}; + }; + + /// @brief A websocket session that provides a pubsub interface using REST parameters + template + class WebsocketSessionImpl : public WebsocketSession + { + public: + using RequestMessage = boost::beast::http::request; + using super = WebsocketSession; + + WebsocketSessionImpl(RequestPtr &&request, RequestMessage &&msg, Dispatch dispatch, + ErrorFunction func) + : super(std::move(request), dispatch, func), + m_msg(std::move(msg)) + {} + + /// @brief Session cannot be copied. + WebsocketSessionImpl(const WebsocketSessionImpl &) = delete; + ~WebsocketSessionImpl() = default; + + /// @brief get this as the `Derived` type + /// @return the subclass + Derived &derived() { return static_cast(*this); } + + bool isStreamOpen() + { + return derived().stream().is_open(); } - - bool dispatchRequest(RequestPtr &&request) + + auto getExecutor() { return derived().stream().get_executor(); } + + void run() override { - using namespace std; - - if (request->m_parameters.count("id") > 0) - { - auto &v = request->m_parameters["id"]; - string id = visit(overloaded {[](monostate m) { return ""s; }, - [](auto v) { return boost::lexical_cast(v); }}, - v); - request->m_requestId = id; - request->m_parameters.erase("id"); - } - else - { - auto error = InvalidParameterValue::make("id", "", "string", "string", "No id given"); - throw RestError(error, request->m_accepts, rest_sink::status::bad_request, std::nullopt, - "ERROR"); - } - - auto &id = *(request->m_requestId); - auto res = m_requests.emplace(id, id); - if (!res.second) - { - LOG(error) << "Duplicate request id: " << id; - auto error = InvalidParameterValue::make("id", *request->m_requestId, "string", "string", - "Duplicate id given"); - throw RestError(error, request->m_accepts, rest_sink::status::bad_request, std::nullopt, - "ERROR"); - } - - if (request->m_parameters.count("request") > 0) - { - request->m_command = get(request->m_parameters["request"]); - request->m_parameters.erase("request"); - } - else + using namespace boost::beast; + + // Set suggested timeout settings for the websocket + derived().stream().set_option( + websocket::stream_base::timeout::suggested(beast::role_type::server)); + + // Set a decorator to change the Server of the handshake + derived().stream().set_option( + websocket::stream_base::decorator([](websocket::response_type &res) { + res.set(http::field::server, GetAgentVersion() + " MTConnectAgent"); + })); + + // Accept the websocket handshake + derived().stream().async_accept( + m_msg, boost::asio::bind_executor(derived().getExecutor(), + beast::bind_front_handler(&WebsocketSessionImpl::onAccept, + derived().shared_ptr()))); + } + + protected: + friend class WebsocketSession; + + void onAccept(boost::beast::error_code ec) + { + if (ec) { - auto error = - InvalidParameterValue::make("request", "", "string", "string", "No request given"); - throw RestError(error, request->m_accepts, rest_sink::status::bad_request, std::nullopt, - id); + super::fail(status::internal_server_error, "Error occurred in accpet", ec); + return; } + + super::m_isOpen = true; + + derived().stream().async_read( + m_buffer, beast::bind_front_handler(&WebsocketSessionImpl::onRead, derived().shared_ptr())); + } + + void asyncSend(WebsocketRequestManager::WebsocketRequest *request) + { + NAMED_SCOPE("WebsocketSessionImpl::asyncSend"); - // Check parameters for command - LOG(debug) << "Received request id: " << id; - - res.first->second.m_request = std::move(request); - try - { - return m_dispatch(derived().shared_ptr(), res.first->second.m_request); - } + using namespace std::placeholders; - catch (RestError &re) - { - re.setRequestId(id); - throw re; - } + auto ref = derived().shared_ptr(); - return false; + auto &requestId = request->m_requestId; + derived().stream().text(derived().stream().got_text()); + derived().stream().async_write(request->m_streamBuffer->data(), + beast::bind_handler( + [ref, requestId](beast::error_code ec, std::size_t len) { + ref->sent(ec, len, requestId); + }, + _1, _2)); } - + void onRead(beast::error_code ec, std::size_t len) { NAMED_SCOPE("PlainWebsocketSession::onRead"); - + if (ec) - return fail(boost::beast::http::status::internal_server_error, "shutdown", ec); - + return super::fail(boost::beast::http::status::internal_server_error, "shutdown", ec); + if (len == 0) { LOG(debug) << "Empty message received"; return; } - + // Parse the buffer as a JSON request with parameters matching // REST API derived().stream().text(derived().stream().got_text()); auto buffer = beast::buffers_to_string(m_buffer.data()); m_buffer.consume(m_buffer.size()); - + LOG(debug) << "Received :" << buffer; - + try { - auto request = parseRequest(buffer); - if (!request || !dispatchRequest(std::move(request))) + if (!super::m_requestManager.dispatch(derived().shared_ptr(), buffer)) { std::stringstream txt; - txt << getRemote().address() << ": Dispatch failed for: " << buffer; + txt << super::getRemote().address() << ": Dispatch failed for: " << buffer; LOG(error) << txt.str(); } } - + catch (RestError &re) { auto id = re.getRequestId(); @@ -509,56 +385,44 @@ namespace mtconnect::sink::rest_sink { id = "ERROR"; re.setRequestId(*id); } - - auto res = m_requests.find(*id); - if (res == m_requests.end()) - m_requests.emplace(*id, *id); - - m_errorFunction(derived().shared_ptr(), re); + + super::m_requestManager.findOrCreateRequest(*id); + super::m_errorFunction(derived().shared_ptr(), re); } - + catch (std::logic_error &le) { std::stringstream txt; - txt << getRemote().address() << ": Logic Error: " << le.what(); + txt << super::getRemote().address() << ": Logic Error: " << le.what(); LOG(error) << txt.str(); - fail(boost::beast::http::status::not_found, txt.str()); + super::fail(boost::beast::http::status::not_found, txt.str()); } - + catch (...) { std::stringstream txt; - txt << getRemote().address() << ": Unknown Error thrown"; + txt << super::getRemote().address() << ": Unknown Error thrown"; LOG(error) << txt.str(); - fail(boost::beast::http::status::not_found, txt.str()); + super::fail(boost::beast::http::status::not_found, txt.str()); } - + derived().stream().async_read( - m_buffer, beast::bind_front_handler(&WebsocketSession::onRead, derived().shared_ptr())); + m_buffer, beast::bind_front_handler(&WebsocketSessionImpl::onRead, derived().shared_ptr())); } - + protected: - RequestPtr m_request; RequestMessage m_msg; beast::flat_buffer m_buffer; - std::map m_requests; - std::mutex m_mutex; - std::atomic_bool m_busy {false}; - std::deque m_messageQueue; - bool m_isOpen {false}; }; - template - using WebsocketSessionPtr = std::shared_ptr>; - - class PlainWebsocketSession : public WebsocketSession + class PlainWebsocketSession : public WebsocketSessionImpl { public: using Stream = beast::websocket::stream; PlainWebsocketSession(beast::tcp_stream &&stream, RequestPtr &&request, RequestMessage &&msg, Dispatch dispatch, ErrorFunction func) - : WebsocketSession(std::move(request), std::move(msg), dispatch, func), + : WebsocketSessionImpl(std::move(request), std::move(msg), dispatch, func), m_stream(std::move(stream)) { beast::get_lowest_layer(m_stream).expires_never(); @@ -575,8 +439,6 @@ namespace mtconnect::sink::rest_sink { m_stream.close(beast::websocket::close_code::none); } - auto getExecutor() { return m_stream.get_executor(); } - auto &stream() { return m_stream; } /// @brief Get a pointer cast as an Websocket Session @@ -590,14 +452,14 @@ namespace mtconnect::sink::rest_sink { Stream m_stream; }; - class TlsWebsocketSession : public WebsocketSession + class TlsWebsocketSession : public WebsocketSessionImpl { public: using Stream = beast::websocket::stream>; TlsWebsocketSession(beast::ssl_stream &&stream, RequestPtr &&request, RequestMessage &&msg, Dispatch dispatch, ErrorFunction func) - : WebsocketSession(std::move(request), std::move(msg), dispatch, func), + : WebsocketSessionImpl(std::move(request), std::move(msg), dispatch, func), m_stream(std::move(stream)) { beast::get_lowest_layer(m_stream).expires_never(); @@ -610,8 +472,6 @@ namespace mtconnect::sink::rest_sink { auto &stream() { return m_stream; } - auto getExecutor() { return m_stream.get_executor(); } - void closeStream() override { if (m_isOpen && m_stream.is_open()) diff --git a/test_package/CMakeLists.txt b/test_package/CMakeLists.txt index b93a366fa..e26e3f04d 100644 --- a/test_package/CMakeLists.txt +++ b/test_package/CMakeLists.txt @@ -253,6 +253,7 @@ add_agent_test(qname FALSE entity) add_agent_test(file_cache FALSE sink/rest_sink) add_agent_test(http_server FALSE sink/rest_sink TRUE) add_agent_test(websockets FALSE sink/rest_sink TRUE) +add_agent_test(websockets_rest_sink FALSE sink/rest_sink TRUE) add_agent_test(tls_http_server FALSE sink/rest_sink TRUE) add_agent_test(routing FALSE sink/rest_sink) diff --git a/test_package/agent_test_helper.cpp b/test_package/agent_test_helper.cpp index ba25378aa..be5b3047b 100644 --- a/test_package/agent_test_helper.cpp +++ b/test_package/agent_test_helper.cpp @@ -98,3 +98,27 @@ void AgentTestHelper::responseHelper(const char *file, int line, const QueryMap makeRequest(file, line, http::verb::get, "", aQueries, path, accepts); doc = nlohmann::json::parse(m_session->m_body); } + +void AgentTestHelper::makeWebSocketRequest(const char *file, int line, const std::string &json, xmlDocPtr *doc, + std::string &id) +{ + m_dispatched = m_websocketSession->dispatch(json, id); + auto response = m_websocketSession->getNextResponse(id); + ASSERT_TRUE(response) << "No response for id " << id; + if (response) + { + *doc = xmlParseMemory(response->c_str(), int32_t(response->size())); + } +} + +void AgentTestHelper::makeWebSocketRequest(const char *file, int line, const std::string &json, nlohmann::json &doc, + std::string &id) +{ + m_dispatched = m_websocketSession->dispatch(json, id); + auto response = m_websocketSession->getNextResponse(id); + ASSERT_TRUE(response) << "No response for id " << id; + if (response) + { + doc = nlohmann::json::parse(*response); + } +} diff --git a/test_package/agent_test_helper.hpp b/test_package/agent_test_helper.hpp index f1aa59a7d..ad88c7938 100644 --- a/test_package/agent_test_helper.hpp +++ b/test_package/agent_test_helper.hpp @@ -36,6 +36,8 @@ #include "mtconnect/sink/rest_sink/routing.hpp" #include "mtconnect/sink/rest_sink/server.hpp" #include "mtconnect/sink/rest_sink/session.hpp" +#include "mtconnect/sink/rest_sink/websocket_request_manager.hpp" +#include "mtconnect/sink/rest_sink/websocket_session.hpp" #include "mtconnect/source/adapter/shdr/shdr_adapter.hpp" #include "mtconnect/source/loopback_source.hpp" #include "test_utilities.hpp" @@ -106,7 +108,80 @@ namespace mtconnect { std::string m_chunkMimeType; bool m_streaming {false}; }; + + class TestWebsocketSession : public WebsocketSession + { + public: + TestWebsocketSession(boost::asio::executor &&exec, RequestPtr &&request, Dispatch dispatch, + ErrorFunction func) + : WebsocketSession(std::move(request), dispatch, func), m_executor(std::move(exec)) + { + m_isOpen = true; + } + ~TestWebsocketSession() {} + std::shared_ptr shared_ptr() + { + return std::dynamic_pointer_cast(shared_from_this()); + } + + void run() override {} + + void read(const std::string &json) + { + if (!m_requestManager.dispatch(shared_ptr(), json)) + { + LOG(error) << "Dispatch failed for: " << json; + } + } + + void closeStream() override {} + + bool isStreamOpen() { return m_isOpen; } + + void asyncSend(WebsocketRequestManager::WebsocketRequest *request) + { + auto buffer = beast::buffers_to_string(request->m_streamBuffer->data()); + m_responses[request->m_requestId].emplace(buffer); + + beast::error_code ec; + boost::asio::post(m_executor, boost::bind(&TestWebsocketSession::sent, shared_ptr(), ec, 0, + request->m_requestId)); + } + + auto &getExecutor() { return m_executor; } + + bool dispatch(const std::string &buffer, std::string &id) + { + return m_requestManager.dispatch(shared_ptr(), buffer, &id); + } + + + bool hasResponse(const std::string &id) const + { + const auto q = m_responses.find(id); + return q != m_responses.end() && !q->second.empty(); + } + std::optional getNextResponse(const std::string &id) + { + auto q = m_responses.find(id); + if (q != m_responses.end() && !q->second.empty()) + { + auto response = q->second.front(); + q->second.pop(); + return response; + } + else + { + return std::nullopt; + } + } + + std::map> m_responses; + + protected: + boost::asio::executor m_executor; + }; } // namespace rest_sink } // namespace sink } // namespace mtconnect @@ -135,32 +210,101 @@ class AgentTestHelper } auto session() { return m_session; } + auto websocketSession() { return m_websocketSession; } void setAgentCreateHook(Hook &hook) { m_agentCreateHook = hook; } - // Helper method to test expected string, given optional query, & run tests + /// @brief Helper to get a response from the agent + /// @param file The source file the request is made from + /// @param line The line number + /// @param aQueries The query parameters + /// @param doc The returned document + /// @param path The request path + /// @param accepts The accepted mime type void responseHelper(const char *file, int line, const mtconnect::sink::rest_sink::QueryMap &aQueries, xmlDocPtr *doc, const char *path, const char *accepts = "text/xml"); + + /// @brief Helper to get a streaming response from the agent + /// @param file The source file the request is made from + /// @param line The line number + /// @param aQueries The query parameters + /// @param path The request path + /// @param accepts The accepted mime type void responseStreamHelper(const char *file, int line, const mtconnect::sink::rest_sink::QueryMap &aQueries, const char *path, const char *accepts = "text/xml"); + + /// @brief Helper to get a json response from the agent + /// @param file The source file the request is made from + /// @param line The line number + /// @param aQueries The query parameters + /// @param doc The returned document + /// @param path The request path + /// @param accepts The accepted mime type void responseHelper(const char *file, int line, const mtconnect::sink::rest_sink::QueryMap &aQueries, nlohmann::json &doc, const char *path, const char *accepts = "application/json"); + + /// @brief Helper to make a PUT request to the agent + /// @param file The source file the request is made from + /// @param line The line number + /// @param body The body of the request + /// @param aQueries The query parameters + /// @param doc The returned document + /// @param path The request path + /// @param accepts The accepted mime type void putResponseHelper(const char *file, int line, const std::string &body, const mtconnect::sink::rest_sink::QueryMap &aQueries, xmlDocPtr *doc, const char *path, const char *accepts = "text/xml"); + + /// @brief Helper to make a POST request to the agent + /// @param file The source file the request is made from + /// @param line The line number + /// @param aQueries The query parameters + /// @param doc The returned document + /// @param path The request path + /// @param accepts The accepted mime type void deleteResponseHelper(const char *file, int line, const mtconnect::sink::rest_sink::QueryMap &aQueries, xmlDocPtr *doc, const char *path, const char *accepts = "text/xml"); + /// @brief Helper to get a chunked response from the agent + /// @param file The source file the request is made from + /// @param line The line number + /// @param doc The returned document void chunkStreamHelper(const char *file, int line, xmlDocPtr *doc); + /// @brief Make a request to the agent + /// @param file The source file the request is made from + /// @param line The line number + /// @param verb The HTTP verb + /// @param body The body for PUT/POST requests + /// @param aQueries The query parameters + /// @param path The request path + /// @param accepts The accepted mime type void makeRequest(const char *file, int line, boost::beast::http::verb verb, const std::string &body, const mtconnect::sink::rest_sink::QueryMap &aQueries, const char *path, const char *accepts); + /// @brief Make a request using a json command to parse and dispatch + /// @param file The source file the request is made from + /// @param line The line number + /// @param json the request + /// @param doc the returned document + /// @param id the request id + void makeWebSocketRequest(const char *file, int line, const std::string &json, xmlDocPtr *doc, + std::string &id); + + /// @brief Make a request using a json command to parse and dispatch + /// @param file The source file the request is made from + /// @param line The line number + /// @param json the request + /// @param doc the returned document + /// @param id the request id + void makeWebSocketRequest(const char *file, int line, const std::string &json, nlohmann::json &doc, + std::string &id); + auto getAgent() { return m_agent.get(); } std::shared_ptr getRestService() { @@ -269,6 +413,14 @@ class AgentTestHelper m_session = std::make_shared( [](mhttp::SessionPtr, mhttp::RequestPtr) { return true; }, m_server->getErrorFunction()); + + mhttp::RequestPtr request = std::make_shared(); + request->m_verb = boost::beast::http::verb::get; + m_websocketSession = std::make_shared(m_agent->getContext().get().get_executor(), + std::move(request), + [this](mhttp::SessionPtr s, mhttp::RequestPtr r) { + return m_server->dispatch(s, r); }, + m_server->getErrorFunction()); return m_agent.get(); } @@ -339,6 +491,7 @@ class AgentTestHelper boost::asio::ip::tcp::socket m_socket; mtconnect::sink::rest_sink::Response m_response; std::shared_ptr m_session; + std::shared_ptr m_websocketSession; mtconnect::sink::SinkFactory m_sinkFactory; mtconnect::source::SourceFactory m_sourceFactory; @@ -374,7 +527,7 @@ struct XmlDocFreer ASSERT_TRUE(doc); \ XmlDocFreer cleanup(doc) -#define PARSE_XML_STREAM_QUERY(path, queries) \ +#define PARSE_XML_STREAM_QUERY(path, queries) \ m_agentTestHelper->responseStreamHelper(__FILE__, __LINE__, queries, path); #define PARSE_XML_CHUNK() \ @@ -394,6 +547,18 @@ struct XmlDocFreer ASSERT_TRUE(doc); \ XmlDocFreer cleanup(doc) -#define PARSE_JSON_RESPONSE(path) \ - nlohmann::json doc; \ +#define PARSE_JSON_RESPONSE(path) \ + nlohmann::json doc; \ m_agentTestHelper->responseHelper(__FILE__, __LINE__, {}, doc, path) + +#define PARSE_XML_WS_RESPONSE(json) \ + xmlDocPtr doc = nullptr; \ + std::string id; \ + m_agentTestHelper->makeWebSocketRequest(__FILE__, __LINE__, json, &doc, id); \ + ASSERT_TRUE(doc); \ + XmlDocFreer cleanup(doc) + +#define PARSE_JSON_WS_RESPONSE(json) \ + nlohmann::json doc; \ + std::string id; \ + m_agentTestHelper->makeWebSocketRequest(__FILE__, __LINE__, json, doc, id); diff --git a/test_package/websockets_rest_sink_test.cpp b/test_package/websockets_rest_sink_test.cpp new file mode 100644 index 000000000..d7dcac392 --- /dev/null +++ b/test_package/websockets_rest_sink_test.cpp @@ -0,0 +1,203 @@ + +// +// Copyright Copyright 2009-2025, AMT – The Association For Manufacturing Technology (“AMT”) +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Ensure that gtest is the first header otherwise Windows raises an error +#include +// Keep this comment to keep gtest.h above. (clang-format off/on is not working here!) + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "mtconnect/logging.hpp" +#include "mtconnect/sink/rest_sink/server.hpp" +#include "agent_test_helper.hpp" +#include "test_utilities.hpp" + +using namespace std; +using namespace mtconnect; +using namespace mtconnect::sink::rest_sink; + +namespace asio = boost::asio; +namespace beast = boost::beast; +namespace http = boost::beast::http; +using tcp = boost::asio::ip::tcp; +namespace websocket = beast::websocket; + +// main +int main(int argc, char* argv[]) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +class WebsocketsRestSinkTest : public testing::Test +{ +protected: + void SetUp() override + { + m_agentTestHelper = make_unique(); + m_agentTestHelper->createAgent("/samples/dyn_load.xml", 8, 64, "2.6", 25, true); + m_agentId = to_string(getCurrentTimeInSec()); + } + + void TearDown() override { m_agentTestHelper.reset(); } + + void addAdapter(ConfigOptions options = ConfigOptions {}) + { + m_agentTestHelper->addAdapter(options, "localhost", 7878, + m_agentTestHelper->m_agent->getDefaultDevice()->getName()); + } + + template + bool waitFor(const chrono::duration& time, function pred) + { + auto &context = m_agentTestHelper->m_ioContext; + boost::asio::steady_timer timer(context); + timer.expires_after(time); + bool timeout = false; + timer.async_wait([&timeout](boost::system::error_code ec) { + if (!ec) + { + timeout = true; + } + }); + + while (!timeout && !pred()) + { + context.run_for(500ms); + } + timer.cancel(); + + return pred(); + } + + +public: + std::string m_agentId; + std::unique_ptr m_agentTestHelper; + + std::chrono::milliseconds m_delay {}; +}; + +TEST_F(WebsocketsRestSinkTest, should_handle_simple_probe) +{ + { + PARSE_XML_WS_RESPONSE(R"({ "id": "1234", "request": "probe"})"); + ASSERT_EQ("1234", id); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Devices/m:Device@name", "LinuxCNC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Devices/m:Device@uuid", "000"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Devices/m:Device/m:Components/m:Controller@id", "cont"); + } +} + +TEST_F(WebsocketsRestSinkTest, should_handle_json_probe) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_handle_simple_current) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_handle_current_at) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_handle_simple_sample) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_handle_sample_from) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_handle_asset_request) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_handle_asset_with_id_array) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_handle_sample_streaming) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_handle_multiple_streaming_reqeuests) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_handle_asset_put) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_return_error_if_no_id) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_return_error_wrong_parameter_type) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_return_error_for_unknown_command) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_return_error_for_malformed_json) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_return_error_for_unknown_parameter) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_return_error_for_unknown_device) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + +TEST_F(WebsocketsRestSinkTest, should_return_error_for_bad_parameter_value) +{ + GTEST_SKIP() << "Test not implemented yet"; +} From 3d9713aa9d72242fa56017d5a955d2de676f3040 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Mon, 5 Jan 2026 00:02:33 +0100 Subject: [PATCH 04/11] Added json test --- test_package/agent_test_helper.hpp | 18 ++++++++--------- test_package/websockets_rest_sink_test.cpp | 23 ++++++++++++++++++++-- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/test_package/agent_test_helper.hpp b/test_package/agent_test_helper.hpp index ad88c7938..ea98b3a95 100644 --- a/test_package/agent_test_helper.hpp +++ b/test_package/agent_test_helper.hpp @@ -551,14 +551,14 @@ struct XmlDocFreer nlohmann::json doc; \ m_agentTestHelper->responseHelper(__FILE__, __LINE__, {}, doc, path) -#define PARSE_XML_WS_RESPONSE(json) \ - xmlDocPtr doc = nullptr; \ - std::string id; \ - m_agentTestHelper->makeWebSocketRequest(__FILE__, __LINE__, json, &doc, id); \ - ASSERT_TRUE(doc); \ +#define PARSE_XML_WS_RESPONSE(req) \ + xmlDocPtr doc = nullptr; \ + std::string id; \ + m_agentTestHelper->makeWebSocketRequest(__FILE__, __LINE__, req, &doc, id); \ + ASSERT_TRUE(doc); \ XmlDocFreer cleanup(doc) -#define PARSE_JSON_WS_RESPONSE(json) \ - nlohmann::json doc; \ - std::string id; \ - m_agentTestHelper->makeWebSocketRequest(__FILE__, __LINE__, json, doc, id); +#define PARSE_JSON_WS_RESPONSE(req) \ + nlohmann::json jdoc; \ + std::string id; \ + m_agentTestHelper->makeWebSocketRequest(__FILE__, __LINE__, req, jdoc, id) diff --git a/test_package/websockets_rest_sink_test.cpp b/test_package/websockets_rest_sink_test.cpp index d7dcac392..d47e5af10 100644 --- a/test_package/websockets_rest_sink_test.cpp +++ b/test_package/websockets_rest_sink_test.cpp @@ -62,7 +62,10 @@ class WebsocketsRestSinkTest : public testing::Test void SetUp() override { m_agentTestHelper = make_unique(); - m_agentTestHelper->createAgent("/samples/dyn_load.xml", 8, 64, "2.6", 25, true); + m_agentTestHelper->createAgent("/samples/dyn_load.xml", 8, 64, "2.6", 25, true, + true, {{configuration::JsonVersion, 2}, + {configuration::DisableAgentDevice, true} + }); m_agentId = to_string(getCurrentTimeInSec()); } @@ -119,7 +122,13 @@ TEST_F(WebsocketsRestSinkTest, should_handle_simple_probe) TEST_F(WebsocketsRestSinkTest, should_handle_json_probe) { - GTEST_SKIP() << "Test not implemented yet"; + { + PARSE_JSON_WS_RESPONSE(R"({ "id": "1234", "request": "probe", "format": "json"})"); + ASSERT_EQ("1234", id); + + ASSERT_EQ("LinuxCNC", jdoc.at("/MTConnectDevices/Devices/Device/0/name"_json_pointer).get()); + ASSERT_EQ("000", jdoc.at("/MTConnectDevices/Devices/Device/0/uuid"_json_pointer).get()); + } } TEST_F(WebsocketsRestSinkTest, should_handle_simple_current) @@ -162,6 +171,11 @@ TEST_F(WebsocketsRestSinkTest, should_handle_multiple_streaming_reqeuests) GTEST_SKIP() << "Test not implemented yet"; } +TEST_F(WebsocketsRestSinkTest, should_handle_multiple_streaming_reqeuests_with_cancel) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + TEST_F(WebsocketsRestSinkTest, should_handle_asset_put) { GTEST_SKIP() << "Test not implemented yet"; @@ -201,3 +215,8 @@ TEST_F(WebsocketsRestSinkTest, should_return_error_for_bad_parameter_value) { GTEST_SKIP() << "Test not implemented yet"; } + +TEST_F(WebsocketsRestSinkTest, should_coerce_parameter_data_types) +{ + GTEST_SKIP() << "Test not implemented yet"; +} From 501c82430fae3e5cc5a77185e228fd7b7b3a56d8 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Mon, 5 Jan 2026 10:50:33 +0100 Subject: [PATCH 05/11] Added queue to includes --- src/mtconnect/sink/rest_sink/websocket_session.hpp | 10 ++++++++-- test_package/agent_test_helper.hpp | 1 + 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/mtconnect/sink/rest_sink/websocket_session.hpp b/src/mtconnect/sink/rest_sink/websocket_session.hpp index be8b8c407..f5bc02312 100644 --- a/src/mtconnect/sink/rest_sink/websocket_session.hpp +++ b/src/mtconnect/sink/rest_sink/websocket_session.hpp @@ -36,7 +36,9 @@ namespace mtconnect::sink::rest_sink { namespace beast = boost::beast; - /// @brief A websocket session that provides a pubsub interface using REST parameters + /// @brief A websocket session that abstracts out the reading and writing to the stream for testing. + /// This uses the Curiously Recurring Template Pattern (CRTP) to allow the derived class to implement stream methods + /// for performance. template class WebsocketSession : public Session { @@ -261,7 +263,9 @@ namespace mtconnect::sink::rest_sink { bool m_isOpen {false}; }; - /// @brief A websocket session that provides a pubsub interface using REST parameters + /// @brief An intermediary class to implement a websocket stream connect, read, and write semantics. + /// This uses the Curiously Recurring Template Pattern (CRTP) to allow the derived class to implement plain or + /// SSL connections. template class WebsocketSessionImpl : public WebsocketSession { @@ -415,6 +419,7 @@ namespace mtconnect::sink::rest_sink { beast::flat_buffer m_buffer; }; + /// @brief Plain Websocket Session for HTTP connection class PlainWebsocketSession : public WebsocketSessionImpl { public: @@ -452,6 +457,7 @@ namespace mtconnect::sink::rest_sink { Stream m_stream; }; + /// @brief SSL Websocket Session for HTTPS connection class TlsWebsocketSession : public WebsocketSessionImpl { public: diff --git a/test_package/agent_test_helper.hpp b/test_package/agent_test_helper.hpp index ea98b3a95..1714f802f 100644 --- a/test_package/agent_test_helper.hpp +++ b/test_package/agent_test_helper.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include From 8db96c16392d6c04b79442662df1578c29fe52cb Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Mon, 5 Jan 2026 18:54:35 +0100 Subject: [PATCH 06/11] Added some simple tests and fixed some issues with the test framework --- src/mtconnect/configuration/async_context.hpp | 2 +- src/mtconnect/entity/xml_parser.cpp | 1 - src/mtconnect/parser/xml_parser.cpp | 2 - src/mtconnect/pipeline/response_document.cpp | 2 - test_package/agent_test_helper.hpp | 23 ++++- test_package/websockets_rest_sink_test.cpp | 91 ++++++++++++++++++- 6 files changed, 108 insertions(+), 13 deletions(-) diff --git a/src/mtconnect/configuration/async_context.hpp b/src/mtconnect/configuration/async_context.hpp index 6c4694913..29460fb8c 100644 --- a/src/mtconnect/configuration/async_context.hpp +++ b/src/mtconnect/configuration/async_context.hpp @@ -223,7 +223,7 @@ namespace mtconnect::configuration { /// @brief io_context::poll auto poll() { return m_context.poll(); } - /// @brief io_context::poll + /// @brief io_context::get_executor auto get_executor() BOOST_ASIO_NOEXCEPT { return m_context.get_executor(); } /// @} diff --git a/src/mtconnect/entity/xml_parser.cpp b/src/mtconnect/entity/xml_parser.cpp index a6e146bdd..53cf80e45 100644 --- a/src/mtconnect/entity/xml_parser.cpp +++ b/src/mtconnect/entity/xml_parser.cpp @@ -338,7 +338,6 @@ namespace mtconnect::entity { try { xmlInitParser(); - xmlXPathInit(); xmlSetGenericErrorFunc(nullptr, entityXMLErrorFunc); unique_ptr> doc( diff --git a/src/mtconnect/parser/xml_parser.cpp b/src/mtconnect/parser/xml_parser.cpp index 3a03709a4..9734d0ad5 100644 --- a/src/mtconnect/parser/xml_parser.cpp +++ b/src/mtconnect/parser/xml_parser.cpp @@ -114,7 +114,6 @@ namespace mtconnect::parser { try { xmlInitParser(); - xmlXPathInit(); xmlSetGenericErrorFunc(nullptr, agentXMLErrorFunc); THROW_IF_XML2_NULL(m_doc = xmlReadFile(filePath.c_str(), nullptr, XML_PARSE_NOBLANKS)); @@ -302,7 +301,6 @@ namespace mtconnect::parser { try { xmlInitParser(); - xmlXPathInit(); xmlSetGenericErrorFunc(nullptr, agentXMLErrorFunc); THROW_IF_XML2_NULL(m_doc = xmlReadMemory(doc.c_str(), int32_t(doc.length()), "Devices.xml", diff --git a/src/mtconnect/pipeline/response_document.cpp b/src/mtconnect/pipeline/response_document.cpp index d49ee36d8..3612e0ea6 100644 --- a/src/mtconnect/pipeline/response_document.cpp +++ b/src/mtconnect/pipeline/response_document.cpp @@ -497,8 +497,6 @@ namespace mtconnect::pipeline { const std::optional &device, const std::optional &uuid) { - // xmlInitParser(); - // xmlXPathInit(); unique_ptr> doc( xmlReadMemory(content.data(), static_cast(content.length()), "incoming.xml", nullptr, XML_PARSE_NOBLANKS), diff --git a/test_package/agent_test_helper.hpp b/test_package/agent_test_helper.hpp index 1714f802f..99c17dea8 100644 --- a/test_package/agent_test_helper.hpp +++ b/test_package/agent_test_helper.hpp @@ -113,6 +113,8 @@ namespace mtconnect { class TestWebsocketSession : public WebsocketSession { public: + using super = WebsocketSession; + TestWebsocketSession(boost::asio::executor &&exec, RequestPtr &&request, Dispatch dispatch, ErrorFunction func) : WebsocketSession(std::move(request), dispatch, func), m_executor(std::move(exec)) @@ -139,6 +141,13 @@ namespace mtconnect { bool isStreamOpen() { return m_isOpen; } + void sent(beast::error_code ec, std::size_t len, const std::string &id) + { + NAMED_SCOPE("WebsocketSession::sent"); + super::sent(ec, len, id); + m_responses.erase(id); + } + void asyncSend(WebsocketRequestManager::WebsocketRequest *request) { auto buffer = beast::buffers_to_string(request->m_streamBuffer->data()); @@ -149,7 +158,7 @@ namespace mtconnect { boost::asio::post(m_executor, boost::bind(&TestWebsocketSession::sent, shared_ptr(), ec, 0, request->m_requestId)); } - + auto &getExecutor() { return m_executor; } bool dispatch(const std::string &buffer, std::string &id) @@ -157,12 +166,18 @@ namespace mtconnect { return m_requestManager.dispatch(shared_ptr(), buffer, &id); } - bool hasResponse(const std::string &id) const { const auto q = m_responses.find(id); return q != m_responses.end() && !q->second.empty(); } + + bool hasResponseQueue(const std::string &id) const + { + return m_responses.find(id) != m_responses.end(); + } + + std::optional getNextResponse(const std::string &id) { auto q = m_responses.find(id); @@ -417,7 +432,9 @@ class AgentTestHelper mhttp::RequestPtr request = std::make_shared(); request->m_verb = boost::beast::http::verb::get; - m_websocketSession = std::make_shared(m_agent->getContext().get().get_executor(), + + auto ex { m_agent->getContext().get().get_executor() }; + m_websocketSession = std::make_shared(std::move(ex), std::move(request), [this](mhttp::SessionPtr s, mhttp::RequestPtr r) { return m_server->dispatch(s, r); }, diff --git a/test_package/websockets_rest_sink_test.cpp b/test_package/websockets_rest_sink_test.cpp index d47e5af10..4bd0d2b3f 100644 --- a/test_package/websockets_rest_sink_test.cpp +++ b/test_package/websockets_rest_sink_test.cpp @@ -133,22 +133,105 @@ TEST_F(WebsocketsRestSinkTest, should_handle_json_probe) TEST_F(WebsocketsRestSinkTest, should_handle_simple_current) { - GTEST_SKIP() << "Test not implemented yet"; + addAdapter(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); + + { + PARSE_XML_WS_RESPONSE(R"({ "id": "1", "request": "current", "format": "xml"})"); + ASSERT_EQ("1", id); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Availability", "AVAILABLE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "MANUAL"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "UNAVAILABLE"); + } } TEST_F(WebsocketsRestSinkTest, should_handle_current_at) { - GTEST_SKIP() << "Test not implemented yet"; + addAdapter(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); + + auto at = m_agentTestHelper->m_agent->getCircularBuffer().getSequence() - 1; + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|AUTOMATIC"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + + { + PARSE_XML_WS_RESPONSE(format(R"({{ "id": "1", "request": "current", "format": "xml", "at": {} }})", at)); + ASSERT_EQ("1", id); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Availability", "AVAILABLE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "MANUAL"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "UNAVAILABLE"); + + waitFor(2s, [this, &id]() -> bool { + return !m_agentTestHelper->m_websocketSession->hasResponseQueue(id); + }); + } + + { + PARSE_XML_WS_RESPONSE(R"({ "id": "1", "request": "current", "format": "xml" })"); + ASSERT_EQ("1", id); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Availability", "AVAILABLE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "READY"); + } } TEST_F(WebsocketsRestSinkTest, should_handle_simple_sample) { - GTEST_SKIP() << "Test not implemented yet"; + addAdapter(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|AUTOMATIC"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + + { + PARSE_XML_WS_RESPONSE(R"({ "id": "1", "request": "sample", "format": "xml" })"); + ASSERT_EQ("1", id); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Availability[1]", "UNAVAILABLE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Availability[2]", "AVAILABLE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode[1]", "UNAVAILABLE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode[2]", "MANUAL"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode[3]", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[1]", "UNAVAILABLE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[2]", "READY"); + } + + } TEST_F(WebsocketsRestSinkTest, should_handle_sample_from) { - GTEST_SKIP() << "Test not implemented yet"; + addAdapter(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); + + auto at = m_agentTestHelper->m_agent->getCircularBuffer().getSequence(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|AUTOMATIC"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + + { + PARSE_XML_WS_RESPONSE(format(R"({{ "id": "1", "request": "sample", "format": "xml", "from": {} }})", at)); + ASSERT_EQ("1", id); + + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode[1]", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[1]", "READY"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[2]", "ACTIVE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[3]", "READY"); + } + } TEST_F(WebsocketsRestSinkTest, should_handle_asset_request) From 3bd5f498ddee033571a5ff4ea11c990d79750b51 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Fri, 9 Jan 2026 16:26:10 +0100 Subject: [PATCH 07/11] Checkpoint with tests and asset additions --- src/mtconnect/sink/rest_sink/rest_service.cpp | 44 +-- src/mtconnect/sink/rest_sink/server.hpp | 3 + .../rest_sink/websocket_request_manager.hpp | 39 +-- .../sink/rest_sink/websocket_session.hpp | 151 +++++----- test_package/agent_test_helper.cpp | 28 +- test_package/agent_test_helper.hpp | 175 ++++++++---- test_package/websockets_rest_sink_test.cpp | 270 +++++++++++++----- 7 files changed, 476 insertions(+), 234 deletions(-) diff --git a/src/mtconnect/sink/rest_sink/rest_service.cpp b/src/mtconnect/sink/rest_sink/rest_service.cpp index 62491118e..8828baade 100644 --- a/src/mtconnect/sink/rest_sink/rest_service.cpp +++ b/src/mtconnect/sink/rest_sink/rest_service.cpp @@ -521,23 +521,8 @@ namespace mtconnect { void RestService::createAssetRoutings() { using namespace rest_sink; - auto handler = [&](SessionPtr session, RequestPtr request) -> bool { - auto removed = *request->parameter("removed"); - auto count = *request->parameter("count"); - auto pretty = request->parameter("pretty").value_or(false); - auto format = request->parameter("format"); - auto printer = getPrinter(request->m_accepts, format); - - request->m_request = "MTConnectAssets"; - respond(session, - assetRequest(printer, count, removed, request->parameter("type"), - request->parameter("device"), pretty, request->m_requestId), - request->m_requestId); - return true; - }; - - auto idHandler = [&](SessionPtr session, RequestPtr request) -> bool { + auto idHandler = [this](SessionPtr session, RequestPtr request) -> bool { auto asset = request->parameter("assetIds"); request->m_request = "MTConnectAssets"; @@ -564,6 +549,28 @@ namespace mtconnect { return true; }; + auto handler = [this, idHandler](SessionPtr session, RequestPtr request) -> bool { + auto assets = request->parameter("assetIds"); + if (assets) + { + return idHandler(session, request); + } + + auto removed = *request->parameter("removed"); + auto count = *request->parameter("count"); + auto pretty = request->parameter("pretty").value_or(false); + auto format = request->parameter("format"); + auto printer = getPrinter(request->m_accepts, format); + + request->m_request = "MTConnectAssets"; + + respond(session, + assetRequest(printer, count, removed, request->parameter("type"), + request->parameter("device"), pretty, request->m_requestId), + request->m_requestId); + return true; + }; + string qp( "type={string}&removed={bool:false}&" "count={integer:100}&device={string}&pretty={bool:false}&format={string}"); @@ -584,8 +591,7 @@ namespace mtconnect { m_server->addRouting({boost::beast::http::verb::get, "/asset/{assetIds}", idHandler}) .document("MTConnect asset request", "Returns a set of assets identified by asset ids `asset` separated by " - "semi-colon (;)") - .command("assetsById"); + "semi-colon (;)"); if (m_server->arePutsAllowed()) { @@ -966,7 +972,7 @@ namespace mtconnect { } else { - LOG(debug) << "Sink close when failing sample respone: " << message; + LOG(debug) << "Sink close when failing sample response: " << message; } } diff --git a/src/mtconnect/sink/rest_sink/server.hpp b/src/mtconnect/sink/rest_sink/server.hpp index 924adedd4..2fdd8010e 100644 --- a/src/mtconnect/sink/rest_sink/server.hpp +++ b/src/mtconnect/sink/rest_sink/server.hpp @@ -90,6 +90,9 @@ namespace mtconnect::sink::rest_sink { /// @brief Start the http server void start(); + /// @brief Simulate running the server for testing + void simulateRun() { m_run = true; } + /// @brief Shutdown the http server void stop() { diff --git a/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp b/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp index 16896ffda..a23c256f6 100644 --- a/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp +++ b/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp @@ -25,6 +25,7 @@ #include #include #include +#include #include "session.hpp" @@ -59,20 +60,14 @@ namespace mtconnect::sink::rest_sink { m_httpRequest.reset(); m_requests.clear(); } - + /// @brief Set the current request (used for testing). /// @param request the request that is owned by the manager - void setHttpRequest(RequestPtr &&request) - { - m_httpRequest = std::move(request); - } - + void setHttpRequest(RequestPtr &&request) { m_httpRequest = std::move(request); } + /// @brief Get the current HTTP request /// @returns a pointer to the HTTP request - RequestPtr getHttpRequest() const - { - return m_httpRequest; - } + RequestPtr getHttpRequest() const { return m_httpRequest; } /// @brief Finds the request for a given id /// @param id the id to search for @@ -140,8 +135,8 @@ namespace mtconnect::sink::rest_sink { LOG(warning) << err.str(); LOG(warning) << " " << buffer; auto error = Error::make(Error::ErrorCode::INVALID_REQUEST, err.str()); - throw RestError(error, m_httpRequest->m_accepts, rest_sink::status::bad_request, std::nullopt, - "ERROR"); + throw RestError(error, m_httpRequest->m_accepts, rest_sink::status::bad_request, + std::nullopt, "ERROR"); } if (!doc.IsObject()) { @@ -149,8 +144,8 @@ namespace mtconnect::sink::rest_sink { LOG(warning) << " " << buffer; auto error = Error::make(Error::ErrorCode::INVALID_REQUEST, "JSON message does not have a top level object"); - throw RestError(error, m_httpRequest->m_accepts, rest_sink::status::bad_request, std::nullopt, - "ERROR"); + throw RestError(error, m_httpRequest->m_accepts, rest_sink::status::bad_request, + std::nullopt, "ERROR"); } else { @@ -160,7 +155,7 @@ namespace mtconnect::sink::rest_sink { request->m_verb = boost::beast::http::verb::get; request->m_parameters.clear(); - + #ifdef GetObject #define __GOSave__ GetObject #undef GetObject @@ -188,7 +183,16 @@ namespace mtconnect::sink::rest_sink { case rapidjson::kObjectType: break; case rapidjson::kArrayType: + { + const auto &array = it.value.GetArray(); + std::strstream buf; + for (const auto &s : array) + buf << s.GetString() << ";"; + string str = buf.str(); + str.erase(str.length() - 1); // Remove last ; + request->m_parameters.emplace(make_pair(it.name.GetString(), ParameterValue(str))); break; + } case rapidjson::kStringType: request->m_parameters.emplace( make_pair(it.name.GetString(), ParameterValue(string(it.value.GetString())))); @@ -222,8 +226,7 @@ namespace mtconnect::sink::rest_sink { /// @param buffer the JSON request string /// @param outId optional pointer to a string to receive the request id /// @returns `true` if the dispatch was successful. - bool dispatch(SessionPtr session, const std::string &buffer, - std::string *outId = nullptr) + bool dispatch(SessionPtr session, const std::string &buffer, std::string *outId = nullptr) { using namespace std; @@ -294,7 +297,7 @@ namespace mtconnect::sink::rest_sink { } protected: - RequestPtr m_httpRequest; //! A pointer to the original HTTP request + RequestPtr m_httpRequest; //! A pointer to the original HTTP request Dispatch m_dispatch; //! The dispatch function std::map m_requests; //! The map of requests this class manages }; diff --git a/src/mtconnect/sink/rest_sink/websocket_session.hpp b/src/mtconnect/sink/rest_sink/websocket_session.hpp index f5bc02312..5a4118950 100644 --- a/src/mtconnect/sink/rest_sink/websocket_session.hpp +++ b/src/mtconnect/sink/rest_sink/websocket_session.hpp @@ -36,10 +36,10 @@ namespace mtconnect::sink::rest_sink { namespace beast = boost::beast; - /// @brief A websocket session that abstracts out the reading and writing to the stream for testing. - /// This uses the Curiously Recurring Template Pattern (CRTP) to allow the derived class to implement stream methods - /// for performance. - template + /// @brief A websocket session that abstracts out the reading and writing to the stream for + /// testing. This uses the Curiously Recurring Template Pattern (CRTP) to allow the derived class + /// to implement stream methods for performance. + template class WebsocketSession : public Session { protected: @@ -55,18 +55,16 @@ namespace mtconnect::sink::rest_sink { }; public: - WebsocketSession(RequestPtr &&request, Dispatch dispatch, - ErrorFunction func) - : Session(dispatch, func), - m_requestManager(std::move(request), dispatch) + WebsocketSession(RequestPtr &&request, Dispatch dispatch, ErrorFunction func) + : Session(dispatch, func), m_requestManager(std::move(request), dispatch) {} /// @brief Session cannot be copied. WebsocketSession(const WebsocketSession &) = delete; ~WebsocketSession() = default; - + Derived &derived() { return static_cast(*this); } - + auto &getRequestManager() { return m_requestManager; } void close() override @@ -177,20 +175,20 @@ namespace mtconnect::sink::rest_sink { void send(const std::string body, Complete complete, const std::string &requestId) { NAMED_SCOPE("WebsocketSession::send"); - + using namespace std::placeholders; - + auto req = m_requestManager.findRequest(requestId); if (req != nullptr) { req->m_complete = std::move(complete); req->m_streamBuffer.emplace(); std::ostream str(&req->m_streamBuffer.value()); - + str << body; - + LOG(debug) << "writing chunk for ws: " << requestId; - + m_busy = true; derived().asyncSend(req); } @@ -199,22 +197,22 @@ namespace mtconnect::sink::rest_sink { LOG(error) << "Cannot find request for id: " << requestId; } } - + void sent(beast::error_code ec, std::size_t len, const std::string &id) { NAMED_SCOPE("WebsocketSession::sent"); - + if (ec) { return fail(status::bad_request, "Missing request Id", ec); } - + { LOG(trace) << "Waiting for mutex"; std::lock_guard lock(m_mutex); - + LOG(trace) << "sent chunk for ws: " << id; - + auto req = m_requestManager.findRequest(id); if (req != nullptr) { @@ -223,12 +221,12 @@ namespace mtconnect::sink::rest_sink { boost::asio::post(derived().getExecutor(), req->m_complete); req->m_complete = nullptr; } - + if (!req->m_streaming) { m_requestManager.remove(id); } - + if (m_messageQueue.size() == 0) { m_busy = false; @@ -239,11 +237,11 @@ namespace mtconnect::sink::rest_sink { LOG(error) << "WebsocketSession::sent: Cannot find request for id: " << id; } } - + { LOG(trace) << "Waiting for mutex to send next"; std::lock_guard lock(m_mutex); - + // Check for queued messages if (m_messageQueue.size() > 0) { @@ -254,7 +252,6 @@ namespace mtconnect::sink::rest_sink { } } - protected: WebsocketRequestManager m_requestManager; std::mutex m_mutex; @@ -262,62 +259,59 @@ namespace mtconnect::sink::rest_sink { std::deque m_messageQueue; bool m_isOpen {false}; }; - - /// @brief An intermediary class to implement a websocket stream connect, read, and write semantics. - /// This uses the Curiously Recurring Template Pattern (CRTP) to allow the derived class to implement plain or - /// SSL connections. - template + + /// @brief An intermediary class to implement a websocket stream connect, read, and write + /// semantics. This uses the Curiously Recurring Template Pattern (CRTP) to allow the derived + /// class to implement plain or SSL connections. + template class WebsocketSessionImpl : public WebsocketSession { public: using RequestMessage = boost::beast::http::request; using super = WebsocketSession; - + WebsocketSessionImpl(RequestPtr &&request, RequestMessage &&msg, Dispatch dispatch, - ErrorFunction func) - : super(std::move(request), dispatch, func), - m_msg(std::move(msg)) + ErrorFunction func) + : super(std::move(request), dispatch, func), m_msg(std::move(msg)) {} - + /// @brief Session cannot be copied. WebsocketSessionImpl(const WebsocketSessionImpl &) = delete; ~WebsocketSessionImpl() = default; - + /// @brief get this as the `Derived` type /// @return the subclass Derived &derived() { return static_cast(*this); } - - bool isStreamOpen() - { - return derived().stream().is_open(); - } - + + bool isStreamOpen() { return derived().stream().is_open(); } + auto getExecutor() { return derived().stream().get_executor(); } - + void run() override { using namespace boost::beast; - + // Set suggested timeout settings for the websocket derived().stream().set_option( - websocket::stream_base::timeout::suggested(beast::role_type::server)); - + websocket::stream_base::timeout::suggested(beast::role_type::server)); + // Set a decorator to change the Server of the handshake derived().stream().set_option( - websocket::stream_base::decorator([](websocket::response_type &res) { - res.set(http::field::server, GetAgentVersion() + " MTConnectAgent"); - })); - + websocket::stream_base::decorator([](websocket::response_type &res) { + res.set(http::field::server, GetAgentVersion() + " MTConnectAgent"); + })); + // Accept the websocket handshake derived().stream().async_accept( - m_msg, boost::asio::bind_executor(derived().getExecutor(), - beast::bind_front_handler(&WebsocketSessionImpl::onAccept, - derived().shared_ptr()))); + m_msg, + boost::asio::bind_executor( + derived().getExecutor(), + beast::bind_front_handler(&WebsocketSessionImpl::onAccept, derived().shared_ptr()))); } - + protected: friend class WebsocketSession; - + void onAccept(boost::beast::error_code ec) { if (ec) @@ -325,13 +319,14 @@ namespace mtconnect::sink::rest_sink { super::fail(status::internal_server_error, "Error occurred in accpet", ec); return; } - + super::m_isOpen = true; - + derived().stream().async_read( - m_buffer, beast::bind_front_handler(&WebsocketSessionImpl::onRead, derived().shared_ptr())); + m_buffer, + beast::bind_front_handler(&WebsocketSessionImpl::onRead, derived().shared_ptr())); } - + void asyncSend(WebsocketRequestManager::WebsocketRequest *request) { NAMED_SCOPE("WebsocketSessionImpl::asyncSend"); @@ -342,35 +337,34 @@ namespace mtconnect::sink::rest_sink { auto &requestId = request->m_requestId; derived().stream().text(derived().stream().got_text()); - derived().stream().async_write(request->m_streamBuffer->data(), - beast::bind_handler( - [ref, requestId](beast::error_code ec, std::size_t len) { - ref->sent(ec, len, requestId); - }, - _1, _2)); + derived().stream().async_write( + request->m_streamBuffer->data(), + beast::bind_handler([ref, requestId](beast::error_code ec, + std::size_t len) { ref->sent(ec, len, requestId); }, + _1, _2)); } - + void onRead(beast::error_code ec, std::size_t len) { NAMED_SCOPE("PlainWebsocketSession::onRead"); - + if (ec) return super::fail(boost::beast::http::status::internal_server_error, "shutdown", ec); - + if (len == 0) { LOG(debug) << "Empty message received"; return; } - + // Parse the buffer as a JSON request with parameters matching // REST API derived().stream().text(derived().stream().got_text()); auto buffer = beast::buffers_to_string(m_buffer.data()); m_buffer.consume(m_buffer.size()); - + LOG(debug) << "Received :" << buffer; - + try { if (!super::m_requestManager.dispatch(derived().shared_ptr(), buffer)) @@ -380,7 +374,7 @@ namespace mtconnect::sink::rest_sink { LOG(error) << txt.str(); } } - + catch (RestError &re) { auto id = re.getRequestId(); @@ -389,11 +383,11 @@ namespace mtconnect::sink::rest_sink { id = "ERROR"; re.setRequestId(*id); } - + super::m_requestManager.findOrCreateRequest(*id); super::m_errorFunction(derived().shared_ptr(), re); } - + catch (std::logic_error &le) { std::stringstream txt; @@ -401,7 +395,7 @@ namespace mtconnect::sink::rest_sink { LOG(error) << txt.str(); super::fail(boost::beast::http::status::not_found, txt.str()); } - + catch (...) { std::stringstream txt; @@ -409,11 +403,12 @@ namespace mtconnect::sink::rest_sink { LOG(error) << txt.str(); super::fail(boost::beast::http::status::not_found, txt.str()); } - + derived().stream().async_read( - m_buffer, beast::bind_front_handler(&WebsocketSessionImpl::onRead, derived().shared_ptr())); + m_buffer, + beast::bind_front_handler(&WebsocketSessionImpl::onRead, derived().shared_ptr())); } - + protected: RequestMessage m_msg; beast::flat_buffer m_buffer; diff --git a/test_package/agent_test_helper.cpp b/test_package/agent_test_helper.cpp index be5b3047b..54c35cf3e 100644 --- a/test_package/agent_test_helper.cpp +++ b/test_package/agent_test_helper.cpp @@ -99,10 +99,29 @@ void AgentTestHelper::responseHelper(const char *file, int line, const QueryMap doc = nlohmann::json::parse(m_session->m_body); } -void AgentTestHelper::makeWebSocketRequest(const char *file, int line, const std::string &json, xmlDocPtr *doc, - std::string &id) +void AgentTestHelper::makeWebSocketRequest(const char *file, int line, const std::string &json, + xmlDocPtr *doc, std::string &id) { m_dispatched = m_websocketSession->dispatch(json, id); + parseResponse(file, line, doc, id); +} + +void AgentTestHelper::makeWebSocketRequest(const char *file, int line, const std::string &json, + nlohmann::json &doc, std::string &id) +{ + m_dispatched = m_websocketSession->dispatch(json, id); + parseResponse(file, line, doc, id); +} + +void AgentTestHelper::makeAsyncWebSocketRequest(const char *file, int line, const std::string &json, + std::string &id) +{ + m_dispatched = m_websocketSession->dispatch(json, id); +} + +void AgentTestHelper::parseResponse(const char *file, int line, xmlDocPtr *doc, + const std::string &id) +{ auto response = m_websocketSession->getNextResponse(id); ASSERT_TRUE(response) << "No response for id " << id; if (response) @@ -111,10 +130,9 @@ void AgentTestHelper::makeWebSocketRequest(const char *file, int line, const std } } -void AgentTestHelper::makeWebSocketRequest(const char *file, int line, const std::string &json, nlohmann::json &doc, - std::string &id) +void AgentTestHelper::parseResponse(const char *file, int line, nlohmann::json &doc, + const std::string &id) { - m_dispatched = m_websocketSession->dispatch(json, id); auto response = m_websocketSession->getNextResponse(id); ASSERT_TRUE(response) << "No response for id " << id; if (response) diff --git a/test_package/agent_test_helper.hpp b/test_package/agent_test_helper.hpp index 99c17dea8..431d4ca6e 100644 --- a/test_package/agent_test_helper.hpp +++ b/test_package/agent_test_helper.hpp @@ -20,8 +20,8 @@ #include #include #include -#include #include +#include #include @@ -109,15 +109,15 @@ namespace mtconnect { std::string m_chunkMimeType; bool m_streaming {false}; }; - + class TestWebsocketSession : public WebsocketSession { public: using super = WebsocketSession; - + TestWebsocketSession(boost::asio::executor &&exec, RequestPtr &&request, Dispatch dispatch, ErrorFunction func) - : WebsocketSession(std::move(request), dispatch, func), m_executor(std::move(exec)) + : WebsocketSession(std::move(request), dispatch, func), m_executor(std::move(exec)) { m_isOpen = true; } @@ -128,7 +128,7 @@ namespace mtconnect { } void run() override {} - + void read(const std::string &json) { if (!m_requestManager.dispatch(shared_ptr(), json)) @@ -136,48 +136,42 @@ namespace mtconnect { LOG(error) << "Dispatch failed for: " << json; } } - + void closeStream() override {} - + bool isStreamOpen() { return m_isOpen; } - + void sent(beast::error_code ec, std::size_t len, const std::string &id) { NAMED_SCOPE("WebsocketSession::sent"); super::sent(ec, len, id); - m_responses.erase(id); + m_responsesSent[id]++; } - + void asyncSend(WebsocketRequestManager::WebsocketRequest *request) { auto buffer = beast::buffers_to_string(request->m_streamBuffer->data()); m_responses[request->m_requestId].emplace(buffer); - + beast::error_code ec; - boost::asio::post(m_executor, boost::bind(&TestWebsocketSession::sent, shared_ptr(), ec, 0, - request->m_requestId)); + boost::asio::post(m_executor, boost::bind(&TestWebsocketSession::sent, shared_ptr(), ec, + 0, request->m_requestId)); } - + auto &getExecutor() { return m_executor; } - + bool dispatch(const std::string &buffer, std::string &id) { return m_requestManager.dispatch(shared_ptr(), buffer, &id); } - + bool hasResponse(const std::string &id) const { const auto q = m_responses.find(id); return q != m_responses.end() && !q->second.empty(); } - bool hasResponseQueue(const std::string &id) const - { - return m_responses.find(id) != m_responses.end(); - } - - std::optional getNextResponse(const std::string &id) { auto q = m_responses.find(id); @@ -185,6 +179,7 @@ namespace mtconnect { { auto response = q->second.front(); q->second.pop(); + m_lastResponses[id] = response; return response; } else @@ -194,6 +189,8 @@ namespace mtconnect { } std::map> m_responses; + std::map m_responsesSent; + std::map m_lastResponses; protected: boost::asio::executor m_executor; @@ -240,7 +237,7 @@ class AgentTestHelper void responseHelper(const char *file, int line, const mtconnect::sink::rest_sink::QueryMap &aQueries, xmlDocPtr *doc, const char *path, const char *accepts = "text/xml"); - + /// @brief Helper to get a streaming response from the agent /// @param file The source file the request is made from /// @param line The line number @@ -250,7 +247,7 @@ class AgentTestHelper void responseStreamHelper(const char *file, int line, const mtconnect::sink::rest_sink::QueryMap &aQueries, const char *path, const char *accepts = "text/xml"); - + /// @brief Helper to get a json response from the agent /// @param file The source file the request is made from /// @param line The line number @@ -261,7 +258,7 @@ class AgentTestHelper void responseHelper(const char *file, int line, const mtconnect::sink::rest_sink::QueryMap &aQueries, nlohmann::json &doc, const char *path, const char *accepts = "application/json"); - + /// @brief Helper to make a PUT request to the agent /// @param file The source file the request is made from /// @param line The line number @@ -273,7 +270,7 @@ class AgentTestHelper void putResponseHelper(const char *file, int line, const std::string &body, const mtconnect::sink::rest_sink::QueryMap &aQueries, xmlDocPtr *doc, const char *path, const char *accepts = "text/xml"); - + /// @brief Helper to make a POST request to the agent /// @param file The source file the request is made from /// @param line The line number @@ -318,9 +315,31 @@ class AgentTestHelper /// @param json the request /// @param doc the returned document /// @param id the request id - void makeWebSocketRequest(const char *file, int line, const std::string &json, nlohmann::json &doc, - std::string &id); - + void makeWebSocketRequest(const char *file, int line, const std::string &json, + nlohmann::json &doc, std::string &id); + + /// @brief Make a request and don't wait for a response + /// @param file The source file the request is made from + /// @param line The line number + /// @param json the request + /// @param id the request id + void makeAsyncWebSocketRequest(const char *file, int line, const std::string &json, + std::string &id); + + /// @brief Parse an async respone + /// @param file The source file the request is made from + /// @param line The line number + /// @param doc the returned document + /// @param id the request id + void parseResponse(const char *file, int line, nlohmann::json &doc, const std::string &id); + + /// @brief Parse an async respone + /// @param file The source file the request is made from + /// @param line The line number + /// @param doc the returned document + /// @param id the request id + void parseResponse(const char *file, int line, xmlDocPtr *doc, const std::string &id); + auto getAgent() { return m_agent.get(); } std::shared_ptr getRestService() { @@ -429,16 +448,18 @@ class AgentTestHelper m_session = std::make_shared( [](mhttp::SessionPtr, mhttp::RequestPtr) { return true; }, m_server->getErrorFunction()); - + mhttp::RequestPtr request = std::make_shared(); request->m_verb = boost::beast::http::verb::get; - - auto ex { m_agent->getContext().get().get_executor() }; - m_websocketSession = std::make_shared(std::move(ex), - std::move(request), - [this](mhttp::SessionPtr s, mhttp::RequestPtr r) { - return m_server->dispatch(s, r); }, - m_server->getErrorFunction()); + + auto ex {m_agent->getContext().get().get_executor()}; + m_websocketSession = std::make_shared( + std::move(ex), std::move(request), + [this](mhttp::SessionPtr s, mhttp::RequestPtr r) { return m_server->dispatch(s, r); }, + m_server->getErrorFunction()); + + m_server->simulateRun(); + return m_agent.get(); } @@ -476,6 +497,41 @@ class AgentTestHelper return 0; } + template + bool waitFor(const chrono::duration &time, function pred) + { + std::decay_t run = time / 2; + if (run > 500ms) + run = 500ms; + + boost::asio::steady_timer timer(m_ioContext); + timer.expires_after(time); + bool timeout = false; + timer.async_wait([&timeout](boost::system::error_code ec) { + if (!ec) + { + timeout = true; + } + }); + + while (!timeout && !pred()) + { + m_ioContext.run_for(run); + } + timer.cancel(); + + return pred(); + } + + template + bool waitForResponseSent(const chrono::duration &time, const std::string &id) + { + uint32_t initial = m_websocketSession->m_responsesSent[id]; + return waitFor(time, [this, initial, id]() -> bool { + return m_websocketSession->m_responsesSent[id] > initial; + }); + } + void printResponse() { std::cout << "Status " << m_session->m_code << " " << std::endl @@ -490,6 +546,16 @@ class AgentTestHelper << "------------------------" << std::endl; } + void printLastWSResponse(const std::string &id) + { + auto it = m_websocketSession->m_lastResponses.find(id); + if (it != m_websocketSession->m_lastResponses.end()) + { + std::cout << "WebSocket Response for " << id << ": " << it->second << std::endl + << "------------------------" << std::endl; + } + } + mhttp::Server *m_server {nullptr}; std::shared_ptr m_context; std::shared_ptr m_adapter; @@ -545,7 +611,7 @@ struct XmlDocFreer ASSERT_TRUE(doc); \ XmlDocFreer cleanup(doc) -#define PARSE_XML_STREAM_QUERY(path, queries) \ +#define PARSE_XML_STREAM_QUERY(path, queries) \ m_agentTestHelper->responseStreamHelper(__FILE__, __LINE__, queries, path); #define PARSE_XML_CHUNK() \ @@ -565,18 +631,31 @@ struct XmlDocFreer ASSERT_TRUE(doc); \ XmlDocFreer cleanup(doc) -#define PARSE_JSON_RESPONSE(path) \ - nlohmann::json doc; \ +#define PARSE_JSON_RESPONSE(path) \ + nlohmann::json doc; \ m_agentTestHelper->responseHelper(__FILE__, __LINE__, {}, doc, path) -#define PARSE_XML_WS_RESPONSE(req) \ - xmlDocPtr doc = nullptr; \ - std::string id; \ - m_agentTestHelper->makeWebSocketRequest(__FILE__, __LINE__, req, &doc, id); \ - ASSERT_TRUE(doc); \ +#define PARSE_XML_WS_RESPONSE(req) \ + xmlDocPtr doc = nullptr; \ + std::string id; \ + m_agentTestHelper->makeWebSocketRequest(__FILE__, __LINE__, req, &doc, id); \ + ASSERT_TRUE(doc); \ XmlDocFreer cleanup(doc) -#define PARSE_JSON_WS_RESPONSE(req) \ - nlohmann::json jdoc; \ - std::string id; \ +#define PARSE_JSON_WS_RESPONSE(req) \ + nlohmann::json jdoc; \ + std::string id; \ m_agentTestHelper->makeWebSocketRequest(__FILE__, __LINE__, req, jdoc, id) + +#define BEGIN_ASYNC_WS_REQUEST(req) \ + std::string id; \ + m_agentTestHelper->makeAsyncWebSocketRequest(__FILE__, __LINE__, req, id) + +#define PARSE_NEXT_XML_RESPONSE(_id) \ + xmlDocPtr doc = nullptr; \ + m_agentTestHelper->parseResponse(__FILE__, __LINE__, &doc, _id); \ + XmlDocFreer cleanup(doc) + +#define PARSE_NEXT_JSON_RESPONSE(_id) \ + nlohmann::json jdoc; \ + m_agentTestHelper->parseResponse(__FILE__, __LINE__, kdoc, _id) diff --git a/test_package/websockets_rest_sink_test.cpp b/test_package/websockets_rest_sink_test.cpp index 4bd0d2b3f..0dad978cd 100644 --- a/test_package/websockets_rest_sink_test.cpp +++ b/test_package/websockets_rest_sink_test.cpp @@ -34,9 +34,9 @@ #include #include +#include "agent_test_helper.hpp" #include "mtconnect/logging.hpp" #include "mtconnect/sink/rest_sink/server.hpp" -#include "agent_test_helper.hpp" #include "test_utilities.hpp" using namespace std; @@ -62,49 +62,24 @@ class WebsocketsRestSinkTest : public testing::Test void SetUp() override { m_agentTestHelper = make_unique(); - m_agentTestHelper->createAgent("/samples/dyn_load.xml", 8, 64, "2.6", 25, true, - true, {{configuration::JsonVersion, 2}, - {configuration::DisableAgentDevice, true} - }); + m_agentTestHelper->createAgent( + "/samples/dyn_load.xml", 8, 64, "2.6", 25, true, true, + {{configuration::JsonVersion, 2}, {configuration::DisableAgentDevice, true}}); m_agentId = to_string(getCurrentTimeInSec()); } - + void TearDown() override { m_agentTestHelper.reset(); } - + void addAdapter(ConfigOptions options = ConfigOptions {}) { m_agentTestHelper->addAdapter(options, "localhost", 7878, m_agentTestHelper->m_agent->getDefaultDevice()->getName()); } - - template - bool waitFor(const chrono::duration& time, function pred) - { - auto &context = m_agentTestHelper->m_ioContext; - boost::asio::steady_timer timer(context); - timer.expires_after(time); - bool timeout = false; - timer.async_wait([&timeout](boost::system::error_code ec) { - if (!ec) - { - timeout = true; - } - }); - - while (!timeout && !pred()) - { - context.run_for(500ms); - } - timer.cancel(); - - return pred(); - } - public: std::string m_agentId; std::unique_ptr m_agentTestHelper; - + std::chrono::milliseconds m_delay {}; }; @@ -113,7 +88,7 @@ TEST_F(WebsocketsRestSinkTest, should_handle_simple_probe) { PARSE_XML_WS_RESPONSE(R"({ "id": "1234", "request": "probe"})"); ASSERT_EQ("1234", id); - + ASSERT_XML_PATH_EQUAL(doc, "//m:Devices/m:Device@name", "LinuxCNC"); ASSERT_XML_PATH_EQUAL(doc, "//m:Devices/m:Device@uuid", "000"); ASSERT_XML_PATH_EQUAL(doc, "//m:Devices/m:Device/m:Components/m:Controller@id", "cont"); @@ -126,22 +101,23 @@ TEST_F(WebsocketsRestSinkTest, should_handle_json_probe) PARSE_JSON_WS_RESPONSE(R"({ "id": "1234", "request": "probe", "format": "json"})"); ASSERT_EQ("1234", id); - ASSERT_EQ("LinuxCNC", jdoc.at("/MTConnectDevices/Devices/Device/0/name"_json_pointer).get()); - ASSERT_EQ("000", jdoc.at("/MTConnectDevices/Devices/Device/0/uuid"_json_pointer).get()); + ASSERT_EQ("LinuxCNC", + jdoc.at("/MTConnectDevices/Devices/Device/0/name"_json_pointer).get()); + ASSERT_EQ("000", jdoc.at("/MTConnectDevices/Devices/Device/0/uuid"_json_pointer).get()); } } TEST_F(WebsocketsRestSinkTest, should_handle_simple_current) { addAdapter(); - + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); - + { PARSE_XML_WS_RESPONSE(R"({ "id": "1", "request": "current", "format": "xml"})"); ASSERT_EQ("1", id); - + ASSERT_XML_PATH_EQUAL(doc, "//m:Availability", "AVAILABLE"); ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "MANUAL"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "UNAVAILABLE"); @@ -151,32 +127,31 @@ TEST_F(WebsocketsRestSinkTest, should_handle_simple_current) TEST_F(WebsocketsRestSinkTest, should_handle_current_at) { addAdapter(); - + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); - + auto at = m_agentTestHelper->m_agent->getCircularBuffer().getSequence() - 1; - + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|AUTOMATIC"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); - + { - PARSE_XML_WS_RESPONSE(format(R"({{ "id": "1", "request": "current", "format": "xml", "at": {} }})", at)); + PARSE_XML_WS_RESPONSE( + format(R"({{ "id": "1", "request": "current", "format": "xml", "at": {} }})", at)); ASSERT_EQ("1", id); - + ASSERT_XML_PATH_EQUAL(doc, "//m:Availability", "AVAILABLE"); ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "MANUAL"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "UNAVAILABLE"); - - waitFor(2s, [this, &id]() -> bool { - return !m_agentTestHelper->m_websocketSession->hasResponseQueue(id); - }); + + m_agentTestHelper->waitForResponseSent(10ms, id); } - + { PARSE_XML_WS_RESPONSE(R"({ "id": "1", "request": "current", "format": "xml" })"); ASSERT_EQ("1", id); - + ASSERT_XML_PATH_EQUAL(doc, "//m:Availability", "AVAILABLE"); ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "READY"); @@ -186,16 +161,16 @@ TEST_F(WebsocketsRestSinkTest, should_handle_current_at) TEST_F(WebsocketsRestSinkTest, should_handle_simple_sample) { addAdapter(); - + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|AUTOMATIC"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); - + { PARSE_XML_WS_RESPONSE(R"({ "id": "1", "request": "sample", "format": "xml" })"); ASSERT_EQ("1", id); - + ASSERT_XML_PATH_EQUAL(doc, "//m:Availability[1]", "UNAVAILABLE"); ASSERT_XML_PATH_EQUAL(doc, "//m:Availability[2]", "AVAILABLE"); ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode[1]", "UNAVAILABLE"); @@ -204,54 +179,212 @@ TEST_F(WebsocketsRestSinkTest, should_handle_simple_sample) ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[1]", "UNAVAILABLE"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[2]", "READY"); } - - } TEST_F(WebsocketsRestSinkTest, should_handle_sample_from) { addAdapter(); - + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); - + auto at = m_agentTestHelper->m_agent->getCircularBuffer().getSequence(); - + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|AUTOMATIC"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); - + { - PARSE_XML_WS_RESPONSE(format(R"({{ "id": "1", "request": "sample", "format": "xml", "from": {} }})", at)); + PARSE_XML_WS_RESPONSE( + format(R"({{ "id": "1", "request": "sample", "format": "xml", "from": {} }})", at)); ASSERT_EQ("1", id); - + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode[1]", "AUTOMATIC"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[1]", "READY"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[2]", "ACTIVE"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[3]", "READY"); } - } TEST_F(WebsocketsRestSinkTest, should_handle_asset_request) { - GTEST_SKIP() << "Test not implemented yet"; + addAdapter(); + + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|@ASSET@|P1|FakeAsset|TEST 1"); + + { + PARSE_XML_WS_RESPONSE(R"({ "id": "1", "request": "asset", "format": "xml"})"); + ASSERT_EQ("1", id); + + ASSERT_XML_PATH_EQUAL(doc, "//m:FakeAsset@assetId", "P1"); + ASSERT_XML_PATH_EQUAL(doc, "//m:FakeAsset", "TEST 1"); + } } TEST_F(WebsocketsRestSinkTest, should_handle_asset_with_id_array) { - GTEST_SKIP() << "Test not implemented yet"; + addAdapter(); + + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|@ASSET@|P1|FakeAsset|TEST 1"); + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|@ASSET@|P2|FakeAsset|TEST 2"); + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|@ASSET@|P3|FakeAsset|TEST 3"); + + { + PARSE_XML_WS_RESPONSE( + R"({ "id": "1", "request": "asset", "assetIds": ["P1", "P2"], "format": "xml"})"); + ASSERT_EQ("1", id); + + ASSERT_XML_PATH_COUNT(doc, "//m:FakeAsset", 2); + + ASSERT_XML_PATH_EQUAL(doc, "//m:FakeAsset[1]@assetId", "P1"); + ASSERT_XML_PATH_EQUAL(doc, "//m:FakeAsset[1]", "TEST 1"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:FakeAsset[2]@assetId", "P2"); + ASSERT_XML_PATH_EQUAL(doc, "//m:FakeAsset[2]", "TEST 2"); + } } TEST_F(WebsocketsRestSinkTest, should_handle_sample_streaming) { - GTEST_SKIP() << "Test not implemented yet"; + addAdapter(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); + + auto at = m_agentTestHelper->m_agent->getCircularBuffer().getSequence(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|AUTOMATIC"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + + { + BEGIN_ASYNC_WS_REQUEST(format( + R"({{ "id": "1", "request": "sample", "format": "xml", "interval": 10, "from": {} }})", + at)); + ASSERT_EQ("1", id); + + m_agentTestHelper->waitForResponseSent(15ms, id); + + PARSE_NEXT_XML_RESPONSE(id); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "1"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode[1]", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[1]", "READY"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[2]", "ACTIVE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[3]", "READY"); + } + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + + m_agentTestHelper->waitForResponseSent(15ms, "1"); + + { + PARSE_NEXT_XML_RESPONSE("1"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "1"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[1]", "ACTIVE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[2]", "READY"); + } } TEST_F(WebsocketsRestSinkTest, should_handle_multiple_streaming_reqeuests) { - GTEST_SKIP() << "Test not implemented yet"; + addAdapter(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); + + auto at = m_agentTestHelper->m_agent->getCircularBuffer().getSequence(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|AUTOMATIC"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + + { + BEGIN_ASYNC_WS_REQUEST(format( + R"({{ "id": "1", "request": "sample", "format": "xml", "interval": 10, "from": {} }})", + at)); + ASSERT_EQ("1", id); + } + + { + BEGIN_ASYNC_WS_REQUEST( + R"({ "id": "2", "request": "current", "format": "xml", "interval": 100})"); + ASSERT_EQ("2", id); + } + + m_agentTestHelper->waitForResponseSent(15ms, "1"); + + { + PARSE_NEXT_XML_RESPONSE("1"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "1"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode[1]", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[1]", "READY"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[2]", "ACTIVE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[3]", "READY"); + } + + { + PARSE_NEXT_XML_RESPONSE("2"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "2"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "READY"); + } + + m_agentTestHelper->waitForResponseSent(100ms, "2"); + + { + PARSE_NEXT_XML_RESPONSE("2"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "2"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "READY"); + } + + m_agentTestHelper->waitForResponseSent(105ms, "2"); + + { + PARSE_NEXT_XML_RESPONSE("2"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "2"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "READY"); + } + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); + + m_agentTestHelper->waitForResponseSent(15ms, "1"); + + { + PARSE_NEXT_XML_RESPONSE("1"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "1"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[1]", "ACTIVE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[2]", "READY"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[3]", "ACTIVE"); + } + + m_agentTestHelper->waitForResponseSent(100ms, "2"); + + { + PARSE_NEXT_XML_RESPONSE("2"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "2"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "ACTIVE"); + } } TEST_F(WebsocketsRestSinkTest, should_handle_multiple_streaming_reqeuests_with_cancel) @@ -269,6 +402,11 @@ TEST_F(WebsocketsRestSinkTest, should_return_error_if_no_id) GTEST_SKIP() << "Test not implemented yet"; } +TEST_F(WebsocketsRestSinkTest, should_return_error_if_duplicate_id) +{ + GTEST_SKIP() << "Test not implemented yet"; +} + TEST_F(WebsocketsRestSinkTest, should_return_error_wrong_parameter_type) { GTEST_SKIP() << "Test not implemented yet"; From 7abd581e409ac8fe7d4c4c68d0928eda81f24cd3 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Fri, 9 Jan 2026 18:01:06 +0100 Subject: [PATCH 08/11] Added more tests --- .../rest_sink/websocket_request_manager.hpp | 43 +++-- test_package/agent_test_helper.hpp | 5 + test_package/websockets_rest_sink_test.cpp | 179 +++++++++++++++++- 3 files changed, 208 insertions(+), 19 deletions(-) diff --git a/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp b/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp index a23c256f6..6c145be74 100644 --- a/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp +++ b/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp @@ -253,17 +253,6 @@ namespace mtconnect::sink::rest_sink { } auto &id = *(request->m_requestId); - if (outId) - *outId = id; - auto res = m_requests.emplace(id, id); - if (!res.second) - { - LOG(error) << "Duplicate request id: " << id; - auto error = InvalidParameterValue::make("id", *request->m_requestId, "string", "string", - "Duplicate id given"); - throw RestError(error, request->m_accepts, rest_sink::status::bad_request, std::nullopt, - "ERROR"); - } if (request->m_parameters.count("request") > 0) { @@ -273,18 +262,38 @@ namespace mtconnect::sink::rest_sink { else { auto error = - InvalidParameterValue::make("request", "", "string", "string", "No request given"); + InvalidParameterValue::make("request", "", "string", "string", "No request given"); throw RestError(error, request->m_accepts, rest_sink::status::bad_request, std::nullopt, id); } - // Check parameters for command - LOG(debug) << "Received request id: " << id; - - res.first->second.m_request = std::move(request); + if (outId) + *outId = id; + if (request->m_command != "cancel") + { + auto res = m_requests.emplace(id, id); + if (!res.second) + { + LOG(error) << "Duplicate request id: " << id; + auto error = InvalidParameterValue::make("id", *request->m_requestId, "string", "string", + "Duplicate id given"); + throw RestError(error, request->m_accepts, rest_sink::status::bad_request, std::nullopt, + "ERROR"); + } + + // Check parameters for command + LOG(debug) << "Received request id: " << id; + + res.first->second.m_request = std::move(request); + request = res.first->second.m_request; + } + else + { + LOG(debug) << "Cancel request id: " << id; + } try { - return m_dispatch(session, res.first->second.m_request); + return m_dispatch(session, request); } catch (RestError &re) diff --git a/test_package/agent_test_helper.hpp b/test_package/agent_test_helper.hpp index 431d4ca6e..e67cf5428 100644 --- a/test_package/agent_test_helper.hpp +++ b/test_package/agent_test_helper.hpp @@ -555,6 +555,11 @@ class AgentTestHelper << "------------------------" << std::endl; } } + + auto getResponseCount(const std::string &id) + { + return m_websocketSession->m_responses[id].size(); + } mhttp::Server *m_server {nullptr}; std::shared_ptr m_context; diff --git a/test_package/websockets_rest_sink_test.cpp b/test_package/websockets_rest_sink_test.cpp index 0dad978cd..ea8be0540 100644 --- a/test_package/websockets_rest_sink_test.cpp +++ b/test_package/websockets_rest_sink_test.cpp @@ -249,6 +249,34 @@ TEST_F(WebsocketsRestSinkTest, should_handle_asset_with_id_array) } } +TEST_F(WebsocketsRestSinkTest, should_handle_asset_with_type) +{ + addAdapter(); + + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|@ASSET@|P1|FakeAsset|TEST 1"); + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|@ASSET@|P2|OtherAsset|TEST 2"); + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|@ASSET@|P3|FakeAsset|TEST 3"); + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|@ASSET@|P4|FakeAsset|TEST 4"); + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|@ASSET@|P5|OtherAsset|TEST 5"); + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|@ASSET@|P6|FakeAsset|TEST 6"); + { + PARSE_XML_WS_RESPONSE( + R"({ "id": "1", "request": "asset", "type": "OtherAsset", "format": "xml"})"); + ASSERT_EQ("1", id); + + ASSERT_XML_PATH_COUNT(doc, "//m:Assets/*", 2); + + ASSERT_XML_PATH_EQUAL(doc, "//m:OtherAsset[@assetId='P2']", "TEST 2"); + ASSERT_XML_PATH_EQUAL(doc, "//m:OtherAsset[@assetId='P5']", "TEST 5"); + } +} + TEST_F(WebsocketsRestSinkTest, should_handle_sample_streaming) { addAdapter(); @@ -387,11 +415,158 @@ TEST_F(WebsocketsRestSinkTest, should_handle_multiple_streaming_reqeuests) } } -TEST_F(WebsocketsRestSinkTest, should_handle_multiple_streaming_reqeuests_with_cancel) +TEST_F(WebsocketsRestSinkTest, should_handle_cancel_streaming_request) { - GTEST_SKIP() << "Test not implemented yet"; + addAdapter(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); + + auto at = m_agentTestHelper->m_agent->getCircularBuffer().getSequence(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|AUTOMATIC"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + + { + BEGIN_ASYNC_WS_REQUEST(format( + R"({{ "id": "1", "request": "sample", "format": "xml", "interval": 10, "from": {} }})", + at)); + ASSERT_EQ("1", id); + + m_agentTestHelper->waitForResponseSent(15ms, id); + + PARSE_NEXT_XML_RESPONSE(id); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "1"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode[1]", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[1]", "READY"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[2]", "ACTIVE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[3]", "READY"); + } + + { + BEGIN_ASYNC_WS_REQUEST( R"({ "id": "1", "request": "cancel"})"); + ASSERT_EQ(1, m_agentTestHelper->getResponseCount("1")); + auto resp = m_agentTestHelper->m_websocketSession->getNextResponse("1"); + ASSERT_TRUE(resp); + ASSERT_EQ("{ \"success\": \"true\"}", *resp); + } + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + + m_agentTestHelper->waitForResponseSent(15ms, "1"); + + ASSERT_EQ(0, m_agentTestHelper->getResponseCount("1")); } +TEST_F(WebsocketsRestSinkTest, should_handle_cancel_one_request_with_multiple_streaming_reqeuests) +{ + addAdapter(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); + + auto at = m_agentTestHelper->m_agent->getCircularBuffer().getSequence(); + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|AUTOMATIC"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + + { + BEGIN_ASYNC_WS_REQUEST(format( + R"({{ "id": "1", "request": "sample", "format": "xml", "interval": 10, "from": {} }})", + at)); + ASSERT_EQ("1", id); + } + + { + BEGIN_ASYNC_WS_REQUEST( + R"({ "id": "2", "request": "current", "format": "xml", "interval": 100})"); + ASSERT_EQ("2", id); + } + + m_agentTestHelper->waitForResponseSent(15ms, "1"); + + { + PARSE_NEXT_XML_RESPONSE("1"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "1"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode[1]", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[1]", "READY"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[2]", "ACTIVE"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[3]", "READY"); + } + + { + PARSE_NEXT_XML_RESPONSE("2"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "2"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "READY"); + } + + m_agentTestHelper->waitForResponseSent(100ms, "2"); + + { + PARSE_NEXT_XML_RESPONSE("2"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "2"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "READY"); + } + + m_agentTestHelper->waitForResponseSent(105ms, "2"); + + { + PARSE_NEXT_XML_RESPONSE("2"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "2"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "READY"); + } + + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); + + { + BEGIN_ASYNC_WS_REQUEST( R"({ "id": "1", "request": "cancel"})"); + ASSERT_EQ(1, m_agentTestHelper->getResponseCount("1")); + auto resp = m_agentTestHelper->m_websocketSession->getNextResponse("1"); + ASSERT_TRUE(resp); + ASSERT_EQ("{ \"success\": \"true\"}", *resp); + } + + m_agentTestHelper->waitForResponseSent(100ms, "2"); + + { + PARSE_NEXT_XML_RESPONSE("2"); + + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "2"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); + ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "ACTIVE"); + } + + ASSERT_EQ(0, m_agentTestHelper->getResponseCount("1")); + + { + BEGIN_ASYNC_WS_REQUEST( R"({ "id": "2", "request": "cancel"})"); + ASSERT_EQ(1, m_agentTestHelper->getResponseCount("2")); + auto resp = m_agentTestHelper->m_websocketSession->getNextResponse("2"); + ASSERT_TRUE(resp); + ASSERT_EQ("{ \"success\": \"true\"}", *resp); + } + + m_agentTestHelper->waitForResponseSent(105ms, "2"); + ASSERT_EQ(0, m_agentTestHelper->getResponseCount("2")); + ASSERT_EQ(0, m_agentTestHelper->getResponseCount("1")); +} + + TEST_F(WebsocketsRestSinkTest, should_handle_asset_put) { GTEST_SKIP() << "Test not implemented yet"; From c5b5d25d502ad35c458bcac2884f79773e4a39f8 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Fri, 9 Jan 2026 18:45:47 +0100 Subject: [PATCH 09/11] Added more tests --- .../rest_sink/websocket_request_manager.hpp | 6 +- test_package/agent_test_helper.hpp | 2 +- test_package/websockets_rest_sink_test.cpp | 161 +++++++++++------- 3 files changed, 106 insertions(+), 63 deletions(-) diff --git a/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp b/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp index 6c145be74..ceb9f9c5a 100644 --- a/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp +++ b/src/mtconnect/sink/rest_sink/websocket_request_manager.hpp @@ -262,7 +262,7 @@ namespace mtconnect::sink::rest_sink { else { auto error = - InvalidParameterValue::make("request", "", "string", "string", "No request given"); + InvalidParameterValue::make("request", "", "string", "string", "No request given"); throw RestError(error, request->m_accepts, rest_sink::status::bad_request, std::nullopt, id); } @@ -280,10 +280,10 @@ namespace mtconnect::sink::rest_sink { throw RestError(error, request->m_accepts, rest_sink::status::bad_request, std::nullopt, "ERROR"); } - + // Check parameters for command LOG(debug) << "Received request id: " << id; - + res.first->second.m_request = std::move(request); request = res.first->second.m_request; } diff --git a/test_package/agent_test_helper.hpp b/test_package/agent_test_helper.hpp index e67cf5428..453493245 100644 --- a/test_package/agent_test_helper.hpp +++ b/test_package/agent_test_helper.hpp @@ -555,7 +555,7 @@ class AgentTestHelper << "------------------------" << std::endl; } } - + auto getResponseCount(const std::string &id) { return m_websocketSession->m_responses[id].size(); diff --git a/test_package/websockets_rest_sink_test.cpp b/test_package/websockets_rest_sink_test.cpp index ea8be0540..88ccb98e3 100644 --- a/test_package/websockets_rest_sink_test.cpp +++ b/test_package/websockets_rest_sink_test.cpp @@ -252,26 +252,26 @@ TEST_F(WebsocketsRestSinkTest, should_handle_asset_with_id_array) TEST_F(WebsocketsRestSinkTest, should_handle_asset_with_type) { addAdapter(); - + m_agentTestHelper->m_adapter->processData( - "2021-02-01T12:00:00Z|@ASSET@|P1|FakeAsset|TEST 1"); + "2021-02-01T12:00:00Z|@ASSET@|P1|FakeAsset|TEST 1"); m_agentTestHelper->m_adapter->processData( - "2021-02-01T12:00:00Z|@ASSET@|P2|OtherAsset|TEST 2"); + "2021-02-01T12:00:00Z|@ASSET@|P2|OtherAsset|TEST 2"); m_agentTestHelper->m_adapter->processData( - "2021-02-01T12:00:00Z|@ASSET@|P3|FakeAsset|TEST 3"); + "2021-02-01T12:00:00Z|@ASSET@|P3|FakeAsset|TEST 3"); m_agentTestHelper->m_adapter->processData( - "2021-02-01T12:00:00Z|@ASSET@|P4|FakeAsset|TEST 4"); + "2021-02-01T12:00:00Z|@ASSET@|P4|FakeAsset|TEST 4"); m_agentTestHelper->m_adapter->processData( - "2021-02-01T12:00:00Z|@ASSET@|P5|OtherAsset|TEST 5"); + "2021-02-01T12:00:00Z|@ASSET@|P5|OtherAsset|TEST 5"); m_agentTestHelper->m_adapter->processData( - "2021-02-01T12:00:00Z|@ASSET@|P6|FakeAsset|TEST 6"); + "2021-02-01T12:00:00Z|@ASSET@|P6|FakeAsset|TEST 6"); { PARSE_XML_WS_RESPONSE( - R"({ "id": "1", "request": "asset", "type": "OtherAsset", "format": "xml"})"); + R"({ "id": "1", "request": "asset", "type": "OtherAsset", "format": "xml"})"); ASSERT_EQ("1", id); - + ASSERT_XML_PATH_COUNT(doc, "//m:Assets/*", 2); - + ASSERT_XML_PATH_EQUAL(doc, "//m:OtherAsset[@assetId='P2']", "TEST 2"); ASSERT_XML_PATH_EQUAL(doc, "//m:OtherAsset[@assetId='P5']", "TEST 5"); } @@ -418,155 +418,154 @@ TEST_F(WebsocketsRestSinkTest, should_handle_multiple_streaming_reqeuests) TEST_F(WebsocketsRestSinkTest, should_handle_cancel_streaming_request) { addAdapter(); - + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); - + auto at = m_agentTestHelper->m_agent->getCircularBuffer().getSequence(); - + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|AUTOMATIC"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); - + { BEGIN_ASYNC_WS_REQUEST(format( - R"({{ "id": "1", "request": "sample", "format": "xml", "interval": 10, "from": {} }})", - at)); + R"({{ "id": "1", "request": "sample", "format": "xml", "interval": 10, "from": {} }})", + at)); ASSERT_EQ("1", id); - + m_agentTestHelper->waitForResponseSent(15ms, id); - + PARSE_NEXT_XML_RESPONSE(id); - + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "1"); ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode[1]", "AUTOMATIC"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[1]", "READY"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[2]", "ACTIVE"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[3]", "READY"); } - + { - BEGIN_ASYNC_WS_REQUEST( R"({ "id": "1", "request": "cancel"})"); + BEGIN_ASYNC_WS_REQUEST(R"({ "id": "1", "request": "cancel"})"); ASSERT_EQ(1, m_agentTestHelper->getResponseCount("1")); auto resp = m_agentTestHelper->m_websocketSession->getNextResponse("1"); ASSERT_TRUE(resp); ASSERT_EQ("{ \"success\": \"true\"}", *resp); } - + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); - + m_agentTestHelper->waitForResponseSent(15ms, "1"); - + ASSERT_EQ(0, m_agentTestHelper->getResponseCount("1")); } TEST_F(WebsocketsRestSinkTest, should_handle_cancel_one_request_with_multiple_streaming_reqeuests) { addAdapter(); - + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|avail|AVAILABLE"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|MANUAL"); - + auto at = m_agentTestHelper->m_agent->getCircularBuffer().getSequence(); - + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|mode|AUTOMATIC"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); - + { BEGIN_ASYNC_WS_REQUEST(format( - R"({{ "id": "1", "request": "sample", "format": "xml", "interval": 10, "from": {} }})", - at)); + R"({{ "id": "1", "request": "sample", "format": "xml", "interval": 10, "from": {} }})", + at)); ASSERT_EQ("1", id); } - + { BEGIN_ASYNC_WS_REQUEST( - R"({ "id": "2", "request": "current", "format": "xml", "interval": 100})"); + R"({ "id": "2", "request": "current", "format": "xml", "interval": 100})"); ASSERT_EQ("2", id); } - + m_agentTestHelper->waitForResponseSent(15ms, "1"); - + { PARSE_NEXT_XML_RESPONSE("1"); - + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "1"); ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode[1]", "AUTOMATIC"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[1]", "READY"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[2]", "ACTIVE"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution[3]", "READY"); } - + { PARSE_NEXT_XML_RESPONSE("2"); - + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "2"); ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "READY"); } - + m_agentTestHelper->waitForResponseSent(100ms, "2"); - + { PARSE_NEXT_XML_RESPONSE("2"); - + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "2"); ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "READY"); } - + m_agentTestHelper->waitForResponseSent(105ms, "2"); - + { PARSE_NEXT_XML_RESPONSE("2"); - + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "2"); ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "READY"); } - + m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|READY"); m_agentTestHelper->m_adapter->processData("2026-01-01T12:00:00Z|exec|ACTIVE"); - + { - BEGIN_ASYNC_WS_REQUEST( R"({ "id": "1", "request": "cancel"})"); + BEGIN_ASYNC_WS_REQUEST(R"({ "id": "1", "request": "cancel"})"); ASSERT_EQ(1, m_agentTestHelper->getResponseCount("1")); auto resp = m_agentTestHelper->m_websocketSession->getNextResponse("1"); ASSERT_TRUE(resp); ASSERT_EQ("{ \"success\": \"true\"}", *resp); } - + m_agentTestHelper->waitForResponseSent(100ms, "2"); - + { PARSE_NEXT_XML_RESPONSE("2"); - + ASSERT_XML_PATH_EQUAL(doc, "//m:Header@requestId", "2"); ASSERT_XML_PATH_EQUAL(doc, "//m:ControllerMode", "AUTOMATIC"); ASSERT_XML_PATH_EQUAL(doc, "//m:Execution", "ACTIVE"); } - + ASSERT_EQ(0, m_agentTestHelper->getResponseCount("1")); - + { - BEGIN_ASYNC_WS_REQUEST( R"({ "id": "2", "request": "cancel"})"); + BEGIN_ASYNC_WS_REQUEST(R"({ "id": "2", "request": "cancel"})"); ASSERT_EQ(1, m_agentTestHelper->getResponseCount("2")); auto resp = m_agentTestHelper->m_websocketSession->getNextResponse("2"); ASSERT_TRUE(resp); ASSERT_EQ("{ \"success\": \"true\"}", *resp); } - + m_agentTestHelper->waitForResponseSent(105ms, "2"); - ASSERT_EQ(0, m_agentTestHelper->getResponseCount("2")); ASSERT_EQ(0, m_agentTestHelper->getResponseCount("1")); + ASSERT_EQ(0, m_agentTestHelper->getResponseCount("2")); } - TEST_F(WebsocketsRestSinkTest, should_handle_asset_put) { GTEST_SKIP() << "Test not implemented yet"; @@ -574,12 +573,41 @@ TEST_F(WebsocketsRestSinkTest, should_handle_asset_put) TEST_F(WebsocketsRestSinkTest, should_return_error_if_no_id) { - GTEST_SKIP() << "Test not implemented yet"; + try + { + PARSE_XML_WS_RESPONSE(R"({ "request": "probe"})"); + FAIL() << "Expected exception not thrown"; + } + + catch (RestError& e) + { + ASSERT_EQ(status::bad_request, e.getStatus()); + ASSERT_TRUE(e.getRequestId()); + ASSERT_EQ("ERROR", *e.getRequestId()); + } } TEST_F(WebsocketsRestSinkTest, should_return_error_if_duplicate_id) { - GTEST_SKIP() << "Test not implemented yet"; + { + BEGIN_ASYNC_WS_REQUEST( + R"({ "id": "2", "request": "current", "format": "xml", "interval": 100})"); + ASSERT_EQ("2", id); + } + + try + { + BEGIN_ASYNC_WS_REQUEST( + R"({ "id": "2", "request": "current", "format": "xml", "interval": 100})"); + FAIL() << "Expected exception not thrown"; + } + + catch (RestError& e) + { + ASSERT_EQ(status::bad_request, e.getStatus()); + ASSERT_TRUE(e.getRequestId()); + ASSERT_EQ("ERROR", *e.getRequestId()); + } } TEST_F(WebsocketsRestSinkTest, should_return_error_wrong_parameter_type) @@ -589,12 +617,27 @@ TEST_F(WebsocketsRestSinkTest, should_return_error_wrong_parameter_type) TEST_F(WebsocketsRestSinkTest, should_return_error_for_unknown_command) { - GTEST_SKIP() << "Test not implemented yet"; + { + PARSE_XML_WS_RESPONSE(R"({ "id": "1", "request": "unknown"})"); + ASSERT_XML_PATH_EQUAL(doc, "//m:InvalidURI@errorCode", "INVALID_URI"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ErrorMessage", "0.0.0.0: Command failed: unknown"); + } } TEST_F(WebsocketsRestSinkTest, should_return_error_for_malformed_json) { - GTEST_SKIP() << "Test not implemented yet"; + try + { + PARSE_XML_WS_RESPONSE(R"({ "id": "1", "request": probe"})"); + FAIL() << "Expected exception not thrown"; + } + + catch (RestError& e) + { + ASSERT_EQ(status::bad_request, e.getStatus()); + ASSERT_TRUE(e.getRequestId()); + ASSERT_EQ("ERROR", *e.getRequestId()); + } } TEST_F(WebsocketsRestSinkTest, should_return_error_for_unknown_parameter) From bcb786d91a1db1a32cbad955da985d8894c38069 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Sat, 10 Jan 2026 15:18:34 +0100 Subject: [PATCH 10/11] Finished websocket tests --- test_package/websockets_rest_sink_test.cpp | 25 +++++++++++----------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/test_package/websockets_rest_sink_test.cpp b/test_package/websockets_rest_sink_test.cpp index 88ccb98e3..4aa0b8a4d 100644 --- a/test_package/websockets_rest_sink_test.cpp +++ b/test_package/websockets_rest_sink_test.cpp @@ -640,22 +640,23 @@ TEST_F(WebsocketsRestSinkTest, should_return_error_for_malformed_json) } } -TEST_F(WebsocketsRestSinkTest, should_return_error_for_unknown_parameter) -{ - GTEST_SKIP() << "Test not implemented yet"; -} - TEST_F(WebsocketsRestSinkTest, should_return_error_for_unknown_device) { - GTEST_SKIP() << "Test not implemented yet"; + { + PARSE_XML_WS_RESPONSE(R"({ "id": "1", "request": "probe", "device": "XyzAbc"})"); + ASSERT_XML_PATH_EQUAL(doc, "//m:InvalidURI@errorCode", "INVALID_URI"); + ASSERT_XML_PATH_EQUAL(doc, "//m:ErrorMessage", "0.0.0.0:"); + } } TEST_F(WebsocketsRestSinkTest, should_return_error_for_bad_parameter_value) { - GTEST_SKIP() << "Test not implemented yet"; -} - -TEST_F(WebsocketsRestSinkTest, should_coerce_parameter_data_types) -{ - GTEST_SKIP() << "Test not implemented yet"; + { + PARSE_XML_WS_RESPONSE( + R"({ "id": "1", "request": "current", "format": "xml", "at": "notanumber" })"); + ASSERT_XML_PATH_EQUAL(doc, "//m:InvalidParameterValue@errorCode", "INVALID_PARAMETER_VALUE"); + ASSERT_XML_PATH_EQUAL( + doc, "//m:ErrorMessage", + "query parameter 'at': invalid type, expected uint64"); + } } From ebb6ee289d7bdeebce653aa691c97a084d760201 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Sat, 10 Jan 2026 15:22:05 +0100 Subject: [PATCH 11/11] Finished websocket tests --- test_package/websockets_rest_sink_test.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/test_package/websockets_rest_sink_test.cpp b/test_package/websockets_rest_sink_test.cpp index 4aa0b8a4d..aef828f87 100644 --- a/test_package/websockets_rest_sink_test.cpp +++ b/test_package/websockets_rest_sink_test.cpp @@ -610,11 +610,6 @@ TEST_F(WebsocketsRestSinkTest, should_return_error_if_duplicate_id) } } -TEST_F(WebsocketsRestSinkTest, should_return_error_wrong_parameter_type) -{ - GTEST_SKIP() << "Test not implemented yet"; -} - TEST_F(WebsocketsRestSinkTest, should_return_error_for_unknown_command) { {