summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go69
1 files changed, 66 insertions, 3 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index aab23f1c..86289aba 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -70,7 +70,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.cfg.InitDefaults()
p.server = server
+
p.events = events.NewEventsHandler()
+ p.events.AddListener(p.collectJobsEvents)
+
p.jobConstructors = make(map[string]jobs.Constructor)
p.consumers = make(map[string]jobs.Consumer)
p.consume = make(map[string]struct{})
@@ -117,6 +120,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// register initial pipelines
p.pipelines.Range(func(key, value interface{}) bool {
+ t := time.Now()
// pipeline name (ie test-local, sqs-aws, etc)
name := key.(string)
@@ -132,7 +136,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name)
// init the driver
- initializedDriver, err := c.JobsConstruct(configKey, p.queue)
+ initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue)
if err != nil {
errCh <- errors.E(op, err)
return false
@@ -155,9 +159,27 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh <- errors.E(op, err)
return false
}
+
+ p.events.Push(events.JobEvent{
+ Event: events.EventPipeRun,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Start: t,
+ Elapsed: t.Sub(t),
+ })
+
return true
}
+
+ return true
}
+ p.events.Push(events.JobEvent{
+ Event: events.EventDriverReady,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Start: t,
+ Elapsed: t.Sub(t),
+ })
return true
})
@@ -279,7 +301,7 @@ func (p *Plugin) Reset() error {
p.workersPool = nil
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"})
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}, p.collectJobsEvents)
if err != nil {
return errors.E(op, err)
}
@@ -404,7 +426,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
// we need here to initialize these drivers for the pipelines
if c, ok := p.jobConstructors[dr]; ok {
// init the driver from pipeline
- initializedDriver, err := c.FromPipeline(pipeline, p.queue)
+ initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue)
if err != nil {
return errors.E(op, err)
}
@@ -451,9 +473,50 @@ func (p *Plugin) Destroy(pp string) error {
return d.Stop()
}
+func (p *Plugin) List() []string {
+ out := make([]string, 0, 10)
+
+ p.pipelines.Range(func(key, _ interface{}) bool {
+ // we can safely convert value here as we know that we store keys as strings
+ out = append(out, key.(string))
+ return true
+ })
+
+ return out
+}
+
func (p *Plugin) RPC() interface{} {
return &rpc{
log: p.log,
p: p,
}
}
+
+func (p *Plugin) collectJobsEvents(event interface{}) {
+ if jev, ok := event.(events.JobEvent); ok {
+ switch jev.Event {
+ case events.EventJobStart:
+ p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventJobOK:
+ p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPushOK:
+ p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPushError:
+ p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventJobError:
+ p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPipeRun:
+ p.log.Info("pipeline started", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPipeActive:
+ p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPipeStopped:
+ p.log.Warn("pipeline stopped", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPipeError:
+ p.log.Error("pipeline error", "pipeline", jev.Pipeline, "error", jev.Error, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventDriverReady:
+ p.log.Info("driver ready", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventInitialized:
+ p.log.Info("driver initialized", "driver", jev.Driver, "start", jev.Start.UTC())
+ }
+ }
+}