summaryrefslogtreecommitdiff
path: root/plugins/jobs/rpc.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/rpc.go')
-rw-r--r--plugins/jobs/rpc.go32
1 files changed, 28 insertions, 4 deletions
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 7f9859fb..94f903d5 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -1,6 +1,8 @@
package jobs
import (
+ "context"
+
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -14,7 +16,7 @@ type rpc struct {
}
func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("jobs_rpc_push")
+ const op = errors.Op("rpc_push")
// convert transport entity into domain
// how we can do this quickly
@@ -32,7 +34,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
}
func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("jobs_rpc_push")
+ const op = errors.Op("rpc_push_batch")
l := len(j.GetJobs())
@@ -79,7 +81,7 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error {
// 2. Pipeline name
// 3. Options related to the particular pipeline
func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("rcp_declare_pipeline")
+ const op = errors.Op("rpc_declare_pipeline")
pipe := &pipeline.Pipeline{}
for i := range req.GetPipeline() {
@@ -95,7 +97,7 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error
}
func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error {
- const op = errors.Op("rcp_declare_pipeline")
+ const op = errors.Op("rpc_declare_pipeline")
var destroyed []string //nolint:prealloc
for i := 0; i < len(req.GetPipelines()); i++ {
@@ -112,6 +114,28 @@ func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) err
return nil
}
+func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
+ const op = errors.Op("rpc_stats")
+ state, err := r.p.JobsState(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ for i := 0; i < len(state); i++ {
+ resp.Stats = append(resp.Stats, &jobsv1beta.Stat{
+ Pipeline: state[i].Pipeline,
+ Driver: state[i].Driver,
+ Queue: state[i].Queue,
+ Active: state[i].Active,
+ Delayed: state[i].Delayed,
+ Reserved: state[i].Reserved,
+ Ready: state[i].Ready,
+ })
+ }
+
+ return nil
+}
+
// from converts from transport entity to domain
func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
headers := map[string][]string{}