diff options
-rwxr-xr-x | .rr.yaml | 10 | ||||
-rw-r--r-- | pkg/worker_watcher/container/interface.go | 4 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 8 |
3 files changed, 15 insertions, 7 deletions
@@ -1,3 +1,7 @@ +###################################################################################### +# THIS IS SAMPLE OF THE CONFIGURATION # +# IT'S NOT A DEFAULT CONFIGURATION, IT'S JUST A REFERENCE TO ALL OPTIONS AND PLUGINS # +###################################################################################### rpc: listen: tcp://127.0.0.1:6001 @@ -5,9 +9,6 @@ server: command: "php tests/psr-worker-bench.php" user: "" group: "" - env: - "RR_HTTP": "true" - "RR_RPC": "tcp://127.0.0.1:6001" relay: "pipes" relay_timeout: 20s @@ -133,7 +134,6 @@ redis: read_only: false # boltdb simple driver -# boltdb: dir: "." file: "rr" @@ -142,7 +142,7 @@ boltdb: # keys ttl check interval TTL: 60 # seconds - # memcached driver +# memcached driver memcached: addr: - "localhost:11211" diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go index bb66897f..532bace9 100644 --- a/pkg/worker_watcher/container/interface.go +++ b/pkg/worker_watcher/container/interface.go @@ -2,8 +2,12 @@ package container import "github.com/spiral/roadrunner/v2/pkg/worker" +// Vector interface represents vector container type Vector interface { + // Enqueue used to put worker to the vector Enqueue(worker.BaseProcess) + // Dequeue used to get worker from the vector Dequeue() (worker.BaseProcess, bool) + // Destroy used to stop releasing the workers Destroy() } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 3e0633a3..804e4658 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -103,7 +103,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { } return case worker.StateWorking: // how?? - ww.container.Enqueue(w) + ww.container.Enqueue(w) // put it back, let worker finish the work continue case // all the possible wrong states @@ -143,11 +143,15 @@ func (ww *workerWatcher) Allocate() error { return errors.E(op, errors.WorkerAllocate, err) } + // add worker to Wait ww.addToWatch(sw) + // add new worker to the workers slice (to get information about workers in parallel) ww.workers = append(ww.workers, sw) + // unlock Allocate mutex ww.Unlock() + // push the worker to the container ww.Push(sw) return nil } @@ -184,7 +188,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { ww.container.Destroy() ww.Unlock() - tt := time.NewTicker(time.Millisecond * 500) + tt := time.NewTicker(time.Millisecond * 100) defer tt.Stop() for { select { |