summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xCODE_OF_CONDUCT.md2
-rw-r--r--plugins/jobs/brokers/ephemeral/config.go1
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go8
-rw-r--r--plugins/jobs/config.go17
-rw-r--r--plugins/jobs/dispatcher/dispatcher.go (renamed from plugins/jobs/dispatcher.go)12
-rw-r--r--plugins/jobs/dispatcher/dispatcher_test.go55
-rw-r--r--plugins/jobs/dispatcher_test.go54
-rw-r--r--plugins/jobs/interface.go13
-rw-r--r--plugins/jobs/pipeline/pipeline.go (renamed from plugins/jobs/pipeline.go)4
-rw-r--r--plugins/jobs/pipeline/pipeline_test.go (renamed from plugins/jobs/pipeline_test.go)2
-rw-r--r--plugins/jobs/plugin.go30
-rw-r--r--plugins/jobs/rpc.go15
-rw-r--r--plugins/jobs/structs/job.go (renamed from plugins/jobs/job.go)2
-rw-r--r--plugins/jobs/structs/job_options.go (renamed from plugins/jobs/job_options.go)2
-rw-r--r--plugins/jobs/structs/job_options_test.go (renamed from plugins/jobs/job_options_test.go)2
-rw-r--r--plugins/jobs/structs/job_test.go (renamed from plugins/jobs/job_test.go)2
16 files changed, 142 insertions, 79 deletions
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
index ae0b283a..49aeb3c8 100755
--- a/CODE_OF_CONDUCT.md
+++ b/CODE_OF_CONDUCT.md
@@ -43,4 +43,4 @@ Project maintainers who do not follow or enforce the Code of Conduct in good fai
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [http://contributor-covenant.org/version/1/4][version]
[homepage]: http://contributor-covenant.org
-[version]: http://contributor-covenant.org/version/1/4/
+[version]: https://www.contributor-covenant.org/version/2/0/code_of_conduct/
diff --git a/plugins/jobs/brokers/ephemeral/config.go b/plugins/jobs/brokers/ephemeral/config.go
new file mode 100644
index 00000000..847b63ea
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/config.go
@@ -0,0 +1 @@
+package ephemeral
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
new file mode 100644
index 00000000..3028e79a
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -0,0 +1,8 @@
+package ephemeral
+
+type Plugin struct {
+}
+
+func (p *Plugin) Init() error {
+ return nil
+}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
index 4606ccba..1e49b959 100644
--- a/plugins/jobs/config.go
+++ b/plugins/jobs/config.go
@@ -3,6 +3,9 @@ package jobs
import (
"github.com/spiral/errors"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/dispatcher"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
// Config defines settings for job broker, workers and job-pipeline mapping.
@@ -12,23 +15,23 @@ type Config struct {
poolCfg poolImpl.Config
// Dispatch defines where and how to match jobs.
- Dispatch map[string]*Options
+ Dispatch map[string]*structs.Options
// Pipelines defines mapping between PHP job pipeline and associated job broker.
- Pipelines map[string]*Pipeline
+ Pipelines map[string]*pipeline.Pipeline
// Consuming specifies names of pipelines to be consumed on service start.
Consume []string
// parent config for broken options.
- pipelines Pipelines
- route Dispatcher
+ pipelines pipeline.Pipelines
+ route dispatcher.Dispatcher
}
func (c *Config) InitDefaults() error {
const op = errors.Op("config_init_defaults")
var err error
- c.pipelines, err = initPipelines(c.Pipelines)
+ c.pipelines, err = pipeline.InitPipelines(c.Pipelines)
if err != nil {
return errors.E(op, err)
}
@@ -36,9 +39,9 @@ func (c *Config) InitDefaults() error {
}
// MatchPipeline locates the pipeline associated with the job.
-func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error) {
+func (c *Config) MatchPipeline(job *structs.Job) (*pipeline.Pipeline, *structs.Options, error) {
const op = errors.Op("config_match_pipeline")
- opt := c.route.match(job)
+ opt := c.route.Match(job)
pipe := ""
if job.Options != nil {
diff --git a/plugins/jobs/dispatcher.go b/plugins/jobs/dispatcher/dispatcher.go
index 8faf4db5..e73e7b74 100644
--- a/plugins/jobs/dispatcher.go
+++ b/plugins/jobs/dispatcher/dispatcher.go
@@ -1,17 +1,19 @@
-package jobs
+package dispatcher
import (
"strings"
+
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
var separators = []string{"/", "-", "\\"}
// Dispatcher provides ability to automatically locate the pipeline for the specific job
// and update job options (if none set).
-type Dispatcher map[string]*Options
+type Dispatcher map[string]*structs.Options
// pre-compile patterns
-func initDispatcher(routes map[string]*Options) Dispatcher {
+func initDispatcher(routes map[string]*structs.Options) Dispatcher {
dispatcher := make(Dispatcher)
for pattern, opts := range routes {
pattern = strings.ToLower(pattern)
@@ -27,8 +29,8 @@ func initDispatcher(routes map[string]*Options) Dispatcher {
return dispatcher
}
-// match clarifies target job pipeline and other job options. Can return nil.
-func (dispatcher Dispatcher) match(job *Job) (found *Options) {
+// Match clarifies target job pipeline and other job options. Can return nil.
+func (dispatcher Dispatcher) Match(job *structs.Job) (found *structs.Options) {
var best = 0
jobName := strings.ToLower(job.Job)
diff --git a/plugins/jobs/dispatcher/dispatcher_test.go b/plugins/jobs/dispatcher/dispatcher_test.go
new file mode 100644
index 00000000..e584bda8
--- /dev/null
+++ b/plugins/jobs/dispatcher/dispatcher_test.go
@@ -0,0 +1,55 @@
+package dispatcher
+
+import (
+ "testing"
+
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_Map_All(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{"default": {Pipeline: "default"}})
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "default"}).Pipeline)
+}
+
+func Test_Map_Miss(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{"some.*": {Pipeline: "default"}})
+
+ assert.Nil(t, m.Match(&structs.Job{Job: "miss"}))
+}
+
+func Test_Map_Best(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{
+ "some.*": {Pipeline: "default"},
+ "some.other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline)
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline)
+}
+
+func Test_Map_BestUpper(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{
+ "some.*": {Pipeline: "default"},
+ "some.Other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline)
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.OTHER"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "Some.other.job"}).Pipeline)
+}
+
+func Test_Map_BestReversed(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{
+ "some.*": {Pipeline: "default"},
+ "some.other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline)
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline)
+}
diff --git a/plugins/jobs/dispatcher_test.go b/plugins/jobs/dispatcher_test.go
deleted file mode 100644
index 9917642f..00000000
--- a/plugins/jobs/dispatcher_test.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package jobs
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_Map_All(t *testing.T) {
- m := initDispatcher(map[string]*Options{"default": {Pipeline: "default"}})
- assert.Equal(t, "default", m.match(&Job{Job: "default"}).Pipeline)
-}
-
-func Test_Map_Miss(t *testing.T) {
- m := initDispatcher(map[string]*Options{"some.*": {Pipeline: "default"}})
-
- assert.Nil(t, m.match(&Job{Job: "miss"}))
-}
-
-func Test_Map_Best(t *testing.T) {
- m := initDispatcher(map[string]*Options{
- "some.*": {Pipeline: "default"},
- "some.other.*": {Pipeline: "other"},
- })
-
- assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
- assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
- assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline)
- assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline)
-}
-
-func Test_Map_BestUpper(t *testing.T) {
- m := initDispatcher(map[string]*Options{
- "some.*": {Pipeline: "default"},
- "some.Other.*": {Pipeline: "other"},
- })
-
- assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
- assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
- assert.Equal(t, "other", m.match(&Job{Job: "some.OTHER"}).Pipeline)
- assert.Equal(t, "other", m.match(&Job{Job: "Some.other.job"}).Pipeline)
-}
-
-func Test_Map_BestReversed(t *testing.T) {
- m := initDispatcher(map[string]*Options{
- "some.*": {Pipeline: "default"},
- "some.other.*": {Pipeline: "other"},
- })
-
- assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline)
- assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline)
- assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
- assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
-}
diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go
index d013d320..b4862038 100644
--- a/plugins/jobs/interface.go
+++ b/plugins/jobs/interface.go
@@ -1,9 +1,14 @@
package jobs
-// todo naming
+import (
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+)
+
+// Consumer todo naming
type Consumer interface {
- Push()
+ Push(*pipeline.Pipeline, *structs.Job) (string, error)
Stat()
- Consume(*Pipeline)
- Register(*Pipeline)
+ Consume(*pipeline.Pipeline)
+ Register(*pipeline.Pipeline)
}
diff --git a/plugins/jobs/pipeline.go b/plugins/jobs/pipeline/pipeline.go
index bfd2e18c..f27f6ede 100644
--- a/plugins/jobs/pipeline.go
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -1,4 +1,4 @@
-package jobs
+package pipeline
import (
"time"
@@ -10,7 +10,7 @@ import (
type Pipelines []*Pipeline
-func initPipelines(pipes map[string]*Pipeline) (Pipelines, error) {
+func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) {
const op = errors.Op("pipeline_init")
out := make(Pipelines, 0)
diff --git a/plugins/jobs/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go
index c1f958df..f03dcbb8 100644
--- a/plugins/jobs/pipeline_test.go
+++ b/plugins/jobs/pipeline/pipeline_test.go
@@ -1,4 +1,4 @@
-package jobs
+package pipeline
import (
"testing"
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index bd5ff5bf..072f872a 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -6,8 +6,10 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -25,6 +27,7 @@ type Plugin struct {
workersPool pool.Pool
consumers map[string]Consumer
+ events events.Handler
}
func testListener(data interface{}) {
@@ -47,6 +50,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
+ p.events = events.NewEventsHandler()
+ p.events.AddListener(testListener)
p.consumers = make(map[string]Consumer)
p.log = log
return nil
@@ -78,6 +83,31 @@ func (p *Plugin) Name() string {
return PluginName
}
+func (p *Plugin) Push(j *structs.Job) (string, error) {
+ pipe, pOpts, err := p.cfg.MatchPipeline(j)
+ if err != nil {
+ panic(err)
+ }
+
+ if pOpts != nil {
+ j.Options.Merge(pOpts)
+ }
+
+ broker, ok := p.consumers[pipe.Broker()]
+ if !ok {
+ panic("broker not found")
+ }
+
+ id, err := broker.Push(pipe, j)
+ if err != nil {
+ panic(err)
+ }
+
+ // p.events.Push()
+
+ return id, nil
+}
+
func (p *Plugin) RPC() interface{} {
return &rpc{log: p.log}
}
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index dbe7f808..e77cda59 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -1,7 +1,20 @@
package jobs
-import "github.com/spiral/roadrunner/v2/plugins/logger"
+import (
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
type rpc struct {
log logger.Logger
+ p *Plugin
+}
+
+func (r *rpc) Push(j *structs.Job, idRet *string) error {
+ id, err := r.p.Push(j)
+ if err != nil {
+ panic(err)
+ }
+ *idRet = id
+ return nil
}
diff --git a/plugins/jobs/job.go b/plugins/jobs/structs/job.go
index 79bb8ad8..2e394543 100644
--- a/plugins/jobs/job.go
+++ b/plugins/jobs/structs/job.go
@@ -1,4 +1,4 @@
-package jobs
+package structs
import (
json "github.com/json-iterator/go"
diff --git a/plugins/jobs/job_options.go b/plugins/jobs/structs/job_options.go
index d4c6f0d2..1507d053 100644
--- a/plugins/jobs/job_options.go
+++ b/plugins/jobs/structs/job_options.go
@@ -1,4 +1,4 @@
-package jobs
+package structs
import "time"
diff --git a/plugins/jobs/job_options_test.go b/plugins/jobs/structs/job_options_test.go
index d226fa1e..18702394 100644
--- a/plugins/jobs/job_options_test.go
+++ b/plugins/jobs/structs/job_options_test.go
@@ -1,4 +1,4 @@
-package jobs
+package structs
import (
"testing"
diff --git a/plugins/jobs/job_test.go b/plugins/jobs/structs/job_test.go
index 1f4bf918..e7240c6b 100644
--- a/plugins/jobs/job_test.go
+++ b/plugins/jobs/structs/job_test.go
@@ -1,4 +1,4 @@
-package jobs
+package structs
import (
"testing"