summaryrefslogtreecommitdiff
path: root/plugins/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs')
-rw-r--r--plugins/jobs/job/job.go11
-rw-r--r--plugins/jobs/job/job_test.go27
-rw-r--r--plugins/jobs/plugin.go96
-rw-r--r--plugins/jobs/rpc.go8
4 files changed, 59 insertions, 83 deletions
diff --git a/plugins/jobs/job/job.go b/plugins/jobs/job/job.go
index 06c3254e..adab2a0a 100644
--- a/plugins/jobs/job/job.go
+++ b/plugins/jobs/job/job.go
@@ -45,17 +45,6 @@ type Options struct {
Delay int64 `json:"delay,omitempty"`
}
-// Merge merges job options.
-func (o *Options) Merge(from *Options) {
- if o.Pipeline == "" {
- o.Pipeline = from.Pipeline
- }
-
- if o.Delay == 0 {
- o.Delay = from.Delay
- }
-}
-
// DelayDuration returns delay duration in a form of time.Duration.
func (o *Options) DelayDuration() time.Duration {
return time.Second * time.Duration(o.Delay)
diff --git a/plugins/jobs/job/job_test.go b/plugins/jobs/job/job_test.go
index a47151a3..4a95e27d 100644
--- a/plugins/jobs/job/job_test.go
+++ b/plugins/jobs/job/job_test.go
@@ -16,30 +16,3 @@ func TestOptions_DelayDuration2(t *testing.T) {
opts := &Options{Delay: 1}
assert.Equal(t, time.Second, opts.DelayDuration())
}
-
-func TestOptions_Merge(t *testing.T) {
- opts := &Options{}
-
- opts.Merge(&Options{
- Pipeline: "pipeline",
- Delay: 2,
- })
-
- assert.Equal(t, "pipeline", opts.Pipeline)
- assert.Equal(t, int64(2), opts.Delay)
-}
-
-func TestOptions_MergeKeepOriginal(t *testing.T) {
- opts := &Options{
- Pipeline: "default",
- Delay: 10,
- }
-
- opts.Merge(&Options{
- Pipeline: "pipeline",
- Delay: 2,
- })
-
- assert.Equal(t, "default", opts.Pipeline)
- assert.Equal(t, int64(10), opts.Delay)
-}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 3f3fa196..3aec6acc 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -41,7 +41,7 @@ type Plugin struct {
server server.Server
jobConstructors map[string]jobs.Constructor
- consumers map[string]jobs.Consumer
+ consumers sync.Map // map[string]jobs.Consumer
// events handler
events events.Handler
@@ -82,7 +82,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.events.AddListener(p.collectJobsEvents)
p.jobConstructors = make(map[string]jobs.Constructor)
- p.consumers = make(map[string]jobs.Consumer)
p.consume = make(map[string]struct{})
p.stopCh = make(chan struct{}, 1)
@@ -130,19 +129,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// jobConstructors contains constructors for the drivers
// we need here to initialize these drivers for the pipelines
- if c, ok := p.jobConstructors[dr]; ok {
+ if _, ok := p.jobConstructors[dr]; ok {
// config key for the particular sub-driver jobs.pipelines.test-local
configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name)
// init the driver
- initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue)
+ initializedDriver, err := p.jobConstructors[dr].JobsConstruct(configKey, p.events, p.queue)
if err != nil {
errCh <- errors.E(op, err)
return false
}
// add driver to the set of the consumers (name - pipeline name, value - associated driver)
- p.consumers[name] = initializedDriver
+ p.consumers.Store(name, initializedDriver)
// register pipeline for the initialized driver
err = initializedDriver.Register(context.Background(), pipe)
@@ -331,16 +330,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
func (p *Plugin) Stop() error {
- for k, v := range p.consumers {
+ // range over all consumers and call stop
+ p.consumers.Range(func(key, value interface{}) bool {
+ consumer := value.(jobs.Consumer)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- err := v.Stop(ctx)
+ err := consumer.Stop(ctx)
if err != nil {
cancel()
- p.log.Error("stop job driver", "driver", k)
- continue
+ p.log.Error("stop job driver", "driver", key)
+ return true
}
cancel()
- }
+ return true
+ })
// this function can block forever, but we don't care, because we might have a chance to exit from the pollers,
// but if not, this is not a problem at all.
@@ -394,18 +396,26 @@ func (p *Plugin) Workers() []*process.State {
func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) {
const op = errors.Op("jobs_plugin_drivers_state")
- jst := make([]*jobState.State, 0, len(p.consumers))
- for k := range p.consumers {
- d := p.consumers[k]
+ jst := make([]*jobState.State, 0, 2)
+ var err error
+ p.consumers.Range(func(key, value interface{}) bool {
+ consumer := value.(jobs.Consumer)
newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout))
- state, err := d.State(newCtx)
+
+ var state *jobState.State
+ state, err = consumer.State(newCtx)
if err != nil {
cancel()
- return nil, errors.E(op, err)
+ return false
}
jst = append(jst, state)
cancel()
+ return true
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
}
return jst, nil
}
@@ -449,13 +459,12 @@ func (p *Plugin) Push(j *job.Job) error {
// type conversion
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
// if job has no priority, inherit it from the pipeline
- // TODO(rustatian) merge all options, not only priority
if j.Options.Priority == 0 {
j.Options.Priority = ppl.Priority()
}
@@ -463,16 +472,16 @@ func (p *Plugin) Push(j *job.Job) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
defer cancel()
- err := d.Push(ctx, j)
+ err := d.(jobs.Consumer).Push(ctx, j)
if err != nil {
p.events.Push(events.JobEvent{
Event: events.EventPushError,
ID: j.Ident,
Pipeline: ppl.Name(),
Driver: ppl.Driver(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
- Error: err,
})
return errors.E(op, err)
}
@@ -482,9 +491,9 @@ func (p *Plugin) Push(j *job.Job) error {
ID: j.Ident,
Pipeline: ppl.Name(),
Driver: ppl.Driver(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
- Error: err,
})
return nil
@@ -492,9 +501,9 @@ func (p *Plugin) Push(j *job.Job) error {
func (p *Plugin) PushBatch(j []*job.Job) error {
const op = errors.Op("jobs_plugin_push")
+ start := time.Now()
for i := 0; i < len(j); i++ {
- start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
if !ok {
@@ -503,7 +512,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
@@ -514,7 +523,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- err := d.Push(ctx, j[i])
+ err := d.(jobs.Consumer).Push(ctx, j[i])
if err != nil {
cancel()
p.events.Push(events.JobEvent{
@@ -544,7 +553,7 @@ func (p *Plugin) Pause(pp string) {
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
p.log.Warn("driver for the pipeline not found", "pipeline", pp)
return
@@ -552,7 +561,7 @@ func (p *Plugin) Pause(pp string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
defer cancel()
// redirect call to the underlying driver
- d.Pause(ctx, ppl.Name())
+ d.(jobs.Consumer).Pause(ctx, ppl.Name())
}
func (p *Plugin) Resume(pp string) {
@@ -563,7 +572,7 @@ func (p *Plugin) Resume(pp string) {
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
p.log.Warn("driver for the pipeline not found", "pipeline", pp)
return
@@ -572,7 +581,7 @@ func (p *Plugin) Resume(pp string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
defer cancel()
// redirect call to the underlying driver
- d.Resume(ctx, ppl.Name())
+ d.(jobs.Consumer).Resume(ctx, ppl.Name())
}
// Declare a pipeline.
@@ -586,16 +595,13 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
// jobConstructors contains constructors for the drivers
// we need here to initialize these drivers for the pipelines
- if c, ok := p.jobConstructors[dr]; ok {
+ if _, ok := p.jobConstructors[dr]; ok {
// init the driver from pipeline
- initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue)
+ initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue)
if err != nil {
return errors.E(op, err)
}
- // add driver to the set of the consumers (name - pipeline name, value - associated driver)
- p.consumers[pipeline.Name()] = initializedDriver
-
// register pipeline for the initialized driver
err = initializedDriver.Register(context.Background(), pipeline)
if err != nil {
@@ -612,10 +618,12 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
return errors.E(op, err)
}
}
- }
- // save the pipeline
- p.pipelines.Store(pipeline.Name(), pipeline)
+ // add driver to the set of the consumers (name - pipeline name, value - associated driver)
+ p.consumers.Store(pipeline.Name(), initializedDriver)
+ // save the pipeline
+ p.pipelines.Store(pipeline.Name(), pipeline)
+ }
return nil
}
@@ -631,18 +639,24 @@ func (p *Plugin) Destroy(pp string) error {
// type conversion
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ // delete consumer
+ d, ok := p.consumers.LoadAndDelete(ppl.Name())
if !ok {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
- // delete consumer
- delete(p.consumers, ppl.Name())
- p.pipelines.Delete(pp)
+ // delete old pipeline
+ p.pipelines.LoadAndDelete(pp)
+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- defer cancel()
+ err := d.(jobs.Consumer).Stop(ctx)
+ if err != nil {
+ cancel()
+ return errors.E(op, err)
+ }
- return d.Stop(ctx)
+ cancel()
+ return nil
}
func (p *Plugin) List() []string {
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 94f903d5..d7b93bd1 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -25,7 +25,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
return errors.E(op, errors.Str("empty ID field not allowed"))
}
- err := r.p.Push(r.from(j.GetJob()))
+ err := r.p.Push(from(j.GetJob()))
if err != nil {
return errors.E(op, err)
}
@@ -43,7 +43,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err
for i := 0; i < l; i++ {
// convert transport entity into domain
// how we can do this quickly
- batch[i] = r.from(j.GetJobs()[i])
+ batch[i] = from(j.GetJobs()[i])
}
err := r.p.PushBatch(batch)
@@ -137,8 +137,8 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
}
// from converts from transport entity to domain
-func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
- headers := map[string][]string{}
+func from(j *jobsv1beta.Job) *job.Job {
+ headers := make(map[string][]string, len(j.GetHeaders()))
for k, v := range j.GetHeaders() {
headers[k] = v.GetValue()