summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bst/bst_test.go43
-rw-r--r--codecov.yml6
-rw-r--r--events/events.go12
-rw-r--r--events/events_test.go92
-rw-r--r--events/eventsbus.go2
-rw-r--r--state/job/state.go19
-rw-r--r--state/process/state.go20
7 files changed, 141 insertions, 53 deletions
diff --git a/bst/bst_test.go b/bst/bst_test.go
index 2271508c..2afbee10 100644
--- a/bst/bst_test.go
+++ b/bst/bst_test.go
@@ -40,6 +40,49 @@ func TestNewBST(t *testing.T) {
assert.Len(t, exist3, 100)
}
+func TestBSTContains(t *testing.T) {
+ // create a new bst
+ g := NewBST()
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments2")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments3")
+ }
+
+ exist := g.Contains("comments")
+ assert.True(t, exist)
+
+ exist2 := g.Contains("comments2")
+ assert.True(t, exist2)
+
+ exist3 := g.Contains("comments3")
+ assert.True(t, exist3)
+
+ exist4 := g.Contains("comments5")
+ assert.False(t, exist4)
+
+ exist5 := g.Contains("comments6")
+ assert.False(t, exist5)
+}
+
+func TestBSTRemove(t *testing.T) {
+ // create a new bst
+ g := NewBST()
+ u := uuid.NewString()
+ g.Insert(u, "comments")
+ g.Remove(u, "comments")
+
+ res := g.Contains("comments")
+ assert.False(t, res)
+}
+
func BenchmarkGraph(b *testing.B) {
g := NewBST()
diff --git a/codecov.yml b/codecov.yml
index 82e69052..7554dabf 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -14,13 +14,17 @@ coverage:
# do not include tests folders
ignore:
- "common"
- - "internal"
+ - "internal/protocol.go"
- "proto"
- "utils"
- "tests"
- "systemd"
- "doc"
+ # is not ready yet
+ - "worker_watcher/container/queue"
- "bst/bst_test.go"
+ - "bst/doc.go"
+ - "bst/interface.go"
- "pool/static_pool_test.go"
- "pool/supervisor_test.go"
- "transport/pipe/pipe_factory_spawn_test.go"
diff --git a/events/events.go b/events/events.go
index 5a417e7f..0d6483e3 100644
--- a/events/events.go
+++ b/events/events.go
@@ -7,8 +7,6 @@ const (
EventWorkerConstruct EventType = iota
// EventWorkerDestruct thrown after worker destruction.
EventWorkerDestruct
- // EventSupervisorError triggered when supervisor can not complete work.
- EventSupervisorError
// EventWorkerProcessExit triggered on process wait exit
EventWorkerProcessExit
// EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
@@ -21,12 +19,8 @@ const (
EventIdleTTL
// EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
EventExecTTL
- // EventPoolRestart triggered when pool restart is needed
- EventPoolRestart
// EventWorkerError triggered after WorkerProcess. Except payload to be error.
EventWorkerError
- // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
- EventWorkerLog
// EventWorkerStderr is the worker standard error output
EventWorkerStderr
// EventWorkerWaitExit is the worker exit event
@@ -41,8 +35,6 @@ func (et EventType) String() string {
return "EventWorkerConstruct"
case EventWorkerDestruct:
return "EventWorkerDestruct"
- case EventSupervisorError:
- return "EventSupervisorError"
case EventNoFreeWorkers:
return "EventNoFreeWorkers"
case EventMaxMemory:
@@ -53,12 +45,8 @@ func (et EventType) String() string {
return "EventIdleTTL"
case EventExecTTL:
return "EventExecTTL"
- case EventPoolRestart:
- return "EventPoolRestart"
case EventWorkerError:
return "EventWorkerError"
- case EventWorkerLog:
- return "EventWorkerLog"
case EventWorkerStderr:
return "EventWorkerStderr"
case EventWorkerWaitExit:
diff --git a/events/events_test.go b/events/events_test.go
index f7cb4205..62ddd903 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -21,6 +21,8 @@ func TestEvenHandler(t *testing.T) {
require.Equal(t, "foo", evt.Message())
require.Equal(t, "http", evt.Plugin())
require.Equal(t, "EventWorkerError", evt.Type().String())
+
+ eh.Unsubscribe(id)
}
func TestEvenHandler2(t *testing.T) {
@@ -58,6 +60,9 @@ func TestEvenHandler2(t *testing.T) {
l = eh.Len()
require.Equal(t, uint(0), l)
+
+ eh.Unsubscribe(id)
+ eh2.Unsubscribe(id2)
}
func TestEvenHandler3(t *testing.T) {
@@ -67,6 +72,8 @@ func TestEvenHandler3(t *testing.T) {
ch := make(chan Event, 100)
err := eh.SubscribeP(id, "EventWorkerError", ch)
require.Error(t, err)
+
+ eh.Unsubscribe(id)
}
func TestEvenHandler4(t *testing.T) {
@@ -75,6 +82,8 @@ func TestEvenHandler4(t *testing.T) {
err := eh.SubscribeP(id, "EventWorkerError", nil)
require.Error(t, err)
+
+ eh.Unsubscribe(id)
}
func TestEvenHandler5(t *testing.T) {
@@ -91,6 +100,8 @@ func TestEvenHandler5(t *testing.T) {
require.Equal(t, "foo", evt.Message())
require.Equal(t, "http", evt.Plugin())
require.Equal(t, "EventWorkerError", evt.Type().String())
+
+ eh.Unsubscribe(id)
}
type MySuperEvent uint32
@@ -123,4 +134,85 @@ func TestEvenHandler6(t *testing.T) {
require.Equal(t, "foo", evt.Message())
require.Equal(t, "http", evt.Plugin())
require.Equal(t, "EventHTTPError", evt.Type().String())
+
+ eh.Unsubscribe(id)
+}
+
+func TestEvenHandler7(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeAll(id, ch)
+ require.NoError(t, err)
+
+ eh.Send(NewEvent(EventHTTPError, "http", "foo"))
+
+ evt := <-ch
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventHTTPError", evt.Type().String())
+
+ eh.Unsubscribe(id)
+}
+
+func TestEvenHandler8(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ err := eh.SubscribeAll(id, nil)
+ require.Error(t, err)
+
+ eh.Unsubscribe(id)
+}
+
+func TestEvenHandler9(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "http.EventWorkerError", ch)
+ require.NoError(t, err)
+
+ eh.Send(NewEvent(EventWorkerError, "http", "foo"))
+
+ evt := <-ch
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventWorkerError", evt.Type().String())
+
+ eh.UnsubscribeP(id, "http.EventWorkerError")
+
+ eh.Send(NewEvent(EventWorkerError, "http", "foo"))
+
+ select {
+ case <-ch:
+ require.Fail(t, "should not read any events")
+ default:
+ return
+ }
+}
+
+func TestEvenHandler10(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "http.EventHTTPError", ch)
+ require.NoError(t, err)
+ err = eh.SubscribeP(id, "http.Foo", ch)
+ require.NoError(t, err)
+ err = eh.SubscribeP(id, "http.Foo2", ch)
+ require.NoError(t, err)
+ err = eh.SubscribeP(id, "http.Foo2", ch)
+ require.NoError(t, err)
+
+ eh.Send(NewEvent(EventHTTPError, "http", "foo"))
+
+ evt := <-ch
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventHTTPError", evt.Type().String())
+
+ eh.Unsubscribe(id)
}
diff --git a/events/eventsbus.go b/events/eventsbus.go
index cd0dca71..a2a2c859 100644
--- a/events/eventsbus.go
+++ b/events/eventsbus.go
@@ -130,7 +130,7 @@ func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) er
return nil
}
- subArr := make([]*sub, 0, 5)
+ subArr := make([]*sub, 0, 1)
subArr = append(subArr, &sub{
pattern: pattern,
w: w,
diff --git a/state/job/state.go b/state/job/state.go
deleted file mode 100644
index 56050084..00000000
--- a/state/job/state.go
+++ /dev/null
@@ -1,19 +0,0 @@
-package job
-
-// State represents job's state
-type State struct {
- // Pipeline name
- Pipeline string
- // Driver name
- Driver string
- // Queue name (tube for the beanstalk)
- Queue string
- // Active jobs which are consumed from the driver but not handled by the PHP worker yet
- Active int64
- // Delayed jobs
- Delayed int64
- // Reserved jobs which are in the driver but not consumed yet
- Reserved int64
- // Status - 1 Ready, 0 - Paused
- Ready bool
-}
diff --git a/state/process/state.go b/state/process/state.go
index f88f8b03..d49d526f 100644
--- a/state/process/state.go
+++ b/state/process/state.go
@@ -54,23 +54,3 @@ func WorkerProcessState(w worker.BaseProcess) (*State, error) {
MemoryUsage: i.RSS,
}, nil
}
-
-func GeneralProcessState(pid int, command string) (State, error) {
- const op = errors.Op("process_state")
- p, _ := process.NewProcess(int32(pid))
- i, err := p.MemoryInfo()
- if err != nil {
- return State{}, errors.E(op, err)
- }
- percent, err := p.CPUPercent()
- if err != nil {
- return State{}, err
- }
-
- return State{
- CPUPercent: percent,
- Pid: pid,
- MemoryUsage: i.RSS,
- Command: command,
- }, nil
-}