diff --git a/etcdserver/server.go b/etcdserver/server.go index 905213acd..b0f16a1c9 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -121,11 +121,23 @@ func (s *EtcdServer) run() { // care to apply entries in a single goroutine, and not // race them. for _, e := range rd.CommittedEntries { - var r pb.Request - if err := r.Unmarshal(e.Data); err != nil { - panic("TODO: this is bad, what do we do about it?") + switch e.Type { + case raftpb.EntryNormal: + var r pb.Request + if err := r.Unmarshal(e.Data); err != nil { + panic("TODO: this is bad, what do we do about it?") + } + s.w.Trigger(r.Id, s.apply(r)) + case raftpb.EntryConfChange: + var cc raftpb.ConfChange + if err := cc.Unmarshal(e.Data); err != nil { + panic("TODO: this is bad, what do we do about it?") + } + s.Node.ApplyConfChange(cc) + s.w.Trigger(cc.ID, nil) + default: + panic("unexpected entry type") } - s.w.Trigger(r.Id, s.apply(r)) appliedi = e.Index } @@ -218,6 +230,45 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } } +func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error { + cc := raftpb.ConfChange{ + ID: GenID(), + Type: raftpb.ConfChangeAddNode, + NodeID: id, + Context: context, + } + return s.configure(ctx, cc) +} + +func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { + cc := raftpb.ConfChange{ + ID: GenID(), + Type: raftpb.ConfChangeRemoveNode, + NodeID: id, + } + return s.configure(ctx, cc) +} + +// configure sends configuration change through consensus then performs it. +// It will block until the change is performed or there is an error. +func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error { + ch := s.w.Register(cc.ID) + if err := s.Node.ProposeConfChange(ctx, cc); err != nil { + log.Printf("configure error: %v", err) + s.w.Trigger(cc.ID, nil) + return err + } + select { + case <-ch: + return nil + case <-ctx.Done(): + s.w.Trigger(cc.ID, nil) // GC wait + return ctx.Err() + case <-s.done: + return ErrStopped + } +} + // sync proposes a SYNC request and is non-blocking. // This makes no guarantee that the request will be proposed or performed. // The request will be cancelled after the given timeout. diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index ee3039a48..d63abe781 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -594,6 +594,46 @@ func TestRecvSlowSnapshot(t *testing.T) { } } +// TestAddNode tests AddNode can propose and perform node addition. +func TestAddNode(t *testing.T) { + n := newNodeConfChangeCommitterRecorder() + s := &EtcdServer{ + Node: n, + Store: &storeRecorder{}, + Send: func(_ []raftpb.Message) {}, + Storage: &storageRecorder{}, + } + s.Start() + s.AddNode(context.TODO(), 1, []byte("foo")) + action := n.Action() + s.Stop() + + waction := []string{"ProposeConfChange:ConfChangeAddNode", "ApplyConfChange:ConfChangeAddNode"} + if !reflect.DeepEqual(action, waction) { + t.Errorf("action = %v, want %v", action, waction) + } +} + +// TestRemoveNode tests RemoveNode can propose and perform node removal. +func TestRemoveNode(t *testing.T) { + n := newNodeConfChangeCommitterRecorder() + s := &EtcdServer{ + Node: n, + Store: &storeRecorder{}, + Send: func(_ []raftpb.Message) {}, + Storage: &storageRecorder{}, + } + s.Start() + s.RemoveNode(context.TODO(), 1) + action := n.Action() + s.Stop() + + waction := []string{"ProposeConfChange:ConfChangeRemoveNode", "ApplyConfChange:ConfChangeRemoveNode"} + if !reflect.DeepEqual(action, waction) { + t.Errorf("action = %v, want %v", action, waction) + } +} + // TODO: test wait trigger correctness in multi-server case func TestGetBool(t *testing.T) { @@ -762,16 +802,17 @@ func newReadyNode() *readyNode { readyc := make(chan raft.Ready, 1) return &readyNode{readyc: readyc} } -func (n *readyNode) Tick() {} -func (n *readyNode) Campaign(ctx context.Context) error { return nil } -func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil } -func (n *readyNode) Configure(ctx context.Context, data []byte) error { return nil } +func (n *readyNode) Tick() {} +func (n *readyNode) Campaign(ctx context.Context) error { return nil } +func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil } +func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { + return nil +} func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil } func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } +func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) {} func (n *readyNode) Stop() {} func (n *readyNode) Compact(d []byte) {} -func (n *readyNode) AddNode(id int64) {} -func (n *readyNode) RemoveNode(id int64) {} type nodeRecorder struct { recorder @@ -788,13 +829,17 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error { n.record("Propose") return nil } +func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { + n.record("ProposeConfChange") + return nil +} func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error { n.record("Step") return nil } -func (n *nodeRecorder) Ready() <-chan raft.Ready { - n.record("Ready") - return nil +func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil } +func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) { + n.record("ApplyConfChange") } func (n *nodeRecorder) Stop() { n.record("Stop") @@ -832,3 +877,29 @@ func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) n.record("Propose blocked") return nil } + +type nodeConfChangeCommitterRecorder struct { + nodeRecorder + readyc chan raft.Ready +} + +func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder { + readyc := make(chan raft.Ready, 1) + readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateLeader}} + return &nodeConfChangeCommitterRecorder{readyc: readyc} +} +func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { + data, err := conf.Marshal() + if err != nil { + return err + } + n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange, Data: data}}} + n.record("ProposeConfChange:" + conf.Type.String()) + return nil +} +func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready { + return n.readyc +} +func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) { + n.record("ApplyConfChange:" + conf.Type.String()) +} diff --git a/raft/doc.go b/raft/doc.go index d9e3d2a3b..3a0b22278 100644 --- a/raft/doc.go +++ b/raft/doc.go @@ -61,5 +61,19 @@ data, serialize it into a byte slice and call: n.Propose(ctx, data) +If the proposal is committed, data will appear in committed entries with type +raftpb.EntryNormal. + +To add or remove node in a cluster, build ConfChange struct 'cc' and call: + + n.ProposeConfChange(ctx, cc) + +After config change is committed, some committed entry with type +raftpb.EntryConfChange will be returned. You should apply it to node through: + + var cc raftpb.ConfChange + cc.Unmarshal(data) + n.ApplyConfChange(cc) + */ package raft diff --git a/raft/node.go b/raft/node.go index d93c1292e..f0ef4d260 100644 --- a/raft/node.go +++ b/raft/node.go @@ -76,14 +76,22 @@ type Node interface { // Tick increments the internal logical clock for the Node by a single tick. Election // timeouts and heartbeat timeouts are in units of ticks. Tick() - // Campaign causes the Node to transition to candidate state and start campaigning to become leader + // Campaign causes the Node to transition to candidate state and start campaigning to become leader. Campaign(ctx context.Context) error // Propose proposes that data be appended to the log. Propose(ctx context.Context, data []byte) error + // ProposeConfChange proposes config change. + // At most one ConfChange can be in the process of going through consensus. + // Application needs to call ApplyConfChange when applying EntryConfChange type entry. + ProposeConfChange(ctx context.Context, cc pb.ConfChange) error // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error // Ready returns a channel that returns the current point-in-time state Ready() <-chan Ready + // ApplyConfChange applies config change to the local node. + // TODO: reject existing node when add node + // TODO: reject non-existant node when remove node + ApplyConfChange(cc pb.ConfChange) // Stop performs any necessary termination of the Node Stop() // Compact @@ -119,6 +127,7 @@ type node struct { propc chan pb.Message recvc chan pb.Message compactc chan []byte + confc chan pb.ConfChange readyc chan Ready tickc chan struct{} done chan struct{} @@ -129,6 +138,7 @@ func newNode() node { propc: make(chan pb.Message), recvc: make(chan pb.Message), compactc: make(chan []byte), + confc: make(chan pb.ConfChange), readyc: make(chan Ready), tickc: make(chan struct{}), done: make(chan struct{}), @@ -167,6 +177,9 @@ func (n *node) run(r *raft) { } select { + // TODO: maybe buffer the config propose if there exists one (the way + // described in raft dissertation) + // Currently it is dropped in Step silently. case m := <-propc: m.From = r.id r.Step(m) @@ -174,6 +187,15 @@ func (n *node) run(r *raft) { r.Step(m) // raft never returns an error case d := <-n.compactc: r.compact(d) + case cc := <-n.confc: + switch cc.Type { + case pb.ConfChangeAddNode: + r.addNode(cc.NodeID) + case pb.ConfChangeRemoveNode: + r.removeNode(cc.NodeID) + default: + panic("unexpected conf type") + } case <-n.tickc: r.tick() case readyc <- rd: @@ -186,6 +208,10 @@ func (n *node) run(r *raft) { if !IsEmptySnap(rd.Snapshot) { prevSnapi = rd.Snapshot.Index } + // TODO(yichengq): we assume that all committed config + // entries will be applied to make things easy for now. + // TODO(yichengq): it may have race because applied is set + // before entries are applied. r.raftLog.resetNextEnts() r.raftLog.resetUnstable() r.msgs = nil @@ -212,6 +238,14 @@ func (n *node) Propose(ctx context.Context, data []byte) error { return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}}) } +func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { + data, err := cc.Marshal() + if err != nil { + return err + } + return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}}) +} + // Step advances the state machine using msgs. The ctx.Err() will be returned, // if any. func (n *node) Step(ctx context.Context, m pb.Message) error { @@ -234,6 +268,13 @@ func (n *node) Ready() <-chan Ready { return n.readyc } +func (n *node) ApplyConfChange(cc pb.ConfChange) { + select { + case n.confc <- cc: + case <-n.done: + } +} + func (n *node) Compact(d []byte) { select { case n.compactc <- d: diff --git a/raft/raft.go b/raft/raft.go index 1097bb36f..eb69f903d 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -105,6 +105,9 @@ type raft struct { // the leader id lead int64 + // New configuration is ignored if there exists unapplied configuration. + pendingConf bool + elapsed int // number of ticks since the last msg heartbeatTimeout int electionTimeout int @@ -245,6 +248,7 @@ func (r *raft) reset(term int64) { r.prs[i].match = r.raftLog.lastIndex() } } + r.pendingConf = false } func (r *raft) q() int { @@ -308,6 +312,15 @@ func (r *raft) becomeLeader() { r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader + for _, e := range r.raftLog.entries(r.raftLog.committed + 1) { + if e.Type != pb.EntryConfChange { + continue + } + if r.pendingConf { + panic("unexpected double uncommitted config entry") + } + r.pendingConf = true + } r.appendEntry(pb.Entry{Data: nil}) } @@ -373,6 +386,16 @@ func (r *raft) handleSnapshot(m pb.Message) { } } +func (r *raft) addNode(id int64) { + r.setProgress(id, 0, r.raftLog.lastIndex()+1) + r.pendingConf = false +} + +func (r *raft) removeNode(id int64) { + r.delProgress(id) + r.pendingConf = false +} + type stepFunc func(r *raft, m pb.Message) func stepLeader(r *raft, m pb.Message) { @@ -384,6 +407,12 @@ func stepLeader(r *raft, m pb.Message) { panic("unexpected length(entries) of a msgProp") } e := m.Entries[0] + if e.Type == pb.EntryConfChange { + if r.pendingConf { + return + } + r.pendingConf = true + } r.appendEntry(e) r.bcastAppend() case msgAppResp: diff --git a/raft/raft_test.go b/raft/raft_test.go index 24c15e622..106d4fd34 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -948,6 +948,111 @@ func TestSlowNodeRestore(t *testing.T) { } } +// TestStepConfig tests that when raft step msgProp in EntryConfChange type, +// it appends the entry to log and sets pendingConf to be true. +func TestStepConfig(t *testing.T) { + // a raft that cannot make progress + r := newRaft(1, []int64{1, 2}, 0, 0) + r.becomeCandidate() + r.becomeLeader() + index := r.raftLog.lastIndex() + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) + if g := r.raftLog.lastIndex(); g != index+1 { + t.Errorf("index = %d, want %d", g, index+1) + } + if r.pendingConf != true { + t.Errorf("pendingConf = %v, want true", r.pendingConf) + } +} + +// TestStepIgnoreConfig tests that if raft step the second msgProp in +// EntryConfChange type when the first one is uncommitted, the node will deny +// the proposal and keep its original state. +func TestStepIgnoreConfig(t *testing.T) { + // a raft that cannot make progress + r := newRaft(1, []int64{1, 2}, 0, 0) + r.becomeCandidate() + r.becomeLeader() + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) + index := r.raftLog.lastIndex() + pendingConf := r.pendingConf + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) + if g := r.raftLog.lastIndex(); g != index { + t.Errorf("index = %d, want %d", g, index) + } + if r.pendingConf != pendingConf { + t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf) + } +} + +// TestRecoverPendingConfig tests that new leader recovers its pendingConf flag +// based on uncommitted entries. +func TestRecoverPendingConfig(t *testing.T) { + tests := []struct { + entType pb.EntryType + wpending bool + }{ + {pb.EntryNormal, false}, + {pb.EntryConfChange, true}, + } + for i, tt := range tests { + r := newRaft(1, []int64{1, 2}, 0, 0) + r.appendEntry(pb.Entry{Type: tt.entType}) + r.becomeCandidate() + r.becomeLeader() + if r.pendingConf != tt.wpending { + t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending) + } + } +} + +// TestRecoverDoublePendingConfig tests that new leader will panic if +// there exist two uncommitted config entries. +func TestRecoverDoublePendingConfig(t *testing.T) { + func() { + defer func() { + if err := recover(); err == nil { + t.Errorf("expect panic, but nothing happens") + } + }() + r := newRaft(1, []int64{1, 2}, 0, 0) + r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) + r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) + r.becomeCandidate() + r.becomeLeader() + }() +} + +// TestAddNode tests that addNode could update pendingConf and peer list correctly. +func TestAddNode(t *testing.T) { + r := newRaft(1, []int64{1}, 0, 0) + r.pendingConf = true + r.addNode(2) + if r.pendingConf != false { + t.Errorf("pendingConf = %v, want false", r.pendingConf) + } + nodes := r.nodes() + sort.Sort(int64Slice(nodes)) + wnodes := []int64{1, 2} + if !reflect.DeepEqual(nodes, wnodes) { + t.Errorf("nodes = %v, want %v", nodes, wnodes) + } +} + +// TestRemoveNode tests that removeNode could update pendingConf and peer list correctly. +func TestRemoveNode(t *testing.T) { + r := newRaft(1, []int64{1, 2}, 0, 0) + r.pendingConf = true + r.removeNode(2) + if r.pendingConf != false { + t.Errorf("pendingConf = %v, want false", r.pendingConf) + } + w := []int64{1} + if g := r.nodes(); !reflect.DeepEqual(g, w) { + t.Errorf("nodes = %v, want %v", g, w) + } +} + func ents(terms ...int64) *raft { ents := []pb.Entry{{}} for _, term := range terms { diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 148c302e2..a2852b71a 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -14,6 +14,7 @@ Snapshot Message HardState + ConfChange */ package raftpb @@ -31,6 +32,72 @@ var _ = proto.Marshal var _ = &json.SyntaxError{} var _ = math.Inf +type EntryType int32 + +const ( + EntryNormal EntryType = 0 + EntryConfChange EntryType = 1 +) + +var EntryType_name = map[int32]string{ + 0: "EntryNormal", + 1: "EntryConfChange", +} +var EntryType_value = map[string]int32{ + "EntryNormal": 0, + "EntryConfChange": 1, +} + +func (x EntryType) Enum() *EntryType { + p := new(EntryType) + *p = x + return p +} +func (x EntryType) String() string { + return proto.EnumName(EntryType_name, int32(x)) +} +func (x *EntryType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(EntryType_value, data, "EntryType") + if err != nil { + return err + } + *x = EntryType(value) + return nil +} + +type ConfChangeType int32 + +const ( + ConfChangeAddNode ConfChangeType = 0 + ConfChangeRemoveNode ConfChangeType = 1 +) + +var ConfChangeType_name = map[int32]string{ + 0: "ConfChangeAddNode", + 1: "ConfChangeRemoveNode", +} +var ConfChangeType_value = map[string]int32{ + "ConfChangeAddNode": 0, + "ConfChangeRemoveNode": 1, +} + +func (x ConfChangeType) Enum() *ConfChangeType { + p := new(ConfChangeType) + *p = x + return p +} +func (x ConfChangeType) String() string { + return proto.EnumName(ConfChangeType_name, int32(x)) +} +func (x *ConfChangeType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(ConfChangeType_value, data, "ConfChangeType") + if err != nil { + return err + } + *x = ConfChangeType(value) + return nil +} + type Info struct { Id int64 `protobuf:"varint,1,req,name=id" json:"id"` XXX_unrecognized []byte `json:"-"` @@ -41,10 +108,11 @@ func (m *Info) String() string { return proto.CompactTextString(m) } func (*Info) ProtoMessage() {} type Entry struct { - Term int64 `protobuf:"varint,2,req,name=term" json:"term"` - Index int64 `protobuf:"varint,3,req,name=index" json:"index"` - Data []byte `protobuf:"bytes,4,opt,name=data" json:"data"` - XXX_unrecognized []byte `json:"-"` + Type EntryType `protobuf:"varint,1,req,enum=raftpb.EntryType" json:"Type"` + Term int64 `protobuf:"varint,2,req" json:"Term"` + Index int64 `protobuf:"varint,3,req" json:"Index"` + Data []byte `protobuf:"bytes,4,opt" json:"Data"` + XXX_unrecognized []byte `json:"-"` } func (m *Entry) Reset() { *m = Entry{} } @@ -91,7 +159,21 @@ func (m *HardState) Reset() { *m = HardState{} } func (m *HardState) String() string { return proto.CompactTextString(m) } func (*HardState) ProtoMessage() {} +type ConfChange struct { + ID int64 `protobuf:"varint,1,req" json:"ID"` + Type ConfChangeType `protobuf:"varint,2,req,enum=raftpb.ConfChangeType" json:"Type"` + NodeID int64 `protobuf:"varint,3,req" json:"NodeID"` + Context []byte `protobuf:"bytes,4,opt" json:"Context"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ConfChange) Reset() { *m = ConfChange{} } +func (m *ConfChange) String() string { return proto.CompactTextString(m) } +func (*ConfChange) ProtoMessage() {} + func init() { + proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value) + proto.RegisterEnum("raftpb.ConfChangeType", ConfChangeType_name, ConfChangeType_value) } func (m *Info) Unmarshal(data []byte) error { l := len(data) @@ -169,6 +251,21 @@ func (m *Entry) Unmarshal(data []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Type |= (EntryType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } case 2: if wireType != 0 { return code_google_com_p_gogoprotobuf_proto.ErrWrongType @@ -636,6 +733,115 @@ func (m *HardState) Unmarshal(data []byte) error { } return nil } +func (m *ConfChange) Unmarshal(data []byte) error { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.ID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Type |= (ConfChangeType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.NodeID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := index + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Context = append(m.Context, data[index:postIndex]...) + index = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + index -= sizeOfWire + skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) + if err != nil { + return err + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) + index += skippy + } + } + return nil +} func (m *Info) Size() (n int) { var l int _ = l @@ -648,6 +854,7 @@ func (m *Info) Size() (n int) { func (m *Entry) Size() (n int) { var l int _ = l + n += 1 + sovRaft(uint64(m.Type)) n += 1 + sovRaft(uint64(m.Term)) n += 1 + sovRaft(uint64(m.Index)) l = len(m.Data) @@ -708,6 +915,19 @@ func (m *HardState) Size() (n int) { } return n } +func (m *ConfChange) Size() (n int) { + var l int + _ = l + n += 1 + sovRaft(uint64(m.ID)) + n += 1 + sovRaft(uint64(m.Type)) + n += 1 + sovRaft(uint64(m.NodeID)) + l = len(m.Context) + n += 1 + l + sovRaft(uint64(l)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} func sovRaft(x uint64) (n int) { for { @@ -760,6 +980,9 @@ func (m *Entry) MarshalTo(data []byte) (n int, err error) { _ = i var l int _ = l + data[i] = 0x8 + i++ + i = encodeVarintRaft(data, i, uint64(m.Type)) data[i] = 0x10 i++ i = encodeVarintRaft(data, i, uint64(m.Term)) @@ -908,6 +1131,39 @@ func (m *HardState) MarshalTo(data []byte) (n int, err error) { } return i, nil } +func (m *ConfChange) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *ConfChange) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintRaft(data, i, uint64(m.ID)) + data[i] = 0x10 + i++ + i = encodeVarintRaft(data, i, uint64(m.Type)) + data[i] = 0x18 + i++ + i = encodeVarintRaft(data, i, uint64(m.NodeID)) + data[i] = 0x22 + i++ + i = encodeVarintRaft(data, i, uint64(len(m.Context))) + i += copy(data[i:], m.Context) + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} func encodeFixed64Raft(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index d6bf94ab0..2c3b9d637 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -6,15 +6,22 @@ option (gogoproto.marshaler_all) = true; option (gogoproto.sizer_all) = true; option (gogoproto.unmarshaler_all) = true; option (gogoproto.goproto_getters_all) = false; +option (gogoproto.goproto_enum_prefix_all) = false; message Info { required int64 id = 1 [(gogoproto.nullable) = false]; } +enum EntryType { + EntryNormal = 0; + EntryConfChange = 1; +} + message Entry { - required int64 term = 2 [(gogoproto.nullable) = false]; - required int64 index = 3 [(gogoproto.nullable) = false]; - optional bytes data = 4 [(gogoproto.nullable) = false]; + required EntryType Type = 1 [(gogoproto.nullable) = false]; + required int64 Term = 2 [(gogoproto.nullable) = false]; + required int64 Index = 3 [(gogoproto.nullable) = false]; + optional bytes Data = 4 [(gogoproto.nullable) = false]; } message Snapshot { @@ -41,3 +48,15 @@ message HardState { required int64 vote = 2 [(gogoproto.nullable) = false]; required int64 commit = 3 [(gogoproto.nullable) = false]; } + +enum ConfChangeType { + ConfChangeAddNode = 0; + ConfChangeRemoveNode = 1; +} + +message ConfChange { + required int64 ID = 1 [(gogoproto.nullable) = false]; + required ConfChangeType Type = 2 [(gogoproto.nullable) = false]; + required int64 NodeID = 3 [(gogoproto.nullable) = false]; + optional bytes Context = 4 [(gogoproto.nullable) = false]; +}