diff --git a/raft/interaction_test.go b/raft/interaction_test.go new file mode 100644 index 000000000..375e636dc --- /dev/null +++ b/raft/interaction_test.go @@ -0,0 +1,31 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft_test + +import ( + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft/rafttest" +) + +func TestInteraction(t *testing.T) { + datadriven.Walk(t, "testdata", func(t *testing.T, path string) { + env := rafttest.NewInteractionEnv(nil) + datadriven.RunTest(t, path, func(d *datadriven.TestData) string { + return env.Handle(t, *d) + }) + }) +} diff --git a/raft/raft.go b/raft/raft.go index 62e79642c..83d783eb3 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "math/rand" + "sort" "strings" "sync" "time" @@ -529,7 +530,6 @@ func (r *raft) bcastAppend() { if id == r.id { return } - r.sendAppend(id) }) } @@ -795,7 +795,16 @@ func (r *raft) campaign(t CampaignType) { } return } - for id := range r.prs.Voters.IDs() { + var ids []uint64 + { + idMap := r.prs.Voters.IDs() + ids = make([]uint64, 0, len(idMap)) + for id := range idMap { + ids = append(ids, id) + } + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + } + for _, id := range ids { if id == r.id { continue } diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index d49c8837e..070f3a2b9 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -18,7 +18,6 @@ import ( "testing" pb "go.etcd.io/etcd/raft/raftpb" - "go.etcd.io/etcd/raft/tracker" ) var ( @@ -112,125 +111,6 @@ func TestSnapshotSucceed(t *testing.T) { } } -// TestSnapshotSucceedViaAppResp regression tests the situation in which a snap- -// shot is sent to a follower at the most recent index (i.e. the snapshot index -// is the leader's last index is the committed index). In that situation, a bug -// in the past left the follower in probing status until the next log entry was -// committed. -func TestSnapshotSucceedViaAppResp(t *testing.T) { - s1 := NewMemoryStorage() - // Create a single-node leader. - n1 := newTestRaft(1, []uint64{1}, 10, 1, s1) - n1.becomeCandidate() - n1.becomeLeader() - // We need to add a second empty entry so that we can truncate the first - // one away. - n1.Step(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - - rd := newReady(n1, &SoftState{}, pb.HardState{}) - s1.Append(rd.Entries) - s1.SetHardState(rd.HardState) - - if exp, ci := s1.lastIndex(), n1.raftLog.committed; ci != exp { - t.Fatalf("unexpected committed index %d, wanted %d: %+v", ci, exp, s1) - } - - // Force a log truncation. - if err := s1.Compact(1); err != nil { - t.Fatal(err) - } - - // Add a follower to the group. Do this in a clandestine way for simplicity. - // Also set up a snapshot that will be sent to the follower. - n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) - s1.snapshot = pb.Snapshot{ - Metadata: pb.SnapshotMetadata{ - ConfState: pb.ConfState{Voters: []uint64{1, 2}}, - Index: s1.lastIndex(), - Term: s1.ents[len(s1.ents)-1].Term, - }, - } - - noMessage := pb.MessageType(-1) - mustSend := func(from, to *raft, typ pb.MessageType) pb.Message { - t.Helper() - for i, msg := range from.msgs { - if msg.From != from.id || msg.To != to.id || msg.Type != typ { - continue - } - t.Log(DescribeMessage(msg, func([]byte) string { return "" })) - if len(msg.Entries) > 0 { - t.Log(DescribeEntries(msg.Entries, func(b []byte) string { return string(b) })) - } - if err := to.Step(msg); err != nil { - t.Fatalf("%v: %s", msg, err) - } - from.msgs = append(from.msgs[:i], from.msgs[i+1:]...) - return msg - } - if typ == noMessage { - if len(from.msgs) == 0 { - return pb.Message{} - } - t.Fatalf("expected no more messages, but got %d->%d %v", from.id, to.id, from.msgs) - } - t.Fatalf("message %d->%d %s not found in %v", from.id, to.id, typ, from.msgs) - return pb.Message{} // unreachable - } - - // Create the follower that will receive the snapshot. - s2 := NewMemoryStorage() - n2 := newTestRaft(2, []uint64{1, 2}, 10, 1, s2) - - // Let the leader probe the follower. - if !n1.maybeSendAppend(2, true /* sendIfEmpty */) { - t.Fatalf("expected message to be sent") - } - if msg := mustSend(n1, n2, pb.MsgApp); len(msg.Entries) > 0 { - // For this test to work, the leader must not have anything to append - // to the follower right now. - t.Fatalf("unexpectedly appending entries %v", msg.Entries) - } - - // Follower rejects the append (because it doesn't have any log entries) - if msg := mustSend(n2, n1, pb.MsgAppResp); !msg.Reject { - t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint) - } - - const expIdx = 2 - // Leader sends snapshot due to RejectHint of zero (we set up the raft log - // to start at index 2). - if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx { - t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index) - } - - // n2 reacts to snapshot with MsgAppResp. - if msg := mustSend(n2, n1, pb.MsgAppResp); msg.Index != expIdx { - t.Fatalf("expected AppResp at index %d, got %d", expIdx, msg.Index) - } - - // Leader sends MsgApp to communicate commit index. - if msg := mustSend(n1, n2, pb.MsgApp); msg.Commit != expIdx { - t.Fatalf("expected commit index %d, got %d", expIdx, msg.Commit) - } - - // Follower responds. - mustSend(n2, n1, pb.MsgAppResp) - - // Leader has correct state for follower. - pr := n1.prs.Progress[2] - if pr.State != tracker.StateReplicate { - t.Fatalf("unexpected state %v", pr) - } - if pr.Match != expIdx || pr.Next != expIdx+1 { - t.Fatalf("expected match = %d, next = %d; got match = %d and next = %d", expIdx, expIdx+1, pr.Match, pr.Next) - } - - // Leader and follower are done. - mustSend(n1, n2, noMessage) - mustSend(n2, n1, noMessage) -} - func TestSnapshotAbort(t *testing.T) { storage := NewMemoryStorage() sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) diff --git a/raft/raftpb/confchange.go b/raft/raftpb/confchange.go index a91c18dc1..46a7a7021 100644 --- a/raft/raftpb/confchange.go +++ b/raft/raftpb/confchange.go @@ -16,6 +16,8 @@ package raftpb import ( "fmt" + "strconv" + "strings" "github.com/gogo/protobuf/proto" ) @@ -103,3 +105,66 @@ func (c *ConfChangeV2) LeaveJoint() bool { cpy.Context = nil return proto.Equal(&cpy, &ConfChangeV2{}) } + +// ConfChangesFromString parses a Space-delimited sequence of operations into a +// slice of ConfChangeSingle. The supported operations are: +// - vn: make n a voter, +// - ln: make n a learner, +// - rn: remove n, and +// - un: update n. +func ConfChangesFromString(s string) ([]ConfChangeSingle, error) { + var ccs []ConfChangeSingle + toks := strings.Split(strings.TrimSpace(s), " ") + if toks[0] == "" { + toks = nil + } + for _, tok := range toks { + if len(tok) < 2 { + return nil, fmt.Errorf("unknown token %s", tok) + } + var cc ConfChangeSingle + switch tok[0] { + case 'v': + cc.Type = ConfChangeAddNode + case 'l': + cc.Type = ConfChangeAddLearnerNode + case 'r': + cc.Type = ConfChangeRemoveNode + case 'u': + cc.Type = ConfChangeUpdateNode + default: + return nil, fmt.Errorf("unknown input: %s", tok) + } + id, err := strconv.ParseUint(tok[1:], 10, 64) + if err != nil { + return nil, err + } + cc.NodeID = id + ccs = append(ccs, cc) + } + return ccs, nil +} + +// ConfChangesToString is the inverse to ConfChangesFromString. +func ConfChangesToString(ccs []ConfChangeSingle) string { + var buf strings.Builder + for i, cc := range ccs { + if i > 0 { + buf.WriteByte(' ') + } + switch cc.Type { + case ConfChangeAddNode: + buf.WriteByte('v') + case ConfChangeAddLearnerNode: + buf.WriteByte('l') + case ConfChangeRemoveNode: + buf.WriteByte('r') + case ConfChangeUpdateNode: + buf.WriteByte('u') + default: + buf.WriteString("unknown") + } + fmt.Fprintf(&buf, "%d", cc.NodeID) + } + return buf.String() +} diff --git a/raft/rafttest/interaction_env.go b/raft/rafttest/interaction_env.go new file mode 100644 index 000000000..c0ec44f6f --- /dev/null +++ b/raft/rafttest/interaction_env.go @@ -0,0 +1,90 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "math" + "strings" + + "go.etcd.io/etcd/raft" + pb "go.etcd.io/etcd/raft/raftpb" +) + +// InteractionOpts groups the options for an InteractionEnv. +type InteractionOpts struct { + OnConfig func(*raft.Config) +} + +// A Node is a member of a raft group tested via an InteractionEnv. +type Node struct { + *raft.RawNode + Storage + + Config *raft.Config + History []pb.Snapshot +} + +// InteractionEnv facilitates testing of complex interactions between the +// members of a raft group. +type InteractionEnv struct { + Options *InteractionOpts + Nodes []Node + Messages []pb.Message // in-flight messages + + Output *RedirectLogger +} + +// NewInteractionEnv initializes an InteractionEnv. opts may be nil. +func NewInteractionEnv(opts *InteractionOpts) *InteractionEnv { + if opts == nil { + opts = &InteractionOpts{} + } + return &InteractionEnv{ + Options: opts, + Output: &RedirectLogger{ + Builder: &strings.Builder{}, + }, + } +} + +// Storage is the interface used by InteractionEnv. It is comprised of raft's +// Storage interface plus access to operations that maintain the log and drive +// the Ready handling loop. +type Storage interface { + raft.Storage + SetHardState(state pb.HardState) error + ApplySnapshot(pb.Snapshot) error + Compact(newFirstIndex uint64) error + Append([]pb.Entry) error +} + +// defaultRaftConfig sets up a *raft.Config with reasonable testing defaults. +// In particular, no limits are set. +func defaultRaftConfig(id uint64, applied uint64, s raft.Storage) *raft.Config { + return &raft.Config{ + ID: id, + Applied: applied, + ElectionTick: 3, + HeartbeatTick: 1, + Storage: s, + MaxSizePerMsg: math.MaxUint64, + MaxInflightMsgs: math.MaxInt32, + } +} + +func defaultEntryFormatter(b []byte) string { + return fmt.Sprintf("%q", b) +} diff --git a/raft/rafttest/interaction_env_handler.go b/raft/rafttest/interaction_env_handler.go new file mode 100644 index 000000000..a6e51d372 --- /dev/null +++ b/raft/rafttest/interaction_env_handler.go @@ -0,0 +1,159 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "strconv" + "testing" + + "github.com/cockroachdb/datadriven" +) + +// Handle is the entrypoint for data-driven interaction testing. Commands and +// parameters are parsed from the supplied TestData. Errors during data parsing +// are reported via the supplied *testing.T; errors from the raft nodes and the +// storage engine are reported to the output buffer. +func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string { + env.Output.Reset() + var err error + switch d.Cmd { + case "add-nodes": + // Example: + // + // add-nodes voters=(1 2 3) learners=(4 5) index=2 content=foo + err = env.handleAddNodes(t, d) + case "campaign": + // Example: + // + // campaign + err = env.handleCampaign(t, d) + case "compact": + // Example: + // + // compact + err = env.handleCompact(t, d) + case "deliver-msgs": + // Deliver the messages for a given recipient. + // + // Example: + // + // deliver-msgs + err = env.handleDeliverMsgs(t, d) + case "process-ready": + // Example: + // + // process-ready 3 + err = env.handleProcessReady(t, d) + case "log-level": + // Set the log level. NONE disables all output, including from the test + // harness (except errors). + // + // Example: + // + // log-level WARN + err = env.handleLogLevel(t, d) + case "raft-log": + // Print the Raft log. + // + // Example: + // + // raft-log 3 + err = env.handleRaftLog(t, d) + case "stabilize": + // Deliver messages to and run process-ready on the set of IDs until + // no more work is to be done. + // + // Example: + // + // stabilize 1 4 + err = env.handleStabilize(t, d) + case "status": + // Print Raft status. + // + // Example: + // + // status 5 + err = env.handleStatus(t, d) + case "tick-heartbeat": + // Tick a heartbeat interval. + // + // Example: + // + // tick-heartbeat 3 + err = env.handleTickHeartbeat(t, d) + case "propose-conf-change": + // Propose a configuration change. + // + // Example: + // + // propose-conf-change transition=explicit + // v1 v3 l4 r5 + // + // Example: + // + // propose-conf-change v1=true + // v5 + err = env.handleProposeConfChange(t, d) + default: + err = fmt.Errorf("unknown command") + } + if err != nil { + env.Output.WriteString(err.Error()) + } + // NB: the highest log level suppresses all output, including that of the + // handlers. This comes in useful during setup which can be chatty. + // However, errors are always logged. + if env.Output.Len() == 0 { + return "ok" + } + if env.Output.Lvl == len(lvlNames)-1 { + if err != nil { + return err.Error() + } + return "ok (quiet)" + } + return env.Output.String() +} + +func firstAsInt(t *testing.T, d datadriven.TestData) int { + t.Helper() + n, err := strconv.Atoi(d.CmdArgs[0].Key) + if err != nil { + t.Fatal(err) + } + return n +} + +func firstAsNodeIdx(t *testing.T, d datadriven.TestData) int { + t.Helper() + n := firstAsInt(t, d) + return n - 1 +} + +func ints(t *testing.T, d datadriven.TestData) []int { + var ints []int + for i := 0; i < len(d.CmdArgs); i++ { + if len(d.CmdArgs[i].Vals) != 0 { + continue + } + n, err := strconv.Atoi(d.CmdArgs[i].Key) + if err != nil { + t.Fatal(err) + } + ints = append(ints, n) + } + return ints +} diff --git a/raft/rafttest/interaction_env_handler_add_nodes.go b/raft/rafttest/interaction_env_handler_add_nodes.go new file mode 100644 index 000000000..a68a2cbdd --- /dev/null +++ b/raft/rafttest/interaction_env_handler_add_nodes.go @@ -0,0 +1,136 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "errors" + "fmt" + "reflect" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" + pb "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) error { + n := firstAsInt(t, d) + var snap pb.Snapshot + for _, arg := range d.CmdArgs[1:] { + for i := range arg.Vals { + switch arg.Key { + case "voters": + var id uint64 + arg.Scan(t, i, &id) + snap.Metadata.ConfState.Voters = append(snap.Metadata.ConfState.Voters, id) + case "learners": + var id uint64 + arg.Scan(t, i, &id) + snap.Metadata.ConfState.Learners = append(snap.Metadata.ConfState.Learners, id) + case "index": + arg.Scan(t, i, &snap.Metadata.Index) + case "content": + arg.Scan(t, i, &snap.Data) + } + } + } + return env.AddNodes(n, snap) +} + +type snapOverrideStorage struct { + Storage + snapshotOverride func() (pb.Snapshot, error) +} + +func (s snapOverrideStorage) Snapshot() (pb.Snapshot, error) { + if s.snapshotOverride != nil { + return s.snapshotOverride() + } + return s.Storage.Snapshot() +} + +var _ raft.Storage = snapOverrideStorage{} + +// AddNodes adds n new nodes initializes from the given snapshot (which may be +// empty). They will be assigned consecutive IDs. +func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error { + bootstrap := !reflect.DeepEqual(snap, pb.Snapshot{}) + for i := 0; i < n; i++ { + id := uint64(1 + len(env.Nodes)) + s := snapOverrideStorage{ + Storage: raft.NewMemoryStorage(), + // When you ask for a snapshot, you get the most recent snapshot. + // + // TODO(tbg): this is sort of clunky, but MemoryStorage itself will + // give you some fixed snapshot and also the snapshot changes + // whenever you compact the logs and vice versa, so it's all a bit + // awkward to use. + snapshotOverride: func() (pb.Snapshot, error) { + snaps := env.Nodes[int(id-1)].History + return snaps[len(snaps)-1], nil + }, + } + if bootstrap { + // NB: we could make this work with 1, but MemoryStorage just + // doesn't play well with that and it's not a loss of generality. + if snap.Metadata.Index <= 1 { + return errors.New("index must be specified as > 1 due to bootstrap") + } + snap.Metadata.Term = 1 + if err := s.ApplySnapshot(snap); err != nil { + return err + } + fi, err := s.FirstIndex() + if err != nil { + return err + } + // At the time of writing and for *MemoryStorage, applying a + // snapshot also truncates appropriately, but this would change with + // other storage engines potentially. + if exp := snap.Metadata.Index + 1; fi != exp { + return fmt.Errorf("failed to establish first index %d; got %d", exp, fi) + } + } + cfg := defaultRaftConfig(id, snap.Metadata.Index, s) + if env.Options.OnConfig != nil { + env.Options.OnConfig(cfg) + if cfg.ID != id { + // This could be supported but then we need to do more work + // translating back and forth -- not worth it. + return errors.New("OnConfig must not change the ID") + } + } + if cfg.Logger != nil { + return errors.New("OnConfig must not set Logger") + } + cfg.Logger = env.Output + + rn, err := raft.NewRawNode(cfg) + if err != nil { + return err + } + + node := Node{ + RawNode: rn, + // TODO(tbg): allow a more general Storage, as long as it also allows + // us to apply snapshots, append entries, and update the HardState. + Storage: s, + Config: cfg, + History: []pb.Snapshot{snap}, + } + env.Nodes = append(env.Nodes, node) + } + return nil +} diff --git a/raft/rafttest/interaction_env_handler_campaign.go b/raft/rafttest/interaction_env_handler_campaign.go new file mode 100644 index 000000000..bde5cc42e --- /dev/null +++ b/raft/rafttest/interaction_env_handler_campaign.go @@ -0,0 +1,31 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleCampaign(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.Campaign(t, idx) +} + +// Campaign the node at the given index. +func (env *InteractionEnv) Campaign(t *testing.T, idx int) error { + return env.Nodes[idx].Campaign() +} diff --git a/raft/rafttest/interaction_env_handler_compact.go b/raft/rafttest/interaction_env_handler_compact.go new file mode 100644 index 000000000..25fa1d22c --- /dev/null +++ b/raft/rafttest/interaction_env_handler_compact.go @@ -0,0 +1,40 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "strconv" + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleCompact(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + newFirstIndex, err := strconv.ParseUint(d.CmdArgs[1].Key, 10, 64) + if err != nil { + return err + } + return env.Compact(idx, newFirstIndex) +} + +// Compact truncates the log on the node at index idx so that the supplied new +// first index results. +func (env *InteractionEnv) Compact(idx int, newFirstIndex uint64) error { + if err := env.Nodes[idx].Compact(newFirstIndex); err != nil { + return err + } + return env.RaftLog(idx) +} diff --git a/raft/rafttest/interaction_env_handler_deliver_msgs.go b/raft/rafttest/interaction_env_handler_deliver_msgs.go new file mode 100644 index 000000000..c4eb675c5 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_deliver_msgs.go @@ -0,0 +1,58 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "errors" + "fmt" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData) error { + if len(env.Messages) == 0 { + return errors.New("no messages to deliver") + } + + msgs := env.Messages + env.Messages = nil + + return env.DeliverMsgs(msgs) +} + +// DeliverMsgs delivers the supplied messages typically taken from env.Messages. +func (env *InteractionEnv) DeliverMsgs(msgs []raftpb.Message) error { + for _, msg := range msgs { + toIdx := int(msg.To - 1) + var drop bool + if toIdx >= len(env.Nodes) { + // Drop messages for peers that don't exist yet. + drop = true + env.Output.WriteString("dropped: ") + } + fmt.Fprintln(env.Output, raft.DescribeMessage(msg, defaultEntryFormatter)) + if drop { + continue + } + if err := env.Nodes[toIdx].Step(msg); err != nil { + env.Output.WriteString(err.Error()) + continue + } + } + return nil +} diff --git a/raft/rafttest/interaction_env_handler_log_level.go b/raft/rafttest/interaction_env_handler_log_level.go new file mode 100644 index 000000000..a61ba37cd --- /dev/null +++ b/raft/rafttest/interaction_env_handler_log_level.go @@ -0,0 +1,37 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleLogLevel(t *testing.T, d datadriven.TestData) error { + return env.LogLevel(d.CmdArgs[0].Key) +} + +func (env *InteractionEnv) LogLevel(name string) error { + for i, s := range lvlNames { + if strings.ToLower(s) == strings.ToLower(name) { + env.Output.Lvl = i + return nil + } + } + return fmt.Errorf("log levels must be either of %v", lvlNames) +} diff --git a/raft/rafttest/interaction_env_handler_process_ready.go b/raft/rafttest/interaction_env_handler_process_ready.go new file mode 100644 index 000000000..ff5cf2bd9 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_process_ready.go @@ -0,0 +1,96 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/quorum" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleProcessReady(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.ProcessReady(idx) +} + +// ProcessReady runs Ready handling on the node with the given index. +func (env *InteractionEnv) ProcessReady(idx int) error { + // TODO(tbg): Allow simulating crashes here. + rn, s := env.Nodes[idx].RawNode, env.Nodes[idx].Storage + rd := rn.Ready() + // TODO(tbg): the order of operations here is not necessarily safe. See: + // https://github.com/etcd-io/etcd/pull/10861 + if !raft.IsEmptyHardState(rd.HardState) { + if err := s.SetHardState(rd.HardState); err != nil { + return err + } + } + if err := s.Append(rd.Entries); err != nil { + return err + } + if !raft.IsEmptySnap(rd.Snapshot) { + if err := s.ApplySnapshot(rd.Snapshot); err != nil { + return err + } + } + for _, ent := range rd.CommittedEntries { + var update []byte + switch ent.Type { + case raftpb.EntryConfChange: + var cc raftpb.ConfChange + if err := cc.Unmarshal(ent.Data); err != nil { + return err + } + update = cc.Context + rn.ApplyConfChange(cc) + case raftpb.EntryConfChangeV2: + var cc raftpb.ConfChangeV2 + if err := cc.Unmarshal(ent.Data); err != nil { + return err + } + rn.ApplyConfChange(cc) + update = cc.Context + default: + update = ent.Data + } + + // Record the new state by starting with the current state and applying + // the command. + lastSnap := env.Nodes[idx].History[len(env.Nodes[idx].History)-1] + var snap raftpb.Snapshot + snap.Data = append(snap.Data, lastSnap.Data...) + // NB: this hard-codes an "appender" state machine. + snap.Data = append(snap.Data, update...) + snap.Metadata.Index = ent.Index + snap.Metadata.Term = ent.Term + cfg := rn.Status().Config + snap.Metadata.ConfState = raftpb.ConfState{ + Voters: cfg.Voters[0].Slice(), + VotersOutgoing: cfg.Voters[1].Slice(), + Learners: quorum.MajorityConfig(cfg.Learners).Slice(), + LearnersNext: quorum.MajorityConfig(cfg.LearnersNext).Slice(), + } + env.Nodes[idx].History = append(env.Nodes[idx].History, snap) + } + for _, msg := range rd.Messages { + env.Messages = append(env.Messages, msg) + } + rn.Advance(rd) + env.Output.WriteString(raft.DescribeReady(rd, defaultEntryFormatter)) + return nil +} diff --git a/raft/rafttest/interaction_env_handler_propose_conf_change.go b/raft/rafttest/interaction_env_handler_propose_conf_change.go new file mode 100644 index 000000000..278339675 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_propose_conf_change.go @@ -0,0 +1,82 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "strconv" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleProposeConfChange(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + var v1 bool + transition := raftpb.ConfChangeTransitionAuto + for _, arg := range d.CmdArgs[1:] { + for _, val := range arg.Vals { + switch arg.Key { + case "v1": + var err error + v1, err = strconv.ParseBool(val) + if err != nil { + return err + } + case "transition": + switch val { + case "auto": + transition = raftpb.ConfChangeTransitionAuto + case "implicit": + transition = raftpb.ConfChangeTransitionJointImplicit + case "explicit": + transition = raftpb.ConfChangeTransitionJointExplicit + default: + return fmt.Errorf("unknown transition %s", val) + } + default: + return fmt.Errorf("unknown command %s", arg.Key) + } + } + } + + ccs, err := raftpb.ConfChangesFromString(d.Input) + if err != nil { + return err + } + + var c raftpb.ConfChangeI + if v1 { + if len(ccs) > 1 || transition != raftpb.ConfChangeTransitionAuto { + return fmt.Errorf("v1 conf change can only have one operation and no transition") + } + c = raftpb.ConfChange{ + Type: ccs[0].Type, + NodeID: ccs[0].NodeID, + } + } else { + c = raftpb.ConfChangeV2{ + Transition: transition, + Changes: ccs, + } + } + return env.ProposeConfChange(idx, c) +} + +// ProposeConfChange proposes a configuration change on the node with the given index. +func (env *InteractionEnv) ProposeConfChange(idx int, c raftpb.ConfChangeI) error { + return env.Nodes[idx].ProposeConfChange(c) +} diff --git a/raft/rafttest/interaction_env_handler_raft_log.go b/raft/rafttest/interaction_env_handler_raft_log.go new file mode 100644 index 000000000..a99db29f3 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_raft_log.go @@ -0,0 +1,60 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "math" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" +) + +func (env *InteractionEnv) writeErr(err error) { + if err != nil { + env.Output.WriteString(err.Error()) + } +} + +func (env *InteractionEnv) handleRaftLog(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.RaftLog(idx) +} + +// RaftLog pretty prints the raft log to the output buffer. +func (env *InteractionEnv) RaftLog(idx int) error { + s := env.Nodes[idx].Storage + fi, err := s.FirstIndex() + if err != nil { + return err + } + li, err := s.LastIndex() + if err != nil { + return err + } + if li < fi { + // TODO(tbg): this is what MemoryStorage returns, but unclear if it's + // the "correct" thing to do. + fmt.Fprintf(env.Output, "log is empty: first index=%d, last index=%d", fi, li) + return nil + } + ents, err := s.Entries(fi, li+1, math.MaxUint64) + if err != nil { + return err + } + env.Output.WriteString(raft.DescribeEntries(ents, defaultEntryFormatter)) + return err +} diff --git a/raft/rafttest/interaction_env_handler_stabilize.go b/raft/rafttest/interaction_env_handler_stabilize.go new file mode 100644 index 000000000..16baa4373 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_stabilize.go @@ -0,0 +1,92 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "bufio" + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleStabilize(t *testing.T, d datadriven.TestData) error { + var idxs []int + for _, id := range ints(t, d) { + idxs = append(idxs, id-1) + } + return env.Stabilize(idxs...) +} + +// Stabilize repeatedly runs Ready handling on and message delivery to the set +// of nodes specified via the idxs slice until reaching a fixed point. +func (env *InteractionEnv) Stabilize(idxs ...int) error { + var nodes []Node + for _, idx := range idxs { + nodes = append(nodes, env.Nodes[idx]) + } + if len(nodes) == 0 { + nodes = env.Nodes + } + + withIndent := func(f func()) { + orig := env.Output.Builder + env.Output.Builder = &strings.Builder{} + f() + + scanner := bufio.NewScanner(strings.NewReader(env.Output.Builder.String())) + for scanner.Scan() { + orig.WriteString(" " + scanner.Text() + "\n") + } + env.Output.Builder = orig + } + + for { + done := true + for _, rn := range nodes { + if rn.HasReady() { + done = false + idx := int(rn.Status().ID - 1) + fmt.Fprintf(env.Output, "> %d handling Ready\n", idx+1) + withIndent(func() { env.ProcessReady(idx) }) + } + } + var msgs []raftpb.Message + for _, rn := range nodes { + msgs, env.Messages = splitMsgs(env.Messages, rn.Status().ID) + if len(msgs) > 0 { + fmt.Fprintf(env.Output, "> delivering messages\n") + withIndent(func() { env.DeliverMsgs(msgs) }) + done = false + } + if done { + return nil + } + } + } +} + +func splitMsgs(msgs []raftpb.Message, to uint64) (toMsgs []raftpb.Message, rmdr []raftpb.Message) { + for _, msg := range msgs { + if msg.To == to { + toMsgs = append(toMsgs, msg) + } else { + rmdr = append(rmdr, msg) + } + } + return toMsgs, rmdr +} diff --git a/raft/rafttest/interaction_env_handler_status.go b/raft/rafttest/interaction_env_handler_status.go new file mode 100644 index 000000000..3a0c953e5 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_status.go @@ -0,0 +1,42 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft/tracker" +) + +func (env *InteractionEnv) handleStatus(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.Status(idx) +} + +// Status pretty-prints the raft status for the node at the given index to the output +// buffer. +func (env *InteractionEnv) Status(idx int) error { + // TODO(tbg): actually print the full status. + st := env.Nodes[idx].Status() + m := tracker.ProgressMap{} + for id, pr := range st.Progress { + pr := pr // loop-local copy + m[id] = &pr + } + fmt.Fprint(env.Output, m) + return nil +} diff --git a/raft/rafttest/interaction_env_handler_tick_heartbeat.go b/raft/rafttest/interaction_env_handler_tick_heartbeat.go new file mode 100644 index 000000000..349ca78ef --- /dev/null +++ b/raft/rafttest/interaction_env_handler_tick_heartbeat.go @@ -0,0 +1,34 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleTickHeartbeat(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.Tick(idx, env.Nodes[idx].Config.HeartbeatTick) +} + +// Tick the node at the given index the given number of times. +func (env *InteractionEnv) Tick(idx int, num int) error { + for i := 0; i < num; i++ { + env.Nodes[idx].Tick() + } + return nil +} diff --git a/raft/rafttest/interaction_env_logger.go b/raft/rafttest/interaction_env_logger.go new file mode 100644 index 000000000..00fd57ee7 --- /dev/null +++ b/raft/rafttest/interaction_env_logger.go @@ -0,0 +1,107 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "strings" + + "go.etcd.io/etcd/raft" +) + +type logLevels [6]string + +func (l logLevels) strToLev(s string) int { + for i, lvl := range l { + if strings.ToUpper(s) == lvl { + return i + } + } + panic(fmt.Sprintf("unknown level %q", s)) +} + +var lvlNames logLevels = [...]string{"DEBUG", "INFO", "WARN", "ERROR", "FATAL", "NONE"} + +type RedirectLogger struct { + *strings.Builder + Lvl int // 0 = DEBUG, 1 = INFO, 2 = WARNING, 3 = ERROR, 4 = FATAL, 5 = NONE +} + +var _ raft.Logger = (*RedirectLogger)(nil) + +func (l *RedirectLogger) printf(lvl int, format string, args ...interface{}) { + if l.Lvl <= lvl { + fmt.Fprint(l, lvlNames[lvl], " ") + fmt.Fprintf(l, format, args...) + if n := len(format); n > 0 && format[n-1] != '\n' { + l.WriteByte('\n') + } + } +} +func (l *RedirectLogger) print(lvl int, args ...interface{}) { + if l.Lvl <= lvl { + fmt.Fprint(l, lvlNames[lvl], " ") + fmt.Fprintln(l, args...) + } +} + +func (l *RedirectLogger) Debug(v ...interface{}) { + l.print(0, v...) +} + +func (l *RedirectLogger) Debugf(format string, v ...interface{}) { + l.printf(0, format, v...) +} + +func (l *RedirectLogger) Info(v ...interface{}) { + l.print(1, v...) +} + +func (l *RedirectLogger) Infof(format string, v ...interface{}) { + l.printf(1, format, v...) +} + +func (l *RedirectLogger) Warning(v ...interface{}) { + l.print(2, v...) +} + +func (l *RedirectLogger) Warningf(format string, v ...interface{}) { + l.printf(2, format, v...) +} + +func (l *RedirectLogger) Error(v ...interface{}) { + l.print(3, v...) +} + +func (l *RedirectLogger) Errorf(format string, v ...interface{}) { + l.printf(3, format, v...) +} + +func (l *RedirectLogger) Fatal(v ...interface{}) { + l.print(4, v...) +} + +func (l *RedirectLogger) Fatalf(format string, v ...interface{}) { + + l.printf(4, format, v...) +} + +func (l *RedirectLogger) Panic(v ...interface{}) { + l.print(4, v...) +} + +func (l *RedirectLogger) Panicf(format string, v ...interface{}) { + l.printf(4, format, v...) +} diff --git a/raft/rafttest/network.go b/raft/rafttest/network.go index ee30fc0c4..5e7cdf5ce 100644 --- a/raft/rafttest/network.go +++ b/raft/rafttest/network.go @@ -44,6 +44,7 @@ type network interface { } type raftNetwork struct { + rand *rand.Rand mu sync.Mutex disconnected map[uint64]bool dropmap map[conn]float64 @@ -62,6 +63,7 @@ type delay struct { func newRaftNetwork(nodes ...uint64) *raftNetwork { pn := &raftNetwork{ + rand: rand.New(rand.NewSource(1)), recvQueues: make(map[uint64]chan raftpb.Message), dropmap: make(map[conn]float64), delaymap: make(map[conn]delay), @@ -91,12 +93,12 @@ func (rn *raftNetwork) send(m raftpb.Message) { if to == nil { return } - if drop != 0 && rand.Float64() < drop { + if drop != 0 && rn.rand.Float64() < drop { return } // TODO: shall we dl without blocking the send call? - if dl.d != 0 && rand.Float64() < dl.rate { - rd := rand.Int63n(int64(dl.d)) + if dl.d != 0 && rn.rand.Float64() < dl.rate { + rd := rn.rand.Int63n(int64(dl.d)) time.Sleep(time.Duration(rd)) } diff --git a/raft/storage.go b/raft/storage.go index 14ad68608..6be574590 100644 --- a/raft/storage.go +++ b/raft/storage.go @@ -44,6 +44,8 @@ var ErrSnapshotTemporarilyUnavailable = errors.New("snapshot is temporarily unav // become inoperable and refuse to participate in elections; the // application is responsible for cleanup and recovery in this case. type Storage interface { + // TODO(tbg): split this into two interfaces, LogStorage and StateStorage. + // InitialState returns the saved HardState and ConfState information. InitialState() (pb.HardState, pb.ConfState, error) // Entries returns a slice of log entries in the range [lo,hi). diff --git a/raft/testdata/campaign.txt b/raft/testdata/campaign.txt new file mode 100644 index 000000000..19bb06b9e --- /dev/null +++ b/raft/testdata/campaign.txt @@ -0,0 +1,117 @@ +log-level info +---- +ok + +add-nodes 3 voters=(1,2,3) index=2 +---- +INFO 1 switched to configuration voters=(1 2 3) +INFO 1 became follower at term 0 +INFO newRaft 1 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] +INFO 2 switched to configuration voters=(1 2 3) +INFO 2 became follower at term 0 +INFO newRaft 2 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] +INFO 3 switched to configuration voters=(1 2 3) +INFO 3 became follower at term 0 +INFO newRaft 3 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] + +campaign 1 +---- +INFO 1 is starting a new election at term 0 +INFO 1 became candidate at term 1 +INFO 1 received MsgVoteResp from 1 at term 1 +INFO 1 [logterm: 1, index: 2] sent MsgVote request to 2 at term 1 +INFO 1 [logterm: 1, index: 2] sent MsgVote request to 3 at term 1 + +stabilize +---- +> 1 handling Ready + Ready MustSync=true: + Lead:0 State:StateCandidate + HardState Term:1 Vote:1 Commit:2 + Messages: + 1->2 MsgVote Term:1 Log:1/2 + 1->3 MsgVote Term:1 Log:1/2 +> delivering messages + 1->2 MsgVote Term:1 Log:1/2 + INFO 2 [term: 0] received a MsgVote message with higher term from 1 [term: 1] + INFO 2 became follower at term 1 + INFO 2 [logterm: 1, index: 2, vote: 0] cast MsgVote for 1 [logterm: 1, index: 2] at term 1 +> delivering messages + 1->3 MsgVote Term:1 Log:1/2 + INFO 3 [term: 0] received a MsgVote message with higher term from 1 [term: 1] + INFO 3 became follower at term 1 + INFO 3 [logterm: 1, index: 2, vote: 0] cast MsgVote for 1 [logterm: 1, index: 2] at term 1 +> 2 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:2 + Messages: + 2->1 MsgVoteResp Term:1 Log:0/0 +> 3 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:2 + Messages: + 3->1 MsgVoteResp Term:1 Log:0/0 +> delivering messages + 2->1 MsgVoteResp Term:1 Log:0/0 + INFO 1 received MsgVoteResp from 2 at term 1 + INFO 1 has received 2 MsgVoteResp votes and 0 vote rejections + INFO 1 became leader at term 1 + 3->1 MsgVoteResp Term:1 Log:0/0 +> 1 handling Ready + Ready MustSync=true: + Lead:1 State:StateLeader + Entries: + 1/3 EntryNormal "" + Messages: + 1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] + 1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +> delivering messages + 1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +> delivering messages + 1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +> 2 handling Ready + Ready MustSync=true: + Lead:1 State:StateFollower + Entries: + 1/3 EntryNormal "" + Messages: + 2->1 MsgAppResp Term:1 Log:0/3 +> 3 handling Ready + Ready MustSync=true: + Lead:1 State:StateFollower + Entries: + 1/3 EntryNormal "" + Messages: + 3->1 MsgAppResp Term:1 Log:0/3 +> delivering messages + 2->1 MsgAppResp Term:1 Log:0/3 + 3->1 MsgAppResp Term:1 Log:0/3 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:3 + CommittedEntries: + 1/3 EntryNormal "" + Messages: + 1->2 MsgApp Term:1 Log:1/3 Commit:3 + 1->3 MsgApp Term:1 Log:1/3 Commit:3 +> delivering messages + 1->2 MsgApp Term:1 Log:1/3 Commit:3 +> delivering messages + 1->3 MsgApp Term:1 Log:1/3 Commit:3 +> 2 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:3 + CommittedEntries: + 1/3 EntryNormal "" + Messages: + 2->1 MsgAppResp Term:1 Log:0/3 +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:3 + CommittedEntries: + 1/3 EntryNormal "" + Messages: + 3->1 MsgAppResp Term:1 Log:0/3 +> delivering messages + 2->1 MsgAppResp Term:1 Log:0/3 + 3->1 MsgAppResp Term:1 Log:0/3 diff --git a/raft/testdata/confchange_v1.txt b/raft/testdata/confchange_v1.txt new file mode 100644 index 000000000..57fb51f9a --- /dev/null +++ b/raft/testdata/confchange_v1.txt @@ -0,0 +1,78 @@ +add-nodes 1 voters=(1) index=2 +---- +INFO 1 switched to configuration voters=(1) +INFO 1 became follower at term 0 +INFO newRaft 1 [peers: [1], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] + +campaign 1 +---- +INFO 1 is starting a new election at term 0 +INFO 1 became candidate at term 1 +INFO 1 received MsgVoteResp from 1 at term 1 +INFO 1 became leader at term 1 + +propose-conf-change 1 +v2 v3 +---- +ok + +add-nodes 2 + +process-ready 1 +---- +INFO 2 switched to configuration voters=() +INFO 2 became follower at term 0 +INFO newRaft 2 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0] +INFO 3 switched to configuration voters=() +INFO 3 became follower at term 0 +INFO newRaft 3 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0] + +stabilize 1 +---- +> 1 handling Ready + INFO 1 switched to configuration voters=(1 2 3)&&(1) autoleave + INFO initiating automatic transition out of joint configuration voters=(1 2 3)&&(1) autoleave + Ready MustSync=true: + Lead:1 State:StateLeader + HardState Term:1 Vote:1 Commit:4 + Entries: + 1/3 EntryNormal "" + 1/4 EntryConfChangeV2 v2 v3 + CommittedEntries: + 1/3 EntryNormal "" + 1/4 EntryConfChangeV2 v2 v3 +> 1 handling Ready + Ready MustSync=true: + Entries: + 1/5 EntryConfChangeV2 + +# NB: this test is broken from here on because the leader doesn't propagate the +# commit index proactively, see the buglet #11002. + +stabilize 2 +---- +ok + +stabilize 1 +---- +ok + +stabilize 2 +---- +ok + +stabilize 1 +---- +ok + +stabilize 2 +---- +ok + +stabilize 1 +---- +ok + +stabilize 2 +---- +ok diff --git a/raft/testdata/snapshot_succeed_via_app_resp.txt b/raft/testdata/snapshot_succeed_via_app_resp.txt new file mode 100644 index 000000000..ea82d1b29 --- /dev/null +++ b/raft/testdata/snapshot_succeed_via_app_resp.txt @@ -0,0 +1,141 @@ +# TestSnapshotSucceedViaAppResp regression tests the situation in which a snap- +# shot is sent to a follower at the most recent index (i.e. the snapshot index +# is the leader's last index is the committed index). In that situation, a bug +# in the past left the follower in probing status until the next log entry was +# committed. +# +# See https://github.com/etcd-io/etcd/pull/10308 for additional background. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +# Start with two nodes, but the config already has a third. +add-nodes 2 voters=(1,2,3) index=10 +---- +ok + +campaign 1 +---- +ok + +# Fully replicate everything, including the leader's empty index. +stabilize +---- +ok (quiet) + +compact 1 11 +---- +ok (quiet) + +# Drop inflight messages to n3. +deliver-msgs 3 +---- +ok (quiet) + +# Show the Raft log messages from now on. +log-level debug +---- +ok + +status 1 +---- +1: StateReplicate match=11 next=12 inactive +2: StateReplicate match=11 next=12 +3: StateProbe match=0 next=11 paused inactive + +# Add the node that will receive a snapshot (it has no state at all, does not +# even have a config). +add-nodes 1 +---- +INFO 3 switched to configuration voters=() +INFO 3 became follower at term 0 +INFO newRaft 3 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0] + +# Time passes on the leader so that it will try the previously missing follower +# again. +tick-heartbeat 1 +---- +ok + +process-ready 1 +---- +Ready MustSync=false: +Messages: +1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11 +1->3 MsgHeartbeat Term:1 Log:0/0 + +# Iterate until no more work is done by the new peer. It receives the heartbeat +# and responds. +stabilize 3 +---- +> delivering messages + 1->3 MsgHeartbeat Term:1 Log:0/0 + INFO 3 [term: 0] received a MsgHeartbeat message with higher term from 1 [term: 1] + INFO 3 became follower at term 1 +> 3 handling Ready + Ready MustSync=true: + Lead:1 State:StateFollower + HardState Term:1 Commit:0 + Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + +# The leader in turn will realize that n3 needs a snapshot, which it initiates. +stabilize 1 +---- +> delivering messages + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + DEBUG 1 [firstindex: 12, commit: 11] sent snapshot[index: 11, term: 1] to 3 [StateProbe match=0 next=11] + DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=11 paused pendingSnap=11] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] + +status 1 +---- +1: StateReplicate match=11 next=12 inactive +2: StateReplicate match=11 next=12 +3: StateSnapshot match=0 next=11 paused pendingSnap=11 + +# Follower applies the snapshot. Note how it reacts with a MsgAppResp upon completion. +# The snapshot fully catches the follower up (i.e. there are no more log entries it +# needs to apply after). The bug was that the leader failed to realize that the follower +# was now fully caught up. +stabilize 3 +---- +> delivering messages + 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] + INFO log [committed=0, applied=0, unstable.offset=1, len(unstable.Entries)=0] starts to restore snapshot [index: 11, term: 1] + INFO 3 switched to configuration voters=(1 2 3) + INFO 3 [commit: 11, lastindex: 11, lastterm: 1] restored snapshot [index: 11, term: 1] + INFO 3 [commit: 11] restored snapshot [index: 11, term: 1] +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Commit:11 + Snapshot Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] + Messages: + 3->1 MsgAppResp Term:1 Log:0/11 + +# The MsgAppResp lets the leader move the follower back to replicating state. +# Leader sends another MsgAppResp, to communicate the updated commit index. +stabilize 1 +---- +> delivering messages + 3->1 MsgAppResp Term:1 Log:0/11 + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=11] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/11 Commit:11 + +status 1 +---- +1: StateReplicate match=11 next=12 inactive +2: StateReplicate match=11 next=12 +3: StateReplicate match=11 next=12 + +stabilize +---- +ok diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index 697277b26..62c81f45a 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -52,6 +52,8 @@ type Progress struct { // RecentActive is true if the progress is recently active. Receiving any messages // from the corresponding follower indicates the progress is active. // RecentActive can be reset to false after an election timeout. + // + // TODO(tbg): the leader should always have this set to true. RecentActive bool // ProbeSent is used while this follower is in StateProbe. When ProbeSent is diff --git a/raft/util.go b/raft/util.go index 8394f647c..881a6e14e 100644 --- a/raft/util.go +++ b/raft/util.go @@ -17,6 +17,7 @@ package raft import ( "bytes" "fmt" + "strings" pb "go.etcd.io/etcd/raft/raftpb" ) @@ -60,6 +61,69 @@ func voteRespMsgType(msgt pb.MessageType) pb.MessageType { } } +func DescribeHardState(hs pb.HardState) string { + var buf strings.Builder + fmt.Fprintf(&buf, "Term:%d", hs.Term) + if hs.Vote != 0 { + fmt.Fprintf(&buf, " Vote:%d", hs.Vote) + } + fmt.Fprintf(&buf, " Commit:%d", hs.Commit) + return buf.String() +} + +func DescribeSoftState(ss SoftState) string { + return fmt.Sprintf("Lead:%d State:%s", ss.Lead, ss.RaftState) +} + +func DescribeConfState(state pb.ConfState) string { + return fmt.Sprintf( + "Voters:%v VotersOutgoing:%v Learners:%v LearnersNext:%v", + state.Voters, state.VotersOutgoing, state.Learners, state.LearnersNext, + ) +} + +func DescribeSnapshot(snap pb.Snapshot) string { + m := snap.Metadata + return fmt.Sprintf("Index:%d Term:%d ConfState:%s", m.Index, m.Term, DescribeConfState(m.ConfState)) +} + +func DescribeReady(rd Ready, f EntryFormatter) string { + var buf strings.Builder + if rd.SoftState != nil { + fmt.Fprint(&buf, DescribeSoftState(*rd.SoftState)) + buf.WriteByte('\n') + } + if !IsEmptyHardState(rd.HardState) { + fmt.Fprintf(&buf, "HardState %s", DescribeHardState(rd.HardState)) + buf.WriteByte('\n') + } + if len(rd.ReadStates) > 0 { + fmt.Fprintf(&buf, "ReadStates %v\n", rd.ReadStates) + } + if len(rd.Entries) > 0 { + buf.WriteString("Entries:\n") + fmt.Fprint(&buf, DescribeEntries(rd.Entries, f)) + } + if !IsEmptySnap(rd.Snapshot) { + fmt.Fprintf(&buf, "Snapshot %s\n", DescribeSnapshot(rd.Snapshot)) + } + if len(rd.CommittedEntries) > 0 { + buf.WriteString("CommittedEntries:\n") + fmt.Fprint(&buf, DescribeEntries(rd.CommittedEntries, f)) + } + if len(rd.Messages) > 0 { + buf.WriteString("Messages:\n") + for _, msg := range rd.Messages { + fmt.Fprint(&buf, DescribeMessage(msg, f)) + buf.WriteByte('\n') + } + } + if buf.Len() > 0 { + return fmt.Sprintf("Ready MustSync=%t:\n%s", rd.MustSync, buf.String()) + } + return "" +} + // EntryFormatter can be implemented by the application to provide human-readable formatting // of entry data. Nil is a valid EntryFormatter and will use a default format. type EntryFormatter func([]byte) string @@ -86,7 +150,7 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string { fmt.Fprintf(&buf, "]") } if !IsEmptySnap(m.Snapshot) { - fmt.Fprintf(&buf, " Snapshot:%v", m.Snapshot) + fmt.Fprintf(&buf, " Snapshot: %s", DescribeSnapshot(m.Snapshot)) } return buf.String() } @@ -100,13 +164,39 @@ func PayloadSize(e pb.Entry) int { // DescribeEntry returns a concise human-readable description of an // Entry for debugging. func DescribeEntry(e pb.Entry, f EntryFormatter) string { - var formatted string - if e.Type == pb.EntryNormal && f != nil { - formatted = f(e.Data) - } else { - formatted = fmt.Sprintf("%q", e.Data) + if f == nil { + f = func(data []byte) string { return fmt.Sprintf("%q", data) } } - return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted) + + formatConfChange := func(cc pb.ConfChangeI) string { + // TODO(tbg): give the EntryFormatter a type argument so that it gets + // a chance to expose the Context. + return pb.ConfChangesToString(cc.AsV2().Changes) + } + + var formatted string + switch e.Type { + case pb.EntryNormal: + formatted = f(e.Data) + case pb.EntryConfChange: + var cc pb.ConfChange + if err := cc.Unmarshal(e.Data); err != nil { + formatted = err.Error() + } else { + formatted = formatConfChange(cc) + } + case pb.EntryConfChangeV2: + var cc pb.ConfChangeV2 + if err := cc.Unmarshal(e.Data); err != nil { + formatted = err.Error() + } else { + formatted = formatConfChange(cc) + } + } + if formatted != "" { + formatted = " " + formatted + } + return fmt.Sprintf("%d/%d %s%s", e.Term, e.Index, e.Type, formatted) } // DescribeEntries calls DescribeEntry for each Entry, adding a newline to