summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-03 12:54:43 +0300
committerWolfy-J <[email protected]>2018-06-03 12:54:43 +0300
commit36ea77baa5a41de10bd604cd0e5b5b3cafaaeb64 (patch)
tree13ca8abd454a6668f490eec2e44b1520bd3953fe
parentb02611b7266589d888e054a1d2e4432ae370617d (diff)
service bus, http service, rpc bus, cli commands, new configs
-rw-r--r--LICENSE2
-rw-r--r--cmd/rr-php/LICENSE202
-rw-r--r--cmd/rr-php/cmd/root.go82
-rw-r--r--cmd/rr-php/cmd/serve.go85
-rw-r--r--cmd/rr-php/main.go23
-rw-r--r--cmd/rr/LICENSE21
-rw-r--r--cmd/rr/cmd/root.go99
-rw-r--r--cmd/rr/cmd/serve.go48
-rw-r--r--cmd/rr/http/reload.go50
-rw-r--r--cmd/rr/http/workers.go64
-rw-r--r--cmd/rr/main.go44
-rw-r--r--cmd/rr/utils/config.go23
-rw-r--r--cmd/rr/utils/cprint.go32
-rw-r--r--cmd/rr/utils/verbose.go18
-rw-r--r--config.go2
-rw-r--r--factory.go2
-rw-r--r--http/config.go85
-rw-r--r--http/data.go2
-rw-r--r--http/request.go6
-rw-r--r--http/rpc.go57
-rw-r--r--http/server.go40
-rw-r--r--http/service.go67
-rw-r--r--http/uploads.go73
-rw-r--r--pool.go5
-rw-r--r--server.go10
-rw-r--r--service/bus.go165
-rw-r--r--service/config.go6
-rw-r--r--service/factory.go69
-rw-r--r--service/rpc.go32
-rw-r--r--service/service.go9
-rw-r--r--static_pool.go29
-rw-r--r--utils/size.go28
-rw-r--r--worker.go14
33 files changed, 1042 insertions, 452 deletions
diff --git a/LICENSE b/LICENSE
index d78565f0..efb98c87 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,6 +1,6 @@
MIT License
-Copyright (c) 2017 SpiralScout
+Copyright (c) 2018 SpiralScout
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
diff --git a/cmd/rr-php/LICENSE b/cmd/rr-php/LICENSE
deleted file mode 100644
index d6456956..00000000
--- a/cmd/rr-php/LICENSE
+++ /dev/null
@@ -1,202 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
diff --git a/cmd/rr-php/cmd/root.go b/cmd/rr-php/cmd/root.go
deleted file mode 100644
index dfce4399..00000000
--- a/cmd/rr-php/cmd/root.go
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package cmd
-
-import (
- "fmt"
- "os"
- "github.com/mitchellh/go-homedir"
- "github.com/spf13/cobra"
- "github.com/spf13/viper"
-)
-
-var cfgFile string
-
-// rootCmd represents the base command when called without any subcommands
-var rootCmd = &cobra.Command{
- Use: "rr",
- Short: "A brief description of your application",
- // Uncomment the following line if your bare application
- // has an action associated with it:
- // Run: func(cmd *cobra.Command, args []string) { },
-}
-
-// Execute adds all child commands to the root command and sets flags appropriately.
-// This is called by main.main(). It only needs to happen once to the rootCmd.
-func Execute() {
- if err := rootCmd.Execute(); err != nil {
- fmt.Println(err)
- os.Exit(1)
- }
-}
-
-func init() {
- cobra.OnInitialize(initConfig)
-
- // Here you will define your flags and configuration settings.
- // Cobra supports persistent flags, which, if defined here,
- // will be global for your application.
- rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.rr.yaml)")
-
- // Cobra also supports local flags, which will only run
- // when this action is called directly.
- rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
-}
-
-// initConfig reads in config file and ENV variables if set.
-func initConfig() {
- if cfgFile != "" {
- // Use config file from the flag.
- viper.SetConfigFile(cfgFile)
- } else {
- // Find home directory.
- home, err := homedir.Dir()
- if err != nil {
- fmt.Println(err)
- os.Exit(1)
- }
-
- // Search config in home directory with name ".rr" (without extension).
- viper.AddConfigPath(home)
- viper.SetConfigName(".rr")
- }
-
- viper.AutomaticEnv() // read in environment variables that match
-
- // If a config file is found, read it in.
- if err := viper.ReadInConfig(); err == nil {
- fmt.Println("Using config file:", viper.ConfigFileUsed())
- }
-}
diff --git a/cmd/rr-php/cmd/serve.go b/cmd/rr-php/cmd/serve.go
deleted file mode 100644
index 24ec4043..00000000
--- a/cmd/rr-php/cmd/serve.go
+++ /dev/null
@@ -1,85 +0,0 @@
-// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package cmd
-
-import (
- "github.com/spf13/cobra"
- "github.com/spiral/roadrunner"
- "os/exec"
- "time"
- "github.com/sirupsen/logrus"
- rrttp "github.com/spiral/roadrunner/http"
- "net/http"
- "os"
-)
-
-func init() {
- rootCmd.AddCommand(&cobra.Command{
- Use: "serve",
- Run: serveHandler,
- })
-}
-
-func serveHandler(cmd *cobra.Command, args []string) {
- rr := roadrunner.NewRouter(
- func() *exec.Cmd {
- return exec.Command("php", "/Users/wolfy-j/Projects/phpapp/webroot/index.php", "rr", "pipes")
- },
- roadrunner.NewPipeFactory(),
- )
-
- err := rr.Configure(roadrunner.Config{
- NumWorkers: 4,
- AllocateTimeout: time.Minute,
- DestroyTimeout: time.Minute,
- //MaxExecutions: 10,
- })
-
- rr.Observe(func(event int, ctx interface{}) {
- logrus.Info(ctx)
- })
-
- if err != nil {
- panic(err)
- }
-
- logrus.Info("serving")
-
- //Enable http2
- //srv := http.Server{
- // Addr: ":8080",
- // Handler: rrhttp.NewServer(
- // rrhttp.Config{
- // serveStatic: true,
- // Root: "/Users/wolfy-j/Projects/phpapp/webroot",
- // },
- // rr,
- // ),
- //}
-
- // srv.ListenAndServe()
-
- //http2.ConfigureServer(&srv, nil)
- //srv.ListenAndServeTLS("localhost.cert", "localhost.key")
-
- http.ListenAndServe(":8080", rrttp.NewServer(
- rrttp.Config{
- ServeStatic: true,
- Root: "/Users/wolfy-j/Projects/phpapp/webroot",
- UploadsDir: os.TempDir(),
- },
- rr,
- ))
-}
diff --git a/cmd/rr-php/main.go b/cmd/rr-php/main.go
deleted file mode 100644
index e78ce8b2..00000000
--- a/cmd/rr-php/main.go
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package main
-
-import (
- "github.com/spiral/roadrunner/cmd/rr-php/cmd"
-)
-
-func main() {
- cmd.Execute()
-}
diff --git a/cmd/rr/LICENSE b/cmd/rr/LICENSE
new file mode 100644
index 00000000..efb98c87
--- /dev/null
+++ b/cmd/rr/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 SpiralScout
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE. \ No newline at end of file
diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go
new file mode 100644
index 00000000..ac8466ef
--- /dev/null
+++ b/cmd/rr/cmd/root.go
@@ -0,0 +1,99 @@
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package cmd
+
+import (
+ "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+ "github.com/spf13/viper"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/cmd/rr/utils"
+ "os"
+ "fmt"
+)
+
+// Service bus for all the commands.
+var (
+ // Shared service bus.
+ Bus *service.Bus
+
+ // Root is application endpoint.
+ Root = &cobra.Command{
+ Use: "rr",
+ Short: "RoadRunner, PHP application server",
+ }
+
+ cfgFile string
+ verbose bool
+)
+
+// Execute adds all child commands to the Root command and sets flags appropriately.
+// This is called by main.main(). It only needs to happen once to the Root.
+func Execute(serviceBus *service.Bus) {
+ Bus = serviceBus
+ if err := Root.Execute(); err != nil {
+ fmt.Println(err)
+ os.Exit(1)
+ }
+}
+
+func init() {
+ Root.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "verbose output")
+ Root.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is .rr.yaml)")
+
+ cobra.OnInitialize(func() {
+ if verbose {
+ logrus.SetLevel(logrus.DebugLevel)
+ }
+
+ if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil {
+ if err := Bus.Configure(cfg); err != nil {
+ panic(err)
+ }
+ }
+ })
+}
+
+func initConfig(cfgFile string, path []string, name string) service.Config {
+ cfg := viper.New()
+
+ if cfgFile != "" {
+ // Use cfg file from the flag.
+ cfg.SetConfigFile(cfgFile)
+ } else {
+ // automatic location
+ for _, p := range path {
+ cfg.AddConfigPath(p)
+ }
+ cfg.SetConfigName(name)
+ }
+
+ // read in environment variables that match
+ cfg.AutomaticEnv()
+
+ // If a cfg file is found, read it in.
+ if err := cfg.ReadInConfig(); err != nil {
+ logrus.Warnf("config: %s", err)
+ return nil
+ }
+
+ return &utils.ConfigWrapper{cfg}
+}
diff --git a/cmd/rr/cmd/serve.go b/cmd/rr/cmd/serve.go
new file mode 100644
index 00000000..f5524556
--- /dev/null
+++ b/cmd/rr/cmd/serve.go
@@ -0,0 +1,48 @@
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package cmd
+
+import (
+ "github.com/spf13/cobra"
+ "os"
+ "os/signal"
+ "syscall"
+)
+
+var (
+ stopSignal = make(chan os.Signal, 1)
+)
+
+func init() {
+ Root.AddCommand(&cobra.Command{
+ Use: "serve",
+ Short: "Serve RoadRunner service(s)",
+ Run: serveHandler,
+ })
+
+ signal.Notify(stopSignal, syscall.SIGTERM)
+}
+
+func serveHandler(cmd *cobra.Command, args []string) {
+ Bus.Serve()
+ <-stopSignal
+ Bus.Stop()
+}
diff --git a/cmd/rr/http/reload.go b/cmd/rr/http/reload.go
new file mode 100644
index 00000000..6caf0a71
--- /dev/null
+++ b/cmd/rr/http/reload.go
@@ -0,0 +1,50 @@
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package http
+
+import (
+ "github.com/spf13/cobra"
+ "fmt"
+ rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+)
+
+func init() {
+ rr.Root.AddCommand(&cobra.Command{
+ Use: "http:reload",
+ Short: "Reload RoadRunner worker pools for the HTTP service",
+ Run: reloadHandler,
+ })
+}
+
+func reloadHandler(cmd *cobra.Command, args []string) {
+ client, err := rr.Bus.RCPClient()
+ if err != nil {
+ panic(err) // todo: change
+ }
+ defer client.Close()
+
+ var r string
+ if err := client.Call("http.Reset", true, &r); err != nil {
+ panic(err)
+ }
+
+ fmt.Println(r)
+}
diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go
new file mode 100644
index 00000000..c1225987
--- /dev/null
+++ b/cmd/rr/http/workers.go
@@ -0,0 +1,64 @@
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package http
+
+import (
+ "github.com/spf13/cobra"
+ "github.com/spiral/roadrunner/http"
+ "github.com/olekukonko/tablewriter"
+ "os"
+ "strconv"
+ rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+)
+
+func init() {
+ rr.Root.AddCommand(&cobra.Command{
+ Use: "http:workers",
+ Short: "List workers associated with RoadRunner HTTP service",
+ Run: workersHandler,
+ })
+}
+
+func workersHandler(cmd *cobra.Command, args []string) {
+ client, err := rr.Bus.RCPClient()
+ if err != nil {
+ panic(err) // todo: change
+ }
+ defer client.Close()
+
+ var r http.WorkerList
+ if err := client.Call("http.Workers", true, &r); err != nil {
+ panic(err)
+ }
+
+ tw := tablewriter.NewWriter(os.Stdout)
+ tw.SetHeader([]string{"PID", "Status", "Num Execs"})
+
+ for _, w := range r.Workers {
+ tw.Append([]string{
+ strconv.Itoa(w.Pid),
+ w.Status,
+ strconv.Itoa(int(w.NumExecs)),
+ })
+ }
+
+ tw.Render()
+}
diff --git a/cmd/rr/main.go b/cmd/rr/main.go
new file mode 100644
index 00000000..336aeddd
--- /dev/null
+++ b/cmd/rr/main.go
@@ -0,0 +1,44 @@
+// MIT License
+//
+// Copyright (c) 2018 SpiralScout
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package main
+
+import (
+ "github.com/spiral/roadrunner/cmd/rr/cmd"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/http"
+ _ "github.com/spiral/roadrunner/cmd/rr/http"
+)
+
+var bus *service.Bus
+
+func init() {
+ bus = service.NewBus()
+}
+
+func main() {
+ // http server with PSR7 support
+ bus.Register(&http.Service{})
+
+ // you can register additional commands using cmd.Root
+ cmd.Execute(bus)
+}
diff --git a/cmd/rr/utils/config.go b/cmd/rr/utils/config.go
new file mode 100644
index 00000000..e7e22b3a
--- /dev/null
+++ b/cmd/rr/utils/config.go
@@ -0,0 +1,23 @@
+package utils
+
+import (
+ "github.com/spf13/viper"
+ "github.com/spiral/roadrunner/service"
+)
+
+type ConfigWrapper struct {
+ Viper *viper.Viper
+}
+
+func (w *ConfigWrapper) Get(key string) service.Config {
+ sub := w.Viper.Sub(key)
+ if sub == nil {
+ return nil
+ }
+
+ return &ConfigWrapper{sub}
+}
+
+func (w *ConfigWrapper) Unmarshal(out interface{}) error {
+ return w.Viper.Unmarshal(out)
+}
diff --git a/cmd/rr/utils/cprint.go b/cmd/rr/utils/cprint.go
new file mode 100644
index 00000000..f6f828f8
--- /dev/null
+++ b/cmd/rr/utils/cprint.go
@@ -0,0 +1,32 @@
+package utils
+
+import (
+ "fmt"
+ "gopkg.in/AlecAivazis/survey.v1/core"
+ "regexp"
+ "strings"
+)
+
+// Printf works identically to fmt.Print but adds `<white+hb>color formatting support for CLI</reset>`.
+func Printf(format string, args ...interface{}) {
+ fmt.Print(Sprintf(format, args...))
+}
+
+// Sprintf works identically to fmt.Sprintf but adds `<white+hb>color formatting support for CLI</reset>`.
+func Sprintf(format string, args ...interface{}) string {
+ r, err := regexp.Compile(`<([^>]+)>`)
+ if err != nil {
+ panic(err)
+ }
+
+ format = r.ReplaceAllStringFunc(format, func(s string) string {
+ return fmt.Sprintf(`{{color "%s"}}`, strings.Trim(s, "<>/"))
+ })
+
+ out, err := core.RunTemplate(fmt.Sprintf(format, args...), nil)
+ if err != nil {
+ panic(err)
+ }
+
+ return out
+}
diff --git a/cmd/rr/utils/verbose.go b/cmd/rr/utils/verbose.go
new file mode 100644
index 00000000..43770f34
--- /dev/null
+++ b/cmd/rr/utils/verbose.go
@@ -0,0 +1,18 @@
+package utils
+
+//if f.Verbose {
+// rr.Observe(func(event int, ctx interface{}) {
+// switch event {
+// case roadrunner.EventPoolError:
+// logrus.Error(ctx)
+// case roadrunner.EventWorkerCreate:
+// logrus.Infof("%s - created", ctx)
+// case roadrunner.EventWorkerError:
+// logrus.Errorf("%s: %s", ctx.(roadrunner.WorkerError).Worker, ctx.(roadrunner.WorkerError).Error())
+// case roadrunner.EventWorkerDestruct:
+// logrus.Warnf("%s - destructed", ctx)
+// case roadrunner.EventWorkerKill:
+// logrus.Warnf("%s - killed", ctx)
+// }
+// })
+//}
diff --git a/config.go b/config.go
index f2d8b609..0ddaa14e 100644
--- a/config.go
+++ b/config.go
@@ -25,7 +25,7 @@ type Config struct {
DestroyTimeout time.Duration
}
-// Valid returns error if cfg not valid
+// Configure returns error if cfg not valid
func (cfg *Config) Valid() error {
if cfg.NumWorkers == 0 {
return fmt.Errorf("cfg.NumWorkers must be set")
diff --git a/factory.go b/factory.go
index 97ea3a87..73896406 100644
--- a/factory.go
+++ b/factory.go
@@ -2,7 +2,7 @@ package roadrunner
import "os/exec"
-// Factory is responsible of wrapping given command into tasks worker.
+// Pool is responsible of wrapping given command into tasks worker.
type Factory interface {
// SpawnWorker creates new worker process based on given command.
// Process must not be started.
diff --git a/http/config.go b/http/config.go
new file mode 100644
index 00000000..fe5fab36
--- /dev/null
+++ b/http/config.go
@@ -0,0 +1,85 @@
+package http
+
+import (
+ "strings"
+ "path"
+ "github.com/spiral/roadrunner/service"
+ "os"
+ "github.com/spiral/roadrunner/utils"
+ "fmt"
+)
+
+// Configures RoadRunner HTTP server.
+type Config struct {
+ // serve enables static file serving from desired root directory.
+ ServeStatic bool
+
+ // Root directory, required when serve set to true.
+ Root string
+
+ // TmpDir contains name of temporary directory to store uploaded files passed to underlying PHP process.
+ TmpDir string
+
+ // MaxRequest specified max size for payload body in bytes, set 0 to unlimited.
+ MaxRequest int64
+
+ // ForbidUploads specifies list of file extensions which are forbidden for uploads.
+ // Example: .php, .exe, .bat, .htaccess and etc.
+ ForbidUploads []string
+}
+
+// ForbidUploads must return true if file extension is not allowed for the upload.
+func (cfg Config) Forbidden(filename string) bool {
+ ext := strings.ToLower(path.Ext(filename))
+
+ for _, v := range cfg.ForbidUploads {
+ if ext == v {
+ return true
+ }
+ }
+
+ return false
+}
+
+type serviceConfig struct {
+ Enabled bool
+ Host string
+ Port string
+ MaxRequest string
+ Static struct {
+ Serve bool
+ Root string
+ }
+
+ Uploads struct {
+ TmpDir string
+ Forbid []string
+ }
+
+ Pool service.PoolConfig
+
+ //todo: verbose ?
+}
+
+func (cfg *serviceConfig) httpAddr() string {
+ return fmt.Sprintf("%s:%v", cfg.Host, cfg.Port)
+}
+
+func (cfg *serviceConfig) httpConfig() *Config {
+ tmpDir := cfg.Uploads.TmpDir
+ if tmpDir == "" {
+ tmpDir = os.TempDir()
+ }
+
+ return &Config{
+ ServeStatic: cfg.Static.Serve,
+ Root: cfg.Static.Root,
+ TmpDir: tmpDir,
+ MaxRequest: utils.ParseSize(cfg.MaxRequest),
+ ForbidUploads: cfg.Uploads.Forbid,
+ }
+}
+
+func (cfg *serviceConfig) Valid() error {
+ return nil
+}
diff --git a/http/data.go b/http/data.go
index 865e4760..b84150ee 100644
--- a/http/data.go
+++ b/http/data.go
@@ -42,7 +42,7 @@ func (d dataTree) push(k string, v []string) {
// mount mounts data tree recursively.
func (d dataTree) mount(i []string, v []string) {
- if len(v) == 0 || v[0] == "" {
+ if len(v) == 0 {
return
}
diff --git a/http/request.go b/http/request.go
index 572d7d6a..516deda2 100644
--- a/http/request.go
+++ b/http/request.go
@@ -79,13 +79,13 @@ func NewRequest(r *http.Request) (req *Request, err error) {
return req, nil
}
-// OpenUploads moves all uploaded files to temporary directory so it can be given to php later.
-func (r *Request) OpenUploads(tmpDir string) error {
+// Open moves all uploaded files to temporary directory so it can be given to php later.
+func (r *Request) Open(cfg *Config) error {
if r.Uploads == nil {
return nil
}
- return r.Uploads.OpenUploads(tmpDir)
+ return r.Uploads.Open(cfg)
}
// Close clears all temp file uploads
diff --git a/http/rpc.go b/http/rpc.go
new file mode 100644
index 00000000..c096ff77
--- /dev/null
+++ b/http/rpc.go
@@ -0,0 +1,57 @@
+package http
+
+import (
+ "github.com/sirupsen/logrus"
+)
+
+type RPCServer struct {
+ Service *Service
+}
+
+// WorkerList contains list of workers.
+type WorkerList struct {
+ // Workers is list of workers.
+ Workers []Worker `json:"workers"`
+}
+
+// Worker provides information about specific worker.
+type Worker struct {
+ // Pid contains process id.
+ Pid int `json:"pid"`
+
+ // Status of the worker.
+ Status string `json:"status"`
+
+ // Number of worker executions.
+ NumExecs uint64 `json:"numExecs"`
+
+ // Created is unix nano timestamp of worker creation time.
+ Created int64 `json:"created"`
+
+ // Updated is unix nano timestamp of last worker execution.
+ Updated int64 `json:"updated"`
+}
+
+// Reset resets underlying RR worker pool and restarts all of it's workers.
+func (rpc *RPCServer) Reset(reset bool, r *string) error {
+ logrus.Info("resetting worker pool")
+ *r = "OK"
+
+ return rpc.Service.srv.rr.Reset()
+}
+
+// Workers returns list of active workers and their stats.
+func (rpc *RPCServer) Workers(list bool, r *WorkerList) error {
+ for _, w := range rpc.Service.srv.rr.Workers() {
+ state := w.State()
+ r.Workers = append(r.Workers, Worker{
+ Pid: *w.Pid,
+ Status: state.String(),
+ NumExecs: state.NumExecs(),
+ Created: w.Created.UnixNano(),
+ Updated: state.Updated().UnixNano(),
+ })
+ }
+
+ return nil
+}
diff --git a/http/server.go b/http/server.go
index 363dca2d..039dba02 100644
--- a/http/server.go
+++ b/http/server.go
@@ -3,30 +3,21 @@ package http
import (
"github.com/spiral/roadrunner"
"net/http"
+ "strconv"
+ "errors"
+ "github.com/sirupsen/logrus"
)
-// Configures RoadRunner HTTP server.
-type Config struct {
- // serve enables static file serving from desired root directory.
- ServeStatic bool
-
- // Root directory, required when serve set to true.
- Root string
-
- // UploadsDir contains name of temporary directory to store uploaded files passed to underlying PHP process.
- UploadsDir string
-}
-
-// Server serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
+// Service serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
// parsed files and query, payload will include parsed form dataTree (if any).
type Server struct {
- cfg Config
+ cfg *Config
static *staticServer
rr *roadrunner.Server
}
// NewServer returns new instance of HTTP PSR7 server.
-func NewServer(cfg Config, server *roadrunner.Server) *Server {
+func NewServer(cfg *Config, server *roadrunner.Server) *Server {
h := &Server{cfg: cfg, rr: server}
if cfg.ServeStatic {
@@ -42,13 +33,26 @@ func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) () {
return
}
+ // validating request size
+ if srv.cfg.MaxRequest != 0 {
+ if length := r.Header.Get("content-length"); length != "" {
+ if size, err := strconv.ParseInt(length, 10, 64); err != nil {
+ srv.sendError(w, r, err)
+ return
+ } else if size > srv.cfg.MaxRequest {
+ srv.sendError(w, r, errors.New("request body max size is exceeded"))
+ return
+ }
+ }
+ }
+
req, err := NewRequest(r)
if err != nil {
srv.sendError(w, r, err)
return
}
- if err = req.OpenUploads(srv.cfg.UploadsDir); err != nil {
+ if err = req.Open(srv.cfg); err != nil {
srv.sendError(w, r, err)
return
}
@@ -77,6 +81,10 @@ func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) () {
// sendError sends error
func (srv *Server) sendError(w http.ResponseWriter, r *http.Request, err error) {
+ if _, job := err.(roadrunner.JobError); !job {
+ logrus.Error(err)
+ }
+
w.WriteHeader(500)
w.Write([]byte(err.Error()))
}
diff --git a/http/service.go b/http/service.go
new file mode 100644
index 00000000..e38e1a03
--- /dev/null
+++ b/http/service.go
@@ -0,0 +1,67 @@
+package http
+
+import (
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner/service"
+ "net/http"
+ "context"
+)
+
+type Service struct {
+ cfg *serviceConfig
+ http *http.Server
+ srv *Server
+}
+
+func (s *Service) Name() string {
+ return "http"
+}
+
+func (s *Service) Configure(cfg service.Config) (bool, error) {
+ config := &serviceConfig{}
+ if err := cfg.Unmarshal(config); err != nil {
+ return false, err
+ }
+
+ if !config.Enabled {
+ return false, nil
+ }
+
+ if err := config.Valid(); err != nil {
+ return false, err
+ }
+
+ s.cfg = config
+ return true, nil
+}
+
+func (s *Service) RPC() interface{} {
+ return &RPCServer{s}
+}
+
+func (s *Service) Serve() error {
+ logrus.Debugf("http: started")
+ defer logrus.Debugf("http: stopped")
+
+ rr, term, err := s.cfg.Pool.NewServer()
+ if err != nil {
+ return err
+ }
+ defer term()
+
+ s.srv = NewServer(s.cfg.httpConfig(), rr)
+ s.http = &http.Server{
+ Addr: s.cfg.httpAddr(),
+ Handler: s.srv,
+ }
+
+ if err := s.http.ListenAndServe(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (s *Service) Stop() error {
+ return s.http.Shutdown(context.Background())
+}
diff --git a/http/uploads.go b/http/uploads.go
index 1b851e6e..370e73e6 100644
--- a/http/uploads.go
+++ b/http/uploads.go
@@ -3,12 +3,30 @@ package http
import (
"mime/multipart"
"encoding/json"
- "log"
"strings"
"net/http"
"io/ioutil"
"io"
"sync"
+ "os"
+ "fmt"
+)
+
+const (
+ // There is no error, the file uploaded with success.
+ UploadErrorOK = 0
+
+ // No file was uploaded.
+ UploadErrorNoFile = 4
+
+ // Missing a temporary folder.
+ UploadErrorNoTmpDir = 5
+
+ // Failed to write file to disk.
+ UploadErrorCantWrite = 6
+
+ // ForbidUploads file extension.
+ UploadErrorExtension = 7
)
// FileUpload represents singular file wrapUpload.
@@ -17,46 +35,56 @@ type FileUpload struct {
Name string `json:"name"`
// MimeType contains mime-type provided by the client.
- MimeType string `json:"mimetype"`
+ MimeType string `json:"type"`
// Size of the uploaded file.
Size int64 `json:"size"`
// Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php
- Error int
+ Error int `json:"error"`
// TempFilename points to temporary file location.
- TempFilename string `json:"tempFilename"`
+ TempFilename string `json:"tmpName"`
// associated file header
header *multipart.FileHeader
}
-func (f *FileUpload) Open(tmpDir string) error {
+func (f *FileUpload) Open(cfg *Config) error {
+ if cfg.Forbidden(f.Name) {
+ f.Error = UploadErrorExtension
+ return nil
+ }
+
file, err := f.header.Open()
if err != nil {
+ f.Error = UploadErrorNoFile
return err
}
-
defer file.Close()
- tmp, err := ioutil.TempFile(tmpDir, "upload")
+ tmp, err := ioutil.TempFile(cfg.TmpDir, "upload")
if err != nil {
+ // most likely cause of this issue is missing tmp dir
+ f.Error = UploadErrorNoTmpDir
return err
}
f.TempFilename = tmp.Name()
defer tmp.Close()
- f.Size, err = io.Copy(tmp, file)
+ if f.Size, err = io.Copy(tmp, file); err != nil {
+ f.Error = UploadErrorCantWrite
+ }
+
return err
}
func wrapUpload(f *multipart.FileHeader) *FileUpload {
- log.Print(f.Header)
return &FileUpload{
Name: f.Filename,
MimeType: f.Header.Get("Content-Type"),
+ Error: UploadErrorOK,
header: f,
}
}
@@ -119,26 +147,29 @@ func (u *Uploads) MarshalJSON() ([]byte, error) {
return json.Marshal(u.tree)
}
-// OpenUploads moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
+// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
// will be handled individually. @todo: do we need it?
-func (u *Uploads) OpenUploads(tmpDir string) error {
+func (u *Uploads) Open(cfg *Config) error {
var wg sync.WaitGroup
for _, f := range u.list {
wg.Add(1)
go func(f *FileUpload) {
defer wg.Done()
- f.Open(tmpDir)
+ f.Open(cfg)
}(f)
}
wg.Wait()
- log.Print(u.list)
return nil
}
// Clear deletes all temporary files.
func (u *Uploads) Clear() {
-
+ for _, f := range u.list {
+ if f.TempFilename != "" && exists(f.TempFilename) {
+ os.Remove(f.TempFilename)
+ }
+ }
}
// parse incoming dataTree request into JSON (including multipart form dataTree)
@@ -160,3 +191,17 @@ func parseUploads(r *http.Request) (*Uploads, error) {
return u, nil
}
+
+// exists if file exists. by osutils; todo: better?
+func exists(path string) bool {
+ _, err := os.Stat(path)
+ if err == nil {
+ return true
+ }
+
+ if os.IsNotExist(err) {
+ return false
+ }
+
+ panic(fmt.Errorf("unable to stat path %q; %v", path, err))
+}
diff --git a/pool.go b/pool.go
index 360b895d..ac1a6820 100644
--- a/pool.go
+++ b/pool.go
@@ -4,9 +4,12 @@ const (
// EventWorkerCreate thrown when new worker is spawned.
EventWorkerCreate = iota
- // EventWorkerDestruct thrown before worker destruction.
+ // EventWorkerDestruct thrown after worker destruction.
EventWorkerDestruct
+ // EventWorkerKill thrown after worker is being forcefully killed.
+ EventWorkerKill
+
// EventWorkerError thrown any worker related even happen (passed with WorkerError)
EventWorkerError
diff --git a/server.go b/server.go
index d873da63..ef5bb413 100644
--- a/server.go
+++ b/server.go
@@ -8,13 +8,13 @@ import (
const (
// EventNewPool triggered when server creates new pool.
- EventNewPool = 3
+ EventNewPool = 60
// EventDestroyPool triggered when server destroys existed pool.
- EventDestroyPool = 4
+ EventDestroyPool = 61
)
-// Server manages pool creation and swapping.
+// Service manages pool creation and swapping.
type Server struct {
// observes pool events (can be attached to multiple pools at the same time)
observer func(event int, ctx interface{})
@@ -35,8 +35,8 @@ type Server struct {
pool Pool
}
-// NewRouter creates new router. Make sure to call configure before the usage.
-func NewRouter(cmd func() *exec.Cmd, factory Factory) *Server {
+// NewServer creates new router. Make sure to call configure before the usage.
+func NewServer(cmd func() *exec.Cmd, factory Factory) *Server {
return &Server{
cmd: cmd,
factory: factory,
diff --git a/service/bus.go b/service/bus.go
new file mode 100644
index 00000000..40e73c31
--- /dev/null
+++ b/service/bus.go
@@ -0,0 +1,165 @@
+package service
+
+import (
+ "github.com/sirupsen/logrus"
+ "net/rpc"
+ "sync"
+ "github.com/spiral/goridge"
+ "github.com/pkg/errors"
+)
+
+const (
+ rpcConfig = "rpc"
+)
+
+var (
+ dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
+)
+
+type Bus struct {
+ wg sync.WaitGroup
+ services []Service
+ enabled []Service
+ stop chan interface{}
+ rpc *rpc.Server
+ rpcConfig *RPCConfig
+}
+
+func (b *Bus) Register(s Service) {
+ b.services = append(b.services, s)
+}
+
+func (b *Bus) Services() []Service {
+ return b.services
+}
+
+func (b *Bus) Configure(cfg Config) error {
+ if segment := cfg.Get(rpcConfig); segment == nil {
+ logrus.Warn("rpc: no config has been provided")
+ } else {
+ b.rpcConfig = &RPCConfig{}
+ if err := segment.Unmarshal(b.rpcConfig); err != nil {
+ return err
+ }
+ }
+
+ b.enabled = make([]Service, 0)
+
+ for _, s := range b.services {
+ segment := cfg.Get(s.Name())
+ if segment == nil {
+ // no config has been provided for the service
+ logrus.Debugf("%s: no config has been provided", s.Name())
+ continue
+ }
+
+ if enable, err := s.Configure(segment); err != nil {
+ return err
+ } else if !enable {
+ continue
+ }
+
+ b.enabled = append(b.enabled, s)
+ }
+
+ return nil
+}
+
+func (b *Bus) RCPClient() (*rpc.Client, error) {
+ if b.rpcConfig == nil {
+ return nil, errors.New("rpc is not configured")
+ }
+
+ conn, err := b.rpcConfig.CreateDialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
+}
+
+func (b *Bus) Serve() {
+ b.rpc = rpc.NewServer()
+
+ for _, s := range b.enabled {
+ // some services might provide net/rpc api for internal communications
+ if api := s.RPC(); api != nil {
+ b.rpc.RegisterName(s.Name(), api)
+ }
+
+ b.wg.Add(1)
+ go func() {
+ defer b.wg.Done()
+
+ if err := s.Serve(); err != nil {
+ logrus.Errorf("%s.start: ", s.Name(), err)
+ }
+ }()
+ }
+
+ b.wg.Add(1)
+ go func() {
+ defer b.wg.Done()
+
+ logrus.Debug("rpc: started")
+ if err := b.serveRPC(); err != nil {
+ logrus.Errorf("rpc: %s", err)
+ }
+ }()
+
+ b.wg.Wait()
+}
+
+func (b *Bus) Stop() {
+ if err := b.stopRPC(); err != nil {
+ logrus.Errorf("rpc: ", err)
+ }
+
+ for _, s := range b.enabled {
+ if err := s.Stop(); err != nil {
+ logrus.Errorf("%s.stop: ", s.Name(), err)
+ }
+ }
+
+ b.wg.Wait()
+}
+
+func (b *Bus) serveRPC() error {
+ if b.rpcConfig == nil {
+ return nil
+ }
+
+ b.stop = make(chan interface{})
+
+ ln, err := b.rpcConfig.CreateListener()
+ if err != nil {
+ return err
+ }
+ defer ln.Close()
+
+ for {
+ select {
+ case <-b.stop:
+ b.rpc = nil
+ return nil
+ default:
+ conn, err := ln.Accept()
+ if err != nil {
+ continue
+ }
+
+ go b.rpc.ServeCodec(goridge.NewCodec(conn))
+ }
+ }
+
+ return nil
+}
+
+func (b *Bus) stopRPC() error {
+ if b.rpcConfig == nil {
+ return nil
+ }
+
+ close(b.stop)
+ return nil
+}
diff --git a/service/config.go b/service/config.go
new file mode 100644
index 00000000..d5381376
--- /dev/null
+++ b/service/config.go
@@ -0,0 +1,6 @@
+package service
+
+type Config interface {
+ Get(key string) Config
+ Unmarshal(out interface{}) error
+}
diff --git a/service/factory.go b/service/factory.go
new file mode 100644
index 00000000..dbdebc4f
--- /dev/null
+++ b/service/factory.go
@@ -0,0 +1,69 @@
+package service
+
+import (
+ "github.com/spiral/roadrunner"
+ "time"
+ "os/exec"
+ "strings"
+ "net"
+)
+
+type PoolConfig struct {
+ Command string
+ Relay string
+
+ Number uint64
+ MaxJobs uint64
+
+ Timeouts struct {
+ Allocate int
+ Destroy int
+ }
+}
+
+func (f *PoolConfig) NewServer() (*roadrunner.Server, func(), error) {
+ relays, terminator, err := f.relayFactory()
+ if err != nil {
+ terminator()
+ return nil, nil, err
+ }
+
+ rr := roadrunner.NewServer(f.cmd(), relays)
+ if err := rr.Configure(f.rrConfig()); err != nil {
+ return nil, nil, err
+ }
+
+ return rr, nil, nil
+}
+
+func (f *PoolConfig) rrConfig() roadrunner.Config {
+ return roadrunner.Config{
+ NumWorkers: f.Number,
+ MaxExecutions: f.MaxJobs,
+ AllocateTimeout: time.Second * time.Duration(f.Timeouts.Allocate),
+ DestroyTimeout: time.Second * time.Duration(f.Timeouts.Destroy),
+ }
+}
+
+func (f *PoolConfig) cmd() func() *exec.Cmd {
+ cmd := strings.Split(f.Command, " ")
+ return func() *exec.Cmd { return exec.Command(cmd[0], cmd[1:]...) }
+}
+
+func (f *PoolConfig) relayFactory() (roadrunner.Factory, func(), error) {
+ if f.Relay == "pipes" || f.Relay == "pipe" {
+ return roadrunner.NewPipeFactory(), nil, nil
+ }
+
+ dsn := strings.Split(f.Relay, "://")
+ if len(dsn) != 2 {
+ return nil, nil, dsnError
+ }
+
+ ln, err := net.Listen(dsn[0], dsn[1])
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return roadrunner.NewSocketFactory(ln, time.Minute), func() { ln.Close() }, nil
+}
diff --git a/service/rpc.go b/service/rpc.go
new file mode 100644
index 00000000..eb128768
--- /dev/null
+++ b/service/rpc.go
@@ -0,0 +1,32 @@
+package service
+
+import (
+ "net"
+ "strings"
+)
+
+type RPCConfig struct {
+ Listen string
+}
+
+func (cfg *RPCConfig) CreateListener() (net.Listener, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, dsnError
+ }
+
+ return net.Listen(dsn[0], dsn[1])
+}
+
+func (cfg *RPCConfig) CreateDialer() (net.Conn, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, dsnError
+ }
+
+ return net.Dial(dsn[0], dsn[1])
+}
+
+func NewBus() *Bus {
+ return &Bus{services: make([]Service, 0)}
+}
diff --git a/service/service.go b/service/service.go
new file mode 100644
index 00000000..2f704657
--- /dev/null
+++ b/service/service.go
@@ -0,0 +1,9 @@
+package service
+
+type Service interface {
+ Name() string
+ Configure(cfg Config) (bool, error)
+ RPC() interface{}
+ Serve() error
+ Stop() error
+}
diff --git a/static_pool.go b/static_pool.go
index be5e9b06..0527d024 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -107,7 +107,7 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
if err != nil {
// soft job errors are allowed
if _, jobError := err.(JobError); jobError {
- p.free <- w
+ p.release(w)
return nil, err
}
@@ -121,12 +121,7 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
return p.Exec(rqs)
}
- if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
- go p.replaceWorker(w, p.cfg.MaxExecutions)
- } else {
- p.free <- w
- }
-
+ p.release(w)
return rsp, nil
}
@@ -165,6 +160,16 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
}
}
+// release releases or replaces the worker.
+func (p *StaticPool) release(w *Worker) {
+ if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
+ go p.replaceWorker(w, p.cfg.MaxExecutions)
+ return
+ }
+
+ p.free <- w
+}
+
// replaceWorker replaces dead or expired worker with new instance.
func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
go p.destroyWorker(w)
@@ -183,13 +188,11 @@ func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
// destroyWorker destroys workers and removes it from the pool.
func (p *StaticPool) destroyWorker(w *Worker) {
- p.throw(EventWorkerDestruct, w)
-
// detaching
p.muw.Lock()
for i, wc := range p.workers {
if wc == w {
- p.workers = p.workers[:i+1]
+ p.workers = append(p.workers[:i], p.workers[i+1:]...)
break
}
}
@@ -200,11 +203,15 @@ func (p *StaticPool) destroyWorker(w *Worker) {
select {
case <-w.waitDone:
// worker is dead
+ p.throw(EventWorkerDestruct, w)
+
case <-time.NewTimer(p.cfg.DestroyTimeout).C:
- // failed to stop process
+ // failed to stop process in given time
if err := w.Kill(); err != nil {
p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
}
+
+ p.throw(EventWorkerKill, w)
}
}
diff --git a/utils/size.go b/utils/size.go
new file mode 100644
index 00000000..176cc9e1
--- /dev/null
+++ b/utils/size.go
@@ -0,0 +1,28 @@
+package utils
+
+import (
+ "strconv"
+ "strings"
+)
+
+func ParseSize(size string) int64 {
+ if len(size) == 0 {
+ return 0
+ }
+
+ s, err := strconv.Atoi(size[:len(size)-1])
+ if err != nil {
+ return 0
+ }
+
+ switch strings.ToLower(size[len(size)-1:]) {
+ case "k", "kb":
+ return int64(s * 1024)
+ case "m", "mb":
+ return int64(s * 1024 * 1024)
+ case "g", "gb":
+ return int64(s * 1024 * 1024 * 1024)
+ }
+
+ return 0
+}
diff --git a/worker.go b/worker.go
index cc37b69d..a29e514a 100644
--- a/worker.go
+++ b/worker.go
@@ -11,6 +11,7 @@ import (
"strconv"
"strings"
"sync"
+ "time"
)
// Worker - supervised process with api over goridge.Relay.
@@ -19,6 +20,9 @@ type Worker struct {
// can be nil while process is not started.
Pid *int
+ // Created indicates at what time worker has been created.
+ Created time.Time
+
// state holds information about current worker state,
// number of worker executions, last status change time.
// publicly this object is receive-only and protected using Mutex
@@ -26,7 +30,7 @@ type Worker struct {
state *state
// underlying command with associated process, command must be
- // provided to worker from outside in non-started form. Cmd
+ // provided to worker from outside in non-started form. Command
// stdErr direction will be handled by worker to aggregate error message.
cmd *exec.Cmd
@@ -54,6 +58,7 @@ func newWorker(cmd *exec.Cmd) (*Worker, error) {
}
w := &Worker{
+ Created: time.Now(),
cmd: cmd,
err: new(bytes.Buffer),
waitDone: make(chan interface{}),
@@ -134,15 +139,12 @@ func (w *Worker) Stop() error {
}
// Kill kills underlying process, make sure to call Wait() func to gather
-// error log from the stderr. Waits for process completion.
+// error log from the stderr. Does not waits for process completion!
func (w *Worker) Kill() error {
select {
case <-w.waitDone:
return nil
default:
- w.mu.Lock()
- defer w.mu.Unlock()
-
w.state.set(StateInactive)
err := w.cmd.Process.Signal(os.Kill)
@@ -163,7 +165,7 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
}
if w.state.Value() != StateReady {
- return nil, fmt.Errorf("worker is not ready (%s)", w.state.Value())
+ return nil, fmt.Errorf("worker is not ready (%s)", w.state.String())
}
w.state.set(StateWorking)