diff options
Diffstat (limited to 'plugins/jobs/oooold/broker/amqp/broker.go')
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/broker.go | 216 |
1 files changed, 0 insertions, 216 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/broker.go b/plugins/jobs/oooold/broker/amqp/broker.go deleted file mode 100644 index b47d83ee..00000000 --- a/plugins/jobs/oooold/broker/amqp/broker.go +++ /dev/null @@ -1,216 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/gofrs/uuid" - "github.com/spiral/jobs/v2" - "sync" - "sync/atomic" -) - -// Broker represents AMQP broker. -type Broker struct { - cfg *Config - lsn func(event int, ctx interface{}) - publish *chanPool - consume *chanPool - mu sync.Mutex - wait chan error - stopped chan interface{} - queues map[*jobs.Pipeline]*queue -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures AMQP job broker (always 2 connections). -func (b *Broker) Init(cfg *Config) (ok bool, err error) { - b.cfg = cfg - b.queues = make(map[*jobs.Pipeline]*queue) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.queues[pipe]; ok { - return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) - } - - q, err := newQueue(pipe, b.throw) - if err != nil { - return err - } - - b.queues[pipe] = q - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() (err error) { - b.mu.Lock() - - if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { - b.mu.Unlock() - return err - } - defer b.publish.Close() - - if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { - b.mu.Unlock() - return err - } - defer b.consume.Close() - - for _, q := range b.queues { - err := q.declare(b.publish, q.name, q.key, nil) - if err != nil { - b.mu.Unlock() - return err - } - } - - for _, q := range b.queues { - qq := q - if qq.execPool != nil { - go qq.serve(b.publish, b.consume) - } - } - - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - for _, q := range b.queues { - q.stop() - } - - close(b.wait) - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - q.stop() - - q.execPool = execPool - q.errHandler = errHandler - - if b.publish != nil && q.execPool != nil { - if q.execPool != nil { - go q.serve(b.publish, b.consume) - } - } - - return nil -} - -// Push job into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - id, err := uuid.NewV4() - if err != nil { - return "", err - } - - q := b.queue(pipe) - if q == nil { - return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil { - return "", err - } - - return id.String(), nil -} - -// Stat must fetch statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - q := b.queue(pipe) - if q == nil { - return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - queue, err := q.inspect(b.publish) - if err != nil { - return nil, err - } - - // this the closest approximation we can get for now - return &jobs.Stat{ - InternalName: queue.Name, - Queue: int64(queue.Messages), - Active: int64(atomic.LoadInt32(&q.running)), - }, nil -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) queue(pipe *jobs.Pipeline) *queue { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return nil - } - - return q -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} |