diff options
Diffstat (limited to 'plugins/boltdb/boltjobs/listener.go')
-rw-r--r-- | plugins/boltdb/boltjobs/listener.go | 156 |
1 files changed, 0 insertions, 156 deletions
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go deleted file mode 100644 index 081d3f57..00000000 --- a/plugins/boltdb/boltjobs/listener.go +++ /dev/null @@ -1,156 +0,0 @@ -package boltjobs - -import ( - "bytes" - "encoding/gob" - "sync/atomic" - "time" - - "github.com/spiral/roadrunner/v2/utils" - bolt "go.etcd.io/bbolt" -) - -func (c *consumer) listener() { - tt := time.NewTicker(time.Millisecond) - defer tt.Stop() - for { - select { - case <-c.stopCh: - c.log.Info("boltdb listener stopped") - return - case <-tt.C: - if atomic.LoadUint64(c.active) > uint64(c.prefetch) { - time.Sleep(time.Second) - continue - } - tx, err := c.db.Begin(true) - if err != nil { - c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) - continue - } - - b := tx.Bucket(utils.AsBytes(PushBucket)) - inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - - // get first item - k, v := b.Cursor().First() - if k == nil && v == nil { - _ = tx.Commit() - continue - } - - buf := bytes.NewReader(v) - dec := gob.NewDecoder(buf) - - item := &Item{} - err = dec.Decode(item) - if err != nil { - c.rollback(err, tx) - continue - } - - err = inQb.Put(utils.AsBytes(item.ID()), v) - if err != nil { - c.rollback(err, tx) - continue - } - - // delete key from the PushBucket - err = b.Delete(k) - if err != nil { - c.rollback(err, tx) - continue - } - - err = tx.Commit() - if err != nil { - c.rollback(err, tx) - continue - } - - // attach pointer to the DB - item.attachDB(c.db, c.active, c.delayed) - // as the last step, after commit, put the item into the PQ - c.pq.Insert(item) - } - } -} - -func (c *consumer) delayedJobsListener() { - tt := time.NewTicker(time.Second) - defer tt.Stop() - - // just some 90's - loc, err := time.LoadLocation("UTC") - if err != nil { - c.log.Error("failed to load location, delayed jobs won't work", "error", err) - return - } - - var startDate = utils.AsBytes(time.Date(1990, 1, 1, 0, 0, 0, 0, loc).Format(time.RFC3339)) - - for { - select { - case <-c.stopCh: - c.log.Info("boltdb listener stopped") - return - case <-tt.C: - tx, err := c.db.Begin(true) - if err != nil { - c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) - continue - } - - delayB := tx.Bucket(utils.AsBytes(DelayBucket)) - inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - - cursor := delayB.Cursor() - endDate := utils.AsBytes(time.Now().UTC().Format(time.RFC3339)) - - for k, v := cursor.Seek(startDate); k != nil && bytes.Compare(k, endDate) <= 0; k, v = cursor.Next() { - buf := bytes.NewReader(v) - dec := gob.NewDecoder(buf) - - item := &Item{} - err = dec.Decode(item) - if err != nil { - c.rollback(err, tx) - continue - } - - err = inQb.Put(utils.AsBytes(item.ID()), v) - if err != nil { - c.rollback(err, tx) - continue - } - - // delete key from the PushBucket - err = delayB.Delete(k) - if err != nil { - c.rollback(err, tx) - continue - } - - // attach pointer to the DB - item.attachDB(c.db, c.active, c.delayed) - // as the last step, after commit, put the item into the PQ - c.pq.Insert(item) - } - - err = tx.Commit() - if err != nil { - c.rollback(err, tx) - continue - } - } - } -} - -func (c *consumer) rollback(err error, tx *bolt.Tx) { - errR := tx.Rollback() - if errR != nil { - c.log.Error("transaction commit error, rollback failed", "error", err, "rollback error", errR) - } - - c.log.Error("transaction commit error, rollback succeed", "error", err) -} |