diff options
Diffstat (limited to 'plugins/beanstalk/consumer.go')
-rw-r--r-- | plugins/beanstalk/consumer.go | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go index 5ef89983..30807f03 100644 --- a/plugins/beanstalk/consumer.go +++ b/plugins/beanstalk/consumer.go @@ -3,6 +3,7 @@ package beanstalk import ( "bytes" "context" + "encoding/gob" "strconv" "strings" "sync/atomic" @@ -183,11 +184,16 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error { bb := new(bytes.Buffer) bb.Grow(64) - err := item.pack(bb) + err := gob.NewEncoder(bb).Encode(item) if err != nil { return errors.E(op, err) } + body := make([]byte, bb.Len()) + copy(body, bb.Bytes()) + bb.Reset() + bb = nil + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458 // <pri> is an integer < 2**32. Jobs with smaller priority values will be // scheduled before jobs with larger priorities. The most urgent priority is 0; @@ -203,7 +209,7 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error { // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout) + id, err := j.pool.Put(ctx, body, *j.tubePriority, item.Options.DelayDuration(), j.tout) if err != nil { errD := j.pool.Delete(ctx, id) if errD != nil { @@ -260,9 +266,10 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") - // check if the pipeline registered + start := time.Now() // load atomic value + // check if the pipeline registered pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name())) @@ -276,13 +283,15 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (j *consumer) Stop(context.Context) error { + start := time.Now() pipe := j.pipeline.Load().(*pipeline.Pipeline) if atomic.LoadUint32(&j.listeners) == 1 { @@ -293,13 +302,15 @@ func (j *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (j *consumer) Pause(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -322,11 +333,13 @@ func (j *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (j *consumer) Resume(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -351,7 +364,8 @@ func (j *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } |