From c8bbb8c53e5033b77c7a7bc744aed6c7beb46dfc Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 18 Aug 2016 16:44:40 -0700 Subject: [PATCH] grpc-proxy: invalidate cache entries when there is a put/delete --- proxy/grpcproxy/cache/store.go | 59 ++++++++++++++++++++++++++++++++-- proxy/grpcproxy/kv.go | 10 ++++-- 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/proxy/grpcproxy/cache/store.go b/proxy/grpcproxy/cache/store.go index 994621e78..ccbb18d37 100644 --- a/proxy/grpcproxy/cache/store.go +++ b/proxy/grpcproxy/cache/store.go @@ -20,6 +20,7 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/adt" "github.com/golang/groupcache/lru" ) @@ -32,6 +33,7 @@ type Cache interface { Add(req *pb.RangeRequest, resp *pb.RangeResponse) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) Compact(revision int64) + Invalidate(key []byte, endkey []byte) } // keyFunc returns the key of an request, which is used to look up in the cache for it's caching response. @@ -53,8 +55,12 @@ func NewCache(maxCacheEntries int) Cache { // cache implements Cache type cache struct { - mu sync.RWMutex - lru *lru.Cache + mu sync.RWMutex + lru *lru.Cache + + // a reverse index for cache invalidation + cachedRanges adt.IntervalTree + compactedRev int64 } @@ -68,6 +74,29 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) { if req.Revision > c.compactedRev { c.lru.Add(key, resp) } + // we do not need to invalidate a request with a revision specified. + // so we do not need to add it into the reverse index. + if req.Revision != 0 { + return + } + + var ( + iv *adt.IntervalValue + ivl adt.Interval + ) + if len(req.RangeEnd) != 0 { + ivl = adt.NewStringAffineInterval(string(req.Key), string(req.RangeEnd)) + } else { + ivl = adt.NewStringAffinePoint(string(req.Key)) + } + + iv = c.cachedRanges.Find(ivl) + + if iv == nil { + c.cachedRanges.Insert(ivl, []string{key}) + } else { + iv.Val = append(iv.Val.([]string), key) + } } // Get looks up the caching response for a given request. @@ -89,6 +118,32 @@ func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) { return nil, errors.New("not exist") } +// Invalidate invalidates the cache entries that intersecting with the given range from key to endkey. +func (c *cache) Invalidate(key, endkey []byte) { + c.mu.Lock() + defer c.mu.Unlock() + + var ( + ivs []*adt.IntervalValue + ivl adt.Interval + ) + if len(endkey) == 0 { + ivl = adt.NewStringAffinePoint(string(key)) + } else { + ivl = adt.NewStringAffineInterval(string(key), string(endkey)) + } + + ivs = c.cachedRanges.Stab(ivl) + c.cachedRanges.Delete(ivl) + + for _, iv := range ivs { + keys := iv.Val.([]string) + for _, key := range keys { + c.lru.Remove(key) + } + } +} + // Compact invalidate all caching response before the given rev. // Replace with the invalidation is lazy. The actual removal happens when the entries is accessed. func (c *cache) Compact(revision int64) { diff --git a/proxy/grpcproxy/kv.go b/proxy/grpcproxy/kv.go index db64b2a5b..feb10a4fb 100644 --- a/proxy/grpcproxy/kv.go +++ b/proxy/grpcproxy/kv.go @@ -35,7 +35,6 @@ func NewKvProxy(c *clientv3.Client) pb.KVServer { } func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { - // if request set Serializable, serve it from local cache first if r.Serializable { resp, err := p.cache.Get(r) switch err { @@ -51,17 +50,22 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo return nil, err } - p.cache.Add(r, (*pb.RangeResponse)(resp.Get())) + // cache linearizable as serializable + r.Serializable = true + gresp := (*pb.RangeResponse)(resp.Get()) + p.cache.Add(r, gresp) - return (*pb.RangeResponse)(resp.Get()), nil + return gresp, nil } func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { + p.cache.Invalidate(r.Key, nil) resp, err := p.kv.Do(ctx, PutRequestToOp(r)) return (*pb.PutResponse)(resp.Put()), err } func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { + p.cache.Invalidate(r.Key, r.RangeEnd) resp, err := p.kv.Do(ctx, DelRequestToOp(r)) return (*pb.DeleteRangeResponse)(resp.Del()), err }