etcdserver: support structured logging

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
release-3.4
Gyuho Lee 2018-04-15 22:40:44 -07:00
parent f879c1de33
commit bdbed26f64
26 changed files with 1804 additions and 356 deletions
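
Every file in this change follows the same dual-path pattern: each component carries an optional *zap.Logger, emits structured fields when it is non-nil, and falls back to the existing capnslog-based plog logger otherwise. A minimal, self-contained sketch of that pattern, with an illustrative type and message that are not part of this commit:

package main

import (
	"go.uber.org/zap"

	"github.com/coreos/pkg/capnslog"
)

var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "example")

// exampleServer mirrors the shape used throughout this commit:
// an optional structured logger plus the legacy plog fallback.
type exampleServer struct {
	lg *zap.Logger // nil when structured logging is not configured
}

func (s *exampleServer) logStarted(memberID string) {
	if s.lg != nil {
		// structured path: typed key/value fields
		s.lg.Info("server started", zap.String("member-id", memberID))
	} else {
		// legacy path: formatted capnslog message
		plog.Infof("server %s started", memberID)
	}
}

func main() {
	(&exampleServer{lg: zap.NewExample()}).logStarted("8e9e05c52164694d")
	(&exampleServer{}).logStarted("8e9e05c52164694d") // nil logger falls back to plog
}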

View File

@ -18,6 +18,7 @@ import (
"sync"
"github.com/coreos/etcd/version"
"go.uber.org/zap"
"github.com/coreos/go-semver/semver"
"github.com/coreos/pkg/capnslog"
@ -56,7 +57,7 @@ func init() {
}
// UpdateCapability updates the enabledMap when the cluster version increases.
func UpdateCapability(v *semver.Version) {
func UpdateCapability(lg *zap.Logger, v *semver.Version) {
if v == nil {
// if recovered but version was never set by cluster
return
@ -69,7 +70,15 @@ func UpdateCapability(v *semver.Version) {
curVersion = v
enabledMap = capabilityMaps[curVersion.String()]
enableMapMu.Unlock()
plog.Infof("enabled capabilities for version %s", version.Cluster(v.String()))
if lg != nil {
lg.Info(
"enabled capabilities for version",
zap.String("cluster-version", version.Cluster(v.String())),
)
} else {
plog.Infof("enabled capabilities for version %s", version.Cluster(v.String()))
}
}
func IsCapabilityEnabled(c Capability) bool {
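
UpdateCapability now takes the server logger as its first argument. A hedged usage sketch from a caller's point of view; the import path is assumed from this repository layout and the version string is illustrative:

package main

import (
	"github.com/coreos/etcd/etcdserver/api"

	"github.com/coreos/go-semver/semver"
	"go.uber.org/zap"
)

func main() {
	lg := zap.NewExample()
	// a nil version is tolerated: the function returns early
	api.UpdateCapability(lg, nil)
	// with a real version, the enabled-capabilities message is logged through lg
	api.UpdateCapability(lg, semver.Must(semver.NewVersion("3.4.0")))
}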

View File

@ -27,6 +27,7 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
)
type fakeStats struct{}
@ -36,12 +37,13 @@ func (s *fakeStats) LeaderStats() []byte { return nil }
func (s *fakeStats) StoreStats() []byte { return nil }
type v2v3Server struct {
lg *zap.Logger
c *clientv3.Client
store *v2v3Store
fakeStats
}
func NewServer(c *clientv3.Client, pfx string) etcdserver.ServerPeer {
func NewServer(lg *zap.Logger, c *clientv3.Client, pfx string) etcdserver.ServerPeer {
return &v2v3Server{lg: lg, c: c, store: newStore(c, pfx)}
}
@ -106,7 +108,7 @@ func (s *v2v3Server) Cluster() api.Cluster { return s }
func (s *v2v3Server) Alarms() []*pb.AlarmMember { return nil }
func (s *v2v3Server) Do(ctx context.Context, r pb.Request) (etcdserver.Response, error) {
applier := etcdserver.NewApplierV2(s.store, nil)
applier := etcdserver.NewApplierV2(s.lg, s.store, nil)
reqHandler := etcdserver.NewStoreRequestV2Handler(s.store, applier)
req := (*etcdserver.RequestV2)(&r)
resp, err := req.Handle(ctx, reqHandler)

View File

@ -22,15 +22,18 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
"go.uber.org/zap"
)
type LeaseServer struct {
lg *zap.Logger
hdr header
le etcdserver.Lessor
}
func NewLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
return &LeaseServer{le: s, hdr: newHeader(s)}
return &LeaseServer{lg: s.Cfg.Logger, le: s, hdr: newHeader(s)}
}
func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
@ -108,9 +111,17 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
}
if err != nil {
if isClientCtxErr(stream.Context().Err(), err) {
plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
if ls.lg != nil {
ls.lg.Debug("failed to receive lease keepalive request from gRPC stream", zap.Error(err))
} else {
plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
}
} else {
plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
if ls.lg != nil {
ls.lg.Warn("failed to receive lease keepalive request from gRPC stream", zap.Error(err))
} else {
plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
}
}
return err
}
@ -138,9 +149,17 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
err = stream.Send(resp)
if err != nil {
if isClientCtxErr(stream.Context().Err(), err) {
plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
if ls.lg != nil {
ls.lg.Debug("failed to send lease keepalive response to gRPC stream", zap.Error(err))
} else {
plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
}
} else {
plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
if ls.lg != nil {
ls.lg.Warn("failed to send lease keepalive response to gRPC stream", zap.Error(err))
} else {
plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
}
}
return err
}

View File

@ -27,6 +27,8 @@ import (
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/version"
"go.uber.org/zap"
)
type KVGetter interface {
@ -54,6 +56,7 @@ type AuthGetter interface {
}
type maintenanceServer struct {
lg *zap.Logger
rg etcdserver.RaftStatusGetter
kg KVGetter
bg BackendGetter
@ -63,18 +66,30 @@ type maintenanceServer struct {
}
func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
srv := &maintenanceServer{rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s)}
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s)}
return &authMaintenanceServer{srv, s}
}
func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
plog.Noticef("starting to defragment the storage backend...")
if ms.lg != nil {
ms.lg.Info("starting defragment")
} else {
plog.Noticef("starting to defragment the storage backend...")
}
err := ms.bg.Backend().Defrag()
if err != nil {
plog.Errorf("failed to defragment the storage backend (%v)", err)
if ms.lg != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
} else {
plog.Errorf("failed to defragment the storage backend (%v)", err)
}
return nil, err
}
plog.Noticef("finished defragmenting the storage backend")
if ms.lg != nil {
ms.lg.Info("finished defragment")
} else {
plog.Noticef("finished defragmenting the storage backend")
}
return &pb.DefragmentResponse{}, nil
}
@ -87,7 +102,11 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
go func() {
snap.WriteTo(pw)
if err := snap.Close(); err != nil {
plog.Errorf("error closing snapshot (%v)", err)
if ms.lg != nil {
ms.lg.Warn("failed to close snapshot", zap.Error(err))
} else {
plog.Errorf("error closing snapshot (%v)", err)
}
}
pw.Close()
}()

View File

@ -52,7 +52,7 @@ func (qa *quotaAlarmer) check(ctx context.Context, r interface{}) error {
func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return &quotaKVServer{
NewKVServer(s),
quotaAlarmer{etcdserver.NewBackendQuota(s), s, s.ID()},
quotaAlarmer{etcdserver.NewBackendQuota(s, "kv"), s, s.ID()},
}
}
@ -85,6 +85,6 @@ func (s *quotaLeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequ
func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
return &quotaLeaseServer{
NewLeaseServer(s),
quotaAlarmer{etcdserver.NewBackendQuota(s), s, s.ID()},
quotaAlarmer{etcdserver.NewBackendQuota(s, "lease"), s, s.ID()},
}
}

View File

@ -27,6 +27,8 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)
type watchServer struct {
@ -36,6 +38,8 @@ type watchServer struct {
watchable mvcc.WatchableKV
ag AuthGetter
lg *zap.Logger
}
func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
@ -45,6 +49,7 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
sg: s,
watchable: s.Watchable(),
ag: s,
lg: s.Cfg.Logger,
}
}
@ -114,6 +119,8 @@ type serverWatchStream struct {
wg sync.WaitGroup
ag AuthGetter
lg *zap.Logger
}
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
@ -133,6 +140,8 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
closec: make(chan struct{}),
ag: ws.ag,
lg: ws.lg,
}
sws.wg.Add(1)
@ -149,9 +158,17 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
go func() {
if rerr := sws.recvLoop(); rerr != nil {
if isClientCtxErr(stream.Context().Err(), rerr) {
plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
if sws.lg != nil {
sws.lg.Debug("failed to receive watch request from gRPC stream", zap.Error(err))
} else {
plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
}
} else {
plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
if sws.lg != nil {
sws.lg.Warn("failed to receive watch request from gRPC stream", zap.Error(err))
} else {
plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
}
}
errc <- rerr
}
@ -355,9 +372,17 @@ func (sws *serverWatchStream) sendLoop() {
mvcc.ReportEventReceived(len(evs))
if err := sws.gRPCStream.Send(wr); err != nil {
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error())
if sws.lg != nil {
sws.lg.Debug("failed to send watch response to gRPC stream", zap.Error(err))
} else {
plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error())
}
} else {
plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
if sws.lg != nil {
sws.lg.Warn("failed to send watch response to gRPC stream", zap.Error(err))
} else {
plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
}
}
return
}
@ -376,9 +401,17 @@ func (sws *serverWatchStream) sendLoop() {
if err := sws.gRPCStream.Send(c); err != nil {
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
if sws.lg != nil {
sws.lg.Debug("failed to send watch control response to gRPC stream", zap.Error(err))
} else {
plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
}
} else {
plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
if sws.lg != nil {
sws.lg.Warn("failed to send watch control response to gRPC stream", zap.Error(err))
} else {
plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
}
}
return
}
@ -396,9 +429,17 @@ func (sws *serverWatchStream) sendLoop() {
mvcc.ReportEventReceived(len(v.Events))
if err := sws.gRPCStream.Send(v); err != nil {
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
if sws.lg != nil {
sws.lg.Debug("failed to send pending watch response to gRPC stream", zap.Error(err))
} else {
plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
}
} else {
plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
if sws.lg != nil {
sws.lg.Warn("failed to send pending watch response to gRPC stream", zap.Error(err))
} else {
plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
}
}
return
}

View File

@ -17,6 +17,7 @@ package etcdserver
import (
"bytes"
"context"
"fmt"
"sort"
"time"
@ -26,6 +27,7 @@ import (
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/types"
"go.uber.org/zap"
"github.com/gogo/protobuf/proto"
)
@ -107,7 +109,7 @@ func (s *EtcdServer) newApplierV3() applierV3 {
}
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
defer warnOfExpensiveRequest(time.Now(), r)
defer warnOfExpensiveRequest(a.s.getLogger(), time.Now(), r)
ar := &applyResult{}
@ -503,25 +505,39 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
if !txnPath[0] {
reqs = rt.Failure
}
lg := a.s.getLogger()
for i, req := range reqs {
respi := tresp.Responses[i].Response
switch tv := req.Request.(type) {
case *pb.RequestOp_RequestRange:
resp, err := a.Range(txn, tv.RequestRange)
if err != nil {
plog.Panicf("unexpected error during txn: %v", err)
if lg != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
} else {
plog.Panicf("unexpected error during txn: %v", err)
}
}
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
case *pb.RequestOp_RequestPut:
resp, err := a.Put(txn, tv.RequestPut)
if err != nil {
plog.Panicf("unexpected error during txn: %v", err)
if lg != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
} else {
plog.Panicf("unexpected error during txn: %v", err)
}
}
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
case *pb.RequestOp_RequestDeleteRange:
resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
if err != nil {
plog.Panicf("unexpected error during txn: %v", err)
if lg != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
} else {
plog.Panicf("unexpected error during txn: %v", err)
}
}
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
case *pb.RequestOp_RequestTxn:
@ -569,6 +585,7 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
resp := &pb.AlarmResponse{}
oldCount := len(a.s.alarmStore.Get(ar.Alarm))
lg := a.s.getLogger()
switch ar.Action {
case pb.AlarmRequest_GET:
resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
@ -583,14 +600,22 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
break
}
plog.Warningf("alarm %v raised by peer %s", m.Alarm, types.ID(m.MemberID))
if lg != nil {
lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
} else {
plog.Warningf("alarm %v raised by peer %s", m.Alarm, types.ID(m.MemberID))
}
switch m.Alarm {
case pb.AlarmType_CORRUPT:
a.s.applyV3 = newApplierV3Corrupt(a)
case pb.AlarmType_NOSPACE:
a.s.applyV3 = newApplierV3Capped(a)
default:
plog.Errorf("unimplemented alarm activation (%+v)", m)
if lg != nil {
lg.Warn("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m)))
} else {
plog.Errorf("unimplemented alarm activation (%+v)", m)
}
}
case pb.AlarmRequest_DEACTIVATE:
m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
@ -606,10 +631,18 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
switch m.Alarm {
case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT:
// TODO: check kv hash before deactivating CORRUPT?
plog.Infof("alarm disarmed %+v", ar)
if lg != nil {
lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
} else {
plog.Infof("alarm disarmed %+v", ar)
}
a.s.applyV3 = a.s.newApplierV3()
default:
plog.Errorf("unimplemented alarm deactivation (%+v)", m)
if lg != nil {
lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m)))
} else {
plog.Errorf("unimplemented alarm deactivation (%+v)", m)
}
}
default:
return nil, nil
@ -773,7 +806,7 @@ type quotaApplierV3 struct {
}
func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
return &quotaApplierV3{app, NewBackendQuota(s)}
return &quotaApplierV3{app, NewBackendQuota(s, "v3-applier")}
}
func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {

View File

@ -25,6 +25,7 @@ import (
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
)
// ApplierV2 is the interface for processing V2 raft messages
@ -36,11 +37,12 @@ type ApplierV2 interface {
Sync(r *RequestV2) Response
}
func NewApplierV2(s v2store.Store, c *membership.RaftCluster) ApplierV2 {
func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 {
return &applierV2store{lg: lg, store: s, cluster: c}
}
type applierV2store struct {
lg *zap.Logger
store v2store.Store
cluster *membership.RaftCluster
}
@ -77,7 +79,11 @@ func (a *applierV2store) Put(r *RequestV2) Response {
id := membership.MustParseMemberIDFromKey(path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
if a.lg != nil {
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
} else {
plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
}
}
if a.cluster != nil {
a.cluster.UpdateAttributes(id, attr)
@ -108,7 +114,7 @@ func (a *applierV2store) Sync(r *RequestV2) Response {
// applyV2Request interprets r as a call to v2store.X
// and returns a Response interpreted from v2store.Event
func (s *EtcdServer) applyV2Request(r *RequestV2) Response {
defer warnOfExpensiveRequest(time.Now(), r)
defer warnOfExpensiveRequest(s.getLogger(), time.Now(), r)
switch r.Method {
case "POST":

View File

@ -24,11 +24,13 @@ import (
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/raftsnap"
"go.uber.org/zap"
)
func newBackend(cfg ServerConfig) backend.Backend {
bcfg := backend.DefaultBackendConfig()
bcfg.Path = cfg.backendPath()
bcfg.Logger = cfg.Logger
if cfg.QuotaBackendBytes > 0 && cfg.QuotaBackendBytes != DefaultQuotaBytes {
// permit 10% excess over quota for disarm
bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10)
@ -51,17 +53,32 @@ func openSnapshotBackend(cfg ServerConfig, ss *raftsnap.Snapshotter, snapshot ra
// openBackend returns a backend using the current etcd db.
func openBackend(cfg ServerConfig) backend.Backend {
fn := cfg.backendPath()
beOpened := make(chan backend.Backend)
now, beOpened := time.Now(), make(chan backend.Backend)
go func() {
beOpened <- newBackend(cfg)
}()
select {
case be := <-beOpened:
if cfg.Logger != nil {
cfg.Logger.Info("opened backend db", zap.String("path", fn), zap.Duration("took", time.Since(now)))
}
return be
case <-time.After(10 * time.Second):
plog.Warningf("another etcd process is using %q and holds the file lock, or loading backend file is taking >10 seconds", fn)
plog.Warningf("waiting for it to exit before starting...")
if cfg.Logger != nil {
cfg.Logger.Info(
"db file is flocked by another process, or taking too long",
zap.String("path", fn),
zap.Duration("took", time.Since(now)),
)
} else {
plog.Warningf("another etcd process is using %q and holds the file lock, or loading backend file is taking >10 seconds", fn)
plog.Warningf("waiting for it to exit before starting...")
}
}
return <-beOpened
}
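
openBackend now also times the open: the backend is opened in a goroutine, and if it has not returned within 10 seconds (for example because another process holds the file lock) a message is logged before continuing to wait. A standalone sketch of that select-with-timeout pattern, using a placeholder open function instead of the real backend package:

package main

import (
	"time"

	"go.uber.org/zap"
)

// openDB stands in for newBackend(cfg); it is a placeholder, not etcd API.
func openDB(path string) string {
	time.Sleep(100 * time.Millisecond) // pretend opening takes a moment
	return "db@" + path
}

func openWithWarning(lg *zap.Logger, path string) string {
	now, opened := time.Now(), make(chan string)
	go func() { opened <- openDB(path) }()

	select {
	case db := <-opened:
		lg.Info("opened backend db", zap.String("path", path), zap.Duration("took", time.Since(now)))
		return db
	case <-time.After(10 * time.Second):
		// warn but keep waiting: the open may still succeed once the lock is released
		lg.Info(
			"db file is flocked by another process, or taking too long",
			zap.String("path", path),
			zap.Duration("took", time.Since(now)),
		)
	}
	return <-opened
}

func main() {
	_ = openWithWarning(zap.NewExample(), "member/snap/db")
}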
@ -71,11 +88,11 @@ func openBackend(cfg ServerConfig) backend.Backend {
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex)
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil
}
oldbe.Close()
return openSnapshotBackend(cfg, raftsnap.New(cfg.SnapDir()), snapshot)
return openSnapshotBackend(cfg, raftsnap.New(cfg.Logger, cfg.SnapDir()), snapshot)
}

View File

@ -27,12 +27,13 @@ import (
"github.com/coreos/etcd/version"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
)
// isMemberBootstrapped tries to check if the given member has been bootstrapped
// in the given cluster.
func isMemberBootstrapped(cl *membership.RaftCluster, member string, rt http.RoundTripper, timeout time.Duration) bool {
rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), timeout, false, rt)
func isMemberBootstrapped(lg *zap.Logger, cl *membership.RaftCluster, member string, rt http.RoundTripper, timeout time.Duration) bool {
rcl, err := getClusterFromRemotePeers(lg, getRemotePeerURLs(cl, member), timeout, false, rt)
if err != nil {
return false
}
@ -54,21 +55,26 @@ func isMemberBootstrapped(cl *membership.RaftCluster, member string, rt http.Rou
// response, an error is returned.
// Each request has a 10-second timeout. Because the upper limit of TTL is 5s,
// 10 seconds is enough for building a connection and finishing the request.
func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*membership.RaftCluster, error) {
return getClusterFromRemotePeers(urls, 10*time.Second, true, rt)
func GetClusterFromRemotePeers(lg *zap.Logger, urls []string, rt http.RoundTripper) (*membership.RaftCluster, error) {
return getClusterFromRemotePeers(lg, urls, 10*time.Second, true, rt)
}
// If logerr is true, it prints out more error messages.
func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*membership.RaftCluster, error) {
func getClusterFromRemotePeers(lg *zap.Logger, urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*membership.RaftCluster, error) {
cc := &http.Client{
Transport: rt,
Timeout: timeout,
}
for _, u := range urls {
resp, err := cc.Get(u + "/members")
addr := u + "/members"
resp, err := cc.Get(addr)
if err != nil {
if logerr {
plog.Warningf("could not get cluster response from %s: %v", u, err)
if lg != nil {
lg.Warn("failed to get cluster response", zap.String("address", addr), zap.Error(err))
} else {
plog.Warningf("could not get cluster response from %s: %v", u, err)
}
}
continue
}
@ -76,21 +82,38 @@ func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool
resp.Body.Close()
if err != nil {
if logerr {
plog.Warningf("could not read the body of cluster response: %v", err)
if lg != nil {
lg.Warn("failed to read body of cluster response", zap.String("address", addr), zap.Error(err))
} else {
plog.Warningf("could not read the body of cluster response: %v", err)
}
}
continue
}
var membs []*membership.Member
if err = json.Unmarshal(b, &membs); err != nil {
if logerr {
plog.Warningf("could not unmarshal cluster response: %v", err)
if lg != nil {
lg.Warn("failed to unmarshal cluster response", zap.String("address", addr), zap.Error(err))
} else {
plog.Warningf("could not unmarshal cluster response: %v", err)
}
}
continue
}
id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID"))
if err != nil {
if logerr {
plog.Warningf("could not parse the cluster ID from cluster res: %v", err)
if lg != nil {
lg.Warn(
"failed to parse cluster ID",
zap.String("address", addr),
zap.String("header", resp.Header.Get("X-Etcd-Cluster-ID")),
zap.Error(err),
)
} else {
plog.Warningf("could not parse the cluster ID from cluster res: %v", err)
}
}
continue
}
@ -100,12 +123,11 @@ func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool
// if no membership members are present, the raft cluster formed would be an invalid
// empty cluster; hence, return a "failed to get raft cluster member(s) from the given URLs" error
if len(membs) > 0 {
return membership.NewClusterFromMembers("", id, membs), nil
return membership.NewClusterFromMembers(lg, "", id, membs), nil
}
return nil, fmt.Errorf("failed to get raft cluster member(s) from the given urls.")
return nil, fmt.Errorf("failed to get raft cluster member(s) from the given URLs")
}
return nil, fmt.Errorf("could not retrieve cluster information from the given urls")
return nil, fmt.Errorf("could not retrieve cluster information from the given URLs")
}
// getRemotePeerURLs returns peer urls of remote members in the cluster. The
@ -126,7 +148,7 @@ func getRemotePeerURLs(cl *membership.RaftCluster, local string) []string {
// The key of the returned map is the member's ID. The value of the returned map
// is the semver versions string, including server and cluster.
// If it fails to get the version of a member, the key will be nil.
func getVersions(cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
members := cl.Members()
vers := make(map[string]*version.Versions)
for _, m := range members {
@ -138,9 +160,13 @@ func getVersions(cl *membership.RaftCluster, local types.ID, rt http.RoundTrippe
vers[m.ID.String()] = &version.Versions{Server: version.Version, Cluster: cv}
continue
}
ver, err := getVersion(m, rt)
ver, err := getVersion(lg, m, rt)
if err != nil {
plog.Warningf("cannot get the version of member %s (%v)", m.ID, err)
if lg != nil {
lg.Warn("failed to get version", zap.String("remote-member-id", m.ID.String()), zap.Error(err))
} else {
plog.Warningf("cannot get the version of member %s (%v)", m.ID, err)
}
vers[m.ID.String()] = nil
} else {
vers[m.ID.String()] = ver
@ -152,7 +178,7 @@ func getVersions(cl *membership.RaftCluster, local types.ID, rt http.RoundTrippe
// decideClusterVersion decides the cluster version based on the versions map.
// The returned version is the min server version in the map, or nil if the min
// version is unknown.
func decideClusterVersion(vers map[string]*version.Versions) *semver.Version {
func decideClusterVersion(lg *zap.Logger, vers map[string]*version.Versions) *semver.Version {
var cv *semver.Version
lv := semver.Must(semver.NewVersion(version.Version))
@ -162,12 +188,30 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version {
}
v, err := semver.NewVersion(ver.Server)
if err != nil {
plog.Errorf("cannot understand the version of member %s (%v)", mid, err)
if lg != nil {
lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
} else {
plog.Errorf("cannot understand the version of member %s (%v)", mid, err)
}
return nil
}
if lv.LessThan(*v) {
plog.Warningf("the local etcd version %s is not up-to-date", lv.String())
plog.Warningf("member %s has a higher version %s", mid, ver.Server)
if lg != nil {
lg.Warn(
"local etcd version is not up-to-date",
zap.String("local-member-version", lv.String()),
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
)
} else {
plog.Warningf("the local etcd version %s is not up-to-date", lv.String())
plog.Warningf("member %s has a higher version %s", mid, ver.Server)
}
}
if cv == nil {
cv = v
@ -184,19 +228,18 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version {
// cluster version in the range of [MinClusterVersion, Version] and no known member has a cluster version
// out of the range.
// We set this rule because another member might be offline when the local member joins.
func isCompatibleWithCluster(cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
vers := getVersions(cl, local, rt)
func isCompatibleWithCluster(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
vers := getVersions(lg, cl, local, rt)
minV := semver.Must(semver.NewVersion(version.MinClusterVersion))
maxV := semver.Must(semver.NewVersion(version.Version))
maxV = &semver.Version{
Major: maxV.Major,
Minor: maxV.Minor,
}
return isCompatibleWithVers(vers, local, minV, maxV)
return isCompatibleWithVers(lg, vers, local, minV, maxV)
}
func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, minV, maxV *semver.Version) bool {
func isCompatibleWithVers(lg *zap.Logger, vers map[string]*version.Versions, local types.ID, minV, maxV *semver.Version) bool {
var ok bool
for id, v := range vers {
// ignore comparison with local version
@ -208,15 +251,42 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min
}
clusterv, err := semver.NewVersion(v.Cluster)
if err != nil {
plog.Errorf("cannot understand the cluster version of member %s (%v)", id, err)
if lg != nil {
lg.Warn(
"failed to parse cluster version of remote member",
zap.String("remote-member-id", id),
zap.String("remote-member-cluster-version", v.Cluster),
zap.Error(err),
)
} else {
plog.Errorf("cannot understand the cluster version of member %s (%v)", id, err)
}
continue
}
if clusterv.LessThan(*minV) {
plog.Warningf("the running cluster version(%v) is lower than the minimal cluster version(%v) supported", clusterv.String(), minV.String())
if lg != nil {
lg.Warn(
"cluster version of remote member is not compatible; too low",
zap.String("remote-member-id", id),
zap.String("remote-member-cluster-version", clusterv.String()),
zap.String("minimum-cluster-version-supported", minV.String()),
)
} else {
plog.Warningf("the running cluster version(%v) is lower than the minimal cluster version(%v) supported", clusterv.String(), minV.String())
}
return false
}
if maxV.LessThan(*clusterv) {
plog.Warningf("the running cluster version(%v) is higher than the maximum cluster version(%v) supported", clusterv.String(), maxV.String())
if lg != nil {
lg.Warn(
"cluster version of remote member is not compatible; too high",
zap.String("remote-member-id", id),
zap.String("remote-member-cluster-version", clusterv.String()),
zap.String("minimum-cluster-version-supported", minV.String()),
)
} else {
plog.Warningf("the running cluster version(%v) is higher than the maximum cluster version(%v) supported", clusterv.String(), maxV.String())
}
return false
}
ok = true
@ -226,7 +296,7 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min
// getVersion returns the Versions of the given member via its
// peerURLs. Returns the last error if it fails to get the version.
func getVersion(m *membership.Member, rt http.RoundTripper) (*version.Versions, error) {
func getVersion(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (*version.Versions, error) {
cc := &http.Client{
Transport: rt,
}
@ -236,21 +306,49 @@ func getVersion(m *membership.Member, rt http.RoundTripper) (*version.Versions,
)
for _, u := range m.PeerURLs {
resp, err = cc.Get(u + "/version")
addr := u + "/version"
resp, err = cc.Get(addr)
if err != nil {
plog.Warningf("failed to reach the peerURL(%s) of member %s (%v)", u, m.ID, err)
if lg != nil {
lg.Warn(
"failed to reach the peer URL",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
} else {
plog.Warningf("failed to reach the peerURL(%s) of member %s (%v)", u, m.ID, err)
}
continue
}
var b []byte
b, err = ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
plog.Warningf("failed to read out the response body from the peerURL(%s) of member %s (%v)", u, m.ID, err)
if lg != nil {
lg.Warn(
"failed to read body of response",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
} else {
plog.Warningf("failed to read out the response body from the peerURL(%s) of member %s (%v)", u, m.ID, err)
}
continue
}
var vers version.Versions
if err = json.Unmarshal(b, &vers); err != nil {
plog.Warningf("failed to unmarshal the response body got from the peerURL(%s) of member %s (%v)", u, m.ID, err)
if lg != nil {
lg.Warn(
"failed to unmarshal response",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
} else {
plog.Warningf("failed to unmarshal the response body got from the peerURL(%s) of member %s (%v)", u, m.ID, err)
}
continue
}
return &vers, nil
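
For context, decideClusterVersion above selects the minimum server version among the reported members and gives up (returns nil) as soon as one reported version cannot be parsed. A small standalone sketch of that selection rule with go-semver; the member IDs and version strings are hypothetical:

package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

// minServerVersion mirrors the rule in decideClusterVersion: return the lowest
// parsable version, or nil if any member's version fails to parse.
func minServerVersion(vers map[string]string) *semver.Version {
	var cv *semver.Version
	for id, s := range vers {
		v, err := semver.NewVersion(s)
		if err != nil {
			fmt.Printf("cannot parse version of member %s: %v\n", id, err)
			return nil
		}
		if cv == nil || v.LessThan(*cv) {
			cv = v
		}
	}
	return cv
}

func main() {
	vers := map[string]string{"a": "3.4.0", "b": "3.3.9", "c": "3.4.1"}
	fmt.Println(minServerVersion(vers)) // 3.3.9
}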

View File

@ -22,8 +22,11 @@ import (
"github.com/coreos/etcd/version"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
)
var testLogger = zap.NewExample()
func TestDecideClusterVersion(t *testing.T) {
tests := []struct {
vers map[string]*version.Versions
@ -53,7 +56,7 @@ func TestDecideClusterVersion(t *testing.T) {
}
for i, tt := range tests {
dver := decideClusterVersion(tt.vers)
dver := decideClusterVersion(testLogger, tt.vers)
if !reflect.DeepEqual(dver, tt.wdver) {
t.Errorf("#%d: ver = %+v, want %+v", i, dver, tt.wdver)
}
@ -124,7 +127,7 @@ func TestIsCompatibleWithVers(t *testing.T) {
}
for i, tt := range tests {
ok := isCompatibleWithVers(tt.vers, tt.local, tt.minV, tt.maxV)
ok := isCompatibleWithVers(testLogger, tt.vers, tt.local, tt.minV, tt.maxV)
if ok != tt.wok {
t.Errorf("#%d: ok = %+v, want %+v", i, ok, tt.wok)
}

View File

@ -19,6 +19,8 @@ import (
"testing"
"github.com/coreos/etcd/pkg/types"
"go.uber.org/zap"
)
func mustNewURLs(t *testing.T, urls []string) []url.URL {
@ -37,6 +39,7 @@ func TestConfigVerifyBootstrapWithoutClusterAndDiscoveryURLFail(t *testing.T) {
Name: "node1",
DiscoveryURL: "",
InitialPeerURLsMap: types.URLsMap{},
Logger: zap.NewExample(),
}
if err := c.VerifyBootstrap(); err == nil {
t.Errorf("err = nil, want not nil")
@ -54,6 +57,7 @@ func TestConfigVerifyExistingWithDiscoveryURLFail(t *testing.T) {
PeerURLs: mustNewURLs(t, []string{"http://127.0.0.1:2380"}),
InitialPeerURLsMap: cluster,
NewCluster: false,
Logger: zap.NewExample(),
}
if err := c.VerifyJoinExisting(); err == nil {
t.Errorf("err = nil, want not nil")
@ -141,6 +145,7 @@ func TestConfigVerifyLocalMember(t *testing.T) {
cfg := ServerConfig{
Name: "node1",
InitialPeerURLsMap: cluster,
Logger: zap.NewExample(),
}
if tt.apurls != nil {
cfg.PeerURLs = mustNewURLs(t, tt.apurls)
@ -165,6 +170,7 @@ func TestSnapDir(t *testing.T) {
for dd, w := range tests {
cfg := ServerConfig{
DataDir: dd,
Logger: zap.NewExample(),
}
if g := cfg.SnapDir(); g != w {
t.Errorf("DataDir=%q: SnapDir()=%q, want=%q", dd, g, w)
@ -180,6 +186,7 @@ func TestWALDir(t *testing.T) {
for dd, w := range tests {
cfg := ServerConfig{
DataDir: dd,
Logger: zap.NewExample(),
}
if g := cfg.WALDir(); g != w {
t.Errorf("DataDir=%q: WALDir()=%q, want=%q", dd, g, w)
@ -196,6 +203,7 @@ func TestShouldDiscover(t *testing.T) {
for durl, w := range tests {
cfg := ServerConfig{
DiscoveryURL: durl,
Logger: zap.NewExample(),
}
if g := cfg.ShouldDiscover(); g != w {
t.Errorf("durl=%q: ShouldDiscover()=%t, want=%t", durl, g, w)

View File

@ -24,6 +24,9 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/pkg/types"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// CheckInitialHashKV compares initial hash values with its peers
@ -34,7 +37,18 @@ func (s *EtcdServer) CheckInitialHashKV() error {
return nil
}
plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
lg := s.getLogger()
if lg != nil {
lg.Info(
"starting initial corruption check",
zap.String("local-member-id", s.ID().String()),
zap.Duration("timeout", s.Cfg.ReqTimeout()),
)
} else {
plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
}
h, rev, crev, err := s.kv.HashByRev(0)
if err != nil {
return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err)
@ -44,22 +58,70 @@ func (s *EtcdServer) CheckInitialHashKV() error {
for _, p := range peers {
if p.resp != nil {
peerID := types.ID(p.resp.Header.MemberId)
fields := []zapcore.Field{
zap.String("local-member-id", s.ID().String()),
zap.Int64("local-member-revision", rev),
zap.Int64("local-member-compact-revision", crev),
zap.Uint32("local-member-hash", h),
zap.String("remote-member-id", peerID.String()),
zap.Strings("remote-member-endpoints", p.eps),
zap.Int64("remote-member-revision", p.resp.Header.Revision),
zap.Int64("remote-member-compact-revision", p.resp.CompactRevision),
zap.Uint32("remote-member-hash", p.resp.Hash),
}
if h != p.resp.Hash {
if crev == p.resp.CompactRevision {
plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
if lg != nil {
lg.Warn("found different hash values from remote peer", fields...)
} else {
plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
}
mismatch++
} else {
plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev)
if lg != nil {
lg.Warn("found different compact revision values from remote peer", fields...)
} else {
plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev)
}
}
}
continue
}
if p.err != nil {
switch p.err {
case rpctypes.ErrFutureRev:
plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
if lg != nil {
lg.Warn(
"cannot fetch hash from slow remote peer",
zap.String("local-member-id", s.ID().String()),
zap.Int64("local-member-revision", rev),
zap.Int64("local-member-compact-revision", crev),
zap.Uint32("local-member-hash", h),
zap.String("remote-member-id", p.id.String()),
zap.Strings("remote-member-endpoints", p.eps),
zap.Error(p.err),
)
} else {
plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
}
case rpctypes.ErrCompacted:
plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
if lg != nil {
lg.Warn(
"cannot fetch hash from remote peer; local member is behind",
zap.String("local-member-id", s.ID().String()),
zap.Int64("local-member-revision", rev),
zap.Int64("local-member-compact-revision", crev),
zap.Uint32("local-member-hash", h),
zap.String("remote-member-id", p.id.String()),
zap.Strings("remote-member-endpoints", p.eps),
zap.Error(p.err),
)
} else {
plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
}
}
}
}
@ -67,7 +129,14 @@ func (s *EtcdServer) CheckInitialHashKV() error {
return fmt.Errorf("%s found data inconsistency with peers", s.ID())
}
plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
if lg != nil {
lg.Info(
"initial corruption checking passed; no corruption",
zap.String("local-member-id", s.ID().String()),
)
} else {
plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
}
return nil
}
@ -76,7 +145,18 @@ func (s *EtcdServer) monitorKVHash() {
if t == 0 {
return
}
plog.Infof("enabled corruption checking with %s interval", t)
lg := s.getLogger()
if lg != nil {
lg.Info(
"enabled corruption checking",
zap.String("local-member-id", s.ID().String()),
zap.Duration("interval", t),
)
} else {
plog.Infof("enabled corruption checking with %s interval", t)
}
for {
select {
case <-s.stopping:
@ -87,15 +167,21 @@ func (s *EtcdServer) monitorKVHash() {
continue
}
if err := s.checkHashKV(); err != nil {
plog.Debugf("check hash kv failed %v", err)
if lg != nil {
lg.Warn("failed to check hash KV", zap.Error(err))
} else {
plog.Debugf("check hash kv failed %v", err)
}
}
}
}
func (s *EtcdServer) checkHashKV() error {
lg := s.getLogger()
h, rev, crev, err := s.kv.HashByRev(0)
if err != nil {
plog.Fatalf("failed to hash kv store (%v)", err)
return err
}
peers := s.getPeerHashKVs(rev)
@ -108,7 +194,6 @@ func (s *EtcdServer) checkHashKV() error {
h2, rev2, crev2, err := s.kv.HashByRev(0)
if err != nil {
plog.Warningf("failed to hash kv store (%v)", err)
return err
}
@ -129,7 +214,19 @@ func (s *EtcdServer) checkHashKV() error {
}
if h2 != h && rev2 == rev && crev == crev2 {
plog.Warningf("mismatched hashes %d and %d for revision %d", h, h2, rev)
if lg != nil {
lg.Warn(
"found hash mismatch",
zap.Int64("revision-1", rev),
zap.Int64("compact-revision-1", crev),
zap.Uint32("hash-1", h),
zap.Int64("revision-2", rev2),
zap.Int64("compact-revision-2", crev2),
zap.Uint32("hash-2", h2),
)
} else {
plog.Warningf("mismatched hashes %d and %d for revision %d", h, h2, rev)
}
mismatch(uint64(s.ID()))
}
@ -141,34 +238,63 @@ func (s *EtcdServer) checkHashKV() error {
// leader expects follower's latest revision less than or equal to leader's
if p.resp.Header.Revision > rev2 {
plog.Warningf(
"revision %d from member %v, expected at most %d",
p.resp.Header.Revision,
types.ID(id),
rev2)
if lg != nil {
lg.Warn(
"revision from follower must be less than or equal to leader's",
zap.Int64("leader-revision", rev2),
zap.Int64("follower-revision", p.resp.Header.Revision),
zap.String("follower-peer-id", types.ID(id).String()),
)
} else {
plog.Warningf(
"revision %d from member %v, expected at most %d",
p.resp.Header.Revision,
types.ID(id),
rev2)
}
mismatch(id)
}
// leader expects follower's latest compact revision less than or equal to leader's
if p.resp.CompactRevision > crev2 {
plog.Warningf(
"compact revision %d from member %v, expected at most %d",
p.resp.CompactRevision,
types.ID(id),
crev2,
)
if lg != nil {
lg.Warn(
"compact revision from follower must be less than or equal to leader's",
zap.Int64("leader-compact-revision", crev2),
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
zap.String("follower-peer-id", types.ID(id).String()),
)
} else {
plog.Warningf(
"compact revision %d from member %v, expected at most %d",
p.resp.CompactRevision,
types.ID(id),
crev2,
)
}
mismatch(id)
}
// if the follower's compact revision is the leader's old one, then the hashes must match
if p.resp.CompactRevision == crev && p.resp.Hash != h {
plog.Warningf(
"hash %d at revision %d from member %v, expected hash %d",
p.resp.Hash,
rev,
types.ID(id),
h,
)
if lg != nil {
lg.Warn(
"same compact revision then hashes must match",
zap.Int64("leader-compact-revision", crev2),
zap.Uint32("leader-hash", h),
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
zap.Uint32("follower-hash", p.resp.Hash),
zap.String("follower-peer-id", types.ID(id).String()),
)
} else {
plog.Warningf(
"hash %d at revision %d from member %v, expected hash %d",
p.resp.Hash,
rev,
types.ID(id),
h,
)
}
mismatch(id)
}
}
@ -176,33 +302,47 @@ func (s *EtcdServer) checkHashKV() error {
}
type peerHashKVResp struct {
id types.ID
eps []string
resp *clientv3.HashKVResponse
err error
eps []string
}
func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
// TODO: handle the case when "s.cluster.Members" have not
// been populated (e.g. no snapshot to load from disk)
mbs := s.cluster.Members()
pURLs := make([][]string, len(mbs))
pss := make([]peerHashKVResp, len(mbs))
for _, m := range mbs {
if m.ID == s.ID() {
continue
}
pURLs = append(pURLs, m.PeerURLs)
pss = append(pss, peerHashKVResp{id: m.ID, eps: m.PeerURLs})
}
for _, purls := range pURLs {
if len(purls) == 0 {
lg := s.getLogger()
for _, p := range pss {
if len(p.eps) == 0 {
continue
}
cli, cerr := clientv3.New(clientv3.Config{
DialTimeout: s.Cfg.ReqTimeout(),
Endpoints: purls,
Endpoints: p.eps,
})
if cerr != nil {
plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), purls, cerr.Error())
if lg != nil {
lg.Warn(
"failed to create client to peer URL",
zap.String("local-member-id", s.ID().String()),
zap.String("remote-member-id", p.id.String()),
zap.Strings("remote-member-endpoints", p.eps),
zap.Error(cerr),
)
} else {
plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), p.eps, cerr.Error())
}
continue
}
@ -213,15 +353,25 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
resp, cerr = cli.HashKV(ctx, c, rev)
cancel()
if cerr == nil {
resps = append(resps, &peerHashKVResp{resp: resp})
resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: resp, err: nil})
break
}
plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
if lg != nil {
lg.Warn(
"failed hash kv request",
zap.String("local-member-id", s.ID().String()),
zap.Int64("requested-revision", rev),
zap.String("remote-member-endpoint", c),
zap.Error(cerr),
)
} else {
plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
}
}
cli.Close()
if respsLen == len(resps) {
resps = append(resps, &peerHashKVResp{err: cerr, eps: purls})
resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: nil, err: cerr})
}
}
return resps
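
The leader-side checks in checkHashKV boil down to three rules: a follower's revision and compact revision may not exceed the leader's, and when a follower reports the same compact revision as the leader its hash must match. A deliberately simplified sketch of those rules (it ignores the leader's two HashByRev samples and is not etcd API):

package main

import "fmt"

// hashState is a simplified view of the fields compared in checkHashKV.
type hashState struct {
	revision        int64
	compactRevision int64
	hash            uint32
}

// followerMismatch applies the three leader-side rules described above.
func followerMismatch(leader, follower hashState) bool {
	if follower.revision > leader.revision {
		return true
	}
	if follower.compactRevision > leader.compactRevision {
		return true
	}
	if follower.compactRevision == leader.compactRevision && follower.hash != leader.hash {
		return true
	}
	return false
}

func main() {
	leader := hashState{revision: 100, compactRevision: 90, hash: 0xdeadbeef}
	fmt.Println(followerMismatch(leader, hashState{100, 90, 0xdeadbeef})) // false
	fmt.Println(followerMismatch(leader, hashState{100, 90, 0xcafebabe})) // true: same compact revision, different hash
}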

View File

@ -36,10 +36,13 @@ import (
"github.com/coreos/etcd/version"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
)
// RaftCluster is a list of Members that belong to the same raft cluster
type RaftCluster struct {
lg *zap.Logger
id types.ID
token string
@ -54,8 +57,8 @@ type RaftCluster struct {
removed map[types.ID]bool
}
func NewClusterFromURLsMap(token string, urlsmap types.URLsMap) (*RaftCluster, error) {
c := NewCluster(token)
func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
c := NewCluster(lg, token)
for name, urls := range urlsmap {
m := NewMember(name, urls, token, nil)
if _, ok := c.members[m.ID]; ok {
@ -70,8 +73,8 @@ func NewClusterFromURLsMap(token string, urlsmap types.URLsMap) (*RaftCluster, e
return c, nil
}
func NewClusterFromMembers(token string, id types.ID, membs []*Member) *RaftCluster {
c := NewCluster(token)
func NewClusterFromMembers(lg *zap.Logger, token string, id types.ID, membs []*Member) *RaftCluster {
c := NewCluster(lg, token)
c.id = id
for _, m := range membs {
c.members[m.ID] = m
@ -79,8 +82,9 @@ func NewClusterFromMembers(token string, id types.ID, membs []*Member) *RaftClus
return c
}
func NewCluster(token string) *RaftCluster {
func NewCluster(lg *zap.Logger, token string) *RaftCluster {
return &RaftCluster{
lg: lg,
token: token,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
@ -115,7 +119,11 @@ func (c *RaftCluster) MemberByName(name string) *Member {
for _, m := range c.members {
if m.Name == name {
if memb != nil {
plog.Panicf("two members with the given name %q exist", name)
if c.lg != nil {
c.lg.Panic("two member with same name found", zap.String("name", name))
} else {
plog.Panicf("two members with the given name %q exist", name)
}
}
memb = m
}
@ -203,27 +211,43 @@ func (c *RaftCluster) SetBackend(be backend.Backend) {
mustCreateBackendBuckets(c.be)
}
func (c *RaftCluster) Recover(onSet func(*semver.Version)) {
func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()
c.members, c.removed = membersFromStore(c.v2store)
c.version = clusterVersionFromStore(c.v2store)
mustDetectDowngrade(c.version)
onSet(c.version)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
c.version = clusterVersionFromStore(c.lg, c.v2store)
mustDetectDowngrade(c.lg, c.version)
onSet(c.lg, c.version)
for _, m := range c.members {
plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id)
if c.lg != nil {
c.lg.Info(
"added member from store",
zap.String("cluster-id", c.id.String()),
zap.String("member-id", m.ID.String()),
zap.Strings("member-peer-urls", m.PeerURLs),
)
} else {
plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id)
}
}
if c.version != nil {
plog.Infof("set the cluster version to %v from store", version.Cluster(c.version.String()))
if c.lg != nil {
c.lg.Info(
"set cluster version from store",
zap.String("cluster-version", version.Cluster(c.version.String())),
)
} else {
plog.Infof("set the cluster version to %v from store", version.Cluster(c.version.String()))
}
}
}
// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is still valid.
func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
members, removed := membersFromStore(c.v2store)
members, removed := membersFromStore(c.lg, c.v2store)
id := types.ID(cc.NodeID)
if removed[id] {
return ErrIDRemoved
@ -241,17 +265,23 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
m := new(Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
plog.Panicf("unmarshal member should never fail: %v", err)
if c.lg != nil {
c.lg.Panic("failed to unmarshal member", zap.Error(err))
} else {
plog.Panicf("unmarshal member should never fail: %v", err)
}
}
for _, u := range m.PeerURLs {
if urls[u] {
return ErrPeerURLexists
}
}
case raftpb.ConfChangeRemoveNode:
if members[id] == nil {
return ErrIDNotFound
}
case raftpb.ConfChangeUpdateNode:
if members[id] == nil {
return ErrIDNotFound
@ -267,15 +297,24 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
m := new(Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
plog.Panicf("unmarshal member should never fail: %v", err)
if c.lg != nil {
c.lg.Panic("failed to unmarshal member", zap.Error(err))
} else {
plog.Panicf("unmarshal member should never fail: %v", err)
}
}
for _, u := range m.PeerURLs {
if urls[u] {
return ErrPeerURLexists
}
}
default:
plog.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode")
if c.lg != nil {
c.lg.Panic("unknown ConfChange type", zap.String("type", cc.Type.String()))
} else {
plog.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode")
}
}
return nil
}
@ -295,7 +334,16 @@ func (c *RaftCluster) AddMember(m *Member) {
c.members[m.ID] = m
plog.Infof("added member %s %v to cluster %s", m.ID, m.PeerURLs, c.id)
if c.lg != nil {
c.lg.Info(
"added member",
zap.String("member-id", m.ID.String()),
zap.Strings("member-peer-urls", m.PeerURLs),
zap.String("cluster-id", c.id.String()),
)
} else {
plog.Infof("added member %s %v to cluster %s", m.ID, m.PeerURLs, c.id)
}
}
// RemoveMember removes a member from the store.
@ -313,7 +361,15 @@ func (c *RaftCluster) RemoveMember(id types.ID) {
delete(c.members, id)
c.removed[id] = true
plog.Infof("removed member %s from cluster %s", id, c.id)
if c.lg != nil {
c.lg.Info(
"removed member",
zap.String("member-id", id.String()),
zap.String("cluster-id", c.id.String()),
)
} else {
plog.Infof("removed member %s from cluster %s", id, c.id)
}
}
func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
@ -331,9 +387,18 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
}
_, ok := c.removed[id]
if !ok {
plog.Panicf("error updating attributes of unknown member %s", id)
if c.lg != nil {
c.lg.Panic("failed to update; member unknown", zap.String("member-id", id.String()))
} else {
plog.Panicf("error updating attributes of unknown member %s", id)
}
}
if c.lg != nil {
c.lg.Warn("skipped attributes update of removed member", zap.String("member-id", id.String()))
} else {
plog.Warningf("skipped updating attributes of removed member %s", id)
}
plog.Warningf("skipped updating attributes of removed member %s", id)
}
func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
@ -348,7 +413,16 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes)
mustSaveMemberToBackend(c.be, c.members[id])
}
plog.Noticef("updated member %s %v in cluster %s", id, raftAttr.PeerURLs, c.id)
if c.lg != nil {
c.lg.Info(
"updated member",
zap.String("member-id", id.String()),
zap.Strings("member-peer-urls", raftAttr.PeerURLs),
zap.String("cluster-id", c.id.String()),
)
} else {
plog.Noticef("updated member %s %v in cluster %s", id, raftAttr.PeerURLs, c.id)
}
}
func (c *RaftCluster) Version() *semver.Version {
@ -360,23 +434,38 @@ func (c *RaftCluster) Version() *semver.Version {
return semver.Must(semver.NewVersion(c.version.String()))
}
func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*semver.Version)) {
func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()
if c.version != nil {
plog.Noticef("updated the cluster version from %v to %v", version.Cluster(c.version.String()), version.Cluster(ver.String()))
if c.lg != nil {
c.lg.Info(
"updated cluster version",
zap.String("from", version.Cluster(c.version.String())),
zap.String("from", version.Cluster(ver.String())),
)
} else {
plog.Noticef("updated the cluster version from %v to %v", version.Cluster(c.version.String()), version.Cluster(ver.String()))
}
} else {
plog.Noticef("set the initial cluster version to %v", version.Cluster(ver.String()))
if c.lg != nil {
c.lg.Info(
"set initial cluster version",
zap.String("cluster-version", version.Cluster(ver.String())),
)
} else {
plog.Noticef("set the initial cluster version to %v", version.Cluster(ver.String()))
}
}
c.version = ver
mustDetectDowngrade(c.version)
mustDetectDowngrade(c.lg, c.version)
if c.v2store != nil {
mustSaveClusterVersionToStore(c.v2store, ver)
}
if c.be != nil {
mustSaveClusterVersionToBackend(c.be, ver)
}
onSet(ver)
onSet(c.lg, ver)
}
func (c *RaftCluster) IsReadyToAddNewMember() bool {
@ -393,14 +482,25 @@ func (c *RaftCluster) IsReadyToAddNewMember() bool {
if nstarted == 1 && nmembers == 2 {
// a case of adding a new node to 1-member cluster for restoring cluster data
// https://github.com/coreos/etcd/blob/master/Documentation/v2/admin_guide.md#restoring-the-cluster
plog.Debugf("The number of started member is 1. This cluster can accept add member request.")
if c.lg != nil {
c.lg.Debug("number of started member is 1; can accept add member request")
} else {
plog.Debugf("The number of started member is 1. This cluster can accept add member request.")
}
return true
}
nquorum := nmembers/2 + 1
if nstarted < nquorum {
plog.Warningf("Reject add member request: the number of started member (%d) will be less than the quorum number of the cluster (%d)", nstarted, nquorum)
if c.lg != nil {
c.lg.Warn(
"rejecting member add; started member will be less than quorum",
zap.Int("number-of-started-member", nstarted),
zap.Int("quorum", nquorum),
)
} else {
plog.Warningf("Reject add member request: the number of started member (%d) will be less than the quorum number of the cluster (%d)", nstarted, nquorum)
}
return false
}
@ -424,14 +524,22 @@ func (c *RaftCluster) IsReadyToRemoveMember(id uint64) bool {
nquorum := nmembers/2 + 1
if nstarted < nquorum {
plog.Warningf("Reject remove member request: the number of started member (%d) will be less than the quorum number of the cluster (%d)", nstarted, nquorum)
if c.lg != nil {
c.lg.Warn(
"rejecting member remove; started member will be less than quorum",
zap.Int("number-of-started-member", nstarted),
zap.Int("quorum", nquorum),
)
} else {
plog.Warningf("Reject remove member request: the number of started member (%d) will be less than the quorum number of the cluster (%d)", nstarted, nquorum)
}
return false
}
return true
}
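
Both readiness checks gate membership changes on the same quorum rule, nquorum = nmembers/2 + 1, evaluated over the member set after the proposed change. A tiny standalone illustration, not etcd API:

package main

import "fmt"

// quorum mirrors the nmembers/2 + 1 rule used by the readiness checks above.
func quorum(nmembers int) int { return nmembers/2 + 1 }

func main() {
	// adding a member to a healthy 3-node cluster: 4 members total, quorum 3,
	// so at least 3 members must already be started for the add to be accepted
	fmt.Println(quorum(4)) // 3
	fmt.Println(quorum(5)) // 3
}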
func membersFromStore(st v2store.Store) (map[types.ID]*Member, map[types.ID]bool) {
func membersFromStore(lg *zap.Logger, st v2store.Store) (map[types.ID]*Member, map[types.ID]bool) {
members := make(map[types.ID]*Member)
removed := make(map[types.ID]bool)
e, err := st.Get(StoreMembersPrefix, true, true)
@ -439,13 +547,21 @@ func membersFromStore(st v2store.Store) (map[types.ID]*Member, map[types.ID]bool
if isKeyNotFound(err) {
return members, removed
}
plog.Panicf("get storeMembers should never fail: %v", err)
if lg != nil {
lg.Panic("failed to get members from store", zap.String("path", StoreMembersPrefix), zap.Error(err))
} else {
plog.Panicf("get storeMembers should never fail: %v", err)
}
}
for _, n := range e.Node.Nodes {
var m *Member
m, err = nodeToMember(n)
if err != nil {
plog.Panicf("nodeToMember should never fail: %v", err)
if lg != nil {
lg.Panic("failed to nodeToMember", zap.Error(err))
} else {
plog.Panicf("nodeToMember should never fail: %v", err)
}
}
members[m.ID] = m
}
@ -455,7 +571,15 @@ func membersFromStore(st v2store.Store) (map[types.ID]*Member, map[types.ID]bool
if isKeyNotFound(err) {
return members, removed
}
plog.Panicf("get storeRemovedMembers should never fail: %v", err)
if lg != nil {
lg.Panic(
"failed to get removed members from store",
zap.String("path", storeRemovedMembersPrefix),
zap.Error(err),
)
} else {
plog.Panicf("get storeRemovedMembers should never fail: %v", err)
}
}
for _, n := range e.Node.Nodes {
removed[MustParseMemberIDFromKey(n.Key)] = true
@ -463,13 +587,21 @@ func membersFromStore(st v2store.Store) (map[types.ID]*Member, map[types.ID]bool
return members, removed
}
func clusterVersionFromStore(st v2store.Store) *semver.Version {
func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
e, err := st.Get(path.Join(storePrefix, "version"), false, false)
if err != nil {
if isKeyNotFound(err) {
return nil
}
plog.Panicf("unexpected error (%v) when getting cluster version from store", err)
if lg != nil {
lg.Panic(
"failed to get cluster version from store",
zap.String("path", path.Join(storePrefix, "version")),
zap.Error(err),
)
} else {
plog.Panicf("unexpected error (%v) when getting cluster version from store", err)
}
}
return semver.Must(semver.NewVersion(*e.Node.Value))
}
@ -502,11 +634,19 @@ func ValidateClusterAndAssignIDs(local *RaftCluster, existing *RaftCluster) erro
return nil
}
func mustDetectDowngrade(cv *semver.Version) {
func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version) {
lv := semver.Must(semver.NewVersion(version.Version))
// only keep major.minor version for comparison against cluster version
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
if cv != nil && lv.LessThan(*cv) {
plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
if lg != nil {
lg.Fatal(
"invalid downgrade; server version is lower than determined cluster version",
zap.String("current-server-version", version.Version),
zap.String("determined-cluster-version", version.Cluster(cv.String())),
)
} else {
plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
}
}
}

View File

@ -26,6 +26,8 @@ import (
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"go.uber.org/zap"
)
func TestClusterMember(t *testing.T) {
@ -274,7 +276,7 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {
}
func TestClusterValidateConfigurationChange(t *testing.T) {
cl := NewCluster("")
cl := NewCluster(zap.NewExample(), "")
cl.SetStore(v2store.New())
for i := 1; i <= 4; i++ {
attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}}
@ -559,7 +561,7 @@ func TestNodeToMember(t *testing.T) {
}
func newTestCluster(membs []*Member) *RaftCluster {
c := &RaftCluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
c := &RaftCluster{lg: zap.NewExample(), members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
for _, m := range membs {
c.members[m.ID] = m
}

View File

@ -77,7 +77,7 @@ func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.T
// It will panic if there is no PeerURLs available in Member.
func (m *Member) PickPeerURL() string {
if len(m.PeerURLs) == 0 {
plog.Panicf("member should always have some peer url")
panic("member should always have some peer url")
}
return m.PeerURLs[rand.Intn(len(m.PeerURLs))]
}

View File

@ -16,6 +16,9 @@ package etcdserver
import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
humanize "github.com/dustin/go-humanize"
"go.uber.org/zap"
)
const (
@ -57,18 +60,58 @@ const (
kvOverhead = 256
)
func NewBackendQuota(s *EtcdServer) Quota {
func NewBackendQuota(s *EtcdServer, name string) Quota {
lg := s.getLogger()
if s.Cfg.QuotaBackendBytes < 0 {
// disable quotas if negative
plog.Warningf("disabling backend quota")
if lg != nil {
lg.Info(
"disabled backend quota",
zap.String("quota-name", name),
zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
)
} else {
plog.Warningf("disabling backend quota")
}
return &passthroughQuota{}
}
if s.Cfg.QuotaBackendBytes == 0 {
// use default size if no quota size given
if lg != nil {
lg.Info(
"enabled backend quota with default value",
zap.String("quota-name", name),
zap.Int64("quota-size-bytes", DefaultQuotaBytes),
zap.String("quota-size", humanize.Bytes(uint64(DefaultQuotaBytes))),
)
}
return &backendQuota{s, DefaultQuotaBytes}
}
if s.Cfg.QuotaBackendBytes > MaxQuotaBytes {
plog.Warningf("backend quota %v exceeds maximum recommended quota %v", s.Cfg.QuotaBackendBytes, MaxQuotaBytes)
if lg != nil {
lg.Warn(
"quota exceeds the maximum value",
zap.String("quota-name", name),
zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))),
zap.Int64("quota-maximum-size-bytes", MaxQuotaBytes),
zap.String("quota-maximum-size", humanize.Bytes(uint64(MaxQuotaBytes))),
)
} else {
plog.Warningf("backend quota %v exceeds maximum recommended quota %v", s.Cfg.QuotaBackendBytes, MaxQuotaBytes)
}
}
if lg != nil {
lg.Info(
"enabled backend quota",
zap.String("quota-name", name),
zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))),
)
}
return &backendQuota{s, s.Cfg.QuotaBackendBytes}
}
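A hedged sketch of the branching in NewBackendQuota above; the default and maximum sizes are assumed constants here, not read from the etcd source.

package main

import "fmt"

const (
	defaultQuotaBytes = int64(2 * 1024 * 1024 * 1024) // assumed default backend quota
	maxQuotaBytes     = int64(8 * 1024 * 1024 * 1024) // assumed recommended ceiling
)

// effectiveQuota mirrors the selection above: negative disables the quota,
// zero falls back to the default, and values above the ceiling are still
// honored but deserve a warning.
func effectiveQuota(requested int64) (bytes int64, note string) {
	switch {
	case requested < 0:
		return 0, "quota disabled"
	case requested == 0:
		return defaultQuotaBytes, "default quota"
	case requested > maxQuotaBytes:
		return requested, "exceeds recommended maximum"
	default:
		return requested, "quota enabled"
	}
}

func main() {
	for _, q := range []int64{-1, 0, 4 << 30, 16 << 30} {
		b, note := effectiveQuota(q)
		fmt.Printf("requested=%d effective=%d (%s)\n", q, b, note)
	}
}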

View File

@ -35,6 +35,7 @@ import (
"github.com/coreos/etcd/wal/walpb"
"github.com/coreos/pkg/capnslog"
"go.uber.org/zap"
)
const (
@ -85,6 +86,8 @@ type apply struct {
}
type raftNode struct {
lg *zap.Logger
tickMu *sync.Mutex
raftNodeConfig
@ -107,6 +110,8 @@ type raftNode struct {
}
type raftNodeConfig struct {
lg *zap.Logger
// to check if msg receiver is removed from cluster
isIDRemoved func(id uint64) bool
raft.Node
@ -122,6 +127,7 @@ type raftNodeConfig struct {
func newRaftNode(cfg raftNodeConfig) *raftNode {
r := &raftNode{
lg: cfg.lg,
tickMu: new(sync.Mutex),
raftNodeConfig: cfg,
// set up contention detectors for raft heartbeat message.
@ -184,7 +190,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-time.After(internalTimeout):
plog.Warningf("timed out sending read state")
if r.lg != nil {
r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
} else {
plog.Warningf("timed out sending read state")
}
case <-r.stopped:
return
}
@ -215,7 +225,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
plog.Fatalf("raft save state and entries error: %v", err)
if r.lg != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
} else {
plog.Fatalf("raft save state and entries error: %v", err)
}
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
@ -225,14 +239,22 @@ func (r *raftNode) start(rh *raftReadyHandler) {
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
plog.Fatalf("raft save snapshot error: %v", err)
if r.lg != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
} else {
plog.Fatalf("raft save snapshot error: %v", err)
}
}
// etcdserver now claims the snapshot has been persisted onto the disk
notifyc <- struct{}{}
// gofail: var raftAfterSaveSnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
if r.lg != nil {
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
} else {
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
}
// gofail: var raftAfterApplySnap struct{}
}
@ -329,8 +351,16 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
ok, exceed := r.td.Observe(ms[i].To)
if !ok {
// TODO: limit request rate.
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
plog.Warningf("server is likely overloaded")
if r.lg != nil {
r.lg.Warn(
"heartbeat took too long to send out; server is overloaded, likely from slow disk",
zap.Duration("exceeded", exceed),
zap.Duration("heartbeat-interval", r.heartbeat),
)
} else {
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
plog.Warningf("server is likely overloaded")
}
}
}
}
@ -351,7 +381,11 @@ func (r *raftNode) onStop() {
r.ticker.Stop()
r.transport.Stop()
if err := r.storage.Close(); err != nil {
plog.Panicf("raft close storage error: %v", err)
if r.lg != nil {
r.lg.Panic("failed to close Raft storage", zap.Error(err))
} else {
plog.Panicf("raft close storage error: %v", err)
}
}
close(r.done)
}
@ -386,19 +420,36 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
ClusterID: uint64(cl.ID()),
},
)
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
plog.Fatalf("create wal error: %v", err)
if w, err = wal.Create(cfg.Logger, cfg.WALDir(), metadata); err != nil {
if cfg.Logger != nil {
cfg.Logger.Fatal("failed to create WAL", zap.Error(err))
} else {
plog.Fatalf("create wal error: %v", err)
}
}
peers := make([]raft.Peer, len(ids))
for i, id := range ids {
ctx, err := json.Marshal((*cl).Member(id))
var ctx []byte
ctx, err = json.Marshal((*cl).Member(id))
if err != nil {
plog.Panicf("marshal member should never fail: %v", err)
if cfg.Logger != nil {
cfg.Logger.Panic("failed to marshal member", zap.Error(err))
} else {
plog.Panicf("marshal member should never fail: %v", err)
}
}
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
}
id = member.ID
plog.Infof("starting member %s in cluster %s", id, cl.ID())
if cfg.Logger != nil {
cfg.Logger.Info(
"starting local member",
zap.String("local-member-id", id.String()),
zap.String("cluster-id", cl.ID().String()),
)
} else {
plog.Infof("starting member %s in cluster %s", id, cl.ID())
}
s = raft.NewMemoryStorage()
c := &raft.Config{
ID: uint64(id),
@ -430,10 +481,19 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap)
plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := membership.NewCluster("")
if cfg.Logger != nil {
cfg.Logger.Info(
"restarting local member",
zap.String("local-member-id", id.String()),
zap.String("cluster-id", cid.String()),
zap.Uint64("commit-index", st.Commit),
)
} else {
plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
}
cl := membership.NewCluster(cfg.Logger, "")
cl.SetID(cid)
s := raft.NewMemoryStorage()
if snapshot != nil {
@ -472,32 +532,61 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap)
// discard the previously uncommitted entries
for i, ent := range ents {
if ent.Index > st.Commit {
plog.Infof("discarding %d uncommitted WAL entries ", len(ents)-i)
if cfg.Logger != nil {
cfg.Logger.Info(
"discarding uncommitted WAL entries",
zap.Uint64("entry-index", ent.Index),
zap.Uint64("commit-index-from-wal", st.Commit),
zap.Int("number-of-discarded-entries", len(ents)-i),
)
} else {
plog.Infof("discarding %d uncommitted WAL entries ", len(ents)-i)
}
ents = ents[:i]
break
}
}
// force append the configuration change entries
toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(id), st.Term, st.Commit)
toAppEnts := createConfigChangeEnts(
cfg.Logger,
getIDs(cfg.Logger, snapshot, ents),
uint64(id),
st.Term,
st.Commit,
)
ents = append(ents, toAppEnts...)
// force commit newly appended entries
err := w.Save(raftpb.HardState{}, toAppEnts)
if err != nil {
plog.Fatalf("%v", err)
if cfg.Logger != nil {
cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err))
} else {
plog.Fatalf("%v", err)
}
}
if len(ents) != 0 {
st.Commit = ents[len(ents)-1].Index
}
plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := membership.NewCluster("")
if cfg.Logger != nil {
cfg.Logger.Info(
"forcing restart member",
zap.String("local-member-id", id.String()),
zap.String("cluster-id", cid.String()),
zap.Uint64("commit-index", st.Commit),
)
} else {
plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
}
cl := membership.NewCluster(cfg.Logger, "")
cl.SetID(cid)
s := raft.NewMemoryStorage()
if snapshot != nil {
@ -533,7 +622,7 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
ids := make(map[uint64]bool)
if snap != nil {
for _, id := range snap.Metadata.ConfState.Nodes {
@ -554,7 +643,11 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
case raftpb.ConfChangeUpdateNode:
// do nothing
default:
plog.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
if lg != nil {
lg.Panic("unknown ConfChange Type", zap.String("type", cc.Type.String()))
} else {
plog.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
}
}
}
sids := make(types.Uint64Slice, 0, len(ids))
@ -570,7 +663,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
func createConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
ents := make([]raftpb.Entry, 0)
next := index + 1
found := false
@ -599,7 +692,11 @@ func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raf
}
ctx, err := json.Marshal(m)
if err != nil {
plog.Panicf("marshal member should never fail: %v", err)
if lg != nil {
lg.Panic("failed to marshal member", zap.Error(err))
} else {
plog.Panicf("marshal member should never fail: %v", err)
}
}
cc := &raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,

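A self-contained sketch of the pattern used by startNode and createConfigChangeEnts above: a member description is JSON-marshaled and carried as the opaque Context of a configuration-change entry. The member and confChange types here are simplified stand-ins, not the etcd membership or raftpb types.

package main

import (
	"encoding/json"
	"fmt"
)

// member is a simplified stand-in for etcd's membership.Member.
type member struct {
	ID       uint64   `json:"id"`
	PeerURLs []string `json:"peerURLs"`
}

// confChange is a simplified stand-in for raftpb.ConfChange.
type confChange struct {
	Type    string
	NodeID  uint64
	Context []byte // etcd stores the JSON-encoded member here
}

func main() {
	m := member{ID: 0x1234, PeerURLs: []string{"http://127.0.0.1:2380"}}
	ctx, err := json.Marshal(m)
	if err != nil {
		panic(err) // marshaling a plain struct should never fail
	}
	cc := confChange{Type: "ConfChangeAddNode", NodeID: m.ID, Context: ctx}
	fmt.Printf("%s node=%x context=%s\n", cc.Type, cc.NodeID, cc.Context)
}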
View File

@ -17,6 +17,7 @@ package etcdserver
import (
"encoding/json"
"reflect"
"sync"
"testing"
"time"
@ -27,6 +28,8 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"go.uber.org/zap"
)
func TestGetIDs(t *testing.T) {
@ -64,7 +67,7 @@ func TestGetIDs(t *testing.T) {
if tt.confState != nil {
snap.Metadata.ConfState = *tt.confState
}
idSet := getIDs(&snap, tt.ents)
idSet := getIDs(testLogger, &snap, tt.ents)
if !reflect.DeepEqual(idSet, tt.widSet) {
t.Errorf("#%d: idset = %#v, want %#v", i, idSet, tt.widSet)
}
@ -144,7 +147,7 @@ func TestCreateConfigChangeEnts(t *testing.T) {
}
for i, tt := range tests {
gents := createConfigChangeEnts(tt.ids, tt.self, tt.term, tt.index)
gents := createConfigChangeEnts(testLogger, tt.ids, tt.self, tt.term, tt.index)
if !reflect.DeepEqual(gents, tt.wents) {
t.Errorf("#%d: ents = %v, want %v", i, gents, tt.wents)
}
@ -154,12 +157,13 @@ func TestCreateConfigChangeEnts(t *testing.T) {
func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
n := newNopReadyNode()
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: n,
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{r: *r}
srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *r}
srv.r.start(nil)
n.readyc <- raft.Ready{}
select {
@ -181,12 +185,13 @@ func TestConfgChangeBlocksApply(t *testing.T) {
n := newNopReadyNode()
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: n,
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{r: *r}
srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *r}
srv.r.start(&raftReadyHandler{
getLead: func() uint64 { return 0 },

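The tests above thread zap.NewExample() into every struct that now requires a *zap.Logger. As a standalone illustration of what that logger emits (deterministic JSON, no timestamps):

package main

import "go.uber.org/zap"

func main() {
	lg := zap.NewExample() // deterministic JSON to stdout, convenient for tests
	defer lg.Sync()

	lg.Info(
		"applied incoming Raft snapshot",
		zap.Uint64("snapshot-index", 42),
	)
	// Prints one JSON line:
	// {"level":"info","msg":"applied incoming Raft snapshot","snapshot-index":42}
}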
File diff suppressed because it is too large

View File

@ -23,9 +23,12 @@ import (
"path"
"path/filepath"
"reflect"
"sync"
"testing"
"time"
"go.uber.org/zap"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/etcdserver/v2store"
@ -89,6 +92,8 @@ func TestDoLocalAction(t *testing.T) {
for i, tt := range tests {
st := mockstore.NewRecorder()
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
v2store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@ -142,6 +147,8 @@ func TestDoBadLocalAction(t *testing.T) {
for i, tt := range tests {
st := mockstore.NewErrRecorder(storeErr)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
v2store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@ -171,12 +178,15 @@ func TestApplyRepeat(t *testing.T) {
cl.SetStore(v2store.New())
cl.AddMember(&membership.Member{ID: 1234})
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
@ -448,7 +458,11 @@ func TestApplyRequest(t *testing.T) {
for i, tt := range tests {
st := mockstore.NewRecorder()
srv := &EtcdServer{v2store: st}
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
v2store: st,
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
resp := srv.applyV2Request((*RequestV2)(&tt.req))
@ -465,6 +479,8 @@ func TestApplyRequest(t *testing.T) {
func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
cl := newTestCluster([]*membership.Member{{ID: 1}})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
v2store: mockstore.NewRecorder(),
cluster: cl,
}
@ -484,7 +500,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
}
func TestApplyConfChangeError(t *testing.T) {
cl := membership.NewCluster("")
cl := membership.NewCluster(zap.NewExample(), "")
cl.SetStore(v2store.New())
for i := 1; i <= 4; i++ {
cl.AddMember(&membership.Member{ID: types.ID(i)})
@ -527,7 +543,9 @@ func TestApplyConfChangeError(t *testing.T) {
for i, tt := range tests {
n := newNodeRecorder()
srv := &EtcdServer{
r: *newRaftNode(raftNodeConfig{Node: n}),
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
cluster: cl,
}
_, err := srv.applyConfChange(tt.cc, nil)
@ -548,16 +566,19 @@ func TestApplyConfChangeError(t *testing.T) {
}
func TestApplyConfChangeShouldStop(t *testing.T) {
cl := membership.NewCluster("")
cl := membership.NewCluster(zap.NewExample(), "")
cl.SetStore(v2store.New())
for i := 1; i <= 3; i++ {
cl.AddMember(&membership.Member{ID: types.ID(i)})
}
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
id: 1,
r: *r,
cluster: cl,
@ -589,14 +610,17 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
// TestApplyConfigChangeUpdatesConsistIndex ensures a config change also updates the consistIndex
// where consistIndex equals to applied index.
func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
cl := membership.NewCluster("")
cl := membership.NewCluster(zap.NewExample(), "")
cl.SetStore(v2store.New())
cl.AddMember(&membership.Member{ID: types.ID(1)})
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
id: 1,
r: *r,
cluster: cl,
@ -632,16 +656,19 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
// TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
// if the local member is removed along with other conf updates.
func TestApplyMultiConfChangeShouldStop(t *testing.T) {
cl := membership.NewCluster("")
cl := membership.NewCluster(zap.NewExample(), "")
cl.SetStore(v2store.New())
for i := 1; i <= 5; i++ {
cl.AddMember(&membership.Member{ID: types.ID(i)})
}
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
id: 2,
r: *r,
cluster: cl,
@ -677,13 +704,16 @@ func TestDoProposal(t *testing.T) {
for i, tt := range tests {
st := mockstore.NewRecorder()
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: newNodeCommitter(),
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
Cfg: ServerConfig{TickMs: 1},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
r: *r,
v2store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -712,7 +742,9 @@ func TestDoProposal(t *testing.T) {
func TestDoProposalCancelled(t *testing.T) {
wt := mockwait.NewRecorder()
srv := &EtcdServer{
Cfg: ServerConfig{TickMs: 1},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
w: wt,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -734,7 +766,9 @@ func TestDoProposalCancelled(t *testing.T) {
func TestDoProposalTimeout(t *testing.T) {
srv := &EtcdServer{
Cfg: ServerConfig{TickMs: 1},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
w: mockwait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -751,8 +785,10 @@ func TestDoProposalTimeout(t *testing.T) {
func TestDoProposalStopped(t *testing.T) {
srv := &EtcdServer{
Cfg: ServerConfig{TickMs: 1},
r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeNop()}),
w: mockwait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@ -771,7 +807,9 @@ func TestSync(t *testing.T) {
n := newNodeRecorder()
ctx, cancel := context.WithCancel(context.TODO())
srv := &EtcdServer{
r: *newRaftNode(raftNodeConfig{Node: n}),
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
ctx: ctx,
cancel: cancel,
@ -814,7 +852,9 @@ func TestSyncTimeout(t *testing.T) {
n := newProposalBlockerRecorder()
ctx, cancel := context.WithCancel(context.TODO())
srv := &EtcdServer{
r: *newRaftNode(raftNodeConfig{Node: n}),
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
ctx: ctx,
cancel: cancel,
@ -848,6 +888,7 @@ func TestSyncTrigger(t *testing.T) {
st := make(chan time.Time, 1)
tk := &time.Ticker{C: st}
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: n,
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
@ -855,7 +896,9 @@ func TestSyncTrigger(t *testing.T) {
})
srv := &EtcdServer{
Cfg: ServerConfig{TickMs: 1},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
r: *r,
v2store: mockstore.NewNop(),
SyncTicker: tk,
@ -908,15 +951,18 @@ func TestSnapshot(t *testing.T) {
st := mockstore.NewRecorderStream()
p := mockstorage.NewStorageRecorderStream("")
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: newNodeNop(),
raftStorage: s,
storage: p,
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
}
srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
srv.be = be
ch := make(chan struct{}, 2)
@ -958,7 +1004,7 @@ func TestSnapshot(t *testing.T) {
func TestSnapshotOrdering(t *testing.T) {
n := newNopReadyNode()
st := v2store.New()
cl := membership.NewCluster("abc")
cl := membership.NewCluster(zap.NewExample(), "abc")
cl.SetStore(st)
testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
@ -976,6 +1022,7 @@ func TestSnapshotOrdering(t *testing.T) {
p := mockstorage.NewStorageRecorderStream(testdir)
tr, snapDoneC := rafthttp.NewSnapTransporter(snapdir)
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
transport: tr,
@ -983,10 +1030,12 @@ func TestSnapshotOrdering(t *testing.T) {
raftStorage: rs,
})
s := &EtcdServer{
Cfg: ServerConfig{DataDir: testdir},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir},
r: *r,
v2store: st,
snapshotter: raftsnap.New(snapdir),
snapshotter: raftsnap.New(zap.NewExample(), snapdir),
cluster: cl,
SyncTicker: &time.Ticker{},
}
@ -994,7 +1043,7 @@ func TestSnapshotOrdering(t *testing.T) {
be, tmpPath := backend.NewDefaultTmpBackend()
defer os.RemoveAll(tmpPath)
s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex)
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
s.be = be
s.start()
@ -1038,13 +1087,16 @@ func TestTriggerSnap(t *testing.T) {
st := mockstore.NewRecorder()
p := mockstorage.NewStorageRecorderStream("")
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: newNodeCommitter(),
raftStorage: raft.NewMemoryStorage(),
storage: p,
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
Cfg: ServerConfig{TickMs: 1, SnapCount: uint64(snapc)},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapCount: uint64(snapc)},
r: *r,
v2store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1052,7 +1104,7 @@ func TestTriggerSnap(t *testing.T) {
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
srv.be = be
srv.start()
@ -1086,7 +1138,7 @@ func TestTriggerSnap(t *testing.T) {
func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
n := newNopReadyNode()
st := v2store.New()
cl := membership.NewCluster("abc")
cl := membership.NewCluster(zap.NewExample(), "abc")
cl.SetStore(st)
testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
@ -1101,6 +1153,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
rs := raft.NewMemoryStorage()
tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
transport: tr,
@ -1108,10 +1161,12 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
raftStorage: rs,
})
s := &EtcdServer{
Cfg: ServerConfig{DataDir: testdir},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir},
r: *r,
v2store: st,
snapshotter: raftsnap.New(testdir),
snapshotter: raftsnap.New(zap.NewExample(), testdir),
cluster: cl,
SyncTicker: &time.Ticker{},
}
@ -1121,7 +1176,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
defer func() {
os.RemoveAll(tmpPath)
}()
s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex)
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
s.be = be
s.start()
@ -1186,12 +1241,15 @@ func TestAddMember(t *testing.T) {
st := v2store.New()
cl.SetStore(st)
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
@ -1227,12 +1285,15 @@ func TestRemoveMember(t *testing.T) {
cl.SetStore(v2store.New())
cl.AddMember(&membership.Member{ID: 1234})
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
@ -1267,12 +1328,15 @@ func TestUpdateMember(t *testing.T) {
cl.SetStore(st)
cl.AddMember(&membership.Member{ID: 1234})
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
@ -1307,10 +1371,12 @@ func TestPublish(t *testing.T) {
w := wait.NewWithResponse(ch)
ctx, cancel := context.WithCancel(context.TODO())
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
readych: make(chan struct{}),
Cfg: ServerConfig{TickMs: 1},
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
id: 1,
r: *newRaftNode(raftNodeConfig{Node: n}),
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &membership.RaftCluster{},
w: w,
@ -1354,11 +1420,14 @@ func TestPublish(t *testing.T) {
func TestPublishStopped(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
Cfg: ServerConfig{TickMs: 1},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
r: *r,
cluster: &membership.RaftCluster{},
w: mockwait.NewNop(),
@ -1380,8 +1449,10 @@ func TestPublishRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
n := newNodeRecorderStream()
srv := &EtcdServer{
Cfg: ServerConfig{TickMs: 1},
r: *newRaftNode(raftNodeConfig{Node: n}),
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
w: mockwait.NewNop(),
stopping: make(chan struct{}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1420,9 +1491,11 @@ func TestUpdateVersion(t *testing.T) {
w := wait.NewWithResponse(ch)
ctx, cancel := context.WithCancel(context.TODO())
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
id: 1,
Cfg: ServerConfig{TickMs: 1},
r: *newRaftNode(raftNodeConfig{Node: n}),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1},
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
cluster: &membership.RaftCluster{},
w: w,
@ -1459,6 +1532,8 @@ func TestUpdateVersion(t *testing.T) {
func TestStopNotify(t *testing.T) {
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
stop: make(chan struct{}),
done: make(chan struct{}),
}
@ -1510,7 +1585,7 @@ func TestGetOtherPeerURLs(t *testing.T) {
},
}
for i, tt := range tests {
cl := membership.NewClusterFromMembers("", types.ID(0), tt.membs)
cl := membership.NewClusterFromMembers(zap.NewExample(), "", types.ID(0), tt.membs)
self := "1"
urls := getRemotePeerURLs(cl, self)
if !reflect.DeepEqual(urls, tt.wurls) {
@ -1646,7 +1721,7 @@ func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
}
func newTestCluster(membs []*membership.Member) *membership.RaftCluster {
c := membership.NewCluster("")
c := membership.NewCluster(zap.NewExample(), "")
for _, m := range membs {
c.AddMember(m)
}

View File

@ -20,6 +20,9 @@ import (
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/raftsnap"
humanize "github.com/dustin/go-humanize"
"go.uber.org/zap"
)
// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
@ -30,14 +33,18 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi
clone := s.v2store.Clone()
d, err := clone.SaveNoCopy()
if err != nil {
plog.Panicf("store save should never fail: %v", err)
if lg := s.getLogger(); lg != nil {
lg.Panic("failed to save v2 store data", zap.Error(err))
} else {
plog.Panicf("store save should never fail: %v", err)
}
}
// commit kv to write metadata (for example: consistent index).
s.KV().Commit()
dbsnap := s.be.Snapshot()
// get a snapshot of v3 KV as readCloser
rc := newSnapshotReaderCloser(dbsnap)
rc := newSnapshotReaderCloser(s.getLogger(), dbsnap)
// put the []byte snapshot of store into raft snapshot and return the merged snapshot with
// KV readCloser snapshot.
@ -54,19 +61,39 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi
return *raftsnap.NewMessage(m, rc, dbsnap.Size())
}
func newSnapshotReaderCloser(snapshot backend.Snapshot) io.ReadCloser {
func newSnapshotReaderCloser(lg *zap.Logger, snapshot backend.Snapshot) io.ReadCloser {
pr, pw := io.Pipe()
go func() {
n, err := snapshot.WriteTo(pw)
if err == nil {
plog.Infof("wrote database snapshot out [total bytes: %d]", n)
if lg != nil {
lg.Info(
"sent database snapshot to writer",
zap.Int64("bytes", n),
zap.String("size", humanize.Bytes(uint64(n))),
)
} else {
plog.Infof("wrote database snapshot out [total bytes: %d]", n)
}
} else {
plog.Warningf("failed to write database snapshot out [written bytes: %d]: %v", n, err)
if lg != nil {
lg.Warn(
"failed to send database snapshot to writer",
zap.String("size", humanize.Bytes(uint64(n))),
zap.Error(err),
)
} else {
plog.Warningf("failed to write database snapshot out [written bytes: %d]: %v", n, err)
}
}
pw.CloseWithError(err)
err = snapshot.Close()
if err != nil {
plog.Panicf("failed to close database snapshot: %v", err)
if lg != nil {
lg.Panic("failed to close database snapshot", zap.Error(err))
} else {
plog.Panicf("failed to close database snapshot: %v", err)
}
}
}()
return pr

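A generic sketch of the io.Pipe pattern used by newSnapshotReaderCloser above: write the payload from one goroutine, hand the read end to the caller, and propagate any error through CloseWithError. The payload is a plain byte slice here rather than a backend snapshot.

package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
)

// newReaderCloser streams src through a pipe, mirroring how the server wraps
// a database snapshot as an io.ReadCloser.
func newReaderCloser(src []byte) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		n, err := io.Copy(pw, bytes.NewReader(src))
		if err == nil {
			fmt.Printf("sent %d bytes to writer\n", n)
		}
		// Readers observe io.EOF when err is nil, or the error otherwise.
		pw.CloseWithError(err)
	}()
	return pr
}

func main() {
	rc := newReaderCloser([]byte("snapshot payload"))
	defer rc.Close()
	data, err := ioutil.ReadAll(rc)
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d bytes\n", len(data))
}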
View File

@ -24,6 +24,8 @@ import (
"github.com/coreos/etcd/raftsnap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"go.uber.org/zap"
)
type Storage interface {
@ -63,7 +65,7 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
return st.WAL.ReleaseLockTo(snap.Metadata.Index)
}
func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
var (
err error
wmetadata []byte
@ -71,19 +73,35 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID,
repaired := false
for {
if w, err = wal.Open(waldir, snap); err != nil {
plog.Fatalf("open wal error: %v", err)
if w, err = wal.Open(lg, waldir, snap); err != nil {
if lg != nil {
lg.Fatal("failed to open WAL", zap.Error(err))
} else {
plog.Fatalf("open wal error: %v", err)
}
}
if wmetadata, st, ents, err = w.ReadAll(); err != nil {
w.Close()
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || err != io.ErrUnexpectedEOF {
plog.Fatalf("read wal error (%v) and cannot be repaired", err)
if lg != nil {
lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
} else {
plog.Fatalf("read wal error (%v) and cannot be repaired", err)
}
}
if !wal.Repair(waldir) {
plog.Fatalf("WAL error (%v) cannot be repaired", err)
if !wal.Repair(lg, waldir) {
if lg != nil {
lg.Fatal("failed to repair WAL", zap.Error(err))
} else {
plog.Fatalf("WAL error (%v) cannot be repaired", err)
}
} else {
plog.Infof("repaired WAL error (%v)", err)
if lg != nil {
lg.Info("repaired WAL", zap.Error(err))
} else {
plog.Infof("repaired WAL error (%v)", err)
}
repaired = true
}
continue

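A hedged sketch of the open/read/repair-once loop in readWAL above, with the WAL calls replaced by placeholder functions: only io.ErrUnexpectedEOF is treated as repairable, and repair is attempted at most once.

package main

import (
	"fmt"
	"io"
)

// openAndRead and repair stand in for wal.Open/ReadAll and wal.Repair.
func openAndRead(healthy bool) error {
	if !healthy {
		return io.ErrUnexpectedEOF // torn tail of the log
	}
	return nil
}

func repair() bool { return true }

func readLoop() error {
	repaired := false
	healthy := false
	for {
		if err := openAndRead(healthy); err != nil {
			// Only an unexpected EOF is repairable, and only once.
			if repaired || err != io.ErrUnexpectedEOF {
				return fmt.Errorf("read wal error (%v) and cannot be repaired", err)
			}
			if !repair() {
				return fmt.Errorf("WAL error (%v) cannot be repaired", err)
			}
			repaired = true
			healthy = true // pretend the repair fixed the torn tail
			continue
		}
		return nil
	}
}

func main() {
	fmt.Println(readLoop()) // <nil>
}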
View File

@ -21,6 +21,8 @@ import (
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"go.uber.org/zap"
)
// isConnectedToQuorumSince checks whether the local member is connected to the
@ -97,18 +99,28 @@ func (nc *notifier) notify(err error) {
close(nc.c)
}
func warnOfExpensiveRequest(now time.Time, stringer fmt.Stringer) {
warnOfExpensiveGenericRequest(now, stringer, "")
func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, stringer fmt.Stringer) {
warnOfExpensiveGenericRequest(lg, now, stringer, "")
}
func warnOfExpensiveReadOnlyRangeRequest(now time.Time, stringer fmt.Stringer) {
warnOfExpensiveGenericRequest(now, stringer, "read-only range ")
func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, stringer fmt.Stringer) {
warnOfExpensiveGenericRequest(lg, now, stringer, "read-only range ")
}
func warnOfExpensiveGenericRequest(now time.Time, stringer fmt.Stringer, prefix string) {
func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, stringer fmt.Stringer, prefix string) {
// TODO: add metrics
d := time.Since(now)
if d > warnApplyDuration {
plog.Warningf("%srequest %q took too long (%v) to execute", prefix, stringer.String(), d)
if lg != nil {
lg.Warn(
"request took too long",
zap.Duration("took", d),
zap.Duration("expected-duration", warnApplyDuration),
zap.String("prefix", prefix),
zap.String("request", stringer.String()),
)
} else {
plog.Warningf("%srequest %q took too long (%v) to execute", prefix, stringer.String(), d)
}
}
}
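A minimal sketch of the defer-based latency warning that Range and Txn install above; the 100ms threshold and the stringified request are assumptions for illustration.

package main

import (
	"time"

	"go.uber.org/zap"
)

const warnApplyDuration = 100 * time.Millisecond // assumed threshold

func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, request string) {
	if d := time.Since(now); d > warnApplyDuration {
		lg.Warn(
			"request took too long",
			zap.Duration("took", d),
			zap.Duration("expected-duration", warnApplyDuration),
			zap.String("request", request),
		)
	}
}

func handleRange(lg *zap.Logger) {
	// time.Now() is evaluated when the defer is registered, so the deferred
	// call measures the full handler duration.
	defer warnOfExpensiveRequest(lg, time.Now(), `range key="foo"`)
	time.Sleep(150 * time.Millisecond) // simulated slow read
}

func main() {
	handleRange(zap.NewExample())
}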

View File

@ -19,6 +19,8 @@ import (
"testing"
"time"
"go.uber.org/zap"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
@ -31,7 +33,7 @@ func TestLongestConnected(t *testing.T) {
if err != nil {
t.Fatal(err)
}
clus, err := membership.NewClusterFromURLsMap("test", umap)
clus, err := membership.NewClusterFromURLsMap(zap.NewExample(), "test", umap)
if err != nil {
t.Fatal(err)
}

View File

@ -18,8 +18,11 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"time"
"go.uber.org/zap"
"github.com/coreos/etcd/auth"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
@ -84,7 +87,7 @@ type Authenticator interface {
}
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
defer warnOfExpensiveReadOnlyRangeRequest(time.Now(), r)
defer warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), time.Now(), r)
if !r.Serializable {
err := s.linearizableReadNotify(ctx)
@ -135,7 +138,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
return checkTxnAuth(s.authStore, ai, r)
}
defer warnOfExpensiveReadOnlyRangeRequest(time.Now(), r)
defer warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), time.Now(), r)
get := func() { resp, err = s.applyV3Base.Txn(r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
@ -358,12 +361,22 @@ func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest
return nil, err
}
lg := s.getLogger()
var resp proto.Message
for {
checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
if err != nil {
if err != auth.ErrAuthNotEnabled {
plog.Errorf("invalid authentication request to user %s was issued", r.Name)
if lg != nil {
lg.Warn(
"invalid authentication was requested",
zap.String("user", r.Name),
zap.Error(err),
)
} else {
plog.Errorf("invalid authentication request to user %s was issued", r.Name)
}
}
return nil, err
}
@ -386,7 +399,12 @@ func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest
if checkedRevision == s.AuthStore().Revision() {
break
}
plog.Infof("revision when password checked is obsolete, retrying")
if lg != nil {
lg.Info("revision when password checked became stale; retrying")
} else {
plog.Infof("revision when password checked is obsolete, retrying")
}
}
return resp.(*pb.AuthenticateResponse), nil
@ -626,13 +644,18 @@ func (s *EtcdServer) linearizableReadLoop() {
s.readNotifier = nextnr
s.readMu.Unlock()
lg := s.getLogger()
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if err := s.r.ReadIndex(cctx, ctx); err != nil {
cancel()
if err == raft.ErrStopped {
return
}
plog.Errorf("failed to get read index from raft: %v", err)
if lg != nil {
lg.Warn("failed to get read index from Raft", zap.Error(err))
} else {
plog.Errorf("failed to get read index from raft: %v", err)
}
nr.notify(err)
continue
}
@ -649,10 +672,22 @@ func (s *EtcdServer) linearizableReadLoop() {
if !done {
// a previous request might have timed out; ignore its response and
// keep waiting for the response to the current request.
plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
if lg != nil {
lg.Warn(
"ignored out-of-date read index response",
zap.String("ctx-expected", fmt.Sprintf("%+v", string(rs.RequestCtx))),
zap.String("ctx-got", fmt.Sprintf("%+v", string(ctx))),
)
} else {
plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
}
}
case <-time.After(s.Cfg.ReqTimeout()):
plog.Warningf("timed out waiting for read index response")
if lg != nil {
lg.Warn("timed out waiting for read index response", zap.Duration("timeout", s.Cfg.ReqTimeout()))
} else {
plog.Warningf("timed out waiting for read index response")
}
nr.notify(ErrTimeout)
timeout = true
case <-s.stopping: