raft/rafttest: introduce datadriven testing

It has often been tedious to test the interactions between multi-member
Raft groups, especially when many steps were required to reach a certain
scenario. Often, this boilerplate was as boring as it is hard to write
and hard to maintain, making it attractive to resort to shortcuts
whenever possible, which in turn tended to undercut how meaningful and
maintainable the tests ended up being - that is, if the tests were even
written, which sometimes they weren't.

This change introduces a datadriven framework specifically for testing
deterministically the interaction between multiple members of a raft group
with the goal of reducing the friction for writing these tests to near
zero.

In the near term, this will be used to add thorough testing for joint
consensus (which is already available today, but wildly undertested),
but just converting an existing test into this framework has shown that
the concise representation and built-in inspection of log messages
highlights unexpected behavior much more readily than the previous unit
tests did (the test in question is `snapshot_succeed_via_app_resp`; the
reader is invited to compare the old and new version of it).

The main building block is `InteractionEnv`, which holds on to the state
of the whole system and exposes various relevant methods for
manipulating it, including but not limited to adding nodes, delivering
and dropping messages, and proposing configuration changes. All of this
is extensible so that in the future I hope to use it to explore the
phenomena discussed in

https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263

which requires injecting appropriate "crash points" in the Ready
handling loop. Discussions of the "what if X happened in state Y"
can quickly be made concrete by "scripting up an interaction test".

Additionally, this framework is intentionally not kept internal to the
raft package.. Though this is in its infancy, a goal is that it should
be possible for a suite of interaction tests to allow applications to
validate that their Storage implementation behaves accordingly, simply
by running a raft-provided interaction suite against their Storage.
release-3.5
Tobias Schottdorf 2019-08-12 11:13:51 +02:00
parent f57c16c271
commit e8090e57a2
25 changed files with 1613 additions and 132 deletions

31
raft/interaction_test.go Normal file
View File

@ -0,0 +1,31 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raft_test
import (
"testing"
"github.com/cockroachdb/datadriven"
"go.etcd.io/etcd/raft/rafttest"
)
func TestInteraction(t *testing.T) {
datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
env := rafttest.NewInteractionEnv(nil)
datadriven.RunTest(t, path, func(d *datadriven.TestData) string {
return env.Handle(t, *d)
})
})
}

View File

@ -20,6 +20,7 @@ import (
"fmt"
"math"
"math/rand"
"sort"
"strings"
"sync"
"time"
@ -529,7 +530,6 @@ func (r *raft) bcastAppend() {
if id == r.id {
return
}
r.sendAppend(id)
})
}
@ -795,7 +795,16 @@ func (r *raft) campaign(t CampaignType) {
}
return
}
for id := range r.prs.Voters.IDs() {
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {
continue
}

View File

@ -18,7 +18,6 @@ import (
"testing"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
)
var (
@ -112,125 +111,6 @@ func TestSnapshotSucceed(t *testing.T) {
}
}
// TestSnapshotSucceedViaAppResp regression tests the situation in which a snap-
// shot is sent to a follower at the most recent index (i.e. the snapshot index
// is the leader's last index is the committed index). In that situation, a bug
// in the past left the follower in probing status until the next log entry was
// committed.
func TestSnapshotSucceedViaAppResp(t *testing.T) {
s1 := NewMemoryStorage()
// Create a single-node leader.
n1 := newTestRaft(1, []uint64{1}, 10, 1, s1)
n1.becomeCandidate()
n1.becomeLeader()
// We need to add a second empty entry so that we can truncate the first
// one away.
n1.Step(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{}}})
rd := newReady(n1, &SoftState{}, pb.HardState{})
s1.Append(rd.Entries)
s1.SetHardState(rd.HardState)
if exp, ci := s1.lastIndex(), n1.raftLog.committed; ci != exp {
t.Fatalf("unexpected committed index %d, wanted %d: %+v", ci, exp, s1)
}
// Force a log truncation.
if err := s1.Compact(1); err != nil {
t.Fatal(err)
}
// Add a follower to the group. Do this in a clandestine way for simplicity.
// Also set up a snapshot that will be sent to the follower.
n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
s1.snapshot = pb.Snapshot{
Metadata: pb.SnapshotMetadata{
ConfState: pb.ConfState{Voters: []uint64{1, 2}},
Index: s1.lastIndex(),
Term: s1.ents[len(s1.ents)-1].Term,
},
}
noMessage := pb.MessageType(-1)
mustSend := func(from, to *raft, typ pb.MessageType) pb.Message {
t.Helper()
for i, msg := range from.msgs {
if msg.From != from.id || msg.To != to.id || msg.Type != typ {
continue
}
t.Log(DescribeMessage(msg, func([]byte) string { return "" }))
if len(msg.Entries) > 0 {
t.Log(DescribeEntries(msg.Entries, func(b []byte) string { return string(b) }))
}
if err := to.Step(msg); err != nil {
t.Fatalf("%v: %s", msg, err)
}
from.msgs = append(from.msgs[:i], from.msgs[i+1:]...)
return msg
}
if typ == noMessage {
if len(from.msgs) == 0 {
return pb.Message{}
}
t.Fatalf("expected no more messages, but got %d->%d %v", from.id, to.id, from.msgs)
}
t.Fatalf("message %d->%d %s not found in %v", from.id, to.id, typ, from.msgs)
return pb.Message{} // unreachable
}
// Create the follower that will receive the snapshot.
s2 := NewMemoryStorage()
n2 := newTestRaft(2, []uint64{1, 2}, 10, 1, s2)
// Let the leader probe the follower.
if !n1.maybeSendAppend(2, true /* sendIfEmpty */) {
t.Fatalf("expected message to be sent")
}
if msg := mustSend(n1, n2, pb.MsgApp); len(msg.Entries) > 0 {
// For this test to work, the leader must not have anything to append
// to the follower right now.
t.Fatalf("unexpectedly appending entries %v", msg.Entries)
}
// Follower rejects the append (because it doesn't have any log entries)
if msg := mustSend(n2, n1, pb.MsgAppResp); !msg.Reject {
t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint)
}
const expIdx = 2
// Leader sends snapshot due to RejectHint of zero (we set up the raft log
// to start at index 2).
if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx {
t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index)
}
// n2 reacts to snapshot with MsgAppResp.
if msg := mustSend(n2, n1, pb.MsgAppResp); msg.Index != expIdx {
t.Fatalf("expected AppResp at index %d, got %d", expIdx, msg.Index)
}
// Leader sends MsgApp to communicate commit index.
if msg := mustSend(n1, n2, pb.MsgApp); msg.Commit != expIdx {
t.Fatalf("expected commit index %d, got %d", expIdx, msg.Commit)
}
// Follower responds.
mustSend(n2, n1, pb.MsgAppResp)
// Leader has correct state for follower.
pr := n1.prs.Progress[2]
if pr.State != tracker.StateReplicate {
t.Fatalf("unexpected state %v", pr)
}
if pr.Match != expIdx || pr.Next != expIdx+1 {
t.Fatalf("expected match = %d, next = %d; got match = %d and next = %d", expIdx, expIdx+1, pr.Match, pr.Next)
}
// Leader and follower are done.
mustSend(n1, n2, noMessage)
mustSend(n2, n1, noMessage)
}
func TestSnapshotAbort(t *testing.T) {
storage := NewMemoryStorage()
sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)

