From 10de2e6dbefb480ff7d98c76a36ccbfc5b2b26af Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 3 Nov 2015 15:57:35 -0800 Subject: [PATCH] *: serve watch service Implement watch service and hook it up with grpc server in etcdmain. --- etcdmain/etcd.go | 1 + etcdserver/api/v3rpc/watch.go | 73 +++++++++++++++++++++++++++++++++++ etcdserver/v3demo_server.go | 5 +++ storage/kv.go | 4 ++ 4 files changed, 83 insertions(+) create mode 100644 etcdserver/api/v3rpc/watch.go diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index d67156a58..35fd1d922 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -322,6 +322,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { // set up v3 demo rpc grpcServer := grpc.NewServer() etcdserverpb.RegisterEtcdServer(grpcServer, v3rpc.New(s)) + etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable())) go plog.Fatal(grpcServer.Serve(v3l)) } diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go new file mode 100644 index 000000000..66a3a9eb3 --- /dev/null +++ b/etcdserver/api/v3rpc/watch.go @@ -0,0 +1,73 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3rpc + +import ( + "io" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/storage" +) + +type watchServer struct { + watchable storage.Watchable +} + +func NewWatchServer(w storage.Watchable) pb.WatchServer { + return &watchServer{w} +} + +func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { + closec := make(chan struct{}) + defer close(closec) + + watcher := ws.watchable.NewWatcher() + defer watcher.Close() + + go sendLoop(stream, watcher, closec) + + for { + req, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + var prefix bool + toWatch := req.Key + if len(req.Key) == 0 { + toWatch = req.Prefix + prefix = true + } + // TODO: support cancellation + watcher.Watch(toWatch, prefix, req.StartRevision) + } +} + +func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan struct{}) { + for { + select { + case e := <-watcher.Chan(): + err := stream.Send(&pb.WatchResponse{Event: &e}) + if err != nil { + return + } + case <-closec: + return + } + } +} diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 0a0bc415a..52d6874ab 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -55,6 +55,11 @@ func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (pr } } +// Watcable returns a watchable interface attached to the etcdserver. +func (s *EtcdServer) Watchable() dstorage.Watchable { + return s.kv +} + func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} { ar := &applyResult{} diff --git a/storage/kv.go b/storage/kv.go index 44abb5d30..1cfa1dffe 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -79,7 +79,11 @@ type KV interface { // WatchableKV is a KV that can be watched. type WatchableKV interface { KV + Watchable +} +// Watchable is the interface that wraps the NewWatcher function. +type Watchable interface { // NewWatcher returns a Watcher that can be used to // watch events happened or happending on the KV. NewWatcher() Watcher