author     Valery Piashchynski <[email protected]>  2021-02-08 23:21:54 +0300
committer  Valery Piashchynski <[email protected]>  2021-02-08 23:21:54 +0300
commit     da64d9fbab7d73e203e7dbbb9503f4d422feaab0 (patch)
tree       3dc3d5dd4a8c4de7d4b57baf2eeb1089f831bc1c /pkg
parent     3e92e3df723ca1c4f152d8526eebfd7184e6fcec (diff)
Use the BaseProcess interface as the return type in the worker_watcher,
pool, and worker interfaces
Diffstat (limited to 'pkg')
-rw-r--r--  pkg/pool/interface.go                  4
-rwxr-xr-x  pkg/pool/static_pool.go               24
-rwxr-xr-x  pkg/pool/static_pool_test.go           4
-rwxr-xr-x  pkg/pool/supervisor_pool.go            4
-rwxr-xr-x  pkg/worker/sync_worker.go             17
-rwxr-xr-x  pkg/worker/worker.go                   3
-rw-r--r--  pkg/worker_watcher/interface.go       10
-rw-r--r--  pkg/worker_watcher/stack.go           58
-rw-r--r--  pkg/worker_watcher/stack_test.go       2
-rwxr-xr-x  pkg/worker_watcher/worker_watcher.go  25
10 files changed, 68 insertions, 83 deletions
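
The theme of the diff below is a standard Go interface-narrowing move: pool and watcher APIs now hand out the small worker.BaseProcess interface, and the few call sites that actually execute payloads assert back to worker.SyncWorker. A minimal, self-contained sketch of that pattern (the type names echo the rr2 ones, but the bodies are toy stand-ins, not the real implementations):

package main

import "fmt"

// BaseProcess is the narrow view the pool now exposes: identity and
// lifecycle, no execution.
type BaseProcess interface {
	Pid() int64
}

// SyncWorker layers synchronous execution on top of BaseProcess.
type SyncWorker interface {
	BaseProcess
	Exec(p string) (string, error)
}

// syncWorkerImpl is a toy stand-in for worker.SyncWorkerImpl.
type syncWorkerImpl struct{ pid int64 }

func (w *syncWorkerImpl) Pid() int64                    { return w.pid }
func (w *syncWorkerImpl) Exec(p string) (string, error) { return p, nil }

// Workers hands out the narrow interface, as Pool.Workers does after
// this commit.
func Workers() []BaseProcess {
	return []BaseProcess{&syncWorkerImpl{pid: 1}}
}

func main() {
	for _, w := range Workers() {
		// Execution sites assert back to SyncWorker, mirroring
		// w.(worker.SyncWorker).Exec(p) in static_pool.go below.
		rsp, _ := w.(SyncWorker).Exec("ping")
		fmt.Println(w.Pid(), rsp)
	}
}

The benefit is that consumers that only inspect workers (listing, removal, supervision) never see Exec at all; the cost is an unchecked assertion on the execution path, which the static_pool.go hunks below make visible.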
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index 4f7ae595..bfc56c3f 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -19,10 +19,10 @@ type Pool interface {
ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
// Workers returns worker list associated with the pool.
- Workers() (workers []worker.SyncWorker)
+ Workers() (workers []worker.BaseProcess)
// Remove worker from the pool.
- RemoveWorker(worker worker.SyncWorker) error
+ RemoveWorker(worker worker.BaseProcess) error
// Destroy all underlying stack (but let them to complete the task).
Destroy(ctx context.Context)
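
After this hunk, Workers and RemoveWorker traffic in worker.BaseProcess, so a caller that merely enumerates or evicts workers needs no knowledge of synchronous execution. A compilable reduction of the contract as it appears in the context lines (payload and worker types are stubbed locally; the real interface may carry more methods than this hunk shows):

package main

import "context"

// Payload is a local stub; the real payload.Payload carries Context
// and Body byte slices.
type Payload struct {
	Context []byte
	Body    []byte
}

// BaseProcess is a local stub for worker.BaseProcess.
type BaseProcess interface {
	Pid() int64
}

// Pool, reduced to the methods visible in the hunk above.
type Pool interface {
	// ExecWithContext executes the payload, honoring ctx.
	ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
	// Workers returns the worker list associated with the pool.
	Workers() (workers []BaseProcess)
	// RemoveWorker removes a worker from the pool.
	RemoveWorker(w BaseProcess) error
	// Destroy shuts down all workers, letting them finish their task.
	Destroy(ctx context.Context)
}

func main() {}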
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index c667dc94..bb68151f 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -18,7 +18,7 @@ import (
const StopRequest = "{\"stop\":true}"
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
type Options func(p *StaticPool)
@@ -125,11 +125,11 @@ func (sp *StaticPool) GetConfig() interface{} {
}
// Workers returns worker list associated with the pool.
-func (sp *StaticPool) Workers() (workers []worker.SyncWorker) {
+func (sp *StaticPool) Workers() (workers []worker.BaseProcess) {
return sp.ww.List()
}
-func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error {
+func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
return sp.ww.Remove(wb)
}
@@ -146,7 +146,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, err)
}
- rsp, err := w.Exec(p)
+ rsp, err := w.(worker.SyncWorker).Exec(p)
if err != nil {
return sp.err_encoder(err, w)
}
@@ -176,7 +176,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
return payload.Payload{}, errors.E(op, err)
}
- rsp, err := w.ExecWithTimeout(ctx, p)
+ rsp, err := w.(worker.SyncWorker).ExecWithTimeout(ctx, p)
if err != nil {
return sp.err_encoder(err, w)
}
@@ -195,7 +195,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
return rsp, nil
}
-func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
+func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
const op = errors.Op("static_pool_stop_worker")
w.State().Set(worker.StateInvalid)
err := w.Stop()
@@ -205,7 +205,7 @@ func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
}
// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
-func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
+func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) error {
const op = errors.Op("static_pool_check_max_jobs")
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err := sp.ww.Allocate()
@@ -218,7 +218,7 @@ func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
return nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
// Get function consumes context with timeout
w, err := sp.ww.Get(ctxGetFree)
if err != nil {
@@ -239,7 +239,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.SyncWorker) (payload.Payload, error) {
+ return func(err error, w worker.BaseProcess) (payload.Payload, error) {
const op = errors.Op("error encoder")
// just push event if on any stage was timeout error
@@ -277,7 +277,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
- return func() (*worker.SyncWorkerImpl, error) {
+ return func() (worker.SyncWorker, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
@@ -311,9 +311,9 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
- var workers []worker.SyncWorker
+ var workers []worker.BaseProcess
// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {
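
Note that the new call sites above use an unchecked assertion, w.(worker.SyncWorker).Exec(p), which panics if an allocator ever yields a BaseProcess that lacks Exec. If one wanted a defensive variant, the comma-ok form is the usual guard; a toy sketch with local stand-in types (execSync is hypothetical, not part of this commit):

package main

import (
	"errors"
	"fmt"
)

type BaseProcess interface{ Pid() int64 }

type SyncWorker interface {
	BaseProcess
	Exec(p string) (string, error)
}

// execSync guards the assertion that static_pool.go performs unchecked.
func execSync(w BaseProcess, p string) (string, error) {
	sw, ok := w.(SyncWorker)
	if !ok {
		return "", errors.New("worker does not implement SyncWorker")
	}
	return sw.Exec(p)
}

type wrk struct{}

func (wrk) Pid() int64                    { return 1 }
func (wrk) Exec(p string) (string, error) { return p, nil }

func main() {
	out, err := execSync(wrk{}, "job")
	fmt.Println(out, err)
}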
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 6ffb05b3..4c1c90e5 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -649,7 +649,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
// BenchmarkToStringUnsafe-12 566317729 1.91 ns/op 0 B/op 0 allocs/op
// inline BenchmarkToStringUnsafe-12 1000000000 0.295 ns/op 0 B/op 0 allocs/op
func BenchmarkToStringUnsafe(b *testing.B) {
- testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
+ testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
b.ResetTimer()
b.ReportAllocs()
@@ -662,7 +662,7 @@ func BenchmarkToStringUnsafe(b *testing.B) {
// BenchmarkToStringSafe-12 28584489 39.1 ns/op 112 B/op 1 allocs/op
// inline BenchmarkToStringSafe-12 28926276 46.6 ns/op 128 B/op 1 allocs/op
func BenchmarkToStringSafe(b *testing.B) {
- testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
+ testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
b.ResetTimer()
b.ReportAllocs()
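
The benchmark bodies are elided by the diff, but the names and the numbers in the comments (roughly 0.3-2 ns/op with zero allocations versus roughly 40 ns/op with one allocation) match the conventional comparison between a header-reinterpreting []byte-to-string conversion and the copying built-in. A sketch of what such a pair typically looks like (the helper under test is an assumption, not shown in this diff):

package main

import (
	"fmt"
	"unsafe"
)

// toStringUnsafe reinterprets the slice header as a string header,
// avoiding the copy that string(b) performs. The caller must never
// mutate b afterwards.
func toStringUnsafe(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

// toStringSafe is the built-in conversion: one allocation plus a copy.
func toStringSafe(b []byte) string {
	return string(b)
}

func main() {
	b := []byte("payload")
	fmt.Println(toStringUnsafe(b), toStringSafe(b))
}

Enlarging the test payload, as this hunk does, widens the gap: the safe variant's copy cost grows with the input while the unsafe variant stays at 0 B/op.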
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 33438ae6..3618786d 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -99,13 +99,13 @@ func (sp *supervised) GetConfig() interface{} {
return sp.pool.GetConfig()
}
-func (sp *supervised) Workers() (workers []worker.SyncWorker) {
+func (sp *supervised) Workers() (workers []worker.BaseProcess) {
sp.mu.Lock()
defer sp.mu.Unlock()
return sp.pool.Workers()
}
-func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error {
+func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
return sp.pool.RemoveWorker(worker)
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 304c40d6..82a5462a 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -13,32 +13,19 @@ import (
)
// Allocator is responsible for worker allocation in the pool
-type Allocator func() (*SyncWorkerImpl, error)
+type Allocator func() (SyncWorker, error)
type SyncWorkerImpl struct {
process *Process
}
// From creates SyncWorker from BaseProcess
-func From(process *Process) *SyncWorkerImpl {
+func From(process *Process) SyncWorker {
return &SyncWorkerImpl{
process: process,
}
}
-// FromSync creates BaseProcess from SyncWorkerImpl
-func FromSync(w *SyncWorkerImpl) BaseProcess {
- return &Process{
- created: w.process.created,
- events: w.process.events,
- state: w.process.state,
- cmd: w.process.cmd,
- pid: w.process.pid,
- endState: w.process.endState,
- relay: w.process.relay,
- }
-}
-
// Exec payload without TTL timeout.
func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
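
With this hunk, both the Allocator type and From return the SyncWorker interface instead of the concrete *SyncWorkerImpl, so pool code programs against the contract and the concrete type can remain an implementation detail (which is presumably why the FromSync conversion is no longer needed). A toy sketch of the shape, with stand-in types:

package main

import "fmt"

// SyncWorker is a local stand-in for the worker.SyncWorker interface.
type SyncWorker interface {
	Exec(p string) (string, error)
}

// Allocator mirrors the changed signature: it now yields the interface,
// not the concrete *SyncWorkerImpl.
type Allocator func() (SyncWorker, error)

// syncWorkerImpl stays behind the interface; callers never name it.
type syncWorkerImpl struct{}

func (w *syncWorkerImpl) Exec(p string) (string, error) { return p, nil }

// From mirrors worker.From: build the concrete type, return the contract.
func From() SyncWorker { return &syncWorkerImpl{} }

func main() {
	var alloc Allocator = func() (SyncWorker, error) { return From(), nil }
	w, _ := alloc()
	rsp, _ := w.Exec("hello")
	fmt.Println(rsp)
}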
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index c315cb2d..0f7ab755 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -40,9 +40,6 @@ type Process struct {
// can be nil while process is not started.
pid int
- // contains information about resulted process state.
- endState *os.ProcessState
-
// communication bus with underlying process.
relay relay.Relay
}
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index ce5011c0..a3552e7e 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -9,13 +9,13 @@ import (
// Watcher is an interface for the Sync workers lifecycle
type Watcher interface {
// Watch used to add workers to the stack
- Watch(workers []worker.SyncWorker) error
+ Watch(workers []worker.BaseProcess) error
// Get provide first free worker
- Get(ctx context.Context) (worker.SyncWorker, error)
+ Get(ctx context.Context) (worker.BaseProcess, error)
// Push enqueues worker back
- Push(w worker.SyncWorker)
+ Push(w worker.BaseProcess)
// Allocate - allocates new worker and put it into the WorkerWatcher
Allocate() error
@@ -24,8 +24,8 @@ type Watcher interface {
Destroy(ctx context.Context)
// WorkersList return all stack w/o removing it from internal storage
- List() []worker.SyncWorker
+ List() []worker.BaseProcess
// RemoveWorker remove worker from the stack
- Remove(wb worker.SyncWorker) error
+ Remove(wb worker.BaseProcess) error
}
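
Taken together, the Watcher contract now lends workers out as worker.BaseProcess and takes them back the same way; only the borrower decides whether to narrow to SyncWorker. A toy, channel-backed stand-in for that Get/Push cycle (watcher here is illustrative, not the repo's stack-based implementation):

package main

import (
	"context"
	"fmt"
)

type BaseProcess interface{ Pid() int64 }

type SyncWorker interface {
	BaseProcess
	Exec(p string) (string, error)
}

type proc struct{ pid int64 }

func (p *proc) Pid() int64                    { return p.pid }
func (p *proc) Exec(s string) (string, error) { return s, nil }

// watcher is a toy stand-in for the Watcher above: Get lends a worker
// out as BaseProcess, Push returns it to the pool.
type watcher struct{ stack chan BaseProcess }

func (ww *watcher) Get(ctx context.Context) (BaseProcess, error) {
	select {
	case bp := <-ww.stack:
		return bp, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func (ww *watcher) Push(bp BaseProcess) { ww.stack <- bp }

func main() {
	ww := &watcher{stack: make(chan BaseProcess, 1)}
	ww.Push(&proc{pid: 42})

	bp, _ := ww.Get(context.Background())
	rsp, _ := bp.(SyncWorker).Exec("job") // borrower narrows when it must
	ww.Push(bp)
	fmt.Println(bp.Pid(), rsp)
}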
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go
index 9a0bc6a4..69e2024b 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/stack.go
@@ -9,8 +9,8 @@ import (
)
type Stack struct {
- workers []*worker.SyncWorkerImpl
- mutex sync.RWMutex
+ sync.RWMutex
+ workers []worker.BaseProcess
destroy bool
actualNumOfWorkers uint64
initialNumOfWorkers uint64
@@ -19,15 +19,15 @@ type Stack struct {
func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
w := runtime.NumCPU()
return &Stack{
- workers: make([]*worker.SyncWorkerImpl, 0, w),
+ workers: make([]worker.BaseProcess, 0, w),
actualNumOfWorkers: 0,
initialNumOfWorkers: initialNumOfWorkers,
}
}
func (stack *Stack) Reset() {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
stack.actualNumOfWorkers = 0
stack.workers = nil
}
@@ -35,21 +35,21 @@ func (stack *Stack) Reset() {
// Push worker back to the stack
// If stack in destroy state, Push will provide 100ms window to unlock the mutex
func (stack *Stack) Push(w worker.BaseProcess) {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
stack.actualNumOfWorkers++
- stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl))
+ stack.workers = append(stack.workers, w)
}
func (stack *Stack) IsEmpty() bool {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
return len(stack.workers) == 0
}
-func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+func (stack *Stack) Pop() (worker.BaseProcess, bool) {
+ stack.Lock()
+ defer stack.Unlock()
// do not release new stack
if stack.destroy {
@@ -68,8 +68,8 @@ func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
}
func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
for i := 0; i < len(stack.workers); i++ {
// worker in the stack, reallocating
if stack.workers[i].Pid() == pid {
@@ -84,10 +84,10 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
}
// Workers return copy of the workers in the stack
-func (stack *Stack) Workers() []worker.SyncWorker {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
- workersCopy := make([]worker.SyncWorker, 0, 1)
+func (stack *Stack) Workers() []worker.BaseProcess {
+ stack.Lock()
+ defer stack.Unlock()
+ workersCopy := make([]worker.BaseProcess, 0, 1)
// copy
// TODO pointers, copy have no sense
for _, v := range stack.workers {
@@ -100,40 +100,40 @@ func (stack *Stack) Workers() []worker.SyncWorker {
}
func (stack *Stack) isDestroying() bool {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
return stack.destroy
}
// we also have to give a chance to pool to Push worker (return it)
-func (stack *Stack) Destroy(ctx context.Context) {
- stack.mutex.Lock()
+func (stack *Stack) Destroy(_ context.Context) {
+ stack.Lock()
stack.destroy = true
- stack.mutex.Unlock()
+ stack.Unlock()
tt := time.NewTicker(time.Millisecond * 500)
defer tt.Stop()
for {
select {
case <-tt.C:
- stack.mutex.Lock()
+ stack.Lock()
// that might be one of the workers is working
if stack.initialNumOfWorkers != stack.actualNumOfWorkers {
- stack.mutex.Unlock()
+ stack.Unlock()
continue
}
- stack.mutex.Unlock()
+ stack.Unlock()
// unnecessary mutex, but
// just to make sure. All stack at this moment are in the stack
// Pop operation is blocked, push can't be done, since it's not possible to pop
- stack.mutex.Lock()
+ stack.Lock()
for i := 0; i < len(stack.workers); i++ {
// set state for the stack in the stack (unused at the moment)
stack.workers[i].State().Set(worker.StateDestroyed)
// kill the worker
_ = stack.workers[i].Kill()
}
- stack.mutex.Unlock()
+ stack.Unlock()
// clear
stack.Reset()
return
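
The stack.go rewrite swaps the named mutex field for an embedded sync.RWMutex, promoting Lock and Unlock onto Stack itself and trimming the stack.mutex boilerplate. Two side effects worth noting: the promoted methods are exported, so any holder of a *Stack can now lock it from outside the package, and the patched methods take the write lock even on read-only paths such as IsEmpty and Workers, so the read half of the RWMutex currently goes unused. A small sketch of the embedded pattern, including the RLock path the new code skips:

package main

import (
	"fmt"
	"sync"
)

// stack embeds sync.RWMutex the way Stack above now does, promoting
// Lock/Unlock (and RLock/RUnlock) onto the type itself.
type stack struct {
	sync.RWMutex
	workers []int64
}

func (s *stack) Push(pid int64) {
	s.Lock() // write lock, as in Stack.Push above
	defer s.Unlock()
	s.workers = append(s.workers, pid)
}

// Len takes the read lock, which multiple readers may hold at once;
// the patched Stack uses full write locks here instead.
func (s *stack) Len() int {
	s.RLock()
	defer s.RUnlock()
	return len(s.workers)
}

func main() {
	s := &stack{}
	s.Push(42)
	fmt.Println(s.Len())
}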
diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go
index 5287a6dc..769419e4 100644
--- a/pkg/worker_watcher/stack_test.go
+++ b/pkg/worker_watcher/stack_test.go
@@ -12,7 +12,7 @@ import (
func TestNewWorkersStack(t *testing.T) {
stack := NewWorkersStack(0)
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
- assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers)
+ assert.Equal(t, []worker.BaseProcess{}, stack.workers)
}
func TestStack_Push(t *testing.T) {
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index d065bae5..2380c190 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -27,11 +27,11 @@ type workerWatcher struct {
events events.Handler
}
-func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error {
+func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
ww.stack.Push(workers[i])
- go func(swc worker.SyncWorker) {
+ go func(swc worker.BaseProcess) {
ww.wait(swc)
}(workers[i])
}
@@ -39,7 +39,7 @@ func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error {
}
// Get is not a thread safe operation
-func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
+func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// FAST PATH
// thread safe operation
@@ -72,6 +72,10 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
}
switch w.State().Value() {
+ // return only workers in the Ready state
+ // check first
+ case worker.StateReady:
+ return w, nil
case worker.StateRemove:
err := ww.Remove(w)
if err != nil {
@@ -94,9 +98,6 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
_ = w.Kill()
// try to get new worker
continue
- // return only workers in the Ready state
- case worker.StateReady:
- return w, nil
}
case <-ctx.Done():
return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
@@ -105,7 +106,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
}
func (ww *workerWatcher) Allocate() error {
- ww.stack.mutex.Lock()
+ ww.mutex.Lock()
const op = errors.Op("worker_watcher_allocate_new")
sw, err := ww.allocator()
if err != nil {
@@ -113,14 +114,14 @@ func (ww *workerWatcher) Allocate() error {
}
ww.addToWatch(sw)
- ww.stack.mutex.Unlock()
+ ww.mutex.Unlock()
ww.Push(sw)
return nil
}
// Remove
-func (ww *workerWatcher) Remove(wb worker.SyncWorker) error {
+func (ww *workerWatcher) Remove(wb worker.BaseProcess) error {
ww.mutex.Lock()
defer ww.mutex.Unlock()
@@ -139,7 +140,7 @@ func (ww *workerWatcher) Remove(wb worker.SyncWorker) error {
}
// O(1) operation
-func (ww *workerWatcher) Push(w worker.SyncWorker) {
+func (ww *workerWatcher) Push(w worker.BaseProcess) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
ww.stack.Push(w)
@@ -152,7 +153,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) List() []worker.SyncWorker {
+func (ww *workerWatcher) List() []worker.BaseProcess {
return ww.stack.Workers()
}
@@ -183,7 +184,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
}
}
-func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) {
+func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
go func() {
ww.wait(wb)
}()
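
Finally, the reordered switch in Get puts worker.StateReady first. Go evaluates switch cases top to bottom, so the healthy, common case now returns after a single comparison instead of being tried last, after every failure state. A toy illustration of the ordering (state names mirror the worker package; the values are invented):

package main

import "fmt"

// Invented state values; the worker package defines its own constants.
const (
	StateReady int64 = iota
	StateRemove
	StateInvalid
)

// pick mirrors the reordered switch in Get: the common Ready case is
// checked first, so a healthy worker never touches the error branches.
func pick(state int64) string {
	switch state {
	case StateReady:
		return "return worker to caller"
	case StateRemove:
		return "remove worker, try next"
	default:
		return "kill worker, try next"
	}
}

func main() {
	fmt.Println(pick(StateReady))
}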