diff options
author | Valery Piashchynski <[email protected]> | 2021-07-27 12:39:01 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-27 12:39:01 +0300 |
commit | 1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (patch) | |
tree | 68c7c7e8d9f4d99debc4895ab8469e323c60f47b /plugins/jobs/plugin.go | |
parent | d72181126867c7e8fc05e5ac927bd90d01e0dbc7 (diff) |
Initial support for the cancellation via context
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 44 |
1 files changed, 33 insertions, 11 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 5779b368..d2d2ed9f 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -146,7 +146,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.consumers[name] = initializedDriver // register pipeline for the initialized driver - err = initializedDriver.Register(pipe) + err = initializedDriver.Register(context.Background(), pipe) if err != nil { errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name())) return false @@ -154,7 +154,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // if pipeline initialized to be consumed, call Run on it if _, ok := p.consume[name]; ok { - err = initializedDriver.Run(pipe) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) + defer cancel() + err = initializedDriver.Run(ctx, pipe) if err != nil { errCh <- errors.E(op, err) return false @@ -265,11 +267,14 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit func (p *Plugin) Stop() error { for k, v := range p.consumers { - err := v.Stop() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) + err := v.Stop(ctx) if err != nil { + cancel() p.log.Error("stop job driver", "driver", k) continue } + cancel() } // this function can block forever, but we don't care, because we might have a chance to exit from the pollers, @@ -347,11 +352,17 @@ func (p *Plugin) Push(j *job.Job) error { j.Options.Priority = ppl.Priority() } - err := d.Push(j) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) + defer cancel() + + err := d.Push(ctx, j) if err != nil { + cancel() return errors.E(op, err) } + cancel() + return nil } @@ -377,10 +388,14 @@ func (p *Plugin) PushBatch(j []*job.Job) error { j[i].Options.Priority = ppl.Priority() } - err := d.Push(j[i]) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) + err := d.Push(ctx, j[i]) if err != nil { + cancel() return errors.E(op, err) } + + cancel() } return nil @@ -400,9 +415,10 @@ func (p *Plugin) Pause(pp string) { p.log.Warn("driver for the pipeline not found", "pipeline", pp) return } - + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) + defer cancel() // redirect call to the underlying driver - d.Pause(ppl.Name()) + d.Pause(ctx, ppl.Name()) } func (p *Plugin) Resume(pp string) { @@ -419,8 +435,10 @@ func (p *Plugin) Resume(pp string) { return } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) + defer cancel() // redirect call to the underlying driver - d.Resume(ppl.Name()) + d.Resume(ctx, ppl.Name()) } // Declare a pipeline. @@ -445,7 +463,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { p.consumers[pipeline.Name()] = initializedDriver // register pipeline for the initialized driver - err = initializedDriver.Register(pipeline) + err = initializedDriver.Register(context.Background(), pipeline) if err != nil { return errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipeline.Driver(), pipeline.Name())) } @@ -453,7 +471,9 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // if pipeline initialized to be consumed, call Run on it // but likely for the dynamic pipelines it should be started manually if _, ok := p.consume[pipeline.Name()]; ok { - err = initializedDriver.Run(pipeline) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) + defer cancel() + err = initializedDriver.Run(ctx, pipeline) if err != nil { return errors.E(op, err) } @@ -485,8 +505,10 @@ func (p *Plugin) Destroy(pp string) error { // delete consumer delete(p.consumers, ppl.Name()) p.pipelines.Delete(pp) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) + defer cancel() - return d.Stop() + return d.Stop(ctx) } func (p *Plugin) List() []string { |