View File

@ -16,6 +16,8 @@ package raftpb
import (
"fmt"
"strconv"
"strings"
"github.com/gogo/protobuf/proto"
)
@ -103,3 +105,66 @@ func (c *ConfChangeV2) LeaveJoint() bool {
cpy.Context = nil
return proto.Equal(&cpy, &ConfChangeV2{})
}
// ConfChangesFromString parses a Space-delimited sequence of operations into a
// slice of ConfChangeSingle. The supported operations are:
// - vn: make n a voter,
// - ln: make n a learner,
// - rn: remove n, and
// - un: update n.
func ConfChangesFromString(s string) ([]ConfChangeSingle, error) {
var ccs []ConfChangeSingle
toks := strings.Split(strings.TrimSpace(s), " ")
if toks[0] == "" {
toks = nil
}
for _, tok := range toks {
if len(tok) < 2 {
return nil, fmt.Errorf("unknown token %s", tok)
}
var cc ConfChangeSingle
switch tok[0] {
case 'v':
cc.Type = ConfChangeAddNode
case 'l':
cc.Type = ConfChangeAddLearnerNode
case 'r':
cc.Type = ConfChangeRemoveNode
case 'u':
cc.Type = ConfChangeUpdateNode
default:
return nil, fmt.Errorf("unknown input: %s", tok)
}
id, err := strconv.ParseUint(tok[1:], 10, 64)
if err != nil {
return nil, err
}
cc.NodeID = id
ccs = append(ccs, cc)
}
return ccs, nil
}
// ConfChangesToString is the inverse to ConfChangesFromString.
func ConfChangesToString(ccs []ConfChangeSingle) string {
var buf strings.Builder
for i, cc := range ccs {
if i > 0 {
buf.WriteByte(' ')
}
switch cc.Type {
case ConfChangeAddNode:
buf.WriteByte('v')
case ConfChangeAddLearnerNode:
buf.WriteByte('l')
case ConfChangeRemoveNode:
buf.WriteByte('r')
case ConfChangeUpdateNode:
buf.WriteByte('u')
default:
buf.WriteString("unknown")
}
fmt.Fprintf(&buf, "%d", cc.NodeID)
}
return buf.String()
}

View File

@ -0,0 +1,90 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"fmt"
"math"
"strings"
"go.etcd.io/etcd/raft"
pb "go.etcd.io/etcd/raft/raftpb"
)
// InteractionOpts groups the options for an InteractionEnv.
type InteractionOpts struct {
OnConfig func(*raft.Config)
}
// A Node is a member of a raft group tested via an InteractionEnv.
type Node struct {
*raft.RawNode
Storage
Config *raft.Config
History []pb.Snapshot
}
// InteractionEnv facilitates testing of complex interactions between the
// members of a raft group.
type InteractionEnv struct {
Options *InteractionOpts
Nodes []Node
Messages []pb.Message // in-flight messages
Output *RedirectLogger
}
// NewInteractionEnv initializes an InteractionEnv. opts may be nil.
func NewInteractionEnv(opts *InteractionOpts) *InteractionEnv {
if opts == nil {
opts = &InteractionOpts{}
}
return &InteractionEnv{
Options: opts,
Output: &RedirectLogger{
Builder: &strings.Builder{},
},
}
}
// Storage is the interface used by InteractionEnv. It is comprised of raft's
// Storage interface plus access to operations that maintain the log and drive
// the Ready handling loop.
type Storage interface {
raft.Storage
SetHardState(state pb.HardState) error
ApplySnapshot(pb.Snapshot) error
Compact(newFirstIndex uint64) error
Append([]pb.Entry) error
}
// defaultRaftConfig sets up a *raft.Config with reasonable testing defaults.
// In particular, no limits are set.
func defaultRaftConfig(id uint64, applied uint64, s raft.Storage) *raft.Config {
return &raft.Config{
ID: id,
Applied: applied,
ElectionTick: 3,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: math.MaxUint64,
MaxInflightMsgs: math.MaxInt32,
}
}
func defaultEntryFormatter(b []byte) string {
return fmt.Sprintf("%q", b)
}

View File

