functional/agent: handle "embed.Etcd", logger sync

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
release-3.4
Gyuho Lee 2018-04-14 20:14:03 -07:00
parent 575cf94852
commit 33128104c0
2 changed files with 94 additions and 47 deletions

View File

@ -25,6 +25,7 @@ import (
"syscall"
"time"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/functional/rpcpb"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/proxy"
@ -99,19 +100,21 @@ func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Respons
}
srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir))
if err = srv.createEtcdLogFile(); err != nil {
return nil, err
if srv.etcdServer == nil {
if err = srv.createEtcdLogFile(); err != nil {
return nil, err
}
}
srv.creatEtcdCmd(false)
if err = srv.creatEtcd(false); err != nil {
return nil, err
}
if err = srv.saveTLSAssets(); err != nil {
return nil, err
}
if err = srv.startEtcdCmd(); err != nil {
if err = srv.startEtcd(); err != nil {
return nil, err
}
srv.lg.Info("started etcd", zap.String("command-path", srv.etcdCmd.Path))
if err = srv.loadAutoTLSAssets(); err != nil {
return nil, err
}
@ -224,30 +227,47 @@ func (srv *Server) stopProxy() {
func (srv *Server) createEtcdLogFile() error {
var err error
srv.etcdLogFile, err = os.Create(srv.Member.EtcdLogPath)
srv.etcdLogFile, err = os.Create(srv.Member.Etcd.LogOutput)
if err != nil {
return err
}
srv.lg.Info("created etcd log file", zap.String("path", srv.Member.EtcdLogPath))
srv.lg.Info("created etcd log file", zap.String("path", srv.Member.Etcd.LogOutput))
return nil
}
func (srv *Server) creatEtcdCmd(fromSnapshot bool) {
etcdPath, etcdFlags := srv.Member.EtcdExec, srv.Member.Etcd.Flags()
if fromSnapshot {
etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags()
func (srv *Server) creatEtcd(fromSnapshot bool) error {
if !fileutil.Exist(srv.Member.EtcdExec) || srv.Member.EtcdExec != "embed" {
return fmt.Errorf("unknown etcd exec %q or path does not exist", srv.Member.EtcdExec)
}
u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
srv.lg.Info("creating etcd command",
zap.String("etcd-exec", etcdPath),
zap.Strings("etcd-flags", etcdFlags),
zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
zap.String("failpoint-addr", u.Host),
)
srv.etcdCmd = exec.Command(etcdPath, etcdFlags...)
srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host}
srv.etcdCmd.Stdout = srv.etcdLogFile
srv.etcdCmd.Stderr = srv.etcdLogFile
if fileutil.Exist(srv.Member.EtcdExec) && srv.Member.EtcdExec != "embed" {
etcdPath, etcdFlags := srv.Member.EtcdExec, srv.Member.Etcd.Flags()
if fromSnapshot {
etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags()
}
u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
srv.lg.Info("creating etcd command",
zap.String("etcd-exec", etcdPath),
zap.Strings("etcd-flags", etcdFlags),
zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
zap.String("failpoint-addr", u.Host),
)
srv.etcdCmd = exec.Command(etcdPath, etcdFlags...)
srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host}
srv.etcdCmd.Stdout = srv.etcdLogFile
srv.etcdCmd.Stderr = srv.etcdLogFile
} else if srv.Member.EtcdExec == "embed" {
cfg, err := srv.Member.Etcd.EmbedConfig()
if err != nil {
return err
}
srv.etcdServer, err = embed.StartEtcd(cfg)
if err != nil {
return err
}
// TODO: set up logging
}
return nil
}
// if started with manual TLS, stores TLS assets
@ -322,7 +342,6 @@ func (srv *Server) saveTLSAssets() error {
zap.String("client-trusted-ca", srv.Member.ClientTrustedCAPath),
)
}
return nil
}
@ -413,8 +432,22 @@ func (srv *Server) loadAutoTLSAssets() error {
}
// start but do not wait for it to complete
func (srv *Server) startEtcdCmd() error {
return srv.etcdCmd.Start()
func (srv *Server) startEtcd() error {
if srv.etcdCmd != nil {
srv.lg.Info(
"started etcd",
zap.String("command-path", srv.etcdCmd.Path),
)
return srv.etcdCmd.Start()
}
select {
case <-srv.etcdServer.Server.ReadyNotify():
srv.lg.Info("started embedded etcd")
case <-time.After(time.Minute):
srv.etcdServer.Close()
return fmt.Errorf("took too long to start %v", <-srv.etcdServer.Err())
}
return nil
}
func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
@ -426,15 +459,15 @@ func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
}
}
srv.creatEtcdCmd(false)
if err = srv.creatEtcd(false); err != nil {
return nil, err
}
if err = srv.saveTLSAssets(); err != nil {
return nil, err
}
if err = srv.startEtcdCmd(); err != nil {
if err = srv.startEtcd(); err != nil {
return nil, err
}
srv.lg.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path))
if err = srv.loadAutoTLSAssets(); err != nil {
return nil, err
}
@ -479,8 +512,12 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error
}
srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
if srv.etcdServer != nil {
srv.etcdServer.GetLogger().Sync()
} else {
srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
}
// for debugging purposes, rename instead of removing
if err = os.RemoveAll(srv.Member.BaseDir + ".backup"); err != nil {
@ -502,9 +539,6 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error
return nil, err
}
}
if err = srv.createEtcdLogFile(); err != nil {
return nil, err
}
return &rpcpb.Response{
Success: true,
@ -537,15 +571,15 @@ func (srv *Server) handle_RESTORE_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response,
}
func (srv *Server) handle_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err error) {
srv.creatEtcdCmd(true)
if err = srv.creatEtcd(true); err != nil {
return nil, err
}
if err = srv.saveTLSAssets(); err != nil {
return nil, err
}
if err = srv.startEtcdCmd(); err != nil {
if err = srv.startEtcd(); err != nil {
return nil, err
}
srv.lg.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path))
if err = srv.loadAutoTLSAssets(); err != nil {
return nil, err
}
@ -576,21 +610,27 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, erro
}
srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
if srv.etcdServer != nil {
srv.etcdServer.GetLogger().Sync()
} else {
srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
}
// TODO: support separate WAL directory
if err = archive(
srv.Member.BaseDir,
srv.Member.EtcdLogPath,
srv.Member.Etcd.LogOutput,
srv.Member.Etcd.DataDir,
); err != nil {
return nil, err
}
srv.lg.Info("archived data", zap.String("base-dir", srv.Member.BaseDir))
if err = srv.createEtcdLogFile(); err != nil {
return nil, err
if srv.etcdServer == nil {
if err = srv.createEtcdLogFile(); err != nil {
return nil, err
}
}
srv.lg.Info("cleaning up page cache")
@ -615,8 +655,12 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb.
}
srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
if srv.etcdServer != nil {
srv.etcdServer.GetLogger().Sync()
} else {
srv.etcdLogFile.Sync()
srv.etcdLogFile.Close()
}
err = os.RemoveAll(srv.Member.BaseDir)
if err != nil {

View File

@ -21,6 +21,7 @@ import (
"os/exec"
"strings"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/functional/rpcpb"
"github.com/coreos/etcd/pkg/proxy"
@ -33,8 +34,9 @@ import (
// no need to lock fields since request operations are
// serialized in tester-side
type Server struct {
lg *zap.Logger
grpcServer *grpc.Server
lg *zap.Logger
network string
address string
@ -46,6 +48,7 @@ type Server struct {
*rpcpb.Member
*rpcpb.Tester
etcdServer *embed.Etcd
etcdCmd *exec.Cmd
etcdLogFile *os.File