GO的框架之gRPC(二)

服务端 核心数据结构 grpc 服务端领域,自上而下有着三个层次分明的结构:server->service->method 最高级别是 server,是对整个 grpc 服务端的抽象 一个 server 下可以注册挂载多个业务服务 service 一个 service 下存在多个业务处理方法 met

服务端

核心数据结构

grpc 服务端领域,自上而下有着三个层次分明的结构:server->service->method

  • 最高级别是 server,是对整个 grpc 服务端的抽象
  • 一个 server 下可以注册挂载多个业务服务 service
  • 一个 service 下存在多个业务处理方法 method

(1)server

type Server struct {
    // 配置项
    opts serverOptions
    // 互斥锁保证并发安全
    mu  sync.Mutex
    // tcp 端口监听器池
    lis map[net.Listener]bool
    // ...
    // 连接池
    conns    map[string]map[transport.ServerTransport]bool
    serve    bool
    cv       *sync.Cond
    // 业务服务映射管理
    services map[string]*serviceInfo // service name -> service info
    // ...
    serveWG            sync.WaitGroup
    // ...
} 

Server 类是对 grpc 服务端的代码实现,其中通过一个名为 services 的 map,记录了由服务名到具体业务服务模块的映射关系.

(2)serviceInfo

type serviceInfo struct {
    // 业务服务类
    serviceImpl interface{
    // 业务方法映射管理  
    methods     map[string]*MethodDesc
    // ...
}

serviceInfo 是某一个具体的业务服务模块,其中通过一个名为 methods 的 map 记录了由方法名到具体方法的映射关系.

(3)MethodDesc

type MethodDesc struct {
    MethodName string
    Handler    methodHandler
} 

MethodDesc 是对方法的封装,其中的字段 Handler 是真正的业务处理方法.

(4)methodHandler

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, **interceptor UnaryServerInterceptor**) (interface{}, error)

methodsHandler 是业务处理方法的类型,其中几个关键入参的含义分别是:

  • srv:业务处理方法从属的业务服务模块
  • dec:进行入参 req 反序列化的闭包函数
  • interceptor:业务处理方法外部包裹的拦截器方法

创建server

grpc.NewServer 方法中会创建 server 实例,并调用 chainUnaryServerInterceptors 方法,将一系列拦截器 interceptor 成链,并注入到 ServerOption 当中.

func NewServer(opt ...ServerOption) *Server {
    opts := defaultServerOptions
    for _, o := range extraServerOptions {
        o.apply(&opts)
    }
    for _, o := range opt {
        o.apply(&opts)
    }
    s := &Server{
        lis:      make(map[net.Listener]bool),
        opts:     opts,
        conns:    make(map[string]map[transport.ServerTransport]bool),
        services: make(map[string]*serviceInfo),
        quit:     grpcsync.NewEvent(),
        done:     grpcsync.NewEvent(),
        czData:   new(channelzData),
    }
    chainUnaryServerInterceptors(s)
    //...
    s.cv = sync.NewCond(&s.mu)
    // ...   
    return s
}

注册service

Untitled.png

创建好 grpc server 后,接下来通过使用桩代码中预生成好的 RegisterXXXServer 方法,业务处理服务 service 模块注入到 server 当中.

func RegisterHelloServiceServer(s grpc.ServiceRegistrar, srv HelloServiceServer) {
    s.RegisterService(&HelloService_ServiceDesc, srv)
}

注册过程会经历 RegisterHelloServiceServer->Server.RegisterService -> Server.register 的调用链路,把 service 的所有方法注册到 serviceInfo 的 methods map 当中,然后将 service 封装到 serviceInfo 实例中,注册到 server 的 services map 当中

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    // ...
    info := &serviceInfo{
        serviceImpl: ss,
        methods:     make(map[string]*MethodDesc),
        streams:     make(map[string]*StreamDesc),
        mdata:       sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        info.methods[d.MethodName] = d
    }
    // ...
    s.services[sd.ServiceName] = info
}

Server运行

Untitled.png

grpc server 运行的流程,核心是基于 for 循环实现的主动轮询模型,每轮会通过调用 net.Listener.Accept 方法,基于 IO 多路复用 epoll 方式,阻塞等待 grpc 请求的到达.

func (s *Server) Serve(lis net.Listener) error {
    // ...


    var tempDelay time.Duration // how long to sleep on accept failure
    for {
        rawConn, err := lis.Accept()
        if err != nil {
            // ...
        }
        // ...
        s.serveWG.Add(1)
        go func() {
            s.handleRawConn(lis.Addr().String(), rawConn)
            s.serveWG.Done()
        }()
    }
}

