summaryrefslogtreecommitdiff
path: root/plugins/jobs/rpc.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/rpc.go')
-rw-r--r--plugins/jobs/rpc.go160
1 files changed, 0 insertions, 160 deletions
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
deleted file mode 100644
index d7b93bd1..00000000
--- a/plugins/jobs/rpc.go
+++ /dev/null
@@ -1,160 +0,0 @@
-package jobs
-
-import (
- "context"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
-)
-
-type rpc struct {
- log logger.Logger
- p *Plugin
-}
-
-func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("rpc_push")
-
- // convert transport entity into domain
- // how we can do this quickly
-
- if j.GetJob().GetId() == "" {
- return errors.E(op, errors.Str("empty ID field not allowed"))
- }
-
- err := r.p.Push(from(j.GetJob()))
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("rpc_push_batch")
-
- l := len(j.GetJobs())
-
- batch := make([]*job.Job, l)
-
- for i := 0; i < l; i++ {
- // convert transport entity into domain
- // how we can do this quickly
- batch[i] = from(j.GetJobs()[i])
- }
-
- err := r.p.PushBatch(batch)
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (r *rpc) Pause(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error {
- for i := 0; i < len(req.GetPipelines()); i++ {
- r.p.Pause(req.GetPipelines()[i])
- }
-
- return nil
-}
-
-func (r *rpc) Resume(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error {
- for i := 0; i < len(req.GetPipelines()); i++ {
- r.p.Resume(req.GetPipelines()[i])
- }
-
- return nil
-}
-
-func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error {
- resp.Pipelines = r.p.List()
- return nil
-}
-
-// Declare pipeline used to dynamically declare any type of the pipeline
-// Mandatory fields:
-// 1. Driver
-// 2. Pipeline name
-// 3. Options related to the particular pipeline
-func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("rpc_declare_pipeline")
- pipe := &pipeline.Pipeline{}
-
- for i := range req.GetPipeline() {
- (*pipe)[i] = req.GetPipeline()[i]
- }
-
- err := r.p.Declare(pipe)
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error {
- const op = errors.Op("rpc_declare_pipeline")
-
- var destroyed []string //nolint:prealloc
- for i := 0; i < len(req.GetPipelines()); i++ {
- err := r.p.Destroy(req.GetPipelines()[i])
- if err != nil {
- return errors.E(op, err)
- }
- destroyed = append(destroyed, req.GetPipelines()[i])
- }
-
- // return destroyed pipelines
- resp.Pipelines = destroyed
-
- return nil
-}
-
-func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
- const op = errors.Op("rpc_stats")
- state, err := r.p.JobsState(context.Background())
- if err != nil {
- return errors.E(op, err)
- }
-
- for i := 0; i < len(state); i++ {
- resp.Stats = append(resp.Stats, &jobsv1beta.Stat{
- Pipeline: state[i].Pipeline,
- Driver: state[i].Driver,
- Queue: state[i].Queue,
- Active: state[i].Active,
- Delayed: state[i].Delayed,
- Reserved: state[i].Reserved,
- Ready: state[i].Ready,
- })
- }
-
- return nil
-}
-
-// from converts from transport entity to domain
-func from(j *jobsv1beta.Job) *job.Job {
- headers := make(map[string][]string, len(j.GetHeaders()))
-
- for k, v := range j.GetHeaders() {
- headers[k] = v.GetValue()
- }
-
- jb := &job.Job{
- Job: j.GetJob(),
- Headers: headers,
- Ident: j.GetId(),
- Payload: j.GetPayload(),
- Options: &job.Options{
- Priority: j.GetOptions().GetPriority(),
- Pipeline: j.GetOptions().GetPipeline(),
- Delay: j.GetOptions().GetDelay(),
- },
- }
-
- return jb
-}