diff options
Diffstat (limited to 'plugins/jobs/drivers/ephemeral/item.go')
-rw-r--r-- | plugins/jobs/drivers/ephemeral/item.go | 46 |
1 files changed, 31 insertions, 15 deletions
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go index 8560f10a..d140c9ed 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -8,20 +8,6 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - Timeout: job.Options.Timeout, - }, - } -} - type Item struct { // Job contains name of job broker (usually PHP class). Job string `json:"job"` @@ -53,6 +39,9 @@ type Options struct { // Timeout defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout int64 `json:"timeout,omitempty"` + + // private + requeueCh chan *Item } // DelayDuration returns delay duration in a form of time.Duration. @@ -111,6 +100,33 @@ func (i *Item) Nack() error { return nil } -func (i *Item) Requeue(_ int64) error { +func (i *Item) Requeue(delay int64) error { + go func() { + time.Sleep(time.Second * time.Duration(delay)) + // overwrite the delay + i.Options.Delay = delay + select { + case i.Options.requeueCh <- i: + return + default: + // TODO(rustatian): logs? + return + } + }() + return nil } + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, + }, + } +} |