summaryrefslogtreecommitdiff
path: root/plugins/broadcast/root/service.go
blob: 8b175b3e0ff459e547e03f84f6632f71507521d5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package broadcast

import (
	"errors"
	"sync"

	"github.com/spiral/roadrunner/service/rpc"
)

// ID defines public service name. Init also uses it as the name under which
// the service's RPC handler is registered.
const ID = "broadcast"

// Service manages event broadcasting and websocket interface. It keeps the
// configuration passed to Init and the broker that Init selected.
type Service struct {
	// cfg is the service and broker configuration stored by Init.
	cfg *Config

	// mu protects broker: Init holds it while assigning the field and the
	// Broker accessor holds it while reading the field.
	mu     sync.Mutex
	broker Broker
}

// Init stores the configuration, optionally registers the RPC handler and
// selects the broker implementation.
//
// When rpc is non-nil the service is registered on it under ID. When
// cfg.Redis is set a Redis broker is created from it; otherwise the in-memory
// broker is used.
//
// It returns (true, nil) on success and (false, err) when either the RPC
// registration or the Redis broker construction fails.
func (s *Service) Init(cfg *Config, rpc *rpc.Service) (ok bool, err error) {
	s.cfg = cfg

	if rpc != nil {
		if err := rpc.Register(ID, &rpcService{svc: s}); err != nil {
			return false, err
		}
	}

	s.mu.Lock()
	// Deferred so the mutex is released on the error path as well; returning
	// with it held would block every later Broker(), Stop() or Publish() call.
	defer s.mu.Unlock()

	if s.cfg.Redis != nil {
		if s.broker, err = redisBroker(s.cfg.Redis); err != nil {
			return false, err
		}

		return true, nil
	}

	s.broker = memoryBroker()

	return true, nil
}

// Serve runs the configured broker and returns the error reported by the
// broker's own Serve method.
//
// The broker is obtained through Broker() so the read goes through the same
// mutex as in Stop, Publish and NewClient; the lock is not held while the
// broker is serving. If no broker has been set (Init was not run or failed)
// an error is returned instead of calling a method on a nil interface.
func (s *Service) Serve() (err error) {
	broker := s.Broker()
	if broker == nil {
		return errors.New("broker is not initialized")
	}

	return broker.Serve()
}

// Stop asks the associated broker to stop. It is a no-op when the service
// has no broker.
func (s *Service) Stop() {
	if active := s.Broker(); active != nil {
		active.Stop()
	}
}

// Broker returns the broker currently attached to the service, reading the
// field while holding the service mutex. The result is nil when no broker
// has been assigned.
func (s *Service) Broker() Broker {
	s.mu.Lock()
	current := s.broker
	s.mu.Unlock()

	return current
}

// NewClient builds a new Client bound to the service's current broker. The
// client starts with an unbuffered upstream message channel and empty topic
// and pattern lists.
func (s *Service) NewClient() *Client {
	client := new(Client)

	client.upstream = make(chan *Message)
	client.broker = s.Broker()
	client.topics = make([]string, 0)
	client.patterns = make([]string, 0)

	return client
}

// Publish hands one or more messages to the associated broker and returns
// the broker's Publish result.
//
// An error is returned without publishing anything when the service has no
// broker.
func (s *Service) Publish(msg ...*Message) error {
	broker := s.Broker()
	if broker == nil {
		return errors.New("no stopped broker")
	}

	// Use the value that was just nil-checked rather than calling s.Broker()
	// again: a second call takes the lock once more and returns a value the
	// guard above never inspected.
	return broker.Publish(msg...)
}