diff options
author | Valery Piashchynski <[email protected]> | 2021-09-02 20:09:01 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-02 20:09:01 +0300 |
commit | 6749db4a2d39fa70b426bcf50edf66a176c07f57 (patch) | |
tree | f6f92c9f0016f6bcac6a9aa45ccc961eebf90018 /plugins | |
parent | 0437d1f58514f694ea86e8176e621c009cd510f9 (diff) | |
parent | 4524f8c5af045ed5048250b63b7859eaeb4f24a1 (diff) |
#778: feat(refactor): jobs code adjustingv2.4.0
#778: feat(refactor): jobs code adjusting
Diffstat (limited to 'plugins')
29 files changed, 269 insertions, 377 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 784a102c..2ff0a40a 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -242,6 +242,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + start := time.Now() const op = errors.Op("rabbit_run") pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -287,7 +288,8 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil @@ -323,6 +325,7 @@ func (c *consumer) State(ctx context.Context) (*jobState.State, error) { } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -356,11 +359,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -415,22 +420,25 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Stop(context.Context) error { - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} - } + start := time.Now() + c.stopCh <- struct{}{} pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) + return nil } diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go index 04385afe..b837ff86 100644 --- a/plugins/amqp/amqpjobs/item.go +++ b/plugins/amqp/amqpjobs/item.go @@ -43,17 +43,18 @@ type Options struct { Delay int64 `json:"delay,omitempty"` // private - // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + // ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery ack func(multiply bool) error - // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time nack func(multiply bool, requeue bool) error // requeueFn used as a pointer to the push function - requeueFn func(context.Context, *Item) error + requeueFn func(context.Context, *Item) error + // delayed jobs TODO(rustatian): figure out how to get stats from the DLX delayed *int64 multipleAsk bool requeue bool diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 8d21784f..698a34a6 100644 --- a/plugins/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go @@ -113,25 +113,22 @@ func (c *consumer) redialer() { //nolint:gocognit c.Unlock() case <-c.stopCh: - if c.publishChan != nil { - pch := <-c.publishChan - err := pch.Close() - if err != nil { - c.log.Error("publish channel close", "error", err) - } + pch := <-c.publishChan + err := pch.Close() + if err != nil { + c.log.Error("publish channel close", "error", err) } if c.consumeChan != nil { - err := c.consumeChan.Close() + err = c.consumeChan.Close() if err != nil { c.log.Error("consume channel close", "error", err) } } - if c.conn != nil { - err := c.conn.Close() - if err != nil { - c.log.Error("amqp connection close", "error", err) - } + + err = c.conn.Close() + if err != nil { + c.log.Error("amqp connection close", "error", err) } return diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go index 5ef89983..30807f03 100644 --- a/plugins/beanstalk/consumer.go +++ b/plugins/beanstalk/consumer.go @@ -3,6 +3,7 @@ package beanstalk import ( "bytes" "context" + "encoding/gob" "strconv" "strings" "sync/atomic" @@ -183,11 +184,16 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error { bb := new(bytes.Buffer) bb.Grow(64) - err := item.pack(bb) + err := gob.NewEncoder(bb).Encode(item) if err != nil { return errors.E(op, err) } + body := make([]byte, bb.Len()) + copy(body, bb.Bytes()) + bb.Reset() + bb = nil + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458 // <pri> is an integer < 2**32. Jobs with smaller priority values will be // scheduled before jobs with larger priorities. The most urgent priority is 0; @@ -203,7 +209,7 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error { // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout) + id, err := j.pool.Put(ctx, body, *j.tubePriority, item.Options.DelayDuration(), j.tout) if err != nil { errD := j.pool.Delete(ctx, id) if errD != nil { @@ -260,9 +266,10 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") - // check if the pipeline registered + start := time.Now() // load atomic value + // check if the pipeline registered pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name())) @@ -276,13 +283,15 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (j *consumer) Stop(context.Context) error { + start := time.Now() pipe := j.pipeline.Load().(*pipeline.Pipeline) if atomic.LoadUint32(&j.listeners) == 1 { @@ -293,13 +302,15 @@ func (j *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (j *consumer) Pause(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -322,11 +333,13 @@ func (j *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (j *consumer) Resume(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -351,7 +364,8 @@ func (j *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go index 0a6cd560..03060994 100644 --- a/plugins/beanstalk/item.go +++ b/plugins/beanstalk/item.go @@ -125,15 +125,6 @@ func fromJob(job *job.Job) *Item { } } -func (i *Item) pack(b *bytes.Buffer) error { - err := gob.NewEncoder(b).Encode(i) - if err != nil { - return err - } - - return nil -} - func (j *consumer) unpack(id uint64, data []byte, out *Item) error { err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) if err != nil { diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index ed0eda61..62045d3b 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -222,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return &consumer{ file: pipeline.String(file, rrDB), priority: pipeline.Int(priority, 10), - prefetch: pipeline.Int(prefetch, 100), + prefetch: pipeline.Int(prefetch, 1000), permissions: conf.Permissions, bPool: sync.Pool{New: func() interface{} { @@ -300,6 +300,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("boltdb_run") + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { @@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Stop(_ context.Context) error { + start := time.Now() if atomic.LoadUint32(&c.listeners) > 0 { c.stopCh <- struct{}{} c.stopCh <- struct{}{} @@ -334,12 +337,14 @@ func (c *consumer) Stop(_ context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -361,11 +366,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -389,7 +396,8 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index 7c161555..081d3f57 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -3,6 +3,7 @@ package boltjobs import ( "bytes" "encoding/gob" + "sync/atomic" "time" "github.com/spiral/roadrunner/v2/utils" @@ -18,6 +19,10 @@ func (c *consumer) listener() { c.log.Info("boltdb listener stopped") return case <-tt.C: + if atomic.LoadUint64(c.active) > uint64(c.prefetch) { + time.Sleep(time.Second) + continue + } tx, err := c.db.Begin(true) if err != nil { c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go index ba1450cd..656d572e 100644 --- a/plugins/boltdb/boltkv/driver.go +++ b/plugins/boltdb/boltkv/driver.go @@ -38,7 +38,7 @@ type Driver struct { stop chan struct{} } -func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { +func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { const op = errors.Op("new_boltdb_driver") if !cfgPlugin.Has(RootPluginName) { @@ -47,7 +47,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, d := &Driver{ log: log, - stop: stop, + stop: make(chan struct{}), } err := cfgPlugin.UnmarshalKey(key, &d.cfg) @@ -411,6 +411,10 @@ func (d *Driver) Clear() error { return nil } +func (d *Driver) Stop() { + d.stop <- struct{}{} +} + // ========================= PRIVATE ================================= func (d *Driver) startGCLoop() { //nolint:gocognit diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md index 317aec90..1424e586 100644 --- a/plugins/boltdb/doc/job_lifecycle.md +++ b/plugins/boltdb/doc/job_lifecycle.md @@ -7,4 +7,3 @@ There are several boltdb buckets: get into the `InQueueBucket` waiting to acknowledgement. 3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration. -`` diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go index 683b26f1..ad98cf3c 100644 --- a/plugins/boltdb/plugin.go +++ b/plugins/boltdb/plugin.go @@ -19,19 +19,14 @@ const ( // Plugin BoltDB K/V storage. type Plugin struct { - cfgPlugin config.Configurer + cfg config.Configurer // logger log logger.Logger - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - - drivers uint } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.stop = make(chan struct{}) p.log = log - p.cfgPlugin = cfg + p.cfg = cfg return nil } @@ -41,12 +36,6 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { - if p.drivers > 0 { - for i := uint(0); i < p.drivers; i++ { - // send close signal to every driver - p.stop <- struct{}{} - } - } return nil } @@ -60,23 +49,20 @@ func (p *Plugin) Available() {} func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") - st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop) + st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfg) if err != nil { return nil, errors.E(op, err) } - // save driver number to release resources after Stop - p.drivers++ - return st, nil } // JOBS bbolt implementation func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue) + return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfg, e, queue) } func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue) + return boltjobs.FromPipeline(pipe, p.log, p.cfg, e, queue) } diff --git a/plugins/ephemeral/plugin.go b/plugins/ephemeral/plugin.go deleted file mode 100644 index 28495abb..00000000 --- a/plugins/ephemeral/plugin.go +++ /dev/null @@ -1,41 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "ephemeral" -) - -type Plugin struct { - log logger.Logger - cfg config.Configurer -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Available() {} - -// JobsConstruct creates new ephemeral consumer from the configuration -func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewJobBroker(configKey, p.log, p.cfg, e, pq) -} - -// FromPipeline creates new ephemeral consumer from the provided pipeline -func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipeline, p.log, e, pq) -} diff --git a/plugins/jobs/job/job.go b/plugins/jobs/job/job.go index 06c3254e..adab2a0a 100644 --- a/plugins/jobs/job/job.go +++ b/plugins/jobs/job/job.go @@ -45,17 +45,6 @@ type Options struct { Delay int64 `json:"delay,omitempty"` } -// Merge merges job options. -func (o *Options) Merge(from *Options) { - if o.Pipeline == "" { - o.Pipeline = from.Pipeline - } - - if o.Delay == 0 { - o.Delay = from.Delay - } -} - // DelayDuration returns delay duration in a form of time.Duration. func (o *Options) DelayDuration() time.Duration { return time.Second * time.Duration(o.Delay) diff --git a/plugins/jobs/job/job_test.go b/plugins/jobs/job/job_test.go index a47151a3..4a95e27d 100644 --- a/plugins/jobs/job/job_test.go +++ b/plugins/jobs/job/job_test.go @@ -16,30 +16,3 @@ func TestOptions_DelayDuration2(t *testing.T) { opts := &Options{Delay: 1} assert.Equal(t, time.Second, opts.DelayDuration()) } - -func TestOptions_Merge(t *testing.T) { - opts := &Options{} - - opts.Merge(&Options{ - Pipeline: "pipeline", - Delay: 2, - }) - - assert.Equal(t, "pipeline", opts.Pipeline) - assert.Equal(t, int64(2), opts.Delay) -} - -func TestOptions_MergeKeepOriginal(t *testing.T) { - opts := &Options{ - Pipeline: "default", - Delay: 10, - } - - opts.Merge(&Options{ - Pipeline: "pipeline", - Delay: 2, - }) - - assert.Equal(t, "default", opts.Pipeline) - assert.Equal(t, int64(10), opts.Delay) -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 3f3fa196..3aec6acc 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -41,7 +41,7 @@ type Plugin struct { server server.Server jobConstructors map[string]jobs.Constructor - consumers map[string]jobs.Consumer + consumers sync.Map // map[string]jobs.Consumer // events handler events events.Handler @@ -82,7 +82,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.events.AddListener(p.collectJobsEvents) p.jobConstructors = make(map[string]jobs.Constructor) - p.consumers = make(map[string]jobs.Consumer) p.consume = make(map[string]struct{}) p.stopCh = make(chan struct{}, 1) @@ -130,19 +129,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // jobConstructors contains constructors for the drivers // we need here to initialize these drivers for the pipelines - if c, ok := p.jobConstructors[dr]; ok { + if _, ok := p.jobConstructors[dr]; ok { // config key for the particular sub-driver jobs.pipelines.test-local configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name) // init the driver - initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue) + initializedDriver, err := p.jobConstructors[dr].JobsConstruct(configKey, p.events, p.queue) if err != nil { errCh <- errors.E(op, err) return false } // add driver to the set of the consumers (name - pipeline name, value - associated driver) - p.consumers[name] = initializedDriver + p.consumers.Store(name, initializedDriver) // register pipeline for the initialized driver err = initializedDriver.Register(context.Background(), pipe) @@ -331,16 +330,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit } func (p *Plugin) Stop() error { - for k, v := range p.consumers { + // range over all consumers and call stop + p.consumers.Range(func(key, value interface{}) bool { + consumer := value.(jobs.Consumer) ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - err := v.Stop(ctx) + err := consumer.Stop(ctx) if err != nil { cancel() - p.log.Error("stop job driver", "driver", k) - continue + p.log.Error("stop job driver", "driver", key) + return true } cancel() - } + return true + }) // this function can block forever, but we don't care, because we might have a chance to exit from the pollers, // but if not, this is not a problem at all. @@ -394,18 +396,26 @@ func (p *Plugin) Workers() []*process.State { func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) { const op = errors.Op("jobs_plugin_drivers_state") - jst := make([]*jobState.State, 0, len(p.consumers)) - for k := range p.consumers { - d := p.consumers[k] + jst := make([]*jobState.State, 0, 2) + var err error + p.consumers.Range(func(key, value interface{}) bool { + consumer := value.(jobs.Consumer) newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout)) - state, err := d.State(newCtx) + + var state *jobState.State + state, err = consumer.State(newCtx) if err != nil { cancel() - return nil, errors.E(op, err) + return false } jst = append(jst, state) cancel() + return true + }) + + if err != nil { + return nil, errors.E(op, err) } return jst, nil } @@ -449,13 +459,12 @@ func (p *Plugin) Push(j *job.Job) error { // type conversion ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + d, ok := p.consumers.Load(ppl.Name()) if !ok { return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) } // if job has no priority, inherit it from the pipeline - // TODO(rustatian) merge all options, not only priority if j.Options.Priority == 0 { j.Options.Priority = ppl.Priority() } @@ -463,16 +472,16 @@ func (p *Plugin) Push(j *job.Job) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) defer cancel() - err := d.Push(ctx, j) + err := d.(jobs.Consumer).Push(ctx, j) if err != nil { p.events.Push(events.JobEvent{ Event: events.EventPushError, ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return errors.E(op, err) } @@ -482,9 +491,9 @@ func (p *Plugin) Push(j *job.Job) error { ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return nil @@ -492,9 +501,9 @@ func (p *Plugin) Push(j *job.Job) error { func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") + start := time.Now() for i := 0; i < len(j); i++ { - start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) if !ok { @@ -503,7 +512,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error { ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + d, ok := p.consumers.Load(ppl.Name()) if !ok { return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) } @@ -514,7 +523,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error { } ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - err := d.Push(ctx, j[i]) + err := d.(jobs.Consumer).Push(ctx, j[i]) if err != nil { cancel() p.events.Push(events.JobEvent{ @@ -544,7 +553,7 @@ func (p *Plugin) Pause(pp string) { ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + d, ok := p.consumers.Load(ppl.Name()) if !ok { p.log.Warn("driver for the pipeline not found", "pipeline", pp) return @@ -552,7 +561,7 @@ func (p *Plugin) Pause(pp string) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) defer cancel() // redirect call to the underlying driver - d.Pause(ctx, ppl.Name()) + d.(jobs.Consumer).Pause(ctx, ppl.Name()) } func (p *Plugin) Resume(pp string) { @@ -563,7 +572,7 @@ func (p *Plugin) Resume(pp string) { ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + d, ok := p.consumers.Load(ppl.Name()) if !ok { p.log.Warn("driver for the pipeline not found", "pipeline", pp) return @@ -572,7 +581,7 @@ func (p *Plugin) Resume(pp string) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) defer cancel() // redirect call to the underlying driver - d.Resume(ctx, ppl.Name()) + d.(jobs.Consumer).Resume(ctx, ppl.Name()) } // Declare a pipeline. @@ -586,16 +595,13 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // jobConstructors contains constructors for the drivers // we need here to initialize these drivers for the pipelines - if c, ok := p.jobConstructors[dr]; ok { + if _, ok := p.jobConstructors[dr]; ok { // init the driver from pipeline - initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue) + initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue) if err != nil { return errors.E(op, err) } - // add driver to the set of the consumers (name - pipeline name, value - associated driver) - p.consumers[pipeline.Name()] = initializedDriver - // register pipeline for the initialized driver err = initializedDriver.Register(context.Background(), pipeline) if err != nil { @@ -612,10 +618,12 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { return errors.E(op, err) } } - } - // save the pipeline - p.pipelines.Store(pipeline.Name(), pipeline) + // add driver to the set of the consumers (name - pipeline name, value - associated driver) + p.consumers.Store(pipeline.Name(), initializedDriver) + // save the pipeline + p.pipelines.Store(pipeline.Name(), pipeline) + } return nil } @@ -631,18 +639,24 @@ func (p *Plugin) Destroy(pp string) error { // type conversion ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + // delete consumer + d, ok := p.consumers.LoadAndDelete(ppl.Name()) if !ok { return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) } - // delete consumer - delete(p.consumers, ppl.Name()) - p.pipelines.Delete(pp) + // delete old pipeline + p.pipelines.LoadAndDelete(pp) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() + err := d.(jobs.Consumer).Stop(ctx) + if err != nil { + cancel() + return errors.E(op, err) + } - return d.Stop(ctx) + cancel() + return nil } func (p *Plugin) List() []string { diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 94f903d5..d7b93bd1 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -25,7 +25,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { return errors.E(op, errors.Str("empty ID field not allowed")) } - err := r.p.Push(r.from(j.GetJob())) + err := r.p.Push(from(j.GetJob())) if err != nil { return errors.E(op, err) } @@ -43,7 +43,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err for i := 0; i < l; i++ { // convert transport entity into domain // how we can do this quickly - batch[i] = r.from(j.GetJobs()[i]) + batch[i] = from(j.GetJobs()[i]) } err := r.p.PushBatch(batch) @@ -137,8 +137,8 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { } // from converts from transport entity to domain -func (r *rpc) from(j *jobsv1beta.Job) *job.Job { - headers := map[string][]string{} +func from(j *jobsv1beta.Job) *job.Job { + headers := make(map[string][]string, len(j.GetHeaders())) for k, v := range j.GetHeaders() { headers[k] = v.GetValue() diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index c6ca96c3..a1144b85 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -104,6 +104,10 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { + // stop all attached storages + for k := range p.storages { + p.storages[k].Stop() + } return nil } diff --git a/plugins/memcached/config.go b/plugins/memcached/memcachedkv/config.go index 6d413790..569e2573 100644 --- a/plugins/memcached/config.go +++ b/plugins/memcached/memcachedkv/config.go @@ -1,4 +1,4 @@ -package memcached +package memcachedkv type Config struct { // Addr is url for memcached, 11211 port is used by default diff --git a/plugins/memcached/driver.go b/plugins/memcached/memcachedkv/driver.go index e24747fe..6d5e1802 100644 --- a/plugins/memcached/driver.go +++ b/plugins/memcached/memcachedkv/driver.go @@ -1,4 +1,4 @@ -package memcached +package memcachedkv import ( "strings" @@ -246,3 +246,5 @@ func (d *Driver) Clear() error { return nil } + +func (d *Driver) Stop() {} diff --git a/plugins/memcached/plugin.go b/plugins/memcached/plugin.go index 59a2b7cb..47bca0e2 100644 --- a/plugins/memcached/plugin.go +++ b/plugins/memcached/plugin.go @@ -5,6 +5,7 @@ import ( "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memcached/memcachedkv" ) const ( @@ -39,7 +40,7 @@ func (s *Plugin) Available() {} func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") - st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) + st, err := memcachedkv.NewMemcachedDriver(s.log, key, s.cfgPlugin) if err != nil { return nil, errors.E(op, err) } diff --git a/plugins/ephemeral/consumer.go b/plugins/memory/memoryjobs/consumer.go index 8870bb0f..fbdedefe 100644 --- a/plugins/ephemeral/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -1,4 +1,4 @@ -package ephemeral +package memoryjobs import ( "context" @@ -53,7 +53,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh goroutines: 0, active: utils.Int64(0), delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -72,20 +72,16 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), - } - - // initialize a local queue - jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) - - return jb, nil + return &consumer{ + log: log, + pq: pq, + eh: eh, + localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)), + goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), + stopCh: make(chan struct{}), + }, nil } func (c *consumer) Push(ctx context.Context, jb *job.Job) error { @@ -123,6 +119,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -144,12 +141,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -169,8 +167,9 @@ func (c *consumer) Resume(_ context.Context, p string) { c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Driver: pipe.Driver(), + Start: start, + Elapsed: time.Since(start), }) } @@ -186,17 +185,28 @@ func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { } func (c *consumer) Stop(_ context.Context) error { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} + select { + case c.stopCh <- struct{}{}: + default: + break + } + + for i := 0; i < len(c.localPrefetch); i++ { + // drain all jobs from the channel + <-c.localPrefetch } + c.localPrefetch = nil + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Driver: pipe.Driver(), + Start: start, + Elapsed: time.Since(start), }) return nil @@ -219,10 +229,12 @@ func (c *consumer) handleItem(ctx context.Context, msg *Item) error { time.Sleep(jj.Options.DelayDuration()) - // send the item after timeout expired - c.localPrefetch <- jj - - atomic.AddUint64(&c.goroutines, ^uint64(0)) + select { + case c.localPrefetch <- jj: + atomic.AddUint64(&c.goroutines, ^uint64(0)) + default: + c.log.Warn("can't push job", "error", "local queue closed or full") + } }(msg) return nil @@ -247,7 +259,7 @@ func (c *consumer) consume() { select { case item, ok := <-c.localPrefetch: if !ok { - c.log.Warn("ephemeral local prefetch queue was closed") + c.log.Warn("ephemeral local prefetch queue closed") return } diff --git a/plugins/ephemeral/item.go b/plugins/memory/memoryjobs/item.go index 3298424d..f4d62ada 100644 --- a/plugins/ephemeral/item.go +++ b/plugins/memory/memoryjobs/item.go @@ -1,4 +1,4 @@ -package ephemeral +package memoryjobs import ( "context" @@ -124,6 +124,7 @@ func fromJob(job *job.Job) *Item { Job: job.Job, Ident: job.Ident, Payload: job.Payload, + Headers: job.Headers, Options: &Options{ Priority: job.Options.Priority, Pipeline: job.Options.Pipeline, diff --git a/plugins/memory/config.go b/plugins/memory/memorykv/config.go index e51d09c5..a8a8993f 100644 --- a/plugins/memory/config.go +++ b/plugins/memory/memorykv/config.go @@ -1,4 +1,4 @@ -package memory +package memorykv // Config is default config for the in-memory driver type Config struct { diff --git a/plugins/memory/kv.go b/plugins/memory/memorykv/kv.go index 68ea7266..9b3e176c 100644 --- a/plugins/memory/kv.go +++ b/plugins/memory/memorykv/kv.go @@ -1,4 +1,4 @@ -package memory +package memorykv import ( "strings" @@ -20,11 +20,11 @@ type Driver struct { cfg *Config } -func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { +func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configurer) (*Driver, error) { const op = errors.Op("new_in_memory_driver") d := &Driver{ - stop: stop, + stop: make(chan struct{}), log: log, } @@ -40,7 +40,7 @@ func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configure return d, nil } -func (s *Driver) Has(keys ...string) (map[string]bool, error) { +func (d *Driver) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("in_memory_plugin_has") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -52,7 +52,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) { return nil, errors.E(op, errors.EmptyKey) } - if _, ok := s.heap.Load(keys[i]); ok { + if _, ok := d.heap.Load(keys[i]); ok { m[keys[i]] = true } } @@ -60,7 +60,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) { return m, nil } -func (s *Driver) Get(key string) ([]byte, error) { +func (d *Driver) Get(key string) ([]byte, error) { const op = errors.Op("in_memory_plugin_get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) @@ -68,7 +68,7 @@ func (s *Driver) Get(key string) ([]byte, error) { return nil, errors.E(op, errors.EmptyKey) } - if data, exist := s.heap.Load(key); exist { + if data, exist := d.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function return data.(*kvv1.Item).Value, nil @@ -76,7 +76,7 @@ func (s *Driver) Get(key string) ([]byte, error) { return nil, nil } -func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { const op = errors.Op("in_memory_plugin_mget") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -93,7 +93,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { m := make(map[string][]byte, len(keys)) for i := range keys { - if value, ok := s.heap.Load(keys[i]); ok { + if value, ok := d.heap.Load(keys[i]); ok { m[keys[i]] = value.(*kvv1.Item).Value } } @@ -101,7 +101,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { return m, nil } -func (s *Driver) Set(items ...*kvv1.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -120,14 +120,14 @@ func (s *Driver) Set(items ...*kvv1.Item) error { } } - s.heap.Store(items[i].Key, items[i]) + d.heap.Store(items[i].Key, items[i]) } return nil } // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s *Driver) MExpire(items ...*kvv1.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_mexpire") for i := range items { if items[i] == nil { @@ -138,7 +138,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error { } // if key exist, overwrite it value - if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok { + if pItem, ok := d.heap.LoadAndDelete(items[i].Key); ok { // check that time is correct _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { @@ -148,7 +148,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error { // guess that t is in the future // in memory is just FOR TESTING PURPOSES // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, &kvv1.Item{ + d.heap.Store(items[i].Key, &kvv1.Item{ Key: items[i].Key, Value: tmp.Value, Timeout: items[i].Timeout, @@ -159,7 +159,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error { return nil } -func (s *Driver) TTL(keys ...string) (map[string]string, error) { +func (d *Driver) TTL(keys ...string) (map[string]string, error) { const op = errors.Op("in_memory_plugin_ttl") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -176,14 +176,14 @@ func (s *Driver) TTL(keys ...string) (map[string]string, error) { m := make(map[string]string, len(keys)) for i := range keys { - if item, ok := s.heap.Load(keys[i]); ok { + if item, ok := d.heap.Load(keys[i]); ok { m[keys[i]] = item.(*kvv1.Item).Timeout } } return m, nil } -func (s *Driver) Delete(keys ...string) error { +func (d *Driver) Delete(keys ...string) error { const op = errors.Op("in_memory_plugin_delete") if keys == nil { return errors.E(op, errors.NoKeys) @@ -198,34 +198,38 @@ func (s *Driver) Delete(keys ...string) error { } for i := range keys { - s.heap.Delete(keys[i]) + d.heap.Delete(keys[i]) } return nil } -func (s *Driver) Clear() error { - s.clearMu.Lock() - s.heap = sync.Map{} - s.clearMu.Unlock() +func (d *Driver) Clear() error { + d.clearMu.Lock() + d.heap = sync.Map{} + d.clearMu.Unlock() return nil } +func (d *Driver) Stop() { + d.stop <- struct{}{} +} + // ================================== PRIVATE ====================================== -func (s *Driver) gc() { - ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) +func (d *Driver) gc() { + ticker := time.NewTicker(time.Duration(d.cfg.Interval) * time.Second) + defer ticker.Stop() for { select { - case <-s.stop: - ticker.Stop() + case <-d.stop: return case now := <-ticker.C: // mutes needed to clear the map - s.clearMu.RLock() + d.clearMu.RLock() // check every second - s.heap.Range(func(key, value interface{}) bool { + d.heap.Range(func(key, value interface{}) bool { v := value.(*kvv1.Item) if v.Timeout == "" { return true @@ -237,13 +241,13 @@ func (s *Driver) gc() { } if now.After(t) { - s.log.Debug("key deleted", "key", key) - s.heap.Delete(key) + d.log.Debug("key deleted", "key", key) + d.heap.Delete(key) } return true }) - s.clearMu.RUnlock() + d.clearMu.RUnlock() } } } diff --git a/plugins/memory/pubsub.go b/plugins/memory/memorypubsub/pubsub.go index fd30eb54..75122571 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/memorypubsub/pubsub.go @@ -1,4 +1,4 @@ -package memory +package memorypubsub import ( "context" diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 7d418a70..515e469a 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -2,27 +2,29 @@ package memory import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/common/pubsub" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs" + "github.com/spiral/roadrunner/v2/plugins/memory/memorykv" + "github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub" ) const PluginName string = "memory" type Plugin struct { - // heap is user map for the key-value pairs - stop chan struct{} - - log logger.Logger - cfgPlugin config.Configurer - drivers uint + log logger.Logger + cfg config.Configurer } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log - p.cfgPlugin = cfg - p.stop = make(chan struct{}, 1) + p.cfg = cfg return nil } @@ -31,32 +33,36 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { - if p.drivers > 0 { - for i := uint(0); i < p.drivers; i++ { - // send close signal to every driver - p.stop <- struct{}{} - } - } return nil } +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Available() {} + +// Drivers implementation + func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { - return NewPubSubDriver(p.log, key) + return memorypubsub.NewPubSubDriver(p.log, key) } func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("inmemory_plugin_provide") - st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop) + st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg) if err != nil { return nil, errors.E(op, err) } - - // save driver number to release resources after Stop - p.drivers++ - return st, nil } -func (p *Plugin) Name() string { - return PluginName +// JobsConstruct creates new ephemeral consumer from the configuration +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq) +} + +// FromPipeline creates new ephemeral consumer from the provided pipeline +func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return memoryjobs.FromPipeline(pipeline, p.log, e, pq) } diff --git a/plugins/redis/clients.go b/plugins/redis/clients.go deleted file mode 100644 index d0a184d2..00000000 --- a/plugins/redis/clients.go +++ /dev/null @@ -1,84 +0,0 @@ -package redis - -import ( - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" -) - -// RedisClient return a client based on the provided section key -// key sample: kv.some-section.redis -// kv.redis -// redis (root) -func (p *Plugin) RedisClient(key string) (redis.UniversalClient, error) { - const op = errors.Op("redis_get_client") - - if !p.cfgPlugin.Has(key) { - return nil, errors.E(op, errors.Errorf("no such section: %s", key)) - } - - cfg := &Config{} - - err := p.cfgPlugin.UnmarshalKey(key, cfg) - if err != nil { - return nil, errors.E(op, err) - } - - cfg.InitDefaults() - - uc := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: cfg.Addrs, - DB: cfg.DB, - Username: cfg.Username, - Password: cfg.Password, - SentinelPassword: cfg.SentinelPassword, - MaxRetries: cfg.MaxRetries, - MinRetryBackoff: cfg.MaxRetryBackoff, - MaxRetryBackoff: cfg.MaxRetryBackoff, - DialTimeout: cfg.DialTimeout, - ReadTimeout: cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, - PoolSize: cfg.PoolSize, - MinIdleConns: cfg.MinIdleConns, - MaxConnAge: cfg.MaxConnAge, - PoolTimeout: cfg.PoolTimeout, - IdleTimeout: cfg.IdleTimeout, - IdleCheckFrequency: cfg.IdleCheckFreq, - ReadOnly: cfg.ReadOnly, - RouteByLatency: cfg.RouteByLatency, - RouteRandomly: cfg.RouteRandomly, - MasterName: cfg.MasterName, - }) - - return uc, nil -} - -func (p *Plugin) DefaultClient() redis.UniversalClient { - cfg := &Config{} - cfg.InitDefaults() - - uc := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: cfg.Addrs, - DB: cfg.DB, - Username: cfg.Username, - Password: cfg.Password, - SentinelPassword: cfg.SentinelPassword, - MaxRetries: cfg.MaxRetries, - MinRetryBackoff: cfg.MaxRetryBackoff, - MaxRetryBackoff: cfg.MaxRetryBackoff, - DialTimeout: cfg.DialTimeout, - ReadTimeout: cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, - PoolSize: cfg.PoolSize, - MinIdleConns: cfg.MinIdleConns, - MaxConnAge: cfg.MaxConnAge, - PoolTimeout: cfg.PoolTimeout, - IdleTimeout: cfg.IdleTimeout, - IdleCheckFrequency: cfg.IdleCheckFreq, - ReadOnly: cfg.ReadOnly, - RouteByLatency: cfg.RouteByLatency, - RouteRandomly: cfg.RouteRandomly, - MasterName: cfg.MasterName, - }) - - return uc -} diff --git a/plugins/redis/interface.go b/plugins/redis/interface.go deleted file mode 100644 index 189b0002..00000000 --- a/plugins/redis/interface.go +++ /dev/null @@ -1,12 +0,0 @@ -package redis - -import "github.com/go-redis/redis/v8" - -// Redis in the redis KV plugin interface -type Redis interface { - // RedisClient provides universal redis client - RedisClient(key string) (redis.UniversalClient, error) - - // DefaultClient provide default redis client based on redis defaults - DefaultClient() redis.UniversalClient -} diff --git a/plugins/redis/kv/kv.go b/plugins/redis/kv/kv.go index b41cb86c..3d062fbb 100644 --- a/plugins/redis/kv/kv.go +++ b/plugins/redis/kv/kv.go @@ -248,3 +248,5 @@ func (d *Driver) Clear() error { return nil } + +func (d *Driver) Stop() {} diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go index dfbda154..92dbd6a8 100644 --- a/plugins/sqs/consumer.go +++ b/plugins/sqs/consumer.go @@ -298,6 +298,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + start := time.Now() const op = errors.Op("sqs_run") c.Lock() @@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Stop(context.Context) error { + start := time.Now() if atomic.LoadUint32(&c.listeners) > 0 { c.pauseCh <- struct{}{} } @@ -333,12 +336,14 @@ func (c *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -362,11 +367,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -391,7 +398,8 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } |