diff options
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/drivers/amqp/config.go | 24 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 92 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 13 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 44 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 9 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 1 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 86 |
10 files changed, 171 insertions, 108 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go index 7befb3c8..2a1aed20 100644 --- a/plugins/jobs/drivers/amqp/config.go +++ b/plugins/jobs/drivers/amqp/config.go @@ -2,13 +2,15 @@ package amqp // pipeline rabbitmq info const ( - exchangeKey string = "exchange" - exchangeType string = "exchange-type" - queue string = "queue" - routingKey string = "routing-key" - prefetch string = "prefetch" - exclusive string = "exclusive" - priority string = "priority" + exchangeKey string = "exchange" + exchangeType string = "exchange-type" + queue string = "queue" + routingKey string = "routing-key" + prefetch string = "prefetch" + exclusive string = "exclusive" + priority string = "priority" + multipleAsk string = "multiple_ask" + requeueOnFail string = "requeue_on_fail" dlx string = "x-dead-letter-exchange" dlxRoutingKey string = "x-dead-letter-routing-key" @@ -24,13 +26,15 @@ type GlobalCfg struct { // Config is used to parse pipeline configuration type Config struct { - PrefetchCount int `mapstructure:"pipeline_size"` + Prefetch int `mapstructure:"prefetch"` Queue string `mapstructure:"queue"` Priority int64 `mapstructure:"priority"` Exchange string `mapstructure:"exchange"` ExchangeType string `mapstructure:"exchange_type"` RoutingKey string `mapstructure:"routing_key"` Exclusive bool `mapstructure:"exclusive"` + MultipleAck bool `mapstructure:"multiple_ask"` + RequeueOnFail bool `mapstructure:"requeue_on_fail"` } func (c *Config) InitDefault() { @@ -42,8 +46,8 @@ func (c *Config) InitDefault() { c.Exchange = "default" } - if c.PrefetchCount == 0 { - c.PrefetchCount = 100 + if c.Prefetch == 0 { + c.Prefetch = 100 } if c.Priority == 0 { diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 6def138e..d592a17a 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -29,17 +29,19 @@ type JobsConsumer struct { conn *amqp.Connection consumeChan *amqp.Channel publishChan *amqp.Channel + consumeID string + connStr string retryTimeout time.Duration - prefetchCount int + prefetch int priority int64 exchangeName string queue string exclusive bool - consumeID string - connStr string exchangeType string routingKey string + multipleAck bool + requeueOnFail bool delayCache map[string]struct{} @@ -53,17 +55,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information - jb := &JobsConsumer{ - log: log, - pq: pq, - eh: e, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), - // TODO to config - retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), - } - // if no such key - error if !cfg.Has(configKey) { return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) @@ -74,7 +65,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) } - // PARSE CONFIGURATION ------- + // PARSE CONFIGURATION START ------- var pipeCfg Config var globalCfg GlobalCfg @@ -91,16 +82,28 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, } globalCfg.InitDefault() + // PARSE CONFIGURATION END ------- - jb.routingKey = pipeCfg.RoutingKey - jb.queue = pipeCfg.Queue - jb.exchangeType = pipeCfg.ExchangeType - jb.exchangeName = pipeCfg.Exchange - jb.prefetchCount = pipeCfg.PrefetchCount - jb.exclusive = pipeCfg.Exclusive - jb.priority = pipeCfg.Priority - - // PARSE CONFIGURATION ------- + jb := &JobsConsumer{ + log: log, + pq: pq, + eh: e, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + // TODO to config + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + priority: pipeCfg.Priority, + + routingKey: pipeCfg.RoutingKey, + queue: pipeCfg.Queue, + exchangeType: pipeCfg.ExchangeType, + exchangeName: pipeCfg.Exchange, + prefetch: pipeCfg.Prefetch, + exclusive: pipeCfg.Exclusive, + multipleAck: pipeCfg.MultipleAck, + requeueOnFail: pipeCfg.RequeueOnFail, + } jb.conn, err = amqp.Dial(globalCfg.Addr) if err != nil { @@ -131,15 +134,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information - jb := &JobsConsumer{ - log: log, - eh: e, - pq: pq, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), - retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), - } // only global section if !cfg.Has(pluginName) { @@ -156,16 +150,28 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con globalCfg.InitDefault() - jb.routingKey = pipeline.String(routingKey, "") - jb.queue = pipeline.String(queue, "default") - jb.exchangeType = pipeline.String(exchangeType, "direct") - jb.exchangeName = pipeline.String(exchangeKey, "amqp.default") - jb.prefetchCount = pipeline.Int(prefetch, 10) - jb.priority = int64(pipeline.Int(priority, 10)) - jb.exclusive = pipeline.Bool(exclusive, true) - // PARSE CONFIGURATION ------- + jb := &JobsConsumer{ + log: log, + eh: e, + pq: pq, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + + routingKey: pipeline.String(routingKey, ""), + queue: pipeline.String(queue, "default"), + exchangeType: pipeline.String(exchangeType, "direct"), + exchangeName: pipeline.String(exchangeKey, "amqp.default"), + prefetch: pipeline.Int(prefetch, 10), + priority: int64(pipeline.Int(priority, 10)), + exclusive: pipeline.Bool(exclusive, true), + multipleAck: pipeline.Bool(multipleAsk, false), + requeueOnFail: pipeline.Bool(requeueOnFail, false), + } + jb.conn, err = amqp.Dial(globalCfg.Addr) if err != nil { return nil, errors.E(op, err) @@ -315,7 +321,7 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { return errors.E(op, err) } - err = j.consumeChan.Qos(j.prefetchCount, 0, false) + err = j.consumeChan.Qos(j.prefetch, 0, false) if err != nil { return errors.E(op, err) } @@ -409,7 +415,7 @@ func (j *JobsConsumer) Resume(p string) { return } - err = j.consumeChan.Qos(j.prefetchCount, 0, false) + err = j.consumeChan.Qos(j.prefetch, 0, false) if err != nil { j.log.Error("qos set failed", "error", err) return diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 6b912dde..bc679037 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -50,6 +50,10 @@ type Options struct { // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout int64 `json:"timeout,omitempty"` + + // private + multipleAsk bool + requeue bool } // DelayDuration returns delay duration in a form of time.Duration. @@ -100,11 +104,11 @@ func (j *Item) Context() ([]byte, error) { } func (j *Item) Ack() error { - return j.AckFunc(false) + return j.AckFunc(j.Options.multipleAsk) } func (j *Item) Nack() error { - return j.NackFunc(false, false) + return j.NackFunc(false, j.Options.requeue) } func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { @@ -157,7 +161,10 @@ func pack(id string, j *Item) (amqp.Table, error) { // unpack restores jobs.Options func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) { - item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}} + item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ + multipleAsk: j.multipleAck, + requeue: j.requeueOnFail, + }} if _, ok := d.Headers[job.RRID].(string); !ok { return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID)) diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index 62301bed..fc659902 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -1,7 +1,7 @@ package beanstalk import ( - "strings" + "net" "sync" "time" @@ -64,6 +64,9 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint errN := cp.checkAndRedial(err) if errN != nil { return 0, errN + } else { + // retry put only when we redialed + return cp.t.Put(body, pri, delay, ttr) } } @@ -83,12 +86,14 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error id, body, err := cp.ts.Reserve(reserveTimeout) if err != nil { + // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { return 0, nil, errN + } else { + // retry Reserve only when we redialed + return cp.ts.Reserve(reserveTimeout) } - - return 0, nil, err } return id, body, nil @@ -100,12 +105,14 @@ func (cp *ConnPool) Delete(id uint64) error { err := cp.conn.Delete(id) if err != nil { + // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { return errN + } else { + // retry Delete only when we redialed + return cp.conn.Delete(id) } - - return err } return nil } @@ -156,15 +163,29 @@ func (cp *ConnPool) redial() error { return nil } -var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} +var connErrors = map[string]struct{}{"EOF": {}} func (cp *ConnPool) checkAndRedial(err error) error { const op = errors.Op("connection_pool_check_redial") + switch et := err.(type) { + + // check if the error + case beanstalk.ConnError: + switch bErr := et.Err.(type) { + case *net.OpError: + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", bErr, errR)) + } - for _, errStr := range connErrors { - if connErr, ok := err.(beanstalk.ConnError); ok { - // if error is related to the broken connection - redial - if strings.Contains(errStr, connErr.Err.Error()) { + // if redial was successful -> continue listening + return nil + default: + if _, ok := connErrors[et.Err.Error()]; ok { + // if error is related to the broken connection - redial cp.RUnlock() errR := cp.redial() cp.RLock() @@ -178,5 +199,6 @@ func (cp *ConnPool) checkAndRedial(err error) error { } } - return nil + // return initial error + return err } diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 1c2e9781..1490e587 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -224,7 +224,9 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { func (j *JobConsumer) Stop() error { pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.stopCh <- struct{}{} + if atomic.LoadUint32(&j.listeners) == 1 { + j.stopCh <- struct{}{} + } j.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index 2c2873c2..b797fc12 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -100,7 +100,7 @@ func (i *Item) Ack() error { } func (i *Item) Nack() error { - return nil + return i.Options.conn.Delete(i.Options.id) } func fromJob(job *job.Job) *Item { diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index ec0b5ca8..0f98312a 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -1,5 +1,7 @@ package beanstalk +import "github.com/beanstalkd/go-beanstalk" + func (j *JobConsumer) listen() { for { select { @@ -9,6 +11,13 @@ func (j *JobConsumer) listen() { default: id, body, err := j.pool.Reserve(j.reserveTimeout) if err != nil { + if errB, ok := err.(beanstalk.ConnError); ok { + switch errB.Err { + case beanstalk.ErrTimeout: + j.log.Info("beanstalk reserve timeout", "warn", errB.Op) + continue + } + } // in case of other error - continue j.log.Error("beanstalk reserve", "error", err) continue diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 18546715..43617716 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -101,7 +101,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure sessionToken: globalCfg.SessionToken, secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, - pauseCh: make(chan struct{}), + pauseCh: make(chan struct{}, 1), } // PARSE CONFIGURATION ------- @@ -209,7 +209,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf sessionToken: globalCfg.SessionToken, secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, - pauseCh: make(chan struct{}), + pauseCh: make(chan struct{}, 1), } // PARSE CONFIGURATION ------- diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index 5722c19a..8c5d887e 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -22,6 +22,7 @@ func (j *JobConsumer) listen() { //nolint:gocognit for { select { case <-j.pauseCh: + j.log.Warn("sqs listener stopped") return default: message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{ diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 47d31d99..c8973f1e 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -3,9 +3,7 @@ package jobs import ( "context" "fmt" - "runtime" "sync" - "sync/atomic" "time" endure "github.com/spiral/endure/pkg/container" @@ -24,11 +22,12 @@ import ( ) const ( - // RrJobs env variable - RrJobs string = "rr_jobs" - PluginName string = "jobs" + // RrMode env variable + RrMode string = "RR_MODE" + RrModeJobs string = "jobs" - pipelines string = "pipelines" + PluginName string = "jobs" + pipelines string = "pipelines" ) type Plugin struct { @@ -54,7 +53,10 @@ type Plugin struct { // initial set of the pipelines to consume consume map[string]struct{} + // signal channel to stop the pollers stopCh chan struct{} + + pldPool sync.Pool } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -79,6 +81,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.consumers = make(map[string]jobs.Consumer) p.consume = make(map[string]struct{}) p.stopCh = make(chan struct{}, 1) + p.pldPool = sync.Pool{New: func() interface{} { + // with nil fields + return payload.Payload{} + }} // initial set of pipelines for i := range p.cfg.Pipelines { @@ -98,6 +104,16 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return nil } +func (p *Plugin) getPayload() payload.Payload { + return p.pldPool.Get().(payload.Payload) +} + +func (p *Plugin) putPayload(pld payload.Payload) { + pld.Body = nil + pld.Context = nil + p.pldPool.Put(pld) +} + func (p *Plugin) Serve() chan error { //nolint:gocognit errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") @@ -161,29 +177,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit }) var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"}) if err != nil { errCh <- err return errCh } - // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- - var rate uint64 - go func() { - tt := time.NewTicker(time.Second * 1) - for { //nolint:gosimple - select { - case <-tt.C: - fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate)) - fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine()) - fmt.Printf("---> curr len: %d\n", p.queue.Len()) - atomic.StoreUint64(&rate, 0) - } - } - }() - - // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- - // start listening go func() { for i := uint8(0); i < p.cfg.NumPollers; i++ { @@ -194,9 +193,18 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.log.Debug("------> job poller stopped <------") return default: - // get data JOB from the queue + // get prioritized JOB from the queue jb := p.queue.ExtractMin() + // parse the context + // for the each job, context contains: + /* + 1. Job class + 2. Job ID provided from the outside + 3. Job Headers map[string][]string + 4. Timeout in seconds + 5. Pipeline name + */ ctx, err := jb.Context() if err != nil { errNack := jb.Nack() @@ -207,40 +215,44 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } - exec := payload.Payload{ - Context: ctx, - Body: jb.Body(), - } - - // protect from the pool reset - p.RLock() + // get payload from the sync.Pool + exec := p.getPayload() + exec.Body = jb.Body() + exec.Context = ctx // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- + // remove in tests p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context)) + // protect from the pool reset + p.RLock() resp, err := p.workersPool.Exec(exec) + p.RUnlock() if err != nil { errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) } - p.RUnlock() p.log.Error("job execute", "error", err) + + p.putPayload(exec) continue } - p.RUnlock() // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- + // remove in tests p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context)) errAck := jb.Ack() if errAck != nil { p.log.Error("acknowledge failed", "error", errAck) + p.putPayload(exec) continue } - // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- - atomic.AddUint64(&rate, 1) + + // return payload + p.putPayload(exec) } } }() @@ -301,7 +313,7 @@ func (p *Plugin) Reset() error { p.workersPool = nil var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}, p.collectJobsEvents) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents) if err != nil { return errors.E(op, err) } |