diff options
Diffstat (limited to 'plugins')
39 files changed, 968 insertions, 80 deletions
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go index 46709d71..eda3572f 100644 --- a/plugins/broadcast/interface.go +++ b/plugins/broadcast/interface.go @@ -1,6 +1,6 @@ package broadcast -import "github.com/spiral/roadrunner/v2/pkg/pubsub" +import "github.com/spiral/roadrunner/v2/common/pubsub" type Broadcaster interface { GetDriver(key string) (pubsub.SubReader, error) diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 6ddef806..889dc2fa 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -7,7 +7,7 @@ import ( "github.com/google/uuid" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go index 2ee211f8..475076a0 100644 --- a/plugins/broadcast/rpc.go +++ b/plugins/broadcast/rpc.go @@ -2,7 +2,7 @@ package broadcast import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" ) diff --git a/plugins/jobs/.rr.yaml b/plugins/jobs/.rr.yaml new file mode 100644 index 00000000..1b84515f --- /dev/null +++ b/plugins/jobs/.rr.yaml @@ -0,0 +1,73 @@ +server: + command: "php worker.php" + +jobs: + # worker pool configuration + pool: + num_workers: 4 + + # rabbitmq and similar servers + amqp: + addr: amqp://guest:guest@localhost:5672/ + + # beanstalk configuration + beanstalk: + addr: tcp://localhost:11300 + + # amazon sqs configuration + sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://localhost:9324 + + # job destinations and options + dispatch: + spiral-jobs-tests-amqp-*.pipeline: amqp + spiral-jobs-tests-local-*.pipeline: local + spiral-jobs-tests-beanstalk-*.pipeline: beanstalk + spiral-jobs-tests-sqs-*.pipeline: sqs + + # list of broker pipelines associated with endpoints + pipelines: + local: + broker: ephemeral + + amqp: + broker: amqp + queue: default + + beanstalk: + broker: beanstalk + tube: default + + sqs: + broker: sqs + queue: default + declare: + MessageRetentionPeriod: 86400 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: ["local", "amqp", "beanstalk", "sqs"] + + +# monitors rr server(s) +limit: + # check worker state each second + interval: 1 + + # custom watch configuration for each service + services: + # monitor queue workers + jobs: + # maximum allowed memory consumption per worker (soft) + maxMemory: 100 + + # maximum time to live for the worker (soft) + TTL: 0 + + # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections, soft) + idleTTL: 0 + + # max_execution_time (brutal) + execTTL: 60 diff --git a/plugins/jobs/brokers/amqp/config.go b/plugins/jobs/brokers/amqp/config.go new file mode 100644 index 00000000..a60cb486 --- /dev/null +++ b/plugins/jobs/brokers/amqp/config.go @@ -0,0 +1,22 @@ +package amqp + +import "time" + +// Config defines sqs broker configuration. +type Config struct { + // Addr of AMQP server (example: amqp://guest:guest@localhost:5672/). + Addr string + + // Timeout to allocate the connection. Default 10 seconds. + Timeout int +} + +// TimeoutDuration returns number of seconds allowed to redial +func (c *Config) TimeoutDuration() time.Duration { + timeout := c.Timeout + if timeout == 0 { + timeout = 10 + } + + return time.Duration(timeout) * time.Second +} diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go new file mode 100644 index 00000000..0e8d02ac --- /dev/null +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -0,0 +1 @@ +package amqp diff --git a/plugins/jobs/brokers/ephemeral/config.go b/plugins/jobs/brokers/ephemeral/config.go new file mode 100644 index 00000000..847b63ea --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/config.go @@ -0,0 +1 @@ +package ephemeral diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go new file mode 100644 index 00000000..3028e79a --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -0,0 +1,8 @@ +package ephemeral + +type Plugin struct { +} + +func (p *Plugin) Init() error { + return nil +} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go new file mode 100644 index 00000000..1e49b959 --- /dev/null +++ b/plugins/jobs/config.go @@ -0,0 +1,64 @@ +package jobs + +import ( + "github.com/spiral/errors" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/jobs/dispatcher" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +// Config defines settings for job broker, workers and job-pipeline mapping. +type Config struct { + // Workers configures roadrunner server and worker busy. + // Workers *roadrunner.ServerConfig + poolCfg poolImpl.Config + + // Dispatch defines where and how to match jobs. + Dispatch map[string]*structs.Options + + // Pipelines defines mapping between PHP job pipeline and associated job broker. + Pipelines map[string]*pipeline.Pipeline + + // Consuming specifies names of pipelines to be consumed on service start. + Consume []string + + // parent config for broken options. + pipelines pipeline.Pipelines + route dispatcher.Dispatcher +} + +func (c *Config) InitDefaults() error { + const op = errors.Op("config_init_defaults") + var err error + c.pipelines, err = pipeline.InitPipelines(c.Pipelines) + if err != nil { + return errors.E(op, err) + } + return nil +} + +// MatchPipeline locates the pipeline associated with the job. +func (c *Config) MatchPipeline(job *structs.Job) (*pipeline.Pipeline, *structs.Options, error) { + const op = errors.Op("config_match_pipeline") + opt := c.route.Match(job) + + pipe := "" + if job.Options != nil { + pipe = job.Options.Pipeline + } + + if pipe == "" && opt != nil { + pipe = opt.Pipeline + } + + if pipe == "" { + return nil, nil, errors.E(op, errors.Errorf("unable to locate pipeline for `%s`", job.Job)) + } + + if p := c.pipelines.Get(pipe); p != nil { + return p, opt, nil + } + + return nil, nil, errors.E(op, errors.Errorf("undefined pipeline `%s`", pipe)) +} diff --git a/plugins/jobs/dispatcher/dispatcher.go b/plugins/jobs/dispatcher/dispatcher.go new file mode 100644 index 00000000..e73e7b74 --- /dev/null +++ b/plugins/jobs/dispatcher/dispatcher.go @@ -0,0 +1,49 @@ +package dispatcher + +import ( + "strings" + + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +var separators = []string{"/", "-", "\\"} + +// Dispatcher provides ability to automatically locate the pipeline for the specific job +// and update job options (if none set). +type Dispatcher map[string]*structs.Options + +// pre-compile patterns +func initDispatcher(routes map[string]*structs.Options) Dispatcher { + dispatcher := make(Dispatcher) + for pattern, opts := range routes { + pattern = strings.ToLower(pattern) + pattern = strings.Trim(pattern, "-.*") + + for _, s := range separators { + pattern = strings.ReplaceAll(pattern, s, ".") + } + + dispatcher[pattern] = opts + } + + return dispatcher +} + +// Match clarifies target job pipeline and other job options. Can return nil. +func (dispatcher Dispatcher) Match(job *structs.Job) (found *structs.Options) { + var best = 0 + + jobName := strings.ToLower(job.Job) + for pattern, opts := range dispatcher { + if strings.HasPrefix(jobName, pattern) && len(pattern) > best { + found = opts + best = len(pattern) + } + } + + if best == 0 { + return nil + } + + return found +} diff --git a/plugins/jobs/dispatcher/dispatcher_test.go b/plugins/jobs/dispatcher/dispatcher_test.go new file mode 100644 index 00000000..e584bda8 --- /dev/null +++ b/plugins/jobs/dispatcher/dispatcher_test.go @@ -0,0 +1,55 @@ +package dispatcher + +import ( + "testing" + + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/stretchr/testify/assert" +) + +func Test_Map_All(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{"default": {Pipeline: "default"}}) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "default"}).Pipeline) +} + +func Test_Map_Miss(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{"some.*": {Pipeline: "default"}}) + + assert.Nil(t, m.Match(&structs.Job{Job: "miss"})) +} + +func Test_Map_Best(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{ + "some.*": {Pipeline: "default"}, + "some.other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline) +} + +func Test_Map_BestUpper(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{ + "some.*": {Pipeline: "default"}, + "some.Other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.OTHER"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "Some.other.job"}).Pipeline) +} + +func Test_Map_BestReversed(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{ + "some.*": {Pipeline: "default"}, + "some.other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) +} diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio new file mode 100644 index 00000000..0639f448 --- /dev/null +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-06-22T08:56:50.739Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="Kdvb2D1nWMjMRedS8I3V" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvpc9o4FP9rmKGZCePb5mMIOdpttzTZnbb7TdgCu7UtVxZX/vqVbPmUIDQxkLQlB+hJlvXe+71DT6anX0brGwwS/wPyYNjTFG/d08c9TdMUS6NvjLLJKaqmmjlljgOP0yrCffAAOVHh1EXgwbQxkCAUkiBpEl0Ux9AlDRrAGK2aw2YobN41AXMoEO5dEIrUz4FH/GJ19FX13MJg7hOhKwLFeE5IfeChVY2kX/X0S4wQyT9F60sYMgEWosmvu97SW64Nw5jsc8GXOF7fqiiCywcS2zef4H+Ke64Z+TRLEC4403y1ZFNIAaNF7EE2i9LTRys/IPA+AS7rXVHFU5pPopC2VPpxFoThJQoRzq7VPQCdmUvpKcHoO6z1WK4DpzPaI/LBWVtCTOC6RuJ83UDKBMEbOoT36g6XMQfa0C4QtKrUphWD/LrCCu0ADpV5OXklSvqBS/NnJCvIEXoUXLyJMPHRHMUgvKqoo6akqzHvEUq4fL9BQjbcUsCCoKb0qQjx5gu7fmAWza/1vvGaT563Nry1VQcpWmAX7mKT80kAnkOyYyCfkAlhp0oxDAEJlk0LlGmHXzpBAV1zCQXDMJtQMO2WhvOV8svq9iLM1AKVOWzNlAtHmOkCY7CpDUvYgFQAVMnz0zGmnwRj64DUIEZbXwtM0c8VwFhjU0fbUXGp7glL41iwbIFJt/aDZVdQKXxq5efvJnQm5Zz+Tf69v6WT5m90QZvY7TYImNDxDFkQcLSpblkdBQG7JWJDE4OAakiCgH6oGDA8qX3WrbO01Z+zT+Vg9mnsGzaU4xio1kZPmVMcy0D/OPPng8U6kjfXhwNnaJmmamuGqtvDJnS0dlJ5YOiIOfwHQFyfefa3k6v3b/++6tSdQ5U6dFvmzoeWrYOu3Hk7p9dl7lyTuPODpfT2KS1Ubbhz+6X5c3NPE+3cQp+lUefV+Vy1oc5KuydTaOf7umcp1BRcYU+zQrr+UdLQs/Vjweouo4iyF8Q9/YL2Ksma/s+EpuT0c8JUyvqMWh91YOQchMGcX+dS2ULc7PagizDlF/ExDDI4DGJY3Zp+mvP3bIHTgvAOTalYLu4JXrhkgWExgApk2r6I0pI2zcdtypN4D+GM5J0O62wvm64yu2KczY2DeF4yUt5/AjYhAt6j4z4mTFJpOa5sy5hsGSyLKIELwotcIeNMZSOunnHOxAjRUbMwq76xYMZiGooJt1BV4+1rEAUhs5xbGC4hm3VnHJyZ7Eda28pefNYaPX91Ex8ttV2eUCTx0ZLEx3YZozPjs34B4yug9+LtbxIkMOfpEeMaU8e7eXTUBSEwSkhlg1muup3JQ7B0x1DYXG5tFVNBrMda1z9BBNGCPGVRf5xV1usItdR9nZWqHspbqaqgnpOl83tn89qxyqfKnulf5/XT5+lUEULQHbVcOMg2xP1vaPqm063wzHGhKz3emjqmYe7Uwf7Wo5rtrfBwz63wwSqbqiYIGkMaLON69tYSNIvNTWkWjo8HcX3U9o9R4Hm53cE0eADTbComVX6mQuc1Rz1zzOaippZyHykoJEYx7EYXmlCWcERdDI9ZlVDF8k9Cc4Oexsr76FfWhdG2C80SdSE79T2cLsT9Zy1PO6d5AYiYR+HZAc0/Eio0npOZ44CJfsZ8jz3q2ePfRWsSC1KPa0Inrey97Np7UVN/vBDkdJ0KPKn4rrZj3qHPbcRdbwTpHZm4K9Pf5odfYR1ePX3ycdpz1Zdtrs6e5lpo7MTmqhz5rKyQT81cbyBb7ojZECskzTCKmAB9mDE/D1IaWSErYk6zIaxEsgwAt+9Bfl2/433FbKbJ9xWeNbXMQ5m2IovEMtNuP7zS3VNz4gZuskj9fpXR5ns4hVonvX1Nbx1K/zQPLTrDfaVvHEz64lNCW8T7a2SiVhv/hqgB85iJqCZuqwXRowVhScVl+cyzItbu6M81u+1ojoEXwKqPC68F+HK4VNIeSP3SkgqtvgdTGE6olvIy+3iKCKF+U1R7s8RZoqNufZLqZhHL2S1BmuSMzoI1W0cOGIivljDHTYYRHyTsgmg9Zw+jD8AqNQaLNLtXF1BR1CZUbF2Eiu2IUClo3UNFFz3l7YQSPn+8++vqToBNIaAEIxem6eNOcQrc7/PMjX7MEbclVB2lBGbqLVO1DWdg7ukuO6g2Plgzb6U718b36OLrzWI9sYe356IGjnfeJTnIyksLodiqygqt+kOrteVEozz0oi4nXURZktTPc6E3Jzobk4pKOMzZJcdslCDFewJI/6wo2bA8o39GE450wDry/ANijBp8d3MEdSiOsuzprCpCMRbO3uU5VL84DHyFfHEsNpT1SpZ+x/cUTaD19Gv+SxVDvWhIYx7baqQoXMJ0C3dbfDzO4mXdmYtnijLvv8UrS3z39rOK0gXvOqwon9Vu+Oku8lqpo/5N6gWtrfyjBQSprCTPfUnHOXIYdH7Ot2uR9dwn+/LCczaBXYDfaoPfkO3pFAn4uyiWbddTQ1Q/BiyksVzCB3QPEb4AsbWfZTrkASdtVl+6zMtK1ddX9av/AQ==</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go new file mode 100644 index 00000000..b4862038 --- /dev/null +++ b/plugins/jobs/interface.go @@ -0,0 +1,14 @@ +package jobs + +import ( + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +// Consumer todo naming +type Consumer interface { + Push(*pipeline.Pipeline, *structs.Job) (string, error) + Stat() + Consume(*pipeline.Pipeline) + Register(*pipeline.Pipeline) +} diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go new file mode 100644 index 00000000..f27f6ede --- /dev/null +++ b/plugins/jobs/pipeline/pipeline.go @@ -0,0 +1,172 @@ +package pipeline + +import ( + "time" + + "github.com/spiral/errors" +) + +// Pipelines is list of Pipeline. + +type Pipelines []*Pipeline + +func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) { + const op = errors.Op("pipeline_init") + out := make(Pipelines, 0) + + for name, pipe := range pipes { + if pipe.Broker() == "" { + return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker")) + } + + p := pipe.With("name", name) + out = append(out, &p) + } + + return out, nil +} + +// Reverse returns pipelines in reversed order. +func (ps Pipelines) Reverse() Pipelines { + out := make(Pipelines, len(ps)) + + for i, p := range ps { + out[len(ps)-i-1] = p + } + + return out +} + +// Broker return pipelines associated with specific broker. +func (ps Pipelines) Broker(broker string) Pipelines { + out := make(Pipelines, 0) + + for _, p := range ps { + if p.Broker() != broker { + continue + } + + out = append(out, p) + } + + return out +} + +// Names returns only pipelines with specified names. +func (ps Pipelines) Names(only ...string) Pipelines { + out := make(Pipelines, 0) + + for _, name := range only { + for _, p := range ps { + if p.Name() == name { + out = append(out, p) + } + } + } + + return out +} + +// Get returns pipeline by it'svc name. +func (ps Pipelines) Get(name string) *Pipeline { + // possibly optimize + for _, p := range ps { + if p.Name() == name { + return p + } + } + + return nil +} + +// Pipeline defines pipeline options. +type Pipeline map[string]interface{} + +// With pipeline value. Immutable. +func (p Pipeline) With(name string, value interface{}) Pipeline { + out := make(map[string]interface{}) + for k, v := range p { + out[k] = v + } + out[name] = value + + return out +} + +// Name returns pipeline name. +func (p Pipeline) Name() string { + return p.String("name", "") +} + +// Broker associated with the pipeline. +func (p Pipeline) Broker() string { + return p.String("broker", "") +} + +// Has checks if value presented in pipeline. +func (p Pipeline) Has(name string) bool { + if _, ok := p[name]; ok { + return true + } + + return false +} + +// Map must return nested map value or empty config. +func (p Pipeline) Map(name string) Pipeline { + out := make(map[string]interface{}) + + if value, ok := p[name]; ok { + if m, ok := value.(map[string]interface{}); ok { + for k, v := range m { + out[k] = v + } + } + } + + return out +} + +// Bool must return option value as string or return default value. +func (p Pipeline) Bool(name string, d bool) bool { + if value, ok := p[name]; ok { + if b, ok := value.(bool); ok { + return b + } + } + + return d +} + +// String must return option value as string or return default value. +func (p Pipeline) String(name string, d string) string { + if value, ok := p[name]; ok { + if str, ok := value.(string); ok { + return str + } + } + + return d +} + +// Integer must return option value as string or return default value. +func (p Pipeline) Integer(name string, d int) int { + if value, ok := p[name]; ok { + if str, ok := value.(int); ok { + return str + } + } + + return d +} + +// Duration must return option value as time.Duration (seconds) or return default value. +func (p Pipeline) Duration(name string, d time.Duration) time.Duration { + if value, ok := p[name]; ok { + if str, ok := value.(int); ok { + return time.Second * time.Duration(str) + } + } + + return d +} diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go new file mode 100644 index 00000000..f03dcbb8 --- /dev/null +++ b/plugins/jobs/pipeline/pipeline_test.go @@ -0,0 +1,90 @@ +package pipeline + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPipeline_Map(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} + + assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0)) + assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0)) +} + +func TestPipeline_MapString(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}} + + assert.Equal(t, "default", pipe.Map("options").String("alias", "")) + assert.Equal(t, "", pipe.Map("other").String("alias", "")) +} + +func TestPipeline_Bool(t *testing.T) { + pipe := Pipeline{"value": true} + + assert.Equal(t, true, pipe.Bool("value", false)) + assert.Equal(t, true, pipe.Bool("other", true)) +} + +func TestPipeline_String(t *testing.T) { + pipe := Pipeline{"value": "value"} + + assert.Equal(t, "value", pipe.String("value", "")) + assert.Equal(t, "value", pipe.String("other", "value")) +} + +func TestPipeline_Integer(t *testing.T) { + pipe := Pipeline{"value": 1} + + assert.Equal(t, 1, pipe.Integer("value", 0)) + assert.Equal(t, 1, pipe.Integer("other", 1)) +} + +func TestPipeline_Duration(t *testing.T) { + pipe := Pipeline{"value": 1} + + assert.Equal(t, time.Second, pipe.Duration("value", 0)) + assert.Equal(t, time.Second, pipe.Duration("other", time.Second)) +} + +func TestPipeline_Has(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} + + assert.Equal(t, true, pipe.Has("options")) + assert.Equal(t, false, pipe.Has("other")) +} + +func TestPipeline_FilterBroker(t *testing.T) { + pipes := Pipelines{ + &Pipeline{"name": "first", "broker": "a"}, + &Pipeline{"name": "second", "broker": "a"}, + &Pipeline{"name": "third", "broker": "b"}, + &Pipeline{"name": "forth", "broker": "b"}, + } + + filtered := pipes.Names("first", "third") + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "a", filtered[0].Broker()) + assert.Equal(t, "b", filtered[1].Broker()) + + filtered = pipes.Names("first", "third").Reverse() + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "a", filtered[1].Broker()) + assert.Equal(t, "b", filtered[0].Broker()) + + filtered = pipes.Broker("a") + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "first", filtered[0].Name()) + assert.Equal(t, "second", filtered[1].Name()) + + filtered = pipes.Broker("a").Reverse() + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "first", filtered[1].Name()) + assert.Equal(t, "second", filtered[0].Name()) +} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go new file mode 100644 index 00000000..072f872a --- /dev/null +++ b/plugins/jobs/plugin.go @@ -0,0 +1,113 @@ +package jobs + +import ( + "context" + "fmt" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" +) + +const ( + // RrJobs env variable + RrJobs string = "rr_jobs" + PluginName string = "jobs" +) + +type Plugin struct { + cfg *Config + log logger.Logger + + workersPool pool.Pool + + consumers map[string]Consumer + events events.Handler +} + +func testListener(data interface{}) { + fmt.Println(data) +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { + const op = errors.Op("jobs_plugin_init") + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.cfg) + if err != nil { + return errors.E(op, err) + } + + p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) + if err != nil { + return errors.E(op, err) + } + + p.events = events.NewEventsHandler() + p.events.AddListener(testListener) + p.consumers = make(map[string]Consumer) + p.log = log + return nil +} + +func (p *Plugin) Serve() chan error { + errCh := make(chan error, 1) + + return errCh +} + +func (p *Plugin) Stop() error { + return nil +} + +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.CollectMQBrokers, + } +} + +func (p *Plugin) CollectMQBrokers(name endure.Named, c Consumer) { + p.consumers[name.Name()] = c +} + +func (p *Plugin) Available() {} + +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Push(j *structs.Job) (string, error) { + pipe, pOpts, err := p.cfg.MatchPipeline(j) + if err != nil { + panic(err) + } + + if pOpts != nil { + j.Options.Merge(pOpts) + } + + broker, ok := p.consumers[pipe.Broker()] + if !ok { + panic("broker not found") + } + + id, err := broker.Push(pipe, j) + if err != nil { + panic(err) + } + + // p.events.Push() + + return id, nil +} + +func (p *Plugin) RPC() interface{} { + return &rpc{log: p.log} +} diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go new file mode 100644 index 00000000..e77cda59 --- /dev/null +++ b/plugins/jobs/rpc.go @@ -0,0 +1,20 @@ +package jobs + +import ( + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type rpc struct { + log logger.Logger + p *Plugin +} + +func (r *rpc) Push(j *structs.Job, idRet *string) error { + id, err := r.p.Push(j) + if err != nil { + panic(err) + } + *idRet = id + return nil +} diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go new file mode 100644 index 00000000..2e394543 --- /dev/null +++ b/plugins/jobs/structs/job.go @@ -0,0 +1,35 @@ +package structs + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/utils" +) + +// Job carries information about single job. +type Job struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Body packs job payload into binary payload. +func (j *Job) Body() []byte { + return utils.AsBytes(j.Payload) +} + +// Context packs job context (job, id) into binary payload. +func (j *Job) Context(id string) []byte { + ctx, _ := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + }{ID: id, Job: j.Job}, + ) + + return ctx +} diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go new file mode 100644 index 00000000..1507d053 --- /dev/null +++ b/plugins/jobs/structs/job_options.go @@ -0,0 +1,70 @@ +package structs + +import "time" + +// Options carry information about how to handle given job. +type Options struct { + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int `json:"delay,omitempty"` + + // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). + // Minimum valuable value is 2. + Attempts int `json:"maxAttempts,omitempty"` + + // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. + RetryDelay int `json:"retryDelay,omitempty"` + + // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout int `json:"timeout,omitempty"` +} + +// Merge merges job options. +func (o *Options) Merge(from *Options) { + if o.Pipeline == "" { + o.Pipeline = from.Pipeline + } + + if o.Attempts == 0 { + o.Attempts = from.Attempts + } + + if o.Timeout == 0 { + o.Timeout = from.Timeout + } + + if o.RetryDelay == 0 { + o.RetryDelay = from.RetryDelay + } + + if o.Delay == 0 { + o.Delay = from.Delay + } +} + +// CanRetry must return true if broker is allowed to re-run the job. +func (o *Options) CanRetry(attempt int) bool { + // Attempts 1 and 0 has identical effect + return o.Attempts > (attempt + 1) +} + +// RetryDuration returns retry delay duration in a form of time.Duration. +func (o *Options) RetryDuration() time.Duration { + return time.Second * time.Duration(o.RetryDelay) +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. +func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} diff --git a/plugins/jobs/structs/job_options_test.go b/plugins/jobs/structs/job_options_test.go new file mode 100644 index 00000000..18702394 --- /dev/null +++ b/plugins/jobs/structs/job_options_test.go @@ -0,0 +1,110 @@ +package structs + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestOptions_CanRetry(t *testing.T) { + opts := &Options{Attempts: 0} + + assert.False(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) +} + +func TestOptions_CanRetry_SameValue(t *testing.T) { + opts := &Options{Attempts: 1} + + assert.False(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) +} + +func TestOptions_CanRetry_Value(t *testing.T) { + opts := &Options{Attempts: 2} + + assert.True(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) + assert.False(t, opts.CanRetry(2)) +} + +func TestOptions_CanRetry_Value3(t *testing.T) { + opts := &Options{Attempts: 3} + + assert.True(t, opts.CanRetry(0)) + assert.True(t, opts.CanRetry(1)) + assert.False(t, opts.CanRetry(2)) +} + +func TestOptions_RetryDuration(t *testing.T) { + opts := &Options{RetryDelay: 0} + assert.Equal(t, time.Duration(0), opts.RetryDuration()) +} + +func TestOptions_RetryDuration2(t *testing.T) { + opts := &Options{RetryDelay: 1} + assert.Equal(t, time.Second, opts.RetryDuration()) +} + +func TestOptions_DelayDuration(t *testing.T) { + opts := &Options{Delay: 0} + assert.Equal(t, time.Duration(0), opts.DelayDuration()) +} + +func TestOptions_DelayDuration2(t *testing.T) { + opts := &Options{Delay: 1} + assert.Equal(t, time.Second, opts.DelayDuration()) +} + +func TestOptions_TimeoutDuration(t *testing.T) { + opts := &Options{Timeout: 0} + assert.Equal(t, time.Minute*30, opts.TimeoutDuration()) +} + +func TestOptions_TimeoutDuration2(t *testing.T) { + opts := &Options{Timeout: 1} + assert.Equal(t, time.Second, opts.TimeoutDuration()) +} + +func TestOptions_Merge(t *testing.T) { + opts := &Options{} + + opts.Merge(&Options{ + Pipeline: "pipeline", + Delay: 2, + Timeout: 1, + Attempts: 1, + RetryDelay: 1, + }) + + assert.Equal(t, "pipeline", opts.Pipeline) + assert.Equal(t, 1, opts.Attempts) + assert.Equal(t, 2, opts.Delay) + assert.Equal(t, 1, opts.Timeout) + assert.Equal(t, 1, opts.RetryDelay) +} + +func TestOptions_MergeKeepOriginal(t *testing.T) { + opts := &Options{ + Pipeline: "default", + Delay: 10, + Timeout: 10, + Attempts: 10, + RetryDelay: 10, + } + + opts.Merge(&Options{ + Pipeline: "pipeline", + Delay: 2, + Timeout: 1, + Attempts: 1, + RetryDelay: 1, + }) + + assert.Equal(t, "default", opts.Pipeline) + assert.Equal(t, 10, opts.Attempts) + assert.Equal(t, 10, opts.Delay) + assert.Equal(t, 10, opts.Timeout) + assert.Equal(t, 10, opts.RetryDelay) +} diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go new file mode 100644 index 00000000..e7240c6b --- /dev/null +++ b/plugins/jobs/structs/job_test.go @@ -0,0 +1,19 @@ +package structs + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestJob_Body(t *testing.T) { + j := &Job{Payload: "hello"} + + assert.Equal(t, []byte("hello"), j.Body()) +} + +func TestJob_Context(t *testing.T) { + j := &Job{Job: "job"} + + assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id")) +} diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index 4b675271..e5aac290 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -9,8 +9,8 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 6ae1a1f6..c839130f 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -2,12 +2,15 @@ package boltdb import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) -const PluginName = "boltdb" +const ( + PluginName string = "boltdb" + RootPluginName string = "kv" +) // Plugin BoltDB K/V storage. type Plugin struct { @@ -21,7 +24,7 @@ type Plugin struct { } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(kv.PluginName) { + if !cfg.Has(RootPluginName) { return errors.E(errors.Disabled) } diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index a2787d72..520ec7d5 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -6,8 +6,8 @@ import ( "github.com/bradfitz/gomemcache/memcache" "github.com/spiral/errors" + kv "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index 22ea5cca..59a2b7cb 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -2,12 +2,15 @@ package memcached import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) -const PluginName = "memcached" +const ( + PluginName string = "memcached" + RootPluginName string = "kv" +) type Plugin struct { // config plugin @@ -17,7 +20,7 @@ type Plugin struct { } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(kv.PluginName) { + if !cfg.Has(RootPluginName) { return errors.E(errors.Disabled) } diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go deleted file mode 100644 index ffdbbe62..00000000 --- a/plugins/kv/interface.go +++ /dev/null @@ -1,36 +0,0 @@ -package kv - -import kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" - -// Storage represents single abstract storage. -type Storage interface { - // Has checks if value exists. - Has(keys ...string) (map[string]bool, error) - - // Get loads value content into a byte slice. - Get(key string) ([]byte, error) - - // MGet loads content of multiple values - // Returns the map with existing keys and associated values - MGet(keys ...string) (map[string][]byte, error) - - // Set used to upload item to KV with TTL - // 0 value in TTL means no TTL - Set(items ...*kvv1.Item) error - - // MExpire sets the TTL for multiply keys - MExpire(items ...*kvv1.Item) error - - // TTL return the rest time to live for provided keys - // Not supported for the memcached and boltdb - TTL(keys ...string) (map[string]string, error) - - // Delete one or multiple keys. - Delete(keys ...string) error -} - -// Constructor provides storage based on the config -type Constructor interface { - // KVConstruct provides Storage based on the config key - KVConstruct(key string) (Storage, error) -} diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index 03dbaed6..e9ea25df 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -5,10 +5,12 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) +// PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync. const PluginName string = "kv" const ( @@ -25,9 +27,9 @@ const ( type Plugin struct { log logger.Logger // constructors contains general storage constructors, such as boltdb, memory, memcached, redis. - constructors map[string]Constructor + constructors map[string]kv.Constructor // storages contains user-defined storages, such as boltdb-north, memcached-us and so on. - storages map[string]Storage + storages map[string]kv.Storage // KV configuration cfg Config cfgPlugin config.Configurer @@ -43,8 +45,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if err != nil { return errors.E(op, err) } - p.constructors = make(map[string]Constructor, 5) - p.storages = make(map[string]Storage, 5) + p.constructors = make(map[string]kv.Constructor, 5) + p.storages = make(map[string]kv.Storage, 5) p.log = log p.cfgPlugin = cfg return nil @@ -203,7 +205,7 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) { +func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor kv.Constructor) { // save the storage constructor p.constructors[name.Name()] = constructor } diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index af763600..b9b302fe 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,6 +2,7 @@ package kv import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) @@ -9,7 +10,7 @@ import ( // Wrapper for the plugin type rpc struct { // all available storages - storages map[string]Storage + storages map[string]kv.Storage // svc is a plugin implementing Storage interface srv *Plugin // Logger diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go index 1cf031d1..1906e4fd 100644 --- a/plugins/memory/kv.go +++ b/plugins/memory/kv.go @@ -6,8 +6,8 @@ import ( "time" "github.com/spiral/errors" + kv "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 70badf15..7d418a70 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -2,9 +2,9 @@ package memory import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/kv" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -21,7 +21,6 @@ type Plugin struct { func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log - p.log = log p.cfgPlugin = cfg p.stop = make(chan struct{}, 1) return nil diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go index d027a8a5..3c909900 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/pubsub.go @@ -3,8 +3,8 @@ package memory import ( "sync" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/bst" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go index 5817853c..0cd62d19 100644 --- a/plugins/redis/channel.go +++ b/plugins/redis/channel.go @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go index 320b7443..378d7630 100644 --- a/plugins/redis/kv.go +++ b/plugins/redis/kv.go @@ -7,8 +7,8 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 9d98790b..3c62a63f 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -5,9 +5,9 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/kv" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index 4e41acb5..8bd78514 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 00639f43..e2fa0086 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -124,7 +124,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event const op = errors.Op("server_plugin_new_worker") list := make([]events.Listener, 0, len(listeners)) - list = append(list, server.collectWorkerLogs) + list = append(list, server.collectWorkerEvents) spawnCmd, err := server.CmdFactory(env) if err != nil { @@ -147,8 +147,8 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env En return nil, errors.E(op, err) } - list := make([]events.Listener, 0, 1) - list = append(list, server.collectEvents) + list := make([]events.Listener, 0, 22) + list = append(list, server.collectPoolEvents, server.collectWorkerEvents) if len(listeners) != 0 { list = append(list, listeners...) } @@ -209,7 +209,7 @@ func (server *Plugin) setEnv(e Env) []string { return env } -func (server *Plugin) collectEvents(event interface{}) { +func (server *Plugin) collectPoolEvents(event interface{}) { if we, ok := event.(events.PoolEvent); ok { switch we.Event { case events.EventMaxMemory: @@ -238,7 +238,9 @@ func (server *Plugin) collectEvents(event interface{}) { server.log.Warn("requested pool restart") } } +} +func (server *Plugin) collectWorkerEvents(event interface{}) { if we, ok := event.(events.WorkerEvent); ok { switch we.Event { case events.EventWorkerError: @@ -264,16 +266,13 @@ func (server *Plugin) collectEvents(event interface{}) { } } -func (server *Plugin) collectWorkerLogs(event interface{}) { - if we, ok := event.(events.WorkerEvent); ok { - switch we.Event { - case events.EventWorkerError: - server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t")) - case events.EventWorkerLog: - server.log.Debug(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t")) - // stderr event is INFO level - case events.EventWorkerStderr: - server.log.Info(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t")) +func (server *Plugin) collectJobsEvents(event interface{}) { //nolint:unused + if jev, ok := event.(events.JobEvent); ok { + switch jev.Event { + case events.EventJobStart: + server.log.Info("Job started", "start", jev.Start, "elapsed", jev.Elapsed) + case events.EventJobOK: + server.log.Info("Job OK", "start", jev.Start, "elapsed", jev.Elapsed) } } } diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 664b4dfd..c1f79a78 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -7,7 +7,7 @@ import ( json "github.com/json-iterator/go" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index ca5f2f59..c9a31613 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -10,10 +10,10 @@ import ( "github.com/google/uuid" json "github.com/json-iterator/go" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/config" diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 752ba3ce..758620f6 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,7 +4,7 @@ import ( "sync" json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/utils" |