diff options
author | Valery Piashchynski <[email protected]> | 2021-06-16 12:56:02 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-16 12:56:02 +0300 |
commit | cee4bc46097506d6e892b6af194751434700621a (patch) | |
tree | e542d1b2f963c2aa0e304703c82ff4f04203b169 /plugins/jobs/oooold/broker/amqp | |
parent | d4c92e48bada7593b6fbec612a742c599de6e736 (diff) |
- Update jobs sources
- Update Arch diagramm
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/oooold/broker/amqp')
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/broker.go | 216 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/broker_test.go | 419 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/config.go | 39 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/config_test.go | 27 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/conn.go | 232 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/consume_test.go | 258 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/durability_test.go | 728 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/job.go | 56 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/job_test.go | 29 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/queue.go | 302 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/stat_test.go | 63 |
11 files changed, 2369 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/broker.go b/plugins/jobs/oooold/broker/amqp/broker.go new file mode 100644 index 00000000..b47d83ee --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/broker.go @@ -0,0 +1,216 @@ +package amqp + +import ( + "fmt" + "github.com/gofrs/uuid" + "github.com/spiral/jobs/v2" + "sync" + "sync/atomic" +) + +// Broker represents AMQP broker. +type Broker struct { + cfg *Config + lsn func(event int, ctx interface{}) + publish *chanPool + consume *chanPool + mu sync.Mutex + wait chan error + stopped chan interface{} + queues map[*jobs.Pipeline]*queue +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures AMQP job broker (always 2 connections). +func (b *Broker) Init(cfg *Config) (ok bool, err error) { + b.cfg = cfg + b.queues = make(map[*jobs.Pipeline]*queue) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.queues[pipe]; ok { + return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) + } + + q, err := newQueue(pipe, b.throw) + if err != nil { + return err + } + + b.queues[pipe] = q + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() (err error) { + b.mu.Lock() + + if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { + b.mu.Unlock() + return err + } + defer b.publish.Close() + + if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { + b.mu.Unlock() + return err + } + defer b.consume.Close() + + for _, q := range b.queues { + err := q.declare(b.publish, q.name, q.key, nil) + if err != nil { + b.mu.Unlock() + return err + } + } + + for _, q := range b.queues { + qq := q + if qq.execPool != nil { + go qq.serve(b.publish, b.consume) + } + } + + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + for _, q := range b.queues { + q.stop() + } + + close(b.wait) + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + q.stop() + + q.execPool = execPool + q.errHandler = errHandler + + if b.publish != nil && q.execPool != nil { + if q.execPool != nil { + go q.serve(b.publish, b.consume) + } + } + + return nil +} + +// Push job into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + id, err := uuid.NewV4() + if err != nil { + return "", err + } + + q := b.queue(pipe) + if q == nil { + return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil { + return "", err + } + + return id.String(), nil +} + +// Stat must fetch statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + q := b.queue(pipe) + if q == nil { + return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + queue, err := q.inspect(b.publish) + if err != nil { + return nil, err + } + + // this the closest approximation we can get for now + return &jobs.Stat{ + InternalName: queue.Name, + Queue: int64(queue.Messages), + Active: int64(atomic.LoadInt32(&q.running)), + }, nil +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) queue(pipe *jobs.Pipeline) *queue { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return nil + } + + return q +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} diff --git a/plugins/jobs/oooold/broker/amqp/broker_test.go b/plugins/jobs/oooold/broker/amqp/broker_test.go new file mode 100644 index 00000000..66078099 --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/broker_test.go @@ -0,0 +1,419 @@ +package amqp + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "amqp", + "name": "default", + "queue": "rr-queue", + "exchange": "rr-exchange", + "prefetch": 1, + } + + cfg = &Config{ + Addr: "amqp://guest:guest@localhost:5672/", + } +) + +var ( + fanoutPipe = &jobs.Pipeline{ + "broker": "amqp", + "name": "fanout", + "queue": "fanout-queue", + "exchange": "fanout-exchange", + "exchange-type": "fanout", + "prefetch": 1, + } + + fanoutCfg = &Config{ + Addr: "amqp://guest:guest@localhost:5672/", + } +) + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_BadPipeline(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.Error(t, b.Register(&jobs.Pipeline{ + "broker": "amqp", + "name": "default", + "exchange": "rr-exchange", + "prefetch": 1, + })) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_CantStart(t *testing.T) { + b := &Broker{} + _, err := b.Init(&Config{ + Addr: "amqp://guest:guest@localhost:15672/", + }) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Serve()) +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + err = b.Consume(pipe, exec, errf) + if err != nil { + t.Fatal() + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_Queue_RoutingKey(t *testing.T) { + pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") + + assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key") +} + +func TestBroker_Register_With_RoutingKey(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") + + assert.NoError(t, b.Register(&pipeWithKey)) +} + +func TestBroker_Consume_With_RoutingKey(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") + + err = b.Register(&pipeWithKey) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(&pipeWithKey, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Queue_ExchangeType(t *testing.T) { + pipeWithKey := pipe.With("exchange-type", "direct") + + assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct") +} + +func TestBroker_Register_With_ExchangeType(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("exchange-type", "fanout") + + assert.NoError(t, b.Register(&pipeWithKey)) +} + +func TestBroker_Register_With_WrongExchangeType(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("exchange-type", "xxx") + + assert.Error(t, b.Register(&pipeWithKey)) +} + +func TestBroker_Consume_With_ExchangeType(t *testing.T) { + b := &Broker{} + _, err := b.Init(fanoutCfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := fanoutPipe.With("exchange-type", "fanout") + + err = b.Register(&pipeWithKey) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(&pipeWithKey, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} diff --git a/plugins/jobs/oooold/broker/amqp/config.go b/plugins/jobs/oooold/broker/amqp/config.go new file mode 100644 index 00000000..0ed3a50e --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/config.go @@ -0,0 +1,39 @@ +package amqp + +import ( + "fmt" + "github.com/spiral/roadrunner/service" + "time" +) + +// Config defines sqs broker configuration. +type Config struct { + // Addr of AMQP server (example: amqp://guest:guest@localhost:5672/). + Addr string + + // Timeout to allocate the connection. Default 10 seconds. + Timeout int +} + +// Hydrate config values. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + if c.Addr == "" { + return fmt.Errorf("AMQP address is missing") + } + + return nil +} + +// TimeoutDuration returns number of seconds allowed to redial +func (c *Config) TimeoutDuration() time.Duration { + timeout := c.Timeout + if timeout == 0 { + timeout = 10 + } + + return time.Duration(timeout) * time.Second +} diff --git a/plugins/jobs/oooold/broker/amqp/config_test.go b/plugins/jobs/oooold/broker/amqp/config_test.go new file mode 100644 index 00000000..1abbb55d --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/config_test.go @@ -0,0 +1,27 @@ +package amqp + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error(t *testing.T) { + cfg := &mockCfg{`{"dead`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"addr":""}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} diff --git a/plugins/jobs/oooold/broker/amqp/conn.go b/plugins/jobs/oooold/broker/amqp/conn.go new file mode 100644 index 00000000..be747776 --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/conn.go @@ -0,0 +1,232 @@ +package amqp + +import ( + "fmt" + "github.com/cenkalti/backoff/v4" + "github.com/streadway/amqp" + "sync" + "time" +) + +// manages set of AMQP channels +type chanPool struct { + // timeout to backoff redial + tout time.Duration + url string + + mu *sync.Mutex + + conn *amqp.Connection + channels map[string]*channel + wait chan interface{} + connected chan interface{} +} + +// manages single channel +type channel struct { + ch *amqp.Channel + // todo unused + //consumer string + confirm chan amqp.Confirmation + signal chan error +} + +// newConn creates new watched AMQP connection +func newConn(url string, tout time.Duration) (*chanPool, error) { + conn, err := dial(url) + if err != nil { + return nil, err + } + + cp := &chanPool{ + url: url, + tout: tout, + conn: conn, + mu: &sync.Mutex{}, + channels: make(map[string]*channel), + wait: make(chan interface{}), + connected: make(chan interface{}), + } + + close(cp.connected) + go cp.watch() + return cp, nil +} + +// dial dials to AMQP. +func dial(url string) (*amqp.Connection, error) { + return amqp.Dial(url) +} + +// Close gracefully closes all underlying channels and connection. +func (cp *chanPool) Close() error { + cp.mu.Lock() + + close(cp.wait) + if cp.channels == nil { + return fmt.Errorf("connection is dead") + } + + // close all channels and consume + var wg sync.WaitGroup + for _, ch := range cp.channels { + wg.Add(1) + + go func(ch *channel) { + defer wg.Done() + cp.closeChan(ch, nil) + }(ch) + } + cp.mu.Unlock() + + wg.Wait() + + cp.mu.Lock() + defer cp.mu.Unlock() + + if cp.conn != nil { + return cp.conn.Close() + } + + return nil +} + +// waitConnected waits till connection is connected again or eventually closed. +// must only be invoked after connection error has been delivered to channel.signal. +func (cp *chanPool) waitConnected() chan interface{} { + cp.mu.Lock() + defer cp.mu.Unlock() + + return cp.connected +} + +// watch manages connection state and reconnects if needed +func (cp *chanPool) watch() { + for { + select { + case <-cp.wait: + // connection has been closed + return + // here we are waiting for the errors from amqp connection + case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)): + cp.mu.Lock() + // clear connected, since connections are dead + cp.connected = make(chan interface{}) + + // broadcast error to all consume to let them for the tryReconnect + for _, ch := range cp.channels { + ch.signal <- err + } + + // disable channel allocation while server is dead + cp.conn = nil + cp.channels = nil + + // initialize the backoff + expb := backoff.NewExponentialBackOff() + expb.MaxInterval = cp.tout + cp.mu.Unlock() + + // reconnect function + reconnect := func() error { + cp.mu.Lock() + conn, err := dial(cp.url) + if err != nil { + // still failing + fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error())) + cp.mu.Unlock() + return err + } + + // TODO ADD LOGGING + fmt.Println("------amqp successfully redialed------") + + // here we are reconnected + // replace the connection + cp.conn = conn + // re-init the channels + cp.channels = make(map[string]*channel) + cp.mu.Unlock() + return nil + } + + // start backoff retry + errb := backoff.Retry(reconnect, expb) + if errb != nil { + fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error())) + // reconnection failed + close(cp.connected) + return + } + close(cp.connected) + } + } +} + +// channel allocates new channel on amqp connection +func (cp *chanPool) channel(name string) (*channel, error) { + cp.mu.Lock() + dead := cp.conn == nil + cp.mu.Unlock() + + if dead { + // wait for connection restoration (doubled the timeout duration) + select { + case <-time.NewTimer(cp.tout * 2).C: + return nil, fmt.Errorf("connection is dead") + case <-cp.connected: + // connected + } + } + + cp.mu.Lock() + defer cp.mu.Unlock() + + if cp.conn == nil { + return nil, fmt.Errorf("connection has been closed") + } + + if ch, ok := cp.channels[name]; ok { + return ch, nil + } + + // we must create new channel + ch, err := cp.conn.Channel() + if err != nil { + return nil, err + } + + // Enable publish confirmations + if err = ch.Confirm(false); err != nil { + return nil, fmt.Errorf("unable to enable confirmation mode on channel: %s", err) + } + + // we expect that every allocated channel would have listener on signal + // this is not true only in case of pure producing channels + cp.channels[name] = &channel{ + ch: ch, + confirm: ch.NotifyPublish(make(chan amqp.Confirmation, 1)), + signal: make(chan error, 1), + } + + return cp.channels[name], nil +} + +// closeChan gracefully closes and removes channel allocation. +func (cp *chanPool) closeChan(c *channel, err error) error { + cp.mu.Lock() + defer cp.mu.Unlock() + + go func() { + c.signal <- nil + c.ch.Close() + }() + + for name, ch := range cp.channels { + if ch == c { + delete(cp.channels, name) + } + } + + return err +} diff --git a/plugins/jobs/oooold/broker/amqp/consume_test.go b/plugins/jobs/oooold/broker/amqp/consume_test.go new file mode 100644 index 00000000..28999c36 --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/consume_test.go @@ -0,0 +1,258 @@ +package amqp + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + start := time.Now() + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed >= time.Second) + assert.True(t, elapsed < 3*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} diff --git a/plugins/jobs/oooold/broker/amqp/durability_test.go b/plugins/jobs/oooold/broker/amqp/durability_test.go new file mode 100644 index 00000000..00d62c51 --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/durability_test.go @@ -0,0 +1,728 @@ +package amqp + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "io" + "net" + "sync" + "testing" + "time" +) + +var ( + proxyCfg = &Config{ + Addr: "amqp://guest:guest@localhost:5673/", + Timeout: 1, + } + + proxy = &tcpProxy{ + listen: "localhost:5673", + upstream: "localhost:5672", + accept: true, + } +) + +type tcpProxy struct { + listen string + upstream string + mu sync.Mutex + accept bool + conn []net.Conn +} + +func (p *tcpProxy) serve() { + l, err := net.Listen("tcp", p.listen) + if err != nil { + panic(err) + } + + for { + in, err := l.Accept() + if err != nil { + panic(err) + } + + if !p.accepting() { + in.Close() + } + + up, err := net.Dial("tcp", p.upstream) + if err != nil { + panic(err) + } + + go io.Copy(in, up) + go io.Copy(up, in) + + p.mu.Lock() + p.conn = append(p.conn, in, up) + p.mu.Unlock() + } +} + +// wait for specific number of connections +func (p *tcpProxy) waitConn(count int) *tcpProxy { + p.mu.Lock() + p.accept = true + p.mu.Unlock() + + for { + p.mu.Lock() + current := len(p.conn) + p.mu.Unlock() + + if current >= count*2 { + break + } + + time.Sleep(time.Millisecond) + } + + return p +} + +func (p *tcpProxy) reset(accept bool) int { + p.mu.Lock() + p.accept = accept + defer p.mu.Unlock() + + count := 0 + for _, conn := range p.conn { + conn.Close() + count++ + } + + p.conn = nil + return count / 2 +} + +func (p *tcpProxy) accepting() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.accept +} + +func init() { + go proxy.serve() +} + +func TestBroker_Durability_Base(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + // expect 2 connections + proxy.waitConn(2) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Durability_Consume(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + time.Sleep(3 * time.Second) + proxy.waitConn(1) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NotEqual(t, "0", jid) + + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume2(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + if perr != nil { + panic(perr) + } + + proxy.reset(true) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume2_2(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // start when connection is dead + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + if perr != nil { + panic(perr) + } + + proxy.reset(false) + + _, serr := b.Stat(pipe) + assert.Error(t, serr) + + proxy.reset(true) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume3(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(2) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + if perr != nil { + panic(perr) + } + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume4(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(2) + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "kill", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + + if j.Payload == "kill" && len(done) == 0 { + proxy.reset(true) + } + + mu.Lock() + defer mu.Unlock() + done[id] = true + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 3 { + break + } + } +} + +func TestBroker_Durability_StopDead(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + + <-ready + + proxy.waitConn(2).reset(false) + + b.Stop() +} diff --git a/plugins/jobs/oooold/broker/amqp/job.go b/plugins/jobs/oooold/broker/amqp/job.go new file mode 100644 index 00000000..bd559715 --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/job.go @@ -0,0 +1,56 @@ +package amqp + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/streadway/amqp" +) + +// pack job metadata into headers +func pack(id string, attempt int, j *jobs.Job) amqp.Table { + return amqp.Table{ + "rr-id": id, + "rr-job": j.Job, + "rr-attempt": int64(attempt), + "rr-maxAttempts": int64(j.Options.Attempts), + "rr-timeout": int64(j.Options.Timeout), + "rr-delay": int64(j.Options.Delay), + "rr-retryDelay": int64(j.Options.RetryDelay), + } +} + +// unpack restores jobs.Options +func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) { + j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}} + + if _, ok := d.Headers["rr-id"].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id") + } + + if _, ok := d.Headers["rr-attempt"].(int64); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt") + } + + if _, ok := d.Headers["rr-job"].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job") + } + j.Job = d.Headers["rr-job"].(string) + + if _, ok := d.Headers["rr-maxAttempts"].(int64); ok { + j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64)) + } + + if _, ok := d.Headers["rr-timeout"].(int64); ok { + j.Options.Timeout = int(d.Headers["rr-timeout"].(int64)) + } + + if _, ok := d.Headers["rr-delay"].(int64); ok { + j.Options.Delay = int(d.Headers["rr-delay"].(int64)) + } + + if _, ok := d.Headers["rr-retryDelay"].(int64); ok { + j.Options.RetryDelay = int(d.Headers["rr-retryDelay"].(int64)) + } + + return d.Headers["rr-id"].(string), int(d.Headers["rr-attempt"].(int64)), j, nil +} diff --git a/plugins/jobs/oooold/broker/amqp/job_test.go b/plugins/jobs/oooold/broker/amqp/job_test.go new file mode 100644 index 00000000..24ca453b --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/job_test.go @@ -0,0 +1,29 @@ +package amqp + +import ( + "github.com/streadway/amqp" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_Unpack_Errors(t *testing.T) { + _, _, _, err := unpack(amqp.Delivery{ + Headers: map[string]interface{}{}, + }) + assert.Error(t, err) + + _, _, _, err = unpack(amqp.Delivery{ + Headers: map[string]interface{}{ + "rr-id": "id", + }, + }) + assert.Error(t, err) + + _, _, _, err = unpack(amqp.Delivery{ + Headers: map[string]interface{}{ + "rr-id": "id", + "rr-attempt": int64(0), + }, + }) + assert.Error(t, err) +} diff --git a/plugins/jobs/oooold/broker/amqp/queue.go b/plugins/jobs/oooold/broker/amqp/queue.go new file mode 100644 index 00000000..6ef5f20f --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/queue.go @@ -0,0 +1,302 @@ +package amqp + +import ( + "errors" + "fmt" + "github.com/spiral/jobs/v2" + "github.com/streadway/amqp" + "os" + "sync" + "sync/atomic" + "time" +) + +type ExchangeType string + +const ( + Direct ExchangeType = "direct" + Fanout ExchangeType = "fanout" + Topic ExchangeType = "topic" + Headers ExchangeType = "headers" +) + +func (et ExchangeType) IsValid() error { + switch et { + case Direct, Fanout, Topic, Headers: + return nil + } + return errors.New("unknown exchange-type") +} + +func (et ExchangeType) String() string { + switch et { + case Direct, Fanout, Topic, Headers: + return string(et) + default: + return "direct" + } +} + + +type queue struct { + active int32 + pipe *jobs.Pipeline + exchange string + exchangeType ExchangeType + name, key string + consumer string + + // active consuming channel + muc sync.Mutex + cc *channel + + // queue events + lsn func(event int, ctx interface{}) + + // active operations + muw sync.RWMutex + wg sync.WaitGroup + + // exec handlers + running int32 + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +// newQueue creates new queue wrapper for AMQP. +func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { + if pipe.String("queue", "") == "" { + return nil, fmt.Errorf("missing `queue` parameter on amqp pipeline") + } + + exchangeType := ExchangeType(pipe.String("exchange-type", "direct")) + + err := exchangeType.IsValid() + if err != nil { + return nil, fmt.Errorf(err.Error()) + } + + return &queue{ + exchange: pipe.String("exchange", "amqp.direct"), + exchangeType: exchangeType, + name: pipe.String("queue", ""), + key: pipe.String("routing-key", pipe.String("queue", "")), + consumer: pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())), + pipe: pipe, + lsn: lsn, + }, nil +} + +// serve consumes queue +func (q *queue) serve(publish, consume *chanPool) { + atomic.StoreInt32(&q.active, 1) + + for { + <-consume.waitConnected() + if atomic.LoadInt32(&q.active) == 0 { + // stopped + return + } + + delivery, cc, err := q.consume(consume) + if err != nil { + q.report(err) + continue + } + + q.muc.Lock() + q.cc = cc + q.muc.Unlock() + + for d := range delivery { + q.muw.Lock() + q.wg.Add(1) + q.muw.Unlock() + + atomic.AddInt32(&q.running, 1) + h := <-q.execPool + + go func(h jobs.Handler, d amqp.Delivery) { + err := q.do(publish, h, d) + + atomic.AddInt32(&q.running, ^int32(0)) + q.execPool <- h + q.wg.Done() + q.report(err) + }(h, d) + } + } +} + +func (q *queue) consume(consume *chanPool) (jobs <-chan amqp.Delivery, cc *channel, err error) { + // allocate channel for the consuming + if cc, err = consume.channel(q.name); err != nil { + return nil, nil, err + } + + if err := cc.ch.Qos(q.pipe.Integer("prefetch", 4), 0, false); err != nil { + return nil, nil, consume.closeChan(cc, err) + } + + delivery, err := cc.ch.Consume(q.name, q.consumer, false, false, false, false, nil) + if err != nil { + return nil, nil, consume.closeChan(cc, err) + } + + // do i like it? + go func(consume *chanPool) { + for err := range cc.signal { + consume.closeChan(cc, err) + return + } + }(consume) + + return delivery, cc, err +} + +func (q *queue) do(cp *chanPool, h jobs.Handler, d amqp.Delivery) error { + id, attempt, j, err := unpack(d) + if err != nil { + q.report(err) + return d.Nack(false, false) + } + err = h(id, j) + + if err == nil { + return d.Ack(false) + } + + // failed + q.errHandler(id, j, err) + + if !j.Options.CanRetry(attempt) { + return d.Nack(false, false) + } + + // retry as new j (to accommodate attempt number and new delay) + if err = q.publish(cp, id, attempt+1, j, j.Options.RetryDuration()); err != nil { + q.report(err) + return d.Nack(false, true) + } + + return d.Ack(false) +} + +func (q *queue) stop() { + if atomic.LoadInt32(&q.active) == 0 { + return + } + + atomic.StoreInt32(&q.active, 0) + + q.muc.Lock() + if q.cc != nil { + // gracefully stopped consuming + q.report(q.cc.ch.Cancel(q.consumer, true)) + } + q.muc.Unlock() + + q.muw.Lock() + q.wg.Wait() + q.muw.Unlock() +} + +// publish message to queue or to delayed queue. +func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay time.Duration) error { + c, err := cp.channel(q.name) + if err != nil { + return err + } + + qKey := q.key + + if delay != 0 { + delayMs := int64(delay.Seconds() * 1000) + qName := fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name) + qKey = qName + + err := q.declare(cp, qName, qName, amqp.Table{ + "x-dead-letter-exchange": q.exchange, + "x-dead-letter-routing-key": q.name, + "x-message-ttl": delayMs, + "x-expires": delayMs * 2, + }) + + if err != nil { + return err + } + } + + err = c.ch.Publish( + q.exchange, // exchange + qKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/octet-stream", + Body: j.Body(), + DeliveryMode: amqp.Persistent, + Headers: pack(id, attempt, j), + }, + ) + + if err != nil { + return cp.closeChan(c, err) + } + + confirmed, ok := <-c.confirm + if ok && confirmed.Ack { + return nil + } + + return fmt.Errorf("failed to publish: %v", confirmed.DeliveryTag) +} + +// declare queue and binding to it +func (q *queue) declare(cp *chanPool, queue string, key string, args amqp.Table) error { + c, err := cp.channel(q.name) + if err != nil { + return err + } + + err = c.ch.ExchangeDeclare(q.exchange, q.exchangeType.String(), true, false, false, false, nil) + if err != nil { + return cp.closeChan(c, err) + } + + _, err = c.ch.QueueDeclare(queue, true, false, false, false, args) + if err != nil { + return cp.closeChan(c, err) + } + + err = c.ch.QueueBind(queue, key, q.exchange, false, nil) + if err != nil { + return cp.closeChan(c, err) + } + + // keep channel open + return err +} + +// inspect the queue +func (q *queue) inspect(cp *chanPool) (*amqp.Queue, error) { + c, err := cp.channel("stat") + if err != nil { + return nil, err + } + + queue, err := c.ch.QueueInspect(q.name) + if err != nil { + return nil, cp.closeChan(c, err) + } + + // keep channel open + return &queue, err +} + +// throw handles service, server and pool events. +func (q *queue) report(err error) { + if err != nil { + q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) + } +} diff --git a/plugins/jobs/oooold/broker/amqp/stat_test.go b/plugins/jobs/oooold/broker/amqp/stat_test.go new file mode 100644 index 00000000..ef19746c --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/stat_test.go @@ -0,0 +1,63 @@ +package amqp + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "sync" + "testing" +) + +func TestBroker_Stat(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Queue) + assert.Equal(t, int64(0), stat.Active) + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + wg := &sync.WaitGroup{} + wg.Add(1) + exec <- func(id string, j *jobs.Job) error { + defer wg.Done() + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Active) + + return nil + } + + wg.Wait() + stat, err = b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(0), stat.Queue) + assert.Equal(t, int64(0), stat.Active) +} |