etcd/etcdserver/storage.go

178 lines
4.5 KiB
Go

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import (
"fmt"
"io"
"os"
"path"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/version"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
// Storage is the persistence interface declared by this package. It groups
// the operations for saving raft hard state and entries, saving snapshots,
// locating database snapshot files, and closing the underlying store.
type Storage interface {
// Save saves ents and st to the underlying stable storage.
// Save MUST block until st and ents are on stable storage.
Save(st raftpb.HardState, ents []raftpb.Entry) error
// SaveSnap saves the snapshot to the underlying stable storage.
SaveSnap(snap raftpb.Snapshot) error
// DBFilePath returns the file path of the database snapshot saved with the
// given id.
DBFilePath(id uint64) (string, error)
// Close closes the Storage and performs finalization.
Close() error
}
// storage is the Storage implementation handed out by NewStorage. It embeds a
// WAL and a Snapshotter and defines SaveSnap on top of them.
//
// NOTE(review): the remaining Storage methods (Save, DBFilePath, Close) are
// not defined in this file and are presumably promoted from the embedded
// types — confirm against the wal and snap packages.
type storage struct {
*wal.WAL
*snap.Snapshotter
}
// NewStorage returns a Storage that combines the given WAL and Snapshotter.
// Both pointers are stored as-is; nothing is opened or copied here.
func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
	return &storage{
		WAL:         w,
		Snapshotter: s,
	}
}
// SaveSnap saves the snapshot to disk and releases the locked wal files,
// since they will not be used any more.
//
// The steps run in this order and stop at the first error, which is returned
// unchanged:
//  1. a snapshot record carrying the index and term from snap.Metadata is
//     written to the WAL;
//  2. the snapshot itself is passed to the Snapshotter;
//  3. the WAL is asked to release its locks up to the snapshot index.
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
	meta := snap.Metadata

	record := walpb.Snapshot{Index: meta.Index, Term: meta.Term}
	if err := st.WAL.SaveSnapshot(record); err != nil {
		return err
	}
	if err := st.Snapshotter.SaveSnap(snap); err != nil {
		return err
	}
	return st.WAL.ReleaseLockTo(meta.Index)
}
// readWAL opens the WAL stored in waldir at the given snapshot and reads all
// of it.
//
// A failure to open the WAL is logged with plog.Fatalf. If reading fails, the
// WAL is closed first; a read that failed with io.ErrUnexpectedEOF then gets
// exactly one wal.Repair attempt, after which the WAL is reopened and read
// again. Any other read error, a read error after a repair has already been
// made, or an unsuccessful repair is logged with plog.Fatalf.
//
// It returns the opened WAL, the node ID and cluster ID decoded from the WAL
// metadata, and the hard state and entries produced by ReadAll.
func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
	var (
		err       error
		rawMeta   []byte
		attempted bool // set once a repair has succeeded
	)

	for {
		if w, err = wal.Open(waldir, snap); err != nil {
			plog.Fatalf("open wal error: %v", err)
		}

		rawMeta, st, ents, err = w.ReadAll()
		if err == nil {
			break
		}

		w.Close()
		// we can only repair ErrUnexpectedEOF and we never repair twice.
		if attempted || err != io.ErrUnexpectedEOF {
			plog.Fatalf("read wal error (%v) and cannot be repaired", err)
		}
		if wal.Repair(waldir) {
			plog.Infof("repaired WAL error (%v)", err)
			attempted = true
		} else {
			plog.Fatalf("WAL error (%v) cannot be repaired", err)
		}
	}

	var meta pb.Metadata
	pbutil.MustUnmarshal(&meta, rawMeta)
	return w, types.ID(meta.NodeID), types.ID(meta.ClusterID), st, ents
}
// upgradeDataDir converts an older version of the etcdServer data directory
// to the newest version. It must ensure that, after upgrading, the most
// recent version is present.
//
// The cases are chained with fallthrough so that a directory at an older
// version runs every later migration step in turn. At present the only step
// is makeMemberDir, applied to version.DataDir2_0; every other version is
// left untouched. The name parameter is not used by the current steps.
func upgradeDataDir(baseDataDir string, name string, ver version.DataDirVersion) error {
	switch ver {
	case version.DataDir2_0:
		if err := makeMemberDir(baseDataDir); err != nil {
			return err
		}
		fallthrough
	case version.DataDir2_0_1:
		// Nothing further to migrate from this version.
	}
	return nil
}
// makeMemberDir creates dir/member and moves dir/snap and dir/wal into it.
//
// If dir/member already exists the function returns nil without touching
// anything. A Stat failure other than "does not exist" is returned as-is, as
// is any error from creating the directory (mode 0700) or from renaming
// either entry; a rename error aborts the remaining renames.
func makeMemberDir(dir string) error {
	membdir := path.Join(dir, "member")

	_, statErr := os.Stat(membdir)
	if statErr == nil {
		return nil
	}
	if !os.IsNotExist(statErr) {
		return statErr
	}

	if err := os.MkdirAll(membdir, 0700); err != nil {
		return err
	}
	for _, sub := range [...]string{"snap", "wal"} {
		from, to := path.Join(dir, sub), path.Join(membdir, sub)
		if err := os.Rename(from, to); err != nil {
			return err
		}
	}
	return nil
}
// storageRecorder provides the Storage methods in a form that records the
// name of a call through the embedded testutil.RecorderBuffered and returns
// fixed results.
type storageRecorder struct {
testutil.RecorderBuffered
// dbPath is the optional directory prefix that DBFilePath puts in front of
// the generated file name. DBFilePath joins a non-empty dbPath and the file
// name with '/', so no trailing '/' is needed here.
dbPath string
}
// Save records a "Save" action and reports success. Neither st nor ents is
// inspected or stored.
func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error {
	action := testutil.Action{Name: "Save"}
	p.Record(action)
	return nil
}
// SaveSnap records a "SaveSnap" action unless raft.IsEmptySnap reports st as
// empty, in which case nothing is recorded. It always returns nil.
func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
	if raft.IsEmptySnap(st) {
		return nil
	}
	p.Record(testutil.Action{Name: "SaveSnap"})
	return nil
}
// DBFilePath records a "DBFilePath" action and returns the name of the
// database snapshot file for id: id formatted as 16 zero-padded lowercase hex
// digits followed by ".snap.db", prefixed with dbPath when dbPath is set.
//
// dbPath may be given with or without a trailing '/'. A separator is appended
// only when dbPath is non-empty and does not already end in one, so a prefix
// that already carries the '/' no longer produces "dir//<id>.snap.db".
// The returned error is always nil.
func (p *storageRecorder) DBFilePath(id uint64) (string, error) {
	p.Record(testutil.Action{Name: "DBFilePath"})
	prefix := p.dbPath
	if prefix != "" && prefix[len(prefix)-1] != '/' {
		prefix += "/"
	}
	return fmt.Sprintf("%s%016x.snap.db", prefix, id), nil
}
// Close does nothing: no action is recorded and the result is always nil.
func (p *storageRecorder) Close() error {
	return nil
}