Skip to content

Commit 1135db7

Browse files
author
andrey.leskov
committed
Green tests,
saga and aggregate actors can notify about events persisted
1 parent 1aaf616 commit 1135db7

File tree

6 files changed

+53
-47
lines changed

6 files changed

+53
-47
lines changed

GridDomain.Domain.Tests/Sagas/InstanceSagas/Given_saga_When_publishing_any_of_start_messages.cs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,35 +9,34 @@
99

1010
namespace GridDomain.Tests.Sagas.InstanceSagas
1111
{
12-
13-
1412
[TestFixture]
15-
class Given_saga_When_publishing_any_of_start_messages : Given_saga_When_publishing_start_messages
13+
class Given_saga_When_publishing_any_of_start_messages : SoftwareProgrammingInstanceSagaTest
1614
{
17-
private static Guid SagaId = Guid.NewGuid();
15+
private static readonly Guid SagaId = Guid.NewGuid();
16+
private SagaDataAggregate<SoftwareProgrammingSagaData> _sagaData;
1817

19-
public Given_saga_When_publishing_any_of_start_messages():
20-
base(SagaId, new SleptWellEvent(Guid.NewGuid(), Guid.NewGuid(), SagaId))
18+
[OneTimeSetUp]
19+
public void When_publishing_start_message()
2120
{
22-
21+
GridNode.NewDebugWaiter(Timeout)
22+
.Expect<SagaCreatedEvent<SoftwareProgrammingSagaData>>()
23+
.Create()
24+
.Publish(new SleptWellEvent(Guid.NewGuid(), Guid.NewGuid(), SagaId))
25+
.Wait();
26+
27+
_sagaData = LoadAggregate<SagaDataAggregate<SoftwareProgrammingSagaData>>(SagaId);
2328
}
2429

2530
[Then]
2631
public void Saga_data_is_not_null()
2732
{
28-
Assert.NotNull(SagaData.Data);
33+
Assert.NotNull(_sagaData.Data);
2934
}
3035

31-
3236
[Then]
3337
public void Saga_has_correct_id()
3438
{
35-
Assert.AreEqual(_sagaId, SagaData.Id);
36-
}
37-
38-
protected override IExpectBuilder<AnyMessagePublisher> ConfigureWait(IMessageWaiter<AnyMessagePublisher> waiter)
39-
{
40-
return waiter.Expect<SagaCreatedEvent<SoftwareProgrammingSagaData>>();
39+
Assert.AreEqual(SagaId, _sagaData.Id);
4140
}
4241
}
4342
}
Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,50 @@
11
using System;
2+
using System.Threading;
3+
using Akka.Actor;
4+
using Akka.Persistence;
25
using GridDomain.CQRS;
36
using GridDomain.EventSourcing.Sagas;
47
using GridDomain.EventSourcing.Sagas.InstanceSagas;
8+
using GridDomain.Node.Actors;
59
using GridDomain.Tests.Framework;
610
using GridDomain.Tests.Sagas.SoftwareProgrammingDomain.Events;
711
using NUnit.Framework;
812

913
namespace GridDomain.Tests.Sagas.InstanceSagas
1014
{
1115
[TestFixture]
12-
class Given_saga_When_publishing_several_start_messages : Given_saga_When_publishing_start_messages
16+
class Given_saga_When_publishing_several_start_messages : SoftwareProgrammingInstanceSagaTest
1317
{
1418
private static readonly Guid SagaId = Guid.NewGuid();
15-
private static GotTiredEvent firstMessage;
16-
private static SleptWellEvent secondMessage;
17-
private static object[] GetMessages(Guid sagaId)
18-
{
19-
firstMessage = new GotTiredEvent(Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid(), sagaId);
20-
secondMessage = new SleptWellEvent(Guid.NewGuid(), Guid.NewGuid(), sagaId);
21-
22-
return new object[]
23-
{
24-
firstMessage,
25-
secondMessage
26-
};
27-
}
19+
private static SleptWellEvent secondStartMessage;
20+
private SagaDataAggregate<SoftwareProgrammingSagaData> SagaData;
2821

29-
public Given_saga_When_publishing_several_start_messages(): base(SagaId,GetMessages(SagaId))
22+
[OneTimeSetUp]
23+
public void When_publishing_start_message()
3024
{
25+
var waiter = GridNode.NewDebugWaiter(Timeout);
26+
27+
secondStartMessage = new SleptWellEvent(Guid.NewGuid(), Guid.NewGuid(), SagaId);
28+
29+
waiter.Expect<SagaCreatedEvent<SoftwareProgrammingSagaData>>()
30+
.Create()
31+
.Publish(new GotTiredEvent(Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid(), SagaId),
32+
secondStartMessage)
33+
.Wait();
34+
35+
LookupInstanceSagaActor<SoftwareProgrammingSaga, SoftwareProgrammingSagaData>(SagaId)
36+
.Tell(NotifyOnPersistenceEvents.Instance);
3137

38+
int count = 2;
39+
FishForMessage<Persisted>(m => ++count >=2);
40+
41+
SagaData = LoadAggregate<SagaDataAggregate<SoftwareProgrammingSagaData>>(SagaId);
3242
}
3343

3444
[Then]
3545
public void Saga_reinitialized_from_last_start_message()
3646
{
37-
Assert.AreEqual(secondMessage.SofaId, SagaData.Data.SofaId);
47+
Assert.AreEqual(secondStartMessage.SofaId, SagaData.Data.SofaId);
3848
}
3949

4050
[Then]
@@ -43,10 +53,5 @@ public void Saga_has_correct_state()
4353
var saga = new SoftwareProgrammingSaga();
4454
Assert.AreEqual(saga.Coding.Name, SagaData.Data.CurrentStateName);
4555
}
46-
47-
protected override IExpectBuilder<AnyMessagePublisher> ConfigureWait(IMessageWaiter<AnyMessagePublisher> waiter)
48-
{
49-
return waiter.Expect<SagaCreatedEvent<SoftwareProgrammingSagaData>>();
50-
}
5156
}
5257
}

GridDomain.Node/Actors/AggregateActor.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ private void ProcessAggregateEvents(ICommand command)
9898
.With<FutureEventCanceledEvent>(Handle);
9999

100100
Publisher.Publish(e);
101+
102+
NotifyWatchers(new Persisted(e));
101103
});
102104

103105
State.ClearUncommittedEvents();

GridDomain.Node/Actors/AggregatePersistedNotification.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@
22

33
namespace GridDomain.Node.Actors
44
{
5-
public class AggregatePersistedNotification
5+
public class Persisted
66
{
7-
public AggregatePersistedNotification(Guid id, Guid commandId)
7+
public object Event { get;}
8+
9+
public Persisted(object @event)
810
{
9-
Id = id;
10-
CommandId = commandId;
11+
Event = @event;
1112
}
12-
13-
public Guid CommandId { get; }
14-
public Guid Id { get; }
1513
}
1614
}

GridDomain.Node/Actors/EventSourcedActor.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public EventSourcedActor(IConstructAggregates aggregateConstructor,
4040
Monitor.IncrementMessagesReceived();
4141
DeleteSnapshots(SnapshotsPolicy.GetSnapshotsToDelete());
4242
Become(Terminating);
43-
// StopNow(new DeleteSnapshotsSuccess(SnapshotSelectionCriteria.None));
4443
});
4544

4645
Command<CheckHealth>(s => Sender.Tell(new HealthStatus(s.Payload)));
@@ -88,7 +87,7 @@ protected bool TrySaveSnapshot(object[] stateChange)
8887
return shouldSave;
8988
}
9089

91-
private void NotifyWatchers(object msg)
90+
protected void NotifyWatchers(object msg)
9291
{
9392
foreach (var watcher in _persistenceWaiters)
9493
watcher.Tell(msg);

GridDomain.Node/Actors/SagaActor.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ private void ProcessSaga(object message)
8888
try
8989
{
9090
Saga.Transit(message);
91-
// SnapshotsPolicy.RefreshActivity();
9291
}
9392
catch (Exception ex)
9493
{
@@ -117,7 +116,11 @@ private void ProcessSagaCommands()
117116
private object[] ProcessSagaStateChange()
118117
{
119118
var stateChangeEvents = State.GetUncommittedEvents().Cast<object>().ToArray();
120-
PersistAll(stateChangeEvents, e => Publisher.Publish(e));
119+
PersistAll(stateChangeEvents,
120+
e => {
121+
Publisher.Publish(e);
122+
NotifyWatchers(new Persisted(e));
123+
});
121124
State.ClearUncommittedEvents();
122125
return stateChangeEvents;
123126
}

0 commit comments

Comments
 (0)