summaryrefslogtreecommitdiff
path: root/plugins/boltdb
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/boltdb')
-rw-r--r--plugins/boltdb/boltjobs/consumer.go18
-rw-r--r--plugins/boltdb/boltjobs/listener.go5
-rw-r--r--plugins/boltdb/boltkv/driver.go8
-rw-r--r--plugins/boltdb/doc/job_lifecycle.md1
-rw-r--r--plugins/boltdb/plugin.go24
5 files changed, 29 insertions, 27 deletions
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index ed0eda61..62045d3b 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -222,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return &consumer{
file: pipeline.String(file, rrDB),
priority: pipeline.Int(priority, 10),
- prefetch: pipeline.Int(prefetch, 100),
+ prefetch: pipeline.Int(prefetch, 1000),
permissions: conf.Permissions,
bPool: sync.Pool{New: func() interface{} {
@@ -300,6 +300,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro
func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("boltdb_run")
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
@@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Stop(_ context.Context) error {
+ start := time.Now()
if atomic.LoadUint32(&c.listeners) > 0 {
c.stopCh <- struct{}{}
c.stopCh <- struct{}{}
@@ -334,12 +337,14 @@ func (c *consumer) Stop(_ context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -361,11 +366,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -389,7 +396,8 @@ func (c *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 7c161555..081d3f57 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -3,6 +3,7 @@ package boltjobs
import (
"bytes"
"encoding/gob"
+ "sync/atomic"
"time"
"github.com/spiral/roadrunner/v2/utils"
@@ -18,6 +19,10 @@ func (c *consumer) listener() {
c.log.Info("boltdb listener stopped")
return
case <-tt.C:
+ if atomic.LoadUint64(c.active) > uint64(c.prefetch) {
+ time.Sleep(time.Second)
+ continue
+ }
tx, err := c.db.Begin(true)
if err != nil {
c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go
index ba1450cd..656d572e 100644
--- a/plugins/boltdb/boltkv/driver.go
+++ b/plugins/boltdb/boltkv/driver.go
@@ -38,7 +38,7 @@ type Driver struct {
stop chan struct{}
}
-func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
+func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) {
const op = errors.Op("new_boltdb_driver")
if !cfgPlugin.Has(RootPluginName) {
@@ -47,7 +47,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
d := &Driver{
log: log,
- stop: stop,
+ stop: make(chan struct{}),
}
err := cfgPlugin.UnmarshalKey(key, &d.cfg)
@@ -411,6 +411,10 @@ func (d *Driver) Clear() error {
return nil
}
+func (d *Driver) Stop() {
+ d.stop <- struct{}{}
+}
+
// ========================= PRIVATE =================================
func (d *Driver) startGCLoop() { //nolint:gocognit
diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md
index 317aec90..1424e586 100644
--- a/plugins/boltdb/doc/job_lifecycle.md
+++ b/plugins/boltdb/doc/job_lifecycle.md
@@ -7,4 +7,3 @@ There are several boltdb buckets:
get into the `InQueueBucket` waiting to acknowledgement.
3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration.
-``
diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go
index 683b26f1..ad98cf3c 100644
--- a/plugins/boltdb/plugin.go
+++ b/plugins/boltdb/plugin.go
@@ -19,19 +19,14 @@ const (
// Plugin BoltDB K/V storage.
type Plugin struct {
- cfgPlugin config.Configurer
+ cfg config.Configurer
// logger
log logger.Logger
- // stop is used to stop keys GC and close boltdb connection
- stop chan struct{}
-
- drivers uint
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.stop = make(chan struct{})
p.log = log
- p.cfgPlugin = cfg
+ p.cfg = cfg
return nil
}
@@ -41,12 +36,6 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
- if p.drivers > 0 {
- for i := uint(0); i < p.drivers; i++ {
- // send close signal to every driver
- p.stop <- struct{}{}
- }
- }
return nil
}
@@ -60,23 +49,20 @@ func (p *Plugin) Available() {}
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
- st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop)
+ st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfg)
if err != nil {
return nil, errors.E(op, err)
}
- // save driver number to release resources after Stop
- p.drivers++
-
return st, nil
}
// JOBS bbolt implementation
func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
- return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue)
+ return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfg, e, queue)
}
func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
- return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue)
+ return boltjobs.FromPipeline(pipe, p.log, p.cfg, e, queue)
}