diff options
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/broker.go | 62 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/config.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/entry.go | 21 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 28 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/queue.go | 7 | ||||
-rw-r--r-- | plugins/jobs/config.go | 71 | ||||
-rw-r--r-- | plugins/jobs/dispatcher/dispatcher.go | 49 | ||||
-rw-r--r-- | plugins/jobs/dispatcher/dispatcher_test.go | 55 | ||||
-rw-r--r-- | plugins/jobs/doc/jobs_arch.drawio | 1 | ||||
-rw-r--r-- | plugins/jobs/interface.go | 26 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline.go | 172 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline_test.go | 90 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 164 | ||||
-rw-r--r-- | plugins/jobs/pq_plugin/plugin.go | 34 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 20 | ||||
-rw-r--r-- | plugins/jobs/structs/job.go | 35 | ||||
-rw-r--r-- | plugins/jobs/structs/job_options.go | 70 | ||||
-rw-r--r-- | plugins/jobs/structs/job_options_test.go | 110 | ||||
-rw-r--r-- | plugins/jobs/structs/job_test.go | 19 |
19 files changed, 1035 insertions, 0 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go new file mode 100644 index 00000000..95f476a6 --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -0,0 +1,62 @@ +package ephemeral + +import ( + "github.com/google/uuid" + "github.com/spiral/errors" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +type JobBroker struct { + jobs chan *entry + queues map[*pipeline.Pipeline]*queue + pq priorityqueue.Queue +} + +func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { + jb := &JobBroker{ + jobs: make(chan *entry, 10), + pq: q, + } + + go jb.serve() + + return jb, nil +} + +func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) { + id := uuid.NewString() + + j.jobs <- &entry{ + id: id, + } + + return id, nil +} + +func (j *JobBroker) Stat() { + panic("implement me") +} + +func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) { + panic("implement me") +} + +func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { + const op = errors.Op("ephemeral_register") + if _, ok := j.queues[pipeline]; !ok { + return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name())) + } + + j.queues[pipeline] = newQueue() + + return nil +} + +func (j *JobBroker) serve() { + for item := range j.jobs { + // item should satisfy + j.pq.Push(item) + } +} diff --git a/plugins/jobs/brokers/ephemeral/config.go b/plugins/jobs/brokers/ephemeral/config.go new file mode 100644 index 00000000..847b63ea --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/config.go @@ -0,0 +1 @@ +package ephemeral diff --git a/plugins/jobs/brokers/ephemeral/entry.go b/plugins/jobs/brokers/ephemeral/entry.go new file mode 100644 index 00000000..bf8796d5 --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/entry.go @@ -0,0 +1,21 @@ +package ephemeral + +type entry struct { + id string +} + +func (e *entry) ID() string { + return e.id +} + +func (e *entry) Ask() { + // no-op +} + +func (e *entry) Nack() { + // no-op +} + +func (e *entry) Payload() []byte { + panic("implement me") +} diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go new file mode 100644 index 00000000..84cc871b --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -0,0 +1,28 @@ +package ephemeral + +import ( + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "ephemeral" +) + +type Plugin struct { + log logger.Logger +} + +func (p *Plugin) Init(log logger.Logger) error { + p.log = log + return nil +} + +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) InitJobBroker(q priorityqueue.Queue) (jobs.Consumer, error) { + return NewJobBroker(q) +} diff --git a/plugins/jobs/brokers/ephemeral/queue.go b/plugins/jobs/brokers/ephemeral/queue.go new file mode 100644 index 00000000..1c6d865b --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/queue.go @@ -0,0 +1,7 @@ +package ephemeral + +type queue struct{} + +func newQueue() *queue { + return &queue{} +} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go new file mode 100644 index 00000000..bb042ec9 --- /dev/null +++ b/plugins/jobs/config.go @@ -0,0 +1,71 @@ +package jobs + +import ( + "github.com/spiral/errors" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/jobs/dispatcher" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +// Config defines settings for job broker, workers and job-pipeline mapping. +type Config struct { + // Workers configures roadrunner server and worker busy. + // Workers *roadrunner.ServerConfig + poolCfg *poolImpl.Config + + // Dispatch defines where and how to match jobs. + Dispatch map[string]*structs.Options + + // Pipelines defines mapping between PHP job pipeline and associated job broker. + Pipelines map[string]*pipeline.Pipeline + + // Consuming specifies names of pipelines to be consumed on service start. + Consume []string + + // parent config for broken options. + pipelines pipeline.Pipelines + route dispatcher.Dispatcher +} + +func (c *Config) InitDefaults() error { + const op = errors.Op("config_init_defaults") + var err error + c.pipelines, err = pipeline.InitPipelines(c.Pipelines) + if err != nil { + return errors.E(op, err) + } + + if c.poolCfg == nil { + c.poolCfg = &poolImpl.Config{} + } + + c.poolCfg.InitDefaults() + + return nil +} + +// MatchPipeline locates the pipeline associated with the job. +func (c *Config) MatchPipeline(job *structs.Job) (*pipeline.Pipeline, *structs.Options, error) { + const op = errors.Op("config_match_pipeline") + opt := c.route.Match(job) + + pipe := "" + if job.Options != nil { + pipe = job.Options.Pipeline + } + + if pipe == "" && opt != nil { + pipe = opt.Pipeline + } + + if pipe == "" { + return nil, nil, errors.E(op, errors.Errorf("unable to locate pipeline for `%s`", job.Job)) + } + + if p := c.pipelines.Get(pipe); p != nil { + return p, opt, nil + } + + return nil, nil, errors.E(op, errors.Errorf("undefined pipeline `%s`", pipe)) +} diff --git a/plugins/jobs/dispatcher/dispatcher.go b/plugins/jobs/dispatcher/dispatcher.go new file mode 100644 index 00000000..e73e7b74 --- /dev/null +++ b/plugins/jobs/dispatcher/dispatcher.go @@ -0,0 +1,49 @@ +package dispatcher + +import ( + "strings" + + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +var separators = []string{"/", "-", "\\"} + +// Dispatcher provides ability to automatically locate the pipeline for the specific job +// and update job options (if none set). +type Dispatcher map[string]*structs.Options + +// pre-compile patterns +func initDispatcher(routes map[string]*structs.Options) Dispatcher { + dispatcher := make(Dispatcher) + for pattern, opts := range routes { + pattern = strings.ToLower(pattern) + pattern = strings.Trim(pattern, "-.*") + + for _, s := range separators { + pattern = strings.ReplaceAll(pattern, s, ".") + } + + dispatcher[pattern] = opts + } + + return dispatcher +} + +// Match clarifies target job pipeline and other job options. Can return nil. +func (dispatcher Dispatcher) Match(job *structs.Job) (found *structs.Options) { + var best = 0 + + jobName := strings.ToLower(job.Job) + for pattern, opts := range dispatcher { + if strings.HasPrefix(jobName, pattern) && len(pattern) > best { + found = opts + best = len(pattern) + } + } + + if best == 0 { + return nil + } + + return found +} diff --git a/plugins/jobs/dispatcher/dispatcher_test.go b/plugins/jobs/dispatcher/dispatcher_test.go new file mode 100644 index 00000000..e584bda8 --- /dev/null +++ b/plugins/jobs/dispatcher/dispatcher_test.go @@ -0,0 +1,55 @@ +package dispatcher + +import ( + "testing" + + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/stretchr/testify/assert" +) + +func Test_Map_All(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{"default": {Pipeline: "default"}}) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "default"}).Pipeline) +} + +func Test_Map_Miss(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{"some.*": {Pipeline: "default"}}) + + assert.Nil(t, m.Match(&structs.Job{Job: "miss"})) +} + +func Test_Map_Best(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{ + "some.*": {Pipeline: "default"}, + "some.other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline) +} + +func Test_Map_BestUpper(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{ + "some.*": {Pipeline: "default"}, + "some.Other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.OTHER"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "Some.other.job"}).Pipeline) +} + +func Test_Map_BestReversed(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{ + "some.*": {Pipeline: "default"}, + "some.other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) +} diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio new file mode 100644 index 00000000..be941255 --- /dev/null +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-06-29T17:21:10.029Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="Bl_lIai7C62ty2EayK1b" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1bc9o4FP41zKSdSca2fH3MtU03aWhots2+GSzAjbGobZLQX7+SLINtCXDAMtDA7jQg32R9536OpBY4H71+itzx8BZ5MGhpivfaAhctTVMdw8Z/SMuUtajASlsGke+xtnlDx/8DWaPCWie+B+PCiQlCQeKPi409FIawlxTa3ChCL8XT+igoPnXsDiDX0Om5Ad/6w/eSYdY7/Jkf+Qz9wTDhDo3c7HzWEA9dD73kmsBlC5xHCCXpt9HrOQzIAGZDk153teDorG8RDJMqF3Tasfl4fjt1vyB9qA6+fr18io511rlnN5iwl76/u/uOW77cnXXwn/bNw6frr+wNkmk2MhGahB4kd1Za4Oxl6CewM3Z75OgLJgbcNkxGAf6l4q99FCZX7sgPCB10rvCdblGI8AE38AchbovSETwL3C4M2ij2Ex+R9gD2SfMzjBIfo3JTOpygce7oKbtZFyUJGpHH+kFwjgIU0S6Dfh+avR5uj5MIPcHcEc9yugp5D35I2SiTZ8DXXBMb4k8QjWASTfEp7KgONO3ESK/KyN42WcPLnIg0U0/bhnny0R1GvIxwB7P7z4HFXxi2Ypx/huHrZxVf9/wnCa1P3+B/Su9Y0zmcN8S0OLieC+2+cHDNng27fUYFuXaFfpZRRx1oZJKEQWEbJo+ErQiQyLi2fiS4cYceFjrsJ4qSIRqg0A0u561nRWTm59wgwgEUj18wSaZMgrqTBBXRwiMYTX+S6zFtsp+P+WMXr+zm6a8p+/V2bGI0iXpw2fuzAUjcaACTJSeyG5LRWQp1BAM38Z+LIlsEG7u0jXzc5zmJWEVutVSzeIu0o+yqvHzlblSkNQuUbpQODXej0yhyp7nTxuSEmKOz2RuvT3pgK6T36ic5ysO/HjNSw9/ndEd+TPNEuBvkqlakVr0pai0SmaFUo9a6SCiTwDlzoY3vpBwTU+Gh8xnfNP2DOzQNe/XqGAPani7SMbbWBaa5LR2jlCAxeR2j6gIdo9uSVIyzVT7Pc/mM59/G50rzfK5X1UpKM4wOylSlNczoB2UhkYjMhrSF6ZzYjmkYqoVNYWA5BZICZrMkxbsgt27SGxLNcd2+vLn+elmruoAqVhiWSF04pgXcramLkkuii9SFJlAXAEhSF9Y2OV3N8/mJtTf6wqjI6rVz+kZQ23sn1NUCznPYdw/p2v3VjZA2OGHb0syARNPGBQIwf09IAPJshF/PD1vgFB9Vxq/4XzpoStp+TENt5JieO4YlXnLMYnjkWA+PLYyKhz3YQ5GbBuzIOYSWosAP4fzR+NuA/aUd7GYNX1AXD8tpJ4kmvWQSwewEPCDd8kW4bVxuG0bllrXePY1CkoM2OVjuNu4lveKC3jvyw8HsRWbPb7vTALneyvPuxmSk4tl5s9+ilyxxcjkOmkZHsxArC6UifFY/oGFooi4ZxzDWVbVlHLRY0/YN8p9I05r0w2va9FOPQjX1okI1dYFCNQUK1ZEV4jP/At7LKG/n2a/tj2H6Tit46wLL3enKs06TBI7GyZwFqTG8+CVlvNI9ocJid3O96HLD2lS/vvsjiCbJOp06yCp61LbWlFWqKktYqSqHztbM/8rWv7ZtTz+Do/m48GZgC9K9mKPhCfXEj36h7odaffC+3YPinGvXNnRD4bmtER9ctUvBNVDVB5fGhRoHTASx0g3zRmAJGKLji6OfCVBmDPC58ZHveSkDw9j/43bprchos6QTvq9x1jIuyL0wz8ZM1nIAhiiEkjECilXESLN4jFRREt2ShREfwBpj46OlkQQIepcg6WYRJF2xeZAyIJsBiXd8cxbiMbZI3BGRWcwuwZbPGI8mswaNC59g0ifSzTprWRfvDU5LK8KZxRwLhStak3BuNTi5p2mILL2wOmRl122drJOH0KyGU1u8fz6C+IlktOeiYpFAfwcpCbB9c2i7Kew9ZXu7IttrDeWwl7O92nD6MRueHNt/gqS3Z4QXSeisH6ERGb8hpO8+8GOs0SGJ2nbpKSQo9Oy7TE6cpNcd1ewx9fvagipVs2sauyIiNK2iiDBliQiNd2Xbk3h4NDfJU29WwVyOH5/DuUa09qTsVa2Kli4NLb5wbAEcf7mFbdtFZGzeYTKMBg1sjQ88cJCgSUKMovPZLAtB3Ab/d0UeezaIXM+H82PZoBYZZ3a6EAHPjYczjlxU/T+r818eTJ5RTWlCQjmOnNkW5JFuPE5ftO+/kn6khASjy2eY0hOlnaE7JheMXgdk+suJ+xLrJ5OYPksqCQG1QEK2xTO3ZfMkZMkqN9QAL4k/t3HDj7v7fy7vOXLKBm4coR6M49VCt+v2ngZUTN+llLhAde5msNEwi8LYdvQTfj6IWBzXEAf+Y/a9F2Bf6U+j08dPk9e25Xw+5hFrLkMpSD2mIZmA/zUPx5TiNqVfC3JQszQlFl3xZESNvKPUlvuwpWymcKi49NuycaRncaPYSdzk6GMW6iJ2z9FHbADFJ+RAag/BKEKF964naSjrjag193EevCOv8PFLatMdZenbPXwvRosFsPak6/fMJyoSWgtcsf8xMFi6Blh3ElcpRsEzjBe83QKdEFG9mxf+fBZYpC2kCnDsxxaNNl3h46LAEgnwOgxqoQQ/BEiWBkiEYyao5BOeV3tUdCOkBQkNOsNmIzdWZnLV1IoGj6Nn6dZCVkhSPHEZorkx/H1CtCMxS4YudmuCXR7PkjfvAEcwnrLis/H9RVf59fTrh/X0reP+PP9hfn/IKtz2SPo0Wkd87zzcxMZzp2MH/z3+1GL3h38jmkkoHFs5MwkXzzIVx25VxypFKgzbytMNf4WZacX1rzA1q0Sbab/XDQQvAyInDL6efu9sKADWSvQIKWyRj1qHKHG08oAroqoKSZFBIblvdUrCeuVnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTS0snwBtrFUBWAJtPEVhlqv0hADoewO9ap7TL3LNPLOUa/tLDdgeFp8+xV1mzxiILZa5J2b17nv5Cs8T9sR8jXKxKivIEZl8yss0AT5chZ7O/JR5CdkfL49XD7UO3PcdBWl3xdZ78BSlPNz3kjv04/sQAAvO0TWu6y8vhCZHSrPrChWGg1DLhMXe2a9axwP3l7e3t0/1sp5e+M3W+q2/WZjdzhvuTclgYPAtjioplCarq4IjOllSa834ePwVQGnt9/a75PBdWfbDM7PNpKwMmupnNFWFAuI0Dk1FEUXVNA0Y/gAu4rhI5o3Kw0dwYS969EYi1SCRm8IR5BD5u+ubFRVUExTqUDls+TSJoOJQeIlGqtPIG8boWffo1Xd7wsoUymtTzyrUMsDlc0BqRso4TLUvIOnnuDfc7Bceg8m+XLILar88vznclM8dsOsTaN3T296HfqJjwH+Q2f9kAm3EA8mGTCfMjSdAJA9knQlJA0xpH9YYWiLlZmnUwfa33I1L/nH5poFHVzRZ0D63KbLL/izLnvF4cj3gsxsJJAmZIW1JziNU+DoYZ/0OHRHMKZ3m13TQ2HfH+AvR+7o9zgtr+pCN4wTN3hi1VZJ78NsFChg5B7HFK1ZkV2+bu8tA1GZEUsrgzfNhiUteEU/NbEnF4V0FIdjT8cRyFFdl8SevK4rmiKdy/t/L/dS8UkEkluYSAVAIGct0KSc3XIcePN4zTxC88Z4zZuRznuhwsEUeKHC8xiYzcdxlvU6x8uMezfyI1bt8DBj9Ppq9utg0SxdM3f7BFs1iEurZLHoDpVW7T2HCtbjXcYTO8KhvOP/HrMdPG8CW6vGm3U4/cskeR6ZO0F8bIhG3UmFeU381N9+TxEB0e+vmqfEy1mJUtNRS8gYBi81RbuqSAOGr+/Fo/J2OVrd0ZhNPsRu4CnZaYrccwzDtIXJWXs9LBfJwXUz4+vLT6tm+ckIKHP3N10iX1WM8lZLRsW1RN4awweKrfDPWl6pJLpGt/USsW8WxxfC4XDcMIV8lKtOdkin3G6DF95ofMjnhcobRNTLDJpjmGViM61SQKC+FTbEr36wXeuzXUUrTIpPNHbKejW2mphecz3RvScCfTdS3UC1OZVsKQWVV+kak0XFFl6ja/qKa+SoVsECqv+e3mzskVU265dsfbVgs6xGwyemXbEgDcgKVQvysm1EEiHYImFroRMmIEurkrzFCMVpLnDmYKcXHkLYFGCT38zU0PggNpC1DqsYYj6E+cP1kwN2BeyA4qhVsLMbhY6Pbf2bYqb4aSrSg9hz8GhylWQb+Qyv3yf9J1eEkKY/T8//yVp6JMOouB5ppgSxIkEsWFlh8dYI+fxlLs066/8h5bl+QMkEnDbPloDMLz2rCpY1kpb0NLSDN1OfIStYe0FsyG5tw6yl/c7rGrqqFa3Au7u7acrwE8SKbWFMBZjAAZ58w48z8x21oulnyMKKD89f/rw8f/hOShOOfp94iKzVWSh3mURRi9b5BPwqnu/TbMCymIsjWTq/SKG0VQrF0PIB/osIMz+teerA6Jmgeag+KSwBqpWrT4wZz+ZZ1BDoVHk47tAc8IbnC9WvU7WqwSEgJzjEr/esG1xUZra1SkMxaMGywKmkaM3K1LajsLda6KIDh0NGF+091Wipy4zS91Ya7JCFrWkVpUH9q79vRgSCgB1X7sKW+wy78VjkNP+F5S8ijgWmYEJFowUwGp9aPoTeeOz00jZ/qiFYMLLRwBvgY6ZtN47TzQ1ohqeVm8xwQJGgaGilaZvAEq7cbKhNArlDOfbl8373QGeCygWiu5VjB3wY/YYszEsjG9l2Fx41e4/JUvZ+3+/hlt8TKJDRkuxfzydr+aZL9b/AOOE17wabZ0hSsIYumKAha1cTcfjzEHQus/eixPbiVPgGAqFqmBo4uyUQBIsE31/f3V9/f5wb0YrEdYMXmcir3OIK1rckXreUijM9dFMWaHy8+h72ICYWj4lwZlSnszFPLuCzn85MFWdGEYlxfLkjy7kfjeAIRdMPbG9ScsANyYD0UEiGjRpkktOjmNfoLWa7x6ev8+GQH327P8EvGifIjooXoJdFvHxEnmzY0MpSKxnpsvnHuQKbXO58vGpey7v1PEr+o5XdJF90Yzfqd/CeP537Utw0j8x8l2peMiYvaZGu6nl94cwZVbGAA3m4ZFuSjmqQTeqrKRj7RFZKVF+80Q8ZEKoQsjGZC/ZMfXOy/ku6ZP6LT5/cpYjftduXF61CWpVNDh8HE7JJyVyup498s2DfNVavlXZMVh07pxzAs7rmaA2yur7dJF0+wFC1gruutUIlTEGtGmKov4x/vQpuzeRLOwwzT1SVrrHZTvBSq7F1Pg5Sfd+kdPsjZf7vNjYmY+a65D3JxLs7pSJMWTomaUt5ZE7jp6M1dq2quxtf3d5O9KPtTgPkerS6KNU43WkCxf0qCVZp+16tcrx7PWgscLwbKCADwOAqPi2FrzJSdZHik1ZBpvM7w9PN5XijdgtTvBfBwSFdAz78FG/bEhq0jc7y1vmAyd88zfuNlUcb2ChmRRul8iyzuid6G+WpDGQp8uJtJNcV6Xy842+eVG0dSC8lPUXlSA8o1Ujvraa35hicGY2blpregms0RbWXXgOw07niGjnmusHX5qX2b86a3tuoQL1zt2zeoVIF5XqyVlMVw8ev0pk5AqXyrLLbk87LovGiEB3T6Zh9mpSIESnYZqnqvVqIVSr6hsOhbwhK8DVBCT5YIuI2Q5/3tYv+1wHu+uB2bL5erGG4BfH+ebaGVZIobjDgDfDtw1gDKtizKS9yNZ8Ys2pN+DVEMP4ZIRJVmCtc7EwOb5EHyRn/Aw==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go new file mode 100644 index 00000000..a0aed50b --- /dev/null +++ b/plugins/jobs/interface.go @@ -0,0 +1,26 @@ +package jobs + +import ( + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +// Consumer todo naming +type Consumer interface { + Push(*pipeline.Pipeline, *structs.Job) (string, error) + Stat() + Consume(*pipeline.Pipeline) + Register(*pipeline.Pipeline) error +} + +type Broker interface { + InitJobBroker(queue priorityqueue.Queue) (Consumer, error) +} + +type Item interface { + ID() string + Ask() + Nack() + Payload() []byte +} diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go new file mode 100644 index 00000000..f27f6ede --- /dev/null +++ b/plugins/jobs/pipeline/pipeline.go @@ -0,0 +1,172 @@ +package pipeline + +import ( + "time" + + "github.com/spiral/errors" +) + +// Pipelines is list of Pipeline. + +type Pipelines []*Pipeline + +func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) { + const op = errors.Op("pipeline_init") + out := make(Pipelines, 0) + + for name, pipe := range pipes { + if pipe.Broker() == "" { + return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker")) + } + + p := pipe.With("name", name) + out = append(out, &p) + } + + return out, nil +} + +// Reverse returns pipelines in reversed order. +func (ps Pipelines) Reverse() Pipelines { + out := make(Pipelines, len(ps)) + + for i, p := range ps { + out[len(ps)-i-1] = p + } + + return out +} + +// Broker return pipelines associated with specific broker. +func (ps Pipelines) Broker(broker string) Pipelines { + out := make(Pipelines, 0) + + for _, p := range ps { + if p.Broker() != broker { + continue + } + + out = append(out, p) + } + + return out +} + +// Names returns only pipelines with specified names. +func (ps Pipelines) Names(only ...string) Pipelines { + out := make(Pipelines, 0) + + for _, name := range only { + for _, p := range ps { + if p.Name() == name { + out = append(out, p) + } + } + } + + return out +} + +// Get returns pipeline by it'svc name. +func (ps Pipelines) Get(name string) *Pipeline { + // possibly optimize + for _, p := range ps { + if p.Name() == name { + return p + } + } + + return nil +} + +// Pipeline defines pipeline options. +type Pipeline map[string]interface{} + +// With pipeline value. Immutable. +func (p Pipeline) With(name string, value interface{}) Pipeline { + out := make(map[string]interface{}) + for k, v := range p { + out[k] = v + } + out[name] = value + + return out +} + +// Name returns pipeline name. +func (p Pipeline) Name() string { + return p.String("name", "") +} + +// Broker associated with the pipeline. +func (p Pipeline) Broker() string { + return p.String("broker", "") +} + +// Has checks if value presented in pipeline. +func (p Pipeline) Has(name string) bool { + if _, ok := p[name]; ok { + return true + } + + return false +} + +// Map must return nested map value or empty config. +func (p Pipeline) Map(name string) Pipeline { + out := make(map[string]interface{}) + + if value, ok := p[name]; ok { + if m, ok := value.(map[string]interface{}); ok { + for k, v := range m { + out[k] = v + } + } + } + + return out +} + +// Bool must return option value as string or return default value. +func (p Pipeline) Bool(name string, d bool) bool { + if value, ok := p[name]; ok { + if b, ok := value.(bool); ok { + return b + } + } + + return d +} + +// String must return option value as string or return default value. +func (p Pipeline) String(name string, d string) string { + if value, ok := p[name]; ok { + if str, ok := value.(string); ok { + return str + } + } + + return d +} + +// Integer must return option value as string or return default value. +func (p Pipeline) Integer(name string, d int) int { + if value, ok := p[name]; ok { + if str, ok := value.(int); ok { + return str + } + } + + return d +} + +// Duration must return option value as time.Duration (seconds) or return default value. +func (p Pipeline) Duration(name string, d time.Duration) time.Duration { + if value, ok := p[name]; ok { + if str, ok := value.(int); ok { + return time.Second * time.Duration(str) + } + } + + return d +} diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go new file mode 100644 index 00000000..f03dcbb8 --- /dev/null +++ b/plugins/jobs/pipeline/pipeline_test.go @@ -0,0 +1,90 @@ +package pipeline + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPipeline_Map(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} + + assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0)) + assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0)) +} + +func TestPipeline_MapString(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}} + + assert.Equal(t, "default", pipe.Map("options").String("alias", "")) + assert.Equal(t, "", pipe.Map("other").String("alias", "")) +} + +func TestPipeline_Bool(t *testing.T) { + pipe := Pipeline{"value": true} + + assert.Equal(t, true, pipe.Bool("value", false)) + assert.Equal(t, true, pipe.Bool("other", true)) +} + +func TestPipeline_String(t *testing.T) { + pipe := Pipeline{"value": "value"} + + assert.Equal(t, "value", pipe.String("value", "")) + assert.Equal(t, "value", pipe.String("other", "value")) +} + +func TestPipeline_Integer(t *testing.T) { + pipe := Pipeline{"value": 1} + + assert.Equal(t, 1, pipe.Integer("value", 0)) + assert.Equal(t, 1, pipe.Integer("other", 1)) +} + +func TestPipeline_Duration(t *testing.T) { + pipe := Pipeline{"value": 1} + + assert.Equal(t, time.Second, pipe.Duration("value", 0)) + assert.Equal(t, time.Second, pipe.Duration("other", time.Second)) +} + +func TestPipeline_Has(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} + + assert.Equal(t, true, pipe.Has("options")) + assert.Equal(t, false, pipe.Has("other")) +} + +func TestPipeline_FilterBroker(t *testing.T) { + pipes := Pipelines{ + &Pipeline{"name": "first", "broker": "a"}, + &Pipeline{"name": "second", "broker": "a"}, + &Pipeline{"name": "third", "broker": "b"}, + &Pipeline{"name": "forth", "broker": "b"}, + } + + filtered := pipes.Names("first", "third") + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "a", filtered[0].Broker()) + assert.Equal(t, "b", filtered[1].Broker()) + + filtered = pipes.Names("first", "third").Reverse() + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "a", filtered[1].Broker()) + assert.Equal(t, "b", filtered[0].Broker()) + + filtered = pipes.Broker("a") + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "first", filtered[0].Name()) + assert.Equal(t, "second", filtered[1].Name()) + + filtered = pipes.Broker("a").Reverse() + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "first", filtered[1].Name()) + assert.Equal(t, "second", filtered[0].Name()) +} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go new file mode 100644 index 00000000..ab7222ae --- /dev/null +++ b/plugins/jobs/plugin.go @@ -0,0 +1,164 @@ +package jobs + +import ( + "context" + "fmt" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" +) + +const ( + // RrJobs env variable + RrJobs string = "rr_jobs" + PluginName string = "jobs" +) + +type Plugin struct { + cfg *Config + log logger.Logger + + workersPool pool.Pool + server server.Server + + brokers map[string]Broker + consumers map[string]Consumer + + events events.Handler + + // priority queue implementation + queue priorityqueue.Queue +} + +func testListener(data interface{}) { + fmt.Println(data) +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, pq priorityqueue.Queue) error { + const op = errors.Op("jobs_plugin_init") + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.cfg) + if err != nil { + return errors.E(op, err) + } + + err = p.cfg.InitDefaults() + if err != nil { + return errors.E(op, err) + } + + p.server = server + p.events = events.NewEventsHandler() + p.events.AddListener(testListener) + p.brokers = make(map[string]Broker) + p.consumers = make(map[string]Consumer) + + // initialize priority queue + p.queue = pq + p.log = log + + return nil +} + +func (p *Plugin) Serve() chan error { + errCh := make(chan error, 1) + + for name := range p.brokers { + jb, err := p.brokers[name].InitJobBroker(p.queue) + if err != nil { + errCh <- err + return errCh + } + + p.consumers[name] = jb + } + + var err error + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) + if err != nil { + errCh <- err + return errCh + } + + // initialize sub-plugins + // provide a queue to them + // start consume loop + // start resp loop + + /* + go func() { + for { + // get data JOB from the queue + job := p.queue.Pop() + + // request + _ = job + p.workersPool.Exec(nil) + } + }() + + */ + return errCh +} + +func (p *Plugin) Stop() error { + return nil +} + +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.CollectMQBrokers, + } +} + +func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) { + p.brokers[name.Name()] = c +} + +func (p *Plugin) Available() {} + +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Push(j *structs.Job) (string, error) { + pipe, pOpts, err := p.cfg.MatchPipeline(j) + if err != nil { + panic(err) + } + + if pOpts != nil { + j.Options.Merge(pOpts) + } + + broker, ok := p.consumers[pipe.Broker()] + if !ok { + panic("broker not found") + } + + id, err := broker.Push(pipe, j) + if err != nil { + panic(err) + } + + // p.events.Push() + + return id, nil +} + +func (p *Plugin) RPC() interface{} { + return &rpc{ + log: p.log, + p: p, + } +} diff --git a/plugins/jobs/pq_plugin/plugin.go b/plugins/jobs/pq_plugin/plugin.go new file mode 100644 index 00000000..7df846ac --- /dev/null +++ b/plugins/jobs/pq_plugin/plugin.go @@ -0,0 +1,34 @@ +package pq_plugin //nolint:stylecheck + +import ( + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "internal_pq" +) + +type Plugin struct { + log logger.Logger + pq priorityqueue.Queue +} + +func (p *Plugin) Init(log logger.Logger) error { + p.log = log + p.pq = priorityqueue.NewPriorityQueue() + return nil +} + +func (p *Plugin) Push(item interface{}) { + p.pq.Push(item) + // no-op +} + +func (p *Plugin) Pop() interface{} { + return p.pq.Pop() +} + +func (p *Plugin) Name() string { + return PluginName +} diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go new file mode 100644 index 00000000..e77cda59 --- /dev/null +++ b/plugins/jobs/rpc.go @@ -0,0 +1,20 @@ +package jobs + +import ( + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type rpc struct { + log logger.Logger + p *Plugin +} + +func (r *rpc) Push(j *structs.Job, idRet *string) error { + id, err := r.p.Push(j) + if err != nil { + panic(err) + } + *idRet = id + return nil +} diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go new file mode 100644 index 00000000..2e394543 --- /dev/null +++ b/plugins/jobs/structs/job.go @@ -0,0 +1,35 @@ +package structs + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/utils" +) + +// Job carries information about single job. +type Job struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Body packs job payload into binary payload. +func (j *Job) Body() []byte { + return utils.AsBytes(j.Payload) +} + +// Context packs job context (job, id) into binary payload. +func (j *Job) Context(id string) []byte { + ctx, _ := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + }{ID: id, Job: j.Job}, + ) + + return ctx +} diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go new file mode 100644 index 00000000..1507d053 --- /dev/null +++ b/plugins/jobs/structs/job_options.go @@ -0,0 +1,70 @@ +package structs + +import "time" + +// Options carry information about how to handle given job. +type Options struct { + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int `json:"delay,omitempty"` + + // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). + // Minimum valuable value is 2. + Attempts int `json:"maxAttempts,omitempty"` + + // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. + RetryDelay int `json:"retryDelay,omitempty"` + + // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout int `json:"timeout,omitempty"` +} + +// Merge merges job options. +func (o *Options) Merge(from *Options) { + if o.Pipeline == "" { + o.Pipeline = from.Pipeline + } + + if o.Attempts == 0 { + o.Attempts = from.Attempts + } + + if o.Timeout == 0 { + o.Timeout = from.Timeout + } + + if o.RetryDelay == 0 { + o.RetryDelay = from.RetryDelay + } + + if o.Delay == 0 { + o.Delay = from.Delay + } +} + +// CanRetry must return true if broker is allowed to re-run the job. +func (o *Options) CanRetry(attempt int) bool { + // Attempts 1 and 0 has identical effect + return o.Attempts > (attempt + 1) +} + +// RetryDuration returns retry delay duration in a form of time.Duration. +func (o *Options) RetryDuration() time.Duration { + return time.Second * time.Duration(o.RetryDelay) +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. +func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} diff --git a/plugins/jobs/structs/job_options_test.go b/plugins/jobs/structs/job_options_test.go new file mode 100644 index 00000000..18702394 --- /dev/null +++ b/plugins/jobs/structs/job_options_test.go @@ -0,0 +1,110 @@ +package structs + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestOptions_CanRetry(t *testing.T) { + opts := &Options{Attempts: 0} + + assert.False(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) +} + +func TestOptions_CanRetry_SameValue(t *testing.T) { + opts := &Options{Attempts: 1} + + assert.False(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) +} + +func TestOptions_CanRetry_Value(t *testing.T) { + opts := &Options{Attempts: 2} + + assert.True(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) + assert.False(t, opts.CanRetry(2)) +} + +func TestOptions_CanRetry_Value3(t *testing.T) { + opts := &Options{Attempts: 3} + + assert.True(t, opts.CanRetry(0)) + assert.True(t, opts.CanRetry(1)) + assert.False(t, opts.CanRetry(2)) +} + +func TestOptions_RetryDuration(t *testing.T) { + opts := &Options{RetryDelay: 0} + assert.Equal(t, time.Duration(0), opts.RetryDuration()) +} + +func TestOptions_RetryDuration2(t *testing.T) { + opts := &Options{RetryDelay: 1} + assert.Equal(t, time.Second, opts.RetryDuration()) +} + +func TestOptions_DelayDuration(t *testing.T) { + opts := &Options{Delay: 0} + assert.Equal(t, time.Duration(0), opts.DelayDuration()) +} + +func TestOptions_DelayDuration2(t *testing.T) { + opts := &Options{Delay: 1} + assert.Equal(t, time.Second, opts.DelayDuration()) +} + +func TestOptions_TimeoutDuration(t *testing.T) { + opts := &Options{Timeout: 0} + assert.Equal(t, time.Minute*30, opts.TimeoutDuration()) +} + +func TestOptions_TimeoutDuration2(t *testing.T) { + opts := &Options{Timeout: 1} + assert.Equal(t, time.Second, opts.TimeoutDuration()) +} + +func TestOptions_Merge(t *testing.T) { + opts := &Options{} + + opts.Merge(&Options{ + Pipeline: "pipeline", + Delay: 2, + Timeout: 1, + Attempts: 1, + RetryDelay: 1, + }) + + assert.Equal(t, "pipeline", opts.Pipeline) + assert.Equal(t, 1, opts.Attempts) + assert.Equal(t, 2, opts.Delay) + assert.Equal(t, 1, opts.Timeout) + assert.Equal(t, 1, opts.RetryDelay) +} + +func TestOptions_MergeKeepOriginal(t *testing.T) { + opts := &Options{ + Pipeline: "default", + Delay: 10, + Timeout: 10, + Attempts: 10, + RetryDelay: 10, + } + + opts.Merge(&Options{ + Pipeline: "pipeline", + Delay: 2, + Timeout: 1, + Attempts: 1, + RetryDelay: 1, + }) + + assert.Equal(t, "default", opts.Pipeline) + assert.Equal(t, 10, opts.Attempts) + assert.Equal(t, 10, opts.Delay) + assert.Equal(t, 10, opts.Timeout) + assert.Equal(t, 10, opts.RetryDelay) +} diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go new file mode 100644 index 00000000..e7240c6b --- /dev/null +++ b/plugins/jobs/structs/job_test.go @@ -0,0 +1,19 @@ +package structs + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestJob_Body(t *testing.T) { + j := &Job{Payload: "hello"} + + assert.Equal(t, []byte("hello"), j.Body()) +} + +func TestJob_Context(t *testing.T) { + j := &Job{Job: "job"} + + assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id")) +} |