// Copyright 2015 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package mvcc import ( "bytes" "fmt" "os" "reflect" "testing" "time" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" ) // TestWatcherWatchID tests that each watcher provides unique watchID, // and the watched event attaches the correct watchID. func TestWatcherWatchID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() defer w.Close() idm := make(map[WatchID]struct{}) for i := 0; i < 10; i++ { id := w.Watch([]byte("foo"), nil, 0) if _, ok := idm[id]; ok { t.Errorf("#%d: id %d exists", i, id) } idm[id] = struct{}{} s.Put([]byte("foo"), []byte("bar"), lease.NoLease) resp := <-w.Chan() if resp.WatchID != id { t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) } if err := w.Cancel(id); err != nil { t.Error(err) } } s.Put([]byte("foo2"), []byte("bar"), lease.NoLease) // unsynced watchers for i := 10; i < 20; i++ { id := w.Watch([]byte("foo2"), nil, 1) if _, ok := idm[id]; ok { t.Errorf("#%d: id %d exists", i, id) } idm[id] = struct{}{} resp := <-w.Chan() if resp.WatchID != id { t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) } if err := w.Cancel(id); err != nil { t.Error(err) } } } // TestWatcherWatchPrefix tests if Watch operation correctly watches // and returns events with matching prefixes. func TestWatcherWatchPrefix(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() defer w.Close() idm := make(map[WatchID]struct{}) val := []byte("bar") keyWatch, keyEnd, keyPut := []byte("foo"), []byte("fop"), []byte("foobar") for i := 0; i < 10; i++ { id := w.Watch(keyWatch, keyEnd, 0) if _, ok := idm[id]; ok { t.Errorf("#%d: unexpected duplicated id %x", i, id) } idm[id] = struct{}{} s.Put(keyPut, val, lease.NoLease) resp := <-w.Chan() if resp.WatchID != id { t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) } if err := w.Cancel(id); err != nil { t.Errorf("#%d: unexpected cancel error %v", i, err) } if len(resp.Events) != 1 { t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events)) } if len(resp.Events) == 1 { if !bytes.Equal(resp.Events[0].Kv.Key, keyPut) { t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut) } } } keyWatch1, keyEnd1, keyPut1 := []byte("foo1"), []byte("foo2"), []byte("foo1bar") s.Put(keyPut1, val, lease.NoLease) // unsynced watchers for i := 10; i < 15; i++ { id := w.Watch(keyWatch1, keyEnd1, 1) if _, ok := idm[id]; ok { t.Errorf("#%d: id %d exists", i, id) } idm[id] = struct{}{} resp := <-w.Chan() if resp.WatchID != id { t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) } if err := w.Cancel(id); err != nil { t.Error(err) } if len(resp.Events) != 1 { t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events)) } if len(resp.Events) == 1 { if !bytes.Equal(resp.Events[0].Kv.Key, keyPut1) { t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut1) } } } } // TestWatcherWatchWrongRange ensures that watcher with wrong 'end' range // does not create watcher, which panics when canceling in range tree. func TestWatcherWatchWrongRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() defer w.Close() if id := w.Watch([]byte("foa"), []byte("foa"), 1); id != -1 { t.Fatalf("key == end range given; id expected -1, got %d", id) } if id := w.Watch([]byte("fob"), []byte("foa"), 1); id != -1 { t.Fatalf("key > end range given; id expected -1, got %d", id) } // watch request with 'WithFromKey' has empty-byte range end if id := w.Watch([]byte("foo"), []byte{}, 1); id != 0 { t.Fatalf("\x00 is range given; id expected 0, got %d", id) } } func TestWatchDeleteRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := newWatchableStore(b, &lease.FakeLessor{}, nil) defer func() { s.store.Close() os.Remove(tmpPath) }() testKeyPrefix := []byte("foo") for i := 0; i < 3; i++ { s.Put([]byte(fmt.Sprintf("%s_%d", testKeyPrefix, i)), []byte("bar"), lease.NoLease) } w := s.NewWatchStream() from, to := []byte(testKeyPrefix), []byte(fmt.Sprintf("%s_%d", testKeyPrefix, 99)) w.Watch(from, to, 0) s.DeleteRange(from, to) we := []mvccpb.Event{ {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_0"), ModRevision: 5}}, {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_1"), ModRevision: 5}}, {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_2"), ModRevision: 5}}, } select { case r := <-w.Chan(): if !reflect.DeepEqual(r.Events, we) { t.Errorf("event = %v, want %v", r.Events, we) } case <-time.After(10 * time.Second): t.Fatal("failed to receive event after 10 seconds!") } } // TestWatchStreamCancelWatcherByID ensures cancel calls the cancel func of the watcher // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() defer w.Close() id := w.Watch([]byte("foo"), nil, 0) tests := []struct { cancelID WatchID werr error }{ // no error should be returned when cancel the created watcher. {id, nil}, // not exist error should be returned when cancel again. {id, ErrWatcherNotExist}, // not exist error should be returned when cancel a bad id. {id + 1, ErrWatcherNotExist}, } for i, tt := range tests { gerr := w.Cancel(tt.cancelID) if gerr != tt.werr { t.Errorf("#%d: err = %v, want %v", i, gerr, tt.werr) } } if l := len(w.(*watchStream).cancels); l != 0 { t.Errorf("cancels = %d, want 0", l) } } // TestWatcherRequestProgress ensures synced watcher can correctly // report its correct progress. func TestWatcherRequestProgress(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() // manually create watchableStore instead of newWatchableStore // because newWatchableStore automatically calls syncWatchers // method to sync watchers in unsynced map. We want to keep watchers // in unsynced to test if syncWatchers works as expected. s := &watchableStore{ store: NewStore(b, &lease.FakeLessor{}, nil), unsynced: newWatcherGroup(), synced: newWatcherGroup(), } defer func() { s.store.Close() os.Remove(tmpPath) }() testKey := []byte("foo") notTestKey := []byte("bad") testValue := []byte("bar") s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() badID := WatchID(1000) w.RequestProgress(badID) select { case resp := <-w.Chan(): t.Fatalf("unexpected %+v", resp) default: } id := w.Watch(notTestKey, nil, 1) w.RequestProgress(id) select { case resp := <-w.Chan(): t.Fatalf("unexpected %+v", resp) default: } s.syncWatchers() w.RequestProgress(id) wrs := WatchResponse{WatchID: 0, Revision: 2} select { case resp := <-w.Chan(): if !reflect.DeepEqual(resp, wrs) { t.Fatalf("got %+v, expect %+v", resp, wrs) } case <-time.After(time.Second): t.Fatal("failed to receive progress") } } func TestWatcherWatchWithFilter(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() defer w.Close() filterPut := func(e mvccpb.Event) bool { return e.Type == mvccpb.PUT } w.Watch([]byte("foo"), nil, 0, filterPut) done := make(chan struct{}) go func() { <-w.Chan() done <- struct{}{} }() s.Put([]byte("foo"), []byte("bar"), 0) select { case <-done: t.Fatal("failed to filter put request") case <-time.After(100 * time.Millisecond): } s.DeleteRange([]byte("foo"), nil) select { case <-done: case <-time.After(100 * time.Millisecond): t.Fatal("failed to receive delete request") } }