diff options
author | Valery Piashchynski <[email protected]> | 2021-06-22 11:44:22 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-22 11:44:22 +0300 |
commit | 1a2a1f4735e40675abf6cd9767c99374359ec2bb (patch) | |
tree | 5abedf7306b50b02ba3892c0bc562307a62eb332 /plugins | |
parent | 260d69c21fba6d763d05dc5693689ddf7ce7bfe2 (diff) |
- Remove all old code, reformat, fix linters, return GA
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
66 files changed, 36 insertions, 10032 deletions
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 5c5ad400..4606ccba 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -8,8 +8,8 @@ import ( // Config defines settings for job broker, workers and job-pipeline mapping. type Config struct { // Workers configures roadrunner server and worker busy. - //Workers *roadrunner.ServerConfig - pool poolImpl.Config + // Workers *roadrunner.ServerConfig + poolCfg poolImpl.Config // Dispatch defines where and how to match jobs. Dispatch map[string]*Options @@ -25,6 +25,16 @@ type Config struct { route Dispatcher } +func (c *Config) InitDefaults() error { + const op = errors.Op("config_init_defaults") + var err error + c.pipelines, err = initPipelines(c.Pipelines) + if err != nil { + return errors.E(op, err) + } + return nil +} + // MatchPipeline locates the pipeline associated with the job. func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error) { const op = errors.Op("config_match_pipeline") diff --git a/plugins/jobs/dispatcher.go b/plugins/jobs/dispatcher.go index 9fde8fac..8faf4db5 100644 --- a/plugins/jobs/dispatcher.go +++ b/plugins/jobs/dispatcher.go @@ -18,7 +18,7 @@ func initDispatcher(routes map[string]*Options) Dispatcher { pattern = strings.Trim(pattern, "-.*") for _, s := range separators { - pattern = strings.Replace(pattern, s, ".", -1) + pattern = strings.ReplaceAll(pattern, s, ".") } dispatcher[pattern] = opts diff --git a/plugins/jobs/dispatcher_test.go b/plugins/jobs/dispatcher_test.go index 59e3fd4e..9917642f 100644 --- a/plugins/jobs/dispatcher_test.go +++ b/plugins/jobs/dispatcher_test.go @@ -1,8 +1,9 @@ package jobs import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func Test_Map_All(t *testing.T) { diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio index ee923d29..a1c1532c 100644 --- a/plugins/jobs/doc/jobs_arch.drawio +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-06-21T13:57:33.772Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="eDaFKmf6xAQVYDDexY3m" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvrc5s4EP9rPONmJh7e4I9x7CTttVc3uZu2900GYWgBcUL4kb/+JBBPYcdNsJ32mkxra/VA+/jtrlZkoF6Hm1sMYu8DcmAwUCRnM1CnA0WRVVMz6ScjbXOSJUtSTlli3+HDKsKD/wg5sRiW+g5MGgMJQgHx4ybRRlEEbdKgAYzRujnMRUHzqTFYQoHwYINApH72HeIVu5MKPljPHfSXHhG6QlCM54TEAw5a10jqbKBeY4RI/i3cXMOASbAQTT7vZkdvuTcMI3LIhC9RtLmTUQhXjyQybz/BfyT7UtHyZVYgSDnTfLdkW0gBozRyIFtFGqiTtecT+BADm/WuqeYpzSNhQFsy/er6QXCNAoSzuaoDoOXalJ4QjL7DWo9hW3Dh0h6RD87aCmICNzUS5+sWUiYI3tIhvFe1uIy5oY3NwoLWldqUYpBXV1ihHcBNZVkuXomSfuHS/BHJCnKEDjUu3kSYeGiJIhDMKuqkKelqzHuEYi7fb5CQLUcKSAlqSp+KEG+/sPkjvWh+rfdNN3zxvLXlrZ06SFCKbbiPTc4nAXgJyZ6BfEEmhL0qxTAAxF81EdilHT51jny659IUNE1vmoJutjSc75RPq+NFWKllVPq4tVIuHGGlK4zBtjYsZgMSwaBKnp9vY+pZbGzjk5qJ0dbXwqbo98rAWGNbt7aT2qV8oFlqpzLLljGpxmFm2ZepFD618vP3c7qSdEn/zf9+uKOL5h90Q9vI7jcI6NBytK4gYCkL1TB6CgJmS8SaIgYBWesIAuqxYsD4rPiso7PE6o/hUzoaPrVDw4Z0GoAqbespc4pTAfS3M3+5sRgn8ubqeGSNDV2XTUWjB51x03SUdlJ5ZNMRc/gPgNge8+xv57P3b/+c9erOoUwdutnlzseGqYK+3Hk7p1e73LnS4c6PltKb50So3HDn5mvz5/qBEO0doS/SqPXT+Vy5oc5Ku2dTaO/nuhcpVBdc4UAxArr/SdzQs/Fvyuouk5Cy50cD9Yr2SvGG/p8JTcrpl4SplPVptT7qwMglCPwln2dT2ULc7HagjTDlF/ExzGRw4EewejT9tuSf2QYXBeEdWlCxXD0QnNokxbAYQAWyaE+itLhN83Cb8izeA+iSvNNine1t011mM6bZ2tiPliUj5fPnYBsg4Dw57mPMJJWU48p2F5MtwLKI4tsguMoVMs1UNuHqmeZMTBAd5QZZ9Y0FMxbTUEQ4QmWFt29A6AcMOXcwWEG26t446Orst7O2lf3wVWv0/Kef+GjI7fKE1BEfjY742C5j9AY+4xcAX2F6rx5/cz+GOU9PgGtKHe/2yVFXhMAwJhUGs1x1N5PHYOmeWWFzu7VdLASxnmpff/khRCl5zqZ+O6us1xJqqYc6K1k+lreSZUE9Z0vnD87mlVOVT6UD07/e66cv06kkhKB7ilw4yg7Ew29o8abXo7Br2dDuvN5aWLqm79XB4eiR9fZReHzgUfholU1ZEQSNIQ2WUT17awmaxeamNAvHx4O4Omn7x9B3nBx3MPEfwSJbikmV36nQdfXJQJ+ytSjUEu4jBYVEKIL96EIRyhKWqIvxKasSslj+iWluMFBYeR/9yrrQ2rhQDFEXXbe+x9OFeP6s5WmXNC8AIfMoPDug+UdMhcZzMn3qM9G7zPeYk4E5/b9orQNB8mkhdNbK3uuuvRc19acLQVbfqcCziu9yO+Yd+95GPPWGkD6RibuC/i4//BPW4eXzJx/nvVd93XC1DoRrobEzw1U68V1ZIZ8aXG8h2+6EYYgVklyMQiZAD2bML/2ERlbIipiLbAgrkax8wPE9yucNez5XuK7Sfa5wjIWhHwvaUlck7oJ2++WV/t6aEw9w8zTxhlVGm5/hJIpO+via3nqU/nleWrTGh0pfO5r0xbeEdoj318hEjbb9a6IG9FMmoop4rBZEj1LCkorr8p1nSazd0d8b9tjJEgPHh1UfF17L4MvhnZJ2QOKVSCq0+h4sYDCnWsrL7NMFIoT6TVHtzRJnaR119HVUN4tYzh4Jkjhn1PU3bB+5wUA8W8HcbjIb8UDMJoSbJXsbfQTWiTZKk+xZfZiKJDdNxVRFUzEt0VQKWv+mooqe8m5OCZ8/3v8xuxfMphBQjJENk+Rpp7gA9vdl5kY/5ha3I1SdpASmqy2ompo10g90lz1UGx8N11mr1o32Pbz6eptu5ub47lLUwOnuuzousvLSQiC2qrJCq/7Qau240SgvvajLSdIwS5KGeS705kx3Y52iEi5z9skxGyVI8YEAMrwoSjYszxhe0IQjGbGOPP+AGKMG3/1cQR2Loyx7uqiKUIyFi3d5DjUsLgOf4muHK8GZW677DPHqqsvJ7AB/h4vYUxIftxxC+QJnzR2UrwQ33MEz0ifarP5QJz+KVH/zpM7+Aw==</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-06-22T07:34:31.801Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="W3ry_giS-Ii3LUJqzQzg" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvpc9o4FP9rmKGZCePb5mMIOdpttzTZnbb7TdgCu7UtVxZX/vqVbPmUIDQxkLQlB+hJlvXe+71DT6anX0brGwwS/wPyYNjTFG/d08c9TdMUS6NvjLLJKaqmmjlljgOP0yrCffAAOVHh1EXgwbQxkCAUkiBpEl0Ux9AlDRrAGK2aw2YobN41AXMoEO5dEIrUz4FH/GJ19FX13MJg7hOhKwLFeE5IfeChVY2kX/X0S4wQyT9F60sYMgEWosmvu97SW64Nw5jsc8GXOF7fqiiCywcS2zef4H+Ke64Z+TRLEC4403y1ZFNIAaNF7EE2i9LTRys/IPA+AS7rXVHFU5pPopC2VPpxFoThJQoRzq7VPQCdmUvpKcHoO6z1WK4DpzPaI/LBWVtCTOC6RuJ83UDKBMEbOoT36g6XMQfa0C4QtKrUphWD/LrCCu0ADpV5OXklSvqBS/NnJCvIEXoUXLyJMPHRHMUgvKqoo6akqzHvEUq4fL9BQjbcUsCCoKb0qQjx5gu7fmAWza/1vvGaT563Nry1VQcpWmAX7mKT80kAnkOyYyCfkAlhp0oxDAEJlk0LlGmHXzpBAV1zCQXDMJtQMO2WhvOV8svq9iLM1AKVOWzNlAtHmOkCY7CpDUvYgFQAVMnz0zGmnwRj64DUIEZbXwtM0c8VwFhjU0fbUXGp7glL41iwbIFJt/aDZVdQKXxq5efvJnQm5Zz+Tf69v6WT5m90QZvY7TYImNDxDFkQcLSpblkdBQG7JWJDE4OAakiCgH6oGDA8qX3WrbO01Z+zT+Vg9mnsGzaU4xio1kZPmVMcy0D/OPPng8U6kjfXhwNnaJmmamuGqtvDJnS0dlJ5YOiIOfwHQFyfefa3k6v3b/++6tSdQ5U6dFvmzoeWrYOu3Hk7p9dl7lyTuPODpfT2KS1Ubbhz+6X5c3NPE+3cQp+lUefV+Vy1oc5KuydTaOf7umcp1BRcYU+zQrr+UdLQs/Vjweouo4iyF8Q9/YL2Ksma/s+EpuT0c8JUyvqMWh91YOQchMGcX+dS2ULc7PagizDlF/ExDDI4DGJY3Zp+mvP3bIHTgvAOTalYLu4JXrhkgWExgApk2r6I0pI2zcdtypN4D+GM5J0O62wvm64yu2KczY2DeF4yUt5/AjYhAt6j4z4mTFJpOa5sy5hsGSyLKIELwotcIeNMZSOunnHOxAjRUbMwq76xYMZiGooJt1BV4+1rEAUhs5xbGC4hm3VnHJyZ7Eda28pefNYaPX91Ex8ttV2eUCTx0ZLEx3YZozPjs34B4yug9+LtbxIkMOfpEeMaU8e7eXTUBSEwSkhlg1muup3JQ7B0x1DYXG5tFVNBrMda1z9BBNGCPGVRf5xV1usItdR9nZWqHspbqaqgnpOl83tn89qxyqfKnulf5/XT5+lUEULQHbVcOMg2xP1vaPqm063wzHGhKz3emjqmYe7Uwf7Wo5rtrfBwz63wwSqbqiYIGkMaLON69tYSNIvNTWkWjo8HcX3U9o9R4Hm53cE0eADTbComVX6mQuc1Rz1zzOaippZyHykoJEYx7EYXmlCWcERdDI9ZlVDF8k9Cc4Oexsr76FfWhdG2C80SdSE79T2cLsT9Zy1PO6d5AYiYR+HZAc0/Eio0npOZ44CJfsZ8jz3q2ePfRWsSC1KPa0Inrey97Np7UVN/vBDkdJ0KPKn4rrZj3qHPbcRdbwTpHZm4K9Pf5odfYR1ePX3ycdpz1Zdtrs6e5lpo7MTmqhz5rKyQT81cbyBb7ojZECskzTCKmAB9mDE/D1IaWSErYk6zIaxEsgwAt+9Bfl2/433FbKbJ9xWeNbXMQ5m2IovEMtNuP7zS3VNz4gZuskj9fpXR5ns4hVonvX1Nbx1K/zQPLTrDfaVvHEz64lNCW8T7a2SiVhv/hqgB85iJqCZuqwXRowVhScVl+cyzItbu6M81u+1ojoEXwKqPC68F+HK4VNIeSP3SkgqtvgdTGE6olvIy+3iKCKF+U1R7s8RZoqNufZLqZhHL2S1BmuSMzoI1W0cOGIivljDHTYYRHyTsgmg9Zw+jD8AqNQaLNLtXF1BR1CZUbF2Eiu2IUClo3UNFFz3l7YQSPn+8++vqToBNIaAEIxem6eNOcQrc7/PMjX7MEbclVB2lBGbqLVO1DWdg7ukuO6g2Plgzb6U718b36OLrzWI9sYe356IGjnfeJTnIyksLodiqygqt+kOrteVEozz0oi4nXURZktTPc6E3Jzobk4pKOMzZJcdslCDFewJI/6wo2bA8o39GE450wDry/ANijBp8d3MEdSiOsuzprCpCMRbO3uU5VL84DHyFfHEsNpT1SpZ+x/cUTaD19Gv+SxVDvWhIYx7baqQoXMJ0C3dbfDzO4mXdmYtnijLvv8UrS3z39rOK0gXvOqwon9Vu+Oku8lqpo/5N6gWtrfyjBQSprCTPfUnHOXIYdH7Ot2uR9dwn+/LCczaBXYDfFr7mIdvTKRLwd1Es266nhqh+DFhIY7mED+geInwBYmtnd9oBa4y0WX3pMi8rVV9f1a/+Bw==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go index 060326c8..d013d320 100644 --- a/plugins/jobs/interface.go +++ b/plugins/jobs/interface.go @@ -1,8 +1,9 @@ package jobs - // todo naming type Consumer interface { Push() Stat() + Consume(*Pipeline) + Register(*Pipeline) } diff --git a/plugins/jobs/job.go b/plugins/jobs/job.go index 8458b25b..79bb8ad8 100644 --- a/plugins/jobs/job.go +++ b/plugins/jobs/job.go @@ -5,12 +5,6 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -//// Handler handles job execution. -//type Handler func(id string, j *Job) error -// -//// ErrorHandler handles job execution errors. -//type ErrorHandler func(id string, j *Job, err error) - // Job carries information about single job. type Job struct { // Job contains name of job broker (usually PHP class). diff --git a/plugins/jobs/job_options_test.go b/plugins/jobs/job_options_test.go index 8caaa935..d226fa1e 100644 --- a/plugins/jobs/job_options_test.go +++ b/plugins/jobs/job_options_test.go @@ -1,9 +1,10 @@ package jobs import ( - "github.com/stretchr/testify/assert" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestOptions_CanRetry(t *testing.T) { diff --git a/plugins/jobs/job_test.go b/plugins/jobs/job_test.go index e1938eca..1f4bf918 100644 --- a/plugins/jobs/job_test.go +++ b/plugins/jobs/job_test.go @@ -1,8 +1,9 @@ package jobs import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestJob_Body(t *testing.T) { diff --git a/plugins/jobs/oooold/broker.go b/plugins/jobs/oooold/broker.go deleted file mode 100644 index d49616e7..00000000 --- a/plugins/jobs/oooold/broker.go +++ /dev/null @@ -1,47 +0,0 @@ -package oooold - -// Broker manages set of pipelines and provides ability to push jobs into them. -type Broker interface { - // Register broker pipeline. - Register(pipe *Pipeline) error - - // Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before - // the service is started! - Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error - - // Push job into the worker. - Push(pipe *Pipeline, j *Job) (string, error) - - // Stat must fetch statistics about given pipeline or return error. - Stat(pipe *Pipeline) (stat *Stat, err error) -} - -// EventProvider defines the ability to throw events for the broker. -type EventProvider interface { - // Listen attaches the even listener. - Listen(lsn func(event int, ctx interface{})) -} - -// Stat contains information about pipeline. -type Stat struct { - // Pipeline name. - Pipeline string - - // Broken is name of associated broker. - Broker string - - // InternalName defines internal broker specific pipeline name. - InternalName string - - // Consuming indicates that pipeline is pipelines jobs. - Consuming bool - - // testQueue defines number of pending jobs. - Queue int64 - - // Active defines number of jobs which are currently being processed. - Active int64 - - // Delayed defines number of jobs which are being processed. - Delayed int64 -} diff --git a/plugins/jobs/oooold/broker/amqp/broker.go b/plugins/jobs/oooold/broker/amqp/broker.go deleted file mode 100644 index b47d83ee..00000000 --- a/plugins/jobs/oooold/broker/amqp/broker.go +++ /dev/null @@ -1,216 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/gofrs/uuid" - "github.com/spiral/jobs/v2" - "sync" - "sync/atomic" -) - -// Broker represents AMQP broker. -type Broker struct { - cfg *Config - lsn func(event int, ctx interface{}) - publish *chanPool - consume *chanPool - mu sync.Mutex - wait chan error - stopped chan interface{} - queues map[*jobs.Pipeline]*queue -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures AMQP job broker (always 2 connections). -func (b *Broker) Init(cfg *Config) (ok bool, err error) { - b.cfg = cfg - b.queues = make(map[*jobs.Pipeline]*queue) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.queues[pipe]; ok { - return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) - } - - q, err := newQueue(pipe, b.throw) - if err != nil { - return err - } - - b.queues[pipe] = q - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() (err error) { - b.mu.Lock() - - if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { - b.mu.Unlock() - return err - } - defer b.publish.Close() - - if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { - b.mu.Unlock() - return err - } - defer b.consume.Close() - - for _, q := range b.queues { - err := q.declare(b.publish, q.name, q.key, nil) - if err != nil { - b.mu.Unlock() - return err - } - } - - for _, q := range b.queues { - qq := q - if qq.execPool != nil { - go qq.serve(b.publish, b.consume) - } - } - - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - for _, q := range b.queues { - q.stop() - } - - close(b.wait) - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - q.stop() - - q.execPool = execPool - q.errHandler = errHandler - - if b.publish != nil && q.execPool != nil { - if q.execPool != nil { - go q.serve(b.publish, b.consume) - } - } - - return nil -} - -// Push job into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - id, err := uuid.NewV4() - if err != nil { - return "", err - } - - q := b.queue(pipe) - if q == nil { - return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil { - return "", err - } - - return id.String(), nil -} - -// Stat must fetch statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - q := b.queue(pipe) - if q == nil { - return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - queue, err := q.inspect(b.publish) - if err != nil { - return nil, err - } - - // this the closest approximation we can get for now - return &jobs.Stat{ - InternalName: queue.Name, - Queue: int64(queue.Messages), - Active: int64(atomic.LoadInt32(&q.running)), - }, nil -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) queue(pipe *jobs.Pipeline) *queue { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return nil - } - - return q -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} diff --git a/plugins/jobs/oooold/broker/amqp/broker_test.go b/plugins/jobs/oooold/broker/amqp/broker_test.go deleted file mode 100644 index 66078099..00000000 --- a/plugins/jobs/oooold/broker/amqp/broker_test.go +++ /dev/null @@ -1,419 +0,0 @@ -package amqp - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var ( - pipe = &jobs.Pipeline{ - "broker": "amqp", - "name": "default", - "queue": "rr-queue", - "exchange": "rr-exchange", - "prefetch": 1, - } - - cfg = &Config{ - Addr: "amqp://guest:guest@localhost:5672/", - } -) - -var ( - fanoutPipe = &jobs.Pipeline{ - "broker": "amqp", - "name": "fanout", - "queue": "fanout-queue", - "exchange": "fanout-exchange", - "exchange-type": "fanout", - "prefetch": 1, - } - - fanoutCfg = &Config{ - Addr: "amqp://guest:guest@localhost:5672/", - } -) - -func TestBroker_Init(t *testing.T) { - b := &Broker{} - ok, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.True(t, ok) - assert.NoError(t, err) -} - -func TestBroker_StopNotStarted(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - b.Stop() -} - -func TestBroker_Register(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) -} - -func TestBroker_Register_Twice(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) - assert.Error(t, b.Register(pipe)) -} - -func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_Undefined(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - assert.NoError(t, b.Consume(pipe, exec, errf)) -} - -func TestBroker_Consume_BadPipeline(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.Error(t, b.Register(&jobs.Pipeline{ - "broker": "amqp", - "name": "default", - "exchange": "rr-exchange", - "prefetch": 1, - })) -} - -func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - err = b.Consume(pipe, nil, nil) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_CantStart(t *testing.T) { - b := &Broker{} - _, err := b.Init(&Config{ - Addr: "amqp://guest:guest@localhost:15672/", - }) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Serve()) -} - -func TestBroker_Consume_Serve_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - err = b.Consume(pipe, exec, errf) - if err != nil { - t.Fatal() - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_PushToNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_PushToNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_Queue_RoutingKey(t *testing.T) { - pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") - - assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key") -} - -func TestBroker_Register_With_RoutingKey(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") - - assert.NoError(t, b.Register(&pipeWithKey)) -} - -func TestBroker_Consume_With_RoutingKey(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") - - err = b.Register(&pipeWithKey) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(&pipeWithKey, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Queue_ExchangeType(t *testing.T) { - pipeWithKey := pipe.With("exchange-type", "direct") - - assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct") -} - -func TestBroker_Register_With_ExchangeType(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := pipe.With("exchange-type", "fanout") - - assert.NoError(t, b.Register(&pipeWithKey)) -} - -func TestBroker_Register_With_WrongExchangeType(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := pipe.With("exchange-type", "xxx") - - assert.Error(t, b.Register(&pipeWithKey)) -} - -func TestBroker_Consume_With_ExchangeType(t *testing.T) { - b := &Broker{} - _, err := b.Init(fanoutCfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := fanoutPipe.With("exchange-type", "fanout") - - err = b.Register(&pipeWithKey) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(&pipeWithKey, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} diff --git a/plugins/jobs/oooold/broker/amqp/conn.go b/plugins/jobs/oooold/broker/amqp/conn.go deleted file mode 100644 index be747776..00000000 --- a/plugins/jobs/oooold/broker/amqp/conn.go +++ /dev/null @@ -1,232 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/cenkalti/backoff/v4" - "github.com/streadway/amqp" - "sync" - "time" -) - -// manages set of AMQP channels -type chanPool struct { - // timeout to backoff redial - tout time.Duration - url string - - mu *sync.Mutex - - conn *amqp.Connection - channels map[string]*channel - wait chan interface{} - connected chan interface{} -} - -// manages single channel -type channel struct { - ch *amqp.Channel - // todo unused - //consumer string - confirm chan amqp.Confirmation - signal chan error -} - -// newConn creates new watched AMQP connection -func newConn(url string, tout time.Duration) (*chanPool, error) { - conn, err := dial(url) - if err != nil { - return nil, err - } - - cp := &chanPool{ - url: url, - tout: tout, - conn: conn, - mu: &sync.Mutex{}, - channels: make(map[string]*channel), - wait: make(chan interface{}), - connected: make(chan interface{}), - } - - close(cp.connected) - go cp.watch() - return cp, nil -} - -// dial dials to AMQP. -func dial(url string) (*amqp.Connection, error) { - return amqp.Dial(url) -} - -// Close gracefully closes all underlying channels and connection. -func (cp *chanPool) Close() error { - cp.mu.Lock() - - close(cp.wait) - if cp.channels == nil { - return fmt.Errorf("connection is dead") - } - - // close all channels and consume - var wg sync.WaitGroup - for _, ch := range cp.channels { - wg.Add(1) - - go func(ch *channel) { - defer wg.Done() - cp.closeChan(ch, nil) - }(ch) - } - cp.mu.Unlock() - - wg.Wait() - - cp.mu.Lock() - defer cp.mu.Unlock() - - if cp.conn != nil { - return cp.conn.Close() - } - - return nil -} - -// waitConnected waits till connection is connected again or eventually closed. -// must only be invoked after connection error has been delivered to channel.signal. -func (cp *chanPool) waitConnected() chan interface{} { - cp.mu.Lock() - defer cp.mu.Unlock() - - return cp.connected -} - -// watch manages connection state and reconnects if needed -func (cp *chanPool) watch() { - for { - select { - case <-cp.wait: - // connection has been closed - return - // here we are waiting for the errors from amqp connection - case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)): - cp.mu.Lock() - // clear connected, since connections are dead - cp.connected = make(chan interface{}) - - // broadcast error to all consume to let them for the tryReconnect - for _, ch := range cp.channels { - ch.signal <- err - } - - // disable channel allocation while server is dead - cp.conn = nil - cp.channels = nil - - // initialize the backoff - expb := backoff.NewExponentialBackOff() - expb.MaxInterval = cp.tout - cp.mu.Unlock() - - // reconnect function - reconnect := func() error { - cp.mu.Lock() - conn, err := dial(cp.url) - if err != nil { - // still failing - fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error())) - cp.mu.Unlock() - return err - } - - // TODO ADD LOGGING - fmt.Println("------amqp successfully redialed------") - - // here we are reconnected - // replace the connection - cp.conn = conn - // re-init the channels - cp.channels = make(map[string]*channel) - cp.mu.Unlock() - return nil - } - - // start backoff retry - errb := backoff.Retry(reconnect, expb) - if errb != nil { - fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error())) - // reconnection failed - close(cp.connected) - return - } - close(cp.connected) - } - } -} - -// channel allocates new channel on amqp connection -func (cp *chanPool) channel(name string) (*channel, error) { - cp.mu.Lock() - dead := cp.conn == nil - cp.mu.Unlock() - - if dead { - // wait for connection restoration (doubled the timeout duration) - select { - case <-time.NewTimer(cp.tout * 2).C: - return nil, fmt.Errorf("connection is dead") - case <-cp.connected: - // connected - } - } - - cp.mu.Lock() - defer cp.mu.Unlock() - - if cp.conn == nil { - return nil, fmt.Errorf("connection has been closed") - } - - if ch, ok := cp.channels[name]; ok { - return ch, nil - } - - // we must create new channel - ch, err := cp.conn.Channel() - if err != nil { - return nil, err - } - - // Enable publish confirmations - if err = ch.Confirm(false); err != nil { - return nil, fmt.Errorf("unable to enable confirmation mode on channel: %s", err) - } - - // we expect that every allocated channel would have listener on signal - // this is not true only in case of pure producing channels - cp.channels[name] = &channel{ - ch: ch, - confirm: ch.NotifyPublish(make(chan amqp.Confirmation, 1)), - signal: make(chan error, 1), - } - - return cp.channels[name], nil -} - -// closeChan gracefully closes and removes channel allocation. -func (cp *chanPool) closeChan(c *channel, err error) error { - cp.mu.Lock() - defer cp.mu.Unlock() - - go func() { - c.signal <- nil - c.ch.Close() - }() - - for name, ch := range cp.channels { - if ch == c { - delete(cp.channels, name) - } - } - - return err -} diff --git a/plugins/jobs/oooold/broker/amqp/consume_test.go b/plugins/jobs/oooold/broker/amqp/consume_test.go deleted file mode 100644 index 28999c36..00000000 --- a/plugins/jobs/oooold/broker/amqp/consume_test.go +++ /dev/null @@ -1,258 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Consume_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_ConsumeAfterStart_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_Delayed(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - start := time.Now() - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Delay: 1}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob - - elapsed := time.Since(start) - assert.True(t, elapsed >= time.Second) - assert.True(t, elapsed < 3*time.Second) -} - -func TestBroker_Consume_Errored(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - close(errHandled) - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return fmt.Errorf("job failed") - } - - <-waitJob - <-errHandled -} - -func TestBroker_Consume_Errored_Attempts(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - attempts := 0 - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - attempts++ - errHandled <- nil - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Attempts: 3}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - return fmt.Errorf("job failed") - } - - <-errHandled - <-errHandled - <-errHandled - assert.Equal(t, 3, attempts) -} diff --git a/plugins/jobs/oooold/broker/amqp/durability_test.go b/plugins/jobs/oooold/broker/amqp/durability_test.go deleted file mode 100644 index 00d62c51..00000000 --- a/plugins/jobs/oooold/broker/amqp/durability_test.go +++ /dev/null @@ -1,728 +0,0 @@ -package amqp - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "io" - "net" - "sync" - "testing" - "time" -) - -var ( - proxyCfg = &Config{ - Addr: "amqp://guest:guest@localhost:5673/", - Timeout: 1, - } - - proxy = &tcpProxy{ - listen: "localhost:5673", - upstream: "localhost:5672", - accept: true, - } -) - -type tcpProxy struct { - listen string - upstream string - mu sync.Mutex - accept bool - conn []net.Conn -} - -func (p *tcpProxy) serve() { - l, err := net.Listen("tcp", p.listen) - if err != nil { - panic(err) - } - - for { - in, err := l.Accept() - if err != nil { - panic(err) - } - - if !p.accepting() { - in.Close() - } - - up, err := net.Dial("tcp", p.upstream) - if err != nil { - panic(err) - } - - go io.Copy(in, up) - go io.Copy(up, in) - - p.mu.Lock() - p.conn = append(p.conn, in, up) - p.mu.Unlock() - } -} - -// wait for specific number of connections -func (p *tcpProxy) waitConn(count int) *tcpProxy { - p.mu.Lock() - p.accept = true - p.mu.Unlock() - - for { - p.mu.Lock() - current := len(p.conn) - p.mu.Unlock() - - if current >= count*2 { - break - } - - time.Sleep(time.Millisecond) - } - - return p -} - -func (p *tcpProxy) reset(accept bool) int { - p.mu.Lock() - p.accept = accept - defer p.mu.Unlock() - - count := 0 - for _, conn := range p.conn { - conn.Close() - count++ - } - - p.conn = nil - return count / 2 -} - -func (p *tcpProxy) accepting() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.accept -} - -func init() { - go proxy.serve() -} - -func TestBroker_Durability_Base(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - // expect 2 connections - proxy.waitConn(2) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Durability_Consume(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - time.Sleep(3 * time.Second) - proxy.waitConn(1) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NotEqual(t, "0", jid) - - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume2(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - if perr != nil { - panic(perr) - } - - proxy.reset(true) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume2_2(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // start when connection is dead - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - if perr != nil { - panic(perr) - } - - proxy.reset(false) - - _, serr := b.Stat(pipe) - assert.Error(t, serr) - - proxy.reset(true) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume3(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(2) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - if perr != nil { - panic(perr) - } - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume4(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(2) - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "kill", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - - if j.Payload == "kill" && len(done) == 0 { - proxy.reset(true) - } - - mu.Lock() - defer mu.Unlock() - done[id] = true - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 3 { - break - } - } -} - -func TestBroker_Durability_StopDead(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - - <-ready - - proxy.waitConn(2).reset(false) - - b.Stop() -} diff --git a/plugins/jobs/oooold/broker/amqp/job.go b/plugins/jobs/oooold/broker/amqp/job.go deleted file mode 100644 index bd559715..00000000 --- a/plugins/jobs/oooold/broker/amqp/job.go +++ /dev/null @@ -1,56 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/streadway/amqp" -) - -// pack job metadata into headers -func pack(id string, attempt int, j *jobs.Job) amqp.Table { - return amqp.Table{ - "rr-id": id, - "rr-job": j.Job, - "rr-attempt": int64(attempt), - "rr-maxAttempts": int64(j.Options.Attempts), - "rr-timeout": int64(j.Options.Timeout), - "rr-delay": int64(j.Options.Delay), - "rr-retryDelay": int64(j.Options.RetryDelay), - } -} - -// unpack restores jobs.Options -func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) { - j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}} - - if _, ok := d.Headers["rr-id"].(string); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id") - } - - if _, ok := d.Headers["rr-attempt"].(int64); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt") - } - - if _, ok := d.Headers["rr-job"].(string); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job") - } - j.Job = d.Headers["rr-job"].(string) - - if _, ok := d.Headers["rr-maxAttempts"].(int64); ok { - j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64)) - } - - if _, ok := d.Headers["rr-timeout"].(int64); ok { - j.Options.Timeout = int(d.Headers["rr-timeout"].(int64)) - } - - if _, ok := d.Headers["rr-delay"].(int64); ok { - j.Options.Delay = int(d.Headers["rr-delay"].(int64)) - } - - if _, ok := d.Headers["rr-retryDelay"].(int64); ok { - j.Options.RetryDelay = int(d.Headers["rr-retryDelay"].(int64)) - } - - return d.Headers["rr-id"].(string), int(d.Headers["rr-attempt"].(int64)), j, nil -} diff --git a/plugins/jobs/oooold/broker/amqp/job_test.go b/plugins/jobs/oooold/broker/amqp/job_test.go deleted file mode 100644 index 24ca453b..00000000 --- a/plugins/jobs/oooold/broker/amqp/job_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package amqp - -import ( - "github.com/streadway/amqp" - "github.com/stretchr/testify/assert" - "testing" -) - -func Test_Unpack_Errors(t *testing.T) { - _, _, _, err := unpack(amqp.Delivery{ - Headers: map[string]interface{}{}, - }) - assert.Error(t, err) - - _, _, _, err = unpack(amqp.Delivery{ - Headers: map[string]interface{}{ - "rr-id": "id", - }, - }) - assert.Error(t, err) - - _, _, _, err = unpack(amqp.Delivery{ - Headers: map[string]interface{}{ - "rr-id": "id", - "rr-attempt": int64(0), - }, - }) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/amqp/queue.go b/plugins/jobs/oooold/broker/amqp/queue.go deleted file mode 100644 index 6ef5f20f..00000000 --- a/plugins/jobs/oooold/broker/amqp/queue.go +++ /dev/null @@ -1,302 +0,0 @@ -package amqp - -import ( - "errors" - "fmt" - "github.com/spiral/jobs/v2" - "github.com/streadway/amqp" - "os" - "sync" - "sync/atomic" - "time" -) - -type ExchangeType string - -const ( - Direct ExchangeType = "direct" - Fanout ExchangeType = "fanout" - Topic ExchangeType = "topic" - Headers ExchangeType = "headers" -) - -func (et ExchangeType) IsValid() error { - switch et { - case Direct, Fanout, Topic, Headers: - return nil - } - return errors.New("unknown exchange-type") -} - -func (et ExchangeType) String() string { - switch et { - case Direct, Fanout, Topic, Headers: - return string(et) - default: - return "direct" - } -} - - -type queue struct { - active int32 - pipe *jobs.Pipeline - exchange string - exchangeType ExchangeType - name, key string - consumer string - - // active consuming channel - muc sync.Mutex - cc *channel - - // queue events - lsn func(event int, ctx interface{}) - - // active operations - muw sync.RWMutex - wg sync.WaitGroup - - // exec handlers - running int32 - execPool chan jobs.Handler - errHandler jobs.ErrorHandler -} - -// newQueue creates new queue wrapper for AMQP. -func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { - if pipe.String("queue", "") == "" { - return nil, fmt.Errorf("missing `queue` parameter on amqp pipeline") - } - - exchangeType := ExchangeType(pipe.String("exchange-type", "direct")) - - err := exchangeType.IsValid() - if err != nil { - return nil, fmt.Errorf(err.Error()) - } - - return &queue{ - exchange: pipe.String("exchange", "amqp.direct"), - exchangeType: exchangeType, - name: pipe.String("queue", ""), - key: pipe.String("routing-key", pipe.String("queue", "")), - consumer: pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())), - pipe: pipe, - lsn: lsn, - }, nil -} - -// serve consumes queue -func (q *queue) serve(publish, consume *chanPool) { - atomic.StoreInt32(&q.active, 1) - - for { - <-consume.waitConnected() - if atomic.LoadInt32(&q.active) == 0 { - // stopped - return - } - - delivery, cc, err := q.consume(consume) - if err != nil { - q.report(err) - continue - } - - q.muc.Lock() - q.cc = cc - q.muc.Unlock() - - for d := range delivery { - q.muw.Lock() - q.wg.Add(1) - q.muw.Unlock() - - atomic.AddInt32(&q.running, 1) - h := <-q.execPool - - go func(h jobs.Handler, d amqp.Delivery) { - err := q.do(publish, h, d) - - atomic.AddInt32(&q.running, ^int32(0)) - q.execPool <- h - q.wg.Done() - q.report(err) - }(h, d) - } - } -} - -func (q *queue) consume(consume *chanPool) (jobs <-chan amqp.Delivery, cc *channel, err error) { - // allocate channel for the consuming - if cc, err = consume.channel(q.name); err != nil { - return nil, nil, err - } - - if err := cc.ch.Qos(q.pipe.Integer("prefetch", 4), 0, false); err != nil { - return nil, nil, consume.closeChan(cc, err) - } - - delivery, err := cc.ch.Consume(q.name, q.consumer, false, false, false, false, nil) - if err != nil { - return nil, nil, consume.closeChan(cc, err) - } - - // do i like it? - go func(consume *chanPool) { - for err := range cc.signal { - consume.closeChan(cc, err) - return - } - }(consume) - - return delivery, cc, err -} - -func (q *queue) do(cp *chanPool, h jobs.Handler, d amqp.Delivery) error { - id, attempt, j, err := unpack(d) - if err != nil { - q.report(err) - return d.Nack(false, false) - } - err = h(id, j) - - if err == nil { - return d.Ack(false) - } - - // failed - q.errHandler(id, j, err) - - if !j.Options.CanRetry(attempt) { - return d.Nack(false, false) - } - - // retry as new j (to accommodate attempt number and new delay) - if err = q.publish(cp, id, attempt+1, j, j.Options.RetryDuration()); err != nil { - q.report(err) - return d.Nack(false, true) - } - - return d.Ack(false) -} - -func (q *queue) stop() { - if atomic.LoadInt32(&q.active) == 0 { - return - } - - atomic.StoreInt32(&q.active, 0) - - q.muc.Lock() - if q.cc != nil { - // gracefully stopped consuming - q.report(q.cc.ch.Cancel(q.consumer, true)) - } - q.muc.Unlock() - - q.muw.Lock() - q.wg.Wait() - q.muw.Unlock() -} - -// publish message to queue or to delayed queue. -func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay time.Duration) error { - c, err := cp.channel(q.name) - if err != nil { - return err - } - - qKey := q.key - - if delay != 0 { - delayMs := int64(delay.Seconds() * 1000) - qName := fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name) - qKey = qName - - err := q.declare(cp, qName, qName, amqp.Table{ - "x-dead-letter-exchange": q.exchange, - "x-dead-letter-routing-key": q.name, - "x-message-ttl": delayMs, - "x-expires": delayMs * 2, - }) - - if err != nil { - return err - } - } - - err = c.ch.Publish( - q.exchange, // exchange - qKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/octet-stream", - Body: j.Body(), - DeliveryMode: amqp.Persistent, - Headers: pack(id, attempt, j), - }, - ) - - if err != nil { - return cp.closeChan(c, err) - } - - confirmed, ok := <-c.confirm - if ok && confirmed.Ack { - return nil - } - - return fmt.Errorf("failed to publish: %v", confirmed.DeliveryTag) -} - -// declare queue and binding to it -func (q *queue) declare(cp *chanPool, queue string, key string, args amqp.Table) error { - c, err := cp.channel(q.name) - if err != nil { - return err - } - - err = c.ch.ExchangeDeclare(q.exchange, q.exchangeType.String(), true, false, false, false, nil) - if err != nil { - return cp.closeChan(c, err) - } - - _, err = c.ch.QueueDeclare(queue, true, false, false, false, args) - if err != nil { - return cp.closeChan(c, err) - } - - err = c.ch.QueueBind(queue, key, q.exchange, false, nil) - if err != nil { - return cp.closeChan(c, err) - } - - // keep channel open - return err -} - -// inspect the queue -func (q *queue) inspect(cp *chanPool) (*amqp.Queue, error) { - c, err := cp.channel("stat") - if err != nil { - return nil, err - } - - queue, err := c.ch.QueueInspect(q.name) - if err != nil { - return nil, cp.closeChan(c, err) - } - - // keep channel open - return &queue, err -} - -// throw handles service, server and pool events. -func (q *queue) report(err error) { - if err != nil { - q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) - } -} diff --git a/plugins/jobs/oooold/broker/amqp/stat_test.go b/plugins/jobs/oooold/broker/amqp/stat_test.go deleted file mode 100644 index ef19746c..00000000 --- a/plugins/jobs/oooold/broker/amqp/stat_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package amqp - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "sync" - "testing" -) - -func TestBroker_Stat(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Queue) - assert.Equal(t, int64(0), stat.Active) - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - wg := &sync.WaitGroup{} - wg.Add(1) - exec <- func(id string, j *jobs.Job) error { - defer wg.Done() - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Active) - - return nil - } - - wg.Wait() - stat, err = b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(0), stat.Queue) - assert.Equal(t, int64(0), stat.Active) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/broker.go b/plugins/jobs/oooold/broker/beanstalk/broker.go deleted file mode 100644 index dc3ea518..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/broker.go +++ /dev/null @@ -1,185 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "sync" -) - -// Broker run consume using Broker service. -type Broker struct { - cfg *Config - lsn func(event int, ctx interface{}) - mu sync.Mutex - wait chan error - stopped chan interface{} - conn *conn - tubes map[*jobs.Pipeline]*tube -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures broker. -func (b *Broker) Init(cfg *Config) (bool, error) { - b.cfg = cfg - b.tubes = make(map[*jobs.Pipeline]*tube) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.tubes[pipe]; ok { - return fmt.Errorf("tube `%s` has already been registered", pipe.Name()) - } - - t, err := newTube(pipe, b.throw) - if err != nil { - return err - } - - b.tubes[pipe] = t - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() (err error) { - b.mu.Lock() - - if b.conn, err = b.cfg.newConn(); err != nil { - return err - } - defer b.conn.Close() - - for _, t := range b.tubes { - tt := t - if tt.execPool != nil { - go tt.serve(b.cfg) - } - } - - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - for _, t := range b.tubes { - t.stop() - } - - close(b.wait) - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to reset consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - t, ok := b.tubes[pipe] - if !ok { - return fmt.Errorf("undefined tube `%s`", pipe.Name()) - } - - t.stop() - - t.execPool = execPool - t.errHandler = errHandler - - if b.conn != nil { - tt := t - if tt.execPool != nil { - go tt.serve(connFactory(b.cfg)) - } - } - - return nil -} - -// Push data into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - t := b.tube(pipe) - if t == nil { - return "", fmt.Errorf("undefined tube `%s`", pipe.Name()) - } - - data, err := pack(j) - if err != nil { - return "", err - } - - return t.put(b.conn, 0, data, j.Options.DelayDuration(), j.Options.TimeoutDuration()) -} - -// Stat must fetch statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - t := b.tube(pipe) - if t == nil { - return nil, fmt.Errorf("undefined tube `%s`", pipe.Name()) - } - - return t.stat(b.conn) -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) tube(pipe *jobs.Pipeline) *tube { - b.mu.Lock() - defer b.mu.Unlock() - - t, ok := b.tubes[pipe] - if !ok { - return nil - } - - return t -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} diff --git a/plugins/jobs/oooold/broker/beanstalk/broker_test.go b/plugins/jobs/oooold/broker/beanstalk/broker_test.go deleted file mode 100644 index cd2132af..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/broker_test.go +++ /dev/null @@ -1,276 +0,0 @@ -package beanstalk - -import ( - "github.com/beanstalkd/go-beanstalk" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var ( - pipe = &jobs.Pipeline{ - "broker": "beanstalk", - "name": "default", - "tube": "test", - } - - cfg = &Config{ - Addr: "tcp://localhost:11300", - } -) - -func init() { - conn, err := beanstalk.Dial("tcp", "localhost:11300") - if err != nil { - panic(err) - } - defer conn.Close() - - t := beanstalk.Tube{Name: "testTube", Conn: conn} - - for { - id, _, err := t.PeekReady() - if id == 0 || err != nil { - break - } - - if err := conn.Delete(id); err != nil { - panic(err) - } - } -} - -func TestBroker_Init(t *testing.T) { - b := &Broker{} - ok, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.True(t, ok) - assert.NoError(t, err) -} - -func TestBroker_StopNotStarted(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - b.Stop() -} - -func TestBroker_Register(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) -} - -func TestBroker_Register_Twice(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) - assert.Error(t, b.Register(pipe)) -} - -func TestBroker_Register_Invalid(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.Error(t, b.Register(&jobs.Pipeline{ - "broker": "beanstalk", - "name": "default", - })) -} - -func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_Undefined(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - assert.NoError(t, b.Consume(pipe, exec, errf)) -} - -func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - err = b.Consume(pipe, nil, nil) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_Serve_Error(t *testing.T) { - b := &Broker{} - _, err := b.Init(&Config{ - Addr: "tcp://localhost:11399", - }) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Serve()) -} - -func TestBroker_Consume_Serve_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - err = b.Consume(pipe, exec, errf) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_PushToNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_PushToNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Stat(pipe) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/config.go b/plugins/jobs/oooold/broker/beanstalk/config.go deleted file mode 100644 index 3e48a2d7..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/config.go +++ /dev/null @@ -1,50 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/spiral/roadrunner/service" - "strings" - "time" -) - -// Config defines beanstalk broker configuration. -type Config struct { - // Addr of beanstalk server. - Addr string - - // Timeout to allocate the connection. Default 10 seconds. - Timeout int -} - -// Hydrate config values. -func (c *Config) Hydrate(cfg service.Config) error { - if err := cfg.Unmarshal(c); err != nil { - return err - } - - if c.Addr == "" { - return fmt.Errorf("beanstalk address is missing") - } - - return nil -} - -// TimeoutDuration returns number of seconds allowed to allocate the connection. -func (c *Config) TimeoutDuration() time.Duration { - timeout := c.Timeout - if timeout == 0 { - timeout = 10 - } - - return time.Duration(timeout) * time.Second -} - -// size creates new rpc socket Listener. -func (c *Config) newConn() (*conn, error) { - dsn := strings.Split(c.Addr, "://") - if len(dsn) != 2 { - return nil, fmt.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock)") - } - - return newConn(dsn[0], dsn[1], c.TimeoutDuration()) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/config_test.go b/plugins/jobs/oooold/broker/beanstalk/config_test.go deleted file mode 100644 index 4ba08a04..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/config_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package beanstalk - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { return nil } -func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func TestConfig_Hydrate_Error(t *testing.T) { - cfg := &mockCfg{`{"dead`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func TestConfig_Hydrate_Error2(t *testing.T) { - cfg := &mockCfg{`{"addr":""}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func TestConfig_Hydrate_Error3(t *testing.T) { - cfg := &mockCfg{`{"addr":"tcp"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - _, err := c.newConn() - assert.Error(t, err) -} - -func TestConfig_Hydrate_Error4(t *testing.T) { - cfg := &mockCfg{`{"addr":"unix://sock.bean"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - _, err := c.newConn() - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/conn.go b/plugins/jobs/oooold/broker/beanstalk/conn.go deleted file mode 100644 index 7aba6bbb..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/conn.go +++ /dev/null @@ -1,180 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/beanstalkd/go-beanstalk" - "github.com/cenkalti/backoff/v4" - "strings" - "sync" - "time" -) - -var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} - -// creates new connections -type connFactory interface { - newConn() (*conn, error) -} - -// conn protects allocation for one connection between -// threads and provides reconnecting capabilities. -type conn struct { - tout time.Duration - conn *beanstalk.Conn - alive bool - free chan interface{} - dead chan interface{} - stop chan interface{} - lock *sync.Cond -} - -// creates new beanstalk connection and reconnect watcher. -func newConn(network, addr string, tout time.Duration) (cn *conn, err error) { - cn = &conn{ - tout: tout, - alive: true, - free: make(chan interface{}, 1), - dead: make(chan interface{}, 1), - stop: make(chan interface{}), - lock: sync.NewCond(&sync.Mutex{}), - } - - cn.conn, err = beanstalk.Dial(network, addr) - if err != nil { - return nil, err - } - - go cn.watch(network, addr) - - return cn, nil -} - -// reset the connection and reconnect watcher. -func (cn *conn) Close() error { - cn.lock.L.Lock() - defer cn.lock.L.Unlock() - - close(cn.stop) - for cn.alive { - cn.lock.Wait() - } - - return nil -} - -// acquire connection instance or return error in case of timeout. When mandratory set to true -// timeout won't be applied. -func (cn *conn) acquire(mandatory bool) (*beanstalk.Conn, error) { - // do not apply timeout on mandatory connections - if mandatory { - select { - case <-cn.stop: - return nil, fmt.Errorf("connection closed") - case <-cn.free: - return cn.conn, nil - } - } - - select { - case <-cn.stop: - return nil, fmt.Errorf("connection closed") - case <-cn.free: - return cn.conn, nil - default: - // *2 to handle commands called right after the connection reset - tout := time.NewTimer(cn.tout * 2) - select { - case <-cn.stop: - tout.Stop() - return nil, fmt.Errorf("connection closed") - case <-cn.free: - tout.Stop() - return cn.conn, nil - case <-tout.C: - return nil, fmt.Errorf("unable to allocate connection (timeout %s)", cn.tout) - } - } -} - -// release acquired connection. -func (cn *conn) release(err error) error { - if isConnError(err) { - // reconnect is required - cn.dead <- err - } else { - cn.free <- nil - } - - return err -} - -// watch and reconnect if dead -func (cn *conn) watch(network, addr string) { - cn.free <- nil - t := time.NewTicker(WatchThrottleLimit) - defer t.Stop() - for { - select { - case <-cn.dead: - // simple throttle limiter - <-t.C - // try to reconnect - // TODO add logging here - expb := backoff.NewExponentialBackOff() - expb.MaxInterval = cn.tout - - reconnect := func() error { - conn, err := beanstalk.Dial(network, addr) - if err != nil { - fmt.Println(fmt.Sprintf("redial: error during the beanstalk dialing, %s", err.Error())) - return err - } - - // TODO ADD LOGGING - fmt.Println("------beanstalk successfully redialed------") - - cn.conn = conn - cn.free <- nil - return nil - } - - err := backoff.Retry(reconnect, expb) - if err != nil { - fmt.Println(fmt.Sprintf("redial failed: %s", err.Error())) - cn.dead <- nil - } - - case <-cn.stop: - cn.lock.L.Lock() - select { - case <-cn.dead: - case <-cn.free: - } - - // stop underlying connection - cn.conn.Close() - cn.alive = false - cn.lock.Signal() - - cn.lock.L.Unlock() - - return - } - } -} - -// isConnError indicates that error is related to dead socket. -func isConnError(err error) bool { - if err == nil { - return false - } - - for _, errStr := range connErrors { - // golang... - if strings.Contains(err.Error(), errStr) { - return true - } - } - - return false -} diff --git a/plugins/jobs/oooold/broker/beanstalk/constants.go b/plugins/jobs/oooold/broker/beanstalk/constants.go deleted file mode 100644 index 84be305e..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/constants.go +++ /dev/null @@ -1,6 +0,0 @@ -package beanstalk - -import "time" - -// WatchThrottleLimit is used to limit reconnection occurrence in watch function -const WatchThrottleLimit = time.Second
\ No newline at end of file diff --git a/plugins/jobs/oooold/broker/beanstalk/consume_test.go b/plugins/jobs/oooold/broker/beanstalk/consume_test.go deleted file mode 100644 index b16866ae..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/consume_test.go +++ /dev/null @@ -1,242 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Consume_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_ConsumeAfterStart_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_Delayed(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Delay: 1}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - start := time.Now() - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob - - elapsed := time.Since(start) - assert.True(t, elapsed >= time.Second) - assert.True(t, elapsed < 2*time.Second) -} - -func TestBroker_Consume_Errored(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - close(errHandled) - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return fmt.Errorf("job failed") - } - - <-waitJob - <-errHandled -} - -func TestBroker_Consume_Errored_Attempts(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - attempts := 0 - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - attempts++ - errHandled <- nil - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Attempts: 3}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - return fmt.Errorf("job failed") - } - - <-errHandled - <-errHandled - <-errHandled - assert.Equal(t, 3, attempts) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/durability_test.go b/plugins/jobs/oooold/broker/beanstalk/durability_test.go deleted file mode 100644 index 499a5206..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/durability_test.go +++ /dev/null @@ -1,575 +0,0 @@ -package beanstalk - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "io" - "net" - "sync" - "testing" - "time" -) - -var ( - proxyCfg = &Config{ - Addr: "tcp://localhost:11301", - Timeout: 1, - } - - proxy = &tcpProxy{ - listen: "localhost:11301", - upstream: "localhost:11300", - accept: true, - } -) - -type tcpProxy struct { - listen string - upstream string - mu sync.Mutex - accept bool - conn []net.Conn -} - -func (p *tcpProxy) serve() { - l, err := net.Listen("tcp", p.listen) - if err != nil { - panic(err) - } - - for { - in, err := l.Accept() - if err != nil { - panic(err) - } - - if !p.accepting() { - in.Close() - } - - up, err := net.Dial("tcp", p.upstream) - if err != nil { - panic(err) - } - - go io.Copy(in, up) - go io.Copy(up, in) - - p.mu.Lock() - p.conn = append(p.conn, in, up) - p.mu.Unlock() - } -} - -// wait for specific number of connections -func (p *tcpProxy) waitConn(count int) *tcpProxy { - p.mu.Lock() - p.accept = true - p.mu.Unlock() - - for { - p.mu.Lock() - current := len(p.conn) - p.mu.Unlock() - - if current >= count*2 { - break - } - - time.Sleep(time.Millisecond) - } - - return p -} - -func (p *tcpProxy) reset(accept bool) int { - p.mu.Lock() - p.accept = accept - defer p.mu.Unlock() - - count := 0 - for _, conn := range p.conn { - conn.Close() - count++ - } - - p.conn = nil - return count / 2 -} - -func (p *tcpProxy) accepting() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.accept -} - -func init() { - go proxy.serve() -} - -func TestBroker_Durability_Base(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - // expect 2 connections - proxy.waitConn(2) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Durability_Consume(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - done[id] = true - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - st, err := b.Stat(pipe) - if err != nil { - continue - } - - // wait till pipeline is empty - if st.Queue+st.Active == 0 { - return - } - } -} - -func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // reoccuring - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - time.Sleep(3 * time.Second) - proxy.waitConn(1) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NotEqual(t, "0", jid) - - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume2(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - st, serr := b.Stat(pipe) - assert.NoError(t, serr) - assert.Equal(t, int64(1), st.Queue+st.Active) - - proxy.reset(true) - - // auto-reconnect - _, serr = b.Stat(pipe) - assert.NoError(t, serr) - - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - done[id] = true - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - st, err := b.Stat(pipe) - if err != nil { - continue - } - - // wait till pipeline is empty - if st.Queue+st.Active == 0 { - return - } - } -} - -func TestBroker_Durability_Consume3(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(2) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - st, serr := b.Stat(pipe) - assert.NoError(t, serr) - assert.Equal(t, int64(1), st.Queue+st.Active) - - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - done[id] = true - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - st, err := b.Stat(pipe) - if err != nil { - continue - } - - // wait till pipeline is empty - if st.Queue+st.Active == 0 { - return - } - } -} - -func TestBroker_Durability_Consume4(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(2) - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "kill", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - st, serr := b.Stat(pipe) - assert.NoError(t, serr) - assert.Equal(t, int64(3), st.Queue+st.Active) - - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - done[id] = true - if j.Payload == "kill" { - proxy.reset(true) - } - - return nil - } - - for { - st, err := b.Stat(pipe) - if err != nil { - continue - } - - // wait till pipeline is empty - if st.Queue+st.Active == 0 { - return - } - } -} - -func TestBroker_Durability_StopDead(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - - <-ready - - proxy.waitConn(2).reset(false) - - b.Stop() -} diff --git a/plugins/jobs/oooold/broker/beanstalk/job.go b/plugins/jobs/oooold/broker/beanstalk/job.go deleted file mode 100644 index fd9c8c3c..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/job.go +++ /dev/null @@ -1,24 +0,0 @@ -package beanstalk - -import ( - "bytes" - "encoding/gob" - "github.com/spiral/jobs/v2" -) - -func pack(j *jobs.Job) ([]byte, error) { - b := new(bytes.Buffer) - err := gob.NewEncoder(b).Encode(j) - if err != nil { - return nil, err - } - - return b.Bytes(), nil -} - -func unpack(data []byte) (*jobs.Job, error) { - j := &jobs.Job{} - err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(j) - - return j, err -} diff --git a/plugins/jobs/oooold/broker/beanstalk/sock.bean b/plugins/jobs/oooold/broker/beanstalk/sock.bean deleted file mode 100644 index e69de29b..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/sock.bean +++ /dev/null diff --git a/plugins/jobs/oooold/broker/beanstalk/stat_test.go b/plugins/jobs/oooold/broker/beanstalk/stat_test.go deleted file mode 100644 index 14a55859..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/stat_test.go +++ /dev/null @@ -1,66 +0,0 @@ -package beanstalk - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Stat(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - // beanstalk reserves job right after push - time.Sleep(time.Millisecond * 100) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Queue) - assert.Equal(t, int64(0), stat.Active) - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(0), stat.Queue) - assert.Equal(t, int64(1), stat.Active) - - close(waitJob) - return nil - } - - <-waitJob - - stat, err = b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(0), stat.Queue) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/tube.go b/plugins/jobs/oooold/broker/beanstalk/tube.go deleted file mode 100644 index 9d7ad117..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/tube.go +++ /dev/null @@ -1,250 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/beanstalkd/go-beanstalk" - "github.com/spiral/jobs/v2" - "strconv" - "sync" - "sync/atomic" - "time" -) - -type tube struct { - active int32 - pipe *jobs.Pipeline - mut sync.Mutex - tube *beanstalk.Tube - tubeSet *beanstalk.TubeSet - reserve time.Duration - - // tube events - lsn func(event int, ctx interface{}) - - // stop channel - wait chan interface{} - - // active operations - muw sync.RWMutex - wg sync.WaitGroup - - // exec handlers - execPool chan jobs.Handler - errHandler jobs.ErrorHandler -} - -type entry struct { - id uint64 - data []byte -} - -func (e *entry) String() string { - return fmt.Sprintf("%v", e.id) -} - -// create new tube consumer and producer -func newTube(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*tube, error) { - if pipe.String("tube", "") == "" { - return nil, fmt.Errorf("missing `tube` parameter on beanstalk pipeline") - } - - return &tube{ - pipe: pipe, - tube: &beanstalk.Tube{Name: pipe.String("tube", "")}, - tubeSet: beanstalk.NewTubeSet(nil, pipe.String("tube", "")), - reserve: pipe.Duration("reserve", time.Second), - lsn: lsn, - }, nil -} - -// run consumers -func (t *tube) serve(connector connFactory) { - // tube specific consume connection - cn, err := connector.newConn() - if err != nil { - t.report(err) - return - } - defer cn.Close() - - t.wait = make(chan interface{}) - atomic.StoreInt32(&t.active, 1) - - for { - e, err := t.consume(cn) - if err != nil { - if isConnError(err) { - t.report(err) - } - continue - } - - if e == nil { - return - } - - h := <-t.execPool - go func(h jobs.Handler, e *entry) { - err := t.do(cn, h, e) - t.execPool <- h - t.wg.Done() - t.report(err) - }(h, e) - } -} - -// fetch consume -func (t *tube) consume(cn *conn) (*entry, error) { - t.muw.Lock() - defer t.muw.Unlock() - - select { - case <-t.wait: - return nil, nil - default: - conn, err := cn.acquire(false) - if err != nil { - return nil, err - } - - t.tubeSet.Conn = conn - - id, data, err := t.tubeSet.Reserve(t.reserve) - cn.release(err) - - if err != nil { - return nil, err - } - - t.wg.Add(1) - return &entry{id: id, data: data}, nil - } -} - -// do data -func (t *tube) do(cn *conn, h jobs.Handler, e *entry) error { - j, err := unpack(e.data) - if err != nil { - return err - } - - err = h(e.String(), j) - - // mandatory acquisition - conn, connErr := cn.acquire(true) - if connErr != nil { - // possible if server is dead - return connErr - } - - if err == nil { - return cn.release(conn.Delete(e.id)) - } - - stat, statErr := conn.StatsJob(e.id) - if statErr != nil { - return cn.release(statErr) - } - - t.errHandler(e.String(), j, err) - - reserves, ok := strconv.Atoi(stat["reserves"]) - if ok != nil || !j.Options.CanRetry(reserves-1) { - return cn.release(conn.Bury(e.id, 0)) - } - - return cn.release(conn.Release(e.id, 0, j.Options.RetryDuration())) -} - -// stop tube consuming -func (t *tube) stop() { - if atomic.LoadInt32(&t.active) == 0 { - return - } - - atomic.StoreInt32(&t.active, 0) - - close(t.wait) - - t.muw.Lock() - t.wg.Wait() - t.muw.Unlock() -} - -// put data into pool or return error (no wait), this method will try to reattempt operation if -// dead conn found. -func (t *tube) put(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) { - id, err = t.doPut(cn, attempt, data, delay, rrt) - if err != nil && isConnError(err) { - return t.doPut(cn, attempt, data, delay, rrt) - } - - return id, err -} - -// perform put operation -func (t *tube) doPut(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) { - conn, err := cn.acquire(false) - if err != nil { - return "", err - } - - var bid uint64 - - t.mut.Lock() - t.tube.Conn = conn - bid, err = t.tube.Put(data, 0, delay, rrt) - t.mut.Unlock() - - return strconv.FormatUint(bid, 10), cn.release(err) -} - -// return tube stats (retries) -func (t *tube) stat(cn *conn) (stat *jobs.Stat, err error) { - stat, err = t.doStat(cn) - if err != nil && isConnError(err) { - return t.doStat(cn) - } - - return stat, err -} - -// return tube stats -func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) { - conn, err := cn.acquire(false) - if err != nil { - return nil, err - } - - t.mut.Lock() - t.tube.Conn = conn - values, err := t.tube.Stats() - t.mut.Unlock() - - if err != nil { - return nil, cn.release(err) - } - - stat = &jobs.Stat{InternalName: t.tube.Name} - - if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil { - stat.Queue = int64(v) - } - - if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil { - stat.Active = int64(v) - } - - if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil { - stat.Delayed = int64(v) - } - - return stat, cn.release(nil) -} - -// report tube specific error -func (t *tube) report(err error) { - if err != nil { - t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err}) - } -} diff --git a/plugins/jobs/oooold/broker/beanstalk/tube_test.go b/plugins/jobs/oooold/broker/beanstalk/tube_test.go deleted file mode 100644 index b6a646f4..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/tube_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package beanstalk - -import ( - "github.com/stretchr/testify/assert" - "testing" -) - -func TestTube_CantServe(t *testing.T) { - var gctx interface{} - tube := &tube{ - lsn: func(event int, ctx interface{}) { - gctx = ctx - }, - } - - tube.serve(&Config{Addr: "broken"}) - assert.Error(t, gctx.(error)) -} diff --git a/plugins/jobs/oooold/broker/ephemeral/broker.go b/plugins/jobs/oooold/broker/ephemeral/broker.go deleted file mode 100644 index 385bb175..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/broker.go +++ /dev/null @@ -1,174 +0,0 @@ -package ephemeral - -import ( - "fmt" - "github.com/gofrs/uuid" - "github.com/spiral/jobs/v2" - "sync" -) - -// Broker run queue using local goroutines. -type Broker struct { - lsn func(event int, ctx interface{}) - mu sync.Mutex - wait chan error - stopped chan interface{} - queues map[*jobs.Pipeline]*queue -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures broker. -func (b *Broker) Init() (bool, error) { - b.queues = make(map[*jobs.Pipeline]*queue) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.queues[pipe]; ok { - return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) - } - - b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0)) - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() error { - // start consuming - b.mu.Lock() - for _, q := range b.queues { - qq := q - if qq.execPool != nil { - go qq.serve() - } - } - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - // stop all consuming - for _, q := range b.queues { - q.stop() - } - - close(b.wait) - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - q.stop() - - q.execPool = execPool - q.errHandler = errHandler - - if b.wait != nil { - if q.execPool != nil { - go q.serve() - } - } - - return nil -} - -// Push job into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - q := b.queue(pipe) - if q == nil { - return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - id, err := uuid.NewV4() - if err != nil { - return "", err - } - - q.push(id.String(), j, 0, j.Options.DelayDuration()) - - return id.String(), nil -} - -// Stat must consume statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - q := b.queue(pipe) - if q == nil { - return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - return q.stat(), nil -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) queue(pipe *jobs.Pipeline) *queue { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return nil - } - - return q -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} diff --git a/plugins/jobs/oooold/broker/ephemeral/broker_test.go b/plugins/jobs/oooold/broker/ephemeral/broker_test.go deleted file mode 100644 index c1b40276..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/broker_test.go +++ /dev/null @@ -1,221 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var ( - pipe = &jobs.Pipeline{ - "broker": "local", - "name": "default", - } -) - -func TestBroker_Init(t *testing.T) { - b := &Broker{} - ok, err := b.Init() - assert.True(t, ok) - assert.NoError(t, err) -} - -func TestBroker_StopNotStarted(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - - b.Stop() -} - -func TestBroker_Register(t *testing.T) { - b := &Broker{} - b.Init() - assert.NoError(t, b.Register(pipe)) -} - -func TestBroker_Register_Twice(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) - assert.Error(t, b.Register(pipe)) -} - -func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_Undefined(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - assert.NoError(t, b.Consume(pipe, exec, errf)) -} - -func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - err = b.Consume(pipe, nil, nil) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_Serve_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - err = b.Consume(pipe, exec, errf) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_PushToNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_PushToNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Stat(pipe) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/ephemeral/consume_test.go b/plugins/jobs/oooold/broker/ephemeral/consume_test.go deleted file mode 100644 index d764a984..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/consume_test.go +++ /dev/null @@ -1,253 +0,0 @@ -package ephemeral - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Consume_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_ConsumeAfterStart_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_Delayed(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Delay: 1}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - start := time.Now() - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob - - elapsed := time.Since(start) - assert.True(t, elapsed > time.Second) - assert.True(t, elapsed < 2*time.Second) -} - -func TestBroker_Consume_Errored(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - close(errHandled) - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return fmt.Errorf("job failed") - } - - <-waitJob - <-errHandled -} - -func TestBroker_Consume_Errored_Attempts(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - attempts := 0 - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - attempts++ - errHandled <- nil - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Attempts: 3}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - return fmt.Errorf("job failed") - } - - <-errHandled - <-errHandled - <-errHandled - assert.Equal(t, 3, attempts) -} diff --git a/plugins/jobs/oooold/broker/ephemeral/queue.go b/plugins/jobs/oooold/broker/ephemeral/queue.go deleted file mode 100644 index a24bc216..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/queue.go +++ /dev/null @@ -1,161 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/jobs/v2" - "sync" - "sync/atomic" - "time" -) - -type queue struct { - on int32 - state *jobs.Stat - - // job pipeline - concurPool chan interface{} - jobs chan *entry - - // on operations - muw sync.Mutex - wg sync.WaitGroup - - // stop channel - wait chan interface{} - - // exec handlers - execPool chan jobs.Handler - errHandler jobs.ErrorHandler -} - -type entry struct { - id string - job *jobs.Job - attempt int -} - -// create new queue -func newQueue(maxConcur int) *queue { - q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)} - - if maxConcur != 0 { - q.concurPool = make(chan interface{}, maxConcur) - for i := 0; i < maxConcur; i++ { - q.concurPool <- nil - } - } - - return q -} - -// serve consumers -func (q *queue) serve() { - q.wait = make(chan interface{}) - atomic.StoreInt32(&q.on, 1) - - for { - e := q.consume() - if e == nil { - q.wg.Wait() - return - } - - if q.concurPool != nil { - <-q.concurPool - } - - atomic.AddInt64(&q.state.Active, 1) - h := <-q.execPool - - go func(h jobs.Handler, e *entry) { - defer q.wg.Done() - - q.do(h, e) - atomic.AddInt64(&q.state.Active, ^int64(0)) - - q.execPool <- h - - if q.concurPool != nil { - q.concurPool <- nil - } - }(h, e) - } -} - -// allocate one job entry -func (q *queue) consume() *entry { - q.muw.Lock() - defer q.muw.Unlock() - - select { - case <-q.wait: - return nil - case e := <-q.jobs: - q.wg.Add(1) - - return e - } -} - -// do singe job -func (q *queue) do(h jobs.Handler, e *entry) { - err := h(e.id, e.job) - - if err == nil { - atomic.AddInt64(&q.state.Queue, ^int64(0)) - return - } - - q.errHandler(e.id, e.job, err) - - if !e.job.Options.CanRetry(e.attempt) { - atomic.AddInt64(&q.state.Queue, ^int64(0)) - return - } - - q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration()) -} - -// stop the queue consuming -func (q *queue) stop() { - if atomic.LoadInt32(&q.on) == 0 { - return - } - - close(q.wait) - - q.muw.Lock() - q.wg.Wait() - q.muw.Unlock() - - atomic.StoreInt32(&q.on, 0) -} - -// add job to the queue -func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) { - if delay == 0 { - atomic.AddInt64(&q.state.Queue, 1) - go func() { - q.jobs <- &entry{id: id, job: j, attempt: attempt} - }() - - return - } - - atomic.AddInt64(&q.state.Delayed, 1) - go func() { - time.Sleep(delay) - atomic.AddInt64(&q.state.Delayed, ^int64(0)) - atomic.AddInt64(&q.state.Queue, 1) - - q.jobs <- &entry{id: id, job: j, attempt: attempt} - }() -} - -func (q *queue) stat() *jobs.Stat { - return &jobs.Stat{ - InternalName: ":memory:", - Queue: atomic.LoadInt64(&q.state.Queue), - Active: atomic.LoadInt64(&q.state.Active), - Delayed: atomic.LoadInt64(&q.state.Delayed), - } -} diff --git a/plugins/jobs/oooold/broker/ephemeral/stat_test.go b/plugins/jobs/oooold/broker/ephemeral/stat_test.go deleted file mode 100644 index 0894323c..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/stat_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" -) - -func TestBroker_Stat(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Queue) - assert.Equal(t, int64(0), stat.Active) - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Active) - - close(waitJob) - return nil - } - - <-waitJob - stat, err = b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(0), stat.Queue) - assert.Equal(t, int64(0), stat.Active) -} diff --git a/plugins/jobs/oooold/broker/sqs/broker.go b/plugins/jobs/oooold/broker/sqs/broker.go deleted file mode 100644 index 8cc62b6b..00000000 --- a/plugins/jobs/oooold/broker/sqs/broker.go +++ /dev/null @@ -1,189 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/spiral/jobs/v2" - "sync" -) - -// Broker represents SQS broker. -type Broker struct { - cfg *Config - sqs *sqs.SQS - lsn func(event int, ctx interface{}) - mu sync.Mutex - wait chan error - stopped chan interface{} - queues map[*jobs.Pipeline]*queue -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures SQS broker. -func (b *Broker) Init(cfg *Config) (ok bool, err error) { - b.cfg = cfg - b.queues = make(map[*jobs.Pipeline]*queue) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.queues[pipe]; ok { - return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) - } - - q, err := newQueue(pipe, b.throw) - if err != nil { - return err - } - - b.queues[pipe] = q - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() (err error) { - b.mu.Lock() - - b.sqs, err = b.cfg.SQS() - if err != nil { - return err - } - - for _, q := range b.queues { - q.url, err = q.declareQueue(b.sqs) - if err != nil { - return err - } - } - - for _, q := range b.queues { - qq := q - if qq.execPool != nil { - go qq.serve(b.sqs, b.cfg.TimeoutDuration()) - } - } - - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - for _, q := range b.queues { - q.stop() - } - - b.wait <- nil - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - q.stop() - - q.execPool = execPool - q.errHandler = errHandler - - if b.sqs != nil && q.execPool != nil { - go q.serve(b.sqs, b.cfg.TimeoutDuration()) - } - - return nil -} - -// Push job into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - q := b.queue(pipe) - if q == nil { - return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - if j.Options.Delay > 900 || j.Options.RetryDelay > 900 { - return "", fmt.Errorf("unable to push into `%s`, maximum delay value is 900", pipe.Name()) - } - - return q.send(b.sqs, j) -} - -// Stat must fetch statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - q := b.queue(pipe) - if q == nil { - return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - return q.stat(b.sqs) -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) queue(pipe *jobs.Pipeline) *queue { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return nil - } - - return q -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} diff --git a/plugins/jobs/oooold/broker/sqs/broker_test.go b/plugins/jobs/oooold/broker/sqs/broker_test.go deleted file mode 100644 index c87b302d..00000000 --- a/plugins/jobs/oooold/broker/sqs/broker_test.go +++ /dev/null @@ -1,275 +0,0 @@ -package sqs - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var ( - pipe = &jobs.Pipeline{ - "broker": "sqs", - "name": "default", - "queue": "test", - "declare": map[string]interface{}{ - "MessageRetentionPeriod": 86400, - }, - } - - cfg = &Config{ - Key: "api-key", - Secret: "api-secret", - Region: "us-west-1", - Endpoint: "http://localhost:9324", - } -) - -func TestBroker_Init(t *testing.T) { - b := &Broker{} - ok, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.True(t, ok) - assert.NoError(t, err) -} - -func TestBroker_StopNotStarted(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - b.Stop() -} - -func TestBroker_Register(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) -} - -func TestBroker_RegisterInvalid(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.Error(t, b.Register(&jobs.Pipeline{ - "broker": "sqs", - "name": "default", - })) -} - -func TestBroker_Register_Twice(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) - assert.Error(t, b.Register(pipe)) -} - -func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_Undefined(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - assert.NoError(t, b.Consume(pipe, exec, errf)) -} - -func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - err = b.Consume(pipe, nil, nil) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_Serve_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - b.Consume(pipe, exec, errf) - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_Serve_InvalidQueue(t *testing.T) { - pipe := &jobs.Pipeline{ - "broker": "sqs", - "name": "default", - "queue": "invalid", - "declare": map[string]interface{}{ - "VisibilityTimeout": "invalid", - }, - } - - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - b.Consume(pipe, exec, errf) - - assert.Error(t, b.Serve()) -} - -func TestBroker_PushToNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_PushToNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Stat(pipe) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/sqs/config.go b/plugins/jobs/oooold/broker/sqs/config.go deleted file mode 100644 index d0c2f2b2..00000000 --- a/plugins/jobs/oooold/broker/sqs/config.go +++ /dev/null @@ -1,82 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/spiral/roadrunner/service" - "time" -) - -// Config defines sqs broker configuration. -type Config struct { - // Region defined SQS region, not required when endpoint is not empty. - Region string - - // Region defined AWS API key, not required when endpoint is not empty. - Key string - - // Region defined AWS API secret, not required when endpoint is not empty. - Secret string - - // Endpoint can be used to re-define SQS endpoint to custom location. Only for local development. - Endpoint string - - // Timeout to allocate the connection. Default 10 seconds. - Timeout int -} - -// Hydrate config values. -func (c *Config) Hydrate(cfg service.Config) error { - if err := cfg.Unmarshal(c); err != nil { - return err - } - - if c.Region == "" { - return fmt.Errorf("SQS region is missing") - } - - if c.Key == "" { - return fmt.Errorf("SQS key is missing") - } - - if c.Secret == "" { - return fmt.Errorf("SQS secret is missing") - } - - return nil -} - -// TimeoutDuration returns number of seconds allowed to allocate the connection. -func (c *Config) TimeoutDuration() time.Duration { - timeout := c.Timeout - if timeout == 0 { - timeout = 10 - } - - return time.Duration(timeout) * time.Second -} - -// Session returns new AWS session. -func (c *Config) Session() (*session.Session, error) { - return session.NewSession(&aws.Config{ - Region: aws.String(c.Region), - Credentials: credentials.NewStaticCredentials(c.Key, c.Secret, ""), - }) -} - -// SQS returns new SQS instance or error. -func (c *Config) SQS() (*sqs.SQS, error) { - sess, err := c.Session() - if err != nil { - return nil, err - } - - if c.Endpoint == "" { - return sqs.New(sess), nil - } - - return sqs.New(sess, &aws.Config{Endpoint: aws.String(c.Endpoint)}), nil -} diff --git a/plugins/jobs/oooold/broker/sqs/config_test.go b/plugins/jobs/oooold/broker/sqs/config_test.go deleted file mode 100644 index b36b3c6f..00000000 --- a/plugins/jobs/oooold/broker/sqs/config_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package sqs - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { return nil } -func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func Test_Config_Hydrate_Error(t *testing.T) { - cfg := &mockCfg{`{"dead`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error2(t *testing.T) { - cfg := &mockCfg{`{}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error3(t *testing.T) { - cfg := &mockCfg{`{"region":"us-east-1"}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error4(t *testing.T) { - cfg := &mockCfg{`{"region":"us-east-1","key":"key"}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error5(t *testing.T) { - cfg := &mockCfg{`{"region":"us-east-1","key":"key","secret":"secret"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) -} diff --git a/plugins/jobs/oooold/broker/sqs/consume_test.go b/plugins/jobs/oooold/broker/sqs/consume_test.go deleted file mode 100644 index 434fc6ea..00000000 --- a/plugins/jobs/oooold/broker/sqs/consume_test.go +++ /dev/null @@ -1,370 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Consume_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_JobUseExistedPipeline(t *testing.T) { - pipe := &jobs.Pipeline{ - "broker": "sqs", - "name": "default", - "queue": "test", - } - - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_PushTooBigDelay(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{ - Delay: 901, - }, - }) - - assert.Error(t, perr) -} - -func TestBroker_Consume_PushTooBigDelay2(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{ - RetryDelay: 901, - }, - }) - - assert.Error(t, perr) -} - -func TestBroker_ConsumeAfterStart_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_Delayed(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Delay: 1}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - start := time.Now() - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob - - elapsed := time.Since(start) - assert.True(t, elapsed > time.Second) - assert.True(t, elapsed < 2*time.Second) -} - -func TestBroker_Consume_Errored(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - close(errHandled) - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return fmt.Errorf("job failed") - } - - <-waitJob - <-errHandled - b.Stop() -} - -func TestBroker_Consume_Errored_Attempts(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - attempts := 0 - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - attempts++ - errHandled <- nil - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Attempts: 3}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - return fmt.Errorf("job failed") - } - - <-errHandled - <-errHandled - <-errHandled - assert.Equal(t, 3, attempts) -} diff --git a/plugins/jobs/oooold/broker/sqs/durability_test.go b/plugins/jobs/oooold/broker/sqs/durability_test.go deleted file mode 100644 index 58ddf4b9..00000000 --- a/plugins/jobs/oooold/broker/sqs/durability_test.go +++ /dev/null @@ -1,588 +0,0 @@ -package sqs - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "io" - "net" - "sync" - "testing" - "time" -) - -var ( - proxyCfg = &Config{ - Key: "api-key", - Secret: "api-secret", - Region: "us-west-1", - Endpoint: "http://localhost:9325", - Timeout: 1, - } - - proxy = &tcpProxy{ - listen: "localhost:9325", - upstream: "localhost:9324", - accept: true, - } - - proxyPipe = &jobs.Pipeline{ - "broker": "sqs", - "name": "default", - "queue": "test", - "lockReserved": 1, - "declare": map[string]interface{}{ - "MessageRetentionPeriod": 86400, - }, - } -) - -type tcpProxy struct { - listen string - upstream string - mu sync.Mutex - accept bool - conn []net.Conn -} - -func (p *tcpProxy) serve() { - l, err := net.Listen("tcp", p.listen) - if err != nil { - panic(err) - } - - for { - in, err := l.Accept() - if err != nil { - panic(err) - } - - if !p.accepting() { - in.Close() - } - - up, err := net.Dial("tcp", p.upstream) - if err != nil { - panic(err) - } - - go io.Copy(in, up) - go io.Copy(up, in) - - p.mu.Lock() - p.conn = append(p.conn, in, up) - p.mu.Unlock() - } -} - -// wait for specific number of connections -func (p *tcpProxy) waitConn(count int) *tcpProxy { - p.mu.Lock() - p.accept = true - p.mu.Unlock() - - for { - p.mu.Lock() - current := len(p.conn) - p.mu.Unlock() - - if current >= count*2 { - break - } - - time.Sleep(time.Millisecond) - } - - return p -} - -func (p *tcpProxy) reset(accept bool) int { - p.mu.Lock() - p.accept = accept - defer p.mu.Unlock() - - count := 0 - for _, conn := range p.conn { - conn.Close() - count++ - } - - p.conn = nil - return count / 2 -} - -func (p *tcpProxy) accepting() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.accept -} - -func init() { - go proxy.serve() -} - -func TestBroker_Durability_Base(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - // expect 2 connections - proxy.waitConn(1) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Durability_Consume(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(1) - - jid, perr = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - time.Sleep(3 * time.Second) - proxy.waitConn(1) - - jid, perr = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume2(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - st, serr := b.Stat(proxyPipe) - assert.NoError(t, serr) - assert.Equal(t, int64(1), st.Queue+st.Active) - - proxy.reset(false) - - _, serr = b.Stat(proxyPipe) - assert.Error(t, serr) - - proxy.reset(true) - - _, serr = b.Stat(proxyPipe) - assert.NoError(t, serr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume3(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - st, serr := b.Stat(proxyPipe) - assert.NoError(t, serr) - assert.Equal(t, int64(1), st.Queue+st.Active) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume4(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1) - - _, err = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "kill", - Options: &jobs.Options{Timeout: 2}, - }) - if err != nil { - t.Fatal(err) - } - _, err = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - if err != nil { - t.Fatal(err) - } - - st, serr := b.Stat(proxyPipe) - assert.NoError(t, serr) - assert.Equal(t, int64(3), st.Queue+st.Active) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - if j.Payload == "kill" { - proxy.reset(true) - } - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 3 { - break - } - } -} - -func TestBroker_Durability_StopDead(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - - <-ready - - proxy.waitConn(1).reset(false) - - b.Stop() -} diff --git a/plugins/jobs/oooold/broker/sqs/job.go b/plugins/jobs/oooold/broker/sqs/job.go deleted file mode 100644 index 50e2c164..00000000 --- a/plugins/jobs/oooold/broker/sqs/job.go +++ /dev/null @@ -1,80 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/spiral/jobs/v2" - "strconv" - "time" -) - -var jobAttributes = []*string{ - aws.String("rr-job"), - aws.String("rr-maxAttempts"), - aws.String("rr-delay"), - aws.String("rr-timeout"), - aws.String("rr-retryDelay"), -} - -// pack job metadata into headers -func pack(url *string, j *jobs.Job) *sqs.SendMessageInput { - return &sqs.SendMessageInput{ - QueueUrl: url, - DelaySeconds: aws.Int64(int64(j.Options.Delay)), - MessageBody: aws.String(j.Payload), - MessageAttributes: map[string]*sqs.MessageAttributeValue{ - "rr-job": {DataType: aws.String("String"), StringValue: aws.String(j.Job)}, - "rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)}, - "rr-delay": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())}, - "rr-timeout": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())}, - "rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())}, - }, - } -} - -// unpack restores jobs.Options -func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) { - if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok { - return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount") - } - attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"]) - - for _, attr := range jobAttributes { - if _, ok := msg.MessageAttributes[*attr]; !ok { - return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr) - } - } - - j = &jobs.Job{ - Job: *msg.MessageAttributes["rr-job"].StringValue, - Payload: *msg.Body, - Options: &jobs.Options{}, - } - - if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil { - j.Options.Delay = delay - } - - if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil { - j.Options.Attempts = maxAttempts - } - - if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil { - j.Options.Timeout = timeout - } - - if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil { - j.Options.RetryDelay = retryDelay - } - - return *msg.MessageId, attempt - 1, j, nil -} - -func awsString(n int) *string { - return aws.String(strconv.Itoa(n)) -} - -func awsDuration(d time.Duration) *string { - return aws.String(strconv.Itoa(int(d.Seconds()))) -} diff --git a/plugins/jobs/oooold/broker/sqs/job_test.go b/plugins/jobs/oooold/broker/sqs/job_test.go deleted file mode 100644 index a120af53..00000000 --- a/plugins/jobs/oooold/broker/sqs/job_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package sqs - -import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/stretchr/testify/assert" - "testing" -) - -func Test_Unpack(t *testing.T) { - msg := &sqs.Message{ - Body: aws.String("body"), - Attributes: map[string]*string{}, - MessageAttributes: map[string]*sqs.MessageAttributeValue{}, - } - - _, _, _, err := unpack(msg) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/sqs/queue.go b/plugins/jobs/oooold/broker/sqs/queue.go deleted file mode 100644 index 8a92448e..00000000 --- a/plugins/jobs/oooold/broker/sqs/queue.go +++ /dev/null @@ -1,266 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/spiral/jobs/v2" - "strconv" - "sync" - "sync/atomic" - "time" -) - -type queue struct { - active int32 - pipe *jobs.Pipeline - url *string - reserve time.Duration - lockReserved time.Duration - - // queue events - lsn func(event int, ctx interface{}) - - // stop channel - wait chan interface{} - - // active operations - muw sync.RWMutex - wg sync.WaitGroup - - // exec handlers - execPool chan jobs.Handler - errHandler jobs.ErrorHandler -} - -func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { - if pipe.String("queue", "") == "" { - return nil, fmt.Errorf("missing `queue` parameter on sqs pipeline `%s`", pipe.Name()) - } - - return &queue{ - pipe: pipe, - reserve: pipe.Duration("reserve", time.Second), - lockReserved: pipe.Duration("lockReserved", 300*time.Second), - lsn: lsn, - }, nil -} - -// declareQueue declared queue -func (q *queue) declareQueue(s *sqs.SQS) (*string, error) { - attr := make(map[string]*string) - for k, v := range q.pipe.Map("declare") { - if vs, ok := v.(string); ok { - attr[k] = aws.String(vs) - } - - if vi, ok := v.(int); ok { - attr[k] = aws.String(strconv.Itoa(vi)) - } - - if vb, ok := v.(bool); ok { - if vb { - attr[k] = aws.String("true") - } else { - attr[k] = aws.String("false") - } - } - } - - if len(attr) != 0 { - r, err := s.CreateQueue(&sqs.CreateQueueInput{ - QueueName: aws.String(q.pipe.String("queue", "")), - Attributes: attr, - }) - - return r.QueueUrl, err - } - - // no need to create (get existed) - r, err := s.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(q.pipe.String("queue", ""))}) - if err != nil { - return nil, err - } - - return r.QueueUrl, nil -} - -// serve consumers -func (q *queue) serve(s *sqs.SQS, tout time.Duration) { - q.wait = make(chan interface{}) - atomic.StoreInt32(&q.active, 1) - - var errored bool - for { - messages, stop, err := q.consume(s) - if err != nil { - if errored { - // reoccurring error - time.Sleep(tout) - } else { - errored = true - q.report(err) - } - - continue - } - errored = false - - if stop { - return - } - - for _, msg := range messages { - h := <-q.execPool - go func(h jobs.Handler, msg *sqs.Message) { - err := q.do(s, h, msg) - q.execPool <- h - q.wg.Done() - q.report(err) - }(h, msg) - } - } -} - -// consume and allocate connection. -func (q *queue) consume(s *sqs.SQS) ([]*sqs.Message, bool, error) { - q.muw.Lock() - defer q.muw.Unlock() - - select { - case <-q.wait: - return nil, true, nil - default: - r, err := s.ReceiveMessage(&sqs.ReceiveMessageInput{ - QueueUrl: q.url, - MaxNumberOfMessages: aws.Int64(int64(q.pipe.Integer("prefetch", 1))), - WaitTimeSeconds: aws.Int64(int64(q.reserve.Seconds())), - VisibilityTimeout: aws.Int64(int64(q.lockReserved.Seconds())), - AttributeNames: []*string{aws.String("ApproximateReceiveCount")}, - MessageAttributeNames: jobAttributes, - }) - if err != nil { - return nil, false, err - } - - q.wg.Add(len(r.Messages)) - - return r.Messages, false, nil - } -} - -// do single message -func (q *queue) do(s *sqs.SQS, h jobs.Handler, msg *sqs.Message) (err error) { - id, attempt, j, err := unpack(msg) - if err != nil { - go q.deleteMessage(s, msg, err) - return err - } - - // block the job based on known timeout - _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ - QueueUrl: q.url, - ReceiptHandle: msg.ReceiptHandle, - VisibilityTimeout: aws.Int64(int64(j.Options.TimeoutDuration().Seconds())), - }) - if err != nil { - go q.deleteMessage(s, msg, err) - return err - } - - err = h(id, j) - if err == nil { - return q.deleteMessage(s, msg, nil) - } - - q.errHandler(id, j, err) - - if !j.Options.CanRetry(attempt) { - return q.deleteMessage(s, msg, err) - } - - // retry after specified duration - _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ - QueueUrl: q.url, - ReceiptHandle: msg.ReceiptHandle, - VisibilityTimeout: aws.Int64(int64(j.Options.RetryDelay)), - }) - - return err -} - -func (q *queue) deleteMessage(s *sqs.SQS, msg *sqs.Message, err error) error { - _, drr := s.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: q.url, ReceiptHandle: msg.ReceiptHandle}) - return drr -} - -// stop the queue consuming -func (q *queue) stop() { - if atomic.LoadInt32(&q.active) == 0 { - return - } - - atomic.StoreInt32(&q.active, 0) - - close(q.wait) - q.muw.Lock() - q.wg.Wait() - q.muw.Unlock() -} - -// add job to the queue -func (q *queue) send(s *sqs.SQS, j *jobs.Job) (string, error) { - r, err := s.SendMessage(pack(q.url, j)) - if err != nil { - return "", err - } - - return *r.MessageId, nil -} - -// return queue stats -func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) { - r, err := s.GetQueueAttributes(&sqs.GetQueueAttributesInput{ - QueueUrl: q.url, - AttributeNames: []*string{ - aws.String("ApproximateNumberOfMessages"), - aws.String("ApproximateNumberOfMessagesDelayed"), - aws.String("ApproximateNumberOfMessagesNotVisible"), - }, - }) - - if err != nil { - return nil, err - } - - stat = &jobs.Stat{InternalName: q.pipe.String("queue", "")} - - for a, v := range r.Attributes { - if a == "ApproximateNumberOfMessages" { - if v, err := strconv.Atoi(*v); err == nil { - stat.Queue = int64(v) - } - } - - if a == "ApproximateNumberOfMessagesNotVisible" { - if v, err := strconv.Atoi(*v); err == nil { - stat.Active = int64(v) - } - } - - if a == "ApproximateNumberOfMessagesDelayed" { - if v, err := strconv.Atoi(*v); err == nil { - stat.Delayed = int64(v) - } - } - } - - return stat, nil -} - -// throw handles service, server and pool events. -func (q *queue) report(err error) { - if err != nil { - q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) - } -} diff --git a/plugins/jobs/oooold/broker/sqs/stat_test.go b/plugins/jobs/oooold/broker/sqs/stat_test.go deleted file mode 100644 index 5031571b..00000000 --- a/plugins/jobs/oooold/broker/sqs/stat_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package sqs - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" -) - -func TestBroker_Stat(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - // unable to use approximated stats - _, err = b.Stat(pipe) - assert.NoError(t, err) - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - _, err := b.Stat(pipe) - assert.NoError(t, err) - - close(waitJob) - return nil - } - - <-waitJob - _, err = b.Stat(pipe) - assert.NoError(t, err) -} diff --git a/plugins/jobs/oooold/broker_test.go b/plugins/jobs/oooold/broker_test.go deleted file mode 100644 index b93eac51..00000000 --- a/plugins/jobs/oooold/broker_test.go +++ /dev/null @@ -1,314 +0,0 @@ -package oooold - -import ( - "fmt" - "github.com/gofrs/uuid" - "sync" - "sync/atomic" - "time" -) - -// testBroker run testQueue using local goroutines. -type testBroker struct { - lsn func(event int, ctx interface{}) - mu sync.Mutex - wait chan error - stopped chan interface{} - queues map[*Pipeline]*testQueue -} - -// Listen attaches server event watcher. -func (b *testBroker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures broker. -func (b *testBroker) Init() (bool, error) { - b.queues = make(map[*Pipeline]*testQueue) - - return true, nil -} - -// Register broker pipeline. -func (b *testBroker) Register(pipe *Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.queues[pipe]; ok { - return fmt.Errorf("testQueue `%s` has already been registered", pipe.Name()) - } - - b.queues[pipe] = newQueue() - - return nil -} - -// Serve broker pipelines. -func (b *testBroker) Serve() error { - // start pipelines - b.mu.Lock() - for _, q := range b.queues { - qq := q - if qq.execPool != nil { - go qq.serve() - } - } - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *testBroker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - // stop all pipelines - for _, q := range b.queues { - q.stop() - } - - close(b.wait) - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before -// the service is started! -func (b *testBroker) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return fmt.Errorf("undefined testQueue `%s`", pipe.Name()) - } - - q.stop() - - q.execPool = execPool - q.errHandler = errHandler - - if b.wait != nil { - if q.execPool != nil { - go q.serve() - } - } - - return nil -} - -// Push job into the worker. -func (b *testBroker) Push(pipe *Pipeline, j *Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - q := b.queue(pipe) - if q == nil { - return "", fmt.Errorf("undefined testQueue `%s`", pipe.Name()) - } - - id, err := uuid.NewV4() - if err != nil { - return "", err - } - - q.push(id.String(), j, 0, j.Options.DelayDuration()) - - return id.String(), nil -} - -// Stat must consume statistics about given pipeline or return error. -func (b *testBroker) Stat(pipe *Pipeline) (stat *Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - q := b.queue(pipe) - if q == nil { - return nil, fmt.Errorf("undefined testQueue `%s`", pipe.Name()) - } - - return q.stat(), nil -} - -// check if broker is serving -func (b *testBroker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// testQueue returns testQueue associated with the pipeline. -func (b *testBroker) queue(pipe *Pipeline) *testQueue { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return nil - } - - return q -} - -// throw handles service, server and pool events. -func (b *testBroker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} - -type testQueue struct { - active int32 - st *Stat - - // job pipeline - jobs chan *entry - - // pipelines operations - muw sync.Mutex - wg sync.WaitGroup - - // stop channel - wait chan interface{} - - // exec handlers - execPool chan Handler - errHandler ErrorHandler -} - -type entry struct { - id string - job *Job - attempt int -} - -// create new testQueue -func newQueue() *testQueue { - return &testQueue{st: &Stat{}, jobs: make(chan *entry)} -} - -// todo NOT USED -// associate testQueue with new do pool -//func (q *testQueue) configure(execPool chan Handler, err ErrorHandler) error { -// q.execPool = execPool -// q.errHandler = err -// -// return nil -//} - -// serve consumers -func (q *testQueue) serve() { - q.wait = make(chan interface{}) - atomic.StoreInt32(&q.active, 1) - - for { - e := q.consume() - if e == nil { - return - } - - atomic.AddInt64(&q.st.Active, 1) - h := <-q.execPool - go func(e *entry) { - q.do(h, e) - atomic.AddInt64(&q.st.Active, ^int64(0)) - q.execPool <- h - q.wg.Done() - }(e) - } -} - -// allocate one job entry -func (q *testQueue) consume() *entry { - q.muw.Lock() - defer q.muw.Unlock() - - select { - case <-q.wait: - return nil - case e := <-q.jobs: - q.wg.Add(1) - - return e - } -} - -// do singe job -func (q *testQueue) do(h Handler, e *entry) { - err := h(e.id, e.job) - - if err == nil { - atomic.AddInt64(&q.st.Queue, ^int64(0)) - return - } - - q.errHandler(e.id, e.job, err) - - if !e.job.Options.CanRetry(e.attempt) { - atomic.AddInt64(&q.st.Queue, ^int64(0)) - return - } - - q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration()) -} - -// stop the testQueue pipelines -func (q *testQueue) stop() { - if atomic.LoadInt32(&q.active) == 0 { - return - } - - atomic.StoreInt32(&q.active, 0) - - close(q.wait) - q.muw.Lock() - q.wg.Wait() - q.muw.Unlock() -} - -// add job to the testQueue -func (q *testQueue) push(id string, j *Job, attempt int, delay time.Duration) { - if delay == 0 { - atomic.AddInt64(&q.st.Queue, 1) - go func() { - q.jobs <- &entry{id: id, job: j, attempt: attempt} - }() - - return - } - - atomic.AddInt64(&q.st.Delayed, 1) - go func() { - time.Sleep(delay) - atomic.AddInt64(&q.st.Delayed, ^int64(0)) - atomic.AddInt64(&q.st.Queue, 1) - - q.jobs <- &entry{id: id, job: j, attempt: attempt} - }() -} - -func (q *testQueue) stat() *Stat { - return &Stat{ - InternalName: ":memory:", - Queue: atomic.LoadInt64(&q.st.Queue), - Active: atomic.LoadInt64(&q.st.Active), - Delayed: atomic.LoadInt64(&q.st.Delayed), - } -} diff --git a/plugins/jobs/oooold/config.go b/plugins/jobs/oooold/config.go deleted file mode 100644 index cf40b6fb..00000000 --- a/plugins/jobs/oooold/config.go +++ /dev/null @@ -1,91 +0,0 @@ -package oooold - -import ( - "fmt" - "github.com/spiral/roadrunner" - "github.com/spiral/roadrunner/service" -) - -// Config defines settings for job broker, workers and job-pipeline mapping. -type Config struct { - // Workers configures roadrunner server and worker busy. - Workers *roadrunner.ServerConfig - - // Dispatch defines where and how to match jobs. - Dispatch map[string]*Options - - // Pipelines defines mapping between PHP job pipeline and associated job broker. - Pipelines map[string]*Pipeline - - // Consuming specifies names of pipelines to be consumed on service start. - Consume []string - - // parent config for broken options. - parent service.Config - pipelines Pipelines - route Dispatcher -} - -// Hydrate populates config values. -func (c *Config) Hydrate(cfg service.Config) (err error) { - c.Workers = &roadrunner.ServerConfig{} - c.Workers.InitDefaults() - - if err := cfg.Unmarshal(&c); err != nil { - return err - } - - c.pipelines, err = initPipelines(c.Pipelines) - if err != nil { - return err - } - - if c.Workers.Command != "" { - if err := c.Workers.Pool.Valid(); err != nil { - return c.Workers.Pool.Valid() - } - } - - c.parent = cfg - c.route = initDispatcher(c.Dispatch) - - return nil -} - -// MatchPipeline locates the pipeline associated with the job. -func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error) { - opt := c.route.match(job) - - pipe := "" - if job.Options != nil { - pipe = job.Options.Pipeline - } - - if pipe == "" && opt != nil { - pipe = opt.Pipeline - } - - if pipe == "" { - return nil, nil, fmt.Errorf("unable to locate pipeline for `%s`", job.Job) - } - - if p := c.pipelines.Get(pipe); p != nil { - return p, opt, nil - } - - return nil, nil, fmt.Errorf("undefined pipeline `%s`", pipe) -} - -// Get underlying broker config. -func (c *Config) Get(service string) service.Config { - if c.parent == nil { - return nil - } - - return c.parent.Get(service) -} - -// Unmarshal is doing nothing. -func (c *Config) Unmarshal(out interface{}) error { - return nil -} diff --git a/plugins/jobs/oooold/config_test.go b/plugins/jobs/oooold/config_test.go deleted file mode 100644 index 5f14eb32..00000000 --- a/plugins/jobs/oooold/config_test.go +++ /dev/null @@ -1,158 +0,0 @@ -package oooold - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { - if name == "same" || name == "jobs" { - return cfg - } - - return nil -} -func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func Test_Config_Hydrate_Error(t *testing.T) { - cfg := &mockCfg{cfg: `{"dead`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_OK(t *testing.T) { - cfg := &mockCfg{cfg: `{ - "workers":{"pool":{"numWorkers": 1}} -}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Unmarshal(t *testing.T) { - cfg := &mockCfg{cfg: `{ - "workers":{"pool":{"numWorkers": 1}} -}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - var i interface{} - assert.Nil(t, c.Unmarshal(i)) -} - -func Test_Config_Hydrate_Get(t *testing.T) { - cfg := &mockCfg{cfg: `{ - "workers":{"pool":{"numWorkers": 1}} -}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - assert.Nil(t, c.Get("nil")) -} - -func Test_Config_Hydrate_Get_Valid(t *testing.T) { - cfg := &mockCfg{cfg: `{ - "workers":{"pool":{"numWorkers": 1}} -}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - assert.Equal(t, cfg, c.Get("same")) -} - -func Test_Config_Hydrate_GetNoParent(t *testing.T) { - c := &Config{} - assert.Nil(t, c.Get("nil")) -} - -func Test_Pipelines(t *testing.T) { - cfg := &mockCfg{cfg: `{ - "workers":{ - "pool":{"numWorkers": 1} - }, - "pipelines":{ - "pipe": {"broker":"broker"} - }, - "dispatch":{ - "job.*": {"pipeline":"default"} - } - }`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - assert.Equal(t, "pipe", c.pipelines.Get("pipe").Name()) - assert.Equal(t, "broker", c.pipelines.Get("pipe").Broker()) -} - -func Test_Pipelines_NoBroker(t *testing.T) { - cfg := &mockCfg{cfg: `{ - "workers":{ - "pool":{"numWorkers": 1} - }, - "pipelines":{ - "pipe": {} - }, - "dispatch":{ - "job.*": {"pipeline":"default"} - } - }`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_MatchPipeline(t *testing.T) { - cfg := &mockCfg{cfg: `{ - "workers":{ - "pool":{"numWorkers": 1} - }, - "pipelines":{ - "pipe": {"broker":"default"} - }, - "dispatch":{ - "job.*": {"pipeline":"pipe","delay":10} - } - }`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - _, _, err := c.MatchPipeline(&Job{Job: "undefined", Options: &Options{}}) - assert.Error(t, err) - - p, _, _ := c.MatchPipeline(&Job{Job: "undefined", Options: &Options{Pipeline: "pipe"}}) - assert.Equal(t, "pipe", p.Name()) - - p, opt, _ := c.MatchPipeline(&Job{Job: "job.abc", Options: &Options{}}) - assert.Equal(t, "pipe", p.Name()) - assert.Equal(t, 10, opt.Delay) -} - -func Test_MatchPipeline_Error(t *testing.T) { - cfg := &mockCfg{cfg: `{ - "workers":{ - "pool":{"numWorkers": 1} - }, - "pipelines":{ - "pipe": {"broker":"default"} - }, - "dispatch":{ - "job.*": {"pipeline":"missing"} - } - }`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - _, _, err := c.MatchPipeline(&Job{Job: "job.abc", Options: &Options{}}) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/dispatcher.go b/plugins/jobs/oooold/dispatcher.go deleted file mode 100644 index 801e1973..00000000 --- a/plugins/jobs/oooold/dispatcher.go +++ /dev/null @@ -1,47 +0,0 @@ -package oooold - -import ( - "strings" -) - -var separators = []string{"/", "-", "\\"} - -// Dispatcher provides ability to automatically locate the pipeline for the specific job -// and update job options (if none set). -type Dispatcher map[string]*Options - -// pre-compile patterns -func initDispatcher(routes map[string]*Options) Dispatcher { - dispatcher := make(Dispatcher) - for pattern, opts := range routes { - pattern = strings.ToLower(pattern) - pattern = strings.Trim(pattern, "-.*") - - for _, s := range separators { - pattern = strings.Replace(pattern, s, ".", -1) - } - - dispatcher[pattern] = opts - } - - return dispatcher -} - -// match clarifies target job pipeline and other job options. Can return nil. -func (dispatcher Dispatcher) match(job *Job) (found *Options) { - var best = 0 - - jobName := strings.ToLower(job.Job) - for pattern, opts := range dispatcher { - if strings.HasPrefix(jobName, pattern) && len(pattern) > best { - found = opts - best = len(pattern) - } - } - - if best == 0 { - return nil - } - - return found -} diff --git a/plugins/jobs/oooold/dispatcher_test.go b/plugins/jobs/oooold/dispatcher_test.go deleted file mode 100644 index 92f8c956..00000000 --- a/plugins/jobs/oooold/dispatcher_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package oooold - -import ( - "github.com/stretchr/testify/assert" - "testing" -) - -func Test_Map_All(t *testing.T) { - m := initDispatcher(map[string]*Options{"default": {Pipeline: "default"}}) - assert.Equal(t, "default", m.match(&Job{Job: "default"}).Pipeline) -} - -func Test_Map_Miss(t *testing.T) { - m := initDispatcher(map[string]*Options{"some.*": {Pipeline: "default"}}) - - assert.Nil(t, m.match(&Job{Job: "miss"})) -} - -func Test_Map_Best(t *testing.T) { - m := initDispatcher(map[string]*Options{ - "some.*": {Pipeline: "default"}, - "some.other.*": {Pipeline: "other"}, - }) - - assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline) - assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline) - assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline) - assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline) -} - -func Test_Map_BestUpper(t *testing.T) { - m := initDispatcher(map[string]*Options{ - "some.*": {Pipeline: "default"}, - "some.Other.*": {Pipeline: "other"}, - }) - - assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline) - assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline) - assert.Equal(t, "other", m.match(&Job{Job: "some.OTHER"}).Pipeline) - assert.Equal(t, "other", m.match(&Job{Job: "Some.other.job"}).Pipeline) -} - -func Test_Map_BestReversed(t *testing.T) { - m := initDispatcher(map[string]*Options{ - "some.*": {Pipeline: "default"}, - "some.other.*": {Pipeline: "other"}, - }) - - assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline) - assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline) - assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline) - assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline) -} diff --git a/plugins/jobs/oooold/event.go b/plugins/jobs/oooold/event.go deleted file mode 100644 index 6524712f..00000000 --- a/plugins/jobs/oooold/event.go +++ /dev/null @@ -1,96 +0,0 @@ -package oooold - -import "time" - -const ( - // EventPushOK thrown when new job has been added. JobEvent is passed as context. - EventPushOK = iota + 1500 - - // EventPushError caused when job can not be registered. - EventPushError - - // EventJobStart thrown when new job received. - EventJobStart - - // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. - EventJobOK - - // EventJobError thrown on all job related errors. See JobError as context. - EventJobError - - // EventPipeConsume when pipeline pipelines has been requested. - EventPipeConsume - - // EventPipeActive when pipeline has started. - EventPipeActive - - // EventPipeStop when pipeline has begun stopping. - EventPipeStop - - // EventPipeStopped when pipeline has been stopped. - EventPipeStopped - - // EventPipeError when pipeline specific error happen. - EventPipeError - - // EventBrokerReady thrown when broken is ready to accept/serve tasks. - EventBrokerReady -) - -// JobEvent represent job event. -type JobEvent struct { - // String is job id. - ID string - - // Job is failed job. - Job *Job - - // event timings - start time.Time - elapsed time.Duration -} - -// Elapsed returns duration of the invocation. -func (e *JobEvent) Elapsed() time.Duration { - return e.elapsed -} - -// JobError represents singular Job error event. -type JobError struct { - // String is job id. - ID string - - // Job is failed job. - Job *Job - - // Caused contains job specific error. - Caused error - - // event timings - start time.Time - elapsed time.Duration -} - -// Elapsed returns duration of the invocation. -func (e *JobError) Elapsed() time.Duration { - return e.elapsed -} - -// Caused returns error message. -func (e *JobError) Error() string { - return e.Caused.Error() -} - -// PipelineError defines pipeline specific errors. -type PipelineError struct { - // Pipeline is associated pipeline. - Pipeline *Pipeline - - // Caused send by broker. - Caused error -} - -// Error returns error message. -func (e *PipelineError) Error() string { - return e.Caused.Error() -} diff --git a/plugins/jobs/oooold/event_test.go b/plugins/jobs/oooold/event_test.go deleted file mode 100644 index 82241124..00000000 --- a/plugins/jobs/oooold/event_test.go +++ /dev/null @@ -1,52 +0,0 @@ -package oooold - -import ( - "errors" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestJobEvent_Elapsed(t *testing.T) { - e := &JobEvent{ - ID: "id", - Job: &Job{}, - start: time.Now(), - elapsed: time.Millisecond, - } - - assert.Equal(t, time.Millisecond, e.Elapsed()) -} - -func TestJobError_Elapsed(t *testing.T) { - e := &JobError{ - ID: "id", - Job: &Job{}, - start: time.Now(), - elapsed: time.Millisecond, - } - - assert.Equal(t, time.Millisecond, e.Elapsed()) -} - -func TestJobError_Error(t *testing.T) { - e := &JobError{ - ID: "id", - Job: &Job{}, - start: time.Now(), - elapsed: time.Millisecond, - Caused: errors.New("error"), - } - - assert.Equal(t, time.Millisecond, e.Elapsed()) - assert.Equal(t, "error", e.Error()) -} - -func TestPipelineError_Error(t *testing.T) { - e := &PipelineError{ - Pipeline: &Pipeline{}, - Caused: errors.New("error"), - } - - assert.Equal(t, "error", e.Error()) -} diff --git a/plugins/jobs/oooold/job.go b/plugins/jobs/oooold/job.go deleted file mode 100644 index 2f80c1cc..00000000 --- a/plugins/jobs/oooold/job.go +++ /dev/null @@ -1,38 +0,0 @@ -package oooold - -import json "github.com/json-iterator/go" - -// Handler handles job execution. -type Handler func(id string, j *Job) error - -// ErrorHandler handles job execution errors. -type ErrorHandler func(id string, j *Job, err error) - -// Job carries information about single job. -type Job struct { - // Job contains name of job broker (usually PHP class). - Job string `json:"job"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Body packs job payload into binary payload. -func (j *Job) Body() []byte { - return []byte(j.Payload) -} - -// Context packs job context (job, id) into binary payload. -func (j *Job) Context(id string) []byte { - ctx, _ := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - }{ID: id, Job: j.Job}, - ) - - return ctx -} diff --git a/plugins/jobs/oooold/job_options.go b/plugins/jobs/oooold/job_options.go deleted file mode 100644 index 206bbfc4..00000000 --- a/plugins/jobs/oooold/job_options.go +++ /dev/null @@ -1,70 +0,0 @@ -package oooold - -import "time" - -// Options carry information about how to handle given job. -type Options struct { - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int `json:"delay,omitempty"` - - // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). - // Minimum valuable value is 2. - Attempts int `json:"maxAttempts,omitempty"` - - // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. - RetryDelay int `json:"retryDelay,omitempty"` - - // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. - Timeout int `json:"timeout,omitempty"` -} - -// Merge merges job options. -func (o *Options) Merge(from *Options) { - if o.Pipeline == "" { - o.Pipeline = from.Pipeline - } - - if o.Attempts == 0 { - o.Attempts = from.Attempts - } - - if o.Timeout == 0 { - o.Timeout = from.Timeout - } - - if o.RetryDelay == 0 { - o.RetryDelay = from.RetryDelay - } - - if o.Delay == 0 { - o.Delay = from.Delay - } -} - -// CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt int) bool { - // Attempts 1 and 0 has identical effect - return o.Attempts > (attempt + 1) -} - -// RetryDuration returns retry delay duration in a form of time.Duration. -func (o *Options) RetryDuration() time.Duration { - return time.Second * time.Duration(o.RetryDelay) -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} - -// TimeoutDuration returns timeout duration in a form of time.Duration. -func (o *Options) TimeoutDuration() time.Duration { - if o.Timeout == 0 { - return 30 * time.Minute - } - - return time.Second * time.Duration(o.Timeout) -} diff --git a/plugins/jobs/oooold/job_options_test.go b/plugins/jobs/oooold/job_options_test.go deleted file mode 100644 index 4575d959..00000000 --- a/plugins/jobs/oooold/job_options_test.go +++ /dev/null @@ -1,109 +0,0 @@ -package oooold - -import ( - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestOptions_CanRetry(t *testing.T) { - opts := &Options{Attempts: 0} - - assert.False(t, opts.CanRetry(0)) - assert.False(t, opts.CanRetry(1)) -} - -func TestOptions_CanRetry_SameValue(t *testing.T) { - opts := &Options{Attempts: 1} - - assert.False(t, opts.CanRetry(0)) - assert.False(t, opts.CanRetry(1)) -} - -func TestOptions_CanRetry_Value(t *testing.T) { - opts := &Options{Attempts: 2} - - assert.True(t, opts.CanRetry(0)) - assert.False(t, opts.CanRetry(1)) - assert.False(t, opts.CanRetry(2)) -} - -func TestOptions_CanRetry_Value3(t *testing.T) { - opts := &Options{Attempts: 3} - - assert.True(t, opts.CanRetry(0)) - assert.True(t, opts.CanRetry(1)) - assert.False(t, opts.CanRetry(2)) -} - -func TestOptions_RetryDuration(t *testing.T) { - opts := &Options{RetryDelay: 0} - assert.Equal(t, time.Duration(0), opts.RetryDuration()) -} - -func TestOptions_RetryDuration2(t *testing.T) { - opts := &Options{RetryDelay: 1} - assert.Equal(t, time.Second, opts.RetryDuration()) -} - -func TestOptions_DelayDuration(t *testing.T) { - opts := &Options{Delay: 0} - assert.Equal(t, time.Duration(0), opts.DelayDuration()) -} - -func TestOptions_DelayDuration2(t *testing.T) { - opts := &Options{Delay: 1} - assert.Equal(t, time.Second, opts.DelayDuration()) -} - -func TestOptions_TimeoutDuration(t *testing.T) { - opts := &Options{Timeout: 0} - assert.Equal(t, time.Minute*30, opts.TimeoutDuration()) -} - -func TestOptions_TimeoutDuration2(t *testing.T) { - opts := &Options{Timeout: 1} - assert.Equal(t, time.Second, opts.TimeoutDuration()) -} - -func TestOptions_Merge(t *testing.T) { - opts := &Options{} - - opts.Merge(&Options{ - Pipeline: "pipeline", - Delay: 2, - Timeout: 1, - Attempts: 1, - RetryDelay: 1, - }) - - assert.Equal(t, "pipeline", opts.Pipeline) - assert.Equal(t, 1, opts.Attempts) - assert.Equal(t, 2, opts.Delay) - assert.Equal(t, 1, opts.Timeout) - assert.Equal(t, 1, opts.RetryDelay) -} - -func TestOptions_MergeKeepOriginal(t *testing.T) { - opts := &Options{ - Pipeline: "default", - Delay: 10, - Timeout: 10, - Attempts: 10, - RetryDelay: 10, - } - - opts.Merge(&Options{ - Pipeline: "pipeline", - Delay: 2, - Timeout: 1, - Attempts: 1, - RetryDelay: 1, - }) - - assert.Equal(t, "default", opts.Pipeline) - assert.Equal(t, 10, opts.Attempts) - assert.Equal(t, 10, opts.Delay) - assert.Equal(t, 10, opts.Timeout) - assert.Equal(t, 10, opts.RetryDelay) -} diff --git a/plugins/jobs/oooold/job_test.go b/plugins/jobs/oooold/job_test.go deleted file mode 100644 index 5db924eb..00000000 --- a/plugins/jobs/oooold/job_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package oooold - -import ( - "github.com/stretchr/testify/assert" - "testing" -) - -func TestJob_Body(t *testing.T) { - j := &Job{Payload: "hello"} - - assert.Equal(t, []byte("hello"), j.Body()) -} - -func TestJob_Context(t *testing.T) { - j := &Job{Job: "job"} - - assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id")) -} diff --git a/plugins/jobs/oooold/pipeline.go b/plugins/jobs/oooold/pipeline.go deleted file mode 100644 index c533f23d..00000000 --- a/plugins/jobs/oooold/pipeline.go +++ /dev/null @@ -1,169 +0,0 @@ -package oooold - -import ( - "fmt" - "time" -) - -// Pipelines is list of Pipeline. -type Pipelines []*Pipeline - -func initPipelines(pipes map[string]*Pipeline) (Pipelines, error) { - out := make(Pipelines, 0) - - for name, pipe := range pipes { - if pipe.Broker() == "" { - return nil, fmt.Errorf("found the pipeline without defined broker") - } - - p := pipe.With("name", name) - out = append(out, &p) - } - - return out, nil -} - -// Reverse returns pipelines in reversed order. -func (ps Pipelines) Reverse() Pipelines { - out := make(Pipelines, len(ps)) - - for i, p := range ps { - out[len(ps)-i-1] = p - } - - return out -} - -// Broker return pipelines associated with specific broker. -func (ps Pipelines) Broker(broker string) Pipelines { - out := make(Pipelines, 0) - - for _, p := range ps { - if p.Broker() != broker { - continue - } - - out = append(out, p) - } - - return out -} - -// Names returns only pipelines with specified names. -func (ps Pipelines) Names(only ...string) Pipelines { - out := make(Pipelines, 0) - - for _, name := range only { - for _, p := range ps { - if p.Name() == name { - out = append(out, p) - } - } - } - - return out -} - -// Get returns pipeline by it'svc name. -func (ps Pipelines) Get(name string) *Pipeline { - // possibly optimize - for _, p := range ps { - if p.Name() == name { - return p - } - } - - return nil -} - -// Pipeline defines pipeline options. -type Pipeline map[string]interface{} - -// With pipeline value. Immutable. -func (p Pipeline) With(name string, value interface{}) Pipeline { - out := make(map[string]interface{}) - for k, v := range p { - out[k] = v - } - out[name] = value - - return out -} - -// Name returns pipeline name. -func (p Pipeline) Name() string { - return p.String("name", "") -} - -// Broker associated with the pipeline. -func (p Pipeline) Broker() string { - return p.String("broker", "") -} - -// Has checks if value presented in pipeline. -func (p Pipeline) Has(name string) bool { - if _, ok := p[name]; ok { - return true - } - - return false -} - -// Map must return nested map value or empty config. -func (p Pipeline) Map(name string) Pipeline { - out := make(map[string]interface{}) - - if value, ok := p[name]; ok { - if m, ok := value.(map[string]interface{}); ok { - for k, v := range m { - out[k] = v - } - } - } - - return out -} - -// Bool must return option value as string or return default value. -func (p Pipeline) Bool(name string, d bool) bool { - if value, ok := p[name]; ok { - if b, ok := value.(bool); ok { - return b - } - } - - return d -} - -// String must return option value as string or return default value. -func (p Pipeline) String(name string, d string) string { - if value, ok := p[name]; ok { - if str, ok := value.(string); ok { - return str - } - } - - return d -} - -// Integer must return option value as string or return default value. -func (p Pipeline) Integer(name string, d int) int { - if value, ok := p[name]; ok { - if str, ok := value.(int); ok { - return str - } - } - - return d -} - -// Duration must return option value as time.Duration (seconds) or return default value. -func (p Pipeline) Duration(name string, d time.Duration) time.Duration { - if value, ok := p[name]; ok { - if str, ok := value.(int); ok { - return time.Second * time.Duration(str) - } - } - - return d -} diff --git a/plugins/jobs/oooold/pipeline_test.go b/plugins/jobs/oooold/pipeline_test.go deleted file mode 100644 index 0ace029f..00000000 --- a/plugins/jobs/oooold/pipeline_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package oooold - -import ( - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestPipeline_Map(t *testing.T) { - pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} - - assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0)) - assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0)) -} - -func TestPipeline_MapString(t *testing.T) { - pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}} - - assert.Equal(t, "default", pipe.Map("options").String("alias", "")) - assert.Equal(t, "", pipe.Map("other").String("alias", "")) -} - -func TestPipeline_Bool(t *testing.T) { - pipe := Pipeline{"value": true} - - assert.Equal(t, true, pipe.Bool("value", false)) - assert.Equal(t, true, pipe.Bool("other", true)) -} - -func TestPipeline_String(t *testing.T) { - pipe := Pipeline{"value": "value"} - - assert.Equal(t, "value", pipe.String("value", "")) - assert.Equal(t, "value", pipe.String("other", "value")) -} - -func TestPipeline_Integer(t *testing.T) { - pipe := Pipeline{"value": 1} - - assert.Equal(t, 1, pipe.Integer("value", 0)) - assert.Equal(t, 1, pipe.Integer("other", 1)) -} - -func TestPipeline_Duration(t *testing.T) { - pipe := Pipeline{"value": 1} - - assert.Equal(t, time.Second, pipe.Duration("value", 0)) - assert.Equal(t, time.Second, pipe.Duration("other", time.Second)) -} - -func TestPipeline_Has(t *testing.T) { - pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} - - assert.Equal(t, true, pipe.Has("options")) - assert.Equal(t, false, pipe.Has("other")) -} - -func TestPipeline_FilterBroker(t *testing.T) { - pipes := Pipelines{ - &Pipeline{"name": "first", "broker": "a"}, - &Pipeline{"name": "second", "broker": "a"}, - &Pipeline{"name": "third", "broker": "b"}, - &Pipeline{"name": "forth", "broker": "b"}, - } - - filtered := pipes.Names("first", "third") - assert.True(t, len(filtered) == 2) - - assert.Equal(t, "a", filtered[0].Broker()) - assert.Equal(t, "b", filtered[1].Broker()) - - filtered = pipes.Names("first", "third").Reverse() - assert.True(t, len(filtered) == 2) - - assert.Equal(t, "a", filtered[1].Broker()) - assert.Equal(t, "b", filtered[0].Broker()) - - filtered = pipes.Broker("a") - assert.True(t, len(filtered) == 2) - - assert.Equal(t, "first", filtered[0].Name()) - assert.Equal(t, "second", filtered[1].Name()) - - filtered = pipes.Broker("a").Reverse() - assert.True(t, len(filtered) == 2) - - assert.Equal(t, "first", filtered[1].Name()) - assert.Equal(t, "second", filtered[0].Name()) -} diff --git a/plugins/jobs/oooold/rpc.go b/plugins/jobs/oooold/rpc.go deleted file mode 100644 index cc61fb7d..00000000 --- a/plugins/jobs/oooold/rpc.go +++ /dev/null @@ -1,150 +0,0 @@ -package oooold - -import ( - "fmt" -) - -type rpcServer struct{ svc *Service } - -// WorkerList contains list of workers. -type WorkerList struct { - // Workers is list of workers. - Workers []*util.State `json:"workers"` -} - -// PipelineList contains list of pipeline stats. -type PipelineList struct { - // Pipelines is list of pipeline stats. - Pipelines []*Stat `json:"pipelines"` -} - -// Push job to the testQueue. -func (rpc *rpcServer) Push(j *Job, id *string) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - *id, err = rpc.svc.Push(j) - return -} - -// Push job to the testQueue. -func (rpc *rpcServer) PushAsync(j *Job, ok *bool) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - *ok = true - go rpc.svc.Push(j) - - return -} - -// Reset resets underlying RR worker pool and restarts all of it's workers. -func (rpc *rpcServer) Reset(reset bool, w *string) error { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - *w = "OK" - return rpc.svc.rr.Reset() -} - -// Destroy job pipelines for a given pipeline. -func (rpc *rpcServer) Stop(pipeline string, w *string) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - pipe := rpc.svc.cfg.pipelines.Get(pipeline) - if pipe == nil { - return fmt.Errorf("undefined pipeline `%s`", pipeline) - } - - if err := rpc.svc.Consume(pipe, nil, nil); err != nil { - return err - } - - *w = "OK" - return nil -} - -// Resume job pipelines for a given pipeline. -func (rpc *rpcServer) Resume(pipeline string, w *string) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - pipe := rpc.svc.cfg.pipelines.Get(pipeline) - if pipe == nil { - return fmt.Errorf("undefined pipeline `%s`", pipeline) - } - - if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil { - return err - } - - *w = "OK" - return nil -} - -// Destroy job pipelines for a given pipeline. -func (rpc *rpcServer) StopAll(stop bool, w *string) (err error) { - if rpc.svc == nil || rpc.svc.rr == nil { - return fmt.Errorf("jobs server is not running") - } - - for _, pipe := range rpc.svc.cfg.pipelines { - if err := rpc.svc.Consume(pipe, nil, nil); err != nil { - return err - } - } - - *w = "OK" - return nil -} - -// Resume job pipelines for a given pipeline. -func (rpc *rpcServer) ResumeAll(resume bool, w *string) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - for _, pipe := range rpc.svc.cfg.pipelines { - if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil { - return err - } - } - - *w = "OK" - return nil -} - -// Workers returns list of pipelines workers and their stats. -func (rpc *rpcServer) Workers(list bool, w *WorkerList) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - w.Workers, err = util.ServerState(rpc.svc.rr) - return err -} - -// Stat returns list of pipelines workers and their stats. -func (rpc *rpcServer) Stat(list bool, l *PipelineList) (err error) { - if rpc.svc == nil { - return fmt.Errorf("jobs server is not running") - } - - *l = PipelineList{} - for _, p := range rpc.svc.cfg.pipelines { - stat, err := rpc.svc.Stat(p) - if err != nil { - return err - } - - l.Pipelines = append(l.Pipelines, stat) - } - - return err -} diff --git a/plugins/jobs/oooold/rpc_test.go b/plugins/jobs/oooold/rpc_test.go deleted file mode 100644 index a63b9ea2..00000000 --- a/plugins/jobs/oooold/rpc_test.go +++ /dev/null @@ -1,657 +0,0 @@ -package oooold - -import ( - "github.com/sirupsen/logrus" - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/service/rpc" - "github.com/stretchr/testify/assert" - "io/ioutil" - "syscall" - "testing" -) - -func TestRPC_StatPipeline(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5004"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - - list := &PipelineList{} - assert.NoError(t, cl.Call("jobs.Stat", true, &list)) - - assert.Len(t, list.Pipelines, 1) - - assert.Equal(t, int64(0), list.Pipelines[0].Queue) - assert.Equal(t, true, list.Pipelines[0].Consuming) -} - -func TestRPC_StatNonActivePipeline(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5004"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": [] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - - list := &PipelineList{} - assert.NoError(t, cl.Call("jobs.Stat", true, &list)) - - assert.Len(t, list.Pipelines, 1) - - assert.Equal(t, int64(0), list.Pipelines[0].Queue) - assert.Equal(t, false, list.Pipelines[0].Consuming) -} - -func TestRPC_StatPipelineWithUndefinedBroker(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5004"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"undefined"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": [] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - - list := &PipelineList{} - assert.Error(t, cl.Call("jobs.Stat", true, &list)) -} - -func TestRPC_EnableConsuming(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5004"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": [] - } -}`))) - - ready := make(chan interface{}) - pipelineReady := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - - if event == EventPipeActive { - close(pipelineReady) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - - assert.NoError(t, cl.Call("jobs.Resume", "default", nil)) - - <-pipelineReady - - list := &PipelineList{} - assert.NoError(t, cl.Call("jobs.Stat", true, &list)) - - assert.Len(t, list.Pipelines, 1) - - assert.Equal(t, int64(0), list.Pipelines[0].Queue) - assert.Equal(t, true, list.Pipelines[0].Consuming) -} - -func TestRPC_EnableConsumingUndefined(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5005"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": [] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - ok := "" - assert.Error(t, cl.Call("jobs.Resume", "undefined", &ok)) -} - -func TestRPC_EnableConsumingUndefinedBroker(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5005"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"undefined"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": [] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - ok := "" - assert.Error(t, cl.Call("jobs.Resume", "default", &ok)) -} - -func TestRPC_EnableConsumingAllUndefinedBroker(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5005"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"undefined"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": [] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - ok := "" - assert.Error(t, cl.Call("jobs.ResumeAll", true, &ok)) -} - -func TestRPC_DisableConsuming(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5004"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - ready := make(chan interface{}) - pipelineReady := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - - if event == EventPipeStopped { - close(pipelineReady) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - - assert.NoError(t, cl.Call("jobs.Stop", "default", nil)) - - <-pipelineReady - - list := &PipelineList{} - assert.NoError(t, cl.Call("jobs.Stat", true, &list)) - - assert.Len(t, list.Pipelines, 1) - - assert.Equal(t, int64(0), list.Pipelines[0].Queue) - assert.Equal(t, false, list.Pipelines[0].Consuming) -} - -func TestRPC_DisableConsumingUndefined(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5004"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - - ok := "" - assert.Error(t, cl.Call("jobs.Stop", "undefined", &ok)) -} - -func TestRPC_EnableAllConsuming(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5004"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": [] - } -}`))) - - ready := make(chan interface{}) - pipelineReady := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - - if event == EventPipeActive { - close(pipelineReady) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - - assert.NoError(t, cl.Call("jobs.ResumeAll", true, nil)) - - <-pipelineReady - - list := &PipelineList{} - assert.NoError(t, cl.Call("jobs.Stat", true, &list)) - - assert.Len(t, list.Pipelines, 1) - - assert.Equal(t, int64(0), list.Pipelines[0].Queue) - assert.Equal(t, true, list.Pipelines[0].Consuming) -} - -func TestRPC_DisableAllConsuming(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5004"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - ready := make(chan interface{}) - pipelineReady := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - - if event == EventPipeStopped { - close(pipelineReady) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - - assert.NoError(t, cl.Call("jobs.StopAll", true, nil)) - - <-pipelineReady - - list := &PipelineList{} - assert.NoError(t, cl.Call("jobs.Stat", true, &list)) - - assert.Len(t, list.Pipelines, 1) - - assert.Equal(t, int64(0), list.Pipelines[0].Queue) - assert.Equal(t, false, list.Pipelines[0].Consuming) -} - -func TestRPC_DoJob(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5004"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - ready := make(chan interface{}) - jobReady := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - - if event == EventJobOK { - close(jobReady) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - - id := "" - assert.NoError(t, cl.Call("jobs.Push", &Job{ - Job: "spiral.jobs.tests.local.job", - Payload: `{"data":100}`, - Options: &Options{}, - }, &id)) - assert.NoError(t, err) - - <-jobReady - - data, err := ioutil.ReadFile("tests/local.job") - assert.NoError(t, err) - defer syscall.Unlink("tests/local.job") - - assert.Contains(t, string(data), id) -} - -func TestRPC_NoOperationOnDeadServer(t *testing.T) { - rc := &rpcServer{nil} - - assert.Error(t, rc.Push(&Job{}, nil)) - assert.Error(t, rc.Reset(true, nil)) - - assert.Error(t, rc.Stop("default", nil)) - assert.Error(t, rc.StopAll(true, nil)) - - assert.Error(t, rc.Resume("default", nil)) - assert.Error(t, rc.ResumeAll(true, nil)) - - assert.Error(t, rc.Workers(true, nil)) - assert.Error(t, rc.Stat(true, nil)) -} - -func TestRPC_Workers(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("rpc", &rpc.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "rpc":{"listen":"tcp://:5004"}, - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - cl, err := rs.Client() - assert.NoError(t, err) - - list := &WorkerList{} - assert.NoError(t, cl.Call("jobs.Workers", true, &list)) - - assert.Len(t, list.Workers, 1) - - pid := list.Workers[0].Pid - assert.NotEqual(t, 0, pid) - - // reset - ok := "" - assert.NoError(t, cl.Call("jobs.Reset", true, &ok)) - - list = &WorkerList{} - assert.NoError(t, cl.Call("jobs.Workers", true, &list)) - - assert.Len(t, list.Workers, 1) - - assert.NotEqual(t, list.Workers[0].Pid, pid) -} diff --git a/plugins/jobs/oooold/service.go b/plugins/jobs/oooold/service.go deleted file mode 100644 index 7cfcff31..00000000 --- a/plugins/jobs/oooold/service.go +++ /dev/null @@ -1,327 +0,0 @@ -package oooold - -import ( - "fmt" - //"github.com/sirupsen/logrus" - //"github.com/spiral/roadrunner" - //"github.com/spiral/roadrunner/service" - //"github.com/spiral/roadrunner/service/env" - //"github.com/spiral/roadrunner/service/rpc" - "sync" - "sync/atomic" - "time" -) - -// ID defines public service name. -const ID = "jobs" - -// Service wraps roadrunner container and manage set of parent within it. -type Service struct { - // Associated parent - Brokers map[string]Broker - - // brokers and routing config - cfg *Config - - // environment, logger and listeners - //env env.Environment - //log *logrus.Logger - lsn []func(event int, ctx interface{}) - - // server and server controller - //rr *roadrunner.Server - //cr roadrunner.Controller - - // task balancer - execPool chan Handler - - // registered brokers - serving int32 - //brokers service.Container - - // pipelines pipelines - mup sync.Mutex - pipelines map[*Pipeline]bool -} - -// Attach attaches cr. Currently only one cr is supported. -func (svc *Service) Attach(ctr roadrunner.Controller) { - svc.cr = ctr -} - -// AddListener attaches event listeners to the service and all underlying brokers. -func (svc *Service) AddListener(l func(event int, ctx interface{})) { - svc.lsn = append(svc.lsn, l) -} - -// Init configures job service. -func (svc *Service) Init( - cfg service.Config, - log *logrus.Logger, - env env.Environment, - rpc *rpc.Service, -) (ok bool, err error) { - svc.cfg = &Config{} - if err := svc.cfg.Hydrate(cfg); err != nil { - return false, err - } - - svc.env = env - svc.log = log - - if rpc != nil { - if err := rpc.Register(ID, &rpcServer{svc}); err != nil { - return false, err - } - } - - // limit the number of parallel threads - if svc.cfg.Workers.Command != "" { - svc.execPool = make(chan Handler, svc.cfg.Workers.Pool.NumWorkers) - for i := int64(0); i < svc.cfg.Workers.Pool.NumWorkers; i++ { - svc.execPool <- svc.exec - } - - svc.rr = roadrunner.NewServer(svc.cfg.Workers) - } - - svc.pipelines = make(map[*Pipeline]bool) - for _, p := range svc.cfg.pipelines { - svc.pipelines[p] = false - } - - // run all brokers in nested container - //svc.brokers = service.NewContainer(log) - //for name, b := range svc.Brokers { - // svc.brokers.Register(name, b) - // if ep, ok := b.(EventProvider); ok { - // ep.Listen(svc.throw) - // } - //} - - // init all broker configs - //if err := svc.brokers.Init(svc.cfg); err != nil { - // return false, err - //} - - // register all pipelines (per broker) - //for name, b := range svc.Brokers { - // for _, pipe := range svc.cfg.pipelines.Broker(name) { - // if err := b.Register(pipe); err != nil { - // return false, err - // } - // } - //} - - return true, nil -} - -// Serve serves local rr server and creates broker association. -func (svc *Service) Serve() error { - if svc.rr != nil { - if svc.env != nil { - if err := svc.env.Copy(svc.cfg.Workers); err != nil { - return err - } - } - - // ensure that workers aware of running within jobs - svc.cfg.Workers.SetEnv("rr_jobs", "true") - svc.rr.Listen(svc.throw) - - if svc.cr != nil { - svc.rr.Attach(svc.cr) - } - - if err := svc.rr.Start(); err != nil { - return err - } - defer svc.rr.Stop() - - // start pipelines of all the pipelines - for _, p := range svc.cfg.pipelines.Names(svc.cfg.Consume...) { - // start pipeline consuming - if err := svc.Consume(p, svc.execPool, svc.error); err != nil { - svc.Stop() - - return err - } - } - } - - atomic.StoreInt32(&svc.serving, 1) - defer atomic.StoreInt32(&svc.serving, 0) - - return svc.brokers.Serve() -} - -// Stop all pipelines and rr server. -func (svc *Service) Stop() { - if atomic.LoadInt32(&svc.serving) == 0 { - return - } - - wg := sync.WaitGroup{} - for _, p := range svc.cfg.pipelines.Names(svc.cfg.Consume...).Reverse() { - wg.Add(1) - - go func(p *Pipeline) { - defer wg.Done() - if err := svc.Consume(p, nil, nil); err != nil { - svc.throw(EventPipeError, &PipelineError{Pipeline: p, Caused: err}) - } - }(p) - } - - wg.Wait() - svc.brokers.Stop() -} - -// Server returns associated rr server (if any). -func (svc *Service) Server() *roadrunner.Server { - return svc.rr -} - -// Stat returns list of pipelines workers and their stats. -func (svc *Service) Stat(pipe *Pipeline) (stat *Stat, err error) { - b, ok := svc.Brokers[pipe.Broker()] - if !ok { - return nil, fmt.Errorf("undefined broker `%s`", pipe.Broker()) - } - - stat, err = b.Stat(pipe) - if err != nil { - return nil, err - } - - stat.Pipeline = pipe.Name() - stat.Broker = pipe.Broker() - - svc.mup.Lock() - stat.Consuming = svc.pipelines[pipe] - svc.mup.Unlock() - - return stat, err -} - -// Consume enables or disables pipeline pipelines using given handlers. -func (svc *Service) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error { - svc.mup.Lock() - - if execPool != nil { - if svc.pipelines[pipe] { - svc.mup.Unlock() - return nil - } - - svc.throw(EventPipeConsume, pipe) - svc.pipelines[pipe] = true - } else { - if !svc.pipelines[pipe] { - svc.mup.Unlock() - return nil - } - - svc.throw(EventPipeStop, pipe) - svc.pipelines[pipe] = false - } - - broker, ok := svc.Brokers[pipe.Broker()] - if !ok { - svc.mup.Unlock() - return fmt.Errorf("undefined broker `%s`", pipe.Broker()) - } - svc.mup.Unlock() - - if err := broker.Consume(pipe, execPool, errHandler); err != nil { - svc.mup.Lock() - svc.pipelines[pipe] = false - svc.mup.Unlock() - - svc.throw(EventPipeError, &PipelineError{Pipeline: pipe, Caused: err}) - - return err - } - - if execPool != nil { - svc.throw(EventPipeActive, pipe) - } else { - svc.throw(EventPipeStopped, pipe) - } - - return nil -} - -// Push job to associated broker and return job id. -func (svc *Service) Push(job *Job) (string, error) { - pipe, pOpts, err := svc.cfg.MatchPipeline(job) - if err != nil { - return "", err - } - - if pOpts != nil { - job.Options.Merge(pOpts) - } - - broker, ok := svc.Brokers[pipe.Broker()] - if !ok { - return "", fmt.Errorf("undefined broker `%s`", pipe.Broker()) - } - - id, err := broker.Push(pipe, job) - - if err != nil { - svc.throw(EventPushError, &JobError{Job: job, Caused: err}) - } else { - svc.throw(EventPushOK, &JobEvent{ID: id, Job: job}) - } - - return id, err -} - -// exec executed job using local RR server. Make sure that service is started. -func (svc *Service) exec(id string, j *Job) error { - start := time.Now() - svc.throw(EventJobStart, &JobEvent{ID: id, Job: j, start: start}) - - // ignore response for now, possibly add more routing options - _, err := svc.rr.Exec(&roadrunner.Payload{ - Body: j.Body(), - Context: j.Context(id), - }) - - if err == nil { - svc.throw(EventJobOK, &JobEvent{ - ID: id, - Job: j, - start: start, - elapsed: time.Since(start), - }) - } else { - svc.throw(EventJobError, &JobError{ - ID: id, - Job: j, - Caused: err, start: start, - elapsed: time.Since(start), - }) - } - - return err -} - -// register died job, can be used to move to fallback testQueue or log -func (svc *Service) error(id string, j *Job, err error) { - // nothing for now, possibly route to another pipeline -} - -// throw handles service, server and pool events. -func (svc *Service) throw(event int, ctx interface{}) { - for _, l := range svc.lsn { - l(event, ctx) - } - - if event == roadrunner.EventServerFailure { - // underlying rr server is dead, stop everything - svc.Stop() - } -} diff --git a/plugins/jobs/oooold/service_test.go b/plugins/jobs/oooold/service_test.go deleted file mode 100644 index a8e0e56d..00000000 --- a/plugins/jobs/oooold/service_test.go +++ /dev/null @@ -1,458 +0,0 @@ -package oooold - -import ( - "bytes" - "github.com/sirupsen/logrus" - "github.com/spf13/viper" - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/service/env" - "github.com/stretchr/testify/assert" - "io/ioutil" - "syscall" - "testing" -) - -func viperConfig(cfg string) service.Config { - v := viper.New() - v.SetConfigType("json") - - err := v.ReadConfig(bytes.NewBuffer([]byte(cfg))) - if err != nil { - panic(err) - } - - return &configWrapper{v} -} - -// configWrapper provides interface bridge between v configs and service.Config. -type configWrapper struct { - v *viper.Viper -} - -// Get nested config section (sub-map), returns nil if section not found. -func (w *configWrapper) Get(key string) service.Config { - sub := w.v.Sub(key) - if sub == nil { - return nil - } - - return &configWrapper{sub} -} - -// Unmarshal unmarshal config data into given struct. -func (w *configWrapper) Unmarshal(out interface{}) error { - return w.v.Unmarshal(out) -} - -func jobs(container service.Container) *Service { - svc, _ := container.Get("jobs") - return svc.(*Service) -} - -func TestService_Init(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) -} - -func TestService_ServeStop(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("env", &env.Service{}) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - <-ready - c.Stop() -} - -func TestService_ServeError(t *testing.T) { - l := logrus.New() - l.Level = logrus.FatalLevel - - c := service.NewContainer(l) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "jobs":{ - "workers":{ - "command": "php tests/bad-consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - assert.Error(t, c.Serve()) -} - -func TestService_GetPipeline(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - assert.Equal(t, "ephemeral", jobs(c).cfg.pipelines.Get("default").Broker()) -} - -func TestService_StatPipeline(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - svc := jobs(c) - pipe := svc.cfg.pipelines.Get("default") - - stat, err := svc.Stat(pipe) - assert.NoError(t, err) - - assert.Equal(t, int64(0), stat.Queue) - assert.Equal(t, true, stat.Consuming) -} - -func TestService_StatNonConsumingPipeline(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": [] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - svc := jobs(c) - pipe := svc.cfg.pipelines.Get("default") - - stat, err := svc.Stat(pipe) - assert.NoError(t, err) - - assert.Equal(t, int64(0), stat.Queue) - assert.Equal(t, false, stat.Consuming) -} - -func TestService_DoJob(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - ready := make(chan interface{}) - jobReady := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - - if event == EventJobOK { - close(jobReady) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - svc := jobs(c) - - id, err := svc.Push(&Job{ - Job: "spiral.jobs.tests.local.job", - Payload: `{"data":100}`, - Options: &Options{}, - }) - assert.NoError(t, err) - - <-jobReady - - data, err := ioutil.ReadFile("tests/local.job") - assert.NoError(t, err) - defer syscall.Unlink("tests/local.job") - - assert.Contains(t, string(data), id) -} - -func TestService_DoUndefinedJob(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - svc := jobs(c) - - _, err := svc.Push(&Job{ - Job: "spiral.jobs.tests.undefined", - Payload: `{"data":100}`, - Options: &Options{}, - }) - assert.Error(t, err) -} - -func TestService_DoJobIntoInvalidBroker(t *testing.T) { - l := logrus.New() - l.Level = logrus.FatalLevel - - c := service.NewContainer(l) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"undefined"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": [] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - svc := jobs(c) - - _, err := svc.Push(&Job{ - Job: "spiral.jobs.tests.local.job", - Payload: `{"data":100}`, - Options: &Options{}, - }) - assert.Error(t, err) -} - -func TestService_DoStatInvalidBroker(t *testing.T) { - l := logrus.New() - l.Level = logrus.FatalLevel - - c := service.NewContainer(l) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"undefined"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": [] - } -}`))) - - ready := make(chan interface{}) - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - svc := jobs(c) - - _, err := svc.Stat(svc.cfg.pipelines.Get("default")) - assert.Error(t, err) -} - -func TestService_DoErrorJob(t *testing.T) { - c := service.NewContainer(logrus.New()) - c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) - - assert.NoError(t, c.Init(viperConfig(`{ - "jobs":{ - "workers":{ - "command": "php tests/consumer.php", - "pool.numWorkers": 1 - }, - "pipelines":{"default":{"broker":"ephemeral"}}, - "dispatch": { - "spiral-jobs-tests-local-*.pipeline": "default" - }, - "consume": ["default"] - } -}`))) - - ready := make(chan interface{}) - jobReady := make(chan interface{}) - - var jobErr error - jobs(c).AddListener(func(event int, ctx interface{}) { - if event == EventBrokerReady { - close(ready) - } - - if event == EventJobError { - jobErr = ctx.(error) - close(jobReady) - } - }) - - go func() { c.Serve() }() - defer c.Stop() - <-ready - - svc := jobs(c) - - _, err := svc.Push(&Job{ - Job: "spiral.jobs.tests.local.errorJob", - Payload: `{"data":100}`, - Options: &Options{}, - }) - assert.NoError(t, err) - - <-jobReady - assert.Error(t, jobErr) - assert.Contains(t, jobErr.Error(), "something is wrong") -} diff --git a/plugins/jobs/pipeline_test.go b/plugins/jobs/pipeline_test.go index b80e75d0..c1f958df 100644 --- a/plugins/jobs/pipeline_test.go +++ b/plugins/jobs/pipeline_test.go @@ -1,9 +1,10 @@ package jobs import ( - "github.com/stretchr/testify/assert" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestPipeline_Map(t *testing.T) { diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 42203871..cda2a711 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -1,10 +1,14 @@ package jobs import ( + "context" + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" ) const ( @@ -15,10 +19,12 @@ type Plugin struct { cfg *Config log logger.Logger + workersPool pool.Pool + consumers map[string]Consumer } -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { const op = errors.Op("jobs_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) @@ -29,6 +35,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { return errors.E(op, err) } + p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, nil, nil) + if err != nil { + return errors.E(op, err) + } + p.consumers = make(map[string]Consumer) p.log = log return nil diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 5a0bbf4e..dbe7f808 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -5,4 +5,3 @@ import "github.com/spiral/roadrunner/v2/plugins/logger" type rpc struct { log logger.Logger } - |