summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-06 23:32:23 +0300
committerValery Piashchynski <[email protected]>2021-07-06 23:32:23 +0300
commit36bf9228e60f59f569e84822e2860980d7ed698d (patch)
treec76e7656bd26e033678dc52bed8167cfb9b39aa7 /plugins
parent2c78e93222cc9d3b88456175348e42f7f40c449b (diff)
Update Jobs interface...
Use bh.len everywhere in the binary heaps algo instead of direct len check. Add Ack/Nack to the main jobs loop. Add PushBatch method to the jobs rpc layer. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go6
-rw-r--r--plugins/jobs/plugin.go31
-rw-r--r--plugins/jobs/rpc.go58
3 files changed, 55 insertions, 40 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
index 4bbb4095..6c7108f6 100644
--- a/plugins/jobs/brokers/ephemeral/broker.go
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -57,12 +57,6 @@ func (j *JobBroker) Register(pipeline string) error {
return nil
}
-func (j *JobBroker) PushBatch(job *[]structs.Job) (*string, error) {
- // Use a batch response
- // Add JobID to the payload to match responses
- panic("todo")
-}
-
func (j *JobBroker) Stop(pipeline string) {
if q, ok := j.queues.Load(pipeline); ok {
if q == true {
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 8a80479b..c3f766b9 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -16,6 +16,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -118,10 +119,6 @@ func (p *Plugin) Serve() chan error {
// get data JOB from the queue
job := p.queue.GetMax()
- if job == nil {
- continue
- }
-
exec := payload.Payload{
Context: job.Context(),
Body: job.Body(),
@@ -129,8 +126,12 @@ func (p *Plugin) Serve() chan error {
_, err := p.workersPool.Exec(exec)
if err != nil {
- panic(err)
+ job.Nack()
+ p.log.Error("job execute", "error", err)
+ continue
}
+
+ job.Ack()
}
}()
}
@@ -176,6 +177,26 @@ func (p *Plugin) Push(j *structs.Job) (*string, error) {
return id, nil
}
+func (p *Plugin) PushBatch(j []*structs.Job) (*string, error) {
+ const op = errors.Op("jobs_plugin_push")
+
+ for i := 0; i < len(j); i++ {
+ pipe := p.pipelines.Get(j[i].Options.Pipeline)
+
+ broker, ok := p.consumers[pipe.Driver()]
+ if !ok {
+ return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver()))
+ }
+
+ _, err := broker.Push(j[i])
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+ }
+
+ return utils.AsStringPtr("test"), nil
+}
+
func (p *Plugin) RPC() interface{} {
return &rpc{
log: p.log,
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index c6bd1645..0d4cc099 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -1,8 +1,6 @@
package jobs
import (
- "sync"
-
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,26 +12,6 @@ type rpc struct {
p *Plugin
}
-var jobsPool = &sync.Pool{
- New: func() interface{} {
- return &structs.Job{
- Options: &structs.Options{},
- }
- },
-}
-
-func pubJob(j *structs.Job) {
- // clear
- j.Job = ""
- j.Payload = ""
- j.Options = &structs.Options{}
- jobsPool.Put(j)
-}
-
-func getJob() *structs.Job {
- return jobsPool.Get().(*structs.Job)
-}
-
/*
List of the RPC methods:
1. Push - single job push
@@ -55,10 +33,7 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error {
// convert transport entity into domain
// how we can do this quickly
- jb := getJob()
- defer pubJob(jb)
-
- jb = &structs.Job{
+ jb := &structs.Job{
Job: j.GetJob().Job,
Payload: j.GetJob().Payload,
Options: &structs.Options{
@@ -71,6 +46,7 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error {
Timeout: j.GetJob().Options.Timeout,
},
}
+
id, err := r.p.Push(jb)
if err != nil {
return errors.E(op, err)
@@ -81,14 +57,38 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error {
return nil
}
-func (r *rpc) PushBatch(j *structs.Job, idRet *string) error {
+func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) error {
const op = errors.Op("jobs_rpc_push")
- id, err := r.p.Push(j)
+
+ l := len(j.GetJobs())
+
+ batch := make([]*structs.Job, l)
+
+ for i := 0; i < l; i++ {
+ // convert transport entity into domain
+ // how we can do this quickly
+ jb := &structs.Job{
+ Job: j.GetJobs()[i].Job,
+ Payload: j.GetJobs()[i].Payload,
+ Options: &structs.Options{
+ Priority: &j.GetJobs()[i].Options.Priority,
+ ID: &j.GetJobs()[i].Options.Id,
+ Pipeline: j.GetJobs()[i].Options.Pipeline,
+ Delay: j.GetJobs()[i].Options.Delay,
+ Attempts: j.GetJobs()[i].Options.Attempts,
+ RetryDelay: j.GetJobs()[i].Options.RetryDelay,
+ Timeout: j.GetJobs()[i].Options.Timeout,
+ },
+ }
+
+ batch[i] = jb
+ }
+
+ _, err := r.p.PushBatch(batch)
if err != nil {
return errors.E(op, err)
}
- *idRet = *id
return nil
}