diff options
Diffstat (limited to 'plugins/jobs/rpc.go')
-rw-r--r-- | plugins/jobs/rpc.go | 32 |
1 files changed, 28 insertions, 4 deletions
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 7f9859fb..94f903d5 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,6 +1,8 @@ package jobs import ( + "context" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -14,7 +16,7 @@ type rpc struct { } func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("jobs_rpc_push") + const op = errors.Op("rpc_push") // convert transport entity into domain // how we can do this quickly @@ -32,7 +34,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { } func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("jobs_rpc_push") + const op = errors.Op("rpc_push_batch") l := len(j.GetJobs()) @@ -79,7 +81,7 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error { // 2. Pipeline name // 3. Options related to the particular pipeline func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error { - const op = errors.Op("rcp_declare_pipeline") + const op = errors.Op("rpc_declare_pipeline") pipe := &pipeline.Pipeline{} for i := range req.GetPipeline() { @@ -95,7 +97,7 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error } func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error { - const op = errors.Op("rcp_declare_pipeline") + const op = errors.Op("rpc_declare_pipeline") var destroyed []string //nolint:prealloc for i := 0; i < len(req.GetPipelines()); i++ { @@ -112,6 +114,28 @@ func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) err return nil } +func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { + const op = errors.Op("rpc_stats") + state, err := r.p.JobsState(context.Background()) + if err != nil { + return errors.E(op, err) + } + + for i := 0; i < len(state); i++ { + resp.Stats = append(resp.Stats, &jobsv1beta.Stat{ + Pipeline: state[i].Pipeline, + Driver: state[i].Driver, + Queue: state[i].Queue, + Active: state[i].Active, + Delayed: state[i].Delayed, + Reserved: state[i].Reserved, + Ready: state[i].Ready, + }) + } + + return nil +} + // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *job.Job { headers := map[string][]string{} |