Merge pull request #1100 from unihorn/129

raft: add Configure
release-2.0
Yicheng Qin 2014-09-23 13:50:10 -07:00
commit dcdc7913c0
8 changed files with 607 additions and 21 deletions

View File

@ -121,11 +121,23 @@ func (s *EtcdServer) run() {
// care to apply entries in a single goroutine, and not
// race them.
for _, e := range rd.CommittedEntries {
var r pb.Request
if err := r.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
switch e.Type {
case raftpb.EntryNormal:
var r pb.Request
if err := r.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
}
s.w.Trigger(r.Id, s.apply(r))
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := cc.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
}
s.Node.ApplyConfChange(cc)
s.w.Trigger(cc.ID, nil)
default:
panic("unexpected entry type")
}
s.w.Trigger(r.Id, s.apply(r))
appliedi = e.Index
}
@ -218,6 +230,45 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
}
}
func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error {
cc := raftpb.ConfChange{
ID: GenID(),
Type: raftpb.ConfChangeAddNode,
NodeID: id,
Context: context,
}
return s.configure(ctx, cc)
}
func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error {
cc := raftpb.ConfChange{
ID: GenID(),
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
}
return s.configure(ctx, cc)
}
// configure sends configuration change through consensus then performs it.
// It will block until the change is performed or there is an error.
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
ch := s.w.Register(cc.ID)
if err := s.Node.ProposeConfChange(ctx, cc); err != nil {
log.Printf("configure error: %v", err)
s.w.Trigger(cc.ID, nil)
return err
}
select {
case <-ch:
return nil
case <-ctx.Done():
s.w.Trigger(cc.ID, nil) // GC wait
return ctx.Err()
case <-s.done:
return ErrStopped
}
}
// sync proposes a SYNC request and is non-blocking.
// This makes no guarantee that the request will be proposed or performed.
// The request will be cancelled after the given timeout.

View File

@ -594,6 +594,46 @@ func TestRecvSlowSnapshot(t *testing.T) {
}
}
// TestAddNode tests AddNode can propose and perform node addition.
func TestAddNode(t *testing.T) {
n := newNodeConfChangeCommitterRecorder()
s := &EtcdServer{
Node: n,
Store: &storeRecorder{},
Send: func(_ []raftpb.Message) {},
Storage: &storageRecorder{},
}
s.Start()
s.AddNode(context.TODO(), 1, []byte("foo"))
action := n.Action()
s.Stop()
waction := []string{"ProposeConfChange:ConfChangeAddNode", "ApplyConfChange:ConfChangeAddNode"}
if !reflect.DeepEqual(action, waction) {
t.Errorf("action = %v, want %v", action, waction)
}
}
// TestRemoveNode tests RemoveNode can propose and perform node removal.
func TestRemoveNode(t *testing.T) {
n := newNodeConfChangeCommitterRecorder()
s := &EtcdServer{
Node: n,
Store: &storeRecorder{},
Send: func(_ []raftpb.Message) {},
Storage: &storageRecorder{},
}
s.Start()
s.RemoveNode(context.TODO(), 1)
action := n.Action()
s.Stop()
waction := []string{"ProposeConfChange:ConfChangeRemoveNode", "ApplyConfChange:ConfChangeRemoveNode"}
if !reflect.DeepEqual(action, waction) {
t.Errorf("action = %v, want %v", action, waction)
}
}
// TODO: test wait trigger correctness in multi-server case
func TestGetBool(t *testing.T) {
@ -762,16 +802,17 @@ func newReadyNode() *readyNode {
readyc := make(chan raft.Ready, 1)
return &readyNode{readyc: readyc}
}
func (n *readyNode) Tick() {}
func (n *readyNode) Campaign(ctx context.Context) error { return nil }
func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil }
func (n *readyNode) Configure(ctx context.Context, data []byte) error { return nil }
func (n *readyNode) Tick() {}
func (n *readyNode) Campaign(ctx context.Context) error { return nil }
func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil }
func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
return nil
}
func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) {}
func (n *readyNode) Stop() {}
func (n *readyNode) Compact(d []byte) {}
func (n *readyNode) AddNode(id int64) {}
func (n *readyNode) RemoveNode(id int64) {}
type nodeRecorder struct {
recorder
@ -788,13 +829,17 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
n.record("Propose")
return nil
}
func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
n.record("ProposeConfChange")
return nil
}
func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
n.record("Step")
return nil
}
func (n *nodeRecorder) Ready() <-chan raft.Ready {
n.record("Ready")
return nil
func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
n.record("ApplyConfChange")
}
func (n *nodeRecorder) Stop() {
n.record("Stop")
@ -832,3 +877,29 @@ func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte)
n.record("Propose blocked")
return nil
}
type nodeConfChangeCommitterRecorder struct {
nodeRecorder
readyc chan raft.Ready
}
func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
readyc := make(chan raft.Ready, 1)
readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateLeader}}
return &nodeConfChangeCommitterRecorder{readyc: readyc}
}
func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
data, err := conf.Marshal()
if err != nil {
return err
}
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange, Data: data}}}
n.record("ProposeConfChange:" + conf.Type.String())
return nil
}
func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
return n.readyc
}
func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) {
n.record("ApplyConfChange:" + conf.Type.String())
}

