etcdsever: swap kv pointer atomically
parent
cc6d98bf89
commit
c4cbaf5c2a
|
@ -24,6 +24,7 @@ import (
|
|||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
|
@ -162,7 +163,9 @@ type EtcdServer struct {
|
|||
cluster *cluster
|
||||
|
||||
store store.Store
|
||||
kv dstorage.ConsistentWatchableKV
|
||||
|
||||
kvMu sync.RWMutex
|
||||
kv dstorage.ConsistentWatchableKV
|
||||
|
||||
stats *stats.ServerStats
|
||||
lstats *stats.LeaderStats
|
||||
|
@ -506,9 +509,7 @@ func (s *EtcdServer) run() {
|
|||
plog.Panicf("restore KV error: %v", err)
|
||||
}
|
||||
|
||||
oldKV := s.kv
|
||||
// TODO: swap the kv pointer atomically
|
||||
s.kv = newKV
|
||||
oldKV := s.swapKV(newKV)
|
||||
|
||||
// Closing oldKV might block until all the txns
|
||||
// on the kv are finished.
|
||||
|
@ -1032,7 +1033,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
|||
if s.cfg.V3demo {
|
||||
// commit v3 storage because WAL file before snapshot index
|
||||
// could be removed after SaveSnap.
|
||||
s.kv.Commit()
|
||||
s.getKV().Commit()
|
||||
}
|
||||
// SaveSnap saves the snapshot and releases the locked wal files
|
||||
// to the snapshot index.
|
||||
|
@ -1172,6 +1173,20 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV {
|
||||
s.kvMu.RLock()
|
||||
defer s.kvMu.RUnlock()
|
||||
return s.kv
|
||||
}
|
||||
|
||||
func (s *EtcdServer) swapKV(kv dstorage.ConsistentWatchableKV) dstorage.ConsistentWatchableKV {
|
||||
s.kvMu.Lock()
|
||||
defer s.kvMu.Unlock()
|
||||
old := s.kv
|
||||
s.kv = kv
|
||||
return old
|
||||
}
|
||||
|
||||
// isConnectedToQuorumSince checks whether the local member is connected to the
|
||||
// quorum of the cluster since the given time.
|
||||
func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*Member) bool {
|
||||
|
|
|
@ -102,7 +102,7 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern
|
|||
|
||||
// Watcable returns a watchable interface attached to the etcdserver.
|
||||
func (s *EtcdServer) Watchable() dstorage.Watchable {
|
||||
return s.kv
|
||||
return s.getKV()
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -113,19 +113,21 @@ const (
|
|||
)
|
||||
|
||||
func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} {
|
||||
kv := s.getKV()
|
||||
|
||||
ar := &applyResult{}
|
||||
|
||||
switch {
|
||||
case r.Range != nil:
|
||||
ar.resp, ar.err = applyRange(noTxn, s.kv, r.Range)
|
||||
ar.resp, ar.err = applyRange(noTxn, kv, r.Range)
|
||||
case r.Put != nil:
|
||||
ar.resp, ar.err = applyPut(noTxn, s.kv, r.Put)
|
||||
ar.resp, ar.err = applyPut(noTxn, kv, r.Put)
|
||||
case r.DeleteRange != nil:
|
||||
ar.resp, ar.err = applyDeleteRange(noTxn, s.kv, r.DeleteRange)
|
||||
ar.resp, ar.err = applyDeleteRange(noTxn, kv, r.DeleteRange)
|
||||
case r.Txn != nil:
|
||||
ar.resp, ar.err = applyTxn(s.kv, r.Txn)
|
||||
ar.resp, ar.err = applyTxn(kv, r.Txn)
|
||||
case r.Compaction != nil:
|
||||
ar.resp, ar.err = applyCompaction(s.kv, r.Compaction)
|
||||
ar.resp, ar.err = applyCompaction(kv, r.Compaction)
|
||||
default:
|
||||
panic("not implemented")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue