Merge pull request #1810 from xiang90/purge

*: support purging old wal/snap files
release-2.0
Xiang Li 2014-12-01 12:05:05 -08:00
commit 7beac083ff
8 changed files with 145 additions and 18 deletions

View File

@ -63,6 +63,8 @@ var (
snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
printVersion = fs.Bool("version", false, "Print the version and exit")
forceNewCluster = fs.Bool("force-new-cluster", false, "Force to create a new one member cluster")
maxSnapFiles = fs.Uint("max-snapshots", 5, "Maximum number of snapshot files to retain (0 is unlimited)")
maxWalFiles = fs.Uint("max-wals", 5, "Maximum number of wal files to retain (0 is unlimited)")
initialCluster = fs.String("initial-cluster", "default=http://localhost:2380,default=http://localhost:7001", "Initial cluster configuration for bootstrapping")
initialClusterToken = fs.String("initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during bootstrap")
@ -280,6 +282,8 @@ func startEtcd() (<-chan struct{}, error) {
PeerURLs: apurls,
DataDir: *dir,
SnapCount: *snapCount,
MaxSnapFiles: *maxSnapFiles,
MaxWALFiles: *maxWalFiles,
Cluster: cls,
DiscoveryURL: *durl,
DiscoveryProxy: *dproxy,

View File

@ -37,6 +37,8 @@ type ServerConfig struct {
PeerURLs types.URLs
DataDir string
SnapCount uint64
MaxSnapFiles uint
MaxWALFiles uint
Cluster *Cluster
NewCluster bool
ForceNewCluster bool

View File

@ -37,6 +37,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/migrate"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/pkg/wait"
@ -59,6 +60,8 @@ const (
StoreAdminPrefix = "/0"
StoreKeysPrefix = "/1"
purgeFileInterval = 30 * time.Second
)
var (
@ -157,6 +160,7 @@ type RaftTimer interface {
// EtcdServer is the production implementation of the Server interface
type EtcdServer struct {
cfg *ServerConfig
w wait.Wait
done chan struct{}
stop chan struct{}
@ -301,6 +305,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
lstats := stats.NewLeaderStats(id.String())
srv := &EtcdServer{
cfg: cfg,
store: st,
node: n,
raftStorage: s,
@ -327,6 +332,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
func (s *EtcdServer) Start() {
s.start()
go s.publish(defaultPublishRetryInterval)
go s.purgeFile()
}
// start prepares and starts server in a new goroutine. It is no longer safe to
@ -346,6 +352,24 @@ func (s *EtcdServer) start() {
go s.run()
}
func (s *EtcdServer) purgeFile() {
var serrc, werrc <-chan error
if s.cfg.MaxSnapFiles > 0 {
serrc = fileutil.PurgeFile(s.cfg.SnapDir(), "snap", s.cfg.MaxSnapFiles, purgeFileInterval, s.done)
}
if s.cfg.MaxWALFiles > 0 {
werrc = fileutil.PurgeFile(s.cfg.WALDir(), "wal", s.cfg.MaxWALFiles, purgeFileInterval, s.done)
}
select {
case e := <-werrc:
log.Fatalf("etcdserver: failed to purge wal file %v", e)
case e := <-serrc:
log.Fatalf("etcdserver: failed to purge snap file %v", e)
case <-s.done:
return
}
}
func (s *EtcdServer) ID() types.ID { return s.id }
func (s *EtcdServer) SenderFinder() rafthttp.SenderFinder { return s.sendhub }

View File

@ -35,3 +35,17 @@ func IsDirWriteable(dir string) error {
}
return os.Remove(f)
}
// ReadDir returns the filenames in the given directory.
func ReadDir(dirpath string) ([]string, error) {
dir, err := os.Open(dirpath)
if err != nil {
return nil, err
}
defer dir.Close()
names, err := dir.Readdirnames(-1)
if err != nil {
return nil, err
}
return names, nil
}

46
pkg/fileutil/purge.go Normal file
View File

@ -0,0 +1,46 @@
package fileutil
import (
"log"
"os"
"path"
"sort"
"strings"
"time"
)
func PurgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) <-chan error {
errC := make(chan error, 1)
go func() {
for {
fnames, err := ReadDir(dirname)
if err != nil {
errC <- err
return
}
newfnames := make([]string, 0)
for _, fname := range fnames {
if strings.HasSuffix(fname, suffix) {
newfnames = append(newfnames, fname)
}
}
sort.Strings(newfnames)
for len(newfnames) > int(max) {
f := path.Join(dirname, newfnames[0])
err := os.Remove(f)
if err != nil {
errC <- err
return
}
log.Printf("filePurge: successfully remvoed file %s", f)
newfnames = newfnames[1:]
}
select {
case <-time.After(interval):
case <-stop:
return
}
}
}()
return errC
}

View File

@ -0,0 +1,50 @@
package fileutil
import (
"fmt"
"io/ioutil"
"os"
"path"
"reflect"
"testing"
"time"
)
func TestPurgeFile(t *testing.T) {
dir, err := ioutil.TempDir("", "purgefile")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
for i := 0; i < 5; i++ {
_, err := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
if err != nil {
t.Fatal(err)
}
}
stop := make(chan struct{})
errch := PurgeFile(dir, "test", 3, time.Millisecond, stop)
for i := 5; i < 10; i++ {
_, err := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond)
}
fnames, err := ReadDir(dir)
if err != nil {
t.Fatal(err)
}
wnames := []string{"7.test", "8.test", "9.test"}
if !reflect.DeepEqual(fnames, wnames) {
t.Errorf("filenames = %v, want %v", fnames, wnames)
}
select {
case err := <-errch:
t.Errorf("unexpected purge error %v", err)
case <-time.After(time.Millisecond):
}
close(stop)
}

View File

@ -19,9 +19,9 @@ package wal
import (
"fmt"
"log"
"os"
"path"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/types"
)
@ -36,7 +36,7 @@ const (
)
func DetectVersion(dirpath string) WalVersion {
names, err := readDir(dirpath)
names, err := fileutil.ReadDir(dirpath)
if err != nil || len(names) == 0 {
return WALNotExist
}
@ -56,7 +56,7 @@ func DetectVersion(dirpath string) WalVersion {
}
func Exist(dirpath string) bool {
names, err := readDir(dirpath)
names, err := fileutil.ReadDir(dirpath)
if err != nil {
return false
}
@ -97,20 +97,6 @@ func isValidSeq(names []string) bool {
return true
}
// readDir returns the filenames in wal directory.
func readDir(dirpath string) ([]string, error) {
dir, err := os.Open(dirpath)
if err != nil {
return nil, err
}
defer dir.Close()
names, err := dir.Readdirnames(-1)
if err != nil {
return nil, err
}
return names, nil
}
func checkWalNames(names []string) []string {
wnames := make([]string, 0)
for _, name := range names {

View File

@ -26,6 +26,7 @@ import (
"reflect"
"sort"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
@ -110,7 +111,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
// index. The WAL cannot be appended to before reading out all of its
// previous records.
func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
names, err := readDir(dirpath)
names, err := fileutil.ReadDir(dirpath)
if err != nil {
return nil, err
}