diff options
Diffstat (limited to 'plugins/boltdb/boltjobs/consumer.go')
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 41 |
1 files changed, 18 insertions, 23 deletions
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index 2492ab60..ed0eda61 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -65,7 +65,6 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e } conf := &GlobalCfg{} - err := cfg.UnmarshalKey(PluginName, conf) if err != nil { return nil, errors.E(op, err) @@ -246,49 +245,45 @@ func (c *consumer) Push(_ context.Context, job *job.Job) error { const op = errors.Op("boltdb_jobs_push") err := c.db.Update(func(tx *bolt.Tx) error { item := fromJob(job) + // pool with buffers + buf := c.get() + // encode the job + enc := gob.NewEncoder(buf) + err := enc.Encode(item) + if err != nil { + c.put(buf) + return errors.E(op, err) + } + + value := make([]byte, buf.Len()) + copy(value, buf.Bytes()) + c.put(buf) // handle delay if item.Options.Delay > 0 { b := tx.Bucket(utils.AsBytes(DelayBucket)) - tKey := time.Now().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339) - - // pool with buffers - buf := c.get() - defer c.put(buf) + tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339) - enc := gob.NewEncoder(buf) - err := enc.Encode(item) + err = b.Put(utils.AsBytes(tKey), value) if err != nil { return errors.E(op, err) } - value := make([]byte, buf.Len()) - copy(value, buf.Bytes()) - atomic.AddUint64(c.delayed, 1) - return b.Put(utils.AsBytes(tKey), value) + return nil } b := tx.Bucket(utils.AsBytes(PushBucket)) - - // pool with buffers - buf := c.get() - defer c.put(buf) - - enc := gob.NewEncoder(buf) - err := enc.Encode(item) + err = b.Put(utils.AsBytes(item.ID()), value) if err != nil { return errors.E(op, err) } - value := make([]byte, buf.Len()) - copy(value, buf.Bytes()) - // increment active counter atomic.AddUint64(c.active, 1) - return b.Put(utils.AsBytes(item.ID()), value) + return nil }) if err != nil { |