diff --git a/contrib/recipes/client.go b/contrib/recipes/client.go index 9a5613723..2e7bc8d91 100644 --- a/contrib/recipes/client.go +++ b/contrib/recipes/client.go @@ -48,11 +48,17 @@ func (ec *EtcdClient) deleteRevKey(key string, rev int64) (bool, error) { Result: pb.Compare_EQUAL, Target: pb.Compare_MOD, Key: []byte(key), - ModRevision: rev} - req := &pb.RequestUnion{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}} + TargetUnion: &pb.Compare_ModRevision{ModRevision: rev}, + } + req := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{ + RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}} txnresp, err := ec.KV.Txn( context.TODO(), - &pb.TxnRequest{[]*pb.Compare{cmp}, []*pb.RequestUnion{req}, nil}) + &pb.TxnRequest{ + Compare: []*pb.Compare{cmp}, + Success: []*pb.RequestUnion{req}, + Failure: nil, + }) if err != nil { return false, err } else if txnresp.Succeeded == false { diff --git a/contrib/recipes/key.go b/contrib/recipes/key.go index f4d30ed71..2fd2f770f 100644 --- a/contrib/recipes/key.go +++ b/contrib/recipes/key.go @@ -86,15 +86,17 @@ func NewUniqueKV(client *EtcdClient, prefix string, val string, leaseID lease.Le // not yet exist. func putNewKV(ec *EtcdClient, key, val string, leaseID lease.LeaseID) (int64, error) { cmp := &pb.Compare{ - Result: pb.Compare_EQUAL, - Target: pb.Compare_VERSION, - Key: []byte(key)} - req := &pb.RequestUnion{ - RequestPut: &pb.PutRequest{ - Key: []byte(key), - Value: []byte(val), - Lease: int64(leaseID)}} + Result: pb.Compare_EQUAL, + Target: pb.Compare_VERSION, + Key: []byte(key), + TargetUnion: &pb.Compare_Version{Version: 0}} + req := &pb.RequestUnion{ + Request: &pb.RequestUnion_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte(key), + Value: []byte(val), + Lease: int64(leaseID)}}} txnresp, err := ec.KV.Txn( context.TODO(), &pb.TxnRequest{[]*pb.Compare{cmp}, []*pb.RequestUnion{req}, nil}) @@ -143,17 +145,23 @@ func newSequentialKV(client *EtcdClient, prefix, val string, leaseID lease.Lease Target: pb.Compare_MOD, Key: []byte(baseKey), // current revision might contain modification so +1 - ModRevision: resp.Header.Revision + 1, + TargetUnion: &pb.Compare_ModRevision{ModRevision: resp.Header.Revision + 1}, } - prPrefix := &pb.PutRequest{Key: baseKey, Lease: int64(leaseID)} - reqPrefix := &pb.RequestUnion{RequestPut: prPrefix} - prNewKey := &pb.PutRequest{ - Key: []byte(newKey), - Value: []byte(val), - Lease: int64(leaseID), - } - reqNewKey := &pb.RequestUnion{RequestPut: prNewKey} + reqPrefix := &pb.RequestUnion{ + Request: &pb.RequestUnion_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: baseKey, + Lease: int64(leaseID), + }}} + + reqNewKey := &pb.RequestUnion{ + Request: &pb.RequestUnion_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte(newKey), + Value: []byte(val), + Lease: int64(leaseID), + }}} txnresp, err := client.KV.Txn( context.TODO(), diff --git a/contrib/recipes/stm.go b/contrib/recipes/stm.go index e4d2b47c8..d7a02ec18 100644 --- a/contrib/recipes/stm.go +++ b/contrib/recipes/stm.go @@ -84,15 +84,19 @@ func (s *STM) commit() (ok bool, err error) { Result: pb.Compare_LESS, Target: pb.Compare_MOD, Key: []byte(k), - ModRevision: rk.Revision() + 1, + TargetUnion: &pb.Compare_ModRevision{ModRevision: rk.Revision() + 1}, } cmps = append(cmps, cmp) } // apply all writes puts := []*pb.RequestUnion{} for k, v := range s.wset { - put := &pb.PutRequest{Key: []byte(k), Value: []byte(v)} - puts = append(puts, &pb.RequestUnion{RequestPut: put}) + puts = append(puts, &pb.RequestUnion{ + Request: &pb.RequestUnion_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte(k), + Value: []byte(v), + }}}) } txnresp, err := s.client.KV.Txn(context.TODO(), &pb.TxnRequest{cmps, puts, nil}) return txnresp.Succeeded, err diff --git a/contrib/recipes/watch.go b/contrib/recipes/watch.go index 64fbc7f62..0f25c965d 100644 --- a/contrib/recipes/watch.go +++ b/contrib/recipes/watch.go @@ -23,6 +23,7 @@ import ( type Watcher struct { wstream pb.Watch_WatchClient + cancel context.CancelFunc donec chan struct{} id storage.WatchID recvc chan *storagepb.Event @@ -38,7 +39,8 @@ func NewPrefixWatcher(c *EtcdClient, prefix string, rev int64) (*Watcher, error) } func newWatcher(c *EtcdClient, key string, rev int64, isPrefix bool) (*Watcher, error) { - w, err := c.Watch.Watch(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + w, err := c.Watch.Watch(ctx) if err != nil { return nil, err } @@ -50,7 +52,7 @@ func newWatcher(c *EtcdClient, key string, rev int64, isPrefix bool) (*Watcher, req.Key = []byte(key) } - if err := w.Send(&pb.WatchRequest{CreateRequest: req}); err != nil { + if err := w.Send(&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{CreateRequest: req}}); err != nil { return nil, err } @@ -63,6 +65,7 @@ func newWatcher(c *EtcdClient, key string, rev int64, isPrefix bool) (*Watcher, } ret := &Watcher{ wstream: w, + cancel: cancel, donec: make(chan struct{}), id: storage.WatchID(wresp.WatchId), recvc: make(chan *storagepb.Event), @@ -72,11 +75,14 @@ func newWatcher(c *EtcdClient, key string, rev int64, isPrefix bool) (*Watcher, } func (w *Watcher) Close() error { + defer w.cancel() if w.wstream == nil { return w.lastErr } - req := &pb.WatchCancelRequest{WatchId: int64(w.id)} - err := w.wstream.Send(&pb.WatchRequest{CancelRequest: req}) + req := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CancelRequest{ + CancelRequest: &pb.WatchCancelRequest{ + WatchId: int64(w.id)}}} + err := w.wstream.Send(req) if err != nil && w.lastErr == nil { return err }