From ab5df57ecf9f52a026e45b8df3b884f33b17b7c2 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 13 Oct 2015 17:29:01 -0700 Subject: [PATCH] etcdserver: fix raft state machine may block When snapshot store requests raft snapshot from etcdserver apply loop, it may block on the channel for some time, or wait some time for KV to snapshot. This is unexpected because raft state machine should be unblocked. Even worse, this block may lead to deadlock: 1. raft state machine waits on getting snapshot from raft memory storage 2. raft memory storage waits snapshot store to get snapshot 3. snapshot store requests raft snapshot from apply loop 4. apply loop is applying entries, and waits raftNode loop to finish messages sending 5. raftNode loop waits peer loop in Transport to send out messages 6. peer loop in Transport waits for raft state machine to process message Fix it by changing the logic of getSnap to be asynchronously creation. --- etcdserver/server.go | 8 ++ etcdserver/server_test.go | 1 + etcdserver/snapshot_store.go | 117 +++++++++++++++-- etcdserver/snapshot_store_test.go | 204 ++++++++++++++++++++++++++++++ rafthttp/transport.go | 6 +- 5 files changed, 324 insertions(+), 12 deletions(-) create mode 100644 etcdserver/snapshot_store_test.go diff --git a/etcdserver/server.go b/etcdserver/server.go index 92cf3a87e..0b081bb92 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -459,8 +459,13 @@ func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved( func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) } +// ReportSnapshot reports snapshot sent status to the raft state machine, +// and clears the used snapshot from the snapshot store. func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) { s.r.ReportSnapshot(id, status) + if s.cfg.V3demo { + s.r.raftStorage.snapStore.clearUsedSnap() + } } func (s *EtcdServer) run() { @@ -1019,6 +1024,9 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { plog.Panicf("unexpected compaction error %v", err) } plog.Infof("compacted raft log at %d", compacti) + if s.cfg.V3demo && s.r.raftStorage.snapStore.closeSnapBefore(compacti) { + plog.Infof("closed snapshot stored due to compaction at %d", compacti) + } }() } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 600fd05a0..fc20f25ce 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -761,6 +761,7 @@ func TestSnapshot(t *testing.T) { st := &storeRecorder{} p := &storageRecorder{} srv := &EtcdServer{ + cfg: &ServerConfig{}, r: raftNode{ Node: &nodeRecorder{}, raftStorage: s, diff --git a/etcdserver/snapshot_store.go b/etcdserver/snapshot_store.go index 0f07655c0..573f0e7ff 100644 --- a/etcdserver/snapshot_store.go +++ b/etcdserver/snapshot_store.go @@ -20,7 +20,10 @@ import ( "io/ioutil" "os" "path" + "sync" + "time" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" @@ -28,6 +31,14 @@ import ( dstorage "github.com/coreos/etcd/storage" ) +// clearUnusedSnapshotInterval specifies the time interval to wait +// before clearing unused snapshot. +// The newly created snapshot should be retrieved within one heartbeat +// interval because raft state machine retries to send snapshot +// to slow follower when receiving MsgHeartbeatResp from the follower. +// Set it as 5s to match the upper limit of heartbeat interval. +const clearUnusedSnapshotInterval = 5 * time.Second + type snapshot struct { r raftpb.Snapshot @@ -78,7 +89,13 @@ type snapshotStore struct { // snapshotStore will receive from the chan immediately after it sends empty to reqsnapc raftsnapc chan raftpb.Snapshot - snap *snapshot + mu sync.Mutex // protect belowing vars + // snap is nil iff there is no snapshot stored + snap *snapshot + inUse bool + createOnce sync.Once // ensure at most one snapshot is created when no snapshot stored + + clock clockwork.Clock } func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore { @@ -87,35 +104,113 @@ func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore { kv: kv, reqsnapc: make(chan struct{}), raftsnapc: make(chan raftpb.Snapshot), + clock: clockwork.NewRealClock(), } } // getSnap returns a snapshot. // If there is no available snapshot, ErrSnapshotTemporarilyUnavaliable will be returned. // -// Internally it creates new snapshot and returns the snapshot. Unless the -// returned snapshot is closed, it rejects creating new one and returns -// ErrSnapshotTemporarilyUnavailable. +// If the snapshot stored is in use, it returns ErrSnapshotTemporarilyUnavailable. +// If there is no snapshot stored, it creates new snapshot +// asynchronously and returns ErrSnapshotTemporarilyUnavailable, so +// caller could get snapshot later when the snapshot is created. +// Otherwise, it returns the snapshot stored. +// +// The created snapshot is cleared from the snapshot store if it is +// either unused after clearUnusedSnapshotInterval, or explicitly cleared +// through clearUsedSnap after using. +// closeSnapBefore is used to close outdated snapshot, +// so the snapshot will be cleared faster when in use. +// +// snapshot store stores at most one snapshot at a time. // If raft state machine wants to send two snapshot messages to two followers, // the second snapshot message will keep getting snapshot and succeed only after // the first message is sent. This increases the time used to send messages, // but it is acceptable because this should happen seldomly. func (ss *snapshotStore) getSnap() (*snapshot, error) { - // If snapshotStore has some snapshot that has not been closed, it cannot - // request new snapshot. So it returns ErrSnapshotTemporarilyUnavailable. - if ss.snap != nil && !ss.snap.isClosed() { + ss.mu.Lock() + defer ss.mu.Unlock() + + if ss.inUse { return nil, raft.ErrSnapshotTemporarilyUnavailable } + if ss.snap == nil { + // create snapshot asynchronously + ss.createOnce.Do(func() { go ss.createSnap() }) + return nil, raft.ErrSnapshotTemporarilyUnavailable + } + + ss.inUse = true + // give transporter the generated snapshot that is ready to send out + ss.tr.SnapshotReady(ss.snap, ss.snap.raft().Metadata.Index) + return ss.snap, nil +} + +// clearUsedSnap clears the snapshot from the snapshot store after it +// is used. +// After clear, snapshotStore could create new snapshot when getSnap. +func (ss *snapshotStore) clearUsedSnap() { + ss.mu.Lock() + defer ss.mu.Unlock() + if !ss.inUse { + plog.Panicf("unexpected clearUsedSnap when snapshot is not in use") + } + ss.clear() +} + +// closeSnapBefore closes the stored snapshot if its index is not greater +// than the given compact index. +// If it closes the snapshot, it returns true. +func (ss *snapshotStore) closeSnapBefore(index uint64) bool { + ss.mu.Lock() + defer ss.mu.Unlock() + if ss.snap != nil && ss.snap.raft().Metadata.Index <= index { + if err := ss.snap.Close(); err != nil { + plog.Errorf("snapshot close error (%v)", err) + } + return true + } + return false +} + +// createSnap creates a new snapshot and stores it into the snapshot store. +// It also sets a timer to clear the snapshot if it is not in use after +// some time interval. +// It should only be called in snapshotStore functions. +func (ss *snapshotStore) createSnap() { // ask to generate v2 snapshot ss.reqsnapc <- struct{}{} // generate KV snapshot kvsnap := ss.kv.Snapshot() raftsnap := <-ss.raftsnapc - ss.snap = newSnapshot(raftsnap, kvsnap) - // give transporter the generated snapshot that is ready to send out - ss.tr.SnapshotReady(ss.snap, raftsnap.Metadata.Index) - return ss.snap, nil + snap := newSnapshot(raftsnap, kvsnap) + + ss.mu.Lock() + ss.snap = snap + ss.mu.Unlock() + + go func() { + <-ss.clock.After(clearUnusedSnapshotInterval) + ss.mu.Lock() + defer ss.mu.Unlock() + if snap == ss.snap && !ss.inUse { + ss.clear() + } + }() +} + +// clear clears snapshot related variables in snapshotStore. It closes +// the snapshot stored and sets the variables to initial values. +// It should only be called in snapshotStore functions. +func (ss *snapshotStore) clear() { + if err := ss.snap.Close(); err != nil { + plog.Errorf("snapshot close error (%v)", err) + } + ss.snap = nil + ss.inUse = false + ss.createOnce = sync.Once{} } // SaveFrom saves snapshot at the given index from the given reader. diff --git a/etcdserver/snapshot_store_test.go b/etcdserver/snapshot_store_test.go new file mode 100644 index 000000000..119108a34 --- /dev/null +++ b/etcdserver/snapshot_store_test.go @@ -0,0 +1,204 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "io" + "reflect" + "sync" + "testing" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" + "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + dstorage "github.com/coreos/etcd/storage" + "github.com/coreos/etcd/storage/storagepb" +) + +func TestSnapshotStoreCreateSnap(t *testing.T) { + snap := raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{Index: 1}, + } + ss := newSnapshotStore("", &nopKV{}) + fakeClock := clockwork.NewFakeClock() + ss.clock = fakeClock + go func() { + <-ss.reqsnapc + ss.raftsnapc <- snap + }() + + // create snapshot + ss.createSnap() + if !reflect.DeepEqual(ss.snap.raft(), snap) { + t.Errorf("raftsnap = %+v, want %+v", ss.snap.raft(), snap) + } + + // unused snapshot is cleared after clearUnusedSnapshotInterval + fakeClock.BlockUntil(1) + fakeClock.Advance(clearUnusedSnapshotInterval) + testutil.WaitSchedule() + ss.mu.Lock() + if ss.snap != nil { + t.Errorf("snap = %+v, want %+v", ss.snap, nil) + } + ss.mu.Unlock() +} + +func TestSnapshotStoreGetSnap(t *testing.T) { + snap := raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{Index: 1}, + } + ss := newSnapshotStore("", &nopKV{}) + fakeClock := clockwork.NewFakeClock() + ss.clock = fakeClock + ss.tr = &nopTransporter{} + go func() { + <-ss.reqsnapc + ss.raftsnapc <- snap + }() + + // get snap when no snapshot stored + _, err := ss.getSnap() + if err != raft.ErrSnapshotTemporarilyUnavailable { + t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable) + } + + // wait for asynchronous snapshot creation to finish + testutil.WaitSchedule() + // get the created snapshot + s, err := ss.getSnap() + if err != nil { + t.Fatalf("getSnap error = %v, want nil", err) + } + if !reflect.DeepEqual(s.raft(), snap) { + t.Errorf("raftsnap = %+v, want %+v", s.raft(), snap) + } + if !ss.inUse { + t.Errorf("inUse = %v, want true", ss.inUse) + } + + // get snap when snapshot stored has been in use + _, err = ss.getSnap() + if err != raft.ErrSnapshotTemporarilyUnavailable { + t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable) + } + + // clean up + fakeClock.Advance(clearUnusedSnapshotInterval) +} + +func TestSnapshotStoreClearUsedSnap(t *testing.T) { + s := &fakeSnapshot{} + var once sync.Once + once.Do(func() {}) + ss := &snapshotStore{ + snap: newSnapshot(raftpb.Snapshot{}, s), + inUse: true, + createOnce: once, + } + + ss.clearUsedSnap() + // wait for underlying KV snapshot closed + testutil.WaitSchedule() + s.mu.Lock() + if !s.closed { + t.Errorf("snapshot closed = %v, want true", s.closed) + } + s.mu.Unlock() + if ss.snap != nil { + t.Errorf("snapshot = %v, want nil", ss.snap) + } + if ss.inUse { + t.Errorf("isUse = %v, want false", ss.inUse) + } + // test createOnce is reset + if ss.createOnce == once { + t.Errorf("createOnce fails to reset") + } +} + +func TestSnapshotStoreCloseSnapBefore(t *testing.T) { + snapIndex := uint64(5) + + tests := []struct { + index uint64 + wok bool + }{ + {snapIndex - 2, false}, + {snapIndex - 1, false}, + {snapIndex, true}, + } + for i, tt := range tests { + rs := raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{Index: 5}, + } + s := &fakeSnapshot{} + ss := &snapshotStore{ + snap: newSnapshot(rs, s), + } + + ok := ss.closeSnapBefore(tt.index) + if ok != tt.wok { + t.Errorf("#%d: closeSnapBefore = %v, want %v", i, ok, tt.wok) + } + if ok { + // wait for underlying KV snapshot closed + testutil.WaitSchedule() + s.mu.Lock() + if !s.closed { + t.Errorf("#%d: snapshot closed = %v, want true", i, s.closed) + } + s.mu.Unlock() + } + } +} + +type nopKV struct{} + +func (kv *nopKV) Rev() int64 { return 0 } +func (kv *nopKV) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { + return nil, 0, nil +} +func (kv *nopKV) Put(key, value []byte) (rev int64) { return 0 } +func (kv *nopKV) DeleteRange(key, end []byte) (n, rev int64) { return 0, 0 } +func (kv *nopKV) TxnBegin() int64 { return 0 } +func (kv *nopKV) TxnEnd(txnID int64) error { return nil } +func (kv *nopKV) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { + return nil, 0, nil +} +func (kv *nopKV) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { return 0, nil } +func (kv *nopKV) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { + return 0, 0, nil +} +func (kv *nopKV) Compact(rev int64) error { return nil } +func (kv *nopKV) Hash() (uint32, error) { return 0, nil } +func (kv *nopKV) Snapshot() dstorage.Snapshot { return &fakeSnapshot{} } +func (kv *nopKV) Restore() error { return nil } +func (kv *nopKV) Close() error { return nil } + +type fakeSnapshot struct { + mu sync.Mutex + closed bool +} + +func (s *fakeSnapshot) Size() int64 { return 0 } +func (s *fakeSnapshot) WriteTo(w io.Writer) (int64, error) { return 0, nil } +func (s *fakeSnapshot) Close() error { + s.mu.Lock() + s.closed = true + s.mu.Unlock() + return nil +} diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 85f8a59c8..6f1fbe7fb 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -86,7 +86,11 @@ type Transporter interface { // If the connection is currently inactive, it returns zero time. ActiveSince(id types.ID) time.Time // SnapshotReady accepts a snapshot at the given index that is ready to send out. - // SnapshotReady MUST not be called when the snapshot sent result of previous + // It is expected that caller sends a raft snapshot message with + // the given index soon, and the accepted snapshot will be sent out + // together. After sending, snapshot sent status is reported + // through Raft.SnapshotStatus. + // SnapshotReady MUST not be called when the snapshot sent status of previous // accepted one has not been reported. SnapshotReady(rc io.ReadCloser, index uint64) // Stop closes the connections and stops the transporter.