diff options
Diffstat (limited to 'plugins/jobs/broker/amqp/broker.go')
-rw-r--r-- | plugins/jobs/broker/amqp/broker.go | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/plugins/jobs/broker/amqp/broker.go b/plugins/jobs/broker/amqp/broker.go new file mode 100644 index 00000000..b47d83ee --- /dev/null +++ b/plugins/jobs/broker/amqp/broker.go @@ -0,0 +1,216 @@ +package amqp + +import ( + "fmt" + "github.com/gofrs/uuid" + "github.com/spiral/jobs/v2" + "sync" + "sync/atomic" +) + +// Broker represents AMQP broker. +type Broker struct { + cfg *Config + lsn func(event int, ctx interface{}) + publish *chanPool + consume *chanPool + mu sync.Mutex + wait chan error + stopped chan interface{} + queues map[*jobs.Pipeline]*queue +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures AMQP job broker (always 2 connections). +func (b *Broker) Init(cfg *Config) (ok bool, err error) { + b.cfg = cfg + b.queues = make(map[*jobs.Pipeline]*queue) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.queues[pipe]; ok { + return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) + } + + q, err := newQueue(pipe, b.throw) + if err != nil { + return err + } + + b.queues[pipe] = q + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() (err error) { + b.mu.Lock() + + if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { + b.mu.Unlock() + return err + } + defer b.publish.Close() + + if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { + b.mu.Unlock() + return err + } + defer b.consume.Close() + + for _, q := range b.queues { + err := q.declare(b.publish, q.name, q.key, nil) + if err != nil { + b.mu.Unlock() + return err + } + } + + for _, q := range b.queues { + qq := q + if qq.execPool != nil { + go qq.serve(b.publish, b.consume) + } + } + + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + for _, q := range b.queues { + q.stop() + } + + close(b.wait) + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + q.stop() + + q.execPool = execPool + q.errHandler = errHandler + + if b.publish != nil && q.execPool != nil { + if q.execPool != nil { + go q.serve(b.publish, b.consume) + } + } + + return nil +} + +// Push job into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + id, err := uuid.NewV4() + if err != nil { + return "", err + } + + q := b.queue(pipe) + if q == nil { + return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil { + return "", err + } + + return id.String(), nil +} + +// Stat must fetch statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + q := b.queue(pipe) + if q == nil { + return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + queue, err := q.inspect(b.publish) + if err != nil { + return nil, err + } + + // this the closest approximation we can get for now + return &jobs.Stat{ + InternalName: queue.Name, + Queue: int64(queue.Messages), + Active: int64(atomic.LoadInt32(&q.running)), + }, nil +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) queue(pipe *jobs.Pipeline) *queue { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return nil + } + + return q +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} |