From ff6705b94b7a47e4f6948e9b35b5e21e0123cc92 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 19 Sep 2014 12:35:56 -0700 Subject: [PATCH 01/11] raft: add Configure, AddNode, RemoveNode Configure is used to propose config change. AddNode and RemoveNode is used to apply cluster change to raft state machine. They are the basics for dynamic configuration. --- raft/doc.go | 11 +++++ raft/log.go | 5 ++ raft/node.go | 54 ++++++++++++++++++++- raft/raft.go | 30 ++++++++++++ raft/raft_test.go | 105 +++++++++++++++++++++++++++++++++++++++++ raft/raftpb/raft.pb.go | 20 ++++++++ raft/raftpb/raft.proto | 1 + 7 files changed, 225 insertions(+), 1 deletion(-) diff --git a/raft/doc.go b/raft/doc.go index d9e3d2a3b..53a42f1f7 100644 --- a/raft/doc.go +++ b/raft/doc.go @@ -61,5 +61,16 @@ data, serialize it into a byte slice and call: n.Propose(ctx, data) +To add or remove node in a cluster, serialize the data for configuration change +into a byte slice and call: + + n.Configure(ctx, data) + +For the safety consideration, one configuration should include at most one node +change, which is applied through: + + n.AddNode(id) + n.RemoveNode(id) + */ package raft diff --git a/raft/log.go b/raft/log.go index 562afae9f..b17dba8df 100644 --- a/raft/log.go +++ b/raft/log.go @@ -10,6 +10,11 @@ const ( defaultCompactThreshold = 10000 ) +const ( + EntryNormal int64 = iota + EntryConfig +) + type raftLog struct { ents []pb.Entry unstable int64 diff --git a/raft/node.go b/raft/node.go index d93c1292e..af0e98a0c 100644 --- a/raft/node.go +++ b/raft/node.go @@ -76,10 +76,12 @@ type Node interface { // Tick increments the internal logical clock for the Node by a single tick. Election // timeouts and heartbeat timeouts are in units of ticks. Tick() - // Campaign causes the Node to transition to candidate state and start campaigning to become leader + // Campaign causes the Node to transition to candidate state and start campaigning to become leader. Campaign(ctx context.Context) error // Propose proposes that data be appended to the log. Propose(ctx context.Context, data []byte) error + // Configure proposes config change. Only one config can be in the process of going through consensus at a time. + Configure(ctx context.Context, data []byte) error // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error // Ready returns a channel that returns the current point-in-time state @@ -88,6 +90,12 @@ type Node interface { Stop() // Compact Compact(d []byte) + // AddNode adds a node with given id into peer list. + // TODO: reject existed node + AddNode(id int64) + // RemoveNode removes a node with give id from peer list. + // TODO: reject unexisted node + RemoveNode(id int64) } // StartNode returns a new Node given a unique raft id, a list of raft peers, and @@ -114,11 +122,22 @@ func RestartNode(id int64, peers []int64, election, heartbeat int, snapshot *pb. return &n } +const ( + confAdd = iota + confRemove +) + +type conf struct { + typ int + id int64 +} + // node is the canonical implementation of the Node interface type node struct { propc chan pb.Message recvc chan pb.Message compactc chan []byte + confc chan conf readyc chan Ready tickc chan struct{} done chan struct{} @@ -129,6 +148,7 @@ func newNode() node { propc: make(chan pb.Message), recvc: make(chan pb.Message), compactc: make(chan []byte), + confc: make(chan conf), readyc: make(chan Ready), tickc: make(chan struct{}), done: make(chan struct{}), @@ -167,6 +187,7 @@ func (n *node) run(r *raft) { } select { + // TODO: buffer the config propose if there exists one case m := <-propc: m.From = r.id r.Step(m) @@ -174,6 +195,15 @@ func (n *node) run(r *raft) { r.Step(m) // raft never returns an error case d := <-n.compactc: r.compact(d) + case c := <-n.confc: + switch c.typ { + case confAdd: + r.addNode(c.id) + case confRemove: + r.removeNode(c.id) + default: + panic("unexpected conf type") + } case <-n.tickc: r.tick() case readyc <- rd: @@ -186,6 +216,10 @@ func (n *node) run(r *raft) { if !IsEmptySnap(rd.Snapshot) { prevSnapi = rd.Snapshot.Index } + // TODO(yichengq): we assume that all committed config + // entries will be applied to make things easy for now. + // TODO(yichengq): it may have race because applied is set + // before entries are applied. r.raftLog.resetNextEnts() r.raftLog.resetUnstable() r.msgs = nil @@ -212,6 +246,10 @@ func (n *node) Propose(ctx context.Context, data []byte) error { return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}}) } +func (n *node) Configure(ctx context.Context, data []byte) error { + return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig, Data: data}}}) +} + // Step advances the state machine using msgs. The ctx.Err() will be returned, // if any. func (n *node) Step(ctx context.Context, m pb.Message) error { @@ -241,6 +279,20 @@ func (n *node) Compact(d []byte) { } } +func (n *node) AddNode(id int64) { + select { + case n.confc <- conf{typ: confAdd, id: id}: + case <-n.done: + } +} + +func (n *node) RemoveNode(id int64) { + select { + case n.confc <- conf{typ: confRemove, id: id}: + case <-n.done: + } +} + func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi int64) Ready { rd := Ready{ Entries: r.raftLog.unstableEnts(), diff --git a/raft/raft.go b/raft/raft.go index 1097bb36f..988a79ace 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -105,6 +105,10 @@ type raft struct { // the leader id lead int64 + // pending configuration + // New configuration is ignored if there exists configuration unapplied. + pendingConf bool + elapsed int // number of ticks since the last msg heartbeatTimeout int electionTimeout int @@ -245,6 +249,7 @@ func (r *raft) reset(term int64) { r.prs[i].match = r.raftLog.lastIndex() } } + r.pendingConf = false } func (r *raft) q() int { @@ -308,6 +313,15 @@ func (r *raft) becomeLeader() { r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader + for _, e := range r.raftLog.entries(r.raftLog.committed + 1) { + if e.Type != EntryConfig { + continue + } + if r.pendingConf { + panic("unexpected double uncommitted config entry") + } + r.pendingConf = true + } r.appendEntry(pb.Entry{Data: nil}) } @@ -373,6 +387,16 @@ func (r *raft) handleSnapshot(m pb.Message) { } } +func (r *raft) addNode(id int64) { + r.setProgress(id, 0, r.raftLog.lastIndex()+1) + r.pendingConf = false +} + +func (r *raft) removeNode(id int64) { + r.delProgress(id) + r.pendingConf = false +} + type stepFunc func(r *raft, m pb.Message) func stepLeader(r *raft, m pb.Message) { @@ -384,6 +408,12 @@ func stepLeader(r *raft, m pb.Message) { panic("unexpected length(entries) of a msgProp") } e := m.Entries[0] + if e.Type == EntryConfig { + if r.pendingConf { + return + } + r.pendingConf = true + } r.appendEntry(e) r.bcastAppend() case msgAppResp: diff --git a/raft/raft_test.go b/raft/raft_test.go index 24c15e622..37274f683 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -948,6 +948,111 @@ func TestSlowNodeRestore(t *testing.T) { } } +// TestStepConfig tests that when raft step msgProp in ConfigEntry type, +// it appends the entry to log and sets pendingConf to be true. +func TestStepConfig(t *testing.T) { + // a raft that cannot make progress + r := newRaft(1, []int64{1, 2}, 0, 0) + r.becomeCandidate() + r.becomeLeader() + index := r.raftLog.lastIndex() + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}}) + if g := r.raftLog.lastIndex(); g != index+1 { + t.Errorf("index = %d, want %d", g, index+1) + } + if r.pendingConf != true { + t.Errorf("pendingConf = %v, want true", r.pendingConf) + } +} + +// TestStepIgnoreConfig tests that if raft step the second msgProp in +// ConfigEntry type when the first one is uncommitted, the node will deny +// the proposal and keep its original state. +func TestStepIgnoreConfig(t *testing.T) { + // a raft that cannot make progress + r := newRaft(1, []int64{1, 2}, 0, 0) + r.becomeCandidate() + r.becomeLeader() + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}}) + index := r.raftLog.lastIndex() + pendingConf := r.pendingConf + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}}) + if g := r.raftLog.lastIndex(); g != index { + t.Errorf("index = %d, want %d", g, index) + } + if r.pendingConf != pendingConf { + t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf) + } +} + +// TestRecoverPendingConfig tests that new leader recovers its pendingConf flag +// based on uncommitted entries. +func TestRecoverPendingConfig(t *testing.T) { + tests := []struct { + entType int64 + wpending bool + }{ + {EntryNormal, false}, + {EntryConfig, true}, + } + for i, tt := range tests { + r := newRaft(1, []int64{1, 2}, 0, 0) + r.appendEntry(pb.Entry{Type: tt.entType}) + r.becomeCandidate() + r.becomeLeader() + if r.pendingConf != tt.wpending { + t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending) + } + } +} + +// TestRecoverDoublePendingConfig tests that new leader will panic if +// there exist two uncommitted config entries. +func TestRecoverDoublePendingConfig(t *testing.T) { + func() { + defer func() { + if err := recover(); err == nil { + t.Errorf("expect panic, but nothing happens") + } + }() + r := newRaft(1, []int64{1, 2}, 0, 0) + r.appendEntry(pb.Entry{Type: EntryConfig}) + r.appendEntry(pb.Entry{Type: EntryConfig}) + r.becomeCandidate() + r.becomeLeader() + }() +} + +// TestAddNode tests that addNode could update pendingConf and peer list correctly. +func TestAddNode(t *testing.T) { + r := newRaft(1, []int64{1}, 0, 0) + r.pendingConf = true + r.addNode(2) + if r.pendingConf != false { + t.Errorf("pendingConf = %v, want false", r.pendingConf) + } + nodes := r.nodes() + sort.Sort(int64Slice(nodes)) + wnodes := []int64{1, 2} + if !reflect.DeepEqual(nodes, wnodes) { + t.Errorf("nodes = %v, want %v", nodes, wnodes) + } +} + +// TestRemoveNode tests that removeNode could update pendingConf and peer list correctly. +func TestRemoveNode(t *testing.T) { + r := newRaft(1, []int64{1, 2}, 0, 0) + r.pendingConf = true + r.removeNode(2) + if r.pendingConf != false { + t.Errorf("pendingConf = %v, want false", r.pendingConf) + } + w := []int64{1} + if g := r.nodes(); !reflect.DeepEqual(g, w) { + t.Errorf("nodes = %v, want %v", g, w) + } +} + func ents(terms ...int64) *raft { ents := []pb.Entry{{}} for _, term := range terms { diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 148c302e2..1f5808f21 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -41,6 +41,7 @@ func (m *Info) String() string { return proto.CompactTextString(m) } func (*Info) ProtoMessage() {} type Entry struct { + Type int64 `protobuf:"varint,1,req,name=type" json:"type"` Term int64 `protobuf:"varint,2,req,name=term" json:"term"` Index int64 `protobuf:"varint,3,req,name=index" json:"index"` Data []byte `protobuf:"bytes,4,opt,name=data" json:"data"` @@ -169,6 +170,21 @@ func (m *Entry) Unmarshal(data []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Type |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } case 2: if wireType != 0 { return code_google_com_p_gogoprotobuf_proto.ErrWrongType @@ -648,6 +664,7 @@ func (m *Info) Size() (n int) { func (m *Entry) Size() (n int) { var l int _ = l + n += 1 + sovRaft(uint64(m.Type)) n += 1 + sovRaft(uint64(m.Term)) n += 1 + sovRaft(uint64(m.Index)) l = len(m.Data) @@ -760,6 +777,9 @@ func (m *Entry) MarshalTo(data []byte) (n int, err error) { _ = i var l int _ = l + data[i] = 0x8 + i++ + i = encodeVarintRaft(data, i, uint64(m.Type)) data[i] = 0x10 i++ i = encodeVarintRaft(data, i, uint64(m.Term)) diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index d6bf94ab0..f5249c1a5 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -12,6 +12,7 @@ message Info { } message Entry { + required int64 type = 1 [(gogoproto.nullable) = false]; required int64 term = 2 [(gogoproto.nullable) = false]; required int64 index = 3 [(gogoproto.nullable) = false]; optional bytes data = 4 [(gogoproto.nullable) = false]; From aaffb9eb78cd6cd75947bf57ba397abfa3932964 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 19 Sep 2014 12:36:19 -0700 Subject: [PATCH 02/11] etcdserver: add AddNode, RemoveNode AddNode and RemoveNode is used to propose config change to the cluster. If succeeds, it will add/remove node from the cluster. --- etcdserver/etcdserverpb/etcdserver.pb.go | 168 +++++++++++++++++++++++ etcdserver/etcdserverpb/etcdserver.proto | 7 + etcdserver/server.go | 81 ++++++++++- etcdserver/server_test.go | 81 ++++++++++- 4 files changed, 325 insertions(+), 12 deletions(-) diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index a4176a31f..c299e30b3 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -10,6 +10,7 @@ It has these top-level messages: Request + Config */ package etcdserverpb @@ -50,6 +51,18 @@ func (m *Request) Reset() { *m = Request{} } func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} +type Config struct { + Id int64 `protobuf:"varint,1,req,name=id" json:"id"` + Type int64 `protobuf:"varint,2,req,name=type" json:"type"` + NodeID int64 `protobuf:"varint,3,req,name=nodeID" json:"nodeID"` + Context []byte `protobuf:"bytes,4,opt,name=context" json:"context"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Config) Reset() { *m = Config{} } +func (m *Config) String() string { return proto.CompactTextString(m) } +func (*Config) ProtoMessage() {} + func init() { } func (m *Request) Unmarshal(data []byte) error { @@ -360,6 +373,115 @@ func (m *Request) Unmarshal(data []byte) error { } return nil } +func (m *Config) Unmarshal(data []byte) error { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Id |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Type |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.NodeID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := index + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Context = append(m.Context, data[index:postIndex]...) + index = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + index -= sizeOfWire + skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) + if err != nil { + return err + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) + index += skippy + } + } + return nil +} func (m *Request) Size() (n int) { var l int _ = l @@ -389,6 +511,19 @@ func (m *Request) Size() (n int) { } return n } +func (m *Config) Size() (n int) { + var l int + _ = l + n += 1 + sovEtcdserver(uint64(m.Id)) + n += 1 + sovEtcdserver(uint64(m.Type)) + n += 1 + sovEtcdserver(uint64(m.NodeID)) + l = len(m.Context) + n += 1 + l + sovEtcdserver(uint64(l)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} func sovEtcdserver(x uint64) (n int) { for { @@ -504,6 +639,39 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) { } return i, nil } +func (m *Config) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Config) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintEtcdserver(data, i, uint64(m.Id)) + data[i] = 0x10 + i++ + i = encodeVarintEtcdserver(data, i, uint64(m.Type)) + data[i] = 0x18 + i++ + i = encodeVarintEtcdserver(data, i, uint64(m.NodeID)) + data[i] = 0x22 + i++ + i = encodeVarintEtcdserver(data, i, uint64(len(m.Context))) + i += copy(data[i:], m.Context) + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index bb34ac250..10ba4cbf4 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -24,3 +24,10 @@ message Request { required bool quorum = 14 [(gogoproto.nullable) = false]; required int64 time = 15 [(gogoproto.nullable) = false]; } + +message Config { + required int64 id = 1 [(gogoproto.nullable) = false]; + required int64 type = 2 [(gogoproto.nullable) = false]; + required int64 nodeID = 3 [(gogoproto.nullable) = false]; + optional bytes context = 4 [(gogoproto.nullable) = false]; +} diff --git a/etcdserver/server.go b/etcdserver/server.go index 905213acd..8a864954f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -19,6 +19,11 @@ const ( DefaultSnapCount = 10000 ) +const ( + configAddNode int64 = iota + configRemoveNode +) + var ( ErrUnknownMethod = errors.New("etcdserver: unknown method") ErrStopped = errors.New("etcdserver: server stopped") @@ -121,11 +126,23 @@ func (s *EtcdServer) run() { // care to apply entries in a single goroutine, and not // race them. for _, e := range rd.CommittedEntries { - var r pb.Request - if err := r.Unmarshal(e.Data); err != nil { - panic("TODO: this is bad, what do we do about it?") + switch e.Type { + case raft.EntryNormal: + var r pb.Request + if err := r.Unmarshal(e.Data); err != nil { + panic("TODO: this is bad, what do we do about it?") + } + s.w.Trigger(r.Id, s.applyRequest(r)) + case raft.EntryConfig: + var c pb.Config + if err := c.Unmarshal(e.Data); err != nil { + panic("TODO: this is bad, what do we do about it?") + } + s.applyConfig(c) + s.w.Trigger(c.Id, nil) + default: + panic("unsupported entry type") } - s.w.Trigger(r.Id, s.apply(r)) appliedi = e.Index } @@ -218,6 +235,46 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } } +func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error { + req := pb.Config{ + Id: GenID(), + Type: configAddNode, + NodeID: id, + Context: context, + } + return s.configure(ctx, req) +} + +func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { + req := pb.Config{ + Id: GenID(), + Type: configRemoveNode, + NodeID: id, + } + return s.configure(ctx, req) +} + +// configure sends configuration change through consensus then performs it. +// It will block until the change is performed or there is an error. +func (s *EtcdServer) configure(ctx context.Context, r pb.Config) error { + data, err := r.Marshal() + if err != nil { + log.Printf("marshal request %#v error: %v", r, err) + return err + } + ch := s.w.Register(r.Id) + s.Node.Configure(ctx, data) + select { + case <-ch: + return nil + case <-ctx.Done(): + s.w.Trigger(r.Id, nil) // GC wait + return ctx.Err() + case <-s.done: + return ErrStopped + } +} + // sync proposes a SYNC request and is non-blocking. // This makes no guarantee that the request will be proposed or performed. // The request will be cancelled after the given timeout. @@ -249,8 +306,8 @@ func getExpirationTime(r *pb.Request) time.Time { return t } -// apply interprets r as a call to store.X and returns an Response interpreted from store.Event -func (s *EtcdServer) apply(r pb.Request) Response { +// applyRequest interprets r as a call to store.X and returns an Response interpreted from store.Event +func (s *EtcdServer) applyRequest(r pb.Request) Response { f := func(ev *store.Event, err error) Response { return Response{Event: ev, err: err} } @@ -290,6 +347,18 @@ func (s *EtcdServer) apply(r pb.Request) Response { } } +func (s *EtcdServer) applyConfig(r pb.Config) { + switch r.Type { + case configAddNode: + s.Node.AddNode(r.NodeID) + case configRemoveNode: + s.Node.RemoveNode(r.NodeID) + default: + // This should never be reached + panic("unsupported config type") + } +} + // TODO: non-blocking snapshot func (s *EtcdServer) snapshot() { d, err := s.Store.Save() diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index ee3039a48..ba48434f4 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -120,7 +120,7 @@ func TestDoBadLocalAction(t *testing.T) { } } -func TestApply(t *testing.T) { +func TestApplyRequest(t *testing.T) { tests := []struct { req pb.Request @@ -188,7 +188,7 @@ func TestApply(t *testing.T) { for i, tt := range tests { st := &storeRecorder{} srv := &EtcdServer{Store: st} - resp := srv.apply(tt.req) + resp := srv.applyRequest(tt.req) if !reflect.DeepEqual(resp, tt.wresp) { t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp) @@ -594,6 +594,46 @@ func TestRecvSlowSnapshot(t *testing.T) { } } +// TestAddNode tests AddNode could propose configuration and add node to raft. +func TestAddNode(t *testing.T) { + n := newNodeCommitterRecorder() + s := &EtcdServer{ + Node: n, + Store: &storeRecorder{}, + Send: func(_ []raftpb.Message) {}, + Storage: &storageRecorder{}, + } + s.Start() + s.AddNode(context.TODO(), 1, []byte("foo")) + action := n.Action() + s.Stop() + + waction := []string{"Configure", "AddNode"} + if !reflect.DeepEqual(action, waction) { + t.Errorf("action = %v, want %v", action, waction) + } +} + +// TestRemoveNode tests RemoveNode could propose configuration and remove node from raft. +func TestRemoveNode(t *testing.T) { + n := newNodeCommitterRecorder() + s := &EtcdServer{ + Node: n, + Store: &storeRecorder{}, + Send: func(_ []raftpb.Message) {}, + Storage: &storageRecorder{}, + } + s.Start() + s.RemoveNode(context.TODO(), 1) + action := n.Action() + s.Stop() + + waction := []string{"Configure", "RemoveNode"} + if !reflect.DeepEqual(action, waction) { + t.Errorf("action = %v, want %v", action, waction) + } +} + // TODO: test wait trigger correctness in multi-server case func TestGetBool(t *testing.T) { @@ -788,20 +828,27 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error { n.record("Propose") return nil } +func (n *nodeRecorder) Configure(ctx context.Context, data []byte) error { + n.record("Configure") + return nil +} func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error { n.record("Step") return nil } -func (n *nodeRecorder) Ready() <-chan raft.Ready { - n.record("Ready") - return nil -} +func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil } func (n *nodeRecorder) Stop() { n.record("Stop") } func (n *nodeRecorder) Compact(d []byte) { n.record("Compact") } +func (n *nodeRecorder) AddNode(id int64) { + n.record("AddNode") +} +func (n *nodeRecorder) RemoveNode(id int64) { + n.record("RemoveNode") +} type nodeProposeDataRecorder struct { nodeRecorder @@ -832,3 +879,25 @@ func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) n.record("Propose blocked") return nil } + +type nodeCommitterRecorder struct { + nodeRecorder + readyc chan raft.Ready +} + +func newNodeCommitterRecorder() *nodeCommitterRecorder { + readyc := make(chan raft.Ready, 1) + readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateLeader}} + return &nodeCommitterRecorder{readyc: readyc} +} +func (n *nodeCommitterRecorder) Propose(ctx context.Context, data []byte) error { + n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Data: data}}} + return n.nodeRecorder.Propose(ctx, data) +} +func (n *nodeCommitterRecorder) Configure(ctx context.Context, data []byte) error { + n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raft.EntryConfig, Data: data}}} + return n.nodeRecorder.Configure(ctx, data) +} +func (n *nodeCommitterRecorder) Ready() <-chan raft.Ready { + return n.readyc +} From abdb2cad152469e6d994fee34fcf727bdf88eef4 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sat, 20 Sep 2014 18:56:40 -0700 Subject: [PATCH 03/11] etcdserver: Config.Id -> Config.ID --- etcdserver/etcdserverpb/etcdserver.pb.go | 14 +++++++------- etcdserver/etcdserverpb/etcdserver.proto | 8 ++++---- etcdserver/server.go | 10 +++++----- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index c299e30b3..07179d5e4 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -52,10 +52,10 @@ func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} type Config struct { - Id int64 `protobuf:"varint,1,req,name=id" json:"id"` - Type int64 `protobuf:"varint,2,req,name=type" json:"type"` - NodeID int64 `protobuf:"varint,3,req,name=nodeID" json:"nodeID"` - Context []byte `protobuf:"bytes,4,opt,name=context" json:"context"` + ID int64 `protobuf:"varint,1,req" json:"ID"` + Type int64 `protobuf:"varint,2,req" json:"Type"` + NodeID int64 `protobuf:"varint,3,req" json:"NodeID"` + Context []byte `protobuf:"bytes,4,opt" json:"Context"` XXX_unrecognized []byte `json:"-"` } @@ -402,7 +402,7 @@ func (m *Config) Unmarshal(data []byte) error { } b := data[index] index++ - m.Id |= (int64(b) & 0x7F) << shift + m.ID |= (int64(b) & 0x7F) << shift if b < 0x80 { break } @@ -514,7 +514,7 @@ func (m *Request) Size() (n int) { func (m *Config) Size() (n int) { var l int _ = l - n += 1 + sovEtcdserver(uint64(m.Id)) + n += 1 + sovEtcdserver(uint64(m.ID)) n += 1 + sovEtcdserver(uint64(m.Type)) n += 1 + sovEtcdserver(uint64(m.NodeID)) l = len(m.Context) @@ -656,7 +656,7 @@ func (m *Config) MarshalTo(data []byte) (n int, err error) { _ = l data[i] = 0x8 i++ - i = encodeVarintEtcdserver(data, i, uint64(m.Id)) + i = encodeVarintEtcdserver(data, i, uint64(m.ID)) data[i] = 0x10 i++ i = encodeVarintEtcdserver(data, i, uint64(m.Type)) diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index 10ba4cbf4..24b5996cd 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -26,8 +26,8 @@ message Request { } message Config { - required int64 id = 1 [(gogoproto.nullable) = false]; - required int64 type = 2 [(gogoproto.nullable) = false]; - required int64 nodeID = 3 [(gogoproto.nullable) = false]; - optional bytes context = 4 [(gogoproto.nullable) = false]; + required int64 ID = 1 [(gogoproto.nullable) = false]; + required int64 Type = 2 [(gogoproto.nullable) = false]; + required int64 NodeID = 3 [(gogoproto.nullable) = false]; + optional bytes Context = 4 [(gogoproto.nullable) = false]; } diff --git a/etcdserver/server.go b/etcdserver/server.go index 8a864954f..3e99b1007 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -139,7 +139,7 @@ func (s *EtcdServer) run() { panic("TODO: this is bad, what do we do about it?") } s.applyConfig(c) - s.w.Trigger(c.Id, nil) + s.w.Trigger(c.ID, nil) default: panic("unsupported entry type") } @@ -237,7 +237,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error { req := pb.Config{ - Id: GenID(), + ID: GenID(), Type: configAddNode, NodeID: id, Context: context, @@ -247,7 +247,7 @@ func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) erro func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { req := pb.Config{ - Id: GenID(), + ID: GenID(), Type: configRemoveNode, NodeID: id, } @@ -262,13 +262,13 @@ func (s *EtcdServer) configure(ctx context.Context, r pb.Config) error { log.Printf("marshal request %#v error: %v", r, err) return err } - ch := s.w.Register(r.Id) + ch := s.w.Register(r.ID) s.Node.Configure(ctx, data) select { case <-ch: return nil case <-ctx.Done(): - s.w.Trigger(r.Id, nil) // GC wait + s.w.Trigger(r.ID, nil) // GC wait return ctx.Err() case <-s.done: return ErrStopped From b801f1affe5c7917a47bbd31004b6958e35e86d3 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sat, 20 Sep 2014 19:32:21 -0700 Subject: [PATCH 04/11] raft: refine comment for raft.pendingConf --- raft/raft.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 988a79ace..63d1e7de2 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -105,8 +105,7 @@ type raft struct { // the leader id lead int64 - // pending configuration - // New configuration is ignored if there exists configuration unapplied. + // New configuration is ignored if there exists unapplied configuration. pendingConf bool elapsed int // number of ticks since the last msg From b82d70871f21b1f9c5f40871b12f0242bcdc6a9b Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sat, 20 Sep 2014 22:48:17 -0700 Subject: [PATCH 05/11] raft: use EntryType in protobuf --- etcdserver/server.go | 4 ++-- etcdserver/server_test.go | 2 +- raft/log.go | 5 ----- raft/node.go | 2 +- raft/raft.go | 4 ++-- raft/raft_test.go | 16 +++++++------- raft/raftpb/raft.pb.go | 46 ++++++++++++++++++++++++++++++++++----- raft/raftpb/raft.proto | 14 ++++++++---- 8 files changed, 64 insertions(+), 29 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 3e99b1007..7a45a9b38 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -127,13 +127,13 @@ func (s *EtcdServer) run() { // race them. for _, e := range rd.CommittedEntries { switch e.Type { - case raft.EntryNormal: + case raftpb.EntryNormal: var r pb.Request if err := r.Unmarshal(e.Data); err != nil { panic("TODO: this is bad, what do we do about it?") } s.w.Trigger(r.Id, s.applyRequest(r)) - case raft.EntryConfig: + case raftpb.EntryConfig: var c pb.Config if err := c.Unmarshal(e.Data); err != nil { panic("TODO: this is bad, what do we do about it?") diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index ba48434f4..87845559c 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -895,7 +895,7 @@ func (n *nodeCommitterRecorder) Propose(ctx context.Context, data []byte) error return n.nodeRecorder.Propose(ctx, data) } func (n *nodeCommitterRecorder) Configure(ctx context.Context, data []byte) error { - n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raft.EntryConfig, Data: data}}} + n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfig, Data: data}}} return n.nodeRecorder.Configure(ctx, data) } func (n *nodeCommitterRecorder) Ready() <-chan raft.Ready { diff --git a/raft/log.go b/raft/log.go index b17dba8df..562afae9f 100644 --- a/raft/log.go +++ b/raft/log.go @@ -10,11 +10,6 @@ const ( defaultCompactThreshold = 10000 ) -const ( - EntryNormal int64 = iota - EntryConfig -) - type raftLog struct { ents []pb.Entry unstable int64 diff --git a/raft/node.go b/raft/node.go index af0e98a0c..43c55b8f1 100644 --- a/raft/node.go +++ b/raft/node.go @@ -247,7 +247,7 @@ func (n *node) Propose(ctx context.Context, data []byte) error { } func (n *node) Configure(ctx context.Context, data []byte) error { - return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig, Data: data}}}) + return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig, Data: data}}}) } // Step advances the state machine using msgs. The ctx.Err() will be returned, diff --git a/raft/raft.go b/raft/raft.go index 63d1e7de2..ddd9eb1e7 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -313,7 +313,7 @@ func (r *raft) becomeLeader() { r.lead = r.id r.state = StateLeader for _, e := range r.raftLog.entries(r.raftLog.committed + 1) { - if e.Type != EntryConfig { + if e.Type != pb.EntryConfig { continue } if r.pendingConf { @@ -407,7 +407,7 @@ func stepLeader(r *raft, m pb.Message) { panic("unexpected length(entries) of a msgProp") } e := m.Entries[0] - if e.Type == EntryConfig { + if e.Type == pb.EntryConfig { if r.pendingConf { return } diff --git a/raft/raft_test.go b/raft/raft_test.go index 37274f683..1d56c4ff8 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -956,7 +956,7 @@ func TestStepConfig(t *testing.T) { r.becomeCandidate() r.becomeLeader() index := r.raftLog.lastIndex() - r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}}) + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig}}}) if g := r.raftLog.lastIndex(); g != index+1 { t.Errorf("index = %d, want %d", g, index+1) } @@ -973,10 +973,10 @@ func TestStepIgnoreConfig(t *testing.T) { r := newRaft(1, []int64{1, 2}, 0, 0) r.becomeCandidate() r.becomeLeader() - r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}}) + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig}}}) index := r.raftLog.lastIndex() pendingConf := r.pendingConf - r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}}) + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig}}}) if g := r.raftLog.lastIndex(); g != index { t.Errorf("index = %d, want %d", g, index) } @@ -989,11 +989,11 @@ func TestStepIgnoreConfig(t *testing.T) { // based on uncommitted entries. func TestRecoverPendingConfig(t *testing.T) { tests := []struct { - entType int64 + entType pb.EntryType wpending bool }{ - {EntryNormal, false}, - {EntryConfig, true}, + {pb.EntryNormal, false}, + {pb.EntryConfig, true}, } for i, tt := range tests { r := newRaft(1, []int64{1, 2}, 0, 0) @@ -1016,8 +1016,8 @@ func TestRecoverDoublePendingConfig(t *testing.T) { } }() r := newRaft(1, []int64{1, 2}, 0, 0) - r.appendEntry(pb.Entry{Type: EntryConfig}) - r.appendEntry(pb.Entry{Type: EntryConfig}) + r.appendEntry(pb.Entry{Type: pb.EntryConfig}) + r.appendEntry(pb.Entry{Type: pb.EntryConfig}) r.becomeCandidate() r.becomeLeader() }() diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 1f5808f21..c12a97417 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -31,6 +31,39 @@ var _ = proto.Marshal var _ = &json.SyntaxError{} var _ = math.Inf +type EntryType int32 + +const ( + EntryNormal EntryType = 0 + EntryConfig EntryType = 1 +) + +var EntryType_name = map[int32]string{ + 0: "EntryNormal", + 1: "EntryConfig", +} +var EntryType_value = map[string]int32{ + "EntryNormal": 0, + "EntryConfig": 1, +} + +func (x EntryType) Enum() *EntryType { + p := new(EntryType) + *p = x + return p +} +func (x EntryType) String() string { + return proto.EnumName(EntryType_name, int32(x)) +} +func (x *EntryType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(EntryType_value, data, "EntryType") + if err != nil { + return err + } + *x = EntryType(value) + return nil +} + type Info struct { Id int64 `protobuf:"varint,1,req,name=id" json:"id"` XXX_unrecognized []byte `json:"-"` @@ -41,11 +74,11 @@ func (m *Info) String() string { return proto.CompactTextString(m) } func (*Info) ProtoMessage() {} type Entry struct { - Type int64 `protobuf:"varint,1,req,name=type" json:"type"` - Term int64 `protobuf:"varint,2,req,name=term" json:"term"` - Index int64 `protobuf:"varint,3,req,name=index" json:"index"` - Data []byte `protobuf:"bytes,4,opt,name=data" json:"data"` - XXX_unrecognized []byte `json:"-"` + Type EntryType `protobuf:"varint,1,req,enum=raftpb.EntryType" json:"Type"` + Term int64 `protobuf:"varint,2,req" json:"Term"` + Index int64 `protobuf:"varint,3,req" json:"Index"` + Data []byte `protobuf:"bytes,4,opt" json:"Data"` + XXX_unrecognized []byte `json:"-"` } func (m *Entry) Reset() { *m = Entry{} } @@ -93,6 +126,7 @@ func (m *HardState) String() string { return proto.CompactTextString(m) } func (*HardState) ProtoMessage() {} func init() { + proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value) } func (m *Info) Unmarshal(data []byte) error { l := len(data) @@ -180,7 +214,7 @@ func (m *Entry) Unmarshal(data []byte) error { } b := data[index] index++ - m.Type |= (int64(b) & 0x7F) << shift + m.Type |= (EntryType(b) & 0x7F) << shift if b < 0x80 { break } diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index f5249c1a5..6eff384ca 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -6,16 +6,22 @@ option (gogoproto.marshaler_all) = true; option (gogoproto.sizer_all) = true; option (gogoproto.unmarshaler_all) = true; option (gogoproto.goproto_getters_all) = false; +option (gogoproto.goproto_enum_prefix_all) = false; message Info { required int64 id = 1 [(gogoproto.nullable) = false]; } +enum EntryType { + EntryNormal = 0; + EntryConfig = 1; +} + message Entry { - required int64 type = 1 [(gogoproto.nullable) = false]; - required int64 term = 2 [(gogoproto.nullable) = false]; - required int64 index = 3 [(gogoproto.nullable) = false]; - optional bytes data = 4 [(gogoproto.nullable) = false]; + required EntryType Type = 1 [(gogoproto.nullable) = false]; + required int64 Term = 2 [(gogoproto.nullable) = false]; + required int64 Index = 3 [(gogoproto.nullable) = false]; + optional bytes Data = 4 [(gogoproto.nullable) = false]; } message Snapshot { From 4203569da2c99512c07e648d3978e55c2e139310 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sat, 20 Sep 2014 22:56:28 -0700 Subject: [PATCH 06/11] etcdserver: use ConfigType in protobuf --- etcdserver/etcdserverpb/etcdserver.pb.go | 46 ++++++++++++++++++++---- etcdserver/etcdserverpb/etcdserver.proto | 16 ++++++--- etcdserver/server.go | 13 +++---- 3 files changed, 55 insertions(+), 20 deletions(-) diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index 07179d5e4..f0157bdce 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -28,6 +28,39 @@ var _ = proto.Marshal var _ = &json.SyntaxError{} var _ = math.Inf +type ConfigType int32 + +const ( + ConfigAddNode ConfigType = 0 + ConfigRemoveNode ConfigType = 1 +) + +var ConfigType_name = map[int32]string{ + 0: "ConfigAddNode", + 1: "ConfigRemoveNode", +} +var ConfigType_value = map[string]int32{ + "ConfigAddNode": 0, + "ConfigRemoveNode": 1, +} + +func (x ConfigType) Enum() *ConfigType { + p := new(ConfigType) + *p = x + return p +} +func (x ConfigType) String() string { + return proto.EnumName(ConfigType_name, int32(x)) +} +func (x *ConfigType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(ConfigType_value, data, "ConfigType") + if err != nil { + return err + } + *x = ConfigType(value) + return nil +} + type Request struct { Id int64 `protobuf:"varint,1,req,name=id" json:"id"` Method string `protobuf:"bytes,2,req,name=method" json:"method"` @@ -52,11 +85,11 @@ func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} type Config struct { - ID int64 `protobuf:"varint,1,req" json:"ID"` - Type int64 `protobuf:"varint,2,req" json:"Type"` - NodeID int64 `protobuf:"varint,3,req" json:"NodeID"` - Context []byte `protobuf:"bytes,4,opt" json:"Context"` - XXX_unrecognized []byte `json:"-"` + ID int64 `protobuf:"varint,1,req" json:"ID"` + Type ConfigType `protobuf:"varint,2,req,enum=etcdserverpb.ConfigType" json:"Type"` + NodeID int64 `protobuf:"varint,3,req" json:"NodeID"` + Context []byte `protobuf:"bytes,4,opt" json:"Context"` + XXX_unrecognized []byte `json:"-"` } func (m *Config) Reset() { *m = Config{} } @@ -64,6 +97,7 @@ func (m *Config) String() string { return proto.CompactTextString(m) } func (*Config) ProtoMessage() {} func init() { + proto.RegisterEnum("etcdserverpb.ConfigType", ConfigType_name, ConfigType_value) } func (m *Request) Unmarshal(data []byte) error { l := len(data) @@ -417,7 +451,7 @@ func (m *Config) Unmarshal(data []byte) error { } b := data[index] index++ - m.Type |= (int64(b) & 0x7F) << shift + m.Type |= (ConfigType(b) & 0x7F) << shift if b < 0x80 { break } diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index 24b5996cd..243dc6fe4 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -6,6 +6,7 @@ option (gogoproto.marshaler_all) = true; option (gogoproto.sizer_all) = true; option (gogoproto.unmarshaler_all) = true; option (gogoproto.goproto_getters_all) = false; +option (gogoproto.goproto_enum_prefix_all) = false; message Request { required int64 id = 1 [(gogoproto.nullable) = false]; @@ -25,9 +26,14 @@ message Request { required int64 time = 15 [(gogoproto.nullable) = false]; } -message Config { - required int64 ID = 1 [(gogoproto.nullable) = false]; - required int64 Type = 2 [(gogoproto.nullable) = false]; - required int64 NodeID = 3 [(gogoproto.nullable) = false]; - optional bytes Context = 4 [(gogoproto.nullable) = false]; +enum ConfigType { + ConfigAddNode = 0; + ConfigRemoveNode = 1; +} + +message Config { + required int64 ID = 1 [(gogoproto.nullable) = false]; + required ConfigType Type = 2 [(gogoproto.nullable) = false]; + required int64 NodeID = 3 [(gogoproto.nullable) = false]; + optional bytes Context = 4 [(gogoproto.nullable) = false]; } diff --git a/etcdserver/server.go b/etcdserver/server.go index 7a45a9b38..4eac5a5b9 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -19,11 +19,6 @@ const ( DefaultSnapCount = 10000 ) -const ( - configAddNode int64 = iota - configRemoveNode -) - var ( ErrUnknownMethod = errors.New("etcdserver: unknown method") ErrStopped = errors.New("etcdserver: server stopped") @@ -238,7 +233,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error { req := pb.Config{ ID: GenID(), - Type: configAddNode, + Type: pb.ConfigAddNode, NodeID: id, Context: context, } @@ -248,7 +243,7 @@ func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) erro func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { req := pb.Config{ ID: GenID(), - Type: configRemoveNode, + Type: pb.ConfigRemoveNode, NodeID: id, } return s.configure(ctx, req) @@ -349,9 +344,9 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { func (s *EtcdServer) applyConfig(r pb.Config) { switch r.Type { - case configAddNode: + case pb.ConfigAddNode: s.Node.AddNode(r.NodeID) - case configRemoveNode: + case pb.ConfigRemoveNode: s.Node.RemoveNode(r.NodeID) default: // This should never be reached From dc36ae70585f3fa5021f2607ba15d262be81d9ed Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 22 Sep 2014 14:38:21 -0700 Subject: [PATCH 07/11] raft: use pb.Config instead of []byte for Configure --- etcdserver/etcdserverpb/etcdserver.pb.go | 202 ----------------------- etcdserver/etcdserverpb/etcdserver.proto | 13 -- etcdserver/server.go | 27 ++- etcdserver/server_test.go | 30 ++-- raft/node.go | 9 +- raft/raftpb/raft.pb.go | 202 +++++++++++++++++++++++ raft/raftpb/raft.proto | 12 ++ 7 files changed, 251 insertions(+), 244 deletions(-) diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index f0157bdce..a4176a31f 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -10,7 +10,6 @@ It has these top-level messages: Request - Config */ package etcdserverpb @@ -28,39 +27,6 @@ var _ = proto.Marshal var _ = &json.SyntaxError{} var _ = math.Inf -type ConfigType int32 - -const ( - ConfigAddNode ConfigType = 0 - ConfigRemoveNode ConfigType = 1 -) - -var ConfigType_name = map[int32]string{ - 0: "ConfigAddNode", - 1: "ConfigRemoveNode", -} -var ConfigType_value = map[string]int32{ - "ConfigAddNode": 0, - "ConfigRemoveNode": 1, -} - -func (x ConfigType) Enum() *ConfigType { - p := new(ConfigType) - *p = x - return p -} -func (x ConfigType) String() string { - return proto.EnumName(ConfigType_name, int32(x)) -} -func (x *ConfigType) UnmarshalJSON(data []byte) error { - value, err := proto.UnmarshalJSONEnum(ConfigType_value, data, "ConfigType") - if err != nil { - return err - } - *x = ConfigType(value) - return nil -} - type Request struct { Id int64 `protobuf:"varint,1,req,name=id" json:"id"` Method string `protobuf:"bytes,2,req,name=method" json:"method"` @@ -84,20 +50,7 @@ func (m *Request) Reset() { *m = Request{} } func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} -type Config struct { - ID int64 `protobuf:"varint,1,req" json:"ID"` - Type ConfigType `protobuf:"varint,2,req,enum=etcdserverpb.ConfigType" json:"Type"` - NodeID int64 `protobuf:"varint,3,req" json:"NodeID"` - Context []byte `protobuf:"bytes,4,opt" json:"Context"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *Config) Reset() { *m = Config{} } -func (m *Config) String() string { return proto.CompactTextString(m) } -func (*Config) ProtoMessage() {} - func init() { - proto.RegisterEnum("etcdserverpb.ConfigType", ConfigType_name, ConfigType_value) } func (m *Request) Unmarshal(data []byte) error { l := len(data) @@ -407,115 +360,6 @@ func (m *Request) Unmarshal(data []byte) error { } return nil } -func (m *Config) Unmarshal(data []byte) error { - l := len(data) - index := 0 - for index < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - switch fieldNum { - case 1: - if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType - } - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - m.ID |= (int64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 2: - if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType - } - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - m.Type |= (ConfigType(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 3: - if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType - } - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - m.NodeID |= (int64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 4: - if wireType != 2 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - byteLen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - postIndex := index + byteLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Context = append(m.Context, data[index:postIndex]...) - index = postIndex - default: - var sizeOfWire int - for { - sizeOfWire++ - wire >>= 7 - if wire == 0 { - break - } - } - index -= sizeOfWire - skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) - if err != nil { - return err - } - if (index + skippy) > l { - return io.ErrUnexpectedEOF - } - m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) - index += skippy - } - } - return nil -} func (m *Request) Size() (n int) { var l int _ = l @@ -545,19 +389,6 @@ func (m *Request) Size() (n int) { } return n } -func (m *Config) Size() (n int) { - var l int - _ = l - n += 1 + sovEtcdserver(uint64(m.ID)) - n += 1 + sovEtcdserver(uint64(m.Type)) - n += 1 + sovEtcdserver(uint64(m.NodeID)) - l = len(m.Context) - n += 1 + l + sovEtcdserver(uint64(l)) - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} func sovEtcdserver(x uint64) (n int) { for { @@ -673,39 +504,6 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) { } return i, nil } -func (m *Config) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil -} - -func (m *Config) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - data[i] = 0x8 - i++ - i = encodeVarintEtcdserver(data, i, uint64(m.ID)) - data[i] = 0x10 - i++ - i = encodeVarintEtcdserver(data, i, uint64(m.Type)) - data[i] = 0x18 - i++ - i = encodeVarintEtcdserver(data, i, uint64(m.NodeID)) - data[i] = 0x22 - i++ - i = encodeVarintEtcdserver(data, i, uint64(len(m.Context))) - i += copy(data[i:], m.Context) - if m.XXX_unrecognized != nil { - i += copy(data[i:], m.XXX_unrecognized) - } - return i, nil -} func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index 243dc6fe4..bb34ac250 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -6,7 +6,6 @@ option (gogoproto.marshaler_all) = true; option (gogoproto.sizer_all) = true; option (gogoproto.unmarshaler_all) = true; option (gogoproto.goproto_getters_all) = false; -option (gogoproto.goproto_enum_prefix_all) = false; message Request { required int64 id = 1 [(gogoproto.nullable) = false]; @@ -25,15 +24,3 @@ message Request { required bool quorum = 14 [(gogoproto.nullable) = false]; required int64 time = 15 [(gogoproto.nullable) = false]; } - -enum ConfigType { - ConfigAddNode = 0; - ConfigRemoveNode = 1; -} - -message Config { - required int64 ID = 1 [(gogoproto.nullable) = false]; - required ConfigType Type = 2 [(gogoproto.nullable) = false]; - required int64 NodeID = 3 [(gogoproto.nullable) = false]; - optional bytes Context = 4 [(gogoproto.nullable) = false]; -} diff --git a/etcdserver/server.go b/etcdserver/server.go index 4eac5a5b9..c19d62356 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -129,7 +129,7 @@ func (s *EtcdServer) run() { } s.w.Trigger(r.Id, s.applyRequest(r)) case raftpb.EntryConfig: - var c pb.Config + var c raftpb.Config if err := c.Unmarshal(e.Data); err != nil { panic("TODO: this is bad, what do we do about it?") } @@ -231,9 +231,9 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error { - req := pb.Config{ + req := raftpb.Config{ ID: GenID(), - Type: pb.ConfigAddNode, + Type: raftpb.ConfigAddNode, NodeID: id, Context: context, } @@ -241,9 +241,9 @@ func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) erro } func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { - req := pb.Config{ + req := raftpb.Config{ ID: GenID(), - Type: pb.ConfigRemoveNode, + Type: raftpb.ConfigRemoveNode, NodeID: id, } return s.configure(ctx, req) @@ -251,14 +251,13 @@ func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { // configure sends configuration change through consensus then performs it. // It will block until the change is performed or there is an error. -func (s *EtcdServer) configure(ctx context.Context, r pb.Config) error { - data, err := r.Marshal() - if err != nil { - log.Printf("marshal request %#v error: %v", r, err) +func (s *EtcdServer) configure(ctx context.Context, r raftpb.Config) error { + ch := s.w.Register(r.ID) + if err := s.Node.Configure(ctx, r); err != nil { + log.Printf("configure error: %v", err) + s.w.Trigger(r.ID, nil) return err } - ch := s.w.Register(r.ID) - s.Node.Configure(ctx, data) select { case <-ch: return nil @@ -342,11 +341,11 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { } } -func (s *EtcdServer) applyConfig(r pb.Config) { +func (s *EtcdServer) applyConfig(r raftpb.Config) { switch r.Type { - case pb.ConfigAddNode: + case raftpb.ConfigAddNode: s.Node.AddNode(r.NodeID) - case pb.ConfigRemoveNode: + case raftpb.ConfigRemoveNode: s.Node.RemoveNode(r.NodeID) default: // This should never be reached diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 87845559c..34f88c81b 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -802,16 +802,16 @@ func newReadyNode() *readyNode { readyc := make(chan raft.Ready, 1) return &readyNode{readyc: readyc} } -func (n *readyNode) Tick() {} -func (n *readyNode) Campaign(ctx context.Context) error { return nil } -func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil } -func (n *readyNode) Configure(ctx context.Context, data []byte) error { return nil } -func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil } -func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } -func (n *readyNode) Stop() {} -func (n *readyNode) Compact(d []byte) {} -func (n *readyNode) AddNode(id int64) {} -func (n *readyNode) RemoveNode(id int64) {} +func (n *readyNode) Tick() {} +func (n *readyNode) Campaign(ctx context.Context) error { return nil } +func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil } +func (n *readyNode) Configure(ctx context.Context, conf raftpb.Config) error { return nil } +func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil } +func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } +func (n *readyNode) Stop() {} +func (n *readyNode) Compact(d []byte) {} +func (n *readyNode) AddNode(id int64) {} +func (n *readyNode) RemoveNode(id int64) {} type nodeRecorder struct { recorder @@ -828,7 +828,7 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error { n.record("Propose") return nil } -func (n *nodeRecorder) Configure(ctx context.Context, data []byte) error { +func (n *nodeRecorder) Configure(ctx context.Context, conf raftpb.Config) error { n.record("Configure") return nil } @@ -894,9 +894,13 @@ func (n *nodeCommitterRecorder) Propose(ctx context.Context, data []byte) error n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Data: data}}} return n.nodeRecorder.Propose(ctx, data) } -func (n *nodeCommitterRecorder) Configure(ctx context.Context, data []byte) error { +func (n *nodeCommitterRecorder) Configure(ctx context.Context, conf raftpb.Config) error { + data, err := conf.Marshal() + if err != nil { + return err + } n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfig, Data: data}}} - return n.nodeRecorder.Configure(ctx, data) + return n.nodeRecorder.Configure(ctx, conf) } func (n *nodeCommitterRecorder) Ready() <-chan raft.Ready { return n.readyc diff --git a/raft/node.go b/raft/node.go index 43c55b8f1..46001b1a3 100644 --- a/raft/node.go +++ b/raft/node.go @@ -81,7 +81,8 @@ type Node interface { // Propose proposes that data be appended to the log. Propose(ctx context.Context, data []byte) error // Configure proposes config change. Only one config can be in the process of going through consensus at a time. - Configure(ctx context.Context, data []byte) error + // Configure doesn't perform config change. + Configure(ctx context.Context, conf pb.Config) error // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error // Ready returns a channel that returns the current point-in-time state @@ -246,7 +247,11 @@ func (n *node) Propose(ctx context.Context, data []byte) error { return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}}) } -func (n *node) Configure(ctx context.Context, data []byte) error { +func (n *node) Configure(ctx context.Context, conf pb.Config) error { + data, err := conf.Marshal() + if err != nil { + return err + } return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig, Data: data}}}) } diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index c12a97417..1ba9a88d6 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -14,6 +14,7 @@ Snapshot Message HardState + Config */ package raftpb @@ -64,6 +65,39 @@ func (x *EntryType) UnmarshalJSON(data []byte) error { return nil } +type ConfigType int32 + +const ( + ConfigAddNode ConfigType = 0 + ConfigRemoveNode ConfigType = 1 +) + +var ConfigType_name = map[int32]string{ + 0: "ConfigAddNode", + 1: "ConfigRemoveNode", +} +var ConfigType_value = map[string]int32{ + "ConfigAddNode": 0, + "ConfigRemoveNode": 1, +} + +func (x ConfigType) Enum() *ConfigType { + p := new(ConfigType) + *p = x + return p +} +func (x ConfigType) String() string { + return proto.EnumName(ConfigType_name, int32(x)) +} +func (x *ConfigType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(ConfigType_value, data, "ConfigType") + if err != nil { + return err + } + *x = ConfigType(value) + return nil +} + type Info struct { Id int64 `protobuf:"varint,1,req,name=id" json:"id"` XXX_unrecognized []byte `json:"-"` @@ -125,8 +159,21 @@ func (m *HardState) Reset() { *m = HardState{} } func (m *HardState) String() string { return proto.CompactTextString(m) } func (*HardState) ProtoMessage() {} +type Config struct { + ID int64 `protobuf:"varint,1,req" json:"ID"` + Type ConfigType `protobuf:"varint,2,req,enum=raftpb.ConfigType" json:"Type"` + NodeID int64 `protobuf:"varint,3,req" json:"NodeID"` + Context []byte `protobuf:"bytes,4,opt" json:"Context"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Config) Reset() { *m = Config{} } +func (m *Config) String() string { return proto.CompactTextString(m) } +func (*Config) ProtoMessage() {} + func init() { proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value) + proto.RegisterEnum("raftpb.ConfigType", ConfigType_name, ConfigType_value) } func (m *Info) Unmarshal(data []byte) error { l := len(data) @@ -686,6 +733,115 @@ func (m *HardState) Unmarshal(data []byte) error { } return nil } +func (m *Config) Unmarshal(data []byte) error { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.ID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Type |= (ConfigType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.NodeID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := index + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Context = append(m.Context, data[index:postIndex]...) + index = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + index -= sizeOfWire + skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) + if err != nil { + return err + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) + index += skippy + } + } + return nil +} func (m *Info) Size() (n int) { var l int _ = l @@ -759,6 +915,19 @@ func (m *HardState) Size() (n int) { } return n } +func (m *Config) Size() (n int) { + var l int + _ = l + n += 1 + sovRaft(uint64(m.ID)) + n += 1 + sovRaft(uint64(m.Type)) + n += 1 + sovRaft(uint64(m.NodeID)) + l = len(m.Context) + n += 1 + l + sovRaft(uint64(l)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} func sovRaft(x uint64) (n int) { for { @@ -962,6 +1131,39 @@ func (m *HardState) MarshalTo(data []byte) (n int, err error) { } return i, nil } +func (m *Config) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Config) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintRaft(data, i, uint64(m.ID)) + data[i] = 0x10 + i++ + i = encodeVarintRaft(data, i, uint64(m.Type)) + data[i] = 0x18 + i++ + i = encodeVarintRaft(data, i, uint64(m.NodeID)) + data[i] = 0x22 + i++ + i = encodeVarintRaft(data, i, uint64(len(m.Context))) + i += copy(data[i:], m.Context) + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} func encodeFixed64Raft(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 6eff384ca..38f0db45f 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -48,3 +48,15 @@ message HardState { required int64 vote = 2 [(gogoproto.nullable) = false]; required int64 commit = 3 [(gogoproto.nullable) = false]; } + +enum ConfigType { + ConfigAddNode = 0; + ConfigRemoveNode = 1; +} + +message Config { + required int64 ID = 1 [(gogoproto.nullable) = false]; + required ConfigType Type = 2 [(gogoproto.nullable) = false]; + required int64 NodeID = 3 [(gogoproto.nullable) = false]; + optional bytes Context = 4 [(gogoproto.nullable) = false]; +} From ec8f493fde6b227af6d287a25b0a9ed87ee7f890 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 22 Sep 2014 15:01:16 -0700 Subject: [PATCH 08/11] raft: refine comments for Configure --- etcdserver/server.go | 2 +- raft/doc.go | 18 +++++++++++------- raft/node.go | 4 ++-- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index c19d62356..0c75b18af 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -349,7 +349,7 @@ func (s *EtcdServer) applyConfig(r raftpb.Config) { s.Node.RemoveNode(r.NodeID) default: // This should never be reached - panic("unsupported config type") + panic("unexpected config type") } } diff --git a/raft/doc.go b/raft/doc.go index 53a42f1f7..5a13a87b9 100644 --- a/raft/doc.go +++ b/raft/doc.go @@ -61,16 +61,20 @@ data, serialize it into a byte slice and call: n.Propose(ctx, data) -To add or remove node in a cluster, serialize the data for configuration change -into a byte slice and call: +To add or remove node in a cluster, build Config struct and call: - n.Configure(ctx, data) + n.Configure(ctx, conf) -For the safety consideration, one configuration should include at most one node -change, which is applied through: +After configuration is committed, you should apply it to node through: - n.AddNode(id) - n.RemoveNode(id) + var conf raftpb.Config + conf.Unmarshal(data) + switch conf.Type { + case raftpb.ConfigAddNode: + n.AddNode(conf.ID) + case raftpb.ConfigRemoveNode: + n.RemoveNode(conf.ID) + } */ package raft diff --git a/raft/node.go b/raft/node.go index 46001b1a3..a8c6385e7 100644 --- a/raft/node.go +++ b/raft/node.go @@ -80,8 +80,8 @@ type Node interface { Campaign(ctx context.Context) error // Propose proposes that data be appended to the log. Propose(ctx context.Context, data []byte) error - // Configure proposes config change. Only one config can be in the process of going through consensus at a time. - // Configure doesn't perform config change. + // Configure proposes config change. At most one config can be in the process of going through consensus. + // Application needs to call AddNode/RemoveNode when applying EntryConfig type entry. Configure(ctx context.Context, conf pb.Config) error // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error From d92931853ec4ae18f78b9e0b9dc3c16eddcfa712 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 22 Sep 2014 21:59:13 -0700 Subject: [PATCH 09/11] raft: Config -> ConfigChange Configure -> ProposeConfigChange AddNode, RemoveNode -> ApplyConfigChange --- etcdserver/server.go | 52 +++++++++++---------------- etcdserver/server_test.go | 72 ++++++++++++++++++------------------- raft/doc.go | 21 ++++++----- raft/node.go | 74 +++++++++++++++------------------------ raft/raft.go | 4 +-- raft/raft_test.go | 16 ++++----- raft/raftpb/raft.pb.go | 74 +++++++++++++++++++-------------------- raft/raftpb/raft.proto | 20 +++++------ 8 files changed, 151 insertions(+), 182 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 0c75b18af..420f8b038 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -127,16 +127,16 @@ func (s *EtcdServer) run() { if err := r.Unmarshal(e.Data); err != nil { panic("TODO: this is bad, what do we do about it?") } - s.w.Trigger(r.Id, s.applyRequest(r)) - case raftpb.EntryConfig: - var c raftpb.Config - if err := c.Unmarshal(e.Data); err != nil { + s.w.Trigger(r.Id, s.apply(r)) + case raftpb.EntryConfigChange: + var cc raftpb.ConfigChange + if err := cc.Unmarshal(e.Data); err != nil { panic("TODO: this is bad, what do we do about it?") } - s.applyConfig(c) - s.w.Trigger(c.ID, nil) + s.Node.ApplyConfigChange(cc) + s.w.Trigger(cc.ID, nil) default: - panic("unsupported entry type") + panic("unexpected entry type") } appliedi = e.Index } @@ -231,38 +231,38 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error { - req := raftpb.Config{ + cc := raftpb.ConfigChange{ ID: GenID(), - Type: raftpb.ConfigAddNode, + Type: raftpb.ConfigChangeAddNode, NodeID: id, Context: context, } - return s.configure(ctx, req) + return s.configure(ctx, cc) } func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { - req := raftpb.Config{ + cc := raftpb.ConfigChange{ ID: GenID(), - Type: raftpb.ConfigRemoveNode, + Type: raftpb.ConfigChangeRemoveNode, NodeID: id, } - return s.configure(ctx, req) + return s.configure(ctx, cc) } // configure sends configuration change through consensus then performs it. // It will block until the change is performed or there is an error. -func (s *EtcdServer) configure(ctx context.Context, r raftpb.Config) error { - ch := s.w.Register(r.ID) - if err := s.Node.Configure(ctx, r); err != nil { +func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfigChange) error { + ch := s.w.Register(cc.ID) + if err := s.Node.ProposeConfigChange(ctx, cc); err != nil { log.Printf("configure error: %v", err) - s.w.Trigger(r.ID, nil) + s.w.Trigger(cc.ID, nil) return err } select { case <-ch: return nil case <-ctx.Done(): - s.w.Trigger(r.ID, nil) // GC wait + s.w.Trigger(cc.ID, nil) // GC wait return ctx.Err() case <-s.done: return ErrStopped @@ -300,8 +300,8 @@ func getExpirationTime(r *pb.Request) time.Time { return t } -// applyRequest interprets r as a call to store.X and returns an Response interpreted from store.Event -func (s *EtcdServer) applyRequest(r pb.Request) Response { +// apply interprets r as a call to store.X and returns an Response interpreted from store.Event +func (s *EtcdServer) apply(r pb.Request) Response { f := func(ev *store.Event, err error) Response { return Response{Event: ev, err: err} } @@ -341,18 +341,6 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { } } -func (s *EtcdServer) applyConfig(r raftpb.Config) { - switch r.Type { - case raftpb.ConfigAddNode: - s.Node.AddNode(r.NodeID) - case raftpb.ConfigRemoveNode: - s.Node.RemoveNode(r.NodeID) - default: - // This should never be reached - panic("unexpected config type") - } -} - // TODO: non-blocking snapshot func (s *EtcdServer) snapshot() { d, err := s.Store.Save() diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 34f88c81b..7577dc306 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -120,7 +120,7 @@ func TestDoBadLocalAction(t *testing.T) { } } -func TestApplyRequest(t *testing.T) { +func TestApply(t *testing.T) { tests := []struct { req pb.Request @@ -188,7 +188,7 @@ func TestApplyRequest(t *testing.T) { for i, tt := range tests { st := &storeRecorder{} srv := &EtcdServer{Store: st} - resp := srv.applyRequest(tt.req) + resp := srv.apply(tt.req) if !reflect.DeepEqual(resp, tt.wresp) { t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp) @@ -594,9 +594,9 @@ func TestRecvSlowSnapshot(t *testing.T) { } } -// TestAddNode tests AddNode could propose configuration and add node to raft. +// TestAddNode tests AddNode could propose and perform node addition. func TestAddNode(t *testing.T) { - n := newNodeCommitterRecorder() + n := newNodeConfigChangeCommitterRecorder() s := &EtcdServer{ Node: n, Store: &storeRecorder{}, @@ -608,15 +608,15 @@ func TestAddNode(t *testing.T) { action := n.Action() s.Stop() - waction := []string{"Configure", "AddNode"} + waction := []string{"ProposeConfigChange:ConfigChangeAddNode", "ApplyConfigChange:ConfigChangeAddNode"} if !reflect.DeepEqual(action, waction) { t.Errorf("action = %v, want %v", action, waction) } } -// TestRemoveNode tests RemoveNode could propose configuration and remove node from raft. +// TestRemoveNode tests RemoveNode could propose and perform node removal. func TestRemoveNode(t *testing.T) { - n := newNodeCommitterRecorder() + n := newNodeConfigChangeCommitterRecorder() s := &EtcdServer{ Node: n, Store: &storeRecorder{}, @@ -628,7 +628,7 @@ func TestRemoveNode(t *testing.T) { action := n.Action() s.Stop() - waction := []string{"Configure", "RemoveNode"} + waction := []string{"ProposeConfigChange:ConfigChangeRemoveNode", "ApplyConfigChange:ConfigChangeRemoveNode"} if !reflect.DeepEqual(action, waction) { t.Errorf("action = %v, want %v", action, waction) } @@ -802,16 +802,17 @@ func newReadyNode() *readyNode { readyc := make(chan raft.Ready, 1) return &readyNode{readyc: readyc} } -func (n *readyNode) Tick() {} -func (n *readyNode) Campaign(ctx context.Context) error { return nil } -func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil } -func (n *readyNode) Configure(ctx context.Context, conf raftpb.Config) error { return nil } -func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil } -func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } -func (n *readyNode) Stop() {} -func (n *readyNode) Compact(d []byte) {} -func (n *readyNode) AddNode(id int64) {} -func (n *readyNode) RemoveNode(id int64) {} +func (n *readyNode) Tick() {} +func (n *readyNode) Campaign(ctx context.Context) error { return nil } +func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil } +func (n *readyNode) ProposeConfigChange(ctx context.Context, conf raftpb.ConfigChange) error { + return nil +} +func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil } +func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } +func (n *readyNode) ApplyConfigChange(conf raftpb.ConfigChange) {} +func (n *readyNode) Stop() {} +func (n *readyNode) Compact(d []byte) {} type nodeRecorder struct { recorder @@ -828,8 +829,8 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error { n.record("Propose") return nil } -func (n *nodeRecorder) Configure(ctx context.Context, conf raftpb.Config) error { - n.record("Configure") +func (n *nodeRecorder) ProposeConfigChange(ctx context.Context, conf raftpb.ConfigChange) error { + n.record("ProposeConfigChange") return nil } func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error { @@ -837,18 +838,15 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error { return nil } func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil } +func (n *nodeRecorder) ApplyConfigChange(conf raftpb.ConfigChange) { + n.record("ApplyConfigChange") +} func (n *nodeRecorder) Stop() { n.record("Stop") } func (n *nodeRecorder) Compact(d []byte) { n.record("Compact") } -func (n *nodeRecorder) AddNode(id int64) { - n.record("AddNode") -} -func (n *nodeRecorder) RemoveNode(id int64) { - n.record("RemoveNode") -} type nodeProposeDataRecorder struct { nodeRecorder @@ -880,28 +878,28 @@ func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) return nil } -type nodeCommitterRecorder struct { +type nodeConfigChangeCommitterRecorder struct { nodeRecorder readyc chan raft.Ready } -func newNodeCommitterRecorder() *nodeCommitterRecorder { +func newNodeConfigChangeCommitterRecorder() *nodeConfigChangeCommitterRecorder { readyc := make(chan raft.Ready, 1) readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateLeader}} - return &nodeCommitterRecorder{readyc: readyc} + return &nodeConfigChangeCommitterRecorder{readyc: readyc} } -func (n *nodeCommitterRecorder) Propose(ctx context.Context, data []byte) error { - n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Data: data}}} - return n.nodeRecorder.Propose(ctx, data) -} -func (n *nodeCommitterRecorder) Configure(ctx context.Context, conf raftpb.Config) error { +func (n *nodeConfigChangeCommitterRecorder) ProposeConfigChange(ctx context.Context, conf raftpb.ConfigChange) error { data, err := conf.Marshal() if err != nil { return err } - n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfig, Data: data}}} - return n.nodeRecorder.Configure(ctx, conf) + n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfigChange, Data: data}}} + n.record("ProposeConfigChange:" + conf.Type.String()) + return nil } -func (n *nodeCommitterRecorder) Ready() <-chan raft.Ready { +func (n *nodeConfigChangeCommitterRecorder) Ready() <-chan raft.Ready { return n.readyc } +func (n *nodeConfigChangeCommitterRecorder) ApplyConfigChange(conf raftpb.ConfigChange) { + n.record("ApplyConfigChange:" + conf.Type.String()) +} diff --git a/raft/doc.go b/raft/doc.go index 5a13a87b9..878a33645 100644 --- a/raft/doc.go +++ b/raft/doc.go @@ -61,20 +61,19 @@ data, serialize it into a byte slice and call: n.Propose(ctx, data) -To add or remove node in a cluster, build Config struct and call: +If the proposal is committed, data will appear in committed entries with type +raftpb.EntryNormal. - n.Configure(ctx, conf) +To add or remove node in a cluster, build ConfigChange struct 'cc' and call: -After configuration is committed, you should apply it to node through: + n.ProposeConfigChange(ctx, cc) - var conf raftpb.Config - conf.Unmarshal(data) - switch conf.Type { - case raftpb.ConfigAddNode: - n.AddNode(conf.ID) - case raftpb.ConfigRemoveNode: - n.RemoveNode(conf.ID) - } +After config change is committed, some committed entry with type +raftpb.EntryConfigChange will be returned. You should apply it to node through: + + var cc raftpb.ConfigChange + cc.Unmarshal(data) + n.ApplyConfigChange(cc) */ package raft diff --git a/raft/node.go b/raft/node.go index a8c6385e7..f23892248 100644 --- a/raft/node.go +++ b/raft/node.go @@ -80,23 +80,22 @@ type Node interface { Campaign(ctx context.Context) error // Propose proposes that data be appended to the log. Propose(ctx context.Context, data []byte) error - // Configure proposes config change. At most one config can be in the process of going through consensus. - // Application needs to call AddNode/RemoveNode when applying EntryConfig type entry. - Configure(ctx context.Context, conf pb.Config) error + // ProposeConfigChange proposes config change. + // At most one ConfigChange can be in the process of going through consensus. + // Application needs to call ApplyConfigChange when applying EntryConfigChange type entry. + ProposeConfigChange(ctx context.Context, cc pb.ConfigChange) error // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error // Ready returns a channel that returns the current point-in-time state Ready() <-chan Ready + // ApplyConfigChange applies config change to the local node. + // TODO: reject existing node when add node + // TODO: reject non-existant node when remove node + ApplyConfigChange(cc pb.ConfigChange) // Stop performs any necessary termination of the Node Stop() // Compact Compact(d []byte) - // AddNode adds a node with given id into peer list. - // TODO: reject existed node - AddNode(id int64) - // RemoveNode removes a node with give id from peer list. - // TODO: reject unexisted node - RemoveNode(id int64) } // StartNode returns a new Node given a unique raft id, a list of raft peers, and @@ -123,22 +122,12 @@ func RestartNode(id int64, peers []int64, election, heartbeat int, snapshot *pb. return &n } -const ( - confAdd = iota - confRemove -) - -type conf struct { - typ int - id int64 -} - // node is the canonical implementation of the Node interface type node struct { propc chan pb.Message recvc chan pb.Message compactc chan []byte - confc chan conf + confc chan pb.ConfigChange readyc chan Ready tickc chan struct{} done chan struct{} @@ -149,7 +138,7 @@ func newNode() node { propc: make(chan pb.Message), recvc: make(chan pb.Message), compactc: make(chan []byte), - confc: make(chan conf), + confc: make(chan pb.ConfigChange), readyc: make(chan Ready), tickc: make(chan struct{}), done: make(chan struct{}), @@ -188,7 +177,9 @@ func (n *node) run(r *raft) { } select { - // TODO: buffer the config propose if there exists one + // TODO: maybe buffer the config propose if there exists one (the way + // described in raft dissertation) + // Currently it is dropped in Step silently. case m := <-propc: m.From = r.id r.Step(m) @@ -196,12 +187,12 @@ func (n *node) run(r *raft) { r.Step(m) // raft never returns an error case d := <-n.compactc: r.compact(d) - case c := <-n.confc: - switch c.typ { - case confAdd: - r.addNode(c.id) - case confRemove: - r.removeNode(c.id) + case cc := <-n.confc: + switch cc.Type { + case pb.ConfigChangeAddNode: + r.addNode(cc.NodeID) + case pb.ConfigChangeRemoveNode: + r.removeNode(cc.NodeID) default: panic("unexpected conf type") } @@ -247,12 +238,12 @@ func (n *node) Propose(ctx context.Context, data []byte) error { return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}}) } -func (n *node) Configure(ctx context.Context, conf pb.Config) error { - data, err := conf.Marshal() +func (n *node) ProposeConfigChange(ctx context.Context, cc pb.ConfigChange) error { + data, err := cc.Marshal() if err != nil { return err } - return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig, Data: data}}}) + return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange, Data: data}}}) } // Step advances the state machine using msgs. The ctx.Err() will be returned, @@ -277,6 +268,13 @@ func (n *node) Ready() <-chan Ready { return n.readyc } +func (n *node) ApplyConfigChange(cc pb.ConfigChange) { + select { + case n.confc <- cc: + case <-n.done: + } +} + func (n *node) Compact(d []byte) { select { case n.compactc <- d: @@ -284,20 +282,6 @@ func (n *node) Compact(d []byte) { } } -func (n *node) AddNode(id int64) { - select { - case n.confc <- conf{typ: confAdd, id: id}: - case <-n.done: - } -} - -func (n *node) RemoveNode(id int64) { - select { - case n.confc <- conf{typ: confRemove, id: id}: - case <-n.done: - } -} - func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi int64) Ready { rd := Ready{ Entries: r.raftLog.unstableEnts(), diff --git a/raft/raft.go b/raft/raft.go index ddd9eb1e7..ede8e2634 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -313,7 +313,7 @@ func (r *raft) becomeLeader() { r.lead = r.id r.state = StateLeader for _, e := range r.raftLog.entries(r.raftLog.committed + 1) { - if e.Type != pb.EntryConfig { + if e.Type != pb.EntryConfigChange { continue } if r.pendingConf { @@ -407,7 +407,7 @@ func stepLeader(r *raft, m pb.Message) { panic("unexpected length(entries) of a msgProp") } e := m.Entries[0] - if e.Type == pb.EntryConfig { + if e.Type == pb.EntryConfigChange { if r.pendingConf { return } diff --git a/raft/raft_test.go b/raft/raft_test.go index 1d56c4ff8..dade6004d 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -948,7 +948,7 @@ func TestSlowNodeRestore(t *testing.T) { } } -// TestStepConfig tests that when raft step msgProp in ConfigEntry type, +// TestStepConfig tests that when raft step msgProp in EntryConfigChange type, // it appends the entry to log and sets pendingConf to be true. func TestStepConfig(t *testing.T) { // a raft that cannot make progress @@ -956,7 +956,7 @@ func TestStepConfig(t *testing.T) { r.becomeCandidate() r.becomeLeader() index := r.raftLog.lastIndex() - r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig}}}) + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange}}}) if g := r.raftLog.lastIndex(); g != index+1 { t.Errorf("index = %d, want %d", g, index+1) } @@ -966,17 +966,17 @@ func TestStepConfig(t *testing.T) { } // TestStepIgnoreConfig tests that if raft step the second msgProp in -// ConfigEntry type when the first one is uncommitted, the node will deny +// EntryConfigChange type when the first one is uncommitted, the node will deny // the proposal and keep its original state. func TestStepIgnoreConfig(t *testing.T) { // a raft that cannot make progress r := newRaft(1, []int64{1, 2}, 0, 0) r.becomeCandidate() r.becomeLeader() - r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig}}}) + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange}}}) index := r.raftLog.lastIndex() pendingConf := r.pendingConf - r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig}}}) + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange}}}) if g := r.raftLog.lastIndex(); g != index { t.Errorf("index = %d, want %d", g, index) } @@ -993,7 +993,7 @@ func TestRecoverPendingConfig(t *testing.T) { wpending bool }{ {pb.EntryNormal, false}, - {pb.EntryConfig, true}, + {pb.EntryConfigChange, true}, } for i, tt := range tests { r := newRaft(1, []int64{1, 2}, 0, 0) @@ -1016,8 +1016,8 @@ func TestRecoverDoublePendingConfig(t *testing.T) { } }() r := newRaft(1, []int64{1, 2}, 0, 0) - r.appendEntry(pb.Entry{Type: pb.EntryConfig}) - r.appendEntry(pb.Entry{Type: pb.EntryConfig}) + r.appendEntry(pb.Entry{Type: pb.EntryConfigChange}) + r.appendEntry(pb.Entry{Type: pb.EntryConfigChange}) r.becomeCandidate() r.becomeLeader() }() diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 1ba9a88d6..0474b7cd8 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -14,7 +14,7 @@ Snapshot Message HardState - Config + ConfigChange */ package raftpb @@ -35,17 +35,17 @@ var _ = math.Inf type EntryType int32 const ( - EntryNormal EntryType = 0 - EntryConfig EntryType = 1 + EntryNormal EntryType = 0 + EntryConfigChange EntryType = 1 ) var EntryType_name = map[int32]string{ 0: "EntryNormal", - 1: "EntryConfig", + 1: "EntryConfigChange", } var EntryType_value = map[string]int32{ - "EntryNormal": 0, - "EntryConfig": 1, + "EntryNormal": 0, + "EntryConfigChange": 1, } func (x EntryType) Enum() *EntryType { @@ -65,36 +65,36 @@ func (x *EntryType) UnmarshalJSON(data []byte) error { return nil } -type ConfigType int32 +type ConfigChangeType int32 const ( - ConfigAddNode ConfigType = 0 - ConfigRemoveNode ConfigType = 1 + ConfigChangeAddNode ConfigChangeType = 0 + ConfigChangeRemoveNode ConfigChangeType = 1 ) -var ConfigType_name = map[int32]string{ - 0: "ConfigAddNode", - 1: "ConfigRemoveNode", +var ConfigChangeType_name = map[int32]string{ + 0: "ConfigChangeAddNode", + 1: "ConfigChangeRemoveNode", } -var ConfigType_value = map[string]int32{ - "ConfigAddNode": 0, - "ConfigRemoveNode": 1, +var ConfigChangeType_value = map[string]int32{ + "ConfigChangeAddNode": 0, + "ConfigChangeRemoveNode": 1, } -func (x ConfigType) Enum() *ConfigType { - p := new(ConfigType) +func (x ConfigChangeType) Enum() *ConfigChangeType { + p := new(ConfigChangeType) *p = x return p } -func (x ConfigType) String() string { - return proto.EnumName(ConfigType_name, int32(x)) +func (x ConfigChangeType) String() string { + return proto.EnumName(ConfigChangeType_name, int32(x)) } -func (x *ConfigType) UnmarshalJSON(data []byte) error { - value, err := proto.UnmarshalJSONEnum(ConfigType_value, data, "ConfigType") +func (x *ConfigChangeType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(ConfigChangeType_value, data, "ConfigChangeType") if err != nil { return err } - *x = ConfigType(value) + *x = ConfigChangeType(value) return nil } @@ -159,21 +159,21 @@ func (m *HardState) Reset() { *m = HardState{} } func (m *HardState) String() string { return proto.CompactTextString(m) } func (*HardState) ProtoMessage() {} -type Config struct { - ID int64 `protobuf:"varint,1,req" json:"ID"` - Type ConfigType `protobuf:"varint,2,req,enum=raftpb.ConfigType" json:"Type"` - NodeID int64 `protobuf:"varint,3,req" json:"NodeID"` - Context []byte `protobuf:"bytes,4,opt" json:"Context"` - XXX_unrecognized []byte `json:"-"` +type ConfigChange struct { + ID int64 `protobuf:"varint,1,req" json:"ID"` + Type ConfigChangeType `protobuf:"varint,2,req,enum=raftpb.ConfigChangeType" json:"Type"` + NodeID int64 `protobuf:"varint,3,req" json:"NodeID"` + Context []byte `protobuf:"bytes,4,opt" json:"Context"` + XXX_unrecognized []byte `json:"-"` } -func (m *Config) Reset() { *m = Config{} } -func (m *Config) String() string { return proto.CompactTextString(m) } -func (*Config) ProtoMessage() {} +func (m *ConfigChange) Reset() { *m = ConfigChange{} } +func (m *ConfigChange) String() string { return proto.CompactTextString(m) } +func (*ConfigChange) ProtoMessage() {} func init() { proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value) - proto.RegisterEnum("raftpb.ConfigType", ConfigType_name, ConfigType_value) + proto.RegisterEnum("raftpb.ConfigChangeType", ConfigChangeType_name, ConfigChangeType_value) } func (m *Info) Unmarshal(data []byte) error { l := len(data) @@ -733,7 +733,7 @@ func (m *HardState) Unmarshal(data []byte) error { } return nil } -func (m *Config) Unmarshal(data []byte) error { +func (m *ConfigChange) Unmarshal(data []byte) error { l := len(data) index := 0 for index < l { @@ -777,7 +777,7 @@ func (m *Config) Unmarshal(data []byte) error { } b := data[index] index++ - m.Type |= (ConfigType(b) & 0x7F) << shift + m.Type |= (ConfigChangeType(b) & 0x7F) << shift if b < 0x80 { break } @@ -915,7 +915,7 @@ func (m *HardState) Size() (n int) { } return n } -func (m *Config) Size() (n int) { +func (m *ConfigChange) Size() (n int) { var l int _ = l n += 1 + sovRaft(uint64(m.ID)) @@ -1131,7 +1131,7 @@ func (m *HardState) MarshalTo(data []byte) (n int, err error) { } return i, nil } -func (m *Config) Marshal() (data []byte, err error) { +func (m *ConfigChange) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) n, err := m.MarshalTo(data) @@ -1141,7 +1141,7 @@ func (m *Config) Marshal() (data []byte, err error) { return data[:n], nil } -func (m *Config) MarshalTo(data []byte) (n int, err error) { +func (m *ConfigChange) MarshalTo(data []byte) (n int, err error) { var i int _ = i var l int diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 38f0db45f..dcc34587a 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -13,8 +13,8 @@ message Info { } enum EntryType { - EntryNormal = 0; - EntryConfig = 1; + EntryNormal = 0; + EntryConfigChange = 1; } message Entry { @@ -49,14 +49,14 @@ message HardState { required int64 commit = 3 [(gogoproto.nullable) = false]; } -enum ConfigType { - ConfigAddNode = 0; - ConfigRemoveNode = 1; +enum ConfigChangeType { + ConfigChangeAddNode = 0; + ConfigChangeRemoveNode = 1; } -message Config { - required int64 ID = 1 [(gogoproto.nullable) = false]; - required ConfigType Type = 2 [(gogoproto.nullable) = false]; - required int64 NodeID = 3 [(gogoproto.nullable) = false]; - optional bytes Context = 4 [(gogoproto.nullable) = false]; +message ConfigChange { + required int64 ID = 1 [(gogoproto.nullable) = false]; + required ConfigChangeType Type = 2 [(gogoproto.nullable) = false]; + required int64 NodeID = 3 [(gogoproto.nullable) = false]; + optional bytes Context = 4 [(gogoproto.nullable) = false]; } From bc7b0108dc88960cc71d5b76a96deed351d3fd72 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 23 Sep 2014 12:02:44 -0700 Subject: [PATCH 10/11] raft: ConfigChange -> ConfChange --- etcdserver/server.go | 18 +++++----- etcdserver/server_test.go | 38 ++++++++++---------- raft/doc.go | 10 +++--- raft/node.go | 26 +++++++------- raft/raft.go | 4 +-- raft/raft_test.go | 16 ++++----- raft/raftpb/raft.pb.go | 74 +++++++++++++++++++-------------------- raft/raftpb/raft.proto | 20 +++++------ 8 files changed, 103 insertions(+), 103 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 420f8b038..b0f16a1c9 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -128,12 +128,12 @@ func (s *EtcdServer) run() { panic("TODO: this is bad, what do we do about it?") } s.w.Trigger(r.Id, s.apply(r)) - case raftpb.EntryConfigChange: - var cc raftpb.ConfigChange + case raftpb.EntryConfChange: + var cc raftpb.ConfChange if err := cc.Unmarshal(e.Data); err != nil { panic("TODO: this is bad, what do we do about it?") } - s.Node.ApplyConfigChange(cc) + s.Node.ApplyConfChange(cc) s.w.Trigger(cc.ID, nil) default: panic("unexpected entry type") @@ -231,9 +231,9 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error { - cc := raftpb.ConfigChange{ + cc := raftpb.ConfChange{ ID: GenID(), - Type: raftpb.ConfigChangeAddNode, + Type: raftpb.ConfChangeAddNode, NodeID: id, Context: context, } @@ -241,9 +241,9 @@ func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) erro } func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { - cc := raftpb.ConfigChange{ + cc := raftpb.ConfChange{ ID: GenID(), - Type: raftpb.ConfigChangeRemoveNode, + Type: raftpb.ConfChangeRemoveNode, NodeID: id, } return s.configure(ctx, cc) @@ -251,9 +251,9 @@ func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { // configure sends configuration change through consensus then performs it. // It will block until the change is performed or there is an error. -func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfigChange) error { +func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error { ch := s.w.Register(cc.ID) - if err := s.Node.ProposeConfigChange(ctx, cc); err != nil { + if err := s.Node.ProposeConfChange(ctx, cc); err != nil { log.Printf("configure error: %v", err) s.w.Trigger(cc.ID, nil) return err diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 7577dc306..6661caccb 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -596,7 +596,7 @@ func TestRecvSlowSnapshot(t *testing.T) { // TestAddNode tests AddNode could propose and perform node addition. func TestAddNode(t *testing.T) { - n := newNodeConfigChangeCommitterRecorder() + n := newNodeConfChangeCommitterRecorder() s := &EtcdServer{ Node: n, Store: &storeRecorder{}, @@ -608,7 +608,7 @@ func TestAddNode(t *testing.T) { action := n.Action() s.Stop() - waction := []string{"ProposeConfigChange:ConfigChangeAddNode", "ApplyConfigChange:ConfigChangeAddNode"} + waction := []string{"ProposeConfChange:ConfChangeAddNode", "ApplyConfChange:ConfChangeAddNode"} if !reflect.DeepEqual(action, waction) { t.Errorf("action = %v, want %v", action, waction) } @@ -616,7 +616,7 @@ func TestAddNode(t *testing.T) { // TestRemoveNode tests RemoveNode could propose and perform node removal. func TestRemoveNode(t *testing.T) { - n := newNodeConfigChangeCommitterRecorder() + n := newNodeConfChangeCommitterRecorder() s := &EtcdServer{ Node: n, Store: &storeRecorder{}, @@ -628,7 +628,7 @@ func TestRemoveNode(t *testing.T) { action := n.Action() s.Stop() - waction := []string{"ProposeConfigChange:ConfigChangeRemoveNode", "ApplyConfigChange:ConfigChangeRemoveNode"} + waction := []string{"ProposeConfChange:ConfChangeRemoveNode", "ApplyConfChange:ConfChangeRemoveNode"} if !reflect.DeepEqual(action, waction) { t.Errorf("action = %v, want %v", action, waction) } @@ -805,12 +805,12 @@ func newReadyNode() *readyNode { func (n *readyNode) Tick() {} func (n *readyNode) Campaign(ctx context.Context) error { return nil } func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil } -func (n *readyNode) ProposeConfigChange(ctx context.Context, conf raftpb.ConfigChange) error { +func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { return nil } func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil } func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } -func (n *readyNode) ApplyConfigChange(conf raftpb.ConfigChange) {} +func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) {} func (n *readyNode) Stop() {} func (n *readyNode) Compact(d []byte) {} @@ -829,8 +829,8 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error { n.record("Propose") return nil } -func (n *nodeRecorder) ProposeConfigChange(ctx context.Context, conf raftpb.ConfigChange) error { - n.record("ProposeConfigChange") +func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { + n.record("ProposeConfChange") return nil } func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error { @@ -838,8 +838,8 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error { return nil } func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil } -func (n *nodeRecorder) ApplyConfigChange(conf raftpb.ConfigChange) { - n.record("ApplyConfigChange") +func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) { + n.record("ApplyConfChange") } func (n *nodeRecorder) Stop() { n.record("Stop") @@ -878,28 +878,28 @@ func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) return nil } -type nodeConfigChangeCommitterRecorder struct { +type nodeConfChangeCommitterRecorder struct { nodeRecorder readyc chan raft.Ready } -func newNodeConfigChangeCommitterRecorder() *nodeConfigChangeCommitterRecorder { +func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder { readyc := make(chan raft.Ready, 1) readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateLeader}} - return &nodeConfigChangeCommitterRecorder{readyc: readyc} + return &nodeConfChangeCommitterRecorder{readyc: readyc} } -func (n *nodeConfigChangeCommitterRecorder) ProposeConfigChange(ctx context.Context, conf raftpb.ConfigChange) error { +func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { data, err := conf.Marshal() if err != nil { return err } - n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfigChange, Data: data}}} - n.record("ProposeConfigChange:" + conf.Type.String()) + n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange, Data: data}}} + n.record("ProposeConfChange:" + conf.Type.String()) return nil } -func (n *nodeConfigChangeCommitterRecorder) Ready() <-chan raft.Ready { +func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready { return n.readyc } -func (n *nodeConfigChangeCommitterRecorder) ApplyConfigChange(conf raftpb.ConfigChange) { - n.record("ApplyConfigChange:" + conf.Type.String()) +func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) { + n.record("ApplyConfChange:" + conf.Type.String()) } diff --git a/raft/doc.go b/raft/doc.go index 878a33645..3a0b22278 100644 --- a/raft/doc.go +++ b/raft/doc.go @@ -64,16 +64,16 @@ data, serialize it into a byte slice and call: If the proposal is committed, data will appear in committed entries with type raftpb.EntryNormal. -To add or remove node in a cluster, build ConfigChange struct 'cc' and call: +To add or remove node in a cluster, build ConfChange struct 'cc' and call: - n.ProposeConfigChange(ctx, cc) + n.ProposeConfChange(ctx, cc) After config change is committed, some committed entry with type -raftpb.EntryConfigChange will be returned. You should apply it to node through: +raftpb.EntryConfChange will be returned. You should apply it to node through: - var cc raftpb.ConfigChange + var cc raftpb.ConfChange cc.Unmarshal(data) - n.ApplyConfigChange(cc) + n.ApplyConfChange(cc) */ package raft diff --git a/raft/node.go b/raft/node.go index f23892248..f0ef4d260 100644 --- a/raft/node.go +++ b/raft/node.go @@ -80,18 +80,18 @@ type Node interface { Campaign(ctx context.Context) error // Propose proposes that data be appended to the log. Propose(ctx context.Context, data []byte) error - // ProposeConfigChange proposes config change. - // At most one ConfigChange can be in the process of going through consensus. - // Application needs to call ApplyConfigChange when applying EntryConfigChange type entry. - ProposeConfigChange(ctx context.Context, cc pb.ConfigChange) error + // ProposeConfChange proposes config change. + // At most one ConfChange can be in the process of going through consensus. + // Application needs to call ApplyConfChange when applying EntryConfChange type entry. + ProposeConfChange(ctx context.Context, cc pb.ConfChange) error // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error // Ready returns a channel that returns the current point-in-time state Ready() <-chan Ready - // ApplyConfigChange applies config change to the local node. + // ApplyConfChange applies config change to the local node. // TODO: reject existing node when add node // TODO: reject non-existant node when remove node - ApplyConfigChange(cc pb.ConfigChange) + ApplyConfChange(cc pb.ConfChange) // Stop performs any necessary termination of the Node Stop() // Compact @@ -127,7 +127,7 @@ type node struct { propc chan pb.Message recvc chan pb.Message compactc chan []byte - confc chan pb.ConfigChange + confc chan pb.ConfChange readyc chan Ready tickc chan struct{} done chan struct{} @@ -138,7 +138,7 @@ func newNode() node { propc: make(chan pb.Message), recvc: make(chan pb.Message), compactc: make(chan []byte), - confc: make(chan pb.ConfigChange), + confc: make(chan pb.ConfChange), readyc: make(chan Ready), tickc: make(chan struct{}), done: make(chan struct{}), @@ -189,9 +189,9 @@ func (n *node) run(r *raft) { r.compact(d) case cc := <-n.confc: switch cc.Type { - case pb.ConfigChangeAddNode: + case pb.ConfChangeAddNode: r.addNode(cc.NodeID) - case pb.ConfigChangeRemoveNode: + case pb.ConfChangeRemoveNode: r.removeNode(cc.NodeID) default: panic("unexpected conf type") @@ -238,12 +238,12 @@ func (n *node) Propose(ctx context.Context, data []byte) error { return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}}) } -func (n *node) ProposeConfigChange(ctx context.Context, cc pb.ConfigChange) error { +func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { data, err := cc.Marshal() if err != nil { return err } - return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange, Data: data}}}) + return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}}) } // Step advances the state machine using msgs. The ctx.Err() will be returned, @@ -268,7 +268,7 @@ func (n *node) Ready() <-chan Ready { return n.readyc } -func (n *node) ApplyConfigChange(cc pb.ConfigChange) { +func (n *node) ApplyConfChange(cc pb.ConfChange) { select { case n.confc <- cc: case <-n.done: diff --git a/raft/raft.go b/raft/raft.go index ede8e2634..eb69f903d 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -313,7 +313,7 @@ func (r *raft) becomeLeader() { r.lead = r.id r.state = StateLeader for _, e := range r.raftLog.entries(r.raftLog.committed + 1) { - if e.Type != pb.EntryConfigChange { + if e.Type != pb.EntryConfChange { continue } if r.pendingConf { @@ -407,7 +407,7 @@ func stepLeader(r *raft, m pb.Message) { panic("unexpected length(entries) of a msgProp") } e := m.Entries[0] - if e.Type == pb.EntryConfigChange { + if e.Type == pb.EntryConfChange { if r.pendingConf { return } diff --git a/raft/raft_test.go b/raft/raft_test.go index dade6004d..106d4fd34 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -948,7 +948,7 @@ func TestSlowNodeRestore(t *testing.T) { } } -// TestStepConfig tests that when raft step msgProp in EntryConfigChange type, +// TestStepConfig tests that when raft step msgProp in EntryConfChange type, // it appends the entry to log and sets pendingConf to be true. func TestStepConfig(t *testing.T) { // a raft that cannot make progress @@ -956,7 +956,7 @@ func TestStepConfig(t *testing.T) { r.becomeCandidate() r.becomeLeader() index := r.raftLog.lastIndex() - r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange}}}) + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) if g := r.raftLog.lastIndex(); g != index+1 { t.Errorf("index = %d, want %d", g, index+1) } @@ -966,17 +966,17 @@ func TestStepConfig(t *testing.T) { } // TestStepIgnoreConfig tests that if raft step the second msgProp in -// EntryConfigChange type when the first one is uncommitted, the node will deny +// EntryConfChange type when the first one is uncommitted, the node will deny // the proposal and keep its original state. func TestStepIgnoreConfig(t *testing.T) { // a raft that cannot make progress r := newRaft(1, []int64{1, 2}, 0, 0) r.becomeCandidate() r.becomeLeader() - r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange}}}) + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) index := r.raftLog.lastIndex() pendingConf := r.pendingConf - r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange}}}) + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) if g := r.raftLog.lastIndex(); g != index { t.Errorf("index = %d, want %d", g, index) } @@ -993,7 +993,7 @@ func TestRecoverPendingConfig(t *testing.T) { wpending bool }{ {pb.EntryNormal, false}, - {pb.EntryConfigChange, true}, + {pb.EntryConfChange, true}, } for i, tt := range tests { r := newRaft(1, []int64{1, 2}, 0, 0) @@ -1016,8 +1016,8 @@ func TestRecoverDoublePendingConfig(t *testing.T) { } }() r := newRaft(1, []int64{1, 2}, 0, 0) - r.appendEntry(pb.Entry{Type: pb.EntryConfigChange}) - r.appendEntry(pb.Entry{Type: pb.EntryConfigChange}) + r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) + r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) r.becomeCandidate() r.becomeLeader() }() diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 0474b7cd8..a2852b71a 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -14,7 +14,7 @@ Snapshot Message HardState - ConfigChange + ConfChange */ package raftpb @@ -35,17 +35,17 @@ var _ = math.Inf type EntryType int32 const ( - EntryNormal EntryType = 0 - EntryConfigChange EntryType = 1 + EntryNormal EntryType = 0 + EntryConfChange EntryType = 1 ) var EntryType_name = map[int32]string{ 0: "EntryNormal", - 1: "EntryConfigChange", + 1: "EntryConfChange", } var EntryType_value = map[string]int32{ - "EntryNormal": 0, - "EntryConfigChange": 1, + "EntryNormal": 0, + "EntryConfChange": 1, } func (x EntryType) Enum() *EntryType { @@ -65,36 +65,36 @@ func (x *EntryType) UnmarshalJSON(data []byte) error { return nil } -type ConfigChangeType int32 +type ConfChangeType int32 const ( - ConfigChangeAddNode ConfigChangeType = 0 - ConfigChangeRemoveNode ConfigChangeType = 1 + ConfChangeAddNode ConfChangeType = 0 + ConfChangeRemoveNode ConfChangeType = 1 ) -var ConfigChangeType_name = map[int32]string{ - 0: "ConfigChangeAddNode", - 1: "ConfigChangeRemoveNode", +var ConfChangeType_name = map[int32]string{ + 0: "ConfChangeAddNode", + 1: "ConfChangeRemoveNode", } -var ConfigChangeType_value = map[string]int32{ - "ConfigChangeAddNode": 0, - "ConfigChangeRemoveNode": 1, +var ConfChangeType_value = map[string]int32{ + "ConfChangeAddNode": 0, + "ConfChangeRemoveNode": 1, } -func (x ConfigChangeType) Enum() *ConfigChangeType { - p := new(ConfigChangeType) +func (x ConfChangeType) Enum() *ConfChangeType { + p := new(ConfChangeType) *p = x return p } -func (x ConfigChangeType) String() string { - return proto.EnumName(ConfigChangeType_name, int32(x)) +func (x ConfChangeType) String() string { + return proto.EnumName(ConfChangeType_name, int32(x)) } -func (x *ConfigChangeType) UnmarshalJSON(data []byte) error { - value, err := proto.UnmarshalJSONEnum(ConfigChangeType_value, data, "ConfigChangeType") +func (x *ConfChangeType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(ConfChangeType_value, data, "ConfChangeType") if err != nil { return err } - *x = ConfigChangeType(value) + *x = ConfChangeType(value) return nil } @@ -159,21 +159,21 @@ func (m *HardState) Reset() { *m = HardState{} } func (m *HardState) String() string { return proto.CompactTextString(m) } func (*HardState) ProtoMessage() {} -type ConfigChange struct { - ID int64 `protobuf:"varint,1,req" json:"ID"` - Type ConfigChangeType `protobuf:"varint,2,req,enum=raftpb.ConfigChangeType" json:"Type"` - NodeID int64 `protobuf:"varint,3,req" json:"NodeID"` - Context []byte `protobuf:"bytes,4,opt" json:"Context"` - XXX_unrecognized []byte `json:"-"` +type ConfChange struct { + ID int64 `protobuf:"varint,1,req" json:"ID"` + Type ConfChangeType `protobuf:"varint,2,req,enum=raftpb.ConfChangeType" json:"Type"` + NodeID int64 `protobuf:"varint,3,req" json:"NodeID"` + Context []byte `protobuf:"bytes,4,opt" json:"Context"` + XXX_unrecognized []byte `json:"-"` } -func (m *ConfigChange) Reset() { *m = ConfigChange{} } -func (m *ConfigChange) String() string { return proto.CompactTextString(m) } -func (*ConfigChange) ProtoMessage() {} +func (m *ConfChange) Reset() { *m = ConfChange{} } +func (m *ConfChange) String() string { return proto.CompactTextString(m) } +func (*ConfChange) ProtoMessage() {} func init() { proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value) - proto.RegisterEnum("raftpb.ConfigChangeType", ConfigChangeType_name, ConfigChangeType_value) + proto.RegisterEnum("raftpb.ConfChangeType", ConfChangeType_name, ConfChangeType_value) } func (m *Info) Unmarshal(data []byte) error { l := len(data) @@ -733,7 +733,7 @@ func (m *HardState) Unmarshal(data []byte) error { } return nil } -func (m *ConfigChange) Unmarshal(data []byte) error { +func (m *ConfChange) Unmarshal(data []byte) error { l := len(data) index := 0 for index < l { @@ -777,7 +777,7 @@ func (m *ConfigChange) Unmarshal(data []byte) error { } b := data[index] index++ - m.Type |= (ConfigChangeType(b) & 0x7F) << shift + m.Type |= (ConfChangeType(b) & 0x7F) << shift if b < 0x80 { break } @@ -915,7 +915,7 @@ func (m *HardState) Size() (n int) { } return n } -func (m *ConfigChange) Size() (n int) { +func (m *ConfChange) Size() (n int) { var l int _ = l n += 1 + sovRaft(uint64(m.ID)) @@ -1131,7 +1131,7 @@ func (m *HardState) MarshalTo(data []byte) (n int, err error) { } return i, nil } -func (m *ConfigChange) Marshal() (data []byte, err error) { +func (m *ConfChange) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) n, err := m.MarshalTo(data) @@ -1141,7 +1141,7 @@ func (m *ConfigChange) Marshal() (data []byte, err error) { return data[:n], nil } -func (m *ConfigChange) MarshalTo(data []byte) (n int, err error) { +func (m *ConfChange) MarshalTo(data []byte) (n int, err error) { var i int _ = i var l int diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index dcc34587a..2c3b9d637 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -13,8 +13,8 @@ message Info { } enum EntryType { - EntryNormal = 0; - EntryConfigChange = 1; + EntryNormal = 0; + EntryConfChange = 1; } message Entry { @@ -49,14 +49,14 @@ message HardState { required int64 commit = 3 [(gogoproto.nullable) = false]; } -enum ConfigChangeType { - ConfigChangeAddNode = 0; - ConfigChangeRemoveNode = 1; +enum ConfChangeType { + ConfChangeAddNode = 0; + ConfChangeRemoveNode = 1; } -message ConfigChange { - required int64 ID = 1 [(gogoproto.nullable) = false]; - required ConfigChangeType Type = 2 [(gogoproto.nullable) = false]; - required int64 NodeID = 3 [(gogoproto.nullable) = false]; - optional bytes Context = 4 [(gogoproto.nullable) = false]; +message ConfChange { + required int64 ID = 1 [(gogoproto.nullable) = false]; + required ConfChangeType Type = 2 [(gogoproto.nullable) = false]; + required int64 NodeID = 3 [(gogoproto.nullable) = false]; + optional bytes Context = 4 [(gogoproto.nullable) = false]; } From c6cb635e017552524f9b700925f92f50a2632486 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 23 Sep 2014 13:03:30 -0700 Subject: [PATCH 11/11] etcdserver: refine comments of config change tests --- etcdserver/server_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 6661caccb..d63abe781 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -594,7 +594,7 @@ func TestRecvSlowSnapshot(t *testing.T) { } } -// TestAddNode tests AddNode could propose and perform node addition. +// TestAddNode tests AddNode can propose and perform node addition. func TestAddNode(t *testing.T) { n := newNodeConfChangeCommitterRecorder() s := &EtcdServer{ @@ -614,7 +614,7 @@ func TestAddNode(t *testing.T) { } } -// TestRemoveNode tests RemoveNode could propose and perform node removal. +// TestRemoveNode tests RemoveNode can propose and perform node removal. func TestRemoveNode(t *testing.T) { n := newNodeConfChangeCommitterRecorder() s := &EtcdServer{