summaryrefslogtreecommitdiff
path: root/state.go
blob: 6c27c4c1cfbad7ac265541495f90dbb0ee7c94ac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package roadrunner

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// State is a read-only view of a worker's status: the current state value,
// the number of registered executions and the time of the last state change.
type State interface {
	// Value returns the current state value.
	Value() int64

	// NumExecs returns how many times the worker was invoked.
	NumExecs() uint64

	// Updated returns the moment of the last state change.
	Updated() time.Time

	// String (via fmt.Stringer) returns the textual form of the state.
	fmt.Stringer
}

// Worker state values, as stored in a state and returned by State.Value.
const (
	// StateInactive means no process is associated with the worker.
	StateInactive int64 = iota

	// StateReady means the worker is ready for a job.
	StateReady

	// StateWorking means the worker is working on a given payload.
	StateWorking

	// StateDestructing means the process is being destructed.
	StateDestructing

	// StateStopping means the process is being softly stopped.
	StateStopping

	// StateStopped means the process has been terminated.
	StateStopped

	// StateErrored means the worker is in an error state and can't be used.
	StateErrored
)

// state is the concrete, mutable implementation of State.
//
// value and updated are guarded by mu. numExecs is only ever accessed through
// sync/atomic and is deliberately kept as the first field: the first word of
// an allocated struct is guaranteed to be 64-bit aligned, which 64-bit atomic
// operations require on 32-bit platforms (ARM, 386, 32-bit MIPS).
type state struct {
	numExecs uint64 // execution counter; atomic access only, must stay first

	mu      sync.RWMutex // guards value and updated
	value   int64        // current state value (normally one of the State* constants)
	updated time.Time    // time at which value was last written
}

// newState allocates a state holding the given value and stamps it with the
// current time as its last update.
func newState(value int64) *state {
	st := new(state)
	st.value = value
	st.updated = time.Now()

	return st
}

// String returns the textual name of the current state, or "undefined" when
// the stored value matches none of the State* constants.
//
// The value is obtained through Value so that it is read under the read lock
// and cannot race with a concurrent set.
func (s *state) String() string {
	switch s.Value() {
	case StateInactive:
		return "inactive"
	case StateReady:
		return "ready"
	case StateWorking:
		return "working"
	case StateDestructing:
		return "destructing"
	case StateStopping:
		return "stopping"
	case StateStopped:
		return "stopped"
	case StateErrored:
		return "errored"
	default:
		return "undefined"
	}
}

// Value returns the current state value, read while holding the read lock.
func (s *state) Value() int64 {
	s.mu.RLock()
	current := s.value
	s.mu.RUnlock()

	return current
}

// IsActive reports whether the current state is StateReady or StateWorking.
// Every other state value yields false.
func (s *state) IsActive() bool {
	switch s.Value() {
	case StateReady, StateWorking:
		return true
	default:
		return false
	}
}

// Updated returns the timestamp stored by the most recent set call, or the
// one assigned by newState if set has not been called since. The read happens
// under the read lock.
func (s *state) Updated() time.Time {
	s.mu.RLock()
	stamp := s.updated
	s.mu.RUnlock()

	return stamp
}

// NumExecs returns the number of executions registered so far. The counter is
// loaded atomically and does not involve the state mutex.
func (s *state) NumExecs() uint64 {
	execs := atomic.LoadUint64(&s.numExecs)
	return execs
}

// set stores a new state value and records the current time as the moment of
// the change. Both fields are written while holding the write lock.
func (s *state) set(value int64) {
	s.mu.Lock()
	s.value, s.updated = value, time.Now()
	s.mu.Unlock()
}

// registerExec atomically bumps the execution counter by one.
func (s *state) registerExec() {
	const step = 1
	atomic.AddUint64(&s.numExecs, step)
}