From 4b8400b4d5041bd6fd06715d9d8f00464f8b8424 Mon Sep 17 00:00:00 2001 From: Seb Date: Thu, 14 Jul 2022 13:15:01 +0200 Subject: Make RR embeddable in other programs --- internal/cli/serve/command.go | 62 +++++-------------------- roadrunner/roadrunner.go | 102 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 51 deletions(-) create mode 100644 roadrunner/roadrunner.go diff --git a/internal/cli/serve/command.go b/internal/cli/serve/command.go index 73b54c3c..07e346b2 100644 --- a/internal/cli/serve/command.go +++ b/internal/cli/serve/command.go @@ -6,10 +6,8 @@ import ( "os/signal" "syscall" - "github.com/roadrunner-server/roadrunner/v2/internal/container" - "github.com/roadrunner-server/roadrunner/v2/internal/meta" + "github.com/roadrunner-server/roadrunner/v2/roadrunner" - configImpl "github.com/roadrunner-server/config/v2" "github.com/roadrunner-server/errors" "github.com/spf13/cobra" ) @@ -25,53 +23,19 @@ func NewCommand(override *[]string, cfgFile *string, silent *bool) *cobra.Comman Short: "Start RoadRunner server", RunE: func(*cobra.Command, []string) error { const op = errors.Op("handle_serve_command") - // just to be safe - if cfgFile == nil { - return errors.E(op, errors.Str("no configuration file provided")) - } + rr, err := roadrunner.NewRR(*cfgFile, override, roadrunner.DefaultPluginsList()) - // create endure container config - containerCfg, err := container.NewConfig(*cfgFile) if err != nil { return errors.E(op, err) } - cfg := &configImpl.Plugin{ - Path: *cfgFile, - Prefix: rrPrefix, - Timeout: containerCfg.GracePeriod, - Flags: *override, - Version: meta.Version(), - } - - // create endure container - endureContainer, err := container.NewContainer(*containerCfg) - if err != nil { - return errors.E(op, err) - } - - // register config plugin - if err = endureContainer.Register(cfg); err != nil { - return errors.E(op, err) - } - - // register another container plugins - for i, plugins := 0, container.Plugins(); i < len(plugins); i++ { - if err = endureContainer.Register(plugins[i]); err != nil { - return errors.E(op, err) + errCh := make(chan error, 1) + go func() { + err = rr.Serve() + if err != nil { + errCh <- errors.E(op, err) } - } - - // init container and all services - if err = endureContainer.Init(); err != nil { - return errors.E(op, err) - } - - // start serving the graph - errCh, err := endureContainer.Serve() - if err != nil { - return errors.E(op, err) - } + }() oss, stop := make(chan os.Signal, 5), make(chan struct{}, 1) //nolint:gomnd signal.Notify(oss, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) @@ -89,18 +53,14 @@ func NewCommand(override *[]string, cfgFile *string, silent *bool) *cobra.Comman os.Exit(1) }() - if !*silent { - fmt.Printf("[INFO] RoadRunner server started; version: %s, buildtime: %s\n", meta.Version(), meta.BuildTime()) - } - for { select { case e := <-errCh: - return fmt.Errorf("error: %w\nplugin: %s", e.Error, e.VertexID) + return errors.E(op, e) case <-stop: // stop the container after first signal - fmt.Printf("stop signal received, grace timeout is: %0.f seconds\n", containerCfg.GracePeriod.Seconds()) + fmt.Printf("stop signal received\n") - if err = endureContainer.Stop(); err != nil { + if err = rr.Stop(); err != nil { return fmt.Errorf("error: %w", err) } diff --git a/roadrunner/roadrunner.go b/roadrunner/roadrunner.go new file mode 100644 index 00000000..34101782 --- /dev/null +++ b/roadrunner/roadrunner.go @@ -0,0 +1,102 @@ +package roadrunner + +import ( + "fmt" + + configImpl "github.com/roadrunner-server/config/v2" + endure "github.com/roadrunner-server/endure/pkg/container" + "github.com/roadrunner-server/errors" + "github.com/roadrunner-server/roadrunner/v2/internal/container" + "github.com/roadrunner-server/roadrunner/v2/internal/meta" +) + +const ( + rrPrefix string = "rr" +) + +type RR struct { + container *endure.Endure + Version string + BuildTime string +} + +// NewRR creates a new RR instance that can then be started or stopped by the caller +func NewRR(cfgFile string, override *[]string, pluginList []interface{}) (*RR, error) { + const op = errors.Op("new_rr") + // create endure container config + containerCfg, err := container.NewConfig(cfgFile) + if err != nil { + return nil, errors.E(op, err) + } + + cfg := &configImpl.Plugin{ + Path: cfgFile, + Prefix: rrPrefix, + Timeout: containerCfg.GracePeriod, + Flags: *override, + Version: meta.Version(), + } + + // create endure container + endureContainer, err := container.NewContainer(*containerCfg) + if err != nil { + return nil, errors.E(op, err) + } + + // register config plugin + if err = endureContainer.Register(cfg); err != nil { + return nil, errors.E(op, err) + } + + // register another container plugins + for i := 0; i < len(pluginList); i++ { + if err = endureContainer.Register(pluginList[i]); err != nil { + return nil, errors.E(op, err) + } + } + + // init container and all services + if err = endureContainer.Init(); err != nil { + return nil, errors.E(op, err) + } + + rr := &RR{ + container: endureContainer, + Version: meta.Version(), + BuildTime: meta.BuildTime(), + } + + return rr, nil +} + +// Serve starts RR and starts listening for requests. +// This is a blocking call that will return an error if / when one occurs in a plugin +func (rr *RR) Serve() error { + const op = errors.Op("rr.serve") + // start serving the graph + errCh, err := rr.container.Serve() + if err != nil { + return errors.E(op, err) + } + + for { + select { + case e := <-errCh: + rr.Stop() + return fmt.Errorf("error: %w\nplugin: %s", e.Error, e.VertexID) + } + } +} + +// Stop stops roadrunner +func (rr *RR) Stop() error { + if err := rr.container.Stop(); err != nil { + return fmt.Errorf("error: %w", err) + } + return nil +} + +// DefaultPluginsList returns all the plugins that RR can run with and are included by default +func DefaultPluginsList() []interface{} { + return container.Plugins() +} -- cgit v1.2.3