summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go40
1 files changed, 40 insertions, 0 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 7707cb8a..5f6c8b94 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -13,6 +13,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -303,6 +305,44 @@ func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
p.jobConstructors[name.Name()] = c
}
+func (p *Plugin) Workers() []*process.State {
+ p.RLock()
+ wrk := p.workersPool.Workers()
+ p.RUnlock()
+
+ ps := make([]*process.State, len(wrk))
+
+ for i := 0; i < len(wrk); i++ {
+ st, err := process.WorkerProcessState(wrk[i])
+ if err != nil {
+ p.log.Error("jobs workers state", "error", err)
+ return nil
+ }
+
+ ps[i] = st
+ }
+
+ return ps
+}
+
+func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) {
+ const op = errors.Op("jobs_plugin_drivers_state")
+ jst := make([]*jobState.State, 0, len(p.consumers))
+ for k := range p.consumers {
+ d := p.consumers[k]
+ newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout))
+ state, err := d.State(newCtx)
+ if err != nil {
+ cancel()
+ return nil, errors.E(op, err)
+ }
+
+ jst = append(jst, state)
+ cancel()
+ }
+ return jst, nil
+}
+
func (p *Plugin) Available() {}
func (p *Plugin) Name() string {