summaryrefslogtreecommitdiff
path: root/plugins/boltdb/boltjobs/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/boltdb/boltjobs/item.go')
-rw-r--r--plugins/boltdb/boltjobs/item.go45
1 files changed, 23 insertions, 22 deletions
diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go
index 4f02bb43..837f8c63 100644
--- a/plugins/boltdb/boltjobs/item.go
+++ b/plugins/boltdb/boltjobs/item.go
@@ -43,8 +43,7 @@ type Options struct {
Delay int64 `json:"delay,omitempty"`
// private
- db *bbolt.DB
-
+ db *bbolt.DB
active *uint64
delayed *uint64
}
@@ -137,6 +136,17 @@ func (i *Item) Nack() error {
return tx.Commit()
}
+/*
+Requeue algorithm:
+1. Rewrite item headers and delay.
+2. Begin writable transaction on attached to the item db.
+3. Delete item from the InQueueBucket
+4. Handle items with the delay:
+ 4.1. Get DelayBucket
+ 4.2. Make a key by adding the delay to the time.Now() in RFC3339 format
+ 4.3. Put this key with value to the DelayBucket
+5. W/o delay, put the key with value to the PushBucket (requeue)
+*/
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
const op = errors.Op("boltdb_item_requeue")
i.Headers = headers
@@ -153,23 +163,23 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return errors.E(op, i.rollback(err, tx))
}
+ // encode the item
+ buf := new(bytes.Buffer)
+ enc := gob.NewEncoder(buf)
+ err = enc.Encode(i)
+ val := make([]byte, buf.Len())
+ copy(val, buf.Bytes())
+ buf.Reset()
+
if delay > 0 {
delayB := tx.Bucket(utils.AsBytes(DelayBucket))
- tKey := time.Now().Add(time.Second * time.Duration(delay)).Format(time.RFC3339)
-
- buf := new(bytes.Buffer)
- enc := gob.NewEncoder(buf)
- err = enc.Encode(i)
- if err != nil {
- return errors.E(op, i.rollback(err, tx))
- }
+ tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339)
- err = delayB.Put(utils.AsBytes(tKey), buf.Bytes())
if err != nil {
return errors.E(op, i.rollback(err, tx))
}
- err = inQb.Delete(utils.AsBytes(i.ID()))
+ err = delayB.Put(utils.AsBytes(tKey), val)
if err != nil {
return errors.E(op, i.rollback(err, tx))
}
@@ -178,20 +188,11 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
}
pushB := tx.Bucket(utils.AsBytes(PushBucket))
-
- buf := new(bytes.Buffer)
- enc := gob.NewEncoder(buf)
- err = enc.Encode(i)
if err != nil {
return errors.E(op, i.rollback(err, tx))
}
- err = pushB.Put(utils.AsBytes(i.ID()), buf.Bytes())
- if err != nil {
- return errors.E(op, i.rollback(err, tx))
- }
-
- err = inQb.Delete(utils.AsBytes(i.ID()))
+ err = pushB.Put(utils.AsBytes(i.ID()), val)
if err != nil {
return errors.E(op, i.rollback(err, tx))
}