Merge pull request #4091 from gyuho/watch_events_slice

storage: watch events in slice
release-2.3
Gyu-Ho Lee 2015-12-31 23:54:08 -08:00
commit 7dd599b69d
9 changed files with 286 additions and 101 deletions

View File

@ -98,6 +98,9 @@ func recvLoop(wStream pb.Watch_WatchClient) {
if err != nil {
ExitWithError(ExitError, err)
}
fmt.Printf("%s: %s %s\n", resp.Event.Type, string(resp.Event.Kv.Key), string(resp.Event.Kv.Value))
evs := resp.Events
for _, ev := range evs {
fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
}
}
}

View File

@ -19,6 +19,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/storage"
"github.com/coreos/etcd/storage/storagepb"
)
type watchServer struct {
@ -61,15 +62,25 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan struct{}) {
for {
select {
case e, ok := <-watcher.Chan():
case evs, ok := <-watcher.Chan():
if !ok {
return
}
err := stream.Send(&pb.WatchResponse{Event: &e})
// TODO: evs is []storagepb.Event type
// either return []*storagepb.Event from storage package
// or define protocol buffer with []storagepb.Event.
events := make([]*storagepb.Event, len(evs))
for i := range evs {
events[i] = &evs[i]
}
err := stream.Send(&pb.WatchResponse{Events: events})
storage.ReportEventReceived()
if err != nil {
return
}
case <-closec:
// drain the chan to clean up pending events
for {

View File

@ -368,9 +368,8 @@ func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
func (*WatchRequest) ProtoMessage() {}
type WatchResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// TODO: support batched events response?
Event *storagepb.Event `protobuf:"bytes,2,opt,name=event" json:"event,omitempty"`
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
Events []*storagepb.Event `protobuf:"bytes,2,rep,name=events" json:"events,omitempty"`
}
func (m *WatchResponse) Reset() { *m = WatchResponse{} }
@ -384,9 +383,9 @@ func (m *WatchResponse) GetHeader() *ResponseHeader {
return nil
}
func (m *WatchResponse) GetEvent() *storagepb.Event {
func (m *WatchResponse) GetEvents() []*storagepb.Event {
if m != nil {
return m.Event
return m.Events
}
return nil
}
@ -1567,15 +1566,17 @@ func (m *WatchResponse) MarshalTo(data []byte) (int, error) {
}
i += n12
}
if m.Event != nil {
data[i] = 0x12
i++
i = encodeVarintRpc(data, i, uint64(m.Event.Size()))
n13, err := m.Event.MarshalTo(data[i:])
if err != nil {
return 0, err
if len(m.Events) > 0 {
for _, msg := range m.Events {
data[i] = 0x12
i++
i = encodeVarintRpc(data, i, uint64(msg.Size()))
n, err := msg.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n
}
i += n13
}
return i, nil
}
@ -1622,11 +1623,11 @@ func (m *LeaseCreateResponse) MarshalTo(data []byte) (int, error) {
data[i] = 0xa
i++
i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
n14, err := m.Header.MarshalTo(data[i:])
n13, err := m.Header.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n14
i += n13
}
if m.LeaseId != 0 {
data[i] = 0x10
@ -1689,11 +1690,11 @@ func (m *LeaseRevokeResponse) MarshalTo(data []byte) (int, error) {
data[i] = 0xa
i++
i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
n15, err := m.Header.MarshalTo(data[i:])
n14, err := m.Header.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n15
i += n14
}
return i, nil
}
@ -1740,11 +1741,11 @@ func (m *LeaseKeepAliveResponse) MarshalTo(data []byte) (int, error) {
data[i] = 0xa
i++
i = encodeVarintRpc(data, i, uint64(m.Header.Size()))
n16, err := m.Header.MarshalTo(data[i:])
n15, err := m.Header.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n16
i += n15
}
if m.LeaseId != 0 {
data[i] = 0x10
@ -2065,9 +2066,11 @@ func (m *WatchResponse) Size() (n int) {
l = m.Header.Size()
n += 1 + l + sovRpc(uint64(l))
}
if m.Event != nil {
l = m.Event.Size()
n += 1 + l + sovRpc(uint64(l))
if len(m.Events) > 0 {
for _, e := range m.Events {
l = e.Size()
n += 1 + l + sovRpc(uint64(l))
}
}
return n
}
@ -3862,7 +3865,7 @@ func (m *WatchResponse) Unmarshal(data []byte) error {
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field Events", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
@ -3883,10 +3886,8 @@ func (m *WatchResponse) Unmarshal(data []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Event == nil {
m.Event = &storagepb.Event{}
}
if err := m.Event.Unmarshal(data[iNdEx:postIndex]); err != nil {
m.Events = append(m.Events, &storagepb.Event{})
if err := m.Events[len(m.Events)-1].Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex

View File

@ -210,8 +210,7 @@ message WatchRequest {
message WatchResponse {
ResponseHeader header = 1;
// TODO: support batched events response?
storagepb.Event event = 2;
repeated storagepb.Event events = 2;
}
message LeaseCreateRequest {

View File

@ -740,7 +740,7 @@ func TestWatchableKVWatch(t *testing.T) {
s.Put([]byte("foo"), []byte("bar"))
select {
case ev := <-w.Chan():
case evs := <-w.Chan():
wev := storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{
@ -752,6 +752,7 @@ func TestWatchableKVWatch(t *testing.T) {
},
WatchID: wid,
}
ev := evs[0]
if !reflect.DeepEqual(ev, wev) {
t.Errorf("watched event = %+v, want %+v", ev, wev)
}
@ -761,7 +762,7 @@ func TestWatchableKVWatch(t *testing.T) {
s.Put([]byte("foo1"), []byte("bar1"))
select {
case ev := <-w.Chan():
case evs := <-w.Chan():
wev := storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{
@ -773,6 +774,7 @@ func TestWatchableKVWatch(t *testing.T) {
},
WatchID: wid,
}
ev := evs[0]
if !reflect.DeepEqual(ev, wev) {
t.Errorf("watched event = %+v, want %+v", ev, wev)
}
@ -787,7 +789,7 @@ func TestWatchableKVWatch(t *testing.T) {
defer cancel()
select {
case ev := <-w.Chan():
case evs := <-w.Chan():
wev := storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{
@ -799,6 +801,7 @@ func TestWatchableKVWatch(t *testing.T) {
},
WatchID: wid,
}
ev := evs[0]
if !reflect.DeepEqual(ev, wev) {
t.Errorf("watched event = %+v, want %+v", ev, wev)
}
@ -808,7 +811,7 @@ func TestWatchableKVWatch(t *testing.T) {
s.Put([]byte("foo1"), []byte("bar11"))
select {
case ev := <-w.Chan():
case evs := <-w.Chan():
wev := storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{
@ -820,6 +823,7 @@ func TestWatchableKVWatch(t *testing.T) {
},
WatchID: wid,
}
ev := evs[0]
if !reflect.DeepEqual(ev, wev) {
t.Errorf("watched event = %+v, want %+v", ev, wev)
}

View File

@ -33,7 +33,7 @@ const (
)
type watchable interface {
watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc)
watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watching, CancelFunc)
}
type watchableStore struct {
@ -75,10 +75,11 @@ func (s *watchableStore) Put(key, value []byte) (rev int64) {
if err != nil {
log.Panicf("unexpected range error (%v)", err)
}
s.handle(rev, storagepb.Event{
ev := storagepb.Event{
Type: storagepb.PUT,
Kv: &kvs[0],
})
}
s.handle(rev, []storagepb.Event{ev})
return rev
}
@ -92,14 +93,15 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
log.Panicf("unexpected range error (%v)", err)
}
n, rev = s.store.DeleteRange(key, end)
for _, kv := range kvs {
s.handle(rev, storagepb.Event{
evs := make([]storagepb.Event, len(kvs))
for i, kv := range kvs {
evs[i] = storagepb.Event{
Type: storagepb.DELETE,
Kv: &storagepb.KeyValue{
Key: kv.Key,
},
})
}}
}
s.handle(rev, evs)
return n, rev
}
@ -138,24 +140,33 @@ func (s *watchableStore) TxnEnd(txnID int64) error {
}
_, rev, _ := s.store.Range(nil, nil, 0, 0)
evs := []storagepb.Event{}
for k := range s.tx.putm {
kvs, _, err := s.store.Range([]byte(k), nil, 0, 0)
if err != nil {
log.Panicf("unexpected range error (%v)", err)
}
s.handle(rev, storagepb.Event{
ev := storagepb.Event{
Type: storagepb.PUT,
Kv: &kvs[0],
})
}
evs = append(evs, ev)
}
for k := range s.tx.delm {
s.handle(rev, storagepb.Event{
ev := storagepb.Event{
Type: storagepb.DELETE,
Kv: &storagepb.KeyValue{
Key: []byte(k),
},
})
}
evs = append(evs, ev)
}
s.handle(rev, evs)
s.mu.Unlock()
return nil
}
@ -170,11 +181,11 @@ func (s *watchableStore) NewWatcher() Watcher {
watcherGauge.Inc()
return &watcher{
watchable: s,
ch: make(chan storagepb.Event, chanBufLen),
ch: make(chan []storagepb.Event, chanBufLen),
}
}
func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc) {
func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watching, CancelFunc) {
s.mu.Lock()
defer s.mu.Unlock()
@ -301,6 +312,9 @@ func (s *watchableStore) syncWatchings() {
ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
tx.Unlock()
evs := []storagepb.Event{}
// get the list of all events from all key-value pairs
for i, v := range vs {
var kv storagepb.KeyValue
if err := kv.Unmarshal(v); err != nil {
@ -308,8 +322,7 @@ func (s *watchableStore) syncWatchings() {
}
k := string(kv.Key)
wm, ok := keyToUnsynced[k]
if !ok {
if _, ok := keyToUnsynced[k]; !ok {
continue
}
@ -322,56 +335,53 @@ func (s *watchableStore) syncWatchings() {
}
ev.Kv = &kv
for w := range wm {
ev.WatchID = w.id
evs = append(evs, ev)
}
select {
case w.ch <- ev:
pendingEventsGauge.Inc()
default:
// TODO: handle the full unsynced watchings.
// continue to process other watchings for now, the full ones
// will be processed next time and hopefully it will not be full.
continue
}
if err := unsafeAddWatching(&s.synced, k, w); err != nil {
log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
}
delete(s.unsynced, w)
for w, es := range newWatchingToEventMap(keyToUnsynced, evs) {
select {
case w.ch <- es:
pendingEventsGauge.Add(float64(len(es)))
default:
// TODO: handle the full unsynced watchings.
// continue to process other watchings for now, the full ones
// will be processed next time and hopefully it will not be full.
continue
}
k := string(w.key)
if err := unsafeAddWatching(&s.synced, k, w); err != nil {
log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
}
delete(s.unsynced, w)
}
slowWatchingGauge.Set(float64(len(s.unsynced)))
}
// handle handles the change of the happening event on all watchings.
func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
s.notify(rev, ev)
func (s *watchableStore) handle(rev int64, evs []storagepb.Event) {
s.notify(rev, evs)
}
// notify notifies the fact that given event at the given rev just happened to
// watchings that watch on the key of the event.
func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
// check all prefixes of the key to notify all corresponded watchings
for i := 0; i <= len(ev.Kv.Key); i++ {
k := string(ev.Kv.Key[:i])
if wm, ok := s.synced[k]; ok {
for w := range wm {
// the watching needs to be notified when either it watches prefix or
// the key is exactly matched.
if !w.prefix && i != len(ev.Kv.Key) {
continue
}
ev.WatchID = w.id
select {
case w.ch <- ev:
pendingEventsGauge.Inc()
default:
w.cur = rev
s.unsynced[w] = struct{}{}
delete(wm, w)
slowWatchingGauge.Inc()
}
func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
we := newWatchingToEventMap(s.synced, evs)
for _, wm := range s.synced {
for w := range wm {
if _, ok := we[w]; !ok {
continue
}
es := we[w]
select {
case w.ch <- es:
pendingEventsGauge.Add(float64(len(es)))
default:
// move slow watching to unsynced
w.cur = rev
s.unsynced[w] = struct{}{}
delete(wm, w)
slowWatchingGauge.Inc()
}
}
}
@ -418,7 +428,7 @@ type watching struct {
// a chan to send out the watched events.
// The chan might be shared with other watchings.
ch chan<- storagepb.Event
ch chan<- []storagepb.Event
}
// unsafeAddWatching puts watching with key k into watchableStore's synced.
@ -441,3 +451,38 @@ func unsafeAddWatching(synced *map[string]map[*watching]struct{}, k string, wa *
mp[k][wa] = struct{}{}
return nil
}
// newWatchingToEventMap creates a map that has watching as key and events as
// value. It enables quick events look up by watching.
func newWatchingToEventMap(sm map[string]map[*watching]struct{}, evs []storagepb.Event) map[*watching][]storagepb.Event {
watchingToEvents := make(map[*watching][]storagepb.Event)
for _, ev := range evs {
key := string(ev.Kv.Key)
// check all prefixes of the key to notify all corresponded watchings
for i := 0; i <= len(key); i++ {
k := string(key[:i])
wm, ok := sm[k]
if !ok {
continue
}
for w := range wm {
// the watching needs to be notified when either it watches prefix or
// the key is exactly matched.
if !w.prefix && i != len(ev.Kv.Key) {
continue
}
ev.WatchID = w.id
if _, ok := watchingToEvents[w]; !ok {
watchingToEvents[w] = []storagepb.Event{}
}
watchingToEvents[w] = append(watchingToEvents[w], ev)
}
}
}
return watchingToEvents
}

View File

@ -15,8 +15,12 @@
package storage
import (
"bytes"
"os"
"reflect"
"testing"
"github.com/coreos/etcd/storage/storagepb"
)
func TestWatch(t *testing.T) {
@ -183,6 +187,19 @@ func TestSyncWatchings(t *testing.T) {
if len(w.(*watcher).ch) != watcherN {
t.Errorf("watched event size = %d, want %d", len(w.(*watcher).ch), watcherN)
}
evs := <-w.(*watcher).ch
if len(evs) != 1 {
t.Errorf("len(evs) got = %d, want = 1", len(evs))
}
if evs[0].Type != storagepb.PUT {
t.Errorf("got = %v, want = %v", evs[0].Type, storagepb.PUT)
}
if !bytes.Equal(evs[0].Kv.Key, testKey) {
t.Errorf("got = %s, want = %s", evs[0].Kv.Key, testKey)
}
if !bytes.Equal(evs[0].Kv.Value, testValue) {
t.Errorf("got = %s, want = %s", evs[0].Kv.Value, testValue)
}
}
func TestUnsafeAddWatching(t *testing.T) {
@ -222,3 +239,105 @@ func TestUnsafeAddWatching(t *testing.T) {
}
}
}
func TestNewMapWatchingToEventMap(t *testing.T) {
k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2")
v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2")
ws := []*watching{{key: k0}, {key: k1}, {key: k2}}
evs := []storagepb.Event{
{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: k0, Value: v0},
},
{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: k1, Value: v1},
},
{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: k2, Value: v2},
},
}
tests := []struct {
sync map[string]map[*watching]struct{}
evs []storagepb.Event
wwe map[*watching][]storagepb.Event
}{
// no watching in sync, some events should return empty wwe
{
map[string]map[*watching]struct{}{},
evs,
map[*watching][]storagepb.Event{},
},
// one watching in sync, one event that does not match the key of that
// watching should return empty wwe
{
map[string]map[*watching]struct{}{
string(k2): {ws[2]: struct{}{}},
},
evs[:1],
map[*watching][]storagepb.Event{},
},
// one watching in sync, one event that matches the key of that
// watching should return wwe with that matching watching
{
map[string]map[*watching]struct{}{
string(k1): {ws[1]: struct{}{}},
},
evs[1:2],
map[*watching][]storagepb.Event{
ws[1]: evs[1:2],
},
},
// two watchings in sync that watches two different keys, one event
// that matches the key of only one of the watching should return wwe
// with the matching watching
{
map[string]map[*watching]struct{}{
string(k0): {ws[0]: struct{}{}},
string(k2): {ws[2]: struct{}{}},
},
evs[2:],
map[*watching][]storagepb.Event{
ws[2]: evs[2:],
},
},
// two watchings in sync that watches the same key, two events that
// match the keys should return wwe with those two watchings
{
map[string]map[*watching]struct{}{
string(k0): {ws[0]: struct{}{}},
string(k1): {ws[1]: struct{}{}},
},
evs[:2],
map[*watching][]storagepb.Event{
ws[0]: evs[:1],
ws[1]: evs[1:2],
},
},
}
for i, tt := range tests {
gwe := newWatchingToEventMap(tt.sync, tt.evs)
if len(gwe) != len(tt.wwe) {
t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe))
}
// compare gwe and tt.wwe
for w, mevs := range gwe {
if len(mevs) != len(tt.wwe[w]) {
t.Errorf("#%d: len(mevs) got = %d, want = %d", i, len(mevs), len(tt.wwe[w]))
}
if !reflect.DeepEqual(mevs, tt.wwe[w]) {
t.Errorf("#%d: reflect.DeepEqual events got = %v, want = true", i, false)
}
}
}
}

View File

@ -31,7 +31,7 @@ type Watcher interface {
Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc)
// Chan returns a chan. All watched events will be sent to the returned chan.
Chan() <-chan storagepb.Event
Chan() <-chan []storagepb.Event
// Close closes the WatchChan and release all related resources.
Close()
@ -41,7 +41,7 @@ type Watcher interface {
// one chan to send out watched events and other control events.
type watcher struct {
watchable watchable
ch chan storagepb.Event
ch chan []storagepb.Event
mu sync.Mutex // guards fields below it
nextID int64 // nextID is the ID allocated for next new watching
@ -67,7 +67,7 @@ func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) (id int64, can
return id, c
}
func (ws *watcher) Chan() <-chan storagepb.Event {
func (ws *watcher) Chan() <-chan []storagepb.Event {
return ws.ch
}

View File

@ -27,7 +27,6 @@ func TestWatcherWatchID(t *testing.T) {
idm := make(map[int64]struct{})
// synced watchings
for i := 0; i < 10; i++ {
id, cancel := w.Watch([]byte("foo"), false, 0)
if _, ok := idm[id]; ok {
@ -37,15 +36,17 @@ func TestWatcherWatchID(t *testing.T) {
s.Put([]byte("foo"), []byte("bar"))
ev := <-w.Chan()
if ev.WatchID != id {
t.Errorf("#%d: watch id in event = %d, want %d", i, ev.WatchID, id)
evs := <-w.Chan()
for j, ev := range evs {
if ev.WatchID != id {
t.Errorf("#%d.%d: watch id in event = %d, want %d", i, j, ev.WatchID, id)
}
}
cancel()
}
s.Put([]byte("foo2"), []byte("bar"))
// unsynced watchings
for i := 10; i < 20; i++ {
id, cancel := w.Watch([]byte("foo2"), false, 1)
@ -54,9 +55,11 @@ func TestWatcherWatchID(t *testing.T) {
}
idm[id] = struct{}{}
ev := <-w.Chan()
if ev.WatchID != id {
t.Errorf("#%d: watch id in event = %d, want %d", i, ev.WatchID, id)
evs := <-w.Chan()
for j, ev := range evs {
if ev.WatchID != id {
t.Errorf("#%d.%d: watch id in event = %d, want %d", i, j, ev.WatchID, id)
}
}
cancel()