GRPC (4) : 名字解析器

作者: adm 分类: go 发布时间: 2023-09-27

上一章学习了 gRPC 截止时间,多路复用和元数据等特性,今天学习名字解析器j及其实现原理。

名字解析器(Name Resolver)
名字解析器用作将给定的服务名称解析为对应的后端 IP 地址和端口号,gRPC 中默认使用的是 passthrough 解析器,即没有指定 scheme 的时候会默认使用它作为解析器。此外,gRPC还支持通过接口的方式,自定义名字解析器,详见后面的 demo。

名字解析器的使用

服务端
package main

import (
“context”
“fmt”
“log”
“net”

pb “github.com/unendlichkeiten/private_projects/pb”
“google.golang.org/grpc”
)

const addr = “localhost:50051”

type ecServer struct {
pb.UnimplementedEchoServer
addr string
}

func (s *ecServer) UnaryEcho(
ctx context.Context,
req *pb.EchoRequest) (*pb.EchoResponse, error) {

return &pb.EchoResponse{
Message: fmt.Sprintf(“%s (from %s)”, req.Message,
s.addr)}, nil
}

func main() {
lis, err := net.Listen(“tcp”, addr)
if err != nil {
log.Fatalf(“failed to listen: %v”, err)
}
s := grpc.NewServer()
pb.RegisterEchoServer(s, &ecServer{addr: addr})
log.Printf(“serving on %s\n”, addr)
if err := s.Serve(lis); err != nil {
log.Fatalf(“failed to serve: %v”, err)
}
}
客户端
客户端建立连接是使用自己定义的 scheme,需要自己实现 scheme 对应的 resolver 和 resolverBuilder

package main

import (
“context”
“fmt”
“log”
“time”

“google.golang.org/grpc”
“google.golang.org/grpc/resolver”

pb “github.com/unendlichkeiten/private_projects/pb”
)

const (
myScheme = “custom”
myServiceName = “resolver.custom.hamming.com”

backendAddr = “localhost:50051”
)

func callUnaryEcho(c pb.EchoClient, message string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: message})
if err != nil {
log.Fatalf(“could not greet: %v”, err)
}
fmt.Println(r.Message)
}

