Compare commits
23 Commits
v3.2.29
...
release-3.
Author | SHA1 | Date | |
---|---|---|---|
![]() |
333b9f2656 | ||
![]() |
7436c63a48 | ||
![]() |
5caf504f40 | ||
![]() |
7dc07f2a9b | ||
![]() |
7ec9c48a45 | ||
![]() |
8ce82ff877 | ||
![]() |
6064a0e39c | ||
![]() |
78f1a05493 | ||
![]() |
7a07e9f3b3 | ||
![]() |
8d4ab97008 | ||
![]() |
79c998d91a | ||
![]() |
b14255c0b4 | ||
![]() |
13465d6d3d | ||
![]() |
229492c969 | ||
![]() |
ba92a0e70f | ||
![]() |
55db5b49e8 | ||
![]() |
b7243f0175 | ||
![]() |
7619b2f744 | ||
![]() |
17acb61209 | ||
![]() |
6e77b87c06 | ||
![]() |
b7644ae5f0 | ||
![]() |
325f2d253c | ||
![]() |
b05103392d |
@@ -6,12 +6,15 @@ sudo: required
|
||||
services: docker
|
||||
|
||||
go:
|
||||
- 1.8.7
|
||||
- 1.12.17
|
||||
|
||||
notifications:
|
||||
on_success: never
|
||||
on_failure: never
|
||||
|
||||
env:
|
||||
- GO111MODULE=off
|
||||
|
||||
env:
|
||||
matrix:
|
||||
- TARGET=linux-amd64-integration
|
||||
@@ -23,7 +26,7 @@ env:
|
||||
matrix:
|
||||
fast_finish: true
|
||||
allow_failures:
|
||||
- go: 1.8.7
|
||||
- go: 1.12.17
|
||||
env: TARGET=linux-386-unit
|
||||
exclude:
|
||||
- go: tip
|
||||
|
@@ -1,4 +1,4 @@
|
||||
FROM ubuntu:16.10
|
||||
FROM ubuntu:16.04
|
||||
|
||||
RUN rm /bin/sh && ln -s /bin/bash /bin/sh
|
||||
RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections
|
||||
@@ -29,6 +29,7 @@ RUN apt-get -y update \
|
||||
&& apt-get -y autoremove \
|
||||
&& apt-get -y autoclean
|
||||
|
||||
ENV GO111MODULE=off
|
||||
ENV GOROOT /usr/local/go
|
||||
ENV GOPATH /go
|
||||
ENV PATH ${GOPATH}/bin:${GOROOT}/bin:${PATH}
|
||||
@@ -46,11 +47,18 @@ ADD ./scripts/install-marker.sh /tmp/install-marker.sh
|
||||
|
||||
RUN go get -v -u -tags spell github.com/chzchzchz/goword \
|
||||
&& go get -v -u github.com/coreos/license-bill-of-materials \
|
||||
&& go get -v -u honnef.co/go/tools/cmd/gosimple \
|
||||
&& go get -v -u honnef.co/go/tools/cmd/unused \
|
||||
&& go get -v -u honnef.co/go/tools/cmd/staticcheck \
|
||||
&& go get -v -u github.com/wadey/gocovmerge \
|
||||
&& go get -v -u github.com/gordonklaus/ineffassign \
|
||||
&& mkdir -p $GOPATH/src/honnef.co/go/tools \
|
||||
&& git clone https://github.com/dominikh/go-tools.git $GOPATH/src/honnef.co/go/tools \
|
||||
&& cd $GOPATH/src/honnef.co/go/tools/cmd/staticcheck \
|
||||
&& git checkout 2017.2.2 \
|
||||
&& go get \
|
||||
&& go install \
|
||||
&& cd $GOPATH/src/honnef.co/go/tools/cmd/gosimple \
|
||||
&& go install \
|
||||
&& cd $GOPATH/src/honnef.co/go/tools/cmd/unused \
|
||||
&& go install \
|
||||
&& /tmp/install-marker.sh amd64 \
|
||||
&& rm -f /tmp/install-marker.sh \
|
||||
&& curl -s https://codecov.io/bash >/codecov \
|
||||
|
@@ -162,6 +162,9 @@ type AuthStore interface {
|
||||
|
||||
// AuthInfoFromTLS gets AuthInfo from TLS info of gRPC's context
|
||||
AuthInfoFromTLS(ctx context.Context) *AuthInfo
|
||||
|
||||
// WithRoot generates and installs a token that can be used as a root credential
|
||||
WithRoot(ctx context.Context) context.Context
|
||||
}
|
||||
|
||||
type TokenProvider interface {
|
||||
@@ -1070,3 +1073,40 @@ func NewTokenProvider(tokenOpts string, indexWaiter func(uint64) <-chan struct{}
|
||||
return nil, ErrInvalidAuthOpts
|
||||
}
|
||||
}
|
||||
|
||||
func (as *authStore) WithRoot(ctx context.Context) context.Context {
|
||||
if !as.isAuthEnabled() {
|
||||
return ctx
|
||||
}
|
||||
|
||||
var ctxForAssign context.Context
|
||||
if ts, ok := as.tokenProvider.(*tokenSimple); ok && ts != nil {
|
||||
ctx1 := context.WithValue(ctx, "index", uint64(0))
|
||||
prefix, err := ts.genTokenPrefix()
|
||||
if err != nil {
|
||||
plog.Errorf("failed to generate prefix of internally used token")
|
||||
return ctx
|
||||
}
|
||||
ctxForAssign = context.WithValue(ctx1, "simpleToken", prefix)
|
||||
} else {
|
||||
ctxForAssign = ctx
|
||||
}
|
||||
|
||||
token, err := as.tokenProvider.assign(ctxForAssign, "root", as.Revision())
|
||||
if err != nil {
|
||||
// this must not happen
|
||||
plog.Errorf("failed to assign token for lease revoking: %s", err)
|
||||
return ctx
|
||||
}
|
||||
|
||||
mdMap := map[string]string{
|
||||
"token": token,
|
||||
}
|
||||
tokenMD := metadata.New(mdMap)
|
||||
|
||||
// clean up tls info to ensure using root credential
|
||||
ctx = peer.NewContext(ctx, nil)
|
||||
|
||||
// use "mdIncomingKey{}" since it's called from local etcdserver
|
||||
return metadata.NewIncomingContext(ctx, tokenMD)
|
||||
}
|
||||
|
@@ -135,7 +135,7 @@ type loggableValueCompare struct {
|
||||
Result Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult"`
|
||||
Target Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget"`
|
||||
Key []byte `protobuf:"bytes,3,opt,name=key,proto3"`
|
||||
ValueSize int `protobuf:"bytes,7,opt,name=value_size,proto3"`
|
||||
ValueSize int64 `protobuf:"varint,7,opt,name=value_size,proto3"`
|
||||
}
|
||||
|
||||
func newLoggableValueCompare(c *Compare, cv *Compare_Value) *loggableValueCompare {
|
||||
@@ -143,7 +143,7 @@ func newLoggableValueCompare(c *Compare, cv *Compare_Value) *loggableValueCompar
|
||||
c.Result,
|
||||
c.Target,
|
||||
c.Key,
|
||||
len(cv.Value),
|
||||
int64(len(cv.Value)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,7 +156,7 @@ func (*loggableValueCompare) ProtoMessage() {}
|
||||
// To preserve proto encoding of the key bytes, a faked out proto type is used here.
|
||||
type loggablePutRequest struct {
|
||||
Key []byte `protobuf:"bytes,1,opt,name=key,proto3"`
|
||||
ValueSize int `protobuf:"varint,2,opt,name=value_size,proto3"`
|
||||
ValueSize int64 `protobuf:"varint,2,opt,name=value_size,proto3"`
|
||||
Lease int64 `protobuf:"varint,3,opt,name=lease,proto3"`
|
||||
PrevKv bool `protobuf:"varint,4,opt,name=prev_kv,proto3"`
|
||||
IgnoreValue bool `protobuf:"varint,5,opt,name=ignore_value,proto3"`
|
||||
@@ -166,7 +166,7 @@ type loggablePutRequest struct {
|
||||
func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest {
|
||||
return &loggablePutRequest{
|
||||
request.Key,
|
||||
len(request.Value),
|
||||
int64(len(request.Value)),
|
||||
request.Lease,
|
||||
request.PrevKv,
|
||||
request.IgnoreValue,
|
||||
|
@@ -123,6 +123,19 @@ var (
|
||||
Help: "Server or member ID in hexadecimal format. 1 for 'server_id' label with current ID.",
|
||||
},
|
||||
[]string{"server_id"})
|
||||
|
||||
fdUsed = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "os",
|
||||
Subsystem: "fd",
|
||||
Name: "used",
|
||||
Help: "The number of used file descriptors.",
|
||||
})
|
||||
fdLimit = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "os",
|
||||
Subsystem: "fd",
|
||||
Name: "limit",
|
||||
Help: "The file descriptor limit.",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -142,6 +155,8 @@ func init() {
|
||||
prometheus.MustRegister(currentVersion)
|
||||
prometheus.MustRegister(currentGoVersion)
|
||||
prometheus.MustRegister(serverID)
|
||||
prometheus.MustRegister(fdUsed)
|
||||
prometheus.MustRegister(fdLimit)
|
||||
|
||||
currentVersion.With(prometheus.Labels{
|
||||
"server_version": version.Version,
|
||||
@@ -152,7 +167,12 @@ func init() {
|
||||
}
|
||||
|
||||
func monitorFileDescriptor(done <-chan struct{}) {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
// This ticker will check File Descriptor Requirements ,and count all fds in used.
|
||||
// And recorded some logs when in used >= limit/5*4. Just recorded message.
|
||||
// If fds was more than 10K,It's low performance due to FDUsage() works.
|
||||
// So need to increase it.
|
||||
// See https://github.com/etcd-io/etcd/issues/11969 for more detail.
|
||||
ticker := time.NewTicker(10 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
used, err := runtime.FDUsage()
|
||||
@@ -160,11 +180,13 @@ func monitorFileDescriptor(done <-chan struct{}) {
|
||||
plog.Errorf("cannot monitor file descriptor usage (%v)", err)
|
||||
return
|
||||
}
|
||||
fdUsed.Set(float64(used))
|
||||
limit, err := runtime.FDLimit()
|
||||
if err != nil {
|
||||
plog.Errorf("cannot monitor file descriptor usage (%v)", err)
|
||||
return
|
||||
}
|
||||
fdLimit.Set(float64(limit))
|
||||
if used >= limit/5*4 {
|
||||
plog.Warningf("80%% of the file descriptor limit is used [used = %d, limit = %d]", used, limit)
|
||||
}
|
||||
|
@@ -222,6 +222,9 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*
|
||||
}
|
||||
|
||||
func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
|
||||
// fix: LeaseRevoke may fail to apply when authentication is enabled and upgrading cluster from etcd-3.2 to etcd-3.3
|
||||
// see https://github.com/etcd-io/etcd/issues/11689
|
||||
ctx = s.authStore.WithRoot(ctx)
|
||||
resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@@ -29,7 +29,7 @@ import (
|
||||
//
|
||||
// constants from /usr/include/bits/fcntl-linux.h
|
||||
const (
|
||||
F_OFD_GETLK = 37
|
||||
F_OFD_GETLK = 36
|
||||
F_OFD_SETLK = 37
|
||||
F_OFD_SETLKW = 38
|
||||
)
|
||||
|
@@ -95,12 +95,23 @@ func (pw *PageWriter) Write(p []byte) (n int, err error) {
|
||||
return n, werr
|
||||
}
|
||||
|
||||
// Flush flushes buffered data.
|
||||
func (pw *PageWriter) Flush() error {
|
||||
if pw.bufferedBytes == 0 {
|
||||
return nil
|
||||
}
|
||||
_, err := pw.w.Write(pw.buf[:pw.bufferedBytes])
|
||||
pw.pageOffset = (pw.pageOffset + pw.bufferedBytes) % pw.pageBytes
|
||||
pw.bufferedBytes = 0
|
||||
_, err := pw.flush()
|
||||
return err
|
||||
}
|
||||
|
||||
// FlushN flushes buffered data and returns the number of written bytes.
|
||||
func (pw *PageWriter) FlushN() (int, error) {
|
||||
return pw.flush()
|
||||
}
|
||||
|
||||
func (pw *PageWriter) flush() (int, error) {
|
||||
if pw.bufferedBytes == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
n, err := pw.w.Write(pw.buf[:pw.bufferedBytes])
|
||||
pw.pageOffset = (pw.pageOffset + pw.bufferedBytes) % pw.pageBytes
|
||||
pw.bufferedBytes = 0
|
||||
return n, err
|
||||
}
|
||||
|
@@ -24,7 +24,7 @@ func TestMarshaler(t *testing.T) {
|
||||
data := []byte("test data")
|
||||
m := &fakeMarshaler{data: data}
|
||||
if g := MustMarshal(m); !reflect.DeepEqual(g, data) {
|
||||
t.Errorf("data = %s, want %s", g, m)
|
||||
t.Errorf("data = %s, want %s", g, m.data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ func TestUnmarshaler(t *testing.T) {
|
||||
m := &fakeUnmarshaler{}
|
||||
MustUnmarshal(m, data)
|
||||
if !reflect.DeepEqual(m.data, data) {
|
||||
t.Errorf("data = %s, want %s", m.data, m)
|
||||
t.Errorf("data = %s, want %s", m.data, data)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -16,7 +16,7 @@
|
||||
package runtime
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
@@ -29,9 +29,20 @@ func FDLimit() (uint64, error) {
|
||||
}
|
||||
|
||||
func FDUsage() (uint64, error) {
|
||||
fds, err := ioutil.ReadDir("/proc/self/fd")
|
||||
return countFiles("/proc/self/fd")
|
||||
}
|
||||
|
||||
// countFiles reads the directory named by dirname and returns the count.
|
||||
// This is same as stdlib "io/ioutil.ReadDir" but without sorting.
|
||||
func countFiles(dirname string) (uint64, error) {
|
||||
f, err := os.Open(dirname)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return uint64(len(fds)), nil
|
||||
list, err := f.Readdir(-1)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return uint64(len(list)), nil
|
||||
}
|
||||
|
@@ -114,7 +114,7 @@ func (l *DefaultLogger) Fatalf(format string, v ...interface{}) {
|
||||
}
|
||||
|
||||
func (l *DefaultLogger) Panic(v ...interface{}) {
|
||||
l.Logger.Panic(v)
|
||||
l.Logger.Panic(v...)
|
||||
}
|
||||
|
||||
func (l *DefaultLogger) Panicf(format string, v ...interface{}) {
|
||||
|
@@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.2.29"
|
||||
Version = "3.2.32"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
@@ -56,6 +56,11 @@ func (d *decoder) decode(rec *walpb.Record) error {
|
||||
return d.decodeRecord(rec)
|
||||
}
|
||||
|
||||
// raft max message size is set to 1 MB in etcd server
|
||||
// assume projects set reasonable message size limit,
|
||||
// thus entry size should never exceed 10 MB
|
||||
const maxWALEntrySizeLimit = int64(10 * 1024 * 1024)
|
||||
|
||||
func (d *decoder) decodeRecord(rec *walpb.Record) error {
|
||||
if len(d.brs) == 0 {
|
||||
return io.EOF
|
||||
@@ -76,6 +81,9 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
|
||||
}
|
||||
|
||||
recBytes, padBytes := decodeFrameSize(l)
|
||||
if recBytes >= maxWALEntrySizeLimit-padBytes {
|
||||
return ErrMaxWALEntrySizeLimitExceeded
|
||||
}
|
||||
|
||||
data := make([]byte, recBytes+padBytes)
|
||||
if _, err = io.ReadFull(d.brs[0], data); err != nil {
|
||||
|
@@ -92,7 +92,8 @@ func (e *encoder) encode(rec *walpb.Record) error {
|
||||
if padBytes != 0 {
|
||||
data = append(data, make([]byte, padBytes)...)
|
||||
}
|
||||
_, err = e.bw.Write(data)
|
||||
n, err = e.bw.Write(data)
|
||||
walWriteBytes.Add(float64(n))
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -108,13 +109,16 @@ func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) {
|
||||
|
||||
func (e *encoder) flush() error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
return e.bw.Flush()
|
||||
n, err := e.bw.FlushN()
|
||||
e.mu.Unlock()
|
||||
walWriteBytes.Add(float64(n))
|
||||
return err
|
||||
}
|
||||
|
||||
func writeUint64(w io.Writer, n uint64, buf []byte) error {
|
||||
// http://golang.org/src/encoding/binary/binary.go
|
||||
binary.LittleEndian.PutUint64(buf, n)
|
||||
_, err := w.Write(buf)
|
||||
nv, err := w.Write(buf)
|
||||
walWriteBytes.Add(float64(nv))
|
||||
return err
|
||||
}
|
||||
|
@@ -24,8 +24,15 @@ var (
|
||||
Help: "The latency distributions of fsync called by wal.",
|
||||
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
|
||||
})
|
||||
walWriteBytes = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "disk",
|
||||
Name: "wal_write_bytes_total",
|
||||
Help: "Total number of bytes written in WAL.",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(syncDurations)
|
||||
prometheus.MustRegister(walWriteBytes)
|
||||
}
|
||||
|
@@ -18,6 +18,7 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
@@ -76,10 +77,14 @@ func Repair(dirpath string) bool {
|
||||
plog.Errorf("could not repair %v, failed to truncate file", f.Name())
|
||||
return false
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if err = fileutil.Fsync(f.File); err != nil {
|
||||
plog.Errorf("could not repair %v, failed to sync file", f.Name())
|
||||
return false
|
||||
}
|
||||
syncDurations.Observe(time.Since(start).Seconds())
|
||||
|
||||
return true
|
||||
default:
|
||||
plog.Errorf("could not repair error (%v)", err)
|
||||
|
39
wal/wal.go
39
wal/wal.go
@@ -54,13 +54,15 @@ var (
|
||||
SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB
|
||||
|
||||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "wal")
|
||||
|
||||
ErrMetadataConflict = errors.New("wal: conflicting metadata found")
|
||||
ErrFileNotFound = errors.New("wal: file not found")
|
||||
ErrCRCMismatch = errors.New("wal: crc mismatch")
|
||||
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
|
||||
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
|
||||
crcTable = crc32.MakeTable(crc32.Castagnoli)
|
||||
ErrMetadataConflict = errors.New("wal: conflicting metadata found")
|
||||
ErrFileNotFound = errors.New("wal: file not found")
|
||||
ErrCRCMismatch = errors.New("wal: crc mismatch")
|
||||
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
|
||||
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
|
||||
ErrSliceOutOfRange = errors.New("wal: slice bounds out of range")
|
||||
ErrMaxWALEntrySizeLimitExceeded = errors.New("wal: max entry size limit exceeded")
|
||||
ErrDecoderNotFound = errors.New("wal: decoder not found")
|
||||
crcTable = crc32.MakeTable(crc32.Castagnoli)
|
||||
)
|
||||
|
||||
// WAL is a logical representation of the stable storage.
|
||||
@@ -90,7 +92,8 @@ type WAL struct {
|
||||
}
|
||||
|
||||
// Create creates a WAL ready for appending records. The given metadata is
|
||||
// recorded at the head of each WAL file, and can be retrieved with ReadAll.
|
||||
// recorded at the head of each WAL file, and can be retrieved with ReadAll
|
||||
// after the file is Open.
|
||||
func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
if Exist(dirpath) {
|
||||
return nil, os.ErrExist
|
||||
@@ -147,9 +150,13 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
if perr != nil {
|
||||
return nil, perr
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if perr = fileutil.Fsync(pdir); perr != nil {
|
||||
return nil, perr
|
||||
}
|
||||
syncDurations.Observe(time.Since(start).Seconds())
|
||||
|
||||
if perr = pdir.Close(); err != nil {
|
||||
return nil, perr
|
||||
}
|
||||
@@ -257,6 +264,10 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
||||
defer w.mu.Unlock()
|
||||
|
||||
rec := &walpb.Record{}
|
||||
|
||||
if w.decoder == nil {
|
||||
return nil, state, nil, ErrDecoderNotFound
|
||||
}
|
||||
decoder := w.decoder
|
||||
|
||||
var match bool
|
||||
@@ -264,8 +275,15 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
||||
switch rec.Type {
|
||||
case entryType:
|
||||
e := mustUnmarshalEntry(rec.Data)
|
||||
// 0 <= e.Index-w.start.Index - 1 < len(ents)
|
||||
if e.Index > w.start.Index {
|
||||
ents = append(ents[:e.Index-w.start.Index-1], e)
|
||||
// prevent "panic: runtime error: slice bounds out of range [:13038096702221461992] with capacity 0"
|
||||
up := e.Index - w.start.Index - 1
|
||||
if up > uint64(len(ents)) {
|
||||
// return error before append call causes runtime panic
|
||||
return nil, state, nil, ErrSliceOutOfRange
|
||||
}
|
||||
ents = append(ents[:up], e)
|
||||
}
|
||||
w.enti = e.Index
|
||||
case stateType:
|
||||
@@ -409,9 +427,12 @@ func (w *WAL) cut() error {
|
||||
if err = os.Rename(newTail.Name(), fpath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if err = fileutil.Fsync(w.dirFile); err != nil {
|
||||
return err
|
||||
}
|
||||
syncDurations.Observe(time.Since(start).Seconds())
|
||||
|
||||
newTail.Close()
|
||||
|
||||
|
@@ -18,6 +18,7 @@ import (
|
||||
"bytes"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
@@ -524,6 +525,35 @@ func TestOpenForRead(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenWithMaxIndex(t *testing.T) {
|
||||
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(p)
|
||||
// create WAL
|
||||
w, err := Create(p, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
es := []raftpb.Entry{{Index: uint64(math.MaxInt64)}}
|
||||
if err = w.Save(raftpb.HardState{}, es); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.Close()
|
||||
|
||||
w, err = Open(p, walpb.Snapshot{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, _, _, err = w.ReadAll()
|
||||
if err == nil || err != ErrSliceOutOfRange {
|
||||
t.Fatalf("err = %v, want ErrSliceOutOfRange", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEmpty(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
var est raftpb.HardState
|
||||
@@ -793,3 +823,23 @@ func TestOpenOnTornWrite(t *testing.T) {
|
||||
t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents))
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadAllFail(t *testing.T) {
|
||||
dir, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
// create initial WAL
|
||||
f, err := Create(dir, []byte("metadata"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f.Close()
|
||||
// try to read without opening the WAL
|
||||
_, _, _, err = f.ReadAll()
|
||||
if err == nil || err != ErrDecoderNotFound {
|
||||
t.Fatalf("err = %v, want ErrDecoderNotFound", err)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user