summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/ephemeral/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/ephemeral/item.go')
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go22
1 files changed, 20 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index 1a61d7e9..3298424d 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -2,6 +2,7 @@ package ephemeral
import (
"context"
+ "sync/atomic"
"time"
json "github.com/json-iterator/go"
@@ -40,6 +41,8 @@ type Options struct {
// private
requeueFn func(context.Context, *Item) error
+ active *int64
+ delayed *int64
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -79,12 +82,12 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
- // noop for the in-memory
+ i.atomicallyReduceCount()
return nil
}
func (i *Item) Nack() error {
- // noop for the in-memory
+ i.atomicallyReduceCount()
return nil
}
@@ -93,6 +96,8 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
i.Options.Delay = delay
i.Headers = headers
+ i.atomicallyReduceCount()
+
err := i.Options.requeueFn(context.Background(), i)
if err != nil {
return err
@@ -101,6 +106,19 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return nil
}
+// atomicallyReduceCount reduces counter of active or delayed jobs
+func (i *Item) atomicallyReduceCount() {
+ // if job was delayed, reduce number of the delayed jobs
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ return
+ }
+
+ // otherwise, reduce number of the active jobs
+ atomic.AddInt64(i.Options.active, ^int64(0))
+ // noop for the in-memory
+}
+
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,