diff options
author | Valery Piashchynski <[email protected]> | 2021-07-12 09:06:20 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-12 09:06:20 +0300 |
commit | b924032fffc761a6d88264133745d783f0a58191 (patch) | |
tree | 80fd3cc3a053c4770747a2b9895ae3943654ba6d | |
parent | e82e9248bb1afd5e571f465ac79ac7f5f79b81f1 (diff) |
Implement Reset operation.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | plugins/jobs/plugin.go | 33 |
1 files changed, 33 insertions, 0 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 6dd55782..150c1d48 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -189,6 +189,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.log.Error("negatively acknowledge failed", "error", errNack) } p.log.Error("job marshal context", "error", err) + continue } exec := payload.Payload{ @@ -196,6 +197,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit Body: job.Body(), } + // protect from the pool reset + p.RLock() _, err = p.workersPool.Exec(exec) if err != nil { errNack := job.Nack() @@ -203,9 +206,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.log.Error("negatively acknowledge failed", "error", errNack) } + p.RUnlock() p.log.Error("job execute", "error", err) continue } + p.RUnlock() // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- atomic.AddUint64(&rate, 1) @@ -264,6 +269,33 @@ func (p *Plugin) Name() string { return PluginName } +func (p *Plugin) Reset() error { + p.Lock() + defer p.Unlock() + + const op = errors.Op("jobs_plugin_reset") + p.log.Info("JOBS plugin got restart request. Restarting...") + p.workersPool.Destroy(context.Background()) + p.workersPool = nil + + var err error + p.workersPool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{ + Debug: p.cfg.Pool.Debug, + NumWorkers: p.cfg.Pool.NumWorkers, + MaxJobs: p.cfg.Pool.MaxJobs, + AllocateTimeout: p.cfg.Pool.AllocateTimeout, + DestroyTimeout: p.cfg.Pool.DestroyTimeout, + Supervisor: p.cfg.Pool.Supervisor, + }, map[string]string{RrJobs: "true"}) + if err != nil { + return errors.E(op, err) + } + + p.log.Info("JOBS workers pool successfully restarted") + + return nil +} + func (p *Plugin) Push(j *structs.Job) error { const op = errors.Op("jobs_plugin_push") @@ -282,6 +314,7 @@ func (p *Plugin) Push(j *structs.Job) error { } // if job has no priority, inherit it from the pipeline + // TODO merge all options, not only priority if j.Options.Priority == 0 { j.Options.Priority = ppl.Priority() } |