diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 21:46:50 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-16 21:46:50 +0300 |
commit | 3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch) | |
tree | e723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /plugins/grpc/proxy | |
parent | 337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff) | |
parent | 823d831b57b75f70c7c3bbbee355f2016633bb3b (diff) |
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'plugins/grpc/proxy')
-rw-r--r-- | plugins/grpc/proxy/proxy.go | 219 | ||||
-rw-r--r-- | plugins/grpc/proxy/proxy_test.go | 134 |
2 files changed, 0 insertions, 353 deletions
diff --git a/plugins/grpc/proxy/proxy.go b/plugins/grpc/proxy/proxy.go deleted file mode 100644 index 074aac85..00000000 --- a/plugins/grpc/proxy/proxy.go +++ /dev/null @@ -1,219 +0,0 @@ -package proxy - -import ( - "encoding/json" - "fmt" - "strconv" - "strings" - "sync" - - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/plugins/grpc/codec" - "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/peer" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" -) - -const ( - peerAddr string = ":peer.address" - peerAuthType string = ":peer.auth-type" - delimiter string = "|:|" -) - -// base interface for Proxy class -type proxyService interface { - // RegisterMethod registers new RPC method. - RegisterMethod(method string) - - // ServiceDesc returns service description for the proxy. - ServiceDesc() *grpc.ServiceDesc -} - -// carry details about service, method and RPC context to PHP process -type rpcContext struct { - Service string `json:"service"` - Method string `json:"method"` - Context map[string][]string `json:"context"` -} - -// Proxy manages GRPC/RoadRunner bridge. -type Proxy struct { - mu *sync.RWMutex - grpcPool pool.Pool - name string - metadata string - methods []string -} - -// NewProxy creates new service proxy object. -func NewProxy(name string, metadata string, grpcPool pool.Pool, mu *sync.RWMutex) *Proxy { - return &Proxy{ - mu: mu, - grpcPool: grpcPool, - name: name, - metadata: metadata, - methods: make([]string, 0), - } -} - -// RegisterMethod registers new RPC method. -func (p *Proxy) RegisterMethod(method string) { - p.methods = append(p.methods, method) -} - -// ServiceDesc returns service description for the proxy. -func (p *Proxy) ServiceDesc() *grpc.ServiceDesc { - desc := &grpc.ServiceDesc{ - ServiceName: p.name, - Metadata: p.metadata, - HandlerType: (*proxyService)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{}, - } - - // Registering methods - for _, m := range p.methods { - desc.Methods = append(desc.Methods, grpc.MethodDesc{ - MethodName: m, - Handler: p.methodHandler(m), - }) - } - - return desc -} - -// Generate method handler proxy. -func (p *Proxy) methodHandler(method string) func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - return func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := codec.RawMessage{} - if err := dec(&in); err != nil { - return nil, wrapError(err) - } - - if interceptor == nil { - return p.invoke(ctx, method, in) - } - - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: fmt.Sprintf("/%s/%s", p.name, method), - } - - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return p.invoke(ctx, method, req.(codec.RawMessage)) - } - - return interceptor(ctx, in, info, handler) - } -} - -func (p *Proxy) invoke(ctx context.Context, method string, in codec.RawMessage) (interface{}, error) { - payload, err := p.makePayload(ctx, method, in) - if err != nil { - return nil, err - } - - p.mu.RLock() - resp, err := p.grpcPool.Exec(payload) - p.mu.RUnlock() - - if err != nil { - return nil, wrapError(err) - } - - md, err := p.responseMetadata(resp) - if err != nil { - return nil, err - } - ctx = metadata.NewIncomingContext(ctx, md) - err = grpc.SetHeader(ctx, md) - if err != nil { - return nil, err - } - - return codec.RawMessage(resp.Body), nil -} - -// responseMetadata extracts metadata from roadrunner response Payload.Context and converts it to metadata.MD -func (p *Proxy) responseMetadata(resp *payload.Payload) (metadata.MD, error) { - var md metadata.MD - if resp == nil || len(resp.Context) == 0 { - return md, nil - } - - var rpcMetadata map[string]string - err := json.Unmarshal(resp.Context, &rpcMetadata) - if err != nil { - return md, err - } - - if len(rpcMetadata) > 0 { - md = metadata.New(rpcMetadata) - } - - return md, nil -} - -// makePayload generates RoadRunner compatible payload based on GRPC message. todo: return error -func (p *Proxy) makePayload(ctx context.Context, method string, body codec.RawMessage) (*payload.Payload, error) { - ctxMD := make(map[string][]string) - - if md, ok := metadata.FromIncomingContext(ctx); ok { - for k, v := range md { - ctxMD[k] = v - } - } - - if pr, ok := peer.FromContext(ctx); ok { - ctxMD[peerAddr] = []string{pr.Addr.String()} - if pr.AuthInfo != nil { - ctxMD[peerAuthType] = []string{pr.AuthInfo.AuthType()} - } - } - - ctxData, err := json.Marshal(rpcContext{Service: p.name, Method: method, Context: ctxMD}) - - if err != nil { - return nil, err - } - - return &payload.Payload{Context: ctxData, Body: body}, nil -} - -// mounts proper error code for the error -func wrapError(err error) error { - // internal agreement - if strings.Contains(err.Error(), delimiter) { - chunks := strings.Split(err.Error(), delimiter) - code := codes.Internal - - // protect the slice access - if len(chunks) < 2 { - return err - } - - if phpCode, errConv := strconv.ParseUint(chunks[0], 10, 32); errConv == nil { - code = codes.Code(phpCode) - } - - st := status.New(code, chunks[1]).Proto() - - for _, detailsMessage := range chunks[2:] { - anyDetailsMessage := anypb.Any{} - errP := proto.Unmarshal([]byte(detailsMessage), &anyDetailsMessage) - if errP == nil { - st.Details = append(st.Details, &anyDetailsMessage) - } - } - - return status.ErrorProto(st) - } - - return status.Error(codes.Internal, err.Error()) -} diff --git a/plugins/grpc/proxy/proxy_test.go b/plugins/grpc/proxy/proxy_test.go deleted file mode 100644 index 2c024ee3..00000000 --- a/plugins/grpc/proxy/proxy_test.go +++ /dev/null @@ -1,134 +0,0 @@ -package proxy - -// import ( -// "testing" -// "time" - -// "github.com/sirupsen/logrus" -// "github.com/sirupsen/logrus/hooks/test" -// "github.com/stretchr/testify/assert" -// "golang.org/x/net/context" -// "google.golang.org/grpc" -// "google.golang.org/grpc/codes" -// "google.golang.org/grpc/metadata" -// "google.golang.org/grpc/status" -// ) - -// const addr = "localhost:9080" - -// func Test_Proxy_Error(t *testing.T) { -// logger, _ := test.NewNullLogger() -// logger.SetLevel(logrus.DebugLevel) - -// c := service.NewContainer(logger) -// c.Register(ID, &Service{}) - -// assert.NoError(t, c.Init(&testCfg{ -// grpcCfg: `{ -// "listen": "tcp://:9080", -// "tls": { -// "key": "tests/server.key", -// "cert": "tests/server.crt" -// }, -// "proto": "tests/test.proto", -// "workers":{ -// "command": "php tests/worker.php", -// "relay": "pipes", -// "pool": { -// "numWorkers": 1, -// "allocateTimeout": 10, -// "destroyTimeout": 10 -// } -// } -// }`, -// })) - -// s, st := c.Get(ID) -// assert.NotNil(t, s) -// assert.Equal(t, service.StatusOK, st) - -// // should do nothing -// s.(*Service).Stop() - -// go func() { assert.NoError(t, c.Serve()) }() -// time.Sleep(time.Millisecond * 100) -// defer c.Stop() - -// cl, cn := getClient(addr) -// defer cn.Close() - -// _, err := cl.Throw(context.Background(), &tests.Message{Msg: "notFound"}) - -// assert.Error(t, err) -// se, _ := status.FromError(err) -// assert.Equal(t, "nothing here", se.Message()) -// assert.Equal(t, codes.NotFound, se.Code()) - -// _, errWithDetails := cl.Throw(context.Background(), &tests.Message{Msg: "withDetails"}) - -// assert.Error(t, errWithDetails) -// statusWithDetails, _ := status.FromError(errWithDetails) -// assert.Equal(t, "main exception message", statusWithDetails.Message()) -// assert.Equal(t, codes.InvalidArgument, statusWithDetails.Code()) - -// details := statusWithDetails.Details() - -// detailsMessageForException := details[0].(*tests.DetailsMessageForException) - -// assert.Equal(t, detailsMessageForException.Code, uint64(1)) -// assert.Equal(t, detailsMessageForException.Message, "details message") -// } - -// func Test_Proxy_Metadata(t *testing.T) { -// logger, _ := test.NewNullLogger() -// logger.SetLevel(logrus.DebugLevel) - -// c := service.NewContainer(logger) -// c.Register(ID, &Service{}) - -// assert.NoError(t, c.Init(&testCfg{ -// grpcCfg: `{ -// "listen": "tcp://:9080", -// "tls": { -// "key": "tests/server.key", -// "cert": "tests/server.crt" -// }, -// "proto": "tests/test.proto", -// "workers":{ -// "command": "php tests/worker.php", -// "relay": "pipes", -// "pool": { -// "numWorkers": 1, -// "allocateTimeout": 10, -// "destroyTimeout": 10 -// } -// } -// }`, -// })) - -// s, st := c.Get(ID) -// assert.NotNil(t, s) -// assert.Equal(t, service.StatusOK, st) - -// // should do nothing -// s.(*Service).Stop() - -// go func() { assert.NoError(t, c.Serve()) }() -// time.Sleep(time.Millisecond * 100) -// defer c.Stop() - -// cl, cn := getClient(addr) -// defer cn.Close() - -// ctx := metadata.AppendToOutgoingContext(context.Background(), "key", "proxy-value") -// var header metadata.MD -// out, err := cl.Info( -// ctx, -// &tests.Message{Msg: "MD"}, -// grpc.Header(&header), -// grpc.WaitForReady(true), -// ) -// assert.Equal(t, []string{"bar"}, header.Get("foo")) -// assert.NoError(t, err) -// assert.Equal(t, `["proxy-value"]`, out.Msg) -// } |