Merge pull request #1635 from jonboulle/doc

etcdserver: add docstrings for confchanges
release-2.0
Jonathan Boulle 2014-11-07 10:20:05 -08:00
commit 810a5146dd
2 changed files with 14 additions and 8 deletions

View File

@ -259,18 +259,20 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
func (c *Cluster) SetStore(st store.Store) { c.store = st }
// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is still valid.
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
appliedMembers, appliedRemoved := membersFromStore(c.store)
if appliedRemoved[types.ID(cc.NodeID)] {
members, removed := membersFromStore(c.store)
if removed[types.ID(cc.NodeID)] {
return ErrIDRemoved
}
switch cc.Type {
case raftpb.ConfChangeAddNode:
if appliedMembers[types.ID(cc.NodeID)] != nil {
if members[types.ID(cc.NodeID)] != nil {
return ErrIDExists
}
urls := make(map[string]bool)
for _, m := range appliedMembers {
for _, m := range members {
for _, u := range m.PeerURLs {
urls[u] = true
}
@ -285,7 +287,7 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
}
case raftpb.ConfChangeRemoveNode:
if appliedMembers[types.ID(cc.NodeID)] == nil {
if members[types.ID(cc.NodeID)] == nil {
return ErrIDNotFound
}
default:

View File

@ -333,7 +333,6 @@ func (s *EtcdServer) run() {
// TODO(bmizerany): do this in the background, but take
// care to apply entries in a single goroutine, and not
// race them.
// TODO: apply configuration change into ClusterStore.
if len(rd.CommittedEntries) != 0 {
appliedi = s.apply(rd.CommittedEntries)
}
@ -480,8 +479,9 @@ func (s *EtcdServer) Term() uint64 {
return atomic.LoadUint64(&s.raftTerm)
}
// configure sends configuration change through consensus then performs it.
// It will block until the change is performed or there is an error.
// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It
// will block until the change is performed or there is an error.
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
ch := s.w.Register(cc.ID)
if err := s.node.ProposeConfChange(ctx, cc); err != nil {
@ -567,6 +567,8 @@ func getExpirationTime(r *pb.Request) time.Time {
return t
}
// apply takes an Entry received from Raft (after it has been committed) and
// applies it to the current state of the EtcdServer
func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
var applied uint64
for i := range es {
@ -641,6 +643,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
}
}
// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
cc.NodeID = raft.None