summaryrefslogtreecommitdiff
path: root/plugins/beanstalk/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/beanstalk/consumer.go')
-rw-r--r--plugins/beanstalk/consumer.go10
1 files changed, 8 insertions, 2 deletions
diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go
index 5ef89983..dc2a7e91 100644
--- a/plugins/beanstalk/consumer.go
+++ b/plugins/beanstalk/consumer.go
@@ -3,6 +3,7 @@ package beanstalk
import (
"bytes"
"context"
+ "encoding/gob"
"strconv"
"strings"
"sync/atomic"
@@ -183,11 +184,16 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error {
bb := new(bytes.Buffer)
bb.Grow(64)
- err := item.pack(bb)
+ err := gob.NewEncoder(bb).Encode(item)
if err != nil {
return errors.E(op, err)
}
+ body := make([]byte, bb.Len())
+ copy(body, bb.Bytes())
+ bb.Reset()
+ bb = nil
+
// https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458
// <pri> is an integer < 2**32. Jobs with smaller priority values will be
// scheduled before jobs with larger priorities. The most urgent priority is 0;
@@ -203,7 +209,7 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error {
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout)
+ id, err := j.pool.Put(ctx, body, *j.tubePriority, item.Options.DelayDuration(), j.tout)
if err != nil {
errD := j.pool.Delete(ctx, id)
if errD != nil {