diff --git a/pkg/wait/wait_time.go b/pkg/wait/wait_time.go new file mode 100644 index 000000000..32a401aae --- /dev/null +++ b/pkg/wait/wait_time.go @@ -0,0 +1,62 @@ +/* + Copyright 2015 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package wait + +import ( + "sync" + "time" +) + +type WaitTime interface { + // Wait returns a chan that waits on the given deadline. + // The chan will be triggered when Trigger is called with a + // deadline that is later than the one it is waiting for. + // The given deadline MUST be unique. The deadline should be + // retrived by calling time.Now() in most cases. + Wait(deadline time.Time) <-chan struct{} + // Trigger triggers all the waiting chans with an earlier deadline. + Trigger(deadline time.Time) +} + +type timeList struct { + l sync.Mutex + m map[int64]chan struct{} +} + +func NewTimeList() *timeList { + return &timeList{m: make(map[int64]chan struct{})} +} + +func (tl *timeList) Wait(deadline time.Time) <-chan struct{} { + tl.l.Lock() + defer tl.l.Unlock() + ch := make(chan struct{}, 1) + // The given deadline SHOULD be unique. + tl.m[deadline.UnixNano()] = ch + return ch +} + +func (tl *timeList) Trigger(deadline time.Time) { + tl.l.Lock() + defer tl.l.Unlock() + for t, ch := range tl.m { + if t < deadline.UnixNano() { + delete(tl.m, t) + close(ch) + } + } +} diff --git a/pkg/wait/wait_time_test.go b/pkg/wait/wait_time_test.go new file mode 100644 index 000000000..9d713eaad --- /dev/null +++ b/pkg/wait/wait_time_test.go @@ -0,0 +1,85 @@ +/* + + Copyright 2015 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package wait + +import ( + "testing" + "time" +) + +func TestWaitTime(t *testing.T) { + wt := NewTimeList() + ch1 := wt.Wait(time.Now()) + t1 := time.Now() + wt.Trigger(t1) + select { + case <-ch1: + case <-time.After(10 * time.Millisecond): + t.Fatalf("cannot receive from ch as expected") + } + + ch2 := wt.Wait(time.Now()) + t2 := time.Now() + wt.Trigger(t1) + select { + case <-ch2: + t.Fatalf("unexpected to receive from ch") + case <-time.After(10 * time.Millisecond): + } + wt.Trigger(t2) + select { + case <-ch2: + case <-time.After(10 * time.Millisecond): + t.Fatalf("cannot receive from ch as expected") + } +} + +func TestWaitTestStress(t *testing.T) { + chs := make([]<-chan struct{}, 0) + wt := NewTimeList() + for i := 0; i < 10000; i++ { + chs = append(chs, wt.Wait(time.Now())) + } + wt.Trigger(time.Now()) + + for _, ch := range chs { + select { + case <-ch: + case <-time.After(10 * time.Millisecond): + t.Fatalf("cannot receive from ch as expected") + } + } +} + +func BenchmarkWaitTime(b *testing.B) { + t := time.Now() + wt := NewTimeList() + for i := 0; i < b.N; i++ { + wt.Wait(t) + } +} + +func BenchmarkTriggerAnd10KWaitTime(b *testing.B) { + for i := 0; i < b.N; i++ { + t := time.Now() + wt := NewTimeList() + for j := 0; j < 10000; j++ { + wt.Wait(t) + } + wt.Trigger(time.Now()) + } +}