diff options
author | Valery Piashchynski <[email protected]> | 2021-07-06 17:30:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-06 17:30:31 +0300 |
commit | 2c78e93222cc9d3b88456175348e42f7f40c449b (patch) | |
tree | be4fc671db33ceb8700019a5ede900c8d900d7c0 /pkg/priority_queue | |
parent | 207739f7346c98e16087547bc510e1f909671260 (diff) |
Rework ephemeral and binary heaps
Implemented a sync.Cond for binary heap algo to save processor from
spinning in the for loop and receiving nil Items until the Queue will be
filled.
Add num_pollers option to the configuration to specify number of
pollers from the queue.
Add Resume, ResumeAll, Stop, StopAll, PushBatch methods to the ephemeral.
Remove map and use sync.Map in the ephemeral broker.
Add protobuf schema.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/priority_queue')
-rw-r--r-- | pkg/priority_queue/binary_heap.go | 75 | ||||
-rw-r--r-- | pkg/priority_queue/binary_heap_test.go | 53 | ||||
-rw-r--r-- | pkg/priority_queue/pq.go | 30 | ||||
-rw-r--r-- | pkg/priority_queue/pq_test.go | 65 |
4 files changed, 0 insertions, 223 deletions
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go deleted file mode 100644 index c7c148da..00000000 --- a/pkg/priority_queue/binary_heap.go +++ /dev/null @@ -1,75 +0,0 @@ -/* -binary heap (min-heap) algorithm used as a core for the priority queue -*/ - -package priorityqueue - -import priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - -type BinHeap []priorityqueue.Item - -func NewBinHeap() *BinHeap { - return &BinHeap{} -} - -func (bh *BinHeap) fixUp() { - k := len(*bh) - 1 - p := (k - 1) >> 1 // k-1 / 2 - - for k > 0 { - cur, par := (*bh)[k], (*bh)[p] - - if cur.Priority() < par.Priority() { - bh.swap(k, p) - k = p - p = (k - 1) >> 1 - } else { - return - } - } -} - -func (bh *BinHeap) swap(i, j int) { - (*bh)[i], (*bh)[j] = (*bh)[j], (*bh)[i] -} - -func (bh *BinHeap) fixDown(curr, end int) { - cOneIdx := curr*2 + 1 - for cOneIdx <= end { - cTwoIdx := -1 - if curr*2+2 <= end { - cTwoIdx = curr*2 + 2 - } - - idxToSwap := cOneIdx - if cTwoIdx > -1 && (*bh)[cTwoIdx].Priority() < (*bh)[cOneIdx].Priority() { - idxToSwap = cTwoIdx - } - if (*bh)[idxToSwap].Priority() < (*bh)[curr].Priority() { - bh.swap(curr, idxToSwap) - curr = idxToSwap - cOneIdx = curr*2 + 1 - } else { - return - } - } -} - -func (bh *BinHeap) Insert(item priorityqueue.Item) { - *bh = append(*bh, item) - bh.fixUp() -} - -func (bh *BinHeap) GetMax() priorityqueue.Item { - l := len(*bh) - if l == 0 { - return nil - } - - bh.swap(0, l-1) - - item := (*bh)[l-1] - *bh = (*bh)[0 : l-1] - bh.fixDown(0, l-2) - return item -} diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go deleted file mode 100644 index 528e8fd0..00000000 --- a/pkg/priority_queue/binary_heap_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package priorityqueue - -import ( - "testing" - - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - "github.com/stretchr/testify/require" -) - -type Test int - -func (t Test) Ack() { -} - -func (t Test) Nack() { -} - -func (t Test) Body() []byte { - return nil -} - -func (t Test) Context() []byte { - return nil -} - -func (t Test) ID() string { - return "" -} - -func (t Test) Priority() uint64 { - return uint64(t) -} - -func TestBinHeap_Init(t *testing.T) { - a := []priorityqueue.Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} - - bh := NewBinHeap() - - for i := 0; i < len(a); i++ { - bh.Insert(a[i]) - } - - expected := []priorityqueue.Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} - - res := make([]priorityqueue.Item, 0, 12) - - for i := 0; i < 11; i++ { - item := bh.GetMax() - res = append(res, item) - } - - require.Equal(t, expected, res) -} diff --git a/pkg/priority_queue/pq.go b/pkg/priority_queue/pq.go deleted file mode 100644 index 2ff52a79..00000000 --- a/pkg/priority_queue/pq.go +++ /dev/null @@ -1,30 +0,0 @@ -package priorityqueue - -import ( - "sync" - - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" -) - -type PQ struct { - sync.RWMutex - bh *BinHeap -} - -func NewPriorityQueue() *PQ { - return &PQ{ - bh: NewBinHeap(), - } -} - -func (p *PQ) GetMax() priorityqueue.Item { - p.Lock() - defer p.Unlock() - return p.bh.GetMax() -} - -func (p *PQ) Insert(item priorityqueue.Item) { - p.Lock() - p.bh.Insert(item) - p.Unlock() -} diff --git a/pkg/priority_queue/pq_test.go b/pkg/priority_queue/pq_test.go deleted file mode 100644 index 49afe5e3..00000000 --- a/pkg/priority_queue/pq_test.go +++ /dev/null @@ -1,65 +0,0 @@ -package priorityqueue - -import ( - "fmt" - "math/rand" - "sync/atomic" - "testing" - "time" -) - -func TestNewPriorityQueue(t *testing.T) { - insertsPerSec := uint64(0) - getPerSec := uint64(0) - stopCh := make(chan struct{}, 1) - pq := NewPriorityQueue() - - go func() { - tt := time.NewTicker(time.Second) - - for { - select { - case <-tt.C: - fmt.Println(fmt.Sprintf("GetMax per second: %d", atomic.LoadUint64(&getPerSec))) - fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec))) - atomic.StoreUint64(&getPerSec, 0) - atomic.StoreUint64(&insertsPerSec, 0) - case <-stopCh: - tt.Stop() - return - } - } - }() - - go func() { - for { - select { - case <-stopCh: - return - default: - it := pq.GetMax() - if it == nil { - continue - } - atomic.AddUint64(&getPerSec, 1) - } - } - }() - - go func() { - for { - select { - case <-stopCh: - return - default: - pq.Insert(Test(rand.Int())) //nolint:gosec - atomic.AddUint64(&insertsPerSec, 1) - } - } - }() - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - stopCh <- struct{}{} - stopCh <- struct{}{} -} |