fix(registry.go) protect the peer and proxy map in the registry by lock
parent
6d08976cbe
commit
af87fa40c2
|
@ -45,6 +45,9 @@ func NewRegistry(s store.Store) *Registry {
|
||||||
|
|
||||||
// Peers returns a list of cached peer names.
|
// Peers returns a list of cached peer names.
|
||||||
func (r *Registry) Peers() []string {
|
func (r *Registry) Peers() []string {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
names := make([]string, 0, len(r.peers))
|
names := make([]string, 0, len(r.peers))
|
||||||
for name := range r.peers {
|
for name := range r.peers {
|
||||||
names = append(names, name)
|
names = append(names, name)
|
||||||
|
@ -55,6 +58,9 @@ func (r *Registry) Peers() []string {
|
||||||
|
|
||||||
// Proxies returns a list of cached proxy names.
|
// Proxies returns a list of cached proxy names.
|
||||||
func (r *Registry) Proxies() []string {
|
func (r *Registry) Proxies() []string {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
names := make([]string, 0, len(r.proxies))
|
names := make([]string, 0, len(r.proxies))
|
||||||
for name := range r.proxies {
|
for name := range r.proxies {
|
||||||
names = append(names, name)
|
names = append(names, name)
|
||||||
|
@ -68,6 +74,9 @@ func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) err
|
||||||
if err := r.register(RegistryPeerKey, name, peerURL, machURL); err != nil {
|
if err := r.register(RegistryPeerKey, name, peerURL, machURL); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
r.peers[name] = r.load(RegistryPeerKey, name)
|
r.peers[name] = r.load(RegistryPeerKey, name)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -77,14 +86,14 @@ func (r *Registry) RegisterProxy(name string, peerURL string, machURL string) er
|
||||||
if err := r.register(RegistryProxyKey, name, peerURL, machURL); err != nil {
|
if err := r.register(RegistryProxyKey, name, peerURL, machURL); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
r.proxies[name] = r.load(RegistryProxyKey, name)
|
r.proxies[name] = r.load(RegistryProxyKey, name)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registry) register(key, name string, peerURL string, machURL string) error {
|
func (r *Registry) register(key, name string, peerURL string, machURL string) error {
|
||||||
r.Lock()
|
|
||||||
defer r.Unlock()
|
|
||||||
|
|
||||||
// Write data to store.
|
// Write data to store.
|
||||||
v := url.Values{}
|
v := url.Values{}
|
||||||
v.Set("raft", peerURL)
|
v.Set("raft", peerURL)
|
||||||
|
@ -105,9 +114,6 @@ func (r *Registry) UnregisterProxy(name string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registry) unregister(key, name string) error {
|
func (r *Registry) unregister(key, name string) error {
|
||||||
r.Lock()
|
|
||||||
defer r.Unlock()
|
|
||||||
|
|
||||||
// Remove the key from the store.
|
// Remove the key from the store.
|
||||||
_, err := r.store.Delete(path.Join(key, name), false, false)
|
_, err := r.store.Delete(path.Join(key, name), false, false)
|
||||||
log.Debugf("Unregister: %s", name)
|
log.Debugf("Unregister: %s", name)
|
||||||
|
@ -282,6 +288,9 @@ func (r *Registry) urls(key, leaderName, selfName string, url func(key, name str
|
||||||
|
|
||||||
// Removes a node from the cache.
|
// Removes a node from the cache.
|
||||||
func (r *Registry) Invalidate(name string) {
|
func (r *Registry) Invalidate(name string) {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
delete(r.peers, name)
|
delete(r.peers, name)
|
||||||
delete(r.proxies, name)
|
delete(r.proxies, name)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue