-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocessor.go
More file actions
165 lines (137 loc) · 3.05 KB
/
processor.go
File metadata and controls
165 lines (137 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package log
import (
"fmt"
"sync"
"sync/atomic"
)
// buffSize is the capacity of the buffered message channel that
// (*processor).run allocates for non-blocking mode.
const buffSize = 1024
// processor handles message processing and writing in either blocking or
// non-blocking mode. In non-blocking mode it uses a buffered channel and a
// background goroutine.
type processor struct {
	// mode selects how Do dispatches a message: written inline
	// (ModeBlocking) or queued on buf (ModeNonBlocking).
	mode WriteMode
	// buf is the message queue. It is only allocated by run, which
	// NewProcessor calls for ModeNonBlocking; otherwise it stays nil.
	buf chan *message
	// wg tracks the background goroutine so Shutdown can wait for it.
	wg sync.WaitGroup
	// closed is set to true by the first Shutdown call on a non-blocking
	// processor; later calls see it and return an error.
	closed atomic.Bool
	// droppedCount counts messages discarded because buf was full.
	droppedCount atomic.Uint64
}
// NewProcessor returns a processor configured for the given write mode.
// For ModeNonBlocking it also allocates the buffer and launches the
// background goroutine (see run); for every other mode nothing is started.
func NewProcessor(mode WriteMode) *processor {
	proc := new(processor)
	proc.mode = mode

	// Only the non-blocking mode needs the queue and its consumer.
	if mode != ModeNonBlocking {
		return proc
	}

	proc.run()
	return proc
}
// run allocates the buffered channel (capacity buffSize) and starts the
// background goroutine that drains it. The goroutine writes every non-nil
// message it receives and returns once the channel has been closed and
// emptied, at which point wg is marked done.
func (p *processor) run() {
	p.buf = make(chan *message, buffSize)
	p.wg.Add(1)

	go func() {
		defer p.wg.Done()

		// Read the channel field once, exactly like a range loop would.
		queue := p.buf
		for {
			next, open := <-queue
			if !open {
				return
			}
			if next == nil {
				continue
			}
			p.write(next)
		}
	}()
}
// Do processes a single message according to the processor's write mode.
//
// In ModeBlocking the message is written before Do returns. In
// ModeNonBlocking it is handed to addToBuf, which queues it for the
// background goroutine or drops it when the buffer is full. For any other
// mode the message is not written.
//
// A nil message is ignored, matching addToBuf and the background goroutine,
// which both skip nil; without this guard the level check below would
// dereference a nil pointer.
//
// If the message level is Fatal, Do panics with the message text after
// dispatching it.
// NOTE(review): in non-blocking mode the Fatal message has only been queued
// (or dropped) when the panic fires, so it may not have reached the writers
// yet — confirm whether that is intended.
func (p *processor) Do(m *message) {
	if m == nil {
		return
	}

	switch p.mode {
	case ModeBlocking:
		p.write(m)
	case ModeNonBlocking:
		p.addToBuf(m)
	}

	if m.level == Fatal {
		panic(m.text)
	}
}
// addToBuf tries to enqueue m on the buffer without blocking on the channel.
//
// A nil message is ignored. When the buffer is full the message is dropped
// and droppedCount is incremented; on the 1st, 101st, 201st, ... drop a
// warning is written to the first global writer of the message's logger, if
// it has one.
//
// Any panic raised along the way (for example a send on a channel that
// Shutdown has already closed) is recovered and reported on standard output
// instead of propagating to the caller.
func (p *processor) addToBuf(m *message) {
	if m == nil {
		return
	}

	defer func() {
		if recover() != nil {
			fmt.Println("log: error writing to buffer")
		}
	}()

	select {
	case p.buf <- m:
		// Successfully added to buffer.
		return
	default:
		// Buffer is full: fall through and account for the dropped message.
	}

	dropped := p.droppedCount.Add(1)

	// Warn only once per 100 dropped messages to avoid spam.
	if dropped%100 != 1 {
		return
	}

	writers := m.super.global.writers
	if len(writers) == 0 {
		return
	}

	// Hold the writer's mutex like writeSync/writeAsync do, so the warning
	// cannot interleave or race with a message the background goroutine is
	// writing to the same destination. The unlock is deferred so that a
	// panicking writer, which the recover above swallows, cannot leave the
	// mutex locked. This may wait for an in-flight write on that writer.
	w := writers[0]
	w.mu.Lock()
	defer w.mu.Unlock()
	fmt.Fprintf(w.writer,
		"log: WARNING - buffer overflow, %d messages dropped\n", dropped)
}
// write formats m for each writer of its logger and emits the result.
//
// Nothing is written when the message level has no entry in levels, when its
// rank is below the logger's local level threshold, or when the text is
// empty. The logger's global writers are served first, then its local ones;
// a writer whose formatting fails is skipped.
func (p *processor) write(m *message) {
	rank, known := levels[m.level]
	if !known {
		return
	}

	owner := m.super
	if rank < int(owner.local.level.Load()) || len(m.text) == 0 {
		return
	}

	// emit renders the message in the writer's own format and writes it.
	emit := func(dst *writer) {
		line, err := formatter.Get(&formatterProps{m, dst.format})
		if err != nil {
			return
		}
		p.writeByMode(dst, line)
	}

	for _, dst := range owner.global.writers {
		emit(dst)
	}
	for _, dst := range owner.local.writers {
		emit(dst)
	}
}
// writeByMode forwards msg to writeSync for ModeBlocking or to writeAsync for
// ModeNonBlocking. Any other mode results in no write.
func (p *processor) writeByMode(w *writer, msg string) {
	current := p.mode

	if current == ModeBlocking {
		p.writeSync(w, msg)
		return
	}
	if current == ModeNonBlocking {
		p.writeAsync(w, msg)
	}
}
// writeAsync writes msg followed by a newline to w while holding w's mutex.
// writeByMode calls it when the processor is in ModeNonBlocking.
//
// The unlock is deferred so the mutex is released even if the underlying
// writer panics. The result of the write is intentionally discarded: a
// logger has nowhere better to report its own write failures.
func (p *processor) writeAsync(w *writer, msg string) {
	w.mu.Lock()
	defer w.mu.Unlock()

	_, _ = fmt.Fprintln(w.writer, msg)
}
// writeSync writes msg followed by a newline to w while holding w's mutex.
// writeByMode calls it when the processor is in ModeBlocking, i.e. on the
// goroutine that called Do.
//
// The unlock is deferred so the mutex is released even if the underlying
// writer panics; otherwise a caller that recovers from that panic would find
// every later write to w blocked forever. The result of the write is
// intentionally discarded: a logger has nowhere better to report its own
// write failures.
func (p *processor) writeSync(w *writer, msg string) {
	w.mu.Lock()
	defer w.mu.Unlock()

	_, _ = fmt.Fprintln(w.writer, msg)
}
// Shutdown stops a non-blocking processor: it closes the buffer and waits
// until the background goroutine has written what was still queued and
// exited. For any other mode there is nothing to stop and nil is returned.
// A second call on a non-blocking processor returns an error.
func (p *processor) Shutdown() error {
	switch {
	case p.mode != ModeNonBlocking:
		// No queue and no goroutine exist outside non-blocking mode.
		return nil
	case !p.closed.CompareAndSwap(false, true):
		// Someone already won the race to close the processor.
		return fmt.Errorf("processor already closed")
	}

	close(p.buf)
	p.wg.Wait()
	return nil
}
// DroppedCount reports how many messages have been discarded so far because
// the buffer was full when they were submitted.
func (p *processor) DroppedCount() uint64 {
	total := p.droppedCount.Load()
	return total
}