summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-x.rr.yaml10
-rw-r--r--pkg/worker_watcher/container/interface.go4
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go8
3 files changed, 15 insertions, 7 deletions
diff --git a/.rr.yaml b/.rr.yaml
index f5185808..3f41c126 100755
--- a/.rr.yaml
+++ b/.rr.yaml
@@ -1,3 +1,7 @@
+######################################################################################
+# THIS IS SAMPLE OF THE CONFIGURATION #
+# IT'S NOT A DEFAULT CONFIGURATION, IT'S JUST A REFERENCE TO ALL OPTIONS AND PLUGINS #
+######################################################################################
rpc:
listen: tcp://127.0.0.1:6001
@@ -5,9 +9,6 @@ server:
command: "php tests/psr-worker-bench.php"
user: ""
group: ""
- env:
- "RR_HTTP": "true"
- "RR_RPC": "tcp://127.0.0.1:6001"
relay: "pipes"
relay_timeout: 20s
@@ -133,7 +134,6 @@ redis:
read_only: false
# boltdb simple driver
-#
boltdb:
dir: "."
file: "rr"
@@ -142,7 +142,7 @@ boltdb:
# keys ttl check interval
TTL: 60 # seconds
- # memcached driver
+# memcached driver
memcached:
addr:
- "localhost:11211"
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go
index bb66897f..532bace9 100644
--- a/pkg/worker_watcher/container/interface.go
+++ b/pkg/worker_watcher/container/interface.go
@@ -2,8 +2,12 @@ package container
import "github.com/spiral/roadrunner/v2/pkg/worker"
+// Vector interface represents vector container
type Vector interface {
+ // Enqueue used to put worker to the vector
Enqueue(worker.BaseProcess)
+ // Dequeue used to get worker from the vector
Dequeue() (worker.BaseProcess, bool)
+ // Destroy used to stop releasing the workers
Destroy()
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 3e0633a3..804e4658 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -103,7 +103,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
}
return
case worker.StateWorking: // how??
- ww.container.Enqueue(w)
+ ww.container.Enqueue(w) // put it back, let worker finish the work
continue
case
// all the possible wrong states
@@ -143,11 +143,15 @@ func (ww *workerWatcher) Allocate() error {
return errors.E(op, errors.WorkerAllocate, err)
}
+ // add worker to Wait
ww.addToWatch(sw)
+ // add new worker to the workers slice (to get information about workers in parallel)
ww.workers = append(ww.workers, sw)
+ // unlock Allocate mutex
ww.Unlock()
+ // push the worker to the container
ww.Push(sw)
return nil
}
@@ -184,7 +188,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
ww.container.Destroy()
ww.Unlock()
- tt := time.NewTicker(time.Millisecond * 500)
+ tt := time.NewTicker(time.Millisecond * 100)
defer tt.Stop()
for {
select {