diff options
-rwxr-xr-x | CODE_OF_CONDUCT.md | 2 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/config.go | 1 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 8 | ||||
-rw-r--r-- | plugins/jobs/config.go | 17 | ||||
-rw-r--r-- | plugins/jobs/dispatcher/dispatcher.go (renamed from plugins/jobs/dispatcher.go) | 12 | ||||
-rw-r--r-- | plugins/jobs/dispatcher/dispatcher_test.go | 55 | ||||
-rw-r--r-- | plugins/jobs/dispatcher_test.go | 54 | ||||
-rw-r--r-- | plugins/jobs/interface.go | 13 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline.go (renamed from plugins/jobs/pipeline.go) | 4 | ||||
-rw-r--r-- | plugins/jobs/pipeline/pipeline_test.go (renamed from plugins/jobs/pipeline_test.go) | 2 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 30 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 15 | ||||
-rw-r--r-- | plugins/jobs/structs/job.go (renamed from plugins/jobs/job.go) | 2 | ||||
-rw-r--r-- | plugins/jobs/structs/job_options.go (renamed from plugins/jobs/job_options.go) | 2 | ||||
-rw-r--r-- | plugins/jobs/structs/job_options_test.go (renamed from plugins/jobs/job_options_test.go) | 2 | ||||
-rw-r--r-- | plugins/jobs/structs/job_test.go (renamed from plugins/jobs/job_test.go) | 2 |
16 files changed, 142 insertions, 79 deletions
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index ae0b283a..49aeb3c8 100755 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -43,4 +43,4 @@ Project maintainers who do not follow or enforce the Code of Conduct in good fai This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [http://contributor-covenant.org/version/1/4][version] [homepage]: http://contributor-covenant.org -[version]: http://contributor-covenant.org/version/1/4/ +[version]: https://www.contributor-covenant.org/version/2/0/code_of_conduct/ diff --git a/plugins/jobs/brokers/ephemeral/config.go b/plugins/jobs/brokers/ephemeral/config.go new file mode 100644 index 00000000..847b63ea --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/config.go @@ -0,0 +1 @@ +package ephemeral diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go new file mode 100644 index 00000000..3028e79a --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -0,0 +1,8 @@ +package ephemeral + +type Plugin struct { +} + +func (p *Plugin) Init() error { + return nil +} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go index 4606ccba..1e49b959 100644 --- a/plugins/jobs/config.go +++ b/plugins/jobs/config.go @@ -3,6 +3,9 @@ package jobs import ( "github.com/spiral/errors" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/jobs/dispatcher" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) // Config defines settings for job broker, workers and job-pipeline mapping. @@ -12,23 +15,23 @@ type Config struct { poolCfg poolImpl.Config // Dispatch defines where and how to match jobs. - Dispatch map[string]*Options + Dispatch map[string]*structs.Options // Pipelines defines mapping between PHP job pipeline and associated job broker. - Pipelines map[string]*Pipeline + Pipelines map[string]*pipeline.Pipeline // Consuming specifies names of pipelines to be consumed on service start. Consume []string // parent config for broken options. - pipelines Pipelines - route Dispatcher + pipelines pipeline.Pipelines + route dispatcher.Dispatcher } func (c *Config) InitDefaults() error { const op = errors.Op("config_init_defaults") var err error - c.pipelines, err = initPipelines(c.Pipelines) + c.pipelines, err = pipeline.InitPipelines(c.Pipelines) if err != nil { return errors.E(op, err) } @@ -36,9 +39,9 @@ func (c *Config) InitDefaults() error { } // MatchPipeline locates the pipeline associated with the job. -func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error) { +func (c *Config) MatchPipeline(job *structs.Job) (*pipeline.Pipeline, *structs.Options, error) { const op = errors.Op("config_match_pipeline") - opt := c.route.match(job) + opt := c.route.Match(job) pipe := "" if job.Options != nil { diff --git a/plugins/jobs/dispatcher.go b/plugins/jobs/dispatcher/dispatcher.go index 8faf4db5..e73e7b74 100644 --- a/plugins/jobs/dispatcher.go +++ b/plugins/jobs/dispatcher/dispatcher.go @@ -1,17 +1,19 @@ -package jobs +package dispatcher import ( "strings" + + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) var separators = []string{"/", "-", "\\"} // Dispatcher provides ability to automatically locate the pipeline for the specific job // and update job options (if none set). -type Dispatcher map[string]*Options +type Dispatcher map[string]*structs.Options // pre-compile patterns -func initDispatcher(routes map[string]*Options) Dispatcher { +func initDispatcher(routes map[string]*structs.Options) Dispatcher { dispatcher := make(Dispatcher) for pattern, opts := range routes { pattern = strings.ToLower(pattern) @@ -27,8 +29,8 @@ func initDispatcher(routes map[string]*Options) Dispatcher { return dispatcher } -// match clarifies target job pipeline and other job options. Can return nil. -func (dispatcher Dispatcher) match(job *Job) (found *Options) { +// Match clarifies target job pipeline and other job options. Can return nil. +func (dispatcher Dispatcher) Match(job *structs.Job) (found *structs.Options) { var best = 0 jobName := strings.ToLower(job.Job) diff --git a/plugins/jobs/dispatcher/dispatcher_test.go b/plugins/jobs/dispatcher/dispatcher_test.go new file mode 100644 index 00000000..e584bda8 --- /dev/null +++ b/plugins/jobs/dispatcher/dispatcher_test.go @@ -0,0 +1,55 @@ +package dispatcher + +import ( + "testing" + + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/stretchr/testify/assert" +) + +func Test_Map_All(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{"default": {Pipeline: "default"}}) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "default"}).Pipeline) +} + +func Test_Map_Miss(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{"some.*": {Pipeline: "default"}}) + + assert.Nil(t, m.Match(&structs.Job{Job: "miss"})) +} + +func Test_Map_Best(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{ + "some.*": {Pipeline: "default"}, + "some.other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline) +} + +func Test_Map_BestUpper(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{ + "some.*": {Pipeline: "default"}, + "some.Other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.OTHER"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "Some.other.job"}).Pipeline) +} + +func Test_Map_BestReversed(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{ + "some.*": {Pipeline: "default"}, + "some.other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) +} diff --git a/plugins/jobs/dispatcher_test.go b/plugins/jobs/dispatcher_test.go deleted file mode 100644 index 9917642f..00000000 --- a/plugins/jobs/dispatcher_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package jobs - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_Map_All(t *testing.T) { - m := initDispatcher(map[string]*Options{"default": {Pipeline: "default"}}) - assert.Equal(t, "default", m.match(&Job{Job: "default"}).Pipeline) -} - -func Test_Map_Miss(t *testing.T) { - m := initDispatcher(map[string]*Options{"some.*": {Pipeline: "default"}}) - - assert.Nil(t, m.match(&Job{Job: "miss"})) -} - -func Test_Map_Best(t *testing.T) { - m := initDispatcher(map[string]*Options{ - "some.*": {Pipeline: "default"}, - "some.other.*": {Pipeline: "other"}, - }) - - assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline) - assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline) - assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline) - assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline) -} - -func Test_Map_BestUpper(t *testing.T) { - m := initDispatcher(map[string]*Options{ - "some.*": {Pipeline: "default"}, - "some.Other.*": {Pipeline: "other"}, - }) - - assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline) - assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline) - assert.Equal(t, "other", m.match(&Job{Job: "some.OTHER"}).Pipeline) - assert.Equal(t, "other", m.match(&Job{Job: "Some.other.job"}).Pipeline) -} - -func Test_Map_BestReversed(t *testing.T) { - m := initDispatcher(map[string]*Options{ - "some.*": {Pipeline: "default"}, - "some.other.*": {Pipeline: "other"}, - }) - - assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline) - assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline) - assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline) - assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline) -} diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go index d013d320..b4862038 100644 --- a/plugins/jobs/interface.go +++ b/plugins/jobs/interface.go @@ -1,9 +1,14 @@ package jobs -// todo naming +import ( + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +// Consumer todo naming type Consumer interface { - Push() + Push(*pipeline.Pipeline, *structs.Job) (string, error) Stat() - Consume(*Pipeline) - Register(*Pipeline) + Consume(*pipeline.Pipeline) + Register(*pipeline.Pipeline) } diff --git a/plugins/jobs/pipeline.go b/plugins/jobs/pipeline/pipeline.go index bfd2e18c..f27f6ede 100644 --- a/plugins/jobs/pipeline.go +++ b/plugins/jobs/pipeline/pipeline.go @@ -1,4 +1,4 @@ -package jobs +package pipeline import ( "time" @@ -10,7 +10,7 @@ import ( type Pipelines []*Pipeline -func initPipelines(pipes map[string]*Pipeline) (Pipelines, error) { +func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) { const op = errors.Op("pipeline_init") out := make(Pipelines, 0) diff --git a/plugins/jobs/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go index c1f958df..f03dcbb8 100644 --- a/plugins/jobs/pipeline_test.go +++ b/plugins/jobs/pipeline/pipeline_test.go @@ -1,4 +1,4 @@ -package jobs +package pipeline import ( "testing" diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index bd5ff5bf..072f872a 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -6,8 +6,10 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -25,6 +27,7 @@ type Plugin struct { workersPool pool.Pool consumers map[string]Consumer + events events.Handler } func testListener(data interface{}) { @@ -47,6 +50,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } + p.events = events.NewEventsHandler() + p.events.AddListener(testListener) p.consumers = make(map[string]Consumer) p.log = log return nil @@ -78,6 +83,31 @@ func (p *Plugin) Name() string { return PluginName } +func (p *Plugin) Push(j *structs.Job) (string, error) { + pipe, pOpts, err := p.cfg.MatchPipeline(j) + if err != nil { + panic(err) + } + + if pOpts != nil { + j.Options.Merge(pOpts) + } + + broker, ok := p.consumers[pipe.Broker()] + if !ok { + panic("broker not found") + } + + id, err := broker.Push(pipe, j) + if err != nil { + panic(err) + } + + // p.events.Push() + + return id, nil +} + func (p *Plugin) RPC() interface{} { return &rpc{log: p.log} } diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index dbe7f808..e77cda59 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -1,7 +1,20 @@ package jobs -import "github.com/spiral/roadrunner/v2/plugins/logger" +import ( + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" +) type rpc struct { log logger.Logger + p *Plugin +} + +func (r *rpc) Push(j *structs.Job, idRet *string) error { + id, err := r.p.Push(j) + if err != nil { + panic(err) + } + *idRet = id + return nil } diff --git a/plugins/jobs/job.go b/plugins/jobs/structs/job.go index 79bb8ad8..2e394543 100644 --- a/plugins/jobs/job.go +++ b/plugins/jobs/structs/job.go @@ -1,4 +1,4 @@ -package jobs +package structs import ( json "github.com/json-iterator/go" diff --git a/plugins/jobs/job_options.go b/plugins/jobs/structs/job_options.go index d4c6f0d2..1507d053 100644 --- a/plugins/jobs/job_options.go +++ b/plugins/jobs/structs/job_options.go @@ -1,4 +1,4 @@ -package jobs +package structs import "time" diff --git a/plugins/jobs/job_options_test.go b/plugins/jobs/structs/job_options_test.go index d226fa1e..18702394 100644 --- a/plugins/jobs/job_options_test.go +++ b/plugins/jobs/structs/job_options_test.go @@ -1,4 +1,4 @@ -package jobs +package structs import ( "testing" diff --git a/plugins/jobs/job_test.go b/plugins/jobs/structs/job_test.go index 1f4bf918..e7240c6b 100644 --- a/plugins/jobs/job_test.go +++ b/plugins/jobs/structs/job_test.go @@ -1,4 +1,4 @@ -package jobs +package structs import ( "testing" |