summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-01 16:50:41 +0300
committerValery Piashchynski <[email protected]>2021-09-01 16:50:41 +0300
commitd62acca114a9646afed6ec0217b8cb709687aeb9 (patch)
tree3357194e05e6edbac46e2d85e4e98ef0d388480e
parent5ad241b23b64faf7389c424bdecd3489338fa1ba (diff)
Close connection in the amqp driver.
bytes.Buffer update in the beanstalk driver Signed-off-by: Valery Piashchynski <[email protected]>
-rwxr-xr-xMakefile10
-rw-r--r--plugins/amqp/amqpjobs/consumer.go6
-rw-r--r--plugins/amqp/amqpjobs/item.go7
-rw-r--r--plugins/amqp/amqpjobs/redial.go21
-rw-r--r--plugins/beanstalk/consumer.go10
-rw-r--r--plugins/beanstalk/item.go9
-rw-r--r--plugins/jobs/job/job.go11
-rw-r--r--plugins/jobs/job/job_test.go27
-rw-r--r--plugins/jobs/plugin.go22
9 files changed, 44 insertions, 79 deletions
diff --git a/Makefile b/Makefile
index 96df65cc..8390e910 100755
--- a/Makefile
+++ b/Makefile
@@ -15,15 +15,15 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.txt -covermode=atomic ./plugins/websockets
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.txt -covermode=atomic ./plugins/jobs/pipeline
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./tests/plugins/reload
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 784a102c..1bfc4b41 100644
--- a/plugins/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -420,17 +420,17 @@ func (c *consumer) Resume(_ context.Context, p string) {
}
func (c *consumer) Stop(context.Context) error {
- if atomic.LoadUint32(&c.listeners) > 0 {
- c.stopCh <- struct{}{}
- }
+ c.stopCh <- struct{}{}
pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
Start: time.Now(),
})
+
return nil
}
diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go
index 04385afe..b837ff86 100644
--- a/plugins/amqp/amqpjobs/item.go
+++ b/plugins/amqp/amqpjobs/item.go
@@ -43,17 +43,18 @@ type Options struct {
Delay int64 `json:"delay,omitempty"`
// private
- // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
+ // ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
ack func(multiply bool) error
- // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
+ // nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
// When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel.
// When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
// This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
nack func(multiply bool, requeue bool) error
// requeueFn used as a pointer to the push function
- requeueFn func(context.Context, *Item) error
+ requeueFn func(context.Context, *Item) error
+ // delayed jobs TODO(rustatian): figure out how to get stats from the DLX
delayed *int64
multipleAsk bool
requeue bool
diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go
index 8d21784f..698a34a6 100644
--- a/plugins/amqp/amqpjobs/redial.go
+++ b/plugins/amqp/amqpjobs/redial.go
@@ -113,25 +113,22 @@ func (c *consumer) redialer() { //nolint:gocognit
c.Unlock()
case <-c.stopCh:
- if c.publishChan != nil {
- pch := <-c.publishChan
- err := pch.Close()
- if err != nil {
- c.log.Error("publish channel close", "error", err)
- }
+ pch := <-c.publishChan
+ err := pch.Close()
+ if err != nil {
+ c.log.Error("publish channel close", "error", err)
}
if c.consumeChan != nil {
- err := c.consumeChan.Close()
+ err = c.consumeChan.Close()
if err != nil {
c.log.Error("consume channel close", "error", err)
}
}
- if c.conn != nil {
- err := c.conn.Close()
- if err != nil {
- c.log.Error("amqp connection close", "error", err)
- }
+
+ err = c.conn.Close()
+ if err != nil {
+ c.log.Error("amqp connection close", "error", err)
}
return
diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go
index 5ef89983..dc2a7e91 100644
--- a/plugins/beanstalk/consumer.go
+++ b/plugins/beanstalk/consumer.go
@@ -3,6 +3,7 @@ package beanstalk
import (
"bytes"
"context"
+ "encoding/gob"
"strconv"
"strings"
"sync/atomic"
@@ -183,11 +184,16 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error {
bb := new(bytes.Buffer)
bb.Grow(64)
- err := item.pack(bb)
+ err := gob.NewEncoder(bb).Encode(item)
if err != nil {
return errors.E(op, err)
}
+ body := make([]byte, bb.Len())
+ copy(body, bb.Bytes())
+ bb.Reset()
+ bb = nil
+
// https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458
// <pri> is an integer < 2**32. Jobs with smaller priority values will be
// scheduled before jobs with larger priorities. The most urgent priority is 0;
@@ -203,7 +209,7 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error {
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout)
+ id, err := j.pool.Put(ctx, body, *j.tubePriority, item.Options.DelayDuration(), j.tout)
if err != nil {
errD := j.pool.Delete(ctx, id)
if errD != nil {
diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go
index 0a6cd560..03060994 100644
--- a/plugins/beanstalk/item.go
+++ b/plugins/beanstalk/item.go
@@ -125,15 +125,6 @@ func fromJob(job *job.Job) *Item {
}
}
-func (i *Item) pack(b *bytes.Buffer) error {
- err := gob.NewEncoder(b).Encode(i)
- if err != nil {
- return err
- }
-
- return nil
-}
-
func (j *consumer) unpack(id uint64, data []byte, out *Item) error {
err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
if err != nil {
diff --git a/plugins/jobs/job/job.go b/plugins/jobs/job/job.go
index 06c3254e..adab2a0a 100644
--- a/plugins/jobs/job/job.go
+++ b/plugins/jobs/job/job.go
@@ -45,17 +45,6 @@ type Options struct {
Delay int64 `json:"delay,omitempty"`
}
-// Merge merges job options.
-func (o *Options) Merge(from *Options) {
- if o.Pipeline == "" {
- o.Pipeline = from.Pipeline
- }
-
- if o.Delay == 0 {
- o.Delay = from.Delay
- }
-}
-
// DelayDuration returns delay duration in a form of time.Duration.
func (o *Options) DelayDuration() time.Duration {
return time.Second * time.Duration(o.Delay)
diff --git a/plugins/jobs/job/job_test.go b/plugins/jobs/job/job_test.go
index a47151a3..4a95e27d 100644
--- a/plugins/jobs/job/job_test.go
+++ b/plugins/jobs/job/job_test.go
@@ -16,30 +16,3 @@ func TestOptions_DelayDuration2(t *testing.T) {
opts := &Options{Delay: 1}
assert.Equal(t, time.Second, opts.DelayDuration())
}
-
-func TestOptions_Merge(t *testing.T) {
- opts := &Options{}
-
- opts.Merge(&Options{
- Pipeline: "pipeline",
- Delay: 2,
- })
-
- assert.Equal(t, "pipeline", opts.Pipeline)
- assert.Equal(t, int64(2), opts.Delay)
-}
-
-func TestOptions_MergeKeepOriginal(t *testing.T) {
- opts := &Options{
- Pipeline: "default",
- Delay: 10,
- }
-
- opts.Merge(&Options{
- Pipeline: "pipeline",
- Delay: 2,
- })
-
- assert.Equal(t, "default", opts.Pipeline)
- assert.Equal(t, int64(10), opts.Delay)
-}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 3f3fa196..f411e9a0 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -455,7 +455,6 @@ func (p *Plugin) Push(j *job.Job) error {
}
// if job has no priority, inherit it from the pipeline
- // TODO(rustatian) merge all options, not only priority
if j.Options.Priority == 0 {
j.Options.Priority = ppl.Priority()
}
@@ -470,9 +469,9 @@ func (p *Plugin) Push(j *job.Job) error {
ID: j.Ident,
Pipeline: ppl.Name(),
Driver: ppl.Driver(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
- Error: err,
})
return errors.E(op, err)
}
@@ -482,9 +481,9 @@ func (p *Plugin) Push(j *job.Job) error {
ID: j.Ident,
Pipeline: ppl.Name(),
Driver: ppl.Driver(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
- Error: err,
})
return nil
@@ -492,9 +491,9 @@ func (p *Plugin) Push(j *job.Job) error {
func (p *Plugin) PushBatch(j []*job.Job) error {
const op = errors.Op("jobs_plugin_push")
+ start := time.Now()
for i := 0; i < len(j); i++ {
- start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
if !ok {
@@ -616,6 +615,7 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
// save the pipeline
p.pipelines.Store(pipeline.Name(), pipeline)
+ p.log.Debug("pipeline declared", "driver", pipeline.Driver(), "name", pipeline.Name())
return nil
}
@@ -638,11 +638,19 @@ func (p *Plugin) Destroy(pp string) error {
// delete consumer
delete(p.consumers, ppl.Name())
- p.pipelines.Delete(pp)
+ // delete old pipeline
+ p.pipelines.LoadAndDelete(pp)
+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- defer cancel()
+ err := d.Stop(ctx)
+ if err != nil {
+ cancel()
+ return errors.E(op, err)
+ }
- return d.Stop(ctx)
+ d = nil
+ cancel()
+ return nil
}
func (p *Plugin) List() []string {