diff options
author | Valery Piashchynski <[email protected]> | 2021-07-06 23:32:23 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-06 23:32:23 +0300 |
commit | 36bf9228e60f59f569e84822e2860980d7ed698d (patch) | |
tree | c76e7656bd26e033678dc52bed8167cfb9b39aa7 /plugins/jobs/plugin.go | |
parent | 2c78e93222cc9d3b88456175348e42f7f40c449b (diff) |
Update Jobs interface...
Use bh.len everywhere in the binary heaps algo instead of direct len
check.
Add Ack/Nack to the main jobs loop.
Add PushBatch method to the jobs rpc layer.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 31 |
1 files changed, 26 insertions, 5 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 8a80479b..c3f766b9 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -16,6 +16,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -118,10 +119,6 @@ func (p *Plugin) Serve() chan error { // get data JOB from the queue job := p.queue.GetMax() - if job == nil { - continue - } - exec := payload.Payload{ Context: job.Context(), Body: job.Body(), @@ -129,8 +126,12 @@ func (p *Plugin) Serve() chan error { _, err := p.workersPool.Exec(exec) if err != nil { - panic(err) + job.Nack() + p.log.Error("job execute", "error", err) + continue } + + job.Ack() } }() } @@ -176,6 +177,26 @@ func (p *Plugin) Push(j *structs.Job) (*string, error) { return id, nil } +func (p *Plugin) PushBatch(j []*structs.Job) (*string, error) { + const op = errors.Op("jobs_plugin_push") + + for i := 0; i < len(j); i++ { + pipe := p.pipelines.Get(j[i].Options.Pipeline) + + broker, ok := p.consumers[pipe.Driver()] + if !ok { + return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) + } + + _, err := broker.Push(j[i]) + if err != nil { + return nil, errors.E(op, err) + } + } + + return utils.AsStringPtr("test"), nil +} + func (p *Plugin) RPC() interface{} { return &rpc{ log: p.log, |