summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugins/amqp/amqpjobs/consumer.go2
-rw-r--r--plugins/amqp/amqpjobs/redial.go4
-rw-r--r--plugins/boltdb/boltjobs/consumer.go41
-rw-r--r--plugins/boltdb/boltjobs/item.go45
-rw-r--r--plugins/boltdb/boltjobs/listener.go91
-rw-r--r--tests/plugins/jobs/jobs_beanstalk_test.go2
-rw-r--r--tests/plugins/jobs/jobs_boltdb_test.go150
7 files changed, 163 insertions, 172 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 578f36ce..784a102c 100644
--- a/plugins/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -479,7 +479,7 @@ func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
err = pch.Publish(c.exchangeName, tmpQ, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
- Timestamp: time.Now().UTC(),
+ Timestamp: time.Now(),
DeliveryMode: amqp.Persistent,
Body: msg.Body(),
})
diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go
index 56142e2b..8d21784f 100644
--- a/plugins/amqp/amqpjobs/redial.go
+++ b/plugins/amqp/amqpjobs/redial.go
@@ -27,7 +27,7 @@ func (c *consumer) redialer() { //nolint:gocognit
// trash the broken publishing channel
<-c.publishChan
- t := time.Now()
+ t := time.Now().UTC()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
c.eh.Push(events.JobEvent{
@@ -35,7 +35,7 @@ func (c *consumer) redialer() { //nolint:gocognit
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Error: err,
- Start: time.Now(),
+ Start: time.Now().UTC(),
})
expb := backoff.NewExponentialBackOff()
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index 2492ab60..ed0eda61 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -65,7 +65,6 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
}
conf := &GlobalCfg{}
-
err := cfg.UnmarshalKey(PluginName, conf)
if err != nil {
return nil, errors.E(op, err)
@@ -246,49 +245,45 @@ func (c *consumer) Push(_ context.Context, job *job.Job) error {
const op = errors.Op("boltdb_jobs_push")
err := c.db.Update(func(tx *bolt.Tx) error {
item := fromJob(job)
+ // pool with buffers
+ buf := c.get()
+ // encode the job
+ enc := gob.NewEncoder(buf)
+ err := enc.Encode(item)
+ if err != nil {
+ c.put(buf)
+ return errors.E(op, err)
+ }
+
+ value := make([]byte, buf.Len())
+ copy(value, buf.Bytes())
+ c.put(buf)
// handle delay
if item.Options.Delay > 0 {
b := tx.Bucket(utils.AsBytes(DelayBucket))
- tKey := time.Now().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339)
-
- // pool with buffers
- buf := c.get()
- defer c.put(buf)
+ tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339)
- enc := gob.NewEncoder(buf)
- err := enc.Encode(item)
+ err = b.Put(utils.AsBytes(tKey), value)
if err != nil {
return errors.E(op, err)
}
- value := make([]byte, buf.Len())
- copy(value, buf.Bytes())
-
atomic.AddUint64(c.delayed, 1)
- return b.Put(utils.AsBytes(tKey), value)
+ return nil
}
b := tx.Bucket(utils.AsBytes(PushBucket))
-
- // pool with buffers
- buf := c.get()
- defer c.put(buf)
-
- enc := gob.NewEncoder(buf)
- err := enc.Encode(item)
+ err = b.Put(utils.AsBytes(item.ID()), value)
if err != nil {
return errors.E(op, err)
}
- value := make([]byte, buf.Len())
- copy(value, buf.Bytes())
-
// increment active counter
atomic.AddUint64(c.active, 1)
- return b.Put(utils.AsBytes(item.ID()), value)
+ return nil
})
if err != nil {
diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go
index 4f02bb43..837f8c63 100644
--- a/plugins/boltdb/boltjobs/item.go
+++ b/plugins/boltdb/boltjobs/item.go
@@ -43,8 +43,7 @@ type Options struct {
Delay int64 `json:"delay,omitempty"`
// private
- db *bbolt.DB
-
+ db *bbolt.DB
active *uint64
delayed *uint64
}
@@ -137,6 +136,17 @@ func (i *Item) Nack() error {
return tx.Commit()
}
+/*
+Requeue algorithm:
+1. Rewrite item headers and delay.
+2. Begin writable transaction on attached to the item db.
+3. Delete item from the InQueueBucket
+4. Handle items with the delay:
+ 4.1. Get DelayBucket
+ 4.2. Make a key by adding the delay to the time.Now() in RFC3339 format
+ 4.3. Put this key with value to the DelayBucket
+5. W/o delay, put the key with value to the PushBucket (requeue)
+*/
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
const op = errors.Op("boltdb_item_requeue")
i.Headers = headers
@@ -153,23 +163,23 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return errors.E(op, i.rollback(err, tx))
}
+ // encode the item
+ buf := new(bytes.Buffer)
+ enc := gob.NewEncoder(buf)
+ err = enc.Encode(i)
+ val := make([]byte, buf.Len())
+ copy(val, buf.Bytes())
+ buf.Reset()
+
if delay > 0 {
delayB := tx.Bucket(utils.AsBytes(DelayBucket))
- tKey := time.Now().Add(time.Second * time.Duration(delay)).Format(time.RFC3339)
-
- buf := new(bytes.Buffer)
- enc := gob.NewEncoder(buf)
- err = enc.Encode(i)
- if err != nil {
- return errors.E(op, i.rollback(err, tx))
- }
+ tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339)
- err = delayB.Put(utils.AsBytes(tKey), buf.Bytes())
if err != nil {
return errors.E(op, i.rollback(err, tx))
}
- err = inQb.Delete(utils.AsBytes(i.ID()))
+ err = delayB.Put(utils.AsBytes(tKey), val)
if err != nil {
return errors.E(op, i.rollback(err, tx))
}
@@ -178,20 +188,11 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
}
pushB := tx.Bucket(utils.AsBytes(PushBucket))
-
- buf := new(bytes.Buffer)
- enc := gob.NewEncoder(buf)
- err = enc.Encode(i)
if err != nil {
return errors.E(op, i.rollback(err, tx))
}
- err = pushB.Put(utils.AsBytes(i.ID()), buf.Bytes())
- if err != nil {
- return errors.E(op, i.rollback(err, tx))
- }
-
- err = inQb.Delete(utils.AsBytes(i.ID()))
+ err = pushB.Put(utils.AsBytes(i.ID()), val)
if err != nil {
return errors.E(op, i.rollback(err, tx))
}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index d184303a..39de34ab 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -16,7 +16,7 @@ func (c *consumer) listener() {
for {
select {
case <-c.stopCh:
- c.log.Warn("boltdb listener stopped")
+ c.log.Info("boltdb listener stopped")
return
case <-tt.C:
if atomic.LoadUint64(c.active) >= uint64(c.prefetch) {
@@ -78,12 +78,22 @@ func (c *consumer) listener() {
}
func (c *consumer) delayedJobsListener() {
- tt := time.NewTicker(time.Millisecond * 100)
+ tt := time.NewTicker(time.Millisecond * 10)
defer tt.Stop()
+
+ // just some 90's
+ loc, err := time.LoadLocation("UTC")
+ if err != nil {
+ c.log.Error("failed to load location, delayed jobs won't work", "error", err)
+ return
+ }
+
+ var startDate = utils.AsBytes(time.Date(1990, 1, 1, 0, 0, 0, 0, loc).Format(time.RFC3339))
+
for {
select {
case <-c.stopCh:
- c.log.Warn("boltdb listener stopped")
+ c.log.Info("boltdb listener stopped")
return
case <-tt.C:
tx, err := c.db.Begin(true)
@@ -95,45 +105,37 @@ func (c *consumer) delayedJobsListener() {
delayB := tx.Bucket(utils.AsBytes(DelayBucket))
inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
- // get first item
- k, v := delayB.Cursor().First()
- if k == nil && v == nil {
- _ = tx.Commit()
- continue
- }
-
- t, err := time.Parse(time.RFC3339, utils.AsString(k))
- if err != nil {
- c.rollback(err, tx)
- continue
- }
-
- if t.After(time.Now()) {
- _ = tx.Commit()
- continue
- }
-
- buf := bytes.NewReader(v)
- dec := gob.NewDecoder(buf)
-
- item := &Item{}
- err = dec.Decode(item)
- if err != nil {
- c.rollback(err, tx)
- continue
- }
-
- err = inQb.Put(utils.AsBytes(item.ID()), v)
- if err != nil {
- c.rollback(err, tx)
- continue
- }
-
- // delete key from the PushBucket
- err = delayB.Delete(k)
- if err != nil {
- c.rollback(err, tx)
- continue
+ cursor := delayB.Cursor()
+ endDate := utils.AsBytes(time.Now().UTC().Format(time.RFC3339))
+
+ for k, v := cursor.Seek(startDate); k != nil && bytes.Compare(k, endDate) <= 0; k, v = cursor.Next() {
+ buf := bytes.NewReader(v)
+ dec := gob.NewDecoder(buf)
+
+ item := &Item{}
+ err = dec.Decode(item)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = inQb.Put(utils.AsBytes(item.ID()), v)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // delete key from the PushBucket
+ err = delayB.Delete(k)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // attach pointer to the DB
+ item.attachDB(c.db, c.active, c.delayed)
+ // as the last step, after commit, put the item into the PQ
+ c.pq.Insert(item)
}
err = tx.Commit()
@@ -141,11 +143,6 @@ func (c *consumer) delayedJobsListener() {
c.rollback(err, tx)
continue
}
-
- // attach pointer to the DB
- item.attachDB(c.db, c.active, c.delayed)
- // as the last step, after commit, put the item into the PQ
- c.pq.Insert(item)
}
}
}
diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go
index 78d154b1..9f4d37ec 100644
--- a/tests/plugins/jobs/jobs_beanstalk_test.go
+++ b/tests/plugins/jobs/jobs_beanstalk_test.go
@@ -466,7 +466,7 @@ func TestBeanstalkStats(t *testing.T) {
}
func TestBeanstalkNoGlobalSection(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
diff --git a/tests/plugins/jobs/jobs_boltdb_test.go b/tests/plugins/jobs/jobs_boltdb_test.go
index 15d2bce8..ab36ffa4 100644
--- a/tests/plugins/jobs/jobs_boltdb_test.go
+++ b/tests/plugins/jobs/jobs_boltdb_test.go
@@ -10,6 +10,7 @@ import (
"testing"
"time"
+ "github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
@@ -22,6 +23,7 @@ import (
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
+ "github.com/spiral/roadrunner/v2/tests/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -39,30 +41,29 @@ func TestBoltDBInit(t *testing.T) {
Path: "boltdb/.rr-boltdb-init.yaml",
Prefix: "rr",
}
- //
- //controller := gomock.NewController(t)
- //mockLogger := mocks.NewMockLogger(controller)
- //
- //// general
- //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
- //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
- //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
- //
- //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- //
- //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
-
- //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Info("boltdb listener stopped").Times(4)
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- //mockLogger,
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -132,30 +133,29 @@ func TestBoltDBDeclare(t *testing.T) {
Prefix: "rr",
}
- //controller := gomock.NewController(t)
- //mockLogger := mocks.NewMockLogger(controller)
- //
- //// general
- //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
- //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
- //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
- //
- //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
- //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
- //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
- //
- //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1)
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("boltdb listener stopped").Times(2)
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- //mockLogger,
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -233,31 +233,30 @@ func TestBoltDBJobsError(t *testing.T) {
Prefix: "rr",
}
- //controller := gomock.NewController(t)
- //mockLogger := mocks.NewMockLogger(controller)
- //
- //// general
- //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
- //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
- //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
- //
- //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
- //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
- //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
- //
- //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
- //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1)
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("boltdb listener stopped").Times(2)
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- //mockLogger,
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -364,29 +363,28 @@ func TestBoltDBStats(t *testing.T) {
Prefix: "rr",
}
- //controller := gomock.NewController(t)
- //mockLogger := mocks.NewMockLogger(controller)
- //
- //// general
- //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
- //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
- //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
- //
- //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
- //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
- //mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
- //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
- //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes()
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("boltdb listener stopped").AnyTimes()
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
- //mockLogger,
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},