每当有新的连接到达后,服务端会开启一个 goroutine,调用对应的Server.handleRawConn 方法对请求进行处理.

请求处理

Untitled.png

在 Server.handleRawConn 方法中,会基于原始的 net.Conn 封装生成一个 HTTP2Transport,然后开启 goroutine 调用 Server.serveStream 方法处理请求.

接下来一连建立了 Server.serveStreams -> http2Server.HandleStreams -> http2Server.operateHeaders -> http2Server.handleStream -> Server.processUnaryRPC 的方法调用链:

  • 在 Server.handleStream 方法中,会拆解来自客户端的请求路径 ${service}/${method},通过"/" 前段得到 service 名称,通过 “/” 后端得到 method 名称,并分别映射到对应的业务服务和业务方法
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
    sm := stream.Method()
    // ...
    **pos := strings.LastIndex(sm, "/")
    
    service := sm[:pos]
    method := sm[pos+1:]**


    srv, knownService := s.services[service]
    if knownService {
        if md, ok := srv.methods[method]; ok {
            s.processUnaryRPC(t, stream, srv, md, trInfo)
            return
        }
        if sd, ok := srv.streams[method]; ok {
            s.processStreamingRPC(t, stream, srv, sd, trInfo)
            return
        }
    }
    // ...
}
  • 在 Server.processUnaryRPC 方法中,会通过 recvAndDecompress 读取到请求内容字节流,然后通过闭包函数 df 封装好反序列请求参数的逻辑,继而调用 md.Handler 方法处理请求,最终通过 Server.sendResponse 方法将响应结果进行返回
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
    // ...
    d, err := **recvAndDecompress**(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
    // ...
    df := func(v interface{}) error {
        if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
           // ...
        }
        // ...
    }
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
    reply, appErr := **md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)**
    // ...


    if err := **s.sendResponse**(t, stream, reply, cp, opts, comp); err != nil {
        // ...
    }
    // ...
}

以helloService 为例,客户端调用 SayHello 方法后,服务端对应的 md.Handler 正是 .proto 文件生成的位于 .grpc.pb.go 文件中的桩方法 _HelloService_SayHello_Handler.

在该桩方法内部中,包含的执行步骤如下:

  • 调用闭包函数 dec,将请求内容反序列化到请求入参 in 当中
  • 将业务处理方法 HelloServiceServer.SayHello 闭包封装到一个 UnaryHandler 当中
  • 调用 intercetor 方法,分别执行拦截器和 handler 的处理逻辑

拦截器

拦截器的作用,是在执行核心业务方法的前后,创造出一个统一的切片,来执行所有业务方法锁共有的通用逻辑. 此外,我们还能够通过这部分通用逻辑的执行结果,来判断是否需要熔断当前的执行链路,以起到所谓的”拦截“效果.

有关 grpc 拦截器的内容,其实和 gin 框架中的 handlersChain 是异曲同工的. 在我之前分享的文章 ”解析 Gin 框架底层原理“ 的第 5 章内容中有作详细介绍,大家不妨引用对比,以此来触类旁通,加深理解.

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error) 

其中几个入参的含义分别为:

  • req:业务处理方法的请求参数
  • info:当前所属的业务服务 service
  • handler:真正的业务处理方法

因此拦截器函数的使用模式应该是:

var myInterceptor1 = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
    // 前处理校验
    if err := preLogicCheck();err != nil{
       // 前处理校验不通过,则拦截,不调用业务方法直接返回
       return nil,err
    }

     // 前处理校验通过,正常调用业务方法
     resp, err = handle(ctx,req)
     if err != nil{
         return nil,err
     }

      // 后置处理校验
      if err := postLogicCheck();err != nil{
         // 后置处理校验不通过,则拦截结果,包装错误返回
         return nil,err
      }

      // 正常返回结果
      return resp,nil
}

拦截器链

Untitled.png

func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
        return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
    }
}


func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
    if curr == len(interceptors)-1 {
        return finalHandler
    }
    return func(ctx context.Context, req interface{}) (interface{}, error) {
        return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
    }
}

首先,chainUnaryInterceptors 方法会将一系列拦截器 interceptor 成链,并返回首枚interceptor 供 ServerOption 接收设置.(和gin中的handlerChain几乎一样)