@ -0,0 +1,159 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"fmt"
"strconv"
"testing"
"github.com/cockroachdb/datadriven"
)
// Handle is the entrypoint for data-driven interaction testing. Commands and
// parameters are parsed from the supplied TestData. Errors during data parsing
// are reported via the supplied *testing.T; errors from the raft nodes and the
// storage engine are reported to the output buffer.
func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
env.Output.Reset()
var err error
switch d.Cmd {
case "add-nodes":
// Example:
//
// add-nodes <number-of-nodes-to-add> voters=(1 2 3) learners=(4 5) index=2 content=foo
err = env.handleAddNodes(t, d)
case "campaign":
// Example:
//
// campaign <id-of-candidate>
err = env.handleCampaign(t, d)
case "compact":
// Example:
//
// compact <id> <new-first-index>
err = env.handleCompact(t, d)
case "deliver-msgs":
// Deliver the messages for a given recipient.
//
// Example:
//
// deliver-msgs <idx>
err = env.handleDeliverMsgs(t, d)
case "process-ready":
// Example:
//
// process-ready 3
err = env.handleProcessReady(t, d)
case "log-level":
// Set the log level. NONE disables all output, including from the test
// harness (except errors).
//
// Example:
//
// log-level WARN
err = env.handleLogLevel(t, d)
case "raft-log":
// Print the Raft log.
//
// Example:
//
// raft-log 3
err = env.handleRaftLog(t, d)
case "stabilize":
// Deliver messages to and run process-ready on the set of IDs until
// no more work is to be done.
//
// Example:
//
// stabilize 1 4
err = env.handleStabilize(t, d)
case "status":
// Print Raft status.
//
// Example:
//
// status 5
err = env.handleStatus(t, d)
case "tick-heartbeat":
// Tick a heartbeat interval.
//
// Example:
//
// tick-heartbeat 3
err = env.handleTickHeartbeat(t, d)
case "propose-conf-change":
// Propose a configuration change.
//
// Example:
//
// propose-conf-change transition=explicit
// v1 v3 l4 r5
//
// Example:
//
// propose-conf-change v1=true
// v5
err = env.handleProposeConfChange(t, d)
default:
err = fmt.Errorf("unknown command")
}
if err != nil {
env.Output.WriteString(err.Error())
}
// NB: the highest log level suppresses all output, including that of the
// handlers. This comes in useful during setup which can be chatty.
// However, errors are always logged.
if env.Output.Len() == 0 {
return "ok"
}
if env.Output.Lvl == len(lvlNames)-1 {
if err != nil {
return err.Error()
}
return "ok (quiet)"
}
return env.Output.String()
}
func firstAsInt(t *testing.T, d datadriven.TestData) int {
t.Helper()
n, err := strconv.Atoi(d.CmdArgs[0].Key)
if err != nil {
t.Fatal(err)
}
return n
}
func firstAsNodeIdx(t *testing.T, d datadriven.TestData) int {
t.Helper()
n := firstAsInt(t, d)
return n - 1
}
func ints(t *testing.T, d datadriven.TestData) []int {
var ints []int
for i := 0; i < len(d.CmdArgs); i++ {
if len(d.CmdArgs[i].Vals) != 0 {
continue
}
n, err := strconv.Atoi(d.CmdArgs[i].Key)
if err != nil {
t.Fatal(err)
}
ints = append(ints, n)
}
return ints
}

View File

@ -0,0 +1,136 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"errors"
"fmt"
"reflect"
"testing"
"github.com/cockroachdb/datadriven"
"go.etcd.io/etcd/raft"
pb "go.etcd.io/etcd/raft/raftpb"
)
func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) error {
n := firstAsInt(t, d)
var snap pb.Snapshot
for _, arg := range d.CmdArgs[1:] {
for i := range arg.Vals {
switch arg.Key {
case "voters":
var id uint64
arg.Scan(t, i, &id)
snap.Metadata.ConfState.Voters = append(snap.Metadata.ConfState.Voters, id)
case "learners":
var id uint64
arg.Scan(t, i, &id)
snap.Metadata.ConfState.Learners = append(snap.Metadata.ConfState.Learners, id)
case "index":
arg.Scan(t, i, &snap.Metadata.Index)
case "content":
arg.Scan(t, i, &snap.Data)
}
}
}
return env.AddNodes(n, snap)
}
type snapOverrideStorage struct {
Storage
snapshotOverride func() (pb.Snapshot, error)
}
func (s snapOverrideStorage) Snapshot() (pb.Snapshot, error) {
if s.snapshotOverride != nil {
return s.snapshotOverride()
}
return s.Storage.Snapshot()
}
var _ raft.Storage = snapOverrideStorage{}
// AddNodes adds n new nodes initializes from the given snapshot (which may be
// empty). They will be assigned consecutive IDs.
func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
bootstrap := !reflect.DeepEqual(snap, pb.Snapshot{})
for i := 0; i < n; i++ {
id := uint64(1 + len(env.Nodes))
s := snapOverrideStorage{
Storage: raft.NewMemoryStorage(),
// When you ask for a snapshot, you get the most recent snapshot.
//
// TODO(tbg): this is sort of clunky, but MemoryStorage itself will
// give you some fixed snapshot and also the snapshot changes
// whenever you compact the logs and vice versa, so it's all a bit
// awkward to use.
snapshotOverride: func() (pb.Snapshot, error) {
snaps := env.Nodes[int(id-1)].History
return snaps[len(snaps)-1], nil
},
}
if bootstrap {
// NB: we could make this work with 1, but MemoryStorage just
// doesn't play well with that and it's not a loss of generality.
if snap.Metadata.Index <= 1 {
return errors.New("index must be specified as > 1 due to bootstrap")
}
snap.Metadata.Term = 1
if err := s.ApplySnapshot(snap); err != nil {
return err
}
fi, err := s.FirstIndex()
if err != nil {
return err
}
// At the time of writing and for *MemoryStorage, applying a
// snapshot also truncates appropriately, but this would change with
// other storage engines potentially.
if exp := snap.Metadata.Index + 1; fi != exp {
return fmt.Errorf("failed to establish first index %d; got %d", exp, fi)
}
}
cfg := defaultRaftConfig(id, snap.Metadata.Index, s)
if env.Options.OnConfig != nil {
env.Options.OnConfig(cfg)
if cfg.ID != id {
// This could be supported but then we need to do more work
// translating back and forth -- not worth it.
return errors.New("OnConfig must not change the ID")
}
}
if cfg.Logger != nil {
return errors.New("OnConfig must not set Logger")
}
cfg.Logger = env.Output
rn, err := raft.NewRawNode(cfg)
if err != nil {
return err
}
node := Node{
RawNode: rn,
// TODO(tbg): allow a more general Storage, as long as it also allows
// us to apply snapshots, append entries, and update the HardState.
Storage: s,
Config: cfg,
History: []pb.Snapshot{snap},
}
env.Nodes = append(env.Nodes, node)
}
return nil
}

