diff --git a/etcdserver/server.go b/etcdserver/server.go index 39ea3067c..1d408a637 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -459,8 +459,13 @@ func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved( func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) } +// ReportSnapshot reports snapshot sent status to the raft state machine, +// and clears the used snapshot from the snapshot store. func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) { s.r.ReportSnapshot(id, status) + if s.cfg.V3demo { + s.r.raftStorage.snapStore.clearUsedSnap() + } } func (s *EtcdServer) run() { @@ -1022,6 +1027,9 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { plog.Panicf("unexpected compaction error %v", err) } plog.Infof("compacted raft log at %d", compacti) + if s.cfg.V3demo && s.r.raftStorage.snapStore.closeSnapBefore(compacti) { + plog.Infof("closed snapshot stored due to compaction at %d", compacti) + } }() } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 600fd05a0..fc20f25ce 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -761,6 +761,7 @@ func TestSnapshot(t *testing.T) { st := &storeRecorder{} p := &storageRecorder{} srv := &EtcdServer{ + cfg: &ServerConfig{}, r: raftNode{ Node: &nodeRecorder{}, raftStorage: s, diff --git a/etcdserver/snapshot_store.go b/etcdserver/snapshot_store.go index 0f07655c0..573f0e7ff 100644 --- a/etcdserver/snapshot_store.go +++ b/etcdserver/snapshot_store.go @@ -20,7 +20,10 @@ import ( "io/ioutil" "os" "path" + "sync" + "time" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" @@ -28,6 +31,14 @@ import ( dstorage "github.com/coreos/etcd/storage" ) +// clearUnusedSnapshotInterval specifies the time interval to wait +// before clearing unused snapshot. +// The newly created snapshot should be retrieved within one heartbeat +// interval because raft state machine retries to send snapshot +// to slow follower when receiving MsgHeartbeatResp from the follower. +// Set it as 5s to match the upper limit of heartbeat interval. +const clearUnusedSnapshotInterval = 5 * time.Second + type snapshot struct { r raftpb.Snapshot @@ -78,7 +89,13 @@ type snapshotStore struct { // snapshotStore will receive from the chan immediately after it sends empty to reqsnapc raftsnapc chan raftpb.Snapshot - snap *snapshot + mu sync.Mutex // protect belowing vars + // snap is nil iff there is no snapshot stored + snap *snapshot + inUse bool + createOnce sync.Once // ensure at most one snapshot is created when no snapshot stored + + clock clockwork.Clock } func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore { @@ -87,35 +104,113 @@ func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore { kv: kv, reqsnapc: make(chan struct{}), raftsnapc: make(chan raftpb.Snapshot), + clock: clockwork.NewRealClock(), } } // getSnap returns a snapshot. // If there is no available snapshot, ErrSnapshotTemporarilyUnavaliable will be returned. // -// Internally it creates new snapshot and returns the snapshot. Unless the -// returned snapshot is closed, it rejects creating new one and returns -// ErrSnapshotTemporarilyUnavailable. +// If the snapshot stored is in use, it returns ErrSnapshotTemporarilyUnavailable. +// If there is no snapshot stored, it creates new snapshot +// asynchronously and returns ErrSnapshotTemporarilyUnavailable, so +// caller could get snapshot later when the snapshot is created. +// Otherwise, it returns the snapshot stored. +// +// The created snapshot is cleared from the snapshot store if it is +// either unused after clearUnusedSnapshotInterval, or explicitly cleared +// through clearUsedSnap after using. +// closeSnapBefore is used to close outdated snapshot, +// so the snapshot will be cleared faster when in use. +// +// snapshot store stores at most one snapshot at a time. // If raft state machine wants to send two snapshot messages to two followers, // the second snapshot message will keep getting snapshot and succeed only after // the first message is sent. This increases the time used to send messages, // but it is acceptable because this should happen seldomly. func (ss *snapshotStore) getSnap() (*snapshot, error) { - // If snapshotStore has some snapshot that has not been closed, it cannot - // request new snapshot. So it returns ErrSnapshotTemporarilyUnavailable. - if ss.snap != nil && !ss.snap.isClosed() { + ss.mu.Lock() + defer ss.mu.Unlock() + + if ss.inUse { return nil, raft.ErrSnapshotTemporarilyUnavailable } + if ss.snap == nil { + // create snapshot asynchronously + ss.createOnce.Do(func() { go ss.createSnap() }) + return nil, raft.ErrSnapshotTemporarilyUnavailable + } + + ss.inUse = true + // give transporter the generated snapshot that is ready to send out + ss.tr.SnapshotReady(ss.snap, ss.snap.raft().Metadata.Index) + return ss.snap, nil +} + +// clearUsedSnap clears the snapshot from the snapshot store after it +// is used. +// After clear, snapshotStore could create new snapshot when getSnap. +func (ss *snapshotStore) clearUsedSnap() { + ss.mu.Lock() + defer ss.mu.Unlock() + if !ss.inUse { + plog.Panicf("unexpected clearUsedSnap when snapshot is not in use") + } + ss.clear() +} + +// closeSnapBefore closes the stored snapshot if its index is not greater +// than the given compact index. +// If it closes the snapshot, it returns true. +func (ss *snapshotStore) closeSnapBefore(index uint64) bool { + ss.mu.Lock() + defer ss.mu.Unlock() + if ss.snap != nil && ss.snap.raft().Metadata.Index <= index { + if err := ss.snap.Close(); err != nil { + plog.Errorf("snapshot close error (%v)", err) + } + return true + } + return false +} + +// createSnap creates a new snapshot and stores it into the snapshot store. +// It also sets a timer to clear the snapshot if it is not in use after +// some time interval. +// It should only be called in snapshotStore functions. +func (ss *snapshotStore) createSnap() { // ask to generate v2 snapshot ss.reqsnapc <- struct{}{} // generate KV snapshot kvsnap := ss.kv.Snapshot() raftsnap := <-ss.raftsnapc - ss.snap = newSnapshot(raftsnap, kvsnap) - // give transporter the generated snapshot that is ready to send out - ss.tr.SnapshotReady(ss.snap, raftsnap.Metadata.Index) - return ss.snap, nil + snap := newSnapshot(raftsnap, kvsnap) + + ss.mu.Lock() + ss.snap = snap + ss.mu.Unlock() + + go func() { + <-ss.clock.After(clearUnusedSnapshotInterval) + ss.mu.Lock() + defer ss.mu.Unlock() + if snap == ss.snap && !ss.inUse { + ss.clear() + } + }() +} + +// clear clears snapshot related variables in snapshotStore. It closes +// the snapshot stored and sets the variables to initial values. +// It should only be called in snapshotStore functions. +func (ss *snapshotStore) clear() { + if err := ss.snap.Close(); err != nil { + plog.Errorf("snapshot close error (%v)", err) + } + ss.snap = nil + ss.inUse = false + ss.createOnce = sync.Once{} } // SaveFrom saves snapshot at the given index from the given reader. diff --git a/etcdserver/snapshot_store_test.go b/etcdserver/snapshot_store_test.go new file mode 100644 index 000000000..119108a34 --- /dev/null +++ b/etcdserver/snapshot_store_test.go @@ -0,0 +1,204 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "io" + "reflect" + "sync" + "testing" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" + "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + dstorage "github.com/coreos/etcd/storage" + "github.com/coreos/etcd/storage/storagepb" +) + +func TestSnapshotStoreCreateSnap(t *testing.T) { + snap := raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{Index: 1}, + } + ss := newSnapshotStore("", &nopKV{}) + fakeClock := clockwork.NewFakeClock() + ss.clock = fakeClock + go func() { + <-ss.reqsnapc + ss.raftsnapc <- snap + }() + + // create snapshot + ss.createSnap() + if !reflect.DeepEqual(ss.snap.raft(), snap) { + t.Errorf("raftsnap = %+v, want %+v", ss.snap.raft(), snap) + } + + // unused snapshot is cleared after clearUnusedSnapshotInterval + fakeClock.BlockUntil(1) + fakeClock.Advance(clearUnusedSnapshotInterval) + testutil.WaitSchedule() + ss.mu.Lock() + if ss.snap != nil { + t.Errorf("snap = %+v, want %+v", ss.snap, nil) + } + ss.mu.Unlock() +} + +func TestSnapshotStoreGetSnap(t *testing.T) { + snap := raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{Index: 1}, + } + ss := newSnapshotStore("", &nopKV{}) + fakeClock := clockwork.NewFakeClock() + ss.clock = fakeClock + ss.tr = &nopTransporter{} + go func() { + <-ss.reqsnapc + ss.raftsnapc <- snap + }() + + // get snap when no snapshot stored + _, err := ss.getSnap() + if err != raft.ErrSnapshotTemporarilyUnavailable { + t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable) + } + + // wait for asynchronous snapshot creation to finish + testutil.WaitSchedule() + // get the created snapshot + s, err := ss.getSnap() + if err != nil { + t.Fatalf("getSnap error = %v, want nil", err) + } + if !reflect.DeepEqual(s.raft(), snap) { + t.Errorf("raftsnap = %+v, want %+v", s.raft(), snap) + } + if !ss.inUse { + t.Errorf("inUse = %v, want true", ss.inUse) + } + + // get snap when snapshot stored has been in use + _, err = ss.getSnap() + if err != raft.ErrSnapshotTemporarilyUnavailable { + t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable) + } + + // clean up + fakeClock.Advance(clearUnusedSnapshotInterval) +} + +func TestSnapshotStoreClearUsedSnap(t *testing.T) { + s := &fakeSnapshot{} + var once sync.Once + once.Do(func() {}) + ss := &snapshotStore{ + snap: newSnapshot(raftpb.Snapshot{}, s), + inUse: true, + createOnce: once, + } + + ss.clearUsedSnap() + // wait for underlying KV snapshot closed + testutil.WaitSchedule() + s.mu.Lock() + if !s.closed { + t.Errorf("snapshot closed = %v, want true", s.closed) + } + s.mu.Unlock() + if ss.snap != nil { + t.Errorf("snapshot = %v, want nil", ss.snap) + } + if ss.inUse { + t.Errorf("isUse = %v, want false", ss.inUse) + } + // test createOnce is reset + if ss.createOnce == once { + t.Errorf("createOnce fails to reset") + } +} + +func TestSnapshotStoreCloseSnapBefore(t *testing.T) { + snapIndex := uint64(5) + + tests := []struct { + index uint64 + wok bool + }{ + {snapIndex - 2, false}, + {snapIndex - 1, false}, + {snapIndex, true}, + } + for i, tt := range tests { + rs := raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{Index: 5}, + } + s := &fakeSnapshot{} + ss := &snapshotStore{ + snap: newSnapshot(rs, s), + } + + ok := ss.closeSnapBefore(tt.index) + if ok != tt.wok { + t.Errorf("#%d: closeSnapBefore = %v, want %v", i, ok, tt.wok) + } + if ok { + // wait for underlying KV snapshot closed + testutil.WaitSchedule() + s.mu.Lock() + if !s.closed { + t.Errorf("#%d: snapshot closed = %v, want true", i, s.closed) + } + s.mu.Unlock() + } + } +} + +type nopKV struct{} + +func (kv *nopKV) Rev() int64 { return 0 } +func (kv *nopKV) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { + return nil, 0, nil +} +func (kv *nopKV) Put(key, value []byte) (rev int64) { return 0 } +func (kv *nopKV) DeleteRange(key, end []byte) (n, rev int64) { return 0, 0 } +func (kv *nopKV) TxnBegin() int64 { return 0 } +func (kv *nopKV) TxnEnd(txnID int64) error { return nil } +func (kv *nopKV) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { + return nil, 0, nil +} +func (kv *nopKV) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { return 0, nil } +func (kv *nopKV) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { + return 0, 0, nil +} +func (kv *nopKV) Compact(rev int64) error { return nil } +func (kv *nopKV) Hash() (uint32, error) { return 0, nil } +func (kv *nopKV) Snapshot() dstorage.Snapshot { return &fakeSnapshot{} } +func (kv *nopKV) Restore() error { return nil } +func (kv *nopKV) Close() error { return nil } + +type fakeSnapshot struct { + mu sync.Mutex + closed bool +} + +func (s *fakeSnapshot) Size() int64 { return 0 } +func (s *fakeSnapshot) WriteTo(w io.Writer) (int64, error) { return 0, nil } +func (s *fakeSnapshot) Close() error { + s.mu.Lock() + s.closed = true + s.mu.Unlock() + return nil +} diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 0a88d5da3..dc287801c 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -86,7 +86,11 @@ type Transporter interface { // If the connection is currently inactive, it returns zero time. ActiveSince(id types.ID) time.Time // SnapshotReady accepts a snapshot at the given index that is ready to send out. - // SnapshotReady MUST not be called when the snapshot sent result of previous + // It is expected that caller sends a raft snapshot message with + // the given index soon, and the accepted snapshot will be sent out + // together. After sending, snapshot sent status is reported + // through Raft.SnapshotStatus. + // SnapshotReady MUST not be called when the snapshot sent status of previous // accepted one has not been reported. SnapshotReady(rc io.ReadCloser, index uint64) // Stop closes the connections and stops the transporter.