其中,拦截器成链的关键在于 getChainUnaryHandler 方法中,其中会闭包调用拦截器数组的首枚拦截器函数,接下来依次用下一枚拦截器对业务方法 handler 进行包裹,封装成一个新的 ”handler“ 供当前拦截器使用.

Untitled.png

展示一下 grpc 拦截器链的实操例子.

  • 依次声明拦截器1 myInterceptor1 和 拦截器2 myInterceptor2,会在调用业务方法 handler 前后分别打印一行内容
  • 在创建 grpc server 时,将两个拦截器基于 option 注入
  • 通过客户端请求服务端,通过输出日志观察拦截器运行效果
var myInterceptor1 = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
    fmt.Printf("interceptor1 preprocess, req: %+v\n", req)
    resp, err = handler(ctx, req)
    fmt.Printf("interceptor1 postprocess, req: %+v\n", resp)
    return
}
 

var myInterceptor2 = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
    fmt.Printf("interceptor2 preprocess, req: %+v\n", req)
    resp, err = handler(ctx, req)
    fmt.Printf("interceptor2 postprocess, resp: %+v\n", resp)
    return
}


func (s *Server) SayHello(ctx context.Context, req *proto.HelloReq) (*proto.HelloResp, error) {
    fmt.Println("core handle logic......")
    return &proto.HelloResp{
        Reply: fmt.Sprintf("hello name: %s", req.Name),
    }, nil
}


func main() {
    listener, err := net.Listen("tcp", ":8093")
    if err != nil {
        panic(err)
    }


    server := grpc.NewServer(grpc.ChainUnaryInterceptor(myInterceptor1, myInterceptor2))
    proto.RegisterHelloServiceServer(server, &Server{})


    if err := server.Serve(listener); err != nil {
        panic(err)
    }
}

Untitled.png

grpc客户端

Untitled.png

首先给出 grpc-go 启动客户端的代码示例,核心内容分三块:

  • 调用 grpc.Dial 方法,指定服务端 target,创建 grpc 连接代理对象 ClientConn
  • 调用 proto.NewHelloServiceClient 方法,基于 pb 桩代码构造客户端实例
  • 调用 client.SayHello 方法,真正发起 grpc 请求
package main


import (
    "context"
    "fmt"

    "github.com/grpc_demo/proto"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    conn, err := grpc.Dial("localhost:8093", grpc.WithTransportCredentials(insecure.NewCredentials()))
    // ...
    defer conn.Close()

    client := proto.NewHelloServiceClient(conn)

    resp, err := client.SayHello(context.Background(), &proto.HelloReq{
        Name: "xiaoxuxiansheng",
    })
    // ...


    fmt.Printf("resp: %+v", resp)
}

核心数据结构

Untitled.png

ClientConn

ClientConn 是广义上的 grpc 连接代理对象,和 grpc 客户端是一对一的关系,内部包含了一个连接池,根据配置可能同时管理多笔连接. 其中几个核心字段包括:

  • target/parsedTarget:对服务端地址信息的封装
  • balancerWrapper:负载均衡器. 初始化时会启动一个守护协程,动态地对 ClientConn 及 Subconn 的状态进行刷新
  • blockingpicker:连接选择器. 在发送请求时,由其最终挑选出使用的 Subconn
  • resolverWrapper:解析器. 负责根据不同的 schema,通过 target 解析出服务端的实际地址
type ClientConn struct {
    // 连接上下文
    ctx    context.Context   
    // 上下文终止控制器
    cancel context.CancelFunc 


    // 连接的目标地址
    target          string               
    // 对连接目标地址的封装
    parsedTarget    resolver.Target    
    // ...
    // 连接配置项
    dopts           dialOptions          
    // 负载均衡器,底层基于 gracefulswitch.balancer
    balancerWrapper *ccBalancerWrapper  


    // 连接状态管理器
    csMgr              *connectivityStateManager
    // 连接选择器
    blockingpicker     *pickerWrapper
    // ...
    // 读写互斥锁
    mu              sync.RWMutex
    // 解析器
    resolverWrapper *ccResolverWrapper         
    // 连接池
    conns           map[*addrConn]struct{}     // Set to nil on close.
    // ...
}

ccBalancerWrapper

ccBalancerWrapper是在负载均衡器Balancer的基础上做的封装. 在ccBalancerWrapper被初始化时,会开启一个守护协程,通过监听 updateCh 中到达的事件,对 ClientConn 和 Subconn 的状态进行刷新.

