summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-07 18:33:04 +0300
committerValery Piashchynski <[email protected]>2021-07-07 18:33:04 +0300
commit60c229c8506df465586434309af5acd1f84e2406 (patch)
tree18fdf380b7e032415d656e84bcc3c7a057f194a8 /pkg
parent127186a72d4b8d30f6ada72ade661d8713490728 (diff)
Updated ephemeral plugin, PQ and protobuf...
Implement core of the root jobs plugin with a proper drivers/pipelines handling mechanism. Add delayed jobs for the ephemeral plugin. Remove ResumeAll, Resume, StopAll, Stop. Replaced with Pause/Resume with a slice of the pipelines. Other small improvements. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r--pkg/priorityqueue/binary_heap.go6
-rw-r--r--pkg/priorityqueue/binary_heap_test.go13
-rw-r--r--pkg/priorityqueue/interface.go18
3 files changed, 24 insertions, 13 deletions
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go
index f3d8f95b..47fdf5e5 100644
--- a/pkg/priorityqueue/binary_heap.go
+++ b/pkg/priorityqueue/binary_heap.go
@@ -31,7 +31,7 @@ func (bh *BinHeap) fixUp() {
for k > 0 {
cur, par := (bh.items)[k], (bh.items)[p]
- if *cur.Priority() < *par.Priority() {
+ if cur.Priority() < par.Priority() {
bh.swap(k, p)
k = p
p = (k - 1) >> 1
@@ -55,10 +55,10 @@ func (bh *BinHeap) fixDown(curr, end int) {
idxToSwap := cOneIdx
// oh my, so unsafe
- if cTwoIdx > -1 && *(bh.items)[cTwoIdx].Priority() < *(bh.items)[cOneIdx].Priority() {
+ if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() {
idxToSwap = cTwoIdx
}
- if *(bh.items)[idxToSwap].Priority() < *(bh.items)[curr].Priority() {
+ if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() {
bh.swap(uint64(curr), uint64(idxToSwap))
curr = idxToSwap
cOneIdx = (curr << 1) + 1
diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go
index 4c234dc5..b02017b6 100644
--- a/pkg/priorityqueue/binary_heap_test.go
+++ b/pkg/priorityqueue/binary_heap_test.go
@@ -7,7 +7,6 @@ import (
"testing"
"time"
- "github.com/spiral/roadrunner/v2/utils"
"github.com/stretchr/testify/require"
)
@@ -23,16 +22,16 @@ func (t Test) Body() []byte {
return nil
}
-func (t Test) Context() []byte {
- return nil
+func (t Test) Context() ([]byte, error) {
+ return nil, nil
}
-func (t Test) ID() *string {
- return utils.AsStringPtr("none")
+func (t Test) ID() string {
+ return "none"
}
-func (t Test) Priority() *uint64 {
- return utils.AsUint64Ptr(uint64(t))
+func (t Test) Priority() uint64 {
+ return uint64(t)
}
func TestBinHeap_Init(t *testing.T) {
diff --git a/pkg/priorityqueue/interface.go b/pkg/priorityqueue/interface.go
index 7ac2e449..100aa667 100644
--- a/pkg/priorityqueue/interface.go
+++ b/pkg/priorityqueue/interface.go
@@ -5,11 +5,23 @@ type Queue interface {
GetMax() Item
}
+// Item represents binary heap item
type Item interface {
- ID() *string
- Priority() *uint64
+ // ID is a unique item identifier
+ ID() string
+
+ // Priority returns the Item's priority to sort
+ Priority() uint64
+
+ // Body is the Item payload
Body() []byte
- Context() []byte
+
+ // Context is the Item meta information
+ Context() ([]byte, error)
+
+ // Ack - acknowledge the Item after processing
Ack()
+
+ // Nack - discard the Item
Nack()
}