summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-14 20:29:10 +0300
committerValery Piashchynski <[email protected]>2021-08-14 20:29:10 +0300
commit5a56dc33b9903e9d96e7c87067bd273ad2e68f8a (patch)
treeaa5e6020d18fd42ee29ac3cf62ad41d4f18795c4 /plugins
parent6860326fa5d8f37f6e954da07fd53b9261731227 (diff)
Update broadcast tests, add redis flusing. Initial impl of the job
drivers state. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/http/plugin.go2
-rw-r--r--plugins/informer/interface.go19
-rw-r--r--plugins/informer/plugin.go2
-rw-r--r--plugins/informer/rpc.go2
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go24
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go7
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go107
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go27
-rw-r--r--plugins/jobs/plugin.go40
-rw-r--r--plugins/redis/jobs/config.go34
-rw-r--r--plugins/redis/jobs/consumer.go1
-rw-r--r--plugins/redis/jobs/item.go1
-rw-r--r--plugins/redis/kv/config.go34
-rw-r--r--plugins/redis/kv/kv.go (renamed from plugins/redis/kv.go)2
-rw-r--r--plugins/redis/plugin.go6
-rw-r--r--plugins/redis/pubsub/channel.go (renamed from plugins/redis/channel.go)2
-rw-r--r--plugins/redis/pubsub/config.go34
-rw-r--r--plugins/redis/pubsub/pubsub.go (renamed from plugins/redis/pubsub.go)2
-rw-r--r--plugins/server/command.go2
-rw-r--r--plugins/server/plugin.go2
-rw-r--r--plugins/service/plugin.go2
-rw-r--r--plugins/websockets/plugin.go2
22 files changed, 274 insertions, 80 deletions
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 2ee83384..dc887f87 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -10,7 +10,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/config"
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
index bbc1a048..9277b85b 100644
--- a/plugins/informer/interface.go
+++ b/plugins/informer/interface.go
@@ -1,7 +1,10 @@
package informer
import (
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
)
/*
@@ -9,17 +12,23 @@ Informer plugin should not receive any other plugin in the Init or via Collects
Because Availabler implementation should present in every plugin
*/
+// Statistic interfaces ==============
+
// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
Workers() []*process.State
}
+// JobsStat interface provide statistic for the jobs plugin
+type JobsStat interface {
+ // JobsState returns slice with the attached drivers information
+ JobsState(ctx context.Context) ([]*job.State, error)
+}
+
+// Statistic interfaces end ============
+
// Availabler interface should be implemented by every plugin which wish to report to the PHP worker that it available in the RR runtime
type Availabler interface {
// Available method needed to collect all plugins which are available in the runtime.
Available()
}
-
-type JobsStat interface {
- Stat()
-}
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index c613af58..21fd7983 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -2,7 +2,7 @@ package informer
import (
endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
)
const PluginName = "informer"
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 02254865..3738b619 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -1,7 +1,7 @@
package informer
import (
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
)
type rpc struct {
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 429953e1..b89cdc82 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -12,6 +12,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -361,6 +362,29 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("amqp_driver_state")
+ select {
+ case pch := <-j.publishChan:
+ defer func() {
+ j.publishChan <- pch
+ }()
+
+ q, err := pch.QueueInspect(j.queue)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return &jobState.State{
+ Queue: q.Name,
+ Active: int64(q.Messages),
+ }, nil
+
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+
func (j *JobConsumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index eaf99be1..7e81e6d9 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -10,6 +10,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -213,12 +214,16 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
return nil
}
-func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error {
+func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
// register the pipeline
j.pipeline.Store(p)
return nil
}
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ return nil, nil
+}
+
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 95ad6ecd..d801b7b4 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -106,57 +107,8 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- const op = errors.Op("ephemeral_handle_request")
- // handle timeouts
- // theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
- // goroutines here. We should limit goroutines here.
- if msg.Options.Delay > 0 {
- // if we have 1000 goroutines waiting on the delay - reject 1001
- if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
- return errors.E(op, errors.Str("max concurrency number reached"))
- }
-
- go func(jj *Item) {
- atomic.AddUint64(&j.goroutines, 1)
- time.Sleep(jj.Options.DelayDuration())
-
- // send the item after timeout expired
- j.localPrefetch <- jj
-
- atomic.AddUint64(&j.goroutines, ^uint64(0))
- }(msg)
-
- return nil
- }
-
- // insert to the local, limited pipeline
- select {
- case j.localPrefetch <- msg:
- return nil
- case <-ctx.Done():
- return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
- }
-}
-
-func (j *JobConsumer) consume() {
- // redirect
- for {
- select {
- case item, ok := <-j.localPrefetch:
- if !ok {
- j.log.Warn("ephemeral local prefetch queue was closed")
- return
- }
-
- // set requeue channel
- item.Options.requeueFn = j.handleItem
-
- j.pq.Insert(item)
- case <-j.stopCh:
- return
- }
- }
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ return nil, nil
}
func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
@@ -242,3 +194,56 @@ func (j *JobConsumer) Stop(ctx context.Context) error {
return errors.E(op, ctx.Err())
}
}
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("ephemeral_handle_request")
+ // handle timeouts
+ // theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
+ // goroutines here. We should limit goroutines here.
+ if msg.Options.Delay > 0 {
+ // if we have 1000 goroutines waiting on the delay - reject 1001
+ if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
+ return errors.E(op, errors.Str("max concurrency number reached"))
+ }
+
+ go func(jj *Item) {
+ atomic.AddUint64(&j.goroutines, 1)
+ time.Sleep(jj.Options.DelayDuration())
+
+ // send the item after timeout expired
+ j.localPrefetch <- jj
+
+ atomic.AddUint64(&j.goroutines, ^uint64(0))
+ }(msg)
+
+ return nil
+ }
+
+ // insert to the local, limited pipeline
+ select {
+ case j.localPrefetch <- msg:
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+ }
+}
+
+func (j *JobConsumer) consume() {
+ // redirect
+ for {
+ select {
+ case item, ok := <-j.localPrefetch:
+ if !ok {
+ j.log.Warn("ephemeral local prefetch queue was closed")
+ return
+ }
+
+ // set requeue channel
+ item.Options.requeueFn = j.handleItem
+
+ j.pq.Insert(item)
+ case <-j.stopCh:
+ return
+ }
+ }
+}
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 9ce37543..4fb684f8 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -15,6 +15,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -261,17 +262,8 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- d, err := msg.pack(j.queueURL)
- if err != nil {
- return err
- }
- _, err = j.client.SendMessage(ctx, d)
- if err != nil {
- return err
- }
-
- return nil
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ return nil, nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -374,3 +366,16 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
Start: time.Now(),
})
}
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ d, err := msg.pack(j.queueURL)
+ if err != nil {
+ return err
+ }
+ _, err = j.client.SendMessage(ctx, d)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 7707cb8a..5f6c8b94 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -13,6 +13,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -303,6 +305,44 @@ func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
p.jobConstructors[name.Name()] = c
}
+func (p *Plugin) Workers() []*process.State {
+ p.RLock()
+ wrk := p.workersPool.Workers()
+ p.RUnlock()
+
+ ps := make([]*process.State, len(wrk))
+
+ for i := 0; i < len(wrk); i++ {
+ st, err := process.WorkerProcessState(wrk[i])
+ if err != nil {
+ p.log.Error("jobs workers state", "error", err)
+ return nil
+ }
+
+ ps[i] = st
+ }
+
+ return ps
+}
+
+func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) {
+ const op = errors.Op("jobs_plugin_drivers_state")
+ jst := make([]*jobState.State, 0, len(p.consumers))
+ for k := range p.consumers {
+ d := p.consumers[k]
+ newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout))
+ state, err := d.State(newCtx)
+ if err != nil {
+ cancel()
+ return nil, errors.E(op, err)
+ }
+
+ jst = append(jst, state)
+ cancel()
+ }
+ return jst, nil
+}
+
func (p *Plugin) Available() {}
func (p *Plugin) Name() string {
diff --git a/plugins/redis/jobs/config.go b/plugins/redis/jobs/config.go
new file mode 100644
index 00000000..89d707af
--- /dev/null
+++ b/plugins/redis/jobs/config.go
@@ -0,0 +1,34 @@
+package jobs
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/redis/jobs/consumer.go b/plugins/redis/jobs/consumer.go
new file mode 100644
index 00000000..415ac457
--- /dev/null
+++ b/plugins/redis/jobs/consumer.go
@@ -0,0 +1 @@
+package jobs
diff --git a/plugins/redis/jobs/item.go b/plugins/redis/jobs/item.go
new file mode 100644
index 00000000..415ac457
--- /dev/null
+++ b/plugins/redis/jobs/item.go
@@ -0,0 +1 @@
+package jobs
diff --git a/plugins/redis/kv/config.go b/plugins/redis/kv/config.go
new file mode 100644
index 00000000..5b760952
--- /dev/null
+++ b/plugins/redis/kv/config.go
@@ -0,0 +1,34 @@
+package kv
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/redis/kv.go b/plugins/redis/kv/kv.go
index 29f89d46..b41cb86c 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv/kv.go
@@ -1,4 +1,4 @@
-package redis
+package kv
import (
"context"
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 3c62a63f..961182a9 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -9,6 +9,8 @@ import (
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ redis_kv "github.com/spiral/roadrunner/v2/plugins/redis/kv"
+ redis_pubsub "github.com/spiral/roadrunner/v2/plugins/redis/pubsub"
)
const PluginName = "redis"
@@ -62,7 +64,7 @@ func (p *Plugin) Available() {}
// KVConstruct provides KV storage implementation over the redis plugin
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("redis_plugin_provide")
- st, err := NewRedisDriver(p.log, key, p.cfgPlugin)
+ st, err := redis_kv.NewRedisDriver(p.log, key, p.cfgPlugin)
if err != nil {
return nil, errors.E(op, err)
}
@@ -71,5 +73,5 @@ func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
}
func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
- return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
+ return redis_pubsub.NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
}
diff --git a/plugins/redis/channel.go b/plugins/redis/pubsub/channel.go
index 0cd62d19..eef5a7b9 100644
--- a/plugins/redis/channel.go
+++ b/plugins/redis/pubsub/channel.go
@@ -1,4 +1,4 @@
-package redis
+package pubsub
import (
"context"
diff --git a/plugins/redis/pubsub/config.go b/plugins/redis/pubsub/config.go
new file mode 100644
index 00000000..bf8d2fc9
--- /dev/null
+++ b/plugins/redis/pubsub/config.go
@@ -0,0 +1,34 @@
+package pubsub
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub/pubsub.go
index 01efc623..95a9f6dd 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub/pubsub.go
@@ -1,4 +1,4 @@
-package redis
+package pubsub
import (
"context"
diff --git a/plugins/server/command.go b/plugins/server/command.go
index e0b61896..b8bc1395 100644
--- a/plugins/server/command.go
+++ b/plugins/server/command.go
@@ -29,5 +29,5 @@ func (server *Plugin) scanCommand(cmd []string) error {
return nil
}
}
- return errors.E(errors.Str("scan failed, possible path not found"), op)
+ return errors.E(errors.Str("scan failed, possible path not found, this is not an error"), op)
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 1694cdf1..16e3bd8c 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -97,7 +97,7 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
// try to find a path here
err := server.scanCommand(cmdArgs)
if err != nil {
- server.log.Info("scan command", "error", err)
+ server.log.Info("scan command", "reason", err)
}
return func() *exec.Cmd {
diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go
index 28b84443..3bd0f956 100644
--- a/plugins/service/plugin.go
+++ b/plugins/service/plugin.go
@@ -4,7 +4,7 @@ import (
"sync"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 2df23f11..ad371bf8 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -13,7 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"