diff options
-rw-r--r-- | plugins/amqp/amqpjobs/consumer.go | 2 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/redial.go | 4 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 41 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/item.go | 45 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/listener.go | 91 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_beanstalk_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_boltdb_test.go | 150 |
7 files changed, 163 insertions, 172 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 578f36ce..784a102c 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -479,7 +479,7 @@ func (c *consumer) handleItem(ctx context.Context, msg *Item) error { err = pch.Publish(c.exchangeName, tmpQ, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, - Timestamp: time.Now().UTC(), + Timestamp: time.Now(), DeliveryMode: amqp.Persistent, Body: msg.Body(), }) diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 56142e2b..8d21784f 100644 --- a/plugins/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go @@ -27,7 +27,7 @@ func (c *consumer) redialer() { //nolint:gocognit // trash the broken publishing channel <-c.publishChan - t := time.Now() + t := time.Now().UTC() pipe := c.pipeline.Load().(*pipeline.Pipeline) c.eh.Push(events.JobEvent{ @@ -35,7 +35,7 @@ func (c *consumer) redialer() { //nolint:gocognit Pipeline: pipe.Name(), Driver: pipe.Driver(), Error: err, - Start: time.Now(), + Start: time.Now().UTC(), }) expb := backoff.NewExponentialBackOff() diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index 2492ab60..ed0eda61 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -65,7 +65,6 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e } conf := &GlobalCfg{} - err := cfg.UnmarshalKey(PluginName, conf) if err != nil { return nil, errors.E(op, err) @@ -246,49 +245,45 @@ func (c *consumer) Push(_ context.Context, job *job.Job) error { const op = errors.Op("boltdb_jobs_push") err := c.db.Update(func(tx *bolt.Tx) error { item := fromJob(job) + // pool with buffers + buf := c.get() + // encode the job + enc := gob.NewEncoder(buf) + err := enc.Encode(item) + if err != nil { + c.put(buf) + return errors.E(op, err) + } + + value := make([]byte, buf.Len()) + copy(value, buf.Bytes()) + c.put(buf) // handle delay if item.Options.Delay > 0 { b := tx.Bucket(utils.AsBytes(DelayBucket)) - tKey := time.Now().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339) - - // pool with buffers - buf := c.get() - defer c.put(buf) + tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339) - enc := gob.NewEncoder(buf) - err := enc.Encode(item) + err = b.Put(utils.AsBytes(tKey), value) if err != nil { return errors.E(op, err) } - value := make([]byte, buf.Len()) - copy(value, buf.Bytes()) - atomic.AddUint64(c.delayed, 1) - return b.Put(utils.AsBytes(tKey), value) + return nil } b := tx.Bucket(utils.AsBytes(PushBucket)) - - // pool with buffers - buf := c.get() - defer c.put(buf) - - enc := gob.NewEncoder(buf) - err := enc.Encode(item) + err = b.Put(utils.AsBytes(item.ID()), value) if err != nil { return errors.E(op, err) } - value := make([]byte, buf.Len()) - copy(value, buf.Bytes()) - // increment active counter atomic.AddUint64(c.active, 1) - return b.Put(utils.AsBytes(item.ID()), value) + return nil }) if err != nil { diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go index 4f02bb43..837f8c63 100644 --- a/plugins/boltdb/boltjobs/item.go +++ b/plugins/boltdb/boltjobs/item.go @@ -43,8 +43,7 @@ type Options struct { Delay int64 `json:"delay,omitempty"` // private - db *bbolt.DB - + db *bbolt.DB active *uint64 delayed *uint64 } @@ -137,6 +136,17 @@ func (i *Item) Nack() error { return tx.Commit() } +/* +Requeue algorithm: +1. Rewrite item headers and delay. +2. Begin writable transaction on attached to the item db. +3. Delete item from the InQueueBucket +4. Handle items with the delay: + 4.1. Get DelayBucket + 4.2. Make a key by adding the delay to the time.Now() in RFC3339 format + 4.3. Put this key with value to the DelayBucket +5. W/o delay, put the key with value to the PushBucket (requeue) +*/ func (i *Item) Requeue(headers map[string][]string, delay int64) error { const op = errors.Op("boltdb_item_requeue") i.Headers = headers @@ -153,23 +163,23 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { return errors.E(op, i.rollback(err, tx)) } + // encode the item + buf := new(bytes.Buffer) + enc := gob.NewEncoder(buf) + err = enc.Encode(i) + val := make([]byte, buf.Len()) + copy(val, buf.Bytes()) + buf.Reset() + if delay > 0 { delayB := tx.Bucket(utils.AsBytes(DelayBucket)) - tKey := time.Now().Add(time.Second * time.Duration(delay)).Format(time.RFC3339) - - buf := new(bytes.Buffer) - enc := gob.NewEncoder(buf) - err = enc.Encode(i) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } + tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339) - err = delayB.Put(utils.AsBytes(tKey), buf.Bytes()) if err != nil { return errors.E(op, i.rollback(err, tx)) } - err = inQb.Delete(utils.AsBytes(i.ID())) + err = delayB.Put(utils.AsBytes(tKey), val) if err != nil { return errors.E(op, i.rollback(err, tx)) } @@ -178,20 +188,11 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { } pushB := tx.Bucket(utils.AsBytes(PushBucket)) - - buf := new(bytes.Buffer) - enc := gob.NewEncoder(buf) - err = enc.Encode(i) if err != nil { return errors.E(op, i.rollback(err, tx)) } - err = pushB.Put(utils.AsBytes(i.ID()), buf.Bytes()) - if err != nil { - return errors.E(op, i.rollback(err, tx)) - } - - err = inQb.Delete(utils.AsBytes(i.ID())) + err = pushB.Put(utils.AsBytes(i.ID()), val) if err != nil { return errors.E(op, i.rollback(err, tx)) } diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index d184303a..39de34ab 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -16,7 +16,7 @@ func (c *consumer) listener() { for { select { case <-c.stopCh: - c.log.Warn("boltdb listener stopped") + c.log.Info("boltdb listener stopped") return case <-tt.C: if atomic.LoadUint64(c.active) >= uint64(c.prefetch) { @@ -78,12 +78,22 @@ func (c *consumer) listener() { } func (c *consumer) delayedJobsListener() { - tt := time.NewTicker(time.Millisecond * 100) + tt := time.NewTicker(time.Millisecond * 10) defer tt.Stop() + + // just some 90's + loc, err := time.LoadLocation("UTC") + if err != nil { + c.log.Error("failed to load location, delayed jobs won't work", "error", err) + return + } + + var startDate = utils.AsBytes(time.Date(1990, 1, 1, 0, 0, 0, 0, loc).Format(time.RFC3339)) + for { select { case <-c.stopCh: - c.log.Warn("boltdb listener stopped") + c.log.Info("boltdb listener stopped") return case <-tt.C: tx, err := c.db.Begin(true) @@ -95,45 +105,37 @@ func (c *consumer) delayedJobsListener() { delayB := tx.Bucket(utils.AsBytes(DelayBucket)) inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) - // get first item - k, v := delayB.Cursor().First() - if k == nil && v == nil { - _ = tx.Commit() - continue - } - - t, err := time.Parse(time.RFC3339, utils.AsString(k)) - if err != nil { - c.rollback(err, tx) - continue - } - - if t.After(time.Now()) { - _ = tx.Commit() - continue - } - - buf := bytes.NewReader(v) - dec := gob.NewDecoder(buf) - - item := &Item{} - err = dec.Decode(item) - if err != nil { - c.rollback(err, tx) - continue - } - - err = inQb.Put(utils.AsBytes(item.ID()), v) - if err != nil { - c.rollback(err, tx) - continue - } - - // delete key from the PushBucket - err = delayB.Delete(k) - if err != nil { - c.rollback(err, tx) - continue + cursor := delayB.Cursor() + endDate := utils.AsBytes(time.Now().UTC().Format(time.RFC3339)) + + for k, v := cursor.Seek(startDate); k != nil && bytes.Compare(k, endDate) <= 0; k, v = cursor.Next() { + buf := bytes.NewReader(v) + dec := gob.NewDecoder(buf) + + item := &Item{} + err = dec.Decode(item) + if err != nil { + c.rollback(err, tx) + continue + } + + err = inQb.Put(utils.AsBytes(item.ID()), v) + if err != nil { + c.rollback(err, tx) + continue + } + + // delete key from the PushBucket + err = delayB.Delete(k) + if err != nil { + c.rollback(err, tx) + continue + } + + // attach pointer to the DB + item.attachDB(c.db, c.active, c.delayed) + // as the last step, after commit, put the item into the PQ + c.pq.Insert(item) } err = tx.Commit() @@ -141,11 +143,6 @@ func (c *consumer) delayedJobsListener() { c.rollback(err, tx) continue } - - // attach pointer to the DB - item.attachDB(c.db, c.active, c.delayed) - // as the last step, after commit, put the item into the PQ - c.pq.Insert(item) } } } diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index 78d154b1..9f4d37ec 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -466,7 +466,7 @@ func TestBeanstalkStats(t *testing.T) { } func TestBeanstalkNoGlobalSection(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ diff --git a/tests/plugins/jobs/jobs_boltdb_test.go b/tests/plugins/jobs/jobs_boltdb_test.go index 15d2bce8..ab36ffa4 100644 --- a/tests/plugins/jobs/jobs_boltdb_test.go +++ b/tests/plugins/jobs/jobs_boltdb_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" @@ -22,6 +23,7 @@ import ( rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -39,30 +41,29 @@ func TestBoltDBInit(t *testing.T) { Path: "boltdb/.rr-boltdb-init.yaml", Prefix: "rr", } - // - //controller := gomock.NewController(t) - //mockLogger := mocks.NewMockLogger(controller) - // - //// general - //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - // - //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - // - //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("boltdb listener stopped").Times(4) err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - //mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -132,30 +133,29 @@ func TestBoltDBDeclare(t *testing.T) { Prefix: "rr", } - //controller := gomock.NewController(t) - //mockLogger := mocks.NewMockLogger(controller) - // - //// general - //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - // - //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - // - //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("boltdb listener stopped").Times(2) err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - //mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -233,31 +233,30 @@ func TestBoltDBJobsError(t *testing.T) { Prefix: "rr", } - //controller := gomock.NewController(t) - //mockLogger := mocks.NewMockLogger(controller) - // - //// general - //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - // - //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - // - //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) - //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("boltdb listener stopped").Times(2) err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - //mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -364,29 +363,28 @@ func TestBoltDBStats(t *testing.T) { Prefix: "rr", } - //controller := gomock.NewController(t) - //mockLogger := mocks.NewMockLogger(controller) - // - //// general - //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - // - //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("boltdb listener stopped").AnyTimes() err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - //mockLogger, + mockLogger, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, |