summaryrefslogtreecommitdiff
path: root/state.go
blob: af29de5039bee5fad718d667dfa3bcce03b6aa6b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package roadrunner

import (
	"fmt"
	"sync/atomic"
)

// State is a read-only view of a worker's status: its current state value,
// a printable form of that value, and its execution counter.
type State interface {
	// Value returns the numeric state value.
	Value() int64

	// NumExecs shows how many times the worker was invoked.
	NumExecs() int64

	// String (via fmt.Stringer) gives the printable form of the state.
	fmt.Stringer
}

// Worker state values. They are consecutive int64 values starting at 0, in
// the order declared below.
const (
	// StateInactive - no associated process
	StateInactive int64 = iota

	// StateReady - ready for job.
	StateReady

	// StateWorking - working on given payload.
	StateWorking

	// StateStreaming - indicates that worker is streaming the data at the moment.
	StateStreaming

	// StateStopping - process is being softly stopped.
	StateStopping

	// StateStopped - process has been terminated.
	StateStopped

	// StateErrored - error state (can't be used).
	StateErrored
)

// state holds a worker's current state value and its execution counter.
// Both fields are read and written through sync/atomic.
type state struct {
	value    int64
	numExecs int64
}

// newState creates a state initialised to the given state value with a zero
// execution counter.
func newState(value int64) *state {
	return &state{value: value}
}

// String returns current state as string. Every declared State* constant has
// its own name; any other value is reported as "undefined".
func (s *state) String() string {
	switch s.Value() {
	case StateInactive:
		return "inactive"
	case StateReady:
		return "ready"
	case StateWorking:
		return "working"
	case StateStreaming:
		return "streaming"
	case StateStopping:
		// Previously missing: a softly-stopping worker was printed as "undefined".
		return "stopping"
	case StateStopped:
		return "stopped"
	case StateErrored:
		return "errored"
	}

	return "undefined"
}

// Value atomically loads and returns the current state value.
func (s *state) Value() int64 {
	return atomic.LoadInt64(&s.value)
}

// IsActive reports whether the current state value is StateReady or
// StateWorking; every other state value yields false.
func (s *state) IsActive() bool {
	switch s.Value() {
	case StateReady, StateWorking:
		return true
	default:
		return false
	}
}

// NumExecs atomically loads and returns the execution counter.
func (s *state) NumExecs() int64 {
	execs := atomic.LoadInt64(&s.numExecs)
	return execs
}

// set atomically stores value as the new state value (status).
func (s *state) set(value int64) {
	target := &s.value
	atomic.StoreInt64(target, value)
}

// registerExec atomically increments the execution counter by one.
func (s *state) registerExec() {
	counter := &s.numExecs
	atomic.AddInt64(counter, 1)
}