go-micro注册中心源码解析


结构体和接口定义

// Cache is the registry cache interface.
// It exposes every method of registry.Registry (via embedding) and adds
// Stop, which shuts down the cache's watcher.
type Cache interface {
	// embed the registry interface
	registry.Registry
	// stop the cache watcher
	Stop()
}

看下实现类:核心结构中,cache存储服务名对应的注册中心服务实例,watched表示这个服务是否正在被watch,ttls表示服务缓存的ttl,exit控制是否退出,running表示是否在异步监听执行

// cache is the concrete type behind the methods shown in this file. It embeds
// a registry.Registry and keeps per-service lookup results in memory.
type cache struct {
	registry.Registry
	opts Options

	// registry cache
	// the embedded RWMutex is taken by get/run/Stop around the fields below
	sync.RWMutex
	// cached service entries, keyed by service name
	cache   map[string][]*registry.Service
	// per-service timestamp checked together with the cached entries
	// (presumably the expiry instant; confirm against isValid/set)
	ttls    map[string]time.Time
	// service names for which a watch has already been requested
	watched map[string]bool

	// used to stop the cache; Stop closes this channel
	exit chan bool

	// indicates whether the watcher loop (run) is running
	running bool
	// status of the registry
	// used to hold onto the cache
	// in failure state
	status error
	// used to prevent cache breakdown: concurrent lookups of the same
	// service are funnelled through a single registry call
	sg singleflight.Group
}

对外接口有:

  • ```go
    GetService(service string, opts ...registry.GetOption) ([]*registry.Service, error)
    Stop()
    New(r registry.Registry, opts ...Option) Cache
    ```

    - GetService的核心方法get

      ```go
      unc (c *cache) get(service string) ([]*registry.Service, error) {
      	// read lock
      	c.RLock()
      
      	// check the cache first
      	services := c.cache[service]
      	// get cache ttl
      	ttl := c.ttls[service]
      	// make a copy
      	cp := util.Copy(services)
      
      	// got services && within ttl so return cache
      	if c.isValid(cp, ttl) {
      		c.RUnlock()
      		// return services
      		return cp, nil
      	}
      
      	// get does the actual request for a service and cache it
      	get := func(service string, cached []*registry.Service) ([]*registry.Service, error) {
      		// ask the registry
      		val, err, _ := c.sg.Do(service, func() (interface{}, error) {
      			return c.Registry.GetService(service)
      		})
      		services, _ := val.([]*registry.Service)
      		if err != nil {
      			// check the cache
      			if len(cached) > 0 {
      				// set the error status
      				c.setStatus(err)
      
      				// return the stale cache
      				return cached, nil
      			}
      			// otherwise return error
      			return nil, err
      		}
      
      		// reset the status
      		if err := c.getStatus(); err != nil {
      			c.setStatus(nil)
      		}
      
      		// cache results
      		c.Lock()
      		c.set(service, util.Copy(services))
      		c.Unlock()
      
      		return services, nil
      	}
      
      	// watch service if not watched
      	_, ok := c.watched[service]
      
      	// unlock the read lock
      	c.RUnlock()
      
      	// check if its being watched
      	if !ok {
      		c.Lock()
      
      		// set to watched
      		c.watched[service] = true
      
      		// only kick it off if not running
      		if !c.running {
      			go c.run()
      		}
      
      		c.Unlock()
      	}
      
      	// get and return services
      	return get(service, cp)
      }

      ```

    总体思路:先检查缓存是否有效(未过期),有效则直接返回;否则判断该服务是否已在异步watch,没有则标记为watched并在需要时启动run;最后通过内嵌的Registry接口(经singleflight合并并发请求)请求对应注册中心获取服务并更新到缓存中

// run is the cache's watcher loop. Each iteration opens a registry watcher
// and consumes its events through watch; whenever opening or watching fails
// it backs off and tries again. The loop ends once quit reports true.
func (c *cache) run() {
	c.Lock()
	c.running = true
	c.Unlock()

	// on the way out, forget every watched service and clear the running flag
	defer func() {
		c.Lock()
		c.watched = make(map[string]bool)
		c.running = false
		c.Unlock()
	}()

	// pause records err as the cache status, sleeps for the backoff that
	// matches *attempts and then bumps the counter. Once the counter has
	// passed 3 the failure is logged at debug level and the counter restarts.
	pause := func(err error, attempts *int) {
		wait := backoff(*attempts)
		c.setStatus(err)

		if *attempts > 3 {
			if logger.V(logger.DebugLevel, logger.DefaultLogger) {
				logger.Debug("rcache: ", err, " backing off ", wait)
			}
			*attempts = 0
		}

		time.Sleep(wait)
		*attempts++
	}

	// separate failure counters for creating a watcher and for watching
	var openFailures, watchFailures int

	// leave immediately if the cache is already dead
	for !c.quit() {
		// random jitter of up to 100ms before (re)connecting
		time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)

		// create new watcher
		w, err := c.Registry.Watch()
		if err != nil {
			if c.quit() {
				return
			}
			pause(err, &openFailures)
			continue
		}
		openFailures = 0

		// watch for events until the watcher fails
		err = c.watch(w)
		if err == nil {
			watchFailures = 0
			continue
		}
		if c.quit() {
			return
		}
		pause(err, &watchFailures)
	}
}

// watch drains events from w, applying each one with update, until w.Next
// fails; that error is returned to the caller. A helper goroutine stops w
// as soon as either the cache's exit channel fires or this call returns.
func (c *cache) watch(w registry.Watcher) error {
	// closed when the event loop below gives up
	done := make(chan bool)

	// supervisor: whichever signal arrives first, the watcher gets stopped
	go func() {
		defer w.Stop()

		select {
		case <-c.exit:
			// the cache is shutting down
		case <-done:
			// the event loop returned
		}
	}()

	for {
		event, err := w.Next()
		if err != nil {
			close(done)
			return err
		}

		// an event arrived, so clear any previously recorded error status
		if c.getStatus() != nil {
			c.setStatus(nil)
		}

		c.update(event)
	}
}


// Stop closes the exit channel while holding the write lock. If a receive
// from exit already succeeds (e.g. it was closed by an earlier Stop), the
// call does nothing, so calling Stop repeatedly is safe.
func (c *cache) Stop() {
	c.Lock()
	defer c.Unlock()

	select {
	case <-c.exit:
		// nothing to do: exit is already signalled
	default:
		close(c.exit)
	}
}

watch方法是cache自己监听事件的方法:先启动一个goroutine异步监听是否退出,并在其中使用defer来保证Watcher一定会被Stop;然后循环调用Watcher的Next方法。这里使用stop这个channel,保证Next出错返回时也能让该goroutine退出并调用Stop方法。这里Watcher接口的Stop和Next以Zookeeper为例:


文章作者: li5ch
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 li5ch !
  目录