summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/ephemeral/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/ephemeral/item.go')
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go46
1 files changed, 31 insertions, 15 deletions
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index 8560f10a..d140c9ed 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -8,20 +8,6 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-func fromJob(job *job.Job) *Item {
- return &Item{
- Job: job.Job,
- Ident: job.Ident,
- Payload: job.Payload,
- Options: &Options{
- Priority: job.Options.Priority,
- Pipeline: job.Options.Pipeline,
- Delay: job.Options.Delay,
- Timeout: job.Options.Timeout,
- },
- }
-}
-
type Item struct {
// Job contains name of job broker (usually PHP class).
Job string `json:"job"`
@@ -53,6 +39,9 @@ type Options struct {
// Timeout defines for how broker should wait until treating job are failed. Defaults to 30 min.
Timeout int64 `json:"timeout,omitempty"`
+
+ // private
+ requeueCh chan *Item
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -111,6 +100,33 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(_ int64) error {
+func (i *Item) Requeue(delay int64) error {
+ go func() {
+ time.Sleep(time.Second * time.Duration(delay))
+ // overwrite the delay
+ i.Options.Delay = delay
+ select {
+ case i.Options.requeueCh <- i:
+ return
+ default:
+ // TODO(rustatian): logs?
+ return
+ }
+ }()
+
return nil
}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
+ },
+ }
+}