summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-14 20:29:10 +0300
committerValery Piashchynski <[email protected]>2021-08-14 20:29:10 +0300
commit5a56dc33b9903e9d96e7c87067bd273ad2e68f8a (patch)
treeaa5e6020d18fd42ee29ac3cf62ad41d4f18795c4
parent6860326fa5d8f37f6e954da07fd53b9261731227 (diff)
Update broadcast tests, add redis flusing. Initial impl of the job
drivers state. Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--common/jobs/interface.go7
-rwxr-xr-xpkg/pool/supervisor_pool.go2
-rw-r--r--pkg/state/job/state.go6
-rw-r--r--pkg/state/process/state.go (renamed from pkg/process/state.go)0
-rw-r--r--plugins/http/plugin.go2
-rw-r--r--plugins/informer/interface.go19
-rw-r--r--plugins/informer/plugin.go2
-rw-r--r--plugins/informer/rpc.go2
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go24
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go7
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go107
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go27
-rw-r--r--plugins/jobs/plugin.go40
-rw-r--r--plugins/redis/jobs/config.go34
-rw-r--r--plugins/redis/jobs/consumer.go1
-rw-r--r--plugins/redis/jobs/item.go1
-rw-r--r--plugins/redis/kv/config.go34
-rw-r--r--plugins/redis/kv/kv.go (renamed from plugins/redis/kv.go)2
-rw-r--r--plugins/redis/plugin.go6
-rw-r--r--plugins/redis/pubsub/channel.go (renamed from plugins/redis/channel.go)2
-rw-r--r--plugins/redis/pubsub/config.go34
-rw-r--r--plugins/redis/pubsub/pubsub.go (renamed from plugins/redis/pubsub.go)2
-rw-r--r--plugins/server/command.go2
-rw-r--r--plugins/server/plugin.go2
-rw-r--r--plugins/service/plugin.go2
-rw-r--r--plugins/websockets/plugin.go2
-rw-r--r--tests/env/docker-compose.yaml2
-rw-r--r--tests/plugins/broadcast/broadcast_plugin_test.go27
-rw-r--r--tests/plugins/http/http_plugin_test.go2
-rw-r--r--tests/plugins/informer/informer_test.go2
-rw-r--r--tests/plugins/informer/test_plugin.go2
31 files changed, 318 insertions, 86 deletions
diff --git a/common/jobs/interface.go b/common/jobs/interface.go
index c957df2b..4b5ff70e 100644
--- a/common/jobs/interface.go
+++ b/common/jobs/interface.go
@@ -5,11 +5,12 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
)
-// Consumer todo naming
+// Consumer represents a single jobs driver interface
type Consumer interface {
Push(ctx context.Context, job *job.Job) error
Register(ctx context.Context, pipeline *pipeline.Pipeline) error
@@ -18,8 +19,12 @@ type Consumer interface {
Pause(ctx context.Context, pipeline string)
Resume(ctx context.Context, pipeline string)
+
+ // State provide information about driver state
+ State(ctx context.Context) (*jobState.State, error)
}
+// Constructor constructs Consumer interface. Endure abstraction.
type Constructor interface {
JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (Consumer, error)
FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (Consumer, error)
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index bdaeade1..e6b2bd7c 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -8,7 +8,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
diff --git a/pkg/state/job/state.go b/pkg/state/job/state.go
new file mode 100644
index 00000000..e5b142b5
--- /dev/null
+++ b/pkg/state/job/state.go
@@ -0,0 +1,6 @@
+package job
+
+type State struct {
+ Queue string
+ Active int64
+}
diff --git a/pkg/process/state.go b/pkg/state/process/state.go
index bfc3a287..bfc3a287 100644
--- a/pkg/process/state.go
+++ b/pkg/state/process/state.go
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 2ee83384..dc887f87 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -10,7 +10,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/config"
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
index bbc1a048..9277b85b 100644
--- a/plugins/informer/interface.go
+++ b/plugins/informer/interface.go
@@ -1,7 +1,10 @@
package informer
import (
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
)
/*
@@ -9,17 +12,23 @@ Informer plugin should not receive any other plugin in the Init or via Collects
Because Availabler implementation should present in every plugin
*/
+// Statistic interfaces ==============
+
// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
Workers() []*process.State
}
+// JobsStat interface provide statistic for the jobs plugin
+type JobsStat interface {
+ // JobsState returns slice with the attached drivers information
+ JobsState(ctx context.Context) ([]*job.State, error)
+}
+
+// Statistic interfaces end ============
+
// Availabler interface should be implemented by every plugin which wish to report to the PHP worker that it available in the RR runtime
type Availabler interface {
// Available method needed to collect all plugins which are available in the runtime.
Available()
}
-
-type JobsStat interface {
- Stat()
-}
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index c613af58..21fd7983 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -2,7 +2,7 @@ package informer
import (
endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
)
const PluginName = "informer"
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 02254865..3738b619 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -1,7 +1,7 @@
package informer
import (
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
)
type rpc struct {
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 429953e1..b89cdc82 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -12,6 +12,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -361,6 +362,29 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("amqp_driver_state")
+ select {
+ case pch := <-j.publishChan:
+ defer func() {
+ j.publishChan <- pch
+ }()
+
+ q, err := pch.QueueInspect(j.queue)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return &jobState.State{
+ Queue: q.Name,
+ Active: int64(q.Messages),
+ }, nil
+
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+
func (j *JobConsumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index eaf99be1..7e81e6d9 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -10,6 +10,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -213,12 +214,16 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
return nil
}
-func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error {
+func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
// register the pipeline
j.pipeline.Store(p)
return nil
}
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ return nil, nil
+}
+
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 95ad6ecd..d801b7b4 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -106,57 +107,8 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- const op = errors.Op("ephemeral_handle_request")
- // handle timeouts
- // theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
- // goroutines here. We should limit goroutines here.
- if msg.Options.Delay > 0 {
- // if we have 1000 goroutines waiting on the delay - reject 1001
- if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
- return errors.E(op, errors.Str("max concurrency number reached"))
- }
-
- go func(jj *Item) {
- atomic.AddUint64(&j.goroutines, 1)
- time.Sleep(jj.Options.DelayDuration())
-
- // send the item after timeout expired
- j.localPrefetch <- jj
-
- atomic.AddUint64(&j.goroutines, ^uint64(0))
- }(msg)
-
- return nil
- }
-
- // insert to the local, limited pipeline
- select {
- case j.localPrefetch <- msg:
- return nil
- case <-ctx.Done():
- return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
- }
-}
-
-func (j *JobConsumer) consume() {
- // redirect
- for {
- select {
- case item, ok := <-j.localPrefetch:
- if !ok {
- j.log.Warn("ephemeral local prefetch queue was closed")
- return
- }
-
- // set requeue channel
- item.Options.requeueFn = j.handleItem
-
- j.pq.Insert(item)
- case <-j.stopCh:
- return
- }
- }
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ return nil, nil
}
func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
@@ -242,3 +194,56 @@ func (j *JobConsumer) Stop(ctx context.Context) error {
return errors.E(op, ctx.Err())
}
}
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("ephemeral_handle_request")
+ // handle timeouts
+ // theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
+ // goroutines here. We should limit goroutines here.
+ if msg.Options.Delay > 0 {
+ // if we have 1000 goroutines waiting on the delay - reject 1001
+ if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
+ return errors.E(op, errors.Str("max concurrency number reached"))
+ }
+
+ go func(jj *Item) {
+ atomic.AddUint64(&j.goroutines, 1)
+ time.Sleep(jj.Options.DelayDuration())
+
+ // send the item after timeout expired
+ j.localPrefetch <- jj
+
+ atomic.AddUint64(&j.goroutines, ^uint64(0))
+ }(msg)
+
+ return nil
+ }
+
+ // insert to the local, limited pipeline
+ select {
+ case j.localPrefetch <- msg:
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+ }
+}
+
+func (j *JobConsumer) consume() {
+ // redirect
+ for {
+ select {
+ case item, ok := <-j.localPrefetch:
+ if !ok {
+ j.log.Warn("ephemeral local prefetch queue was closed")
+ return
+ }
+
+ // set requeue channel
+ item.Options.requeueFn = j.handleItem
+
+ j.pq.Insert(item)
+ case <-j.stopCh:
+ return
+ }
+ }
+}
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 9ce37543..4fb684f8 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -15,6 +15,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -261,17 +262,8 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- d, err := msg.pack(j.queueURL)
- if err != nil {
- return err
- }
- _, err = j.client.SendMessage(ctx, d)
- if err != nil {
- return err
- }
-
- return nil
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ return nil, nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -374,3 +366,16 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
Start: time.Now(),
})
}
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ d, err := msg.pack(j.queueURL)
+ if err != nil {
+ return err
+ }
+ _, err = j.client.SendMessage(ctx, d)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 7707cb8a..5f6c8b94 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -13,6 +13,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -303,6 +305,44 @@ func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
p.jobConstructors[name.Name()] = c
}
+func (p *Plugin) Workers() []*process.State {
+ p.RLock()
+ wrk := p.workersPool.Workers()
+ p.RUnlock()
+
+ ps := make([]*process.State, len(wrk))
+
+ for i := 0; i < len(wrk); i++ {
+ st, err := process.WorkerProcessState(wrk[i])
+ if err != nil {
+ p.log.Error("jobs workers state", "error", err)
+ return nil
+ }
+
+ ps[i] = st
+ }
+
+ return ps
+}
+
+func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) {
+ const op = errors.Op("jobs_plugin_drivers_state")
+ jst := make([]*jobState.State, 0, len(p.consumers))
+ for k := range p.consumers {
+ d := p.consumers[k]
+ newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout))
+ state, err := d.State(newCtx)
+ if err != nil {
+ cancel()
+ return nil, errors.E(op, err)
+ }
+
+ jst = append(jst, state)
+ cancel()
+ }
+ return jst, nil
+}
+
func (p *Plugin) Available() {}
func (p *Plugin) Name() string {
diff --git a/plugins/redis/jobs/config.go b/plugins/redis/jobs/config.go
new file mode 100644
index 00000000..89d707af
--- /dev/null
+++ b/plugins/redis/jobs/config.go
@@ -0,0 +1,34 @@
+package jobs
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/redis/jobs/consumer.go b/plugins/redis/jobs/consumer.go
new file mode 100644
index 00000000..415ac457
--- /dev/null
+++ b/plugins/redis/jobs/consumer.go
@@ -0,0 +1 @@
+package jobs
diff --git a/plugins/redis/jobs/item.go b/plugins/redis/jobs/item.go
new file mode 100644
index 00000000..415ac457
--- /dev/null
+++ b/plugins/redis/jobs/item.go
@@ -0,0 +1 @@
+package jobs
diff --git a/plugins/redis/kv/config.go b/plugins/redis/kv/config.go
new file mode 100644
index 00000000..5b760952
--- /dev/null
+++ b/plugins/redis/kv/config.go
@@ -0,0 +1,34 @@
+package kv
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/redis/kv.go b/plugins/redis/kv/kv.go
index 29f89d46..b41cb86c 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv/kv.go
@@ -1,4 +1,4 @@
-package redis
+package kv
import (
"context"
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 3c62a63f..961182a9 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -9,6 +9,8 @@ import (
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ redis_kv "github.com/spiral/roadrunner/v2/plugins/redis/kv"
+ redis_pubsub "github.com/spiral/roadrunner/v2/plugins/redis/pubsub"
)
const PluginName = "redis"
@@ -62,7 +64,7 @@ func (p *Plugin) Available() {}
// KVConstruct provides KV storage implementation over the redis plugin
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("redis_plugin_provide")
- st, err := NewRedisDriver(p.log, key, p.cfgPlugin)
+ st, err := redis_kv.NewRedisDriver(p.log, key, p.cfgPlugin)
if err != nil {
return nil, errors.E(op, err)
}
@@ -71,5 +73,5 @@ func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
}
func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
- return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
+ return redis_pubsub.NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
}
diff --git a/plugins/redis/channel.go b/plugins/redis/pubsub/channel.go
index 0cd62d19..eef5a7b9 100644
--- a/plugins/redis/channel.go
+++ b/plugins/redis/pubsub/channel.go
@@ -1,4 +1,4 @@
-package redis
+package pubsub
import (
"context"
diff --git a/plugins/redis/pubsub/config.go b/plugins/redis/pubsub/config.go
new file mode 100644
index 00000000..bf8d2fc9
--- /dev/null
+++ b/plugins/redis/pubsub/config.go
@@ -0,0 +1,34 @@
+package pubsub
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub/pubsub.go
index 01efc623..95a9f6dd 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub/pubsub.go
@@ -1,4 +1,4 @@
-package redis
+package pubsub
import (
"context"
diff --git a/plugins/server/command.go b/plugins/server/command.go
index e0b61896..b8bc1395 100644
--- a/plugins/server/command.go
+++ b/plugins/server/command.go
@@ -29,5 +29,5 @@ func (server *Plugin) scanCommand(cmd []string) error {
return nil
}
}
- return errors.E(errors.Str("scan failed, possible path not found"), op)
+ return errors.E(errors.Str("scan failed, possible path not found, this is not an error"), op)
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 1694cdf1..16e3bd8c 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -97,7 +97,7 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
// try to find a path here
err := server.scanCommand(cmdArgs)
if err != nil {
- server.log.Info("scan command", "error", err)
+ server.log.Info("scan command", "reason", err)
}
return func() *exec.Cmd {
diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go
index 28b84443..3bd0f956 100644
--- a/plugins/service/plugin.go
+++ b/plugins/service/plugin.go
@@ -4,7 +4,7 @@ import (
"sync"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 2df23f11..ad371bf8 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -13,7 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
diff --git a/tests/env/docker-compose.yaml b/tests/env/docker-compose.yaml
index 4f58d543..4735070a 100644
--- a/tests/env/docker-compose.yaml
+++ b/tests/env/docker-compose.yaml
@@ -1,4 +1,4 @@
-version: '3.8'
+version: '3'
services:
memcached:
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
index a78b17e1..c7041cc9 100644
--- a/tests/plugins/broadcast/broadcast_plugin_test.go
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -1,6 +1,7 @@
package broadcast
import (
+ "context"
"net"
"net/rpc"
"os"
@@ -10,6 +11,7 @@ import (
"testing"
"time"
+ goRedis "github.com/go-redis/redis/v8"
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
@@ -176,6 +178,9 @@ func TestBroadcastNoConfig(t *testing.T) {
}
func TestBroadcastSameSubscriber(t *testing.T) {
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
+
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
@@ -283,9 +288,15 @@ func TestBroadcastSameSubscriber(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()
+
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
}
func TestBroadcastSameSubscriberGlobal(t *testing.T) {
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
+
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
@@ -394,6 +405,9 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
wg.Wait()
time.Sleep(time.Second * 5)
+
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
}
func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) {
@@ -446,6 +460,7 @@ func BroadcastPublishFoo3(port string) func(t *testing.T) {
}
}
}
+
func BroadcastPublishAsyncFooFoo2Foo3(port string) func(t *testing.T) {
return func(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:"+port)
@@ -475,3 +490,15 @@ func makeMessage(payload []byte, topics ...string) *websocketsv1.Request {
return m
}
+
+func redisFlushAll(addr string) func(t *testing.T) {
+ return func(t *testing.T) {
+ rdb := goRedis.NewClient(&goRedis.Options{
+ Addr: addr,
+ Password: "", // no password set
+ DB: 0, // use default DB
+ })
+
+ rdb.FlushAll(context.Background())
+ }
+}
diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go
index bd804264..a48c8972 100644
--- a/tests/plugins/http/http_plugin_test.go
+++ b/tests/plugins/http/http_plugin_test.go
@@ -22,7 +22,7 @@ import (
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/gzip"
"github.com/spiral/roadrunner/v2/plugins/informer"
diff --git a/tests/plugins/informer/informer_test.go b/tests/plugins/informer/informer_test.go
index 61be85a1..c3b5c6a6 100644
--- a/tests/plugins/informer/informer_test.go
+++ b/tests/plugins/informer/informer_test.go
@@ -12,7 +12,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/logger"
diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go
index 095140b8..21897f40 100644
--- a/tests/plugins/informer/test_plugin.go
+++ b/tests/plugins/informer/test_plugin.go
@@ -5,7 +5,7 @@ import (
"time"
"github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/server"
)