gRPC服务注册发现及负载均衡的实现方案与源码解析[通俗易懂]

gRPC服务注册发现及负载均衡的实现方案与源码解析[通俗易懂]今天聊一下gRPC的服务发现和负载均衡原理相关的话题,不同于Nginx、Lvs或者F5这些服务端的负载均衡策略,gRPC采用的是客户端实现的负载

欢迎大家来到IT世界,在知识的湖畔探索吧!

今天聊一下gRPC的服务发现和负载均衡原理相关的话题,不同于NginxLvs或者F5这些服务端的负载均衡策略,gRPC采用的是客户端实现的负载均衡。什么意思呢,对于使用服务端负载均衡的系统,客户端会首先访问负载均衡的域名/IP,再由负载均衡按照策略分发请求到后端具体某个服务节点上。而对于客户端的负载均衡则是,客户端从可用的后端服务节点列表中根据自己的负载均衡策略选择一个节点直连后端服务器。

Etcd软件包的naming组件里提供了一个命名解析器(naming resolver)结合gRPC本身自带的RoundRobin 轮询调度负载均衡器,让使用者能方便地搭建起一套服务注册/发现和负载均衡体系。如果轮询调度满足不了调度需求或者不想使用Etcd作为服务的注册中心和命名解析器的话,可以通过写代码实现gRPC定义的ResolverBalancer接口来满足系统的自定义需求。

本文引用的源码对应的版本为:gRPC v1.2.x、 Etcd v3.3
如果你对gRPC和Etcd还不了解,可以先看看我很早之前写的gRPC入门和Etcd入门 系列的文章。

gRPC服务注册发现

先来简单的说明一下用Etcd实现服务注册和发现的原理。服务注册和发现这个流程可以用下面这个示意图简单描述出来:

gRPC服务注册发现及负载均衡的实现方案与源码解析[通俗易懂]

上图的服务A包含了两个节点,服务在节点上启动后,会以包含服务名加节点IP的唯一标识作为Key(比如/service/a/114.128.45.117),服务节点IP和端口信息作为值存储到Etcd上。这些Key都是带租约的Key,需要我们的服务自己去定期续租,一旦服务节点本身宕掉,比如node2上的服务宕掉,无法完成续租后,那么它对应的Key:/service/a/114.128.45.117 就会过期,客户端也就无法再从Etcd上获取到这个服务节点的信息了。

与此同时客户端也会利用Etcd的Watch功能监听以/servive/a为前缀的所有Key的变化,如果有新增或者删除节点Key的事件发生Etcd都会通过WatchChan发送给客户端,WatchChan在编程语言上的实现就是Go的Channel。

服务注册

关于Etcd的服务注册,官方提供的软件包里并没有提供统一的注册函数供调用。那么我们在新增服务节点后怎么把节点的信息存储到Etcd上并通知给命名解析器呢?在Etcd源码包的naming/grpc.go里可以发现提供了一个Update方法,这个Update既能执行添加也能执行删除操作:

func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts …etcd.OpOption) (err error) {

switch nm.Op {

case naming.Add:

var v []byte

if v, err = json.Marshal(nm); err != nil {

return status.Error(codes.InvalidArgument, err.Error())

}

_, err = gr.Client.KV.Put(ctx, target+”/”+nm.Addr, string(v), opts…)

case naming.Delete:

_, err = gr.Client.Delete(ctx, target+”/”+nm.Addr, opts…)

default:

return status.Error(codes.InvalidArgument, “naming: bad naming op”)

}

return err

}

服务在启动完成后可以通过Update方法把自己的服务地址和端口Put到自定义的target为前缀的key里,针对上面图示里的例子,变量target就应该是我们定义的服务名/service/a。一般在具体实践里都是自己根据系统的需求封装Update方法完成服务注册,以及服务节点Key在Etcd上的定期续租,这块每个公司的实践都不一样,我就不放具体的代码了,一般续租都是通过Etcd租约里的KeepAlive方法实现的(Lease.KeepAlive)。

服务发现

在注册完新节点、或者是原来的节点停掉后,客户端是怎么知道的呢?这块就需要命名解析器Resolver来帮助实现了,Resolver的作用可以理解为从一个字符串映射到一组IP端口等信息。

gRPC对Resolver的接口定义如下:

type Resolver interface {

// Resolve creates a Watcher for target.

Resolve(target string) (Watcher, error)

}

命名解析器的Resolve方法会返回一个Watcher,这个Watcher可以监听命名解析器发来的target(类似上面例子里说的与服务名相对应的Key)对应的后端服务器地址信息变化,通知Balancer对自己维护的地址进行动态地增删。

Watcher接口的定义如下:

//源码地址 https://github.com/grpc/grpc-go/blob/v1.2.x/naming/naming.go

type Watcher interface {

Next() ([]*Update, error)

// Close closes the Watcher.

Close()

}

Etcd为这两个接口都提供了实现:

// 源码地址:https://github.com/etcd-io/etcd/blob/release-3.3/clientv3/naming/grpc.go

// GRPCResolver 实现了grpc的naming.Resolver接口

type GRPCResolver struct {

// Client is an initialized etcd client.

Client *etcd.Client

}

func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {

ctx, cancel := context.WithCancel(context.Background())

w := &gRPCWatcher{c: gr.Client, target: target + “/”, ctx: ctx, cancel: cancel}

return w, nil

}

// 实现了grpc的naming.Watcher接口

type gRPCWatcher struct {

c *etcd.Client

target string

ctx context.Context

cancel context.CancelFunc

wch etcd.WatchChan

err error

}

func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {

if gw.wch == nil {

// first Next() returns all addresses

return gw.firstNext()

}

// process new events on target/*

wr, ok := <-gw.wch

if !ok {

updates := make([]*naming.Update, 0, len(wr.Events))

for _, e := range wr.Events {

var jupdate naming.Update

var err error

switch e.Type {

case etcd.EventTypePut:

err = json.Unmarshal(e.Kv.Value, &jupdate)

jupdate.Op = naming.Add

case etcd.EventTypeDelete:

err = json.Unmarshal(e.PrevKv.Value, &jupdate)

jupdate.Op = naming.Delete

default:

continue

}

if err == nil {

updates = append(updates, &jupdate)

}

}

return updates, nil

}

func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {

// 获取前缀为gw.target的所有Key的值,放到现有数组里

resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())

if gw.err = err; err != nil {

return nil, err

}

updates := make([]*naming.Update, 0, len(resp.Kvs))

for _, kv := range resp.Kvs {

var jupdate naming.Update

if err := json.Unmarshal(kv.Value, &jupdate); err != nil {

continue

}

updates = append(updates, &jupdate)

}

opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}

// watch 监听这些Key的变化,包括前缀相同的新Key的加入

gw.wch = gw.c.Watch(gw.ctx, gw.target, opts…)

return updates, nil

}

func (gw *gRPCWatcher) Close() { gw.cancel() }

这部分GRPCResolver和gRPCWatcher类型的每个方法的功能和起到的作用都和RoundRobin这个gRPC Balancer结合地比较紧密,我准备放到下面和负载均衡的源码实现一起说明。

负载均衡

首先我们来看一下gRPC对负载均衡的接口定义:

type Balancer interface {

Start(target string, config BalancerConfig) error

Up(addr Address) (down func(error))

Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)

Notify() <-chan []Address

// Close shuts down the balancer.

Close() error

}

在gRPC 客户端与服务端之间建立连接时调用的Dail方法里可以用WithBalancer方法在DiaplOption里指定负载均衡组件:

client, err := etcd.Client()

resolver := &naming.GRPCResolver{Client: client}

b := grpc.RoundRobin(resolver)

opt0 := grpc.WithBalancer(b)

grpc.Dial(target, opt0 , opt1, …) // 后面省略了

上面的例子使用了gRPC自带的Balancer实现RoundRobin,RoundRobin除了实现了Balancer接口外自己内置了Resolver用来从名字获取其后绑定的IP信息以及服务的更新事件(增加删除

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/17870.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们YX

mu99908888

在线咨询: 微信交谈

邮件:itzsgw@126.com

工作时间:时刻准备着!

关注微信