etcdserver: add SYNC request

release-2.0
Yicheng Qin 2014-09-15 17:35:02 -07:00
parent 6f17fa6c90
commit 023dc7cba2
18 changed files with 436 additions and 208 deletions

View File

@ -1,11 +1,9 @@
package etcdhttp
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
@ -14,8 +12,6 @@ import (
"strings"
"time"
crand "crypto/rand"
"github.com/coreos/etcd/elog"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
@ -71,7 +67,7 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()
rr, err := parseRequest(r, genID())
rr, err := parseRequest(r, etcdserver.GenID())
if err != nil {
writeError(w, err)
return
@ -139,20 +135,6 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
// genID generates a random id that is: n < 0 < n.
func genID() int64 {
for {
b := make([]byte, 8)
if _, err := io.ReadFull(crand.Reader, b); err != nil {
panic(err) // really bad stuff happened
}
n := int64(binary.BigEndian.Uint64(b))
if n != 0 {
return n
}
}
}
// parseRequest converts a received http.Request to a server Request,
// performing validation of supplied fields as appropriate.
// If any validation fails, an empty Request and non-nil error is returned.

View File

@ -42,6 +42,7 @@ type Request struct {
Recursive bool `protobuf:"varint,12,req,name=recursive" json:"recursive"`
Sorted bool `protobuf:"varint,13,req,name=sorted" json:"sorted"`
Quorum bool `protobuf:"varint,14,req,name=quorum" json:"quorum"`
Time int64 `protobuf:"varint,15,req,name=time" json:"time"`
XXX_unrecognized []byte `json:"-"`
}
@ -321,6 +322,21 @@ func (m *Request) Unmarshal(data []byte) error {
}
}
m.Quorum = bool(v != 0)
case 15:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.Time |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
var sizeOfWire int
for {
@ -367,6 +383,7 @@ func (m *Request) Size() (n int) {
n += 2
n += 2
n += 2
n += 1 + sovEtcdserver(uint64(m.Time))
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -479,6 +496,9 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) {
data[i] = 0
}
i++
data[i] = 0x78
i++
i = encodeVarintEtcdserver(data, i, uint64(m.Time))
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}

View File

@ -8,18 +8,19 @@ option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
message Request {
required int64 id = 1 [(gogoproto.nullable) = false];
required int64 id = 1 [(gogoproto.nullable) = false];
required string method = 2 [(gogoproto.nullable) = false];
required string path = 3 [(gogoproto.nullable) = false];
required string val = 4 [(gogoproto.nullable) = false];
required bool dir = 5 [(gogoproto.nullable) = false];
required string prevValue = 6 [(gogoproto.nullable) = false];
required uint64 prevIndex = 7 [(gogoproto.nullable) = false];
required uint64 prevIndex = 7 [(gogoproto.nullable) = false];
required bool prevExists = 8 [(gogoproto.nullable) = true];
required int64 expiration = 9 [(gogoproto.nullable) = false];
required bool wait = 10 [(gogoproto.nullable) = false];
required uint64 since = 11 [(gogoproto.nullable) = false];
required uint64 since = 11 [(gogoproto.nullable) = false];
required bool recursive = 12 [(gogoproto.nullable) = false];
required bool sorted = 13 [(gogoproto.nullable) = false];
required bool quorum = 14 [(gogoproto.nullable) = false];
required int64 time = 15 [(gogoproto.nullable) = false];
}

0
etcdserver/etcdserverpb/genproto.sh Normal file → Executable file
View File

View File

@ -1,9 +1,14 @@
package etcdserver
import (
"encoding/binary"
"errors"
"io"
"log"
"time"
crand "crypto/rand"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
@ -12,13 +17,15 @@ import (
"github.com/coreos/etcd/wait"
)
const defaultSyncTimeout = time.Second
var (
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
)
type SendFunc func(m []raftpb.Message)
type SaveFunc func(st raftpb.State, ents []raftpb.Entry)
type SaveFunc func(st raftpb.HardState, ents []raftpb.Entry)
type Response struct {
Event *store.Event
@ -59,9 +66,10 @@ type EtcdServer struct {
// Save specifies the save function for saving ents to stable storage.
// Save MUST block until st and ents are on stable storage. If Send is
// nil, server will panic.
Save func(st raftpb.State, ents []raftpb.Entry)
Save func(st raftpb.HardState, ents []raftpb.Entry)
Ticker <-chan time.Time
Ticker <-chan time.Time
SyncTicker <-chan time.Time
}
// Start prepares and starts server in a new goroutine. It is no longer safe to
@ -77,12 +85,13 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
}
func (s *EtcdServer) run() {
var syncC <-chan time.Time
for {
select {
case <-s.Ticker:
s.Node.Tick()
case rd := <-s.Node.Ready():
s.Save(rd.State, rd.Entries)
s.Save(rd.HardState, rd.Entries)
s.Send(rd.Messages)
// TODO(bmizerany): do this in the background, but take
@ -95,6 +104,16 @@ func (s *EtcdServer) run() {
}
s.w.Trigger(r.Id, s.apply(r))
}
if rd.SoftState != nil {
if rd.RaftState == raft.StateLeader {
syncC = s.SyncTicker
} else {
syncC = nil
}
}
case <-syncC:
s.sync(defaultSyncTimeout)
case <-s.done:
return
}
@ -109,7 +128,7 @@ func (s *EtcdServer) Stop() {
}
// Do interprets r and performs an operation on s.Store according to r.Method
// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET with
// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
// Quorum == true, r will be sent through consensus before performing its
// respective operation. Do will block until an action is performed or there is
// an error.
@ -158,6 +177,29 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
}
}
// sync proposes a SYNC request and is non-blocking.
// This makes no guarantee that the request will be proposed or performed.
// The request will be cancelled after the given timeout.
func (s *EtcdServer) sync(timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
req := pb.Request{
Method: "SYNC",
Id: GenID(),
Time: time.Now().UnixNano(),
}
data, err := req.Marshal()
if err != nil {
log.Printf("marshal request %#v error: %v", req, err)
return
}
// There is no promise that node has leader when do SYNC request,
// so it uses goroutine to propose.
go func() {
s.Node.Propose(ctx, data)
cancel()
}()
}
// apply interprets r as a call to store.X and returns an Response interpreted from store.Event
func (s *EtcdServer) apply(r pb.Request) Response {
f := func(ev *store.Event, err error) Response {
@ -190,12 +232,30 @@ func (s *EtcdServer) apply(r pb.Request) Response {
}
case "QGET":
return f(s.Store.Get(r.Path, r.Recursive, r.Sorted))
case "SYNC":
s.Store.DeleteExpiredKeys(time.Unix(0, r.Time))
return Response{}
default:
// This should never be reached, but just in case:
return Response{err: ErrUnknownMethod}
}
}
// TODO: move the function to /id pkg maybe?
// GenID generates a random id that is not equal to 0.
func GenID() int64 {
for {
b := make([]byte, 8)
if _, err := io.ReadFull(crand.Reader, b); err != nil {
panic(err) // really bad stuff happened
}
n := int64(binary.BigEndian.Uint64(b))
if n != 0 {
return n
}
}
}
func getBool(v *bool) (vv bool, set bool) {
if v == nil {
return false, false

View File

@ -4,6 +4,8 @@ import (
"fmt"
"math/rand"
"reflect"
"runtime"
"sync"
"testing"
"time"
@ -109,6 +111,10 @@ func TestApply(t *testing.T) {
pb.Request{Method: "QGET", Id: 1},
Response{Event: &store.Event{}}, []string{"Get"},
},
{
pb.Request{Method: "SYNC", Id: 1},
Response{}, []string{"DeleteExpiredKeys"},
},
{
pb.Request{Method: "BADMETHOD", Id: 1},
Response{err: ErrUnknownMethod}, nil,
@ -159,7 +165,7 @@ func testServer(t *testing.T, ns int64) {
Node: n,
Store: store.New(),
Send: send,
Save: func(_ raftpb.State, _ []raftpb.Entry) {},
Save: func(_ raftpb.HardState, _ []raftpb.Entry) {},
Ticker: tk.C,
}
srv.Start()
@ -228,7 +234,7 @@ func TestDoProposal(t *testing.T) {
Node: n,
Store: st,
Send: func(_ []raftpb.Message) {},
Save: func(_ raftpb.State, _ []raftpb.Entry) {},
Save: func(_ raftpb.HardState, _ []raftpb.Entry) {},
Ticker: tk,
}
srv.Start()
@ -296,7 +302,7 @@ func TestDoProposalStopped(t *testing.T) {
Node: n,
Store: st,
Send: func(_ []raftpb.Message) {},
Save: func(_ raftpb.State, _ []raftpb.Entry) {},
Save: func(_ raftpb.HardState, _ []raftpb.Entry) {},
Ticker: tk,
}
srv.Start()
@ -318,6 +324,117 @@ func TestDoProposalStopped(t *testing.T) {
}
}
// TestSync tests sync 1. is nonblocking 2. sends out SYNC request.
func TestSync(t *testing.T) {
n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1)
n.Campaign(context.TODO())
select {
case <-n.Ready():
case <-time.After(time.Millisecond):
t.Fatalf("expect to receive ready within 1ms, but fail")
}
srv := &EtcdServer{
// TODO: use fake node for better testability
Node: n,
}
start := time.Now()
srv.sync(defaultSyncTimeout)
// check that sync is non-blocking
if d := time.Since(start); d > time.Millisecond {
t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond)
}
// give time for goroutine in sync to run
// TODO: use fake clock
var ready raft.Ready
select {
case ready = <-n.Ready():
case <-time.After(time.Millisecond):
t.Fatalf("expect to receive ready within 1ms, but fail")
}
if len(ready.CommittedEntries) != 1 {
t.Fatalf("len(CommittedEntries) = %d, want 1", len(ready.CommittedEntries))
}
e := ready.CommittedEntries[0]
var req pb.Request
if err := req.Unmarshal(e.Data); err != nil {
t.Fatalf("unmarshal error: %v", err)
}
if req.Method != "SYNC" {
t.Errorf("method = %s, want SYNC", req.Method)
}
}
// TestSyncFail tests the case that sync 1. is non-blocking 2. fails to
// propose SYNC request because there is no leader
func TestSyncFail(t *testing.T) {
// The node is run without Tick and Campaign, so it has no leader forever.
n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1)
select {
case <-n.Ready():
case <-time.After(time.Millisecond):
t.Fatalf("no ready")
}
srv := &EtcdServer{
// TODO: use fake node for better testability
Node: n,
}
routineN := runtime.NumGoroutine()
start := time.Now()
srv.sync(time.Millisecond)
// check that sync is non-blocking
if d := time.Since(start); d > time.Millisecond {
t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond)
}
// give time for goroutine in sync to cancel
// TODO: use fake clock
time.Sleep(2 * time.Millisecond)
if g := runtime.NumGoroutine(); g != routineN {
t.Errorf("NumGoroutine = %d, want %d", g, routineN)
}
select {
case g := <-n.Ready():
t.Errorf("ready = %+v, want no", g)
default:
}
}
func TestSyncTriggerDeleteExpriedKeys(t *testing.T) {
n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1)
n.Campaign(context.TODO())
st := &storeRecorder{}
syncInterval := 5 * time.Millisecond
syncTicker := time.NewTicker(syncInterval)
defer syncTicker.Stop()
srv := &EtcdServer{
// TODO: use fake node for better testability
Node: n,
Store: st,
Send: func(_ []raftpb.Message) {},
Save: func(_ raftpb.HardState, _ []raftpb.Entry) {},
SyncTicker: syncTicker.C,
}
srv.Start()
// give time for sync request to be proposed and performed
// TODO: use fake clock
time.Sleep(syncInterval + time.Millisecond)
srv.Stop()
action := st.Action()
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
if action[0] != "DeleteExpiredKeys" {
t.Errorf("action = %s, want DeleteExpiredKeys", action[0])
}
}
// TODO: test wait trigger correctness in multi-server case
func TestGetBool(t *testing.T) {
@ -342,48 +459,63 @@ func TestGetBool(t *testing.T) {
}
type storeRecorder struct {
sync.Mutex
action []string
}
func (s *storeRecorder) record(action string) {
s.Lock()
s.action = append(s.action, action)
s.Unlock()
}
func (s *storeRecorder) Action() []string {
s.Lock()
cpy := make([]string, len(s.action))
copy(cpy, s.action)
s.Unlock()
return cpy
}
func (s *storeRecorder) Version() int { return 0 }
func (s *storeRecorder) Index() uint64 { return 0 }
func (s *storeRecorder) Get(_ string, _, _ bool) (*store.Event, error) {
s.action = append(s.action, "Get")
s.record("Get")
return &store.Event{}, nil
}
func (s *storeRecorder) Set(_ string, _ bool, _ string, _ time.Time) (*store.Event, error) {
s.action = append(s.action, "Set")
s.record("Set")
return &store.Event{}, nil
}
func (s *storeRecorder) Update(_, _ string, _ time.Time) (*store.Event, error) {
s.action = append(s.action, "Update")
s.record("Update")
return &store.Event{}, nil
}
func (s *storeRecorder) Create(_ string, _ bool, _ string, _ bool, _ time.Time) (*store.Event, error) {
s.action = append(s.action, "Create")
s.record("Create")
return &store.Event{}, nil
}
func (s *storeRecorder) CompareAndSwap(_, _ string, _ uint64, _ string, _ time.Time) (*store.Event, error) {
s.action = append(s.action, "CompareAndSwap")
s.record("CompareAndSwap")
return &store.Event{}, nil
}
func (s *storeRecorder) Delete(_ string, _, _ bool) (*store.Event, error) {
s.action = append(s.action, "Delete")
s.record("Delete")
return &store.Event{}, nil
}
func (s *storeRecorder) CompareAndDelete(_, _ string, _ uint64) (*store.Event, error) {
s.action = append(s.action, "CompareAndDelete")
s.record("CompareAndDelete")
return &store.Event{}, nil
}
func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, error) {
s.action = append(s.action, "Watch")
s.record("Watch")
return &stubWatcher{}, nil
}
func (s *storeRecorder) Save() ([]byte, error) { return nil, nil }
func (s *storeRecorder) Recovery(b []byte) error { return nil }
func (s *storeRecorder) TotalTransactions() uint64 { return 0 }
func (s *storeRecorder) JsonStats() []byte { return nil }
func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {}
func (s *storeRecorder) Save() ([]byte, error) { return nil, nil }
func (s *storeRecorder) Recovery(b []byte) error { return nil }
func (s *storeRecorder) TotalTransactions() uint64 { return 0 }
func (s *storeRecorder) JsonStats() []byte { return nil }
func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {
s.record("DeleteExpiredKeys")
}
type stubWatcher struct{}