func makeRPCs(cc *grpc.ClientConn, n int) {
hwc := pb.NewEchoClient(cc)
for i := 0; i < n; i++ { callUnaryEcho(hwc, "this is examples/name_resolving") } } func main() { passthroughConn, err := grpc.Dial( // passthrough 是 gRPC 内置的一个 scheme // Dial to "passthrough:///localhost:50051" fmt.Sprintf("passthrough:///%s", backendAddr), grpc.WithInsecure(), grpc.WithBlock(), ) if err != nil { log.Fatalf("did not connect: %v", err) } defer passthroughConn.Close() fmt.Printf("calling SayHello to \"passthrough:///%s\"\n", backendAddr) makeRPCs(passthroughConn, 10) fmt.Println() ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() exampleConn, err := grpc.DialContext( ctx, // 使用自定义的名字解析器 // Dial to "custom:///resolver.custom.hamming.com" fmt.Sprintf("%s:///%s", myScheme, myServiceName), grpc.WithInsecure(), grpc.WithBlock(), ) if err != nil { log.Fatalf("did not connect: %v", err) } defer exampleConn.Close() fmt.Printf("calling SayHello to \"%s:///%s\"\n", myScheme, myServiceName) makeRPCs(exampleConn, 10) } // resolver 的实现 // Following is an example name resolver. It includes a // ResolverBuilder(https://godoc.org/google.golang.org/grpc/resolver#Builder) // and a Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). // // A ResolverBuilder is registered for a scheme (in this example, "example" is // the scheme). When a ClientConn is created for this scheme, the // ResolverBuilder will be picked to build a Resolver. Note that a new Resolver // is built for each ClientConn. The Resolver will watch the updates for the // target, and send updates to the ClientConn. // customResolverBuilder is a // ResolverBuilder(https://godoc.org/google.golang.org/grpc/resolver#Builder). type customResolverBuilder struct{} // Build 构建解析器 func (*customResolverBuilder) Build( target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { r := &customResolver{ target: target, cc: cc, addrsStore: map[string][]string{ myServiceName: {backendAddr}, }, } r.ResolveNow(resolver.ResolveNowOptions{}) return r, nil } // Scheme 返回 customResolverBuilder 对应的 scheme func (*customResolverBuilder) Scheme() string { return myScheme } // customResolver is a // Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). type customResolver struct { target resolver.Target cc resolver.ClientConn addrsStore map[string][]string } func (r *customResolver) ResolveNow(o resolver.ResolveNowOptions) { // 直接从map中取出对于的addrList addrStrs := r.addrsStore[r.target.Endpoint] addrs := make([]resolver.Address, len(addrStrs)) for i, s := range addrStrs { addrs[i] = resolver.Address{Addr: s} } r.cc.UpdateState(resolver.State{Addresses: addrs}) } func (*customResolver) Close() {} func init() { // Register the example ResolverBuilder. This is usually done in a package's // init() function. resolver.Register(&customResolverBuilder{}) } 运行结果 服务端 $ go run main.go 2022/07/29 19:49:56 serving on localhost:50051 客户端 $ go run main.go calling SayHello to "passthrough:///localhost:50051" this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) calling SayHello to "custom:///resolver.custom.hamming.com" this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) this is examples/name_resolving (from localhost:50051) 一切正常,说明我们的自定义 Resolver 是可以运行的,那么接下来从源码层面来分析一下 gRPC 中 Resolver 具体是如何工作的。 resolver 包括 ResolverBuilder 和 Resolver 两个部分,需要实现 Builder 和 Resolver 两个接口,即上面自定义的 customResolverBuilder 和 customResolver。 // resolver.go // Builder creates a resolver that will be used to watch name resolution updates. type Builder interface { // Build creates a new resolver for the given target. // // gRPC dial calls Build synchronously, and fails if the returned error is // not nil. Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) // Scheme returns the scheme supported by this resolver. // Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md. Scheme() string } // Resolver watches for the updates on the specified target. // Updates include address updates and service config updates. type Resolver interface { // ResolveNow will be called by gRPC to try to resolve the target name // again. It's just a hint, resolver can ignore this if it's not necessary. // // It could be called multiple times concurrently. ResolveNow(ResolveNowOptions) // Close closes the resolver. Close() } Resolver 是整个功能最核心的代码,用于将服务名解析成对应实例。Builder 则采用 Builder 模式在包初始化时创建并注册构造自定义 Resolver 实例。当客户端通过 Dial 方法对指定服务进行拨号时,grpc resolver 查找注册的 Builder 实例调用其 Build() 方法构建自定义 Resolver。 源码分析(基于 grpc-go v1.36.0) import ( _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin. _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver. _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver. _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver. ) // clientconn.go +103 // 客户端调用 grpc.Dial() 方法建立连接, 进入 DialContext() 方法 // Dial creates a client connection to the given target. func Dial(target string, opts ...DialOption) (*ClientConn, error) { return DialContext(context.Background(), target, opts...) } 阅读DialContext() 方法中 resolver 解析和构建部分逻辑 // clientconn.go +249 // 解析 target 确定要使用的解析器 // grpc 内部支持 passthrough, dns 和 unix 类型 cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil) channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme) // 根据上面解析的 scheme 到列表中找到对应的 reslverBuilder resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme) if resolverBuilder == nil { // 如果指定的 scheme 找不到对应的 resolverBuilder,则使用默认的 defaultScheme // 默认使用 passthrough,直接从根据 target 获取对应的 endpoint 地址 channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme) cc.parsedTarget = resolver.Target{ Scheme: resolver.GetDefaultScheme(), Endpoint: target, } // *********** 阶段一 获取 builder *********** resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme) if resolverBuilder == nil { return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme) } } // ...... 这里省略不关心的代码 ...... // Build the resolver. 创建一个解析器 // *********** 阶段二 获取 Resolver *********** rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) if err != nil { return nil, fmt.Errorf("failed to build resolver: %v", err) } 至此我们就拿到了指定 scheme 的 resolver,继续阅读里面的代码发现,上面通过调用以下方法得到对应的 resolver。 阶段一 获取 Builder 分析 // clientconn.go +1586 // 根据解析得到的 scheme 获取对应的 resolverBuilder func (cc *ClientConn) getResolver(scheme string) resolver.Builder { for _, rb := range cc.dopts.resolvers { if scheme == rb.Scheme() { return rb } } return resolver.Get(scheme) } // resolver.go +51 // Get returns the resolver builder registered with the given scheme. // If no builder is register with the scheme, nil will be returned. func Get(scheme string) Builder { if b, ok := m[scheme]; ok { return b } return nil } 源码中可以看到,builder 的获取实际上是从 m 中拿到的,m 中的 builder 又是从哪里来的?返回最开始的代码片段,我们看到有 resolver 有引用4个包 import ( _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin. _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver. _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver. _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver. ) 这四个包都有一个 init() 函数,里面调用了 resolver.Register() 方法,将对应的 builder 注册到 m 中的 map 中,自定义的解析器同样通过初始化函数将 builder 注册到 m 中。 // passthrough.go +55 func init() { resolver.Register(&passthroughBuilder{}) } 阶段二 获取 Resolver 分析 // clientconn.go +313 // 根据 resovlerBuilder 创建解析器 rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) if err != nil { return nil, fmt.Errorf("failed to build resolver: %v", err) } // resolver_conn_wrapper +74 // newCCResolverWrapper 调用定义的 Build 方法创建 Resolver ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo) if err != nil { return nil, err } 继续阅读 Build() 方法里面的代码,我们看到,里面会调用 resolveNow(), 进一步调用 UpdateState() 来更新客户端连接状态,至于如何更新这里不做阐述,后面有时间探讨。 func (r *customResolver) ResolveNow(o resolver.ResolveNowOptions) { // 直接从map中取出对于的addrList addrStrs := r.addrsStore[r.target.Endpoint] addrs := make([]resolver.Address, len(addrStrs)) for i, s := range addrStrs { addrs[i] = resolver.Address{Addr: s} } r.cc.UpdateState(resolver.State{Addresses: addrs}) } 总结 客户端启动时,注册自定义的 resolver,通过 init() 将字对应的 resolverBuilder 注册到全局变量 map 中,还有 gRPC 内置的 resolverBuilder。 客户端调用 Dail() 方法构造连接对象 grpc.ClientConn grpcutil.ParseTarget 获取对应的 scheme 根据 scheme 拿到对应的 resolverBuilder(全局 map 中遍历得到) 根据 resolverBuilder 拿到对应的 resolver (build() 方法中调用 resolveNow() 完成名字到IP的解析 )

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!