summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-25 21:56:26 +0300
committerValery Piashchynski <[email protected]>2020-11-25 21:56:26 +0300
commit1b0b033bcf4969e401b0c34b82171aa0b60b12cf (patch)
tree9cfe13b67613e69140bf29d320a1754cf2f7c9b7 /worker.go
parent2918cfca6d9579125257bfc9f5655537a63ec82a (diff)
Add mock logger, test for errors in log
Diffstat (limited to 'worker.go')
-rwxr-xr-xworker.go36
1 files changed, 21 insertions, 15 deletions
diff --git a/worker.go b/worker.go
index ef532f51..0f9b134d 100755
--- a/worker.go
+++ b/worker.go
@@ -26,16 +26,28 @@ const (
// EventWorkerKill thrown after WorkerProcess is being forcefully killed.
const (
// EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError int64 = iota + 200
+ EventWorkerError Event = iota + 200
// EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
EventWorkerLog
)
+type Event int64
+
+func (ev Event) String() string {
+ switch ev {
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ }
+ return "Unknown event type"
+}
+
// WorkerEvent wraps worker events.
type WorkerEvent struct {
// Event id, see below.
- Event int64
+ Event Event
// Worker triggered the event.
Worker WorkerBase
@@ -298,13 +310,11 @@ func (w *WorkerProcess) logCallback(log []byte) {
// thread safe errBuffer
type errBuffer struct {
- enable bool
- mu sync.RWMutex
- buf []byte
- last int
- wait *time.Timer
- // todo: remove update
- update chan interface{}
+ enable bool
+ mu sync.RWMutex
+ buf []byte
+ last int
+ wait *time.Timer
stop chan interface{}
logCallback func(log []byte)
}
@@ -312,7 +322,6 @@ type errBuffer struct {
func newErrBuffer(logCallback func(log []byte)) *errBuffer {
eb := &errBuffer{
buf: make([]byte, 0),
- update: make(chan interface{}),
wait: time.NewTimer(WaitDuration),
stop: make(chan interface{}),
logCallback: logCallback,
@@ -321,8 +330,6 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer {
go func(eb *errBuffer) {
for {
select {
- case <-eb.update:
- eb.wait.Reset(WaitDuration)
case <-eb.wait.C:
eb.mu.Lock()
if eb.enable && len(eb.buf) > eb.last {
@@ -362,10 +369,9 @@ func (eb *errBuffer) Len() int {
// needed. The return value n is the length of pool; errBuffer is always nil.
func (eb *errBuffer) Write(p []byte) (int, error) {
eb.mu.Lock()
- eb.buf = append(eb.buf, p...)
- eb.mu.Unlock()
- eb.update <- nil
+ defer eb.mu.Unlock()
+ eb.buf = append(eb.buf, p...)
return len(p), nil
}