GRPC (2): 四种通信模式‍

作者: adm 分类: go 发布时间: 2023-09-25

上篇文章我们介绍了gRPC 的基本概念,今天实现一个完整的 gRPC 服务,包括 proto 文件的定义,客户端和服务端代码的生成以及业务逻辑代码的补充。

GRPC 四种通信模式‍
普通模式(unary RPC)

假如我们需要构建一个订单管理系统,这个系统为用户提供了订单查询的接口,每次用户输入订单号,便会返回对应的订单信息。每个请求独立,且响应和请求一一对应,这就是简单的 RPC 模式,对于大多数业务场景均可以适用。

service MysqlService {

rpc SelectRecord(MysqlReq) returns (MysqlRes) {};

}
服务端流模式(server-streaming RPC)

假如我们要创建一个订单的信息的缓存池,实现订单的高效查询,缓存服务启动的时候就需要请求服务端,同步服务端的订单信息。这种情况下,我们只需要请求一次,服务端便会持续的为我们提供同步订单信息,这就是服务端流 RPC 的场景。

和简单的 RPC 模式不同,服务端流 RPC 模式,客户端发起一个 RPC 请求,服务端会返回一系列的响应结果,发送完所有的响应后,服务端在流结尾标记服务状态详情作为结束的元数据。

service MysqlService {

rpc DeleteRecord(MysqlReq) returns (stream MysqlRes) {}

}
客户端流模式(client-streaming RPC)

假设以下场景,有一个服务每天需要定时和订单管理系统更新订单的最终状态,服务端只需要在最后告诉该服务,最终的处理结果(哪些更新成功,哪些失败)。不需要频繁和服务端建立和断开连接。从而可以降低服务端的并发节省连接资源。

与服务端流模式类似,客户端流会发送连续的更新请求给服务端,服务端在收到请求后不会立马给到客户端响应结果,直到请求结束了,才会返回一个单独的响应。

service MysqlService {

rpc InsertRecord(stream MysqlReq) returns (MysqlRes) {};

}
双向流模式(bidirectional-streaming RPC)

同样以上面的订单更新的服务为例子,每天定时向管理系统推送一批订单进行更新,假如这个批订单有上千万的量,而且订单更新服务之后还需要和其他系统进行数据同步,这个时候使用客户端流模式显然不是那么合适,我们可以采用双向流模式,不断地推送订单请求给管理系统,服务端在收到请求,处理完立马返回处理结果给客户端,直到所有的请求处理结束。这种场景就是双向流模式。

service MysqlService {

rpc UpdateRecord(stream MysqlReq) returns (stream MysqlRes) {};

}
一个简单的 GRPC 服务实现
这个服务为客户端提供 MYSQL 增删改查的 gRPC 接口。

项目目录结构:

_01_grpc_demo:
-> conf # 数据库连接配置
-> cmd # 测试样例
-> pb # proto && pb.go 文件
-> client.go # grpc 客户端代码
-> server.go # grpc 服务端代码
-> Makefile # 命令脚本
代码实现
mysql.proto

syntax = “proto3”;

package _01_grpc_demo;

option go_package = “./;pb”;

message MysqlReq {
string sql = 1;
}

message MysqlRes {
uint32 code = 1;
string msg = 2;
string data = 3;
}

service MysqlService {
rpc SelectRecord(MysqlReq) returns (MysqlRes) {};
rpc InsertRecord(stream MysqlReq) returns (MysqlRes) {};
rpc DeleteRecord(MysqlReq) returns (stream MysqlRes) {}
rpc UpdateRecord(stream MysqlReq) returns (stream MysqlRes) {};
}
server.go

package _01_grpc_demo

import (
“context”
“encoding/json”
“errors”
“fmt”
“io”
“log”

“_01_grpc_demo/conf”
“_01_grpc_demo/pb”
)

type MysqlServer struct {}

func NewMysqlServer() *MysqlServer {
return &MysqlServer{}
}

func (*MysqlServer) SelectRecord(ctx context.Context, req *pb.MysqlReq) (*pb.MysqlRes, error) {

sql := req.GetSql()
log.Printf(“receives a select record request with sql : %s”, sql)

// test sleep
// time.Sleep(time.Second * 5)

if err := ctx.Err(); err != nil {
return nil, err
}

users, res := make([]*conf.User, 0), &pb.MysqlRes{}
if err := conf.DB.Raw(sql).Scan(&users).Error; err != nil {
return nil, err
}

rows, err := json.Marshal(users)
if err != nil {
return nil, err
}

res.Code = 200
res.Msg = “select success”
res.Data = string(rows)

return res, nil
}

func (*MysqlServer) InsertRecord(stream pb.MysqlService_InsertRecordServer) error {

var sql string
for {
req, err := stream.Recv()
if err == io.EOF || req == nil{
log.Printf(“read ends”)
break
}

if len(sql) == 0 {
sql = req.Sql
} else {
sql = fmt.Sprintf(“%s;%s”, sql, req.Sql)
}
}

log.Printf(“receives a insert record request with sql : %s”, sql)

// test sleep
// time.Sleep(time.Second * 5)

res := &pb.MysqlRes{
Code: 200,
Msg: fmt.Sprintf(“insert records sql [%s] succ”, sql),
Data: _empty,
}

if err := conf.DB.Exec(sql).Error; err != nil {
res.Code = 500
res.Msg = fmt.Sprintf(“exec %s err [%s]”, sql, err)
return err
}

// send res
if err := stream.SendAndClose(res); err != nil {
errs := fmt.Sprintf(“send res [%s] err [%s]”, res.Msg, err)
return errors.New(errs)
}

return nil
}

