diff options
author | Valery Piashchynski <[email protected]> | 2021-08-31 15:31:30 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-08-31 15:31:30 +0300 |
commit | 83e7bc6afbc2e523a95cf9dcb8b25cf5f7ba3f1e (patch) | |
tree | 884dd2991acf12826752632b8321410e7cc923ce /plugins/jobs/plugin.go | |
parent | 0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff) | |
parent | 31cf040029eb0b26278e4a9948cbc1aba77ed58b (diff) |
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 18 |
1 files changed, 17 insertions, 1 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 5e62c5c5..3f3fa196 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -177,8 +177,13 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit return true }) + // do not continue processing, immediately stop if channel contains an error + if len(errCh) > 0 { + return errCh + } + var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"}) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}) if err != nil { errCh <- err return errCh @@ -219,6 +224,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit if err != nil { p.events.Push(events.JobEvent{ Event: events.EventJobError, + Error: err, ID: jb.ID(), Start: start, Elapsed: time.Since(start), @@ -243,6 +249,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.events.Push(events.JobEvent{ Event: events.EventJobError, ID: jb.ID(), + Error: err, Start: start, Elapsed: time.Since(start), }) @@ -266,6 +273,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.events.Push(events.JobEvent{ Event: events.EventJobError, ID: jb.ID(), + Error: err, Start: start, Elapsed: time.Since(start), }) @@ -279,6 +287,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit Start: start, Elapsed: time.Since(start), }) + + continue } // handle the response protocol @@ -288,6 +298,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit Event: events.EventJobError, ID: jb.ID(), Start: start, + Error: err, Elapsed: time.Since(start), }) p.putPayload(exec) @@ -307,6 +318,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit Start: start, Elapsed: time.Since(start), }) + // return payload p.putPayload(exec) } @@ -343,6 +355,10 @@ func (p *Plugin) Stop() error { // just wait pollers for 5 seconds before exit time.Sleep(time.Second * 5) + p.Lock() + p.workersPool.Destroy(context.Background()) + p.Unlock() + return nil } |