diff options
Diffstat (limited to 'plugins/jobs/rpc.go')
-rw-r--r-- | plugins/jobs/rpc.go | 97 |
1 files changed, 51 insertions, 46 deletions
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 0d4cc099..6718b99a 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -19,45 +19,32 @@ List of the RPC methods: 3. Reset - managed by the Resetter plugin -4. Stop - stop pipeline processing -5. StopAll - stop all pipelines processing -6. Resume - resume pipeline processing -7. ResumeAll - resume stopped pipelines +4. Pause - pauses set of pipelines +5. Resume - resumes set of pipelines -8. Workers - managed by the Informer plugin. -9. Stat - jobs statistic +6. Workers - managed by the Informer plugin. +7. Stat - jobs statistic */ -func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { +func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.EmptyResponse) error { const op = errors.Op("jobs_rpc_push") // convert transport entity into domain // how we can do this quickly - jb := &structs.Job{ - Job: j.GetJob().Job, - Payload: j.GetJob().Payload, - Options: &structs.Options{ - Priority: &j.GetJob().Options.Priority, - ID: &j.GetJob().Options.Id, - Pipeline: j.GetJob().Options.Pipeline, - Delay: j.GetJob().Options.Delay, - Attempts: j.GetJob().Options.Attempts, - RetryDelay: j.GetJob().Options.RetryDelay, - Timeout: j.GetJob().Options.Timeout, - }, + + if j.GetJob().GetId() == "" { + return errors.E(op, errors.Str("empty ID field not allowed")) } - id, err := r.p.Push(jb) + err := r.p.Push(r.from(j.GetJob())) if err != nil { return errors.E(op, err) } - resp.Id = *id - return nil } -func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) error { +func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.EmptyResponse) error { const op = errors.Op("jobs_rpc_push") l := len(j.GetJobs()) @@ -67,24 +54,10 @@ func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) e for i := 0; i < l; i++ { // convert transport entity into domain // how we can do this quickly - jb := &structs.Job{ - Job: j.GetJobs()[i].Job, - Payload: j.GetJobs()[i].Payload, - Options: &structs.Options{ - Priority: &j.GetJobs()[i].Options.Priority, - ID: &j.GetJobs()[i].Options.Id, - Pipeline: j.GetJobs()[i].Options.Pipeline, - Delay: j.GetJobs()[i].Options.Delay, - Attempts: j.GetJobs()[i].Options.Attempts, - RetryDelay: j.GetJobs()[i].Options.RetryDelay, - Timeout: j.GetJobs()[i].Options.Timeout, - }, - } - - batch[i] = jb + batch[i] = r.from(j.GetJobs()[i]) } - _, err := r.p.PushBatch(batch) + err := r.p.PushBatch(batch) if err != nil { return errors.E(op, err) } @@ -92,18 +65,50 @@ func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) e return nil } -func (r *rpc) Stop(pipeline string, w *string) error { - return nil -} +func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error { + pipelines := make([]string, len(req.GetPipelines())) + + for i := 0; i < len(pipelines); i++ { + pipelines[i] = req.GetPipelines()[i] + } -func (r *rpc) StopAll(_ bool, w *string) error { + r.p.Pause(pipelines) return nil } -func (r *rpc) Resume(pipeline string, w *string) error { +func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error { + pipelines := make([]string, len(req.GetPipelines())) + + for i := 0; i < len(pipelines); i++ { + pipelines[i] = req.GetPipelines()[i] + } + + r.p.Resume(pipelines) return nil } -func (r *rpc) ResumeAll(_ bool, w *string) error { - return nil +// from converts from transport entity to domain +func (r *rpc) from(j *jobsv1beta.Job) *structs.Job { + headers := map[string][]string{} + + for k, v := range j.GetHeaders() { + headers[k] = v.GetValue() + } + + jb := &structs.Job{ + Job: j.GetJob(), + Headers: headers, + Ident: j.GetId(), + Payload: j.GetPayload(), + Options: &structs.Options{ + Priority: j.GetOptions().GetPriority(), + Pipeline: j.GetOptions().GetPipeline(), + Delay: j.GetOptions().GetDelay(), + Attempts: j.GetOptions().GetAttempts(), + RetryDelay: j.GetOptions().GetRetryDelay(), + Timeout: j.GetOptions().GetTimeout(), + }, + } + + return jb } |