func (*MysqlServer) DeleteRecord(
req *pb.MysqlReq,
stream pb.MysqlService_DeleteRecordServer,
) error {

sql := req.GetSql()
log.Printf(“receives a delete record request with sql : %s”, sql)

// test sleep
// time.Sleep(time.Second * 5)

res := &pb.MysqlRes{
Code: 200,
Msg: fmt.Sprintf(“delete records sql [%s] succ”, sql),
Data: _empty,
}

if err := conf.DB.Exec(sql).Error; err != nil {
res.Code = 500
res.Msg = fmt.Sprintf(“exec %s err [%s]”, sql, err)
}

// send res
if err := stream.Send(res); err != nil {
resBytes, _ := json.Marshal(res)
return fmt.Errorf(“send res [%s] to stream err [%s]”, string(resBytes), err)
}

return nil
}

func (*MysqlServer) UpdateRecord(stream pb.MysqlService_UpdateRecordServer) error {
var sql string
for {
req, err := stream.Recv()
if err == io.EOF || req == nil{
log.Printf(“read ends\n”)
break
}

if len(sql) == 0 {
sql = req.Sql
} else {
sql = fmt.Sprintf(“%s;%s”, sql, req.Sql)
}
}

log.Printf(“receives a update record request with sql : %s\n”, sql)

res := &pb.MysqlRes{
Code: 200,
Msg: fmt.Sprintf(“update records sql [%s] succ”, sql),
Data: _empty,
}

if err := conf.DB.Exec(sql).Error; err != nil {
res.Code = 500
res.Msg = fmt.Sprintf(“exec %s err [%s]”, sql, err)
}

if err := stream.Send(res); err != nil {
resBytes, _ := json.Marshal(res)
return fmt.Errorf(“send res [%s] to stream err [%s]”, string(resBytes), err)
}

return nil
}
client.go

package _01_grpc_demo

import (
“context”
“encoding/json”
“fmt”
“io”
“log”
“time”

“google.golang.org/grpc”

“_01_grpc_demo/pb”
)

const (
_empty = “”
)

type MysqlClient struct {
service pb.MysqlServiceClient
}

func NewMysqlClient(cc *grpc.ClientConn) *MysqlClient {
service := pb.NewMysqlServiceClient(cc)

return &MysqlClient{service}
}

func (mysqlClient *MysqlClient) SelectRecord(sql string) string {
req := &pb.MysqlReq{
Sql: sql,
}

// set timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

res, err := mysqlClient.service.SelectRecord(ctx, req)
if err != nil {
log.Fatal(“select record err “, err)
return _empty
}

return fmt.Sprintf(“select record success %s”, res.GetData())
}

func (mysqlClient *MysqlClient) InsertRecord(sql string) string {

req := &pb.MysqlReq{
Sql: sql,
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

stream, err := mysqlClient.service.InsertRecord(ctx)
if err != nil {
log.Fatal(“insert record err “, err)
return _empty
}

if err = stream.Send(req); err != nil {
log.Fatal(“send req err “, err)
return _empty
}

res, err := stream.CloseAndRecv()
if err != nil {
log.Fatal(“receive res err “, err)
return _empty
}

resBytes, _ := json.Marshal(res)
return string(resBytes)
}

func (mysqlClient *MysqlClient) DeleteRecord(sql string) string {

req := &pb.MysqlReq{
Sql: sql,
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

stream, err := mysqlClient.service.DeleteRecord(ctx, req)
if err != nil {
log.Fatal(“delete record err “, err)
return _empty
}

resBytes := make([]byte, 0)
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(“cannot receive response: “, err)
return _empty
}

resBytes, _ = json.Marshal(res)
}

return string(resBytes)
}

func (mysqlClient *MysqlClient) UpdateRecord(sql string) string {

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

stream, err := mysqlClient.service.UpdateRecord(ctx)
if err != nil {
log.Fatal(“update record err “, err)
return _empty
}

waitResponse := make(chan error)
// go routine to receive responses
go func() {
for {
res, err := stream.Recv()
if err == io.EOF {
log.Print(“no more responses”)
waitResponse <- nil return } if err != nil { waitResponse <- fmt.Errorf("cannot receive stream response: %v", err) return } log.Print("received response: ", res) } }() req := &pb.MysqlReq{ Sql: sql, } err = stream.Send(req) if err != nil { return fmt.Sprintf("send stream err: %s", err) } err = stream.CloseSend() if err != nil { return fmt.Sprintf("cannot close send: %s", err) } err = <-waitResponse return fmt.Sprintf("%s0000", err) } makeFile clean: rm pb/*.go gen: protoc --plugin=protoc-gen-go=/d/workspace/golang/bin/protoc-gen-go.exe --go_out=plugins=grpc:pb --proto_path=pb pb/*.proto server: go run cmd/server/main.go -port 8080 client: go run cmd/client/main.go -address 0.0.0.0:8080

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!