author	Valery Piashchynski <[email protected]>	2021-07-22 13:53:19 +0300
committer	Valery Piashchynski <[email protected]>	2021-07-22 13:53:19 +0300
commit	05660fcd256963eac94ada90f7baa409344f9e73 (patch)
tree	72fe19d7c6b05eda1c5e5cc85cb536878bd8aa24 /plugins
parent	182199a6449677a620813e3a8157cd0406095435 (diff)
Update consumers, tests stabilization
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--	plugins/jobs/drivers/amqp/config.go	24
-rw-r--r--	plugins/jobs/drivers/amqp/consumer.go	92
-rw-r--r--	plugins/jobs/drivers/amqp/item.go	13
-rw-r--r--	plugins/jobs/drivers/beanstalk/connection.go	44
-rw-r--r--	plugins/jobs/drivers/beanstalk/consumer.go	4
-rw-r--r--	plugins/jobs/drivers/beanstalk/item.go	2
-rw-r--r--	plugins/jobs/drivers/beanstalk/listen.go	9
-rw-r--r--	plugins/jobs/drivers/sqs/consumer.go	4
-rw-r--r--	plugins/jobs/drivers/sqs/listener.go	1
-rw-r--r--	plugins/jobs/plugin.go	86
10 files changed, 171 insertions, 108 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go
index 7befb3c8..2a1aed20 100644
--- a/plugins/jobs/drivers/amqp/config.go
+++ b/plugins/jobs/drivers/amqp/config.go
@@ -2,13 +2,15 @@ package amqp
// pipeline rabbitmq info
const (
- exchangeKey string = "exchange"
- exchangeType string = "exchange-type"
- queue string = "queue"
- routingKey string = "routing-key"
- prefetch string = "prefetch"
- exclusive string = "exclusive"
- priority string = "priority"
+ exchangeKey string = "exchange"
+ exchangeType string = "exchange-type"
+ queue string = "queue"
+ routingKey string = "routing-key"
+ prefetch string = "prefetch"
+ exclusive string = "exclusive"
+ priority string = "priority"
+ multipleAsk string = "multiple_ask"
+ requeueOnFail string = "requeue_on_fail"
dlx string = "x-dead-letter-exchange"
dlxRoutingKey string = "x-dead-letter-routing-key"
@@ -24,13 +26,15 @@ type GlobalCfg struct {
// Config is used to parse pipeline configuration
type Config struct {
- PrefetchCount int `mapstructure:"pipeline_size"`
+ Prefetch int `mapstructure:"prefetch"`
Queue string `mapstructure:"queue"`
Priority int64 `mapstructure:"priority"`
Exchange string `mapstructure:"exchange"`
ExchangeType string `mapstructure:"exchange_type"`
RoutingKey string `mapstructure:"routing_key"`
Exclusive bool `mapstructure:"exclusive"`
+ MultipleAck bool `mapstructure:"multiple_ask"`
+ RequeueOnFail bool `mapstructure:"requeue_on_fail"`
}
func (c *Config) InitDefault() {
@@ -42,8 +46,8 @@ func (c *Config) InitDefault() {
c.Exchange = "default"
}
- if c.PrefetchCount == 0 {
- c.PrefetchCount = 100
+ if c.Prefetch == 0 {
+ c.Prefetch = 100
}
if c.Priority == 0 {
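For orientation, here is a minimal sketch of how these pipeline keys decode into the new Config fields and how InitDefault fills the gaps. The raw map, the direct use of mapstructure.Decode (the plugin goes through the config plugin's UnmarshalKey instead), and the concrete default values are assumptions for illustration only.

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// Config mirrors the pipeline config added in config.go (field set assumed).
type Config struct {
	Prefetch      int    `mapstructure:"prefetch"`
	Queue         string `mapstructure:"queue"`
	Exchange      string `mapstructure:"exchange"`
	MultipleAck   bool   `mapstructure:"multiple_ask"`
	RequeueOnFail bool   `mapstructure:"requeue_on_fail"`
}

// InitDefault fills zero values; the defaults shown are illustrative.
func (c *Config) InitDefault() {
	if c.Exchange == "" {
		c.Exchange = "default"
	}
	if c.Prefetch == 0 {
		c.Prefetch = 100
	}
}

func main() {
	// Hypothetical pipeline section as it might arrive from the configuration layer.
	raw := map[string]interface{}{
		"queue":           "emails",
		"multiple_ask":    true,
		"requeue_on_fail": true,
	}
	var cfg Config
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	cfg.InitDefault()
	fmt.Printf("%+v\n", cfg) // Prefetch falls back to 100, Exchange to "default"
}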
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 6def138e..d592a17a 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -29,17 +29,19 @@ type JobsConsumer struct {
conn *amqp.Connection
consumeChan *amqp.Channel
publishChan *amqp.Channel
+ consumeID string
+ connStr string
retryTimeout time.Duration
- prefetchCount int
+ prefetch int
priority int64
exchangeName string
queue string
exclusive bool
- consumeID string
- connStr string
exchangeType string
routingKey string
+ multipleAck bool
+ requeueOnFail bool
delayCache map[string]struct{}
@@ -53,17 +55,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
// we need to obtain two parts of the amqp information here.
// first part - the address to connect to, located in the global section under the amqp pluginName
// second part - queues and other pipeline information
- jb := &JobsConsumer{
- log: log,
- pq: pq,
- eh: e,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
- // TODO to config
- retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
- }
-
// if no such key - error
if !cfg.Has(configKey) {
return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
@@ -74,7 +65,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
}
- // PARSE CONFIGURATION -------
+ // PARSE CONFIGURATION START -------
var pipeCfg Config
var globalCfg GlobalCfg
@@ -91,16 +82,28 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
}
globalCfg.InitDefault()
+ // PARSE CONFIGURATION END -------
- jb.routingKey = pipeCfg.RoutingKey
- jb.queue = pipeCfg.Queue
- jb.exchangeType = pipeCfg.ExchangeType
- jb.exchangeName = pipeCfg.Exchange
- jb.prefetchCount = pipeCfg.PrefetchCount
- jb.exclusive = pipeCfg.Exclusive
- jb.priority = pipeCfg.Priority
-
- // PARSE CONFIGURATION -------
+ jb := &JobsConsumer{
+ log: log,
+ pq: pq,
+ eh: e,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ // TODO to config
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+ priority: pipeCfg.Priority,
+
+ routingKey: pipeCfg.RoutingKey,
+ queue: pipeCfg.Queue,
+ exchangeType: pipeCfg.ExchangeType,
+ exchangeName: pipeCfg.Exchange,
+ prefetch: pipeCfg.Prefetch,
+ exclusive: pipeCfg.Exclusive,
+ multipleAck: pipeCfg.MultipleAck,
+ requeueOnFail: pipeCfg.RequeueOnFail,
+ }
jb.conn, err = amqp.Dial(globalCfg.Addr)
if err != nil {
@@ -131,15 +134,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// we need to obtain two parts of the amqp information here.
// first part - the address to connect to, located in the global section under the amqp pluginName
// second part - queues and other pipeline information
- jb := &JobsConsumer{
- log: log,
- eh: e,
- pq: pq,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
- retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
- }
// only global section
if !cfg.Has(pluginName) {
@@ -156,16 +150,28 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
globalCfg.InitDefault()
- jb.routingKey = pipeline.String(routingKey, "")
- jb.queue = pipeline.String(queue, "default")
- jb.exchangeType = pipeline.String(exchangeType, "direct")
- jb.exchangeName = pipeline.String(exchangeKey, "amqp.default")
- jb.prefetchCount = pipeline.Int(prefetch, 10)
- jb.priority = int64(pipeline.Int(priority, 10))
- jb.exclusive = pipeline.Bool(exclusive, true)
-
// PARSE CONFIGURATION -------
+ jb := &JobsConsumer{
+ log: log,
+ eh: e,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+
+ routingKey: pipeline.String(routingKey, ""),
+ queue: pipeline.String(queue, "default"),
+ exchangeType: pipeline.String(exchangeType, "direct"),
+ exchangeName: pipeline.String(exchangeKey, "amqp.default"),
+ prefetch: pipeline.Int(prefetch, 10),
+ priority: int64(pipeline.Int(priority, 10)),
+ exclusive: pipeline.Bool(exclusive, true),
+ multipleAck: pipeline.Bool(multipleAsk, false),
+ requeueOnFail: pipeline.Bool(requeueOnFail, false),
+ }
+
jb.conn, err = amqp.Dial(globalCfg.Addr)
if err != nil {
return nil, errors.E(op, err)
@@ -315,7 +321,7 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error {
return errors.E(op, err)
}
- err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ err = j.consumeChan.Qos(j.prefetch, 0, false)
if err != nil {
return errors.E(op, err)
}
@@ -409,7 +415,7 @@ func (j *JobsConsumer) Resume(p string) {
return
}
- err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ err = j.consumeChan.Qos(j.prefetch, 0, false)
if err != nil {
j.log.Error("qos set failed", "error", err)
return
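To show what the renamed prefetch field controls, here is a standalone sketch of dialing AMQP and applying per-consumer QoS before consuming. The client import path, connection string, and queue name are assumptions and are not taken from this repository.

package main

import (
	"log"

	"github.com/streadway/amqp" // assumed client; the plugin may pin a different AMQP library
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}

	// prefetch limits how many unacknowledged deliveries the broker pushes
	// to this consumer at once; this is the value Qos receives in Run/Resume.
	prefetch := 100
	if err := ch.Qos(prefetch, 0, false); err != nil {
		log.Fatal(err)
	}

	deliveries, err := ch.Consume("jobs", "consumer-1", false, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}
	for d := range deliveries {
		_ = d.Ack(false) // process the delivery, then acknowledge it
	}
}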
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 6b912dde..bc679037 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -50,6 +50,10 @@ type Options struct {
// Reserve defines how long the broker should wait before treating the job as failed. Defaults to 30 min.
Timeout int64 `json:"timeout,omitempty"`
+
+ // private
+ multipleAsk bool
+ requeue bool
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -100,11 +104,11 @@ func (j *Item) Context() ([]byte, error) {
}
func (j *Item) Ack() error {
- return j.AckFunc(false)
+ return j.AckFunc(j.Options.multipleAsk)
}
func (j *Item) Nack() error {
- return j.NackFunc(false, false)
+ return j.NackFunc(false, j.Options.requeue)
}
func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
@@ -157,7 +161,10 @@ func pack(id string, j *Item) (amqp.Table, error) {
// unpack restores jobs.Options
func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) {
- item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
+ item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
+ multipleAsk: j.multipleAck,
+ requeue: j.requeueOnFail,
+ }}
if _, ok := d.Headers[job.RRID].(string); !ok {
return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID))
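The new multipleAck and requeueOnFail options map directly onto the two flags of the underlying delivery methods. A short sketch of those semantics follows; the package layout and helper name are illustrative, not the plugin's API.

package amqpjobs

import "github.com/streadway/amqp" // assumed client, as in the previous sketch

// acknowledge mirrors Item.Ack/Item.Nack from item.go: multiple=true acknowledges
// every outstanding delivery up to and including this delivery tag, while
// requeue=true puts a rejected message back into the queue instead of dropping
// it (or dead-lettering it when a DLX is configured).
func acknowledge(d amqp.Delivery, ok, multipleAck, requeueOnFail bool) error {
	if ok {
		return d.Ack(multipleAck)
	}
	return d.Nack(false, requeueOnFail)
}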
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index 62301bed..fc659902 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -1,7 +1,7 @@
package beanstalk
import (
- "strings"
+ "net"
"sync"
"time"
@@ -64,6 +64,9 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint
errN := cp.checkAndRedial(err)
if errN != nil {
return 0, errN
+ } else {
+ // retry put only when we redialed
+ return cp.t.Put(body, pri, delay, ttr)
}
}
@@ -83,12 +86,14 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
id, body, err := cp.ts.Reserve(reserveTimeout)
if err != nil {
+ // errN contains both the original err and the internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
return 0, nil, errN
+ } else {
+ // retry Reserve only when we redialed
+ return cp.ts.Reserve(reserveTimeout)
}
-
- return 0, nil, err
}
return id, body, nil
@@ -100,12 +105,14 @@ func (cp *ConnPool) Delete(id uint64) error {
err := cp.conn.Delete(id)
if err != nil {
+ // errN contains both the original err and the internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
return errN
+ } else {
+ // retry Delete only when we redialed
+ return cp.conn.Delete(id)
}
-
- return err
}
return nil
}
@@ -156,15 +163,29 @@ func (cp *ConnPool) redial() error {
return nil
}
-var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"}
+var connErrors = map[string]struct{}{"EOF": {}}
func (cp *ConnPool) checkAndRedial(err error) error {
const op = errors.Op("connection_pool_check_redial")
+ switch et := err.(type) {
+
+ // check if the error is a beanstalk connection error
+ case beanstalk.ConnError:
+ switch bErr := et.Err.(type) {
+ case *net.OpError:
+ cp.RUnlock()
+ errR := cp.redial()
+ cp.RLock()
+ // if redial failed - return
+ if errR != nil {
+ return errors.E(op, errors.Errorf("%v:%v", bErr, errR))
+ }
- for _, errStr := range connErrors {
- if connErr, ok := err.(beanstalk.ConnError); ok {
- // if error is related to the broken connection - redial
- if strings.Contains(errStr, connErr.Err.Error()) {
+ // if redial was successful -> continue listening
+ return nil
+ default:
+ if _, ok := connErrors[et.Err.Error()]; ok {
+ // if error is related to the broken connection - redial
cp.RUnlock()
errR := cp.redial()
cp.RLock()
@@ -178,5 +199,6 @@ func (cp *ConnPool) checkAndRedial(err error) error {
}
}
- return nil
+ // return initial error
+ return err
}
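The connection.go change turns checkAndRedial into a redial-then-retry-once helper. Below is a simplified, driver-agnostic sketch of that pattern; the type and method names are illustrative and intentionally omit the beanstalk-specific error unwrapping.

package connpool

import (
	"net"
	"sync"
)

// Pool is a stripped-down stand-in for the beanstalk ConnPool.
type Pool struct {
	mu sync.RWMutex
	// the connection and tube set would live here
}

// redial re-establishes the connection under the write lock.
func (p *Pool) redial() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	// dial again and swap the connection here
	return nil
}

// checkAndRedial redials only for errors that indicate a broken connection
// (net.OpError, EOF) and reports whether the caller may retry the operation.
func (p *Pool) checkAndRedial(err error) (retry bool, outErr error) {
	if _, ok := err.(*net.OpError); ok || err.Error() == "EOF" {
		p.mu.RUnlock()          // the caller holds the read lock
		redialErr := p.redial() // redial takes the write lock internally
		p.mu.RLock()
		if redialErr != nil {
			return false, redialErr
		}
		return true, nil
	}
	// unrelated error: hand it back untouched
	return false, err
}

// Delete shows the call pattern: try once, and on a connection error redial and retry once.
func (p *Pool) Delete(remove func() error) error {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if err := remove(); err != nil {
		retry, errN := p.checkAndRedial(err)
		if !retry {
			return errN
		}
		return remove()
	}
	return nil
}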
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 1c2e9781..1490e587 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -224,7 +224,9 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
func (j *JobConsumer) Stop() error {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.stopCh <- struct{}{}
+ if atomic.LoadUint32(&j.listeners) == 1 {
+ j.stopCh <- struct{}{}
+ }
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
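The guarded send in Stop avoids blocking on an unbuffered channel when no listener goroutine was ever started. A compact sketch of that guard, with simplified names:

package consumer

import "sync/atomic"

// Consumer is a minimal stand-in illustrating the guarded stop from consumer.go.
type Consumer struct {
	listeners uint32
	stopCh    chan struct{}
}

func New() *Consumer {
	return &Consumer{stopCh: make(chan struct{})}
}

func (c *Consumer) listen() {
	atomic.AddUint32(&c.listeners, 1)
	defer atomic.AddUint32(&c.listeners, ^uint32(0)) // idiomatic decrement on exit
	for {
		select {
		case <-c.stopCh:
			return
		default:
			// reserve and process one job here
		}
	}
}

func (c *Consumer) Stop() {
	// signal only when a listener is actually draining stopCh,
	// otherwise the unbuffered send would block forever
	if atomic.LoadUint32(&c.listeners) == 1 {
		c.stopCh <- struct{}{}
	}
}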
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index 2c2873c2..b797fc12 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -100,7 +100,7 @@ func (i *Item) Ack() error {
}
func (i *Item) Nack() error {
- return nil
+ return i.Options.conn.Delete(i.Options.id)
}
func fromJob(job *job.Job) *Item {
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index ec0b5ca8..0f98312a 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -1,5 +1,7 @@
package beanstalk
+import "github.com/beanstalkd/go-beanstalk"
+
func (j *JobConsumer) listen() {
for {
select {
@@ -9,6 +11,13 @@ func (j *JobConsumer) listen() {
default:
id, body, err := j.pool.Reserve(j.reserveTimeout)
if err != nil {
+ if errB, ok := err.(beanstalk.ConnError); ok {
+ switch errB.Err {
+ case beanstalk.ErrTimeout:
+ j.log.Info("beanstalk reserve timeout", "warn", errB.Op)
+ continue
+ }
+ }
// in case of other error - continue
j.log.Error("beanstalk reserve", "error", err)
continue
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 18546715..43617716 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -101,7 +101,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
sessionToken: globalCfg.SessionToken,
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
- pauseCh: make(chan struct{}),
+ pauseCh: make(chan struct{}, 1),
}
// PARSE CONFIGURATION -------
@@ -209,7 +209,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
sessionToken: globalCfg.SessionToken,
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
- pauseCh: make(chan struct{}),
+ pauseCh: make(chan struct{}, 1),
}
// PARSE CONFIGURATION -------
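Buffering pauseCh with a capacity of one lets Pause/Stop deposit the signal and return even while the listener is blocked inside a long receive call. A condensed illustration of the difference, with simplified names:

package listener

import "time"

type poller struct {
	pauseCh chan struct{}
}

func newPoller() *poller {
	// capacity 1: the pause signal waits in the buffer, so the sender
	// never blocks waiting for the listener's next select iteration
	return &poller{pauseCh: make(chan struct{}, 1)}
}

func (p *poller) listen() {
	for {
		select {
		case <-p.pauseCh:
			return
		default:
			// a long ReceiveMessage-style call sits here
			time.Sleep(time.Second)
		}
	}
}

func (p *poller) pause() {
	p.pauseCh <- struct{}{} // returns immediately thanks to the buffer
}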
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index 5722c19a..8c5d887e 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -22,6 +22,7 @@ func (j *JobConsumer) listen() { //nolint:gocognit
for {
select {
case <-j.pauseCh:
+ j.log.Warn("sqs listener stopped")
return
default:
message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 47d31d99..c8973f1e 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -3,9 +3,7 @@ package jobs
import (
"context"
"fmt"
- "runtime"
"sync"
- "sync/atomic"
"time"
endure "github.com/spiral/endure/pkg/container"
@@ -24,11 +22,12 @@ import (
)
const (
- // RrJobs env variable
- RrJobs string = "rr_jobs"
- PluginName string = "jobs"
+ // RrMode env variable
+ RrMode string = "RR_MODE"
+ RrModeJobs string = "jobs"
- pipelines string = "pipelines"
+ PluginName string = "jobs"
+ pipelines string = "pipelines"
)
type Plugin struct {
@@ -54,7 +53,10 @@ type Plugin struct {
// initial set of the pipelines to consume
consume map[string]struct{}
+ // signal channel to stop the pollers
stopCh chan struct{}
+
+ pldPool sync.Pool
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
@@ -79,6 +81,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.consumers = make(map[string]jobs.Consumer)
p.consume = make(map[string]struct{})
p.stopCh = make(chan struct{}, 1)
+ p.pldPool = sync.Pool{New: func() interface{} {
+ // with nil fields
+ return payload.Payload{}
+ }}
// initial set of pipelines
for i := range p.cfg.Pipelines {
@@ -98,6 +104,16 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return nil
}
+func (p *Plugin) getPayload() payload.Payload {
+ return p.pldPool.Get().(payload.Payload)
+}
+
+func (p *Plugin) putPayload(pld payload.Payload) {
+ pld.Body = nil
+ pld.Context = nil
+ p.pldPool.Put(pld)
+}
+
func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
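The pldPool additions follow the usual sync.Pool recycle pattern for per-job payloads: take an object before Exec, clear its fields, and return it on every exit path so the poller allocates less under load. A self-contained sketch of that lifecycle; Payload here is a stand-in, not the real payload type.

package main

import (
	"fmt"
	"sync"
)

// Payload is a minimal stand-in for the worker payload (Context/Body byte slices).
type Payload struct {
	Context []byte
	Body    []byte
}

var pldPool = sync.Pool{New: func() interface{} {
	return Payload{} // fields stay nil until the poller fills them
}}

func getPayload() Payload { return pldPool.Get().(Payload) }

func putPayload(p Payload) {
	// drop references so pooled values never leak a previous job's data
	p.Body = nil
	p.Context = nil
	pldPool.Put(p)
}

func main() {
	exec := getPayload()
	exec.Body = []byte(`{"job":"demo"}`)
	exec.Context = []byte(`{"pipeline":"test"}`)

	fmt.Println(string(exec.Body)) // hand exec to the worker pool here

	putPayload(exec) // always return the payload, on success and on error paths
}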
@@ -161,29 +177,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
})
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"})
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"})
if err != nil {
errCh <- err
return errCh
}
- // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
- var rate uint64
- go func() {
- tt := time.NewTicker(time.Second * 1)
- for { //nolint:gosimple
- select {
- case <-tt.C:
- fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate))
- fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine())
- fmt.Printf("---> curr len: %d\n", p.queue.Len())
- atomic.StoreUint64(&rate, 0)
- }
- }
- }()
-
- // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
-
// start listening
go func() {
for i := uint8(0); i < p.cfg.NumPollers; i++ {
@@ -194,9 +193,18 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.log.Debug("------> job poller stopped <------")
return
default:
- // get data JOB from the queue
+ // get prioritized JOB from the queue
jb := p.queue.ExtractMin()
+ // parse the context
+ // for each job, the context contains:
+ /*
+ 1. Job class
+ 2. Job ID provided from the outside
+ 3. Job Headers map[string][]string
+ 4. Timeout in seconds
+ 5. Pipeline name
+ */
ctx, err := jb.Context()
if err != nil {
errNack := jb.Nack()
@@ -207,40 +215,44 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
- exec := payload.Payload{
- Context: ctx,
- Body: jb.Body(),
- }
-
- // protect from the pool reset
- p.RLock()
+ // get payload from the sync.Pool
+ exec := p.getPayload()
+ exec.Body = jb.Body()
+ exec.Context = ctx
// TODO REMOVE AFTER TESTS <---------------------------------------------------------------------------
+ // remove in tests
p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context))
+ // protect from the pool reset
+ p.RLock()
resp, err := p.workersPool.Exec(exec)
+ p.RUnlock()
if err != nil {
errNack := jb.Nack()
if errNack != nil {
p.log.Error("negatively acknowledge failed", "error", errNack)
}
- p.RUnlock()
p.log.Error("job execute", "error", err)
+
+ p.putPayload(exec)
continue
}
- p.RUnlock()
// TODO REMOVE AFTER TESTS <---------------------------------------------------------------------------
+ // remove in tests
p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context))
errAck := jb.Ack()
if errAck != nil {
p.log.Error("acknowledge failed", "error", errAck)
+ p.putPayload(exec)
continue
}
- // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
- atomic.AddUint64(&rate, 1)
+
+ // return payload
+ p.putPayload(exec)
}
}
}()
@@ -301,7 +313,7 @@ func (p *Plugin) Reset() error {
p.workersPool = nil
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}, p.collectJobsEvents)
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents)
if err != nil {
return errors.E(op, err)
}