diff options
author | Wolfy-J <[email protected]> | 2018-06-03 12:54:43 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-03 12:54:43 +0300 |
commit | 36ea77baa5a41de10bd604cd0e5b5b3cafaaeb64 (patch) | |
tree | 13ca8abd454a6668f490eec2e44b1520bd3953fe | |
parent | b02611b7266589d888e054a1d2e4432ae370617d (diff) |
service bus, http service, rpc bus, cli commands, new configs
-rw-r--r-- | LICENSE | 2 | ||||
-rw-r--r-- | cmd/rr-php/LICENSE | 202 | ||||
-rw-r--r-- | cmd/rr-php/cmd/root.go | 82 | ||||
-rw-r--r-- | cmd/rr-php/cmd/serve.go | 85 | ||||
-rw-r--r-- | cmd/rr-php/main.go | 23 | ||||
-rw-r--r-- | cmd/rr/LICENSE | 21 | ||||
-rw-r--r-- | cmd/rr/cmd/root.go | 99 | ||||
-rw-r--r-- | cmd/rr/cmd/serve.go | 48 | ||||
-rw-r--r-- | cmd/rr/http/reload.go | 50 | ||||
-rw-r--r-- | cmd/rr/http/workers.go | 64 | ||||
-rw-r--r-- | cmd/rr/main.go | 44 | ||||
-rw-r--r-- | cmd/rr/utils/config.go | 23 | ||||
-rw-r--r-- | cmd/rr/utils/cprint.go | 32 | ||||
-rw-r--r-- | cmd/rr/utils/verbose.go | 18 | ||||
-rw-r--r-- | config.go | 2 | ||||
-rw-r--r-- | factory.go | 2 | ||||
-rw-r--r-- | http/config.go | 85 | ||||
-rw-r--r-- | http/data.go | 2 | ||||
-rw-r--r-- | http/request.go | 6 | ||||
-rw-r--r-- | http/rpc.go | 57 | ||||
-rw-r--r-- | http/server.go | 40 | ||||
-rw-r--r-- | http/service.go | 67 | ||||
-rw-r--r-- | http/uploads.go | 73 | ||||
-rw-r--r-- | pool.go | 5 | ||||
-rw-r--r-- | server.go | 10 | ||||
-rw-r--r-- | service/bus.go | 165 | ||||
-rw-r--r-- | service/config.go | 6 | ||||
-rw-r--r-- | service/factory.go | 69 | ||||
-rw-r--r-- | service/rpc.go | 32 | ||||
-rw-r--r-- | service/service.go | 9 | ||||
-rw-r--r-- | static_pool.go | 29 | ||||
-rw-r--r-- | utils/size.go | 28 | ||||
-rw-r--r-- | worker.go | 14 |
33 files changed, 1042 insertions, 452 deletions
@@ -1,6 +1,6 @@ MIT License -Copyright (c) 2017 SpiralScout +Copyright (c) 2018 SpiralScout Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/cmd/rr-php/LICENSE b/cmd/rr-php/LICENSE deleted file mode 100644 index d6456956..00000000 --- a/cmd/rr-php/LICENSE +++ /dev/null @@ -1,202 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/cmd/rr-php/cmd/root.go b/cmd/rr-php/cmd/root.go deleted file mode 100644 index dfce4399..00000000 --- a/cmd/rr-php/cmd/root.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright © 2018 NAME HERE <EMAIL ADDRESS> -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd - -import ( - "fmt" - "os" - "github.com/mitchellh/go-homedir" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -var cfgFile string - -// rootCmd represents the base command when called without any subcommands -var rootCmd = &cobra.Command{ - Use: "rr", - Short: "A brief description of your application", - // Uncomment the following line if your bare application - // has an action associated with it: - // Run: func(cmd *cobra.Command, args []string) { }, -} - -// Execute adds all child commands to the root command and sets flags appropriately. -// This is called by main.main(). It only needs to happen once to the rootCmd. -func Execute() { - if err := rootCmd.Execute(); err != nil { - fmt.Println(err) - os.Exit(1) - } -} - -func init() { - cobra.OnInitialize(initConfig) - - // Here you will define your flags and configuration settings. - // Cobra supports persistent flags, which, if defined here, - // will be global for your application. - rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.rr.yaml)") - - // Cobra also supports local flags, which will only run - // when this action is called directly. - rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} - -// initConfig reads in config file and ENV variables if set. -func initConfig() { - if cfgFile != "" { - // Use config file from the flag. - viper.SetConfigFile(cfgFile) - } else { - // Find home directory. - home, err := homedir.Dir() - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - // Search config in home directory with name ".rr" (without extension). - viper.AddConfigPath(home) - viper.SetConfigName(".rr") - } - - viper.AutomaticEnv() // read in environment variables that match - - // If a config file is found, read it in. - if err := viper.ReadInConfig(); err == nil { - fmt.Println("Using config file:", viper.ConfigFileUsed()) - } -} diff --git a/cmd/rr-php/cmd/serve.go b/cmd/rr-php/cmd/serve.go deleted file mode 100644 index 24ec4043..00000000 --- a/cmd/rr-php/cmd/serve.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright © 2018 NAME HERE <EMAIL ADDRESS> -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd - -import ( - "github.com/spf13/cobra" - "github.com/spiral/roadrunner" - "os/exec" - "time" - "github.com/sirupsen/logrus" - rrttp "github.com/spiral/roadrunner/http" - "net/http" - "os" -) - -func init() { - rootCmd.AddCommand(&cobra.Command{ - Use: "serve", - Run: serveHandler, - }) -} - -func serveHandler(cmd *cobra.Command, args []string) { - rr := roadrunner.NewRouter( - func() *exec.Cmd { - return exec.Command("php", "/Users/wolfy-j/Projects/phpapp/webroot/index.php", "rr", "pipes") - }, - roadrunner.NewPipeFactory(), - ) - - err := rr.Configure(roadrunner.Config{ - NumWorkers: 4, - AllocateTimeout: time.Minute, - DestroyTimeout: time.Minute, - //MaxExecutions: 10, - }) - - rr.Observe(func(event int, ctx interface{}) { - logrus.Info(ctx) - }) - - if err != nil { - panic(err) - } - - logrus.Info("serving") - - //Enable http2 - //srv := http.Server{ - // Addr: ":8080", - // Handler: rrhttp.NewServer( - // rrhttp.Config{ - // serveStatic: true, - // Root: "/Users/wolfy-j/Projects/phpapp/webroot", - // }, - // rr, - // ), - //} - - // srv.ListenAndServe() - - //http2.ConfigureServer(&srv, nil) - //srv.ListenAndServeTLS("localhost.cert", "localhost.key") - - http.ListenAndServe(":8080", rrttp.NewServer( - rrttp.Config{ - ServeStatic: true, - Root: "/Users/wolfy-j/Projects/phpapp/webroot", - UploadsDir: os.TempDir(), - }, - rr, - )) -} diff --git a/cmd/rr-php/main.go b/cmd/rr-php/main.go deleted file mode 100644 index e78ce8b2..00000000 --- a/cmd/rr-php/main.go +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright © 2018 NAME HERE <EMAIL ADDRESS> -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "github.com/spiral/roadrunner/cmd/rr-php/cmd" -) - -func main() { - cmd.Execute() -} diff --git a/cmd/rr/LICENSE b/cmd/rr/LICENSE new file mode 100644 index 00000000..efb98c87 --- /dev/null +++ b/cmd/rr/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 SpiralScout + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.
\ No newline at end of file diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go new file mode 100644 index 00000000..ac8466ef --- /dev/null +++ b/cmd/rr/cmd/root.go @@ -0,0 +1,99 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package cmd + +import ( + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/cmd/rr/utils" + "os" + "fmt" +) + +// Service bus for all the commands. +var ( + // Shared service bus. + Bus *service.Bus + + // Root is application endpoint. + Root = &cobra.Command{ + Use: "rr", + Short: "RoadRunner, PHP application server", + } + + cfgFile string + verbose bool +) + +// Execute adds all child commands to the Root command and sets flags appropriately. +// This is called by main.main(). It only needs to happen once to the Root. +func Execute(serviceBus *service.Bus) { + Bus = serviceBus + if err := Root.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +func init() { + Root.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "verbose output") + Root.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is .rr.yaml)") + + cobra.OnInitialize(func() { + if verbose { + logrus.SetLevel(logrus.DebugLevel) + } + + if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil { + if err := Bus.Configure(cfg); err != nil { + panic(err) + } + } + }) +} + +func initConfig(cfgFile string, path []string, name string) service.Config { + cfg := viper.New() + + if cfgFile != "" { + // Use cfg file from the flag. + cfg.SetConfigFile(cfgFile) + } else { + // automatic location + for _, p := range path { + cfg.AddConfigPath(p) + } + cfg.SetConfigName(name) + } + + // read in environment variables that match + cfg.AutomaticEnv() + + // If a cfg file is found, read it in. + if err := cfg.ReadInConfig(); err != nil { + logrus.Warnf("config: %s", err) + return nil + } + + return &utils.ConfigWrapper{cfg} +} diff --git a/cmd/rr/cmd/serve.go b/cmd/rr/cmd/serve.go new file mode 100644 index 00000000..f5524556 --- /dev/null +++ b/cmd/rr/cmd/serve.go @@ -0,0 +1,48 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package cmd + +import ( + "github.com/spf13/cobra" + "os" + "os/signal" + "syscall" +) + +var ( + stopSignal = make(chan os.Signal, 1) +) + +func init() { + Root.AddCommand(&cobra.Command{ + Use: "serve", + Short: "Serve RoadRunner service(s)", + Run: serveHandler, + }) + + signal.Notify(stopSignal, syscall.SIGTERM) +} + +func serveHandler(cmd *cobra.Command, args []string) { + Bus.Serve() + <-stopSignal + Bus.Stop() +} diff --git a/cmd/rr/http/reload.go b/cmd/rr/http/reload.go new file mode 100644 index 00000000..6caf0a71 --- /dev/null +++ b/cmd/rr/http/reload.go @@ -0,0 +1,50 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package http + +import ( + "github.com/spf13/cobra" + "fmt" + rr "github.com/spiral/roadrunner/cmd/rr/cmd" +) + +func init() { + rr.Root.AddCommand(&cobra.Command{ + Use: "http:reload", + Short: "Reload RoadRunner worker pools for the HTTP service", + Run: reloadHandler, + }) +} + +func reloadHandler(cmd *cobra.Command, args []string) { + client, err := rr.Bus.RCPClient() + if err != nil { + panic(err) // todo: change + } + defer client.Close() + + var r string + if err := client.Call("http.Reset", true, &r); err != nil { + panic(err) + } + + fmt.Println(r) +} diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go new file mode 100644 index 00000000..c1225987 --- /dev/null +++ b/cmd/rr/http/workers.go @@ -0,0 +1,64 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package http + +import ( + "github.com/spf13/cobra" + "github.com/spiral/roadrunner/http" + "github.com/olekukonko/tablewriter" + "os" + "strconv" + rr "github.com/spiral/roadrunner/cmd/rr/cmd" +) + +func init() { + rr.Root.AddCommand(&cobra.Command{ + Use: "http:workers", + Short: "List workers associated with RoadRunner HTTP service", + Run: workersHandler, + }) +} + +func workersHandler(cmd *cobra.Command, args []string) { + client, err := rr.Bus.RCPClient() + if err != nil { + panic(err) // todo: change + } + defer client.Close() + + var r http.WorkerList + if err := client.Call("http.Workers", true, &r); err != nil { + panic(err) + } + + tw := tablewriter.NewWriter(os.Stdout) + tw.SetHeader([]string{"PID", "Status", "Num Execs"}) + + for _, w := range r.Workers { + tw.Append([]string{ + strconv.Itoa(w.Pid), + w.Status, + strconv.Itoa(int(w.NumExecs)), + }) + } + + tw.Render() +} diff --git a/cmd/rr/main.go b/cmd/rr/main.go new file mode 100644 index 00000000..336aeddd --- /dev/null +++ b/cmd/rr/main.go @@ -0,0 +1,44 @@ +// MIT License +// +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package main + +import ( + "github.com/spiral/roadrunner/cmd/rr/cmd" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/http" + _ "github.com/spiral/roadrunner/cmd/rr/http" +) + +var bus *service.Bus + +func init() { + bus = service.NewBus() +} + +func main() { + // http server with PSR7 support + bus.Register(&http.Service{}) + + // you can register additional commands using cmd.Root + cmd.Execute(bus) +} diff --git a/cmd/rr/utils/config.go b/cmd/rr/utils/config.go new file mode 100644 index 00000000..e7e22b3a --- /dev/null +++ b/cmd/rr/utils/config.go @@ -0,0 +1,23 @@ +package utils + +import ( + "github.com/spf13/viper" + "github.com/spiral/roadrunner/service" +) + +type ConfigWrapper struct { + Viper *viper.Viper +} + +func (w *ConfigWrapper) Get(key string) service.Config { + sub := w.Viper.Sub(key) + if sub == nil { + return nil + } + + return &ConfigWrapper{sub} +} + +func (w *ConfigWrapper) Unmarshal(out interface{}) error { + return w.Viper.Unmarshal(out) +} diff --git a/cmd/rr/utils/cprint.go b/cmd/rr/utils/cprint.go new file mode 100644 index 00000000..f6f828f8 --- /dev/null +++ b/cmd/rr/utils/cprint.go @@ -0,0 +1,32 @@ +package utils + +import ( + "fmt" + "gopkg.in/AlecAivazis/survey.v1/core" + "regexp" + "strings" +) + +// Printf works identically to fmt.Print but adds `<white+hb>color formatting support for CLI</reset>`. +func Printf(format string, args ...interface{}) { + fmt.Print(Sprintf(format, args...)) +} + +// Sprintf works identically to fmt.Sprintf but adds `<white+hb>color formatting support for CLI</reset>`. +func Sprintf(format string, args ...interface{}) string { + r, err := regexp.Compile(`<([^>]+)>`) + if err != nil { + panic(err) + } + + format = r.ReplaceAllStringFunc(format, func(s string) string { + return fmt.Sprintf(`{{color "%s"}}`, strings.Trim(s, "<>/")) + }) + + out, err := core.RunTemplate(fmt.Sprintf(format, args...), nil) + if err != nil { + panic(err) + } + + return out +} diff --git a/cmd/rr/utils/verbose.go b/cmd/rr/utils/verbose.go new file mode 100644 index 00000000..43770f34 --- /dev/null +++ b/cmd/rr/utils/verbose.go @@ -0,0 +1,18 @@ +package utils + +//if f.Verbose { +// rr.Observe(func(event int, ctx interface{}) { +// switch event { +// case roadrunner.EventPoolError: +// logrus.Error(ctx) +// case roadrunner.EventWorkerCreate: +// logrus.Infof("%s - created", ctx) +// case roadrunner.EventWorkerError: +// logrus.Errorf("%s: %s", ctx.(roadrunner.WorkerError).Worker, ctx.(roadrunner.WorkerError).Error()) +// case roadrunner.EventWorkerDestruct: +// logrus.Warnf("%s - destructed", ctx) +// case roadrunner.EventWorkerKill: +// logrus.Warnf("%s - killed", ctx) +// } +// }) +//} @@ -25,7 +25,7 @@ type Config struct { DestroyTimeout time.Duration } -// Valid returns error if cfg not valid +// Configure returns error if cfg not valid func (cfg *Config) Valid() error { if cfg.NumWorkers == 0 { return fmt.Errorf("cfg.NumWorkers must be set") @@ -2,7 +2,7 @@ package roadrunner import "os/exec" -// Factory is responsible of wrapping given command into tasks worker. +// Pool is responsible of wrapping given command into tasks worker. type Factory interface { // SpawnWorker creates new worker process based on given command. // Process must not be started. diff --git a/http/config.go b/http/config.go new file mode 100644 index 00000000..fe5fab36 --- /dev/null +++ b/http/config.go @@ -0,0 +1,85 @@ +package http + +import ( + "strings" + "path" + "github.com/spiral/roadrunner/service" + "os" + "github.com/spiral/roadrunner/utils" + "fmt" +) + +// Configures RoadRunner HTTP server. +type Config struct { + // serve enables static file serving from desired root directory. + ServeStatic bool + + // Root directory, required when serve set to true. + Root string + + // TmpDir contains name of temporary directory to store uploaded files passed to underlying PHP process. + TmpDir string + + // MaxRequest specified max size for payload body in bytes, set 0 to unlimited. + MaxRequest int64 + + // ForbidUploads specifies list of file extensions which are forbidden for uploads. + // Example: .php, .exe, .bat, .htaccess and etc. + ForbidUploads []string +} + +// ForbidUploads must return true if file extension is not allowed for the upload. +func (cfg Config) Forbidden(filename string) bool { + ext := strings.ToLower(path.Ext(filename)) + + for _, v := range cfg.ForbidUploads { + if ext == v { + return true + } + } + + return false +} + +type serviceConfig struct { + Enabled bool + Host string + Port string + MaxRequest string + Static struct { + Serve bool + Root string + } + + Uploads struct { + TmpDir string + Forbid []string + } + + Pool service.PoolConfig + + //todo: verbose ? +} + +func (cfg *serviceConfig) httpAddr() string { + return fmt.Sprintf("%s:%v", cfg.Host, cfg.Port) +} + +func (cfg *serviceConfig) httpConfig() *Config { + tmpDir := cfg.Uploads.TmpDir + if tmpDir == "" { + tmpDir = os.TempDir() + } + + return &Config{ + ServeStatic: cfg.Static.Serve, + Root: cfg.Static.Root, + TmpDir: tmpDir, + MaxRequest: utils.ParseSize(cfg.MaxRequest), + ForbidUploads: cfg.Uploads.Forbid, + } +} + +func (cfg *serviceConfig) Valid() error { + return nil +} diff --git a/http/data.go b/http/data.go index 865e4760..b84150ee 100644 --- a/http/data.go +++ b/http/data.go @@ -42,7 +42,7 @@ func (d dataTree) push(k string, v []string) { // mount mounts data tree recursively. func (d dataTree) mount(i []string, v []string) { - if len(v) == 0 || v[0] == "" { + if len(v) == 0 { return } diff --git a/http/request.go b/http/request.go index 572d7d6a..516deda2 100644 --- a/http/request.go +++ b/http/request.go @@ -79,13 +79,13 @@ func NewRequest(r *http.Request) (req *Request, err error) { return req, nil } -// OpenUploads moves all uploaded files to temporary directory so it can be given to php later. -func (r *Request) OpenUploads(tmpDir string) error { +// Open moves all uploaded files to temporary directory so it can be given to php later. +func (r *Request) Open(cfg *Config) error { if r.Uploads == nil { return nil } - return r.Uploads.OpenUploads(tmpDir) + return r.Uploads.Open(cfg) } // Close clears all temp file uploads diff --git a/http/rpc.go b/http/rpc.go new file mode 100644 index 00000000..c096ff77 --- /dev/null +++ b/http/rpc.go @@ -0,0 +1,57 @@ +package http + +import ( + "github.com/sirupsen/logrus" +) + +type RPCServer struct { + Service *Service +} + +// WorkerList contains list of workers. +type WorkerList struct { + // Workers is list of workers. + Workers []Worker `json:"workers"` +} + +// Worker provides information about specific worker. +type Worker struct { + // Pid contains process id. + Pid int `json:"pid"` + + // Status of the worker. + Status string `json:"status"` + + // Number of worker executions. + NumExecs uint64 `json:"numExecs"` + + // Created is unix nano timestamp of worker creation time. + Created int64 `json:"created"` + + // Updated is unix nano timestamp of last worker execution. + Updated int64 `json:"updated"` +} + +// Reset resets underlying RR worker pool and restarts all of it's workers. +func (rpc *RPCServer) Reset(reset bool, r *string) error { + logrus.Info("resetting worker pool") + *r = "OK" + + return rpc.Service.srv.rr.Reset() +} + +// Workers returns list of active workers and their stats. +func (rpc *RPCServer) Workers(list bool, r *WorkerList) error { + for _, w := range rpc.Service.srv.rr.Workers() { + state := w.State() + r.Workers = append(r.Workers, Worker{ + Pid: *w.Pid, + Status: state.String(), + NumExecs: state.NumExecs(), + Created: w.Created.UnixNano(), + Updated: state.Updated().UnixNano(), + }) + } + + return nil +} diff --git a/http/server.go b/http/server.go index 363dca2d..039dba02 100644 --- a/http/server.go +++ b/http/server.go @@ -3,30 +3,21 @@ package http import ( "github.com/spiral/roadrunner" "net/http" + "strconv" + "errors" + "github.com/sirupsen/logrus" ) -// Configures RoadRunner HTTP server. -type Config struct { - // serve enables static file serving from desired root directory. - ServeStatic bool - - // Root directory, required when serve set to true. - Root string - - // UploadsDir contains name of temporary directory to store uploaded files passed to underlying PHP process. - UploadsDir string -} - -// Server serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, +// Service serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, // parsed files and query, payload will include parsed form dataTree (if any). type Server struct { - cfg Config + cfg *Config static *staticServer rr *roadrunner.Server } // NewServer returns new instance of HTTP PSR7 server. -func NewServer(cfg Config, server *roadrunner.Server) *Server { +func NewServer(cfg *Config, server *roadrunner.Server) *Server { h := &Server{cfg: cfg, rr: server} if cfg.ServeStatic { @@ -42,13 +33,26 @@ func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) () { return } + // validating request size + if srv.cfg.MaxRequest != 0 { + if length := r.Header.Get("content-length"); length != "" { + if size, err := strconv.ParseInt(length, 10, 64); err != nil { + srv.sendError(w, r, err) + return + } else if size > srv.cfg.MaxRequest { + srv.sendError(w, r, errors.New("request body max size is exceeded")) + return + } + } + } + req, err := NewRequest(r) if err != nil { srv.sendError(w, r, err) return } - if err = req.OpenUploads(srv.cfg.UploadsDir); err != nil { + if err = req.Open(srv.cfg); err != nil { srv.sendError(w, r, err) return } @@ -77,6 +81,10 @@ func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) () { // sendError sends error func (srv *Server) sendError(w http.ResponseWriter, r *http.Request, err error) { + if _, job := err.(roadrunner.JobError); !job { + logrus.Error(err) + } + w.WriteHeader(500) w.Write([]byte(err.Error())) } diff --git a/http/service.go b/http/service.go new file mode 100644 index 00000000..e38e1a03 --- /dev/null +++ b/http/service.go @@ -0,0 +1,67 @@ +package http + +import ( + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner/service" + "net/http" + "context" +) + +type Service struct { + cfg *serviceConfig + http *http.Server + srv *Server +} + +func (s *Service) Name() string { + return "http" +} + +func (s *Service) Configure(cfg service.Config) (bool, error) { + config := &serviceConfig{} + if err := cfg.Unmarshal(config); err != nil { + return false, err + } + + if !config.Enabled { + return false, nil + } + + if err := config.Valid(); err != nil { + return false, err + } + + s.cfg = config + return true, nil +} + +func (s *Service) RPC() interface{} { + return &RPCServer{s} +} + +func (s *Service) Serve() error { + logrus.Debugf("http: started") + defer logrus.Debugf("http: stopped") + + rr, term, err := s.cfg.Pool.NewServer() + if err != nil { + return err + } + defer term() + + s.srv = NewServer(s.cfg.httpConfig(), rr) + s.http = &http.Server{ + Addr: s.cfg.httpAddr(), + Handler: s.srv, + } + + if err := s.http.ListenAndServe(); err != nil { + return err + } + + return nil +} + +func (s *Service) Stop() error { + return s.http.Shutdown(context.Background()) +} diff --git a/http/uploads.go b/http/uploads.go index 1b851e6e..370e73e6 100644 --- a/http/uploads.go +++ b/http/uploads.go @@ -3,12 +3,30 @@ package http import ( "mime/multipart" "encoding/json" - "log" "strings" "net/http" "io/ioutil" "io" "sync" + "os" + "fmt" +) + +const ( + // There is no error, the file uploaded with success. + UploadErrorOK = 0 + + // No file was uploaded. + UploadErrorNoFile = 4 + + // Missing a temporary folder. + UploadErrorNoTmpDir = 5 + + // Failed to write file to disk. + UploadErrorCantWrite = 6 + + // ForbidUploads file extension. + UploadErrorExtension = 7 ) // FileUpload represents singular file wrapUpload. @@ -17,46 +35,56 @@ type FileUpload struct { Name string `json:"name"` // MimeType contains mime-type provided by the client. - MimeType string `json:"mimetype"` + MimeType string `json:"type"` // Size of the uploaded file. Size int64 `json:"size"` // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php - Error int + Error int `json:"error"` // TempFilename points to temporary file location. - TempFilename string `json:"tempFilename"` + TempFilename string `json:"tmpName"` // associated file header header *multipart.FileHeader } -func (f *FileUpload) Open(tmpDir string) error { +func (f *FileUpload) Open(cfg *Config) error { + if cfg.Forbidden(f.Name) { + f.Error = UploadErrorExtension + return nil + } + file, err := f.header.Open() if err != nil { + f.Error = UploadErrorNoFile return err } - defer file.Close() - tmp, err := ioutil.TempFile(tmpDir, "upload") + tmp, err := ioutil.TempFile(cfg.TmpDir, "upload") if err != nil { + // most likely cause of this issue is missing tmp dir + f.Error = UploadErrorNoTmpDir return err } f.TempFilename = tmp.Name() defer tmp.Close() - f.Size, err = io.Copy(tmp, file) + if f.Size, err = io.Copy(tmp, file); err != nil { + f.Error = UploadErrorCantWrite + } + return err } func wrapUpload(f *multipart.FileHeader) *FileUpload { - log.Print(f.Header) return &FileUpload{ Name: f.Filename, MimeType: f.Header.Get("Content-Type"), + Error: UploadErrorOK, header: f, } } @@ -119,26 +147,29 @@ func (u *Uploads) MarshalJSON() ([]byte, error) { return json.Marshal(u.tree) } -// OpenUploads moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors +// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors // will be handled individually. @todo: do we need it? -func (u *Uploads) OpenUploads(tmpDir string) error { +func (u *Uploads) Open(cfg *Config) error { var wg sync.WaitGroup for _, f := range u.list { wg.Add(1) go func(f *FileUpload) { defer wg.Done() - f.Open(tmpDir) + f.Open(cfg) }(f) } wg.Wait() - log.Print(u.list) return nil } // Clear deletes all temporary files. func (u *Uploads) Clear() { - + for _, f := range u.list { + if f.TempFilename != "" && exists(f.TempFilename) { + os.Remove(f.TempFilename) + } + } } // parse incoming dataTree request into JSON (including multipart form dataTree) @@ -160,3 +191,17 @@ func parseUploads(r *http.Request) (*Uploads, error) { return u, nil } + +// exists if file exists. by osutils; todo: better? +func exists(path string) bool { + _, err := os.Stat(path) + if err == nil { + return true + } + + if os.IsNotExist(err) { + return false + } + + panic(fmt.Errorf("unable to stat path %q; %v", path, err)) +} @@ -4,9 +4,12 @@ const ( // EventWorkerCreate thrown when new worker is spawned. EventWorkerCreate = iota - // EventWorkerDestruct thrown before worker destruction. + // EventWorkerDestruct thrown after worker destruction. EventWorkerDestruct + // EventWorkerKill thrown after worker is being forcefully killed. + EventWorkerKill + // EventWorkerError thrown any worker related even happen (passed with WorkerError) EventWorkerError @@ -8,13 +8,13 @@ import ( const ( // EventNewPool triggered when server creates new pool. - EventNewPool = 3 + EventNewPool = 60 // EventDestroyPool triggered when server destroys existed pool. - EventDestroyPool = 4 + EventDestroyPool = 61 ) -// Server manages pool creation and swapping. +// Service manages pool creation and swapping. type Server struct { // observes pool events (can be attached to multiple pools at the same time) observer func(event int, ctx interface{}) @@ -35,8 +35,8 @@ type Server struct { pool Pool } -// NewRouter creates new router. Make sure to call configure before the usage. -func NewRouter(cmd func() *exec.Cmd, factory Factory) *Server { +// NewServer creates new router. Make sure to call configure before the usage. +func NewServer(cmd func() *exec.Cmd, factory Factory) *Server { return &Server{ cmd: cmd, factory: factory, diff --git a/service/bus.go b/service/bus.go new file mode 100644 index 00000000..40e73c31 --- /dev/null +++ b/service/bus.go @@ -0,0 +1,165 @@ +package service + +import ( + "github.com/sirupsen/logrus" + "net/rpc" + "sync" + "github.com/spiral/goridge" + "github.com/pkg/errors" +) + +const ( + rpcConfig = "rpc" +) + +var ( + dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") +) + +type Bus struct { + wg sync.WaitGroup + services []Service + enabled []Service + stop chan interface{} + rpc *rpc.Server + rpcConfig *RPCConfig +} + +func (b *Bus) Register(s Service) { + b.services = append(b.services, s) +} + +func (b *Bus) Services() []Service { + return b.services +} + +func (b *Bus) Configure(cfg Config) error { + if segment := cfg.Get(rpcConfig); segment == nil { + logrus.Warn("rpc: no config has been provided") + } else { + b.rpcConfig = &RPCConfig{} + if err := segment.Unmarshal(b.rpcConfig); err != nil { + return err + } + } + + b.enabled = make([]Service, 0) + + for _, s := range b.services { + segment := cfg.Get(s.Name()) + if segment == nil { + // no config has been provided for the service + logrus.Debugf("%s: no config has been provided", s.Name()) + continue + } + + if enable, err := s.Configure(segment); err != nil { + return err + } else if !enable { + continue + } + + b.enabled = append(b.enabled, s) + } + + return nil +} + +func (b *Bus) RCPClient() (*rpc.Client, error) { + if b.rpcConfig == nil { + return nil, errors.New("rpc is not configured") + } + + conn, err := b.rpcConfig.CreateDialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} + +func (b *Bus) Serve() { + b.rpc = rpc.NewServer() + + for _, s := range b.enabled { + // some services might provide net/rpc api for internal communications + if api := s.RPC(); api != nil { + b.rpc.RegisterName(s.Name(), api) + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + + if err := s.Serve(); err != nil { + logrus.Errorf("%s.start: ", s.Name(), err) + } + }() + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + + logrus.Debug("rpc: started") + if err := b.serveRPC(); err != nil { + logrus.Errorf("rpc: %s", err) + } + }() + + b.wg.Wait() +} + +func (b *Bus) Stop() { + if err := b.stopRPC(); err != nil { + logrus.Errorf("rpc: ", err) + } + + for _, s := range b.enabled { + if err := s.Stop(); err != nil { + logrus.Errorf("%s.stop: ", s.Name(), err) + } + } + + b.wg.Wait() +} + +func (b *Bus) serveRPC() error { + if b.rpcConfig == nil { + return nil + } + + b.stop = make(chan interface{}) + + ln, err := b.rpcConfig.CreateListener() + if err != nil { + return err + } + defer ln.Close() + + for { + select { + case <-b.stop: + b.rpc = nil + return nil + default: + conn, err := ln.Accept() + if err != nil { + continue + } + + go b.rpc.ServeCodec(goridge.NewCodec(conn)) + } + } + + return nil +} + +func (b *Bus) stopRPC() error { + if b.rpcConfig == nil { + return nil + } + + close(b.stop) + return nil +} diff --git a/service/config.go b/service/config.go new file mode 100644 index 00000000..d5381376 --- /dev/null +++ b/service/config.go @@ -0,0 +1,6 @@ +package service + +type Config interface { + Get(key string) Config + Unmarshal(out interface{}) error +} diff --git a/service/factory.go b/service/factory.go new file mode 100644 index 00000000..dbdebc4f --- /dev/null +++ b/service/factory.go @@ -0,0 +1,69 @@ +package service + +import ( + "github.com/spiral/roadrunner" + "time" + "os/exec" + "strings" + "net" +) + +type PoolConfig struct { + Command string + Relay string + + Number uint64 + MaxJobs uint64 + + Timeouts struct { + Allocate int + Destroy int + } +} + +func (f *PoolConfig) NewServer() (*roadrunner.Server, func(), error) { + relays, terminator, err := f.relayFactory() + if err != nil { + terminator() + return nil, nil, err + } + + rr := roadrunner.NewServer(f.cmd(), relays) + if err := rr.Configure(f.rrConfig()); err != nil { + return nil, nil, err + } + + return rr, nil, nil +} + +func (f *PoolConfig) rrConfig() roadrunner.Config { + return roadrunner.Config{ + NumWorkers: f.Number, + MaxExecutions: f.MaxJobs, + AllocateTimeout: time.Second * time.Duration(f.Timeouts.Allocate), + DestroyTimeout: time.Second * time.Duration(f.Timeouts.Destroy), + } +} + +func (f *PoolConfig) cmd() func() *exec.Cmd { + cmd := strings.Split(f.Command, " ") + return func() *exec.Cmd { return exec.Command(cmd[0], cmd[1:]...) } +} + +func (f *PoolConfig) relayFactory() (roadrunner.Factory, func(), error) { + if f.Relay == "pipes" || f.Relay == "pipe" { + return roadrunner.NewPipeFactory(), nil, nil + } + + dsn := strings.Split(f.Relay, "://") + if len(dsn) != 2 { + return nil, nil, dsnError + } + + ln, err := net.Listen(dsn[0], dsn[1]) + if err != nil { + return nil, nil, err + } + + return roadrunner.NewSocketFactory(ln, time.Minute), func() { ln.Close() }, nil +} diff --git a/service/rpc.go b/service/rpc.go new file mode 100644 index 00000000..eb128768 --- /dev/null +++ b/service/rpc.go @@ -0,0 +1,32 @@ +package service + +import ( + "net" + "strings" +) + +type RPCConfig struct { + Listen string +} + +func (cfg *RPCConfig) CreateListener() (net.Listener, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, dsnError + } + + return net.Listen(dsn[0], dsn[1]) +} + +func (cfg *RPCConfig) CreateDialer() (net.Conn, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, dsnError + } + + return net.Dial(dsn[0], dsn[1]) +} + +func NewBus() *Bus { + return &Bus{services: make([]Service, 0)} +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 00000000..2f704657 --- /dev/null +++ b/service/service.go @@ -0,0 +1,9 @@ +package service + +type Service interface { + Name() string + Configure(cfg Config) (bool, error) + RPC() interface{} + Serve() error + Stop() error +} diff --git a/static_pool.go b/static_pool.go index be5e9b06..0527d024 100644 --- a/static_pool.go +++ b/static_pool.go @@ -107,7 +107,7 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { if err != nil { // soft job errors are allowed if _, jobError := err.(JobError); jobError { - p.free <- w + p.release(w) return nil, err } @@ -121,12 +121,7 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { return p.Exec(rqs) } - if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions { - go p.replaceWorker(w, p.cfg.MaxExecutions) - } else { - p.free <- w - } - + p.release(w) return rsp, nil } @@ -165,6 +160,16 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { } } +// release releases or replaces the worker. +func (p *StaticPool) release(w *Worker) { + if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions { + go p.replaceWorker(w, p.cfg.MaxExecutions) + return + } + + p.free <- w +} + // replaceWorker replaces dead or expired worker with new instance. func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) { go p.destroyWorker(w) @@ -183,13 +188,11 @@ func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) { // destroyWorker destroys workers and removes it from the pool. func (p *StaticPool) destroyWorker(w *Worker) { - p.throw(EventWorkerDestruct, w) - // detaching p.muw.Lock() for i, wc := range p.workers { if wc == w { - p.workers = p.workers[:i+1] + p.workers = append(p.workers[:i], p.workers[i+1:]...) break } } @@ -200,11 +203,15 @@ func (p *StaticPool) destroyWorker(w *Worker) { select { case <-w.waitDone: // worker is dead + p.throw(EventWorkerDestruct, w) + case <-time.NewTimer(p.cfg.DestroyTimeout).C: - // failed to stop process + // failed to stop process in given time if err := w.Kill(); err != nil { p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err}) } + + p.throw(EventWorkerKill, w) } } diff --git a/utils/size.go b/utils/size.go new file mode 100644 index 00000000..176cc9e1 --- /dev/null +++ b/utils/size.go @@ -0,0 +1,28 @@ +package utils + +import ( + "strconv" + "strings" +) + +func ParseSize(size string) int64 { + if len(size) == 0 { + return 0 + } + + s, err := strconv.Atoi(size[:len(size)-1]) + if err != nil { + return 0 + } + + switch strings.ToLower(size[len(size)-1:]) { + case "k", "kb": + return int64(s * 1024) + case "m", "mb": + return int64(s * 1024 * 1024) + case "g", "gb": + return int64(s * 1024 * 1024 * 1024) + } + + return 0 +} @@ -11,6 +11,7 @@ import ( "strconv" "strings" "sync" + "time" ) // Worker - supervised process with api over goridge.Relay. @@ -19,6 +20,9 @@ type Worker struct { // can be nil while process is not started. Pid *int + // Created indicates at what time worker has been created. + Created time.Time + // state holds information about current worker state, // number of worker executions, last status change time. // publicly this object is receive-only and protected using Mutex @@ -26,7 +30,7 @@ type Worker struct { state *state // underlying command with associated process, command must be - // provided to worker from outside in non-started form. Cmd + // provided to worker from outside in non-started form. Command // stdErr direction will be handled by worker to aggregate error message. cmd *exec.Cmd @@ -54,6 +58,7 @@ func newWorker(cmd *exec.Cmd) (*Worker, error) { } w := &Worker{ + Created: time.Now(), cmd: cmd, err: new(bytes.Buffer), waitDone: make(chan interface{}), @@ -134,15 +139,12 @@ func (w *Worker) Stop() error { } // Kill kills underlying process, make sure to call Wait() func to gather -// error log from the stderr. Waits for process completion. +// error log from the stderr. Does not waits for process completion! func (w *Worker) Kill() error { select { case <-w.waitDone: return nil default: - w.mu.Lock() - defer w.mu.Unlock() - w.state.set(StateInactive) err := w.cmd.Process.Signal(os.Kill) @@ -163,7 +165,7 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) { } if w.state.Value() != StateReady { - return nil, fmt.Errorf("worker is not ready (%s)", w.state.Value()) + return nil, fmt.Errorf("worker is not ready (%s)", w.state.String()) } w.state.set(StateWorking) |