package roadrunner

import (
	"errors"
	"fmt"
	"os/exec"
	"sync"
)

const (
	// EventStart triggered when the server starts.
	EventStart = iota + 128

	// EventStop triggered when the server stops.
	EventStop

	// EventNewPool triggered when the server creates a new pool.
	EventNewPool

	// EventDestroyPool triggered when the server destroys an existing pool.
	EventDestroyPool
)

// Server manages pool creation and swapping.
type Server struct {
	// cfg configures the server, pool, cmd creation and factory.
	cfg *ServerConfig

	// observer watches pool events (can be attached to multiple pools at the same time)
	observer func(event int, ctx interface{})

	// mu protects the pool during re-configuration
	mu sync.Mutex

	// started indicates that the server has been started
	started bool

	// cmd creates the worker command
	cmd func() *exec.Cmd

	// factory creates and connects to workers
	factory Factory

	// pool is the currently active pool instance
	pool Pool
}

// NewServer creates a new server. Make sure to call Start before use.
func NewServer(cfg *ServerConfig, o func(event int, ctx interface{})) *Server {
	return &Server{
		cfg:      cfg,
		observer: o,
	}
}

// Reconfigure re-configures the underlying pool and destroys its previous version, if any.
func (srv *Server) Reconfigure(cfg *ServerConfig) error {
	srv.mu.Lock()
	if !srv.started {
		srv.cfg = cfg
		srv.mu.Unlock()
		return nil
	}
	srv.mu.Unlock()

	// factory or cmd changes are not allowed while the server is running
	if srv.cfg.Differs(cfg) {
		return errors.New("config change while running server (only pool config change is allowed)")
	}

	srv.mu.Lock()
	previous := srv.pool
	srv.mu.Unlock()

	pool, err := NewPool(srv.cmd, srv.factory, *cfg.Pool)
	if err != nil {
		return err
	}

	srv.throw(EventNewPool, pool)

	srv.mu.Lock()
	srv.cfg, srv.pool = cfg, pool
	srv.pool.Observe(srv.poolObserver)
	srv.mu.Unlock()

	if previous != nil {
		go func(previous Pool) {
			srv.throw(EventDestroyPool, previous)
			previous.Destroy()
		}(previous)
	}

	return nil
}

// Reset resets the state of the underlying pool and rebuilds all of its workers.
func (srv *Server) Reset() error {
	return srv.Reconfigure(srv.cfg)
}

// Start starts the underlying worker pool and configures the factory and command provider.
func (srv *Server) Start() (err error) {
	srv.mu.Lock()
	defer srv.mu.Unlock()

	if srv.cmd, err = srv.cfg.makeCommand(); err != nil {
		return err
	}

	if srv.factory, err = srv.cfg.makeFactory(); err != nil {
		return err
	}

	if srv.pool, err = NewPool(srv.cmd, srv.factory, *srv.cfg.Pool); err != nil {
		return err
	}

	srv.pool.Observe(srv.poolObserver)
	srv.started = true

	return nil
}

// Stop stops the underlying worker pool and closes the factory.
func (srv *Server) Stop() error {
	srv.mu.Lock()
	defer srv.mu.Unlock()

	if !srv.started {
		return nil
	}

	srv.throw(EventDestroyPool, srv.pool)
	srv.pool.Destroy()
	srv.factory.Close()

	srv.cmd = nil
	srv.factory = nil
	srv.pool = nil
	srv.started = false

	return nil
}

// Exec executes one task with the given payload and context, returns the result or an error.
func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) {
	pool := srv.Pool()
	if pool == nil {
		return nil, fmt.Errorf("no associated pool")
	}

	return pool.Exec(rqs)
}

// Workers returns the worker list associated with the server pool.
func (srv *Server) Workers() (workers []*Worker) {
	p := srv.Pool()
	if p == nil {
		return nil
	}

	return p.Workers()
}

// Pool returns the currently active pool, if any.
func (srv *Server) Pool() Pool {
	srv.mu.Lock()
	defer srv.mu.Unlock()

	return srv.pool
}
// poolObserver observes pool events.
func (srv *Server) poolObserver(event int, ctx interface{}) {
	// pass the event through to the user-specified observer
	srv.throw(event, ctx)

	if event == EventPoolError {
		// pool failure, rebuild the workers
		srv.Reset()
	}
}

// throw invokes the event handler, if any.
func (srv *Server) throw(event int, ctx interface{}) {
	if srv.observer != nil {
		srv.observer(event, ctx)
	}
}
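
// The commented sketch below shows one way to wire the Server end to end:
// attach an observer, start the pool, execute a task, and stop. It is
// illustrative only; it assumes cfg is a fully populated *ServerConfig
// (defined elsewhere in this package) and that Payload exposes a Body field,
// as suggested by the Exec signature above.
/*
func serveOnce(cfg *ServerConfig) error {
	srv := NewServer(cfg, func(event int, ctx interface{}) {
		switch event {
		case EventNewPool:
			fmt.Println("pool created")
		case EventDestroyPool:
			fmt.Println("pool destroyed")
		}
	})

	if err := srv.Start(); err != nil {
		return err
	}
	defer srv.Stop()

	rsp, err := srv.Exec(&Payload{Body: []byte("hello")})
	if err != nil {
		return err
	}
	fmt.Printf("response: %s\n", rsp.Body)

	return nil
}
*/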