@@ -76,18 +76,14 @@ func PourItemToBucket(
7676
7777 sigclosed := 0
7878 failed_sent := 0
79- attempts := 0
8079 start := time .Now ().UTC ()
8180
82- for {
83- attempts += 1
84- /* Warn the user if we used more than a 100 ms to pour an event, it's at least an half lock*/
85- if attempts % 100000 == 0 && start .Add (100 * time .Millisecond ).Before (time .Now ().UTC ()) {
86- holder .logger .Warningf ("stuck for %s sending event to %s (sigclosed:%d failed_sent:%d attempts:%d)" , time .Since (start ),
87- buckey , sigclosed , failed_sent , attempts )
88- }
81+ // Warn if we're stuck for too long trying to pour
82+ warnTicker := time .NewTicker (100 * time .Millisecond )
83+ defer warnTicker .Stop ()
8984
90- /* check if leak routine is up */
85+ for {
86+ // If bucket is dead, recreate and retry.
9187 select {
9288 case <- bucket .done :
9389 // the bucket was found and dead, get a new one and continue
@@ -100,6 +96,8 @@ func PourItemToBucket(
10096 }
10197 continue
10298 // holder.logger.Tracef("Signal exists, try to pour :)")
99+ case <- ctx .Done ():
100+ return ctx .Err ()
103101 default :
104102 // nothing to read, but not closed, try to pour
105103 // holder.logger.Tracef("Signal exists but empty, try to pour :)")
@@ -131,7 +129,9 @@ func PourItemToBucket(
131129 }
132130 }
133131 }
134- // the bucket seems to be up & running
132+
133+ // Block until we can send, or we learn it's dead/canceled, or we warn periodically.
134+
135135 select {
136136 case bucket .In <- parsed :
137137 // holder.logger.Tracef("Successfully sent !")
@@ -141,11 +141,23 @@ func PourItemToBucket(
141141 }
142142 holder .logger .Debugf ("bucket '%s' is poured" , holder .Spec .Name )
143143 return nil
144+ // XXX: bucket died while we were waiting to send.
145+ // case <- bucket.done:
146+
147+ case <- ctx .Done ():
148+ return ctx .Err ()
149+
150+ case <- warnTicker .C :
151+ // We are blocked because bucket.In isn't being read fast enough (or at all).
152+ holder .logger .Warningf (
153+ "stuck for %s sending event to %s (sigclosed:%d failed_sent:%d" ,
154+ time .Since (start ),
155+ buckey ,
156+ sigclosed ,
157+ failed_sent ,
158+ )
159+ failed_sent ++
144160 default :
145- failed_sent += 1
146- // holder.logger.Tracef("Failed to send, try again")
147- continue
148-
149161 }
150162 }
151163}
0 commit comments