summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-12 13:25:36 +0300
committerValery Piashchynski <[email protected]>2021-08-12 13:25:36 +0300
commitecbfc5c5265a9895f4e371ce4388f64df8714e63 (patch)
treedf0749155487eae6bcdbb2456885131a21916f4d /plugins/jobs/drivers/beanstalk
parent4169e8374f581ba2213f8cd1833cc6b9b84438e8 (diff)
Remove unneeded options, complete tests for the ephemeral, update proto
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk')
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/encode_test.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go26
3 files changed, 2 insertions, 28 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index f41a2c8a..eaf99be1 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -201,7 +201,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration())
+ id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout)
if err != nil {
errD := j.pool.Delete(ctx, id)
if errD != nil {
diff --git a/plugins/jobs/drivers/beanstalk/encode_test.go b/plugins/jobs/drivers/beanstalk/encode_test.go
index 34f2342b..e43207eb 100644
--- a/plugins/jobs/drivers/beanstalk/encode_test.go
+++ b/plugins/jobs/drivers/beanstalk/encode_test.go
@@ -26,7 +26,6 @@ func BenchmarkEncodeGob(b *testing.B) {
Priority: 10,
Pipeline: "test-local-pipe",
Delay: 10,
- Timeout: 5,
},
}
@@ -60,7 +59,6 @@ func BenchmarkEncodeJsonIter(b *testing.B) {
Priority: 10,
Pipeline: "test-local-pipe",
Delay: 10,
- Timeout: 5,
},
}
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index 47336b43..f1d7ac76 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -41,15 +41,6 @@ type Options struct {
// Delay defines time duration to delay execution for. Defaults to none.
Delay int64 `json:"delay,omitempty"`
- // Reserve defines for how broker should wait until treating job are failed.
- // - <ttr> -- time to run -- is an integer number of seconds to allow a worker
- // to run this job. This time is counted from the moment a worker reserves
- // this job. If the worker does not delete, release, or bury the job within
- // <ttr> seconds, the job will time out and the server will release the job.
- // The minimum ttr is 1. If the client sends 0, the server will silently
- // increase the ttr to 1. Maximum ttr is 2**32-1.
- Timeout int64 `json:"timeout,omitempty"`
-
// Private ================
id uint64
conn *beanstalk.Conn
@@ -61,15 +52,6 @@ func (o *Options) DelayDuration() time.Duration {
return time.Second * time.Duration(o.Delay)
}
-// TimeoutDuration returns timeout duration in a form of time.Duration.
-func (o *Options) TimeoutDuration() time.Duration {
- if o.Timeout == 0 {
- return 30 * time.Minute
- }
-
- return time.Second * time.Duration(o.Timeout)
-}
-
func (i *Item) ID() string {
return i.Ident
}
@@ -91,9 +73,8 @@ func (i *Item) Context() ([]byte, error) {
ID string `json:"id"`
Job string `json:"job"`
Headers map[string][]string `json:"headers"`
- Timeout int64 `json:"timeout"`
Pipeline string `json:"pipeline"`
- }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline},
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
)
if err != nil {
@@ -130,10 +111,6 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return nil
}
-func (i *Item) Recycle() {
- i.Options = nil
-}
-
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
@@ -144,7 +121,6 @@ func fromJob(job *job.Job) *Item {
Priority: job.Options.Priority,
Pipeline: job.Options.Pipeline,
Delay: job.Options.Delay,
- Timeout: job.Options.Timeout,
},
}
}