summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/amqp/broker.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/oooold/broker/amqp/broker.go')
-rw-r--r--plugins/jobs/oooold/broker/amqp/broker.go216
1 files changed, 216 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/broker.go b/plugins/jobs/oooold/broker/amqp/broker.go
new file mode 100644
index 00000000..b47d83ee
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/broker.go
@@ -0,0 +1,216 @@
+package amqp
+
+import (
+ "fmt"
+ "github.com/gofrs/uuid"
+ "github.com/spiral/jobs/v2"
+ "sync"
+ "sync/atomic"
+)
+
+// Broker represents AMQP broker.
+type Broker struct {
+ cfg *Config
+ lsn func(event int, ctx interface{})
+ publish *chanPool
+ consume *chanPool
+ mu sync.Mutex
+ wait chan error
+ stopped chan interface{}
+ queues map[*jobs.Pipeline]*queue
+}
+
+// Listen attaches server event watcher.
+func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
+ b.lsn = lsn
+}
+
+// Init configures AMQP job broker (always 2 connections).
+func (b *Broker) Init(cfg *Config) (ok bool, err error) {
+ b.cfg = cfg
+ b.queues = make(map[*jobs.Pipeline]*queue)
+
+ return true, nil
+}
+
+// Register broker pipeline.
+func (b *Broker) Register(pipe *jobs.Pipeline) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if _, ok := b.queues[pipe]; ok {
+ return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
+ }
+
+ q, err := newQueue(pipe, b.throw)
+ if err != nil {
+ return err
+ }
+
+ b.queues[pipe] = q
+
+ return nil
+}
+
+// Serve broker pipelines.
+func (b *Broker) Serve() (err error) {
+ b.mu.Lock()
+
+ if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
+ b.mu.Unlock()
+ return err
+ }
+ defer b.publish.Close()
+
+ if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
+ b.mu.Unlock()
+ return err
+ }
+ defer b.consume.Close()
+
+ for _, q := range b.queues {
+ err := q.declare(b.publish, q.name, q.key, nil)
+ if err != nil {
+ b.mu.Unlock()
+ return err
+ }
+ }
+
+ for _, q := range b.queues {
+ qq := q
+ if qq.execPool != nil {
+ go qq.serve(b.publish, b.consume)
+ }
+ }
+
+ b.wait = make(chan error)
+ b.stopped = make(chan interface{})
+ defer close(b.stopped)
+
+ b.mu.Unlock()
+
+ b.throw(jobs.EventBrokerReady, b)
+
+ return <-b.wait
+}
+
+// Stop all pipelines.
+func (b *Broker) Stop() {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.wait == nil {
+ return
+ }
+
+ for _, q := range b.queues {
+ q.stop()
+ }
+
+ close(b.wait)
+ <-b.stopped
+}
+
+// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
+// the service is started!
+func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ q, ok := b.queues[pipe]
+ if !ok {
+ return fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ q.stop()
+
+ q.execPool = execPool
+ q.errHandler = errHandler
+
+ if b.publish != nil && q.execPool != nil {
+ if q.execPool != nil {
+ go q.serve(b.publish, b.consume)
+ }
+ }
+
+ return nil
+}
+
+// Push job into the worker.
+func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
+ if err := b.isServing(); err != nil {
+ return "", err
+ }
+
+ id, err := uuid.NewV4()
+ if err != nil {
+ return "", err
+ }
+
+ q := b.queue(pipe)
+ if q == nil {
+ return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil {
+ return "", err
+ }
+
+ return id.String(), nil
+}
+
+// Stat must fetch statistics about given pipeline or return error.
+func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
+ if err := b.isServing(); err != nil {
+ return nil, err
+ }
+
+ q := b.queue(pipe)
+ if q == nil {
+ return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ queue, err := q.inspect(b.publish)
+ if err != nil {
+ return nil, err
+ }
+
+ // this the closest approximation we can get for now
+ return &jobs.Stat{
+ InternalName: queue.Name,
+ Queue: int64(queue.Messages),
+ Active: int64(atomic.LoadInt32(&q.running)),
+ }, nil
+}
+
+// check if broker is serving
+func (b *Broker) isServing() error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.wait == nil {
+ return fmt.Errorf("broker is not running")
+ }
+
+ return nil
+}
+
+// queue returns queue associated with the pipeline.
+func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ q, ok := b.queues[pipe]
+ if !ok {
+ return nil
+ }
+
+ return q
+}
+
+// throw handles service, server and pool events.
+func (b *Broker) throw(event int, ctx interface{}) {
+ if b.lsn != nil {
+ b.lsn(event, ctx)
+ }
+}