summaryrefslogtreecommitdiff
path: root/plugins/jobs/rpc.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/rpc.go')
-rw-r--r--plugins/jobs/rpc.go97
1 files changed, 51 insertions, 46 deletions
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 0d4cc099..6718b99a 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -19,45 +19,32 @@ List of the RPC methods:
3. Reset - managed by the Resetter plugin
-4. Stop - stop pipeline processing
-5. StopAll - stop all pipelines processing
-6. Resume - resume pipeline processing
-7. ResumeAll - resume stopped pipelines
+4. Pause - pauses set of pipelines
+5. Resume - resumes set of pipelines
-8. Workers - managed by the Informer plugin.
-9. Stat - jobs statistic
+6. Workers - managed by the Informer plugin.
+7. Stat - jobs statistic
*/
-func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error {
+func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.EmptyResponse) error {
const op = errors.Op("jobs_rpc_push")
// convert transport entity into domain
// how we can do this quickly
- jb := &structs.Job{
- Job: j.GetJob().Job,
- Payload: j.GetJob().Payload,
- Options: &structs.Options{
- Priority: &j.GetJob().Options.Priority,
- ID: &j.GetJob().Options.Id,
- Pipeline: j.GetJob().Options.Pipeline,
- Delay: j.GetJob().Options.Delay,
- Attempts: j.GetJob().Options.Attempts,
- RetryDelay: j.GetJob().Options.RetryDelay,
- Timeout: j.GetJob().Options.Timeout,
- },
+
+ if j.GetJob().GetId() == "" {
+ return errors.E(op, errors.Str("empty ID field not allowed"))
}
- id, err := r.p.Push(jb)
+ err := r.p.Push(r.from(j.GetJob()))
if err != nil {
return errors.E(op, err)
}
- resp.Id = *id
-
return nil
}
-func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) error {
+func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.EmptyResponse) error {
const op = errors.Op("jobs_rpc_push")
l := len(j.GetJobs())
@@ -67,24 +54,10 @@ func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) e
for i := 0; i < l; i++ {
// convert transport entity into domain
// how we can do this quickly
- jb := &structs.Job{
- Job: j.GetJobs()[i].Job,
- Payload: j.GetJobs()[i].Payload,
- Options: &structs.Options{
- Priority: &j.GetJobs()[i].Options.Priority,
- ID: &j.GetJobs()[i].Options.Id,
- Pipeline: j.GetJobs()[i].Options.Pipeline,
- Delay: j.GetJobs()[i].Options.Delay,
- Attempts: j.GetJobs()[i].Options.Attempts,
- RetryDelay: j.GetJobs()[i].Options.RetryDelay,
- Timeout: j.GetJobs()[i].Options.Timeout,
- },
- }
-
- batch[i] = jb
+ batch[i] = r.from(j.GetJobs()[i])
}
- _, err := r.p.PushBatch(batch)
+ err := r.p.PushBatch(batch)
if err != nil {
return errors.E(op, err)
}
@@ -92,18 +65,50 @@ func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) e
return nil
}
-func (r *rpc) Stop(pipeline string, w *string) error {
- return nil
-}
+func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error {
+ pipelines := make([]string, len(req.GetPipelines()))
+
+ for i := 0; i < len(pipelines); i++ {
+ pipelines[i] = req.GetPipelines()[i]
+ }
-func (r *rpc) StopAll(_ bool, w *string) error {
+ r.p.Pause(pipelines)
return nil
}
-func (r *rpc) Resume(pipeline string, w *string) error {
+func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.EmptyResponse) error {
+ pipelines := make([]string, len(req.GetPipelines()))
+
+ for i := 0; i < len(pipelines); i++ {
+ pipelines[i] = req.GetPipelines()[i]
+ }
+
+ r.p.Resume(pipelines)
return nil
}
-func (r *rpc) ResumeAll(_ bool, w *string) error {
- return nil
+// from converts from transport entity to domain
+func (r *rpc) from(j *jobsv1beta.Job) *structs.Job {
+ headers := map[string][]string{}
+
+ for k, v := range j.GetHeaders() {
+ headers[k] = v.GetValue()
+ }
+
+ jb := &structs.Job{
+ Job: j.GetJob(),
+ Headers: headers,
+ Ident: j.GetId(),
+ Payload: j.GetPayload(),
+ Options: &structs.Options{
+ Priority: j.GetOptions().GetPriority(),
+ Pipeline: j.GetOptions().GetPipeline(),
+ Delay: j.GetOptions().GetDelay(),
+ Attempts: j.GetOptions().GetAttempts(),
+ RetryDelay: j.GetOptions().GetRetryDelay(),
+ Timeout: j.GetOptions().GetTimeout(),
+ },
+ }
+
+ return jb
}