diff options
Diffstat (limited to 'pkg')
30 files changed, 5297 insertions, 0 deletions
diff --git a/pkg/doc/README.md b/pkg/doc/README.md new file mode 100644 index 00000000..4f726f4a --- /dev/null +++ b/pkg/doc/README.md @@ -0,0 +1,21 @@ +This is the drawio diagrams showing basic workflows inside RoadRunner 2.0 + +Simple HTTP workflow description: +![alt text](pool_workflow.svg) + +1. Allocate sync workers. When plugin starts (which use workers pool), then it allocates required number of processes + via `cmd.exec` command. + +2. When user send HTTP request to the RR2, HTTP plugin receive it and transfer to the workers pool `Exec/ExecWithContex` +method. And workers pool ask Worker watcher to get free worker. + +3. Workers watcher uses stack data structure under the hood and making POP operation to get first free worker. If there are +no workers in the `stack`, watcher waits for the specified via config (`allocate_timeout`) time. + +4. Stack returns free worker to the watcher. +5. Watcher returns that worker to the `pool`. +6. Pool invoke `Exec/ExecWithTimeout` method on the golang worker with provided request payload. +7. Golang worker send that request to the PHP worker via various set of transports (`pkg/transport` package). +8. PHP worker send back response to the golang worker (or error via stderr). +9. Golang worker return response payload to the pool. +10. Pool process this response and return answer to the user.
\ No newline at end of file diff --git a/pkg/doc/pool_workflow.drawio b/pkg/doc/pool_workflow.drawio new file mode 100644 index 00000000..3f74d0fc --- /dev/null +++ b/pkg/doc/pool_workflow.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-01-24T16:29:37.978Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="8v25I0qLU_vMqqVrx7cN" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">7Vxbd6M4Ev41OWfnoX0Qkrg8Jul09+zOTGc6vSfTjwRkm22MvCB37Pn1IwGydcExsTGO03HOiUEgLqqqr6o+lXwBr2fLj0U0n/5OE5JduE6yvIDvL1wXAMflX6Jl1bQ4jlO3TIo0qduUhrv0byJPbFoXaULKpq1uYpRmLJ3rjTHNcxIzrS0qCvqonzamWaI1zKMJsRru4iizW+/ThE3li3nh5sAnkk6m8tYeRvWRWSTPbl6lnEYJfVSa4M0FvC4oZfXWbHlNMjF8+sB82HJ0/WQFyVmXDn/hCPvfPk++/Ocm+HyPr5Z/TsfvvOYyP6Js0bzyhetl/IJXU37Im4itu3Q25wPiOv8tScG/CvL/BSmZPJHfcnNu865sJUewoIs8IeIZHH74cZoycjePYnH0kSuNuBGbZXwP8M0xzdmHaJZmQl+u6aJIqxv+Qfi4XUVZOsl5e0bG4k4/SMFSLqnLppnR+fr+6qjIN+Snk6XS1IzSR0JnhBUrfkpzFGNcd2mUFrmNFj+qGhA2GjBVpB9ItY0atZusL74RDN9oZPMMOYHQGliScE1tdmnBpnRC8yi72bRe6UO/Oec3KoaqGvD/EcZWjdlFC0Z1cZBlyv4S3Ucubna/KYfeL5tLVzsruZPzF1Z7if1v6sFNv2pPdtSE/2/CHooozUv+0r/TnD4l2pIrSkyeGr0GOaJiQtgT50lTFUP7pKYUJItY+kMHiTahN11vacqfea1hoedpGgYc2SCvUT9q083QnfVz7K9Orm32H8XrcNgS/8pVHgt9p8X3yv7+dfv5ln/ROSn4a9P8l1Zl/C164PCvKZA02ZiLjF/JNtpZmiS1rpIy/Tt6qK4nlGEuXr0aDHx1gd/vpR5PWpKFB2sn0jyFBtNtOPHOGTke0iUZ1HsH6sc7AJ2RDkL8ZsB1lA80VYaOxyU5jrZgS1vgmwbUQnE939e9RT8agAz5hyMHDSZv/7TOxledjdPV2fiaswEv3tnAvn3NQQGGRC7FxvEZ27hU4cNtnJs4QlDHYtiPjUPDxvEogEPZOHTsCEDcae3030RfwTvwdNm7vYgemaJ/B9DIw0MJ347+7iupi8G+j1g8bVGA5+VxaZZd04wWVV84HhMvjnl7yQr6nShHEj98cA7C4+55XQgNlwqbCErN63BjF2pa5x0tqwPWMA/qaPdJ6rSMbrT2uYO7WbejmwV9u9mq62VRRCvlhAbNthp8YOpeE71vtKe+Yr9m7lp2fpllNI6YleSVZwz4YBsS7AP4GHlHiecD3Y0gA1KOGMxDSwnaUecnFDaAGGhi6SmwQ7pvxyPgDyZuS9q/5ilLuSS5N+jTpycRCcatPt2LA/IwHsan+4GOq4HT4tOdFp9uGmB/Pr09bXpjajt5dXgqr36YzD3L6lQ69Y572vtzT6ukXvfEmjm6P/T7AV7fzKu8keerrKk7GA4jSyXcN+nX0oc8/NWl1I/0PYsz9Qfzu/CksA8UzFf40+flcv7pkrmusA/dF4X70MZ974yNHPZm5Dy0xrgnqwZoBAejRVE7SJ/V1Acf/B2GXO3dkiLlwyV076VYN+rbuvfiaoADA82PyKTiQ9cOPtrVwceHdcAONtT/CPwRaqEOZDHTOeKbNO4+8M1xJROxL74NAGZ2DHpH8qR65nJO81IkJxyL+L+p2FyUZ52h9CfeutjCmKHphwj0Xf2qbjAabNrHNuebJYn5pcTXfcqm11wQZMksHTh/nsic+2nlidrmfo7GE21JB88q0jhdkYWs1DuT6R/gID1C4A27PP6OHsdx+a6d0aAzdgnSyPrw+BAiA7t7mi1wDdoCgAHrQNzQEvivAgKjir4cF0R8recIryt0LETbii7EAxb2rMJPqh6uo08cyqmYA9UDADxCoakgCKmsJhhMW+yqoTsWxd8rR8r96Pbp5INCCEyCBLWFEIH7IOpgTxJCACc4df0ItI33tOsENBrS7UhDrqMItdtpaEi3Y1TRtXZzky46h9JhA6QHrqVNRn7wNZ0RuniN+QFwPd25tyYIshZwkAQB2umaNfDlNJqLzXFGlpdiuVtlLEmz+T7OorJMY8NYN9F8r5O++5SV7W+qOOzZVBU5t2G4bDvUmzuSQZPMHzCWcdRvbq38aSMdDQ7RGXYNEbTZJXN94BZtXcyyy5hRNQas4sVbWqZieRE/5YEyRmctQWK9ym/bksEjef7QWBAo1/4pOgMH9ft2sf6a2GuGXuX1tpR0V1TP0UJ0ESWUdcAwkJgAMDFcptAqhoeBLSizPKs3QSG70voLiUldG7IhYV+9YHyj2iN0bMHI9SbDCMZe0maJgV8mnZdEl4TEsJKjK+O5DyN2SFMnZW0hzXjcHBlg1D3PoDzlhMUO1ILHQi1kc0v2mO9MVhRZmEawdhpKCETnJK9bmuwleFIuL6/OXWrq7sXLHQOdQycwjNXxARo27EC+pUW32WKS5rxtXeparVJ+9bjqOUYsGUIbVuUvPAwCq7Kc+q0K6qj0g+QDX0gZFLYz1fYVxOfBCuNthrhXpSuSBnho5ojaS1kNtvn4NBE+7erwM6cYUWfeond/fpiN2ylncM423uP6cBA6ejlzP/OCAIhCaeWjzwfLepQByojsWYYtqxkOWiOMxV8rD1x9OsheHFf61Z+eUliMXtpCI2zzbzVdz990ldEoefXxL0BGjUQb4SOXGgwT/9q0whfy46fje5Br1MMGLUTcoHwPtjHs09ev4ket5k3yeBCU7QImA+qCmLT/HMJDwHHmyYmRZ7DW2FhkIxfDaIjVwoZCdCQheHb1gDXuL2lGa6sQdgZvuGuCdqJZJ+yH+jW6zzo54QgHys+Q6TpmTnkcmQyShL9KBlGadUBY1SRzmpNngW4bHOiAsUu1LMtfc8Z94K9jBCt+S6UKalE391iximczBB9pFuUT3rYllvzJRAbd3SJrjS+PJzM7lrn9JDxm5+nEVy0wLHmdJwQWoF7kxXc3PxNcQ+fm55bhzT8=</diagram></mxfile>
\ No newline at end of file diff --git a/pkg/doc/pool_workflow.svg b/pkg/doc/pool_workflow.svg new file mode 100644 index 00000000..1e043eaa --- /dev/null +++ b/pkg/doc/pool_workflow.svg @@ -0,0 +1,3 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> +<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="1200px" height="811px" viewBox="-0.5 -0.5 1200 811" content="<mxfile host="Electron" modified="2021-01-24T14:08:35.229Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.1.8 Chrome/87.0.4280.141 Electron/11.2.1 Safari/537.36" etag="XUF1gTxKr8mJfXGLcVd8" version="14.1.8" type="device"><diagram id="8w40hpb1-UDYxj1ewOsN" name="Page-1">7Vxbd6M4Ev41OWfnoX0Qkrg8Jul09+zOTGc6vSfTjwRkm22MvCB37Pn1IwGydcExsTGO03HOiUEgLqqqr6o+lXwBr2fLj0U0n/5OE5JduE6yvIDvL1wXAMflX6Jl1bQ4jlO3TIo0qduUhrv0byJPbFoXaULKpq1uYpRmLJ3rjTHNcxIzrS0qCvqonzamWaI1zKMJsRru4iizW+/ThE3li3nh5sAnkk6m8tYeRvWRWSTPbl6lnEYJfVSa4M0FvC4oZfXWbHlNMjF8+sB82HJ0/WQFyVmXDn/hCPvfPk++/Ocm+HyPr5Z/TsfvvOYyP6Js0bzyhetl/IJXU37Im4itu3Q25wPiOv8tScG/CvL/BSmZPJHfcnNu865sJUewoIs8IeIZHH74cZoycjePYnH0kSuNuBGbZXwP8M0xzdmHaJZmQl+u6aJIqxv+Qfi4XUVZOsl5e0bG4k4/SMFSLqnLppnR+fr+6qjIN+Snk6XS1IzSR0JnhBUrfkpzFGNcd2mUFrmNFj+qGhA2GjBVpB9ItY0atZusL74RDN9oZPMMOYHQGliScE1tdmnBpnRC8yi72bRe6UO/Oec3KoaqGvD/EcZWjdlFC0Z1cZBlyv4S3Ucubna/KYfeL5tLVzsruZPzF1Z7if1v6sFNv2pPdtSE/2/CHooozUv+0r/TnD4l2pIrSkyeGr0GOaJiQtgT50lTFUP7pKYUJItY+kMHiTahN11vacqfea1hoedpGgYc2SCvUT9q083QnfVz7K9Orm32H8XrcNgS/8pVHgt9p8X3yv7+dfv5ln/ROSn4a9P8l1Zl/C164PCvKZA02ZiLjF/JNtpZmiS1rpIy/Tt6qK4nlGEuXr0aDHx1gd/vpR5PWpKFB2sn0jyFBtNtOPHOGTke0iUZ1HsH6sc7AJ2RDkL8ZsB1lA80VYaOxyU5jrZgS1vgmwbUQnE939e9RT8agAz5hyMHDSZv/7TOxledjdPV2fiaswEv3tnAvn3NQQGGRC7FxvEZ27hU4cNtnJs4QlDHYtiPjUPDxvEogEPZOHTsCEDcae3030RfwTvwdNm7vYgemaJ/B9DIw0MJ347+7iupi8G+j1g8bVGA5+VxaZZd04wWVV84HhMvjnl7yQr6nShHEj98cA7C4+55XQgNlwqbCErN63BjF2pa5x0tqwPWMA/qaPdJ6rSMbrT2uYO7WbejmwV9u9mq62VRRCvlhAbNthp8YOpeE71vtKe+Yr9m7lp2fpllNI6YleSVZwz4YBsS7AP4GHlHiecD3Y0gA1KOGMxDSwnaUecnFDaAGGhi6SmwQ7pvxyPgDyZuS9q/5ilLuSS5N+jTpycRCcatPt2LA/IwHsan+4GOq4HT4tOdFp9uGmB/Pr09bXpjajt5dXgqr36YzD3L6lQ69Y572vtzT6ukXvfEmjm6P/T7AV7fzKu8keerrKk7GA4jSyXcN+nX0oc8/NWl1I/0PYsz9Qfzu/CksA8UzFf40+flcv7pkrmusA/dF4X70MZ974yNHPZm5Dy0xrgnqwZoBAejRVE7SJ/V1Acf/B2GXO3dkiLlwyV076VYN+rbuvfiaoADA82PyKTiQ9cOPtrVwceHdcAONtT/CPwRaqEOZDHTOeKbNO4+8M1xJROxL74NAGZ2DHpH8qR65nJO81IkJxyL+L+p2FyUZ52h9CfeutjCmKHphwj0Xf2qbjAabNrHNuebJYn5pcTXfcqm11wQZMksHTh/nsic+2nlidrmfo7GE21JB88q0jhdkYWs1DuT6R/gID1C4A27PP6OHsdx+a6d0aAzdgnSyPrw+BAiA7t7mi1wDdoCgAHrQNzQEvivAgKjir4cF0R8recIryt0LETbii7EAxb2rMJPqh6uo08cyqmYA9UDADxCoakgCKmsJhhMW+yqoTsWxd8rR8r96Pbp5INCCEyCBLWFEIH7IOpgTxJCACc4df0ItI33tOsENBrS7UhDrqMItdtpaEi3Y1TRtXZzky46h9JhA6QHrqVNRn7wNZ0RuniN+QFwPd25tyYIshZwkAQB2umaNfDlNJqLzXFGlpdiuVtlLEmz+T7OorJMY8NYN9F8r5O++5SV7W+qOOzZVBU5t2G4bDvUmzuSQZPMHzCWcdRvbq38aSMdDQ7RGXYNEbTZJXN94BZtXcyyy5hRNQas4sVbWqZieRE/5YEyRmctQWK9ym/bksEjef7QWBAo1/4pOgMH9ft2sf6a2GuGXuX1tpR0V1TP0UJ0ESWUdcAwkJgAMDFcptAqhoeBLSizPKs3QSG70voLiUldG7IhYV+9YHyj2iN0bMHI9SbDCMZe0maJgV8mnZdEl4TEsJKjK+O5DyN2SFMnZW0hzXjcHBlg1D3PoDzlhMUO1ILHQi1kc0v2mO9MVhRZmEawdhpKCETnJK9bmuwleFIuL6/OXWrq7sXLHQOdQycwjNXxARo27EC+pUW32WKS5rxtXeparVJ+9bjqOUYsGUIbVuUvPAwCq7Kc+q0K6qj0g+QDX0gZFLYz1fYVxOfBCuNthrhXpSuSBnho5ojaS1kNtvn4NBE+7erwM6cYUWfeond/fpiN2ylncM423uP6cBA6ejlzP/OCAIhCaeWjzwfLepQByojsWYYtqxkOWiOMxV8rD1x9OsheHFf61Z+eUliMXtpCI2zzbzVdz990ldEoefXxL0BGjUQb4SOXGgwT/9q0whfy46fje5Br1MMGLUTcoHwPtjHs09ev4ket5k3yeBCU7QImA+qCmLT/HMJDwHHmyYmRZ7DW2FhkIxfDaIjVwoZCdCQheHb1gDXuL2lGa6sQdgZvuGuCdqJZJ+yH+jW6zzo54QgHys+Q6TpmTnkcmQyShL9KBlGadUBY1SRzmpNngW4bHOiAsUu1LMtfc8Z94K9jBCt+S6UKalE391iximczBB9pFuUT3rYllvzJRAbd3SJrjS+PJzM7lrn9JDxm5+nEVy0wLHmdJwQWoF7kxXc3PxNcQ+fm55bhzT8=</diagram></mxfile>" style="background-color: rgb(255, 255, 255);"><defs/><g><rect x="0" y="0" width="1199" height="810" fill="#ffffff" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe flex-start; justify-content: unsafe flex-start; width: 1197px; height: 1px; padding-top: 7px; margin-left: 2px;"><div style="box-sizing: border-box; font-size: 0; text-align: left; "><div style="display: inline-block; font-size: 12px; font-family: Courier New; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; "><h1>Simple User request</h1></div></div></div></foreignObject><text x="2" y="19" fill="#000000" font-family="Courier New" font-size="12px">Simple User request</text></switch></g><path d="M 417.5 574 L 417.5 657.63" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 417.5 662.88 L 414 655.88 L 417.5 657.63 L 421 655.88 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 616px; margin-left: 296px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Give me sync worker (POP operation)</div></div></div></foreignObject><text x="296" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Give me sync worker (POP operation)</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 617px; margin-left: 418px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">3</div></div></div></foreignObject><text x="418" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">3</text></switch></g><path d="M 492.5 514 L 492.5 430.37" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 492.5 425.12 L 496 432.12 L 492.5 430.37 L 489 432.12 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 493px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">5</div></div></div></foreignObject><text x="493" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">5</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 465px; margin-left: 535px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Get worker</div></div></div></foreignObject><text x="535" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Get worker</text></switch></g><rect x="380" y="514" width="150" height="60" fill="#ffe6cc" stroke="#d79b00" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 544px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Workers Watcher</div></div></div></foreignObject><text x="455" y="548" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Workers Watcher</text></switch></g><path d="M 280 424 L 280 544 L 373.63 544" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 378.88 544 L 371.88 547.5 L 373.63 544 L 371.88 540.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 515px; margin-left: 202px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Allocate sync workers</div></div></div></foreignObject><text x="202" y="518" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Allocate sync workers</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 514px; margin-left: 280px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">1</div></div></div></foreignObject><text x="280" y="518" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">1</text></switch></g><rect x="230" y="384" width="100" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 404px; margin-left: 231px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Initialize</div></div></div></foreignObject><text x="280" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Initialize</text></switch></g><path d="M 417.5 424 L 417.5 507.63" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 417.5 512.88 L 414 505.88 L 417.5 507.63 L 421 505.88 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 352px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Give me SyncWorker</div></div></div></foreignObject><text x="352" y="467" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Give me SyncWorker</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 464px; margin-left: 418px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">2</div></div></div></foreignObject><text x="418" y="468" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">2</text></switch></g><path d="M 530 414 L 700.63 414" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 705.88 414 L 698.88 417.5 L 700.63 414 L 698.88 410.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 415px; margin-left: 618px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">6</div></div></div></foreignObject><text x="618" y="418" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">6</text></switch></g><path d="M 492.5 384 L 483 384 L 483 324 L 520 324 L 520 83 L 468.87 83" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 463.62 83 L 470.62 79.5 L 468.87 83 L 470.62 86.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 224px; margin-left: 521px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">10</div></div></div></foreignObject><text x="521" y="227" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">10</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 225px; margin-left: 597px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">Send response to the user</div></div></div></foreignObject><text x="597" y="228" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">Send response to the user</text></switch></g><rect x="380" y="384" width="150" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 404px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Exec/ExecWithContext</div></div></div></foreignObject><text x="455" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec/ExecWithContext</text></switch></g><path d="M 492.5 664 L 492.5 624 L 492.5 580.37" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 492.5 575.12 L 496 582.12 L 492.5 580.37 L 489 582.12 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 616px; margin-left: 494px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">4</div></div></div></foreignObject><text x="494" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">4</text></switch></g><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 617px; margin-left: 610px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">I have free workers, here you are</div></div></div></foreignObject><text x="610" y="620" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">I have free workers, here you are</text></switch></g><rect x="380" y="664" width="150" height="60" fill="#d5e8d4" stroke="#82b366" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 148px; height: 1px; padding-top: 694px; margin-left: 381px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Stack with workers</div></div></div></foreignObject><text x="455" y="698" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Stack with workers</text></switch></g><path d="M 707 394 L 536.37 394" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 531.12 394 L 538.12 390.5 L 536.37 394 L 538.12 397.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 394px; margin-left: 618px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">9</div></div></div></foreignObject><text x="618" y="397" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">9</text></switch></g><rect x="707" y="384" width="163" height="40" fill="#dae8fc" stroke="#6c8ebf" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 161px; height: 1px; padding-top: 404px; margin-left: 708px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Exec/ExecWithTimeout</div></div></div></foreignObject><text x="789" y="408" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec/ExecWithTimeout</text></switch></g><path d="M 450 289.5 L 460 289.5 L 460 364.5 L 470.5 364.5 L 455 383.5 L 439.5 364.5 L 450 364.5 Z" fill="none" stroke="#000000" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="all"/><ellipse cx="455" cy="84.5" rx="7.5" ry="7.5" fill="#ffffff" stroke="#000000" pointer-events="all"/><path d="M 455 92 L 455 117 M 455 97 L 440 97 M 455 97 L 470 97 M 455 117 L 440 137 M 455 117 L 470 137" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe flex-start; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 144px; margin-left: 455px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">User request</div></div></div></foreignObject><text x="455" y="156" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">User...</text></switch></g><rect x="607" y="426" width="198" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 435px; margin-left: 706px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Send request to the worker</div></div></div></foreignObject><text x="706" y="438" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Send request to the worker</text></switch></g><rect x="618" y="368" width="125" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 377px; margin-left: 681px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Receive response</div></div></div></foreignObject><text x="681" y="380" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Receive response</text></switch></g><ellipse cx="125" cy="404" rx="11" ry="11" fill="#000000" stroke="#ff0000" pointer-events="all"/><path d="M 140 404 L 227.76 404" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 219.88 408.5 L 228.88 404 L 219.88 399.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="all"/><rect x="45" y="371" width="161" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 380px; margin-left: 126px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Plugin Initialization</div></div></div></foreignObject><text x="126" y="383" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Plugin Initialization</text></switch></g><path d="M 870 414 L 983.63 414" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 988.88 414 L 981.88 417.5 L 983.63 414 L 981.88 410.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 413px; margin-left: 930px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">7</div></div></div></foreignObject><text x="930" y="416" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">7</text></switch></g><path d="M 990 394 L 876.37 394" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 871.12 394 L 878.12 390.5 L 876.37 394 L 878.12 397.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 394px; margin-left: 931px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 11px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; background-color: #ffffff; white-space: nowrap; ">8</div></div></div></foreignObject><text x="931" y="397" fill="#000000" font-family="Jetbrains Mono" font-size="11px" text-anchor="middle">8</text></switch></g><rect x="990" y="384" width="100" height="40" fill="#f5f5f5" stroke="#666666" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 404px; margin-left: 991px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #333333; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Worker</div></div></div></foreignObject><text x="1040" y="408" fill="#333333" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Worker</text></switch></g><rect x="893" y="426" width="96" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 435px; margin-left: 941px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Exec payload</div></div></div></foreignObject><text x="941" y="438" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Exec payload</text></switch></g><rect x="873" y="366" width="125" height="17" fill="none" stroke="none" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 375px; margin-left: 936px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: nowrap; ">Reveive response</div></div></div></foreignObject><text x="936" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Reveive response</text></switch></g><rect x="401" y="255" width="108" height="34" fill="#f8cecc" stroke="#b85450" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 106px; height: 1px; padding-top: 272px; margin-left: 402px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">HTTP plugin</div></div></div></foreignObject><text x="455" y="276" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">HTTP plugin</text></switch></g><path d="M 450 157.5 L 460 157.5 L 460 235.5 L 470.5 235.5 L 455 254.5 L 439.5 235.5 L 450 235.5 Z" fill="none" stroke="#000000" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="all"/><rect x="490" y="364" width="40" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 38px; height: 1px; padding-top: 374px; margin-left: 491px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Pool</div></div></div></foreignObject><text x="510" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Pool</text></switch></g><rect x="770" y="364" width="100" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 98px; height: 1px; padding-top: 374px; margin-left: 771px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">Golang Worker</div></div></div></foreignObject><text x="820" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">Golang Worker</text></switch></g><rect x="1006" y="364" width="84" height="20" fill="none" stroke="#000000" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject style="overflow: visible; text-align: left;" pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 82px; height: 1px; padding-top: 374px; margin-left: 1007px;"><div style="box-sizing: border-box; font-size: 0; text-align: center; "><div style="display: inline-block; font-size: 12px; font-family: Jetbrains Mono; color: #000000; line-height: 1.2; pointer-events: all; white-space: normal; word-wrap: normal; ">PHP worker</div></div></div></foreignObject><text x="1048" y="378" fill="#000000" font-family="Jetbrains Mono" font-size="12px" text-anchor="middle">PHP worker</text></switch></g></g><switch><g requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"/><a transform="translate(0,-5)" xlink:href="https://www.diagrams.net/doc/faq/svg-export-text-problems" target="_blank"><text text-anchor="middle" font-size="10px" x="50%" y="100%">Viewer does not support full SVG 1.1</text></a></switch></svg>
\ No newline at end of file diff --git a/pkg/events/general.go b/pkg/events/general.go new file mode 100755 index 00000000..a09a8759 --- /dev/null +++ b/pkg/events/general.go @@ -0,0 +1,39 @@ +package events + +import ( + "sync" +) + +// HandlerImpl helps to broadcast events to multiple listeners. +type HandlerImpl struct { + listeners []Listener + sync.RWMutex // all receivers should be pointers +} + +func NewEventsHandler() Handler { + return &HandlerImpl{listeners: make([]Listener, 0, 2)} +} + +// NumListeners returns number of event listeners. +func (eb *HandlerImpl) NumListeners() int { + eb.Lock() + defer eb.Unlock() + return len(eb.listeners) +} + +// AddListener registers new event listener. +func (eb *HandlerImpl) AddListener(listener Listener) { + eb.Lock() + defer eb.Unlock() + eb.listeners = append(eb.listeners, listener) +} + +// Push broadcast events across all event listeners. +func (eb *HandlerImpl) Push(e interface{}) { + // ReadLock here because we are not changing listeners + eb.RLock() + defer eb.RUnlock() + for k := range eb.listeners { + eb.listeners[k](e) + } +} diff --git a/pkg/events/interface.go b/pkg/events/interface.go new file mode 100644 index 00000000..ac6c15a4 --- /dev/null +++ b/pkg/events/interface.go @@ -0,0 +1,14 @@ +package events + +// Handler interface +type Handler interface { + // Return number of active listeners + NumListeners() int + // AddListener adds lister to the publisher + AddListener(listener Listener) + // Push pushes event to the listeners + Push(e interface{}) +} + +// Event listener listens for the events produced by worker, worker pool or other service. +type Listener func(event interface{}) diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go new file mode 100644 index 00000000..3925df56 --- /dev/null +++ b/pkg/events/pool_events.go @@ -0,0 +1,66 @@ +package events + +// TODO event numbers +const ( + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct P = iota + 10000 + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventPoolError caused on pool wide errors. + EventPoolError + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL +) + +type P int64 + +func (ev P) String() string { + switch ev { + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventPoolError: + return "EventPoolError" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + } + return "Unknown event type" +} + +// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. +type PoolEvent struct { + // Event type, see below. + Event P + + // Payload depends on event type, typically it's worker or error. + Payload interface{} +} diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go new file mode 100644 index 00000000..9d428f7d --- /dev/null +++ b/pkg/events/worker_events.go @@ -0,0 +1,33 @@ +package events + +const ( + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError W = iota + 11000 + + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog +) + +type W int64 + +func (ev W) String() string { + switch ev { + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + } + return "Unknown event type" +} + +// WorkerEvent wraps worker events. +type WorkerEvent struct { + // Event id, see below. + Event W + + // Worker triggered the event. + Worker interface{} + + // Event specific payload. + Payload interface{} +} diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go new file mode 100755 index 00000000..fac36852 --- /dev/null +++ b/pkg/payload/payload.go @@ -0,0 +1,16 @@ +package payload + +// Payload carries binary header and body to stack and +// back to the server. +type Payload struct { + // Context represent payload context, might be omitted. + Context []byte + + // body contains binary payload to be processed by WorkerProcess. + Body []byte +} + +// String returns payload body as string +func (p *Payload) String() string { + return string(p.Body) +} diff --git a/pkg/pool/config.go b/pkg/pool/config.go new file mode 100644 index 00000000..782f7ce9 --- /dev/null +++ b/pkg/pool/config.go @@ -0,0 +1,75 @@ +package pool + +import ( + "runtime" + "time" +) + +// Configures the pool behaviour. +type Config struct { + // Debug flag creates new fresh worker before every request. + Debug bool + + // NumWorkers defines how many sub-processes can be run at once. This value + // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. + NumWorkers uint64 `mapstructure:"num_workers"` + + // MaxJobs defines how many executions is allowed for the worker until + // it's destruction. set 1 to create new process for each new task, 0 to let + // worker handle as many tasks as it can. + MaxJobs uint64 `mapstructure:"max_jobs"` + + // AllocateTimeout defines for how long pool will be waiting for a worker to + // be freed to handle the task. Defaults to 60s. + AllocateTimeout time.Duration `mapstructure:"allocate_timeout"` + + // DestroyTimeout defines for how long pool should be waiting for worker to + // properly destroy, if timeout reached worker will be killed. Defaults to 60s. + DestroyTimeout time.Duration `mapstructure:"destroy_timeout"` + + // Supervision config to limit worker and pool memory usage. + Supervisor *SupervisorConfig `mapstructure:"supervisor"` +} + +// InitDefaults enables default config values. +func (cfg *Config) InitDefaults() { + if cfg.NumWorkers == 0 { + cfg.NumWorkers = uint64(runtime.NumCPU()) + } + + if cfg.AllocateTimeout == 0 { + cfg.AllocateTimeout = time.Minute + } + + if cfg.DestroyTimeout == 0 { + cfg.DestroyTimeout = time.Minute + } + if cfg.Supervisor == nil { + return + } + cfg.Supervisor.InitDefaults() +} + +type SupervisorConfig struct { + // WatchTick defines how often to check the state of worker. + WatchTick time.Duration `mapstructure:"watch_tick"` + + // TTL defines maximum time worker is allowed to live. + TTL time.Duration `mapstructure:"ttl"` + + // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. + IdleTTL time.Duration `mapstructure:"idle_ttl"` + + // ExecTTL defines maximum lifetime per job. + ExecTTL time.Duration `mapstructure:"exec_ttl"` + + // MaxWorkerMemory limits memory per worker. + MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"` +} + +// InitDefaults enables default config values. +func (cfg *SupervisorConfig) InitDefaults() { + if cfg.WatchTick == 0 { + cfg.WatchTick = time.Second + } +} diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go new file mode 100644 index 00000000..4f7ae595 --- /dev/null +++ b/pkg/pool/interface.go @@ -0,0 +1,29 @@ +package pool + +import ( + "context" + + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +// Pool managed set of inner worker processes. +type Pool interface { + // GetConfig returns pool configuration. + GetConfig() interface{} + + // Exec executes task with payload + Exec(rqs payload.Payload) (payload.Payload, error) + + // ExecWithContext executes task with context which is used with timeout + ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) + + // Workers returns worker list associated with the pool. + Workers() (workers []worker.SyncWorker) + + // Remove worker from the pool. + RemoveWorker(worker worker.SyncWorker) error + + // Destroy all underlying stack (but let them to complete the task). + Destroy(ctx context.Context) +} diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go new file mode 100755 index 00000000..44adf9c0 --- /dev/null +++ b/pkg/pool/static_pool.go @@ -0,0 +1,327 @@ +package pool + +import ( + "context" + "os/exec" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/transport" + "github.com/spiral/roadrunner/v2/pkg/worker" + workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" +) + +// StopRequest can be sent by worker to indicate that restart is required. +const StopRequest = "{\"stop\":true}" + +// ErrorEncoder encode error or make a decision based on the error type +type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error) + +type Options func(p *StaticPool) + +type Command func() *exec.Cmd + +// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. +type StaticPool struct { + cfg Config + + // worker command creator + cmd Command + + // creates and connects to stack + factory transport.Factory + + // distributes the events + events events.Handler + + // saved list of event listeners + listeners []events.Listener + + // manages worker states and TTLs + ww workerWatcher.Watcher + + // allocate new worker + allocator worker.Allocator + + // errEncoder is the default Exec error encoder + errEncoder ErrorEncoder +} + +// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. +func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) { + const op = errors.Op("static_pool_initialize") + if factory == nil { + return nil, errors.E(op, errors.Str("no factory initialized")) + } + cfg.InitDefaults() + + if cfg.Debug { + cfg.NumWorkers = 0 + cfg.MaxJobs = 1 + } + + p := &StaticPool{ + cfg: cfg, + cmd: cmd, + factory: factory, + events: events.NewEventsHandler(), + } + + // add pool options + for i := 0; i < len(options); i++ { + options[i](p) + } + + p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + + workers, err := p.allocateWorkers(p.cfg.NumWorkers) + if err != nil { + return nil, errors.E(op, err) + } + + // put stack in the pool + err = p.ww.AddToWatch(workers) + if err != nil { + return nil, errors.E(op, err) + } + + p.errEncoder = defaultErrEncoder(p) + + // if supervised config not nil, guess, that pool wanted to be supervised + if cfg.Supervisor != nil { + sp := supervisorWrapper(p, p.events, p.cfg.Supervisor) + // start watcher timer + sp.Start() + return sp, nil + } + + return p, nil +} + +func AddListeners(listeners ...events.Listener) Options { + return func(p *StaticPool) { + p.listeners = listeners + for i := 0; i < len(listeners); i++ { + p.addListener(listeners[i]) + } + } +} + +// AddListener connects event listener to the pool. +func (sp *StaticPool) addListener(listener events.Listener) { + sp.events.AddListener(listener) +} + +// Config returns associated pool configuration. Immutable. +func (sp *StaticPool) GetConfig() interface{} { + return sp.cfg +} + +// Workers returns worker list associated with the pool. +func (sp *StaticPool) Workers() (workers []worker.SyncWorker) { + return sp.ww.WorkersList() +} + +func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error { + return sp.ww.RemoveWorker(wb) +} + +// Be careful, sync Exec with ExecWithContext +func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { + const op = errors.Op("static_pool_exec") + if sp.cfg.Debug { + return sp.execDebug(p) + } + ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) + defer cancel() + w, err := sp.getWorker(ctxGetFree, op) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } + + rsp, err := w.Exec(p) + if err != nil { + return sp.errEncoder(err, w) + } + + // worker want's to be terminated + // TODO careful with string(rsp.Context) + if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { + sp.stopWorker(w) + + return sp.Exec(p) + } + + err = sp.checkMaxJobs(w) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } + + return rsp, nil +} + +// Be careful, sync with pool.Exec method +func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) { + const op = errors.Op("static_pool_exec_with_context") + ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) + defer cancel() + w, err := sp.getWorker(ctxGetFree, op) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } + + rsp, err := w.ExecWithTimeout(ctx, p) + if err != nil { + return sp.errEncoder(err, w) + } + + // worker want's to be terminated + if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { + sp.stopWorker(w) + return sp.ExecWithContext(ctx, p) + } + + err = sp.checkMaxJobs(w) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } + + return rsp, nil +} + +func (sp *StaticPool) stopWorker(w worker.SyncWorker) { + const op = errors.Op("static_pool_stop_worker") + w.State().Set(internal.StateInvalid) + err := w.Stop() + if err != nil { + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + } +} + +// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs +func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error { + const op = errors.Op("static_pool_check_max_jobs") + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err := sp.ww.AllocateNew() + if err != nil { + return errors.E(op, err) + } + } else { + sp.ww.PushWorker(w) + } + return nil +} + +func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) { + // GetFreeWorker function consumes context with timeout + w, err := sp.ww.GetFreeWorker(ctxGetFree) + if err != nil { + // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout + if errors.Is(errors.NoFreeWorkers, err) { + sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)}) + return nil, errors.E(op, err) + } + // else if err not nil - return error + return nil, errors.E(op, err) + } + return w, nil +} + +// Destroy all underlying stack (but let them to complete the task). +func (sp *StaticPool) Destroy(ctx context.Context) { + sp.ww.Destroy(ctx) +} + +func defaultErrEncoder(sp *StaticPool) ErrorEncoder { + return func(err error, w worker.SyncWorker) (payload.Payload, error) { + const op = errors.Op("error encoder") + // just push event if on any stage was timeout error + if errors.Is(errors.ExecTTL, err) { + sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)}) + } + // soft job errors are allowed + if errors.Is(errors.SoftJob, err) { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err = sp.ww.AllocateNew() + if err != nil { + sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) + } + + w.State().Set(internal.StateInvalid) + err = w.Stop() + if err != nil { + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + } + } else { + sp.ww.PushWorker(w) + } + + return payload.Payload{}, errors.E(op, err) + } + + w.State().Set(internal.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + errS := w.Stop() + + if errS != nil { + return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) + } + + return payload.Payload{}, errors.E(op, err) + } +} + +func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { + return func() (*worker.SyncWorkerImpl, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...) + if err != nil { + return nil, err + } + + sw := worker.From(w) + + sp.events.Push(events.PoolEvent{ + Event: events.EventWorkerConstruct, + Payload: sw, + }) + return sw, nil + } +} + +func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { + sw, err := sp.allocator() + if err != nil { + return payload.Payload{}, err + } + + r, err := sw.Exec(p) + + if stopErr := sw.Stop(); stopErr != nil { + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + } + + return r, err +} + +// allocate required number of stack +func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) { + const op = errors.Op("allocate workers") + var workers []worker.SyncWorker + + // constant number of stack simplify logic + for i := uint64(0); i < numWorkers; i++ { + w, err := sp.allocator() + if err != nil { + return nil, errors.E(op, errors.WorkerAllocate, err) + } + + workers = append(workers, w) + } + return workers, nil +} diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go new file mode 100755 index 00000000..a32790e0 --- /dev/null +++ b/pkg/pool/static_pool_test.go @@ -0,0 +1,647 @@ +package pool + +import ( + "context" + "log" + "os/exec" + "runtime" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + "github.com/stretchr/testify/assert" +) + +var cfg = Config{ + NumWorkers: uint64(runtime.NumCPU()), + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, +} + +func Test_NewPool(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) +} + +func Test_StaticPool_Invalid(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") }, + pipe.NewPipeFactory(), + cfg, + ) + + assert.Nil(t, p) + assert.Error(t, err) +} + +func Test_ConfigNoErrorInitDefaults(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) +} + +func Test_StaticPool_Echo(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_StaticPool_Echo_NilContext(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_StaticPool_Echo_Context(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.Empty(t, res.Body) + assert.NotNil(t, res.Context) + + assert.Equal(t, "world", string(res.Context)) +} + +func Test_StaticPool_JobError(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.Exec") + } + + assert.Contains(t, err.Error(), "hello") +} + +func Test_StaticPool_Broken_Replace(t *testing.T) { + ctx := context.Background() + block := make(chan struct{}, 1) + + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + if wev.Event == events.EventWorkerLog { + e := string(wev.Payload.([]byte)) + if strings.ContainsAny(e, "undefined_function()") { + block <- struct{}{} + return + } + } + } + } + + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") }, + pipe.NewPipeFactory(), + cfg, + AddListeners(listener), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + time.Sleep(time.Second) + res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + + <-block + + p.Destroy(ctx) +} + +func Test_StaticPool_Broken_FromOutside(t *testing.T) { + ctx := context.Background() + // Consume pool events + ev := make(chan struct{}, 1) + listener := func(event interface{}) { + if pe, ok := event.(events.PoolEvent); ok { + if pe.Event == events.EventWorkerConstruct { + ev <- struct{}{} + } + } + } + + var cfg = Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, + } + + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + AddListeners(listener), + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) + assert.Equal(t, 1, len(p.Workers())) + + // first creation + <-ev + // killing random worker and expecting pool to replace it + err = p.Workers()[0].Kill() + if err != nil { + t.Errorf("error killing the process: error %v", err) + } + + // re-creation + <-ev + + list := p.Workers() + for _, w := range list { + assert.Equal(t, internal.StateReady, w.State().Value()) + } +} + +func Test_StaticPool_AllocateTimeout(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Nanosecond * 1, + DestroyTimeout: time.Second * 2, + }, + ) + assert.Error(t, err) + if !errors.Is(errors.WorkerAllocate, err) { + t.Fatal("error should be of type WorkerAllocate") + } + assert.Nil(t, p) +} + +func Test_StaticPool_Replace_Worker(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + MaxJobs: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + var lastPID string + lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) + + res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +func Test_StaticPool_Debug_Worker(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, + pipe.NewPipeFactory(), + Config{ + Debug: true, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + assert.Len(t, p.Workers(), 0) + + var lastPID string + res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) + assert.NotEqual(t, lastPID, string(res.Body)) + + assert.Len(t, p.Workers(), 0) + + for i := 0; i < 10; i++ { + assert.Len(t, p.Workers(), 0) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +// identical to replace but controlled on worker side +func Test_StaticPool_Stop_Worker(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + var lastPID string + lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) + + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Destroy_And_Close(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) + + p.Destroy(ctx) + _, err = p.Exec(payload.Payload{Body: []byte("100")}) + assert.Error(t, err) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) + + go func() { + _, err := p.Exec(payload.Payload{Body: []byte("100")}) + if err != nil { + t.Errorf("error executing payload: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + p.Destroy(ctx) + _, err = p.Exec(payload.Payload{Body: []byte("100")}) + assert.Error(t, err) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Handle_Dead(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + for i := range p.Workers() { + p.Workers()[i].State().Set(internal.StateErrored) + } + + _, err = p.Exec(payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + p.Destroy(ctx) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Slow_Destroy(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + p.Destroy(context.Background()) +} + +func Test_StaticPool_NoFreeWorkers(t *testing.T) { + ctx := context.Background() + block := make(chan struct{}, 1) + + listener := func(event interface{}) { + if ev, ok := event.(events.PoolEvent); ok { + if ev.Event == events.EventNoFreeWorkers { + block <- struct{}{} + } + } + } + + p, err := Initialize( + ctx, + // sleep for the 3 seconds + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + Config{ + Debug: false, + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: nil, + }, + AddListeners(listener), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + go func() { + _, _ = p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + }() + + time.Sleep(time.Second) + res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + + <-block + + p.Destroy(ctx) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_WrongCommand1(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.Error(t, err) + assert.Nil(t, p) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_WrongCommand2(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.Error(t, err) + assert.Nil(t, p) +} + +func Benchmark_Pool_Echo(b *testing.B) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + if err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.ReportAllocs() + for n := 0; n < b.N; n++ { + if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +// +func Benchmark_Pool_Echo_Batched(b *testing.B) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: uint64(runtime.NumCPU()), + AllocateTimeout: time.Second * 100, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(b, err) + defer p.Destroy(ctx) + + var wg sync.WaitGroup + for i := 0; i < b.N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + log.Println(err) + } + }() + } + + wg.Wait() +} + +// +func Benchmark_Pool_Echo_Replaced(b *testing.B) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + MaxJobs: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(b, err) + defer p.Destroy(ctx) + b.ResetTimer() + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + log.Println(err) + } + } +} diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go new file mode 100755 index 00000000..2597b352 --- /dev/null +++ b/pkg/pool/supervisor_pool.go @@ -0,0 +1,222 @@ +package pool + +import ( + "context" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/tools" +) + +const MB = 1024 * 1024 + +// NSEC_IN_SEC nanoseconds in second +const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck + +type Supervised interface { + Pool + // Start used to start watching process for all pool workers + Start() +} + +type supervised struct { + cfg *SupervisorConfig + events events.Handler + pool Pool + stopCh chan struct{} + mu *sync.RWMutex +} + +func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { + sp := &supervised{ + cfg: cfg, + events: events, + pool: pool, + mu: &sync.RWMutex{}, + stopCh: make(chan struct{}), + } + + return sp +} + +type ttlExec struct { + err error + p payload.Payload +} + +func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { + const op = errors.Op("supervised_exec_with_context") + if sp.cfg.ExecTTL == 0 { + return sp.pool.Exec(rqs) + } + + c := make(chan ttlExec, 1) + ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL) + defer cancel() + go func() { + res, err := sp.pool.ExecWithContext(ctx, rqs) + if err != nil { + c <- ttlExec{ + err: errors.E(op, err), + p: payload.Payload{}, + } + } + + c <- ttlExec{ + err: nil, + p: res, + } + }() + + for { + select { + case <-ctx.Done(): + return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) + case res := <-c: + if res.err != nil { + return payload.Payload{}, res.err + } + + return res.p, nil + } + } +} + +func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) { + const op = errors.Op("supervised_exec") + rsp, err := sp.pool.Exec(p) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } + return rsp, nil +} + +func (sp *supervised) GetConfig() interface{} { + return sp.pool.GetConfig() +} + +func (sp *supervised) Workers() (workers []worker.SyncWorker) { + sp.mu.Lock() + defer sp.mu.Unlock() + return sp.pool.Workers() +} + +func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error { + return sp.pool.RemoveWorker(worker) +} + +func (sp *supervised) Destroy(ctx context.Context) { + sp.pool.Destroy(ctx) +} + +func (sp *supervised) Start() { + go func() { + watchTout := time.NewTicker(sp.cfg.WatchTick) + for { + select { + case <-sp.stopCh: + watchTout.Stop() + return + // stop here + case <-watchTout.C: + sp.mu.Lock() + sp.control() + sp.mu.Unlock() + } + } + }() +} + +func (sp *supervised) Stop() { + sp.stopCh <- struct{}{} +} + +func (sp *supervised) control() { + now := time.Now() + const op = errors.Op("supervised_pool_control_tick") + + // THIS IS A COPY OF WORKERS + workers := sp.pool.Workers() + + for i := 0; i < len(workers); i++ { + if workers[i].State().Value() == internal.StateInvalid { + continue + } + + s, err := tools.WorkerProcessState(workers[i]) + if err != nil { + // worker not longer valid for supervision + continue + } + + if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { + err = sp.pool.RemoveWorker(workers[i]) + if err != nil { + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) + return + } + sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) + continue + } + + if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { + err = sp.pool.RemoveWorker(workers[i]) + if err != nil { + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) + return + } + sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) + continue + } + + // firs we check maxWorker idle + if sp.cfg.IdleTTL != 0 { + // then check for the worker state + if workers[i].State().Value() != internal.StateReady { + continue + } + + /* + Calculate idle time + If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 + 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle + we are guessing that worker overlap idle time and has to be killed + */ + + // 1610530005534416045 lu + // lu - now = -7811150814 - nanoseconds + // 7.8 seconds + // get last used unix nano + lu := workers[i].State().LastUsed() + // worker not used, skip + if lu == 0 { + continue + } + + // convert last used to unixNano and sub time.now to seconds + // negative number, because lu always in the past, except for the `back to the future` :) + res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1 + + // maxWorkerIdle more than diff between now and last used + // for example: + // After exec worker goes to the rest + // And resting for the 5 seconds + // IdleTTL is 1 second. + // After the control check, res will be 5, idle is 1 + // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. + if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { + err = sp.pool.RemoveWorker(workers[i]) + if err != nil { + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) + return + } + sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) + } + } + } +} diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go new file mode 100644 index 00000000..c67d5d91 --- /dev/null +++ b/pkg/pool/supervisor_test.go @@ -0,0 +1,248 @@ +package pool + +import ( + "context" + "os/exec" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + "github.com/spiral/roadrunner/v2/tools" + "github.com/stretchr/testify/assert" +) + +var cfgSupervised = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 100 * time.Second, + MaxWorkerMemory: 100, + }, +} + +func TestSupervisedPool_Exec(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, + pipe.NewPipeFactory(), + cfgSupervised, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + stopCh := make(chan struct{}) + defer p.Destroy(context.Background()) + + go func() { + for { + select { + case <-stopCh: + return + default: + workers := p.Workers() + if len(workers) > 0 { + s, err := tools.WorkerProcessState(workers[0]) + assert.NoError(t, err) + assert.NotNil(t, s) + // since this is soft limit, double max memory limit watch + if (s.MemoryUsage / MB) > cfgSupervised.Supervisor.MaxWorkerMemory*2 { + assert.Fail(t, "max memory reached") + } + } + } + } + }() + + for i := 0; i < 100; i++ { + time.Sleep(time.Millisecond * 50) + _, err = p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + assert.NoError(t, err) + } + + stopCh <- struct{}{} +} + +func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 1 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + resp, err := p.ExecWithContext(context.Background(), payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.Error(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 1) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) +} + +func TestSupervisedPool_Idle(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 1 * time.Second, + ExecTTL: 100 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + pid := p.Workers()[0].Pid() + + resp, err := p.ExecWithContext(context.Background(), payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.Nil(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 5) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + p.Destroy(context.Background()) +} + +func TestSupervisedPool_ExecTTL_OK(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 4 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + time.Sleep(time.Millisecond * 100) + resp, err := p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 1) + // should be the same pid + assert.Equal(t, pid, p.Workers()[0].Pid()) +} + +func TestSupervisedPool_MaxMemoryReached(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 4 * time.Second, + MaxWorkerMemory: 1, + }, + } + + block := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.PoolEvent); ok { + if ev.Event == events.EventMaxMemory { + block <- struct{}{} + } + } + } + + // constructed + // max memory + // constructed + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + AddListeners(listener), + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + resp, err := p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + <-block + p.Destroy(context.Background()) +} diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go new file mode 100644 index 00000000..299ac95f --- /dev/null +++ b/pkg/transport/interface.go @@ -0,0 +1,21 @@ +package transport + +import ( + "context" + "os/exec" + + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +// Factory is responsible of wrapping given command into tasks WorkerProcess. +type Factory interface { + // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. + // Process must not be started. + SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error) + // SpawnWorker creates new WorkerProcess process based on given command. + // Process must not be started. + SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error) + // Close the factory and underlying connections. + Close() error +} diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go new file mode 100755 index 00000000..dd7c5841 --- /dev/null +++ b/pkg/transport/pipe/pipe_factory.go @@ -0,0 +1,162 @@ +package pipe + +import ( + "context" + "os/exec" + + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/pkg/pipe" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" + "go.uber.org/multierr" +) + +// Factory connects to stack using standard +// streams (STDIN, STDOUT pipes). +type Factory struct{} + +// NewPipeFactory returns new factory instance and starts +// listening +func NewPipeFactory() *Factory { + return &Factory{} +} + +type SpawnResult struct { + w *worker.Process + err error +} + +// SpawnWorker creates new Process and connects it to goridge relay, +// method Wait() must be handled on level above. +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { + c := make(chan SpawnResult) + const op = errors.Op("factory_spawn_worker_with_timeout") + go func() { + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // TODO why out is in? + in, err := cmd.StdoutPipe() + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // TODO why in is out? + out, err := cmd.StdinPipe() + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // Init new PIPE relay + relay := pipe.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // errors bundle + pid, err := internal.FetchPID(relay) + if pid != w.Pid() || err != nil { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // everything ok, set ready state + w.State().Set(internal.StateReady) + + // return worker + c <- SpawnResult{ + w: w, + err: nil, + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-c: + if res.err != nil { + return nil, res.err + } + return res.w, nil + } +} + +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { + const op = errors.Op("factory_spawn_worker") + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + return nil, errors.E(op, err) + } + + // TODO why out is in? + in, err := cmd.StdoutPipe() + if err != nil { + return nil, errors.E(op, err) + } + + // TODO why in is out? + out, err := cmd.StdinPipe() + if err != nil { + return nil, errors.E(op, err) + } + + // Init new PIPE relay + relay := pipe.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + return nil, errors.E(op, err) + } + + // errors bundle + if pid, err := internal.FetchPID(relay); pid != w.Pid() { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, errors.E(op, err) + } + + // everything ok, set ready state + w.State().Set(internal.StateReady) + return w, nil +} + +// Close the factory. +func (f *Factory) Close() error { + return nil +} diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go new file mode 100644 index 00000000..d4949c82 --- /dev/null +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -0,0 +1,456 @@ +package pipe + +import ( + "os/exec" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/stretchr/testify/assert" +) + +func Test_GetState2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + assert.Equal(t, internal.StateStopped, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, internal.StateReady, w.State().Value()) + assert.NoError(t, w.Stop()) +} + +func Test_Kill2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.Error(t, w.Wait()) + assert.Equal(t, internal.StateErrored, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, internal.StateReady, w.State().Value()) + err = w.Kill() + if err != nil { + t.Errorf("error killing the Process: error %v", err) + } + wg.Wait() +} + +func Test_Pipe_Start2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + assert.NoError(t, w.Stop()) +} + +func Test_Pipe_StartError2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + err := cmd.Start() + if err != nil { + t.Errorf("error running the command: error %v", err) + } + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError3(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError4(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Failboot2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/failboot.php") + w, err := NewPipeFactory().SpawnWorker(cmd) + + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failboot") +} + +func Test_Pipe_Invalid2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/invalid.php") + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Echo2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pipe_Broken2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) +} + +func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { + f := NewPipeFactory() + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, _ := f.SpawnWorker(cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + sw := worker.From(w) + + b.ReportAllocs() + b.ResetTimer() + go func() { + err := w.Wait() + if err != nil { + b.Errorf("error waiting the worker: error %v", err) + } + }() + defer func() { + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + }() + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Test_Echo2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_BadPayload2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(payload.Payload{}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Contains(t, err.Error(), "payload can not be empty") +} + +func Test_String2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "ready") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_Echo_Slow2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Broken2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") + data := "" + mu := &sync.Mutex{} + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + mu.Lock() + data = string(wev.Payload.([]byte)) + mu.Unlock() + } + } + + w, err := NewPipeFactory().SpawnWorker(cmd, listener) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + time.Sleep(time.Second * 3) + mu.Lock() + if strings.ContainsAny(data, "undefined_function()") == false { + t.Fail() + } + mu.Unlock() + assert.Error(t, w.Stop()) +} + +func Test_Error2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.ErrSoftJob") + } + assert.Contains(t, err.Error(), "hello") +} + +func Test_NumExecs2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(1), w.State().NumExecs()) + + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(2), w.State().NumExecs()) + + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(3), w.State().NumExecs()) +} diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go new file mode 100755 index 00000000..38166b85 --- /dev/null +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -0,0 +1,478 @@ +package pipe + +import ( + "context" + "os/exec" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/stretchr/testify/assert" +) + +func Test_GetState(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + assert.Equal(t, internal.StateStopped, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, internal.StateReady, w.State().Value()) + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Kill(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.Error(t, w.Wait()) + assert.Equal(t, internal.StateErrored, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, internal.StateReady, w.State().Value()) + err = w.Kill() + if err != nil { + t.Errorf("error killing the Process: error %v", err) + } + wg.Wait() +} + +func Test_Pipe_Start(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + assert.NoError(t, w.Stop()) +} + +func Test_Pipe_StartError(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + err := cmd.Start() + if err != nil { + t.Errorf("error running the command: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Failboot(t *testing.T) { + cmd := exec.Command("php", "../../../tests/failboot.php") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failboot") +} + +func Test_Pipe_Invalid(t *testing.T) { + cmd := exec.Command("php", "../../../tests/invalid.php") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Echo(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pipe_Broken(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) +} + +func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { + f := NewPipeFactory() + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd) + sw := worker.From(w) + + b.ReportAllocs() + b.ResetTimer() + go func() { + err := w.Wait() + if err != nil { + b.Errorf("error waiting the worker: error %v", err) + } + }() + defer func() { + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + }() + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Test_Echo(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_BadPayload(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(payload.Payload{}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Contains(t, err.Error(), "payload can not be empty") +} + +func Test_String(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "ready") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_Echo_Slow(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Broken(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") + data := "" + mu := &sync.Mutex{} + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + mu.Lock() + data = string(wev.Payload.([]byte)) + mu.Unlock() + } + } + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + time.Sleep(time.Second * 3) + mu.Lock() + if strings.ContainsAny(data, "undefined_function()") == false { + t.Fail() + } + mu.Unlock() + assert.Error(t, w.Stop()) +} + +func Test_Error(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.ErrSoftJob") + } + assert.Contains(t, err.Error(), "hello") +} + +func Test_NumExecs(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(1), w.State().NumExecs()) + + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(2), w.State().NumExecs()) + + _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(3), w.State().NumExecs()) +} diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go new file mode 100755 index 00000000..ccd2b0bf --- /dev/null +++ b/pkg/transport/socket/socket_factory.go @@ -0,0 +1,228 @@ +package socket + +import ( + "context" + "net" + "os/exec" + "sync" + "time" + + "github.com/shirou/gopsutil/process" + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/socket" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" + + "go.uber.org/multierr" + "golang.org/x/sync/errgroup" +) + +// Factory connects to external stack using socket server. +type Factory struct { + // listens for incoming connections from underlying processes + ls net.Listener + + // relay connection timeout + tout time.Duration + + // sockets which are waiting for process association + relays sync.Map + + ErrCh chan error +} + +// todo: review + +// NewSocketServer returns Factory attached to a given socket listener. +// tout specifies for how long factory should serve for incoming relay connection +func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { + f := &Factory{ + ls: ls, + tout: tout, + relays: sync.Map{}, + ErrCh: make(chan error, 10), + } + + // Be careful + // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go + // https://github.com/golang/go/issues/5045 + go func() { + f.ErrCh <- f.listen() + }() + + return f +} + +// blocking operation, returns an error +func (f *Factory) listen() error { + errGr := &errgroup.Group{} + errGr.Go(func() error { + for { + conn, err := f.ls.Accept() + if err != nil { + return err + } + + rl := socket.NewSocketRelay(conn) + pid, err := internal.FetchPID(rl) + if err != nil { + return err + } + + f.attachRelayToPid(pid, rl) + } + }) + + return errGr.Wait() +} + +type socketSpawn struct { + w *worker.Process + err error +} + +// SpawnWorker creates Process and connects it to appropriate relay or returns error +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { + const op = errors.Op("factory_spawn_worker_with_timeout") + c := make(chan socketSpawn) + go func() { + ctx, cancel := context.WithTimeout(ctx, f.tout) + defer cancel() + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + c <- socketSpawn{ + w: nil, + err: err, + } + return + } + + err = w.Start() + if err != nil { + c <- socketSpawn{ + w: nil, + err: errors.E(op, err), + } + return + } + + rl, err := f.findRelayWithContext(ctx, w) + if err != nil { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + + c <- socketSpawn{ + w: nil, + err: errors.E(op, err), + } + return + } + + w.AttachRelay(rl) + w.State().Set(internal.StateReady) + + c <- socketSpawn{ + w: w, + err: nil, + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-c: + if res.err != nil { + return nil, res.err + } + + return res.w, nil + } +} + +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { + const op = errors.Op("factory_spawn_worker") + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + return nil, err + } + + err = w.Start() + if err != nil { + return nil, errors.E(op, err) + } + + rl, err := f.findRelay(w) + if err != nil { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, err + } + + w.AttachRelay(rl) + w.State().Set(internal.StateReady) + + return w, nil +} + +// Close socket factory and underlying socket connection. +func (f *Factory) Close() error { + return f.ls.Close() +} + +// waits for Process to connect over socket and returns associated relay of timeout +func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { + ticker := time.NewTicker(time.Millisecond * 100) + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + _, err := process.NewProcess(int32(w.Pid())) + if err != nil { + return nil, err + } + default: + tmp, ok := f.relays.Load(w.Pid()) + if !ok { + continue + } + return tmp.(*socket.Relay), nil + } + } +} + +func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { + const op = errors.Op("factory_find_relay") + // poll every 1ms for the relay + pollDone := time.NewTimer(f.tout) + for { + select { + case <-pollDone.C: + return nil, errors.E(op, errors.Str("relay timeout")) + default: + tmp, ok := f.relays.Load(w.Pid()) + if !ok { + continue + } + return tmp.(*socket.Relay), nil + } + } +} + +// chan to store relay associated with specific pid +func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { + f.relays.Store(pid, relay) +} + +// deletes relay chan associated with specific pid +func (f *Factory) removeRelayFromPid(pid int64) { + f.relays.Delete(pid) +} diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go new file mode 100644 index 00000000..0e29e7d2 --- /dev/null +++ b/pkg/transport/socket/socket_factory_spawn_test.go @@ -0,0 +1,489 @@ +package socket + +import ( + "net" + "os/exec" + "sync" + "syscall" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/stretchr/testify/assert" +) + +func Test_Tcp_Start2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartCloseFactory2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + f := NewSocketServer(ls, time.Minute) + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + w, err := f.SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartError2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + err = cmd.Start() + if err != nil { + t.Errorf("error executing the command: error %v", err) + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Failboot2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err3 := ls.Close() + if err3 != nil { + t.Errorf("error closing the listener: error %v", err3) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/failboot.php") + + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err2) + assert.Contains(t, err2.Error(), "failboot") +} + +func Test_Tcp_Invalid2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Broken2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + err := w.Wait() + assert.Error(t, err) + assert.Contains(t, err.Error(), "undefined_function()") + }() + + defer func() { + time.Sleep(time.Second) + err2 := w.Stop() + // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection + assert.Error(t, err2) + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + wg.Wait() +} + +func Test_Tcp_Echo2(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Unix_Start2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err := ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Unix_Failboot2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err := ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/failboot.php") + + w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failboot") +} + +func Test_Unix_Timeout2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err := ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0") + + w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "relay timeout") +} + +func Test_Unix_Invalid2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err := ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Unix_Broken2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err := ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + err := w.Wait() + assert.Error(t, err) + assert.Contains(t, err.Error(), "undefined_function()") + }() + + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + wg.Wait() +} + +func Test_Unix_Echo2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err := ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(b, err) + defer func() { + err := ls.Close() + assert.NoError(b, err) + }() + + f := NewSocketServer(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := f.SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + go func() { + assert.NoError(b, w.Wait()) + }() + + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(b, err) + defer func() { + err := ls.Close() + assert.NoError(b, err) + }() + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { + defer func() { + _ = syscall.Unlink("sock.unix") + }() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + f := NewSocketServer(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := f.SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { + defer func() { + _ = syscall.Unlink("sock.unix") + }() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go new file mode 100755 index 00000000..f55fc3dd --- /dev/null +++ b/pkg/transport/socket/socket_factory_test.go @@ -0,0 +1,572 @@ +package socket + +import ( + "context" + "net" + "os/exec" + "sync" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/stretchr/testify/assert" +) + +func Test_Tcp_Start(t *testing.T) { + ctx := context.Background() + time.Sleep(time.Millisecond * 10) // to ensure free socket + + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartCloseFactory(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + f := NewSocketServer(ls, time.Minute) + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartError(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + err = cmd.Start() + if err != nil { + t.Errorf("error executing the command: error %v", err) + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Failboot(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err3 := ls.Close() + if err3 != nil { + t.Errorf("error closing the listener: error %v", err3) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/failboot.php") + + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err2) + assert.Contains(t, err2.Error(), "failboot") +} + +func Test_Tcp_Timeout(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "tcp", "200", "0") + + w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "context deadline exceeded") +} + +func Test_Tcp_Invalid(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Broken(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + err := w.Wait() + assert.Error(t, err) + assert.Contains(t, err.Error(), "undefined_function()") + }() + + defer func() { + time.Sleep(time.Second) + err2 := w.Stop() + // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection + assert.Error(t, err2) + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + wg.Wait() +} + +func Test_Tcp_Echo(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Unix_Start(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Unix_Failboot(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + ctx := context.Background() + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/failboot.php") + + w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failboot") +} + +func Test_Unix_Timeout(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + ctx := context.Background() + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0") + + w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "context deadline exceeded") +} + +func Test_Unix_Invalid(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Unix_Broken(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + err := w.Wait() + assert.Error(t, err) + assert.Contains(t, err.Error(), "undefined_function()") + }() + + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + wg.Wait() +} + +func Test_Unix_Echo(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + f := NewSocketServer(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + go func() { + assert.NoError(b, w.Wait()) + }() + + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("tcp", "localhost:9007") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + f := NewSocketServer(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err := ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go new file mode 100644 index 00000000..9d74ae10 --- /dev/null +++ b/pkg/worker/interface.go @@ -0,0 +1,56 @@ +package worker + +import ( + "context" + "fmt" + "time" + + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" +) + +type BaseProcess interface { + fmt.Stringer + + // Pid returns worker pid. + Pid() int64 + + // Created returns time worker was created at. + Created() time.Time + + // State return receive-only WorkerProcess state object, state can be used to safely access + // WorkerProcess status, time when status changed and number of WorkerProcess executions. + State() internal.State + + // Start used to run Cmd and immediately return + Start() error + + // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is + // complete and will return process error (if any), if stderr is presented it's value + // will be wrapped as WorkerError. Method will return error code if php process fails + // to find or Start the script. + Wait() error + + // Stop sends soft termination command to the WorkerProcess and waits for process completion. + Stop() error + + // Kill kills underlying process, make sure to call Wait() func to gather + // error log from the stderr. Does not waits for process completion! + Kill() error + + // Relay returns attached to worker goridge relay + Relay() relay.Relay + + // AttachRelay used to attach goridge relay to the worker process + AttachRelay(rl relay.Relay) +} + +type SyncWorker interface { + // BaseProcess provides basic functionality for the SyncWorker + BaseProcess + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS + Exec(rqs payload.Payload) (payload.Payload, error) + // ExecWithContext used to handle Exec with TTL + ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) +} diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go new file mode 100755 index 00000000..1a0393fb --- /dev/null +++ b/pkg/worker/sync_worker.go @@ -0,0 +1,244 @@ +package worker + +import ( + "bytes" + "context" + "time" + + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/frame" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" + "go.uber.org/multierr" +) + +// Allocator is responsible for worker allocation in the pool +type Allocator func() (*SyncWorkerImpl, error) + +type SyncWorkerImpl struct { + process *Process +} + +// From creates SyncWorker from BaseProcess +func From(process *Process) *SyncWorkerImpl { + return &SyncWorkerImpl{ + process: process, + } +} + +// FromSync creates BaseProcess from SyncWorkerImpl +func FromSync(w *SyncWorkerImpl) BaseProcess { + return &Process{ + created: w.process.created, + events: w.process.events, + state: w.process.state, + cmd: w.process.cmd, + pid: w.process.pid, + stderr: w.process.stderr, + endState: w.process.endState, + relay: w.process.relay, + rd: w.process.rd, + } +} + +// Exec payload without TTL timeout. +func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { + const op = errors.Op("sync_worker_exec") + if len(p.Body) == 0 && len(p.Context) == 0 { + return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) + } + + if tw.process.State().Value() != internal.StateReady { + return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) + } + + // set last used time + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(internal.StateWorking) + + rsp, err := tw.execPayload(p) + if err != nil { + // just to be more verbose + if errors.Is(errors.SoftJob, err) == false { + tw.process.State().Set(internal.StateErrored) + tw.process.State().RegisterExec() + } + return payload.Payload{}, err + } + + tw.process.State().Set(internal.StateReady) + tw.process.State().RegisterExec() + + return rsp, nil +} + +type wexec struct { + payload payload.Payload + err error +} + +// Exec payload without TTL timeout. +func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { + const op = errors.Op("sync_worker_exec_worker_with_timeout") + c := make(chan wexec, 1) + + go func() { + if len(p.Body) == 0 && len(p.Context) == 0 { + c <- wexec{ + payload: payload.Payload{}, + err: errors.E(op, errors.Str("payload can not be empty")), + } + return + } + + if tw.process.State().Value() != internal.StateReady { + c <- wexec{ + payload: payload.Payload{}, + err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), + } + return + } + + // set last used time + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(internal.StateWorking) + + rsp, err := tw.execPayload(p) + if err != nil { + // just to be more verbose + if errors.Is(errors.SoftJob, err) == false { + tw.process.State().Set(internal.StateErrored) + tw.process.State().RegisterExec() + } + c <- wexec{ + payload: payload.Payload{}, + err: errors.E(op, err), + } + return + } + + tw.process.State().Set(internal.StateReady) + tw.process.State().RegisterExec() + + c <- wexec{ + payload: rsp, + err: nil, + } + }() + + select { + // exec TTL reached + case <-ctx.Done(): + err := multierr.Combine(tw.Kill()) + if err != nil { + // append timeout error + err = multierr.Append(err, errors.E(op, errors.ExecTTL)) + return payload.Payload{}, multierr.Append(err, ctx.Err()) + } + return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err()) + case res := <-c: + if res.err != nil { + return payload.Payload{}, res.err + } + return res.payload, nil + } +} + +func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) { + const op = errors.Op("sync_worker_exec_payload") + + fr := frame.NewFrame() + fr.WriteVersion(frame.VERSION_1) + // can be 0 here + + buf := new(bytes.Buffer) + buf.Write(p.Context) + buf.Write(p.Body) + + // Context offset + fr.WriteOptions(uint32(len(p.Context))) + fr.WritePayloadLen(uint32(buf.Len())) + fr.WritePayload(buf.Bytes()) + + fr.WriteCRC() + + // empty and free the buffer + buf.Truncate(0) + + err := tw.Relay().Send(fr) + if err != nil { + return payload.Payload{}, err + } + + frameR := frame.NewFrame() + + err = tw.process.Relay().Receive(frameR) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } + if frameR == nil { + return payload.Payload{}, errors.E(op, errors.Str("nil fr received")) + } + + if !frameR.VerifyCRC() { + return payload.Payload{}, errors.E(op, errors.Str("failed to verify CRC")) + } + + flags := frameR.ReadFlags() + + if flags&byte(frame.ERROR) != byte(0) { + return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) + } + + options := frameR.ReadOptions() + if len(options) != 1 { + return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) + } + + pl := payload.Payload{} + pl.Context = frameR.Payload()[:options[0]] + pl.Body = frameR.Payload()[options[0]:] + + return pl, nil +} + +func (tw *SyncWorkerImpl) String() string { + return tw.process.String() +} + +func (tw *SyncWorkerImpl) Pid() int64 { + return tw.process.Pid() +} + +func (tw *SyncWorkerImpl) Created() time.Time { + return tw.process.Created() +} + +func (tw *SyncWorkerImpl) State() internal.State { + return tw.process.State() +} + +func (tw *SyncWorkerImpl) Start() error { + return tw.process.Start() +} + +func (tw *SyncWorkerImpl) Wait() error { + return tw.process.Wait() +} + +func (tw *SyncWorkerImpl) Stop() error { + return tw.process.Stop() +} + +func (tw *SyncWorkerImpl) Kill() error { + return tw.process.Kill() +} + +func (tw *SyncWorkerImpl) Relay() relay.Relay { + return tw.process.Relay() +} + +func (tw *SyncWorkerImpl) AttachRelay(rl relay.Relay) { + tw.process.AttachRelay(rl) +} diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go new file mode 100755 index 00000000..df556e93 --- /dev/null +++ b/pkg/worker/sync_worker_test.go @@ -0,0 +1,34 @@ +package worker + +import ( + "os/exec" + "testing" + + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/stretchr/testify/assert" +) + +func Test_NotStarted_String(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := InitBaseWorker(cmd) + assert.Contains(t, w.String(), "php tests/client.php echo pipes") + assert.Contains(t, w.String(), "inactive") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_NotStarted_Exec(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := InitBaseWorker(cmd) + + sw := From(w) + + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Contains(t, err.Error(), "Process is not ready (inactive)") +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go new file mode 100755 index 00000000..8fd71cca --- /dev/null +++ b/pkg/worker/worker.go @@ -0,0 +1,318 @@ +package worker + +import ( + "bytes" + "fmt" + "io" + "os" + "os/exec" + "strconv" + "strings" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "go.uber.org/multierr" +) + +const ( + // WaitDuration - for how long error buffer should attempt to aggregate error messages + // before merging output together since lastError update (required to keep error update together). + WaitDuration = 25 * time.Millisecond + + // ReadBufSize used to make a slice with specified length to read from stderr + ReadBufSize = 10240 // Kb +) + +type Options func(p *Process) + +// Process - supervised process with api over goridge.Relay. +type Process struct { + // created indicates at what time Process has been created. + created time.Time + + // updates parent supervisor or pool about Process events + events events.Handler + + // state holds information about current Process state, + // number of Process executions, buf status change time. + // publicly this object is receive-only and protected using Mutex + // and atomic counter. + state *internal.WorkerState + + // underlying command with associated process, command must be + // provided to Process from outside in non-started form. CmdSource + // stdErr direction will be handled by Process to aggregate error message. + cmd *exec.Cmd + + // pid of the process, points to pid of underlying process and + // can be nil while process is not started. + pid int + + // stderr aggregates stderr output from underlying process. Value can be + // receive only once command is completed and all pipes are closed. + stderr *bytes.Buffer + + // channel is being closed once command is complete. + // waitDone chan interface{} + + // contains information about resulted process state. + endState *os.ProcessState + + // ensures than only one execution can be run at once. + mu sync.RWMutex + + // communication bus with underlying process. + relay relay.Relay + // rd in a second part of pipe to read from stderr + rd io.Reader + // stop signal terminates io.Pipe from reading from stderr + stop chan struct{} + + syncPool sync.Pool +} + +// InitBaseWorker creates new Process over given exec.cmd. +func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { + const op = errors.Op("init_base_worker") + if cmd.Process != nil { + return nil, fmt.Errorf("can't attach to running process") + } + w := &Process{ + created: time.Now(), + events: events.NewEventsHandler(), + cmd: cmd, + state: internal.NewWorkerState(internal.StateInactive), + stderr: new(bytes.Buffer), + stop: make(chan struct{}, 1), + // sync pool for STDERR + // All receivers are pointers + syncPool: sync.Pool{ + New: func() interface{} { + buf := make([]byte, ReadBufSize) + return &buf + }, + }, + } + + w.rd, w.cmd.Stderr = io.Pipe() + + // small buffer optimization + // at this point we know, that stderr will contain huge messages + w.stderr.Grow(ReadBufSize) + + // add options + for i := 0; i < len(options); i++ { + options[i](w) + } + + go func() { + w.watch() + }() + + return w, nil +} + +func AddListeners(listeners ...events.Listener) Options { + return func(p *Process) { + for i := 0; i < len(listeners); i++ { + p.addListener(listeners[i]) + } + } +} + +// Pid returns worker pid. +func (w *Process) Pid() int64 { + return int64(w.pid) +} + +// Created returns time worker was created at. +func (w *Process) Created() time.Time { + return w.created +} + +// AddListener registers new worker event listener. +func (w *Process) addListener(listener events.Listener) { + w.events.AddListener(listener) +} + +// State return receive-only Process state object, state can be used to safely access +// Process status, time when status changed and number of Process executions. +func (w *Process) State() internal.State { + return w.state +} + +// State return receive-only Process state object, state can be used to safely access +// Process status, time when status changed and number of Process executions. +func (w *Process) AttachRelay(rl relay.Relay) { + w.relay = rl +} + +// State return receive-only Process state object, state can be used to safely access +// Process status, time when status changed and number of Process executions. +func (w *Process) Relay() relay.Relay { + return w.relay +} + +// String returns Process description. fmt.Stringer interface +func (w *Process) String() string { + st := w.state.String() + // we can safely compare pid to 0 + if w.pid != 0 { + st = st + ", pid:" + strconv.Itoa(w.pid) + } + + return fmt.Sprintf( + "(`%s` [%s], numExecs: %v)", + strings.Join(w.cmd.Args, " "), + st, + w.state.NumExecs(), + ) +} + +func (w *Process) Start() error { + err := w.cmd.Start() + if err != nil { + return err + } + w.pid = w.cmd.Process.Pid + return nil +} + +// Wait must be called once for each Process, call will be released once Process is +// complete and will return process error (if any), if stderr is presented it's value +// will be wrapped as WorkerError. Method will return error code if php process fails +// to find or Start the script. +func (w *Process) Wait() error { + const op = errors.Op("process_wait") + err := multierr.Combine(w.cmd.Wait()) + + if w.State().Value() == internal.StateDestroyed { + return errors.E(op, err) + } + + // at this point according to the documentation (see cmd.Wait comment) + // if worker finishes with an error, message will be written to the stderr first + // and then process.cmd.Wait return an error + w.endState = w.cmd.ProcessState + if err != nil { + w.state.Set(internal.StateErrored) + + w.mu.RLock() + // if process return code > 0, here will be an error from stderr (if presents) + if w.stderr.Len() > 0 { + err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) + // stop the stderr buffer + w.stop <- struct{}{} + } + w.mu.RUnlock() + + return multierr.Append(err, w.closeRelay()) + } + + err = multierr.Append(err, w.closeRelay()) + if err != nil { + w.state.Set(internal.StateErrored) + return err + } + + if w.endState.Success() { + w.state.Set(internal.StateStopped) + } + + w.stderr.Reset() + + return nil +} + +func (w *Process) closeRelay() error { + if w.relay != nil { + err := w.relay.Close() + if err != nil { + return err + } + } + return nil +} + +// Stop sends soft termination command to the Process and waits for process completion. +func (w *Process) Stop() error { + var err error + w.state.Set(internal.StateStopping) + err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) + if err != nil { + w.state.Set(internal.StateKilling) + return multierr.Append(err, w.cmd.Process.Kill()) + } + w.state.Set(internal.StateStopped) + return nil +} + +// Kill kills underlying process, make sure to call Wait() func to gather +// error log from the stderr. Does not waits for process completion! +func (w *Process) Kill() error { + if w.State().Value() == internal.StateDestroyed { + err := w.cmd.Process.Signal(os.Kill) + if err != nil { + return err + } + return nil + } + + w.state.Set(internal.StateKilling) + err := w.cmd.Process.Signal(os.Kill) + if err != nil { + return err + } + w.state.Set(internal.StateStopped) + return nil +} + +// put the pointer, to not allocate new slice +// but erase it len and then return back +func (w *Process) put(data *[]byte) { + w.syncPool.Put(data) +} + +// get pointer to the byte slice +func (w *Process) get() *[]byte { + return w.syncPool.Get().(*[]byte) +} + +// Write appends the contents of pool to the errBuffer, growing the errBuffer as +// needed. The return value n is the length of pool; errBuffer is always nil. +func (w *Process) watch() { + go func() { + for { + select { + case <-w.stop: + buf := w.get() + // read the last data + n, _ := w.rd.Read(*buf) + w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.mu.Lock() + // write new message + // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool + w.stderr.Write((*buf)[:n]) + w.mu.Unlock() + w.put(buf) + return + default: + // read the max 10kb of stderr per one read + buf := w.get() + n, _ := w.rd.Read(*buf) + w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.mu.Lock() + // delete all prev messages + w.stderr.Reset() + // write new message + w.stderr.Write((*buf)[:n]) + w.mu.Unlock() + w.put(buf) + } + } + }() +} diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go new file mode 100755 index 00000000..805f66b5 --- /dev/null +++ b/pkg/worker/worker_test.go @@ -0,0 +1,19 @@ +package worker + +import ( + "os/exec" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_OnStarted(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "broken", "pipes") + assert.Nil(t, cmd.Start()) + + w, err := InitBaseWorker(cmd) + assert.Nil(t, w) + assert.NotNil(t, err) + + assert.Equal(t, "can't attach to running process", err.Error()) +} diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go new file mode 100644 index 00000000..927aa270 --- /dev/null +++ b/pkg/worker_watcher/interface.go @@ -0,0 +1,30 @@ +package worker_watcher //nolint:golint,stylecheck + +import ( + "context" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Watcher interface { + // AddToWatch used to add stack to wait its state + AddToWatch(workers []worker.SyncWorker) error + + // GetFreeWorker provide first free worker + GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) + + // PutWorker enqueues worker back + PushWorker(w worker.SyncWorker) + + // AllocateNew used to allocate new worker and put in into the WorkerWatcher + AllocateNew() error + + // Destroy destroys the underlying stack + Destroy(ctx context.Context) + + // WorkersList return all stack w/o removing it from internal storage + WorkersList() []worker.SyncWorker + + // RemoveWorker remove worker from the stack + RemoveWorker(wb worker.SyncWorker) error +} diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go new file mode 100644 index 00000000..d76f4d8f --- /dev/null +++ b/pkg/worker_watcher/stack.go @@ -0,0 +1,142 @@ +package worker_watcher //nolint:golint,stylecheck +import ( + "context" + "runtime" + "sync" + "time" + + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Stack struct { + workers []*worker.SyncWorkerImpl + mutex sync.RWMutex + destroy bool + actualNumOfWorkers uint64 + initialNumOfWorkers uint64 +} + +func NewWorkersStack(initialNumOfWorkers uint64) *Stack { + w := runtime.NumCPU() + return &Stack{ + workers: make([]*worker.SyncWorkerImpl, 0, w), + actualNumOfWorkers: 0, + initialNumOfWorkers: initialNumOfWorkers, + } +} + +func (stack *Stack) Reset() { + stack.mutex.Lock() + defer stack.mutex.Unlock() + stack.actualNumOfWorkers = 0 + stack.workers = nil +} + +// Push worker back to the stack +// If stack in destroy state, Push will provide 100ms window to unlock the mutex +func (stack *Stack) Push(w worker.BaseProcess) { + stack.mutex.Lock() + defer stack.mutex.Unlock() + stack.actualNumOfWorkers++ + stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl)) +} + +func (stack *Stack) IsEmpty() bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + return len(stack.workers) == 0 +} + +func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { + stack.mutex.Lock() + defer stack.mutex.Unlock() + + // do not release new stack + if stack.destroy { + return nil, true + } + + if len(stack.workers) == 0 { + return nil, false + } + + // move worker + w := stack.workers[len(stack.workers)-1] + stack.workers = stack.workers[:len(stack.workers)-1] + stack.actualNumOfWorkers-- + return w, false +} + +func (stack *Stack) FindAndRemoveByPid(pid int64) bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + for i := 0; i < len(stack.workers); i++ { + // worker in the stack, reallocating + if stack.workers[i].Pid() == pid { + stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) + stack.actualNumOfWorkers-- + // worker found and removed + return true + } + } + // no worker with such ID + return false +} + +// Workers return copy of the workers in the stack +func (stack *Stack) Workers() []worker.SyncWorker { + stack.mutex.Lock() + defer stack.mutex.Unlock() + workersCopy := make([]worker.SyncWorker, 0, 1) + // copy + for _, v := range stack.workers { + if v != nil { + workersCopy = append(workersCopy, v) + } + } + + return workersCopy +} + +func (stack *Stack) isDestroying() bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + return stack.destroy +} + +// we also have to give a chance to pool to Push worker (return it) +func (stack *Stack) Destroy(ctx context.Context) { + stack.mutex.Lock() + stack.destroy = true + stack.mutex.Unlock() + + tt := time.NewTicker(time.Millisecond * 500) + defer tt.Stop() + for { + select { + case <-tt.C: + stack.mutex.Lock() + // that might be one of the workers is working + if stack.initialNumOfWorkers != stack.actualNumOfWorkers { + stack.mutex.Unlock() + continue + } + stack.mutex.Unlock() + // unnecessary mutex, but + // just to make sure. All stack at this moment are in the stack + // Pop operation is blocked, push can't be done, since it's not possible to pop + stack.mutex.Lock() + for i := 0; i < len(stack.workers); i++ { + // set state for the stack in the stack (unused at the moment) + stack.workers[i].State().Set(internal.StateDestroyed) + // kill the worker + _ = stack.workers[i].Kill() + } + stack.mutex.Unlock() + // clear + stack.Reset() + return + } + } +} diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go new file mode 100644 index 00000000..5287a6dc --- /dev/null +++ b/pkg/worker_watcher/stack_test.go @@ -0,0 +1,142 @@ +package worker_watcher //nolint:golint,stylecheck +import ( + "context" + "os/exec" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/stretchr/testify/assert" +) + +func TestNewWorkersStack(t *testing.T) { + stack := NewWorkersStack(0) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) + assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers) +} + +func TestStack_Push(t *testing.T) { + stack := NewWorkersStack(1) + + w, err := worker.InitBaseWorker(&exec.Cmd{}) + assert.NoError(t, err) + + sw := worker.From(w) + + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) +} + +func TestStack_Pop(t *testing.T) { + stack := NewWorkersStack(1) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + + sw := worker.From(w) + + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) + + _, _ = stack.Pop() + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) +} + +func TestStack_FindAndRemoveByPid(t *testing.T) { + stack := NewWorkersStack(1) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + + assert.NoError(t, w.Start()) + + sw := worker.From(w) + + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) + + stack.FindAndRemoveByPid(w.Pid()) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) +} + +func TestStack_IsEmpty(t *testing.T) { + stack := NewWorkersStack(1) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + + sw := worker.From(w) + stack.Push(sw) + + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) + + assert.Equal(t, false, stack.IsEmpty()) +} + +func TestStack_Workers(t *testing.T) { + stack := NewWorkersStack(1) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + assert.NoError(t, w.Start()) + + sw := worker.From(w) + stack.Push(sw) + + wrks := stack.Workers() + assert.Equal(t, 1, len(wrks)) + assert.Equal(t, w.Pid(), wrks[0].Pid()) +} + +func TestStack_Reset(t *testing.T) { + stack := NewWorkersStack(1) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + assert.NoError(t, w.Start()) + + sw := worker.From(w) + stack.Push(sw) + + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) + stack.Reset() + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) +} + +func TestStack_Destroy(t *testing.T) { + stack := NewWorkersStack(1) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + assert.NoError(t, w.Start()) + + sw := worker.From(w) + stack.Push(sw) + + stack.Destroy(context.Background()) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) +} + +func TestStack_DestroyWithWait(t *testing.T) { + stack := NewWorkersStack(2) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + assert.NoError(t, w.Start()) + + sw := worker.From(w) + stack.Push(sw) + stack.Push(sw) + assert.Equal(t, uint64(2), stack.actualNumOfWorkers) + + go func() { + wrk, _ := stack.Pop() + time.Sleep(time.Second * 3) + stack.Push(wrk) + }() + time.Sleep(time.Second) + stack.Destroy(context.Background()) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) +} diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go new file mode 100755 index 00000000..753b61ee --- /dev/null +++ b/pkg/worker_watcher/worker_watcher.go @@ -0,0 +1,165 @@ +package worker_watcher //nolint:golint,stylecheck + +import ( + "context" + "sync" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +// workerCreateFunc can be nil, but in that case, dead stack will not be replaced +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher { + ww := &workerWatcher{ + stack: NewWorkersStack(numWorkers), + allocator: allocator, + events: events, + } + + return ww +} + +type workerWatcher struct { + mutex sync.RWMutex + stack *Stack + allocator worker.Allocator + events events.Handler +} + +func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { + for i := 0; i < len(workers); i++ { + ww.stack.Push(workers[i]) + + go func(swc worker.SyncWorker) { + ww.wait(swc) + }(workers[i]) + } + return nil +} + +func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) { + const op = errors.Op("worker_watcher_get_free_worker") + // thread safe operation + w, stop := ww.stack.Pop() + if stop { + return nil, errors.E(op, errors.WatcherStopped) + } + + // handle worker remove state + // in this state worker is destroyed by supervisor + if w != nil && w.State().Value() == internal.StateRemove { + err := ww.RemoveWorker(w) + if err != nil { + return nil, err + } + // try to get next + return ww.GetFreeWorker(ctx) + } + // no free stack + if w == nil { + for { + select { + default: + w, stop = ww.stack.Pop() + if stop { + return nil, errors.E(op, errors.WatcherStopped) + } + if w == nil { + continue + } + return w, nil + case <-ctx.Done(): + return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) + } + } + } + + return w, nil +} + +func (ww *workerWatcher) AllocateNew() error { + ww.stack.mutex.Lock() + const op = errors.Op("worker_watcher_allocate_new") + sw, err := ww.allocator() + if err != nil { + return errors.E(op, errors.WorkerAllocate, err) + } + + ww.addToWatch(sw) + ww.stack.mutex.Unlock() + ww.PushWorker(sw) + + return nil +} + +func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { + ww.mutex.Lock() + defer ww.mutex.Unlock() + + const op = errors.Op("worker_watcher_remove_worker") + pid := wb.Pid() + + if ww.stack.FindAndRemoveByPid(pid) { + wb.State().Set(internal.StateRemove) + err := wb.Kill() + if err != nil { + return errors.E(op, err) + } + return nil + } + + return nil +} + +// O(1) operation +func (ww *workerWatcher) PushWorker(w worker.SyncWorker) { + ww.mutex.Lock() + defer ww.mutex.Unlock() + ww.stack.Push(w) +} + +// Destroy all underlying stack (but let them to complete the task) +func (ww *workerWatcher) Destroy(ctx context.Context) { + // destroy stack, we don't use ww mutex here, since we should be able to push worker + ww.stack.Destroy(ctx) +} + +// Warning, this is O(n) operation, and it will return copy of the actual workers +func (ww *workerWatcher) WorkersList() []worker.SyncWorker { + return ww.stack.Workers() +} + +func (ww *workerWatcher) wait(w worker.BaseProcess) { + const op = errors.Op("worker_watcher_wait") + err := w.Wait() + if err != nil { + ww.events.Push(events.WorkerEvent{ + Event: events.EventWorkerError, + Worker: w, + Payload: errors.E(op, err), + }) + } + + if w.State().Value() == internal.StateDestroyed { + // worker was manually destroyed, no need to replace + ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + return + } + + _ = ww.stack.FindAndRemoveByPid(w.Pid()) + err = ww.AllocateNew() + if err != nil { + ww.events.Push(events.PoolEvent{ + Event: events.EventPoolError, + Payload: errors.E(op, err), + }) + } +} + +func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) { + go func() { + ww.wait(wb) + }() +} |