diff --git a/Documentation/dev-guide/api_grpc_gateway.md b/Documentation/dev-guide/api_grpc_gateway.md index 6407937f6..7169a7a01 100644 --- a/Documentation/dev-guide/api_grpc_gateway.md +++ b/Documentation/dev-guide/api_grpc_gateway.md @@ -8,6 +8,8 @@ etcd v3 uses [gRPC][grpc] for its messaging protocol. The etcd project includes The gateway accepts a [JSON mapping][json-mapping] for etcd's [protocol buffer][api-ref] message definitions. Note that `key` and `value` fields are defined as byte arrays and therefore must be base64 encoded in JSON. +Use `curl` to put and get a key: + ```bash </dev/null 2>&1 +# {"result":{"header":{"cluster_id":"12585971608760269493","member_id":"13847567121247652255","revision":"2","raft_term":"2"},"events":[{"kv":{"key":"Zm9v","create_revision":"2","mod_revision":"2","version":"1","value":"YmFy"}}]}} +``` ## Swagger diff --git a/e2e/v2_curl_test.go b/e2e/v2_curl_test.go index ee536c3de..289d64c0d 100644 --- a/e2e/v2_curl_test.go +++ b/e2e/v2_curl_test.go @@ -120,7 +120,8 @@ type cURLReq struct { username string password string - isTLS bool + isTLS bool + timeout int endpoint string @@ -151,6 +152,9 @@ func cURLPrefixArgs(clus *etcdProcessCluster, method string, req cURLReq) []stri } else { cmdArgs = append(cmdArgs, "-L", ep) } + if req.timeout != 0 { + cmdArgs = append(cmdArgs, "-m", fmt.Sprintf("%d", req.timeout)) + } switch method { case "POST", "PUT": diff --git a/e2e/v3_curl_test.go b/e2e/v3_curl_test.go index f3dee62d4..0f7193a85 100644 --- a/e2e/v3_curl_test.go +++ b/e2e/v3_curl_test.go @@ -18,7 +18,7 @@ import ( "encoding/json" "testing" - "github.com/coreos/etcd/etcdserver/etcdserverpb" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" ) @@ -47,14 +47,14 @@ func testCurlPutGetGRPCGateway(t *testing.T, cfg *etcdProcessClusterConfig) { expectPut = `"revision":"` expectGet = `"value":"` ) - putData, err := json.Marshal(&etcdserverpb.PutRequest{ + putData, err := json.Marshal(&pb.PutRequest{ Key: key, Value: value, }) if err != nil { t.Fatal(err) } - rangeData, err := json.Marshal(&etcdserverpb.RangeRequest{ + rangeData, err := json.Marshal(&pb.RangeRequest{ Key: key, }) if err != nil { @@ -74,3 +74,40 @@ func testCurlPutGetGRPCGateway(t *testing.T, cfg *etcdProcessClusterConfig) { } } } + +func TestV3CurlWatch(t *testing.T) { + defer testutil.AfterTest(t) + + epc, err := newEtcdProcessCluster(&configNoTLS) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + defer func() { + if cerr := epc.Close(); err != nil { + t.Fatalf("error closing etcd processes (%v)", cerr) + } + }() + + // store "bar" into "foo" + putreq, err := json.Marshal(&pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) + if err != nil { + t.Fatal(err) + } + if err = cURLPost(epc, cURLReq{endpoint: "/v3alpha/kv/put", value: string(putreq), expected: "revision"}); err != nil { + t.Fatalf("failed put with curl (%v)", err) + } + // watch for first update to "foo" + wcr := &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1} + wreq, err := json.Marshal(wcr) + if err != nil { + t.Fatal(err) + } + // marshaling the grpc to json gives: + // "{"RequestUnion":{"CreateRequest":{"key":"Zm9v","start_revision":1}}}" + // but the gprc-gateway expects a different format.. + wstr := `{"create_request" : ` + string(wreq) + "}" + // expects "bar", timeout after 2 seconds since stream waits forever + if err = cURLPost(epc, cURLReq{endpoint: "/v3alpha/watch", value: wstr, expected: `"YmFy"`, timeout: 2}); err != nil { + t.Fatal(err) + } +} diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index c7fb97b6d..f0215531d 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -131,10 +131,14 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { // but when stream.Context().Done() is closed, the stream's recv // may continue to block since it uses a different context, leading to // deadlock when calling sws.close(). - go func() { errc <- sws.recvLoop() }() - + go func() { + if rerr := sws.recvLoop(); rerr != nil { + errc <- rerr + } + }() select { case err = <-errc: + close(sws.ctrlStream) case <-stream.Context().Done(): err = stream.Context().Err() // the only server-side cancellation is noleader for now. @@ -147,7 +151,6 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { } func (sws *serverWatchStream) recvLoop() error { - defer close(sws.ctrlStream) for { req, err := sws.gRPCStream.Recv() if err == io.EOF { diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index 637004ebe..61b9b7e82 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -948,21 +948,23 @@ func testV3WatchMultipleStreams(t *testing.T, startRev int64) { // returned closing the WatchClient stream. Or the response will // be returned. func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.WatchResponse) { - rCh := make(chan *pb.WatchResponse) + rCh := make(chan *pb.WatchResponse, 1) + donec := make(chan struct{}) + defer close(donec) go func() { resp, _ := wc.Recv() - rCh <- resp + select { + case rCh <- resp: + case <-donec: + } }() select { case nr := <-rCh: return false, nr case <-time.After(timeout): } + // didn't get response wc.CloseSend() - rv, ok := <-rCh - if rv != nil || !ok { - return false, rv - } return true, nil }