From 0f7374ce89941876f9681d409deb9fa5afe6c7d2 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 20 Oct 2015 16:10:52 -0700 Subject: [PATCH] storage: KV field -> store field in watchableStore We need to access the underlying store to use its RangeEvents function. It is not good to use unnecessary type conversion. The underlying store is also needed for further store upon watchableStore. --- storage/watchable_store.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 242901152..5126baba6 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -25,7 +25,7 @@ import ( type watchableStore struct { mu sync.Mutex - KV + *store // contains all unsynced watchers that needs to sync events that have happened // TODO: use map to reduce cancel cost @@ -41,7 +41,7 @@ type watchableStore struct { func newWatchableStore(path string) *watchableStore { s := &watchableStore{ - KV: newStore(path), + store: newStore(path), synced: make(map[string][]*watcher), stopc: make(chan struct{}), } @@ -54,9 +54,9 @@ func (s *watchableStore) Put(key, value []byte) (rev int64) { s.mu.Lock() defer s.mu.Unlock() - rev = s.KV.Put(key, value) + rev = s.store.Put(key, value) // TODO: avoid this range - kvs, _, err := s.KV.Range(key, nil, 0, rev) + kvs, _, err := s.store.Range(key, nil, 0, rev) if err != nil { log.Panicf("unexpected range error (%v)", err) } @@ -72,11 +72,11 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { defer s.mu.Unlock() // TODO: avoid this range - kvs, _, err := s.KV.Range(key, end, 0, 0) + kvs, _, err := s.store.Range(key, end, 0, 0) if err != nil { log.Panicf("unexpected range error (%v)", err) } - n, rev = s.KV.DeleteRange(key, end) + n, rev = s.store.DeleteRange(key, end) for _, kv := range kvs { s.handle(rev, storagepb.Event{ Type: storagepb.DELETE, @@ -91,11 +91,11 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { func (s *watchableStore) TxnBegin() int64 { s.mu.Lock() s.tx = newOngoingTx() - return s.KV.TxnBegin() + return s.store.TxnBegin() } func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { - rev, err = s.KV.TxnPut(txnID, key, value) + rev, err = s.store.TxnPut(txnID, key, value) if err == nil { s.tx.put(string(key)) } @@ -103,11 +103,11 @@ func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err } func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { - kvs, _, err := s.KV.TxnRange(txnID, key, end, 0, 0) + kvs, _, err := s.store.TxnRange(txnID, key, end, 0, 0) if err != nil { log.Panicf("unexpected range error (%v)", err) } - n, rev, err = s.KV.TxnDeleteRange(txnID, key, end) + n, rev, err = s.store.TxnDeleteRange(txnID, key, end) if err == nil { for _, kv := range kvs { s.tx.del(string(kv.Key)) @@ -117,14 +117,14 @@ func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev in } func (s *watchableStore) TxnEnd(txnID int64) error { - err := s.KV.TxnEnd(txnID) + err := s.store.TxnEnd(txnID) if err != nil { return err } - _, rev, _ := s.KV.Range(nil, nil, 0, 0) + _, rev, _ := s.store.Range(nil, nil, 0, 0) for k := range s.tx.putm { - kvs, _, err := s.KV.Range([]byte(k), nil, 0, 0) + kvs, _, err := s.store.Range([]byte(k), nil, 0, 0) if err != nil { log.Panicf("unexpected range error (%v)", err) } @@ -148,7 +148,7 @@ func (s *watchableStore) TxnEnd(txnID int64) error { func (s *watchableStore) Close() error { close(s.stopc) s.wg.Wait() - return s.KV.Close() + return s.store.Close() } func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc) { @@ -211,7 +211,7 @@ func (s *watchableStore) syncWatchersLoop() { // syncWatchers syncs the watchers in the unsyncd map. func (s *watchableStore) syncWatchers() { - _, curRev, _ := s.KV.Range(nil, nil, 0, 0) + _, curRev, _ := s.store.Range(nil, nil, 0, 0) // filtering without allocating // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating @@ -229,7 +229,7 @@ func (s *watchableStore) syncWatchers() { nws = append(nws, w) continue } - evs, nextRev, err := s.KV.(*store).RangeEvents(w.key, end, int64(limit), w.cur) + evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur) if err != nil { w.stopWithError(err) continue