grpc

what

gRPC 是一种远程过程调用(RPC)通信协议,使用 Protocol Buffers 作为结构化数据的序列化机制和通信消息格式

分成client server

  • client

客户端可以像调用本地对象一样,直接远程调用不同机器上 server 的方法,从而轻松地创建分布式应用程序和服务

  • server

服务器实现 .proto 中定义的服务接口,并运行 gRPC 服务器来处理客户端调用

install

1
2
3
# Install the Protocol Buffers toolchain (provides the protoc compiler) via Homebrew.
brew install protobuf

# Fetch or update the protoc-gen-go plugin that protoc uses to emit Go code.
go get -u github.com/golang/protobuf/protoc-gen-go

how

  • 定义*.proto消息结构体,服务
  • 运行 protoc --go_out=plugins=grpc:. *.proto,通过 protocol buffer 的编译器 protoc 以及一个特殊的 gRPC Go 插件,生成对应语言的协议接口(client / server 接口,以及请求、响应消息的序列化代码)

ssl

生成 SSL 证书

client(Go 与 Python 示例)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// main dials the gRPC server at address over TLS, sends a single
// UpdateSensorExpireTime request carrying one sample SensorExpire entry,
// and logs the code of the response.
func main() {
	pemBytes, err := ioutil.ReadFile("server.crt")
	if err != nil {
		log.Fatalf("read server.crt: %v", err)
	}

	cp := x509.NewCertPool()
	if !cp.AppendCertsFromPEM(pemBytes) {
		log.Fatal("server.crt contains no usable PEM certificate")
	}
	// NOTE(review): InsecureSkipVerify disables verification of the server
	// certificate, which makes RootCAs ineffective. It is kept here to
	// preserve the existing behavior; for production, drop it and make sure
	// the certificate matches the dialed host (or set ServerName).
	creds := credentials.NewTLS(&tls.Config{RootCAs: cp, InsecureSkipVerify: true})

	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(creds))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	c := NewInternalClient(conn)
	data := []*SensorExpire{
		{
			DeviceToken: []byte("fadfadftetwtwtwt"),
			ExpireTime:  uint32(1232312313),
		},
	}

	// Bound the RPC to one second so a stalled server cannot hang the client.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	r, err := c.UpdateSensorExpireTime(ctx, &SensorExpireTimeRequest{
		SensorExpire: data,
	})
	if err != nil {
		fmt.Print(err.Error())
		// Return (rather than fall through) so no response code is logged
		// for a failed call, and so the deferred cancel/Close still run.
		return
	}

	log.Print(r.GetCode())
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Channel credentials built from the root certificate exposed by load_cert.
credentials = grpc.ssl_channel_credentials(root_certificates=load_cert.ROOT_CERTIFICATE)


def update_sensor_expire(license_data: typing.List):
    """Send the expiry time of every licence entry to the Internal gRPC service.

    Each item of ``license_data`` is read through the keys ``'device_token'``
    and ``'expire_time'``. Nothing is sent when ``license_data`` is empty or
    None.
    """
    if not license_data:
        return

    # Override the TLS target name so it matches the certificate CN
    # (replace "localhost" with the real domain name).
    channel_options = (('grpc.ssl_target_name_override', "localhost"),)

    with grpc.secure_channel('localhost:9102', credentials, channel_options) as channel:
        client = sensor_pb2_grpc.InternalStub(channel)

        message = sensor_pb2.SensorExpireTimeRequest()
        message.sensorExpire.extend(
            sensor_pb2.SensorExpire(
                deviceToken=entry['device_token'],
                expireTime=entry['expire_time'],
            )
            for entry in license_data
        )
        # Every input entry must have produced exactly one request item.
        assert len(message.sensorExpire) == len(license_data)
        client.UpdateSensorExpireTime(message)


# Sample payload with a single licence entry, pushed to the server right away.
data = [dict(device_token="fadsfadfadfasfdtest".encode(), expire_time=1580962440)]
update_sensor_expire(data)

metadata header 过滤器

gRPC 服务端要获取 nginx 转发的 header 的 value 时,使用 metadata 从 context 中读取

1
2
3
4
5
6
7
8
9
10
11
12
13
# TLS-terminating HTTP/2 front end that proxies gRPC traffic to a local backend.
# port1 / port2 are placeholders for the real listen and upstream ports.
server {
        # Accept TLS connections with HTTP/2 enabled.
        listen port1 ssl http2;
        # Allow request bodies up to 50 MB.
        client_max_body_size 50M;

        # Certificate and private key presented to clients.
        ssl_certificate     /etc/nginx/ssl/tracer_server_cert;
        ssl_certificate_key /etc/nginx/ssl/tracer_server_key;

        location / {
                # Forward to the upstream gRPC server over TLS (grpcs scheme).
                grpc_pass grpcs://127.0.0.1:port2;
                # Pass the client address upstream in the X-Real-IP header.
                grpc_set_header X-Real-IP $remote_addr;
        }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// NewServer builds a Server from conf: it loads the TLS key pair from
// conf.Cert and conf.Key, creates the underlying gRPC server with the unary
// and stream authentication interceptors installed, and registers the
// Server as the TracerService implementation.
//
// On failure it returns a nil server together with the error, so callers
// never receive a partially initialized instance.
func NewServer(conf config.RPCConfigItemTemplate) (server *Server, err error) {
	creds, err := credentials.NewServerTLSFromFile(conf.Cert, conf.Key)
	if err != nil {
		return nil, err
	}

	server = &Server{
		config: conf,
	}
	// UnaryInterceptor guards unary RPCs; StreamInterceptor guards streaming RPCs.
	server.instance = grpc.NewServer(
		grpc.Creds(creds),
		grpc.UnaryInterceptor(authHeaderInterceptor),
		grpc.StreamInterceptor(streamAuthServerInterceptor),
	)
	tracerproto.RegisterTracerServiceServer(server.instance, server)

	return server, nil
}

拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// authHeaderInterceptor is the unary server interceptor. When checkInnerIP
// accepts the request context the call is passed on to handler; otherwise
// the caller address and method are logged and InnerMachineError is returned.
func authHeaderInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	if checkInnerIP(ctx) {
		return handler(ctx, req)
	}

	log.Warnf("%v %v", getRealAddr(ctx), info.FullMethod)
	return nil, errors.New(InnerMachineError)
}


// streamAuthServerInterceptor is the stream server interceptor. When
// checkInnerIP accepts the stream's context the call is passed on to
// handler; otherwise the caller address and method are logged and
// InnerMachineError is returned.
func streamAuthServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
	if checkInnerIP(ss.Context()) {
		return handler(srv, ss)
	}

	log.Warnf("%v %v", getRealAddr(ss.Context()), info.FullMethod)
	return errors.New(InnerMachineError)
}

// getRealAddr returns the first "x-real-ip" value found in the incoming gRPC
// metadata of ctx (the header set by nginx), or "" when the context carries
// no metadata or the header is absent.
func getRealAddr(ctx context.Context) string {
	if md, found := metadata.FromIncomingContext(ctx); found {
		// Header forwarded by nginx.
		if values := md.Get("x-real-ip"); len(values) > 0 {
			return values[0]
		}
	}
	return ""
}