summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.dockerignore2
-rwxr-xr-xCODE_OF_CONDUCT.md2
-rw-r--r--common/doc.go9
-rw-r--r--common/kv/interface.go (renamed from plugins/kv/interface.go)0
-rw-r--r--common/pubsub/interface.go (renamed from pkg/pubsub/interface.go)0
-rw-r--r--common/pubsub/psmessage.go (renamed from pkg/pubsub/psmessage.go)0
-rwxr-xr-xpkg/events/general.go2
-rw-r--r--pkg/events/interface.go4
-rw-r--r--pkg/events/jobs_events.go84
-rw-r--r--pkg/events/pool_events.go2
-rw-r--r--pkg/events/worker_events.go2
-rw-r--r--pkg/pool/config.go2
-rwxr-xr-xpkg/pool/static_pool.go4
-rwxr-xr-xpkg/pool/static_pool_test.go32
-rw-r--r--pkg/pool/supervisor_test.go12
-rw-r--r--pkg/priority_queue/binary_heap.go12
-rw-r--r--pkg/priority_queue/interface.go6
-rw-r--r--pkg/priority_queue/queue.go19
-rw-r--r--plugins/broadcast/interface.go2
-rw-r--r--plugins/broadcast/plugin.go2
-rw-r--r--plugins/broadcast/rpc.go2
-rw-r--r--plugins/http/plugin.go4
-rw-r--r--plugins/informer/rpc.go14
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go62
-rw-r--r--plugins/jobs/brokers/ephemeral/config.go1
-rw-r--r--plugins/jobs/brokers/ephemeral/entry.go21
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go28
-rw-r--r--plugins/jobs/brokers/ephemeral/queue.go7
-rw-r--r--plugins/jobs/config.go71
-rw-r--r--plugins/jobs/dispatcher/dispatcher.go49
-rw-r--r--plugins/jobs/dispatcher/dispatcher_test.go55
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio1
-rw-r--r--plugins/jobs/interface.go26
-rw-r--r--plugins/jobs/pipeline/pipeline.go172
-rw-r--r--plugins/jobs/pipeline/pipeline_test.go90
-rw-r--r--plugins/jobs/plugin.go164
-rw-r--r--plugins/jobs/pq_plugin/plugin.go34
-rw-r--r--plugins/jobs/rpc.go20
-rw-r--r--plugins/jobs/structs/job.go35
-rw-r--r--plugins/jobs/structs/job_options.go70
-rw-r--r--plugins/jobs/structs/job_options_test.go110
-rw-r--r--plugins/jobs/structs/job_test.go19
-rw-r--r--plugins/kv/drivers/boltdb/driver.go2
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go9
-rw-r--r--plugins/kv/drivers/memcached/driver.go2
-rw-r--r--plugins/kv/drivers/memcached/plugin.go9
-rw-r--r--plugins/kv/plugin.go12
-rw-r--r--plugins/kv/rpc.go3
-rw-r--r--plugins/memory/kv.go2
-rw-r--r--plugins/memory/plugin.go5
-rw-r--r--plugins/memory/pubsub.go2
-rw-r--r--plugins/redis/channel.go2
-rw-r--r--plugins/redis/kv.go2
-rw-r--r--plugins/redis/plugin.go4
-rw-r--r--plugins/redis/pubsub.go2
-rw-r--r--plugins/server/interface.go2
-rw-r--r--plugins/server/plugin.go46
-rw-r--r--plugins/websockets/executor/executor.go2
-rw-r--r--plugins/websockets/plugin.go6
-rw-r--r--plugins/websockets/pool/workers_pool.go2
-rw-r--r--proto/jobs/v1beta/jobs.proto22
-rw-r--r--proto/kv/v1beta/kv.pb.go5
-rw-r--r--proto/websockets/v1beta/websockets.pb.go5
-rw-r--r--tests/composer.json2
-rw-r--r--tests/docker-compose-jobs.yml22
-rw-r--r--tests/plugins/broadcast/broadcast_plugin_test.go21
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml2
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-global.yaml9
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml9
-rw-r--r--tests/plugins/broadcast/plugins/plugin1.go29
-rw-r--r--tests/plugins/broadcast/plugins/plugin2.go27
-rw-r--r--tests/plugins/broadcast/plugins/plugin3.go29
-rw-r--r--tests/plugins/broadcast/plugins/plugin4.go30
-rw-r--r--tests/plugins/broadcast/plugins/plugin5.go30
-rw-r--r--tests/plugins/broadcast/plugins/plugin6.go30
-rw-r--r--tests/plugins/headers/configs/.rr-cors-headers.yaml4
-rw-r--r--tests/plugins/http/configs/.rr-env.yaml6
-rw-r--r--tests/plugins/http/handler_test.go54
-rw-r--r--tests/plugins/http/uploads_test.go8
-rw-r--r--tests/plugins/informer/test_plugin.go2
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml57
-rw-r--r--tests/plugins/jobs/jobs_plugin_test.go92
-rw-r--r--tests/plugins/resetter/test_plugin.go2
-rw-r--r--tests/plugins/server/plugin_pipes.go2
-rw-r--r--tests/psr-worker-bench.php2
-rw-r--r--tests/psr-worker.php2
-rw-r--r--tests/worker-cors.php15
87 files changed, 1664 insertions, 221 deletions
diff --git a/.dockerignore b/.dockerignore
index bfa82a3d..b817b3c8 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -7,4 +7,4 @@
/tests
/bin
composer.json
-vendor_php \ No newline at end of file
+vendor_php
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
index ae0b283a..49aeb3c8 100755
--- a/CODE_OF_CONDUCT.md
+++ b/CODE_OF_CONDUCT.md
@@ -43,4 +43,4 @@ Project maintainers who do not follow or enforce the Code of Conduct in good fai
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [http://contributor-covenant.org/version/1/4][version]
[homepage]: http://contributor-covenant.org
-[version]: http://contributor-covenant.org/version/1/4/
+[version]: https://www.contributor-covenant.org/version/2/0/code_of_conduct/
diff --git a/common/doc.go b/common/doc.go
new file mode 100644
index 00000000..adc03351
--- /dev/null
+++ b/common/doc.go
@@ -0,0 +1,9 @@
+/*
+Package common used to collect common interfaces/structures which might be implemented (or imported) by a different plugins.
+For example, 'pubsub' interface might be implemented by memory, redis, websockets and many other plugins.
+
+Folders:
+- kv - contains KV interfaces and structures
+- pubsub - contains pub-sub interfaces and structures
+*/
+package common
diff --git a/plugins/kv/interface.go b/common/kv/interface.go
index 5736a6a7..5736a6a7 100644
--- a/plugins/kv/interface.go
+++ b/common/kv/interface.go
diff --git a/pkg/pubsub/interface.go b/common/pubsub/interface.go
index 06252d70..06252d70 100644
--- a/pkg/pubsub/interface.go
+++ b/common/pubsub/interface.go
diff --git a/pkg/pubsub/psmessage.go b/common/pubsub/psmessage.go
index e33d9284..e33d9284 100644
--- a/pkg/pubsub/psmessage.go
+++ b/common/pubsub/psmessage.go
diff --git a/pkg/events/general.go b/pkg/events/general.go
index a09a8759..5cf13e10 100755
--- a/pkg/events/general.go
+++ b/pkg/events/general.go
@@ -4,6 +4,8 @@ import (
"sync"
)
+const UnknownEventType string = "Unknown event type"
+
// HandlerImpl helps to broadcast events to multiple listeners.
type HandlerImpl struct {
listeners []Listener
diff --git a/pkg/events/interface.go b/pkg/events/interface.go
index ac6c15a4..7d57e4d0 100644
--- a/pkg/events/interface.go
+++ b/pkg/events/interface.go
@@ -2,7 +2,7 @@ package events
// Handler interface
type Handler interface {
- // Return number of active listeners
+ // NumListeners return number of active listeners
NumListeners() int
// AddListener adds lister to the publisher
AddListener(listener Listener)
@@ -10,5 +10,5 @@ type Handler interface {
Push(e interface{})
}
-// Event listener listens for the events produced by worker, worker pool or other service.
+// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service.
type Listener func(event interface{})
diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go
new file mode 100644
index 00000000..ed07c7da
--- /dev/null
+++ b/pkg/events/jobs_events.go
@@ -0,0 +1,84 @@
+package events
+
+import (
+ "time"
+)
+
+const (
+ // EventPushOK thrown when new job has been added. JobEvent is passed as context.
+ EventPushOK = iota + 12000
+
+ // EventPushError caused when job can not be registered.
+ EventPushError
+
+ // EventJobStart thrown when new job received.
+ EventJobStart
+
+ // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
+ EventJobOK
+
+ // EventJobError thrown on all job related errors. See JobError as context.
+ EventJobError
+
+ // EventPipeConsume when pipeline pipelines has been requested.
+ EventPipeConsume
+
+ // EventPipeActive when pipeline has started.
+ EventPipeActive
+
+ // EventPipeStop when pipeline has begun stopping.
+ EventPipeStop
+
+ // EventPipeStopped when pipeline has been stopped.
+ EventPipeStopped
+
+ // EventPipeError when pipeline specific error happen.
+ EventPipeError
+
+ // EventBrokerReady thrown when broken is ready to accept/serve tasks.
+ EventBrokerReady
+)
+
+type J int64
+
+func (ev J) String() string {
+ switch ev {
+ case EventPushOK:
+ return "EventPushOK"
+ case EventPushError:
+ return "EventPushError"
+ case EventJobStart:
+ return "EventJobStart"
+ case EventJobOK:
+ return "EventJobOK"
+ case EventJobError:
+ return "EventJobError"
+ case EventPipeConsume:
+ return "EventPipeConsume"
+ case EventPipeActive:
+ return "EventPipeActive"
+ case EventPipeStop:
+ return "EventPipeStop"
+ case EventPipeStopped:
+ return "EventPipeStopped"
+ case EventPipeError:
+ return "EventPipeError"
+ case EventBrokerReady:
+ return "EventBrokerReady"
+ }
+ return UnknownEventType
+}
+
+// JobEvent represent job event.
+type JobEvent struct {
+ Event J
+ // String is job id.
+ ID string
+
+ // Job is failed job.
+ Job interface{} // this is *jobs.Job, but interface used to avoid package import
+
+ // event timings
+ Start time.Time
+ Elapsed time.Duration
+}
diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go
index e7b451e0..4d4cae5d 100644
--- a/pkg/events/pool_events.go
+++ b/pkg/events/pool_events.go
@@ -57,7 +57,7 @@ func (ev P) String() string {
case EventPoolRestart:
return "EventPoolRestart"
}
- return "Unknown event type"
+ return UnknownEventType
}
// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go
index 11bd6ab7..39c38e57 100644
--- a/pkg/events/worker_events.go
+++ b/pkg/events/worker_events.go
@@ -20,7 +20,7 @@ func (ev W) String() string {
case EventWorkerStderr:
return "EventWorkerStderr"
}
- return "Unknown event type"
+ return UnknownEventType
}
// WorkerEvent wraps worker events.
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index 2a3dabe4..3a058956 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -5,7 +5,7 @@ import (
"time"
)
-// Configures the pool behavior.
+// Config .. Pool config Configures the pool behavior.
type Config struct {
// Debug flag creates new fresh worker before every request.
Debug bool
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index e568661f..1c149c51 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -26,7 +26,7 @@ type Command func() *exec.Cmd
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
- cfg Config
+ cfg *Config
// worker command creator
cmd Command
@@ -51,7 +51,7 @@ type StaticPool struct {
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) {
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
const op = errors.Op("static_pool_initialize")
if factory == nil {
return nil, errors.E(op, errors.Str("no factory initialized"))
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 6f875072..f264c6dc 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -20,7 +20,7 @@ import (
"github.com/stretchr/testify/assert"
)
-var cfg = Config{
+var cfg = &Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -58,7 +58,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
@@ -214,7 +214,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
}
- var cfg2 = Config{
+ var cfg2 = &Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -264,7 +264,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
DestroyTimeout: time.Second * 2,
@@ -283,7 +283,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
@@ -320,7 +320,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
Debug: true,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -360,7 +360,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -400,7 +400,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -422,7 +422,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -452,7 +452,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
@@ -476,7 +476,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -506,7 +506,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
// sleep for the 3 seconds
func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
Debug: false,
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -539,7 +539,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -556,7 +556,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -595,7 +595,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
@@ -626,7 +626,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 513d369f..c33aa6fb 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -12,7 +12,7 @@ import (
"github.com/stretchr/testify/assert"
)
-var cfgSupervised = Config{
+var cfgSupervised = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -82,7 +82,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -123,7 +123,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
}
func TestSupervisedPool_Idle(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -171,7 +171,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
}
func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -213,7 +213,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -255,7 +255,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
}
func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
new file mode 100644
index 00000000..c660ddb6
--- /dev/null
+++ b/pkg/priority_queue/binary_heap.go
@@ -0,0 +1,12 @@
+/*
+binary heap (min-heap) algorithm used as a core for the priority queue
+*/
+
+package priorityqueue
+
+type BinHeap struct {
+}
+
+func NewBinHeap() *BinHeap {
+ return &BinHeap{}
+}
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
new file mode 100644
index 00000000..00998d78
--- /dev/null
+++ b/pkg/priority_queue/interface.go
@@ -0,0 +1,6 @@
+package priorityqueue
+
+type Queue interface {
+ Push(item interface{})
+ Pop() interface{}
+}
diff --git a/pkg/priority_queue/queue.go b/pkg/priority_queue/queue.go
new file mode 100644
index 00000000..c5a8a1f6
--- /dev/null
+++ b/pkg/priority_queue/queue.go
@@ -0,0 +1,19 @@
+package priorityqueue
+
+import "fmt"
+
+type QueueImpl struct {
+}
+
+func NewPriorityQueue() *QueueImpl {
+ return &QueueImpl{}
+}
+
+// Push the task
+func (q *QueueImpl) Push(item interface{}) {
+ fmt.Println(item)
+}
+
+func (q *QueueImpl) Pop() interface{} {
+ return nil
+}
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
index 46709d71..eda3572f 100644
--- a/plugins/broadcast/interface.go
+++ b/plugins/broadcast/interface.go
@@ -1,6 +1,6 @@
package broadcast
-import "github.com/spiral/roadrunner/v2/pkg/pubsub"
+import "github.com/spiral/roadrunner/v2/common/pubsub"
type Broadcaster interface {
GetDriver(key string) (pubsub.SubReader, error)
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 6ddef806..889dc2fa 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -7,7 +7,7 @@ import (
"github.com/google/uuid"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
index 2ee211f8..475076a0 100644
--- a/plugins/broadcast/rpc.go
+++ b/plugins/broadcast/rpc.go
@@ -2,7 +2,7 @@ package broadcast
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
)
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index bec01ac3..fb174792 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -143,7 +143,7 @@ func (p *Plugin) Serve() chan error {
func (p *Plugin) serve(errCh chan error) {
var err error
const op = errors.Op("http_plugin_serve")
- p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
@@ -323,7 +323,7 @@ func (p *Plugin) Reset() error {
p.pool = nil
var err error
- p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 3925ef64..f096a0af 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -38,3 +38,17 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error {
return nil
}
+
+// sort.Sort
+
+func (w *WorkerList) Len() int {
+ return len(w.Workers)
+}
+
+func (w *WorkerList) Less(i, j int) bool {
+ return w.Workers[i].Pid < w.Workers[j].Pid
+}
+
+func (w *WorkerList) Swap(i, j int) {
+ w.Workers[i], w.Workers[j] = w.Workers[j], w.Workers[i]
+}
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
new file mode 100644
index 00000000..95f476a6
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -0,0 +1,62 @@
+package ephemeral
+
+import (
+ "github.com/google/uuid"
+ "github.com/spiral/errors"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+)
+
+type JobBroker struct {
+ jobs chan *entry
+ queues map[*pipeline.Pipeline]*queue
+ pq priorityqueue.Queue
+}
+
+func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
+ jb := &JobBroker{
+ jobs: make(chan *entry, 10),
+ pq: q,
+ }
+
+ go jb.serve()
+
+ return jb, nil
+}
+
+func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) {
+ id := uuid.NewString()
+
+ j.jobs <- &entry{
+ id: id,
+ }
+
+ return id, nil
+}
+
+func (j *JobBroker) Stat() {
+ panic("implement me")
+}
+
+func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) {
+ panic("implement me")
+}
+
+func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
+ const op = errors.Op("ephemeral_register")
+ if _, ok := j.queues[pipeline]; !ok {
+ return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name()))
+ }
+
+ j.queues[pipeline] = newQueue()
+
+ return nil
+}
+
+func (j *JobBroker) serve() {
+ for item := range j.jobs {
+ // item should satisfy
+ j.pq.Push(item)
+ }
+}
diff --git a/plugins/jobs/brokers/ephemeral/config.go b/plugins/jobs/brokers/ephemeral/config.go
new file mode 100644
index 00000000..847b63ea
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/config.go
@@ -0,0 +1 @@
+package ephemeral
diff --git a/plugins/jobs/brokers/ephemeral/entry.go b/plugins/jobs/brokers/ephemeral/entry.go
new file mode 100644
index 00000000..bf8796d5
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/entry.go
@@ -0,0 +1,21 @@
+package ephemeral
+
+type entry struct {
+ id string
+}
+
+func (e *entry) ID() string {
+ return e.id
+}
+
+func (e *entry) Ask() {
+ // no-op
+}
+
+func (e *entry) Nack() {
+ // no-op
+}
+
+func (e *entry) Payload() []byte {
+ panic("implement me")
+}
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
new file mode 100644
index 00000000..84cc871b
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -0,0 +1,28 @@
+package ephemeral
+
+import (
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "ephemeral"
+)
+
+type Plugin struct {
+ log logger.Logger
+}
+
+func (p *Plugin) Init(log logger.Logger) error {
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) InitJobBroker(q priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewJobBroker(q)
+}
diff --git a/plugins/jobs/brokers/ephemeral/queue.go b/plugins/jobs/brokers/ephemeral/queue.go
new file mode 100644
index 00000000..1c6d865b
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/queue.go
@@ -0,0 +1,7 @@
+package ephemeral
+
+type queue struct{}
+
+func newQueue() *queue {
+ return &queue{}
+}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
new file mode 100644
index 00000000..bb042ec9
--- /dev/null
+++ b/plugins/jobs/config.go
@@ -0,0 +1,71 @@
+package jobs
+
+import (
+ "github.com/spiral/errors"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/dispatcher"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+)
+
+// Config defines settings for job broker, workers and job-pipeline mapping.
+type Config struct {
+ // Workers configures roadrunner server and worker busy.
+ // Workers *roadrunner.ServerConfig
+ poolCfg *poolImpl.Config
+
+ // Dispatch defines where and how to match jobs.
+ Dispatch map[string]*structs.Options
+
+ // Pipelines defines mapping between PHP job pipeline and associated job broker.
+ Pipelines map[string]*pipeline.Pipeline
+
+ // Consuming specifies names of pipelines to be consumed on service start.
+ Consume []string
+
+ // parent config for broken options.
+ pipelines pipeline.Pipelines
+ route dispatcher.Dispatcher
+}
+
+func (c *Config) InitDefaults() error {
+ const op = errors.Op("config_init_defaults")
+ var err error
+ c.pipelines, err = pipeline.InitPipelines(c.Pipelines)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ if c.poolCfg == nil {
+ c.poolCfg = &poolImpl.Config{}
+ }
+
+ c.poolCfg.InitDefaults()
+
+ return nil
+}
+
+// MatchPipeline locates the pipeline associated with the job.
+func (c *Config) MatchPipeline(job *structs.Job) (*pipeline.Pipeline, *structs.Options, error) {
+ const op = errors.Op("config_match_pipeline")
+ opt := c.route.Match(job)
+
+ pipe := ""
+ if job.Options != nil {
+ pipe = job.Options.Pipeline
+ }
+
+ if pipe == "" && opt != nil {
+ pipe = opt.Pipeline
+ }
+
+ if pipe == "" {
+ return nil, nil, errors.E(op, errors.Errorf("unable to locate pipeline for `%s`", job.Job))
+ }
+
+ if p := c.pipelines.Get(pipe); p != nil {
+ return p, opt, nil
+ }
+
+ return nil, nil, errors.E(op, errors.Errorf("undefined pipeline `%s`", pipe))
+}
diff --git a/plugins/jobs/dispatcher/dispatcher.go b/plugins/jobs/dispatcher/dispatcher.go
new file mode 100644
index 00000000..e73e7b74
--- /dev/null
+++ b/plugins/jobs/dispatcher/dispatcher.go
@@ -0,0 +1,49 @@
+package dispatcher
+
+import (
+ "strings"
+
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+)
+
+var separators = []string{"/", "-", "\\"}
+
+// Dispatcher provides ability to automatically locate the pipeline for the specific job
+// and update job options (if none set).
+type Dispatcher map[string]*structs.Options
+
+// pre-compile patterns
+func initDispatcher(routes map[string]*structs.Options) Dispatcher {
+ dispatcher := make(Dispatcher)
+ for pattern, opts := range routes {
+ pattern = strings.ToLower(pattern)
+ pattern = strings.Trim(pattern, "-.*")
+
+ for _, s := range separators {
+ pattern = strings.ReplaceAll(pattern, s, ".")
+ }
+
+ dispatcher[pattern] = opts
+ }
+
+ return dispatcher
+}
+
+// Match clarifies target job pipeline and other job options. Can return nil.
+func (dispatcher Dispatcher) Match(job *structs.Job) (found *structs.Options) {
+ var best = 0
+
+ jobName := strings.ToLower(job.Job)
+ for pattern, opts := range dispatcher {
+ if strings.HasPrefix(jobName, pattern) && len(pattern) > best {
+ found = opts
+ best = len(pattern)
+ }
+ }
+
+ if best == 0 {
+ return nil
+ }
+
+ return found
+}
diff --git a/plugins/jobs/dispatcher/dispatcher_test.go b/plugins/jobs/dispatcher/dispatcher_test.go
new file mode 100644
index 00000000..e584bda8
--- /dev/null
+++ b/plugins/jobs/dispatcher/dispatcher_test.go
@@ -0,0 +1,55 @@
+package dispatcher
+
+import (
+ "testing"
+
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_Map_All(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{"default": {Pipeline: "default"}})
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "default"}).Pipeline)
+}
+
+func Test_Map_Miss(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{"some.*": {Pipeline: "default"}})
+
+ assert.Nil(t, m.Match(&structs.Job{Job: "miss"}))
+}
+
+func Test_Map_Best(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{
+ "some.*": {Pipeline: "default"},
+ "some.other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline)
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline)
+}
+
+func Test_Map_BestUpper(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{
+ "some.*": {Pipeline: "default"},
+ "some.Other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline)
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.OTHER"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "Some.other.job"}).Pipeline)
+}
+
+func Test_Map_BestReversed(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{
+ "some.*": {Pipeline: "default"},
+ "some.other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline)
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline)
+}
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
new file mode 100644
index 00000000..be941255
--- /dev/null
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-06-29T17:21:10.029Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.106 Electron/13.1.4 Safari/537.36" etag="Bl_lIai7C62ty2EayK1b" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7V1bc9o4FP41zKSdSca2fH3MtU03aWhots2+GSzAjbGobZLQX7+SLINtCXDAMtDA7jQg32R9536OpBY4H71+itzx8BZ5MGhpivfaAhctTVMdw8Z/SMuUtajASlsGke+xtnlDx/8DWaPCWie+B+PCiQlCQeKPi409FIawlxTa3ChCL8XT+igoPnXsDiDX0Om5Ad/6w/eSYdY7/Jkf+Qz9wTDhDo3c7HzWEA9dD73kmsBlC5xHCCXpt9HrOQzIAGZDk153teDorG8RDJMqF3Tasfl4fjt1vyB9qA6+fr18io511rlnN5iwl76/u/uOW77cnXXwn/bNw6frr+wNkmk2MhGahB4kd1Za4Oxl6CewM3Z75OgLJgbcNkxGAf6l4q99FCZX7sgPCB10rvCdblGI8AE38AchbovSETwL3C4M2ij2Ex+R9gD2SfMzjBIfo3JTOpygce7oKbtZFyUJGpHH+kFwjgIU0S6Dfh+avR5uj5MIPcHcEc9yugp5D35I2SiTZ8DXXBMb4k8QjWASTfEp7KgONO3ESK/KyN42WcPLnIg0U0/bhnny0R1GvIxwB7P7z4HFXxi2Ypx/huHrZxVf9/wnCa1P3+B/Su9Y0zmcN8S0OLieC+2+cHDNng27fUYFuXaFfpZRRx1oZJKEQWEbJo+ErQiQyLi2fiS4cYceFjrsJ4qSIRqg0A0u561nRWTm59wgwgEUj18wSaZMgrqTBBXRwiMYTX+S6zFtsp+P+WMXr+zm6a8p+/V2bGI0iXpw2fuzAUjcaACTJSeyG5LRWQp1BAM38Z+LIlsEG7u0jXzc5zmJWEVutVSzeIu0o+yqvHzlblSkNQuUbpQODXej0yhyp7nTxuSEmKOz2RuvT3pgK6T36ic5ysO/HjNSw9/ndEd+TPNEuBvkqlakVr0pai0SmaFUo9a6SCiTwDlzoY3vpBwTU+Gh8xnfNP2DOzQNe/XqGAPani7SMbbWBaa5LR2jlCAxeR2j6gIdo9uSVIyzVT7Pc/mM59/G50rzfK5X1UpKM4wOylSlNczoB2UhkYjMhrSF6ZzYjmkYqoVNYWA5BZICZrMkxbsgt27SGxLNcd2+vLn+elmruoAqVhiWSF04pgXcramLkkuii9SFJlAXAEhSF9Y2OV3N8/mJtTf6wqjI6rVz+kZQ23sn1NUCznPYdw/p2v3VjZA2OGHb0syARNPGBQIwf09IAPJshF/PD1vgFB9Vxq/4XzpoStp+TENt5JieO4YlXnLMYnjkWA+PLYyKhz3YQ5GbBuzIOYSWosAP4fzR+NuA/aUd7GYNX1AXD8tpJ4kmvWQSwewEPCDd8kW4bVxuG0bllrXePY1CkoM2OVjuNu4lveKC3jvyw8HsRWbPb7vTALneyvPuxmSk4tl5s9+ilyxxcjkOmkZHsxArC6UifFY/oGFooi4ZxzDWVbVlHLRY0/YN8p9I05r0w2va9FOPQjX1okI1dYFCNQUK1ZEV4jP/At7LKG/n2a/tj2H6Tit46wLL3enKs06TBI7GyZwFqTG8+CVlvNI9ocJid3O96HLD2lS/vvsjiCbJOp06yCp61LbWlFWqKktYqSqHztbM/8rWv7ZtTz+Do/m48GZgC9K9mKPhCfXEj36h7odaffC+3YPinGvXNnRD4bmtER9ctUvBNVDVB5fGhRoHTASx0g3zRmAJGKLji6OfCVBmDPC58ZHveSkDw9j/43bprchos6QTvq9x1jIuyL0wz8ZM1nIAhiiEkjECilXESLN4jFRREt2ShREfwBpj46OlkQQIepcg6WYRJF2xeZAyIJsBiXd8cxbiMbZI3BGRWcwuwZbPGI8mswaNC59g0ifSzTprWRfvDU5LK8KZxRwLhStak3BuNTi5p2mILL2wOmRl122drJOH0KyGU1u8fz6C+IlktOeiYpFAfwcpCbB9c2i7Kew9ZXu7IttrDeWwl7O92nD6MRueHNt/gqS3Z4QXSeisH6ERGb8hpO8+8GOs0SGJ2nbpKSQo9Oy7TE6cpNcd1ewx9fvagipVs2sauyIiNK2iiDBliQiNd2Xbk3h4NDfJU29WwVyOH5/DuUa09qTsVa2Kli4NLb5wbAEcf7mFbdtFZGzeYTKMBg1sjQ88cJCgSUKMovPZLAtB3Ab/d0UeezaIXM+H82PZoBYZZ3a6EAHPjYczjlxU/T+r818eTJ5RTWlCQjmOnNkW5JFuPE5ftO+/kn6khASjy2eY0hOlnaE7JheMXgdk+suJ+xLrJ5OYPksqCQG1QEK2xTO3ZfMkZMkqN9QAL4k/t3HDj7v7fy7vOXLKBm4coR6M49VCt+v2ngZUTN+llLhAde5msNEwi8LYdvQTfj6IWBzXEAf+Y/a9F2Bf6U+j08dPk9e25Xw+5hFrLkMpSD2mIZmA/zUPx5TiNqVfC3JQszQlFl3xZESNvKPUlvuwpWymcKi49NuycaRncaPYSdzk6GMW6iJ2z9FHbADFJ+RAag/BKEKF964naSjrjag193EevCOv8PFLatMdZenbPXwvRosFsPak6/fMJyoSWgtcsf8xMFi6Blh3ElcpRsEzjBe83QKdEFG9mxf+fBZYpC2kCnDsxxaNNl3h46LAEgnwOgxqoQQ/BEiWBkiEYyao5BOeV3tUdCOkBQkNOsNmIzdWZnLV1IoGj6Nn6dZCVkhSPHEZorkx/H1CtCMxS4YudmuCXR7PkjfvAEcwnrLis/H9RVf59fTrh/X0reP+PP9hfn/IKtz2SPo0Wkd87zzcxMZzp2MH/z3+1GL3h38jmkkoHFs5MwkXzzIVx25VxypFKgzbytMNf4WZacX1rzA1q0Sbab/XDQQvAyInDL6efu9sKADWSvQIKWyRj1qHKHG08oAroqoKSZFBIblvdUrCeuVnTYoS4ZhtTZRshPT+mayNKo2NkAa7oTS0snwBtrFUBWAJtPEVhlqv0hADoewO9ap7TL3LNPLOUa/tLDdgeFp8+xV1mzxiILZa5J2b17nv5Cs8T9sR8jXKxKivIEZl8yss0AT5chZ7O/JR5CdkfL49XD7UO3PcdBWl3xdZ78BSlPNz3kjv04/sQAAvO0TWu6y8vhCZHSrPrChWGg1DLhMXe2a9axwP3l7e3t0/1sp5e+M3W+q2/WZjdzhvuTclgYPAtjioplCarq4IjOllSa834ePwVQGnt9/a75PBdWfbDM7PNpKwMmupnNFWFAuI0Dk1FEUXVNA0Y/gAu4rhI5o3Kw0dwYS969EYi1SCRm8IR5BD5u+ubFRVUExTqUDls+TSJoOJQeIlGqtPIG8boWffo1Xd7wsoUymtTzyrUMsDlc0BqRso4TLUvIOnnuDfc7Bceg8m+XLILar88vznclM8dsOsTaN3T296HfqJjwH+Q2f9kAm3EA8mGTCfMjSdAJA9knQlJA0xpH9YYWiLlZmnUwfa33I1L/nH5poFHVzRZ0D63KbLL/izLnvF4cj3gsxsJJAmZIW1JziNU+DoYZ/0OHRHMKZ3m13TQ2HfH+AvR+7o9zgtr+pCN4wTN3hi1VZJ78NsFChg5B7HFK1ZkV2+bu8tA1GZEUsrgzfNhiUteEU/NbEnF4V0FIdjT8cRyFFdl8SevK4rmiKdy/t/L/dS8UkEkluYSAVAIGct0KSc3XIcePN4zTxC88Z4zZuRznuhwsEUeKHC8xiYzcdxlvU6x8uMezfyI1bt8DBj9Ppq9utg0SxdM3f7BFs1iEurZLHoDpVW7T2HCtbjXcYTO8KhvOP/HrMdPG8CW6vGm3U4/cskeR6ZO0F8bIhG3UmFeU381N9+TxEB0e+vmqfEy1mJUtNRS8gYBi81RbuqSAOGr+/Fo/J2OVrd0ZhNPsRu4CnZaYrccwzDtIXJWXs9LBfJwXUz4+vLT6tm+ckIKHP3N10iX1WM8lZLRsW1RN4awweKrfDPWl6pJLpGt/USsW8WxxfC4XDcMIV8lKtOdkin3G6DF95ofMjnhcobRNTLDJpjmGViM61SQKC+FTbEr36wXeuzXUUrTIpPNHbKejW2mphecz3RvScCfTdS3UC1OZVsKQWVV+kak0XFFl6ja/qKa+SoVsECqv+e3mzskVU265dsfbVgs6xGwyemXbEgDcgKVQvysm1EEiHYImFroRMmIEurkrzFCMVpLnDmYKcXHkLYFGCT38zU0PggNpC1DqsYYj6E+cP1kwN2BeyA4qhVsLMbhY6Pbf2bYqb4aSrSg9hz8GhylWQb+Qyv3yf9J1eEkKY/T8//yVp6JMOouB5ppgSxIkEsWFlh8dYI+fxlLs066/8h5bl+QMkEnDbPloDMLz2rCpY1kpb0NLSDN1OfIStYe0FsyG5tw6yl/c7rGrqqFa3Au7u7acrwE8SKbWFMBZjAAZ58w48z8x21oulnyMKKD89f/rw8f/hOShOOfp94iKzVWSh3mURRi9b5BPwqnu/TbMCymIsjWTq/SKG0VQrF0PIB/osIMz+teerA6Jmgeag+KSwBqpWrT4wZz+ZZ1BDoVHk47tAc8IbnC9WvU7WqwSEgJzjEr/esG1xUZra1SkMxaMGywKmkaM3K1LajsLda6KIDh0NGF+091Wipy4zS91Ya7JCFrWkVpUH9q79vRgSCgB1X7sKW+wy78VjkNP+F5S8ijgWmYEJFowUwGp9aPoTeeOz00jZ/qiFYMLLRwBvgY6ZtN47TzQ1ohqeVm8xwQJGgaGilaZvAEq7cbKhNArlDOfbl8373QGeCygWiu5VjB3wY/YYszEsjG9l2Fx41e4/JUvZ+3+/hlt8TKJDRkuxfzydr+aZL9b/AOOE17wabZ0hSsIYumKAha1cTcfjzEHQus/eixPbiVPgGAqFqmBo4uyUQBIsE31/f3V9/f5wb0YrEdYMXmcir3OIK1rckXreUijM9dFMWaHy8+h72ICYWj4lwZlSnszFPLuCzn85MFWdGEYlxfLkjy7kfjeAIRdMPbG9ScsANyYD0UEiGjRpkktOjmNfoLWa7x6ev8+GQH327P8EvGifIjooXoJdFvHxEnmzY0MpSKxnpsvnHuQKbXO58vGpey7v1PEr+o5XdJF90Yzfqd/CeP537Utw0j8x8l2peMiYvaZGu6nl94cwZVbGAA3m4ZFuSjmqQTeqrKRj7RFZKVF+80Q8ZEKoQsjGZC/ZMfXOy/ku6ZP6LT5/cpYjftduXF61CWpVNDh8HE7JJyVyup498s2DfNVavlXZMVh07pxzAs7rmaA2yur7dJF0+wFC1gruutUIlTEGtGmKov4x/vQpuzeRLOwwzT1SVrrHZTvBSq7F1Pg5Sfd+kdPsjZf7vNjYmY+a65D3JxLs7pSJMWTomaUt5ZE7jp6M1dq2quxtf3d5O9KPtTgPkerS6KNU43WkCxf0qCVZp+16tcrx7PWgscLwbKCADwOAqPi2FrzJSdZHik1ZBpvM7w9PN5XijdgtTvBfBwSFdAz78FG/bEhq0jc7y1vmAyd88zfuNlUcb2ChmRRul8iyzuid6G+WpDGQp8uJtJNcV6Xy842+eVG0dSC8lPUXlSA8o1Ujvraa35hicGY2blpregms0RbWXXgOw07niGjnmusHX5qX2b86a3tuoQL1zt2zeoVIF5XqyVlMVw8ev0pk5AqXyrLLbk87LovGiEB3T6Zh9mpSIESnYZqnqvVqIVSr6hsOhbwhK8DVBCT5YIuI2Q5/3tYv+1wHu+uB2bL5erGG4BfH+ebaGVZIobjDgDfDtw1gDKtizKS9yNZ8Ys2pN+DVEMP4ZIRJVmCtc7EwOb5EHyRn/Aw==</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go
new file mode 100644
index 00000000..a0aed50b
--- /dev/null
+++ b/plugins/jobs/interface.go
@@ -0,0 +1,26 @@
+package jobs
+
+import (
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+)
+
+// Consumer todo naming
+type Consumer interface {
+ Push(*pipeline.Pipeline, *structs.Job) (string, error)
+ Stat()
+ Consume(*pipeline.Pipeline)
+ Register(*pipeline.Pipeline) error
+}
+
+type Broker interface {
+ InitJobBroker(queue priorityqueue.Queue) (Consumer, error)
+}
+
+type Item interface {
+ ID() string
+ Ask()
+ Nack()
+ Payload() []byte
+}
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
new file mode 100644
index 00000000..f27f6ede
--- /dev/null
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -0,0 +1,172 @@
+package pipeline
+
+import (
+ "time"
+
+ "github.com/spiral/errors"
+)
+
+// Pipelines is list of Pipeline.
+
+type Pipelines []*Pipeline
+
+func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) {
+ const op = errors.Op("pipeline_init")
+ out := make(Pipelines, 0)
+
+ for name, pipe := range pipes {
+ if pipe.Broker() == "" {
+ return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker"))
+ }
+
+ p := pipe.With("name", name)
+ out = append(out, &p)
+ }
+
+ return out, nil
+}
+
+// Reverse returns pipelines in reversed order.
+func (ps Pipelines) Reverse() Pipelines {
+ out := make(Pipelines, len(ps))
+
+ for i, p := range ps {
+ out[len(ps)-i-1] = p
+ }
+
+ return out
+}
+
+// Broker return pipelines associated with specific broker.
+func (ps Pipelines) Broker(broker string) Pipelines {
+ out := make(Pipelines, 0)
+
+ for _, p := range ps {
+ if p.Broker() != broker {
+ continue
+ }
+
+ out = append(out, p)
+ }
+
+ return out
+}
+
+// Names returns only pipelines with specified names.
+func (ps Pipelines) Names(only ...string) Pipelines {
+ out := make(Pipelines, 0)
+
+ for _, name := range only {
+ for _, p := range ps {
+ if p.Name() == name {
+ out = append(out, p)
+ }
+ }
+ }
+
+ return out
+}
+
+// Get returns pipeline by it'svc name.
+func (ps Pipelines) Get(name string) *Pipeline {
+ // possibly optimize
+ for _, p := range ps {
+ if p.Name() == name {
+ return p
+ }
+ }
+
+ return nil
+}
+
+// Pipeline defines pipeline options.
+type Pipeline map[string]interface{}
+
+// With pipeline value. Immutable.
+func (p Pipeline) With(name string, value interface{}) Pipeline {
+ out := make(map[string]interface{})
+ for k, v := range p {
+ out[k] = v
+ }
+ out[name] = value
+
+ return out
+}
+
+// Name returns pipeline name.
+func (p Pipeline) Name() string {
+ return p.String("name", "")
+}
+
+// Broker associated with the pipeline.
+func (p Pipeline) Broker() string {
+ return p.String("broker", "")
+}
+
+// Has checks if value presented in pipeline.
+func (p Pipeline) Has(name string) bool {
+ if _, ok := p[name]; ok {
+ return true
+ }
+
+ return false
+}
+
+// Map must return nested map value or empty config.
+func (p Pipeline) Map(name string) Pipeline {
+ out := make(map[string]interface{})
+
+ if value, ok := p[name]; ok {
+ if m, ok := value.(map[string]interface{}); ok {
+ for k, v := range m {
+ out[k] = v
+ }
+ }
+ }
+
+ return out
+}
+
+// Bool must return option value as string or return default value.
+func (p Pipeline) Bool(name string, d bool) bool {
+ if value, ok := p[name]; ok {
+ if b, ok := value.(bool); ok {
+ return b
+ }
+ }
+
+ return d
+}
+
+// String must return option value as string or return default value.
+func (p Pipeline) String(name string, d string) string {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(string); ok {
+ return str
+ }
+ }
+
+ return d
+}
+
+// Integer must return option value as string or return default value.
+func (p Pipeline) Integer(name string, d int) int {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(int); ok {
+ return str
+ }
+ }
+
+ return d
+}
+
+// Duration must return option value as time.Duration (seconds) or return default value.
+func (p Pipeline) Duration(name string, d time.Duration) time.Duration {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(int); ok {
+ return time.Second * time.Duration(str)
+ }
+ }
+
+ return d
+}
diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go
new file mode 100644
index 00000000..f03dcbb8
--- /dev/null
+++ b/plugins/jobs/pipeline/pipeline_test.go
@@ -0,0 +1,90 @@
+package pipeline
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestPipeline_Map(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
+
+ assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0))
+ assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0))
+}
+
+func TestPipeline_MapString(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}}
+
+ assert.Equal(t, "default", pipe.Map("options").String("alias", ""))
+ assert.Equal(t, "", pipe.Map("other").String("alias", ""))
+}
+
+func TestPipeline_Bool(t *testing.T) {
+ pipe := Pipeline{"value": true}
+
+ assert.Equal(t, true, pipe.Bool("value", false))
+ assert.Equal(t, true, pipe.Bool("other", true))
+}
+
+func TestPipeline_String(t *testing.T) {
+ pipe := Pipeline{"value": "value"}
+
+ assert.Equal(t, "value", pipe.String("value", ""))
+ assert.Equal(t, "value", pipe.String("other", "value"))
+}
+
+func TestPipeline_Integer(t *testing.T) {
+ pipe := Pipeline{"value": 1}
+
+ assert.Equal(t, 1, pipe.Integer("value", 0))
+ assert.Equal(t, 1, pipe.Integer("other", 1))
+}
+
+func TestPipeline_Duration(t *testing.T) {
+ pipe := Pipeline{"value": 1}
+
+ assert.Equal(t, time.Second, pipe.Duration("value", 0))
+ assert.Equal(t, time.Second, pipe.Duration("other", time.Second))
+}
+
+func TestPipeline_Has(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
+
+ assert.Equal(t, true, pipe.Has("options"))
+ assert.Equal(t, false, pipe.Has("other"))
+}
+
+func TestPipeline_FilterBroker(t *testing.T) {
+ pipes := Pipelines{
+ &Pipeline{"name": "first", "broker": "a"},
+ &Pipeline{"name": "second", "broker": "a"},
+ &Pipeline{"name": "third", "broker": "b"},
+ &Pipeline{"name": "forth", "broker": "b"},
+ }
+
+ filtered := pipes.Names("first", "third")
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "a", filtered[0].Broker())
+ assert.Equal(t, "b", filtered[1].Broker())
+
+ filtered = pipes.Names("first", "third").Reverse()
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "a", filtered[1].Broker())
+ assert.Equal(t, "b", filtered[0].Broker())
+
+ filtered = pipes.Broker("a")
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "first", filtered[0].Name())
+ assert.Equal(t, "second", filtered[1].Name())
+
+ filtered = pipes.Broker("a").Reverse()
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "first", filtered[1].Name())
+ assert.Equal(t, "second", filtered[0].Name())
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
new file mode 100644
index 00000000..ab7222ae
--- /dev/null
+++ b/plugins/jobs/plugin.go
@@ -0,0 +1,164 @@
+package jobs
+
+import (
+ "context"
+ "fmt"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+)
+
+const (
+ // RrJobs env variable
+ RrJobs string = "rr_jobs"
+ PluginName string = "jobs"
+)
+
+type Plugin struct {
+ cfg *Config
+ log logger.Logger
+
+ workersPool pool.Pool
+ server server.Server
+
+ brokers map[string]Broker
+ consumers map[string]Consumer
+
+ events events.Handler
+
+ // priority queue implementation
+ queue priorityqueue.Queue
+}
+
+func testListener(data interface{}) {
+ fmt.Println(data)
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, pq priorityqueue.Queue) error {
+ const op = errors.Op("jobs_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = p.cfg.InitDefaults()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.server = server
+ p.events = events.NewEventsHandler()
+ p.events.AddListener(testListener)
+ p.brokers = make(map[string]Broker)
+ p.consumers = make(map[string]Consumer)
+
+ // initialize priority queue
+ p.queue = pq
+ p.log = log
+
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ for name := range p.brokers {
+ jb, err := p.brokers[name].InitJobBroker(p.queue)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ p.consumers[name] = jb
+ }
+
+ var err error
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // initialize sub-plugins
+ // provide a queue to them
+ // start consume loop
+ // start resp loop
+
+ /*
+ go func() {
+ for {
+ // get data JOB from the queue
+ job := p.queue.Pop()
+
+ // request
+ _ = job
+ p.workersPool.Exec(nil)
+ }
+ }()
+
+ */
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectMQBrokers,
+ }
+}
+
+func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) {
+ p.brokers[name.Name()] = c
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Push(j *structs.Job) (string, error) {
+ pipe, pOpts, err := p.cfg.MatchPipeline(j)
+ if err != nil {
+ panic(err)
+ }
+
+ if pOpts != nil {
+ j.Options.Merge(pOpts)
+ }
+
+ broker, ok := p.consumers[pipe.Broker()]
+ if !ok {
+ panic("broker not found")
+ }
+
+ id, err := broker.Push(pipe, j)
+ if err != nil {
+ panic(err)
+ }
+
+ // p.events.Push()
+
+ return id, nil
+}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ log: p.log,
+ p: p,
+ }
+}
diff --git a/plugins/jobs/pq_plugin/plugin.go b/plugins/jobs/pq_plugin/plugin.go
new file mode 100644
index 00000000..7df846ac
--- /dev/null
+++ b/plugins/jobs/pq_plugin/plugin.go
@@ -0,0 +1,34 @@
+package pq_plugin //nolint:stylecheck
+
+import (
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "internal_pq"
+)
+
+type Plugin struct {
+ log logger.Logger
+ pq priorityqueue.Queue
+}
+
+func (p *Plugin) Init(log logger.Logger) error {
+ p.log = log
+ p.pq = priorityqueue.NewPriorityQueue()
+ return nil
+}
+
+func (p *Plugin) Push(item interface{}) {
+ p.pq.Push(item)
+ // no-op
+}
+
+func (p *Plugin) Pop() interface{} {
+ return p.pq.Pop()
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
new file mode 100644
index 00000000..e77cda59
--- /dev/null
+++ b/plugins/jobs/rpc.go
@@ -0,0 +1,20 @@
+package jobs
+
+import (
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type rpc struct {
+ log logger.Logger
+ p *Plugin
+}
+
+func (r *rpc) Push(j *structs.Job, idRet *string) error {
+ id, err := r.p.Push(j)
+ if err != nil {
+ panic(err)
+ }
+ *idRet = id
+ return nil
+}
diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go
new file mode 100644
index 00000000..2e394543
--- /dev/null
+++ b/plugins/jobs/structs/job.go
@@ -0,0 +1,35 @@
+package structs
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+// Job carries information about single job.
+type Job struct {
+ // Job contains name of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Body packs job payload into binary payload.
+func (j *Job) Body() []byte {
+ return utils.AsBytes(j.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+func (j *Job) Context(id string) []byte {
+ ctx, _ := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ }{ID: id, Job: j.Job},
+ )
+
+ return ctx
+}
diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go
new file mode 100644
index 00000000..1507d053
--- /dev/null
+++ b/plugins/jobs/structs/job_options.go
@@ -0,0 +1,70 @@
+package structs
+
+import "time"
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int `json:"delay,omitempty"`
+
+ // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
+ // Minimum valuable value is 2.
+ Attempts int `json:"maxAttempts,omitempty"`
+
+ // RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
+ RetryDelay int `json:"retryDelay,omitempty"`
+
+ // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
+ Timeout int `json:"timeout,omitempty"`
+}
+
+// Merge merges job options.
+func (o *Options) Merge(from *Options) {
+ if o.Pipeline == "" {
+ o.Pipeline = from.Pipeline
+ }
+
+ if o.Attempts == 0 {
+ o.Attempts = from.Attempts
+ }
+
+ if o.Timeout == 0 {
+ o.Timeout = from.Timeout
+ }
+
+ if o.RetryDelay == 0 {
+ o.RetryDelay = from.RetryDelay
+ }
+
+ if o.Delay == 0 {
+ o.Delay = from.Delay
+ }
+}
+
+// CanRetry must return true if broker is allowed to re-run the job.
+func (o *Options) CanRetry(attempt int) bool {
+ // Attempts 1 and 0 has identical effect
+ return o.Attempts > (attempt + 1)
+}
+
+// RetryDuration returns retry delay duration in a form of time.Duration.
+func (o *Options) RetryDuration() time.Duration {
+ return time.Second * time.Duration(o.RetryDelay)
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
diff --git a/plugins/jobs/structs/job_options_test.go b/plugins/jobs/structs/job_options_test.go
new file mode 100644
index 00000000..18702394
--- /dev/null
+++ b/plugins/jobs/structs/job_options_test.go
@@ -0,0 +1,110 @@
+package structs
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestOptions_CanRetry(t *testing.T) {
+ opts := &Options{Attempts: 0}
+
+ assert.False(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+}
+
+func TestOptions_CanRetry_SameValue(t *testing.T) {
+ opts := &Options{Attempts: 1}
+
+ assert.False(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+}
+
+func TestOptions_CanRetry_Value(t *testing.T) {
+ opts := &Options{Attempts: 2}
+
+ assert.True(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+ assert.False(t, opts.CanRetry(2))
+}
+
+func TestOptions_CanRetry_Value3(t *testing.T) {
+ opts := &Options{Attempts: 3}
+
+ assert.True(t, opts.CanRetry(0))
+ assert.True(t, opts.CanRetry(1))
+ assert.False(t, opts.CanRetry(2))
+}
+
+func TestOptions_RetryDuration(t *testing.T) {
+ opts := &Options{RetryDelay: 0}
+ assert.Equal(t, time.Duration(0), opts.RetryDuration())
+}
+
+func TestOptions_RetryDuration2(t *testing.T) {
+ opts := &Options{RetryDelay: 1}
+ assert.Equal(t, time.Second, opts.RetryDuration())
+}
+
+func TestOptions_DelayDuration(t *testing.T) {
+ opts := &Options{Delay: 0}
+ assert.Equal(t, time.Duration(0), opts.DelayDuration())
+}
+
+func TestOptions_DelayDuration2(t *testing.T) {
+ opts := &Options{Delay: 1}
+ assert.Equal(t, time.Second, opts.DelayDuration())
+}
+
+func TestOptions_TimeoutDuration(t *testing.T) {
+ opts := &Options{Timeout: 0}
+ assert.Equal(t, time.Minute*30, opts.TimeoutDuration())
+}
+
+func TestOptions_TimeoutDuration2(t *testing.T) {
+ opts := &Options{Timeout: 1}
+ assert.Equal(t, time.Second, opts.TimeoutDuration())
+}
+
+func TestOptions_Merge(t *testing.T) {
+ opts := &Options{}
+
+ opts.Merge(&Options{
+ Pipeline: "pipeline",
+ Delay: 2,
+ Timeout: 1,
+ Attempts: 1,
+ RetryDelay: 1,
+ })
+
+ assert.Equal(t, "pipeline", opts.Pipeline)
+ assert.Equal(t, 1, opts.Attempts)
+ assert.Equal(t, 2, opts.Delay)
+ assert.Equal(t, 1, opts.Timeout)
+ assert.Equal(t, 1, opts.RetryDelay)
+}
+
+func TestOptions_MergeKeepOriginal(t *testing.T) {
+ opts := &Options{
+ Pipeline: "default",
+ Delay: 10,
+ Timeout: 10,
+ Attempts: 10,
+ RetryDelay: 10,
+ }
+
+ opts.Merge(&Options{
+ Pipeline: "pipeline",
+ Delay: 2,
+ Timeout: 1,
+ Attempts: 1,
+ RetryDelay: 1,
+ })
+
+ assert.Equal(t, "default", opts.Pipeline)
+ assert.Equal(t, 10, opts.Attempts)
+ assert.Equal(t, 10, opts.Delay)
+ assert.Equal(t, 10, opts.Timeout)
+ assert.Equal(t, 10, opts.RetryDelay)
+}
diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go
new file mode 100644
index 00000000..e7240c6b
--- /dev/null
+++ b/plugins/jobs/structs/job_test.go
@@ -0,0 +1,19 @@
+package structs
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestJob_Body(t *testing.T) {
+ j := &Job{Payload: "hello"}
+
+ assert.Equal(t, []byte("hello"), j.Body())
+}
+
+func TestJob_Context(t *testing.T) {
+ j := &Job{Job: "job"}
+
+ assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id"))
+}
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 47d37cc2..0f737fbd 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -9,8 +9,8 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
index 6ae1a1f6..c839130f 100644
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -2,12 +2,15 @@ package boltdb
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-const PluginName = "boltdb"
+const (
+ PluginName string = "boltdb"
+ RootPluginName string = "kv"
+)
// Plugin BoltDB K/V storage.
type Plugin struct {
@@ -21,7 +24,7 @@ type Plugin struct {
}
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(kv.PluginName) {
+ if !cfg.Has(RootPluginName) {
return errors.E(errors.Disabled)
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 14e7c078..42e342ac 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,8 +6,8 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
+ kv "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index 22ea5cca..59a2b7cb 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -2,12 +2,15 @@ package memcached
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-const PluginName = "memcached"
+const (
+ PluginName string = "memcached"
+ RootPluginName string = "kv"
+)
type Plugin struct {
// config plugin
@@ -17,7 +20,7 @@ type Plugin struct {
}
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(kv.PluginName) {
+ if !cfg.Has(RootPluginName) {
return errors.E(errors.Disabled)
}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index 03dbaed6..e9ea25df 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -5,10 +5,12 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
+// PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync.
const PluginName string = "kv"
const (
@@ -25,9 +27,9 @@ const (
type Plugin struct {
log logger.Logger
// constructors contains general storage constructors, such as boltdb, memory, memcached, redis.
- constructors map[string]Constructor
+ constructors map[string]kv.Constructor
// storages contains user-defined storages, such as boltdb-north, memcached-us and so on.
- storages map[string]Storage
+ storages map[string]kv.Storage
// KV configuration
cfg Config
cfgPlugin config.Configurer
@@ -43,8 +45,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
if err != nil {
return errors.E(op, err)
}
- p.constructors = make(map[string]Constructor, 5)
- p.storages = make(map[string]Storage, 5)
+ p.constructors = make(map[string]kv.Constructor, 5)
+ p.storages = make(map[string]kv.Storage, 5)
p.log = log
p.cfgPlugin = cfg
return nil
@@ -203,7 +205,7 @@ func (p *Plugin) Collects() []interface{} {
}
}
-func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) {
+func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor kv.Constructor) {
// save the storage constructor
p.constructors[name.Name()] = constructor
}
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 3f7ba97c..ad4aefa9 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,6 +2,7 @@ package kv
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
@@ -9,7 +10,7 @@ import (
// Wrapper for the plugin
type rpc struct {
// all available storages
- storages map[string]Storage
+ storages map[string]kv.Storage
// svc is a plugin implementing Storage interface
srv *Plugin
// Logger
diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go
index c13c2314..3cec1f97 100644
--- a/plugins/memory/kv.go
+++ b/plugins/memory/kv.go
@@ -6,8 +6,8 @@ import (
"time"
"github.com/spiral/errors"
+ kv "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 70badf15..7d418a70 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,9 +2,9 @@ package memory
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/kv"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -21,7 +21,6 @@ type Plugin struct {
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
- p.log = log
p.cfgPlugin = cfg
p.stop = make(chan struct{}, 1)
return nil
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index d027a8a5..3c909900 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -3,8 +3,8 @@ package memory
import (
"sync"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/bst"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go
index 5817853c..0cd62d19 100644
--- a/plugins/redis/channel.go
+++ b/plugins/redis/channel.go
@@ -6,7 +6,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go
index 2e4b9bfd..5bf03af1 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv.go
@@ -7,8 +7,8 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 9d98790b..3c62a63f 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -5,9 +5,9 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/kv"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index 4e41acb5..8bd78514 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -6,7 +6,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/server/interface.go b/plugins/server/interface.go
index 0424d52d..b0f84a7f 100644
--- a/plugins/server/interface.go
+++ b/plugins/server/interface.go
@@ -19,5 +19,5 @@ type Server interface {
// NewWorker return a new worker with provided and attached by the user listeners and environment variables
NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error)
// NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration
- NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
+ NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 00639f43..038d83d4 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -21,14 +21,14 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-// PluginName for the server
-const PluginName = "server"
-
-// RrRelay env variable key (internal)
-const RrRelay = "RR_RELAY"
-
-// RrRPC env variable key (internal) if the RPC presents
-const RrRPC = "RR_RPC"
+const (
+ // PluginName for the server
+ PluginName = "server"
+ // RrRelay env variable key (internal)
+ RrRelay = "RR_RELAY"
+ // RrRPC env variable key (internal) if the RPC presents
+ RrRPC = "RR_RPC"
+)
// Plugin manages worker
type Plugin struct {
@@ -124,7 +124,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event
const op = errors.Op("server_plugin_new_worker")
list := make([]events.Listener, 0, len(listeners))
- list = append(list, server.collectWorkerLogs)
+ list = append(list, server.collectWorkerEvents)
spawnCmd, err := server.CmdFactory(env)
if err != nil {
@@ -140,15 +140,16 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event
}
// NewWorkerPool issues new worker pool.
-func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) {
+func (server *Plugin) NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) {
const op = errors.Op("server_plugin_new_worker_pool")
+
spawnCmd, err := server.CmdFactory(env)
if err != nil {
return nil, errors.E(op, err)
}
- list := make([]events.Listener, 0, 1)
- list = append(list, server.collectEvents)
+ list := make([]events.Listener, 0, 2)
+ list = append(list, server.collectPoolEvents, server.collectWorkerEvents)
if len(listeners) != 0 {
list = append(list, listeners...)
}
@@ -209,7 +210,7 @@ func (server *Plugin) setEnv(e Env) []string {
return env
}
-func (server *Plugin) collectEvents(event interface{}) {
+func (server *Plugin) collectPoolEvents(event interface{}) {
if we, ok := event.(events.PoolEvent); ok {
switch we.Event {
case events.EventMaxMemory:
@@ -238,7 +239,9 @@ func (server *Plugin) collectEvents(event interface{}) {
server.log.Warn("requested pool restart")
}
}
+}
+func (server *Plugin) collectWorkerEvents(event interface{}) {
if we, ok := event.(events.WorkerEvent); ok {
switch we.Event {
case events.EventWorkerError:
@@ -264,16 +267,13 @@ func (server *Plugin) collectEvents(event interface{}) {
}
}
-func (server *Plugin) collectWorkerLogs(event interface{}) {
- if we, ok := event.(events.WorkerEvent); ok {
- switch we.Event {
- case events.EventWorkerError:
- server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t"))
- case events.EventWorkerLog:
- server.log.Debug(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
- // stderr event is INFO level
- case events.EventWorkerStderr:
- server.log.Info(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
+func (server *Plugin) collectJobsEvents(event interface{}) { //nolint:unused
+ if jev, ok := event.(events.JobEvent); ok {
+ switch jev.Event {
+ case events.EventJobStart:
+ server.log.Info("Job started", "start", jev.Start, "elapsed", jev.Elapsed)
+ case events.EventJobOK:
+ server.log.Info("Job OK", "start", jev.Start, "elapsed", jev.Elapsed)
}
}
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 664b4dfd..c1f79a78 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -7,7 +7,7 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index ca5f2f59..5925a588 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -10,10 +10,10 @@ import (
"github.com/google/uuid"
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -106,7 +106,7 @@ func (p *Plugin) Serve() chan error {
p.Lock()
defer p.Unlock()
- p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
+ p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
@@ -273,7 +273,7 @@ func (p *Plugin) Reset() error {
p.phpPool = nil
var err error
- p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
+ p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 752ba3ce..758620f6 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,7 +4,7 @@ import (
"sync"
json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/utils"
diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto
new file mode 100644
index 00000000..46434fa8
--- /dev/null
+++ b/proto/jobs/v1beta/jobs.proto
@@ -0,0 +1,22 @@
+syntax = "proto3";
+
+package kv.v1beta;
+option go_package = "./;jobsv1beta";
+
+message Request {
+ // could be an enum in the future
+ string storage = 1;
+ repeated Item items = 2;
+}
+
+message Item {
+ string key = 1;
+ bytes value = 2;
+ // RFC 3339
+ string timeout = 3;
+}
+
+// KV response for the KV RPC methods
+message Response {
+ repeated Item items = 1;
+}
diff --git a/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go
index 622967b8..75578bff 100644
--- a/proto/kv/v1beta/kv.pb.go
+++ b/proto/kv/v1beta/kv.pb.go
@@ -7,10 +7,11 @@
package kvv1beta
import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
diff --git a/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go
index ad4ebbe7..a2868118 100644
--- a/proto/websockets/v1beta/websockets.pb.go
+++ b/proto/websockets/v1beta/websockets.pb.go
@@ -7,10 +7,11 @@
package websocketsv1beta
import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
diff --git a/tests/composer.json b/tests/composer.json
index 50178d1f..fa5925b7 100644
--- a/tests/composer.json
+++ b/tests/composer.json
@@ -2,7 +2,7 @@
"minimum-stability": "beta",
"prefer-stable": true,
"require": {
- "nyholm/psr7": "^1.3",
+ "nyholm/psr7": "^1.4",
"spiral/roadrunner": "^2.0",
"spiral/roadrunner-http": "^2.0",
"temporal/sdk": ">=1.0",
diff --git a/tests/docker-compose-jobs.yml b/tests/docker-compose-jobs.yml
new file mode 100644
index 00000000..7b88c9cf
--- /dev/null
+++ b/tests/docker-compose-jobs.yml
@@ -0,0 +1,22 @@
+version: "3"
+
+services:
+ beanstalk:
+ image: schickling/beanstalkd
+ ports:
+ - "11300:11300"
+
+ sqs:
+ image: vsouza/sqs-local
+ ports:
+ - "9324:9324"
+
+ rabbitmq:
+ image: rabbitmq:3-management
+ environment:
+ RABBITMQ_DEFAULT_USER: guest
+ RABBITMQ_DEFAULT_PASS: guest
+ RABBITMQ_DEFAULT_VHOST: /
+ ports:
+ - "15672:15672"
+ - "5672:5672" \ No newline at end of file
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
index 0ec813f3..d8bedf29 100644
--- a/tests/plugins/broadcast/broadcast_plugin_test.go
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -176,7 +176,7 @@ func TestBroadcastNoConfig(t *testing.T) {
}
func TestBroadcastSameSubscriber(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
cfg := &config.Viper{
@@ -189,11 +189,11 @@ func TestBroadcastSameSubscriber(t *testing.T) {
mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).MinTimes(1)
- mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).AnyTimes()
+ mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3)
- mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2)
mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3)
mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3)
mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3)
@@ -279,14 +279,15 @@ func TestBroadcastSameSubscriber(t *testing.T) {
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002"))
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002"))
- time.Sleep(time.Second * 4)
stopCh <- struct{}{}
wg.Wait()
+
+ time.Sleep(time.Second * 5)
}
func TestBroadcastSameSubscriberGlobal(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
cfg := &config.Viper{
@@ -299,11 +300,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).MinTimes(1)
- mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).AnyTimes()
+ mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3)
- mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2)
mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3)
mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3)
mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3)
@@ -389,10 +390,10 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003"))
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003"))
- time.Sleep(time.Second * 4)
stopCh <- struct{}{}
wg.Wait()
+ time.Sleep(time.Second * 5)
}
func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) {
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml
index d8daa251..66114d64 100644
--- a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml
@@ -3,8 +3,6 @@ rpc:
server:
command: "php ../../psr-worker-bench.php"
- user: ""
- group: ""
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml
index 2ca97055..ea25988c 100644
--- a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml
@@ -3,8 +3,6 @@ rpc:
server:
command: "php ../../psr-worker-bench.php"
- user: ""
- group: ""
relay: "pipes"
relay_timeout: "20s"
@@ -38,9 +36,4 @@ broadcast:
logs:
mode: development
- level: error
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
+ level: info
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml
index 360e05e5..cbe18196 100644
--- a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml
@@ -3,8 +3,6 @@ rpc:
server:
command: "php ../../psr-worker-bench.php"
- user: ""
- group: ""
relay: "pipes"
relay_timeout: "20s"
@@ -35,9 +33,4 @@ broadcast:
logs:
mode: development
- level: debug
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
+ level: info
diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go
index d3b16256..01ad1479 100644
--- a/tests/plugins/broadcast/plugins/plugin1.go
+++ b/tests/plugins/broadcast/plugins/plugin1.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -14,11 +14,14 @@ type Plugin1 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -39,16 +42,22 @@ func (p *Plugin1) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
+ }
}
}()
@@ -59,6 +68,8 @@ func (p *Plugin1) Stop() error {
_ = p.driver.Unsubscribe("1", "foo")
_ = p.driver.Unsubscribe("1", "foo2")
_ = p.driver.Unsubscribe("1", "foo3")
+
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go
index 2bd819d2..ee072ffe 100644
--- a/tests/plugins/broadcast/plugins/plugin2.go
+++ b/tests/plugins/broadcast/plugins/plugin2.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -14,11 +14,13 @@ type Plugin2 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+ exit chan struct{}
}
func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +40,22 @@ func (p *Plugin2) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
+ }
}
}()
@@ -56,6 +64,7 @@ func (p *Plugin2) Serve() chan error {
func (p *Plugin2) Stop() error {
_ = p.driver.Unsubscribe("2", "foo")
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go
index ef926222..288201d1 100644
--- a/tests/plugins/broadcast/plugins/plugin3.go
+++ b/tests/plugins/broadcast/plugins/plugin3.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -14,11 +14,15 @@ type Plugin3 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +42,22 @@ func (p *Plugin3) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
+ }
}
}()
@@ -56,6 +66,7 @@ func (p *Plugin3) Serve() chan error {
func (p *Plugin3) Stop() error {
_ = p.driver.Unsubscribe("3", "foo")
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go
index c9b94777..56f79c0f 100644
--- a/tests/plugins/broadcast/plugins/plugin4.go
+++ b/tests/plugins/broadcast/plugins/plugin4.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -14,11 +14,15 @@ type Plugin4 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +42,22 @@ func (p *Plugin4) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
+ }
}
}()
@@ -56,6 +66,8 @@ func (p *Plugin4) Serve() chan error {
func (p *Plugin4) Stop() error {
_ = p.driver.Unsubscribe("4", "foo")
+
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go
index 01562a8f..e7cd7e60 100644
--- a/tests/plugins/broadcast/plugins/plugin5.go
+++ b/tests/plugins/broadcast/plugins/plugin5.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -14,11 +14,15 @@ type Plugin5 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +42,22 @@ func (p *Plugin5) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
+ }
}
}()
@@ -56,6 +66,8 @@ func (p *Plugin5) Serve() chan error {
func (p *Plugin5) Stop() error {
_ = p.driver.Unsubscribe("5", "foo")
+
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go
index 76f2d6e8..08272196 100644
--- a/tests/plugins/broadcast/plugins/plugin6.go
+++ b/tests/plugins/broadcast/plugins/plugin6.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -14,11 +14,15 @@ type Plugin6 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +42,22 @@ func (p *Plugin6) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
+ }
}
}()
@@ -56,6 +66,8 @@ func (p *Plugin6) Serve() chan error {
func (p *Plugin6) Stop() error {
_ = p.driver.Unsubscribe("6", "foo")
+
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/headers/configs/.rr-cors-headers.yaml b/tests/plugins/headers/configs/.rr-cors-headers.yaml
index 9d2ef7e5..b4e960f1 100644
--- a/tests/plugins/headers/configs/.rr-cors-headers.yaml
+++ b/tests/plugins/headers/configs/.rr-cors-headers.yaml
@@ -1,9 +1,5 @@
server:
command: "php ../../http/client.php headers pipes"
- user: ""
- group: ""
- env:
- "RR_HTTP": "true"
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/http/configs/.rr-env.yaml b/tests/plugins/http/configs/.rr-env.yaml
index 99358b04..4ea8ec73 100644
--- a/tests/plugins/http/configs/.rr-env.yaml
+++ b/tests/plugins/http/configs/.rr-env.yaml
@@ -3,17 +3,13 @@ rpc:
server:
command: "php ../../http/client.php env pipes"
- user: ""
- group: ""
- env:
- "env_key": "ENV_VALUE"
relay: "pipes"
relay_timeout: "20s"
http:
address: 127.0.0.1:12084
max_request_size: 1024
- middleware: [ "" ]
+ middleware: []
env:
"RR_HTTP": "true"
"env_key": "ENV_VALUE"
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index 40e3a720..37d9452c 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -26,7 +26,7 @@ func TestHandler_Echo(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -77,7 +77,7 @@ func TestHandler_Headers(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "header", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -138,7 +138,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -198,7 +198,7 @@ func TestHandler_User_Agent(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -258,7 +258,7 @@ func TestHandler_Cookies(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "cookie", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -323,7 +323,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -387,7 +387,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -447,7 +447,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -507,7 +507,7 @@ func TestHandler_FormData_POST(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -580,7 +580,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -653,7 +653,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -725,7 +725,7 @@ func TestHandler_FormData_PUT(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -797,7 +797,7 @@ func TestHandler_FormData_PATCH(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -869,7 +869,7 @@ func TestHandler_Multipart_POST(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -983,7 +983,7 @@ func TestHandler_Multipart_PUT(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1097,7 +1097,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1213,7 +1213,7 @@ func TestHandler_Error(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1259,7 +1259,7 @@ func TestHandler_Error2(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error2", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1305,7 +1305,7 @@ func TestHandler_Error3(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1364,7 +1364,7 @@ func TestHandler_ResponseDuration(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1425,7 +1425,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echoDelay", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1485,7 +1485,7 @@ func TestHandler_ErrorDuration(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1560,7 +1560,7 @@ func TestHandler_IP(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1621,7 +1621,7 @@ func TestHandler_XRealIP(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1687,7 +1687,7 @@ func TestHandler_XForwardedFor(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1752,7 +1752,7 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1800,7 +1800,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go
index df696668..d02f9eee 100644
--- a/tests/plugins/http/uploads_test.go
+++ b/tests/plugins/http/uploads_test.go
@@ -31,7 +31,7 @@ func TestHandler_Upload_File(t *testing.T) {
pool, err := poolImpl.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ &poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -114,7 +114,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) {
pool, err := poolImpl.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ &poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -197,7 +197,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
pool, err := poolImpl.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ &poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -280,7 +280,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) {
pool, err := poolImpl.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ &poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go
index 43335999..62816d02 100644
--- a/tests/plugins/informer/test_plugin.go
+++ b/tests/plugins/informer/test_plugin.go
@@ -10,7 +10,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
)
-var testPoolConfig = pool.Config{
+var testPoolConfig = &pool.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
new file mode 100644
index 00000000..b21f764c
--- /dev/null
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -0,0 +1,57 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../psr-worker-bench.php"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+jobs:
+ # worker pool configuration
+ pool:
+ num_workers: 4
+
+ # rabbitmq and similar servers
+ amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+ # beanstalk configuration
+ beanstalk:
+ addr: tcp://localhost:11300
+
+ # amazon sqs configuration
+ sqs:
+ key: api-key
+ secret: api-secret
+ region: us-west-1
+ endpoint: http://localhost:9324
+
+ # job destinations and options
+ dispatch:
+ spiral-jobs-tests-amqp-*.pipeline: amqp
+ spiral-jobs-tests-local-*.pipeline: local
+ spiral-jobs-tests-beanstalk-*.pipeline: beanstalk
+ spiral-jobs-tests-sqs-*.pipeline: sqs
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ local:
+ broker: ephemeral
+
+ amqp:
+ broker: amqp
+ queue: default
+
+ beanstalk:
+ broker: beanstalk
+ tube: default
+
+ sqs:
+ broker: sqs
+ queue: default
+ declare:
+ MessageRetentionPeriod: 86400
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: ["local", "amqp", "beanstalk", "sqs"]
+
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
new file mode 100644
index 00000000..2c58c344
--- /dev/null
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -0,0 +1,92 @@
+package jobs
+
+import (
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pq_plugin"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestJobsInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-jobs-init.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &jobs.Plugin{},
+ &ephemeral.Plugin{},
+ &pq_plugin.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
diff --git a/tests/plugins/resetter/test_plugin.go b/tests/plugins/resetter/test_plugin.go
index 61942516..5c26cbd0 100644
--- a/tests/plugins/resetter/test_plugin.go
+++ b/tests/plugins/resetter/test_plugin.go
@@ -9,7 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
)
-var testPoolConfig = poolImpl.Config{
+var testPoolConfig = &poolImpl.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go
index f1c13734..e813e456 100644
--- a/tests/plugins/server/plugin_pipes.go
+++ b/tests/plugins/server/plugin_pipes.go
@@ -15,7 +15,7 @@ import (
const ConfigSection = "server"
const Response = "test"
-var testPoolConfig = pool.Config{
+var testPoolConfig = &pool.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php
index d0c72eae..b4a028d4 100644
--- a/tests/psr-worker-bench.php
+++ b/tests/psr-worker-bench.php
@@ -56,4 +56,4 @@ if ($env->getMode() === 'http') {
}
$factory->run();
-} \ No newline at end of file
+}
diff --git a/tests/psr-worker.php b/tests/psr-worker.php
index db53eee2..de4befbc 100644
--- a/tests/psr-worker.php
+++ b/tests/psr-worker.php
@@ -20,7 +20,7 @@ while ($req = $psr7->waitRequest()) {
try {
$resp = new \Nyholm\Psr7\Response();
$resp->getBody()->write(str_repeat("hello world", 1000));
-
+
$psr7->respond($resp);
} catch (\Throwable $e) {
$psr7->getWorker()->error((string)$e);
diff --git a/tests/worker-cors.php b/tests/worker-cors.php
new file mode 100644
index 00000000..ea3c986c
--- /dev/null
+++ b/tests/worker-cors.php
@@ -0,0 +1,15 @@
+<?php
+
+use Spiral\RoadRunner\Worker;
+use Spiral\RoadRunner\Http\HttpWorker;
+
+ini_set('display_errors', 'stderr');
+require __DIR__ . '/vendor/autoload.php';
+
+$http = new HttpWorker(Worker::create());
+
+while ($req = $http->waitRequest()) {
+ $http->respond(200, 'Response', [
+ 'Access-Control-Allow-Origin' => ['*']
+ ]);
+}