path: root/pool_watcher.go
blob: 6eb614dc4d7102e8bbbba473f9ab3e837877a192 (plain)
package roadrunner

import (
	"context"
	"time"

	"github.com/spiral/roadrunner/v2/util"
)

// MB is the number of bytes in one megabyte, used to convert
// MaxWorkerMemory (configured in megabytes) into bytes.
const MB = 1024 * 1024

// supervisedPool watches the pool workers and removes those that exceed
// the TTL, IdleTTL or MaxWorkerMemory limits from the SupervisorConfig.
type supervisedPool struct {
	cfg    SupervisorConfig
	events *util.EventHandler
	pool   Pool
	stopCh chan struct{}
}

// newPoolWatcher creates a supervisor for the given static pool.
func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool {
	return &supervisedPool{
		cfg:    cfg,
		events: events,
		pool:   pool,
		stopCh: make(chan struct{}),
	}
}

// Start launches the watcher goroutine, which runs a control pass on
// every WatchTick until Stop is called.
func (sp *supervisedPool) Start() {
	go func() {
		watchTout := time.NewTicker(sp.cfg.WatchTick)
		defer watchTout.Stop()
		for {
			select {
			case <-sp.stopCh:
				// stop signal received, exit the watcher
				return
			case <-watchTout.C:
				sp.control()
			}
		}
	}()
}

// Stop signals the watcher goroutine to exit; it blocks until the
// signal is received.
func (sp *supervisedPool) Stop() {
	sp.stopCh <- struct{}{}
}

// control performs a single supervision pass over the pool workers.
func (sp *supervisedPool) control() {
	now := time.Now()
	ctx := context.TODO()

	// the pool returns a copy of its workers slice, safe to iterate
	workers := sp.pool.Workers()

	for i := 0; i < len(workers); i++ {
		if workers[i].State().Value() == StateInvalid {
			continue
		}

		s, err := WorkerProcessState(workers[i])
		if err != nil {
			// the worker is no longer valid for supervision
			continue
		}

		// TTL is expressed in seconds: remove workers that have been alive too long
		if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
			err = sp.pool.RemoveWorker(ctx, workers[i])
			if err != nil {
				sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
				return
			}
			sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
			continue
		}
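		// for example, with TTL = 60 the check above removes any worker
		// created more than 60 seconds ago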

		// MaxWorkerMemory is expressed in megabytes, MemoryUsage in bytes
		if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
			err = sp.pool.RemoveWorker(ctx, workers[i])
			if err != nil {
				sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
				return
			}
			// TODO: emit a dedicated max-memory event; EventTTL is reused
			// here until one is added
			sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
			continue
		}
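		// for example, MaxWorkerMemory = 128 sets the threshold above to
		// 128 * MB = 134217728 bytes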

		// finally, check the worker idle timeout
		if sp.cfg.IdleTTL != 0 {
			// only workers in StateReady can be considered idle
			if workers[i].State().Value() != StateReady {
				continue
			}

			/*
				Calculate the idle time: LastUsed is stored as a UnixNano
				timestamp, so (time.Now - LastUsed) is the elapsed time since
				the last request. Once that elapsed time reaches IdleTTL
				(in seconds), the worker is considered idle and is removed.
			*/

			// last used timestamp, in unix nanoseconds
			lu := workers[i].State().LastUsed()

			// seconds elapsed since the worker was last used
			res := (now.UnixNano() - int64(lu)) / int64(time.Second)

			// remove the worker once it has been idle for at least IdleTTL seconds
			if sp.cfg.IdleTTL-res <= 0 {
				err = sp.pool.RemoveWorker(ctx, workers[i])
				if err != nil {
					sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
					return
				}
				sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]})
			}
		}
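		// worked example: with IdleTTL = 5, a worker last used 7 seconds ago
		// gives res = 7 and 5 - 7 = -2 <= 0, so it is removed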

		// the very last step would be to calculate the total pool memory usage
		// (excluding the removed workers); not implemented yet:
		//totalUsedMemory += s.MemoryUsage
	}

	//// if the current usage exceeds the maximum allowed pool memory, destroy the pool
	//if totalUsedMemory > sp.maxPoolMemory {
	//	sp.pool.Destroy(ctx)
	//}
}
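
// A minimal usage sketch (hypothetical wiring: staticPool and eventHandler
// below are illustrative placeholders, not this package's actual setup):
//
//	sw := newPoolWatcher(staticPool, eventHandler, SupervisorConfig{
//		WatchTick:       time.Second,
//		TTL:             60,  // seconds
//		IdleTTL:         10,  // seconds
//		MaxWorkerMemory: 128, // megabytes
//	})
//	sw.Start()
//	defer sw.Stop()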