diff options
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index b57b22ac..21b05b16 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -14,6 +14,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) type JobConsumer struct { @@ -33,7 +34,7 @@ type JobConsumer struct { tout time.Duration // tube name tName string - tubePriority uint32 + tubePriority *uint32 priority int64 stopCh chan struct{} @@ -65,7 +66,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config dsn := strings.Split(globalCfg.Addr, "://") if len(dsn) != 2 { - return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) + return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log) @@ -115,7 +116,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu dsn := strings.Split(globalCfg.Addr, "://") if len(dsn) != 2 { - return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) + return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log) @@ -134,7 +135,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu tout: globalCfg.Timeout, tName: pipe.String(tube, "default"), reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), - tubePriority: uint32(pipe.Int(tubePriority, 0)), + tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))), priority: pipe.Priority(), // buffered with two because jobs root plugin can call Stop at the same time as Pause @@ -190,7 +191,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.pool.Put(ctx, bb.Bytes(), j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration()) + id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration()) if err != nil { errD := j.pool.Delete(ctx, id) if errD != nil { |