From e8090e57a2b0f861a05a4ab2ce6899363682b7c7 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 12 Aug 2019 11:13:51 +0200 Subject: [PATCH] raft/rafttest: introduce datadriven testing It has often been tedious to test the interactions between multi-member Raft groups, especially when many steps were required to reach a certain scenario. Often, this boilerplate was as boring as it is hard to write and hard to maintain, making it attractive to resort to shortcuts whenever possible, which in turn tended to undercut how meaningful and maintainable the tests ended up being - that is, if the tests were even written, which sometimes they weren't. This change introduces a datadriven framework specifically for testing deterministically the interaction between multiple members of a raft group with the goal of reducing the friction for writing these tests to near zero. In the near term, this will be used to add thorough testing for joint consensus (which is already available today, but wildly undertested), but just converting an existing test into this framework has shown that the concise representation and built-in inspection of log messages highlights unexpected behavior much more readily than the previous unit tests did (the test in question is `snapshot_succeed_via_app_resp`; the reader is invited to compare the old and new version of it). The main building block is `InteractionEnv`, which holds on to the state of the whole system and exposes various relevant methods for manipulating it, including but not limited to adding nodes, delivering and dropping messages, and proposing configuration changes. All of this is extensible so that in the future I hope to use it to explore the phenomena discussed in https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263 which requires injecting appropriate "crash points" in the Ready handling loop. Discussions of the "what if X happened in state Y" can quickly be made concrete by "scripting up an interaction test". Additionally, this framework is intentionally not kept internal to the raft package.. Though this is in its infancy, a goal is that it should be possible for a suite of interaction tests to allow applications to validate that their Storage implementation behaves accordingly, simply by running a raft-provided interaction suite against their Storage. --- raft/interaction_test.go | 31 ++++ raft/raft.go | 13 +- raft/raft_snap_test.go | 120 ------------- raft/raftpb/confchange.go | 65 +++++++ raft/rafttest/interaction_env.go | 90 ++++++++++ raft/rafttest/interaction_env_handler.go | 159 ++++++++++++++++++ .../interaction_env_handler_add_nodes.go | 136 +++++++++++++++ .../interaction_env_handler_campaign.go | 31 ++++ .../interaction_env_handler_compact.go | 40 +++++ .../interaction_env_handler_deliver_msgs.go | 58 +++++++ .../interaction_env_handler_log_level.go | 37 ++++ .../interaction_env_handler_process_ready.go | 96 +++++++++++ ...raction_env_handler_propose_conf_change.go | 82 +++++++++ .../interaction_env_handler_raft_log.go | 60 +++++++ .../interaction_env_handler_stabilize.go | 92 ++++++++++ .../interaction_env_handler_status.go | 42 +++++ .../interaction_env_handler_tick_heartbeat.go | 34 ++++ raft/rafttest/interaction_env_logger.go | 107 ++++++++++++ raft/rafttest/network.go | 8 +- raft/storage.go | 2 + raft/testdata/campaign.txt | 117 +++++++++++++ raft/testdata/confchange_v1.txt | 78 +++++++++ .../snapshot_succeed_via_app_resp.txt | 141 ++++++++++++++++ raft/tracker/progress.go | 2 + raft/util.go | 104 +++++++++++- 25 files changed, 1613 insertions(+), 132 deletions(-) create mode 100644 raft/interaction_test.go create mode 100644 raft/rafttest/interaction_env.go create mode 100644 raft/rafttest/interaction_env_handler.go create mode 100644 raft/rafttest/interaction_env_handler_add_nodes.go create mode 100644 raft/rafttest/interaction_env_handler_campaign.go create mode 100644 raft/rafttest/interaction_env_handler_compact.go create mode 100644 raft/rafttest/interaction_env_handler_deliver_msgs.go create mode 100644 raft/rafttest/interaction_env_handler_log_level.go create mode 100644 raft/rafttest/interaction_env_handler_process_ready.go create mode 100644 raft/rafttest/interaction_env_handler_propose_conf_change.go create mode 100644 raft/rafttest/interaction_env_handler_raft_log.go create mode 100644 raft/rafttest/interaction_env_handler_stabilize.go create mode 100644 raft/rafttest/interaction_env_handler_status.go create mode 100644 raft/rafttest/interaction_env_handler_tick_heartbeat.go create mode 100644 raft/rafttest/interaction_env_logger.go create mode 100644 raft/testdata/campaign.txt create mode 100644 raft/testdata/confchange_v1.txt create mode 100644 raft/testdata/snapshot_succeed_via_app_resp.txt diff --git a/raft/interaction_test.go b/raft/interaction_test.go new file mode 100644 index 000000000..375e636dc --- /dev/null +++ b/raft/interaction_test.go @@ -0,0 +1,31 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft_test + +import ( + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft/rafttest" +) + +func TestInteraction(t *testing.T) { + datadriven.Walk(t, "testdata", func(t *testing.T, path string) { + env := rafttest.NewInteractionEnv(nil) + datadriven.RunTest(t, path, func(d *datadriven.TestData) string { + return env.Handle(t, *d) + }) + }) +} diff --git a/raft/raft.go b/raft/raft.go index 62e79642c..83d783eb3 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "math/rand" + "sort" "strings" "sync" "time" @@ -529,7 +530,6 @@ func (r *raft) bcastAppend() { if id == r.id { return } - r.sendAppend(id) }) } @@ -795,7 +795,16 @@ func (r *raft) campaign(t CampaignType) { } return } - for id := range r.prs.Voters.IDs() { + var ids []uint64 + { + idMap := r.prs.Voters.IDs() + ids = make([]uint64, 0, len(idMap)) + for id := range idMap { + ids = append(ids, id) + } + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + } + for _, id := range ids { if id == r.id { continue } diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index d49c8837e..070f3a2b9 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -18,7 +18,6 @@ import ( "testing" pb "go.etcd.io/etcd/raft/raftpb" - "go.etcd.io/etcd/raft/tracker" ) var ( @@ -112,125 +111,6 @@ func TestSnapshotSucceed(t *testing.T) { } } -// TestSnapshotSucceedViaAppResp regression tests the situation in which a snap- -// shot is sent to a follower at the most recent index (i.e. the snapshot index -// is the leader's last index is the committed index). In that situation, a bug -// in the past left the follower in probing status until the next log entry was -// committed. -func TestSnapshotSucceedViaAppResp(t *testing.T) { - s1 := NewMemoryStorage() - // Create a single-node leader. - n1 := newTestRaft(1, []uint64{1}, 10, 1, s1) - n1.becomeCandidate() - n1.becomeLeader() - // We need to add a second empty entry so that we can truncate the first - // one away. - n1.Step(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - - rd := newReady(n1, &SoftState{}, pb.HardState{}) - s1.Append(rd.Entries) - s1.SetHardState(rd.HardState) - - if exp, ci := s1.lastIndex(), n1.raftLog.committed; ci != exp { - t.Fatalf("unexpected committed index %d, wanted %d: %+v", ci, exp, s1) - } - - // Force a log truncation. - if err := s1.Compact(1); err != nil { - t.Fatal(err) - } - - // Add a follower to the group. Do this in a clandestine way for simplicity. - // Also set up a snapshot that will be sent to the follower. - n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) - s1.snapshot = pb.Snapshot{ - Metadata: pb.SnapshotMetadata{ - ConfState: pb.ConfState{Voters: []uint64{1, 2}}, - Index: s1.lastIndex(), - Term: s1.ents[len(s1.ents)-1].Term, - }, - } - - noMessage := pb.MessageType(-1) - mustSend := func(from, to *raft, typ pb.MessageType) pb.Message { - t.Helper() - for i, msg := range from.msgs { - if msg.From != from.id || msg.To != to.id || msg.Type != typ { - continue - } - t.Log(DescribeMessage(msg, func([]byte) string { return "" })) - if len(msg.Entries) > 0 { - t.Log(DescribeEntries(msg.Entries, func(b []byte) string { return string(b) })) - } - if err := to.Step(msg); err != nil { - t.Fatalf("%v: %s", msg, err) - } - from.msgs = append(from.msgs[:i], from.msgs[i+1:]...) - return msg - } - if typ == noMessage { - if len(from.msgs) == 0 { - return pb.Message{} - } - t.Fatalf("expected no more messages, but got %d->%d %v", from.id, to.id, from.msgs) - } - t.Fatalf("message %d->%d %s not found in %v", from.id, to.id, typ, from.msgs) - return pb.Message{} // unreachable - } - - // Create the follower that will receive the snapshot. - s2 := NewMemoryStorage() - n2 := newTestRaft(2, []uint64{1, 2}, 10, 1, s2) - - // Let the leader probe the follower. - if !n1.maybeSendAppend(2, true /* sendIfEmpty */) { - t.Fatalf("expected message to be sent") - } - if msg := mustSend(n1, n2, pb.MsgApp); len(msg.Entries) > 0 { - // For this test to work, the leader must not have anything to append - // to the follower right now. - t.Fatalf("unexpectedly appending entries %v", msg.Entries) - } - - // Follower rejects the append (because it doesn't have any log entries) - if msg := mustSend(n2, n1, pb.MsgAppResp); !msg.Reject { - t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint) - } - - const expIdx = 2 - // Leader sends snapshot due to RejectHint of zero (we set up the raft log - // to start at index 2). - if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx { - t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index) - } - - // n2 reacts to snapshot with MsgAppResp. - if msg := mustSend(n2, n1, pb.MsgAppResp); msg.Index != expIdx { - t.Fatalf("expected AppResp at index %d, got %d", expIdx, msg.Index) - } - - // Leader sends MsgApp to communicate commit index. - if msg := mustSend(n1, n2, pb.MsgApp); msg.Commit != expIdx { - t.Fatalf("expected commit index %d, got %d", expIdx, msg.Commit) - } - - // Follower responds. - mustSend(n2, n1, pb.MsgAppResp) - - // Leader has correct state for follower. - pr := n1.prs.Progress[2] - if pr.State != tracker.StateReplicate { - t.Fatalf("unexpected state %v", pr) - } - if pr.Match != expIdx || pr.Next != expIdx+1 { - t.Fatalf("expected match = %d, next = %d; got match = %d and next = %d", expIdx, expIdx+1, pr.Match, pr.Next) - } - - // Leader and follower are done. - mustSend(n1, n2, noMessage) - mustSend(n2, n1, noMessage) -} - func TestSnapshotAbort(t *testing.T) { storage := NewMemoryStorage() sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) diff --git a/raft/raftpb/confchange.go b/raft/raftpb/confchange.go index a91c18dc1..46a7a7021 100644 --- a/raft/raftpb/confchange.go +++ b/raft/raftpb/confchange.go @@ -16,6 +16,8 @@ package raftpb import ( "fmt" + "strconv" + "strings" "github.com/gogo/protobuf/proto" ) @@ -103,3 +105,66 @@ func (c *ConfChangeV2) LeaveJoint() bool { cpy.Context = nil return proto.Equal(&cpy, &ConfChangeV2{}) } + +// ConfChangesFromString parses a Space-delimited sequence of operations into a +// slice of ConfChangeSingle. The supported operations are: +// - vn: make n a voter, +// - ln: make n a learner, +// - rn: remove n, and +// - un: update n. +func ConfChangesFromString(s string) ([]ConfChangeSingle, error) { + var ccs []ConfChangeSingle + toks := strings.Split(strings.TrimSpace(s), " ") + if toks[0] == "" { + toks = nil + } + for _, tok := range toks { + if len(tok) < 2 { + return nil, fmt.Errorf("unknown token %s", tok) + } + var cc ConfChangeSingle + switch tok[0] { + case 'v': + cc.Type = ConfChangeAddNode + case 'l': + cc.Type = ConfChangeAddLearnerNode + case 'r': + cc.Type = ConfChangeRemoveNode + case 'u': + cc.Type = ConfChangeUpdateNode + default: + return nil, fmt.Errorf("unknown input: %s", tok) + } + id, err := strconv.ParseUint(tok[1:], 10, 64) + if err != nil { + return nil, err + } + cc.NodeID = id + ccs = append(ccs, cc) + } + return ccs, nil +} + +// ConfChangesToString is the inverse to ConfChangesFromString. +func ConfChangesToString(ccs []ConfChangeSingle) string { + var buf strings.Builder + for i, cc := range ccs { + if i > 0 { + buf.WriteByte(' ') + } + switch cc.Type { + case ConfChangeAddNode: + buf.WriteByte('v') + case ConfChangeAddLearnerNode: + buf.WriteByte('l') + case ConfChangeRemoveNode: + buf.WriteByte('r') + case ConfChangeUpdateNode: + buf.WriteByte('u') + default: + buf.WriteString("unknown") + } + fmt.Fprintf(&buf, "%d", cc.NodeID) + } + return buf.String() +} diff --git a/raft/rafttest/interaction_env.go b/raft/rafttest/interaction_env.go new file mode 100644 index 000000000..c0ec44f6f --- /dev/null +++ b/raft/rafttest/interaction_env.go @@ -0,0 +1,90 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "math" + "strings" + + "go.etcd.io/etcd/raft" + pb "go.etcd.io/etcd/raft/raftpb" +) + +// InteractionOpts groups the options for an InteractionEnv. +type InteractionOpts struct { + OnConfig func(*raft.Config) +} + +// A Node is a member of a raft group tested via an InteractionEnv. +type Node struct { + *raft.RawNode + Storage + + Config *raft.Config + History []pb.Snapshot +} + +// InteractionEnv facilitates testing of complex interactions between the +// members of a raft group. +type InteractionEnv struct { + Options *InteractionOpts + Nodes []Node + Messages []pb.Message // in-flight messages + + Output *RedirectLogger +} + +// NewInteractionEnv initializes an InteractionEnv. opts may be nil. +func NewInteractionEnv(opts *InteractionOpts) *InteractionEnv { + if opts == nil { + opts = &InteractionOpts{} + } + return &InteractionEnv{ + Options: opts, + Output: &RedirectLogger{ + Builder: &strings.Builder{}, + }, + } +} + +// Storage is the interface used by InteractionEnv. It is comprised of raft's +// Storage interface plus access to operations that maintain the log and drive +// the Ready handling loop. +type Storage interface { + raft.Storage + SetHardState(state pb.HardState) error + ApplySnapshot(pb.Snapshot) error + Compact(newFirstIndex uint64) error + Append([]pb.Entry) error +} + +// defaultRaftConfig sets up a *raft.Config with reasonable testing defaults. +// In particular, no limits are set. +func defaultRaftConfig(id uint64, applied uint64, s raft.Storage) *raft.Config { + return &raft.Config{ + ID: id, + Applied: applied, + ElectionTick: 3, + HeartbeatTick: 1, + Storage: s, + MaxSizePerMsg: math.MaxUint64, + MaxInflightMsgs: math.MaxInt32, + } +} + +func defaultEntryFormatter(b []byte) string { + return fmt.Sprintf("%q", b) +} diff --git a/raft/rafttest/interaction_env_handler.go b/raft/rafttest/interaction_env_handler.go new file mode 100644 index 000000000..a6e51d372 --- /dev/null +++ b/raft/rafttest/interaction_env_handler.go @@ -0,0 +1,159 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "strconv" + "testing" + + "github.com/cockroachdb/datadriven" +) + +// Handle is the entrypoint for data-driven interaction testing. Commands and +// parameters are parsed from the supplied TestData. Errors during data parsing +// are reported via the supplied *testing.T; errors from the raft nodes and the +// storage engine are reported to the output buffer. +func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string { + env.Output.Reset() + var err error + switch d.Cmd { + case "add-nodes": + // Example: + // + // add-nodes voters=(1 2 3) learners=(4 5) index=2 content=foo + err = env.handleAddNodes(t, d) + case "campaign": + // Example: + // + // campaign + err = env.handleCampaign(t, d) + case "compact": + // Example: + // + // compact + err = env.handleCompact(t, d) + case "deliver-msgs": + // Deliver the messages for a given recipient. + // + // Example: + // + // deliver-msgs + err = env.handleDeliverMsgs(t, d) + case "process-ready": + // Example: + // + // process-ready 3 + err = env.handleProcessReady(t, d) + case "log-level": + // Set the log level. NONE disables all output, including from the test + // harness (except errors). + // + // Example: + // + // log-level WARN + err = env.handleLogLevel(t, d) + case "raft-log": + // Print the Raft log. + // + // Example: + // + // raft-log 3 + err = env.handleRaftLog(t, d) + case "stabilize": + // Deliver messages to and run process-ready on the set of IDs until + // no more work is to be done. + // + // Example: + // + // stabilize 1 4 + err = env.handleStabilize(t, d) + case "status": + // Print Raft status. + // + // Example: + // + // status 5 + err = env.handleStatus(t, d) + case "tick-heartbeat": + // Tick a heartbeat interval. + // + // Example: + // + // tick-heartbeat 3 + err = env.handleTickHeartbeat(t, d) + case "propose-conf-change": + // Propose a configuration change. + // + // Example: + // + // propose-conf-change transition=explicit + // v1 v3 l4 r5 + // + // Example: + // + // propose-conf-change v1=true + // v5 + err = env.handleProposeConfChange(t, d) + default: + err = fmt.Errorf("unknown command") + } + if err != nil { + env.Output.WriteString(err.Error()) + } + // NB: the highest log level suppresses all output, including that of the + // handlers. This comes in useful during setup which can be chatty. + // However, errors are always logged. + if env.Output.Len() == 0 { + return "ok" + } + if env.Output.Lvl == len(lvlNames)-1 { + if err != nil { + return err.Error() + } + return "ok (quiet)" + } + return env.Output.String() +} + +func firstAsInt(t *testing.T, d datadriven.TestData) int { + t.Helper() + n, err := strconv.Atoi(d.CmdArgs[0].Key) + if err != nil { + t.Fatal(err) + } + return n +} + +func firstAsNodeIdx(t *testing.T, d datadriven.TestData) int { + t.Helper() + n := firstAsInt(t, d) + return n - 1 +} + +func ints(t *testing.T, d datadriven.TestData) []int { + var ints []int + for i := 0; i < len(d.CmdArgs); i++ { + if len(d.CmdArgs[i].Vals) != 0 { + continue + } + n, err := strconv.Atoi(d.CmdArgs[i].Key) + if err != nil { + t.Fatal(err) + } + ints = append(ints, n) + } + return ints +} diff --git a/raft/rafttest/interaction_env_handler_add_nodes.go b/raft/rafttest/interaction_env_handler_add_nodes.go new file mode 100644 index 000000000..a68a2cbdd --- /dev/null +++ b/raft/rafttest/interaction_env_handler_add_nodes.go @@ -0,0 +1,136 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "errors" + "fmt" + "reflect" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" + pb "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) error { + n := firstAsInt(t, d) + var snap pb.Snapshot + for _, arg := range d.CmdArgs[1:] { + for i := range arg.Vals { + switch arg.Key { + case "voters": + var id uint64 + arg.Scan(t, i, &id) + snap.Metadata.ConfState.Voters = append(snap.Metadata.ConfState.Voters, id) + case "learners": + var id uint64 + arg.Scan(t, i, &id) + snap.Metadata.ConfState.Learners = append(snap.Metadata.ConfState.Learners, id) + case "index": + arg.Scan(t, i, &snap.Metadata.Index) + case "content": + arg.Scan(t, i, &snap.Data) + } + } + } + return env.AddNodes(n, snap) +} + +type snapOverrideStorage struct { + Storage + snapshotOverride func() (pb.Snapshot, error) +} + +func (s snapOverrideStorage) Snapshot() (pb.Snapshot, error) { + if s.snapshotOverride != nil { + return s.snapshotOverride() + } + return s.Storage.Snapshot() +} + +var _ raft.Storage = snapOverrideStorage{} + +// AddNodes adds n new nodes initializes from the given snapshot (which may be +// empty). They will be assigned consecutive IDs. +func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error { + bootstrap := !reflect.DeepEqual(snap, pb.Snapshot{}) + for i := 0; i < n; i++ { + id := uint64(1 + len(env.Nodes)) + s := snapOverrideStorage{ + Storage: raft.NewMemoryStorage(), + // When you ask for a snapshot, you get the most recent snapshot. + // + // TODO(tbg): this is sort of clunky, but MemoryStorage itself will + // give you some fixed snapshot and also the snapshot changes + // whenever you compact the logs and vice versa, so it's all a bit + // awkward to use. + snapshotOverride: func() (pb.Snapshot, error) { + snaps := env.Nodes[int(id-1)].History + return snaps[len(snaps)-1], nil + }, + } + if bootstrap { + // NB: we could make this work with 1, but MemoryStorage just + // doesn't play well with that and it's not a loss of generality. + if snap.Metadata.Index <= 1 { + return errors.New("index must be specified as > 1 due to bootstrap") + } + snap.Metadata.Term = 1 + if err := s.ApplySnapshot(snap); err != nil { + return err + } + fi, err := s.FirstIndex() + if err != nil { + return err + } + // At the time of writing and for *MemoryStorage, applying a + // snapshot also truncates appropriately, but this would change with + // other storage engines potentially. + if exp := snap.Metadata.Index + 1; fi != exp { + return fmt.Errorf("failed to establish first index %d; got %d", exp, fi) + } + } + cfg := defaultRaftConfig(id, snap.Metadata.Index, s) + if env.Options.OnConfig != nil { + env.Options.OnConfig(cfg) + if cfg.ID != id { + // This could be supported but then we need to do more work + // translating back and forth -- not worth it. + return errors.New("OnConfig must not change the ID") + } + } + if cfg.Logger != nil { + return errors.New("OnConfig must not set Logger") + } + cfg.Logger = env.Output + + rn, err := raft.NewRawNode(cfg) + if err != nil { + return err + } + + node := Node{ + RawNode: rn, + // TODO(tbg): allow a more general Storage, as long as it also allows + // us to apply snapshots, append entries, and update the HardState. + Storage: s, + Config: cfg, + History: []pb.Snapshot{snap}, + } + env.Nodes = append(env.Nodes, node) + } + return nil +} diff --git a/raft/rafttest/interaction_env_handler_campaign.go b/raft/rafttest/interaction_env_handler_campaign.go new file mode 100644 index 000000000..bde5cc42e --- /dev/null +++ b/raft/rafttest/interaction_env_handler_campaign.go @@ -0,0 +1,31 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleCampaign(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.Campaign(t, idx) +} + +// Campaign the node at the given index. +func (env *InteractionEnv) Campaign(t *testing.T, idx int) error { + return env.Nodes[idx].Campaign() +} diff --git a/raft/rafttest/interaction_env_handler_compact.go b/raft/rafttest/interaction_env_handler_compact.go new file mode 100644 index 000000000..25fa1d22c --- /dev/null +++ b/raft/rafttest/interaction_env_handler_compact.go @@ -0,0 +1,40 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "strconv" + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleCompact(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + newFirstIndex, err := strconv.ParseUint(d.CmdArgs[1].Key, 10, 64) + if err != nil { + return err + } + return env.Compact(idx, newFirstIndex) +} + +// Compact truncates the log on the node at index idx so that the supplied new +// first index results. +func (env *InteractionEnv) Compact(idx int, newFirstIndex uint64) error { + if err := env.Nodes[idx].Compact(newFirstIndex); err != nil { + return err + } + return env.RaftLog(idx) +} diff --git a/raft/rafttest/interaction_env_handler_deliver_msgs.go b/raft/rafttest/interaction_env_handler_deliver_msgs.go new file mode 100644 index 000000000..c4eb675c5 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_deliver_msgs.go @@ -0,0 +1,58 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "errors" + "fmt" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData) error { + if len(env.Messages) == 0 { + return errors.New("no messages to deliver") + } + + msgs := env.Messages + env.Messages = nil + + return env.DeliverMsgs(msgs) +} + +// DeliverMsgs delivers the supplied messages typically taken from env.Messages. +func (env *InteractionEnv) DeliverMsgs(msgs []raftpb.Message) error { + for _, msg := range msgs { + toIdx := int(msg.To - 1) + var drop bool + if toIdx >= len(env.Nodes) { + // Drop messages for peers that don't exist yet. + drop = true + env.Output.WriteString("dropped: ") + } + fmt.Fprintln(env.Output, raft.DescribeMessage(msg, defaultEntryFormatter)) + if drop { + continue + } + if err := env.Nodes[toIdx].Step(msg); err != nil { + env.Output.WriteString(err.Error()) + continue + } + } + return nil +} diff --git a/raft/rafttest/interaction_env_handler_log_level.go b/raft/rafttest/interaction_env_handler_log_level.go new file mode 100644 index 000000000..a61ba37cd --- /dev/null +++ b/raft/rafttest/interaction_env_handler_log_level.go @@ -0,0 +1,37 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleLogLevel(t *testing.T, d datadriven.TestData) error { + return env.LogLevel(d.CmdArgs[0].Key) +} + +func (env *InteractionEnv) LogLevel(name string) error { + for i, s := range lvlNames { + if strings.ToLower(s) == strings.ToLower(name) { + env.Output.Lvl = i + return nil + } + } + return fmt.Errorf("log levels must be either of %v", lvlNames) +} diff --git a/raft/rafttest/interaction_env_handler_process_ready.go b/raft/rafttest/interaction_env_handler_process_ready.go new file mode 100644 index 000000000..ff5cf2bd9 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_process_ready.go @@ -0,0 +1,96 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/quorum" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleProcessReady(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.ProcessReady(idx) +} + +// ProcessReady runs Ready handling on the node with the given index. +func (env *InteractionEnv) ProcessReady(idx int) error { + // TODO(tbg): Allow simulating crashes here. + rn, s := env.Nodes[idx].RawNode, env.Nodes[idx].Storage + rd := rn.Ready() + // TODO(tbg): the order of operations here is not necessarily safe. See: + // https://github.com/etcd-io/etcd/pull/10861 + if !raft.IsEmptyHardState(rd.HardState) { + if err := s.SetHardState(rd.HardState); err != nil { + return err + } + } + if err := s.Append(rd.Entries); err != nil { + return err + } + if !raft.IsEmptySnap(rd.Snapshot) { + if err := s.ApplySnapshot(rd.Snapshot); err != nil { + return err + } + } + for _, ent := range rd.CommittedEntries { + var update []byte + switch ent.Type { + case raftpb.EntryConfChange: + var cc raftpb.ConfChange + if err := cc.Unmarshal(ent.Data); err != nil { + return err + } + update = cc.Context + rn.ApplyConfChange(cc) + case raftpb.EntryConfChangeV2: + var cc raftpb.ConfChangeV2 + if err := cc.Unmarshal(ent.Data); err != nil { + return err + } + rn.ApplyConfChange(cc) + update = cc.Context + default: + update = ent.Data + } + + // Record the new state by starting with the current state and applying + // the command. + lastSnap := env.Nodes[idx].History[len(env.Nodes[idx].History)-1] + var snap raftpb.Snapshot + snap.Data = append(snap.Data, lastSnap.Data...) + // NB: this hard-codes an "appender" state machine. + snap.Data = append(snap.Data, update...) + snap.Metadata.Index = ent.Index + snap.Metadata.Term = ent.Term + cfg := rn.Status().Config + snap.Metadata.ConfState = raftpb.ConfState{ + Voters: cfg.Voters[0].Slice(), + VotersOutgoing: cfg.Voters[1].Slice(), + Learners: quorum.MajorityConfig(cfg.Learners).Slice(), + LearnersNext: quorum.MajorityConfig(cfg.LearnersNext).Slice(), + } + env.Nodes[idx].History = append(env.Nodes[idx].History, snap) + } + for _, msg := range rd.Messages { + env.Messages = append(env.Messages, msg) + } + rn.Advance(rd) + env.Output.WriteString(raft.DescribeReady(rd, defaultEntryFormatter)) + return nil +} diff --git a/raft/rafttest/interaction_env_handler_propose_conf_change.go b/raft/rafttest/interaction_env_handler_propose_conf_change.go new file mode 100644 index 000000000..278339675 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_propose_conf_change.go @@ -0,0 +1,82 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "strconv" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleProposeConfChange(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + var v1 bool + transition := raftpb.ConfChangeTransitionAuto + for _, arg := range d.CmdArgs[1:] { + for _, val := range arg.Vals { + switch arg.Key { + case "v1": + var err error + v1, err = strconv.ParseBool(val) + if err != nil { + return err + } + case "transition": + switch val { + case "auto": + transition = raftpb.ConfChangeTransitionAuto + case "implicit": + transition = raftpb.ConfChangeTransitionJointImplicit + case "explicit": + transition = raftpb.ConfChangeTransitionJointExplicit + default: + return fmt.Errorf("unknown transition %s", val) + } + default: + return fmt.Errorf("unknown command %s", arg.Key) + } + } + } + + ccs, err := raftpb.ConfChangesFromString(d.Input) + if err != nil { + return err + } + + var c raftpb.ConfChangeI + if v1 { + if len(ccs) > 1 || transition != raftpb.ConfChangeTransitionAuto { + return fmt.Errorf("v1 conf change can only have one operation and no transition") + } + c = raftpb.ConfChange{ + Type: ccs[0].Type, + NodeID: ccs[0].NodeID, + } + } else { + c = raftpb.ConfChangeV2{ + Transition: transition, + Changes: ccs, + } + } + return env.ProposeConfChange(idx, c) +} + +// ProposeConfChange proposes a configuration change on the node with the given index. +func (env *InteractionEnv) ProposeConfChange(idx int, c raftpb.ConfChangeI) error { + return env.Nodes[idx].ProposeConfChange(c) +} diff --git a/raft/rafttest/interaction_env_handler_raft_log.go b/raft/rafttest/interaction_env_handler_raft_log.go new file mode 100644 index 000000000..a99db29f3 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_raft_log.go @@ -0,0 +1,60 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "math" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" +) + +func (env *InteractionEnv) writeErr(err error) { + if err != nil { + env.Output.WriteString(err.Error()) + } +} + +func (env *InteractionEnv) handleRaftLog(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.RaftLog(idx) +} + +// RaftLog pretty prints the raft log to the output buffer. +func (env *InteractionEnv) RaftLog(idx int) error { + s := env.Nodes[idx].Storage + fi, err := s.FirstIndex() + if err != nil { + return err + } + li, err := s.LastIndex() + if err != nil { + return err + } + if li < fi { + // TODO(tbg): this is what MemoryStorage returns, but unclear if it's + // the "correct" thing to do. + fmt.Fprintf(env.Output, "log is empty: first index=%d, last index=%d", fi, li) + return nil + } + ents, err := s.Entries(fi, li+1, math.MaxUint64) + if err != nil { + return err + } + env.Output.WriteString(raft.DescribeEntries(ents, defaultEntryFormatter)) + return err +} diff --git a/raft/rafttest/interaction_env_handler_stabilize.go b/raft/rafttest/interaction_env_handler_stabilize.go new file mode 100644 index 000000000..16baa4373 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_stabilize.go @@ -0,0 +1,92 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "bufio" + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleStabilize(t *testing.T, d datadriven.TestData) error { + var idxs []int + for _, id := range ints(t, d) { + idxs = append(idxs, id-1) + } + return env.Stabilize(idxs...) +} + +// Stabilize repeatedly runs Ready handling on and message delivery to the set +// of nodes specified via the idxs slice until reaching a fixed point. +func (env *InteractionEnv) Stabilize(idxs ...int) error { + var nodes []Node + for _, idx := range idxs { + nodes = append(nodes, env.Nodes[idx]) + } + if len(nodes) == 0 { + nodes = env.Nodes + } + + withIndent := func(f func()) { + orig := env.Output.Builder + env.Output.Builder = &strings.Builder{} + f() + + scanner := bufio.NewScanner(strings.NewReader(env.Output.Builder.String())) + for scanner.Scan() { + orig.WriteString(" " + scanner.Text() + "\n") + } + env.Output.Builder = orig + } + + for { + done := true + for _, rn := range nodes { + if rn.HasReady() { + done = false + idx := int(rn.Status().ID - 1) + fmt.Fprintf(env.Output, "> %d handling Ready\n", idx+1) + withIndent(func() { env.ProcessReady(idx) }) + } + } + var msgs []raftpb.Message + for _, rn := range nodes { + msgs, env.Messages = splitMsgs(env.Messages, rn.Status().ID) + if len(msgs) > 0 { + fmt.Fprintf(env.Output, "> delivering messages\n") + withIndent(func() { env.DeliverMsgs(msgs) }) + done = false + } + if done { + return nil + } + } + } +} + +func splitMsgs(msgs []raftpb.Message, to uint64) (toMsgs []raftpb.Message, rmdr []raftpb.Message) { + for _, msg := range msgs { + if msg.To == to { + toMsgs = append(toMsgs, msg) + } else { + rmdr = append(rmdr, msg) + } + } + return toMsgs, rmdr +} diff --git a/raft/rafttest/interaction_env_handler_status.go b/raft/rafttest/interaction_env_handler_status.go new file mode 100644 index 000000000..3a0c953e5 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_status.go @@ -0,0 +1,42 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft/tracker" +) + +func (env *InteractionEnv) handleStatus(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.Status(idx) +} + +// Status pretty-prints the raft status for the node at the given index to the output +// buffer. +func (env *InteractionEnv) Status(idx int) error { + // TODO(tbg): actually print the full status. + st := env.Nodes[idx].Status() + m := tracker.ProgressMap{} + for id, pr := range st.Progress { + pr := pr // loop-local copy + m[id] = &pr + } + fmt.Fprint(env.Output, m) + return nil +} diff --git a/raft/rafttest/interaction_env_handler_tick_heartbeat.go b/raft/rafttest/interaction_env_handler_tick_heartbeat.go new file mode 100644 index 000000000..349ca78ef --- /dev/null +++ b/raft/rafttest/interaction_env_handler_tick_heartbeat.go @@ -0,0 +1,34 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleTickHeartbeat(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.Tick(idx, env.Nodes[idx].Config.HeartbeatTick) +} + +// Tick the node at the given index the given number of times. +func (env *InteractionEnv) Tick(idx int, num int) error { + for i := 0; i < num; i++ { + env.Nodes[idx].Tick() + } + return nil +} diff --git a/raft/rafttest/interaction_env_logger.go b/raft/rafttest/interaction_env_logger.go new file mode 100644 index 000000000..00fd57ee7 --- /dev/null +++ b/raft/rafttest/interaction_env_logger.go @@ -0,0 +1,107 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "strings" + + "go.etcd.io/etcd/raft" +) + +type logLevels [6]string + +func (l logLevels) strToLev(s string) int { + for i, lvl := range l { + if strings.ToUpper(s) == lvl { + return i + } + } + panic(fmt.Sprintf("unknown level %q", s)) +} + +var lvlNames logLevels = [...]string{"DEBUG", "INFO", "WARN", "ERROR", "FATAL", "NONE"} + +type RedirectLogger struct { + *strings.Builder + Lvl int // 0 = DEBUG, 1 = INFO, 2 = WARNING, 3 = ERROR, 4 = FATAL, 5 = NONE +} + +var _ raft.Logger = (*RedirectLogger)(nil) + +func (l *RedirectLogger) printf(lvl int, format string, args ...interface{}) { + if l.Lvl <= lvl { + fmt.Fprint(l, lvlNames[lvl], " ") + fmt.Fprintf(l, format, args...) + if n := len(format); n > 0 && format[n-1] != '\n' { + l.WriteByte('\n') + } + } +} +func (l *RedirectLogger) print(lvl int, args ...interface{}) { + if l.Lvl <= lvl { + fmt.Fprint(l, lvlNames[lvl], " ") + fmt.Fprintln(l, args...) + } +} + +func (l *RedirectLogger) Debug(v ...interface{}) { + l.print(0, v...) +} + +func (l *RedirectLogger) Debugf(format string, v ...interface{}) { + l.printf(0, format, v...) +} + +func (l *RedirectLogger) Info(v ...interface{}) { + l.print(1, v...) +} + +func (l *RedirectLogger) Infof(format string, v ...interface{}) { + l.printf(1, format, v...) +} + +func (l *RedirectLogger) Warning(v ...interface{}) { + l.print(2, v...) +} + +func (l *RedirectLogger) Warningf(format string, v ...interface{}) { + l.printf(2, format, v...) +} + +func (l *RedirectLogger) Error(v ...interface{}) { + l.print(3, v...) +} + +func (l *RedirectLogger) Errorf(format string, v ...interface{}) { + l.printf(3, format, v...) +} + +func (l *RedirectLogger) Fatal(v ...interface{}) { + l.print(4, v...) +} + +func (l *RedirectLogger) Fatalf(format string, v ...interface{}) { + + l.printf(4, format, v...) +} + +func (l *RedirectLogger) Panic(v ...interface{}) { + l.print(4, v...) +} + +func (l *RedirectLogger) Panicf(format string, v ...interface{}) { + l.printf(4, format, v...) +} diff --git a/raft/rafttest/network.go b/raft/rafttest/network.go index ee30fc0c4..5e7cdf5ce 100644 --- a/raft/rafttest/network.go +++ b/raft/rafttest/network.go @@ -44,6 +44,7 @@ type network interface { } type raftNetwork struct { + rand *rand.Rand mu sync.Mutex disconnected map[uint64]bool dropmap map[conn]float64 @@ -62,6 +63,7 @@ type delay struct { func newRaftNetwork(nodes ...uint64) *raftNetwork { pn := &raftNetwork{ + rand: rand.New(rand.NewSource(1)), recvQueues: make(map[uint64]chan raftpb.Message), dropmap: make(map[conn]float64), delaymap: make(map[conn]delay), @@ -91,12 +93,12 @@ func (rn *raftNetwork) send(m raftpb.Message) { if to == nil { return } - if drop != 0 && rand.Float64() < drop { + if drop != 0 && rn.rand.Float64() < drop { return } // TODO: shall we dl without blocking the send call? - if dl.d != 0 && rand.Float64() < dl.rate { - rd := rand.Int63n(int64(dl.d)) + if dl.d != 0 && rn.rand.Float64() < dl.rate { + rd := rn.rand.Int63n(int64(dl.d)) time.Sleep(time.Duration(rd)) } diff --git a/raft/storage.go b/raft/storage.go index 14ad68608..6be574590 100644 --- a/raft/storage.go +++ b/raft/storage.go @@ -44,6 +44,8 @@ var ErrSnapshotTemporarilyUnavailable = errors.New("snapshot is temporarily unav // become inoperable and refuse to participate in elections; the // application is responsible for cleanup and recovery in this case. type Storage interface { + // TODO(tbg): split this into two interfaces, LogStorage and StateStorage. + // InitialState returns the saved HardState and ConfState information. InitialState() (pb.HardState, pb.ConfState, error) // Entries returns a slice of log entries in the range [lo,hi). diff --git a/raft/testdata/campaign.txt b/raft/testdata/campaign.txt new file mode 100644 index 000000000..19bb06b9e --- /dev/null +++ b/raft/testdata/campaign.txt @@ -0,0 +1,117 @@ +log-level info +---- +ok + +add-nodes 3 voters=(1,2,3) index=2 +---- +INFO 1 switched to configuration voters=(1 2 3) +INFO 1 became follower at term 0 +INFO newRaft 1 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] +INFO 2 switched to configuration voters=(1 2 3) +INFO 2 became follower at term 0 +INFO newRaft 2 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] +INFO 3 switched to configuration voters=(1 2 3) +INFO 3 became follower at term 0 +INFO newRaft 3 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] + +campaign 1 +---- +INFO 1 is starting a new election at term 0 +INFO 1 became candidate at term 1 +INFO 1 received MsgVoteResp from 1 at term 1 +INFO 1 [logterm: 1, index: 2] sent MsgVote request to 2 at term 1 +INFO 1 [logterm: 1, index: 2] sent MsgVote request to 3 at term 1 + +stabilize +---- +> 1 handling Ready + Ready MustSync=true: + Lead:0 State:StateCandidate + HardState Term:1 Vote:1 Commit:2 + Messages: + 1->2 MsgVote Term:1 Log:1/2 + 1->3 MsgVote Term:1 Log:1/2 +> delivering messages + 1->2 MsgVote Term:1 Log:1/2 + INFO 2 [term: 0] received a MsgVote message with higher term from 1 [term: 1] + INFO 2 became follower at term 1 + INFO 2 [logterm: 1, index: 2, vote: 0] cast MsgVote for 1 [logterm: 1, index: 2] at term 1 +> delivering messages + 1->3 MsgVote Term:1 Log:1/2 + INFO 3 [term: 0] received a MsgVote message with higher term from 1 [term: 1] + INFO 3 became follower at term 1 + INFO 3 [logterm: 1, index: 2, vote: 0] cast MsgVote for 1 [logterm: 1, index: 2] at term 1 +> 2 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:2 + Messages: + 2->1 MsgVoteResp Term:1 Log:0/0 +> 3 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:2 + Messages: + 3->1 MsgVoteResp Term:1 Log:0/0 +> delivering messages + 2->1 MsgVoteResp Term:1 Log:0/0 + INFO 1 received MsgVoteResp from 2 at term 1 + INFO 1 has received 2 MsgVoteResp votes and 0 vote rejections + INFO 1 became leader at term 1 + 3->1 MsgVoteResp Term:1 Log:0/0 +> 1 handling Ready + Ready MustSync=true: + Lead:1 State:StateLeader + Entries: + 1/3 EntryNormal "" + Messages: + 1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] + 1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +> delivering messages + 1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +> delivering messages + 1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +> 2 handling Ready + Ready MustSync=true: + Lead:1 State:StateFollower + Entries: + 1/3 EntryNormal "" + Messages: + 2->1 MsgAppResp Term:1 Log:0/3 +> 3 handling Ready + Ready MustSync=true: + Lead:1 State:StateFollower + Entries: + 1/3 EntryNormal "" + Messages: + 3->1 MsgAppResp Term:1 Log:0/3 +> delivering messages + 2->1 MsgAppResp Term:1 Log:0/3 + 3->1 MsgAppResp Term:1 Log:0/3 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:3 + CommittedEntries: + 1/3 EntryNormal "" + Messages: + 1->2 MsgApp Term:1 Log:1/3 Commit:3 + 1->3 MsgApp Term:1 Log:1/3 Commit:3 +> delivering messages + 1->2 MsgApp Term:1 Log:1/3 Commit:3 +> delivering messages + 1->3 MsgApp Term:1 Log:1/3 Commit:3 +> 2 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:3 + CommittedEntries: + 1/3 EntryNormal "" + Messages: + 2->1 MsgAppResp Term:1 Log:0/3 +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:3 + CommittedEntries: + 1/3 EntryNormal "" + Messages: + 3->1 MsgAppResp Term:1 Log:0/3 +> delivering messages + 2->1 MsgAppResp Term:1 Log:0/3 + 3->1 MsgAppResp Term:1 Log:0/3 diff --git a/raft/testdata/confchange_v1.txt b/raft/testdata/confchange_v1.txt new file mode 100644 index 000000000..57fb51f9a --- /dev/null +++ b/raft/testdata/confchange_v1.txt @@ -0,0 +1,78 @@ +add-nodes 1 voters=(1) index=2 +---- +INFO 1 switched to configuration voters=(1) +INFO 1 became follower at term 0 +INFO newRaft 1 [peers: [1], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] + +campaign 1 +---- +INFO 1 is starting a new election at term 0 +INFO 1 became candidate at term 1 +INFO 1 received MsgVoteResp from 1 at term 1 +INFO 1 became leader at term 1 + +propose-conf-change 1 +v2 v3 +---- +ok + +add-nodes 2 + +process-ready 1 +---- +INFO 2 switched to configuration voters=() +INFO 2 became follower at term 0 +INFO newRaft 2 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0] +INFO 3 switched to configuration voters=() +INFO 3 became follower at term 0 +INFO newRaft 3 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0] + +stabilize 1 +---- +> 1 handling Ready + INFO 1 switched to configuration voters=(1 2 3)&&(1) autoleave + INFO initiating automatic transition out of joint configuration voters=(1 2 3)&&(1) autoleave + Ready MustSync=true: + Lead:1 State:StateLeader + HardState Term:1 Vote:1 Commit:4 + Entries: + 1/3 EntryNormal "" + 1/4 EntryConfChangeV2 v2 v3 + CommittedEntries: + 1/3 EntryNormal "" + 1/4 EntryConfChangeV2 v2 v3 +> 1 handling Ready + Ready MustSync=true: + Entries: + 1/5 EntryConfChangeV2 + +# NB: this test is broken from here on because the leader doesn't propagate the +# commit index proactively, see the buglet #11002. + +stabilize 2 +---- +ok + +stabilize 1 +---- +ok + +stabilize 2 +---- +ok + +stabilize 1 +---- +ok + +stabilize 2 +---- +ok + +stabilize 1 +---- +ok + +stabilize 2 +---- +ok diff --git a/raft/testdata/snapshot_succeed_via_app_resp.txt b/raft/testdata/snapshot_succeed_via_app_resp.txt new file mode 100644 index 000000000..ea82d1b29 --- /dev/null +++ b/raft/testdata/snapshot_succeed_via_app_resp.txt @@ -0,0 +1,141 @@ +# TestSnapshotSucceedViaAppResp regression tests the situation in which a snap- +# shot is sent to a follower at the most recent index (i.e. the snapshot index +# is the leader's last index is the committed index). In that situation, a bug +# in the past left the follower in probing status until the next log entry was +# committed. +# +# See https://github.com/etcd-io/etcd/pull/10308 for additional background. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +# Start with two nodes, but the config already has a third. +add-nodes 2 voters=(1,2,3) index=10 +---- +ok + +campaign 1 +---- +ok + +# Fully replicate everything, including the leader's empty index. +stabilize +---- +ok (quiet) + +compact 1 11 +---- +ok (quiet) + +# Drop inflight messages to n3. +deliver-msgs 3 +---- +ok (quiet) + +# Show the Raft log messages from now on. +log-level debug +---- +ok + +status 1 +---- +1: StateReplicate match=11 next=12 inactive +2: StateReplicate match=11 next=12 +3: StateProbe match=0 next=11 paused inactive + +# Add the node that will receive a snapshot (it has no state at all, does not +# even have a config). +add-nodes 1 +---- +INFO 3 switched to configuration voters=() +INFO 3 became follower at term 0 +INFO newRaft 3 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0] + +# Time passes on the leader so that it will try the previously missing follower +# again. +tick-heartbeat 1 +---- +ok + +process-ready 1 +---- +Ready MustSync=false: +Messages: +1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11 +1->3 MsgHeartbeat Term:1 Log:0/0 + +# Iterate until no more work is done by the new peer. It receives the heartbeat +# and responds. +stabilize 3 +---- +> delivering messages + 1->3 MsgHeartbeat Term:1 Log:0/0 + INFO 3 [term: 0] received a MsgHeartbeat message with higher term from 1 [term: 1] + INFO 3 became follower at term 1 +> 3 handling Ready + Ready MustSync=true: + Lead:1 State:StateFollower + HardState Term:1 Commit:0 + Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + +# The leader in turn will realize that n3 needs a snapshot, which it initiates. +stabilize 1 +---- +> delivering messages + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + DEBUG 1 [firstindex: 12, commit: 11] sent snapshot[index: 11, term: 1] to 3 [StateProbe match=0 next=11] + DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=11 paused pendingSnap=11] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] + +status 1 +---- +1: StateReplicate match=11 next=12 inactive +2: StateReplicate match=11 next=12 +3: StateSnapshot match=0 next=11 paused pendingSnap=11 + +# Follower applies the snapshot. Note how it reacts with a MsgAppResp upon completion. +# The snapshot fully catches the follower up (i.e. there are no more log entries it +# needs to apply after). The bug was that the leader failed to realize that the follower +# was now fully caught up. +stabilize 3 +---- +> delivering messages + 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] + INFO log [committed=0, applied=0, unstable.offset=1, len(unstable.Entries)=0] starts to restore snapshot [index: 11, term: 1] + INFO 3 switched to configuration voters=(1 2 3) + INFO 3 [commit: 11, lastindex: 11, lastterm: 1] restored snapshot [index: 11, term: 1] + INFO 3 [commit: 11] restored snapshot [index: 11, term: 1] +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Commit:11 + Snapshot Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] + Messages: + 3->1 MsgAppResp Term:1 Log:0/11 + +# The MsgAppResp lets the leader move the follower back to replicating state. +# Leader sends another MsgAppResp, to communicate the updated commit index. +stabilize 1 +---- +> delivering messages + 3->1 MsgAppResp Term:1 Log:0/11 + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=11] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/11 Commit:11 + +status 1 +---- +1: StateReplicate match=11 next=12 inactive +2: StateReplicate match=11 next=12 +3: StateReplicate match=11 next=12 + +stabilize +---- +ok diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index 697277b26..62c81f45a 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -52,6 +52,8 @@ type Progress struct { // RecentActive is true if the progress is recently active. Receiving any messages // from the corresponding follower indicates the progress is active. // RecentActive can be reset to false after an election timeout. + // + // TODO(tbg): the leader should always have this set to true. RecentActive bool // ProbeSent is used while this follower is in StateProbe. When ProbeSent is diff --git a/raft/util.go b/raft/util.go index 8394f647c..881a6e14e 100644 --- a/raft/util.go +++ b/raft/util.go @@ -17,6 +17,7 @@ package raft import ( "bytes" "fmt" + "strings" pb "go.etcd.io/etcd/raft/raftpb" ) @@ -60,6 +61,69 @@ func voteRespMsgType(msgt pb.MessageType) pb.MessageType { } } +func DescribeHardState(hs pb.HardState) string { + var buf strings.Builder + fmt.Fprintf(&buf, "Term:%d", hs.Term) + if hs.Vote != 0 { + fmt.Fprintf(&buf, " Vote:%d", hs.Vote) + } + fmt.Fprintf(&buf, " Commit:%d", hs.Commit) + return buf.String() +} + +func DescribeSoftState(ss SoftState) string { + return fmt.Sprintf("Lead:%d State:%s", ss.Lead, ss.RaftState) +} + +func DescribeConfState(state pb.ConfState) string { + return fmt.Sprintf( + "Voters:%v VotersOutgoing:%v Learners:%v LearnersNext:%v", + state.Voters, state.VotersOutgoing, state.Learners, state.LearnersNext, + ) +} + +func DescribeSnapshot(snap pb.Snapshot) string { + m := snap.Metadata + return fmt.Sprintf("Index:%d Term:%d ConfState:%s", m.Index, m.Term, DescribeConfState(m.ConfState)) +} + +func DescribeReady(rd Ready, f EntryFormatter) string { + var buf strings.Builder + if rd.SoftState != nil { + fmt.Fprint(&buf, DescribeSoftState(*rd.SoftState)) + buf.WriteByte('\n') + } + if !IsEmptyHardState(rd.HardState) { + fmt.Fprintf(&buf, "HardState %s", DescribeHardState(rd.HardState)) + buf.WriteByte('\n') + } + if len(rd.ReadStates) > 0 { + fmt.Fprintf(&buf, "ReadStates %v\n", rd.ReadStates) + } + if len(rd.Entries) > 0 { + buf.WriteString("Entries:\n") + fmt.Fprint(&buf, DescribeEntries(rd.Entries, f)) + } + if !IsEmptySnap(rd.Snapshot) { + fmt.Fprintf(&buf, "Snapshot %s\n", DescribeSnapshot(rd.Snapshot)) + } + if len(rd.CommittedEntries) > 0 { + buf.WriteString("CommittedEntries:\n") + fmt.Fprint(&buf, DescribeEntries(rd.CommittedEntries, f)) + } + if len(rd.Messages) > 0 { + buf.WriteString("Messages:\n") + for _, msg := range rd.Messages { + fmt.Fprint(&buf, DescribeMessage(msg, f)) + buf.WriteByte('\n') + } + } + if buf.Len() > 0 { + return fmt.Sprintf("Ready MustSync=%t:\n%s", rd.MustSync, buf.String()) + } + return "" +} + // EntryFormatter can be implemented by the application to provide human-readable formatting // of entry data. Nil is a valid EntryFormatter and will use a default format. type EntryFormatter func([]byte) string @@ -86,7 +150,7 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string { fmt.Fprintf(&buf, "]") } if !IsEmptySnap(m.Snapshot) { - fmt.Fprintf(&buf, " Snapshot:%v", m.Snapshot) + fmt.Fprintf(&buf, " Snapshot: %s", DescribeSnapshot(m.Snapshot)) } return buf.String() } @@ -100,13 +164,39 @@ func PayloadSize(e pb.Entry) int { // DescribeEntry returns a concise human-readable description of an // Entry for debugging. func DescribeEntry(e pb.Entry, f EntryFormatter) string { - var formatted string - if e.Type == pb.EntryNormal && f != nil { - formatted = f(e.Data) - } else { - formatted = fmt.Sprintf("%q", e.Data) + if f == nil { + f = func(data []byte) string { return fmt.Sprintf("%q", data) } } - return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted) + + formatConfChange := func(cc pb.ConfChangeI) string { + // TODO(tbg): give the EntryFormatter a type argument so that it gets + // a chance to expose the Context. + return pb.ConfChangesToString(cc.AsV2().Changes) + } + + var formatted string + switch e.Type { + case pb.EntryNormal: + formatted = f(e.Data) + case pb.EntryConfChange: + var cc pb.ConfChange + if err := cc.Unmarshal(e.Data); err != nil { + formatted = err.Error() + } else { + formatted = formatConfChange(cc) + } + case pb.EntryConfChangeV2: + var cc pb.ConfChangeV2 + if err := cc.Unmarshal(e.Data); err != nil { + formatted = err.Error() + } else { + formatted = formatConfChange(cc) + } + } + if formatted != "" { + formatted = " " + formatted + } + return fmt.Sprintf("%d/%d %s%s", e.Term, e.Index, e.Type, formatted) } // DescribeEntries calls DescribeEntry for each Entry, adding a newline to