// Copyright 2015 CoreOS, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package raft import ( "fmt" "log" pb "github.com/coreos/etcd/raft/raftpb" ) type raftLog struct { // storage contains all stable entries since the last snapshot. storage Storage // unstable contains all unstable entries and snapshot. // they will be saved into storage. unstable unstable // committed is the highest log position that is known to be in // stable storage on a quorum of nodes. committed uint64 // applied is the highest log position that the application has // been instructed to apply to its state machine. // Invariant: applied <= committed applied uint64 logger Logger } // newLog returns log using the given storage. It recovers the log to the state // that it just commits and applies the latest snapshot. func newLog(storage Storage, logger Logger) *raftLog { if storage == nil { log.Panic("storage must not be nil") } log := &raftLog{ storage: storage, logger: logger, } firstIndex, err := storage.FirstIndex() if err != nil { panic(err) // TODO(bdarnell) } lastIndex, err := storage.LastIndex() if err != nil { panic(err) // TODO(bdarnell) } log.unstable.offset = lastIndex + 1 log.unstable.logger = logger // Initialize our committed and applied pointers to the time of the last compaction. log.committed = firstIndex - 1 log.applied = firstIndex - 1 return log } func (l *raftLog) String() string { return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries)) } // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // it returns (last index of new entries, true). func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { lastnewi = index + uint64(len(ents)) if l.matchTerm(index, logTerm) { ci := l.findConflict(ents) switch { case ci == 0: case ci <= l.committed: l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) default: offset := index + 1 l.append(ents[ci-offset:]...) } l.commitTo(min(committed, lastnewi)) return lastnewi, true } return 0, false } func (l *raftLog) append(ents ...pb.Entry) uint64 { if len(ents) == 0 { return l.lastIndex() } if after := ents[0].Index - 1; after < l.committed { l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) } l.unstable.truncateAndAppend(ents) return l.lastIndex() } // findConflict finds the index of the conflict. // It returns the first pair of conflicting entries between the existing // entries and the given entries, if there are any. // If there is no conflicting entries, and the existing entries contains // all the given entries, zero will be returned. // If there is no conflicting entries, but the given entries contains new // entries, the index of the first new entry will be returned. // An entry is considered to be conflicting if it has the same index but // a different term. // The first entry MUST have an index equal to the argument 'from'. // The index of the given entries MUST be continuously increasing. func (l *raftLog) findConflict(ents []pb.Entry) uint64 { for _, ne := range ents { if !l.matchTerm(ne.Index, ne.Term) { if ne.Index <= l.lastIndex() { l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term) } return ne.Index } } return 0 } func (l *raftLog) unstableEntries() []pb.Entry { if len(l.unstable.entries) == 0 { return nil } return l.unstable.entries } // nextEnts returns all the available entries for execution. // If applied is smaller than the index of snapshot, it returns all committed // entries after the index of snapshot. func (l *raftLog) nextEnts() (ents []pb.Entry) { off := max(l.applied+1, l.firstIndex()) if l.committed+1 > off { ents, err := l.slice(off, l.committed+1, noLimit) if err != nil { l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) } return ents } return nil } // hasNextEnts returns if there is any available entries for execution. This // is a fast check without heavy raftLog.slice() in raftLog.nextEnts(). func (l *raftLog) hasNextEnts() bool { off := max(l.applied+1, l.firstIndex()) return l.committed+1 > off } func (l *raftLog) snapshot() (pb.Snapshot, error) { if l.unstable.snapshot != nil { return *l.unstable.snapshot, nil } return l.storage.Snapshot() } func (l *raftLog) firstIndex() uint64 { if i, ok := l.unstable.maybeFirstIndex(); ok { return i } index, err := l.storage.FirstIndex() if err != nil { panic(err) // TODO(bdarnell) } return index } func (l *raftLog) lastIndex() uint64 { if i, ok := l.unstable.maybeLastIndex(); ok { return i } i, err := l.storage.LastIndex() if err != nil { panic(err) // TODO(bdarnell) } return i } func (l *raftLog) commitTo(tocommit uint64) { // never decrease commit if l.committed < tocommit { if l.lastIndex() < tocommit { l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex()) } l.committed = tocommit } } func (l *raftLog) appliedTo(i uint64) { if i == 0 { return } if l.committed < i || i < l.applied { l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) } l.applied = i } func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) } func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } func (l *raftLog) lastTerm() uint64 { t, err := l.term(l.lastIndex()) if err != nil { l.logger.Panicf("unexpected error when getting the last term (%v)", err) } return t } func (l *raftLog) term(i uint64) (uint64, error) { // the valid term range is [index of dummy entry, last index] dummyIndex := l.firstIndex() - 1 if i < dummyIndex || i > l.lastIndex() { // TODO: return an error instead? return 0, nil } if t, ok := l.unstable.maybeTerm(i); ok { return t, nil } t, err := l.storage.Term(i) if err == nil { return t, nil } if err == ErrCompacted { return 0, err } panic(err) // TODO(bdarnell) } func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) { if i > l.lastIndex() { return nil, nil } return l.slice(i, l.lastIndex()+1, maxsize) } // allEntries returns all entries in the log. func (l *raftLog) allEntries() []pb.Entry { ents, err := l.entries(l.firstIndex(), noLimit) if err == nil { return ents } if err == ErrCompacted { // try again if there was a racing compaction return l.allEntries() } // TODO (xiangli): handle error? panic(err) } // isUpToDate determines if the given (lastIndex,term) log is more up-to-date // by comparing the index and term of the last entries in the existing logs. // If the logs have last entries with different terms, then the log with the // later term is more up-to-date. If the logs end with the same term, then // whichever log has the larger lastIndex is more up-to-date. If the logs are // the same, the given log is up-to-date. func (l *raftLog) isUpToDate(lasti, term uint64) bool { return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex()) } func (l *raftLog) matchTerm(i, term uint64) bool { t, err := l.term(i) if err != nil { return false } return t == term } func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } return false } func (l *raftLog) restore(s pb.Snapshot) { l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term) l.committed = s.Metadata.Index l.unstable.restore(s) } // slice returns a slice of log entries from lo through hi-1, inclusive. func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) { err := l.mustCheckOutOfBounds(lo, hi) if err != nil { return nil, err } if lo == hi { return nil, nil } var ents []pb.Entry if lo < l.unstable.offset { storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize) if err == ErrCompacted { return nil, err } else if err == ErrUnavailable { l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset)) } else if err != nil { panic(err) // TODO(bdarnell) } // check if ents has reached the size limitation if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo { return storedEnts, nil } ents = storedEnts } if hi > l.unstable.offset { unstable := l.unstable.slice(max(lo, l.unstable.offset), hi) if len(ents) > 0 { ents = append([]pb.Entry{}, ents...) ents = append(ents, unstable...) } else { ents = unstable } } return limitSize(ents, maxSize), nil } // l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries) func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { if lo > hi { l.logger.Panicf("invalid slice %d > %d", lo, hi) } fi := l.firstIndex() if lo < fi { return ErrCompacted } length := l.lastIndex() - fi + 1 if lo < fi || hi > fi+length { l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex()) } return nil } func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 { if err == nil { return t } if err == ErrCompacted { return 0 } l.logger.Panicf("unexpected error (%v)", err) return 0 }