diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/amqp/amqpjobs/consumer.go | 146 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/item.go | 18 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/listener.go | 10 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/rabbit_init.go | 16 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/redial.go | 68 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/config.go | 37 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 225 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/listener.go | 24 | ||||
-rw-r--r-- | plugins/jobs/job/job.go (renamed from plugins/jobs/job/general.go) | 33 | ||||
-rw-r--r-- | plugins/jobs/job/job_options.go | 32 | ||||
-rw-r--r-- | plugins/jobs/job/job_test.go (renamed from plugins/jobs/job/job_options_test.go) | 0 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 2 |
12 files changed, 403 insertions, 208 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 1931ceaa..f1b4d54f 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -218,17 +218,17 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return jb, nil } -func (j *consumer) Push(ctx context.Context, job *job.Job) error { +func (c *consumer) Push(ctx context.Context, job *job.Job) error { const op = errors.Op("rabbitmq_push") // check if the pipeline registered // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != job.Options.Pipeline { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name())) } - err := j.handleItem(ctx, fromJob(job)) + err := c.handleItem(ctx, fromJob(job)) if err != nil { return errors.E(op, err) } @@ -236,38 +236,38 @@ func (j *consumer) Push(ctx context.Context, job *job.Job) error { return nil } -func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { - j.pipeline.Store(p) +func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { + c.pipeline.Store(p) return nil } -func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("rabbit_consume") +func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + const op = errors.Op("rabbit_run") - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } // protect connection (redial) - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() var err error - j.consumeChan, err = j.conn.Channel() + c.consumeChan, err = c.conn.Channel() if err != nil { return errors.E(op, err) } - err = j.consumeChan.Qos(j.prefetch, 0, false) + err = c.consumeChan.Qos(c.prefetch, 0, false) if err != nil { return errors.E(op, err) } // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, + deliv, err := c.consumeChan.Consume( + c.queue, + c.consumeID, false, false, false, @@ -279,9 +279,11 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { } // run listener - j.listener(deliv) + c.listener(deliv) - j.eh.Push(events.JobEvent{ + atomic.StoreUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -291,28 +293,28 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *consumer) State(ctx context.Context) (*jobState.State, error) { +func (c *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("amqp_driver_state") select { - case pch := <-j.publishChan: + case pch := <-c.publishChan: defer func() { - j.publishChan <- pch + c.publishChan <- pch }() - q, err := pch.QueueInspect(j.queue) + q, err := pch.QueueInspect(c.queue) if err != nil { return nil, errors.E(op, err) } - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) return &jobState.State{ Pipeline: pipe.Name(), Driver: pipe.Driver(), Queue: q.Name, Active: int64(q.Messages), - Delayed: atomic.LoadInt64(j.delayed), - Ready: ready(atomic.LoadUint32(&j.listeners)), + Delayed: atomic.LoadInt64(c.delayed), + Ready: ready(atomic.LoadUint32(&c.listeners)), }, nil case <-ctx.Done(): @@ -320,37 +322,37 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { } } -func (j *consumer) Pause(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) Pause(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested pause on: ", p) + c.log.Error("no such pipeline", "requested pause on: ", p) } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 0 { - j.log.Warn("no active listeners, nothing to pause") + c.log.Warn("no active listeners, nothing to pause") return } - atomic.AddUint32(&j.listeners, ^uint32(0)) + atomic.AddUint32(&c.listeners, ^uint32(0)) // protect connection (redial) - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() - err := j.consumeChan.Cancel(j.consumeID, true) + err := c.consumeChan.Cancel(c.consumeID, true) if err != nil { - j.log.Error("cancel publish channel, forcing close", "error", err) - errCl := j.consumeChan.Close() + c.log.Error("cancel publish channel, forcing close", "error", err) + errCl := c.consumeChan.Close() if errCl != nil { - j.log.Error("force close failed", "error", err) + c.log.Error("force close failed", "error", err) return } return } - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -358,40 +360,40 @@ func (j *consumer) Pause(_ context.Context, p string) { }) } -func (j *consumer) Resume(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) Resume(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested resume on: ", p) + c.log.Error("no such pipeline", "requested resume on: ", p) } // protect connection (redial) - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 1 { - j.log.Warn("amqp listener already in the active state") + c.log.Warn("amqp listener already in the active state") return } var err error - j.consumeChan, err = j.conn.Channel() + c.consumeChan, err = c.conn.Channel() if err != nil { - j.log.Error("create channel on rabbitmq connection", "error", err) + c.log.Error("create channel on rabbitmq connection", "error", err) return } - err = j.consumeChan.Qos(j.prefetch, 0, false) + err = c.consumeChan.Qos(c.prefetch, 0, false) if err != nil { - j.log.Error("qos set failed", "error", err) + c.log.Error("qos set failed", "error", err) return } // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, + deliv, err := c.consumeChan.Consume( + c.queue, + c.consumeID, false, false, false, @@ -399,17 +401,17 @@ func (j *consumer) Resume(_ context.Context, p string) { nil, ) if err != nil { - j.log.Error("consume operation failed", "error", err) + c.log.Error("consume operation failed", "error", err) return } // run listener - j.listener(deliv) + c.listener(deliv) // increase number of listeners - atomic.AddUint32(&j.listeners, 1) + atomic.AddUint32(&c.listeners, 1) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -417,11 +419,11 @@ func (j *consumer) Resume(_ context.Context, p string) { }) } -func (j *consumer) Stop(context.Context) error { - j.stopCh <- struct{}{} +func (c *consumer) Stop(context.Context) error { + c.stopCh <- struct{}{} - pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ + pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -431,13 +433,13 @@ func (j *consumer) Stop(context.Context) error { } // handleItem -func (j *consumer) handleItem(ctx context.Context, msg *Item) error { +func (c *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("rabbitmq_handle_item") select { - case pch := <-j.publishChan: + case pch := <-c.publishChan: // return the channel back defer func() { - j.publishChan <- pch + c.publishChan <- pch }() // convert @@ -449,30 +451,30 @@ func (j *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("rabbitmq_handle_item") // handle timeouts if msg.Options.DelayDuration() > 0 { - atomic.AddInt64(j.delayed, 1) + atomic.AddInt64(c.delayed, 1) // TODO declare separate method for this if condition // TODO dlx cache channel?? delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, c.exchangeName, c.queue) _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, + dlx: c.exchangeName, + dlxRoutingKey: c.routingKey, dlxTTL: delayMs, dlxExpires: delayMs * 2, }) if err != nil { - atomic.AddInt64(j.delayed, ^int64(0)) + atomic.AddInt64(c.delayed, ^int64(0)) return errors.E(op, err) } - err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + err = pch.QueueBind(tmpQ, tmpQ, c.exchangeName, false, nil) if err != nil { - atomic.AddInt64(j.delayed, ^int64(0)) + atomic.AddInt64(c.delayed, ^int64(0)) return errors.E(op, err) } // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + err = pch.Publish(c.exchangeName, tmpQ, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, Timestamp: time.Now().UTC(), @@ -481,7 +483,7 @@ func (j *consumer) handleItem(ctx context.Context, msg *Item) error { }) if err != nil { - atomic.AddInt64(j.delayed, ^int64(0)) + atomic.AddInt64(c.delayed, ^int64(0)) return errors.E(op, err) } @@ -489,7 +491,7 @@ func (j *consumer) handleItem(ctx context.Context, msg *Item) error { } // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + err = pch.Publish(c.exchangeName, c.routingKey, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, Timestamp: time.Now(), diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go index a8e305ea..66b70a36 100644 --- a/plugins/amqp/amqpjobs/item.go +++ b/plugins/amqp/amqpjobs/item.go @@ -139,9 +139,9 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { } // fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ -func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { +func (c *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { const op = errors.Op("from_delivery_convert") - item, err := j.unpack(d) + item, err := c.unpack(d) if err != nil { return nil, errors.E(op, err) } @@ -156,10 +156,10 @@ func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { item.Options.ack = d.Ack item.Options.nack = d.Nack - item.Options.delayed = j.delayed + item.Options.delayed = c.delayed // requeue func - item.Options.requeueFn = j.handleItem + item.Options.requeueFn = c.handleItem return i, nil } @@ -194,11 +194,11 @@ func pack(id string, j *Item) (amqp.Table, error) { } // unpack restores jobs.Options -func (j *consumer) unpack(d amqp.Delivery) (*Item, error) { +func (c *consumer) unpack(d amqp.Delivery) (*Item, error) { item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ - multipleAsk: j.multipleAck, - requeue: j.requeueOnFail, - requeueFn: j.handleItem, + multipleAsk: c.multipleAck, + requeue: c.requeueOnFail, + requeueFn: c.handleItem, }} if _, ok := d.Headers[job.RRID].(string); !ok { @@ -230,7 +230,7 @@ func (j *consumer) unpack(d amqp.Delivery) (*Item, error) { if _, ok := d.Headers[job.RRPriority]; !ok { // set pipe's priority - item.Options.Priority = j.priority + item.Options.Priority = c.priority } else { item.Options.Priority = d.Headers[job.RRPriority].(int64) } diff --git a/plugins/amqp/amqpjobs/listener.go b/plugins/amqp/amqpjobs/listener.go index 0156d55c..75c61cad 100644 --- a/plugins/amqp/amqpjobs/listener.go +++ b/plugins/amqp/amqpjobs/listener.go @@ -2,23 +2,23 @@ package amqpjobs import amqp "github.com/rabbitmq/amqp091-go" -func (j *consumer) listener(deliv <-chan amqp.Delivery) { +func (c *consumer) listener(deliv <-chan amqp.Delivery) { go func() { for { //nolint:gosimple select { case msg, ok := <-deliv: if !ok { - j.log.Info("delivery channel closed, leaving the rabbit listener") + c.log.Info("delivery channel closed, leaving the rabbit listener") return } - d, err := j.fromDelivery(msg) + d, err := c.fromDelivery(msg) if err != nil { - j.log.Error("amqp delivery convert", "error", err) + c.log.Error("amqp delivery convert", "error", err) continue } // insert job into the main priority queue - j.pq.Insert(d) + c.pq.Insert(d) } } }() diff --git a/plugins/amqp/amqpjobs/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go index e260fabe..fb5f6911 100644 --- a/plugins/amqp/amqpjobs/rabbit_init.go +++ b/plugins/amqp/amqpjobs/rabbit_init.go @@ -4,20 +4,20 @@ import ( "github.com/spiral/errors" ) -func (j *consumer) initRabbitMQ() error { +func (c *consumer) initRabbitMQ() error { const op = errors.Op("jobs_plugin_rmq_init") // Channel opens a unique, concurrent server channel to process the bulk of AMQP // messages. Any error from methods on this receiver will render the receiver // invalid and a new Channel should be opened. - channel, err := j.conn.Channel() + channel, err := c.conn.Channel() if err != nil { return errors.E(op, err) } // declare an exchange (idempotent operation) err = channel.ExchangeDeclare( - j.exchangeName, - j.exchangeType, + c.exchangeName, + c.exchangeType, true, false, false, @@ -30,10 +30,10 @@ func (j *consumer) initRabbitMQ() error { // verify or declare a queue q, err := channel.QueueDeclare( - j.queue, + c.queue, false, false, - j.exclusive, + c.exclusive, false, nil, ) @@ -44,8 +44,8 @@ func (j *consumer) initRabbitMQ() error { // bind queue to the exchange err = channel.QueueBind( q.Name, - j.routingKey, - j.exchangeName, + c.routingKey, + c.exchangeName, false, nil, ) diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 0835e3ea..56142e2b 100644 --- a/plugins/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go @@ -11,26 +11,26 @@ import ( ) // redialer used to redial to the rabbitmq in case of the connection interrupts -func (j *consumer) redialer() { //nolint:gocognit +func (c *consumer) redialer() { //nolint:gocognit go func() { const op = errors.Op("rabbitmq_redial") for { select { - case err := <-j.conn.NotifyClose(make(chan *amqp.Error)): + case err := <-c.conn.NotifyClose(make(chan *amqp.Error)): if err == nil { return } - j.Lock() + c.Lock() // trash the broken publishing channel - <-j.publishChan + <-c.publishChan t := time.Now() - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeError, Pipeline: pipe.Name(), Driver: pipe.Driver(), @@ -40,41 +40,41 @@ func (j *consumer) redialer() { //nolint:gocognit expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) - expb.MaxElapsedTime = j.retryTimeout + expb.MaxElapsedTime = c.retryTimeout operation := func() error { - j.log.Warn("rabbitmq reconnecting, caused by", "error", err) + c.log.Warn("rabbitmq reconnecting, caused by", "error", err) var dialErr error - j.conn, dialErr = amqp.Dial(j.connStr) + c.conn, dialErr = amqp.Dial(c.connStr) if dialErr != nil { return errors.E(op, dialErr) } - j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") + c.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") // re-init connection - errInit := j.initRabbitMQ() + errInit := c.initRabbitMQ() if errInit != nil { - j.log.Error("rabbitmq dial", "error", errInit) + c.log.Error("rabbitmq dial", "error", errInit) return errInit } // redeclare consume channel var errConnCh error - j.consumeChan, errConnCh = j.conn.Channel() + c.consumeChan, errConnCh = c.conn.Channel() if errConnCh != nil { return errors.E(op, errConnCh) } // redeclare publish channel - pch, errPubCh := j.conn.Channel() + pch, errPubCh := c.conn.Channel() if errPubCh != nil { return errors.E(op, errPubCh) } // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, + deliv, err := c.consumeChan.Consume( + c.queue, + c.consumeID, false, false, false, @@ -86,23 +86,23 @@ func (j *consumer) redialer() { //nolint:gocognit } // put the fresh publishing channel - j.publishChan <- pch + c.publishChan <- pch // restart listener - j.listener(deliv) + c.listener(deliv) - j.log.Info("queues and subscribers redeclared successfully") + c.log.Info("queues and subscribers redeclared successfully") return nil } retryErr := backoff.Retry(operation, expb) if retryErr != nil { - j.Unlock() - j.log.Error("backoff failed", "error", retryErr) + c.Unlock() + c.log.Error("backoff failed", "error", retryErr) return } - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Pipeline: pipe.Name(), Driver: pipe.Driver(), @@ -110,27 +110,27 @@ func (j *consumer) redialer() { //nolint:gocognit Elapsed: time.Since(t), }) - j.Unlock() + c.Unlock() - case <-j.stopCh: - if j.publishChan != nil { - pch := <-j.publishChan + case <-c.stopCh: + if c.publishChan != nil { + pch := <-c.publishChan err := pch.Close() if err != nil { - j.log.Error("publish channel close", "error", err) + c.log.Error("publish channel close", "error", err) } } - if j.consumeChan != nil { - err := j.consumeChan.Close() + if c.consumeChan != nil { + err := c.consumeChan.Close() if err != nil { - j.log.Error("consume channel close", "error", err) + c.log.Error("consume channel close", "error", err) } } - if j.conn != nil { - err := j.conn.Close() + if c.conn != nil { + err := c.conn.Close() if err != nil { - j.log.Error("amqp connection close", "error", err) + c.log.Error("amqp connection close", "error", err) } } diff --git a/plugins/boltdb/boltjobs/config.go b/plugins/boltdb/boltjobs/config.go index 013e30bf..8cc098c1 100644 --- a/plugins/boltdb/boltjobs/config.go +++ b/plugins/boltdb/boltjobs/config.go @@ -1,16 +1,39 @@ package boltjobs -type Config struct { - // File is boltDB file. No need to create it by your own, - // boltdb driver is able to create the file, or read existing - File string - // Bucket to store data in boltDB - bucket string +const ( + file string = "file" + priority string = "priority" + prefetch string = "prefetch" +) + +type GlobalCfg struct { // db file permissions - Permissions int + Permissions int `mapstructure:"permissions"` // consume timeout } +func (c *GlobalCfg) InitDefaults() { + if c.Permissions == 0 { + c.Permissions = 0777 + } +} + +type Config struct { + File string `mapstructure:"file"` + Priority int `mapstructure:"priority"` + Prefetch int `mapstructure:"prefetch"` +} + func (c *Config) InitDefaults() { + if c.File == "" { + c.File = "rr.db" + } + + if c.Priority == 0 { + c.Priority = 10 + } + if c.Prefetch == 0 { + c.Prefetch = 1000 + } } diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index a8db2f30..67a6d3e7 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -1,11 +1,14 @@ package boltjobs import ( + "bytes" "context" + "encoding/gob" "os" "sync/atomic" "time" + "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" @@ -20,19 +23,27 @@ import ( const ( PluginName = "boltdb" + + PushBucket = "push" + InQueueBucket = "processing" + DoneBucket = "done" ) type consumer struct { - // bbolt configuration file string permissions int - bucket string - db *bolt.DB + priority int + prefetch int + + db *bolt.DB + + log logger.Logger + eh events.Handler + pq priorityqueue.Queue + listeners uint32 + pipeline atomic.Value - log logger.Logger - eh events.Handler - pq priorityqueue.Queue - pipe atomic.Value + stopCh chan struct{} } func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { @@ -47,26 +58,88 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e return nil, errors.E(op, errors.Str("no global boltdb configuration")) } - conf := &Config{} + conf := &GlobalCfg{} - err := cfg.UnmarshalKey(configKey, conf) + err := cfg.UnmarshalKey(PluginName, conf) if err != nil { return nil, errors.E(op, err) } - // add default values + localCfg := &Config{} + err = cfg.UnmarshalKey(configKey, localCfg) + if err != nil { + return nil, errors.E(op, err) + } + + localCfg.InitDefaults() conf.InitDefaults() - c := &consumer{ - file: conf.File, + + db, err := bolt.Open(localCfg.File, os.FileMode(conf.Permissions), &bolt.Options{ + Timeout: time.Second * 20, + NoGrowSync: false, + NoFreelistSync: false, + ReadOnly: false, + NoSync: false, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb_plugin_update") + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + if err != nil { + return errors.E(op, upOp) + } + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + if err != nil { + return errors.E(op, upOp) + } + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket)) + if err != nil { + return errors.E(op, upOp) + } + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + return &consumer{ permissions: conf.Permissions, - bucket: conf.bucket, + file: localCfg.File, + priority: localCfg.Priority, + prefetch: localCfg.Prefetch, + + db: db, + log: log, + eh: e, + pq: pq, + stopCh: make(chan struct{}, 1), + }, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("init_boltdb_jobs") - log: log, - eh: e, - pq: pq, + // if no global section + if !cfg.Has(PluginName) { + return nil, errors.E(op, errors.Str("no global boltdb configuration")) } - db, err := bolt.Open(c.file, os.FileMode(c.permissions), &bolt.Options{ + conf := &GlobalCfg{} + err := cfg.UnmarshalKey(PluginName, conf) + if err != nil { + return nil, errors.E(op, err) + } + + // add default values + conf.InitDefaults() + + db, err := bolt.Open(pipeline.String(file, "rr.db"), os.FileMode(conf.Permissions), &bolt.Options{ Timeout: time.Second * 20, NoGrowSync: false, NoFreelistSync: false, @@ -78,51 +151,135 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e return nil, errors.E(op, err) } - c.db = db - // create bucket if it does not exist // tx.Commit invokes via the db.Update err = db.Update(func(tx *bolt.Tx) error { const upOp = errors.Op("boltdb_plugin_update") - _, err = tx.CreateBucketIfNotExists(utils.AsBytes(c.bucket)) + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + if err != nil { + return errors.E(op, upOp) + } + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + if err != nil { + return errors.E(op, upOp) + } + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket)) if err != nil { return errors.E(op, upOp) } return nil }) - return c, nil -} + if err != nil { + return nil, errors.E(op, err) + } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - return &consumer{}, nil + return &consumer{ + file: pipeline.String(file, "rr.db"), + priority: pipeline.Int(priority, 10), + prefetch: pipeline.Int(prefetch, 100), + permissions: conf.Permissions, + + db: db, + log: log, + eh: e, + pq: pq, + stopCh: make(chan struct{}, 1), + }, nil } func (c *consumer) Push(ctx context.Context, job *job.Job) error { - panic("implement me") + const op = errors.Op("boltdb_jobs_push") + err := c.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(utils.AsBytes(PushBucket)) + buf := new(bytes.Buffer) + enc := gob.NewEncoder(buf) + err := enc.Encode(job) + if err != nil { + return err + } + + return b.Put(utils.AsBytes(uuid.NewString()), buf.Bytes()) + }) + + if err != nil { + return errors.E(op, err) + } + + return nil } func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - c.pipe.Store(pipeline) + c.pipeline.Store(pipeline) return nil } -func (c *consumer) Run(_ context.Context, pipeline *pipeline.Pipeline) error { - panic("implement me") +func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + const op = errors.Op("boltdb_run") + + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) + } + return nil } func (c *consumer) Stop(ctx context.Context) error { - panic("implement me") + return nil } -func (c *consumer) Pause(ctx context.Context, pipeline string) { - panic("implement me") +func (c *consumer) Pause(ctx context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + c.log.Error("no such pipeline", "requested pause on: ", p) + } + + l := atomic.LoadUint32(&c.listeners) + // no active listeners + if l == 0 { + c.log.Warn("no active listeners, nothing to pause") + return + } + + c.stopCh <- struct{}{} + + atomic.AddUint32(&c.listeners, ^uint32(0)) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipePaused, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } -func (c *consumer) Resume(ctx context.Context, pipeline string) { - panic("implement me") +func (c *consumer) Resume(ctx context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + c.log.Error("no such pipeline", "requested resume on: ", p) + } + + l := atomic.LoadUint32(&c.listeners) + // no active listeners + if l == 1 { + c.log.Warn("amqp listener already in the active state") + return + } + + // run listener + go c.listener() + + // increase number of listeners + atomic.AddUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) } func (c *consumer) State(ctx context.Context) (*jobState.State, error) { - panic("implement me") + return nil, nil } diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index 4a8d6cd9..2ee06088 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -1,22 +1,34 @@ package boltjobs -import "time" +import ( + "fmt" + "time" + + "github.com/spiral/roadrunner/v2/utils" +) func (c *consumer) listener() { tt := time.NewTicker(time.Second) for { select { + case <-c.stopCh: + c.log.Warn("boltdb listener stopped") + return case <-tt.C: tx, err := c.db.Begin(false) if err != nil { panic(err) } - // cursor := tx.Cursor() - err = tx.Commit() - if err != nil { - panic(err) - } + b := tx.Bucket(utils.AsBytes(PushBucket)) + + cursor := b.Cursor() + + k, v := cursor.First() + _ = k + _ = v + + fmt.Println("foo") } } } diff --git a/plugins/jobs/job/general.go b/plugins/jobs/job/job.go index 390f44b5..06c3254e 100644 --- a/plugins/jobs/job/general.go +++ b/plugins/jobs/job/job.go @@ -1,5 +1,9 @@ package job +import ( + "time" +) + // constant keys to pack/unpack messages from different drivers const ( RRID string = "rr_id" @@ -27,3 +31,32 @@ type Job struct { // Options contains set of PipelineOptions specific to job execution. Can be empty. Options *Options `json:"options,omitempty"` } + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` +} + +// Merge merges job options. +func (o *Options) Merge(from *Options) { + if o.Pipeline == "" { + o.Pipeline = from.Pipeline + } + + if o.Delay == 0 { + o.Delay = from.Delay + } +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} diff --git a/plugins/jobs/job/job_options.go b/plugins/jobs/job/job_options.go deleted file mode 100644 index b7e4ed36..00000000 --- a/plugins/jobs/job/job_options.go +++ /dev/null @@ -1,32 +0,0 @@ -package job - -import "time" - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` -} - -// Merge merges job options. -func (o *Options) Merge(from *Options) { - if o.Pipeline == "" { - o.Pipeline = from.Pipeline - } - - if o.Delay == 0 { - o.Delay = from.Delay - } -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} diff --git a/plugins/jobs/job/job_options_test.go b/plugins/jobs/job/job_test.go index a47151a3..a47151a3 100644 --- a/plugins/jobs/job/job_options_test.go +++ b/plugins/jobs/job/job_test.go diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 5e62c5c5..a0b477f9 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -178,7 +178,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit }) var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"}) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}) if err != nil { errCh <- err return errCh |