author    Valery Piashchynski <[email protected]>    2021-08-17 19:55:15 +0300
committer Valery Piashchynski <[email protected]>    2021-08-17 19:55:15 +0300
commit    ab690ab9c6ae2b00aef1b501e8b17ff02b5da753 (patch)
tree      0a58b043605ef1d9b09e75b207c236aacb1ed55a /pkg
parent    bd0da830ae345e1ed4a67782bf413673beeba037 (diff)
Update to go 1.17
Add Stat with tests

Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rwxr-xr-x  pkg/pool/static_pool.go                     | 58
-rw-r--r--  pkg/state/job/state.go                      | 13
-rw-r--r--  pkg/worker_watcher/container/channel/vec.go | 19
3 files changed, 62 insertions, 28 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index b20e4242..3eb0714f 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -238,7 +238,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
return w, nil
}
-// Destroy all underlying stack (but let them to complete the task).
+// Destroy all underlying stack (but let them complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
sp.ww.Destroy(ctx)
}
@@ -250,36 +250,48 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
switch {
case errors.Is(errors.ExecTTL, err):
sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
+ w.State().Set(worker.StateInvalid)
+ return nil, err
case errors.Is(errors.SoftJob, err):
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- // TODO suspicious logic, redesign
- err = sp.ww.Allocate()
- if err != nil {
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
- }
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ // if max jobs exceed
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // mark old as invalid and stop
w.State().Set(worker.StateInvalid)
- err = w.Stop()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
- } else {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
- sp.ww.Release(w)
+
+ return nil, err
}
- }
- w.State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- // kill worker instead of stop, because worker here might be in the broken state (network) which leads us
- // to the error
- errS := w.Kill()
- if errS != nil {
- return nil, errors.E(op, err, errS)
- }
+ // soft jobs errors are allowed, just put the worker back
+ sp.ww.Release(w)
- return nil, errors.E(op, err)
+ return nil, err
+ case errors.Is(errors.Network, err):
+ // in case of network error, we can't stop the worker, we should kill it
+ w.State().Set(worker.StateInvalid)
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+
+ // kill the worker instead of sending net packet to it
+ _ = w.Kill()
+
+ return nil, err
+ default:
+ w.State().Set(worker.StateInvalid)
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ // stop the worker, worker here might be in the broken state (network)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, errors.E(op, err)
+ }
}
}
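
The reworked defaultErrEncoder now picks a distinct recovery path per error kind: ExecTTL invalidates the worker and returns, SoftJob releases the worker back to the pool (or stops it once MaxJobs is exceeded), Network kills the worker outright, and anything else stops and destructs it. The standalone sketch below illustrates only that dispatch shape; it uses the standard library errors package with made-up sentinel values, not RoadRunner's spiral/errors kinds (which, as the diff shows, take the errors.Is(kind, err) argument order) or its worker types.

package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors standing in for the ExecTTL, SoftJob and
// Network kinds from the spiral/errors package used in the diff.
var (
	errExecTTL = errors.New("exec ttl reached")
	errSoftJob = errors.New("soft job error")
	errNetwork = errors.New("network error")
)

// handleExecError mirrors the shape of the updated defaultErrEncoder: each
// error kind decides whether the worker is released, stopped or killed.
func handleExecError(err error) string {
	switch {
	case errors.Is(err, errExecTTL):
		return "mark the worker invalid and return the error"
	case errors.Is(err, errSoftJob):
		return "release the worker back to the pool (stop it only when MaxJobs is exceeded)"
	case errors.Is(err, errNetwork):
		return "kill the worker: it cannot be stopped over a broken connection"
	default:
		return "stop the worker and push a destruct event"
	}
}

func main() {
	fmt.Println(handleExecError(fmt.Errorf("job failed: %w", errSoftJob)))
}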
diff --git a/pkg/state/job/state.go b/pkg/state/job/state.go
index e5b142b5..d90118c3 100644
--- a/pkg/state/job/state.go
+++ b/pkg/state/job/state.go
@@ -1,6 +1,17 @@
package job
+// State represents job's state
type State struct {
- Queue string
+ // Pipeline name
+ Pipeline string
+ // Driver name
+ Driver string
+ // Queue name (tube for the beanstalk)
+ Queue string
+ // Active jobs which are consumed from the driver but not handled by the PHP worker yet
Active int64
+ // Delayed jobs
+ Delayed int64
+ // Reserved jobs which are in the driver but not consumed yet
+ Reserved int64
}
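
The expanded job.State now reports per-pipeline counters instead of a bare queue name. As a rough usage sketch, with a local mirror of the struct and hypothetical values (the real numbers come from each driver's state implementation):

package main

import (
	"encoding/json"
	"fmt"
)

// State mirrors the fields added to pkg/state/job.State in this commit.
type State struct {
	Pipeline string // pipeline name
	Driver   string // driver name
	Queue    string // queue name (tube for beanstalk)
	Active   int64  // consumed from the driver, not yet handled by a PHP worker
	Delayed  int64  // delayed jobs
	Reserved int64  // in the driver, not yet consumed
}

func main() {
	// Hypothetical values for illustration only.
	s := State{
		Pipeline: "default",
		Driver:   "beanstalk",
		Queue:    "default",
		Active:   3,
		Delayed:  1,
		Reserved: 0,
	}
	out, _ := json.MarshalIndent(s, "", "  ")
	fmt.Println(string(out))
}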
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
index 51093978..7fb65a92 100644
--- a/pkg/worker_watcher/container/channel/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -39,6 +39,7 @@ func (v *Vec) Push(w worker.BaseProcess) {
// because in that case, workers in the v.workers channel can be TTL-ed and killed
// but presenting in the channel
default:
+ // Stop Pop operations
v.Lock()
defer v.Unlock()
@@ -48,19 +49,29 @@ func (v *Vec) Push(w worker.BaseProcess) {
2. Violated Get <-> Release operation (how ??)
*/
for i := uint64(0); i < v.len; i++ {
+ /*
+ We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
+ */
wrk := <-v.workers
switch wrk.State().Value() {
- // skip good states
+ // skip good states, put worker back
case worker.StateWorking, worker.StateReady:
// put the worker back
// generally, while send and receive operations are concurrent (from the channel), channel behave
// like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
v.workers <- wrk
continue
+ /*
+ Bad states are here.
+ */
default:
// kill the current worker (just to be sure it's dead)
- _ = wrk.Kill()
- // replace with the new one
+ if wrk != nil {
+ _ = wrk.Kill()
+ }
+ // replace with the new one and return from the loop
+ // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker
+ // But this case will be handled in the worker_watcher::Get
v.workers <- w
return
}
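
The Push path above drains the buffered channel once, re-queues workers in good states, and swaps the first bad worker for the incoming one. A simplified standalone sketch of that drain-and-replace pattern, with worker state reduced to a single bool and all names hypothetical:

package main

import "fmt"

// proc stands in for worker.BaseProcess; its state is reduced to a bool.
type proc struct {
	id      int
	healthy bool
}

// pushReplacing walks the full buffered channel once, re-queues healthy
// workers, and puts w into the slot of the first unhealthy one it finds.
func pushReplacing(workers chan *proc, w *proc) {
	for i := 0; i < cap(workers); i++ {
		wrk := <-workers
		if wrk.healthy {
			workers <- wrk // good state: put the worker back
			continue
		}
		// bad state: drop the drained worker and insert the new one
		workers <- w
		return
	}
}

func main() {
	workers := make(chan *proc, 3)
	workers <- &proc{id: 1, healthy: true}
	workers <- &proc{id: 2, healthy: false}
	workers <- &proc{id: 3, healthy: true}

	pushReplacing(workers, &proc{id: 4, healthy: true})

	for i := 0; i < cap(workers); i++ {
		fmt.Println((<-workers).id) // prints 3, 1, 4
	}
}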
@@ -78,7 +89,7 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
}
*/
- if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ if atomic.LoadUint64(&v.destroy) == 1 {
return nil, errors.E(errors.WatcherStopped)
}
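
The Pop change replaces CompareAndSwapUint64(&v.destroy, 1, 1), which only ever reported whether the flag was already 1, with a plain atomic load that states the intent directly. A minimal sketch of the same destroy-flag check, with names hypothetical:

package main

import (
	"fmt"
	"sync/atomic"
)

// container stands in for Vec: destroy is a flag set once on shutdown.
type container struct {
	destroy uint64
}

// pop refuses to hand out workers once the container is being destroyed.
func (c *container) pop() error {
	if atomic.LoadUint64(&c.destroy) == 1 {
		return fmt.Errorf("watcher stopped")
	}
	return nil
}

func main() {
	c := &container{}
	fmt.Println(c.pop()) // <nil>

	atomic.StoreUint64(&c.destroy, 1) // what Destroy would do
	fmt.Println(c.pop())              // watcher stopped
}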