View File

@ -0,0 +1,31 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"testing"
"github.com/cockroachdb/datadriven"
)
func (env *InteractionEnv) handleCampaign(t *testing.T, d datadriven.TestData) error {
idx := firstAsNodeIdx(t, d)
return env.Campaign(t, idx)
}
// Campaign the node at the given index.
func (env *InteractionEnv) Campaign(t *testing.T, idx int) error {
return env.Nodes[idx].Campaign()
}

View File

@ -0,0 +1,40 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"strconv"
"testing"
"github.com/cockroachdb/datadriven"
)
func (env *InteractionEnv) handleCompact(t *testing.T, d datadriven.TestData) error {
idx := firstAsNodeIdx(t, d)
newFirstIndex, err := strconv.ParseUint(d.CmdArgs[1].Key, 10, 64)
if err != nil {
return err
}
return env.Compact(idx, newFirstIndex)
}
// Compact truncates the log on the node at index idx so that the supplied new
// first index results.
func (env *InteractionEnv) Compact(idx int, newFirstIndex uint64) error {
if err := env.Nodes[idx].Compact(newFirstIndex); err != nil {
return err
}
return env.RaftLog(idx)
}

View File

@ -0,0 +1,58 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"errors"
"fmt"
"testing"
"github.com/cockroachdb/datadriven"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
)
func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData) error {
if len(env.Messages) == 0 {
return errors.New("no messages to deliver")
}
msgs := env.Messages
env.Messages = nil
return env.DeliverMsgs(msgs)
}
// DeliverMsgs delivers the supplied messages typically taken from env.Messages.
func (env *InteractionEnv) DeliverMsgs(msgs []raftpb.Message) error {
for _, msg := range msgs {
toIdx := int(msg.To - 1)
var drop bool
if toIdx >= len(env.Nodes) {
// Drop messages for peers that don't exist yet.
drop = true
env.Output.WriteString("dropped: ")
}
fmt.Fprintln(env.Output, raft.DescribeMessage(msg, defaultEntryFormatter))
if drop {
continue
}
if err := env.Nodes[toIdx].Step(msg); err != nil {
env.Output.WriteString(err.Error())
continue
}
}
return nil
}

View File

@ -0,0 +1,37 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"fmt"
"strings"
"testing"
"github.com/cockroachdb/datadriven"
)
func (env *InteractionEnv) handleLogLevel(t *testing.T, d datadriven.TestData) error {
return env.LogLevel(d.CmdArgs[0].Key)
}
func (env *InteractionEnv) LogLevel(name string) error {
for i, s := range lvlNames {
if strings.ToLower(s) == strings.ToLower(name) {
env.Output.Lvl = i
return nil
}
}
return fmt.Errorf("log levels must be either of %v", lvlNames)
}

View File

@ -0,0 +1,96 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"testing"
"github.com/cockroachdb/datadriven"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/quorum"
"go.etcd.io/etcd/raft/raftpb"
)
func (env *InteractionEnv) handleProcessReady(t *testing.T, d datadriven.TestData) error {
idx := firstAsNodeIdx(t, d)
return env.ProcessReady(idx)
}
// ProcessReady runs Ready handling on the node with the given index.
func (env *InteractionEnv) ProcessReady(idx int) error {
// TODO(tbg): Allow simulating crashes here.
rn, s := env.Nodes[idx].RawNode, env.Nodes[idx].Storage
rd := rn.Ready()
// TODO(tbg): the order of operations here is not necessarily safe. See:
// https://github.com/etcd-io/etcd/pull/10861
if !raft.IsEmptyHardState(rd.HardState) {
if err := s.SetHardState(rd.HardState); err != nil {
return err
}
}
if err := s.Append(rd.Entries); err != nil {
return err
}
if !raft.IsEmptySnap(rd.Snapshot) {
if err := s.ApplySnapshot(rd.Snapshot); err != nil {
return err
}
}
for _, ent := range rd.CommittedEntries {
var update []byte
switch ent.Type {
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := cc.Unmarshal(ent.Data); err != nil {
return err
}
update = cc.Context
rn.ApplyConfChange(cc)
case raftpb.EntryConfChangeV2:
var cc raftpb.ConfChangeV2
if err := cc.Unmarshal(ent.Data); err != nil {
return err
}
rn.ApplyConfChange(cc)
update = cc.Context
default:
update = ent.Data
}
// Record the new state by starting with the current state and applying
// the command.
lastSnap := env.Nodes[idx].History[len(env.Nodes[idx].History)-1]
var snap raftpb.Snapshot
snap.Data = append(snap.Data, lastSnap.Data...)
// NB: this hard-codes an "appender" state machine.
snap.Data = append(snap.Data, update...)
snap.Metadata.Index = ent.Index
snap.Metadata.Term = ent.Term
cfg := rn.Status().Config
snap.Metadata.ConfState = raftpb.ConfState{
Voters: cfg.Voters[0].Slice(),
VotersOutgoing: cfg.Voters[1].Slice(),
Learners: quorum.MajorityConfig(cfg.Learners).Slice(),
LearnersNext: quorum.MajorityConfig(cfg.LearnersNext).Slice(),
}
env.Nodes[idx].History = append(env.Nodes[idx].History, snap)
}
for _, msg := range rd.Messages {
env.Messages = append(env.Messages, msg)
}
rn.Advance(rd)
env.Output.WriteString(raft.DescribeReady(rd, defaultEntryFormatter))
return nil
}

View File

