diff options
author | Valery Piashchynski <[email protected]> | 2021-07-23 01:25:12 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-23 01:25:12 +0300 |
commit | 3f45d2c008c95daa923fef0c4c9022b2be462971 (patch) | |
tree | edb88b5470c448da2647715633f383d98818c7fb | |
parent | ed36f3ef0d18354ab18848aae488d139aefd1146 (diff) |
Update beanstalk options. Fix tube priority (correctly pass to the Put
method).
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | plugins/jobs/drivers/beanstalk/config.go | 8 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 9 | ||||
-rw-r--r-- | tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml | 30 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_beanstalk_test.go | 3 |
4 files changed, 21 insertions, 29 deletions
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go index 6a8bda1d..c1059d1e 100644 --- a/plugins/jobs/drivers/beanstalk/config.go +++ b/plugins/jobs/drivers/beanstalk/config.go @@ -2,6 +2,12 @@ package beanstalk import "time" +const ( + tubePriority string = "tube_priority" + tube string = "tube" + reserveTimeout string = "reserve_timeout" +) + type GlobalCfg struct { Addr string `mapstructure:"addr"` Timeout time.Duration `mapstructure:"timeout"` @@ -26,7 +32,7 @@ type Config struct { func (c *Config) InitDefault() { if c.Tube == "" { - c.Tube = "default" + c.Tube = "default-" + time.Now().String() } if c.ReserveTimeout == 0 { diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 32ce6ef7..8bd11a58 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -96,11 +96,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { const op = errors.Op("new_beanstalk_consumer") - const ( - tube string = "tube" - reserveTimeout string = "reserve_timeout" - ) - // PARSE CONFIGURATION ------- var globalCfg GlobalCfg @@ -134,7 +129,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu tout: globalCfg.Timeout, tName: pipe.String(tube, "default"), reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), - tubePriority: uint32(pipe.Int(tube, 10)), + tubePriority: uint32(pipe.Int(tubePriority, 0)), priority: pipe.Priority(), // buffered with two because jobs root plugin can call Stop at the same time as Pause @@ -178,7 +173,7 @@ func (j *JobConsumer) Push(jb *job.Job) error { // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.pool.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration()) + id, err := j.pool.Put(bb.Bytes(), j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration()) if err != nil { errD := j.pool.Delete(id) if errD != nil { diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml index 5ca45b7d..c73ed961 100644 --- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml @@ -28,28 +28,18 @@ jobs: pipelines: test-1: - driver: amqp - prefetch: 100 - queue: test-1-queue - priority: 1 - exchange: default - exchange_type: direct - routing_key: test-1 - exclusive: false - multiple_ack: false - requeue_on_fail: false + driver: beanstalk + priority: 11 + tube_priority: 1 + tube: default-1 + reserve_timeout: 10s test-2: - driver: amqp - prefetch: 100 - queue: test-2-queue - priority: 2 - exchange: default - exchange_type: direct - routing_key: test-2 - exclusive: false - multiple_ack: false - requeue_on_fail: false + driver: beanstalk + priority: 11 + tube_priority: 3 + tube: default-2 + reserve_timeout: 10s # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index 44d4f85e..d3e0dd2a 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -224,9 +224,10 @@ func declareBeanstalkPipe(t *testing.T) { pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ "driver": "beanstalk", "name": "test-3", - "tube": "default", + "tube": "default-" + time.Now().String(), "reserve_timeout": "1", "priority": "3", + "tube_priority": "10", }} er := &jobsv1beta.Empty{} |