diff options
author | Valery Piashchynski <[email protected]> | 2021-02-08 23:21:54 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-08 23:21:54 +0300 |
commit | da64d9fbab7d73e203e7dbbb9503f4d422feaab0 (patch) | |
tree | 3dc3d5dd4a8c4de7d4b57baf2eeb1089f831bc1c /pkg | |
parent | 3e92e3df723ca1c4f152d8526eebfd7184e6fcec (diff) |
BaseProcess interface as a return type in the worker_watcher,pool and
worker interface
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/pool/interface.go | 4 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 24 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 4 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 4 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 17 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 3 | ||||
-rw-r--r-- | pkg/worker_watcher/interface.go | 10 | ||||
-rw-r--r-- | pkg/worker_watcher/stack.go | 58 | ||||
-rw-r--r-- | pkg/worker_watcher/stack_test.go | 2 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 25 |
10 files changed, 68 insertions, 83 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index 4f7ae595..bfc56c3f 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -19,10 +19,10 @@ type Pool interface { ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) // Workers returns worker list associated with the pool. - Workers() (workers []worker.SyncWorker) + Workers() (workers []worker.BaseProcess) // Remove worker from the pool. - RemoveWorker(worker worker.SyncWorker) error + RemoveWorker(worker worker.BaseProcess) error // Destroy all underlying stack (but let them to complete the task). Destroy(ctx context.Context) diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index c667dc94..bb68151f 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -18,7 +18,7 @@ import ( const StopRequest = "{\"stop\":true}" // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error) +type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) type Options func(p *StaticPool) @@ -125,11 +125,11 @@ func (sp *StaticPool) GetConfig() interface{} { } // Workers returns worker list associated with the pool. -func (sp *StaticPool) Workers() (workers []worker.SyncWorker) { +func (sp *StaticPool) Workers() (workers []worker.BaseProcess) { return sp.ww.List() } -func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error { +func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { return sp.ww.Remove(wb) } @@ -146,7 +146,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, err) } - rsp, err := w.Exec(p) + rsp, err := w.(worker.SyncWorker).Exec(p) if err != nil { return sp.err_encoder(err, w) } @@ -176,7 +176,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p return payload.Payload{}, errors.E(op, err) } - rsp, err := w.ExecWithTimeout(ctx, p) + rsp, err := w.(worker.SyncWorker).ExecWithTimeout(ctx, p) if err != nil { return sp.err_encoder(err, w) } @@ -195,7 +195,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p return rsp, nil } -func (sp *StaticPool) stopWorker(w worker.SyncWorker) { +func (sp *StaticPool) stopWorker(w worker.BaseProcess) { const op = errors.Op("static_pool_stop_worker") w.State().Set(worker.StateInvalid) err := w.Stop() @@ -205,7 +205,7 @@ func (sp *StaticPool) stopWorker(w worker.SyncWorker) { } // checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs -func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error { +func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) error { const op = errors.Op("static_pool_check_max_jobs") if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err := sp.ww.Allocate() @@ -218,7 +218,7 @@ func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error { return nil } -func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) { +func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { // Get function consumes context with timeout w, err := sp.ww.Get(ctxGetFree) if err != nil { @@ -239,7 +239,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.SyncWorker) (payload.Payload, error) { + return func(err error, w worker.BaseProcess) (payload.Payload, error) { const op = errors.Op("error encoder") // just push event if on any stage was timeout error @@ -277,7 +277,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { - return func() (*worker.SyncWorkerImpl, error) { + return func() (worker.SyncWorker, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...) @@ -311,9 +311,9 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { } // allocate required number of stack -func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) { +func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) { const op = errors.Op("allocate workers") - var workers []worker.SyncWorker + var workers []worker.BaseProcess // constant number of stack simplify logic for i := uint64(0); i < numWorkers; i++ { diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 6ffb05b3..4c1c90e5 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -649,7 +649,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { // BenchmarkToStringUnsafe-12 566317729 1.91 ns/op 0 B/op 0 allocs/op // inline BenchmarkToStringUnsafe-12 1000000000 0.295 ns/op 0 B/op 0 allocs/op func BenchmarkToStringUnsafe(b *testing.B) { - testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj") + testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj") b.ResetTimer() b.ReportAllocs() @@ -662,7 +662,7 @@ func BenchmarkToStringUnsafe(b *testing.B) { // BenchmarkToStringSafe-12 28584489 39.1 ns/op 112 B/op 1 allocs/op // inline BenchmarkToStringSafe-12 28926276 46.6 ns/op 128 B/op 1 allocs/op func BenchmarkToStringSafe(b *testing.B) { - testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj") + testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj") b.ResetTimer() b.ReportAllocs() diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 33438ae6..3618786d 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -99,13 +99,13 @@ func (sp *supervised) GetConfig() interface{} { return sp.pool.GetConfig() } -func (sp *supervised) Workers() (workers []worker.SyncWorker) { +func (sp *supervised) Workers() (workers []worker.BaseProcess) { sp.mu.Lock() defer sp.mu.Unlock() return sp.pool.Workers() } -func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error { +func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error { return sp.pool.RemoveWorker(worker) } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 304c40d6..82a5462a 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -13,32 +13,19 @@ import ( ) // Allocator is responsible for worker allocation in the pool -type Allocator func() (*SyncWorkerImpl, error) +type Allocator func() (SyncWorker, error) type SyncWorkerImpl struct { process *Process } // From creates SyncWorker from BaseProcess -func From(process *Process) *SyncWorkerImpl { +func From(process *Process) SyncWorker { return &SyncWorkerImpl{ process: process, } } -// FromSync creates BaseProcess from SyncWorkerImpl -func FromSync(w *SyncWorkerImpl) BaseProcess { - return &Process{ - created: w.process.created, - events: w.process.events, - state: w.process.state, - cmd: w.process.cmd, - pid: w.process.pid, - endState: w.process.endState, - relay: w.process.relay, - } -} - // Exec payload without TTL timeout. func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec") diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index c315cb2d..0f7ab755 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -40,9 +40,6 @@ type Process struct { // can be nil while process is not started. pid int - // contains information about resulted process state. - endState *os.ProcessState - // communication bus with underlying process. relay relay.Relay } diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go index ce5011c0..a3552e7e 100644 --- a/pkg/worker_watcher/interface.go +++ b/pkg/worker_watcher/interface.go @@ -9,13 +9,13 @@ import ( // Watcher is an interface for the Sync workers lifecycle type Watcher interface { // Watch used to add workers to the stack - Watch(workers []worker.SyncWorker) error + Watch(workers []worker.BaseProcess) error // Get provide first free worker - Get(ctx context.Context) (worker.SyncWorker, error) + Get(ctx context.Context) (worker.BaseProcess, error) // Push enqueues worker back - Push(w worker.SyncWorker) + Push(w worker.BaseProcess) // Allocate - allocates new worker and put it into the WorkerWatcher Allocate() error @@ -24,8 +24,8 @@ type Watcher interface { Destroy(ctx context.Context) // WorkersList return all stack w/o removing it from internal storage - List() []worker.SyncWorker + List() []worker.BaseProcess // RemoveWorker remove worker from the stack - Remove(wb worker.SyncWorker) error + Remove(wb worker.BaseProcess) error } diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index 9a0bc6a4..69e2024b 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -9,8 +9,8 @@ import ( ) type Stack struct { - workers []*worker.SyncWorkerImpl - mutex sync.RWMutex + sync.RWMutex + workers []worker.BaseProcess destroy bool actualNumOfWorkers uint64 initialNumOfWorkers uint64 @@ -19,15 +19,15 @@ type Stack struct { func NewWorkersStack(initialNumOfWorkers uint64) *Stack { w := runtime.NumCPU() return &Stack{ - workers: make([]*worker.SyncWorkerImpl, 0, w), + workers: make([]worker.BaseProcess, 0, w), actualNumOfWorkers: 0, initialNumOfWorkers: initialNumOfWorkers, } } func (stack *Stack) Reset() { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() stack.actualNumOfWorkers = 0 stack.workers = nil } @@ -35,21 +35,21 @@ func (stack *Stack) Reset() { // Push worker back to the stack // If stack in destroy state, Push will provide 100ms window to unlock the mutex func (stack *Stack) Push(w worker.BaseProcess) { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() stack.actualNumOfWorkers++ - stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl)) + stack.workers = append(stack.workers, w) } func (stack *Stack) IsEmpty() bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() return len(stack.workers) == 0 } -func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { - stack.mutex.Lock() - defer stack.mutex.Unlock() +func (stack *Stack) Pop() (worker.BaseProcess, bool) { + stack.Lock() + defer stack.Unlock() // do not release new stack if stack.destroy { @@ -68,8 +68,8 @@ func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { } func (stack *Stack) FindAndRemoveByPid(pid int64) bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() for i := 0; i < len(stack.workers); i++ { // worker in the stack, reallocating if stack.workers[i].Pid() == pid { @@ -84,10 +84,10 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { } // Workers return copy of the workers in the stack -func (stack *Stack) Workers() []worker.SyncWorker { - stack.mutex.Lock() - defer stack.mutex.Unlock() - workersCopy := make([]worker.SyncWorker, 0, 1) +func (stack *Stack) Workers() []worker.BaseProcess { + stack.Lock() + defer stack.Unlock() + workersCopy := make([]worker.BaseProcess, 0, 1) // copy // TODO pointers, copy have no sense for _, v := range stack.workers { @@ -100,40 +100,40 @@ func (stack *Stack) Workers() []worker.SyncWorker { } func (stack *Stack) isDestroying() bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() return stack.destroy } // we also have to give a chance to pool to Push worker (return it) -func (stack *Stack) Destroy(ctx context.Context) { - stack.mutex.Lock() +func (stack *Stack) Destroy(_ context.Context) { + stack.Lock() stack.destroy = true - stack.mutex.Unlock() + stack.Unlock() tt := time.NewTicker(time.Millisecond * 500) defer tt.Stop() for { select { case <-tt.C: - stack.mutex.Lock() + stack.Lock() // that might be one of the workers is working if stack.initialNumOfWorkers != stack.actualNumOfWorkers { - stack.mutex.Unlock() + stack.Unlock() continue } - stack.mutex.Unlock() + stack.Unlock() // unnecessary mutex, but // just to make sure. All stack at this moment are in the stack // Pop operation is blocked, push can't be done, since it's not possible to pop - stack.mutex.Lock() + stack.Lock() for i := 0; i < len(stack.workers); i++ { // set state for the stack in the stack (unused at the moment) stack.workers[i].State().Set(worker.StateDestroyed) // kill the worker _ = stack.workers[i].Kill() } - stack.mutex.Unlock() + stack.Unlock() // clear stack.Reset() return diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go index 5287a6dc..769419e4 100644 --- a/pkg/worker_watcher/stack_test.go +++ b/pkg/worker_watcher/stack_test.go @@ -12,7 +12,7 @@ import ( func TestNewWorkersStack(t *testing.T) { stack := NewWorkersStack(0) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) - assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers) + assert.Equal(t, []worker.BaseProcess{}, stack.workers) } func TestStack_Push(t *testing.T) { diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index d065bae5..2380c190 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -27,11 +27,11 @@ type workerWatcher struct { events events.Handler } -func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error { +func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { ww.stack.Push(workers[i]) - go func(swc worker.SyncWorker) { + go func(swc worker.BaseProcess) { ww.wait(swc) }(workers[i]) } @@ -39,7 +39,7 @@ func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error { } // Get is not a thread safe operation -func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { +func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { const op = errors.Op("worker_watcher_get_free_worker") // FAST PATH // thread safe operation @@ -72,6 +72,10 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { } switch w.State().Value() { + // return only workers in the Ready state + // check first + case worker.StateReady: + return w, nil case worker.StateRemove: err := ww.Remove(w) if err != nil { @@ -94,9 +98,6 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { _ = w.Kill() // try to get new worker continue - // return only workers in the Ready state - case worker.StateReady: - return w, nil } case <-ctx.Done(): return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) @@ -105,7 +106,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { } func (ww *workerWatcher) Allocate() error { - ww.stack.mutex.Lock() + ww.mutex.Lock() const op = errors.Op("worker_watcher_allocate_new") sw, err := ww.allocator() if err != nil { @@ -113,14 +114,14 @@ func (ww *workerWatcher) Allocate() error { } ww.addToWatch(sw) - ww.stack.mutex.Unlock() + ww.mutex.Unlock() ww.Push(sw) return nil } // Remove -func (ww *workerWatcher) Remove(wb worker.SyncWorker) error { +func (ww *workerWatcher) Remove(wb worker.BaseProcess) error { ww.mutex.Lock() defer ww.mutex.Unlock() @@ -139,7 +140,7 @@ func (ww *workerWatcher) Remove(wb worker.SyncWorker) error { } // O(1) operation -func (ww *workerWatcher) Push(w worker.SyncWorker) { +func (ww *workerWatcher) Push(w worker.BaseProcess) { ww.mutex.Lock() defer ww.mutex.Unlock() ww.stack.Push(w) @@ -152,7 +153,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) List() []worker.SyncWorker { +func (ww *workerWatcher) List() []worker.BaseProcess { return ww.stack.Workers() } @@ -183,7 +184,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { } } -func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) { +func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) { go func() { ww.wait(wb) }() |