summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-01 21:28:23 +0300
committerValery Piashchynski <[email protected]>2021-07-01 21:28:23 +0300
commita4b9d0c47494431c30f357c3616e53baf411360f (patch)
tree281f19cb031ff66f6360e11ec610dbc0a2fd7920
parentcd64c6c44ad463963705a22af2bd49059987949c (diff)
- Remove Dispatcher
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--plugins/jobs/config.go46
-rw-r--r--plugins/jobs/dispatcher/dispatcher.go49
-rw-r--r--plugins/jobs/dispatcher/dispatcher_test.go55
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio2
-rw-r--r--plugins/jobs/plugin.go28
-rw-r--r--plugins/jobs/pq_plugin/plugin.go34
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml71
-rw-r--r--tests/plugins/jobs/jobs_plugin_test.go2
8 files changed, 50 insertions, 237 deletions
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
index bb042ec9..87e36ecb 100644
--- a/plugins/jobs/config.go
+++ b/plugins/jobs/config.go
@@ -1,11 +1,8 @@
package jobs
import (
- "github.com/spiral/errors"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/plugins/jobs/dispatcher"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
// Config defines settings for job broker, workers and job-pipeline mapping.
@@ -14,58 +11,17 @@ type Config struct {
// Workers *roadrunner.ServerConfig
poolCfg *poolImpl.Config
- // Dispatch defines where and how to match jobs.
- Dispatch map[string]*structs.Options
-
// Pipelines defines mapping between PHP job pipeline and associated job broker.
Pipelines map[string]*pipeline.Pipeline
// Consuming specifies names of pipelines to be consumed on service start.
Consume []string
-
- // parent config for broken options.
- pipelines pipeline.Pipelines
- route dispatcher.Dispatcher
}
-func (c *Config) InitDefaults() error {
- const op = errors.Op("config_init_defaults")
- var err error
- c.pipelines, err = pipeline.InitPipelines(c.Pipelines)
- if err != nil {
- return errors.E(op, err)
- }
-
+func (c *Config) InitDefaults() {
if c.poolCfg == nil {
c.poolCfg = &poolImpl.Config{}
}
c.poolCfg.InitDefaults()
-
- return nil
-}
-
-// MatchPipeline locates the pipeline associated with the job.
-func (c *Config) MatchPipeline(job *structs.Job) (*pipeline.Pipeline, *structs.Options, error) {
- const op = errors.Op("config_match_pipeline")
- opt := c.route.Match(job)
-
- pipe := ""
- if job.Options != nil {
- pipe = job.Options.Pipeline
- }
-
- if pipe == "" && opt != nil {
- pipe = opt.Pipeline
- }
-
- if pipe == "" {
- return nil, nil, errors.E(op, errors.Errorf("unable to locate pipeline for `%s`", job.Job))
- }
-
- if p := c.pipelines.Get(pipe); p != nil {
- return p, opt, nil
- }
-
- return nil, nil, errors.E(op, errors.Errorf("undefined pipeline `%s`", pipe))
}
diff --git a/plugins/jobs/dispatcher/dispatcher.go b/plugins/jobs/dispatcher/dispatcher.go
deleted file mode 100644
index e73e7b74..00000000
--- a/plugins/jobs/dispatcher/dispatcher.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package dispatcher
-
-import (
- "strings"
-
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
-)
-
-var separators = []string{"/", "-", "\\"}
-
-// Dispatcher provides ability to automatically locate the pipeline for the specific job
-// and update job options (if none set).
-type Dispatcher map[string]*structs.Options
-
-// pre-compile patterns
-func initDispatcher(routes map[string]*structs.Options) Dispatcher {
- dispatcher := make(Dispatcher)
- for pattern, opts := range routes {
- pattern = strings.ToLower(pattern)
- pattern = strings.Trim(pattern, "-.*")
-
- for _, s := range separators {
- pattern = strings.ReplaceAll(pattern, s, ".")
- }
-
- dispatcher[pattern] = opts
- }
-
- return dispatcher
-}
-
-// Match clarifies target job pipeline and other job options. Can return nil.
-func (dispatcher Dispatcher) Match(job *structs.Job) (found *structs.Options) {
- var best = 0
-
- jobName := strings.ToLower(job.Job)
- for pattern, opts := range dispatcher {
- if strings.HasPrefix(jobName, pattern) && len(pattern) > best {
- found = opts
- best = len(pattern)
- }
- }
-
- if best == 0 {
- return nil
- }
-
- return found
-}
diff --git a/plugins/jobs/dispatcher/dispatcher_test.go b/plugins/jobs/dispatcher/dispatcher_test.go
deleted file mode 100644
index e584bda8..00000000
--- a/plugins/jobs/dispatcher/dispatcher_test.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package dispatcher
-
-import (
- "testing"
-
- "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
- "github.com/stretchr/testify/assert"
-)
-
-func Test_Map_All(t *testing.T) {
- m := initDispatcher(map[string]*structs.Options{"default": {Pipeline: "default"}})
- assert.Equal(t, "default", m.Match(&structs.Job{Job: "default"}).Pipeline)
-}
-
-func Test_Map_Miss(t *testing.T) {
- m := initDispatcher(map[string]*structs.Options{"some.*": {Pipeline: "default"}})
-
- assert.Nil(t, m.Match(&structs.Job{Job: "miss"}))
-}
-
-func Test_Map_Best(t *testing.T) {
- m := initDispatcher(map[string]*structs.Options{
- "some.*": {Pipeline: "default"},
- "some.other.*": {Pipeline: "other"},
- })
-
- assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline)
- assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline)
- assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline)
- assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline)
-}
-
-func Test_Map_BestUpper(t *testing.T) {
- m := initDispatcher(map[string]*structs.Options{
- "some.*": {Pipeline: "default"},
- "some.Other.*": {Pipeline: "other"},
- })
-
- assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline)
- assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline)
- assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.OTHER"}).Pipeline)
- assert.Equal(t, "other", m.Match(&structs.Job{Job: "Some.other.job"}).Pipeline)
-}
-
-func Test_Map_BestReversed(t *testing.T) {
- m := initDispatcher(map[string]*structs.Options{
- "some.*": {Pipeline: "default"},
- "some.other.*": {Pipeline: "other"},
- })
-
- assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline)
- assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline)
- assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline)
- assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline)
-}
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
index be941255..4161d7ca 100644
--- a/plugins/jobs/doc/jobs_arch.drawio
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-06-29T17:21:10.029Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="Bl_lIai7C62ty2EayK1b" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1bc9o4FP41zKSdSca2fH3MtU03aWhots2+GSzAjbGobZLQX7+SLINtCXDAMtDA7jQg32R9536OpBY4H71+itzx8BZ5MGhpivfaAhctTVMdw8Z/SMuUtajASlsGke+xtnlDx/8DWaPCWie+B+PCiQlCQeKPi409FIawlxTa3ChCL8XT+igoPnXsDiDX0Om5Ad/6w/eSYdY7/Jkf+Qz9wTDhDo3c7HzWEA9dD73kmsBlC5xHCCXpt9HrOQzIAGZDk153teDorG8RDJMqF3Tasfl4fjt1vyB9qA6+fr18io511rlnN5iwl76/u/uOW77cnXXwn/bNw6frr+wNkmk2MhGahB4kd1Za4Oxl6CewM3Z75OgLJgbcNkxGAf6l4q99FCZX7sgPCB10rvCdblGI8AE38AchbovSETwL3C4M2ij2Ex+R9gD2SfMzjBIfo3JTOpygce7oKbtZFyUJGpHH+kFwjgIU0S6Dfh+avR5uj5MIPcHcEc9yugp5D35I2SiTZ8DXXBMb4k8QjWASTfEp7KgONO3ESK/KyN42WcPLnIg0U0/bhnny0R1GvIxwB7P7z4HFXxi2Ypx/huHrZxVf9/wnCa1P3+B/Su9Y0zmcN8S0OLieC+2+cHDNng27fUYFuXaFfpZRRx1oZJKEQWEbJo+ErQiQyLi2fiS4cYceFjrsJ4qSIRqg0A0u561nRWTm59wgwgEUj18wSaZMgrqTBBXRwiMYTX+S6zFtsp+P+WMXr+zm6a8p+/V2bGI0iXpw2fuzAUjcaACTJSeyG5LRWQp1BAM38Z+LIlsEG7u0jXzc5zmJWEVutVSzeIu0o+yqvHzlblSkNQuUbpQODXej0yhyp7nTxuSEmKOz2RuvT3pgK6T36ic5ysO/HjNSw9/ndEd+TPNEuBvkqlakVr0pai0SmaFUo9a6SCiTwDlzoY3vpBwTU+Gh8xnfNP2DOzQNe/XqGAPani7SMbbWBaa5LR2jlCAxeR2j6gIdo9uSVIyzVT7Pc/mM59/G50rzfK5X1UpKM4wOylSlNczoB2UhkYjMhrSF6ZzYjmkYqoVNYWA5BZICZrMkxbsgt27SGxLNcd2+vLn+elmruoAqVhiWSF04pgXcramLkkuii9SFJlAXAEhSF9Y2OV3N8/mJtTf6wqjI6rVz+kZQ23sn1NUCznPYdw/p2v3VjZA2OGHb0syARNPGBQIwf09IAPJshF/PD1vgFB9Vxq/4XzpoStp+TENt5JieO4YlXnLMYnjkWA+PLYyKhz3YQ5GbBuzIOYSWosAP4fzR+NuA/aUd7GYNX1AXD8tpJ4kmvWQSwewEPCDd8kW4bVxuG0bllrXePY1CkoM2OVjuNu4lveKC3jvyw8HsRWbPb7vTALneyvPuxmSk4tl5s9+ilyxxcjkOmkZHsxArC6UifFY/oGFooi4ZxzDWVbVlHLRY0/YN8p9I05r0w2va9FOPQjX1okI1dYFCNQUK1ZEV4jP/At7LKG/n2a/tj2H6Tit46wLL3enKs06TBI7GyZwFqTG8+CVlvNI9ocJid3O96HLD2lS/vvsjiCbJOp06yCp61LbWlFWqKktYqSqHztbM/8rWv7ZtTz+Do/m48GZgC9K9mKPhCfXEj36h7odaffC+3YPinGvXNnRD4bmtER9ctUvBNVDVB5fGhRoHTASx0g3zRmAJGKLji6OfCVBmDPC58ZHveSkDw9j/43bprchos6QTvq9x1jIuyL0wz8ZM1nIAhiiEkjECilXESLN4jFRREt2ShREfwBpj46OlkQQIepcg6WYRJF2xeZAyIJsBiXd8cxbiMbZI3BGRWcwuwZbPGI8mswaNC59g0ifSzTprWRfvDU5LK8KZxRwLhStak3BuNTi5p2mILL2wOmRl122drJOH0KyGU1u8fz6C+IlktOeiYpFAfwcpCbB9c2i7Kew9ZXu7IttrDeWwl7O92nD6MRueHNt/gqS3Z4QXSeisH6ERGb8hpO8+8GOs0SGJ2nbpKSQo9Oy7TE6cpNcd1ewx9fvagipVs2sauyIiNK2iiDBliQiNd2Xbk3h4NDfJU29WwVyOH5/DuUa09qTsVa2Kli4NLb5wbAEcf7mFbdtFZGzeYTKMBg1sjQ88cJCgSUKMovPZLAtB3Ab/d0UeezaIXM+H82PZoBYZZ3a6EAHPjYczjlxU/T+r818eTJ5RTWlCQjmOnNkW5JFuPE5ftO+/kn6khASjy2eY0hOlnaE7JheMXgdk+suJ+xLrJ5OYPksqCQG1QEK2xTO3ZfMkZMkqN9QAL4k/t3HDj7v7fy7vOXLKBm4coR6M49VCt+v2ngZUTN+llLhAde5msNEwi8LYdvQTfj6IWBzXEAf+Y/a9F2Bf6U+j08dPk9e25Xw+5hFrLkMpSD2mIZmA/zUPx5TiNqVfC3JQszQlFl3xZESNvKPUlvuwpWymcKi49NuycaRncaPYSdzk6GMW6iJ2z9FHbADFJ+RAag/BKEKF964naSjrjag193EevCOv8PFLatMdZenbPXwvRosFsPak6/fMJyoSWgtcsf8xMFi6Blh3ElcpRsEzjBe83QKdEFG9mxf+fBZYpC2kCnDsxxaNNl3h46LAEgnwOgxqoQQ/BEiWBkiEYyao5BOeV3tUdCOkBQkNOsNmIzdWZnLV1IoGj6Nn6dZCVkhSPHEZorkx/H1CtCMxS4YudmuCXR7PkjfvAEcwnrLis/H9RVf59fTrh/X0reP+PP9hfn/IKtz2SPo0Wkd87zzcxMZzp2MH/z3+1GL3h38jmkkoHFs5MwkXzzIVx25VxypFKgzbytMNf4WZacX1rzA1q0Sbab/XDQQvAyInDL6efu9sKADWSvQIKWyRj1qHKHG08oAroqoKSZFBIblvdUrCeuVnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTS0snwBtrFUBWAJtPEVhlqv0hADoewO9ap7TL3LNPLOUa/tLDdgeFp8+xV1mzxiILZa5J2b17nv5Cs8T9sR8jXKxKivIEZl8yss0AT5chZ7O/JR5CdkfL49XD7UO3PcdBWl3xdZ78BSlPNz3kjv04/sQAAvO0TWu6y8vhCZHSrPrChWGg1DLhMXe2a9axwP3l7e3t0/1sp5e+M3W+q2/WZjdzhvuTclgYPAtjioplCarq4IjOllSa834ePwVQGnt9/a75PBdWfbDM7PNpKwMmupnNFWFAuI0Dk1FEUXVNA0Y/gAu4rhI5o3Kw0dwYS969EYi1SCRm8IR5BD5u+ubFRVUExTqUDls+TSJoOJQeIlGqtPIG8boWffo1Xd7wsoUymtTzyrUMsDlc0BqRso4TLUvIOnnuDfc7Bceg8m+XLILar88vznclM8dsOsTaN3T296HfqJjwH+Q2f9kAm3EA8mGTCfMjSdAJA9knQlJA0xpH9YYWiLlZmnUwfa33I1L/nH5poFHVzRZ0D63KbLL/izLnvF4cj3gsxsJJAmZIW1JziNU+DoYZ/0OHRHMKZ3m13TQ2HfH+AvR+7o9zgtr+pCN4wTN3hi1VZJ78NsFChg5B7HFK1ZkV2+bu8tA1GZEUsrgzfNhiUteEU/NbEnF4V0FIdjT8cRyFFdl8SevK4rmiKdy/t/L/dS8UkEkluYSAVAIGct0KSc3XIcePN4zTxC88Z4zZuRznuhwsEUeKHC8xiYzcdxlvU6x8uMezfyI1bt8DBj9Ppq9utg0SxdM3f7BFs1iEurZLHoDpVW7T2HCtbjXcYTO8KhvOP/HrMdPG8CW6vGm3U4/cskeR6ZO0F8bIhG3UmFeU381N9+TxEB0e+vmqfEy1mJUtNRS8gYBi81RbuqSAOGr+/Fo/J2OVrd0ZhNPsRu4CnZaYrccwzDtIXJWXs9LBfJwXUz4+vLT6tm+ckIKHP3N10iX1WM8lZLRsW1RN4awweKrfDPWl6pJLpGt/USsW8WxxfC4XDcMIV8lKtOdkin3G6DF95ofMjnhcobRNTLDJpjmGViM61SQKC+FTbEr36wXeuzXUUrTIpPNHbKejW2mphecz3RvScCfTdS3UC1OZVsKQWVV+kak0XFFl6ja/qKa+SoVsECqv+e3mzskVU265dsfbVgs6xGwyemXbEgDcgKVQvysm1EEiHYImFroRMmIEurkrzFCMVpLnDmYKcXHkLYFGCT38zU0PggNpC1DqsYYj6E+cP1kwN2BeyA4qhVsLMbhY6Pbf2bYqb4aSrSg9hz8GhylWQb+Qyv3yf9J1eEkKY/T8//yVp6JMOouB5ppgSxIkEsWFlh8dYI+fxlLs066/8h5bl+QMkEnDbPloDMLz2rCpY1kpb0NLSDN1OfIStYe0FsyG5tw6yl/c7rGrqqFa3Au7u7acrwE8SKbWFMBZjAAZ58w48z8x21oulnyMKKD89f/rw8f/hOShOOfp94iKzVWSh3mURRi9b5BPwqnu/TbMCymIsjWTq/SKG0VQrF0PIB/osIMz+teerA6Jmgeag+KSwBqpWrT4wZz+ZZ1BDoVHk47tAc8IbnC9WvU7WqwSEgJzjEr/esG1xUZra1SkMxaMGywKmkaM3K1LajsLda6KIDh0NGF+091Wipy4zS91Ya7JCFrWkVpUH9q79vRgSCgB1X7sKW+wy78VjkNP+F5S8ijgWmYEJFowUwGp9aPoTeeOz00jZ/qiFYMLLRwBvgY6ZtN47TzQ1ohqeVm8xwQJGgaGilaZvAEq7cbKhNArlDOfbl8373QGeCygWiu5VjB3wY/YYszEsjG9l2Fx41e4/JUvZ+3+/hlt8TKJDRkuxfzydr+aZL9b/AOOE17wabZ0hSsIYumKAha1cTcfjzEHQus/eixPbiVPgGAqFqmBo4uyUQBIsE31/f3V9/f5wb0YrEdYMXmcir3OIK1rckXreUijM9dFMWaHy8+h72ICYWj4lwZlSnszFPLuCzn85MFWdGEYlxfLkjy7kfjeAIRdMPbG9ScsANyYD0UEiGjRpkktOjmNfoLWa7x6ev8+GQH327P8EvGifIjooXoJdFvHxEnmzY0MpSKxnpsvnHuQKbXO58vGpey7v1PEr+o5XdJF90Yzfqd/CeP537Utw0j8x8l2peMiYvaZGu6nl94cwZVbGAA3m4ZFuSjmqQTeqrKRj7RFZKVF+80Q8ZEKoQsjGZC/ZMfXOy/ku6ZP6LT5/cpYjftduXF61CWpVNDh8HE7JJyVyup498s2DfNVavlXZMVh07pxzAs7rmaA2yur7dJF0+wFC1gruutUIlTEGtGmKov4x/vQpuzeRLOwwzT1SVrrHZTvBSq7F1Pg5Sfd+kdPsjZf7vNjYmY+a65D3JxLs7pSJMWTomaUt5ZE7jp6M1dq2quxtf3d5O9KPtTgPkerS6KNU43WkCxf0qCVZp+16tcrx7PWgscLwbKCADwOAqPi2FrzJSdZHik1ZBpvM7w9PN5XijdgtTvBfBwSFdAz78FG/bEhq0jc7y1vmAyd88zfuNlUcb2ChmRRul8iyzuid6G+WpDGQp8uJtJNcV6Xy842+eVG0dSC8lPUXlSA8o1Ujvraa35hicGY2blpregms0RbWXXgOw07niGjnmusHX5qX2b86a3tuoQL1zt2zeoVIF5XqyVlMVw8ev0pk5AqXyrLLbk87LovGiEB3T6Zh9mpSIESnYZqnqvVqIVSr6hsOhbwhK8DVBCT5YIuI2Q5/3tYv+1wHu+uB2bL5erGG4BfH+ebaGVZIobjDgDfDtw1gDKtizKS9yNZ8Ys2pN+DVEMP4ZIRJVmCtc7EwOb5EHyRn/Aw==</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-07-01T08:15:59.084Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="KkJXQeRy8c5tLqO1fy9Q" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc5s6E/41nkk7kwwgrh9za5u+SePGTdOcbxhkmwYjF3AS99e/khAYkGxjG7DdxOdMY4ub0LO7Wu1NHXA+fv0c2pPRDXKh31Ek97UDLjqKIlu6hP+QlhlrkXWQtAxDz2Vt84ae9xeyRnbhcOq5MCqcGCPkx96k2OigIIBOXGizwxC9FE8bIL/41Ik9hFxDz7F9vvXBc+NR2jv8mR/5Ar3hKOYOje30fNYQjWwXveSawGUHnIcIxcm38es59MkApkOTXPdpwdGsbyEM4ioX9LqR/nh+M7O/InUkD799u3wKj1XWuWfbn7KXvru9/YFbvt6e9fCf7vX956tv7A3iWToyIZoGLiR3ljrg7GXkxbA3sR1y9AUTA24bxWMf/5Lx1wEK4k/22PMJHfQ+4TvdoADhA7bvDQPcFiYjeObbfeh3UeTFHiLtPhyQ5mcYxh5G5bp0OEaT3NFTdrM+imM0Jo/1fP8c+SikXQaDAdQdB7dHcYieYO6Ia1h9ibwHP6RslMkz4GuuiQ3xZ4jGMA5n+BR2VJUM5URLrkrJ3tRZw8uciBRdTdpGefJRLUa8jHCH2f3nwOIvDFsxzr+C4PWLjK97/hsHxufv8D/JOVZUDuctMS0OrmtDcyAcXN0xYX/AqCDXLtHPMuqoA41UkjAoTE3nkTAlARIp19aPBDfu0MVCh/1EYTxCQxTY/uW89ayIzPyca0Q4gOLxG8bxjElQexqjIlp4BMPZL3I9pk328zF/7OKV3Tz5NWO/1scmQtPQgcvenw1AbIdDGC85kd2QjM5SqEPo27H3XBTZItjYpV3k4T7PScQocqsh68VbJB1lV+XlK3ejIq0ZoHSjZGi4G52GoT3LnTYhJ0QcnWVvvDnpgZ2Q3qsX5ygP/3pMSQ1/n9Md+THLE+F+kKtckVrVtqi1SGSaVI1a6yKhVALn1IUuvpN0TFSF+94XfNPkD+7QLHDqnWM0aLqqaI4xlT7Q9V3NMVIJEp2fY2RVMMeoZkNTjLVTPs9zecbz6/G51D6fq1VnJakdRgdlqlJaZvT3yaJBItJbmi1068S0dE2TDawKA8MqkBTQ2yUpfglyY8fOiMwcV93L66tvl7VOF1DGE4Yhmi4s3QD2zqaL0pJEFU0XimC6AKCh6cLYJafLeT4/MQ5mvtAqsnoznL54zSAWBCYoL3LUPNmsPF+XpaXnW5q07Hz8JelxrfLEPLgZSi4Q7ZyG949sa198byWhNG7m6Ci6T0yDkwIB6H+mxJp6Nsav5wUdcIqPSpNX/C8dNClpP6Z2Q3JMzR3D4js+ZgZJcszBYwvD4mEXOii0E+sjOYfQUuh7AZw/Gn8bsr+0g/204Svq42E57cXh1ImnIUxPwAPSL1+E2ybltlFYbtno3ROTKjlokoPlbuNe0isu6L1DLxhmL5I9v2vPfGS7K8+7nZCRirLzst+ilyxxctmom5h6U3sxswsjfNbApzZ1MvczjmGsKyvLOGix2jDQyH8itUGnH15tSD71aAe6WhKkqkA70AXagdWUvVL/B3gvpby9Z7+uN4HJO63grQssd2crzzqNYziexHMWpJP24pds4pXuCBUWu5vrRZ8b1rb69cMbQzSNN+nUu6xKlERjQ1kly00JK1nm0NnZWqbyUkbZtdkihaN9I/d2YAt815ij4Qk1Kxz9Rv0PtRoUBqYDxQ7kvqmpmsRzWysGBbnsEgACNgQig4LaFDAKB0wI8aQb5JXAEjBkji+OfipAmTLAO/rHnusmDAwj76/dp7cio81Ww/i+2llHuyD3wjwbMVnLARigADaMEZCMIkaKwWMkiyICjKYw4q1xE6x8dBTizUFvEiTVUAogqZLJg5QC2Q5I/MI3pyEeY43EHhOZxfQSrPlM8GgybVC78AgmAyLdjLOOcfHW4UwNqIUoHKVNOHdqaT1Qn0rqK1ltsjLr1k42caooRst+On59Pob4iWS056JikUB/A/4VIFKHhP6VxtSh3frjD5TtzYpsr7TkkF/O9nLLvtR0eHJs/xmS3p4RXiSms0GIxmT8RpC++9CL8IwOidW2T08hRqFnz2Zy4iS57qjmFdNgoCwIudX7urYvIkJRKooIvSm7hcIvZbvTaHQ0V8mT1ayEuRw/PodzjWgdSAyvXBUttTG0+Ci4BXD84xq2aRaRMfkFk6a1qGArvOGBgwRNY6IUnWcpIwK7Df7vE3ns2TC0XQ/Oj6WDWmSc7HQhAq4djTKOXJTKkCUtLDcmZ1RTyq4o25FT3YI80o4myYsOvFfSj4SQYHj5DBN6orQzsifkgvHrkOTynNgvkXoyjeizGiUhIBdIyDR45jZMnoSMpmInFcBL4i9d3PBwe/e/yzuOnNKBm4TIgVG0Wuj2bedpSMX0bUKJC6bO/TQ2anpRGJuWesInt4jFcQ124L/6wH0B5if1aXz6+Hn62jWsL8c8Yu15KAWux8Qk4/O/5uaYkt2m9GuBDypzU2LRFU3HVMk7SnS5DzvyZgqHinO/LRtHehY3ir3Yjo8+pqYuovccfcQKUHRCDiT6EAxDVHjvepyGTb0R1eY+zo135BU+fk10uqPUfXuA78VosQDWgXT9jq2JioTWAZ/Y/xgYLF19PHeSpVKE/GcYLXi7BXNCSOfdvPDnvcCi2aJJAa6bJW1alXhXBDBE8rsOfVoowN/tI0vtI8IxEwTyCc+r3Si6FdICfwbNFtpqFdukb1VXiuxiqSn/FJxCInNiU9zCW37+nJDJkWglIxuvavx9Hs/SYt4ClmA8hebZGsYzurvoS7+ffj8YT9979q/zB/3HfRrgdkDSp9Uw4jvr/jrSnns90//v8ZcS2Q/etSgrUji2zWRFrhv9LltGyVChmUaebvgrdAlse4WuGCXa3C4GfhkQOWHw7fRHb0sBsJGfR0hhi5aodYgSSykPuECTyULN6jYMCsl9pxkJm0WftSlKhGO2M1GyFdKHp7K2OmlshTTYj0lDKcsXYGpLpwAlK2+0+RWaXO+kIQZC2h/qlQ+YepfNyHtHvaa1XIHhaXH9K+pWecRA7DTGO5ejeujkKzxP2RPy1crEqK4gRmn7KwzQBvlyGns39FDoxWR8vt9f3tebBa/bkjQYiLR3YEjS+TmvpA/op2lDAC87RNp7U259ITJ7FJ1ZUay0aoZcJi4OTHtXOB68uby5vXuslfMOZt1syLteN2v7w3nLV1MNcBDYFQfVZEpT5RWGMbUs6dU21jh8UMDpzffu22Rw1do1g/PJRg1UmS1FM5qSZAAROqeaJKmCAJp2FB9gVlF8RGmzjaEjyNe7Gk+wSCVoOCM4hhwy/3Zgo5zlS2aWnLQeWhu5YGKQeInGwhPI24bo2XNpUPfbAkqXSrWWswC1Qvar3gxQwpLa/AJPPsG/52DZ9B5M8uWQWxT45XrP5aZoYgdpm0Lvntz0KvBiDwP8lyb9kHxbiAeTDJhHGZrG/6ePJF0JSEME6R8WF9phUeZJ5kD3ey7kJf/YXLOggyv6DEifu7T6gpd12S0OR74XJLGRQBqTanFPcBYlwNHDHulxYI9hRO+WXeOgYOAN8Zcje/xnkkRX9aEdRLHtP7Fgq9j5kI0CBYzc45iilcXY5cP21hmIyoxYqnLeNhuWZsFP9FMTe3JWSEuyOPa0LIEcVWvIxhKyJz/XFVWR3uXdz8uDnPgaBJKrSyQDIJCzBmhTzu7YDry9vWZuoVnTXrM20vlVqHAwBatQ4XkMzPbtOMt6neNlxr1brSNW7VaRMXp9Ift1sGjqrpkv+wTbTohDq5pi0T0KrTp4DhXUFl7GE3vCofzC/y16O3jeBKZSjTfrWPQvk+R5ZG4F9rERGvenFdKa+MzfgSOJgBgMVqUp8XK2QalpySVkNI2XmqIdYhoDho/vxaOyvhytvtDIcg/xMvCU7JpF7jmBQdLC5Ky5GZaL5OCmnvHN5adRs/xkBJQu97ct9y9LWnnbKK1iKZF1bfhAMiX+WcsjlUTXqGahiPTWdnwhHBbHDTPIW7nqZIck43YXvLCm8tE8L1Te7KJeZlAsTS8Tm26UDAL1FdgQv/q77lqf7ioqMCk+Udsr7VXbqWN6w3KiB08E6n64uoFsclOyIS3fN0F0jc6sYguvURV1xTXNTK2C+qk/T6+3XpFVVuuXbOO1YOOvVs0nulkxIK2OwmFigHhbdRcRRwjWSFgpdMIEpLIq8VuMUZT4ArMFdnLhuwmbAqwr3MasmsIbsUFTZVjFEPMmzAfbi9+xK2AHJEuugp3ZKnS8betngpnkJa5IF+KVg0udq8TbyHt4vQHpP7kigNT9eXr+v7TFIR5GyXZJMyWIFQ5iQWGFxTsj5P2XOTdr1v93l+fmBiVuHknrP+brzsqCmkaNuTw15X0tU58aK6i8IFZja9/6aztxxddeSEpa0fi729vrttQ+gaXYFFpUgA4s4Dav9nFKviVXVPy0prDijfOXvy7P73+QwISjPycuIoU6C8Eu0zDs0Cgfny/h+TaVBsXSOSuSofIVChsrUSiGljfvX4SY+WnEUw+GzwTN99iTvPJX3mdGBlrGs3kW1QRzanM47lEGeMvZQvXPqUpV0xBoxjTEF3tWJM4mk+2r0pIFWlATOJEUnSxIbTcT9k7DXFRZ5ZBRRRtPtRroklH6wUqDPdKwFaWiNKi/9Pt2RCAw13HBLqzWZ9CPJqIl8z8Y/CLiWKAL0ilaDX9ReMfyu+GNx04p1b+TNYlHrlWzG+Atpl07ipKdDah/p5NLZXhHkaJYTtoEhrBssya3CeQeediXZ/0ewJwJKoeH7peHHfBG9GtSlZdaNtK9Llyq9h6TOvbewHNwy58pFMjohvRf1yOFfJM6/S8wivmZd4udMxqaYDVVkJ7R1JYmYvPnu9G5zN6L3NqLHeFbCISqZmpg7ZdAEJQIvru6vbv68ThXoqUGqwYvUpFXLYsraN8N8bohVczzUPWmQOPt1XfQgZhYXCbCmVKd5GKeXMBnL8lLFftFEbFxfL0ltdyPxnCMwtkHtjEpOWAHZEAcFJBhowpZw85RzGv0FtnW8cnrfHj3jq6vicrl6gfpQixvkReXn2+KeHmLPNmtoZO6VlLSZdnHufCanOd8siqr5c2uPErrRyO9ST7kxmx13cGv/GnmS3HHPJL33qh6yZi8NIv0ZdcdCPNmZMkAFuThalqTtGSN7FBfbYIxT5pyiaqLd/khA0InhHRM5oI9nb45Wf81KZj/4tEn9ynit93u5UWn4FZlqeETf0p2KJnL9eSRawv2fWP1WmkHaGqJcgDP6oqltMjq6m6ddHkDQ9X47boqhTaQgFrVxFB/EP9m8duKzod2aHqeqCpdY7Jt4BuNxVZ5O0j1TZOSvY+k+b+72JXsKoZk9mx6RzLx3k6JDJOWDkrSUh6aqwsaSpNuwbXu3lV1d+c0ejraYAuturvxzXb2oh9de+Yj26UQJTNgfxZDcb9Kgr6xTbhWGQIcB2oLDAEtBLQBoAHeQMBHPcmqaCJOI0Xrn4n5berpTne8kr2DhPNFcHBI14APn3BuGkIFu9Wcc5U34PzLSedrRkJtoTPpFXWmyjlvdaeda+XECiVz+bYU56Ty9pd/OcXbeCe9hPQkmSM9IFUjvXWXAoqlcWo9blq6FBBco0iyufQaoOrlKgnla5pZPmh8rGBiPs9p9wdrpag3k8zkF3iyIHywqdquYvj4mqHpQqAULlZehiVZYtR+FaBjmhw6oE6SCJEAcuY6P6iysI2ir1kc+pogJUARpASAJSJuO/T5tX9x/fUOd31wWyYfv9Yy3AL/w9x7xCJbJNsf8gr47mGsARW8simX3Jon6qyqUF+HCH6NXn5Mfz7/AdfW/eNtVzK+PowXVW0+jWM4nrCqwEWLmHRjv2aHy4xKizLnYD1O5uI1EG3HCVwDnrKkFX18uiGYT0UF7evY0lcIpsKB+YDCJ5o54SSFnQmiR6zIM/6KHztCLrEysZ2UC75AO4qQ49mserY32UdlqgYgNbXom1cNRcCWQGSYKC8Za0NysddvUVyH2ARZJdaDMvwdHJL4wzA/mRLMPeh7AYw664V8NNXVpOZ7Rr4JTe5Fz2hl915MVRMn6+CRi6jKQtZKLvRhvHbwTFP9VamgpuOX6+88nGMvOqmlnSQ0WOgeHlk4sKc+GVgyX5IgqWmUkC8c4LPdfRlo/WTRaqbu8KkDErmkXlRRFwKmLBC6iiYQutb6Qhf/DBGBam58CO3J6AZhpsSN/wc=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index ab7222ae..690402d6 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -10,6 +10,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pool"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -35,13 +36,16 @@ type Plugin struct {
// priority queue implementation
queue priorityqueue.Queue
+
+ // parent config for broken options.
+ pipelines pipeline.Pipelines
}
func testListener(data interface{}) {
fmt.Println(data)
}
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, pq priorityqueue.Queue) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
const op = errors.Op("jobs_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -52,10 +56,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
- err = p.cfg.InitDefaults()
- if err != nil {
- return errors.E(op, err)
- }
+ p.cfg.InitDefaults()
p.server = server
p.events = events.NewEventsHandler()
@@ -63,8 +64,14 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.brokers = make(map[string]Broker)
p.consumers = make(map[string]Consumer)
+ // initial set of pipelines
+ p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
// initialize priority queue
- p.queue = pq
+ p.queue = priorityqueue.NewPriorityQueue()
p.log = log
return nil
@@ -132,14 +139,7 @@ func (p *Plugin) Name() string {
}
func (p *Plugin) Push(j *structs.Job) (string, error) {
- pipe, pOpts, err := p.cfg.MatchPipeline(j)
- if err != nil {
- panic(err)
- }
-
- if pOpts != nil {
- j.Options.Merge(pOpts)
- }
+ pipe := p.pipelines.Get(j.Options.Pipeline)
broker, ok := p.consumers[pipe.Broker()]
if !ok {
diff --git a/plugins/jobs/pq_plugin/plugin.go b/plugins/jobs/pq_plugin/plugin.go
deleted file mode 100644
index 7df846ac..00000000
--- a/plugins/jobs/pq_plugin/plugin.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package pq_plugin //nolint:stylecheck
-
-import (
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "internal_pq"
-)
-
-type Plugin struct {
- log logger.Logger
- pq priorityqueue.Queue
-}
-
-func (p *Plugin) Init(log logger.Logger) error {
- p.log = log
- p.pq = priorityqueue.NewPriorityQueue()
- return nil
-}
-
-func (p *Plugin) Push(item interface{}) {
- p.pq.Push(item)
- // no-op
-}
-
-func (p *Plugin) Pop() interface{} {
- return p.pq.Pop()
-}
-
-func (p *Plugin) Name() string {
- return PluginName
-}
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index b21f764c..320f41b1 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -6,52 +6,49 @@ server:
relay: "pipes"
relay_timeout: "20s"
-jobs:
- # worker pool configuration
- pool:
- num_workers: 4
-
- # rabbitmq and similar servers
- amqp:
- addr: amqp://guest:guest@localhost:5672/
+amqp:
+ addr: amqp://guest:guest@localhost:5672/
# beanstalk configuration
- beanstalk:
- addr: tcp://localhost:11300
+beanstalk:
+ addr: tcp://localhost:11300
# amazon sqs configuration
- sqs:
- key: api-key
- secret: api-secret
- region: us-west-1
- endpoint: http://localhost:9324
-
- # job destinations and options
- dispatch:
- spiral-jobs-tests-amqp-*.pipeline: amqp
- spiral-jobs-tests-local-*.pipeline: local
- spiral-jobs-tests-beanstalk-*.pipeline: beanstalk
- spiral-jobs-tests-sqs-*.pipeline: sqs
-
- # list of broker pipelines associated with endpoints
- pipelines:
- local:
- broker: ephemeral
+sqs:
+ key: api-key
+ secret: api-secret
+ region: us-west-1
+ endpoint: http://localhost:9324
- amqp:
- broker: amqp
- queue: default
- beanstalk:
- broker: beanstalk
- tube: default
+jobs:
+ # worker pool configuration
+ pool:
+ num_workers: 4
- sqs:
- broker: sqs
- queue: default
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ test-local:
+ driver: ephemeral
+ priority: 10
+
+ test-1:
+ driver: amqp
+ priority: 1
+ queue: default
+
+ test-2:
+ driver: beanstalk
+ priority: 11
+ tube: default
+
+ test-3:
+ # priority: 11 - not defined, 10 by default
+ driver: sqs
+ queue: default
declare:
MessageRetentionPeriod: 86400
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
- consume: ["local", "amqp", "beanstalk", "sqs"]
+ consume: [ "test-local", "test-1" ]
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
index 2c58c344..e8b4e83d 100644
--- a/tests/plugins/jobs/jobs_plugin_test.go
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -12,7 +12,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs"
"github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pq_plugin"
"github.com/spiral/roadrunner/v2/plugins/logger"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -35,7 +34,6 @@ func TestJobsInit(t *testing.T) {
&logger.ZapLogger{},
&jobs.Plugin{},
&ephemeral.Plugin{},
- &pq_plugin.Plugin{},
)
assert.NoError(t, err)