From 9c8da162b3347b632f33f85d56e8c1ff7014631a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 1 Sep 2021 13:19:09 +0300 Subject: Code polishing before release Signed-off-by: Valery Piashchynski --- Makefile | 14 +- common/kv/interface.go | 3 + plugins/boltdb/boltjobs/consumer.go | 2 +- plugins/boltdb/boltjobs/listener.go | 5 + plugins/boltdb/boltkv/driver.go | 8 +- plugins/boltdb/doc/job_lifecycle.md | 1 - plugins/boltdb/plugin.go | 24 +- plugins/ephemeral/consumer.go | 269 ---------- plugins/ephemeral/item.go | 133 ----- plugins/ephemeral/plugin.go | 41 -- plugins/kv/plugin.go | 4 + plugins/memcached/config.go | 12 - plugins/memcached/driver.go | 248 --------- plugins/memcached/memcachedkv/config.go | 12 + plugins/memcached/memcachedkv/driver.go | 250 +++++++++ plugins/memcached/plugin.go | 3 +- plugins/memory/config.go | 14 - plugins/memory/kv.go | 249 --------- plugins/memory/memoryjobs/consumer.go | 269 ++++++++++ plugins/memory/memoryjobs/item.go | 133 +++++ plugins/memory/memorykv/config.go | 14 + plugins/memory/memorykv/kv.go | 253 +++++++++ plugins/memory/memorypubsub/pubsub.go | 92 ++++ plugins/memory/plugin.go | 50 +- plugins/memory/pubsub.go | 92 ---- plugins/redis/clients.go | 84 --- plugins/redis/interface.go | 12 - plugins/redis/kv/kv.go | 2 + tests/plugins/jobs/configs/.rr-jobs-init.yaml | 6 +- .../jobs/ephemeral/.rr-ephemeral-declare.yaml | 21 - .../plugins/jobs/ephemeral/.rr-ephemeral-init.yaml | 37 -- .../jobs/ephemeral/.rr-ephemeral-jobs-err.yaml | 21 - .../jobs/ephemeral/.rr-ephemeral-pause-resume.yaml | 44 -- tests/plugins/jobs/jobs_ephemeral_test.go | 571 --------------------- tests/plugins/jobs/jobs_general_test.go | 10 +- tests/plugins/jobs/jobs_memory_test.go | 571 +++++++++++++++++++++ tests/plugins/jobs/memory/.rr-memory-declare.yaml | 21 + tests/plugins/jobs/memory/.rr-memory-init.yaml | 37 ++ tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml | 21 + .../jobs/memory/.rr-memory-pause-resume.yaml | 44 ++ .../kv/configs/.rr-kv-bolt-no-interval.yaml | 3 + tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml | 3 + tests/plugins/kv/configs/.rr-kv-init.yaml | 6 +- tests/plugins/redis/plugin1.go | 45 -- tests/plugins/redis/redis_plugin_test.go | 120 ----- tests/plugins/rpc/configs/.rr-rpc-disabled.yaml | 7 +- tests/plugins/rpc/configs/.rr.yaml | 8 +- 47 files changed, 1809 insertions(+), 2080 deletions(-) delete mode 100644 plugins/ephemeral/consumer.go delete mode 100644 plugins/ephemeral/item.go delete mode 100644 plugins/ephemeral/plugin.go delete mode 100644 plugins/memcached/config.go delete mode 100644 plugins/memcached/driver.go create mode 100644 plugins/memcached/memcachedkv/config.go create mode 100644 plugins/memcached/memcachedkv/driver.go delete mode 100644 plugins/memory/config.go delete mode 100644 plugins/memory/kv.go create mode 100644 plugins/memory/memoryjobs/consumer.go create mode 100644 plugins/memory/memoryjobs/item.go create mode 100644 plugins/memory/memorykv/config.go create mode 100644 plugins/memory/memorykv/kv.go create mode 100644 plugins/memory/memorypubsub/pubsub.go delete mode 100644 plugins/memory/pubsub.go delete mode 100644 plugins/redis/clients.go delete mode 100644 plugins/redis/interface.go delete mode 100644 tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml delete mode 100644 tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml delete mode 100644 tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml delete mode 100644 tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml delete mode 100644 
tests/plugins/jobs/jobs_ephemeral_test.go create mode 100644 tests/plugins/jobs/jobs_memory_test.go create mode 100644 tests/plugins/jobs/memory/.rr-memory-declare.yaml create mode 100644 tests/plugins/jobs/memory/.rr-memory-init.yaml create mode 100644 tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml create mode 100644 tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml delete mode 100644 tests/plugins/redis/plugin1.go delete mode 100644 tests/plugins/redis/redis_plugin_test.go diff --git a/Makefile b/Makefile index 389c9014..1de45451 100755 --- a/Makefile +++ b/Makefile @@ -35,7 +35,6 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/headers.txt -covermode=atomic ./tests/plugins/headers go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/logger.txt -covermode=atomic ./tests/plugins/logger go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/metrics.txt -covermode=atomic ./tests/plugins/metrics - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/redis.txt -covermode=atomic ./tests/plugins/redis go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/resetter.txt -covermode=atomic ./tests/plugins/resetter go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./tests/plugins/rpc cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt @@ -50,10 +49,15 @@ test: ## Run application tests go test -v -race -tags=debug ./pkg/worker_watcher go test -v -race -tags=debug ./pkg/bst go test -v -race -tags=debug ./pkg/priority_queue - go test -v -race -tags=debug ./plugins/jobs/job go test -v -race -tags=debug ./plugins/jobs/pipeline go test -v -race -tags=debug ./plugins/http/config go test -v -race -tags=debug ./plugins/server + go test -v -race -tags=debug ./plugins/jobs/job + go test -v -race -tags=debug ./tests/plugins/jobs + go test -v -race -tags=debug ./tests/plugins/kv + go test -v -race -tags=debug ./tests/plugins/broadcast + go test -v -race -tags=debug ./tests/plugins/websockets + go test -v -race -tags=debug ./plugins/websockets go test -v -race -tags=debug ./tests/plugins/http go test -v -race -tags=debug ./tests/plugins/informer go test -v -race -tags=debug ./tests/plugins/reload @@ -65,12 +69,6 @@ test: ## Run application tests go test -v -race -tags=debug ./tests/plugins/headers go test -v -race -tags=debug ./tests/plugins/logger go test -v -race -tags=debug ./tests/plugins/metrics - go test -v -race -tags=debug ./tests/plugins/redis go test -v -race -tags=debug ./tests/plugins/resetter go test -v -race -tags=debug ./tests/plugins/rpc - go test -v -race -tags=debug ./tests/plugins/kv - go test -v -race -tags=debug ./tests/plugins/broadcast - go test -v -race -tags=debug ./tests/plugins/websockets - go test -v -race -tags=debug ./plugins/websockets - go test -v -race -tags=debug ./tests/plugins/jobs docker-compose -f tests/env/docker-compose.yaml down diff --git a/common/kv/interface.go b/common/kv/interface.go index 5736a6a7..bc6a07b2 100644 --- a/common/kv/interface.go +++ b/common/kv/interface.go @@ -30,6 +30,9 @@ type Storage interface { // Delete one or multiple keys. 
Delete(keys ...string) error + + // Stop the storage driver + Stop() } // Constructor provides storage based on the config diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index ed0eda61..46d596fa 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -222,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return &consumer{ file: pipeline.String(file, rrDB), priority: pipeline.Int(priority, 10), - prefetch: pipeline.Int(prefetch, 100), + prefetch: pipeline.Int(prefetch, 1000), permissions: conf.Permissions, bPool: sync.Pool{New: func() interface{} { diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index 7c161555..081d3f57 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -3,6 +3,7 @@ package boltjobs import ( "bytes" "encoding/gob" + "sync/atomic" "time" "github.com/spiral/roadrunner/v2/utils" @@ -18,6 +19,10 @@ func (c *consumer) listener() { c.log.Info("boltdb listener stopped") return case <-tt.C: + if atomic.LoadUint64(c.active) > uint64(c.prefetch) { + time.Sleep(time.Second) + continue + } tx, err := c.db.Begin(true) if err != nil { c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go index ba1450cd..656d572e 100644 --- a/plugins/boltdb/boltkv/driver.go +++ b/plugins/boltdb/boltkv/driver.go @@ -38,7 +38,7 @@ type Driver struct { stop chan struct{} } -func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { +func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { const op = errors.Op("new_boltdb_driver") if !cfgPlugin.Has(RootPluginName) { @@ -47,7 +47,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, d := &Driver{ log: log, - stop: stop, + stop: make(chan struct{}), } err := cfgPlugin.UnmarshalKey(key, &d.cfg) @@ -411,6 +411,10 @@ func (d *Driver) Clear() error { return nil } +func (d *Driver) Stop() { + d.stop <- struct{}{} +} + // ========================= PRIVATE ================================= func (d *Driver) startGCLoop() { //nolint:gocognit diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md index 317aec90..1424e586 100644 --- a/plugins/boltdb/doc/job_lifecycle.md +++ b/plugins/boltdb/doc/job_lifecycle.md @@ -7,4 +7,3 @@ There are several boltdb buckets: get into the `InQueueBucket` waiting to acknowledgement. 3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration. -`` diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go index 683b26f1..ad98cf3c 100644 --- a/plugins/boltdb/plugin.go +++ b/plugins/boltdb/plugin.go @@ -19,19 +19,14 @@ const ( // Plugin BoltDB K/V storage. 
type Plugin struct { - cfgPlugin config.Configurer + cfg config.Configurer // logger log logger.Logger - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - - drivers uint } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.stop = make(chan struct{}) p.log = log - p.cfgPlugin = cfg + p.cfg = cfg return nil } @@ -41,12 +36,6 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { - if p.drivers > 0 { - for i := uint(0); i < p.drivers; i++ { - // send close signal to every driver - p.stop <- struct{}{} - } - } return nil } @@ -60,23 +49,20 @@ func (p *Plugin) Available() {} func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") - st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop) + st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfg) if err != nil { return nil, errors.E(op, err) } - // save driver number to release resources after Stop - p.drivers++ - return st, nil } // JOBS bbolt implementation func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue) + return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfg, e, queue) } func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue) + return boltjobs.FromPipeline(pipe, p.log, p.cfg, e, queue) } diff --git a/plugins/ephemeral/consumer.go b/plugins/ephemeral/consumer.go deleted file mode 100644 index 8870bb0f..00000000 --- a/plugins/ephemeral/consumer.go +++ /dev/null @@ -1,269 +0,0 @@ -package ephemeral - -import ( - "context" - "sync/atomic" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -const ( - prefetch string = "prefetch" - goroutinesMax uint64 = 1000 -) - -type Config struct { - Prefetch uint64 `mapstructure:"prefetch"` -} - -type consumer struct { - cfg *Config - log logger.Logger - eh events.Handler - pipeline atomic.Value - pq priorityqueue.Queue - localPrefetch chan *Item - - // time.sleep goroutines max number - goroutines uint64 - - delayed *int64 - active *int64 - - listeners uint32 - stopCh chan struct{} -} - -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_ephemeral_pipeline") - - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), - } - - err := cfg.UnmarshalKey(configKey, &jb.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100_000 - } - - // initialize a local queue - jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) - - return jb, nil -} - -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - 
goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), - } - - // initialize a local queue - jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) - - return jb, nil -} - -func (c *consumer) Push(ctx context.Context, jb *job.Job) error { - const op = errors.Op("ephemeral_push") - - // check if the pipeline registered - _, ok := c.pipeline.Load().(*pipeline.Pipeline) - if !ok { - return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) - } - - err := c.handleItem(ctx, fromJob(jb)) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (c *consumer) State(_ context.Context) (*jobState.State, error) { - pipe := c.pipeline.Load().(*pipeline.Pipeline) - return &jobState.State{ - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Queue: pipe.Name(), - Active: atomic.LoadInt64(c.active), - Delayed: atomic.LoadInt64(c.delayed), - Ready: ready(atomic.LoadUint32(&c.listeners)), - }, nil -} - -func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - c.pipeline.Store(pipeline) - return nil -} - -func (c *consumer) Pause(_ context.Context, p string) { - pipe := c.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - c.log.Error("no such pipeline", "requested pause on: ", p) - } - - l := atomic.LoadUint32(&c.listeners) - // no active listeners - if l == 0 { - c.log.Warn("no active listeners, nothing to pause") - return - } - - atomic.AddUint32(&c.listeners, ^uint32(0)) - - // stop the consumer - c.stopCh <- struct{}{} - - c.eh.Push(events.JobEvent{ - Event: events.EventPipePaused, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, - }) -} - -func (c *consumer) Resume(_ context.Context, p string) { - pipe := c.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - c.log.Error("no such pipeline", "requested resume on: ", p) - } - - l := atomic.LoadUint32(&c.listeners) - // listener already active - if l == 1 { - c.log.Warn("listener already in the active state") - return - } - - // resume the consumer on the same channel - c.consume() - - atomic.StoreUint32(&c.listeners, 1) - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, - }) -} - -// Run is no-op for the ephemeral -func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - c.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - return nil -} - -func (c *consumer) Stop(_ context.Context) error { - pipe := c.pipeline.Load().(*pipeline.Pipeline) - - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} - } - - c.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, - }) - - return nil -} - -func (c *consumer) handleItem(ctx context.Context, msg *Item) error { - const op = errors.Op("ephemeral_handle_request") - // handle timeouts - // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) - // goroutines here. We should limit goroutines here. 
- if msg.Options.Delay > 0 { - // if we have 1000 goroutines waiting on the delay - reject 1001 - if atomic.LoadUint64(&c.goroutines) >= goroutinesMax { - return errors.E(op, errors.Str("max concurrency number reached")) - } - - go func(jj *Item) { - atomic.AddUint64(&c.goroutines, 1) - atomic.AddInt64(c.delayed, 1) - - time.Sleep(jj.Options.DelayDuration()) - - // send the item after timeout expired - c.localPrefetch <- jj - - atomic.AddUint64(&c.goroutines, ^uint64(0)) - }(msg) - - return nil - } - - // increase number of the active jobs - atomic.AddInt64(c.active, 1) - - // insert to the local, limited pipeline - select { - case c.localPrefetch <- msg: - return nil - case <-ctx.Done(): - return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err())) - } -} - -func (c *consumer) consume() { - go func() { - // redirect - for { - select { - case item, ok := <-c.localPrefetch: - if !ok { - c.log.Warn("ephemeral local prefetch queue was closed") - return - } - - // set requeue channel - item.Options.requeueFn = c.handleItem - item.Options.active = c.active - item.Options.delayed = c.delayed - - c.pq.Insert(item) - case <-c.stopCh: - return - } - } - }() -} - -func ready(r uint32) bool { - return r > 0 -} diff --git a/plugins/ephemeral/item.go b/plugins/ephemeral/item.go deleted file mode 100644 index 3298424d..00000000 --- a/plugins/ephemeral/item.go +++ /dev/null @@ -1,133 +0,0 @@ -package ephemeral - -import ( - "context" - "sync/atomic" - "time" - - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/utils" -) - -type Item struct { - // Job contains name of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-values pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` - - // private - requeueFn func(context.Context, *Item) error - active *int64 - delayed *int64 -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} - -func (i *Item) ID() string { - return i.Ident -} - -func (i *Item) Priority() int64 { - return i.Options.Priority -} - -// Body packs job payload into binary payload. -func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) -} - -// Context packs job context (job, id) into binary payload. 
-func (i *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, - ) - - if err != nil { - return nil, err - } - - return ctx, nil -} - -func (i *Item) Ack() error { - i.atomicallyReduceCount() - return nil -} - -func (i *Item) Nack() error { - i.atomicallyReduceCount() - return nil -} - -func (i *Item) Requeue(headers map[string][]string, delay int64) error { - // overwrite the delay - i.Options.Delay = delay - i.Headers = headers - - i.atomicallyReduceCount() - - err := i.Options.requeueFn(context.Background(), i) - if err != nil { - return err - } - - return nil -} - -// atomicallyReduceCount reduces counter of active or delayed jobs -func (i *Item) atomicallyReduceCount() { - // if job was delayed, reduce number of the delayed jobs - if i.Options.Delay > 0 { - atomic.AddInt64(i.Options.delayed, ^int64(0)) - return - } - - // otherwise, reduce number of the active jobs - atomic.AddInt64(i.Options.active, ^int64(0)) - // noop for the in-memory -} - -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - }, - } -} diff --git a/plugins/ephemeral/plugin.go b/plugins/ephemeral/plugin.go deleted file mode 100644 index 28495abb..00000000 --- a/plugins/ephemeral/plugin.go +++ /dev/null @@ -1,41 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "ephemeral" -) - -type Plugin struct { - log logger.Logger - cfg config.Configurer -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Available() {} - -// JobsConstruct creates new ephemeral consumer from the configuration -func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewJobBroker(configKey, p.log, p.cfg, e, pq) -} - -// FromPipeline creates new ephemeral consumer from the provided pipeline -func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipeline, p.log, e, pq) -} diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index c6ca96c3..a1144b85 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -104,6 +104,10 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { + // stop all attached storages + for k := range p.storages { + p.storages[k].Stop() + } return nil } diff --git a/plugins/memcached/config.go b/plugins/memcached/config.go deleted file mode 100644 index 6d413790..00000000 --- a/plugins/memcached/config.go +++ /dev/null @@ -1,12 +0,0 @@ -package memcached - -type Config struct { - // Addr is url for memcached, 11211 port is used by default - Addr []string -} - -func (s *Config) InitDefaults() { - if s.Addr == nil { - s.Addr = 
[]string{"127.0.0.1:11211"} // default url for memcached - } -} diff --git a/plugins/memcached/driver.go b/plugins/memcached/driver.go deleted file mode 100644 index e24747fe..00000000 --- a/plugins/memcached/driver.go +++ /dev/null @@ -1,248 +0,0 @@ -package memcached - -import ( - "strings" - "time" - - "github.com/bradfitz/gomemcache/memcache" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" -) - -type Driver struct { - client *memcache.Client - log logger.Logger - cfg *Config -} - -// NewMemcachedDriver returns a memcache client using the provided server(s) -// with equal weight. If a server is listed multiple times, -// it gets a proportional amount of weight. -func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { - const op = errors.Op("new_memcached_driver") - - s := &Driver{ - log: log, - } - - err := cfgPlugin.UnmarshalKey(key, &s.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - s.cfg.InitDefaults() - - m := memcache.New(s.cfg.Addr...) - s.client = m - - return s, nil -} - -// Has checks the key for existence -func (d *Driver) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("memcached_plugin_has") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - m := make(map[string]bool, len(keys)) - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - exist, err := d.client.Get(keys[i]) - - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - continue - } - return nil, errors.E(op, err) - } - if exist != nil { - m[keys[i]] = true - } - } - return m, nil -} - -// Get gets the item for the given key. ErrCacheMiss is returned for a -// memcache cache miss. The key must be at most 250 bytes in length. -func (d *Driver) Get(key string) ([]byte, error) { - const op = errors.Op("memcached_plugin_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - data, err := d.client.Get(key) - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - return nil, nil - } - return nil, errors.E(op, err) - } - if data != nil { - // return the value by the key - return data.Value, nil - } - // data is nil by some reason and error also nil - return nil, nil -} - -// MGet return map with key -- string -// and map value as value -- []byte -func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { - const op = errors.Op("memcached_plugin_mget") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string][]byte, len(keys)) - for i := range keys { - // Here also MultiGet - data, err := d.client.Get(keys[i]) - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - continue - } - return nil, errors.E(op, err) - } - if data != nil { - m[keys[i]] = data.Value - } - } - - return m, nil -} - -// Set sets the KV pairs. 
Keys should be 250 bytes maximum -// TTL: -// Expiration is the cache expiration time, in seconds: either a relative -// time from now (up to 1 month), or an absolute Unix epoch time. -// Zero means the Item has no expiration time. -func (d *Driver) Set(items ...*kvv1.Item) error { - const op = errors.Op("memcached_plugin_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - - for i := range items { - if items[i] == nil { - return errors.E(op, errors.EmptyItem) - } - - // pre-allocate item - memcachedItem := &memcache.Item{ - Key: items[i].Key, - // unsafe convert - Value: items[i].Value, - Flags: 0, - } - - // add additional TTL in case of TTL isn't empty - if items[i].Timeout != "" { - // verify the TTL - t, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return err - } - memcachedItem.Expiration = int32(t.Unix()) - } - - err := d.client.Set(memcachedItem) - if err != nil { - return err - } - } - - return nil -} - -// MExpire Expiration is the cache expiration time, in seconds: either a relative -// time from now (up to 1 month), or an absolute Unix epoch time. -// Zero means the Item has no expiration time. -func (d *Driver) MExpire(items ...*kvv1.Item) error { - const op = errors.Op("memcached_plugin_mexpire") - for i := range items { - if items[i] == nil { - continue - } - if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - // verify provided TTL - t, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return errors.E(op, err) - } - - // Touch updates the expiry for the given key. The seconds parameter is either - // a Unix timestamp or, if seconds is less than 1 month, the number of seconds - // into the future at which time the item will expire. Zero means the item has - // no expiration time. ErrCacheMiss is returned if the key is not in the cache. - // The key must be at most 250 bytes in length. - err = d.client.Touch(items[i].Key, int32(t.Unix())) - if err != nil { - return errors.E(op, err) - } - } - return nil -} - -// TTL return time in seconds (int32) for a given keys -func (d *Driver) TTL(_ ...string) (map[string]string, error) { - const op = errors.Op("memcached_plugin_ttl") - return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) -} - -func (d *Driver) Delete(keys ...string) error { - const op = errors.Op("memcached_plugin_has") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - } - - for i := range keys { - err := d.client.Delete(keys[i]) - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. 
- if err == memcache.ErrCacheMiss { - continue - } - return errors.E(op, err) - } - } - return nil -} - -func (d *Driver) Clear() error { - err := d.client.DeleteAll() - if err != nil { - d.log.Error("flush_all operation failed", "error", err) - return err - } - - return nil -} diff --git a/plugins/memcached/memcachedkv/config.go b/plugins/memcached/memcachedkv/config.go new file mode 100644 index 00000000..569e2573 --- /dev/null +++ b/plugins/memcached/memcachedkv/config.go @@ -0,0 +1,12 @@ +package memcachedkv + +type Config struct { + // Addr is url for memcached, 11211 port is used by default + Addr []string +} + +func (s *Config) InitDefaults() { + if s.Addr == nil { + s.Addr = []string{"127.0.0.1:11211"} // default url for memcached + } +} diff --git a/plugins/memcached/memcachedkv/driver.go b/plugins/memcached/memcachedkv/driver.go new file mode 100644 index 00000000..6d5e1802 --- /dev/null +++ b/plugins/memcached/memcachedkv/driver.go @@ -0,0 +1,250 @@ +package memcachedkv + +import ( + "strings" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" +) + +type Driver struct { + client *memcache.Client + log logger.Logger + cfg *Config +} + +// NewMemcachedDriver returns a memcache client using the provided server(s) +// with equal weight. If a server is listed multiple times, +// it gets a proportional amount of weight. +func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { + const op = errors.Op("new_memcached_driver") + + s := &Driver{ + log: log, + } + + err := cfgPlugin.UnmarshalKey(key, &s.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + s.cfg.InitDefaults() + + m := memcache.New(s.cfg.Addr...) + s.client = m + + return s, nil +} + +// Has checks the key for existence +func (d *Driver) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("memcached_plugin_has") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + m := make(map[string]bool, len(keys)) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + exist, err := d.client.Get(keys[i]) + + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) + } + if exist != nil { + m[keys[i]] = true + } + } + return m, nil +} + +// Get gets the item for the given key. ErrCacheMiss is returned for a +// memcache cache miss. The key must be at most 250 bytes in length. +func (d *Driver) Get(key string) ([]byte, error) { + const op = errors.Op("memcached_plugin_get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + data, err := d.client.Get(key) + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. 
+ if err == memcache.ErrCacheMiss { + return nil, nil + } + return nil, errors.E(op, err) + } + if data != nil { + // return the value by the key + return data.Value, nil + } + // data is nil for some reason and the error is also nil + return nil, nil +} + +// MGet returns a map with string keys +// and []byte values +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { + const op = errors.Op("memcached_plugin_mget") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string][]byte, len(keys)) + for i := range keys { + // Here also MultiGet + data, err := d.client.Get(keys[i]) + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) + } + if data != nil { + m[keys[i]] = data.Value + } + } + + return m, nil +} + +// Set sets the KV pairs. Keys should be 250 bytes maximum. +// TTL: +// Expiration is the cache expiration time, in seconds: either a relative +// time from now (up to 1 month), or an absolute Unix epoch time. +// Zero means the Item has no expiration time. +func (d *Driver) Set(items ...*kvv1.Item) error { + const op = errors.Op("memcached_plugin_set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + + for i := range items { + if items[i] == nil { + return errors.E(op, errors.EmptyItem) + } + + // pre-allocate item + memcachedItem := &memcache.Item{ + Key: items[i].Key, + // unsafe convert + Value: items[i].Value, + Flags: 0, + } + + // add the TTL in case it isn't empty + if items[i].Timeout != "" { + // verify the TTL + t, err := time.Parse(time.RFC3339, items[i].Timeout) + if err != nil { + return err + } + memcachedItem.Expiration = int32(t.Unix()) + } + + err := d.client.Set(memcachedItem) + if err != nil { + return err + } + } + + return nil +} + +// MExpire sets the expiration time. Expiration is the cache expiration time, in seconds: either a relative +// time from now (up to 1 month), or an absolute Unix epoch time. +// Zero means the Item has no expiration time. +func (d *Driver) MExpire(items ...*kvv1.Item) error { + const op = errors.Op("memcached_plugin_mexpire") + for i := range items { + if items[i] == nil { + continue + } + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + // verify provided TTL + t, err := time.Parse(time.RFC3339, items[i].Timeout) + if err != nil { + return errors.E(op, err) + } + + // Touch updates the expiry for the given key. The seconds parameter is either + // a Unix timestamp or, if seconds is less than 1 month, the number of seconds + // into the future at which time the item will expire. Zero means the item has + // no expiration time. ErrCacheMiss is returned if the key is not in the cache. + // The key must be at most 250 bytes in length.
+ err = d.client.Touch(items[i].Key, int32(t.Unix())) + if err != nil { + return errors.E(op, err) + } + } + return nil +} + +// TTL returns time in seconds (int32) for the given keys +func (d *Driver) TTL(_ ...string) (map[string]string, error) { + const op = errors.Op("memcached_plugin_ttl") + return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) +} + +func (d *Driver) Delete(keys ...string) error { + const op = errors.Op("memcached_plugin_delete") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + + for i := range keys { + err := d.client.Delete(keys[i]) + if err != nil { + // ErrCacheMiss means the item wasn't present, so there is nothing to delete. + if err == memcache.ErrCacheMiss { + continue + } + return errors.E(op, err) + } + } + return nil +} + +func (d *Driver) Clear() error { + err := d.client.DeleteAll() + if err != nil { + d.log.Error("flush_all operation failed", "error", err) + return err + } + + return nil +} + +func (d *Driver) Stop() {} diff --git a/plugins/memcached/plugin.go b/plugins/memcached/plugin.go index 59a2b7cb..47bca0e2 100644 --- a/plugins/memcached/plugin.go +++ b/plugins/memcached/plugin.go @@ -5,6 +5,7 @@ import ( "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memcached/memcachedkv" ) const ( @@ -39,7 +40,7 @@ func (s *Plugin) Available() {} func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") - st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) + st, err := memcachedkv.NewMemcachedDriver(s.log, key, s.cfgPlugin) if err != nil { return nil, errors.E(op, err) } diff --git a/plugins/memory/config.go b/plugins/memory/config.go deleted file mode 100644 index e51d09c5..00000000 --- a/plugins/memory/config.go +++ /dev/null @@ -1,14 +0,0 @@ -package memory - -// Config is default config for the in-memory driver -type Config struct { - // Interval for the check - Interval int -} - -// InitDefaults by default driver is turned off -func (c *Config) InitDefaults() { - if c.Interval == 0 { - c.Interval = 60 // seconds - } -} diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go deleted file mode 100644 index 68ea7266..00000000 --- a/plugins/memory/kv.go +++ /dev/null @@ -1,249 +0,0 @@ -package memory - -import ( - "strings" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" -) - -type Driver struct { - clearMu sync.RWMutex - heap sync.Map - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - log logger.Logger - cfg *Config -} - -func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { - const op = errors.Op("new_in_memory_driver") - - d := &Driver{ - stop: stop, - log: log, - } - - err := cfgPlugin.UnmarshalKey(key, &d.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - d.cfg.InitDefaults() - - go d.gc() - - return d, nil -} - -func (s *Driver) Has(keys ...string) (map[string]bool, error) { - const
op = errors.Op("in_memory_plugin_has") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - m := make(map[string]bool) - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - if _, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = true - } - } - - return m, nil -} - -func (s *Driver) Get(key string) ([]byte, error) { - const op = errors.Op("in_memory_plugin_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - if data, exist := s.heap.Load(key); exist { - // here might be a panic - // but data only could be a string, see Set function - return data.(*kvv1.Item).Value, nil - } - return nil, nil -} - -func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { - const op = errors.Op("in_memory_plugin_mget") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string][]byte, len(keys)) - - for i := range keys { - if value, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = value.(*kvv1.Item).Value - } - } - - return m, nil -} - -func (s *Driver) Set(items ...*kvv1.Item) error { - const op = errors.Op("in_memory_plugin_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - - for i := range items { - if items[i] == nil { - continue - } - // TTL is set - if items[i].Timeout != "" { - // check the TTL in the item - _, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return err - } - } - - s.heap.Store(items[i].Key, items[i]) - } - return nil -} - -// MExpire sets the expiration time to the key -// If key already has the expiration time, it will be overwritten -func (s *Driver) MExpire(items ...*kvv1.Item) error { - const op = errors.Op("in_memory_plugin_mexpire") - for i := range items { - if items[i] == nil { - continue - } - if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - // if key exist, overwrite it value - if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok { - // check that time is correct - _, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return errors.E(op, err) - } - tmp := pItem.(*kvv1.Item) - // guess that t is in the future - // in memory is just FOR TESTING PURPOSES - // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, &kvv1.Item{ - Key: items[i].Key, - Value: tmp.Value, - Timeout: items[i].Timeout, - }) - } - } - - return nil -} - -func (s *Driver) TTL(keys ...string) (map[string]string, error) { - const op = errors.Op("in_memory_plugin_ttl") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]string, len(keys)) - - for i := range keys { - if item, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = item.(*kvv1.Item).Timeout - } - } - return m, nil -} - -func (s *Driver) Delete(keys ...string) error { - const op = errors.Op("in_memory_plugin_delete") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed 
== "" { - return errors.E(op, errors.EmptyKey) - } - } - - for i := range keys { - s.heap.Delete(keys[i]) - } - return nil -} - -func (s *Driver) Clear() error { - s.clearMu.Lock() - s.heap = sync.Map{} - s.clearMu.Unlock() - - return nil -} - -// ================================== PRIVATE ====================================== - -func (s *Driver) gc() { - ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) - for { - select { - case <-s.stop: - ticker.Stop() - return - case now := <-ticker.C: - // mutes needed to clear the map - s.clearMu.RLock() - - // check every second - s.heap.Range(func(key, value interface{}) bool { - v := value.(*kvv1.Item) - if v.Timeout == "" { - return true - } - - t, err := time.Parse(time.RFC3339, v.Timeout) - if err != nil { - return false - } - - if now.After(t) { - s.log.Debug("key deleted", "key", key) - s.heap.Delete(key) - } - return true - }) - - s.clearMu.RUnlock() - } - } -} diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go new file mode 100644 index 00000000..94abaadb --- /dev/null +++ b/plugins/memory/memoryjobs/consumer.go @@ -0,0 +1,269 @@ +package memoryjobs + +import ( + "context" + "sync/atomic" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" +) + +const ( + prefetch string = "prefetch" + goroutinesMax uint64 = 1000 +) + +type Config struct { + Prefetch uint64 `mapstructure:"prefetch"` +} + +type consumer struct { + cfg *Config + log logger.Logger + eh events.Handler + pipeline atomic.Value + pq priorityqueue.Queue + localPrefetch chan *Item + + // time.sleep goroutines max number + goroutines uint64 + + delayed *int64 + active *int64 + + listeners uint32 + stopCh chan struct{} +} + +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("new_ephemeral_pipeline") + + jb := &consumer{ + log: log, + pq: pq, + eh: eh, + goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), + stopCh: make(chan struct{}, 1), + } + + err := cfg.UnmarshalKey(configKey, &jb.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + if jb.cfg.Prefetch == 0 { + jb.cfg.Prefetch = 100_000 + } + + // initialize a local queue + jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) + + return jb, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { + jb := &consumer{ + log: log, + pq: pq, + eh: eh, + goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), + stopCh: make(chan struct{}, 1), + } + + // initialize a local queue + jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) + + return jb, nil +} + +func (c *consumer) Push(ctx context.Context, jb *job.Job) error { + const op = errors.Op("ephemeral_push") + + // check if the pipeline registered + _, ok := c.pipeline.Load().(*pipeline.Pipeline) + if !ok { + return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) + } + + err := c.handleItem(ctx, fromJob(jb)) + if err != nil { + return 
errors.E(op, err) + } + + return nil +} + +func (c *consumer) State(_ context.Context) (*jobState.State, error) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: pipe.Name(), + Active: atomic.LoadInt64(c.active), + Delayed: atomic.LoadInt64(c.delayed), + Ready: ready(atomic.LoadUint32(&c.listeners)), + }, nil +} + +func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { + c.pipeline.Store(pipeline) + return nil +} + +func (c *consumer) Pause(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + c.log.Error("no such pipeline", "requested pause on: ", p) + } + + l := atomic.LoadUint32(&c.listeners) + // no active listeners + if l == 0 { + c.log.Warn("no active listeners, nothing to pause") + return + } + + atomic.AddUint32(&c.listeners, ^uint32(0)) + + // stop the consumer + c.stopCh <- struct{}{} + + c.eh.Push(events.JobEvent{ + Event: events.EventPipePaused, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + Elapsed: 0, + }) +} + +func (c *consumer) Resume(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + c.log.Error("no such pipeline", "requested resume on: ", p) + } + + l := atomic.LoadUint32(&c.listeners) + // listener already active + if l == 1 { + c.log.Warn("listener already in the active state") + return + } + + // resume the consumer on the same channel + c.consume() + + atomic.StoreUint32(&c.listeners, 1) + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Pipeline: pipe.Name(), + Start: time.Now(), + Elapsed: 0, + }) +} + +// Run is no-op for the ephemeral +func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil +} + +func (c *consumer) Stop(_ context.Context) error { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + + if atomic.LoadUint32(&c.listeners) > 0 { + c.stopCh <- struct{}{} + } + + c.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipe.Name(), + Start: time.Now(), + Elapsed: 0, + }) + + return nil +} + +func (c *consumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("ephemeral_handle_request") + // handle timeouts + // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) + // goroutines here. We should limit goroutines here. 
+ if msg.Options.Delay > 0 { + // if we have 1000 goroutines waiting on the delay - reject 1001 + if atomic.LoadUint64(&c.goroutines) >= goroutinesMax { + return errors.E(op, errors.Str("max concurrency number reached")) + } + + go func(jj *Item) { + atomic.AddUint64(&c.goroutines, 1) + atomic.AddInt64(c.delayed, 1) + + time.Sleep(jj.Options.DelayDuration()) + + // send the item after timeout expired + c.localPrefetch <- jj + + atomic.AddUint64(&c.goroutines, ^uint64(0)) + }(msg) + + return nil + } + + // increase number of the active jobs + atomic.AddInt64(c.active, 1) + + // insert to the local, limited pipeline + select { + case c.localPrefetch <- msg: + return nil + case <-ctx.Done(): + return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err())) + } +} + +func (c *consumer) consume() { + go func() { + // redirect + for { + select { + case item, ok := <-c.localPrefetch: + if !ok { + c.log.Warn("ephemeral local prefetch queue was closed") + return + } + + // set requeue channel + item.Options.requeueFn = c.handleItem + item.Options.active = c.active + item.Options.delayed = c.delayed + + c.pq.Insert(item) + case <-c.stopCh: + return + } + } + }() +} + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/memory/memoryjobs/item.go b/plugins/memory/memoryjobs/item.go new file mode 100644 index 00000000..8224c26b --- /dev/null +++ b/plugins/memory/memoryjobs/item.go @@ -0,0 +1,133 @@ +package memoryjobs + +import ( + "context" + "sync/atomic" + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" +) + +type Item struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // private + requeueFn func(context.Context, *Item) error + active *int64 + delayed *int64 +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +func (i *Item) ID() string { + return i.Ident +} + +func (i *Item) Priority() int64 { + return i.Options.Priority +} + +// Body packs job payload into binary payload. +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) +} + +// Context packs job context (job, id) into binary payload. 
+func (i *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (i *Item) Ack() error { + i.atomicallyReduceCount() + return nil +} + +func (i *Item) Nack() error { + i.atomicallyReduceCount() + return nil +} + +func (i *Item) Requeue(headers map[string][]string, delay int64) error { + // overwrite the delay + i.Options.Delay = delay + i.Headers = headers + + i.atomicallyReduceCount() + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err + } + + return nil +} + +// atomicallyReduceCount reduces counter of active or delayed jobs +func (i *Item) atomicallyReduceCount() { + // if job was delayed, reduce number of the delayed jobs + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + return + } + + // otherwise, reduce number of the active jobs + atomic.AddInt64(i.Options.active, ^int64(0)) + // noop for the in-memory +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + }, + } +} diff --git a/plugins/memory/memorykv/config.go b/plugins/memory/memorykv/config.go new file mode 100644 index 00000000..a8a8993f --- /dev/null +++ b/plugins/memory/memorykv/config.go @@ -0,0 +1,14 @@ +package memorykv + +// Config is the default config for the in-memory driver +type Config struct { + // Interval between GC checks, in seconds + Interval int +} + +// InitDefaults sets the default GC interval (60 seconds) +func (c *Config) InitDefaults() { + if c.Interval == 0 { + c.Interval = 60 // seconds + } +} diff --git a/plugins/memory/memorykv/kv.go b/plugins/memory/memorykv/kv.go new file mode 100644 index 00000000..9b3e176c --- /dev/null +++ b/plugins/memory/memorykv/kv.go @@ -0,0 +1,253 @@ +package memorykv + +import ( + "strings" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" +) + +type Driver struct { + clearMu sync.RWMutex + heap sync.Map + // stop is used to stop the keys GC + stop chan struct{} + log logger.Logger + cfg *Config +} + +func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configurer) (*Driver, error) { + const op = errors.Op("new_in_memory_driver") + + d := &Driver{ + stop: make(chan struct{}), + log: log, + } + + err := cfgPlugin.UnmarshalKey(key, &d.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + d.cfg.InitDefaults() + + go d.gc() + + return d, nil +} + +func (d *Driver) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("in_memory_plugin_has") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + m := make(map[string]bool) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + + if _, ok := d.heap.Load(keys[i]); ok { + m[keys[i]] = true + } + } + + return m, nil +} + +func (d *Driver) Get(key string) ([]byte, error) { + const op = errors.Op("in_memory_plugin_get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey) + } + + if data, exist := d.heap.Load(key); exist { + // here might be a panic + // but data only could be a string, see Set function + return data.(*kvv1.Item).Value, nil + } + return nil, nil +} + +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { + const op = errors.Op("in_memory_plugin_mget") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string][]byte, len(keys)) + + for i := range keys { + if value, ok := d.heap.Load(keys[i]); ok { + m[keys[i]] = value.(*kvv1.Item).Value + } + } + + return m, nil +} + +func (d *Driver) Set(items ...*kvv1.Item) error { + const op = errors.Op("in_memory_plugin_set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + + for i := range items { + if items[i] == nil { + continue + } + // TTL is set + if items[i].Timeout != "" { + // check the TTL in the item + _, err := time.Parse(time.RFC3339, items[i].Timeout) + if err != nil { + return err + } + } + + d.heap.Store(items[i].Key, items[i]) + } + return nil +} + +// MExpire sets the expiration time to the key +// If key already has the expiration time, it will be overwritten +func (d *Driver) MExpire(items ...*kvv1.Item) error { + const op = errors.Op("in_memory_plugin_mexpire") + for i := range items { + if items[i] == nil { + continue + } + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + // if key exist, overwrite it value + if pItem, ok := d.heap.LoadAndDelete(items[i].Key); ok { + // check that time is correct + _, err := time.Parse(time.RFC3339, items[i].Timeout) + if err != nil { + return errors.E(op, err) + } + tmp := pItem.(*kvv1.Item) + // guess that t is in the future + // in memory is just FOR TESTING PURPOSES + // LOGIC ISN'T IDEAL + d.heap.Store(items[i].Key, &kvv1.Item{ + Key: items[i].Key, + Value: tmp.Value, + Timeout: items[i].Timeout, + }) + } + } + + return nil +} + +func (d *Driver) TTL(keys ...string) (map[string]string, error) { + const op = errors.Op("in_memory_plugin_ttl") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]string, len(keys)) + + for i := range keys { + if item, ok := d.heap.Load(keys[i]); ok { + m[keys[i]] = item.(*kvv1.Item).Timeout + } + } + return m, nil +} + +func (d *Driver) Delete(keys ...string) error { + const op = errors.Op("in_memory_plugin_delete") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + + for i := range keys { + d.heap.Delete(keys[i]) + } + return nil +} + +func (d *Driver) Clear() error { + d.clearMu.Lock() + d.heap = sync.Map{} + d.clearMu.Unlock() + + return nil +} + +func (d *Driver) Stop() { + d.stop <- struct{}{} +} + +// ================================== PRIVATE ====================================== + +func (d *Driver) gc() { + ticker := time.NewTicker(time.Duration(d.cfg.Interval) * time.Second) + defer ticker.Stop() + for { + select { + case <-d.stop: + return + 
case now := <-ticker.C:
+			// the read lock keeps Clear from swapping the map mid-scan
+			d.clearMu.RLock()
+
+			// scan every stored item on each tick
+			d.heap.Range(func(key, value interface{}) bool {
+				v := value.(*kvv1.Item)
+				if v.Timeout == "" {
+					return true
+				}
+
+				t, err := time.Parse(time.RFC3339, v.Timeout)
+				if err != nil {
+					return false
+				}
+
+				if now.After(t) {
+					d.log.Debug("key deleted", "key", key)
+					d.heap.Delete(key)
+				}
+				return true
+			})
+
+			d.clearMu.RUnlock()
+		}
+	}
+}
diff --git a/plugins/memory/memorypubsub/pubsub.go b/plugins/memory/memorypubsub/pubsub.go
new file mode 100644
index 00000000..75122571
--- /dev/null
+++ b/plugins/memory/memorypubsub/pubsub.go
@@ -0,0 +1,92 @@
+package memorypubsub
+
+import (
+	"context"
+	"sync"
+
+	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/common/pubsub"
+	"github.com/spiral/roadrunner/v2/pkg/bst"
+	"github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type PubSubDriver struct {
+	sync.RWMutex
+	// channel with the messages from the RPC
+	pushCh chan *pubsub.Message
+	// user-subscribed topics
+	storage bst.Storage
+	log     logger.Logger
+}
+
+func NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) {
+	ps := &PubSubDriver{
+		pushCh:  make(chan *pubsub.Message, 10),
+		storage: bst.NewBST(),
+		log:     log,
+	}
+	return ps, nil
+}
+
+func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
+	p.pushCh <- msg
+	return nil
+}
+
+func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
+	go func() {
+		p.pushCh <- msg
+	}()
+}
+
+func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
+	p.Lock()
+	defer p.Unlock()
+	for i := 0; i < len(topics); i++ {
+		p.storage.Insert(connectionID, topics[i])
+	}
+	return nil
+}
+
+func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
+	p.Lock()
+	defer p.Unlock()
+	for i := 0; i < len(topics); i++ {
+		p.storage.Remove(connectionID, topics[i])
+	}
+	return nil
+}
+
+func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
+	p.RLock()
+	defer p.RUnlock()
+
+	ret := p.storage.Get(topic)
+	for rr := range ret {
+		res[rr] = struct{}{}
+	}
+}
+
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+	const op = errors.Op("pubsub_memory")
+	select {
+	case msg := <-p.pushCh:
+		if msg == nil {
+			return nil, nil
+		}
+
+		p.RLock()
+		defer p.RUnlock()
+		// deliver only messages whose topics have subscribers
+		// TODO: find a better approach
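+		// note: the read lock is held until Next returns,
+		// so Subscribe/Unsubscribe calls block for the duration of the delivery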
+		// if the topic has active subscribers, deliver the message;
+		// otherwise fall through and return nil
+		if ok := p.storage.Contains(msg.Topic); ok {
+			return msg, nil
+		}
+	case <-ctx.Done():
+		return nil, errors.E(op, errors.TimeOut, ctx.Err())
+	}
+
+	return nil, nil
+}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 7d418a70..515e469a 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,27 +2,29 @@ package memory
 
 import (
 	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/common/jobs"
 	"github.com/spiral/roadrunner/v2/common/kv"
 	"github.com/spiral/roadrunner/v2/common/pubsub"
+	"github.com/spiral/roadrunner/v2/pkg/events"
+	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
 	"github.com/spiral/roadrunner/v2/plugins/config"
+	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
 	"github.com/spiral/roadrunner/v2/plugins/logger"
+	"github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs"
+	"github.com/spiral/roadrunner/v2/plugins/memory/memorykv"
+	"github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub"
 )
 
 const PluginName string = "memory"
 
 type Plugin struct {
-	// heap is user map for the key-value pairs
-	stop chan struct{}
-
-	log       logger.Logger
-	cfgPlugin config.Configurer
-	drivers   uint
+	log logger.Logger
+	cfg config.Configurer
 }
 
 func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
 	p.log = log
-	p.cfgPlugin = cfg
-	p.stop = make(chan struct{}, 1)
+	p.cfg = cfg
 	return nil
 }
 
@@ -31,32 +33,36 @@ func (p *Plugin) Serve() chan error {
 }
 
 func (p *Plugin) Stop() error {
-	if p.drivers > 0 {
-		for i := uint(0); i < p.drivers; i++ {
-			// send close signal to every driver
-			p.stop <- struct{}{}
-		}
-	}
 	return nil
 }
 
+func (p *Plugin) Name() string {
+	return PluginName
+}
+
+func (p *Plugin) Available() {}
+
+// Drivers implementation
+
 func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
-	return NewPubSubDriver(p.log, key)
+	return memorypubsub.NewPubSubDriver(p.log, key)
 }
 
 func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
 	const op = errors.Op("inmemory_plugin_provide")
-	st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop)
+	st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg)
 	if err != nil {
 		return nil, errors.E(op, err)
 	}
-
-	// save driver number to release resources after Stop
-	p.drivers++
-
 	return st, nil
 }
 
-func (p *Plugin) Name() string {
-	return PluginName
+// JobsConstruct creates a new in-memory jobs consumer from the configuration
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+	return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline creates a new in-memory jobs consumer from the provided pipeline
+func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+	return memoryjobs.FromPipeline(pipeline, p.log, e, pq)
+}
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
deleted file mode 100644
index fd30eb54..00000000
--- a/plugins/memory/pubsub.go
+++ /dev/null
@@ -1,92 +0,0 @@
-package memory
-
-import (
-	"context"
-	"sync"
-
-	"github.com/spiral/errors"
-	"github.com/spiral/roadrunner/v2/common/pubsub"
-	"github.com/spiral/roadrunner/v2/pkg/bst"
-	"github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type PubSubDriver struct {
-	sync.RWMutex
-	// channel with the messages from the RPC
-	pushCh chan *pubsub.Message
-	// user-subscribed topics
-	storage bst.Storage
-	log     logger.Logger
-}
-
-func 
NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) { - ps := &PubSubDriver{ - pushCh: make(chan *pubsub.Message, 10), - storage: bst.NewBST(), - log: log, - } - return ps, nil -} - -func (p *PubSubDriver) Publish(msg *pubsub.Message) error { - p.pushCh <- msg - return nil -} - -func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) { - go func() { - p.pushCh <- msg - }() -} - -func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Insert(connectionID, topics[i]) - } - return nil -} - -func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Remove(connectionID, topics[i]) - } - return nil -} - -func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { - p.RLock() - defer p.RUnlock() - - ret := p.storage.Get(topic) - for rr := range ret { - res[rr] = struct{}{} - } -} - -func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { - const op = errors.Op("pubsub_memory") - select { - case msg := <-p.pushCh: - if msg == nil { - return nil, nil - } - - p.RLock() - defer p.RUnlock() - // push only messages, which topics are subscibed - // TODO better??? - // if we have active subscribers - send a message to a topic - // or send nil instead - if ok := p.storage.Contains(msg.Topic); ok { - return msg, nil - } - case <-ctx.Done(): - return nil, errors.E(op, errors.TimeOut, ctx.Err()) - } - - return nil, nil -} diff --git a/plugins/redis/clients.go b/plugins/redis/clients.go deleted file mode 100644 index d0a184d2..00000000 --- a/plugins/redis/clients.go +++ /dev/null @@ -1,84 +0,0 @@ -package redis - -import ( - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" -) - -// RedisClient return a client based on the provided section key -// key sample: kv.some-section.redis -// kv.redis -// redis (root) -func (p *Plugin) RedisClient(key string) (redis.UniversalClient, error) { - const op = errors.Op("redis_get_client") - - if !p.cfgPlugin.Has(key) { - return nil, errors.E(op, errors.Errorf("no such section: %s", key)) - } - - cfg := &Config{} - - err := p.cfgPlugin.UnmarshalKey(key, cfg) - if err != nil { - return nil, errors.E(op, err) - } - - cfg.InitDefaults() - - uc := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: cfg.Addrs, - DB: cfg.DB, - Username: cfg.Username, - Password: cfg.Password, - SentinelPassword: cfg.SentinelPassword, - MaxRetries: cfg.MaxRetries, - MinRetryBackoff: cfg.MaxRetryBackoff, - MaxRetryBackoff: cfg.MaxRetryBackoff, - DialTimeout: cfg.DialTimeout, - ReadTimeout: cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, - PoolSize: cfg.PoolSize, - MinIdleConns: cfg.MinIdleConns, - MaxConnAge: cfg.MaxConnAge, - PoolTimeout: cfg.PoolTimeout, - IdleTimeout: cfg.IdleTimeout, - IdleCheckFrequency: cfg.IdleCheckFreq, - ReadOnly: cfg.ReadOnly, - RouteByLatency: cfg.RouteByLatency, - RouteRandomly: cfg.RouteRandomly, - MasterName: cfg.MasterName, - }) - - return uc, nil -} - -func (p *Plugin) DefaultClient() redis.UniversalClient { - cfg := &Config{} - cfg.InitDefaults() - - uc := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: cfg.Addrs, - DB: cfg.DB, - Username: cfg.Username, - Password: cfg.Password, - SentinelPassword: cfg.SentinelPassword, - MaxRetries: cfg.MaxRetries, - MinRetryBackoff: cfg.MaxRetryBackoff, - MaxRetryBackoff: cfg.MaxRetryBackoff, - DialTimeout: cfg.DialTimeout, - ReadTimeout: 
cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, - PoolSize: cfg.PoolSize, - MinIdleConns: cfg.MinIdleConns, - MaxConnAge: cfg.MaxConnAge, - PoolTimeout: cfg.PoolTimeout, - IdleTimeout: cfg.IdleTimeout, - IdleCheckFrequency: cfg.IdleCheckFreq, - ReadOnly: cfg.ReadOnly, - RouteByLatency: cfg.RouteByLatency, - RouteRandomly: cfg.RouteRandomly, - MasterName: cfg.MasterName, - }) - - return uc -} diff --git a/plugins/redis/interface.go b/plugins/redis/interface.go deleted file mode 100644 index 189b0002..00000000 --- a/plugins/redis/interface.go +++ /dev/null @@ -1,12 +0,0 @@ -package redis - -import "github.com/go-redis/redis/v8" - -// Redis in the redis KV plugin interface -type Redis interface { - // RedisClient provides universal redis client - RedisClient(key string) (redis.UniversalClient, error) - - // DefaultClient provide default redis client based on redis defaults - DefaultClient() redis.UniversalClient -} diff --git a/plugins/redis/kv/kv.go b/plugins/redis/kv/kv.go index b41cb86c..3d062fbb 100644 --- a/plugins/redis/kv/kv.go +++ b/plugins/redis/kv/kv.go @@ -248,3 +248,5 @@ func (d *Driver) Clear() error { return nil } + +func (d *Driver) Stop() {} diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index bf9f60cc..9813344e 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -45,17 +45,17 @@ jobs: # list of broker pipelines associated with endpoints pipelines: test-local: - driver: ephemeral + driver: memory priority: 10 prefetch: 10000 test-local-2: - driver: ephemeral + driver: memory priority: 1 prefetch: 10000 test-local-3: - driver: ephemeral + driver: memory priority: 2 prefetch: 10000 diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml deleted file mode 100644 index 726c24ac..00000000 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml +++ /dev/null @@ -1,21 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml deleted file mode 100644 index 8914dfaa..00000000 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml +++ /dev/null @@ -1,37 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - pipelines: - test-1: - driver: ephemeral - priority: 10 - prefetch: 10000 - - test-2: - driver: ephemeral - priority: 10 - prefetch: 10000 - - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-1", "test-2" ] - diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml deleted file mode 100644 index 05dc3ffa..00000000 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml +++ /dev/null @@ -1,21 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php 
../../jobs_err.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml deleted file mode 100644 index e1b76263..00000000 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml +++ /dev/null @@ -1,44 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-local: - driver: ephemeral - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: ephemeral - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: ephemeral - priority: 2 - pipeline_size: 10000 - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2" ] - diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go deleted file mode 100644 index 2890aa9d..00000000 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ /dev/null @@ -1,571 +0,0 @@ -package jobs - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/ephemeral" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/resetter" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" -) - -func TestEphemeralInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-init.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - 
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - stopCh <- struct{}{} - wg.Wait() -} - -func TestEphemeralDeclare(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - 
} - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func TestEphemeralPauseResume(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-pause-resume.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3) - - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - ) - - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("ephemeralResume", resumePipes("test-local")) - t.Run("ephemeralPause", pausePipelines("test-local")) - t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) - 
t.Run("ephemeralResume", resumePipes("test-local")) - t.Run("pushToEnabledPipe", pushToPipe("test-local")) - time.Sleep(time.Second * 1) - - stopCh <- struct{}{} - time.Sleep(time.Second) - wg.Wait() -} - -func TestEphemeralJobsError(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-jobs-err.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", resumePipes("test-3")) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) - time.Sleep(time.Second * 25) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func TestEphemeralStats(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - 
- // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - - t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) - - time.Sleep(time.Second) - out := &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "ephemeral") - assert.Equal(t, out.Queue, "test-3") - - assert.Equal(t, out.Active, int64(1)) - assert.Equal(t, out.Delayed, int64(1)) - assert.Equal(t, out.Reserved, int64(0)) - - time.Sleep(time.Second) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) - time.Sleep(time.Second * 7) - - out = &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "ephemeral") - assert.Equal(t, out.Queue, "test-3") - - assert.Equal(t, out.Active, int64(0)) - assert.Equal(t, out.Delayed, int64(0)) - assert.Equal(t, out.Reserved, int64(0)) - - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func declareEphemeralPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - 
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "ephemeral", - "name": "test-3", - "prefetch": "10000", - }} - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Declare", pipe, er) - assert.NoError(t, err) -} - -func consumeEphemeralPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} - pipe.GetPipelines()[0] = "test-3" - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Resume", pipe, er) - assert.NoError(t, err) -} diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index 951d6227..5c521c2b 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -14,9 +14,9 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" @@ -68,7 +68,7 @@ func TestJobsInit(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, &amqp.Plugin{}, ) assert.NoError(t, err) @@ -154,7 +154,7 @@ func TestJOBSMetrics(t *testing.T) { &server.Plugin{}, &jobs.Plugin{}, &metrics.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, mockLogger, ) assert.NoError(t, err) @@ -204,8 +204,8 @@ func TestJOBSMetrics(t *testing.T) { time.Sleep(time.Second * 2) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("DeclareEphemeralPipeline", declareMemoryPipe) + t.Run("ConsumeEphemeralPipeline", consumeMemoryPipe) t.Run("PushEphemeralPipeline", pushToPipe("test-3")) time.Sleep(time.Second) t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) diff --git a/tests/plugins/jobs/jobs_memory_test.go b/tests/plugins/jobs/jobs_memory_test.go new file mode 100644 index 00000000..20cbfb3f --- /dev/null +++ b/tests/plugins/jobs/jobs_memory_test.go @@ -0,0 +1,571 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/memory" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" +) + +func TestMemoryInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "memory/.rr-memory-init.yaml", + Prefix: "rr", + } 
+ + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} + +func TestMemoryDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "memory/.rr-memory-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, 
+ &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func TestMemoryPauseResume(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "memory/.rr-memory-pause-resume.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3) + + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &memory.Plugin{}, + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, 
syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("Resume", resumePipes("test-local")) + t.Run("Pause", pausePipelines("test-local")) + t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) + t.Run("Resume", resumePipes("test-local")) + t.Run("pushToEnabledPipe", pushToPipe("test-local")) + time.Sleep(time.Second * 1) + + stopCh <- struct{}{} + time.Sleep(time.Second) + wg.Wait() +} + +func TestMemoryJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "memory/.rr-memory-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", 
declareMemoryPipe) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func TestMemoryStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "memory/.rr-memory-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + + t.Run("PushPipeline", pushToPipeDelayed("test-3", 5)) + t.Run("PushPipeline", pushToPipe("test-3")) + + time.Sleep(time.Second) + out := &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, out.Pipeline, "test-3") + assert.Equal(t, out.Driver, "memory") + assert.Equal(t, out.Queue, "test-3") + + assert.Equal(t, out.Active, int64(1)) + assert.Equal(t, out.Delayed, int64(1)) + assert.Equal(t, out.Reserved, int64(0)) + + time.Sleep(time.Second) + t.Run("ConsumePipeline", consumeMemoryPipe) + 
time.Sleep(time.Second * 7)
+
+	out = &jobState.State{}
+	t.Run("Stats", stats(out))
+
+	assert.Equal(t, out.Pipeline, "test-3")
+	assert.Equal(t, out.Driver, "memory")
+	assert.Equal(t, out.Queue, "test-3")
+
+	assert.Equal(t, out.Active, int64(0))
+	assert.Equal(t, out.Delayed, int64(0))
+	assert.Equal(t, out.Reserved, int64(0))
+
+	t.Run("DestroyPipeline", destroyPipelines("test-3"))
+
+	time.Sleep(time.Second * 5)
+	stopCh <- struct{}{}
+	wg.Wait()
+}
+
+func declareMemoryPipe(t *testing.T) {
+	conn, err := net.Dial("tcp", "127.0.0.1:6001")
+	assert.NoError(t, err)
+	client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+	pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{
+		"driver":   "memory",
+		"name":     "test-3",
+		"prefetch": "10000",
+	}}
+
+	er := &jobsv1beta.Empty{}
+	err = client.Call("jobs.Declare", pipe, er)
+	assert.NoError(t, err)
+}
+
+func consumeMemoryPipe(t *testing.T) {
+	conn, err := net.Dial("tcp", "127.0.0.1:6001")
+	assert.NoError(t, err)
+	client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+	pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)}
+	pipe.GetPipelines()[0] = "test-3"
+
+	er := &jobsv1beta.Empty{}
+	err = client.Call("jobs.Resume", pipe, er)
+	assert.NoError(t, err)
+}
diff --git a/tests/plugins/jobs/memory/.rr-memory-declare.yaml b/tests/plugins/jobs/memory/.rr-memory-declare.yaml
new file mode 100644
index 00000000..726c24ac
--- /dev/null
+++ b/tests/plugins/jobs/memory/.rr-memory-declare.yaml
@@ -0,0 +1,21 @@
+rpc:
+  listen: tcp://127.0.0.1:6001
+
+server:
+  command: "php ../../jobs_ok.php"
+  relay: "pipes"
+  relay_timeout: "20s"
+
+logs:
+  level: debug
+  encoding: console
+  mode: development
+
+jobs:
+  num_pollers: 10
+  pipeline_size: 100000
+  pool:
+    num_workers: 10
+    max_jobs: 0
+    allocate_timeout: 60s
+    destroy_timeout: 60s
diff --git a/tests/plugins/jobs/memory/.rr-memory-init.yaml b/tests/plugins/jobs/memory/.rr-memory-init.yaml
new file mode 100644
index 00000000..9ee8afc2
--- /dev/null
+++ b/tests/plugins/jobs/memory/.rr-memory-init.yaml
@@ -0,0 +1,37 @@
+rpc:
+  listen: tcp://127.0.0.1:6001
+
+server:
+  command: "php ../../jobs_ok.php"
+  relay: "pipes"
+  relay_timeout: "20s"
+
+logs:
+  level: debug
+  encoding: console
+  mode: development
+
+jobs:
+  num_pollers: 10
+  pipeline_size: 100000
+  pool:
+    num_workers: 10
+    max_jobs: 0
+    allocate_timeout: 60s
+    destroy_timeout: 60s
+
+  pipelines:
+    test-1:
+      driver: memory
+      priority: 10
+      prefetch: 10000
+
+    test-2:
+      driver: memory
+      priority: 10
+      prefetch: 10000
+
+
+  # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+  consume: [ "test-1", "test-2" ]
+
diff --git a/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml
new file mode 100644
index 00000000..05dc3ffa
--- /dev/null
+++ b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml
@@ -0,0 +1,21 @@
+rpc:
+  listen: tcp://127.0.0.1:6001
+
+server:
+  command: "php ../../jobs_err.php"
+  relay: "pipes"
+  relay_timeout: "20s"
+
+logs:
+  level: debug
+  encoding: console
+  mode: development
+
+jobs:
+  num_pollers: 10
+  pipeline_size: 100000
+  pool:
+    num_workers: 10
+    max_jobs: 0
+    allocate_timeout: 60s
+    destroy_timeout: 60s
diff --git a/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml
new file mode 100644
index 00000000..1ad48237
--- /dev/null
+++ b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml
@@ -0,0 +1,44 @@
+rpc:
+  listen: 
tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: debug + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-local: + driver: memory + priority: 10 + pipeline_size: 10000 + + test-local-2: + driver: memory + priority: 1 + pipeline_size: 10000 + + test-local-3: + driver: memory + priority: 2 + pipeline_size: 10000 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-local", "test-local-2" ] + diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml index f58de3e4..471e5c77 100644 --- a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml +++ b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml @@ -1,6 +1,9 @@ rpc: listen: tcp://127.0.0.1:6001 +logs: + mode: development + level: error kv: boltdb-south: diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml index 08b3bfad..b46bcb1c 100644 --- a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml +++ b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml @@ -1,6 +1,9 @@ rpc: listen: tcp://127.0.0.1:6001 +logs: + mode: development + level: error kv: boltdb-south: diff --git a/tests/plugins/kv/configs/.rr-kv-init.yaml b/tests/plugins/kv/configs/.rr-kv-init.yaml index a13b591c..6407c7ad 100644 --- a/tests/plugins/kv/configs/.rr-kv-init.yaml +++ b/tests/plugins/kv/configs/.rr-kv-init.yaml @@ -1,6 +1,9 @@ rpc: listen: tcp://127.0.0.1:6001 +logs: + mode: development + level: error kv: default: @@ -25,6 +28,3 @@ kv: memcached: driver: memcached addr: [ "127.0.0.1:11211" ] - -# redis: -# driver: redis diff --git a/tests/plugins/redis/plugin1.go b/tests/plugins/redis/plugin1.go deleted file mode 100644 index 68da1394..00000000 --- a/tests/plugins/redis/plugin1.go +++ /dev/null @@ -1,45 +0,0 @@ -package redis - -import ( - "context" - "time" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - redisPlugin "github.com/spiral/roadrunner/v2/plugins/redis" -) - -type Plugin1 struct { - redisClient redis.UniversalClient -} - -func (p *Plugin1) Init(redis redisPlugin.Redis) error { - var err error - p.redisClient, err = redis.RedisClient("redis") - - return err -} - -func (p *Plugin1) Serve() chan error { - const op = errors.Op("plugin1 serve") - errCh := make(chan error, 1) - p.redisClient.Set(context.Background(), "foo", "bar", time.Minute) - - stringCmd := p.redisClient.Get(context.Background(), "foo") - data, err := stringCmd.Result() - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - if data != "bar" { - errCh <- errors.E(op, errors.Str("no such key")) - return errCh - } - - return errCh -} - -func (p *Plugin1) Stop() error { - return nil -} diff --git a/tests/plugins/redis/redis_plugin_test.go b/tests/plugins/redis/redis_plugin_test.go deleted file mode 100644 index 1b84e339..00000000 --- a/tests/plugins/redis/redis_plugin_test.go +++ /dev/null @@ -1,120 +0,0 @@ -package redis - -import ( - "fmt" - "os" - "os/signal" - "sync" - "syscall" - "testing" - - "github.com/alicebob/miniredis/v2" - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - 
"github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/redis" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" -) - -func redisConfig(port string) string { - cfg := ` -redis: - addrs: - - '127.0.0.1:%s' - master_name: '' - username: '' - password: '' - db: 0 - sentinel_password: '' - route_by_latency: false - route_randomly: false - dial_timeout: 0 - max_retries: 1 - min_retry_backoff: 0 - max_retry_backoff: 0 - pool_size: 0 - min_idle_conns: 0 - max_conn_age: 0 - read_timeout: 0 - write_timeout: 0 - pool_timeout: 0 - idle_timeout: 0 - idle_check_freq: 0 - read_only: false -` - return fmt.Sprintf(cfg, port) -} - -func TestRedisInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - if err != nil { - t.Fatal(err) - } - - s, err := miniredis.Run() - assert.NoError(t, err) - - c := redisConfig(s.Port()) - - cfg := &config.Viper{} - cfg.Type = "yaml" - cfg.ReadInCfg = []byte(c) - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - err = cont.RegisterAll( - cfg, - mockLogger, - &redis.Plugin{}, - &Plugin1{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - stopCh <- struct{}{} - wg.Wait() -} diff --git a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml index 5ab359d3..d256aad7 100644 --- a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml +++ b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml @@ -1,3 +1,8 @@ logs: mode: development - level: error \ No newline at end of file + level: panic + +endure: + grace_period: 120s + print_graph: false + log_level: panic diff --git a/tests/plugins/rpc/configs/.rr.yaml b/tests/plugins/rpc/configs/.rr.yaml index 67d935e3..d6aaa7c6 100644 --- a/tests/plugins/rpc/configs/.rr.yaml +++ b/tests/plugins/rpc/configs/.rr.yaml @@ -1,5 +1,11 @@ rpc: listen: tcp://127.0.0.1:6001 + logs: mode: development - level: error \ No newline at end of file + level: panic + +endure: + grace_period: 120s + print_graph: false + log_level: panic -- cgit v1.2.3