diff --git a/etcdserver2/server.go b/etcdserver2/server.go index 79e100189..994c571ec 100644 --- a/etcdserver2/server.go +++ b/etcdserver2/server.go @@ -1,6 +1,8 @@ package etcdserver import ( + "log" + "code.google.com/p/go.net/context" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/wait" @@ -13,22 +15,24 @@ type Response struct { type Server struct { n raft.Node w wait.List + + msgsc chan raft.Message } func (s *Server) Run(ctx context.Context) { for { st, ents, cents, msgs, err := s.n.ReadState(ctx) if err != nil { - do something here + log.Println("etcdserver: error while reading state -", err) + return } - save state to wal - go send messages + s.save(st, ents) + s.send(msgs) go func() { - for e in cents { - req = decode e.Data - apply req to state machine - build Response from result of apply - trigger wait with (r.Id, resp) + for _, e := range cents { + var r Request + r.Unmarshal(e.Data) + s.w.Trigger(r.Id, s.apply(r)) } }() } @@ -53,3 +57,25 @@ func (s *Server) Do(ctx context.Context, r Request) (Response, error) { return Response{}, ctx.Err() } } + +// send sends dispatches msgs to the sending goroutine. If the goroutine is +// busy, it will drop msgs and clients should timeout and reissue. +// TODO: we could use s.w to trigger and error to cancel the clients faster???? Is this a good idea?? +func (s *Server) send(msgs []raft.Message) { + for _, m := range msgs { + select { + case s.msgsc <- m: + default: + log.Println("TODO: log dropped message") + } + } +} + +func (s *Server) save(st raft.State, ents []raft.Entry) { + panic("not implemented") +} + +// apply interprets r as a call to store.X and returns an Response interpreted from store.Event +func (s *Server) apply(r Request) Response { + panic("not implmented") +}