diff options
-rw-r--r-- | pkg/pool/interface.go | 2 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 4 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 2 | ||||
-rw-r--r-- | pkg/worker_watcher/interface.go | 4 | ||||
-rw-r--r-- | pkg/worker_watcher/stack.go | 4 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 6 | ||||
-rw-r--r-- | plugins/http/plugin.go | 2 | ||||
-rw-r--r-- | tests/plugins/informer/test_plugin.go | 2 |
8 files changed, 13 insertions, 13 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index f3fe4065..4f7ae595 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -19,7 +19,7 @@ type Pool interface { ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) // Workers returns worker list associated with the pool. - Workers() (workers []*worker.SyncWorkerImpl) + Workers() (workers []worker.SyncWorker) // Remove worker from the pool. RemoveWorker(worker worker.SyncWorker) error diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index bb416b29..7f66eaac 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -122,7 +122,7 @@ func (sp *StaticPool) GetConfig() interface{} { } // Workers returns worker list associated with the pool. -func (sp *StaticPool) Workers() (workers []*worker.SyncWorkerImpl) { +func (sp *StaticPool) Workers() (workers []worker.SyncWorker) { return sp.ww.WorkersList() } @@ -216,7 +216,7 @@ func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error { return nil } -func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (*worker.SyncWorkerImpl, error) { +func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) { // GetFreeWorker function consumes context with timeout w, err := sp.ww.GetFreeWorker(ctxGetFree) if err != nil { diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 2bae8f9e..583d05b4 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -100,7 +100,7 @@ func (sp *supervised) GetConfig() interface{} { return sp.pool.GetConfig() } -func (sp *supervised) Workers() (workers []*worker.SyncWorkerImpl) { +func (sp *supervised) Workers() (workers []worker.SyncWorker) { sp.mu.Lock() defer sp.mu.Unlock() return sp.pool.Workers() diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go index 13991541..927aa270 100644 --- a/pkg/worker_watcher/interface.go +++ b/pkg/worker_watcher/interface.go @@ -11,7 +11,7 @@ type Watcher interface { AddToWatch(workers []worker.SyncWorker) error // GetFreeWorker provide first free worker - GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) + GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) // PutWorker enqueues worker back PushWorker(w worker.SyncWorker) @@ -23,7 +23,7 @@ type Watcher interface { Destroy(ctx context.Context) // WorkersList return all stack w/o removing it from internal storage - WorkersList() []*worker.SyncWorkerImpl + WorkersList() []worker.SyncWorker // RemoveWorker remove worker from the stack RemoveWorker(wb worker.SyncWorker) error diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index 2d23d0e9..d76f4d8f 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -85,10 +85,10 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { } // Workers return copy of the workers in the stack -func (stack *Stack) Workers() []*worker.SyncWorkerImpl { +func (stack *Stack) Workers() []worker.SyncWorker { stack.mutex.Lock() defer stack.mutex.Unlock() - workersCopy := make([]*worker.SyncWorkerImpl, 0, 1) + workersCopy := make([]worker.SyncWorker, 0, 1) // copy for _, v := range stack.workers { if v != nil { diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index f87bd021..2c3d512d 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -39,7 +39,7 @@ func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { return nil } -func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) { +func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation w, stop := ww.stack.Pop() @@ -127,11 +127,11 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) WorkersList() []*worker.SyncWorkerImpl { +func (ww *workerWatcher) WorkersList() []worker.SyncWorker { return ww.stack.Workers() } -func (ww *workerWatcher) wait(w worker.SyncWorker) { +func (ww *workerWatcher) wait(w worker.BaseProcess) { const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 144148af..4d64ac6d 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -306,7 +306,7 @@ func (s *Plugin) Workers() []worker.BaseProcess { workers := s.pool.Workers() baseWorkers := make([]worker.BaseProcess, 0, len(workers)) for i := 0; i < len(workers); i++ { - baseWorkers = append(baseWorkers, worker.FromSync(workers[i])) + baseWorkers = append(baseWorkers, worker.FromSync(workers[i].(*worker.SyncWorkerImpl))) } return baseWorkers } diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go index 2e5af988..7436a7fb 100644 --- a/tests/plugins/informer/test_plugin.go +++ b/tests/plugins/informer/test_plugin.go @@ -58,7 +58,7 @@ func (p1 *Plugin1) Workers() []worker.BaseProcess { workers := p.Workers() baseWorkers := make([]worker.BaseProcess, 0, len(workers)) for i := 0; i < len(workers); i++ { - baseWorkers = append(baseWorkers, worker.FromSync(workers[i])) + baseWorkers = append(baseWorkers, worker.FromSync(workers[i].(*worker.SyncWorkerImpl))) } return baseWorkers |