From efb3efa98c8555815330274f0618bfc080f4c65c Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 26 Aug 2021 18:32:51 +0300 Subject: Move drivers to the plugin's root. Fix #771, add tests. Signed-off-by: Valery Piashchynski --- pkg/pool/static_pool.go | 2 +- pkg/pool/supervisor_test.go | 50 +++ pkg/worker_watcher/container/channel/vec.go | 5 +- pkg/worker_watcher/worker_watcher.go | 78 +++- plugins/amqp/amqpjobs/config.go | 67 +++ plugins/amqp/amqpjobs/consumer.go | 512 ++++++++++++++++++++++ plugins/amqp/amqpjobs/item.go | 239 ++++++++++ plugins/amqp/amqpjobs/listener.go | 25 ++ plugins/amqp/amqpjobs/rabbit_init.go | 57 +++ plugins/amqp/amqpjobs/redial.go | 141 ++++++ plugins/amqp/plugin.go | 41 ++ plugins/beanstalk/config.go | 53 +++ plugins/beanstalk/connection.go | 223 ++++++++++ plugins/beanstalk/consumer.go | 360 +++++++++++++++ plugins/beanstalk/encode_test.go | 75 ++++ plugins/beanstalk/item.go | 147 +++++++ plugins/beanstalk/listen.go | 39 ++ plugins/beanstalk/plugin.go | 47 ++ plugins/boltdb/boltjobs/listener.go | 2 +- plugins/broadcast/plugin.go | 63 +-- plugins/ephemeral/consumer.go | 274 ++++++++++++ plugins/ephemeral/item.go | 133 ++++++ plugins/ephemeral/plugin.go | 41 ++ plugins/jobs/drivers/amqp/amqpjobs/config.go | 67 --- plugins/jobs/drivers/amqp/amqpjobs/consumer.go | 512 ---------------------- plugins/jobs/drivers/amqp/amqpjobs/item.go | 239 ---------- plugins/jobs/drivers/amqp/amqpjobs/listener.go | 25 -- plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go | 57 --- plugins/jobs/drivers/amqp/amqpjobs/redial.go | 141 ------ plugins/jobs/drivers/amqp/plugin.go | 41 -- plugins/jobs/drivers/beanstalk/config.go | 53 --- plugins/jobs/drivers/beanstalk/connection.go | 223 ---------- plugins/jobs/drivers/beanstalk/consumer.go | 360 --------------- plugins/jobs/drivers/beanstalk/encode_test.go | 75 ---- plugins/jobs/drivers/beanstalk/item.go | 147 ------- plugins/jobs/drivers/beanstalk/listen.go | 39 -- plugins/jobs/drivers/beanstalk/plugin.go | 47 -- plugins/jobs/drivers/ephemeral/consumer.go | 274 ------------ plugins/jobs/drivers/ephemeral/item.go | 133 ------ plugins/jobs/drivers/ephemeral/plugin.go | 41 -- plugins/jobs/drivers/sqs/config.go | 114 ----- plugins/jobs/drivers/sqs/consumer.go | 411 ----------------- plugins/jobs/drivers/sqs/item.go | 247 ----------- plugins/jobs/drivers/sqs/listener.go | 87 ---- plugins/jobs/drivers/sqs/plugin.go | 39 -- plugins/kv/drivers/memcached/config.go | 12 - plugins/kv/drivers/memcached/driver.go | 248 ----------- plugins/kv/drivers/memcached/plugin.go | 48 -- plugins/kv/plugin.go | 116 +---- plugins/memcached/config.go | 12 + plugins/memcached/driver.go | 248 +++++++++++ plugins/memcached/plugin.go | 48 ++ plugins/sqs/config.go | 114 +++++ plugins/sqs/consumer.go | 411 +++++++++++++++++ plugins/sqs/item.go | 247 +++++++++++ plugins/sqs/listener.go | 87 ++++ plugins/sqs/plugin.go | 39 ++ tests/allocate-failed.php | 18 + tests/plugins/jobs/jobs_amqp_test.go | 2 +- tests/plugins/jobs/jobs_beanstalk_test.go | 2 +- tests/plugins/jobs/jobs_ephemeral_test.go | 2 +- tests/plugins/jobs/jobs_general_test.go | 4 +- tests/plugins/jobs/jobs_sqs_test.go | 2 +- tests/plugins/jobs/jobs_with_toxics_test.go | 6 +- tests/plugins/kv/storage_plugin_test.go | 2 +- 65 files changed, 3857 insertions(+), 3857 deletions(-) create mode 100644 plugins/amqp/amqpjobs/config.go create mode 100644 plugins/amqp/amqpjobs/consumer.go create mode 100644 plugins/amqp/amqpjobs/item.go create mode 100644 plugins/amqp/amqpjobs/listener.go create mode 100644 
plugins/amqp/amqpjobs/rabbit_init.go create mode 100644 plugins/amqp/amqpjobs/redial.go create mode 100644 plugins/amqp/plugin.go create mode 100644 plugins/beanstalk/config.go create mode 100644 plugins/beanstalk/connection.go create mode 100644 plugins/beanstalk/consumer.go create mode 100644 plugins/beanstalk/encode_test.go create mode 100644 plugins/beanstalk/item.go create mode 100644 plugins/beanstalk/listen.go create mode 100644 plugins/beanstalk/plugin.go create mode 100644 plugins/ephemeral/consumer.go create mode 100644 plugins/ephemeral/item.go create mode 100644 plugins/ephemeral/plugin.go delete mode 100644 plugins/jobs/drivers/amqp/amqpjobs/config.go delete mode 100644 plugins/jobs/drivers/amqp/amqpjobs/consumer.go delete mode 100644 plugins/jobs/drivers/amqp/amqpjobs/item.go delete mode 100644 plugins/jobs/drivers/amqp/amqpjobs/listener.go delete mode 100644 plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go delete mode 100644 plugins/jobs/drivers/amqp/amqpjobs/redial.go delete mode 100644 plugins/jobs/drivers/amqp/plugin.go delete mode 100644 plugins/jobs/drivers/beanstalk/config.go delete mode 100644 plugins/jobs/drivers/beanstalk/connection.go delete mode 100644 plugins/jobs/drivers/beanstalk/consumer.go delete mode 100644 plugins/jobs/drivers/beanstalk/encode_test.go delete mode 100644 plugins/jobs/drivers/beanstalk/item.go delete mode 100644 plugins/jobs/drivers/beanstalk/listen.go delete mode 100644 plugins/jobs/drivers/beanstalk/plugin.go delete mode 100644 plugins/jobs/drivers/ephemeral/consumer.go delete mode 100644 plugins/jobs/drivers/ephemeral/item.go delete mode 100644 plugins/jobs/drivers/ephemeral/plugin.go delete mode 100644 plugins/jobs/drivers/sqs/config.go delete mode 100644 plugins/jobs/drivers/sqs/consumer.go delete mode 100644 plugins/jobs/drivers/sqs/item.go delete mode 100644 plugins/jobs/drivers/sqs/listener.go delete mode 100644 plugins/jobs/drivers/sqs/plugin.go delete mode 100644 plugins/kv/drivers/memcached/config.go delete mode 100644 plugins/kv/drivers/memcached/driver.go delete mode 100644 plugins/kv/drivers/memcached/plugin.go create mode 100644 plugins/memcached/config.go create mode 100644 plugins/memcached/driver.go create mode 100644 plugins/memcached/plugin.go create mode 100644 plugins/sqs/config.go create mode 100644 plugins/sqs/consumer.go create mode 100644 plugins/sqs/item.go create mode 100644 plugins/sqs/listener.go create mode 100644 plugins/sqs/plugin.go create mode 100644 tests/allocate-failed.php diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 3eb0714f..720ca9da 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -78,7 +78,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // set up workers allocator p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) // set up workers watcher - p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout) // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index d1b24574..14df513e 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -2,6 +2,7 @@ package pool import ( "context" + "os" "os/exec" "testing" "time" @@ -361,3 +362,52 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { <-block p.Destroy(context.Background()) } + +func 
TestSupervisedPool_AllocateFailedOK(t *testing.T) {
+	var cfgExecTTL = &Config{
+		NumWorkers:      uint64(2),
+		AllocateTimeout: time.Second * 15,
+		DestroyTimeout:  time.Second * 5,
+		Supervisor: &SupervisorConfig{
+			WatchTick: 1 * time.Second,
+			TTL:       5 * time.Second,
+		},
+	}
+
+	ctx := context.Background()
+	p, err := Initialize(
+		ctx,
+		func() *exec.Cmd { return exec.Command("php", "../../tests/allocate-failed.php") },
+		pipe.NewPipeFactory(),
+		cfgExecTTL,
+	)
+
+	assert.NoError(t, err)
+	require.NotNil(t, p)
+
+	time.Sleep(time.Second)
+
+	// should be ok
+	_, err = p.Exec(&payload.Payload{
+		Context: []byte(""),
+		Body:    []byte("foo"),
+	})
+
+	require.NoError(t, err)
+
+	// after creating this file, PHP will fail
+	file, err := os.Create("break")
+	require.NoError(t, err)
+
+	time.Sleep(time.Second * 5)
+	assert.NoError(t, file.Close())
+	assert.NoError(t, os.Remove("break"))
+
+	defer func() {
+		if r := recover(); r != nil {
+			assert.Fail(t, "panic should not be fired!")
+		} else {
+			p.Destroy(context.Background())
+		}
+	}()
+}
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
index 7fb65a92..5605f1e0 100644
--- a/pkg/worker_watcher/container/channel/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -15,14 +15,11 @@ type Vec struct {
 	destroy uint64
 	// channel with the workers
 	workers chan worker.BaseProcess
-
-	len uint64
 }
 
 func NewVector(len uint64) *Vec {
 	vec := &Vec{
 		destroy: 0,
-		len:     len,
 		workers: make(chan worker.BaseProcess, len),
 	}
 
@@ -48,7 +45,7 @@ func (v *Vec) Push(w worker.BaseProcess) {
 	1. TTL is set with no requests during the TTL
 	2. Violated Get <-> Release operation (how ??)
 	*/
-	for i := uint64(0); i < v.len; i++ {
+	for i := 0; i < len(v.workers); i++ {
 		/*
 		We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
 		*/
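The vec.go change above replaces the stored `len` field with `len(v.workers)`: once allocation failures can shrink the pool, the drain loop must be bounded by how many workers are actually buffered in the channel, not by the original capacity. A minimal sketch of that drain pattern (simplified types, not the real RoadRunner interfaces):

    package vecsketch

    // drain inspects at most the number of currently buffered workers;
    // len() on a buffered channel reports how many elements it holds.
    func drain(workers chan int, alive func(int) bool) {
        for i := 0; i < len(workers); i++ {
            select {
            case w := <-workers:
                if alive(w) {
                    workers <- w // a healthy worker goes back into the vector
                }
            default:
                return // channel drained early
            }
        }
    }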
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 348be199..bdd91423 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -3,12 +3,14 @@ package worker_watcher //nolint:stylecheck
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/spiral/errors"
 	"github.com/spiral/roadrunner/v2/pkg/events"
 	"github.com/spiral/roadrunner/v2/pkg/worker"
 	"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
+	"github.com/spiral/roadrunner/v2/utils"
 )
 
 // Vector interface represents vector container
@@ -30,21 +32,24 @@ type workerWatcher struct {
 	sync.RWMutex
 	container Vector
 	// used to control Destroy stage (that all workers are in the container)
-	numWorkers uint64
+	numWorkers *uint64
 
 	workers []worker.BaseProcess
 
-	allocator worker.Allocator
-	events    events.Handler
+	allocator       worker.Allocator
+	allocateTimeout time.Duration
+	events          events.Handler
 }
 
 // NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
 	ww := &workerWatcher{
-		container:  channel.NewVector(numWorkers),
-		numWorkers: numWorkers,
+		container: channel.NewVector(numWorkers),
 
-		workers: make([]worker.BaseProcess, 0, numWorkers),
+		// pass a ptr to the number of workers to avoid blocking in the TTL loop
+		numWorkers:      utils.Uint64(numWorkers),
+		allocateTimeout: allocateTimeout,
+		workers:         make([]worker.BaseProcess, 0, numWorkers),
 
 		allocator: allocator,
 		events:    events,
@@ -128,21 +133,57 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
 }
 
 func (ww *workerWatcher) Allocate() error {
-	ww.Lock()
 	const op = errors.Op("worker_watcher_allocate_new")
+
 	sw, err := ww.allocator()
 	if err != nil {
-		return errors.E(op, errors.WorkerAllocate, err)
+		// log incident
+		ww.events.Push(
+			events.WorkerEvent{
+				Event:   events.EventWorkerError,
+				Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
+			})
+
+		// if no timeout, return error immediately
+		if ww.allocateTimeout == 0 {
+			return errors.E(op, errors.WorkerAllocate, err)
+		}
+
+		tt := time.After(ww.allocateTimeout)
+		for {
+			select {
+			case <-tt:
+				// reduce number of workers
+				atomic.AddUint64(ww.numWorkers, ^uint64(0))
+				// timeout exceeded, the worker can't be allocated
+				return errors.E(op, errors.WorkerAllocate, err)
+			default:
+				sw, err = ww.allocator()
+				if err != nil {
+					// log incident
+					ww.events.Push(
+						events.WorkerEvent{
+							Event:   events.EventWorkerError,
+							Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
+						})
+					continue
+				}
+
+				// reallocated
+				goto done
+			}
+		}
 	}
 
+done:
 	// add worker to Wait
 	ww.addToWatch(sw)
 
+	ww.Lock()
 	// add new worker to the workers slice (to get information about workers in parallel)
 	ww.workers = append(ww.workers, sw)
-
-	// unlock Allocate mutex
 	ww.Unlock()
+
 	// push the worker to the container
 	ww.Release(sw)
 
 	return nil
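The reworked Allocate above is the heart of this fix: instead of failing the pool on the first allocation error, it retries until allocateTimeout fires and only then gives up and decrements the shared worker counter. The same control flow, reduced to a sketch with the event plumbing stripped out (assumed helper signatures, not the exported API):

    package poolsketch

    import "time"

    func allocateWithRetry(alloc func() (int, error), timeout time.Duration) (int, error) {
        w, err := alloc()
        if err == nil {
            return w, nil
        }
        if timeout == 0 {
            return 0, err // no retry budget configured: fail fast
        }
        deadline := time.After(timeout)
        for {
            select {
            case <-deadline:
                return 0, err // deadline hit: report the allocation error
            default:
                if w, err = alloc(); err == nil {
                    return w, nil // reallocated successfully
                }
            }
        }
    }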
@@ -160,7 +201,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
 	for i := 0; i < len(ww.workers); i++ {
 		if ww.workers[i].Pid() == pid {
 			ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
-			// kill worker
+			// kill worker, just to be sure it's dead
 			_ = wb.Kill()
 			return
 		}
@@ -177,7 +218,7 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) {
 	}
 }
 
-// Destroy all underlying container (but let them to complete the task)
+// Destroy all underlying container (but let them complete the task)
 func (ww *workerWatcher) Destroy(_ context.Context) {
 	// destroy container, we don't use ww mutex here, since we should be able to push worker
 	ww.Lock()
@@ -192,7 +233,7 @@
 		case <-tt.C:
 			ww.Lock()
 			// that might be one of the workers is working
-			if ww.numWorkers != uint64(len(ww.workers)) {
+			if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
 				ww.Unlock()
 				continue
 			}
@@ -216,6 +257,10 @@ func (ww *workerWatcher) List() []worker.BaseProcess {
 	ww.RLock()
 	defer ww.RUnlock()
 
+	if len(ww.workers) == 0 {
+		return nil
+	}
+
 	base := make([]worker.BaseProcess, 0, len(ww.workers))
 	for i := 0; i < len(ww.workers); i++ {
 		base = append(base, ww.workers[i])
@@ -253,6 +298,11 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
 			Event:   events.EventPoolError,
 			Payload: errors.E(op, err),
 		})
+
+		// no workers at all, panic
+		if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
+			panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err)))
+		}
 	}
 }
diff --git a/plugins/amqp/amqpjobs/config.go b/plugins/amqp/amqpjobs/config.go
new file mode 100644
index 00000000..ac2f6e53
--- /dev/null
+++ b/plugins/amqp/amqpjobs/config.go
@@ -0,0 +1,67 @@
+package amqpjobs
+
+// pipeline rabbitmq info
+const (
+	exchangeKey   string = "exchange"
+	exchangeType  string = "exchange_type"
+	queue         string = "queue"
+	routingKey    string = "routing_key"
+	prefetch      string = "prefetch"
+	exclusive     string = "exclusive"
+	priority      string = "priority"
+	multipleAsk   string = "multiple_ask"
+	requeueOnFail string = "requeue_on_fail"
+
+	dlx           string = "x-dead-letter-exchange"
+	dlxRoutingKey string = "x-dead-letter-routing-key"
+	dlxTTL        string = "x-message-ttl"
+	dlxExpires    string = "x-expires"
+
+	contentType string = "application/octet-stream"
+)
+
+type GlobalCfg struct {
+	Addr string `mapstructure:"addr"`
+}
+
+// Config is used to parse pipeline configuration
+type Config struct {
+	Prefetch      int    `mapstructure:"prefetch"`
+	Queue         string `mapstructure:"queue"`
+	Priority      int64  `mapstructure:"priority"`
+	Exchange      string `mapstructure:"exchange"`
+	ExchangeType  string `mapstructure:"exchange_type"`
+	RoutingKey    string `mapstructure:"routing_key"`
+	Exclusive     bool   `mapstructure:"exclusive"`
+	MultipleAck   bool   `mapstructure:"multiple_ask"`
+	RequeueOnFail bool   `mapstructure:"requeue_on_fail"`
+}
+
+func (c *Config) InitDefault() {
+	// all options should be in sync with the pipeline defaults in the FromPipeline method
+	if c.ExchangeType == "" {
+		c.ExchangeType = "direct"
+	}
+
+	if c.Exchange == "" {
+		c.Exchange = "amqp.default"
+	}
+
+	if c.Queue == "" {
+		c.Queue = "default"
+	}
+
+	if c.Prefetch == 0 {
+		c.Prefetch = 10
+	}
+
+	if c.Priority == 0 {
+		c.Priority = 10
+	}
+}
+
+func (c *GlobalCfg) InitDefault() {
+	if c.Addr == "" {
+		c.Addr = "amqp://guest:guest@127.0.0.1:5672/"
+	}
+}
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
new file mode 100644
index 00000000..1931ceaa
--- /dev/null
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -0,0 +1,512 @@
+package amqpjobs
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/google/uuid"
+	amqp "github.com/rabbitmq/amqp091-go"
"github.com/rabbitmq/amqp091-go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" +) + +const ( + pluginName string = "amqp" +) + +type consumer struct { + sync.Mutex + log logger.Logger + pq priorityqueue.Queue + eh events.Handler + + pipeline atomic.Value + + // amqp connection + conn *amqp.Connection + consumeChan *amqp.Channel + publishChan chan *amqp.Channel + consumeID string + connStr string + + retryTimeout time.Duration + // + // prefetch QoS AMQP + // + prefetch int + // + // pipeline's priority + // + priority int64 + exchangeName string + queue string + exclusive bool + exchangeType string + routingKey string + multipleAck bool + requeueOnFail bool + + listeners uint32 + delayed *int64 + stopCh chan struct{} +} + +// NewAMQPConsumer initializes rabbitmq pipeline +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("new_amqp_consumer") + // we need to obtain two parts of the amqp information here. + // firs part - address to connect, it is located in the global section under the amqp pluginName + // second part - queues and other pipeline information + // if no such key - error + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) + } + + // PARSE CONFIGURATION START ------- + var pipeCfg Config + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(configKey, &pipeCfg) + if err != nil { + return nil, errors.E(op, err) + } + + pipeCfg.InitDefault() + + err = cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + // PARSE CONFIGURATION END ------- + + jb := &consumer{ + log: log, + pq: pq, + eh: e, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + // TODO to config + retryTimeout: time.Minute * 5, + priority: pipeCfg.Priority, + delayed: utils.Int64(0), + + publishChan: make(chan *amqp.Channel, 1), + routingKey: pipeCfg.RoutingKey, + queue: pipeCfg.Queue, + exchangeType: pipeCfg.ExchangeType, + exchangeName: pipeCfg.Exchange, + prefetch: pipeCfg.Prefetch, + exclusive: pipeCfg.Exclusive, + multipleAck: pipeCfg.MultipleAck, + requeueOnFail: pipeCfg.RequeueOnFail, + } + + jb.conn, err = amqp.Dial(globalCfg.Addr) + if err != nil { + return nil, errors.E(op, err) + } + + // save address + jb.connStr = globalCfg.Addr + + err = jb.initRabbitMQ() + if err != nil { + return nil, errors.E(op, err) + } + + pch, err := jb.conn.Channel() + if err != nil { + return nil, errors.E(op, err) + } + + jb.publishChan <- pch + + // run redialer and requeue listener for the connection + jb.redialer() + + return jb, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("new_amqp_consumer_from_pipeline") + // we need to obtain two parts of 
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+	const op = errors.Op("new_amqp_consumer_from_pipeline")
+	// we need to obtain two parts of the amqp information here.
+	// first part - the address to connect to, located in the global section under the amqp pluginName
+	// second part - queues and other pipeline information
+
+	// only global section
+	if !cfg.Has(pluginName) {
+		return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
+	}
+
+	// PARSE CONFIGURATION -------
+	var globalCfg GlobalCfg
+
+	err := cfg.UnmarshalKey(pluginName, &globalCfg)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	globalCfg.InitDefault()
+
+	// PARSE CONFIGURATION -------
+
+	jb := &consumer{
+		log:          log,
+		eh:           e,
+		pq:           pq,
+		consumeID:    uuid.NewString(),
+		stopCh:       make(chan struct{}),
+		retryTimeout: time.Minute * 5,
+		delayed:      utils.Int64(0),
+
+		publishChan:   make(chan *amqp.Channel, 1),
+		routingKey:    pipeline.String(routingKey, ""),
+		queue:         pipeline.String(queue, "default"),
+		exchangeType:  pipeline.String(exchangeType, "direct"),
+		exchangeName:  pipeline.String(exchangeKey, "amqp.default"),
+		prefetch:      pipeline.Int(prefetch, 10),
+		priority:      int64(pipeline.Int(priority, 10)),
+		exclusive:     pipeline.Bool(exclusive, false),
+		multipleAck:   pipeline.Bool(multipleAsk, false),
+		requeueOnFail: pipeline.Bool(requeueOnFail, false),
+	}
+
+	jb.conn, err = amqp.Dial(globalCfg.Addr)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	// save address
+	jb.connStr = globalCfg.Addr
+
+	err = jb.initRabbitMQ()
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	pch, err := jb.conn.Channel()
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	jb.publishChan <- pch
+
+	// register the pipeline
+	// error here is always nil
+	_ = jb.Register(context.Background(), pipeline)
+
+	// run redialer for the connection
+	jb.redialer()
+
+	return jb, nil
+}
+
+func (j *consumer) Push(ctx context.Context, job *job.Job) error {
+	const op = errors.Op("rabbitmq_push")
+	// check if the pipeline is registered
+
+	// load atomic value
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+	if pipe.Name() != job.Options.Pipeline {
+		return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name()))
+	}
+
+	err := j.handleItem(ctx, fromJob(job))
+	if err != nil {
+		return errors.E(op, err)
+	}
+
+	return nil
+}
+
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+	j.pipeline.Store(p)
+	return nil
+}
+
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+	const op = errors.Op("rabbit_consume")
+
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+	if pipe.Name() != p.Name() {
+		return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+	}
+
+	// protect connection (redial)
+	j.Lock()
+	defer j.Unlock()
+
+	var err error
+	j.consumeChan, err = j.conn.Channel()
+	if err != nil {
+		return errors.E(op, err)
+	}
+
+	err = j.consumeChan.Qos(j.prefetch, 0, false)
+	if err != nil {
+		return errors.E(op, err)
+	}
+
+	// start reading messages from the channel
+	deliv, err := j.consumeChan.Consume(
+		j.queue,
+		j.consumeID,
+		false,
+		false,
+		false,
+		false,
+		nil,
+	)
+	if err != nil {
+		return errors.E(op, err)
+	}
+
+	// run listener
+	j.listener(deliv)
+
+	j.eh.Push(events.JobEvent{
+		Event:    events.EventPipeActive,
+		Driver:   pipe.Driver(),
+		Pipeline: pipe.Name(),
+		Start:    time.Now(),
+	})
+
+	return nil
+}
+
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
+	const op = errors.Op("amqp_driver_state")
+	select {
+	case pch := <-j.publishChan:
+		defer func() {
j.publishChan <- pch + }() + + q, err := pch.QueueInspect(j.queue) + if err != nil { + return nil, errors.E(op, err) + } + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: q.Name, + Active: int64(q.Messages), + Delayed: atomic.LoadInt64(j.delayed), + Ready: ready(atomic.LoadUint32(&j.listeners)), + }, nil + + case <-ctx.Done(): + return nil, errors.E(op, errors.TimeOut, ctx.Err()) + } +} + +func (j *consumer) Pause(_ context.Context, p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested pause on: ", p) + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 0 { + j.log.Warn("no active listeners, nothing to pause") + return + } + + atomic.AddUint32(&j.listeners, ^uint32(0)) + + // protect connection (redial) + j.Lock() + defer j.Unlock() + + err := j.consumeChan.Cancel(j.consumeID, true) + if err != nil { + j.log.Error("cancel publish channel, forcing close", "error", err) + errCl := j.consumeChan.Close() + if errCl != nil { + j.log.Error("force close failed", "error", err) + return + } + return + } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipePaused, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func (j *consumer) Resume(_ context.Context, p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested resume on: ", p) + } + + // protect connection (redial) + j.Lock() + defer j.Unlock() + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 1 { + j.log.Warn("amqp listener already in the active state") + return + } + + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + j.log.Error("create channel on rabbitmq connection", "error", err) + return + } + + err = j.consumeChan.Qos(j.prefetch, 0, false) + if err != nil { + j.log.Error("qos set failed", "error", err) + return + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + j.log.Error("consume operation failed", "error", err) + return + } + + // run listener + j.listener(deliv) + + // increase number of listeners + atomic.AddUint32(&j.listeners, 1) + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func (j *consumer) Stop(context.Context) error { + j.stopCh <- struct{}{} + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil +} + +// handleItem +func (j *consumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("rabbitmq_handle_item") + select { + case pch := <-j.publishChan: + // return the channel back + defer func() { + j.publishChan <- pch + }() + + // convert + table, err := pack(msg.ID(), msg) + if err != nil { + return errors.E(op, err) + } + + const op = errors.Op("rabbitmq_handle_item") + // handle timeouts + if msg.Options.DelayDuration() > 0 { + atomic.AddInt64(j.delayed, 1) + // TODO declare separate method for this if condition + // TODO dlx cache channel?? 
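+			// Delay technique used below (a sketch of the idea, matching the code that follows):
+			// declare a per-delay temporary queue whose dead-letter exchange points back at the
+			// main exchange/routing key and whose x-message-ttl equals the delay; when the TTL
+			// fires, RabbitMQ dead-letters the message into the real queue. A 5s delay thus
+			// produces a queue named "delayed-5000.<exchange>.<queue>" that itself expires
+			// after twice the delay (x-expires = delay * 2).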
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + // insert to the local, limited pipeline + err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: table, + ContentType: contentType, + Timestamp: time.Now().UTC(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + return nil + } + + // insert to the local, limited pipeline + err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: table, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) + } + + return nil + case <-ctx.Done(): + return errors.E(op, errors.TimeOut, ctx.Err()) + } +} + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go new file mode 100644 index 00000000..a8e305ea --- /dev/null +++ b/plugins/amqp/amqpjobs/item.go @@ -0,0 +1,239 @@ +package amqpjobs + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + json "github.com/json-iterator/go" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" +) + +type Item struct { + // Job contains pluginName of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // private + // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + ack func(multiply bool) error + + // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. + // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. 
+ // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time + nack func(multiply bool, requeue bool) error + + // requeueFn used as a pointer to the push function + requeueFn func(context.Context, *Item) error + delayed *int64 + multipleAsk bool + requeue bool +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +func (i *Item) ID() string { + return i.Ident +} + +func (i *Item) Priority() int64 { + return i.Options.Priority +} + +// Body packs job payload into binary payload. +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) +} + +// Context packs job context (job, id) into binary payload. +// Not used in the amqp, amqp.Table used instead +func (i *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (i *Item) Ack() error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } + return i.Options.ack(i.Options.multipleAsk) +} + +func (i *Item) Nack() error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } + return i.Options.nack(false, i.Options.requeue) +} + +// Requeue with the provided delay, handled by the Nack +func (i *Item) Requeue(headers map[string][]string, delay int64) error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } + // overwrite the delay + i.Options.Delay = delay + i.Headers = headers + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + errNack := i.Options.nack(false, true) + if errNack != nil { + return fmt.Errorf("requeue error: %v\nack error: %v", err, errNack) + } + + return err + } + + // ack the job + err = i.Options.ack(false) + if err != nil { + return err + } + + return nil +} + +// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ +func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { + const op = errors.Op("from_delivery_convert") + item, err := j.unpack(d) + if err != nil { + return nil, errors.E(op, err) + } + + i := &Item{ + Job: item.Job, + Ident: item.Ident, + Payload: item.Payload, + Headers: item.Headers, + Options: item.Options, + } + + item.Options.ack = d.Ack + item.Options.nack = d.Nack + item.Options.delayed = j.delayed + + // requeue func + item.Options.requeueFn = j.handleItem + return i, nil +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Headers: job.Headers, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + }, + } +} + +// pack job metadata into headers +func pack(id string, j *Item) (amqp.Table, error) { + headers, err := json.Marshal(j.Headers) + if err != nil { + return nil, err + } + return amqp.Table{ + job.RRID: id, + job.RRJob: j.Job, + job.RRPipeline: j.Options.Pipeline, + job.RRHeaders: headers, + job.RRDelay: j.Options.Delay, + job.RRPriority: j.Options.Priority, + }, nil +} + +// unpack restores jobs.Options +func (j *consumer) unpack(d amqp.Delivery) (*Item, error) 
{ + item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ + multipleAsk: j.multipleAck, + requeue: j.requeueOnFail, + requeueFn: j.handleItem, + }} + + if _, ok := d.Headers[job.RRID].(string); !ok { + return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID)) + } + + item.Ident = d.Headers[job.RRID].(string) + + if _, ok := d.Headers[job.RRJob].(string); !ok { + return nil, errors.E(errors.Errorf("missing header `%s`", job.RRJob)) + } + + item.Job = d.Headers[job.RRJob].(string) + + if _, ok := d.Headers[job.RRPipeline].(string); ok { + item.Options.Pipeline = d.Headers[job.RRPipeline].(string) + } + + if h, ok := d.Headers[job.RRHeaders].([]byte); ok { + err := json.Unmarshal(h, &item.Headers) + if err != nil { + return nil, err + } + } + + if _, ok := d.Headers[job.RRDelay].(int64); ok { + item.Options.Delay = d.Headers[job.RRDelay].(int64) + } + + if _, ok := d.Headers[job.RRPriority]; !ok { + // set pipe's priority + item.Options.Priority = j.priority + } else { + item.Options.Priority = d.Headers[job.RRPriority].(int64) + } + + return item, nil +} diff --git a/plugins/amqp/amqpjobs/listener.go b/plugins/amqp/amqpjobs/listener.go new file mode 100644 index 00000000..0156d55c --- /dev/null +++ b/plugins/amqp/amqpjobs/listener.go @@ -0,0 +1,25 @@ +package amqpjobs + +import amqp "github.com/rabbitmq/amqp091-go" + +func (j *consumer) listener(deliv <-chan amqp.Delivery) { + go func() { + for { //nolint:gosimple + select { + case msg, ok := <-deliv: + if !ok { + j.log.Info("delivery channel closed, leaving the rabbit listener") + return + } + + d, err := j.fromDelivery(msg) + if err != nil { + j.log.Error("amqp delivery convert", "error", err) + continue + } + // insert job into the main priority queue + j.pq.Insert(d) + } + } + }() +} diff --git a/plugins/amqp/amqpjobs/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go new file mode 100644 index 00000000..e260fabe --- /dev/null +++ b/plugins/amqp/amqpjobs/rabbit_init.go @@ -0,0 +1,57 @@ +package amqpjobs + +import ( + "github.com/spiral/errors" +) + +func (j *consumer) initRabbitMQ() error { + const op = errors.Op("jobs_plugin_rmq_init") + // Channel opens a unique, concurrent server channel to process the bulk of AMQP + // messages. Any error from methods on this receiver will render the receiver + // invalid and a new Channel should be opened. 
+	channel, err := j.conn.Channel()
+	if err != nil {
+		return errors.E(op, err)
+	}
+
+	// declare an exchange (idempotent operation)
+	err = channel.ExchangeDeclare(
+		j.exchangeName,
+		j.exchangeType,
+		true,
+		false,
+		false,
+		false,
+		nil,
+	)
+	if err != nil {
+		return errors.E(op, err)
+	}
+
+	// verify or declare a queue
+	q, err := channel.QueueDeclare(
+		j.queue,
+		false,
+		false,
+		j.exclusive,
+		false,
+		nil,
+	)
+	if err != nil {
+		return errors.E(op, err)
+	}
+
+	// bind queue to the exchange
+	err = channel.QueueBind(
+		q.Name,
+		j.routingKey,
+		j.exchangeName,
+		false,
+		nil,
+	)
+	if err != nil {
+		return errors.E(op, err)
+	}
+
+	return channel.Close()
+}
diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go
new file mode 100644
index 00000000..0835e3ea
--- /dev/null
+++ b/plugins/amqp/amqpjobs/redial.go
@@ -0,0 +1,141 @@
+package amqpjobs
+
+import (
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	amqp "github.com/rabbitmq/amqp091-go"
+	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/pkg/events"
+	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+)
+
+// redialer redials rabbitmq in case the connection is interrupted
+func (j *consumer) redialer() { //nolint:gocognit
+	go func() {
+		const op = errors.Op("rabbitmq_redial")
+
+		for {
+			select {
+			case err := <-j.conn.NotifyClose(make(chan *amqp.Error)):
+				if err == nil {
+					return
+				}
+
+				j.Lock()
+
+				// trash the broken publishing channel
+				<-j.publishChan
+
+				t := time.Now()
+				pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+				j.eh.Push(events.JobEvent{
+					Event:    events.EventPipeError,
+					Pipeline: pipe.Name(),
+					Driver:   pipe.Driver(),
+					Error:    err,
+					Start:    time.Now(),
+				})
+
+				expb := backoff.NewExponentialBackOff()
+				// set the retry timeout (minutes)
+				expb.MaxElapsedTime = j.retryTimeout
+				operation := func() error {
+					j.log.Warn("rabbitmq reconnecting, caused by", "error", err)
+					var dialErr error
+					j.conn, dialErr = amqp.Dial(j.connStr)
+					if dialErr != nil {
+						return errors.E(op, dialErr)
+					}
+
+					j.log.Info("rabbitmq dial succeeded; 
trying to redeclare queues and subscribers") + + // re-init connection + errInit := j.initRabbitMQ() + if errInit != nil { + j.log.Error("rabbitmq dial", "error", errInit) + return errInit + } + + // redeclare consume channel + var errConnCh error + j.consumeChan, errConnCh = j.conn.Channel() + if errConnCh != nil { + return errors.E(op, errConnCh) + } + + // redeclare publish channel + pch, errPubCh := j.conn.Channel() + if errPubCh != nil { + return errors.E(op, errPubCh) + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // put the fresh publishing channel + j.publishChan <- pch + // restart listener + j.listener(deliv) + + j.log.Info("queues and subscribers redeclared successfully") + + return nil + } + + retryErr := backoff.Retry(operation, expb) + if retryErr != nil { + j.Unlock() + j.log.Error("backoff failed", "error", retryErr) + return + } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Start: t, + Elapsed: time.Since(t), + }) + + j.Unlock() + + case <-j.stopCh: + if j.publishChan != nil { + pch := <-j.publishChan + err := pch.Close() + if err != nil { + j.log.Error("publish channel close", "error", err) + } + } + + if j.consumeChan != nil { + err := j.consumeChan.Close() + if err != nil { + j.log.Error("consume channel close", "error", err) + } + } + if j.conn != nil { + err := j.conn.Close() + if err != nil { + j.log.Error("amqp connection close", "error", err) + } + } + + return + } + } + }() +} diff --git a/plugins/amqp/plugin.go b/plugins/amqp/plugin.go new file mode 100644 index 00000000..c4f5f1da --- /dev/null +++ b/plugins/amqp/plugin.go @@ -0,0 +1,41 @@ +package amqp + +import ( + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/amqp/amqpjobs" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + pluginName string = "amqp" +) + +type Plugin struct { + log logger.Logger + cfg config.Configurer +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.log = log + p.cfg = cfg + return nil +} + +func (p *Plugin) Name() string { + return pluginName +} + +func (p *Plugin) Available() {} + +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return amqpjobs.NewAMQPConsumer(configKey, p.log, p.cfg, e, pq) +} + +// FromPipeline constructs AMQP driver from pipeline +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return amqpjobs.FromPipeline(pipe, p.log, p.cfg, e, pq) +} diff --git a/plugins/beanstalk/config.go b/plugins/beanstalk/config.go new file mode 100644 index 00000000..a8069f5d --- /dev/null +++ b/plugins/beanstalk/config.go @@ -0,0 +1,53 @@ +package beanstalk + +import ( + "time" + + "github.com/spiral/roadrunner/v2/utils" +) + +const ( + tubePriority string = "tube_priority" + tube string = "tube" + reserveTimeout string = "reserve_timeout" +) + +type GlobalCfg struct { + Addr string `mapstructure:"addr"` + Timeout time.Duration `mapstructure:"timeout"` +} + +func (c *GlobalCfg) InitDefault() { + if c.Addr 
== "" { + c.Addr = "tcp://127.0.0.1:11300" + } + + if c.Timeout == 0 { + c.Timeout = time.Second * 30 + } +} + +type Config struct { + PipePriority int64 `mapstructure:"priority"` + TubePriority *uint32 `mapstructure:"tube_priority"` + Tube string `mapstructure:"tube"` + ReserveTimeout time.Duration `mapstructure:"reserve_timeout"` +} + +func (c *Config) InitDefault() { + if c.Tube == "" { + c.Tube = "default" + } + + if c.ReserveTimeout == 0 { + c.ReserveTimeout = time.Second * 1 + } + + if c.TubePriority == nil { + c.TubePriority = utils.Uint32(0) + } + + if c.PipePriority == 0 { + c.PipePriority = 10 + } +} diff --git a/plugins/beanstalk/connection.go b/plugins/beanstalk/connection.go new file mode 100644 index 00000000..d3241b37 --- /dev/null +++ b/plugins/beanstalk/connection.go @@ -0,0 +1,223 @@ +package beanstalk + +import ( + "context" + "net" + "sync" + "time" + + "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type ConnPool struct { + sync.RWMutex + + log logger.Logger + + conn *beanstalk.Conn + connT *beanstalk.Conn + ts *beanstalk.TubeSet + t *beanstalk.Tube + + network string + address string + tName string + tout time.Duration +} + +func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) { + connT, err := beanstalk.DialTimeout(network, address, tout) + if err != nil { + return nil, err + } + + connTS, err := beanstalk.DialTimeout(network, address, tout) + if err != nil { + return nil, err + } + + tube := beanstalk.NewTube(connT, tName) + ts := beanstalk.NewTubeSet(connTS, tName) + + return &ConnPool{ + log: log, + network: network, + address: address, + tName: tName, + tout: tout, + conn: connTS, + connT: connT, + ts: ts, + t: tube, + }, nil +} + +// Put the payload +// TODO use the context ?? +func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { + cp.RLock() + defer cp.RUnlock() + + // TODO(rustatian): redial based on the token + id, err := cp.t.Put(body, pri, delay, ttr) + if err != nil { + // errN contains both, err and internal checkAndRedial error + errN := cp.checkAndRedial(err) + if errN != nil { + return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN) + } else { + // retry put only when we redialed + return cp.t.Put(body, pri, delay, ttr) + } + } + + return id, nil +} + +// Reserve reserves and returns a job from one of the tubes in t. If no +// job is available before time timeout has passed, Reserve returns a +// ConnError recording ErrTimeout. +// +// Typically, a client will reserve a job, perform some work, then delete +// the job with Conn.Delete. 
+func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { + cp.RLock() + defer cp.RUnlock() + + id, body, err := cp.ts.Reserve(reserveTimeout) + if err != nil { + // errN contains both, err and internal checkAndRedial error + errN := cp.checkAndRedial(err) + if errN != nil { + return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN) + } else { + // retry Reserve only when we redialed + return cp.ts.Reserve(reserveTimeout) + } + } + + return id, body, nil +} + +func (cp *ConnPool) Delete(_ context.Context, id uint64) error { + cp.RLock() + defer cp.RUnlock() + + err := cp.conn.Delete(id) + if err != nil { + // errN contains both, err and internal checkAndRedial error + errN := cp.checkAndRedial(err) + if errN != nil { + return errors.Errorf("err: %s\nerr redial: %s", err, errN) + } else { + // retry Delete only when we redialed + return cp.conn.Delete(id) + } + } + return nil +} + +func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) { + cp.RLock() + defer cp.RUnlock() + + stat, err := cp.conn.Stats() + if err != nil { + errR := cp.checkAndRedial(err) + if errR != nil { + return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR) + } else { + return cp.conn.Stats() + } + } + + return stat, nil +} + +func (cp *ConnPool) redial() error { + const op = errors.Op("connection_pool_redial") + + cp.Lock() + // backoff here + expb := backoff.NewExponentialBackOff() + // TODO(rustatian) set via config + expb.MaxElapsedTime = time.Minute + + operation := func() error { + connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + if connT == nil { + return errors.E(op, errors.Str("connectionT is nil")) + } + + connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + + if connTS == nil { + return errors.E(op, errors.Str("connectionTS is nil")) + } + + cp.t = beanstalk.NewTube(connT, cp.tName) + cp.ts = beanstalk.NewTubeSet(connTS, cp.tName) + cp.conn = connTS + cp.connT = connT + + cp.log.Info("beanstalk redial was successful") + return nil + } + + retryErr := backoff.Retry(operation, expb) + if retryErr != nil { + cp.Unlock() + return retryErr + } + cp.Unlock() + + return nil +} + +var connErrors = map[string]struct{}{"EOF": {}} + +func (cp *ConnPool) checkAndRedial(err error) error { + const op = errors.Op("connection_pool_check_redial") + switch et := err.(type) { //nolint:gocritic + // check if the error + case beanstalk.ConnError: + switch bErr := et.Err.(type) { + case *net.OpError: + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", bErr, errR)) + } + + // if redial was successful -> continue listening + return nil + default: + if _, ok := connErrors[et.Err.Error()]; ok { + // if error is related to the broken connection - redial + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", err, errR)) + } + // if redial was successful -> continue listening + return nil + } + } + } + + // return initial error + return err +} diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go new file mode 100644 index 00000000..5ef89983 --- /dev/null +++ b/plugins/beanstalk/consumer.go @@ -0,0 +1,360 @@ +package beanstalk + +import ( + "bytes" + "context" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/spiral/errors" + 
"github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" +) + +type consumer struct { + log logger.Logger + eh events.Handler + pq priorityqueue.Queue + + pipeline atomic.Value + listeners uint32 + + // beanstalk + pool *ConnPool + addr string + network string + reserveTimeout time.Duration + reconnectCh chan struct{} + tout time.Duration + // tube name + tName string + tubePriority *uint32 + priority int64 + + stopCh chan struct{} + requeueCh chan *Item +} + +func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("new_beanstalk_consumer") + + // PARSE CONFIGURATION ------- + var pipeCfg Config + var globalCfg GlobalCfg + + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) + } + + err := cfg.UnmarshalKey(configKey, &pipeCfg) + if err != nil { + return nil, errors.E(op, err) + } + + pipeCfg.InitDefault() + + err = cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + // PARSE CONFIGURATION ------- + + dsn := strings.Split(globalCfg.Addr, "://") + if len(dsn) != 2 { + return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) + } + + cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log) + if err != nil { + return nil, errors.E(op, err) + } + + // initialize job consumer + jc := &consumer{ + pq: pq, + log: log, + eh: e, + pool: cPool, + network: dsn[0], + addr: dsn[1], + tout: globalCfg.Timeout, + tName: pipeCfg.Tube, + reserveTimeout: pipeCfg.ReserveTimeout, + tubePriority: pipeCfg.TubePriority, + priority: pipeCfg.PipePriority, + + // buffered with two because jobs root plugin can call Stop at the same time as Pause + stopCh: make(chan struct{}, 2), + requeueCh: make(chan *Item, 1000), + reconnectCh: make(chan struct{}, 2), + } + + return jc, nil +} + +func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("new_beanstalk_consumer") + + // PARSE CONFIGURATION ------- + var globalCfg GlobalCfg + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) + } + + err := cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + // PARSE CONFIGURATION ------- + + dsn := strings.Split(globalCfg.Addr, "://") + if len(dsn) != 2 { + return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) + } + + cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, 
log) + if err != nil { + return nil, errors.E(op, err) + } + + // initialize job consumer + jc := &consumer{ + pq: pq, + log: log, + eh: e, + pool: cPool, + network: dsn[0], + addr: dsn[1], + tout: globalCfg.Timeout, + tName: pipe.String(tube, "default"), + reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), + tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))), + priority: pipe.Priority(), + + // buffered with two because jobs root plugin can call Stop at the same time as Pause + stopCh: make(chan struct{}, 2), + requeueCh: make(chan *Item, 1000), + reconnectCh: make(chan struct{}, 2), + } + + return jc, nil +} +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { + const op = errors.Op("beanstalk_push") + // check if the pipeline registered + + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != jb.Options.Pipeline { + return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) + } + + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (j *consumer) handleItem(ctx context.Context, item *Item) error { + const op = errors.Op("beanstalk_handle_item") + + bb := new(bytes.Buffer) + bb.Grow(64) + err := item.pack(bb) + if err != nil { + return errors.E(op, err) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458 + // is an integer < 2**32. Jobs with smaller priority values will be + // scheduled before jobs with larger priorities. The most urgent priority is 0; + // the least urgent priority is 4,294,967,295. + // + // is an integer number of seconds to wait before putting the job in + // the ready queue. The job will be in the "delayed" state during this time. + // Maximum delay is 2**32-1. + // + // -- time to run -- is an integer number of seconds to allow a worker + // to run this job. This time is counted from the moment a worker reserves + // this job. If the worker does not delete, release, or bury the job within + // seconds, the job will time out and the server will release the job. + // The minimum ttr is 1. If the client sends 0, the server will silently + // increase the ttr to 1. Maximum ttr is 2**32-1. 
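+	// With the config defaults above (NewBeanstalkConsumer path: TubePriority 0,
+	// global Timeout 30s), a job carrying Delay: 10 is therefore put roughly as
+	// Put(body, 0 /* tube_priority */, 10*time.Second /* delay */, 30*time.Second /* ttr */).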
+	id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout)
+	if err != nil {
+		errD := j.pool.Delete(ctx, id)
+		if errD != nil {
+			return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error()))
+		}
+		return errors.E(op, err)
+	}
+
+	return nil
+}
+
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+	// register the pipeline
+	j.pipeline.Store(p)
+	return nil
+}
+
+// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
+	const op = errors.Op("beanstalk_state")
+	stat, err := j.pool.Stats(ctx)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+	out := &jobState.State{
+		Pipeline: pipe.Name(),
+		Driver:   pipe.Driver(),
+		Queue:    j.tName,
+		Ready:    ready(atomic.LoadUint32(&j.listeners)),
+	}
+
+	// set stat, skip errors (replace with 0)
+	// https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523
+	if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil {
+		out.Active = int64(v)
+	}
+
+	// https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525
+	if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil {
+		// this is not an error: reserved jobs in beanstalk behave like active jobs
+		out.Reserved = int64(v)
+	}
+
+	// https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528
+	if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil {
+		out.Delayed = int64(v)
+	}
+
+	return out, nil
+}
+
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+	const op = errors.Op("beanstalk_run")
+	// check if the pipeline is registered
+
+	// load atomic value
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+	if pipe.Name() != p.Name() {
+		return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name()))
+	}
+
+	atomic.AddUint32(&j.listeners, 1)
+
+	go j.listen()
+
+	j.eh.Push(events.JobEvent{
+		Event:    events.EventPipeActive,
+		Driver:   pipe.Driver(),
+		Pipeline: pipe.Name(),
+		Start:    time.Now(),
+	})
+
+	return nil
+}
+
+func (j *consumer) Stop(context.Context) error {
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+	if atomic.LoadUint32(&j.listeners) == 1 {
+		j.stopCh <- struct{}{}
+	}
+
+	j.eh.Push(events.JobEvent{
+		Event:    events.EventPipeStopped,
+		Driver:   pipe.Driver(),
+		Pipeline: pipe.Name(),
+		Start:    time.Now(),
+	})
+
+	return nil
+}
+
+func (j *consumer) Pause(_ context.Context, p string) {
+	// load atomic value
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+	if pipe.Name() != p {
+		j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+		return
+	}
+
+	l := atomic.LoadUint32(&j.listeners)
+	// no active listeners
+	if l == 0 {
+		j.log.Warn("no active listeners, nothing to pause")
+		return
+	}
+
+	atomic.AddUint32(&j.listeners, ^uint32(0))
+
+	j.stopCh <- struct{}{}
+
+	j.eh.Push(events.JobEvent{
+		Event:    events.EventPipePaused,
+		Driver:   pipe.Driver(),
+		Pipeline: pipe.Name(),
+		Start:    time.Now(),
+	})
+}
+
+func (j *consumer) Resume(_ context.Context, p string) {
+	// load atomic value
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+	if pipe.Name() != p {
+		j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+		return
+	}
+
+	l := atomic.LoadUint32(&j.listeners)
+	// no active listeners
+	if l == 1 {
+		j.log.Warn("beanstalk listener already in the active state")
+		return
+	}
+
+	// start listener
+	go 
j.listen() + + // increase num of listeners + atomic.AddUint32(&j.listeners, 1) + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/beanstalk/encode_test.go b/plugins/beanstalk/encode_test.go new file mode 100644 index 00000000..e43207eb --- /dev/null +++ b/plugins/beanstalk/encode_test.go @@ -0,0 +1,75 @@ +package beanstalk + +import ( + "bytes" + "crypto/rand" + "encoding/gob" + "testing" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/utils" +) + +func BenchmarkEncodeGob(b *testing.B) { + tb := make([]byte, 1024*10) + _, err := rand.Read(tb) + if err != nil { + b.Fatal(err) + } + + item := &Item{ + Job: "/super/test/php/class/loooooong", + Ident: "12341234-asdfasdfa-1234234-asdfasdfas", + Payload: utils.AsString(tb), + Headers: map[string][]string{"Test": {"test1", "test2"}}, + Options: &Options{ + Priority: 10, + Pipeline: "test-local-pipe", + Delay: 10, + }, + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + bb := new(bytes.Buffer) + err := gob.NewEncoder(bb).Encode(item) + if err != nil { + b.Fatal(err) + } + _ = bb.Bytes() + bb.Reset() + } +} + +func BenchmarkEncodeJsonIter(b *testing.B) { + tb := make([]byte, 1024*10) + _, err := rand.Read(tb) + if err != nil { + b.Fatal(err) + } + + item := &Item{ + Job: "/super/test/php/class/loooooong", + Ident: "12341234-asdfasdfa-1234234-asdfasdfas", + Payload: utils.AsString(tb), + Headers: map[string][]string{"Test": {"test1", "test2"}}, + Options: &Options{ + Priority: 10, + Pipeline: "test-local-pipe", + Delay: 10, + }, + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + bb, err := json.Marshal(item) + if err != nil { + b.Fatal(err) + } + _ = bb + } +} diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go new file mode 100644 index 00000000..0a6cd560 --- /dev/null +++ b/plugins/beanstalk/item.go @@ -0,0 +1,147 @@ +package beanstalk + +import ( + "bytes" + "context" + "encoding/gob" + "time" + + "github.com/beanstalkd/go-beanstalk" + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" +) + +type Item struct { + // Job contains pluginName of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // Private ================ + id uint64 + conn *beanstalk.Conn + requeueFn func(context.Context, *Item) error +} + +// DelayDuration returns delay duration in a form of time.Duration. 
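+// For example (a minimal sketch; the values are illustrative): +// +// opts := &Options{Priority: 10, Pipeline: "test-local-pipe", Delay: 10} +// opts.DelayDuration() // 10 * time.Second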
+func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +func (i *Item) ID() string { + return i.Ident +} + +func (i *Item) Priority() int64 { + return i.Options.Priority +} + +// Body packs job payload into binary payload. +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) +} + +// Context packs job context (job, id) into binary payload. +// For beanstalk the whole Item is gob-encoded, so this context also travels inside the stored payload +func (i *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (i *Item) Ack() error { + return i.Options.conn.Delete(i.Options.id) +} + +func (i *Item) Nack() error { + return i.Options.conn.Delete(i.Options.id) +} + +func (i *Item) Requeue(headers map[string][]string, delay int64) error { + // overwrite the delay + i.Options.Delay = delay + i.Headers = headers + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err + } + + // delete old job + err = i.Options.conn.Delete(i.Options.id) + if err != nil { + return err + } + + return nil +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Headers: job.Headers, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + }, + } +} + +func (i *Item) pack(b *bytes.Buffer) error { + err := gob.NewEncoder(b).Encode(i) + if err != nil { + return err + } + + return nil +} + +func (j *consumer) unpack(id uint64, data []byte, out *Item) error { + err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) + if err != nil { + return err + } + out.Options.conn = j.pool.conn + out.Options.id = id + out.Options.requeueFn = j.handleItem + + return nil +} diff --git a/plugins/beanstalk/listen.go b/plugins/beanstalk/listen.go new file mode 100644 index 00000000..6bb159ea --- /dev/null +++ b/plugins/beanstalk/listen.go @@ -0,0 +1,39 @@ +package beanstalk + +import ( + "github.com/beanstalkd/go-beanstalk" +) + +func (j *consumer) listen() { + for { + select { + case <-j.stopCh: + j.log.Warn("beanstalk listener stopped") + return + default: + id, body, err := j.pool.Reserve(j.reserveTimeout) + if err != nil { + if errB, ok := err.(beanstalk.ConnError); ok { + switch errB.Err { //nolint:gocritic + case beanstalk.ErrTimeout: + j.log.Info("beanstalk reserve timeout", "warn", errB.Op) + continue + } + } + // in case of other error - continue + j.log.Error("beanstalk reserve", "error", err) + continue + } + + item := &Item{} + err = j.unpack(id, body, item) + if err != nil { + j.log.Error("beanstalk unpack item", "error", err) + continue + } + + // insert job into the priority queue + j.pq.Insert(item) + } + } +} diff --git a/plugins/beanstalk/plugin.go b/plugins/beanstalk/plugin.go new file mode 100644 index 00000000..529d1474 --- /dev/null +++ b/plugins/beanstalk/plugin.go @@ -0,0 +1,47 @@ +package beanstalk + +import ( + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( 
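+ // pluginName is the name the jobs plugin matches against the "driver" field of a pipeline, i.e. pipelines configured with driver: beanstalk are constructed by this plugin.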
pluginName string = "beanstalk" +) + +type Plugin struct { + log logger.Logger + cfg config.Configurer +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.log = log + p.cfg = cfg + return nil +} + +func (p *Plugin) Serve() chan error { + return make(chan error) +} + +func (p *Plugin) Stop() error { + return nil +} + +func (p *Plugin) Name() string { + return pluginName +} + +func (p *Plugin) Available() {} + +func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return NewBeanstalkConsumer(configKey, p.log, p.cfg, eh, pq) +} + +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipe, p.log, p.cfg, eh, pq) +} diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index 1f8e6ff1..4a8d6cd9 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -11,7 +11,7 @@ func (c *consumer) listener() { if err != nil { panic(err) } - //cursor := tx.Cursor() + // cursor := tx.Cursor() err = tx.Commit() if err != nil { diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 889dc2fa..a2390df5 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -4,7 +4,6 @@ import ( "fmt" "sync" - "github.com/google/uuid" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" @@ -16,9 +15,6 @@ const ( PluginName string = "broadcast" // driver is the mandatory field which should present in every storage driver string = "driver" - - redis string = "redis" - memory string = "memory" ) type Plugin struct { @@ -97,6 +93,7 @@ func (p *Plugin) Publish(m *pubsub.Message) error { } func (p *Plugin) PublishAsync(m *pubsub.Message) { + // TODO(rustatian) channel here? 
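+ // Fan-out sketch: the goroutine below walks every registered publisher under the plugin lock; a failed Publish is logged and skipped, so a single broken driver cannot block the remaining ones.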
go func() { p.Lock() defer p.Unlock() @@ -106,7 +103,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) { err := p.publishers[j].Publish(m) if err != nil { p.log.Error("publishAsync", "error", err) - // continue publish to other registered publishers + // continue publishing to the other registered publishers continue } } @@ -116,7 +113,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) { }() } -func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit +func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { const op = errors.Op("broadcast_plugin_get_driver") // choose a driver @@ -136,57 +133,37 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:goco // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", PluginName, key) - switch val.(map[string]interface{})[driver] { - case memory: - if _, ok := p.constructors[memory]; !ok { - return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers)) - } - ps, err := p.constructors[memory].PSConstruct(configKey) - if err != nil { - return nil, errors.E(op, err) - } - - // save the initialized publisher channel - // for the in-memory, register new publishers - p.publishers[uuid.NewString()] = ps + drName := val.(map[string]interface{})[driver] - return ps, nil - case redis: - if _, ok := p.constructors[redis]; !ok { - return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers)) + // driver name should be a string + if drStr, ok := drName.(string); ok { + if _, ok := p.constructors[drStr]; !ok { + return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr)) } - // first - try local configuration - switch { - case p.cfgPlugin.Has(configKey): - ps, err := p.constructors[redis].PSConstruct(configKey) + // try local config first + if p.cfgPlugin.Has(configKey) { + ps, err := p.constructors[drStr].PSConstruct(configKey) if err != nil { return nil, errors.E(op, err) } - // if section already exists, return new connection - if _, ok := p.publishers[configKey]; ok { - return ps, nil - } - - // if not - initialize a connection + // save the initialized publisher channel + // for the in-memory, register new publishers p.publishers[configKey] = ps - return ps, nil - // then try global if local does not exist - case p.cfgPlugin.Has(redis): - ps, err := p.constructors[redis].PSConstruct(configKey) + return ps, nil + } else { + // try global driver section + ps, err := p.constructors[drStr].PSConstruct(drStr) if err != nil { return nil, errors.E(op, err) } - // if section already exists, return new connection - if _, ok := p.publishers[configKey]; ok { - return ps, nil - } - - // if not - initialize a connection + // save the initialized publisher channel + // for the in-memory, register new publishers p.publishers[configKey] = ps + return ps, nil } } diff --git a/plugins/ephemeral/consumer.go b/plugins/ephemeral/consumer.go new file mode 100644 index 00000000..91b8eda9 --- /dev/null +++ b/plugins/ephemeral/consumer.go @@ -0,0 +1,274 @@ +package ephemeral + +import ( + "context" + "sync/atomic" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + 
"github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" +) + +const ( + prefetch string = "prefetch" + goroutinesMax uint64 = 1000 +) + +type Config struct { + Prefetch uint64 `mapstructure:"prefetch"` +} + +type consumer struct { + cfg *Config + log logger.Logger + eh events.Handler + pipeline atomic.Value + pq priorityqueue.Queue + localPrefetch chan *Item + + // time.sleep goroutines max number + goroutines uint64 + + delayed *int64 + active *int64 + + listeners uint32 + stopCh chan struct{} +} + +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("new_ephemeral_pipeline") + + jb := &consumer{ + log: log, + pq: pq, + eh: eh, + goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), + stopCh: make(chan struct{}, 1), + } + + err := cfg.UnmarshalKey(configKey, &jb.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + if jb.cfg.Prefetch == 0 { + jb.cfg.Prefetch = 100_000 + } + + // initialize a local queue + jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) + + return jb, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { + jb := &consumer{ + log: log, + pq: pq, + eh: eh, + goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), + stopCh: make(chan struct{}, 1), + } + + // initialize a local queue + jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) + + return jb, nil +} + +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { + const op = errors.Op("ephemeral_push") + + // check if the pipeline registered + _, ok := j.pipeline.Load().(*pipeline.Pipeline) + if !ok { + return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) + } + + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (j *consumer) State(_ context.Context) (*jobState.State, error) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: pipe.Name(), + Active: atomic.LoadInt64(j.active), + Delayed: atomic.LoadInt64(j.delayed), + Ready: ready(atomic.LoadUint32(&j.listeners)), + }, nil +} + +func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { + j.pipeline.Store(pipeline) + return nil +} + +func (j *consumer) Pause(_ context.Context, p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested pause on: ", p) + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 0 { + j.log.Warn("no active listeners, nothing to pause") + return + } + + atomic.AddUint32(&j.listeners, ^uint32(0)) + + // stop the consumer + j.stopCh <- struct{}{} + + j.eh.Push(events.JobEvent{ + Event: events.EventPipePaused, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + Elapsed: 0, + }) +} + +func (j *consumer) Resume(_ context.Context, p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested resume on: ", p) + } + + l := atomic.LoadUint32(&j.listeners) + // listener already active + if l == 1 { + j.log.Warn("listener already in the active state") + return + } + + // resume 
the consumer on the same channel + j.consume() + + atomic.StoreUint32(&j.listeners, 1) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Pipeline: pipe.Name(), + Start: time.Now(), + Elapsed: 0, + }) +} + +// Run is a no-op for the ephemeral driver, it only emits the pipeline-active event +func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil +} + +func (j *consumer) Stop(ctx context.Context) error { + const op = errors.Op("ephemeral_plugin_stop") + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + select { + // return from the consumer + case j.stopCh <- struct{}{}: + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipe.Name(), + Start: time.Now(), + Elapsed: 0, + }) + + return nil + + case <-ctx.Done(): + return errors.E(op, ctx.Err()) + } +} + +func (j *consumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("ephemeral_handle_request") + // handle timeouts + // theoretically, a bad user may send millions of requests with a delay and produce a huge number of + // goroutines here, so we limit the number of delay goroutines. + if msg.Options.Delay > 0 { + // if we have 1000 goroutines waiting on the delay - reject 1001 + if atomic.LoadUint64(&j.goroutines) >= goroutinesMax { + return errors.E(op, errors.Str("max concurrency number reached")) + } + + // reserve the goroutine slot before spawning, so the limit check above stays accurate + atomic.AddUint64(&j.goroutines, 1) + go func(jj *Item) { + atomic.AddInt64(j.delayed, 1) + + time.Sleep(jj.Options.DelayDuration()) + + // send the item after the timeout expired + j.localPrefetch <- jj + + atomic.AddUint64(&j.goroutines, ^uint64(0)) + }(msg) + + return nil + } + + // increase number of the active jobs + atomic.AddInt64(j.active, 1) + + // insert to the local, limited pipeline + select { + case j.localPrefetch <- msg: + return nil + case <-ctx.Done(): + return errors.E(op, errors.Errorf("local pipeline is full, consider increasing the prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err())) + } +} + +func (j *consumer) consume() { + go func() { + // redirect items from the local prefetch queue into the priority queue + for { + select { + case item, ok := <-j.localPrefetch: + if !ok { + j.log.Warn("ephemeral local prefetch queue was closed") + return + } + + // set the requeue callback and the job counters + item.Options.requeueFn = j.handleItem + item.Options.active = j.active + item.Options.delayed = j.delayed + + j.pq.Insert(item) + case <-j.stopCh: + return + } + } + }() +} + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/ephemeral/item.go b/plugins/ephemeral/item.go new file mode 100644 index 00000000..3298424d --- /dev/null +++ b/plugins/ephemeral/item.go @@ -0,0 +1,133 @@ +package ephemeral + +import ( + "context" + "sync/atomic" + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" +) + +type Item struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. 
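+// For example, a job pushed with the options (illustrative values only) +// +// {"priority": 10, "pipeline": "test-local-pipe", "delay": 60} +// +// is held back for 60 seconds before it reaches the local prefetch queue.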
+type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // private + requeueFn func(context.Context, *Item) error + active *int64 + delayed *int64 +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +func (i *Item) ID() string { + return i.Ident +} + +func (i *Item) Priority() int64 { + return i.Options.Priority +} + +// Body packs job payload into binary payload. +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) +} + +// Context packs job context (job, id) into binary payload. +func (i *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (i *Item) Ack() error { + i.atomicallyReduceCount() + return nil +} + +func (i *Item) Nack() error { + i.atomicallyReduceCount() + return nil +} + +func (i *Item) Requeue(headers map[string][]string, delay int64) error { + // overwrite the delay + i.Options.Delay = delay + i.Headers = headers + + i.atomicallyReduceCount() + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err + } + + return nil +} + +// atomicallyReduceCount reduces counter of active or delayed jobs +func (i *Item) atomicallyReduceCount() { + // if job was delayed, reduce number of the delayed jobs + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + return + } + + // otherwise, reduce number of the active jobs + atomic.AddInt64(i.Options.active, ^int64(0)) + // noop for the in-memory +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + }, + } +} diff --git a/plugins/ephemeral/plugin.go b/plugins/ephemeral/plugin.go new file mode 100644 index 00000000..28495abb --- /dev/null +++ b/plugins/ephemeral/plugin.go @@ -0,0 +1,41 @@ +package ephemeral + +import ( + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "ephemeral" +) + +type Plugin struct { + log logger.Logger + cfg config.Configurer +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.log = log + p.cfg = cfg + return nil +} + +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Available() {} + +// JobsConstruct creates new ephemeral consumer from the configuration +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return NewJobBroker(configKey, p.log, p.cfg, e, pq) +} + +// FromPipeline creates new ephemeral 
consumer from the provided pipeline +func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipeline, p.log, e, pq) +} diff --git a/plugins/jobs/drivers/amqp/amqpjobs/config.go b/plugins/jobs/drivers/amqp/amqpjobs/config.go deleted file mode 100644 index ac2f6e53..00000000 --- a/plugins/jobs/drivers/amqp/amqpjobs/config.go +++ /dev/null @@ -1,67 +0,0 @@ -package amqpjobs - -// pipeline rabbitmq info -const ( - exchangeKey string = "exchange" - exchangeType string = "exchange_type" - queue string = "queue" - routingKey string = "routing_key" - prefetch string = "prefetch" - exclusive string = "exclusive" - priority string = "priority" - multipleAsk string = "multiple_ask" - requeueOnFail string = "requeue_on_fail" - - dlx string = "x-dead-letter-exchange" - dlxRoutingKey string = "x-dead-letter-routing-key" - dlxTTL string = "x-message-ttl" - dlxExpires string = "x-expires" - - contentType string = "application/octet-stream" -) - -type GlobalCfg struct { - Addr string `mapstructure:"addr"` -} - -// Config is used to parse pipeline configuration -type Config struct { - Prefetch int `mapstructure:"prefetch"` - Queue string `mapstructure:"queue"` - Priority int64 `mapstructure:"priority"` - Exchange string `mapstructure:"exchange"` - ExchangeType string `mapstructure:"exchange_type"` - RoutingKey string `mapstructure:"routing_key"` - Exclusive bool `mapstructure:"exclusive"` - MultipleAck bool `mapstructure:"multiple_ask"` - RequeueOnFail bool `mapstructure:"requeue_on_fail"` -} - -func (c *Config) InitDefault() { - // all options should be in sync with the pipeline defaults in the FromPipeline method - if c.ExchangeType == "" { - c.ExchangeType = "direct" - } - - if c.Exchange == "" { - c.Exchange = "amqp.default" - } - - if c.Queue == "" { - c.Queue = "default" - } - - if c.Prefetch == 0 { - c.Prefetch = 10 - } - - if c.Priority == 0 { - c.Priority = 10 - } -} - -func (c *GlobalCfg) InitDefault() { - if c.Addr == "" { - c.Addr = "amqp://guest:guest@127.0.0.1:5672/" - } -} diff --git a/plugins/jobs/drivers/amqp/amqpjobs/consumer.go b/plugins/jobs/drivers/amqp/amqpjobs/consumer.go deleted file mode 100644 index 1931ceaa..00000000 --- a/plugins/jobs/drivers/amqp/amqpjobs/consumer.go +++ /dev/null @@ -1,512 +0,0 @@ -package amqpjobs - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/google/uuid" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -const ( - pluginName string = "amqp" -) - -type consumer struct { - sync.Mutex - log logger.Logger - pq priorityqueue.Queue - eh events.Handler - - pipeline atomic.Value - - // amqp connection - conn *amqp.Connection - consumeChan *amqp.Channel - publishChan chan *amqp.Channel - consumeID string - connStr string - - retryTimeout time.Duration - // - // prefetch QoS AMQP - // - prefetch int - // - // pipeline's priority - // - priority int64 - exchangeName string - queue string - exclusive bool - exchangeType string - routingKey string - multipleAck bool - requeueOnFail bool - - listeners 
uint32 - delayed *int64 - stopCh chan struct{} -} - -// NewAMQPConsumer initializes rabbitmq pipeline -func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_amqp_consumer") - // we need to obtain two parts of the amqp information here. - // firs part - address to connect, it is located in the global section under the amqp pluginName - // second part - queues and other pipeline information - // if no such key - error - if !cfg.Has(configKey) { - return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) - } - - // if no global section - if !cfg.Has(pluginName) { - return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) - } - - // PARSE CONFIGURATION START ------- - var pipeCfg Config - var globalCfg GlobalCfg - - err := cfg.UnmarshalKey(configKey, &pipeCfg) - if err != nil { - return nil, errors.E(op, err) - } - - pipeCfg.InitDefault() - - err = cfg.UnmarshalKey(pluginName, &globalCfg) - if err != nil { - return nil, errors.E(op, err) - } - - globalCfg.InitDefault() - // PARSE CONFIGURATION END ------- - - jb := &consumer{ - log: log, - pq: pq, - eh: e, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), - // TODO to config - retryTimeout: time.Minute * 5, - priority: pipeCfg.Priority, - delayed: utils.Int64(0), - - publishChan: make(chan *amqp.Channel, 1), - routingKey: pipeCfg.RoutingKey, - queue: pipeCfg.Queue, - exchangeType: pipeCfg.ExchangeType, - exchangeName: pipeCfg.Exchange, - prefetch: pipeCfg.Prefetch, - exclusive: pipeCfg.Exclusive, - multipleAck: pipeCfg.MultipleAck, - requeueOnFail: pipeCfg.RequeueOnFail, - } - - jb.conn, err = amqp.Dial(globalCfg.Addr) - if err != nil { - return nil, errors.E(op, err) - } - - // save address - jb.connStr = globalCfg.Addr - - err = jb.initRabbitMQ() - if err != nil { - return nil, errors.E(op, err) - } - - pch, err := jb.conn.Channel() - if err != nil { - return nil, errors.E(op, err) - } - - jb.publishChan <- pch - - // run redialer and requeue listener for the connection - jb.redialer() - - return jb, nil -} - -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_amqp_consumer_from_pipeline") - // we need to obtain two parts of the amqp information here. 
- // firs part - address to connect, it is located in the global section under the amqp pluginName - // second part - queues and other pipeline information - - // only global section - if !cfg.Has(pluginName) { - return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) - } - - // PARSE CONFIGURATION ------- - var globalCfg GlobalCfg - - err := cfg.UnmarshalKey(pluginName, &globalCfg) - if err != nil { - return nil, errors.E(op, err) - } - - globalCfg.InitDefault() - - // PARSE CONFIGURATION ------- - - jb := &consumer{ - log: log, - eh: e, - pq: pq, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), - retryTimeout: time.Minute * 5, - delayed: utils.Int64(0), - - publishChan: make(chan *amqp.Channel, 1), - routingKey: pipeline.String(routingKey, ""), - queue: pipeline.String(queue, "default"), - exchangeType: pipeline.String(exchangeType, "direct"), - exchangeName: pipeline.String(exchangeKey, "amqp.default"), - prefetch: pipeline.Int(prefetch, 10), - priority: int64(pipeline.Int(priority, 10)), - exclusive: pipeline.Bool(exclusive, false), - multipleAck: pipeline.Bool(multipleAsk, false), - requeueOnFail: pipeline.Bool(requeueOnFail, false), - } - - jb.conn, err = amqp.Dial(globalCfg.Addr) - if err != nil { - return nil, errors.E(op, err) - } - - // save address - jb.connStr = globalCfg.Addr - - err = jb.initRabbitMQ() - if err != nil { - return nil, errors.E(op, err) - } - - pch, err := jb.conn.Channel() - if err != nil { - return nil, errors.E(op, err) - } - - jb.publishChan <- pch - - // register the pipeline - // error here is always nil - _ = jb.Register(context.Background(), pipeline) - - // run redialer for the connection - jb.redialer() - - return jb, nil -} - -func (j *consumer) Push(ctx context.Context, job *job.Job) error { - const op = errors.Op("rabbitmq_push") - // check if the pipeline registered - - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != job.Options.Pipeline { - return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name())) - } - - err := j.handleItem(ctx, fromJob(job)) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { - j.pipeline.Store(p) - return nil -} - -func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("rabbit_consume") - - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p.Name() { - return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) - } - - // protect connection (redial) - j.Lock() - defer j.Unlock() - - var err error - j.consumeChan, err = j.conn.Channel() - if err != nil { - return errors.E(op, err) - } - - err = j.consumeChan.Qos(j.prefetch, 0, false) - if err != nil { - return errors.E(op, err) - } - - // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, - false, - false, - false, - false, - nil, - ) - if err != nil { - return errors.E(op, err) - } - - // run listener - j.listener(deliv) - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - - return nil -} - -func (j *consumer) State(ctx context.Context) (*jobState.State, error) { - const op = errors.Op("amqp_driver_state") - select { - case pch := <-j.publishChan: - defer func() { - j.publishChan <- pch - }() - - q, err 
:= pch.QueueInspect(j.queue) - if err != nil { - return nil, errors.E(op, err) - } - - pipe := j.pipeline.Load().(*pipeline.Pipeline) - - return &jobState.State{ - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Queue: q.Name, - Active: int64(q.Messages), - Delayed: atomic.LoadInt64(j.delayed), - Ready: ready(atomic.LoadUint32(&j.listeners)), - }, nil - - case <-ctx.Done(): - return nil, errors.E(op, errors.TimeOut, ctx.Err()) - } -} - -func (j *consumer) Pause(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - j.log.Error("no such pipeline", "requested pause on: ", p) - } - - l := atomic.LoadUint32(&j.listeners) - // no active listeners - if l == 0 { - j.log.Warn("no active listeners, nothing to pause") - return - } - - atomic.AddUint32(&j.listeners, ^uint32(0)) - - // protect connection (redial) - j.Lock() - defer j.Unlock() - - err := j.consumeChan.Cancel(j.consumeID, true) - if err != nil { - j.log.Error("cancel publish channel, forcing close", "error", err) - errCl := j.consumeChan.Close() - if errCl != nil { - j.log.Error("force close failed", "error", err) - return - } - return - } - - j.eh.Push(events.JobEvent{ - Event: events.EventPipePaused, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) -} - -func (j *consumer) Resume(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - j.log.Error("no such pipeline", "requested resume on: ", p) - } - - // protect connection (redial) - j.Lock() - defer j.Unlock() - - l := atomic.LoadUint32(&j.listeners) - // no active listeners - if l == 1 { - j.log.Warn("amqp listener already in the active state") - return - } - - var err error - j.consumeChan, err = j.conn.Channel() - if err != nil { - j.log.Error("create channel on rabbitmq connection", "error", err) - return - } - - err = j.consumeChan.Qos(j.prefetch, 0, false) - if err != nil { - j.log.Error("qos set failed", "error", err) - return - } - - // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, - false, - false, - false, - false, - nil, - ) - if err != nil { - j.log.Error("consume operation failed", "error", err) - return - } - - // run listener - j.listener(deliv) - - // increase number of listeners - atomic.AddUint32(&j.listeners, 1) - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) -} - -func (j *consumer) Stop(context.Context) error { - j.stopCh <- struct{}{} - - pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - return nil -} - -// handleItem -func (j *consumer) handleItem(ctx context.Context, msg *Item) error { - const op = errors.Op("rabbitmq_handle_item") - select { - case pch := <-j.publishChan: - // return the channel back - defer func() { - j.publishChan <- pch - }() - - // convert - table, err := pack(msg.ID(), msg) - if err != nil { - return errors.E(op, err) - } - - const op = errors.Op("rabbitmq_handle_item") - // handle timeouts - if msg.Options.DelayDuration() > 0 { - atomic.AddInt64(j.delayed, 1) - // TODO declare separate method for this if condition - // TODO dlx cache channel?? 
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, - dlxTTL: delayMs, - dlxExpires: delayMs * 2, - }) - if err != nil { - atomic.AddInt64(j.delayed, ^int64(0)) - return errors.E(op, err) - } - - err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) - if err != nil { - atomic.AddInt64(j.delayed, ^int64(0)) - return errors.E(op, err) - } - - // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now().UTC(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - atomic.AddInt64(j.delayed, ^int64(0)) - return errors.E(op, err) - } - - return nil - } - - // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - return nil - case <-ctx.Done(): - return errors.E(op, errors.TimeOut, ctx.Err()) - } -} - -func ready(r uint32) bool { - return r > 0 -} diff --git a/plugins/jobs/drivers/amqp/amqpjobs/item.go b/plugins/jobs/drivers/amqp/amqpjobs/item.go deleted file mode 100644 index a8e305ea..00000000 --- a/plugins/jobs/drivers/amqp/amqpjobs/item.go +++ /dev/null @@ -1,239 +0,0 @@ -package amqpjobs - -import ( - "context" - "fmt" - "sync/atomic" - "time" - - json "github.com/json-iterator/go" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/utils" -) - -type Item struct { - // Job contains pluginName of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-values pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` - - // private - // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery - ack func(multiply bool) error - - // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. - // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. - // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. 
- // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time - nack func(multiply bool, requeue bool) error - - // requeueFn used as a pointer to the push function - requeueFn func(context.Context, *Item) error - delayed *int64 - multipleAsk bool - requeue bool -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} - -func (i *Item) ID() string { - return i.Ident -} - -func (i *Item) Priority() int64 { - return i.Options.Priority -} - -// Body packs job payload into binary payload. -func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) -} - -// Context packs job context (job, id) into binary payload. -// Not used in the amqp, amqp.Table used instead -func (i *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, - ) - - if err != nil { - return nil, err - } - - return ctx, nil -} - -func (i *Item) Ack() error { - if i.Options.Delay > 0 { - atomic.AddInt64(i.Options.delayed, ^int64(0)) - } - return i.Options.ack(i.Options.multipleAsk) -} - -func (i *Item) Nack() error { - if i.Options.Delay > 0 { - atomic.AddInt64(i.Options.delayed, ^int64(0)) - } - return i.Options.nack(false, i.Options.requeue) -} - -// Requeue with the provided delay, handled by the Nack -func (i *Item) Requeue(headers map[string][]string, delay int64) error { - if i.Options.Delay > 0 { - atomic.AddInt64(i.Options.delayed, ^int64(0)) - } - // overwrite the delay - i.Options.Delay = delay - i.Headers = headers - - err := i.Options.requeueFn(context.Background(), i) - if err != nil { - errNack := i.Options.nack(false, true) - if errNack != nil { - return fmt.Errorf("requeue error: %v\nack error: %v", err, errNack) - } - - return err - } - - // ack the job - err = i.Options.ack(false) - if err != nil { - return err - } - - return nil -} - -// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ -func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { - const op = errors.Op("from_delivery_convert") - item, err := j.unpack(d) - if err != nil { - return nil, errors.E(op, err) - } - - i := &Item{ - Job: item.Job, - Ident: item.Ident, - Payload: item.Payload, - Headers: item.Headers, - Options: item.Options, - } - - item.Options.ack = d.Ack - item.Options.nack = d.Nack - item.Options.delayed = j.delayed - - // requeue func - item.Options.requeueFn = j.handleItem - return i, nil -} - -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Headers: job.Headers, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - }, - } -} - -// pack job metadata into headers -func pack(id string, j *Item) (amqp.Table, error) { - headers, err := json.Marshal(j.Headers) - if err != nil { - return nil, err - } - return amqp.Table{ - job.RRID: id, - job.RRJob: j.Job, - job.RRPipeline: j.Options.Pipeline, - job.RRHeaders: headers, - job.RRDelay: j.Options.Delay, - job.RRPriority: j.Options.Priority, - }, nil -} - -// unpack restores jobs.Options -func (j *consumer) unpack(d amqp.Delivery) (*Item, error) 
{ - item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ - multipleAsk: j.multipleAck, - requeue: j.requeueOnFail, - requeueFn: j.handleItem, - }} - - if _, ok := d.Headers[job.RRID].(string); !ok { - return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID)) - } - - item.Ident = d.Headers[job.RRID].(string) - - if _, ok := d.Headers[job.RRJob].(string); !ok { - return nil, errors.E(errors.Errorf("missing header `%s`", job.RRJob)) - } - - item.Job = d.Headers[job.RRJob].(string) - - if _, ok := d.Headers[job.RRPipeline].(string); ok { - item.Options.Pipeline = d.Headers[job.RRPipeline].(string) - } - - if h, ok := d.Headers[job.RRHeaders].([]byte); ok { - err := json.Unmarshal(h, &item.Headers) - if err != nil { - return nil, err - } - } - - if _, ok := d.Headers[job.RRDelay].(int64); ok { - item.Options.Delay = d.Headers[job.RRDelay].(int64) - } - - if _, ok := d.Headers[job.RRPriority]; !ok { - // set pipe's priority - item.Options.Priority = j.priority - } else { - item.Options.Priority = d.Headers[job.RRPriority].(int64) - } - - return item, nil -} diff --git a/plugins/jobs/drivers/amqp/amqpjobs/listener.go b/plugins/jobs/drivers/amqp/amqpjobs/listener.go deleted file mode 100644 index 0156d55c..00000000 --- a/plugins/jobs/drivers/amqp/amqpjobs/listener.go +++ /dev/null @@ -1,25 +0,0 @@ -package amqpjobs - -import amqp "github.com/rabbitmq/amqp091-go" - -func (j *consumer) listener(deliv <-chan amqp.Delivery) { - go func() { - for { //nolint:gosimple - select { - case msg, ok := <-deliv: - if !ok { - j.log.Info("delivery channel closed, leaving the rabbit listener") - return - } - - d, err := j.fromDelivery(msg) - if err != nil { - j.log.Error("amqp delivery convert", "error", err) - continue - } - // insert job into the main priority queue - j.pq.Insert(d) - } - } - }() -} diff --git a/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go deleted file mode 100644 index e260fabe..00000000 --- a/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go +++ /dev/null @@ -1,57 +0,0 @@ -package amqpjobs - -import ( - "github.com/spiral/errors" -) - -func (j *consumer) initRabbitMQ() error { - const op = errors.Op("jobs_plugin_rmq_init") - // Channel opens a unique, concurrent server channel to process the bulk of AMQP - // messages. Any error from methods on this receiver will render the receiver - // invalid and a new Channel should be opened. 
- channel, err := j.conn.Channel() - if err != nil { - return errors.E(op, err) - } - - // declare an exchange (idempotent operation) - err = channel.ExchangeDeclare( - j.exchangeName, - j.exchangeType, - true, - false, - false, - false, - nil, - ) - if err != nil { - return errors.E(op, err) - } - - // verify or declare a queue - q, err := channel.QueueDeclare( - j.queue, - false, - false, - j.exclusive, - false, - nil, - ) - if err != nil { - return errors.E(op, err) - } - - // bind queue to the exchange - err = channel.QueueBind( - q.Name, - j.routingKey, - j.exchangeName, - false, - nil, - ) - if err != nil { - return errors.E(op, err) - } - - return channel.Close() -} diff --git a/plugins/jobs/drivers/amqp/amqpjobs/redial.go b/plugins/jobs/drivers/amqp/amqpjobs/redial.go deleted file mode 100644 index 0835e3ea..00000000 --- a/plugins/jobs/drivers/amqp/amqpjobs/redial.go +++ /dev/null @@ -1,141 +0,0 @@ -package amqpjobs - -import ( - "time" - - "github.com/cenkalti/backoff/v4" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" -) - -// redialer used to redial to the rabbitmq in case of the connection interrupts -func (j *consumer) redialer() { //nolint:gocognit - go func() { - const op = errors.Op("rabbitmq_redial") - - for { - select { - case err := <-j.conn.NotifyClose(make(chan *amqp.Error)): - if err == nil { - return - } - - j.Lock() - - // trash the broken publishing channel - <-j.publishChan - - t := time.Now() - pipe := j.pipeline.Load().(*pipeline.Pipeline) - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeError, - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Error: err, - Start: time.Now(), - }) - - expb := backoff.NewExponentialBackOff() - // set the retry timeout (minutes) - expb.MaxElapsedTime = j.retryTimeout - operation := func() error { - j.log.Warn("rabbitmq reconnecting, caused by", "error", err) - var dialErr error - j.conn, dialErr = amqp.Dial(j.connStr) - if dialErr != nil { - return errors.E(op, dialErr) - } - - j.log.Info("rabbitmq dial succeed. 
trying to redeclare queues and subscribers") - - // re-init connection - errInit := j.initRabbitMQ() - if errInit != nil { - j.log.Error("rabbitmq dial", "error", errInit) - return errInit - } - - // redeclare consume channel - var errConnCh error - j.consumeChan, errConnCh = j.conn.Channel() - if errConnCh != nil { - return errors.E(op, errConnCh) - } - - // redeclare publish channel - pch, errPubCh := j.conn.Channel() - if errPubCh != nil { - return errors.E(op, errPubCh) - } - - // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, - false, - false, - false, - false, - nil, - ) - if err != nil { - return errors.E(op, err) - } - - // put the fresh publishing channel - j.publishChan <- pch - // restart listener - j.listener(deliv) - - j.log.Info("queues and subscribers redeclared successfully") - - return nil - } - - retryErr := backoff.Retry(operation, expb) - if retryErr != nil { - j.Unlock() - j.log.Error("backoff failed", "error", retryErr) - return - } - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Start: t, - Elapsed: time.Since(t), - }) - - j.Unlock() - - case <-j.stopCh: - if j.publishChan != nil { - pch := <-j.publishChan - err := pch.Close() - if err != nil { - j.log.Error("publish channel close", "error", err) - } - } - - if j.consumeChan != nil { - err := j.consumeChan.Close() - if err != nil { - j.log.Error("consume channel close", "error", err) - } - } - if j.conn != nil { - err := j.conn.Close() - if err != nil { - j.log.Error("amqp connection close", "error", err) - } - } - - return - } - } - }() -} diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go deleted file mode 100644 index 8797d20b..00000000 --- a/plugins/jobs/drivers/amqp/plugin.go +++ /dev/null @@ -1,41 +0,0 @@ -package amqp - -import ( - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp/amqpjobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - pluginName string = "amqp" -) - -type Plugin struct { - log logger.Logger - cfg config.Configurer -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -func (p *Plugin) Name() string { - return pluginName -} - -func (p *Plugin) Available() {} - -func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return amqpjobs.NewAMQPConsumer(configKey, p.log, p.cfg, e, pq) -} - -// FromPipeline constructs AMQP driver from pipeline -func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return amqpjobs.FromPipeline(pipe, p.log, p.cfg, e, pq) -} diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go deleted file mode 100644 index a8069f5d..00000000 --- a/plugins/jobs/drivers/beanstalk/config.go +++ /dev/null @@ -1,53 +0,0 @@ -package beanstalk - -import ( - "time" - - "github.com/spiral/roadrunner/v2/utils" -) - -const ( - tubePriority string = "tube_priority" - tube string = "tube" - reserveTimeout string = "reserve_timeout" -) - -type GlobalCfg struct { - Addr string `mapstructure:"addr"` - 
Timeout time.Duration `mapstructure:"timeout"` -} - -func (c *GlobalCfg) InitDefault() { - if c.Addr == "" { - c.Addr = "tcp://127.0.0.1:11300" - } - - if c.Timeout == 0 { - c.Timeout = time.Second * 30 - } -} - -type Config struct { - PipePriority int64 `mapstructure:"priority"` - TubePriority *uint32 `mapstructure:"tube_priority"` - Tube string `mapstructure:"tube"` - ReserveTimeout time.Duration `mapstructure:"reserve_timeout"` -} - -func (c *Config) InitDefault() { - if c.Tube == "" { - c.Tube = "default" - } - - if c.ReserveTimeout == 0 { - c.ReserveTimeout = time.Second * 1 - } - - if c.TubePriority == nil { - c.TubePriority = utils.Uint32(0) - } - - if c.PipePriority == 0 { - c.PipePriority = 10 - } -} diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go deleted file mode 100644 index d3241b37..00000000 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ /dev/null @@ -1,223 +0,0 @@ -package beanstalk - -import ( - "context" - "net" - "sync" - "time" - - "github.com/beanstalkd/go-beanstalk" - "github.com/cenkalti/backoff/v4" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -type ConnPool struct { - sync.RWMutex - - log logger.Logger - - conn *beanstalk.Conn - connT *beanstalk.Conn - ts *beanstalk.TubeSet - t *beanstalk.Tube - - network string - address string - tName string - tout time.Duration -} - -func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) { - connT, err := beanstalk.DialTimeout(network, address, tout) - if err != nil { - return nil, err - } - - connTS, err := beanstalk.DialTimeout(network, address, tout) - if err != nil { - return nil, err - } - - tube := beanstalk.NewTube(connT, tName) - ts := beanstalk.NewTubeSet(connTS, tName) - - return &ConnPool{ - log: log, - network: network, - address: address, - tName: tName, - tout: tout, - conn: connTS, - connT: connT, - ts: ts, - t: tube, - }, nil -} - -// Put the payload -// TODO use the context ?? -func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { - cp.RLock() - defer cp.RUnlock() - - // TODO(rustatian): redial based on the token - id, err := cp.t.Put(body, pri, delay, ttr) - if err != nil { - // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(err) - if errN != nil { - return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN) - } else { - // retry put only when we redialed - return cp.t.Put(body, pri, delay, ttr) - } - } - - return id, nil -} - -// Reserve reserves and returns a job from one of the tubes in t. If no -// job is available before time timeout has passed, Reserve returns a -// ConnError recording ErrTimeout. -// -// Typically, a client will reserve a job, perform some work, then delete -// the job with Conn.Delete. 
-func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { - cp.RLock() - defer cp.RUnlock() - - id, body, err := cp.ts.Reserve(reserveTimeout) - if err != nil { - // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(err) - if errN != nil { - return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN) - } else { - // retry Reserve only when we redialed - return cp.ts.Reserve(reserveTimeout) - } - } - - return id, body, nil -} - -func (cp *ConnPool) Delete(_ context.Context, id uint64) error { - cp.RLock() - defer cp.RUnlock() - - err := cp.conn.Delete(id) - if err != nil { - // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(err) - if errN != nil { - return errors.Errorf("err: %s\nerr redial: %s", err, errN) - } else { - // retry Delete only when we redialed - return cp.conn.Delete(id) - } - } - return nil -} - -func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) { - cp.RLock() - defer cp.RUnlock() - - stat, err := cp.conn.Stats() - if err != nil { - errR := cp.checkAndRedial(err) - if errR != nil { - return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR) - } else { - return cp.conn.Stats() - } - } - - return stat, nil -} - -func (cp *ConnPool) redial() error { - const op = errors.Op("connection_pool_redial") - - cp.Lock() - // backoff here - expb := backoff.NewExponentialBackOff() - // TODO(rustatian) set via config - expb.MaxElapsedTime = time.Minute - - operation := func() error { - connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) - if err != nil { - return err - } - if connT == nil { - return errors.E(op, errors.Str("connectionT is nil")) - } - - connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) - if err != nil { - return err - } - - if connTS == nil { - return errors.E(op, errors.Str("connectionTS is nil")) - } - - cp.t = beanstalk.NewTube(connT, cp.tName) - cp.ts = beanstalk.NewTubeSet(connTS, cp.tName) - cp.conn = connTS - cp.connT = connT - - cp.log.Info("beanstalk redial was successful") - return nil - } - - retryErr := backoff.Retry(operation, expb) - if retryErr != nil { - cp.Unlock() - return retryErr - } - cp.Unlock() - - return nil -} - -var connErrors = map[string]struct{}{"EOF": {}} - -func (cp *ConnPool) checkAndRedial(err error) error { - const op = errors.Op("connection_pool_check_redial") - switch et := err.(type) { //nolint:gocritic - // check if the error - case beanstalk.ConnError: - switch bErr := et.Err.(type) { - case *net.OpError: - cp.RUnlock() - errR := cp.redial() - cp.RLock() - // if redial failed - return - if errR != nil { - return errors.E(op, errors.Errorf("%v:%v", bErr, errR)) - } - - // if redial was successful -> continue listening - return nil - default: - if _, ok := connErrors[et.Err.Error()]; ok { - // if error is related to the broken connection - redial - cp.RUnlock() - errR := cp.redial() - cp.RLock() - // if redial failed - return - if errR != nil { - return errors.E(op, errors.Errorf("%v:%v", err, errR)) - } - // if redial was successful -> continue listening - return nil - } - } - } - - // return initial error - return err -} diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go deleted file mode 100644 index 5ef89983..00000000 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ /dev/null @@ -1,360 +0,0 @@ -package beanstalk - -import ( - "bytes" - "context" - "strconv" - "strings" - "sync/atomic" - "time" - - 
"github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -type consumer struct { - log logger.Logger - eh events.Handler - pq priorityqueue.Queue - - pipeline atomic.Value - listeners uint32 - - // beanstalk - pool *ConnPool - addr string - network string - reserveTimeout time.Duration - reconnectCh chan struct{} - tout time.Duration - // tube name - tName string - tubePriority *uint32 - priority int64 - - stopCh chan struct{} - requeueCh chan *Item -} - -func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_beanstalk_consumer") - - // PARSE CONFIGURATION ------- - var pipeCfg Config - var globalCfg GlobalCfg - - if !cfg.Has(configKey) { - return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) - } - - // if no global section - if !cfg.Has(pluginName) { - return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) - } - - err := cfg.UnmarshalKey(configKey, &pipeCfg) - if err != nil { - return nil, errors.E(op, err) - } - - pipeCfg.InitDefault() - - err = cfg.UnmarshalKey(pluginName, &globalCfg) - if err != nil { - return nil, errors.E(op, err) - } - - globalCfg.InitDefault() - - // PARSE CONFIGURATION ------- - - dsn := strings.Split(globalCfg.Addr, "://") - if len(dsn) != 2 { - return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) - } - - cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log) - if err != nil { - return nil, errors.E(op, err) - } - - // initialize job consumer - jc := &consumer{ - pq: pq, - log: log, - eh: e, - pool: cPool, - network: dsn[0], - addr: dsn[1], - tout: globalCfg.Timeout, - tName: pipeCfg.Tube, - reserveTimeout: pipeCfg.ReserveTimeout, - tubePriority: pipeCfg.TubePriority, - priority: pipeCfg.PipePriority, - - // buffered with two because jobs root plugin can call Stop at the same time as Pause - stopCh: make(chan struct{}, 2), - requeueCh: make(chan *Item, 1000), - reconnectCh: make(chan struct{}, 2), - } - - return jc, nil -} - -func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_beanstalk_consumer") - - // PARSE CONFIGURATION ------- - var globalCfg GlobalCfg - - // if no global section - if !cfg.Has(pluginName) { - return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) - } - - err := cfg.UnmarshalKey(pluginName, &globalCfg) - if err != nil { - return nil, errors.E(op, err) - } - - globalCfg.InitDefault() - - // PARSE CONFIGURATION ------- - - dsn := strings.Split(globalCfg.Addr, "://") - if len(dsn) != 2 { - return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) - } - - cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, 
"default"), globalCfg.Timeout, log) - if err != nil { - return nil, errors.E(op, err) - } - - // initialize job consumer - jc := &consumer{ - pq: pq, - log: log, - eh: e, - pool: cPool, - network: dsn[0], - addr: dsn[1], - tout: globalCfg.Timeout, - tName: pipe.String(tube, "default"), - reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), - tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))), - priority: pipe.Priority(), - - // buffered with two because jobs root plugin can call Stop at the same time as Pause - stopCh: make(chan struct{}, 2), - requeueCh: make(chan *Item, 1000), - reconnectCh: make(chan struct{}, 2), - } - - return jc, nil -} -func (j *consumer) Push(ctx context.Context, jb *job.Job) error { - const op = errors.Op("beanstalk_push") - // check if the pipeline registered - - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != jb.Options.Pipeline { - return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) - } - - err := j.handleItem(ctx, fromJob(jb)) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (j *consumer) handleItem(ctx context.Context, item *Item) error { - const op = errors.Op("beanstalk_handle_item") - - bb := new(bytes.Buffer) - bb.Grow(64) - err := item.pack(bb) - if err != nil { - return errors.E(op, err) - } - - // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458 - // is an integer < 2**32. Jobs with smaller priority values will be - // scheduled before jobs with larger priorities. The most urgent priority is 0; - // the least urgent priority is 4,294,967,295. - // - // is an integer number of seconds to wait before putting the job in - // the ready queue. The job will be in the "delayed" state during this time. - // Maximum delay is 2**32-1. - // - // -- time to run -- is an integer number of seconds to allow a worker - // to run this job. This time is counted from the moment a worker reserves - // this job. If the worker does not delete, release, or bury the job within - // seconds, the job will time out and the server will release the job. - // The minimum ttr is 1. If the client sends 0, the server will silently - // increase the ttr to 1. Maximum ttr is 2**32-1. 
- id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout) - if err != nil { - errD := j.pool.Delete(ctx, id) - if errD != nil { - return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error())) - } - return errors.E(op, err) - } - - return nil -} - -func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { - // register the pipeline - j.pipeline.Store(p) - return nil -} - -// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514 -func (j *consumer) State(ctx context.Context) (*jobState.State, error) { - const op = errors.Op("beanstalk_state") - stat, err := j.pool.Stats(ctx) - if err != nil { - return nil, errors.E(op, err) - } - - pipe := j.pipeline.Load().(*pipeline.Pipeline) - - out := &jobState.State{ - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Queue: j.tName, - Ready: ready(atomic.LoadUint32(&j.listeners)), - } - - // set stat, skip errors (replace with 0) - // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523 - if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil { - out.Active = int64(v) - } - - // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525 - if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil { - // this is not an error, reserved in beanstalk behaves like an active jobs - out.Reserved = int64(v) - } - - // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528 - if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil { - out.Delayed = int64(v) - } - - return out, nil -} - -func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("beanstalk_run") - // check if the pipeline registered - - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p.Name() { - return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name())) - } - - atomic.AddUint32(&j.listeners, 1) - - go j.listen() - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - - return nil -} - -func (j *consumer) Stop(context.Context) error { - pipe := j.pipeline.Load().(*pipeline.Pipeline) - - if atomic.LoadUint32(&j.listeners) == 1 { - j.stopCh <- struct{}{} - } - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - - return nil -} - -func (j *consumer) Pause(_ context.Context, p string) { - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) - return - } - - l := atomic.LoadUint32(&j.listeners) - // no active listeners - if l == 0 { - j.log.Warn("no active listeners, nothing to pause") - return - } - - atomic.AddUint32(&j.listeners, ^uint32(0)) - - j.stopCh <- struct{}{} - - j.eh.Push(events.JobEvent{ - Event: events.EventPipePaused, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) -} - -func (j *consumer) Resume(_ context.Context, p string) { - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) - return - } - - l := atomic.LoadUint32(&j.listeners) - // no active listeners - if l == 1 { - j.log.Warn("sqs listener already in the active state") - return - } - - // start listener - go 
j.listen() - - // increase num of listeners - atomic.AddUint32(&j.listeners, 1) - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) -} - -func ready(r uint32) bool { - return r > 0 -} diff --git a/plugins/jobs/drivers/beanstalk/encode_test.go b/plugins/jobs/drivers/beanstalk/encode_test.go deleted file mode 100644 index e43207eb..00000000 --- a/plugins/jobs/drivers/beanstalk/encode_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package beanstalk - -import ( - "bytes" - "crypto/rand" - "encoding/gob" - "testing" - - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/utils" -) - -func BenchmarkEncodeGob(b *testing.B) { - tb := make([]byte, 1024*10) - _, err := rand.Read(tb) - if err != nil { - b.Fatal(err) - } - - item := &Item{ - Job: "/super/test/php/class/loooooong", - Ident: "12341234-asdfasdfa-1234234-asdfasdfas", - Payload: utils.AsString(tb), - Headers: map[string][]string{"Test": {"test1", "test2"}}, - Options: &Options{ - Priority: 10, - Pipeline: "test-local-pipe", - Delay: 10, - }, - } - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - bb := new(bytes.Buffer) - err := gob.NewEncoder(bb).Encode(item) - if err != nil { - b.Fatal(err) - } - _ = bb.Bytes() - bb.Reset() - } -} - -func BenchmarkEncodeJsonIter(b *testing.B) { - tb := make([]byte, 1024*10) - _, err := rand.Read(tb) - if err != nil { - b.Fatal(err) - } - - item := &Item{ - Job: "/super/test/php/class/loooooong", - Ident: "12341234-asdfasdfa-1234234-asdfasdfas", - Payload: utils.AsString(tb), - Headers: map[string][]string{"Test": {"test1", "test2"}}, - Options: &Options{ - Priority: 10, - Pipeline: "test-local-pipe", - Delay: 10, - }, - } - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - bb, err := json.Marshal(item) - if err != nil { - b.Fatal(err) - } - _ = bb - } -} diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go deleted file mode 100644 index 0a6cd560..00000000 --- a/plugins/jobs/drivers/beanstalk/item.go +++ /dev/null @@ -1,147 +0,0 @@ -package beanstalk - -import ( - "bytes" - "context" - "encoding/gob" - "time" - - "github.com/beanstalkd/go-beanstalk" - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/utils" -) - -type Item struct { - // Job contains pluginName of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-values pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` - - // Private ================ - id uint64 - conn *beanstalk.Conn - requeueFn func(context.Context, *Item) error -} - -// DelayDuration returns delay duration in a form of time.Duration. 
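// Note: the benchmarks in encode_test.go above compare encoding/gob against
// json-iterator on a ~10KB Item; gob is the format the pack/unpack helpers
// further below actually use on the wire. Round-trip sketch under that
// assumption (not part of this patch):
//
//	var buf bytes.Buffer
//	if err := gob.NewEncoder(&buf).Encode(item); err != nil {
//		// handle encode error
//	}
//	out := new(Item)
//	if err := gob.NewDecoder(&buf).Decode(out); err != nil {
//		// handle decode error
//	}
//
// The comparison itself runs with: go test -bench=BenchmarkEncode -benchmem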
-func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} - -func (i *Item) ID() string { - return i.Ident -} - -func (i *Item) Priority() int64 { - return i.Options.Priority -} - -// Body packs job payload into binary payload. -func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) -} - -// Context packs job context (job, id) into binary payload. -// Not used in the sqs, MessageAttributes used instead -func (i *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, - ) - - if err != nil { - return nil, err - } - - return ctx, nil -} - -func (i *Item) Ack() error { - return i.Options.conn.Delete(i.Options.id) -} - -func (i *Item) Nack() error { - return i.Options.conn.Delete(i.Options.id) -} - -func (i *Item) Requeue(headers map[string][]string, delay int64) error { - // overwrite the delay - i.Options.Delay = delay - i.Headers = headers - - err := i.Options.requeueFn(context.Background(), i) - if err != nil { - return err - } - - // delete old job - err = i.Options.conn.Delete(i.Options.id) - if err != nil { - return err - } - - return nil -} - -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Headers: job.Headers, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - }, - } -} - -func (i *Item) pack(b *bytes.Buffer) error { - err := gob.NewEncoder(b).Encode(i) - if err != nil { - return err - } - - return nil -} - -func (j *consumer) unpack(id uint64, data []byte, out *Item) error { - err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) - if err != nil { - return err - } - out.Options.conn = j.pool.conn - out.Options.id = id - out.Options.requeueFn = j.handleItem - - return nil -} diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go deleted file mode 100644 index 6bb159ea..00000000 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ /dev/null @@ -1,39 +0,0 @@ -package beanstalk - -import ( - "github.com/beanstalkd/go-beanstalk" -) - -func (j *consumer) listen() { - for { - select { - case <-j.stopCh: - j.log.Warn("beanstalk listener stopped") - return - default: - id, body, err := j.pool.Reserve(j.reserveTimeout) - if err != nil { - if errB, ok := err.(beanstalk.ConnError); ok { - switch errB.Err { //nolint:gocritic - case beanstalk.ErrTimeout: - j.log.Info("beanstalk reserve timeout", "warn", errB.Op) - continue - } - } - // in case of other error - continue - j.log.Error("beanstalk reserve", "error", err) - continue - } - - item := &Item{} - err = j.unpack(id, body, item) - if err != nil { - j.log.Error("beanstalk unpack item", "error", err) - continue - } - - // insert job into the priority queue - j.pq.Insert(item) - } - } -} diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/jobs/drivers/beanstalk/plugin.go deleted file mode 100644 index 529d1474..00000000 --- a/plugins/jobs/drivers/beanstalk/plugin.go +++ /dev/null @@ -1,47 +0,0 @@ -package beanstalk - -import ( - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/config" - 
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - pluginName string = "beanstalk" -) - -type Plugin struct { - log logger.Logger - cfg config.Configurer -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -func (p *Plugin) Serve() chan error { - return make(chan error) -} - -func (p *Plugin) Stop() error { - return nil -} - -func (p *Plugin) Name() string { - return pluginName -} - -func (p *Plugin) Available() {} - -func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewBeanstalkConsumer(configKey, p.log, p.cfg, eh, pq) -} - -func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipe, p.log, p.cfg, eh, pq) -} diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go deleted file mode 100644 index 91b8eda9..00000000 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ /dev/null @@ -1,274 +0,0 @@ -package ephemeral - -import ( - "context" - "sync/atomic" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -const ( - prefetch string = "prefetch" - goroutinesMax uint64 = 1000 -) - -type Config struct { - Prefetch uint64 `mapstructure:"prefetch"` -} - -type consumer struct { - cfg *Config - log logger.Logger - eh events.Handler - pipeline atomic.Value - pq priorityqueue.Queue - localPrefetch chan *Item - - // time.sleep goroutines max number - goroutines uint64 - - delayed *int64 - active *int64 - - listeners uint32 - stopCh chan struct{} -} - -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_ephemeral_pipeline") - - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), - } - - err := cfg.UnmarshalKey(configKey, &jb.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100_000 - } - - // initialize a local queue - jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) - - return jb, nil -} - -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), - } - - // initialize a local queue - jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) - - return jb, nil -} - -func (j *consumer) Push(ctx context.Context, jb *job.Job) error { - const op = errors.Op("ephemeral_push") - - // check if the pipeline registered - _, ok := j.pipeline.Load().(*pipeline.Pipeline) - if !ok { - return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) - } - - err := j.handleItem(ctx, fromJob(jb)) - if err != nil { - return 
errors.E(op, err) - } - - return nil -} - -func (j *consumer) State(_ context.Context) (*jobState.State, error) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) - return &jobState.State{ - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Queue: pipe.Name(), - Active: atomic.LoadInt64(j.active), - Delayed: atomic.LoadInt64(j.delayed), - Ready: ready(atomic.LoadUint32(&j.listeners)), - }, nil -} - -func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - j.pipeline.Store(pipeline) - return nil -} - -func (j *consumer) Pause(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - j.log.Error("no such pipeline", "requested pause on: ", p) - } - - l := atomic.LoadUint32(&j.listeners) - // no active listeners - if l == 0 { - j.log.Warn("no active listeners, nothing to pause") - return - } - - atomic.AddUint32(&j.listeners, ^uint32(0)) - - // stop the consumer - j.stopCh <- struct{}{} - - j.eh.Push(events.JobEvent{ - Event: events.EventPipePaused, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, - }) -} - -func (j *consumer) Resume(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - j.log.Error("no such pipeline", "requested resume on: ", p) - } - - l := atomic.LoadUint32(&j.listeners) - // listener already active - if l == 1 { - j.log.Warn("listener already in the active state") - return - } - - // resume the consumer on the same channel - j.consume() - - atomic.StoreUint32(&j.listeners, 1) - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, - }) -} - -// Run is no-op for the ephemeral -func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - return nil -} - -func (j *consumer) Stop(ctx context.Context) error { - const op = errors.Op("ephemeral_plugin_stop") - - pipe := j.pipeline.Load().(*pipeline.Pipeline) - - select { - // return from the consumer - case j.stopCh <- struct{}{}: - j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, - }) - - return nil - - case <-ctx.Done(): - return errors.E(op, ctx.Err()) - } -} - -func (j *consumer) handleItem(ctx context.Context, msg *Item) error { - const op = errors.Op("ephemeral_handle_request") - // handle timeouts - // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) - // goroutines here. We should limit goroutines here. 
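// Note: the branch below caps sleeping goroutines at goroutinesMax (1000) via
// an atomic counter; because the load and the later add are separate
// operations, a brief overshoot past the cap is possible under contention. An
// alternative sketch using a buffered channel as a semaphore (not what this
// driver does):
//
//	sem := make(chan struct{}, goroutinesMax)
//	select {
//	case sem <- struct{}{}:
//		go func(it *Item) {
//			defer func() { <-sem }()
//			time.Sleep(it.Options.DelayDuration())
//			j.localPrefetch <- it
//		}(msg)
//	default:
//		// over the limit: reject instead of blocking
//	}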
- if msg.Options.Delay > 0 { - // if we have 1000 goroutines waiting on the delay - reject 1001 - if atomic.LoadUint64(&j.goroutines) >= goroutinesMax { - return errors.E(op, errors.Str("max concurrency number reached")) - } - - go func(jj *Item) { - atomic.AddUint64(&j.goroutines, 1) - atomic.AddInt64(j.delayed, 1) - - time.Sleep(jj.Options.DelayDuration()) - - // send the item after timeout expired - j.localPrefetch <- jj - - atomic.AddUint64(&j.goroutines, ^uint64(0)) - }(msg) - - return nil - } - - // increase number of the active jobs - atomic.AddInt64(j.active, 1) - - // insert to the local, limited pipeline - select { - case j.localPrefetch <- msg: - return nil - case <-ctx.Done(): - return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err())) - } -} - -func (j *consumer) consume() { - go func() { - // redirect - for { - select { - case item, ok := <-j.localPrefetch: - if !ok { - j.log.Warn("ephemeral local prefetch queue was closed") - return - } - - // set requeue channel - item.Options.requeueFn = j.handleItem - item.Options.active = j.active - item.Options.delayed = j.delayed - - j.pq.Insert(item) - case <-j.stopCh: - return - } - } - }() -} - -func ready(r uint32) bool { - return r > 0 -} diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go deleted file mode 100644 index 3298424d..00000000 --- a/plugins/jobs/drivers/ephemeral/item.go +++ /dev/null @@ -1,133 +0,0 @@ -package ephemeral - -import ( - "context" - "sync/atomic" - "time" - - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/utils" -) - -type Item struct { - // Job contains name of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-values pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` - - // private - requeueFn func(context.Context, *Item) error - active *int64 - delayed *int64 -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} - -func (i *Item) ID() string { - return i.Ident -} - -func (i *Item) Priority() int64 { - return i.Options.Priority -} - -// Body packs job payload into binary payload. -func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) -} - -// Context packs job context (job, id) into binary payload. 
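// Note: Context below ships only the job envelope, not the payload (the
// payload travels through Body). For an Item with Ident "123", Job "demo",
// nil Headers and pipeline "local", the marshaled context is (illustrative):
//
//	{"id":"123","job":"demo","headers":null,"pipeline":"local"}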
-func (i *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, - ) - - if err != nil { - return nil, err - } - - return ctx, nil -} - -func (i *Item) Ack() error { - i.atomicallyReduceCount() - return nil -} - -func (i *Item) Nack() error { - i.atomicallyReduceCount() - return nil -} - -func (i *Item) Requeue(headers map[string][]string, delay int64) error { - // overwrite the delay - i.Options.Delay = delay - i.Headers = headers - - i.atomicallyReduceCount() - - err := i.Options.requeueFn(context.Background(), i) - if err != nil { - return err - } - - return nil -} - -// atomicallyReduceCount reduces counter of active or delayed jobs -func (i *Item) atomicallyReduceCount() { - // if job was delayed, reduce number of the delayed jobs - if i.Options.Delay > 0 { - atomic.AddInt64(i.Options.delayed, ^int64(0)) - return - } - - // otherwise, reduce number of the active jobs - atomic.AddInt64(i.Options.active, ^int64(0)) - // noop for the in-memory -} - -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - }, - } -} diff --git a/plugins/jobs/drivers/ephemeral/plugin.go b/plugins/jobs/drivers/ephemeral/plugin.go deleted file mode 100644 index 28495abb..00000000 --- a/plugins/jobs/drivers/ephemeral/plugin.go +++ /dev/null @@ -1,41 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "ephemeral" -) - -type Plugin struct { - log logger.Logger - cfg config.Configurer -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Available() {} - -// JobsConstruct creates new ephemeral consumer from the configuration -func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewJobBroker(configKey, p.log, p.cfg, e, pq) -} - -// FromPipeline creates new ephemeral consumer from the provided pipeline -func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipeline, p.log, e, pq) -} diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go deleted file mode 100644 index 9b2a1ca8..00000000 --- a/plugins/jobs/drivers/sqs/config.go +++ /dev/null @@ -1,114 +0,0 @@ -package sqs - -import "github.com/aws/aws-sdk-go-v2/aws" - -const ( - attributes string = "attributes" - tags string = "tags" - queue string = "queue" - pref string = "prefetch" - visibility string = "visibility_timeout" - waitTime string = "wait_time" -) - -type GlobalCfg struct { - Key string `mapstructure:"key"` - Secret string `mapstructure:"secret"` - Region string `mapstructure:"region"` - SessionToken string `mapstructure:"session_token"` - Endpoint string 
`mapstructure:"endpoint"` -} - -// Config is used to parse pipeline configuration -type Config struct { - // The duration (in seconds) that the received messages are hidden from subsequent - // retrieve requests after being retrieved by a ReceiveMessage request. - VisibilityTimeout int32 `mapstructure:"visibility_timeout"` - // The duration (in seconds) for which the call waits for a message to arrive - // in the queue before returning. If a message is available, the call returns - // sooner than WaitTimeSeconds. If no messages are available and the wait time - // expires, the call returns successfully with an empty list of messages. - WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"` - // Prefetch is the maximum number of messages to return. Amazon SQS never returns more messages - // than this value (however, fewer messages might be returned). Valid values: 1 to - // 10. Default: 1. - Prefetch int32 `mapstructure:"prefetch"` - // The name of the new queue. The following limits apply to this name: - // - // * A queue - // name can have up to 80 characters. - // - // * Valid values: alphanumeric characters, - // hyphens (-), and underscores (_). - // - // * A FIFO queue name must end with the .fifo - // suffix. - // - // Queue URLs and names are case-sensitive. - // - // This member is required. - Queue *string `mapstructure:"queue"` - - // A map of attributes with their corresponding values. The following lists the - // names, descriptions, and values of the special request parameters that the - // CreateQueue action uses. - // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html - Attributes map[string]string `mapstructure:"attributes"` - - // From amazon docs: - // Add cost allocation tags to the specified Amazon SQS queue. For an overview, see - // Tagging Your Amazon SQS Queues - // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html) - // in the Amazon SQS Developer Guide. When you use queue tags, keep the following - // guidelines in mind: - // - // * Adding more than 50 tags to a queue isn't recommended. - // - // * - // Tags don't have any semantic meaning. Amazon SQS interprets tags as character - // strings. - // - // * Tags are case-sensitive. - // - // * A new tag with a key identical to that - // of an existing tag overwrites the existing tag. - // - // For a full list of tag - // restrictions, see Quotas related to queues - // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues) - // in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you - // must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account - // permissions don't apply to this action. For more information, see Grant - // cross-account permissions to a role and a user name - // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name) - // in the Amazon SQS Developer Guide. 
- Tags map[string]string `mapstructure:"tags"` -} - -func (c *GlobalCfg) InitDefault() { - if c.Endpoint == "" { - c.Endpoint = "http://127.0.0.1:9324" - } -} - -func (c *Config) InitDefault() { - if c.Queue == nil { - c.Queue = aws.String("default") - } - - if c.Prefetch == 0 || c.Prefetch > 10 { - c.Prefetch = 10 - } - - if c.WaitTimeSeconds == 0 { - c.WaitTimeSeconds = 5 - } - - if c.Attributes == nil { - c.Attributes = make(map[string]string) - } - - if c.Tags == nil { - c.Tags = make(map[string]string) - } -} diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go deleted file mode 100644 index 23203190..00000000 --- a/plugins/jobs/drivers/sqs/consumer.go +++ /dev/null @@ -1,411 +0,0 @@ -package sqs - -import ( - "context" - "strconv" - "sync" - "sync/atomic" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/aws/retry" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "github.com/google/uuid" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -type consumer struct { - sync.Mutex - pq priorityqueue.Queue - log logger.Logger - eh events.Handler - pipeline atomic.Value - - // connection info - key string - secret string - sessionToken string - region string - endpoint string - queue *string - messageGroupID string - waitTime int32 - prefetch int32 - visibilityTimeout int32 - - // if user invoke several resume operations - listeners uint32 - - // queue optional parameters - attributes map[string]string - tags map[string]string - - client *sqs.Client - queueURL *string - - pauseCh chan struct{} -} - -func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_sqs_consumer") - - // if no such key - error - if !cfg.Has(configKey) { - return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) - } - - // if no global section - if !cfg.Has(pluginName) { - return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section")) - } - - // PARSE CONFIGURATION ------- - var pipeCfg Config - var globalCfg GlobalCfg - - err := cfg.UnmarshalKey(configKey, &pipeCfg) - if err != nil { - return nil, errors.E(op, err) - } - - pipeCfg.InitDefault() - - err = cfg.UnmarshalKey(pluginName, &globalCfg) - if err != nil { - return nil, errors.E(op, err) - } - - globalCfg.InitDefault() - - // initialize job consumer - jb := &consumer{ - pq: pq, - log: log, - eh: e, - messageGroupID: uuid.NewString(), - attributes: pipeCfg.Attributes, - tags: pipeCfg.Tags, - queue: pipeCfg.Queue, - prefetch: pipeCfg.Prefetch, - visibilityTimeout: pipeCfg.VisibilityTimeout, - waitTime: pipeCfg.WaitTimeSeconds, - region: globalCfg.Region, - key: globalCfg.Key, - sessionToken: globalCfg.SessionToken, - secret: globalCfg.Secret, - endpoint: globalCfg.Endpoint, - pauseCh: make(chan struct{}, 1), - } - - // PARSE CONFIGURATION ------- - - awsConf, err := 
config.LoadDefaultConfig(context.Background(), - config.WithRegion(globalCfg.Region), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken))) - if err != nil { - return nil, errors.E(op, err) - } - - // config with retries - jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) { - o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) { - opts.MaxAttempts = 60 - }) - }) - - out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags}) - if err != nil { - return nil, errors.E(op, err) - } - - // assign a queue URL - jb.queueURL = out.QueueUrl - - // To successfully create a new queue, you must provide a - // queue name that adheres to the limits related to queues - // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) - // and is unique within the scope of your queues. After you create a queue, you - // must wait at least one second after the queue is created to be able to use the <------------ - // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require - time.Sleep(time.Second * 2) - - return jb, nil -} - -func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_sqs_consumer") - - // if no global section - if !cfg.Has(pluginName) { - return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section")) - } - - // PARSE CONFIGURATION ------- - var globalCfg GlobalCfg - - err := cfg.UnmarshalKey(pluginName, &globalCfg) - if err != nil { - return nil, errors.E(op, err) - } - - globalCfg.InitDefault() - - attr := make(map[string]string) - err = pipe.Map(attributes, attr) - if err != nil { - return nil, errors.E(op, err) - } - - tg := make(map[string]string) - err = pipe.Map(tags, tg) - if err != nil { - return nil, errors.E(op, err) - } - - // initialize job consumer - jb := &consumer{ - pq: pq, - log: log, - eh: e, - messageGroupID: uuid.NewString(), - attributes: attr, - tags: tg, - queue: aws.String(pipe.String(queue, "default")), - prefetch: int32(pipe.Int(pref, 10)), - visibilityTimeout: int32(pipe.Int(visibility, 0)), - waitTime: int32(pipe.Int(waitTime, 0)), - region: globalCfg.Region, - key: globalCfg.Key, - sessionToken: globalCfg.SessionToken, - secret: globalCfg.Secret, - endpoint: globalCfg.Endpoint, - pauseCh: make(chan struct{}, 1), - } - - // PARSE CONFIGURATION ------- - - awsConf, err := config.LoadDefaultConfig(context.Background(), - config.WithRegion(globalCfg.Region), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken))) - if err != nil { - return nil, errors.E(op, err) - } - - // config with retries - jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) { - o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) { - opts.MaxAttempts = 60 - }) - }) - - out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags}) - if err != nil { - return nil, errors.E(op, err) - } - - // assign a queue URL - jb.queueURL = out.QueueUrl - - // To successfully create a new queue, you must provide a - // queue name that adheres 
to the limits related to queues - // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) - // and is unique within the scope of your queues. After you create a queue, you - // must wait at least one second after the queue is created to be able to use the <------------ - // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require - time.Sleep(time.Second * 2) - - return jb, nil -} - -func (j *consumer) Push(ctx context.Context, jb *job.Job) error { - const op = errors.Op("sqs_push") - // check if the pipeline registered - - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != jb.Options.Pipeline { - return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) - } - - // The length of time, in seconds, for which to delay a specific message. Valid - // values: 0 to 900. Maximum: 15 minutes. - if jb.Options.Delay > 900 { - return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay)) - } - - err := j.handleItem(ctx, fromJob(jb)) - if err != nil { - return errors.E(op, err) - } - return nil -} - -func (j *consumer) State(ctx context.Context) (*jobState.State, error) { - const op = errors.Op("sqs_state") - attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ - QueueUrl: j.queueURL, - AttributeNames: []types.QueueAttributeName{ - types.QueueAttributeNameApproximateNumberOfMessages, - types.QueueAttributeNameApproximateNumberOfMessagesDelayed, - types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, - }, - }) - - if err != nil { - return nil, errors.E(op, err) - } - - pipe := j.pipeline.Load().(*pipeline.Pipeline) - - out := &jobState.State{ - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Queue: *j.queueURL, - Ready: ready(atomic.LoadUint32(&j.listeners)), - } - - nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) - if err == nil { - out.Active = int64(nom) - } - - delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)]) - if err == nil { - out.Delayed = int64(delayed) - } - - nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)]) - if err == nil { - out.Reserved = int64(nv) - } - - return out, nil -} - -func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { - j.pipeline.Store(p) - return nil -} - -func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("sqs_run") - - j.Lock() - defer j.Unlock() - - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p.Name() { - return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) - } - - atomic.AddUint32(&j.listeners, 1) - - // start listener - go j.listen(context.Background()) - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - - return nil -} - -func (j *consumer) Stop(context.Context) error { - j.pauseCh <- struct{}{} - - pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) - return nil -} - -func (j *consumer) Pause(_ context.Context, p string) { - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if 
pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) - return - } - - l := atomic.LoadUint32(&j.listeners) - // no active listeners - if l == 0 { - j.log.Warn("no active listeners, nothing to pause") - return - } - - atomic.AddUint32(&j.listeners, ^uint32(0)) - - // stop consume - j.pauseCh <- struct{}{} - - j.eh.Push(events.JobEvent{ - Event: events.EventPipePaused, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) -} - -func (j *consumer) Resume(_ context.Context, p string) { - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) - return - } - - l := atomic.LoadUint32(&j.listeners) - // no active listeners - if l == 1 { - j.log.Warn("sqs listener already in the active state") - return - } - - // start listener - go j.listen(context.Background()) - - // increase num of listeners - atomic.AddUint32(&j.listeners, 1) - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: time.Now(), - }) -} - -func (j *consumer) handleItem(ctx context.Context, msg *Item) error { - d, err := msg.pack(j.queueURL) - if err != nil { - return err - } - _, err = j.client.SendMessage(ctx, d) - if err != nil { - return err - } - - return nil -} - -func ready(r uint32) bool { - return r > 0 -} diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go deleted file mode 100644 index 996adf6c..00000000 --- a/plugins/jobs/drivers/sqs/item.go +++ /dev/null @@ -1,247 +0,0 @@ -package sqs - -import ( - "context" - "strconv" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/types" - json "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/utils" -) - -const ( - StringType string = "String" - NumberType string = "Number" - BinaryType string = "Binary" - ApproximateReceiveCount string = "ApproximateReceiveCount" -) - -var itemAttributes = []string{ - job.RRJob, - job.RRDelay, - job.RRPriority, - job.RRHeaders, -} - -type Item struct { - // Job contains pluginName of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-values pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` - - // Private ================ - approxReceiveCount int64 - queue *string - receiptHandler *string - client *sqs.Client - requeueFn func(context.Context, *Item) error -} - -// DelayDuration returns delay duration in a form of time.Duration. 
-func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} - -func (i *Item) ID() string { - return i.Ident -} - -func (i *Item) Priority() int64 { - return i.Options.Priority -} - -// Body packs job payload into binary payload. -func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) -} - -// Context packs job context (job, id) into binary payload. -// Not used in the sqs, MessageAttributes used instead -func (i *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, - ) - - if err != nil { - return nil, err - } - - return ctx, nil -} - -func (i *Item) Ack() error { - _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: i.Options.queue, - ReceiptHandle: i.Options.receiptHandler, - }) - - if err != nil { - return err - } - - return nil -} - -func (i *Item) Nack() error { - // requeue message - err := i.Options.requeueFn(context.Background(), i) - if err != nil { - return err - } - - _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: i.Options.queue, - ReceiptHandle: i.Options.receiptHandler, - }) - - if err != nil { - return err - } - - return nil -} - -func (i *Item) Requeue(headers map[string][]string, delay int64) error { - // overwrite the delay - i.Options.Delay = delay - i.Headers = headers - - // requeue message - err := i.Options.requeueFn(context.Background(), i) - if err != nil { - return err - } - - // Delete job from the queue only after successful requeue - _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: i.Options.queue, - ReceiptHandle: i.Options.receiptHandler, - }) - - if err != nil { - return err - } - - return nil -} - -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Headers: job.Headers, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - }, - } -} - -func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) { - // pack headers map - data, err := json.Marshal(i.Headers) - if err != nil { - return nil, err - } - - return &sqs.SendMessageInput{ - MessageBody: aws.String(i.Payload), - QueueUrl: queue, - DelaySeconds: int32(i.Options.Delay), - MessageAttributes: map[string]types.MessageAttributeValue{ - job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, - job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, - job.RRHeaders: {DataType: aws.String(BinaryType), BinaryValue: data, BinaryListValues: nil, StringListValues: nil, StringValue: nil}, - job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))}, - }, - }, nil -} - -func (j *consumer) unpack(msg *types.Message) (*Item, error) { - const op = errors.Op("sqs_unpack") - // reserved - if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { - return nil, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute")) - } 
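// Note: unpack validates both halves of what pack above produced: the payload
// rides in MessageBody, while metadata (job.RRJob, job.RRDelay, job.RRPriority,
// job.RRHeaders) rides as typed MessageAttributes; the receive-count check
// above and the attribute loop just below reject messages this driver did not
// produce. The producing side is what handleItem in consumer.go does:
//
//	in, err := item.pack(j.queueURL) // *sqs.SendMessageInput
//	if err != nil {
//		// handle pack error
//	}
//	_, err = j.client.SendMessage(ctx, in)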
- - for i := 0; i < len(itemAttributes); i++ { - if _, ok := msg.MessageAttributes[itemAttributes[i]]; !ok { - return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", itemAttributes[i])) - } - } - - var h map[string][]string - err := json.Unmarshal(msg.MessageAttributes[job.RRHeaders].BinaryValue, &h) - if err != nil { - return nil, err - } - - delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue) - if err != nil { - return nil, errors.E(op, err) - } - - priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue) - if err != nil { - return nil, errors.E(op, err) - } - - recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount]) - if err != nil { - return nil, errors.E(op, err) - } - - item := &Item{ - Job: *msg.MessageAttributes[job.RRJob].StringValue, - Payload: *msg.Body, - Headers: h, - Options: &Options{ - Delay: int64(delay), - Priority: int64(priority), - - // private - approxReceiveCount: int64(recCount), - client: j.client, - queue: j.queueURL, - receiptHandler: msg.ReceiptHandle, - requeueFn: j.handleItem, - }, - } - - return item, nil -} diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go deleted file mode 100644 index a4280af2..00000000 --- a/plugins/jobs/drivers/sqs/listener.go +++ /dev/null @@ -1,87 +0,0 @@ -package sqs - -import ( - "context" - "time" - - "github.com/aws/aws-sdk-go-v2/aws/transport/http" - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "github.com/aws/smithy-go" -) - -const ( - // All - get all message attribute names - All string = "All" - - // NonExistentQueue AWS error code - NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue" -) - -func (j *consumer) listen(ctx context.Context) { //nolint:gocognit - for { - select { - case <-j.pauseCh: - j.log.Warn("sqs listener stopped") - return - default: - message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ - QueueUrl: j.queueURL, - MaxNumberOfMessages: j.prefetch, - AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)}, - MessageAttributeNames: []string{All}, - // The new value for the message's visibility timeout (in seconds). Values range: 0 - // to 43200. Maximum: 12 hours. - VisibilityTimeout: j.visibilityTimeout, - WaitTimeSeconds: j.waitTime, - }) - - if err != nil { - if oErr, ok := (err).(*smithy.OperationError); ok { - if rErr, ok := oErr.Err.(*http.ResponseError); ok { - if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok { - // in case of NonExistentQueue - recreate the queue - if apiErr.Code == NonExistentQueue { - j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault()) - _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags}) - if err != nil { - j.log.Error("create queue", "error", err) - } - // To successfully create a new queue, you must provide a - // queue name that adheres to the limits related to the queues - // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) - // and is unique within the scope of your queues. After you create a queue, you - // must wait at least one second after the queue is created to be able to use the <------------ - // queue. To get the queue URL, use the GetQueueUrl action. 
GetQueueUrl require - time.Sleep(time.Second * 2) - continue - } - } - } - } - - j.log.Error("receive message", "error", err) - continue - } - - for i := 0; i < len(message.Messages); i++ { - m := message.Messages[i] - item, err := j.unpack(&m) - if err != nil { - _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: j.queueURL, - ReceiptHandle: m.ReceiptHandle, - }) - if errD != nil { - j.log.Error("message unpack, failed to delete the message from the queue", "error", err) - } - - j.log.Error("message unpack", "error", err) - continue - } - - j.pq.Insert(item) - } - } - } -} diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/jobs/drivers/sqs/plugin.go deleted file mode 100644 index 54f61ff5..00000000 --- a/plugins/jobs/drivers/sqs/plugin.go +++ /dev/null @@ -1,39 +0,0 @@ -package sqs - -import ( - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - pluginName string = "sqs" -) - -type Plugin struct { - log logger.Logger - cfg config.Configurer -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -func (p *Plugin) Available() {} - -func (p *Plugin) Name() string { - return pluginName -} - -func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewSQSConsumer(configKey, p.log, p.cfg, e, pq) -} - -func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipe, p.log, p.cfg, e, pq) -} diff --git a/plugins/kv/drivers/memcached/config.go b/plugins/kv/drivers/memcached/config.go deleted file mode 100644 index 6d413790..00000000 --- a/plugins/kv/drivers/memcached/config.go +++ /dev/null @@ -1,12 +0,0 @@ -package memcached - -type Config struct { - // Addr is url for memcached, 11211 port is used by default - Addr []string -} - -func (s *Config) InitDefaults() { - if s.Addr == nil { - s.Addr = []string{"127.0.0.1:11211"} // default url for memcached - } -} diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go deleted file mode 100644 index e24747fe..00000000 --- a/plugins/kv/drivers/memcached/driver.go +++ /dev/null @@ -1,248 +0,0 @@ -package memcached - -import ( - "strings" - "time" - - "github.com/bradfitz/gomemcache/memcache" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" -) - -type Driver struct { - client *memcache.Client - log logger.Logger - cfg *Config -} - -// NewMemcachedDriver returns a memcache client using the provided server(s) -// with equal weight. If a server is listed multiple times, -// it gets a proportional amount of weight. -func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { - const op = errors.Op("new_memcached_driver") - - s := &Driver{ - log: log, - } - - err := cfgPlugin.UnmarshalKey(key, &s.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - s.cfg.InitDefaults() - - m := memcache.New(s.cfg.Addr...) 
- s.client = m - - return s, nil -} - -// Has checks the key for existence -func (d *Driver) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("memcached_plugin_has") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - m := make(map[string]bool, len(keys)) - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - exist, err := d.client.Get(keys[i]) - - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - continue - } - return nil, errors.E(op, err) - } - if exist != nil { - m[keys[i]] = true - } - } - return m, nil -} - -// Get gets the item for the given key. ErrCacheMiss is returned for a -// memcache cache miss. The key must be at most 250 bytes in length. -func (d *Driver) Get(key string) ([]byte, error) { - const op = errors.Op("memcached_plugin_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - data, err := d.client.Get(key) - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - return nil, nil - } - return nil, errors.E(op, err) - } - if data != nil { - // return the value by the key - return data.Value, nil - } - // data is nil by some reason and error also nil - return nil, nil -} - -// MGet return map with key -- string -// and map value as value -- []byte -func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { - const op = errors.Op("memcached_plugin_mget") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string][]byte, len(keys)) - for i := range keys { - // Here also MultiGet - data, err := d.client.Get(keys[i]) - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err == memcache.ErrCacheMiss { - continue - } - return nil, errors.E(op, err) - } - if data != nil { - m[keys[i]] = data.Value - } - } - - return m, nil -} - -// Set sets the KV pairs. Keys should be 250 bytes maximum -// TTL: -// Expiration is the cache expiration time, in seconds: either a relative -// time from now (up to 1 month), or an absolute Unix epoch time. -// Zero means the Item has no expiration time. -func (d *Driver) Set(items ...*kvv1.Item) error { - const op = errors.Op("memcached_plugin_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - - for i := range items { - if items[i] == nil { - return errors.E(op, errors.EmptyItem) - } - - // pre-allocate item - memcachedItem := &memcache.Item{ - Key: items[i].Key, - // unsafe convert - Value: items[i].Value, - Flags: 0, - } - - // add additional TTL in case of TTL isn't empty - if items[i].Timeout != "" { - // verify the TTL - t, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return err - } - memcachedItem.Expiration = int32(t.Unix()) - } - - err := d.client.Set(memcachedItem) - if err != nil { - return err - } - } - - return nil -} - -// MExpire Expiration is the cache expiration time, in seconds: either a relative -// time from now (up to 1 month), or an absolute Unix epoch time. -// Zero means the Item has no expiration time. 
-func (d *Driver) MExpire(items ...*kvv1.Item) error { - const op = errors.Op("memcached_plugin_mexpire") - for i := range items { - if items[i] == nil { - continue - } - if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - // verify provided TTL - t, err := time.Parse(time.RFC3339, items[i].Timeout) - if err != nil { - return errors.E(op, err) - } - - // Touch updates the expiry for the given key. The seconds parameter is either - // a Unix timestamp or, if seconds is less than 1 month, the number of seconds - // into the future at which time the item will expire. Zero means the item has - // no expiration time. ErrCacheMiss is returned if the key is not in the cache. - // The key must be at most 250 bytes in length. - err = d.client.Touch(items[i].Key, int32(t.Unix())) - if err != nil { - return errors.E(op, err) - } - } - return nil -} - -// TTL return time in seconds (int32) for a given keys -func (d *Driver) TTL(_ ...string) (map[string]string, error) { - const op = errors.Op("memcached_plugin_ttl") - return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) -} - -func (d *Driver) Delete(keys ...string) error { - const op = errors.Op("memcached_plugin_has") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for i := range keys { - keyTrimmed := strings.TrimSpace(keys[i]) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - } - - for i := range keys { - err := d.client.Delete(keys[i]) - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err != nil { - // ErrCacheMiss means that a Get failed because the item wasn't present. 
- if err == memcache.ErrCacheMiss { - continue - } - return errors.E(op, err) - } - } - return nil -} - -func (d *Driver) Clear() error { - err := d.client.DeleteAll() - if err != nil { - d.log.Error("flush_all operation failed", "error", err) - return err - } - - return nil -} diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go deleted file mode 100644 index 59a2b7cb..00000000 --- a/plugins/kv/drivers/memcached/plugin.go +++ /dev/null @@ -1,48 +0,0 @@ -package memcached - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/kv" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "memcached" - RootPluginName string = "kv" -) - -type Plugin struct { - // config plugin - cfgPlugin config.Configurer - // logger - log logger.Logger -} - -func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(RootPluginName) { - return errors.E(errors.Disabled) - } - - s.cfgPlugin = cfg - s.log = log - return nil -} - -// Name returns plugin user-friendly name -func (s *Plugin) Name() string { - return PluginName -} - -// Available interface implementation -func (s *Plugin) Available() {} - -func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { - const op = errors.Op("boltdb_plugin_provide") - st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) - if err != nil { - return nil, errors.E(op, err) - } - - return st, nil -} diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index 9a19f96c..c6ca96c3 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -16,11 +16,6 @@ const PluginName string = "kv" const ( // driver is the mandatory field which should present in every storage driver string = "driver" - - memcached string = "memcached" - boltdb string = "boltdb" - redis string = "redis" - memory string = "memory" ) // Plugin for the unified storage @@ -52,40 +47,14 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { return nil } -func (p *Plugin) Serve() chan error { //nolint:gocognit +func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("kv_plugin_serve") // key - storage name in the config // value - storage - /* - For example we can have here 2 storages (but they are not pre-configured) - for the boltdb and memcached - We should provide here the actual configs for the all requested storages - kv: - boltdb-south: - driver: boltdb - dir: "tests/rr-bolt" - file: "rr.db" - bucket: "rr" - permissions: 777 - ttl: 40s - - boltdb-north: - driver: boltdb - dir: "tests/rr-bolt" - file: "rr.db" - bucket: "rr" - permissions: 777 - ttl: 40s - - memcached: - driver: memcached - addr: [ "127.0.0.1:11211" ] - - - For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached - when user requests for example boltdb-south, we should provide that particular preconfigured storage - */ + // For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached + // when user requests for example boltdb-south, we should provide that particular preconfigured storage + for k, v := range p.cfg.Data { // for example if the key not properly formatted (yaml) if v == nil { @@ -109,30 +78,16 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", 
PluginName, k) // at this point we know, that driver field present in the configuration - // TODO(rustatian): refactor, made generic, with checks like in the broadcast, websockets or jobs - switch v.(map[string]interface{})[driver] { - case memcached: - if _, ok := p.constructors[memcached]; !ok { - p.log.Warn("no memcached constructors registered", "registered", p.constructors) - continue - } - - storage, err := p.constructors[memcached].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage + drName := v.(map[string]interface{})[driver] - case boltdb: - if _, ok := p.constructors[boltdb]; !ok { - p.log.Warn("no boltdb constructors registered", "registered", p.constructors) + // driver name should be a string + if drStr, ok := drName.(string); ok { + if _, ok := p.constructors[drStr]; !ok { + p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors) continue } - storage, err := p.constructors[boltdb].KVConstruct(configKey) + storage, err := p.constructors[drStr].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -140,56 +95,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // save the storage p.storages[k] = storage - case memory: - if _, ok := p.constructors[memory]; !ok { - p.log.Warn("no in-memory constructors registered", "registered", p.constructors) - continue - } - - storage, err := p.constructors[memory].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - case redis: - if _, ok := p.constructors[redis]; !ok { - p.log.Warn("no redis constructors registered", "registered", p.constructors) - continue - } - - // first - try local configuration - switch { - case p.cfgPlugin.Has(configKey): - storage, err := p.constructors[redis].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - case p.cfgPlugin.Has(redis): - storage, err := p.constructors[redis].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - continue - default: - // otherwise - error, no local or global config - p.log.Warn("no global or local redis configuration provided", "key", configKey) - continue - } - - default: - p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver]))) } + + continue } return errCh diff --git a/plugins/memcached/config.go b/plugins/memcached/config.go new file mode 100644 index 00000000..6d413790 --- /dev/null +++ b/plugins/memcached/config.go @@ -0,0 +1,12 @@ +package memcached + +type Config struct { + // Addr is url for memcached, 11211 port is used by default + Addr []string +} + +func (s *Config) InitDefaults() { + if s.Addr == nil { + s.Addr = []string{"127.0.0.1:11211"} // default url for memcached + } +} diff --git a/plugins/memcached/driver.go b/plugins/memcached/driver.go new file mode 100644 index 00000000..e24747fe --- /dev/null +++ b/plugins/memcached/driver.go @@ -0,0 +1,248 @@ +package memcached + +import ( + "strings" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" +) + +type Driver struct { + client 
*memcache.Client
+	log    logger.Logger
+	cfg    *Config
+}
+
+// NewMemcachedDriver returns a memcache client using the provided server(s)
+// with equal weight. If a server is listed multiple times,
+// it gets a proportional amount of weight.
+func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) {
+	const op = errors.Op("new_memcached_driver")
+
+	s := &Driver{
+		log: log,
+	}
+
+	err := cfgPlugin.UnmarshalKey(key, &s.cfg)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	s.cfg.InitDefaults()
+
+	m := memcache.New(s.cfg.Addr...)
+	s.client = m
+
+	return s, nil
+}
+
+// Has checks the given keys for existence
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+	const op = errors.Op("memcached_plugin_has")
+	if keys == nil {
+		return nil, errors.E(op, errors.NoKeys)
+	}
+	m := make(map[string]bool, len(keys))
+	for i := range keys {
+		keyTrimmed := strings.TrimSpace(keys[i])
+		if keyTrimmed == "" {
+			return nil, errors.E(op, errors.EmptyKey)
+		}
+		exist, err := d.client.Get(keys[i])
+
+		if err != nil {
+			// ErrCacheMiss means that a Get failed because the item wasn't present.
+			if err == memcache.ErrCacheMiss {
+				continue
+			}
+			return nil, errors.E(op, err)
+		}
+		if exist != nil {
+			m[keys[i]] = true
+		}
+	}
+	return m, nil
+}
+
+// Get gets the item for the given key. ErrCacheMiss is returned for a
+// memcache cache miss. The key must be at most 250 bytes in length.
+func (d *Driver) Get(key string) ([]byte, error) {
+	const op = errors.Op("memcached_plugin_get")
+	// catch all-whitespace keys like " "
+	keyTrimmed := strings.TrimSpace(key)
+	if keyTrimmed == "" {
+		return nil, errors.E(op, errors.EmptyKey)
+	}
+	data, err := d.client.Get(key)
+	if err != nil {
+		// ErrCacheMiss means that a Get failed because the item wasn't present.
+		if err == memcache.ErrCacheMiss {
+			return nil, nil
+		}
+		return nil, errors.E(op, err)
+	}
+	if data != nil {
+		// return the value by the key
+		return data.Value, nil
+	}
+	// data is nil for some reason and the error is also nil
+	return nil, nil
+}
+
+// MGet returns a map where the key is the requested string key
+// and the value is the stored []byte value
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
+	const op = errors.Op("memcached_plugin_mget")
+	if keys == nil {
+		return nil, errors.E(op, errors.NoKeys)
+	}
+
+	// keys should not be empty
+	for i := range keys {
+		keyTrimmed := strings.TrimSpace(keys[i])
+		if keyTrimmed == "" {
+			return nil, errors.E(op, errors.EmptyKey)
+		}
+	}
+
+	m := make(map[string][]byte, len(keys))
+	for i := range keys {
+		// note: gomemcache also provides GetMulti, which could batch these lookups
+		data, err := d.client.Get(keys[i])
+		if err != nil {
+			// ErrCacheMiss means that a Get failed because the item wasn't present.
+			if err == memcache.ErrCacheMiss {
+				continue
+			}
+			return nil, errors.E(op, err)
+		}
+		if data != nil {
+			m[keys[i]] = data.Value
+		}
+	}
+
+	return m, nil
+}
+
+// Set stores the KV pairs. Keys should be at most 250 bytes.
+// TTL:
+// Expiration is the cache expiration time, in seconds: either a relative
+// time from now (up to 1 month), or an absolute Unix epoch time.
+// Zero means the Item has no expiration time.
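+// For example (an illustrative value, not taken from this patch): an Item with
+// Timeout "2021-08-26T18:32:51+03:00" is parsed via time.Parse(time.RFC3339, ...)
+// and stored with Expiration = int32(t.Unix()), i.e. as an absolute Unix epoch
+// time; an empty Timeout leaves Expiration at zero (no expiration).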
+func (d *Driver) Set(items ...*kvv1.Item) error {
+	const op = errors.Op("memcached_plugin_set")
+	if items == nil {
+		return errors.E(op, errors.NoKeys)
+	}
+
+	for i := range items {
+		if items[i] == nil {
+			return errors.E(op, errors.EmptyItem)
+		}
+
+		// pre-allocate item
+		memcachedItem := &memcache.Item{
+			Key: items[i].Key,
+			// unsafe convert
+			Value: items[i].Value,
+			Flags: 0,
+		}
+
+		// set the expiration only when the TTL isn't empty
+		if items[i].Timeout != "" {
+			// verify the TTL
+			t, err := time.Parse(time.RFC3339, items[i].Timeout)
+			if err != nil {
+				return err
+			}
+			memcachedItem.Expiration = int32(t.Unix())
+		}
+
+		err := d.client.Set(memcachedItem)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// MExpire sets the cache expiration time, in seconds: either a relative
+// time from now (up to 1 month), or an absolute Unix epoch time.
+// Zero means the Item has no expiration time.
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
+	const op = errors.Op("memcached_plugin_mexpire")
+	for i := range items {
+		if items[i] == nil {
+			continue
+		}
+		if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
+			return errors.E(op, errors.Str("should set timeout and at least one key"))
+		}
+
+		// verify the provided TTL
+		t, err := time.Parse(time.RFC3339, items[i].Timeout)
+		if err != nil {
+			return errors.E(op, err)
+		}
+
+		// Touch updates the expiry for the given key. The seconds parameter is either
+		// a Unix timestamp or, if seconds is less than 1 month, the number of seconds
+		// into the future at which time the item will expire. Zero means the item has
+		// no expiration time. ErrCacheMiss is returned if the key is not in the cache.
+		// The key must be at most 250 bytes in length.
+		err = d.client.Touch(items[i].Key, int32(t.Unix()))
+		if err != nil {
+			return errors.E(op, err)
+		}
+	}
+	return nil
+}
+
+// TTL returns the time in seconds (int32) for the given keys; memcached
+// cannot report the remaining TTL of an item, so this always returns an error.
+func (d *Driver) TTL(_ ...string) (map[string]string, error) {
+	const op = errors.Op("memcached_plugin_ttl")
+	return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
+}
+
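+// Delete removes the given keys. A cache miss (memcache.ErrCacheMiss) on a
+// particular key is skipped rather than reported, so deleting an absent key
+// is not an error.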
+func (d *Driver) Delete(keys ...string) error {
+	const op = errors.Op("memcached_plugin_delete")
+	if keys == nil {
+		return errors.E(op, errors.NoKeys)
+	}
+
+	// keys should not be empty
+	for i := range keys {
+		keyTrimmed := strings.TrimSpace(keys[i])
+		if keyTrimmed == "" {
+			return errors.E(op, errors.EmptyKey)
+		}
+	}
+
+	for i := range keys {
+		err := d.client.Delete(keys[i])
+		if err != nil {
+			// ErrCacheMiss means that a Get failed because the item wasn't present.
+			if err == memcache.ErrCacheMiss {
+				continue
+			}
+			return errors.E(op, err)
+		}
+	}
+	return nil
+}
+
+func (d *Driver) Clear() error {
+	err := d.client.DeleteAll()
+	if err != nil {
+		d.log.Error("flush_all operation failed", "error", err)
+		return err
+	}
+
+	return nil
+}
diff --git a/plugins/memcached/plugin.go b/plugins/memcached/plugin.go
new file mode 100644
index 00000000..59a2b7cb
--- /dev/null
+++ b/plugins/memcached/plugin.go
@@ -0,0 +1,48 @@
+package memcached
+
+import (
+	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/common/kv"
+	"github.com/spiral/roadrunner/v2/plugins/config"
+	"github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+	PluginName     string = "memcached"
+	RootPluginName string = "kv"
+)
+
+type Plugin struct {
+	// config plugin
+	cfgPlugin config.Configurer
+	// logger
+	log logger.Logger
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+	if !cfg.Has(RootPluginName) {
+		return errors.E(errors.Disabled)
+	}
+
+	s.cfgPlugin = cfg
+	s.log = log
+	return nil
+}
+
+// Name returns the plugin's user-friendly name
+func (s *Plugin) Name() string {
+	return PluginName
+}
+
+// Available interface implementation
+func (s *Plugin) Available() {}
+
+func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
+	const op = errors.Op("memcached_plugin_provide")
+	st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	return st, nil
+}
diff --git a/plugins/sqs/config.go b/plugins/sqs/config.go
new file mode 100644
index 00000000..9b2a1ca8
--- /dev/null
+++ b/plugins/sqs/config.go
@@ -0,0 +1,114 @@
+package sqs
+
+import "github.com/aws/aws-sdk-go-v2/aws"
+
+const (
+	attributes string = "attributes"
+	tags       string = "tags"
+	queue      string = "queue"
+	pref       string = "prefetch"
+	visibility string = "visibility_timeout"
+	waitTime   string = "wait_time"
+)
+
+type GlobalCfg struct {
+	Key          string `mapstructure:"key"`
+	Secret       string `mapstructure:"secret"`
+	Region       string `mapstructure:"region"`
+	SessionToken string `mapstructure:"session_token"`
+	Endpoint     string `mapstructure:"endpoint"`
+}
+
+// Config is used to parse the pipeline configuration
+type Config struct {
+	// The duration (in seconds) that the received messages are hidden from subsequent
+	// retrieve requests after being retrieved by a ReceiveMessage request.
+	VisibilityTimeout int32 `mapstructure:"visibility_timeout"`
+	// The duration (in seconds) for which the call waits for a message to arrive
+	// in the queue before returning. If a message is available, the call returns
+	// sooner than WaitTimeSeconds. If no messages are available and the wait time
+	// expires, the call returns successfully with an empty list of messages.
+	WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"`
+	// Prefetch is the maximum number of messages to return. Amazon SQS never returns more messages
+	// than this value (however, fewer messages might be returned). Valid values: 1 to
+	// 10. Default: 1.
+	Prefetch int32 `mapstructure:"prefetch"`
+	// The name of the new queue. The following limits apply to this name:
+	//
+	// * A queue name can have up to 80 characters.
+	//
+	// * Valid values: alphanumeric characters, hyphens (-), and underscores (_).
+	//
+	// * A FIFO queue name must end with the .fifo suffix.
+	//
+	// Queue URLs and names are case-sensitive.
+	//
+	// This member is required.
+	Queue *string `mapstructure:"queue"`
+
+	// A map of attributes with their corresponding values.
The following lists the + // names, descriptions, and values of the special request parameters that the + // CreateQueue action uses. + // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html + Attributes map[string]string `mapstructure:"attributes"` + + // From amazon docs: + // Add cost allocation tags to the specified Amazon SQS queue. For an overview, see + // Tagging Your Amazon SQS Queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html) + // in the Amazon SQS Developer Guide. When you use queue tags, keep the following + // guidelines in mind: + // + // * Adding more than 50 tags to a queue isn't recommended. + // + // * + // Tags don't have any semantic meaning. Amazon SQS interprets tags as character + // strings. + // + // * Tags are case-sensitive. + // + // * A new tag with a key identical to that + // of an existing tag overwrites the existing tag. + // + // For a full list of tag + // restrictions, see Quotas related to queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues) + // in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you + // must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account + // permissions don't apply to this action. For more information, see Grant + // cross-account permissions to a role and a user name + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name) + // in the Amazon SQS Developer Guide. + Tags map[string]string `mapstructure:"tags"` +} + +func (c *GlobalCfg) InitDefault() { + if c.Endpoint == "" { + c.Endpoint = "http://127.0.0.1:9324" + } +} + +func (c *Config) InitDefault() { + if c.Queue == nil { + c.Queue = aws.String("default") + } + + if c.Prefetch == 0 || c.Prefetch > 10 { + c.Prefetch = 10 + } + + if c.WaitTimeSeconds == 0 { + c.WaitTimeSeconds = 5 + } + + if c.Attributes == nil { + c.Attributes = make(map[string]string) + } + + if c.Tags == nil { + c.Tags = make(map[string]string) + } +} diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go new file mode 100644 index 00000000..23203190 --- /dev/null +++ b/plugins/sqs/consumer.go @@ -0,0 +1,411 @@ +package sqs + +import ( + "context" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/retry" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/google/uuid" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type consumer struct { + sync.Mutex + pq priorityqueue.Queue + log logger.Logger + eh events.Handler + pipeline atomic.Value + + // connection info + key string + secret string + sessionToken string + region string + endpoint string + queue *string + messageGroupID string + waitTime int32 + prefetch int32 + visibilityTimeout int32 + + // if user invoke several resume 
operations + listeners uint32 + + // queue optional parameters + attributes map[string]string + tags map[string]string + + client *sqs.Client + queueURL *string + + pauseCh chan struct{} +} + +func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("new_sqs_consumer") + + // if no such key - error + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section")) + } + + // PARSE CONFIGURATION ------- + var pipeCfg Config + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(configKey, &pipeCfg) + if err != nil { + return nil, errors.E(op, err) + } + + pipeCfg.InitDefault() + + err = cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + // initialize job consumer + jb := &consumer{ + pq: pq, + log: log, + eh: e, + messageGroupID: uuid.NewString(), + attributes: pipeCfg.Attributes, + tags: pipeCfg.Tags, + queue: pipeCfg.Queue, + prefetch: pipeCfg.Prefetch, + visibilityTimeout: pipeCfg.VisibilityTimeout, + waitTime: pipeCfg.WaitTimeSeconds, + region: globalCfg.Region, + key: globalCfg.Key, + sessionToken: globalCfg.SessionToken, + secret: globalCfg.Secret, + endpoint: globalCfg.Endpoint, + pauseCh: make(chan struct{}, 1), + } + + // PARSE CONFIGURATION ------- + + awsConf, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(globalCfg.Region), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken))) + if err != nil { + return nil, errors.E(op, err) + } + + // config with retries + jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) { + o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) { + opts.MaxAttempts = 60 + }) + }) + + out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags}) + if err != nil { + return nil, errors.E(op, err) + } + + // assign a queue URL + jb.queueURL = out.QueueUrl + + // To successfully create a new queue, you must provide a + // queue name that adheres to the limits related to queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) + // and is unique within the scope of your queues. After you create a queue, you + // must wait at least one second after the queue is created to be able to use the <------------ + // queue. To get the queue URL, use the GetQueueUrl action. 
GetQueueUrl requires the queue name.
+	time.Sleep(time.Second * 2)
+
+	return jb, nil
+}
+
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+	const op = errors.Op("new_sqs_consumer")
+
+	// if no global section
+	if !cfg.Has(pluginName) {
+		return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
+	}
+
+	// PARSE CONFIGURATION -------
+	var globalCfg GlobalCfg
+
+	err := cfg.UnmarshalKey(pluginName, &globalCfg)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	globalCfg.InitDefault()
+
+	attr := make(map[string]string)
+	err = pipe.Map(attributes, attr)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	tg := make(map[string]string)
+	err = pipe.Map(tags, tg)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	// initialize the job consumer
+	jb := &consumer{
+		pq:                pq,
+		log:               log,
+		eh:                e,
+		messageGroupID:    uuid.NewString(),
+		attributes:        attr,
+		tags:              tg,
+		queue:             aws.String(pipe.String(queue, "default")),
+		prefetch:          int32(pipe.Int(pref, 10)),
+		visibilityTimeout: int32(pipe.Int(visibility, 0)),
+		waitTime:          int32(pipe.Int(waitTime, 0)),
+		region:            globalCfg.Region,
+		key:               globalCfg.Key,
+		sessionToken:      globalCfg.SessionToken,
+		secret:            globalCfg.Secret,
+		endpoint:          globalCfg.Endpoint,
+		pauseCh:           make(chan struct{}, 1),
+	}
+
+	// PARSE CONFIGURATION -------
+
+	awsConf, err := config.LoadDefaultConfig(context.Background(),
+		config.WithRegion(globalCfg.Region),
+		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken)))
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	// config with retries
+	jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) {
+		o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
+			opts.MaxAttempts = 60
+		})
+	})
+
+	out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	// assign a queue URL
+	jb.queueURL = out.QueueUrl
+
+	// To successfully create a new queue, you must provide a
+	// queue name that adheres to the limits related to queues
+	// (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
+	// and is unique within the scope of your queues. After you create a queue, you
+	// must wait at least one second after the queue is created to be able to use the
+	// queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl requires the queue name.
+	time.Sleep(time.Second * 2)
+
+	return jb, nil
+}
+
+func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
+	const op = errors.Op("sqs_push")
+	// check if the pipeline is registered
+
+	// load the atomic value
+	pipe := j.pipeline.Load().(*pipeline.Pipeline)
+	if pipe.Name() != jb.Options.Pipeline {
+		return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
+	}
+
+	// The length of time, in seconds, for which to delay a specific message. Valid
+	// values: 0 to 900. Maximum: 15 minutes.
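+	// For example (illustrative values, not taken from this patch): Delay = 300
+	// postpones delivery by five minutes, while Delay = 901 exceeds the SQS
+	// DelaySeconds limit and is rejected by the check below.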
+ if jb.Options.Delay > 900 { + return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay)) + } + + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } + return nil +} + +func (j *consumer) State(ctx context.Context) (*jobState.State, error) { + const op = errors.Op("sqs_state") + attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + QueueUrl: j.queueURL, + AttributeNames: []types.QueueAttributeName{ + types.QueueAttributeNameApproximateNumberOfMessages, + types.QueueAttributeNameApproximateNumberOfMessagesDelayed, + types.QueueAttributeNameApproximateNumberOfMessagesNotVisible, + }, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + out := &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: *j.queueURL, + Ready: ready(atomic.LoadUint32(&j.listeners)), + } + + nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) + if err == nil { + out.Active = int64(nom) + } + + delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)]) + if err == nil { + out.Delayed = int64(delayed) + } + + nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)]) + if err == nil { + out.Reserved = int64(nv) + } + + return out, nil +} + +func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { + j.pipeline.Store(p) + return nil +} + +func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + const op = errors.Op("sqs_run") + + j.Lock() + defer j.Unlock() + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) + } + + atomic.AddUint32(&j.listeners, 1) + + // start listener + go j.listen(context.Background()) + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + return nil +} + +func (j *consumer) Stop(context.Context) error { + j.pauseCh <- struct{}{} + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil +} + +func (j *consumer) Pause(_ context.Context, p string) { + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + return + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 0 { + j.log.Warn("no active listeners, nothing to pause") + return + } + + atomic.AddUint32(&j.listeners, ^uint32(0)) + + // stop consume + j.pauseCh <- struct{}{} + + j.eh.Push(events.JobEvent{ + Event: events.EventPipePaused, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func (j *consumer) Resume(_ context.Context, p string) { + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + return + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 1 { + j.log.Warn("sqs listener already in the active state") + return + } + + // start listener + go j.listen(context.Background()) + + // 
increase num of listeners + atomic.AddUint32(&j.listeners, 1) + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func (j *consumer) handleItem(ctx context.Context, msg *Item) error { + d, err := msg.pack(j.queueURL) + if err != nil { + return err + } + _, err = j.client.SendMessage(ctx, d) + if err != nil { + return err + } + + return nil +} + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/sqs/item.go b/plugins/sqs/item.go new file mode 100644 index 00000000..996adf6c --- /dev/null +++ b/plugins/sqs/item.go @@ -0,0 +1,247 @@ +package sqs + +import ( + "context" + "strconv" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + json "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" +) + +const ( + StringType string = "String" + NumberType string = "Number" + BinaryType string = "Binary" + ApproximateReceiveCount string = "ApproximateReceiveCount" +) + +var itemAttributes = []string{ + job.RRJob, + job.RRDelay, + job.RRPriority, + job.RRHeaders, +} + +type Item struct { + // Job contains pluginName of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // Private ================ + approxReceiveCount int64 + queue *string + receiptHandler *string + client *sqs.Client + requeueFn func(context.Context, *Item) error +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +func (i *Item) ID() string { + return i.Ident +} + +func (i *Item) Priority() int64 { + return i.Options.Priority +} + +// Body packs job payload into binary payload. +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) +} + +// Context packs job context (job, id) into binary payload. 
+// Not used in the sqs, MessageAttributes used instead +func (i *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (i *Item) Ack() error { + _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: i.Options.queue, + ReceiptHandle: i.Options.receiptHandler, + }) + + if err != nil { + return err + } + + return nil +} + +func (i *Item) Nack() error { + // requeue message + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err + } + + _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: i.Options.queue, + ReceiptHandle: i.Options.receiptHandler, + }) + + if err != nil { + return err + } + + return nil +} + +func (i *Item) Requeue(headers map[string][]string, delay int64) error { + // overwrite the delay + i.Options.Delay = delay + i.Headers = headers + + // requeue message + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err + } + + // Delete job from the queue only after successful requeue + _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: i.Options.queue, + ReceiptHandle: i.Options.receiptHandler, + }) + + if err != nil { + return err + } + + return nil +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Headers: job.Headers, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + }, + } +} + +func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) { + // pack headers map + data, err := json.Marshal(i.Headers) + if err != nil { + return nil, err + } + + return &sqs.SendMessageInput{ + MessageBody: aws.String(i.Payload), + QueueUrl: queue, + DelaySeconds: int32(i.Options.Delay), + MessageAttributes: map[string]types.MessageAttributeValue{ + job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, + job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, + job.RRHeaders: {DataType: aws.String(BinaryType), BinaryValue: data, BinaryListValues: nil, StringListValues: nil, StringValue: nil}, + job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))}, + }, + }, nil +} + +func (j *consumer) unpack(msg *types.Message) (*Item, error) { + const op = errors.Op("sqs_unpack") + // reserved + if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { + return nil, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute")) + } + + for i := 0; i < len(itemAttributes); i++ { + if _, ok := msg.MessageAttributes[itemAttributes[i]]; !ok { + return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", itemAttributes[i])) + } + } + + var h map[string][]string + err := json.Unmarshal(msg.MessageAttributes[job.RRHeaders].BinaryValue, &h) + if err != nil { + return nil, err + } + + delay, err := 
strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue) + if err != nil { + return nil, errors.E(op, err) + } + + priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue) + if err != nil { + return nil, errors.E(op, err) + } + + recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount]) + if err != nil { + return nil, errors.E(op, err) + } + + item := &Item{ + Job: *msg.MessageAttributes[job.RRJob].StringValue, + Payload: *msg.Body, + Headers: h, + Options: &Options{ + Delay: int64(delay), + Priority: int64(priority), + + // private + approxReceiveCount: int64(recCount), + client: j.client, + queue: j.queueURL, + receiptHandler: msg.ReceiptHandle, + requeueFn: j.handleItem, + }, + } + + return item, nil +} diff --git a/plugins/sqs/listener.go b/plugins/sqs/listener.go new file mode 100644 index 00000000..a4280af2 --- /dev/null +++ b/plugins/sqs/listener.go @@ -0,0 +1,87 @@ +package sqs + +import ( + "context" + "time" + + "github.com/aws/aws-sdk-go-v2/aws/transport/http" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/aws/smithy-go" +) + +const ( + // All - get all message attribute names + All string = "All" + + // NonExistentQueue AWS error code + NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue" +) + +func (j *consumer) listen(ctx context.Context) { //nolint:gocognit + for { + select { + case <-j.pauseCh: + j.log.Warn("sqs listener stopped") + return + default: + message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ + QueueUrl: j.queueURL, + MaxNumberOfMessages: j.prefetch, + AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)}, + MessageAttributeNames: []string{All}, + // The new value for the message's visibility timeout (in seconds). Values range: 0 + // to 43200. Maximum: 12 hours. + VisibilityTimeout: j.visibilityTimeout, + WaitTimeSeconds: j.waitTime, + }) + + if err != nil { + if oErr, ok := (err).(*smithy.OperationError); ok { + if rErr, ok := oErr.Err.(*http.ResponseError); ok { + if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok { + // in case of NonExistentQueue - recreate the queue + if apiErr.Code == NonExistentQueue { + j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault()) + _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags}) + if err != nil { + j.log.Error("create queue", "error", err) + } + // To successfully create a new queue, you must provide a + // queue name that adheres to the limits related to the queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) + // and is unique within the scope of your queues. After you create a queue, you + // must wait at least one second after the queue is created to be able to use the <------------ + // queue. To get the queue URL, use the GetQueueUrl action. 
GetQueueUrl requires the queue name.
+								time.Sleep(time.Second * 2)
+								continue
+							}
+						}
+					}
+				}
+
+				j.log.Error("receive message", "error", err)
+				continue
+			}
+
+			for i := 0; i < len(message.Messages); i++ {
+				m := message.Messages[i]
+				item, err := j.unpack(&m)
+				if err != nil {
+					_, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+						QueueUrl:      j.queueURL,
+						ReceiptHandle: m.ReceiptHandle,
+					})
+					if errD != nil {
+						// report the delete error itself, not the unpack error
+						j.log.Error("message unpack, failed to delete the message from the queue", "error", errD)
+					}
+
+					j.log.Error("message unpack", "error", err)
+					continue
+				}
+
+				j.pq.Insert(item)
+			}
+		}
+	}
+}
diff --git a/plugins/sqs/plugin.go b/plugins/sqs/plugin.go
new file mode 100644
index 00000000..54f61ff5
--- /dev/null
+++ b/plugins/sqs/plugin.go
@@ -0,0 +1,39 @@
+package sqs
+
+import (
+	"github.com/spiral/roadrunner/v2/common/jobs"
+	"github.com/spiral/roadrunner/v2/pkg/events"
+	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+	"github.com/spiral/roadrunner/v2/plugins/config"
+	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+	"github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+	pluginName string = "sqs"
+)
+
+type Plugin struct {
+	log logger.Logger
+	cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+	p.log = log
+	p.cfg = cfg
+	return nil
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Name() string {
+	return pluginName
+}
+
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+	return NewSQSConsumer(configKey, p.log, p.cfg, e, pq)
+}
+
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+	return FromPipeline(pipe, p.log, p.cfg, e, pq)
+}
diff --git a/tests/allocate-failed.php b/tests/allocate-failed.php
new file mode 100644
index 00000000..8514ecc0
--- /dev/null
+++ b/tests/allocate-failed.php
@@ -0,0 +1,18 @@
+waitPayload()){
+    $rr->respond(new \Spiral\RoadRunner\Payload(""));
+}
diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go
index 48d6515d..949698ec 100644
--- a/tests/plugins/jobs/jobs_amqp_test.go
+++ b/tests/plugins/jobs/jobs_amqp_test.go
@@ -14,10 +14,10 @@ import (
 	endure "github.com/spiral/endure/pkg/container"
 	goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
 	jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+	"github.com/spiral/roadrunner/v2/plugins/amqp"
 	"github.com/spiral/roadrunner/v2/plugins/config"
 	"github.com/spiral/roadrunner/v2/plugins/informer"
 	"github.com/spiral/roadrunner/v2/plugins/jobs"
-	"github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp"
 	"github.com/spiral/roadrunner/v2/plugins/logger"
 	"github.com/spiral/roadrunner/v2/plugins/resetter"
 	rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go
index 8e74c7cc..9f4d37ec 100644
--- a/tests/plugins/jobs/jobs_beanstalk_test.go
+++ b/tests/plugins/jobs/jobs_beanstalk_test.go
@@ -14,10 +14,10 @@ import (
 	endure "github.com/spiral/endure/pkg/container"
 	goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
 	jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+	"github.com/spiral/roadrunner/v2/plugins/beanstalk"
 	"github.com/spiral/roadrunner/v2/plugins/config"
 	"github.com/spiral/roadrunner/v2/plugins/informer"
 	"github.com/spiral/roadrunner/v2/plugins/jobs"
-	"github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk"
"github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index 98590a96..2890aa9d 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -15,9 +15,9 @@ import ( goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index f0b5697b..91855ee9 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -12,11 +12,11 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index 630a059a..95abe9dc 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -17,11 +17,11 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/sqs" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go index f6521e8d..80fed8eb 100644 --- a/tests/plugins/jobs/jobs_with_toxics_test.go +++ b/tests/plugins/jobs/jobs_with_toxics_test.go @@ -11,15 +11,15 @@ import ( toxiproxy "github.com/Shopify/toxiproxy/client" "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/amqp" + "github.com/spiral/roadrunner/v2/plugins/beanstalk" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin 
"github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/sqs" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index a09a456c..e757a9e6 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -15,8 +15,8 @@ import ( "github.com/spiral/roadrunner/v2/plugins/boltdb" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memcached" "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" -- cgit v1.2.3