Add --raw mode to ./etcd-dump-log
This mode allows to look at RAW protos for all entries in WAL logs in the given directory. Signed-off-by: Piotr Tabor <ptab@google.com>dependabot/go_modules/go.uber.org/atomic-1.10.0
parent
58681d3feb
commit
e571fb7baa
|
@ -54,17 +54,22 @@ type decoder struct {
|
|||
continueOnCrcError bool
|
||||
}
|
||||
|
||||
func NewDecoder(r ...fileutil.FileReader) Decoder {
|
||||
func NewDecoderAdvanced(continueOnCrcError bool, r ...fileutil.FileReader) Decoder {
|
||||
readers := make([]*fileutil.FileBufReader, len(r))
|
||||
for i := range r {
|
||||
readers[i] = fileutil.NewFileBufReader(r[i])
|
||||
}
|
||||
return &decoder{
|
||||
brs: readers,
|
||||
crc: crc.New(0, crcTable),
|
||||
brs: readers,
|
||||
crc: crc.New(0, crcTable),
|
||||
continueOnCrcError: continueOnCrcError,
|
||||
}
|
||||
}
|
||||
|
||||
func NewDecoder(r ...fileutil.FileReader) Decoder {
|
||||
return NewDecoderAdvanced(false, r...)
|
||||
}
|
||||
|
||||
// Decode reads the next record out of the file.
|
||||
// In the success path, fills 'rec' and returns nil.
|
||||
// When it fails, it returns err and usually resets 'rec' to the defaults.
|
||||
|
|
|
@ -16,12 +16,12 @@ package wal
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
|
||||
|
|
|
@ -52,38 +52,7 @@ func TestEtcdDumpLogEntryType(t *testing.T) {
|
|||
|
||||
p := t.TempDir()
|
||||
|
||||
memberdir := filepath.Join(p, "member")
|
||||
err = os.Mkdir(memberdir, 0744)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
waldir := walDir(p)
|
||||
snapdir := snapDir(p)
|
||||
|
||||
w, err := wal.Create(zaptest.NewLogger(t), waldir, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = os.Mkdir(snapdir, 0744)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ents := make([]raftpb.Entry, 0)
|
||||
|
||||
// append entries into wal log
|
||||
appendConfigChangeEnts(&ents)
|
||||
appendNormalRequestEnts(&ents)
|
||||
appendNormalIRREnts(&ents)
|
||||
appendUnknownNormalEnts(&ents)
|
||||
|
||||
// force commit newly appended entries
|
||||
err = w.Save(raftpb.HardState{}, ents)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.Close()
|
||||
mustCreateWalLog(t, p)
|
||||
|
||||
argtests := []struct {
|
||||
name string
|
||||
|
@ -128,6 +97,41 @@ func TestEtcdDumpLogEntryType(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func mustCreateWalLog(t *testing.T, path string) {
|
||||
memberdir := filepath.Join(path, "member")
|
||||
err := os.Mkdir(memberdir, 0744)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
waldir := walDir(path)
|
||||
snapdir := snapDir(path)
|
||||
|
||||
w, err := wal.Create(zaptest.NewLogger(t), waldir, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = os.Mkdir(snapdir, 0744)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ents := make([]raftpb.Entry, 0)
|
||||
|
||||
// append entries into wal log
|
||||
appendConfigChangeEnts(&ents)
|
||||
appendNormalRequestEnts(&ents)
|
||||
appendNormalIRREnts(&ents)
|
||||
appendUnknownNormalEnts(&ents)
|
||||
|
||||
// force commit newly appended entries
|
||||
err = w.Save(raftpb.HardState{}, ents)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.Close()
|
||||
}
|
||||
|
||||
func appendConfigChangeEnts(ents *[]raftpb.Entry) {
|
||||
configChangeData := []raftpb.ConfChange{
|
||||
{ID: 1, Type: raftpb.ConfChangeAddNode, NodeID: 2, Context: []byte("")},
|
||||
|
|
|
@ -56,8 +56,10 @@ IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke, IRRLeaseCheckpoint`)
|
|||
streamdecoder := flag.String("stream-decoder", "", `The name of an executable decoding tool, the executable must process
|
||||
hex encoded lines of binary input (from etcd-dump-logs)
|
||||
and output a hex encoded line of binary for each input line`)
|
||||
raw := flag.Bool("raw", false, "Read the logs in the low-level form")
|
||||
|
||||
flag.Parse()
|
||||
lg := zap.NewExample()
|
||||
|
||||
if len(flag.Args()) != 1 {
|
||||
log.Fatalf("Must provide data-dir argument (got %+v)", flag.Args())
|
||||
|
@ -68,6 +70,37 @@ and output a hex encoded line of binary for each input line`)
|
|||
log.Fatal("start-snap and start-index flags cannot be used together.")
|
||||
}
|
||||
|
||||
if !*raw {
|
||||
ents := readUsingReadAll(lg, index, snapfile, dataDir, waldir)
|
||||
|
||||
fmt.Printf("WAL entries: %d\n", len(ents))
|
||||
if len(ents) > 0 {
|
||||
fmt.Printf("lastIndex=%d\n", ents[len(ents)-1].Index)
|
||||
}
|
||||
|
||||
fmt.Printf("%4s\t%10s\ttype\tdata", "term", "index")
|
||||
if *streamdecoder != "" {
|
||||
fmt.Print("\tdecoder_status\tdecoded_data")
|
||||
}
|
||||
fmt.Println()
|
||||
|
||||
listEntriesType(*entrytype, *streamdecoder, ents)
|
||||
} else {
|
||||
if *snapfile != "" ||
|
||||
*entrytype != defaultEntryTypes ||
|
||||
*streamdecoder != "" {
|
||||
log.Fatalf("Flags --entry-type, --stream-decoder, --entrytype not supported in the RAW mode.")
|
||||
}
|
||||
|
||||
wd := *waldir
|
||||
if wd == "" {
|
||||
wd = walDir(dataDir)
|
||||
}
|
||||
readRaw(lg, index, wd, os.Stdout)
|
||||
}
|
||||
}
|
||||
|
||||
func readUsingReadAll(lg *zap.Logger, index *uint64, snapfile *string, dataDir string, waldir *string) []raftpb.Entry {
|
||||
var (
|
||||
walsnap walpb.Snapshot
|
||||
snapshot *raftpb.Snapshot
|
||||
|
@ -84,7 +117,7 @@ and output a hex encoded line of binary for each input line`)
|
|||
ss := snap.New(zap.NewExample(), snapDir(dataDir))
|
||||
snapshot, err = ss.Load()
|
||||
} else {
|
||||
snapshot, err = snap.Read(zap.NewExample(), filepath.Join(snapDir(dataDir), *snapfile))
|
||||
snapshot, err = snap.Read(lg, filepath.Join(snapDir(dataDir), *snapfile))
|
||||
}
|
||||
|
||||
switch err {
|
||||
|
@ -123,19 +156,7 @@ and output a hex encoded line of binary for each input line`)
|
|||
vid := types.ID(state.Vote)
|
||||
fmt.Printf("WAL metadata:\nnodeID=%s clusterID=%s term=%d commitIndex=%d vote=%s\n",
|
||||
id, cid, state.Term, state.Commit, vid)
|
||||
|
||||
fmt.Printf("WAL entries: %d\n", len(ents))
|
||||
if len(ents) > 0 {
|
||||
fmt.Printf("lastIndex=%d\n", ents[len(ents)-1].Index)
|
||||
}
|
||||
|
||||
fmt.Printf("%4s\t%10s\ttype\tdata", "term", "index")
|
||||
if *streamdecoder != "" {
|
||||
fmt.Print("\tdecoder_status\tdecoded_data")
|
||||
}
|
||||
fmt.Println()
|
||||
|
||||
listEntriesType(*entrytype, *streamdecoder, ents)
|
||||
return ents
|
||||
}
|
||||
|
||||
func walDir(dataDir string) string { return filepath.Join(dataDir, "member", "wal") }
|
||||
|
@ -360,7 +381,7 @@ func listEntriesType(entrytype string, streamdecoder string, ents []raftpb.Entry
|
|||
printer(e)
|
||||
if streamdecoder == "" {
|
||||
fmt.Println()
|
||||
continue
|
||||
//continue
|
||||
}
|
||||
|
||||
// if decoder is set, pass the e.Data to stdin and read the stdout from decoder
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
||||
"go.etcd.io/etcd/pkg/v3/pbutil"
|
||||
"go.etcd.io/etcd/server/v3/storage/wal"
|
||||
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
|
||||
"go.etcd.io/raft/v3/raftpb"
|
||||
)
|
||||
|
||||
func readRaw(lg *zap.Logger, fromIndex *uint64, waldir string, out io.Writer) {
|
||||
var walReaders []fileutil.FileReader
|
||||
files, err := ioutil.ReadDir(waldir)
|
||||
if err != nil {
|
||||
lg.Fatal("Failed to read directory.", zap.String("directory", waldir), zap.Error(err))
|
||||
}
|
||||
for _, finfo := range files {
|
||||
if filepath.Ext(finfo.Name()) != ".wal" {
|
||||
lg.Warn("Ignoring not .wal file", zap.String("filename", finfo.Name()))
|
||||
}
|
||||
f, err := os.Open(filepath.Join(waldir, finfo.Name()))
|
||||
if err != nil {
|
||||
lg.Fatal("Failed to read file", zap.String("filename", finfo.Name()), zap.Error(err))
|
||||
}
|
||||
walReaders = append(walReaders, fileutil.NewFileReader(f))
|
||||
}
|
||||
decoder := wal.NewDecoderAdvanced(true, walReaders...)
|
||||
// The variable is used to not pollute log with multiple continuous crc errors.
|
||||
crcDesync := false
|
||||
for {
|
||||
rec := walpb.Record{}
|
||||
err := decoder.Decode(&rec)
|
||||
if err == nil || errors.Is(err, walpb.ErrCRCMismatch) {
|
||||
if err != nil && !crcDesync {
|
||||
lg.Warn("Reading entry failed with CRC error", zap.Error(err))
|
||||
crcDesync = true
|
||||
}
|
||||
printRec(lg, &rec, fromIndex, out)
|
||||
if rec.Type == wal.CrcType {
|
||||
decoder.UpdateCRC(rec.Crc)
|
||||
crcDesync = false
|
||||
}
|
||||
continue
|
||||
}
|
||||
if errors.Is(err, io.EOF) {
|
||||
lg.Info("EOF: All entries were processed")
|
||||
break
|
||||
} else {
|
||||
lg.Error("Reading failed", zap.Error(err))
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func printRec(lg *zap.Logger, rec *walpb.Record, fromIndex *uint64, out io.Writer) {
|
||||
switch rec.Type {
|
||||
case wal.MetadataType:
|
||||
var metadata etcdserverpb.Metadata
|
||||
pbutil.MustUnmarshal(&metadata, rec.Data)
|
||||
fmt.Fprintf(out, "Metadata: %s\n", metadata.String())
|
||||
case wal.CrcType:
|
||||
fmt.Fprintf(out, "CRC: %d\n", rec.Crc)
|
||||
case wal.EntryType:
|
||||
e := wal.MustUnmarshalEntry(rec.Data)
|
||||
if fromIndex == nil || e.Index >= *fromIndex {
|
||||
fmt.Fprintf(out, "Entry: %s\n", e.String())
|
||||
}
|
||||
case wal.SnapshotType:
|
||||
var snap walpb.Snapshot
|
||||
pbutil.MustUnmarshal(&snap, rec.Data)
|
||||
if fromIndex == nil || snap.Index >= *fromIndex {
|
||||
fmt.Fprintf(out, "Snapshot: %s\n", snap.String())
|
||||
}
|
||||
case wal.StateType:
|
||||
var state raftpb.HardState
|
||||
pbutil.MustUnmarshal(&state, rec.Data)
|
||||
if fromIndex == nil || state.Commit >= *fromIndex {
|
||||
fmt.Fprintf(out, "HardState: %s\n", state.String())
|
||||
}
|
||||
default:
|
||||
lg.Error("Unexpected WAL log type", zap.Int64("type", rec.Type))
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func Test_readRaw(t *testing.T) {
|
||||
path := t.TempDir()
|
||||
mustCreateWalLog(t, path)
|
||||
var out bytes.Buffer
|
||||
readRaw(zaptest.NewLogger(t), nil, walDir(path), &out)
|
||||
assert.Equal(t,
|
||||
`CRC: 0
|
||||
Metadata:
|
||||
Snapshot:
|
||||
Entry: Term:1 Index:1 Type:EntryConfChange Data:"\010\001\020\000\030\002\"\000"
|
||||
Entry: Term:2 Index:2 Type:EntryConfChange Data:"\010\002\020\001\030\002\"\000"
|
||||
Entry: Term:2 Index:3 Type:EntryConfChange Data:"\010\003\020\002\030\002\"\000"
|
||||
Entry: Term:2 Index:4 Type:EntryConfChange Data:"\010\004\020\003\030\003\"\000"
|
||||
Entry: Term:3 Index:5 Data:"\010\000\022\000\032\006/path0\"\030{\"hey\":\"ho\",\"hi\":[\"yo\"]}(\0012\0008\000@\000H\tP\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000"
|
||||
Entry: Term:3 Index:6 Data:"\010\001\022\004QGET\032\006/path1\"\023{\"0\":\"1\",\"2\":[\"3\"]}(\0002\0008\000@\000H\tP\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000"
|
||||
Entry: Term:3 Index:7 Data:"\010\002\022\004SYNC\032\006/path2\"\023{\"0\":\"1\",\"2\":[\"3\"]}(\0002\0008\000@\000H\002P\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000"
|
||||
Entry: Term:3 Index:8 Data:"\010\003\022\006DELETE\032\006/path3\"\030{\"hey\":\"ho\",\"hi\":[\"yo\"]}(\0002\0008\000@\001H\002P\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000"
|
||||
Entry: Term:3 Index:9 Data:"\010\004\022\006RANDOM\032\246\001/path4/superlong/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path\"\030{\"hey\":\"ho\",\"hi\":[\"yo\"]}(\0002\0008\000@\000H\002P\000X\001`+"`"+`\000h\000p\000x\001\200\001\000\210\001\000"
|
||||
Entry: Term:4 Index:10 Data:"\010\005\032\025\n\0011\022\002hi\030\006 \001(\001X\240\234\001h\240\234\001"
|
||||
Entry: Term:5 Index:11 Data:"\010\006\"\020\n\004foo1\022\004bar1\030\0010\001"
|
||||
Entry: Term:6 Index:12 Data:"\010\007*\010\n\0010\022\0019\030\001"
|
||||
Entry: Term:7 Index:13 Data:"\010\0102\024\022\010\032\006\n\001a\022\001b\032\010\032\006\n\001a\022\001b"
|
||||
Entry: Term:8 Index:14 Data:"\010\t:\002\020\001"
|
||||
Entry: Term:9 Index:15 Data:"\010\nB\004\010\001\020\001"
|
||||
Entry: Term:10 Index:16 Data:"\010\013J\002\010\002"
|
||||
Entry: Term:11 Index:17 Data:"\010\014R\006\010\003\020\004\030\005"
|
||||
Entry: Term:12 Index:18 Data:"\010\r\302>\000"
|
||||
Entry: Term:13 Index:19 Data:"\010\016\232?\000"
|
||||
Entry: Term:14 Index:20 Data:"\010\017\242?\031\n\006myname\022\010password\032\005token"
|
||||
Entry: Term:15 Index:21 Data:"\010\020\342D\020\n\005name1\022\005pass1\032\000"
|
||||
Entry: Term:16 Index:22 Data:"\010\021\352D\007\n\005name1"
|
||||
Entry: Term:17 Index:23 Data:"\010\022\362D\007\n\005name1"
|
||||
Entry: Term:18 Index:24 Data:"\010\023\372D\016\n\005name1\022\005pass2"
|
||||
Entry: Term:19 Index:25 Data:"\010\024\202E\016\n\005user1\022\005role1"
|
||||
Entry: Term:20 Index:26 Data:"\010\025\212E\016\n\005user2\022\005role2"
|
||||
Entry: Term:21 Index:27 Data:"\010\026\222E\000"
|
||||
Entry: Term:22 Index:28 Data:"\010\027\232E\000"
|
||||
Entry: Term:23 Index:29 Data:"\010\030\202K\007\n\005role2"
|
||||
Entry: Term:24 Index:30 Data:"\010\031\212K\007\n\005role1"
|
||||
Entry: Term:25 Index:31 Data:"\010\032\222K\007\n\005role3"
|
||||
Entry: Term:26 Index:32 Data:"\010\033\232K\033\n\005role3\022\022\010\001\022\004Keys\032\010RangeEnd"
|
||||
Entry: Term:27 Index:33 Data:"\010\034\242K\026\n\005role3\022\003key\032\010rangeend"
|
||||
Entry: Term:27 Index:34 Data:"?"
|
||||
`, out.String())
|
||||
}
|
Loading…
Reference in New Issue