diff options
Diffstat (limited to 'internal')
-rwxr-xr-x | internal/protocol.go | 94 | ||||
-rwxr-xr-x | internal/state.go | 122 | ||||
-rwxr-xr-x | internal/state_test.go | 27 |
3 files changed, 243 insertions, 0 deletions
diff --git a/internal/protocol.go b/internal/protocol.go new file mode 100755 index 00000000..a099ce4d --- /dev/null +++ b/internal/protocol.go @@ -0,0 +1,94 @@ +package internal + +import ( + "os" + + j "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/frame" +) + +var json = j.ConfigCompatibleWithStandardLibrary + +type StopCommand struct { + Stop bool `json:"stop"` +} + +type pidCommand struct { + Pid int `json:"pid"` +} + +func SendControl(rl relay.Relay, payload interface{}) error { + const op = errors.Op("send control frame") + fr := frame.NewFrame() + fr.WriteVersion(frame.VERSION_1) + fr.WriteFlags(frame.CONTROL) + + if data, ok := payload.([]byte); ok { + // check if payload no more that 4Gb + if uint32(len(data)) > ^uint32(0) { + return errors.E(op, errors.Str("payload is more that 4gb")) + } + + fr.WritePayloadLen(uint32(len(data))) + fr.WritePayload(data) + fr.WriteCRC() + + err := rl.Send(fr) + if err != nil { + return errors.E(op, err) + } + return nil + } + + data, err := json.Marshal(payload) + if err != nil { + return errors.E(op, errors.Errorf("invalid payload: %s", err)) + } + + fr.WritePayloadLen(uint32(len(data))) + fr.WritePayload(data) + fr.WriteCRC() + + err = rl.Send(fr) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func FetchPID(rl relay.Relay) (int64, error) { + const op = errors.Op("fetchPID") + err := SendControl(rl, pidCommand{Pid: os.Getpid()}) + if err != nil { + return 0, errors.E(op, err) + } + + frameR := frame.NewFrame() + err = rl.Receive(frameR) + if !frameR.VerifyCRC() { + return 0, errors.E(op, errors.Str("CRC mismatch")) + } + if err != nil { + return 0, errors.E(op, err) + } + if frameR == nil { + return 0, errors.E(op, errors.Str("nil frame received")) + } + + flags := frameR.ReadFlags() + + if flags&(byte(frame.CONTROL)) == 0 { + return 0, errors.E(op, errors.Str("unexpected response, header is missing, no CONTROL flag")) + } + + link := &pidCommand{} + err = json.Unmarshal(frameR.Payload(), link) + if err != nil { + return 0, errors.E(op, err) + } + + return int64(link.Pid), nil +} diff --git a/internal/state.go b/internal/state.go new file mode 100755 index 00000000..8f7d939b --- /dev/null +++ b/internal/state.go @@ -0,0 +1,122 @@ +package internal + +import ( + "fmt" + "sync/atomic" +) + +// State represents WorkerProcess status and updated time. +type State interface { + fmt.Stringer + // Value returns WorkerState value + Value() int64 + // Set sets the WorkerState + Set(value int64) + // NumJobs shows how many times WorkerProcess was invoked + NumExecs() int64 + // IsActive returns true if WorkerProcess not Inactive or Stopped + IsActive() bool + // RegisterExec using to registering php executions + RegisterExec() + // SetLastUsed sets worker last used time + SetLastUsed(lu uint64) + // LastUsed return worker last used time + LastUsed() uint64 +} + +const ( + // StateInactive - no associated process + StateInactive int64 = iota + + // StateReady - ready for job. + StateReady + + // StateWorking - working on given payload. + StateWorking + + // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. + StateInvalid + + // StateStopping - process is being softly stopped. + StateStopping + + StateKilling + + // State of worker, when no need to allocate new one + StateDestroyed + + // StateStopped - process has been terminated. + StateStopped + + // StateErrored - error WorkerState (can't be used). + StateErrored + + StateRemove +) + +type WorkerState struct { + value int64 + numExecs int64 + // to be lightweight, use UnixNano + lastUsed uint64 +} + +// Thread safe +func NewWorkerState(value int64) *WorkerState { + return &WorkerState{value: value} +} + +// String returns current WorkerState as string. +func (s *WorkerState) String() string { + switch s.Value() { + case StateInactive: + return "inactive" + case StateReady: + return "ready" + case StateWorking: + return "working" + case StateInvalid: + return "invalid" + case StateStopped: + return "stopped" + case StateErrored: + return "errored" + } + + return "undefined" +} + +// NumExecs returns number of registered WorkerProcess execs. +func (s *WorkerState) NumExecs() int64 { + return atomic.LoadInt64(&s.numExecs) +} + +// Value WorkerState returns WorkerState value +func (s *WorkerState) Value() int64 { + return atomic.LoadInt64(&s.value) +} + +// IsActive returns true if WorkerProcess not Inactive or Stopped +func (s *WorkerState) IsActive() bool { + val := s.Value() + return val == StateWorking || val == StateReady +} + +// change WorkerState value (status) +func (s *WorkerState) Set(value int64) { + atomic.StoreInt64(&s.value, value) +} + +// register new execution atomically +func (s *WorkerState) RegisterExec() { + atomic.AddInt64(&s.numExecs, 1) +} + +// Update last used time +func (s *WorkerState) SetLastUsed(lu uint64) { + atomic.StoreUint64(&s.lastUsed, lu) +} + +func (s *WorkerState) LastUsed() uint64 { + return atomic.LoadUint64(&s.lastUsed) +} diff --git a/internal/state_test.go b/internal/state_test.go new file mode 100755 index 00000000..bdb05825 --- /dev/null +++ b/internal/state_test.go @@ -0,0 +1,27 @@ +package internal + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_NewState(t *testing.T) { + st := NewWorkerState(StateErrored) + + assert.Equal(t, "errored", st.String()) + + assert.Equal(t, "inactive", NewWorkerState(StateInactive).String()) + assert.Equal(t, "ready", NewWorkerState(StateReady).String()) + assert.Equal(t, "working", NewWorkerState(StateWorking).String()) + assert.Equal(t, "stopped", NewWorkerState(StateStopped).String()) + assert.Equal(t, "undefined", NewWorkerState(1000).String()) +} + +func Test_IsActive(t *testing.T) { + assert.False(t, NewWorkerState(StateInactive).IsActive()) + assert.True(t, NewWorkerState(StateReady).IsActive()) + assert.True(t, NewWorkerState(StateWorking).IsActive()) + assert.False(t, NewWorkerState(StateStopped).IsActive()) + assert.False(t, NewWorkerState(StateErrored).IsActive()) +} |