summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-23 01:25:12 +0300
committerValery Piashchynski <[email protected]>2021-07-23 01:25:12 +0300
commit3f45d2c008c95daa923fef0c4c9022b2be462971 (patch)
treeedb88b5470c448da2647715633f383d98818c7fb
parented36f3ef0d18354ab18848aae488d139aefd1146 (diff)
Update beanstalk options. Fix tube priority (correctly pass to the Put
method). Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--plugins/jobs/drivers/beanstalk/config.go8
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go9
-rw-r--r--tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml30
-rw-r--r--tests/plugins/jobs/jobs_beanstalk_test.go3
4 files changed, 21 insertions, 29 deletions
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go
index 6a8bda1d..c1059d1e 100644
--- a/plugins/jobs/drivers/beanstalk/config.go
+++ b/plugins/jobs/drivers/beanstalk/config.go
@@ -2,6 +2,12 @@ package beanstalk
import "time"
+const (
+ tubePriority string = "tube_priority"
+ tube string = "tube"
+ reserveTimeout string = "reserve_timeout"
+)
+
type GlobalCfg struct {
Addr string `mapstructure:"addr"`
Timeout time.Duration `mapstructure:"timeout"`
@@ -26,7 +32,7 @@ type Config struct {
func (c *Config) InitDefault() {
if c.Tube == "" {
- c.Tube = "default"
+ c.Tube = "default-" + time.Now().String()
}
if c.ReserveTimeout == 0 {
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 32ce6ef7..8bd11a58 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -96,11 +96,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
const op = errors.Op("new_beanstalk_consumer")
- const (
- tube string = "tube"
- reserveTimeout string = "reserve_timeout"
- )
-
// PARSE CONFIGURATION -------
var globalCfg GlobalCfg
@@ -134,7 +129,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
tout: globalCfg.Timeout,
tName: pipe.String(tube, "default"),
reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
- tubePriority: uint32(pipe.Int(tube, 10)),
+ tubePriority: uint32(pipe.Int(tubePriority, 0)),
priority: pipe.Priority(),
// buffered with two because jobs root plugin can call Stop at the same time as Pause
@@ -178,7 +173,7 @@ func (j *JobConsumer) Push(jb *job.Job) error {
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.pool.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration())
+ id, err := j.pool.Put(bb.Bytes(), j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration())
if err != nil {
errD := j.pool.Delete(id)
if errD != nil {
diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml
index 5ca45b7d..c73ed961 100644
--- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml
+++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml
@@ -28,28 +28,18 @@ jobs:
pipelines:
test-1:
- driver: amqp
- prefetch: 100
- queue: test-1-queue
- priority: 1
- exchange: default
- exchange_type: direct
- routing_key: test-1
- exclusive: false
- multiple_ack: false
- requeue_on_fail: false
+ driver: beanstalk
+ priority: 11
+ tube_priority: 1
+ tube: default-1
+ reserve_timeout: 10s
test-2:
- driver: amqp
- prefetch: 100
- queue: test-2-queue
- priority: 2
- exchange: default
- exchange_type: direct
- routing_key: test-2
- exclusive: false
- multiple_ack: false
- requeue_on_fail: false
+ driver: beanstalk
+ priority: 11
+ tube_priority: 3
+ tube: default-2
+ reserve_timeout: 10s
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go
index 44d4f85e..d3e0dd2a 100644
--- a/tests/plugins/jobs/jobs_beanstalk_test.go
+++ b/tests/plugins/jobs/jobs_beanstalk_test.go
@@ -224,9 +224,10 @@ func declareBeanstalkPipe(t *testing.T) {
pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{
"driver": "beanstalk",
"name": "test-3",
- "tube": "default",
+ "tube": "default-" + time.Now().String(),
"reserve_timeout": "1",
"priority": "3",
+ "tube_priority": "10",
}}
er := &jobsv1beta.Empty{}