diff options
Diffstat (limited to 'plugins/boltdb')
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 18 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/listener.go | 5 | ||||
-rw-r--r-- | plugins/boltdb/boltkv/driver.go | 8 | ||||
-rw-r--r-- | plugins/boltdb/doc/job_lifecycle.md | 1 | ||||
-rw-r--r-- | plugins/boltdb/plugin.go | 24 |
5 files changed, 29 insertions, 27 deletions
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index ed0eda61..62045d3b 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -222,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return &consumer{ file: pipeline.String(file, rrDB), priority: pipeline.Int(priority, 10), - prefetch: pipeline.Int(prefetch, 100), + prefetch: pipeline.Int(prefetch, 1000), permissions: conf.Permissions, bPool: sync.Pool{New: func() interface{} { @@ -300,6 +300,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("boltdb_run") + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { @@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Stop(_ context.Context) error { + start := time.Now() if atomic.LoadUint32(&c.listeners) > 0 { c.stopCh <- struct{}{} c.stopCh <- struct{}{} @@ -334,12 +337,14 @@ func (c *consumer) Stop(_ context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -361,11 +366,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -389,7 +396,8 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index 7c161555..081d3f57 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -3,6 +3,7 @@ package boltjobs import ( "bytes" "encoding/gob" + "sync/atomic" "time" "github.com/spiral/roadrunner/v2/utils" @@ -18,6 +19,10 @@ func (c *consumer) listener() { c.log.Info("boltdb listener stopped") return case <-tt.C: + if atomic.LoadUint64(c.active) > uint64(c.prefetch) { + time.Sleep(time.Second) + continue + } tx, err := c.db.Begin(true) if err != nil { c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go index ba1450cd..656d572e 100644 --- a/plugins/boltdb/boltkv/driver.go +++ b/plugins/boltdb/boltkv/driver.go @@ -38,7 +38,7 @@ type Driver struct { stop chan struct{} } -func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { +func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { const op = errors.Op("new_boltdb_driver") if !cfgPlugin.Has(RootPluginName) { @@ -47,7 +47,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, d := &Driver{ log: log, - stop: stop, + stop: make(chan struct{}), } err := cfgPlugin.UnmarshalKey(key, &d.cfg) @@ -411,6 +411,10 @@ func (d *Driver) Clear() error { return nil } +func (d *Driver) Stop() { + d.stop <- struct{}{} +} + // ========================= PRIVATE ================================= func (d *Driver) startGCLoop() { //nolint:gocognit diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md index 317aec90..1424e586 100644 --- a/plugins/boltdb/doc/job_lifecycle.md +++ b/plugins/boltdb/doc/job_lifecycle.md @@ -7,4 +7,3 @@ There are several boltdb buckets: get into the `InQueueBucket` waiting to acknowledgement. 3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration. -`` diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go index 683b26f1..ad98cf3c 100644 --- a/plugins/boltdb/plugin.go +++ b/plugins/boltdb/plugin.go @@ -19,19 +19,14 @@ const ( // Plugin BoltDB K/V storage. type Plugin struct { - cfgPlugin config.Configurer + cfg config.Configurer // logger log logger.Logger - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - - drivers uint } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.stop = make(chan struct{}) p.log = log - p.cfgPlugin = cfg + p.cfg = cfg return nil } @@ -41,12 +36,6 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { - if p.drivers > 0 { - for i := uint(0); i < p.drivers; i++ { - // send close signal to every driver - p.stop <- struct{}{} - } - } return nil } @@ -60,23 +49,20 @@ func (p *Plugin) Available() {} func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") - st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop) + st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfg) if err != nil { return nil, errors.E(op, err) } - // save driver number to release resources after Stop - p.drivers++ - return st, nil } // JOBS bbolt implementation func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue) + return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfg, e, queue) } func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue) + return boltjobs.FromPipeline(pipe, p.log, p.cfg, e, queue) } |