diff options
author | Valery Piashchynski <[email protected]> | 2021-07-07 18:33:04 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-07 18:33:04 +0300 |
commit | 60c229c8506df465586434309af5acd1f84e2406 (patch) | |
tree | 18fdf380b7e032415d656e84bcc3c7a057f194a8 /pkg | |
parent | 127186a72d4b8d30f6ada72ade661d8713490728 (diff) |
Updated ephemeral plugin, PQ and protobuf...
Implement core of the root jobs plugin with a proper drivers/pipelines
handling mechanism.
Add delayed jobs for the ephemeral plugin.
Remove ResumeAll, Resume, StopAll, Stop. Replaced with Pause/Resume with
a slice of the pipelines.
Other small improvements.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/priorityqueue/binary_heap.go | 6 | ||||
-rw-r--r-- | pkg/priorityqueue/binary_heap_test.go | 13 | ||||
-rw-r--r-- | pkg/priorityqueue/interface.go | 18 |
3 files changed, 24 insertions, 13 deletions
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go index f3d8f95b..47fdf5e5 100644 --- a/pkg/priorityqueue/binary_heap.go +++ b/pkg/priorityqueue/binary_heap.go @@ -31,7 +31,7 @@ func (bh *BinHeap) fixUp() { for k > 0 { cur, par := (bh.items)[k], (bh.items)[p] - if *cur.Priority() < *par.Priority() { + if cur.Priority() < par.Priority() { bh.swap(k, p) k = p p = (k - 1) >> 1 @@ -55,10 +55,10 @@ func (bh *BinHeap) fixDown(curr, end int) { idxToSwap := cOneIdx // oh my, so unsafe - if cTwoIdx > -1 && *(bh.items)[cTwoIdx].Priority() < *(bh.items)[cOneIdx].Priority() { + if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() { idxToSwap = cTwoIdx } - if *(bh.items)[idxToSwap].Priority() < *(bh.items)[curr].Priority() { + if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() { bh.swap(uint64(curr), uint64(idxToSwap)) curr = idxToSwap cOneIdx = (curr << 1) + 1 diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go index 4c234dc5..b02017b6 100644 --- a/pkg/priorityqueue/binary_heap_test.go +++ b/pkg/priorityqueue/binary_heap_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/spiral/roadrunner/v2/utils" "github.com/stretchr/testify/require" ) @@ -23,16 +22,16 @@ func (t Test) Body() []byte { return nil } -func (t Test) Context() []byte { - return nil +func (t Test) Context() ([]byte, error) { + return nil, nil } -func (t Test) ID() *string { - return utils.AsStringPtr("none") +func (t Test) ID() string { + return "none" } -func (t Test) Priority() *uint64 { - return utils.AsUint64Ptr(uint64(t)) +func (t Test) Priority() uint64 { + return uint64(t) } func TestBinHeap_Init(t *testing.T) { diff --git a/pkg/priorityqueue/interface.go b/pkg/priorityqueue/interface.go index 7ac2e449..100aa667 100644 --- a/pkg/priorityqueue/interface.go +++ b/pkg/priorityqueue/interface.go @@ -5,11 +5,23 @@ type Queue interface { GetMax() Item } +// Item represents binary heap item type Item interface { - ID() *string - Priority() *uint64 + // ID is a unique item identifier + ID() string + + // Priority returns the Item's priority to sort + Priority() uint64 + + // Body is the Item payload Body() []byte - Context() []byte + + // Context is the Item meta information + Context() ([]byte, error) + + // Ack - acknowledge the Item after processing Ack() + + // Nack - discard the Item Nack() } |