1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
package grpc
import (
"context"
"crypto/tls"
"crypto/x509"
"os"
"github.com/spiral/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
// createGRPCserver builds a new gRPC server from the options produced by
// p.serverOptions. If the options cannot be assembled, the error is wrapped
// with this function's operation tag and no server is returned.
func (p *Plugin) createGRPCserver() (*grpc.Server, error) {
	const op = errors.Op("grpc_plugin_create_server")

	serverOpts, optsErr := p.serverOptions()
	if optsErr != nil {
		return nil, errors.E(op, optsErr)
	}

	// Placeholder: proto descriptors parser (nothing is registered on the server here).
	return grpc.NewServer(serverOpts...), nil
}
// interceptor is the unary server interceptor installed by serverOptions.
// It forwards the call to handler and returns the handler's response and
// error unchanged; info is currently unused.
func (p *Plugin) interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	// Disabled instrumentation, kept for reference:
	//
	//	start := time.Now()
	//	resp, err = handler(ctx, req)
	//	svc.throw(EventUnaryCall, &UnaryCallEvent{
	//		Info:    info,
	//		Context: ctx,
	//		Error:   err,
	//		start:   start,
	//		elapsed: time.Since(start),
	//	})
	return handler(ctx, req)
}
// serverOptions assembles the grpc.ServerOption list used to construct the server.
//
// When p.config.EnableTLS() reports true, transport credentials are created:
//   - if a RootCA is configured, the server certificate/key pair is loaded, the
//     RootCA PEM is appended to the system certificate pool (or to a fresh pool
//     when the system pool is nil), and clients are required to present a
//     certificate that verifies against that pool;
//   - otherwise server-side TLS is configured from the certificate and key files.
//
// In the TLS case the configured message size limits, keepalive parameters and
// concurrent stream limit are added as well. The options stored in p.opts are
// appended in every case, followed by the unary interceptor.
//
// Every failure is returned wrapped with this function's operation tag.
func (p *Plugin) serverOptions() ([]grpc.ServerOption, error) {
	const op = errors.Op("grpc_plugin_server_options")

	var opts []grpc.ServerOption

	if p.config.EnableTLS() {
		var tcreds credentials.TransportCredentials

		// if client CA is not empty we combine it with Cert and Key
		if p.config.TLS.RootCA != "" {
			cert, err := tls.LoadX509KeyPair(p.config.TLS.Cert, p.config.TLS.Key)
			if err != nil {
				return nil, errors.E(op, err)
			}

			certPool, err := x509.SystemCertPool()
			if err != nil {
				return nil, errors.E(op, err)
			}
			// SystemCertPool may hand back a nil pool without an error; start from an empty one then.
			if certPool == nil {
				certPool = x509.NewCertPool()
			}

			rca, err := os.ReadFile(p.config.TLS.RootCA)
			if err != nil {
				return nil, errors.E(op, err)
			}

			if ok := certPool.AppendCertsFromPEM(rca); !ok {
				return nil, errors.E(op, errors.Str("could not append Certs from PEM"))
			}

			tcreds = credentials.NewTLS(&tls.Config{
				MinVersion:   tls.VersionTLS12,
				ClientAuth:   tls.RequireAndVerifyClientCert,
				Certificates: []tls.Certificate{cert},
				ClientCAs:    certPool,
			})
		} else {
			creds, err := credentials.NewServerTLSFromFile(p.config.TLS.Cert, p.config.TLS.Key)
			if err != nil {
				return nil, errors.E(op, err)
			}
			tcreds = creds
		}

		// NOTE(review): these limits and keepalive settings are only applied when TLS is
		// enabled; they look unrelated to TLS, so confirm this placement is intentional.
		serverOptions := []grpc.ServerOption{
			grpc.MaxSendMsgSize(int(p.config.MaxSendMsgSize)),
			grpc.MaxRecvMsgSize(int(p.config.MaxRecvMsgSize)),
			grpc.KeepaliveParams(keepalive.ServerParameters{
				MaxConnectionIdle: p.config.MaxConnectionIdle,
				MaxConnectionAge:  p.config.MaxConnectionAge,
				// NOTE(review): the grace period is populated from MaxConnectionAge; confirm
				// whether a dedicated grace setting was meant to be used here.
				MaxConnectionAgeGrace: p.config.MaxConnectionAge,
				Time:                  p.config.PingTime,
				Timeout:               p.config.Timeout,
			}),
			grpc.MaxConcurrentStreams(uint32(p.config.MaxConcurrentStreams)),
		}

		opts = append(opts, grpc.Creds(tcreds))
		opts = append(opts, serverOptions...)
	}

	opts = append(opts, p.opts...)

	// custom codec is required to bypass protobuf, common interceptor used for debug and stats
	return append(
		opts,
		grpc.UnaryInterceptor(p.interceptor),
		// grpc.CustomCodec(&codec{encoding.GetCodec(encCodec)}),
	), nil
}
|