commit
4e175a98c3
|
@ -208,6 +208,10 @@ type EtcdServer struct {
|
|||
forceVersionC chan struct{}
|
||||
|
||||
msgSnapC chan raftpb.Message
|
||||
|
||||
// wg is used to wait for the go routines that depends on the server state
|
||||
// to exit when stopping the server.
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewServer creates a new EtcdServer from the supplied configuration. The
|
||||
|
@ -536,6 +540,8 @@ func (s *EtcdServer) run() {
|
|||
s.r.stop()
|
||||
sched.Stop()
|
||||
|
||||
s.wg.Wait()
|
||||
|
||||
// kv, lessor and backend can be nil if running without v3 enabled
|
||||
// or running unit tests.
|
||||
if s.lessor != nil {
|
||||
|
@ -1089,7 +1095,10 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
|||
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
||||
clone := s.store.Clone()
|
||||
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
|
||||
d, err := clone.SaveNoCopy()
|
||||
// TODO: current store will never fail to do a snapshot
|
||||
// what should we do if the store might fail?
|
||||
|
|
|
@ -201,6 +201,18 @@ func (b *backend) Commits() int64 {
|
|||
}
|
||||
|
||||
func (b *backend) Defrag() error {
|
||||
err := b.defrag()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// commit to update metadata like db.size
|
||||
b.batchTx.Commit()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *backend) defrag() error {
|
||||
// TODO: make this non-blocking?
|
||||
// lock batchTx to ensure nobody is using previous tx, and then
|
||||
// close previous ongoing tx.
|
||||
|
@ -251,8 +263,6 @@ func (b *backend) Defrag() error {
|
|||
if err != nil {
|
||||
log.Fatalf("backend: cannot begin tx (%s)", err)
|
||||
}
|
||||
// commit to update metadata like db.size
|
||||
b.batchTx.commit(false)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -160,6 +160,8 @@ func (t *batchTx) commit(stop bool) {
|
|||
// commit the last tx
|
||||
if t.tx != nil {
|
||||
if t.pending == 0 && !stop {
|
||||
t.backend.mu.RLock()
|
||||
defer t.backend.mu.RUnlock()
|
||||
atomic.StoreInt64(&t.backend.size, t.tx.Size())
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue