summaryrefslogtreecommitdiff
path: root/plugins/boltdb/boltjobs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/boltdb/boltjobs')
-rw-r--r--plugins/boltdb/boltjobs/consumer.go2
-rw-r--r--plugins/boltdb/boltjobs/listener.go5
2 files changed, 6 insertions, 1 deletions
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index ed0eda61..46d596fa 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -222,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return &consumer{
file: pipeline.String(file, rrDB),
priority: pipeline.Int(priority, 10),
- prefetch: pipeline.Int(prefetch, 100),
+ prefetch: pipeline.Int(prefetch, 1000),
permissions: conf.Permissions,
bPool: sync.Pool{New: func() interface{} {
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 7c161555..081d3f57 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -3,6 +3,7 @@ package boltjobs
import (
"bytes"
"encoding/gob"
+ "sync/atomic"
"time"
"github.com/spiral/roadrunner/v2/utils"
@@ -18,6 +19,10 @@ func (c *consumer) listener() {
c.log.Info("boltdb listener stopped")
return
case <-tt.C:
+ if atomic.LoadUint64(c.active) > uint64(c.prefetch) {
+ time.Sleep(time.Second)
+ continue
+ }
tx, err := c.db.Begin(true)
if err != nil {
c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)