diff options
author | Valery Piashchynski <[email protected]> | 2021-07-06 23:32:23 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-06 23:32:23 +0300 |
commit | 36bf9228e60f59f569e84822e2860980d7ed698d (patch) | |
tree | c76e7656bd26e033678dc52bed8167cfb9b39aa7 | |
parent | 2c78e93222cc9d3b88456175348e42f7f40c449b (diff) |
Update Jobs interface...
Use bh.len everywhere in the binary heaps algo instead of direct len
check.
Add Ack/Nack to the main jobs loop.
Add PushBatch method to the jobs rpc layer.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | common/jobs/interface.go | 1 | ||||
-rw-r--r-- | pkg/priorityqueue/binary_heap.go | 10 | ||||
-rw-r--r-- | pkg/priorityqueue/binary_heap_test.go | 9 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/broker.go | 6 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 31 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 58 |
6 files changed, 63 insertions, 52 deletions
diff --git a/common/jobs/interface.go b/common/jobs/interface.go index 6738ed46..786eca0e 100644 --- a/common/jobs/interface.go +++ b/common/jobs/interface.go @@ -9,7 +9,6 @@ import ( // Consumer todo naming type Consumer interface { Push(job *structs.Job) (*string, error) - PushBatch(job *[]structs.Job) (*string, error) Consume(job *pipeline.Pipeline) Stop(pipeline string) diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go index fe6a06fd..f3d8f95b 100644 --- a/pkg/priorityqueue/binary_heap.go +++ b/pkg/priorityqueue/binary_heap.go @@ -25,7 +25,7 @@ func NewBinHeap() *BinHeap { } func (bh *BinHeap) fixUp() { - k := len(bh.items) - 1 + k := bh.len - 1 p := (k - 1) >> 1 // k-1 / 2 for k > 0 { @@ -41,7 +41,7 @@ func (bh *BinHeap) fixUp() { } } -func (bh *BinHeap) swap(i, j int) { +func (bh *BinHeap) swap(i, j uint64) { (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i] } @@ -59,7 +59,7 @@ func (bh *BinHeap) fixDown(curr, end int) { idxToSwap = cTwoIdx } if *(bh.items)[idxToSwap].Priority() < *(bh.items)[curr].Priority() { - bh.swap(curr, idxToSwap) + bh.swap(uint64(curr), uint64(idxToSwap)) curr = idxToSwap cOneIdx = (curr << 1) + 1 } else { @@ -87,11 +87,11 @@ func (bh *BinHeap) GetMax() Item { bh.cond.L.Lock() defer bh.cond.L.Unlock() - for atomic.LoadUint64(&bh.len) == 0 { + if atomic.LoadUint64(&bh.len) == 0 { bh.cond.Wait() } - bh.swap(0, int(bh.len-1)) + bh.swap(0, bh.len-1) item := (bh.items)[int(bh.len)-1] bh.items = (bh).items[0 : int(bh.len)-1] diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go index 149ec764..4c234dc5 100644 --- a/pkg/priorityqueue/binary_heap_test.go +++ b/pkg/priorityqueue/binary_heap_test.go @@ -68,10 +68,10 @@ func TestNewPriorityQueue(t *testing.T) { for { select { case <-tt.C: - fmt.Println(fmt.Sprintf("GetMax per second: %d", atomic.LoadUint64(&getPerSec))) fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec))) - atomic.StoreUint64(&getPerSec, 0) atomic.StoreUint64(&insertsPerSec, 0) + fmt.Println(fmt.Sprintf("GetMax per second: %d", atomic.LoadUint64(&getPerSec))) + atomic.StoreUint64(&getPerSec, 0) case <-stopCh: tt.Stop() return @@ -85,10 +85,7 @@ func TestNewPriorityQueue(t *testing.T) { case <-stopCh: return default: - it := pq.GetMax() - if it == nil { - continue - } + pq.GetMax() atomic.AddUint64(&getPerSec, 1) } } diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 4bbb4095..6c7108f6 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -57,12 +57,6 @@ func (j *JobBroker) Register(pipeline string) error { return nil } -func (j *JobBroker) PushBatch(job *[]structs.Job) (*string, error) { - // Use a batch response - // Add JobID to the payload to match responses - panic("todo") -} - func (j *JobBroker) Stop(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == true { diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 8a80479b..c3f766b9 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -16,6 +16,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -118,10 +119,6 @@ func (p *Plugin) Serve() chan error { // get data JOB from the queue job := p.queue.GetMax() - if job == nil { - continue - } - exec := payload.Payload{ Context: job.Context(), Body: job.Body(), @@ -129,8 +126,12 @@ func (p *Plugin) Serve() chan error { _, err := p.workersPool.Exec(exec) if err != nil { - panic(err) + job.Nack() + p.log.Error("job execute", "error", err) + continue } + + job.Ack() } }() } @@ -176,6 +177,26 @@ func (p *Plugin) Push(j *structs.Job) (*string, error) { return id, nil } +func (p *Plugin) PushBatch(j []*structs.Job) (*string, error) { + const op = errors.Op("jobs_plugin_push") + + for i := 0; i < len(j); i++ { + pipe := p.pipelines.Get(j[i].Options.Pipeline) + + broker, ok := p.consumers[pipe.Driver()] + if !ok { + return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) + } + + _, err := broker.Push(j[i]) + if err != nil { + return nil, errors.E(op, err) + } + } + + return utils.AsStringPtr("test"), nil +} + func (p *Plugin) RPC() interface{} { return &rpc{ log: p.log, diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index c6bd1645..0d4cc099 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,8 +1,6 @@ package jobs import ( - "sync" - "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -14,26 +12,6 @@ type rpc struct { p *Plugin } -var jobsPool = &sync.Pool{ - New: func() interface{} { - return &structs.Job{ - Options: &structs.Options{}, - } - }, -} - -func pubJob(j *structs.Job) { - // clear - j.Job = "" - j.Payload = "" - j.Options = &structs.Options{} - jobsPool.Put(j) -} - -func getJob() *structs.Job { - return jobsPool.Get().(*structs.Job) -} - /* List of the RPC methods: 1. Push - single job push @@ -55,10 +33,7 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { // convert transport entity into domain // how we can do this quickly - jb := getJob() - defer pubJob(jb) - - jb = &structs.Job{ + jb := &structs.Job{ Job: j.GetJob().Job, Payload: j.GetJob().Payload, Options: &structs.Options{ @@ -71,6 +46,7 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { Timeout: j.GetJob().Options.Timeout, }, } + id, err := r.p.Push(jb) if err != nil { return errors.E(op, err) @@ -81,14 +57,38 @@ func (r *rpc) Push(j *jobsv1beta.Request, resp *jobsv1beta.Response) error { return nil } -func (r *rpc) PushBatch(j *structs.Job, idRet *string) error { +func (r *rpc) PushBatch(j *jobsv1beta.BatchRequest, resp *jobsv1beta.Response) error { const op = errors.Op("jobs_rpc_push") - id, err := r.p.Push(j) + + l := len(j.GetJobs()) + + batch := make([]*structs.Job, l) + + for i := 0; i < l; i++ { + // convert transport entity into domain + // how we can do this quickly + jb := &structs.Job{ + Job: j.GetJobs()[i].Job, + Payload: j.GetJobs()[i].Payload, + Options: &structs.Options{ + Priority: &j.GetJobs()[i].Options.Priority, + ID: &j.GetJobs()[i].Options.Id, + Pipeline: j.GetJobs()[i].Options.Pipeline, + Delay: j.GetJobs()[i].Options.Delay, + Attempts: j.GetJobs()[i].Options.Attempts, + RetryDelay: j.GetJobs()[i].Options.RetryDelay, + Timeout: j.GetJobs()[i].Options.Timeout, + }, + } + + batch[i] = jb + } + + _, err := r.p.PushBatch(batch) if err != nil { return errors.E(op, err) } - *idRet = *id return nil } |