diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 21:24:13 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-16 21:24:13 +0300 |
commit | 337d292dd2d6ff0a555098b1970d8194d8df8bc2 (patch) | |
tree | a2ab31666f95813a592bea2b207f2db0ba188c92 /plugins/grpc/plugin.go | |
parent | 5d2cd55ab522d4f1e65a833f91146444465a32ac (diff) | |
parent | cc56349f3ad19aa54ae7900c50e018d757305804 (diff) |
[#783]: feat(grpc): update GRPC plugin to RR `v2`
[#783]: feat(grpc): update GRPC plugin to RR `v2`
Diffstat (limited to 'plugins/grpc/plugin.go')
-rw-r--r-- | plugins/grpc/plugin.go | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/plugins/grpc/plugin.go b/plugins/grpc/plugin.go new file mode 100644 index 00000000..7518d352 --- /dev/null +++ b/plugins/grpc/plugin.go @@ -0,0 +1,195 @@ +package grpc + +import ( + "context" + "sync" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/state/process" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/grpc/codec" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/utils" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding" +) + +const ( + name string = "grpc" + RrGrpc string = "RR_GRPC" +) + +type Plugin struct { + mu *sync.RWMutex + config *Config + gPool pool.Pool + opts []grpc.ServerOption + services []func(server *grpc.Server) + server *grpc.Server + rrServer server.Server + + // events handler + events events.Handler + log logger.Logger +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { + const op = errors.Op("grpc_plugin_init") + + if !cfg.Has(name) { + return errors.E(errors.Disabled) + } + // register the codec + encoding.RegisterCodec(&codec.Codec{}) + + err := cfg.UnmarshalKey(name, &p.config) + if err != nil { + return errors.E(op, err) + } + + err = p.config.InitDefaults() + if err != nil { + return errors.E(op, err) + } + + p.opts = make([]grpc.ServerOption, 0) + p.services = make([]func(server *grpc.Server), 0) + p.events = events.NewEventsHandler() + p.events.AddListener(p.collectGRPCEvents) + p.rrServer = server + + // worker's GRPC mode + if p.config.Env == nil { + p.config.Env = make(map[string]string) + } + p.config.Env[RrGrpc] = "true" + + p.log = log + p.mu = &sync.RWMutex{} + + return nil +} + +func (p *Plugin) Serve() chan error { + const op = errors.Op("grpc_plugin_serve") + errCh := make(chan error, 1) + + var err error + p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{ + Debug: p.config.GrpcPool.Debug, + NumWorkers: p.config.GrpcPool.NumWorkers, + MaxJobs: p.config.GrpcPool.MaxJobs, + AllocateTimeout: p.config.GrpcPool.AllocateTimeout, + DestroyTimeout: p.config.GrpcPool.DestroyTimeout, + Supervisor: p.config.GrpcPool.Supervisor, + }, p.config.Env, p.collectGRPCEvents) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + go func() { + var err error + p.mu.Lock() + p.server, err = p.createGRPCserver() + if err != nil { + p.log.Error("create grpc server", "error", err) + errCh <- errors.E(op, err) + return + } + + l, err := utils.CreateListener(p.config.Listen) + if err != nil { + p.log.Error("create grpc listener", "error", err) + errCh <- errors.E(op, err) + } + + // protect serve + p.mu.Unlock() + err = p.server.Serve(l) + if err != nil { + // skip errors when stopping the server + if err == grpc.ErrServerStopped { + return + } + + p.log.Error("grpc server stopped", "error", err) + errCh <- errors.E(op, err) + return + } + }() + + return errCh +} + +func (p *Plugin) Stop() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.server != nil { + p.server.Stop() + } + return nil +} + +func (p *Plugin) Available() {} + +func (p *Plugin) Name() string { + return name +} + +func (p *Plugin) Reset() error { + p.mu.Lock() + defer p.mu.Unlock() + const op = errors.Op("grpc_plugin_reset") + + // destroy old pool + p.gPool.Destroy(context.Background()) + + var err error + p.gPool, err = p.rrServer.NewWorkerPool(context.Background(), &pool.Config{ + Debug: p.config.GrpcPool.Debug, + NumWorkers: p.config.GrpcPool.NumWorkers, + MaxJobs: p.config.GrpcPool.MaxJobs, + AllocateTimeout: p.config.GrpcPool.AllocateTimeout, + DestroyTimeout: p.config.GrpcPool.DestroyTimeout, + Supervisor: p.config.GrpcPool.Supervisor, + }, p.config.Env, p.collectGRPCEvents) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (p *Plugin) Workers() []*process.State { + p.mu.RLock() + defer p.mu.RUnlock() + + workers := p.gPool.Workers() + + ps := make([]*process.State, 0, len(workers)) + for i := 0; i < len(workers); i++ { + state, err := process.WorkerProcessState(workers[i]) + if err != nil { + return nil + } + ps = append(ps, state) + } + + return ps +} + +func (p *Plugin) collectGRPCEvents(event interface{}) { + if gev, ok := event.(events.GRPCEvent); ok { + switch gev.Event { + case events.EventUnaryCallOk: + p.log.Info("method called", "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed) + case events.EventUnaryCallErr: + p.log.Info("method call finished with error", "error", gev.Error, "method", gev.Info.FullMethod, "started", gev.Start, "elapsed", gev.Elapsed) + } + } +} |