Compare commits
10 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
121edf0467 | ||
![]() |
b5abfe1858 | ||
![]() |
33633da64c | ||
![]() |
e08abbeae4 | ||
![]() |
bdc3ed1970 | ||
![]() |
1b3ac99e8a | ||
![]() |
fd4595aa04 | ||
![]() |
e5f63b64c3 | ||
![]() |
68d27b2d84 | ||
![]() |
7c4274be05 |
@@ -16,6 +16,7 @@ package clientv3
|
||||
|
||||
import (
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
@@ -65,6 +66,11 @@ func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
|
||||
}
|
||||
|
||||
func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
|
||||
// fail-fast before panic in rafthttp
|
||||
if _, err := types.NewURLs(peerAddrs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
|
||||
resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
|
||||
if err != nil {
|
||||
@@ -83,6 +89,11 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
|
||||
}
|
||||
|
||||
func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
|
||||
// fail-fast before panic in rafthttp
|
||||
if _, err := types.NewURLs(peerAddrs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// it is safe to retry on update.
|
||||
r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
|
||||
resp, err := c.remote.MemberUpdate(ctx, r, c.callOpts...)
|
||||
|
@@ -127,3 +127,36 @@ func TestMemberUpdate(t *testing.T) {
|
||||
t.Errorf("urls = %v, want %v", urls, resp.Members[0].PeerURLs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemberAddUpdateWrongURLs(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
capi := clus.RandClient()
|
||||
tt := [][]string{
|
||||
// missing protocol scheme
|
||||
{"://127.0.0.1:2379"},
|
||||
// unsupported scheme
|
||||
{"mailto://127.0.0.1:2379"},
|
||||
// not conform to host:port
|
||||
{"http://127.0.0.1"},
|
||||
// contain a path
|
||||
{"http://127.0.0.1:2379/path"},
|
||||
// first path segment in URL cannot contain colon
|
||||
{"127.0.0.1:1234"},
|
||||
// URL scheme must be http, https, unix, or unixs
|
||||
{"localhost:1234"},
|
||||
}
|
||||
for i := range tt {
|
||||
_, err := capi.MemberAdd(context.Background(), tt[i])
|
||||
if err == nil {
|
||||
t.Errorf("#%d: MemberAdd err = nil, but error", i)
|
||||
}
|
||||
_, err = capi.MemberUpdate(context.Background(), 0, tt[i])
|
||||
if err == nil {
|
||||
t.Errorf("#%d: MemberUpdate err = nil, but error", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -15,6 +15,8 @@
|
||||
package v3rpc
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/coreos/etcd/auth"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
@@ -113,6 +115,25 @@ func isClientCtxErr(ctxErr error, err error) bool {
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
code := ev.Code()
|
||||
return code == codes.Canceled || code == codes.DeadlineExceeded
|
||||
|
||||
switch ev.Code() {
|
||||
case codes.Canceled, codes.DeadlineExceeded:
|
||||
// client-side context cancel or deadline exceeded
|
||||
// "rpc error: code = Canceled desc = context canceled"
|
||||
// "rpc error: code = DeadlineExceeded desc = context deadline exceeded"
|
||||
return true
|
||||
case codes.Unavailable:
|
||||
msg := ev.Message()
|
||||
// client-side context cancel or deadline exceeded with TLS ("http2.errClientDisconnected")
|
||||
// "rpc error: code = Unavailable desc = client disconnected"
|
||||
if msg == "client disconnected" {
|
||||
return true
|
||||
}
|
||||
// "grpc/transport.ClientTransport.CloseStream" on canceled streams
|
||||
// "rpc error: code = Unavailable desc = stream error: stream ID 21; CANCEL")
|
||||
if strings.HasPrefix(msg, "stream error: ") && strings.HasSuffix(msg, "; CANCEL") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@@ -188,7 +188,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
|
||||
}
|
||||
|
||||
for wa := range s.synced.watchers {
|
||||
s.unsynced.watchers.add(wa)
|
||||
s.unsynced.add(wa)
|
||||
}
|
||||
s.synced = newWatcherGroup()
|
||||
return nil
|
||||
|
@@ -295,36 +295,45 @@ func TestWatchFutureRev(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWatchRestore(t *testing.T) {
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
||||
defer cleanup(s, b, tmpPath)
|
||||
test := func(delay time.Duration) func(t *testing.T) {
|
||||
return func(t *testing.T) {
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
||||
defer cleanup(s, b, tmpPath)
|
||||
|
||||
testKey := []byte("foo")
|
||||
testValue := []byte("bar")
|
||||
rev := s.Put(testKey, testValue, lease.NoLease)
|
||||
testKey := []byte("foo")
|
||||
testValue := []byte("bar")
|
||||
rev := s.Put(testKey, testValue, lease.NoLease)
|
||||
|
||||
newBackend, newPath := backend.NewDefaultTmpBackend()
|
||||
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
|
||||
defer cleanup(newStore, newBackend, newPath)
|
||||
newBackend, newPath := backend.NewDefaultTmpBackend()
|
||||
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
|
||||
defer cleanup(newStore, newBackend, newPath)
|
||||
|
||||
w := newStore.NewWatchStream()
|
||||
w.Watch(testKey, nil, rev-1)
|
||||
w := newStore.NewWatchStream()
|
||||
w.Watch(testKey, nil, rev-1)
|
||||
|
||||
newStore.Restore(b)
|
||||
select {
|
||||
case resp := <-w.Chan():
|
||||
if resp.Revision != rev {
|
||||
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
|
||||
time.Sleep(delay)
|
||||
|
||||
newStore.Restore(b)
|
||||
select {
|
||||
case resp := <-w.Chan():
|
||||
if resp.Revision != rev {
|
||||
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
|
||||
}
|
||||
if len(resp.Events) != 1 {
|
||||
t.Fatalf("failed to get events from the response")
|
||||
}
|
||||
if resp.Events[0].Kv.ModRevision != rev {
|
||||
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("failed to receive event in 1 second.")
|
||||
}
|
||||
}
|
||||
if len(resp.Events) != 1 {
|
||||
t.Fatalf("failed to get events from the response")
|
||||
}
|
||||
if resp.Events[0].Kv.ModRevision != rev {
|
||||
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("failed to receive event in 1 second.")
|
||||
}
|
||||
|
||||
t.Run("Normal", test(0))
|
||||
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
|
||||
}
|
||||
|
||||
// TestWatchBatchUnsynced tests batching on unsynced watchers
|
||||
|
@@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.2.14"
|
||||
Version = "3.2.16"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
Reference in New Issue
Block a user