diff options
author | Valery Piashchynski <[email protected]> | 2021-07-06 23:32:23 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-06 23:32:23 +0300 |
commit | 36bf9228e60f59f569e84822e2860980d7ed698d (patch) | |
tree | c76e7656bd26e033678dc52bed8167cfb9b39aa7 /plugins | |
parent | 2c78e93222cc9d3b88456175348e42f7f40c449b (diff) |
Update Jobs interface...
Use bh.len everywhere in the binary heaps algo instead of direct len
check.
Add Ack/Nack to the main jobs loop.
Add PushBatch method to the jobs rpc layer.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/broker.go | 6 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 31 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 58 |
3 files changed, 55 insertions, 40 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 4bbb4095..6c7108f6 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -57,12 +57,6 @@ func (j *JobBroker) Register(pipeline string) error { return nil } -func (j *JobBroker) PushBatch(job *[]structs.Job) (*string, error) { - // Use a batch response - // Add JobID to the payload to match responses - panic("todo") -} - func (j *JobBroker) Stop(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == true { diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 8a80479b..c3f766b9 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -16,6 +16,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -118,10 +119,6 @@ func (p *Plugin) Serve() chan error { // get data JOB from the queue job := p.queue.GetMax() - if job == nil { - continue - } - exec := payload.Payload{ Context: job.Context(), Body: job.Body(), @@ -129,8 +126,12 @@ func (p *Plugin) Serve() chan error { _, err := p.workersPool.Exec(exec) if err != nil { - panic(err) + job.Nack() + p.log.Error("job execute", "error", err) + continue } + + job.Ack() } }() } @@ -176,6 +177,26 @@ func (p *Plugin) Push(j *structs.Job) (*string, error) { return id, nil } +func (p *Plugin) PushBatch(j []*structs.Job) (*string, error) { + const op = errors.Op("jobs_plugin_push") + + for i := 0; i < len(j); i++ { + pipe := p.pipelines.Get(j[i].Options.Pipeline) + + broker, ok := p.consumers[pipe.Driver()] + if !ok { + return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) + } + + _, err := broker.Push(j[i]) + if err != nil { + return nil, errors.E(op, err) + } + } + + return utils.AsStringPtr("test"), nil +} + func (p *Plugin) RPC() interface{} { return &rpc{ log: p.log, diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index c6bd1645..0d4cc099 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,8 +1,6 @@ package jobs import ( - "sync" - "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,26 +12,6 @@ type rpc struct { p *Plugin } -var jobsPool = &sync.Pool{ - New: func() interface{} { - return &structs.Job{ - Options: &structs.Options{}, - } - }, -} - -func pubJob(j *structs.Job) { - // clear - j.Job = "" - j.Payload = "" - j.Options = &structs.Options{} - jobsPool.Put(j) -} - -func getJob() *structs.Job { - return jobsPool.Get().(*structs.Job) -} - /* List of the RPC methods: 1. Push - single job push @@ -55,10 +33,7 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { // convert transport entity into domain // how we can do this quickly - jb := getJob() - defer pubJob(jb) - - jb = &structs.Job{ + jb := &structs.Job{ Job: j.GetJob().Job, Payload: j.GetJob().Payload, Options: &structs.Options{ @@ -71,6 +46,7 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { Timeout: j.GetJob().Options.Timeout, }, } + id, err := r.p.Push(jb) if err != nil { return errors.E(op, err) @@ -81,14 +57,38 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { return nil } -func (r *rpc) PushBatch(j *structs.Job, idRet *string) error { +func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) error { const op = errors.Op("jobs_rpc_push") - id, err := r.p.Push(j) + + l := len(j.GetJobs()) + + batch := make([]*structs.Job, l) + + for i := 0; i < l; i++ { + // convert transport entity into domain + // how we can do this quickly + jb := &structs.Job{ + Job: j.GetJobs()[i].Job, + Payload: j.GetJobs()[i].Payload, + Options: &structs.Options{ + Priority: &j.GetJobs()[i].Options.Priority, + ID: &j.GetJobs()[i].Options.Id, + Pipeline: j.GetJobs()[i].Options.Pipeline, + Delay: j.GetJobs()[i].Options.Delay, + Attempts: j.GetJobs()[i].Options.Attempts, + RetryDelay: j.GetJobs()[i].Options.RetryDelay, + Timeout: j.GetJobs()[i].Options.Timeout, + }, + } + + batch[i] = jb + } + + _, err := r.p.PushBatch(batch) if err != nil { return errors.E(op, err) } - *idRet = *id return nil } |