diff --git a/src/gateway.cpp b/src/gateway.cpp index 99298cb..bd9c090 100644 --- a/src/gateway.cpp +++ b/src/gateway.cpp @@ -382,6 +382,8 @@ class GatewayImpl : public IGateway, private IClientTransport Server server; std::thread watchdogThread; std::atomic watchdogRunning; + std::mutex watchdogMutex_; + std::condition_variable watchdogCv_; bool legacyRPCv1; std::map rpcv1_eventMap; @@ -401,6 +403,7 @@ class GatewayImpl : public IGateway, private IClientTransport if (watchdogRunning) { watchdogRunning = false; + watchdogCv_.notify_one(); if (watchdogThread.joinable()) { watchdogThread.join(); @@ -450,7 +453,11 @@ class GatewayImpl : public IGateway, private IClientTransport { while (watchdogRunning) { - std::this_thread::sleep_for(std::chrono::milliseconds(watchdog_interval_ms)); + std::unique_lock lock(watchdogMutex_); + watchdogCv_.wait_for(lock, std::chrono::milliseconds(watchdog_interval_ms), + [this] { return !watchdogRunning.load(); }); + if (!watchdogRunning) + break; client.checkPromises(); } }); @@ -461,19 +468,37 @@ class GatewayImpl : public IGateway, private IClientTransport virtual Firebolt::Error disconnect() override { + FIREBOLT_LOG_INFO("Gateway", "[disconnect] transport.disconnect() start"); + auto t0_disc = std::chrono::steady_clock::now(); Firebolt::Error status = transport.disconnect(); + FIREBOLT_LOG_INFO("Gateway", "[disconnect] transport.disconnect() done in %ld ms, status=%d", + std::chrono::duration_cast(std::chrono::steady_clock::now() - t0_disc) + .count(), + static_cast(status)); if (status != Firebolt::Error::None) { return status; } if (watchdogRunning.exchange(false)) { + watchdogCv_.notify_one(); + FIREBOLT_LOG_INFO("Gateway", "[disconnect] waiting for watchdog thread join..."); + auto t0_wdog = std::chrono::steady_clock::now(); if (watchdogThread.joinable()) { watchdogThread.join(); } + FIREBOLT_LOG_INFO("Gateway", "[disconnect] watchdog joined in %ld ms", + std::chrono::duration_cast(std::chrono::steady_clock::now() - + t0_wdog) + .count()); } + FIREBOLT_LOG_INFO("Gateway", "[disconnect] stopping notification worker..."); + auto t0_nw = std::chrono::steady_clock::now(); server.stopNotificationWorker(); + FIREBOLT_LOG_INFO("Gateway", "[disconnect] notification worker stopped in %ld ms", + std::chrono::duration_cast(std::chrono::steady_clock::now() - t0_nw) + .count()); return Error::None; } @@ -512,11 +537,25 @@ class GatewayImpl : public IGateway, private IClientTransport nlohmann::json params; params["listen"] = true; - auto result = request(event, params, id).get(); - - if (!result) + FIREBOLT_LOG_INFO("Gateway", "[subscribe] waiting for subscribe ACK for '%s'...", event.c_str()); + auto t0_sub = std::chrono::steady_clock::now(); + auto fut = request(event, params, id); + if (fut.wait_for(std::chrono::milliseconds(50)) == std::future_status::ready) + { + auto result = fut.get(); + FIREBOLT_LOG_INFO("Gateway", "[subscribe] ACK for '%s' received in %ld ms", event.c_str(), + std::chrono::duration_cast(std::chrono::steady_clock::now() - + t0_sub) + .count()); + if (!result) + { + status = result.error(); + } + } + else { - status = result.error(); + FIREBOLT_LOG_INFO("Gateway", "[subscribe] ACK not received within 50ms, giving up"); + status = Firebolt::Error::Timedout; } if (status != Firebolt::Error::None) @@ -533,9 +572,12 @@ class GatewayImpl : public IGateway, private IClientTransport Firebolt::Error unsubscribe(const std::string& event, void* usercb) override { + FIREBOLT_LOG_DEBUG("Gateway", "Unsubscribe called for event '%s'", event.c_str()); Firebolt::Error status = server.unsubscribe(event, usercb); + if (status != Firebolt::Error::None) { + FIREBOLT_LOG_DEBUG("Gateway", "Unsubscribe failed for event '%s'", event.c_str()); return status; } @@ -563,11 +605,25 @@ class GatewayImpl : public IGateway, private IClientTransport nlohmann::json params; params["listen"] = false; - auto result = request(event, params).get(); - - if (!result) + FIREBOLT_LOG_INFO("Gateway", "[unsubscribe] sending unsubscribe for '%s', waiting for ACK (waitTime_ms=%u)...", + event.c_str(), runtime_waitTime_ms); + auto t0_unsub = std::chrono::steady_clock::now(); + auto fut = request(event, params); + if (fut.wait_for(std::chrono::milliseconds(50)) == std::future_status::ready) + { + auto result = fut.get(); + auto unsub_ms = + std::chrono::duration_cast(std::chrono::steady_clock::now() - t0_unsub).count(); + FIREBOLT_LOG_INFO("Gateway", "[unsubscribe] ACK received after %ld ms", unsub_ms); + if (!result) + { + status = result.error(); + } + } + else { - status = result.error(); + FIREBOLT_LOG_INFO("Gateway", "[unsubscribe] ACK not received within 50ms, giving up"); + status = Firebolt::Error::Timedout; } return status; diff --git a/src/transport.cpp b/src/transport.cpp index 69704e6..94140fc 100644 --- a/src/transport.cpp +++ b/src/transport.cpp @@ -20,6 +20,7 @@ #include "firebolt/logger.h" #include "firebolt/types.h" #include +#include #include namespace Firebolt::Transport @@ -234,6 +235,7 @@ Firebolt::Error Transport::disconnect() } websocketpp::lib::error_code ec; + FIREBOLT_LOG_INFO("Transport", "[disconnect] close() start (handshake timeout=100ms)"); client_->close(connectionHandle_, websocketpp::close::status::going_away, "", ec); if (ec) { @@ -241,12 +243,22 @@ Firebolt::Error Transport::disconnect() } } + FIREBOLT_LOG_INFO("Transport", "[disconnect] waiting for connectionThread join (close handshake in progress)..."); + auto t0_ct = std::chrono::steady_clock::now(); if (connectionThread_ && connectionThread_->joinable()) { connectionThread_->join(); } + FIREBOLT_LOG_INFO("Transport", "[disconnect] connectionThread joined in %ld ms", + std::chrono::duration_cast(std::chrono::steady_clock::now() - t0_ct) + .count()); + FIREBOLT_LOG_INFO("Transport", "[disconnect] stopping message worker..."); + auto t0_mw = std::chrono::steady_clock::now(); stopMessageWorker(); + FIREBOLT_LOG_INFO("Transport", "[disconnect] message worker stopped in %ld ms", + std::chrono::duration_cast(std::chrono::steady_clock::now() - t0_mw) + .count()); client_ = std::make_unique(); connectionStatus_ = TransportState::NotStarted; diff --git a/test/unit/gatewayTest.cpp b/test/unit/gatewayTest.cpp index f77fac0..9a8c5d3 100644 --- a/test/unit/gatewayTest.cpp +++ b/test/unit/gatewayTest.cpp @@ -17,6 +17,7 @@ */ #include "firebolt/gateway.h" +#include "firebolt/logger.h" #include "utils.h" #include #include @@ -152,7 +153,7 @@ class GatewayUTest : public ::testing::Test { Firebolt::Config cfg; cfg.wsUrl = m_uri; - cfg.log.level = Firebolt::LogLevel::Error; + cfg.log.level = Firebolt::LogLevel::Debug; cfg.waitTime_ms = 1000; return cfg; } @@ -592,3 +593,169 @@ TEST_F(GatewayUTest, UnsubscribeFromCallbackDoesNotDeadlock) auto status = doneFuture.wait_for(std::chrono::seconds(2)); EXPECT_EQ(status, std::future_status::ready) << "Callback blocked (possible deadlock)"; } + +// --------------------------------------------------------------------------- +// Regression test: disconnect() must return quickly even when the server dies +// while active subscriptions are held. +// +// Repro scenario (RDKEMW-16573): +// 1. Client subscribes to an event (gateway sends subscribe ACK) +// 2. Server disappears abruptly (no WS close handshake, no unsubscribe ACK) +// 3. Client calls disconnect() +// +// Before the fix: disconnect() blocked for ~waitTime_ms per subscription +// (request(...).get() inside unsubscribe() held the calling thread). +// +// After the fix: disconnect() uses wait_for(50ms) ceilings and returns in +// well under 500ms regardless of server responsiveness. +// +// The test asserts disconnect() completes within 500ms (generous allowance). +// On an unpatched build it will take >= waitTime_ms (1000ms default) and fail. +// --------------------------------------------------------------------------- +TEST_F(GatewayUTest, SubscribeDoesNotHangWhenServerIgnoresSubscribeAck) +{ + // If the server never sends the subscribe ACK, subscribe() calls + // request().get() which blocks until checkPromises() fires the timeout + // after waitTime_ms (1000ms). Pre-fix: FAILS with ~1000-1500ms. + // Post-fix (wait_for 50ms): PASSES with ~50ms. + // + // Setup: server drops all messages — it never replies to anything, + // including the subscribe frame. + m_messageHandler = [](connection_hdl, server::message_ptr) { /* drop everything */ }; + + startServer(); + IGateway& gateway = GetGatewayInstance(); + // Connection itself is at the WebSocket level (HTTP upgrade) — the server + // doesn't need to send a JSON-RPC message for connect() to succeed. + auto connectionFuture = m_connectionPromise.get_future(); + Firebolt::Error err = gateway.connect(getTestConfig(), [this](bool connected, const Firebolt::Error& connErr) + { onConnectionChange(connected, connErr); }); + ASSERT_EQ(err, Firebolt::Error::None); + connectionFuture.wait_for(std::chrono::seconds(2)); + + std::promise eventPromise; + auto onEvent = [](void* usercb, const nlohmann::json& params) + { static_cast*>(usercb)->set_value(params); }; + + auto t0 = std::chrono::steady_clock::now(); + Firebolt::Error subErr = gateway.subscribe("test.onStateChanged", onEvent, &eventPromise); + auto elapsed = std::chrono::duration_cast(std::chrono::steady_clock::now() - t0); + + std::cout << "[timing] subscribe() (no ACK from server) took " << elapsed.count() << " ms\n"; + + // subscribe() should return error (no ACK) quickly, not hang for 1000ms. + EXPECT_NE(subErr, Firebolt::Error::None) << "subscribe() should fail when server ignores ACK"; + EXPECT_LT(elapsed.count(), 200) << "subscribe() blocked for " << elapsed.count() + << " ms — waiting for ACK that never came (bug reproduced)"; +} + +TEST_F(GatewayUTest, WatchdogDoesNotHangOnDisconnect) +{ + // The watchdog thread loops with sleep_for(500ms) on the pre-fix baseline. + // When disconnect() sets watchdogRunning=false the thread is mid-sleep and + // takes up to 500ms to notice. On hardware (Sky app) this shows as 312-416ms. + // + // This test verifies that disconnect() completes in < 100ms even when the + // watchdog is running. Pre-fix: FAILS (~0-500ms, typically ~250ms). + // Post-fix (condition_variable): PASSES (< 5ms). + IGateway& gateway = connectAndWait(); + + // Ensure the watchdog is well into its sleep cycle before we disconnect. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + auto t0 = std::chrono::steady_clock::now(); + gateway.disconnect(); + auto elapsed = std::chrono::duration_cast(std::chrono::steady_clock::now() - t0); + + std::cout << "[timing] disconnect() (watchdog running) took " << elapsed.count() << " ms\n"; + + EXPECT_LT(elapsed.count(), 100) << "disconnect() took " << elapsed.count() + << " ms — watchdog sleep_for(500ms) is blocking join (bug reproduced)"; +} + +TEST_F(GatewayUTest, DisconnectDoesNotHangWhenServerDisappearsWithActiveSubscription) +{ + // Use a silent message handler so subscribe ACK is sent but unsubscribe ACK + // is intentionally never sent (simulates server dying after the subscription). + bool subscribeAckSent = false; + m_messageHandler = [this, &subscribeAckSent](connection_hdl hdl, server::message_ptr msg) + { + auto request = nlohmann::json::parse(msg->get_payload()); + const std::string method = request.value("method", ""); + + // Ack the subscribe so the client considers it active. + if (!subscribeAckSent && method.find(".on") != std::string::npos) + { + nlohmann::json ack; + ack["jsonrpc"] = "2.0"; + ack["id"] = request["id"]; + ack["result"] = {{"listening", true}}; + m_server.send(hdl, ack.dump(), msg->get_opcode()); + subscribeAckSent = true; + return; + } + // All subsequent messages (including the unsubscribe request) get no reply. + // This simulates a dead server. + }; + + // Connect with a callback that mirrors the Sky app's log output. + // On device these appear as: + // [Firebolt] FireboltService uninitialize + // [Firebolt] Connection state changed: Disconnected, error=2 + // [Firebolt] Connection state changed: Disconnected end + startServer(); + IGateway& gateway = GetGatewayInstance(); + auto connectionFuture = m_connectionPromise.get_future(); + + Firebolt::Error err = + gateway.connect(getTestConfig(), + [this](bool connected, const Firebolt::Error& error) + { + if (connected) + { + onConnectionChange(connected, error); + } + else + { + FIREBOLT_LOG_INFO("FireboltApp", "Connection state changed: Disconnected, error=%d", + static_cast(error)); + FIREBOLT_LOG_INFO("FireboltApp", "Connection state changed: Disconnected end"); + } + }); + ASSERT_EQ(err, Firebolt::Error::None); + connectionFuture.wait_for(std::chrono::seconds(2)); + + std::promise eventPromise; + auto onEvent = [](void* usercb, const nlohmann::json& params) + { static_cast*>(usercb)->set_value(params); }; + + Firebolt::Error subErr = gateway.subscribe("test.onStateChanged", onEvent, &eventPromise); + ASSERT_EQ(subErr, Firebolt::Error::None) << "subscribe() failed"; + + // Give the subscribe ACK time to arrive. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + // Switch to a silent handler: server keeps the TCP connection open but + // never replies to anything (including unsubscribe ACKs). + // This is the on-device scenario: WPEFramework plugin stops responding + // without closing the socket. The bug is in unsubscribe(): pre-fix calls + // request().get() which blocks indefinitely waiting for the ACK. + m_messageHandler = [](connection_hdl, server::message_ptr) { /* drop everything */ }; + + // Time unsubscribe() + disconnect() together. + // The hang is in unsubscribe(): pre-fix calls request().get() which blocks + // indefinitely waiting for the server's unsubscribe ACK (which never comes). + // With the bug: unsubscribe() blocks >= waitTime_ms (1000 ms). + // With the fix: unsubscribe() returns in <= 50 ms (wait_for ceiling). + FIREBOLT_LOG_INFO("FireboltApp", "FireboltService uninitialize"); + auto t0 = std::chrono::steady_clock::now(); + gateway.unsubscribe("test.onStateChanged", &eventPromise); + auto elapsed = std::chrono::duration_cast(std::chrono::steady_clock::now() - t0); + + std::cout << "[timing] unsubscribe() took " << elapsed.count() << " ms\n"; + + gateway.disconnect(); + + EXPECT_LT(elapsed.count(), 200) << "unsubscribe() took " << elapsed.count() + << " ms — blocked waiting for ACK from silent server (bug reproduced)"; +}