diff --git a/src/Universalis.Application.Tests/Realtime/Messages/SocketMessageTests.cs b/src/Universalis.Application.Tests/Realtime/Messages/SocketMessageTests.cs new file mode 100644 index 00000000..804ef43e --- /dev/null +++ b/src/Universalis.Application.Tests/Realtime/Messages/SocketMessageTests.cs @@ -0,0 +1,88 @@ +using Microsoft.IO; +using MongoDB.Bson; +using MongoDB.Bson.Serialization; +using Universalis.Application.Realtime.Messages; +using Universalis.Application.Tests.Mocks.Realtime.Messages; +using Xunit; + +namespace Universalis.Application.Tests.Realtime.Messages; + +public class SocketMessageTests +{ + private static readonly RecyclableMemoryStreamManager Pool = new(); + + [Fact] + public void GetSerializedBytes_ReturnsCachedBytesWhenAvailable() + { + // Arrange + var message = new MockMessage("test") { Value = 42 }; + var cachedBytes = new byte[] { 1, 2, 3, 4, 5 }; + message.CachedSerializedBytes = cachedBytes; + + // Act + var result = message.GetSerializedBytes(Pool); + + // Assert - verify it returns the exact cached bytes + Assert.Equal(cachedBytes, result.ToArray()); + } + + [Fact] + public void GetSerializedBytes_SerializesWhenNoCachedBytes() + { + // Arrange + var message = new MockMessage("test") { Value = 42 }; + + // Act + var result = message.GetSerializedBytes(Pool); + + // Assert + Assert.False(result.IsEmpty); + Assert.True(result.Length > 0); + } + + [Fact] + public void GetSerializedBytes_ProducesValidBson() + { + // Arrange + var message = new MockMessage("test", "channel") { Value = 123 }; + + // Act + var bytes = message.GetSerializedBytes(Pool); + var deserialized = BsonSerializer.Deserialize(bytes.ToArray()); + + // Assert + Assert.Equal("test/channel", deserialized["event"].AsString); + Assert.Equal(123, deserialized["value"].AsInt32); + } + + [Fact] + public void GetSerializedBytes_CachedAndFreshProduceSameOutput() + { + // Arrange + var message1 = new MockMessage("test", "channel") { Value = 999 }; + var message2 = new MockMessage("test", "channel") { Value = 999 }; + + // Act - serialize message1 fresh, then cache its bytes on message2 + var freshBytes = message1.GetSerializedBytes(Pool).ToArray(); + message2.CachedSerializedBytes = freshBytes; + var cachedBytes = message2.GetSerializedBytes(Pool).ToArray(); + + // Assert - both should be identical + Assert.Equal(freshBytes, cachedBytes); + } + + [Fact] + public void GetSerializedBytes_WorksForSubscribeFailure() + { + // Arrange - SubscribeFailure is sent per-client without caching + var message = new SubscribeFailure("test error"); + + // Act - should use fallback serialization path + var bytes = message.GetSerializedBytes(Pool); + var deserialized = BsonSerializer.Deserialize(bytes.ToArray()); + + // Assert + Assert.Equal("subscribe/error", deserialized["event"].AsString); + Assert.Equal("test error", deserialized["reason"].AsString); + } +} diff --git a/src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs b/src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs index b0d1cc4d..d66abe66 100644 --- a/src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs +++ b/src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs @@ -9,6 +9,7 @@ using Moq; using Universalis.Application.Realtime; using Universalis.Application.Realtime.Messages; +using Universalis.Application.Tests.Mocks.Realtime.Messages; using Universalis.Common.Collections; using Xunit; @@ -16,6 +17,60 @@ namespace Universalis.Application.Tests.Realtime; public class SocketProcessorTests { + [Fact] + public void Publish_SetsCachedSerializedBytesOnMessage() + { + // Arrange + var loggerMock = new Mock>(); + var socketProcessor = new SocketProcessor(loggerMock.Object); + var message = new MockMessage("test", "channel") { Value = 42 }; + + // Act + Assert.Null(message.CachedSerializedBytes); // Verify it starts null + socketProcessor.Publish(message); + + // Assert - CachedSerializedBytes should now be set + Assert.NotNull(message.CachedSerializedBytes); + Assert.True(message.CachedSerializedBytes.Length > 0); + } + + [Fact] + public void Publish_ContinuesWhenOneClientThrows() + { + // Arrange + var loggerMock = new Mock>(); + var socketProcessor = new SocketProcessor(loggerMock.Object); + + var goodClient1 = new Mock(); + var badClient = new Mock(); + var goodClient2 = new Mock(); + + // Configure badClient to throw exception on Push + badClient.Setup(c => c.Push(It.IsAny())) + .Throws(new InvalidOperationException("Simulated client error")); + + var message = new MockMessage("test") { Value = 42 }; + + // Add clients to the processor + var connectionsField = typeof(SocketProcessor).GetField("_connections", + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + var connections = new ConcurrentDictionary + { + [Guid.NewGuid()] = goodClient1.Object, + [Guid.NewGuid()] = badClient.Object, + [Guid.NewGuid()] = goodClient2.Object, + }; + connectionsField.SetValue(socketProcessor, connections); + + // Act - should not throw despite badClient throwing + socketProcessor.Publish(message); + + // Assert - good clients should still have received the message + goodClient1.Verify(c => c.Push(message), Times.Once); + goodClient2.Verify(c => c.Push(message), Times.Once); + badClient.Verify(c => c.Push(message), Times.Once); // Was called but threw + } + [Fact] public void Publish_FuzzTest() { diff --git a/src/Universalis.Application/Realtime/Messages/SocketMessage.cs b/src/Universalis.Application/Realtime/Messages/SocketMessage.cs index 4c31dc75..582a48b2 100644 --- a/src/Universalis.Application/Realtime/Messages/SocketMessage.cs +++ b/src/Universalis.Application/Realtime/Messages/SocketMessage.cs @@ -1,4 +1,8 @@ -using MongoDB.Bson.Serialization.Attributes; +using System; +using Microsoft.IO; +using MongoDB.Bson.IO; +using MongoDB.Bson.Serialization; +using MongoDB.Bson.Serialization.Attributes; namespace Universalis.Application.Realtime.Messages; @@ -10,8 +14,25 @@ public abstract class SocketMessage [BsonIgnore] public string[] ChannelsInternal { get; } + [BsonIgnore] + internal byte[]? CachedSerializedBytes { get; set; } + protected SocketMessage(params string[] channels) { ChannelsInternal = channels; } + + /// + /// Returns pre-cached serialized bytes if available, otherwise serializes on-the-fly. + /// + internal ReadOnlyMemory GetSerializedBytes(RecyclableMemoryStreamManager pool) + { + if (CachedSerializedBytes != null) + return CachedSerializedBytes; + + using var stream = pool.GetStream(); + using var writer = new BsonBinaryWriter(stream); + BsonSerializer.Serialize(writer, GetType(), this); + return stream.ToArray(); + } } \ No newline at end of file diff --git a/src/Universalis.Application/Realtime/SocketClient.cs b/src/Universalis.Application/Realtime/SocketClient.cs index fdd42981..b45134d1 100644 --- a/src/Universalis.Application/Realtime/SocketClient.cs +++ b/src/Universalis.Application/Realtime/SocketClient.cs @@ -9,7 +9,6 @@ using System.Threading; using System.Threading.Tasks; using MongoDB.Bson; -using MongoDB.Bson.IO; using MongoDB.Bson.Serialization; using Universalis.Application.Realtime.Messages; @@ -269,26 +268,8 @@ private async Task ReceiveEvent(byte[] buf, CancellationToken cancellationToken private async Task SendEvent(SocketMessage message, CancellationToken cancellationToken = default) { - await using var stream = MemoryStreamPool.GetStream(); - - using var writer = new BsonBinaryWriter(stream); - BsonSerializer.Serialize(writer, message.GetType(), message); - - var cur = 0; - var end = (int)stream.Position; - foreach (var memory in stream.GetReadOnlySequence()) - { - if (cur + memory.Length >= end) - { - var lastIdx = end - cur; - await ws.SendAsync(memory[..lastIdx], WebSocketMessageType.Binary, WebSocketMessageFlags.EndOfMessage, - cancellationToken); - break; - } - - cur += memory.Length; - await ws.SendAsync(memory, WebSocketMessageType.Binary, WebSocketMessageFlags.None, cancellationToken); - } + var bytes = message.GetSerializedBytes(MemoryStreamPool); + await ws.SendAsync(bytes, WebSocketMessageType.Binary, WebSocketMessageFlags.EndOfMessage, cancellationToken); } public void Dispose() diff --git a/src/Universalis.Application/Realtime/SocketProcessor.cs b/src/Universalis.Application/Realtime/SocketProcessor.cs index de2eac39..b7c384b4 100644 --- a/src/Universalis.Application/Realtime/SocketProcessor.cs +++ b/src/Universalis.Application/Realtime/SocketProcessor.cs @@ -8,6 +8,9 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; +using Microsoft.IO; +using MongoDB.Bson.IO; +using MongoDB.Bson.Serialization; using Universalis.Application.Realtime.Messages; namespace Universalis.Application.Realtime; @@ -33,26 +36,41 @@ public class SocketProcessor(ILogger logger) : ISocketProcessor "universalis_ws_exceptions", "WebSocket exceptions across all connections"); + private static readonly RecyclableMemoryStreamManager MemoryStreamPool = new(); + private readonly ConcurrentDictionary _connections = new(); + private static byte[] SerializeMessage(SocketMessage message) + { + using var stream = MemoryStreamPool.GetStream(); + using var writer = new BsonBinaryWriter(stream); + BsonSerializer.Serialize(writer, message.GetType(), message); + return stream.ToArray(); + } + public void Publish(SocketMessage message) { - var stopwatch = new Stopwatch(); - stopwatch.Start(); + var stopwatch = Stopwatch.StartNew(); - foreach (var (id, connection) in _connections) - { - try - { - connection.Push(message); - MessagesSent.Inc(); - } - catch (Exception ex) + // Serialize once and cache on the message + message.CachedSerializedBytes = SerializeMessage(message); + + Parallel.ForEach( + _connections, + new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 2 }, + kvp => { - logger.LogError(ex, "Failed to send message to connection {}", id); - ExceptionCount.Inc(); - } - } + try + { + kvp.Value.Push(message); + MessagesSent.Inc(); + } + catch (Exception ex) + { + logger.LogError(ex, "Failed to send message to connection {}", kvp.Key); + ExceptionCount.Inc(); + } + }); stopwatch.Stop(); MessageQueueTime.Observe(stopwatch.ElapsedMilliseconds); diff --git a/src/Universalis.Application/Universalis.Application.csproj b/src/Universalis.Application/Universalis.Application.csproj index ec502c61..1ce4f1a2 100644 --- a/src/Universalis.Application/Universalis.Application.csproj +++ b/src/Universalis.Application/Universalis.Application.csproj @@ -5,6 +5,10 @@ 14.0 + + + + Universalis.Application.xml