diff options
author | Valery Piashchynski <[email protected]> | 2020-12-17 02:34:44 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-17 02:34:44 +0300 |
commit | 9d5fe4f6a98b30fd73be8259f84fa595ac994a71 (patch) | |
tree | e49c46b03d8facc73e96f1b6247d83367cc65398 /plugins | |
parent | 1033c25b6bfc752d6059e446510f651e22cbf49b (diff) |
huge refactor
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/http/config.go | 6 | ||||
-rw-r--r-- | plugins/http/handler.go | 18 | ||||
-rw-r--r-- | plugins/http/plugin.go | 24 | ||||
-rw-r--r-- | plugins/http/request.go | 10 | ||||
-rw-r--r-- | plugins/http/response.go | 4 | ||||
-rw-r--r-- | plugins/http/tests/handler_test.go | 165 | ||||
-rw-r--r-- | plugins/http/tests/http_test.go | 3 | ||||
-rw-r--r-- | plugins/http/tests/response_test.go | 16 | ||||
-rw-r--r-- | plugins/http/tests/uploads_test.go | 27 | ||||
-rw-r--r-- | plugins/informer/plugin.go | 6 | ||||
-rw-r--r-- | plugins/informer/tests/test_plugin.go | 9 | ||||
-rw-r--r-- | plugins/resetter/tests/test_plugin.go | 6 | ||||
-rw-r--r-- | plugins/server/plugin.go | 28 | ||||
-rw-r--r-- | plugins/server/tests/plugin_pipes.go | 15 | ||||
-rw-r--r-- | plugins/server/tests/plugin_sockets.go | 10 | ||||
-rw-r--r-- | plugins/server/tests/plugin_tcp.go | 10 |
16 files changed, 188 insertions, 169 deletions
diff --git a/plugins/http/config.go b/plugins/http/config.go index d6efe310..00d2940b 100644 --- a/plugins/http/config.go +++ b/plugins/http/config.go @@ -8,7 +8,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" ) type Cidrs []*net.IPNet @@ -56,7 +56,7 @@ type Config struct { Uploads *UploadsConfig // Pool configures worker pool. - Pool *roadrunner.PoolConfig + Pool *poolImpl.Config // Env is environment variables passed to the http pool Env map[string]string @@ -141,7 +141,7 @@ func (c *Config) EnableFCGI() bool { func (c *Config) InitDefaults() error { if c.Pool == nil { // default pool - c.Pool = &roadrunner.PoolConfig{ + c.Pool = &poolImpl.Config{ Debug: false, NumWorkers: int64(runtime.NumCPU()), MaxJobs: 1000, diff --git a/plugins/http/handler.go b/plugins/http/handler.go index 74b038ff..4cc08c41 100644 --- a/plugins/http/handler.go +++ b/plugins/http/handler.go @@ -10,9 +10,9 @@ import ( "github.com/hashicorp/go-multierror" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/log" - "github.com/spiral/roadrunner/v2/util" + "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/worker" ) const ( @@ -23,8 +23,10 @@ const ( EventError ) +const MB = 1024 * 1024 + type Handle interface { - AddListener(l util.EventListener) + AddListener(l worker.EventListener) ServeHTTP(w http.ResponseWriter, r *http.Request) } @@ -71,17 +73,17 @@ type handler struct { uploads UploadsConfig trusted Cidrs log log.Logger - pool roadrunner.Pool + pool pool.Pool mul sync.Mutex - lsn util.EventListener + lsn worker.EventListener } -func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool roadrunner.Pool) (Handle, error) { +func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (Handle, error) { if pool == nil { return nil, errors.E(errors.Str("pool should be initialized")) } return &handler{ - maxRequestSize: maxReqSize * roadrunner.MB, + maxRequestSize: maxReqSize * MB, uploads: uploads, pool: pool, trusted: trusted, @@ -89,7 +91,7 @@ func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool ro } // Listen attaches handler event controller. -func (h *handler) AddListener(l util.EventListener) { +func (h *handler) AddListener(l worker.EventListener) { h.mul.Lock() defer h.mul.Unlock() diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 13299da1..9cb01d4b 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -15,10 +15,12 @@ import ( "github.com/hashicorp/go-multierror" "github.com/spiral/endure" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/log" - factory "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/interfaces/status" + "github.com/spiral/roadrunner/v2/interfaces/worker" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" "github.com/spiral/roadrunner/v2/util" @@ -47,17 +49,17 @@ type Plugin struct { sync.Mutex configurer config.Configurer - server factory.Server + server server.Server log log.Logger cfg *Config // middlewares to chain mdwr middleware // Event listener to stdout - listener util.EventListener + listener worker.EventListener // Pool which attached to all servers - pool roadrunner.Pool + pool pool.Pool // servers RR handler handler Handle @@ -69,7 +71,7 @@ type Plugin struct { } // AddListener attaches server event controller. -func (s *Plugin) AddListener(listener util.EventListener) { +func (s *Plugin) AddListener(listener worker.EventListener) { // save listeners for Reset s.listener = listener s.pool.AddListener(listener) @@ -77,7 +79,7 @@ func (s *Plugin) AddListener(listener util.EventListener) { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Server) error { +func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server server.Server) error { const op = errors.Op("http Init") err := cfg.UnmarshalKey(PluginName, &s.cfg) if err != nil { @@ -97,7 +99,7 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Serv return errors.E(op, errors.Disabled) } - s.pool, err = server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ + s.pool, err = server.NewWorkerPool(context.Background(), poolImpl.Config{ Debug: s.cfg.Pool.Debug, NumWorkers: s.cfg.Pool.NumWorkers, MaxJobs: s.cfg.Pool.MaxJobs, @@ -122,7 +124,7 @@ func (s *Plugin) logCallback(event interface{}) { s.log.Debug("http handler response received", "elapsed", ev.Elapsed().String(), "remote address", ev.Request.RemoteAddr) case ErrorEvent: s.log.Error("error event received", "elapsed", ev.Elapsed().String(), "error", ev.Error) - case roadrunner.WorkerEvent: + case worker.Event: s.log.Debug("worker event received", "event", ev.Event, "worker state", ev.Worker.State()) default: fmt.Println(event) @@ -284,7 +286,7 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // Server returns associated pool workers -func (s *Plugin) Workers() []roadrunner.WorkerBase { +func (s *Plugin) Workers() []worker.BaseProcess { return s.pool.Workers() } @@ -305,7 +307,7 @@ func (s *Plugin) Reset() error { return errors.E(op, err) } - s.pool, err = s.server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ + s.pool, err = s.server.NewWorkerPool(context.Background(), poolImpl.Config{ Debug: s.cfg.Pool.Debug, NumWorkers: s.cfg.Pool.NumWorkers, MaxJobs: s.cfg.Pool.MaxJobs, diff --git a/plugins/http/request.go b/plugins/http/request.go index 640bdec2..5df79b7d 100644 --- a/plugins/http/request.go +++ b/plugins/http/request.go @@ -9,8 +9,8 @@ import ( "strings" j "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/log" + "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/plugins/http/attributes" ) @@ -136,17 +136,17 @@ func (r *Request) Close(log log.Logger) { // Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open // files prior to calling this method. -func (r *Request) Payload() (roadrunner.Payload, error) { - p := roadrunner.Payload{} +func (r *Request) Payload() (internal.Payload, error) { + p := internal.Payload{} var err error if p.Context, err = json.Marshal(r); err != nil { - return roadrunner.EmptyPayload, err + return internal.Payload{}, err } if r.Parsed { if p.Body, err = json.Marshal(r.body); err != nil { - return roadrunner.EmptyPayload, err + return internal.Payload{}, err } } else if r.body != nil { p.Body = r.body.([]byte) diff --git a/plugins/http/response.go b/plugins/http/response.go index e3ac2756..9700a16c 100644 --- a/plugins/http/response.go +++ b/plugins/http/response.go @@ -6,7 +6,7 @@ import ( "strings" "sync" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/internal" ) // Response handles PSR7 response logic. @@ -23,7 +23,7 @@ type Response struct { } // NewResponse creates new response based on given pool payload. -func NewResponse(p roadrunner.Payload) (*Response, error) { +func NewResponse(p internal.Payload) (*Response, error) { r := &Response{Body: p.Body} if err := json.Unmarshal(p.Context, r); err != nil { return nil, err diff --git a/plugins/http/tests/handler_test.go b/plugins/http/tests/handler_test.go index 0c6a39ef..54a4ae80 100644 --- a/plugins/http/tests/handler_test.go +++ b/plugins/http/tests/handler_test.go @@ -10,7 +10,8 @@ import ( "runtime" "strings" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/pkg/pipe" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/stretchr/testify/assert" @@ -21,10 +22,10 @@ import ( ) func TestHandler_Echo(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echo", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -72,10 +73,10 @@ func Test_HandlerErrors(t *testing.T) { } func TestHandler_Headers(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "header", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -133,10 +134,10 @@ func TestHandler_Headers(t *testing.T) { } func TestHandler_Empty_User_Agent(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "user-agent", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -193,10 +194,10 @@ func TestHandler_Empty_User_Agent(t *testing.T) { } func TestHandler_User_Agent(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "user-agent", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -253,10 +254,10 @@ func TestHandler_User_Agent(t *testing.T) { } func TestHandler_Cookies(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "cookie", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -318,10 +319,10 @@ func TestHandler_Cookies(t *testing.T) { } func TestHandler_JsonPayload_POST(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "payload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -382,10 +383,10 @@ func TestHandler_JsonPayload_POST(t *testing.T) { } func TestHandler_JsonPayload_PUT(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "payload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -442,10 +443,10 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { } func TestHandler_JsonPayload_PATCH(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "payload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -502,10 +503,10 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { } func TestHandler_FormData_POST(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -575,10 +576,10 @@ func TestHandler_FormData_POST(t *testing.T) { } func TestHandler_FormData_POST_Overwrite(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -648,10 +649,10 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { } func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -720,10 +721,10 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { } func TestHandler_FormData_PUT(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -792,10 +793,10 @@ func TestHandler_FormData_PUT(t *testing.T) { } func TestHandler_FormData_PATCH(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -864,10 +865,10 @@ func TestHandler_FormData_PATCH(t *testing.T) { } func TestHandler_Multipart_POST(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -978,10 +979,10 @@ func TestHandler_Multipart_POST(t *testing.T) { } func TestHandler_Multipart_PUT(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1092,10 +1093,10 @@ func TestHandler_Multipart_PUT(t *testing.T) { } func TestHandler_Multipart_PATCH(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "data", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1208,10 +1209,10 @@ func TestHandler_Multipart_PATCH(t *testing.T) { } func TestHandler_Error(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "error", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1254,10 +1255,10 @@ func TestHandler_Error(t *testing.T) { } func TestHandler_Error2(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "error2", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1300,10 +1301,10 @@ func TestHandler_Error2(t *testing.T) { } func TestHandler_Error3(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "pid", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1359,10 +1360,10 @@ func TestHandler_Error3(t *testing.T) { } func TestHandler_ResponseDuration(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echo", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1420,10 +1421,10 @@ func TestHandler_ResponseDuration(t *testing.T) { } func TestHandler_ResponseDurationDelayed(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echoDelay", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1480,10 +1481,10 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { } func TestHandler_ErrorDuration(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "error", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1554,10 +1555,10 @@ func TestHandler_IP(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1615,10 +1616,10 @@ func TestHandler_XRealIP(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1681,10 +1682,10 @@ func TestHandler_XForwardedFor(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1746,10 +1747,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, cidrs) - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "ip", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1794,10 +1795,10 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { } func BenchmarkHandler_Listen_Echo(b *testing.B) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "echo", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go index c8dd4b38..2979949d 100644 --- a/plugins/http/tests/http_test.go +++ b/plugins/http/tests/http_test.go @@ -19,6 +19,7 @@ import ( "github.com/spiral/endure" "github.com/spiral/goridge/v3" "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/mocks" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -900,7 +901,7 @@ func TestHttpEchoErr(t *testing.T) { mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1") mockLogger.EXPECT().Debug("WORLD", "pid", gomock.Any()) - mockLogger.EXPECT().Debug("worker event received", "event", roadrunner.EventWorkerLog, "worker state", gomock.Any()) + mockLogger.EXPECT().Debug("worker event received", "event", worker.EventWorkerLog, "worker state", gomock.Any()) err = cont.RegisterAll( cfg, diff --git a/plugins/http/tests/response_test.go b/plugins/http/tests/response_test.go index 2bfe7d56..a526fe03 100644 --- a/plugins/http/tests/response_test.go +++ b/plugins/http/tests/response_test.go @@ -6,7 +6,7 @@ import ( "net/http" "testing" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/internal" http2 "github.com/spiral/roadrunner/v2/plugins/http" "github.com/stretchr/testify/assert" ) @@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error { } func TestNewResponse_Error(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{Context: []byte(`invalid payload`)}) + r, err := http2.NewResponse(internal.Payload{Context: []byte(`invalid payload`)}) assert.Error(t, err) assert.Nil(t, r) } func TestNewResponse_Write(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{ + r, err := http2.NewResponse(internal.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), Body: []byte(`sample body`), }) @@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) { } func TestNewResponse_Stream(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{ + r, err := http2.NewResponse(internal.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -92,7 +92,7 @@ func TestNewResponse_Stream(t *testing.T) { } func TestNewResponse_StreamError(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{ + r, err := http2.NewResponse(internal.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -112,7 +112,7 @@ func TestNewResponse_StreamError(t *testing.T) { } func TestWrite_HandlesPush(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{ + r, err := http2.NewResponse(internal.Payload{ Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`), }) @@ -127,7 +127,7 @@ func TestWrite_HandlesPush(t *testing.T) { } func TestWrite_HandlesTrailers(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{ + r, err := http2.NewResponse(internal.Payload{ Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`), }) @@ -146,7 +146,7 @@ func TestWrite_HandlesTrailers(t *testing.T) { } func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) { - r, err := http2.NewResponse(roadrunner.Payload{ + r, err := http2.NewResponse(internal.Payload{ Context: []byte( `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`), }) diff --git a/plugins/http/tests/uploads_test.go b/plugins/http/tests/uploads_test.go index d36d4793..f255ec91 100644 --- a/plugins/http/tests/uploads_test.go +++ b/plugins/http/tests/uploads_test.go @@ -16,7 +16,8 @@ import ( "time" j "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/pkg/pipe" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/stretchr/testify/assert" ) @@ -26,10 +27,10 @@ var json = j.ConfigCompatibleWithStandardLibrary const testFile = "uploads_test.go" func TestHandler_Upload_File(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -109,10 +110,10 @@ func TestHandler_Upload_File(t *testing.T) { } func TestHandler_Upload_NestedFile(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -192,10 +193,10 @@ func TestHandler_Upload_NestedFile(t *testing.T) { } func TestHandler_Upload_File_NoTmpDir(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -275,10 +276,10 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { } func TestHandler_Upload_File_Forbids(t *testing.T) { - pool, err := roadrunner.NewPool(context.Background(), + pool, err := poolImpl.NewPool(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../../tests/http/client.php", "upload", "pipes") }, - roadrunner.NewPipeFactory(), - roadrunner.PoolConfig{ + pipe.NewPipeFactory(), + poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go index f3013394..449be085 100644 --- a/plugins/informer/plugin.go +++ b/plugins/informer/plugin.go @@ -3,9 +3,9 @@ package informer import ( "github.com/spiral/endure" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/informer" "github.com/spiral/roadrunner/v2/interfaces/log" + "github.com/spiral/roadrunner/v2/interfaces/worker" ) const PluginName = "informer" @@ -21,8 +21,8 @@ func (p *Plugin) Init(log log.Logger) error { return nil } -// Workers provides WorkerBase slice with workers for the requested plugin -func (p *Plugin) Workers(name string) ([]roadrunner.WorkerBase, error) { +// Workers provides BaseProcess slice with workers for the requested plugin +func (p *Plugin) Workers(name string) ([]worker.BaseProcess, error) { const op = errors.Op("get workers") svc, ok := p.registry[name] if !ok { diff --git a/plugins/informer/tests/test_plugin.go b/plugins/informer/tests/test_plugin.go index 473b6de7..3fdefde3 100644 --- a/plugins/informer/tests/test_plugin.go +++ b/plugins/informer/tests/test_plugin.go @@ -4,17 +4,18 @@ import ( "context" "time" - "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/interfaces/worker" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/config" ) -var testPoolConfig = roadrunner.PoolConfig{ +var testPoolConfig = poolImpl.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - Supervisor: &roadrunner.SupervisorConfig{ + Supervisor: &poolImpl.SupervisorConfig{ WatchTick: 60, TTL: 1000, IdleTTL: 10, @@ -48,7 +49,7 @@ func (p1 *Plugin1) Name() string { return "informer.plugin1" } -func (p1 *Plugin1) Workers() []roadrunner.WorkerBase { +func (p1 *Plugin1) Workers() []worker.BaseProcess { pool, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) if err != nil { panic(err) diff --git a/plugins/resetter/tests/test_plugin.go b/plugins/resetter/tests/test_plugin.go index 9f48a43f..1d770e70 100644 --- a/plugins/resetter/tests/test_plugin.go +++ b/plugins/resetter/tests/test_plugin.go @@ -4,17 +4,17 @@ import ( "context" "time" - "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/server" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/plugins/config" ) -var testPoolConfig = roadrunner.PoolConfig{ +var testPoolConfig = poolImpl.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - Supervisor: &roadrunner.SupervisorConfig{ + Supervisor: &poolImpl.SupervisorConfig{ WatchTick: 60, TTL: 1000, IdleTTL: 10, diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index ea6d42eb..7c91bbcc 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -8,9 +8,13 @@ import ( "strings" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/log" + "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/pkg/pipe" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/socket" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" ) @@ -21,7 +25,7 @@ const PluginName = "server" type Plugin struct { cfg Config log log.Logger - factory roadrunner.Factory + factory worker.Factory } // Init application provider. @@ -93,7 +97,7 @@ func (server *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { } // NewWorker issues new standalone worker. -func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) { +func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (worker.BaseProcess, error) { const op = errors.Op("new worker") spawnCmd, err := server.CmdFactory(env) if err != nil { @@ -111,13 +115,13 @@ func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner } // NewWorkerPool issues new worker pool. -func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) { +func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env server.Env) (pool.Pool, error) { spawnCmd, err := server.CmdFactory(env) if err != nil { return nil, err } - p, err := roadrunner.NewPool(ctx, spawnCmd, server.factory, opt) + p, err := poolImpl.NewPool(ctx, spawnCmd, server.factory, opt) if err != nil { return nil, err } @@ -128,10 +132,10 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConf } // creates relay and worker factory. -func (server *Plugin) initFactory() (roadrunner.Factory, error) { +func (server *Plugin) initFactory() (worker.Factory, error) { const op = errors.Op("network factory init") if server.cfg.Relay == "" || server.cfg.Relay == "pipes" { - return roadrunner.NewPipeFactory(), nil + return pipe.NewPipeFactory(), nil } dsn := strings.Split(server.cfg.Relay, "://") @@ -147,9 +151,9 @@ func (server *Plugin) initFactory() (roadrunner.Factory, error) { switch dsn[0] { // sockets group case "unix": - return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil + return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil case "tcp": - return roadrunner.NewSocketServer(lsn, server.cfg.RelayTimeout), nil + return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil default: return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } @@ -165,11 +169,11 @@ func (server *Plugin) setEnv(e server.Env) []string { } func (server *Plugin) collectLogs(event interface{}) { - if we, ok := event.(roadrunner.WorkerEvent); ok { + if we, ok := event.(worker.Event); ok { switch we.Event { - case roadrunner.EventWorkerError: + case worker.EventWorkerError: server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid()) - case roadrunner.EventWorkerLog: + case worker.EventWorkerLog: server.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid()) } } diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go index fbd37e12..61c9a8f9 100644 --- a/plugins/server/tests/plugin_pipes.go +++ b/plugins/server/tests/plugin_pipes.go @@ -5,8 +5,11 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/internal" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -14,12 +17,12 @@ import ( const ConfigSection = "server" const Response = "test" -var testPoolConfig = roadrunner.PoolConfig{ +var testPoolConfig = poolImpl.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - Supervisor: &roadrunner.SupervisorConfig{ + Supervisor: &poolImpl.SupervisorConfig{ WatchTick: 60, TTL: 1000, IdleTTL: 10, @@ -31,7 +34,7 @@ var testPoolConfig = roadrunner.PoolConfig{ type Foo struct { configProvider config.Configurer wf server.Server - pool roadrunner.Pool + pool pool.Pool } func (f *Foo) Init(p config.Configurer, workerFactory server.Server) error { @@ -44,7 +47,7 @@ func (f *Foo) Serve() chan error { const op = errors.Op("serve") // test payload for echo - r := roadrunner.Payload{ + r := internal.Payload{ Context: nil, Body: []byte(Response), } @@ -78,7 +81,7 @@ func (f *Foo) Serve() chan error { } // test that our worker is functional - sw, err := roadrunner.NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { errCh <- err return errCh diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go index 4942d4c5..3b97efff 100644 --- a/plugins/server/tests/plugin_sockets.go +++ b/plugins/server/tests/plugin_sockets.go @@ -4,8 +4,10 @@ import ( "context" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -13,7 +15,7 @@ import ( type Foo2 struct { configProvider config.Configurer wf server.Server - pool roadrunner.Pool + pool pool.Pool } func (f *Foo2) Init(p config.Configurer, workerFactory server.Server) error { @@ -29,7 +31,7 @@ func (f *Foo2) Serve() chan error { conf := &plugin.Config{} // test payload for echo - r := roadrunner.Payload{ + r := internal.Payload{ Context: nil, Body: []byte(Response), } @@ -59,7 +61,7 @@ func (f *Foo2) Serve() chan error { } // test that our worker is functional - sw, err := roadrunner.NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { errCh <- err return errCh diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go index 89757a02..2857dadc 100644 --- a/plugins/server/tests/plugin_tcp.go +++ b/plugins/server/tests/plugin_tcp.go @@ -4,8 +4,10 @@ import ( "context" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -13,7 +15,7 @@ import ( type Foo3 struct { configProvider config.Configurer wf server.Server - pool roadrunner.Pool + pool pool.Pool } func (f *Foo3) Init(p config.Configurer, workerFactory server.Server) error { @@ -29,7 +31,7 @@ func (f *Foo3) Serve() chan error { conf := &plugin.Config{} // test payload for echo - r := roadrunner.Payload{ + r := internal.Payload{ Context: nil, Body: []byte(Response), } @@ -59,7 +61,7 @@ func (f *Foo3) Serve() chan error { } // test that our worker is functional - sw, err := roadrunner.NewSyncWorker(w) + sw, err := worker.From(w) if err != nil { errCh <- err return errCh |