summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-12 09:06:20 +0300
committerValery Piashchynski <[email protected]>2021-07-12 09:06:20 +0300
commitb924032fffc761a6d88264133745d783f0a58191 (patch)
tree80fd3cc3a053c4770747a2b9895ae3943654ba6d
parente82e9248bb1afd5e571f465ac79ac7f5f79b81f1 (diff)
Implement Reset operation.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--plugins/jobs/plugin.go33
1 files changed, 33 insertions, 0 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 6dd55782..150c1d48 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -189,6 +189,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.log.Error("negatively acknowledge failed", "error", errNack)
}
p.log.Error("job marshal context", "error", err)
+ continue
}
exec := payload.Payload{
@@ -196,6 +197,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Body: job.Body(),
}
+ // protect from the pool reset
+ p.RLock()
_, err = p.workersPool.Exec(exec)
if err != nil {
errNack := job.Nack()
@@ -203,9 +206,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.log.Error("negatively acknowledge failed", "error", errNack)
}
+ p.RUnlock()
p.log.Error("job execute", "error", err)
continue
}
+ p.RUnlock()
// TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
atomic.AddUint64(&rate, 1)
@@ -264,6 +269,33 @@ func (p *Plugin) Name() string {
return PluginName
}
+func (p *Plugin) Reset() error {
+ p.Lock()
+ defer p.Unlock()
+
+ const op = errors.Op("jobs_plugin_reset")
+ p.log.Info("JOBS plugin got restart request. Restarting...")
+ p.workersPool.Destroy(context.Background())
+ p.workersPool = nil
+
+ var err error
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{
+ Debug: p.cfg.Pool.Debug,
+ NumWorkers: p.cfg.Pool.NumWorkers,
+ MaxJobs: p.cfg.Pool.MaxJobs,
+ AllocateTimeout: p.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: p.cfg.Pool.DestroyTimeout,
+ Supervisor: p.cfg.Pool.Supervisor,
+ }, map[string]string{RrJobs: "true"})
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.log.Info("JOBS workers pool successfully restarted")
+
+ return nil
+}
+
func (p *Plugin) Push(j *structs.Job) error {
const op = errors.Op("jobs_plugin_push")
@@ -282,6 +314,7 @@ func (p *Plugin) Push(j *structs.Job) error {
}
// if job has no priority, inherit it from the pipeline
+ // TODO merge all options, not only priority
if j.Options.Priority == 0 {
j.Options.Priority = ppl.Priority()
}