View File

@ -17,8 +17,8 @@ import (
"github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
)
func nopSave(st raftpb.State, ents []raftpb.Entry) {}
func nopSend(m []raftpb.Message) {}
func nopSave(st raftpb.HardState, ents []raftpb.Entry) {}
func nopSend(m []raftpb.Message) {}
func TestSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
@ -30,7 +30,7 @@ func TestSet(t *testing.T) {
srv := &etcdserver.EtcdServer{
Store: store.New(),
Node: n,
Save: func(st raftpb.State, ents []raftpb.Entry) {},
Save: func(st raftpb.HardState, ents []raftpb.Entry) {},
Send: etcdserver.SendFunc(nopSend),
}
srv.Start()

11
main.go
View File

@ -76,11 +76,12 @@ func startEtcd() http.Handler {
n, w := startRaft(id, peers.IDs(), path.Join(*dir, "wal"))
s := &etcdserver.EtcdServer{
Store: store.New(),
Node: n,
Save: w.Save,
Send: etcdhttp.Sender(*peers),
Ticker: time.Tick(100 * time.Millisecond),
Store: store.New(),
Node: n,
Save: w.Save,
Send: etcdhttp.Sender(*peers),
Ticker: time.Tick(100 * time.Millisecond),
SyncTicker: time.Tick(500 * time.Millisecond),
}
s.Start()
return etcdhttp.NewHandler(s, *peers, *timeout)

View File

@ -4,10 +4,10 @@ import (
pb "github.com/coreos/etcd/raft/raftpb"
)
func applyToStore(ents []pb.Entry) {}
func sendMessages(msgs []pb.Message) {}
func saveStateToDisk(st pb.State) {}
func saveToDisk(ents []pb.Entry) {}
func applyToStore(ents []pb.Entry) {}
func sendMessages(msgs []pb.Message) {}
func saveStateToDisk(st pb.HardState) {}
func saveToDisk(ents []pb.Entry) {}
func Example_Node() {
n := Start(0, nil, 0, 0)
@ -15,13 +15,13 @@ func Example_Node() {
// stuff to n happens in other goroutines
// the last known state
var prev pb.State
var prev pb.HardState
for {
// ReadState blocks until there is new state ready.
rd := <-n.Ready()
if !isStateEqual(prev, rd.State) {
saveStateToDisk(rd.State)
prev = rd.State
if !isStateEqual(prev, rd.HardState) {
saveStateToDisk(rd.HardState)
prev = rd.HardState
}
saveToDisk(rd.Entries)

View File

@ -9,15 +9,34 @@ import (
)
var (
emptyState = pb.State{}
emptyState = pb.HardState{}
ErrStopped = errors.New("raft: stopped")
)
// Ready encapsulates the entries and messages that are ready to be saved to
// stable storage, committed or sent to other peers.
// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
Lead int64
RaftState StateType
}
func (a *SoftState) equal(b *SoftState) bool {
return a.Lead == b.Lead && a.RaftState == b.RaftState
}
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
// The current state of a Node
pb.State
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
@ -33,16 +52,16 @@ type Ready struct {
Messages []pb.Message
}
func isStateEqual(a, b pb.State) bool {
func isStateEqual(a, b pb.HardState) bool {
return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit
}
func IsEmptyState(st pb.State) bool {
func IsEmptyState(st pb.HardState) bool {
return isStateEqual(st, emptyState)
}
func (rd Ready) containsUpdates() bool {
return !IsEmptyState(rd.State) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
return rd.SoftState != nil || !IsEmptyState(rd.HardState) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
}
type Node struct {
@ -65,7 +84,7 @@ func Start(id int64, peers []int64, election, heartbeat int) Node {
// Restart is identical to Start but takes an initial State and a slice of
// entries. Generally this is used when restarting from a stable storage
// log.
func Restart(id int64, peers []int64, election, heartbeat int, st pb.State, ents []pb.Entry) Node {
func Restart(id int64, peers []int64, election, heartbeat int, st pb.HardState, ents []pb.Entry) Node {
n := newNode()
r := newRaft(id, peers, election, heartbeat)
r.loadState(st)
@ -92,13 +111,21 @@ func (n *Node) run(r *raft) {
var propc chan pb.Message
var readyc chan Ready
var lead int64
prevSt := r.State
lead := None
prevSoftSt := r.softState()
prevHardSt := r.HardState
for {
if lead != r.lead {
log.Printf("raft: leader changed from %#x to %#x", lead, r.lead)
lead = r.lead
rd := newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
if rd.SoftState != nil && lead != rd.SoftState.Lead {
log.Printf("raft: leader changed from %#x to %#x", lead, rd.SoftState.Lead)
lead = rd.SoftState.Lead
if r.hasLeader() {
propc = n.propc
} else {
@ -106,13 +133,6 @@ func (n *Node) run(r *raft) {
}
}
rd := newReady(r, prevSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
select {
case m := <-propc:
m.From = r.id
@ -122,11 +142,14 @@ func (n *Node) run(r *raft) {
case <-n.tickc:
r.tick()
case readyc <- rd:
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if !IsEmptyState(rd.HardState) {
prevHardSt = rd.HardState
}
r.raftLog.resetNextEnts()
r.raftLog.resetUnstable()
if !IsEmptyState(rd.State) {
prevSt = rd.State
}
r.msgs = nil
case <-n.done:
return
@ -175,14 +198,17 @@ func (n *Node) Ready() <-chan Ready {
return n.readyc
}
func newReady(r *raft, prev pb.State) Ready {
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.unstableEnts(),
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs,
}
if !isStateEqual(r.State, prev) {
rd.State = r.State
if softSt := r.softState(); !softSt.equal(prevSoftSt) {
rd.SoftState = softSt
}
if !isStateEqual(r.HardState, prevHardSt) {
rd.HardState = r.HardState
}
return rd
}

View File

@ -121,15 +121,16 @@ func TestReadyContainUpdates(t *testing.T) {
wcontain bool
}{
{Ready{}, false},
{Ready{State: raftpb.State{Vote: 1}}, true},
{Ready{SoftState: &SoftState{Lead: 1}}, true},
{Ready{HardState: raftpb.HardState{Vote: 1}}, true},
{Ready{Entries: make([]raftpb.Entry, 1, 1)}, true},
{Ready{CommittedEntries: make([]raftpb.Entry, 1, 1)}, true},
{Ready{Messages: make([]raftpb.Message, 1, 1)}, true},
}
for i, tt := range tests {
if tt.rd.containsUpdates() != tt.wcontain {
t.Errorf("#%d: containUpdates = %v, want %v", i, tt.rd.containsUpdates(), tt.wcontain)
if g := tt.rd.containsUpdates(); g != tt.wcontain {
t.Errorf("#%d: containUpdates = %v, want %v", i, g, tt.wcontain)
}
}
}
@ -140,12 +141,13 @@ func TestNode(t *testing.T) {
wants := []Ready{
{
State: raftpb.State{Term: 1, Commit: 1},
SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
HardState: raftpb.HardState{Term: 1, Commit: 1},
Entries: []raftpb.Entry{{}, {Term: 1, Index: 1}},
CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}},
},
{
State: raftpb.State{Term: 1, Commit: 2},
HardState: raftpb.HardState{Term: 1, Commit: 2},
Entries: []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}},
CommittedEntries: []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}},
},
@ -175,10 +177,10 @@ func TestNodeRestart(t *testing.T) {
{Term: 1, Index: 1},
{Term: 1, Index: 2, Data: []byte("foo")},
}
st := raftpb.State{Term: 1, Commit: 1}
st := raftpb.HardState{Term: 1, Commit: 1}
want := Ready{
State: emptyState,
HardState: emptyState,
// commit upto index commit index in st
CommittedEntries: entries[1 : st.Commit+1],
}
@ -197,13 +199,13 @@ func TestNodeRestart(t *testing.T) {
func TestIsStateEqual(t *testing.T) {
tests := []struct {
st raftpb.State
st raftpb.HardState
we bool
}{
{emptyState, true},
{raftpb.State{Vote: 1}, false},
{raftpb.State{Commit: 1}, false},
{raftpb.State{Term: 1}, false},
{raftpb.HardState{Vote: 1}, false},
{raftpb.HardState{Commit: 1}, false},
{raftpb.HardState{Term: 1}, false},
}
for i, tt := range tests {

View File

@ -8,7 +8,7 @@ import (
pb "github.com/coreos/etcd/raft/raftpb"
)
const None = 0
const None int64 = 0
type messageType int64
@ -43,20 +43,20 @@ func (mt messageType) String() string {
var errNoLeader = errors.New("no leader")
const (
stateFollower stateType = iota
stateCandidate
stateLeader
StateFollower StateType = iota
StateCandidate
StateLeader
)
type stateType int64
type StateType int64
var stmap = [...]string{
stateFollower: "stateFollower",
stateCandidate: "stateCandidate",
stateLeader: "stateLeader",
StateFollower: "StateFollower",
StateCandidate: "StateCandidate",
StateLeader: "StateLeader",
}
func (st stateType) String() string {
func (st StateType) String() string {
return stmap[int64(st)]
}
@ -87,7 +87,7 @@ func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
type raft struct {
pb.State
pb.HardState
id int64
@ -96,7 +96,7 @@ type raft struct {
prs map[int64]*progress
state stateType
state StateType
votes map[int64]bool
@ -133,14 +133,18 @@ func newRaft(id int64, peers []int64, election, heartbeat int) *raft {
func (r *raft) hasLeader() bool { return r.lead != None }
func (r *raft) softState() *SoftState {
return &SoftState{Lead: r.lead, RaftState: r.state}
}
func (r *raft) String() string {
s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term)
switch r.state {
case stateFollower:
case StateFollower:
s += fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead)
case stateCandidate:
case StateCandidate:
s += fmt.Sprintf(` votes="%v"`, r.votes)
case stateLeader:
case StateLeader:
s += fmt.Sprintf(` prs="%v"`, r.prs)
}
return s
@ -279,31 +283,31 @@ func (r *raft) becomeFollower(term int64, lead int64) {
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.state = stateFollower
r.state = StateFollower
}
func (r *raft) becomeCandidate() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == stateLeader {
if r.state == StateLeader {
panic("invalid transition [leader -> candidate]")
}
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.state = stateCandidate
r.state = StateCandidate
}
func (r *raft) becomeLeader() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == stateFollower {
if r.state == StateFollower {
panic("invalid transition [follower -> leader]")
}
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = stateLeader
r.state = StateLeader
r.appendEntry(pb.Entry{Data: nil})
}
@ -504,7 +508,7 @@ func (r *raft) loadEnts(ents []pb.Entry) {
r.raftLog.load(ents)
}
func (r *raft) loadState(state pb.State) {
func (r *raft) loadState(state pb.HardState) {
r.raftLog.committed = state.Commit
r.Term = state.Term
r.Vote = state.Vote

View File

@ -26,19 +26,19 @@ type Interface interface {
func TestLeaderElection(t *testing.T) {
tests := []struct {
*network
state stateType
state StateType
}{
{newNetwork(nil, nil, nil), stateLeader},
{newNetwork(nil, nil, nopStepper), stateLeader},
{newNetwork(nil, nopStepper, nopStepper), stateCandidate},
{newNetwork(nil, nopStepper, nopStepper, nil), stateCandidate},
{newNetwork(nil, nopStepper, nopStepper, nil, nil), stateLeader},
{newNetwork(nil, nil, nil), StateLeader},
{newNetwork(nil, nil, nopStepper), StateLeader},
{newNetwork(nil, nopStepper, nopStepper), StateCandidate},
{newNetwork(nil, nopStepper, nopStepper, nil), StateCandidate},
{newNetwork(nil, nopStepper, nopStepper, nil, nil), StateLeader},
// three logs further along than 0
{newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), stateFollower},
{newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), StateFollower},
// logs converge
{newNetwork(ents(1), nil, ents(2), ents(1), nil), stateLeader},
{newNetwork(ents(1), nil, ents(2), ents(1), nil), StateLeader},
}
for i, tt := range tests {
@ -226,13 +226,13 @@ func TestDuelingCandidates(t *testing.T) {
wlog := &raftLog{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}, committed: 1}
tests := []struct {
sm *raft
state stateType
state StateType
term int64
raftLog *raftLog
}{
{a, stateFollower, 2, wlog},
{b, stateFollower, 2, wlog},
{c, stateFollower, 2, newLog()},
{a, StateFollower, 2, wlog},
{b, StateFollower, 2, wlog},
{c, StateFollower, 2, newLog()},
}
for i, tt := range tests {
@ -269,8 +269,8 @@ func TestCandidateConcede(t *testing.T) {
tt.send(pb.Message{From: 3, To: 3, Type: msgProp, Entries: []pb.Entry{{Data: data}}})
a := tt.peers[1].(*raft)
if g := a.state; g != stateFollower {
t.Errorf("state = %s, want %s", g, stateFollower)
if g := a.state; g != StateFollower {
t.Errorf("state = %s, want %s", g, StateFollower)
}
if g := a.Term; g != 1 {
t.Errorf("term = %d, want %d", g, 1)
@ -293,8 +293,8 @@ func TestSingleNodeCandidate(t *testing.T) {
tt.send(pb.Message{From: 1, To: 1, Type: msgHup})
sm := tt.peers[1].(*raft)
if sm.state != stateLeader {
t.Errorf("state = %d, want %d", sm.state, stateLeader)
if sm.state != StateLeader {
t.Errorf("state = %d, want %d", sm.state, StateLeader)
}
}
@ -450,7 +450,7 @@ func TestCommit(t *testing.T) {
for j := 0; j < len(tt.matches); j++ {
prs[int64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
}
sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, State: pb.State{Term: tt.smTerm}}
sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, HardState: pb.HardState{Term: tt.smTerm}}
sm.maybeCommit()
if g := sm.raftLog.committed; g != tt.w {
t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
@ -504,9 +504,9 @@ func TestHandleMsgApp(t *testing.T) {
for i, tt := range tests {
sm := &raft{
state: stateFollower,
State: pb.State{Term: 2},
raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
state: StateFollower,
HardState: pb.HardState{Term: 2},
raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
}
sm.handleAppendEntries(tt.m)
@ -532,50 +532,50 @@ func TestHandleMsgApp(t *testing.T) {
func TestRecvMsgVote(t *testing.T) {
tests := []struct {
state stateType
state StateType
i, term int64
voteFor int64
w int64
}{
{stateFollower, 0, 0, None, -1},
{stateFollower, 0, 1, None, -1},
{stateFollower, 0, 2, None, -1},
{stateFollower, 0, 3, None, 2},
{StateFollower, 0, 0, None, -1},
{StateFollower, 0, 1, None, -1},
{StateFollower, 0, 2, None, -1},
{StateFollower, 0, 3, None, 2},
{stateFollower, 1, 0, None, -1},
{stateFollower, 1, 1, None, -1},
{stateFollower, 1, 2, None, -1},
{stateFollower, 1, 3, None, 2},
{StateFollower, 1, 0, None, -1},
{StateFollower, 1, 1, None, -1},
{StateFollower, 1, 2, None, -1},
{StateFollower, 1, 3, None, 2},
{stateFollower, 2, 0, None, -1},
{stateFollower, 2, 1, None, -1},
{stateFollower, 2, 2, None, 2},
{stateFollower, 2, 3, None, 2},
{StateFollower, 2, 0, None, -1},
{StateFollower, 2, 1, None, -1},
{StateFollower, 2, 2, None, 2},
{StateFollower, 2, 3, None, 2},
{stateFollower, 3, 0, None, -1},
{stateFollower, 3, 1, None, -1},
{stateFollower, 3, 2, None, 2},
{stateFollower, 3, 3, None, 2},
{StateFollower, 3, 0, None, -1},
{StateFollower, 3, 1, None, -1},
{StateFollower, 3, 2, None, 2},
{StateFollower, 3, 3, None, 2},
{stateFollower, 3, 2, 2, 2},
{stateFollower, 3, 2, 1, -1},
{StateFollower, 3, 2, 2, 2},
{StateFollower, 3, 2, 1, -1},
{stateLeader, 3, 3, 1, -1},
{stateCandidate, 3, 3, 1, -1},
{StateLeader, 3, 3, 1, -1},
{StateCandidate, 3, 3, 1, -1},
}
for i, tt := range tests {
sm := newRaft(1, []int64{1}, 0, 0)
sm.state = tt.state
switch tt.state {
case stateFollower:
case StateFollower:
sm.step = stepFollower
case stateCandidate:
case StateCandidate:
sm.step = stepCandidate
case stateLeader:
case StateLeader:
sm.step = stepLeader
}
sm.State = pb.State{Vote: tt.voteFor}
sm.HardState = pb.HardState{Vote: tt.voteFor}
sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}}
sm.Step(pb.Message{Type: msgVote, From: 2, Index: tt.i, LogTerm: tt.term})
@ -593,23 +593,23 @@ func TestRecvMsgVote(t *testing.T) {
func TestStateTransition(t *testing.T) {
tests := []struct {
from stateType
to stateType
from StateType
to StateType
wallow bool
wterm int64
wlead int64
}{
{stateFollower, stateFollower, true, 1, None},
{stateFollower, stateCandidate, true, 1, None},
{stateFollower, stateLeader, false, -1, None},
{StateFollower, StateFollower, true, 1, None},
{StateFollower, StateCandidate, true, 1, None},
{StateFollower, StateLeader, false, -1, None},
{stateCandidate, stateFollower, true, 0, None},
{stateCandidate, stateCandidate, true, 1, None},
{stateCandidate, stateLeader, true, 0, 1},
{StateCandidate, StateFollower, true, 0, None},
{StateCandidate, StateCandidate, true, 1, None},
{StateCandidate, StateLeader, true, 0, 1},
{stateLeader, stateFollower, true, 1, None},
{stateLeader, stateCandidate, false, 1, None},
{stateLeader, stateLeader, true, 0, 1},
{StateLeader, StateFollower, true, 1, None},
{StateLeader, StateCandidate, false, 1, None},
{StateLeader, StateLeader, true, 0, 1},
}
for i, tt := range tests {
@ -626,11 +626,11 @@ func TestStateTransition(t *testing.T) {
sm.state = tt.from
switch tt.to {
case stateFollower:
case StateFollower:
sm.becomeFollower(tt.wterm, tt.wlead)
case stateCandidate:
case StateCandidate:
sm.becomeCandidate()
case stateLeader:
case StateLeader:
sm.becomeLeader()
}
@ -646,15 +646,15 @@ func TestStateTransition(t *testing.T) {
func TestAllServerStepdown(t *testing.T) {
tests := []struct {
state stateType
state StateType
wstate stateType
wstate StateType
wterm int64
windex int64
}{
{stateFollower, stateFollower, 3, 1},
{stateCandidate, stateFollower, 3, 1},
{stateLeader, stateFollower, 3, 2},
{StateFollower, StateFollower, 3, 1},
{StateCandidate, StateFollower, 3, 1},
{StateLeader, StateFollower, 3, 2},
}
tmsgTypes := [...]int64{msgVote, msgApp}
@ -663,11 +663,11 @@ func TestAllServerStepdown(t *testing.T) {
for i, tt := range tests {
sm := newRaft(1, []int64{1, 2, 3}, 0, 0)
switch tt.state {
case stateFollower:
case StateFollower:
sm.becomeFollower(1, None)
case stateCandidate:
case StateCandidate:
sm.becomeCandidate()
case stateLeader:
case StateLeader:
sm.becomeCandidate()
sm.becomeLeader()
}
@ -796,13 +796,13 @@ func TestBcastBeat(t *testing.T) {
// tests the output of the statemachine when receiving msgBeat
func TestRecvMsgBeat(t *testing.T) {
tests := []struct {
state stateType
state StateType
wMsg int
}{
{stateLeader, 2},
{StateLeader, 2},
// candidate and follower should ignore msgBeat
{stateCandidate, 0},
{stateFollower, 0},
{StateCandidate, 0},
{StateFollower, 0},
}
for i, tt := range tests {
@ -811,11 +811,11 @@ func TestRecvMsgBeat(t *testing.T) {
sm.Term = 1
sm.state = tt.state
switch tt.state {
case stateFollower:
case StateFollower:
sm.step = stepFollower
case stateCandidate:
case StateCandidate:
sm.step = stepCandidate
case stateLeader:
case StateLeader:
sm.step = stepLeader
}
sm.Step(pb.Message{From: 1, To: 1, Type: msgBeat})

View File

@ -13,7 +13,7 @@
Entry
Snapshot
Message
State
HardState
*/
package raftpb
@ -80,16 +80,16 @@ func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
type State struct {
type HardState struct {
Term int64 `protobuf:"varint,1,req,name=term" json:"term"`
Vote int64 `protobuf:"varint,2,req,name=vote" json:"vote"`
Commit int64 `protobuf:"varint,3,req,name=commit" json:"commit"`
XXX_unrecognized []byte `json:"-"`
}
func (m *State) Reset() { *m = State{} }
func (m *State) String() string { return proto.CompactTextString(m) }
func (*State) ProtoMessage() {}
func (m *HardState) Reset() { *m = HardState{} }
func (m *HardState) String() string { return proto.CompactTextString(m) }
func (*HardState) ProtoMessage() {}
func init() {
}
@ -549,7 +549,7 @@ func (m *Message) Unmarshal(data []byte) error {
}
return nil
}
func (m *State) Unmarshal(data []byte) error {
func (m *HardState) Unmarshal(data []byte) error {
l := len(data)
index := 0
for index < l {
@ -697,7 +697,7 @@ func (m *Message) Size() (n int) {
}
return n
}
func (m *State) Size() (n int) {
func (m *HardState) Size() (n int) {
var l int
_ = l
n += 1 + sovRaft(uint64(m.Term))
@ -879,7 +879,7 @@ func (m *Message) MarshalTo(data []byte) (n int, err error) {
}
return i, nil
}
func (m *State) Marshal() (data []byte, err error) {
func (m *HardState) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
@ -889,7 +889,7 @@ func (m *State) Marshal() (data []byte, err error) {
return data[:n], nil
}
func (m *State) MarshalTo(data []byte) (n int, err error) {
func (m *HardState) MarshalTo(data []byte) (n int, err error) {
var i int
_ = i
var l int

View File

@ -36,7 +36,7 @@ message Message {
required Snapshot snapshot = 9 [(gogoproto.nullable) = false];
}
message State {
message HardState {
required int64 term = 1 [(gogoproto.nullable) = false];
required int64 vote = 2 [(gogoproto.nullable) = false];
required int64 commit = 3 [(gogoproto.nullable) = false];

View File

@ -76,8 +76,8 @@ func mustUnmarshalEntry(d []byte) raftpb.Entry {
return e
}
func mustUnmarshalState(d []byte) raftpb.State {
var s raftpb.State
func mustUnmarshalState(d []byte) raftpb.HardState {
var s raftpb.HardState
if err := s.Unmarshal(d); err != nil {
panic(err)
}

View File

@ -153,7 +153,7 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) {
// ReadAll reads out all records of the current WAL.
// After ReadAll, the WAL will be ready for appending new records.
func (w *WAL) ReadAll() (id int64, state raftpb.State, ents []raftpb.Entry, err error) {
func (w *WAL) ReadAll() (id int64, state raftpb.HardState, ents []raftpb.Entry, err error) {
rec := &walpb.Record{}
decoder := w.decoder
@ -264,7 +264,7 @@ func (w *WAL) SaveEntry(e *raftpb.Entry) error {
return nil
}
func (w *WAL) SaveState(s *raftpb.State) error {
func (w *WAL) SaveState(s *raftpb.HardState) error {
if raft.IsEmptyState(*s) {
return nil
}
@ -277,7 +277,7 @@ func (w *WAL) SaveState(s *raftpb.State) error {
return w.encoder.encode(rec)
}
func (w *WAL) Save(st raftpb.State, ents []raftpb.Entry) {
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) {
// TODO(xiangli): no more reference operator
w.SaveState(&st)
for i := range ents {

View File

@ -175,7 +175,7 @@ func TestRecover(t *testing.T) {
t.Fatal(err)
}
}
sts := []raftpb.State{{Term: 1, Vote: 1, Commit: 1}, {Term: 2, Vote: 2, Commit: 2}}
sts := []raftpb.HardState{{Term: 1, Vote: 1, Commit: 1}, {Term: 2, Vote: 2, Commit: 2}}
for _, s := range sts {
if err = w.SaveState(&s); err != nil {
t.Fatal(err)
@ -341,7 +341,7 @@ func TestRecoverAfterCut(t *testing.T) {
func TestSaveEmpty(t *testing.T) {
var buf bytes.Buffer
var est raftpb.State
var est raftpb.HardState
w := WAL{
encoder: newEncoder(&buf, 0),
}