summaryrefslogtreecommitdiff
path: root/pkg/pubsub
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-27 00:09:33 +0300
committerValery Piashchynski <[email protected]>2021-05-27 00:09:33 +0300
commitdc3c5455e5c9b32737a0620c8bdb8bda0226dba7 (patch)
tree6ba562da6de7f32a8d528b72cbb56a8bc98c1b30 /pkg/pubsub
parentd2e9d8320857f5768c54843a43ad16f59d6a3e8f (diff)
- Update all main abstractions
- Desighn a new interfaces responsible for the whole PubSub - New plugin - websockets Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/pubsub')
-rw-r--r--pkg/pubsub/interface.go38
-rw-r--r--pkg/pubsub/message.go64
2 files changed, 102 insertions, 0 deletions
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
new file mode 100644
index 00000000..80dab0c3
--- /dev/null
+++ b/pkg/pubsub/interface.go
@@ -0,0 +1,38 @@
+package pubsub
+
+// PubSub ...
+type PubSub interface {
+ Publisher
+ Subscriber
+ Reader
+}
+
+// Subscriber defines the ability to operate as message passing broker.
+type Subscriber interface {
+ // Subscribe broker to one or multiple topics.
+ Subscribe(topics ...string) error
+ // Unsubscribe from one or multiply topics
+ Unsubscribe(topics ...string) error
+}
+
+// Publisher publish one or more messages
+type Publisher interface {
+ // Publish one or multiple Channel.
+ Publish(messages []Message) error
+
+ // PublishAsync publish message and return immediately
+ // If error occurred it will be printed into the logger
+ PublishAsync(messages []Message)
+}
+
+// Reader interface should return next message
+type Reader interface {
+ Next() (Message, error)
+}
+
+type Message interface {
+ Command() string
+ Payload() []byte
+ Topics() []string
+ Broker() string
+}
diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go
new file mode 100644
index 00000000..ab74eb98
--- /dev/null
+++ b/pkg/pubsub/message.go
@@ -0,0 +1,64 @@
+package pubsub
+
+import (
+ json "github.com/json-iterator/go"
+)
+
+type Msg struct {
+ // Topic message been pushed into.
+ T []string `json:"topic"`
+
+ // Command (join, leave, headers)
+ C string `json:"command"`
+
+ // Broker (redis, memory)
+ B string `json:"broker"`
+
+ // Payload to be broadcasted
+ P []byte `json:"payload"`
+}
+
+//func (m Msg) UnmarshalBinary(data []byte) error {
+// //Use default gob decoder
+// reader := bytes.NewReader(data)
+// dec := gob.NewDecoder(reader)
+// if err := dec.Decode(&m); err != nil {
+// return err
+// }
+//
+// return nil
+//}
+
+func (m *Msg) MarshalBinary() ([]byte, error) {
+ //buf := new(bytes.Buffer)
+ //
+ //for i := 0; i < len(m.T); i++ {
+ // buf.WriteString(m.T[i])
+ //}
+ //
+ //buf.WriteString(m.C)
+ //buf.WriteString(m.B)
+ //buf.Write(m.P)
+
+ return json.Marshal(m)
+
+}
+
+// Payload in raw bytes
+func (m *Msg) Payload() []byte {
+ return m.P
+}
+
+// Command for the connection
+func (m *Msg) Command() string {
+ return m.C
+}
+
+// Topics to subscribe
+func (m *Msg) Topics() []string {
+ return m.T
+}
+
+func (m *Msg) Broker() string {
+ return m.B
+}