Compare commits
12 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
c31bec0f29 | ||
![]() |
19fe4b0cac | ||
![]() |
a5d94fe229 | ||
![]() |
e8f3cbf1c6 | ||
![]() |
856502f788 | ||
![]() |
ae23b0ef2f | ||
![]() |
5ee89be616 | ||
![]() |
38373b342d | ||
![]() |
536a5f594b | ||
![]() |
49e6916e66 | ||
![]() |
b9b6f6f7c4 | ||
![]() |
6ecbb3bbc5 |
@@ -24,6 +24,11 @@ curl -L http://localhost:2379/v3alpha/kv/put \
|
|||||||
curl -L http://localhost:2379/v3alpha/kv/range \
|
curl -L http://localhost:2379/v3alpha/kv/range \
|
||||||
-X POST -d '{"key": "Zm9v"}'
|
-X POST -d '{"key": "Zm9v"}'
|
||||||
# {"header":{"cluster_id":"12585971608760269493","member_id":"13847567121247652255","revision":"2","raft_term":"3"},"kvs":[{"key":"Zm9v","create_revision":"2","mod_revision":"2","version":"1","value":"YmFy"}],"count":"1"}
|
# {"header":{"cluster_id":"12585971608760269493","member_id":"13847567121247652255","revision":"2","raft_term":"3"},"kvs":[{"key":"Zm9v","create_revision":"2","mod_revision":"2","version":"1","value":"YmFy"}],"count":"1"}
|
||||||
|
|
||||||
|
# get all keys prefixed with "foo"
|
||||||
|
curl -L http://localhost:2379/v3alpha/kv/range \
|
||||||
|
-X POST -d '{"key": "Zm9v", "range_end": "Zm9w"}'
|
||||||
|
# {"header":{"cluster_id":"12585971608760269493","member_id":"13847567121247652255","revision":"2","raft_term":"3"},"kvs":[{"key":"Zm9v","create_revision":"2","mod_revision":"2","version":"1","value":"YmFy"}],"count":"1"}
|
||||||
```
|
```
|
||||||
|
|
||||||
Use `curl` to watch a key:
|
Use `curl` to watch a key:
|
||||||
|
@@ -449,7 +449,7 @@ message LeaseRevokeRequest {
|
|||||||
|
|
||||||
### Keep alives
|
### Keep alives
|
||||||
|
|
||||||
Leases are refreshed using a bi-directional stream created with the `LeaseKeepAlive` API call. When the client wishes to refresh a lease, it sends a `LeaseGrantRequest` over the stream:
|
Leases are refreshed using a bi-directional stream created with the `LeaseKeepAlive` API call. When the client wishes to refresh a lease, it sends a `LeaseKeepAliveRequest` over the stream:
|
||||||
|
|
||||||
```protobuf
|
```protobuf
|
||||||
message LeaseKeepAliveRequest {
|
message LeaseKeepAliveRequest {
|
||||||
|
@@ -186,11 +186,29 @@ func (e *Etcd) Config() Config {
|
|||||||
func (e *Etcd) Close() {
|
func (e *Etcd) Close() {
|
||||||
e.closeOnce.Do(func() { close(e.stopc) })
|
e.closeOnce.Do(func() { close(e.stopc) })
|
||||||
|
|
||||||
// (gRPC server) stops accepting new connections,
|
timeout := 2 * time.Second
|
||||||
// RPCs, and blocks until all pending RPCs are finished
|
if e.Server != nil {
|
||||||
|
timeout = e.Server.Cfg.ReqTimeout()
|
||||||
|
}
|
||||||
for _, sctx := range e.sctxs {
|
for _, sctx := range e.sctxs {
|
||||||
for gs := range sctx.grpcServerC {
|
for gs := range sctx.grpcServerC {
|
||||||
|
ch := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(ch)
|
||||||
|
// close listeners to stop accepting new connections,
|
||||||
|
// will block on any existing transports
|
||||||
gs.GracefulStop()
|
gs.GracefulStop()
|
||||||
|
}()
|
||||||
|
// wait until all pending RPCs are finished
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case <-time.After(timeout):
|
||||||
|
// took too long, manually close open transports
|
||||||
|
// e.g. watch streams
|
||||||
|
gs.Stop()
|
||||||
|
// concurrent GracefulStop should be interrupted
|
||||||
|
<-ch
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -16,6 +16,7 @@ package v3rpc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"math"
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
@@ -24,6 +25,8 @@ import (
|
|||||||
"google.golang.org/grpc/grpclog"
|
"google.golang.org/grpc/grpclog"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const maxStreams = math.MaxUint32
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
grpclog.SetLogger(plog)
|
grpclog.SetLogger(plog)
|
||||||
}
|
}
|
||||||
@@ -36,8 +39,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
|
|||||||
}
|
}
|
||||||
opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
|
opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
|
||||||
opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
|
opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
|
||||||
|
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
|
||||||
grpcServer := grpc.NewServer(opts...)
|
grpcServer := grpc.NewServer(opts...)
|
||||||
|
|
||||||
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
|
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
|
||||||
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
|
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
|
||||||
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
|
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
|
||||||
|
@@ -15,13 +15,16 @@
|
|||||||
package integration
|
package integration
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/clientv3"
|
||||||
"github.com/coreos/etcd/embed"
|
"github.com/coreos/etcd/embed"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -102,6 +105,47 @@ func TestEmbedEtcd(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestEmbedEtcdGracefulStop ensures embedded server stops
|
||||||
|
// cutting existing transports.
|
||||||
|
func TestEmbedEtcdGracefulStop(t *testing.T) {
|
||||||
|
cfg := embed.NewConfig()
|
||||||
|
|
||||||
|
urls := newEmbedURLs(2)
|
||||||
|
setupEmbedCfg(cfg, []url.URL{urls[0]}, []url.URL{urls[1]})
|
||||||
|
|
||||||
|
cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
|
||||||
|
os.RemoveAll(cfg.Dir)
|
||||||
|
defer os.RemoveAll(cfg.Dir)
|
||||||
|
|
||||||
|
e, err := embed.StartEtcd(cfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
|
||||||
|
|
||||||
|
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{urls[0].String()}})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer cli.Close()
|
||||||
|
|
||||||
|
// open watch connection
|
||||||
|
cli.Watch(context.Background(), "foo")
|
||||||
|
|
||||||
|
donec := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
e.Close()
|
||||||
|
close(donec)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case err := <-e.Err():
|
||||||
|
t.Fatal(err)
|
||||||
|
case <-donec:
|
||||||
|
case <-time.After(2*time.Second + e.Server.Cfg.ReqTimeout()):
|
||||||
|
t.Fatalf("took too long to close server")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newEmbedURLs(n int) (urls []url.URL) {
|
func newEmbedURLs(n int) (urls []url.URL) {
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d%06d", os.Getpid(), i))
|
u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d%06d", os.Getpid(), i))
|
||||||
|
@@ -118,6 +118,7 @@ func interestingGoroutines() (gs []string) {
|
|||||||
}
|
}
|
||||||
stack := strings.TrimSpace(sl[1])
|
stack := strings.TrimSpace(sl[1])
|
||||||
if stack == "" ||
|
if stack == "" ||
|
||||||
|
strings.Contains(stack, "sync.(*WaitGroup).Done") ||
|
||||||
strings.Contains(stack, "created by os/signal.init") ||
|
strings.Contains(stack, "created by os/signal.init") ||
|
||||||
strings.Contains(stack, "runtime/panic.go") ||
|
strings.Contains(stack, "runtime/panic.go") ||
|
||||||
strings.Contains(stack, "created by testing.RunTests") ||
|
strings.Contains(stack, "created by testing.RunTests") ||
|
||||||
|
@@ -15,6 +15,8 @@
|
|||||||
package grpcproxy
|
package grpcproxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/clientv3"
|
||||||
@@ -49,6 +51,9 @@ func (mp *maintenanceProxy) Snapshot(sr *pb.SnapshotRequest, stream pb.Maintenan
|
|||||||
for {
|
for {
|
||||||
rr, err := sc.Recv()
|
rr, err := sc.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = stream.Send(rr)
|
err = stream.Send(rr)
|
||||||
|
10
test
10
test
@@ -100,13 +100,21 @@ function functional_pass {
|
|||||||
agent_pids="${agent_pids} $pid"
|
agent_pids="${agent_pids} $pid"
|
||||||
done
|
done
|
||||||
|
|
||||||
|
for a in 1 2 3; do
|
||||||
|
echo "Waiting for 'etcd-agent' on ${a}9027..."
|
||||||
|
while ! nc -z localhost ${a}9027; do
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
done
|
||||||
|
|
||||||
|
echo "Starting 'etcd-tester'"
|
||||||
./bin/etcd-tester \
|
./bin/etcd-tester \
|
||||||
-agent-endpoints "127.0.0.1:19027,127.0.0.1:29027,127.0.0.1:39027" \
|
-agent-endpoints "127.0.0.1:19027,127.0.0.1:29027,127.0.0.1:39027" \
|
||||||
-client-ports 12379,22379,32379 \
|
-client-ports 12379,22379,32379 \
|
||||||
-peer-ports 12380,22380,32380 \
|
-peer-ports 12380,22380,32380 \
|
||||||
-limit 1 \
|
-limit 1 \
|
||||||
-schedule-cases "0 1 2 3 4 5" \
|
-schedule-cases "0 1 2 3 4 5" \
|
||||||
-exit-on-failure
|
-exit-on-failure && echo "'etcd-tester' succeeded"
|
||||||
ETCD_TESTER_EXIT_CODE=$?
|
ETCD_TESTER_EXIT_CODE=$?
|
||||||
echo "ETCD_TESTER_EXIT_CODE:" ${ETCD_TESTER_EXIT_CODE}
|
echo "ETCD_TESTER_EXIT_CODE:" ${ETCD_TESTER_EXIT_CODE}
|
||||||
|
|
||||||
|
@@ -26,7 +26,7 @@ import (
|
|||||||
var (
|
var (
|
||||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||||
MinClusterVersion = "3.0.0"
|
MinClusterVersion = "3.0.0"
|
||||||
Version = "3.2.2"
|
Version = "3.2.4"
|
||||||
APIVersion = "unknown"
|
APIVersion = "unknown"
|
||||||
|
|
||||||
// Git SHA Value will be set during build
|
// Git SHA Value will be set during build
|
||||||
|
Reference in New Issue
Block a user