diff options
author | Valery Piashchynski <[email protected]> | 2021-08-31 11:54:24 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-31 11:54:24 +0300 |
commit | 0c10cb989fa1deae3996df272f40e2270a880b52 (patch) | |
tree | 6bc3bd0e56ffe8753933dd2957a3269ee578d5b8 /plugins | |
parent | 0f5f9517f9b5bb79e265bbf7d9ee8ce4633cf9b4 (diff) |
Finish boltdb tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/amqp/amqpjobs/consumer.go | 2 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/redial.go | 4 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 41 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/item.go | 45 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/listener.go | 91 |
5 files changed, 88 insertions, 95 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 578f36ce..784a102c 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -479,7 +479,7 @@ func (c *consumer) handleItem(ctx context.Context, msg *Item) error { err = pch.Publish(c.exchangeName, tmpQ, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, - Timestamp: time.Now().UTC(), + Timestamp: time.Now(), DeliveryMode: amqp.Persistent, Body: msg.Body(), }) diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 56142e2b..8d21784f 100644 --- a/plugins/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go @@ -27,7 +27,7 @@ func (c *consumer) redialer() { //nolint:gocognit // trash the broken publishing channel <-c.publishChan - t := time.Now() + t := time.Now().UTC() pipe := c.pipeline.Load().(*pipeline.Pipeline) c.eh.Push(events.JobEvent{ @@ -35,7 +35,7 @@ func (c *consumer) redialer() { //nolint:gocognit Pipeline: pipe.Name(), Driver: pipe.Driver(), Error: err, - Start: time.Now(), + Start: time.Now().UTC(), }) expb := backoff.NewExponentialBackOff() diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index 2492ab60..ed0eda61 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -65,7 +65,6 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e } conf := &GlobalCfg{} - err := cfg.UnmarshalKey(PluginName, conf) if err != nil { return nil, errors.E(op, err) @@ -246,49 +245,45 @@ func (c *consumer) Push(_ context.Context, job *job.Job) error { const op = errors.Op("boltdb_jobs_push") err := c.db.Update(func(tx *bolt.Tx) error { item := fromJob(job) + // pool with buffers + buf := c.get() + // encode the job + enc := gob.NewEncoder(buf) + err := enc.Encode(item) + if err != nil { + c.put(buf) + return errors.E(op, err) + } + + value := make([]byte, buf.Len()) + copy(value, buf.Bytes()) + c.put(buf) // handle delay if item.Options.Delay > 0 { b := tx.Bucket(utils.AsBytes(DelayBucket)) - tKey := time.Now().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339) - - // pool with buffers - buf := c.get() - defer c.put(buf) + tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339) - enc := gob.NewEncoder(buf) - err := enc.Encode(item) + err = b.Put(utils.AsBytes(tKey), value) if err != nil { return errors.E(op, err) } - value := make([]byte, buf.Len()) - copy(value, buf.Bytes()) - atomic.AddUint64(c.delayed, 1) - return b.Put(utils.AsBytes(tKey), value) + return nil } b := tx.Bucket(utils.AsBytes(PushBucket)) - - // pool with buffers - buf := c.get() - defer c.put(buf) - - enc := gob.NewEncoder(buf) - err := enc.Encode(item) + err = b.Put(utils.AsBytes(item.ID()), value) if err != nil { return errors.E(op, err) } - value := make([]byte, buf.Len()) - copy(value, buf.Bytes()) - // increment active counter atomic.AddUint64(c.active, 1) - return b.Put(utils.AsBytes(item.ID()), value) + return nil }) if err != nil { diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go index 4f02bb43..837f8c63 100644 --- a/plugins/boltdb/boltjobs/item.go +++ b/plugins/boltdb/boltjobs/item.go @@ -43,8 +43,7 @@ type Options struct { Delay int64 `json:"delay,omitempty"` // private - db *bbolt.DB - + db *bbolt.DB active *uint64 delayed *uint64 } @@ -137,6 +136,17 @@ func (i *Item) Nack() error { return tx.Commit() } +/* +Requeue algorithm: +1. Rewrite item headers and delay. +2. Begin writable transaction on attached to the item db. +3. Delete item from the InQueueBucket +4. Handle items with the delay: + 4.1. Get DelayBucket + 4.2. Make a key by adding the delay to the time.Now() in RFC3339 format + 4.3. Put this key with value to the DelayBucket +5. W/o delay, put the key with value to the PushBucket (requeue) +*/ func (i *Item) Requeue(headers map[string][]string, delay int64) error { const op = errors.Op("boltdb_item_requeue") i.Headers = headers @@ -153,23 +163,23 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { return errors.E(op, i.rollback(err, tx)) } + // encode the item + buf := new(bytes.Buffer) + enc := gob.NewEncoder(buf) + err = enc.Encode(i) + val := make([]byte, buf.Len()) + copy(val, buf.Bytes()) + buf.Reset() + if delay > 0 { delayB := tx.Bucket(utils.AsBytes(DelayBucket)) - tKey := time.Now().Add(time.Second * time.Duration(delay)).Format(time.RFC3339) - - buf := new(bytes.Buffer) - enc := gob.NewEncoder(buf) - err = enc.Encode(i) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } + tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339) - err = delayB.Put(utils.AsBytes(tKey), buf.Bytes()) if err != nil { return errors.E(op, i.rollback(err, tx)) } - err = inQb.Delete(utils.AsBytes(i.ID())) + err = delayB.Put(utils.AsBytes(tKey), val) if err != nil { return errors.E(op, i.rollback(err, tx)) } @@ -178,20 +188,11 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { } pushB := tx.Bucket(utils.AsBytes(PushBucket)) - - buf := new(bytes.Buffer) - enc := gob.NewEncoder(buf) - err = enc.Encode(i) if err != nil { return errors.E(op, i.rollback(err, tx)) } - err = pushB.Put(utils.AsBytes(i.ID()), buf.Bytes()) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - err = inQb.Delete(utils.AsBytes(i.ID())) + err = pushB.Put(utils.AsBytes(i.ID()), val) if err != nil { return errors.E(op, i.rollback(err, tx)) } diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index d184303a..39de34ab 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -16,7 +16,7 @@ func (c *consumer) listener() { for { select { case <-c.stopCh: - c.log.Warn("boltdb listener stopped") + c.log.Info("boltdb listener stopped") return case <-tt.C: if atomic.LoadUint64(c.active) >= uint64(c.prefetch) { @@ -78,12 +78,22 @@ func (c *consumer) listener() { } func (c *consumer) delayedJobsListener() { - tt := time.NewTicker(time.Millisecond * 100) + tt := time.NewTicker(time.Millisecond * 10) defer tt.Stop() + + // just some 90's + loc, err := time.LoadLocation("UTC") + if err != nil { + c.log.Error("failed to load location, delayed jobs won't work", "error", err) + return + } + + var startDate = utils.AsBytes(time.Date(1990, 1, 1, 0, 0, 0, 0, loc).Format(time.RFC3339)) + for { select { case <-c.stopCh: - c.log.Warn("boltdb listener stopped") + c.log.Info("boltdb listener stopped") return case <-tt.C: tx, err := c.db.Begin(true) @@ -95,45 +105,37 @@ func (c *consumer) delayedJobsListener() { delayB := tx.Bucket(utils.AsBytes(DelayBucket)) inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - // get first item - k, v := delayB.Cursor().First() - if k == nil && v == nil { - _ = tx.Commit() - continue - } - - t, err := time.Parse(time.RFC3339, utils.AsString(k)) - if err != nil { - c.rollback(err, tx) - continue - } - - if t.After(time.Now()) { - _ = tx.Commit() - continue - } - - buf := bytes.NewReader(v) - dec := gob.NewDecoder(buf) - - item := &Item{} - err = dec.Decode(item) - if err != nil { - c.rollback(err, tx) - continue - } - - err = inQb.Put(utils.AsBytes(item.ID()), v) - if err != nil { - c.rollback(err, tx) - continue - } - - // delete key from the PushBucket - err = delayB.Delete(k) - if err != nil { - c.rollback(err, tx) - continue + cursor := delayB.Cursor() + endDate := utils.AsBytes(time.Now().UTC().Format(time.RFC3339)) + + for k, v := cursor.Seek(startDate); k != nil && bytes.Compare(k, endDate) <= 0; k, v = cursor.Next() { + buf := bytes.NewReader(v) + dec := gob.NewDecoder(buf) + + item := &Item{} + err = dec.Decode(item) + if err != nil { + c.rollback(err, tx) + continue + } + + err = inQb.Put(utils.AsBytes(item.ID()), v) + if err != nil { + c.rollback(err, tx) + continue + } + + // delete key from the PushBucket + err = delayB.Delete(k) + if err != nil { + c.rollback(err, tx) + continue + } + + // attach pointer to the DB + item.attachDB(c.db, c.active, c.delayed) + // as the last step, after commit, put the item into the PQ + c.pq.Insert(item) } err = tx.Commit() @@ -141,11 +143,6 @@ func (c *consumer) delayedJobsListener() { c.rollback(err, tx) continue } - - // attach pointer to the DB - item.attachDB(c.db, c.active, c.delayed) - // as the last step, after commit, put the item into the PQ - c.pq.Insert(item) } } } |