diff options
author | Valery Piashchynski <[email protected]> | 2021-06-01 00:10:31 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-06-01 00:10:31 +0300 |
commit | 548ee4432e48b316ada00feec1a6b89e67ae4f2f (patch) | |
tree | 5cd2aaeeafdb50e3e46824197c721223f54695bf /pkg | |
parent | 8cd696bbca8fac2ced30d8172c41b7434ec86650 (diff) | |
parent | df4d316d519cea6dff654bd917521a616a37f769 (diff) |
#660 feat(plugin): `broadcast` and `broadcast-ws` plugins update to RR2
#660 feat(plugin): `broadcast` and `broadcast-ws` plugins update to RR2
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/bst/bst.go | 136 | ||||
-rw-r--r-- | pkg/bst/bst_test.go | 394 | ||||
-rw-r--r-- | pkg/bst/doc.go | 7 | ||||
-rw-r--r-- | pkg/bst/interface.go | 11 | ||||
-rw-r--r-- | pkg/pool/interface.go | 2 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 4 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 2 | ||||
-rw-r--r-- | pkg/pubsub/interface.go | 32 | ||||
-rw-r--r-- | pkg/pubsub/message.go | 24 | ||||
-rw-r--r-- | pkg/worker_handler/constants.go | 8 | ||||
-rw-r--r-- | pkg/worker_handler/errors.go | 25 | ||||
-rw-r--r-- | pkg/worker_handler/errors_windows.go | 27 | ||||
-rw-r--r-- | pkg/worker_handler/handler.go | 217 | ||||
-rw-r--r-- | pkg/worker_handler/parse.go | 149 | ||||
-rw-r--r-- | pkg/worker_handler/request.go | 187 | ||||
-rw-r--r-- | pkg/worker_handler/response.go | 105 | ||||
-rw-r--r-- | pkg/worker_handler/uploads.go | 159 | ||||
-rw-r--r-- | pkg/worker_watcher/interface.go | 6 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 6 |
19 files changed, 1491 insertions, 10 deletions
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go new file mode 100644 index 00000000..664937ba --- /dev/null +++ b/pkg/bst/bst.go @@ -0,0 +1,136 @@ +package bst + +// BST ... +type BST struct { + // registered topic, not unique + topic string + // associated connections with the topic + uuids map[string]struct{} + + // left and right subtrees + left *BST + right *BST +} + +func NewBST() Storage { + return &BST{ + uuids: make(map[string]struct{}, 10), + } +} + +// Insert uuid to the topic +func (b *BST) Insert(uuid string, topic string) { + curr := b + + for { + if topic == curr.topic { + curr.uuids[uuid] = struct{}{} + return + } + // if topic less than curr topic + if topic < curr.topic { + if curr.left == nil { + curr.left = &BST{ + topic: topic, + uuids: map[string]struct{}{uuid: {}}, + } + return + } + // move forward + curr = curr.left + } else { + if curr.right == nil { + curr.right = &BST{ + topic: topic, + uuids: map[string]struct{}{uuid: {}}, + } + return + } + + curr = curr.right + } + } +} + +func (b *BST) Get(topic string) map[string]struct{} { + curr := b + for curr != nil { + switch { + case topic < curr.topic: + curr = curr.left + case topic > curr.topic: + curr = curr.right + case topic == curr.topic: + return curr.uuids + } + } + + return nil +} + +func (b *BST) Remove(uuid string, topic string) { + b.removeHelper(uuid, topic, nil) +} + +func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit + curr := b + for curr != nil { + if topic < curr.topic { //nolint:gocritic + parent = curr + curr = curr.left + } else if topic > curr.topic { + parent = curr + curr = curr.right + } else { + // if more than 1 topic - remove only topic, do not remove the whole vertex + if len(curr.uuids) > 1 { + if _, ok := curr.uuids[uuid]; ok { + delete(curr.uuids, uuid) + return + } + } + + if curr.left != nil && curr.right != nil { //nolint:gocritic + curr.topic, curr.uuids = curr.right.traverseForMinString() + curr.right.removeHelper(curr.topic, uuid, curr) + } else if parent == nil { + if curr.left != nil { //nolint:gocritic + curr.topic = curr.left.topic + curr.uuids = curr.left.uuids + + curr.right = curr.left.right + curr.left = curr.left.left + } else if curr.right != nil { + curr.topic = curr.right.topic + curr.uuids = curr.right.uuids + + curr.left = curr.right.left + curr.right = curr.right.right + } else { //nolint:staticcheck + // single node tree + } + } else if parent.left == curr { + if curr.left != nil { + parent.left = curr.left + } else { + parent.left = curr.right + } + } else if parent.right == curr { + if curr.left != nil { + parent.right = curr.left + } else { + parent.right = curr.right + } + } + break + } + } +} + +//go:inline +func (b *BST) traverseForMinString() (string, map[string]struct{}) { + if b.left == nil { + return b.topic, b.uuids + } + return b.left.traverseForMinString() +} diff --git a/pkg/bst/bst_test.go b/pkg/bst/bst_test.go new file mode 100644 index 00000000..e4d4e4c3 --- /dev/null +++ b/pkg/bst/bst_test.go @@ -0,0 +1,394 @@ +package bst + +import ( + "math/rand" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +const predifined = "chat-1-2" + +func TestNewBST(t *testing.T) { + // create a new bst + g := NewBST() + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments") + } + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments2") + } + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments3") + } + + // should be 100 + exist := g.Get("comments") + assert.Len(t, exist, 100) + + // should be 100 + exist2 := g.Get("comments2") + assert.Len(t, exist2, 100) + + // should be 100 + exist3 := g.Get("comments3") + assert.Len(t, exist3, 100) +} + +func BenchmarkGraph(b *testing.B) { + g := NewBST() + + for i := 0; i < 1000; i++ { + uid := uuid.New().String() + g.Insert(uuid.NewString(), uid) + } + + g.Insert(uuid.NewString(), predifined) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + exist := g.Get(predifined) + _ = exist + } +} + +func BenchmarkBigSearch(b *testing.B) { + g1 := NewBST() + g2 := NewBST() + g3 := NewBST() + + predefinedSlice := make([]string, 0, 1000) + for i := 0; i < 1000; i++ { + predefinedSlice = append(predefinedSlice, uuid.NewString()) + } + if predefinedSlice == nil { + b.FailNow() + } + + for i := 0; i < 1000; i++ { + g1.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 1000; i++ { + g2.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 1000; i++ { + g3.Insert(uuid.NewString(), uuid.NewString()) + } + + for i := 0; i < 333; i++ { + g1.Insert(uuid.NewString(), predefinedSlice[i]) + } + + for i := 0; i < 333; i++ { + g2.Insert(uuid.NewString(), predefinedSlice[333+i]) + } + + for i := 0; i < 333; i++ { + g3.Insert(uuid.NewString(), predefinedSlice[666+i]) + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + for i := 0; i < 333; i++ { + exist := g1.Get(predefinedSlice[i]) + _ = exist + } + } + for i := 0; i < b.N; i++ { + for i := 0; i < 333; i++ { + exist := g2.Get(predefinedSlice[333+i]) + _ = exist + } + } + for i := 0; i < b.N; i++ { + for i := 0; i < 333; i++ { + exist := g3.Get(predefinedSlice[666+i]) + _ = exist + } + } +} + +func BenchmarkBigSearchWithRemoves(b *testing.B) { + g1 := NewBST() + g2 := NewBST() + g3 := NewBST() + + predefinedSlice := make([]string, 0, 1000) + for i := 0; i < 1000; i++ { + predefinedSlice = append(predefinedSlice, uuid.NewString()) + } + if predefinedSlice == nil { + b.FailNow() + } + + for i := 0; i < 1000; i++ { + g1.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 1000; i++ { + g2.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 1000; i++ { + g3.Insert(uuid.NewString(), uuid.NewString()) + } + + for i := 0; i < 333; i++ { + g1.Insert(uuid.NewString(), predefinedSlice[i]) + } + + for i := 0; i < 333; i++ { + g2.Insert(uuid.NewString(), predefinedSlice[333+i]) + } + + for i := 0; i < 333; i++ { + g3.Insert(uuid.NewString(), predefinedSlice[666+i]) + } + + go func() { + tt := time.NewTicker(time.Millisecond) + for { + select { + case <-tt.C: + num := rand.Intn(333) //nolint:gosec + values := g1.Get(predefinedSlice[num]) + for k := range values { + g1.Remove(k, predefinedSlice[num]) + } + } + } + }() + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + for i := 0; i < 333; i++ { + exist := g1.Get(predefinedSlice[i]) + _ = exist + } + } + for i := 0; i < b.N; i++ { + for i := 0; i < 333; i++ { + exist := g2.Get(predefinedSlice[333+i]) + _ = exist + } + } + for i := 0; i < b.N; i++ { + for i := 0; i < 333; i++ { + exist := g3.Get(predefinedSlice[666+i]) + _ = exist + } + } +} + +func TestGraph(t *testing.T) { + g := NewBST() + + for i := 0; i < 1000; i++ { + uid := uuid.New().String() + g.Insert(uuid.NewString(), uid) + } + + g.Insert(uuid.NewString(), predifined) + + exist := g.Get(predifined) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) +} + +func TestTreeConcurrentContains(t *testing.T) { + g := NewBST() + + key1 := uuid.NewString() + key2 := uuid.NewString() + key3 := uuid.NewString() + key4 := uuid.NewString() + key5 := uuid.NewString() + + g.Insert(key1, predifined) + g.Insert(key2, predifined) + g.Insert(key3, predifined) + g.Insert(key4, predifined) + g.Insert(key5, predifined) + + for i := 0; i < 100; i++ { + go func() { + _ = g.Get(predifined) + }() + + go func() { + _ = g.Get(predifined) + }() + + go func() { + _ = g.Get(predifined) + }() + + go func() { + _ = g.Get(predifined) + }() + } + + time.Sleep(time.Second * 2) + + exist := g.Get(predifined) + assert.NotNil(t, exist) + assert.Len(t, exist, 5) +} + +func TestGraphRemove(t *testing.T) { + g := NewBST() + + key1 := uuid.NewString() + key2 := uuid.NewString() + key3 := uuid.NewString() + key4 := uuid.NewString() + key5 := uuid.NewString() + + g.Insert(key1, predifined) + g.Insert(key2, predifined) + g.Insert(key3, predifined) + g.Insert(key4, predifined) + g.Insert(key5, predifined) + + exist := g.Get(predifined) + assert.NotNil(t, exist) + assert.Len(t, exist, 5) + + g.Remove(key1, predifined) + + exist = g.Get(predifined) + assert.NotNil(t, exist) + assert.Len(t, exist, 4) +} + +func TestBigSearch(t *testing.T) { + g1 := NewBST() + g2 := NewBST() + g3 := NewBST() + + predefinedSlice := make([]string, 0, 1000) + for i := 0; i < 1000; i++ { + predefinedSlice = append(predefinedSlice, uuid.NewString()) + } + if predefinedSlice == nil { + t.FailNow() + } + + for i := 0; i < 1000; i++ { + g1.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 1000; i++ { + g2.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 1000; i++ { + g3.Insert(uuid.NewString(), uuid.NewString()) + } + + for i := 0; i < 333; i++ { + g1.Insert(uuid.NewString(), predefinedSlice[i]) + } + + for i := 0; i < 333; i++ { + g2.Insert(uuid.NewString(), predefinedSlice[333+i]) + } + + for i := 0; i < 333; i++ { + g3.Insert(uuid.NewString(), predefinedSlice[666+i]) + } + + for i := 0; i < 333; i++ { + exist := g1.Get(predefinedSlice[i]) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) + } + + for i := 0; i < 333; i++ { + exist := g2.Get(predefinedSlice[333+i]) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) + } + + for i := 0; i < 333; i++ { + exist := g3.Get(predefinedSlice[666+i]) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) + } +} + +func TestBigSearchWithRemoves(t *testing.T) { + g1 := NewBST() + g2 := NewBST() + g3 := NewBST() + + predefinedSlice := make([]string, 0, 1000) + for i := 0; i < 1000; i++ { + predefinedSlice = append(predefinedSlice, uuid.NewString()) + } + if predefinedSlice == nil { + t.FailNow() + } + + for i := 0; i < 100000; i++ { + g1.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 100000; i++ { + g2.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 100000; i++ { + g3.Insert(uuid.NewString(), uuid.NewString()) + } + + for i := 0; i < 333; i++ { + g1.Insert(uuid.NewString(), predefinedSlice[i]) + } + + for i := 0; i < 333; i++ { + g2.Insert(uuid.NewString(), predefinedSlice[333+i]) + } + + for i := 0; i < 333; i++ { + g3.Insert(uuid.NewString(), predefinedSlice[666+i]) + } + + time.Sleep(time.Second * 1) + go func() { + tt := time.NewTicker(time.Second) + for { + select { + case <-tt.C: + num := rand.Intn(333) //nolint:gosec + values := g1.Get(predefinedSlice[num]) + for k := range values { + g1.Remove(k, predefinedSlice[num]) + } + } + } + }() + + for i := 0; i < 333; i++ { + exist := g1.Get(predefinedSlice[i]) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) + } + + for i := 0; i < 333; i++ { + exist := g2.Get(predefinedSlice[333+i]) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) + } + + for i := 0; i < 333; i++ { + exist := g3.Get(predefinedSlice[666+i]) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) + } +} diff --git a/pkg/bst/doc.go b/pkg/bst/doc.go new file mode 100644 index 00000000..abb7e6e9 --- /dev/null +++ b/pkg/bst/doc.go @@ -0,0 +1,7 @@ +package bst + +/* +Binary search tree for the pubsub + +The vertex may have one or multiply topics associated with the single websocket connection UUID +*/ diff --git a/pkg/bst/interface.go b/pkg/bst/interface.go new file mode 100644 index 00000000..ecf40414 --- /dev/null +++ b/pkg/bst/interface.go @@ -0,0 +1,11 @@ +package bst + +// Storage is general in-memory BST storage implementation +type Storage interface { + // Insert inserts to a vertex with topic ident connection uuid + Insert(uuid string, topic string) + // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed + Remove(uuid, topic string) + // Get will return all connections associated with the topic + Get(topic string) map[string]struct{} +} diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index 4ef2f2e7..c22fbbd3 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -18,7 +18,7 @@ type Pool interface { // Workers returns worker list associated with the pool. Workers() (workers []worker.BaseProcess) - // Remove worker from the pool. + // RemoveWorker removes worker from the pool. RemoveWorker(worker worker.BaseProcess) error // Destroy all underlying stack (but let them to complete the task). diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index d57cc95c..b5d97b8b 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -47,7 +47,7 @@ type StaticPool struct { allocator worker.Allocator // err_encoder is the default Exec error encoder - err_encoder ErrorEncoder //nolint:golint,stylecheck + err_encoder ErrorEncoder //nolint:stylecheck } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. @@ -245,7 +245,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return func(err error, w worker.BaseProcess) (payload.Payload, error) { - const op = errors.Op("error encoder") + const op = errors.Op("error_encoder") // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 40903db3..ca61dbc4 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -15,7 +15,7 @@ import ( const MB = 1024 * 1024 // NSEC_IN_SEC nanoseconds in second -const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck +const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck type Supervised interface { Pool diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go new file mode 100644 index 00000000..caf8783f --- /dev/null +++ b/pkg/pubsub/interface.go @@ -0,0 +1,32 @@ +package pubsub + +// PubSub ... +type PubSub interface { + Publisher + Subscriber + Reader +} + +// Subscriber defines the ability to operate as message passing broker. +type Subscriber interface { + // Subscribe broker to one or multiple topics. + Subscribe(topics ...string) error + + // Unsubscribe from one or multiply topics + Unsubscribe(topics ...string) error +} + +// Publisher publish one or more messages +type Publisher interface { + // Publish one or multiple Channel. + Publish(messages []*Message) error + + // PublishAsync publish message and return immediately + // If error occurred it will be printed into the logger + PublishAsync(messages []*Message) +} + +// Reader interface should return next message +type Reader interface { + Next() (*Message, error) +} diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go new file mode 100644 index 00000000..c17d153b --- /dev/null +++ b/pkg/pubsub/message.go @@ -0,0 +1,24 @@ +package pubsub + +import ( + json "github.com/json-iterator/go" +) + +type Message struct { + // Command (join, leave, headers) + Command string `json:"command"` + + // Broker (redis, memory) + Broker string `json:"broker"` + + // Topic message been pushed into. + Topics []string `json:"topic"` + + // Payload to be broadcasted + Payload []byte `json:"payload"` +} + +// MarshalBinary needed to marshal message for the redis +func (m *Message) MarshalBinary() ([]byte, error) { + return json.Marshal(m) +} diff --git a/pkg/worker_handler/constants.go b/pkg/worker_handler/constants.go new file mode 100644 index 00000000..3355d9c2 --- /dev/null +++ b/pkg/worker_handler/constants.go @@ -0,0 +1,8 @@ +package handler + +import "net/http" + +var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push") + +// TrailerHeaderKey http header key +var TrailerHeaderKey = http.CanonicalHeaderKey("trailer") diff --git a/pkg/worker_handler/errors.go b/pkg/worker_handler/errors.go new file mode 100644 index 00000000..5fa8e64e --- /dev/null +++ b/pkg/worker_handler/errors.go @@ -0,0 +1,25 @@ +// +build !windows + +package handler + +import ( + "errors" + "net" + "os" + "syscall" +) + +// Broken pipe +var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer") + +// handleWriteError just check if error was caused by aborted connection on linux +func handleWriteError(err error) error { + if netErr, ok2 := err.(*net.OpError); ok2 { + if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { + if errors.Is(syscallErr.Err, syscall.EPIPE) { + return errEPIPE + } + } + } + return err +} diff --git a/pkg/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go new file mode 100644 index 00000000..390cc7d1 --- /dev/null +++ b/pkg/worker_handler/errors_windows.go @@ -0,0 +1,27 @@ +// +build windows + +package handler + +import ( + "errors" + "net" + "os" + "syscall" +) + +//Software caused connection abort. +//An established connection was aborted by the software in your host computer, +//possibly due to a data transmission time-out or protocol error. +var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer") + +// handleWriteError just check if error was caused by aborted connection on windows +func handleWriteError(err error) error { + if netErr, ok2 := err.(*net.OpError); ok2 { + if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { + if syscallErr.Err == syscall.WSAECONNABORTED { + return errEPIPE + } + } + } + return err +} diff --git a/pkg/worker_handler/handler.go b/pkg/worker_handler/handler.go new file mode 100644 index 00000000..e0d1aae0 --- /dev/null +++ b/pkg/worker_handler/handler.go @@ -0,0 +1,217 @@ +package handler + +import ( + "net" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/http/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// MB is 1024 bytes +const MB uint64 = 1024 * 1024 + +// ErrorEvent represents singular http error event. +type ErrorEvent struct { + // Request contains client request, must not be stored. + Request *http.Request + + // Error - associated error, if any. + Error error + + // event timings + start time.Time + elapsed time.Duration +} + +// Elapsed returns duration of the invocation. +func (e *ErrorEvent) Elapsed() time.Duration { + return e.elapsed +} + +// ResponseEvent represents singular http response event. +type ResponseEvent struct { + // Request contains client request, must not be stored. + Request *Request + + // Response contains service response. + Response *Response + + // event timings + start time.Time + elapsed time.Duration +} + +// Elapsed returns duration of the invocation. +func (e *ResponseEvent) Elapsed() time.Duration { + return e.elapsed +} + +// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, +// parsed files and query, payload will include parsed form dataTree (if any). +type Handler struct { + maxRequestSize uint64 + uploads config.Uploads + trusted config.Cidrs + log logger.Logger + pool pool.Pool + mul sync.Mutex + lsn events.Listener +} + +// NewHandler return handle interface implementation +func NewHandler(maxReqSize uint64, uploads config.Uploads, trusted config.Cidrs, pool pool.Pool) (*Handler, error) { + if pool == nil { + return nil, errors.E(errors.Str("pool should be initialized")) + } + return &Handler{ + maxRequestSize: maxReqSize * MB, + uploads: uploads, + pool: pool, + trusted: trusted, + }, nil +} + +// AddListener attaches handler event controller. +func (h *Handler) AddListener(l events.Listener) { + h.mul.Lock() + defer h.mul.Unlock() + + h.lsn = l +} + +// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + const op = errors.Op("serve_http") + start := time.Now() + + // validating request size + if h.maxRequestSize != 0 { + const op = errors.Op("http_handler_max_size") + if length := r.Header.Get("content-length"); length != "" { + // try to parse the value from the `content-length` header + size, err := strconv.ParseInt(length, 10, 64) + if err != nil { + // if got an error while parsing -> assign 500 code to the writer and return + http.Error(w, errors.E(op, err).Error(), 500) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("error while parsing value from the `content-length` header")), start: start, elapsed: time.Since(start)}) + return + } + + if size > int64(h.maxRequestSize) { + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("request body max size is exceeded")), start: start, elapsed: time.Since(start)}) + http.Error(w, errors.E(op, errors.Str("request body max size is exceeded")).Error(), 500) + return + } + } + } + + req, err := NewRequest(r, h.uploads) + if err != nil { + // if pipe is broken, there is no sense to write the header + // in this case we just report about error + if err == errEPIPE { + h.sendEvent(ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)}) + return + } + + http.Error(w, errors.E(op, err).Error(), 500) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) + return + } + + // proxy IP resolution + h.resolveIP(req) + + req.Open(h.log) + defer req.Close(h.log) + + p, err := req.Payload() + if err != nil { + http.Error(w, errors.E(op, err).Error(), 500) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) + return + } + + rsp, err := h.pool.Exec(p) + if err != nil { + http.Error(w, errors.E(op, err).Error(), 500) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) + return + } + + resp, err := NewResponse(rsp) + if err != nil { + http.Error(w, errors.E(op, err).Error(), resp.Status) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) + return + } + + h.handleResponse(req, resp, start) + err = resp.Write(w) + if err != nil { + http.Error(w, errors.E(op, err).Error(), 500) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) + } +} + +// handleResponse triggers response event. +func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) { + h.sendEvent(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)}) +} + +// sendEvent invokes event handler if any. +func (h *Handler) sendEvent(event interface{}) { + if h.lsn != nil { + h.lsn(event) + } +} + +// get real ip passing multiple proxy +func (h *Handler) resolveIP(r *Request) { + if h.trusted.IsTrusted(r.RemoteAddr) == false { //nolint:gosimple + return + } + + if r.Header.Get("X-Forwarded-For") != "" { + ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",") + ipCount := len(ips) + + for i := ipCount - 1; i >= 0; i-- { + addr := strings.TrimSpace(ips[i]) + if net.ParseIP(addr) != nil { + r.RemoteAddr = addr + return + } + } + + return + } + + // The logic here is the following: + // In general case, we only expect X-Real-Ip header. If it exist, we get the IP address from header and set request Remote address + // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers + // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF. + // CF-Connecting-IP is an Enterprise feature and we check it last in order. + // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string + if r.Header.Get("X-Real-Ip") != "" { + r.RemoteAddr = FetchIP(r.Header.Get("X-Real-Ip")) + return + } + + if r.Header.Get("True-Client-IP") != "" { + r.RemoteAddr = FetchIP(r.Header.Get("True-Client-IP")) + return + } + + if r.Header.Get("CF-Connecting-IP") != "" { + r.RemoteAddr = FetchIP(r.Header.Get("CF-Connecting-IP")) + } +} diff --git a/pkg/worker_handler/parse.go b/pkg/worker_handler/parse.go new file mode 100644 index 00000000..2790da2a --- /dev/null +++ b/pkg/worker_handler/parse.go @@ -0,0 +1,149 @@ +package handler + +import ( + "net/http" + + "github.com/spiral/roadrunner/v2/plugins/http/config" +) + +// MaxLevel defines maximum tree depth for incoming request data and files. +const MaxLevel = 127 + +type dataTree map[string]interface{} +type fileTree map[string]interface{} + +// parseData parses incoming request body into data tree. +func parseData(r *http.Request) dataTree { + data := make(dataTree) + if r.PostForm != nil { + for k, v := range r.PostForm { + data.push(k, v) + } + } + + if r.MultipartForm != nil { + for k, v := range r.MultipartForm.Value { + data.push(k, v) + } + } + + return data +} + +// pushes value into data tree. +func (d dataTree) push(k string, v []string) { + keys := FetchIndexes(k) + if len(keys) <= MaxLevel { + d.mount(keys, v) + } +} + +// mount mounts data tree recursively. +func (d dataTree) mount(i []string, v []string) { + if len(i) == 1 { + // single value context (last element) + d[i[0]] = v[len(v)-1] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(dataTree).mount(i[1:], v) + return + } + + d[i[0]] = make(dataTree) + d[i[0]].(dataTree).mount(i[1:], v) +} + +// parse incoming dataTree request into JSON (including contentMultipart form dataTree) +func parseUploads(r *http.Request, cfg config.Uploads) *Uploads { + u := &Uploads{ + cfg: cfg, + tree: make(fileTree), + list: make([]*FileUpload, 0), + } + + for k, v := range r.MultipartForm.File { + files := make([]*FileUpload, 0, len(v)) + for _, f := range v { + files = append(files, NewUpload(f)) + } + + u.list = append(u.list, files...) + u.tree.push(k, files) + } + + return u +} + +// pushes new file upload into it's proper place. +func (d fileTree) push(k string, v []*FileUpload) { + keys := FetchIndexes(k) + if len(keys) <= MaxLevel { + d.mount(keys, v) + } +} + +// mount mounts data tree recursively. +func (d fileTree) mount(i []string, v []*FileUpload) { + if len(i) == 1 { + // single value context + d[i[0]] = v[0] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(fileTree).mount(i[1:], v) + return + } + + d[i[0]] = make(fileTree) + d[i[0]].(fileTree).mount(i[1:], v) +} + +// FetchIndexes parses input name and splits it into separate indexes list. +func FetchIndexes(s string) []string { + var ( + pos int + ch string + keys = make([]string, 1) + ) + + for _, c := range s { + ch = string(c) + switch ch { + case " ": + // ignore all spaces + continue + case "[": + pos = 1 + continue + case "]": + if pos == 1 { + keys = append(keys, "") + } + pos = 2 + default: + if pos == 1 || pos == 2 { + keys = append(keys, "") + } + + keys[len(keys)-1] += ch + pos = 0 + } + } + + return keys +} diff --git a/pkg/worker_handler/request.go b/pkg/worker_handler/request.go new file mode 100644 index 00000000..75ee8381 --- /dev/null +++ b/pkg/worker_handler/request.go @@ -0,0 +1,187 @@ +package handler + +import ( + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "strings" + + j "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/plugins/http/attributes" + "github.com/spiral/roadrunner/v2/plugins/http/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +var json = j.ConfigCompatibleWithStandardLibrary + +const ( + defaultMaxMemory = 32 << 20 // 32 MB + contentNone = iota + 900 + contentStream + contentMultipart + contentFormData +) + +// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. +type Request struct { + // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address. + RemoteAddr string `json:"remoteAddr"` + + // Protocol includes HTTP protocol version. + Protocol string `json:"protocol"` + + // Method contains name of HTTP method used for the request. + Method string `json:"method"` + + // URI contains full request URI with scheme and query. + URI string `json:"uri"` + + // Header contains list of request headers. + Header http.Header `json:"headers"` + + // Cookies contains list of request cookies. + Cookies map[string]string `json:"cookies"` + + // RawQuery contains non parsed query string (to be parsed on php end). + RawQuery string `json:"rawQuery"` + + // Parsed indicates that request body has been parsed on RR end. + Parsed bool `json:"parsed"` + + // Uploads contains list of uploaded files, their names, sized and associations with temporary files. + Uploads *Uploads `json:"uploads"` + + // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions. + Attributes map[string]interface{} `json:"attributes"` + + // request body can be parsedData or []byte + body interface{} +} + +func FetchIP(pair string) string { + if !strings.ContainsRune(pair, ':') { + return pair + } + + addr, _, _ := net.SplitHostPort(pair) + return addr +} + +// NewRequest creates new PSR7 compatible request using net/http request. +func NewRequest(r *http.Request, cfg config.Uploads) (*Request, error) { + req := &Request{ + RemoteAddr: FetchIP(r.RemoteAddr), + Protocol: r.Proto, + Method: r.Method, + URI: URI(r), + Header: r.Header, + Cookies: make(map[string]string), + RawQuery: r.URL.RawQuery, + Attributes: attributes.All(r), + } + + for _, c := range r.Cookies() { + if v, err := url.QueryUnescape(c.Value); err == nil { + req.Cookies[c.Name] = v + } + } + + switch req.contentType() { + case contentNone: + return req, nil + + case contentStream: + var err error + req.body, err = ioutil.ReadAll(r.Body) + return req, err + + case contentMultipart: + if err := r.ParseMultipartForm(defaultMaxMemory); err != nil { + return nil, err + } + + req.Uploads = parseUploads(r, cfg) + fallthrough + case contentFormData: + if err := r.ParseForm(); err != nil { + return nil, err + } + + req.body = parseData(r) + } + + req.Parsed = true + return req, nil +} + +// Open moves all uploaded files to temporary directory so it can be given to php later. +func (r *Request) Open(log logger.Logger) { + if r.Uploads == nil { + return + } + + r.Uploads.Open(log) +} + +// Close clears all temp file uploads +func (r *Request) Close(log logger.Logger) { + if r.Uploads == nil { + return + } + + r.Uploads.Clear(log) +} + +// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open +// files prior to calling this method. +func (r *Request) Payload() (payload.Payload, error) { + p := payload.Payload{} + + var err error + if p.Context, err = json.Marshal(r); err != nil { + return payload.Payload{}, err + } + + if r.Parsed { + if p.Body, err = json.Marshal(r.body); err != nil { + return payload.Payload{}, err + } + } else if r.body != nil { + p.Body = r.body.([]byte) + } + + return p, nil +} + +// contentType returns the payload content type. +func (r *Request) contentType() int { + if r.Method == "HEAD" || r.Method == "OPTIONS" { + return contentNone + } + + ct := r.Header.Get("content-type") + if strings.Contains(ct, "application/x-www-form-urlencoded") { + return contentFormData + } + + if strings.Contains(ct, "multipart/form-data") { + return contentMultipart + } + + return contentStream +} + +// URI fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). +func URI(r *http.Request) string { + if r.URL.Host != "" { + return r.URL.String() + } + if r.TLS != nil { + return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) + } + + return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) +} diff --git a/pkg/worker_handler/response.go b/pkg/worker_handler/response.go new file mode 100644 index 00000000..1763d304 --- /dev/null +++ b/pkg/worker_handler/response.go @@ -0,0 +1,105 @@ +package handler + +import ( + "io" + "net/http" + "strings" + "sync" + + "github.com/spiral/roadrunner/v2/pkg/payload" +) + +// Response handles PSR7 response logic. +type Response struct { + // Status contains response status. + Status int `json:"status"` + + // Header contains list of response headers. + Headers map[string][]string `json:"headers"` + + // associated Body payload. + Body interface{} + sync.Mutex +} + +// NewResponse creates new response based on given pool payload. +func NewResponse(p payload.Payload) (*Response, error) { + r := &Response{Body: p.Body} + if err := json.Unmarshal(p.Context, r); err != nil { + return nil, err + } + + return r, nil +} + +// Write writes response headers, status and body into ResponseWriter. +func (r *Response) Write(w http.ResponseWriter) error { + // INFO map is the reference type in golang + p := handlePushHeaders(r.Headers) + if pusher, ok := w.(http.Pusher); ok { + for _, v := range p { + err := pusher.Push(v, nil) + if err != nil { + return err + } + } + } + + handleTrailers(r.Headers) + for n, h := range r.Headers { + for _, v := range h { + w.Header().Add(n, v) + } + } + + w.WriteHeader(r.Status) + + if data, ok := r.Body.([]byte); ok { + _, err := w.Write(data) + if err != nil { + return handleWriteError(err) + } + } + + if rc, ok := r.Body.(io.Reader); ok { + if _, err := io.Copy(w, rc); err != nil { + return err + } + } + + return nil +} + +func handlePushHeaders(h map[string][]string) []string { + var p []string + pushHeader, ok := h[http2pushHeaderKey] + if !ok { + return p + } + + p = append(p, pushHeader...) + + delete(h, http2pushHeaderKey) + + return p +} + +func handleTrailers(h map[string][]string) { + trailers, ok := h[TrailerHeaderKey] + if !ok { + return + } + + for _, tr := range trailers { + for _, n := range strings.Split(tr, ",") { + n = strings.Trim(n, "\t ") + if v, ok := h[n]; ok { + h["Trailer:"+n] = v + + delete(h, n) + } + } + } + + delete(h, TrailerHeaderKey) +} diff --git a/pkg/worker_handler/uploads.go b/pkg/worker_handler/uploads.go new file mode 100644 index 00000000..e695000e --- /dev/null +++ b/pkg/worker_handler/uploads.go @@ -0,0 +1,159 @@ +package handler + +import ( + "github.com/spiral/roadrunner/v2/plugins/http/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + + "io" + "io/ioutil" + "mime/multipart" + "os" + "sync" +) + +const ( + // UploadErrorOK - no error, the file uploaded with success. + UploadErrorOK = 0 + + // UploadErrorNoFile - no file was uploaded. + UploadErrorNoFile = 4 + + // UploadErrorNoTmpDir - missing a temporary folder. + UploadErrorNoTmpDir = 6 + + // UploadErrorCantWrite - failed to write file to disk. + UploadErrorCantWrite = 7 + + // UploadErrorExtension - forbidden file extension. + UploadErrorExtension = 8 +) + +// Uploads tree manages uploaded files tree and temporary files. +type Uploads struct { + // associated temp directory and forbidden extensions. + cfg config.Uploads + + // pre processed data tree for Uploads. + tree fileTree + + // flat list of all file Uploads. + list []*FileUpload +} + +// MarshalJSON marshal tree tree into JSON. +func (u *Uploads) MarshalJSON() ([]byte, error) { + return json.Marshal(u.tree) +} + +// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors +// will be handled individually. +func (u *Uploads) Open(log logger.Logger) { + var wg sync.WaitGroup + for _, f := range u.list { + wg.Add(1) + go func(f *FileUpload) { + defer wg.Done() + err := f.Open(u.cfg) + if err != nil && log != nil { + log.Error("error opening the file", "err", err) + } + }(f) + } + + wg.Wait() +} + +// Clear deletes all temporary files. +func (u *Uploads) Clear(log logger.Logger) { + for _, f := range u.list { + if f.TempFilename != "" && exists(f.TempFilename) { + err := os.Remove(f.TempFilename) + if err != nil && log != nil { + log.Error("error removing the file", "err", err) + } + } + } +} + +// FileUpload represents singular file NewUpload. +type FileUpload struct { + // ID contains filename specified by the client. + Name string `json:"name"` + + // Mime contains mime-type provided by the client. + Mime string `json:"mime"` + + // Size of the uploaded file. + Size int64 `json:"size"` + + // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php + Error int `json:"error"` + + // TempFilename points to temporary file location. + TempFilename string `json:"tmpName"` + + // associated file header + header *multipart.FileHeader +} + +// NewUpload wraps net/http upload into PRS-7 compatible structure. +func NewUpload(f *multipart.FileHeader) *FileUpload { + return &FileUpload{ + Name: f.Filename, + Mime: f.Header.Get("Content-Type"), + Error: UploadErrorOK, + header: f, + } +} + +// Open moves file content into temporary file available for PHP. +// NOTE: +// There is 2 deferred functions, and in case of getting 2 errors from both functions +// error from close of temp file would be overwritten by error from the main file +// STACK +// DEFER FILE CLOSE (2) +// DEFER TMP CLOSE (1) +func (f *FileUpload) Open(cfg config.Uploads) (err error) { + if cfg.Forbids(f.Name) { + f.Error = UploadErrorExtension + return nil + } + + file, err := f.header.Open() + if err != nil { + f.Error = UploadErrorNoFile + return err + } + + defer func() { + // close the main file + err = file.Close() + }() + + tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload") + if err != nil { + // most likely cause of this issue is missing tmp dir + f.Error = UploadErrorNoTmpDir + return err + } + + f.TempFilename = tmp.Name() + defer func() { + // close the temp file + err = tmp.Close() + }() + + if f.Size, err = io.Copy(tmp, file); err != nil { + f.Error = UploadErrorCantWrite + } + + return err +} + +// exists if file exists. +func exists(path string) bool { + if _, err := os.Stat(path); os.IsNotExist(err) { + return false + } + return true +} diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go index 4625b7a7..29fa3640 100644 --- a/pkg/worker_watcher/interface.go +++ b/pkg/worker_watcher/interface.go @@ -1,4 +1,4 @@ -package worker_watcher //nolint:golint,stylecheck +package worker_watcher //nolint:stylecheck import ( "context" @@ -23,9 +23,9 @@ type Watcher interface { // Destroy destroys the underlying container Destroy(ctx context.Context) - // WorkersList return all container w/o removing it from internal storage + // List return all container w/o removing it from internal storage List() []worker.BaseProcess - // RemoveWorker remove worker from the container + // Remove will remove worker from the container Remove(wb worker.BaseProcess) } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 5aec4ee6..557563ac 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -1,4 +1,4 @@ -package worker_watcher //nolint:golint,stylecheck +package worker_watcher //nolint:stylecheck import ( "context" @@ -11,7 +11,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" ) -// workerCreateFunc can be nil, but in that case, dead container will not be replaced +// NewSyncWorkerWatcher is a constructor for the Watcher func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher { ww := &workerWatcher{ container: container.NewVector(numWorkers), @@ -215,7 +215,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } } -// Warning, this is O(n) operation, and it will return copy of the actual workers +// List - this is O(n) operation, and it will return copy of the actual workers func (ww *workerWatcher) List() []worker.BaseProcess { ww.RLock() defer ww.RUnlock() |