diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 242901152..5126baba6 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -25,7 +25,7 @@ import ( type watchableStore struct { mu sync.Mutex - KV + *store // contains all unsynced watchers that needs to sync events that have happened // TODO: use map to reduce cancel cost @@ -41,7 +41,7 @@ type watchableStore struct { func newWatchableStore(path string) *watchableStore { s := &watchableStore{ - KV: newStore(path), + store: newStore(path), synced: make(map[string][]*watcher), stopc: make(chan struct{}), } @@ -54,9 +54,9 @@ func (s *watchableStore) Put(key, value []byte) (rev int64) { s.mu.Lock() defer s.mu.Unlock() - rev = s.KV.Put(key, value) + rev = s.store.Put(key, value) // TODO: avoid this range - kvs, _, err := s.KV.Range(key, nil, 0, rev) + kvs, _, err := s.store.Range(key, nil, 0, rev) if err != nil { log.Panicf("unexpected range error (%v)", err) } @@ -72,11 +72,11 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { defer s.mu.Unlock() // TODO: avoid this range - kvs, _, err := s.KV.Range(key, end, 0, 0) + kvs, _, err := s.store.Range(key, end, 0, 0) if err != nil { log.Panicf("unexpected range error (%v)", err) } - n, rev = s.KV.DeleteRange(key, end) + n, rev = s.store.DeleteRange(key, end) for _, kv := range kvs { s.handle(rev, storagepb.Event{ Type: storagepb.DELETE, @@ -91,11 +91,11 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { func (s *watchableStore) TxnBegin() int64 { s.mu.Lock() s.tx = newOngoingTx() - return s.KV.TxnBegin() + return s.store.TxnBegin() } func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { - rev, err = s.KV.TxnPut(txnID, key, value) + rev, err = s.store.TxnPut(txnID, key, value) if err == nil { s.tx.put(string(key)) } @@ -103,11 +103,11 @@ func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err } func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { - kvs, _, err := s.KV.TxnRange(txnID, key, end, 0, 0) + kvs, _, err := s.store.TxnRange(txnID, key, end, 0, 0) if err != nil { log.Panicf("unexpected range error (%v)", err) } - n, rev, err = s.KV.TxnDeleteRange(txnID, key, end) + n, rev, err = s.store.TxnDeleteRange(txnID, key, end) if err == nil { for _, kv := range kvs { s.tx.del(string(kv.Key)) @@ -117,14 +117,14 @@ func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev in } func (s *watchableStore) TxnEnd(txnID int64) error { - err := s.KV.TxnEnd(txnID) + err := s.store.TxnEnd(txnID) if err != nil { return err } - _, rev, _ := s.KV.Range(nil, nil, 0, 0) + _, rev, _ := s.store.Range(nil, nil, 0, 0) for k := range s.tx.putm { - kvs, _, err := s.KV.Range([]byte(k), nil, 0, 0) + kvs, _, err := s.store.Range([]byte(k), nil, 0, 0) if err != nil { log.Panicf("unexpected range error (%v)", err) } @@ -148,7 +148,7 @@ func (s *watchableStore) TxnEnd(txnID int64) error { func (s *watchableStore) Close() error { close(s.stopc) s.wg.Wait() - return s.KV.Close() + return s.store.Close() } func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc) { @@ -211,7 +211,7 @@ func (s *watchableStore) syncWatchersLoop() { // syncWatchers syncs the watchers in the unsyncd map. func (s *watchableStore) syncWatchers() { - _, curRev, _ := s.KV.Range(nil, nil, 0, 0) + _, curRev, _ := s.store.Range(nil, nil, 0, 0) // filtering without allocating // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating @@ -229,7 +229,7 @@ func (s *watchableStore) syncWatchers() { nws = append(nws, w) continue } - evs, nextRev, err := s.KV.(*store).RangeEvents(w.key, end, int64(limit), w.cur) + evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur) if err != nil { w.stopWithError(err) continue