diff options
Diffstat (limited to 'plugins/jobs/rpc.go')
-rw-r--r-- | plugins/jobs/rpc.go | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go new file mode 100644 index 00000000..7f9859fb --- /dev/null +++ b/plugins/jobs/rpc.go @@ -0,0 +1,136 @@ +package jobs + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" +) + +type rpc struct { + log logger.Logger + p *Plugin +} + +func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { + const op = errors.Op("jobs_rpc_push") + + // convert transport entity into domain + // how we can do this quickly + + if j.GetJob().GetId() == "" { + return errors.E(op, errors.Str("empty ID field not allowed")) + } + + err := r.p.Push(r.from(j.GetJob())) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error { + const op = errors.Op("jobs_rpc_push") + + l := len(j.GetJobs()) + + batch := make([]*job.Job, l) + + for i := 0; i < l; i++ { + // convert transport entity into domain + // how we can do this quickly + batch[i] = r.from(j.GetJobs()[i]) + } + + err := r.p.PushBatch(batch) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (r *rpc) Pause(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error { + for i := 0; i < len(req.GetPipelines()); i++ { + r.p.Pause(req.GetPipelines()[i]) + } + + return nil +} + +func (r *rpc) Resume(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error { + for i := 0; i < len(req.GetPipelines()); i++ { + r.p.Resume(req.GetPipelines()[i]) + } + + return nil +} + +func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error { + resp.Pipelines = r.p.List() + return nil +} + +// Declare pipeline used to dynamically declare any type of the pipeline +// Mandatory fields: +// 1. Driver +// 2. Pipeline name +// 3. Options related to the particular pipeline +func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error { + const op = errors.Op("rcp_declare_pipeline") + pipe := &pipeline.Pipeline{} + + for i := range req.GetPipeline() { + (*pipe)[i] = req.GetPipeline()[i] + } + + err := r.p.Declare(pipe) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error { + const op = errors.Op("rcp_declare_pipeline") + + var destroyed []string //nolint:prealloc + for i := 0; i < len(req.GetPipelines()); i++ { + err := r.p.Destroy(req.GetPipelines()[i]) + if err != nil { + return errors.E(op, err) + } + destroyed = append(destroyed, req.GetPipelines()[i]) + } + + // return destroyed pipelines + resp.Pipelines = destroyed + + return nil +} + +// from converts from transport entity to domain +func (r *rpc) from(j *jobsv1beta.Job) *job.Job { + headers := map[string][]string{} + + for k, v := range j.GetHeaders() { + headers[k] = v.GetValue() + } + + jb := &job.Job{ + Job: j.GetJob(), + Headers: headers, + Ident: j.GetId(), + Payload: j.GetPayload(), + Options: &job.Options{ + Priority: j.GetOptions().GetPriority(), + Pipeline: j.GetOptions().GetPipeline(), + Delay: j.GetOptions().GetDelay(), + }, + } + + return jb +} |