summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rwxr-xr-xinternal/protocol.go94
-rwxr-xr-xinternal/state.go122
-rwxr-xr-xinternal/state_test.go27
3 files changed, 243 insertions, 0 deletions
diff --git a/internal/protocol.go b/internal/protocol.go
new file mode 100755
index 00000000..a099ce4d
--- /dev/null
+++ b/internal/protocol.go
@@ -0,0 +1,94 @@
+package internal
+
+import (
+ "os"
+
+ j "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/goridge/v3/pkg/frame"
+)
+
+var json = j.ConfigCompatibleWithStandardLibrary
+
+type StopCommand struct {
+ Stop bool `json:"stop"`
+}
+
+type pidCommand struct {
+ Pid int `json:"pid"`
+}
+
+func SendControl(rl relay.Relay, payload interface{}) error {
+ const op = errors.Op("send control frame")
+ fr := frame.NewFrame()
+ fr.WriteVersion(frame.VERSION_1)
+ fr.WriteFlags(frame.CONTROL)
+
+ if data, ok := payload.([]byte); ok {
+ // check if payload no more that 4Gb
+ if uint32(len(data)) > ^uint32(0) {
+ return errors.E(op, errors.Str("payload is more that 4gb"))
+ }
+
+ fr.WritePayloadLen(uint32(len(data)))
+ fr.WritePayload(data)
+ fr.WriteCRC()
+
+ err := rl.Send(fr)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ }
+
+ data, err := json.Marshal(payload)
+ if err != nil {
+ return errors.E(op, errors.Errorf("invalid payload: %s", err))
+ }
+
+ fr.WritePayloadLen(uint32(len(data)))
+ fr.WritePayload(data)
+ fr.WriteCRC()
+
+ err = rl.Send(fr)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func FetchPID(rl relay.Relay) (int64, error) {
+ const op = errors.Op("fetchPID")
+ err := SendControl(rl, pidCommand{Pid: os.Getpid()})
+ if err != nil {
+ return 0, errors.E(op, err)
+ }
+
+ frameR := frame.NewFrame()
+ err = rl.Receive(frameR)
+ if !frameR.VerifyCRC() {
+ return 0, errors.E(op, errors.Str("CRC mismatch"))
+ }
+ if err != nil {
+ return 0, errors.E(op, err)
+ }
+ if frameR == nil {
+ return 0, errors.E(op, errors.Str("nil frame received"))
+ }
+
+ flags := frameR.ReadFlags()
+
+ if flags&(byte(frame.CONTROL)) == 0 {
+ return 0, errors.E(op, errors.Str("unexpected response, header is missing, no CONTROL flag"))
+ }
+
+ link := &pidCommand{}
+ err = json.Unmarshal(frameR.Payload(), link)
+ if err != nil {
+ return 0, errors.E(op, err)
+ }
+
+ return int64(link.Pid), nil
+}
diff --git a/internal/state.go b/internal/state.go
new file mode 100755
index 00000000..8f7d939b
--- /dev/null
+++ b/internal/state.go
@@ -0,0 +1,122 @@
+package internal
+
+import (
+ "fmt"
+ "sync/atomic"
+)
+
+// State represents WorkerProcess status and updated time.
+type State interface {
+ fmt.Stringer
+ // Value returns WorkerState value
+ Value() int64
+ // Set sets the WorkerState
+ Set(value int64)
+ // NumJobs shows how many times WorkerProcess was invoked
+ NumExecs() int64
+ // IsActive returns true if WorkerProcess not Inactive or Stopped
+ IsActive() bool
+ // RegisterExec using to registering php executions
+ RegisterExec()
+ // SetLastUsed sets worker last used time
+ SetLastUsed(lu uint64)
+ // LastUsed return worker last used time
+ LastUsed() uint64
+}
+
+const (
+ // StateInactive - no associated process
+ StateInactive int64 = iota
+
+ // StateReady - ready for job.
+ StateReady
+
+ // StateWorking - working on given payload.
+ StateWorking
+
+ // StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
+ StateInvalid
+
+ // StateStopping - process is being softly stopped.
+ StateStopping
+
+ StateKilling
+
+ // State of worker, when no need to allocate new one
+ StateDestroyed
+
+ // StateStopped - process has been terminated.
+ StateStopped
+
+ // StateErrored - error WorkerState (can't be used).
+ StateErrored
+
+ StateRemove
+)
+
+type WorkerState struct {
+ value int64
+ numExecs int64
+ // to be lightweight, use UnixNano
+ lastUsed uint64
+}
+
+// Thread safe
+func NewWorkerState(value int64) *WorkerState {
+ return &WorkerState{value: value}
+}
+
+// String returns current WorkerState as string.
+func (s *WorkerState) String() string {
+ switch s.Value() {
+ case StateInactive:
+ return "inactive"
+ case StateReady:
+ return "ready"
+ case StateWorking:
+ return "working"
+ case StateInvalid:
+ return "invalid"
+ case StateStopped:
+ return "stopped"
+ case StateErrored:
+ return "errored"
+ }
+
+ return "undefined"
+}
+
+// NumExecs returns number of registered WorkerProcess execs.
+func (s *WorkerState) NumExecs() int64 {
+ return atomic.LoadInt64(&s.numExecs)
+}
+
+// Value WorkerState returns WorkerState value
+func (s *WorkerState) Value() int64 {
+ return atomic.LoadInt64(&s.value)
+}
+
+// IsActive returns true if WorkerProcess not Inactive or Stopped
+func (s *WorkerState) IsActive() bool {
+ val := s.Value()
+ return val == StateWorking || val == StateReady
+}
+
+// change WorkerState value (status)
+func (s *WorkerState) Set(value int64) {
+ atomic.StoreInt64(&s.value, value)
+}
+
+// register new execution atomically
+func (s *WorkerState) RegisterExec() {
+ atomic.AddInt64(&s.numExecs, 1)
+}
+
+// Update last used time
+func (s *WorkerState) SetLastUsed(lu uint64) {
+ atomic.StoreUint64(&s.lastUsed, lu)
+}
+
+func (s *WorkerState) LastUsed() uint64 {
+ return atomic.LoadUint64(&s.lastUsed)
+}
diff --git a/internal/state_test.go b/internal/state_test.go
new file mode 100755
index 00000000..bdb05825
--- /dev/null
+++ b/internal/state_test.go
@@ -0,0 +1,27 @@
+package internal
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_NewState(t *testing.T) {
+ st := NewWorkerState(StateErrored)
+
+ assert.Equal(t, "errored", st.String())
+
+ assert.Equal(t, "inactive", NewWorkerState(StateInactive).String())
+ assert.Equal(t, "ready", NewWorkerState(StateReady).String())
+ assert.Equal(t, "working", NewWorkerState(StateWorking).String())
+ assert.Equal(t, "stopped", NewWorkerState(StateStopped).String())
+ assert.Equal(t, "undefined", NewWorkerState(1000).String())
+}
+
+func Test_IsActive(t *testing.T) {
+ assert.False(t, NewWorkerState(StateInactive).IsActive())
+ assert.True(t, NewWorkerState(StateReady).IsActive())
+ assert.True(t, NewWorkerState(StateWorking).IsActive())
+ assert.False(t, NewWorkerState(StateStopped).IsActive())
+ assert.False(t, NewWorkerState(StateErrored).IsActive())
+}