From d3809abe42f1b467f382fad7454aa3a780243a8c Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 21 Mar 2016 14:21:09 -0700 Subject: [PATCH] *: gRPC + HTTP on the same port We use cmux to do this since we want to do http+https on the same port in the near future too. --- V3DemoProcfile | 6 +-- clientv3/README.md | 2 +- clientv3/doc.go | 2 +- clientv3/example_cluster_test.go | 2 +- clientv3/example_test.go | 2 +- etcdctlv3/main.go | 2 +- etcdmain/config.go | 2 - etcdmain/etcd.go | 45 ++++++++----------- etcdmain/help.go | 2 - etcdmain/{http.go => serve.go} | 17 +++++-- tools/benchmark/cmd/root.go | 2 +- .../functional-tester/etcd-tester/cluster.go | 15 +++---- 12 files changed, 47 insertions(+), 52 deletions(-) rename etcdmain/{http.go => serve.go} (69%) diff --git a/V3DemoProcfile b/V3DemoProcfile index 09a288691..ecd6a8550 100644 --- a/V3DemoProcfile +++ b/V3DemoProcfile @@ -1,7 +1,7 @@ # Use goreman to run `go get github.com/mattn/goreman` # etcd1 is the default client server for etcdctlv3 commands -etcd1: bin/etcd --experimental-v3demo=true --experimental-gRPC-addr 127.0.0.1:2378 --name infra1 --listen-client-urls http://127.0.0.1:12379 --advertise-client-urls http://127.0.0.1:12379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof -etcd2: bin/etcd --experimental-v3demo=true --experimental-gRPC-addr 127.0.0.1:22378 --name infra2 --listen-client-urls http://127.0.0.1:22379 --advertise-client-urls http://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof -etcd3: bin/etcd --experimental-v3demo=true --experimental-gRPC-addr 127.0.0.1:32378 --name infra3 --listen-client-urls http://127.0.0.1:32379 --advertise-client-urls http://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof +etcd1: bin/etcd --experimental-v3demo=true --name infra1 --listen-client-urls http://127.0.0.1:2379 --advertise-client-urls http://127.0.0.1:2379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof +etcd2: bin/etcd --experimental-v3demo=true --name infra2 --listen-client-urls http://127.0.0.1:22379 --advertise-client-urls http://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof +etcd3: bin/etcd --experimental-v3demo=true --name infra3 --listen-client-urls http://127.0.0.1:32379 --advertise-client-urls http://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof # in future, use proxy to listen on 2379 #proxy: bin/etcd --name infra-proxy1 --proxy=on --listen-client-urls http://127.0.0.1:2378 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --enable-pprof diff --git a/clientv3/README.md b/clientv3/README.md index 6b9735d03..581329465 100644 --- a/clientv3/README.md +++ b/clientv3/README.md @@ -16,7 +16,7 @@ Create client using `clientv3.New`: ```go cli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{"localhost:12378", "localhost:22378", "localhost:32378"}, + Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}, DialTimeout: 5 * time.Second, }) if err != nil { diff --git a/clientv3/doc.go b/clientv3/doc.go index 49b4037bb..b1cecf726 100644 --- a/clientv3/doc.go +++ b/clientv3/doc.go @@ -17,7 +17,7 @@ // Create client using `clientv3.New`: // // cli, err := clientv3.New(clientv3.Config{ -// Endpoints: []string{"localhost:12378", "localhost:22378", "localhost:32378"}, +// Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}, // DialTimeout: 5 * time.Second, // }) // if err != nil { diff --git a/clientv3/example_cluster_test.go b/clientv3/example_cluster_test.go index 2db168fa7..24c75d229 100644 --- a/clientv3/example_cluster_test.go +++ b/clientv3/example_cluster_test.go @@ -113,7 +113,7 @@ func ExampleCluster_memberUpdate() { log.Fatal(err) } - peerURLs := []string{"http://localhost:12378"} + peerURLs := []string{"http://localhost:12380"} _, err = cli.MemberUpdate(context.Background(), resp.Members[0].ID, peerURLs) if err != nil { log.Fatal(err) diff --git a/clientv3/example_test.go b/clientv3/example_test.go index 5cf696a6a..a3f5c9b4f 100644 --- a/clientv3/example_test.go +++ b/clientv3/example_test.go @@ -25,7 +25,7 @@ import ( var ( dialTimeout = 5 * time.Second requestTimeout = 1 * time.Second - endpoints = []string{"localhost:2378", "localhost:22378", "http://localhost:32380"} + endpoints = []string{"localhost:2379", "localhost:22379", "http://localhost:32379"} ) func Example() { diff --git a/etcdctlv3/main.go b/etcdctlv3/main.go index 135c458d0..c0f4014d2 100644 --- a/etcdctlv3/main.go +++ b/etcdctlv3/main.go @@ -41,7 +41,7 @@ var ( ) func init() { - rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2378", "127.0.0.1:22378", "127.0.0.1:32378"}, "gRPC endpoints") + rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"}, "gRPC endpoints") rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (simple, json, protobuf)") rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings") diff --git a/etcdmain/config.go b/etcdmain/config.go index df8ba5e8d..7823f1b63 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -123,7 +123,6 @@ type config struct { printVersion bool v3demo bool - gRPCAddr string autoCompactionRetention int enablePprof bool @@ -226,7 +225,6 @@ func NewConfig() *config { // demo flag fs.BoolVar(&cfg.v3demo, "experimental-v3demo", false, "Enable experimental v3 demo API.") - fs.StringVar(&cfg.gRPCAddr, "experimental-gRPC-addr", "127.0.0.1:2378", "gRPC address for experimental v3 demo API.") fs.IntVar(&cfg.autoCompactionRetention, "experimental-auto-compaction-retention", 0, "Auto compaction retention in hour. 0 means disable auto compaction.") // backwards-compatibility with v0.4.6 diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index d19557bca..69138b680 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -35,6 +35,7 @@ import ( systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v3rpc" @@ -281,15 +282,6 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { clns = append(clns, l) } - var v3l net.Listener - if cfg.v3demo { - v3l, err = net.Listen("tcp", cfg.gRPCAddr) - if err != nil { - plog.Fatal(err) - } - plog.Infof("listening for client rpc on %s", cfg.gRPCAddr) - } - srvcfg := &etcdserver.ServerConfig{ Name: cfg.name, ClientURLs: cfg.acurls, @@ -329,10 +321,26 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { Info: cfg.corsInfo, } ph := etcdhttp.NewPeerHandler(s) + + var grpcS *grpc.Server + if cfg.v3demo { + // set up v3 demo rpc + tls := &cfg.clientTLSInfo + if cfg.clientTLSInfo.Empty() { + tls = nil + } + grpcS, err = v3rpc.Server(s, tls) + if err != nil { + s.Stop() + <-s.StopNotify() + return nil, err + } + } + // Start the peer server in a goroutine for _, l := range plns { go func(l net.Listener) { - plog.Fatal(serveHTTP(l, ph, 5*time.Minute)) + plog.Fatal(serve(l, nil, ph, 5*time.Minute)) }(l) } // Start a client server goroutine for each listen address @@ -340,25 +348,10 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { go func(l net.Listener) { // read timeout does not work with http close notify // TODO: https://github.com/golang/go/issues/9524 - plog.Fatal(serveHTTP(l, ch, 0)) + plog.Fatal(serve(l, grpcS, ch, 0)) }(l) } - if cfg.v3demo { - // set up v3 demo rpc - tls := &cfg.clientTLSInfo - if cfg.clientTLSInfo.Empty() { - tls = nil - } - grpcServer, err := v3rpc.Server(s, tls) - if err != nil { - s.Stop() - <-s.StopNotify() - return nil, err - } - go func() { plog.Fatal(grpcServer.Serve(v3l)) }() - } - return s.StopNotify(), nil } diff --git a/etcdmain/help.go b/etcdmain/help.go index 2e4bb6e5b..5379c36f0 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -139,8 +139,6 @@ experimental flags: enable experimental v3 demo API. --experimental-auto-compaction-retention '0' auto compaction retention in hour. 0 means disable auto compaction. - --experimental-gRPC-addr '127.0.0.1:2378' - gRPC address for experimental v3 demo API. profiling flags: --enable-pprof 'false' diff --git a/etcdmain/http.go b/etcdmain/serve.go similarity index 69% rename from etcdmain/http.go rename to etcdmain/serve.go index d45996605..7df942c75 100644 --- a/etcdmain/http.go +++ b/etcdmain/serve.go @@ -20,14 +20,25 @@ import ( "net" "net/http" "time" + + "github.com/cockroachdb/cmux" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" ) -// serveHTTP accepts incoming HTTP connections on the listener l, +// serve accepts incoming connections on the listener l, // creating a new service goroutine for each. The service goroutines // read requests and then call handler to reply to them. -func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) error { +func serve(l net.Listener, grpcS *grpc.Server, handler http.Handler, readTimeout time.Duration) error { // TODO: assert net.Listener type? Arbitrary listener might break HTTPS server which // expect a TLS Conn type. + httpl := l + if grpcS != nil { + m := cmux.New(l) + grpcl := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) + httpl = m.Match(cmux.Any()) + go plog.Fatal(m.Serve()) + go plog.Fatal(grpcS.Serve(grpcl)) + } logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) // TODO: add debug flag; enable logging when debug flag is set @@ -36,5 +47,5 @@ func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) ReadTimeout: readTimeout, ErrorLog: logger, // do not log user error } - return srv.Serve(l) + return srv.Serve(httpl) } diff --git a/tools/benchmark/cmd/root.go b/tools/benchmark/cmd/root.go index ad843d630..5fd050d7b 100644 --- a/tools/benchmark/cmd/root.go +++ b/tools/benchmark/cmd/root.go @@ -49,7 +49,7 @@ var ( ) func init() { - RootCmd.PersistentFlags().StringSliceVar(&endpoints, "endpoints", []string{"127.0.0.1:2378"}, "gRPC endpoints") + RootCmd.PersistentFlags().StringSliceVar(&endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints") RootCmd.PersistentFlags().UintVar(&totalConns, "conns", 1, "Total number of gRPC connections") RootCmd.PersistentFlags().UintVar(&totalClients, "clients", 1, "Total number of gRPC clients") diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index c1256080d..c60f883cd 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -44,7 +44,6 @@ type cluster struct { Agents []client.Agent Stressers []Stresser Names []string - GRPCURLs []string ClientURLs []string } @@ -89,7 +88,6 @@ func (c *cluster) Bootstrap() error { if err != nil { return err } - grpcURLs[i] = fmt.Sprintf("%s:2378", host) clientURLs[i] = fmt.Sprintf("http://%s:2379", host) peerURLs[i] = fmt.Sprintf("http://%s:%d", host, peerURLPort) @@ -115,9 +113,7 @@ func (c *cluster) Bootstrap() error { } if !c.v2Only { flags = append(flags, - "--experimental-v3demo", - "--experimental-gRPC-addr", grpcURLs[i], - ) + "--experimental-v3demo") } if _, err := a.Start(flags...); err != nil { @@ -161,7 +157,6 @@ func (c *cluster) Bootstrap() error { c.Agents = agents c.Stressers = stressers c.Names = names - c.GRPCURLs = grpcURLs c.ClientURLs = clientURLs return nil } @@ -172,7 +167,7 @@ func (c *cluster) WaitHealth() error { // TODO: set it to a reasonable value. It is set that high because // follower may use long time to catch up the leader when reboot under // reasonable workload (https://github.com/coreos/etcd/issues/2698) - healthFunc, urls := setHealthKey, c.GRPCURLs + healthFunc, urls := setHealthKey, c.ClientURLs if c.v2Only { healthFunc, urls = setHealthKeyV2, c.ClientURLs } @@ -192,7 +187,7 @@ func (c *cluster) GetLeader() (int, error) { return 0, nil } cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.GRPCURLs, + Endpoints: c.ClientURLs, DialTimeout: 5 * time.Second, }) if err != nil { @@ -304,7 +299,7 @@ func setHealthKeyV2(us []string) error { func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) { revs := make(map[string]int64) hashes := make(map[string]int64) - for _, u := range c.GRPCURLs { + for _, u := range c.ClientURLs { conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second)) if err != nil { return nil, nil, err @@ -328,7 +323,7 @@ func (c *cluster) compactKV(rev int64) error { conn *grpc.ClientConn err error ) - for _, u := range c.GRPCURLs { + for _, u := range c.ClientURLs { conn, err = grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second)) if err != nil { continue