summaryrefslogtreecommitdiff
path: root/plugins/boltdb/boltjobs/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/boltdb/boltjobs/consumer.go')
-rw-r--r--plugins/boltdb/boltjobs/consumer.go41
1 files changed, 18 insertions, 23 deletions
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index 2492ab60..ed0eda61 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -65,7 +65,6 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
}
conf := &GlobalCfg{}
-
err := cfg.UnmarshalKey(PluginName, conf)
if err != nil {
return nil, errors.E(op, err)
@@ -246,49 +245,45 @@ func (c *consumer) Push(_ context.Context, job *job.Job) error {
const op = errors.Op("boltdb_jobs_push")
err := c.db.Update(func(tx *bolt.Tx) error {
item := fromJob(job)
+ // pool with buffers
+ buf := c.get()
+ // encode the job
+ enc := gob.NewEncoder(buf)
+ err := enc.Encode(item)
+ if err != nil {
+ c.put(buf)
+ return errors.E(op, err)
+ }
+
+ value := make([]byte, buf.Len())
+ copy(value, buf.Bytes())
+ c.put(buf)
// handle delay
if item.Options.Delay > 0 {
b := tx.Bucket(utils.AsBytes(DelayBucket))
- tKey := time.Now().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339)
-
- // pool with buffers
- buf := c.get()
- defer c.put(buf)
+ tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339)
- enc := gob.NewEncoder(buf)
- err := enc.Encode(item)
+ err = b.Put(utils.AsBytes(tKey), value)
if err != nil {
return errors.E(op, err)
}
- value := make([]byte, buf.Len())
- copy(value, buf.Bytes())
-
atomic.AddUint64(c.delayed, 1)
- return b.Put(utils.AsBytes(tKey), value)
+ return nil
}
b := tx.Bucket(utils.AsBytes(PushBucket))
-
- // pool with buffers
- buf := c.get()
- defer c.put(buf)
-
- enc := gob.NewEncoder(buf)
- err := enc.Encode(item)
+ err = b.Put(utils.AsBytes(item.ID()), value)
if err != nil {
return errors.E(op, err)
}
- value := make([]byte, buf.Len())
- copy(value, buf.Bytes())
-
// increment active counter
atomic.AddUint64(c.active, 1)
- return b.Put(utils.AsBytes(item.ID()), value)
+ return nil
})
if err != nil {