summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-24 17:40:49 +0300
committerGitHub <[email protected]>2021-06-24 17:40:49 +0300
commite9249c7896331bab97a18a7ee0db17803fdd31fb (patch)
tree99512001f757eb88614acb9b20dada3200008a5d
parentce53a8e149b76f15e8a5dd88ac3b953798d57e8b (diff)
parent60001dbe15b5ff0fec32239ad18b3d308a4150b5 (diff)
#736 feat(kv): `clear` RPC method which completely cleans storagev2.3.1-beta.6
#736 feat(kv): `clear` RPC method which completely cleans storage
-rw-r--r--.github/workflows/windows.yml96
-rw-r--r--CHANGELOG.md1
-rw-r--r--README.md1
-rw-r--r--plugins/kv/drivers/boltdb/driver.go34
-rw-r--r--plugins/kv/drivers/memcached/driver.go10
-rw-r--r--plugins/kv/interface.go5
-rw-r--r--plugins/kv/rpc.go16
-rw-r--r--plugins/memory/kv.go16
-rw-r--r--plugins/redis/kv.go9
-rw-r--r--tests/plugins/broadcast/broadcast_plugin_test.go6
-rw-r--r--tests/plugins/kv/storage_plugin_test.go189
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go123
12 files changed, 345 insertions, 161 deletions
diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml
deleted file mode 100644
index f23f9b5d..00000000
--- a/.github/workflows/windows.yml
+++ /dev/null
@@ -1,96 +0,0 @@
-name: Windows
-
-on:
- push:
- branches:
- - master
- - beta
- - stable
- tags-ignore:
- - '**'
- paths-ignore:
- - '**.md'
- pull_request:
- paths-ignore:
- - '**.md'
-
-jobs:
- golang:
- name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}})
- runs-on: ${{ matrix.os }}
- timeout-minutes: 60
- strategy:
- fail-fast: true
- matrix:
- php: [ "8.0" ]
- go: [ "1.16" ]
- os: [ windows-latest ]
- steps:
- - name: Set up Go ${{ matrix.go }}
- uses: actions/setup-go@v2 # action page: <https://github.com/actions/setup-go>
- with:
- go-version: ${{ matrix.go }}
-
- - name: Set up PHP ${{ matrix.php }}
- uses: shivammathur/setup-php@v2 # action page: <https://github.com/shivammathur/setup-php>
- with:
- php-version: ${{ matrix.php }}
- extensions: sockets
-
- - name: Check out code
- uses: actions/checkout@v2
-
- - name: Get Composer Cache Directory
- id: composer-cache
- run: echo "::set-output name=dir::$(composer config cache-files-dir)"
-
- - name: Init Composer Cache # Docs: <https://git.io/JfAKn#php---composer>
- uses: actions/cache@v2
- with:
- path: ${{ steps.composer-cache.outputs.dir }}
- key: ${{ runner.os }}-composer-${{ matrix.php }}-${{ hashFiles('**/composer.json') }}
- restore-keys: ${{ runner.os }}-composer-
-
- - name: Install Composer dependencies
- run: cd tests && composer update --prefer-dist --no-progress --ansi
-
- - name: Init Go modules Cache # Docs: <https://git.io/JfAKn#go---modules>
- uses: actions/cache@v2
- with:
- path: ~/go/pkg/mod
- key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
- restore-keys: ${{ runner.os }}-go-
-
- - name: Install Go dependencies
- run: go mod download
-
- - name: Run golang tests on Windows
- run: |
- docker-compose -f ./tests/docker-compose.yaml up -d
- go test -v -race ./pkg/transport/pipe
- go test -v -race ./pkg/transport/socket
- go test -v -race ./pkg/pool
- go test -v -race ./pkg/worker
- go test -v -race ./pkg/bst
- go test -v -race ./pkg/worker_watcher
- go test -v -race ./plugins/http/config
- go test -v -race ./plugins/server
- go test -v -race ./tests/plugins/http
- go test -v -race ./tests/plugins/informer
- go test -v -race ./tests/plugins/reload
- go test -v -race ./tests/plugins/server
- go test -v -race ./tests/plugins/service
- go test -v -race ./tests/plugins/status
- go test -v -race ./tests/plugins/config
- go test -v -race ./tests/plugins/gzip
- go test -v -race ./tests/plugins/headers
- go test -v -race ./tests/plugins/logger
- go test -v -race ./tests/plugins/metrics
- go test -v -race ./tests/plugins/redis
- go test -v -race ./tests/plugins/resetter
- go test -v -race ./tests/plugins/rpc
- go test -v -race ./tests/plugins/kv
- go test -v -race ./tests/plugins/broadcast
- go test -v -race ./tests/plugins/websockets
- go test -v -race ./plugins/websockets
- docker-compose -f ./tests/docker-compose.yaml down
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1f8e1733..ea55a10d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ v2.3.1 (_.06.2021)
## 👀 New:
- ✏️ Rework `broadcast` plugin. Add architecture diagrams to the `doc` folder. [PR](https://github.com/spiral/roadrunner/pull/732)
+- ✏️ Add `Clear` method to the KV plugin RPC. [PR](https://github.com/spiral/roadrunner/pull/736)
## 🩹 Fixes:
diff --git a/README.md b/README.md
index 08580f86..ea100f24 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,6 @@
<a href="https://packagist.org/packages/spiral/roadrunner"><img src="https://poser.pugx.org/spiral/roadrunner/version"></a>
<a href="https://pkg.go.dev/github.com/spiral/roadrunner/v2?tab=doc"><img src="https://godoc.org/github.com/spiral/roadrunner/v2?status.svg"></a>
<a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/Linux/badge.svg" alt=""></a>
- <a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/Windows/badge.svg" alt=""></a>
<a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/Linters/badge.svg" alt=""></a>
<a href="https://goreportcard.com/report/github.com/spiral/roadrunner"><img src="https://goreportcard.com/badge/github.com/spiral/roadrunner"></a>
<a href="https://scrutinizer-ci.com/g/spiral/roadrunner/?branch=master"><img src="https://scrutinizer-ci.com/g/spiral/roadrunner/badges/quality-score.png"></a>
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 4b675271..47d37cc2 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -18,6 +18,7 @@ import (
)
type Driver struct {
+ clearMu sync.RWMutex
// db instance
DB *bolt.DB
// name should be UTF-8
@@ -373,6 +374,35 @@ func (d *Driver) TTL(keys ...string) (map[string]string, error) {
return m, nil
}
+func (d *Driver) Clear() error {
+ err := d.DB.Update(func(tx *bolt.Tx) error {
+ err := tx.DeleteBucket(d.bucket)
+ if err != nil {
+ d.log.Error("boltdb delete bucket", "error", err)
+ return err
+ }
+
+ _, err = tx.CreateBucket(d.bucket)
+ if err != nil {
+ d.log.Error("boltdb create bucket", "error", err)
+ return err
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ d.log.Error("clear transaction failed", "error", err)
+ return err
+ }
+
+ d.clearMu.Lock()
+ d.gc = sync.Map{}
+ d.clearMu.Unlock()
+
+ return nil
+}
+
// ========================= PRIVATE =================================
func (d *Driver) startGCLoop() { //nolint:gocognit
@@ -382,6 +412,8 @@ func (d *Driver) startGCLoop() { //nolint:gocognit
for {
select {
case <-t.C:
+ d.clearMu.RLock()
+
// calculate current time before loop started to be fair
now := time.Now()
d.gc.Range(func(key, value interface{}) bool {
@@ -414,6 +446,8 @@ func (d *Driver) startGCLoop() { //nolint:gocognit
}
return true
})
+
+ d.clearMu.RUnlock()
case <-d.stop:
err := d.DB.Close()
if err != nil {
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index a2787d72..14e7c078 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -237,3 +237,13 @@ func (d *Driver) Delete(keys ...string) error {
}
return nil
}
+
+func (d *Driver) Clear() error {
+ err := d.client.DeleteAll()
+ if err != nil {
+ d.log.Error("flush_all operation failed", "error", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index ffdbbe62..5736a6a7 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -22,9 +22,12 @@ type Storage interface {
MExpire(items ...*kvv1.Item) error
// TTL return the rest time to live for provided keys
- // Not supported for the memcached and boltdb
+ // Not supported for the memcached
TTL(keys ...string) (map[string]string, error)
+ // Clear clean the entire storage
+ Clear() error
+
// Delete one or multiple keys.
Delete(keys ...string) error
}
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index af763600..3f7ba97c 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -161,3 +161,19 @@ func (r *rpc) Delete(in *kvv1.Request, _ *kvv1.Response) error {
return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
+
+// Clear clean the storage
+func (r *rpc) Clear(in *kvv1.Request, _ *kvv1.Response) error {
+ const op = errors.Op("rcp_delete")
+
+ if st, exists := r.storages[in.GetStorage()]; exists {
+ err := st.Clear()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ }
+
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
+}
diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go
index 1cf031d1..c13c2314 100644
--- a/plugins/memory/kv.go
+++ b/plugins/memory/kv.go
@@ -13,7 +13,8 @@ import (
)
type Driver struct {
- heap sync.Map
+ clearMu sync.RWMutex
+ heap sync.Map
// stop is used to stop keys GC and close boltdb connection
stop chan struct{}
log logger.Logger
@@ -203,6 +204,14 @@ func (s *Driver) Delete(keys ...string) error {
return nil
}
+func (s *Driver) Clear() error {
+ s.clearMu.Lock()
+ s.heap = sync.Map{}
+ s.clearMu.Unlock()
+
+ return nil
+}
+
// ================================== PRIVATE ======================================
func (s *Driver) gc() {
@@ -213,6 +222,9 @@ func (s *Driver) gc() {
ticker.Stop()
return
case now := <-ticker.C:
+ // mutes needed to clear the map
+ s.clearMu.RLock()
+
// check every second
s.heap.Range(func(key, value interface{}) bool {
v := value.(*kvv1.Item)
@@ -231,6 +243,8 @@ func (s *Driver) gc() {
}
return true
})
+
+ s.clearMu.RUnlock()
}
}
}
diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go
index 320b7443..2e4b9bfd 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv.go
@@ -240,3 +240,12 @@ func (d *Driver) TTL(keys ...string) (map[string]string, error) {
}
return m, nil
}
+
+func (d *Driver) Clear() error {
+ fdb := d.universalClient.FlushDB(context.Background())
+ if fdb.Err() != nil {
+ return fdb.Err()
+ }
+
+ return nil
+}
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
index 2cd4b451..0ec813f3 100644
--- a/tests/plugins/broadcast/broadcast_plugin_test.go
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -205,7 +205,8 @@ func TestBroadcastSameSubscriber(t *testing.T) {
cfg,
&broadcast.Plugin{},
&rpcPlugin.Plugin{},
- mockLogger,
+ &logger.ZapLogger{},
+ // mockLogger,
&server.Plugin{},
&redis.Plugin{},
&websockets.Plugin{},
@@ -314,7 +315,8 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
cfg,
&broadcast.Plugin{},
&rpcPlugin.Plugin{},
- mockLogger,
+ &logger.ZapLogger{},
+ // mockLogger,
&server.Plugin{},
&redis.Plugin{},
&websockets.Plugin{},
diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go
index 1e466e06..ced1c5fe 100644
--- a/tests/plugins/kv/storage_plugin_test.go
+++ b/tests/plugins/kv/storage_plugin_test.go
@@ -575,6 +575,53 @@ func testRPCMethods(t *testing.T) {
err = client.Call("kv.Has", keysDel, ret)
assert.NoError(t, err)
assert.Len(t, ret.GetItems(), 0)
+
+ dataClear := &payload.Request{
+ Storage: "boltdb-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: []byte("aa"),
+ },
+ {
+ Key: "b",
+ Value: []byte("bb"),
+ },
+ {
+ Key: "c",
+ Value: []byte("cc"),
+ },
+ {
+ Key: "d",
+ Value: []byte("dd"),
+ },
+ {
+ Key: "e",
+ Value: []byte("ee"),
+ },
+ },
+ }
+
+ clear := &payload.Request{Storage: "boltdb-rr"}
+
+ ret = &payload.Response{}
+ // Register 3 keys with values
+ err = client.Call("kv.Set", dataClear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 5) // should be 5
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Clear", clear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 0) // should be 5
}
func TestMemcached(t *testing.T) {
@@ -790,6 +837,54 @@ func testRPCMethodsMemcached(t *testing.T) {
err = client.Call("kv.Has", keysDel, ret)
assert.NoError(t, err)
assert.Len(t, ret.GetItems(), 0)
+
+ dataClear := &payload.Request{
+ Storage: "memcached-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: []byte("aa"),
+ },
+ {
+ Key: "b",
+ Value: []byte("bb"),
+ },
+ {
+ Key: "c",
+ Value: []byte("cc"),
+ },
+ {
+ Key: "d",
+ Value: []byte("dd"),
+ },
+ {
+ Key: "e",
+ Value: []byte("ee"),
+ },
+ },
+ }
+
+ clear := &payload.Request{Storage: "memcached-rr"}
+
+ ret = &payload.Response{}
+ // Register 3 keys with values
+ err = client.Call("kv.Set", dataClear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 5) // should be 5
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Clear", clear, ret)
+ assert.NoError(t, err)
+
+ time.Sleep(time.Second * 2)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 0) // should be 5
}
func TestInMemory(t *testing.T) {
@@ -1004,6 +1099,53 @@ func testRPCMethodsInMemory(t *testing.T) {
err = client.Call("kv.Has", keysDel, ret)
assert.NoError(t, err)
assert.Len(t, ret.GetItems(), 0)
+
+ dataClear := &payload.Request{
+ Storage: "memory-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: []byte("aa"),
+ },
+ {
+ Key: "b",
+ Value: []byte("bb"),
+ },
+ {
+ Key: "c",
+ Value: []byte("cc"),
+ },
+ {
+ Key: "d",
+ Value: []byte("dd"),
+ },
+ {
+ Key: "e",
+ Value: []byte("ee"),
+ },
+ },
+ }
+
+ clear := &payload.Request{Storage: "memory-rr"}
+
+ ret = &payload.Response{}
+ // Register 3 keys with values
+ err = client.Call("kv.Set", dataClear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 5) // should be 5
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Clear", clear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 0) // should be 5
}
func TestRedis(t *testing.T) {
@@ -1354,4 +1496,51 @@ func testRPCMethodsRedis(t *testing.T) {
err = client.Call("kv.Has", keysDel, ret)
assert.NoError(t, err)
assert.Len(t, ret.GetItems(), 0)
+
+ dataClear := &payload.Request{
+ Storage: "redis-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: []byte("aa"),
+ },
+ {
+ Key: "b",
+ Value: []byte("bb"),
+ },
+ {
+ Key: "c",
+ Value: []byte("cc"),
+ },
+ {
+ Key: "d",
+ Value: []byte("dd"),
+ },
+ {
+ Key: "e",
+ Value: []byte("ee"),
+ },
+ },
+ }
+
+ clear := &payload.Request{Storage: "redis-rr"}
+
+ ret = &payload.Response{}
+ // Register 3 keys with values
+ err = client.Call("kv.Set", dataClear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 5) // should be 5
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Clear", clear, ret)
+ assert.NoError(t, err)
+
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", dataClear, ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 0) // should be 5
}
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index 5ed0c3f3..53b6a572 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -645,7 +645,7 @@ func RPCWsPubAsync(port string) func(t *testing.T) {
return func(t *testing.T) {
da := websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 18,
+ HandshakeTimeout: time.Second * 20,
}
connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"}
@@ -654,9 +654,32 @@ func RPCWsPubAsync(port string) func(t *testing.T) {
assert.NoError(t, err)
defer func() {
- _ = resp.Body.Close()
+ if resp != nil && resp.Body != nil {
+ _ = resp.Body.Close()
+ }
+ }()
+
+ go func() {
+ messagesToVerify := make([]string, 0, 10)
+ messagesToVerify = append(messagesToVerify, `{"topic":"@join","payload":["foo","foo2"]}`)
+ messagesToVerify = append(messagesToVerify, `{"topic":"foo","payload":"hello, PHP"}`)
+ messagesToVerify = append(messagesToVerify, `{"topic":"@leave","payload":["foo"]}`)
+ messagesToVerify = append(messagesToVerify, `{"topic":"foo2","payload":"hello, PHP2"}`)
+ i := 0
+ for {
+ _, msg, err2 := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err2)
+ assert.Equal(t, messagesToVerify[i], retMsg)
+ i++
+ if i == 3 {
+ return
+ }
+ }
}()
+ time.Sleep(time.Second)
+
d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
@@ -665,20 +688,11 @@ func RPCWsPubAsync(port string) func(t *testing.T) {
err = c.WriteMessage(websocket.BinaryMessage, d)
assert.NoError(t, err)
- _, msg, err := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err)
+ time.Sleep(time.Second)
- // subscription done
- assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+ publishAsync(t, "foo")
- publishAsync(t, "placeholder", "foo")
-
- // VERIFY a makeMessage
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
+ time.Sleep(time.Second)
// //// LEAVE foo /////////
d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
@@ -689,27 +703,16 @@ func RPCWsPubAsync(port string) func(t *testing.T) {
err = c.WriteMessage(websocket.BinaryMessage, d)
assert.NoError(t, err)
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+ time.Sleep(time.Second)
// TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
- publishAsync(t, "placeholder", "foo")
+ publishAsync(t, "foo")
go func() {
- time.Sleep(time.Second * 3)
- publishAsync(t, "placeholder", "foo2")
+ time.Sleep(time.Second * 5)
+ publishAsync(t, "foo2")
}()
- // should be only makeMessage from the subscribed foo0 topic
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP\"}", retMsg)
-
err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
assert.NoError(t, err)
}
@@ -733,6 +736,27 @@ func RPCWsPub(port string) func(t *testing.T) {
}
}()
+ go func() {
+ messagesToVerify := make([]string, 0, 10)
+ messagesToVerify = append(messagesToVerify, `{"topic":"@join","payload":["foo","foo2"]}`)
+ messagesToVerify = append(messagesToVerify, `{"topic":"foo","payload":"hello, PHP"}`)
+ messagesToVerify = append(messagesToVerify, `{"topic":"@leave","payload":["foo"]}`)
+ messagesToVerify = append(messagesToVerify, `{"topic":"foo2","payload":"hello, PHP2"}`)
+ i := 0
+ for {
+ _, msg, err2 := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err2)
+ assert.Equal(t, messagesToVerify[i], retMsg)
+ i++
+ if i == 3 {
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second)
+
d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
@@ -741,20 +765,11 @@ func RPCWsPub(port string) func(t *testing.T) {
err = c.WriteMessage(websocket.BinaryMessage, d)
assert.NoError(t, err)
- _, msg, err := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+ time.Sleep(time.Second)
publish("", "foo")
- // VERIFY a makeMessage
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
+ time.Sleep(time.Second)
// //// LEAVE foo /////////
d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
@@ -765,12 +780,7 @@ func RPCWsPub(port string) func(t *testing.T) {
err = c.WriteMessage(websocket.BinaryMessage, d)
assert.NoError(t, err)
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+ time.Sleep(time.Second)
// TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
publish("", "foo")
@@ -780,12 +790,6 @@ func RPCWsPub(port string) func(t *testing.T) {
publish2(t, "", "foo2")
}()
- // should be only makeMessage from the subscribed foo2 topic
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg)
-
err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
assert.NoError(t, err)
}
@@ -849,7 +853,7 @@ func RPCWsDeny(port string) func(t *testing.T) {
// ---------------------------------------------------------------------------------------------------
-func publish(command string, topics ...string) {
+func publish(topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -858,13 +862,13 @@ func publish(command string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP"), topics...), ret)
+ err = client.Call("broadcast.Publish", makeMessage([]byte("hello, PHP"), topics...), ret)
if err != nil {
panic(err)
}
}
-func publishAsync(t *testing.T, command string, topics ...string) {
+func publishAsync(t *testing.T, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -873,12 +877,12 @@ func publishAsync(t *testing.T, command string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP"), topics...), ret)
+ err = client.Call("broadcast.PublishAsync", makeMessage([]byte("hello, PHP"), topics...), ret)
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
-func publish2(t *testing.T, command string, topics ...string) {
+func publish2(t *testing.T, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -887,7 +891,7 @@ func publish2(t *testing.T, command string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP2"), topics...), ret)
+ err = client.Call("broadcast.Publish", makeMessage([]byte("hello, PHP2"), topics...), ret)
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
@@ -900,12 +904,11 @@ func messageWS(command string, payload []byte, topics ...string) *websocketsv1.M
}
}
-func makeMessage(command string, payload []byte, topics ...string) *websocketsv1.Request {
+func makeMessage(payload []byte, topics ...string) *websocketsv1.Request {
m := &websocketsv1.Request{
Messages: []*websocketsv1.Message{
{
Topics: topics,
- Command: command,
Payload: payload,
},
},