commit
f7baea7406
|
@ -27,6 +27,8 @@ import (
|
|||
"github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
|
||||
)
|
||||
|
||||
const keysPrefix = "/v2/keys"
|
||||
|
||||
type Peers map[int64][]string
|
||||
|
||||
func (ps Peers) Pick(id int64) string {
|
||||
|
@ -152,7 +154,7 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
switch {
|
||||
case strings.HasPrefix(r.URL.Path, "/raft"):
|
||||
h.serveRaft(ctx, w, r)
|
||||
case strings.HasPrefix(r.URL.Path, "/v2/keys/"):
|
||||
case strings.HasPrefix(r.URL.Path, keysPrefix):
|
||||
h.serveKeys(ctx, w, r)
|
||||
default:
|
||||
http.NotFound(w, r)
|
||||
|
@ -160,7 +162,7 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
rr, err := parseRequest(r)
|
||||
rr, err := parseRequest(r, genId())
|
||||
if err != nil {
|
||||
log.Println(err) // reading of body failed
|
||||
return
|
||||
|
@ -176,12 +178,12 @@ func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.R
|
|||
return
|
||||
default:
|
||||
log.Println(err)
|
||||
http.Error(w, "Internal Server Error", 500)
|
||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if err := encodeResponse(ctx, w, resp); err != nil {
|
||||
http.Error(w, "Timeout while waiting for response", 504)
|
||||
http.Error(w, "Timeout while waiting for response", http.StatusGatewayTimeout)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -215,17 +217,22 @@ func genId() int64 {
|
|||
}
|
||||
}
|
||||
|
||||
func parseRequest(r *http.Request) (etcdserverpb.Request, error) {
|
||||
func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
|
||||
if err := r.ParseForm(); err != nil {
|
||||
return etcdserverpb.Request{}, err
|
||||
}
|
||||
if !strings.HasPrefix(r.URL.Path, keysPrefix) {
|
||||
return etcdserverpb.Request{}, errors.New("unexpected key prefix!")
|
||||
}
|
||||
|
||||
q := r.URL.Query()
|
||||
// TODO(jonboulle): perform strict validation of all parameters
|
||||
// https://github.com/coreos/etcd/issues/1011
|
||||
rr := etcdserverpb.Request{
|
||||
Id: genId(),
|
||||
Id: id,
|
||||
Method: r.Method,
|
||||
Val: r.FormValue("value"),
|
||||
Path: r.URL.Path[len("/v2/keys"):],
|
||||
Path: r.URL.Path[len(keysPrefix):],
|
||||
PrevValue: q.Get("prevValue"),
|
||||
PrevIndex: parseUint64(q.Get("prevIndex")),
|
||||
Recursive: parseBool(q.Get("recursive")),
|
||||
|
@ -245,6 +252,8 @@ func parseRequest(r *http.Request) (etcdserverpb.Request, error) {
|
|||
ttl := parseUint64(q.Get("ttl"))
|
||||
if ttl > 0 {
|
||||
expr := time.Duration(ttl) * time.Second
|
||||
// TODO(jonboulle): use fake clock instead of time module
|
||||
// https://github.com/coreos/etcd/issues/1021
|
||||
rr.Expiration = time.Now().Add(expr).UnixNano()
|
||||
}
|
||||
|
||||
|
@ -261,6 +270,8 @@ func parseUint64(s string) uint64 {
|
|||
return v
|
||||
}
|
||||
|
||||
// encodeResponse serializes the given etcdserver Response and writes the
|
||||
// resulting JSON to the given ResponseWriter, utilizing the provided context
|
||||
func encodeResponse(ctx context.Context, w http.ResponseWriter, resp etcdserver.Response) (err error) {
|
||||
var ev *store.Event
|
||||
switch {
|
||||
|
@ -288,7 +299,10 @@ func encodeResponse(ctx context.Context, w http.ResponseWriter, resp etcdserver.
|
|||
return nil
|
||||
}
|
||||
|
||||
func waitForEvent(ctx context.Context, w http.ResponseWriter, wa *store.Watcher) (*store.Event, error) {
|
||||
// waitForEvent waits for a given watcher to return its associated
|
||||
// event. It returns a non-nil error if the given Context times out
|
||||
// or the given ResponseWriter triggers a CloseNotify.
|
||||
func waitForEvent(ctx context.Context, w http.ResponseWriter, wa store.Watcher) (*store.Event, error) {
|
||||
// TODO(bmizerany): support streaming?
|
||||
defer wa.Remove()
|
||||
var nch <-chan bool
|
||||
|
@ -297,7 +311,7 @@ func waitForEvent(ctx context.Context, w http.ResponseWriter, wa *store.Watcher)
|
|||
}
|
||||
|
||||
select {
|
||||
case ev := <-wa.EventChan:
|
||||
case ev := <-wa.EventChan():
|
||||
return ev, nil
|
||||
case <-nch:
|
||||
elog.TODO()
|
||||
|
|
|
@ -1,73 +1,349 @@
|
|||
package etcdhttp
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"path"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/store"
|
||||
"github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
|
||||
)
|
||||
|
||||
func nopSave(st raftpb.State, ents []raftpb.Entry) {}
|
||||
func nopSend(m []raftpb.Message) {}
|
||||
func boolp(b bool) *bool { return &b }
|
||||
|
||||
func TestSet(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
st := store.New()
|
||||
|
||||
n := raft.Start(1, []int64{1}, 0, 0)
|
||||
n.Campaign(ctx)
|
||||
|
||||
srv := &etcdserver.Server{
|
||||
Node: n,
|
||||
Store: st,
|
||||
Send: etcdserver.SendFunc(nopSend),
|
||||
Save: func(st raftpb.State, ents []raftpb.Entry) {},
|
||||
}
|
||||
etcdserver.Start(srv)
|
||||
defer srv.Stop()
|
||||
|
||||
h := Handler{
|
||||
Timeout: time.Hour,
|
||||
Server: srv,
|
||||
}
|
||||
|
||||
s := httptest.NewServer(h)
|
||||
defer s.Close()
|
||||
|
||||
resp, err := http.PostForm(s.URL+"/v2/keys/foo", url.Values{"value": {"bar"}})
|
||||
func mustNewURL(t *testing.T, s string) *url.URL {
|
||||
u, err := url.Parse(s)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
t.Fatalf("error creating URL from %q: %v", s, err)
|
||||
}
|
||||
return u
|
||||
}
|
||||
|
||||
if resp.StatusCode != 201 {
|
||||
t.Errorf("StatusCode = %d, expected %d", 201, resp.StatusCode)
|
||||
func TestBadParseRequest(t *testing.T) {
|
||||
tests := []struct {
|
||||
in *http.Request
|
||||
}{
|
||||
{
|
||||
// parseForm failure
|
||||
&http.Request{
|
||||
Body: nil,
|
||||
Method: "PUT",
|
||||
},
|
||||
},
|
||||
{
|
||||
// bad key prefix
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, "/badprefix/"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
g := new(store.Event)
|
||||
if err := json.NewDecoder(resp.Body).Decode(&g); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
w := &store.NodeExtern{
|
||||
Key: "/foo/1",
|
||||
Value: stringp("bar"),
|
||||
ModifiedIndex: 1,
|
||||
CreatedIndex: 1,
|
||||
}
|
||||
if !reflect.DeepEqual(g.Node, w) {
|
||||
t.Errorf("g = %+v, want %+v", g.Node, w)
|
||||
for i, tt := range tests {
|
||||
got, err := parseRequest(tt.in, 1234)
|
||||
if err == nil {
|
||||
t.Errorf("case %d: unexpected nil error!", i)
|
||||
}
|
||||
if !reflect.DeepEqual(got, etcdserverpb.Request{}) {
|
||||
t.Errorf("case %d: unexpected non-empty Request: %#v", i, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func stringp(s string) *string { return &s }
|
||||
func TestGoodParseRequest(t *testing.T) {
|
||||
tests := []struct {
|
||||
in *http.Request
|
||||
w etcdserverpb.Request
|
||||
}{
|
||||
{
|
||||
// good prefix, all other values default
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(keysPrefix, "foo")),
|
||||
},
|
||||
etcdserverpb.Request{
|
||||
Id: 1234,
|
||||
Path: "/foo",
|
||||
},
|
||||
},
|
||||
{
|
||||
// value specified
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(keysPrefix, "foo?value=some_value")),
|
||||
},
|
||||
etcdserverpb.Request{
|
||||
Id: 1234,
|
||||
Val: "some_value",
|
||||
Path: "/foo",
|
||||
},
|
||||
},
|
||||
{
|
||||
// prevIndex specified
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(keysPrefix, "foo?prevIndex=98765")),
|
||||
},
|
||||
etcdserverpb.Request{
|
||||
Id: 1234,
|
||||
PrevIndex: 98765,
|
||||
Path: "/foo",
|
||||
},
|
||||
},
|
||||
{
|
||||
// recursive specified
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(keysPrefix, "foo?recursive=true")),
|
||||
},
|
||||
etcdserverpb.Request{
|
||||
Id: 1234,
|
||||
Recursive: true,
|
||||
Path: "/foo",
|
||||
},
|
||||
},
|
||||
{
|
||||
// sorted specified
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(keysPrefix, "foo?sorted=true")),
|
||||
},
|
||||
etcdserverpb.Request{
|
||||
Id: 1234,
|
||||
Sorted: true,
|
||||
Path: "/foo",
|
||||
},
|
||||
},
|
||||
{
|
||||
// wait specified
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(keysPrefix, "foo?wait=true")),
|
||||
},
|
||||
etcdserverpb.Request{
|
||||
Id: 1234,
|
||||
Wait: true,
|
||||
Path: "/foo",
|
||||
},
|
||||
},
|
||||
{
|
||||
// prevExists should be non-null if specified
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(keysPrefix, "foo?prevExists=true")),
|
||||
},
|
||||
etcdserverpb.Request{
|
||||
Id: 1234,
|
||||
PrevExists: boolp(true),
|
||||
Path: "/foo",
|
||||
},
|
||||
},
|
||||
{
|
||||
// prevExists should be non-null if specified
|
||||
&http.Request{
|
||||
URL: mustNewURL(t, path.Join(keysPrefix, "foo?prevExists=false")),
|
||||
},
|
||||
etcdserverpb.Request{
|
||||
Id: 1234,
|
||||
PrevExists: boolp(false),
|
||||
Path: "/foo",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
got, err := parseRequest(tt.in, 1234)
|
||||
if err != nil {
|
||||
t.Errorf("#%d: err = %v, want %v", i, err, nil)
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.w) {
|
||||
t.Errorf("#%d: bad request: got %#v, want %#v", i, got, tt.w)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// eventingWatcher immediately returns a simple event of the given action on its channel
|
||||
type eventingWatcher struct {
|
||||
action string
|
||||
}
|
||||
|
||||
func (w *eventingWatcher) EventChan() chan *store.Event {
|
||||
ch := make(chan *store.Event)
|
||||
go func() {
|
||||
ch <- &store.Event{
|
||||
Action: w.action,
|
||||
Node: &store.NodeExtern{},
|
||||
}
|
||||
}()
|
||||
return ch
|
||||
}
|
||||
|
||||
func (w *eventingWatcher) Remove() {}
|
||||
|
||||
func TestEncodeResponse(t *testing.T) {
|
||||
tests := []struct {
|
||||
resp etcdserver.Response
|
||||
idx string
|
||||
code int
|
||||
err error
|
||||
}{
|
||||
// standard case, standard 200 response
|
||||
{
|
||||
etcdserver.Response{
|
||||
Event: &store.Event{
|
||||
Action: store.Get,
|
||||
Node: &store.NodeExtern{},
|
||||
PrevNode: &store.NodeExtern{},
|
||||
},
|
||||
Watcher: nil,
|
||||
},
|
||||
"0",
|
||||
http.StatusOK,
|
||||
nil,
|
||||
},
|
||||
// check new nodes return StatusCreated
|
||||
{
|
||||
etcdserver.Response{
|
||||
Event: &store.Event{
|
||||
Action: store.Create,
|
||||
Node: &store.NodeExtern{},
|
||||
PrevNode: &store.NodeExtern{},
|
||||
},
|
||||
Watcher: nil,
|
||||
},
|
||||
"0",
|
||||
http.StatusCreated,
|
||||
nil,
|
||||
},
|
||||
{
|
||||
etcdserver.Response{
|
||||
Watcher: &eventingWatcher{store.Create},
|
||||
},
|
||||
"0",
|
||||
http.StatusCreated,
|
||||
nil,
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
rw := httptest.NewRecorder()
|
||||
err := encodeResponse(context.Background(), rw, tt.resp)
|
||||
if err != tt.err {
|
||||
t.Errorf("case %d: unexpected err: got %v, want %v", i, err, tt.err)
|
||||
continue
|
||||
}
|
||||
|
||||
if gct := rw.Header().Get("Content-Type"); gct != "application/json" {
|
||||
t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct)
|
||||
}
|
||||
|
||||
if gei := rw.Header().Get("X-Etcd-Index"); gei != tt.idx {
|
||||
t.Errorf("case %d: bad X-Etcd-Index header: got %s, want %s", i, gei, tt.idx)
|
||||
}
|
||||
|
||||
if rw.Code != tt.code {
|
||||
t.Errorf("case %d: bad response code: got %d, want %v", i, rw.Code, tt.code)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
type dummyWatcher struct {
|
||||
echan chan *store.Event
|
||||
}
|
||||
|
||||
func (w *dummyWatcher) EventChan() chan *store.Event {
|
||||
return w.echan
|
||||
}
|
||||
func (w *dummyWatcher) Remove() {}
|
||||
|
||||
type dummyResponseWriter struct {
|
||||
cnchan chan bool
|
||||
http.ResponseWriter
|
||||
}
|
||||
|
||||
func (rw *dummyResponseWriter) CloseNotify() <-chan bool {
|
||||
return rw.cnchan
|
||||
}
|
||||
|
||||
func TestWaitForEventChan(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ec := make(chan *store.Event)
|
||||
dw := &dummyWatcher{
|
||||
echan: ec,
|
||||
}
|
||||
w := httptest.NewRecorder()
|
||||
var wg sync.WaitGroup
|
||||
var ev *store.Event
|
||||
var err error
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
ev, err = waitForEvent(ctx, w, dw)
|
||||
wg.Done()
|
||||
}()
|
||||
ec <- &store.Event{
|
||||
Action: store.Get,
|
||||
Node: &store.NodeExtern{
|
||||
Key: "/foo/bar",
|
||||
ModifiedIndex: 12345,
|
||||
},
|
||||
}
|
||||
wg.Wait()
|
||||
want := &store.Event{
|
||||
Action: store.Get,
|
||||
Node: &store.NodeExtern{
|
||||
Key: "/foo/bar",
|
||||
ModifiedIndex: 12345,
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(ev, want) {
|
||||
t.Fatalf("bad event: got %#v, want %#v", ev, want)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitForEventCloseNotify(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dw := &dummyWatcher{}
|
||||
cnchan := make(chan bool)
|
||||
w := &dummyResponseWriter{
|
||||
cnchan: cnchan,
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
var ev *store.Event
|
||||
var err error
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
ev, err = waitForEvent(ctx, w, dw)
|
||||
wg.Done()
|
||||
}()
|
||||
close(cnchan)
|
||||
wg.Wait()
|
||||
if ev != nil {
|
||||
t.Fatalf("non-nil Event returned with CloseNotifier: %v", ev)
|
||||
}
|
||||
if err == nil {
|
||||
t.Fatalf("nil err returned with CloseNotifier!")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitForEventCancelledContext(t *testing.T) {
|
||||
cctx, cancel := context.WithCancel(context.Background())
|
||||
dw := &dummyWatcher{}
|
||||
w := httptest.NewRecorder()
|
||||
var wg sync.WaitGroup
|
||||
var ev *store.Event
|
||||
var err error
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
ev, err = waitForEvent(cctx, w, dw)
|
||||
wg.Done()
|
||||
}()
|
||||
cancel()
|
||||
wg.Wait()
|
||||
if ev != nil {
|
||||
t.Fatalf("non-nil Event returned with cancelled context: %v", ev)
|
||||
}
|
||||
if err == nil {
|
||||
t.Fatalf("nil err returned with cancelled context!")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ type SendFunc func(m []raftpb.Message)
|
|||
|
||||
type Response struct {
|
||||
Event *store.Event
|
||||
Watcher *store.Watcher
|
||||
Watcher store.Watcher
|
||||
err error
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
package functional
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/store"
|
||||
"github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
|
||||
)
|
||||
|
||||
func nopSave(st raftpb.State, ents []raftpb.Entry) {}
|
||||
func nopSend(m []raftpb.Message) {}
|
||||
|
||||
func TestSet(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
st := store.New()
|
||||
|
||||
n := raft.Start(1, []int64{1}, 0, 0)
|
||||
n.Campaign(ctx)
|
||||
|
||||
srv := &etcdserver.Server{
|
||||
Node: n,
|
||||
Store: st,
|
||||
Send: etcdserver.SendFunc(nopSend),
|
||||
Save: func(st raftpb.State, ents []raftpb.Entry) {},
|
||||
}
|
||||
etcdserver.Start(srv)
|
||||
defer srv.Stop()
|
||||
|
||||
h := etcdhttp.Handler{
|
||||
Timeout: time.Hour,
|
||||
Server: srv,
|
||||
}
|
||||
|
||||
s := httptest.NewServer(h)
|
||||
defer s.Close()
|
||||
|
||||
resp, err := http.PostForm(s.URL+"/v2/keys/foo", url.Values{"value": {"bar"}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != 201 {
|
||||
t.Errorf("StatusCode = %d, expected %d", 201, resp.StatusCode)
|
||||
}
|
||||
|
||||
g := new(store.Event)
|
||||
if err := json.NewDecoder(resp.Body).Decode(&g); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
w := &store.NodeExtern{
|
||||
Key: "/foo/1",
|
||||
Value: stringp("bar"),
|
||||
ModifiedIndex: 1,
|
||||
CreatedIndex: 1,
|
||||
}
|
||||
if !reflect.DeepEqual(g.Node, w) {
|
||||
t.Errorf("g = %+v, want %+v", g.Node, w)
|
||||
}
|
||||
}
|
||||
|
||||
func stringp(s string) *string { return &s }
|
|
@ -51,7 +51,7 @@ type Store interface {
|
|||
Delete(nodePath string, recursive, dir bool) (*Event, error)
|
||||
CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
|
||||
|
||||
Watch(prefix string, recursive, stream bool, sinceIndex uint64) (*Watcher, error)
|
||||
Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)
|
||||
|
||||
Save() ([]byte, error)
|
||||
Recovery(state []byte) error
|
||||
|
@ -344,14 +344,14 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
|
|||
return e, nil
|
||||
}
|
||||
|
||||
func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (*Watcher, error) {
|
||||
func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) {
|
||||
s.worldLock.RLock()
|
||||
defer s.worldLock.RUnlock()
|
||||
|
||||
key = path.Clean(path.Join("/", key))
|
||||
nextIndex := s.CurrentIndex + 1
|
||||
|
||||
var w *Watcher
|
||||
var w Watcher
|
||||
var err *etcdErr.Error
|
||||
|
||||
if sinceIndex == 0 {
|
||||
|
|
|
@ -113,7 +113,7 @@ func BenchmarkWatch(b *testing.B) {
|
|||
|
||||
e := newEvent("set", kvs[i][0], uint64(i+1), uint64(i+1))
|
||||
s.WatcherHub.notify(e)
|
||||
<-w.EventChan
|
||||
<-w.EventChan()
|
||||
s.CurrentIndex++
|
||||
}
|
||||
|
||||
|
@ -135,7 +135,7 @@ func BenchmarkWatchWithSet(b *testing.B) {
|
|||
w, _ := s.Watch(kvs[i][0], false, false, 0)
|
||||
|
||||
s.Set(kvs[i][0], false, "test", Permanent)
|
||||
<-w.EventChan
|
||||
<-w.EventChan()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -145,7 +145,7 @@ func BenchmarkWatchWithSetBatch(b *testing.B) {
|
|||
kvs, _ := generateNRandomKV(b.N, 128)
|
||||
b.StartTimer()
|
||||
|
||||
watchers := make([]*Watcher, b.N)
|
||||
watchers := make([]Watcher, b.N)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
watchers[i], _ = s.Watch(kvs[i][0], false, false, 0)
|
||||
|
@ -156,14 +156,14 @@ func BenchmarkWatchWithSetBatch(b *testing.B) {
|
|||
}
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
<-watchers[i].EventChan
|
||||
<-watchers[i].EventChan()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func BenchmarkWatchOneKey(b *testing.B) {
|
||||
s := newStore()
|
||||
watchers := make([]*Watcher, b.N)
|
||||
watchers := make([]Watcher, b.N)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
watchers[i], _ = s.Watch("/foo", false, false, 0)
|
||||
|
@ -172,7 +172,7 @@ func BenchmarkWatchOneKey(b *testing.B) {
|
|||
s.Set("/foo", false, "", Permanent)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
<-watchers[i].EventChan
|
||||
<-watchers[i].EventChan()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -535,7 +535,7 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
|
|||
func TestStoreWatchCreate(t *testing.T) {
|
||||
s := newStore()
|
||||
w, _ := s.Watch("/foo", false, false, 0)
|
||||
c := w.EventChan
|
||||
c := w.EventChan()
|
||||
s.Create("/foo", false, "bar", false, Permanent)
|
||||
e := nbselect(c)
|
||||
assert.Equal(t, e.Action, "create", "")
|
||||
|
@ -549,7 +549,7 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
|
|||
s := newStore()
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "create", "")
|
||||
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
||||
}
|
||||
|
@ -560,7 +560,7 @@ func TestStoreWatchUpdate(t *testing.T) {
|
|||
s.Create("/foo", false, "bar", false, Permanent)
|
||||
w, _ := s.Watch("/foo", false, false, 0)
|
||||
s.Update("/foo", "baz", Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "update", "")
|
||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||
}
|
||||
|
@ -571,7 +571,7 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
|
|||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.Update("/foo/bar", "baz", Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "update", "")
|
||||
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
||||
}
|
||||
|
@ -582,7 +582,7 @@ func TestStoreWatchDelete(t *testing.T) {
|
|||
s.Create("/foo", false, "bar", false, Permanent)
|
||||
w, _ := s.Watch("/foo", false, false, 0)
|
||||
s.Delete("/foo", false, false)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "delete", "")
|
||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||
}
|
||||
|
@ -593,7 +593,7 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
|
|||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.Delete("/foo/bar", false, false)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "delete", "")
|
||||
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
||||
}
|
||||
|
@ -604,7 +604,7 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
|
|||
s.Create("/foo", false, "bar", false, Permanent)
|
||||
w, _ := s.Watch("/foo", false, false, 0)
|
||||
s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "compareAndSwap", "")
|
||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||
}
|
||||
|
@ -615,7 +615,7 @@ func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
|
|||
s.Create("/foo/bar", false, "baz", false, Permanent)
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "compareAndSwap", "")
|
||||
assert.Equal(t, e.Node.Key, "/foo/bar", "")
|
||||
}
|
||||
|
@ -634,7 +634,7 @@ func TestStoreWatchExpire(t *testing.T) {
|
|||
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
|
||||
|
||||
w, _ := s.Watch("/", true, false, 0)
|
||||
c := w.EventChan
|
||||
c := w.EventChan()
|
||||
e := nbselect(c)
|
||||
assert.Nil(t, e, "")
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
|
@ -642,7 +642,7 @@ func TestStoreWatchExpire(t *testing.T) {
|
|||
assert.Equal(t, e.Action, "expire", "")
|
||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||
w, _ = s.Watch("/", true, false, 4)
|
||||
e = nbselect(w.EventChan)
|
||||
e = nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "expire", "")
|
||||
assert.Equal(t, e.Node.Key, "/foofoo", "")
|
||||
}
|
||||
|
@ -653,19 +653,19 @@ func TestStoreWatchStream(t *testing.T) {
|
|||
w, _ := s.Watch("/foo", false, true, 0)
|
||||
// first modification
|
||||
s.Create("/foo", false, "bar", false, Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "create", "")
|
||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||
assert.Equal(t, *e.Node.Value, "bar", "")
|
||||
e = nbselect(w.EventChan)
|
||||
e = nbselect(w.EventChan())
|
||||
assert.Nil(t, e, "")
|
||||
// second modification
|
||||
s.Update("/foo", "baz", Permanent)
|
||||
e = nbselect(w.EventChan)
|
||||
e = nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "update", "")
|
||||
assert.Equal(t, e.Node.Key, "/foo", "")
|
||||
assert.Equal(t, *e.Node.Value, "baz", "")
|
||||
e = nbselect(w.EventChan)
|
||||
e = nbselect(w.EventChan())
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
|
@ -732,10 +732,10 @@ func TestStoreWatchCreateWithHiddenKey(t *testing.T) {
|
|||
s := newStore()
|
||||
w, _ := s.Watch("/_foo", false, false, 0)
|
||||
s.Create("/_foo", false, "bar", false, Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "create", "")
|
||||
assert.Equal(t, e.Node.Key, "/_foo", "")
|
||||
e = nbselect(w.EventChan)
|
||||
e = nbselect(w.EventChan())
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
|
@ -744,14 +744,14 @@ func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) {
|
|||
s := newStore()
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Nil(t, e, "")
|
||||
w, _ = s.Watch("/foo", true, false, 0)
|
||||
s.Create("/foo/_baz", true, "", false, Permanent)
|
||||
e = nbselect(w.EventChan)
|
||||
e = nbselect(w.EventChan())
|
||||
assert.Nil(t, e, "")
|
||||
s.Create("/foo/_baz/quux", false, "quux", false, Permanent)
|
||||
e = nbselect(w.EventChan)
|
||||
e = nbselect(w.EventChan())
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
|
@ -761,10 +761,10 @@ func TestStoreWatchUpdateWithHiddenKey(t *testing.T) {
|
|||
s.Create("/_foo", false, "bar", false, Permanent)
|
||||
w, _ := s.Watch("/_foo", false, false, 0)
|
||||
s.Update("/_foo", "baz", Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "update", "")
|
||||
assert.Equal(t, e.Node.Key, "/_foo", "")
|
||||
e = nbselect(w.EventChan)
|
||||
e = nbselect(w.EventChan())
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
|
@ -774,7 +774,7 @@ func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) {
|
|||
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.Update("/foo/_bar", "baz", Permanent)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
|
@ -784,10 +784,10 @@ func TestStoreWatchDeleteWithHiddenKey(t *testing.T) {
|
|||
s.Create("/_foo", false, "bar", false, Permanent)
|
||||
w, _ := s.Watch("/_foo", false, false, 0)
|
||||
s.Delete("/_foo", false, false)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Equal(t, e.Action, "delete", "")
|
||||
assert.Equal(t, e.Node.Key, "/_foo", "")
|
||||
e = nbselect(w.EventChan)
|
||||
e = nbselect(w.EventChan())
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
|
@ -797,7 +797,7 @@ func TestStoreWatchRecursiveDeleteWithHiddenKey(t *testing.T) {
|
|||
s.Create("/foo/_bar", false, "baz", false, Permanent)
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.Delete("/foo/_bar", false, false)
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.Nil(t, e, "")
|
||||
}
|
||||
|
||||
|
@ -815,7 +815,7 @@ func TestStoreWatchExpireWithHiddenKey(t *testing.T) {
|
|||
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(1000*time.Millisecond))
|
||||
|
||||
w, _ := s.Watch("/", true, false, 0)
|
||||
c := w.EventChan
|
||||
c := w.EventChan()
|
||||
e := nbselect(c)
|
||||
assert.Nil(t, e, "")
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
|
@ -833,7 +833,7 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) {
|
|||
w, _ := s.Watch("/_foo/bar", true, false, 0)
|
||||
s.Create("/_foo/bar/baz", false, "baz", false, Permanent)
|
||||
|
||||
e := nbselect(w.EventChan)
|
||||
e := nbselect(w.EventChan())
|
||||
assert.NotNil(t, e, "")
|
||||
assert.Equal(t, e.Action, "create", "")
|
||||
assert.Equal(t, e.Node.Key, "/_foo/bar/baz", "")
|
||||
|
@ -841,7 +841,7 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) {
|
|||
|
||||
// Ensure that slow consumers are handled properly.
|
||||
//
|
||||
// Since Watcher.EventChan has a buffer of size 1 we can only queue 1
|
||||
// Since Watcher.EventChan() has a buffer of size 1 we can only queue 1
|
||||
// event per watcher. If the consumer cannot consume the event on time and
|
||||
// another event arrives, the channel is closed and event is discarded.
|
||||
// This test ensures that after closing the channel, the store can continue
|
||||
|
|
|
@ -16,8 +16,13 @@ limitations under the License.
|
|||
|
||||
package store
|
||||
|
||||
type Watcher struct {
|
||||
EventChan chan *Event
|
||||
type Watcher interface {
|
||||
EventChan() chan *Event
|
||||
Remove()
|
||||
}
|
||||
|
||||
type watcher struct {
|
||||
eventChan chan *Event
|
||||
stream bool
|
||||
recursive bool
|
||||
sinceIndex uint64
|
||||
|
@ -26,9 +31,13 @@ type Watcher struct {
|
|||
remove func()
|
||||
}
|
||||
|
||||
func (w *watcher) EventChan() chan *Event {
|
||||
return w.eventChan
|
||||
}
|
||||
|
||||
// notify function notifies the watcher. If the watcher interests in the given path,
|
||||
// the function will return true.
|
||||
func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
|
||||
func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
|
||||
// watcher is interested the path in three cases and under one condition
|
||||
// the condition is that the event happens after the watcher's sinceIndex
|
||||
|
||||
|
@ -45,15 +54,15 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
|
|||
// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
|
||||
// should get notified even if "/foo" is not the path it is watching.
|
||||
if (w.recursive || originalPath || deleted) && e.Index() >= w.sinceIndex {
|
||||
// We cannot block here if the EventChan capacity is full, otherwise
|
||||
// etcd will hang. EventChan capacity is full when the rate of
|
||||
// We cannot block here if the eventChan capacity is full, otherwise
|
||||
// etcd will hang. eventChan capacity is full when the rate of
|
||||
// notifications are higher than our send rate.
|
||||
// If this happens, we close the channel.
|
||||
select {
|
||||
case w.EventChan <- e:
|
||||
case w.eventChan <- e:
|
||||
default:
|
||||
// We have missed a notification. Remove the watcher.
|
||||
// Removing the watcher also closes the EventChan.
|
||||
// Removing the watcher also closes the eventChan.
|
||||
w.remove()
|
||||
}
|
||||
return true
|
||||
|
@ -63,11 +72,11 @@ func (w *Watcher) notify(e *Event, originalPath bool, deleted bool) bool {
|
|||
|
||||
// Remove removes the watcher from watcherHub
|
||||
// The actual remove function is guaranteed to only be executed once
|
||||
func (w *Watcher) Remove() {
|
||||
func (w *watcher) Remove() {
|
||||
w.hub.mutex.Lock()
|
||||
defer w.hub.mutex.Unlock()
|
||||
|
||||
close(w.EventChan)
|
||||
close(w.eventChan)
|
||||
if w.remove != nil {
|
||||
w.remove()
|
||||
}
|
||||
|
|
|
@ -34,19 +34,19 @@ func newWatchHub(capacity int) *watcherHub {
|
|||
}
|
||||
}
|
||||
|
||||
// Watch function returns a watcher.
|
||||
// Watch function returns a Watcher.
|
||||
// If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
|
||||
// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
|
||||
// If index is zero, watch will start from the current index + 1.
|
||||
func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*Watcher, *etcdErr.Error) {
|
||||
func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (Watcher, *etcdErr.Error) {
|
||||
event, err := wh.EventHistory.scan(key, recursive, index)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w := &Watcher{
|
||||
EventChan: make(chan *Event, 1), // use a buffered channel
|
||||
w := &watcher{
|
||||
eventChan: make(chan *Event, 1), // use a buffered channel
|
||||
recursive: recursive,
|
||||
stream: stream,
|
||||
sinceIndex: index,
|
||||
|
@ -54,7 +54,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*
|
|||
}
|
||||
|
||||
if event != nil {
|
||||
w.EventChan <- event
|
||||
w.eventChan <- event
|
||||
return w, nil
|
||||
}
|
||||
|
||||
|
@ -75,7 +75,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (*
|
|||
}
|
||||
|
||||
w.remove = func() {
|
||||
if w.removed { // avoid remove it twice
|
||||
if w.removed { // avoid removing it twice
|
||||
return
|
||||
}
|
||||
w.removed = true
|
||||
|
@ -121,7 +121,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
|
|||
for curr != nil {
|
||||
next := curr.Next() // save reference to the next one in the list
|
||||
|
||||
w, _ := curr.Value.(*Watcher)
|
||||
w, _ := curr.Value.(*watcher)
|
||||
|
||||
originalPath := (e.Node.Key == nodePath)
|
||||
if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
|
||||
|
|
|
@ -27,7 +27,7 @@ func TestWatcher(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
c := w.EventChan
|
||||
c := w.EventChan()
|
||||
|
||||
select {
|
||||
case <-c:
|
||||
|
@ -47,7 +47,7 @@ func TestWatcher(t *testing.T) {
|
|||
}
|
||||
|
||||
w, _ = wh.watch("/foo", false, false, 2)
|
||||
c = w.EventChan
|
||||
c = w.EventChan()
|
||||
|
||||
e = newEvent(Create, "/foo/bar", 2, 2)
|
||||
|
||||
|
@ -72,7 +72,7 @@ func TestWatcher(t *testing.T) {
|
|||
|
||||
// ensure we are doing exact matching rather than prefix matching
|
||||
w, _ = wh.watch("/fo", true, false, 1)
|
||||
c = w.EventChan
|
||||
c = w.EventChan()
|
||||
|
||||
select {
|
||||
case re = <-c:
|
||||
|
|
2
test
2
test
|
@ -14,7 +14,7 @@ COVER=${COVER:-"-cover"}
|
|||
|
||||
source ./build
|
||||
|
||||
TESTABLE="wal snap etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb raft store"
|
||||
TESTABLE="wal snap etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb functional raft store"
|
||||
FORMATTABLE="$TESTABLE cors.go main.go"
|
||||
|
||||
# user has not provided PKG override
|
||||
|
|
Loading…
Reference in New Issue