blob: 2918f396fe83a0838761ccd794e3cd317100376a (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
package roadrunner
import (
"sync"
"sync/atomic"
"time"
)
// State exposes a worker's status value, its execution counter and the time
// of its last status change.
type State interface {
// Value returns the state value.
Value() int64 //todo: change to state value
// NumExecs returns how many times the worker was invoked.
NumExecs() uint64
// Updated returns the moment of the last state change.
Updated() time.Time
}
// Worker state values. Every constant is typed int64; iota numbers them
// consecutively from 0 (StateInactive) to 4 (StateErrored).
const (
// StateInactive - no associated process.
StateInactive int64 = iota
// StateReady - ready for job.
StateReady
// StateWorking - working on given payload.
StateWorking
// StateStopped - process has been terminated.
StateStopped
// StateErrored - error state (can't be used).
StateErrored
)
// state is the concrete status holder; its Value, NumExecs and Updated
// methods provide the State interface method set.
type state struct {
// mu is taken by Value, Updated and set around accesses to value and updated.
mu sync.RWMutex
// value is the current status (see the State* constants).
value int64
// numExecs is the execution counter; it is accessed only through sync/atomic
// (registerExec, NumExecs) and not under mu.
// NOTE(review): atomic 64-bit access needs 8-byte alignment on 32-bit
// platforms - confirm this field's offset stays aligned if fields are reordered.
numExecs uint64
// updated is the time of construction or of the most recent set call.
updated time.Time
}
// newState allocates a state holding the given value and stamps it with the
// current time as its last-updated moment. The execution counter starts at 0.
func newState(value int64) *state {
	st := new(state)
	st.value = value
	st.updated = time.Now()
	return st
}
// String returns the current state as a human-readable string: "inactive",
// "ready", "working", "stopped" or "errored". Any other value yields
// "undefined".
func (s *state) String() string {
	// Read through Value so the access happens under the read lock: set
	// writes s.value while holding the write lock, and a bare field read
	// here would race with it.
	switch s.Value() {
	case StateInactive:
		return "inactive"
	case StateReady:
		return "ready"
	case StateWorking:
		return "working"
	case StateStopped:
		return "stopped"
	case StateErrored:
		return "errored"
	}

	return "undefined"
}
// Value returns the current state value, read under the read lock.
func (s *state) Value() int64 {
	s.mu.RLock()
	current := s.value
	s.mu.RUnlock()

	return current
}
// IsActive reports whether the current state is StateWorking or StateReady.
// Every other state (inactive, stopped, errored, unknown) counts as not active.
func (s *state) IsActive() bool {
	switch s.Value() {
	case StateWorking, StateReady:
		return true
	default:
		return false
	}
}
// Updated returns the moment of the last state change, read under the read
// lock.
func (s *state) Updated() time.Time {
	s.mu.RLock()
	last := s.updated
	s.mu.RUnlock()

	return last
}
// NumExecs returns the number of executions registered so far. The counter is
// loaded atomically, without taking the mutex.
func (s *state) NumExecs() uint64 {
	count := atomic.LoadUint64(&s.numExecs)
	return count
}
// set replaces the state value and records the current time as the moment of
// the change; both fields are written while holding the write lock.
func (s *state) set(value int64) {
	s.mu.Lock()
	s.value, s.updated = value, time.Now()
	s.mu.Unlock()
}
// registerExec atomically increments the execution counter by one.
func (s *state) registerExec() {
	counter := &s.numExecs
	atomic.AddUint64(counter, 1)
}
|