diff options
Diffstat (limited to 'plugins/jobs/rpc.go')
-rw-r--r-- | plugins/jobs/rpc.go | 58 |
1 files changed, 29 insertions, 29 deletions
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index c6bd1645..0d4cc099 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,8 +1,6 @@ package jobs import ( - "sync" - "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,26 +12,6 @@ type rpc struct { p *Plugin } -var jobsPool = &sync.Pool{ - New: func() interface{} { - return &structs.Job{ - Options: &structs.Options{}, - } - }, -} - -func pubJob(j *structs.Job) { - // clear - j.Job = "" - j.Payload = "" - j.Options = &structs.Options{} - jobsPool.Put(j) -} - -func getJob() *structs.Job { - return jobsPool.Get().(*structs.Job) -} - /* List of the RPC methods: 1. Push - single job push @@ -55,10 +33,7 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { // convert transport entity into domain // how we can do this quickly - jb := getJob() - defer pubJob(jb) - - jb = &structs.Job{ + jb := &structs.Job{ Job: j.GetJob().Job, Payload: j.GetJob().Payload, Options: &structs.Options{ @@ -71,6 +46,7 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { Timeout: j.GetJob().Options.Timeout, }, } + id, err := r.p.Push(jb) if err != nil { return errors.E(op, err) @@ -81,14 +57,38 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { return nil } -func (r *rpc) PushBatch(j *structs.Job, idRet *string) error { +func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) error { const op = errors.Op("jobs_rpc_push") - id, err := r.p.Push(j) + + l := len(j.GetJobs()) + + batch := make([]*structs.Job, l) + + for i := 0; i < l; i++ { + // convert transport entity into domain + // how we can do this quickly + jb := &structs.Job{ + Job: j.GetJobs()[i].Job, + Payload: j.GetJobs()[i].Payload, + Options: &structs.Options{ + Priority: &j.GetJobs()[i].Options.Priority, + ID: &j.GetJobs()[i].Options.Id, + Pipeline: j.GetJobs()[i].Options.Pipeline, + Delay: j.GetJobs()[i].Options.Delay, + Attempts: j.GetJobs()[i].Options.Attempts, + RetryDelay: j.GetJobs()[i].Options.RetryDelay, + Timeout: j.GetJobs()[i].Options.Timeout, + }, + } + + batch[i] = jb + } + + _, err := r.p.PushBatch(batch) if err != nil { return errors.E(op, err) } - *idRet = *id return nil } |