summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-31 13:05:15 +0300
committerValery Piashchynski <[email protected]>2021-08-31 13:05:15 +0300
commit2f44878a7eac71d7b81e66246b46c615a95892d7 (patch)
treedca0f488614dc7e366aeec6face7296d9c52d2bf /plugins
parent3a187237282444f70b1eae8881f08cb6f0e068fc (diff)
Tune listener timers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/boltdb/boltjobs/listener.go10
-rw-r--r--plugins/jobs/plugin.go1
2 files changed, 3 insertions, 8 deletions
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 39de34ab..7c161555 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -3,7 +3,6 @@ package boltjobs
import (
"bytes"
"encoding/gob"
- "sync/atomic"
"time"
"github.com/spiral/roadrunner/v2/utils"
@@ -11,7 +10,7 @@ import (
)
func (c *consumer) listener() {
- tt := time.NewTicker(time.Millisecond * 10)
+ tt := time.NewTicker(time.Millisecond)
defer tt.Stop()
for {
select {
@@ -19,11 +18,6 @@ func (c *consumer) listener() {
c.log.Info("boltdb listener stopped")
return
case <-tt.C:
- if atomic.LoadUint64(c.active) >= uint64(c.prefetch) {
- time.Sleep(time.Second)
- continue
- }
-
tx, err := c.db.Begin(true)
if err != nil {
c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
@@ -78,7 +72,7 @@ func (c *consumer) listener() {
}
func (c *consumer) delayedJobsListener() {
- tt := time.NewTicker(time.Millisecond * 10)
+ tt := time.NewTicker(time.Second)
defer tt.Stop()
// just some 90's
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 91a77446..83b302ee 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -318,6 +318,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Start: start,
Elapsed: time.Since(start),
})
+
// return payload
p.putPayload(exec)
}