@ -0,0 +1,82 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"fmt"
"strconv"
"testing"
"github.com/cockroachdb/datadriven"
"go.etcd.io/etcd/raft/raftpb"
)
func (env *InteractionEnv) handleProposeConfChange(t *testing.T, d datadriven.TestData) error {
idx := firstAsNodeIdx(t, d)
var v1 bool
transition := raftpb.ConfChangeTransitionAuto
for _, arg := range d.CmdArgs[1:] {
for _, val := range arg.Vals {
switch arg.Key {
case "v1":
var err error
v1, err = strconv.ParseBool(val)
if err != nil {
return err
}
case "transition":
switch val {
case "auto":
transition = raftpb.ConfChangeTransitionAuto
case "implicit":
transition = raftpb.ConfChangeTransitionJointImplicit
case "explicit":
transition = raftpb.ConfChangeTransitionJointExplicit
default:
return fmt.Errorf("unknown transition %s", val)
}
default:
return fmt.Errorf("unknown command %s", arg.Key)
}
}
}
ccs, err := raftpb.ConfChangesFromString(d.Input)
if err != nil {
return err
}
var c raftpb.ConfChangeI
if v1 {
if len(ccs) > 1 || transition != raftpb.ConfChangeTransitionAuto {
return fmt.Errorf("v1 conf change can only have one operation and no transition")
}
c = raftpb.ConfChange{
Type: ccs[0].Type,
NodeID: ccs[0].NodeID,
}
} else {
c = raftpb.ConfChangeV2{
Transition: transition,
Changes: ccs,
}
}
return env.ProposeConfChange(idx, c)
}
// ProposeConfChange proposes a configuration change on the node with the given index.
func (env *InteractionEnv) ProposeConfChange(idx int, c raftpb.ConfChangeI) error {
return env.Nodes[idx].ProposeConfChange(c)
}

View File

@ -0,0 +1,60 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"fmt"
"math"
"testing"
"github.com/cockroachdb/datadriven"
"go.etcd.io/etcd/raft"
)
func (env *InteractionEnv) writeErr(err error) {
if err != nil {
env.Output.WriteString(err.Error())
}
}
func (env *InteractionEnv) handleRaftLog(t *testing.T, d datadriven.TestData) error {
idx := firstAsNodeIdx(t, d)
return env.RaftLog(idx)
}
// RaftLog pretty prints the raft log to the output buffer.
func (env *InteractionEnv) RaftLog(idx int) error {
s := env.Nodes[idx].Storage
fi, err := s.FirstIndex()
if err != nil {
return err
}
li, err := s.LastIndex()
if err != nil {
return err
}
if li < fi {
// TODO(tbg): this is what MemoryStorage returns, but unclear if it's
// the "correct" thing to do.
fmt.Fprintf(env.Output, "log is empty: first index=%d, last index=%d", fi, li)
return nil
}
ents, err := s.Entries(fi, li+1, math.MaxUint64)
if err != nil {
return err
}
env.Output.WriteString(raft.DescribeEntries(ents, defaultEntryFormatter))
return err
}

View File

@ -0,0 +1,92 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"bufio"
"fmt"
"strings"
"testing"
"github.com/cockroachdb/datadriven"
"go.etcd.io/etcd/raft/raftpb"
)
func (env *InteractionEnv) handleStabilize(t *testing.T, d datadriven.TestData) error {
var idxs []int
for _, id := range ints(t, d) {
idxs = append(idxs, id-1)
}
return env.Stabilize(idxs...)
}
// Stabilize repeatedly runs Ready handling on and message delivery to the set
// of nodes specified via the idxs slice until reaching a fixed point.
func (env *InteractionEnv) Stabilize(idxs ...int) error {
var nodes []Node
for _, idx := range idxs {
nodes = append(nodes, env.Nodes[idx])
}
if len(nodes) == 0 {
nodes = env.Nodes
}
withIndent := func(f func()) {
orig := env.Output.Builder
env.Output.Builder = &strings.Builder{}
f()
scanner := bufio.NewScanner(strings.NewReader(env.Output.Builder.String()))
for scanner.Scan() {
orig.WriteString(" " + scanner.Text() + "\n")
}
env.Output.Builder = orig
}
for {
done := true
for _, rn := range nodes {
if rn.HasReady() {
done = false
idx := int(rn.Status().ID - 1)
fmt.Fprintf(env.Output, "> %d handling Ready\n", idx+1)
withIndent(func() { env.ProcessReady(idx) })
}
}
var msgs []raftpb.Message
for _, rn := range nodes {
msgs, env.Messages = splitMsgs(env.Messages, rn.Status().ID)
if len(msgs) > 0 {
fmt.Fprintf(env.Output, "> delivering messages\n")
withIndent(func() { env.DeliverMsgs(msgs) })
done = false
}
if done {
return nil
}
}
}
}
func splitMsgs(msgs []raftpb.Message, to uint64) (toMsgs []raftpb.Message, rmdr []raftpb.Message) {
for _, msg := range msgs {
if msg.To == to {
toMsgs = append(toMsgs, msg)
} else {
rmdr = append(rmdr, msg)
}
}
return toMsgs, rmdr
}

View File

@ -0,0 +1,42 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"fmt"
"testing"
"github.com/cockroachdb/datadriven"
"go.etcd.io/etcd/raft/tracker"
)
func (env *InteractionEnv) handleStatus(t *testing.T, d datadriven.TestData) error {
idx := firstAsNodeIdx(t, d)
return env.Status(idx)
}
// Status pretty-prints the raft status for the node at the given index to the output
// buffer.
func (env *InteractionEnv) Status(idx int) error {
// TODO(tbg): actually print the full status.
st := env.Nodes[idx].Status()
m := tracker.ProgressMap{}
for id, pr := range st.Progress {
pr := pr // loop-local copy
m[id] = &pr
}
fmt.Fprint(env.Output, m)
return nil
}

View File

@ -0,0 +1,34 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"testing"
"github.com/cockroachdb/datadriven"
)
func (env *InteractionEnv) handleTickHeartbeat(t *testing.T, d datadriven.TestData) error {
idx := firstAsNodeIdx(t, d)
return env.Tick(idx, env.Nodes[idx].Config.HeartbeatTick)
}
// Tick the node at the given index the given number of times.
func (env *InteractionEnv) Tick(idx int, num int) error {
for i := 0; i < num; i++ {
env.Nodes[idx].Tick()
}
return nil
}

View File

