etcd/etcdserver/server.go

183 lines
4.6 KiB
Go
Raw Normal View History

2014-08-26 05:39:02 +04:00
package etcdserver
import (
"errors"
"time"
2014-09-03 08:36:14 +04:00
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
2014-08-26 05:39:02 +04:00
"github.com/coreos/etcd/raft"
2014-08-28 05:53:18 +04:00
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
2014-09-04 07:06:16 +04:00
"github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
2014-08-26 05:39:02 +04:00
"github.com/coreos/etcd/wait"
)
2014-08-29 03:41:42 +04:00
var (
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
)
2014-08-28 05:53:18 +04:00
type SendFunc func(m []raftpb.Message)
2014-08-28 00:37:22 +04:00
2014-08-26 05:39:02 +04:00
type Response struct {
2014-08-31 01:19:03 +04:00
Event *store.Event
2014-09-09 03:56:10 +04:00
Watcher store.Watcher
2014-08-31 02:27:43 +04:00
err error
2014-08-26 05:39:02 +04:00
}
type Server struct {
w wait.Wait
2014-08-29 03:41:42 +04:00
done chan struct{}
2014-08-27 00:39:33 +04:00
2014-08-28 00:37:22 +04:00
Node raft.Node
Store store.Store
2014-08-27 00:39:33 +04:00
// Send specifies the send function for sending msgs to peers. Send
// MUST NOT block. It is okay to drop messages, since clients should
// timeout and reissue their messages. If Send is nil, Server will
// panic.
2014-08-28 00:37:22 +04:00
Send SendFunc
2014-08-27 00:39:33 +04:00
// Save specifies the save function for saving ents to stable storage.
// Save MUST block until st and ents are on stable storage. If Send is
// nil, Server will panic.
2014-08-28 05:53:18 +04:00
Save func(st raftpb.State, ents []raftpb.Entry)
2014-09-03 03:59:29 +04:00
Ticker <-chan time.Time
2014-08-26 05:39:02 +04:00
}
2014-09-01 05:48:18 +04:00
// Start prepares and starts server in a new goroutine. It is no longer safe to
// modify a Servers fields after it has been sent to Start.
2014-08-29 03:41:42 +04:00
func Start(s *Server) {
s.w = wait.New()
s.done = make(chan struct{})
go s.run()
}
2014-08-28 00:37:22 +04:00
2014-08-29 03:41:42 +04:00
func (s *Server) run() {
2014-08-26 05:39:02 +04:00
for {
select {
2014-09-03 03:59:29 +04:00
case <-s.Ticker:
s.Node.Tick()
2014-08-28 00:37:22 +04:00
case rd := <-s.Node.Ready():
s.Save(rd.State, rd.Entries)
s.Send(rd.Messages)
// TODO(bmizerany): do this in the background, but take
// care to apply entries in a single goroutine, and not
// race them.
for _, e := range rd.CommittedEntries {
2014-08-29 03:41:42 +04:00
var r pb.Request
if err := r.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
}
2014-08-31 02:27:43 +04:00
s.w.Trigger(r.Id, s.apply(r))
}
2014-08-29 03:41:42 +04:00
case <-s.done:
2014-08-26 22:37:07 +04:00
return
2014-08-26 05:39:02 +04:00
}
}
}
2014-08-30 03:32:31 +04:00
// Stop stops the server, and shutsdown the running goroutine. Stop should be
// called after a Start(s), otherwise it will panic.
2014-08-30 02:32:41 +04:00
func (s *Server) Stop() {
s.Node.Stop()
close(s.done)
2014-08-30 02:32:41 +04:00
}
2014-08-29 03:41:42 +04:00
2014-08-30 03:32:31 +04:00
// Do interprets r and performs an operation on s.Store according to r.Method
2014-09-01 05:48:18 +04:00
// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET with
// Quorum == true, r will be sent through consensus before performing its
// respective operation. Do will block until an action is performed or there is
// an error.
func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) {
2014-08-26 05:39:02 +04:00
if r.Id == 0 {
panic("r.Id cannot be 0")
}
2014-08-31 01:35:27 +04:00
if r.Method == "GET" && r.Quorum {
r.Method = "QGET"
}
switch r.Method {
2014-08-31 01:35:27 +04:00
case "POST", "PUT", "DELETE", "QGET":
data, err := r.Marshal()
if err != nil {
return Response{}, err
}
ch := s.w.Register(r.Id)
2014-08-29 03:41:42 +04:00
s.Node.Propose(ctx, data)
select {
case x := <-ch:
resp := x.(Response)
return resp, resp.err
case <-ctx.Done():
s.w.Trigger(r.Id, nil) // GC wait
return Response{}, ctx.Err()
2014-08-29 03:41:42 +04:00
case <-s.done:
return Response{}, ErrStopped
}
case "GET":
switch {
case r.Wait:
2014-08-28 00:37:22 +04:00
wc, err := s.Store.Watch(r.Path, r.Recursive, false, r.Since)
if err != nil {
return Response{}, err
}
return Response{Watcher: wc}, nil
default:
2014-08-28 00:37:22 +04:00
ev, err := s.Store.Get(r.Path, r.Recursive, r.Sorted)
if err != nil {
return Response{}, err
}
return Response{Event: ev}, nil
}
default:
return Response{}, ErrUnknownMethod
2014-08-26 05:39:02 +04:00
}
}
2014-08-26 22:37:07 +04:00
// apply interprets r as a call to store.X and returns an Response interpreted from store.Event
2014-08-31 02:27:43 +04:00
func (s *Server) apply(r pb.Request) Response {
f := func(ev *store.Event, err error) Response {
return Response{Event: ev, err: err}
}
2014-08-27 03:39:26 +04:00
expr := time.Unix(0, r.Expiration)
switch r.Method {
case "POST":
2014-08-31 02:27:43 +04:00
return f(s.Store.Create(r.Path, r.Dir, r.Val, true, expr))
case "PUT":
2014-08-27 23:04:00 +04:00
exists, existsSet := getBool(r.PrevExists)
2014-08-27 03:39:26 +04:00
switch {
2014-08-27 23:04:00 +04:00
case existsSet:
2014-08-27 22:54:29 +04:00
if exists {
2014-08-31 02:27:43 +04:00
return f(s.Store.Update(r.Path, r.Val, expr))
2014-08-27 22:54:29 +04:00
} else {
2014-08-31 02:27:43 +04:00
return f(s.Store.Create(r.Path, r.Dir, r.Val, false, expr))
2014-08-27 22:54:29 +04:00
}
case r.PrevIndex > 0 || r.PrevValue != "":
2014-08-31 02:27:43 +04:00
return f(s.Store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
2014-08-27 22:29:33 +04:00
default:
2014-08-31 02:27:43 +04:00
return f(s.Store.Set(r.Path, r.Dir, r.Val, expr))
2014-08-27 03:39:26 +04:00
}
case "DELETE":
2014-08-27 22:34:42 +04:00
switch {
case r.PrevIndex > 0 || r.PrevValue != "":
2014-08-31 02:27:43 +04:00
return f(s.Store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
2014-08-27 22:34:42 +04:00
default:
2014-08-31 02:27:43 +04:00
return f(s.Store.Delete(r.Path, r.Recursive, r.Dir))
2014-08-27 22:34:42 +04:00
}
2014-08-31 01:35:27 +04:00
case "QGET":
2014-08-31 02:27:43 +04:00
return f(s.Store.Get(r.Path, r.Recursive, r.Sorted))
default:
2014-08-28 00:37:22 +04:00
// This should never be reached, but just in case:
2014-08-31 02:27:43 +04:00
return Response{err: ErrUnknownMethod}
}
2014-08-26 22:37:07 +04:00
}
2014-08-27 22:54:29 +04:00
func getBool(v *bool) (vv bool, set bool) {
if v == nil {
return false, false
}
return *v, true
}