diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 21:46:50 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-16 21:46:50 +0300 |
commit | 3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch) | |
tree | e723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /pkg | |
parent | 337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff) | |
parent | 823d831b57b75f70c7c3bbbee355f2016633bb3b (diff) |
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'pkg')
50 files changed, 0 insertions, 7890 deletions
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go deleted file mode 100644 index dab9346c..00000000 --- a/pkg/bst/bst.go +++ /dev/null @@ -1,152 +0,0 @@ -package bst - -// BST ... -type BST struct { - // registered topic, not unique - topic string - // associated connections with the topic - uuids map[string]struct{} - - // left and right subtrees - left *BST - right *BST -} - -func NewBST() Storage { - return &BST{ - uuids: make(map[string]struct{}, 10), - } -} - -// Insert uuid to the topic -func (b *BST) Insert(uuid string, topic string) { - curr := b - - for { - if topic == curr.topic { - curr.uuids[uuid] = struct{}{} - return - } - // if topic less than curr topic - if topic < curr.topic { - if curr.left == nil { - curr.left = &BST{ - topic: topic, - uuids: map[string]struct{}{uuid: {}}, - } - return - } - // move forward - curr = curr.left - } else { - if curr.right == nil { - curr.right = &BST{ - topic: topic, - uuids: map[string]struct{}{uuid: {}}, - } - return - } - - curr = curr.right - } - } -} - -func (b *BST) Contains(topic string) bool { - curr := b - for curr != nil { - switch { - case topic < curr.topic: - curr = curr.left - case topic > curr.topic: - curr = curr.right - case topic == curr.topic: - return true - } - } - - return false -} - -func (b *BST) Get(topic string) map[string]struct{} { - curr := b - for curr != nil { - switch { - case topic < curr.topic: - curr = curr.left - case topic > curr.topic: - curr = curr.right - case topic == curr.topic: - return curr.uuids - } - } - - return nil -} - -func (b *BST) Remove(uuid string, topic string) { - b.removeHelper(uuid, topic, nil) -} - -func (b *BST) removeHelper(uuid string, topic string, parent *BST) { - curr := b - for curr != nil { - if topic < curr.topic { //nolint:gocritic - parent = curr - curr = curr.left - } else if topic > curr.topic { - parent = curr - curr = curr.right - } else { - // if more than 1 topic - remove only topic, do not remove the whole vertex - if len(curr.uuids) > 1 { - if _, ok := curr.uuids[uuid]; ok { - delete(curr.uuids, uuid) - return - } - } - - if curr.left != nil && curr.right != nil { //nolint:gocritic - curr.topic, curr.uuids = curr.right.traverseForMinString() - curr.right.removeHelper(curr.topic, uuid, curr) - } else if parent == nil { - if curr.left != nil { //nolint:gocritic - curr.topic = curr.left.topic - curr.uuids = curr.left.uuids - - curr.right = curr.left.right - curr.left = curr.left.left - } else if curr.right != nil { - curr.topic = curr.right.topic - curr.uuids = curr.right.uuids - - curr.left = curr.right.left - curr.right = curr.right.right - } else { //nolint:staticcheck - // single node tree - } - } else if parent.left == curr { - if curr.left != nil { - parent.left = curr.left - } else { - parent.left = curr.right - } - } else if parent.right == curr { - if curr.left != nil { - parent.right = curr.left - } else { - parent.right = curr.right - } - } - break - } - } -} - -//go:inline -func (b *BST) traverseForMinString() (string, map[string]struct{}) { - if b.left == nil { - return b.topic, b.uuids - } - return b.left.traverseForMinString() -} diff --git a/pkg/bst/bst_test.go b/pkg/bst/bst_test.go deleted file mode 100644 index 2271508c..00000000 --- a/pkg/bst/bst_test.go +++ /dev/null @@ -1,325 +0,0 @@ -package bst - -import ( - "math/rand" - "testing" - "time" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" -) - -const predifined = "chat-1-2" - -func TestNewBST(t *testing.T) { - // create a new bst - g := NewBST() - - for i := 0; i < 100; i++ { - g.Insert(uuid.NewString(), "comments") - } - - for i := 0; i < 100; i++ { - g.Insert(uuid.NewString(), "comments2") - } - - for i := 0; i < 100; i++ { - g.Insert(uuid.NewString(), "comments3") - } - - // should be 100 - exist := g.Get("comments") - assert.Len(t, exist, 100) - - // should be 100 - exist2 := g.Get("comments2") - assert.Len(t, exist2, 100) - - // should be 100 - exist3 := g.Get("comments3") - assert.Len(t, exist3, 100) -} - -func BenchmarkGraph(b *testing.B) { - g := NewBST() - - for i := 0; i < 1000; i++ { - uid := uuid.New().String() - g.Insert(uuid.NewString(), uid) - } - - g.Insert(uuid.NewString(), predifined) - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - exist := g.Get(predifined) - _ = exist - } -} - -func BenchmarkBigSearch(b *testing.B) { - g1 := NewBST() - g2 := NewBST() - g3 := NewBST() - - predefinedSlice := make([]string, 0, 1000) - for i := 0; i < 1000; i++ { - predefinedSlice = append(predefinedSlice, uuid.NewString()) - } - if predefinedSlice == nil { - b.FailNow() - } - - for i := 0; i < 1000; i++ { - g1.Insert(uuid.NewString(), uuid.NewString()) - } - for i := 0; i < 1000; i++ { - g2.Insert(uuid.NewString(), uuid.NewString()) - } - for i := 0; i < 1000; i++ { - g3.Insert(uuid.NewString(), uuid.NewString()) - } - - for i := 0; i < 333; i++ { - g1.Insert(uuid.NewString(), predefinedSlice[i]) - } - - for i := 0; i < 333; i++ { - g2.Insert(uuid.NewString(), predefinedSlice[333+i]) - } - - for i := 0; i < 333; i++ { - g3.Insert(uuid.NewString(), predefinedSlice[666+i]) - } - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - for i := 0; i < 333; i++ { - exist := g1.Get(predefinedSlice[i]) - _ = exist - } - } - for i := 0; i < b.N; i++ { - for i := 0; i < 333; i++ { - exist := g2.Get(predefinedSlice[333+i]) - _ = exist - } - } - for i := 0; i < b.N; i++ { - for i := 0; i < 333; i++ { - exist := g3.Get(predefinedSlice[666+i]) - _ = exist - } - } -} - -func BenchmarkBigSearchWithRemoves(b *testing.B) { - g1 := NewBST() - g2 := NewBST() - g3 := NewBST() - - predefinedSlice := make([]string, 0, 1000) - for i := 0; i < 1000; i++ { - predefinedSlice = append(predefinedSlice, uuid.NewString()) - } - if predefinedSlice == nil { - b.FailNow() - } - - for i := 0; i < 1000; i++ { - g1.Insert(uuid.NewString(), uuid.NewString()) - } - for i := 0; i < 1000; i++ { - g2.Insert(uuid.NewString(), uuid.NewString()) - } - for i := 0; i < 1000; i++ { - g3.Insert(uuid.NewString(), uuid.NewString()) - } - - for i := 0; i < 333; i++ { - g1.Insert(uuid.NewString(), predefinedSlice[i]) - } - - for i := 0; i < 333; i++ { - g2.Insert(uuid.NewString(), predefinedSlice[333+i]) - } - - for i := 0; i < 333; i++ { - g3.Insert(uuid.NewString(), predefinedSlice[666+i]) - } - - go func() { - tt := time.NewTicker(time.Millisecond) - for { - select { - case <-tt.C: - num := rand.Intn(333) //nolint:gosec - values := g1.Get(predefinedSlice[num]) - for k := range values { - g1.Remove(k, predefinedSlice[num]) - } - } - } - }() - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - for i := 0; i < 333; i++ { - exist := g1.Get(predefinedSlice[i]) - _ = exist - } - } - for i := 0; i < b.N; i++ { - for i := 0; i < 333; i++ { - exist := g2.Get(predefinedSlice[333+i]) - _ = exist - } - } - for i := 0; i < b.N; i++ { - for i := 0; i < 333; i++ { - exist := g3.Get(predefinedSlice[666+i]) - _ = exist - } - } -} - -func TestGraph(t *testing.T) { - g := NewBST() - - for i := 0; i < 1000; i++ { - uid := uuid.New().String() - g.Insert(uuid.NewString(), uid) - } - - g.Insert(uuid.NewString(), predifined) - - exist := g.Get(predifined) - assert.NotNil(t, exist) - assert.Len(t, exist, 1) -} - -func TestTreeConcurrentContains(t *testing.T) { - g := NewBST() - - key1 := uuid.NewString() - key2 := uuid.NewString() - key3 := uuid.NewString() - key4 := uuid.NewString() - key5 := uuid.NewString() - - g.Insert(key1, predifined) - g.Insert(key2, predifined) - g.Insert(key3, predifined) - g.Insert(key4, predifined) - g.Insert(key5, predifined) - - for i := 0; i < 100; i++ { - go func() { - _ = g.Get(predifined) - }() - - go func() { - _ = g.Get(predifined) - }() - - go func() { - _ = g.Get(predifined) - }() - - go func() { - _ = g.Get(predifined) - }() - } - - time.Sleep(time.Second * 2) - - exist := g.Get(predifined) - assert.NotNil(t, exist) - assert.Len(t, exist, 5) -} - -func TestGraphRemove(t *testing.T) { - g := NewBST() - - key1 := uuid.NewString() - key2 := uuid.NewString() - key3 := uuid.NewString() - key4 := uuid.NewString() - key5 := uuid.NewString() - - g.Insert(key1, predifined) - g.Insert(key2, predifined) - g.Insert(key3, predifined) - g.Insert(key4, predifined) - g.Insert(key5, predifined) - - exist := g.Get(predifined) - assert.NotNil(t, exist) - assert.Len(t, exist, 5) - - g.Remove(key1, predifined) - - exist = g.Get(predifined) - assert.NotNil(t, exist) - assert.Len(t, exist, 4) -} - -func TestBigSearch(t *testing.T) { - g1 := NewBST() - g2 := NewBST() - g3 := NewBST() - - predefinedSlice := make([]string, 0, 1000) - for i := 0; i < 1000; i++ { - predefinedSlice = append(predefinedSlice, uuid.NewString()) - } - if predefinedSlice == nil { - t.FailNow() - } - - for i := 0; i < 1000; i++ { - g1.Insert(uuid.NewString(), uuid.NewString()) - } - for i := 0; i < 1000; i++ { - g2.Insert(uuid.NewString(), uuid.NewString()) - } - for i := 0; i < 1000; i++ { - g3.Insert(uuid.NewString(), uuid.NewString()) - } - - for i := 0; i < 333; i++ { - g1.Insert(uuid.NewString(), predefinedSlice[i]) - } - - for i := 0; i < 333; i++ { - g2.Insert(uuid.NewString(), predefinedSlice[333+i]) - } - - for i := 0; i < 333; i++ { - g3.Insert(uuid.NewString(), predefinedSlice[666+i]) - } - - for i := 0; i < 333; i++ { - exist := g1.Get(predefinedSlice[i]) - assert.NotNil(t, exist) - assert.Len(t, exist, 1) - } - - for i := 0; i < 333; i++ { - exist := g2.Get(predefinedSlice[333+i]) - assert.NotNil(t, exist) - assert.Len(t, exist, 1) - } - - for i := 0; i < 333; i++ { - exist := g3.Get(predefinedSlice[666+i]) - assert.NotNil(t, exist) - assert.Len(t, exist, 1) - } -} diff --git a/pkg/bst/doc.go b/pkg/bst/doc.go deleted file mode 100644 index abb7e6e9..00000000 --- a/pkg/bst/doc.go +++ /dev/null @@ -1,7 +0,0 @@ -package bst - -/* -Binary search tree for the pubsub - -The vertex may have one or multiply topics associated with the single websocket connection UUID -*/ diff --git a/pkg/bst/interface.go b/pkg/bst/interface.go deleted file mode 100644 index 95b03e11..00000000 --- a/pkg/bst/interface.go +++ /dev/null @@ -1,13 +0,0 @@ -package bst - -// Storage is general in-memory BST storage implementation -type Storage interface { - // Insert inserts to a vertex with topic ident connection uuid - Insert(uuid string, topic string) - // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed - Remove(uuid, topic string) - // Get will return all connections associated with the topic - Get(topic string) map[string]struct{} - // Contains checks if the BST contains a topic - Contains(topic string) bool -} diff --git a/pkg/doc/README.md b/pkg/doc/README.md deleted file mode 100644 index 709df603..00000000 --- a/pkg/doc/README.md +++ /dev/null @@ -1,21 +0,0 @@ -This is the drawio diagrams showing basic workflows inside RoadRunner 2.0 - -Simple HTTP workflow description: -![alt text](pool_workflow.svg) - -1. Allocate sync workers. When plugin starts (which use workers pool), then it allocates required number of processes - via `cmd.exec` command. - -2. When user send HTTP request to the RR2, HTTP plugin receive it and transfer to the workers pool `Exec/ExecWithContex` -method. And workers pool ask Worker watcher to get free worker. - -3. Workers watcher uses stack data structure under the hood and making POP operation to get first free worker. If there are -no workers in the `stack`, watcher waits for the specified via config (`allocate_timeout`) time. - -4. Stack returns free worker to the watcher. -5. Watcher returns that worker to the `pool`. -6. Pool invoke `Exec/ExecWithTimeout` method on the golang worker with provided request payload. -7. Golang worker send that request to the PHP worker via various set of transports (`pkg/transport` package). -8. PHP worker send back response to the golang worker (or error via stderr). -9. Golang worker return response payload to the pool. -10. Pool process this response and return answer to the user. diff --git a/pkg/doc/pool_workflow.drawio b/pkg/doc/pool_workflow.drawio deleted file mode 100644 index 3f74d0fc..00000000 --- a/pkg/doc/pool_workflow.drawio +++ /dev/null @@ -1 +0,0 @@ -<mxfile host="Electron" modified="2021-01-24T16:29:37.978Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="8v25I0qLU_vMqqVrx7cN" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">7Vxbd6M4Ev41OWfnoX0Qkrg8Jul09+zOTGc6vSfTjwRkm22MvCB37Pn1IwGydcExsTGO03HOiUEgLqqqr6o+lXwBr2fLj0U0n/5OE5JduE6yvIDvL1wXAMflX6Jl1bQ4jlO3TIo0qduUhrv0byJPbFoXaULKpq1uYpRmLJ3rjTHNcxIzrS0qCvqonzamWaI1zKMJsRru4iizW+/ThE3li3nh5sAnkk6m8tYeRvWRWSTPbl6lnEYJfVSa4M0FvC4oZfXWbHlNMjF8+sB82HJ0/WQFyVmXDn/hCPvfPk++/Ocm+HyPr5Z/TsfvvOYyP6Js0bzyhetl/IJXU37Im4itu3Q25wPiOv8tScG/CvL/BSmZPJHfcnNu865sJUewoIs8IeIZHH74cZoycjePYnH0kSuNuBGbZXwP8M0xzdmHaJZmQl+u6aJIqxv+Qfi4XUVZOsl5e0bG4k4/SMFSLqnLppnR+fr+6qjIN+Snk6XS1IzSR0JnhBUrfkpzFGNcd2mUFrmNFj+qGhA2GjBVpB9ItY0atZusL74RDN9oZPMMOYHQGliScE1tdmnBpnRC8yi72bRe6UO/Oec3KoaqGvD/EcZWjdlFC0Z1cZBlyv4S3Ucubna/KYfeL5tLVzsruZPzF1Z7if1v6sFNv2pPdtSE/2/CHooozUv+0r/TnD4l2pIrSkyeGr0GOaJiQtgT50lTFUP7pKYUJItY+kMHiTahN11vacqfea1hoedpGgYc2SCvUT9q083QnfVz7K9Orm32H8XrcNgS/8pVHgt9p8X3yv7+dfv5ln/ROSn4a9P8l1Zl/C164PCvKZA02ZiLjF/JNtpZmiS1rpIy/Tt6qK4nlGEuXr0aDHx1gd/vpR5PWpKFB2sn0jyFBtNtOPHOGTke0iUZ1HsH6sc7AJ2RDkL8ZsB1lA80VYaOxyU5jrZgS1vgmwbUQnE939e9RT8agAz5hyMHDSZv/7TOxledjdPV2fiaswEv3tnAvn3NQQGGRC7FxvEZ27hU4cNtnJs4QlDHYtiPjUPDxvEogEPZOHTsCEDcae3030RfwTvwdNm7vYgemaJ/B9DIw0MJ347+7iupi8G+j1g8bVGA5+VxaZZd04wWVV84HhMvjnl7yQr6nShHEj98cA7C4+55XQgNlwqbCErN63BjF2pa5x0tqwPWMA/qaPdJ6rSMbrT2uYO7WbejmwV9u9mq62VRRCvlhAbNthp8YOpeE71vtKe+Yr9m7lp2fpllNI6YleSVZwz4YBsS7AP4GHlHiecD3Y0gA1KOGMxDSwnaUecnFDaAGGhi6SmwQ7pvxyPgDyZuS9q/5ilLuSS5N+jTpycRCcatPt2LA/IwHsan+4GOq4HT4tOdFp9uGmB/Pr09bXpjajt5dXgqr36YzD3L6lQ69Y572vtzT6ukXvfEmjm6P/T7AV7fzKu8keerrKk7GA4jSyXcN+nX0oc8/NWl1I/0PYsz9Qfzu/CksA8UzFf40+flcv7pkrmusA/dF4X70MZ974yNHPZm5Dy0xrgnqwZoBAejRVE7SJ/V1Acf/B2GXO3dkiLlwyV076VYN+rbuvfiaoADA82PyKTiQ9cOPtrVwceHdcAONtT/CPwRaqEOZDHTOeKbNO4+8M1xJROxL74NAGZ2DHpH8qR65nJO81IkJxyL+L+p2FyUZ52h9CfeutjCmKHphwj0Xf2qbjAabNrHNuebJYn5pcTXfcqm11wQZMksHTh/nsic+2nlidrmfo7GE21JB88q0jhdkYWs1DuT6R/gID1C4A27PP6OHsdx+a6d0aAzdgnSyPrw+BAiA7t7mi1wDdoCgAHrQNzQEvivAgKjir4cF0R8recIryt0LETbii7EAxb2rMJPqh6uo08cyqmYA9UDADxCoakgCKmsJhhMW+yqoTsWxd8rR8r96Pbp5INCCEyCBLWFEIH7IOpgTxJCACc4df0ItI33tOsENBrS7UhDrqMItdtpaEi3Y1TRtXZzky46h9JhA6QHrqVNRn7wNZ0RuniN+QFwPd25tyYIshZwkAQB2umaNfDlNJqLzXFGlpdiuVtlLEmz+T7OorJMY8NYN9F8r5O++5SV7W+qOOzZVBU5t2G4bDvUmzuSQZPMHzCWcdRvbq38aSMdDQ7RGXYNEbTZJXN94BZtXcyyy5hRNQas4sVbWqZieRE/5YEyRmctQWK9ym/bksEjef7QWBAo1/4pOgMH9ft2sf6a2GuGXuX1tpR0V1TP0UJ0ESWUdcAwkJgAMDFcptAqhoeBLSizPKs3QSG70voLiUldG7IhYV+9YHyj2iN0bMHI9SbDCMZe0maJgV8mnZdEl4TEsJKjK+O5DyN2SFMnZW0hzXjcHBlg1D3PoDzlhMUO1ILHQi1kc0v2mO9MVhRZmEawdhpKCETnJK9bmuwleFIuL6/OXWrq7sXLHQOdQycwjNXxARo27EC+pUW32WKS5rxtXeparVJ+9bjqOUYsGUIbVuUvPAwCq7Kc+q0K6qj0g+QDX0gZFLYz1fYVxOfBCuNthrhXpSuSBnho5ojaS1kNtvn4NBE+7erwM6cYUWfeond/fpiN2ylncM423uP6cBA6ejlzP/OCAIhCaeWjzwfLepQByojsWYYtqxkOWiOMxV8rD1x9OsheHFf61Z+eUliMXtpCI2zzbzVdz990ldEoefXxL0BGjUQb4SOXGgwT/9q0whfy46fje5Br1MMGLUTcoHwPtjHs09ev4ket5k3yeBCU7QImA+qCmLT/HMJDwHHmyYmRZ7DW2FhkIxfDaIjVwoZCdCQheHb1gDXuL2lGa6sQdgZvuGuCdqJZJ+yH+jW6zzo54QgHys+Q6TpmTnkcmQyShL9KBlGadUBY1SRzmpNngW4bHOiAsUu1LMtfc8Z94K9jBCt+S6UKalE391iximczBB9pFuUT3rYllvzJRAbd3SJrjS+PJzM7lrn9JDxm5+nEVy0wLHmdJwQWoF7kxXc3PxNcQ+fm55bhzT8=</diagram></mxfile>
\ No newline at end of file diff --git a/pkg/doc/pool_workflow.svg b/pkg/doc/pool_workflow.svg deleted file mode 100644 index 1e043eaa..00000000 --- a/pkg/doc/pool_workflow.svg +++ /dev/null @@ -1,3 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> -<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="1200px" height="811px" viewBox="-0.5 -0.5 1200 811" content="<mxfile host="Electron" modified="2021-01-24T14:08:35.229Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="XUF1gTxKr8mJfXGLcVd8" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">7Vxbd6M4Ev41OWfnoX0Qkrg8Jul09+zOTGc6vSfTjwRkm22MvCB37Pn1IwGydcExsTGO03HOiUEgLqqqr6o+lXwBr2fLj0U0n/5OE5JduE6yvIDvL1wXAMflX6Jl1bQ4jlO3TIo0qduUhrv0byJPbFoXaULKpq1uYpRmLJ3rjTHNcxIzrS0qCvqonzamWaI1zKMJsRru4iizW+/ThE3li3nh5sAnkk6m8tYeRvWRWSTPbl6lnEYJfVSa4M0FvC4oZfXWbHlNMjF8+sB82HJ0/WQFyVmXDn/hCPvfPk++/Ocm+HyPr5Z/TsfvvOYyP6Js0bzyhetl/IJXU37Im4itu3Q25wPiOv8tScG/CvL/BSmZPJHfcnNu865sJUewoIs8IeIZHH74cZoycjePYnH0kSuNuBGbZXwP8M0xzdmHaJZmQl+u6aJIqxv+Qfi4XUVZOsl5e0bG4k4/SMFSLqnLppnR+fr+6qjIN+Snk6XS1IzSR0JnhBUrfkpzFGNcd2mUFrmNFj+qGhA2GjBVpB9ItY0atZusL74RDN9oZPMMOYHQGliScE1tdmnBpnRC8yi72bRe6UO/Oec3KoaqGvD/EcZWjdlFC0Z1cZBlyv4S3Ucubna/KYfeL5tLVzsruZPzF1Z7if1v6sFNv2pPdtSE/2/CHooozUv+0r/TnD4l2pIrSkyeGr0GOaJiQtgT50lTFUP7pKYUJItY+kMHiTahN11vacqfea1hoedpGgYc2SCvUT9q083QnfVz7K9Orm32H8XrcNgS/8pVHgt9p8X3yv7+dfv5ln/ROSn4a9P8l1Zl/C164PCvKZA02ZiLjF/JNtpZmiS1rpIy/Tt6qK4nlGEuXr0aDHx1gd/vpR5PWpKFB2sn0jyFBtNtOPHOGTke0iUZ1HsH6sc7AJ2RDkL8ZsB1lA80VYaOxyU5jrZgS1vgmwbUQnE939e9RT8agAz5hyMHDSZv/7TOxledjdPV2fiaswEv3tnAvn3NQQGGRC7FxvEZ27hU4cNtnJs4QlDHYtiPjUPDxvEogEPZOHTsCEDcae3030RfwTvwdNm7vYgemaJ/B9DIw0MJ347+7iupi8G+j1g8bVGA5+VxaZZd04wWVV84HhMvjnl7yQr6nShHEj98cA7C4+55XQgNlwqbCErN63BjF2pa5x0tqwPWMA/qaPdJ6rSMbrT2uYO7WbejmwV9u9mq62VRRCvlhAbNthp8YOpeE71vtKe+Yr9m7lp2fpllNI6YleSVZwz4YBsS7AP4GHlHiecD3Y0gA1KOGMxDSwnaUecnFDaAGGhi6SmwQ7pvxyPgDyZuS9q/5ilLuSS5N+jTpycRCcatPt2LA/IwHsan+4GOq4HT4tOdFp9uGmB/Pr09bXpjajt5dXgqr36YzD3L6lQ69Y572vtzT6ukXvfEmjm6P/T7AV7fzKu8keerrKk7GA4jSyXcN+nX0oc8/NWl1I/0PYsz9Qfzu/CksA8UzFf40+flcv7pkrmusA/dF4X70MZ974yNHPZm5Dy0xrgnqwZoBAejRVE7SJ/V1Acf/B2GXO3dkiLlwyV076VYN+rbuvfiaoADA82PyKTiQ9cOPtrVwceHdcAONtT/CPwRaqEOZDHTOeKbNO4+8M1xJROxL74NAGZ2DHpH8qR65nJO81IkJxyL+L+p2FyUZ52h9CfeutjCmKHphwj0Xf2qbjAabNrHNuebJYn5pcTXfcqm11wQZMksHTh/nsic+2nlidrmfo7GE21JB88q0jhdkYWs1DuT6R/gID1C4A27PP6OHsdx+a6d0aAzdgnSyPrw+BAiA7t7mi1wDdoCgAHrQNzQEvivAgKjir4cF0R8recIryt0LETbii7EAxb2rMJPqh6uo08cyqmYA9UDADxCoakgCKmsJhhMW+yqoTsWxd8rR8r96Pbp5INCCEyCBLWFEIH7IOpgTxJCACc4df0ItI33tOsENBrS7UhDrqMItdtpaEi3Y1TRtXZzky46h9JhA6QHrqVNRn7wNZ0RuniN+QFwPd25tyYIshZwkAQB2umaNfDlNJqLzXFGlpdiuVtlLEmz+T7OorJMY8NYN9F8r5O++5SV7W+qOOzZVBU5t2G4bDvUmzuSQZPMHzCWcdRvbq38aSMdDQ7RGXYNEbTZJXN94BZtXcyyy5hRNQas4sVbWqZieRE/5YEyRmctQWK9ym/bksEjef7QWBAo1/4pOgMH9ft2sf6a2GuGXuX1tpR0V1TP0UJ0ESWUdcAwkJgAMDFcptAqhoeBLSizPKs3QSG70voLiUldG7IhYV+9YHyj2iN0bMHI9SbDCMZe0maJgV8mnZdEl4TEsJKjK+O5DyN2SFMnZW0hzXjcHBlg1D3PoDzlhMUO1ILHQi1kc0v2mO9MVhRZmEawdhpKCETnJK9bmuwleFIuL6/OXWrq7sXLHQOdQycwjNXxARo27EC+pUW32WKS5rxtXeparVJ+9bjqOUYsGUIbVuUvPAwCq7Kc+q0K6qj0g+QDX0gZFLYz1fYVxOfBCuNthrhXpSuSBnho5ojaS1kNtvn4NBE+7erwM6cYUWfeond/fpiN2ylncM423uP6cBA6ejlzP/OCAIhCaeWjzwfLepQByojsWYYtqxkOWiOMxV8rD1x9OsheHFf61Z+eUliMXtpCI2zzbzVdz990ldEoefXxL0BGjUQb4SOXGgwT/9q0whfy46fje5Br1MMGLUTcoHwPtjHs09ev4ket5k3yeBCU7QImA+qCmLT/HMJDwHHmyYmRZ7DW2FhkIxfDaIjVwoZCdCQheHb1gDXuL2lGa6sQdgZvuGuCdqJZJ+yH+jW6zzo54QgHys+Q6TpmTnkcmQyShL9KBlGadUBY1SRzmpNngW4bHOiAsUu1LMtfc8Z94K9jBCt+S6UKalE391iximczBB9pFuUT3rYllvzJRAbd3SJrjS+PJzM7lrn9JDxm5+nEVy0wLHmdJwQWoF7kxXc3PxNcQ+fm55bhzT8=</diagram></mxfile>" style="background-color: rgb(255, 255, 255);"><defs/><g><rect x="0" y="0" width="1199" height="810" fill="#ffffff" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe flex-start; justify-content: unsafe flex-start; width: 1197px; height: 1px; padding-top: 7px; margin-left: 2px;"><div style="box-sizing: border-box; font-size: 0; text-align: left; "><div style="display: inline-block; font-size: 12px; font-family: Courier New; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; "><h1>Simple User request</h1></div></div></div></foreignObject><text x="2" y="19" fill="#000000" font-family="Courier New" font-size="12px">Simple User request</text></switch></g><path d="M 417.5 574 L 417.5 657.63" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 417.5 662.88 L 414 655.88 L 417.5 657.63 L 421 655.88 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 616px; margin-left: 296px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Give me sync worker (POP operation)</div></div></div></foreignObject><text x="296" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Give me sync worker (POP operation)</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 617px; margin-left: 418px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">3</div></div></div></foreignObject><text x="418" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">3</text></switch></g><path d="M 492.5 514 L 492.5 430.37" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 492.5 425.12 L 496 432.12 L 492.5 430.37 L 489 432.12 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 493px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">5</div></div></div></foreignObject><text x="493" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">5</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 465px; margin-left: 535px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Get worker</div></div></div></foreignObject><text x="535" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Get worker</text></switch></g><rect x="380" y="514" width="150" height="60" fill="#ffe6cc" stroke="#d79b00" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 544px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Workers Watcher</div></div></div></foreignObject><text x="455" y="548" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Workers Watcher</text></switch></g><path d="M 280 424 L 280 544 L 373.63 544" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 378.88 544 L 371.88 547.5 L 373.63 544 L 371.88 540.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 515px; margin-left: 202px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Allocate sync workers</div></div></div></foreignObject><text x="202" y="518" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Allocate sync workers</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 514px; margin-left: 280px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">1</div></div></div></foreignObject><text x="280" y="518" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">1</text></switch></g><rect x="230" y="384" width="100" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 404px; margin-left: 231px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Initialize</div></div></div></foreignObject><text x="280" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Initialize</text></switch></g><path d="M 417.5 424 L 417.5 507.63" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 417.5 512.88 L 414 505.88 L 417.5 507.63 L 421 505.88 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 352px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Give me SyncWorker</div></div></div></foreignObject><text x="352" y="467" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Give me SyncWorker</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 418px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">2</div></div></div></foreignObject><text x="418" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">2</text></switch></g><path d="M 530 414 L 700.63 414" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 705.88 414 L 698.88 417.5 L 700.63 414 L 698.88 410.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 415px; margin-left: 618px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">6</div></div></div></foreignObject><text x="618" y="418" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">6</text></switch></g><path d="M 492.5 384 L 483 384 L 483 324 L 520 324 L 520 83 L 468.87 83" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 463.62 83 L 470.62 79.5 L 468.87 83 L 470.62 86.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 224px; margin-left: 521px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">10</div></div></div></foreignObject><text x="521" y="227" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">10</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 225px; margin-left: 597px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Send response to the user</div></div></div></foreignObject><text x="597" y="228" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Send response to the user</text></switch></g><rect x="380" y="384" width="150" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 404px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Exec/ExecWithContext</div></div></div></foreignObject><text x="455" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec/ExecWithContext</text></switch></g><path d="M 492.5 664 L 492.5 624 L 492.5 580.37" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 492.5 575.12 L 496 582.12 L 492.5 580.37 L 489 582.12 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 616px; margin-left: 494px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">4</div></div></div></foreignObject><text x="494" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">4</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 617px; margin-left: 610px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">I have free workers, here you are</div></div></div></foreignObject><text x="610" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">I have free workers, here you are</text></switch></g><rect x="380" y="664" width="150" height="60" fill="#d5e8d4" stroke="#82b366" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 694px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Stack with workers</div></div></div></foreignObject><text x="455" y="698" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Stack with workers</text></switch></g><path d="M 707 394 L 536.37 394" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 531.12 394 L 538.12 390.5 L 536.37 394 L 538.12 397.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 394px; margin-left: 618px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">9</div></div></div></foreignObject><text x="618" y="397" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">9</text></switch></g><rect x="707" y="384" width="163" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 161px; height: 1px; padding-top: 404px; margin-left: 708px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Exec/ExecWithTimeout</div></div></div></foreignObject><text x="789" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec/ExecWithTimeout</text></switch></g><path d="M 450 289.5 L 460 289.5 L 460 364.5 L 470.5 364.5 L 455 383.5 L 439.5 364.5 L 450 364.5 Z" fill="none" stroke="#000000" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="all"/><ellipse cx="455" cy="84.5" rx="7.5" ry="7.5" fill="#ffffff" stroke="#000000" pointer-events="all"/><path d="M 455 92 L 455 117 M 455 97 L 440 97 M 455 97 L 470 97 M 455 117 L 440 137 M 455 117 L 470 137" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe flex-start; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 144px; margin-left: 455px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">User request</div></div></div></foreignObject><text x="455" y="156" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">User...</text></switch></g><rect x="607" y="426" width="198" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 435px; margin-left: 706px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Send request to the worker</div></div></div></foreignObject><text x="706" y="438" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Send request to the worker</text></switch></g><rect x="618" y="368" width="125" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 377px; margin-left: 681px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Receive response</div></div></div></foreignObject><text x="681" y="380" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Receive response</text></switch></g><ellipse cx="125" cy="404" rx="11" ry="11" fill="#000000" stroke="#ff0000" pointer-events="all"/><path d="M 140 404 L 227.76 404" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 219.88 408.5 L 228.88 404 L 219.88 399.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="all"/><rect x="45" y="371" width="161" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 380px; margin-left: 126px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Plugin Initialization</div></div></div></foreignObject><text x="126" y="383" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Plugin Initialization</text></switch></g><path d="M 870 414 L 983.63 414" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 988.88 414 L 981.88 417.5 L 983.63 414 L 981.88 410.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 413px; margin-left: 930px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">7</div></div></div></foreignObject><text x="930" y="416" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">7</text></switch></g><path d="M 990 394 L 876.37 394" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 871.12 394 L 878.12 390.5 L 876.37 394 L 878.12 397.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 394px; margin-left: 931px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">8</div></div></div></foreignObject><text x="931" y="397" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">8</text></switch></g><rect x="990" y="384" width="100" height="40" fill="#f5f5f5" stroke="#666666" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 404px; margin-left: 991px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #333333; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Worker</div></div></div></foreignObject><text x="1040" y="408" fill="#333333" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Worker</text></switch></g><rect x="893" y="426" width="96" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 435px; margin-left: 941px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Exec payload</div></div></div></foreignObject><text x="941" y="438" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec payload</text></switch></g><rect x="873" y="366" width="125" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 375px; margin-left: 936px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Reveive response</div></div></div></foreignObject><text x="936" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Reveive response</text></switch></g><rect x="401" y="255" width="108" height="34" fill="#f8cecc" stroke="#b85450" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 106px; height: 1px; padding-top: 272px; margin-left: 402px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">HTTP plugin</div></div></div></foreignObject><text x="455" y="276" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">HTTP plugin</text></switch></g><path d="M 450 157.5 L 460 157.5 L 460 235.5 L 470.5 235.5 L 455 254.5 L 439.5 235.5 L 450 235.5 Z" fill="none" stroke="#000000" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="all"/><rect x="490" y="364" width="40" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 38px; height: 1px; padding-top: 374px; margin-left: 491px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Pool</div></div></div></foreignObject><text x="510" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Pool</text></switch></g><rect x="770" y="364" width="100" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 374px; margin-left: 771px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Golang Worker</div></div></div></foreignObject><text x="820" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Golang Worker</text></switch></g><rect x="1006" y="364" width="84" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 82px; height: 1px; padding-top: 374px; margin-left: 1007px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">PHP worker</div></div></div></foreignObject><text x="1048" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">PHP worker</text></switch></g></g><switch><g requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"/><a transform="translate(0,-5)" xlink:href="https://www.diagrams.net/doc/faq/svg-export-text-problems" target="_blank"><text text-anchor="middle" font-size="10px" x="50%" y="100%">Viewer does not support full SVG 1.1</text></a></switch></svg>
\ No newline at end of file diff --git a/pkg/events/general.go b/pkg/events/general.go deleted file mode 100755 index 5cf13e10..00000000 --- a/pkg/events/general.go +++ /dev/null @@ -1,41 +0,0 @@ -package events - -import ( - "sync" -) - -const UnknownEventType string = "Unknown event type" - -// HandlerImpl helps to broadcast events to multiple listeners. -type HandlerImpl struct { - listeners []Listener - sync.RWMutex // all receivers should be pointers -} - -func NewEventsHandler() Handler { - return &HandlerImpl{listeners: make([]Listener, 0, 2)} -} - -// NumListeners returns number of event listeners. -func (eb *HandlerImpl) NumListeners() int { - eb.Lock() - defer eb.Unlock() - return len(eb.listeners) -} - -// AddListener registers new event listener. -func (eb *HandlerImpl) AddListener(listener Listener) { - eb.Lock() - defer eb.Unlock() - eb.listeners = append(eb.listeners, listener) -} - -// Push broadcast events across all event listeners. -func (eb *HandlerImpl) Push(e interface{}) { - // ReadLock here because we are not changing listeners - eb.RLock() - defer eb.RUnlock() - for k := range eb.listeners { - eb.listeners[k](e) - } -} diff --git a/pkg/events/grpc_event.go b/pkg/events/grpc_event.go deleted file mode 100644 index 31ff4957..00000000 --- a/pkg/events/grpc_event.go +++ /dev/null @@ -1,39 +0,0 @@ -package events - -import ( - "time" - - "google.golang.org/grpc" -) - -const ( - // EventUnaryCallOk represents success unary call response - EventUnaryCallOk G = iota + 13000 - - // EventUnaryCallErr raised when unary call ended with error - EventUnaryCallErr -) - -type G int64 - -func (ev G) String() string { - switch ev { - case EventUnaryCallOk: - return "EventUnaryCallOk" - case EventUnaryCallErr: - return "EventUnaryCallErr" - } - return UnknownEventType -} - -// JobEvent represent job event. -type GRPCEvent struct { - Event G - // Info contains unary call info. - Info *grpc.UnaryServerInfo - // Error associated with event. - Error error - // event timings - Start time.Time - Elapsed time.Duration -} diff --git a/pkg/events/interface.go b/pkg/events/interface.go deleted file mode 100644 index 7d57e4d0..00000000 --- a/pkg/events/interface.go +++ /dev/null @@ -1,14 +0,0 @@ -package events - -// Handler interface -type Handler interface { - // NumListeners return number of active listeners - NumListeners() int - // AddListener adds lister to the publisher - AddListener(listener Listener) - // Push pushes event to the listeners - Push(e interface{}) -} - -// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service. -type Listener func(event interface{}) diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go deleted file mode 100644 index f65ede67..00000000 --- a/pkg/events/jobs_events.go +++ /dev/null @@ -1,81 +0,0 @@ -package events - -import ( - "time" -) - -const ( - // EventPushOK thrown when new job has been added. JobEvent is passed as context. - EventPushOK J = iota + 12000 - - // EventPushError caused when job can not be registered. - EventPushError - - // EventJobStart thrown when new job received. - EventJobStart - - // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. - EventJobOK - - // EventJobError thrown on all job related errors. See JobError as context. - EventJobError - - // EventPipeActive when pipeline has started. - EventPipeActive - - // EventPipeStopped when pipeline has been stopped. - EventPipeStopped - - // EventPipePaused when pipeline has been paused. - EventPipePaused - - // EventPipeError when pipeline specific error happen. - EventPipeError - - // EventDriverReady thrown when broken is ready to accept/serve tasks. - EventDriverReady -) - -type J int64 - -func (ev J) String() string { - switch ev { - case EventPushOK: - return "EventPushOK" - case EventPushError: - return "EventPushError" - case EventJobStart: - return "EventJobStart" - case EventJobOK: - return "EventJobOK" - case EventJobError: - return "EventJobError" - case EventPipeActive: - return "EventPipeActive" - case EventPipeStopped: - return "EventPipeStopped" - case EventPipeError: - return "EventPipeError" - case EventDriverReady: - return "EventDriverReady" - case EventPipePaused: - return "EventPipePaused" - } - return UnknownEventType -} - -// JobEvent represent job event. -type JobEvent struct { - Event J - // String is job id. - ID string - // Pipeline name - Pipeline string - // Associated driver name (amqp, ephemeral, etc) - Driver string - // Error for the jobs/pipes errors - Error error - // event timings - Start time.Time - Elapsed time.Duration -} diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go deleted file mode 100644 index 4d4cae5d..00000000 --- a/pkg/events/pool_events.go +++ /dev/null @@ -1,70 +0,0 @@ -package events - -const ( - // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct P = iota + 10000 - - // EventWorkerDestruct thrown after worker destruction. - EventWorkerDestruct - - // EventPoolError caused on pool wide errors. - EventPoolError - - // EventSupervisorError triggered when supervisor can not complete work. - EventSupervisorError - - // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed - EventNoFreeWorkers - - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory - - // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) - EventTTL - - // EventIdleTTL triggered when worker spends too much time at rest. - EventIdleTTL - - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). - EventExecTTL - - // EventPoolRestart triggered when pool restart is needed - EventPoolRestart -) - -type P int64 - -func (ev P) String() string { - switch ev { - case EventWorkerConstruct: - return "EventWorkerConstruct" - case EventWorkerDestruct: - return "EventWorkerDestruct" - case EventPoolError: - return "EventPoolError" - case EventSupervisorError: - return "EventSupervisorError" - case EventNoFreeWorkers: - return "EventNoFreeWorkers" - case EventMaxMemory: - return "EventMaxMemory" - case EventTTL: - return "EventTTL" - case EventIdleTTL: - return "EventIdleTTL" - case EventExecTTL: - return "EventExecTTL" - case EventPoolRestart: - return "EventPoolRestart" - } - return UnknownEventType -} - -// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. -type PoolEvent struct { - // Event type, see below. - Event P - - // Payload depends on event type, typically it's worker or error. - Payload interface{} -} diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go deleted file mode 100644 index 39c38e57..00000000 --- a/pkg/events/worker_events.go +++ /dev/null @@ -1,36 +0,0 @@ -package events - -const ( - // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError W = iota + 11000 - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. - EventWorkerLog - // EventWorkerStderr is the worker standard error output - EventWorkerStderr -) - -type W int64 - -func (ev W) String() string { - switch ev { - case EventWorkerError: - return "EventWorkerError" - case EventWorkerLog: - return "EventWorkerLog" - case EventWorkerStderr: - return "EventWorkerStderr" - } - return UnknownEventType -} - -// WorkerEvent wraps worker events. -type WorkerEvent struct { - // Event id, see below. - Event W - - // Worker triggered the event. - Worker interface{} - - // Event specific payload. - Payload interface{} -} diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go deleted file mode 100755 index e1e45ac1..00000000 --- a/pkg/payload/payload.go +++ /dev/null @@ -1,20 +0,0 @@ -package payload - -import ( - "github.com/spiral/roadrunner/v2/utils" -) - -// Payload carries binary header and body to stack and -// back to the server. -type Payload struct { - // Context represent payload context, might be omitted. - Context []byte - - // body contains binary payload to be processed by WorkerProcess. - Body []byte -} - -// String returns payload body as string -func (p *Payload) String() string { - return utils.AsString(p.Body) -} diff --git a/pkg/pool/config.go b/pkg/pool/config.go deleted file mode 100644 index 3a058956..00000000 --- a/pkg/pool/config.go +++ /dev/null @@ -1,75 +0,0 @@ -package pool - -import ( - "runtime" - "time" -) - -// Config .. Pool config Configures the pool behavior. -type Config struct { - // Debug flag creates new fresh worker before every request. - Debug bool - - // NumWorkers defines how many sub-processes can be run at once. This value - // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. - NumWorkers uint64 `mapstructure:"num_workers"` - - // MaxJobs defines how many executions is allowed for the worker until - // it's destruction. set 1 to create new process for each new task, 0 to let - // worker handle as many tasks as it can. - MaxJobs uint64 `mapstructure:"max_jobs"` - - // AllocateTimeout defines for how long pool will be waiting for a worker to - // be freed to handle the task. Defaults to 60s. - AllocateTimeout time.Duration `mapstructure:"allocate_timeout"` - - // DestroyTimeout defines for how long pool should be waiting for worker to - // properly destroy, if timeout reached worker will be killed. Defaults to 60s. - DestroyTimeout time.Duration `mapstructure:"destroy_timeout"` - - // Supervision config to limit worker and pool memory usage. - Supervisor *SupervisorConfig `mapstructure:"supervisor"` -} - -// InitDefaults enables default config values. -func (cfg *Config) InitDefaults() { - if cfg.NumWorkers == 0 { - cfg.NumWorkers = uint64(runtime.NumCPU()) - } - - if cfg.AllocateTimeout == 0 { - cfg.AllocateTimeout = time.Minute - } - - if cfg.DestroyTimeout == 0 { - cfg.DestroyTimeout = time.Minute - } - if cfg.Supervisor == nil { - return - } - cfg.Supervisor.InitDefaults() -} - -type SupervisorConfig struct { - // WatchTick defines how often to check the state of worker. - WatchTick time.Duration `mapstructure:"watch_tick"` - - // TTL defines maximum time worker is allowed to live. - TTL time.Duration `mapstructure:"ttl"` - - // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. - IdleTTL time.Duration `mapstructure:"idle_ttl"` - - // ExecTTL defines maximum lifetime per job. - ExecTTL time.Duration `mapstructure:"exec_ttl"` - - // MaxWorkerMemory limits memory per worker. - MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"` -} - -// InitDefaults enables default config values. -func (cfg *SupervisorConfig) InitDefaults() { - if cfg.WatchTick == 0 { - cfg.WatchTick = time.Second - } -} diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go deleted file mode 100644 index 4049122c..00000000 --- a/pkg/pool/interface.go +++ /dev/null @@ -1,53 +0,0 @@ -package pool - -import ( - "context" - - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -// Pool managed set of inner worker processes. -type Pool interface { - // GetConfig returns pool configuration. - GetConfig() interface{} - - // Exec executes task with payload - Exec(rqs *payload.Payload) (*payload.Payload, error) - - // Workers returns worker list associated with the pool. - Workers() (workers []worker.BaseProcess) - - // RemoveWorker removes worker from the pool. - RemoveWorker(worker worker.BaseProcess) error - - // Destroy all underlying stack (but let them to complete the task). - Destroy(ctx context.Context) - - // ExecWithContext executes task with context which is used with timeout - execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error) -} - -// Watcher is an interface for the Sync workers lifecycle -type Watcher interface { - // Watch used to add workers to the container - Watch(workers []worker.BaseProcess) error - - // Take takes the first free worker - Take(ctx context.Context) (worker.BaseProcess, error) - - // Release releases the worker putting it back to the queue - Release(w worker.BaseProcess) - - // Allocate - allocates new worker and put it into the WorkerWatcher - Allocate() error - - // Destroy destroys the underlying container - Destroy(ctx context.Context) - - // List return all container w/o removing it from internal storage - List() []worker.BaseProcess - - // Remove will remove worker from the container - Remove(wb worker.BaseProcess) -} diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go deleted file mode 100755 index 7e190846..00000000 --- a/pkg/pool/static_pool.go +++ /dev/null @@ -1,374 +0,0 @@ -package pool - -import ( - "context" - "os/exec" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/transport" - "github.com/spiral/roadrunner/v2/pkg/worker" - workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" - "github.com/spiral/roadrunner/v2/utils" -) - -// StopRequest can be sent by worker to indicate that restart is required. -const StopRequest = "{\"stop\":true}" - -// ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error) - -type Options func(p *StaticPool) - -type Command func() *exec.Cmd - -// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. -type StaticPool struct { - cfg *Config - - // worker command creator - cmd Command - - // creates and connects to stack - factory transport.Factory - - // distributes the events - events events.Handler - - // saved list of event listeners - listeners []events.Listener - - // manages worker states and TTLs - ww Watcher - - // allocate new worker - allocator worker.Allocator - - // errEncoder is the default Exec error encoder - errEncoder ErrorEncoder -} - -// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) { - const op = errors.Op("static_pool_initialize") - if factory == nil { - return nil, errors.E(op, errors.Str("no factory initialized")) - } - cfg.InitDefaults() - - if cfg.Debug { - cfg.NumWorkers = 0 - cfg.MaxJobs = 1 - } - - p := &StaticPool{ - cfg: cfg, - cmd: cmd, - factory: factory, - events: events.NewEventsHandler(), - } - - // add pool options - for i := 0; i < len(options); i++ { - options[i](p) - } - - // set up workers allocator - p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) - // set up workers watcher - p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout) - - // allocate requested number of workers - workers, err := p.allocateWorkers(p.cfg.NumWorkers) - if err != nil { - return nil, errors.E(op, err) - } - - // add workers to the watcher - err = p.ww.Watch(workers) - if err != nil { - return nil, errors.E(op, err) - } - - p.errEncoder = defaultErrEncoder(p) - - // if supervised config not nil, guess, that pool wanted to be supervised - if cfg.Supervisor != nil { - sp := supervisorWrapper(p, p.events, p.cfg.Supervisor) - // start watcher timer - sp.Start() - return sp, nil - } - - return p, nil -} - -func AddListeners(listeners ...events.Listener) Options { - return func(p *StaticPool) { - p.listeners = listeners - for i := 0; i < len(listeners); i++ { - p.addListener(listeners[i]) - } - } -} - -// AddListener connects event listener to the pool. -func (sp *StaticPool) addListener(listener events.Listener) { - sp.events.AddListener(listener) -} - -// GetConfig returns associated pool configuration. Immutable. -func (sp *StaticPool) GetConfig() interface{} { - return sp.cfg -} - -// Workers returns worker list associated with the pool. -func (sp *StaticPool) Workers() (workers []worker.BaseProcess) { - return sp.ww.List() -} - -func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { - sp.ww.Remove(wb) - return nil -} - -// Exec executes provided payload on the worker -func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("static_pool_exec") - if sp.cfg.Debug { - return sp.execDebug(p) - } - ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) - defer cancel() - w, err := sp.takeWorker(ctxGetFree, op) - if err != nil { - return nil, errors.E(op, err) - } - - rsp, err := w.(worker.SyncWorker).Exec(p) - if err != nil { - return sp.errEncoder(err, w) - } - - // worker want's to be terminated - if len(rsp.Body) == 0 && utils.AsString(rsp.Context) == StopRequest { - sp.stopWorker(w) - return sp.Exec(p) - } - - if sp.cfg.MaxJobs != 0 { - sp.checkMaxJobs(w) - return rsp, nil - } - // return worker back - sp.ww.Release(w) - return rsp, nil -} - -// Be careful, sync with pool.Exec method -func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("static_pool_exec_with_context") - if sp.cfg.Debug { - return sp.execDebugWithTTL(ctx, p) - } - - ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) - defer cancel() - w, err := sp.takeWorker(ctxAlloc, op) - if err != nil { - return nil, errors.E(op, err) - } - - rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p) - if err != nil { - return sp.errEncoder(err, w) - } - - // worker want's to be terminated - if len(rsp.Body) == 0 && utils.AsString(rsp.Context) == StopRequest { - sp.stopWorker(w) - return sp.execWithTTL(ctx, p) - } - - if sp.cfg.MaxJobs != 0 { - sp.checkMaxJobs(w) - return rsp, nil - } - - // return worker back - sp.ww.Release(w) - return rsp, nil -} - -func (sp *StaticPool) stopWorker(w worker.BaseProcess) { - const op = errors.Op("static_pool_stop_worker") - w.State().Set(worker.StateInvalid) - err := w.Stop() - if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) - } -} - -// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs -//go:inline -func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) { - if w.State().NumExecs() >= sp.cfg.MaxJobs { - w.State().Set(worker.StateMaxJobsReached) - sp.ww.Release(w) - return - } - - sp.ww.Release(w) -} - -func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { - // Get function consumes context with timeout - w, err := sp.ww.Take(ctxGetFree) - if err != nil { - // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout - if errors.Is(errors.NoFreeWorkers, err) { - sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)}) - return nil, errors.E(op, err) - } - // else if err not nil - return error - return nil, errors.E(op, err) - } - return w, nil -} - -// Destroy all underlying stack (but let them complete the task). -func (sp *StaticPool) Destroy(ctx context.Context) { - sp.ww.Destroy(ctx) -} - -func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (*payload.Payload, error) { - const op = errors.Op("error_encoder") - // just push event if on any stage was timeout error - switch { - case errors.Is(errors.ExecTTL, err): - sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)}) - w.State().Set(worker.StateInvalid) - return nil, err - - case errors.Is(errors.SoftJob, err): - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) - - // if max jobs exceed - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - // mark old as invalid and stop - w.State().Set(worker.StateInvalid) - errS := w.Stop() - if errS != nil { - return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) - } - - return nil, err - } - - // soft jobs errors are allowed, just put the worker back - sp.ww.Release(w) - - return nil, err - case errors.Is(errors.Network, err): - // in case of network error, we can't stop the worker, we should kill it - w.State().Set(worker.StateInvalid) - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) - - // kill the worker instead of sending net packet to it - _ = w.Kill() - - return nil, err - default: - w.State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) - // stop the worker, worker here might be in the broken state (network) - errS := w.Stop() - if errS != nil { - return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS)) - } - - return nil, errors.E(op, err) - } - } -} - -func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { - return func() (worker.SyncWorker, error) { - ctxT, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...) - if err != nil { - return nil, err - } - - // wrap sync worker - sw := worker.From(w) - - sp.events.Push(events.PoolEvent{ - Event: events.EventWorkerConstruct, - Payload: sw, - }) - return sw, nil - } -} - -// execDebug used when debug mode was not set and exec_ttl is 0 -func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("static_pool_exec_debug") - sw, err := sp.allocator() - if err != nil { - return nil, err - } - - // redirect call to the workers' exec method (without ttl) - r, err := sw.Exec(p) - if err != nil { - return nil, errors.E(op, err) - } - - // destroy the worker - sw.State().Set(worker.StateDestroyed) - err = sw.Kill() - if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) - return nil, errors.E(op, err) - } - - return r, nil -} - -// execDebugWithTTL used when user set debug mode and exec_ttl -func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { - sw, err := sp.allocator() - if err != nil { - return nil, err - } - - // redirect call to the worker with TTL - r, err := sw.ExecWithTTL(ctx, p) - if stopErr := sw.Stop(); stopErr != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) - } - - return r, err -} - -// allocate required number of stack -func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) { - const op = errors.Op("static_pool_allocate_workers") - workers := make([]worker.BaseProcess, 0, numWorkers) - - // constant number of stack simplify logic - for i := uint64(0); i < numWorkers; i++ { - w, err := sp.allocator() - if err != nil { - return nil, errors.E(op, errors.WorkerAllocate, err) - } - - workers = append(workers, w) - } - return workers, nil -} diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go deleted file mode 100755 index cb6578a8..00000000 --- a/pkg/pool/static_pool_test.go +++ /dev/null @@ -1,721 +0,0 @@ -package pool - -import ( - "context" - "log" - "os/exec" - "runtime" - "strconv" - "strings" - "sync" - "testing" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/transport/pipe" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/utils" - "github.com/stretchr/testify/assert" -) - -var cfg = &Config{ - NumWorkers: uint64(runtime.NumCPU()), - AllocateTimeout: time.Second * 5, - DestroyTimeout: time.Second * 5, -} - -func Test_NewPool(t *testing.T) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), - cfg, - ) - assert.NoError(t, err) - - defer p.Destroy(ctx) - - assert.NotNil(t, p) -} - -func Test_StaticPool_Invalid(t *testing.T) { - p, err := Initialize( - context.Background(), - func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") }, - pipe.NewPipeFactory(), - cfg, - ) - - assert.Nil(t, p) - assert.Error(t, err) -} - -func Test_ConfigNoErrorInitDefaults(t *testing.T) { - p, err := Initialize( - context.Background(), - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - - assert.NotNil(t, p) - assert.NoError(t, err) -} - -func Test_StaticPool_Echo(t *testing.T) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), - cfg, - ) - assert.NoError(t, err) - - defer p.Destroy(ctx) - - assert.NotNil(t, p) - - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_StaticPool_Echo_NilContext(t *testing.T) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), - cfg, - ) - assert.NoError(t, err) - - defer p.Destroy(ctx) - - assert.NotNil(t, p) - - res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_StaticPool_Echo_Context(t *testing.T) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") }, - pipe.NewPipeFactory(), - cfg, - ) - assert.NoError(t, err) - - defer p.Destroy(ctx) - - assert.NotNil(t, p) - - res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.Empty(t, res.Body) - assert.NotNil(t, res.Context) - - assert.Equal(t, "world", string(res.Context)) -} - -func Test_StaticPool_JobError(t *testing.T) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") }, - pipe.NewPipeFactory(), - cfg, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - time.Sleep(time.Second * 2) - - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res) - - if errors.Is(errors.SoftJob, err) == false { - t.Fatal("error should be of type errors.Exec") - } - - assert.Contains(t, err.Error(), "hello") - p.Destroy(ctx) -} - -func Test_StaticPool_Broken_Replace(t *testing.T) { - ctx := context.Background() - block := make(chan struct{}, 10) - - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerStderr { - e := string(wev.Payload.([]byte)) - if strings.ContainsAny(e, "undefined_function()") { - block <- struct{}{} - return - } - } - } - } - - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") }, - pipe.NewPipeFactory(), - cfg, - AddListeners(listener), - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - time.Sleep(time.Second) - res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res) - - <-block - - p.Destroy(ctx) -} - -func Test_StaticPool_Broken_FromOutside(t *testing.T) { - ctx := context.Background() - // Run pool events - ev := make(chan struct{}, 1) - listener := func(event interface{}) { - if pe, ok := event.(events.PoolEvent); ok { - if pe.Event == events.EventWorkerConstruct { - ev <- struct{}{} - } - } - } - - var cfg2 = &Config{ - NumWorkers: 1, - AllocateTimeout: time.Second * 5, - DestroyTimeout: time.Second * 5, - } - - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), - cfg2, - AddListeners(listener), - ) - assert.NoError(t, err) - assert.NotNil(t, p) - defer p.Destroy(ctx) - time.Sleep(time.Second) - - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) - assert.Equal(t, 1, len(p.Workers())) - - // first creation - <-ev - // killing random worker and expecting pool to replace it - err = p.Workers()[0].Kill() - if err != nil { - t.Errorf("error killing the process: error %v", err) - } - - // re-creation - <-ev - - list := p.Workers() - for _, w := range list { - assert.Equal(t, worker.StateReady, w.State().Value()) - } -} - -func Test_StaticPool_AllocateTimeout(t *testing.T) { - p, err := Initialize( - context.Background(), - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - NumWorkers: 1, - AllocateTimeout: time.Nanosecond * 1, - DestroyTimeout: time.Second * 2, - }, - ) - assert.Error(t, err) - if !errors.Is(errors.WorkerAllocate, err) { - t.Fatal("error should be of type WorkerAllocate") - } - assert.Nil(t, p) -} - -func Test_StaticPool_Replace_Worker(t *testing.T) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - NumWorkers: 1, - MaxJobs: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - defer p.Destroy(ctx) - // prevent process is not ready - time.Sleep(time.Second) - - var lastPID string - lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - - res, _ := p.Exec(&payload.Payload{Body: []byte("hello")}) - assert.Equal(t, lastPID, string(res.Body)) - - for i := 0; i < 10; i++ { - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.NotEqual(t, lastPID, string(res.Body)) - lastPID = string(res.Body) - } -} - -func Test_StaticPool_Debug_Worker(t *testing.T) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - Debug: true, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - defer p.Destroy(ctx) - - // prevent process is not ready - time.Sleep(time.Second) - assert.Len(t, p.Workers(), 0) - - var lastPID string - res, _ := p.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NotEqual(t, lastPID, string(res.Body)) - - assert.Len(t, p.Workers(), 0) - - for i := 0; i < 10; i++ { - assert.Len(t, p.Workers(), 0) - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.NotEqual(t, lastPID, string(res.Body)) - lastPID = string(res.Body) - } -} - -// identical to replace but controlled on worker side -func Test_StaticPool_Stop_Worker(t *testing.T) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - defer p.Destroy(ctx) - time.Sleep(time.Second) - - var lastPID string - lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Fatal(err) - } - assert.Equal(t, lastPID, string(res.Body)) - - for i := 0; i < 10; i++ { - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.NotEqual(t, lastPID, string(res.Body)) - lastPID = string(res.Body) - } -} - -// identical to replace but controlled on worker side -func Test_Static_Pool_Destroy_And_Close(t *testing.T) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - - assert.NotNil(t, p) - assert.NoError(t, err) - - p.Destroy(ctx) - _, err = p.Exec(&payload.Payload{Body: []byte("100")}) - assert.Error(t, err) -} - -// identical to replace but controlled on worker side -func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - - assert.NotNil(t, p) - assert.NoError(t, err) - - go func() { - _, errP := p.Exec(&payload.Payload{Body: []byte("100")}) - if errP != nil { - t.Errorf("error executing payload: error %v", err) - } - }() - time.Sleep(time.Millisecond * 100) - - p.Destroy(ctx) - _, err = p.Exec(&payload.Payload{Body: []byte("100")}) - assert.Error(t, err) -} - -// identical to replace but controlled on worker side -func Test_Static_Pool_Handle_Dead(t *testing.T) { - ctx := context.Background() - p, err := Initialize( - context.Background(), - func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - NumWorkers: 5, - AllocateTimeout: time.Second * 100, - DestroyTimeout: time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - time.Sleep(time.Second) - for i := range p.Workers() { - p.Workers()[i].State().Set(worker.StateErrored) - } - - _, err = p.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NoError(t, err) - p.Destroy(ctx) -} - -// identical to replace but controlled on worker side -func Test_Static_Pool_Slow_Destroy(t *testing.T) { - p, err := Initialize( - context.Background(), - func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - NumWorkers: 5, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - - p.Destroy(context.Background()) -} - -func Test_StaticPool_NoFreeWorkers(t *testing.T) { - ctx := context.Background() - block := make(chan struct{}, 10) - - listener := func(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventNoFreeWorkers { - block <- struct{}{} - } - } - } - - p, err := Initialize( - ctx, - // sleep for the 3 seconds - func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - Debug: false, - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: nil, - }, - AddListeners(listener), - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - go func() { - _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) - }() - - time.Sleep(time.Second) - res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res) - - <-block - - p.Destroy(ctx) -} - -// identical to replace but controlled on worker side -func Test_Static_Pool_WrongCommand1(t *testing.T) { - p, err := Initialize( - context.Background(), - func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - NumWorkers: 5, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - - assert.Error(t, err) - assert.Nil(t, p) -} - -// identical to replace but controlled on worker side -func Test_Static_Pool_WrongCommand2(t *testing.T) { - p, err := Initialize( - context.Background(), - func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - NumWorkers: 5, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - - assert.Error(t, err) - assert.Nil(t, p) -} - -/* PTR: -Benchmark_Pool_Echo-32 49076 29926 ns/op 8016 B/op 20 allocs/op -Benchmark_Pool_Echo-32 47257 30779 ns/op 8047 B/op 20 allocs/op -Benchmark_Pool_Echo-32 46737 29440 ns/op 8065 B/op 20 allocs/op -Benchmark_Pool_Echo-32 51177 29074 ns/op 7981 B/op 20 allocs/op -Benchmark_Pool_Echo-32 51764 28319 ns/op 8012 B/op 20 allocs/op -Benchmark_Pool_Echo-32 54054 30714 ns/op 7987 B/op 20 allocs/op -Benchmark_Pool_Echo-32 54391 30689 ns/op 8055 B/op 20 allocs/op - -VAL: -Benchmark_Pool_Echo-32 47936 28679 ns/op 7942 B/op 19 allocs/op -Benchmark_Pool_Echo-32 49010 29830 ns/op 7970 B/op 19 allocs/op -Benchmark_Pool_Echo-32 46771 29031 ns/op 8014 B/op 19 allocs/op -Benchmark_Pool_Echo-32 47760 30517 ns/op 7955 B/op 19 allocs/op -Benchmark_Pool_Echo-32 48148 29816 ns/op 7950 B/op 19 allocs/op -Benchmark_Pool_Echo-32 52705 29809 ns/op 7979 B/op 19 allocs/op -Benchmark_Pool_Echo-32 54374 27776 ns/op 7947 B/op 19 allocs/op -*/ -func Benchmark_Pool_Echo(b *testing.B) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), - cfg, - ) - if err != nil { - b.Fatal(err) - } - - bd := make([]byte, 1024) - c := make([]byte, 1024) - - pld := &payload.Payload{ - Context: c, - Body: bd, - } - - b.ResetTimer() - b.ReportAllocs() - for n := 0; n < b.N; n++ { - if _, err := p.Exec(pld); err != nil { - b.Fail() - } - } -} - -// Benchmark_Pool_Echo_Batched-32 366996 2873 ns/op 1233 B/op 24 allocs/op -// PTR -> Benchmark_Pool_Echo_Batched-32 406839 2900 ns/op 1059 B/op 23 allocs/op -// PTR -> Benchmark_Pool_Echo_Batched-32 413312 2904 ns/op 1067 B/op 23 allocs/op -func Benchmark_Pool_Echo_Batched(b *testing.B) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - NumWorkers: uint64(runtime.NumCPU()), - AllocateTimeout: time.Second * 100, - DestroyTimeout: time.Second, - }, - ) - assert.NoError(b, err) - defer p.Destroy(ctx) - - bd := make([]byte, 1024) - c := make([]byte, 1024) - - pld := &payload.Payload{ - Context: c, - Body: bd, - } - - b.ResetTimer() - b.ReportAllocs() - - var wg sync.WaitGroup - for i := 0; i < b.N; i++ { - wg.Add(1) - go func() { - defer wg.Done() - if _, err := p.Exec(pld); err != nil { - b.Fail() - log.Println(err) - } - }() - } - - wg.Wait() -} - -// Benchmark_Pool_Echo_Replaced-32 104/100 10900218 ns/op 52365 B/op 125 allocs/op -func Benchmark_Pool_Echo_Replaced(b *testing.B) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), - &Config{ - NumWorkers: 1, - MaxJobs: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - assert.NoError(b, err) - defer p.Destroy(ctx) - b.ResetTimer() - b.ReportAllocs() - - for n := 0; n < b.N; n++ { - if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - log.Println(err) - } - } -} - -// BenchmarkToStringUnsafe-12 566317729 1.91 ns/op 0 B/op 0 allocs/op -// BenchmarkToStringUnsafe-32 1000000000 0.4434 ns/op 0 B/op 0 allocs/op -func BenchmarkToStringUnsafe(b *testing.B) { - testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj") - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - res := utils.AsString(testPayload) - _ = res - } -} - -// BenchmarkToStringSafe-32 8017846 182.5 ns/op 896 B/op 1 allocs/op -// inline BenchmarkToStringSafe-12 28926276 46.6 ns/op 128 B/op 1 allocs/op -func BenchmarkToStringSafe(b *testing.B) { - testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj") - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - res := toStringNotFun(testPayload) - _ = res - } -} - -func toStringNotFun(data []byte) string { - return string(data) -} diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go deleted file mode 100755 index e6b2bd7c..00000000 --- a/pkg/pool/supervisor_pool.go +++ /dev/null @@ -1,230 +0,0 @@ -package pool - -import ( - "context" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/state/process" - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -const MB = 1024 * 1024 - -// NSEC_IN_SEC nanoseconds in second -const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck - -type Supervised interface { - Pool - // Start used to start watching process for all pool workers - Start() -} - -type supervised struct { - cfg *SupervisorConfig - events events.Handler - pool Pool - stopCh chan struct{} - mu *sync.RWMutex -} - -func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { - sp := &supervised{ - cfg: cfg, - events: events, - pool: pool, - mu: &sync.RWMutex{}, - stopCh: make(chan struct{}), - } - - return sp -} - -func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) { - panic("used to satisfy pool interface") -} - -func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("supervised_exec_with_context") - if sp.cfg.ExecTTL == 0 { - return sp.pool.Exec(rqs) - } - - ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL) - defer cancel() - - res, err := sp.pool.execWithTTL(ctx, rqs) - if err != nil { - return nil, errors.E(op, err) - } - - return res, nil -} - -func (sp *supervised) GetConfig() interface{} { - return sp.pool.GetConfig() -} - -func (sp *supervised) Workers() (workers []worker.BaseProcess) { - sp.mu.Lock() - defer sp.mu.Unlock() - return sp.pool.Workers() -} - -func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error { - return sp.pool.RemoveWorker(worker) -} - -func (sp *supervised) Destroy(ctx context.Context) { - sp.pool.Destroy(ctx) -} - -func (sp *supervised) Start() { - go func() { - watchTout := time.NewTicker(sp.cfg.WatchTick) - for { - select { - case <-sp.stopCh: - watchTout.Stop() - return - // stop here - case <-watchTout.C: - sp.mu.Lock() - sp.control() - sp.mu.Unlock() - } - } - }() -} - -func (sp *supervised) Stop() { - sp.stopCh <- struct{}{} -} - -func (sp *supervised) control() { //nolint:gocognit - now := time.Now() - - // MIGHT BE OUTDATED - // It's a copy of the Workers pointers - workers := sp.pool.Workers() - - for i := 0; i < len(workers); i++ { - // if worker not in the Ready OR working state - // skip such worker - switch workers[i].State().Value() { - case - worker.StateInvalid, - worker.StateErrored, - worker.StateDestroyed, - worker.StateInactive, - worker.StateStopped, - worker.StateStopping, - worker.StateKilling: - continue - } - - s, err := process.WorkerProcessState(workers[i]) - if err != nil { - // worker not longer valid for supervision - continue - } - - if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { - /* - worker at this point might be in the middle of request execution: - - ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release - ^ - TTL Reached, state - invalid | - -----> Worker Stopped here - */ - - if workers[i].State().Value() != worker.StateWorking { - workers[i].State().Set(worker.StateInvalid) - _ = workers[i].Stop() - } - // just to double check - workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) - continue - } - - if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { - /* - worker at this point might be in the middle of request execution: - - ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release - ^ - TTL Reached, state - invalid | - -----> Worker Stopped here - */ - - if workers[i].State().Value() != worker.StateWorking { - workers[i].State().Set(worker.StateInvalid) - _ = workers[i].Stop() - } - // just to double check - workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) - continue - } - - // firs we check maxWorker idle - if sp.cfg.IdleTTL != 0 { - // then check for the worker state - if workers[i].State().Value() != worker.StateReady { - continue - } - - /* - Calculate idle time - If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 - 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle - we are guessing that worker overlap idle time and has to be killed - */ - - // 1610530005534416045 lu - // lu - now = -7811150814 - nanoseconds - // 7.8 seconds - // get last used unix nano - lu := workers[i].State().LastUsed() - // worker not used, skip - if lu == 0 { - continue - } - - // convert last used to unixNano and sub time.now to seconds - // negative number, because lu always in the past, except for the `back to the future` :) - res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1 - - // maxWorkerIdle more than diff between now and last used - // for example: - // After exec worker goes to the rest - // And resting for the 5 seconds - // IdleTTL is 1 second. - // After the control check, res will be 5, idle is 1 - // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. - if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { - /* - worker at this point might be in the middle of request execution: - - ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release - ^ - TTL Reached, state - invalid | - -----> Worker Stopped here - */ - - if workers[i].State().Value() != worker.StateWorking { - workers[i].State().Set(worker.StateInvalid) - _ = workers[i].Stop() - } - // just to double-check - workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) - } - } - } -} diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go deleted file mode 100644 index 14df513e..00000000 --- a/pkg/pool/supervisor_test.go +++ /dev/null @@ -1,413 +0,0 @@ -package pool - -import ( - "context" - "os" - "os/exec" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/transport/pipe" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -var cfgSupervised = &Config{ - NumWorkers: uint64(1), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 100 * time.Second, - IdleTTL: 100 * time.Second, - ExecTTL: 100 * time.Second, - MaxWorkerMemory: 100, - }, -} - -func TestSupervisedPool_Exec(t *testing.T) { - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(), - cfgSupervised, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - - time.Sleep(time.Second) - - pidBefore := p.Workers()[0].Pid() - - for i := 0; i < 100; i++ { - time.Sleep(time.Millisecond * 100) - _, err = p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - assert.NoError(t, err) - } - - assert.NotEqual(t, pidBefore, p.Workers()[0].Pid()) - - p.Destroy(context.Background()) -} - -// This test should finish without freezes -func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { - var cfgSupervised = cfgSupervised - cfgSupervised.Debug = true - - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/supervised.php") }, - pipe.NewPipeFactory(), - cfgSupervised, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - - time.Sleep(time.Second) - - for i := 0; i < 100; i++ { - time.Sleep(time.Millisecond * 500) - _, err = p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - assert.NoError(t, err) - } - - p.Destroy(context.Background()) -} - -func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(1), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 100 * time.Second, - IdleTTL: 100 * time.Second, - ExecTTL: 1 * time.Second, - MaxWorkerMemory: 100, - }, - } - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(), - cfgExecTTL, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - defer p.Destroy(context.Background()) - - pid := p.Workers()[0].Pid() - - resp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.Error(t, err) - assert.Empty(t, resp) - - time.Sleep(time.Second * 1) - // should be new worker with new pid - assert.NotEqual(t, pid, p.Workers()[0].Pid()) -} - -func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(1), - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 5 * time.Second, - }, - } - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") }, - pipe.NewPipeFactory(), - cfgExecTTL, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - - pid := p.Workers()[0].Pid() - - resp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.NoError(t, err) - assert.Equal(t, string(resp.Body), "hello world") - assert.Empty(t, resp.Context) - - time.Sleep(time.Second) - assert.NotEqual(t, pid, p.Workers()[0].Pid()) - require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) - pid = p.Workers()[0].Pid() - - resp, err = p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.NoError(t, err) - assert.Equal(t, string(resp.Body), "hello world") - assert.Empty(t, resp.Context) - - time.Sleep(time.Second) - // should be new worker with new pid - assert.NotEqual(t, pid, p.Workers()[0].Pid()) - require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) - - p.Destroy(context.Background()) -} - -func TestSupervisedPool_Idle(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(1), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 100 * time.Second, - IdleTTL: 1 * time.Second, - ExecTTL: 100 * time.Second, - MaxWorkerMemory: 100, - }, - } - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/idle.php", "pipes") }, - pipe.NewPipeFactory(), - cfgExecTTL, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - - pid := p.Workers()[0].Pid() - - resp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.Nil(t, err) - assert.Empty(t, resp.Body) - assert.Empty(t, resp.Context) - - time.Sleep(time.Second * 5) - - // worker should be marked as invalid and reallocated - _, err = p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - assert.NoError(t, err) - // should be new worker with new pid - assert.NotEqual(t, pid, p.Workers()[0].Pid()) - p.Destroy(context.Background()) -} - -func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(1), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 1 * time.Second, - IdleTTL: 1 * time.Second, - MaxWorkerMemory: 100, - }, - } - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") }, - pipe.NewPipeFactory(), - cfgExecTTL, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - defer p.Destroy(context.Background()) - - pid := p.Workers()[0].Pid() - - time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.NoError(t, err) - assert.Empty(t, resp.Body) - assert.Empty(t, resp.Context) - - time.Sleep(time.Second * 2) - // should be destroyed, state should be Ready, not Invalid - assert.NotEqual(t, pid, p.Workers()[0].Pid()) - assert.Equal(t, int64(1), p.Workers()[0].State().Value()) -} - -func TestSupervisedPool_ExecTTL_OK(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(1), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 100 * time.Second, - IdleTTL: 100 * time.Second, - ExecTTL: 4 * time.Second, - MaxWorkerMemory: 100, - }, - } - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") }, - pipe.NewPipeFactory(), - cfgExecTTL, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - defer p.Destroy(context.Background()) - - pid := p.Workers()[0].Pid() - - time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.NoError(t, err) - assert.Empty(t, resp.Body) - assert.Empty(t, resp.Context) - - time.Sleep(time.Second * 1) - // should be the same pid - assert.Equal(t, pid, p.Workers()[0].Pid()) -} - -func TestSupervisedPool_MaxMemoryReached(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(1), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 100 * time.Second, - IdleTTL: 100 * time.Second, - ExecTTL: 4 * time.Second, - MaxWorkerMemory: 1, - }, - } - - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventMaxMemory { - block <- struct{}{} - } - } - } - - // constructed - // max memory - // constructed - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(), - cfgExecTTL, - AddListeners(listener), - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - - resp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.NoError(t, err) - assert.Empty(t, resp.Body) - assert.Empty(t, resp.Context) - - <-block - p.Destroy(context.Background()) -} - -func TestSupervisedPool_AllocateFailedOK(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(2), - AllocateTimeout: time.Second * 15, - DestroyTimeout: time.Second * 5, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 5 * time.Second, - }, - } - - ctx := context.Background() - p, err := Initialize( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/allocate-failed.php") }, - pipe.NewPipeFactory(), - cfgExecTTL, - ) - - assert.NoError(t, err) - require.NotNil(t, p) - - time.Sleep(time.Second) - - // should be ok - _, err = p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - require.NoError(t, err) - - // after creating this file, PHP will fail - file, err := os.Create("break") - require.NoError(t, err) - - time.Sleep(time.Second * 5) - assert.NoError(t, file.Close()) - assert.NoError(t, os.Remove("break")) - - defer func() { - if r := recover(); r != nil { - assert.Fail(t, "panic should not be fired!") - } else { - p.Destroy(context.Background()) - } - }() -} diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go deleted file mode 100644 index fc043927..00000000 --- a/pkg/priority_queue/binary_heap.go +++ /dev/null @@ -1,125 +0,0 @@ -/* -binary heap (min-heap) algorithm used as a core for the priority queue -*/ - -package priorityqueue - -import ( - "sync" - "sync/atomic" -) - -type BinHeap struct { - items []Item - // find a way to use pointer to the raw data - len uint64 - maxLen uint64 - cond sync.Cond -} - -func NewBinHeap(maxLen uint64) *BinHeap { - return &BinHeap{ - items: make([]Item, 0, 1000), - len: 0, - maxLen: maxLen, - cond: sync.Cond{L: &sync.Mutex{}}, - } -} - -func (bh *BinHeap) fixUp() { - k := bh.len - 1 - p := (k - 1) >> 1 // k-1 / 2 - - for k > 0 { - cur, par := (bh.items)[k], (bh.items)[p] - - if cur.Priority() < par.Priority() { - bh.swap(k, p) - k = p - p = (k - 1) >> 1 - } else { - return - } - } -} - -func (bh *BinHeap) swap(i, j uint64) { - (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i] -} - -func (bh *BinHeap) fixDown(curr, end int) { - cOneIdx := (curr << 1) + 1 - for cOneIdx <= end { - cTwoIdx := -1 - if (curr<<1)+2 <= end { - cTwoIdx = (curr << 1) + 2 - } - - idxToSwap := cOneIdx - if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() { - idxToSwap = cTwoIdx - } - if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() { - bh.swap(uint64(curr), uint64(idxToSwap)) - curr = idxToSwap - cOneIdx = (curr << 1) + 1 - } else { - return - } - } -} - -func (bh *BinHeap) Len() uint64 { - return atomic.LoadUint64(&bh.len) -} - -func (bh *BinHeap) Insert(item Item) { - bh.cond.L.Lock() - - // check the binary heap len before insertion - if bh.Len() > bh.maxLen { - // unlock the mutex to proceed to get-max - bh.cond.L.Unlock() - - // signal waiting goroutines - for bh.Len() > 0 { - // signal waiting goroutines - bh.cond.Signal() - } - // lock mutex to proceed inserting into the empty slice - bh.cond.L.Lock() - } - - bh.items = append(bh.items, item) - - // add len to the slice - atomic.AddUint64(&bh.len, 1) - - // fix binary heap up - bh.fixUp() - bh.cond.L.Unlock() - - // signal the goroutine on wait - bh.cond.Signal() -} - -func (bh *BinHeap) ExtractMin() Item { - bh.cond.L.Lock() - - // if len == 0, wait for the signal - for bh.Len() == 0 { - bh.cond.Wait() - } - - bh.swap(0, bh.len-1) - - item := (bh.items)[int(bh.len)-1] - bh.items = (bh).items[0 : int(bh.len)-1] - bh.fixDown(0, int(bh.len-2)) - - // reduce len - atomic.AddUint64(&bh.len, ^uint64(0)) - - bh.cond.L.Unlock() - return item -} diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go deleted file mode 100644 index ab0f9266..00000000 --- a/pkg/priority_queue/binary_heap_test.go +++ /dev/null @@ -1,154 +0,0 @@ -package priorityqueue - -import ( - "fmt" - "math/rand" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -type Test int - -func (t Test) Ack() error { - return nil -} - -func (t Test) Nack() error { - return nil -} - -func (t Test) Requeue(_ map[string][]string, _ int64) error { - return nil -} - -func (t Test) Body() []byte { - return nil -} - -func (t Test) Context() ([]byte, error) { - return nil, nil -} - -func (t Test) ID() string { - return "none" -} - -func (t Test) Priority() int64 { - return int64(t) -} - -func TestBinHeap_Init(t *testing.T) { - a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} - - bh := NewBinHeap(12) - - for i := 0; i < len(a); i++ { - bh.Insert(a[i]) - } - - expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)} - - res := make([]Item, 0, 12) - - for i := 0; i < 11; i++ { - item := bh.ExtractMin() - res = append(res, item) - } - - require.Equal(t, expected, res) -} - -func TestBinHeap_MaxLen(t *testing.T) { - a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} - - bh := NewBinHeap(1) - - go func() { - expected := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)} - - res := make([]Item, 0, 12) - - for i := 0; i < 11; i++ { - item := bh.ExtractMin() - res = append(res, item) - } - require.Equal(t, expected, res) - return - }() - - time.Sleep(time.Second) - for i := 0; i < len(a); i++ { - bh.Insert(a[i]) - } - - time.Sleep(time.Second) -} - -func TestNewPriorityQueue(t *testing.T) { - insertsPerSec := uint64(0) - getPerSec := uint64(0) - stopCh := make(chan struct{}, 1) - pq := NewBinHeap(1000) - - go func() { - tt3 := time.NewTicker(time.Millisecond * 10) - for { - select { - case <-tt3.C: - require.Less(t, pq.Len(), uint64(1002)) - case <-stopCh: - return - } - } - }() - - go func() { - tt := time.NewTicker(time.Second) - - for { - select { - case <-tt.C: - fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec))) - atomic.StoreUint64(&insertsPerSec, 0) - fmt.Println(fmt.Sprintf("ExtractMin per second: %d", atomic.LoadUint64(&getPerSec))) - atomic.StoreUint64(&getPerSec, 0) - case <-stopCh: - tt.Stop() - return - } - } - }() - - go func() { - for { - select { - case <-stopCh: - return - default: - pq.ExtractMin() - atomic.AddUint64(&getPerSec, 1) - } - } - }() - - go func() { - for { - select { - case <-stopCh: - return - default: - pq.Insert(Test(rand.Int())) //nolint:gosec - atomic.AddUint64(&insertsPerSec, 1) - } - } - }() - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - stopCh <- struct{}{} - stopCh <- struct{}{} - stopCh <- struct{}{} -} diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go deleted file mode 100644 index 9efa4652..00000000 --- a/pkg/priority_queue/interface.go +++ /dev/null @@ -1,31 +0,0 @@ -package priorityqueue - -type Queue interface { - Insert(item Item) - ExtractMin() Item - Len() uint64 -} - -// Item represents binary heap item -type Item interface { - // ID is a unique item identifier - ID() string - - // Priority returns the Item's priority to sort - Priority() int64 - - // Body is the Item payload - Body() []byte - - // Context is the Item meta information - Context() ([]byte, error) - - // Ack - acknowledge the Item after processing - Ack() error - - // Nack - discard the Item - Nack() error - - // Requeue - put the message back to the queue with the optional delay - Requeue(headers map[string][]string, delay int64) error -} diff --git a/pkg/state/job/state.go b/pkg/state/job/state.go deleted file mode 100644 index 56050084..00000000 --- a/pkg/state/job/state.go +++ /dev/null @@ -1,19 +0,0 @@ -package job - -// State represents job's state -type State struct { - // Pipeline name - Pipeline string - // Driver name - Driver string - // Queue name (tube for the beanstalk) - Queue string - // Active jobs which are consumed from the driver but not handled by the PHP worker yet - Active int64 - // Delayed jobs - Delayed int64 - // Reserved jobs which are in the driver but not consumed yet - Reserved int64 - // Status - 1 Ready, 0 - Paused - Ready bool -} diff --git a/pkg/state/process/state.go b/pkg/state/process/state.go deleted file mode 100644 index bfc3a287..00000000 --- a/pkg/state/process/state.go +++ /dev/null @@ -1,76 +0,0 @@ -package process - -import ( - "github.com/shirou/gopsutil/process" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -// State provides information about specific worker. -type State struct { - // Pid contains process id. - Pid int `json:"pid"` - - // Status of the worker. - Status string `json:"status"` - - // Number of worker executions. - NumJobs uint64 `json:"numExecs"` - - // Created is unix nano timestamp of worker creation time. - Created int64 `json:"created"` - - // MemoryUsage holds the information about worker memory usage in bytes. - // Values might vary for different operating systems and based on RSS. - MemoryUsage uint64 `json:"memoryUsage"` - - // CPU_Percent returns how many percent of the CPU time this process uses - CPUPercent float64 - - // Command used in the service plugin and shows a command for the particular service - Command string -} - -// WorkerProcessState creates new worker state definition. -func WorkerProcessState(w worker.BaseProcess) (*State, error) { - const op = errors.Op("worker_process_state") - p, _ := process.NewProcess(int32(w.Pid())) - i, err := p.MemoryInfo() - if err != nil { - return nil, errors.E(op, err) - } - - percent, err := p.CPUPercent() - if err != nil { - return nil, err - } - - return &State{ - CPUPercent: percent, - Pid: int(w.Pid()), - Status: w.State().String(), - NumJobs: w.State().NumExecs(), - Created: w.Created().UnixNano(), - MemoryUsage: i.RSS, - }, nil -} - -func GeneralProcessState(pid int, command string) (State, error) { - const op = errors.Op("process_state") - p, _ := process.NewProcess(int32(pid)) - i, err := p.MemoryInfo() - if err != nil { - return State{}, errors.E(op, err) - } - percent, err := p.CPUPercent() - if err != nil { - return State{}, err - } - - return State{ - CPUPercent: percent, - Pid: pid, - MemoryUsage: i.RSS, - Command: command, - }, nil -} diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go deleted file mode 100644 index 1b072378..00000000 --- a/pkg/transport/interface.go +++ /dev/null @@ -1,21 +0,0 @@ -package transport - -import ( - "context" - "os/exec" - - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -// Factory is responsible for wrapping given command into tasks WorkerProcess. -type Factory interface { - // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context. - // Process must not be started. - SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error) - // SpawnWorker creates new WorkerProcess process based on given command. - // Process must not be started. - SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error) - // Close the factory and underlying connections. - Close() error -} diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go deleted file mode 100755 index 9433a510..00000000 --- a/pkg/transport/pipe/pipe_factory.go +++ /dev/null @@ -1,197 +0,0 @@ -package pipe - -import ( - "context" - "os/exec" - - "github.com/spiral/errors" - "github.com/spiral/goridge/v3/pkg/pipe" - "github.com/spiral/roadrunner/v2/internal" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/worker" - "go.uber.org/multierr" -) - -// Factory connects to stack using standard -// streams (STDIN, STDOUT pipes). -type Factory struct{} - -// NewPipeFactory returns new factory instance and starts -// listening -func NewPipeFactory() *Factory { - return &Factory{} -} - -type sr struct { - w *worker.Process - err error -} - -// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, -// method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit - spCh := make(chan sr) - const op = errors.Op("factory_spawn_worker_with_timeout") - go func() { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, err), - }: - return - default: - return - } - } - - in, err := cmd.StdoutPipe() - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, err), - }: - return - default: - return - } - } - - out, err := cmd.StdinPipe() - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, err), - }: - return - default: - return - } - } - - // Init new PIPE relay - relay := pipe.NewPipeRelay(in, out) - w.AttachRelay(relay) - - // Start the worker - err = w.Start() - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, err), - }: - return - default: - return - } - } - - pid, err := internal.FetchPID(relay) - if err != nil { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, err), - }: - return - default: - _ = w.Kill() - return - } - } - - if pid != w.Pid() { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())), - }: - return - default: - _ = w.Kill() - return - } - } - - select { - case - // return worker - spCh <- sr{ - w: w, - err: nil, - }: - // everything ok, set ready state - w.State().Set(worker.StateReady) - return - default: - _ = w.Kill() - return - } - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case res := <-spCh: - if res.err != nil { - return nil, res.err - } - return res.w, nil - } -} - -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - const op = errors.Op("factory_spawn_worker") - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) - if err != nil { - return nil, errors.E(op, err) - } - - in, err := cmd.StdoutPipe() - if err != nil { - return nil, errors.E(op, err) - } - - out, err := cmd.StdinPipe() - if err != nil { - return nil, errors.E(op, err) - } - - // Init new PIPE relay - relay := pipe.NewPipeRelay(in, out) - w.AttachRelay(relay) - - // Start the worker - err = w.Start() - if err != nil { - return nil, errors.E(op, err) - } - - // errors bundle - if pid, err := internal.FetchPID(relay); pid != w.Pid() { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - return nil, errors.E(op, err) - } - - // everything ok, set ready state - w.State().Set(worker.StateReady) - return w, nil -} - -// Close the factory. -func (f *Factory) Close() error { - return nil -} diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go deleted file mode 100644 index f5e9669b..00000000 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ /dev/null @@ -1,461 +0,0 @@ -package pipe - -import ( - "os/exec" - "strings" - "sync" - "testing" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/stretchr/testify/assert" -) - -func Test_GetState2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - assert.Equal(t, worker.StateStopped, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, worker.StateReady, w.State().Value()) - assert.NoError(t, w.Stop()) -} - -func Test_Kill2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.Error(t, w.Wait()) - assert.Equal(t, worker.StateErrored, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, worker.StateReady, w.State().Value()) - err = w.Kill() - if err != nil { - t.Errorf("error killing the Process: error %v", err) - } - wg.Wait() -} - -func Test_Pipe_Start2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - assert.NoError(t, w.Stop()) -} - -func Test_Pipe_StartError2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - err := cmd.Start() - if err != nil { - t.Errorf("error running the command: error %v", err) - } - - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError3(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError4(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Failboot2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } - w, err := NewPipeFactory().SpawnWorker(cmd, listener) - - assert.Nil(t, w) - assert.Error(t, err) - <-finish -} - -func Test_Pipe_Invalid2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/invalid.php") - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Echo2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Pipe_Broken2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res) -} - -func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { - f := NewPipeFactory() - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - w, _ := f.SpawnWorker(cmd) - go func() { - if w.Wait() != nil { - b.Fail() - } - }() - - err := w.Stop() - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - sw := worker.From(w) - - b.ReportAllocs() - b.ResetTimer() - go func() { - err := w.Wait() - if err != nil { - b.Errorf("error waiting the worker: error %v", err) - } - }() - defer func() { - err := w.Stop() - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - }() - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Test_Echo2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - - sw := worker.From(w) - - go func() { - assert.NoError(t, sw.Wait()) - }() - defer func() { - err = sw.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_BadPayload2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - - sw := worker.From(w) - - go func() { - assert.NoError(t, sw.Wait()) - }() - defer func() { - err := sw.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - res, err := sw.Exec(&payload.Payload{}) - - assert.Error(t, err) - assert.Nil(t, res) - - assert.Contains(t, err.Error(), "payload can not be empty") -} - -func Test_String2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes") - assert.Contains(t, w.String(), "ready") - assert.Contains(t, w.String(), "numExecs: 0") -} - -func Test_Echo_Slow2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Broken2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") - data := "" - mu := &sync.Mutex{} - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - } - - w, err := NewPipeFactory().SpawnWorker(cmd, listener) - if err != nil { - t.Fatal(err) - } - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res) - - time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { - t.Fail() - } - mu.Unlock() - assert.Error(t, w.Stop()) -} - -func Test_Error2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res) - - if errors.Is(errors.SoftJob, err) == false { - t.Fatal("error should be of type errors.ErrSoftJob") - } - assert.Contains(t, err.Error(), "hello") -} - -func Test_NumExecs2(t *testing.T) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, uint64(1), w.State().NumExecs()) - - _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, uint64(2), w.State().NumExecs()) - - _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, uint64(3), w.State().NumExecs()) -} diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go deleted file mode 100755 index e396fe57..00000000 --- a/pkg/transport/pipe/pipe_factory_test.go +++ /dev/null @@ -1,503 +0,0 @@ -package pipe - -import ( - "context" - "os/exec" - "strings" - "sync" - "testing" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/stretchr/testify/assert" -) - -func Test_GetState(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - assert.Equal(t, worker.StateStopped, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, worker.StateReady, w.State().Value()) - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Kill(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.Error(t, w.Wait()) - assert.Equal(t, worker.StateErrored, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, worker.StateReady, w.State().Value()) - err = w.Kill() - if err != nil { - t.Errorf("error killing the Process: error %v", err) - } - wg.Wait() -} - -func Test_Pipe_Start(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - assert.NoError(t, w.Stop()) -} - -func Test_Pipe_StartError(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - err := cmd.Start() - if err != nil { - t.Errorf("error running the command: error %v", err) - } - - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError2(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - // error cause - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Failboot(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../../tests/failboot.php") - ctx := context.Background() - - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) - - assert.Nil(t, w) - assert.Error(t, err) - <-finish -} - -func Test_Pipe_Invalid(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../../tests/invalid.php") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Echo(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Pipe_Broken(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res) -} - -func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { - f := NewPipeFactory() - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) - go func() { - if w.Wait() != nil { - b.Fail() - } - }() - - err := w.Stop() - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd) - sw := worker.From(w) - - b.ReportAllocs() - b.ResetTimer() - go func() { - err := w.Wait() - if err != nil { - b.Errorf("error waiting the worker: error %v", err) - } - }() - defer func() { - err := w.Stop() - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - }() - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Test_Echo(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - - sw := worker.From(w) - go func() { - assert.NoError(t, sw.Wait()) - }() - defer func() { - err = sw.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_BadPayload(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - - sw := worker.From(w) - - go func() { - assert.NoError(t, sw.Wait()) - }() - defer func() { - err := sw.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - res, err := sw.Exec(&payload.Payload{}) - - assert.Error(t, err) - assert.Nil(t, res) - - assert.Contains(t, err.Error(), "payload can not be empty") -} - -func Test_String(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes") - assert.Contains(t, w.String(), "ready") - assert.Contains(t, w.String(), "numExecs: 0") -} - -func Test_Echo_Slow(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Broken(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") - data := "" - mu := &sync.Mutex{} - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - } - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) - if err != nil { - t.Fatal(err) - } - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res) - - time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { - t.Fail() - } - mu.Unlock() - assert.Error(t, w.Stop()) -} - -func Test_Error(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res) - - if errors.Is(errors.SoftJob, err) == false { - t.Fatal("error should be of type errors.ErrSoftJob") - } - assert.Contains(t, err.Error(), "hello") -} - -func Test_NumExecs(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, uint64(1), w.State().NumExecs()) - - _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, uint64(2), w.State().NumExecs()) - - _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, uint64(3), w.State().NumExecs()) -} diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go deleted file mode 100755 index dc2b75cf..00000000 --- a/pkg/transport/socket/socket_factory.go +++ /dev/null @@ -1,255 +0,0 @@ -package socket - -import ( - "context" - "fmt" - "net" - "os/exec" - "sync" - "time" - - "github.com/shirou/gopsutil/process" - "github.com/spiral/errors" - "github.com/spiral/goridge/v3/pkg/relay" - "github.com/spiral/goridge/v3/pkg/socket" - "github.com/spiral/roadrunner/v2/internal" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/worker" - - "go.uber.org/multierr" - "golang.org/x/sync/errgroup" -) - -// Factory connects to external stack using socket server. -type Factory struct { - // listens for incoming connections from underlying processes - ls net.Listener - - // relay connection timeout - tout time.Duration - - // sockets which are waiting for process association - relays sync.Map -} - -// NewSocketServer returns Factory attached to a given socket listener. -// tout specifies for how long factory should serve for incoming relay connection -func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { - f := &Factory{ - ls: ls, - tout: tout, - relays: sync.Map{}, - } - - // Be careful - // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go - // https://github.com/golang/go/issues/5045 - go func() { - err := f.listen() - // there is no logger here, use fmt - if err != nil { - fmt.Printf("[WARN]: socket server listen, error: %v\n", err) - } - }() - - return f -} - -// blocking operation, returns an error -func (f *Factory) listen() error { - errGr := &errgroup.Group{} - errGr.Go(func() error { - for { - conn, err := f.ls.Accept() - if err != nil { - return err - } - - rl := socket.NewSocketRelay(conn) - pid, err := internal.FetchPID(rl) - if err != nil { - return err - } - - f.attachRelayToPid(pid, rl) - } - }) - - return errGr.Wait() -} - -type socketSpawn struct { - w *worker.Process - err error -} - -// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - const op = errors.Op("factory_spawn_worker_with_timeout") - c := make(chan socketSpawn) - go func() { - ctxT, cancel := context.WithTimeout(ctx, f.tout) - defer cancel() - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) - if err != nil { - select { - case c <- socketSpawn{ - w: nil, - err: errors.E(op, err), - }: - return - default: - return - } - } - - err = w.Start() - if err != nil { - select { - case c <- socketSpawn{ - w: nil, - err: errors.E(op, err), - }: - return - default: - return - } - } - - rl, err := f.findRelayWithContext(ctxT, w) - if err != nil { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - - select { - // try to write result - case c <- socketSpawn{ - w: nil, - err: errors.E(op, err), - }: - return - // if no receivers - return - default: - return - } - } - - w.AttachRelay(rl) - w.State().Set(worker.StateReady) - - select { - case c <- socketSpawn{ - w: w, - err: nil, - }: - return - default: - _ = w.Kill() - return - } - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case res := <-c: - if res.err != nil { - return nil, res.err - } - - return res.w, nil - } -} - -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - const op = errors.Op("factory_spawn_worker") - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) - if err != nil { - return nil, err - } - - err = w.Start() - if err != nil { - return nil, errors.E(op, err) - } - - rl, err := f.findRelay(w) - if err != nil { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - return nil, err - } - - w.AttachRelay(rl) - - // errors bundle - if pid, err := internal.FetchPID(rl); pid != w.Pid() { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - return nil, errors.E(op, err) - } - - w.State().Set(worker.StateReady) - - return w, nil -} - -// Close socket factory and underlying socket connection. -func (f *Factory) Close() error { - return f.ls.Close() -} - -// waits for Process to connect over socket and returns associated relay of timeout -func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { - ticker := time.NewTicker(time.Millisecond * 10) - for { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-ticker.C: - // check for the process exists - _, err := process.NewProcess(int32(w.Pid())) - if err != nil { - return nil, err - } - default: - tmp, ok := f.relays.LoadAndDelete(w.Pid()) - if !ok { - continue - } - return tmp.(*socket.Relay), nil - } - } -} - -func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { - const op = errors.Op("factory_find_relay") - // poll every 1ms for the relay - pollDone := time.NewTimer(f.tout) - for { - select { - case <-pollDone.C: - return nil, errors.E(op, errors.Str("relay timeout")) - default: - tmp, ok := f.relays.Load(w.Pid()) - if !ok { - continue - } - return tmp.(*socket.Relay), nil - } - } -} - -// chan to store relay associated with specific pid -func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { - f.relays.Store(pid, relay) -} diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go deleted file mode 100644 index 905a3b6b..00000000 --- a/pkg/transport/socket/socket_factory_spawn_test.go +++ /dev/null @@ -1,533 +0,0 @@ -package socket - -import ( - "net" - "os/exec" - "strings" - "sync" - "syscall" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/stretchr/testify/assert" -) - -func Test_Tcp_Start2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Tcp_StartCloseFactory2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") - - f := NewSocketServer(ls, time.Minute) - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - w, err := f.SpawnWorker(cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Tcp_StartError2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - err = cmd.Start() - if err != nil { - t.Errorf("error executing the command: error %v", err) - } - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Tcp_Failboot2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err3 := ls.Close() - if err3 != nil { - t.Errorf("error closing the listener: error %v", err3) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/failboot.php") - - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } - - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) - assert.Nil(t, w) - assert.Error(t, err2) - <-finish -} - -func Test_Tcp_Invalid2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/invalid.php") - - w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Tcp_Broken2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") - - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) - if err != nil { - t.Fatal(err) - } - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - errW := w.Wait() - assert.Error(t, errW) - }() - - defer func() { - time.Sleep(time.Second) - err2 := w.Stop() - // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection - assert.Error(t, err2) - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res) - wg.Wait() - <-finish -} - -func Test_Tcp_Echo2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") - - w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Unix_Start2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err = ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Unix_Failboot2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err = ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../../tests/failboot.php") - - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } - - w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) - assert.Nil(t, w) - assert.Error(t, err) - <-finish -} - -func Test_Unix_Timeout2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err = ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0") - - w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd) - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "relay timeout") -} - -func Test_Unix_Invalid2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err = ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../../tests/invalid.php") - - w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Unix_Broken2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - errC := ls.Close() - assert.NoError(t, errC) - }() - - cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") - - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) - if err != nil { - t.Fatal(err) - } - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - errW := w.Wait() - assert.Error(t, errW) - }() - - defer func() { - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res) - wg.Wait() - <-finish -} - -func Test_Unix_Echo2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err = ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(b, err) - defer func() { - err = ls.Close() - assert.NoError(b, err) - }() - - f := NewSocketServer(ls, time.Minute) - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") - - w, err := f.SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - go func() { - assert.NoError(b, w.Wait()) - }() - - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - } -} - -func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(b, err) - defer func() { - err = ls.Close() - assert.NoError(b, err) - }() - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { - defer func() { - _ = syscall.Unlink("sock.unix") - }() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - errC := ls.Close() - if errC != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - f := NewSocketServer(ls, time.Minute) - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") - - w, err := f.SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - } -} - -func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { - defer func() { - _ = syscall.Unlink("sock.unix") - }() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - errC := ls.Close() - if errC != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go deleted file mode 100755 index 17437e2f..00000000 --- a/pkg/transport/socket/socket_factory_test.go +++ /dev/null @@ -1,622 +0,0 @@ -package socket - -import ( - "context" - "net" - "os/exec" - "strings" - "sync" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/stretchr/testify/assert" -) - -func Test_Tcp_Start(t *testing.T) { - ctx := context.Background() - time.Sleep(time.Millisecond * 10) // to ensure free socket - - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Tcp_StartCloseFactory(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") - - f := NewSocketServer(ls, time.Minute) - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - w, err := f.SpawnWorkerWithTimeout(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Tcp_StartError(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") - err = cmd.Start() - if err != nil { - t.Errorf("error executing the command: error %v", err) - } - - serv := NewSocketServer(ls, time.Minute) - time.Sleep(time.Second * 2) - w, err := serv.SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Tcp_Failboot(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err3 := ls.Close() - if err3 != nil { - t.Errorf("error closing the listener: error %v", err3) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/failboot.php") - - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } - - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) - assert.Nil(t, w) - assert.Error(t, err2) - <-finish -} - -func Test_Tcp_Timeout(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "tcp", "200", "0") - - w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd) - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "context deadline exceeded") -} - -func Test_Tcp_Invalid(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/invalid.php") - - w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Tcp_Broken(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") - - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) - if err != nil { - t.Fatal(err) - } - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - errW := w.Wait() - assert.Error(t, errW) - }() - - defer func() { - time.Sleep(time.Second) - err2 := w.Stop() - // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection - assert.Error(t, err2) - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res) - wg.Wait() - <-finish -} - -func Test_Tcp_Echo(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") - - w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Unix_Start(t *testing.T) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Unix_Failboot(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - ctx := context.Background() - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/failboot.php") - - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } - - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) - assert.Nil(t, w) - assert.Error(t, err) - <-finish -} - -func Test_Unix_Timeout(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - ctx := context.Background() - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0") - - w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd) - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "context deadline exceeded") -} - -func Test_Unix_Invalid(t *testing.T) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/invalid.php") - - w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Unix_Broken(t *testing.T) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") - - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerStderr { - e := string(wev.Payload.([]byte)) - if strings.ContainsAny(e, "undefined_function()") { - block <- struct{}{} - return - } - } - } - } - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) - if err != nil { - t.Fatal(err) - } - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - errW := w.Wait() - assert.Error(t, errW) - }() - - defer func() { - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res) - <-block - wg.Wait() -} - -func Test_Unix_Echo(t *testing.T) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - f := NewSocketServer(ls, time.Minute) - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") - - w, err := f.SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - go func() { - assert.NoError(b, w.Wait()) - }() - - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - } -} - -func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - f := NewSocketServer(ls, time.Minute) - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") - - w, err := f.SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - } -} - -func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go deleted file mode 100644 index ed8704bb..00000000 --- a/pkg/worker/interface.go +++ /dev/null @@ -1,74 +0,0 @@ -package worker - -import ( - "context" - "fmt" - "time" - - "github.com/spiral/goridge/v3/pkg/relay" - "github.com/spiral/roadrunner/v2/pkg/payload" -) - -// State represents WorkerProcess status and updated time. -type State interface { - fmt.Stringer - // Value returns StateImpl value - Value() int64 - // Set sets the StateImpl - Set(value int64) - // NumExecs shows how many times WorkerProcess was invoked - NumExecs() uint64 - // IsActive returns true if WorkerProcess not Inactive or Stopped - IsActive() bool - // RegisterExec using to registering php executions - RegisterExec() - // SetLastUsed sets worker last used time - SetLastUsed(lu uint64) - // LastUsed return worker last used time - LastUsed() uint64 -} - -type BaseProcess interface { - fmt.Stringer - - // Pid returns worker pid. - Pid() int64 - - // Created returns time worker was created at. - Created() time.Time - - // State return receive-only WorkerProcess state object, state can be used to safely access - // WorkerProcess status, time when status changed and number of WorkerProcess executions. - State() State - - // Start used to run Cmd and immediately return - Start() error - - // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is - // complete and will return process error (if any), if stderr is presented it's value - // will be wrapped as WorkerError. Method will return error code if php process fails - // to find or Start the script. - Wait() error - - // Stop sends soft termination command to the WorkerProcess and waits for process completion. - Stop() error - - // Kill kills underlying process, make sure to call Wait() func to gather - // error log from the stderr. Does not waits for process completion! - Kill() error - - // Relay returns attached to worker goridge relay - Relay() relay.Relay - - // AttachRelay used to attach goridge relay to the worker process - AttachRelay(rl relay.Relay) -} - -type SyncWorker interface { - // BaseProcess provides basic functionality for the SyncWorker - BaseProcess - // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS - Exec(rqs *payload.Payload) (*payload.Payload, error) - // ExecWithTTL used to handle Exec with TTL - ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) -} diff --git a/pkg/worker/state.go b/pkg/worker/state.go deleted file mode 100755 index bf152e8b..00000000 --- a/pkg/worker/state.go +++ /dev/null @@ -1,111 +0,0 @@ -package worker - -import ( - "sync/atomic" -) - -// SYNC WITH worker_watcher.GET -const ( - // StateInactive - no associated process - StateInactive int64 = iota - - // StateReady - ready for job. - StateReady - - // StateWorking - working on given payload. - StateWorking - - // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. - StateInvalid - - // StateStopping - process is being softly stopped. - StateStopping - - // StateKilling - process is being forcibly stopped - StateKilling - - // StateDestroyed State of worker, when no need to allocate new one - StateDestroyed - - // StateMaxJobsReached State of worker, when it reached executions limit - StateMaxJobsReached - - // StateStopped - process has been terminated. - StateStopped - - // StateErrored - error StateImpl (can't be used). - StateErrored -) - -type StateImpl struct { - value int64 - numExecs uint64 - // to be lightweight, use UnixNano - lastUsed uint64 -} - -// NewWorkerState initializes a state for the sync.Worker -func NewWorkerState(value int64) *StateImpl { - return &StateImpl{value: value} -} - -// String returns current StateImpl as string. -func (s *StateImpl) String() string { - switch s.Value() { - case StateInactive: - return "inactive" - case StateReady: - return "ready" - case StateWorking: - return "working" - case StateInvalid: - return "invalid" - case StateStopping: - return "stopping" - case StateStopped: - return "stopped" - case StateKilling: - return "killing" - case StateErrored: - return "errored" - case StateDestroyed: - return "destroyed" - } - - return "undefined" -} - -// NumExecs returns number of registered WorkerProcess execs. -func (s *StateImpl) NumExecs() uint64 { - return atomic.LoadUint64(&s.numExecs) -} - -// Value StateImpl returns StateImpl value -func (s *StateImpl) Value() int64 { - return atomic.LoadInt64(&s.value) -} - -// IsActive returns true if WorkerProcess not Inactive or Stopped -func (s *StateImpl) IsActive() bool { - val := s.Value() - return val == StateWorking || val == StateReady -} - -// Set change StateImpl value (status) -func (s *StateImpl) Set(value int64) { - atomic.StoreInt64(&s.value, value) -} - -// RegisterExec register new execution atomically -func (s *StateImpl) RegisterExec() { - atomic.AddUint64(&s.numExecs, 1) -} - -// SetLastUsed Update last used time -func (s *StateImpl) SetLastUsed(lu uint64) { - atomic.StoreUint64(&s.lastUsed, lu) -} - -func (s *StateImpl) LastUsed() uint64 { - return atomic.LoadUint64(&s.lastUsed) -} diff --git a/pkg/worker/state_test.go b/pkg/worker/state_test.go deleted file mode 100755 index c67182d6..00000000 --- a/pkg/worker/state_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package worker - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_NewState(t *testing.T) { - st := NewWorkerState(StateErrored) - - assert.Equal(t, "errored", st.String()) - - assert.Equal(t, "inactive", NewWorkerState(StateInactive).String()) - assert.Equal(t, "ready", NewWorkerState(StateReady).String()) - assert.Equal(t, "working", NewWorkerState(StateWorking).String()) - assert.Equal(t, "stopped", NewWorkerState(StateStopped).String()) - assert.Equal(t, "undefined", NewWorkerState(1000).String()) -} - -func Test_IsActive(t *testing.T) { - assert.False(t, NewWorkerState(StateInactive).IsActive()) - assert.True(t, NewWorkerState(StateReady).IsActive()) - assert.True(t, NewWorkerState(StateWorking).IsActive()) - assert.False(t, NewWorkerState(StateStopped).IsActive()) - assert.False(t, NewWorkerState(StateErrored).IsActive()) -} diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go deleted file mode 100755 index 74e29b71..00000000 --- a/pkg/worker/sync_worker.go +++ /dev/null @@ -1,283 +0,0 @@ -package worker - -import ( - "bytes" - "context" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/goridge/v3/pkg/relay" - "github.com/spiral/roadrunner/v2/pkg/payload" - "go.uber.org/multierr" -) - -// Allocator is responsible for worker allocation in the pool -type Allocator func() (SyncWorker, error) - -type SyncWorkerImpl struct { - process *Process - fPool sync.Pool - bPool sync.Pool -} - -// From creates SyncWorker from BaseProcess -func From(process *Process) *SyncWorkerImpl { - return &SyncWorkerImpl{ - process: process, - fPool: sync.Pool{New: func() interface{} { - return frame.NewFrame() - }}, - bPool: sync.Pool{New: func() interface{} { - return new(bytes.Buffer) - }}, - } -} - -// Exec payload without TTL timeout. -func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("sync_worker_exec") - if len(p.Body) == 0 && len(p.Context) == 0 { - return nil, errors.E(op, errors.Str("payload can not be empty")) - } - - if tw.process.State().Value() != StateReady { - return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) - } - - // set last used time - tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(StateWorking) - - rsp, err := tw.execPayload(p) - if err != nil { - // just to be more verbose - if !errors.Is(errors.SoftJob, err) { - tw.process.State().Set(StateErrored) - tw.process.State().RegisterExec() - } - return nil, errors.E(op, err) - } - - // supervisor may set state of the worker during the work - // in this case we should not re-write the worker state - if tw.process.State().Value() != StateWorking { - tw.process.State().RegisterExec() - return rsp, nil - } - - tw.process.State().Set(StateReady) - tw.process.State().RegisterExec() - - return rsp, nil -} - -type wexec struct { - payload *payload.Payload - err error -} - -// ExecWithTTL executes payload without TTL timeout. -func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("sync_worker_exec_worker_with_timeout") - c := make(chan wexec, 1) - - go func() { - if len(p.Body) == 0 && len(p.Context) == 0 { - c <- wexec{ - err: errors.E(op, errors.Str("payload can not be empty")), - } - return - } - - if tw.process.State().Value() != StateReady { - c <- wexec{ - err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), - } - return - } - - // set last used time - tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(StateWorking) - - rsp, err := tw.execPayload(p) - if err != nil { - // just to be more verbose - if errors.Is(errors.SoftJob, err) == false { //nolint:gosimple - tw.process.State().Set(StateErrored) - tw.process.State().RegisterExec() - } - c <- wexec{ - err: errors.E(op, err), - } - return - } - - if tw.process.State().Value() != StateWorking { - tw.process.State().RegisterExec() - c <- wexec{ - payload: rsp, - err: nil, - } - return - } - - tw.process.State().Set(StateReady) - tw.process.State().RegisterExec() - - c <- wexec{ - payload: rsp, - err: nil, - } - }() - - select { - // exec TTL reached - case <-ctx.Done(): - err := multierr.Combine(tw.Kill()) - if err != nil { - // append timeout error - err = multierr.Append(err, errors.E(op, errors.ExecTTL)) - return nil, multierr.Append(err, ctx.Err()) - } - return nil, errors.E(op, errors.ExecTTL, ctx.Err()) - case res := <-c: - if res.err != nil { - return nil, res.err - } - return res.payload, nil - } -} - -func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("sync_worker_exec_payload") - - // get a frame - fr := tw.getFrame() - defer tw.putFrame(fr) - - // can be 0 here - fr.WriteVersion(fr.Header(), frame.VERSION_1) - - // obtain a buffer - buf := tw.get() - - buf.Write(p.Context) - buf.Write(p.Body) - - // Context offset - fr.WriteOptions(fr.HeaderPtr(), uint32(len(p.Context))) - fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) - fr.WritePayload(buf.Bytes()) - - fr.WriteCRC(fr.Header()) - - // return buffer - tw.put(buf) - - err := tw.Relay().Send(fr) - if err != nil { - return nil, errors.E(op, errors.Network, err) - } - - frameR := tw.getFrame() - defer tw.putFrame(frameR) - - err = tw.process.Relay().Receive(frameR) - if err != nil { - return nil, errors.E(op, errors.Network, err) - } - if frameR == nil { - return nil, errors.E(op, errors.Network, errors.Str("nil fr received")) - } - - if !frameR.VerifyCRC(frameR.Header()) { - return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC")) - } - - flags := frameR.ReadFlags() - - if flags&frame.ERROR != byte(0) { - return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) - } - - options := frameR.ReadOptions(frameR.Header()) - if len(options) != 1 { - return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) - } - - pld := &payload.Payload{ - Body: make([]byte, len(frameR.Payload()[options[0]:])), - Context: make([]byte, len(frameR.Payload()[:options[0]])), - } - - // by copying we free frame's payload slice - // we do not hold the pointer from the smaller slice to the initial (which should be in the sync.Pool) - // https://blog.golang.org/slices-intro#TOC_6. - copy(pld.Body, frameR.Payload()[options[0]:]) - copy(pld.Context, frameR.Payload()[:options[0]]) - - return pld, nil -} - -func (tw *SyncWorkerImpl) String() string { - return tw.process.String() -} - -func (tw *SyncWorkerImpl) Pid() int64 { - return tw.process.Pid() -} - -func (tw *SyncWorkerImpl) Created() time.Time { - return tw.process.Created() -} - -func (tw *SyncWorkerImpl) State() State { - return tw.process.State() -} - -func (tw *SyncWorkerImpl) Start() error { - return tw.process.Start() -} - -func (tw *SyncWorkerImpl) Wait() error { - return tw.process.Wait() -} - -func (tw *SyncWorkerImpl) Stop() error { - return tw.process.Stop() -} - -func (tw *SyncWorkerImpl) Kill() error { - return tw.process.Kill() -} - -func (tw *SyncWorkerImpl) Relay() relay.Relay { - return tw.process.Relay() -} - -func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) { - tw.process.AttachRelay(rl) -} - -// Private - -func (tw *SyncWorkerImpl) get() *bytes.Buffer { - return tw.bPool.Get().(*bytes.Buffer) -} - -func (tw *SyncWorkerImpl) put(b *bytes.Buffer) { - b.Reset() - tw.bPool.Put(b) -} - -func (tw *SyncWorkerImpl) getFrame() *frame.Frame { - return tw.fPool.Get().(*frame.Frame) -} - -func (tw *SyncWorkerImpl) putFrame(f *frame.Frame) { - f.Reset() - tw.fPool.Put(f) -} diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go deleted file mode 100755 index 64580f9f..00000000 --- a/pkg/worker/sync_worker_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package worker - -import ( - "os/exec" - "testing" - - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/stretchr/testify/assert" -) - -func Test_NotStarted_String(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := InitBaseWorker(cmd) - assert.Contains(t, w.String(), "php tests/client.php echo pipes") - assert.Contains(t, w.String(), "inactive") - assert.Contains(t, w.String(), "numExecs: 0") -} - -func Test_NotStarted_Exec(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := InitBaseWorker(cmd) - - sw := From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res) - - assert.Contains(t, err.Error(), "Process is not ready (inactive)") -} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go deleted file mode 100755 index fa74e7b5..00000000 --- a/pkg/worker/worker.go +++ /dev/null @@ -1,220 +0,0 @@ -package worker - -import ( - "fmt" - "os" - "os/exec" - "strconv" - "strings" - "time" - - "github.com/spiral/errors" - "github.com/spiral/goridge/v3/pkg/relay" - "github.com/spiral/roadrunner/v2/internal" - "github.com/spiral/roadrunner/v2/pkg/events" - "go.uber.org/multierr" -) - -type Options func(p *Process) - -// Process - supervised process with api over goridge.Relay. -type Process struct { - // created indicates at what time Process has been created. - created time.Time - - // updates parent supervisor or pool about Process events - events events.Handler - - // state holds information about current Process state, - // number of Process executions, buf status change time. - // publicly this object is receive-only and protected using Mutex - // and atomic counter. - state *StateImpl - - // underlying command with associated process, command must be - // provided to Process from outside in non-started form. CmdSource - // stdErr direction will be handled by Process to aggregate error message. - cmd *exec.Cmd - - // pid of the process, points to pid of underlying process and - // can be nil while process is not started. - pid int - - // communication bus with underlying process. - relay relay.Relay -} - -// InitBaseWorker creates new Process over given exec.cmd. -func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { - if cmd.Process != nil { - return nil, fmt.Errorf("can't attach to running process") - } - w := &Process{ - created: time.Now(), - events: events.NewEventsHandler(), - cmd: cmd, - state: NewWorkerState(StateInactive), - } - - // set self as stderr implementation (Writer interface) - w.cmd.Stderr = w - - // add options - for i := 0; i < len(options); i++ { - options[i](w) - } - - return w, nil -} - -func AddListeners(listeners ...events.Listener) Options { - return func(p *Process) { - for i := 0; i < len(listeners); i++ { - p.addListener(listeners[i]) - } - } -} - -// Pid returns worker pid. -func (w *Process) Pid() int64 { - return int64(w.pid) -} - -// Created returns time worker was created at. -func (w *Process) Created() time.Time { - return w.created -} - -// AddListener registers new worker event listener. -func (w *Process) addListener(listener events.Listener) { - w.events.AddListener(listener) -} - -// State return receive-only Process state object, state can be used to safely access -// Process status, time when status changed and number of Process executions. -func (w *Process) State() State { - return w.state -} - -// AttachRelay attaches relay to the worker -func (w *Process) AttachRelay(rl relay.Relay) { - w.relay = rl -} - -// Relay returns relay attached to the worker -func (w *Process) Relay() relay.Relay { - return w.relay -} - -// String returns Process description. fmt.Stringer interface -func (w *Process) String() string { - st := w.state.String() - // we can safely compare pid to 0 - if w.pid != 0 { - st = st + ", pid:" + strconv.Itoa(w.pid) - } - - return fmt.Sprintf( - "(`%s` [%s], numExecs: %v)", - strings.Join(w.cmd.Args, " "), - st, - w.state.NumExecs(), - ) -} - -func (w *Process) Start() error { - err := w.cmd.Start() - if err != nil { - return err - } - w.pid = w.cmd.Process.Pid - return nil -} - -// Wait must be called once for each Process, call will be released once Process is -// complete and will return process error (if any), if stderr is presented it's value -// will be wrapped as WorkerError. Method will return error code if php process fails -// to find or Start the script. -func (w *Process) Wait() error { - const op = errors.Op("process_wait") - var err error - err = w.cmd.Wait() - - // If worker was destroyed, just exit - if w.State().Value() == StateDestroyed { - return nil - } - - // If state is different, and err is not nil, append it to the errors - if err != nil { - w.State().Set(StateErrored) - err = multierr.Combine(err, errors.E(op, err)) - } - - // closeRelay - // at this point according to the documentation (see cmd.Wait comment) - // if worker finishes with an error, message will be written to the stderr first - // and then process.cmd.Wait return an error - err2 := w.closeRelay() - if err2 != nil { - w.State().Set(StateErrored) - return multierr.Append(err, errors.E(op, err2)) - } - - if w.cmd.ProcessState.Success() { - w.State().Set(StateStopped) - return nil - } - - return err -} - -func (w *Process) closeRelay() error { - if w.relay != nil { - err := w.relay.Close() - if err != nil { - return err - } - } - return nil -} - -// Stop sends soft termination command to the Process and waits for process completion. -func (w *Process) Stop() error { - const op = errors.Op("process_stop") - w.state.Set(StateStopping) - err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true}) - if err != nil { - w.state.Set(StateKilling) - _ = w.cmd.Process.Signal(os.Kill) - return errors.E(op, errors.Network, err) - } - w.state.Set(StateStopped) - return nil -} - -// Kill kills underlying process, make sure to call Wait() func to gather -// error log from the stderr. Does not wait for process completion! -func (w *Process) Kill() error { - if w.State().Value() == StateDestroyed { - err := w.cmd.Process.Signal(os.Kill) - if err != nil { - return err - } - return nil - } - - w.state.Set(StateKilling) - err := w.cmd.Process.Signal(os.Kill) - if err != nil { - return err - } - w.state.Set(StateStopped) - return nil -} - -// Worker stderr -func (w *Process) Write(p []byte) (n int, err error) { - w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p}) - return len(p), nil -} diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go deleted file mode 100755 index 805f66b5..00000000 --- a/pkg/worker/worker_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package worker - -import ( - "os/exec" - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_OnStarted(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "broken", "pipes") - assert.Nil(t, cmd.Start()) - - w, err := InitBaseWorker(cmd) - assert.Nil(t, w) - assert.NotNil(t, err) - - assert.Equal(t, "can't attach to running process", err.Error()) -} diff --git a/pkg/worker_handler/constants.go b/pkg/worker_handler/constants.go deleted file mode 100644 index 3355d9c2..00000000 --- a/pkg/worker_handler/constants.go +++ /dev/null @@ -1,8 +0,0 @@ -package handler - -import "net/http" - -var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push") - -// TrailerHeaderKey http header key -var TrailerHeaderKey = http.CanonicalHeaderKey("trailer") diff --git a/pkg/worker_handler/errors.go b/pkg/worker_handler/errors.go deleted file mode 100644 index c3352a52..00000000 --- a/pkg/worker_handler/errors.go +++ /dev/null @@ -1,26 +0,0 @@ -//go:build !windows -// +build !windows - -package handler - -import ( - "errors" - "net" - "os" - "syscall" -) - -// Broken pipe -var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer") - -// handleWriteError just check if error was caused by aborted connection on linux -func handleWriteError(err error) error { - if netErr, ok2 := err.(*net.OpError); ok2 { - if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { - if errors.Is(syscallErr.Err, syscall.EPIPE) { - return errEPIPE - } - } - } - return err -} diff --git a/pkg/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go deleted file mode 100644 index 3c6c2186..00000000 --- a/pkg/worker_handler/errors_windows.go +++ /dev/null @@ -1,28 +0,0 @@ -//go:build windows -// +build windows - -package handler - -import ( - "errors" - "net" - "os" - "syscall" -) - -//Software caused connection abort. -//An established connection was aborted by the software in your host computer, -//possibly due to a data transmission time-out or protocol error. -var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer") - -// handleWriteError just check if error was caused by aborted connection on windows -func handleWriteError(err error) error { - if netErr, ok2 := err.(*net.OpError); ok2 { - if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { - if syscallErr.Err == syscall.WSAECONNABORTED { - return errEPIPE - } - } - } - return err -} diff --git a/pkg/worker_handler/handler.go b/pkg/worker_handler/handler.go deleted file mode 100644 index fc03563b..00000000 --- a/pkg/worker_handler/handler.go +++ /dev/null @@ -1,246 +0,0 @@ -package handler - -import ( - "net" - "net/http" - "strconv" - "strings" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/plugins/http/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -// MB is 1024 bytes -const MB uint64 = 1024 * 1024 - -// ErrorEvent represents singular http error event. -type ErrorEvent struct { - // Request contains client request, must not be stored. - Request *http.Request - - // Error - associated error, if any. - Error error - - // event timings - start time.Time - elapsed time.Duration -} - -// Elapsed returns duration of the invocation. -func (e *ErrorEvent) Elapsed() time.Duration { - return e.elapsed -} - -// ResponseEvent represents singular http response event. -type ResponseEvent struct { - // Request contains client request, must not be stored. - Request *Request - - // Response contains service response. - Response *Response - - // event timings - start time.Time - elapsed time.Duration -} - -// Elapsed returns duration of the invocation. -func (e *ResponseEvent) Elapsed() time.Duration { - return e.elapsed -} - -// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, -// parsed files and query, payload will include parsed form dataTree (if any). -type Handler struct { - maxRequestSize uint64 - uploads config.Uploads - trusted config.Cidrs - log logger.Logger - pool pool.Pool - mul sync.Mutex - lsn []events.Listener - internalHTTPCode uint64 -} - -// NewHandler return handle interface implementation -func NewHandler(maxReqSize uint64, internalHTTPCode uint64, uploads config.Uploads, trusted config.Cidrs, pool pool.Pool) (*Handler, error) { - if pool == nil { - return nil, errors.E(errors.Str("pool should be initialized")) - } - return &Handler{ - maxRequestSize: maxReqSize * MB, - uploads: uploads, - pool: pool, - trusted: trusted, - internalHTTPCode: internalHTTPCode, - }, nil -} - -// AddListener attaches handler event controller. -func (h *Handler) AddListener(l ...events.Listener) { - h.mul.Lock() - defer h.mul.Unlock() - - h.lsn = l -} - -// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. -func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - const op = errors.Op("serve_http") - start := time.Now() - - // validating request size - if h.maxRequestSize != 0 { - const op = errors.Op("http_handler_max_size") - if length := r.Header.Get("content-length"); length != "" { - // try to parse the value from the `content-length` header - size, err := strconv.ParseInt(length, 10, 64) - if err != nil { - // if got an error while parsing -> assign 500 code to the writer and return - http.Error(w, "", 500) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("error while parsing value from the `content-length` header")), start: start, elapsed: time.Since(start)}) - return - } - - if size > int64(h.maxRequestSize) { - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("request body max size is exceeded")), start: start, elapsed: time.Since(start)}) - http.Error(w, errors.E(op, errors.Str("request body max size is exceeded")).Error(), http.StatusBadRequest) - return - } - } - } - - req, err := NewRequest(r, h.uploads) - if err != nil { - // if pipe is broken, there is no sense to write the header - // in this case we just report about error - if err == errEPIPE { - h.sendEvent(ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)}) - return - } - - http.Error(w, errors.E(op, err).Error(), 500) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) - return - } - - // proxy IP resolution - h.resolveIP(req) - - req.Open(h.log) - defer req.Close(h.log) - - p, err := req.Payload() - if err != nil { - h.handleError(w, r, start, err) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) - return - } - - rsp, err := h.pool.Exec(p) - if err != nil { - h.handleError(w, r, start, err) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) - return - } - - resp, err := NewResponse(rsp) - if err != nil { - h.handleError(w, r, start, err) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) - return - } - - h.handleResponse(req, resp, start) - err = resp.Write(w) - if err != nil { - http.Error(w, errors.E(op, err).Error(), 500) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) - } -} - -// handleError will handle internal RR errors and return 500 -func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, start time.Time, err error) { - const op = errors.Op("handle_error") - // internal error types, user should not see them - if errors.Is(errors.SoftJob, err) || - errors.Is(errors.WatcherStopped, err) || - errors.Is(errors.WorkerAllocate, err) || - errors.Is(errors.NoFreeWorkers, err) || - errors.Is(errors.ExecTTL, err) || - errors.Is(errors.IdleTTL, err) || - errors.Is(errors.TTL, err) || - errors.Is(errors.Encode, err) || - errors.Is(errors.Decode, err) || - errors.Is(errors.Network, err) { - // write an internal server error - w.WriteHeader(int(h.internalHTTPCode)) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) - } -} - -// handleResponse triggers response event. -func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) { - h.sendEvent(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)}) -} - -// sendEvent invokes event handler if any. -func (h *Handler) sendEvent(event interface{}) { - if h.lsn != nil { - for i := range h.lsn { - // do not block the pipeline - // TODO not a good approach, redesign event bus - i := i - go func() { - h.lsn[i](event) - }() - } - } -} - -// get real ip passing multiple proxy -func (h *Handler) resolveIP(r *Request) { - if h.trusted.IsTrusted(r.RemoteAddr) == false { //nolint:gosimple - return - } - - if r.Header.Get("X-Forwarded-For") != "" { - ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",") - ipCount := len(ips) - - for i := ipCount - 1; i >= 0; i-- { - addr := strings.TrimSpace(ips[i]) - if net.ParseIP(addr) != nil { - r.RemoteAddr = addr - return - } - } - - return - } - - // The logic here is the following: - // In general case, we only expect X-Real-Ip header. If it exist, we get the IP address from header and set request Remote address - // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers - // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF. - // CF-Connecting-IP is an Enterprise feature and we check it last in order. - // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string - if r.Header.Get("X-Real-Ip") != "" { - r.RemoteAddr = FetchIP(r.Header.Get("X-Real-Ip")) - return - } - - if r.Header.Get("True-Client-IP") != "" { - r.RemoteAddr = FetchIP(r.Header.Get("True-Client-IP")) - return - } - - if r.Header.Get("CF-Connecting-IP") != "" { - r.RemoteAddr = FetchIP(r.Header.Get("CF-Connecting-IP")) - } -} diff --git a/pkg/worker_handler/parse.go b/pkg/worker_handler/parse.go deleted file mode 100644 index 2790da2a..00000000 --- a/pkg/worker_handler/parse.go +++ /dev/null @@ -1,149 +0,0 @@ -package handler - -import ( - "net/http" - - "github.com/spiral/roadrunner/v2/plugins/http/config" -) - -// MaxLevel defines maximum tree depth for incoming request data and files. -const MaxLevel = 127 - -type dataTree map[string]interface{} -type fileTree map[string]interface{} - -// parseData parses incoming request body into data tree. -func parseData(r *http.Request) dataTree { - data := make(dataTree) - if r.PostForm != nil { - for k, v := range r.PostForm { - data.push(k, v) - } - } - - if r.MultipartForm != nil { - for k, v := range r.MultipartForm.Value { - data.push(k, v) - } - } - - return data -} - -// pushes value into data tree. -func (d dataTree) push(k string, v []string) { - keys := FetchIndexes(k) - if len(keys) <= MaxLevel { - d.mount(keys, v) - } -} - -// mount mounts data tree recursively. -func (d dataTree) mount(i []string, v []string) { - if len(i) == 1 { - // single value context (last element) - d[i[0]] = v[len(v)-1] - return - } - - if len(i) == 2 && i[1] == "" { - // non associated array of elements - d[i[0]] = v - return - } - - if p, ok := d[i[0]]; ok { - p.(dataTree).mount(i[1:], v) - return - } - - d[i[0]] = make(dataTree) - d[i[0]].(dataTree).mount(i[1:], v) -} - -// parse incoming dataTree request into JSON (including contentMultipart form dataTree) -func parseUploads(r *http.Request, cfg config.Uploads) *Uploads { - u := &Uploads{ - cfg: cfg, - tree: make(fileTree), - list: make([]*FileUpload, 0), - } - - for k, v := range r.MultipartForm.File { - files := make([]*FileUpload, 0, len(v)) - for _, f := range v { - files = append(files, NewUpload(f)) - } - - u.list = append(u.list, files...) - u.tree.push(k, files) - } - - return u -} - -// pushes new file upload into it's proper place. -func (d fileTree) push(k string, v []*FileUpload) { - keys := FetchIndexes(k) - if len(keys) <= MaxLevel { - d.mount(keys, v) - } -} - -// mount mounts data tree recursively. -func (d fileTree) mount(i []string, v []*FileUpload) { - if len(i) == 1 { - // single value context - d[i[0]] = v[0] - return - } - - if len(i) == 2 && i[1] == "" { - // non associated array of elements - d[i[0]] = v - return - } - - if p, ok := d[i[0]]; ok { - p.(fileTree).mount(i[1:], v) - return - } - - d[i[0]] = make(fileTree) - d[i[0]].(fileTree).mount(i[1:], v) -} - -// FetchIndexes parses input name and splits it into separate indexes list. -func FetchIndexes(s string) []string { - var ( - pos int - ch string - keys = make([]string, 1) - ) - - for _, c := range s { - ch = string(c) - switch ch { - case " ": - // ignore all spaces - continue - case "[": - pos = 1 - continue - case "]": - if pos == 1 { - keys = append(keys, "") - } - pos = 2 - default: - if pos == 1 || pos == 2 { - keys = append(keys, "") - } - - keys[len(keys)-1] += ch - pos = 0 - } - } - - return keys -} diff --git a/pkg/worker_handler/request.go b/pkg/worker_handler/request.go deleted file mode 100644 index 3d60897b..00000000 --- a/pkg/worker_handler/request.go +++ /dev/null @@ -1,189 +0,0 @@ -package handler - -import ( - "fmt" - "io/ioutil" - "net" - "net/http" - "net/url" - "strings" - - j "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/http/attributes" - "github.com/spiral/roadrunner/v2/plugins/http/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -var json = j.ConfigCompatibleWithStandardLibrary - -const ( - defaultMaxMemory = 32 << 20 // 32 MB - contentNone = iota + 900 - contentStream - contentMultipart - contentFormData -) - -// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. -type Request struct { - // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address. - RemoteAddr string `json:"remoteAddr"` - - // Protocol includes HTTP protocol version. - Protocol string `json:"protocol"` - - // Method contains name of HTTP method used for the request. - Method string `json:"method"` - - // URI contains full request URI with scheme and query. - URI string `json:"uri"` - - // Header contains list of request headers. - Header http.Header `json:"headers"` - - // Cookies contains list of request cookies. - Cookies map[string]string `json:"cookies"` - - // RawQuery contains non parsed query string (to be parsed on php end). - RawQuery string `json:"rawQuery"` - - // Parsed indicates that request body has been parsed on RR end. - Parsed bool `json:"parsed"` - - // Uploads contains list of uploaded files, their names, sized and associations with temporary files. - Uploads *Uploads `json:"uploads"` - - // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions. - Attributes map[string]interface{} `json:"attributes"` - - // request body can be parsedData or []byte - body interface{} -} - -func FetchIP(pair string) string { - if !strings.ContainsRune(pair, ':') { - return pair - } - - addr, _, _ := net.SplitHostPort(pair) - return addr -} - -// NewRequest creates new PSR7 compatible request using net/http request. -func NewRequest(r *http.Request, cfg config.Uploads) (*Request, error) { - req := &Request{ - RemoteAddr: FetchIP(r.RemoteAddr), - Protocol: r.Proto, - Method: r.Method, - URI: URI(r), - Header: r.Header, - Cookies: make(map[string]string), - RawQuery: r.URL.RawQuery, - Attributes: attributes.All(r), - } - - for _, c := range r.Cookies() { - if v, err := url.QueryUnescape(c.Value); err == nil { - req.Cookies[c.Name] = v - } - } - - switch req.contentType() { - case contentNone: - return req, nil - - case contentStream: - var err error - req.body, err = ioutil.ReadAll(r.Body) - return req, err - - case contentMultipart: - if err := r.ParseMultipartForm(defaultMaxMemory); err != nil { - return nil, err - } - - req.Uploads = parseUploads(r, cfg) - fallthrough - case contentFormData: - if err := r.ParseForm(); err != nil { - return nil, err - } - - req.body = parseData(r) - } - - req.Parsed = true - return req, nil -} - -// Open moves all uploaded files to temporary directory so it can be given to php later. -func (r *Request) Open(log logger.Logger) { - if r.Uploads == nil { - return - } - - r.Uploads.Open(log) -} - -// Close clears all temp file uploads -func (r *Request) Close(log logger.Logger) { - if r.Uploads == nil { - return - } - - r.Uploads.Clear(log) -} - -// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open -// files prior to calling this method. -func (r *Request) Payload() (*payload.Payload, error) { - const op = errors.Op("marshal_payload") - p := &payload.Payload{} - - var err error - if p.Context, err = json.Marshal(r); err != nil { - return nil, errors.E(op, errors.Encode, err) - } - - if r.Parsed { - if p.Body, err = json.Marshal(r.body); err != nil { - return nil, errors.E(op, errors.Encode, err) - } - } else if r.body != nil { - p.Body = r.body.([]byte) - } - - return p, nil -} - -// contentType returns the payload content type. -func (r *Request) contentType() int { - if r.Method == "HEAD" || r.Method == "OPTIONS" { - return contentNone - } - - ct := r.Header.Get("content-type") - if strings.Contains(ct, "application/x-www-form-urlencoded") { - return contentFormData - } - - if strings.Contains(ct, "multipart/form-data") { - return contentMultipart - } - - return contentStream -} - -// URI fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). -func URI(r *http.Request) string { - if r.URL.Host != "" { - return r.URL.String() - } - if r.TLS != nil { - return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) - } - - return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) -} diff --git a/pkg/worker_handler/response.go b/pkg/worker_handler/response.go deleted file mode 100644 index d22f09d4..00000000 --- a/pkg/worker_handler/response.go +++ /dev/null @@ -1,105 +0,0 @@ -package handler - -import ( - "io" - "net/http" - "strings" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/payload" -) - -// Response handles PSR7 response logic. -type Response struct { - // Status contains response status. - Status int `json:"status"` - - // Header contains list of response headers. - Headers map[string][]string `json:"headers"` - - // associated Body payload. - Body interface{} -} - -// NewResponse creates new response based on given pool payload. -func NewResponse(p *payload.Payload) (*Response, error) { - const op = errors.Op("http_response") - r := &Response{Body: p.Body} - if err := json.Unmarshal(p.Context, r); err != nil { - return nil, errors.E(op, errors.Decode, err) - } - - return r, nil -} - -// Write writes response headers, status and body into ResponseWriter. -func (r *Response) Write(w http.ResponseWriter) error { - // INFO map is the reference type in golang - p := handlePushHeaders(r.Headers) - if pusher, ok := w.(http.Pusher); ok { - for _, v := range p { - err := pusher.Push(v, nil) - if err != nil { - return err - } - } - } - - handleTrailers(r.Headers) - for n, h := range r.Headers { - for _, v := range h { - w.Header().Add(n, v) - } - } - - w.WriteHeader(r.Status) - - if data, ok := r.Body.([]byte); ok { - _, err := w.Write(data) - if err != nil { - return handleWriteError(err) - } - } - - if rc, ok := r.Body.(io.Reader); ok { - if _, err := io.Copy(w, rc); err != nil { - return err - } - } - - return nil -} - -func handlePushHeaders(h map[string][]string) []string { - var p []string - pushHeader, ok := h[http2pushHeaderKey] - if !ok { - return p - } - - p = append(p, pushHeader...) - - delete(h, http2pushHeaderKey) - - return p -} - -func handleTrailers(h map[string][]string) { - trailers, ok := h[TrailerHeaderKey] - if !ok { - return - } - - for _, tr := range trailers { - for _, n := range strings.Split(tr, ",") { - n = strings.Trim(n, "\t ") - if v, ok := h[n]; ok { - h["Trailer:"+n] = v - - delete(h, n) - } - } - } - - delete(h, TrailerHeaderKey) -} diff --git a/pkg/worker_handler/uploads.go b/pkg/worker_handler/uploads.go deleted file mode 100644 index e695000e..00000000 --- a/pkg/worker_handler/uploads.go +++ /dev/null @@ -1,159 +0,0 @@ -package handler - -import ( - "github.com/spiral/roadrunner/v2/plugins/http/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - - "io" - "io/ioutil" - "mime/multipart" - "os" - "sync" -) - -const ( - // UploadErrorOK - no error, the file uploaded with success. - UploadErrorOK = 0 - - // UploadErrorNoFile - no file was uploaded. - UploadErrorNoFile = 4 - - // UploadErrorNoTmpDir - missing a temporary folder. - UploadErrorNoTmpDir = 6 - - // UploadErrorCantWrite - failed to write file to disk. - UploadErrorCantWrite = 7 - - // UploadErrorExtension - forbidden file extension. - UploadErrorExtension = 8 -) - -// Uploads tree manages uploaded files tree and temporary files. -type Uploads struct { - // associated temp directory and forbidden extensions. - cfg config.Uploads - - // pre processed data tree for Uploads. - tree fileTree - - // flat list of all file Uploads. - list []*FileUpload -} - -// MarshalJSON marshal tree tree into JSON. -func (u *Uploads) MarshalJSON() ([]byte, error) { - return json.Marshal(u.tree) -} - -// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors -// will be handled individually. -func (u *Uploads) Open(log logger.Logger) { - var wg sync.WaitGroup - for _, f := range u.list { - wg.Add(1) - go func(f *FileUpload) { - defer wg.Done() - err := f.Open(u.cfg) - if err != nil && log != nil { - log.Error("error opening the file", "err", err) - } - }(f) - } - - wg.Wait() -} - -// Clear deletes all temporary files. -func (u *Uploads) Clear(log logger.Logger) { - for _, f := range u.list { - if f.TempFilename != "" && exists(f.TempFilename) { - err := os.Remove(f.TempFilename) - if err != nil && log != nil { - log.Error("error removing the file", "err", err) - } - } - } -} - -// FileUpload represents singular file NewUpload. -type FileUpload struct { - // ID contains filename specified by the client. - Name string `json:"name"` - - // Mime contains mime-type provided by the client. - Mime string `json:"mime"` - - // Size of the uploaded file. - Size int64 `json:"size"` - - // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php - Error int `json:"error"` - - // TempFilename points to temporary file location. - TempFilename string `json:"tmpName"` - - // associated file header - header *multipart.FileHeader -} - -// NewUpload wraps net/http upload into PRS-7 compatible structure. -func NewUpload(f *multipart.FileHeader) *FileUpload { - return &FileUpload{ - Name: f.Filename, - Mime: f.Header.Get("Content-Type"), - Error: UploadErrorOK, - header: f, - } -} - -// Open moves file content into temporary file available for PHP. -// NOTE: -// There is 2 deferred functions, and in case of getting 2 errors from both functions -// error from close of temp file would be overwritten by error from the main file -// STACK -// DEFER FILE CLOSE (2) -// DEFER TMP CLOSE (1) -func (f *FileUpload) Open(cfg config.Uploads) (err error) { - if cfg.Forbids(f.Name) { - f.Error = UploadErrorExtension - return nil - } - - file, err := f.header.Open() - if err != nil { - f.Error = UploadErrorNoFile - return err - } - - defer func() { - // close the main file - err = file.Close() - }() - - tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload") - if err != nil { - // most likely cause of this issue is missing tmp dir - f.Error = UploadErrorNoTmpDir - return err - } - - f.TempFilename = tmp.Name() - defer func() { - // close the temp file - err = tmp.Close() - }() - - if f.Size, err = io.Copy(tmp, file); err != nil { - f.Error = UploadErrorCantWrite - } - - return err -} - -// exists if file exists. -func exists(path string) bool { - if _, err := os.Stat(path); os.IsNotExist(err) { - return false - } - return true -} diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go deleted file mode 100644 index 5605f1e0..00000000 --- a/pkg/worker_watcher/container/channel/vec.go +++ /dev/null @@ -1,107 +0,0 @@ -package channel - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -type Vec struct { - sync.RWMutex - // destroy signal - destroy uint64 - // channel with the workers - workers chan worker.BaseProcess -} - -func NewVector(len uint64) *Vec { - vec := &Vec{ - destroy: 0, - workers: make(chan worker.BaseProcess, len), - } - - return vec -} - -// Push is O(1) operation -// In case of TTL and full channel O(n) worst case, where n is len of the channel -func (v *Vec) Push(w worker.BaseProcess) { - // Non-blocking channel send - select { - case v.workers <- w: - // default select branch is only possible when dealing with TTL - // because in that case, workers in the v.workers channel can be TTL-ed and killed - // but presenting in the channel - default: - // Stop Pop operations - v.Lock() - defer v.Unlock() - - /* - we can be in the default branch by the following reasons: - 1. TTL is set with no requests during the TTL - 2. Violated Get <-> Release operation (how ??) - */ - for i := 0; i < len(v.workers); i++ { - /* - We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. - */ - wrk := <-v.workers - switch wrk.State().Value() { - // skip good states, put worker back - case worker.StateWorking, worker.StateReady: - // put the worker back - // generally, while send and receive operations are concurrent (from the channel), channel behave - // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO - v.workers <- wrk - continue - /* - Bad states are here. - */ - default: - // kill the current worker (just to be sure it's dead) - if wrk != nil { - _ = wrk.Kill() - } - // replace with the new one and return from the loop - // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker - // But this case will be handled in the worker_watcher::Get - v.workers <- w - return - } - } - } -} - -func (v *Vec) Remove(_ int64) {} - -func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) { - /* - if *addr == old { - *addr = new - return true - } - */ - - if atomic.LoadUint64(&v.destroy) == 1 { - return nil, errors.E(errors.WatcherStopped) - } - - // used only for the TTL-ed workers - v.RLock() - defer v.RUnlock() - - select { - case w := <-v.workers: - return w, nil - case <-ctx.Done(): - return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) - } -} - -func (v *Vec) Destroy() { - atomic.StoreUint64(&v.destroy, 1) -} diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go deleted file mode 100644 index edf81d60..00000000 --- a/pkg/worker_watcher/container/queue/queue.go +++ /dev/null @@ -1,102 +0,0 @@ -package queue - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -const ( - initialSize = 1 - maxInitialSize = 8 - maxInternalSliceSize = 10 -) - -type Node struct { - w []worker.BaseProcess - // LL - n *Node -} - -type Queue struct { - mu sync.Mutex - - head *Node - tail *Node - - curr uint64 - len uint64 - - sliceSize uint64 -} - -func NewQueue() *Queue { - q := &Queue{ - mu: sync.Mutex{}, - head: nil, - tail: nil, - curr: 0, - len: 0, - sliceSize: 0, - } - - return q -} - -func (q *Queue) Push(w worker.BaseProcess) { - q.mu.Lock() - - if q.head == nil { - h := newNode(initialSize) - q.head = h - q.tail = h - q.sliceSize = maxInitialSize - } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) { - n := newNode(maxInternalSliceSize) - q.tail.n = n - q.tail = n - q.sliceSize = maxInternalSliceSize - } - - q.tail.w = append(q.tail.w, w) - - atomic.AddUint64(&q.len, 1) - - q.mu.Unlock() -} - -func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) { - q.mu.Lock() - - if q.head == nil { - return nil, nil - } - - w := q.head.w[q.curr] - q.head.w[q.curr] = nil - atomic.AddUint64(&q.len, ^uint64(0)) - atomic.AddUint64(&q.curr, 1) - - if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) { - n := q.head.n - q.head.n = nil - q.head = n - q.curr = 0 - } - - q.mu.Unlock() - - return w, nil -} - -func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) { - -} - -func (q *Queue) Destroy() {} - -func newNode(capacity int) *Node { - return &Node{w: make([]worker.BaseProcess, 0, capacity)} -} diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go deleted file mode 100755 index 83f8e627..00000000 --- a/pkg/worker_watcher/worker_watcher.go +++ /dev/null @@ -1,318 +0,0 @@ -package worker_watcher //nolint:stylecheck - -import ( - "context" - "sync" - "sync/atomic" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel" - "github.com/spiral/roadrunner/v2/utils" -) - -// Vector interface represents vector container -type Vector interface { - // Push used to put worker to the vector - Push(worker.BaseProcess) - // Pop used to get worker from the vector - Pop(ctx context.Context) (worker.BaseProcess, error) - // Remove worker with provided pid - Remove(pid int64) - // Destroy used to stop releasing the workers - Destroy() - - // TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation - // Replace(prevPid int64, newWorker worker.BaseProcess) -} - -type workerWatcher struct { - sync.RWMutex - container Vector - // used to control Destroy stage (that all workers are in the container) - numWorkers *uint64 - - workers []worker.BaseProcess - - allocator worker.Allocator - allocateTimeout time.Duration - events events.Handler -} - -// NewSyncWorkerWatcher is a constructor for the Watcher -func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher { - ww := &workerWatcher{ - container: channel.NewVector(numWorkers), - - // pass a ptr to the number of workers to avoid blocking in the TTL loop - numWorkers: utils.Uint64(numWorkers), - allocateTimeout: allocateTimeout, - workers: make([]worker.BaseProcess, 0, numWorkers), - - allocator: allocator, - events: events, - } - - return ww -} - -func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { - for i := 0; i < len(workers); i++ { - ww.container.Push(workers[i]) - // add worker to watch slice - ww.workers = append(ww.workers, workers[i]) - - go func(swc worker.BaseProcess) { - ww.wait(swc) - }(workers[i]) - } - return nil -} - -// Take is not a thread safe operation -func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) { - const op = errors.Op("worker_watcher_get_free_worker") - - // thread safe operation - w, err := ww.container.Pop(ctx) - if err != nil { - if errors.Is(errors.WatcherStopped, err) { - return nil, errors.E(op, errors.WatcherStopped) - } - - return nil, errors.E(op, err) - } - - // fast path, worker not nil and in the ReadyState - if w.State().Value() == worker.StateReady { - return w, nil - } - - // ========================================================= - // SLOW PATH - _ = w.Kill() - // no free workers in the container or worker not in the ReadyState (TTL-ed) - // try to continuously get free one - for { - w, err = ww.container.Pop(ctx) - if err != nil { - if errors.Is(errors.WatcherStopped, err) { - return nil, errors.E(op, errors.WatcherStopped) - } - return nil, errors.E(op, err) - } - - if err != nil { - return nil, errors.E(op, err) - } - - switch w.State().Value() { - // return only workers in the Ready state - // check first - case worker.StateReady: - return w, nil - case worker.StateWorking: // how?? - ww.container.Push(w) // put it back, let worker finish the work - continue - case - // all the possible wrong states - worker.StateInactive, - worker.StateDestroyed, - worker.StateErrored, - worker.StateStopped, - worker.StateInvalid, - worker.StateKilling, - worker.StateStopping: - // worker doing no work because it in the container - // so we can safely kill it (inconsistent state) - _ = w.Kill() - // try to get new worker - continue - } - } -} - -func (ww *workerWatcher) Allocate() error { - const op = errors.Op("worker_watcher_allocate_new") - - sw, err := ww.allocator() - if err != nil { - // log incident - ww.events.Push( - events.WorkerEvent{ - Event: events.EventWorkerError, - Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)), - }) - - // if no timeout, return error immediately - if ww.allocateTimeout == 0 { - return errors.E(op, errors.WorkerAllocate, err) - } - - // every half of a second - allocateFreq := time.NewTicker(time.Millisecond * 500) - - tt := time.After(ww.allocateTimeout) - for { - select { - case <-tt: - // reduce number of workers - atomic.AddUint64(ww.numWorkers, ^uint64(0)) - allocateFreq.Stop() - // timeout exceed, worker can't be allocated - return errors.E(op, errors.WorkerAllocate, err) - - case <-allocateFreq.C: - sw, err = ww.allocator() - if err != nil { - // log incident - ww.events.Push( - events.WorkerEvent{ - Event: events.EventWorkerError, - Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)), - }) - continue - } - - // reallocated - allocateFreq.Stop() - goto done - } - } - } - -done: - // add worker to Wait - ww.addToWatch(sw) - - ww.Lock() - // add new worker to the workers slice (to get information about workers in parallel) - ww.workers = append(ww.workers, sw) - ww.Unlock() - - // push the worker to the container - ww.Release(sw) - return nil -} - -// Remove worker -func (ww *workerWatcher) Remove(wb worker.BaseProcess) { - ww.Lock() - defer ww.Unlock() - - // set remove state - pid := wb.Pid() - - // worker will be removed on the Get operation - for i := 0; i < len(ww.workers); i++ { - if ww.workers[i].Pid() == pid { - ww.workers = append(ww.workers[:i], ww.workers[i+1:]...) - // kill worker, just to be sure it's dead - _ = wb.Kill() - return - } - } -} - -// Release O(1) operation -func (ww *workerWatcher) Release(w worker.BaseProcess) { - switch w.State().Value() { - case worker.StateReady: - ww.container.Push(w) - default: - _ = w.Kill() - } -} - -// Destroy all underlying container (but let them complete the task) -func (ww *workerWatcher) Destroy(_ context.Context) { - // destroy container, we don't use ww mutex here, since we should be able to push worker - ww.Lock() - // do not release new workers - ww.container.Destroy() - ww.Unlock() - - tt := time.NewTicker(time.Millisecond * 100) - defer tt.Stop() - for { //nolint:gosimple - select { - case <-tt.C: - ww.Lock() - // that might be one of the workers is working - if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) { - ww.Unlock() - continue - } - // All container at this moment are in the container - // Pop operation is blocked, push can't be done, since it's not possible to pop - for i := 0; i < len(ww.workers); i++ { - ww.workers[i].State().Set(worker.StateDestroyed) - // kill the worker - _ = ww.workers[i].Kill() - } - return - } - } -} - -// List - this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) List() []worker.BaseProcess { - ww.RLock() - defer ww.RUnlock() - - if len(ww.workers) == 0 { - return nil - } - - base := make([]worker.BaseProcess, 0, len(ww.workers)) - for i := 0; i < len(ww.workers); i++ { - base = append(base, ww.workers[i]) - } - - return base -} - -func (ww *workerWatcher) wait(w worker.BaseProcess) { - const op = errors.Op("worker_watcher_wait") - err := w.Wait() - if err != nil { - ww.events.Push(events.WorkerEvent{ - Event: events.EventWorkerError, - Worker: w, - Payload: errors.E(op, err), - }) - } - - // remove worker - ww.Remove(w) - - if w.State().Value() == worker.StateDestroyed { - // worker was manually destroyed, no need to replace - ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) - return - } - - // set state as stopped - w.State().Set(worker.StateStopped) - - err = ww.Allocate() - if err != nil { - ww.events.Push(events.PoolEvent{ - Event: events.EventPoolError, - Payload: errors.E(op, err), - }) - - // no workers at all, panic - if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 { - panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err))) - } - } -} - -func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) { - go func() { - ww.wait(wb) - }() -} |