View File

@ -61,5 +61,19 @@ data, serialize it into a byte slice and call:
n.Propose(ctx, data)
If the proposal is committed, data will appear in committed entries with type
raftpb.EntryNormal.
To add or remove node in a cluster, build ConfChange struct 'cc' and call:
n.ProposeConfChange(ctx, cc)
After config change is committed, some committed entry with type
raftpb.EntryConfChange will be returned. You should apply it to node through:
var cc raftpb.ConfChange
cc.Unmarshal(data)
n.ApplyConfChange(cc)
*/
package raft

View File

@ -76,14 +76,22 @@ type Node interface {
// Tick increments the internal logical clock for the Node by a single tick. Election
// timeouts and heartbeat timeouts are in units of ticks.
Tick()
// Campaign causes the Node to transition to candidate state and start campaigning to become leader
// Campaign causes the Node to transition to candidate state and start campaigning to become leader.
Campaign(ctx context.Context) error
// Propose proposes that data be appended to the log.
Propose(ctx context.Context, data []byte) error
// ProposeConfChange proposes config change.
// At most one ConfChange can be in the process of going through consensus.
// Application needs to call ApplyConfChange when applying EntryConfChange type entry.
ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
Step(ctx context.Context, msg pb.Message) error
// Ready returns a channel that returns the current point-in-time state
Ready() <-chan Ready
// ApplyConfChange applies config change to the local node.
// TODO: reject existing node when add node
// TODO: reject non-existant node when remove node
ApplyConfChange(cc pb.ConfChange)
// Stop performs any necessary termination of the Node
Stop()
// Compact
@ -119,6 +127,7 @@ type node struct {
propc chan pb.Message
recvc chan pb.Message
compactc chan []byte
confc chan pb.ConfChange
readyc chan Ready
tickc chan struct{}
done chan struct{}
@ -129,6 +138,7 @@ func newNode() node {
propc: make(chan pb.Message),
recvc: make(chan pb.Message),
compactc: make(chan []byte),
confc: make(chan pb.ConfChange),
readyc: make(chan Ready),
tickc: make(chan struct{}),
done: make(chan struct{}),
@ -167,6 +177,9 @@ func (n *node) run(r *raft) {
}
select {
// TODO: maybe buffer the config propose if there exists one (the way
// described in raft dissertation)
// Currently it is dropped in Step silently.
case m := <-propc:
m.From = r.id
r.Step(m)
@ -174,6 +187,15 @@ func (n *node) run(r *raft) {
r.Step(m) // raft never returns an error
case d := <-n.compactc:
r.compact(d)
case cc := <-n.confc:
switch cc.Type {
case pb.ConfChangeAddNode:
r.addNode(cc.NodeID)
case pb.ConfChangeRemoveNode:
r.removeNode(cc.NodeID)
default:
panic("unexpected conf type")
}
case <-n.tickc:
r.tick()
case readyc <- rd:
@ -186,6 +208,10 @@ func (n *node) run(r *raft) {
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Index
}
// TODO(yichengq): we assume that all committed config
// entries will be applied to make things easy for now.
// TODO(yichengq): it may have race because applied is set
// before entries are applied.
r.raftLog.resetNextEnts()
r.raftLog.resetUnstable()
r.msgs = nil
@ -212,6 +238,14 @@ func (n *node) Propose(ctx context.Context, data []byte) error {
return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}})
}
func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
data, err := cc.Marshal()
if err != nil {
return err
}
return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
}
// Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any.
func (n *node) Step(ctx context.Context, m pb.Message) error {
@ -234,6 +268,13 @@ func (n *node) Ready() <-chan Ready {
return n.readyc
}
func (n *node) ApplyConfChange(cc pb.ConfChange) {
select {
case n.confc <- cc:
case <-n.done:
}
}
func (n *node) Compact(d []byte) {
select {
case n.compactc <- d:

View File

@ -105,6 +105,9 @@ type raft struct {
// the leader id
lead int64
// New configuration is ignored if there exists unapplied configuration.
pendingConf bool
elapsed int // number of ticks since the last msg
heartbeatTimeout int
electionTimeout int
@ -245,6 +248,7 @@ func (r *raft) reset(term int64) {
r.prs[i].match = r.raftLog.lastIndex()
}
}
r.pendingConf = false
}
func (r *raft) q() int {
@ -308,6 +312,15 @@ func (r *raft) becomeLeader() {
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
if e.Type != pb.EntryConfChange {
continue
}
if r.pendingConf {
panic("unexpected double uncommitted config entry")
}
r.pendingConf = true
}
r.appendEntry(pb.Entry{Data: nil})
}
@ -373,6 +386,16 @@ func (r *raft) handleSnapshot(m pb.Message) {
}
}
func (r *raft) addNode(id int64) {
r.setProgress(id, 0, r.raftLog.lastIndex()+1)
r.pendingConf = false
}
func (r *raft) removeNode(id int64) {
r.delProgress(id)
r.pendingConf = false
}
type stepFunc func(r *raft, m pb.Message)
func stepLeader(r *raft, m pb.Message) {
@ -384,6 +407,12 @@ func stepLeader(r *raft, m pb.Message) {
panic("unexpected length(entries) of a msgProp")
}
e := m.Entries[0]
if e.Type == pb.EntryConfChange {
if r.pendingConf {
return
}
r.pendingConf = true
}
r.appendEntry(e)
r.bcastAppend()
case msgAppResp:

View File

@ -948,6 +948,111 @@ func TestSlowNodeRestore(t *testing.T) {
}
}
// TestStepConfig tests that when raft step msgProp in EntryConfChange type,
// it appends the entry to log and sets pendingConf to be true.
func TestStepConfig(t *testing.T) {
// a raft that cannot make progress
r := newRaft(1, []int64{1, 2}, 0, 0)
r.becomeCandidate()
r.becomeLeader()
index := r.raftLog.lastIndex()
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
if g := r.raftLog.lastIndex(); g != index+1 {
t.Errorf("index = %d, want %d", g, index+1)
}
if r.pendingConf != true {
t.Errorf("pendingConf = %v, want true", r.pendingConf)
}
}
// TestStepIgnoreConfig tests that if raft step the second msgProp in
// EntryConfChange type when the first one is uncommitted, the node will deny
// the proposal and keep its original state.
func TestStepIgnoreConfig(t *testing.T) {
// a raft that cannot make progress
r := newRaft(1, []int64{1, 2}, 0, 0)
r.becomeCandidate()
r.becomeLeader()
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
index := r.raftLog.lastIndex()
pendingConf := r.pendingConf
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
if g := r.raftLog.lastIndex(); g != index {
t.Errorf("index = %d, want %d", g, index)
}
if r.pendingConf != pendingConf {
t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
}
}
// TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
// based on uncommitted entries.
func TestRecoverPendingConfig(t *testing.T) {
tests := []struct {
entType pb.EntryType
wpending bool
}{
{pb.EntryNormal, false},
{pb.EntryConfChange, true},
}
for i, tt := range tests {
r := newRaft(1, []int64{1, 2}, 0, 0)
r.appendEntry(pb.Entry{Type: tt.entType})
r.becomeCandidate()
r.becomeLeader()
if r.pendingConf != tt.wpending {
t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
}
}
}
// TestRecoverDoublePendingConfig tests that new leader will panic if
// there exist two uncommitted config entries.
func TestRecoverDoublePendingConfig(t *testing.T) {
func() {
defer func() {
if err := recover(); err == nil {
t.Errorf("expect panic, but nothing happens")
}
}()
r := newRaft(1, []int64{1, 2}, 0, 0)
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
r.becomeCandidate()
r.becomeLeader()
}()
}
// TestAddNode tests that addNode could update pendingConf and peer list correctly.
func TestAddNode(t *testing.T) {
r := newRaft(1, []int64{1}, 0, 0)
r.pendingConf = true
r.addNode(2)
if r.pendingConf != false {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
nodes := r.nodes()
sort.Sort(int64Slice(nodes))
wnodes := []int64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) {
t.Errorf("nodes = %v, want %v", nodes, wnodes)
}
}
// TestRemoveNode tests that removeNode could update pendingConf and peer list correctly.
func TestRemoveNode(t *testing.T) {
r := newRaft(1, []int64{1, 2}, 0, 0)
r.pendingConf = true
r.removeNode(2)
if r.pendingConf != false {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
w := []int64{1}
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
}
func ents(terms ...int64) *raft {
ents := []pb.Entry{{}}
for _, term := range terms {

View File

@ -14,6 +14,7 @@
Snapshot
Message
HardState
ConfChange
*/
package raftpb
@ -31,6 +32,72 @@ var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
type EntryType int32
const (
EntryNormal EntryType = 0
EntryConfChange EntryType = 1
)
var EntryType_name = map[int32]string{
0: "EntryNormal",
1: "EntryConfChange",
}
var EntryType_value = map[string]int32{
"EntryNormal": 0,
"EntryConfChange": 1,
}
func (x EntryType) Enum() *EntryType {
p := new(EntryType)
*p = x
return p
}
func (x EntryType) String() string {
return proto.EnumName(EntryType_name, int32(x))
}
func (x *EntryType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(EntryType_value, data, "EntryType")
if err != nil {
return err
}
*x = EntryType(value)
return nil
}
type ConfChangeType int32
const (
ConfChangeAddNode ConfChangeType = 0
ConfChangeRemoveNode ConfChangeType = 1
)
var ConfChangeType_name = map[int32]string{
0: "ConfChangeAddNode",
1: "ConfChangeRemoveNode",
}
var ConfChangeType_value = map[string]int32{
"ConfChangeAddNode": 0,
"ConfChangeRemoveNode": 1,
}
func (x ConfChangeType) Enum() *ConfChangeType {
p := new(ConfChangeType)
*p = x
return p
}
func (x ConfChangeType) String() string {
return proto.EnumName(ConfChangeType_name, int32(x))
}
func (x *ConfChangeType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(ConfChangeType_value, data, "ConfChangeType")
if err != nil {
return err
}
*x = ConfChangeType(value)
return nil
}
type Info struct {
Id int64 `protobuf:"varint,1,req,name=id" json:"id"`
XXX_unrecognized []byte `json:"-"`
@ -41,10 +108,11 @@ func (m *Info) String() string { return proto.CompactTextString(m) }
func (*Info) ProtoMessage() {}
type Entry struct {
Term int64 `protobuf:"varint,2,req,name=term" json:"term"`
Index int64 `protobuf:"varint,3,req,name=index" json:"index"`
Data []byte `protobuf:"bytes,4,opt,name=data" json:"data"`
XXX_unrecognized []byte `json:"-"`
Type EntryType `protobuf:"varint,1,req,enum=raftpb.EntryType" json:"Type"`
Term int64 `protobuf:"varint,2,req" json:"Term"`
Index int64 `protobuf:"varint,3,req" json:"Index"`
Data []byte `protobuf:"bytes,4,opt" json:"Data"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Entry) Reset() { *m = Entry{} }
@ -91,7 +159,21 @@ func (m *HardState) Reset() { *m = HardState{} }
func (m *HardState) String() string { return proto.CompactTextString(m) }
func (*HardState) ProtoMessage() {}
type ConfChange struct {
ID int64 `protobuf:"varint,1,req" json:"ID"`
Type ConfChangeType `protobuf:"varint,2,req,enum=raftpb.ConfChangeType" json:"Type"`
NodeID int64 `protobuf:"varint,3,req" json:"NodeID"`
Context []byte `protobuf:"bytes,4,opt" json:"Context"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ConfChange) Reset() { *m = ConfChange{} }
func (m *ConfChange) String() string { return proto.CompactTextString(m) }
func (*ConfChange) ProtoMessage() {}
func init() {
proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value)
proto.RegisterEnum("raftpb.ConfChangeType", ConfChangeType_name, ConfChangeType_value)
}
func (m *Info) Unmarshal(data []byte) error {
l := len(data)
@ -169,6 +251,21 @@ func (m *Entry) Unmarshal(data []byte) error {
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
switch fieldNum {
case 1:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.Type |= (EntryType(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
@ -636,6 +733,115 @@ func (m *HardState) Unmarshal(data []byte) error {
}
return nil
}
func (m *ConfChange) Unmarshal(data []byte) error {
l := len(data)
index := 0
for index < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
switch fieldNum {
case 1:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.ID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.Type |= (ConfChangeType(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.NodeID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 2 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
postIndex := index + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Context = append(m.Context, data[index:postIndex]...)
index = postIndex
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
index -= sizeOfWire
skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
if err != nil {
return err
}
if (index + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
index += skippy
}
}
return nil
}
func (m *Info) Size() (n int) {
var l int
_ = l
@ -648,6 +854,7 @@ func (m *Info) Size() (n int) {
func (m *Entry) Size() (n int) {
var l int
_ = l
n += 1 + sovRaft(uint64(m.Type))
n += 1 + sovRaft(uint64(m.Term))
n += 1 + sovRaft(uint64(m.Index))
l = len(m.Data)
@ -708,6 +915,19 @@ func (m *HardState) Size() (n int) {
}
return n
}
func (m *ConfChange) Size() (n int) {
var l int
_ = l
n += 1 + sovRaft(uint64(m.ID))
n += 1 + sovRaft(uint64(m.Type))
n += 1 + sovRaft(uint64(m.NodeID))
l = len(m.Context)
n += 1 + l + sovRaft(uint64(l))
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovRaft(x uint64) (n int) {
for {
@ -760,6 +980,9 @@ func (m *Entry) MarshalTo(data []byte) (n int, err error) {
_ = i
var l int
_ = l
data[i] = 0x8
i++
i = encodeVarintRaft(data, i, uint64(m.Type))
data[i] = 0x10
i++
i = encodeVarintRaft(data, i, uint64(m.Term))
@ -908,6 +1131,39 @@ func (m *HardState) MarshalTo(data []byte) (n int, err error) {
}
return i, nil
}
func (m *ConfChange) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *ConfChange) MarshalTo(data []byte) (n int, err error) {
var i int
_ = i
var l int
_ = l
data[i] = 0x8
i++
i = encodeVarintRaft(data, i, uint64(m.ID))
data[i] = 0x10
i++
i = encodeVarintRaft(data, i, uint64(m.Type))
data[i] = 0x18
i++
i = encodeVarintRaft(data, i, uint64(m.NodeID))
data[i] = 0x22
i++
i = encodeVarintRaft(data, i, uint64(len(m.Context)))
i += copy(data[i:], m.Context)
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}
return i, nil
}
func encodeFixed64Raft(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)

View File

@ -6,15 +6,22 @@ option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.goproto_enum_prefix_all) = false;
message Info {
required int64 id = 1 [(gogoproto.nullable) = false];
}
enum EntryType {
EntryNormal = 0;
EntryConfChange = 1;
}
message Entry {
required int64 term = 2 [(gogoproto.nullable) = false];
required int64 index = 3 [(gogoproto.nullable) = false];
optional bytes data = 4 [(gogoproto.nullable) = false];
required EntryType Type = 1 [(gogoproto.nullable) = false];
required int64 Term = 2 [(gogoproto.nullable) = false];
required int64 Index = 3 [(gogoproto.nullable) = false];
optional bytes Data = 4 [(gogoproto.nullable) = false];
}
message Snapshot {
@ -41,3 +48,15 @@ message HardState {
required int64 vote = 2 [(gogoproto.nullable) = false];
required int64 commit = 3 [(gogoproto.nullable) = false];
}
enum ConfChangeType {
ConfChangeAddNode = 0;
ConfChangeRemoveNode = 1;
}
message ConfChange {
required int64 ID = 1 [(gogoproto.nullable) = false];
required ConfChangeType Type = 2 [(gogoproto.nullable) = false];
required int64 NodeID = 3 [(gogoproto.nullable) = false];
optional bytes Context = 4 [(gogoproto.nullable) = false];
}