From 1767788074843bbc9d203701e34d9dc678e3e0d7 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Thu, 28 Jan 2016 19:07:35 -0800 Subject: [PATCH] *: expose integration functions for clientv3 --- clientv3/integration/client_test.go | 63 +++ clientv3/integration/doc.go | 17 + clientv3/integration/main_test.go | 20 + integration/cluster.go | 674 ++++++++++++++++++++++++++ integration/cluster_test.go | 626 +----------------------- integration/v3_barrier_test.go | 4 +- integration/v3_double_barrier_test.go | 6 +- integration/v3_election_test.go | 4 +- integration/v3_grpc_test.go | 77 +-- integration/v3_lock_test.go | 10 +- integration/v3_queue_test.go | 16 +- integration/v3_stm_test.go | 6 +- test | 3 +- 13 files changed, 826 insertions(+), 700 deletions(-) create mode 100644 clientv3/integration/client_test.go create mode 100644 clientv3/integration/doc.go create mode 100644 clientv3/integration/main_test.go create mode 100644 integration/cluster.go diff --git a/clientv3/integration/client_test.go b/clientv3/integration/client_test.go new file mode 100644 index 000000000..12b70b443 --- /dev/null +++ b/clientv3/integration/client_test.go @@ -0,0 +1,63 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "bytes" + "testing" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/integration" + "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/pkg/testutil" +) + +func TestKVPut(t *testing.T) { + defer testutil.AfterTest(t) + + tests := []struct { + key, val string + leaseID lease.LeaseID + }{ + {"foo", "bar", lease.NoLease}, + + // TODO: test with leaseID + } + + for i, tt := range tests { + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + kv := clientv3.NewKV(clus.RandClient()) + + if _, err := kv.Put(tt.key, tt.val, tt.leaseID); err != nil { + t.Fatalf("#%d: couldn't put (%v)", i, tt.key, err) + } + + resp, err := kv.Get(tt.key, 0) + if err != nil { + t.Fatalf("#%d: couldn't get key (%v)", i, err) + } + if len(resp.Kvs) != 1 { + t.Fatalf("#%d: expected 1 key, got %d", i, len(resp.Kvs)) + } + if !bytes.Equal([]byte(tt.val), resp.Kvs[0].Value) { + t.Errorf("#%d: val = %s, want %s", i, tt.val, resp.Kvs[0].Value) + } + if tt.leaseID != lease.LeaseID(resp.Kvs[0].Lease) { + t.Errorf("#%d: val = %d, want %d", i, tt.leaseID, resp.Kvs[0].Lease) + } + } +} diff --git a/clientv3/integration/doc.go b/clientv3/integration/doc.go new file mode 100644 index 000000000..7e3a1ccec --- /dev/null +++ b/clientv3/integration/doc.go @@ -0,0 +1,17 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package integration implements tests built upon embedded etcd, and focuses on +// correctness of etcd client. +package integration diff --git a/clientv3/integration/main_test.go b/clientv3/integration/main_test.go new file mode 100644 index 000000000..2913ce511 --- /dev/null +++ b/clientv3/integration/main_test.go @@ -0,0 +1,20 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package integration + +import ( + "os" + "testing" + + "github.com/coreos/etcd/pkg/testutil" +) + +func TestMain(m *testing.M) { + v := m.Run() + if v == 0 && testutil.CheckLeakedGoroutine() { + os.Exit(1) + } + os.Exit(v) +} diff --git a/integration/cluster.go b/integration/cluster.go new file mode 100644 index 000000000..5a740a733 --- /dev/null +++ b/integration/cluster.go @@ -0,0 +1,674 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package recipe +package integration + +import ( + "fmt" + "io/ioutil" + "math/rand" + "net" + "net/http" + "net/http/httptest" + "os" + "reflect" + "sort" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + + "github.com/coreos/etcd/client" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api/v3rpc" + "github.com/coreos/etcd/etcdserver/etcdhttp" + "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/pkg/transport" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/rafthttp" +) + +const ( + tickDuration = 10 * time.Millisecond + clusterName = "etcd" + requestTimeout = 20 * time.Second +) + +var ( + electionTicks = 10 + + // integration test uses well-known ports to listen for each running member, + // which ensures restarted member could listen on specific port again. + nextListenPort int64 = 20000 +) + +type ClusterConfig struct { + Size int + UsePeerTLS bool + DiscoveryURL string + UseV3 bool + UseGRPC bool +} + +type cluster struct { + cfg *ClusterConfig + Members []*member +} + +func (c *cluster) fillClusterForMembers() error { + if c.cfg.DiscoveryURL != "" { + // cluster will be discovered + return nil + } + + addrs := make([]string, 0) + for _, m := range c.Members { + scheme := "http" + if !m.PeerTLSInfo.Empty() { + scheme = "https" + } + for _, l := range m.PeerListeners { + addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String())) + } + } + clusterStr := strings.Join(addrs, ",") + var err error + for _, m := range c.Members { + m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) + if err != nil { + return err + } + } + return nil +} + +func newCluster(t *testing.T, cfg *ClusterConfig) *cluster { + c := &cluster{cfg: cfg} + ms := make([]*member, cfg.Size) + for i := 0; i < cfg.Size; i++ { + ms[i] = c.mustNewMember(t) + } + c.Members = ms + if err := c.fillClusterForMembers(); err != nil { + t.Fatal(err) + } + + return c +} + +// NewCluster returns an unlaunched cluster of the given size which has been +// set to use static bootstrap. +func NewCluster(t *testing.T, size int) *cluster { + return newCluster(t, &ClusterConfig{Size: size}) +} + +// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration +func NewClusterByConfig(t *testing.T, cfg *ClusterConfig) *cluster { + return newCluster(t, cfg) +} + +func (c *cluster) Launch(t *testing.T) { + errc := make(chan error) + for _, m := range c.Members { + // Members are launched in separate goroutines because if they boot + // using discovery url, they have to wait for others to register to continue. + go func(m *member) { + errc <- m.Launch() + }(m) + } + for range c.Members { + if err := <-errc; err != nil { + t.Fatalf("error setting up member: %v", err) + } + } + // wait cluster to be stable to receive future client requests + c.waitMembersMatch(t, c.HTTPMembers()) + c.waitVersion() +} + +func (c *cluster) URL(i int) string { + return c.Members[i].ClientURLs[0].String() +} + +func (c *cluster) URLs() []string { + urls := make([]string, 0) + for _, m := range c.Members { + for _, u := range m.ClientURLs { + urls = append(urls, u.String()) + } + } + return urls +} + +func (c *cluster) HTTPMembers() []client.Member { + ms := make([]client.Member, len(c.Members)) + for i, m := range c.Members { + scheme := "http" + if !m.PeerTLSInfo.Empty() { + scheme = "https" + } + ms[i].Name = m.Name + for _, ln := range m.PeerListeners { + ms[i].PeerURLs = append(ms[i].PeerURLs, scheme+"://"+ln.Addr().String()) + } + for _, ln := range m.ClientListeners { + ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String()) + } + } + return ms +} + +func (c *cluster) mustNewMember(t *testing.T) *member { + name := c.name(rand.Int()) + m := mustNewMember(t, name, c.cfg.UsePeerTLS) + m.DiscoveryURL = c.cfg.DiscoveryURL + m.V3demo = c.cfg.UseV3 + if c.cfg.UseGRPC { + if err := m.listenGRPC(); err != nil { + t.Fatal(err) + } + } + return m +} + +func (c *cluster) addMember(t *testing.T) { + m := c.mustNewMember(t) + + scheme := "http" + if c.cfg.UsePeerTLS { + scheme = "https" + } + + // send add request to the cluster + cc := mustNewHTTPClient(t, []string{c.URL(0)}) + ma := client.NewMembersAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + peerURL := scheme + "://" + m.PeerListeners[0].Addr().String() + if _, err := ma.Add(ctx, peerURL); err != nil { + t.Fatalf("add member on %s error: %v", c.URL(0), err) + } + cancel() + + // wait for the add node entry applied in the cluster + members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}}) + c.waitMembersMatch(t, members) + + m.InitialPeerURLsMap = types.URLsMap{} + for _, mm := range c.Members { + m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs + } + m.InitialPeerURLsMap[m.Name] = m.PeerURLs + m.NewCluster = false + if err := m.Launch(); err != nil { + t.Fatal(err) + } + c.Members = append(c.Members, m) + // wait cluster to be stable to receive future client requests + c.waitMembersMatch(t, c.HTTPMembers()) +} + +func (c *cluster) AddMember(t *testing.T) { + c.addMember(t) +} + +func (c *cluster) RemoveMember(t *testing.T, id uint64) { + // send remove request to the cluster + cc := mustNewHTTPClient(t, c.URLs()) + ma := client.NewMembersAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + if err := ma.Remove(ctx, types.ID(id).String()); err != nil { + t.Fatalf("unexpected remove error %v", err) + } + cancel() + newMembers := make([]*member, 0) + for _, m := range c.Members { + if uint64(m.s.ID()) != id { + newMembers = append(newMembers, m) + } else { + select { + case <-m.s.StopNotify(): + m.Terminate(t) + // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout + // TODO: remove connection write timeout by selecting on http response closeNotifier + // blocking on https://github.com/golang/go/issues/9524 + case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout): + t.Fatalf("failed to remove member %s in time", m.s.ID()) + } + } + } + c.Members = newMembers + c.waitMembersMatch(t, c.HTTPMembers()) +} + +func (c *cluster) Terminate(t *testing.T) { + for _, m := range c.Members { + m.Terminate(t) + } +} + +func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { + for _, u := range c.URLs() { + cc := mustNewHTTPClient(t, []string{u}) + ma := client.NewMembersAPI(cc) + for { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + ms, err := ma.List(ctx) + cancel() + if err == nil && isMembersEqual(ms, membs) { + break + } + time.Sleep(tickDuration) + } + } + return +} + +func (c *cluster) waitLeader(t *testing.T, membs []*member) { + possibleLead := make(map[uint64]bool) + var lead uint64 + for _, m := range membs { + possibleLead[uint64(m.s.ID())] = true + } + + for lead == 0 || !possibleLead[lead] { + lead = 0 + for _, m := range membs { + if lead != 0 && lead != m.s.Lead() { + lead = 0 + break + } + lead = m.s.Lead() + } + time.Sleep(10 * tickDuration) + } +} + +func (c *cluster) waitVersion() { + for _, m := range c.Members { + for { + if m.s.ClusterVersion() != nil { + break + } + time.Sleep(tickDuration) + } + } +} + +func (c *cluster) name(i int) string { + return fmt.Sprint("node", i) +} + +// isMembersEqual checks whether two members equal except ID field. +// The given wmembs should always set ID field to empty string. +func isMembersEqual(membs []client.Member, wmembs []client.Member) bool { + sort.Sort(SortableMemberSliceByPeerURLs(membs)) + sort.Sort(SortableMemberSliceByPeerURLs(wmembs)) + for i := range membs { + membs[i].ID = "" + } + return reflect.DeepEqual(membs, wmembs) +} + +func newLocalListener(t *testing.T) net.Listener { + port := atomic.AddInt64(&nextListenPort, 1) + l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(port, 10)) + if err != nil { + t.Fatal(err) + } + return l +} + +func newListenerWithAddr(t *testing.T, addr string) net.Listener { + var err error + var l net.Listener + // TODO: we want to reuse a previous closed port immediately. + // a better way is to set SO_REUSExx instead of doing retry. + for i := 0; i < 5; i++ { + l, err = net.Listen("tcp", addr) + if err == nil { + break + } + time.Sleep(500 * time.Millisecond) + } + if err != nil { + t.Fatal(err) + } + return l +} + +type member struct { + etcdserver.ServerConfig + PeerListeners, ClientListeners []net.Listener + grpcListener net.Listener + // inited PeerTLSInfo implies to enable peer TLS + PeerTLSInfo transport.TLSInfo + + raftHandler *testutil.PauseableHandler + s *etcdserver.EtcdServer + hss []*httptest.Server + + grpcServer *grpc.Server + grpcAddr string +} + +// mustNewMember return an inited member with the given name. If usePeerTLS is +// true, it will set PeerTLSInfo and use https scheme to communicate between +// peers. +func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member { + var ( + testTLSInfo = transport.TLSInfo{ + KeyFile: "./fixtures/server.key.insecure", + CertFile: "./fixtures/server.crt", + TrustedCAFile: "./fixtures/ca.crt", + ClientCertAuth: true, + } + err error + ) + m := &member{} + + peerScheme := "http" + if usePeerTLS { + peerScheme = "https" + } + + pln := newLocalListener(t) + m.PeerListeners = []net.Listener{pln} + m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()}) + if err != nil { + t.Fatal(err) + } + if usePeerTLS { + m.PeerTLSInfo = testTLSInfo + } + + cln := newLocalListener(t) + m.ClientListeners = []net.Listener{cln} + m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) + if err != nil { + t.Fatal(err) + } + + m.Name = name + + m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd") + if err != nil { + t.Fatal(err) + } + clusterStr := fmt.Sprintf("%s=%s://%s", name, peerScheme, pln.Addr().String()) + m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) + if err != nil { + t.Fatal(err) + } + m.InitialClusterToken = clusterName + m.NewCluster = true + m.ServerConfig.PeerTLSInfo = m.PeerTLSInfo + m.ElectionTicks = electionTicks + m.TickMs = uint(tickDuration / time.Millisecond) + return m +} + +// startGRPC starts a grpc server over a unix domain socket on the member +func (m *member) listenGRPC() error { + if m.V3demo == false { + return fmt.Errorf("starting grpc server without v3 configured") + } + m.grpcAddr = m.Name + ".sock" + if err := os.RemoveAll(m.grpcAddr); err != nil { + return err + } + l, err := net.Listen("unix", m.grpcAddr) + if err != nil { + return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err) + } + m.grpcListener = l + return nil +} + +// NewClientV3 creates a new grpc client connection to the member +func NewClientV3(m *member) (*clientv3.Client, error) { + if m.grpcAddr == "" { + return nil, fmt.Errorf("member not configured for grpc") + } + f := func(a string, t time.Duration) (net.Conn, error) { + return net.Dial("unix", a) + } + unixdialer := grpc.WithDialer(f) + conn, err := grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer) + if err != nil { + return nil, err + } + return clientv3.NewFromConn(conn), nil +} + +// Clone returns a member with the same server configuration. The returned +// member will not set PeerListeners and ClientListeners. +func (m *member) Clone(t *testing.T) *member { + mm := &member{} + mm.ServerConfig = m.ServerConfig + + var err error + clientURLStrs := m.ClientURLs.StringSlice() + mm.ClientURLs, err = types.NewURLs(clientURLStrs) + if err != nil { + // this should never fail + panic(err) + } + peerURLStrs := m.PeerURLs.StringSlice() + mm.PeerURLs, err = types.NewURLs(peerURLStrs) + if err != nil { + // this should never fail + panic(err) + } + clusterStr := m.InitialPeerURLsMap.String() + mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) + if err != nil { + // this should never fail + panic(err) + } + mm.InitialClusterToken = m.InitialClusterToken + mm.ElectionTicks = m.ElectionTicks + mm.PeerTLSInfo = m.PeerTLSInfo + return mm +} + +// Launch starts a member based on ServerConfig, PeerListeners +// and ClientListeners. +func (m *member) Launch() error { + var err error + if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil { + return fmt.Errorf("failed to initialize the etcd server: %v", err) + } + m.s.SyncTicker = time.Tick(500 * time.Millisecond) + m.s.Start() + + m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)} + + for _, ln := range m.PeerListeners { + hs := &httptest.Server{ + Listener: ln, + Config: &http.Server{Handler: m.raftHandler}, + } + if m.PeerTLSInfo.Empty() { + hs.Start() + } else { + hs.TLS, err = m.PeerTLSInfo.ServerConfig() + if err != nil { + return err + } + hs.StartTLS() + } + m.hss = append(m.hss, hs) + } + for _, ln := range m.ClientListeners { + hs := &httptest.Server{ + Listener: ln, + Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())}, + } + hs.Start() + m.hss = append(m.hss, hs) + } + if m.grpcListener != nil { + m.grpcServer = grpc.NewServer() + etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s)) + etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s)) + etcdserverpb.RegisterLeaseServer(m.grpcServer, v3rpc.NewLeaseServer(m.s)) + go m.grpcServer.Serve(m.grpcListener) + } + return nil +} + +func (m *member) WaitOK(t *testing.T) { + cc := mustNewHTTPClient(t, []string{m.URL()}) + kapi := client.NewKeysAPI(cc) + for { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + _, err := kapi.Get(ctx, "/", nil) + if err != nil { + time.Sleep(tickDuration) + continue + } + cancel() + break + } + for m.s.Leader() == 0 { + time.Sleep(tickDuration) + } +} + +func (m *member) URL() string { return m.ClientURLs[0].String() } + +func (m *member) Pause() { + m.raftHandler.Pause() + m.s.PauseSending() +} + +func (m *member) Resume() { + m.raftHandler.Resume() + m.s.ResumeSending() +} + +// Close stops the member's etcdserver and closes its connections +func (m *member) Close() { + if m.grpcServer != nil { + m.grpcServer.Stop() + m.grpcServer = nil + } + m.s.Stop() + for _, hs := range m.hss { + hs.CloseClientConnections() + hs.Close() + } +} + +// Stop stops the member, but the data dir of the member is preserved. +func (m *member) Stop(t *testing.T) { + m.Close() + m.hss = nil +} + +// Restart starts the member using the preserved data dir. +func (m *member) Restart(t *testing.T) error { + newPeerListeners := make([]net.Listener, 0) + for _, ln := range m.PeerListeners { + newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String())) + } + m.PeerListeners = newPeerListeners + newClientListeners := make([]net.Listener, 0) + for _, ln := range m.ClientListeners { + newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String())) + } + m.ClientListeners = newClientListeners + + if m.grpcListener != nil { + if err := m.listenGRPC(); err != nil { + t.Fatal(err) + } + } + + return m.Launch() +} + +// Terminate stops the member and removes the data dir. +func (m *member) Terminate(t *testing.T) { + m.Close() + if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil { + t.Fatal(err) + } +} + +func mustNewHTTPClient(t *testing.T, eps []string) client.Client { + cfg := client.Config{Transport: mustNewTransport(t, transport.TLSInfo{}), Endpoints: eps} + c, err := client.New(cfg) + if err != nil { + t.Fatal(err) + } + return c +} + +func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport { + // tick in integration test is short, so 1s dial timeout could play well. + tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) + if err != nil { + t.Fatal(err) + } + return tr +} + +type SortableMemberSliceByPeerURLs []client.Member + +func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) } +func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool { + return p[i].PeerURLs[0] < p[j].PeerURLs[0] +} +func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +type ClusterV3 struct { + *cluster + clients []*clientv3.Client +} + +// NewClusterV3 returns a launched cluster with a grpc client connection +// for each cluster member. +func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 { + cfg.UseV3 = true + cfg.UseGRPC = true + clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)} + for _, m := range clus.Members { + client, err := NewClientV3(m) + if err != nil { + t.Fatal(err) + } + clus.clients = append(clus.clients, client) + } + clus.Launch(t) + return clus +} + +func (c *ClusterV3) Terminate(t *testing.T) { + for _, client := range c.clients { + if err := client.Close(); err != nil { + t.Error(err) + } + } + c.cluster.Terminate(t) +} + +func (c *ClusterV3) RandClient() *clientv3.Client { + return c.clients[rand.Intn(len(c.clients))] +} diff --git a/integration/cluster_test.go b/integration/cluster_test.go index c424ebe5a..a99033ec6 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -16,48 +16,16 @@ package integration import ( "fmt" - "io/ioutil" "log" "math/rand" - "net" - "net/http" - "net/http/httptest" "os" - "reflect" - "sort" "strconv" - "strings" - "sync/atomic" "testing" - "time" "github.com/coreos/etcd/client" - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/etcdserver/api/v3rpc" - "github.com/coreos/etcd/etcdserver/etcdhttp" - "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" - "github.com/coreos/etcd/pkg/transport" - "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" -) - -const ( - tickDuration = 10 * time.Millisecond - clusterName = "etcd" - requestTimeout = 20 * time.Second -) - -var ( - electionTicks = 10 - - // integration test uses well-known ports to listen for each running member, - // which ensures restarted member could listen on specific port again. - nextListenPort int64 = 20000 ) func init() { @@ -83,7 +51,7 @@ func testCluster(t *testing.T, size int) { func TestTLSClusterOf3(t *testing.T) { defer testutil.AfterTest(t) - c := NewClusterByConfig(t, &clusterConfig{size: 3, usePeerTLS: true}) + c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true}) c.Launch(t) defer c.Terminate(t) clusterMustProgress(t, c.Members) @@ -108,7 +76,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) { c := NewClusterByConfig( t, - &clusterConfig{size: size, discoveryURL: dc.URL(0) + "/v2/keys"}, + &ClusterConfig{Size: size, DiscoveryURL: dc.URL(0) + "/v2/keys"}, ) c.Launch(t) defer c.Terminate(t) @@ -130,10 +98,10 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) { cancel() c := NewClusterByConfig(t, - &clusterConfig{ - size: 3, - usePeerTLS: true, - discoveryURL: dc.URL(0) + "/v2/keys"}, + &ClusterConfig{ + Size: 3, + UsePeerTLS: true, + DiscoveryURL: dc.URL(0) + "/v2/keys"}, ) c.Launch(t) defer c.Terminate(t) @@ -157,7 +125,7 @@ func testDoubleClusterSize(t *testing.T, size int) { func TestDoubleTLSClusterSizeOf3(t *testing.T) { defer testutil.AfterTest(t) - c := NewClusterByConfig(t, &clusterConfig{size: 3, usePeerTLS: true}) + c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true}) c.Launch(t) defer c.Terminate(t) @@ -347,583 +315,3 @@ func clusterMustProgress(t *testing.T, membs []*member) { mcancel() } } - -type clusterConfig struct { - size int - usePeerTLS bool - discoveryURL string - useV3 bool - useGRPC bool -} - -type cluster struct { - cfg *clusterConfig - Members []*member -} - -func (c *cluster) fillClusterForMembers() error { - if c.cfg.discoveryURL != "" { - // cluster will be discovered - return nil - } - - addrs := make([]string, 0) - for _, m := range c.Members { - scheme := "http" - if !m.PeerTLSInfo.Empty() { - scheme = "https" - } - for _, l := range m.PeerListeners { - addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String())) - } - } - clusterStr := strings.Join(addrs, ",") - var err error - for _, m := range c.Members { - m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) - if err != nil { - return err - } - } - return nil -} - -func newCluster(t *testing.T, cfg *clusterConfig) *cluster { - c := &cluster{cfg: cfg} - ms := make([]*member, cfg.size) - for i := 0; i < cfg.size; i++ { - ms[i] = c.mustNewMember(t) - } - c.Members = ms - if err := c.fillClusterForMembers(); err != nil { - t.Fatal(err) - } - - return c -} - -// NewCluster returns an unlaunched cluster of the given size which has been -// set to use static bootstrap. -func NewCluster(t *testing.T, size int) *cluster { - return newCluster(t, &clusterConfig{size: size}) -} - -// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration -func NewClusterByConfig(t *testing.T, cfg *clusterConfig) *cluster { - return newCluster(t, cfg) -} - -func (c *cluster) Launch(t *testing.T) { - errc := make(chan error) - for _, m := range c.Members { - // Members are launched in separate goroutines because if they boot - // using discovery url, they have to wait for others to register to continue. - go func(m *member) { - errc <- m.Launch() - }(m) - } - for range c.Members { - if err := <-errc; err != nil { - t.Fatalf("error setting up member: %v", err) - } - } - // wait cluster to be stable to receive future client requests - c.waitMembersMatch(t, c.HTTPMembers()) - c.waitVersion() -} - -func (c *cluster) URL(i int) string { - return c.Members[i].ClientURLs[0].String() -} - -func (c *cluster) URLs() []string { - urls := make([]string, 0) - for _, m := range c.Members { - for _, u := range m.ClientURLs { - urls = append(urls, u.String()) - } - } - return urls -} - -func (c *cluster) HTTPMembers() []client.Member { - ms := make([]client.Member, len(c.Members)) - for i, m := range c.Members { - scheme := "http" - if !m.PeerTLSInfo.Empty() { - scheme = "https" - } - ms[i].Name = m.Name - for _, ln := range m.PeerListeners { - ms[i].PeerURLs = append(ms[i].PeerURLs, scheme+"://"+ln.Addr().String()) - } - for _, ln := range m.ClientListeners { - ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String()) - } - } - return ms -} - -func (c *cluster) mustNewMember(t *testing.T) *member { - name := c.name(rand.Int()) - m := mustNewMember(t, name, c.cfg.usePeerTLS) - m.DiscoveryURL = c.cfg.discoveryURL - m.V3demo = c.cfg.useV3 - if c.cfg.useGRPC { - if err := m.listenGRPC(); err != nil { - t.Fatal(err) - } - } - return m -} - -func (c *cluster) addMember(t *testing.T) { - m := c.mustNewMember(t) - - scheme := "http" - if c.cfg.usePeerTLS { - scheme = "https" - } - - // send add request to the cluster - cc := mustNewHTTPClient(t, []string{c.URL(0)}) - ma := client.NewMembersAPI(cc) - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - peerURL := scheme + "://" + m.PeerListeners[0].Addr().String() - if _, err := ma.Add(ctx, peerURL); err != nil { - t.Fatalf("add member on %s error: %v", c.URL(0), err) - } - cancel() - - // wait for the add node entry applied in the cluster - members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}}) - c.waitMembersMatch(t, members) - - m.InitialPeerURLsMap = types.URLsMap{} - for _, mm := range c.Members { - m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs - } - m.InitialPeerURLsMap[m.Name] = m.PeerURLs - m.NewCluster = false - if err := m.Launch(); err != nil { - t.Fatal(err) - } - c.Members = append(c.Members, m) - // wait cluster to be stable to receive future client requests - c.waitMembersMatch(t, c.HTTPMembers()) -} - -func (c *cluster) AddMember(t *testing.T) { - c.addMember(t) -} - -func (c *cluster) RemoveMember(t *testing.T, id uint64) { - // send remove request to the cluster - cc := mustNewHTTPClient(t, c.URLs()) - ma := client.NewMembersAPI(cc) - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - if err := ma.Remove(ctx, types.ID(id).String()); err != nil { - t.Fatalf("unexpected remove error %v", err) - } - cancel() - newMembers := make([]*member, 0) - for _, m := range c.Members { - if uint64(m.s.ID()) != id { - newMembers = append(newMembers, m) - } else { - select { - case <-m.s.StopNotify(): - m.Terminate(t) - // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout - // TODO: remove connection write timeout by selecting on http response closeNotifier - // blocking on https://github.com/golang/go/issues/9524 - case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout): - t.Fatalf("failed to remove member %s in time", m.s.ID()) - } - } - } - c.Members = newMembers - c.waitMembersMatch(t, c.HTTPMembers()) -} - -func (c *cluster) Terminate(t *testing.T) { - for _, m := range c.Members { - m.Terminate(t) - } -} - -func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { - for _, u := range c.URLs() { - cc := mustNewHTTPClient(t, []string{u}) - ma := client.NewMembersAPI(cc) - for { - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - ms, err := ma.List(ctx) - cancel() - if err == nil && isMembersEqual(ms, membs) { - break - } - time.Sleep(tickDuration) - } - } - return -} - -func (c *cluster) waitLeader(t *testing.T, membs []*member) { - possibleLead := make(map[uint64]bool) - var lead uint64 - for _, m := range membs { - possibleLead[uint64(m.s.ID())] = true - } - - for lead == 0 || !possibleLead[lead] { - lead = 0 - for _, m := range membs { - if lead != 0 && lead != m.s.Lead() { - lead = 0 - break - } - lead = m.s.Lead() - } - time.Sleep(10 * tickDuration) - } -} - -func (c *cluster) waitVersion() { - for _, m := range c.Members { - for { - if m.s.ClusterVersion() != nil { - break - } - time.Sleep(tickDuration) - } - } -} - -func (c *cluster) name(i int) string { - return fmt.Sprint("node", i) -} - -// isMembersEqual checks whether two members equal except ID field. -// The given wmembs should always set ID field to empty string. -func isMembersEqual(membs []client.Member, wmembs []client.Member) bool { - sort.Sort(SortableMemberSliceByPeerURLs(membs)) - sort.Sort(SortableMemberSliceByPeerURLs(wmembs)) - for i := range membs { - membs[i].ID = "" - } - return reflect.DeepEqual(membs, wmembs) -} - -func newLocalListener(t *testing.T) net.Listener { - port := atomic.AddInt64(&nextListenPort, 1) - l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(port, 10)) - if err != nil { - t.Fatal(err) - } - return l -} - -func newListenerWithAddr(t *testing.T, addr string) net.Listener { - var err error - var l net.Listener - // TODO: we want to reuse a previous closed port immediately. - // a better way is to set SO_REUSExx instead of doing retry. - for i := 0; i < 5; i++ { - l, err = net.Listen("tcp", addr) - if err == nil { - break - } - time.Sleep(500 * time.Millisecond) - } - if err != nil { - t.Fatal(err) - } - return l -} - -type member struct { - etcdserver.ServerConfig - PeerListeners, ClientListeners []net.Listener - grpcListener net.Listener - // inited PeerTLSInfo implies to enable peer TLS - PeerTLSInfo transport.TLSInfo - - raftHandler *testutil.PauseableHandler - s *etcdserver.EtcdServer - hss []*httptest.Server - - grpcServer *grpc.Server - grpcAddr string -} - -// mustNewMember return an inited member with the given name. If usePeerTLS is -// true, it will set PeerTLSInfo and use https scheme to communicate between -// peers. -func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member { - var ( - testTLSInfo = transport.TLSInfo{ - KeyFile: "./fixtures/server.key.insecure", - CertFile: "./fixtures/server.crt", - TrustedCAFile: "./fixtures/ca.crt", - ClientCertAuth: true, - } - err error - ) - m := &member{} - - peerScheme := "http" - if usePeerTLS { - peerScheme = "https" - } - - pln := newLocalListener(t) - m.PeerListeners = []net.Listener{pln} - m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()}) - if err != nil { - t.Fatal(err) - } - if usePeerTLS { - m.PeerTLSInfo = testTLSInfo - } - - cln := newLocalListener(t) - m.ClientListeners = []net.Listener{cln} - m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) - if err != nil { - t.Fatal(err) - } - - m.Name = name - - m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd") - if err != nil { - t.Fatal(err) - } - clusterStr := fmt.Sprintf("%s=%s://%s", name, peerScheme, pln.Addr().String()) - m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) - if err != nil { - t.Fatal(err) - } - m.InitialClusterToken = clusterName - m.NewCluster = true - m.ServerConfig.PeerTLSInfo = m.PeerTLSInfo - m.ElectionTicks = electionTicks - m.TickMs = uint(tickDuration / time.Millisecond) - return m -} - -// startGRPC starts a grpc server over a unix domain socket on the member -func (m *member) listenGRPC() error { - if m.V3demo == false { - return fmt.Errorf("starting grpc server without v3 configured") - } - m.grpcAddr = m.Name + ".sock" - if err := os.RemoveAll(m.grpcAddr); err != nil { - return err - } - l, err := net.Listen("unix", m.grpcAddr) - if err != nil { - return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err) - } - m.grpcListener = l - return nil -} - -// NewClientV3 creates a new grpc client connection to the member -func NewClientV3(m *member) (*clientv3.Client, error) { - if m.grpcAddr == "" { - return nil, fmt.Errorf("member not configured for grpc") - } - f := func(a string, t time.Duration) (net.Conn, error) { - return net.Dial("unix", a) - } - unixdialer := grpc.WithDialer(f) - conn, err := grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer) - if err != nil { - return nil, err - } - return clientv3.NewFromConn(conn), nil -} - -// Clone returns a member with the same server configuration. The returned -// member will not set PeerListeners and ClientListeners. -func (m *member) Clone(t *testing.T) *member { - mm := &member{} - mm.ServerConfig = m.ServerConfig - - var err error - clientURLStrs := m.ClientURLs.StringSlice() - mm.ClientURLs, err = types.NewURLs(clientURLStrs) - if err != nil { - // this should never fail - panic(err) - } - peerURLStrs := m.PeerURLs.StringSlice() - mm.PeerURLs, err = types.NewURLs(peerURLStrs) - if err != nil { - // this should never fail - panic(err) - } - clusterStr := m.InitialPeerURLsMap.String() - mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) - if err != nil { - // this should never fail - panic(err) - } - mm.InitialClusterToken = m.InitialClusterToken - mm.ElectionTicks = m.ElectionTicks - mm.PeerTLSInfo = m.PeerTLSInfo - return mm -} - -// Launch starts a member based on ServerConfig, PeerListeners -// and ClientListeners. -func (m *member) Launch() error { - var err error - if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil { - return fmt.Errorf("failed to initialize the etcd server: %v", err) - } - m.s.SyncTicker = time.Tick(500 * time.Millisecond) - m.s.Start() - - m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)} - - for _, ln := range m.PeerListeners { - hs := &httptest.Server{ - Listener: ln, - Config: &http.Server{Handler: m.raftHandler}, - } - if m.PeerTLSInfo.Empty() { - hs.Start() - } else { - hs.TLS, err = m.PeerTLSInfo.ServerConfig() - if err != nil { - return err - } - hs.StartTLS() - } - m.hss = append(m.hss, hs) - } - for _, ln := range m.ClientListeners { - hs := &httptest.Server{ - Listener: ln, - Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())}, - } - hs.Start() - m.hss = append(m.hss, hs) - } - if m.grpcListener != nil { - m.grpcServer = grpc.NewServer() - etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s)) - etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s)) - etcdserverpb.RegisterLeaseServer(m.grpcServer, v3rpc.NewLeaseServer(m.s)) - go m.grpcServer.Serve(m.grpcListener) - } - return nil -} - -func (m *member) WaitOK(t *testing.T) { - cc := mustNewHTTPClient(t, []string{m.URL()}) - kapi := client.NewKeysAPI(cc) - for { - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - _, err := kapi.Get(ctx, "/", nil) - if err != nil { - time.Sleep(tickDuration) - continue - } - cancel() - break - } - for m.s.Leader() == 0 { - time.Sleep(tickDuration) - } -} - -func (m *member) URL() string { return m.ClientURLs[0].String() } - -func (m *member) Pause() { - m.raftHandler.Pause() - m.s.PauseSending() -} - -func (m *member) Resume() { - m.raftHandler.Resume() - m.s.ResumeSending() -} - -// Close stops the member's etcdserver and closes its connections -func (m *member) Close() { - if m.grpcServer != nil { - m.grpcServer.Stop() - m.grpcServer = nil - } - m.s.Stop() - for _, hs := range m.hss { - hs.CloseClientConnections() - hs.Close() - } -} - -// Stop stops the member, but the data dir of the member is preserved. -func (m *member) Stop(t *testing.T) { - m.Close() - m.hss = nil -} - -// Restart starts the member using the preserved data dir. -func (m *member) Restart(t *testing.T) error { - newPeerListeners := make([]net.Listener, 0) - for _, ln := range m.PeerListeners { - newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String())) - } - m.PeerListeners = newPeerListeners - newClientListeners := make([]net.Listener, 0) - for _, ln := range m.ClientListeners { - newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String())) - } - m.ClientListeners = newClientListeners - - if m.grpcListener != nil { - if err := m.listenGRPC(); err != nil { - t.Fatal(err) - } - } - - return m.Launch() -} - -// Terminate stops the member and removes the data dir. -func (m *member) Terminate(t *testing.T) { - m.Close() - if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil { - t.Fatal(err) - } -} - -func mustNewHTTPClient(t *testing.T, eps []string) client.Client { - cfg := client.Config{Transport: mustNewTransport(t, transport.TLSInfo{}), Endpoints: eps} - c, err := client.New(cfg) - if err != nil { - t.Fatal(err) - } - return c -} - -func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport { - // tick in integration test is short, so 1s dial timeout could play well. - tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) - if err != nil { - t.Fatal(err) - } - return tr -} - -type SortableMemberSliceByPeerURLs []client.Member - -func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) } -func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool { - return p[i].PeerURLs[0] < p[j].PeerURLs[0] -} -func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/integration/v3_barrier_test.go b/integration/v3_barrier_test.go index 2bcab189d..6905f67f8 100644 --- a/integration/v3_barrier_test.go +++ b/integration/v3_barrier_test.go @@ -24,14 +24,14 @@ import ( func TestBarrierSingleNode(t *testing.T) { defer testutil.AfterTest(t) - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) testBarrier(t, 5, func() *clientv3.Client { return clus.clients[0] }) } func TestBarrierMultiNode(t *testing.T) { defer testutil.AfterTest(t) - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) testBarrier(t, 5, func() *clientv3.Client { return clus.RandClient() }) } diff --git a/integration/v3_double_barrier_test.go b/integration/v3_double_barrier_test.go index 14e4bc4a6..e61b15baa 100644 --- a/integration/v3_double_barrier_test.go +++ b/integration/v3_double_barrier_test.go @@ -21,7 +21,7 @@ import ( ) func TestDoubleBarrier(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) defer closeSessionLease(clus) @@ -82,7 +82,7 @@ func TestDoubleBarrier(t *testing.T) { } func TestDoubleBarrierFailover(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) defer closeSessionLease(clus) @@ -122,7 +122,7 @@ func TestDoubleBarrierFailover(t *testing.T) { } } -func closeSessionLease(clus *clusterV3) { +func closeSessionLease(clus *ClusterV3) { for _, client := range clus.clients { recipe.StopSessionLease(client) } diff --git a/integration/v3_election_test.go b/integration/v3_election_test.go index 1e315b1cd..67d6c8bbf 100644 --- a/integration/v3_election_test.go +++ b/integration/v3_election_test.go @@ -23,7 +23,7 @@ import ( // TestElectionWait tests if followers can correcty wait for elections. func TestElectionWait(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) defer closeSessionLease(clus) @@ -86,7 +86,7 @@ func TestElectionWait(t *testing.T) { // TestElectionFailover tests that an election will func TestElectionFailover(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) defer closeSessionLease(clus) diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 4efc1ca63..a998b77e3 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -16,7 +16,6 @@ package integration import ( "bytes" "fmt" - "math/rand" "reflect" "sort" "sync" @@ -24,7 +23,6 @@ import ( "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver/api/v3rpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease" @@ -32,46 +30,11 @@ import ( "github.com/coreos/etcd/storage/storagepb" ) -type clusterV3 struct { - *cluster - clients []*clientv3.Client -} - -// newClusterV3 returns a launched cluster with a grpc client connection -// for each cluster member. -func newClusterV3(t *testing.T, cfg *clusterConfig) *clusterV3 { - cfg.useV3 = true - cfg.useGRPC = true - clus := &clusterV3{cluster: NewClusterByConfig(t, cfg)} - for _, m := range clus.Members { - client, err := NewClientV3(m) - if err != nil { - t.Fatal(err) - } - clus.clients = append(clus.clients, client) - } - clus.Launch(t) - return clus -} - -func (c *clusterV3) Terminate(t *testing.T) { - for _, client := range c.clients { - if err := client.Close(); err != nil { - t.Error(err) - } - } - c.cluster.Terminate(t) -} - -func (c *clusterV3) RandClient() *clientv3.Client { - return c.clients[rand.Intn(len(c.clients))] -} - // TestV3PutOverwrite puts a key with the v3 api to a random cluster member, // overwrites it, then checks that the change was applied. func TestV3PutOverwrite(t *testing.T) { defer testutil.AfterTest(t) - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) kvc := clus.RandClient().KV @@ -115,7 +78,7 @@ func TestV3PutOverwrite(t *testing.T) { func TestV3TxnTooManyOps(t *testing.T) { defer testutil.AfterTest(t) - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) kvc := clus.RandClient().KV @@ -173,7 +136,7 @@ func TestV3TxnTooManyOps(t *testing.T) { // TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails. func TestV3PutMissingLease(t *testing.T) { defer testutil.AfterTest(t) - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) kvc := clus.RandClient().KV @@ -290,7 +253,7 @@ func TestV3DeleteRange(t *testing.T) { } for i, tt := range tests { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) kvc := clus.RandClient().KV ks := tt.keySet @@ -336,7 +299,7 @@ func TestV3DeleteRange(t *testing.T) { // TestV3TxnInvaildRange tests txn func TestV3TxnInvaildRange(t *testing.T) { defer testutil.AfterTest(t) - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) kvc := clus.RandClient().KV @@ -553,7 +516,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { } for i, tt := range tests { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) wAPI := clus.RandClient().Watch ctx, cancel := context.WithCancel(context.Background()) @@ -629,7 +592,7 @@ func TestV3WatchCancelUnsynced(t *testing.T) { } func testV3WatchCancel(t *testing.T, startRev int64) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -697,7 +660,7 @@ func TestV3WatchMultipleWatchersUnsynced(t *testing.T) { // that matches all watchers, and another key that matches only // one watcher to test if it receives expected events. func testV3WatchMultipleWatchers(t *testing.T, startRev int64) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) kvc := clus.RandClient().KV ctx, cancel := context.WithCancel(context.Background()) @@ -799,7 +762,7 @@ func TestV3WatchMultipleEventsTxnUnsynced(t *testing.T) { // testV3WatchMultipleEventsTxn tests Watch APIs when it receives multiple events. func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -882,7 +845,7 @@ func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv. func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { defer testutil.AfterTest(t) - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) kvc := clus.RandClient().KV @@ -971,7 +934,7 @@ func TestV3WatchMultipleStreamsUnsynced(t *testing.T) { // testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams. func testV3WatchMultipleStreams(t *testing.T, startRev int64) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) wAPI := clus.RandClient().Watch kvc := clus.RandClient().KV @@ -1195,7 +1158,7 @@ func TestV3RangeRequest(t *testing.T) { } for i, tt := range tests { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) for _, k := range tt.putKeys { kvc := clus.RandClient().KV req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")} @@ -1239,7 +1202,7 @@ func TestV3RangeRequest(t *testing.T) { // TestV3LeaseRevoke ensures a key is deleted once its lease is revoked. func TestV3LeaseRevoke(t *testing.T) { defer testutil.AfterTest(t) - testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error { + testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { lc := clus.RandClient().Lease _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID}) return err @@ -1249,7 +1212,7 @@ func TestV3LeaseRevoke(t *testing.T) { // TestV3LeaseCreateById ensures leases may be created by a given id. func TestV3LeaseCreateByID(t *testing.T) { defer testutil.AfterTest(t) - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) // create fixed lease @@ -1290,7 +1253,7 @@ func TestV3LeaseCreateByID(t *testing.T) { // TestV3LeaseExpire ensures a key is deleted once a key expires. func TestV3LeaseExpire(t *testing.T) { defer testutil.AfterTest(t) - testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error { + testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { // let lease lapse; wait for deleted key ctx, cancel := context.WithCancel(context.Background()) @@ -1342,7 +1305,7 @@ func TestV3LeaseExpire(t *testing.T) { // TestV3LeaseKeepAlive ensures keepalive keeps the lease alive. func TestV3LeaseKeepAlive(t *testing.T) { defer testutil.AfterTest(t) - testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error { + testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error { lc := clus.RandClient().Lease lreq := &pb.LeaseKeepAliveRequest{ID: leaseID} ctx, cancel := context.WithCancel(context.Background()) @@ -1376,7 +1339,7 @@ func TestV3LeaseKeepAlive(t *testing.T) { // client to confirm it's visible to the whole cluster. func TestV3LeaseExists(t *testing.T) { defer testutil.AfterTest(t) - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) // create lease @@ -1409,7 +1372,7 @@ func TestV3LeaseExists(t *testing.T) { } // acquireLeaseAndKey creates a new lease and creates an attached key. -func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) { +func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) { // create lease lresp, err := clus.RandClient().Lease.LeaseCreate( context.TODO(), @@ -1430,8 +1393,8 @@ func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) { // testLeaseRemoveLeasedKey performs some action while holding a lease with an // attached key "foo", then confirms the key is gone. -func testLeaseRemoveLeasedKey(t *testing.T, act func(*clusterV3, int64) error) { - clus := newClusterV3(t, &clusterConfig{size: 3}) +func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) { + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) leaseID, err := acquireLeaseAndKey(clus, "foo") diff --git a/integration/v3_lock_test.go b/integration/v3_lock_test.go index 5995ddc94..6add54263 100644 --- a/integration/v3_lock_test.go +++ b/integration/v3_lock_test.go @@ -23,13 +23,13 @@ import ( ) func TestMutexSingleNode(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) testMutex(t, 5, func() *clientv3.Client { return clus.clients[0] }) } func TestMutexMultiNode(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) testMutex(t, 5, func() *clientv3.Client { return clus.RandClient() }) } @@ -68,7 +68,7 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) func BenchmarkMutex4Waiters(b *testing.B) { // XXX switch tests to use TB interface - clus := newClusterV3(nil, &clusterConfig{size: 3}) + clus := NewClusterV3(nil, &ClusterConfig{Size: 3}) defer clus.Terminate(nil) for i := 0; i < b.N; i++ { testMutex(nil, 4, func() *clientv3.Client { return clus.RandClient() }) @@ -76,13 +76,13 @@ func BenchmarkMutex4Waiters(b *testing.B) { } func TestRWMutexSingleNode(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) testRWMutex(t, 5, func() *clientv3.Client { return clus.clients[0] }) } func TestRWMutexMultiNode(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) testRWMutex(t, 5, func() *clientv3.Client { return clus.RandClient() }) } diff --git a/integration/v3_queue_test.go b/integration/v3_queue_test.go index ab26173de..19038bdc2 100644 --- a/integration/v3_queue_test.go +++ b/integration/v3_queue_test.go @@ -29,7 +29,7 @@ const ( // TestQueueOneReaderOneWriter confirms the queue is FIFO func TestQueueOneReaderOneWriter(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) done := make(chan struct{}) @@ -75,7 +75,7 @@ func TestQueueManyReaderManyWriter(t *testing.T) { // BenchmarkQueue benchmarks Queues using many/many readers/writers func BenchmarkQueue(b *testing.B) { // XXX switch tests to use TB interface - clus := newClusterV3(nil, &clusterConfig{size: 3}) + clus := NewClusterV3(nil, &ClusterConfig{Size: 3}) defer clus.Terminate(nil) for i := 0; i < b.N; i++ { testQueueNReaderMWriter(nil, manyQueueClients, manyQueueClients) @@ -84,7 +84,7 @@ func BenchmarkQueue(b *testing.B) { // TestPrQueue tests whether priority queues respect priorities. func TestPrQueueOneReaderOneWriter(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) // write out five items with random priority @@ -116,7 +116,7 @@ func TestPrQueueOneReaderOneWriter(t *testing.T) { } func TestPrQueueManyReaderManyWriter(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) rqs := newPriorityQueues(clus, manyQueueClients) wqs := newPriorityQueues(clus, manyQueueClients) @@ -126,7 +126,7 @@ func TestPrQueueManyReaderManyWriter(t *testing.T) { // BenchmarkQueue benchmarks Queues using n/n readers/writers func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) { // XXX switch tests to use TB interface - clus := newClusterV3(nil, &clusterConfig{size: 3}) + clus := NewClusterV3(nil, &ClusterConfig{Size: 3}) defer clus.Terminate(nil) rqs := newPriorityQueues(clus, 1) wqs := newPriorityQueues(clus, 1) @@ -136,12 +136,12 @@ func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) { } func testQueueNReaderMWriter(t *testing.T, n int, m int) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) testReadersWriters(t, newQueues(clus, n), newQueues(clus, m)) } -func newQueues(clus *clusterV3, n int) (qs []testQueue) { +func newQueues(clus *ClusterV3, n int) (qs []testQueue) { for i := 0; i < n; i++ { etcdc := clus.RandClient() qs = append(qs, recipe.NewQueue(etcdc, "q")) @@ -149,7 +149,7 @@ func newQueues(clus *clusterV3, n int) (qs []testQueue) { return qs } -func newPriorityQueues(clus *clusterV3, n int) (qs []testQueue) { +func newPriorityQueues(clus *ClusterV3, n int) (qs []testQueue) { for i := 0; i < n; i++ { etcdc := clus.RandClient() q := &flatPriorityQueue{recipe.NewPriorityQueue(etcdc, "prq")} diff --git a/integration/v3_stm_test.go b/integration/v3_stm_test.go index 511047701..8f6b29557 100644 --- a/integration/v3_stm_test.go +++ b/integration/v3_stm_test.go @@ -24,7 +24,7 @@ import ( // TestSTMConflict tests that conflicts are retried. func TestSTMConflict(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) etcdc := clus.RandClient() @@ -89,7 +89,7 @@ func TestSTMConflict(t *testing.T) { // TestSTMPut confirms a STM put on a new key is visible after commit. func TestSTMPutNewKey(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) etcdc := clus.RandClient() @@ -113,7 +113,7 @@ func TestSTMPutNewKey(t *testing.T) { // TestSTMAbort tests that an aborted txn does not modify any keys. func TestSTMAbort(t *testing.T) { - clus := newClusterV3(t, &clusterConfig{size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) etcdc := clus.RandClient() diff --git a/test b/test index edb213566..401ad71e4 100755 --- a/test +++ b/test @@ -20,7 +20,7 @@ TESTABLE_AND_FORMATTABLE="client clientv3 discovery error etcdctl/command etcdma # TODO: add it to race testing when the issue is resolved # https://github.com/golang/go/issues/9946 NO_RACE_TESTABLE="rafthttp" -FORMATTABLE="$TESTABLE_AND_FORMATTABLE $NO_RACE_TESTABLE *.go etcdctl/ integration e2e" +FORMATTABLE="$TESTABLE_AND_FORMATTABLE $NO_RACE_TESTABLE *.go etcdctl/ integration clientv3/integration e2e" # user has not provided PKG override if [ -z "$PKG" ]; then @@ -60,6 +60,7 @@ function integration_tests { echo "Running integration tests..." go test -timeout 5m -v -cpu 1,2,4 $@ ${REPO_PATH}/e2e go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/integration + go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample }