diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 7707cb8a..5f6c8b94 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -13,6 +13,8 @@ import ( "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/pkg/state/process" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -303,6 +305,44 @@ func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) { p.jobConstructors[name.Name()] = c } +func (p *Plugin) Workers() []*process.State { + p.RLock() + wrk := p.workersPool.Workers() + p.RUnlock() + + ps := make([]*process.State, len(wrk)) + + for i := 0; i < len(wrk); i++ { + st, err := process.WorkerProcessState(wrk[i]) + if err != nil { + p.log.Error("jobs workers state", "error", err) + return nil + } + + ps[i] = st + } + + return ps +} + +func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) { + const op = errors.Op("jobs_plugin_drivers_state") + jst := make([]*jobState.State, 0, len(p.consumers)) + for k := range p.consumers { + d := p.consumers[k] + newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout)) + state, err := d.State(newCtx) + if err != nil { + cancel() + return nil, errors.E(op, err) + } + + jst = append(jst, state) + cancel() + } + return jst, nil +} + func (p *Plugin) Available() {} func (p *Plugin) Name() string { |