etcd/wal/wal.go

413 lines
10 KiB
Go
Raw Normal View History

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2014-07-28 07:58:39 +04:00
2014-07-26 01:20:21 +04:00
package wal
import (
2014-09-04 02:10:15 +04:00
"errors"
2014-07-26 01:20:21 +04:00
"fmt"
2014-09-04 02:10:15 +04:00
"hash/crc32"
2014-07-26 11:10:59 +04:00
"io"
2014-12-13 01:38:18 +03:00
"log"
2014-07-26 01:20:21 +04:00
"os"
2014-08-22 23:44:18 +04:00
"path"
"reflect"
2014-07-26 01:20:21 +04:00
2014-11-29 08:32:28 +03:00
"github.com/coreos/etcd/pkg/fileutil"
2014-10-11 16:41:00 +04:00
"github.com/coreos/etcd/pkg/pbutil"
2014-09-09 08:45:10 +04:00
"github.com/coreos/etcd/raft"
2014-09-04 02:10:15 +04:00
"github.com/coreos/etcd/raft/raftpb"
2014-09-04 03:46:42 +04:00
"github.com/coreos/etcd/wal/walpb"
2014-07-26 01:20:21 +04:00
)
2014-08-22 23:44:18 +04:00
// Record types stored in the Type field of a WAL record.
// Values start at 1 so that the zero value never names a valid record type.
const (
	metadataType int64 = iota + 1
	entryType
	stateType
	crcType
	snapshotType
)

// privateDirMode is the permission used for the WAL directory:
// only the owner can make/remove files inside the directory.
const privateDirMode = 0700
2014-07-26 01:20:21 +04:00
// Sentinel errors returned by the wal package.
var (
	ErrMetadataConflict = errors.New("wal: conflicting metadata found")
	ErrFileNotFound     = errors.New("wal: file not found")
	ErrCRCMismatch      = errors.New("wal: crc mismatch")
	ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
	ErrSnapshotNotFound = errors.New("wal: snapshot not found")
)

