From 60c229c8506df465586434309af5acd1f84e2406 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 7 Jul 2021 18:33:04 +0300 Subject: Updated ephemeral plugin, PQ and protobuf... Implement core of the root jobs plugin with a proper drivers/pipelines handling mechanism. Add delayed jobs for the ephemeral plugin. Remove ResumeAll, Resume, StopAll, Stop. Replaced with Pause/Resume with a slice of the pipelines. Other small improvements. Signed-off-by: Valery Piashchynski --- pkg/priorityqueue/binary_heap.go | 6 +++--- pkg/priorityqueue/binary_heap_test.go | 13 ++++++------- pkg/priorityqueue/interface.go | 18 +++++++++++++++--- 3 files changed, 24 insertions(+), 13 deletions(-) (limited to 'pkg') diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go index f3d8f95b..47fdf5e5 100644 --- a/pkg/priorityqueue/binary_heap.go +++ b/pkg/priorityqueue/binary_heap.go @@ -31,7 +31,7 @@ func (bh *BinHeap) fixUp() { for k > 0 { cur, par := (bh.items)[k], (bh.items)[p] - if *cur.Priority() < *par.Priority() { + if cur.Priority() < par.Priority() { bh.swap(k, p) k = p p = (k - 1) >> 1 @@ -55,10 +55,10 @@ func (bh *BinHeap) fixDown(curr, end int) { idxToSwap := cOneIdx // oh my, so unsafe - if cTwoIdx > -1 && *(bh.items)[cTwoIdx].Priority() < *(bh.items)[cOneIdx].Priority() { + if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() { idxToSwap = cTwoIdx } - if *(bh.items)[idxToSwap].Priority() < *(bh.items)[curr].Priority() { + if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() { bh.swap(uint64(curr), uint64(idxToSwap)) curr = idxToSwap cOneIdx = (curr << 1) + 1 diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go index 4c234dc5..b02017b6 100644 --- a/pkg/priorityqueue/binary_heap_test.go +++ b/pkg/priorityqueue/binary_heap_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/spiral/roadrunner/v2/utils" "github.com/stretchr/testify/require" ) @@ -23,16 +22,16 @@ func (t Test) Body() []byte { return nil } -func (t Test) Context() []byte { - return nil +func (t Test) Context() ([]byte, error) { + return nil, nil } -func (t Test) ID() *string { - return utils.AsStringPtr("none") +func (t Test) ID() string { + return "none" } -func (t Test) Priority() *uint64 { - return utils.AsUint64Ptr(uint64(t)) +func (t Test) Priority() uint64 { + return uint64(t) } func TestBinHeap_Init(t *testing.T) { diff --git a/pkg/priorityqueue/interface.go b/pkg/priorityqueue/interface.go index 7ac2e449..100aa667 100644 --- a/pkg/priorityqueue/interface.go +++ b/pkg/priorityqueue/interface.go @@ -5,11 +5,23 @@ type Queue interface { GetMax() Item } +// Item represents binary heap item type Item interface { - ID() *string - Priority() *uint64 + // ID is a unique item identifier + ID() string + + // Priority returns the Item's priority to sort + Priority() uint64 + + // Body is the Item payload Body() []byte - Context() []byte + + // Context is the Item meta information + Context() ([]byte, error) + + // Ack - acknowledge the Item after processing Ack() + + // Nack - discard the Item Nack() } -- cgit v1.2.3