diff --git a/etcdserver/server.go b/etcdserver/server.go index e49da3cf9..8faf05165 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -24,6 +24,7 @@ import ( "os" "path" "regexp" + "sync" "sync/atomic" "time" @@ -162,7 +163,9 @@ type EtcdServer struct { cluster *cluster store store.Store - kv dstorage.ConsistentWatchableKV + + kvMu sync.RWMutex + kv dstorage.ConsistentWatchableKV stats *stats.ServerStats lstats *stats.LeaderStats @@ -506,9 +509,7 @@ func (s *EtcdServer) run() { plog.Panicf("restore KV error: %v", err) } - oldKV := s.kv - // TODO: swap the kv pointer atomically - s.kv = newKV + oldKV := s.swapKV(newKV) // Closing oldKV might block until all the txns // on the kv are finished. @@ -1032,7 +1033,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { if s.cfg.V3demo { // commit v3 storage because WAL file before snapshot index // could be removed after SaveSnap. - s.kv.Commit() + s.getKV().Commit() } // SaveSnap saves the snapshot and releases the locked wal files // to the snapshot index. @@ -1172,6 +1173,20 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { } } +func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV { + s.kvMu.RLock() + defer s.kvMu.RUnlock() + return s.kv +} + +func (s *EtcdServer) swapKV(kv dstorage.ConsistentWatchableKV) dstorage.ConsistentWatchableKV { + s.kvMu.Lock() + defer s.kvMu.Unlock() + old := s.kv + s.kv = kv + return old +} + // isConnectedToQuorumSince checks whether the local member is connected to the // quorum of the cluster since the given time. func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*Member) bool { diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 30b277358..6746041ff 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -102,7 +102,7 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern // Watcable returns a watchable interface attached to the etcdserver. func (s *EtcdServer) Watchable() dstorage.Watchable { - return s.kv + return s.getKV() } const ( @@ -113,19 +113,21 @@ const ( ) func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} { + kv := s.getKV() + ar := &applyResult{} switch { case r.Range != nil: - ar.resp, ar.err = applyRange(noTxn, s.kv, r.Range) + ar.resp, ar.err = applyRange(noTxn, kv, r.Range) case r.Put != nil: - ar.resp, ar.err = applyPut(noTxn, s.kv, r.Put) + ar.resp, ar.err = applyPut(noTxn, kv, r.Put) case r.DeleteRange != nil: - ar.resp, ar.err = applyDeleteRange(noTxn, s.kv, r.DeleteRange) + ar.resp, ar.err = applyDeleteRange(noTxn, kv, r.DeleteRange) case r.Txn != nil: - ar.resp, ar.err = applyTxn(s.kv, r.Txn) + ar.resp, ar.err = applyTxn(kv, r.Txn) case r.Compaction != nil: - ar.resp, ar.err = applyCompaction(s.kv, r.Compaction) + ar.resp, ar.err = applyCompaction(kv, r.Compaction) default: panic("not implemented") }