Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using Microsoft.IO;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using Universalis.Application.Realtime.Messages;
using Universalis.Application.Tests.Mocks.Realtime.Messages;
using Xunit;

namespace Universalis.Application.Tests.Realtime.Messages;

public class SocketMessageTests
{
private static readonly RecyclableMemoryStreamManager Pool = new();

[Fact]
public void GetSerializedBytes_ReturnsCachedBytesWhenAvailable()
{
// Arrange
var message = new MockMessage("test") { Value = 42 };
var cachedBytes = new byte[] { 1, 2, 3, 4, 5 };
message.CachedSerializedBytes = cachedBytes;

// Act
var result = message.GetSerializedBytes(Pool);

// Assert - verify it returns the exact cached bytes
Assert.Equal(cachedBytes, result.ToArray());
}

[Fact]
public void GetSerializedBytes_SerializesWhenNoCachedBytes()
{
// Arrange
var message = new MockMessage("test") { Value = 42 };

// Act
var result = message.GetSerializedBytes(Pool);

// Assert
Assert.False(result.IsEmpty);
Assert.True(result.Length > 0);
}

[Fact]
public void GetSerializedBytes_ProducesValidBson()
{
// Arrange
var message = new MockMessage("test", "channel") { Value = 123 };

// Act
var bytes = message.GetSerializedBytes(Pool);
var deserialized = BsonSerializer.Deserialize<BsonDocument>(bytes.ToArray());

// Assert
Assert.Equal("test/channel", deserialized["event"].AsString);
Assert.Equal(123, deserialized["value"].AsInt32);

Check notice on line 55 in src/Universalis.Application.Tests/Realtime/Messages/SocketMessageTests.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/Universalis.Application.Tests/Realtime/Messages/SocketMessageTests.cs#L55

Assign this magic number '123' to a well-named variable or constant, and use that instead.
}

[Fact]
public void GetSerializedBytes_CachedAndFreshProduceSameOutput()
{
// Arrange
var message1 = new MockMessage("test", "channel") { Value = 999 };
var message2 = new MockMessage("test", "channel") { Value = 999 };

// Act - serialize message1 fresh, then cache its bytes on message2
var freshBytes = message1.GetSerializedBytes(Pool).ToArray();
message2.CachedSerializedBytes = freshBytes;
var cachedBytes = message2.GetSerializedBytes(Pool).ToArray();

// Assert - both should be identical
Assert.Equal(freshBytes, cachedBytes);
}

[Fact]
public void GetSerializedBytes_WorksForSubscribeFailure()
{
// Arrange - SubscribeFailure is sent per-client without caching
var message = new SubscribeFailure("test error");

// Act - should use fallback serialization path
var bytes = message.GetSerializedBytes(Pool);
var deserialized = BsonSerializer.Deserialize<BsonDocument>(bytes.ToArray());

// Assert
Assert.Equal("subscribe/error", deserialized["event"].AsString);
Assert.Equal("test error", deserialized["reason"].AsString);
}
}
55 changes: 55 additions & 0 deletions src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,68 @@
using Moq;
using Universalis.Application.Realtime;
using Universalis.Application.Realtime.Messages;
using Universalis.Application.Tests.Mocks.Realtime.Messages;
using Universalis.Common.Collections;
using Xunit;

namespace Universalis.Application.Tests.Realtime;

public class SocketProcessorTests
{
[Fact]
public void Publish_SetsCachedSerializedBytesOnMessage()
{
// Arrange
var loggerMock = new Mock<ILogger<SocketProcessor>>();
var socketProcessor = new SocketProcessor(loggerMock.Object);
var message = new MockMessage("test", "channel") { Value = 42 };

// Act
Assert.Null(message.CachedSerializedBytes); // Verify it starts null
socketProcessor.Publish(message);

// Assert - CachedSerializedBytes should now be set
Assert.NotNull(message.CachedSerializedBytes);
Assert.True(message.CachedSerializedBytes.Length > 0);
}

[Fact]
public void Publish_ContinuesWhenOneClientThrows()
{
// Arrange
var loggerMock = new Mock<ILogger<SocketProcessor>>();
var socketProcessor = new SocketProcessor(loggerMock.Object);

var goodClient1 = new Mock<ISocketClient>();
var badClient = new Mock<ISocketClient>();
var goodClient2 = new Mock<ISocketClient>();

// Configure badClient to throw exception on Push
badClient.Setup(c => c.Push(It.IsAny<SocketMessage>()))
.Throws(new InvalidOperationException("Simulated client error"));

var message = new MockMessage("test") { Value = 42 };

// Add clients to the processor
var connectionsField = typeof(SocketProcessor).GetField("_connections",

Check warning on line 55 in src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs#L55

Define a constant instead of using this literal '_connections' 4 times.
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
var connections = new ConcurrentDictionary<Guid, ISocketClient>
{
[Guid.NewGuid()] = goodClient1.Object,
[Guid.NewGuid()] = badClient.Object,
[Guid.NewGuid()] = goodClient2.Object,
};
connectionsField.SetValue(socketProcessor, connections);

// Act - should not throw despite badClient throwing
socketProcessor.Publish(message);

// Assert - good clients should still have received the message
goodClient1.Verify(c => c.Push(message), Times.Once);
goodClient2.Verify(c => c.Push(message), Times.Once);
badClient.Verify(c => c.Push(message), Times.Once); // Was called but threw
}

[Fact]
public void Publish_FuzzTest()
{
Expand Down
23 changes: 22 additions & 1 deletion src/Universalis.Application/Realtime/Messages/SocketMessage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
using MongoDB.Bson.Serialization.Attributes;
using System;
using Microsoft.IO;
using MongoDB.Bson.IO;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Attributes;

namespace Universalis.Application.Realtime.Messages;

Expand All @@ -10,8 +14,25 @@ public abstract class SocketMessage
[BsonIgnore]
public string[] ChannelsInternal { get; }

[BsonIgnore]
internal byte[]? CachedSerializedBytes { get; set; }

protected SocketMessage(params string[] channels)
{
ChannelsInternal = channels;
}

/// <summary>
/// Returns pre-cached serialized bytes if available, otherwise serializes on-the-fly.
/// </summary>
internal ReadOnlyMemory<byte> GetSerializedBytes(RecyclableMemoryStreamManager pool)
{
if (CachedSerializedBytes != null)
return CachedSerializedBytes;

using var stream = pool.GetStream();
using var writer = new BsonBinaryWriter(stream);
BsonSerializer.Serialize(writer, GetType(), this);
return stream.ToArray();
}
}
23 changes: 2 additions & 21 deletions src/Universalis.Application/Realtime/SocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Bson.IO;
using MongoDB.Bson.Serialization;
using Universalis.Application.Realtime.Messages;

Expand Down Expand Up @@ -269,26 +268,8 @@ private async Task ReceiveEvent(byte[] buf, CancellationToken cancellationToken

private async Task SendEvent(SocketMessage message, CancellationToken cancellationToken = default)
{
await using var stream = MemoryStreamPool.GetStream();

using var writer = new BsonBinaryWriter(stream);
BsonSerializer.Serialize(writer, message.GetType(), message);

var cur = 0;
var end = (int)stream.Position;
foreach (var memory in stream.GetReadOnlySequence())
{
if (cur + memory.Length >= end)
{
var lastIdx = end - cur;
await ws.SendAsync(memory[..lastIdx], WebSocketMessageType.Binary, WebSocketMessageFlags.EndOfMessage,
cancellationToken);
break;
}

cur += memory.Length;
await ws.SendAsync(memory, WebSocketMessageType.Binary, WebSocketMessageFlags.None, cancellationToken);
}
var bytes = message.GetSerializedBytes(MemoryStreamPool);
await ws.SendAsync(bytes, WebSocketMessageType.Binary, WebSocketMessageFlags.EndOfMessage, cancellationToken);
}

public void Dispose()
Expand Down
46 changes: 32 additions & 14 deletions src/Universalis.Application/Realtime/SocketProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.IO;
using MongoDB.Bson.IO;
using MongoDB.Bson.Serialization;
using Universalis.Application.Realtime.Messages;

namespace Universalis.Application.Realtime;
Expand All @@ -33,26 +36,41 @@
"universalis_ws_exceptions",
"WebSocket exceptions across all connections");

private static readonly RecyclableMemoryStreamManager MemoryStreamPool = new();

private readonly ConcurrentDictionary<Guid, ISocketClient> _connections = new();

private static byte[] SerializeMessage(SocketMessage message)
{
using var stream = MemoryStreamPool.GetStream();
using var writer = new BsonBinaryWriter(stream);
BsonSerializer.Serialize(writer, message.GetType(), message);
return stream.ToArray();
}

public void Publish(SocketMessage message)
{
var stopwatch = new Stopwatch();
stopwatch.Start();
var stopwatch = Stopwatch.StartNew();

foreach (var (id, connection) in _connections)
{
try
{
connection.Push(message);
MessagesSent.Inc();
}
catch (Exception ex)
// Serialize once and cache on the message
message.CachedSerializedBytes = SerializeMessage(message);

Parallel.ForEach(
_connections,
new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 2 },

Check notice on line 60 in src/Universalis.Application/Realtime/SocketProcessor.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/Universalis.Application/Realtime/SocketProcessor.cs#L60

Assign this magic number '2' to a well-named variable or constant, and use that instead.
kvp =>
{
logger.LogError(ex, "Failed to send message to connection {}", id);
ExceptionCount.Inc();
}
}
try
{
kvp.Value.Push(message);
MessagesSent.Inc();
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to send message to connection {}", kvp.Key);
ExceptionCount.Inc();
}
});

stopwatch.Stop();
MessageQueueTime.Observe(stopwatch.ElapsedMilliseconds);
Expand Down
4 changes: 4 additions & 0 deletions src/Universalis.Application/Universalis.Application.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
<LangVersion>14.0</LangVersion>
</PropertyGroup>

<ItemGroup>
<InternalsVisibleTo Include="Universalis.Application.Tests" />
</ItemGroup>

<PropertyGroup>
<DocumentationFile>Universalis.Application.xml</DocumentationFile>
</PropertyGroup>
Expand Down
Loading