summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkg/pool/interface.go2
-rwxr-xr-xpkg/pool/static_pool.go4
-rwxr-xr-xpkg/pool/supervisor_pool.go2
-rw-r--r--pkg/worker_watcher/interface.go4
-rw-r--r--pkg/worker_watcher/stack.go4
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go6
-rw-r--r--plugins/http/plugin.go2
-rw-r--r--tests/plugins/informer/test_plugin.go2
8 files changed, 13 insertions, 13 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index f3fe4065..4f7ae595 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -19,7 +19,7 @@ type Pool interface {
ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
// Workers returns worker list associated with the pool.
- Workers() (workers []*worker.SyncWorkerImpl)
+ Workers() (workers []worker.SyncWorker)
// Remove worker from the pool.
RemoveWorker(worker worker.SyncWorker) error
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index bb416b29..7f66eaac 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -122,7 +122,7 @@ func (sp *StaticPool) GetConfig() interface{} {
}
// Workers returns worker list associated with the pool.
-func (sp *StaticPool) Workers() (workers []*worker.SyncWorkerImpl) {
+func (sp *StaticPool) Workers() (workers []worker.SyncWorker) {
return sp.ww.WorkersList()
}
@@ -216,7 +216,7 @@ func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
return nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (*worker.SyncWorkerImpl, error) {
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
// GetFreeWorker function consumes context with timeout
w, err := sp.ww.GetFreeWorker(ctxGetFree)
if err != nil {
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 2bae8f9e..583d05b4 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -100,7 +100,7 @@ func (sp *supervised) GetConfig() interface{} {
return sp.pool.GetConfig()
}
-func (sp *supervised) Workers() (workers []*worker.SyncWorkerImpl) {
+func (sp *supervised) Workers() (workers []worker.SyncWorker) {
sp.mu.Lock()
defer sp.mu.Unlock()
return sp.pool.Workers()
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index 13991541..927aa270 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -11,7 +11,7 @@ type Watcher interface {
AddToWatch(workers []worker.SyncWorker) error
// GetFreeWorker provide first free worker
- GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error)
+ GetFreeWorker(ctx context.Context) (worker.SyncWorker, error)
// PutWorker enqueues worker back
PushWorker(w worker.SyncWorker)
@@ -23,7 +23,7 @@ type Watcher interface {
Destroy(ctx context.Context)
// WorkersList return all stack w/o removing it from internal storage
- WorkersList() []*worker.SyncWorkerImpl
+ WorkersList() []worker.SyncWorker
// RemoveWorker remove worker from the stack
RemoveWorker(wb worker.SyncWorker) error
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go
index 2d23d0e9..d76f4d8f 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/stack.go
@@ -85,10 +85,10 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
}
// Workers return copy of the workers in the stack
-func (stack *Stack) Workers() []*worker.SyncWorkerImpl {
+func (stack *Stack) Workers() []worker.SyncWorker {
stack.mutex.Lock()
defer stack.mutex.Unlock()
- workersCopy := make([]*worker.SyncWorkerImpl, 0, 1)
+ workersCopy := make([]worker.SyncWorker, 0, 1)
// copy
for _, v := range stack.workers {
if v != nil {
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index f87bd021..2c3d512d 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -39,7 +39,7 @@ func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error {
return nil
}
-func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) {
+func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
w, stop := ww.stack.Pop()
@@ -127,11 +127,11 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) WorkersList() []*worker.SyncWorkerImpl {
+func (ww *workerWatcher) WorkersList() []worker.SyncWorker {
return ww.stack.Workers()
}
-func (ww *workerWatcher) wait(w worker.SyncWorker) {
+func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 144148af..4d64ac6d 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -306,7 +306,7 @@ func (s *Plugin) Workers() []worker.BaseProcess {
workers := s.pool.Workers()
baseWorkers := make([]worker.BaseProcess, 0, len(workers))
for i := 0; i < len(workers); i++ {
- baseWorkers = append(baseWorkers, worker.FromSync(workers[i]))
+ baseWorkers = append(baseWorkers, worker.FromSync(workers[i].(*worker.SyncWorkerImpl)))
}
return baseWorkers
}
diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go
index 2e5af988..7436a7fb 100644
--- a/tests/plugins/informer/test_plugin.go
+++ b/tests/plugins/informer/test_plugin.go
@@ -58,7 +58,7 @@ func (p1 *Plugin1) Workers() []worker.BaseProcess {
workers := p.Workers()
baseWorkers := make([]worker.BaseProcess, 0, len(workers))
for i := 0; i < len(workers); i++ {
- baseWorkers = append(baseWorkers, worker.FromSync(workers[i]))
+ baseWorkers = append(baseWorkers, worker.FromSync(workers[i].(*worker.SyncWorkerImpl)))
}
return baseWorkers