summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-06 17:30:31 +0300
committerValery Piashchynski <[email protected]>2021-07-06 17:30:31 +0300
commit2c78e93222cc9d3b88456175348e42f7f40c449b (patch)
treebe4fc671db33ceb8700019a5ede900c8d900d7c0 /pkg
parent207739f7346c98e16087547bc510e1f909671260 (diff)
Rework ephemeral and binary heaps
Implemented a sync.Cond for binary heap algo to save processor from spinning in the for loop and receiving nil Items until the Queue will be filled. Add num_pollers option to the configuration to specify number of pollers from the queue. Add Resume, ResumeAll, Stop, StopAll, PushBatch methods to the ephemeral. Remove map and use sync.Map in the ephemeral broker. Add protobuf schema. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r--pkg/priority_queue/binary_heap.go75
-rw-r--r--pkg/priority_queue/binary_heap_test.go53
-rw-r--r--pkg/priority_queue/pq.go30
-rw-r--r--pkg/priorityqueue/binary_heap.go103
-rw-r--r--pkg/priorityqueue/binary_heap_test.go (renamed from pkg/priority_queue/pq_test.go)50
-rw-r--r--pkg/priorityqueue/interface.go15
6 files changed, 167 insertions, 159 deletions
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
deleted file mode 100644
index c7c148da..00000000
--- a/pkg/priority_queue/binary_heap.go
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
-binary heap (min-heap) algorithm used as a core for the priority queue
-*/
-
-package priorityqueue
-
-import priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
-
-type BinHeap []priorityqueue.Item
-
-func NewBinHeap() *BinHeap {
- return &BinHeap{}
-}
-
-func (bh *BinHeap) fixUp() {
- k := len(*bh) - 1
- p := (k - 1) >> 1 // k-1 / 2
-
- for k > 0 {
- cur, par := (*bh)[k], (*bh)[p]
-
- if cur.Priority() < par.Priority() {
- bh.swap(k, p)
- k = p
- p = (k - 1) >> 1
- } else {
- return
- }
- }
-}
-
-func (bh *BinHeap) swap(i, j int) {
- (*bh)[i], (*bh)[j] = (*bh)[j], (*bh)[i]
-}
-
-func (bh *BinHeap) fixDown(curr, end int) {
- cOneIdx := curr*2 + 1
- for cOneIdx <= end {
- cTwoIdx := -1
- if curr*2+2 <= end {
- cTwoIdx = curr*2 + 2
- }
-
- idxToSwap := cOneIdx
- if cTwoIdx > -1 && (*bh)[cTwoIdx].Priority() < (*bh)[cOneIdx].Priority() {
- idxToSwap = cTwoIdx
- }
- if (*bh)[idxToSwap].Priority() < (*bh)[curr].Priority() {
- bh.swap(curr, idxToSwap)
- curr = idxToSwap
- cOneIdx = curr*2 + 1
- } else {
- return
- }
- }
-}
-
-func (bh *BinHeap) Insert(item priorityqueue.Item) {
- *bh = append(*bh, item)
- bh.fixUp()
-}
-
-func (bh *BinHeap) GetMax() priorityqueue.Item {
- l := len(*bh)
- if l == 0 {
- return nil
- }
-
- bh.swap(0, l-1)
-
- item := (*bh)[l-1]
- *bh = (*bh)[0 : l-1]
- bh.fixDown(0, l-2)
- return item
-}
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
deleted file mode 100644
index 528e8fd0..00000000
--- a/pkg/priority_queue/binary_heap_test.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package priorityqueue
-
-import (
- "testing"
-
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
- "github.com/stretchr/testify/require"
-)
-
-type Test int
-
-func (t Test) Ack() {
-}
-
-func (t Test) Nack() {
-}
-
-func (t Test) Body() []byte {
- return nil
-}
-
-func (t Test) Context() []byte {
- return nil
-}
-
-func (t Test) ID() string {
- return ""
-}
-
-func (t Test) Priority() uint64 {
- return uint64(t)
-}
-
-func TestBinHeap_Init(t *testing.T) {
- a := []priorityqueue.Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
-
- bh := NewBinHeap()
-
- for i := 0; i < len(a); i++ {
- bh.Insert(a[i])
- }
-
- expected := []priorityqueue.Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
-
- res := make([]priorityqueue.Item, 0, 12)
-
- for i := 0; i < 11; i++ {
- item := bh.GetMax()
- res = append(res, item)
- }
-
- require.Equal(t, expected, res)
-}
diff --git a/pkg/priority_queue/pq.go b/pkg/priority_queue/pq.go
deleted file mode 100644
index 2ff52a79..00000000
--- a/pkg/priority_queue/pq.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package priorityqueue
-
-import (
- "sync"
-
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
-)
-
-type PQ struct {
- sync.RWMutex
- bh *BinHeap
-}
-
-func NewPriorityQueue() *PQ {
- return &PQ{
- bh: NewBinHeap(),
- }
-}
-
-func (p *PQ) GetMax() priorityqueue.Item {
- p.Lock()
- defer p.Unlock()
- return p.bh.GetMax()
-}
-
-func (p *PQ) Insert(item priorityqueue.Item) {
- p.Lock()
- p.bh.Insert(item)
- p.Unlock()
-}
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go
new file mode 100644
index 00000000..fe6a06fd
--- /dev/null
+++ b/pkg/priorityqueue/binary_heap.go
@@ -0,0 +1,103 @@
+/*
+binary heap (min-heap) algorithm used as a core for the priority queue
+*/
+
+package priorityqueue
+
+import (
+ "sync"
+ "sync/atomic"
+)
+
+type BinHeap struct {
+ items []Item
+ // find a way to use pointer to the raw data
+ len uint64
+ cond sync.Cond
+}
+
+func NewBinHeap() *BinHeap {
+ return &BinHeap{
+ items: make([]Item, 0, 100),
+ len: 0,
+ cond: sync.Cond{L: &sync.Mutex{}},
+ }
+}
+
+func (bh *BinHeap) fixUp() {
+ k := len(bh.items) - 1
+ p := (k - 1) >> 1 // k-1 / 2
+
+ for k > 0 {
+ cur, par := (bh.items)[k], (bh.items)[p]
+
+ if *cur.Priority() < *par.Priority() {
+ bh.swap(k, p)
+ k = p
+ p = (k - 1) >> 1
+ } else {
+ return
+ }
+ }
+}
+
+func (bh *BinHeap) swap(i, j int) {
+ (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i]
+}
+
+func (bh *BinHeap) fixDown(curr, end int) {
+ cOneIdx := (curr << 1) + 1
+ for cOneIdx <= end {
+ cTwoIdx := -1
+ if (curr<<1)+2 <= end {
+ cTwoIdx = (curr << 1) + 2
+ }
+
+ idxToSwap := cOneIdx
+ // oh my, so unsafe
+ if cTwoIdx > -1 && *(bh.items)[cTwoIdx].Priority() < *(bh.items)[cOneIdx].Priority() {
+ idxToSwap = cTwoIdx
+ }
+ if *(bh.items)[idxToSwap].Priority() < *(bh.items)[curr].Priority() {
+ bh.swap(curr, idxToSwap)
+ curr = idxToSwap
+ cOneIdx = (curr << 1) + 1
+ } else {
+ return
+ }
+ }
+}
+
+func (bh *BinHeap) Insert(item Item) {
+ bh.cond.L.Lock()
+ bh.items = append(bh.items, item)
+
+ // add len to the slice
+ atomic.AddUint64(&bh.len, 1)
+
+ // fix binary heap up
+ bh.fixUp()
+ bh.cond.L.Unlock()
+
+ // signal the goroutine on wait
+ bh.cond.Signal()
+}
+
+func (bh *BinHeap) GetMax() Item {
+ bh.cond.L.Lock()
+ defer bh.cond.L.Unlock()
+
+ for atomic.LoadUint64(&bh.len) == 0 {
+ bh.cond.Wait()
+ }
+
+ bh.swap(0, int(bh.len-1))
+
+ item := (bh.items)[int(bh.len)-1]
+ bh.items = (bh).items[0 : int(bh.len)-1]
+ bh.fixDown(0, int(bh.len-2))
+
+ // reduce len
+ atomic.AddUint64(&bh.len, ^uint64(0))
+ return item
+}
diff --git a/pkg/priority_queue/pq_test.go b/pkg/priorityqueue/binary_heap_test.go
index 49afe5e3..149ec764 100644
--- a/pkg/priority_queue/pq_test.go
+++ b/pkg/priorityqueue/binary_heap_test.go
@@ -6,13 +6,61 @@ import (
"sync/atomic"
"testing"
"time"
+
+ "github.com/spiral/roadrunner/v2/utils"
+ "github.com/stretchr/testify/require"
)
+type Test int
+
+func (t Test) Ack() {
+}
+
+func (t Test) Nack() {
+}
+
+func (t Test) Body() []byte {
+ return nil
+}
+
+func (t Test) Context() []byte {
+ return nil
+}
+
+func (t Test) ID() *string {
+ return utils.AsStringPtr("none")
+}
+
+func (t Test) Priority() *uint64 {
+ return utils.AsUint64Ptr(uint64(t))
+}
+
+func TestBinHeap_Init(t *testing.T) {
+ a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
+
+ bh := NewBinHeap()
+
+ for i := 0; i < len(a); i++ {
+ bh.Insert(a[i])
+ }
+
+ expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
+
+ res := make([]Item, 0, 12)
+
+ for i := 0; i < 11; i++ {
+ item := bh.GetMax()
+ res = append(res, item)
+ }
+
+ require.Equal(t, expected, res)
+}
+
func TestNewPriorityQueue(t *testing.T) {
insertsPerSec := uint64(0)
getPerSec := uint64(0)
stopCh := make(chan struct{}, 1)
- pq := NewPriorityQueue()
+ pq := NewBinHeap()
go func() {
tt := time.NewTicker(time.Second)
diff --git a/pkg/priorityqueue/interface.go b/pkg/priorityqueue/interface.go
new file mode 100644
index 00000000..7ac2e449
--- /dev/null
+++ b/pkg/priorityqueue/interface.go
@@ -0,0 +1,15 @@
+package priorityqueue
+
+type Queue interface {
+ Insert(item Item)
+ GetMax() Item
+}
+
+type Item interface {
+ ID() *string
+ Priority() *uint64
+ Body() []byte
+ Context() []byte
+ Ack()
+ Nack()
+}