summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/http/config/ssl.go2
-rw-r--r--plugins/http/config/ssl_config_test.go2
-rw-r--r--plugins/jobs/drivers/amqp/config.go2
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/config.go14
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go11
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go2
-rw-r--r--plugins/jobs/drivers/sqs/config.go2
-rw-r--r--plugins/jobs/drivers/sqs/item.go1
-rw-r--r--plugins/kv/drivers/memcached/config.go2
-rw-r--r--plugins/kv/plugin.go2
-rw-r--r--plugins/metrics/config.go2
-rw-r--r--plugins/redis/config.go2
13 files changed, 27 insertions, 19 deletions
diff --git a/plugins/http/config/ssl.go b/plugins/http/config/ssl.go
index d44703f9..0e3c0caf 100644
--- a/plugins/http/config/ssl.go
+++ b/plugins/http/config/ssl.go
@@ -36,7 +36,7 @@ func (s *SSL) Valid() error {
parts := strings.Split(s.Address, ":")
switch len(parts) {
// :443 form
- // localhost:443 form
+ // 127.0.0.1:443 form
// use 0.0.0.0 as host and 443 as port
case 2:
if parts[0] == "" {
diff --git a/plugins/http/config/ssl_config_test.go b/plugins/http/config/ssl_config_test.go
index 1f5fef0a..8f6cf40e 100644
--- a/plugins/http/config/ssl_config_test.go
+++ b/plugins/http/config/ssl_config_test.go
@@ -101,7 +101,7 @@ func TestSSL_Valid6(t *testing.T) {
func TestSSL_Valid7(t *testing.T) {
conf := &SSL{
- Address: "localhost:555:1",
+ Address: "127.0.0.1:555:1",
Redirect: false,
Key: "../../../tests/plugins/http/fixtures/server.key",
Cert: "../../../tests/plugins/http/fixtures/server.crt",
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go
index 37f5c1c4..3fb0d066 100644
--- a/plugins/jobs/drivers/amqp/config.go
+++ b/plugins/jobs/drivers/amqp/config.go
@@ -57,6 +57,6 @@ func (c *Config) InitDefault() {
func (c *GlobalCfg) InitDefault() {
if c.Addr == "" {
- c.Addr = "amqp://guest:guest@localhost:5672/"
+ c.Addr = "amqp://guest:[email protected]:5672/"
}
}
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 36a16bcd..d7425858 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -181,7 +181,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
exchangeName: pipeline.String(exchangeKey, "amqp.default"),
prefetch: pipeline.Int(prefetch, 10),
priority: int64(pipeline.Int(priority, 10)),
- exclusive: pipeline.Bool(exclusive, true),
+ exclusive: pipeline.Bool(exclusive, false),
multipleAck: pipeline.Bool(multipleAsk, false),
requeueOnFail: pipeline.Bool(requeueOnFail, false),
requeueCh: make(chan *Item, 1000),
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go
index f0012362..a8069f5d 100644
--- a/plugins/jobs/drivers/beanstalk/config.go
+++ b/plugins/jobs/drivers/beanstalk/config.go
@@ -1,6 +1,10 @@
package beanstalk
-import "time"
+import (
+ "time"
+
+ "github.com/spiral/roadrunner/v2/utils"
+)
const (
tubePriority string = "tube_priority"
@@ -15,7 +19,7 @@ type GlobalCfg struct {
func (c *GlobalCfg) InitDefault() {
if c.Addr == "" {
- c.Addr = "tcp://localhost:11300"
+ c.Addr = "tcp://127.0.0.1:11300"
}
if c.Timeout == 0 {
@@ -25,7 +29,7 @@ func (c *GlobalCfg) InitDefault() {
type Config struct {
PipePriority int64 `mapstructure:"priority"`
- TubePriority uint32 `mapstructure:"tube_priority"`
+ TubePriority *uint32 `mapstructure:"tube_priority"`
Tube string `mapstructure:"tube"`
ReserveTimeout time.Duration `mapstructure:"reserve_timeout"`
}
@@ -39,6 +43,10 @@ func (c *Config) InitDefault() {
c.ReserveTimeout = time.Second * 1
}
+ if c.TubePriority == nil {
+ c.TubePriority = utils.Uint32(0)
+ }
+
if c.PipePriority == 0 {
c.PipePriority = 10
}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index b57b22ac..21b05b16 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -14,6 +14,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type JobConsumer struct {
@@ -33,7 +34,7 @@ type JobConsumer struct {
tout time.Duration
// tube name
tName string
- tubePriority uint32
+ tubePriority *uint32
priority int64
stopCh chan struct{}
@@ -65,7 +66,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
dsn := strings.Split(globalCfg.Addr, "://")
if len(dsn) != 2 {
- return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
+ return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log)
@@ -115,7 +116,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
dsn := strings.Split(globalCfg.Addr, "://")
if len(dsn) != 2 {
- return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
+ return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log)
@@ -134,7 +135,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
tout: globalCfg.Timeout,
tName: pipe.String(tube, "default"),
reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
- tubePriority: uint32(pipe.Int(tubePriority, 0)),
+ tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))),
priority: pipe.Priority(),
// buffered with two because jobs root plugin can call Stop at the same time as Pause
@@ -190,7 +191,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.pool.Put(ctx, bb.Bytes(), j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration())
+ id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration())
if err != nil {
errD := j.pool.Delete(ctx, id)
if errD != nil {
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 050d74b9..52c2cf65 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -180,7 +180,7 @@ func (j *JobConsumer) Pause(_ context.Context, pipeline string) {
}
j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
+ Event: events.EventPipePaused,
Pipeline: pipeline,
Start: time.Now(),
Elapsed: 0,
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go
index 39d0af48..9b2a1ca8 100644
--- a/plugins/jobs/drivers/sqs/config.go
+++ b/plugins/jobs/drivers/sqs/config.go
@@ -87,7 +87,7 @@ type Config struct {
func (c *GlobalCfg) InitDefault() {
if c.Endpoint == "" {
- c.Endpoint = "http://localhost:9324"
+ c.Endpoint = "http://127.0.0.1:9324"
}
}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index ea4ac8b7..cd2f6104 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -25,7 +25,6 @@ var itemAttributes = []string{
job.RRDelay,
job.RRTimeout,
job.RRPriority,
- job.RRMaxAttempts,
}
type Item struct {
diff --git a/plugins/kv/drivers/memcached/config.go b/plugins/kv/drivers/memcached/config.go
index 7aad53b6..6d413790 100644
--- a/plugins/kv/drivers/memcached/config.go
+++ b/plugins/kv/drivers/memcached/config.go
@@ -7,6 +7,6 @@ type Config struct {
func (s *Config) InitDefaults() {
if s.Addr == nil {
- s.Addr = []string{"localhost:11211"} // default url for memcached
+ s.Addr = []string{"127.0.0.1:11211"} // default url for memcached
}
}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index e9ea25df..53fade97 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -80,7 +80,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
memcached:
driver: memcached
- addr: [ "localhost:11211" ]
+ addr: [ "127.0.0.1:11211" ]
For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
diff --git a/plugins/metrics/config.go b/plugins/metrics/config.go
index dd36005e..a2835130 100644
--- a/plugins/metrics/config.go
+++ b/plugins/metrics/config.go
@@ -135,6 +135,6 @@ func (c *Config) getCollectors() (map[string]prometheus.Collector, error) {
func (c *Config) InitDefaults() {
if c.Address == "" {
- c.Address = "localhost:2112"
+ c.Address = "127.0.0.1:2112"
}
}
diff --git a/plugins/redis/config.go b/plugins/redis/config.go
index 41348236..9acb4b47 100644
--- a/plugins/redis/config.go
+++ b/plugins/redis/config.go
@@ -29,6 +29,6 @@ type Config struct {
// InitDefaults initializing fill config with default values
func (s *Config) InitDefaults() {
if s.Addrs == nil {
- s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage
+ s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage
}
}