etcdserver: fix that raft state machine may block

When the snapshot store requests a raft snapshot from the etcdserver apply loop,
it may block on the channel for some time, or wait for the KV to finish its
snapshot. This is unexpected, because the raft state machine should never be blocked.

Even worse, this blocking may lead to a deadlock:
1. the raft state machine waits on getting a snapshot from the raft memory storage
2. the raft memory storage waits for the snapshot store to provide the snapshot
3. the snapshot store requests a raft snapshot from the apply loop
4. the apply loop is applying entries, and waits for the raftNode loop to finish
sending messages
5. the raftNode loop waits for the peer loop in Transport to send out messages
6. the peer loop in Transport waits for the raft state machine to process the message

Fix it by making getSnap create the snapshot asynchronously.
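
A minimal sketch of the non-blocking pattern the fix adopts (not part of this commit; only the raft and raftpb identifiers are real, the store type and its methods are illustrative): the raft Storage answers snapshot requests immediately, kicks off creation in the background, and returns raft.ErrSnapshotTemporarilyUnavailable until the snapshot is ready, so the raft state machine simply retries later instead of blocking.

package sketch

import (
    "sync"

    "github.com/coreos/etcd/raft"
    "github.com/coreos/etcd/raft/raftpb"
)

// store is a hypothetical snapshot provider used only for illustration.
type store struct {
    mu       sync.Mutex
    snap     *raftpb.Snapshot // nil until background creation finishes
    creating bool
}

// Snapshot never blocks its caller: if no snapshot is ready yet, it starts
// creating one asynchronously and asks raft to retry later by returning
// ErrSnapshotTemporarilyUnavailable.
func (s *store) Snapshot() (raftpb.Snapshot, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.snap != nil {
        return *s.snap, nil
    }
    if !s.creating {
        s.creating = true
        go s.createSnap() // build the snapshot off the raft goroutine
    }
    return raftpb.Snapshot{}, raft.ErrSnapshotTemporarilyUnavailable
}

// createSnap gathers the snapshot data in the background and publishes it
// for the next Snapshot call.
func (s *store) createSnap() {
    // placeholder snapshot data for illustration
    snap := raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}
    s.mu.Lock()
    s.snap = &snap
    s.creating = false
    s.mu.Unlock()
}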
release-2.3
Yicheng Qin 2015-10-13 17:29:01 -07:00
parent 1f21ccf166
commit ab5df57ecf
5 changed files with 324 additions and 12 deletions


@@ -459,8 +459,13 @@ func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(
func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
// ReportSnapshot reports snapshot sent status to the raft state machine,
// and clears the used snapshot from the snapshot store.
func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
s.r.ReportSnapshot(id, status)
if s.cfg.V3demo {
s.r.raftStorage.snapStore.clearUsedSnap()
}
}
func (s *EtcdServer) run() {
@@ -1019,6 +1024,9 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
plog.Panicf("unexpected compaction error %v", err)
}
plog.Infof("compacted raft log at %d", compacti)
if s.cfg.V3demo && s.r.raftStorage.snapStore.closeSnapBefore(compacti) {
plog.Infof("closed snapshot stored due to compaction at %d", compacti)
}
}()
}


@@ -761,6 +761,7 @@ func TestSnapshot(t *testing.T) {
st := &storeRecorder{}
p := &storageRecorder{}
srv := &EtcdServer{
cfg: &ServerConfig{},
r: raftNode{
Node: &nodeRecorder{},
raftStorage: s,


@@ -20,7 +20,10 @@ import (
"io/ioutil"
"os"
"path"
"sync"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
@@ -28,6 +31,14 @@ import (
dstorage "github.com/coreos/etcd/storage"
)
// clearUnusedSnapshotInterval specifies the time interval to wait
// before clearing an unused snapshot.
// The newly created snapshot should be retrieved within one heartbeat
// interval because the raft state machine retries sending the snapshot
// to a slow follower when it receives MsgHeartbeatResp from the follower.
// It is set to 5s to match the upper limit of the heartbeat interval.
const clearUnusedSnapshotInterval = 5 * time.Second
type snapshot struct {
r raftpb.Snapshot
@@ -78,7 +89,13 @@ type snapshotStore struct {
// snapshotStore will receive from the chan immediately after it sends empty to reqsnapc
raftsnapc chan raftpb.Snapshot
mu sync.Mutex // protects the fields below
// snap is nil iff there is no snapshot stored
snap *snapshot
inUse bool
createOnce sync.Once // ensure at most one snapshot is created when no snapshot stored
clock clockwork.Clock
}
func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore {
@@ -87,35 +104,113 @@ func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore {
kv: kv,
reqsnapc: make(chan struct{}),
raftsnapc: make(chan raftpb.Snapshot),
clock: clockwork.NewRealClock(),
}
}
// getSnap returns a snapshot.
// If the stored snapshot is in use, it returns ErrSnapshotTemporarilyUnavailable.
// If there is no snapshot stored, it creates a new snapshot
// asynchronously and returns ErrSnapshotTemporarilyUnavailable, so the
// caller can get the snapshot later once it has been created.
// Otherwise, it returns the stored snapshot.
//
// The created snapshot is cleared from the snapshot store if it is
// either unused after clearUnusedSnapshotInterval, or explicitly cleared
// through clearUsedSnap after use.
// closeSnapBefore is used to close an outdated snapshot,
// so the snapshot will be cleared faster when it is in use.
//
// snapshot store stores at most one snapshot at a time.
// If the raft state machine wants to send two snapshot messages to two followers,
// the second snapshot message will keep requesting the snapshot and succeed only
// after the first message is sent. This increases the time used to send messages,
// but it is acceptable because this should happen rarely.
func (ss *snapshotStore) getSnap() (*snapshot, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
if ss.inUse {
return nil, raft.ErrSnapshotTemporarilyUnavailable
}
if ss.snap == nil {
// create snapshot asynchronously
ss.createOnce.Do(func() { go ss.createSnap() })
return nil, raft.ErrSnapshotTemporarilyUnavailable
}
ss.inUse = true
// give transporter the generated snapshot that is ready to send out
ss.tr.SnapshotReady(ss.snap, ss.snap.raft().Metadata.Index)
return ss.snap, nil
}
// clearUsedSnap clears the snapshot from the snapshot store after it
// is used.
// After clearing, snapshotStore can create a new snapshot when getSnap is called.
func (ss *snapshotStore) clearUsedSnap() {
ss.mu.Lock()
defer ss.mu.Unlock()
if !ss.inUse {
plog.Panicf("unexpected clearUsedSnap when snapshot is not in use")
}
ss.clear()
}
// closeSnapBefore closes the stored snapshot if its index is not greater
// than the given compact index.
// If it closes the snapshot, it returns true.
func (ss *snapshotStore) closeSnapBefore(index uint64) bool {
ss.mu.Lock()
defer ss.mu.Unlock()
if ss.snap != nil && ss.snap.raft().Metadata.Index <= index {
if err := ss.snap.Close(); err != nil {
plog.Errorf("snapshot close error (%v)", err)
}
return true
}
return false
}
// createSnap creates a new snapshot and stores it into the snapshot store.
// It also sets a timer to clear the snapshot if it is not in use after
// some time interval.
// It should only be called in snapshotStore functions.
func (ss *snapshotStore) createSnap() {
// ask to generate v2 snapshot
ss.reqsnapc <- struct{}{}
// generate KV snapshot
kvsnap := ss.kv.Snapshot()
raftsnap := <-ss.raftsnapc
snap := newSnapshot(raftsnap, kvsnap)
ss.mu.Lock()
ss.snap = snap
ss.mu.Unlock()
go func() {
<-ss.clock.After(clearUnusedSnapshotInterval)
ss.mu.Lock()
defer ss.mu.Unlock()
if snap == ss.snap && !ss.inUse {
ss.clear()
}
}()
}
// clear clears snapshot related variables in snapshotStore. It closes
// the snapshot stored and sets the variables to initial values.
// It should only be called in snapshotStore functions.
func (ss *snapshotStore) clear() {
if err := ss.snap.Close(); err != nil {
plog.Errorf("snapshot close error (%v)", err)
}
ss.snap = nil
ss.inUse = false
ss.createOnce = sync.Once{}
}
// SaveFrom saves snapshot at the given index from the given reader.
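
For orientation, here is a hedged sketch of how one send attempt would drive the lifecycle described above, assuming it lives in package etcdserver next to this code; sendSnapshotOnce and its send/report callbacks are hypothetical helpers, not part of the commit. getSnap hands out the stored snapshot, the send result is reported back to the raft state machine (in the real code via EtcdServer.ReportSnapshot), and clearUsedSnap then frees the store so a new snapshot can be created.

// sendSnapshotOnce is an illustrative helper, not actual etcd code.
func sendSnapshotOnce(ss *snapshotStore, send func(*snapshot) error, report func(raft.SnapshotStatus)) error {
    snap, err := ss.getSnap()
    if err != nil {
        // raft.ErrSnapshotTemporarilyUnavailable: creation is in flight;
        // the raft state machine retries on the next MsgHeartbeatResp.
        return err
    }
    status := raft.SnapshotFinish
    if err := send(snap); err != nil {
        status = raft.SnapshotFailure
    }
    report(status)     // e.g. forwarded to raft via ReportSnapshot
    ss.clearUsedSnap() // free the store so a new snapshot can be created
    return nil
}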


@@ -0,0 +1,204 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import (
"io"
"reflect"
"sync"
"testing"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
dstorage "github.com/coreos/etcd/storage"
"github.com/coreos/etcd/storage/storagepb"
)
func TestSnapshotStoreCreateSnap(t *testing.T) {
snap := raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{Index: 1},
}
ss := newSnapshotStore("", &nopKV{})
fakeClock := clockwork.NewFakeClock()
ss.clock = fakeClock
go func() {
<-ss.reqsnapc
ss.raftsnapc <- snap
}()
// create snapshot
ss.createSnap()
if !reflect.DeepEqual(ss.snap.raft(), snap) {
t.Errorf("raftsnap = %+v, want %+v", ss.snap.raft(), snap)
}
// unused snapshot is cleared after clearUnusedSnapshotInterval
fakeClock.BlockUntil(1)
fakeClock.Advance(clearUnusedSnapshotInterval)
testutil.WaitSchedule()
ss.mu.Lock()
if ss.snap != nil {
t.Errorf("snap = %+v, want %+v", ss.snap, nil)
}
ss.mu.Unlock()
}
func TestSnapshotStoreGetSnap(t *testing.T) {
snap := raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{Index: 1},
}
ss := newSnapshotStore("", &nopKV{})
fakeClock := clockwork.NewFakeClock()
ss.clock = fakeClock
ss.tr = &nopTransporter{}
go func() {
<-ss.reqsnapc
ss.raftsnapc <- snap
}()
// get snap when no snapshot stored
_, err := ss.getSnap()
if err != raft.ErrSnapshotTemporarilyUnavailable {
t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable)
}
// wait for asynchronous snapshot creation to finish
testutil.WaitSchedule()
// get the created snapshot
s, err := ss.getSnap()
if err != nil {
t.Fatalf("getSnap error = %v, want nil", err)
}
if !reflect.DeepEqual(s.raft(), snap) {
t.Errorf("raftsnap = %+v, want %+v", s.raft(), snap)
}
if !ss.inUse {
t.Errorf("inUse = %v, want true", ss.inUse)
}
// get snap when snapshot stored has been in use
_, err = ss.getSnap()
if err != raft.ErrSnapshotTemporarilyUnavailable {
t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable)
}
// clean up
fakeClock.Advance(clearUnusedSnapshotInterval)
}
func TestSnapshotStoreClearUsedSnap(t *testing.T) {
s := &fakeSnapshot{}
var once sync.Once
once.Do(func() {})
ss := &snapshotStore{
snap: newSnapshot(raftpb.Snapshot{}, s),
inUse: true,
createOnce: once,
}
ss.clearUsedSnap()
// wait for underlying KV snapshot closed
testutil.WaitSchedule()
s.mu.Lock()
if !s.closed {
t.Errorf("snapshot closed = %v, want true", s.closed)
}
s.mu.Unlock()
if ss.snap != nil {
t.Errorf("snapshot = %v, want nil", ss.snap)
}
if ss.inUse {
t.Errorf("isUse = %v, want false", ss.inUse)
}
// test createOnce is reset
if ss.createOnce == once {
t.Errorf("createOnce fails to reset")
}
}
func TestSnapshotStoreCloseSnapBefore(t *testing.T) {
snapIndex := uint64(5)
tests := []struct {
index uint64
wok bool
}{
{snapIndex - 2, false},
{snapIndex - 1, false},
{snapIndex, true},
}
for i, tt := range tests {
rs := raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{Index: 5},
}
s := &fakeSnapshot{}
ss := &snapshotStore{
snap: newSnapshot(rs, s),
}
ok := ss.closeSnapBefore(tt.index)
if ok != tt.wok {
t.Errorf("#%d: closeSnapBefore = %v, want %v", i, ok, tt.wok)
}
if ok {
// wait for underlying KV snapshot closed
testutil.WaitSchedule()
s.mu.Lock()
if !s.closed {
t.Errorf("#%d: snapshot closed = %v, want true", i, s.closed)
}
s.mu.Unlock()
}
}
}
type nopKV struct{}
func (kv *nopKV) Rev() int64 { return 0 }
func (kv *nopKV) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
return nil, 0, nil
}
func (kv *nopKV) Put(key, value []byte) (rev int64) { return 0 }
func (kv *nopKV) DeleteRange(key, end []byte) (n, rev int64) { return 0, 0 }
func (kv *nopKV) TxnBegin() int64 { return 0 }
func (kv *nopKV) TxnEnd(txnID int64) error { return nil }
func (kv *nopKV) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
return nil, 0, nil
}
func (kv *nopKV) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { return 0, nil }
func (kv *nopKV) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
return 0, 0, nil
}
func (kv *nopKV) Compact(rev int64) error { return nil }
func (kv *nopKV) Hash() (uint32, error) { return 0, nil }
func (kv *nopKV) Snapshot() dstorage.Snapshot { return &fakeSnapshot{} }
func (kv *nopKV) Restore() error { return nil }
func (kv *nopKV) Close() error { return nil }
type fakeSnapshot struct {
mu sync.Mutex
closed bool
}
func (s *fakeSnapshot) Size() int64 { return 0 }
func (s *fakeSnapshot) WriteTo(w io.Writer) (int64, error) { return 0, nil }
func (s *fakeSnapshot) Close() error {
s.mu.Lock()
s.closed = true
s.mu.Unlock()
return nil
}


@@ -86,7 +86,11 @@ type Transporter interface {
// If the connection is currently inactive, it returns zero time.
ActiveSince(id types.ID) time.Time
// SnapshotReady accepts a snapshot at the given index that is ready to send out.
// It is expected that the caller sends a raft snapshot message with
// the given index soon, and the accepted snapshot will be sent out
// together. After sending, snapshot sent status is reported
// through Raft.SnapshotStatus.
// SnapshotReady MUST not be called when the snapshot sent status of previous
// accepted one has not been reported.
SnapshotReady(rc io.ReadCloser, index uint64)
// Stop closes the connections and stops the transporter.