@ -0,0 +1,107 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafttest
import (
"fmt"
"strings"
"go.etcd.io/etcd/raft"
)
type logLevels [6]string
func (l logLevels) strToLev(s string) int {
for i, lvl := range l {
if strings.ToUpper(s) == lvl {
return i
}
}
panic(fmt.Sprintf("unknown level %q", s))
}
var lvlNames logLevels = [...]string{"DEBUG", "INFO", "WARN", "ERROR", "FATAL", "NONE"}
type RedirectLogger struct {
*strings.Builder
Lvl int // 0 = DEBUG, 1 = INFO, 2 = WARNING, 3 = ERROR, 4 = FATAL, 5 = NONE
}
var _ raft.Logger = (*RedirectLogger)(nil)
func (l *RedirectLogger) printf(lvl int, format string, args ...interface{}) {
if l.Lvl <= lvl {
fmt.Fprint(l, lvlNames[lvl], " ")
fmt.Fprintf(l, format, args...)
if n := len(format); n > 0 && format[n-1] != '\n' {
l.WriteByte('\n')
}
}
}
func (l *RedirectLogger) print(lvl int, args ...interface{}) {
if l.Lvl <= lvl {
fmt.Fprint(l, lvlNames[lvl], " ")
fmt.Fprintln(l, args...)
}
}
func (l *RedirectLogger) Debug(v ...interface{}) {
l.print(0, v...)
}
func (l *RedirectLogger) Debugf(format string, v ...interface{}) {
l.printf(0, format, v...)
}
func (l *RedirectLogger) Info(v ...interface{}) {
l.print(1, v...)
}
func (l *RedirectLogger) Infof(format string, v ...interface{}) {
l.printf(1, format, v...)
}
func (l *RedirectLogger) Warning(v ...interface{}) {
l.print(2, v...)
}
func (l *RedirectLogger) Warningf(format string, v ...interface{}) {
l.printf(2, format, v...)
}
func (l *RedirectLogger) Error(v ...interface{}) {
l.print(3, v...)
}
func (l *RedirectLogger) Errorf(format string, v ...interface{}) {
l.printf(3, format, v...)
}
func (l *RedirectLogger) Fatal(v ...interface{}) {
l.print(4, v...)
}
func (l *RedirectLogger) Fatalf(format string, v ...interface{}) {
l.printf(4, format, v...)
}
func (l *RedirectLogger) Panic(v ...interface{}) {
l.print(4, v...)
}
func (l *RedirectLogger) Panicf(format string, v ...interface{}) {
l.printf(4, format, v...)
}

View File

@ -44,6 +44,7 @@ type network interface {
}
type raftNetwork struct {
rand *rand.Rand
mu sync.Mutex
disconnected map[uint64]bool
dropmap map[conn]float64
@ -62,6 +63,7 @@ type delay struct {
func newRaftNetwork(nodes ...uint64) *raftNetwork {
pn := &raftNetwork{
rand: rand.New(rand.NewSource(1)),
recvQueues: make(map[uint64]chan raftpb.Message),
dropmap: make(map[conn]float64),
delaymap: make(map[conn]delay),
@ -91,12 +93,12 @@ func (rn *raftNetwork) send(m raftpb.Message) {
if to == nil {
return
}
if drop != 0 && rand.Float64() < drop {
if drop != 0 && rn.rand.Float64() < drop {
return
}
// TODO: shall we dl without blocking the send call?
if dl.d != 0 && rand.Float64() < dl.rate {
rd := rand.Int63n(int64(dl.d))
if dl.d != 0 && rn.rand.Float64() < dl.rate {
rd := rn.rand.Int63n(int64(dl.d))
time.Sleep(time.Duration(rd))
}

View File

@ -44,6 +44,8 @@ var ErrSnapshotTemporarilyUnavailable = errors.New("snapshot is temporarily unav
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
// TODO(tbg): split this into two interfaces, LogStorage and StateStorage.
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).

117
raft/testdata/campaign.txt vendored Normal file
View File

@ -0,0 +1,117 @@
log-level info
----
ok
add-nodes 3 voters=(1,2,3) index=2
----
INFO 1 switched to configuration voters=(1 2 3)
INFO 1 became follower at term 0
INFO newRaft 1 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1]
INFO 2 switched to configuration voters=(1 2 3)
INFO 2 became follower at term 0
INFO newRaft 2 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1]
INFO 3 switched to configuration voters=(1 2 3)
INFO 3 became follower at term 0
INFO newRaft 3 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1]
campaign 1
----
INFO 1 is starting a new election at term 0
INFO 1 became candidate at term 1
INFO 1 received MsgVoteResp from 1 at term 1
INFO 1 [logterm: 1, index: 2] sent MsgVote request to 2 at term 1
INFO 1 [logterm: 1, index: 2] sent MsgVote request to 3 at term 1
stabilize
----
> 1 handling Ready
Ready MustSync=true:
Lead:0 State:StateCandidate
HardState Term:1 Vote:1 Commit:2
Messages:
1->2 MsgVote Term:1 Log:1/2
1->3 MsgVote Term:1 Log:1/2
> delivering messages
1->2 MsgVote Term:1 Log:1/2
INFO 2 [term: 0] received a MsgVote message with higher term from 1 [term: 1]
INFO 2 became follower at term 1
INFO 2 [logterm: 1, index: 2, vote: 0] cast MsgVote for 1 [logterm: 1, index: 2] at term 1
> delivering messages
1->3 MsgVote Term:1 Log:1/2
INFO 3 [term: 0] received a MsgVote message with higher term from 1 [term: 1]
INFO 3 became follower at term 1
INFO 3 [logterm: 1, index: 2, vote: 0] cast MsgVote for 1 [logterm: 1, index: 2] at term 1
> 2 handling Ready
Ready MustSync=true:
HardState Term:1 Vote:1 Commit:2
Messages:
2->1 MsgVoteResp Term:1 Log:0/0
> 3 handling Ready
Ready MustSync=true:
HardState Term:1 Vote:1 Commit:2
Messages:
3->1 MsgVoteResp Term:1 Log:0/0
> delivering messages
2->1 MsgVoteResp Term:1 Log:0/0
INFO 1 received MsgVoteResp from 2 at term 1
INFO 1 has received 2 MsgVoteResp votes and 0 vote rejections
INFO 1 became leader at term 1
3->1 MsgVoteResp Term:1 Log:0/0
> 1 handling Ready
Ready MustSync=true:
Lead:1 State:StateLeader
Entries:
1/3 EntryNormal ""
Messages:
1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""]
1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""]
> delivering messages
1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""]
> delivering messages
1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""]
> 2 handling Ready
Ready MustSync=true:
Lead:1 State:StateFollower
Entries:
1/3 EntryNormal ""
Messages:
2->1 MsgAppResp Term:1 Log:0/3
> 3 handling Ready
Ready MustSync=true:
Lead:1 State:StateFollower
Entries:
1/3 EntryNormal ""
Messages:
3->1 MsgAppResp Term:1 Log:0/3
> delivering messages
2->1 MsgAppResp Term:1 Log:0/3
3->1 MsgAppResp Term:1 Log:0/3
> 1 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:3
CommittedEntries:
1/3 EntryNormal ""
Messages:
1->2 MsgApp Term:1 Log:1/3 Commit:3
1->3 MsgApp Term:1 Log:1/3 Commit:3
> delivering messages
1->2 MsgApp Term:1 Log:1/3 Commit:3
> delivering messages
1->3 MsgApp Term:1 Log:1/3 Commit:3
> 2 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:3
CommittedEntries:
1/3 EntryNormal ""
Messages:
2->1 MsgAppResp Term:1 Log:0/3
> 3 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:3
CommittedEntries:
1/3 EntryNormal ""
Messages:
3->1 MsgAppResp Term:1 Log:0/3
> delivering messages
2->1 MsgAppResp Term:1 Log:0/3
3->1 MsgAppResp Term:1 Log:0/3

