diff options
author | Valery Piashchynski <[email protected]> | 2021-05-03 22:52:30 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-03 22:52:30 +0300 |
commit | 9ee78f937d5be67058882dd3590f89da35bca239 (patch) | |
tree | 17cda27feabf5f2b8afc6a2796117835045afd36 /plugins/broadcast/root/service.go | |
parent | 009b7009885d8a15e6fa6c7e78436087b2f20129 (diff) |
- Initial broadcast commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast/root/service.go')
-rw-r--r-- | plugins/broadcast/root/service.go | 85 |
1 files changed, 85 insertions, 0 deletions
diff --git a/plugins/broadcast/root/service.go b/plugins/broadcast/root/service.go new file mode 100644 index 00000000..8b175b3e --- /dev/null +++ b/plugins/broadcast/root/service.go @@ -0,0 +1,85 @@ +package broadcast + +import ( + "errors" + "sync" + + "github.com/spiral/roadrunner/service/rpc" +) + +// ID defines public service name. +const ID = "broadcast" + +// Service manages even broadcasting and websocket interface. +type Service struct { + // service and broker configuration + cfg *Config + + // broker + mu sync.Mutex + broker Broker +} + +// Init service. +func (s *Service) Init(cfg *Config, rpc *rpc.Service) (ok bool, err error) { + s.cfg = cfg + + if rpc != nil { + if err := rpc.Register(ID, &rpcService{svc: s}); err != nil { + return false, err + } + } + + s.mu.Lock() + if s.cfg.Redis != nil { + if s.broker, err = redisBroker(s.cfg.Redis); err != nil { + return false, err + } + } else { + s.broker = memoryBroker() + } + s.mu.Unlock() + + return true, nil +} + +// Serve broadcast broker. +func (s *Service) Serve() (err error) { + return s.broker.Serve() +} + +// Stop closes broadcast broker. +func (s *Service) Stop() { + broker := s.Broker() + if broker != nil { + broker.Stop() + } +} + +// Broker returns associated broker. +func (s *Service) Broker() Broker { + s.mu.Lock() + defer s.mu.Unlock() + + return s.broker +} + +// NewClient returns single connected client with ability to consume or produce into associated topic(svc). +func (s *Service) NewClient() *Client { + return &Client{ + upstream: make(chan *Message), + broker: s.Broker(), + topics: make([]string, 0), + patterns: make([]string, 0), + } +} + +// Publish one or multiple Channel. +func (s *Service) Publish(msg ...*Message) error { + broker := s.Broker() + if broker == nil { + return errors.New("no stopped broker") + } + + return s.Broker().Publish(msg...) +} |