summaryrefslogtreecommitdiff
path: root/plugins/grpc/proxy
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-16 21:46:50 +0300
committerGitHub <[email protected]>2021-09-16 21:46:50 +0300
commit3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch)
treee723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /plugins/grpc/proxy
parent337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff)
parent823d831b57b75f70c7c3bbbee355f2016633bb3b (diff)
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'plugins/grpc/proxy')
-rw-r--r--plugins/grpc/proxy/proxy.go219
-rw-r--r--plugins/grpc/proxy/proxy_test.go134
2 files changed, 0 insertions, 353 deletions
diff --git a/plugins/grpc/proxy/proxy.go b/plugins/grpc/proxy/proxy.go
deleted file mode 100644
index 074aac85..00000000
--- a/plugins/grpc/proxy/proxy.go
+++ /dev/null
@@ -1,219 +0,0 @@
-package proxy
-
-import (
- "encoding/json"
- "fmt"
- "strconv"
- "strings"
- "sync"
-
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/plugins/grpc/codec"
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/peer"
- "google.golang.org/grpc/status"
- "google.golang.org/protobuf/proto"
- "google.golang.org/protobuf/types/known/anypb"
-)
-
-const (
- peerAddr string = ":peer.address"
- peerAuthType string = ":peer.auth-type"
- delimiter string = "|:|"
-)
-
-// base interface for Proxy class
-type proxyService interface {
- // RegisterMethod registers new RPC method.
- RegisterMethod(method string)
-
- // ServiceDesc returns service description for the proxy.
- ServiceDesc() *grpc.ServiceDesc
-}
-
-// carry details about service, method and RPC context to PHP process
-type rpcContext struct {
- Service string `json:"service"`
- Method string `json:"method"`
- Context map[string][]string `json:"context"`
-}
-
-// Proxy manages GRPC/RoadRunner bridge.
-type Proxy struct {
- mu *sync.RWMutex
- grpcPool pool.Pool
- name string
- metadata string
- methods []string
-}
-
-// NewProxy creates new service proxy object.
-func NewProxy(name string, metadata string, grpcPool pool.Pool, mu *sync.RWMutex) *Proxy {
- return &Proxy{
- mu: mu,
- grpcPool: grpcPool,
- name: name,
- metadata: metadata,
- methods: make([]string, 0),
- }
-}
-
-// RegisterMethod registers new RPC method.
-func (p *Proxy) RegisterMethod(method string) {
- p.methods = append(p.methods, method)
-}
-
-// ServiceDesc returns service description for the proxy.
-func (p *Proxy) ServiceDesc() *grpc.ServiceDesc {
- desc := &grpc.ServiceDesc{
- ServiceName: p.name,
- Metadata: p.metadata,
- HandlerType: (*proxyService)(nil),
- Methods: []grpc.MethodDesc{},
- Streams: []grpc.StreamDesc{},
- }
-
- // Registering methods
- for _, m := range p.methods {
- desc.Methods = append(desc.Methods, grpc.MethodDesc{
- MethodName: m,
- Handler: p.methodHandler(m),
- })
- }
-
- return desc
-}
-
-// Generate method handler proxy.
-func (p *Proxy) methodHandler(method string) func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
- return func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
- in := codec.RawMessage{}
- if err := dec(&in); err != nil {
- return nil, wrapError(err)
- }
-
- if interceptor == nil {
- return p.invoke(ctx, method, in)
- }
-
- info := &grpc.UnaryServerInfo{
- Server: srv,
- FullMethod: fmt.Sprintf("/%s/%s", p.name, method),
- }
-
- handler := func(ctx context.Context, req interface{}) (interface{}, error) {
- return p.invoke(ctx, method, req.(codec.RawMessage))
- }
-
- return interceptor(ctx, in, info, handler)
- }
-}
-
-func (p *Proxy) invoke(ctx context.Context, method string, in codec.RawMessage) (interface{}, error) {
- payload, err := p.makePayload(ctx, method, in)
- if err != nil {
- return nil, err
- }
-
- p.mu.RLock()
- resp, err := p.grpcPool.Exec(payload)
- p.mu.RUnlock()
-
- if err != nil {
- return nil, wrapError(err)
- }
-
- md, err := p.responseMetadata(resp)
- if err != nil {
- return nil, err
- }
- ctx = metadata.NewIncomingContext(ctx, md)
- err = grpc.SetHeader(ctx, md)
- if err != nil {
- return nil, err
- }
-
- return codec.RawMessage(resp.Body), nil
-}
-
-// responseMetadata extracts metadata from roadrunner response Payload.Context and converts it to metadata.MD
-func (p *Proxy) responseMetadata(resp *payload.Payload) (metadata.MD, error) {
- var md metadata.MD
- if resp == nil || len(resp.Context) == 0 {
- return md, nil
- }
-
- var rpcMetadata map[string]string
- err := json.Unmarshal(resp.Context, &rpcMetadata)
- if err != nil {
- return md, err
- }
-
- if len(rpcMetadata) > 0 {
- md = metadata.New(rpcMetadata)
- }
-
- return md, nil
-}
-
-// makePayload generates RoadRunner compatible payload based on GRPC message. todo: return error
-func (p *Proxy) makePayload(ctx context.Context, method string, body codec.RawMessage) (*payload.Payload, error) {
- ctxMD := make(map[string][]string)
-
- if md, ok := metadata.FromIncomingContext(ctx); ok {
- for k, v := range md {
- ctxMD[k] = v
- }
- }
-
- if pr, ok := peer.FromContext(ctx); ok {
- ctxMD[peerAddr] = []string{pr.Addr.String()}
- if pr.AuthInfo != nil {
- ctxMD[peerAuthType] = []string{pr.AuthInfo.AuthType()}
- }
- }
-
- ctxData, err := json.Marshal(rpcContext{Service: p.name, Method: method, Context: ctxMD})
-
- if err != nil {
- return nil, err
- }
-
- return &payload.Payload{Context: ctxData, Body: body}, nil
-}
-
-// mounts proper error code for the error
-func wrapError(err error) error {
- // internal agreement
- if strings.Contains(err.Error(), delimiter) {
- chunks := strings.Split(err.Error(), delimiter)
- code := codes.Internal
-
- // protect the slice access
- if len(chunks) < 2 {
- return err
- }
-
- if phpCode, errConv := strconv.ParseUint(chunks[0], 10, 32); errConv == nil {
- code = codes.Code(phpCode)
- }
-
- st := status.New(code, chunks[1]).Proto()
-
- for _, detailsMessage := range chunks[2:] {
- anyDetailsMessage := anypb.Any{}
- errP := proto.Unmarshal([]byte(detailsMessage), &anyDetailsMessage)
- if errP == nil {
- st.Details = append(st.Details, &anyDetailsMessage)
- }
- }
-
- return status.ErrorProto(st)
- }
-
- return status.Error(codes.Internal, err.Error())
-}
diff --git a/plugins/grpc/proxy/proxy_test.go b/plugins/grpc/proxy/proxy_test.go
deleted file mode 100644
index 2c024ee3..00000000
--- a/plugins/grpc/proxy/proxy_test.go
+++ /dev/null
@@ -1,134 +0,0 @@
-package proxy
-
-// import (
-// "testing"
-// "time"
-
-// "github.com/sirupsen/logrus"
-// "github.com/sirupsen/logrus/hooks/test"
-// "github.com/stretchr/testify/assert"
-// "golang.org/x/net/context"
-// "google.golang.org/grpc"
-// "google.golang.org/grpc/codes"
-// "google.golang.org/grpc/metadata"
-// "google.golang.org/grpc/status"
-// )
-
-// const addr = "localhost:9080"
-
-// func Test_Proxy_Error(t *testing.T) {
-// logger, _ := test.NewNullLogger()
-// logger.SetLevel(logrus.DebugLevel)
-
-// c := service.NewContainer(logger)
-// c.Register(ID, &Service{})
-
-// assert.NoError(t, c.Init(&testCfg{
-// grpcCfg: `{
-// "listen": "tcp://:9080",
-// "tls": {
-// "key": "tests/server.key",
-// "cert": "tests/server.crt"
-// },
-// "proto": "tests/test.proto",
-// "workers":{
-// "command": "php tests/worker.php",
-// "relay": "pipes",
-// "pool": {
-// "numWorkers": 1,
-// "allocateTimeout": 10,
-// "destroyTimeout": 10
-// }
-// }
-// }`,
-// }))
-
-// s, st := c.Get(ID)
-// assert.NotNil(t, s)
-// assert.Equal(t, service.StatusOK, st)
-
-// // should do nothing
-// s.(*Service).Stop()
-
-// go func() { assert.NoError(t, c.Serve()) }()
-// time.Sleep(time.Millisecond * 100)
-// defer c.Stop()
-
-// cl, cn := getClient(addr)
-// defer cn.Close()
-
-// _, err := cl.Throw(context.Background(), &tests.Message{Msg: "notFound"})
-
-// assert.Error(t, err)
-// se, _ := status.FromError(err)
-// assert.Equal(t, "nothing here", se.Message())
-// assert.Equal(t, codes.NotFound, se.Code())
-
-// _, errWithDetails := cl.Throw(context.Background(), &tests.Message{Msg: "withDetails"})
-
-// assert.Error(t, errWithDetails)
-// statusWithDetails, _ := status.FromError(errWithDetails)
-// assert.Equal(t, "main exception message", statusWithDetails.Message())
-// assert.Equal(t, codes.InvalidArgument, statusWithDetails.Code())
-
-// details := statusWithDetails.Details()
-
-// detailsMessageForException := details[0].(*tests.DetailsMessageForException)
-
-// assert.Equal(t, detailsMessageForException.Code, uint64(1))
-// assert.Equal(t, detailsMessageForException.Message, "details message")
-// }
-
-// func Test_Proxy_Metadata(t *testing.T) {
-// logger, _ := test.NewNullLogger()
-// logger.SetLevel(logrus.DebugLevel)
-
-// c := service.NewContainer(logger)
-// c.Register(ID, &Service{})
-
-// assert.NoError(t, c.Init(&testCfg{
-// grpcCfg: `{
-// "listen": "tcp://:9080",
-// "tls": {
-// "key": "tests/server.key",
-// "cert": "tests/server.crt"
-// },
-// "proto": "tests/test.proto",
-// "workers":{
-// "command": "php tests/worker.php",
-// "relay": "pipes",
-// "pool": {
-// "numWorkers": 1,
-// "allocateTimeout": 10,
-// "destroyTimeout": 10
-// }
-// }
-// }`,
-// }))
-
-// s, st := c.Get(ID)
-// assert.NotNil(t, s)
-// assert.Equal(t, service.StatusOK, st)
-
-// // should do nothing
-// s.(*Service).Stop()
-
-// go func() { assert.NoError(t, c.Serve()) }()
-// time.Sleep(time.Millisecond * 100)
-// defer c.Stop()
-
-// cl, cn := getClient(addr)
-// defer cn.Close()
-
-// ctx := metadata.AppendToOutgoingContext(context.Background(), "key", "proxy-value")
-// var header metadata.MD
-// out, err := cl.Info(
-// ctx,
-// &tests.Message{Msg: "MD"},
-// grpc.Header(&header),
-// grpc.WaitForReady(true),
-// )
-// assert.Equal(t, []string{"bar"}, header.Get("foo"))
-// assert.NoError(t, err)
-// assert.Equal(t, `["proxy-value"]`, out.Msg)
-// }