etcd-tester: refactor cluster member handling

release-3.1
Anthony Romano 2016-06-28 15:51:25 -07:00
parent 402df5bd03
commit 5f459a64ce
3 changed files with 236 additions and 182 deletions


@@ -22,10 +22,7 @@ import (
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
clientv2 "github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
)
@@ -35,17 +32,14 @@ const peerURLPort = 2380
type cluster struct {
v2Only bool // to be deprecated
agentEndpoints []string
datadir string
stressKeySize int
stressKeySuffixRange int
Size int
Agents []client.Agent
Stressers []Stresser
Names []string
GRPCURLs []string
ClientURLs []string
Size int
Stressers []Stresser
Members []*member
}
type ClusterStatus struct {
@@ -56,68 +50,53 @@ type ClusterStatus struct {
func newCluster(agentEndpoints []string, datadir string, stressKeySize, stressKeySuffixRange int, isV2Only bool) (*cluster, error) {
c := &cluster{
v2Only: isV2Only,
agentEndpoints: agentEndpoints,
datadir: datadir,
stressKeySize: stressKeySize,
stressKeySuffixRange: stressKeySuffixRange,
}
if err := c.Bootstrap(); err != nil {
if err := c.bootstrap(agentEndpoints); err != nil {
return nil, err
}
return c, nil
}
func (c *cluster) Bootstrap() error {
size := len(c.agentEndpoints)
func (c *cluster) bootstrap(agentEndpoints []string) error {
size := len(agentEndpoints)
agents := make([]client.Agent, size)
names := make([]string, size)
grpcURLs := make([]string, size)
clientURLs := make([]string, size)
peerURLs := make([]string, size)
members := make([]string, size)
for i, u := range c.agentEndpoints {
var err error
agents[i], err = client.NewAgent(u)
members := make([]*member, size)
memberNameURLs := make([]string, size)
for i, u := range agentEndpoints {
agent, err := client.NewAgent(u)
if err != nil {
return err
}
names[i] = fmt.Sprintf("etcd-%d", i)
host, _, err := net.SplitHostPort(u)
if err != nil {
return err
}
grpcURLs[i] = fmt.Sprintf("%s:2379", host)
clientURLs[i] = fmt.Sprintf("http://%s:2379", host)
peerURLs[i] = fmt.Sprintf("http://%s:%d", host, peerURLPort)
members[i] = fmt.Sprintf("%s=%s", names[i], peerURLs[i])
members[i] = &member{
Agent: agent,
Endpoint: u,
Name: fmt.Sprintf("etcd-%d", i),
ClientURL: fmt.Sprintf("http://%s:2379", host),
PeerURL: fmt.Sprintf("http://%s:%d", host, peerURLPort),
}
memberNameURLs[i] = members[i].ClusterEntry()
}
clusterStr := strings.Join(members, ",")
clusterStr := strings.Join(memberNameURLs, ",")
token := fmt.Sprint(rand.Int())
for i, a := range agents {
flags := []string{
"--name", names[i],
for i, m := range members {
flags := append(
m.Flags(),
"--data-dir", c.datadir,
"--listen-client-urls", clientURLs[i],
"--advertise-client-urls", clientURLs[i],
"--listen-peer-urls", peerURLs[i],
"--initial-advertise-peer-urls", peerURLs[i],
"--initial-cluster-token", token,
"--initial-cluster", clusterStr,
"--initial-cluster-state", "new",
}
"--initial-cluster", clusterStr)
if _, err := a.Start(flags...); err != nil {
if _, err := m.Agent.Start(flags...); err != nil {
// cleanup
for j := 0; j < i; j++ {
agents[j].Terminate()
for _, m := range members[:i] {
m.Agent.Terminate()
}
return err
}
@@ -126,52 +105,55 @@ func (c *cluster) Bootstrap() error {
// TODO: Too intensive stressers can panic etcd member with
// 'out of memory' error. Put rate limits in server side.
stressN := 100
var stressers []Stresser
if c.v2Only {
for _, u := range clientURLs {
s := &stresserV2{
Endpoint: u,
c.Stressers = make([]Stresser, len(members))
for i, m := range members {
if c.v2Only {
c.Stressers[i] = &stresserV2{
Endpoint: m.ClientURL,
KeySize: c.stressKeySize,
KeySuffixRange: c.stressKeySuffixRange,
N: stressN,
}
go s.Stress()
stressers = append(stressers, s)
}
} else {
for _, u := range grpcURLs {
s := &stresser{
Endpoint: u,
} else {
c.Stressers[i] = &stresser{
Endpoint: m.grpcAddr(),
KeySize: c.stressKeySize,
KeySuffixRange: c.stressKeySuffixRange,
N: stressN,
}
go s.Stress()
stressers = append(stressers, s)
}
go c.Stressers[i].Stress()
}
c.Size = size
c.Agents = agents
c.Stressers = stressers
c.Names = names
c.GRPCURLs = grpcURLs
c.ClientURLs = clientURLs
c.Members = members
return nil
}
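// Reset re-bootstraps the cluster using the members' existing agent endpoints.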
func (c *cluster) Reset() error {
eps := make([]string, len(c.Members))
for i, m := range c.Members {
eps[i] = m.Endpoint
}
return c.bootstrap(eps)
}
func (c *cluster) WaitHealth() error {
var err error
// wait 60s to check cluster health.
// TODO: set it to a reasonable value. It is set that high because
// follower may take a long time to catch up with the leader when rebooted
// under a reasonable workload (https://github.com/coreos/etcd/issues/2698)
healthFunc, urls := setHealthKey, c.GRPCURLs
healthFunc := func(m *member) error { return m.SetHealthKeyV3() }
if c.v2Only {
healthFunc, urls = setHealthKeyV2, c.ClientURLs
healthFunc = func(m *member) error { return m.SetHealthKeyV2() }
}
for i := 0; i < 60; i++ {
err = healthFunc(urls)
for _, m := range c.Members {
if err = healthFunc(m); err != nil {
break
}
}
if err == nil {
return nil
}
@@ -186,27 +168,12 @@ func (c *cluster) GetLeader() (int, error) {
if c.v2Only {
return 0, nil
}
for i, ep := range c.GRPCURLs {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
DialTimeout: 5 * time.Second,
})
if err != nil {
return 0, err
}
defer cli.Close()
mapi := clientv3.NewMaintenance(cli)
resp, err := mapi.Status(context.Background(), ep)
if err != nil {
return 0, err
}
if resp.Header.MemberId == resp.Leader {
return i, nil
for i, m := range c.Members {
isLeader, err := m.IsLeader()
if isLeader || err != nil {
return i, err
}
}
return 0, fmt.Errorf("no leader found")
}
@@ -221,8 +188,8 @@ func (c *cluster) Report() (success, failure int) {
func (c *cluster) Cleanup() error {
var lasterr error
for _, a := range c.Agents {
if err := a.Cleanup(); err != nil {
for _, m := range c.Members {
if err := m.Agent.Cleanup(); err != nil {
lasterr = err
}
}
@@ -233,8 +200,8 @@ func (c *cluster) Cleanup() error {
}
func (c *cluster) Terminate() {
for _, a := range c.Agents {
a.Terminate()
for _, m := range c.Members {
m.Agent.Terminate()
}
for _, s := range c.Stressers {
s.Cancel()
@@ -246,10 +213,10 @@ func (c *cluster) Status() ClusterStatus {
AgentStatuses: make(map[string]client.Status),
}
for i, a := range c.Agents {
s, err := a.Status()
for _, m := range c.Members {
s, err := m.Agent.Status()
// TODO: add a.Desc() as a key of the map
desc := c.agentEndpoints[i]
desc := m.Endpoint
if err != nil {
cs.AgentStatuses[desc] = client.Status{State: "unknown"}
plog.Printf("failed to get the status of agent [%s]", desc)
@@ -259,64 +226,16 @@ func (c *cluster) Status() ClusterStatus {
return cs
}
// setHealthKey sets health key on all given urls.
func setHealthKey(us []string) error {
for _, u := range us {
conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
if err != nil {
return fmt.Errorf("%v (%s)", err, u)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
kvc := pb.NewKVClient(conn)
_, err = kvc.Put(ctx, &pb.PutRequest{Key: []byte("health"), Value: []byte("good")})
cancel()
conn.Close()
if err != nil {
return fmt.Errorf("%v (%s)", err, u)
}
}
return nil
}
// setHealthKeyV2 sets health key on all given urls.
func setHealthKeyV2(us []string) error {
for _, u := range us {
cfg := clientv2.Config{
Endpoints: []string{u},
}
c, err := clientv2.New(cfg)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
kapi := clientv2.NewKeysAPI(c)
_, err = kapi.Set(ctx, "health", "good", nil)
cancel()
if err != nil {
return err
}
}
return nil
}
func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
revs := make(map[string]int64)
hashes := make(map[string]int64)
for _, u := range c.GRPCURLs {
conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
for _, m := range c.Members {
rev, hash, err := m.RevHash()
if err != nil {
return nil, nil, err
}
m := pb.NewMaintenanceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := m.Hash(ctx, &pb.HashRequest{})
cancel()
conn.Close()
if err != nil {
return nil, nil, err
}
revs[u] = resp.Header.Revision
hashes[u] = int64(resp.Hash)
revs[m.ClientURL] = rev
hashes[m.ClientURL] = hash
}
return revs, hashes, nil
}
@@ -326,8 +245,9 @@ func (c *cluster) compactKV(rev int64, timeout time.Duration) (err error) {
return nil
}
for i, u := range c.GRPCURLs {
conn, derr := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
for i, m := range c.Members {
u := m.ClientURL
conn, derr := m.dialGRPC()
if derr != nil {
plog.Printf("[compact kv #%d] dial error %v (endpoint %s)", i, derr, u)
err = derr
@@ -360,45 +280,19 @@ func (c *cluster) checkCompact(rev int64) error {
if rev == 0 {
return nil
}
for _, u := range c.GRPCURLs {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{u},
DialTimeout: 5 * time.Second,
})
if err != nil {
return fmt.Errorf("%v (endpoint %s)", err, u)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
wr, ok := <-wch
cancel()
cli.Close()
if !ok {
return fmt.Errorf("watch channel terminated (endpoint %s)", u)
}
if wr.CompactRevision != rev {
return fmt.Errorf("got compact revision %v, wanted %v (endpoint %s)", wr.CompactRevision, rev, u)
for _, m := range c.Members {
if err := m.CheckCompact(rev); err != nil {
return err
}
}
return nil
}
func (c *cluster) defrag() error {
for _, u := range c.GRPCURLs {
plog.Printf("defragmenting %s\n", u)
conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
if err != nil {
for _, m := range c.Members {
if err := m.Defrag(); err != nil {
return err
}
mt := pb.NewMaintenanceClient(conn)
if _, err = mt.Defragment(context.Background(), &pb.DefragmentRequest{}); err != nil {
return err
}
conn.Close()
plog.Printf("defragmented %s\n", u)
}
return nil
}
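
For orientation, the bootstrap path above has each member contribute a name=peerURL entry (ClusterEntry) and joins them into the single --initial-cluster value shared by every member. A minimal, self-contained sketch of that assembly, using made-up hosts rather than the tester's real agent endpoints:

package main

import (
	"fmt"
	"strings"
)

// member mirrors only the fields needed for flag assembly, not the tester's full type.
type member struct {
	Name    string
	PeerURL string
}

// ClusterEntry is the name=peerURL form each member contributes to --initial-cluster.
func (m member) ClusterEntry() string { return m.Name + "=" + m.PeerURL }

func main() {
	// Hypothetical hosts; the tester derives these from its agent endpoints.
	members := []member{
		{Name: "etcd-0", PeerURL: "http://10.0.1.10:2380"},
		{Name: "etcd-1", PeerURL: "http://10.0.1.11:2380"},
		{Name: "etcd-2", PeerURL: "http://10.0.1.12:2380"},
	}
	entries := make([]string, len(members))
	for i, m := range members {
		entries[i] = m.ClusterEntry()
	}
	// Every member is started with the same joined --initial-cluster value.
	fmt.Println("--initial-cluster", strings.Join(entries, ","))
}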


@@ -0,0 +1,160 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"net/url"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
clientv2 "github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
)
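// member encapsulates one cluster member for the tester: the agent handle that
// controls its process plus the URLs used to configure, stress, and check it.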
type member struct {
Agent client.Agent
Endpoint string
Name string
ClientURL string
PeerURL string
}
func (m *member) ClusterEntry() string { return m.Name + "=" + m.PeerURL }
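// Flags returns the etcd flags specific to this member; cluster-wide flags
// (data dir, cluster token, initial cluster) are appended by the cluster bootstrap.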
func (m *member) Flags() []string {
return []string{
"--name", m.Name,
"--listen-client-urls", m.ClientURL,
"--advertise-client-urls", m.ClientURL,
"--listen-peer-urls", m.PeerURL,
"--initial-advertise-peer-urls", m.PeerURL,
"--initial-cluster-state", "new",
}
}
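// CheckCompact verifies that the member reports rev as its compacted revision,
// by opening a watch just below rev and inspecting the returned CompactRevision.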
func (m *member) CheckCompact(rev int64) error {
cli, err := m.newClientV3()
if err != nil {
return fmt.Errorf("%v (endpoint %s)", err, m.ClientURL)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
wr, ok := <-wch
cancel()
if !ok {
return fmt.Errorf("watch channel terminated (endpoint %s)", m.ClientURL)
}
if wr.CompactRevision != rev {
return fmt.Errorf("got compact revision %v, wanted %v (endpoint %s)", wr.CompactRevision, rev, m.ClientURL)
}
return nil
}
func (m *member) Defrag() error {
plog.Printf("defragmenting %s\n", m.ClientURL)
cli, err := m.newClientV3()
if err != nil {
return err
}
defer cli.Close()
if _, err = cli.Defragment(context.Background(), m.ClientURL); err != nil {
return err
}
plog.Printf("defragmented %s\n", m.ClientURL)
return nil
}
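// RevHash returns the member's current revision and KV hash via the maintenance API.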
func (m *member) RevHash() (int64, int64, error) {
conn, err := m.dialGRPC()
if err != nil {
return 0, 0, err
}
mt := pb.NewMaintenanceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := mt.Hash(ctx, &pb.HashRequest{})
cancel()
conn.Close()
if err != nil {
return 0, 0, err
}
return resp.Header.Revision, int64(resp.Hash), nil
}
func (m *member) IsLeader() (bool, error) {
cli, err := m.newClientV3()
if err != nil {
return false, err
}
defer cli.Close()
resp, err := cli.Status(context.Background(), m.ClientURL)
if err != nil {
return false, err
}
return resp.Header.MemberId == resp.Leader, nil
}
func (m *member) SetHealthKeyV3() error {
cli, err := m.newClientV3()
if err != nil {
return fmt.Errorf("%v (%s)", err, m.ClientURL)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "health", "good")
cancel()
if err != nil {
return fmt.Errorf("%v (%s)", err, m.ClientURL)
}
return nil
}
func (m *member) SetHealthKeyV2() error {
cfg := clientv2.Config{Endpoints: []string{m.ClientURL}}
c, err := clientv2.New(cfg)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
kapi := clientv2.NewKeysAPI(c)
_, err = kapi.Set(ctx, "health", "good", nil)
cancel()
return err
}
func (m *member) newClientV3() (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
Endpoints: []string{m.ClientURL},
DialTimeout: 5 * time.Second,
})
}
func (m *member) dialGRPC() (*grpc.ClientConn, error) {
return grpc.Dial(m.grpcAddr(), grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
}
// grpcAddr gets the host from clientURL so it works with grpc.Dial()
func (m *member) grpcAddr() string {
u, err := url.Parse(m.ClientURL)
if err != nil {
panic(err)
}
return u.Host
}
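
As the comment above notes, grpc.Dial expects a plain host:port target rather than a URL with a scheme, so grpcAddr strips the scheme via url.Parse. A standalone sketch with a made-up address:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	clientURL := "http://10.0.1.5:2379" // hypothetical member client URL

	u, err := url.Parse(clientURL)
	if err != nil {
		panic(err)
	}
	// u.Host keeps only "host:port", the form grpc.Dial accepts as a dial target.
	fmt.Println(u.Host) // prints 10.0.1.5:2379
}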


@@ -244,7 +244,7 @@ func (tt *tester) cleanup() error {
return err
}
if err := tt.cluster.Bootstrap(); err != nil {
if err := tt.cluster.Reset(); err != nil {
plog.Warningf("%s cleanup Bootstrap error: %v", tt.logPrefix(), err)
return err
}