diff options
author | Valery Piashchynski <[email protected]> | 2021-09-01 16:50:41 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-01 16:50:41 +0300 |
commit | d62acca114a9646afed6ec0217b8cb709687aeb9 (patch) | |
tree | 3357194e05e6edbac46e2d85e4e98ef0d388480e | |
parent | 5ad241b23b64faf7389c424bdecd3489338fa1ba (diff) |
Close connection in the amqp driver.
bytes.Buffer update in the beanstalk driver
Signed-off-by: Valery Piashchynski <[email protected]>
-rwxr-xr-x | Makefile | 10 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/consumer.go | 6 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/item.go | 7 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/redial.go | 21 | ||||
-rw-r--r-- | plugins/beanstalk/consumer.go | 10 | ||||
-rw-r--r-- | plugins/beanstalk/item.go | 9 | ||||
-rw-r--r-- | plugins/jobs/job/job.go | 11 | ||||
-rw-r--r-- | plugins/jobs/job/job_test.go | 27 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 22 |
9 files changed, 44 insertions, 79 deletions
@@ -15,15 +15,15 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.txt -covermode=atomic ./plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.txt -covermode=atomic ./plugins/jobs/pipeline + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./tests/plugins/reload diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 784a102c..1bfc4b41 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -420,17 +420,17 @@ func (c *consumer) Resume(_ context.Context, p string) { } func (c *consumer) Stop(context.Context) error { - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} - } + c.stopCh <- struct{}{} pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), }) + return nil } diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go index 04385afe..b837ff86 100644 --- a/plugins/amqp/amqpjobs/item.go +++ b/plugins/amqp/amqpjobs/item.go @@ -43,17 +43,18 @@ type Options struct { Delay int64 `json:"delay,omitempty"` // private - // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + // ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery ack func(multiply bool) error - // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time nack func(multiply bool, requeue bool) error // requeueFn used as a pointer to the push function - requeueFn func(context.Context, *Item) error + requeueFn func(context.Context, *Item) error + // delayed jobs TODO(rustatian): figure out how to get stats from the DLX delayed *int64 multipleAsk bool requeue bool diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 8d21784f..698a34a6 100644 --- a/plugins/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go @@ -113,25 +113,22 @@ func (c *consumer) redialer() { //nolint:gocognit c.Unlock() case <-c.stopCh: - if c.publishChan != nil { - pch := <-c.publishChan - err := pch.Close() - if err != nil { - c.log.Error("publish channel close", "error", err) - } + pch := <-c.publishChan + err := pch.Close() + if err != nil { + c.log.Error("publish channel close", "error", err) } if c.consumeChan != nil { - err := c.consumeChan.Close() + err = c.consumeChan.Close() if err != nil { c.log.Error("consume channel close", "error", err) } } - if c.conn != nil { - err := c.conn.Close() - if err != nil { - c.log.Error("amqp connection close", "error", err) - } + + err = c.conn.Close() + if err != nil { + c.log.Error("amqp connection close", "error", err) } return diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go index 5ef89983..dc2a7e91 100644 --- a/plugins/beanstalk/consumer.go +++ b/plugins/beanstalk/consumer.go @@ -3,6 +3,7 @@ package beanstalk import ( "bytes" "context" + "encoding/gob" "strconv" "strings" "sync/atomic" @@ -183,11 +184,16 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error { bb := new(bytes.Buffer) bb.Grow(64) - err := item.pack(bb) + err := gob.NewEncoder(bb).Encode(item) if err != nil { return errors.E(op, err) } + body := make([]byte, bb.Len()) + copy(body, bb.Bytes()) + bb.Reset() + bb = nil + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458 // <pri> is an integer < 2**32. Jobs with smaller priority values will be // scheduled before jobs with larger priorities. The most urgent priority is 0; @@ -203,7 +209,7 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error { // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout) + id, err := j.pool.Put(ctx, body, *j.tubePriority, item.Options.DelayDuration(), j.tout) if err != nil { errD := j.pool.Delete(ctx, id) if errD != nil { diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go index 0a6cd560..03060994 100644 --- a/plugins/beanstalk/item.go +++ b/plugins/beanstalk/item.go @@ -125,15 +125,6 @@ func fromJob(job *job.Job) *Item { } } -func (i *Item) pack(b *bytes.Buffer) error { - err := gob.NewEncoder(b).Encode(i) - if err != nil { - return err - } - - return nil -} - func (j *consumer) unpack(id uint64, data []byte, out *Item) error { err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) if err != nil { diff --git a/plugins/jobs/job/job.go b/plugins/jobs/job/job.go index 06c3254e..adab2a0a 100644 --- a/plugins/jobs/job/job.go +++ b/plugins/jobs/job/job.go @@ -45,17 +45,6 @@ type Options struct { Delay int64 `json:"delay,omitempty"` } -// Merge merges job options. -func (o *Options) Merge(from *Options) { - if o.Pipeline == "" { - o.Pipeline = from.Pipeline - } - - if o.Delay == 0 { - o.Delay = from.Delay - } -} - // DelayDuration returns delay duration in a form of time.Duration. func (o *Options) DelayDuration() time.Duration { return time.Second * time.Duration(o.Delay) diff --git a/plugins/jobs/job/job_test.go b/plugins/jobs/job/job_test.go index a47151a3..4a95e27d 100644 --- a/plugins/jobs/job/job_test.go +++ b/plugins/jobs/job/job_test.go @@ -16,30 +16,3 @@ func TestOptions_DelayDuration2(t *testing.T) { opts := &Options{Delay: 1} assert.Equal(t, time.Second, opts.DelayDuration()) } - -func TestOptions_Merge(t *testing.T) { - opts := &Options{} - - opts.Merge(&Options{ - Pipeline: "pipeline", - Delay: 2, - }) - - assert.Equal(t, "pipeline", opts.Pipeline) - assert.Equal(t, int64(2), opts.Delay) -} - -func TestOptions_MergeKeepOriginal(t *testing.T) { - opts := &Options{ - Pipeline: "default", - Delay: 10, - } - - opts.Merge(&Options{ - Pipeline: "pipeline", - Delay: 2, - }) - - assert.Equal(t, "default", opts.Pipeline) - assert.Equal(t, int64(10), opts.Delay) -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 3f3fa196..f411e9a0 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -455,7 +455,6 @@ func (p *Plugin) Push(j *job.Job) error { } // if job has no priority, inherit it from the pipeline - // TODO(rustatian) merge all options, not only priority if j.Options.Priority == 0 { j.Options.Priority = ppl.Priority() } @@ -470,9 +469,9 @@ func (p *Plugin) Push(j *job.Job) error { ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return errors.E(op, err) } @@ -482,9 +481,9 @@ func (p *Plugin) Push(j *job.Job) error { ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return nil @@ -492,9 +491,9 @@ func (p *Plugin) Push(j *job.Job) error { func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") + start := time.Now() for i := 0; i < len(j); i++ { - start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) if !ok { @@ -616,6 +615,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // save the pipeline p.pipelines.Store(pipeline.Name(), pipeline) + p.log.Debug("pipeline declared", "driver", pipeline.Driver(), "name", pipeline.Name()) return nil } @@ -638,11 +638,19 @@ func (p *Plugin) Destroy(pp string) error { // delete consumer delete(p.consumers, ppl.Name()) - p.pipelines.Delete(pp) + // delete old pipeline + p.pipelines.LoadAndDelete(pp) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() + err := d.Stop(ctx) + if err != nil { + cancel() + return errors.E(op, err) + } - return d.Stop(ctx) + d = nil + cancel() + return nil } func (p *Plugin) List() []string { |