summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/amqp/amqpjobs/consumer.go146
-rw-r--r--plugins/amqp/amqpjobs/item.go18
-rw-r--r--plugins/amqp/amqpjobs/listener.go10
-rw-r--r--plugins/amqp/amqpjobs/rabbit_init.go16
-rw-r--r--plugins/amqp/amqpjobs/redial.go68
-rw-r--r--plugins/boltdb/boltjobs/config.go37
-rw-r--r--plugins/boltdb/boltjobs/consumer.go225
-rw-r--r--plugins/boltdb/boltjobs/listener.go24
-rw-r--r--plugins/jobs/job/job.go (renamed from plugins/jobs/job/general.go)33
-rw-r--r--plugins/jobs/job/job_options.go32
-rw-r--r--plugins/jobs/job/job_test.go (renamed from plugins/jobs/job/job_options_test.go)0
-rw-r--r--plugins/jobs/plugin.go2
12 files changed, 403 insertions, 208 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 1931ceaa..f1b4d54f 100644
--- a/plugins/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -218,17 +218,17 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return jb, nil
}
-func (j *consumer) Push(ctx context.Context, job *job.Job) error {
+func (c *consumer) Push(ctx context.Context, job *job.Job) error {
const op = errors.Op("rabbitmq_push")
// check if the pipeline registered
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != job.Options.Pipeline {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name()))
}
- err := j.handleItem(ctx, fromJob(job))
+ err := c.handleItem(ctx, fromJob(job))
if err != nil {
return errors.E(op, err)
}
@@ -236,38 +236,38 @@ func (j *consumer) Push(ctx context.Context, job *job.Job) error {
return nil
}
-func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- j.pipeline.Store(p)
+func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ c.pipeline.Store(p)
return nil
}
-func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("rabbit_consume")
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ const op = errors.Op("rabbit_run")
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
// protect connection (redial)
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
var err error
- j.consumeChan, err = j.conn.Channel()
+ c.consumeChan, err = c.conn.Channel()
if err != nil {
return errors.E(op, err)
}
- err = j.consumeChan.Qos(j.prefetch, 0, false)
+ err = c.consumeChan.Qos(c.prefetch, 0, false)
if err != nil {
return errors.E(op, err)
}
// start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
+ deliv, err := c.consumeChan.Consume(
+ c.queue,
+ c.consumeID,
false,
false,
false,
@@ -279,9 +279,11 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
}
// run listener
- j.listener(deliv)
+ c.listener(deliv)
- j.eh.Push(events.JobEvent{
+ atomic.StoreUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -291,28 +293,28 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
+func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("amqp_driver_state")
select {
- case pch := <-j.publishChan:
+ case pch := <-c.publishChan:
defer func() {
- j.publishChan <- pch
+ c.publishChan <- pch
}()
- q, err := pch.QueueInspect(j.queue)
+ q, err := pch.QueueInspect(c.queue)
if err != nil {
return nil, errors.E(op, err)
}
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
return &jobState.State{
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Queue: q.Name,
Active: int64(q.Messages),
- Delayed: atomic.LoadInt64(j.delayed),
- Ready: ready(atomic.LoadUint32(&j.listeners)),
+ Delayed: atomic.LoadInt64(c.delayed),
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
}, nil
case <-ctx.Done():
@@ -320,37 +322,37 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
}
}
-func (j *consumer) Pause(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Pause(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested pause on: ", p)
+ c.log.Error("no such pipeline", "requested pause on: ", p)
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
+ c.log.Warn("no active listeners, nothing to pause")
return
}
- atomic.AddUint32(&j.listeners, ^uint32(0))
+ atomic.AddUint32(&c.listeners, ^uint32(0))
// protect connection (redial)
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
- err := j.consumeChan.Cancel(j.consumeID, true)
+ err := c.consumeChan.Cancel(c.consumeID, true)
if err != nil {
- j.log.Error("cancel publish channel, forcing close", "error", err)
- errCl := j.consumeChan.Close()
+ c.log.Error("cancel publish channel, forcing close", "error", err)
+ errCl := c.consumeChan.Close()
if errCl != nil {
- j.log.Error("force close failed", "error", err)
+ c.log.Error("force close failed", "error", err)
return
}
return
}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -358,40 +360,40 @@ func (j *consumer) Pause(_ context.Context, p string) {
})
}
-func (j *consumer) Resume(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Resume(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested resume on: ", p)
+ c.log.Error("no such pipeline", "requested resume on: ", p)
}
// protect connection (redial)
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 1 {
- j.log.Warn("amqp listener already in the active state")
+ c.log.Warn("amqp listener already in the active state")
return
}
var err error
- j.consumeChan, err = j.conn.Channel()
+ c.consumeChan, err = c.conn.Channel()
if err != nil {
- j.log.Error("create channel on rabbitmq connection", "error", err)
+ c.log.Error("create channel on rabbitmq connection", "error", err)
return
}
- err = j.consumeChan.Qos(j.prefetch, 0, false)
+ err = c.consumeChan.Qos(c.prefetch, 0, false)
if err != nil {
- j.log.Error("qos set failed", "error", err)
+ c.log.Error("qos set failed", "error", err)
return
}
// start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
+ deliv, err := c.consumeChan.Consume(
+ c.queue,
+ c.consumeID,
false,
false,
false,
@@ -399,17 +401,17 @@ func (j *consumer) Resume(_ context.Context, p string) {
nil,
)
if err != nil {
- j.log.Error("consume operation failed", "error", err)
+ c.log.Error("consume operation failed", "error", err)
return
}
// run listener
- j.listener(deliv)
+ c.listener(deliv)
// increase number of listeners
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -417,11 +419,11 @@ func (j *consumer) Resume(_ context.Context, p string) {
})
}
-func (j *consumer) Stop(context.Context) error {
- j.stopCh <- struct{}{}
+func (c *consumer) Stop(context.Context) error {
+ c.stopCh <- struct{}{}
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -431,13 +433,13 @@ func (j *consumer) Stop(context.Context) error {
}
// handleItem
-func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("rabbitmq_handle_item")
select {
- case pch := <-j.publishChan:
+ case pch := <-c.publishChan:
// return the channel back
defer func() {
- j.publishChan <- pch
+ c.publishChan <- pch
}()
// convert
@@ -449,30 +451,30 @@ func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("rabbitmq_handle_item")
// handle timeouts
if msg.Options.DelayDuration() > 0 {
- atomic.AddInt64(j.delayed, 1)
+ atomic.AddInt64(c.delayed, 1)
// TODO declare separate method for this if condition
// TODO dlx cache channel??
delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, c.exchangeName, c.queue)
_, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
+ dlx: c.exchangeName,
+ dlxRoutingKey: c.routingKey,
dlxTTL: delayMs,
dlxExpires: delayMs * 2,
})
if err != nil {
- atomic.AddInt64(j.delayed, ^int64(0))
+ atomic.AddInt64(c.delayed, ^int64(0))
return errors.E(op, err)
}
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ err = pch.QueueBind(tmpQ, tmpQ, c.exchangeName, false, nil)
if err != nil {
- atomic.AddInt64(j.delayed, ^int64(0))
+ atomic.AddInt64(c.delayed, ^int64(0))
return errors.E(op, err)
}
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ err = pch.Publish(c.exchangeName, tmpQ, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
Timestamp: time.Now().UTC(),
@@ -481,7 +483,7 @@ func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
})
if err != nil {
- atomic.AddInt64(j.delayed, ^int64(0))
+ atomic.AddInt64(c.delayed, ^int64(0))
return errors.E(op, err)
}
@@ -489,7 +491,7 @@ func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
}
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ err = pch.Publish(c.exchangeName, c.routingKey, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
Timestamp: time.Now(),
diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go
index a8e305ea..66b70a36 100644
--- a/plugins/amqp/amqpjobs/item.go
+++ b/plugins/amqp/amqpjobs/item.go
@@ -139,9 +139,9 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
-func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) {
+func (c *consumer) fromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
- item, err := j.unpack(d)
+ item, err := c.unpack(d)
if err != nil {
return nil, errors.E(op, err)
}
@@ -156,10 +156,10 @@ func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
- item.Options.delayed = j.delayed
+ item.Options.delayed = c.delayed
// requeue func
- item.Options.requeueFn = j.handleItem
+ item.Options.requeueFn = c.handleItem
return i, nil
}
@@ -194,11 +194,11 @@ func pack(id string, j *Item) (amqp.Table, error) {
}
// unpack restores jobs.Options
-func (j *consumer) unpack(d amqp.Delivery) (*Item, error) {
+func (c *consumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
- multipleAsk: j.multipleAck,
- requeue: j.requeueOnFail,
- requeueFn: j.handleItem,
+ multipleAsk: c.multipleAck,
+ requeue: c.requeueOnFail,
+ requeueFn: c.handleItem,
}}
if _, ok := d.Headers[job.RRID].(string); !ok {
@@ -230,7 +230,7 @@ func (j *consumer) unpack(d amqp.Delivery) (*Item, error) {
if _, ok := d.Headers[job.RRPriority]; !ok {
// set pipe's priority
- item.Options.Priority = j.priority
+ item.Options.Priority = c.priority
} else {
item.Options.Priority = d.Headers[job.RRPriority].(int64)
}
diff --git a/plugins/amqp/amqpjobs/listener.go b/plugins/amqp/amqpjobs/listener.go
index 0156d55c..75c61cad 100644
--- a/plugins/amqp/amqpjobs/listener.go
+++ b/plugins/amqp/amqpjobs/listener.go
@@ -2,23 +2,23 @@ package amqpjobs
import amqp "github.com/rabbitmq/amqp091-go"
-func (j *consumer) listener(deliv <-chan amqp.Delivery) {
+func (c *consumer) listener(deliv <-chan amqp.Delivery) {
go func() {
for { //nolint:gosimple
select {
case msg, ok := <-deliv:
if !ok {
- j.log.Info("delivery channel closed, leaving the rabbit listener")
+ c.log.Info("delivery channel closed, leaving the rabbit listener")
return
}
- d, err := j.fromDelivery(msg)
+ d, err := c.fromDelivery(msg)
if err != nil {
- j.log.Error("amqp delivery convert", "error", err)
+ c.log.Error("amqp delivery convert", "error", err)
continue
}
// insert job into the main priority queue
- j.pq.Insert(d)
+ c.pq.Insert(d)
}
}
}()
diff --git a/plugins/amqp/amqpjobs/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go
index e260fabe..fb5f6911 100644
--- a/plugins/amqp/amqpjobs/rabbit_init.go
+++ b/plugins/amqp/amqpjobs/rabbit_init.go
@@ -4,20 +4,20 @@ import (
"github.com/spiral/errors"
)
-func (j *consumer) initRabbitMQ() error {
+func (c *consumer) initRabbitMQ() error {
const op = errors.Op("jobs_plugin_rmq_init")
// Channel opens a unique, concurrent server channel to process the bulk of AMQP
// messages. Any error from methods on this receiver will render the receiver
// invalid and a new Channel should be opened.
- channel, err := j.conn.Channel()
+ channel, err := c.conn.Channel()
if err != nil {
return errors.E(op, err)
}
// declare an exchange (idempotent operation)
err = channel.ExchangeDeclare(
- j.exchangeName,
- j.exchangeType,
+ c.exchangeName,
+ c.exchangeType,
true,
false,
false,
@@ -30,10 +30,10 @@ func (j *consumer) initRabbitMQ() error {
// verify or declare a queue
q, err := channel.QueueDeclare(
- j.queue,
+ c.queue,
false,
false,
- j.exclusive,
+ c.exclusive,
false,
nil,
)
@@ -44,8 +44,8 @@ func (j *consumer) initRabbitMQ() error {
// bind queue to the exchange
err = channel.QueueBind(
q.Name,
- j.routingKey,
- j.exchangeName,
+ c.routingKey,
+ c.exchangeName,
false,
nil,
)
diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go
index 0835e3ea..56142e2b 100644
--- a/plugins/amqp/amqpjobs/redial.go
+++ b/plugins/amqp/amqpjobs/redial.go
@@ -11,26 +11,26 @@ import (
)
// redialer used to redial to the rabbitmq in case of the connection interrupts
-func (j *consumer) redialer() { //nolint:gocognit
+func (c *consumer) redialer() { //nolint:gocognit
go func() {
const op = errors.Op("rabbitmq_redial")
for {
select {
- case err := <-j.conn.NotifyClose(make(chan *amqp.Error)):
+ case err := <-c.conn.NotifyClose(make(chan *amqp.Error)):
if err == nil {
return
}
- j.Lock()
+ c.Lock()
// trash the broken publishing channel
- <-j.publishChan
+ <-c.publishChan
t := time.Now()
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeError,
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
@@ -40,41 +40,41 @@ func (j *consumer) redialer() { //nolint:gocognit
expb := backoff.NewExponentialBackOff()
// set the retry timeout (minutes)
- expb.MaxElapsedTime = j.retryTimeout
+ expb.MaxElapsedTime = c.retryTimeout
operation := func() error {
- j.log.Warn("rabbitmq reconnecting, caused by", "error", err)
+ c.log.Warn("rabbitmq reconnecting, caused by", "error", err)
var dialErr error
- j.conn, dialErr = amqp.Dial(j.connStr)
+ c.conn, dialErr = amqp.Dial(c.connStr)
if dialErr != nil {
return errors.E(op, dialErr)
}
- j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
+ c.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
// re-init connection
- errInit := j.initRabbitMQ()
+ errInit := c.initRabbitMQ()
if errInit != nil {
- j.log.Error("rabbitmq dial", "error", errInit)
+ c.log.Error("rabbitmq dial", "error", errInit)
return errInit
}
// redeclare consume channel
var errConnCh error
- j.consumeChan, errConnCh = j.conn.Channel()
+ c.consumeChan, errConnCh = c.conn.Channel()
if errConnCh != nil {
return errors.E(op, errConnCh)
}
// redeclare publish channel
- pch, errPubCh := j.conn.Channel()
+ pch, errPubCh := c.conn.Channel()
if errPubCh != nil {
return errors.E(op, errPubCh)
}
// start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
+ deliv, err := c.consumeChan.Consume(
+ c.queue,
+ c.consumeID,
false,
false,
false,
@@ -86,23 +86,23 @@ func (j *consumer) redialer() { //nolint:gocognit
}
// put the fresh publishing channel
- j.publishChan <- pch
+ c.publishChan <- pch
// restart listener
- j.listener(deliv)
+ c.listener(deliv)
- j.log.Info("queues and subscribers redeclared successfully")
+ c.log.Info("queues and subscribers redeclared successfully")
return nil
}
retryErr := backoff.Retry(operation, expb)
if retryErr != nil {
- j.Unlock()
- j.log.Error("backoff failed", "error", retryErr)
+ c.Unlock()
+ c.log.Error("backoff failed", "error", retryErr)
return
}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
@@ -110,27 +110,27 @@ func (j *consumer) redialer() { //nolint:gocognit
Elapsed: time.Since(t),
})
- j.Unlock()
+ c.Unlock()
- case <-j.stopCh:
- if j.publishChan != nil {
- pch := <-j.publishChan
+ case <-c.stopCh:
+ if c.publishChan != nil {
+ pch := <-c.publishChan
err := pch.Close()
if err != nil {
- j.log.Error("publish channel close", "error", err)
+ c.log.Error("publish channel close", "error", err)
}
}
- if j.consumeChan != nil {
- err := j.consumeChan.Close()
+ if c.consumeChan != nil {
+ err := c.consumeChan.Close()
if err != nil {
- j.log.Error("consume channel close", "error", err)
+ c.log.Error("consume channel close", "error", err)
}
}
- if j.conn != nil {
- err := j.conn.Close()
+ if c.conn != nil {
+ err := c.conn.Close()
if err != nil {
- j.log.Error("amqp connection close", "error", err)
+ c.log.Error("amqp connection close", "error", err)
}
}
diff --git a/plugins/boltdb/boltjobs/config.go b/plugins/boltdb/boltjobs/config.go
index 013e30bf..8cc098c1 100644
--- a/plugins/boltdb/boltjobs/config.go
+++ b/plugins/boltdb/boltjobs/config.go
@@ -1,16 +1,39 @@
package boltjobs
-type Config struct {
- // File is boltDB file. No need to create it by your own,
- // boltdb driver is able to create the file, or read existing
- File string
- // Bucket to store data in boltDB
- bucket string
+const (
+ file string = "file"
+ priority string = "priority"
+ prefetch string = "prefetch"
+)
+
+type GlobalCfg struct {
// db file permissions
- Permissions int
+ Permissions int `mapstructure:"permissions"`
// consume timeout
}
+func (c *GlobalCfg) InitDefaults() {
+ if c.Permissions == 0 {
+ c.Permissions = 0777
+ }
+}
+
+type Config struct {
+ File string `mapstructure:"file"`
+ Priority int `mapstructure:"priority"`
+ Prefetch int `mapstructure:"prefetch"`
+}
+
func (c *Config) InitDefaults() {
+ if c.File == "" {
+ c.File = "rr.db"
+ }
+
+ if c.Priority == 0 {
+ c.Priority = 10
+ }
+ if c.Prefetch == 0 {
+ c.Prefetch = 1000
+ }
}
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index a8db2f30..67a6d3e7 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -1,11 +1,14 @@
package boltjobs
import (
+ "bytes"
"context"
+ "encoding/gob"
"os"
"sync/atomic"
"time"
+ "github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
@@ -20,19 +23,27 @@ import (
const (
PluginName = "boltdb"
+
+ PushBucket = "push"
+ InQueueBucket = "processing"
+ DoneBucket = "done"
)
type consumer struct {
- // bbolt configuration
file string
permissions int
- bucket string
- db *bolt.DB
+ priority int
+ prefetch int
+
+ db *bolt.DB
+
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+ listeners uint32
+ pipeline atomic.Value
- log logger.Logger
- eh events.Handler
- pq priorityqueue.Queue
- pipe atomic.Value
+ stopCh chan struct{}
}
func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
@@ -47,26 +58,88 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
return nil, errors.E(op, errors.Str("no global boltdb configuration"))
}
- conf := &Config{}
+ conf := &GlobalCfg{}
- err := cfg.UnmarshalKey(configKey, conf)
+ err := cfg.UnmarshalKey(PluginName, conf)
if err != nil {
return nil, errors.E(op, err)
}
- // add default values
+ localCfg := &Config{}
+ err = cfg.UnmarshalKey(configKey, localCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ localCfg.InitDefaults()
conf.InitDefaults()
- c := &consumer{
- file: conf.File,
+
+ db, err := bolt.Open(localCfg.File, os.FileMode(conf.Permissions), &bolt.Options{
+ Timeout: time.Second * 20,
+ NoGrowSync: false,
+ NoFreelistSync: false,
+ ReadOnly: false,
+ NoSync: false,
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ const upOp = errors.Op("boltdb_plugin_update")
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return &consumer{
permissions: conf.Permissions,
- bucket: conf.bucket,
+ file: localCfg.File,
+ priority: localCfg.Priority,
+ prefetch: localCfg.Prefetch,
+
+ db: db,
+ log: log,
+ eh: e,
+ pq: pq,
+ stopCh: make(chan struct{}, 1),
+ }, nil
+}
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("init_boltdb_jobs")
- log: log,
- eh: e,
- pq: pq,
+ // if no global section
+ if !cfg.Has(PluginName) {
+ return nil, errors.E(op, errors.Str("no global boltdb configuration"))
}
- db, err := bolt.Open(c.file, os.FileMode(c.permissions), &bolt.Options{
+ conf := &GlobalCfg{}
+ err := cfg.UnmarshalKey(PluginName, conf)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // add default values
+ conf.InitDefaults()
+
+ db, err := bolt.Open(pipeline.String(file, "rr.db"), os.FileMode(conf.Permissions), &bolt.Options{
Timeout: time.Second * 20,
NoGrowSync: false,
NoFreelistSync: false,
@@ -78,51 +151,135 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
return nil, errors.E(op, err)
}
- c.db = db
-
// create bucket if it does not exist
// tx.Commit invokes via the db.Update
err = db.Update(func(tx *bolt.Tx) error {
const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(c.bucket))
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+ if err != nil {
+ return errors.E(op, upOp)
+ }
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket))
if err != nil {
return errors.E(op, upOp)
}
return nil
})
- return c, nil
-}
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
- return &consumer{}, nil
+ return &consumer{
+ file: pipeline.String(file, "rr.db"),
+ priority: pipeline.Int(priority, 10),
+ prefetch: pipeline.Int(prefetch, 100),
+ permissions: conf.Permissions,
+
+ db: db,
+ log: log,
+ eh: e,
+ pq: pq,
+ stopCh: make(chan struct{}, 1),
+ }, nil
}
func (c *consumer) Push(ctx context.Context, job *job.Job) error {
- panic("implement me")
+ const op = errors.Op("boltdb_jobs_push")
+ err := c.db.Update(func(tx *bolt.Tx) error {
+ b := tx.Bucket(utils.AsBytes(PushBucket))
+ buf := new(bytes.Buffer)
+ enc := gob.NewEncoder(buf)
+ err := enc.Encode(job)
+ if err != nil {
+ return err
+ }
+
+ return b.Put(utils.AsBytes(uuid.NewString()), buf.Bytes())
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
}
func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
- c.pipe.Store(pipeline)
+ c.pipeline.Store(pipeline)
return nil
}
-func (c *consumer) Run(_ context.Context, pipeline *pipeline.Pipeline) error {
- panic("implement me")
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ const op = errors.Op("boltdb_run")
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+ }
+ return nil
}
func (c *consumer) Stop(ctx context.Context) error {
- panic("implement me")
+ return nil
}
-func (c *consumer) Pause(ctx context.Context, pipeline string) {
- panic("implement me")
+func (c *consumer) Pause(ctx context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ c.log.Error("no such pipeline", "requested pause on: ", p)
+ }
+
+ l := atomic.LoadUint32(&c.listeners)
+ // no active listeners
+ if l == 0 {
+ c.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ c.stopCh <- struct{}{}
+
+ atomic.AddUint32(&c.listeners, ^uint32(0))
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipePaused,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
}
-func (c *consumer) Resume(ctx context.Context, pipeline string) {
- panic("implement me")
+func (c *consumer) Resume(ctx context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ c.log.Error("no such pipeline", "requested resume on: ", p)
+ }
+
+ l := atomic.LoadUint32(&c.listeners)
+ // no active listeners
+ if l == 1 {
+ c.log.Warn("amqp listener already in the active state")
+ return
+ }
+
+ // run listener
+ go c.listener()
+
+ // increase number of listeners
+ atomic.AddUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
}
func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
- panic("implement me")
+ return nil, nil
}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 4a8d6cd9..2ee06088 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -1,22 +1,34 @@
package boltjobs
-import "time"
+import (
+ "fmt"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/utils"
+)
func (c *consumer) listener() {
tt := time.NewTicker(time.Second)
for {
select {
+ case <-c.stopCh:
+ c.log.Warn("boltdb listener stopped")
+ return
case <-tt.C:
tx, err := c.db.Begin(false)
if err != nil {
panic(err)
}
- // cursor := tx.Cursor()
- err = tx.Commit()
- if err != nil {
- panic(err)
- }
+ b := tx.Bucket(utils.AsBytes(PushBucket))
+
+ cursor := b.Cursor()
+
+ k, v := cursor.First()
+ _ = k
+ _ = v
+
+ fmt.Println("foo")
}
}
}
diff --git a/plugins/jobs/job/general.go b/plugins/jobs/job/job.go
index 390f44b5..06c3254e 100644
--- a/plugins/jobs/job/general.go
+++ b/plugins/jobs/job/job.go
@@ -1,5 +1,9 @@
package job
+import (
+ "time"
+)
+
// constant keys to pack/unpack messages from different drivers
const (
RRID string = "rr_id"
@@ -27,3 +31,32 @@ type Job struct {
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is job priority, default - 10
+ // pointer to distinguish 0 as a priority and nil as priority not set
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+}
+
+// Merge merges job options.
+func (o *Options) Merge(from *Options) {
+ if o.Pipeline == "" {
+ o.Pipeline = from.Pipeline
+ }
+
+ if o.Delay == 0 {
+ o.Delay = from.Delay
+ }
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
diff --git a/plugins/jobs/job/job_options.go b/plugins/jobs/job/job_options.go
deleted file mode 100644
index b7e4ed36..00000000
--- a/plugins/jobs/job/job_options.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package job
-
-import "time"
-
-// Options carry information about how to handle given job.
-type Options struct {
- // Priority is job priority, default - 10
- // pointer to distinguish 0 as a priority and nil as priority not set
- Priority int64 `json:"priority"`
-
- // Pipeline manually specified pipeline.
- Pipeline string `json:"pipeline,omitempty"`
-
- // Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
-}
-
-// Merge merges job options.
-func (o *Options) Merge(from *Options) {
- if o.Pipeline == "" {
- o.Pipeline = from.Pipeline
- }
-
- if o.Delay == 0 {
- o.Delay = from.Delay
- }
-}
-
-// DelayDuration returns delay duration in a form of time.Duration.
-func (o *Options) DelayDuration() time.Duration {
- return time.Second * time.Duration(o.Delay)
-}
diff --git a/plugins/jobs/job/job_options_test.go b/plugins/jobs/job/job_test.go
index a47151a3..a47151a3 100644
--- a/plugins/jobs/job/job_options_test.go
+++ b/plugins/jobs/job/job_test.go
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 5e62c5c5..a0b477f9 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -178,7 +178,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
})
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"})
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs})
if err != nil {
errCh <- err
return errCh