diff --git a/etcdserver/config.go b/etcdserver/config.go
index fea97a010..e2c380832 100644
--- a/etcdserver/config.go
+++ b/etcdserver/config.go
@@ -3,6 +3,7 @@ package etcdserver
 import (
 	"fmt"
 	"net/http"
+	"path/filepath"
 
 	"github.com/coreos/etcd/pkg/types"
 )
@@ -40,3 +41,25 @@ func (c *ServerConfig) Verify() error {
 	}
 	return nil
 }
+
+// WALDir returns the directory under DataDir that holds the write-ahead log.
+func (c *ServerConfig) WALDir() string { return filepath.Join(c.DataDir, "wal") }
+
+// SnapDir returns the directory under DataDir that holds snapshots.
+func (c *ServerConfig) SnapDir() string { return filepath.Join(c.DataDir, "snap") }
+
+// ID returns the ID of the cluster member whose name is c.Name. The result of
+// Cluster.FindName is used without a nil check, so the name must be present
+// in the cluster (presumably guaranteed by Verify; confirm before relying on it).
+func (c *ServerConfig) ID() uint64 { return c.Cluster.FindName(c.Name).ID }
+
+// ShouldDiscover reports whether a discovery URL has been configured.
+func (c *ServerConfig) ShouldDiscover() bool {
+	return c.DiscoveryURL != ""
+}
+
+// IsBootstrap reports whether a bootstrap method is provided: either a
+// discovery URL or an initial cluster state of ClusterStateValueNew.
+func (c *ServerConfig) IsBootstrap() bool {
+	return c.ShouldDiscover() || c.ClusterState == ClusterStateValueNew
+}
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 02dad2224..0f4727776 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -6,7 +6,6 @@ import (
 	"log"
 	"math/rand"
 	"os"
-	"path"
 	"sync/atomic"
 	"time"
 
@@ -82,111 +81,6 @@ type RaftTimer interface {
 	Term() uint64
 }
 
-// NewServer creates a new EtcdServer from the supplied configuration. The
-// configuration is considered static for the lifetime of the EtcdServer.
-func NewServer(cfg *ServerConfig) *EtcdServer { - err := cfg.Verify() - if err != nil { - log.Fatalln(err) - } - snapdir := path.Join(cfg.DataDir, "snap") - if err := os.MkdirAll(snapdir, privateDirMode); err != nil { - log.Fatalf("etcdserver: cannot create snapshot directory: %v", err) - } - ss := snap.New(snapdir) - st := store.New() - var w *wal.WAL - var n raft.Node - m := cfg.Cluster.FindName(cfg.Name) - waldir := path.Join(cfg.DataDir, "wal") - if !wal.Exist(waldir) { - if cfg.DiscoveryURL != "" { - d, err := discovery.New(cfg.DiscoveryURL, m.ID, cfg.Cluster.String()) - if err != nil { - log.Fatalf("etcd: cannot init discovery %v", err) - } - s, err := d.Discover() - if err != nil { - log.Fatalf("etcd: %v", err) - } - if err = cfg.Cluster.Set(s); err != nil { - log.Fatalf("etcd: %v", err) - } - } else if (cfg.ClusterState) != ClusterStateValueNew { - log.Fatalf("etcd: initial cluster state unset and no wal or discovery URL found") - } - i := pb.Info{ID: m.ID} - b, err := i.Marshal() - if err != nil { - log.Fatal(err) - } - if w, err = wal.Create(waldir, b); err != nil { - log.Fatal(err) - } - - ids := cfg.Cluster.IDs() - peers := make([]raft.Peer, len(ids)) - for i, id := range ids { - ctx, err := json.Marshal((*cfg.Cluster)[id]) - if err != nil { - log.Fatal(err) - } - peers[i] = raft.Peer{ID: id, Context: ctx} - } - n = raft.StartNode(m.ID, peers, 10, 1) - } else { - if cfg.DiscoveryURL != "" { - log.Printf("etcd: warn: ignoring discovery URL: etcd has already been initialized and has a valid log in %q", waldir) - } - var index uint64 - snapshot, err := ss.Load() - if err != nil && err != snap.ErrNoSnapshot { - log.Fatal(err) - } - if snapshot != nil { - log.Printf("etcdserver: restart from snapshot at index %d", snapshot.Index) - st.Recovery(snapshot.Data) - index = snapshot.Index - } - - // restart a node from previous wal - if w, err = wal.OpenAtIndex(waldir, index); err != nil { - log.Fatal(err) - } - md, st, ents, err := w.ReadAll() - if err != nil { 
- log.Fatal(err) - } - var info pb.Info - if err := info.Unmarshal(md); err != nil { - log.Fatal(err) - } - // TODO(xiangli): save/recovery nodeID? - if info.ID != m.ID { - log.Fatalf("unexpected nodeid %x, want %x: nodeid should always be the same until we support name/peerURLs update or dynamic configuration", info.ID, m.ID) - } - n = raft.RestartNode(m.ID, 10, 1, snapshot, st, ents) - } - - cls := &clusterStore{Store: st} - s := &EtcdServer{ - store: st, - node: n, - id: m.ID, - attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - storage: struct { - *wal.WAL - *snap.Snapshotter - }{w, ss}, - send: Sender(cfg.Transport, cls), - ticker: time.Tick(100 * time.Millisecond), - syncTicker: time.Tick(500 * time.Millisecond), - snapCount: cfg.SnapCount, - ClusterStore: cls, - } - return s -} - // EtcdServer is the production implementation of the Server interface type EtcdServer struct { w wait.Wait @@ -217,6 +111,73 @@ type EtcdServer struct { raftTerm uint64 } +// NewServer creates a new EtcdServer from the supplied configuration. The +// configuration is considered static for the lifetime of the EtcdServer. 
+func NewServer(cfg *ServerConfig) *EtcdServer {
+	if err := cfg.Verify(); err != nil { // an invalid configuration is fatal
+		log.Fatalln(err)
+	}
+	if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
+		log.Fatalf("etcdserver: cannot create snapshot directory: %v", err)
+	}
+	ss := snap.New(cfg.SnapDir())
+	st := store.New()
+	var w *wal.WAL
+	var n raft.Node
+	if !wal.Exist(cfg.WALDir()) { // no WAL on disk: take the bootstrap path
+		if !cfg.IsBootstrap() { // neither a discovery URL nor a new-cluster state was configured
+			log.Fatalf("etcd: initial cluster state unset and no wal or discovery URL found")
+		}
+		if cfg.ShouldDiscover() { // replace the configured cluster with the one discovery returns
+			d, err := discovery.New(cfg.DiscoveryURL, cfg.ID(), cfg.Cluster.String())
+			if err != nil {
+				log.Fatalf("etcd: cannot init discovery %v", err)
+			}
+			s, err := d.Discover()
+			if err != nil {
+				log.Fatalf("etcd: %v", err)
+			}
+			if err = cfg.Cluster.Set(s); err != nil {
+				log.Fatalf("etcd: %v", err)
+			}
+		}
+		n, w = startNode(cfg)
+	} else { // a WAL exists: restart from it
+		if cfg.ShouldDiscover() {
+			log.Printf("etcd: warn: ignoring discovery: etcd has already been initialized and has a valid log in %q", cfg.WALDir())
+		}
+		var index uint64 // WAL index to open at; stays 0 when no snapshot is loaded
+		snapshot, err := ss.Load()
+		if err != nil && err != snap.ErrNoSnapshot { // a missing snapshot is tolerated
+			log.Fatal(err)
+		}
+		if snapshot != nil {
+			log.Printf("etcdserver: restart from snapshot at index %d", snapshot.Index)
+			st.Recovery(snapshot.Data)
+			index = snapshot.Index
+		}
+		n, w = restartNode(cfg, index, snapshot)
+	}
+
+	cls := &clusterStore{Store: st}
+	s := &EtcdServer{
+		store:      st,
+		node:       n,
+		id:         cfg.ID(),
+		attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
+		storage: struct { // anonymous struct embedding the WAL and the snapshotter
+			*wal.WAL
+			*snap.Snapshotter
+		}{w, ss},
+		send:         Sender(cfg.Transport, cls),
+		ticker:       time.Tick(100 * time.Millisecond),
+		syncTicker:   time.Tick(500 * time.Millisecond),
+		snapCount:    cfg.SnapCount,
+		ClusterStore: cls,
+	}
+	return s
+}
+
 // Start prepares and starts server in a new goroutine. It is no longer safe to
 // modify a server's fields after it has been sent to Start.
 // It also starts a goroutine to publish its server information.
@@ -262,27 +223,8 @@ func (s *EtcdServer) run() {
 			// care to apply entries in a single goroutine, and not
 			// race them.
 			// TODO: apply configuration change into ClusterStore.
-			for _, e := range rd.CommittedEntries {
-				switch e.Type {
-				case raftpb.EntryNormal:
-					var r pb.Request
-					if err := r.Unmarshal(e.Data); err != nil {
-						panic("TODO: this is bad, what do we do about it?")
-					}
-					s.w.Trigger(r.ID, s.applyRequest(r))
-				case raftpb.EntryConfChange:
-					var cc raftpb.ConfChange
-					if err := cc.Unmarshal(e.Data); err != nil {
-						panic("TODO: this is bad, what do we do about it?")
-					}
-					s.applyConfChange(cc)
-					s.w.Trigger(cc.ID, nil)
-				default:
-					panic("unexpected entry type")
-				}
-				atomic.StoreUint64(&s.raftIndex, e.Index)
-				atomic.StoreUint64(&s.raftTerm, e.Term)
-				appliedi = e.Index
+			if len(rd.CommittedEntries) != 0 {
+				appliedi = s.apply(rd.CommittedEntries)
 			}
 
 			if rd.SoftState != nil {
@@ -336,7 +278,7 @@ func (s *EtcdServer) Stop() {
 // an error.
 func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 	if r.ID == 0 {
-		panic("r.Id cannot be 0")
+		panic("r.ID cannot be 0")
 	}
 	if r.Method == "GET" && r.Quorum {
 		r.Method = "QGET"
@@ -498,6 +440,37 @@ func getExpirationTime(r *pb.Request) time.Time {
 	return t
 }
 
+// apply applies the given committed entries in order and returns the index
+// of the last entry applied. It returns 0 when es is empty, so callers must
+// not use the result to advance their applied index in that case.
+func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
+	var applied uint64
+	for i := range es {
+		e := es[i]
+		switch e.Type {
+		case raftpb.EntryNormal:
+			var r pb.Request
+			if err := r.Unmarshal(e.Data); err != nil {
+				panic("TODO: this is bad, what do we do about it?")
+			}
+			s.w.Trigger(r.ID, s.applyRequest(r))
+		case raftpb.EntryConfChange:
+			var cc raftpb.ConfChange
+			if err := cc.Unmarshal(e.Data); err != nil {
+				panic("TODO: this is bad, what do we do about it?")
+			}
+			s.applyConfChange(cc)
+			s.w.Trigger(cc.ID, nil)
+		default:
+			panic("unexpected entry type")
+		}
+		atomic.StoreUint64(&s.raftIndex, e.Index)
+		atomic.StoreUint64(&s.raftTerm, e.Term)
+		applied = e.Index
+	}
+	return applied
+}
+
 // applyRequest interprets r as a call to store.X and returns a Response interpreted
 // from store.Event
 func (s *EtcdServer) applyRequest(r pb.Request) Response {
@@ -570,6 +543,56 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
 	s.storage.Cut()
 }
 
+// startNode creates a new WAL in cfg.WALDir(), seeded with this member's
+// marshaled pb.Info, and starts a raft node with every member of cfg.Cluster
+// as an initial peer. Any failure is fatal.
+func startNode(cfg *ServerConfig) (n raft.Node, w *wal.WAL) {
+	info := pb.Info{ID: cfg.ID()}
+	b, err := info.Marshal()
+	if err != nil {
+		log.Fatal(err)
+	}
+	if w, err = wal.Create(cfg.WALDir(), b); err != nil {
+		log.Fatal(err)
+	}
+	ids := cfg.Cluster.IDs()
+	peers := make([]raft.Peer, len(ids))
+	for i, id := range ids {
+		ctx, err := json.Marshal((*cfg.Cluster)[id])
+		if err != nil {
+			log.Fatal(err)
+		}
+		peers[i] = raft.Peer{ID: id, Context: ctx}
+	}
+	n = raft.StartNode(cfg.ID(), peers, 10, 1)
+	return n, w
+}
+
+// restartNode opens the existing WAL at the given index, reads it back, and
+// restarts the raft node from the recovered state and entries plus the
+// optional snapshot. It aborts if the ID recorded in the WAL differs from
+// cfg.ID(), because NewServer identifies the server by cfg.ID().
+func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (n raft.Node, w *wal.WAL) {
+	var err error
+	// restart a node from previous wal
+	if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
+		log.Fatal(err)
+	}
+	md, st, ents, err := w.ReadAll()
+	if err != nil {
+		log.Fatal(err)
+	}
+	var info pb.Info
+	if err := info.Unmarshal(md); err != nil {
+		log.Fatal(err)
+	}
+	if info.ID != cfg.ID() {
+		log.Fatalf("unexpected nodeid %x, want %x: nodeid should always be the same until we support name/peerURLs update or dynamic configuration", info.ID, cfg.ID())
+	}
+	n = raft.RestartNode(info.ID, 10, 1, snapshot, st, ents)
+	return n, w
+}
+
 // TODO: move the function to /id pkg maybe?
 // GenID generates a random id that is not equal to 0.
 func GenID() (n uint64) {