summaryrefslogtreecommitdiff
path: root/plugins/beanstalk/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/beanstalk/consumer.go')
-rw-r--r--plugins/beanstalk/consumer.go28
1 files changed, 21 insertions, 7 deletions
diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go
index 5ef89983..30807f03 100644
--- a/plugins/beanstalk/consumer.go
+++ b/plugins/beanstalk/consumer.go
@@ -3,6 +3,7 @@ package beanstalk
import (
"bytes"
"context"
+ "encoding/gob"
"strconv"
"strings"
"sync/atomic"
@@ -183,11 +184,16 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error {
bb := new(bytes.Buffer)
bb.Grow(64)
- err := item.pack(bb)
+ err := gob.NewEncoder(bb).Encode(item)
if err != nil {
return errors.E(op, err)
}
+ body := make([]byte, bb.Len())
+ copy(body, bb.Bytes())
+ bb.Reset()
+ bb = nil
+
// https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458
// <pri> is an integer < 2**32. Jobs with smaller priority values will be
// scheduled before jobs with larger priorities. The most urgent priority is 0;
@@ -203,7 +209,7 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error {
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout)
+ id, err := j.pool.Put(ctx, body, *j.tubePriority, item.Options.DelayDuration(), j.tout)
if err != nil {
errD := j.pool.Delete(ctx, id)
if errD != nil {
@@ -260,9 +266,10 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
- // check if the pipeline registered
+ start := time.Now()
// load atomic value
+ // check if the pipeline registered
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name()))
@@ -276,13 +283,15 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (j *consumer) Stop(context.Context) error {
+ start := time.Now()
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if atomic.LoadUint32(&j.listeners) == 1 {
@@ -293,13 +302,15 @@ func (j *consumer) Stop(context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (j *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -322,11 +333,13 @@ func (j *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (j *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -351,7 +364,8 @@ func (j *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}