summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go31
1 files changed, 26 insertions, 5 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 8a80479b..c3f766b9 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -16,6 +16,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -118,10 +119,6 @@ func (p *Plugin) Serve() chan error {
// get data JOB from the queue
job := p.queue.GetMax()
- if job == nil {
- continue
- }
-
exec := payload.Payload{
Context: job.Context(),
Body: job.Body(),
@@ -129,8 +126,12 @@ func (p *Plugin) Serve() chan error {
_, err := p.workersPool.Exec(exec)
if err != nil {
- panic(err)
+ job.Nack()
+ p.log.Error("job execute", "error", err)
+ continue
}
+
+ job.Ack()
}
}()
}
@@ -176,6 +177,26 @@ func (p *Plugin) Push(j *structs.Job) (*string, error) {
return id, nil
}
+func (p *Plugin) PushBatch(j []*structs.Job) (*string, error) {
+ const op = errors.Op("jobs_plugin_push")
+
+ for i := 0; i < len(j); i++ {
+ pipe := p.pipelines.Get(j[i].Options.Pipeline)
+
+ broker, ok := p.consumers[pipe.Driver()]
+ if !ok {
+ return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver()))
+ }
+
+ _, err := broker.Push(j[i])
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+ }
+
+ return utils.AsStringPtr("test"), nil
+}
+
func (p *Plugin) RPC() interface{} {
return &rpc{
log: p.log,