summaryrefslogtreecommitdiff
path: root/plugins/jobs
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-21 17:01:39 +0300
committerValery Piashchynski <[email protected]>2021-06-21 17:01:39 +0300
commit41bb9fa5938125217a075c60f1e39dc3a9a27537 (patch)
treece2997caa62f90279d85f6aa2397996f80791893 /plugins/jobs
parentbdcfdd28d705e401973da2beb8a11543e362bda4 (diff)
- Rework dispatcher, pipeline, job (not completely)
Create a config sample with RR2 support. Progress on root JOBS plugin. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r--plugins/jobs/.rr.yaml73
-rw-r--r--plugins/jobs/brokers/amqp/config.go (renamed from plugins/jobs/oooold/broker/amqp/config.go)19
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go1
-rw-r--r--plugins/jobs/config.go51
-rw-r--r--plugins/jobs/dispatcher.go47
-rw-r--r--plugins/jobs/dispatcher_test.go53
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio2
-rw-r--r--plugins/jobs/interface.go8
-rw-r--r--plugins/jobs/job.go41
-rw-r--r--plugins/jobs/job_options.go70
-rw-r--r--plugins/jobs/job_options_test.go109
-rw-r--r--plugins/jobs/job_test.go18
-rw-r--r--plugins/jobs/oooold/broker/amqp/config_test.go27
-rw-r--r--plugins/jobs/oooold/rpc.go1
-rw-r--r--plugins/jobs/oooold/service.go34
-rw-r--r--plugins/jobs/pipeline.go172
-rw-r--r--plugins/jobs/pipeline_test.go89
-rw-r--r--plugins/jobs/plugin.go29
-rw-r--r--plugins/jobs/rpc.go8
19 files changed, 788 insertions, 64 deletions
diff --git a/plugins/jobs/.rr.yaml b/plugins/jobs/.rr.yaml
new file mode 100644
index 00000000..1b84515f
--- /dev/null
+++ b/plugins/jobs/.rr.yaml
@@ -0,0 +1,73 @@
+server:
+ command: "php worker.php"
+
+jobs:
+ # worker pool configuration
+ pool:
+ num_workers: 4
+
+ # rabbitmq and similar servers
+ amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+ # beanstalk configuration
+ beanstalk:
+ addr: tcp://localhost:11300
+
+ # amazon sqs configuration
+ sqs:
+ key: api-key
+ secret: api-secret
+ region: us-west-1
+ endpoint: http://localhost:9324
+
+ # job destinations and options
+ dispatch:
+ spiral-jobs-tests-amqp-*.pipeline: amqp
+ spiral-jobs-tests-local-*.pipeline: local
+ spiral-jobs-tests-beanstalk-*.pipeline: beanstalk
+ spiral-jobs-tests-sqs-*.pipeline: sqs
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ local:
+ broker: ephemeral
+
+ amqp:
+ broker: amqp
+ queue: default
+
+ beanstalk:
+ broker: beanstalk
+ tube: default
+
+ sqs:
+ broker: sqs
+ queue: default
+ declare:
+ MessageRetentionPeriod: 86400
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: ["local", "amqp", "beanstalk", "sqs"]
+
+
+# monitors rr server(s)
+limit:
+ # check worker state each second
+ interval: 1
+
+ # custom watch configuration for each service
+ services:
+ # monitor queue workers
+ jobs:
+ # maximum allowed memory consumption per worker (soft)
+ maxMemory: 100
+
+ # maximum time to live for the worker (soft)
+ TTL: 0
+
+ # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections, soft)
+ idleTTL: 0
+
+ # max_execution_time (brutal)
+ execTTL: 60
diff --git a/plugins/jobs/oooold/broker/amqp/config.go b/plugins/jobs/brokers/amqp/config.go
index 0ed3a50e..a60cb486 100644
--- a/plugins/jobs/oooold/broker/amqp/config.go
+++ b/plugins/jobs/brokers/amqp/config.go
@@ -1,10 +1,6 @@
package amqp
-import (
- "fmt"
- "github.com/spiral/roadrunner/service"
- "time"
-)
+import "time"
// Config defines sqs broker configuration.
type Config struct {
@@ -15,19 +11,6 @@ type Config struct {
Timeout int
}
-// Hydrate config values.
-func (c *Config) Hydrate(cfg service.Config) error {
- if err := cfg.Unmarshal(c); err != nil {
- return err
- }
-
- if c.Addr == "" {
- return fmt.Errorf("AMQP address is missing")
- }
-
- return nil
-}
-
// TimeoutDuration returns number of seconds allowed to redial
func (c *Config) TimeoutDuration() time.Duration {
timeout := c.Timeout
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
new file mode 100644
index 00000000..0e8d02ac
--- /dev/null
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -0,0 +1 @@
+package amqp
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
new file mode 100644
index 00000000..5c5ad400
--- /dev/null
+++ b/plugins/jobs/config.go
@@ -0,0 +1,51 @@
+package jobs
+
+import (
+ "github.com/spiral/errors"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+)
+
+// Config defines settings for job broker, workers and job-pipeline mapping.
+type Config struct {
+ // Workers configures roadrunner server and worker busy.
+ //Workers *roadrunner.ServerConfig
+ pool poolImpl.Config
+
+ // Dispatch defines where and how to match jobs.
+ Dispatch map[string]*Options
+
+ // Pipelines defines mapping between PHP job pipeline and associated job broker.
+ Pipelines map[string]*Pipeline
+
+ // Consuming specifies names of pipelines to be consumed on service start.
+ Consume []string
+
+ // parent config for broken options.
+ pipelines Pipelines
+ route Dispatcher
+}
+
+// MatchPipeline locates the pipeline associated with the job.
+func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error) {
+ const op = errors.Op("config_match_pipeline")
+ opt := c.route.match(job)
+
+ pipe := ""
+ if job.Options != nil {
+ pipe = job.Options.Pipeline
+ }
+
+ if pipe == "" && opt != nil {
+ pipe = opt.Pipeline
+ }
+
+ if pipe == "" {
+ return nil, nil, errors.E(op, errors.Errorf("unable to locate pipeline for `%s`", job.Job))
+ }
+
+ if p := c.pipelines.Get(pipe); p != nil {
+ return p, opt, nil
+ }
+
+ return nil, nil, errors.E(op, errors.Errorf("undefined pipeline `%s`", pipe))
+}
diff --git a/plugins/jobs/dispatcher.go b/plugins/jobs/dispatcher.go
new file mode 100644
index 00000000..9fde8fac
--- /dev/null
+++ b/plugins/jobs/dispatcher.go
@@ -0,0 +1,47 @@
+package jobs
+
+import (
+ "strings"
+)
+
+var separators = []string{"/", "-", "\\"}
+
+// Dispatcher provides ability to automatically locate the pipeline for the specific job
+// and update job options (if none set).
+type Dispatcher map[string]*Options
+
+// pre-compile patterns
+func initDispatcher(routes map[string]*Options) Dispatcher {
+ dispatcher := make(Dispatcher)
+ for pattern, opts := range routes {
+ pattern = strings.ToLower(pattern)
+ pattern = strings.Trim(pattern, "-.*")
+
+ for _, s := range separators {
+ pattern = strings.Replace(pattern, s, ".", -1)
+ }
+
+ dispatcher[pattern] = opts
+ }
+
+ return dispatcher
+}
+
+// match clarifies target job pipeline and other job options. Can return nil.
+func (dispatcher Dispatcher) match(job *Job) (found *Options) {
+ var best = 0
+
+ jobName := strings.ToLower(job.Job)
+ for pattern, opts := range dispatcher {
+ if strings.HasPrefix(jobName, pattern) && len(pattern) > best {
+ found = opts
+ best = len(pattern)
+ }
+ }
+
+ if best == 0 {
+ return nil
+ }
+
+ return found
+}
diff --git a/plugins/jobs/dispatcher_test.go b/plugins/jobs/dispatcher_test.go
new file mode 100644
index 00000000..59e3fd4e
--- /dev/null
+++ b/plugins/jobs/dispatcher_test.go
@@ -0,0 +1,53 @@
+package jobs
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func Test_Map_All(t *testing.T) {
+ m := initDispatcher(map[string]*Options{"default": {Pipeline: "default"}})
+ assert.Equal(t, "default", m.match(&Job{Job: "default"}).Pipeline)
+}
+
+func Test_Map_Miss(t *testing.T) {
+ m := initDispatcher(map[string]*Options{"some.*": {Pipeline: "default"}})
+
+ assert.Nil(t, m.match(&Job{Job: "miss"}))
+}
+
+func Test_Map_Best(t *testing.T) {
+ m := initDispatcher(map[string]*Options{
+ "some.*": {Pipeline: "default"},
+ "some.other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
+ assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline)
+ assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline)
+}
+
+func Test_Map_BestUpper(t *testing.T) {
+ m := initDispatcher(map[string]*Options{
+ "some.*": {Pipeline: "default"},
+ "some.Other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
+ assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "other", m.match(&Job{Job: "some.OTHER"}).Pipeline)
+ assert.Equal(t, "other", m.match(&Job{Job: "Some.other.job"}).Pipeline)
+}
+
+func Test_Map_BestReversed(t *testing.T) {
+ m := initDispatcher(map[string]*Options{
+ "some.*": {Pipeline: "default"},
+ "some.other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline)
+ assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline)
+ assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
+}
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
index 871df916..ee923d29 100644
--- a/plugins/jobs/doc/jobs_arch.drawio
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-06-16T08:42:30.343Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="TfWMGqp6GgpLWRWY4Wkn" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vtbd5s4EP41Pqf7EB/E3Y9x7Dbdbbdusnva7psAYWgBsUJO7Pz6lUBchRMaY7vp1j5J0OiCNPPNp9FAJtpVvH1DYBq8xx6KJqribSfaYqKqwFIA+8MlOyEBql5I1iT0hKwW3IYPSAgVId2EHspaDSnGEQ3TttDFSYJc2pJBQvB9u5mPo/ZdU7hGkuDWhZEs/RR6NCikqqIodcU1CtcB7dbEsGwtBFkAPXzfEGnLiXZFMKbFVby9QhFXX6mYot/rPbXVzAhK6JAOn5Nkew1wjO4eaGK9+Yj+UdyL0hx3MNqIJYvZ0l2pA4I3iYf4KMpEm98HIUW3KXR57T0zO5MFNI5YCbBLP4yiKxxhkvfVPIhs32XyjBL8DTVqTNdGjs9qxAQQoWi7d2mgUhjDGmKLoGTHmogO+kzouISZXeLnvmE0W8iChr1AaR0ogLKuBq9VyS6ENr9Hs5IekcegJYqY0ACvcQKjZS2dtzVdt3mHcSr0+xVRuhN+AjcUt7XP9EV2n3n/qVEWvzTrFlsxeFHaiVIxVz7Bxy3A1oM3xEWPrVssnEKyRvSRhntMSlAEaXjXnkifdUTXFQ7ZFCsoGIbRhoJpdSxcTEx0a/qLNFIHVOasM1KhC2mkS0LgrtEs5Q0yCVDVmp+PMe0sGNuGtAExVvpSYopd1wDjhV0TbefFJRgIS/1IsOyASTeHwXIsqACJ529WbCTlgv2s/r69Hpf1DWR7eh/r26qjmeZIrG93dGqoMusDvYf1tWOR/uysDtl0x8o5v88hldM5pD50n1D6UXCgR2pd9FRwOpVH/mLvI4DFPA5967OpPTMNA1iqDjRr1oaOBmbTWeNjnxRIcgj/HlI34MT+drV89/bP5ajkjgCjd6uP3GempcGxyL0b0ut95K72kPvRInrrnP4KWuRu/fDsbgx02EP99SCL2i+OgUHLnLV1fxyDHnqsO8ighkSFE9WM2HTnacvO5r8bnnaZx2w1YTLRLlmtkm7Z71xpSiG/oNykvE5v1DFOoxcwCtein8tUiUi72kMuJmy9WLThkCFRmKD61uxqLf7mE3RKwe/YYWq5vKVk49INQWUDphCn24nJ0q4sIF3Js9YeIZ8WlTav7E6bzTLvscjHJmGyrhZS3X8FdxGG3pPtPqRcU1nVrir3LbLjsHyTCV0YXRYGWeQmmwvzLIpFzDFr5Ud58o1vZnxPwwkVHgpUUX4N4zDinnONojvER310H/QN/u1NbeUfMWpDXnzG2R8ttZudUHr2R7Nnf+xmMUZzPvMncL4Sej+8/63CFBVresK5Fox4d0+2uqQUxSmtfTCPVfcv8hhLuuEobE+3MQtHUuup5vVXGCO8oc+Z1C+yKihHSqUOJSsAjsVWAEjmOVs4PziaV8+WPVUGhn+Hpk8Ps6kibUE3zHPRND8Qv/qKnd9GPQr7tovc3qdbjm3ohjKO96hm9yg8G3gUPlqeE6iSoglim2XSjN46iuZ7c1ubJfGJTVybd/kxDj2v8DuUhQ/QyYfiWhWPVNi4xnxiLPhYzNUywZGSQRKcoHFsoUlpCVu2xeyUWQkgp39SFhtMVJ7dxz+zLYyuX2imbIu+h77Hs4V8/mzEaRcsLoAxZxQRHbD4I2VKEzGZsQi56n3OPdZ8Yi3+L1br8SBwWhc6a2bvhWXigTkwEgD2gaHAs1LxjH3PmIoH8hk4RuyOXNc1Eexj5ReYlVfPH4qc95nrC3Nee6DzNt5QO6HzgrM+RyuV03DeN4hPfs49iieZfIJjrr0A5apYhxnbdRFPcDp5E54+uQuh8PZp0e/VyGcO31f7zxye6ZjGsRwd9O3SfY7efa9lvBfq5MPdapMFr+potzjfKcxX2e0bdhtR+2d6n1EZqn39aNqXXyDao96fI0q1uvjXZQsYpwxSVfnILakebygPMa6ql6EVOa/Hvq/5bedrAr0Q1XVCeR3AV817Ne3BLKg8qbTqO+igaMWsVKTgFw6mlPGmbPZ2+rNCR9P7ejKf5c7ObwmztFioH275PArAILK8QwVucowEMOUd4u2av6U+hfeZPt1k+b3GgEqZSaxePtZkqFi2DBXLPhZUNJkpr1dM8OnDzR/LGwk2pYJSgl2UZU+TogPdb+ucRj8UiNuzVZ0kPWbqHVe1dXtqDKTL789EsmL9zn4RetT/+6At/wM=</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-06-21T13:57:33.772Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="eDaFKmf6xAQVYDDexY3m" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvrc5s4EP9rPONmJh7e4I9x7CTttVc3uZu2900GYWgBcUL4kb/+JBBPYcdNsJ32mkxra/VA+/jtrlZkoF6Hm1sMYu8DcmAwUCRnM1CnA0WRVVMz6ScjbXOSJUtSTlli3+HDKsKD/wg5sRiW+g5MGgMJQgHx4ybRRlEEbdKgAYzRujnMRUHzqTFYQoHwYINApH72HeIVu5MKPljPHfSXHhG6QlCM54TEAw5a10jqbKBeY4RI/i3cXMOASbAQTT7vZkdvuTcMI3LIhC9RtLmTUQhXjyQybz/BfyT7UtHyZVYgSDnTfLdkW0gBozRyIFtFGqiTtecT+BADm/WuqeYpzSNhQFsy/er6QXCNAoSzuaoDoOXalJ4QjL7DWo9hW3Dh0h6RD87aCmICNzUS5+sWUiYI3tIhvFe1uIy5oY3NwoLWldqUYpBXV1ihHcBNZVkuXomSfuHS/BHJCnKEDjUu3kSYeGiJIhDMKuqkKelqzHuEYi7fb5CQLUcKSAlqSp+KEG+/sPkjvWh+rfdNN3zxvLXlrZ06SFCKbbiPTc4nAXgJyZ6BfEEmhL0qxTAAxF81EdilHT51jny659IUNE1vmoJutjSc75RPq+NFWKllVPq4tVIuHGGlK4zBtjYsZgMSwaBKnp9vY+pZbGzjk5qJ0dbXwqbo98rAWGNbt7aT2qV8oFlqpzLLljGpxmFm2ZepFD618vP3c7qSdEn/zf9+uKOL5h90Q9vI7jcI6NBytK4gYCkL1TB6CgJmS8SaIgYBWesIAuqxYsD4rPiso7PE6o/hUzoaPrVDw4Z0GoAqbespc4pTAfS3M3+5sRgn8ubqeGSNDV2XTUWjB51x03SUdlJ5ZNMRc/gPgNge8+xv57P3b/+c9erOoUwdutnlzseGqYK+3Hk7p1e73LnS4c6PltKb50So3HDn5mvz5/qBEO0doS/SqPXT+Vy5oc5Ku2dTaO/nuhcpVBdc4UAxArr/SdzQs/Fvyuouk5Cy50cD9Yr2SvGG/p8JTcrpl4SplPVptT7qwMglCPwln2dT2ULc7HagjTDlF/ExzGRw4EewejT9tuSf2QYXBeEdWlCxXD0QnNokxbAYQAWyaE+itLhN83Cb8izeA+iSvNNine1t011mM6bZ2tiPliUj5fPnYBsg4Dw57mPMJJWU48p2F5MtwLKI4tsguMoVMs1UNuHqmeZMTBAd5QZZ9Y0FMxbTUEQ4QmWFt29A6AcMOXcwWEG26t446Orst7O2lf3wVWv0/Kef+GjI7fKE1BEfjY742C5j9AY+4xcAX2F6rx5/cz+GOU9PgGtKHe/2yVFXhMAwJhUGs1x1N5PHYOmeWWFzu7VdLASxnmpff/khRCl5zqZ+O6us1xJqqYc6K1k+lreSZUE9Z0vnD87mlVOVT6UD07/e66cv06kkhKB7ilw4yg7Ew29o8abXo7Br2dDuvN5aWLqm79XB4eiR9fZReHzgUfholU1ZEQSNIQ2WUT17awmaxeamNAvHx4O4Omn7x9B3nBx3MPEfwSJbikmV36nQdfXJQJ+ytSjUEu4jBYVEKIL96EIRyhKWqIvxKasSslj+iWluMFBYeR/9yrrQ2rhQDFEXXbe+x9OFeP6s5WmXNC8AIfMoPDug+UdMhcZzMn3qM9G7zPeYk4E5/b9orQNB8mkhdNbK3uuuvRc19acLQVbfqcCziu9yO+Yd+95GPPWGkD6RibuC/i4//BPW4eXzJx/nvVd93XC1DoRrobEzw1U68V1ZIZ8aXG8h2+6EYYgVklyMQiZAD2bML/2ERlbIipiLbAgrkax8wPE9yucNez5XuK7Sfa5wjIWhHwvaUlck7oJ2++WV/t6aEw9w8zTxhlVGm5/hJIpO+via3nqU/nleWrTGh0pfO5r0xbeEdoj318hEjbb9a6IG9FMmoop4rBZEj1LCkorr8p1nSazd0d8b9tjJEgPHh1UfF17L4MvhnZJ2QOKVSCq0+h4sYDCnWsrL7NMFIoT6TVHtzRJnaR119HVUN4tYzh4Jkjhn1PU3bB+5wUA8W8HcbjIb8UDMJoSbJXsbfQTWiTZKk+xZfZiKJDdNxVRFUzEt0VQKWv+mooqe8m5OCZ8/3v8xuxfMphBQjJENk+Rpp7gA9vdl5kY/5ha3I1SdpASmqy2ompo10g90lz1UGx8N11mr1o32Pbz6eptu5ub47lLUwOnuuzousvLSQiC2qrJCq/7Qau240SgvvajLSdIwS5KGeS705kx3Y52iEi5z9skxGyVI8YEAMrwoSjYszxhe0IQjGbGOPP+AGKMG3/1cQR2Loyx7uqiKUIyFi3d5DjUsLgOf4muHK8GZW677DPHqqsvJ7AB/h4vYUxIftxxC+QJnzR2UrwQ33MEz0ifarP5QJz+KVH/zpM7+Aw==</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go
new file mode 100644
index 00000000..060326c8
--- /dev/null
+++ b/plugins/jobs/interface.go
@@ -0,0 +1,8 @@
+package jobs
+
+
+// todo naming
+type Consumer interface {
+ Push()
+ Stat()
+}
diff --git a/plugins/jobs/job.go b/plugins/jobs/job.go
new file mode 100644
index 00000000..8458b25b
--- /dev/null
+++ b/plugins/jobs/job.go
@@ -0,0 +1,41 @@
+package jobs
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+//// Handler handles job execution.
+//type Handler func(id string, j *Job) error
+//
+//// ErrorHandler handles job execution errors.
+//type ErrorHandler func(id string, j *Job, err error)
+
+// Job carries information about single job.
+type Job struct {
+ // Job contains name of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Body packs job payload into binary payload.
+func (j *Job) Body() []byte {
+ return utils.AsBytes(j.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+func (j *Job) Context(id string) []byte {
+ ctx, _ := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ }{ID: id, Job: j.Job},
+ )
+
+ return ctx
+}
diff --git a/plugins/jobs/job_options.go b/plugins/jobs/job_options.go
new file mode 100644
index 00000000..d4c6f0d2
--- /dev/null
+++ b/plugins/jobs/job_options.go
@@ -0,0 +1,70 @@
+package jobs
+
+import "time"
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int `json:"delay,omitempty"`
+
+ // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
+ // Minimum valuable value is 2.
+ Attempts int `json:"maxAttempts,omitempty"`
+
+ // RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
+ RetryDelay int `json:"retryDelay,omitempty"`
+
+ // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
+ Timeout int `json:"timeout,omitempty"`
+}
+
+// Merge merges job options.
+func (o *Options) Merge(from *Options) {
+ if o.Pipeline == "" {
+ o.Pipeline = from.Pipeline
+ }
+
+ if o.Attempts == 0 {
+ o.Attempts = from.Attempts
+ }
+
+ if o.Timeout == 0 {
+ o.Timeout = from.Timeout
+ }
+
+ if o.RetryDelay == 0 {
+ o.RetryDelay = from.RetryDelay
+ }
+
+ if o.Delay == 0 {
+ o.Delay = from.Delay
+ }
+}
+
+// CanRetry must return true if broker is allowed to re-run the job.
+func (o *Options) CanRetry(attempt int) bool {
+ // Attempts 1 and 0 has identical effect
+ return o.Attempts > (attempt + 1)
+}
+
+// RetryDuration returns retry delay duration in a form of time.Duration.
+func (o *Options) RetryDuration() time.Duration {
+ return time.Second * time.Duration(o.RetryDelay)
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
diff --git a/plugins/jobs/job_options_test.go b/plugins/jobs/job_options_test.go
new file mode 100644
index 00000000..8caaa935
--- /dev/null
+++ b/plugins/jobs/job_options_test.go
@@ -0,0 +1,109 @@
+package jobs
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestOptions_CanRetry(t *testing.T) {
+ opts := &Options{Attempts: 0}
+
+ assert.False(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+}
+
+func TestOptions_CanRetry_SameValue(t *testing.T) {
+ opts := &Options{Attempts: 1}
+
+ assert.False(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+}
+
+func TestOptions_CanRetry_Value(t *testing.T) {
+ opts := &Options{Attempts: 2}
+
+ assert.True(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+ assert.False(t, opts.CanRetry(2))
+}
+
+func TestOptions_CanRetry_Value3(t *testing.T) {
+ opts := &Options{Attempts: 3}
+
+ assert.True(t, opts.CanRetry(0))
+ assert.True(t, opts.CanRetry(1))
+ assert.False(t, opts.CanRetry(2))
+}
+
+func TestOptions_RetryDuration(t *testing.T) {
+ opts := &Options{RetryDelay: 0}
+ assert.Equal(t, time.Duration(0), opts.RetryDuration())
+}
+
+func TestOptions_RetryDuration2(t *testing.T) {
+ opts := &Options{RetryDelay: 1}
+ assert.Equal(t, time.Second, opts.RetryDuration())
+}
+
+func TestOptions_DelayDuration(t *testing.T) {
+ opts := &Options{Delay: 0}
+ assert.Equal(t, time.Duration(0), opts.DelayDuration())
+}
+
+func TestOptions_DelayDuration2(t *testing.T) {
+ opts := &Options{Delay: 1}
+ assert.Equal(t, time.Second, opts.DelayDuration())
+}
+
+func TestOptions_TimeoutDuration(t *testing.T) {
+ opts := &Options{Timeout: 0}
+ assert.Equal(t, time.Minute*30, opts.TimeoutDuration())
+}
+
+func TestOptions_TimeoutDuration2(t *testing.T) {
+ opts := &Options{Timeout: 1}
+ assert.Equal(t, time.Second, opts.TimeoutDuration())
+}
+
+func TestOptions_Merge(t *testing.T) {
+ opts := &Options{}
+
+ opts.Merge(&Options{
+ Pipeline: "pipeline",
+ Delay: 2,
+ Timeout: 1,
+ Attempts: 1,
+ RetryDelay: 1,
+ })
+
+ assert.Equal(t, "pipeline", opts.Pipeline)
+ assert.Equal(t, 1, opts.Attempts)
+ assert.Equal(t, 2, opts.Delay)
+ assert.Equal(t, 1, opts.Timeout)
+ assert.Equal(t, 1, opts.RetryDelay)
+}
+
+func TestOptions_MergeKeepOriginal(t *testing.T) {
+ opts := &Options{
+ Pipeline: "default",
+ Delay: 10,
+ Timeout: 10,
+ Attempts: 10,
+ RetryDelay: 10,
+ }
+
+ opts.Merge(&Options{
+ Pipeline: "pipeline",
+ Delay: 2,
+ Timeout: 1,
+ Attempts: 1,
+ RetryDelay: 1,
+ })
+
+ assert.Equal(t, "default", opts.Pipeline)
+ assert.Equal(t, 10, opts.Attempts)
+ assert.Equal(t, 10, opts.Delay)
+ assert.Equal(t, 10, opts.Timeout)
+ assert.Equal(t, 10, opts.RetryDelay)
+}
diff --git a/plugins/jobs/job_test.go b/plugins/jobs/job_test.go
new file mode 100644
index 00000000..e1938eca
--- /dev/null
+++ b/plugins/jobs/job_test.go
@@ -0,0 +1,18 @@
+package jobs
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestJob_Body(t *testing.T) {
+ j := &Job{Payload: "hello"}
+
+ assert.Equal(t, []byte("hello"), j.Body())
+}
+
+func TestJob_Context(t *testing.T) {
+ j := &Job{Job: "job"}
+
+ assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id"))
+}
diff --git a/plugins/jobs/oooold/broker/amqp/config_test.go b/plugins/jobs/oooold/broker/amqp/config_test.go
deleted file mode 100644
index 1abbb55d..00000000
--- a/plugins/jobs/oooold/broker/amqp/config_test.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package amqp
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-type mockCfg struct{ cfg string }
-
-func (cfg *mockCfg) Get(name string) service.Config { return nil }
-func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func Test_Config_Hydrate_Error(t *testing.T) {
- cfg := &mockCfg{`{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error2(t *testing.T) {
- cfg := &mockCfg{`{"addr":""}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
diff --git a/plugins/jobs/oooold/rpc.go b/plugins/jobs/oooold/rpc.go
index 42730a68..cc61fb7d 100644
--- a/plugins/jobs/oooold/rpc.go
+++ b/plugins/jobs/oooold/rpc.go
@@ -2,7 +2,6 @@ package oooold
import (
"fmt"
- "github.com/spiral/roadrunner/util"
)
type rpcServer struct{ svc *Service }
diff --git a/plugins/jobs/oooold/service.go b/plugins/jobs/oooold/service.go
index 4244ed1a..7cfcff31 100644
--- a/plugins/jobs/oooold/service.go
+++ b/plugins/jobs/oooold/service.go
@@ -91,27 +91,27 @@ func (svc *Service) Init(
}
// run all brokers in nested container
- svc.brokers = service.NewContainer(log)
- for name, b := range svc.Brokers {
- svc.brokers.Register(name, b)
- if ep, ok := b.(EventProvider); ok {
- ep.Listen(svc.throw)
- }
- }
+ //svc.brokers = service.NewContainer(log)
+ //for name, b := range svc.Brokers {
+ // svc.brokers.Register(name, b)
+ // if ep, ok := b.(EventProvider); ok {
+ // ep.Listen(svc.throw)
+ // }
+ //}
// init all broker configs
- if err := svc.brokers.Init(svc.cfg); err != nil {
- return false, err
- }
+ //if err := svc.brokers.Init(svc.cfg); err != nil {
+ // return false, err
+ //}
// register all pipelines (per broker)
- for name, b := range svc.Brokers {
- for _, pipe := range svc.cfg.pipelines.Broker(name) {
- if err := b.Register(pipe); err != nil {
- return false, err
- }
- }
- }
+ //for name, b := range svc.Brokers {
+ // for _, pipe := range svc.cfg.pipelines.Broker(name) {
+ // if err := b.Register(pipe); err != nil {
+ // return false, err
+ // }
+ // }
+ //}
return true, nil
}
diff --git a/plugins/jobs/pipeline.go b/plugins/jobs/pipeline.go
new file mode 100644
index 00000000..bfd2e18c
--- /dev/null
+++ b/plugins/jobs/pipeline.go
@@ -0,0 +1,172 @@
+package jobs
+
+import (
+ "time"
+
+ "github.com/spiral/errors"
+)
+
+// Pipelines is list of Pipeline.
+
+type Pipelines []*Pipeline
+
+func initPipelines(pipes map[string]*Pipeline) (Pipelines, error) {
+ const op = errors.Op("pipeline_init")
+ out := make(Pipelines, 0)
+
+ for name, pipe := range pipes {
+ if pipe.Broker() == "" {
+ return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker"))
+ }
+
+ p := pipe.With("name", name)
+ out = append(out, &p)
+ }
+
+ return out, nil
+}
+
+// Reverse returns pipelines in reversed order.
+func (ps Pipelines) Reverse() Pipelines {
+ out := make(Pipelines, len(ps))
+
+ for i, p := range ps {
+ out[len(ps)-i-1] = p
+ }
+
+ return out
+}
+
+// Broker return pipelines associated with specific broker.
+func (ps Pipelines) Broker(broker string) Pipelines {
+ out := make(Pipelines, 0)
+
+ for _, p := range ps {
+ if p.Broker() != broker {
+ continue
+ }
+
+ out = append(out, p)
+ }
+
+ return out
+}
+
+// Names returns only pipelines with specified names.
+func (ps Pipelines) Names(only ...string) Pipelines {
+ out := make(Pipelines, 0)
+
+ for _, name := range only {
+ for _, p := range ps {
+ if p.Name() == name {
+ out = append(out, p)
+ }
+ }
+ }
+
+ return out
+}
+
+// Get returns pipeline by it'svc name.
+func (ps Pipelines) Get(name string) *Pipeline {
+ // possibly optimize
+ for _, p := range ps {
+ if p.Name() == name {
+ return p
+ }
+ }
+
+ return nil
+}
+
+// Pipeline defines pipeline options.
+type Pipeline map[string]interface{}
+
+// With pipeline value. Immutable.
+func (p Pipeline) With(name string, value interface{}) Pipeline {
+ out := make(map[string]interface{})
+ for k, v := range p {
+ out[k] = v
+ }
+ out[name] = value
+
+ return out
+}
+
+// Name returns pipeline name.
+func (p Pipeline) Name() string {
+ return p.String("name", "")
+}
+
+// Broker associated with the pipeline.
+func (p Pipeline) Broker() string {
+ return p.String("broker", "")
+}
+
+// Has checks if value presented in pipeline.
+func (p Pipeline) Has(name string) bool {
+ if _, ok := p[name]; ok {
+ return true
+ }
+
+ return false
+}
+
+// Map must return nested map value or empty config.
+func (p Pipeline) Map(name string) Pipeline {
+ out := make(map[string]interface{})
+
+ if value, ok := p[name]; ok {
+ if m, ok := value.(map[string]interface{}); ok {
+ for k, v := range m {
+ out[k] = v
+ }
+ }
+ }
+
+ return out
+}
+
+// Bool must return option value as string or return default value.
+func (p Pipeline) Bool(name string, d bool) bool {
+ if value, ok := p[name]; ok {
+ if b, ok := value.(bool); ok {
+ return b
+ }
+ }
+
+ return d
+}
+
+// String must return option value as string or return default value.
+func (p Pipeline) String(name string, d string) string {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(string); ok {
+ return str
+ }
+ }
+
+ return d
+}
+
+// Integer must return option value as string or return default value.
+func (p Pipeline) Integer(name string, d int) int {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(int); ok {
+ return str
+ }
+ }
+
+ return d
+}
+
+// Duration must return option value as time.Duration (seconds) or return default value.
+func (p Pipeline) Duration(name string, d time.Duration) time.Duration {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(int); ok {
+ return time.Second * time.Duration(str)
+ }
+ }
+
+ return d
+}
diff --git a/plugins/jobs/pipeline_test.go b/plugins/jobs/pipeline_test.go
new file mode 100644
index 00000000..b80e75d0
--- /dev/null
+++ b/plugins/jobs/pipeline_test.go
@@ -0,0 +1,89 @@
+package jobs
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestPipeline_Map(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
+
+ assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0))
+ assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0))
+}
+
+func TestPipeline_MapString(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}}
+
+ assert.Equal(t, "default", pipe.Map("options").String("alias", ""))
+ assert.Equal(t, "", pipe.Map("other").String("alias", ""))
+}
+
+func TestPipeline_Bool(t *testing.T) {
+ pipe := Pipeline{"value": true}
+
+ assert.Equal(t, true, pipe.Bool("value", false))
+ assert.Equal(t, true, pipe.Bool("other", true))
+}
+
+func TestPipeline_String(t *testing.T) {
+ pipe := Pipeline{"value": "value"}
+
+ assert.Equal(t, "value", pipe.String("value", ""))
+ assert.Equal(t, "value", pipe.String("other", "value"))
+}
+
+func TestPipeline_Integer(t *testing.T) {
+ pipe := Pipeline{"value": 1}
+
+ assert.Equal(t, 1, pipe.Integer("value", 0))
+ assert.Equal(t, 1, pipe.Integer("other", 1))
+}
+
+func TestPipeline_Duration(t *testing.T) {
+ pipe := Pipeline{"value": 1}
+
+ assert.Equal(t, time.Second, pipe.Duration("value", 0))
+ assert.Equal(t, time.Second, pipe.Duration("other", time.Second))
+}
+
+func TestPipeline_Has(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
+
+ assert.Equal(t, true, pipe.Has("options"))
+ assert.Equal(t, false, pipe.Has("other"))
+}
+
+func TestPipeline_FilterBroker(t *testing.T) {
+ pipes := Pipelines{
+ &Pipeline{"name": "first", "broker": "a"},
+ &Pipeline{"name": "second", "broker": "a"},
+ &Pipeline{"name": "third", "broker": "b"},
+ &Pipeline{"name": "forth", "broker": "b"},
+ }
+
+ filtered := pipes.Names("first", "third")
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "a", filtered[0].Broker())
+ assert.Equal(t, "b", filtered[1].Broker())
+
+ filtered = pipes.Names("first", "third").Reverse()
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "a", filtered[1].Broker())
+ assert.Equal(t, "b", filtered[0].Broker())
+
+ filtered = pipes.Broker("a")
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "first", filtered[0].Name())
+ assert.Equal(t, "second", filtered[1].Name())
+
+ filtered = pipes.Broker("a").Reverse()
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "first", filtered[1].Name())
+ assert.Equal(t, "second", filtered[0].Name())
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index e708d0ca..42203871 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -1,6 +1,7 @@
package jobs
import (
+ endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -11,11 +12,25 @@ const (
)
type Plugin struct {
+ cfg *Config
+ log logger.Logger
+
+ consumers map[string]Consumer
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("jobs_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ p.consumers = make(map[string]Consumer)
+ p.log = log
return nil
}
@@ -29,8 +44,22 @@ func (p *Plugin) Stop() error {
return nil
}
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectMQBrokers,
+ }
+}
+
+func (p *Plugin) CollectMQBrokers(name endure.Named, c Consumer) {
+ p.consumers[name.Name()] = c
+}
+
func (p *Plugin) Available() {}
func (p *Plugin) Name() string {
return PluginName
}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{log: p.log}
+}
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
new file mode 100644
index 00000000..5a0bbf4e
--- /dev/null
+++ b/plugins/jobs/rpc.go
@@ -0,0 +1,8 @@
+package jobs
+
+import "github.com/spiral/roadrunner/v2/plugins/logger"
+
+type rpc struct {
+ log logger.Logger
+}
+