diff options
author | Valery Piashchynski <[email protected]> | 2021-05-27 00:09:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-27 00:09:33 +0300 |
commit | dc3c5455e5c9b32737a0620c8bdb8bda0226dba7 (patch) | |
tree | 6ba562da6de7f32a8d528b72cbb56a8bc98c1b30 /pkg | |
parent | d2e9d8320857f5768c54843a43ad16f59d6a3e8f (diff) |
- Update all main abstractions
- Desighn a new interfaces responsible for the whole PubSub
- New plugin - websockets
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/pool/interface.go | 2 | ||||
-rw-r--r-- | pkg/pubsub/interface.go | 38 | ||||
-rw-r--r-- | pkg/pubsub/message.go | 64 |
3 files changed, 103 insertions, 1 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index 4ef2f2e7..c22fbbd3 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -18,7 +18,7 @@ type Pool interface { // Workers returns worker list associated with the pool. Workers() (workers []worker.BaseProcess) - // Remove worker from the pool. + // RemoveWorker removes worker from the pool. RemoveWorker(worker worker.BaseProcess) error // Destroy all underlying stack (but let them to complete the task). diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go new file mode 100644 index 00000000..80dab0c3 --- /dev/null +++ b/pkg/pubsub/interface.go @@ -0,0 +1,38 @@ +package pubsub + +// PubSub ... +type PubSub interface { + Publisher + Subscriber + Reader +} + +// Subscriber defines the ability to operate as message passing broker. +type Subscriber interface { + // Subscribe broker to one or multiple topics. + Subscribe(topics ...string) error + // Unsubscribe from one or multiply topics + Unsubscribe(topics ...string) error +} + +// Publisher publish one or more messages +type Publisher interface { + // Publish one or multiple Channel. + Publish(messages []Message) error + + // PublishAsync publish message and return immediately + // If error occurred it will be printed into the logger + PublishAsync(messages []Message) +} + +// Reader interface should return next message +type Reader interface { + Next() (Message, error) +} + +type Message interface { + Command() string + Payload() []byte + Topics() []string + Broker() string +} diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go new file mode 100644 index 00000000..ab74eb98 --- /dev/null +++ b/pkg/pubsub/message.go @@ -0,0 +1,64 @@ +package pubsub + +import ( + json "github.com/json-iterator/go" +) + +type Msg struct { + // Topic message been pushed into. + T []string `json:"topic"` + + // Command (join, leave, headers) + C string `json:"command"` + + // Broker (redis, memory) + B string `json:"broker"` + + // Payload to be broadcasted + P []byte `json:"payload"` +} + +//func (m Msg) UnmarshalBinary(data []byte) error { +// //Use default gob decoder +// reader := bytes.NewReader(data) +// dec := gob.NewDecoder(reader) +// if err := dec.Decode(&m); err != nil { +// return err +// } +// +// return nil +//} + +func (m *Msg) MarshalBinary() ([]byte, error) { + //buf := new(bytes.Buffer) + // + //for i := 0; i < len(m.T); i++ { + // buf.WriteString(m.T[i]) + //} + // + //buf.WriteString(m.C) + //buf.WriteString(m.B) + //buf.Write(m.P) + + return json.Marshal(m) + +} + +// Payload in raw bytes +func (m *Msg) Payload() []byte { + return m.P +} + +// Command for the connection +func (m *Msg) Command() string { + return m.C +} + +// Topics to subscribe +func (m *Msg) Topics() []string { + return m.T +} + +func (m *Msg) Broker() string { + return m.B +} |