diff options
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/.rr.yaml | 73 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/config.go (renamed from plugins/jobs/oooold/broker/amqp/config.go) | 19 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 1 | ||||
-rw-r--r-- | plugins/jobs/config.go | 51 | ||||
-rw-r--r-- | plugins/jobs/dispatcher.go | 47 | ||||
-rw-r--r-- | plugins/jobs/dispatcher_test.go | 53 | ||||
-rw-r--r-- | plugins/jobs/doc/jobs_arch.drawio | 2 | ||||
-rw-r--r-- | plugins/jobs/interface.go | 8 | ||||
-rw-r--r-- | plugins/jobs/job.go | 41 | ||||
-rw-r--r-- | plugins/jobs/job_options.go | 70 | ||||
-rw-r--r-- | plugins/jobs/job_options_test.go | 109 | ||||
-rw-r--r-- | plugins/jobs/job_test.go | 18 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/config_test.go | 27 | ||||
-rw-r--r-- | plugins/jobs/oooold/rpc.go | 1 | ||||
-rw-r--r-- | plugins/jobs/oooold/service.go | 34 | ||||
-rw-r--r-- | plugins/jobs/pipeline.go | 172 | ||||
-rw-r--r-- | plugins/jobs/pipeline_test.go | 89 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 29 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 8 |
19 files changed, 788 insertions, 64 deletions
diff --git a/plugins/jobs/.rr.yaml b/plugins/jobs/.rr.yaml new file mode 100644 index 00000000..1b84515f --- /dev/null +++ b/plugins/jobs/.rr.yaml @@ -0,0 +1,73 @@ +server: + command: "php worker.php" + +jobs: + # worker pool configuration + pool: + num_workers: 4 + + # rabbitmq and similar servers + amqp: + addr: amqp://guest:guest@localhost:5672/ + + # beanstalk configuration + beanstalk: + addr: tcp://localhost:11300 + + # amazon sqs configuration + sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://localhost:9324 + + # job destinations and options + dispatch: + spiral-jobs-tests-amqp-*.pipeline: amqp + spiral-jobs-tests-local-*.pipeline: local + spiral-jobs-tests-beanstalk-*.pipeline: beanstalk + spiral-jobs-tests-sqs-*.pipeline: sqs + + # list of broker pipelines associated with endpoints + pipelines: + local: + broker: ephemeral + + amqp: + broker: amqp + queue: default + + beanstalk: + broker: beanstalk + tube: default + + sqs: + broker: sqs + queue: default + declare: + MessageRetentionPeriod: 86400 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: ["local", "amqp", "beanstalk", "sqs"] + + +# monitors rr server(s) +limit: + # check worker state each second + interval: 1 + + # custom watch configuration for each service + services: + # monitor queue workers + jobs: + # maximum allowed memory consumption per worker (soft) + maxMemory: 100 + + # maximum time to live for the worker (soft) + TTL: 0 + + # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections, soft) + idleTTL: 0 + + # max_execution_time (brutal) + execTTL: 60 diff --git a/plugins/jobs/oooold/broker/amqp/config.go b/plugins/jobs/brokers/amqp/config.go index 0ed3a50e..a60cb486 100644 --- a/plugins/jobs/oooold/broker/amqp/config.go +++ b/plugins/jobs/brokers/amqp/config.go @@ -1,10 +1,6 @@ package amqp -import ( - "fmt" - "github.com/spiral/roadrunner/service" - "time" -) +import "time" // Config defines sqs broker configuration. type Config struct { @@ -15,19 +11,6 @@ type Config struct { Timeout int } -// Hydrate config values. -func (c *Config) Hydrate(cfg service.Config) error { - if err := cfg.Unmarshal(c); err != nil { - return err - } - - if c.Addr == "" { - return fmt.Errorf("AMQP address is missing") - } - - return nil -} - // TimeoutDuration returns number of seconds allowed to redial func (c *Config) TimeoutDuration() time.Duration { timeout := c.Timeout diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go new file mode 100644 index 00000000..0e8d02ac --- /dev/null +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -0,0 +1 @@ +package amqp diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go new file mode 100644 index 00000000..5c5ad400 --- /dev/null +++ b/plugins/jobs/config.go @@ -0,0 +1,51 @@ +package jobs + +import ( + "github.com/spiral/errors" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" +) + +// Config defines settings for job broker, workers and job-pipeline mapping. +type Config struct { + // Workers configures roadrunner server and worker busy. + //Workers *roadrunner.ServerConfig + pool poolImpl.Config + + // Dispatch defines where and how to match jobs. + Dispatch map[string]*Options + + // Pipelines defines mapping between PHP job pipeline and associated job broker. + Pipelines map[string]*Pipeline + + // Consuming specifies names of pipelines to be consumed on service start. + Consume []string + + // parent config for broken options. + pipelines Pipelines + route Dispatcher +} + +// MatchPipeline locates the pipeline associated with the job. +func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error) { + const op = errors.Op("config_match_pipeline") + opt := c.route.match(job) + + pipe := "" + if job.Options != nil { + pipe = job.Options.Pipeline + } + + if pipe == "" && opt != nil { + pipe = opt.Pipeline + } + + if pipe == "" { + return nil, nil, errors.E(op, errors.Errorf("unable to locate pipeline for `%s`", job.Job)) + } + + if p := c.pipelines.Get(pipe); p != nil { + return p, opt, nil + } + + return nil, nil, errors.E(op, errors.Errorf("undefined pipeline `%s`", pipe)) +} diff --git a/plugins/jobs/dispatcher.go b/plugins/jobs/dispatcher.go new file mode 100644 index 00000000..9fde8fac --- /dev/null +++ b/plugins/jobs/dispatcher.go @@ -0,0 +1,47 @@ +package jobs + +import ( + "strings" +) + +var separators = []string{"/", "-", "\\"} + +// Dispatcher provides ability to automatically locate the pipeline for the specific job +// and update job options (if none set). +type Dispatcher map[string]*Options + +// pre-compile patterns +func initDispatcher(routes map[string]*Options) Dispatcher { + dispatcher := make(Dispatcher) + for pattern, opts := range routes { + pattern = strings.ToLower(pattern) + pattern = strings.Trim(pattern, "-.*") + + for _, s := range separators { + pattern = strings.Replace(pattern, s, ".", -1) + } + + dispatcher[pattern] = opts + } + + return dispatcher +} + +// match clarifies target job pipeline and other job options. Can return nil. +func (dispatcher Dispatcher) match(job *Job) (found *Options) { + var best = 0 + + jobName := strings.ToLower(job.Job) + for pattern, opts := range dispatcher { + if strings.HasPrefix(jobName, pattern) && len(pattern) > best { + found = opts + best = len(pattern) + } + } + + if best == 0 { + return nil + } + + return found +} diff --git a/plugins/jobs/dispatcher_test.go b/plugins/jobs/dispatcher_test.go new file mode 100644 index 00000000..59e3fd4e --- /dev/null +++ b/plugins/jobs/dispatcher_test.go @@ -0,0 +1,53 @@ +package jobs + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_Map_All(t *testing.T) { + m := initDispatcher(map[string]*Options{"default": {Pipeline: "default"}}) + assert.Equal(t, "default", m.match(&Job{Job: "default"}).Pipeline) +} + +func Test_Map_Miss(t *testing.T) { + m := initDispatcher(map[string]*Options{"some.*": {Pipeline: "default"}}) + + assert.Nil(t, m.match(&Job{Job: "miss"})) +} + +func Test_Map_Best(t *testing.T) { + m := initDispatcher(map[string]*Options{ + "some.*": {Pipeline: "default"}, + "some.other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline) + assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline) + assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline) +} + +func Test_Map_BestUpper(t *testing.T) { + m := initDispatcher(map[string]*Options{ + "some.*": {Pipeline: "default"}, + "some.Other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline) + assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "other", m.match(&Job{Job: "some.OTHER"}).Pipeline) + assert.Equal(t, "other", m.match(&Job{Job: "Some.other.job"}).Pipeline) +} + +func Test_Map_BestReversed(t *testing.T) { + m := initDispatcher(map[string]*Options{ + "some.*": {Pipeline: "default"}, + "some.other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline) + assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline) + assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline) +} diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio index 871df916..ee923d29 100644 --- a/plugins/jobs/doc/jobs_arch.drawio +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-06-16T08:42:30.343Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="TfWMGqp6GgpLWRWY4Wkn" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vtbd5s4EP41Pqf7EB/E3Y9x7Dbdbbdusnva7psAYWgBsUJO7Pz6lUBchRMaY7vp1j5J0OiCNPPNp9FAJtpVvH1DYBq8xx6KJqribSfaYqKqwFIA+8MlOyEBql5I1iT0hKwW3IYPSAgVId2EHspaDSnGEQ3TttDFSYJc2pJBQvB9u5mPo/ZdU7hGkuDWhZEs/RR6NCikqqIodcU1CtcB7dbEsGwtBFkAPXzfEGnLiXZFMKbFVby9QhFXX6mYot/rPbXVzAhK6JAOn5Nkew1wjO4eaGK9+Yj+UdyL0hx3MNqIJYvZ0l2pA4I3iYf4KMpEm98HIUW3KXR57T0zO5MFNI5YCbBLP4yiKxxhkvfVPIhs32XyjBL8DTVqTNdGjs9qxAQQoWi7d2mgUhjDGmKLoGTHmogO+kzouISZXeLnvmE0W8iChr1AaR0ogLKuBq9VyS6ENr9Hs5IekcegJYqY0ACvcQKjZS2dtzVdt3mHcSr0+xVRuhN+AjcUt7XP9EV2n3n/qVEWvzTrFlsxeFHaiVIxVz7Bxy3A1oM3xEWPrVssnEKyRvSRhntMSlAEaXjXnkifdUTXFQ7ZFCsoGIbRhoJpdSxcTEx0a/qLNFIHVOasM1KhC2mkS0LgrtEs5Q0yCVDVmp+PMe0sGNuGtAExVvpSYopd1wDjhV0TbefFJRgIS/1IsOyASTeHwXIsqACJ529WbCTlgv2s/r69Hpf1DWR7eh/r26qjmeZIrG93dGqoMusDvYf1tWOR/uysDtl0x8o5v88hldM5pD50n1D6UXCgR2pd9FRwOpVH/mLvI4DFPA5967OpPTMNA1iqDjRr1oaOBmbTWeNjnxRIcgj/HlI34MT+drV89/bP5ajkjgCjd6uP3GempcGxyL0b0ut95K72kPvRInrrnP4KWuRu/fDsbgx02EP99SCL2i+OgUHLnLV1fxyDHnqsO8ighkSFE9WM2HTnacvO5r8bnnaZx2w1YTLRLlmtkm7Z71xpSiG/oNykvE5v1DFOoxcwCtein8tUiUi72kMuJmy9WLThkCFRmKD61uxqLf7mE3RKwe/YYWq5vKVk49INQWUDphCn24nJ0q4sIF3Js9YeIZ8WlTav7E6bzTLvscjHJmGyrhZS3X8FdxGG3pPtPqRcU1nVrir3LbLjsHyTCV0YXRYGWeQmmwvzLIpFzDFr5Ud58o1vZnxPwwkVHgpUUX4N4zDinnONojvER310H/QN/u1NbeUfMWpDXnzG2R8ttZudUHr2R7Nnf+xmMUZzPvMncL4Sej+8/63CFBVresK5Fox4d0+2uqQUxSmtfTCPVfcv8hhLuuEobE+3MQtHUuup5vVXGCO8oc+Z1C+yKihHSqUOJSsAjsVWAEjmOVs4PziaV8+WPVUGhn+Hpk8Ps6kibUE3zHPRND8Qv/qKnd9GPQr7tovc3qdbjm3ohjKO96hm9yg8G3gUPlqeE6iSoglim2XSjN46iuZ7c1ubJfGJTVybd/kxDj2v8DuUhQ/QyYfiWhWPVNi4xnxiLPhYzNUywZGSQRKcoHFsoUlpCVu2xeyUWQkgp39SFhtMVJ7dxz+zLYyuX2imbIu+h77Hs4V8/mzEaRcsLoAxZxQRHbD4I2VKEzGZsQi56n3OPdZ8Yi3+L1br8SBwWhc6a2bvhWXigTkwEgD2gaHAs1LxjH3PmIoH8hk4RuyOXNc1Eexj5ReYlVfPH4qc95nrC3Nee6DzNt5QO6HzgrM+RyuV03DeN4hPfs49iieZfIJjrr0A5apYhxnbdRFPcDp5E54+uQuh8PZp0e/VyGcO31f7zxye6ZjGsRwd9O3SfY7efa9lvBfq5MPdapMFr+potzjfKcxX2e0bdhtR+2d6n1EZqn39aNqXXyDao96fI0q1uvjXZQsYpwxSVfnILakebygPMa6ql6EVOa/Hvq/5bedrAr0Q1XVCeR3AV817Ne3BLKg8qbTqO+igaMWsVKTgFw6mlPGmbPZ2+rNCR9P7ejKf5c7ObwmztFioH275PArAILK8QwVucowEMOUd4u2av6U+hfeZPt1k+b3GgEqZSaxePtZkqFi2DBXLPhZUNJkpr1dM8OnDzR/LGwk2pYJSgl2UZU+TogPdb+ucRj8UiNuzVZ0kPWbqHVe1dXtqDKTL789EsmL9zn4RetT/+6At/wM=</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-06-21T13:57:33.772Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="eDaFKmf6xAQVYDDexY3m" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvrc5s4EP9rPONmJh7e4I9x7CTttVc3uZu2900GYWgBcUL4kb/+JBBPYcdNsJ32mkxra/VA+/jtrlZkoF6Hm1sMYu8DcmAwUCRnM1CnA0WRVVMz6ScjbXOSJUtSTlli3+HDKsKD/wg5sRiW+g5MGgMJQgHx4ybRRlEEbdKgAYzRujnMRUHzqTFYQoHwYINApH72HeIVu5MKPljPHfSXHhG6QlCM54TEAw5a10jqbKBeY4RI/i3cXMOASbAQTT7vZkdvuTcMI3LIhC9RtLmTUQhXjyQybz/BfyT7UtHyZVYgSDnTfLdkW0gBozRyIFtFGqiTtecT+BADm/WuqeYpzSNhQFsy/er6QXCNAoSzuaoDoOXalJ4QjL7DWo9hW3Dh0h6RD87aCmICNzUS5+sWUiYI3tIhvFe1uIy5oY3NwoLWldqUYpBXV1ihHcBNZVkuXomSfuHS/BHJCnKEDjUu3kSYeGiJIhDMKuqkKelqzHuEYi7fb5CQLUcKSAlqSp+KEG+/sPkjvWh+rfdNN3zxvLXlrZ06SFCKbbiPTc4nAXgJyZ6BfEEmhL0qxTAAxF81EdilHT51jny659IUNE1vmoJutjSc75RPq+NFWKllVPq4tVIuHGGlK4zBtjYsZgMSwaBKnp9vY+pZbGzjk5qJ0dbXwqbo98rAWGNbt7aT2qV8oFlqpzLLljGpxmFm2ZepFD618vP3c7qSdEn/zf9+uKOL5h90Q9vI7jcI6NBytK4gYCkL1TB6CgJmS8SaIgYBWesIAuqxYsD4rPiso7PE6o/hUzoaPrVDw4Z0GoAqbespc4pTAfS3M3+5sRgn8ubqeGSNDV2XTUWjB51x03SUdlJ5ZNMRc/gPgNge8+xv57P3b/+c9erOoUwdutnlzseGqYK+3Hk7p1e73LnS4c6PltKb50So3HDn5mvz5/qBEO0doS/SqPXT+Vy5oc5Ku2dTaO/nuhcpVBdc4UAxArr/SdzQs/Fvyuouk5Cy50cD9Yr2SvGG/p8JTcrpl4SplPVptT7qwMglCPwln2dT2ULc7HagjTDlF/ExzGRw4EewejT9tuSf2QYXBeEdWlCxXD0QnNokxbAYQAWyaE+itLhN83Cb8izeA+iSvNNine1t011mM6bZ2tiPliUj5fPnYBsg4Dw57mPMJJWU48p2F5MtwLKI4tsguMoVMs1UNuHqmeZMTBAd5QZZ9Y0FMxbTUEQ4QmWFt29A6AcMOXcwWEG26t446Orst7O2lf3wVWv0/Kef+GjI7fKE1BEfjY742C5j9AY+4xcAX2F6rx5/cz+GOU9PgGtKHe/2yVFXhMAwJhUGs1x1N5PHYOmeWWFzu7VdLASxnmpff/khRCl5zqZ+O6us1xJqqYc6K1k+lreSZUE9Z0vnD87mlVOVT6UD07/e66cv06kkhKB7ilw4yg7Ew29o8abXo7Br2dDuvN5aWLqm79XB4eiR9fZReHzgUfholU1ZEQSNIQ2WUT17awmaxeamNAvHx4O4Omn7x9B3nBx3MPEfwSJbikmV36nQdfXJQJ+ytSjUEu4jBYVEKIL96EIRyhKWqIvxKasSslj+iWluMFBYeR/9yrrQ2rhQDFEXXbe+x9OFeP6s5WmXNC8AIfMoPDug+UdMhcZzMn3qM9G7zPeYk4E5/b9orQNB8mkhdNbK3uuuvRc19acLQVbfqcCziu9yO+Yd+95GPPWGkD6RibuC/i4//BPW4eXzJx/nvVd93XC1DoRrobEzw1U68V1ZIZ8aXG8h2+6EYYgVklyMQiZAD2bML/2ERlbIipiLbAgrkax8wPE9yucNez5XuK7Sfa5wjIWhHwvaUlck7oJ2++WV/t6aEw9w8zTxhlVGm5/hJIpO+via3nqU/nleWrTGh0pfO5r0xbeEdoj318hEjbb9a6IG9FMmoop4rBZEj1LCkorr8p1nSazd0d8b9tjJEgPHh1UfF17L4MvhnZJ2QOKVSCq0+h4sYDCnWsrL7NMFIoT6TVHtzRJnaR119HVUN4tYzh4Jkjhn1PU3bB+5wUA8W8HcbjIb8UDMJoSbJXsbfQTWiTZKk+xZfZiKJDdNxVRFUzEt0VQKWv+mooqe8m5OCZ8/3v8xuxfMphBQjJENk+Rpp7gA9vdl5kY/5ha3I1SdpASmqy2ompo10g90lz1UGx8N11mr1o32Pbz6eptu5ub47lLUwOnuuzousvLSQiC2qrJCq/7Qau240SgvvajLSdIwS5KGeS705kx3Y52iEi5z9skxGyVI8YEAMrwoSjYszxhe0IQjGbGOPP+AGKMG3/1cQR2Loyx7uqiKUIyFi3d5DjUsLgOf4muHK8GZW677DPHqqsvJ7AB/h4vYUxIftxxC+QJnzR2UrwQ33MEz0ifarP5QJz+KVH/zpM7+Aw==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go new file mode 100644 index 00000000..060326c8 --- /dev/null +++ b/plugins/jobs/interface.go @@ -0,0 +1,8 @@ +package jobs + + +// todo naming +type Consumer interface { + Push() + Stat() +} diff --git a/plugins/jobs/job.go b/plugins/jobs/job.go new file mode 100644 index 00000000..8458b25b --- /dev/null +++ b/plugins/jobs/job.go @@ -0,0 +1,41 @@ +package jobs + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/utils" +) + +//// Handler handles job execution. +//type Handler func(id string, j *Job) error +// +//// ErrorHandler handles job execution errors. +//type ErrorHandler func(id string, j *Job, err error) + +// Job carries information about single job. +type Job struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Body packs job payload into binary payload. +func (j *Job) Body() []byte { + return utils.AsBytes(j.Payload) +} + +// Context packs job context (job, id) into binary payload. +func (j *Job) Context(id string) []byte { + ctx, _ := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + }{ID: id, Job: j.Job}, + ) + + return ctx +} diff --git a/plugins/jobs/job_options.go b/plugins/jobs/job_options.go new file mode 100644 index 00000000..d4c6f0d2 --- /dev/null +++ b/plugins/jobs/job_options.go @@ -0,0 +1,70 @@ +package jobs + +import "time" + +// Options carry information about how to handle given job. +type Options struct { + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int `json:"delay,omitempty"` + + // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). + // Minimum valuable value is 2. + Attempts int `json:"maxAttempts,omitempty"` + + // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. + RetryDelay int `json:"retryDelay,omitempty"` + + // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout int `json:"timeout,omitempty"` +} + +// Merge merges job options. +func (o *Options) Merge(from *Options) { + if o.Pipeline == "" { + o.Pipeline = from.Pipeline + } + + if o.Attempts == 0 { + o.Attempts = from.Attempts + } + + if o.Timeout == 0 { + o.Timeout = from.Timeout + } + + if o.RetryDelay == 0 { + o.RetryDelay = from.RetryDelay + } + + if o.Delay == 0 { + o.Delay = from.Delay + } +} + +// CanRetry must return true if broker is allowed to re-run the job. +func (o *Options) CanRetry(attempt int) bool { + // Attempts 1 and 0 has identical effect + return o.Attempts > (attempt + 1) +} + +// RetryDuration returns retry delay duration in a form of time.Duration. +func (o *Options) RetryDuration() time.Duration { + return time.Second * time.Duration(o.RetryDelay) +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. +func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} diff --git a/plugins/jobs/job_options_test.go b/plugins/jobs/job_options_test.go new file mode 100644 index 00000000..8caaa935 --- /dev/null +++ b/plugins/jobs/job_options_test.go @@ -0,0 +1,109 @@ +package jobs + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestOptions_CanRetry(t *testing.T) { + opts := &Options{Attempts: 0} + + assert.False(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) +} + +func TestOptions_CanRetry_SameValue(t *testing.T) { + opts := &Options{Attempts: 1} + + assert.False(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) +} + +func TestOptions_CanRetry_Value(t *testing.T) { + opts := &Options{Attempts: 2} + + assert.True(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) + assert.False(t, opts.CanRetry(2)) +} + +func TestOptions_CanRetry_Value3(t *testing.T) { + opts := &Options{Attempts: 3} + + assert.True(t, opts.CanRetry(0)) + assert.True(t, opts.CanRetry(1)) + assert.False(t, opts.CanRetry(2)) +} + +func TestOptions_RetryDuration(t *testing.T) { + opts := &Options{RetryDelay: 0} + assert.Equal(t, time.Duration(0), opts.RetryDuration()) +} + +func TestOptions_RetryDuration2(t *testing.T) { + opts := &Options{RetryDelay: 1} + assert.Equal(t, time.Second, opts.RetryDuration()) +} + +func TestOptions_DelayDuration(t *testing.T) { + opts := &Options{Delay: 0} + assert.Equal(t, time.Duration(0), opts.DelayDuration()) +} + +func TestOptions_DelayDuration2(t *testing.T) { + opts := &Options{Delay: 1} + assert.Equal(t, time.Second, opts.DelayDuration()) +} + +func TestOptions_TimeoutDuration(t *testing.T) { + opts := &Options{Timeout: 0} + assert.Equal(t, time.Minute*30, opts.TimeoutDuration()) +} + +func TestOptions_TimeoutDuration2(t *testing.T) { + opts := &Options{Timeout: 1} + assert.Equal(t, time.Second, opts.TimeoutDuration()) +} + +func TestOptions_Merge(t *testing.T) { + opts := &Options{} + + opts.Merge(&Options{ + Pipeline: "pipeline", + Delay: 2, + Timeout: 1, + Attempts: 1, + RetryDelay: 1, + }) + + assert.Equal(t, "pipeline", opts.Pipeline) + assert.Equal(t, 1, opts.Attempts) + assert.Equal(t, 2, opts.Delay) + assert.Equal(t, 1, opts.Timeout) + assert.Equal(t, 1, opts.RetryDelay) +} + +func TestOptions_MergeKeepOriginal(t *testing.T) { + opts := &Options{ + Pipeline: "default", + Delay: 10, + Timeout: 10, + Attempts: 10, + RetryDelay: 10, + } + + opts.Merge(&Options{ + Pipeline: "pipeline", + Delay: 2, + Timeout: 1, + Attempts: 1, + RetryDelay: 1, + }) + + assert.Equal(t, "default", opts.Pipeline) + assert.Equal(t, 10, opts.Attempts) + assert.Equal(t, 10, opts.Delay) + assert.Equal(t, 10, opts.Timeout) + assert.Equal(t, 10, opts.RetryDelay) +} diff --git a/plugins/jobs/job_test.go b/plugins/jobs/job_test.go new file mode 100644 index 00000000..e1938eca --- /dev/null +++ b/plugins/jobs/job_test.go @@ -0,0 +1,18 @@ +package jobs + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestJob_Body(t *testing.T) { + j := &Job{Payload: "hello"} + + assert.Equal(t, []byte("hello"), j.Body()) +} + +func TestJob_Context(t *testing.T) { + j := &Job{Job: "job"} + + assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id")) +} diff --git a/plugins/jobs/oooold/broker/amqp/config_test.go b/plugins/jobs/oooold/broker/amqp/config_test.go deleted file mode 100644 index 1abbb55d..00000000 --- a/plugins/jobs/oooold/broker/amqp/config_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package amqp - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { return nil } -func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func Test_Config_Hydrate_Error(t *testing.T) { - cfg := &mockCfg{`{"dead`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error2(t *testing.T) { - cfg := &mockCfg{`{"addr":""}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} diff --git a/plugins/jobs/oooold/rpc.go b/plugins/jobs/oooold/rpc.go index 42730a68..cc61fb7d 100644 --- a/plugins/jobs/oooold/rpc.go +++ b/plugins/jobs/oooold/rpc.go @@ -2,7 +2,6 @@ package oooold import ( "fmt" - "github.com/spiral/roadrunner/util" ) type rpcServer struct{ svc *Service } diff --git a/plugins/jobs/oooold/service.go b/plugins/jobs/oooold/service.go index 4244ed1a..7cfcff31 100644 --- a/plugins/jobs/oooold/service.go +++ b/plugins/jobs/oooold/service.go @@ -91,27 +91,27 @@ func (svc *Service) Init( } // run all brokers in nested container - svc.brokers = service.NewContainer(log) - for name, b := range svc.Brokers { - svc.brokers.Register(name, b) - if ep, ok := b.(EventProvider); ok { - ep.Listen(svc.throw) - } - } + //svc.brokers = service.NewContainer(log) + //for name, b := range svc.Brokers { + // svc.brokers.Register(name, b) + // if ep, ok := b.(EventProvider); ok { + // ep.Listen(svc.throw) + // } + //} // init all broker configs - if err := svc.brokers.Init(svc.cfg); err != nil { - return false, err - } + //if err := svc.brokers.Init(svc.cfg); err != nil { + // return false, err + //} // register all pipelines (per broker) - for name, b := range svc.Brokers { - for _, pipe := range svc.cfg.pipelines.Broker(name) { - if err := b.Register(pipe); err != nil { - return false, err - } - } - } + //for name, b := range svc.Brokers { + // for _, pipe := range svc.cfg.pipelines.Broker(name) { + // if err := b.Register(pipe); err != nil { + // return false, err + // } + // } + //} return true, nil } diff --git a/plugins/jobs/pipeline.go b/plugins/jobs/pipeline.go new file mode 100644 index 00000000..bfd2e18c --- /dev/null +++ b/plugins/jobs/pipeline.go @@ -0,0 +1,172 @@ +package jobs + +import ( + "time" + + "github.com/spiral/errors" +) + +// Pipelines is list of Pipeline. + +type Pipelines []*Pipeline + +func initPipelines(pipes map[string]*Pipeline) (Pipelines, error) { + const op = errors.Op("pipeline_init") + out := make(Pipelines, 0) + + for name, pipe := range pipes { + if pipe.Broker() == "" { + return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker")) + } + + p := pipe.With("name", name) + out = append(out, &p) + } + + return out, nil +} + +// Reverse returns pipelines in reversed order. +func (ps Pipelines) Reverse() Pipelines { + out := make(Pipelines, len(ps)) + + for i, p := range ps { + out[len(ps)-i-1] = p + } + + return out +} + +// Broker return pipelines associated with specific broker. +func (ps Pipelines) Broker(broker string) Pipelines { + out := make(Pipelines, 0) + + for _, p := range ps { + if p.Broker() != broker { + continue + } + + out = append(out, p) + } + + return out +} + +// Names returns only pipelines with specified names. +func (ps Pipelines) Names(only ...string) Pipelines { + out := make(Pipelines, 0) + + for _, name := range only { + for _, p := range ps { + if p.Name() == name { + out = append(out, p) + } + } + } + + return out +} + +// Get returns pipeline by it'svc name. +func (ps Pipelines) Get(name string) *Pipeline { + // possibly optimize + for _, p := range ps { + if p.Name() == name { + return p + } + } + + return nil +} + +// Pipeline defines pipeline options. +type Pipeline map[string]interface{} + +// With pipeline value. Immutable. +func (p Pipeline) With(name string, value interface{}) Pipeline { + out := make(map[string]interface{}) + for k, v := range p { + out[k] = v + } + out[name] = value + + return out +} + +// Name returns pipeline name. +func (p Pipeline) Name() string { + return p.String("name", "") +} + +// Broker associated with the pipeline. +func (p Pipeline) Broker() string { + return p.String("broker", "") +} + +// Has checks if value presented in pipeline. +func (p Pipeline) Has(name string) bool { + if _, ok := p[name]; ok { + return true + } + + return false +} + +// Map must return nested map value or empty config. +func (p Pipeline) Map(name string) Pipeline { + out := make(map[string]interface{}) + + if value, ok := p[name]; ok { + if m, ok := value.(map[string]interface{}); ok { + for k, v := range m { + out[k] = v + } + } + } + + return out +} + +// Bool must return option value as string or return default value. +func (p Pipeline) Bool(name string, d bool) bool { + if value, ok := p[name]; ok { + if b, ok := value.(bool); ok { + return b + } + } + + return d +} + +// String must return option value as string or return default value. +func (p Pipeline) String(name string, d string) string { + if value, ok := p[name]; ok { + if str, ok := value.(string); ok { + return str + } + } + + return d +} + +// Integer must return option value as string or return default value. +func (p Pipeline) Integer(name string, d int) int { + if value, ok := p[name]; ok { + if str, ok := value.(int); ok { + return str + } + } + + return d +} + +// Duration must return option value as time.Duration (seconds) or return default value. +func (p Pipeline) Duration(name string, d time.Duration) time.Duration { + if value, ok := p[name]; ok { + if str, ok := value.(int); ok { + return time.Second * time.Duration(str) + } + } + + return d +} diff --git a/plugins/jobs/pipeline_test.go b/plugins/jobs/pipeline_test.go new file mode 100644 index 00000000..b80e75d0 --- /dev/null +++ b/plugins/jobs/pipeline_test.go @@ -0,0 +1,89 @@ +package jobs + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestPipeline_Map(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} + + assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0)) + assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0)) +} + +func TestPipeline_MapString(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}} + + assert.Equal(t, "default", pipe.Map("options").String("alias", "")) + assert.Equal(t, "", pipe.Map("other").String("alias", "")) +} + +func TestPipeline_Bool(t *testing.T) { + pipe := Pipeline{"value": true} + + assert.Equal(t, true, pipe.Bool("value", false)) + assert.Equal(t, true, pipe.Bool("other", true)) +} + +func TestPipeline_String(t *testing.T) { + pipe := Pipeline{"value": "value"} + + assert.Equal(t, "value", pipe.String("value", "")) + assert.Equal(t, "value", pipe.String("other", "value")) +} + +func TestPipeline_Integer(t *testing.T) { + pipe := Pipeline{"value": 1} + + assert.Equal(t, 1, pipe.Integer("value", 0)) + assert.Equal(t, 1, pipe.Integer("other", 1)) +} + +func TestPipeline_Duration(t *testing.T) { + pipe := Pipeline{"value": 1} + + assert.Equal(t, time.Second, pipe.Duration("value", 0)) + assert.Equal(t, time.Second, pipe.Duration("other", time.Second)) +} + +func TestPipeline_Has(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} + + assert.Equal(t, true, pipe.Has("options")) + assert.Equal(t, false, pipe.Has("other")) +} + +func TestPipeline_FilterBroker(t *testing.T) { + pipes := Pipelines{ + &Pipeline{"name": "first", "broker": "a"}, + &Pipeline{"name": "second", "broker": "a"}, + &Pipeline{"name": "third", "broker": "b"}, + &Pipeline{"name": "forth", "broker": "b"}, + } + + filtered := pipes.Names("first", "third") + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "a", filtered[0].Broker()) + assert.Equal(t, "b", filtered[1].Broker()) + + filtered = pipes.Names("first", "third").Reverse() + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "a", filtered[1].Broker()) + assert.Equal(t, "b", filtered[0].Broker()) + + filtered = pipes.Broker("a") + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "first", filtered[0].Name()) + assert.Equal(t, "second", filtered[1].Name()) + + filtered = pipes.Broker("a").Reverse() + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "first", filtered[1].Name()) + assert.Equal(t, "second", filtered[0].Name()) +} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index e708d0ca..42203871 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -1,6 +1,7 @@ package jobs import ( + endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -11,11 +12,25 @@ const ( ) type Plugin struct { + cfg *Config + log logger.Logger + + consumers map[string]Consumer } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("jobs_plugin_init") + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.cfg) + if err != nil { + return errors.E(op, err) + } + p.consumers = make(map[string]Consumer) + p.log = log return nil } @@ -29,8 +44,22 @@ func (p *Plugin) Stop() error { return nil } +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.CollectMQBrokers, + } +} + +func (p *Plugin) CollectMQBrokers(name endure.Named, c Consumer) { + p.consumers[name.Name()] = c +} + func (p *Plugin) Available() {} func (p *Plugin) Name() string { return PluginName } + +func (p *Plugin) RPC() interface{} { + return &rpc{log: p.log} +} diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go new file mode 100644 index 00000000..5a0bbf4e --- /dev/null +++ b/plugins/jobs/rpc.go @@ -0,0 +1,8 @@ +package jobs + +import "github.com/spiral/roadrunner/v2/plugins/logger" + +type rpc struct { + log logger.Logger +} + |