78
raft/testdata/confchange_v1.txt vendored Normal file
View File

@ -0,0 +1,78 @@
add-nodes 1 voters=(1) index=2
----
INFO 1 switched to configuration voters=(1)
INFO 1 became follower at term 0
INFO newRaft 1 [peers: [1], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1]
campaign 1
----
INFO 1 is starting a new election at term 0
INFO 1 became candidate at term 1
INFO 1 received MsgVoteResp from 1 at term 1
INFO 1 became leader at term 1
propose-conf-change 1
v2 v3
----
ok
add-nodes 2
process-ready 1
----
INFO 2 switched to configuration voters=()
INFO 2 became follower at term 0
INFO newRaft 2 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0]
INFO 3 switched to configuration voters=()
INFO 3 became follower at term 0
INFO newRaft 3 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0]
stabilize 1
----
> 1 handling Ready
INFO 1 switched to configuration voters=(1 2 3)&&(1) autoleave
INFO initiating automatic transition out of joint configuration voters=(1 2 3)&&(1) autoleave
Ready MustSync=true:
Lead:1 State:StateLeader
HardState Term:1 Vote:1 Commit:4
Entries:
1/3 EntryNormal ""
1/4 EntryConfChangeV2 v2 v3
CommittedEntries:
1/3 EntryNormal ""
1/4 EntryConfChangeV2 v2 v3
> 1 handling Ready
Ready MustSync=true:
Entries:
1/5 EntryConfChangeV2
# NB: this test is broken from here on because the leader doesn't propagate the
# commit index proactively, see the buglet #11002.
stabilize 2
----
ok
stabilize 1
----
ok
stabilize 2
----
ok
stabilize 1
----
ok
stabilize 2
----
ok
stabilize 1
----
ok
stabilize 2
----
ok

View File

@ -0,0 +1,141 @@
# TestSnapshotSucceedViaAppResp regression tests the situation in which a snap-
# shot is sent to a follower at the most recent index (i.e. the snapshot index
# is the leader's last index is the committed index). In that situation, a bug
# in the past left the follower in probing status until the next log entry was
# committed.
#
# See https://github.com/etcd-io/etcd/pull/10308 for additional background.
# Turn off output during the setup of the test.
log-level none
----
ok
# Start with two nodes, but the config already has a third.
add-nodes 2 voters=(1,2,3) index=10
----
ok
campaign 1
----
ok
# Fully replicate everything, including the leader's empty index.
stabilize
----
ok (quiet)
compact 1 11
----
ok (quiet)
# Drop inflight messages to n3.
deliver-msgs 3
----
ok (quiet)
# Show the Raft log messages from now on.
log-level debug
----
ok
status 1
----
1: StateReplicate match=11 next=12 inactive
2: StateReplicate match=11 next=12
3: StateProbe match=0 next=11 paused inactive
# Add the node that will receive a snapshot (it has no state at all, does not
# even have a config).
add-nodes 1
----
INFO 3 switched to configuration voters=()
INFO 3 became follower at term 0
INFO newRaft 3 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0]
# Time passes on the leader so that it will try the previously missing follower
# again.
tick-heartbeat 1
----
ok
process-ready 1
----
Ready MustSync=false:
Messages:
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11
1->3 MsgHeartbeat Term:1 Log:0/0
# Iterate until no more work is done by the new peer. It receives the heartbeat
# and responds.
stabilize 3
----
> delivering messages
1->3 MsgHeartbeat Term:1 Log:0/0
INFO 3 [term: 0] received a MsgHeartbeat message with higher term from 1 [term: 1]
INFO 3 became follower at term 1
> 3 handling Ready
Ready MustSync=true:
Lead:1 State:StateFollower
HardState Term:1 Commit:0
Messages:
3->1 MsgHeartbeatResp Term:1 Log:0/0
# The leader in turn will realize that n3 needs a snapshot, which it initiates.
stabilize 1
----
> delivering messages
3->1 MsgHeartbeatResp Term:1 Log:0/0
DEBUG 1 [firstindex: 12, commit: 11] sent snapshot[index: 11, term: 1] to 3 [StateProbe match=0 next=11]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=11 paused pendingSnap=11]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[]
status 1
----
1: StateReplicate match=11 next=12 inactive
2: StateReplicate match=11 next=12
3: StateSnapshot match=0 next=11 paused pendingSnap=11
# Follower applies the snapshot. Note how it reacts with a MsgAppResp upon completion.
# The snapshot fully catches the follower up (i.e. there are no more log entries it
# needs to apply after). The bug was that the leader failed to realize that the follower
# was now fully caught up.
stabilize 3
----
> delivering messages
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[]
INFO log [committed=0, applied=0, unstable.offset=1, len(unstable.Entries)=0] starts to restore snapshot [index: 11, term: 1]
INFO 3 switched to configuration voters=(1 2 3)
INFO 3 [commit: 11, lastindex: 11, lastterm: 1] restored snapshot [index: 11, term: 1]
INFO 3 [commit: 11] restored snapshot [index: 11, term: 1]
> 3 handling Ready
Ready MustSync=false:
HardState Term:1 Commit:11
Snapshot Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[]
Messages:
3->1 MsgAppResp Term:1 Log:0/11
# The MsgAppResp lets the leader move the follower back to replicating state.
# Leader sends another MsgAppResp, to communicate the updated commit index.
stabilize 1
----
> delivering messages
3->1 MsgAppResp Term:1 Log:0/11
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=11]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:1 Log:1/11 Commit:11
status 1
----
1: StateReplicate match=11 next=12 inactive
2: StateReplicate match=11 next=12
3: StateReplicate match=11 next=12
stabilize
----
ok

