diff --git a/src/relaycast_ws.rs b/src/relaycast_ws.rs index 82e40cfd2..c33f03160 100644 --- a/src/relaycast_ws.rs +++ b/src/relaycast_ws.rs @@ -6,7 +6,7 @@ use relaycast::{ agent::DmOptions, format_registration_error, retry_agent_registration as sdk_retry_agent_registration, AgentClient, AgentRegistrationClient, AgentRegistrationError, AgentRegistrationRetryOutcome, MessageListQuery, RelayCast, - RelayCastOptions, RelayError, ReleaseAgentRequest, WsClient, WsClientOptions, WsLifecycleEvent, + RelayCastOptions, ReleaseAgentRequest, WsClient, WsClientOptions, WsLifecycleEvent, }; use serde_json::{json, Value}; use tokio::sync::mpsc; @@ -502,9 +502,11 @@ impl RelaycastHttpClient { }; let config = relay - .workspace_stream_set(true) + .ensure_workspace_stream_enabled() .await - .map_err(|error| anyhow::anyhow!("relaycast workspace_stream_set failed: {error}"))?; + .map_err(|error| { + anyhow::anyhow!("relaycast ensure_workspace_stream_enabled failed: {error}") + })?; tracing::debug!( enabled = config.enabled, default_enabled = config.default_enabled, @@ -548,30 +550,24 @@ impl RelaycastHttpClient { return Ok(()); } }; - match agent_client.create_channel(request).await { - Ok(_) => { - tracing::info!(channel = %name, "created default channel"); - } - Err(RelayError::Api { status: 409, .. }) => { - tracing::debug!(channel = %name, "default channel already exists"); + match agent_client.ensure_channel_joined(request).await { + Ok(outcome) => { + if outcome.created { + tracing::info!(channel = %name, "created default channel"); + } else { + tracing::debug!(channel = %name, "default channel already exists"); + } + if outcome.joined { + tracing::info!(channel = %name, "broker joined default channel"); + } else { + tracing::debug!(channel = %name, "broker already joined default channel"); + } } Err(error) => { - tracing::warn!(channel = %name, error = %error, "failed to create default channel"); + tracing::warn!(channel = %name, error = %error, "failed to ensure broker joined default channel"); continue; } } - // Join so the broker receives message.created WS events for this channel. - match agent_client.join_channel(name).await { - Ok(_) => { - tracing::info!(channel = %name, "broker joined default channel"); - } - Err(RelayError::Api { status: 409, .. }) => { - tracing::debug!(channel = %name, "broker already joined default channel"); - } - Err(error) => { - tracing::warn!(channel = %name, error = %error, "failed to join default channel"); - } - } } Ok(()) } @@ -611,24 +607,21 @@ impl RelaycastHttpClient { topic: None, metadata: None, }; - match agent_client.create_channel(request).await { - Ok(_) => tracing::info!(channel = %name, "created extra channel"), - Err(RelayError::Api { status: 409, .. }) => { - tracing::debug!(channel = %name, "extra channel already exists"); - } - Err(error) => { - tracing::warn!(channel = %name, error = %error, "failed to create extra channel"); - continue; - } - } - // Join the channel so the broker receives message.created WS events. - match agent_client.join_channel(name).await { - Ok(_) => tracing::info!(channel = %name, "broker joined extra channel"), - Err(RelayError::Api { status: 409, .. }) => { - tracing::debug!(channel = %name, "broker already joined extra channel"); + match agent_client.ensure_channel_joined(request).await { + Ok(outcome) => { + if outcome.created { + tracing::info!(channel = %name, "created extra channel"); + } else { + tracing::debug!(channel = %name, "extra channel already exists"); + } + if outcome.joined { + tracing::info!(channel = %name, "broker joined extra channel"); + } else { + tracing::debug!(channel = %name, "broker already joined extra channel"); + } } Err(error) => { - tracing::warn!(channel = %name, error = %error, "failed to join extra channel"); + tracing::warn!(channel = %name, error = %error, "failed to ensure broker joined extra channel"); } } }