summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-31 15:31:30 +0300
committerGitHub <[email protected]>2021-08-31 15:31:30 +0300
commit83e7bc6afbc2e523a95cf9dcb8b25cf5f7ba3f1e (patch)
tree884dd2991acf12826752632b8321410e7cc923ce /plugins/jobs/plugin.go
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
parent31cf040029eb0b26278e4a9948cbc1aba77ed58b (diff)
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go18
1 files changed, 17 insertions, 1 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 5e62c5c5..3f3fa196 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -177,8 +177,13 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
return true
})
+ // do not continue processing, immediately stop if channel contains an error
+ if len(errCh) > 0 {
+ return errCh
+ }
+
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"})
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs})
if err != nil {
errCh <- err
return errCh
@@ -219,6 +224,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
if err != nil {
p.events.Push(events.JobEvent{
Event: events.EventJobError,
+ Error: err,
ID: jb.ID(),
Start: start,
Elapsed: time.Since(start),
@@ -243,6 +249,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.events.Push(events.JobEvent{
Event: events.EventJobError,
ID: jb.ID(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
})
@@ -266,6 +273,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.events.Push(events.JobEvent{
Event: events.EventJobError,
ID: jb.ID(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
})
@@ -279,6 +287,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Start: start,
Elapsed: time.Since(start),
})
+
+ continue
}
// handle the response protocol
@@ -288,6 +298,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Event: events.EventJobError,
ID: jb.ID(),
Start: start,
+ Error: err,
Elapsed: time.Since(start),
})
p.putPayload(exec)
@@ -307,6 +318,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Start: start,
Elapsed: time.Since(start),
})
+
// return payload
p.putPayload(exec)
}
@@ -343,6 +355,10 @@ func (p *Plugin) Stop() error {
// just wait pollers for 5 seconds before exit
time.Sleep(time.Second * 5)
+ p.Lock()
+ p.workersPool.Destroy(context.Background())
+ p.Unlock()
+
return nil
}