summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-08 14:28:05 +0300
committerValery Piashchynski <[email protected]>2021-07-08 14:28:05 +0300
commit05956485a121ee47a2f8281a8a0dffd7eecc68aa (patch)
tree1fd0ffd60c1731ea6934300506c3061f6e65d1d7 /pkg
parentc7becb2fc51fc09523f6640eb72f360a6b4681f5 (diff)
Add pipeline and job plugin options...
Skeleton for the amqp plugin. Add Timeout and Pipeline to the job.Context() method. Implement queue limits for the ephemeral driver with main priority queue limits. Update configuration, add pipeline_size for every pipeline and jobs priority queue size. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r--pkg/priorityqueue/binary_heap.go6
-rw-r--r--pkg/priorityqueue/binary_heap_test.go17
2 files changed, 16 insertions, 7 deletions
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go
index 82424331..e47dd2c8 100644
--- a/pkg/priorityqueue/binary_heap.go
+++ b/pkg/priorityqueue/binary_heap.go
@@ -18,12 +18,8 @@ type BinHeap struct {
}
func NewBinHeap(maxLen uint64) *BinHeap {
- if maxLen == 0 {
- maxLen = 100_000
- }
-
return &BinHeap{
- items: make([]Item, 0, maxLen),
+ items: make([]Item, 0, 1000),
len: 0,
maxLen: maxLen,
cond: sync.Cond{L: &sync.Mutex{}},
diff --git a/pkg/priorityqueue/binary_heap_test.go b/pkg/priorityqueue/binary_heap_test.go
index 125884b1..53505f52 100644
--- a/pkg/priorityqueue/binary_heap_test.go
+++ b/pkg/priorityqueue/binary_heap_test.go
@@ -37,7 +37,7 @@ func (t Test) Priority() uint64 {
func TestBinHeap_Init(t *testing.T) {
a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
- bh := NewBinHeap(100)
+ bh := NewBinHeap(0)
for i := 0; i < len(a); i++ {
bh.Insert(a[i])
@@ -59,7 +59,19 @@ func TestNewPriorityQueue(t *testing.T) {
insertsPerSec := uint64(0)
getPerSec := uint64(0)
stopCh := make(chan struct{}, 1)
- pq := NewBinHeap(10000000)
+ pq := NewBinHeap(1000)
+
+ go func() {
+ tt3 := time.NewTicker(time.Millisecond * 10)
+ for {
+ select {
+ case <-tt3.C:
+ require.Less(t, pq.Len(), uint64(1002))
+ case <-stopCh:
+ return
+ }
+ }
+ }()
go func() {
tt := time.NewTicker(time.Second)
@@ -106,4 +118,5 @@ func TestNewPriorityQueue(t *testing.T) {
stopCh <- struct{}{}
stopCh <- struct{}{}
stopCh <- struct{}{}
+ stopCh <- struct{}{}
}