raft: ConfigChange -> ConfChange

release-2.0
Yicheng Qin 2014-09-23 12:02:44 -07:00
parent d92931853e
commit bc7b0108dc
8 changed files with 103 additions and 103 deletions

View File

@ -128,12 +128,12 @@ func (s *EtcdServer) run() {
panic("TODO: this is bad, what do we do about it?")
}
s.w.Trigger(r.Id, s.apply(r))
case raftpb.EntryConfigChange:
var cc raftpb.ConfigChange
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := cc.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
}
s.Node.ApplyConfigChange(cc)
s.Node.ApplyConfChange(cc)
s.w.Trigger(cc.ID, nil)
default:
panic("unexpected entry type")
@ -231,9 +231,9 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
}
func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error {
cc := raftpb.ConfigChange{
cc := raftpb.ConfChange{
ID: GenID(),
Type: raftpb.ConfigChangeAddNode,
Type: raftpb.ConfChangeAddNode,
NodeID: id,
Context: context,
}
@ -241,9 +241,9 @@ func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) erro
}
func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error {
cc := raftpb.ConfigChange{
cc := raftpb.ConfChange{
ID: GenID(),
Type: raftpb.ConfigChangeRemoveNode,
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
}
return s.configure(ctx, cc)
@ -251,9 +251,9 @@ func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error {
// configure sends configuration change through consensus then performs it.
// It will block until the change is performed or there is an error.
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfigChange) error {
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
ch := s.w.Register(cc.ID)
if err := s.Node.ProposeConfigChange(ctx, cc); err != nil {
if err := s.Node.ProposeConfChange(ctx, cc); err != nil {
log.Printf("configure error: %v", err)
s.w.Trigger(cc.ID, nil)
return err

View File

@ -596,7 +596,7 @@ func TestRecvSlowSnapshot(t *testing.T) {
// TestAddNode tests AddNode could propose and perform node addition.
func TestAddNode(t *testing.T) {
n := newNodeConfigChangeCommitterRecorder()
n := newNodeConfChangeCommitterRecorder()
s := &EtcdServer{
Node: n,
Store: &storeRecorder{},
@ -608,7 +608,7 @@ func TestAddNode(t *testing.T) {
action := n.Action()
s.Stop()
waction := []string{"ProposeConfigChange:ConfigChangeAddNode", "ApplyConfigChange:ConfigChangeAddNode"}
waction := []string{"ProposeConfChange:ConfChangeAddNode", "ApplyConfChange:ConfChangeAddNode"}
if !reflect.DeepEqual(action, waction) {
t.Errorf("action = %v, want %v", action, waction)
}
@ -616,7 +616,7 @@ func TestAddNode(t *testing.T) {
// TestRemoveNode tests RemoveNode could propose and perform node removal.
func TestRemoveNode(t *testing.T) {
n := newNodeConfigChangeCommitterRecorder()
n := newNodeConfChangeCommitterRecorder()
s := &EtcdServer{
Node: n,
Store: &storeRecorder{},
@ -628,7 +628,7 @@ func TestRemoveNode(t *testing.T) {
action := n.Action()
s.Stop()
waction := []string{"ProposeConfigChange:ConfigChangeRemoveNode", "ApplyConfigChange:ConfigChangeRemoveNode"}
waction := []string{"ProposeConfChange:ConfChangeRemoveNode", "ApplyConfChange:ConfChangeRemoveNode"}
if !reflect.DeepEqual(action, waction) {
t.Errorf("action = %v, want %v", action, waction)
}
@ -805,12 +805,12 @@ func newReadyNode() *readyNode {
func (n *readyNode) Tick() {}
func (n *readyNode) Campaign(ctx context.Context) error { return nil }
func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil }
func (n *readyNode) ProposeConfigChange(ctx context.Context, conf raftpb.ConfigChange) error {
func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
return nil
}
func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
func (n *readyNode) ApplyConfigChange(conf raftpb.ConfigChange) {}
func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) {}
func (n *readyNode) Stop() {}
func (n *readyNode) Compact(d []byte) {}
@ -829,8 +829,8 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
n.record("Propose")
return nil
}
func (n *nodeRecorder) ProposeConfigChange(ctx context.Context, conf raftpb.ConfigChange) error {
n.record("ProposeConfigChange")
func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
n.record("ProposeConfChange")
return nil
}
func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
@ -838,8 +838,8 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
return nil
}
func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
func (n *nodeRecorder) ApplyConfigChange(conf raftpb.ConfigChange) {
n.record("ApplyConfigChange")
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
n.record("ApplyConfChange")
}
func (n *nodeRecorder) Stop() {
n.record("Stop")
@ -878,28 +878,28 @@ func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte)
return nil
}
type nodeConfigChangeCommitterRecorder struct {
type nodeConfChangeCommitterRecorder struct {
nodeRecorder
readyc chan raft.Ready
}
func newNodeConfigChangeCommitterRecorder() *nodeConfigChangeCommitterRecorder {
func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
readyc := make(chan raft.Ready, 1)
readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateLeader}}
return &nodeConfigChangeCommitterRecorder{readyc: readyc}
return &nodeConfChangeCommitterRecorder{readyc: readyc}
}
func (n *nodeConfigChangeCommitterRecorder) ProposeConfigChange(ctx context.Context, conf raftpb.ConfigChange) error {
func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
data, err := conf.Marshal()
if err != nil {
return err
}
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfigChange, Data: data}}}
n.record("ProposeConfigChange:" + conf.Type.String())
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange, Data: data}}}
n.record("ProposeConfChange:" + conf.Type.String())
return nil
}
func (n *nodeConfigChangeCommitterRecorder) Ready() <-chan raft.Ready {
func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
return n.readyc
}
func (n *nodeConfigChangeCommitterRecorder) ApplyConfigChange(conf raftpb.ConfigChange) {
n.record("ApplyConfigChange:" + conf.Type.String())
func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) {
n.record("ApplyConfChange:" + conf.Type.String())
}

View File

@ -64,16 +64,16 @@ data, serialize it into a byte slice and call:
If the proposal is committed, data will appear in committed entries with type
raftpb.EntryNormal.
To add or remove node in a cluster, build ConfigChange struct 'cc' and call:
To add or remove node in a cluster, build ConfChange struct 'cc' and call:
n.ProposeConfigChange(ctx, cc)
n.ProposeConfChange(ctx, cc)
After config change is committed, some committed entry with type
raftpb.EntryConfigChange will be returned. You should apply it to node through:
raftpb.EntryConfChange will be returned. You should apply it to node through:
var cc raftpb.ConfigChange
var cc raftpb.ConfChange
cc.Unmarshal(data)
n.ApplyConfigChange(cc)
n.ApplyConfChange(cc)
*/
package raft

View File

@ -80,18 +80,18 @@ type Node interface {
Campaign(ctx context.Context) error
// Propose proposes that data be appended to the log.
Propose(ctx context.Context, data []byte) error
// ProposeConfigChange proposes config change.
// At most one ConfigChange can be in the process of going through consensus.
// Application needs to call ApplyConfigChange when applying EntryConfigChange type entry.
ProposeConfigChange(ctx context.Context, cc pb.ConfigChange) error
// ProposeConfChange proposes config change.
// At most one ConfChange can be in the process of going through consensus.
// Application needs to call ApplyConfChange when applying EntryConfChange type entry.
ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
Step(ctx context.Context, msg pb.Message) error
// Ready returns a channel that returns the current point-in-time state
Ready() <-chan Ready
// ApplyConfigChange applies config change to the local node.
// ApplyConfChange applies config change to the local node.
// TODO: reject existing node when add node
// TODO: reject non-existant node when remove node
ApplyConfigChange(cc pb.ConfigChange)
ApplyConfChange(cc pb.ConfChange)
// Stop performs any necessary termination of the Node
Stop()
// Compact
@ -127,7 +127,7 @@ type node struct {
propc chan pb.Message
recvc chan pb.Message
compactc chan []byte
confc chan pb.ConfigChange
confc chan pb.ConfChange
readyc chan Ready
tickc chan struct{}
done chan struct{}
@ -138,7 +138,7 @@ func newNode() node {
propc: make(chan pb.Message),
recvc: make(chan pb.Message),
compactc: make(chan []byte),
confc: make(chan pb.ConfigChange),
confc: make(chan pb.ConfChange),
readyc: make(chan Ready),
tickc: make(chan struct{}),
done: make(chan struct{}),
@ -189,9 +189,9 @@ func (n *node) run(r *raft) {
r.compact(d)
case cc := <-n.confc:
switch cc.Type {
case pb.ConfigChangeAddNode:
case pb.ConfChangeAddNode:
r.addNode(cc.NodeID)
case pb.ConfigChangeRemoveNode:
case pb.ConfChangeRemoveNode:
r.removeNode(cc.NodeID)
default:
panic("unexpected conf type")
@ -238,12 +238,12 @@ func (n *node) Propose(ctx context.Context, data []byte) error {
return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}})
}
func (n *node) ProposeConfigChange(ctx context.Context, cc pb.ConfigChange) error {
func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
data, err := cc.Marshal()
if err != nil {
return err
}
return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange, Data: data}}})
return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
}
// Step advances the state machine using msgs. The ctx.Err() will be returned,
@ -268,7 +268,7 @@ func (n *node) Ready() <-chan Ready {
return n.readyc
}
func (n *node) ApplyConfigChange(cc pb.ConfigChange) {
func (n *node) ApplyConfChange(cc pb.ConfChange) {
select {
case n.confc <- cc:
case <-n.done:

View File

@ -313,7 +313,7 @@ func (r *raft) becomeLeader() {
r.lead = r.id
r.state = StateLeader
for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
if e.Type != pb.EntryConfigChange {
if e.Type != pb.EntryConfChange {
continue
}
if r.pendingConf {
@ -407,7 +407,7 @@ func stepLeader(r *raft, m pb.Message) {
panic("unexpected length(entries) of a msgProp")
}
e := m.Entries[0]
if e.Type == pb.EntryConfigChange {
if e.Type == pb.EntryConfChange {
if r.pendingConf {
return
}

View File

@ -948,7 +948,7 @@ func TestSlowNodeRestore(t *testing.T) {
}
}
// TestStepConfig tests that when raft step msgProp in EntryConfigChange type,
// TestStepConfig tests that when raft step msgProp in EntryConfChange type,
// it appends the entry to log and sets pendingConf to be true.
func TestStepConfig(t *testing.T) {
// a raft that cannot make progress
@ -956,7 +956,7 @@ func TestStepConfig(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
index := r.raftLog.lastIndex()
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange}}})
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
if g := r.raftLog.lastIndex(); g != index+1 {
t.Errorf("index = %d, want %d", g, index+1)
}
@ -966,17 +966,17 @@ func TestStepConfig(t *testing.T) {
}
// TestStepIgnoreConfig tests that if raft step the second msgProp in
// EntryConfigChange type when the first one is uncommitted, the node will deny
// EntryConfChange type when the first one is uncommitted, the node will deny
// the proposal and keep its original state.
func TestStepIgnoreConfig(t *testing.T) {
// a raft that cannot make progress
r := newRaft(1, []int64{1, 2}, 0, 0)
r.becomeCandidate()
r.becomeLeader()
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange}}})
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
index := r.raftLog.lastIndex()
pendingConf := r.pendingConf
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange}}})
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
if g := r.raftLog.lastIndex(); g != index {
t.Errorf("index = %d, want %d", g, index)
}
@ -993,7 +993,7 @@ func TestRecoverPendingConfig(t *testing.T) {
wpending bool
}{
{pb.EntryNormal, false},
{pb.EntryConfigChange, true},
{pb.EntryConfChange, true},
}
for i, tt := range tests {
r := newRaft(1, []int64{1, 2}, 0, 0)
@ -1016,8 +1016,8 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
}
}()
r := newRaft(1, []int64{1, 2}, 0, 0)
r.appendEntry(pb.Entry{Type: pb.EntryConfigChange})
r.appendEntry(pb.Entry{Type: pb.EntryConfigChange})
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
r.becomeCandidate()
r.becomeLeader()
}()

