diff options
-rw-r--r-- | pkg/priority_queue/binary_heap.go | 43 | ||||
-rw-r--r-- | pkg/priority_queue/binary_heap_test.go | 55 | ||||
-rw-r--r-- | pkg/priority_queue/interface.go | 9 | ||||
-rw-r--r-- | pkg/priority_queue/queue.go | 19 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/entry.go | 7 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 2 |
6 files changed, 110 insertions, 25 deletions
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go index c660ddb6..7e034e82 100644 --- a/pkg/priority_queue/binary_heap.go +++ b/pkg/priority_queue/binary_heap.go @@ -4,9 +4,48 @@ binary heap (min-heap) algorithm used as a core for the priority queue package priorityqueue -type BinHeap struct { -} +type BinHeap []PQItem func NewBinHeap() *BinHeap { return &BinHeap{} } + +func (bh *BinHeap) Init(items []PQItem) { + arraySize := len(items) - 1 + + for i := arraySize/2 - 1; i >= 0; i-- { + bh.shiftDown(items, i, arraySize) + } + + for i := arraySize - 1; i >= 1; i-- { + items[0], items[i] = items[i], items[0] + bh.shiftDown(items, 0, i-1) + } +} + +func (bh *BinHeap) shiftDown(numbers []PQItem, k, n int) { + // k << 1 is equal to k*2 + for k<<1 <= n { + j := k << 1 + + if j < n && numbers[j].Priority() < numbers[j+1].Priority() { + j++ + } + + if !(numbers[k].Priority() < numbers[j].Priority()) { + break + } + + numbers[k], numbers[j] = numbers[j], numbers[k] + k = j + } +} +func (bh *BinHeap) fix() {} + +func (bh *BinHeap) Push(_ PQItem) {} + +func (bh *BinHeap) Pop() PQItem { + bh.fix() + // get min + return nil +} diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go new file mode 100644 index 00000000..a2b0e7e8 --- /dev/null +++ b/pkg/priority_queue/binary_heap_test.go @@ -0,0 +1,55 @@ +package priorityqueue + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/require" +) + +type Test int + +func (t Test) ID() string { + return "" +} + +func (t Test) Priority() uint64 { + return uint64(t) +} + +func TestBinHeap_Init(t *testing.T) { + a := []PQItem{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} + + bh := NewBinHeap() + + bh.Init(a) + + expected := []PQItem{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} + + require.Equal(t, expected, a) +} + +func BenchmarkBinHeap_Init(b *testing.B) { + a := []PQItem{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} + bh := NewBinHeap() + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + bh.Init(a) + } +} + +func BenchmarkBinHeap_InitStdSort(b *testing.B) { + a := []PQItem{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + sort.Slice(a, func(i, j int) bool { + return a[i].Priority() < a[j].Priority() + }) + } +} diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 00998d78..45430486 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -1,6 +1,11 @@ package priorityqueue type Queue interface { - Push(item interface{}) - Pop() interface{} + Push(item PQItem) + Pop() PQItem +} + +type PQItem interface { + ID() string + Priority() uint64 } diff --git a/pkg/priority_queue/queue.go b/pkg/priority_queue/queue.go deleted file mode 100644 index c5a8a1f6..00000000 --- a/pkg/priority_queue/queue.go +++ /dev/null @@ -1,19 +0,0 @@ -package priorityqueue - -import "fmt" - -type QueueImpl struct { -} - -func NewPriorityQueue() *QueueImpl { - return &QueueImpl{} -} - -// Push the task -func (q *QueueImpl) Push(item interface{}) { - fmt.Println(item) -} - -func (q *QueueImpl) Pop() interface{} { - return nil -} diff --git a/plugins/jobs/brokers/ephemeral/entry.go b/plugins/jobs/brokers/ephemeral/entry.go index bf8796d5..3cedec3e 100644 --- a/plugins/jobs/brokers/ephemeral/entry.go +++ b/plugins/jobs/brokers/ephemeral/entry.go @@ -1,13 +1,18 @@ package ephemeral type entry struct { - id string + id string + priority uint64 } func (e *entry) ID() string { return e.id } +func (e *entry) Priority() uint64 { + return e.priority +} + func (e *entry) Ask() { // no-op } diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 690402d6..6bf43a11 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -71,7 +71,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } // initialize priority queue - p.queue = priorityqueue.NewPriorityQueue() + p.queue = priorityqueue.NewBinHeap() p.log = log return nil |