// crcTable is the CRC-32 table (Castagnoli polynomial) used by this package.
var crcTable = crc32.MakeTable(crc32.Castagnoli)
2014-09-04 02:10:15 +04:00
// WAL is a logical repersentation of the stable storage.
// WAL is either in read mode or append mode but not both.
// A newly created WAL is in append mode, and ready for appending records.
// A just opened WAL is in read mode, and ready for reading records.
// The WAL will be ready for appending after reading out all the previous records.
2014-07-26 01:20:21 +04:00
type WAL struct {
dir string // the living directory of the underlay files
metadata []byte // metadata recorded at the head of each WAL
state raftpb.HardState // hardstate recorded at the head of WAL
2014-07-26 01:20:21 +04:00
2015-01-06 10:27:03 +03:00
start walpb.Snapshot // snapshot to start reading
decoder *decoder // decoder to decode records
2014-07-28 07:32:23 +04:00
2014-09-04 02:10:15 +04:00
f *os.File // underlay file opened for appending, sync
2014-10-08 15:58:53 +04:00
seq uint64 // sequence of the wal file currently used for writes
enti uint64 // index of the last entry saved to the wal
2014-09-04 02:10:15 +04:00
encoder *encoder // encoder to encode records
locks []fileutil.Lock // the file locks the WAL is holding (the name is increasing)
2014-08-22 23:44:18 +04:00
}
// Create creates a WAL ready for appending records. The given metadata is
// recorded at the head of each WAL file, and can be retrieved with ReadAll.
func Create(dirpath string, metadata []byte) (*WAL, error) {
2014-08-22 23:44:18 +04:00
if Exist(dirpath) {
2014-07-26 01:54:08 +04:00
return nil, os.ErrExist
}
2014-09-05 08:15:39 +04:00
if err := os.MkdirAll(dirpath, privateDirMode); err != nil {
2014-09-05 08:15:39 +04:00
return nil, err
}
p := path.Join(dirpath, walName(0, 0))
2014-08-22 23:44:18 +04:00
f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
2014-07-26 01:20:21 +04:00
if err != nil {
return nil, err
}
l, err := fileutil.NewLock(f.Name())
if err != nil {
return nil, err
}
err = l.Lock()
if err != nil {
return nil, err
}
2014-09-04 02:10:15 +04:00
w := &WAL{
2014-10-11 16:41:00 +04:00
dir: dirpath,
metadata: metadata,
seq: 0,
f: f,
encoder: newEncoder(f, 0),
2014-09-04 02:10:15 +04:00
}
w.locks = append(w.locks, l)
2014-09-04 02:10:15 +04:00
if err := w.saveCrc(0); err != nil {
return nil, err
}
if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
return nil, err
}
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
2014-11-08 01:41:34 +03:00
return nil, err
}
2014-09-04 02:10:15 +04:00
return w, nil
2014-07-26 01:20:21 +04:00
}
2015-01-06 10:27:03 +03:00
// Open opens the WAL at the given snap.
// The snap SHOULD have been previously saved to the WAL, or the following
// ReadAll will fail.
2015-01-06 10:27:03 +03:00
// The returned WAL is ready to read and the first record will be the one after
// the given snap. The WAL cannot be appended to before reading out all of its
// previous records.
2015-01-06 10:27:03 +03:00
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
return openAtIndex(dirpath, snap, true)
2014-12-13 01:38:18 +03:00
}
// OpenNotInUse only opens the wal files that are not in use.
// Other than that, it is similar to Open.
2015-01-06 10:27:03 +03:00
func OpenNotInUse(dirpath string, snap walpb.Snapshot) (*WAL, error) {
return openAtIndex(dirpath, snap, false)
2014-12-13 01:38:18 +03:00
}
2015-01-06 10:27:03 +03:00
func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) {
2014-11-29 08:32:28 +03:00
names, err := fileutil.ReadDir(dirpath)
2014-08-22 23:44:18 +04:00
if err != nil {
return nil, err
}
names = checkWalNames(names)
if len(names) == 0 {
return nil, ErrFileNotFound
2014-08-22 23:44:18 +04:00
}
2015-01-06 10:27:03 +03:00
nameIndex, ok := searchIndex(names, snap.Index)
2014-09-04 02:10:15 +04:00
if !ok || !isValidSeq(names[nameIndex:]) {
return nil, ErrFileNotFound
2014-07-26 01:54:08 +04:00
}
2014-09-04 02:10:15 +04:00
// open the wal files for reading
rcs := make([]io.ReadCloser, 0)
ls := make([]fileutil.Lock, 0)
2014-09-04 02:10:15 +04:00
for _, name := range names[nameIndex:] {
f, err := os.Open(path.Join(dirpath, name))
if err != nil {
return nil, err
}
l, err := fileutil.NewLock(f.Name())
if err != nil {
return nil, err
}
err = l.TryLock()
if err != nil {
2014-12-13 01:38:18 +03:00
if all {
return nil, err
} else {
log.Printf("wal: opened all the files until %s, since it is still in use by an etcd server", name)
break
}
}
2014-09-04 02:10:15 +04:00
rcs = append(rcs, f)
ls = append(ls, l)
2014-07-26 01:20:21 +04:00
}
2014-09-04 02:10:15 +04:00
rc := MultiReadCloser(rcs...)
2014-07-26 01:20:21 +04:00
2014-09-04 02:10:15 +04:00
// open the lastest wal file for appending
2014-09-15 02:33:52 +04:00
seq, _, err := parseWalName(names[len(names)-1])
if err != nil {
rc.Close()
return nil, err
}
2014-09-04 02:10:15 +04:00
last := path.Join(dirpath, names[len(names)-1])
f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0)
2014-07-26 01:20:21 +04:00
if err != nil {
2014-09-04 02:10:15 +04:00
rc.Close()
return nil, err
2014-07-26 01:20:21 +04:00
}
2014-09-04 02:10:15 +04:00
// create a WAL ready for reading
w := &WAL{
2014-09-17 05:18:45 +04:00
dir: dirpath,
2015-01-06 10:27:03 +03:00
start: snap,
2014-09-04 02:10:15 +04:00
decoder: newDecoder(rc),
2014-07-26 01:20:21 +04:00
f: f,
seq: seq,
locks: ls,
2014-07-26 11:10:59 +04:00
}
2014-09-04 02:10:15 +04:00
return w, nil
2014-07-26 11:10:59 +04:00
}
2014-09-04 02:10:15 +04:00
// ReadAll reads out all records of the current WAL.
2015-01-06 10:27:03 +03:00
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
// If loaded snap doesn't match with the expected one, it will return
// all the records and error ErrSnapshotMismatch.
2015-01-06 10:27:03 +03:00
// TODO: detect not-last-snap error.
// TODO: maybe loose the checking of match.
2014-09-04 02:10:15 +04:00
// After ReadAll, the WAL will be ready for appending new records.
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
2014-09-04 03:46:42 +04:00
rec := &walpb.Record{}
2014-09-04 02:10:15 +04:00
decoder := w.decoder
2014-09-09 02:36:25 +04:00
2015-01-06 10:27:03 +03:00
var match bool
2014-09-04 02:10:15 +04:00
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
2014-08-05 02:46:12 +04:00
switch rec.Type {
2014-07-26 11:10:59 +04:00
case entryType:
2014-09-04 02:10:15 +04:00
e := mustUnmarshalEntry(rec.Data)
2015-01-06 10:27:03 +03:00
if e.Index > w.start.Index {
ents = append(ents[:e.Index-w.start.Index-1], e)
2014-07-26 11:10:59 +04:00
}
2014-09-15 02:11:50 +04:00
w.enti = e.Index
2014-07-26 11:10:59 +04:00
case stateType:
2014-09-04 02:10:15 +04:00
state = mustUnmarshalState(rec.Data)
case metadataType:
if metadata != nil && !reflect.DeepEqual(metadata, rec.Data) {
2014-09-04 02:10:15 +04:00
state.Reset()
return nil, state, nil, ErrMetadataConflict
2014-07-26 11:10:59 +04:00
}
metadata = rec.Data
2014-09-04 02:10:15 +04:00
case crcType:
crc := decoder.crc.Sum32()
// current crc of decoder must match the crc of the record.
// do no need to match 0 crc, since the decoder is a new one at this case.
2014-09-04 03:46:42 +04:00
if crc != 0 && rec.Validate(crc) != nil {
2014-09-04 02:10:15 +04:00
state.Reset()
return nil, state, nil, ErrCRCMismatch
2014-09-04 02:10:15 +04:00
}
decoder.updateCRC(rec.Crc)
2015-01-06 10:27:03 +03:00
case snapshotType:
var snap walpb.Snapshot
pbutil.MustUnmarshal(&snap, rec.Data)
if snap.Index == w.start.Index {
if snap.Term != w.start.Term {
state.Reset()
return nil, state, nil, ErrSnapshotMismatch
}
match = true
}
2014-07-26 11:10:59 +04:00
default:
2014-09-04 02:10:15 +04:00
state.Reset()
return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
2014-07-26 11:10:59 +04:00
}
}
2014-07-28 04:23:04 +04:00
if err != io.EOF {
2014-09-04 02:10:15 +04:00
state.Reset()
return nil, state, nil, err
2014-08-22 23:44:18 +04:00
}
err = nil
2015-01-06 10:27:03 +03:00
if !match {
err = ErrSnapshotNotFound
2015-01-06 10:27:03 +03:00
}
2014-08-22 23:44:18 +04:00
2014-09-04 02:10:15 +04:00
// close decoder, disable reading
w.decoder.close()
2015-01-06 10:27:03 +03:00
w.start = walpb.Snapshot{}
2014-09-04 02:10:15 +04:00
2014-10-11 16:41:00 +04:00
w.metadata = metadata
2014-09-04 02:10:15 +04:00
// create encoder (chain crc with the decoder), enable appending
w.encoder = newEncoder(w.f, w.decoder.lastCRC())
w.decoder = nil
return metadata, state, ents, err
2014-08-22 23:44:18 +04:00
}
2014-09-04 02:10:15 +04:00
// Cut closes current file written and creates a new one ready to append.
2014-09-15 02:11:50 +04:00
func (w *WAL) Cut() error {
2014-09-04 02:10:15 +04:00
// create a new wal file with name sequence + 1
fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
2014-09-04 02:10:15 +04:00
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
2014-08-22 23:44:18 +04:00
if err != nil {
2014-09-04 02:10:15 +04:00
return err
2014-08-22 23:44:18 +04:00
}
l, err := fileutil.NewLock(f.Name())
if err != nil {
return err
}
err = l.Lock()
if err != nil {
return err
}
w.locks = append(w.locks, l)
2014-11-08 01:41:34 +03:00
if err = w.sync(); err != nil {
return err
}
2014-09-04 02:10:15 +04:00
w.f.Close()
2014-08-22 23:44:18 +04:00
2014-09-04 02:10:15 +04:00
// update writer and save the previous crc
w.f = f
w.seq++
prevCrc := w.encoder.crc.Sum32()
w.encoder = newEncoder(w.f, prevCrc)
if err := w.saveCrc(prevCrc); err != nil {
return err
}
if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
return err
}
if err := w.saveState(&w.state); err != nil {
return err
}
return w.sync()
2014-08-22 23:44:18 +04:00
}
2014-11-08 01:41:34 +03:00
func (w *WAL) sync() error {
2014-09-04 02:10:15 +04:00
if w.encoder != nil {
if err := w.encoder.flush(); err != nil {
return err
2014-08-22 23:44:18 +04:00
}
}
2014-09-04 02:10:15 +04:00
return w.f.Sync()
2014-08-22 23:44:18 +04:00
}
// ReleaseLockTo releases the locks w is holding, which
// have index smaller or equal to the given index.
// The index of a lock is the one parsed from the name of its file. Locks are
// examined in the order they are held, and the first lock with a larger
// index stops the release. A lock is dropped from w only after it has been
// both unlocked and destroyed; the first error is returned.
func (w *WAL) ReleaseLockTo(index uint64) error {
	for len(w.locks) > 0 {
		head := w.locks[0]

		_, fileIndex, err := parseWalName(path.Base(head.Name()))
		if err != nil {
			return err
		}
		if fileIndex > index {
			break
		}

		if err = head.Unlock(); err != nil {
			return err
		}
		if err = head.Destroy(); err != nil {
			return err
		}
		w.locks = w.locks[1:]
	}
	return nil
}
2014-11-08 01:41:34 +03:00
func (w *WAL) Close() error {
2014-09-04 02:10:15 +04:00
if w.f != nil {
2014-11-08 01:41:34 +03:00
if err := w.sync(); err != nil {
return err
}
if err := w.f.Close(); err != nil {
return err
}
2014-08-22 23:44:18 +04:00
}
for _, l := range w.locks {
// TODO: log the error
l.Unlock()
l.Destroy()
}
2014-11-08 01:41:34 +03:00
return nil
2014-07-26 11:10:59 +04:00
}
func (w *WAL) saveEntry(e *raftpb.Entry) error {
2014-10-11 16:41:00 +04:00
b := pbutil.MustMarshal(e)
2014-09-04 03:46:42 +04:00
rec := &walpb.Record{Type: entryType, Data: b}
2014-09-15 02:11:50 +04:00
if err := w.encoder.encode(rec); err != nil {
return err
}
w.enti = e.Index
return nil
2014-07-26 11:10:59 +04:00
}
func (w *WAL) saveState(s *raftpb.HardState) error {
if raft.IsEmptyHardState(*s) {
2014-09-09 08:45:10 +04:00
return nil
}
w.state = *s
2014-10-11 16:41:00 +04:00
b := pbutil.MustMarshal(s)
2014-09-04 03:46:42 +04:00
rec := &walpb.Record{Type: stateType, Data: b}
2014-09-04 02:10:15 +04:00
return w.encoder.encode(rec)
2014-08-22 23:44:18 +04:00
}
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
2014-09-09 00:57:35 +04:00
// TODO(xiangli): no more reference operator
if err := w.saveState(&st); err != nil {
return err
}
2014-09-05 08:15:39 +04:00
for i := range ents {
if err := w.saveEntry(&ents[i]); err != nil {
return err
}
2014-09-05 08:15:39 +04:00
}
2014-11-08 01:41:34 +03:00
return w.sync()
2014-09-05 08:15:39 +04:00
}
2015-01-06 10:27:03 +03:00
// SaveSnapshot encodes e as a snapshot record and syncs the WAL.
// The index of the last saved entry is advanced to the index of e when the
// snapshot is ahead of it; it is never moved backwards.
func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
	data := pbutil.MustMarshal(&e)
	if err := w.encoder.encode(&walpb.Record{Type: snapshotType, Data: data}); err != nil {
		return err
	}
	// update enti only when snapshot is ahead of last index
	if e.Index > w.enti {
		w.enti = e.Index
	}
	return w.sync()
}
2014-09-04 02:10:15 +04:00
func (w *WAL) saveCrc(prevCrc uint32) error {
2014-09-04 03:46:42 +04:00
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
2014-07-26 02:21:04 +04:00
}