From 6d8c647db8d8b6ceaecd3f0c81eecb9691879586 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 28 Jun 2016 22:25:10 -0700 Subject: [PATCH] *: initial implementation of grpc-proxy --- etcdmain/grpc_proxy.go | 132 +++++++++++++++++++++++++++++++++ etcdmain/main.go | 2 +- proxy/grpcproxy/cache/store.go | 8 +- proxy/grpcproxy/kv.go | 15 +++- 4 files changed, 149 insertions(+), 8 deletions(-) create mode 100644 etcdmain/grpc_proxy.go diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go new file mode 100644 index 000000000..eb2a3dd5a --- /dev/null +++ b/etcdmain/grpc_proxy.go @@ -0,0 +1,132 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdmain + +import ( + "fmt" + "net" + "os" + "time" + + "github.com/coreos/etcd/clientv3" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/transport" + "github.com/coreos/etcd/proxy/grpcproxy" + + "github.com/spf13/cobra" + "google.golang.org/grpc" +) + +var ( + grpcProxyListenAddr string + grpcProxyEndpoints []string + grpcProxyCert string + grpcProxyKey string + grpcProxyCA string +) + +func init() { + rootCmd.AddCommand(newGRPCProxyCommand()) +} + +// newGRPCProxyCommand returns the cobra command for "grpc-proxy". +func newGRPCProxyCommand() *cobra.Command { + lpc := &cobra.Command{ + Use: "grpc-proxy ", + Short: "grpc-proxy related command", + } + lpc.AddCommand(newGRPCProxyStartCommand()) + + return lpc +} + +func newGRPCProxyStartCommand() *cobra.Command { + cmd := cobra.Command{ + Use: "start", + Short: "start the grpc proxy", + Run: startGRPCProxy, + } + + cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address") + cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints") + cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file") + cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file") + cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle") + + return &cmd +} + +func startGRPCProxy(cmd *cobra.Command, args []string) { + l, err := net.Listen("tcp", grpcProxyListenAddr) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + cfg, err := newClientCfg() + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + client, err := clientv3.New(*cfg) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + kvp := grpcproxy.NewKvProxy(client) + + server := grpc.NewServer() + pb.RegisterKVServer(server, kvp) + + server.Serve(l) +} + +func newClientCfg() (*clientv3.Config, error) { + // set tls if any one tls option set + var cfgtls *transport.TLSInfo + tlsinfo := transport.TLSInfo{} + if grpcProxyCert != "" { + tlsinfo.CertFile = grpcProxyCert + cfgtls = &tlsinfo + } + + if grpcProxyKey != "" { + tlsinfo.KeyFile = grpcProxyKey + cfgtls = &tlsinfo + } + + if grpcProxyCA != "" { + tlsinfo.CAFile = grpcProxyCA + cfgtls = &tlsinfo + } + + cfg := clientv3.Config{ + Endpoints: grpcProxyEndpoints, + DialTimeout: 5 * time.Second, + } + if cfgtls != nil { + clientTLS, err := cfgtls.ClientConfig() + if err != nil { + return nil, err + } + cfg.TLS = clientTLS + } + + // TODO: support insecure tls + + return &cfg, nil +} diff --git a/etcdmain/main.go b/etcdmain/main.go index fb14a98c8..87996f7c2 100644 --- a/etcdmain/main.go +++ b/etcdmain/main.go @@ -24,7 +24,7 @@ func Main() { if len(os.Args) > 1 { switch os.Args[1] { - case "gateway": + case "gateway", "grpc-proxy": if err := rootCmd.Execute(); err != nil { fmt.Fprint(os.Stderr, err) os.Exit(1) diff --git a/proxy/grpcproxy/cache/store.go b/proxy/grpcproxy/cache/store.go index 38fbd1ee1..994621e78 100644 --- a/proxy/grpcproxy/cache/store.go +++ b/proxy/grpcproxy/cache/store.go @@ -15,6 +15,7 @@ package cache import ( + "errors" "sync" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" @@ -45,7 +46,8 @@ func keyFunc(req *pb.RangeRequest) string { func NewCache(maxCacheEntries int) Cache { return &cache{ - lru: lru.New(maxCacheEntries), + lru: lru.New(maxCacheEntries), + compactedRev: -1, } } @@ -76,7 +78,7 @@ func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) { c.mu.Lock() defer c.mu.Unlock() - if req.Revision > c.compactedRev { + if req.Revision < c.compactedRev { c.lru.Remove(key) return nil, ErrCompacted } @@ -84,7 +86,7 @@ func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) { if resp, ok := c.lru.Get(key); ok { return resp.(*pb.RangeResponse), nil } - return nil, nil + return nil, errors.New("not exist") } // Compact invalidate all caching response before the given rev. diff --git a/proxy/grpcproxy/kv.go b/proxy/grpcproxy/kv.go index c8c72567a..260982290 100644 --- a/proxy/grpcproxy/kv.go +++ b/proxy/grpcproxy/kv.go @@ -18,6 +18,7 @@ import ( "github.com/coreos/etcd/clientv3" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/proxy/grpcproxy/cache" + "golang.org/x/net/context" ) @@ -36,17 +37,23 @@ func NewKvProxy(c *clientv3.Client) *kvProxy { func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { // if request set Serializable, serve it from local cache first if r.Serializable { - if resp, err := p.cache.Get(r); err == nil || err == cache.ErrCompacted { - return resp, err + resp, err := p.cache.Get(r) + switch err { + case nil: + return resp, nil + case cache.ErrCompacted: + return nil, err } } resp, err := p.c.Do(ctx, RangeRequestToOp(r)) if err != nil { - p.cache.Add(r, (*pb.RangeResponse)(resp.Get())) + return nil, err } - return (*pb.RangeResponse)(resp.Get()), err + p.cache.Add(r, (*pb.RangeResponse)(resp.Get())) + + return (*pb.RangeResponse)(resp.Get()), nil } func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {