diff options
author | Valery Piashchynski <[email protected]> | 2021-07-22 14:26:13 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-22 14:26:13 +0300 |
commit | fedf012e632a31d2d0837c22832c7683547ad379 (patch) | |
tree | bcb5634dfacccc6d34e49aa7337ac8d1f18b693c /plugins/jobs | |
parent | 609e61426b137834ac589c88f1124574f939fa67 (diff) |
BC for the Pool, worker interfaces, pass/return payload by pointer
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 3 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 2 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 8 |
3 files changed, 6 insertions, 7 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index fc659902..6cc50c07 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -167,8 +167,7 @@ var connErrors = map[string]struct{}{"EOF": {}} func (cp *ConnPool) checkAndRedial(err error) error { const op = errors.Op("connection_pool_check_redial") - switch et := err.(type) { - + switch et := err.(type) { //nolint:gocritic // check if the error case beanstalk.ConnError: switch bErr := et.Err.(type) { diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index 0f98312a..3e9061a3 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -12,7 +12,7 @@ func (j *JobConsumer) listen() { id, body, err := j.pool.Reserve(j.reserveTimeout) if err != nil { if errB, ok := err.(beanstalk.ConnError); ok { - switch errB.Err { + switch errB.Err { //nolint:gocritic case beanstalk.ErrTimeout: j.log.Info("beanstalk reserve timeout", "warn", errB.Op) continue diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index c8973f1e..219799b8 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -83,7 +83,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.stopCh = make(chan struct{}, 1) p.pldPool = sync.Pool{New: func() interface{} { // with nil fields - return payload.Payload{} + return &payload.Payload{} }} // initial set of pipelines @@ -104,11 +104,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return nil } -func (p *Plugin) getPayload() payload.Payload { - return p.pldPool.Get().(payload.Payload) +func (p *Plugin) getPayload() *payload.Payload { + return p.pldPool.Get().(*payload.Payload) } -func (p *Plugin) putPayload(pld payload.Payload) { +func (p *Plugin) putPayload(pld *payload.Payload) { pld.Body = nil pld.Context = nil p.pldPool.Put(pld) |