diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index d3135540d..970569d3a 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -1,11 +1,9 @@ package etcdhttp import ( - "encoding/binary" "encoding/json" "errors" "fmt" - "io" "io/ioutil" "log" "net/http" @@ -14,8 +12,6 @@ import ( "strings" "time" - crand "crypto/rand" - "github.com/coreos/etcd/elog" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/etcdserver" @@ -71,7 +67,7 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(context.Background(), h.timeout) defer cancel() - rr, err := parseRequest(r, genID()) + rr, err := parseRequest(r, etcdserver.GenID()) if err != nil { writeError(w, err) return @@ -139,20 +135,6 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -// genID generates a random id that is: n < 0 < n. -func genID() int64 { - for { - b := make([]byte, 8) - if _, err := io.ReadFull(crand.Reader, b); err != nil { - panic(err) // really bad stuff happened - } - n := int64(binary.BigEndian.Uint64(b)) - if n != 0 { - return n - } - } -} - // parseRequest converts a received http.Request to a server Request, // performing validation of supplied fields as appropriate. // If any validation fails, an empty Request and non-nil error is returned. diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index fd9ad85c7..280c85c66 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -42,6 +42,7 @@ type Request struct { Recursive bool `protobuf:"varint,12,req,name=recursive" json:"recursive"` Sorted bool `protobuf:"varint,13,req,name=sorted" json:"sorted"` Quorum bool `protobuf:"varint,14,req,name=quorum" json:"quorum"` + Time int64 `protobuf:"varint,15,req,name=time" json:"time"` XXX_unrecognized []byte `json:"-"` } @@ -321,6 +322,21 @@ func (m *Request) Unmarshal(data []byte) error { } } m.Quorum = bool(v != 0) + case 15: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Time |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: var sizeOfWire int for { @@ -367,6 +383,7 @@ func (m *Request) Size() (n int) { n += 2 n += 2 n += 2 + n += 1 + sovEtcdserver(uint64(m.Time)) if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -479,6 +496,9 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) { data[i] = 0 } i++ + data[i] = 0x78 + i++ + i = encodeVarintEtcdserver(data, i, uint64(m.Time)) if m.XXX_unrecognized != nil { i += copy(data[i:], m.XXX_unrecognized) } diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index 6031a8983..9cab32874 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -8,18 +8,19 @@ option (gogoproto.unmarshaler_all) = true; option (gogoproto.goproto_getters_all) = false; message Request { - required int64 id = 1 [(gogoproto.nullable) = false]; + required int64 id = 1 [(gogoproto.nullable) = false]; required string method = 2 [(gogoproto.nullable) = false]; required string path = 3 [(gogoproto.nullable) = false]; required string val = 4 [(gogoproto.nullable) = false]; required bool dir = 5 [(gogoproto.nullable) = false]; required string prevValue = 6 [(gogoproto.nullable) = false]; - required uint64 prevIndex = 7 [(gogoproto.nullable) = false]; + required uint64 prevIndex = 7 [(gogoproto.nullable) = false]; required bool prevExists = 8 [(gogoproto.nullable) = true]; required int64 expiration = 9 [(gogoproto.nullable) = false]; required bool wait = 10 [(gogoproto.nullable) = false]; - required uint64 since = 11 [(gogoproto.nullable) = false]; + required uint64 since = 11 [(gogoproto.nullable) = false]; required bool recursive = 12 [(gogoproto.nullable) = false]; required bool sorted = 13 [(gogoproto.nullable) = false]; required bool quorum = 14 [(gogoproto.nullable) = false]; + required int64 time = 15 [(gogoproto.nullable) = false]; } diff --git a/etcdserver/etcdserverpb/genproto.sh b/etcdserver/etcdserverpb/genproto.sh old mode 100644 new mode 100755 diff --git a/etcdserver/server.go b/etcdserver/server.go index 12e94b9eb..d60ce9933 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1,9 +1,14 @@ package etcdserver import ( + "encoding/binary" "errors" + "io" + "log" "time" + crand "crypto/rand" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" @@ -12,13 +17,15 @@ import ( "github.com/coreos/etcd/wait" ) +const defaultSyncTimeout = time.Second + var ( ErrUnknownMethod = errors.New("etcdserver: unknown method") ErrStopped = errors.New("etcdserver: server stopped") ) type SendFunc func(m []raftpb.Message) -type SaveFunc func(st raftpb.State, ents []raftpb.Entry) +type SaveFunc func(st raftpb.HardState, ents []raftpb.Entry) type Response struct { Event *store.Event @@ -59,9 +66,10 @@ type EtcdServer struct { // Save specifies the save function for saving ents to stable storage. // Save MUST block until st and ents are on stable storage. If Send is // nil, server will panic. - Save func(st raftpb.State, ents []raftpb.Entry) + Save func(st raftpb.HardState, ents []raftpb.Entry) - Ticker <-chan time.Time + Ticker <-chan time.Time + SyncTicker <-chan time.Time } // Start prepares and starts server in a new goroutine. It is no longer safe to @@ -77,12 +85,13 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { } func (s *EtcdServer) run() { + var syncC <-chan time.Time for { select { case <-s.Ticker: s.Node.Tick() case rd := <-s.Node.Ready(): - s.Save(rd.State, rd.Entries) + s.Save(rd.HardState, rd.Entries) s.Send(rd.Messages) // TODO(bmizerany): do this in the background, but take @@ -95,6 +104,16 @@ func (s *EtcdServer) run() { } s.w.Trigger(r.Id, s.apply(r)) } + + if rd.SoftState != nil { + if rd.RaftState == raft.StateLeader { + syncC = s.SyncTicker + } else { + syncC = nil + } + } + case <-syncC: + s.sync(defaultSyncTimeout) case <-s.done: return } @@ -109,7 +128,7 @@ func (s *EtcdServer) Stop() { } // Do interprets r and performs an operation on s.Store according to r.Method -// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET with +// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with // Quorum == true, r will be sent through consensus before performing its // respective operation. Do will block until an action is performed or there is // an error. @@ -158,6 +177,29 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { } } +// sync proposes a SYNC request and is non-blocking. +// This makes no guarantee that the request will be proposed or performed. +// The request will be cancelled after the given timeout. +func (s *EtcdServer) sync(timeout time.Duration) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + req := pb.Request{ + Method: "SYNC", + Id: GenID(), + Time: time.Now().UnixNano(), + } + data, err := req.Marshal() + if err != nil { + log.Printf("marshal request %#v error: %v", req, err) + return + } + // There is no promise that node has leader when do SYNC request, + // so it uses goroutine to propose. + go func() { + s.Node.Propose(ctx, data) + cancel() + }() +} + // apply interprets r as a call to store.X and returns an Response interpreted from store.Event func (s *EtcdServer) apply(r pb.Request) Response { f := func(ev *store.Event, err error) Response { @@ -190,12 +232,30 @@ func (s *EtcdServer) apply(r pb.Request) Response { } case "QGET": return f(s.Store.Get(r.Path, r.Recursive, r.Sorted)) + case "SYNC": + s.Store.DeleteExpiredKeys(time.Unix(0, r.Time)) + return Response{} default: // This should never be reached, but just in case: return Response{err: ErrUnknownMethod} } } +// TODO: move the function to /id pkg maybe? +// GenID generates a random id that is not equal to 0. +func GenID() int64 { + for { + b := make([]byte, 8) + if _, err := io.ReadFull(crand.Reader, b); err != nil { + panic(err) // really bad stuff happened + } + n := int64(binary.BigEndian.Uint64(b)) + if n != 0 { + return n + } + } +} + func getBool(v *bool) (vv bool, set bool) { if v == nil { return false, false diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 190fb1b0d..41b32505c 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -4,6 +4,8 @@ import ( "fmt" "math/rand" "reflect" + "runtime" + "sync" "testing" "time" @@ -109,6 +111,10 @@ func TestApply(t *testing.T) { pb.Request{Method: "QGET", Id: 1}, Response{Event: &store.Event{}}, []string{"Get"}, }, + { + pb.Request{Method: "SYNC", Id: 1}, + Response{}, []string{"DeleteExpiredKeys"}, + }, { pb.Request{Method: "BADMETHOD", Id: 1}, Response{err: ErrUnknownMethod}, nil, @@ -159,7 +165,7 @@ func testServer(t *testing.T, ns int64) { Node: n, Store: store.New(), Send: send, - Save: func(_ raftpb.State, _ []raftpb.Entry) {}, + Save: func(_ raftpb.HardState, _ []raftpb.Entry) {}, Ticker: tk.C, } srv.Start() @@ -228,7 +234,7 @@ func TestDoProposal(t *testing.T) { Node: n, Store: st, Send: func(_ []raftpb.Message) {}, - Save: func(_ raftpb.State, _ []raftpb.Entry) {}, + Save: func(_ raftpb.HardState, _ []raftpb.Entry) {}, Ticker: tk, } srv.Start() @@ -296,7 +302,7 @@ func TestDoProposalStopped(t *testing.T) { Node: n, Store: st, Send: func(_ []raftpb.Message) {}, - Save: func(_ raftpb.State, _ []raftpb.Entry) {}, + Save: func(_ raftpb.HardState, _ []raftpb.Entry) {}, Ticker: tk, } srv.Start() @@ -318,6 +324,117 @@ func TestDoProposalStopped(t *testing.T) { } } +// TestSync tests sync 1. is nonblocking 2. sends out SYNC request. +func TestSync(t *testing.T) { + n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1) + n.Campaign(context.TODO()) + select { + case <-n.Ready(): + case <-time.After(time.Millisecond): + t.Fatalf("expect to receive ready within 1ms, but fail") + } + + srv := &EtcdServer{ + // TODO: use fake node for better testability + Node: n, + } + start := time.Now() + srv.sync(defaultSyncTimeout) + + // check that sync is non-blocking + if d := time.Since(start); d > time.Millisecond { + t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond) + } + + // give time for goroutine in sync to run + // TODO: use fake clock + var ready raft.Ready + select { + case ready = <-n.Ready(): + case <-time.After(time.Millisecond): + t.Fatalf("expect to receive ready within 1ms, but fail") + } + + if len(ready.CommittedEntries) != 1 { + t.Fatalf("len(CommittedEntries) = %d, want 1", len(ready.CommittedEntries)) + } + e := ready.CommittedEntries[0] + var req pb.Request + if err := req.Unmarshal(e.Data); err != nil { + t.Fatalf("unmarshal error: %v", err) + } + if req.Method != "SYNC" { + t.Errorf("method = %s, want SYNC", req.Method) + } +} + +// TestSyncFail tests the case that sync 1. is non-blocking 2. fails to +// propose SYNC request because there is no leader +func TestSyncFail(t *testing.T) { + // The node is run without Tick and Campaign, so it has no leader forever. + n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1) + select { + case <-n.Ready(): + case <-time.After(time.Millisecond): + t.Fatalf("no ready") + } + + srv := &EtcdServer{ + // TODO: use fake node for better testability + Node: n, + } + routineN := runtime.NumGoroutine() + start := time.Now() + srv.sync(time.Millisecond) + + // check that sync is non-blocking + if d := time.Since(start); d > time.Millisecond { + t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond) + } + + // give time for goroutine in sync to cancel + // TODO: use fake clock + time.Sleep(2 * time.Millisecond) + if g := runtime.NumGoroutine(); g != routineN { + t.Errorf("NumGoroutine = %d, want %d", g, routineN) + } + select { + case g := <-n.Ready(): + t.Errorf("ready = %+v, want no", g) + default: + } +} + +func TestSyncTriggerDeleteExpriedKeys(t *testing.T) { + n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1) + n.Campaign(context.TODO()) + st := &storeRecorder{} + syncInterval := 5 * time.Millisecond + syncTicker := time.NewTicker(syncInterval) + defer syncTicker.Stop() + srv := &EtcdServer{ + // TODO: use fake node for better testability + Node: n, + Store: st, + Send: func(_ []raftpb.Message) {}, + Save: func(_ raftpb.HardState, _ []raftpb.Entry) {}, + SyncTicker: syncTicker.C, + } + srv.Start() + // give time for sync request to be proposed and performed + // TODO: use fake clock + time.Sleep(syncInterval + time.Millisecond) + srv.Stop() + + action := st.Action() + if len(action) != 1 { + t.Fatalf("len(action) = %d, want 1", len(action)) + } + if action[0] != "DeleteExpiredKeys" { + t.Errorf("action = %s, want DeleteExpiredKeys", action[0]) + } +} + // TODO: test wait trigger correctness in multi-server case func TestGetBool(t *testing.T) { @@ -342,48 +459,63 @@ func TestGetBool(t *testing.T) { } type storeRecorder struct { + sync.Mutex action []string } +func (s *storeRecorder) record(action string) { + s.Lock() + s.action = append(s.action, action) + s.Unlock() +} +func (s *storeRecorder) Action() []string { + s.Lock() + cpy := make([]string, len(s.action)) + copy(cpy, s.action) + s.Unlock() + return cpy +} func (s *storeRecorder) Version() int { return 0 } func (s *storeRecorder) Index() uint64 { return 0 } func (s *storeRecorder) Get(_ string, _, _ bool) (*store.Event, error) { - s.action = append(s.action, "Get") + s.record("Get") return &store.Event{}, nil } func (s *storeRecorder) Set(_ string, _ bool, _ string, _ time.Time) (*store.Event, error) { - s.action = append(s.action, "Set") + s.record("Set") return &store.Event{}, nil } func (s *storeRecorder) Update(_, _ string, _ time.Time) (*store.Event, error) { - s.action = append(s.action, "Update") + s.record("Update") return &store.Event{}, nil } func (s *storeRecorder) Create(_ string, _ bool, _ string, _ bool, _ time.Time) (*store.Event, error) { - s.action = append(s.action, "Create") + s.record("Create") return &store.Event{}, nil } func (s *storeRecorder) CompareAndSwap(_, _ string, _ uint64, _ string, _ time.Time) (*store.Event, error) { - s.action = append(s.action, "CompareAndSwap") + s.record("CompareAndSwap") return &store.Event{}, nil } func (s *storeRecorder) Delete(_ string, _, _ bool) (*store.Event, error) { - s.action = append(s.action, "Delete") + s.record("Delete") return &store.Event{}, nil } func (s *storeRecorder) CompareAndDelete(_, _ string, _ uint64) (*store.Event, error) { - s.action = append(s.action, "CompareAndDelete") + s.record("CompareAndDelete") return &store.Event{}, nil } func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, error) { - s.action = append(s.action, "Watch") + s.record("Watch") return &stubWatcher{}, nil } -func (s *storeRecorder) Save() ([]byte, error) { return nil, nil } -func (s *storeRecorder) Recovery(b []byte) error { return nil } -func (s *storeRecorder) TotalTransactions() uint64 { return 0 } -func (s *storeRecorder) JsonStats() []byte { return nil } -func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {} +func (s *storeRecorder) Save() ([]byte, error) { return nil, nil } +func (s *storeRecorder) Recovery(b []byte) error { return nil } +func (s *storeRecorder) TotalTransactions() uint64 { return 0 } +func (s *storeRecorder) JsonStats() []byte { return nil } +func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) { + s.record("DeleteExpiredKeys") +} type stubWatcher struct{} diff --git a/functional/http_functional_test.go b/functional/http_functional_test.go index f3c6751e2..ca5ad9f3d 100644 --- a/functional/http_functional_test.go +++ b/functional/http_functional_test.go @@ -17,8 +17,8 @@ import ( "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context" ) -func nopSave(st raftpb.State, ents []raftpb.Entry) {} -func nopSend(m []raftpb.Message) {} +func nopSave(st raftpb.HardState, ents []raftpb.Entry) {} +func nopSend(m []raftpb.Message) {} func TestSet(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -30,7 +30,7 @@ func TestSet(t *testing.T) { srv := &etcdserver.EtcdServer{ Store: store.New(), Node: n, - Save: func(st raftpb.State, ents []raftpb.Entry) {}, + Save: func(st raftpb.HardState, ents []raftpb.Entry) {}, Send: etcdserver.SendFunc(nopSend), } srv.Start() diff --git a/main.go b/main.go index cb4323be4..1f0323bf4 100644 --- a/main.go +++ b/main.go @@ -76,11 +76,12 @@ func startEtcd() http.Handler { n, w := startRaft(id, peers.IDs(), path.Join(*dir, "wal")) s := &etcdserver.EtcdServer{ - Store: store.New(), - Node: n, - Save: w.Save, - Send: etcdhttp.Sender(*peers), - Ticker: time.Tick(100 * time.Millisecond), + Store: store.New(), + Node: n, + Save: w.Save, + Send: etcdhttp.Sender(*peers), + Ticker: time.Tick(100 * time.Millisecond), + SyncTicker: time.Tick(500 * time.Millisecond), } s.Start() return etcdhttp.NewHandler(s, *peers, *timeout) diff --git a/raft/example_test.go b/raft/example_test.go index efe775d8c..c0726a9f3 100644 --- a/raft/example_test.go +++ b/raft/example_test.go @@ -4,10 +4,10 @@ import ( pb "github.com/coreos/etcd/raft/raftpb" ) -func applyToStore(ents []pb.Entry) {} -func sendMessages(msgs []pb.Message) {} -func saveStateToDisk(st pb.State) {} -func saveToDisk(ents []pb.Entry) {} +func applyToStore(ents []pb.Entry) {} +func sendMessages(msgs []pb.Message) {} +func saveStateToDisk(st pb.HardState) {} +func saveToDisk(ents []pb.Entry) {} func Example_Node() { n := Start(0, nil, 0, 0) @@ -15,13 +15,13 @@ func Example_Node() { // stuff to n happens in other goroutines // the last known state - var prev pb.State + var prev pb.HardState for { // ReadState blocks until there is new state ready. rd := <-n.Ready() - if !isStateEqual(prev, rd.State) { - saveStateToDisk(rd.State) - prev = rd.State + if !isStateEqual(prev, rd.HardState) { + saveStateToDisk(rd.HardState) + prev = rd.HardState } saveToDisk(rd.Entries) diff --git a/raft/node.go b/raft/node.go index a29839c38..b45a8a4cd 100644 --- a/raft/node.go +++ b/raft/node.go @@ -9,15 +9,34 @@ import ( ) var ( - emptyState = pb.State{} + emptyState = pb.HardState{} ErrStopped = errors.New("raft: stopped") ) -// Ready encapsulates the entries and messages that are ready to be saved to -// stable storage, committed or sent to other peers. +// SoftState provides state that is useful for logging and debugging. +// The state is volatile and does not need to be persisted to the WAL. +type SoftState struct { + Lead int64 + RaftState StateType +} + +func (a *SoftState) equal(b *SoftState) bool { + return a.Lead == b.Lead && a.RaftState == b.RaftState +} + +// Ready encapsulates the entries and messages that are ready to read, +// be saved to stable storage, committed or sent to other peers. +// All fields in Ready are read-only. type Ready struct { - // The current state of a Node - pb.State + // The current volatile state of a Node. + // SoftState will be nil if there is no update. + // It is not required to consume or store SoftState. + *SoftState + + // The current state of a Node to be saved to stable storage BEFORE + // Messages are sent. + // HardState will be equal to empty state if there is no update. + pb.HardState // Entries specifies entries to be saved to stable storage BEFORE // Messages are sent. @@ -33,16 +52,16 @@ type Ready struct { Messages []pb.Message } -func isStateEqual(a, b pb.State) bool { +func isStateEqual(a, b pb.HardState) bool { return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit } -func IsEmptyState(st pb.State) bool { +func IsEmptyState(st pb.HardState) bool { return isStateEqual(st, emptyState) } func (rd Ready) containsUpdates() bool { - return !IsEmptyState(rd.State) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 + return rd.SoftState != nil || !IsEmptyState(rd.HardState) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 } type Node struct { @@ -65,7 +84,7 @@ func Start(id int64, peers []int64, election, heartbeat int) Node { // Restart is identical to Start but takes an initial State and a slice of // entries. Generally this is used when restarting from a stable storage // log. -func Restart(id int64, peers []int64, election, heartbeat int, st pb.State, ents []pb.Entry) Node { +func Restart(id int64, peers []int64, election, heartbeat int, st pb.HardState, ents []pb.Entry) Node { n := newNode() r := newRaft(id, peers, election, heartbeat) r.loadState(st) @@ -92,13 +111,21 @@ func (n *Node) run(r *raft) { var propc chan pb.Message var readyc chan Ready - var lead int64 - prevSt := r.State + lead := None + prevSoftSt := r.softState() + prevHardSt := r.HardState for { - if lead != r.lead { - log.Printf("raft: leader changed from %#x to %#x", lead, r.lead) - lead = r.lead + rd := newReady(r, prevSoftSt, prevHardSt) + if rd.containsUpdates() { + readyc = n.readyc + } else { + readyc = nil + } + + if rd.SoftState != nil && lead != rd.SoftState.Lead { + log.Printf("raft: leader changed from %#x to %#x", lead, rd.SoftState.Lead) + lead = rd.SoftState.Lead if r.hasLeader() { propc = n.propc } else { @@ -106,13 +133,6 @@ func (n *Node) run(r *raft) { } } - rd := newReady(r, prevSt) - if rd.containsUpdates() { - readyc = n.readyc - } else { - readyc = nil - } - select { case m := <-propc: m.From = r.id @@ -122,11 +142,14 @@ func (n *Node) run(r *raft) { case <-n.tickc: r.tick() case readyc <- rd: + if rd.SoftState != nil { + prevSoftSt = rd.SoftState + } + if !IsEmptyState(rd.HardState) { + prevHardSt = rd.HardState + } r.raftLog.resetNextEnts() r.raftLog.resetUnstable() - if !IsEmptyState(rd.State) { - prevSt = rd.State - } r.msgs = nil case <-n.done: return @@ -175,14 +198,17 @@ func (n *Node) Ready() <-chan Ready { return n.readyc } -func newReady(r *raft, prev pb.State) Ready { +func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { rd := Ready{ Entries: r.raftLog.unstableEnts(), CommittedEntries: r.raftLog.nextEnts(), Messages: r.msgs, } - if !isStateEqual(r.State, prev) { - rd.State = r.State + if softSt := r.softState(); !softSt.equal(prevSoftSt) { + rd.SoftState = softSt + } + if !isStateEqual(r.HardState, prevHardSt) { + rd.HardState = r.HardState } return rd } diff --git a/raft/node_test.go b/raft/node_test.go index 72fb7816c..65483687e 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -121,15 +121,16 @@ func TestReadyContainUpdates(t *testing.T) { wcontain bool }{ {Ready{}, false}, - {Ready{State: raftpb.State{Vote: 1}}, true}, + {Ready{SoftState: &SoftState{Lead: 1}}, true}, + {Ready{HardState: raftpb.HardState{Vote: 1}}, true}, {Ready{Entries: make([]raftpb.Entry, 1, 1)}, true}, {Ready{CommittedEntries: make([]raftpb.Entry, 1, 1)}, true}, {Ready{Messages: make([]raftpb.Message, 1, 1)}, true}, } for i, tt := range tests { - if tt.rd.containsUpdates() != tt.wcontain { - t.Errorf("#%d: containUpdates = %v, want %v", i, tt.rd.containsUpdates(), tt.wcontain) + if g := tt.rd.containsUpdates(); g != tt.wcontain { + t.Errorf("#%d: containUpdates = %v, want %v", i, g, tt.wcontain) } } } @@ -140,12 +141,13 @@ func TestNode(t *testing.T) { wants := []Ready{ { - State: raftpb.State{Term: 1, Commit: 1}, + SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, + HardState: raftpb.HardState{Term: 1, Commit: 1}, Entries: []raftpb.Entry{{}, {Term: 1, Index: 1}}, CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}}, }, { - State: raftpb.State{Term: 1, Commit: 2}, + HardState: raftpb.HardState{Term: 1, Commit: 2}, Entries: []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}}, CommittedEntries: []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}}, }, @@ -175,10 +177,10 @@ func TestNodeRestart(t *testing.T) { {Term: 1, Index: 1}, {Term: 1, Index: 2, Data: []byte("foo")}, } - st := raftpb.State{Term: 1, Commit: 1} + st := raftpb.HardState{Term: 1, Commit: 1} want := Ready{ - State: emptyState, + HardState: emptyState, // commit upto index commit index in st CommittedEntries: entries[1 : st.Commit+1], } @@ -197,13 +199,13 @@ func TestNodeRestart(t *testing.T) { func TestIsStateEqual(t *testing.T) { tests := []struct { - st raftpb.State + st raftpb.HardState we bool }{ {emptyState, true}, - {raftpb.State{Vote: 1}, false}, - {raftpb.State{Commit: 1}, false}, - {raftpb.State{Term: 1}, false}, + {raftpb.HardState{Vote: 1}, false}, + {raftpb.HardState{Commit: 1}, false}, + {raftpb.HardState{Term: 1}, false}, } for i, tt := range tests { diff --git a/raft/raft.go b/raft/raft.go index 61d1bdd3c..249198094 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -8,7 +8,7 @@ import ( pb "github.com/coreos/etcd/raft/raftpb" ) -const None = 0 +const None int64 = 0 type messageType int64 @@ -43,20 +43,20 @@ func (mt messageType) String() string { var errNoLeader = errors.New("no leader") const ( - stateFollower stateType = iota - stateCandidate - stateLeader + StateFollower StateType = iota + StateCandidate + StateLeader ) -type stateType int64 +type StateType int64 var stmap = [...]string{ - stateFollower: "stateFollower", - stateCandidate: "stateCandidate", - stateLeader: "stateLeader", + StateFollower: "StateFollower", + StateCandidate: "StateCandidate", + StateLeader: "StateLeader", } -func (st stateType) String() string { +func (st StateType) String() string { return stmap[int64(st)] } @@ -87,7 +87,7 @@ func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } type raft struct { - pb.State + pb.HardState id int64 @@ -96,7 +96,7 @@ type raft struct { prs map[int64]*progress - state stateType + state StateType votes map[int64]bool @@ -133,14 +133,18 @@ func newRaft(id int64, peers []int64, election, heartbeat int) *raft { func (r *raft) hasLeader() bool { return r.lead != None } +func (r *raft) softState() *SoftState { + return &SoftState{Lead: r.lead, RaftState: r.state} +} + func (r *raft) String() string { s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term) switch r.state { - case stateFollower: + case StateFollower: s += fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead) - case stateCandidate: + case StateCandidate: s += fmt.Sprintf(` votes="%v"`, r.votes) - case stateLeader: + case StateLeader: s += fmt.Sprintf(` prs="%v"`, r.prs) } return s @@ -279,31 +283,31 @@ func (r *raft) becomeFollower(term int64, lead int64) { r.reset(term) r.tick = r.tickElection r.lead = lead - r.state = stateFollower + r.state = StateFollower } func (r *raft) becomeCandidate() { // TODO(xiangli) remove the panic when the raft implementation is stable - if r.state == stateLeader { + if r.state == StateLeader { panic("invalid transition [leader -> candidate]") } r.step = stepCandidate r.reset(r.Term + 1) r.tick = r.tickElection r.Vote = r.id - r.state = stateCandidate + r.state = StateCandidate } func (r *raft) becomeLeader() { // TODO(xiangli) remove the panic when the raft implementation is stable - if r.state == stateFollower { + if r.state == StateFollower { panic("invalid transition [follower -> leader]") } r.step = stepLeader r.reset(r.Term) r.tick = r.tickHeartbeat r.lead = r.id - r.state = stateLeader + r.state = StateLeader r.appendEntry(pb.Entry{Data: nil}) } @@ -504,7 +508,7 @@ func (r *raft) loadEnts(ents []pb.Entry) { r.raftLog.load(ents) } -func (r *raft) loadState(state pb.State) { +func (r *raft) loadState(state pb.HardState) { r.raftLog.committed = state.Commit r.Term = state.Term r.Vote = state.Vote diff --git a/raft/raft_test.go b/raft/raft_test.go index 45c062f2a..24c15e622 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -26,19 +26,19 @@ type Interface interface { func TestLeaderElection(t *testing.T) { tests := []struct { *network - state stateType + state StateType }{ - {newNetwork(nil, nil, nil), stateLeader}, - {newNetwork(nil, nil, nopStepper), stateLeader}, - {newNetwork(nil, nopStepper, nopStepper), stateCandidate}, - {newNetwork(nil, nopStepper, nopStepper, nil), stateCandidate}, - {newNetwork(nil, nopStepper, nopStepper, nil, nil), stateLeader}, + {newNetwork(nil, nil, nil), StateLeader}, + {newNetwork(nil, nil, nopStepper), StateLeader}, + {newNetwork(nil, nopStepper, nopStepper), StateCandidate}, + {newNetwork(nil, nopStepper, nopStepper, nil), StateCandidate}, + {newNetwork(nil, nopStepper, nopStepper, nil, nil), StateLeader}, // three logs further along than 0 - {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), stateFollower}, + {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), StateFollower}, // logs converge - {newNetwork(ents(1), nil, ents(2), ents(1), nil), stateLeader}, + {newNetwork(ents(1), nil, ents(2), ents(1), nil), StateLeader}, } for i, tt := range tests { @@ -226,13 +226,13 @@ func TestDuelingCandidates(t *testing.T) { wlog := &raftLog{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}, committed: 1} tests := []struct { sm *raft - state stateType + state StateType term int64 raftLog *raftLog }{ - {a, stateFollower, 2, wlog}, - {b, stateFollower, 2, wlog}, - {c, stateFollower, 2, newLog()}, + {a, StateFollower, 2, wlog}, + {b, StateFollower, 2, wlog}, + {c, StateFollower, 2, newLog()}, } for i, tt := range tests { @@ -269,8 +269,8 @@ func TestCandidateConcede(t *testing.T) { tt.send(pb.Message{From: 3, To: 3, Type: msgProp, Entries: []pb.Entry{{Data: data}}}) a := tt.peers[1].(*raft) - if g := a.state; g != stateFollower { - t.Errorf("state = %s, want %s", g, stateFollower) + if g := a.state; g != StateFollower { + t.Errorf("state = %s, want %s", g, StateFollower) } if g := a.Term; g != 1 { t.Errorf("term = %d, want %d", g, 1) @@ -293,8 +293,8 @@ func TestSingleNodeCandidate(t *testing.T) { tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) sm := tt.peers[1].(*raft) - if sm.state != stateLeader { - t.Errorf("state = %d, want %d", sm.state, stateLeader) + if sm.state != StateLeader { + t.Errorf("state = %d, want %d", sm.state, StateLeader) } } @@ -450,7 +450,7 @@ func TestCommit(t *testing.T) { for j := 0; j < len(tt.matches); j++ { prs[int64(j)] = &progress{tt.matches[j], tt.matches[j] + 1} } - sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, State: pb.State{Term: tt.smTerm}} + sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, HardState: pb.HardState{Term: tt.smTerm}} sm.maybeCommit() if g := sm.raftLog.committed; g != tt.w { t.Errorf("#%d: committed = %d, want %d", i, g, tt.w) @@ -504,9 +504,9 @@ func TestHandleMsgApp(t *testing.T) { for i, tt := range tests { sm := &raft{ - state: stateFollower, - State: pb.State{Term: 2}, - raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}}, + state: StateFollower, + HardState: pb.HardState{Term: 2}, + raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}}, } sm.handleAppendEntries(tt.m) @@ -532,50 +532,50 @@ func TestHandleMsgApp(t *testing.T) { func TestRecvMsgVote(t *testing.T) { tests := []struct { - state stateType + state StateType i, term int64 voteFor int64 w int64 }{ - {stateFollower, 0, 0, None, -1}, - {stateFollower, 0, 1, None, -1}, - {stateFollower, 0, 2, None, -1}, - {stateFollower, 0, 3, None, 2}, + {StateFollower, 0, 0, None, -1}, + {StateFollower, 0, 1, None, -1}, + {StateFollower, 0, 2, None, -1}, + {StateFollower, 0, 3, None, 2}, - {stateFollower, 1, 0, None, -1}, - {stateFollower, 1, 1, None, -1}, - {stateFollower, 1, 2, None, -1}, - {stateFollower, 1, 3, None, 2}, + {StateFollower, 1, 0, None, -1}, + {StateFollower, 1, 1, None, -1}, + {StateFollower, 1, 2, None, -1}, + {StateFollower, 1, 3, None, 2}, - {stateFollower, 2, 0, None, -1}, - {stateFollower, 2, 1, None, -1}, - {stateFollower, 2, 2, None, 2}, - {stateFollower, 2, 3, None, 2}, + {StateFollower, 2, 0, None, -1}, + {StateFollower, 2, 1, None, -1}, + {StateFollower, 2, 2, None, 2}, + {StateFollower, 2, 3, None, 2}, - {stateFollower, 3, 0, None, -1}, - {stateFollower, 3, 1, None, -1}, - {stateFollower, 3, 2, None, 2}, - {stateFollower, 3, 3, None, 2}, + {StateFollower, 3, 0, None, -1}, + {StateFollower, 3, 1, None, -1}, + {StateFollower, 3, 2, None, 2}, + {StateFollower, 3, 3, None, 2}, - {stateFollower, 3, 2, 2, 2}, - {stateFollower, 3, 2, 1, -1}, + {StateFollower, 3, 2, 2, 2}, + {StateFollower, 3, 2, 1, -1}, - {stateLeader, 3, 3, 1, -1}, - {stateCandidate, 3, 3, 1, -1}, + {StateLeader, 3, 3, 1, -1}, + {StateCandidate, 3, 3, 1, -1}, } for i, tt := range tests { sm := newRaft(1, []int64{1}, 0, 0) sm.state = tt.state switch tt.state { - case stateFollower: + case StateFollower: sm.step = stepFollower - case stateCandidate: + case StateCandidate: sm.step = stepCandidate - case stateLeader: + case StateLeader: sm.step = stepLeader } - sm.State = pb.State{Vote: tt.voteFor} + sm.HardState = pb.HardState{Vote: tt.voteFor} sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}} sm.Step(pb.Message{Type: msgVote, From: 2, Index: tt.i, LogTerm: tt.term}) @@ -593,23 +593,23 @@ func TestRecvMsgVote(t *testing.T) { func TestStateTransition(t *testing.T) { tests := []struct { - from stateType - to stateType + from StateType + to StateType wallow bool wterm int64 wlead int64 }{ - {stateFollower, stateFollower, true, 1, None}, - {stateFollower, stateCandidate, true, 1, None}, - {stateFollower, stateLeader, false, -1, None}, + {StateFollower, StateFollower, true, 1, None}, + {StateFollower, StateCandidate, true, 1, None}, + {StateFollower, StateLeader, false, -1, None}, - {stateCandidate, stateFollower, true, 0, None}, - {stateCandidate, stateCandidate, true, 1, None}, - {stateCandidate, stateLeader, true, 0, 1}, + {StateCandidate, StateFollower, true, 0, None}, + {StateCandidate, StateCandidate, true, 1, None}, + {StateCandidate, StateLeader, true, 0, 1}, - {stateLeader, stateFollower, true, 1, None}, - {stateLeader, stateCandidate, false, 1, None}, - {stateLeader, stateLeader, true, 0, 1}, + {StateLeader, StateFollower, true, 1, None}, + {StateLeader, StateCandidate, false, 1, None}, + {StateLeader, StateLeader, true, 0, 1}, } for i, tt := range tests { @@ -626,11 +626,11 @@ func TestStateTransition(t *testing.T) { sm.state = tt.from switch tt.to { - case stateFollower: + case StateFollower: sm.becomeFollower(tt.wterm, tt.wlead) - case stateCandidate: + case StateCandidate: sm.becomeCandidate() - case stateLeader: + case StateLeader: sm.becomeLeader() } @@ -646,15 +646,15 @@ func TestStateTransition(t *testing.T) { func TestAllServerStepdown(t *testing.T) { tests := []struct { - state stateType + state StateType - wstate stateType + wstate StateType wterm int64 windex int64 }{ - {stateFollower, stateFollower, 3, 1}, - {stateCandidate, stateFollower, 3, 1}, - {stateLeader, stateFollower, 3, 2}, + {StateFollower, StateFollower, 3, 1}, + {StateCandidate, StateFollower, 3, 1}, + {StateLeader, StateFollower, 3, 2}, } tmsgTypes := [...]int64{msgVote, msgApp} @@ -663,11 +663,11 @@ func TestAllServerStepdown(t *testing.T) { for i, tt := range tests { sm := newRaft(1, []int64{1, 2, 3}, 0, 0) switch tt.state { - case stateFollower: + case StateFollower: sm.becomeFollower(1, None) - case stateCandidate: + case StateCandidate: sm.becomeCandidate() - case stateLeader: + case StateLeader: sm.becomeCandidate() sm.becomeLeader() } @@ -796,13 +796,13 @@ func TestBcastBeat(t *testing.T) { // tests the output of the statemachine when receiving msgBeat func TestRecvMsgBeat(t *testing.T) { tests := []struct { - state stateType + state StateType wMsg int }{ - {stateLeader, 2}, + {StateLeader, 2}, // candidate and follower should ignore msgBeat - {stateCandidate, 0}, - {stateFollower, 0}, + {StateCandidate, 0}, + {StateFollower, 0}, } for i, tt := range tests { @@ -811,11 +811,11 @@ func TestRecvMsgBeat(t *testing.T) { sm.Term = 1 sm.state = tt.state switch tt.state { - case stateFollower: + case StateFollower: sm.step = stepFollower - case stateCandidate: + case StateCandidate: sm.step = stepCandidate - case stateLeader: + case StateLeader: sm.step = stepLeader } sm.Step(pb.Message{From: 1, To: 1, Type: msgBeat}) diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index feef8eb25..148c302e2 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -13,7 +13,7 @@ Entry Snapshot Message - State + HardState */ package raftpb @@ -80,16 +80,16 @@ func (m *Message) Reset() { *m = Message{} } func (m *Message) String() string { return proto.CompactTextString(m) } func (*Message) ProtoMessage() {} -type State struct { +type HardState struct { Term int64 `protobuf:"varint,1,req,name=term" json:"term"` Vote int64 `protobuf:"varint,2,req,name=vote" json:"vote"` Commit int64 `protobuf:"varint,3,req,name=commit" json:"commit"` XXX_unrecognized []byte `json:"-"` } -func (m *State) Reset() { *m = State{} } -func (m *State) String() string { return proto.CompactTextString(m) } -func (*State) ProtoMessage() {} +func (m *HardState) Reset() { *m = HardState{} } +func (m *HardState) String() string { return proto.CompactTextString(m) } +func (*HardState) ProtoMessage() {} func init() { } @@ -549,7 +549,7 @@ func (m *Message) Unmarshal(data []byte) error { } return nil } -func (m *State) Unmarshal(data []byte) error { +func (m *HardState) Unmarshal(data []byte) error { l := len(data) index := 0 for index < l { @@ -697,7 +697,7 @@ func (m *Message) Size() (n int) { } return n } -func (m *State) Size() (n int) { +func (m *HardState) Size() (n int) { var l int _ = l n += 1 + sovRaft(uint64(m.Term)) @@ -879,7 +879,7 @@ func (m *Message) MarshalTo(data []byte) (n int, err error) { } return i, nil } -func (m *State) Marshal() (data []byte, err error) { +func (m *HardState) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) n, err := m.MarshalTo(data) @@ -889,7 +889,7 @@ func (m *State) Marshal() (data []byte, err error) { return data[:n], nil } -func (m *State) MarshalTo(data []byte) (n int, err error) { +func (m *HardState) MarshalTo(data []byte) (n int, err error) { var i int _ = i var l int diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index e254ebab7..d6bf94ab0 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -36,7 +36,7 @@ message Message { required Snapshot snapshot = 9 [(gogoproto.nullable) = false]; } -message State { +message HardState { required int64 term = 1 [(gogoproto.nullable) = false]; required int64 vote = 2 [(gogoproto.nullable) = false]; required int64 commit = 3 [(gogoproto.nullable) = false]; diff --git a/wal/decoder.go b/wal/decoder.go index 4921f5ba6..3d243e9e6 100644 --- a/wal/decoder.go +++ b/wal/decoder.go @@ -76,8 +76,8 @@ func mustUnmarshalEntry(d []byte) raftpb.Entry { return e } -func mustUnmarshalState(d []byte) raftpb.State { - var s raftpb.State +func mustUnmarshalState(d []byte) raftpb.HardState { + var s raftpb.HardState if err := s.Unmarshal(d); err != nil { panic(err) } diff --git a/wal/wal.go b/wal/wal.go index d369a23c9..47c22d2eb 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -153,7 +153,7 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) { // ReadAll reads out all records of the current WAL. // After ReadAll, the WAL will be ready for appending new records. -func (w *WAL) ReadAll() (id int64, state raftpb.State, ents []raftpb.Entry, err error) { +func (w *WAL) ReadAll() (id int64, state raftpb.HardState, ents []raftpb.Entry, err error) { rec := &walpb.Record{} decoder := w.decoder @@ -264,7 +264,7 @@ func (w *WAL) SaveEntry(e *raftpb.Entry) error { return nil } -func (w *WAL) SaveState(s *raftpb.State) error { +func (w *WAL) SaveState(s *raftpb.HardState) error { if raft.IsEmptyState(*s) { return nil } @@ -277,7 +277,7 @@ func (w *WAL) SaveState(s *raftpb.State) error { return w.encoder.encode(rec) } -func (w *WAL) Save(st raftpb.State, ents []raftpb.Entry) { +func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) { // TODO(xiangli): no more reference operator w.SaveState(&st) for i := range ents { diff --git a/wal/wal_test.go b/wal/wal_test.go index 49580cf14..9fbefbec6 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -175,7 +175,7 @@ func TestRecover(t *testing.T) { t.Fatal(err) } } - sts := []raftpb.State{{Term: 1, Vote: 1, Commit: 1}, {Term: 2, Vote: 2, Commit: 2}} + sts := []raftpb.HardState{{Term: 1, Vote: 1, Commit: 1}, {Term: 2, Vote: 2, Commit: 2}} for _, s := range sts { if err = w.SaveState(&s); err != nil { t.Fatal(err) @@ -341,7 +341,7 @@ func TestRecoverAfterCut(t *testing.T) { func TestSaveEmpty(t *testing.T) { var buf bytes.Buffer - var est raftpb.State + var est raftpb.HardState w := WAL{ encoder: newEncoder(&buf, 0), }