diff options
author | Valery Piashchynski <[email protected]> | 2021-07-11 19:54:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-11 19:54:35 +0300 |
commit | 0f70f1e2311640236d74a0a237536779d8d44223 (patch) | |
tree | 8b2e9dc32b5b6bafe418083c33cce3dbb8f277c7 /pkg | |
parent | 240b114e1ea3c1414bcd9f4d2c050d56c467222f (diff) |
Update JOBS interface, Renamed Consume -> Run.
Add DYNAMIC declaration of the pipelines. Update Jobs constructor
interface, add FromPipeline method to construct jobs driver (unique)
via the `Declare` RPC call.
Add `Stop` method to gracefully stop all consumers.
Binary heaps `GetMax` to canonical `ExtractMin`.
Other small improvements.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pool/static_pool_test.go | 2 | ||||
-rw-r--r-- | pkg/priority_queue/binary_heap.go | 2 | ||||
-rw-r--r-- | pkg/priority_queue/binary_heap_test.go | 6 | ||||
-rw-r--r-- | pkg/priority_queue/interface.go | 2 |
4 files changed, 6 insertions, 6 deletions
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index f264c6dc..3df773ab 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -204,7 +204,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx := context.Background() - // Consume pool events + // Run pool events ev := make(chan struct{}, 1) listener := func(event interface{}) { if pe, ok := event.(events.PoolEvent); ok { diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go index e47dd2c8..514ca460 100644 --- a/pkg/priority_queue/binary_heap.go +++ b/pkg/priority_queue/binary_heap.go @@ -104,7 +104,7 @@ func (bh *BinHeap) Insert(item Item) { bh.cond.Signal() } -func (bh *BinHeap) GetMax() Item { +func (bh *BinHeap) ExtractMin() Item { bh.cond.L.Lock() // if len == 0, wait for the signal diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go index 06d0735c..f30cf8d8 100644 --- a/pkg/priority_queue/binary_heap_test.go +++ b/pkg/priority_queue/binary_heap_test.go @@ -50,7 +50,7 @@ func TestBinHeap_Init(t *testing.T) { res := make([]Item, 0, 12) for i := 0; i < 11; i++ { - item := bh.GetMax() + item := bh.ExtractMin() res = append(res, item) } @@ -83,7 +83,7 @@ func TestNewPriorityQueue(t *testing.T) { case <-tt.C: fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec))) atomic.StoreUint64(&insertsPerSec, 0) - fmt.Println(fmt.Sprintf("GetMax per second: %d", atomic.LoadUint64(&getPerSec))) + fmt.Println(fmt.Sprintf("ExtractMin per second: %d", atomic.LoadUint64(&getPerSec))) atomic.StoreUint64(&getPerSec, 0) case <-stopCh: tt.Stop() @@ -98,7 +98,7 @@ func TestNewPriorityQueue(t *testing.T) { case <-stopCh: return default: - pq.GetMax() + pq.ExtractMin() atomic.AddUint64(&getPerSec, 1) } } diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go index 8278dc8d..1efebf1c 100644 --- a/pkg/priority_queue/interface.go +++ b/pkg/priority_queue/interface.go @@ -2,7 +2,7 @@ package priorityqueue type Queue interface { Insert(item Item) - GetMax() Item + ExtractMin() Item Len() uint64 } |