diff --git a/etcdserver/server.go b/etcdserver/server.go index 3466355f0..429ac8766 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -190,12 +190,14 @@ type EtcdServer struct { applyV3 applierV3 // applyV3Base is the core applier without auth or quotas applyV3Base applierV3 - kv mvcc.ConsistentWatchableKV - lessor lease.Lessor - bemu sync.Mutex - be backend.Backend - authStore auth.AuthStore - alarmStore *alarm.AlarmStore + applyWait wait.WaitTime + + kv mvcc.ConsistentWatchableKV + lessor lease.Lessor + bemu sync.Mutex + be backend.Backend + authStore auth.AuthStore + alarmStore *alarm.AlarmStore stats *stats.ServerStats lstats *stats.LeaderStats @@ -475,6 +477,7 @@ func (s *EtcdServer) start() { s.snapCount = DefaultSnapCount } s.w = wait.New() + s.applyWait = wait.NewTimeList() s.done = make(chan struct{}) s.stop = make(chan struct{}) if s.ClusterVersion() != nil { @@ -629,10 +632,12 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { plog.Warningf("avoid queries with large range/delete range!") } proposalsApplied.Set(float64(ep.appliedi)) + s.applyWait.Trigger(ep.appliedi) // wait for the raft routine to finish the disk writes before triggering a // snapshot. or applied index might be greater than the last index in raft // storage, since the raft routine might be slower than apply routine. <-apply.raftDone + s.triggerSnapshot(ep) select { // snapshot requested via send() diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index e044dbe9d..dca135c46 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -463,24 +463,12 @@ func (s *EtcdServer) isValidSimpleToken(token string) bool { return false } - // CAUTION: below index synchronization is required because this node - // might not receive and apply the log entry of Authenticate() RPC. - authApplied := false - for i := 0; i < 10; i++ { - if uint64(index) <= s.getAppliedIndex() { - authApplied = true - break - } - - time.Sleep(100 * time.Millisecond) + select { + case <-s.applyWait.Wait(uint64(index)): + return true + case <-s.stop: + return true } - - if !authApplied { - plog.Errorf("timeout of waiting Authenticate() RPC") - return false - } - - return true } func (s *EtcdServer) authInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) { diff --git a/pkg/wait/wait_time.go b/pkg/wait/wait_time.go index eebbf49b3..297e48a47 100644 --- a/pkg/wait/wait_time.go +++ b/pkg/wait/wait_time.go @@ -25,9 +25,14 @@ type WaitTime interface { Trigger(deadline uint64) } +var closec chan struct{} + +func init() { closec = make(chan struct{}); close(closec) } + type timeList struct { - l sync.Mutex - m map[uint64]chan struct{} + l sync.Mutex + lastTriggerDeadline uint64 + m map[uint64]chan struct{} } func NewTimeList() *timeList { @@ -37,6 +42,9 @@ func NewTimeList() *timeList { func (tl *timeList) Wait(deadline uint64) <-chan struct{} { tl.l.Lock() defer tl.l.Unlock() + if tl.lastTriggerDeadline >= deadline { + return closec + } ch := tl.m[deadline] if ch == nil { ch = make(chan struct{}) @@ -48,6 +56,7 @@ func (tl *timeList) Wait(deadline uint64) <-chan struct{} { func (tl *timeList) Trigger(deadline uint64) { tl.l.Lock() defer tl.l.Unlock() + tl.lastTriggerDeadline = deadline for t, ch := range tl.m { if t <= deadline { delete(tl.m, t) diff --git a/pkg/wait/wait_time_test.go b/pkg/wait/wait_time_test.go index 13f4d7cf2..26164c4ac 100644 --- a/pkg/wait/wait_time_test.go +++ b/pkg/wait/wait_time_test.go @@ -29,19 +29,26 @@ func TestWaitTime(t *testing.T) { t.Fatalf("cannot receive from ch as expected") } - ch2 := wt.Wait(2) - wt.Trigger(1) + ch2 := wt.Wait(4) + wt.Trigger(3) select { case <-ch2: t.Fatalf("unexpected to receive from ch2") default: } - wt.Trigger(3) + wt.Trigger(4) select { case <-ch2: default: t.Fatalf("cannot receive from ch2 as expected") } + + select { + // wait on a triggered deadline + case <-wt.Wait(4): + default: + t.Fatalf("unexpected blocking when wait on triggered deadline") + } } func TestWaitTestStress(t *testing.T) {