summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-01 00:10:31 +0300
committerGitHub <[email protected]>2021-06-01 00:10:31 +0300
commit548ee4432e48b316ada00feec1a6b89e67ae4f2f (patch)
tree5cd2aaeeafdb50e3e46824197c721223f54695bf /pkg
parent8cd696bbca8fac2ced30d8172c41b7434ec86650 (diff)
parentdf4d316d519cea6dff654bd917521a616a37f769 (diff)
#660 feat(plugin): `broadcast` and `broadcast-ws` plugins update to RR2
#660 feat(plugin): `broadcast` and `broadcast-ws` plugins update to RR2
Diffstat (limited to 'pkg')
-rw-r--r--pkg/bst/bst.go136
-rw-r--r--pkg/bst/bst_test.go394
-rw-r--r--pkg/bst/doc.go7
-rw-r--r--pkg/bst/interface.go11
-rw-r--r--pkg/pool/interface.go2
-rwxr-xr-xpkg/pool/static_pool.go4
-rwxr-xr-xpkg/pool/supervisor_pool.go2
-rw-r--r--pkg/pubsub/interface.go32
-rw-r--r--pkg/pubsub/message.go24
-rw-r--r--pkg/worker_handler/constants.go8
-rw-r--r--pkg/worker_handler/errors.go25
-rw-r--r--pkg/worker_handler/errors_windows.go27
-rw-r--r--pkg/worker_handler/handler.go217
-rw-r--r--pkg/worker_handler/parse.go149
-rw-r--r--pkg/worker_handler/request.go187
-rw-r--r--pkg/worker_handler/response.go105
-rw-r--r--pkg/worker_handler/uploads.go159
-rw-r--r--pkg/worker_watcher/interface.go6
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go6
19 files changed, 1491 insertions, 10 deletions
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go
new file mode 100644
index 00000000..664937ba
--- /dev/null
+++ b/pkg/bst/bst.go
@@ -0,0 +1,136 @@
+package bst
+
+// BST ...
+type BST struct {
+ // registered topic, not unique
+ topic string
+ // associated connections with the topic
+ uuids map[string]struct{}
+
+ // left and right subtrees
+ left *BST
+ right *BST
+}
+
+func NewBST() Storage {
+ return &BST{
+ uuids: make(map[string]struct{}, 10),
+ }
+}
+
+// Insert uuid to the topic
+func (b *BST) Insert(uuid string, topic string) {
+ curr := b
+
+ for {
+ if topic == curr.topic {
+ curr.uuids[uuid] = struct{}{}
+ return
+ }
+ // if topic less than curr topic
+ if topic < curr.topic {
+ if curr.left == nil {
+ curr.left = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+ // move forward
+ curr = curr.left
+ } else {
+ if curr.right == nil {
+ curr.right = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+
+ curr = curr.right
+ }
+ }
+}
+
+func (b *BST) Get(topic string) map[string]struct{} {
+ curr := b
+ for curr != nil {
+ switch {
+ case topic < curr.topic:
+ curr = curr.left
+ case topic > curr.topic:
+ curr = curr.right
+ case topic == curr.topic:
+ return curr.uuids
+ }
+ }
+
+ return nil
+}
+
+func (b *BST) Remove(uuid string, topic string) {
+ b.removeHelper(uuid, topic, nil)
+}
+
+func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit
+ curr := b
+ for curr != nil {
+ if topic < curr.topic { //nolint:gocritic
+ parent = curr
+ curr = curr.left
+ } else if topic > curr.topic {
+ parent = curr
+ curr = curr.right
+ } else {
+ // if more than 1 topic - remove only topic, do not remove the whole vertex
+ if len(curr.uuids) > 1 {
+ if _, ok := curr.uuids[uuid]; ok {
+ delete(curr.uuids, uuid)
+ return
+ }
+ }
+
+ if curr.left != nil && curr.right != nil { //nolint:gocritic
+ curr.topic, curr.uuids = curr.right.traverseForMinString()
+ curr.right.removeHelper(curr.topic, uuid, curr)
+ } else if parent == nil {
+ if curr.left != nil { //nolint:gocritic
+ curr.topic = curr.left.topic
+ curr.uuids = curr.left.uuids
+
+ curr.right = curr.left.right
+ curr.left = curr.left.left
+ } else if curr.right != nil {
+ curr.topic = curr.right.topic
+ curr.uuids = curr.right.uuids
+
+ curr.left = curr.right.left
+ curr.right = curr.right.right
+ } else { //nolint:staticcheck
+ // single node tree
+ }
+ } else if parent.left == curr {
+ if curr.left != nil {
+ parent.left = curr.left
+ } else {
+ parent.left = curr.right
+ }
+ } else if parent.right == curr {
+ if curr.left != nil {
+ parent.right = curr.left
+ } else {
+ parent.right = curr.right
+ }
+ }
+ break
+ }
+ }
+}
+
+//go:inline
+func (b *BST) traverseForMinString() (string, map[string]struct{}) {
+ if b.left == nil {
+ return b.topic, b.uuids
+ }
+ return b.left.traverseForMinString()
+}
diff --git a/pkg/bst/bst_test.go b/pkg/bst/bst_test.go
new file mode 100644
index 00000000..e4d4e4c3
--- /dev/null
+++ b/pkg/bst/bst_test.go
@@ -0,0 +1,394 @@
+package bst
+
+import (
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+)
+
+const predifined = "chat-1-2"
+
+func TestNewBST(t *testing.T) {
+ // create a new bst
+ g := NewBST()
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments2")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments3")
+ }
+
+ // should be 100
+ exist := g.Get("comments")
+ assert.Len(t, exist, 100)
+
+ // should be 100
+ exist2 := g.Get("comments2")
+ assert.Len(t, exist2, 100)
+
+ // should be 100
+ exist3 := g.Get("comments3")
+ assert.Len(t, exist3, 100)
+}
+
+func BenchmarkGraph(b *testing.B) {
+ g := NewBST()
+
+ for i := 0; i < 1000; i++ {
+ uid := uuid.New().String()
+ g.Insert(uuid.NewString(), uid)
+ }
+
+ g.Insert(uuid.NewString(), predifined)
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ exist := g.Get(predifined)
+ _ = exist
+ }
+}
+
+func BenchmarkBigSearch(b *testing.B) {
+ g1 := NewBST()
+ g2 := NewBST()
+ g3 := NewBST()
+
+ predefinedSlice := make([]string, 0, 1000)
+ for i := 0; i < 1000; i++ {
+ predefinedSlice = append(predefinedSlice, uuid.NewString())
+ }
+ if predefinedSlice == nil {
+ b.FailNow()
+ }
+
+ for i := 0; i < 1000; i++ {
+ g1.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 1000; i++ {
+ g2.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 1000; i++ {
+ g3.Insert(uuid.NewString(), uuid.NewString())
+ }
+
+ for i := 0; i < 333; i++ {
+ g1.Insert(uuid.NewString(), predefinedSlice[i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g2.Insert(uuid.NewString(), predefinedSlice[333+i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g3.Insert(uuid.NewString(), predefinedSlice[666+i])
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ for i := 0; i < 333; i++ {
+ exist := g1.Get(predefinedSlice[i])
+ _ = exist
+ }
+ }
+ for i := 0; i < b.N; i++ {
+ for i := 0; i < 333; i++ {
+ exist := g2.Get(predefinedSlice[333+i])
+ _ = exist
+ }
+ }
+ for i := 0; i < b.N; i++ {
+ for i := 0; i < 333; i++ {
+ exist := g3.Get(predefinedSlice[666+i])
+ _ = exist
+ }
+ }
+}
+
+func BenchmarkBigSearchWithRemoves(b *testing.B) {
+ g1 := NewBST()
+ g2 := NewBST()
+ g3 := NewBST()
+
+ predefinedSlice := make([]string, 0, 1000)
+ for i := 0; i < 1000; i++ {
+ predefinedSlice = append(predefinedSlice, uuid.NewString())
+ }
+ if predefinedSlice == nil {
+ b.FailNow()
+ }
+
+ for i := 0; i < 1000; i++ {
+ g1.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 1000; i++ {
+ g2.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 1000; i++ {
+ g3.Insert(uuid.NewString(), uuid.NewString())
+ }
+
+ for i := 0; i < 333; i++ {
+ g1.Insert(uuid.NewString(), predefinedSlice[i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g2.Insert(uuid.NewString(), predefinedSlice[333+i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g3.Insert(uuid.NewString(), predefinedSlice[666+i])
+ }
+
+ go func() {
+ tt := time.NewTicker(time.Millisecond)
+ for {
+ select {
+ case <-tt.C:
+ num := rand.Intn(333) //nolint:gosec
+ values := g1.Get(predefinedSlice[num])
+ for k := range values {
+ g1.Remove(k, predefinedSlice[num])
+ }
+ }
+ }
+ }()
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ for i := 0; i < 333; i++ {
+ exist := g1.Get(predefinedSlice[i])
+ _ = exist
+ }
+ }
+ for i := 0; i < b.N; i++ {
+ for i := 0; i < 333; i++ {
+ exist := g2.Get(predefinedSlice[333+i])
+ _ = exist
+ }
+ }
+ for i := 0; i < b.N; i++ {
+ for i := 0; i < 333; i++ {
+ exist := g3.Get(predefinedSlice[666+i])
+ _ = exist
+ }
+ }
+}
+
+func TestGraph(t *testing.T) {
+ g := NewBST()
+
+ for i := 0; i < 1000; i++ {
+ uid := uuid.New().String()
+ g.Insert(uuid.NewString(), uid)
+ }
+
+ g.Insert(uuid.NewString(), predifined)
+
+ exist := g.Get(predifined)
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+}
+
+func TestTreeConcurrentContains(t *testing.T) {
+ g := NewBST()
+
+ key1 := uuid.NewString()
+ key2 := uuid.NewString()
+ key3 := uuid.NewString()
+ key4 := uuid.NewString()
+ key5 := uuid.NewString()
+
+ g.Insert(key1, predifined)
+ g.Insert(key2, predifined)
+ g.Insert(key3, predifined)
+ g.Insert(key4, predifined)
+ g.Insert(key5, predifined)
+
+ for i := 0; i < 100; i++ {
+ go func() {
+ _ = g.Get(predifined)
+ }()
+
+ go func() {
+ _ = g.Get(predifined)
+ }()
+
+ go func() {
+ _ = g.Get(predifined)
+ }()
+
+ go func() {
+ _ = g.Get(predifined)
+ }()
+ }
+
+ time.Sleep(time.Second * 2)
+
+ exist := g.Get(predifined)
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 5)
+}
+
+func TestGraphRemove(t *testing.T) {
+ g := NewBST()
+
+ key1 := uuid.NewString()
+ key2 := uuid.NewString()
+ key3 := uuid.NewString()
+ key4 := uuid.NewString()
+ key5 := uuid.NewString()
+
+ g.Insert(key1, predifined)
+ g.Insert(key2, predifined)
+ g.Insert(key3, predifined)
+ g.Insert(key4, predifined)
+ g.Insert(key5, predifined)
+
+ exist := g.Get(predifined)
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 5)
+
+ g.Remove(key1, predifined)
+
+ exist = g.Get(predifined)
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 4)
+}
+
+func TestBigSearch(t *testing.T) {
+ g1 := NewBST()
+ g2 := NewBST()
+ g3 := NewBST()
+
+ predefinedSlice := make([]string, 0, 1000)
+ for i := 0; i < 1000; i++ {
+ predefinedSlice = append(predefinedSlice, uuid.NewString())
+ }
+ if predefinedSlice == nil {
+ t.FailNow()
+ }
+
+ for i := 0; i < 1000; i++ {
+ g1.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 1000; i++ {
+ g2.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 1000; i++ {
+ g3.Insert(uuid.NewString(), uuid.NewString())
+ }
+
+ for i := 0; i < 333; i++ {
+ g1.Insert(uuid.NewString(), predefinedSlice[i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g2.Insert(uuid.NewString(), predefinedSlice[333+i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g3.Insert(uuid.NewString(), predefinedSlice[666+i])
+ }
+
+ for i := 0; i < 333; i++ {
+ exist := g1.Get(predefinedSlice[i])
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+ }
+
+ for i := 0; i < 333; i++ {
+ exist := g2.Get(predefinedSlice[333+i])
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+ }
+
+ for i := 0; i < 333; i++ {
+ exist := g3.Get(predefinedSlice[666+i])
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+ }
+}
+
+func TestBigSearchWithRemoves(t *testing.T) {
+ g1 := NewBST()
+ g2 := NewBST()
+ g3 := NewBST()
+
+ predefinedSlice := make([]string, 0, 1000)
+ for i := 0; i < 1000; i++ {
+ predefinedSlice = append(predefinedSlice, uuid.NewString())
+ }
+ if predefinedSlice == nil {
+ t.FailNow()
+ }
+
+ for i := 0; i < 100000; i++ {
+ g1.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 100000; i++ {
+ g2.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 100000; i++ {
+ g3.Insert(uuid.NewString(), uuid.NewString())
+ }
+
+ for i := 0; i < 333; i++ {
+ g1.Insert(uuid.NewString(), predefinedSlice[i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g2.Insert(uuid.NewString(), predefinedSlice[333+i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g3.Insert(uuid.NewString(), predefinedSlice[666+i])
+ }
+
+ time.Sleep(time.Second * 1)
+ go func() {
+ tt := time.NewTicker(time.Second)
+ for {
+ select {
+ case <-tt.C:
+ num := rand.Intn(333) //nolint:gosec
+ values := g1.Get(predefinedSlice[num])
+ for k := range values {
+ g1.Remove(k, predefinedSlice[num])
+ }
+ }
+ }
+ }()
+
+ for i := 0; i < 333; i++ {
+ exist := g1.Get(predefinedSlice[i])
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+ }
+
+ for i := 0; i < 333; i++ {
+ exist := g2.Get(predefinedSlice[333+i])
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+ }
+
+ for i := 0; i < 333; i++ {
+ exist := g3.Get(predefinedSlice[666+i])
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+ }
+}
diff --git a/pkg/bst/doc.go b/pkg/bst/doc.go
new file mode 100644
index 00000000..abb7e6e9
--- /dev/null
+++ b/pkg/bst/doc.go
@@ -0,0 +1,7 @@
+package bst
+
+/*
+Binary search tree for the pubsub
+
+The vertex may have one or multiply topics associated with the single websocket connection UUID
+*/
diff --git a/pkg/bst/interface.go b/pkg/bst/interface.go
new file mode 100644
index 00000000..ecf40414
--- /dev/null
+++ b/pkg/bst/interface.go
@@ -0,0 +1,11 @@
+package bst
+
+// Storage is general in-memory BST storage implementation
+type Storage interface {
+ // Insert inserts to a vertex with topic ident connection uuid
+ Insert(uuid string, topic string)
+ // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed
+ Remove(uuid, topic string)
+ // Get will return all connections associated with the topic
+ Get(topic string) map[string]struct{}
+}
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index 4ef2f2e7..c22fbbd3 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -18,7 +18,7 @@ type Pool interface {
// Workers returns worker list associated with the pool.
Workers() (workers []worker.BaseProcess)
- // Remove worker from the pool.
+ // RemoveWorker removes worker from the pool.
RemoveWorker(worker worker.BaseProcess) error
// Destroy all underlying stack (but let them to complete the task).
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index d57cc95c..b5d97b8b 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -47,7 +47,7 @@ type StaticPool struct {
allocator worker.Allocator
// err_encoder is the default Exec error encoder
- err_encoder ErrorEncoder //nolint:golint,stylecheck
+ err_encoder ErrorEncoder //nolint:stylecheck
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -245,7 +245,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.BaseProcess) (payload.Payload, error) {
- const op = errors.Op("error encoder")
+ const op = errors.Op("error_encoder")
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 40903db3..ca61dbc4 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -15,7 +15,7 @@ import (
const MB = 1024 * 1024
// NSEC_IN_SEC nanoseconds in second
-const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck
+const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
type Supervised interface {
Pool
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
new file mode 100644
index 00000000..caf8783f
--- /dev/null
+++ b/pkg/pubsub/interface.go
@@ -0,0 +1,32 @@
+package pubsub
+
+// PubSub ...
+type PubSub interface {
+ Publisher
+ Subscriber
+ Reader
+}
+
+// Subscriber defines the ability to operate as message passing broker.
+type Subscriber interface {
+ // Subscribe broker to one or multiple topics.
+ Subscribe(topics ...string) error
+
+ // Unsubscribe from one or multiply topics
+ Unsubscribe(topics ...string) error
+}
+
+// Publisher publish one or more messages
+type Publisher interface {
+ // Publish one or multiple Channel.
+ Publish(messages []*Message) error
+
+ // PublishAsync publish message and return immediately
+ // If error occurred it will be printed into the logger
+ PublishAsync(messages []*Message)
+}
+
+// Reader interface should return next message
+type Reader interface {
+ Next() (*Message, error)
+}
diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go
new file mode 100644
index 00000000..c17d153b
--- /dev/null
+++ b/pkg/pubsub/message.go
@@ -0,0 +1,24 @@
+package pubsub
+
+import (
+ json "github.com/json-iterator/go"
+)
+
+type Message struct {
+ // Command (join, leave, headers)
+ Command string `json:"command"`
+
+ // Broker (redis, memory)
+ Broker string `json:"broker"`
+
+ // Topic message been pushed into.
+ Topics []string `json:"topic"`
+
+ // Payload to be broadcasted
+ Payload []byte `json:"payload"`
+}
+
+// MarshalBinary needed to marshal message for the redis
+func (m *Message) MarshalBinary() ([]byte, error) {
+ return json.Marshal(m)
+}
diff --git a/pkg/worker_handler/constants.go b/pkg/worker_handler/constants.go
new file mode 100644
index 00000000..3355d9c2
--- /dev/null
+++ b/pkg/worker_handler/constants.go
@@ -0,0 +1,8 @@
+package handler
+
+import "net/http"
+
+var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push")
+
+// TrailerHeaderKey http header key
+var TrailerHeaderKey = http.CanonicalHeaderKey("trailer")
diff --git a/pkg/worker_handler/errors.go b/pkg/worker_handler/errors.go
new file mode 100644
index 00000000..5fa8e64e
--- /dev/null
+++ b/pkg/worker_handler/errors.go
@@ -0,0 +1,25 @@
+// +build !windows
+
+package handler
+
+import (
+ "errors"
+ "net"
+ "os"
+ "syscall"
+)
+
+// Broken pipe
+var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer")
+
+// handleWriteError just check if error was caused by aborted connection on linux
+func handleWriteError(err error) error {
+ if netErr, ok2 := err.(*net.OpError); ok2 {
+ if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
+ if errors.Is(syscallErr.Err, syscall.EPIPE) {
+ return errEPIPE
+ }
+ }
+ }
+ return err
+}
diff --git a/pkg/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go
new file mode 100644
index 00000000..390cc7d1
--- /dev/null
+++ b/pkg/worker_handler/errors_windows.go
@@ -0,0 +1,27 @@
+// +build windows
+
+package handler
+
+import (
+ "errors"
+ "net"
+ "os"
+ "syscall"
+)
+
+//Software caused connection abort.
+//An established connection was aborted by the software in your host computer,
+//possibly due to a data transmission time-out or protocol error.
+var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer")
+
+// handleWriteError just check if error was caused by aborted connection on windows
+func handleWriteError(err error) error {
+ if netErr, ok2 := err.(*net.OpError); ok2 {
+ if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
+ if syscallErr.Err == syscall.WSAECONNABORTED {
+ return errEPIPE
+ }
+ }
+ }
+ return err
+}
diff --git a/pkg/worker_handler/handler.go b/pkg/worker_handler/handler.go
new file mode 100644
index 00000000..e0d1aae0
--- /dev/null
+++ b/pkg/worker_handler/handler.go
@@ -0,0 +1,217 @@
+package handler
+
+import (
+ "net"
+ "net/http"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/plugins/http/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// MB is 1024 bytes
+const MB uint64 = 1024 * 1024
+
+// ErrorEvent represents singular http error event.
+type ErrorEvent struct {
+ // Request contains client request, must not be stored.
+ Request *http.Request
+
+ // Error - associated error, if any.
+ Error error
+
+ // event timings
+ start time.Time
+ elapsed time.Duration
+}
+
+// Elapsed returns duration of the invocation.
+func (e *ErrorEvent) Elapsed() time.Duration {
+ return e.elapsed
+}
+
+// ResponseEvent represents singular http response event.
+type ResponseEvent struct {
+ // Request contains client request, must not be stored.
+ Request *Request
+
+ // Response contains service response.
+ Response *Response
+
+ // event timings
+ start time.Time
+ elapsed time.Duration
+}
+
+// Elapsed returns duration of the invocation.
+func (e *ResponseEvent) Elapsed() time.Duration {
+ return e.elapsed
+}
+
+// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
+// parsed files and query, payload will include parsed form dataTree (if any).
+type Handler struct {
+ maxRequestSize uint64
+ uploads config.Uploads
+ trusted config.Cidrs
+ log logger.Logger
+ pool pool.Pool
+ mul sync.Mutex
+ lsn events.Listener
+}
+
+// NewHandler return handle interface implementation
+func NewHandler(maxReqSize uint64, uploads config.Uploads, trusted config.Cidrs, pool pool.Pool) (*Handler, error) {
+ if pool == nil {
+ return nil, errors.E(errors.Str("pool should be initialized"))
+ }
+ return &Handler{
+ maxRequestSize: maxReqSize * MB,
+ uploads: uploads,
+ pool: pool,
+ trusted: trusted,
+ }, nil
+}
+
+// AddListener attaches handler event controller.
+func (h *Handler) AddListener(l events.Listener) {
+ h.mul.Lock()
+ defer h.mul.Unlock()
+
+ h.lsn = l
+}
+
+// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
+func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ const op = errors.Op("serve_http")
+ start := time.Now()
+
+ // validating request size
+ if h.maxRequestSize != 0 {
+ const op = errors.Op("http_handler_max_size")
+ if length := r.Header.Get("content-length"); length != "" {
+ // try to parse the value from the `content-length` header
+ size, err := strconv.ParseInt(length, 10, 64)
+ if err != nil {
+ // if got an error while parsing -> assign 500 code to the writer and return
+ http.Error(w, errors.E(op, err).Error(), 500)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("error while parsing value from the `content-length` header")), start: start, elapsed: time.Since(start)})
+ return
+ }
+
+ if size > int64(h.maxRequestSize) {
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("request body max size is exceeded")), start: start, elapsed: time.Since(start)})
+ http.Error(w, errors.E(op, errors.Str("request body max size is exceeded")).Error(), 500)
+ return
+ }
+ }
+ }
+
+ req, err := NewRequest(r, h.uploads)
+ if err != nil {
+ // if pipe is broken, there is no sense to write the header
+ // in this case we just report about error
+ if err == errEPIPE {
+ h.sendEvent(ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)})
+ return
+ }
+
+ http.Error(w, errors.E(op, err).Error(), 500)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
+ return
+ }
+
+ // proxy IP resolution
+ h.resolveIP(req)
+
+ req.Open(h.log)
+ defer req.Close(h.log)
+
+ p, err := req.Payload()
+ if err != nil {
+ http.Error(w, errors.E(op, err).Error(), 500)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
+ return
+ }
+
+ rsp, err := h.pool.Exec(p)
+ if err != nil {
+ http.Error(w, errors.E(op, err).Error(), 500)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
+ return
+ }
+
+ resp, err := NewResponse(rsp)
+ if err != nil {
+ http.Error(w, errors.E(op, err).Error(), resp.Status)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
+ return
+ }
+
+ h.handleResponse(req, resp, start)
+ err = resp.Write(w)
+ if err != nil {
+ http.Error(w, errors.E(op, err).Error(), 500)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
+ }
+}
+
+// handleResponse triggers response event.
+func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) {
+ h.sendEvent(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)})
+}
+
+// sendEvent invokes event handler if any.
+func (h *Handler) sendEvent(event interface{}) {
+ if h.lsn != nil {
+ h.lsn(event)
+ }
+}
+
+// get real ip passing multiple proxy
+func (h *Handler) resolveIP(r *Request) {
+ if h.trusted.IsTrusted(r.RemoteAddr) == false { //nolint:gosimple
+ return
+ }
+
+ if r.Header.Get("X-Forwarded-For") != "" {
+ ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",")
+ ipCount := len(ips)
+
+ for i := ipCount - 1; i >= 0; i-- {
+ addr := strings.TrimSpace(ips[i])
+ if net.ParseIP(addr) != nil {
+ r.RemoteAddr = addr
+ return
+ }
+ }
+
+ return
+ }
+
+ // The logic here is the following:
+ // In general case, we only expect X-Real-Ip header. If it exist, we get the IP address from header and set request Remote address
+ // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers
+ // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF.
+ // CF-Connecting-IP is an Enterprise feature and we check it last in order.
+ // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string
+ if r.Header.Get("X-Real-Ip") != "" {
+ r.RemoteAddr = FetchIP(r.Header.Get("X-Real-Ip"))
+ return
+ }
+
+ if r.Header.Get("True-Client-IP") != "" {
+ r.RemoteAddr = FetchIP(r.Header.Get("True-Client-IP"))
+ return
+ }
+
+ if r.Header.Get("CF-Connecting-IP") != "" {
+ r.RemoteAddr = FetchIP(r.Header.Get("CF-Connecting-IP"))
+ }
+}
diff --git a/pkg/worker_handler/parse.go b/pkg/worker_handler/parse.go
new file mode 100644
index 00000000..2790da2a
--- /dev/null
+++ b/pkg/worker_handler/parse.go
@@ -0,0 +1,149 @@
+package handler
+
+import (
+ "net/http"
+
+ "github.com/spiral/roadrunner/v2/plugins/http/config"
+)
+
+// MaxLevel defines maximum tree depth for incoming request data and files.
+const MaxLevel = 127
+
+type dataTree map[string]interface{}
+type fileTree map[string]interface{}
+
+// parseData parses incoming request body into data tree.
+func parseData(r *http.Request) dataTree {
+ data := make(dataTree)
+ if r.PostForm != nil {
+ for k, v := range r.PostForm {
+ data.push(k, v)
+ }
+ }
+
+ if r.MultipartForm != nil {
+ for k, v := range r.MultipartForm.Value {
+ data.push(k, v)
+ }
+ }
+
+ return data
+}
+
+// pushes value into data tree.
+func (d dataTree) push(k string, v []string) {
+ keys := FetchIndexes(k)
+ if len(keys) <= MaxLevel {
+ d.mount(keys, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d dataTree) mount(i []string, v []string) {
+ if len(i) == 1 {
+ // single value context (last element)
+ d[i[0]] = v[len(v)-1]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(dataTree).mount(i[1:], v)
+ return
+ }
+
+ d[i[0]] = make(dataTree)
+ d[i[0]].(dataTree).mount(i[1:], v)
+}
+
+// parse incoming dataTree request into JSON (including contentMultipart form dataTree)
+func parseUploads(r *http.Request, cfg config.Uploads) *Uploads {
+ u := &Uploads{
+ cfg: cfg,
+ tree: make(fileTree),
+ list: make([]*FileUpload, 0),
+ }
+
+ for k, v := range r.MultipartForm.File {
+ files := make([]*FileUpload, 0, len(v))
+ for _, f := range v {
+ files = append(files, NewUpload(f))
+ }
+
+ u.list = append(u.list, files...)
+ u.tree.push(k, files)
+ }
+
+ return u
+}
+
+// pushes new file upload into it's proper place.
+func (d fileTree) push(k string, v []*FileUpload) {
+ keys := FetchIndexes(k)
+ if len(keys) <= MaxLevel {
+ d.mount(keys, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d fileTree) mount(i []string, v []*FileUpload) {
+ if len(i) == 1 {
+ // single value context
+ d[i[0]] = v[0]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(fileTree).mount(i[1:], v)
+ return
+ }
+
+ d[i[0]] = make(fileTree)
+ d[i[0]].(fileTree).mount(i[1:], v)
+}
+
+// FetchIndexes parses input name and splits it into separate indexes list.
+func FetchIndexes(s string) []string {
+ var (
+ pos int
+ ch string
+ keys = make([]string, 1)
+ )
+
+ for _, c := range s {
+ ch = string(c)
+ switch ch {
+ case " ":
+ // ignore all spaces
+ continue
+ case "[":
+ pos = 1
+ continue
+ case "]":
+ if pos == 1 {
+ keys = append(keys, "")
+ }
+ pos = 2
+ default:
+ if pos == 1 || pos == 2 {
+ keys = append(keys, "")
+ }
+
+ keys[len(keys)-1] += ch
+ pos = 0
+ }
+ }
+
+ return keys
+}
diff --git a/pkg/worker_handler/request.go b/pkg/worker_handler/request.go
new file mode 100644
index 00000000..75ee8381
--- /dev/null
+++ b/pkg/worker_handler/request.go
@@ -0,0 +1,187 @@
+package handler
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "net/url"
+ "strings"
+
+ j "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/http/attributes"
+ "github.com/spiral/roadrunner/v2/plugins/http/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+var json = j.ConfigCompatibleWithStandardLibrary
+
+const (
+ defaultMaxMemory = 32 << 20 // 32 MB
+ contentNone = iota + 900
+ contentStream
+ contentMultipart
+ contentFormData
+)
+
+// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files.
+type Request struct {
+ // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address.
+ RemoteAddr string `json:"remoteAddr"`
+
+ // Protocol includes HTTP protocol version.
+ Protocol string `json:"protocol"`
+
+ // Method contains name of HTTP method used for the request.
+ Method string `json:"method"`
+
+ // URI contains full request URI with scheme and query.
+ URI string `json:"uri"`
+
+ // Header contains list of request headers.
+ Header http.Header `json:"headers"`
+
+ // Cookies contains list of request cookies.
+ Cookies map[string]string `json:"cookies"`
+
+ // RawQuery contains non parsed query string (to be parsed on php end).
+ RawQuery string `json:"rawQuery"`
+
+ // Parsed indicates that request body has been parsed on RR end.
+ Parsed bool `json:"parsed"`
+
+ // Uploads contains list of uploaded files, their names, sized and associations with temporary files.
+ Uploads *Uploads `json:"uploads"`
+
+ // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions.
+ Attributes map[string]interface{} `json:"attributes"`
+
+ // request body can be parsedData or []byte
+ body interface{}
+}
+
+func FetchIP(pair string) string {
+ if !strings.ContainsRune(pair, ':') {
+ return pair
+ }
+
+ addr, _, _ := net.SplitHostPort(pair)
+ return addr
+}
+
+// NewRequest creates new PSR7 compatible request using net/http request.
+func NewRequest(r *http.Request, cfg config.Uploads) (*Request, error) {
+ req := &Request{
+ RemoteAddr: FetchIP(r.RemoteAddr),
+ Protocol: r.Proto,
+ Method: r.Method,
+ URI: URI(r),
+ Header: r.Header,
+ Cookies: make(map[string]string),
+ RawQuery: r.URL.RawQuery,
+ Attributes: attributes.All(r),
+ }
+
+ for _, c := range r.Cookies() {
+ if v, err := url.QueryUnescape(c.Value); err == nil {
+ req.Cookies[c.Name] = v
+ }
+ }
+
+ switch req.contentType() {
+ case contentNone:
+ return req, nil
+
+ case contentStream:
+ var err error
+ req.body, err = ioutil.ReadAll(r.Body)
+ return req, err
+
+ case contentMultipart:
+ if err := r.ParseMultipartForm(defaultMaxMemory); err != nil {
+ return nil, err
+ }
+
+ req.Uploads = parseUploads(r, cfg)
+ fallthrough
+ case contentFormData:
+ if err := r.ParseForm(); err != nil {
+ return nil, err
+ }
+
+ req.body = parseData(r)
+ }
+
+ req.Parsed = true
+ return req, nil
+}
+
+// Open moves all uploaded files to temporary directory so it can be given to php later.
+func (r *Request) Open(log logger.Logger) {
+ if r.Uploads == nil {
+ return
+ }
+
+ r.Uploads.Open(log)
+}
+
+// Close clears all temp file uploads
+func (r *Request) Close(log logger.Logger) {
+ if r.Uploads == nil {
+ return
+ }
+
+ r.Uploads.Clear(log)
+}
+
+// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
+// files prior to calling this method.
+func (r *Request) Payload() (payload.Payload, error) {
+ p := payload.Payload{}
+
+ var err error
+ if p.Context, err = json.Marshal(r); err != nil {
+ return payload.Payload{}, err
+ }
+
+ if r.Parsed {
+ if p.Body, err = json.Marshal(r.body); err != nil {
+ return payload.Payload{}, err
+ }
+ } else if r.body != nil {
+ p.Body = r.body.([]byte)
+ }
+
+ return p, nil
+}
+
+// contentType returns the payload content type.
+func (r *Request) contentType() int {
+ if r.Method == "HEAD" || r.Method == "OPTIONS" {
+ return contentNone
+ }
+
+ ct := r.Header.Get("content-type")
+ if strings.Contains(ct, "application/x-www-form-urlencoded") {
+ return contentFormData
+ }
+
+ if strings.Contains(ct, "multipart/form-data") {
+ return contentMultipart
+ }
+
+ return contentStream
+}
+
+// URI fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
+func URI(r *http.Request) string {
+ if r.URL.Host != "" {
+ return r.URL.String()
+ }
+ if r.TLS != nil {
+ return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
+ }
+
+ return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
+}
diff --git a/pkg/worker_handler/response.go b/pkg/worker_handler/response.go
new file mode 100644
index 00000000..1763d304
--- /dev/null
+++ b/pkg/worker_handler/response.go
@@ -0,0 +1,105 @@
+package handler
+
+import (
+ "io"
+ "net/http"
+ "strings"
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+)
+
+// Response handles PSR7 response logic.
+type Response struct {
+ // Status contains response status.
+ Status int `json:"status"`
+
+ // Header contains list of response headers.
+ Headers map[string][]string `json:"headers"`
+
+ // associated Body payload.
+ Body interface{}
+ sync.Mutex
+}
+
+// NewResponse creates new response based on given pool payload.
+func NewResponse(p payload.Payload) (*Response, error) {
+ r := &Response{Body: p.Body}
+ if err := json.Unmarshal(p.Context, r); err != nil {
+ return nil, err
+ }
+
+ return r, nil
+}
+
+// Write writes response headers, status and body into ResponseWriter.
+func (r *Response) Write(w http.ResponseWriter) error {
+ // INFO map is the reference type in golang
+ p := handlePushHeaders(r.Headers)
+ if pusher, ok := w.(http.Pusher); ok {
+ for _, v := range p {
+ err := pusher.Push(v, nil)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ handleTrailers(r.Headers)
+ for n, h := range r.Headers {
+ for _, v := range h {
+ w.Header().Add(n, v)
+ }
+ }
+
+ w.WriteHeader(r.Status)
+
+ if data, ok := r.Body.([]byte); ok {
+ _, err := w.Write(data)
+ if err != nil {
+ return handleWriteError(err)
+ }
+ }
+
+ if rc, ok := r.Body.(io.Reader); ok {
+ if _, err := io.Copy(w, rc); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func handlePushHeaders(h map[string][]string) []string {
+ var p []string
+ pushHeader, ok := h[http2pushHeaderKey]
+ if !ok {
+ return p
+ }
+
+ p = append(p, pushHeader...)
+
+ delete(h, http2pushHeaderKey)
+
+ return p
+}
+
+func handleTrailers(h map[string][]string) {
+ trailers, ok := h[TrailerHeaderKey]
+ if !ok {
+ return
+ }
+
+ for _, tr := range trailers {
+ for _, n := range strings.Split(tr, ",") {
+ n = strings.Trim(n, "\t ")
+ if v, ok := h[n]; ok {
+ h["Trailer:"+n] = v
+
+ delete(h, n)
+ }
+ }
+ }
+
+ delete(h, TrailerHeaderKey)
+}
diff --git a/pkg/worker_handler/uploads.go b/pkg/worker_handler/uploads.go
new file mode 100644
index 00000000..e695000e
--- /dev/null
+++ b/pkg/worker_handler/uploads.go
@@ -0,0 +1,159 @@
+package handler
+
+import (
+ "github.com/spiral/roadrunner/v2/plugins/http/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+
+ "io"
+ "io/ioutil"
+ "mime/multipart"
+ "os"
+ "sync"
+)
+
+const (
+ // UploadErrorOK - no error, the file uploaded with success.
+ UploadErrorOK = 0
+
+ // UploadErrorNoFile - no file was uploaded.
+ UploadErrorNoFile = 4
+
+ // UploadErrorNoTmpDir - missing a temporary folder.
+ UploadErrorNoTmpDir = 6
+
+ // UploadErrorCantWrite - failed to write file to disk.
+ UploadErrorCantWrite = 7
+
+ // UploadErrorExtension - forbidden file extension.
+ UploadErrorExtension = 8
+)
+
+// Uploads tree manages uploaded files tree and temporary files.
+type Uploads struct {
+ // associated temp directory and forbidden extensions.
+ cfg config.Uploads
+
+ // pre processed data tree for Uploads.
+ tree fileTree
+
+ // flat list of all file Uploads.
+ list []*FileUpload
+}
+
+// MarshalJSON marshal tree tree into JSON.
+func (u *Uploads) MarshalJSON() ([]byte, error) {
+ return json.Marshal(u.tree)
+}
+
+// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
+// will be handled individually.
+func (u *Uploads) Open(log logger.Logger) {
+ var wg sync.WaitGroup
+ for _, f := range u.list {
+ wg.Add(1)
+ go func(f *FileUpload) {
+ defer wg.Done()
+ err := f.Open(u.cfg)
+ if err != nil && log != nil {
+ log.Error("error opening the file", "err", err)
+ }
+ }(f)
+ }
+
+ wg.Wait()
+}
+
+// Clear deletes all temporary files.
+func (u *Uploads) Clear(log logger.Logger) {
+ for _, f := range u.list {
+ if f.TempFilename != "" && exists(f.TempFilename) {
+ err := os.Remove(f.TempFilename)
+ if err != nil && log != nil {
+ log.Error("error removing the file", "err", err)
+ }
+ }
+ }
+}
+
+// FileUpload represents singular file NewUpload.
+type FileUpload struct {
+ // ID contains filename specified by the client.
+ Name string `json:"name"`
+
+ // Mime contains mime-type provided by the client.
+ Mime string `json:"mime"`
+
+ // Size of the uploaded file.
+ Size int64 `json:"size"`
+
+ // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php
+ Error int `json:"error"`
+
+ // TempFilename points to temporary file location.
+ TempFilename string `json:"tmpName"`
+
+ // associated file header
+ header *multipart.FileHeader
+}
+
+// NewUpload wraps net/http upload into PRS-7 compatible structure.
+func NewUpload(f *multipart.FileHeader) *FileUpload {
+ return &FileUpload{
+ Name: f.Filename,
+ Mime: f.Header.Get("Content-Type"),
+ Error: UploadErrorOK,
+ header: f,
+ }
+}
+
+// Open moves file content into temporary file available for PHP.
+// NOTE:
+// There is 2 deferred functions, and in case of getting 2 errors from both functions
+// error from close of temp file would be overwritten by error from the main file
+// STACK
+// DEFER FILE CLOSE (2)
+// DEFER TMP CLOSE (1)
+func (f *FileUpload) Open(cfg config.Uploads) (err error) {
+ if cfg.Forbids(f.Name) {
+ f.Error = UploadErrorExtension
+ return nil
+ }
+
+ file, err := f.header.Open()
+ if err != nil {
+ f.Error = UploadErrorNoFile
+ return err
+ }
+
+ defer func() {
+ // close the main file
+ err = file.Close()
+ }()
+
+ tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload")
+ if err != nil {
+ // most likely cause of this issue is missing tmp dir
+ f.Error = UploadErrorNoTmpDir
+ return err
+ }
+
+ f.TempFilename = tmp.Name()
+ defer func() {
+ // close the temp file
+ err = tmp.Close()
+ }()
+
+ if f.Size, err = io.Copy(tmp, file); err != nil {
+ f.Error = UploadErrorCantWrite
+ }
+
+ return err
+}
+
+// exists if file exists.
+func exists(path string) bool {
+ if _, err := os.Stat(path); os.IsNotExist(err) {
+ return false
+ }
+ return true
+}
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index 4625b7a7..29fa3640 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -1,4 +1,4 @@
-package worker_watcher //nolint:golint,stylecheck
+package worker_watcher //nolint:stylecheck
import (
"context"
@@ -23,9 +23,9 @@ type Watcher interface {
// Destroy destroys the underlying container
Destroy(ctx context.Context)
- // WorkersList return all container w/o removing it from internal storage
+ // List return all container w/o removing it from internal storage
List() []worker.BaseProcess
- // RemoveWorker remove worker from the container
+ // Remove will remove worker from the container
Remove(wb worker.BaseProcess)
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 5aec4ee6..557563ac 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -1,4 +1,4 @@
-package worker_watcher //nolint:golint,stylecheck
+package worker_watcher //nolint:stylecheck
import (
"context"
@@ -11,7 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
-// workerCreateFunc can be nil, but in that case, dead container will not be replaced
+// NewSyncWorkerWatcher is a constructor for the Watcher
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
ww := &workerWatcher{
container: container.NewVector(numWorkers),
@@ -215,7 +215,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
}
-// Warning, this is O(n) operation, and it will return copy of the actual workers
+// List - this is O(n) operation, and it will return copy of the actual workers
func (ww *workerWatcher) List() []worker.BaseProcess {
ww.RLock()
defer ww.RUnlock()