View File

@ -52,6 +52,8 @@ type Progress struct {
// RecentActive is true if the progress is recently active. Receiving any messages
// from the corresponding follower indicates the progress is active.
// RecentActive can be reset to false after an election timeout.
//
// TODO(tbg): the leader should always have this set to true.
RecentActive bool
// ProbeSent is used while this follower is in StateProbe. When ProbeSent is

View File

@ -17,6 +17,7 @@ package raft
import (
"bytes"
"fmt"
"strings"
pb "go.etcd.io/etcd/raft/raftpb"
)
@ -60,6 +61,69 @@ func voteRespMsgType(msgt pb.MessageType) pb.MessageType {
}
}
func DescribeHardState(hs pb.HardState) string {
var buf strings.Builder
fmt.Fprintf(&buf, "Term:%d", hs.Term)
if hs.Vote != 0 {
fmt.Fprintf(&buf, " Vote:%d", hs.Vote)
}
fmt.Fprintf(&buf, " Commit:%d", hs.Commit)
return buf.String()
}
func DescribeSoftState(ss SoftState) string {
return fmt.Sprintf("Lead:%d State:%s", ss.Lead, ss.RaftState)
}
func DescribeConfState(state pb.ConfState) string {
return fmt.Sprintf(
"Voters:%v VotersOutgoing:%v Learners:%v LearnersNext:%v",
state.Voters, state.VotersOutgoing, state.Learners, state.LearnersNext,
)
}
func DescribeSnapshot(snap pb.Snapshot) string {
m := snap.Metadata
return fmt.Sprintf("Index:%d Term:%d ConfState:%s", m.Index, m.Term, DescribeConfState(m.ConfState))
}
func DescribeReady(rd Ready, f EntryFormatter) string {
var buf strings.Builder
if rd.SoftState != nil {
fmt.Fprint(&buf, DescribeSoftState(*rd.SoftState))
buf.WriteByte('\n')
}
if !IsEmptyHardState(rd.HardState) {
fmt.Fprintf(&buf, "HardState %s", DescribeHardState(rd.HardState))
buf.WriteByte('\n')
}
if len(rd.ReadStates) > 0 {
fmt.Fprintf(&buf, "ReadStates %v\n", rd.ReadStates)
}
if len(rd.Entries) > 0 {
buf.WriteString("Entries:\n")
fmt.Fprint(&buf, DescribeEntries(rd.Entries, f))
}
if !IsEmptySnap(rd.Snapshot) {
fmt.Fprintf(&buf, "Snapshot %s\n", DescribeSnapshot(rd.Snapshot))
}
if len(rd.CommittedEntries) > 0 {
buf.WriteString("CommittedEntries:\n")
fmt.Fprint(&buf, DescribeEntries(rd.CommittedEntries, f))
}
if len(rd.Messages) > 0 {
buf.WriteString("Messages:\n")
for _, msg := range rd.Messages {
fmt.Fprint(&buf, DescribeMessage(msg, f))
buf.WriteByte('\n')
}
}
if buf.Len() > 0 {
return fmt.Sprintf("Ready MustSync=%t:\n%s", rd.MustSync, buf.String())
}
return "<empty Ready>"
}
// EntryFormatter can be implemented by the application to provide human-readable formatting
// of entry data. Nil is a valid EntryFormatter and will use a default format.
type EntryFormatter func([]byte) string
@ -86,7 +150,7 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string {
fmt.Fprintf(&buf, "]")
}
if !IsEmptySnap(m.Snapshot) {
fmt.Fprintf(&buf, " Snapshot:%v", m.Snapshot)
fmt.Fprintf(&buf, " Snapshot: %s", DescribeSnapshot(m.Snapshot))
}
return buf.String()
}
@ -100,13 +164,39 @@ func PayloadSize(e pb.Entry) int {
// DescribeEntry returns a concise human-readable description of an
// Entry for debugging.
func DescribeEntry(e pb.Entry, f EntryFormatter) string {
var formatted string
if e.Type == pb.EntryNormal && f != nil {
formatted = f(e.Data)
} else {
formatted = fmt.Sprintf("%q", e.Data)
if f == nil {
f = func(data []byte) string { return fmt.Sprintf("%q", data) }
}
return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted)
formatConfChange := func(cc pb.ConfChangeI) string {
// TODO(tbg): give the EntryFormatter a type argument so that it gets
// a chance to expose the Context.
return pb.ConfChangesToString(cc.AsV2().Changes)
}
var formatted string
switch e.Type {
case pb.EntryNormal:
formatted = f(e.Data)
case pb.EntryConfChange:
var cc pb.ConfChange
if err := cc.Unmarshal(e.Data); err != nil {
formatted = err.Error()
} else {
formatted = formatConfChange(cc)
}
case pb.EntryConfChangeV2:
var cc pb.ConfChangeV2
if err := cc.Unmarshal(e.Data); err != nil {
formatted = err.Error()
} else {
formatted = formatConfChange(cc)
}
}
if formatted != "" {
formatted = " " + formatted
}
return fmt.Sprintf("%d/%d %s%s", e.Term, e.Index, e.Type, formatted)
}
// DescribeEntries calls DescribeEntry for each Entry, adding a newline to