summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-11 19:54:35 +0300
committerValery Piashchynski <[email protected]>2021-07-11 19:54:35 +0300
commit0f70f1e2311640236d74a0a237536779d8d44223 (patch)
tree8b2e9dc32b5b6bafe418083c33cce3dbb8f277c7 /pkg
parent240b114e1ea3c1414bcd9f4d2c050d56c467222f (diff)
Update JOBS interface, Renamed Consume -> Run.
Add DYNAMIC declaration of the pipelines. Update Jobs constructor interface, add FromPipeline method to construct jobs driver (unique) via the `Declare` RPC call. Add `Stop` method to gracefully stop all consumers. Binary heaps `GetMax` to canonical `ExtractMin`. Other small improvements. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/pool/static_pool_test.go2
-rw-r--r--pkg/priority_queue/binary_heap.go2
-rw-r--r--pkg/priority_queue/binary_heap_test.go6
-rw-r--r--pkg/priority_queue/interface.go2
4 files changed, 6 insertions, 6 deletions
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index f264c6dc..3df773ab 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -204,7 +204,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
- // Consume pool events
+ // Run pool events
ev := make(chan struct{}, 1)
listener := func(event interface{}) {
if pe, ok := event.(events.PoolEvent); ok {
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
index e47dd2c8..514ca460 100644
--- a/pkg/priority_queue/binary_heap.go
+++ b/pkg/priority_queue/binary_heap.go
@@ -104,7 +104,7 @@ func (bh *BinHeap) Insert(item Item) {
bh.cond.Signal()
}
-func (bh *BinHeap) GetMax() Item {
+func (bh *BinHeap) ExtractMin() Item {
bh.cond.L.Lock()
// if len == 0, wait for the signal
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
index 06d0735c..f30cf8d8 100644
--- a/pkg/priority_queue/binary_heap_test.go
+++ b/pkg/priority_queue/binary_heap_test.go
@@ -50,7 +50,7 @@ func TestBinHeap_Init(t *testing.T) {
res := make([]Item, 0, 12)
for i := 0; i < 11; i++ {
- item := bh.GetMax()
+ item := bh.ExtractMin()
res = append(res, item)
}
@@ -83,7 +83,7 @@ func TestNewPriorityQueue(t *testing.T) {
case <-tt.C:
fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec)))
atomic.StoreUint64(&insertsPerSec, 0)
- fmt.Println(fmt.Sprintf("GetMax per second: %d", atomic.LoadUint64(&getPerSec)))
+ fmt.Println(fmt.Sprintf("ExtractMin per second: %d", atomic.LoadUint64(&getPerSec)))
atomic.StoreUint64(&getPerSec, 0)
case <-stopCh:
tt.Stop()
@@ -98,7 +98,7 @@ func TestNewPriorityQueue(t *testing.T) {
case <-stopCh:
return
default:
- pq.GetMax()
+ pq.ExtractMin()
atomic.AddUint64(&getPerSec, 1)
}
}
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
index 8278dc8d..1efebf1c 100644
--- a/pkg/priority_queue/interface.go
+++ b/pkg/priority_queue/interface.go
@@ -2,7 +2,7 @@ package priorityqueue
type Queue interface {
Insert(item Item)
- GetMax() Item
+ ExtractMin() Item
Len() uint64
}