Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
8a03d2e961 | ||
![]() |
a4b43b388d | ||
![]() |
e3b29b66a4 | ||
![]() |
eb0fb0e799 | ||
![]() |
40b71074e8 | ||
![]() |
7e2d426ec0 | ||
![]() |
3019246742 | ||
![]() |
dd1b699fc4 | ||
![]() |
f44aaf8248 |
@@ -582,6 +582,30 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigurableWatchProgressNotifyInterval(t *testing.T) {
|
||||
progressInterval := 200 * time.Millisecond
|
||||
clus := integration.NewClusterV3(t,
|
||||
&integration.ClusterConfig{
|
||||
Size: 3,
|
||||
WatchProgressNotifyInterval: progressInterval,
|
||||
})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
|
||||
rch := clus.RandClient().Watch(context.Background(), "foo", opts...)
|
||||
|
||||
timeout := 1 * time.Second // we expect to receive watch progress notify in 2 * progressInterval,
|
||||
// but for CPU-starved situation it may take longer. So we use 1 second here for timeout.
|
||||
select {
|
||||
case resp := <-rch: // waiting for a watch progress notify response
|
||||
if !resp.IsProgressNotify() {
|
||||
t.Fatalf("expected resp.IsProgressNotify() == true")
|
||||
}
|
||||
case <-time.After(timeout):
|
||||
t.Fatalf("timed out waiting for watch progress notify response in %v", timeout)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchRequestProgress(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
|
@@ -105,6 +105,16 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp
|
||||
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
|
||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
ctx = withVersion(ctx)
|
||||
// getToken automatically
|
||||
// TODO(cfc4n): keep this code block, remove codes about getToken in client.go after pr #12165 merged.
|
||||
if c.authTokenBundle != nil {
|
||||
// equal to c.Username != "" && c.Password != ""
|
||||
err := c.getToken(ctx)
|
||||
if err != nil && rpctypes.Error(err) != rpctypes.ErrAuthNotEnabled {
|
||||
logger.Error("clientv3/retry_interceptor: getToken failed", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
grpcOpts, retryOpts := filterCallOptions(opts)
|
||||
callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
|
||||
// short circuit for simplicity, and avoiding allocations.
|
||||
|
@@ -675,10 +675,11 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
|
||||
}
|
||||
|
||||
// getIDs returns an ordered set of IDs included in the given snapshot and
|
||||
// the entries. The given snapshot/entries can contain two kinds of
|
||||
// the entries. The given snapshot/entries can contain three kinds of
|
||||
// ID-related entry:
|
||||
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
|
||||
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
|
||||
// - ConfChangeAddLearnerNode, in which the contained ID will be added into the set.
|
||||
func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
|
||||
ids := make(map[uint64]bool)
|
||||
if snap != nil {
|
||||
@@ -693,6 +694,8 @@ func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64
|
||||
var cc raftpb.ConfChange
|
||||
pbutil.MustUnmarshal(&cc, e.Data)
|
||||
switch cc.Type {
|
||||
case raftpb.ConfChangeAddLearnerNode:
|
||||
ids[cc.NodeID] = true
|
||||
case raftpb.ConfChangeAddNode:
|
||||
ids[cc.NodeID] = true
|
||||
case raftpb.ConfChangeRemoveNode:
|
||||
|
@@ -152,6 +152,8 @@ type ClusterConfig struct {
|
||||
|
||||
EnableLeaseCheckpoint bool
|
||||
LeaseCheckpointInterval time.Duration
|
||||
|
||||
WatchProgressNotifyInterval time.Duration
|
||||
}
|
||||
|
||||
type cluster struct {
|
||||
@@ -279,23 +281,24 @@ func (c *cluster) HTTPMembers() []client.Member {
|
||||
func (c *cluster) mustNewMember(t testing.TB) *member {
|
||||
m := mustNewMember(t,
|
||||
memberConfig{
|
||||
name: c.name(rand.Int()),
|
||||
authToken: c.cfg.AuthToken,
|
||||
peerTLS: c.cfg.PeerTLS,
|
||||
clientTLS: c.cfg.ClientTLS,
|
||||
quotaBackendBytes: c.cfg.QuotaBackendBytes,
|
||||
maxTxnOps: c.cfg.MaxTxnOps,
|
||||
maxRequestBytes: c.cfg.MaxRequestBytes,
|
||||
snapshotCount: c.cfg.SnapshotCount,
|
||||
snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
|
||||
grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
|
||||
grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
|
||||
grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
|
||||
clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
|
||||
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
|
||||
useIP: c.cfg.UseIP,
|
||||
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
|
||||
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
|
||||
name: c.name(rand.Int()),
|
||||
authToken: c.cfg.AuthToken,
|
||||
peerTLS: c.cfg.PeerTLS,
|
||||
clientTLS: c.cfg.ClientTLS,
|
||||
quotaBackendBytes: c.cfg.QuotaBackendBytes,
|
||||
maxTxnOps: c.cfg.MaxTxnOps,
|
||||
maxRequestBytes: c.cfg.MaxRequestBytes,
|
||||
snapshotCount: c.cfg.SnapshotCount,
|
||||
snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
|
||||
grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
|
||||
grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
|
||||
grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
|
||||
clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
|
||||
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
|
||||
useIP: c.cfg.UseIP,
|
||||
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
|
||||
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
|
||||
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
|
||||
})
|
||||
m.DiscoveryURL = c.cfg.DiscoveryURL
|
||||
if c.cfg.UseGRPC {
|
||||
@@ -568,23 +571,24 @@ type member struct {
|
||||
func (m *member) GRPCAddr() string { return m.grpcAddr }
|
||||
|
||||
type memberConfig struct {
|
||||
name string
|
||||
peerTLS *transport.TLSInfo
|
||||
clientTLS *transport.TLSInfo
|
||||
authToken string
|
||||
quotaBackendBytes int64
|
||||
maxTxnOps uint
|
||||
maxRequestBytes uint
|
||||
snapshotCount uint64
|
||||
snapshotCatchUpEntries uint64
|
||||
grpcKeepAliveMinTime time.Duration
|
||||
grpcKeepAliveInterval time.Duration
|
||||
grpcKeepAliveTimeout time.Duration
|
||||
clientMaxCallSendMsgSize int
|
||||
clientMaxCallRecvMsgSize int
|
||||
useIP bool
|
||||
enableLeaseCheckpoint bool
|
||||
leaseCheckpointInterval time.Duration
|
||||
name string
|
||||
peerTLS *transport.TLSInfo
|
||||
clientTLS *transport.TLSInfo
|
||||
authToken string
|
||||
quotaBackendBytes int64
|
||||
maxTxnOps uint
|
||||
maxRequestBytes uint
|
||||
snapshotCount uint64
|
||||
snapshotCatchUpEntries uint64
|
||||
grpcKeepAliveMinTime time.Duration
|
||||
grpcKeepAliveInterval time.Duration
|
||||
grpcKeepAliveTimeout time.Duration
|
||||
clientMaxCallSendMsgSize int
|
||||
clientMaxCallRecvMsgSize int
|
||||
useIP bool
|
||||
enableLeaseCheckpoint bool
|
||||
leaseCheckpointInterval time.Duration
|
||||
WatchProgressNotifyInterval time.Duration
|
||||
}
|
||||
|
||||
// mustNewMember return an inited member with the given name. If peerTLS is
|
||||
@@ -678,6 +682,8 @@ func mustNewMember(t testing.TB, mcfg memberConfig) *member {
|
||||
m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
|
||||
m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
|
||||
|
||||
m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
|
||||
|
||||
m.InitialCorruptCheck = true
|
||||
|
||||
lcfg := logutil.DefaultZapLoggerConfig
|
||||
|
@@ -1,82 +0,0 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package netutil
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os/exec"
|
||||
)
|
||||
|
||||
// DropPort drops all tcp packets that are received from the given port and sent to the given port.
|
||||
func DropPort(port int) error {
|
||||
cmdStr := fmt.Sprintf("sudo iptables -A OUTPUT -p tcp --destination-port %d -j DROP", port)
|
||||
if _, err := exec.Command("/bin/sh", "-c", cmdStr).Output(); err != nil {
|
||||
return err
|
||||
}
|
||||
cmdStr = fmt.Sprintf("sudo iptables -A INPUT -p tcp --destination-port %d -j DROP", port)
|
||||
_, err := exec.Command("/bin/sh", "-c", cmdStr).Output()
|
||||
return err
|
||||
}
|
||||
|
||||
// RecoverPort stops dropping tcp packets at given port.
|
||||
func RecoverPort(port int) error {
|
||||
cmdStr := fmt.Sprintf("sudo iptables -D OUTPUT -p tcp --destination-port %d -j DROP", port)
|
||||
if _, err := exec.Command("/bin/sh", "-c", cmdStr).Output(); err != nil {
|
||||
return err
|
||||
}
|
||||
cmdStr = fmt.Sprintf("sudo iptables -D INPUT -p tcp --destination-port %d -j DROP", port)
|
||||
_, err := exec.Command("/bin/sh", "-c", cmdStr).Output()
|
||||
return err
|
||||
}
|
||||
|
||||
// SetLatency adds latency in millisecond scale with random variations.
|
||||
func SetLatency(ms, rv int) error {
|
||||
ifces, err := GetDefaultInterfaces()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if rv > ms {
|
||||
rv = 1
|
||||
}
|
||||
for ifce := range ifces {
|
||||
cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
|
||||
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
|
||||
if err != nil {
|
||||
// the rule has already been added. Overwrite it.
|
||||
cmdStr = fmt.Sprintf("sudo tc qdisc change dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
|
||||
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveLatency resets latency configurations.
|
||||
func RemoveLatency() error {
|
||||
ifces, err := GetDefaultInterfaces()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for ifce := range ifces {
|
||||
_, err = exec.Command("/bin/sh", "-c", fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)).Output()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@@ -1,25 +0,0 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build !linux
|
||||
|
||||
package netutil
|
||||
|
||||
func DropPort(port int) error { return nil }
|
||||
|
||||
func RecoverPort(port int) error { return nil }
|
||||
|
||||
func SetLatency(ms, rv int) error { return nil }
|
||||
|
||||
func RemoveLatency() error { return nil }
|
@@ -62,12 +62,22 @@ func setupEmbedCfg(cfg *embed.Config, curls, purls, ics []url.URL) {
|
||||
cfg.InitialCluster = cfg.InitialCluster[1:]
|
||||
}
|
||||
|
||||
func getCommand(exec, name, dir, cURL, pURL, cluster string) string {
|
||||
s := fmt.Sprintf("%s --name %s --data-dir %s --listen-client-urls %s --advertise-client-urls %s ",
|
||||
exec, name, dir, cURL, cURL)
|
||||
s += fmt.Sprintf("--listen-peer-urls %s --initial-advertise-peer-urls %s ", pURL, pURL)
|
||||
s += fmt.Sprintf("--initial-cluster %s ", cluster)
|
||||
return s + "--initial-cluster-token tkn --initial-cluster-state new"
|
||||
func getCommand(exec, name, dir, cURL, pURL, cluster string) (args []string) {
|
||||
if !strings.Contains(exec, "etcd") {
|
||||
panic(fmt.Errorf("%q doesn't seem like etcd binary", exec))
|
||||
}
|
||||
return []string{
|
||||
exec,
|
||||
"--name", name,
|
||||
"--data-dir", dir,
|
||||
"--listen-client-urls", cURL,
|
||||
"--advertise-client-urls", cURL,
|
||||
"--listen-peer-urls", pURL,
|
||||
"--initial-advertise-peer-urls", pURL,
|
||||
"--initial-cluster", cluster,
|
||||
"--initial-cluster-token=tkn",
|
||||
"--initial-cluster-state=new",
|
||||
}
|
||||
}
|
||||
|
||||
func write(ep string) {
|
||||
|
@@ -47,7 +47,8 @@ func install(ver, dir string) (string, error) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err = exec.Command("bash", "-c", fmt.Sprintf("tar xzvf %s -C %s --strip-components=1", tarPath, dir)).Run(); err != nil {
|
||||
// parametrizes to prevent attackers from adding arbitrary OS commands
|
||||
if err = exec.Command("tar", "xzvf", tarPath, "-C", dir, "--strip-components=1").Run(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return filepath.Join(dir, "etcd"), nil
|
||||
|
@@ -87,7 +87,7 @@ func main() {
|
||||
rc := make(chan run)
|
||||
|
||||
cs1 := getCommand(bp, "s1", d1, "http://localhost:2379", "http://localhost:2380", cluster)
|
||||
cmd1 := exec.Command("bash", "-c", cs1)
|
||||
cmd1 := exec.Command(cs1[0], cs1[1:]...)
|
||||
go func() {
|
||||
if *debug {
|
||||
cmd1.Stderr = os.Stderr
|
||||
@@ -101,7 +101,7 @@ func main() {
|
||||
rc <- run{cmd: cmd1}
|
||||
}()
|
||||
cs2 := getCommand(bp, "s2", d2, "http://localhost:22379", "http://localhost:22380", cluster)
|
||||
cmd2 := exec.Command("bash", "-c", cs2)
|
||||
cmd2 := exec.Command(cs2[0], cs2[1:]...)
|
||||
go func() {
|
||||
if *debug {
|
||||
cmd2.Stderr = os.Stderr
|
||||
|
@@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.4.13"
|
||||
Version = "3.4.14"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
Reference in New Issue
Block a user