diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/http/config/ssl.go | 2 | ||||
-rw-r--r-- | plugins/http/config/ssl_config_test.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/config.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/config.go | 14 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 11 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/config.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 1 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/config.go | 2 | ||||
-rw-r--r-- | plugins/kv/plugin.go | 2 | ||||
-rw-r--r-- | plugins/metrics/config.go | 2 | ||||
-rw-r--r-- | plugins/redis/config.go | 2 |
13 files changed, 27 insertions, 19 deletions
diff --git a/plugins/http/config/ssl.go b/plugins/http/config/ssl.go index d44703f9..0e3c0caf 100644 --- a/plugins/http/config/ssl.go +++ b/plugins/http/config/ssl.go @@ -36,7 +36,7 @@ func (s *SSL) Valid() error { parts := strings.Split(s.Address, ":") switch len(parts) { // :443 form - // localhost:443 form + // 127.0.0.1:443 form // use 0.0.0.0 as host and 443 as port case 2: if parts[0] == "" { diff --git a/plugins/http/config/ssl_config_test.go b/plugins/http/config/ssl_config_test.go index 1f5fef0a..8f6cf40e 100644 --- a/plugins/http/config/ssl_config_test.go +++ b/plugins/http/config/ssl_config_test.go @@ -101,7 +101,7 @@ func TestSSL_Valid6(t *testing.T) { func TestSSL_Valid7(t *testing.T) { conf := &SSL{ - Address: "localhost:555:1", + Address: "127.0.0.1:555:1", Redirect: false, Key: "../../../tests/plugins/http/fixtures/server.key", Cert: "../../../tests/plugins/http/fixtures/server.crt", diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go index 37f5c1c4..3fb0d066 100644 --- a/plugins/jobs/drivers/amqp/config.go +++ b/plugins/jobs/drivers/amqp/config.go @@ -57,6 +57,6 @@ func (c *Config) InitDefault() { func (c *GlobalCfg) InitDefault() { if c.Addr == "" { - c.Addr = "amqp://guest:guest@localhost:5672/" + c.Addr = "amqp://guest:[email protected]:5672/" } } diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 36a16bcd..d7425858 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -181,7 +181,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con exchangeName: pipeline.String(exchangeKey, "amqp.default"), prefetch: pipeline.Int(prefetch, 10), priority: int64(pipeline.Int(priority, 10)), - exclusive: pipeline.Bool(exclusive, true), + exclusive: pipeline.Bool(exclusive, false), multipleAck: pipeline.Bool(multipleAsk, false), requeueOnFail: pipeline.Bool(requeueOnFail, false), requeueCh: make(chan *Item, 1000), diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go index f0012362..a8069f5d 100644 --- a/plugins/jobs/drivers/beanstalk/config.go +++ b/plugins/jobs/drivers/beanstalk/config.go @@ -1,6 +1,10 @@ package beanstalk -import "time" +import ( + "time" + + "github.com/spiral/roadrunner/v2/utils" +) const ( tubePriority string = "tube_priority" @@ -15,7 +19,7 @@ type GlobalCfg struct { func (c *GlobalCfg) InitDefault() { if c.Addr == "" { - c.Addr = "tcp://localhost:11300" + c.Addr = "tcp://127.0.0.1:11300" } if c.Timeout == 0 { @@ -25,7 +29,7 @@ func (c *GlobalCfg) InitDefault() { type Config struct { PipePriority int64 `mapstructure:"priority"` - TubePriority uint32 `mapstructure:"tube_priority"` + TubePriority *uint32 `mapstructure:"tube_priority"` Tube string `mapstructure:"tube"` ReserveTimeout time.Duration `mapstructure:"reserve_timeout"` } @@ -39,6 +43,10 @@ func (c *Config) InitDefault() { c.ReserveTimeout = time.Second * 1 } + if c.TubePriority == nil { + c.TubePriority = utils.Uint32(0) + } + if c.PipePriority == 0 { c.PipePriority = 10 } diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index b57b22ac..21b05b16 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -14,6 +14,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) type JobConsumer struct { @@ -33,7 +34,7 @@ type JobConsumer struct { tout time.Duration // tube name tName string - tubePriority uint32 + tubePriority *uint32 priority int64 stopCh chan struct{} @@ -65,7 +66,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config dsn := strings.Split(globalCfg.Addr, "://") if len(dsn) != 2 { - return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) + return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log) @@ -115,7 +116,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu dsn := strings.Split(globalCfg.Addr, "://") if len(dsn) != 2 { - return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) + return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log) @@ -134,7 +135,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu tout: globalCfg.Timeout, tName: pipe.String(tube, "default"), reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), - tubePriority: uint32(pipe.Int(tubePriority, 0)), + tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))), priority: pipe.Priority(), // buffered with two because jobs root plugin can call Stop at the same time as Pause @@ -190,7 +191,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.pool.Put(ctx, bb.Bytes(), j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration()) + id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration()) if err != nil { errD := j.pool.Delete(ctx, id) if errD != nil { diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 050d74b9..52c2cf65 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -180,7 +180,7 @@ func (j *JobConsumer) Pause(_ context.Context, pipeline string) { } j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, + Event: events.EventPipePaused, Pipeline: pipeline, Start: time.Now(), Elapsed: 0, diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go index 39d0af48..9b2a1ca8 100644 --- a/plugins/jobs/drivers/sqs/config.go +++ b/plugins/jobs/drivers/sqs/config.go @@ -87,7 +87,7 @@ type Config struct { func (c *GlobalCfg) InitDefault() { if c.Endpoint == "" { - c.Endpoint = "http://localhost:9324" + c.Endpoint = "http://127.0.0.1:9324" } } diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index ea4ac8b7..cd2f6104 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -25,7 +25,6 @@ var itemAttributes = []string{ job.RRDelay, job.RRTimeout, job.RRPriority, - job.RRMaxAttempts, } type Item struct { diff --git a/plugins/kv/drivers/memcached/config.go b/plugins/kv/drivers/memcached/config.go index 7aad53b6..6d413790 100644 --- a/plugins/kv/drivers/memcached/config.go +++ b/plugins/kv/drivers/memcached/config.go @@ -7,6 +7,6 @@ type Config struct { func (s *Config) InitDefaults() { if s.Addr == nil { - s.Addr = []string{"localhost:11211"} // default url for memcached + s.Addr = []string{"127.0.0.1:11211"} // default url for memcached } } diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index e9ea25df..53fade97 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -80,7 +80,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit memcached: driver: memcached - addr: [ "localhost:11211" ] + addr: [ "127.0.0.1:11211" ] For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached diff --git a/plugins/metrics/config.go b/plugins/metrics/config.go index dd36005e..a2835130 100644 --- a/plugins/metrics/config.go +++ b/plugins/metrics/config.go @@ -135,6 +135,6 @@ func (c *Config) getCollectors() (map[string]prometheus.Collector, error) { func (c *Config) InitDefaults() { if c.Address == "" { - c.Address = "localhost:2112" + c.Address = "127.0.0.1:2112" } } diff --git a/plugins/redis/config.go b/plugins/redis/config.go index 41348236..9acb4b47 100644 --- a/plugins/redis/config.go +++ b/plugins/redis/config.go @@ -29,6 +29,6 @@ type Config struct { // InitDefaults initializing fill config with default values func (s *Config) InitDefaults() { if s.Addrs == nil { - s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage + s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage } } |