结构体和接口定义
// Cache is the registry cache interface
type Cache interface {
// embed the registry interface
registry.Registry
// stop the cache watcher
Stop()
}
看下实现类,核心结果cache存储服务对应注册中心服务实例,watch表示这个服务是否在watch,ttls表示服务的ttl,exit控制是否退出,running表示是否再异步监听执行
type cache struct {
registry.Registry
opts Options
// registry cache
sync.RWMutex
cache map[string][]*registry.Service
ttls map[string]time.Time
watched map[string]bool
// used to stop the cache
exit chan bool
// indicate whether its running
running bool
// status of the registry
// used to hold onto the cache
// in failure state
status error
// used to prevent cache breakdwon
sg singleflight.Group
}
对外接口有:
```go
GetService(service string, opts …registry.GetOption) ([]*registry.Service, error)
Stop()
New(r registry.Registry, opts …Option) Cache- getservice的核心方法get ```go unc (c *cache) get(service string) ([]*registry.Service, error) { // read lock c.RLock() // check the cache first services := c.cache[service] // get cache ttl ttl := c.ttls[service] // make a copy cp := util.Copy(services) // got services && within ttl so return cache if c.isValid(cp, ttl) { c.RUnlock() // return services return cp, nil } // get does the actual request for a service and cache it get := func(service string, cached []*registry.Service) ([]*registry.Service, error) { // ask the registry val, err, _ := c.sg.Do(service, func() (interface{}, error) { return c.Registry.GetService(service) }) services, _ := val.([]*registry.Service) if err != nil { // check the cache if len(cached) > 0 { // set the error status c.setStatus(err) // return the stale cache return cached, nil } // otherwise return error return nil, err } // reset the status if err := c.getStatus(); err != nil { c.setStatus(nil) } // cache results c.Lock() c.set(service, util.Copy(services)) c.Unlock() return services, nil } // watch service if not watched _, ok := c.watched[service] // unlock the read lock c.RUnlock() // check if its being watched if !ok { c.Lock() // set to watched c.watched[service] = true // only kick it off if not running if !c.running { go c.run() } c.Unlock() } // get and return services return get(service, cp) }总体思路:检查缓存是否过期,过期再判断是否再异步watch服务,然后根据反射请求对应注册中心获取服务并更新到缓存中
// run starts the cache watcher loop
// it creates a new watcher if there's a problem
func (c *cache) run() {
c.Lock()
c.running = true
c.Unlock()
// reset watcher on exit
defer func() {
c.Lock()
c.watched = make(map[string]bool)
c.running = false
c.Unlock()
}()
var a, b int
for {
// exit early if already dead
if c.quit() {
return
}
// jitter before starting
j := rand.Int63n(100)
time.Sleep(time.Duration(j) * time.Millisecond)
// create new watcher
w, err := c.Registry.Watch()
if err != nil {
if c.quit() {
return
}
d := backoff(a)
c.setStatus(err)
if a > 3 {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debug("rcache: ", err, " backing off ", d)
}
a = 0
}
time.Sleep(d)
a++
continue
}
// reset a
a = 0
// watch for events
if err := c.watch(w); err != nil {
if c.quit() {
return
}
d := backoff(b)
c.setStatus(err)
if b > 3 {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debug("rcache: ", err, " backing off ", d)
}
b = 0
}
time.Sleep(d)
b++
continue
}
// reset b
b = 0
}
}
// watch loops the next event and calls update
// it returns if there's an error
func (c *cache) watch(w registry.Watcher) error {
// used to stop the watch
stop := make(chan bool)
// manage this loop
go func() {
defer w.Stop()
select {
// wait for exit
case <-c.exit:
return
// we've been stopped
case <-stop:
return
}
}()
for {
res, err := w.Next()
if err != nil {
close(stop)
return err
}
// reset the error status since we succeeded
if err := c.getStatus(); err != nil {
// reset status
c.setStatus(nil)
}
c.update(res)
}
}
func (c *cache) Stop() {
c.Lock()
defer c.Unlock()
select {
case <-c.exit:
return
default:
close(c.exit)
}
}
watch方法是cache自己watch事件的方法,异步监听是否退出,退出时使用defer来保证Watch已经stop,循环调用Watch的Next方法,这里使用stop这个channel保证异常也能在异步线程中调用Stop方法,这里Watch接口的Stop和Next以Zookeeper为例: