diff options
Diffstat (limited to 'plugins/boltdb/boltjobs/item.go')
-rw-r--r-- | plugins/boltdb/boltjobs/item.go | 45 |
1 files changed, 23 insertions, 22 deletions
diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go index 4f02bb43..837f8c63 100644 --- a/plugins/boltdb/boltjobs/item.go +++ b/plugins/boltdb/boltjobs/item.go @@ -43,8 +43,7 @@ type Options struct { Delay int64 `json:"delay,omitempty"` // private - db *bbolt.DB - + db *bbolt.DB active *uint64 delayed *uint64 } @@ -137,6 +136,17 @@ func (i *Item) Nack() error { return tx.Commit() } +/* +Requeue algorithm: +1. Rewrite item headers and delay. +2. Begin writable transaction on attached to the item db. +3. Delete item from the InQueueBucket +4. Handle items with the delay: + 4.1. Get DelayBucket + 4.2. Make a key by adding the delay to the time.Now() in RFC3339 format + 4.3. Put this key with value to the DelayBucket +5. W/o delay, put the key with value to the PushBucket (requeue) +*/ func (i *Item) Requeue(headers map[string][]string, delay int64) error { const op = errors.Op("boltdb_item_requeue") i.Headers = headers @@ -153,23 +163,23 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { return errors.E(op, i.rollback(err, tx)) } + // encode the item + buf := new(bytes.Buffer) + enc := gob.NewEncoder(buf) + err = enc.Encode(i) + val := make([]byte, buf.Len()) + copy(val, buf.Bytes()) + buf.Reset() + if delay > 0 { delayB := tx.Bucket(utils.AsBytes(DelayBucket)) - tKey := time.Now().Add(time.Second * time.Duration(delay)).Format(time.RFC3339) - - buf := new(bytes.Buffer) - enc := gob.NewEncoder(buf) - err = enc.Encode(i) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } + tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339) - err = delayB.Put(utils.AsBytes(tKey), buf.Bytes()) if err != nil { return errors.E(op, i.rollback(err, tx)) } - err = inQb.Delete(utils.AsBytes(i.ID())) + err = delayB.Put(utils.AsBytes(tKey), val) if err != nil { return errors.E(op, i.rollback(err, tx)) } @@ -178,20 +188,11 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { } pushB := tx.Bucket(utils.AsBytes(PushBucket)) - - buf := new(bytes.Buffer) - enc := gob.NewEncoder(buf) - err = enc.Encode(i) if err != nil { return errors.E(op, i.rollback(err, tx)) } - err = pushB.Put(utils.AsBytes(i.ID()), buf.Bytes()) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - err = inQb.Delete(utils.AsBytes(i.ID())) + err = pushB.Put(utils.AsBytes(i.ID()), val) if err != nil { return errors.E(op, i.rollback(err, tx)) } |