summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-27 12:39:01 +0300
committerValery Piashchynski <[email protected]>2021-07-27 12:39:01 +0300
commit1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (patch)
tree68c7c7e8d9f4d99debc4895ab8469e323c60f47b /plugins/jobs/plugin.go
parentd72181126867c7e8fc05e5ac927bd90d01e0dbc7 (diff)
Initial support for the cancellation via context
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go44
1 files changed, 33 insertions, 11 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 5779b368..d2d2ed9f 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -146,7 +146,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.consumers[name] = initializedDriver
// register pipeline for the initialized driver
- err = initializedDriver.Register(pipe)
+ err = initializedDriver.Register(context.Background(), pipe)
if err != nil {
errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name()))
return false
@@ -154,7 +154,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// if pipeline initialized to be consumed, call Run on it
if _, ok := p.consume[name]; ok {
- err = initializedDriver.Run(pipe)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ defer cancel()
+ err = initializedDriver.Run(ctx, pipe)
if err != nil {
errCh <- errors.E(op, err)
return false
@@ -265,11 +267,14 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
func (p *Plugin) Stop() error {
for k, v := range p.consumers {
- err := v.Stop()
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ err := v.Stop(ctx)
if err != nil {
+ cancel()
p.log.Error("stop job driver", "driver", k)
continue
}
+ cancel()
}
// this function can block forever, but we don't care, because we might have a chance to exit from the pollers,
@@ -347,11 +352,17 @@ func (p *Plugin) Push(j *job.Job) error {
j.Options.Priority = ppl.Priority()
}
- err := d.Push(j)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ defer cancel()
+
+ err := d.Push(ctx, j)
if err != nil {
+ cancel()
return errors.E(op, err)
}
+ cancel()
+
return nil
}
@@ -377,10 +388,14 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
j[i].Options.Priority = ppl.Priority()
}
- err := d.Push(j[i])
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ err := d.Push(ctx, j[i])
if err != nil {
+ cancel()
return errors.E(op, err)
}
+
+ cancel()
}
return nil
@@ -400,9 +415,10 @@ func (p *Plugin) Pause(pp string) {
p.log.Warn("driver for the pipeline not found", "pipeline", pp)
return
}
-
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ defer cancel()
// redirect call to the underlying driver
- d.Pause(ppl.Name())
+ d.Pause(ctx, ppl.Name())
}
func (p *Plugin) Resume(pp string) {
@@ -419,8 +435,10 @@ func (p *Plugin) Resume(pp string) {
return
}
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ defer cancel()
// redirect call to the underlying driver
- d.Resume(ppl.Name())
+ d.Resume(ctx, ppl.Name())
}
// Declare a pipeline.
@@ -445,7 +463,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
p.consumers[pipeline.Name()] = initializedDriver
// register pipeline for the initialized driver
- err = initializedDriver.Register(pipeline)
+ err = initializedDriver.Register(context.Background(), pipeline)
if err != nil {
return errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipeline.Driver(), pipeline.Name()))
}
@@ -453,7 +471,9 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
// if pipeline initialized to be consumed, call Run on it
// but likely for the dynamic pipelines it should be started manually
if _, ok := p.consume[pipeline.Name()]; ok {
- err = initializedDriver.Run(pipeline)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ defer cancel()
+ err = initializedDriver.Run(ctx, pipeline)
if err != nil {
return errors.E(op, err)
}
@@ -485,8 +505,10 @@ func (p *Plugin) Destroy(pp string) error {
// delete consumer
delete(p.consumers, ppl.Name())
p.pipelines.Delete(pp)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ defer cancel()
- return d.Stop()
+ return d.Stop(ctx)
}
func (p *Plugin) List() []string {