diff options
-rw-r--r-- | plugins/jobs/drivers/amqp/config.go | 24 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 92 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 13 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 44 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 9 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 1 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 86 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-init.yaml | 41 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-test.yaml | 106 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_plugin_test.go | 80 |
13 files changed, 205 insertions, 301 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go index 7befb3c8..2a1aed20 100644 --- a/plugins/jobs/drivers/amqp/config.go +++ b/plugins/jobs/drivers/amqp/config.go @@ -2,13 +2,15 @@ package amqp // pipeline rabbitmq info const ( - exchangeKey string = "exchange" - exchangeType string = "exchange-type" - queue string = "queue" - routingKey string = "routing-key" - prefetch string = "prefetch" - exclusive string = "exclusive" - priority string = "priority" + exchangeKey string = "exchange" + exchangeType string = "exchange-type" + queue string = "queue" + routingKey string = "routing-key" + prefetch string = "prefetch" + exclusive string = "exclusive" + priority string = "priority" + multipleAsk string = "multiple_ask" + requeueOnFail string = "requeue_on_fail" dlx string = "x-dead-letter-exchange" dlxRoutingKey string = "x-dead-letter-routing-key" @@ -24,13 +26,15 @@ type GlobalCfg struct { // Config is used to parse pipeline configuration type Config struct { - PrefetchCount int `mapstructure:"pipeline_size"` + Prefetch int `mapstructure:"prefetch"` Queue string `mapstructure:"queue"` Priority int64 `mapstructure:"priority"` Exchange string `mapstructure:"exchange"` ExchangeType string `mapstructure:"exchange_type"` RoutingKey string `mapstructure:"routing_key"` Exclusive bool `mapstructure:"exclusive"` + MultipleAck bool `mapstructure:"multiple_ask"` + RequeueOnFail bool `mapstructure:"requeue_on_fail"` } func (c *Config) InitDefault() { @@ -42,8 +46,8 @@ func (c *Config) InitDefault() { c.Exchange = "default" } - if c.PrefetchCount == 0 { - c.PrefetchCount = 100 + if c.Prefetch == 0 { + c.Prefetch = 100 } if c.Priority == 0 { diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 6def138e..d592a17a 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -29,17 +29,19 @@ type JobsConsumer struct { conn *amqp.Connection consumeChan *amqp.Channel publishChan *amqp.Channel + consumeID string + connStr string retryTimeout time.Duration - prefetchCount int + prefetch int priority int64 exchangeName string queue string exclusive bool - consumeID string - connStr string exchangeType string routingKey string + multipleAck bool + requeueOnFail bool delayCache map[string]struct{} @@ -53,17 +55,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information - jb := &JobsConsumer{ - log: log, - pq: pq, - eh: e, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), - // TODO to config - retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), - } - // if no such key - error if !cfg.Has(configKey) { return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) @@ -74,7 +65,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) } - // PARSE CONFIGURATION ------- + // PARSE CONFIGURATION START ------- var pipeCfg Config var globalCfg GlobalCfg @@ -91,16 +82,28 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, } globalCfg.InitDefault() + // PARSE CONFIGURATION END ------- - jb.routingKey = pipeCfg.RoutingKey - jb.queue = pipeCfg.Queue - jb.exchangeType = pipeCfg.ExchangeType - jb.exchangeName = pipeCfg.Exchange - jb.prefetchCount = pipeCfg.PrefetchCount - jb.exclusive = pipeCfg.Exclusive - jb.priority = pipeCfg.Priority - - // PARSE CONFIGURATION ------- + jb := &JobsConsumer{ + log: log, + pq: pq, + eh: e, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + // TODO to config + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + priority: pipeCfg.Priority, + + routingKey: pipeCfg.RoutingKey, + queue: pipeCfg.Queue, + exchangeType: pipeCfg.ExchangeType, + exchangeName: pipeCfg.Exchange, + prefetch: pipeCfg.Prefetch, + exclusive: pipeCfg.Exclusive, + multipleAck: pipeCfg.MultipleAck, + requeueOnFail: pipeCfg.RequeueOnFail, + } jb.conn, err = amqp.Dial(globalCfg.Addr) if err != nil { @@ -131,15 +134,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information - jb := &JobsConsumer{ - log: log, - eh: e, - pq: pq, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), - retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), - } // only global section if !cfg.Has(pluginName) { @@ -156,16 +150,28 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con globalCfg.InitDefault() - jb.routingKey = pipeline.String(routingKey, "") - jb.queue = pipeline.String(queue, "default") - jb.exchangeType = pipeline.String(exchangeType, "direct") - jb.exchangeName = pipeline.String(exchangeKey, "amqp.default") - jb.prefetchCount = pipeline.Int(prefetch, 10) - jb.priority = int64(pipeline.Int(priority, 10)) - jb.exclusive = pipeline.Bool(exclusive, true) - // PARSE CONFIGURATION ------- + jb := &JobsConsumer{ + log: log, + eh: e, + pq: pq, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + + routingKey: pipeline.String(routingKey, ""), + queue: pipeline.String(queue, "default"), + exchangeType: pipeline.String(exchangeType, "direct"), + exchangeName: pipeline.String(exchangeKey, "amqp.default"), + prefetch: pipeline.Int(prefetch, 10), + priority: int64(pipeline.Int(priority, 10)), + exclusive: pipeline.Bool(exclusive, true), + multipleAck: pipeline.Bool(multipleAsk, false), + requeueOnFail: pipeline.Bool(requeueOnFail, false), + } + jb.conn, err = amqp.Dial(globalCfg.Addr) if err != nil { return nil, errors.E(op, err) @@ -315,7 +321,7 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { return errors.E(op, err) } - err = j.consumeChan.Qos(j.prefetchCount, 0, false) + err = j.consumeChan.Qos(j.prefetch, 0, false) if err != nil { return errors.E(op, err) } @@ -409,7 +415,7 @@ func (j *JobsConsumer) Resume(p string) { return } - err = j.consumeChan.Qos(j.prefetchCount, 0, false) + err = j.consumeChan.Qos(j.prefetch, 0, false) if err != nil { j.log.Error("qos set failed", "error", err) return diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 6b912dde..bc679037 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -50,6 +50,10 @@ type Options struct { // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout int64 `json:"timeout,omitempty"` + + // private + multipleAsk bool + requeue bool } // DelayDuration returns delay duration in a form of time.Duration. @@ -100,11 +104,11 @@ func (j *Item) Context() ([]byte, error) { } func (j *Item) Ack() error { - return j.AckFunc(false) + return j.AckFunc(j.Options.multipleAsk) } func (j *Item) Nack() error { - return j.NackFunc(false, false) + return j.NackFunc(false, j.Options.requeue) } func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { @@ -157,7 +161,10 @@ func pack(id string, j *Item) (amqp.Table, error) { // unpack restores jobs.Options func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) { - item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}} + item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ + multipleAsk: j.multipleAck, + requeue: j.requeueOnFail, + }} if _, ok := d.Headers[job.RRID].(string); !ok { return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID)) diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index 62301bed..fc659902 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -1,7 +1,7 @@ package beanstalk import ( - "strings" + "net" "sync" "time" @@ -64,6 +64,9 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint errN := cp.checkAndRedial(err) if errN != nil { return 0, errN + } else { + // retry put only when we redialed + return cp.t.Put(body, pri, delay, ttr) } } @@ -83,12 +86,14 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error id, body, err := cp.ts.Reserve(reserveTimeout) if err != nil { + // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { return 0, nil, errN + } else { + // retry Reserve only when we redialed + return cp.ts.Reserve(reserveTimeout) } - - return 0, nil, err } return id, body, nil @@ -100,12 +105,14 @@ func (cp *ConnPool) Delete(id uint64) error { err := cp.conn.Delete(id) if err != nil { + // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { return errN + } else { + // retry Delete only when we redialed + return cp.conn.Delete(id) } - - return err } return nil } @@ -156,15 +163,29 @@ func (cp *ConnPool) redial() error { return nil } -var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} +var connErrors = map[string]struct{}{"EOF": {}} func (cp *ConnPool) checkAndRedial(err error) error { const op = errors.Op("connection_pool_check_redial") + switch et := err.(type) { + + // check if the error + case beanstalk.ConnError: + switch bErr := et.Err.(type) { + case *net.OpError: + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", bErr, errR)) + } - for _, errStr := range connErrors { - if connErr, ok := err.(beanstalk.ConnError); ok { - // if error is related to the broken connection - redial - if strings.Contains(errStr, connErr.Err.Error()) { + // if redial was successful -> continue listening + return nil + default: + if _, ok := connErrors[et.Err.Error()]; ok { + // if error is related to the broken connection - redial cp.RUnlock() errR := cp.redial() cp.RLock() @@ -178,5 +199,6 @@ func (cp *ConnPool) checkAndRedial(err error) error { } } - return nil + // return initial error + return err } diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 1c2e9781..1490e587 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -224,7 +224,9 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { func (j *JobConsumer) Stop() error { pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.stopCh <- struct{}{} + if atomic.LoadUint32(&j.listeners) == 1 { + j.stopCh <- struct{}{} + } j.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index 2c2873c2..b797fc12 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -100,7 +100,7 @@ func (i *Item) Ack() error { } func (i *Item) Nack() error { - return nil + return i.Options.conn.Delete(i.Options.id) } func fromJob(job *job.Job) *Item { diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index ec0b5ca8..0f98312a 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -1,5 +1,7 @@ package beanstalk +import "github.com/beanstalkd/go-beanstalk" + func (j *JobConsumer) listen() { for { select { @@ -9,6 +11,13 @@ func (j *JobConsumer) listen() { default: id, body, err := j.pool.Reserve(j.reserveTimeout) if err != nil { + if errB, ok := err.(beanstalk.ConnError); ok { + switch errB.Err { + case beanstalk.ErrTimeout: + j.log.Info("beanstalk reserve timeout", "warn", errB.Op) + continue + } + } // in case of other error - continue j.log.Error("beanstalk reserve", "error", err) continue diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 18546715..43617716 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -101,7 +101,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure sessionToken: globalCfg.SessionToken, secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, - pauseCh: make(chan struct{}), + pauseCh: make(chan struct{}, 1), } // PARSE CONFIGURATION ------- @@ -209,7 +209,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf sessionToken: globalCfg.SessionToken, secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, - pauseCh: make(chan struct{}), + pauseCh: make(chan struct{}, 1), } // PARSE CONFIGURATION ------- diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index 5722c19a..8c5d887e 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -22,6 +22,7 @@ func (j *JobConsumer) listen() { //nolint:gocognit for { select { case <-j.pauseCh: + j.log.Warn("sqs listener stopped") return default: message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{ diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 47d31d99..c8973f1e 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -3,9 +3,7 @@ package jobs import ( "context" "fmt" - "runtime" "sync" - "sync/atomic" "time" endure "github.com/spiral/endure/pkg/container" @@ -24,11 +22,12 @@ import ( ) const ( - // RrJobs env variable - RrJobs string = "rr_jobs" - PluginName string = "jobs" + // RrMode env variable + RrMode string = "RR_MODE" + RrModeJobs string = "jobs" - pipelines string = "pipelines" + PluginName string = "jobs" + pipelines string = "pipelines" ) type Plugin struct { @@ -54,7 +53,10 @@ type Plugin struct { // initial set of the pipelines to consume consume map[string]struct{} + // signal channel to stop the pollers stopCh chan struct{} + + pldPool sync.Pool } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -79,6 +81,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.consumers = make(map[string]jobs.Consumer) p.consume = make(map[string]struct{}) p.stopCh = make(chan struct{}, 1) + p.pldPool = sync.Pool{New: func() interface{} { + // with nil fields + return payload.Payload{} + }} // initial set of pipelines for i := range p.cfg.Pipelines { @@ -98,6 +104,16 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return nil } +func (p *Plugin) getPayload() payload.Payload { + return p.pldPool.Get().(payload.Payload) +} + +func (p *Plugin) putPayload(pld payload.Payload) { + pld.Body = nil + pld.Context = nil + p.pldPool.Put(pld) +} + func (p *Plugin) Serve() chan error { //nolint:gocognit errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") @@ -161,29 +177,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit }) var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"}) if err != nil { errCh <- err return errCh } - // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- - var rate uint64 - go func() { - tt := time.NewTicker(time.Second * 1) - for { //nolint:gosimple - select { - case <-tt.C: - fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate)) - fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine()) - fmt.Printf("---> curr len: %d\n", p.queue.Len()) - atomic.StoreUint64(&rate, 0) - } - } - }() - - // THIS IS TEST HELPERS, SHOULD BE DELETED IN THE RELEASES !!!!!!!!!!!!!!!!!!!!!!!! <----------------------------------------------------- - // start listening go func() { for i := uint8(0); i < p.cfg.NumPollers; i++ { @@ -194,9 +193,18 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.log.Debug("------> job poller stopped <------") return default: - // get data JOB from the queue + // get prioritized JOB from the queue jb := p.queue.ExtractMin() + // parse the context + // for the each job, context contains: + /* + 1. Job class + 2. Job ID provided from the outside + 3. Job Headers map[string][]string + 4. Timeout in seconds + 5. Pipeline name + */ ctx, err := jb.Context() if err != nil { errNack := jb.Nack() @@ -207,40 +215,44 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } - exec := payload.Payload{ - Context: ctx, - Body: jb.Body(), - } - - // protect from the pool reset - p.RLock() + // get payload from the sync.Pool + exec := p.getPayload() + exec.Body = jb.Body() + exec.Context = ctx // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- + // remove in tests p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context)) + // protect from the pool reset + p.RLock() resp, err := p.workersPool.Exec(exec) + p.RUnlock() if err != nil { errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) } - p.RUnlock() p.log.Error("job execute", "error", err) + + p.putPayload(exec) continue } - p.RUnlock() // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- + // remove in tests p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context)) errAck := jb.Ack() if errAck != nil { p.log.Error("acknowledge failed", "error", errAck) + p.putPayload(exec) continue } - // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- - atomic.AddUint64(&rate, 1) + + // return payload + p.putPayload(exec) } } }() @@ -301,7 +313,7 @@ func (p *Plugin) Reset() error { p.workersPool = nil var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}, p.collectJobsEvents) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents) if err != nil { return errors.E(op, err) } diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index e31e4441..550e55cc 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -9,11 +9,16 @@ server: amqp: addr: amqp://guest:guest@localhost:5672/ - # beanstalk configuration +# beanstalk configuration +# beanstalk: + # beanstalk address addr: tcp://localhost:11300 + # connect timeout + timeout: 10s - # amazon sqs configuration +# amazon sqs configuration +# General section sqs: key: api-key secret: api-secret @@ -42,32 +47,47 @@ jobs: test-local: driver: ephemeral priority: 10 - pipeline_size: 10000 + prefetch: 10000 test-local-2: driver: ephemeral priority: 1 - pipeline_size: 10000 + prefetch: 10000 test-local-3: driver: ephemeral priority: 2 - pipeline_size: 10000 + prefetch: 10000 test-1: driver: amqp - priority: 1 - pipeline_size: 1000000 + # QoS + prefetch: 1000000 + # Queue name queue: test-1-queue + # Pipeline jobs priority, 1 - highest + priority: 1 + # Exchange exchange: default - exclusive: false + # Exchange type: direct, topic, fanout exchange_type: direct + # Routing key for the queue routing_key: test + # Declare a queue exclusive at the exchange + exclusive: false + # When multiple is true, this delivery and all prior unacknowledged deliveries + # on the same channel will be acknowledged. This is useful for batch processing + # of deliveries + multiple_ack: false + # When multiple is true, this delivery and all prior unacknowledged deliveries + # on the same channel will be acknowledged. This is useful for batch processing + # of deliveries + requeue_on_fail: false test-2-amqp: driver: amqp priority: 2 - pipeline_size: 100000 + prefetch: 1000000 queue: test-2-queue exchange: default exchange_type: direct @@ -77,11 +97,10 @@ jobs: driver: beanstalk priority: 11 tube: default - pipeline_size: 1000000 test-3: driver: sqs - pipeline_size: 1000000 + prefetch: 1000000 queue: default attributes: MessageRetentionPeriod: 86400 diff --git a/tests/plugins/jobs/configs/.rr-jobs-test.yaml b/tests/plugins/jobs/configs/.rr-jobs-test.yaml deleted file mode 100644 index 8213d72a..00000000 --- a/tests/plugins/jobs/configs/.rr-jobs-test.yaml +++ /dev/null @@ -1,106 +0,0 @@ -rpc: - listen: unix:///tmp/rr.sock - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -amqp: - addr: amqp://guest:guest@localhost:5672/ - - # beanstalk configuration -beanstalk: - addr: tcp://localhost:11300 - - # amazon sqs configuration -sqs: - key: api-key - secret: api-secret - region: us-west-1 - endpoint: http://localhost:9324 - session_token: "" - ping_period: 10 - attributes: - MessageRetentionPeriod: 86400 - -logs: - level: info - encoding: console - mode: development - -jobs: - # num logical cores by default - num_pollers: 64 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 20 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: -# test-local: -# driver: ephemeral -# priority: 10 -# pipeline_size: 10000 -# -# test-local-2: -# driver: ephemeral -# priority: 1 -# pipeline_size: 10000 -# -# test-local-3: -# driver: ephemeral -# priority: 2 -# pipeline_size: 10000 -# -# test-1: -# driver: amqp -# priority: 1 -# pipeline_size: 1000000 -# queue: test-1-queue -# exchange: default -# exchange_type: direct -# routing_key: test -# -# test-4: -# driver: amqp -# priority: 1 -# pipeline_size: 1000000 -# queue: test-1-queue -# exchange: default -# exchange_type: direct -# routing_key: test -# -# test-2-amqp: -# driver: amqp -# priority: 2 -# pipeline_size: 100000 -# queue: test-2-queue -# exchange: default -# exchange_type: direct -# routing_key: test-2 - - test-2: - driver: beanstalk - priority: 11 - tube: default - pipeline_size: 1000000 - reserve_timeout: 10s - -# test-3: -# driver: sqs -# pipeline_size: 1000000 -# queue: default -# attributes: -# MessageRetentionPeriod: 86400 -# tags: -# test: "tag" - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3", "test-2" ] - diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go index d7c1075f..c10ac350 100644 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -17,9 +17,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" @@ -30,80 +28,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestTEMP_INTI(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-jobs-test.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &ephemeral.Plugin{}, - &sqs.Plugin{}, - &amqp.Plugin{}, - &beanstalk.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - stopCh <- struct{}{} - wg.Wait() -} - func TestJobsInit(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) @@ -262,6 +186,10 @@ func TestJobsPauseResume(t *testing.T) { mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + // TODO delete + mockLogger.EXPECT().Debug("request", "body:", gomock.Any(), "context:", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("response", "body:", gomock.Any(), "context:", gomock.Any()).AnyTimes() + err = cont.RegisterAll( cfg, &server.Plugin{}, |