diff --git a/storage/index.go b/storage/index.go index 905501072..6c2ff7b4f 100644 --- a/storage/index.go +++ b/storage/index.go @@ -2,6 +2,7 @@ package storage import ( "log" + "sort" "sync" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/google/btree" @@ -13,6 +14,7 @@ type index interface { Put(key []byte, rev revision) Restore(key []byte, created, modified revision, ver int64) Tombstone(key []byte, rev revision) error + RangeEvents(key, end []byte, rev int64) []revision Compact(rev int64) map[revision]struct{} Equal(b index) bool } @@ -118,6 +120,38 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error { return ki.tombstone(rev.main, rev.sub) } +// RangeEvents returns all revisions from key(including) to end(excluding) +// at or after the given rev. The returned slice is sorted in the order +// of revision. +func (ti *treeIndex) RangeEvents(key, end []byte, rev int64) []revision { + ti.RLock() + defer ti.RUnlock() + + keyi := &keyIndex{key: key} + if end == nil { + item := ti.tree.Get(keyi) + if item == nil { + return nil + } + keyi = item.(*keyIndex) + return keyi.since(rev) + } + + endi := &keyIndex{key: end} + var revs []revision + ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool { + if !item.Less(endi) { + return false + } + curKeyi := item.(*keyIndex) + revs = append(revs, curKeyi.since(rev)...) + return true + }) + sort.Sort(revisions(revs)) + + return revs +} + func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { available := make(map[revision]struct{}) emptyki := make([]*keyIndex, 0) diff --git a/storage/key_index.go b/storage/key_index.go index f100d9f81..a80698018 100644 --- a/storage/key_index.go +++ b/storage/key_index.go @@ -132,6 +132,41 @@ func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err return revision{}, revision{}, 0, ErrRevisionNotFound } +// since returns revisions since the give rev. Only the revision with the +// largest sub revision will be returned if multiple revisions have the same +// main revision. +func (ki *keyIndex) since(rev int64) []revision { + if ki.isEmpty() { + log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) + } + since := revision{rev, 0} + var gi int + // find the generations to start checking + for gi = len(ki.generations) - 1; gi > 0; gi-- { + if since.GreaterThan(ki.generations[gi].created) { + break + } + } + + var revs []revision + var last int64 + for ; gi < len(ki.generations); gi++ { + for _, r := range ki.generations[gi].revs { + if since.GreaterThan(r) { + continue + } + if r.main == last { + // replace the revision with a new one that has higher sub value, + // because the original one should not be seen by external + revs[len(revs)-1] = r + } + revs = append(revs, r) + last = r.main + } + } + return revs +} + // compact compacts a keyIndex by removing the versions with smaller or equal // revision than the given atRev except the largest one (If the largest one is // a tombstone, it will not be kept). diff --git a/storage/kv.go b/storage/kv.go index e0a37cb42..7547f017c 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -6,6 +6,10 @@ import ( "github.com/coreos/etcd/storage/storagepb" ) +// CancelFunc tells an operation to abandon its work. A CancelFunc does not +// wait for the work to stop. +type CancelFunc func() + type KV interface { // Range gets the keys in the range at rangeRev. // If rangeRev <=0, range gets the keys at currentRev. @@ -46,3 +50,34 @@ type KV interface { Restore() error Close() error } + +// Watcher watches on the KV. It will be notified if there is an event +// happened on the watched key or prefix. +type Watcher interface { + // Event returns a channel that receives observed event that matches the + // context of watcher. When watch finishes or is canceled or aborted, the + // channel is closed and returns empty event. + // Successive calls to Event return the same value. + Event() <-chan storagepb.Event + + // Err returns a non-nil error value after Event is closed. Err returns + // Compacted if the history was compacted, Canceled if watch is canceled, + // or EOF if watch reaches the end revision. No other values for Err are defined. + // After Event is closed, successive calls to Err return the same value. + Err() error +} + +// WatchableKV is a KV that can be watched. +type WatchableKV interface { + KV + + // Watcher watches the events happening or happened in etcd. The whole + // event history can be watched unless compacted. + // If `prefix` is true, watch observes all events whose key prefix could be the given `key`. + // If `startRev` <=0, watch observes events after currentRev. + // If `endRev` <=0, watch observes events until watch is cancelled. + // + // Canceling the watcher releases resources associated with it, so code + // should always call cancel as soon as watch is done. + Watcher(key []byte, prefix bool, startRev, endRev int64) (Watcher, CancelFunc) +} diff --git a/storage/kv_test.go b/storage/kv_test.go index 808a53089..705b888aa 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -691,6 +691,109 @@ func TestKVSnapshot(t *testing.T) { } } +func TestWatchableKVWatch(t *testing.T) { + s := newWatchableStore(tmpPath) + defer cleanup(s, tmpPath) + + wa, cancel := s.Watcher([]byte("foo"), true, 0, 0) + defer cancel() + + s.Put([]byte("foo"), []byte("bar")) + select { + case ev := <-wa.Event(): + wev := storagepb.Event{ + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 1, + ModRevision: 1, + Version: 1, + }, + } + if !reflect.DeepEqual(ev, wev) { + t.Errorf("watched event = %+v, want %+v", ev, wev) + } + case <-time.After(time.Second): + t.Fatalf("failed to watch the event") + } + + s.Put([]byte("foo1"), []byte("bar1")) + select { + case ev := <-wa.Event(): + wev := storagepb.Event{ + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{ + Key: []byte("foo1"), + Value: []byte("bar1"), + CreateRevision: 2, + ModRevision: 2, + Version: 1, + }, + } + if !reflect.DeepEqual(ev, wev) { + t.Errorf("watched event = %+v, want %+v", ev, wev) + } + case <-time.After(time.Second): + t.Fatalf("failed to watch the event") + } + + wa, cancel = s.Watcher([]byte("foo1"), false, 1, 4) + defer cancel() + + select { + case ev := <-wa.Event(): + wev := storagepb.Event{ + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{ + Key: []byte("foo1"), + Value: []byte("bar1"), + CreateRevision: 2, + ModRevision: 2, + Version: 1, + }, + } + if !reflect.DeepEqual(ev, wev) { + t.Errorf("watched event = %+v, want %+v", ev, wev) + } + case <-time.After(time.Second): + t.Fatalf("failed to watch the event") + } + + s.Put([]byte("foo1"), []byte("bar11")) + select { + case ev := <-wa.Event(): + wev := storagepb.Event{ + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{ + Key: []byte("foo1"), + Value: []byte("bar11"), + CreateRevision: 2, + ModRevision: 3, + Version: 2, + }, + } + if !reflect.DeepEqual(ev, wev) { + t.Errorf("watched event = %+v, want %+v", ev, wev) + } + case <-time.After(time.Second): + t.Fatalf("failed to watch the event") + } + + select { + case ev := <-wa.Event(): + if !reflect.DeepEqual(ev, storagepb.Event{}) { + t.Errorf("watched event = %+v, want %+v", ev, storagepb.Event{}) + } + if g := wa.Err(); g != ExceedEnd { + t.Errorf("err = %+v, want %+v", g, ExceedEnd) + } + case <-time.After(time.Second): + t.Fatalf("failed to watch the event") + } + +} + func cleanup(s KV, path string) { s.Close() os.Remove(path) diff --git a/storage/kvstore.go b/storage/kvstore.go index 538fc6f8b..c57b08c86 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -25,6 +25,7 @@ var ( ErrTxnIDMismatch = errors.New("storage: txn id mismatch") ErrCompacted = errors.New("storage: required revision has been compacted") ErrFutureRev = errors.New("storage: required revision is a future revision") + ErrCanceled = errors.New("storage: watcher is canceled") ) type store struct { @@ -170,6 +171,54 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err return n, rev, nil } +// RangeEvents gets the events from key to end at or after rangeRev. +// If rangeRev <=0, rangeEvents returns events from the beginning of the history. +// If `end` is nil, the request only observes the events on key. +// If `end` is not nil, it observes the events on key range [key, range_end). +// Limit limits the number of events returned. +// If the required rev is compacted, ErrCompacted will be returned. +// TODO: return byte slices instead of events to avoid meaningless encode and decode. +func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs []storagepb.Event, nextRev int64, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + if startRev <= s.compactMainRev { + return nil, 0, ErrCompacted + } + + revs := s.kvindex.RangeEvents(key, end, startRev) + if len(revs) == 0 { + return nil, s.currentRev.main + 1, nil + } + + tx := s.b.BatchTx() + tx.Lock() + defer tx.Unlock() + // fetch events from the backend using revisions + for _, rev := range revs { + if rev.main >= endRev { + return evs, rev.main, nil + } + revbytes := newRevBytes() + revToBytes(rev, revbytes) + + _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + if len(vs) != 1 { + log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub) + } + + e := storagepb.Event{} + if err := e.Unmarshal(vs[0]); err != nil { + log.Fatalf("storage: cannot unmarshal event: %v", err) + } + evs = append(evs, e) + if limit > 0 && len(evs) >= int(limit) { + return evs, rev.main + 1, nil + } + } + return evs, s.currentRev.main + 1, nil +} + func (s *store) Compact(rev int64) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index d50bba108..a8c2ed71c 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -473,6 +473,9 @@ func (i *fakeIndex) Tombstone(key []byte, rev revision) error { i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}}) return nil } +func (i *fakeIndex) RangeEvents(key, end []byte, rev int64) []revision { + return nil +} func (i *fakeIndex) Compact(rev int64) map[revision]struct{} { i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}}) return <-i.indexCompactRespc diff --git a/storage/revision.go b/storage/revision.go index d579fc28b..8624f27e5 100644 --- a/storage/revision.go +++ b/storage/revision.go @@ -33,3 +33,9 @@ func bytesToRev(bytes []byte) revision { sub: int64(binary.BigEndian.Uint64(bytes[9:])), } } + +type revisions []revision + +func (a revisions) Len() int { return len(a) } +func (a revisions) Less(i, j int) bool { return a[j].GreaterThan(a[i]) } +func (a revisions) Swap(i, j int) { a[i], a[j] = a[j], a[i] } diff --git a/storage/watchable_store.go b/storage/watchable_store.go new file mode 100644 index 000000000..52a784e00 --- /dev/null +++ b/storage/watchable_store.go @@ -0,0 +1,336 @@ +package storage + +import ( + "errors" + "log" + "sync" + "time" + + "github.com/coreos/etcd/storage/storagepb" +) + +// ReachEnd is the error returned by Watcher.Err when watcher reaches its end revision and +// no more event is available. +var ExceedEnd = errors.New("storage: watcher reaches end revision") + +type watchableStore struct { + mu sync.Mutex + + KV + + // contains all unsynced watchers that needs to sync events that have happened + // TODO: use map to reduce cancel cost + unsynced []*watcher + // contains all synced watchers that are tracking the events that will happen + // The key of the map is the key that the watcher is watching on. + synced map[string][]*watcher + // contains all synced watchers that have an end revision + // The key of the map is the end revision of the watcher. + endm map[int64][]*watcher + tx *ongoingTx + + stopc chan struct{} + wg sync.WaitGroup +} + +func newWatchableStore(path string) *watchableStore { + s := &watchableStore{ + KV: newStore(path), + synced: make(map[string][]*watcher), + endm: make(map[int64][]*watcher), + stopc: make(chan struct{}), + } + s.wg.Add(1) + go s.syncWatchersLoop() + return s +} + +func (s *watchableStore) Put(key, value []byte) (rev int64) { + s.mu.Lock() + defer s.mu.Unlock() + + rev = s.KV.Put(key, value) + // TODO: avoid this range + kvs, _, err := s.KV.Range(key, nil, 0, rev) + if err != nil { + log.Panicf("unexpected range error (%v)", err) + } + s.handle(rev, storagepb.Event{ + Type: storagepb.PUT, + Kv: &kvs[0], + }) + return rev +} + +func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { + s.mu.Lock() + defer s.mu.Unlock() + + // TODO: avoid this range + kvs, _, err := s.KV.Range(key, end, 0, 0) + if err != nil { + log.Panicf("unexpected range error (%v)", err) + } + n, rev = s.KV.DeleteRange(key, end) + for _, kv := range kvs { + s.handle(rev, storagepb.Event{ + Type: storagepb.DELETE, + Kv: &storagepb.KeyValue{ + Key: kv.Key, + }, + }) + } + return n, rev +} + +func (s *watchableStore) TxnBegin() int64 { + s.mu.Lock() + s.tx = newOngoingTx() + return s.KV.TxnBegin() +} + +func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { + rev, err = s.KV.TxnPut(txnID, key, value) + if err == nil { + s.tx.put(string(key)) + } + return rev, err +} + +func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { + kvs, _, err := s.KV.TxnRange(txnID, key, end, 0, 0) + if err != nil { + log.Panicf("unexpected range error (%v)", err) + } + n, rev, err = s.KV.TxnDeleteRange(txnID, key, end) + if err == nil { + for _, kv := range kvs { + s.tx.del(string(kv.Key)) + } + } + return n, rev, err +} + +func (s *watchableStore) TxnEnd(txnID int64) error { + err := s.KV.TxnEnd(txnID) + if err != nil { + return err + } + + _, rev, _ := s.KV.Range(nil, nil, 0, 0) + for k := range s.tx.putm { + kvs, _, err := s.KV.Range([]byte(k), nil, 0, 0) + if err != nil { + log.Panicf("unexpected range error (%v)", err) + } + s.handle(rev, storagepb.Event{ + Type: storagepb.PUT, + Kv: &kvs[0], + }) + } + for k := range s.tx.delm { + s.handle(rev, storagepb.Event{ + Type: storagepb.DELETE, + Kv: &storagepb.KeyValue{ + Key: []byte(k), + }, + }) + } + s.mu.Unlock() + return nil +} + +func (s *watchableStore) Close() error { + close(s.stopc) + s.wg.Wait() + return s.KV.Close() +} + +func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64) (Watcher, CancelFunc) { + s.mu.Lock() + defer s.mu.Unlock() + + wa := newWatcher(key, prefix, startRev, endRev) + k := string(key) + if startRev == 0 { + s.synced[k] = append(s.synced[k], wa) + if endRev != 0 { + s.endm[endRev] = append(s.endm[endRev], wa) + } + } else { + s.unsynced = append(s.unsynced, wa) + } + + cancel := CancelFunc(func() { + s.mu.Lock() + s.mu.Unlock() + wa.stopWithError(ErrCanceled) + + // remove global references of the watcher + for i, w := range s.unsynced { + if w == wa { + s.unsynced = append(s.unsynced[:i], s.unsynced[i+1:]...) + return + } + } + + for i, w := range s.synced[k] { + if w == wa { + s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...) + } + } + if wa.end != 0 { + for i, w := range s.endm[wa.end] { + if w == wa { + s.endm[wa.end] = append(s.endm[wa.end][:i], s.endm[wa.end][i+1:]...) + } + } + } + // If we cannot find it, it should have finished watch. + }) + + return wa, cancel +} + +// keepSyncWatchers syncs the watchers in the unsyncd map every 100ms. +func (s *watchableStore) syncWatchersLoop() { + defer s.wg.Done() + + for { + s.mu.Lock() + s.syncWatchers() + s.mu.Unlock() + + select { + case <-time.After(100 * time.Millisecond): + case <-s.stopc: + return + } + } +} + +// syncWatchers syncs the watchers in the unsyncd map. +func (s *watchableStore) syncWatchers() { + _, curRev, _ := s.KV.Range(nil, nil, 0, 0) + + // filtering without allocating + // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating + nws := s.unsynced[:0] + for _, w := range s.unsynced { + var end []byte + if w.prefix { + end = make([]byte, len(w.key)) + copy(end, w.key) + end[len(w.key)-1]++ + } + limit := cap(w.ch) - len(w.ch) + // the channel is full, try it in the next round + if limit == 0 { + nws = append(nws, w) + continue + } + evs, nextRev, err := s.KV.(*store).RangeEvents(w.key, end, int64(limit), w.cur, w.end) + if err != nil { + w.stopWithError(err) + continue + } + + // push events to the channel + for _, ev := range evs { + w.ch <- ev + } + // stop watcher if it reaches the end + if w.end > 0 && nextRev >= w.end { + w.stopWithError(ExceedEnd) + continue + } + // switch to tracking future events if needed + if nextRev > curRev { + s.synced[string(w.key)] = append(s.synced[string(w.key)], w) + if w.end != 0 { + s.endm[w.end] = append(s.endm[w.end], w) + } + continue + } + // put it back to try it in the next round + w.cur = nextRev + nws = append(nws, w) + } + s.unsynced = nws +} + +// handle handles the change of the happening event on all watchers. +func (s *watchableStore) handle(rev int64, ev storagepb.Event) { + s.notify(rev, ev) + s.stopWatchers(rev) +} + +// notify notifies the fact that given event at the given rev just happened to +// watchers that watch on the key of the event. +func (s *watchableStore) notify(rev int64, ev storagepb.Event) { + // check all prefixes of the key to notify all corresponded watchers + for i := 0; i <= len(ev.Kv.Key); i++ { + ws := s.synced[string(ev.Kv.Key[:i])] + nws := ws[:0] + for _, w := range ws { + // the watcher needs to be notified when either it watches prefix or + // the key is exactly matched. + if !w.prefix && i != len(ev.Kv.Key) { + continue + } + select { + case w.ch <- ev: + nws = append(nws, w) + default: + // put it back to unsynced place + if w.end != 0 { + for i, ew := range s.endm[w.end] { + if ew == w { + s.endm[w.end] = append(s.endm[w.end][:i], s.endm[w.end][i+1:]...) + } + } + } + w.cur = rev + s.unsynced = append(s.unsynced, w) + } + } + s.synced[string(ev.Kv.Key[:i])] = nws + } +} + +// stopWatchers stops watchers with limit equal to rev. +func (s *watchableStore) stopWatchers(rev int64) { + for i, wa := range s.endm[rev+1] { + k := string(wa.key) + for _, w := range s.synced[k] { + if w == wa { + s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...) + } + } + wa.stopWithError(ExceedEnd) + } + delete(s.endm, rev+1) +} + +type ongoingTx struct { + // keys put/deleted in the ongoing txn + putm map[string]bool + delm map[string]bool +} + +func newOngoingTx() *ongoingTx { + return &ongoingTx{ + putm: make(map[string]bool), + delm: make(map[string]bool), + } +} + +func (tx *ongoingTx) put(k string) { + tx.putm[k] = true + tx.delm[k] = false +} + +func (tx *ongoingTx) del(k string) { + tx.delm[k] = true + tx.putm[k] = false +} diff --git a/storage/watcher.go b/storage/watcher.go new file mode 100644 index 000000000..8ade64da3 --- /dev/null +++ b/storage/watcher.go @@ -0,0 +1,46 @@ +package storage + +import ( + "sync" + + "github.com/coreos/etcd/storage/storagepb" +) + +type watcher struct { + key []byte + prefix bool + cur int64 + end int64 + + ch chan storagepb.Event + mu sync.Mutex + err error +} + +func newWatcher(key []byte, prefix bool, start, end int64) *watcher { + return &watcher{ + key: key, + prefix: prefix, + cur: start, + end: end, + ch: make(chan storagepb.Event, 10), + } +} + +func (w *watcher) Event() <-chan storagepb.Event { return w.ch } + +func (w *watcher) Err() error { + w.mu.Lock() + defer w.mu.Unlock() + return w.err +} + +func (w *watcher) stopWithError(err error) { + if w.err != nil { + return + } + close(w.ch) + w.mu.Lock() + w.err = err + w.mu.Unlock() +} diff --git a/storage/watcher_bench_test.go b/storage/watcher_bench_test.go new file mode 100644 index 000000000..485df8161 --- /dev/null +++ b/storage/watcher_bench_test.go @@ -0,0 +1,17 @@ +package storage + +import ( + "fmt" + "testing" +) + +func BenchmarkKVWatcherMemoryUsage(b *testing.B) { + s := newWatchableStore(tmpPath) + defer cleanup(s, tmpPath) + + b.ReportAllocs() + b.StartTimer() + for i := 0; i < b.N; i++ { + s.Watcher([]byte(fmt.Sprint("foo", i)), false, 0, 0) + } +}