*: expose integration functions for clientv3

release-2.3
Gyu-Ho Lee 2016-01-28 19:07:35 -08:00
parent 127d717c0a
commit 1767788074
13 changed files with 826 additions and 700 deletions

View File

@ -0,0 +1,63 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"bytes"
"testing"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/pkg/testutil"
)
func TestKVPut(t *testing.T) {
defer testutil.AfterTest(t)
tests := []struct {
key, val string
leaseID lease.LeaseID
}{
{"foo", "bar", lease.NoLease},
// TODO: test with leaseID
}
for i, tt := range tests {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
kv := clientv3.NewKV(clus.RandClient())
if _, err := kv.Put(tt.key, tt.val, tt.leaseID); err != nil {
t.Fatalf("#%d: couldn't put (%v)", i, tt.key, err)
}
resp, err := kv.Get(tt.key, 0)
if err != nil {
t.Fatalf("#%d: couldn't get key (%v)", i, err)
}
if len(resp.Kvs) != 1 {
t.Fatalf("#%d: expected 1 key, got %d", i, len(resp.Kvs))
}
if !bytes.Equal([]byte(tt.val), resp.Kvs[0].Value) {
t.Errorf("#%d: val = %s, want %s", i, tt.val, resp.Kvs[0].Value)
}
if tt.leaseID != lease.LeaseID(resp.Kvs[0].Lease) {
t.Errorf("#%d: val = %d, want %d", i, tt.leaseID, resp.Kvs[0].Lease)
}
}
}
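A sketch of the leaseID case left as a TODO above, reusing the grpc Lease API that the converted v3 tests in this commit use; the TestKVPutWithLease name, the TTL request field, and the extra context/pb imports are assumptions, not part of this change:
func TestKVPutWithLease(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
// create a lease through a member's grpc Lease API, as acquireLeaseAndKey does
lresp, err := clus.RandClient().Lease.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: 30})
if err != nil {
t.Fatal(err)
}
// attach a key to the lease through the clientv3 KV wrapper
kv := clientv3.NewKV(clus.RandClient())
if _, err := kv.Put("foo", "bar", lease.LeaseID(lresp.ID)); err != nil {
t.Fatal(err)
}
}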

View File

@ -0,0 +1,17 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package integration implements tests built upon embedded etcd, and focuses
// on the correctness of the etcd client.
package integration
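For orientation, a minimal test against this package starts a cluster, wraps a random member's grpc connection in a clientv3 KV, and tears everything down with Terminate; a sketch using only identifiers from this commit (the TestExample name is hypothetical):
func TestExample(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
kv := clientv3.NewKV(clus.RandClient())
if _, err := kv.Put("foo", "bar", lease.NoLease); err != nil {
t.Fatal(err)
}
}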

View File

@ -0,0 +1,20 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package integration
import (
"os"
"testing"
"github.com/coreos/etcd/pkg/testutil"
)
func TestMain(m *testing.M) {
v := m.Run()
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
}
os.Exit(v)
}

integration/cluster.go (new file, 674 lines)
View File

@ -0,0 +1,674 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"os"
"reflect"
"sort"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/etcdserver/etcdhttp"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
)
const (
tickDuration = 10 * time.Millisecond
clusterName = "etcd"
requestTimeout = 20 * time.Second
)
var (
electionTicks = 10
// The integration tests listen on well-known ports for each running member,
// which ensures a restarted member can listen on the same port again.
nextListenPort int64 = 20000
)
type ClusterConfig struct {
Size int
UsePeerTLS bool
DiscoveryURL string
UseV3 bool
UseGRPC bool
}
type cluster struct {
cfg *ClusterConfig
Members []*member
}
func (c *cluster) fillClusterForMembers() error {
if c.cfg.DiscoveryURL != "" {
// cluster will be discovered
return nil
}
addrs := make([]string, 0)
for _, m := range c.Members {
scheme := "http"
if !m.PeerTLSInfo.Empty() {
scheme = "https"
}
for _, l := range m.PeerListeners {
addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
}
}
clusterStr := strings.Join(addrs, ",")
var err error
for _, m := range c.Members {
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
if err != nil {
return err
}
}
return nil
}
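The resulting clusterStr is the standard etcd initial-cluster string; for a three-member cluster it would look roughly like the following (names and ports illustrative):
node1=http://127.0.0.1:20001,node2=http://127.0.0.1:20002,node3=http://127.0.0.1:20003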
func newCluster(t *testing.T, cfg *ClusterConfig) *cluster {
c := &cluster{cfg: cfg}
ms := make([]*member, cfg.Size)
for i := 0; i < cfg.Size; i++ {
ms[i] = c.mustNewMember(t)
}
c.Members = ms
if err := c.fillClusterForMembers(); err != nil {
t.Fatal(err)
}
return c
}
// NewCluster returns an unlaunched cluster of the given size which has been
// set to use static bootstrap.
func NewCluster(t *testing.T, size int) *cluster {
return newCluster(t, &ClusterConfig{Size: size})
}
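A typical call sequence, matching the cluster tests elsewhere in this diff:
c := NewCluster(t, 3)
c.Launch(t)
defer c.Terminate(t)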
// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
func NewClusterByConfig(t *testing.T, cfg *ClusterConfig) *cluster {
return newCluster(t, cfg)
}
func (c *cluster) Launch(t *testing.T) {
errc := make(chan error)
for _, m := range c.Members {
// Members are launched in separate goroutines because, if they boot
// using a discovery URL, they must wait for the others to register before continuing.
go func(m *member) {
errc <- m.Launch()
}(m)
}
for range c.Members {
if err := <-errc; err != nil {
t.Fatalf("error setting up member: %v", err)
}
}
// wait for the cluster to be stable so it can serve future client requests
c.waitMembersMatch(t, c.HTTPMembers())
c.waitVersion()
}
func (c *cluster) URL(i int) string {
return c.Members[i].ClientURLs[0].String()
}
func (c *cluster) URLs() []string {
urls := make([]string, 0)
for _, m := range c.Members {
for _, u := range m.ClientURLs {
urls = append(urls, u.String())
}
}
return urls
}
func (c *cluster) HTTPMembers() []client.Member {
ms := make([]client.Member, len(c.Members))
for i, m := range c.Members {
scheme := "http"
if !m.PeerTLSInfo.Empty() {
scheme = "https"
}
ms[i].Name = m.Name
for _, ln := range m.PeerListeners {
ms[i].PeerURLs = append(ms[i].PeerURLs, scheme+"://"+ln.Addr().String())
}
for _, ln := range m.ClientListeners {
ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String())
}
}
return ms
}
func (c *cluster) mustNewMember(t *testing.T) *member {
name := c.name(rand.Int())
m := mustNewMember(t, name, c.cfg.UsePeerTLS)
m.DiscoveryURL = c.cfg.DiscoveryURL
m.V3demo = c.cfg.UseV3
if c.cfg.UseGRPC {
if err := m.listenGRPC(); err != nil {
t.Fatal(err)
}
}
return m
}
func (c *cluster) addMember(t *testing.T) {
m := c.mustNewMember(t)
scheme := "http"
if c.cfg.UsePeerTLS {
scheme = "https"
}
// send add request to the cluster
cc := mustNewHTTPClient(t, []string{c.URL(0)})
ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
if _, err := ma.Add(ctx, peerURL); err != nil {
t.Fatalf("add member on %s error: %v", c.URL(0), err)
}
cancel()
// wait for the add-node entry to be applied in the cluster
members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
c.waitMembersMatch(t, members)
m.InitialPeerURLsMap = types.URLsMap{}
for _, mm := range c.Members {
m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
}
m.InitialPeerURLsMap[m.Name] = m.PeerURLs
m.NewCluster = false
if err := m.Launch(); err != nil {
t.Fatal(err)
}
c.Members = append(c.Members, m)
// wait for the cluster to be stable so it can serve future client requests
c.waitMembersMatch(t, c.HTTPMembers())
}
func (c *cluster) AddMember(t *testing.T) {
c.addMember(t)
}
func (c *cluster) RemoveMember(t *testing.T, id uint64) {
// send remove request to the cluster
cc := mustNewHTTPClient(t, c.URLs())
ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
t.Fatalf("unexpected remove error %v", err)
}
cancel()
newMembers := make([]*member, 0)
for _, m := range c.Members {
if uint64(m.s.ID()) != id {
newMembers = append(newMembers, m)
} else {
select {
case <-m.s.StopNotify():
m.Terminate(t)
// 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
// TODO: remove connection write timeout by selecting on http response closeNotifier
// blocking on https://github.com/golang/go/issues/9524
case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
t.Fatalf("failed to remove member %s in time", m.s.ID())
}
}
}
c.Members = newMembers
c.waitMembersMatch(t, c.HTTPMembers())
}
func (c *cluster) Terminate(t *testing.T) {
for _, m := range c.Members {
m.Terminate(t)
}
}
func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
for _, u := range c.URLs() {
cc := mustNewHTTPClient(t, []string{u})
ma := client.NewMembersAPI(cc)
for {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
ms, err := ma.List(ctx)
cancel()
if err == nil && isMembersEqual(ms, membs) {
break
}
time.Sleep(tickDuration)
}
}
return
}
func (c *cluster) waitLeader(t *testing.T, membs []*member) {
possibleLead := make(map[uint64]bool)
var lead uint64
for _, m := range membs {
possibleLead[uint64(m.s.ID())] = true
}
for lead == 0 || !possibleLead[lead] {
lead = 0
for _, m := range membs {
if lead != 0 && lead != m.s.Lead() {
lead = 0
break
}
lead = m.s.Lead()
}
time.Sleep(10 * tickDuration)
}
}
func (c *cluster) waitVersion() {
for _, m := range c.Members {
for {
if m.s.ClusterVersion() != nil {
break
}
time.Sleep(tickDuration)
}
}
}
func (c *cluster) name(i int) string {
return fmt.Sprint("node", i)
}
// isMembersEqual checks whether two member slices are equal, ignoring the ID field.
// The given wmembs should always have the ID field set to the empty string.
func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
sort.Sort(SortableMemberSliceByPeerURLs(membs))
sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
for i := range membs {
membs[i].ID = ""
}
return reflect.DeepEqual(membs, wmembs)
}
func newLocalListener(t *testing.T) net.Listener {
port := atomic.AddInt64(&nextListenPort, 1)
l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(port, 10))
if err != nil {
t.Fatal(err)
}
return l
}
func newListenerWithAddr(t *testing.T, addr string) net.Listener {
var err error
var l net.Listener
// TODO: we want to reuse a previously closed port immediately;
// a better way is to set SO_REUSExx instead of retrying.
for i := 0; i < 5; i++ {
l, err = net.Listen("tcp", addr)
if err == nil {
break
}
time.Sleep(500 * time.Millisecond)
}
if err != nil {
t.Fatal(err)
}
return l
}
type member struct {
etcdserver.ServerConfig
PeerListeners, ClientListeners []net.Listener
grpcListener net.Listener
// a non-empty PeerTLSInfo enables peer TLS
PeerTLSInfo transport.TLSInfo
raftHandler *testutil.PauseableHandler
s *etcdserver.EtcdServer
hss []*httptest.Server
grpcServer *grpc.Server
grpcAddr string
}
// mustNewMember returns an initialized member with the given name. If usePeerTLS
// is true, it sets PeerTLSInfo and uses the https scheme for communication
// between peers.
func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
var (
testTLSInfo = transport.TLSInfo{
KeyFile: "./fixtures/server.key.insecure",
CertFile: "./fixtures/server.crt",
TrustedCAFile: "./fixtures/ca.crt",
ClientCertAuth: true,
}
err error
)
m := &member{}
peerScheme := "http"
if usePeerTLS {
peerScheme = "https"
}
pln := newLocalListener(t)
m.PeerListeners = []net.Listener{pln}
m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
if err != nil {
t.Fatal(err)
}
if usePeerTLS {
m.PeerTLSInfo = testTLSInfo
}
cln := newLocalListener(t)
m.ClientListeners = []net.Listener{cln}
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
if err != nil {
t.Fatal(err)
}
m.Name = name
m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
if err != nil {
t.Fatal(err)
}
clusterStr := fmt.Sprintf("%s=%s://%s", name, peerScheme, pln.Addr().String())
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
if err != nil {
t.Fatal(err)
}
m.InitialClusterToken = clusterName
m.NewCluster = true
m.ServerConfig.PeerTLSInfo = m.PeerTLSInfo
m.ElectionTicks = electionTicks
m.TickMs = uint(tickDuration / time.Millisecond)
return m
}
// listenGRPC creates a unix domain socket listener for the member's grpc server
func (m *member) listenGRPC() error {
if !m.V3demo {
return fmt.Errorf("starting grpc server without v3 configured")
}
m.grpcAddr = m.Name + ".sock"
if err := os.RemoveAll(m.grpcAddr); err != nil {
return err
}
l, err := net.Listen("unix", m.grpcAddr)
if err != nil {
return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
}
m.grpcListener = l
return nil
}
// NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *member) (*clientv3.Client, error) {
if m.grpcAddr == "" {
return nil, fmt.Errorf("member not configured for grpc")
}
f := func(a string, t time.Duration) (net.Conn, error) {
return net.Dial("unix", a)
}
unixdialer := grpc.WithDialer(f)
conn, err := grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer)
if err != nil {
return nil, err
}
return clientv3.NewFromConn(conn), nil
}
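A member can also be dialed directly with NewClientV3, as NewClusterV3 below does for each member; a sketch (assumes the member was created with UseGRPC set, so listenGRPC has run):
cli, err := NewClientV3(clus.Members[0])
if err != nil {
t.Fatal(err)
}
defer cli.Close()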
// Clone returns a member with the same server configuration. The returned
// member has no PeerListeners or ClientListeners set.
func (m *member) Clone(t *testing.T) *member {
mm := &member{}
mm.ServerConfig = m.ServerConfig
var err error
clientURLStrs := m.ClientURLs.StringSlice()
mm.ClientURLs, err = types.NewURLs(clientURLStrs)
if err != nil {
// this should never fail
panic(err)
}
peerURLStrs := m.PeerURLs.StringSlice()
mm.PeerURLs, err = types.NewURLs(peerURLStrs)
if err != nil {
// this should never fail
panic(err)
}
clusterStr := m.InitialPeerURLsMap.String()
mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
if err != nil {
// this should never fail
panic(err)
}
mm.InitialClusterToken = m.InitialClusterToken
mm.ElectionTicks = m.ElectionTicks
mm.PeerTLSInfo = m.PeerTLSInfo
return mm
}
// Launch starts a member based on ServerConfig, PeerListeners
// and ClientListeners.
func (m *member) Launch() error {
var err error
if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil {
return fmt.Errorf("failed to initialize the etcd server: %v", err)
}
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
m.s.Start()
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
for _, ln := range m.PeerListeners {
hs := &httptest.Server{
Listener: ln,
Config: &http.Server{Handler: m.raftHandler},
}
if m.PeerTLSInfo.Empty() {
hs.Start()
} else {
hs.TLS, err = m.PeerTLSInfo.ServerConfig()
if err != nil {
return err
}
hs.StartTLS()
}
m.hss = append(m.hss, hs)
}
for _, ln := range m.ClientListeners {
hs := &httptest.Server{
Listener: ln,
Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
}
hs.Start()
m.hss = append(m.hss, hs)
}
if m.grpcListener != nil {
m.grpcServer = grpc.NewServer()
etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s))
etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s))
etcdserverpb.RegisterLeaseServer(m.grpcServer, v3rpc.NewLeaseServer(m.s))
go m.grpcServer.Serve(m.grpcListener)
}
return nil
}
func (m *member) WaitOK(t *testing.T) {
cc := mustNewHTTPClient(t, []string{m.URL()})
kapi := client.NewKeysAPI(cc)
for {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err := kapi.Get(ctx, "/", nil)
if err != nil {
time.Sleep(tickDuration)
continue
}
cancel()
break
}
for m.s.Leader() == 0 {
time.Sleep(tickDuration)
}
}
func (m *member) URL() string { return m.ClientURLs[0].String() }
func (m *member) Pause() {
m.raftHandler.Pause()
m.s.PauseSending()
}
func (m *member) Resume() {
m.raftHandler.Resume()
m.s.ResumeSending()
}
// Close stops the member's etcdserver and closes its connections
func (m *member) Close() {
if m.grpcServer != nil {
m.grpcServer.Stop()
m.grpcServer = nil
}
m.s.Stop()
for _, hs := range m.hss {
hs.CloseClientConnections()
hs.Close()
}
}
// Stop stops the member, but the data dir of the member is preserved.
func (m *member) Stop(t *testing.T) {
m.Close()
m.hss = nil
}
// Restart starts the member using the preserved data dir.
func (m *member) Restart(t *testing.T) error {
newPeerListeners := make([]net.Listener, 0)
for _, ln := range m.PeerListeners {
newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String()))
}
m.PeerListeners = newPeerListeners
newClientListeners := make([]net.Listener, 0)
for _, ln := range m.ClientListeners {
newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String()))
}
m.ClientListeners = newClientListeners
if m.grpcListener != nil {
if err := m.listenGRPC(); err != nil {
t.Fatal(err)
}
}
return m.Launch()
}
// Terminate stops the member and removes the data dir.
func (m *member) Terminate(t *testing.T) {
m.Close()
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
}
}
func mustNewHTTPClient(t *testing.T, eps []string) client.Client {
cfg := client.Config{Transport: mustNewTransport(t, transport.TLSInfo{}), Endpoints: eps}
c, err := client.New(cfg)
if err != nil {
t.Fatal(err)
}
return c
}
func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport {
// the tick in integration tests is short, so a 1s dial timeout works well.
tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
if err != nil {
t.Fatal(err)
}
return tr
}
type SortableMemberSliceByPeerURLs []client.Member
func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
return p[i].PeerURLs[0] < p[j].PeerURLs[0]
}
func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
type ClusterV3 struct {
*cluster
clients []*clientv3.Client
}
// NewClusterV3 returns a launched cluster with a grpc client connection
// for each cluster member.
func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
cfg.UseV3 = true
cfg.UseGRPC = true
clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
for _, m := range clus.Members {
client, err := NewClientV3(m)
if err != nil {
t.Fatal(err)
}
clus.clients = append(clus.clients, client)
}
clus.Launch(t)
return clus
}
func (c *ClusterV3) Terminate(t *testing.T) {
for _, client := range c.clients {
if err := client.Close(); err != nil {
t.Error(err)
}
}
c.cluster.Terminate(t)
}
func (c *ClusterV3) RandClient() *clientv3.Client {
return c.clients[rand.Intn(len(c.clients))]
}
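Tests reach the raw grpc stubs through these clients; for example, the converted v3 tests below follow this pattern (sketch; context and pb imports as in those tests):
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
kvc := clus.RandClient().KV
req := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
if _, err := kvc.Put(context.TODO(), req); err != nil {
t.Fatal(err)
}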

View File

@ -16,48 +16,16 @@ package integration
import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"os"
"reflect"
"sort"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/etcdserver/etcdhttp"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
)
const (
tickDuration = 10 * time.Millisecond
clusterName = "etcd"
requestTimeout = 20 * time.Second
)
var (
electionTicks = 10
// The integration tests listen on well-known ports for each running member,
// which ensures a restarted member can listen on the same port again.
nextListenPort int64 = 20000
)
func init() {
@ -83,7 +51,7 @@ func testCluster(t *testing.T, size int) {
func TestTLSClusterOf3(t *testing.T) {
defer testutil.AfterTest(t)
c := NewClusterByConfig(t, &clusterConfig{size: 3, usePeerTLS: true})
c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true})
c.Launch(t)
defer c.Terminate(t)
clusterMustProgress(t, c.Members)
@ -108,7 +76,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
c := NewClusterByConfig(
t,
&clusterConfig{size: size, discoveryURL: dc.URL(0) + "/v2/keys"},
&ClusterConfig{Size: size, DiscoveryURL: dc.URL(0) + "/v2/keys"},
)
c.Launch(t)
defer c.Terminate(t)
@ -130,10 +98,10 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
cancel()
c := NewClusterByConfig(t,
&clusterConfig{
size: 3,
usePeerTLS: true,
discoveryURL: dc.URL(0) + "/v2/keys"},
&ClusterConfig{
Size: 3,
UsePeerTLS: true,
DiscoveryURL: dc.URL(0) + "/v2/keys"},
)
c.Launch(t)
defer c.Terminate(t)
@ -157,7 +125,7 @@ func testDoubleClusterSize(t *testing.T, size int) {
func TestDoubleTLSClusterSizeOf3(t *testing.T) {
defer testutil.AfterTest(t)
c := NewClusterByConfig(t, &clusterConfig{size: 3, usePeerTLS: true})
c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true})
c.Launch(t)
defer c.Terminate(t)
@ -347,583 +315,3 @@ func clusterMustProgress(t *testing.T, membs []*member) {
mcancel()
}
}
type clusterConfig struct {
size int
usePeerTLS bool
discoveryURL string
useV3 bool
useGRPC bool
}
type cluster struct {
cfg *clusterConfig
Members []*member
}
func (c *cluster) fillClusterForMembers() error {
if c.cfg.discoveryURL != "" {
// cluster will be discovered
return nil
}
addrs := make([]string, 0)
for _, m := range c.Members {
scheme := "http"
if !m.PeerTLSInfo.Empty() {
scheme = "https"
}
for _, l := range m.PeerListeners {
addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
}
}
clusterStr := strings.Join(addrs, ",")
var err error
for _, m := range c.Members {
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
if err != nil {
return err
}
}
return nil
}
func newCluster(t *testing.T, cfg *clusterConfig) *cluster {
c := &cluster{cfg: cfg}
ms := make([]*member, cfg.size)
for i := 0; i < cfg.size; i++ {
ms[i] = c.mustNewMember(t)
}
c.Members = ms
if err := c.fillClusterForMembers(); err != nil {
t.Fatal(err)
}
return c
}
// NewCluster returns an unlaunched cluster of the given size which has been
// set to use static bootstrap.
func NewCluster(t *testing.T, size int) *cluster {
return newCluster(t, &clusterConfig{size: size})
}
// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
func NewClusterByConfig(t *testing.T, cfg *clusterConfig) *cluster {
return newCluster(t, cfg)
}
func (c *cluster) Launch(t *testing.T) {
errc := make(chan error)
for _, m := range c.Members {
// Members are launched in separate goroutines because, if they boot
// using a discovery URL, they must wait for the others to register before continuing.
go func(m *member) {
errc <- m.Launch()
}(m)
}
for range c.Members {
if err := <-errc; err != nil {
t.Fatalf("error setting up member: %v", err)
}
}
// wait for the cluster to be stable so it can serve future client requests
c.waitMembersMatch(t, c.HTTPMembers())
c.waitVersion()
}
func (c *cluster) URL(i int) string {
return c.Members[i].ClientURLs[0].String()
}
func (c *cluster) URLs() []string {
urls := make([]string, 0)
for _, m := range c.Members {
for _, u := range m.ClientURLs {
urls = append(urls, u.String())
}
}
return urls
}
func (c *cluster) HTTPMembers() []client.Member {
ms := make([]client.Member, len(c.Members))
for i, m := range c.Members {
scheme := "http"
if !m.PeerTLSInfo.Empty() {
scheme = "https"
}
ms[i].Name = m.Name
for _, ln := range m.PeerListeners {
ms[i].PeerURLs = append(ms[i].PeerURLs, scheme+"://"+ln.Addr().String())
}
for _, ln := range m.ClientListeners {
ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String())
}
}
return ms
}
func (c *cluster) mustNewMember(t *testing.T) *member {
name := c.name(rand.Int())
m := mustNewMember(t, name, c.cfg.usePeerTLS)
m.DiscoveryURL = c.cfg.discoveryURL
m.V3demo = c.cfg.useV3
if c.cfg.useGRPC {
if err := m.listenGRPC(); err != nil {
t.Fatal(err)
}
}
return m
}
func (c *cluster) addMember(t *testing.T) {
m := c.mustNewMember(t)
scheme := "http"
if c.cfg.usePeerTLS {
scheme = "https"
}
// send add request to the cluster
cc := mustNewHTTPClient(t, []string{c.URL(0)})
ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
if _, err := ma.Add(ctx, peerURL); err != nil {
t.Fatalf("add member on %s error: %v", c.URL(0), err)
}
cancel()
// wait for the add-node entry to be applied in the cluster
members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
c.waitMembersMatch(t, members)
m.InitialPeerURLsMap = types.URLsMap{}
for _, mm := range c.Members {
m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
}
m.InitialPeerURLsMap[m.Name] = m.PeerURLs
m.NewCluster = false
if err := m.Launch(); err != nil {
t.Fatal(err)
}
c.Members = append(c.Members, m)
// wait for the cluster to be stable so it can serve future client requests
c.waitMembersMatch(t, c.HTTPMembers())
}
func (c *cluster) AddMember(t *testing.T) {
c.addMember(t)
}
func (c *cluster) RemoveMember(t *testing.T, id uint64) {
// send remove request to the cluster
cc := mustNewHTTPClient(t, c.URLs())
ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
t.Fatalf("unexpected remove error %v", err)
}
cancel()
newMembers := make([]*member, 0)
for _, m := range c.Members {
if uint64(m.s.ID()) != id {
newMembers = append(newMembers, m)
} else {
select {
case <-m.s.StopNotify():
m.Terminate(t)
// 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
// TODO: remove connection write timeout by selecting on http response closeNotifier
// blocking on https://github.com/golang/go/issues/9524
case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
t.Fatalf("failed to remove member %s in time", m.s.ID())
}
}
}
c.Members = newMembers
c.waitMembersMatch(t, c.HTTPMembers())
}
func (c *cluster) Terminate(t *testing.T) {
for _, m := range c.Members {
m.Terminate(t)
}
}
func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
for _, u := range c.URLs() {
cc := mustNewHTTPClient(t, []string{u})
ma := client.NewMembersAPI(cc)
for {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
ms, err := ma.List(ctx)
cancel()
if err == nil && isMembersEqual(ms, membs) {
break
}
time.Sleep(tickDuration)
}
}
return
}
func (c *cluster) waitLeader(t *testing.T, membs []*member) {
possibleLead := make(map[uint64]bool)
var lead uint64
for _, m := range membs {
possibleLead[uint64(m.s.ID())] = true
}
for lead == 0 || !possibleLead[lead] {
lead = 0
for _, m := range membs {
if lead != 0 && lead != m.s.Lead() {
lead = 0
break
}
lead = m.s.Lead()
}
time.Sleep(10 * tickDuration)
}
}
func (c *cluster) waitVersion() {
for _, m := range c.Members {
for {
if m.s.ClusterVersion() != nil {
break
}
time.Sleep(tickDuration)
}
}
}
func (c *cluster) name(i int) string {
return fmt.Sprint("node", i)
}
// isMembersEqual checks whether two member slices are equal, ignoring the ID field.
// The given wmembs should always have the ID field set to the empty string.
func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
sort.Sort(SortableMemberSliceByPeerURLs(membs))
sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
for i := range membs {
membs[i].ID = ""
}
return reflect.DeepEqual(membs, wmembs)
}
func newLocalListener(t *testing.T) net.Listener {
port := atomic.AddInt64(&nextListenPort, 1)
l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(port, 10))
if err != nil {
t.Fatal(err)
}
return l
}
func newListenerWithAddr(t *testing.T, addr string) net.Listener {
var err error
var l net.Listener
// TODO: we want to reuse a previously closed port immediately;
// a better way is to set SO_REUSExx instead of retrying.
for i := 0; i < 5; i++ {
l, err = net.Listen("tcp", addr)
if err == nil {
break
}
time.Sleep(500 * time.Millisecond)
}
if err != nil {
t.Fatal(err)
}
return l
}
type member struct {
etcdserver.ServerConfig
PeerListeners, ClientListeners []net.Listener
grpcListener net.Listener
// a non-empty PeerTLSInfo enables peer TLS
PeerTLSInfo transport.TLSInfo
raftHandler *testutil.PauseableHandler
s *etcdserver.EtcdServer
hss []*httptest.Server
grpcServer *grpc.Server
grpcAddr string
}
// mustNewMember returns an initialized member with the given name. If usePeerTLS
// is true, it sets PeerTLSInfo and uses the https scheme for communication
// between peers.
func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
var (
testTLSInfo = transport.TLSInfo{
KeyFile: "./fixtures/server.key.insecure",
CertFile: "./fixtures/server.crt",
TrustedCAFile: "./fixtures/ca.crt",
ClientCertAuth: true,
}
err error
)
m := &member{}
peerScheme := "http"
if usePeerTLS {
peerScheme = "https"
}
pln := newLocalListener(t)
m.PeerListeners = []net.Listener{pln}
m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
if err != nil {
t.Fatal(err)
}
if usePeerTLS {
m.PeerTLSInfo = testTLSInfo
}
cln := newLocalListener(t)
m.ClientListeners = []net.Listener{cln}
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
if err != nil {
t.Fatal(err)
}
m.Name = name
m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
if err != nil {
t.Fatal(err)
}
clusterStr := fmt.Sprintf("%s=%s://%s", name, peerScheme, pln.Addr().String())
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
if err != nil {
t.Fatal(err)
}
m.InitialClusterToken = clusterName
m.NewCluster = true
m.ServerConfig.PeerTLSInfo = m.PeerTLSInfo
m.ElectionTicks = electionTicks
m.TickMs = uint(tickDuration / time.Millisecond)
return m
}
// listenGRPC creates a unix domain socket listener for the member's grpc server
func (m *member) listenGRPC() error {
if !m.V3demo {
return fmt.Errorf("starting grpc server without v3 configured")
}
m.grpcAddr = m.Name + ".sock"
if err := os.RemoveAll(m.grpcAddr); err != nil {
return err
}
l, err := net.Listen("unix", m.grpcAddr)
if err != nil {
return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
}
m.grpcListener = l
return nil
}
// NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *member) (*clientv3.Client, error) {
if m.grpcAddr == "" {
return nil, fmt.Errorf("member not configured for grpc")
}
f := func(a string, t time.Duration) (net.Conn, error) {
return net.Dial("unix", a)
}
unixdialer := grpc.WithDialer(f)
conn, err := grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer)
if err != nil {
return nil, err
}
return clientv3.NewFromConn(conn), nil
}
// Clone returns a member with the same server configuration. The returned
// member has no PeerListeners or ClientListeners set.
func (m *member) Clone(t *testing.T) *member {
mm := &member{}
mm.ServerConfig = m.ServerConfig
var err error
clientURLStrs := m.ClientURLs.StringSlice()
mm.ClientURLs, err = types.NewURLs(clientURLStrs)
if err != nil {
// this should never fail
panic(err)
}
peerURLStrs := m.PeerURLs.StringSlice()
mm.PeerURLs, err = types.NewURLs(peerURLStrs)
if err != nil {
// this should never fail
panic(err)
}
clusterStr := m.InitialPeerURLsMap.String()
mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
if err != nil {
// this should never fail
panic(err)
}
mm.InitialClusterToken = m.InitialClusterToken
mm.ElectionTicks = m.ElectionTicks
mm.PeerTLSInfo = m.PeerTLSInfo
return mm
}
// Launch starts a member based on ServerConfig, PeerListeners
// and ClientListeners.
func (m *member) Launch() error {
var err error
if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil {
return fmt.Errorf("failed to initialize the etcd server: %v", err)
}
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
m.s.Start()
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
for _, ln := range m.PeerListeners {
hs := &httptest.Server{
Listener: ln,
Config: &http.Server{Handler: m.raftHandler},
}
if m.PeerTLSInfo.Empty() {
hs.Start()
} else {
hs.TLS, err = m.PeerTLSInfo.ServerConfig()
if err != nil {
return err
}
hs.StartTLS()
}
m.hss = append(m.hss, hs)
}
for _, ln := range m.ClientListeners {
hs := &httptest.Server{
Listener: ln,
Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
}
hs.Start()
m.hss = append(m.hss, hs)
}
if m.grpcListener != nil {
m.grpcServer = grpc.NewServer()
etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s))
etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s))
etcdserverpb.RegisterLeaseServer(m.grpcServer, v3rpc.NewLeaseServer(m.s))
go m.grpcServer.Serve(m.grpcListener)
}
return nil
}
func (m *member) WaitOK(t *testing.T) {
cc := mustNewHTTPClient(t, []string{m.URL()})
kapi := client.NewKeysAPI(cc)
for {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err := kapi.Get(ctx, "/", nil)
if err != nil {
time.Sleep(tickDuration)
continue
}
cancel()
break
}
for m.s.Leader() == 0 {
time.Sleep(tickDuration)
}
}
func (m *member) URL() string { return m.ClientURLs[0].String() }
func (m *member) Pause() {
m.raftHandler.Pause()
m.s.PauseSending()
}
func (m *member) Resume() {
m.raftHandler.Resume()
m.s.ResumeSending()
}
// Close stops the member's etcdserver and closes its connections
func (m *member) Close() {
if m.grpcServer != nil {
m.grpcServer.Stop()
m.grpcServer = nil
}
m.s.Stop()
for _, hs := range m.hss {
hs.CloseClientConnections()
hs.Close()
}
}
// Stop stops the member, but the data dir of the member is preserved.
func (m *member) Stop(t *testing.T) {
m.Close()
m.hss = nil
}
// Restart starts the member using the preserved data dir.
func (m *member) Restart(t *testing.T) error {
newPeerListeners := make([]net.Listener, 0)
for _, ln := range m.PeerListeners {
newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String()))
}
m.PeerListeners = newPeerListeners
newClientListeners := make([]net.Listener, 0)
for _, ln := range m.ClientListeners {
newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String()))
}
m.ClientListeners = newClientListeners
if m.grpcListener != nil {
if err := m.listenGRPC(); err != nil {
t.Fatal(err)
}
}
return m.Launch()
}
// Terminate stops the member and removes the data dir.
func (m *member) Terminate(t *testing.T) {
m.Close()
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
}
}
func mustNewHTTPClient(t *testing.T, eps []string) client.Client {
cfg := client.Config{Transport: mustNewTransport(t, transport.TLSInfo{}), Endpoints: eps}
c, err := client.New(cfg)
if err != nil {
t.Fatal(err)
}
return c
}
func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport {
// the tick in integration tests is short, so a 1s dial timeout works well.
tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
if err != nil {
t.Fatal(err)
}
return tr
}
type SortableMemberSliceByPeerURLs []client.Member
func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
return p[i].PeerURLs[0] < p[j].PeerURLs[0]
}
func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View File

@ -24,14 +24,14 @@ import (
func TestBarrierSingleNode(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
testBarrier(t, 5, func() *clientv3.Client { return clus.clients[0] })
}
func TestBarrierMultiNode(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
testBarrier(t, 5, func() *clientv3.Client { return clus.RandClient() })
}

View File

@ -21,7 +21,7 @@ import (
)
func TestDoubleBarrier(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
defer closeSessionLease(clus)
@ -82,7 +82,7 @@ func TestDoubleBarrier(t *testing.T) {
}
func TestDoubleBarrierFailover(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
defer closeSessionLease(clus)
@ -122,7 +122,7 @@ func TestDoubleBarrierFailover(t *testing.T) {
}
}
func closeSessionLease(clus *clusterV3) {
func closeSessionLease(clus *ClusterV3) {
for _, client := range clus.clients {
recipe.StopSessionLease(client)
}

View File

@ -23,7 +23,7 @@ import (
// TestElectionWait tests if followers can correctly wait for elections.
func TestElectionWait(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
defer closeSessionLease(clus)
@ -86,7 +86,7 @@ func TestElectionWait(t *testing.T) {
// TestElectionFailover tests that an election will fail over to another candidate.
func TestElectionFailover(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
defer closeSessionLease(clus)

View File

@ -16,7 +16,6 @@ package integration
import (
"bytes"
"fmt"
"math/rand"
"reflect"
"sort"
"sync"
@ -24,7 +23,6 @@ import (
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
@ -32,46 +30,11 @@ import (
"github.com/coreos/etcd/storage/storagepb"
)
type clusterV3 struct {
*cluster
clients []*clientv3.Client
}
// newClusterV3 returns a launched cluster with a grpc client connection
// for each cluster member.
func newClusterV3(t *testing.T, cfg *clusterConfig) *clusterV3 {
cfg.useV3 = true
cfg.useGRPC = true
clus := &clusterV3{cluster: NewClusterByConfig(t, cfg)}
for _, m := range clus.Members {
client, err := NewClientV3(m)
if err != nil {
t.Fatal(err)
}
clus.clients = append(clus.clients, client)
}
clus.Launch(t)
return clus
}
func (c *clusterV3) Terminate(t *testing.T) {
for _, client := range c.clients {
if err := client.Close(); err != nil {
t.Error(err)
}
}
c.cluster.Terminate(t)
}
func (c *clusterV3) RandClient() *clientv3.Client {
return c.clients[rand.Intn(len(c.clients))]
}
// TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
// overwrites it, then checks that the change was applied.
func TestV3PutOverwrite(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
kvc := clus.RandClient().KV
@ -115,7 +78,7 @@ func TestV3PutOverwrite(t *testing.T) {
func TestV3TxnTooManyOps(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
kvc := clus.RandClient().KV
@ -173,7 +136,7 @@ func TestV3TxnTooManyOps(t *testing.T) {
// TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
func TestV3PutMissingLease(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
kvc := clus.RandClient().KV
@ -290,7 +253,7 @@ func TestV3DeleteRange(t *testing.T) {
}
for i, tt := range tests {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
kvc := clus.RandClient().KV
ks := tt.keySet
@ -336,7 +299,7 @@ func TestV3DeleteRange(t *testing.T) {
// TestV3TxnInvaildRange tests that txn requests with invalid ranges are rejected.
func TestV3TxnInvaildRange(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
kvc := clus.RandClient().KV
@ -553,7 +516,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
}
for i, tt := range tests {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
wAPI := clus.RandClient().Watch
ctx, cancel := context.WithCancel(context.Background())
@ -629,7 +592,7 @@ func TestV3WatchCancelUnsynced(t *testing.T) {
}
func testV3WatchCancel(t *testing.T, startRev int64) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -697,7 +660,7 @@ func TestV3WatchMultipleWatchersUnsynced(t *testing.T) {
// that matches all watchers, and another key that matches only
// one watcher to test if it receives expected events.
func testV3WatchMultipleWatchers(t *testing.T, startRev int64) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
kvc := clus.RandClient().KV
ctx, cancel := context.WithCancel(context.Background())
@ -799,7 +762,7 @@ func TestV3WatchMultipleEventsTxnUnsynced(t *testing.T) {
// testV3WatchMultipleEventsTxn tests the Watch APIs when a watcher receives multiple events.
func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -882,7 +845,7 @@ func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.
func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
kvc := clus.RandClient().KV
@ -971,7 +934,7 @@ func TestV3WatchMultipleStreamsUnsynced(t *testing.T) {
// testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
func testV3WatchMultipleStreams(t *testing.T, startRev int64) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
wAPI := clus.RandClient().Watch
kvc := clus.RandClient().KV
@ -1195,7 +1158,7 @@ func TestV3RangeRequest(t *testing.T) {
}
for i, tt := range tests {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
for _, k := range tt.putKeys {
kvc := clus.RandClient().KV
req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
@ -1239,7 +1202,7 @@ func TestV3RangeRequest(t *testing.T) {
// TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.
func TestV3LeaseRevoke(t *testing.T) {
defer testutil.AfterTest(t)
testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
lc := clus.RandClient().Lease
_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
return err
@ -1249,7 +1212,7 @@ func TestV3LeaseRevoke(t *testing.T) {
// TestV3LeaseCreateByID ensures leases may be created with a given ID.
func TestV3LeaseCreateByID(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
// create fixed lease
@ -1290,7 +1253,7 @@ func TestV3LeaseCreateByID(t *testing.T) {
// TestV3LeaseExpire ensures a key is deleted once a key expires.
func TestV3LeaseExpire(t *testing.T) {
defer testutil.AfterTest(t)
testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
// let lease lapse; wait for deleted key
ctx, cancel := context.WithCancel(context.Background())
@ -1342,7 +1305,7 @@ func TestV3LeaseExpire(t *testing.T) {
// TestV3LeaseKeepAlive ensures keepalive keeps the lease alive.
func TestV3LeaseKeepAlive(t *testing.T) {
defer testutil.AfterTest(t)
testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
lc := clus.RandClient().Lease
lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
ctx, cancel := context.WithCancel(context.Background())
@ -1376,7 +1339,7 @@ func TestV3LeaseKeepAlive(t *testing.T) {
// client to confirm it's visible to the whole cluster.
func TestV3LeaseExists(t *testing.T) {
defer testutil.AfterTest(t)
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
// create lease
@ -1409,7 +1372,7 @@ func TestV3LeaseExists(t *testing.T) {
}
// acquireLeaseAndKey creates a new lease and creates an attached key.
func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
// create lease
lresp, err := clus.RandClient().Lease.LeaseCreate(
context.TODO(),
@ -1430,8 +1393,8 @@ func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
// testLeaseRemoveLeasedKey performs some action while holding a lease with an
// attached key "foo", then confirms the key is gone.
func testLeaseRemoveLeasedKey(t *testing.T, act func(*clusterV3, int64) error) {
clus := newClusterV3(t, &clusterConfig{size: 3})
func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
leaseID, err := acquireLeaseAndKey(clus, "foo")

View File

@ -23,13 +23,13 @@ import (
)
func TestMutexSingleNode(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
testMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
}
func TestMutexMultiNode(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
testMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
}
@ -68,7 +68,7 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
func BenchmarkMutex4Waiters(b *testing.B) {
// XXX switch tests to use TB interface
clus := newClusterV3(nil, &clusterConfig{size: 3})
clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
defer clus.Terminate(nil)
for i := 0; i < b.N; i++ {
testMutex(nil, 4, func() *clientv3.Client { return clus.RandClient() })
@ -76,13 +76,13 @@ func BenchmarkMutex4Waiters(b *testing.B) {
}
func TestRWMutexSingleNode(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
testRWMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
}
func TestRWMutexMultiNode(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
testRWMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
}

View File

@ -29,7 +29,7 @@ const (
// TestQueueOneReaderOneWriter confirms the queue is FIFO
func TestQueueOneReaderOneWriter(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 1})
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
done := make(chan struct{})
@ -75,7 +75,7 @@ func TestQueueManyReaderManyWriter(t *testing.T) {
// BenchmarkQueue benchmarks Queues using many/many readers/writers
func BenchmarkQueue(b *testing.B) {
// XXX switch tests to use TB interface
clus := newClusterV3(nil, &clusterConfig{size: 3})
clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
defer clus.Terminate(nil)
for i := 0; i < b.N; i++ {
testQueueNReaderMWriter(nil, manyQueueClients, manyQueueClients)
@ -84,7 +84,7 @@ func BenchmarkQueue(b *testing.B) {
// TestPrQueueOneReaderOneWriter tests whether priority queues respect priorities.
func TestPrQueueOneReaderOneWriter(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 1})
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
// write out five items with random priority
@ -116,7 +116,7 @@ func TestPrQueueOneReaderOneWriter(t *testing.T) {
}
func TestPrQueueManyReaderManyWriter(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
rqs := newPriorityQueues(clus, manyQueueClients)
wqs := newPriorityQueues(clus, manyQueueClients)
@ -126,7 +126,7 @@ func TestPrQueueManyReaderManyWriter(t *testing.T) {
// BenchmarkPrQueueOneReaderOneWriter benchmarks priority queues using one reader and one writer
func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
// XXX switch tests to use TB interface
clus := newClusterV3(nil, &clusterConfig{size: 3})
clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
defer clus.Terminate(nil)
rqs := newPriorityQueues(clus, 1)
wqs := newPriorityQueues(clus, 1)
@ -136,12 +136,12 @@ func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
}
func testQueueNReaderMWriter(t *testing.T, n int, m int) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
testReadersWriters(t, newQueues(clus, n), newQueues(clus, m))
}
func newQueues(clus *clusterV3, n int) (qs []testQueue) {
func newQueues(clus *ClusterV3, n int) (qs []testQueue) {
for i := 0; i < n; i++ {
etcdc := clus.RandClient()
qs = append(qs, recipe.NewQueue(etcdc, "q"))
@ -149,7 +149,7 @@ func newQueues(clus *clusterV3, n int) (qs []testQueue) {
return qs
}
func newPriorityQueues(clus *clusterV3, n int) (qs []testQueue) {
func newPriorityQueues(clus *ClusterV3, n int) (qs []testQueue) {
for i := 0; i < n; i++ {
etcdc := clus.RandClient()
q := &flatPriorityQueue{recipe.NewPriorityQueue(etcdc, "prq")}

View File

@ -24,7 +24,7 @@ import (
// TestSTMConflict tests that conflicts are retried.
func TestSTMConflict(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
etcdc := clus.RandClient()
@ -89,7 +89,7 @@ func TestSTMConflict(t *testing.T) {
// TestSTMPutNewKey confirms an STM put on a new key is visible after commit.
func TestSTMPutNewKey(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 1})
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
etcdc := clus.RandClient()
@ -113,7 +113,7 @@ func TestSTMPutNewKey(t *testing.T) {
// TestSTMAbort tests that an aborted txn does not modify any keys.
func TestSTMAbort(t *testing.T) {
clus := newClusterV3(t, &clusterConfig{size: 1})
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
etcdc := clus.RandClient()

test (3 lines changed)
View File

@ -20,7 +20,7 @@ TESTABLE_AND_FORMATTABLE="client clientv3 discovery error etcdctl/command etcdma
# TODO: add it to race testing when the issue is resolved
# https://github.com/golang/go/issues/9946
NO_RACE_TESTABLE="rafthttp"
FORMATTABLE="$TESTABLE_AND_FORMATTABLE $NO_RACE_TESTABLE *.go etcdctl/ integration e2e"
FORMATTABLE="$TESTABLE_AND_FORMATTABLE $NO_RACE_TESTABLE *.go etcdctl/ integration clientv3/integration e2e"
# user has not provided PKG override
if [ -z "$PKG" ]; then
@ -60,6 +60,7 @@ function integration_tests {
echo "Running integration tests..."
go test -timeout 5m -v -cpu 1,2,4 $@ ${REPO_PATH}/e2e
go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/integration
go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample
}
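With this hook in place, the clientv3/integration suite can also be run on its own, either through the test script (assuming the usual PKG override selects the package list, e.g. PKG=clientv3/integration ./test) or directly with the same invocation as the line added above: go test -timeout 10m -v -cpu 1,2,4 github.com/coreos/etcd/clientv3/integration.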