etcdhttp: support capability checking

etcdhttp will check the cluster version and update its
capability version periodically.

Any new handler's after 2.0 needs to wrap by capability handler
to ensure it is not accessable until rolling upgrade finished.
release-2.1
Xiang Li 2015-05-12 17:22:08 -07:00
parent 197437316f
commit d3b1d5c008
3 changed files with 90 additions and 5 deletions

View File

@ -0,0 +1,82 @@
package etcdhttp
import (
"fmt"
"net/http"
"sync"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
)
type capability string
const (
securityCapability capability = "security"
)
var (
// capabilityMap is a static map of version to capability map.
// the base capabilities is the set of capability 2.0 supports.
capabilityMaps = map[string]map[capability]bool{
"2.1.0": {securityCapability: true},
}
enableMapMu sync.Mutex
// enabled points to a map in cpapbilityMaps
enabledMap map[capability]bool
)
// capabilityLoop checks the cluster version every 500ms and updates
// the enabledCapability when the cluster version increased.
// capabilityLoop MUST be ran in a goroutine before checking capability
// or using capabilityHandler.
func capabilityLoop(s *etcdserver.EtcdServer) {
stopped := s.StopNotify()
var pv *semver.Version
for {
if v := s.ClusterVersion(); v != pv {
if pv == nil {
pv = v
} else if v != nil && pv.LessThan(*v) {
pv = v
}
enableMapMu.Lock()
enabledMap = capabilityMaps[pv.String()]
enableMapMu.Unlock()
}
select {
case <-stopped:
return
case <-time.After(500 * time.Millisecond):
}
}
}
func isCapabilityEnabled(c capability) bool {
enableMapMu.Lock()
defer enableMapMu.Unlock()
if enabledMap == nil {
return false
}
return enabledMap[c]
}
func capabilityHandler(c capability, fn func(http.ResponseWriter, *http.Request)) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !isCapabilityEnabled(c) {
notCapable(w, c)
return
}
fn(w, r)
}
}
func notCapable(w http.ResponseWriter, c capability) {
herr := httptypes.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Not capable of accessing %s feature during rolling upgrades.", c))
herr.WriteTo(w)
}

View File

@ -57,6 +57,8 @@ const (
// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
go capabilityLoop(server)
sec := security.NewStore(server, defaultServerTimeout)
kh := &keysHandler{
@ -102,6 +104,7 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
mux.Handle(membersPrefix+"/", mh)
mux.Handle(deprecatedMachinesPrefix, dmh)
handleSecurity(mux, sech)
return mux
}

View File

@ -125,11 +125,11 @@ func writeNoAuth(w http.ResponseWriter) {
}
func handleSecurity(mux *http.ServeMux, sh *securityHandler) {
mux.HandleFunc(securityPrefix+"/roles", sh.baseRoles)
mux.HandleFunc(securityPrefix+"/roles/", sh.handleRoles)
mux.HandleFunc(securityPrefix+"/users", sh.baseUsers)
mux.HandleFunc(securityPrefix+"/users/", sh.handleUsers)
mux.HandleFunc(securityPrefix+"/enable", sh.enableDisable)
mux.HandleFunc(securityPrefix+"/roles", capabilityHandler(securityCapability, sh.baseRoles))
mux.HandleFunc(securityPrefix+"/roles/", capabilityHandler(securityCapability, sh.handleRoles))
mux.HandleFunc(securityPrefix+"/users", capabilityHandler(securityCapability, sh.baseUsers))
mux.HandleFunc(securityPrefix+"/users/", capabilityHandler(securityCapability, sh.handleUsers))
mux.HandleFunc(securityPrefix+"/enable", capabilityHandler(securityCapability, sh.enableDisable))
}
func (sh *securityHandler) baseRoles(w http.ResponseWriter, r *http.Request) {