-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnode_driver.go
More file actions
130 lines (110 loc) · 3.69 KB
/
Copy pathnode_driver.go
File metadata and controls
130 lines (110 loc) · 3.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package paxos
import (
"context"
"time"
)
// nodeDriver is a thin wrapper around the deterministic protocol logic in protocol_node.go
// this wrapper is responsible for running the protocol logic in a single goroutine, and for performing the side effects that the protocol logic decides on (sending messages, setting timers, etc)
// to reason better about the flow:
/*
- nodeDriver is a loop waiting for events that concern the node (proposals, messages, retry timer firing)
- when an event is received (say, by the 'network'), it is passed to the node protocol logic (node.Handle(event)), which (potentially) returns actions (for the driver to act upon and perform side affects) and traces (to be displayed in CLI, e.g. when a proposer gives up on its round).
Example:
- nodeDriver receives a messageReceived event that is a Prepare request from a proposer
- node.Handle(messageReceived) returns actions, including sendMessage to the proposer (a promise in response to the prepare)
- nodeDriver performs the sendMessage action by calling transport.Send(...)
*/
type nodeDriver struct {
protocol *node // the brain that will result in nodeActions, pure and deterministic
ctx context.Context
events chan nodeEvent // messageReceived, proposalTriggered, retryTimerFired, etc - events that the node protocol logic needs to react to
transport network
traces chan<- TraceEntry // a channel to report protocol traces to the outside world
retryTimer retryTimer
}
type TraceEntry struct {
NodeID int
Event TraceEvent
}
func newNodeDriver(protocol *node, transport network, traces chan<- TraceEntry) *nodeDriver {
return &nodeDriver{
protocol: protocol,
events: make(chan nodeEvent, 64),
transport: transport,
traces: traces,
}
}
func (d *nodeDriver) startLoop(ctx context.Context) {
d.ctx = ctx
go func() {
defer d.retryTimer.stop()
for {
select {
case <-d.ctx.Done():
return
case event := <-d.events:
output := d.protocol.Handle(event)
d.applyActions(output.actions)
d.reportTraces(output.traces)
}
}
}()
}
func (d *nodeDriver) applyActions(actions []nodeAction) {
for _, action := range actions {
switch action := action.(type) {
case sendMessage:
d.transport.Send(d.ctx, d.protocol.id, action.to, action.msg)
case resetRetryTimer:
d.resetRetryTimer(action.ballot, action.after)
case stopRetryTimer:
d.retryTimer.stop()
}
}
}
func (d *nodeDriver) reportTraces(traces []TraceEvent) {
for _, event := range traces {
select {
case <-d.ctx.Done():
return
case d.traces <- TraceEntry{
NodeID: d.protocol.id,
Event: event,
}:
}
}
}
// this is called by the outside world to trigger a new proposal on this node (if it's a proposer)
func (d *nodeDriver) propose(value string) {
d.enqueue(proposalTriggered{value: value})
}
// called by the transport, this is the transport's entry point for message delivery
func (d *nodeDriver) receive(from int, msg Message) {
d.enqueue(messageReceived{from: from, msg: msg})
}
func (d *nodeDriver) enqueue(event nodeEvent) {
select {
case <-d.ctx.Done():
case d.events <- event:
}
}
func (d *nodeDriver) resetRetryTimer(ballot Ballot, after time.Duration) {
// note: in case of races, stale callbacks are harmless because the protocol logic is inherently designed to handle them correctly (late promises etc)
d.retryTimer.reset(after, func() {
d.enqueue(retryTimerFired{ballot: ballot})
})
}
// thin wrapper for readability, could be deleted
type retryTimer struct {
timer *time.Timer
}
func (t *retryTimer) reset(after time.Duration, fire func()) {
t.stop()
t.timer = time.AfterFunc(after, fire)
}
func (t *retryTimer) stop() {
if t.timer != nil {
t.timer.Stop()
t.timer = nil
}
}