diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 69 |
1 files changed, 66 insertions, 3 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index aab23f1c..86289aba 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -70,7 +70,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.cfg.InitDefaults() p.server = server + p.events = events.NewEventsHandler() + p.events.AddListener(p.collectJobsEvents) + p.jobConstructors = make(map[string]jobs.Constructor) p.consumers = make(map[string]jobs.Consumer) p.consume = make(map[string]struct{}) @@ -117,6 +120,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // register initial pipelines p.pipelines.Range(func(key, value interface{}) bool { + t := time.Now() // pipeline name (ie test-local, sqs-aws, etc) name := key.(string) @@ -132,7 +136,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name) // init the driver - initializedDriver, err := c.JobsConstruct(configKey, p.queue) + initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue) if err != nil { errCh <- errors.E(op, err) return false @@ -155,9 +159,27 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit errCh <- errors.E(op, err) return false } + + p.events.Push(events.JobEvent{ + Event: events.EventPipeRun, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Start: t, + Elapsed: t.Sub(t), + }) + return true } + + return true } + p.events.Push(events.JobEvent{ + Event: events.EventDriverReady, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Start: t, + Elapsed: t.Sub(t), + }) return true }) @@ -279,7 +301,7 @@ func (p *Plugin) Reset() error { p.workersPool = nil var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}, p.collectJobsEvents) if err != nil { return errors.E(op, err) } @@ -404,7 +426,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // we need here to initialize these drivers for the pipelines if c, ok := p.jobConstructors[dr]; ok { // init the driver from pipeline - initializedDriver, err := c.FromPipeline(pipeline, p.queue) + initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue) if err != nil { return errors.E(op, err) } @@ -451,9 +473,50 @@ func (p *Plugin) Destroy(pp string) error { return d.Stop() } +func (p *Plugin) List() []string { + out := make([]string, 0, 10) + + p.pipelines.Range(func(key, _ interface{}) bool { + // we can safely convert value here as we know that we store keys as strings + out = append(out, key.(string)) + return true + }) + + return out +} + func (p *Plugin) RPC() interface{} { return &rpc{ log: p.log, p: p, } } + +func (p *Plugin) collectJobsEvents(event interface{}) { + if jev, ok := event.(events.JobEvent); ok { + switch jev.Event { + case events.EventJobStart: + p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventJobOK: + p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventPushOK: + p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventPushError: + p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventJobError: + p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventPipeRun: + p.log.Info("pipeline started", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventPipeActive: + p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventPipeStopped: + p.log.Warn("pipeline stopped", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventPipeError: + p.log.Error("pipeline error", "pipeline", jev.Pipeline, "error", jev.Error, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventDriverReady: + p.log.Info("driver ready", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + case events.EventInitialized: + p.log.Info("driver initialized", "driver", jev.Driver, "start", jev.Start.UTC()) + } + } +} |