summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/bst/bst.go152
-rw-r--r--pkg/bst/bst_test.go325
-rw-r--r--pkg/bst/doc.go7
-rw-r--r--pkg/bst/interface.go13
-rw-r--r--pkg/doc/README.md21
-rw-r--r--pkg/doc/pool_workflow.drawio1
-rw-r--r--pkg/doc/pool_workflow.svg3
-rwxr-xr-xpkg/events/general.go41
-rw-r--r--pkg/events/grpc_event.go39
-rw-r--r--pkg/events/interface.go14
-rw-r--r--pkg/events/jobs_events.go81
-rw-r--r--pkg/events/pool_events.go70
-rw-r--r--pkg/events/worker_events.go36
-rwxr-xr-xpkg/payload/payload.go20
-rw-r--r--pkg/pool/config.go75
-rw-r--r--pkg/pool/interface.go53
-rwxr-xr-xpkg/pool/static_pool.go374
-rwxr-xr-xpkg/pool/static_pool_test.go721
-rwxr-xr-xpkg/pool/supervisor_pool.go230
-rw-r--r--pkg/pool/supervisor_test.go413
-rw-r--r--pkg/priority_queue/binary_heap.go125
-rw-r--r--pkg/priority_queue/binary_heap_test.go154
-rw-r--r--pkg/priority_queue/interface.go31
-rw-r--r--pkg/state/job/state.go19
-rw-r--r--pkg/state/process/state.go76
-rw-r--r--pkg/transport/interface.go21
-rwxr-xr-xpkg/transport/pipe/pipe_factory.go197
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go461
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go503
-rwxr-xr-xpkg/transport/socket/socket_factory.go255
-rw-r--r--pkg/transport/socket/socket_factory_spawn_test.go533
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go622
-rw-r--r--pkg/worker/interface.go74
-rwxr-xr-xpkg/worker/state.go111
-rwxr-xr-xpkg/worker/state_test.go27
-rwxr-xr-xpkg/worker/sync_worker.go283
-rwxr-xr-xpkg/worker/sync_worker_test.go33
-rwxr-xr-xpkg/worker/worker.go220
-rwxr-xr-xpkg/worker/worker_test.go19
-rw-r--r--pkg/worker_handler/constants.go8
-rw-r--r--pkg/worker_handler/errors.go26
-rw-r--r--pkg/worker_handler/errors_windows.go28
-rw-r--r--pkg/worker_handler/handler.go246
-rw-r--r--pkg/worker_handler/parse.go149
-rw-r--r--pkg/worker_handler/request.go189
-rw-r--r--pkg/worker_handler/response.go105
-rw-r--r--pkg/worker_handler/uploads.go159
-rw-r--r--pkg/worker_watcher/container/channel/vec.go107
-rw-r--r--pkg/worker_watcher/container/queue/queue.go102
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go318
50 files changed, 0 insertions, 7890 deletions
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go
deleted file mode 100644
index dab9346c..00000000
--- a/pkg/bst/bst.go
+++ /dev/null
@@ -1,152 +0,0 @@
-package bst
-
-// BST ...
-type BST struct {
- // registered topic, not unique
- topic string
- // associated connections with the topic
- uuids map[string]struct{}
-
- // left and right subtrees
- left *BST
- right *BST
-}
-
-func NewBST() Storage {
- return &BST{
- uuids: make(map[string]struct{}, 10),
- }
-}
-
-// Insert uuid to the topic
-func (b *BST) Insert(uuid string, topic string) {
- curr := b
-
- for {
- if topic == curr.topic {
- curr.uuids[uuid] = struct{}{}
- return
- }
- // if topic less than curr topic
- if topic < curr.topic {
- if curr.left == nil {
- curr.left = &BST{
- topic: topic,
- uuids: map[string]struct{}{uuid: {}},
- }
- return
- }
- // move forward
- curr = curr.left
- } else {
- if curr.right == nil {
- curr.right = &BST{
- topic: topic,
- uuids: map[string]struct{}{uuid: {}},
- }
- return
- }
-
- curr = curr.right
- }
- }
-}
-
-func (b *BST) Contains(topic string) bool {
- curr := b
- for curr != nil {
- switch {
- case topic < curr.topic:
- curr = curr.left
- case topic > curr.topic:
- curr = curr.right
- case topic == curr.topic:
- return true
- }
- }
-
- return false
-}
-
-func (b *BST) Get(topic string) map[string]struct{} {
- curr := b
- for curr != nil {
- switch {
- case topic < curr.topic:
- curr = curr.left
- case topic > curr.topic:
- curr = curr.right
- case topic == curr.topic:
- return curr.uuids
- }
- }
-
- return nil
-}
-
-func (b *BST) Remove(uuid string, topic string) {
- b.removeHelper(uuid, topic, nil)
-}
-
-func (b *BST) removeHelper(uuid string, topic string, parent *BST) {
- curr := b
- for curr != nil {
- if topic < curr.topic { //nolint:gocritic
- parent = curr
- curr = curr.left
- } else if topic > curr.topic {
- parent = curr
- curr = curr.right
- } else {
- // if more than 1 topic - remove only topic, do not remove the whole vertex
- if len(curr.uuids) > 1 {
- if _, ok := curr.uuids[uuid]; ok {
- delete(curr.uuids, uuid)
- return
- }
- }
-
- if curr.left != nil && curr.right != nil { //nolint:gocritic
- curr.topic, curr.uuids = curr.right.traverseForMinString()
- curr.right.removeHelper(curr.topic, uuid, curr)
- } else if parent == nil {
- if curr.left != nil { //nolint:gocritic
- curr.topic = curr.left.topic
- curr.uuids = curr.left.uuids
-
- curr.right = curr.left.right
- curr.left = curr.left.left
- } else if curr.right != nil {
- curr.topic = curr.right.topic
- curr.uuids = curr.right.uuids
-
- curr.left = curr.right.left
- curr.right = curr.right.right
- } else { //nolint:staticcheck
- // single node tree
- }
- } else if parent.left == curr {
- if curr.left != nil {
- parent.left = curr.left
- } else {
- parent.left = curr.right
- }
- } else if parent.right == curr {
- if curr.left != nil {
- parent.right = curr.left
- } else {
- parent.right = curr.right
- }
- }
- break
- }
- }
-}
-
-//go:inline
-func (b *BST) traverseForMinString() (string, map[string]struct{}) {
- if b.left == nil {
- return b.topic, b.uuids
- }
- return b.left.traverseForMinString()
-}
diff --git a/pkg/bst/bst_test.go b/pkg/bst/bst_test.go
deleted file mode 100644
index 2271508c..00000000
--- a/pkg/bst/bst_test.go
+++ /dev/null
@@ -1,325 +0,0 @@
-package bst
-
-import (
- "math/rand"
- "testing"
- "time"
-
- "github.com/google/uuid"
- "github.com/stretchr/testify/assert"
-)
-
-const predifined = "chat-1-2"
-
-func TestNewBST(t *testing.T) {
- // create a new bst
- g := NewBST()
-
- for i := 0; i < 100; i++ {
- g.Insert(uuid.NewString(), "comments")
- }
-
- for i := 0; i < 100; i++ {
- g.Insert(uuid.NewString(), "comments2")
- }
-
- for i := 0; i < 100; i++ {
- g.Insert(uuid.NewString(), "comments3")
- }
-
- // should be 100
- exist := g.Get("comments")
- assert.Len(t, exist, 100)
-
- // should be 100
- exist2 := g.Get("comments2")
- assert.Len(t, exist2, 100)
-
- // should be 100
- exist3 := g.Get("comments3")
- assert.Len(t, exist3, 100)
-}
-
-func BenchmarkGraph(b *testing.B) {
- g := NewBST()
-
- for i := 0; i < 1000; i++ {
- uid := uuid.New().String()
- g.Insert(uuid.NewString(), uid)
- }
-
- g.Insert(uuid.NewString(), predifined)
-
- b.ResetTimer()
- b.ReportAllocs()
-
- for i := 0; i < b.N; i++ {
- exist := g.Get(predifined)
- _ = exist
- }
-}
-
-func BenchmarkBigSearch(b *testing.B) {
- g1 := NewBST()
- g2 := NewBST()
- g3 := NewBST()
-
- predefinedSlice := make([]string, 0, 1000)
- for i := 0; i < 1000; i++ {
- predefinedSlice = append(predefinedSlice, uuid.NewString())
- }
- if predefinedSlice == nil {
- b.FailNow()
- }
-
- for i := 0; i < 1000; i++ {
- g1.Insert(uuid.NewString(), uuid.NewString())
- }
- for i := 0; i < 1000; i++ {
- g2.Insert(uuid.NewString(), uuid.NewString())
- }
- for i := 0; i < 1000; i++ {
- g3.Insert(uuid.NewString(), uuid.NewString())
- }
-
- for i := 0; i < 333; i++ {
- g1.Insert(uuid.NewString(), predefinedSlice[i])
- }
-
- for i := 0; i < 333; i++ {
- g2.Insert(uuid.NewString(), predefinedSlice[333+i])
- }
-
- for i := 0; i < 333; i++ {
- g3.Insert(uuid.NewString(), predefinedSlice[666+i])
- }
-
- b.ResetTimer()
- b.ReportAllocs()
-
- for i := 0; i < b.N; i++ {
- for i := 0; i < 333; i++ {
- exist := g1.Get(predefinedSlice[i])
- _ = exist
- }
- }
- for i := 0; i < b.N; i++ {
- for i := 0; i < 333; i++ {
- exist := g2.Get(predefinedSlice[333+i])
- _ = exist
- }
- }
- for i := 0; i < b.N; i++ {
- for i := 0; i < 333; i++ {
- exist := g3.Get(predefinedSlice[666+i])
- _ = exist
- }
- }
-}
-
-func BenchmarkBigSearchWithRemoves(b *testing.B) {
- g1 := NewBST()
- g2 := NewBST()
- g3 := NewBST()
-
- predefinedSlice := make([]string, 0, 1000)
- for i := 0; i < 1000; i++ {
- predefinedSlice = append(predefinedSlice, uuid.NewString())
- }
- if predefinedSlice == nil {
- b.FailNow()
- }
-
- for i := 0; i < 1000; i++ {
- g1.Insert(uuid.NewString(), uuid.NewString())
- }
- for i := 0; i < 1000; i++ {
- g2.Insert(uuid.NewString(), uuid.NewString())
- }
- for i := 0; i < 1000; i++ {
- g3.Insert(uuid.NewString(), uuid.NewString())
- }
-
- for i := 0; i < 333; i++ {
- g1.Insert(uuid.NewString(), predefinedSlice[i])
- }
-
- for i := 0; i < 333; i++ {
- g2.Insert(uuid.NewString(), predefinedSlice[333+i])
- }
-
- for i := 0; i < 333; i++ {
- g3.Insert(uuid.NewString(), predefinedSlice[666+i])
- }
-
- go func() {
- tt := time.NewTicker(time.Millisecond)
- for {
- select {
- case <-tt.C:
- num := rand.Intn(333) //nolint:gosec
- values := g1.Get(predefinedSlice[num])
- for k := range values {
- g1.Remove(k, predefinedSlice[num])
- }
- }
- }
- }()
-
- b.ResetTimer()
- b.ReportAllocs()
-
- for i := 0; i < b.N; i++ {
- for i := 0; i < 333; i++ {
- exist := g1.Get(predefinedSlice[i])
- _ = exist
- }
- }
- for i := 0; i < b.N; i++ {
- for i := 0; i < 333; i++ {
- exist := g2.Get(predefinedSlice[333+i])
- _ = exist
- }
- }
- for i := 0; i < b.N; i++ {
- for i := 0; i < 333; i++ {
- exist := g3.Get(predefinedSlice[666+i])
- _ = exist
- }
- }
-}
-
-func TestGraph(t *testing.T) {
- g := NewBST()
-
- for i := 0; i < 1000; i++ {
- uid := uuid.New().String()
- g.Insert(uuid.NewString(), uid)
- }
-
- g.Insert(uuid.NewString(), predifined)
-
- exist := g.Get(predifined)
- assert.NotNil(t, exist)
- assert.Len(t, exist, 1)
-}
-
-func TestTreeConcurrentContains(t *testing.T) {
- g := NewBST()
-
- key1 := uuid.NewString()
- key2 := uuid.NewString()
- key3 := uuid.NewString()
- key4 := uuid.NewString()
- key5 := uuid.NewString()
-
- g.Insert(key1, predifined)
- g.Insert(key2, predifined)
- g.Insert(key3, predifined)
- g.Insert(key4, predifined)
- g.Insert(key5, predifined)
-
- for i := 0; i < 100; i++ {
- go func() {
- _ = g.Get(predifined)
- }()
-
- go func() {
- _ = g.Get(predifined)
- }()
-
- go func() {
- _ = g.Get(predifined)
- }()
-
- go func() {
- _ = g.Get(predifined)
- }()
- }
-
- time.Sleep(time.Second * 2)
-
- exist := g.Get(predifined)
- assert.NotNil(t, exist)
- assert.Len(t, exist, 5)
-}
-
-func TestGraphRemove(t *testing.T) {
- g := NewBST()
-
- key1 := uuid.NewString()
- key2 := uuid.NewString()
- key3 := uuid.NewString()
- key4 := uuid.NewString()
- key5 := uuid.NewString()
-
- g.Insert(key1, predifined)
- g.Insert(key2, predifined)
- g.Insert(key3, predifined)
- g.Insert(key4, predifined)
- g.Insert(key5, predifined)
-
- exist := g.Get(predifined)
- assert.NotNil(t, exist)
- assert.Len(t, exist, 5)
-
- g.Remove(key1, predifined)
-
- exist = g.Get(predifined)
- assert.NotNil(t, exist)
- assert.Len(t, exist, 4)
-}
-
-func TestBigSearch(t *testing.T) {
- g1 := NewBST()
- g2 := NewBST()
- g3 := NewBST()
-
- predefinedSlice := make([]string, 0, 1000)
- for i := 0; i < 1000; i++ {
- predefinedSlice = append(predefinedSlice, uuid.NewString())
- }
- if predefinedSlice == nil {
- t.FailNow()
- }
-
- for i := 0; i < 1000; i++ {
- g1.Insert(uuid.NewString(), uuid.NewString())
- }
- for i := 0; i < 1000; i++ {
- g2.Insert(uuid.NewString(), uuid.NewString())
- }
- for i := 0; i < 1000; i++ {
- g3.Insert(uuid.NewString(), uuid.NewString())
- }
-
- for i := 0; i < 333; i++ {
- g1.Insert(uuid.NewString(), predefinedSlice[i])
- }
-
- for i := 0; i < 333; i++ {
- g2.Insert(uuid.NewString(), predefinedSlice[333+i])
- }
-
- for i := 0; i < 333; i++ {
- g3.Insert(uuid.NewString(), predefinedSlice[666+i])
- }
-
- for i := 0; i < 333; i++ {
- exist := g1.Get(predefinedSlice[i])
- assert.NotNil(t, exist)
- assert.Len(t, exist, 1)
- }
-
- for i := 0; i < 333; i++ {
- exist := g2.Get(predefinedSlice[333+i])
- assert.NotNil(t, exist)
- assert.Len(t, exist, 1)
- }
-
- for i := 0; i < 333; i++ {
- exist := g3.Get(predefinedSlice[666+i])
- assert.NotNil(t, exist)
- assert.Len(t, exist, 1)
- }
-}
diff --git a/pkg/bst/doc.go b/pkg/bst/doc.go
deleted file mode 100644
index abb7e6e9..00000000
--- a/pkg/bst/doc.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package bst
-
-/*
-Binary search tree for the pubsub
-
-The vertex may have one or multiply topics associated with the single websocket connection UUID
-*/
diff --git a/pkg/bst/interface.go b/pkg/bst/interface.go
deleted file mode 100644
index 95b03e11..00000000
--- a/pkg/bst/interface.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package bst
-
-// Storage is general in-memory BST storage implementation
-type Storage interface {
- // Insert inserts to a vertex with topic ident connection uuid
- Insert(uuid string, topic string)
- // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed
- Remove(uuid, topic string)
- // Get will return all connections associated with the topic
- Get(topic string) map[string]struct{}
- // Contains checks if the BST contains a topic
- Contains(topic string) bool
-}
diff --git a/pkg/doc/README.md b/pkg/doc/README.md
deleted file mode 100644
index 709df603..00000000
--- a/pkg/doc/README.md
+++ /dev/null
@@ -1,21 +0,0 @@
-This is the drawio diagrams showing basic workflows inside RoadRunner 2.0
-
-Simple HTTP workflow description:
-![alt text](pool_workflow.svg)
-
-1. Allocate sync workers. When plugin starts (which use workers pool), then it allocates required number of processes
- via `cmd.exec` command.
-
-2. When user send HTTP request to the RR2, HTTP plugin receive it and transfer to the workers pool `Exec/ExecWithContex`
-method. And workers pool ask Worker watcher to get free worker.
-
-3. Workers watcher uses stack data structure under the hood and making POP operation to get first free worker. If there are
-no workers in the `stack`, watcher waits for the specified via config (`allocate_timeout`) time.
-
-4. Stack returns free worker to the watcher.
-5. Watcher returns that worker to the `pool`.
-6. Pool invoke `Exec/ExecWithTimeout` method on the golang worker with provided request payload.
-7. Golang worker send that request to the PHP worker via various set of transports (`pkg/transport` package).
-8. PHP worker send back response to the golang worker (or error via stderr).
-9. Golang worker return response payload to the pool.
-10. Pool process this response and return answer to the user.
diff --git a/pkg/doc/pool_workflow.drawio b/pkg/doc/pool_workflow.drawio
deleted file mode 100644
index 3f74d0fc..00000000
--- a/pkg/doc/pool_workflow.drawio
+++ /dev/null
@@ -1 +0,0 @@
-<mxfile host="Electron" modified="2021-01-24T16:29:37.978Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="8v25I0qLU_vMqqVrx7cN" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">7Vxbd6M4Ev41OWfnoX0Qkrg8Jul09+zOTGc6vSfTjwRkm22MvCB37Pn1IwGydcExsTGO03HOiUEgLqqqr6o+lXwBr2fLj0U0n/5OE5JduE6yvIDvL1wXAMflX6Jl1bQ4jlO3TIo0qduUhrv0byJPbFoXaULKpq1uYpRmLJ3rjTHNcxIzrS0qCvqonzamWaI1zKMJsRru4iizW+/ThE3li3nh5sAnkk6m8tYeRvWRWSTPbl6lnEYJfVSa4M0FvC4oZfXWbHlNMjF8+sB82HJ0/WQFyVmXDn/hCPvfPk++/Ocm+HyPr5Z/TsfvvOYyP6Js0bzyhetl/IJXU37Im4itu3Q25wPiOv8tScG/CvL/BSmZPJHfcnNu865sJUewoIs8IeIZHH74cZoycjePYnH0kSuNuBGbZXwP8M0xzdmHaJZmQl+u6aJIqxv+Qfi4XUVZOsl5e0bG4k4/SMFSLqnLppnR+fr+6qjIN+Snk6XS1IzSR0JnhBUrfkpzFGNcd2mUFrmNFj+qGhA2GjBVpB9ItY0atZusL74RDN9oZPMMOYHQGliScE1tdmnBpnRC8yi72bRe6UO/Oec3KoaqGvD/EcZWjdlFC0Z1cZBlyv4S3Ucubna/KYfeL5tLVzsruZPzF1Z7if1v6sFNv2pPdtSE/2/CHooozUv+0r/TnD4l2pIrSkyeGr0GOaJiQtgT50lTFUP7pKYUJItY+kMHiTahN11vacqfea1hoedpGgYc2SCvUT9q083QnfVz7K9Orm32H8XrcNgS/8pVHgt9p8X3yv7+dfv5ln/ROSn4a9P8l1Zl/C164PCvKZA02ZiLjF/JNtpZmiS1rpIy/Tt6qK4nlGEuXr0aDHx1gd/vpR5PWpKFB2sn0jyFBtNtOPHOGTke0iUZ1HsH6sc7AJ2RDkL8ZsB1lA80VYaOxyU5jrZgS1vgmwbUQnE939e9RT8agAz5hyMHDSZv/7TOxledjdPV2fiaswEv3tnAvn3NQQGGRC7FxvEZ27hU4cNtnJs4QlDHYtiPjUPDxvEogEPZOHTsCEDcae3030RfwTvwdNm7vYgemaJ/B9DIw0MJ347+7iupi8G+j1g8bVGA5+VxaZZd04wWVV84HhMvjnl7yQr6nShHEj98cA7C4+55XQgNlwqbCErN63BjF2pa5x0tqwPWMA/qaPdJ6rSMbrT2uYO7WbejmwV9u9mq62VRRCvlhAbNthp8YOpeE71vtKe+Yr9m7lp2fpllNI6YleSVZwz4YBsS7AP4GHlHiecD3Y0gA1KOGMxDSwnaUecnFDaAGGhi6SmwQ7pvxyPgDyZuS9q/5ilLuSS5N+jTpycRCcatPt2LA/IwHsan+4GOq4HT4tOdFp9uGmB/Pr09bXpjajt5dXgqr36YzD3L6lQ69Y572vtzT6ukXvfEmjm6P/T7AV7fzKu8keerrKk7GA4jSyXcN+nX0oc8/NWl1I/0PYsz9Qfzu/CksA8UzFf40+flcv7pkrmusA/dF4X70MZ974yNHPZm5Dy0xrgnqwZoBAejRVE7SJ/V1Acf/B2GXO3dkiLlwyV076VYN+rbuvfiaoADA82PyKTiQ9cOPtrVwceHdcAONtT/CPwRaqEOZDHTOeKbNO4+8M1xJROxL74NAGZ2DHpH8qR65nJO81IkJxyL+L+p2FyUZ52h9CfeutjCmKHphwj0Xf2qbjAabNrHNuebJYn5pcTXfcqm11wQZMksHTh/nsic+2nlidrmfo7GE21JB88q0jhdkYWs1DuT6R/gID1C4A27PP6OHsdx+a6d0aAzdgnSyPrw+BAiA7t7mi1wDdoCgAHrQNzQEvivAgKjir4cF0R8recIryt0LETbii7EAxb2rMJPqh6uo08cyqmYA9UDADxCoakgCKmsJhhMW+yqoTsWxd8rR8r96Pbp5INCCEyCBLWFEIH7IOpgTxJCACc4df0ItI33tOsENBrS7UhDrqMItdtpaEi3Y1TRtXZzky46h9JhA6QHrqVNRn7wNZ0RuniN+QFwPd25tyYIshZwkAQB2umaNfDlNJqLzXFGlpdiuVtlLEmz+T7OorJMY8NYN9F8r5O++5SV7W+qOOzZVBU5t2G4bDvUmzuSQZPMHzCWcdRvbq38aSMdDQ7RGXYNEbTZJXN94BZtXcyyy5hRNQas4sVbWqZieRE/5YEyRmctQWK9ym/bksEjef7QWBAo1/4pOgMH9ft2sf6a2GuGXuX1tpR0V1TP0UJ0ESWUdcAwkJgAMDFcptAqhoeBLSizPKs3QSG70voLiUldG7IhYV+9YHyj2iN0bMHI9SbDCMZe0maJgV8mnZdEl4TEsJKjK+O5DyN2SFMnZW0hzXjcHBlg1D3PoDzlhMUO1ILHQi1kc0v2mO9MVhRZmEawdhpKCETnJK9bmuwleFIuL6/OXWrq7sXLHQOdQycwjNXxARo27EC+pUW32WKS5rxtXeparVJ+9bjqOUYsGUIbVuUvPAwCq7Kc+q0K6qj0g+QDX0gZFLYz1fYVxOfBCuNthrhXpSuSBnho5ojaS1kNtvn4NBE+7erwM6cYUWfeond/fpiN2ylncM423uP6cBA6ejlzP/OCAIhCaeWjzwfLepQByojsWYYtqxkOWiOMxV8rD1x9OsheHFf61Z+eUliMXtpCI2zzbzVdz990ldEoefXxL0BGjUQb4SOXGgwT/9q0whfy46fje5Br1MMGLUTcoHwPtjHs09ev4ket5k3yeBCU7QImA+qCmLT/HMJDwHHmyYmRZ7DW2FhkIxfDaIjVwoZCdCQheHb1gDXuL2lGa6sQdgZvuGuCdqJZJ+yH+jW6zzo54QgHys+Q6TpmTnkcmQyShL9KBlGadUBY1SRzmpNngW4bHOiAsUu1LMtfc8Z94K9jBCt+S6UKalE391iximczBB9pFuUT3rYllvzJRAbd3SJrjS+PJzM7lrn9JDxm5+nEVy0wLHmdJwQWoF7kxXc3PxNcQ+fm55bhzT8=</diagram></mxfile> \ No newline at end of file
diff --git a/pkg/doc/pool_workflow.svg b/pkg/doc/pool_workflow.svg
deleted file mode 100644
index 1e043eaa..00000000
--- a/pkg/doc/pool_workflow.svg
+++ /dev/null
@@ -1,3 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
-<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="1200px" height="811px" viewBox="-0.5 -0.5 1200 811" content="&lt;mxfile host=&quot;Electron&quot; modified=&quot;2021-01-24T14:08:35.229Z&quot; agent=&quot;5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36&quot; etag=&quot;XUF1gTxKr8mJfXGLcVd8&quot; version=&quot;14.1.8&quot; type=&quot;device&quot;&gt;&lt;diagram id=&quot;8w40hpb1-UDYxj1ewOsN&quot; name=&quot;Page-1&quot;&gt;7Vxbd6M4Ev41OWfnoX0Qkrg8Jul09+zOTGc6vSfTjwRkm22MvCB37Pn1IwGydcExsTGO03HOiUEgLqqqr6o+lXwBr2fLj0U0n/5OE5JduE6yvIDvL1wXAMflX6Jl1bQ4jlO3TIo0qduUhrv0byJPbFoXaULKpq1uYpRmLJ3rjTHNcxIzrS0qCvqonzamWaI1zKMJsRru4iizW+/ThE3li3nh5sAnkk6m8tYeRvWRWSTPbl6lnEYJfVSa4M0FvC4oZfXWbHlNMjF8+sB82HJ0/WQFyVmXDn/hCPvfPk++/Ocm+HyPr5Z/TsfvvOYyP6Js0bzyhetl/IJXU37Im4itu3Q25wPiOv8tScG/CvL/BSmZPJHfcnNu865sJUewoIs8IeIZHH74cZoycjePYnH0kSuNuBGbZXwP8M0xzdmHaJZmQl+u6aJIqxv+Qfi4XUVZOsl5e0bG4k4/SMFSLqnLppnR+fr+6qjIN+Snk6XS1IzSR0JnhBUrfkpzFGNcd2mUFrmNFj+qGhA2GjBVpB9ItY0atZusL74RDN9oZPMMOYHQGliScE1tdmnBpnRC8yi72bRe6UO/Oec3KoaqGvD/EcZWjdlFC0Z1cZBlyv4S3Ucubna/KYfeL5tLVzsruZPzF1Z7if1v6sFNv2pPdtSE/2/CHooozUv+0r/TnD4l2pIrSkyeGr0GOaJiQtgT50lTFUP7pKYUJItY+kMHiTahN11vacqfea1hoedpGgYc2SCvUT9q083QnfVz7K9Orm32H8XrcNgS/8pVHgt9p8X3yv7+dfv5ln/ROSn4a9P8l1Zl/C164PCvKZA02ZiLjF/JNtpZmiS1rpIy/Tt6qK4nlGEuXr0aDHx1gd/vpR5PWpKFB2sn0jyFBtNtOPHOGTke0iUZ1HsH6sc7AJ2RDkL8ZsB1lA80VYaOxyU5jrZgS1vgmwbUQnE939e9RT8agAz5hyMHDSZv/7TOxledjdPV2fiaswEv3tnAvn3NQQGGRC7FxvEZ27hU4cNtnJs4QlDHYtiPjUPDxvEogEPZOHTsCEDcae3030RfwTvwdNm7vYgemaJ/B9DIw0MJ347+7iupi8G+j1g8bVGA5+VxaZZd04wWVV84HhMvjnl7yQr6nShHEj98cA7C4+55XQgNlwqbCErN63BjF2pa5x0tqwPWMA/qaPdJ6rSMbrT2uYO7WbejmwV9u9mq62VRRCvlhAbNthp8YOpeE71vtKe+Yr9m7lp2fpllNI6YleSVZwz4YBsS7AP4GHlHiecD3Y0gA1KOGMxDSwnaUecnFDaAGGhi6SmwQ7pvxyPgDyZuS9q/5ilLuSS5N+jTpycRCcatPt2LA/IwHsan+4GOq4HT4tOdFp9uGmB/Pr09bXpjajt5dXgqr36YzD3L6lQ69Y572vtzT6ukXvfEmjm6P/T7AV7fzKu8keerrKk7GA4jSyXcN+nX0oc8/NWl1I/0PYsz9Qfzu/CksA8UzFf40+flcv7pkrmusA/dF4X70MZ974yNHPZm5Dy0xrgnqwZoBAejRVE7SJ/V1Acf/B2GXO3dkiLlwyV076VYN+rbuvfiaoADA82PyKTiQ9cOPtrVwceHdcAONtT/CPwRaqEOZDHTOeKbNO4+8M1xJROxL74NAGZ2DHpH8qR65nJO81IkJxyL+L+p2FyUZ52h9CfeutjCmKHphwj0Xf2qbjAabNrHNuebJYn5pcTXfcqm11wQZMksHTh/nsic+2nlidrmfo7GE21JB88q0jhdkYWs1DuT6R/gID1C4A27PP6OHsdx+a6d0aAzdgnSyPrw+BAiA7t7mi1wDdoCgAHrQNzQEvivAgKjir4cF0R8recIryt0LETbii7EAxb2rMJPqh6uo08cyqmYA9UDADxCoakgCKmsJhhMW+yqoTsWxd8rR8r96Pbp5INCCEyCBLWFEIH7IOpgTxJCACc4df0ItI33tOsENBrS7UhDrqMItdtpaEi3Y1TRtXZzky46h9JhA6QHrqVNRn7wNZ0RuniN+QFwPd25tyYIshZwkAQB2umaNfDlNJqLzXFGlpdiuVtlLEmz+T7OorJMY8NYN9F8r5O++5SV7W+qOOzZVBU5t2G4bDvUmzuSQZPMHzCWcdRvbq38aSMdDQ7RGXYNEbTZJXN94BZtXcyyy5hRNQas4sVbWqZieRE/5YEyRmctQWK9ym/bksEjef7QWBAo1/4pOgMH9ft2sf6a2GuGXuX1tpR0V1TP0UJ0ESWUdcAwkJgAMDFcptAqhoeBLSizPKs3QSG70voLiUldG7IhYV+9YHyj2iN0bMHI9SbDCMZe0maJgV8mnZdEl4TEsJKjK+O5DyN2SFMnZW0hzXjcHBlg1D3PoDzlhMUO1ILHQi1kc0v2mO9MVhRZmEawdhpKCETnJK9bmuwleFIuL6/OXWrq7sXLHQOdQycwjNXxARo27EC+pUW32WKS5rxtXeparVJ+9bjqOUYsGUIbVuUvPAwCq7Kc+q0K6qj0g+QDX0gZFLYz1fYVxOfBCuNthrhXpSuSBnho5ojaS1kNtvn4NBE+7erwM6cYUWfeond/fpiN2ylncM423uP6cBA6ejlzP/OCAIhCaeWjzwfLepQByojsWYYtqxkOWiOMxV8rD1x9OsheHFf61Z+eUliMXtpCI2zzbzVdz990ldEoefXxL0BGjUQb4SOXGgwT/9q0whfy46fje5Br1MMGLUTcoHwPtjHs09ev4ket5k3yeBCU7QImA+qCmLT/HMJDwHHmyYmRZ7DW2FhkIxfDaIjVwoZCdCQheHb1gDXuL2lGa6sQdgZvuGuCdqJZJ+yH+jW6zzo54QgHys+Q6TpmTnkcmQyShL9KBlGadUBY1SRzmpNngW4bHOiAsUu1LMtfc8Z94K9jBCt+S6UKalE391iximczBB9pFuUT3rYllvzJRAbd3SJrjS+PJzM7lrn9JDxm5+nEVy0wLHmdJwQWoF7kxXc3PxNcQ+fm55bhzT8=&lt;/diagram&gt;&lt;/mxfile&gt;" style="background-color: rgb(255, 255, 255);"><defs/><g><rect x="0" y="0" width="1199" height="810" fill="#ffffff" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe flex-start; justify-content: unsafe flex-start; width: 1197px; height: 1px; padding-top: 7px; margin-left: 2px;"><div style="box-sizing: border-box; font-size: 0; text-align: left; "><div style="display: inline-block; font-size: 12px; font-family: Courier New; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; "><h1>Simple User request</h1></div></div></div></foreignObject><text x="2" y="19" fill="#000000" font-family="Courier New" font-size="12px">Simple User request</text></switch></g><path d="M 417.5 574 L 417.5 657.63" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 417.5 662.88 L 414 655.88 L 417.5 657.63 L 421 655.88 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 616px; margin-left: 296px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Give me sync worker (POP operation)</div></div></div></foreignObject><text x="296" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Give me sync worker (POP operation)</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 617px; margin-left: 418px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">3</div></div></div></foreignObject><text x="418" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">3</text></switch></g><path d="M 492.5 514 L 492.5 430.37" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 492.5 425.12 L 496 432.12 L 492.5 430.37 L 489 432.12 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 493px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">5</div></div></div></foreignObject><text x="493" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">5</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 465px; margin-left: 535px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Get worker</div></div></div></foreignObject><text x="535" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Get worker</text></switch></g><rect x="380" y="514" width="150" height="60" fill="#ffe6cc" stroke="#d79b00" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 544px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Workers Watcher</div></div></div></foreignObject><text x="455" y="548" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Workers Watcher</text></switch></g><path d="M 280 424 L 280 544 L 373.63 544" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 378.88 544 L 371.88 547.5 L 373.63 544 L 371.88 540.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 515px; margin-left: 202px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Allocate sync workers</div></div></div></foreignObject><text x="202" y="518" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Allocate sync workers</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 514px; margin-left: 280px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">1</div></div></div></foreignObject><text x="280" y="518" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">1</text></switch></g><rect x="230" y="384" width="100" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 404px; margin-left: 231px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Initialize</div></div></div></foreignObject><text x="280" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Initialize</text></switch></g><path d="M 417.5 424 L 417.5 507.63" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 417.5 512.88 L 414 505.88 L 417.5 507.63 L 421 505.88 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 352px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Give me SyncWorker</div></div></div></foreignObject><text x="352" y="467" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Give me SyncWorker</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 418px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">2</div></div></div></foreignObject><text x="418" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">2</text></switch></g><path d="M 530 414 L 700.63 414" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 705.88 414 L 698.88 417.5 L 700.63 414 L 698.88 410.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 415px; margin-left: 618px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">6</div></div></div></foreignObject><text x="618" y="418" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">6</text></switch></g><path d="M 492.5 384 L 483 384 L 483 324 L 520 324 L 520 83 L 468.87 83" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 463.62 83 L 470.62 79.5 L 468.87 83 L 470.62 86.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 224px; margin-left: 521px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">10</div></div></div></foreignObject><text x="521" y="227" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">10</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 225px; margin-left: 597px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Send response to the user</div></div></div></foreignObject><text x="597" y="228" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Send response to the user</text></switch></g><rect x="380" y="384" width="150" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 404px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Exec/ExecWithContext</div></div></div></foreignObject><text x="455" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec/ExecWithContext</text></switch></g><path d="M 492.5 664 L 492.5 624 L 492.5 580.37" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 492.5 575.12 L 496 582.12 L 492.5 580.37 L 489 582.12 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 616px; margin-left: 494px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">4</div></div></div></foreignObject><text x="494" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">4</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 617px; margin-left: 610px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">I have free workers, here you are</div></div></div></foreignObject><text x="610" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">I have free workers, here you are</text></switch></g><rect x="380" y="664" width="150" height="60" fill="#d5e8d4" stroke="#82b366" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 694px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Stack with workers</div></div></div></foreignObject><text x="455" y="698" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Stack with workers</text></switch></g><path d="M 707 394 L 536.37 394" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 531.12 394 L 538.12 390.5 L 536.37 394 L 538.12 397.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 394px; margin-left: 618px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">9</div></div></div></foreignObject><text x="618" y="397" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">9</text></switch></g><rect x="707" y="384" width="163" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 161px; height: 1px; padding-top: 404px; margin-left: 708px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Exec/ExecWithTimeout</div></div></div></foreignObject><text x="789" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec/ExecWithTimeout</text></switch></g><path d="M 450 289.5 L 460 289.5 L 460 364.5 L 470.5 364.5 L 455 383.5 L 439.5 364.5 L 450 364.5 Z" fill="none" stroke="#000000" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="all"/><ellipse cx="455" cy="84.5" rx="7.5" ry="7.5" fill="#ffffff" stroke="#000000" pointer-events="all"/><path d="M 455 92 L 455 117 M 455 97 L 440 97 M 455 97 L 470 97 M 455 117 L 440 137 M 455 117 L 470 137" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe flex-start; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 144px; margin-left: 455px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">User request</div></div></div></foreignObject><text x="455" y="156" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">User...</text></switch></g><rect x="607" y="426" width="198" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 435px; margin-left: 706px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Send request to the worker</div></div></div></foreignObject><text x="706" y="438" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Send request to the worker</text></switch></g><rect x="618" y="368" width="125" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 377px; margin-left: 681px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Receive response</div></div></div></foreignObject><text x="681" y="380" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Receive response</text></switch></g><ellipse cx="125" cy="404" rx="11" ry="11" fill="#000000" stroke="#ff0000" pointer-events="all"/><path d="M 140 404 L 227.76 404" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 219.88 408.5 L 228.88 404 L 219.88 399.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="all"/><rect x="45" y="371" width="161" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 380px; margin-left: 126px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Plugin Initialization</div></div></div></foreignObject><text x="126" y="383" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Plugin Initialization</text></switch></g><path d="M 870 414 L 983.63 414" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 988.88 414 L 981.88 417.5 L 983.63 414 L 981.88 410.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 413px; margin-left: 930px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">7</div></div></div></foreignObject><text x="930" y="416" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">7</text></switch></g><path d="M 990 394 L 876.37 394" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 871.12 394 L 878.12 390.5 L 876.37 394 L 878.12 397.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 394px; margin-left: 931px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">8</div></div></div></foreignObject><text x="931" y="397" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">8</text></switch></g><rect x="990" y="384" width="100" height="40" fill="#f5f5f5" stroke="#666666" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 404px; margin-left: 991px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #333333; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Worker</div></div></div></foreignObject><text x="1040" y="408" fill="#333333" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Worker</text></switch></g><rect x="893" y="426" width="96" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 435px; margin-left: 941px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Exec payload</div></div></div></foreignObject><text x="941" y="438" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec payload</text></switch></g><rect x="873" y="366" width="125" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 375px; margin-left: 936px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Reveive response</div></div></div></foreignObject><text x="936" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Reveive response</text></switch></g><rect x="401" y="255" width="108" height="34" fill="#f8cecc" stroke="#b85450" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 106px; height: 1px; padding-top: 272px; margin-left: 402px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">HTTP plugin</div></div></div></foreignObject><text x="455" y="276" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">HTTP plugin</text></switch></g><path d="M 450 157.5 L 460 157.5 L 460 235.5 L 470.5 235.5 L 455 254.5 L 439.5 235.5 L 450 235.5 Z" fill="none" stroke="#000000" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="all"/><rect x="490" y="364" width="40" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 38px; height: 1px; padding-top: 374px; margin-left: 491px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Pool</div></div></div></foreignObject><text x="510" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Pool</text></switch></g><rect x="770" y="364" width="100" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 374px; margin-left: 771px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Golang Worker</div></div></div></foreignObject><text x="820" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Golang Worker</text></switch></g><rect x="1006" y="364" width="84" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 82px; height: 1px; padding-top: 374px; margin-left: 1007px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">PHP worker</div></div></div></foreignObject><text x="1048" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">PHP worker</text></switch></g></g><switch><g requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"/><a transform="translate(0,-5)" xlink:href="https://www.diagrams.net/doc/faq/svg-export-text-problems" target="_blank"><text text-anchor="middle" font-size="10px" x="50%" y="100%">Viewer does not support full SVG 1.1</text></a></switch></svg> \ No newline at end of file
diff --git a/pkg/events/general.go b/pkg/events/general.go
deleted file mode 100755
index 5cf13e10..00000000
--- a/pkg/events/general.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package events
-
-import (
- "sync"
-)
-
-const UnknownEventType string = "Unknown event type"
-
-// HandlerImpl helps to broadcast events to multiple listeners.
-type HandlerImpl struct {
- listeners []Listener
- sync.RWMutex // all receivers should be pointers
-}
-
-func NewEventsHandler() Handler {
- return &HandlerImpl{listeners: make([]Listener, 0, 2)}
-}
-
-// NumListeners returns number of event listeners.
-func (eb *HandlerImpl) NumListeners() int {
- eb.Lock()
- defer eb.Unlock()
- return len(eb.listeners)
-}
-
-// AddListener registers new event listener.
-func (eb *HandlerImpl) AddListener(listener Listener) {
- eb.Lock()
- defer eb.Unlock()
- eb.listeners = append(eb.listeners, listener)
-}
-
-// Push broadcast events across all event listeners.
-func (eb *HandlerImpl) Push(e interface{}) {
- // ReadLock here because we are not changing listeners
- eb.RLock()
- defer eb.RUnlock()
- for k := range eb.listeners {
- eb.listeners[k](e)
- }
-}
diff --git a/pkg/events/grpc_event.go b/pkg/events/grpc_event.go
deleted file mode 100644
index 31ff4957..00000000
--- a/pkg/events/grpc_event.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package events
-
-import (
- "time"
-
- "google.golang.org/grpc"
-)
-
-const (
- // EventUnaryCallOk represents success unary call response
- EventUnaryCallOk G = iota + 13000
-
- // EventUnaryCallErr raised when unary call ended with error
- EventUnaryCallErr
-)
-
-type G int64
-
-func (ev G) String() string {
- switch ev {
- case EventUnaryCallOk:
- return "EventUnaryCallOk"
- case EventUnaryCallErr:
- return "EventUnaryCallErr"
- }
- return UnknownEventType
-}
-
-// JobEvent represent job event.
-type GRPCEvent struct {
- Event G
- // Info contains unary call info.
- Info *grpc.UnaryServerInfo
- // Error associated with event.
- Error error
- // event timings
- Start time.Time
- Elapsed time.Duration
-}
diff --git a/pkg/events/interface.go b/pkg/events/interface.go
deleted file mode 100644
index 7d57e4d0..00000000
--- a/pkg/events/interface.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package events
-
-// Handler interface
-type Handler interface {
- // NumListeners return number of active listeners
- NumListeners() int
- // AddListener adds lister to the publisher
- AddListener(listener Listener)
- // Push pushes event to the listeners
- Push(e interface{})
-}
-
-// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service.
-type Listener func(event interface{})
diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go
deleted file mode 100644
index f65ede67..00000000
--- a/pkg/events/jobs_events.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package events
-
-import (
- "time"
-)
-
-const (
- // EventPushOK thrown when new job has been added. JobEvent is passed as context.
- EventPushOK J = iota + 12000
-
- // EventPushError caused when job can not be registered.
- EventPushError
-
- // EventJobStart thrown when new job received.
- EventJobStart
-
- // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
- EventJobOK
-
- // EventJobError thrown on all job related errors. See JobError as context.
- EventJobError
-
- // EventPipeActive when pipeline has started.
- EventPipeActive
-
- // EventPipeStopped when pipeline has been stopped.
- EventPipeStopped
-
- // EventPipePaused when pipeline has been paused.
- EventPipePaused
-
- // EventPipeError when pipeline specific error happen.
- EventPipeError
-
- // EventDriverReady thrown when broken is ready to accept/serve tasks.
- EventDriverReady
-)
-
-type J int64
-
-func (ev J) String() string {
- switch ev {
- case EventPushOK:
- return "EventPushOK"
- case EventPushError:
- return "EventPushError"
- case EventJobStart:
- return "EventJobStart"
- case EventJobOK:
- return "EventJobOK"
- case EventJobError:
- return "EventJobError"
- case EventPipeActive:
- return "EventPipeActive"
- case EventPipeStopped:
- return "EventPipeStopped"
- case EventPipeError:
- return "EventPipeError"
- case EventDriverReady:
- return "EventDriverReady"
- case EventPipePaused:
- return "EventPipePaused"
- }
- return UnknownEventType
-}
-
-// JobEvent represent job event.
-type JobEvent struct {
- Event J
- // String is job id.
- ID string
- // Pipeline name
- Pipeline string
- // Associated driver name (amqp, ephemeral, etc)
- Driver string
- // Error for the jobs/pipes errors
- Error error
- // event timings
- Start time.Time
- Elapsed time.Duration
-}
diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go
deleted file mode 100644
index 4d4cae5d..00000000
--- a/pkg/events/pool_events.go
+++ /dev/null
@@ -1,70 +0,0 @@
-package events
-
-const (
- // EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct P = iota + 10000
-
- // EventWorkerDestruct thrown after worker destruction.
- EventWorkerDestruct
-
- // EventPoolError caused on pool wide errors.
- EventPoolError
-
- // EventSupervisorError triggered when supervisor can not complete work.
- EventSupervisorError
-
- // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
- EventNoFreeWorkers
-
- // EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory
-
- // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
- EventTTL
-
- // EventIdleTTL triggered when worker spends too much time at rest.
- EventIdleTTL
-
- // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
- EventExecTTL
-
- // EventPoolRestart triggered when pool restart is needed
- EventPoolRestart
-)
-
-type P int64
-
-func (ev P) String() string {
- switch ev {
- case EventWorkerConstruct:
- return "EventWorkerConstruct"
- case EventWorkerDestruct:
- return "EventWorkerDestruct"
- case EventPoolError:
- return "EventPoolError"
- case EventSupervisorError:
- return "EventSupervisorError"
- case EventNoFreeWorkers:
- return "EventNoFreeWorkers"
- case EventMaxMemory:
- return "EventMaxMemory"
- case EventTTL:
- return "EventTTL"
- case EventIdleTTL:
- return "EventIdleTTL"
- case EventExecTTL:
- return "EventExecTTL"
- case EventPoolRestart:
- return "EventPoolRestart"
- }
- return UnknownEventType
-}
-
-// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
-type PoolEvent struct {
- // Event type, see below.
- Event P
-
- // Payload depends on event type, typically it's worker or error.
- Payload interface{}
-}
diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go
deleted file mode 100644
index 39c38e57..00000000
--- a/pkg/events/worker_events.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package events
-
-const (
- // EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError W = iota + 11000
- // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
- EventWorkerLog
- // EventWorkerStderr is the worker standard error output
- EventWorkerStderr
-)
-
-type W int64
-
-func (ev W) String() string {
- switch ev {
- case EventWorkerError:
- return "EventWorkerError"
- case EventWorkerLog:
- return "EventWorkerLog"
- case EventWorkerStderr:
- return "EventWorkerStderr"
- }
- return UnknownEventType
-}
-
-// WorkerEvent wraps worker events.
-type WorkerEvent struct {
- // Event id, see below.
- Event W
-
- // Worker triggered the event.
- Worker interface{}
-
- // Event specific payload.
- Payload interface{}
-}
diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go
deleted file mode 100755
index e1e45ac1..00000000
--- a/pkg/payload/payload.go
+++ /dev/null
@@ -1,20 +0,0 @@
-package payload
-
-import (
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-// Payload carries binary header and body to stack and
-// back to the server.
-type Payload struct {
- // Context represent payload context, might be omitted.
- Context []byte
-
- // body contains binary payload to be processed by WorkerProcess.
- Body []byte
-}
-
-// String returns payload body as string
-func (p *Payload) String() string {
- return utils.AsString(p.Body)
-}
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
deleted file mode 100644
index 3a058956..00000000
--- a/pkg/pool/config.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package pool
-
-import (
- "runtime"
- "time"
-)
-
-// Config .. Pool config Configures the pool behavior.
-type Config struct {
- // Debug flag creates new fresh worker before every request.
- Debug bool
-
- // NumWorkers defines how many sub-processes can be run at once. This value
- // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
- NumWorkers uint64 `mapstructure:"num_workers"`
-
- // MaxJobs defines how many executions is allowed for the worker until
- // it's destruction. set 1 to create new process for each new task, 0 to let
- // worker handle as many tasks as it can.
- MaxJobs uint64 `mapstructure:"max_jobs"`
-
- // AllocateTimeout defines for how long pool will be waiting for a worker to
- // be freed to handle the task. Defaults to 60s.
- AllocateTimeout time.Duration `mapstructure:"allocate_timeout"`
-
- // DestroyTimeout defines for how long pool should be waiting for worker to
- // properly destroy, if timeout reached worker will be killed. Defaults to 60s.
- DestroyTimeout time.Duration `mapstructure:"destroy_timeout"`
-
- // Supervision config to limit worker and pool memory usage.
- Supervisor *SupervisorConfig `mapstructure:"supervisor"`
-}
-
-// InitDefaults enables default config values.
-func (cfg *Config) InitDefaults() {
- if cfg.NumWorkers == 0 {
- cfg.NumWorkers = uint64(runtime.NumCPU())
- }
-
- if cfg.AllocateTimeout == 0 {
- cfg.AllocateTimeout = time.Minute
- }
-
- if cfg.DestroyTimeout == 0 {
- cfg.DestroyTimeout = time.Minute
- }
- if cfg.Supervisor == nil {
- return
- }
- cfg.Supervisor.InitDefaults()
-}
-
-type SupervisorConfig struct {
- // WatchTick defines how often to check the state of worker.
- WatchTick time.Duration `mapstructure:"watch_tick"`
-
- // TTL defines maximum time worker is allowed to live.
- TTL time.Duration `mapstructure:"ttl"`
-
- // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
- IdleTTL time.Duration `mapstructure:"idle_ttl"`
-
- // ExecTTL defines maximum lifetime per job.
- ExecTTL time.Duration `mapstructure:"exec_ttl"`
-
- // MaxWorkerMemory limits memory per worker.
- MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
-}
-
-// InitDefaults enables default config values.
-func (cfg *SupervisorConfig) InitDefaults() {
- if cfg.WatchTick == 0 {
- cfg.WatchTick = time.Second
- }
-}
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
deleted file mode 100644
index 4049122c..00000000
--- a/pkg/pool/interface.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package pool
-
-import (
- "context"
-
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-// Pool managed set of inner worker processes.
-type Pool interface {
- // GetConfig returns pool configuration.
- GetConfig() interface{}
-
- // Exec executes task with payload
- Exec(rqs *payload.Payload) (*payload.Payload, error)
-
- // Workers returns worker list associated with the pool.
- Workers() (workers []worker.BaseProcess)
-
- // RemoveWorker removes worker from the pool.
- RemoveWorker(worker worker.BaseProcess) error
-
- // Destroy all underlying stack (but let them to complete the task).
- Destroy(ctx context.Context)
-
- // ExecWithContext executes task with context which is used with timeout
- execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error)
-}
-
-// Watcher is an interface for the Sync workers lifecycle
-type Watcher interface {
- // Watch used to add workers to the container
- Watch(workers []worker.BaseProcess) error
-
- // Take takes the first free worker
- Take(ctx context.Context) (worker.BaseProcess, error)
-
- // Release releases the worker putting it back to the queue
- Release(w worker.BaseProcess)
-
- // Allocate - allocates new worker and put it into the WorkerWatcher
- Allocate() error
-
- // Destroy destroys the underlying container
- Destroy(ctx context.Context)
-
- // List return all container w/o removing it from internal storage
- List() []worker.BaseProcess
-
- // Remove will remove worker from the container
- Remove(wb worker.BaseProcess)
-}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
deleted file mode 100755
index 7e190846..00000000
--- a/pkg/pool/static_pool.go
+++ /dev/null
@@ -1,374 +0,0 @@
-package pool
-
-import (
- "context"
- "os/exec"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/transport"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-// StopRequest can be sent by worker to indicate that restart is required.
-const StopRequest = "{\"stop\":true}"
-
-// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error)
-
-type Options func(p *StaticPool)
-
-type Command func() *exec.Cmd
-
-// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
-type StaticPool struct {
- cfg *Config
-
- // worker command creator
- cmd Command
-
- // creates and connects to stack
- factory transport.Factory
-
- // distributes the events
- events events.Handler
-
- // saved list of event listeners
- listeners []events.Listener
-
- // manages worker states and TTLs
- ww Watcher
-
- // allocate new worker
- allocator worker.Allocator
-
- // errEncoder is the default Exec error encoder
- errEncoder ErrorEncoder
-}
-
-// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
- const op = errors.Op("static_pool_initialize")
- if factory == nil {
- return nil, errors.E(op, errors.Str("no factory initialized"))
- }
- cfg.InitDefaults()
-
- if cfg.Debug {
- cfg.NumWorkers = 0
- cfg.MaxJobs = 1
- }
-
- p := &StaticPool{
- cfg: cfg,
- cmd: cmd,
- factory: factory,
- events: events.NewEventsHandler(),
- }
-
- // add pool options
- for i := 0; i < len(options); i++ {
- options[i](p)
- }
-
- // set up workers allocator
- p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
- // set up workers watcher
- p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout)
-
- // allocate requested number of workers
- workers, err := p.allocateWorkers(p.cfg.NumWorkers)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // add workers to the watcher
- err = p.ww.Watch(workers)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- p.errEncoder = defaultErrEncoder(p)
-
- // if supervised config not nil, guess, that pool wanted to be supervised
- if cfg.Supervisor != nil {
- sp := supervisorWrapper(p, p.events, p.cfg.Supervisor)
- // start watcher timer
- sp.Start()
- return sp, nil
- }
-
- return p, nil
-}
-
-func AddListeners(listeners ...events.Listener) Options {
- return func(p *StaticPool) {
- p.listeners = listeners
- for i := 0; i < len(listeners); i++ {
- p.addListener(listeners[i])
- }
- }
-}
-
-// AddListener connects event listener to the pool.
-func (sp *StaticPool) addListener(listener events.Listener) {
- sp.events.AddListener(listener)
-}
-
-// GetConfig returns associated pool configuration. Immutable.
-func (sp *StaticPool) GetConfig() interface{} {
- return sp.cfg
-}
-
-// Workers returns worker list associated with the pool.
-func (sp *StaticPool) Workers() (workers []worker.BaseProcess) {
- return sp.ww.List()
-}
-
-func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
- sp.ww.Remove(wb)
- return nil
-}
-
-// Exec executes provided payload on the worker
-func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("static_pool_exec")
- if sp.cfg.Debug {
- return sp.execDebug(p)
- }
- ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
- defer cancel()
- w, err := sp.takeWorker(ctxGetFree, op)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- rsp, err := w.(worker.SyncWorker).Exec(p)
- if err != nil {
- return sp.errEncoder(err, w)
- }
-
- // worker want's to be terminated
- if len(rsp.Body) == 0 && utils.AsString(rsp.Context) == StopRequest {
- sp.stopWorker(w)
- return sp.Exec(p)
- }
-
- if sp.cfg.MaxJobs != 0 {
- sp.checkMaxJobs(w)
- return rsp, nil
- }
- // return worker back
- sp.ww.Release(w)
- return rsp, nil
-}
-
-// Be careful, sync with pool.Exec method
-func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("static_pool_exec_with_context")
- if sp.cfg.Debug {
- return sp.execDebugWithTTL(ctx, p)
- }
-
- ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
- defer cancel()
- w, err := sp.takeWorker(ctxAlloc, op)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p)
- if err != nil {
- return sp.errEncoder(err, w)
- }
-
- // worker want's to be terminated
- if len(rsp.Body) == 0 && utils.AsString(rsp.Context) == StopRequest {
- sp.stopWorker(w)
- return sp.execWithTTL(ctx, p)
- }
-
- if sp.cfg.MaxJobs != 0 {
- sp.checkMaxJobs(w)
- return rsp, nil
- }
-
- // return worker back
- sp.ww.Release(w)
- return rsp, nil
-}
-
-func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
- const op = errors.Op("static_pool_stop_worker")
- w.State().Set(worker.StateInvalid)
- err := w.Stop()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
- }
-}
-
-// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
-//go:inline
-func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) {
- if w.State().NumExecs() >= sp.cfg.MaxJobs {
- w.State().Set(worker.StateMaxJobsReached)
- sp.ww.Release(w)
- return
- }
-
- sp.ww.Release(w)
-}
-
-func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
- // Get function consumes context with timeout
- w, err := sp.ww.Take(ctxGetFree)
- if err != nil {
- // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
- if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)})
- return nil, errors.E(op, err)
- }
- // else if err not nil - return error
- return nil, errors.E(op, err)
- }
- return w, nil
-}
-
-// Destroy all underlying stack (but let them complete the task).
-func (sp *StaticPool) Destroy(ctx context.Context) {
- sp.ww.Destroy(ctx)
-}
-
-func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
- const op = errors.Op("error_encoder")
- // just push event if on any stage was timeout error
- switch {
- case errors.Is(errors.ExecTTL, err):
- sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
- w.State().Set(worker.StateInvalid)
- return nil, err
-
- case errors.Is(errors.SoftJob, err):
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
-
- // if max jobs exceed
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- // mark old as invalid and stop
- w.State().Set(worker.StateInvalid)
- errS := w.Stop()
- if errS != nil {
- return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
- }
-
- return nil, err
- }
-
- // soft jobs errors are allowed, just put the worker back
- sp.ww.Release(w)
-
- return nil, err
- case errors.Is(errors.Network, err):
- // in case of network error, we can't stop the worker, we should kill it
- w.State().Set(worker.StateInvalid)
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
-
- // kill the worker instead of sending net packet to it
- _ = w.Kill()
-
- return nil, err
- default:
- w.State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- // stop the worker, worker here might be in the broken state (network)
- errS := w.Stop()
- if errS != nil {
- return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS))
- }
-
- return nil, errors.E(op, err)
- }
- }
-}
-
-func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
- return func() (worker.SyncWorker, error) {
- ctxT, cancel := context.WithTimeout(ctx, timeout)
- defer cancel()
- w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...)
- if err != nil {
- return nil, err
- }
-
- // wrap sync worker
- sw := worker.From(w)
-
- sp.events.Push(events.PoolEvent{
- Event: events.EventWorkerConstruct,
- Payload: sw,
- })
- return sw, nil
- }
-}
-
-// execDebug used when debug mode was not set and exec_ttl is 0
-func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("static_pool_exec_debug")
- sw, err := sp.allocator()
- if err != nil {
- return nil, err
- }
-
- // redirect call to the workers' exec method (without ttl)
- r, err := sw.Exec(p)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // destroy the worker
- sw.State().Set(worker.StateDestroyed)
- err = sw.Kill()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
- return nil, errors.E(op, err)
- }
-
- return r, nil
-}
-
-// execDebugWithTTL used when user set debug mode and exec_ttl
-func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
- sw, err := sp.allocator()
- if err != nil {
- return nil, err
- }
-
- // redirect call to the worker with TTL
- r, err := sw.ExecWithTTL(ctx, p)
- if stopErr := sw.Stop(); stopErr != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
- }
-
- return r, err
-}
-
-// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
- const op = errors.Op("static_pool_allocate_workers")
- workers := make([]worker.BaseProcess, 0, numWorkers)
-
- // constant number of stack simplify logic
- for i := uint64(0); i < numWorkers; i++ {
- w, err := sp.allocator()
- if err != nil {
- return nil, errors.E(op, errors.WorkerAllocate, err)
- }
-
- workers = append(workers, w)
- }
- return workers, nil
-}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
deleted file mode 100755
index cb6578a8..00000000
--- a/pkg/pool/static_pool_test.go
+++ /dev/null
@@ -1,721 +0,0 @@
-package pool
-
-import (
- "context"
- "log"
- "os/exec"
- "runtime"
- "strconv"
- "strings"
- "sync"
- "testing"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/utils"
- "github.com/stretchr/testify/assert"
-)
-
-var cfg = &Config{
- NumWorkers: uint64(runtime.NumCPU()),
- AllocateTimeout: time.Second * 5,
- DestroyTimeout: time.Second * 5,
-}
-
-func Test_NewPool(t *testing.T) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
- cfg,
- )
- assert.NoError(t, err)
-
- defer p.Destroy(ctx)
-
- assert.NotNil(t, p)
-}
-
-func Test_StaticPool_Invalid(t *testing.T) {
- p, err := Initialize(
- context.Background(),
- func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") },
- pipe.NewPipeFactory(),
- cfg,
- )
-
- assert.Nil(t, p)
- assert.Error(t, err)
-}
-
-func Test_ConfigNoErrorInitDefaults(t *testing.T) {
- p, err := Initialize(
- context.Background(),
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- },
- )
-
- assert.NotNil(t, p)
- assert.NoError(t, err)
-}
-
-func Test_StaticPool_Echo(t *testing.T) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
- cfg,
- )
- assert.NoError(t, err)
-
- defer p.Destroy(ctx)
-
- assert.NotNil(t, p)
-
- res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_StaticPool_Echo_NilContext(t *testing.T) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
- cfg,
- )
- assert.NoError(t, err)
-
- defer p.Destroy(ctx)
-
- assert.NotNil(t, p)
-
- res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_StaticPool_Echo_Context(t *testing.T) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") },
- pipe.NewPipeFactory(),
- cfg,
- )
- assert.NoError(t, err)
-
- defer p.Destroy(ctx)
-
- assert.NotNil(t, p)
-
- res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.Empty(t, res.Body)
- assert.NotNil(t, res.Context)
-
- assert.Equal(t, "world", string(res.Context))
-}
-
-func Test_StaticPool_JobError(t *testing.T) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") },
- pipe.NewPipeFactory(),
- cfg,
- )
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- time.Sleep(time.Second * 2)
-
- res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
- assert.Error(t, err)
- assert.Nil(t, res)
-
- if errors.Is(errors.SoftJob, err) == false {
- t.Fatal("error should be of type errors.Exec")
- }
-
- assert.Contains(t, err.Error(), "hello")
- p.Destroy(ctx)
-}
-
-func Test_StaticPool_Broken_Replace(t *testing.T) {
- ctx := context.Background()
- block := make(chan struct{}, 10)
-
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- if wev.Event == events.EventWorkerStderr {
- e := string(wev.Payload.([]byte))
- if strings.ContainsAny(e, "undefined_function()") {
- block <- struct{}{}
- return
- }
- }
- }
- }
-
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") },
- pipe.NewPipeFactory(),
- cfg,
- AddListeners(listener),
- )
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- time.Sleep(time.Second)
- res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
- assert.Error(t, err)
- assert.Nil(t, res)
-
- <-block
-
- p.Destroy(ctx)
-}
-
-func Test_StaticPool_Broken_FromOutside(t *testing.T) {
- ctx := context.Background()
- // Run pool events
- ev := make(chan struct{}, 1)
- listener := func(event interface{}) {
- if pe, ok := event.(events.PoolEvent); ok {
- if pe.Event == events.EventWorkerConstruct {
- ev <- struct{}{}
- }
- }
- }
-
- var cfg2 = &Config{
- NumWorkers: 1,
- AllocateTimeout: time.Second * 5,
- DestroyTimeout: time.Second * 5,
- }
-
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
- cfg2,
- AddListeners(listener),
- )
- assert.NoError(t, err)
- assert.NotNil(t, p)
- defer p.Destroy(ctx)
- time.Sleep(time.Second)
-
- res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
- assert.Equal(t, 1, len(p.Workers()))
-
- // first creation
- <-ev
- // killing random worker and expecting pool to replace it
- err = p.Workers()[0].Kill()
- if err != nil {
- t.Errorf("error killing the process: error %v", err)
- }
-
- // re-creation
- <-ev
-
- list := p.Workers()
- for _, w := range list {
- assert.Equal(t, worker.StateReady, w.State().Value())
- }
-}
-
-func Test_StaticPool_AllocateTimeout(t *testing.T) {
- p, err := Initialize(
- context.Background(),
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- NumWorkers: 1,
- AllocateTimeout: time.Nanosecond * 1,
- DestroyTimeout: time.Second * 2,
- },
- )
- assert.Error(t, err)
- if !errors.Is(errors.WorkerAllocate, err) {
- t.Fatal("error should be of type WorkerAllocate")
- }
- assert.Nil(t, p)
-}
-
-func Test_StaticPool_Replace_Worker(t *testing.T) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- NumWorkers: 1,
- MaxJobs: 1,
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- },
- )
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- defer p.Destroy(ctx)
- // prevent process is not ready
- time.Sleep(time.Second)
-
- var lastPID string
- lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
-
- res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
- assert.Equal(t, lastPID, string(res.Body))
-
- for i := 0; i < 10; i++ {
- res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.NotEqual(t, lastPID, string(res.Body))
- lastPID = string(res.Body)
- }
-}
-
-func Test_StaticPool_Debug_Worker(t *testing.T) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- Debug: true,
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- },
- )
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- defer p.Destroy(ctx)
-
- // prevent process is not ready
- time.Sleep(time.Second)
- assert.Len(t, p.Workers(), 0)
-
- var lastPID string
- res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
- assert.NotEqual(t, lastPID, string(res.Body))
-
- assert.Len(t, p.Workers(), 0)
-
- for i := 0; i < 10; i++ {
- assert.Len(t, p.Workers(), 0)
- res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.NotEqual(t, lastPID, string(res.Body))
- lastPID = string(res.Body)
- }
-}
-
-// identical to replace but controlled on worker side
-func Test_StaticPool_Stop_Worker(t *testing.T) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- NumWorkers: 1,
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- },
- )
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- defer p.Destroy(ctx)
- time.Sleep(time.Second)
-
- var lastPID string
- lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
-
- res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
- if err != nil {
- t.Fatal(err)
- }
- assert.Equal(t, lastPID, string(res.Body))
-
- for i := 0; i < 10; i++ {
- res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.NotEqual(t, lastPID, string(res.Body))
- lastPID = string(res.Body)
- }
-}
-
-// identical to replace but controlled on worker side
-func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- NumWorkers: 1,
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- },
- )
-
- assert.NotNil(t, p)
- assert.NoError(t, err)
-
- p.Destroy(ctx)
- _, err = p.Exec(&payload.Payload{Body: []byte("100")})
- assert.Error(t, err)
-}
-
-// identical to replace but controlled on worker side
-func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- NumWorkers: 1,
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- },
- )
-
- assert.NotNil(t, p)
- assert.NoError(t, err)
-
- go func() {
- _, errP := p.Exec(&payload.Payload{Body: []byte("100")})
- if errP != nil {
- t.Errorf("error executing payload: error %v", err)
- }
- }()
- time.Sleep(time.Millisecond * 100)
-
- p.Destroy(ctx)
- _, err = p.Exec(&payload.Payload{Body: []byte("100")})
- assert.Error(t, err)
-}
-
-// identical to replace but controlled on worker side
-func Test_Static_Pool_Handle_Dead(t *testing.T) {
- ctx := context.Background()
- p, err := Initialize(
- context.Background(),
- func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- NumWorkers: 5,
- AllocateTimeout: time.Second * 100,
- DestroyTimeout: time.Second,
- },
- )
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- time.Sleep(time.Second)
- for i := range p.Workers() {
- p.Workers()[i].State().Set(worker.StateErrored)
- }
-
- _, err = p.Exec(&payload.Payload{Body: []byte("hello")})
- assert.NoError(t, err)
- p.Destroy(ctx)
-}
-
-// identical to replace but controlled on worker side
-func Test_Static_Pool_Slow_Destroy(t *testing.T) {
- p, err := Initialize(
- context.Background(),
- func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- NumWorkers: 5,
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- },
- )
-
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- p.Destroy(context.Background())
-}
-
-func Test_StaticPool_NoFreeWorkers(t *testing.T) {
- ctx := context.Background()
- block := make(chan struct{}, 10)
-
- listener := func(event interface{}) {
- if ev, ok := event.(events.PoolEvent); ok {
- if ev.Event == events.EventNoFreeWorkers {
- block <- struct{}{}
- }
- }
- }
-
- p, err := Initialize(
- ctx,
- // sleep for the 3 seconds
- func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- Debug: false,
- NumWorkers: 1,
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- Supervisor: nil,
- },
- AddListeners(listener),
- )
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- go func() {
- _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
- }()
-
- time.Sleep(time.Second)
- res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
- assert.Error(t, err)
- assert.Nil(t, res)
-
- <-block
-
- p.Destroy(ctx)
-}
-
-// identical to replace but controlled on worker side
-func Test_Static_Pool_WrongCommand1(t *testing.T) {
- p, err := Initialize(
- context.Background(),
- func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- NumWorkers: 5,
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- },
- )
-
- assert.Error(t, err)
- assert.Nil(t, p)
-}
-
-// identical to replace but controlled on worker side
-func Test_Static_Pool_WrongCommand2(t *testing.T) {
- p, err := Initialize(
- context.Background(),
- func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- NumWorkers: 5,
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- },
- )
-
- assert.Error(t, err)
- assert.Nil(t, p)
-}
-
-/* PTR:
-Benchmark_Pool_Echo-32 49076 29926 ns/op 8016 B/op 20 allocs/op
-Benchmark_Pool_Echo-32 47257 30779 ns/op 8047 B/op 20 allocs/op
-Benchmark_Pool_Echo-32 46737 29440 ns/op 8065 B/op 20 allocs/op
-Benchmark_Pool_Echo-32 51177 29074 ns/op 7981 B/op 20 allocs/op
-Benchmark_Pool_Echo-32 51764 28319 ns/op 8012 B/op 20 allocs/op
-Benchmark_Pool_Echo-32 54054 30714 ns/op 7987 B/op 20 allocs/op
-Benchmark_Pool_Echo-32 54391 30689 ns/op 8055 B/op 20 allocs/op
-
-VAL:
-Benchmark_Pool_Echo-32 47936 28679 ns/op 7942 B/op 19 allocs/op
-Benchmark_Pool_Echo-32 49010 29830 ns/op 7970 B/op 19 allocs/op
-Benchmark_Pool_Echo-32 46771 29031 ns/op 8014 B/op 19 allocs/op
-Benchmark_Pool_Echo-32 47760 30517 ns/op 7955 B/op 19 allocs/op
-Benchmark_Pool_Echo-32 48148 29816 ns/op 7950 B/op 19 allocs/op
-Benchmark_Pool_Echo-32 52705 29809 ns/op 7979 B/op 19 allocs/op
-Benchmark_Pool_Echo-32 54374 27776 ns/op 7947 B/op 19 allocs/op
-*/
-func Benchmark_Pool_Echo(b *testing.B) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
- cfg,
- )
- if err != nil {
- b.Fatal(err)
- }
-
- bd := make([]byte, 1024)
- c := make([]byte, 1024)
-
- pld := &payload.Payload{
- Context: c,
- Body: bd,
- }
-
- b.ResetTimer()
- b.ReportAllocs()
- for n := 0; n < b.N; n++ {
- if _, err := p.Exec(pld); err != nil {
- b.Fail()
- }
- }
-}
-
-// Benchmark_Pool_Echo_Batched-32 366996 2873 ns/op 1233 B/op 24 allocs/op
-// PTR -> Benchmark_Pool_Echo_Batched-32 406839 2900 ns/op 1059 B/op 23 allocs/op
-// PTR -> Benchmark_Pool_Echo_Batched-32 413312 2904 ns/op 1067 B/op 23 allocs/op
-func Benchmark_Pool_Echo_Batched(b *testing.B) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- NumWorkers: uint64(runtime.NumCPU()),
- AllocateTimeout: time.Second * 100,
- DestroyTimeout: time.Second,
- },
- )
- assert.NoError(b, err)
- defer p.Destroy(ctx)
-
- bd := make([]byte, 1024)
- c := make([]byte, 1024)
-
- pld := &payload.Payload{
- Context: c,
- Body: bd,
- }
-
- b.ResetTimer()
- b.ReportAllocs()
-
- var wg sync.WaitGroup
- for i := 0; i < b.N; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- if _, err := p.Exec(pld); err != nil {
- b.Fail()
- log.Println(err)
- }
- }()
- }
-
- wg.Wait()
-}
-
-// Benchmark_Pool_Echo_Replaced-32 104/100 10900218 ns/op 52365 B/op 125 allocs/op
-func Benchmark_Pool_Echo_Replaced(b *testing.B) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
- &Config{
- NumWorkers: 1,
- MaxJobs: 1,
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- },
- )
- assert.NoError(b, err)
- defer p.Destroy(ctx)
- b.ResetTimer()
- b.ReportAllocs()
-
- for n := 0; n < b.N; n++ {
- if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- log.Println(err)
- }
- }
-}
-
-// BenchmarkToStringUnsafe-12 566317729 1.91 ns/op 0 B/op 0 allocs/op
-// BenchmarkToStringUnsafe-32 1000000000 0.4434 ns/op 0 B/op 0 allocs/op
-func BenchmarkToStringUnsafe(b *testing.B) {
- testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
- b.ResetTimer()
- b.ReportAllocs()
-
- for i := 0; i < b.N; i++ {
- res := utils.AsString(testPayload)
- _ = res
- }
-}
-
-// BenchmarkToStringSafe-32 8017846 182.5 ns/op 896 B/op 1 allocs/op
-// inline BenchmarkToStringSafe-12 28926276 46.6 ns/op 128 B/op 1 allocs/op
-func BenchmarkToStringSafe(b *testing.B) {
- testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
-
- b.ResetTimer()
- b.ReportAllocs()
-
- for i := 0; i < b.N; i++ {
- res := toStringNotFun(testPayload)
- _ = res
- }
-}
-
-func toStringNotFun(data []byte) string {
- return string(data)
-}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
deleted file mode 100755
index e6b2bd7c..00000000
--- a/pkg/pool/supervisor_pool.go
+++ /dev/null
@@ -1,230 +0,0 @@
-package pool
-
-import (
- "context"
- "sync"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/state/process"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-const MB = 1024 * 1024
-
-// NSEC_IN_SEC nanoseconds in second
-const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
-
-type Supervised interface {
- Pool
- // Start used to start watching process for all pool workers
- Start()
-}
-
-type supervised struct {
- cfg *SupervisorConfig
- events events.Handler
- pool Pool
- stopCh chan struct{}
- mu *sync.RWMutex
-}
-
-func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
- sp := &supervised{
- cfg: cfg,
- events: events,
- pool: pool,
- mu: &sync.RWMutex{},
- stopCh: make(chan struct{}),
- }
-
- return sp
-}
-
-func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) {
- panic("used to satisfy pool interface")
-}
-
-func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("supervised_exec_with_context")
- if sp.cfg.ExecTTL == 0 {
- return sp.pool.Exec(rqs)
- }
-
- ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
- defer cancel()
-
- res, err := sp.pool.execWithTTL(ctx, rqs)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return res, nil
-}
-
-func (sp *supervised) GetConfig() interface{} {
- return sp.pool.GetConfig()
-}
-
-func (sp *supervised) Workers() (workers []worker.BaseProcess) {
- sp.mu.Lock()
- defer sp.mu.Unlock()
- return sp.pool.Workers()
-}
-
-func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
- return sp.pool.RemoveWorker(worker)
-}
-
-func (sp *supervised) Destroy(ctx context.Context) {
- sp.pool.Destroy(ctx)
-}
-
-func (sp *supervised) Start() {
- go func() {
- watchTout := time.NewTicker(sp.cfg.WatchTick)
- for {
- select {
- case <-sp.stopCh:
- watchTout.Stop()
- return
- // stop here
- case <-watchTout.C:
- sp.mu.Lock()
- sp.control()
- sp.mu.Unlock()
- }
- }
- }()
-}
-
-func (sp *supervised) Stop() {
- sp.stopCh <- struct{}{}
-}
-
-func (sp *supervised) control() { //nolint:gocognit
- now := time.Now()
-
- // MIGHT BE OUTDATED
- // It's a copy of the Workers pointers
- workers := sp.pool.Workers()
-
- for i := 0; i < len(workers); i++ {
- // if worker not in the Ready OR working state
- // skip such worker
- switch workers[i].State().Value() {
- case
- worker.StateInvalid,
- worker.StateErrored,
- worker.StateDestroyed,
- worker.StateInactive,
- worker.StateStopped,
- worker.StateStopping,
- worker.StateKilling:
- continue
- }
-
- s, err := process.WorkerProcessState(workers[i])
- if err != nil {
- // worker not longer valid for supervision
- continue
- }
-
- if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
- /*
- worker at this point might be in the middle of request execution:
-
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
- ^
- TTL Reached, state - invalid |
- -----> Worker Stopped here
- */
-
- if workers[i].State().Value() != worker.StateWorking {
- workers[i].State().Set(worker.StateInvalid)
- _ = workers[i].Stop()
- }
- // just to double check
- workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
- continue
- }
-
- if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
- /*
- worker at this point might be in the middle of request execution:
-
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
- ^
- TTL Reached, state - invalid |
- -----> Worker Stopped here
- */
-
- if workers[i].State().Value() != worker.StateWorking {
- workers[i].State().Set(worker.StateInvalid)
- _ = workers[i].Stop()
- }
- // just to double check
- workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
- continue
- }
-
- // firs we check maxWorker idle
- if sp.cfg.IdleTTL != 0 {
- // then check for the worker state
- if workers[i].State().Value() != worker.StateReady {
- continue
- }
-
- /*
- Calculate idle time
- If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64
- 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
- we are guessing that worker overlap idle time and has to be killed
- */
-
- // 1610530005534416045 lu
- // lu - now = -7811150814 - nanoseconds
- // 7.8 seconds
- // get last used unix nano
- lu := workers[i].State().LastUsed()
- // worker not used, skip
- if lu == 0 {
- continue
- }
-
- // convert last used to unixNano and sub time.now to seconds
- // negative number, because lu always in the past, except for the `back to the future` :)
- res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1
-
- // maxWorkerIdle more than diff between now and last used
- // for example:
- // After exec worker goes to the rest
- // And resting for the 5 seconds
- // IdleTTL is 1 second.
- // After the control check, res will be 5, idle is 1
- // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
- if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
- /*
- worker at this point might be in the middle of request execution:
-
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
- ^
- TTL Reached, state - invalid |
- -----> Worker Stopped here
- */
-
- if workers[i].State().Value() != worker.StateWorking {
- workers[i].State().Set(worker.StateInvalid)
- _ = workers[i].Stop()
- }
- // just to double-check
- workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
- }
- }
- }
-}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
deleted file mode 100644
index 14df513e..00000000
--- a/pkg/pool/supervisor_test.go
+++ /dev/null
@@ -1,413 +0,0 @@
-package pool
-
-import (
- "context"
- "os"
- "os/exec"
- "testing"
- "time"
-
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
-)
-
-var cfgSupervised = &Config{
- NumWorkers: uint64(1),
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- Supervisor: &SupervisorConfig{
- WatchTick: 1 * time.Second,
- TTL: 100 * time.Second,
- IdleTTL: 100 * time.Second,
- ExecTTL: 100 * time.Second,
- MaxWorkerMemory: 100,
- },
-}
-
-func TestSupervisedPool_Exec(t *testing.T) {
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
- pipe.NewPipeFactory(),
- cfgSupervised,
- )
-
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- time.Sleep(time.Second)
-
- pidBefore := p.Workers()[0].Pid()
-
- for i := 0; i < 100; i++ {
- time.Sleep(time.Millisecond * 100)
- _, err = p.Exec(&payload.Payload{
- Context: []byte(""),
- Body: []byte("foo"),
- })
- assert.NoError(t, err)
- }
-
- assert.NotEqual(t, pidBefore, p.Workers()[0].Pid())
-
- p.Destroy(context.Background())
-}
-
-// This test should finish without freezes
-func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
- var cfgSupervised = cfgSupervised
- cfgSupervised.Debug = true
-
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/supervised.php") },
- pipe.NewPipeFactory(),
- cfgSupervised,
- )
-
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- time.Sleep(time.Second)
-
- for i := 0; i < 100; i++ {
- time.Sleep(time.Millisecond * 500)
- _, err = p.Exec(&payload.Payload{
- Context: []byte(""),
- Body: []byte("foo"),
- })
- assert.NoError(t, err)
- }
-
- p.Destroy(context.Background())
-}
-
-func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
- var cfgExecTTL = &Config{
- NumWorkers: uint64(1),
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- Supervisor: &SupervisorConfig{
- WatchTick: 1 * time.Second,
- TTL: 100 * time.Second,
- IdleTTL: 100 * time.Second,
- ExecTTL: 1 * time.Second,
- MaxWorkerMemory: 100,
- },
- }
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
- pipe.NewPipeFactory(),
- cfgExecTTL,
- )
-
- assert.NoError(t, err)
- assert.NotNil(t, p)
- defer p.Destroy(context.Background())
-
- pid := p.Workers()[0].Pid()
-
- resp, err := p.Exec(&payload.Payload{
- Context: []byte(""),
- Body: []byte("foo"),
- })
-
- assert.Error(t, err)
- assert.Empty(t, resp)
-
- time.Sleep(time.Second * 1)
- // should be new worker with new pid
- assert.NotEqual(t, pid, p.Workers()[0].Pid())
-}
-
-func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
- var cfgExecTTL = &Config{
- NumWorkers: uint64(1),
- Supervisor: &SupervisorConfig{
- WatchTick: 1 * time.Second,
- TTL: 5 * time.Second,
- },
- }
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") },
- pipe.NewPipeFactory(),
- cfgExecTTL,
- )
-
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- pid := p.Workers()[0].Pid()
-
- resp, err := p.Exec(&payload.Payload{
- Context: []byte(""),
- Body: []byte("foo"),
- })
-
- assert.NoError(t, err)
- assert.Equal(t, string(resp.Body), "hello world")
- assert.Empty(t, resp.Context)
-
- time.Sleep(time.Second)
- assert.NotEqual(t, pid, p.Workers()[0].Pid())
- require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
- pid = p.Workers()[0].Pid()
-
- resp, err = p.Exec(&payload.Payload{
- Context: []byte(""),
- Body: []byte("foo"),
- })
-
- assert.NoError(t, err)
- assert.Equal(t, string(resp.Body), "hello world")
- assert.Empty(t, resp.Context)
-
- time.Sleep(time.Second)
- // should be new worker with new pid
- assert.NotEqual(t, pid, p.Workers()[0].Pid())
- require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
-
- p.Destroy(context.Background())
-}
-
-func TestSupervisedPool_Idle(t *testing.T) {
- var cfgExecTTL = &Config{
- NumWorkers: uint64(1),
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- Supervisor: &SupervisorConfig{
- WatchTick: 1 * time.Second,
- TTL: 100 * time.Second,
- IdleTTL: 1 * time.Second,
- ExecTTL: 100 * time.Second,
- MaxWorkerMemory: 100,
- },
- }
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/idle.php", "pipes") },
- pipe.NewPipeFactory(),
- cfgExecTTL,
- )
-
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- pid := p.Workers()[0].Pid()
-
- resp, err := p.Exec(&payload.Payload{
- Context: []byte(""),
- Body: []byte("foo"),
- })
-
- assert.Nil(t, err)
- assert.Empty(t, resp.Body)
- assert.Empty(t, resp.Context)
-
- time.Sleep(time.Second * 5)
-
- // worker should be marked as invalid and reallocated
- _, err = p.Exec(&payload.Payload{
- Context: []byte(""),
- Body: []byte("foo"),
- })
- assert.NoError(t, err)
- // should be new worker with new pid
- assert.NotEqual(t, pid, p.Workers()[0].Pid())
- p.Destroy(context.Background())
-}
-
-func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
- var cfgExecTTL = &Config{
- NumWorkers: uint64(1),
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- Supervisor: &SupervisorConfig{
- WatchTick: 1 * time.Second,
- TTL: 1 * time.Second,
- IdleTTL: 1 * time.Second,
- MaxWorkerMemory: 100,
- },
- }
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") },
- pipe.NewPipeFactory(),
- cfgExecTTL,
- )
-
- assert.NoError(t, err)
- assert.NotNil(t, p)
- defer p.Destroy(context.Background())
-
- pid := p.Workers()[0].Pid()
-
- time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(&payload.Payload{
- Context: []byte(""),
- Body: []byte("foo"),
- })
-
- assert.NoError(t, err)
- assert.Empty(t, resp.Body)
- assert.Empty(t, resp.Context)
-
- time.Sleep(time.Second * 2)
- // should be destroyed, state should be Ready, not Invalid
- assert.NotEqual(t, pid, p.Workers()[0].Pid())
- assert.Equal(t, int64(1), p.Workers()[0].State().Value())
-}
-
-func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
- var cfgExecTTL = &Config{
- NumWorkers: uint64(1),
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- Supervisor: &SupervisorConfig{
- WatchTick: 1 * time.Second,
- TTL: 100 * time.Second,
- IdleTTL: 100 * time.Second,
- ExecTTL: 4 * time.Second,
- MaxWorkerMemory: 100,
- },
- }
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") },
- pipe.NewPipeFactory(),
- cfgExecTTL,
- )
-
- assert.NoError(t, err)
- assert.NotNil(t, p)
- defer p.Destroy(context.Background())
-
- pid := p.Workers()[0].Pid()
-
- time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(&payload.Payload{
- Context: []byte(""),
- Body: []byte("foo"),
- })
-
- assert.NoError(t, err)
- assert.Empty(t, resp.Body)
- assert.Empty(t, resp.Context)
-
- time.Sleep(time.Second * 1)
- // should be the same pid
- assert.Equal(t, pid, p.Workers()[0].Pid())
-}
-
-func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
- var cfgExecTTL = &Config{
- NumWorkers: uint64(1),
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- Supervisor: &SupervisorConfig{
- WatchTick: 1 * time.Second,
- TTL: 100 * time.Second,
- IdleTTL: 100 * time.Second,
- ExecTTL: 4 * time.Second,
- MaxWorkerMemory: 1,
- },
- }
-
- block := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.PoolEvent); ok {
- if ev.Event == events.EventMaxMemory {
- block <- struct{}{}
- }
- }
- }
-
- // constructed
- // max memory
- // constructed
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
- pipe.NewPipeFactory(),
- cfgExecTTL,
- AddListeners(listener),
- )
-
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
- resp, err := p.Exec(&payload.Payload{
- Context: []byte(""),
- Body: []byte("foo"),
- })
-
- assert.NoError(t, err)
- assert.Empty(t, resp.Body)
- assert.Empty(t, resp.Context)
-
- <-block
- p.Destroy(context.Background())
-}
-
-func TestSupervisedPool_AllocateFailedOK(t *testing.T) {
- var cfgExecTTL = &Config{
- NumWorkers: uint64(2),
- AllocateTimeout: time.Second * 15,
- DestroyTimeout: time.Second * 5,
- Supervisor: &SupervisorConfig{
- WatchTick: 1 * time.Second,
- TTL: 5 * time.Second,
- },
- }
-
- ctx := context.Background()
- p, err := Initialize(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/allocate-failed.php") },
- pipe.NewPipeFactory(),
- cfgExecTTL,
- )
-
- assert.NoError(t, err)
- require.NotNil(t, p)
-
- time.Sleep(time.Second)
-
- // should be ok
- _, err = p.Exec(&payload.Payload{
- Context: []byte(""),
- Body: []byte("foo"),
- })
-
- require.NoError(t, err)
-
- // after creating this file, PHP will fail
- file, err := os.Create("break")
- require.NoError(t, err)
-
- time.Sleep(time.Second * 5)
- assert.NoError(t, file.Close())
- assert.NoError(t, os.Remove("break"))
-
- defer func() {
- if r := recover(); r != nil {
- assert.Fail(t, "panic should not be fired!")
- } else {
- p.Destroy(context.Background())
- }
- }()
-}
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
deleted file mode 100644
index fc043927..00000000
--- a/pkg/priority_queue/binary_heap.go
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-binary heap (min-heap) algorithm used as a core for the priority queue
-*/
-
-package priorityqueue
-
-import (
- "sync"
- "sync/atomic"
-)
-
-type BinHeap struct {
- items []Item
- // find a way to use pointer to the raw data
- len uint64
- maxLen uint64
- cond sync.Cond
-}
-
-func NewBinHeap(maxLen uint64) *BinHeap {
- return &BinHeap{
- items: make([]Item, 0, 1000),
- len: 0,
- maxLen: maxLen,
- cond: sync.Cond{L: &sync.Mutex{}},
- }
-}
-
-func (bh *BinHeap) fixUp() {
- k := bh.len - 1
- p := (k - 1) >> 1 // k-1 / 2
-
- for k > 0 {
- cur, par := (bh.items)[k], (bh.items)[p]
-
- if cur.Priority() < par.Priority() {
- bh.swap(k, p)
- k = p
- p = (k - 1) >> 1
- } else {
- return
- }
- }
-}
-
-func (bh *BinHeap) swap(i, j uint64) {
- (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i]
-}
-
-func (bh *BinHeap) fixDown(curr, end int) {
- cOneIdx := (curr << 1) + 1
- for cOneIdx <= end {
- cTwoIdx := -1
- if (curr<<1)+2 <= end {
- cTwoIdx = (curr << 1) + 2
- }
-
- idxToSwap := cOneIdx
- if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() {
- idxToSwap = cTwoIdx
- }
- if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() {
- bh.swap(uint64(curr), uint64(idxToSwap))
- curr = idxToSwap
- cOneIdx = (curr << 1) + 1
- } else {
- return
- }
- }
-}
-
-func (bh *BinHeap) Len() uint64 {
- return atomic.LoadUint64(&bh.len)
-}
-
-func (bh *BinHeap) Insert(item Item) {
- bh.cond.L.Lock()
-
- // check the binary heap len before insertion
- if bh.Len() > bh.maxLen {
- // unlock the mutex to proceed to get-max
- bh.cond.L.Unlock()
-
- // signal waiting goroutines
- for bh.Len() > 0 {
- // signal waiting goroutines
- bh.cond.Signal()
- }
- // lock mutex to proceed inserting into the empty slice
- bh.cond.L.Lock()
- }
-
- bh.items = append(bh.items, item)
-
- // add len to the slice
- atomic.AddUint64(&bh.len, 1)
-
- // fix binary heap up
- bh.fixUp()
- bh.cond.L.Unlock()
-
- // signal the goroutine on wait
- bh.cond.Signal()
-}
-
-func (bh *BinHeap) ExtractMin() Item {
- bh.cond.L.Lock()
-
- // if len == 0, wait for the signal
- for bh.Len() == 0 {
- bh.cond.Wait()
- }
-
- bh.swap(0, bh.len-1)
-
- item := (bh.items)[int(bh.len)-1]
- bh.items = (bh).items[0 : int(bh.len)-1]
- bh.fixDown(0, int(bh.len-2))
-
- // reduce len
- atomic.AddUint64(&bh.len, ^uint64(0))
-
- bh.cond.L.Unlock()
- return item
-}
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
deleted file mode 100644
index ab0f9266..00000000
--- a/pkg/priority_queue/binary_heap_test.go
+++ /dev/null
@@ -1,154 +0,0 @@
-package priorityqueue
-
-import (
- "fmt"
- "math/rand"
- "sync/atomic"
- "testing"
- "time"
-
- "github.com/stretchr/testify/require"
-)
-
-type Test int
-
-func (t Test) Ack() error {
- return nil
-}
-
-func (t Test) Nack() error {
- return nil
-}
-
-func (t Test) Requeue(_ map[string][]string, _ int64) error {
- return nil
-}
-
-func (t Test) Body() []byte {
- return nil
-}
-
-func (t Test) Context() ([]byte, error) {
- return nil, nil
-}
-
-func (t Test) ID() string {
- return "none"
-}
-
-func (t Test) Priority() int64 {
- return int64(t)
-}
-
-func TestBinHeap_Init(t *testing.T) {
- a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
-
- bh := NewBinHeap(12)
-
- for i := 0; i < len(a); i++ {
- bh.Insert(a[i])
- }
-
- expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
-
- res := make([]Item, 0, 12)
-
- for i := 0; i < 11; i++ {
- item := bh.ExtractMin()
- res = append(res, item)
- }
-
- require.Equal(t, expected, res)
-}
-
-func TestBinHeap_MaxLen(t *testing.T) {
- a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
-
- bh := NewBinHeap(1)
-
- go func() {
- expected := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
-
- res := make([]Item, 0, 12)
-
- for i := 0; i < 11; i++ {
- item := bh.ExtractMin()
- res = append(res, item)
- }
- require.Equal(t, expected, res)
- return
- }()
-
- time.Sleep(time.Second)
- for i := 0; i < len(a); i++ {
- bh.Insert(a[i])
- }
-
- time.Sleep(time.Second)
-}
-
-func TestNewPriorityQueue(t *testing.T) {
- insertsPerSec := uint64(0)
- getPerSec := uint64(0)
- stopCh := make(chan struct{}, 1)
- pq := NewBinHeap(1000)
-
- go func() {
- tt3 := time.NewTicker(time.Millisecond * 10)
- for {
- select {
- case <-tt3.C:
- require.Less(t, pq.Len(), uint64(1002))
- case <-stopCh:
- return
- }
- }
- }()
-
- go func() {
- tt := time.NewTicker(time.Second)
-
- for {
- select {
- case <-tt.C:
- fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec)))
- atomic.StoreUint64(&insertsPerSec, 0)
- fmt.Println(fmt.Sprintf("ExtractMin per second: %d", atomic.LoadUint64(&getPerSec)))
- atomic.StoreUint64(&getPerSec, 0)
- case <-stopCh:
- tt.Stop()
- return
- }
- }
- }()
-
- go func() {
- for {
- select {
- case <-stopCh:
- return
- default:
- pq.ExtractMin()
- atomic.AddUint64(&getPerSec, 1)
- }
- }
- }()
-
- go func() {
- for {
- select {
- case <-stopCh:
- return
- default:
- pq.Insert(Test(rand.Int())) //nolint:gosec
- atomic.AddUint64(&insertsPerSec, 1)
- }
- }
- }()
-
- time.Sleep(time.Second * 5)
- stopCh <- struct{}{}
- stopCh <- struct{}{}
- stopCh <- struct{}{}
- stopCh <- struct{}{}
-}
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
deleted file mode 100644
index 9efa4652..00000000
--- a/pkg/priority_queue/interface.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package priorityqueue
-
-type Queue interface {
- Insert(item Item)
- ExtractMin() Item
- Len() uint64
-}
-
-// Item represents binary heap item
-type Item interface {
- // ID is a unique item identifier
- ID() string
-
- // Priority returns the Item's priority to sort
- Priority() int64
-
- // Body is the Item payload
- Body() []byte
-
- // Context is the Item meta information
- Context() ([]byte, error)
-
- // Ack - acknowledge the Item after processing
- Ack() error
-
- // Nack - discard the Item
- Nack() error
-
- // Requeue - put the message back to the queue with the optional delay
- Requeue(headers map[string][]string, delay int64) error
-}
diff --git a/pkg/state/job/state.go b/pkg/state/job/state.go
deleted file mode 100644
index 56050084..00000000
--- a/pkg/state/job/state.go
+++ /dev/null
@@ -1,19 +0,0 @@
-package job
-
-// State represents job's state
-type State struct {
- // Pipeline name
- Pipeline string
- // Driver name
- Driver string
- // Queue name (tube for the beanstalk)
- Queue string
- // Active jobs which are consumed from the driver but not handled by the PHP worker yet
- Active int64
- // Delayed jobs
- Delayed int64
- // Reserved jobs which are in the driver but not consumed yet
- Reserved int64
- // Status - 1 Ready, 0 - Paused
- Ready bool
-}
diff --git a/pkg/state/process/state.go b/pkg/state/process/state.go
deleted file mode 100644
index bfc3a287..00000000
--- a/pkg/state/process/state.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package process
-
-import (
- "github.com/shirou/gopsutil/process"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-// State provides information about specific worker.
-type State struct {
- // Pid contains process id.
- Pid int `json:"pid"`
-
- // Status of the worker.
- Status string `json:"status"`
-
- // Number of worker executions.
- NumJobs uint64 `json:"numExecs"`
-
- // Created is unix nano timestamp of worker creation time.
- Created int64 `json:"created"`
-
- // MemoryUsage holds the information about worker memory usage in bytes.
- // Values might vary for different operating systems and based on RSS.
- MemoryUsage uint64 `json:"memoryUsage"`
-
- // CPU_Percent returns how many percent of the CPU time this process uses
- CPUPercent float64
-
- // Command used in the service plugin and shows a command for the particular service
- Command string
-}
-
-// WorkerProcessState creates new worker state definition.
-func WorkerProcessState(w worker.BaseProcess) (*State, error) {
- const op = errors.Op("worker_process_state")
- p, _ := process.NewProcess(int32(w.Pid()))
- i, err := p.MemoryInfo()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- percent, err := p.CPUPercent()
- if err != nil {
- return nil, err
- }
-
- return &State{
- CPUPercent: percent,
- Pid: int(w.Pid()),
- Status: w.State().String(),
- NumJobs: w.State().NumExecs(),
- Created: w.Created().UnixNano(),
- MemoryUsage: i.RSS,
- }, nil
-}
-
-func GeneralProcessState(pid int, command string) (State, error) {
- const op = errors.Op("process_state")
- p, _ := process.NewProcess(int32(pid))
- i, err := p.MemoryInfo()
- if err != nil {
- return State{}, errors.E(op, err)
- }
- percent, err := p.CPUPercent()
- if err != nil {
- return State{}, err
- }
-
- return State{
- CPUPercent: percent,
- Pid: pid,
- MemoryUsage: i.RSS,
- Command: command,
- }, nil
-}
diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go
deleted file mode 100644
index 1b072378..00000000
--- a/pkg/transport/interface.go
+++ /dev/null
@@ -1,21 +0,0 @@
-package transport
-
-import (
- "context"
- "os/exec"
-
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-// Factory is responsible for wrapping given command into tasks WorkerProcess.
-type Factory interface {
- // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context.
- // Process must not be started.
- SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error)
- // SpawnWorker creates new WorkerProcess process based on given command.
- // Process must not be started.
- SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error)
- // Close the factory and underlying connections.
- Close() error
-}
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go
deleted file mode 100755
index 9433a510..00000000
--- a/pkg/transport/pipe/pipe_factory.go
+++ /dev/null
@@ -1,197 +0,0 @@
-package pipe
-
-import (
- "context"
- "os/exec"
-
- "github.com/spiral/errors"
- "github.com/spiral/goridge/v3/pkg/pipe"
- "github.com/spiral/roadrunner/v2/internal"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "go.uber.org/multierr"
-)
-
-// Factory connects to stack using standard
-// streams (STDIN, STDOUT pipes).
-type Factory struct{}
-
-// NewPipeFactory returns new factory instance and starts
-// listening
-func NewPipeFactory() *Factory {
- return &Factory{}
-}
-
-type sr struct {
- w *worker.Process
- err error
-}
-
-// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay,
-// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit
- spCh := make(chan sr)
- const op = errors.Op("factory_spawn_worker_with_timeout")
- go func() {
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
- if err != nil {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- default:
- return
- }
- }
-
- in, err := cmd.StdoutPipe()
- if err != nil {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- default:
- return
- }
- }
-
- out, err := cmd.StdinPipe()
- if err != nil {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- default:
- return
- }
- }
-
- // Init new PIPE relay
- relay := pipe.NewPipeRelay(in, out)
- w.AttachRelay(relay)
-
- // Start the worker
- err = w.Start()
- if err != nil {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- default:
- return
- }
- }
-
- pid, err := internal.FetchPID(relay)
- if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- default:
- _ = w.Kill()
- return
- }
- }
-
- if pid != w.Pid() {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())),
- }:
- return
- default:
- _ = w.Kill()
- return
- }
- }
-
- select {
- case
- // return worker
- spCh <- sr{
- w: w,
- err: nil,
- }:
- // everything ok, set ready state
- w.State().Set(worker.StateReady)
- return
- default:
- _ = w.Kill()
- return
- }
- }()
-
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case res := <-spCh:
- if res.err != nil {
- return nil, res.err
- }
- return res.w, nil
- }
-}
-
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- const op = errors.Op("factory_spawn_worker")
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- in, err := cmd.StdoutPipe()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- out, err := cmd.StdinPipe()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // Init new PIPE relay
- relay := pipe.NewPipeRelay(in, out)
- w.AttachRelay(relay)
-
- // Start the worker
- err = w.Start()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // errors bundle
- if pid, err := internal.FetchPID(relay); pid != w.Pid() {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
- return nil, errors.E(op, err)
- }
-
- // everything ok, set ready state
- w.State().Set(worker.StateReady)
- return w, nil
-}
-
-// Close the factory.
-func (f *Factory) Close() error {
- return nil
-}
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
deleted file mode 100644
index f5e9669b..00000000
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ /dev/null
@@ -1,461 +0,0 @@
-package pipe
-
-import (
- "os/exec"
- "strings"
- "sync"
- "testing"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/stretchr/testify/assert"
-)
-
-func Test_GetState2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, err := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- assert.Equal(t, worker.StateStopped, w.State().Value())
- }()
-
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- assert.Equal(t, worker.StateReady, w.State().Value())
- assert.NoError(t, w.Stop())
-}
-
-func Test_Kill2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, err := NewPipeFactory().SpawnWorker(cmd)
- wg := &sync.WaitGroup{}
- wg.Add(1)
- go func() {
- defer wg.Done()
- assert.Error(t, w.Wait())
- assert.Equal(t, worker.StateErrored, w.State().Value())
- }()
-
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- assert.Equal(t, worker.StateReady, w.State().Value())
- err = w.Kill()
- if err != nil {
- t.Errorf("error killing the Process: error %v", err)
- }
- wg.Wait()
-}
-
-func Test_Pipe_Start2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, err := NewPipeFactory().SpawnWorker(cmd)
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- go func() {
- assert.NoError(t, w.Wait())
- }()
-
- assert.NoError(t, w.Stop())
-}
-
-func Test_Pipe_StartError2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- err := cmd.Start()
- if err != nil {
- t.Errorf("error running the command: error %v", err)
- }
-
- w, err := NewPipeFactory().SpawnWorker(cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Pipe_PipeError3(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- _, err := cmd.StdinPipe()
- if err != nil {
- t.Errorf("error creating the STDIN pipe: error %v", err)
- }
-
- w, err := NewPipeFactory().SpawnWorker(cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Pipe_PipeError4(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- _, err := cmd.StdinPipe()
- if err != nil {
- t.Errorf("error creating the STDIN pipe: error %v", err)
- }
-
- w, err := NewPipeFactory().SpawnWorker(cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Pipe_Failboot2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
- w, err := NewPipeFactory().SpawnWorker(cmd, listener)
-
- assert.Nil(t, w)
- assert.Error(t, err)
- <-finish
-}
-
-func Test_Pipe_Invalid2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/invalid.php")
- w, err := NewPipeFactory().SpawnWorker(cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Pipe_Echo2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_Pipe_Broken2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop()
- assert.Error(t, err)
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.Error(t, err)
- assert.Nil(t, res)
-}
-
-func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
- f := NewPipeFactory()
- for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- w, _ := f.SpawnWorker(cmd)
- go func() {
- if w.Wait() != nil {
- b.Fail()
- }
- }()
-
- err := w.Stop()
- if err != nil {
- b.Errorf("error stopping the worker: error %v", err)
- }
- }
-}
-
-func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- sw := worker.From(w)
-
- b.ReportAllocs()
- b.ResetTimer()
- go func() {
- err := w.Wait()
- if err != nil {
- b.Errorf("error waiting the worker: error %v", err)
- }
- }()
- defer func() {
- err := w.Stop()
- if err != nil {
- b.Errorf("error stopping the worker: error %v", err)
- }
- }()
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
-
-func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
- if err != nil {
- b.Fatal(err)
- }
-
- defer func() {
- err = w.Stop()
- if err != nil {
- b.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
-
-func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
- if err != nil {
- b.Fatal(err)
- }
-
- defer func() {
- err = w.Stop()
- if err != nil {
- b.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
-
-func Test_Echo2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, err := NewPipeFactory().SpawnWorker(cmd)
- if err != nil {
- t.Fatal(err)
- }
-
- sw := worker.From(w)
-
- go func() {
- assert.NoError(t, sw.Wait())
- }()
- defer func() {
- err = sw.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.Nil(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_BadPayload2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
-
- sw := worker.From(w)
-
- go func() {
- assert.NoError(t, sw.Wait())
- }()
- defer func() {
- err := sw.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- res, err := sw.Exec(&payload.Payload{})
-
- assert.Error(t, err)
- assert.Nil(t, res)
-
- assert.Contains(t, err.Error(), "payload can not be empty")
-}
-
-func Test_String2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes")
- assert.Contains(t, w.String(), "ready")
- assert.Contains(t, w.String(), "numExecs: 0")
-}
-
-func Test_Echo_Slow2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.Nil(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_Broken2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
- data := ""
- mu := &sync.Mutex{}
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- mu.Lock()
- data = string(wev.Payload.([]byte))
- mu.Unlock()
- }
- }
-
- w, err := NewPipeFactory().SpawnWorker(cmd, listener)
- if err != nil {
- t.Fatal(err)
- }
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
- assert.NotNil(t, err)
- assert.Nil(t, res)
-
- time.Sleep(time.Second * 3)
- mu.Lock()
- if strings.ContainsAny(data, "undefined_function()") == false {
- t.Fail()
- }
- mu.Unlock()
- assert.Error(t, w.Stop())
-}
-
-func Test_Error2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
-
- defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
- assert.NotNil(t, err)
- assert.Nil(t, res)
-
- if errors.Is(errors.SoftJob, err) == false {
- t.Fatal("error should be of type errors.ErrSoftJob")
- }
- assert.Contains(t, err.Error(), "hello")
-}
-
-func Test_NumExecs2(t *testing.T) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- _, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
- if err != nil {
- t.Errorf("fail to execute payload: error %v", err)
- }
- assert.Equal(t, uint64(1), w.State().NumExecs())
-
- _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
- if err != nil {
- t.Errorf("fail to execute payload: error %v", err)
- }
- assert.Equal(t, uint64(2), w.State().NumExecs())
-
- _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
- if err != nil {
- t.Errorf("fail to execute payload: error %v", err)
- }
- assert.Equal(t, uint64(3), w.State().NumExecs())
-}
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
deleted file mode 100755
index e396fe57..00000000
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ /dev/null
@@ -1,503 +0,0 @@
-package pipe
-
-import (
- "context"
- "os/exec"
- "strings"
- "sync"
- "testing"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/stretchr/testify/assert"
-)
-
-func Test_GetState(t *testing.T) {
- t.Parallel()
- ctx := context.Background()
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- go func() {
- assert.NoError(t, w.Wait())
- assert.Equal(t, worker.StateStopped, w.State().Value())
- }()
-
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- assert.Equal(t, worker.StateReady, w.State().Value())
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
-}
-
-func Test_Kill(t *testing.T) {
- t.Parallel()
- ctx := context.Background()
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- wg := &sync.WaitGroup{}
- wg.Add(1)
- go func() {
- defer wg.Done()
- assert.Error(t, w.Wait())
- assert.Equal(t, worker.StateErrored, w.State().Value())
- }()
-
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- assert.Equal(t, worker.StateReady, w.State().Value())
- err = w.Kill()
- if err != nil {
- t.Errorf("error killing the Process: error %v", err)
- }
- wg.Wait()
-}
-
-func Test_Pipe_Start(t *testing.T) {
- t.Parallel()
- ctx := context.Background()
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- go func() {
- assert.NoError(t, w.Wait())
- }()
-
- assert.NoError(t, w.Stop())
-}
-
-func Test_Pipe_StartError(t *testing.T) {
- t.Parallel()
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- err := cmd.Start()
- if err != nil {
- t.Errorf("error running the command: error %v", err)
- }
-
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Pipe_PipeError(t *testing.T) {
- t.Parallel()
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- _, err := cmd.StdinPipe()
- if err != nil {
- t.Errorf("error creating the STDIN pipe: error %v", err)
- }
-
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Pipe_PipeError2(t *testing.T) {
- t.Parallel()
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- // error cause
- _, err := cmd.StdinPipe()
- if err != nil {
- t.Errorf("error creating the STDIN pipe: error %v", err)
- }
-
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Pipe_Failboot(t *testing.T) {
- t.Parallel()
- cmd := exec.Command("php", "../../../tests/failboot.php")
- ctx := context.Background()
-
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
-
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
-
- assert.Nil(t, w)
- assert.Error(t, err)
- <-finish
-}
-
-func Test_Pipe_Invalid(t *testing.T) {
- t.Parallel()
- cmd := exec.Command("php", "../../../tests/invalid.php")
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Pipe_Echo(t *testing.T) {
- t.Parallel()
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_Pipe_Broken(t *testing.T) {
- t.Parallel()
- cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop()
- assert.Error(t, err)
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.Error(t, err)
- assert.Nil(t, res)
-}
-
-func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
- f := NewPipeFactory()
- for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd)
- go func() {
- if w.Wait() != nil {
- b.Fail()
- }
- }()
-
- err := w.Stop()
- if err != nil {
- b.Errorf("error stopping the worker: error %v", err)
- }
- }
-}
-
-func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd)
- sw := worker.From(w)
-
- b.ReportAllocs()
- b.ResetTimer()
- go func() {
- err := w.Wait()
- if err != nil {
- b.Errorf("error waiting the worker: error %v", err)
- }
- }()
- defer func() {
- err := w.Stop()
- if err != nil {
- b.Errorf("error stopping the worker: error %v", err)
- }
- }()
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
-
-func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- b.Fatal(err)
- }
-
- defer func() {
- err = w.Stop()
- if err != nil {
- b.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
-
-func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- b.Fatal(err)
- }
-
- defer func() {
- err = w.Stop()
- if err != nil {
- b.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
-
-func Test_Echo(t *testing.T) {
- t.Parallel()
- ctx := context.Background()
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- t.Fatal(err)
- }
-
- sw := worker.From(w)
- go func() {
- assert.NoError(t, sw.Wait())
- }()
- defer func() {
- err = sw.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.Nil(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_BadPayload(t *testing.T) {
- t.Parallel()
- ctx := context.Background()
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
-
- sw := worker.From(w)
-
- go func() {
- assert.NoError(t, sw.Wait())
- }()
- defer func() {
- err := sw.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- res, err := sw.Exec(&payload.Payload{})
-
- assert.Error(t, err)
- assert.Nil(t, res)
-
- assert.Contains(t, err.Error(), "payload can not be empty")
-}
-
-func Test_String(t *testing.T) {
- t.Parallel()
- ctx := context.Background()
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes")
- assert.Contains(t, w.String(), "ready")
- assert.Contains(t, w.String(), "numExecs: 0")
-}
-
-func Test_Echo_Slow(t *testing.T) {
- t.Parallel()
- ctx := context.Background()
- cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10")
-
- w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.Nil(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_Broken(t *testing.T) {
- t.Parallel()
- ctx := context.Background()
- cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
- data := ""
- mu := &sync.Mutex{}
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- mu.Lock()
- data = string(wev.Payload.([]byte))
- mu.Unlock()
- }
- }
-
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
- if err != nil {
- t.Fatal(err)
- }
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
- assert.NotNil(t, err)
- assert.Nil(t, res)
-
- time.Sleep(time.Second * 3)
- mu.Lock()
- if strings.ContainsAny(data, "undefined_function()") == false {
- t.Fail()
- }
- mu.Unlock()
- assert.Error(t, w.Stop())
-}
-
-func Test_Error(t *testing.T) {
- t.Parallel()
- ctx := context.Background()
- cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
-
- defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
- assert.NotNil(t, err)
- assert.Nil(t, res)
-
- if errors.Is(errors.SoftJob, err) == false {
- t.Fatal("error should be of type errors.ErrSoftJob")
- }
- assert.Contains(t, err.Error(), "hello")
-}
-
-func Test_NumExecs(t *testing.T) {
- t.Parallel()
- ctx := context.Background()
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- _, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
- if err != nil {
- t.Errorf("fail to execute payload: error %v", err)
- }
- assert.Equal(t, uint64(1), w.State().NumExecs())
-
- _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
- if err != nil {
- t.Errorf("fail to execute payload: error %v", err)
- }
- assert.Equal(t, uint64(2), w.State().NumExecs())
-
- _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
- if err != nil {
- t.Errorf("fail to execute payload: error %v", err)
- }
- assert.Equal(t, uint64(3), w.State().NumExecs())
-}
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
deleted file mode 100755
index dc2b75cf..00000000
--- a/pkg/transport/socket/socket_factory.go
+++ /dev/null
@@ -1,255 +0,0 @@
-package socket
-
-import (
- "context"
- "fmt"
- "net"
- "os/exec"
- "sync"
- "time"
-
- "github.com/shirou/gopsutil/process"
- "github.com/spiral/errors"
- "github.com/spiral/goridge/v3/pkg/relay"
- "github.com/spiral/goridge/v3/pkg/socket"
- "github.com/spiral/roadrunner/v2/internal"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-
- "go.uber.org/multierr"
- "golang.org/x/sync/errgroup"
-)
-
-// Factory connects to external stack using socket server.
-type Factory struct {
- // listens for incoming connections from underlying processes
- ls net.Listener
-
- // relay connection timeout
- tout time.Duration
-
- // sockets which are waiting for process association
- relays sync.Map
-}
-
-// NewSocketServer returns Factory attached to a given socket listener.
-// tout specifies for how long factory should serve for incoming relay connection
-func NewSocketServer(ls net.Listener, tout time.Duration) *Factory {
- f := &Factory{
- ls: ls,
- tout: tout,
- relays: sync.Map{},
- }
-
- // Be careful
- // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go
- // https://github.com/golang/go/issues/5045
- go func() {
- err := f.listen()
- // there is no logger here, use fmt
- if err != nil {
- fmt.Printf("[WARN]: socket server listen, error: %v\n", err)
- }
- }()
-
- return f
-}
-
-// blocking operation, returns an error
-func (f *Factory) listen() error {
- errGr := &errgroup.Group{}
- errGr.Go(func() error {
- for {
- conn, err := f.ls.Accept()
- if err != nil {
- return err
- }
-
- rl := socket.NewSocketRelay(conn)
- pid, err := internal.FetchPID(rl)
- if err != nil {
- return err
- }
-
- f.attachRelayToPid(pid, rl)
- }
- })
-
- return errGr.Wait()
-}
-
-type socketSpawn struct {
- w *worker.Process
- err error
-}
-
-// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- const op = errors.Op("factory_spawn_worker_with_timeout")
- c := make(chan socketSpawn)
- go func() {
- ctxT, cancel := context.WithTimeout(ctx, f.tout)
- defer cancel()
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
- if err != nil {
- select {
- case c <- socketSpawn{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- default:
- return
- }
- }
-
- err = w.Start()
- if err != nil {
- select {
- case c <- socketSpawn{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- default:
- return
- }
- }
-
- rl, err := f.findRelayWithContext(ctxT, w)
- if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
-
- select {
- // try to write result
- case c <- socketSpawn{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- // if no receivers - return
- default:
- return
- }
- }
-
- w.AttachRelay(rl)
- w.State().Set(worker.StateReady)
-
- select {
- case c <- socketSpawn{
- w: w,
- err: nil,
- }:
- return
- default:
- _ = w.Kill()
- return
- }
- }()
-
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case res := <-c:
- if res.err != nil {
- return nil, res.err
- }
-
- return res.w, nil
- }
-}
-
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- const op = errors.Op("factory_spawn_worker")
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
- if err != nil {
- return nil, err
- }
-
- err = w.Start()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- rl, err := f.findRelay(w)
- if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
- return nil, err
- }
-
- w.AttachRelay(rl)
-
- // errors bundle
- if pid, err := internal.FetchPID(rl); pid != w.Pid() {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
- return nil, errors.E(op, err)
- }
-
- w.State().Set(worker.StateReady)
-
- return w, nil
-}
-
-// Close socket factory and underlying socket connection.
-func (f *Factory) Close() error {
- return f.ls.Close()
-}
-
-// waits for Process to connect over socket and returns associated relay of timeout
-func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) {
- ticker := time.NewTicker(time.Millisecond * 10)
- for {
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case <-ticker.C:
- // check for the process exists
- _, err := process.NewProcess(int32(w.Pid()))
- if err != nil {
- return nil, err
- }
- default:
- tmp, ok := f.relays.LoadAndDelete(w.Pid())
- if !ok {
- continue
- }
- return tmp.(*socket.Relay), nil
- }
- }
-}
-
-func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) {
- const op = errors.Op("factory_find_relay")
- // poll every 1ms for the relay
- pollDone := time.NewTimer(f.tout)
- for {
- select {
- case <-pollDone.C:
- return nil, errors.E(op, errors.Str("relay timeout"))
- default:
- tmp, ok := f.relays.Load(w.Pid())
- if !ok {
- continue
- }
- return tmp.(*socket.Relay), nil
- }
- }
-}
-
-// chan to store relay associated with specific pid
-func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) {
- f.relays.Store(pid, relay)
-}
diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
deleted file mode 100644
index 905a3b6b..00000000
--- a/pkg/transport/socket/socket_factory_spawn_test.go
+++ /dev/null
@@ -1,533 +0,0 @@
-package socket
-
-import (
- "net"
- "os/exec"
- "strings"
- "sync"
- "syscall"
- "testing"
- "time"
-
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/stretchr/testify/assert"
-)
-
-func Test_Tcp_Start2(t *testing.T) {
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- errC := ls.Close()
- if errC != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- go func() {
- assert.NoError(t, w.Wait())
- }()
-
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
-}
-
-func Test_Tcp_StartCloseFactory2(t *testing.T) {
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
-
- f := NewSocketServer(ls, time.Minute)
- defer func() {
- err = ls.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
-
- w, err := f.SpawnWorker(cmd)
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
-}
-
-func Test_Tcp_StartError2(t *testing.T) {
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- errC := ls.Close()
- if errC != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- err = cmd.Start()
- if err != nil {
- t.Errorf("error executing the command: error %v", err)
- }
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Tcp_Failboot2(t *testing.T) {
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- err3 := ls.Close()
- if err3 != nil {
- t.Errorf("error closing the listener: error %v", err3)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/failboot.php")
-
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
-
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
- assert.Nil(t, w)
- assert.Error(t, err2)
- <-finish
-}
-
-func Test_Tcp_Invalid2(t *testing.T) {
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- errC := ls.Close()
- if errC != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/invalid.php")
-
- w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Tcp_Broken2(t *testing.T) {
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- errC := ls.Close()
- if errC != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
-
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
- finish <- struct{}{}
- }
- }
- }
- }
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
- if err != nil {
- t.Fatal(err)
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- defer wg.Done()
- errW := w.Wait()
- assert.Error(t, errW)
- }()
-
- defer func() {
- time.Sleep(time.Second)
- err2 := w.Stop()
- // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
- assert.Error(t, err2)
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
- assert.Error(t, err)
- assert.Nil(t, res)
- wg.Wait()
- <-finish
-}
-
-func Test_Tcp_Echo2(t *testing.T) {
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- errC := ls.Close()
- if errC != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
-
- w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_Unix_Start2(t *testing.T) {
- ls, err := net.Listen("unix", "sock.unix")
- assert.NoError(t, err)
- defer func() {
- err = ls.Close()
- assert.NoError(t, err)
- }()
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- go func() {
- assert.NoError(t, w.Wait())
- }()
-
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
-}
-
-func Test_Unix_Failboot2(t *testing.T) {
- ls, err := net.Listen("unix", "sock.unix")
- assert.NoError(t, err)
- defer func() {
- err = ls.Close()
- assert.NoError(t, err)
- }()
-
- cmd := exec.Command("php", "../../../tests/failboot.php")
-
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
-
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
- assert.Nil(t, w)
- assert.Error(t, err)
- <-finish
-}
-
-func Test_Unix_Timeout2(t *testing.T) {
- ls, err := net.Listen("unix", "sock.unix")
- assert.NoError(t, err)
- defer func() {
- err = ls.Close()
- assert.NoError(t, err)
- }()
-
- cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0")
-
- w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd)
- assert.Nil(t, w)
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "relay timeout")
-}
-
-func Test_Unix_Invalid2(t *testing.T) {
- ls, err := net.Listen("unix", "sock.unix")
- assert.NoError(t, err)
- defer func() {
- err = ls.Close()
- assert.NoError(t, err)
- }()
-
- cmd := exec.Command("php", "../../../tests/invalid.php")
-
- w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Unix_Broken2(t *testing.T) {
- ls, err := net.Listen("unix", "sock.unix")
- assert.NoError(t, err)
- defer func() {
- errC := ls.Close()
- assert.NoError(t, errC)
- }()
-
- cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
-
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
- finish <- struct{}{}
- }
- }
- }
- }
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
- if err != nil {
- t.Fatal(err)
- }
- wg := &sync.WaitGroup{}
- wg.Add(1)
- go func() {
- defer wg.Done()
- errW := w.Wait()
- assert.Error(t, errW)
- }()
-
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop()
- assert.Error(t, err)
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.Error(t, err)
- assert.Nil(t, res)
- wg.Wait()
- <-finish
-}
-
-func Test_Unix_Echo2(t *testing.T) {
- ls, err := net.Listen("unix", "sock.unix")
- assert.NoError(t, err)
- defer func() {
- err = ls.Close()
- assert.NoError(t, err)
- }()
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
- if err != nil {
- t.Fatal(err)
- }
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) {
- ls, err := net.Listen("unix", "sock.unix")
- assert.NoError(b, err)
- defer func() {
- err = ls.Close()
- assert.NoError(b, err)
- }()
-
- f := NewSocketServer(ls, time.Minute)
- for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
-
- w, err := f.SpawnWorker(cmd)
- if err != nil {
- b.Fatal(err)
- }
- go func() {
- assert.NoError(b, w.Wait())
- }()
-
- err = w.Stop()
- if err != nil {
- b.Errorf("error stopping the Process: error %v", err)
- }
- }
-}
-
-func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
- ls, err := net.Listen("unix", "sock.unix")
- assert.NoError(b, err)
- defer func() {
- err = ls.Close()
- assert.NoError(b, err)
- }()
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
- if err != nil {
- b.Fatal(err)
- }
- defer func() {
- err = w.Stop()
- if err != nil {
- b.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
-
-func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) {
- defer func() {
- _ = syscall.Unlink("sock.unix")
- }()
- ls, err := net.Listen("unix", "sock.unix")
- if err == nil {
- defer func() {
- errC := ls.Close()
- if errC != nil {
- b.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- b.Skip("socket is busy")
- }
-
- f := NewSocketServer(ls, time.Minute)
- for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
-
- w, err := f.SpawnWorker(cmd)
- if err != nil {
- b.Fatal(err)
- }
- err = w.Stop()
- if err != nil {
- b.Errorf("error stopping the Process: error %v", err)
- }
- }
-}
-
-func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
- defer func() {
- _ = syscall.Unlink("sock.unix")
- }()
- ls, err := net.Listen("unix", "sock.unix")
- if err == nil {
- defer func() {
- errC := ls.Close()
- if errC != nil {
- b.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- b.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
- if err != nil {
- b.Fatal(err)
- }
- defer func() {
- err = w.Stop()
- if err != nil {
- b.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
deleted file mode 100755
index 17437e2f..00000000
--- a/pkg/transport/socket/socket_factory_test.go
+++ /dev/null
@@ -1,622 +0,0 @@
-package socket
-
-import (
- "context"
- "net"
- "os/exec"
- "strings"
- "sync"
- "testing"
- "time"
-
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/stretchr/testify/assert"
-)
-
-func Test_Tcp_Start(t *testing.T) {
- ctx := context.Background()
- time.Sleep(time.Millisecond * 10) // to ensure free socket
-
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- err = ls.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- go func() {
- assert.NoError(t, w.Wait())
- }()
-
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
-}
-
-func Test_Tcp_StartCloseFactory(t *testing.T) {
- time.Sleep(time.Millisecond * 10) // to ensure free socket
- ctx := context.Background()
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
-
- f := NewSocketServer(ls, time.Minute)
- defer func() {
- err = ls.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
-
- w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
-}
-
-func Test_Tcp_StartError(t *testing.T) {
- time.Sleep(time.Millisecond * 10) // to ensure free socket
- ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
- defer cancel()
-
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- err = ls.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
- err = cmd.Start()
- if err != nil {
- t.Errorf("error executing the command: error %v", err)
- }
-
- serv := NewSocketServer(ls, time.Minute)
- time.Sleep(time.Second * 2)
- w, err := serv.SpawnWorkerWithTimeout(ctx, cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Tcp_Failboot(t *testing.T) {
- time.Sleep(time.Millisecond * 10) // to ensure free socket
- ctx := context.Background()
-
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- err3 := ls.Close()
- if err3 != nil {
- t.Errorf("error closing the listener: error %v", err3)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/failboot.php")
-
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
-
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
- assert.Nil(t, w)
- assert.Error(t, err2)
- <-finish
-}
-
-func Test_Tcp_Timeout(t *testing.T) {
- time.Sleep(time.Millisecond * 10) // to ensure free socket
- ctx := context.Background()
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- err = ls.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "tcp", "200", "0")
-
- w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd)
- assert.Nil(t, w)
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "context deadline exceeded")
-}
-
-func Test_Tcp_Invalid(t *testing.T) {
- time.Sleep(time.Millisecond * 10) // to ensure free socket
- ctx := context.Background()
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- err = ls.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/invalid.php")
-
- w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Tcp_Broken(t *testing.T) {
- time.Sleep(time.Millisecond * 10) // to ensure free socket
- ctx := context.Background()
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- errC := ls.Close()
- if errC != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
-
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
- finish <- struct{}{}
- }
- }
- }
- }
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
- if err != nil {
- t.Fatal(err)
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- go func() {
- defer wg.Done()
- errW := w.Wait()
- assert.Error(t, errW)
- }()
-
- defer func() {
- time.Sleep(time.Second)
- err2 := w.Stop()
- // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
- assert.Error(t, err2)
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
- assert.Error(t, err)
- assert.Nil(t, res)
- wg.Wait()
- <-finish
-}
-
-func Test_Tcp_Echo(t *testing.T) {
- time.Sleep(time.Millisecond * 10) // to ensure free socket
- ctx := context.Background()
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if assert.NoError(t, err) {
- defer func() {
- err = ls.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
-
- w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_Unix_Start(t *testing.T) {
- ctx := context.Background()
- ls, err := net.Listen("unix", "sock.unix")
- if err == nil {
- defer func() {
- err = ls.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
- assert.NoError(t, err)
- assert.NotNil(t, w)
-
- go func() {
- assert.NoError(t, w.Wait())
- }()
-
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
-}
-
-func Test_Unix_Failboot(t *testing.T) {
- ls, err := net.Listen("unix", "sock.unix")
- ctx := context.Background()
- if err == nil {
- defer func() {
- err = ls.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/failboot.php")
-
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
-
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
- assert.Nil(t, w)
- assert.Error(t, err)
- <-finish
-}
-
-func Test_Unix_Timeout(t *testing.T) {
- ls, err := net.Listen("unix", "sock.unix")
- ctx := context.Background()
- if err == nil {
- defer func() {
- err = ls.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0")
-
- w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd)
- assert.Nil(t, w)
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "context deadline exceeded")
-}
-
-func Test_Unix_Invalid(t *testing.T) {
- ctx := context.Background()
- ls, err := net.Listen("unix", "sock.unix")
- if err == nil {
- defer func() {
- err = ls.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/invalid.php")
-
- w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
- assert.Error(t, err)
- assert.Nil(t, w)
-}
-
-func Test_Unix_Broken(t *testing.T) {
- ctx := context.Background()
- ls, err := net.Listen("unix", "sock.unix")
- if err == nil {
- defer func() {
- errC := ls.Close()
- if errC != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
-
- block := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- if wev.Event == events.EventWorkerStderr {
- e := string(wev.Payload.([]byte))
- if strings.ContainsAny(e, "undefined_function()") {
- block <- struct{}{}
- return
- }
- }
- }
- }
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
- if err != nil {
- t.Fatal(err)
- }
- wg := &sync.WaitGroup{}
- wg.Add(1)
- go func() {
- defer wg.Done()
- errW := w.Wait()
- assert.Error(t, errW)
- }()
-
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop()
- assert.Error(t, err)
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.Error(t, err)
- assert.Nil(t, res)
- <-block
- wg.Wait()
-}
-
-func Test_Unix_Echo(t *testing.T) {
- ctx := context.Background()
- ls, err := net.Listen("unix", "sock.unix")
- if err == nil {
- defer func() {
- err = ls.Close()
- if err != nil {
- t.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- t.Fatal(err)
- }
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Empty(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
- ctx := context.Background()
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if err == nil {
- defer func() {
- err = ls.Close()
- if err != nil {
- b.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- b.Skip("socket is busy")
- }
-
- f := NewSocketServer(ls, time.Minute)
- for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
-
- w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- b.Fatal(err)
- }
- go func() {
- assert.NoError(b, w.Wait())
- }()
-
- err = w.Stop()
- if err != nil {
- b.Errorf("error stopping the Process: error %v", err)
- }
- }
-}
-
-func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
- ctx := context.Background()
- ls, err := net.Listen("tcp", "127.0.0.1:9007")
- if err == nil {
- defer func() {
- err = ls.Close()
- if err != nil {
- b.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- b.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- b.Fatal(err)
- }
- defer func() {
- err = w.Stop()
- if err != nil {
- b.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
-
-func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
- ctx := context.Background()
- ls, err := net.Listen("unix", "sock.unix")
- if err == nil {
- defer func() {
- err = ls.Close()
- if err != nil {
- b.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- b.Skip("socket is busy")
- }
-
- f := NewSocketServer(ls, time.Minute)
- for n := 0; n < b.N; n++ {
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
-
- w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- b.Fatal(err)
- }
- err = w.Stop()
- if err != nil {
- b.Errorf("error stopping the Process: error %v", err)
- }
- }
-}
-
-func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
- ctx := context.Background()
- ls, err := net.Listen("unix", "sock.unix")
- if err == nil {
- defer func() {
- err = ls.Close()
- if err != nil {
- b.Errorf("error closing the listener: error %v", err)
- }
- }()
- } else {
- b.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
-
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- b.Fatal(err)
- }
- defer func() {
- err = w.Stop()
- if err != nil {
- b.Errorf("error stopping the Process: error %v", err)
- }
- }()
-
- sw := worker.From(w)
-
- for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
- b.Fail()
- }
- }
-}
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
deleted file mode 100644
index ed8704bb..00000000
--- a/pkg/worker/interface.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package worker
-
-import (
- "context"
- "fmt"
- "time"
-
- "github.com/spiral/goridge/v3/pkg/relay"
- "github.com/spiral/roadrunner/v2/pkg/payload"
-)
-
-// State represents WorkerProcess status and updated time.
-type State interface {
- fmt.Stringer
- // Value returns StateImpl value
- Value() int64
- // Set sets the StateImpl
- Set(value int64)
- // NumExecs shows how many times WorkerProcess was invoked
- NumExecs() uint64
- // IsActive returns true if WorkerProcess not Inactive or Stopped
- IsActive() bool
- // RegisterExec using to registering php executions
- RegisterExec()
- // SetLastUsed sets worker last used time
- SetLastUsed(lu uint64)
- // LastUsed return worker last used time
- LastUsed() uint64
-}
-
-type BaseProcess interface {
- fmt.Stringer
-
- // Pid returns worker pid.
- Pid() int64
-
- // Created returns time worker was created at.
- Created() time.Time
-
- // State return receive-only WorkerProcess state object, state can be used to safely access
- // WorkerProcess status, time when status changed and number of WorkerProcess executions.
- State() State
-
- // Start used to run Cmd and immediately return
- Start() error
-
- // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
- // complete and will return process error (if any), if stderr is presented it's value
- // will be wrapped as WorkerError. Method will return error code if php process fails
- // to find or Start the script.
- Wait() error
-
- // Stop sends soft termination command to the WorkerProcess and waits for process completion.
- Stop() error
-
- // Kill kills underlying process, make sure to call Wait() func to gather
- // error log from the stderr. Does not waits for process completion!
- Kill() error
-
- // Relay returns attached to worker goridge relay
- Relay() relay.Relay
-
- // AttachRelay used to attach goridge relay to the worker process
- AttachRelay(rl relay.Relay)
-}
-
-type SyncWorker interface {
- // BaseProcess provides basic functionality for the SyncWorker
- BaseProcess
- // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
- Exec(rqs *payload.Payload) (*payload.Payload, error)
- // ExecWithTTL used to handle Exec with TTL
- ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error)
-}
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
deleted file mode 100755
index bf152e8b..00000000
--- a/pkg/worker/state.go
+++ /dev/null
@@ -1,111 +0,0 @@
-package worker
-
-import (
- "sync/atomic"
-)
-
-// SYNC WITH worker_watcher.GET
-const (
- // StateInactive - no associated process
- StateInactive int64 = iota
-
- // StateReady - ready for job.
- StateReady
-
- // StateWorking - working on given payload.
- StateWorking
-
- // StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
- StateInvalid
-
- // StateStopping - process is being softly stopped.
- StateStopping
-
- // StateKilling - process is being forcibly stopped
- StateKilling
-
- // StateDestroyed State of worker, when no need to allocate new one
- StateDestroyed
-
- // StateMaxJobsReached State of worker, when it reached executions limit
- StateMaxJobsReached
-
- // StateStopped - process has been terminated.
- StateStopped
-
- // StateErrored - error StateImpl (can't be used).
- StateErrored
-)
-
-type StateImpl struct {
- value int64
- numExecs uint64
- // to be lightweight, use UnixNano
- lastUsed uint64
-}
-
-// NewWorkerState initializes a state for the sync.Worker
-func NewWorkerState(value int64) *StateImpl {
- return &StateImpl{value: value}
-}
-
-// String returns current StateImpl as string.
-func (s *StateImpl) String() string {
- switch s.Value() {
- case StateInactive:
- return "inactive"
- case StateReady:
- return "ready"
- case StateWorking:
- return "working"
- case StateInvalid:
- return "invalid"
- case StateStopping:
- return "stopping"
- case StateStopped:
- return "stopped"
- case StateKilling:
- return "killing"
- case StateErrored:
- return "errored"
- case StateDestroyed:
- return "destroyed"
- }
-
- return "undefined"
-}
-
-// NumExecs returns number of registered WorkerProcess execs.
-func (s *StateImpl) NumExecs() uint64 {
- return atomic.LoadUint64(&s.numExecs)
-}
-
-// Value StateImpl returns StateImpl value
-func (s *StateImpl) Value() int64 {
- return atomic.LoadInt64(&s.value)
-}
-
-// IsActive returns true if WorkerProcess not Inactive or Stopped
-func (s *StateImpl) IsActive() bool {
- val := s.Value()
- return val == StateWorking || val == StateReady
-}
-
-// Set change StateImpl value (status)
-func (s *StateImpl) Set(value int64) {
- atomic.StoreInt64(&s.value, value)
-}
-
-// RegisterExec register new execution atomically
-func (s *StateImpl) RegisterExec() {
- atomic.AddUint64(&s.numExecs, 1)
-}
-
-// SetLastUsed Update last used time
-func (s *StateImpl) SetLastUsed(lu uint64) {
- atomic.StoreUint64(&s.lastUsed, lu)
-}
-
-func (s *StateImpl) LastUsed() uint64 {
- return atomic.LoadUint64(&s.lastUsed)
-}
diff --git a/pkg/worker/state_test.go b/pkg/worker/state_test.go
deleted file mode 100755
index c67182d6..00000000
--- a/pkg/worker/state_test.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package worker
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_NewState(t *testing.T) {
- st := NewWorkerState(StateErrored)
-
- assert.Equal(t, "errored", st.String())
-
- assert.Equal(t, "inactive", NewWorkerState(StateInactive).String())
- assert.Equal(t, "ready", NewWorkerState(StateReady).String())
- assert.Equal(t, "working", NewWorkerState(StateWorking).String())
- assert.Equal(t, "stopped", NewWorkerState(StateStopped).String())
- assert.Equal(t, "undefined", NewWorkerState(1000).String())
-}
-
-func Test_IsActive(t *testing.T) {
- assert.False(t, NewWorkerState(StateInactive).IsActive())
- assert.True(t, NewWorkerState(StateReady).IsActive())
- assert.True(t, NewWorkerState(StateWorking).IsActive())
- assert.False(t, NewWorkerState(StateStopped).IsActive())
- assert.False(t, NewWorkerState(StateErrored).IsActive())
-}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
deleted file mode 100755
index 74e29b71..00000000
--- a/pkg/worker/sync_worker.go
+++ /dev/null
@@ -1,283 +0,0 @@
-package worker
-
-import (
- "bytes"
- "context"
- "sync"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/goridge/v3/pkg/frame"
- "github.com/spiral/goridge/v3/pkg/relay"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "go.uber.org/multierr"
-)
-
-// Allocator is responsible for worker allocation in the pool
-type Allocator func() (SyncWorker, error)
-
-type SyncWorkerImpl struct {
- process *Process
- fPool sync.Pool
- bPool sync.Pool
-}
-
-// From creates SyncWorker from BaseProcess
-func From(process *Process) *SyncWorkerImpl {
- return &SyncWorkerImpl{
- process: process,
- fPool: sync.Pool{New: func() interface{} {
- return frame.NewFrame()
- }},
- bPool: sync.Pool{New: func() interface{} {
- return new(bytes.Buffer)
- }},
- }
-}
-
-// Exec payload without TTL timeout.
-func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("sync_worker_exec")
- if len(p.Body) == 0 && len(p.Context) == 0 {
- return nil, errors.E(op, errors.Str("payload can not be empty"))
- }
-
- if tw.process.State().Value() != StateReady {
- return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
- }
-
- // set last used time
- tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(StateWorking)
-
- rsp, err := tw.execPayload(p)
- if err != nil {
- // just to be more verbose
- if !errors.Is(errors.SoftJob, err) {
- tw.process.State().Set(StateErrored)
- tw.process.State().RegisterExec()
- }
- return nil, errors.E(op, err)
- }
-
- // supervisor may set state of the worker during the work
- // in this case we should not re-write the worker state
- if tw.process.State().Value() != StateWorking {
- tw.process.State().RegisterExec()
- return rsp, nil
- }
-
- tw.process.State().Set(StateReady)
- tw.process.State().RegisterExec()
-
- return rsp, nil
-}
-
-type wexec struct {
- payload *payload.Payload
- err error
-}
-
-// ExecWithTTL executes payload without TTL timeout.
-func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("sync_worker_exec_worker_with_timeout")
- c := make(chan wexec, 1)
-
- go func() {
- if len(p.Body) == 0 && len(p.Context) == 0 {
- c <- wexec{
- err: errors.E(op, errors.Str("payload can not be empty")),
- }
- return
- }
-
- if tw.process.State().Value() != StateReady {
- c <- wexec{
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
- }
- return
- }
-
- // set last used time
- tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(StateWorking)
-
- rsp, err := tw.execPayload(p)
- if err != nil {
- // just to be more verbose
- if errors.Is(errors.SoftJob, err) == false { //nolint:gosimple
- tw.process.State().Set(StateErrored)
- tw.process.State().RegisterExec()
- }
- c <- wexec{
- err: errors.E(op, err),
- }
- return
- }
-
- if tw.process.State().Value() != StateWorking {
- tw.process.State().RegisterExec()
- c <- wexec{
- payload: rsp,
- err: nil,
- }
- return
- }
-
- tw.process.State().Set(StateReady)
- tw.process.State().RegisterExec()
-
- c <- wexec{
- payload: rsp,
- err: nil,
- }
- }()
-
- select {
- // exec TTL reached
- case <-ctx.Done():
- err := multierr.Combine(tw.Kill())
- if err != nil {
- // append timeout error
- err = multierr.Append(err, errors.E(op, errors.ExecTTL))
- return nil, multierr.Append(err, ctx.Err())
- }
- return nil, errors.E(op, errors.ExecTTL, ctx.Err())
- case res := <-c:
- if res.err != nil {
- return nil, res.err
- }
- return res.payload, nil
- }
-}
-
-func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("sync_worker_exec_payload")
-
- // get a frame
- fr := tw.getFrame()
- defer tw.putFrame(fr)
-
- // can be 0 here
- fr.WriteVersion(fr.Header(), frame.VERSION_1)
-
- // obtain a buffer
- buf := tw.get()
-
- buf.Write(p.Context)
- buf.Write(p.Body)
-
- // Context offset
- fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context)))
- fr.WritePayloadLen(fr.Header(), uint32(buf.Len()))
- fr.WritePayload(buf.Bytes())
-
- fr.WriteCRC(fr.Header())
-
- // return buffer
- tw.put(buf)
-
- err := tw.Relay().Send(fr)
- if err != nil {
- return nil, errors.E(op, errors.Network, err)
- }
-
- frameR := tw.getFrame()
- defer tw.putFrame(frameR)
-
- err = tw.process.Relay().Receive(frameR)
- if err != nil {
- return nil, errors.E(op, errors.Network, err)
- }
- if frameR == nil {
- return nil, errors.E(op, errors.Network, errors.Str("nil fr received"))
- }
-
- if !frameR.VerifyCRC(frameR.Header()) {
- return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
- }
-
- flags := frameR.ReadFlags()
-
- if flags&frame.ERROR != byte(0) {
- return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
- }
-
- options := frameR.ReadOptions(frameR.Header())
- if len(options) != 1 {
- return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
- }
-
- pld := &payload.Payload{
- Body: make([]byte, len(frameR.Payload()[options[0]:])),
- Context: make([]byte, len(frameR.Payload()[:options[0]])),
- }
-
- // by copying we free frame's payload slice
- // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool)
- // https://blog.golang.org/slices-intro#TOC_6.
- copy(pld.Body, frameR.Payload()[options[0]:])
- copy(pld.Context, frameR.Payload()[:options[0]])
-
- return pld, nil
-}
-
-func (tw *SyncWorkerImpl) String() string {
- return tw.process.String()
-}
-
-func (tw *SyncWorkerImpl) Pid() int64 {
- return tw.process.Pid()
-}
-
-func (tw *SyncWorkerImpl) Created() time.Time {
- return tw.process.Created()
-}
-
-func (tw *SyncWorkerImpl) State() State {
- return tw.process.State()
-}
-
-func (tw *SyncWorkerImpl) Start() error {
- return tw.process.Start()
-}
-
-func (tw *SyncWorkerImpl) Wait() error {
- return tw.process.Wait()
-}
-
-func (tw *SyncWorkerImpl) Stop() error {
- return tw.process.Stop()
-}
-
-func (tw *SyncWorkerImpl) Kill() error {
- return tw.process.Kill()
-}
-
-func (tw *SyncWorkerImpl) Relay() relay.Relay {
- return tw.process.Relay()
-}
-
-func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
- tw.process.AttachRelay(rl)
-}
-
-// Private
-
-func (tw *SyncWorkerImpl) get() *bytes.Buffer {
- return tw.bPool.Get().(*bytes.Buffer)
-}
-
-func (tw *SyncWorkerImpl) put(b *bytes.Buffer) {
- b.Reset()
- tw.bPool.Put(b)
-}
-
-func (tw *SyncWorkerImpl) getFrame() *frame.Frame {
- return tw.fPool.Get().(*frame.Frame)
-}
-
-func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) {
- f.Reset()
- tw.fPool.Put(f)
-}
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
deleted file mode 100755
index 64580f9f..00000000
--- a/pkg/worker/sync_worker_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package worker
-
-import (
- "os/exec"
- "testing"
-
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/stretchr/testify/assert"
-)
-
-func Test_NotStarted_String(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := InitBaseWorker(cmd)
- assert.Contains(t, w.String(), "php tests/client.php echo pipes")
- assert.Contains(t, w.String(), "inactive")
- assert.Contains(t, w.String(), "numExecs: 0")
-}
-
-func Test_NotStarted_Exec(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := InitBaseWorker(cmd)
-
- sw := From(w)
-
- res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
- assert.Error(t, err)
- assert.Nil(t, res)
-
- assert.Contains(t, err.Error(), "Process is not ready (inactive)")
-}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
deleted file mode 100755
index fa74e7b5..00000000
--- a/pkg/worker/worker.go
+++ /dev/null
@@ -1,220 +0,0 @@
-package worker
-
-import (
- "fmt"
- "os"
- "os/exec"
- "strconv"
- "strings"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/goridge/v3/pkg/relay"
- "github.com/spiral/roadrunner/v2/internal"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "go.uber.org/multierr"
-)
-
-type Options func(p *Process)
-
-// Process - supervised process with api over goridge.Relay.
-type Process struct {
- // created indicates at what time Process has been created.
- created time.Time
-
- // updates parent supervisor or pool about Process events
- events events.Handler
-
- // state holds information about current Process state,
- // number of Process executions, buf status change time.
- // publicly this object is receive-only and protected using Mutex
- // and atomic counter.
- state *StateImpl
-
- // underlying command with associated process, command must be
- // provided to Process from outside in non-started form. CmdSource
- // stdErr direction will be handled by Process to aggregate error message.
- cmd *exec.Cmd
-
- // pid of the process, points to pid of underlying process and
- // can be nil while process is not started.
- pid int
-
- // communication bus with underlying process.
- relay relay.Relay
-}
-
-// InitBaseWorker creates new Process over given exec.cmd.
-func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
- if cmd.Process != nil {
- return nil, fmt.Errorf("can't attach to running process")
- }
- w := &Process{
- created: time.Now(),
- events: events.NewEventsHandler(),
- cmd: cmd,
- state: NewWorkerState(StateInactive),
- }
-
- // set self as stderr implementation (Writer interface)
- w.cmd.Stderr = w
-
- // add options
- for i := 0; i < len(options); i++ {
- options[i](w)
- }
-
- return w, nil
-}
-
-func AddListeners(listeners ...events.Listener) Options {
- return func(p *Process) {
- for i := 0; i < len(listeners); i++ {
- p.addListener(listeners[i])
- }
- }
-}
-
-// Pid returns worker pid.
-func (w *Process) Pid() int64 {
- return int64(w.pid)
-}
-
-// Created returns time worker was created at.
-func (w *Process) Created() time.Time {
- return w.created
-}
-
-// AddListener registers new worker event listener.
-func (w *Process) addListener(listener events.Listener) {
- w.events.AddListener(listener)
-}
-
-// State return receive-only Process state object, state can be used to safely access
-// Process status, time when status changed and number of Process executions.
-func (w *Process) State() State {
- return w.state
-}
-
-// AttachRelay attaches relay to the worker
-func (w *Process) AttachRelay(rl relay.Relay) {
- w.relay = rl
-}
-
-// Relay returns relay attached to the worker
-func (w *Process) Relay() relay.Relay {
- return w.relay
-}
-
-// String returns Process description. fmt.Stringer interface
-func (w *Process) String() string {
- st := w.state.String()
- // we can safely compare pid to 0
- if w.pid != 0 {
- st = st + ", pid:" + strconv.Itoa(w.pid)
- }
-
- return fmt.Sprintf(
- "(`%s` [%s], numExecs: %v)",
- strings.Join(w.cmd.Args, " "),
- st,
- w.state.NumExecs(),
- )
-}
-
-func (w *Process) Start() error {
- err := w.cmd.Start()
- if err != nil {
- return err
- }
- w.pid = w.cmd.Process.Pid
- return nil
-}
-
-// Wait must be called once for each Process, call will be released once Process is
-// complete and will return process error (if any), if stderr is presented it's value
-// will be wrapped as WorkerError. Method will return error code if php process fails
-// to find or Start the script.
-func (w *Process) Wait() error {
- const op = errors.Op("process_wait")
- var err error
- err = w.cmd.Wait()
-
- // If worker was destroyed, just exit
- if w.State().Value() == StateDestroyed {
- return nil
- }
-
- // If state is different, and err is not nil, append it to the errors
- if err != nil {
- w.State().Set(StateErrored)
- err = multierr.Combine(err, errors.E(op, err))
- }
-
- // closeRelay
- // at this point according to the documentation (see cmd.Wait comment)
- // if worker finishes with an error, message will be written to the stderr first
- // and then process.cmd.Wait return an error
- err2 := w.closeRelay()
- if err2 != nil {
- w.State().Set(StateErrored)
- return multierr.Append(err, errors.E(op, err2))
- }
-
- if w.cmd.ProcessState.Success() {
- w.State().Set(StateStopped)
- return nil
- }
-
- return err
-}
-
-func (w *Process) closeRelay() error {
- if w.relay != nil {
- err := w.relay.Close()
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-// Stop sends soft termination command to the Process and waits for process completion.
-func (w *Process) Stop() error {
- const op = errors.Op("process_stop")
- w.state.Set(StateStopping)
- err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true})
- if err != nil {
- w.state.Set(StateKilling)
- _ = w.cmd.Process.Signal(os.Kill)
- return errors.E(op, errors.Network, err)
- }
- w.state.Set(StateStopped)
- return nil
-}
-
-// Kill kills underlying process, make sure to call Wait() func to gather
-// error log from the stderr. Does not wait for process completion!
-func (w *Process) Kill() error {
- if w.State().Value() == StateDestroyed {
- err := w.cmd.Process.Signal(os.Kill)
- if err != nil {
- return err
- }
- return nil
- }
-
- w.state.Set(StateKilling)
- err := w.cmd.Process.Signal(os.Kill)
- if err != nil {
- return err
- }
- w.state.Set(StateStopped)
- return nil
-}
-
-// Worker stderr
-func (w *Process) Write(p []byte) (n int, err error) {
- w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p})
- return len(p), nil
-}
diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go
deleted file mode 100755
index 805f66b5..00000000
--- a/pkg/worker/worker_test.go
+++ /dev/null
@@ -1,19 +0,0 @@
-package worker
-
-import (
- "os/exec"
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_OnStarted(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
- assert.Nil(t, cmd.Start())
-
- w, err := InitBaseWorker(cmd)
- assert.Nil(t, w)
- assert.NotNil(t, err)
-
- assert.Equal(t, "can't attach to running process", err.Error())
-}
diff --git a/pkg/worker_handler/constants.go b/pkg/worker_handler/constants.go
deleted file mode 100644
index 3355d9c2..00000000
--- a/pkg/worker_handler/constants.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package handler
-
-import "net/http"
-
-var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push")
-
-// TrailerHeaderKey http header key
-var TrailerHeaderKey = http.CanonicalHeaderKey("trailer")
diff --git a/pkg/worker_handler/errors.go b/pkg/worker_handler/errors.go
deleted file mode 100644
index c3352a52..00000000
--- a/pkg/worker_handler/errors.go
+++ /dev/null
@@ -1,26 +0,0 @@
-//go:build !windows
-// +build !windows
-
-package handler
-
-import (
- "errors"
- "net"
- "os"
- "syscall"
-)
-
-// Broken pipe
-var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer")
-
-// handleWriteError just check if error was caused by aborted connection on linux
-func handleWriteError(err error) error {
- if netErr, ok2 := err.(*net.OpError); ok2 {
- if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
- if errors.Is(syscallErr.Err, syscall.EPIPE) {
- return errEPIPE
- }
- }
- }
- return err
-}
diff --git a/pkg/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go
deleted file mode 100644
index 3c6c2186..00000000
--- a/pkg/worker_handler/errors_windows.go
+++ /dev/null
@@ -1,28 +0,0 @@
-//go:build windows
-// +build windows
-
-package handler
-
-import (
- "errors"
- "net"
- "os"
- "syscall"
-)
-
-//Software caused connection abort.
-//An established connection was aborted by the software in your host computer,
-//possibly due to a data transmission time-out or protocol error.
-var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer")
-
-// handleWriteError just check if error was caused by aborted connection on windows
-func handleWriteError(err error) error {
- if netErr, ok2 := err.(*net.OpError); ok2 {
- if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
- if syscallErr.Err == syscall.WSAECONNABORTED {
- return errEPIPE
- }
- }
- }
- return err
-}
diff --git a/pkg/worker_handler/handler.go b/pkg/worker_handler/handler.go
deleted file mode 100644
index fc03563b..00000000
--- a/pkg/worker_handler/handler.go
+++ /dev/null
@@ -1,246 +0,0 @@
-package handler
-
-import (
- "net"
- "net/http"
- "strconv"
- "strings"
- "sync"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/plugins/http/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-// MB is 1024 bytes
-const MB uint64 = 1024 * 1024
-
-// ErrorEvent represents singular http error event.
-type ErrorEvent struct {
- // Request contains client request, must not be stored.
- Request *http.Request
-
- // Error - associated error, if any.
- Error error
-
- // event timings
- start time.Time
- elapsed time.Duration
-}
-
-// Elapsed returns duration of the invocation.
-func (e *ErrorEvent) Elapsed() time.Duration {
- return e.elapsed
-}
-
-// ResponseEvent represents singular http response event.
-type ResponseEvent struct {
- // Request contains client request, must not be stored.
- Request *Request
-
- // Response contains service response.
- Response *Response
-
- // event timings
- start time.Time
- elapsed time.Duration
-}
-
-// Elapsed returns duration of the invocation.
-func (e *ResponseEvent) Elapsed() time.Duration {
- return e.elapsed
-}
-
-// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
-// parsed files and query, payload will include parsed form dataTree (if any).
-type Handler struct {
- maxRequestSize uint64
- uploads config.Uploads
- trusted config.Cidrs
- log logger.Logger
- pool pool.Pool
- mul sync.Mutex
- lsn []events.Listener
- internalHTTPCode uint64
-}
-
-// NewHandler return handle interface implementation
-func NewHandler(maxReqSize uint64, internalHTTPCode uint64, uploads config.Uploads, trusted config.Cidrs, pool pool.Pool) (*Handler, error) {
- if pool == nil {
- return nil, errors.E(errors.Str("pool should be initialized"))
- }
- return &Handler{
- maxRequestSize: maxReqSize * MB,
- uploads: uploads,
- pool: pool,
- trusted: trusted,
- internalHTTPCode: internalHTTPCode,
- }, nil
-}
-
-// AddListener attaches handler event controller.
-func (h *Handler) AddListener(l ...events.Listener) {
- h.mul.Lock()
- defer h.mul.Unlock()
-
- h.lsn = l
-}
-
-// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
-func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- const op = errors.Op("serve_http")
- start := time.Now()
-
- // validating request size
- if h.maxRequestSize != 0 {
- const op = errors.Op("http_handler_max_size")
- if length := r.Header.Get("content-length"); length != "" {
- // try to parse the value from the `content-length` header
- size, err := strconv.ParseInt(length, 10, 64)
- if err != nil {
- // if got an error while parsing -> assign 500 code to the writer and return
- http.Error(w, "", 500)
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("error while parsing value from the `content-length` header")), start: start, elapsed: time.Since(start)})
- return
- }
-
- if size > int64(h.maxRequestSize) {
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("request body max size is exceeded")), start: start, elapsed: time.Since(start)})
- http.Error(w, errors.E(op, errors.Str("request body max size is exceeded")).Error(), http.StatusBadRequest)
- return
- }
- }
- }
-
- req, err := NewRequest(r, h.uploads)
- if err != nil {
- // if pipe is broken, there is no sense to write the header
- // in this case we just report about error
- if err == errEPIPE {
- h.sendEvent(ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)})
- return
- }
-
- http.Error(w, errors.E(op, err).Error(), 500)
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
- return
- }
-
- // proxy IP resolution
- h.resolveIP(req)
-
- req.Open(h.log)
- defer req.Close(h.log)
-
- p, err := req.Payload()
- if err != nil {
- h.handleError(w, r, start, err)
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
- return
- }
-
- rsp, err := h.pool.Exec(p)
- if err != nil {
- h.handleError(w, r, start, err)
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
- return
- }
-
- resp, err := NewResponse(rsp)
- if err != nil {
- h.handleError(w, r, start, err)
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
- return
- }
-
- h.handleResponse(req, resp, start)
- err = resp.Write(w)
- if err != nil {
- http.Error(w, errors.E(op, err).Error(), 500)
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
- }
-}
-
-// handleError will handle internal RR errors and return 500
-func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, start time.Time, err error) {
- const op = errors.Op("handle_error")
- // internal error types, user should not see them
- if errors.Is(errors.SoftJob, err) ||
- errors.Is(errors.WatcherStopped, err) ||
- errors.Is(errors.WorkerAllocate, err) ||
- errors.Is(errors.NoFreeWorkers, err) ||
- errors.Is(errors.ExecTTL, err) ||
- errors.Is(errors.IdleTTL, err) ||
- errors.Is(errors.TTL, err) ||
- errors.Is(errors.Encode, err) ||
- errors.Is(errors.Decode, err) ||
- errors.Is(errors.Network, err) {
- // write an internal server error
- w.WriteHeader(int(h.internalHTTPCode))
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
- }
-}
-
-// handleResponse triggers response event.
-func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) {
- h.sendEvent(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)})
-}
-
-// sendEvent invokes event handler if any.
-func (h *Handler) sendEvent(event interface{}) {
- if h.lsn != nil {
- for i := range h.lsn {
- // do not block the pipeline
- // TODO not a good approach, redesign event bus
- i := i
- go func() {
- h.lsn[i](event)
- }()
- }
- }
-}
-
-// get real ip passing multiple proxy
-func (h *Handler) resolveIP(r *Request) {
- if h.trusted.IsTrusted(r.RemoteAddr) == false { //nolint:gosimple
- return
- }
-
- if r.Header.Get("X-Forwarded-For") != "" {
- ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",")
- ipCount := len(ips)
-
- for i := ipCount - 1; i >= 0; i-- {
- addr := strings.TrimSpace(ips[i])
- if net.ParseIP(addr) != nil {
- r.RemoteAddr = addr
- return
- }
- }
-
- return
- }
-
- // The logic here is the following:
- // In general case, we only expect X-Real-Ip header. If it exist, we get the IP address from header and set request Remote address
- // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers
- // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF.
- // CF-Connecting-IP is an Enterprise feature and we check it last in order.
- // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string
- if r.Header.Get("X-Real-Ip") != "" {
- r.RemoteAddr = FetchIP(r.Header.Get("X-Real-Ip"))
- return
- }
-
- if r.Header.Get("True-Client-IP") != "" {
- r.RemoteAddr = FetchIP(r.Header.Get("True-Client-IP"))
- return
- }
-
- if r.Header.Get("CF-Connecting-IP") != "" {
- r.RemoteAddr = FetchIP(r.Header.Get("CF-Connecting-IP"))
- }
-}
diff --git a/pkg/worker_handler/parse.go b/pkg/worker_handler/parse.go
deleted file mode 100644
index 2790da2a..00000000
--- a/pkg/worker_handler/parse.go
+++ /dev/null
@@ -1,149 +0,0 @@
-package handler
-
-import (
- "net/http"
-
- "github.com/spiral/roadrunner/v2/plugins/http/config"
-)
-
-// MaxLevel defines maximum tree depth for incoming request data and files.
-const MaxLevel = 127
-
-type dataTree map[string]interface{}
-type fileTree map[string]interface{}
-
-// parseData parses incoming request body into data tree.
-func parseData(r *http.Request) dataTree {
- data := make(dataTree)
- if r.PostForm != nil {
- for k, v := range r.PostForm {
- data.push(k, v)
- }
- }
-
- if r.MultipartForm != nil {
- for k, v := range r.MultipartForm.Value {
- data.push(k, v)
- }
- }
-
- return data
-}
-
-// pushes value into data tree.
-func (d dataTree) push(k string, v []string) {
- keys := FetchIndexes(k)
- if len(keys) <= MaxLevel {
- d.mount(keys, v)
- }
-}
-
-// mount mounts data tree recursively.
-func (d dataTree) mount(i []string, v []string) {
- if len(i) == 1 {
- // single value context (last element)
- d[i[0]] = v[len(v)-1]
- return
- }
-
- if len(i) == 2 && i[1] == "" {
- // non associated array of elements
- d[i[0]] = v
- return
- }
-
- if p, ok := d[i[0]]; ok {
- p.(dataTree).mount(i[1:], v)
- return
- }
-
- d[i[0]] = make(dataTree)
- d[i[0]].(dataTree).mount(i[1:], v)
-}
-
-// parse incoming dataTree request into JSON (including contentMultipart form dataTree)
-func parseUploads(r *http.Request, cfg config.Uploads) *Uploads {
- u := &Uploads{
- cfg: cfg,
- tree: make(fileTree),
- list: make([]*FileUpload, 0),
- }
-
- for k, v := range r.MultipartForm.File {
- files := make([]*FileUpload, 0, len(v))
- for _, f := range v {
- files = append(files, NewUpload(f))
- }
-
- u.list = append(u.list, files...)
- u.tree.push(k, files)
- }
-
- return u
-}
-
-// pushes new file upload into it's proper place.
-func (d fileTree) push(k string, v []*FileUpload) {
- keys := FetchIndexes(k)
- if len(keys) <= MaxLevel {
- d.mount(keys, v)
- }
-}
-
-// mount mounts data tree recursively.
-func (d fileTree) mount(i []string, v []*FileUpload) {
- if len(i) == 1 {
- // single value context
- d[i[0]] = v[0]
- return
- }
-
- if len(i) == 2 && i[1] == "" {
- // non associated array of elements
- d[i[0]] = v
- return
- }
-
- if p, ok := d[i[0]]; ok {
- p.(fileTree).mount(i[1:], v)
- return
- }
-
- d[i[0]] = make(fileTree)
- d[i[0]].(fileTree).mount(i[1:], v)
-}
-
-// FetchIndexes parses input name and splits it into separate indexes list.
-func FetchIndexes(s string) []string {
- var (
- pos int
- ch string
- keys = make([]string, 1)
- )
-
- for _, c := range s {
- ch = string(c)
- switch ch {
- case " ":
- // ignore all spaces
- continue
- case "[":
- pos = 1
- continue
- case "]":
- if pos == 1 {
- keys = append(keys, "")
- }
- pos = 2
- default:
- if pos == 1 || pos == 2 {
- keys = append(keys, "")
- }
-
- keys[len(keys)-1] += ch
- pos = 0
- }
- }
-
- return keys
-}
diff --git a/pkg/worker_handler/request.go b/pkg/worker_handler/request.go
deleted file mode 100644
index 3d60897b..00000000
--- a/pkg/worker_handler/request.go
+++ /dev/null
@@ -1,189 +0,0 @@
-package handler
-
-import (
- "fmt"
- "io/ioutil"
- "net"
- "net/http"
- "net/url"
- "strings"
-
- j "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/plugins/http/attributes"
- "github.com/spiral/roadrunner/v2/plugins/http/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-var json = j.ConfigCompatibleWithStandardLibrary
-
-const (
- defaultMaxMemory = 32 << 20 // 32 MB
- contentNone = iota + 900
- contentStream
- contentMultipart
- contentFormData
-)
-
-// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files.
-type Request struct {
- // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address.
- RemoteAddr string `json:"remoteAddr"`
-
- // Protocol includes HTTP protocol version.
- Protocol string `json:"protocol"`
-
- // Method contains name of HTTP method used for the request.
- Method string `json:"method"`
-
- // URI contains full request URI with scheme and query.
- URI string `json:"uri"`
-
- // Header contains list of request headers.
- Header http.Header `json:"headers"`
-
- // Cookies contains list of request cookies.
- Cookies map[string]string `json:"cookies"`
-
- // RawQuery contains non parsed query string (to be parsed on php end).
- RawQuery string `json:"rawQuery"`
-
- // Parsed indicates that request body has been parsed on RR end.
- Parsed bool `json:"parsed"`
-
- // Uploads contains list of uploaded files, their names, sized and associations with temporary files.
- Uploads *Uploads `json:"uploads"`
-
- // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions.
- Attributes map[string]interface{} `json:"attributes"`
-
- // request body can be parsedData or []byte
- body interface{}
-}
-
-func FetchIP(pair string) string {
- if !strings.ContainsRune(pair, ':') {
- return pair
- }
-
- addr, _, _ := net.SplitHostPort(pair)
- return addr
-}
-
-// NewRequest creates new PSR7 compatible request using net/http request.
-func NewRequest(r *http.Request, cfg config.Uploads) (*Request, error) {
- req := &Request{
- RemoteAddr: FetchIP(r.RemoteAddr),
- Protocol: r.Proto,
- Method: r.Method,
- URI: URI(r),
- Header: r.Header,
- Cookies: make(map[string]string),
- RawQuery: r.URL.RawQuery,
- Attributes: attributes.All(r),
- }
-
- for _, c := range r.Cookies() {
- if v, err := url.QueryUnescape(c.Value); err == nil {
- req.Cookies[c.Name] = v
- }
- }
-
- switch req.contentType() {
- case contentNone:
- return req, nil
-
- case contentStream:
- var err error
- req.body, err = ioutil.ReadAll(r.Body)
- return req, err
-
- case contentMultipart:
- if err := r.ParseMultipartForm(defaultMaxMemory); err != nil {
- return nil, err
- }
-
- req.Uploads = parseUploads(r, cfg)
- fallthrough
- case contentFormData:
- if err := r.ParseForm(); err != nil {
- return nil, err
- }
-
- req.body = parseData(r)
- }
-
- req.Parsed = true
- return req, nil
-}
-
-// Open moves all uploaded files to temporary directory so it can be given to php later.
-func (r *Request) Open(log logger.Logger) {
- if r.Uploads == nil {
- return
- }
-
- r.Uploads.Open(log)
-}
-
-// Close clears all temp file uploads
-func (r *Request) Close(log logger.Logger) {
- if r.Uploads == nil {
- return
- }
-
- r.Uploads.Clear(log)
-}
-
-// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
-// files prior to calling this method.
-func (r *Request) Payload() (*payload.Payload, error) {
- const op = errors.Op("marshal_payload")
- p := &payload.Payload{}
-
- var err error
- if p.Context, err = json.Marshal(r); err != nil {
- return nil, errors.E(op, errors.Encode, err)
- }
-
- if r.Parsed {
- if p.Body, err = json.Marshal(r.body); err != nil {
- return nil, errors.E(op, errors.Encode, err)
- }
- } else if r.body != nil {
- p.Body = r.body.([]byte)
- }
-
- return p, nil
-}
-
-// contentType returns the payload content type.
-func (r *Request) contentType() int {
- if r.Method == "HEAD" || r.Method == "OPTIONS" {
- return contentNone
- }
-
- ct := r.Header.Get("content-type")
- if strings.Contains(ct, "application/x-www-form-urlencoded") {
- return contentFormData
- }
-
- if strings.Contains(ct, "multipart/form-data") {
- return contentMultipart
- }
-
- return contentStream
-}
-
-// URI fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
-func URI(r *http.Request) string {
- if r.URL.Host != "" {
- return r.URL.String()
- }
- if r.TLS != nil {
- return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
- }
-
- return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
-}
diff --git a/pkg/worker_handler/response.go b/pkg/worker_handler/response.go
deleted file mode 100644
index d22f09d4..00000000
--- a/pkg/worker_handler/response.go
+++ /dev/null
@@ -1,105 +0,0 @@
-package handler
-
-import (
- "io"
- "net/http"
- "strings"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/payload"
-)
-
-// Response handles PSR7 response logic.
-type Response struct {
- // Status contains response status.
- Status int `json:"status"`
-
- // Header contains list of response headers.
- Headers map[string][]string `json:"headers"`
-
- // associated Body payload.
- Body interface{}
-}
-
-// NewResponse creates new response based on given pool payload.
-func NewResponse(p *payload.Payload) (*Response, error) {
- const op = errors.Op("http_response")
- r := &Response{Body: p.Body}
- if err := json.Unmarshal(p.Context, r); err != nil {
- return nil, errors.E(op, errors.Decode, err)
- }
-
- return r, nil
-}
-
-// Write writes response headers, status and body into ResponseWriter.
-func (r *Response) Write(w http.ResponseWriter) error {
- // INFO map is the reference type in golang
- p := handlePushHeaders(r.Headers)
- if pusher, ok := w.(http.Pusher); ok {
- for _, v := range p {
- err := pusher.Push(v, nil)
- if err != nil {
- return err
- }
- }
- }
-
- handleTrailers(r.Headers)
- for n, h := range r.Headers {
- for _, v := range h {
- w.Header().Add(n, v)
- }
- }
-
- w.WriteHeader(r.Status)
-
- if data, ok := r.Body.([]byte); ok {
- _, err := w.Write(data)
- if err != nil {
- return handleWriteError(err)
- }
- }
-
- if rc, ok := r.Body.(io.Reader); ok {
- if _, err := io.Copy(w, rc); err != nil {
- return err
- }
- }
-
- return nil
-}
-
-func handlePushHeaders(h map[string][]string) []string {
- var p []string
- pushHeader, ok := h[http2pushHeaderKey]
- if !ok {
- return p
- }
-
- p = append(p, pushHeader...)
-
- delete(h, http2pushHeaderKey)
-
- return p
-}
-
-func handleTrailers(h map[string][]string) {
- trailers, ok := h[TrailerHeaderKey]
- if !ok {
- return
- }
-
- for _, tr := range trailers {
- for _, n := range strings.Split(tr, ",") {
- n = strings.Trim(n, "\t ")
- if v, ok := h[n]; ok {
- h["Trailer:"+n] = v
-
- delete(h, n)
- }
- }
- }
-
- delete(h, TrailerHeaderKey)
-}
diff --git a/pkg/worker_handler/uploads.go b/pkg/worker_handler/uploads.go
deleted file mode 100644
index e695000e..00000000
--- a/pkg/worker_handler/uploads.go
+++ /dev/null
@@ -1,159 +0,0 @@
-package handler
-
-import (
- "github.com/spiral/roadrunner/v2/plugins/http/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-
- "io"
- "io/ioutil"
- "mime/multipart"
- "os"
- "sync"
-)
-
-const (
- // UploadErrorOK - no error, the file uploaded with success.
- UploadErrorOK = 0
-
- // UploadErrorNoFile - no file was uploaded.
- UploadErrorNoFile = 4
-
- // UploadErrorNoTmpDir - missing a temporary folder.
- UploadErrorNoTmpDir = 6
-
- // UploadErrorCantWrite - failed to write file to disk.
- UploadErrorCantWrite = 7
-
- // UploadErrorExtension - forbidden file extension.
- UploadErrorExtension = 8
-)
-
-// Uploads tree manages uploaded files tree and temporary files.
-type Uploads struct {
- // associated temp directory and forbidden extensions.
- cfg config.Uploads
-
- // pre processed data tree for Uploads.
- tree fileTree
-
- // flat list of all file Uploads.
- list []*FileUpload
-}
-
-// MarshalJSON marshal tree tree into JSON.
-func (u *Uploads) MarshalJSON() ([]byte, error) {
- return json.Marshal(u.tree)
-}
-
-// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
-// will be handled individually.
-func (u *Uploads) Open(log logger.Logger) {
- var wg sync.WaitGroup
- for _, f := range u.list {
- wg.Add(1)
- go func(f *FileUpload) {
- defer wg.Done()
- err := f.Open(u.cfg)
- if err != nil && log != nil {
- log.Error("error opening the file", "err", err)
- }
- }(f)
- }
-
- wg.Wait()
-}
-
-// Clear deletes all temporary files.
-func (u *Uploads) Clear(log logger.Logger) {
- for _, f := range u.list {
- if f.TempFilename != "" && exists(f.TempFilename) {
- err := os.Remove(f.TempFilename)
- if err != nil && log != nil {
- log.Error("error removing the file", "err", err)
- }
- }
- }
-}
-
-// FileUpload represents singular file NewUpload.
-type FileUpload struct {
- // ID contains filename specified by the client.
- Name string `json:"name"`
-
- // Mime contains mime-type provided by the client.
- Mime string `json:"mime"`
-
- // Size of the uploaded file.
- Size int64 `json:"size"`
-
- // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php
- Error int `json:"error"`
-
- // TempFilename points to temporary file location.
- TempFilename string `json:"tmpName"`
-
- // associated file header
- header *multipart.FileHeader
-}
-
-// NewUpload wraps net/http upload into PRS-7 compatible structure.
-func NewUpload(f *multipart.FileHeader) *FileUpload {
- return &FileUpload{
- Name: f.Filename,
- Mime: f.Header.Get("Content-Type"),
- Error: UploadErrorOK,
- header: f,
- }
-}
-
-// Open moves file content into temporary file available for PHP.
-// NOTE:
-// There is 2 deferred functions, and in case of getting 2 errors from both functions
-// error from close of temp file would be overwritten by error from the main file
-// STACK
-// DEFER FILE CLOSE (2)
-// DEFER TMP CLOSE (1)
-func (f *FileUpload) Open(cfg config.Uploads) (err error) {
- if cfg.Forbids(f.Name) {
- f.Error = UploadErrorExtension
- return nil
- }
-
- file, err := f.header.Open()
- if err != nil {
- f.Error = UploadErrorNoFile
- return err
- }
-
- defer func() {
- // close the main file
- err = file.Close()
- }()
-
- tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload")
- if err != nil {
- // most likely cause of this issue is missing tmp dir
- f.Error = UploadErrorNoTmpDir
- return err
- }
-
- f.TempFilename = tmp.Name()
- defer func() {
- // close the temp file
- err = tmp.Close()
- }()
-
- if f.Size, err = io.Copy(tmp, file); err != nil {
- f.Error = UploadErrorCantWrite
- }
-
- return err
-}
-
-// exists if file exists.
-func exists(path string) bool {
- if _, err := os.Stat(path); os.IsNotExist(err) {
- return false
- }
- return true
-}
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
deleted file mode 100644
index 5605f1e0..00000000
--- a/pkg/worker_watcher/container/channel/vec.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package channel
-
-import (
- "context"
- "sync"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-type Vec struct {
- sync.RWMutex
- // destroy signal
- destroy uint64
- // channel with the workers
- workers chan worker.BaseProcess
-}
-
-func NewVector(len uint64) *Vec {
- vec := &Vec{
- destroy: 0,
- workers: make(chan worker.BaseProcess, len),
- }
-
- return vec
-}
-
-// Push is O(1) operation
-// In case of TTL and full channel O(n) worst case, where n is len of the channel
-func (v *Vec) Push(w worker.BaseProcess) {
- // Non-blocking channel send
- select {
- case v.workers <- w:
- // default select branch is only possible when dealing with TTL
- // because in that case, workers in the v.workers channel can be TTL-ed and killed
- // but presenting in the channel
- default:
- // Stop Pop operations
- v.Lock()
- defer v.Unlock()
-
- /*
- we can be in the default branch by the following reasons:
- 1. TTL is set with no requests during the TTL
- 2. Violated Get <-> Release operation (how ??)
- */
- for i := 0; i < len(v.workers); i++ {
- /*
- We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
- */
- wrk := <-v.workers
- switch wrk.State().Value() {
- // skip good states, put worker back
- case worker.StateWorking, worker.StateReady:
- // put the worker back
- // generally, while send and receive operations are concurrent (from the channel), channel behave
- // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
- v.workers <- wrk
- continue
- /*
- Bad states are here.
- */
- default:
- // kill the current worker (just to be sure it's dead)
- if wrk != nil {
- _ = wrk.Kill()
- }
- // replace with the new one and return from the loop
- // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker
- // But this case will be handled in the worker_watcher::Get
- v.workers <- w
- return
- }
- }
- }
-}
-
-func (v *Vec) Remove(_ int64) {}
-
-func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
- /*
- if *addr == old {
- *addr = new
- return true
- }
- */
-
- if atomic.LoadUint64(&v.destroy) == 1 {
- return nil, errors.E(errors.WatcherStopped)
- }
-
- // used only for the TTL-ed workers
- v.RLock()
- defer v.RUnlock()
-
- select {
- case w := <-v.workers:
- return w, nil
- case <-ctx.Done():
- return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
- }
-}
-
-func (v *Vec) Destroy() {
- atomic.StoreUint64(&v.destroy, 1)
-}
diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go
deleted file mode 100644
index edf81d60..00000000
--- a/pkg/worker_watcher/container/queue/queue.go
+++ /dev/null
@@ -1,102 +0,0 @@
-package queue
-
-import (
- "context"
- "sync"
- "sync/atomic"
-
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-const (
- initialSize = 1
- maxInitialSize = 8
- maxInternalSliceSize = 10
-)
-
-type Node struct {
- w []worker.BaseProcess
- // LL
- n *Node
-}
-
-type Queue struct {
- mu sync.Mutex
-
- head *Node
- tail *Node
-
- curr uint64
- len uint64
-
- sliceSize uint64
-}
-
-func NewQueue() *Queue {
- q := &Queue{
- mu: sync.Mutex{},
- head: nil,
- tail: nil,
- curr: 0,
- len: 0,
- sliceSize: 0,
- }
-
- return q
-}
-
-func (q *Queue) Push(w worker.BaseProcess) {
- q.mu.Lock()
-
- if q.head == nil {
- h := newNode(initialSize)
- q.head = h
- q.tail = h
- q.sliceSize = maxInitialSize
- } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) {
- n := newNode(maxInternalSliceSize)
- q.tail.n = n
- q.tail = n
- q.sliceSize = maxInternalSliceSize
- }
-
- q.tail.w = append(q.tail.w, w)
-
- atomic.AddUint64(&q.len, 1)
-
- q.mu.Unlock()
-}
-
-func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
- q.mu.Lock()
-
- if q.head == nil {
- return nil, nil
- }
-
- w := q.head.w[q.curr]
- q.head.w[q.curr] = nil
- atomic.AddUint64(&q.len, ^uint64(0))
- atomic.AddUint64(&q.curr, 1)
-
- if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) {
- n := q.head.n
- q.head.n = nil
- q.head = n
- q.curr = 0
- }
-
- q.mu.Unlock()
-
- return w, nil
-}
-
-func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) {
-
-}
-
-func (q *Queue) Destroy() {}
-
-func newNode(capacity int) *Node {
- return &Node{w: make([]worker.BaseProcess, 0, capacity)}
-}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
deleted file mode 100755
index 83f8e627..00000000
--- a/pkg/worker_watcher/worker_watcher.go
+++ /dev/null
@@ -1,318 +0,0 @@
-package worker_watcher //nolint:stylecheck
-
-import (
- "context"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-// Vector interface represents vector container
-type Vector interface {
- // Push used to put worker to the vector
- Push(worker.BaseProcess)
- // Pop used to get worker from the vector
- Pop(ctx context.Context) (worker.BaseProcess, error)
- // Remove worker with provided pid
- Remove(pid int64)
- // Destroy used to stop releasing the workers
- Destroy()
-
- // TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation
- // Replace(prevPid int64, newWorker worker.BaseProcess)
-}
-
-type workerWatcher struct {
- sync.RWMutex
- container Vector
- // used to control Destroy stage (that all workers are in the container)
- numWorkers *uint64
-
- workers []worker.BaseProcess
-
- allocator worker.Allocator
- allocateTimeout time.Duration
- events events.Handler
-}
-
-// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
- ww := &workerWatcher{
- container: channel.NewVector(numWorkers),
-
- // pass a ptr to the number of workers to avoid blocking in the TTL loop
- numWorkers: utils.Uint64(numWorkers),
- allocateTimeout: allocateTimeout,
- workers: make([]worker.BaseProcess, 0, numWorkers),
-
- allocator: allocator,
- events: events,
- }
-
- return ww
-}
-
-func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
- for i := 0; i < len(workers); i++ {
- ww.container.Push(workers[i])
- // add worker to watch slice
- ww.workers = append(ww.workers, workers[i])
-
- go func(swc worker.BaseProcess) {
- ww.wait(swc)
- }(workers[i])
- }
- return nil
-}
-
-// Take is not a thread safe operation
-func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
- const op = errors.Op("worker_watcher_get_free_worker")
-
- // thread safe operation
- w, err := ww.container.Pop(ctx)
- if err != nil {
- if errors.Is(errors.WatcherStopped, err) {
- return nil, errors.E(op, errors.WatcherStopped)
- }
-
- return nil, errors.E(op, err)
- }
-
- // fast path, worker not nil and in the ReadyState
- if w.State().Value() == worker.StateReady {
- return w, nil
- }
-
- // =========================================================
- // SLOW PATH
- _ = w.Kill()
- // no free workers in the container or worker not in the ReadyState (TTL-ed)
- // try to continuously get free one
- for {
- w, err = ww.container.Pop(ctx)
- if err != nil {
- if errors.Is(errors.WatcherStopped, err) {
- return nil, errors.E(op, errors.WatcherStopped)
- }
- return nil, errors.E(op, err)
- }
-
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- switch w.State().Value() {
- // return only workers in the Ready state
- // check first
- case worker.StateReady:
- return w, nil
- case worker.StateWorking: // how??
- ww.container.Push(w) // put it back, let worker finish the work
- continue
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateStopping:
- // worker doing no work because it in the container
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // try to get new worker
- continue
- }
- }
-}
-
-func (ww *workerWatcher) Allocate() error {
- const op = errors.Op("worker_watcher_allocate_new")
-
- sw, err := ww.allocator()
- if err != nil {
- // log incident
- ww.events.Push(
- events.WorkerEvent{
- Event: events.EventWorkerError,
- Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
- })
-
- // if no timeout, return error immediately
- if ww.allocateTimeout == 0 {
- return errors.E(op, errors.WorkerAllocate, err)
- }
-
- // every half of a second
- allocateFreq := time.NewTicker(time.Millisecond * 500)
-
- tt := time.After(ww.allocateTimeout)
- for {
- select {
- case <-tt:
- // reduce number of workers
- atomic.AddUint64(ww.numWorkers, ^uint64(0))
- allocateFreq.Stop()
- // timeout exceed, worker can't be allocated
- return errors.E(op, errors.WorkerAllocate, err)
-
- case <-allocateFreq.C:
- sw, err = ww.allocator()
- if err != nil {
- // log incident
- ww.events.Push(
- events.WorkerEvent{
- Event: events.EventWorkerError,
- Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
- })
- continue
- }
-
- // reallocated
- allocateFreq.Stop()
- goto done
- }
- }
- }
-
-done:
- // add worker to Wait
- ww.addToWatch(sw)
-
- ww.Lock()
- // add new worker to the workers slice (to get information about workers in parallel)
- ww.workers = append(ww.workers, sw)
- ww.Unlock()
-
- // push the worker to the container
- ww.Release(sw)
- return nil
-}
-
-// Remove worker
-func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
- ww.Lock()
- defer ww.Unlock()
-
- // set remove state
- pid := wb.Pid()
-
- // worker will be removed on the Get operation
- for i := 0; i < len(ww.workers); i++ {
- if ww.workers[i].Pid() == pid {
- ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
- // kill worker, just to be sure it's dead
- _ = wb.Kill()
- return
- }
- }
-}
-
-// Release O(1) operation
-func (ww *workerWatcher) Release(w worker.BaseProcess) {
- switch w.State().Value() {
- case worker.StateReady:
- ww.container.Push(w)
- default:
- _ = w.Kill()
- }
-}
-
-// Destroy all underlying container (but let them complete the task)
-func (ww *workerWatcher) Destroy(_ context.Context) {
- // destroy container, we don't use ww mutex here, since we should be able to push worker
- ww.Lock()
- // do not release new workers
- ww.container.Destroy()
- ww.Unlock()
-
- tt := time.NewTicker(time.Millisecond * 100)
- defer tt.Stop()
- for { //nolint:gosimple
- select {
- case <-tt.C:
- ww.Lock()
- // that might be one of the workers is working
- if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
- ww.Unlock()
- continue
- }
- // All container at this moment are in the container
- // Pop operation is blocked, push can't be done, since it's not possible to pop
- for i := 0; i < len(ww.workers); i++ {
- ww.workers[i].State().Set(worker.StateDestroyed)
- // kill the worker
- _ = ww.workers[i].Kill()
- }
- return
- }
- }
-}
-
-// List - this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) List() []worker.BaseProcess {
- ww.RLock()
- defer ww.RUnlock()
-
- if len(ww.workers) == 0 {
- return nil
- }
-
- base := make([]worker.BaseProcess, 0, len(ww.workers))
- for i := 0; i < len(ww.workers); i++ {
- base = append(base, ww.workers[i])
- }
-
- return base
-}
-
-func (ww *workerWatcher) wait(w worker.BaseProcess) {
- const op = errors.Op("worker_watcher_wait")
- err := w.Wait()
- if err != nil {
- ww.events.Push(events.WorkerEvent{
- Event: events.EventWorkerError,
- Worker: w,
- Payload: errors.E(op, err),
- })
- }
-
- // remove worker
- ww.Remove(w)
-
- if w.State().Value() == worker.StateDestroyed {
- // worker was manually destroyed, no need to replace
- ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- return
- }
-
- // set state as stopped
- w.State().Set(worker.StateStopped)
-
- err = ww.Allocate()
- if err != nil {
- ww.events.Push(events.PoolEvent{
- Event: events.EventPoolError,
- Payload: errors.E(op, err),
- })
-
- // no workers at all, panic
- if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
- panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err)))
- }
- }
-}
-
-func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
- go func() {
- ww.wait(wb)
- }()
-}