diff options
Diffstat (limited to 'plugins/jobs/drivers/ephemeral/item.go')
-rw-r--r-- | plugins/jobs/drivers/ephemeral/item.go | 22 |
1 files changed, 20 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go index 1a61d7e9..3298424d 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -2,6 +2,7 @@ package ephemeral import ( "context" + "sync/atomic" "time" json "github.com/json-iterator/go" @@ -40,6 +41,8 @@ type Options struct { // private requeueFn func(context.Context, *Item) error + active *int64 + delayed *int64 } // DelayDuration returns delay duration in a form of time.Duration. @@ -79,12 +82,12 @@ func (i *Item) Context() ([]byte, error) { } func (i *Item) Ack() error { - // noop for the in-memory + i.atomicallyReduceCount() return nil } func (i *Item) Nack() error { - // noop for the in-memory + i.atomicallyReduceCount() return nil } @@ -93,6 +96,8 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { i.Options.Delay = delay i.Headers = headers + i.atomicallyReduceCount() + err := i.Options.requeueFn(context.Background(), i) if err != nil { return err @@ -101,6 +106,19 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { return nil } +// atomicallyReduceCount reduces counter of active or delayed jobs +func (i *Item) atomicallyReduceCount() { + // if job was delayed, reduce number of the delayed jobs + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + return + } + + // otherwise, reduce number of the active jobs + atomic.AddInt64(i.Options.active, ^int64(0)) + // noop for the in-memory +} + func fromJob(job *job.Job) *Item { return &Item{ Job: job.Job, |