diff options
author | Valery Piashchynski <[email protected]> | 2021-07-01 21:28:23 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-01 21:28:23 +0300 |
commit | a4b9d0c47494431c30f357c3616e53baf411360f (patch) | |
tree | 281f19cb031ff66f6360e11ec610dbc0a2fd7920 | |
parent | cd64c6c44ad463963705a22af2bd49059987949c (diff) |
- Remove Dispatcher
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | plugins/jobs/config.go | 46 | ||||
-rw-r--r-- | plugins/jobs/dispatcher/dispatcher.go | 49 | ||||
-rw-r--r-- | plugins/jobs/dispatcher/dispatcher_test.go | 55 | ||||
-rw-r--r-- | plugins/jobs/doc/jobs_arch.drawio | 2 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 28 | ||||
-rw-r--r-- | plugins/jobs/pq_plugin/plugin.go | 34 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-init.yaml | 71 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_plugin_test.go | 2 |
8 files changed, 50 insertions, 237 deletions
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index bb042ec9..87e36ecb 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -1,11 +1,8 @@ package jobs import ( - "github.com/spiral/errors" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/plugins/jobs/dispatcher" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) // Config defines settings for job broker, workers and job-pipeline mapping. @@ -14,58 +11,17 @@ type Config struct { // Workers *roadrunner.ServerConfig poolCfg *poolImpl.Config - // Dispatch defines where and how to match jobs. - Dispatch map[string]*structs.Options - // Pipelines defines mapping between PHP job pipeline and associated job broker. Pipelines map[string]*pipeline.Pipeline // Consuming specifies names of pipelines to be consumed on service start. Consume []string - - // parent config for broken options. - pipelines pipeline.Pipelines - route dispatcher.Dispatcher } -func (c *Config) InitDefaults() error { - const op = errors.Op("config_init_defaults") - var err error - c.pipelines, err = pipeline.InitPipelines(c.Pipelines) - if err != nil { - return errors.E(op, err) - } - +func (c *Config) InitDefaults() { if c.poolCfg == nil { c.poolCfg = &poolImpl.Config{} } c.poolCfg.InitDefaults() - - return nil -} - -// MatchPipeline locates the pipeline associated with the job. -func (c *Config) MatchPipeline(job *structs.Job) (*pipeline.Pipeline, *structs.Options, error) { - const op = errors.Op("config_match_pipeline") - opt := c.route.Match(job) - - pipe := "" - if job.Options != nil { - pipe = job.Options.Pipeline - } - - if pipe == "" && opt != nil { - pipe = opt.Pipeline - } - - if pipe == "" { - return nil, nil, errors.E(op, errors.Errorf("unable to locate pipeline for `%s`", job.Job)) - } - - if p := c.pipelines.Get(pipe); p != nil { - return p, opt, nil - } - - return nil, nil, errors.E(op, errors.Errorf("undefined pipeline `%s`", pipe)) } diff --git a/plugins/jobs/dispatcher/dispatcher.go b/plugins/jobs/dispatcher/dispatcher.go deleted file mode 100644 index e73e7b74..00000000 --- a/plugins/jobs/dispatcher/dispatcher.go +++ /dev/null @@ -1,49 +0,0 @@ -package dispatcher - -import ( - "strings" - - "github.com/spiral/roadrunner/v2/plugins/jobs/structs" -) - -var separators = []string{"/", "-", "\\"} - -// Dispatcher provides ability to automatically locate the pipeline for the specific job -// and update job options (if none set). -type Dispatcher map[string]*structs.Options - -// pre-compile patterns -func initDispatcher(routes map[string]*structs.Options) Dispatcher { - dispatcher := make(Dispatcher) - for pattern, opts := range routes { - pattern = strings.ToLower(pattern) - pattern = strings.Trim(pattern, "-.*") - - for _, s := range separators { - pattern = strings.ReplaceAll(pattern, s, ".") - } - - dispatcher[pattern] = opts - } - - return dispatcher -} - -// Match clarifies target job pipeline and other job options. Can return nil. -func (dispatcher Dispatcher) Match(job *structs.Job) (found *structs.Options) { - var best = 0 - - jobName := strings.ToLower(job.Job) - for pattern, opts := range dispatcher { - if strings.HasPrefix(jobName, pattern) && len(pattern) > best { - found = opts - best = len(pattern) - } - } - - if best == 0 { - return nil - } - - return found -} diff --git a/plugins/jobs/dispatcher/dispatcher_test.go b/plugins/jobs/dispatcher/dispatcher_test.go deleted file mode 100644 index e584bda8..00000000 --- a/plugins/jobs/dispatcher/dispatcher_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package dispatcher - -import ( - "testing" - - "github.com/spiral/roadrunner/v2/plugins/jobs/structs" - "github.com/stretchr/testify/assert" -) - -func Test_Map_All(t *testing.T) { - m := initDispatcher(map[string]*structs.Options{"default": {Pipeline: "default"}}) - assert.Equal(t, "default", m.Match(&structs.Job{Job: "default"}).Pipeline) -} - -func Test_Map_Miss(t *testing.T) { - m := initDispatcher(map[string]*structs.Options{"some.*": {Pipeline: "default"}}) - - assert.Nil(t, m.Match(&structs.Job{Job: "miss"})) -} - -func Test_Map_Best(t *testing.T) { - m := initDispatcher(map[string]*structs.Options{ - "some.*": {Pipeline: "default"}, - "some.other.*": {Pipeline: "other"}, - }) - - assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) - assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) - assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline) - assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline) -} - -func Test_Map_BestUpper(t *testing.T) { - m := initDispatcher(map[string]*structs.Options{ - "some.*": {Pipeline: "default"}, - "some.Other.*": {Pipeline: "other"}, - }) - - assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) - assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) - assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.OTHER"}).Pipeline) - assert.Equal(t, "other", m.Match(&structs.Job{Job: "Some.other.job"}).Pipeline) -} - -func Test_Map_BestReversed(t *testing.T) { - m := initDispatcher(map[string]*structs.Options{ - "some.*": {Pipeline: "default"}, - "some.other.*": {Pipeline: "other"}, - }) - - assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline) - assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline) - assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) - assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) -} diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio index be941255..4161d7ca 100644 --- a/plugins/jobs/doc/jobs_arch.drawio +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-06-29T17:21:10.029Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="Bl_lIai7C62ty2EayK1b" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1bc9o4FP41zKSdSca2fH3MtU03aWhots2+GSzAjbGobZLQX7+SLINtCXDAMtDA7jQg32R9536OpBY4H71+itzx8BZ5MGhpivfaAhctTVMdw8Z/SMuUtajASlsGke+xtnlDx/8DWaPCWie+B+PCiQlCQeKPi409FIawlxTa3ChCL8XT+igoPnXsDiDX0Om5Ad/6w/eSYdY7/Jkf+Qz9wTDhDo3c7HzWEA9dD73kmsBlC5xHCCXpt9HrOQzIAGZDk153teDorG8RDJMqF3Tasfl4fjt1vyB9qA6+fr18io511rlnN5iwl76/u/uOW77cnXXwn/bNw6frr+wNkmk2MhGahB4kd1Za4Oxl6CewM3Z75OgLJgbcNkxGAf6l4q99FCZX7sgPCB10rvCdblGI8AE38AchbovSETwL3C4M2ij2Ex+R9gD2SfMzjBIfo3JTOpygce7oKbtZFyUJGpHH+kFwjgIU0S6Dfh+avR5uj5MIPcHcEc9yugp5D35I2SiTZ8DXXBMb4k8QjWASTfEp7KgONO3ESK/KyN42WcPLnIg0U0/bhnny0R1GvIxwB7P7z4HFXxi2Ypx/huHrZxVf9/wnCa1P3+B/Su9Y0zmcN8S0OLieC+2+cHDNng27fUYFuXaFfpZRRx1oZJKEQWEbJo+ErQiQyLi2fiS4cYceFjrsJ4qSIRqg0A0u561nRWTm59wgwgEUj18wSaZMgrqTBBXRwiMYTX+S6zFtsp+P+WMXr+zm6a8p+/V2bGI0iXpw2fuzAUjcaACTJSeyG5LRWQp1BAM38Z+LIlsEG7u0jXzc5zmJWEVutVSzeIu0o+yqvHzlblSkNQuUbpQODXej0yhyp7nTxuSEmKOz2RuvT3pgK6T36ic5ysO/HjNSw9/ndEd+TPNEuBvkqlakVr0pai0SmaFUo9a6SCiTwDlzoY3vpBwTU+Gh8xnfNP2DOzQNe/XqGAPani7SMbbWBaa5LR2jlCAxeR2j6gIdo9uSVIyzVT7Pc/mM59/G50rzfK5X1UpKM4wOylSlNczoB2UhkYjMhrSF6ZzYjmkYqoVNYWA5BZICZrMkxbsgt27SGxLNcd2+vLn+elmruoAqVhiWSF04pgXcramLkkuii9SFJlAXAEhSF9Y2OV3N8/mJtTf6wqjI6rVz+kZQ23sn1NUCznPYdw/p2v3VjZA2OGHb0syARNPGBQIwf09IAPJshF/PD1vgFB9Vxq/4XzpoStp+TENt5JieO4YlXnLMYnjkWA+PLYyKhz3YQ5GbBuzIOYSWosAP4fzR+NuA/aUd7GYNX1AXD8tpJ4kmvWQSwewEPCDd8kW4bVxuG0bllrXePY1CkoM2OVjuNu4lveKC3jvyw8HsRWbPb7vTALneyvPuxmSk4tl5s9+ilyxxcjkOmkZHsxArC6UifFY/oGFooi4ZxzDWVbVlHLRY0/YN8p9I05r0w2va9FOPQjX1okI1dYFCNQUK1ZEV4jP/At7LKG/n2a/tj2H6Tit46wLL3enKs06TBI7GyZwFqTG8+CVlvNI9ocJid3O96HLD2lS/vvsjiCbJOp06yCp61LbWlFWqKktYqSqHztbM/8rWv7ZtTz+Do/m48GZgC9K9mKPhCfXEj36h7odaffC+3YPinGvXNnRD4bmtER9ctUvBNVDVB5fGhRoHTASx0g3zRmAJGKLji6OfCVBmDPC58ZHveSkDw9j/43bprchos6QTvq9x1jIuyL0wz8ZM1nIAhiiEkjECilXESLN4jFRREt2ShREfwBpj46OlkQQIepcg6WYRJF2xeZAyIJsBiXd8cxbiMbZI3BGRWcwuwZbPGI8mswaNC59g0ifSzTprWRfvDU5LK8KZxRwLhStak3BuNTi5p2mILL2wOmRl122drJOH0KyGU1u8fz6C+IlktOeiYpFAfwcpCbB9c2i7Kew9ZXu7IttrDeWwl7O92nD6MRueHNt/gqS3Z4QXSeisH6ERGb8hpO8+8GOs0SGJ2nbpKSQo9Oy7TE6cpNcd1ewx9fvagipVs2sauyIiNK2iiDBliQiNd2Xbk3h4NDfJU29WwVyOH5/DuUa09qTsVa2Kli4NLb5wbAEcf7mFbdtFZGzeYTKMBg1sjQ88cJCgSUKMovPZLAtB3Ab/d0UeezaIXM+H82PZoBYZZ3a6EAHPjYczjlxU/T+r818eTJ5RTWlCQjmOnNkW5JFuPE5ftO+/kn6khASjy2eY0hOlnaE7JheMXgdk+suJ+xLrJ5OYPksqCQG1QEK2xTO3ZfMkZMkqN9QAL4k/t3HDj7v7fy7vOXLKBm4coR6M49VCt+v2ngZUTN+llLhAde5msNEwi8LYdvQTfj6IWBzXEAf+Y/a9F2Bf6U+j08dPk9e25Xw+5hFrLkMpSD2mIZmA/zUPx5TiNqVfC3JQszQlFl3xZESNvKPUlvuwpWymcKi49NuycaRncaPYSdzk6GMW6iJ2z9FHbADFJ+RAag/BKEKF964naSjrjag193EevCOv8PFLatMdZenbPXwvRosFsPak6/fMJyoSWgtcsf8xMFi6Blh3ElcpRsEzjBe83QKdEFG9mxf+fBZYpC2kCnDsxxaNNl3h46LAEgnwOgxqoQQ/BEiWBkiEYyao5BOeV3tUdCOkBQkNOsNmIzdWZnLV1IoGj6Nn6dZCVkhSPHEZorkx/H1CtCMxS4YudmuCXR7PkjfvAEcwnrLis/H9RVf59fTrh/X0reP+PP9hfn/IKtz2SPo0Wkd87zzcxMZzp2MH/z3+1GL3h38jmkkoHFs5MwkXzzIVx25VxypFKgzbytMNf4WZacX1rzA1q0Sbab/XDQQvAyInDL6efu9sKADWSvQIKWyRj1qHKHG08oAroqoKSZFBIblvdUrCeuVnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTS0snwBtrFUBWAJtPEVhlqv0hADoewO9ap7TL3LNPLOUa/tLDdgeFp8+xV1mzxiILZa5J2b17nv5Cs8T9sR8jXKxKivIEZl8yss0AT5chZ7O/JR5CdkfL49XD7UO3PcdBWl3xdZ78BSlPNz3kjv04/sQAAvO0TWu6y8vhCZHSrPrChWGg1DLhMXe2a9axwP3l7e3t0/1sp5e+M3W+q2/WZjdzhvuTclgYPAtjioplCarq4IjOllSa834ePwVQGnt9/a75PBdWfbDM7PNpKwMmupnNFWFAuI0Dk1FEUXVNA0Y/gAu4rhI5o3Kw0dwYS969EYi1SCRm8IR5BD5u+ubFRVUExTqUDls+TSJoOJQeIlGqtPIG8boWffo1Xd7wsoUymtTzyrUMsDlc0BqRso4TLUvIOnnuDfc7Bceg8m+XLILar88vznclM8dsOsTaN3T296HfqJjwH+Q2f9kAm3EA8mGTCfMjSdAJA9knQlJA0xpH9YYWiLlZmnUwfa33I1L/nH5poFHVzRZ0D63KbLL/izLnvF4cj3gsxsJJAmZIW1JziNU+DoYZ/0OHRHMKZ3m13TQ2HfH+AvR+7o9zgtr+pCN4wTN3hi1VZJ78NsFChg5B7HFK1ZkV2+bu8tA1GZEUsrgzfNhiUteEU/NbEnF4V0FIdjT8cRyFFdl8SevK4rmiKdy/t/L/dS8UkEkluYSAVAIGct0KSc3XIcePN4zTxC88Z4zZuRznuhwsEUeKHC8xiYzcdxlvU6x8uMezfyI1bt8DBj9Ppq9utg0SxdM3f7BFs1iEurZLHoDpVW7T2HCtbjXcYTO8KhvOP/HrMdPG8CW6vGm3U4/cskeR6ZO0F8bIhG3UmFeU381N9+TxEB0e+vmqfEy1mJUtNRS8gYBi81RbuqSAOGr+/Fo/J2OVrd0ZhNPsRu4CnZaYrccwzDtIXJWXs9LBfJwXUz4+vLT6tm+ckIKHP3N10iX1WM8lZLRsW1RN4awweKrfDPWl6pJLpGt/USsW8WxxfC4XDcMIV8lKtOdkin3G6DF95ofMjnhcobRNTLDJpjmGViM61SQKC+FTbEr36wXeuzXUUrTIpPNHbKejW2mphecz3RvScCfTdS3UC1OZVsKQWVV+kak0XFFl6ja/qKa+SoVsECqv+e3mzskVU265dsfbVgs6xGwyemXbEgDcgKVQvysm1EEiHYImFroRMmIEurkrzFCMVpLnDmYKcXHkLYFGCT38zU0PggNpC1DqsYYj6E+cP1kwN2BeyA4qhVsLMbhY6Pbf2bYqb4aSrSg9hz8GhylWQb+Qyv3yf9J1eEkKY/T8//yVp6JMOouB5ppgSxIkEsWFlh8dYI+fxlLs066/8h5bl+QMkEnDbPloDMLz2rCpY1kpb0NLSDN1OfIStYe0FsyG5tw6yl/c7rGrqqFa3Au7u7acrwE8SKbWFMBZjAAZ58w48z8x21oulnyMKKD89f/rw8f/hOShOOfp94iKzVWSh3mURRi9b5BPwqnu/TbMCymIsjWTq/SKG0VQrF0PIB/osIMz+teerA6Jmgeag+KSwBqpWrT4wZz+ZZ1BDoVHk47tAc8IbnC9WvU7WqwSEgJzjEr/esG1xUZra1SkMxaMGywKmkaM3K1LajsLda6KIDh0NGF+091Wipy4zS91Ya7JCFrWkVpUH9q79vRgSCgB1X7sKW+wy78VjkNP+F5S8ijgWmYEJFowUwGp9aPoTeeOz00jZ/qiFYMLLRwBvgY6ZtN47TzQ1ohqeVm8xwQJGgaGilaZvAEq7cbKhNArlDOfbl8373QGeCygWiu5VjB3wY/YYszEsjG9l2Fx41e4/JUvZ+3+/hlt8TKJDRkuxfzydr+aZL9b/AOOE17wabZ0hSsIYumKAha1cTcfjzEHQus/eixPbiVPgGAqFqmBo4uyUQBIsE31/f3V9/f5wb0YrEdYMXmcir3OIK1rckXreUijM9dFMWaHy8+h72ICYWj4lwZlSnszFPLuCzn85MFWdGEYlxfLkjy7kfjeAIRdMPbG9ScsANyYD0UEiGjRpkktOjmNfoLWa7x6ev8+GQH327P8EvGifIjooXoJdFvHxEnmzY0MpSKxnpsvnHuQKbXO58vGpey7v1PEr+o5XdJF90Yzfqd/CeP537Utw0j8x8l2peMiYvaZGu6nl94cwZVbGAA3m4ZFuSjmqQTeqrKRj7RFZKVF+80Q8ZEKoQsjGZC/ZMfXOy/ku6ZP6LT5/cpYjftduXF61CWpVNDh8HE7JJyVyup498s2DfNVavlXZMVh07pxzAs7rmaA2yur7dJF0+wFC1gruutUIlTEGtGmKov4x/vQpuzeRLOwwzT1SVrrHZTvBSq7F1Pg5Sfd+kdPsjZf7vNjYmY+a65D3JxLs7pSJMWTomaUt5ZE7jp6M1dq2quxtf3d5O9KPtTgPkerS6KNU43WkCxf0qCVZp+16tcrx7PWgscLwbKCADwOAqPi2FrzJSdZHik1ZBpvM7w9PN5XijdgtTvBfBwSFdAz78FG/bEhq0jc7y1vmAyd88zfuNlUcb2ChmRRul8iyzuid6G+WpDGQp8uJtJNcV6Xy842+eVG0dSC8lPUXlSA8o1Ujvraa35hicGY2blpregms0RbWXXgOw07niGjnmusHX5qX2b86a3tuoQL1zt2zeoVIF5XqyVlMVw8ev0pk5AqXyrLLbk87LovGiEB3T6Zh9mpSIESnYZqnqvVqIVSr6hsOhbwhK8DVBCT5YIuI2Q5/3tYv+1wHu+uB2bL5erGG4BfH+ebaGVZIobjDgDfDtw1gDKtizKS9yNZ8Ys2pN+DVEMP4ZIRJVmCtc7EwOb5EHyRn/Aw==</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-07-01T08:15:59.084Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="KkJXQeRy8c5tLqO1fy9Q" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1rc5s6E/41nkk7kwwgrh9za5u+SePGTdOcbxhkmwYjF3AS99e/khAYkGxjG7DdxOdMY4ub0LO7Wu1NHXA+fv0c2pPRDXKh31Ek97UDLjqKIlu6hP+QlhlrkXWQtAxDz2Vt84ae9xeyRnbhcOq5MCqcGCPkx96k2OigIIBOXGizwxC9FE8bIL/41Ik9hFxDz7F9vvXBc+NR2jv8mR/5Ar3hKOYOje30fNYQjWwXveSawGUHnIcIxcm38es59MkApkOTXPdpwdGsbyEM4ioX9LqR/nh+M7O/InUkD799u3wKj1XWuWfbn7KXvru9/YFbvt6e9fCf7vX956tv7A3iWToyIZoGLiR3ljrg7GXkxbA3sR1y9AUTA24bxWMf/5Lx1wEK4k/22PMJHfQ+4TvdoADhA7bvDQPcFiYjeObbfeh3UeTFHiLtPhyQ5mcYxh5G5bp0OEaT3NFTdrM+imM0Jo/1fP8c+SikXQaDAdQdB7dHcYieYO6Ia1h9ibwHP6RslMkz4GuuiQ3xZ4jGMA5n+BR2VJUM5URLrkrJ3tRZw8uciBRdTdpGefJRLUa8jHCH2f3nwOIvDFsxzr+C4PWLjK97/hsHxufv8D/JOVZUDuctMS0OrmtDcyAcXN0xYX/AqCDXLtHPMuqoA41UkjAoTE3nkTAlARIp19aPBDfu0MVCh/1EYTxCQxTY/uW89ayIzPyca0Q4gOLxG8bxjElQexqjIlp4BMPZL3I9pk328zF/7OKV3Tz5NWO/1scmQtPQgcvenw1AbIdDGC85kd2QjM5SqEPo27H3XBTZItjYpV3k4T7PScQocqsh68VbJB1lV+XlK3ejIq0ZoHSjZGi4G52GoT3LnTYhJ0QcnWVvvDnpgZ2Q3qsX5ygP/3pMSQ1/n9Md+THLE+F+kKtckVrVtqi1SGSaVI1a6yKhVALn1IUuvpN0TFSF+94XfNPkD+7QLHDqnWM0aLqqaI4xlT7Q9V3NMVIJEp2fY2RVMMeoZkNTjLVTPs9zecbz6/G51D6fq1VnJakdRgdlqlJaZvT3yaJBItJbmi1068S0dE2TDawKA8MqkBTQ2yUpfglyY8fOiMwcV93L66tvl7VOF1DGE4Yhmi4s3QD2zqaL0pJEFU0XimC6AKCh6cLYJafLeT4/MQ5mvtAqsnoznL54zSAWBCYoL3LUPNmsPF+XpaXnW5q07Hz8JelxrfLEPLgZSi4Q7ZyG949sa198byWhNG7m6Ci6T0yDkwIB6H+mxJp6Nsav5wUdcIqPSpNX/C8dNClpP6Z2Q3JMzR3D4js+ZgZJcszBYwvD4mEXOii0E+sjOYfQUuh7AZw/Gn8bsr+0g/204Svq42E57cXh1ImnIUxPwAPSL1+E2ybltlFYbtno3ROTKjlokoPlbuNe0isu6L1DLxhmL5I9v2vPfGS7K8+7nZCRirLzst+ilyxxctmom5h6U3sxswsjfNbApzZ1MvczjmGsKyvLOGix2jDQyH8itUGnH15tSD71aAe6WhKkqkA70AXagdWUvVL/B3gvpby9Z7+uN4HJO63grQssd2crzzqNYziexHMWpJP24pds4pXuCBUWu5vrRZ8b1rb69cMbQzSNN+nUu6xKlERjQ1kly00JK1nm0NnZWqbyUkbZtdkihaN9I/d2YAt815ij4Qk1Kxz9Rv0PtRoUBqYDxQ7kvqmpmsRzWysGBbnsEgACNgQig4LaFDAKB0wI8aQb5JXAEjBkji+OfipAmTLAO/rHnusmDAwj76/dp7cio81Ww/i+2llHuyD3wjwbMVnLARigADaMEZCMIkaKwWMkiyICjKYw4q1xE6x8dBTizUFvEiTVUAogqZLJg5QC2Q5I/MI3pyEeY43EHhOZxfQSrPlM8GgybVC78AgmAyLdjLOOcfHW4UwNqIUoHKVNOHdqaT1Qn0rqK1ltsjLr1k42caooRst+On59Pob4iWS056JikUB/A/4VIFKHhP6VxtSh3frjD5TtzYpsr7TkkF/O9nLLvtR0eHJs/xmS3p4RXiSms0GIxmT8RpC++9CL8IwOidW2T08hRqFnz2Zy4iS57qjmFdNgoCwIudX7urYvIkJRKooIvSm7hcIvZbvTaHQ0V8mT1ayEuRw/PodzjWgdSAyvXBUttTG0+Ci4BXD84xq2aRaRMfkFk6a1qGArvOGBgwRNY6IUnWcpIwK7Df7vE3ns2TC0XQ/Oj6WDWmSc7HQhAq4djTKOXJTKkCUtLDcmZ1RTyq4o25FT3YI80o4myYsOvFfSj4SQYHj5DBN6orQzsifkgvHrkOTynNgvkXoyjeizGiUhIBdIyDR45jZMnoSMpmInFcBL4i9d3PBwe/e/yzuOnNKBm4TIgVG0Wuj2bedpSMX0bUKJC6bO/TQ2anpRGJuWesInt4jFcQ124L/6wH0B5if1aXz6+Hn62jWsL8c8Yu15KAWux8Qk4/O/5uaYkt2m9GuBDypzU2LRFU3HVMk7SnS5DzvyZgqHinO/LRtHehY3ir3Yjo8+pqYuovccfcQKUHRCDiT6EAxDVHjvepyGTb0R1eY+zo135BU+fk10uqPUfXuA78VosQDWgXT9jq2JioTWAZ/Y/xgYLF19PHeSpVKE/GcYLXi7BXNCSOfdvPDnvcCi2aJJAa6bJW1alXhXBDBE8rsOfVoowN/tI0vtI8IxEwTyCc+r3Si6FdICfwbNFtpqFdukb1VXiuxiqSn/FJxCInNiU9zCW37+nJDJkWglIxuvavx9Hs/SYt4ClmA8hebZGsYzurvoS7+ffj8YT9979q/zB/3HfRrgdkDSp9Uw4jvr/jrSnns90//v8ZcS2Q/etSgrUji2zWRFrhv9LltGyVChmUaebvgrdAlse4WuGCXa3C4GfhkQOWHw7fRHb0sBsJGfR0hhi5aodYgSSykPuECTyULN6jYMCsl9pxkJm0WftSlKhGO2M1GyFdKHp7K2OmlshTTYj0lDKcsXYGpLpwAlK2+0+RWaXO+kIQZC2h/qlQ+YepfNyHtHvaa1XIHhaXH9K+pWecRA7DTGO5ejeujkKzxP2RPy1crEqK4gRmn7KwzQBvlyGns39FDoxWR8vt9f3tebBa/bkjQYiLR3YEjS+TmvpA/op2lDAC87RNp7U259ITJ7FJ1ZUay0aoZcJi4OTHtXOB68uby5vXuslfMOZt1syLteN2v7w3nLV1MNcBDYFQfVZEpT5RWGMbUs6dU21jh8UMDpzffu22Rw1do1g/PJRg1UmS1FM5qSZAAROqeaJKmCAJp2FB9gVlF8RGmzjaEjyNe7Gk+wSCVoOCM4hhwy/3Zgo5zlS2aWnLQeWhu5YGKQeInGwhPI24bo2XNpUPfbAkqXSrWWswC1Qvar3gxQwpLa/AJPPsG/52DZ9B5M8uWQWxT45XrP5aZoYgdpm0Lvntz0KvBiDwP8lyb9kHxbiAeTDJhHGZrG/6ePJF0JSEME6R8WF9phUeZJ5kD3ey7kJf/YXLOggyv6DEifu7T6gpd12S0OR74XJLGRQBqTanFPcBYlwNHDHulxYI9hRO+WXeOgYOAN8Zcje/xnkkRX9aEdRLHtP7Fgq9j5kI0CBYzc45iilcXY5cP21hmIyoxYqnLeNhuWZsFP9FMTe3JWSEuyOPa0LIEcVWvIxhKyJz/XFVWR3uXdz8uDnPgaBJKrSyQDIJCzBmhTzu7YDry9vWZuoVnTXrM20vlVqHAwBatQ4XkMzPbtOMt6neNlxr1brSNW7VaRMXp9Ift1sGjqrpkv+wTbTohDq5pi0T0KrTp4DhXUFl7GE3vCofzC/y16O3jeBKZSjTfrWPQvk+R5ZG4F9rERGvenFdKa+MzfgSOJgBgMVqUp8XK2QalpySVkNI2XmqIdYhoDho/vxaOyvhytvtDIcg/xMvCU7JpF7jmBQdLC5Ky5GZaL5OCmnvHN5adRs/xkBJQu97ct9y9LWnnbKK1iKZF1bfhAMiX+WcsjlUTXqGahiPTWdnwhHBbHDTPIW7nqZIck43YXvLCm8tE8L1Te7KJeZlAsTS8Tm26UDAL1FdgQv/q77lqf7ioqMCk+Udsr7VXbqWN6w3KiB08E6n64uoFsclOyIS3fN0F0jc6sYguvURV1xTXNTK2C+qk/T6+3XpFVVuuXbOO1YOOvVs0nulkxIK2OwmFigHhbdRcRRwjWSFgpdMIEpLIq8VuMUZT4ArMFdnLhuwmbAqwr3MasmsIbsUFTZVjFEPMmzAfbi9+xK2AHJEuugp3ZKnS8betngpnkJa5IF+KVg0udq8TbyHt4vQHpP7kigNT9eXr+v7TFIR5GyXZJMyWIFQ5iQWGFxTsj5P2XOTdr1v93l+fmBiVuHknrP+brzsqCmkaNuTw15X0tU58aK6i8IFZja9/6aztxxddeSEpa0fi729vrttQ+gaXYFFpUgA4s4Dav9nFKviVXVPy0prDijfOXvy7P73+QwISjPycuIoU6C8Eu0zDs0Cgfny/h+TaVBsXSOSuSofIVChsrUSiGljfvX4SY+WnEUw+GzwTN99iTvPJX3mdGBlrGs3kW1QRzanM47lEGeMvZQvXPqUpV0xBoxjTEF3tWJM4mk+2r0pIFWlATOJEUnSxIbTcT9k7DXFRZ5ZBRRRtPtRroklH6wUqDPdKwFaWiNKi/9Pt2RCAw13HBLqzWZ9CPJqIl8z8Y/CLiWKAL0ilaDX9ReMfyu+GNx04p1b+TNYlHrlWzG+Atpl07ipKdDah/p5NLZXhHkaJYTtoEhrBssya3CeQeediXZ/0ewJwJKoeH7peHHfBG9GtSlZdaNtK9Llyq9h6TOvbewHNwy58pFMjohvRf1yOFfJM6/S8wivmZd4udMxqaYDVVkJ7R1JYmYvPnu9G5zN6L3NqLHeFbCISqZmpg7ZdAEJQIvru6vbv68ThXoqUGqwYvUpFXLYsraN8N8bohVczzUPWmQOPt1XfQgZhYXCbCmVKd5GKeXMBnL8lLFftFEbFxfL0ltdyPxnCMwtkHtjEpOWAHZEAcFJBhowpZw85RzGv0FtnW8cnrfHj3jq6vicrl6gfpQixvkReXn2+KeHmLPNmtoZO6VlLSZdnHufCanOd8siqr5c2uPErrRyO9ST7kxmx13cGv/GnmS3HHPJL33qh6yZi8NIv0ZdcdCPNmZMkAFuThalqTtGSN7FBfbYIxT5pyiaqLd/khA0InhHRM5oI9nb45Wf81KZj/4tEn9ynit93u5UWn4FZlqeETf0p2KJnL9eSRawv2fWP1WmkHaGqJcgDP6oqltMjq6m6ddHkDQ9X47boqhTaQgFrVxFB/EP9m8duKzod2aHqeqCpdY7Jt4BuNxVZ5O0j1TZOSvY+k+b+72JXsKoZk9mx6RzLx3k6JDJOWDkrSUh6aqwsaSpNuwbXu3lV1d+c0ejraYAuturvxzXb2oh9de+Yj26UQJTNgfxZDcb9Kgr6xTbhWGQIcB2oLDAEtBLQBoAHeQMBHPcmqaCJOI0Xrn4n5berpTne8kr2DhPNFcHBI14APn3BuGkIFu9Wcc5U34PzLSedrRkJtoTPpFXWmyjlvdaeda+XECiVz+bYU56Ty9pd/OcXbeCe9hPQkmSM9IFUjvXWXAoqlcWo9blq6FBBco0iyufQaoOrlKgnla5pZPmh8rGBiPs9p9wdrpag3k8zkF3iyIHywqdquYvj4mqHpQqAULlZehiVZYtR+FaBjmhw6oE6SCJEAcuY6P6iysI2ir1kc+pogJUARpASAJSJuO/T5tX9x/fUOd31wWyYfv9Yy3AL/w9x7xCJbJNsf8gr47mGsARW8simX3Jon6qyqUF+HCH6NXn5Mfz7/AdfW/eNtVzK+PowXVW0+jWM4nrCqwEWLmHRjv2aHy4xKizLnYD1O5uI1EG3HCVwDnrKkFX18uiGYT0UF7evY0lcIpsKB+YDCJ5o54SSFnQmiR6zIM/6KHztCLrEysZ2UC75AO4qQ49mserY32UdlqgYgNbXom1cNRcCWQGSYKC8Za0NysddvUVyH2ARZJdaDMvwdHJL4wzA/mRLMPeh7AYw664V8NNXVpOZ7Rr4JTe5Fz2hl915MVRMn6+CRi6jKQtZKLvRhvHbwTFP9VamgpuOX6+88nGMvOqmlnSQ0WOgeHlk4sKc+GVgyX5IgqWmUkC8c4LPdfRlo/WTRaqbu8KkDErmkXlRRFwKmLBC6iiYQutb6Qhf/DBGBam58CO3J6AZhpsSN/wc=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index ab7222ae..690402d6 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -10,6 +10,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pool" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" @@ -35,13 +36,16 @@ type Plugin struct { // priority queue implementation queue priorityqueue.Queue + + // parent config for broken options. + pipelines pipeline.Pipelines } func testListener(data interface{}) { fmt.Println(data) } -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, pq priorityqueue.Queue) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { const op = errors.Op("jobs_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) @@ -52,10 +56,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } - err = p.cfg.InitDefaults() - if err != nil { - return errors.E(op, err) - } + p.cfg.InitDefaults() p.server = server p.events = events.NewEventsHandler() @@ -63,8 +64,14 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.brokers = make(map[string]Broker) p.consumers = make(map[string]Consumer) + // initial set of pipelines + p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines) + if err != nil { + return errors.E(op, err) + } + // initialize priority queue - p.queue = pq + p.queue = priorityqueue.NewPriorityQueue() p.log = log return nil @@ -132,14 +139,7 @@ func (p *Plugin) Name() string { } func (p *Plugin) Push(j *structs.Job) (string, error) { - pipe, pOpts, err := p.cfg.MatchPipeline(j) - if err != nil { - panic(err) - } - - if pOpts != nil { - j.Options.Merge(pOpts) - } + pipe := p.pipelines.Get(j.Options.Pipeline) broker, ok := p.consumers[pipe.Broker()] if !ok { diff --git a/plugins/jobs/pq_plugin/plugin.go b/plugins/jobs/pq_plugin/plugin.go deleted file mode 100644 index 7df846ac..00000000 --- a/plugins/jobs/pq_plugin/plugin.go +++ /dev/null @@ -1,34 +0,0 @@ -package pq_plugin //nolint:stylecheck - -import ( - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "internal_pq" -) - -type Plugin struct { - log logger.Logger - pq priorityqueue.Queue -} - -func (p *Plugin) Init(log logger.Logger) error { - p.log = log - p.pq = priorityqueue.NewPriorityQueue() - return nil -} - -func (p *Plugin) Push(item interface{}) { - p.pq.Push(item) - // no-op -} - -func (p *Plugin) Pop() interface{} { - return p.pq.Pop() -} - -func (p *Plugin) Name() string { - return PluginName -} diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index b21f764c..320f41b1 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -6,52 +6,49 @@ server: relay: "pipes" relay_timeout: "20s" -jobs: - # worker pool configuration - pool: - num_workers: 4 - - # rabbitmq and similar servers - amqp: - addr: amqp://guest:guest@localhost:5672/ +amqp: + addr: amqp://guest:guest@localhost:5672/ # beanstalk configuration - beanstalk: - addr: tcp://localhost:11300 +beanstalk: + addr: tcp://localhost:11300 # amazon sqs configuration - sqs: - key: api-key - secret: api-secret - region: us-west-1 - endpoint: http://localhost:9324 - - # job destinations and options - dispatch: - spiral-jobs-tests-amqp-*.pipeline: amqp - spiral-jobs-tests-local-*.pipeline: local - spiral-jobs-tests-beanstalk-*.pipeline: beanstalk - spiral-jobs-tests-sqs-*.pipeline: sqs - - # list of broker pipelines associated with endpoints - pipelines: - local: - broker: ephemeral +sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://localhost:9324 - amqp: - broker: amqp - queue: default - beanstalk: - broker: beanstalk - tube: default +jobs: + # worker pool configuration + pool: + num_workers: 4 - sqs: - broker: sqs - queue: default + # list of broker pipelines associated with endpoints + pipelines: + test-local: + driver: ephemeral + priority: 10 + + test-1: + driver: amqp + priority: 1 + queue: default + + test-2: + driver: beanstalk + priority: 11 + tube: default + + test-3: + # priority: 11 - not defined, 10 by default + driver: sqs + queue: default declare: MessageRetentionPeriod: 86400 # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: ["local", "amqp", "beanstalk", "sqs"] + consume: [ "test-local", "test-1" ] diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go index 2c58c344..e8b4e83d 100644 --- a/tests/plugins/jobs/jobs_plugin_test.go +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -12,7 +12,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs" "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral" - "github.com/spiral/roadrunner/v2/plugins/jobs/pq_plugin" "github.com/spiral/roadrunner/v2/plugins/logger" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -35,7 +34,6 @@ func TestJobsInit(t *testing.T) { &logger.ZapLogger{}, &jobs.Plugin{}, &ephemeral.Plugin{}, - &pq_plugin.Plugin{}, ) assert.NoError(t, err) |