83 files changed, 1533 insertions, 220 deletions
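The most widespread mechanical change in this diff is that the worker pool configuration is now passed by pointer: pool.Initialize and server.NewWorkerPool accept *pool.Config instead of pool.Config. Below is a minimal sketch of an updated call site, assuming the pipe transport factory import path and a hypothetical worker.php script (neither of which is defined by this diff):

package main

import (
	"context"
	"os/exec"
	"time"

	"github.com/spiral/roadrunner/v2/pkg/pool"
	"github.com/spiral/roadrunner/v2/pkg/transport/pipe" // assumed import path for the pipe factory
)

func main() {
	// Config is now passed as &pool.Config{...}, matching the new
	// Initialize(ctx, cmd, factory, *Config, ...Options) signature.
	p, err := pool.Initialize(
		context.Background(),
		// hypothetical worker command; the tests in this diff use ../../tests/client.php
		func() *exec.Cmd { return exec.Command("php", "worker.php") },
		pipe.NewPipeFactory(),
		&pool.Config{
			NumWorkers:      1,
			AllocateTimeout: time.Second,
			DestroyTimeout:  time.Second,
		},
	)
	if err != nil {
		panic(err)
	}
	defer p.Destroy(context.Background())
}

The same &pool.Config{...} literal replaces the former value literal in the http, websockets and test call sites further down.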
diff --git a/.dockerignore b/.dockerignore index bfa82a3d..b817b3c8 100644 --- a/.dockerignore +++ b/.dockerignore @@ -7,4 +7,4 @@ /tests /bin composer.json -vendor_php
\ No newline at end of file +vendor_php diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index ae0b283a..49aeb3c8 100755 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -43,4 +43,4 @@ Project maintainers who do not follow or enforce the Code of Conduct in good fai This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [http://contributor-covenant.org/version/1/4][version] [homepage]: http://contributor-covenant.org -[version]: http://contributor-covenant.org/version/1/4/ +[version]: https://www.contributor-covenant.org/version/2/0/code_of_conduct/ diff --git a/common/doc.go b/common/doc.go new file mode 100644 index 00000000..adc03351 --- /dev/null +++ b/common/doc.go @@ -0,0 +1,9 @@ +/* +Package common contains common interfaces/structures which might be implemented (or imported) by different plugins. +For example, the 'pubsub' interface might be implemented by memory, redis, websockets and many other plugins. + +Folders: +- kv - contains KV interfaces and structures +- pubsub - contains pub-sub interfaces and structures +*/ +package common diff --git a/plugins/kv/interface.go b/common/kv/interface.go index 5736a6a7..5736a6a7 100644 --- a/plugins/kv/interface.go +++ b/common/kv/interface.go diff --git a/pkg/pubsub/interface.go b/common/pubsub/interface.go index 06252d70..06252d70 100644 --- a/pkg/pubsub/interface.go +++ b/common/pubsub/interface.go diff --git a/pkg/pubsub/psmessage.go b/common/pubsub/psmessage.go index e33d9284..e33d9284 100644 --- a/pkg/pubsub/psmessage.go +++ b/common/pubsub/psmessage.go diff --git a/pkg/events/general.go b/pkg/events/general.go index a09a8759..5cf13e10 100755 --- a/pkg/events/general.go +++ b/pkg/events/general.go @@ -4,6 +4,8 @@ import ( "sync" ) +const UnknownEventType string = "Unknown event type" + // HandlerImpl helps to broadcast events to multiple listeners. type HandlerImpl struct { listeners []Listener diff --git a/pkg/events/interface.go b/pkg/events/interface.go index ac6c15a4..7d57e4d0 100644 --- a/pkg/events/interface.go +++ b/pkg/events/interface.go @@ -2,7 +2,7 @@ package events // Handler interface type Handler interface { - // Return number of active listeners + // NumListeners returns the number of active listeners NumListeners() int // AddListener adds listener to the publisher AddListener(listener Listener) @@ -10,5 +10,5 @@ type Handler interface { Push(e interface{}) } -// Event listener listens for the events produced by worker, worker pool or other service. +// Listener is a type alias for an event listener that listens for the events produced by worker, worker pool or other service. type Listener func(event interface{}) diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go new file mode 100644 index 00000000..ed07c7da --- /dev/null +++ b/pkg/events/jobs_events.go @@ -0,0 +1,84 @@ +package events + +import ( + "time" +) + +const ( + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK = iota + 12000 + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventPipeConsume when pipeline consuming has been requested. + EventPipeConsume + + // EventPipeActive when pipeline has started.
+ EventPipeActive + + // EventPipeStop when pipeline has begun stopping. + EventPipeStop + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipeError when a pipeline specific error happens. + EventPipeError + + // EventBrokerReady thrown when the broker is ready to accept/serve tasks. + EventBrokerReady +) + +type J int64 + +func (ev J) String() string { + switch ev { + case EventPushOK: + return "EventPushOK" + case EventPushError: + return "EventPushError" + case EventJobStart: + return "EventJobStart" + case EventJobOK: + return "EventJobOK" + case EventJobError: + return "EventJobError" + case EventPipeConsume: + return "EventPipeConsume" + case EventPipeActive: + return "EventPipeActive" + case EventPipeStop: + return "EventPipeStop" + case EventPipeStopped: + return "EventPipeStopped" + case EventPipeError: + return "EventPipeError" + case EventBrokerReady: + return "EventBrokerReady" + } + return UnknownEventType +} + +// JobEvent represents a job event. +type JobEvent struct { + Event J + // ID is the job id. + ID string + + // Job is failed job. + Job interface{} // this is *jobs.Job, but interface used to avoid package import + + // event timings + Start time.Time + Elapsed time.Duration +} diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go index e7b451e0..4d4cae5d 100644 --- a/pkg/events/pool_events.go +++ b/pkg/events/pool_events.go @@ -57,7 +57,7 @@ func (ev P) String() string { case EventPoolRestart: return "EventPoolRestart" } - return "Unknown event type" + return UnknownEventType } // PoolEvent triggered by pool on different events. Pool can also trigger WorkerEvent in case of log. diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go index 11bd6ab7..39c38e57 100644 --- a/pkg/events/worker_events.go +++ b/pkg/events/worker_events.go @@ -20,7 +20,7 @@ func (ev W) String() string { case EventWorkerStderr: return "EventWorkerStderr" } - return "Unknown event type" + return UnknownEventType } // WorkerEvent wraps worker events. diff --git a/pkg/pool/config.go b/pkg/pool/config.go index 2a3dabe4..3a058956 100644 --- a/pkg/pool/config.go +++ b/pkg/pool/config.go @@ -5,7 +5,7 @@ import ( "time" ) -// Configures the pool behavior. +// Config configures the pool behavior. type Config struct { // Debug flag creates new fresh worker before every request. Debug bool diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index ab025fa1..74e06b81 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -26,7 +26,7 @@ type Command func() *exec.Cmd // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. type StaticPool struct { - cfg Config + cfg *Config // worker command creator cmd Command @@ -51,7 +51,7 @@ type StaticPool struct { } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) { +func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) { const op = errors.Op("static_pool_initialize") if factory == nil { return nil, errors.E(op, errors.Str("no factory initialized")) diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 6f875072..f264c6dc 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -20,7 +20,7 @@ import ( "github.com/stretchr/testify/assert" ) -var cfg = Config{ +var cfg = &Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 5, DestroyTimeout: time.Second * 5, @@ -58,7 +58,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, @@ -214,7 +214,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { } } - var cfg2 = Config{ + var cfg2 = &Config{ NumWorkers: 1, AllocateTimeout: time.Second * 5, DestroyTimeout: time.Second * 5, @@ -264,7 +264,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, DestroyTimeout: time.Second * 2, @@ -283,7 +283,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, @@ -320,7 +320,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ Debug: true, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -360,7 +360,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -400,7 +400,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -422,7 +422,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -452,7 +452,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 5, AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, @@ -476,7 +476,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + 
&Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -506,7 +506,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { // sleep for the 3 seconds func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ Debug: false, NumWorkers: 1, AllocateTimeout: time.Second, @@ -539,7 +539,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -556,7 +556,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -595,7 +595,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, @@ -626,7 +626,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - Config{ + &Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index dc307c33..348622c7 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/assert" ) -var cfgSupervised = Config{ +var cfgSupervised = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -82,7 +82,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { } func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -123,7 +123,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { } func TestSupervisedPool_Idle(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -171,7 +171,7 @@ func TestSupervisedPool_Idle(t *testing.T) { } func TestSupervisedPool_ExecTTL_OK(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -213,7 +213,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { } func TestSupervisedPool_MaxMemoryReached(t *testing.T) { - var cfgExecTTL = Config{ + var cfgExecTTL = &Config{ NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go new file mode 100644 index 00000000..c660ddb6 --- /dev/null +++ b/pkg/priority_queue/binary_heap.go @@ -0,0 +1,12 @@ +/* +binary heap (min-heap) algorithm used as a core for the priority queue +*/ + +package priorityqueue + +type BinHeap struct { +} + +func NewBinHeap() *BinHeap { + return &BinHeap{} +} diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go new file mode 100644 index 00000000..d1c3229f --- /dev/null +++ b/pkg/priority_queue/interface.go @@ -0,0 +1,7 @@ 
+package priorityqueue + +type Queue interface { + Push() + Pop() + BLPop() +} diff --git a/pkg/priority_queue/queue.go b/pkg/priority_queue/queue.go new file mode 100644 index 00000000..88d18acb --- /dev/null +++ b/pkg/priority_queue/queue.go @@ -0,0 +1,21 @@ +package priorityqueue + +type QueueImpl struct { +} + +func NewPriorityQueue() *QueueImpl { + return nil +} + +// Push the task +func (q *QueueImpl) Push() { + +} + +func (q *QueueImpl) Pop() { + +} + +func (q *QueueImpl) BLPop() { + +} diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go index 46709d71..eda3572f 100644 --- a/plugins/broadcast/interface.go +++ b/plugins/broadcast/interface.go @@ -1,6 +1,6 @@ package broadcast -import "github.com/spiral/roadrunner/v2/pkg/pubsub" +import "github.com/spiral/roadrunner/v2/common/pubsub" type Broadcaster interface { GetDriver(key string) (pubsub.SubReader, error) diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 6ddef806..889dc2fa 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -7,7 +7,7 @@ import ( "github.com/google/uuid" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go index 2ee211f8..475076a0 100644 --- a/plugins/broadcast/rpc.go +++ b/plugins/broadcast/rpc.go @@ -2,7 +2,7 @@ package broadcast import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" ) diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index bec01ac3..fb174792 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -143,7 +143,7 @@ func (p *Plugin) Serve() chan error { func (p *Plugin) serve(errCh chan error) { var err error const op = errors.Op("http_plugin_serve") - p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{ + p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, MaxJobs: p.cfg.Pool.MaxJobs, @@ -323,7 +323,7 @@ func (p *Plugin) Reset() error { p.pool = nil var err error - p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{ + p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, MaxJobs: p.cfg.Pool.MaxJobs, diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go new file mode 100644 index 00000000..905f5409 --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -0,0 +1,30 @@ +package ephemeral + +import ( + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +type JobBroker struct { +} + +func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { + return &JobBroker{}, nil +} + +func (j *JobBroker) Push(pipeline *pipeline.Pipeline, job *structs.Job) (string, error) { + panic("implement me") +} + +func (j *JobBroker) Stat() { + panic("implement me") +} + +func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) { + panic("implement me") +} + +func (j 
*JobBroker) Register(pipeline *pipeline.Pipeline) { + panic("implement me") +} diff --git a/plugins/jobs/brokers/ephemeral/config.go b/plugins/jobs/brokers/ephemeral/config.go new file mode 100644 index 00000000..847b63ea --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/config.go @@ -0,0 +1 @@ +package ephemeral diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go new file mode 100644 index 00000000..84cc871b --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -0,0 +1,28 @@ +package ephemeral + +import ( + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "ephemeral" +) + +type Plugin struct { + log logger.Logger +} + +func (p *Plugin) Init(log logger.Logger) error { + p.log = log + return nil +} + +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) InitJobBroker(q priorityqueue.Queue) (jobs.Consumer, error) { + return NewJobBroker(q) +} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go new file mode 100644 index 00000000..bb042ec9 --- /dev/null +++ b/plugins/jobs/config.go @@ -0,0 +1,71 @@ +package jobs + +import ( + "github.com/spiral/errors" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/jobs/dispatcher" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +// Config defines settings for job broker, workers and job-pipeline mapping. +type Config struct { + // poolCfg configures the roadrunner server and worker pool. + // Workers *roadrunner.ServerConfig + poolCfg *poolImpl.Config + + // Dispatch defines where and how to match jobs. + Dispatch map[string]*structs.Options + + // Pipelines defines mapping between PHP job pipeline and associated job broker. + Pipelines map[string]*pipeline.Pipeline + + // Consuming specifies names of pipelines to be consumed on service start. + Consume []string + + // parent config for broker options. + pipelines pipeline.Pipelines + route dispatcher.Dispatcher +} + +func (c *Config) InitDefaults() error { + const op = errors.Op("config_init_defaults") + var err error + c.pipelines, err = pipeline.InitPipelines(c.Pipelines) + if err != nil { + return errors.E(op, err) + } + + if c.poolCfg == nil { + c.poolCfg = &poolImpl.Config{} + } + + c.poolCfg.InitDefaults() + + return nil +} + +// MatchPipeline locates the pipeline associated with the job.
+func (c *Config) MatchPipeline(job *structs.Job) (*pipeline.Pipeline, *structs.Options, error) { + const op = errors.Op("config_match_pipeline") + opt := c.route.Match(job) + + pipe := "" + if job.Options != nil { + pipe = job.Options.Pipeline + } + + if pipe == "" && opt != nil { + pipe = opt.Pipeline + } + + if pipe == "" { + return nil, nil, errors.E(op, errors.Errorf("unable to locate pipeline for `%s`", job.Job)) + } + + if p := c.pipelines.Get(pipe); p != nil { + return p, opt, nil + } + + return nil, nil, errors.E(op, errors.Errorf("undefined pipeline `%s`", pipe)) +} diff --git a/plugins/jobs/dispatcher/dispatcher.go b/plugins/jobs/dispatcher/dispatcher.go new file mode 100644 index 00000000..e73e7b74 --- /dev/null +++ b/plugins/jobs/dispatcher/dispatcher.go @@ -0,0 +1,49 @@ +package dispatcher + +import ( + "strings" + + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +var separators = []string{"/", "-", "\\"} + +// Dispatcher provides ability to automatically locate the pipeline for the specific job +// and update job options (if none set). +type Dispatcher map[string]*structs.Options + +// pre-compile patterns +func initDispatcher(routes map[string]*structs.Options) Dispatcher { + dispatcher := make(Dispatcher) + for pattern, opts := range routes { + pattern = strings.ToLower(pattern) + pattern = strings.Trim(pattern, "-.*") + + for _, s := range separators { + pattern = strings.ReplaceAll(pattern, s, ".") + } + + dispatcher[pattern] = opts + } + + return dispatcher +} + +// Match clarifies target job pipeline and other job options. Can return nil. +func (dispatcher Dispatcher) Match(job *structs.Job) (found *structs.Options) { + var best = 0 + + jobName := strings.ToLower(job.Job) + for pattern, opts := range dispatcher { + if strings.HasPrefix(jobName, pattern) && len(pattern) > best { + found = opts + best = len(pattern) + } + } + + if best == 0 { + return nil + } + + return found +} diff --git a/plugins/jobs/dispatcher/dispatcher_test.go b/plugins/jobs/dispatcher/dispatcher_test.go new file mode 100644 index 00000000..e584bda8 --- /dev/null +++ b/plugins/jobs/dispatcher/dispatcher_test.go @@ -0,0 +1,55 @@ +package dispatcher + +import ( + "testing" + + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/stretchr/testify/assert" +) + +func Test_Map_All(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{"default": {Pipeline: "default"}}) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "default"}).Pipeline) +} + +func Test_Map_Miss(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{"some.*": {Pipeline: "default"}}) + + assert.Nil(t, m.Match(&structs.Job{Job: "miss"})) +} + +func Test_Map_Best(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{ + "some.*": {Pipeline: "default"}, + "some.other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline) +} + +func Test_Map_BestUpper(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{ + "some.*": {Pipeline: "default"}, + "some.Other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "other", 
m.Match(&structs.Job{Job: "some.OTHER"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "Some.other.job"}).Pipeline) +} + +func Test_Map_BestReversed(t *testing.T) { + m := initDispatcher(map[string]*structs.Options{ + "some.*": {Pipeline: "default"}, + "some.other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline) + assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline) +} diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio new file mode 100644 index 00000000..d452d345 --- /dev/null +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-06-23T11:05:51.495Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="_SQi43bjgSX-nT5V1Ksg" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7VvrU9s4EP9rMkOZIeO3nY+E8Givvebgblrum2IriVvbcmUlJPz1t7Llp5yQgpNAr1BKtHpYu/vbh1amp1+Eq2uK4vkn4uGgpyneqqePepqmDiwFfnHKWlBUS88oM+p7glYS7vxHLIhi4mzhezipDWSEBMyP60SXRBF2WY2GKCUP9WFTEtSfGqMZlgh3Lgpk6hffY/N8d/BV9txgfzZnUleI8vGCkMyRRx4qJP2yp19QQlj2KVxd4IALMBdNNu9qQ2+xN4ojtsuEr1G0ulFJiJePLLKv/8L/Ku6ZZmTLLFGwEEyL3bJ1LgVKFpGH+SpKTx8+zH2G72Lk8t4HUDzQ5iwMoKXCx6kfBBckIDSdq3sIO1MX6Amj5Duu9FiugydT6JH5EKwtMWV4VSEJvq4xMMHoGoaIXt0RMhZAG9g5gh5KtWn5oHlVYbl2kIDKrFi8FCV8ENL8GclKcsQegEs0CWVzMiMRCi5L6rAu6XLMR0JiId9vmLG1sBS0YKQufRAhXX/l8/tm3ryv9o1WYvGstRatjTpIyIK6eBubgk+G6AyzLQPFglwIW1VKcYCYv6xbYJt2xNQx8WHPBRQMw6xDwbQbGs52KqZV7UVaqQEqc9BYKROOtNI5pWhdGRbzAYkEqILn52NMPwrGVj6rQAxa9zmm4HMJMN5YV9F2UFyqO8LSOBQsG2DSrd1g2RVUcp9a+vnbMayknMHP+J+7G1g0+wUbWkdut0HAxI5ntAUBR5voltVRELAbIjY0OQioRksQ0PcVAwZHtc+qdRa2+nP2qezNPo1dw4ZyGAPVmugpcopDGehvZ/5ysFgH8ub6oO8MLNNUbc1QdXtQh47WTCr3DB05h/+EmDvnnv39+PLj+z8vO3XnWAWHbre584Fl66grd97M6fU2d661uPO9pfT2MS1Urblz+7X5c3NHE+3cQl+kUefN+Vy1ps5Su0dTaOfnuhcp1JRcYU+zAtj/MK7p2fqx4HWXYQjs+VFPP4deJV7B/6nQlIx+xrhKeZ9R6QMHxs5Q4M/EPBdki2m928MuocAvEWM4ZGjgR7h8NHyaid/pBic54QOZgFjO7xhduGxBcT4ABDJpTgJa3KTNaZPyLN4DPGVZp8M7m9uGXaYzRuna1I9mBSPF88doHRDkPTnuc8wllRTjinYbkw2D5RHFd1FwnilklKpsKNQzypgYEhg1DdLqGw9mPKaRiAkLVTXRvkKhH3DLucHBEvNVt8bBqcm/W2tb6ZdYtULPvrqJj5baLE8oLfHRaomPzTJGZ8Zn/QLGl0Pv1dvf2I9xxtMTxjUCx7t+ctQ5YziMWWmDaa66mcl9sHTLUVjfbmUXE0msh9rX336IyYI9Z1O/nVXa60i11F2dlaruy1upqqSeo6XzO2fz2qHKp8qO6V/n9dOX6VSRQtAtWC7upwfik29k8q7To/DUcbHber01cUzD3KqD3a1HNZtH4cGOR+G9VTZVTRI0xRAso2r21hA0j811aeaOTwRxfdj0j6HveZnd4cR/RJN0KS5VcacC65rDnjnia4GpJcJHSgqJSIS70YUmlSUcWReDQ1YlVLn8E0Nu0NN4eZ/8yrowmnahWbIu2m5996cL+fxZydPOIC9AIfcoIjuA/CMGoYmczBz5XPRT7nvsYc8e/V+01mJB6mFN6KiVvddde89r6k8XgpyuU4FnFd/VZszb972NfOoNMTyRi7s0/U1++A3W4dXjJx/HvVd93ebq7GiuucaObK7Kge/KcvlUzPUa8+0OuQ3xQtKUkpALcI5T5md+ApEV8yLmJB3CSyRLHwn77mfzTjo+V0ynWvu5wrMmlrkv01baInGbaTdfXunurTn5ADdeJPOTMqPNznAKWCc8vqK3DqV/nJcWncGu0jf2Jn35LaEN4v01MlGriX9D1oB5yERUk4/VkujJgvGk4qJ451mRa3fwfcUfO5xR5Pm47BPCawC+GN4qaQ8l88KScq1+RBMcjEFLWZl9NCGMgd+U1V4vcRboqFpfS3Uzj+X8kSiJM0an/orvIwMMppdLnOEmxcgcxXxCuJrxl9H76CEx+oskfVYXUFHUOlRsXYaK7chQyWndQ0WXPeXNGAhfPt/+cXkrwSYXUEyJi5Pkaac4Qe73WepGP2eI2xCqDlICM/WGqdqG0zd3dJcdVBsfran3oDtXxvfw/P56sRrbg5szWQOHu+9qucjKSguB3CrLCo36Q6O14UajuPQCl5MswjRJOslyoXdHuhtrFZV0mbNNjukoSYp3DLGT07xkw/OMk1NIOJI+78jyD0wpqfHdzRXUvjhKs6fTsgjFWTj9kOVQJ/ll4BvkS2Cxpqw3svVbcaaoA62nX4l/oBjwogHEPH7USEiwxMkG7jb4eJrGy6ozl+8U27x/B45aLVzwtsuK4l3tmp/uIq9tddS/6wUbRdPym
lfruM6Ley9SaEuZPf3jhZccArsAv9UEv9F2plNawN9FsWyb4iqi+tHnIY3nEnMEZ4jgFYit+S7TPi84oVn+0WVWVir/fFW//A8=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go new file mode 100644 index 00000000..bb0e8c50 --- /dev/null +++ b/plugins/jobs/interface.go @@ -0,0 +1,19 @@ +package jobs + +import ( + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" +) + +// Consumer todo naming +type Consumer interface { + Push(*pipeline.Pipeline, *structs.Job) (string, error) + Stat() + Consume(*pipeline.Pipeline) + Register(*pipeline.Pipeline) +} + +type Broker interface { + InitJobBroker(queue priorityqueue.Queue) (Consumer, error) +} diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go new file mode 100644 index 00000000..f27f6ede --- /dev/null +++ b/plugins/jobs/pipeline/pipeline.go @@ -0,0 +1,172 @@ +package pipeline + +import ( + "time" + + "github.com/spiral/errors" +) + +// Pipelines is a list of Pipeline. + +type Pipelines []*Pipeline + +func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) { + const op = errors.Op("pipeline_init") + out := make(Pipelines, 0) + + for name, pipe := range pipes { + if pipe.Broker() == "" { + return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker")) + } + + p := pipe.With("name", name) + out = append(out, &p) + } + + return out, nil +} + +// Reverse returns pipelines in reversed order. +func (ps Pipelines) Reverse() Pipelines { + out := make(Pipelines, len(ps)) + + for i, p := range ps { + out[len(ps)-i-1] = p + } + + return out +} + +// Broker returns pipelines associated with a specific broker. +func (ps Pipelines) Broker(broker string) Pipelines { + out := make(Pipelines, 0) + + for _, p := range ps { + if p.Broker() != broker { + continue + } + + out = append(out, p) + } + + return out +} + +// Names returns only pipelines with specified names. +func (ps Pipelines) Names(only ...string) Pipelines { + out := make(Pipelines, 0) + + for _, name := range only { + for _, p := range ps { + if p.Name() == name { + out = append(out, p) + } + } + } + + return out +} + +// Get returns pipeline by its name. +func (ps Pipelines) Get(name string) *Pipeline { + // possibly optimize + for _, p := range ps { + if p.Name() == name { + return p + } + } + + return nil +} + +// Pipeline defines pipeline options. +type Pipeline map[string]interface{} + +// With pipeline value. Immutable. +func (p Pipeline) With(name string, value interface{}) Pipeline { + out := make(map[string]interface{}) + for k, v := range p { + out[k] = v + } + out[name] = value + + return out +} + +// Name returns pipeline name. +func (p Pipeline) Name() string { + return p.String("name", "") } + +// Broker associated with the pipeline. +func (p Pipeline) Broker() string { + return p.String("broker", "") +} + +// Has checks if a value is present in the pipeline. +func (p Pipeline) Has(name string) bool { + if _, ok := p[name]; ok { + return true + } + + return false +} + +// Map must return nested map value or empty config. +func (p Pipeline) Map(name string) Pipeline { + out := make(map[string]interface{}) + + if value, ok := p[name]; ok { + if m, ok := value.(map[string]interface{}); ok { + for k, v := range m { + out[k] = v + } + } + } + + return out +} + +// Bool must return option value as bool or return default value.
+func (p Pipeline) Bool(name string, d bool) bool { + if value, ok := p[name]; ok { + if b, ok := value.(bool); ok { + return b + } + } + + return d +} + +// String must return option value as string or return default value. +func (p Pipeline) String(name string, d string) string { + if value, ok := p[name]; ok { + if str, ok := value.(string); ok { + return str + } + } + + return d +} + +// Integer must return option value as int or return default value. +func (p Pipeline) Integer(name string, d int) int { + if value, ok := p[name]; ok { + if str, ok := value.(int); ok { + return str + } + } + + return d +} + +// Duration must return option value as time.Duration (seconds) or return default value. +func (p Pipeline) Duration(name string, d time.Duration) time.Duration { + if value, ok := p[name]; ok { + if str, ok := value.(int); ok { + return time.Second * time.Duration(str) + } + } + + return d +} diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go new file mode 100644 index 00000000..f03dcbb8 --- /dev/null +++ b/plugins/jobs/pipeline/pipeline_test.go @@ -0,0 +1,90 @@ +package pipeline + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPipeline_Map(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} + + assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0)) + assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0)) +} + +func TestPipeline_MapString(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}} + + assert.Equal(t, "default", pipe.Map("options").String("alias", "")) + assert.Equal(t, "", pipe.Map("other").String("alias", "")) +} + +func TestPipeline_Bool(t *testing.T) { + pipe := Pipeline{"value": true} + + assert.Equal(t, true, pipe.Bool("value", false)) + assert.Equal(t, true, pipe.Bool("other", true)) +} + +func TestPipeline_String(t *testing.T) { + pipe := Pipeline{"value": "value"} + + assert.Equal(t, "value", pipe.String("value", "")) + assert.Equal(t, "value", pipe.String("other", "value")) +} + +func TestPipeline_Integer(t *testing.T) { + pipe := Pipeline{"value": 1} + + assert.Equal(t, 1, pipe.Integer("value", 0)) + assert.Equal(t, 1, pipe.Integer("other", 1)) +} + +func TestPipeline_Duration(t *testing.T) { + pipe := Pipeline{"value": 1} + + assert.Equal(t, time.Second, pipe.Duration("value", 0)) + assert.Equal(t, time.Second, pipe.Duration("other", time.Second)) +} + +func TestPipeline_Has(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} + + assert.Equal(t, true, pipe.Has("options")) + assert.Equal(t, false, pipe.Has("other")) +} + +func TestPipeline_FilterBroker(t *testing.T) { + pipes := Pipelines{ + &Pipeline{"name": "first", "broker": "a"}, + &Pipeline{"name": "second", "broker": "a"}, + &Pipeline{"name": "third", "broker": "b"}, + &Pipeline{"name": "forth", "broker": "b"}, + } + + filtered := pipes.Names("first", "third") + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "a", filtered[0].Broker()) + assert.Equal(t, "b", filtered[1].Broker()) + + filtered = pipes.Names("first", "third").Reverse() + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "a", filtered[1].Broker()) + assert.Equal(t, "b", filtered[0].Broker()) + + filtered = pipes.Broker("a") + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "first", filtered[0].Name()) + assert.Equal(t, "second", filtered[1].Name()) + + filtered = pipes.Broker("a").Reverse() + assert.True(t, len(filtered) == 2) + 
assert.Equal(t, "first", filtered[1].Name()) + assert.Equal(t, "second", filtered[0].Name()) +} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go new file mode 100644 index 00000000..90932edd --- /dev/null +++ b/plugins/jobs/plugin.go @@ -0,0 +1,148 @@ +package jobs + +import ( + "context" + "fmt" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" +) + +const ( + // RrJobs env variable + RrJobs string = "rr_jobs" + PluginName string = "jobs" +) + +type Plugin struct { + cfg *Config + log logger.Logger + + workersPool pool.Pool + server server.Server + + brokers map[string]Broker + consumers map[string]Consumer + + events events.Handler + + // priority queue implementation + queue priorityqueue.Queue +} + +func testListener(data interface{}) { + fmt.Println(data) +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { + const op = errors.Op("jobs_plugin_init") + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.cfg) + if err != nil { + return errors.E(op, err) + } + + err = p.cfg.InitDefaults() + if err != nil { + return errors.E(op, err) + } + + p.server = server + p.events = events.NewEventsHandler() + p.events.AddListener(testListener) + p.brokers = make(map[string]Broker) + p.consumers = make(map[string]Consumer) + + // initialize priority queue + p.queue = priorityqueue.NewPriorityQueue() + p.log = log + + return nil +} + +func (p *Plugin) Serve() chan error { + errCh := make(chan error, 1) + + for name := range p.brokers { + jb, err := p.brokers[name].InitJobBroker(p.queue) + if err != nil { + errCh <- err + return errCh + } + + p.consumers[name] = jb + } + + var err error + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) + if err != nil { + errCh <- err + return errCh + } + + // initialize sub-plugins + // provide a queue to them + // start consume loop + // start resp loop + + return errCh +} + +func (p *Plugin) Stop() error { + return nil +} + +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.CollectMQBrokers, + } +} + +func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) { + p.brokers[name.Name()] = c +} + +func (p *Plugin) Available() {} + +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Push(j *structs.Job) (string, error) { + pipe, pOpts, err := p.cfg.MatchPipeline(j) + if err != nil { + panic(err) + } + + if pOpts != nil { + j.Options.Merge(pOpts) + } + + broker, ok := p.consumers[pipe.Broker()] + if !ok { + panic("broker not found") + } + + id, err := broker.Push(pipe, j) + if err != nil { + panic(err) + } + + // p.events.Push() + + return id, nil +} + +func (p *Plugin) RPC() interface{} { + return &rpc{log: p.log} +} diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go new file mode 100644 index 00000000..e77cda59 --- /dev/null +++ b/plugins/jobs/rpc.go @@ -0,0 +1,20 @@ +package jobs + +import ( + "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type rpc struct { 
+ log logger.Logger + p *Plugin +} + +func (r *rpc) Push(j *structs.Job, idRet *string) error { + id, err := r.p.Push(j) + if err != nil { + panic(err) + } + *idRet = id + return nil +} diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go new file mode 100644 index 00000000..2e394543 --- /dev/null +++ b/plugins/jobs/structs/job.go @@ -0,0 +1,35 @@ +package structs + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/utils" +) + +// Job carries information about a single job. +type Job struct { + // Job contains the job name (usually a PHP class). + Job string `json:"job"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Body packs job payload into binary payload. +func (j *Job) Body() []byte { + return utils.AsBytes(j.Payload) +} + +// Context packs job context (job, id) into binary payload. +func (j *Job) Context(id string) []byte { + ctx, _ := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + }{ID: id, Job: j.Job}, + ) + + return ctx +} diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go new file mode 100644 index 00000000..1507d053 --- /dev/null +++ b/plugins/jobs/structs/job_options.go @@ -0,0 +1,70 @@ +package structs + +import "time" + +// Options carry information about how to handle a given job. +type Options struct { + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int `json:"delay,omitempty"` + + // Attempts defines maximum job retries. Attention, value 1 will only allow the job to execute once (without retry). + // The minimum meaningful value is 2. + Attempts int `json:"maxAttempts,omitempty"` + + // RetryDelay defines how long the job should wait until the next retry. Defaults to none. + RetryDelay int `json:"retryDelay,omitempty"` + + // Timeout defines how long the broker should wait before treating the job as failed. Defaults to 30 min. + Timeout int `json:"timeout,omitempty"` +} + +// Merge merges job options. +func (o *Options) Merge(from *Options) { + if o.Pipeline == "" { + o.Pipeline = from.Pipeline + } + + if o.Attempts == 0 { + o.Attempts = from.Attempts + } + + if o.Timeout == 0 { + o.Timeout = from.Timeout + } + + if o.RetryDelay == 0 { + o.RetryDelay = from.RetryDelay + } + + if o.Delay == 0 { + o.Delay = from.Delay + } +} + +// CanRetry must return true if broker is allowed to re-run the job. +func (o *Options) CanRetry(attempt int) bool { + // Attempts 1 and 0 have an identical effect + return o.Attempts > (attempt + 1) +} + +// RetryDuration returns retry delay duration in a form of time.Duration. +func (o *Options) RetryDuration() time.Duration { + return time.Second * time.Duration(o.RetryDelay) +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} diff --git a/plugins/jobs/structs/job_options_test.go b/plugins/jobs/structs/job_options_test.go new file mode 100644 index 00000000..18702394 --- /dev/null +++ b/plugins/jobs/structs/job_options_test.go @@ -0,0 +1,110 @@ +package structs + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestOptions_CanRetry(t *testing.T) { + opts := &Options{Attempts: 0} + + assert.False(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) +} + +func TestOptions_CanRetry_SameValue(t *testing.T) { + opts := &Options{Attempts: 1} + + assert.False(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) +} + +func TestOptions_CanRetry_Value(t *testing.T) { + opts := &Options{Attempts: 2} + + assert.True(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) + assert.False(t, opts.CanRetry(2)) +} + +func TestOptions_CanRetry_Value3(t *testing.T) { + opts := &Options{Attempts: 3} + + assert.True(t, opts.CanRetry(0)) + assert.True(t, opts.CanRetry(1)) + assert.False(t, opts.CanRetry(2)) +} + +func TestOptions_RetryDuration(t *testing.T) { + opts := &Options{RetryDelay: 0} + assert.Equal(t, time.Duration(0), opts.RetryDuration()) +} + +func TestOptions_RetryDuration2(t *testing.T) { + opts := &Options{RetryDelay: 1} + assert.Equal(t, time.Second, opts.RetryDuration()) +} + +func TestOptions_DelayDuration(t *testing.T) { + opts := &Options{Delay: 0} + assert.Equal(t, time.Duration(0), opts.DelayDuration()) +} + +func TestOptions_DelayDuration2(t *testing.T) { + opts := &Options{Delay: 1} + assert.Equal(t, time.Second, opts.DelayDuration()) +} + +func TestOptions_TimeoutDuration(t *testing.T) { + opts := &Options{Timeout: 0} + assert.Equal(t, time.Minute*30, opts.TimeoutDuration()) +} + +func TestOptions_TimeoutDuration2(t *testing.T) { + opts := &Options{Timeout: 1} + assert.Equal(t, time.Second, opts.TimeoutDuration()) +} + +func TestOptions_Merge(t *testing.T) { + opts := &Options{} + + opts.Merge(&Options{ + Pipeline: "pipeline", + Delay: 2, + Timeout: 1, + Attempts: 1, + RetryDelay: 1, + }) + + assert.Equal(t, "pipeline", opts.Pipeline) + assert.Equal(t, 1, opts.Attempts) + assert.Equal(t, 2, opts.Delay) + assert.Equal(t, 1, opts.Timeout) + assert.Equal(t, 1, opts.RetryDelay) +} + +func TestOptions_MergeKeepOriginal(t *testing.T) { + opts := &Options{ + Pipeline: "default", + Delay: 10, + Timeout: 10, + Attempts: 10, + RetryDelay: 10, + } + + opts.Merge(&Options{ + Pipeline: "pipeline", + Delay: 2, + Timeout: 1, + Attempts: 1, + RetryDelay: 1, + }) + + assert.Equal(t, "default", opts.Pipeline) + assert.Equal(t, 10, opts.Attempts) + assert.Equal(t, 10, opts.Delay) + assert.Equal(t, 10, opts.Timeout) + assert.Equal(t, 10, opts.RetryDelay) +} diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go new file mode 100644 index 00000000..e7240c6b --- /dev/null +++ b/plugins/jobs/structs/job_test.go @@ -0,0 +1,19 @@ +package structs + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestJob_Body(t *testing.T) { + j := &Job{Payload: "hello"} + + assert.Equal(t, []byte("hello"), j.Body()) +} + +func TestJob_Context(t *testing.T) { + j := &Job{Job: "job"} + + assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id")) +} diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index 47d37cc2..0f737fbd 100644 --- 
a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -9,8 +9,8 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 6ae1a1f6..c839130f 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -2,12 +2,15 @@ package boltdb import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) -const PluginName = "boltdb" +const ( + PluginName string = "boltdb" + RootPluginName string = "kv" +) // Plugin BoltDB K/V storage. type Plugin struct { @@ -21,7 +24,7 @@ type Plugin struct { } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(kv.PluginName) { + if !cfg.Has(RootPluginName) { return errors.E(errors.Disabled) } diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index 14e7c078..42e342ac 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -6,8 +6,8 @@ import ( "github.com/bradfitz/gomemcache/memcache" "github.com/spiral/errors" + kv "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index 22ea5cca..59a2b7cb 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -2,12 +2,15 @@ package memcached import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) -const PluginName = "memcached" +const ( + PluginName string = "memcached" + RootPluginName string = "kv" +) type Plugin struct { // config plugin @@ -17,7 +20,7 @@ type Plugin struct { } func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(kv.PluginName) { + if !cfg.Has(RootPluginName) { return errors.E(errors.Disabled) } diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index 03dbaed6..e9ea25df 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -5,10 +5,12 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) +// PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync. const PluginName string = "kv" const ( @@ -25,9 +27,9 @@ const ( type Plugin struct { log logger.Logger // constructors contains general storage constructors, such as boltdb, memory, memcached, redis. - constructors map[string]Constructor + constructors map[string]kv.Constructor // storages contains user-defined storages, such as boltdb-north, memcached-us and so on. 
- storages map[string]Storage + storages map[string]kv.Storage // KV configuration cfg Config cfgPlugin config.Configurer @@ -43,8 +45,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if err != nil { return errors.E(op, err) } - p.constructors = make(map[string]Constructor, 5) - p.storages = make(map[string]Storage, 5) + p.constructors = make(map[string]kv.Constructor, 5) + p.storages = make(map[string]kv.Storage, 5) p.log = log p.cfgPlugin = cfg return nil @@ -203,7 +205,7 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) { +func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor kv.Constructor) { // save the storage constructor p.constructors[name.Name()] = constructor } diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 3f7ba97c..ad4aefa9 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,6 +2,7 @@ package kv import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) @@ -9,7 +10,7 @@ import ( // Wrapper for the plugin type rpc struct { // all available storages - storages map[string]Storage + storages map[string]kv.Storage // svc is a plugin implementing Storage interface srv *Plugin // Logger diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go index c13c2314..3cec1f97 100644 --- a/plugins/memory/kv.go +++ b/plugins/memory/kv.go @@ -6,8 +6,8 @@ import ( "time" "github.com/spiral/errors" + kv "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 70badf15..7d418a70 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -2,9 +2,9 @@ package memory import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/kv" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -21,7 +21,6 @@ type Plugin struct { func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log - p.log = log p.cfgPlugin = cfg p.stop = make(chan struct{}, 1) return nil diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go index d027a8a5..3c909900 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/pubsub.go @@ -3,8 +3,8 @@ package memory import ( "sync" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/bst" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go index 5817853c..0cd62d19 100644 --- a/plugins/redis/channel.go +++ b/plugins/redis/channel.go @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go index 2e4b9bfd..5bf03af1 100644 --- a/plugins/redis/kv.go +++ b/plugins/redis/kv.go @@ -7,8 +7,8 @@ import ( 
"github.com/go-redis/redis/v8" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 9d98790b..3c62a63f 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -5,9 +5,9 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/kv" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index 4e41acb5..8bd78514 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/server/interface.go b/plugins/server/interface.go index 0424d52d..b0f84a7f 100644 --- a/plugins/server/interface.go +++ b/plugins/server/interface.go @@ -19,5 +19,5 @@ type Server interface { // NewWorker return a new worker with provided and attached by the user listeners and environment variables NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) // NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration - NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) + NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) } diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 00639f43..038d83d4 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -21,14 +21,14 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -// PluginName for the server -const PluginName = "server" - -// RrRelay env variable key (internal) -const RrRelay = "RR_RELAY" - -// RrRPC env variable key (internal) if the RPC presents -const RrRPC = "RR_RPC" +const ( + // PluginName for the server + PluginName = "server" + // RrRelay env variable key (internal) + RrRelay = "RR_RELAY" + // RrRPC env variable key (internal) if the RPC presents + RrRPC = "RR_RPC" +) // Plugin manages worker type Plugin struct { @@ -124,7 +124,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event const op = errors.Op("server_plugin_new_worker") list := make([]events.Listener, 0, len(listeners)) - list = append(list, server.collectWorkerLogs) + list = append(list, server.collectWorkerEvents) spawnCmd, err := server.CmdFactory(env) if err != nil { @@ -140,15 +140,16 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event } // NewWorkerPool issues new worker pool. 
-func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) { +func (server *Plugin) NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) { const op = errors.Op("server_plugin_new_worker_pool") + spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, errors.E(op, err) } - list := make([]events.Listener, 0, 1) - list = append(list, server.collectEvents) + list := make([]events.Listener, 0, 2) + list = append(list, server.collectPoolEvents, server.collectWorkerEvents) if len(listeners) != 0 { list = append(list, listeners...) } @@ -209,7 +210,7 @@ func (server *Plugin) setEnv(e Env) []string { return env } -func (server *Plugin) collectEvents(event interface{}) { +func (server *Plugin) collectPoolEvents(event interface{}) { if we, ok := event.(events.PoolEvent); ok { switch we.Event { case events.EventMaxMemory: @@ -238,7 +239,9 @@ func (server *Plugin) collectEvents(event interface{}) { server.log.Warn("requested pool restart") } } +} +func (server *Plugin) collectWorkerEvents(event interface{}) { if we, ok := event.(events.WorkerEvent); ok { switch we.Event { case events.EventWorkerError: @@ -264,16 +267,13 @@ func (server *Plugin) collectEvents(event interface{}) { } } -func (server *Plugin) collectWorkerLogs(event interface{}) { - if we, ok := event.(events.WorkerEvent); ok { - switch we.Event { - case events.EventWorkerError: - server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t")) - case events.EventWorkerLog: - server.log.Debug(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t")) - // stderr event is INFO level - case events.EventWorkerStderr: - server.log.Info(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t")) +func (server *Plugin) collectJobsEvents(event interface{}) { //nolint:unused + if jev, ok := event.(events.JobEvent); ok { + switch jev.Event { + case events.EventJobStart: + server.log.Info("Job started", "start", jev.Start, "elapsed", jev.Elapsed) + case events.EventJobOK: + server.log.Info("Job OK", "start", jev.Start, "elapsed", jev.Elapsed) } } } diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 664b4dfd..c1f79a78 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -7,7 +7,7 @@ import ( json "github.com/json-iterator/go" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index ca5f2f59..5925a588 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -10,10 +10,10 @@ import ( "github.com/google/uuid" json "github.com/json-iterator/go" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/config" @@ -106,7 +106,7 @@ func (p *Plugin) Serve() chan error { p.Lock() defer p.Unlock() - p.phpPool, err = 
p.server.NewWorkerPool(context.Background(), phpPool.Config{ + p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, MaxJobs: p.cfg.Pool.MaxJobs, @@ -273,7 +273,7 @@ func (p *Plugin) Reset() error { p.phpPool = nil var err error - p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ + p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, MaxJobs: p.cfg.Pool.MaxJobs, diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 752ba3ce..758620f6 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,7 +4,7 @@ import ( "sync" json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/utils" diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto new file mode 100644 index 00000000..46434fa8 --- /dev/null +++ b/proto/jobs/v1beta/jobs.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package kv.v1beta; +option go_package = "./;jobsv1beta"; + +message Request { + // could be an enum in the future + string storage = 1; + repeated Item items = 2; +} + +message Item { + string key = 1; + bytes value = 2; + // RFC 3339 + string timeout = 3; +} + +// KV response for the KV RPC methods +message Response { + repeated Item items = 1; +} diff --git a/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go index 622967b8..75578bff 100644 --- a/proto/kv/v1beta/kv.pb.go +++ b/proto/kv/v1beta/kv.pb.go @@ -7,10 +7,11 @@ package kvv1beta import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go index ad4ebbe7..a2868118 100644 --- a/proto/websockets/v1beta/websockets.pb.go +++ b/proto/websockets/v1beta/websockets.pb.go @@ -7,10 +7,11 @@ package websocketsv1beta import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/tests/composer.json b/tests/composer.json index 50178d1f..fa5925b7 100644 --- a/tests/composer.json +++ b/tests/composer.json @@ -2,7 +2,7 @@ "minimum-stability": "beta", "prefer-stable": true, "require": { - "nyholm/psr7": "^1.3", + "nyholm/psr7": "^1.4", "spiral/roadrunner": "^2.0", "spiral/roadrunner-http": "^2.0", "temporal/sdk": ">=1.0", diff --git a/tests/docker-compose-jobs.yml b/tests/docker-compose-jobs.yml new file mode 100644 index 00000000..7b88c9cf --- /dev/null +++ b/tests/docker-compose-jobs.yml @@ -0,0 +1,22 @@ +version: "3" + +services: + beanstalk: + image: schickling/beanstalkd + ports: + - "11300:11300" + + sqs: + image: vsouza/sqs-local + ports: + - "9324:9324" + + rabbitmq: + image: rabbitmq:3-management + environment: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest + 
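The jobs.proto introduced a little above stores each item's timeout as an RFC 3339 string. A small, self-contained sketch of producing such a value on the Go side; the item struct here is a stand-in that only mirrors the proto message, it is not the generated jobsv1beta type.

package main

import (
	"fmt"
	"time"
)

// item mirrors the shape of the proto Item message for illustration only;
// the real generated type lives in the jobsv1beta package.
type item struct {
	Key     string
	Value   []byte
	Timeout string // RFC 3339, as documented in jobs.proto
}

func main() {
	it := item{
		Key:   "job-1",
		Value: []byte(`{"payload":"hello"}`),
		// The deadline is serialized in RFC 3339 form, e.g. 2021-06-01T12:00:00Z.
		Timeout: time.Now().Add(10 * time.Minute).UTC().Format(time.RFC3339),
	}
	fmt.Println(it.Key, it.Timeout)
}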
RABBITMQ_DEFAULT_VHOST: / + ports: + - "15672:15672" + - "5672:5672"
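The docker-compose-jobs.yml above exposes beanstalkd, a local SQS emulator, and RabbitMQ on fixed host ports. Below is a hedged helper a test suite could use to check those ports are reachable before running; the port numbers come from the compose file, the helper itself is an assumption and not part of the repository.

package example

import (
	"fmt"
	"net"
	"time"
)

// waitForServices dials each service exposed by docker-compose-jobs.yml
// and reports the first one that is not reachable yet.
func waitForServices() error {
	addrs := []string{
		"127.0.0.1:11300", // beanstalkd
		"127.0.0.1:9324",  // sqs-local
		"127.0.0.1:5672",  // rabbitmq (AMQP)
	}

	for _, addr := range addrs {
		conn, err := net.DialTimeout("tcp", addr, time.Second*2)
		if err != nil {
			return fmt.Errorf("service %s is not reachable: %w", addr, err)
		}
		_ = conn.Close()
	}
	return nil
}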
\ No newline at end of file diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index 0ec813f3..d8bedf29 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -176,7 +176,7 @@ func TestBroadcastNoConfig(t *testing.T) { } func TestBroadcastSameSubscriber(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) cfg := &config.Viper{ @@ -189,11 +189,11 @@ func TestBroadcastSameSubscriber(t *testing.T) { mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).MinTimes(1) - mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).AnyTimes() + mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes() mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2) mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) @@ -279,14 +279,15 @@ func TestBroadcastSameSubscriber(t *testing.T) { t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002")) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002")) - time.Sleep(time.Second * 4) stopCh <- struct{}{} wg.Wait() + + time.Sleep(time.Second * 5) } func TestBroadcastSameSubscriberGlobal(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) cfg := &config.Viper{ @@ -299,11 +300,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).MinTimes(1) - mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).AnyTimes() + mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes() mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2) mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) @@ -389,10 +390,10 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003")) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003")) - time.Sleep(time.Second * 4) stopCh <- struct{}{} 
wg.Wait() + time.Sleep(time.Second * 5) } func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) { diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml index d8daa251..66114d64 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml @@ -3,8 +3,6 @@ rpc: server: command: "php ../../psr-worker-bench.php" - user: "" - group: "" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml index 2ca97055..ea25988c 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml @@ -3,8 +3,6 @@ rpc: server: command: "php ../../psr-worker-bench.php" - user: "" - group: "" relay: "pipes" relay_timeout: "20s" @@ -38,9 +36,4 @@ broadcast: logs: mode: development - level: error - -endure: - grace_period: 120s - print_graph: false - log_level: error + level: info diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml index 360e05e5..cbe18196 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml @@ -3,8 +3,6 @@ rpc: server: command: "php ../../psr-worker-bench.php" - user: "" - group: "" relay: "pipes" relay_timeout: "20s" @@ -35,9 +33,4 @@ broadcast: logs: mode: development - level: debug - -endure: - grace_period: 120s - print_graph: false - log_level: error + level: info diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go index d3b16256..01ad1479 100644 --- a/tests/plugins/broadcast/plugins/plugin1.go +++ b/tests/plugins/broadcast/plugins/plugin1.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,14 @@ type Plugin1 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + p.exit = make(chan struct{}, 1) return nil } @@ -39,16 +42,22 @@ func (p *Plugin1) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) + } } }() @@ -59,6 +68,8 @@ func (p *Plugin1) Stop() error { _ = p.driver.Unsubscribe("1", "foo") _ = p.driver.Unsubscribe("1", "foo2") _ = p.driver.Unsubscribe("1", "foo3") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go index 2bd819d2..ee072ffe 100644 --- a/tests/plugins/broadcast/plugins/plugin2.go +++ b/tests/plugins/broadcast/plugins/plugin2.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" 
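Plugin1 above (and the sibling test plugins that follow) replace panic-on-error with an error channel plus a buffered exit channel, so the consume goroutine can be stopped cleanly from Stop(). A stripped-down, stdlib-only sketch of that pattern; the reader interface and message type are placeholders, only the select-on-exit structure mirrors the diff.

package example

import (
	"fmt"
)

// subReader stands in for pubsub.SubReader; only Next matters here.
type subReader interface {
	Next() (*string, error)
}

type consumer struct {
	driver subReader
	exit   chan struct{}
	errCh  chan error
}

// serve mirrors the updated Serve loop: errors go to the error channel
// instead of panicking, and the exit channel stops the goroutine.
func (c *consumer) serve() {
	go func() {
		for {
			select {
			case <-c.exit:
				return
			default:
				msg, err := c.driver.Next()
				if err != nil {
					c.errCh <- err
					return
				}
				if msg == nil {
					continue
				}
				fmt.Println("received:", *msg)
			}
		}
	}()
}

func (c *consumer) stop() {
	// The exit channel is buffered with capacity 1, so stop never blocks
	// even if the goroutine already returned because of an error.
	c.exit <- struct{}{}
}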
"github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,13 @@ type Plugin2 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + exit chan struct{} } func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +40,22 @@ func (p *Plugin2) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) + } } }() @@ -56,6 +64,7 @@ func (p *Plugin2) Serve() chan error { func (p *Plugin2) Stop() error { _ = p.driver.Unsubscribe("2", "foo") + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go index ef926222..288201d1 100644 --- a/tests/plugins/broadcast/plugins/plugin3.go +++ b/tests/plugins/broadcast/plugins/plugin3.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,15 @@ type Plugin3 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin3) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) + } } }() @@ -56,6 +66,7 @@ func (p *Plugin3) Serve() chan error { func (p *Plugin3) Stop() error { _ = p.driver.Unsubscribe("3", "foo") + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go index c9b94777..56f79c0f 100644 --- a/tests/plugins/broadcast/plugins/plugin4.go +++ b/tests/plugins/broadcast/plugins/plugin4.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,15 @@ type Plugin4 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin4) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, 
*msg)) + } } }() @@ -56,6 +66,8 @@ func (p *Plugin4) Serve() chan error { func (p *Plugin4) Stop() error { _ = p.driver.Unsubscribe("4", "foo") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go index 01562a8f..e7cd7e60 100644 --- a/tests/plugins/broadcast/plugins/plugin5.go +++ b/tests/plugins/broadcast/plugins/plugin5.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,15 @@ type Plugin5 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin5) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) + } } }() @@ -56,6 +66,8 @@ func (p *Plugin5) Serve() chan error { func (p *Plugin5) Stop() error { _ = p.driver.Unsubscribe("5", "foo") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go index 76f2d6e8..08272196 100644 --- a/tests/plugins/broadcast/plugins/plugin6.go +++ b/tests/plugins/broadcast/plugins/plugin6.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,15 @@ type Plugin6 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin6) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) + } } }() @@ -56,6 +66,8 @@ func (p *Plugin6) Serve() chan error { func (p *Plugin6) Stop() error { _ = p.driver.Unsubscribe("6", "foo") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/headers/configs/.rr-cors-headers.yaml b/tests/plugins/headers/configs/.rr-cors-headers.yaml index 9d2ef7e5..b4e960f1 100644 --- a/tests/plugins/headers/configs/.rr-cors-headers.yaml +++ b/tests/plugins/headers/configs/.rr-cors-headers.yaml @@ -1,9 +1,5 @@ server: command: "php ../../http/client.php headers pipes" - user: "" - group: "" - env: - "RR_HTTP": "true" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/http/configs/.rr-env.yaml b/tests/plugins/http/configs/.rr-env.yaml index 99358b04..4ea8ec73 100644 --- a/tests/plugins/http/configs/.rr-env.yaml +++ 
b/tests/plugins/http/configs/.rr-env.yaml @@ -3,17 +3,13 @@ rpc: server: command: "php ../../http/client.php env pipes" - user: "" - group: "" - env: - "env_key": "ENV_VALUE" relay: "pipes" relay_timeout: "20s" http: address: 127.0.0.1:12084 max_request_size: 1024 - middleware: [ "" ] + middleware: [] env: "RR_HTTP": "true" "env_key": "ENV_VALUE" diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index 40e3a720..37d9452c 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -26,7 +26,7 @@ func TestHandler_Echo(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -77,7 +77,7 @@ func TestHandler_Headers(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "header", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -138,7 +138,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -198,7 +198,7 @@ func TestHandler_User_Agent(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -258,7 +258,7 @@ func TestHandler_Cookies(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "cookie", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -323,7 +323,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -387,7 +387,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -447,7 +447,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -507,7 +507,7 @@ func TestHandler_FormData_POST(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, 
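The handler tests here are updated mechanically to pass the pool configuration by pointer. A condensed sketch of the pieces those tests now assemble; the values mirror the ones used throughout handler_test.go, and the final call shown in the trailing comment is quoted from the diff rather than re-implemented.

package example

import (
	"os/exec"
	"time"

	"github.com/spiral/roadrunner/v2/pkg/pool"
)

// testPoolConfig builds the configuration the handler tests now pass by
// pointer; the values mirror the ones used throughout handler_test.go.
func testPoolConfig() *pool.Config {
	return &pool.Config{
		NumWorkers:      1,
		AllocateTimeout: time.Second * 1000,
		DestroyTimeout:  time.Second * 1000,
	}
}

// cmdFactory spawns the PHP test client, as in the tests above.
func cmdFactory() func() *exec.Cmd {
	return func() *exec.Cmd {
		return exec.Command("php", "../../http/client.php", "echo", "pipes")
	}
}

// A test then calls, exactly as in the diff:
//   p, err := pool.Initialize(context.Background(), cmdFactory(), pipe.NewPipeFactory(), testPoolConfig())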
pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -580,7 +580,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -653,7 +653,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -725,7 +725,7 @@ func TestHandler_FormData_PUT(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -797,7 +797,7 @@ func TestHandler_FormData_PATCH(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -869,7 +869,7 @@ func TestHandler_Multipart_POST(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -983,7 +983,7 @@ func TestHandler_Multipart_PUT(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1097,7 +1097,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1213,7 +1213,7 @@ func TestHandler_Error(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1259,7 +1259,7 @@ func TestHandler_Error2(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error2", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1305,7 +1305,7 @@ func TestHandler_Error3(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "pid", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + 
&pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1364,7 +1364,7 @@ func TestHandler_ResponseDuration(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1425,7 +1425,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echoDelay", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1485,7 +1485,7 @@ func TestHandler_ErrorDuration(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1560,7 +1560,7 @@ func TestHandler_IP(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1621,7 +1621,7 @@ func TestHandler_XRealIP(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1687,7 +1687,7 @@ func TestHandler_XForwardedFor(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1752,7 +1752,7 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1800,7 +1800,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go index df696668..d02f9eee 100644 --- a/tests/plugins/http/uploads_test.go +++ b/tests/plugins/http/uploads_test.go @@ -31,7 +31,7 @@ func TestHandler_Upload_File(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -114,7 +114,7 @@ func 
TestHandler_Upload_NestedFile(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -197,7 +197,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -280,7 +280,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go index 43335999..62816d02 100644 --- a/tests/plugins/informer/test_plugin.go +++ b/tests/plugins/informer/test_plugin.go @@ -10,7 +10,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" ) -var testPoolConfig = pool.Config{ +var testPoolConfig = &pool.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml new file mode 100644 index 00000000..b21f764c --- /dev/null +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -0,0 +1,57 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + relay: "pipes" + relay_timeout: "20s" + +jobs: + # worker pool configuration + pool: + num_workers: 4 + + # rabbitmq and similar servers + amqp: + addr: amqp://guest:guest@localhost:5672/ + + # beanstalk configuration + beanstalk: + addr: tcp://localhost:11300 + + # amazon sqs configuration + sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://localhost:9324 + + # job destinations and options + dispatch: + spiral-jobs-tests-amqp-*.pipeline: amqp + spiral-jobs-tests-local-*.pipeline: local + spiral-jobs-tests-beanstalk-*.pipeline: beanstalk + spiral-jobs-tests-sqs-*.pipeline: sqs + + # list of broker pipelines associated with endpoints + pipelines: + local: + broker: ephemeral + + amqp: + broker: amqp + queue: default + + beanstalk: + broker: beanstalk + tube: default + + sqs: + broker: sqs + queue: default + declare: + MessageRetentionPeriod: 86400 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: ["local", "amqp", "beanstalk", "sqs"] + diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go new file mode 100644 index 00000000..e8b4e83d --- /dev/null +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -0,0 +1,90 @@ +package jobs + +import ( + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral" + "github.com/spiral/roadrunner/v2/plugins/logger" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + 
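The dispatch section of the new .rr-jobs-init.yaml above maps glob-style job-name patterns to pipelines. The sketch below only illustrates how such rules can be resolved with standard-library pattern matching; it is not the jobs plugin's actual dispatcher, and the sample job name is made up.

package main

import (
	"fmt"
	"path"
)

// dispatch copies the patterns from the `dispatch` section of .rr-jobs-init.yaml.
var dispatch = map[string]string{
	"spiral-jobs-tests-amqp-*.pipeline":      "amqp",
	"spiral-jobs-tests-local-*.pipeline":     "local",
	"spiral-jobs-tests-beanstalk-*.pipeline": "beanstalk",
	"spiral-jobs-tests-sqs-*.pipeline":       "sqs",
}

// resolvePipeline returns the pipeline whose pattern matches the job name.
func resolvePipeline(job string) (string, bool) {
	for pattern, pipeline := range dispatch {
		if ok, _ := path.Match(pattern, job); ok {
			return pipeline, true
		}
	}
	return "", false
}

func main() {
	p, ok := resolvePipeline("spiral-jobs-tests-amqp-default.pipeline")
	fmt.Println(p, ok) // amqp true
}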
"github.com/spiral/roadrunner/v2/plugins/server" + "github.com/stretchr/testify/assert" +) + +func TestJobsInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-jobs-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &ephemeral.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + + stopCh <- struct{}{} + + wg.Wait() +} diff --git a/tests/plugins/resetter/test_plugin.go b/tests/plugins/resetter/test_plugin.go index 61942516..5c26cbd0 100644 --- a/tests/plugins/resetter/test_plugin.go +++ b/tests/plugins/resetter/test_plugin.go @@ -9,7 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" ) -var testPoolConfig = poolImpl.Config{ +var testPoolConfig = &poolImpl.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go index f1c13734..e813e456 100644 --- a/tests/plugins/server/plugin_pipes.go +++ b/tests/plugins/server/plugin_pipes.go @@ -15,7 +15,7 @@ import ( const ConfigSection = "server" const Response = "test" -var testPoolConfig = pool.Config{ +var testPoolConfig = &pool.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php index d0c72eae..b4a028d4 100644 --- a/tests/psr-worker-bench.php +++ b/tests/psr-worker-bench.php @@ -56,4 +56,4 @@ if ($env->getMode() === 'http') { } $factory->run(); -}
\ No newline at end of file +} diff --git a/tests/psr-worker.php b/tests/psr-worker.php index db53eee2..de4befbc 100644 --- a/tests/psr-worker.php +++ b/tests/psr-worker.php @@ -20,7 +20,7 @@ while ($req = $psr7->waitRequest()) { try { $resp = new \Nyholm\Psr7\Response(); $resp->getBody()->write(str_repeat("hello world", 1000)); - + $psr7->respond($resp); } catch (\Throwable $e) { $psr7->getWorker()->error((string)$e); diff --git a/tests/worker-cors.php b/tests/worker-cors.php new file mode 100644 index 00000000..ea3c986c --- /dev/null +++ b/tests/worker-cors.php @@ -0,0 +1,15 @@ +<?php + +use Spiral\RoadRunner\Worker; +use Spiral\RoadRunner\Http\HttpWorker; + +ini_set('display_errors', 'stderr'); +require __DIR__ . '/vendor/autoload.php'; + +$http = new HttpWorker(Worker::create()); + +while ($req = $http->waitRequest()) { + $http->respond(200, 'Response', [ + 'Access-Control-Allow-Origin' => ['*'] + ]); +} |
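The new tests/worker-cors.php answers every request with an Access-Control-Allow-Origin: * header so CORS behaviour can be verified end to end. For comparison only, here is a minimal net/http handler doing the same thing on the Go side; it is an analogue for readers unfamiliar with the PHP worker API, not part of the change, and the listen address is arbitrary.

package main

import (
	"net/http"
)

// corsHandler mirrors what tests/worker-cors.php does: every response
// carries a wildcard Access-Control-Allow-Origin header and a fixed body.
func corsHandler(w http.ResponseWriter, _ *http.Request) {
	w.Header().Set("Access-Control-Allow-Origin", "*")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte("Response"))
}

func main() {
	http.HandleFunc("/", corsHandler)
	// The PHP worker itself is driven by RoadRunner, not by net/http.
	_ = http.ListenAndServe("127.0.0.1:18080", nil)
}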