summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/doc/README.md21
-rw-r--r--pkg/doc/pool_workflow.drawio1
-rw-r--r--pkg/doc/pool_workflow.svg3
-rwxr-xr-xpkg/events/general.go39
-rw-r--r--pkg/events/interface.go14
-rw-r--r--pkg/events/pool_events.go66
-rw-r--r--pkg/events/worker_events.go33
-rwxr-xr-xpkg/payload/payload.go16
-rw-r--r--pkg/pool/config.go75
-rw-r--r--pkg/pool/interface.go29
-rwxr-xr-xpkg/pool/static_pool.go327
-rwxr-xr-xpkg/pool/static_pool_test.go647
-rwxr-xr-xpkg/pool/supervisor_pool.go222
-rw-r--r--pkg/pool/supervisor_test.go248
-rw-r--r--pkg/transport/interface.go21
-rwxr-xr-xpkg/transport/pipe/pipe_factory.go162
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go456
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go478
-rwxr-xr-xpkg/transport/socket/socket_factory.go228
-rw-r--r--pkg/transport/socket/socket_factory_spawn_test.go489
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go572
-rw-r--r--pkg/worker/interface.go56
-rwxr-xr-xpkg/worker/sync_worker.go244
-rwxr-xr-xpkg/worker/sync_worker_test.go34
-rwxr-xr-xpkg/worker/worker.go318
-rwxr-xr-xpkg/worker/worker_test.go19
-rw-r--r--pkg/worker_watcher/interface.go30
-rw-r--r--pkg/worker_watcher/stack.go142
-rw-r--r--pkg/worker_watcher/stack_test.go142
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go165
30 files changed, 5297 insertions, 0 deletions
diff --git a/pkg/doc/README.md b/pkg/doc/README.md
new file mode 100644
index 00000000..4f726f4a
--- /dev/null
+++ b/pkg/doc/README.md
@@ -0,0 +1,21 @@
+This is the drawio diagrams showing basic workflows inside RoadRunner 2.0
+
+Simple HTTP workflow description:
+![alt text](pool_workflow.svg)
+
+1. Allocate sync workers. When plugin starts (which use workers pool), then it allocates required number of processes
+ via `cmd.exec` command.
+
+2. When user send HTTP request to the RR2, HTTP plugin receive it and transfer to the workers pool `Exec/ExecWithContex`
+method. And workers pool ask Worker watcher to get free worker.
+
+3. Workers watcher uses stack data structure under the hood and making POP operation to get first free worker. If there are
+no workers in the `stack`, watcher waits for the specified via config (`allocate_timeout`) time.
+
+4. Stack returns free worker to the watcher.
+5. Watcher returns that worker to the `pool`.
+6. Pool invoke `Exec/ExecWithTimeout` method on the golang worker with provided request payload.
+7. Golang worker send that request to the PHP worker via various set of transports (`pkg/transport` package).
+8. PHP worker send back response to the golang worker (or error via stderr).
+9. Golang worker return response payload to the pool.
+10. Pool process this response and return answer to the user. \ No newline at end of file
diff --git a/pkg/doc/pool_workflow.drawio b/pkg/doc/pool_workflow.drawio
new file mode 100644
index 00000000..3f74d0fc
--- /dev/null
+++ b/pkg/doc/pool_workflow.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-01-24T16:29:37.978Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="8v25I0qLU_vMqqVrx7cN" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">7Vxbd6M4Ev41OWfnoX0Qkrg8Jul09+zOTGc6vSfTjwRkm22MvCB37Pn1IwGydcExsTGO03HOiUEgLqqqr6o+lXwBr2fLj0U0n/5OE5JduE6yvIDvL1wXAMflX6Jl1bQ4jlO3TIo0qduUhrv0byJPbFoXaULKpq1uYpRmLJ3rjTHNcxIzrS0qCvqonzamWaI1zKMJsRru4iizW+/ThE3li3nh5sAnkk6m8tYeRvWRWSTPbl6lnEYJfVSa4M0FvC4oZfXWbHlNMjF8+sB82HJ0/WQFyVmXDn/hCPvfPk++/Ocm+HyPr5Z/TsfvvOYyP6Js0bzyhetl/IJXU37Im4itu3Q25wPiOv8tScG/CvL/BSmZPJHfcnNu865sJUewoIs8IeIZHH74cZoycjePYnH0kSuNuBGbZXwP8M0xzdmHaJZmQl+u6aJIqxv+Qfi4XUVZOsl5e0bG4k4/SMFSLqnLppnR+fr+6qjIN+Snk6XS1IzSR0JnhBUrfkpzFGNcd2mUFrmNFj+qGhA2GjBVpB9ItY0atZusL74RDN9oZPMMOYHQGliScE1tdmnBpnRC8yi72bRe6UO/Oec3KoaqGvD/EcZWjdlFC0Z1cZBlyv4S3Ucubna/KYfeL5tLVzsruZPzF1Z7if1v6sFNv2pPdtSE/2/CHooozUv+0r/TnD4l2pIrSkyeGr0GOaJiQtgT50lTFUP7pKYUJItY+kMHiTahN11vacqfea1hoedpGgYc2SCvUT9q083QnfVz7K9Orm32H8XrcNgS/8pVHgt9p8X3yv7+dfv5ln/ROSn4a9P8l1Zl/C164PCvKZA02ZiLjF/JNtpZmiS1rpIy/Tt6qK4nlGEuXr0aDHx1gd/vpR5PWpKFB2sn0jyFBtNtOPHOGTke0iUZ1HsH6sc7AJ2RDkL8ZsB1lA80VYaOxyU5jrZgS1vgmwbUQnE939e9RT8agAz5hyMHDSZv/7TOxledjdPV2fiaswEv3tnAvn3NQQGGRC7FxvEZ27hU4cNtnJs4QlDHYtiPjUPDxvEogEPZOHTsCEDcae3030RfwTvwdNm7vYgemaJ/B9DIw0MJ347+7iupi8G+j1g8bVGA5+VxaZZd04wWVV84HhMvjnl7yQr6nShHEj98cA7C4+55XQgNlwqbCErN63BjF2pa5x0tqwPWMA/qaPdJ6rSMbrT2uYO7WbejmwV9u9mq62VRRCvlhAbNthp8YOpeE71vtKe+Yr9m7lp2fpllNI6YleSVZwz4YBsS7AP4GHlHiecD3Y0gA1KOGMxDSwnaUecnFDaAGGhi6SmwQ7pvxyPgDyZuS9q/5ilLuSS5N+jTpycRCcatPt2LA/IwHsan+4GOq4HT4tOdFp9uGmB/Pr09bXpjajt5dXgqr36YzD3L6lQ69Y572vtzT6ukXvfEmjm6P/T7AV7fzKu8keerrKk7GA4jSyXcN+nX0oc8/NWl1I/0PYsz9Qfzu/CksA8UzFf40+flcv7pkrmusA/dF4X70MZ974yNHPZm5Dy0xrgnqwZoBAejRVE7SJ/V1Acf/B2GXO3dkiLlwyV076VYN+rbuvfiaoADA82PyKTiQ9cOPtrVwceHdcAONtT/CPwRaqEOZDHTOeKbNO4+8M1xJROxL74NAGZ2DHpH8qR65nJO81IkJxyL+L+p2FyUZ52h9CfeutjCmKHphwj0Xf2qbjAabNrHNuebJYn5pcTXfcqm11wQZMksHTh/nsic+2nlidrmfo7GE21JB88q0jhdkYWs1DuT6R/gID1C4A27PP6OHsdx+a6d0aAzdgnSyPrw+BAiA7t7mi1wDdoCgAHrQNzQEvivAgKjir4cF0R8recIryt0LETbii7EAxb2rMJPqh6uo08cyqmYA9UDADxCoakgCKmsJhhMW+yqoTsWxd8rR8r96Pbp5INCCEyCBLWFEIH7IOpgTxJCACc4df0ItI33tOsENBrS7UhDrqMItdtpaEi3Y1TRtXZzky46h9JhA6QHrqVNRn7wNZ0RuniN+QFwPd25tyYIshZwkAQB2umaNfDlNJqLzXFGlpdiuVtlLEmz+T7OorJMY8NYN9F8r5O++5SV7W+qOOzZVBU5t2G4bDvUmzuSQZPMHzCWcdRvbq38aSMdDQ7RGXYNEbTZJXN94BZtXcyyy5hRNQas4sVbWqZieRE/5YEyRmctQWK9ym/bksEjef7QWBAo1/4pOgMH9ft2sf6a2GuGXuX1tpR0V1TP0UJ0ESWUdcAwkJgAMDFcptAqhoeBLSizPKs3QSG70voLiUldG7IhYV+9YHyj2iN0bMHI9SbDCMZe0maJgV8mnZdEl4TEsJKjK+O5DyN2SFMnZW0hzXjcHBlg1D3PoDzlhMUO1ILHQi1kc0v2mO9MVhRZmEawdhpKCETnJK9bmuwleFIuL6/OXWrq7sXLHQOdQycwjNXxARo27EC+pUW32WKS5rxtXeparVJ+9bjqOUYsGUIbVuUvPAwCq7Kc+q0K6qj0g+QDX0gZFLYz1fYVxOfBCuNthrhXpSuSBnho5ojaS1kNtvn4NBE+7erwM6cYUWfeond/fpiN2ylncM423uP6cBA6ejlzP/OCAIhCaeWjzwfLepQByojsWYYtqxkOWiOMxV8rD1x9OsheHFf61Z+eUliMXtpCI2zzbzVdz990ldEoefXxL0BGjUQb4SOXGgwT/9q0whfy46fje5Br1MMGLUTcoHwPtjHs09ev4ket5k3yeBCU7QImA+qCmLT/HMJDwHHmyYmRZ7DW2FhkIxfDaIjVwoZCdCQheHb1gDXuL2lGa6sQdgZvuGuCdqJZJ+yH+jW6zzo54QgHys+Q6TpmTnkcmQyShL9KBlGadUBY1SRzmpNngW4bHOiAsUu1LMtfc8Z94K9jBCt+S6UKalE391iximczBB9pFuUT3rYllvzJRAbd3SJrjS+PJzM7lrn9JDxm5+nEVy0wLHmdJwQWoF7kxXc3PxNcQ+fm55bhzT8=</diagram></mxfile> \ No newline at end of file
diff --git a/pkg/doc/pool_workflow.svg b/pkg/doc/pool_workflow.svg
new file mode 100644
index 00000000..1e043eaa
--- /dev/null
+++ b/pkg/doc/pool_workflow.svg
@@ -0,0 +1,3 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="1200px" height="811px" viewBox="-0.5 -0.5 1200 811" content="&lt;mxfile host=&quot;Electron&quot; modified=&quot;2021-01-24T14:08:35.229Z&quot; agent=&quot;5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36&quot; etag=&quot;XUF1gTxKr8mJfXGLcVd8&quot; version=&quot;14.1.8&quot; type=&quot;device&quot;&gt;&lt;diagram id=&quot;8w40hpb1-UDYxj1ewOsN&quot; name=&quot;Page-1&quot;&gt;7Vxbd6M4Ev41OWfnoX0Qkrg8Jul09+zOTGc6vSfTjwRkm22MvCB37Pn1IwGydcExsTGO03HOiUEgLqqqr6o+lXwBr2fLj0U0n/5OE5JduE6yvIDvL1wXAMflX6Jl1bQ4jlO3TIo0qduUhrv0byJPbFoXaULKpq1uYpRmLJ3rjTHNcxIzrS0qCvqonzamWaI1zKMJsRru4iizW+/ThE3li3nh5sAnkk6m8tYeRvWRWSTPbl6lnEYJfVSa4M0FvC4oZfXWbHlNMjF8+sB82HJ0/WQFyVmXDn/hCPvfPk++/Ocm+HyPr5Z/TsfvvOYyP6Js0bzyhetl/IJXU37Im4itu3Q25wPiOv8tScG/CvL/BSmZPJHfcnNu865sJUewoIs8IeIZHH74cZoycjePYnH0kSuNuBGbZXwP8M0xzdmHaJZmQl+u6aJIqxv+Qfi4XUVZOsl5e0bG4k4/SMFSLqnLppnR+fr+6qjIN+Snk6XS1IzSR0JnhBUrfkpzFGNcd2mUFrmNFj+qGhA2GjBVpB9ItY0atZusL74RDN9oZPMMOYHQGliScE1tdmnBpnRC8yi72bRe6UO/Oec3KoaqGvD/EcZWjdlFC0Z1cZBlyv4S3Ucubna/KYfeL5tLVzsruZPzF1Z7if1v6sFNv2pPdtSE/2/CHooozUv+0r/TnD4l2pIrSkyeGr0GOaJiQtgT50lTFUP7pKYUJItY+kMHiTahN11vacqfea1hoedpGgYc2SCvUT9q083QnfVz7K9Orm32H8XrcNgS/8pVHgt9p8X3yv7+dfv5ln/ROSn4a9P8l1Zl/C164PCvKZA02ZiLjF/JNtpZmiS1rpIy/Tt6qK4nlGEuXr0aDHx1gd/vpR5PWpKFB2sn0jyFBtNtOPHOGTke0iUZ1HsH6sc7AJ2RDkL8ZsB1lA80VYaOxyU5jrZgS1vgmwbUQnE939e9RT8agAz5hyMHDSZv/7TOxledjdPV2fiaswEv3tnAvn3NQQGGRC7FxvEZ27hU4cNtnJs4QlDHYtiPjUPDxvEogEPZOHTsCEDcae3030RfwTvwdNm7vYgemaJ/B9DIw0MJ347+7iupi8G+j1g8bVGA5+VxaZZd04wWVV84HhMvjnl7yQr6nShHEj98cA7C4+55XQgNlwqbCErN63BjF2pa5x0tqwPWMA/qaPdJ6rSMbrT2uYO7WbejmwV9u9mq62VRRCvlhAbNthp8YOpeE71vtKe+Yr9m7lp2fpllNI6YleSVZwz4YBsS7AP4GHlHiecD3Y0gA1KOGMxDSwnaUecnFDaAGGhi6SmwQ7pvxyPgDyZuS9q/5ilLuSS5N+jTpycRCcatPt2LA/IwHsan+4GOq4HT4tOdFp9uGmB/Pr09bXpjajt5dXgqr36YzD3L6lQ69Y572vtzT6ukXvfEmjm6P/T7AV7fzKu8keerrKk7GA4jSyXcN+nX0oc8/NWl1I/0PYsz9Qfzu/CksA8UzFf40+flcv7pkrmusA/dF4X70MZ974yNHPZm5Dy0xrgnqwZoBAejRVE7SJ/V1Acf/B2GXO3dkiLlwyV076VYN+rbuvfiaoADA82PyKTiQ9cOPtrVwceHdcAONtT/CPwRaqEOZDHTOeKbNO4+8M1xJROxL74NAGZ2DHpH8qR65nJO81IkJxyL+L+p2FyUZ52h9CfeutjCmKHphwj0Xf2qbjAabNrHNuebJYn5pcTXfcqm11wQZMksHTh/nsic+2nlidrmfo7GE21JB88q0jhdkYWs1DuT6R/gID1C4A27PP6OHsdx+a6d0aAzdgnSyPrw+BAiA7t7mi1wDdoCgAHrQNzQEvivAgKjir4cF0R8recIryt0LETbii7EAxb2rMJPqh6uo08cyqmYA9UDADxCoakgCKmsJhhMW+yqoTsWxd8rR8r96Pbp5INCCEyCBLWFEIH7IOpgTxJCACc4df0ItI33tOsENBrS7UhDrqMItdtpaEi3Y1TRtXZzky46h9JhA6QHrqVNRn7wNZ0RuniN+QFwPd25tyYIshZwkAQB2umaNfDlNJqLzXFGlpdiuVtlLEmz+T7OorJMY8NYN9F8r5O++5SV7W+qOOzZVBU5t2G4bDvUmzuSQZPMHzCWcdRvbq38aSMdDQ7RGXYNEbTZJXN94BZtXcyyy5hRNQas4sVbWqZieRE/5YEyRmctQWK9ym/bksEjef7QWBAo1/4pOgMH9ft2sf6a2GuGXuX1tpR0V1TP0UJ0ESWUdcAwkJgAMDFcptAqhoeBLSizPKs3QSG70voLiUldG7IhYV+9YHyj2iN0bMHI9SbDCMZe0maJgV8mnZdEl4TEsJKjK+O5DyN2SFMnZW0hzXjcHBlg1D3PoDzlhMUO1ILHQi1kc0v2mO9MVhRZmEawdhpKCETnJK9bmuwleFIuL6/OXWrq7sXLHQOdQycwjNXxARo27EC+pUW32WKS5rxtXeparVJ+9bjqOUYsGUIbVuUvPAwCq7Kc+q0K6qj0g+QDX0gZFLYz1fYVxOfBCuNthrhXpSuSBnho5ojaS1kNtvn4NBE+7erwM6cYUWfeond/fpiN2ylncM423uP6cBA6ejlzP/OCAIhCaeWjzwfLepQByojsWYYtqxkOWiOMxV8rD1x9OsheHFf61Z+eUliMXtpCI2zzbzVdz990ldEoefXxL0BGjUQb4SOXGgwT/9q0whfy46fje5Br1MMGLUTcoHwPtjHs09ev4ket5k3yeBCU7QImA+qCmLT/HMJDwHHmyYmRZ7DW2FhkIxfDaIjVwoZCdCQheHb1gDXuL2lGa6sQdgZvuGuCdqJZJ+yH+jW6zzo54QgHys+Q6TpmTnkcmQyShL9KBlGadUBY1SRzmpNngW4bHOiAsUu1LMtfc8Z94K9jBCt+S6UKalE391iximczBB9pFuUT3rYllvzJRAbd3SJrjS+PJzM7lrn9JDxm5+nEVy0wLHmdJwQWoF7kxXc3PxNcQ+fm55bhzT8=&lt;/diagram&gt;&lt;/mxfile&gt;" style="background-color: rgb(255, 255, 255);"><defs/><g><rect x="0" y="0" width="1199" height="810" fill="#ffffff" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe flex-start; justify-content: unsafe flex-start; width: 1197px; height: 1px; padding-top: 7px; margin-left: 2px;"><div style="box-sizing: border-box; font-size: 0; text-align: left; "><div style="display: inline-block; font-size: 12px; font-family: Courier New; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; "><h1>Simple User request</h1></div></div></div></foreignObject><text x="2" y="19" fill="#000000" font-family="Courier New" font-size="12px">Simple User request</text></switch></g><path d="M 417.5 574 L 417.5 657.63" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 417.5 662.88 L 414 655.88 L 417.5 657.63 L 421 655.88 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 616px; margin-left: 296px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Give me sync worker (POP operation)</div></div></div></foreignObject><text x="296" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Give me sync worker (POP operation)</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 617px; margin-left: 418px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">3</div></div></div></foreignObject><text x="418" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">3</text></switch></g><path d="M 492.5 514 L 492.5 430.37" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 492.5 425.12 L 496 432.12 L 492.5 430.37 L 489 432.12 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 493px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">5</div></div></div></foreignObject><text x="493" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">5</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 465px; margin-left: 535px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Get worker</div></div></div></foreignObject><text x="535" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Get worker</text></switch></g><rect x="380" y="514" width="150" height="60" fill="#ffe6cc" stroke="#d79b00" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 544px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Workers Watcher</div></div></div></foreignObject><text x="455" y="548" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Workers Watcher</text></switch></g><path d="M 280 424 L 280 544 L 373.63 544" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 378.88 544 L 371.88 547.5 L 373.63 544 L 371.88 540.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 515px; margin-left: 202px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Allocate sync workers</div></div></div></foreignObject><text x="202" y="518" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Allocate sync workers</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 514px; margin-left: 280px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">1</div></div></div></foreignObject><text x="280" y="518" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">1</text></switch></g><rect x="230" y="384" width="100" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 404px; margin-left: 231px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Initialize</div></div></div></foreignObject><text x="280" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Initialize</text></switch></g><path d="M 417.5 424 L 417.5 507.63" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 417.5 512.88 L 414 505.88 L 417.5 507.63 L 421 505.88 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 352px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Give me SyncWorker</div></div></div></foreignObject><text x="352" y="467" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Give me SyncWorker</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 418px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">2</div></div></div></foreignObject><text x="418" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">2</text></switch></g><path d="M 530 414 L 700.63 414" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 705.88 414 L 698.88 417.5 L 700.63 414 L 698.88 410.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 415px; margin-left: 618px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">6</div></div></div></foreignObject><text x="618" y="418" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">6</text></switch></g><path d="M 492.5 384 L 483 384 L 483 324 L 520 324 L 520 83 L 468.87 83" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 463.62 83 L 470.62 79.5 L 468.87 83 L 470.62 86.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 224px; margin-left: 521px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">10</div></div></div></foreignObject><text x="521" y="227" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">10</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 225px; margin-left: 597px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Send response to the user</div></div></div></foreignObject><text x="597" y="228" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Send response to the user</text></switch></g><rect x="380" y="384" width="150" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 404px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Exec/ExecWithContext</div></div></div></foreignObject><text x="455" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec/ExecWithContext</text></switch></g><path d="M 492.5 664 L 492.5 624 L 492.5 580.37" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 492.5 575.12 L 496 582.12 L 492.5 580.37 L 489 582.12 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 616px; margin-left: 494px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">4</div></div></div></foreignObject><text x="494" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">4</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 617px; margin-left: 610px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">I have free workers, here you are</div></div></div></foreignObject><text x="610" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">I have free workers, here you are</text></switch></g><rect x="380" y="664" width="150" height="60" fill="#d5e8d4" stroke="#82b366" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 694px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Stack with workers</div></div></div></foreignObject><text x="455" y="698" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Stack with workers</text></switch></g><path d="M 707 394 L 536.37 394" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 531.12 394 L 538.12 390.5 L 536.37 394 L 538.12 397.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 394px; margin-left: 618px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">9</div></div></div></foreignObject><text x="618" y="397" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">9</text></switch></g><rect x="707" y="384" width="163" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 161px; height: 1px; padding-top: 404px; margin-left: 708px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Exec/ExecWithTimeout</div></div></div></foreignObject><text x="789" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec/ExecWithTimeout</text></switch></g><path d="M 450 289.5 L 460 289.5 L 460 364.5 L 470.5 364.5 L 455 383.5 L 439.5 364.5 L 450 364.5 Z" fill="none" stroke="#000000" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="all"/><ellipse cx="455" cy="84.5" rx="7.5" ry="7.5" fill="#ffffff" stroke="#000000" pointer-events="all"/><path d="M 455 92 L 455 117 M 455 97 L 440 97 M 455 97 L 470 97 M 455 117 L 440 137 M 455 117 L 470 137" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe flex-start; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 144px; margin-left: 455px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">User request</div></div></div></foreignObject><text x="455" y="156" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">User...</text></switch></g><rect x="607" y="426" width="198" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 435px; margin-left: 706px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Send request to the worker</div></div></div></foreignObject><text x="706" y="438" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Send request to the worker</text></switch></g><rect x="618" y="368" width="125" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 377px; margin-left: 681px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Receive response</div></div></div></foreignObject><text x="681" y="380" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Receive response</text></switch></g><ellipse cx="125" cy="404" rx="11" ry="11" fill="#000000" stroke="#ff0000" pointer-events="all"/><path d="M 140 404 L 227.76 404" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 219.88 408.5 L 228.88 404 L 219.88 399.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="all"/><rect x="45" y="371" width="161" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 380px; margin-left: 126px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Plugin Initialization</div></div></div></foreignObject><text x="126" y="383" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Plugin Initialization</text></switch></g><path d="M 870 414 L 983.63 414" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 988.88 414 L 981.88 417.5 L 983.63 414 L 981.88 410.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 413px; margin-left: 930px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">7</div></div></div></foreignObject><text x="930" y="416" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">7</text></switch></g><path d="M 990 394 L 876.37 394" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 871.12 394 L 878.12 390.5 L 876.37 394 L 878.12 397.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 394px; margin-left: 931px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">8</div></div></div></foreignObject><text x="931" y="397" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">8</text></switch></g><rect x="990" y="384" width="100" height="40" fill="#f5f5f5" stroke="#666666" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 404px; margin-left: 991px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #333333; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Worker</div></div></div></foreignObject><text x="1040" y="408" fill="#333333" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Worker</text></switch></g><rect x="893" y="426" width="96" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 435px; margin-left: 941px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Exec payload</div></div></div></foreignObject><text x="941" y="438" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec payload</text></switch></g><rect x="873" y="366" width="125" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 375px; margin-left: 936px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Reveive response</div></div></div></foreignObject><text x="936" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Reveive response</text></switch></g><rect x="401" y="255" width="108" height="34" fill="#f8cecc" stroke="#b85450" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 106px; height: 1px; padding-top: 272px; margin-left: 402px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">HTTP plugin</div></div></div></foreignObject><text x="455" y="276" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">HTTP plugin</text></switch></g><path d="M 450 157.5 L 460 157.5 L 460 235.5 L 470.5 235.5 L 455 254.5 L 439.5 235.5 L 450 235.5 Z" fill="none" stroke="#000000" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="all"/><rect x="490" y="364" width="40" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 38px; height: 1px; padding-top: 374px; margin-left: 491px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Pool</div></div></div></foreignObject><text x="510" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Pool</text></switch></g><rect x="770" y="364" width="100" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 374px; margin-left: 771px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Golang Worker</div></div></div></foreignObject><text x="820" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Golang Worker</text></switch></g><rect x="1006" y="364" width="84" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 82px; height: 1px; padding-top: 374px; margin-left: 1007px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">PHP worker</div></div></div></foreignObject><text x="1048" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">PHP worker</text></switch></g></g><switch><g requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"/><a transform="translate(0,-5)" xlink:href="https://www.diagrams.net/doc/faq/svg-export-text-problems" target="_blank"><text text-anchor="middle" font-size="10px" x="50%" y="100%">Viewer does not support full SVG 1.1</text></a></switch></svg> \ No newline at end of file
diff --git a/pkg/events/general.go b/pkg/events/general.go
new file mode 100755
index 00000000..a09a8759
--- /dev/null
+++ b/pkg/events/general.go
@@ -0,0 +1,39 @@
+package events
+
+import (
+ "sync"
+)
+
+// HandlerImpl helps to broadcast events to multiple listeners.
+type HandlerImpl struct {
+ listeners []Listener
+ sync.RWMutex // all receivers should be pointers
+}
+
+func NewEventsHandler() Handler {
+ return &HandlerImpl{listeners: make([]Listener, 0, 2)}
+}
+
+// NumListeners returns number of event listeners.
+func (eb *HandlerImpl) NumListeners() int {
+ eb.Lock()
+ defer eb.Unlock()
+ return len(eb.listeners)
+}
+
+// AddListener registers new event listener.
+func (eb *HandlerImpl) AddListener(listener Listener) {
+ eb.Lock()
+ defer eb.Unlock()
+ eb.listeners = append(eb.listeners, listener)
+}
+
+// Push broadcast events across all event listeners.
+func (eb *HandlerImpl) Push(e interface{}) {
+ // ReadLock here because we are not changing listeners
+ eb.RLock()
+ defer eb.RUnlock()
+ for k := range eb.listeners {
+ eb.listeners[k](e)
+ }
+}
diff --git a/pkg/events/interface.go b/pkg/events/interface.go
new file mode 100644
index 00000000..ac6c15a4
--- /dev/null
+++ b/pkg/events/interface.go
@@ -0,0 +1,14 @@
+package events
+
+// Handler interface
+type Handler interface {
+ // Return number of active listeners
+ NumListeners() int
+ // AddListener adds lister to the publisher
+ AddListener(listener Listener)
+ // Push pushes event to the listeners
+ Push(e interface{})
+}
+
+// Event listener listens for the events produced by worker, worker pool or other service.
+type Listener func(event interface{})
diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go
new file mode 100644
index 00000000..3925df56
--- /dev/null
+++ b/pkg/events/pool_events.go
@@ -0,0 +1,66 @@
+package events
+
+// TODO event numbers
+const (
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct P = iota + 10000
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventPoolError caused on pool wide errors.
+ EventPoolError
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
+ EventTTL
+
+ // EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+)
+
+type P int64
+
+func (ev P) String() string {
+ switch ev {
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventPoolError:
+ return "EventPoolError"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ }
+ return "Unknown event type"
+}
+
+// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
+type PoolEvent struct {
+ // Event type, see below.
+ Event P
+
+ // Payload depends on event type, typically it's worker or error.
+ Payload interface{}
+}
diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go
new file mode 100644
index 00000000..9d428f7d
--- /dev/null
+++ b/pkg/events/worker_events.go
@@ -0,0 +1,33 @@
+package events
+
+const (
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError W = iota + 11000
+
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+)
+
+type W int64
+
+func (ev W) String() string {
+ switch ev {
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ }
+ return "Unknown event type"
+}
+
+// WorkerEvent wraps worker events.
+type WorkerEvent struct {
+ // Event id, see below.
+ Event W
+
+ // Worker triggered the event.
+ Worker interface{}
+
+ // Event specific payload.
+ Payload interface{}
+}
diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go
new file mode 100755
index 00000000..fac36852
--- /dev/null
+++ b/pkg/payload/payload.go
@@ -0,0 +1,16 @@
+package payload
+
+// Payload carries binary header and body to stack and
+// back to the server.
+type Payload struct {
+ // Context represent payload context, might be omitted.
+ Context []byte
+
+ // body contains binary payload to be processed by WorkerProcess.
+ Body []byte
+}
+
+// String returns payload body as string
+func (p *Payload) String() string {
+ return string(p.Body)
+}
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
new file mode 100644
index 00000000..782f7ce9
--- /dev/null
+++ b/pkg/pool/config.go
@@ -0,0 +1,75 @@
+package pool
+
+import (
+ "runtime"
+ "time"
+)
+
+// Configures the pool behaviour.
+type Config struct {
+ // Debug flag creates new fresh worker before every request.
+ Debug bool
+
+ // NumWorkers defines how many sub-processes can be run at once. This value
+ // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
+ NumWorkers uint64 `mapstructure:"num_workers"`
+
+ // MaxJobs defines how many executions is allowed for the worker until
+ // it's destruction. set 1 to create new process for each new task, 0 to let
+ // worker handle as many tasks as it can.
+ MaxJobs uint64 `mapstructure:"max_jobs"`
+
+ // AllocateTimeout defines for how long pool will be waiting for a worker to
+ // be freed to handle the task. Defaults to 60s.
+ AllocateTimeout time.Duration `mapstructure:"allocate_timeout"`
+
+ // DestroyTimeout defines for how long pool should be waiting for worker to
+ // properly destroy, if timeout reached worker will be killed. Defaults to 60s.
+ DestroyTimeout time.Duration `mapstructure:"destroy_timeout"`
+
+ // Supervision config to limit worker and pool memory usage.
+ Supervisor *SupervisorConfig `mapstructure:"supervisor"`
+}
+
+// InitDefaults enables default config values.
+func (cfg *Config) InitDefaults() {
+ if cfg.NumWorkers == 0 {
+ cfg.NumWorkers = uint64(runtime.NumCPU())
+ }
+
+ if cfg.AllocateTimeout == 0 {
+ cfg.AllocateTimeout = time.Minute
+ }
+
+ if cfg.DestroyTimeout == 0 {
+ cfg.DestroyTimeout = time.Minute
+ }
+ if cfg.Supervisor == nil {
+ return
+ }
+ cfg.Supervisor.InitDefaults()
+}
+
+type SupervisorConfig struct {
+ // WatchTick defines how often to check the state of worker.
+ WatchTick time.Duration `mapstructure:"watch_tick"`
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL time.Duration `mapstructure:"ttl"`
+
+ // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
+ IdleTTL time.Duration `mapstructure:"idle_ttl"`
+
+ // ExecTTL defines maximum lifetime per job.
+ ExecTTL time.Duration `mapstructure:"exec_ttl"`
+
+ // MaxWorkerMemory limits memory per worker.
+ MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
+}
+
+// InitDefaults enables default config values.
+func (cfg *SupervisorConfig) InitDefaults() {
+ if cfg.WatchTick == 0 {
+ cfg.WatchTick = time.Second
+ }
+}
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
new file mode 100644
index 00000000..4f7ae595
--- /dev/null
+++ b/pkg/pool/interface.go
@@ -0,0 +1,29 @@
+package pool
+
+import (
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+// Pool managed set of inner worker processes.
+type Pool interface {
+ // GetConfig returns pool configuration.
+ GetConfig() interface{}
+
+ // Exec executes task with payload
+ Exec(rqs payload.Payload) (payload.Payload, error)
+
+ // ExecWithContext executes task with context which is used with timeout
+ ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
+
+ // Workers returns worker list associated with the pool.
+ Workers() (workers []worker.SyncWorker)
+
+ // Remove worker from the pool.
+ RemoveWorker(worker worker.SyncWorker) error
+
+ // Destroy all underlying stack (but let them to complete the task).
+ Destroy(ctx context.Context)
+}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
new file mode 100755
index 00000000..44adf9c0
--- /dev/null
+++ b/pkg/pool/static_pool.go
@@ -0,0 +1,327 @@
+package pool
+
+import (
+ "context"
+ "os/exec"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/transport"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
+)
+
+// StopRequest can be sent by worker to indicate that restart is required.
+const StopRequest = "{\"stop\":true}"
+
+// ErrorEncoder encode error or make a decision based on the error type
+type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error)
+
+type Options func(p *StaticPool)
+
+type Command func() *exec.Cmd
+
+// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
+type StaticPool struct {
+ cfg Config
+
+ // worker command creator
+ cmd Command
+
+ // creates and connects to stack
+ factory transport.Factory
+
+ // distributes the events
+ events events.Handler
+
+ // saved list of event listeners
+ listeners []events.Listener
+
+ // manages worker states and TTLs
+ ww workerWatcher.Watcher
+
+ // allocate new worker
+ allocator worker.Allocator
+
+ // errEncoder is the default Exec error encoder
+ errEncoder ErrorEncoder
+}
+
+// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) {
+ const op = errors.Op("static_pool_initialize")
+ if factory == nil {
+ return nil, errors.E(op, errors.Str("no factory initialized"))
+ }
+ cfg.InitDefaults()
+
+ if cfg.Debug {
+ cfg.NumWorkers = 0
+ cfg.MaxJobs = 1
+ }
+
+ p := &StaticPool{
+ cfg: cfg,
+ cmd: cmd,
+ factory: factory,
+ events: events.NewEventsHandler(),
+ }
+
+ // add pool options
+ for i := 0; i < len(options); i++ {
+ options[i](p)
+ }
+
+ p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+
+ workers, err := p.allocateWorkers(p.cfg.NumWorkers)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // put stack in the pool
+ err = p.ww.AddToWatch(workers)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ p.errEncoder = defaultErrEncoder(p)
+
+ // if supervised config not nil, guess, that pool wanted to be supervised
+ if cfg.Supervisor != nil {
+ sp := supervisorWrapper(p, p.events, p.cfg.Supervisor)
+ // start watcher timer
+ sp.Start()
+ return sp, nil
+ }
+
+ return p, nil
+}
+
+func AddListeners(listeners ...events.Listener) Options {
+ return func(p *StaticPool) {
+ p.listeners = listeners
+ for i := 0; i < len(listeners); i++ {
+ p.addListener(listeners[i])
+ }
+ }
+}
+
+// AddListener connects event listener to the pool.
+func (sp *StaticPool) addListener(listener events.Listener) {
+ sp.events.AddListener(listener)
+}
+
+// Config returns associated pool configuration. Immutable.
+func (sp *StaticPool) GetConfig() interface{} {
+ return sp.cfg
+}
+
+// Workers returns worker list associated with the pool.
+func (sp *StaticPool) Workers() (workers []worker.SyncWorker) {
+ return sp.ww.WorkersList()
+}
+
+func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error {
+ return sp.ww.RemoveWorker(wb)
+}
+
+// Be careful, sync Exec with ExecWithContext
+func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("static_pool_exec")
+ if sp.cfg.Debug {
+ return sp.execDebug(p)
+ }
+ ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.getWorker(ctxGetFree, op)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
+ }
+
+ rsp, err := w.Exec(p)
+ if err != nil {
+ return sp.errEncoder(err, w)
+ }
+
+ // worker want's to be terminated
+ // TODO careful with string(rsp.Context)
+ if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
+ sp.stopWorker(w)
+
+ return sp.Exec(p)
+ }
+
+ err = sp.checkMaxJobs(w)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
+ }
+
+ return rsp, nil
+}
+
+// Be careful, sync with pool.Exec method
+func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("static_pool_exec_with_context")
+ ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.getWorker(ctxGetFree, op)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
+ }
+
+ rsp, err := w.ExecWithTimeout(ctx, p)
+ if err != nil {
+ return sp.errEncoder(err, w)
+ }
+
+ // worker want's to be terminated
+ if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
+ sp.stopWorker(w)
+ return sp.ExecWithContext(ctx, p)
+ }
+
+ err = sp.checkMaxJobs(w)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
+ }
+
+ return rsp, nil
+}
+
+func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
+ const op = errors.Op("static_pool_stop_worker")
+ w.State().Set(internal.StateInvalid)
+ err := w.Stop()
+ if err != nil {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ }
+}
+
+// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
+func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
+ const op = errors.Op("static_pool_check_max_jobs")
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err := sp.ww.AllocateNew()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ } else {
+ sp.ww.PushWorker(w)
+ }
+ return nil
+}
+
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
+ // GetFreeWorker function consumes context with timeout
+ w, err := sp.ww.GetFreeWorker(ctxGetFree)
+ if err != nil {
+ // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
+ if errors.Is(errors.NoFreeWorkers, err) {
+ sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)})
+ return nil, errors.E(op, err)
+ }
+ // else if err not nil - return error
+ return nil, errors.E(op, err)
+ }
+ return w, nil
+}
+
+// Destroy all underlying stack (but let them to complete the task).
+func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.ww.Destroy(ctx)
+}
+
+func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
+ return func(err error, w worker.SyncWorker) (payload.Payload, error) {
+ const op = errors.Op("error encoder")
+ // just push event if on any stage was timeout error
+ if errors.Is(errors.ExecTTL, err) {
+ sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
+ }
+ // soft job errors are allowed
+ if errors.Is(errors.SoftJob, err) {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew()
+ if err != nil {
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
+ }
+
+ w.State().Set(internal.StateInvalid)
+ err = w.Stop()
+ if err != nil {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ }
+ } else {
+ sp.ww.PushWorker(w)
+ }
+
+ return payload.Payload{}, errors.E(op, err)
+ }
+
+ w.State().Set(internal.StateInvalid)
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ errS := w.Stop()
+
+ if errS != nil {
+ return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
+ }
+
+ return payload.Payload{}, errors.E(op, err)
+ }
+}
+
+func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
+ return func() (*worker.SyncWorkerImpl, error) {
+ ctx, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
+ if err != nil {
+ return nil, err
+ }
+
+ sw := worker.From(w)
+
+ sp.events.Push(events.PoolEvent{
+ Event: events.EventWorkerConstruct,
+ Payload: sw,
+ })
+ return sw, nil
+ }
+}
+
+func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
+ sw, err := sp.allocator()
+ if err != nil {
+ return payload.Payload{}, err
+ }
+
+ r, err := sw.Exec(p)
+
+ if stopErr := sw.Stop(); stopErr != nil {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ }
+
+ return r, err
+}
+
+// allocate required number of stack
+func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) {
+ const op = errors.Op("allocate workers")
+ var workers []worker.SyncWorker
+
+ // constant number of stack simplify logic
+ for i := uint64(0); i < numWorkers; i++ {
+ w, err := sp.allocator()
+ if err != nil {
+ return nil, errors.E(op, errors.WorkerAllocate, err)
+ }
+
+ workers = append(workers, w)
+ }
+ return workers, nil
+}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
new file mode 100755
index 00000000..a32790e0
--- /dev/null
+++ b/pkg/pool/static_pool_test.go
@@ -0,0 +1,647 @@
+package pool
+
+import (
+ "context"
+ "log"
+ "os/exec"
+ "runtime"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/stretchr/testify/assert"
+)
+
+var cfg = Config{
+ NumWorkers: uint64(runtime.NumCPU()),
+ AllocateTimeout: time.Second * 5,
+ DestroyTimeout: time.Second * 5,
+}
+
+func Test_NewPool(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+}
+
+func Test_StaticPool_Invalid(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+
+ assert.Nil(t, p)
+ assert.Error(t, err)
+}
+
+func Test_ConfigNoErrorInitDefaults(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+}
+
+func Test_StaticPool_Echo(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_StaticPool_Echo_NilContext(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_StaticPool_Echo_Context(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.Empty(t, res.Body)
+ assert.NotNil(t, res.Context)
+
+ assert.Equal(t, "world", string(res.Context))
+}
+
+func Test_StaticPool_JobError(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ if errors.Is(errors.SoftJob, err) == false {
+ t.Fatal("error should be of type errors.Exec")
+ }
+
+ assert.Contains(t, err.Error(), "hello")
+}
+
+func Test_StaticPool_Broken_Replace(t *testing.T) {
+ ctx := context.Background()
+ block := make(chan struct{}, 1)
+
+ listener := func(event interface{}) {
+ if wev, ok := event.(events.WorkerEvent); ok {
+ if wev.Event == events.EventWorkerLog {
+ e := string(wev.Payload.([]byte))
+ if strings.ContainsAny(e, "undefined_function()") {
+ block <- struct{}{}
+ return
+ }
+ }
+ }
+ }
+
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ time.Sleep(time.Second)
+ res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
+
+ <-block
+
+ p.Destroy(ctx)
+}
+
+func Test_StaticPool_Broken_FromOutside(t *testing.T) {
+ ctx := context.Background()
+ // Consume pool events
+ ev := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if pe, ok := event.(events.PoolEvent); ok {
+ if pe.Event == events.EventWorkerConstruct {
+ ev <- struct{}{}
+ }
+ }
+ }
+
+ var cfg = Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second * 5,
+ DestroyTimeout: time.Second * 5,
+ }
+
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+ assert.Equal(t, 1, len(p.Workers()))
+
+ // first creation
+ <-ev
+ // killing random worker and expecting pool to replace it
+ err = p.Workers()[0].Kill()
+ if err != nil {
+ t.Errorf("error killing the process: error %v", err)
+ }
+
+ // re-creation
+ <-ev
+
+ list := p.Workers()
+ for _, w := range list {
+ assert.Equal(t, internal.StateReady, w.State().Value())
+ }
+}
+
+func Test_StaticPool_AllocateTimeout(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Nanosecond * 1,
+ DestroyTimeout: time.Second * 2,
+ },
+ )
+ assert.Error(t, err)
+ if !errors.Is(errors.WorkerAllocate, err) {
+ t.Fatal("error should be of type WorkerAllocate")
+ }
+ assert.Nil(t, p)
+}
+
+func Test_StaticPool_Replace_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ MaxJobs: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ var lastPID string
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
+
+ res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+ assert.Equal(t, lastPID, string(res.Body))
+
+ for i := 0; i < 10; i++ {
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+func Test_StaticPool_Debug_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ Debug: true,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ assert.Len(t, p.Workers(), 0)
+
+ var lastPID string
+ res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+ assert.NotEqual(t, lastPID, string(res.Body))
+
+ assert.Len(t, p.Workers(), 0)
+
+ for i := 0; i < 10; i++ {
+ assert.Len(t, p.Workers(), 0)
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+// identical to replace but controlled on worker side
+func Test_StaticPool_Stop_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ var lastPID string
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
+
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, lastPID, string(res.Body))
+
+ for i := 0; i < 10; i++ {
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ p.Destroy(ctx)
+ _, err = p.Exec(payload.Payload{Body: []byte("100")})
+ assert.Error(t, err)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ go func() {
+ _, err := p.Exec(payload.Payload{Body: []byte("100")})
+ if err != nil {
+ t.Errorf("error executing payload: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ p.Destroy(ctx)
+ _, err = p.Exec(payload.Payload{Body: []byte("100")})
+ assert.Error(t, err)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Handle_Dead(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ for i := range p.Workers() {
+ p.Workers()[i].State().Set(internal.StateErrored)
+ }
+
+ _, err = p.Exec(payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ p.Destroy(ctx)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Slow_Destroy(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ p.Destroy(context.Background())
+}
+
+func Test_StaticPool_NoFreeWorkers(t *testing.T) {
+ ctx := context.Background()
+ block := make(chan struct{}, 1)
+
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.PoolEvent); ok {
+ if ev.Event == events.EventNoFreeWorkers {
+ block <- struct{}{}
+ }
+ }
+ }
+
+ p, err := Initialize(
+ ctx,
+ // sleep for the 3 seconds
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ Debug: false,
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: nil,
+ },
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ go func() {
+ _, _ = p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ }()
+
+ time.Sleep(time.Second)
+ res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
+
+ <-block
+
+ p.Destroy(ctx)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_WrongCommand1(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.Error(t, err)
+ assert.Nil(t, p)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_WrongCommand2(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.Error(t, err)
+ assert.Nil(t, p)
+}
+
+func Benchmark_Pool_Echo(b *testing.B) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+ for n := 0; n < b.N; n++ {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+//
+func Benchmark_Pool_Echo_Batched(b *testing.B) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: uint64(runtime.NumCPU()),
+ AllocateTimeout: time.Second * 100,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(b, err)
+ defer p.Destroy(ctx)
+
+ var wg sync.WaitGroup
+ for i := 0; i < b.N; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ log.Println(err)
+ }
+ }()
+ }
+
+ wg.Wait()
+}
+
+//
+func Benchmark_Pool_Echo_Replaced(b *testing.B) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ MaxJobs: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(b, err)
+ defer p.Destroy(ctx)
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for n := 0; n < b.N; n++ {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ log.Println(err)
+ }
+ }
+}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
new file mode 100755
index 00000000..2597b352
--- /dev/null
+++ b/pkg/pool/supervisor_pool.go
@@ -0,0 +1,222 @@
+package pool
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/tools"
+)
+
+const MB = 1024 * 1024
+
+// NSEC_IN_SEC nanoseconds in second
+const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck
+
+type Supervised interface {
+ Pool
+ // Start used to start watching process for all pool workers
+ Start()
+}
+
+type supervised struct {
+ cfg *SupervisorConfig
+ events events.Handler
+ pool Pool
+ stopCh chan struct{}
+ mu *sync.RWMutex
+}
+
+func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+ sp := &supervised{
+ cfg: cfg,
+ events: events,
+ pool: pool,
+ mu: &sync.RWMutex{},
+ stopCh: make(chan struct{}),
+ }
+
+ return sp
+}
+
+type ttlExec struct {
+ err error
+ p payload.Payload
+}
+
+func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("supervised_exec_with_context")
+ if sp.cfg.ExecTTL == 0 {
+ return sp.pool.Exec(rqs)
+ }
+
+ c := make(chan ttlExec, 1)
+ ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL)
+ defer cancel()
+ go func() {
+ res, err := sp.pool.ExecWithContext(ctx, rqs)
+ if err != nil {
+ c <- ttlExec{
+ err: errors.E(op, err),
+ p: payload.Payload{},
+ }
+ }
+
+ c <- ttlExec{
+ err: nil,
+ p: res,
+ }
+ }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
+ case res := <-c:
+ if res.err != nil {
+ return payload.Payload{}, res.err
+ }
+
+ return res.p, nil
+ }
+ }
+}
+
+func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("supervised_exec")
+ rsp, err := sp.pool.Exec(p)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
+ }
+ return rsp, nil
+}
+
+func (sp *supervised) GetConfig() interface{} {
+ return sp.pool.GetConfig()
+}
+
+func (sp *supervised) Workers() (workers []worker.SyncWorker) {
+ sp.mu.Lock()
+ defer sp.mu.Unlock()
+ return sp.pool.Workers()
+}
+
+func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error {
+ return sp.pool.RemoveWorker(worker)
+}
+
+func (sp *supervised) Destroy(ctx context.Context) {
+ sp.pool.Destroy(ctx)
+}
+
+func (sp *supervised) Start() {
+ go func() {
+ watchTout := time.NewTicker(sp.cfg.WatchTick)
+ for {
+ select {
+ case <-sp.stopCh:
+ watchTout.Stop()
+ return
+ // stop here
+ case <-watchTout.C:
+ sp.mu.Lock()
+ sp.control()
+ sp.mu.Unlock()
+ }
+ }
+ }()
+}
+
+func (sp *supervised) Stop() {
+ sp.stopCh <- struct{}{}
+}
+
+func (sp *supervised) control() {
+ now := time.Now()
+ const op = errors.Op("supervised_pool_control_tick")
+
+ // THIS IS A COPY OF WORKERS
+ workers := sp.pool.Workers()
+
+ for i := 0; i < len(workers); i++ {
+ if workers[i].State().Value() == internal.StateInvalid {
+ continue
+ }
+
+ s, err := tools.WorkerProcessState(workers[i])
+ if err != nil {
+ // worker not longer valid for supervision
+ continue
+ }
+
+ if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
+ err = sp.pool.RemoveWorker(workers[i])
+ if err != nil {
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
+ return
+ }
+ sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
+ continue
+ }
+
+ if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
+ err = sp.pool.RemoveWorker(workers[i])
+ if err != nil {
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
+ return
+ }
+ sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
+ continue
+ }
+
+ // firs we check maxWorker idle
+ if sp.cfg.IdleTTL != 0 {
+ // then check for the worker state
+ if workers[i].State().Value() != internal.StateReady {
+ continue
+ }
+
+ /*
+ Calculate idle time
+ If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64
+ 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
+ we are guessing that worker overlap idle time and has to be killed
+ */
+
+ // 1610530005534416045 lu
+ // lu - now = -7811150814 - nanoseconds
+ // 7.8 seconds
+ // get last used unix nano
+ lu := workers[i].State().LastUsed()
+ // worker not used, skip
+ if lu == 0 {
+ continue
+ }
+
+ // convert last used to unixNano and sub time.now to seconds
+ // negative number, because lu always in the past, except for the `back to the future` :)
+ res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1
+
+ // maxWorkerIdle more than diff between now and last used
+ // for example:
+ // After exec worker goes to the rest
+ // And resting for the 5 seconds
+ // IdleTTL is 1 second.
+ // After the control check, res will be 5, idle is 1
+ // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
+ if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
+ err = sp.pool.RemoveWorker(workers[i])
+ if err != nil {
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
+ return
+ }
+ sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
+ }
+ }
+ }
+}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
new file mode 100644
index 00000000..c67d5d91
--- /dev/null
+++ b/pkg/pool/supervisor_test.go
@@ -0,0 +1,248 @@
+package pool
+
+import (
+ "context"
+ "os/exec"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/spiral/roadrunner/v2/tools"
+ "github.com/stretchr/testify/assert"
+)
+
+var cfgSupervised = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 100 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+}
+
+func TestSupervisedPool_Exec(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgSupervised,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ stopCh := make(chan struct{})
+ defer p.Destroy(context.Background())
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ workers := p.Workers()
+ if len(workers) > 0 {
+ s, err := tools.WorkerProcessState(workers[0])
+ assert.NoError(t, err)
+ assert.NotNil(t, s)
+ // since this is soft limit, double max memory limit watch
+ if (s.MemoryUsage / MB) > cfgSupervised.Supervisor.MaxWorkerMemory*2 {
+ assert.Fail(t, "max memory reached")
+ }
+ }
+ }
+ }
+ }()
+
+ for i := 0; i < 100; i++ {
+ time.Sleep(time.Millisecond * 50)
+ _, err = p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+ assert.NoError(t, err)
+ }
+
+ stopCh <- struct{}{}
+}
+
+func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 1 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.ExecWithContext(context.Background(), payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.Error(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 1)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+}
+
+func TestSupervisedPool_Idle(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 1 * time.Second,
+ ExecTTL: 100 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.ExecWithContext(context.Background(), payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.Nil(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 5)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ p.Destroy(context.Background())
+}
+
+func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 4 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ time.Sleep(time.Millisecond * 100)
+ resp, err := p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 1)
+ // should be the same pid
+ assert.Equal(t, pid, p.Workers()[0].Pid())
+}
+
+func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 4 * time.Second,
+ MaxWorkerMemory: 1,
+ },
+ }
+
+ block := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.PoolEvent); ok {
+ if ev.Event == events.EventMaxMemory {
+ block <- struct{}{}
+ }
+ }
+ }
+
+ // constructed
+ // max memory
+ // constructed
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ AddListeners(listener),
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ resp, err := p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ <-block
+ p.Destroy(context.Background())
+}
diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go
new file mode 100644
index 00000000..299ac95f
--- /dev/null
+++ b/pkg/transport/interface.go
@@ -0,0 +1,21 @@
+package transport
+
+import (
+ "context"
+ "os/exec"
+
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+// Factory is responsible of wrapping given command into tasks WorkerProcess.
+type Factory interface {
+ // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context.
+ // Process must not be started.
+ SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error)
+ // SpawnWorker creates new WorkerProcess process based on given command.
+ // Process must not be started.
+ SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error)
+ // Close the factory and underlying connections.
+ Close() error
+}
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go
new file mode 100755
index 00000000..dd7c5841
--- /dev/null
+++ b/pkg/transport/pipe/pipe_factory.go
@@ -0,0 +1,162 @@
+package pipe
+
+import (
+ "context"
+ "os/exec"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/pkg/pipe"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ "go.uber.org/multierr"
+)
+
+// Factory connects to stack using standard
+// streams (STDIN, STDOUT pipes).
+type Factory struct{}
+
+// NewPipeFactory returns new factory instance and starts
+// listening
+func NewPipeFactory() *Factory {
+ return &Factory{}
+}
+
+type SpawnResult struct {
+ w *worker.Process
+ err error
+}
+
+// SpawnWorker creates new Process and connects it to goridge relay,
+// method Wait() must be handled on level above.
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
+ c := make(chan SpawnResult)
+ const op = errors.Op("factory_spawn_worker_with_timeout")
+ go func() {
+ w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+ if err != nil {
+ c <- SpawnResult{
+ w: nil,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ // TODO why out is in?
+ in, err := cmd.StdoutPipe()
+ if err != nil {
+ c <- SpawnResult{
+ w: nil,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ // TODO why in is out?
+ out, err := cmd.StdinPipe()
+ if err != nil {
+ c <- SpawnResult{
+ w: nil,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ // Init new PIPE relay
+ relay := pipe.NewPipeRelay(in, out)
+ w.AttachRelay(relay)
+
+ // Start the worker
+ err = w.Start()
+ if err != nil {
+ c <- SpawnResult{
+ w: nil,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ // errors bundle
+ pid, err := internal.FetchPID(relay)
+ if pid != w.Pid() || err != nil {
+ err = multierr.Combine(
+ err,
+ w.Kill(),
+ w.Wait(),
+ )
+ c <- SpawnResult{
+ w: nil,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ // everything ok, set ready state
+ w.State().Set(internal.StateReady)
+
+ // return worker
+ c <- SpawnResult{
+ w: w,
+ err: nil,
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case res := <-c:
+ if res.err != nil {
+ return nil, res.err
+ }
+ return res.w, nil
+ }
+}
+
+func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
+ const op = errors.Op("factory_spawn_worker")
+ w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // TODO why out is in?
+ in, err := cmd.StdoutPipe()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // TODO why in is out?
+ out, err := cmd.StdinPipe()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // Init new PIPE relay
+ relay := pipe.NewPipeRelay(in, out)
+ w.AttachRelay(relay)
+
+ // Start the worker
+ err = w.Start()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // errors bundle
+ if pid, err := internal.FetchPID(relay); pid != w.Pid() {
+ err = multierr.Combine(
+ err,
+ w.Kill(),
+ w.Wait(),
+ )
+ return nil, errors.E(op, err)
+ }
+
+ // everything ok, set ready state
+ w.State().Set(internal.StateReady)
+ return w, nil
+}
+
+// Close the factory.
+func (f *Factory) Close() error {
+ return nil
+}
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
new file mode 100644
index 00000000..d4949c82
--- /dev/null
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -0,0 +1,456 @@
+package pipe
+
+import (
+ "os/exec"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_GetState2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ assert.Equal(t, internal.StateStopped, w.State().Value())
+ }()
+
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ assert.Equal(t, internal.StateReady, w.State().Value())
+ assert.NoError(t, w.Stop())
+}
+
+func Test_Kill2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ assert.Error(t, w.Wait())
+ assert.Equal(t, internal.StateErrored, w.State().Value())
+ }()
+
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ assert.Equal(t, internal.StateReady, w.State().Value())
+ err = w.Kill()
+ if err != nil {
+ t.Errorf("error killing the Process: error %v", err)
+ }
+ wg.Wait()
+}
+
+func Test_Pipe_Start2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ assert.NoError(t, w.Stop())
+}
+
+func Test_Pipe_StartError2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ err := cmd.Start()
+ if err != nil {
+ t.Errorf("error running the command: error %v", err)
+ }
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_PipeError3(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ _, err := cmd.StdinPipe()
+ if err != nil {
+ t.Errorf("error creating the STDIN pipe: error %v", err)
+ }
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_PipeError4(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ _, err := cmd.StdinPipe()
+ if err != nil {
+ t.Errorf("error creating the STDIN pipe: error %v", err)
+ }
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_Failboot2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/failboot.php")
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+
+ assert.Nil(t, w)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "failboot")
+}
+
+func Test_Pipe_Invalid2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/invalid.php")
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_Echo2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_Pipe_Broken2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ time.Sleep(time.Second)
+ err = w.Stop()
+ assert.Error(t, err)
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+}
+
+func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
+ f := NewPipeFactory()
+ for n := 0; n < b.N; n++ {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ w, _ := f.SpawnWorker(cmd)
+ go func() {
+ if w.Wait() != nil {
+ b.Fail()
+ }
+ }()
+
+ err := w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the worker: error %v", err)
+ }
+ }
+}
+
+func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ sw := worker.From(w)
+
+ b.ReportAllocs()
+ b.ResetTimer()
+ go func() {
+ err := w.Wait()
+ if err != nil {
+ b.Errorf("error waiting the worker: error %v", err)
+ }
+ }()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Test_Echo2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sw := worker.From(w)
+
+ go func() {
+ assert.NoError(t, sw.Wait())
+ }()
+ defer func() {
+ err := sw.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.Nil(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_BadPayload2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+
+ sw := worker.From(w)
+
+ go func() {
+ assert.NoError(t, sw.Wait())
+ }()
+ defer func() {
+ err := sw.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ res, err := sw.Exec(payload.Payload{})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Contains(t, err.Error(), "payload can not be empty")
+}
+
+func Test_String2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "ready")
+ assert.Contains(t, w.String(), "numExecs: 0")
+}
+
+func Test_Echo_Slow2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10")
+
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.Nil(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_Broken2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
+ data := ""
+ mu := &sync.Mutex{}
+ listener := func(event interface{}) {
+ if wev, ok := event.(events.WorkerEvent); ok {
+ mu.Lock()
+ data = string(wev.Payload.([]byte))
+ mu.Unlock()
+ }
+ }
+
+ w, err := NewPipeFactory().SpawnWorker(cmd, listener)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ assert.NotNil(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ time.Sleep(time.Second * 3)
+ mu.Lock()
+ if strings.ContainsAny(data, "undefined_function()") == false {
+ t.Fail()
+ }
+ mu.Unlock()
+ assert.Error(t, w.Stop())
+}
+
+func Test_Error2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ assert.NotNil(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ if errors.Is(errors.SoftJob, err) == false {
+ t.Fatal("error should be of type errors.ErrSoftJob")
+ }
+ assert.Contains(t, err.Error(), "hello")
+}
+
+func Test_NumExecs2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ _, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
+ assert.Equal(t, uint64(1), w.State().NumExecs())
+
+ _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
+ assert.Equal(t, uint64(2), w.State().NumExecs())
+
+ _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
+ assert.Equal(t, uint64(3), w.State().NumExecs())
+}
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
new file mode 100755
index 00000000..38166b85
--- /dev/null
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -0,0 +1,478 @@
+package pipe
+
+import (
+ "context"
+ "os/exec"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_GetState(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ assert.Equal(t, internal.StateStopped, w.State().Value())
+ }()
+
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ assert.Equal(t, internal.StateReady, w.State().Value())
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+}
+
+func Test_Kill(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ assert.Error(t, w.Wait())
+ assert.Equal(t, internal.StateErrored, w.State().Value())
+ }()
+
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ assert.Equal(t, internal.StateReady, w.State().Value())
+ err = w.Kill()
+ if err != nil {
+ t.Errorf("error killing the Process: error %v", err)
+ }
+ wg.Wait()
+}
+
+func Test_Pipe_Start(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ assert.NoError(t, w.Stop())
+}
+
+func Test_Pipe_StartError(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ err := cmd.Start()
+ if err != nil {
+ t.Errorf("error running the command: error %v", err)
+ }
+
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_PipeError(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ _, err := cmd.StdinPipe()
+ if err != nil {
+ t.Errorf("error creating the STDIN pipe: error %v", err)
+ }
+
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_PipeError2(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ _, err := cmd.StdinPipe()
+ if err != nil {
+ t.Errorf("error creating the STDIN pipe: error %v", err)
+ }
+
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_Failboot(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/failboot.php")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+
+ assert.Nil(t, w)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "failboot")
+}
+
+func Test_Pipe_Invalid(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/invalid.php")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Pipe_Echo(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_Pipe_Broken(t *testing.T) {
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ time.Sleep(time.Second)
+ err = w.Stop()
+ assert.Error(t, err)
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+}
+
+func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
+ f := NewPipeFactory()
+ for n := 0; n < b.N; n++ {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd)
+ go func() {
+ if w.Wait() != nil {
+ b.Fail()
+ }
+ }()
+
+ err := w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the worker: error %v", err)
+ }
+ }
+}
+
+func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd)
+ sw := worker.From(w)
+
+ b.ReportAllocs()
+ b.ResetTimer()
+ go func() {
+ err := w.Wait()
+ if err != nil {
+ b.Errorf("error waiting the worker: error %v", err)
+ }
+ }()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Test_Echo(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sw := worker.From(w)
+ go func() {
+ assert.NoError(t, sw.Wait())
+ }()
+ defer func() {
+ err := sw.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.Nil(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_BadPayload(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+
+ sw := worker.From(w)
+
+ go func() {
+ assert.NoError(t, sw.Wait())
+ }()
+ defer func() {
+ err := sw.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ res, err := sw.Exec(payload.Payload{})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Contains(t, err.Error(), "payload can not be empty")
+}
+
+func Test_String(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "ready")
+ assert.Contains(t, w.String(), "numExecs: 0")
+}
+
+func Test_Echo_Slow(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.Nil(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_Broken(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes")
+ data := ""
+ mu := &sync.Mutex{}
+ listener := func(event interface{}) {
+ if wev, ok := event.(events.WorkerEvent); ok {
+ mu.Lock()
+ data = string(wev.Payload.([]byte))
+ mu.Unlock()
+ }
+ }
+
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ assert.NotNil(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ time.Sleep(time.Second * 3)
+ mu.Lock()
+ if strings.ContainsAny(data, "undefined_function()") == false {
+ t.Fail()
+ }
+ mu.Unlock()
+ assert.Error(t, w.Stop())
+}
+
+func Test_Error(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ assert.NotNil(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ if errors.Is(errors.SoftJob, err) == false {
+ t.Fatal("error should be of type errors.ErrSoftJob")
+ }
+ assert.Contains(t, err.Error(), "hello")
+}
+
+func Test_NumExecs(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ _, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
+ assert.Equal(t, uint64(1), w.State().NumExecs())
+
+ _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
+ assert.Equal(t, uint64(2), w.State().NumExecs())
+
+ _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
+ assert.Equal(t, uint64(3), w.State().NumExecs())
+}
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
new file mode 100755
index 00000000..ccd2b0bf
--- /dev/null
+++ b/pkg/transport/socket/socket_factory.go
@@ -0,0 +1,228 @@
+package socket
+
+import (
+ "context"
+ "net"
+ "os/exec"
+ "sync"
+ "time"
+
+ "github.com/shirou/gopsutil/process"
+ "github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/goridge/v3/pkg/socket"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+
+ "go.uber.org/multierr"
+ "golang.org/x/sync/errgroup"
+)
+
+// Factory connects to external stack using socket server.
+type Factory struct {
+ // listens for incoming connections from underlying processes
+ ls net.Listener
+
+ // relay connection timeout
+ tout time.Duration
+
+ // sockets which are waiting for process association
+ relays sync.Map
+
+ ErrCh chan error
+}
+
+// todo: review
+
+// NewSocketServer returns Factory attached to a given socket listener.
+// tout specifies for how long factory should serve for incoming relay connection
+func NewSocketServer(ls net.Listener, tout time.Duration) *Factory {
+ f := &Factory{
+ ls: ls,
+ tout: tout,
+ relays: sync.Map{},
+ ErrCh: make(chan error, 10),
+ }
+
+ // Be careful
+ // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go
+ // https://github.com/golang/go/issues/5045
+ go func() {
+ f.ErrCh <- f.listen()
+ }()
+
+ return f
+}
+
+// blocking operation, returns an error
+func (f *Factory) listen() error {
+ errGr := &errgroup.Group{}
+ errGr.Go(func() error {
+ for {
+ conn, err := f.ls.Accept()
+ if err != nil {
+ return err
+ }
+
+ rl := socket.NewSocketRelay(conn)
+ pid, err := internal.FetchPID(rl)
+ if err != nil {
+ return err
+ }
+
+ f.attachRelayToPid(pid, rl)
+ }
+ })
+
+ return errGr.Wait()
+}
+
+type socketSpawn struct {
+ w *worker.Process
+ err error
+}
+
+// SpawnWorker creates Process and connects it to appropriate relay or returns error
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
+ const op = errors.Op("factory_spawn_worker_with_timeout")
+ c := make(chan socketSpawn)
+ go func() {
+ ctx, cancel := context.WithTimeout(ctx, f.tout)
+ defer cancel()
+ w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+ if err != nil {
+ c <- socketSpawn{
+ w: nil,
+ err: err,
+ }
+ return
+ }
+
+ err = w.Start()
+ if err != nil {
+ c <- socketSpawn{
+ w: nil,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ rl, err := f.findRelayWithContext(ctx, w)
+ if err != nil {
+ err = multierr.Combine(
+ err,
+ w.Kill(),
+ w.Wait(),
+ )
+
+ c <- socketSpawn{
+ w: nil,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ w.AttachRelay(rl)
+ w.State().Set(internal.StateReady)
+
+ c <- socketSpawn{
+ w: w,
+ err: nil,
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case res := <-c:
+ if res.err != nil {
+ return nil, res.err
+ }
+
+ return res.w, nil
+ }
+}
+
+func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
+ const op = errors.Op("factory_spawn_worker")
+ w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+ if err != nil {
+ return nil, err
+ }
+
+ err = w.Start()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ rl, err := f.findRelay(w)
+ if err != nil {
+ err = multierr.Combine(
+ err,
+ w.Kill(),
+ w.Wait(),
+ )
+ return nil, err
+ }
+
+ w.AttachRelay(rl)
+ w.State().Set(internal.StateReady)
+
+ return w, nil
+}
+
+// Close socket factory and underlying socket connection.
+func (f *Factory) Close() error {
+ return f.ls.Close()
+}
+
+// waits for Process to connect over socket and returns associated relay of timeout
+func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) {
+ ticker := time.NewTicker(time.Millisecond * 100)
+ for {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case <-ticker.C:
+ _, err := process.NewProcess(int32(w.Pid()))
+ if err != nil {
+ return nil, err
+ }
+ default:
+ tmp, ok := f.relays.Load(w.Pid())
+ if !ok {
+ continue
+ }
+ return tmp.(*socket.Relay), nil
+ }
+ }
+}
+
+func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) {
+ const op = errors.Op("factory_find_relay")
+ // poll every 1ms for the relay
+ pollDone := time.NewTimer(f.tout)
+ for {
+ select {
+ case <-pollDone.C:
+ return nil, errors.E(op, errors.Str("relay timeout"))
+ default:
+ tmp, ok := f.relays.Load(w.Pid())
+ if !ok {
+ continue
+ }
+ return tmp.(*socket.Relay), nil
+ }
+ }
+}
+
+// chan to store relay associated with specific pid
+func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) {
+ f.relays.Store(pid, relay)
+}
+
+// deletes relay chan associated with specific pid
+func (f *Factory) removeRelayFromPid(pid int64) {
+ f.relays.Delete(pid)
+}
diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
new file mode 100644
index 00000000..0e29e7d2
--- /dev/null
+++ b/pkg/transport/socket/socket_factory_spawn_test.go
@@ -0,0 +1,489 @@
+package socket
+
+import (
+ "net"
+ "os/exec"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_Tcp_Start2(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+}
+
+func Test_Tcp_StartCloseFactory2(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
+
+ f := NewSocketServer(ls, time.Minute)
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+
+ w, err := f.SpawnWorker(cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+}
+
+func Test_Tcp_StartError2(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ err = cmd.Start()
+ if err != nil {
+ t.Errorf("error executing the command: error %v", err)
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Tcp_Failboot2(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err3 := ls.Close()
+ if err3 != nil {
+ t.Errorf("error closing the listener: error %v", err3)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/failboot.php")
+
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
+ assert.Nil(t, w)
+ assert.Error(t, err2)
+ assert.Contains(t, err2.Error(), "failboot")
+}
+
+func Test_Tcp_Invalid2(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/invalid.php")
+
+ w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Tcp_Broken2(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ err := w.Wait()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "undefined_function()")
+ }()
+
+ defer func() {
+ time.Sleep(time.Second)
+ err2 := w.Stop()
+ // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
+ assert.Error(t, err2)
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+ wg.Wait()
+}
+
+func Test_Tcp_Echo2(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
+
+ w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_Unix_Start2(t *testing.T) {
+ ls, err := net.Listen("unix", "sock.unix")
+ assert.NoError(t, err)
+ defer func() {
+ err := ls.Close()
+ assert.NoError(t, err)
+ }()
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+}
+
+func Test_Unix_Failboot2(t *testing.T) {
+ ls, err := net.Listen("unix", "sock.unix")
+ assert.NoError(t, err)
+ defer func() {
+ err := ls.Close()
+ assert.NoError(t, err)
+ }()
+
+ cmd := exec.Command("php", "../../../tests/failboot.php")
+
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
+ assert.Nil(t, w)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "failboot")
+}
+
+func Test_Unix_Timeout2(t *testing.T) {
+ ls, err := net.Listen("unix", "sock.unix")
+ assert.NoError(t, err)
+ defer func() {
+ err := ls.Close()
+ assert.NoError(t, err)
+ }()
+
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0")
+
+ w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd)
+ assert.Nil(t, w)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "relay timeout")
+}
+
+func Test_Unix_Invalid2(t *testing.T) {
+ ls, err := net.Listen("unix", "sock.unix")
+ assert.NoError(t, err)
+ defer func() {
+ err := ls.Close()
+ assert.NoError(t, err)
+ }()
+
+ cmd := exec.Command("php", "../../../tests/invalid.php")
+
+ w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Unix_Broken2(t *testing.T) {
+ ls, err := net.Listen("unix", "sock.unix")
+ assert.NoError(t, err)
+ defer func() {
+ err := ls.Close()
+ assert.NoError(t, err)
+ }()
+
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ err := w.Wait()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "undefined_function()")
+ }()
+
+ defer func() {
+ time.Sleep(time.Second)
+ err = w.Stop()
+ assert.Error(t, err)
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
+ wg.Wait()
+}
+
+func Test_Unix_Echo2(t *testing.T) {
+ ls, err := net.Listen("unix", "sock.unix")
+ assert.NoError(t, err)
+ defer func() {
+ err := ls.Close()
+ assert.NoError(t, err)
+ }()
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) {
+ ls, err := net.Listen("unix", "sock.unix")
+ assert.NoError(b, err)
+ defer func() {
+ err := ls.Close()
+ assert.NoError(b, err)
+ }()
+
+ f := NewSocketServer(ls, time.Minute)
+ for n := 0; n < b.N; n++ {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
+
+ w, err := f.SpawnWorker(cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+ go func() {
+ assert.NoError(b, w.Wait())
+ }()
+
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }
+}
+
+func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
+ ls, err := net.Listen("unix", "sock.unix")
+ assert.NoError(b, err)
+ defer func() {
+ err := ls.Close()
+ assert.NoError(b, err)
+ }()
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) {
+ defer func() {
+ _ = syscall.Unlink("sock.unix")
+ }()
+ ls, err := net.Listen("unix", "sock.unix")
+ if err == nil {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ b.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ b.Skip("socket is busy")
+ }
+
+ f := NewSocketServer(ls, time.Minute)
+ for n := 0; n < b.N; n++ {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
+
+ w, err := f.SpawnWorker(cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }
+}
+
+func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
+ defer func() {
+ _ = syscall.Unlink("sock.unix")
+ }()
+ ls, err := net.Listen("unix", "sock.unix")
+ if err == nil {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ b.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ b.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
new file mode 100755
index 00000000..f55fc3dd
--- /dev/null
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -0,0 +1,572 @@
+package socket
+
+import (
+ "context"
+ "net"
+ "os/exec"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_Tcp_Start(t *testing.T) {
+ ctx := context.Background()
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+}
+
+func Test_Tcp_StartCloseFactory(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+ ctx := context.Background()
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
+
+ f := NewSocketServer(ls, time.Minute)
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+
+ w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+}
+
+func Test_Tcp_StartError(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+ ctx := context.Background()
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ err = cmd.Start()
+ if err != nil {
+ t.Errorf("error executing the command: error %v", err)
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Tcp_Failboot(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+ ctx := context.Background()
+
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err3 := ls.Close()
+ if err3 != nil {
+ t.Errorf("error closing the listener: error %v", err3)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/failboot.php")
+
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
+ assert.Nil(t, w)
+ assert.Error(t, err2)
+ assert.Contains(t, err2.Error(), "failboot")
+}
+
+func Test_Tcp_Timeout(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+ ctx := context.Background()
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "tcp", "200", "0")
+
+ w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd)
+ assert.Nil(t, w)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "context deadline exceeded")
+}
+
+func Test_Tcp_Invalid(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+ ctx := context.Background()
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/invalid.php")
+
+ w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Tcp_Broken(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+ ctx := context.Background()
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ err := w.Wait()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "undefined_function()")
+ }()
+
+ defer func() {
+ time.Sleep(time.Second)
+ err2 := w.Stop()
+ // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
+ assert.Error(t, err2)
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+ wg.Wait()
+}
+
+func Test_Tcp_Echo(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+ ctx := context.Background()
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
+
+ w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_Unix_Start(t *testing.T) {
+ ctx := context.Background()
+ ls, err := net.Listen("unix", "sock.unix")
+ if err == nil {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+}
+
+func Test_Unix_Failboot(t *testing.T) {
+ ls, err := net.Listen("unix", "sock.unix")
+ ctx := context.Background()
+ if err == nil {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/failboot.php")
+
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
+ assert.Nil(t, w)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "failboot")
+}
+
+func Test_Unix_Timeout(t *testing.T) {
+ ls, err := net.Listen("unix", "sock.unix")
+ ctx := context.Background()
+ if err == nil {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0")
+
+ w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd)
+ assert.Nil(t, w)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "context deadline exceeded")
+}
+
+func Test_Unix_Invalid(t *testing.T) {
+ ctx := context.Background()
+ ls, err := net.Listen("unix", "sock.unix")
+ if err == nil {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/invalid.php")
+
+ w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
+ assert.Error(t, err)
+ assert.Nil(t, w)
+}
+
+func Test_Unix_Broken(t *testing.T) {
+ ctx := context.Background()
+ ls, err := net.Listen("unix", "sock.unix")
+ if err == nil {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ err := w.Wait()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "undefined_function()")
+ }()
+
+ defer func() {
+ time.Sleep(time.Second)
+ err = w.Stop()
+ assert.Error(t, err)
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
+ wg.Wait()
+}
+
+func Test_Unix_Echo(t *testing.T) {
+ ctx := context.Background()
+ ls, err := net.Listen("unix", "sock.unix")
+ if err == nil {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
+ ctx := context.Background()
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if err == nil {
+ defer func() {
+ err = ls.Close()
+ if err != nil {
+ b.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ b.Skip("socket is busy")
+ }
+
+ f := NewSocketServer(ls, time.Minute)
+ for n := 0; n < b.N; n++ {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
+
+ w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+ go func() {
+ assert.NoError(b, w.Wait())
+ }()
+
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }
+}
+
+func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
+ ctx := context.Background()
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if err == nil {
+ defer func() {
+ err = ls.Close()
+ if err != nil {
+ b.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ b.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
+ ctx := context.Background()
+ ls, err := net.Listen("unix", "sock.unix")
+ if err == nil {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ b.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ b.Skip("socket is busy")
+ }
+
+ f := NewSocketServer(ls, time.Minute)
+ for n := 0; n < b.N; n++ {
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
+
+ w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }
+}
+
+func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
+ ctx := context.Background()
+ ls, err := net.Listen("unix", "sock.unix")
+ if err == nil {
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ b.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ b.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix")
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
new file mode 100644
index 00000000..9d74ae10
--- /dev/null
+++ b/pkg/worker/interface.go
@@ -0,0 +1,56 @@
+package worker
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+)
+
+type BaseProcess interface {
+ fmt.Stringer
+
+ // Pid returns worker pid.
+ Pid() int64
+
+ // Created returns time worker was created at.
+ Created() time.Time
+
+ // State return receive-only WorkerProcess state object, state can be used to safely access
+ // WorkerProcess status, time when status changed and number of WorkerProcess executions.
+ State() internal.State
+
+ // Start used to run Cmd and immediately return
+ Start() error
+
+ // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
+ // complete and will return process error (if any), if stderr is presented it's value
+ // will be wrapped as WorkerError. Method will return error code if php process fails
+ // to find or Start the script.
+ Wait() error
+
+ // Stop sends soft termination command to the WorkerProcess and waits for process completion.
+ Stop() error
+
+ // Kill kills underlying process, make sure to call Wait() func to gather
+ // error log from the stderr. Does not waits for process completion!
+ Kill() error
+
+ // Relay returns attached to worker goridge relay
+ Relay() relay.Relay
+
+ // AttachRelay used to attach goridge relay to the worker process
+ AttachRelay(rl relay.Relay)
+}
+
+type SyncWorker interface {
+ // BaseProcess provides basic functionality for the SyncWorker
+ BaseProcess
+ // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
+ Exec(rqs payload.Payload) (payload.Payload, error)
+ // ExecWithContext used to handle Exec with TTL
+ ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error)
+}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
new file mode 100755
index 00000000..1a0393fb
--- /dev/null
+++ b/pkg/worker/sync_worker.go
@@ -0,0 +1,244 @@
+package worker
+
+import (
+ "bytes"
+ "context"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/goridge/v3/pkg/frame"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "go.uber.org/multierr"
+)
+
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (*SyncWorkerImpl, error)
+
+type SyncWorkerImpl struct {
+ process *Process
+}
+
+// From creates SyncWorker from BaseProcess
+func From(process *Process) *SyncWorkerImpl {
+ return &SyncWorkerImpl{
+ process: process,
+ }
+}
+
+// FromSync creates BaseProcess from SyncWorkerImpl
+func FromSync(w *SyncWorkerImpl) BaseProcess {
+ return &Process{
+ created: w.process.created,
+ events: w.process.events,
+ state: w.process.state,
+ cmd: w.process.cmd,
+ pid: w.process.pid,
+ stderr: w.process.stderr,
+ endState: w.process.endState,
+ relay: w.process.relay,
+ rd: w.process.rd,
+ }
+}
+
+// Exec payload without TTL timeout.
+func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("sync_worker_exec")
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
+ }
+
+ if tw.process.State().Value() != internal.StateReady {
+ return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
+ }
+
+ // set last used time
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(internal.StateWorking)
+
+ rsp, err := tw.execPayload(p)
+ if err != nil {
+ // just to be more verbose
+ if errors.Is(errors.SoftJob, err) == false {
+ tw.process.State().Set(internal.StateErrored)
+ tw.process.State().RegisterExec()
+ }
+ return payload.Payload{}, err
+ }
+
+ tw.process.State().Set(internal.StateReady)
+ tw.process.State().RegisterExec()
+
+ return rsp, nil
+}
+
+type wexec struct {
+ payload payload.Payload
+ err error
+}
+
+// Exec payload without TTL timeout.
+func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("sync_worker_exec_worker_with_timeout")
+ c := make(chan wexec, 1)
+
+ go func() {
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ c <- wexec{
+ payload: payload.Payload{},
+ err: errors.E(op, errors.Str("payload can not be empty")),
+ }
+ return
+ }
+
+ if tw.process.State().Value() != internal.StateReady {
+ c <- wexec{
+ payload: payload.Payload{},
+ err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
+ }
+ return
+ }
+
+ // set last used time
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(internal.StateWorking)
+
+ rsp, err := tw.execPayload(p)
+ if err != nil {
+ // just to be more verbose
+ if errors.Is(errors.SoftJob, err) == false {
+ tw.process.State().Set(internal.StateErrored)
+ tw.process.State().RegisterExec()
+ }
+ c <- wexec{
+ payload: payload.Payload{},
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ tw.process.State().Set(internal.StateReady)
+ tw.process.State().RegisterExec()
+
+ c <- wexec{
+ payload: rsp,
+ err: nil,
+ }
+ }()
+
+ select {
+ // exec TTL reached
+ case <-ctx.Done():
+ err := multierr.Combine(tw.Kill())
+ if err != nil {
+ // append timeout error
+ err = multierr.Append(err, errors.E(op, errors.ExecTTL))
+ return payload.Payload{}, multierr.Append(err, ctx.Err())
+ }
+ return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err())
+ case res := <-c:
+ if res.err != nil {
+ return payload.Payload{}, res.err
+ }
+ return res.payload, nil
+ }
+}
+
+func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("sync_worker_exec_payload")
+
+ fr := frame.NewFrame()
+ fr.WriteVersion(frame.VERSION_1)
+ // can be 0 here
+
+ buf := new(bytes.Buffer)
+ buf.Write(p.Context)
+ buf.Write(p.Body)
+
+ // Context offset
+ fr.WriteOptions(uint32(len(p.Context)))
+ fr.WritePayloadLen(uint32(buf.Len()))
+ fr.WritePayload(buf.Bytes())
+
+ fr.WriteCRC()
+
+ // empty and free the buffer
+ buf.Truncate(0)
+
+ err := tw.Relay().Send(fr)
+ if err != nil {
+ return payload.Payload{}, err
+ }
+
+ frameR := frame.NewFrame()
+
+ err = tw.process.Relay().Receive(frameR)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
+ }
+ if frameR == nil {
+ return payload.Payload{}, errors.E(op, errors.Str("nil fr received"))
+ }
+
+ if !frameR.VerifyCRC() {
+ return payload.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
+ }
+
+ flags := frameR.ReadFlags()
+
+ if flags&byte(frame.ERROR) != byte(0) {
+ return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
+ }
+
+ options := frameR.ReadOptions()
+ if len(options) != 1 {
+ return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
+ }
+
+ pl := payload.Payload{}
+ pl.Context = frameR.Payload()[:options[0]]
+ pl.Body = frameR.Payload()[options[0]:]
+
+ return pl, nil
+}
+
+func (tw *SyncWorkerImpl) String() string {
+ return tw.process.String()
+}
+
+func (tw *SyncWorkerImpl) Pid() int64 {
+ return tw.process.Pid()
+}
+
+func (tw *SyncWorkerImpl) Created() time.Time {
+ return tw.process.Created()
+}
+
+func (tw *SyncWorkerImpl) State() internal.State {
+ return tw.process.State()
+}
+
+func (tw *SyncWorkerImpl) Start() error {
+ return tw.process.Start()
+}
+
+func (tw *SyncWorkerImpl) Wait() error {
+ return tw.process.Wait()
+}
+
+func (tw *SyncWorkerImpl) Stop() error {
+ return tw.process.Stop()
+}
+
+func (tw *SyncWorkerImpl) Kill() error {
+ return tw.process.Kill()
+}
+
+func (tw *SyncWorkerImpl) Relay() relay.Relay {
+ return tw.process.Relay()
+}
+
+func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) {
+ tw.process.AttachRelay(rl)
+}
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
new file mode 100755
index 00000000..df556e93
--- /dev/null
+++ b/pkg/worker/sync_worker_test.go
@@ -0,0 +1,34 @@
+package worker
+
+import (
+ "os/exec"
+ "testing"
+
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_NotStarted_String(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := InitBaseWorker(cmd)
+ assert.Contains(t, w.String(), "php tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "inactive")
+ assert.Contains(t, w.String(), "numExecs: 0")
+}
+
+func Test_NotStarted_Exec(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := InitBaseWorker(cmd)
+
+ sw := From(w)
+
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Contains(t, err.Error(), "Process is not ready (inactive)")
+}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
new file mode 100755
index 00000000..8fd71cca
--- /dev/null
+++ b/pkg/worker/worker.go
@@ -0,0 +1,318 @@
+package worker
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "os"
+ "os/exec"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "go.uber.org/multierr"
+)
+
+const (
+ // WaitDuration - for how long error buffer should attempt to aggregate error messages
+ // before merging output together since lastError update (required to keep error update together).
+ WaitDuration = 25 * time.Millisecond
+
+ // ReadBufSize used to make a slice with specified length to read from stderr
+ ReadBufSize = 10240 // Kb
+)
+
+type Options func(p *Process)
+
+// Process - supervised process with api over goridge.Relay.
+type Process struct {
+ // created indicates at what time Process has been created.
+ created time.Time
+
+ // updates parent supervisor or pool about Process events
+ events events.Handler
+
+ // state holds information about current Process state,
+ // number of Process executions, buf status change time.
+ // publicly this object is receive-only and protected using Mutex
+ // and atomic counter.
+ state *internal.WorkerState
+
+ // underlying command with associated process, command must be
+ // provided to Process from outside in non-started form. CmdSource
+ // stdErr direction will be handled by Process to aggregate error message.
+ cmd *exec.Cmd
+
+ // pid of the process, points to pid of underlying process and
+ // can be nil while process is not started.
+ pid int
+
+ // stderr aggregates stderr output from underlying process. Value can be
+ // receive only once command is completed and all pipes are closed.
+ stderr *bytes.Buffer
+
+ // channel is being closed once command is complete.
+ // waitDone chan interface{}
+
+ // contains information about resulted process state.
+ endState *os.ProcessState
+
+ // ensures than only one execution can be run at once.
+ mu sync.RWMutex
+
+ // communication bus with underlying process.
+ relay relay.Relay
+ // rd in a second part of pipe to read from stderr
+ rd io.Reader
+ // stop signal terminates io.Pipe from reading from stderr
+ stop chan struct{}
+
+ syncPool sync.Pool
+}
+
+// InitBaseWorker creates new Process over given exec.cmd.
+func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
+ const op = errors.Op("init_base_worker")
+ if cmd.Process != nil {
+ return nil, fmt.Errorf("can't attach to running process")
+ }
+ w := &Process{
+ created: time.Now(),
+ events: events.NewEventsHandler(),
+ cmd: cmd,
+ state: internal.NewWorkerState(internal.StateInactive),
+ stderr: new(bytes.Buffer),
+ stop: make(chan struct{}, 1),
+ // sync pool for STDERR
+ // All receivers are pointers
+ syncPool: sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, ReadBufSize)
+ return &buf
+ },
+ },
+ }
+
+ w.rd, w.cmd.Stderr = io.Pipe()
+
+ // small buffer optimization
+ // at this point we know, that stderr will contain huge messages
+ w.stderr.Grow(ReadBufSize)
+
+ // add options
+ for i := 0; i < len(options); i++ {
+ options[i](w)
+ }
+
+ go func() {
+ w.watch()
+ }()
+
+ return w, nil
+}
+
+func AddListeners(listeners ...events.Listener) Options {
+ return func(p *Process) {
+ for i := 0; i < len(listeners); i++ {
+ p.addListener(listeners[i])
+ }
+ }
+}
+
+// Pid returns worker pid.
+func (w *Process) Pid() int64 {
+ return int64(w.pid)
+}
+
+// Created returns time worker was created at.
+func (w *Process) Created() time.Time {
+ return w.created
+}
+
+// AddListener registers new worker event listener.
+func (w *Process) addListener(listener events.Listener) {
+ w.events.AddListener(listener)
+}
+
+// State return receive-only Process state object, state can be used to safely access
+// Process status, time when status changed and number of Process executions.
+func (w *Process) State() internal.State {
+ return w.state
+}
+
+// State return receive-only Process state object, state can be used to safely access
+// Process status, time when status changed and number of Process executions.
+func (w *Process) AttachRelay(rl relay.Relay) {
+ w.relay = rl
+}
+
+// State return receive-only Process state object, state can be used to safely access
+// Process status, time when status changed and number of Process executions.
+func (w *Process) Relay() relay.Relay {
+ return w.relay
+}
+
+// String returns Process description. fmt.Stringer interface
+func (w *Process) String() string {
+ st := w.state.String()
+ // we can safely compare pid to 0
+ if w.pid != 0 {
+ st = st + ", pid:" + strconv.Itoa(w.pid)
+ }
+
+ return fmt.Sprintf(
+ "(`%s` [%s], numExecs: %v)",
+ strings.Join(w.cmd.Args, " "),
+ st,
+ w.state.NumExecs(),
+ )
+}
+
+func (w *Process) Start() error {
+ err := w.cmd.Start()
+ if err != nil {
+ return err
+ }
+ w.pid = w.cmd.Process.Pid
+ return nil
+}
+
+// Wait must be called once for each Process, call will be released once Process is
+// complete and will return process error (if any), if stderr is presented it's value
+// will be wrapped as WorkerError. Method will return error code if php process fails
+// to find or Start the script.
+func (w *Process) Wait() error {
+ const op = errors.Op("process_wait")
+ err := multierr.Combine(w.cmd.Wait())
+
+ if w.State().Value() == internal.StateDestroyed {
+ return errors.E(op, err)
+ }
+
+ // at this point according to the documentation (see cmd.Wait comment)
+ // if worker finishes with an error, message will be written to the stderr first
+ // and then process.cmd.Wait return an error
+ w.endState = w.cmd.ProcessState
+ if err != nil {
+ w.state.Set(internal.StateErrored)
+
+ w.mu.RLock()
+ // if process return code > 0, here will be an error from stderr (if presents)
+ if w.stderr.Len() > 0 {
+ err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
+ // stop the stderr buffer
+ w.stop <- struct{}{}
+ }
+ w.mu.RUnlock()
+
+ return multierr.Append(err, w.closeRelay())
+ }
+
+ err = multierr.Append(err, w.closeRelay())
+ if err != nil {
+ w.state.Set(internal.StateErrored)
+ return err
+ }
+
+ if w.endState.Success() {
+ w.state.Set(internal.StateStopped)
+ }
+
+ w.stderr.Reset()
+
+ return nil
+}
+
+func (w *Process) closeRelay() error {
+ if w.relay != nil {
+ err := w.relay.Close()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Stop sends soft termination command to the Process and waits for process completion.
+func (w *Process) Stop() error {
+ var err error
+ w.state.Set(internal.StateStopping)
+ err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
+ if err != nil {
+ w.state.Set(internal.StateKilling)
+ return multierr.Append(err, w.cmd.Process.Kill())
+ }
+ w.state.Set(internal.StateStopped)
+ return nil
+}
+
+// Kill kills underlying process, make sure to call Wait() func to gather
+// error log from the stderr. Does not waits for process completion!
+func (w *Process) Kill() error {
+ if w.State().Value() == internal.StateDestroyed {
+ err := w.cmd.Process.Signal(os.Kill)
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+
+ w.state.Set(internal.StateKilling)
+ err := w.cmd.Process.Signal(os.Kill)
+ if err != nil {
+ return err
+ }
+ w.state.Set(internal.StateStopped)
+ return nil
+}
+
+// put the pointer, to not allocate new slice
+// but erase it len and then return back
+func (w *Process) put(data *[]byte) {
+ w.syncPool.Put(data)
+}
+
+// get pointer to the byte slice
+func (w *Process) get() *[]byte {
+ return w.syncPool.Get().(*[]byte)
+}
+
+// Write appends the contents of pool to the errBuffer, growing the errBuffer as
+// needed. The return value n is the length of pool; errBuffer is always nil.
+func (w *Process) watch() {
+ go func() {
+ for {
+ select {
+ case <-w.stop:
+ buf := w.get()
+ // read the last data
+ n, _ := w.rd.Read(*buf)
+ w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.mu.Lock()
+ // write new message
+ // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool
+ w.stderr.Write((*buf)[:n])
+ w.mu.Unlock()
+ w.put(buf)
+ return
+ default:
+ // read the max 10kb of stderr per one read
+ buf := w.get()
+ n, _ := w.rd.Read(*buf)
+ w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.mu.Lock()
+ // delete all prev messages
+ w.stderr.Reset()
+ // write new message
+ w.stderr.Write((*buf)[:n])
+ w.mu.Unlock()
+ w.put(buf)
+ }
+ }
+ }()
+}
diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go
new file mode 100755
index 00000000..805f66b5
--- /dev/null
+++ b/pkg/worker/worker_test.go
@@ -0,0 +1,19 @@
+package worker
+
+import (
+ "os/exec"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_OnStarted(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+ assert.Nil(t, cmd.Start())
+
+ w, err := InitBaseWorker(cmd)
+ assert.Nil(t, w)
+ assert.NotNil(t, err)
+
+ assert.Equal(t, "can't attach to running process", err.Error())
+}
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
new file mode 100644
index 00000000..927aa270
--- /dev/null
+++ b/pkg/worker_watcher/interface.go
@@ -0,0 +1,30 @@
+package worker_watcher //nolint:golint,stylecheck
+
+import (
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Watcher interface {
+ // AddToWatch used to add stack to wait its state
+ AddToWatch(workers []worker.SyncWorker) error
+
+ // GetFreeWorker provide first free worker
+ GetFreeWorker(ctx context.Context) (worker.SyncWorker, error)
+
+ // PutWorker enqueues worker back
+ PushWorker(w worker.SyncWorker)
+
+ // AllocateNew used to allocate new worker and put in into the WorkerWatcher
+ AllocateNew() error
+
+ // Destroy destroys the underlying stack
+ Destroy(ctx context.Context)
+
+ // WorkersList return all stack w/o removing it from internal storage
+ WorkersList() []worker.SyncWorker
+
+ // RemoveWorker remove worker from the stack
+ RemoveWorker(wb worker.SyncWorker) error
+}
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go
new file mode 100644
index 00000000..d76f4d8f
--- /dev/null
+++ b/pkg/worker_watcher/stack.go
@@ -0,0 +1,142 @@
+package worker_watcher //nolint:golint,stylecheck
+import (
+ "context"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Stack struct {
+ workers []*worker.SyncWorkerImpl
+ mutex sync.RWMutex
+ destroy bool
+ actualNumOfWorkers uint64
+ initialNumOfWorkers uint64
+}
+
+func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
+ w := runtime.NumCPU()
+ return &Stack{
+ workers: make([]*worker.SyncWorkerImpl, 0, w),
+ actualNumOfWorkers: 0,
+ initialNumOfWorkers: initialNumOfWorkers,
+ }
+}
+
+func (stack *Stack) Reset() {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ stack.actualNumOfWorkers = 0
+ stack.workers = nil
+}
+
+// Push worker back to the stack
+// If stack in destroy state, Push will provide 100ms window to unlock the mutex
+func (stack *Stack) Push(w worker.BaseProcess) {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ stack.actualNumOfWorkers++
+ stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl))
+}
+
+func (stack *Stack) IsEmpty() bool {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ return len(stack.workers) == 0
+}
+
+func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+
+ // do not release new stack
+ if stack.destroy {
+ return nil, true
+ }
+
+ if len(stack.workers) == 0 {
+ return nil, false
+ }
+
+ // move worker
+ w := stack.workers[len(stack.workers)-1]
+ stack.workers = stack.workers[:len(stack.workers)-1]
+ stack.actualNumOfWorkers--
+ return w, false
+}
+
+func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ for i := 0; i < len(stack.workers); i++ {
+ // worker in the stack, reallocating
+ if stack.workers[i].Pid() == pid {
+ stack.workers = append(stack.workers[:i], stack.workers[i+1:]...)
+ stack.actualNumOfWorkers--
+ // worker found and removed
+ return true
+ }
+ }
+ // no worker with such ID
+ return false
+}
+
+// Workers return copy of the workers in the stack
+func (stack *Stack) Workers() []worker.SyncWorker {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ workersCopy := make([]worker.SyncWorker, 0, 1)
+ // copy
+ for _, v := range stack.workers {
+ if v != nil {
+ workersCopy = append(workersCopy, v)
+ }
+ }
+
+ return workersCopy
+}
+
+func (stack *Stack) isDestroying() bool {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ return stack.destroy
+}
+
+// we also have to give a chance to pool to Push worker (return it)
+func (stack *Stack) Destroy(ctx context.Context) {
+ stack.mutex.Lock()
+ stack.destroy = true
+ stack.mutex.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 500)
+ defer tt.Stop()
+ for {
+ select {
+ case <-tt.C:
+ stack.mutex.Lock()
+ // that might be one of the workers is working
+ if stack.initialNumOfWorkers != stack.actualNumOfWorkers {
+ stack.mutex.Unlock()
+ continue
+ }
+ stack.mutex.Unlock()
+ // unnecessary mutex, but
+ // just to make sure. All stack at this moment are in the stack
+ // Pop operation is blocked, push can't be done, since it's not possible to pop
+ stack.mutex.Lock()
+ for i := 0; i < len(stack.workers); i++ {
+ // set state for the stack in the stack (unused at the moment)
+ stack.workers[i].State().Set(internal.StateDestroyed)
+ // kill the worker
+ _ = stack.workers[i].Kill()
+ }
+ stack.mutex.Unlock()
+ // clear
+ stack.Reset()
+ return
+ }
+ }
+}
diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go
new file mode 100644
index 00000000..5287a6dc
--- /dev/null
+++ b/pkg/worker_watcher/stack_test.go
@@ -0,0 +1,142 @@
+package worker_watcher //nolint:golint,stylecheck
+import (
+ "context"
+ "os/exec"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewWorkersStack(t *testing.T) {
+ stack := NewWorkersStack(0)
+ assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
+ assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers)
+}
+
+func TestStack_Push(t *testing.T) {
+ stack := NewWorkersStack(1)
+
+ w, err := worker.InitBaseWorker(&exec.Cmd{})
+ assert.NoError(t, err)
+
+ sw := worker.From(w)
+
+ stack.Push(sw)
+ assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
+}
+
+func TestStack_Pop(t *testing.T) {
+ stack := NewWorkersStack(1)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+
+ sw := worker.From(w)
+
+ stack.Push(sw)
+ assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
+
+ _, _ = stack.Pop()
+ assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
+}
+
+func TestStack_FindAndRemoveByPid(t *testing.T) {
+ stack := NewWorkersStack(1)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+
+ assert.NoError(t, w.Start())
+
+ sw := worker.From(w)
+
+ stack.Push(sw)
+ assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
+
+ stack.FindAndRemoveByPid(w.Pid())
+ assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
+}
+
+func TestStack_IsEmpty(t *testing.T) {
+ stack := NewWorkersStack(1)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+
+ sw := worker.From(w)
+ stack.Push(sw)
+
+ assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
+
+ assert.Equal(t, false, stack.IsEmpty())
+}
+
+func TestStack_Workers(t *testing.T) {
+ stack := NewWorkersStack(1)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+ assert.NoError(t, w.Start())
+
+ sw := worker.From(w)
+ stack.Push(sw)
+
+ wrks := stack.Workers()
+ assert.Equal(t, 1, len(wrks))
+ assert.Equal(t, w.Pid(), wrks[0].Pid())
+}
+
+func TestStack_Reset(t *testing.T) {
+ stack := NewWorkersStack(1)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+ assert.NoError(t, w.Start())
+
+ sw := worker.From(w)
+ stack.Push(sw)
+
+ assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
+ stack.Reset()
+ assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
+}
+
+func TestStack_Destroy(t *testing.T) {
+ stack := NewWorkersStack(1)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+ assert.NoError(t, w.Start())
+
+ sw := worker.From(w)
+ stack.Push(sw)
+
+ stack.Destroy(context.Background())
+ assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
+}
+
+func TestStack_DestroyWithWait(t *testing.T) {
+ stack := NewWorkersStack(2)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+ assert.NoError(t, w.Start())
+
+ sw := worker.From(w)
+ stack.Push(sw)
+ stack.Push(sw)
+ assert.Equal(t, uint64(2), stack.actualNumOfWorkers)
+
+ go func() {
+ wrk, _ := stack.Pop()
+ time.Sleep(time.Second * 3)
+ stack.Push(wrk)
+ }()
+ time.Sleep(time.Second)
+ stack.Destroy(context.Background())
+ assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
+}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
new file mode 100755
index 00000000..753b61ee
--- /dev/null
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -0,0 +1,165 @@
+package worker_watcher //nolint:golint,stylecheck
+
+import (
+ "context"
+ "sync"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
+ ww := &workerWatcher{
+ stack: NewWorkersStack(numWorkers),
+ allocator: allocator,
+ events: events,
+ }
+
+ return ww
+}
+
+type workerWatcher struct {
+ mutex sync.RWMutex
+ stack *Stack
+ allocator worker.Allocator
+ events events.Handler
+}
+
+func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error {
+ for i := 0; i < len(workers); i++ {
+ ww.stack.Push(workers[i])
+
+ go func(swc worker.SyncWorker) {
+ ww.wait(swc)
+ }(workers[i])
+ }
+ return nil
+}
+
+func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) {
+ const op = errors.Op("worker_watcher_get_free_worker")
+ // thread safe operation
+ w, stop := ww.stack.Pop()
+ if stop {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
+
+ // handle worker remove state
+ // in this state worker is destroyed by supervisor
+ if w != nil && w.State().Value() == internal.StateRemove {
+ err := ww.RemoveWorker(w)
+ if err != nil {
+ return nil, err
+ }
+ // try to get next
+ return ww.GetFreeWorker(ctx)
+ }
+ // no free stack
+ if w == nil {
+ for {
+ select {
+ default:
+ w, stop = ww.stack.Pop()
+ if stop {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
+ if w == nil {
+ continue
+ }
+ return w, nil
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
+ }
+ }
+ }
+
+ return w, nil
+}
+
+func (ww *workerWatcher) AllocateNew() error {
+ ww.stack.mutex.Lock()
+ const op = errors.Op("worker_watcher_allocate_new")
+ sw, err := ww.allocator()
+ if err != nil {
+ return errors.E(op, errors.WorkerAllocate, err)
+ }
+
+ ww.addToWatch(sw)
+ ww.stack.mutex.Unlock()
+ ww.PushWorker(sw)
+
+ return nil
+}
+
+func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
+ ww.mutex.Lock()
+ defer ww.mutex.Unlock()
+
+ const op = errors.Op("worker_watcher_remove_worker")
+ pid := wb.Pid()
+
+ if ww.stack.FindAndRemoveByPid(pid) {
+ wb.State().Set(internal.StateRemove)
+ err := wb.Kill()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ }
+
+ return nil
+}
+
+// O(1) operation
+func (ww *workerWatcher) PushWorker(w worker.SyncWorker) {
+ ww.mutex.Lock()
+ defer ww.mutex.Unlock()
+ ww.stack.Push(w)
+}
+
+// Destroy all underlying stack (but let them to complete the task)
+func (ww *workerWatcher) Destroy(ctx context.Context) {
+ // destroy stack, we don't use ww mutex here, since we should be able to push worker
+ ww.stack.Destroy(ctx)
+}
+
+// Warning, this is O(n) operation, and it will return copy of the actual workers
+func (ww *workerWatcher) WorkersList() []worker.SyncWorker {
+ return ww.stack.Workers()
+}
+
+func (ww *workerWatcher) wait(w worker.BaseProcess) {
+ const op = errors.Op("worker_watcher_wait")
+ err := w.Wait()
+ if err != nil {
+ ww.events.Push(events.WorkerEvent{
+ Event: events.EventWorkerError,
+ Worker: w,
+ Payload: errors.E(op, err),
+ })
+ }
+
+ if w.State().Value() == internal.StateDestroyed {
+ // worker was manually destroyed, no need to replace
+ ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ return
+ }
+
+ _ = ww.stack.FindAndRemoveByPid(w.Pid())
+ err = ww.AllocateNew()
+ if err != nil {
+ ww.events.Push(events.PoolEvent{
+ Event: events.EventPoolError,
+ Payload: errors.E(op, err),
+ })
+ }
+}
+
+func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) {
+ go func() {
+ ww.wait(wb)
+ }()
+}