type ccBalancerWrapper struct {
    // 指向所属的 clientConn
    cc *ClientConn


    // 负载均衡器
    balancer        *gracefulswitch.Balancer
    curBalancerName string
  
    // 接收更新事件的 chan
    updateCh *buffer.Unbounded 
    // 接收处理结果的 chan
    resultCh *buffer.Unbounded 
    // ...
}

ccBalancerWrapper 的核心是一个负载均衡器 Balancer 接口,其中包含了几个核心方法:

  • UpdateClientConnState:更新 ClientConn 的连接状态
  • ResolverError:错误后处理
  • UpdateSubConnState:更新子连接 Subconn 状态
  • Close:关闭负载均衡器
type Balancer interface {
    UpdateClientConnState(ClientConnState) error
    ResolverError(error)
    UpdateSubConnState(SubConn, SubConnState)
    Close()
}

在默认情况下,grpc客户端框架会为我们提供一个默认的负载均衡器 pickfirstBalancer:

type pickfirstBalancer struct {
    state   connectivity.State
    cc      balancer.ClientConn
    subConn balancer.SubConn
}

ccResolverWrapper

ccResolverWrapper的核心是内置的解析器 resolver.

type ccResolverWrapper struct {
    // 指向所属的 clientConn
    cc         *ClientConn
    resolverMu sync.Mutex
    // 核心成员:内置的解析器
    resolver   resolver.Resolver
    // ...
}

resolver通过Builder构造,对应的Buidler是一个interface,用户也可以提供自己的实现版本:

type Builder interface {
    // 构造解析器 resolver
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    // Scheme returns the scheme supported by this resolver.
    // Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
    Scheme() string
}

resolver本身是一个接口,核心的方法是 ResolveNow:通过 target 解析出实际的客户端地址.

type Resolver interface {
    // 解析 target
    ResolveNow(ResolveNowOptions)
    // 关闭 resolver
    Close()
}

ClientStream

在grpc客户端客户端发起请求时,会首先创建出一个 ClientStream,并依赖其核心方法 SendMsg 和 RecvMsg 进行请求的发送和响应的接受.

type ClientStream interface {
    // 获取元数据
    Header() (metadata.MD, error)
    // 获取上下文
    Context() context.Context
    // 发送消息
    SendMsg(m interface{}) error
    // 接收消息
    RecvMsg(m interface{}) error
}

grpc.Dial

Untitled.png

在通过 DialContext 创建 grpc 连接代理 ClientConn 时,核心步骤包括:

  • 创建 ClientConn 实例
  • 调用 ClientConn.parseTargetAndFindResolver 方法,通过 target 中的 schema 获取到对应的解析器构造器 resolverBuilder
  • 调用 newCCBalancerWrapper 方法构造出负载均衡器封装对象 ccBalancerWrapper,在内部会开启守护协程感知和处理 ClientConn 和 Subconn 状态变更的事件
  • 调用 newCCResolverWrapper 方法,内部会调用 resolverBuilder 构造并启动 resolver 实例,同时会通过 ccBalancerWrapper 方法对 ClientConn 的状态进行更新

newCCBalancerWrapper

func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
    ccb := &ccBalancerWrapper{
        cc:       cc,
        updateCh: buffer.NewUnbounded(),
        resultCh: buffer.NewUnbounded(),
        closed:   grpcsync.NewEvent(),
        done:     grpcsync.NewEvent(),
    }
    go ccb.watcher()
    ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
    return ccb
}

Untitled.png

newCCBalancerWrapper 方法构造了 ccBalancer 实例,然后调用 ccBalancerWrapper.watcher 方法开启守护协程,分别监听 ClientConn 状态变更(ccStateUpdate)、Subconn 状态变更(scStateUpdate)、设定负载均衡器 balancer(switchToUpdate)、连接移除(subConnUpdate)等事件,并分别进行处理.

在 newCCBalancerWrapper 方法中,还调用了 gracefulswitch.NewBalancer 构造了内置负载均衡器的外壳,但真正的负载均衡器 Balancer 此时还未注入,注入实际会在 3.4 小节,resovler 启动的链路当中.

newCCResolverWrapper

newCCResolverWrapper 方法构造了 ccResolverWrapper 实例,但真正的核心逻辑是根据传入的 resolverBuilder 构造器出对应的 resolver 然后注入到 ccResolverWrapper 当中.

grpc-go 客户端默认的 resovlerBuilder 和 resolver 是 passthrough 类型,下面我们再一次展开

LICENSED UNDER CC BY-NC-SA 4.0
Comment