View File

@ -14,7 +14,7 @@
Snapshot
Message
HardState
ConfigChange
ConfChange
*/
package raftpb
@ -35,17 +35,17 @@ var _ = math.Inf
type EntryType int32
const (
EntryNormal EntryType = 0
EntryConfigChange EntryType = 1
EntryNormal EntryType = 0
EntryConfChange EntryType = 1
)
var EntryType_name = map[int32]string{
0: "EntryNormal",
1: "EntryConfigChange",
1: "EntryConfChange",
}
var EntryType_value = map[string]int32{
"EntryNormal": 0,
"EntryConfigChange": 1,
"EntryNormal": 0,
"EntryConfChange": 1,
}
func (x EntryType) Enum() *EntryType {
@ -65,36 +65,36 @@ func (x *EntryType) UnmarshalJSON(data []byte) error {
return nil
}
type ConfigChangeType int32
type ConfChangeType int32
const (
ConfigChangeAddNode ConfigChangeType = 0
ConfigChangeRemoveNode ConfigChangeType = 1
ConfChangeAddNode ConfChangeType = 0
ConfChangeRemoveNode ConfChangeType = 1
)
var ConfigChangeType_name = map[int32]string{
0: "ConfigChangeAddNode",
1: "ConfigChangeRemoveNode",
var ConfChangeType_name = map[int32]string{
0: "ConfChangeAddNode",
1: "ConfChangeRemoveNode",
}
var ConfigChangeType_value = map[string]int32{
"ConfigChangeAddNode": 0,
"ConfigChangeRemoveNode": 1,
var ConfChangeType_value = map[string]int32{
"ConfChangeAddNode": 0,
"ConfChangeRemoveNode": 1,
}
func (x ConfigChangeType) Enum() *ConfigChangeType {
p := new(ConfigChangeType)
func (x ConfChangeType) Enum() *ConfChangeType {
p := new(ConfChangeType)
*p = x
return p
}
func (x ConfigChangeType) String() string {
return proto.EnumName(ConfigChangeType_name, int32(x))
func (x ConfChangeType) String() string {
return proto.EnumName(ConfChangeType_name, int32(x))
}
func (x *ConfigChangeType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(ConfigChangeType_value, data, "ConfigChangeType")
func (x *ConfChangeType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(ConfChangeType_value, data, "ConfChangeType")
if err != nil {
return err
}
*x = ConfigChangeType(value)
*x = ConfChangeType(value)
return nil
}
@ -159,21 +159,21 @@ func (m *HardState) Reset() { *m = HardState{} }
func (m *HardState) String() string { return proto.CompactTextString(m) }
func (*HardState) ProtoMessage() {}
type ConfigChange struct {
ID int64 `protobuf:"varint,1,req" json:"ID"`
Type ConfigChangeType `protobuf:"varint,2,req,enum=raftpb.ConfigChangeType" json:"Type"`
NodeID int64 `protobuf:"varint,3,req" json:"NodeID"`
Context []byte `protobuf:"bytes,4,opt" json:"Context"`
XXX_unrecognized []byte `json:"-"`
type ConfChange struct {
ID int64 `protobuf:"varint,1,req" json:"ID"`
Type ConfChangeType `protobuf:"varint,2,req,enum=raftpb.ConfChangeType" json:"Type"`
NodeID int64 `protobuf:"varint,3,req" json:"NodeID"`
Context []byte `protobuf:"bytes,4,opt" json:"Context"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ConfigChange) Reset() { *m = ConfigChange{} }
func (m *ConfigChange) String() string { return proto.CompactTextString(m) }
func (*ConfigChange) ProtoMessage() {}
func (m *ConfChange) Reset() { *m = ConfChange{} }
func (m *ConfChange) String() string { return proto.CompactTextString(m) }
func (*ConfChange) ProtoMessage() {}
func init() {
proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value)
proto.RegisterEnum("raftpb.ConfigChangeType", ConfigChangeType_name, ConfigChangeType_value)
proto.RegisterEnum("raftpb.ConfChangeType", ConfChangeType_name, ConfChangeType_value)
}
func (m *Info) Unmarshal(data []byte) error {
l := len(data)
@ -733,7 +733,7 @@ func (m *HardState) Unmarshal(data []byte) error {
}
return nil
}
func (m *ConfigChange) Unmarshal(data []byte) error {
func (m *ConfChange) Unmarshal(data []byte) error {
l := len(data)
index := 0
for index < l {
@ -777,7 +777,7 @@ func (m *ConfigChange) Unmarshal(data []byte) error {
}
b := data[index]
index++
m.Type |= (ConfigChangeType(b) & 0x7F) << shift
m.Type |= (ConfChangeType(b) & 0x7F) << shift
if b < 0x80 {
break
}
@ -915,7 +915,7 @@ func (m *HardState) Size() (n int) {
}
return n
}
func (m *ConfigChange) Size() (n int) {
func (m *ConfChange) Size() (n int) {
var l int
_ = l
n += 1 + sovRaft(uint64(m.ID))
@ -1131,7 +1131,7 @@ func (m *HardState) MarshalTo(data []byte) (n int, err error) {
}
return i, nil
}
func (m *ConfigChange) Marshal() (data []byte, err error) {
func (m *ConfChange) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
@ -1141,7 +1141,7 @@ func (m *ConfigChange) Marshal() (data []byte, err error) {
return data[:n], nil
}
func (m *ConfigChange) MarshalTo(data []byte) (n int, err error) {
func (m *ConfChange) MarshalTo(data []byte) (n int, err error) {
var i int
_ = i
var l int

View File

@ -13,8 +13,8 @@ message Info {
}
enum EntryType {
EntryNormal = 0;
EntryConfigChange = 1;
EntryNormal = 0;
EntryConfChange = 1;
}
message Entry {
@ -49,14 +49,14 @@ message HardState {
required int64 commit = 3 [(gogoproto.nullable) = false];
}
enum ConfigChangeType {
ConfigChangeAddNode = 0;
ConfigChangeRemoveNode = 1;
enum ConfChangeType {
ConfChangeAddNode = 0;
ConfChangeRemoveNode = 1;
}
message ConfigChange {
required int64 ID = 1 [(gogoproto.nullable) = false];
required ConfigChangeType Type = 2 [(gogoproto.nullable) = false];
required int64 NodeID = 3 [(gogoproto.nullable) = false];
optional bytes Context = 4 [(gogoproto.nullable) = false];
message ConfChange {
required int64 ID = 1 [(gogoproto.nullable) = false];
required ConfChangeType Type = 2 [(gogoproto.nullable) = false];
required int64 NodeID = 3 [(gogoproto.nullable) = false];
optional bytes Context = 4 [(gogoproto.nullable) = false];
}