summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/amqp/broker.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/oooold/broker/amqp/broker.go')
-rw-r--r--plugins/jobs/oooold/broker/amqp/broker.go216
1 files changed, 0 insertions, 216 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/broker.go b/plugins/jobs/oooold/broker/amqp/broker.go
deleted file mode 100644
index b47d83ee..00000000
--- a/plugins/jobs/oooold/broker/amqp/broker.go
+++ /dev/null
@@ -1,216 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/gofrs/uuid"
- "github.com/spiral/jobs/v2"
- "sync"
- "sync/atomic"
-)
-
-// Broker represents AMQP broker.
-type Broker struct {
- cfg *Config
- lsn func(event int, ctx interface{})
- publish *chanPool
- consume *chanPool
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- queues map[*jobs.Pipeline]*queue
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures AMQP job broker (always 2 connections).
-func (b *Broker) Init(cfg *Config) (ok bool, err error) {
- b.cfg = cfg
- b.queues = make(map[*jobs.Pipeline]*queue)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.queues[pipe]; ok {
- return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
- }
-
- q, err := newQueue(pipe, b.throw)
- if err != nil {
- return err
- }
-
- b.queues[pipe] = q
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() (err error) {
- b.mu.Lock()
-
- if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
- b.mu.Unlock()
- return err
- }
- defer b.publish.Close()
-
- if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
- b.mu.Unlock()
- return err
- }
- defer b.consume.Close()
-
- for _, q := range b.queues {
- err := q.declare(b.publish, q.name, q.key, nil)
- if err != nil {
- b.mu.Unlock()
- return err
- }
- }
-
- for _, q := range b.queues {
- qq := q
- if qq.execPool != nil {
- go qq.serve(b.publish, b.consume)
- }
- }
-
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- for _, q := range b.queues {
- q.stop()
- }
-
- close(b.wait)
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- q.stop()
-
- q.execPool = execPool
- q.errHandler = errHandler
-
- if b.publish != nil && q.execPool != nil {
- if q.execPool != nil {
- go q.serve(b.publish, b.consume)
- }
- }
-
- return nil
-}
-
-// Push job into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- id, err := uuid.NewV4()
- if err != nil {
- return "", err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil {
- return "", err
- }
-
- return id.String(), nil
-}
-
-// Stat must fetch statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- queue, err := q.inspect(b.publish)
- if err != nil {
- return nil, err
- }
-
- // this the closest approximation we can get for now
- return &jobs.Stat{
- InternalName: queue.Name,
- Queue: int64(queue.Messages),
- Active: int64(atomic.LoadInt32(&q.running)),
- }, nil
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return nil
- }
-
- return q
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}