diff --git a/functional/agent/handler.go b/functional/agent/handler.go index d30e8d4c4..0c16a14ad 100644 --- a/functional/agent/handler.go +++ b/functional/agent/handler.go @@ -25,6 +25,7 @@ import ( "syscall" "time" + "github.com/coreos/etcd/embed" "github.com/coreos/etcd/functional/rpcpb" "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/proxy" @@ -99,19 +100,21 @@ func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Respons } srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir)) - if err = srv.createEtcdLogFile(); err != nil { - return nil, err + if srv.etcdServer == nil { + if err = srv.createEtcdLogFile(); err != nil { + return nil, err + } } - srv.creatEtcdCmd(false) - + if err = srv.creatEtcd(false); err != nil { + return nil, err + } if err = srv.saveTLSAssets(); err != nil { return nil, err } - if err = srv.startEtcdCmd(); err != nil { + if err = srv.startEtcd(); err != nil { return nil, err } - srv.lg.Info("started etcd", zap.String("command-path", srv.etcdCmd.Path)) if err = srv.loadAutoTLSAssets(); err != nil { return nil, err } @@ -224,30 +227,47 @@ func (srv *Server) stopProxy() { func (srv *Server) createEtcdLogFile() error { var err error - srv.etcdLogFile, err = os.Create(srv.Member.EtcdLogPath) + srv.etcdLogFile, err = os.Create(srv.Member.Etcd.LogOutput) if err != nil { return err } - srv.lg.Info("created etcd log file", zap.String("path", srv.Member.EtcdLogPath)) + srv.lg.Info("created etcd log file", zap.String("path", srv.Member.Etcd.LogOutput)) return nil } -func (srv *Server) creatEtcdCmd(fromSnapshot bool) { - etcdPath, etcdFlags := srv.Member.EtcdExec, srv.Member.Etcd.Flags() - if fromSnapshot { - etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags() +func (srv *Server) creatEtcd(fromSnapshot bool) error { + if !fileutil.Exist(srv.Member.EtcdExec) || srv.Member.EtcdExec != "embed" { + return fmt.Errorf("unknown etcd exec %q or path does not exist", srv.Member.EtcdExec) } - u, _ := url.Parse(srv.Member.FailpointHTTPAddr) - srv.lg.Info("creating etcd command", - zap.String("etcd-exec", etcdPath), - zap.Strings("etcd-flags", etcdFlags), - zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr), - zap.String("failpoint-addr", u.Host), - ) - srv.etcdCmd = exec.Command(etcdPath, etcdFlags...) - srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host} - srv.etcdCmd.Stdout = srv.etcdLogFile - srv.etcdCmd.Stderr = srv.etcdLogFile + + if fileutil.Exist(srv.Member.EtcdExec) && srv.Member.EtcdExec != "embed" { + etcdPath, etcdFlags := srv.Member.EtcdExec, srv.Member.Etcd.Flags() + if fromSnapshot { + etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags() + } + u, _ := url.Parse(srv.Member.FailpointHTTPAddr) + srv.lg.Info("creating etcd command", + zap.String("etcd-exec", etcdPath), + zap.Strings("etcd-flags", etcdFlags), + zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr), + zap.String("failpoint-addr", u.Host), + ) + srv.etcdCmd = exec.Command(etcdPath, etcdFlags...) + srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host} + srv.etcdCmd.Stdout = srv.etcdLogFile + srv.etcdCmd.Stderr = srv.etcdLogFile + } else if srv.Member.EtcdExec == "embed" { + cfg, err := srv.Member.Etcd.EmbedConfig() + if err != nil { + return err + } + srv.etcdServer, err = embed.StartEtcd(cfg) + if err != nil { + return err + } + // TODO: set up logging + } + return nil } // if started with manual TLS, stores TLS assets @@ -322,7 +342,6 @@ func (srv *Server) saveTLSAssets() error { zap.String("client-trusted-ca", srv.Member.ClientTrustedCAPath), ) } - return nil } @@ -413,8 +432,22 @@ func (srv *Server) loadAutoTLSAssets() error { } // start but do not wait for it to complete -func (srv *Server) startEtcdCmd() error { - return srv.etcdCmd.Start() +func (srv *Server) startEtcd() error { + if srv.etcdCmd != nil { + srv.lg.Info( + "started etcd", + zap.String("command-path", srv.etcdCmd.Path), + ) + return srv.etcdCmd.Start() + } + select { + case <-srv.etcdServer.Server.ReadyNotify(): + srv.lg.Info("started embedded etcd") + case <-time.After(time.Minute): + srv.etcdServer.Close() + return fmt.Errorf("took too long to start %v", <-srv.etcdServer.Err()) + } + return nil } func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) { @@ -426,15 +459,15 @@ func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) { } } - srv.creatEtcdCmd(false) - + if err = srv.creatEtcd(false); err != nil { + return nil, err + } if err = srv.saveTLSAssets(); err != nil { return nil, err } - if err = srv.startEtcdCmd(); err != nil { + if err = srv.startEtcd(); err != nil { return nil, err } - srv.lg.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path)) if err = srv.loadAutoTLSAssets(); err != nil { return nil, err } @@ -479,8 +512,12 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error } srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String())) - srv.etcdLogFile.Sync() - srv.etcdLogFile.Close() + if srv.etcdServer != nil { + srv.etcdServer.GetLogger().Sync() + } else { + srv.etcdLogFile.Sync() + srv.etcdLogFile.Close() + } // for debugging purposes, rename instead of removing if err = os.RemoveAll(srv.Member.BaseDir + ".backup"); err != nil { @@ -502,9 +539,6 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error return nil, err } } - if err = srv.createEtcdLogFile(); err != nil { - return nil, err - } return &rpcpb.Response{ Success: true, @@ -537,15 +571,15 @@ func (srv *Server) handle_RESTORE_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, } func (srv *Server) handle_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err error) { - srv.creatEtcdCmd(true) - + if err = srv.creatEtcd(true); err != nil { + return nil, err + } if err = srv.saveTLSAssets(); err != nil { return nil, err } - if err = srv.startEtcdCmd(); err != nil { + if err = srv.startEtcd(); err != nil { return nil, err } - srv.lg.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path)) if err = srv.loadAutoTLSAssets(); err != nil { return nil, err } @@ -576,21 +610,27 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, erro } srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String())) - srv.etcdLogFile.Sync() - srv.etcdLogFile.Close() + if srv.etcdServer != nil { + srv.etcdServer.GetLogger().Sync() + } else { + srv.etcdLogFile.Sync() + srv.etcdLogFile.Close() + } // TODO: support separate WAL directory if err = archive( srv.Member.BaseDir, - srv.Member.EtcdLogPath, + srv.Member.Etcd.LogOutput, srv.Member.Etcd.DataDir, ); err != nil { return nil, err } srv.lg.Info("archived data", zap.String("base-dir", srv.Member.BaseDir)) - if err = srv.createEtcdLogFile(); err != nil { - return nil, err + if srv.etcdServer == nil { + if err = srv.createEtcdLogFile(); err != nil { + return nil, err + } } srv.lg.Info("cleaning up page cache") @@ -615,8 +655,12 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb. } srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String())) - srv.etcdLogFile.Sync() - srv.etcdLogFile.Close() + if srv.etcdServer != nil { + srv.etcdServer.GetLogger().Sync() + } else { + srv.etcdLogFile.Sync() + srv.etcdLogFile.Close() + } err = os.RemoveAll(srv.Member.BaseDir) if err != nil { diff --git a/functional/agent/server.go b/functional/agent/server.go index d6313d955..7f8ec98b5 100644 --- a/functional/agent/server.go +++ b/functional/agent/server.go @@ -21,6 +21,7 @@ import ( "os/exec" "strings" + "github.com/coreos/etcd/embed" "github.com/coreos/etcd/functional/rpcpb" "github.com/coreos/etcd/pkg/proxy" @@ -33,8 +34,9 @@ import ( // no need to lock fields since request operations are // serialized in tester-side type Server struct { + lg *zap.Logger + grpcServer *grpc.Server - lg *zap.Logger network string address string @@ -46,6 +48,7 @@ type Server struct { *rpcpb.Member *rpcpb.Tester + etcdServer *embed.Etcd etcdCmd *exec.Cmd etcdLogFile *os.File