summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkg/priority_queue/binary_heap.go43
-rw-r--r--pkg/priority_queue/binary_heap_test.go55
-rw-r--r--pkg/priority_queue/interface.go9
-rw-r--r--pkg/priority_queue/queue.go19
-rw-r--r--plugins/jobs/brokers/ephemeral/entry.go7
-rw-r--r--plugins/jobs/plugin.go2
6 files changed, 110 insertions, 25 deletions
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
index c660ddb6..7e034e82 100644
--- a/pkg/priority_queue/binary_heap.go
+++ b/pkg/priority_queue/binary_heap.go
@@ -4,9 +4,48 @@ binary heap (min-heap) algorithm used as a core for the priority queue
package priorityqueue
-type BinHeap struct {
-}
+type BinHeap []PQItem
func NewBinHeap() *BinHeap {
return &BinHeap{}
}
+
+func (bh *BinHeap) Init(items []PQItem) {
+ arraySize := len(items) - 1
+
+ for i := arraySize/2 - 1; i >= 0; i-- {
+ bh.shiftDown(items, i, arraySize)
+ }
+
+ for i := arraySize - 1; i >= 1; i-- {
+ items[0], items[i] = items[i], items[0]
+ bh.shiftDown(items, 0, i-1)
+ }
+}
+
+func (bh *BinHeap) shiftDown(numbers []PQItem, k, n int) {
+ // k << 1 is equal to k*2
+ for k<<1 <= n {
+ j := k << 1
+
+ if j < n && numbers[j].Priority() < numbers[j+1].Priority() {
+ j++
+ }
+
+ if !(numbers[k].Priority() < numbers[j].Priority()) {
+ break
+ }
+
+ numbers[k], numbers[j] = numbers[j], numbers[k]
+ k = j
+ }
+}
+func (bh *BinHeap) fix() {}
+
+func (bh *BinHeap) Push(_ PQItem) {}
+
+func (bh *BinHeap) Pop() PQItem {
+ bh.fix()
+ // get min
+ return nil
+}
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
new file mode 100644
index 00000000..a2b0e7e8
--- /dev/null
+++ b/pkg/priority_queue/binary_heap_test.go
@@ -0,0 +1,55 @@
+package priorityqueue
+
+import (
+ "sort"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+type Test int
+
+func (t Test) ID() string {
+ return ""
+}
+
+func (t Test) Priority() uint64 {
+ return uint64(t)
+}
+
+func TestBinHeap_Init(t *testing.T) {
+ a := []PQItem{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
+
+ bh := NewBinHeap()
+
+ bh.Init(a)
+
+ expected := []PQItem{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
+
+ require.Equal(t, expected, a)
+}
+
+func BenchmarkBinHeap_Init(b *testing.B) {
+ a := []PQItem{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
+ bh := NewBinHeap()
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ bh.Init(a)
+ }
+}
+
+func BenchmarkBinHeap_InitStdSort(b *testing.B) {
+ a := []PQItem{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ sort.Slice(a, func(i, j int) bool {
+ return a[i].Priority() < a[j].Priority()
+ })
+ }
+}
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
index 00998d78..45430486 100644
--- a/pkg/priority_queue/interface.go
+++ b/pkg/priority_queue/interface.go
@@ -1,6 +1,11 @@
package priorityqueue
type Queue interface {
- Push(item interface{})
- Pop() interface{}
+ Push(item PQItem)
+ Pop() PQItem
+}
+
+type PQItem interface {
+ ID() string
+ Priority() uint64
}
diff --git a/pkg/priority_queue/queue.go b/pkg/priority_queue/queue.go
deleted file mode 100644
index c5a8a1f6..00000000
--- a/pkg/priority_queue/queue.go
+++ /dev/null
@@ -1,19 +0,0 @@
-package priorityqueue
-
-import "fmt"
-
-type QueueImpl struct {
-}
-
-func NewPriorityQueue() *QueueImpl {
- return &QueueImpl{}
-}
-
-// Push the task
-func (q *QueueImpl) Push(item interface{}) {
- fmt.Println(item)
-}
-
-func (q *QueueImpl) Pop() interface{} {
- return nil
-}
diff --git a/plugins/jobs/brokers/ephemeral/entry.go b/plugins/jobs/brokers/ephemeral/entry.go
index bf8796d5..3cedec3e 100644
--- a/plugins/jobs/brokers/ephemeral/entry.go
+++ b/plugins/jobs/brokers/ephemeral/entry.go
@@ -1,13 +1,18 @@
package ephemeral
type entry struct {
- id string
+ id string
+ priority uint64
}
func (e *entry) ID() string {
return e.id
}
+func (e *entry) Priority() uint64 {
+ return e.priority
+}
+
func (e *entry) Ask() {
// no-op
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 690402d6..6bf43a11 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -71,7 +71,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
// initialize priority queue
- p.queue = priorityqueue.NewPriorityQueue()
+ p.queue = priorityqueue.NewBinHeap()
p.log = log
return nil