// etcd/etcdserver/participant.go

/*
Copyright 2014 CoreOS Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcdserver
import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"net/url"
	"os"
	"path"
	"time"

	"github.com/coreos/etcd/conf"
	etcdErr "github.com/coreos/etcd/error"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/snap"
	"github.com/coreos/etcd/store"
	"github.com/coreos/etcd/wal"
)
const (
defaultHeartbeat = 1
defaultElection = 5
maxBufferedProposal = 128
defaultTickDuration = time.Millisecond * 100
v2machineKVPrefix = "/_etcd/machines"
v2configKVPrefix = "/_etcd/config"
v2Prefix = "/v2/keys"
v2machinePrefix = "/v2/machines"
v2peersPrefix = "/v2/peers"
v2LeaderPrefix = "/v2/leader"
v2SelfStatsPrefix = "/v2/stats/self"
v2LeaderStatsPrefix = "/v2/stats/leader"
v2StoreStatsPrefix = "/v2/stats/store"
v2adminConfigPrefix = "/v2/admin/config"
v2adminMachinesPrefix = "/v2/admin/machines/"
)
var (
defaultCompact = 10000
tmpErr = fmt.Errorf("try again")
stopErr = fmt.Errorf("server is stopped")
raftStopErr = fmt.Errorf("raft is stopped")
)
type participant struct {
id int64
clusterId int64
cfg *conf.Config
pubAddr string
raftPubAddr string
tickDuration time.Duration
client *v2client
peerHub *peerHub
proposal chan v2Proposal
addNodeC chan raft.Config
removeNodeC chan raft.Config
node *v2Raft
store.Store
rh *raftHandler
w *wal.WAL
snapshotter *snap.Snapshotter
serverStats *raftServerStats
stopNotifyc chan struct{}
*http.ServeMux
}
func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDuration time.Duration) (*participant, error) {
p := &participant{
clusterId: -1,
cfg: c,
tickDuration: tickDuration,
client: client,
peerHub: peerHub,
proposal: make(chan v2Proposal, maxBufferedProposal),
addNodeC: make(chan raft.Config, 1),
removeNodeC: make(chan raft.Config, 1),
node: &v2Raft{
result: make(map[wait]chan interface{}),
},
Store: store.New(),
serverStats: NewRaftServerStats(c.Name),
stopNotifyc: make(chan struct{}),
ServeMux: http.NewServeMux(),
}
p.rh = newRaftHandler(peerHub, p.Store.Version(), p.serverStats)
p.peerHub.setServerStats(p.serverStats)
snapDir := path.Join(p.cfg.DataDir, "snap")
if err := os.Mkdir(snapDir, 0700); err != nil {
if !os.IsExist(err) {
log.Printf("id=%x participant.new.mkdir err=%v", p.id, err)
return nil, err
}
}
p.snapshotter = snap.New(snapDir)
walDir := path.Join(p.cfg.DataDir, "wal")
if err := os.Mkdir(walDir, 0700); err != nil {
if !os.IsExist(err) {
log.Printf("id=%x participant.new.mkdir err=%v", p.id, err)
return nil, err
}
}
var w *wal.WAL
var err error
if !wal.Exist(walDir) {
p.id = genId()
p.pubAddr = c.Addr
p.raftPubAddr = c.Peer.Addr
if w, err = wal.Create(walDir); err != nil {
return nil, err
}
p.node.Node = raft.New(p.id, defaultHeartbeat, defaultElection)
info := p.node.Info()
if err = w.SaveInfo(&info); err != nil {
return nil, err
}
log.Printf("id=%x participant.new path=%s\n", p.id, walDir)
} else {
s, err := p.snapshotter.Load()
if err != nil && err != snap.ErrNoSnapshot {
log.Printf("participant.snapload err=%s\n", err)
return nil, err
}
var snapIndex int64
if s != nil {
if err := p.Recovery(s.Data); err != nil {
log.Printf("store.recover err=%v", err)
return nil, err
}
log.Printf("participant.store.recovered index=%d\n", s.Index)
for _, node := range s.Nodes {
pp := path.Join(v2machineKVPrefix, fmt.Sprint(node))
ev, err := p.Store.Get(pp, false, false)
if err != nil {
log.Printf("store.get err=%v", err)
return nil, err
}
q, err := url.ParseQuery(*ev.Node.Value)
if err != nil {
log.Printf("url.parse err=%v", err)
return nil, err
}
peer, err := p.peerHub.add(node, q["raft"][0])
if err != nil {
log.Printf("peerHub.add err=%v", err)
return nil, err
}
peer.participate()
}
snapIndex = s.Index
}
n, err := wal.Read(walDir, snapIndex)
if err != nil {
return nil, err
}
p.id = n.Id
p.node.Node = raft.Recover(n.Id, s, n.Ents, n.State, defaultHeartbeat, defaultElection)
p.apply(p.node.Next())
log.Printf("id=%x participant.load path=%s snapIndex=%d state=\"%+v\" len(ents)=%d", p.id, p.cfg.DataDir, snapIndex, n.State, len(n.Ents))
if w, err = wal.Open(walDir); err != nil {
return nil, err
}
}
p.w = w
p.Handle(v2Prefix+"/", handlerErr(p.serveValue))
p.Handle(v2machinePrefix, handlerErr(p.serveMachines))
p.Handle(v2peersPrefix, handlerErr(p.serveMachines))
p.Handle(v2LeaderPrefix, handlerErr(p.serveLeader))
p.Handle(v2SelfStatsPrefix, handlerErr(p.serveSelfStats))
p.Handle(v2LeaderStatsPrefix, handlerErr(p.serveLeaderStats))
p.Handle(v2StoreStatsPrefix, handlerErr(p.serveStoreStats))
p.rh.Handle(v2adminConfigPrefix, handlerErr(p.serveAdminConfig))
p.rh.Handle(v2adminMachinesPrefix, handlerErr(p.serveAdminMachines))
// TODO: remind to set application/json for /v2/stats endpoint
return p, nil
}
func (p *participant) run(stop chan struct{}) error {
defer p.cleanup()
if p.node.IsEmpty() {
seeds := p.peerHub.getSeeds()
if len(seeds) == 0 {
log.Printf("id=%x participant.run action=bootstrap\n", p.id)
p.node.Campaign()
p.node.InitCluster(genId())
p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr))
p.apply(p.node.Next())
} else {
log.Printf("id=%x participant.run action=join seeds=\"%v\"\n", p.id, seeds)
if err := p.join(); err != nil {
log.Printf("id=%x participant.join err=%q", p.id, err)
return err
}
}
}
p.rh.start()
defer p.rh.stop()
node := p.node
recv := p.rh.recv
ticker := time.NewTicker(p.tickDuration)
defer ticker.Stop()
v2SyncTicker := time.NewTicker(time.Millisecond * 500)
defer v2SyncTicker.Stop()
var proposal chan v2Proposal
var addNodeC, removeNodeC chan raft.Config
for {
if node.HasLeader() {
proposal = p.proposal
addNodeC = p.addNodeC
removeNodeC = p.removeNodeC
} else {
proposal = nil
addNodeC = nil
removeNodeC = nil
}
select {
case p := <-proposal:
node.Propose(p)
case c := <-addNodeC:
node.UpdateConf(raft.AddNode, &c)
case c := <-removeNodeC:
node.UpdateConf(raft.RemoveNode, &c)
case msg := <-recv:
node.Step(*msg)
case <-ticker.C:
node.Tick()
case <-v2SyncTicker.C:
node.Sync()
case <-stop:
log.Printf("id=%x participant.stop\n", p.id)
return nil
}
if s := node.UnstableSnapshot(); !s.IsEmpty() {
if err := p.Recovery(s.Data); err != nil {
log.Printf("id=%x participant.recover err=%q", p.id, err)
return err
}
log.Printf("id=%x participant.recovered index=%d", p.id, s.Index)
}
p.apply(node.Next())
if err := p.save(node.UnstableEnts(), node.UnstableState()); err != nil {
return err
}
p.send(node.Msgs())
if node.IsRemoved() {
log.Printf("id=%x participant.end\n", p.id)
return nil
}
if p.node.EntsLen() > defaultCompact {
d, err := p.Save()
if err != nil {
log.Printf("id=%x participant.compact err=%q", p.id, err)
return err
}
p.node.Compact(d)
snap := p.node.GetSnap()
log.Printf("id=%x compacted index=%d", p.id, snap.Index)
if err := p.snapshotter.Save(&snap); err != nil {
log.Printf("id=%x snapshot.save err=%v", p.id, err)
return err
}
if err := p.w.Cut(p.node.Index()); err != nil {
log.Printf("id=%x wal.cut err=%v", p.id, err)
return err
}
info := p.node.Info()
if err = p.w.SaveInfo(&info); err != nil {
log.Printf("id=%x wal.saveInfo err=%v", p.id, err)
return err
}
}
}
}
func (p *participant) cleanup() {
p.w.Close()
close(p.stopNotifyc)
p.peerHub.stop()
}
func (p *participant) raftHandler() http.Handler {
return p.rh
}
func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
log.Printf("id=%x participant.add nodeId=%x raftPubAddr=%s pubAddr=%s\n", p.id, id, raftPubAddr, pubAddr)
pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
_, err := p.Store.Get(pp, false, false)
if err == nil {
return nil
}
if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound {
log.Printf("id=%x participant.add getErr=\"%v\"\n", p.id, err)
return err
}
w, err := p.Watch(pp, true, false, 0)
if err != nil {
log.Printf("id=%x participant.add watchErr=\"%v\"\n", p.id, err)
return tmpErr
}
select {
case p.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
default:
w.Remove()
log.Printf("id=%x participant.add proposeErr=\"unable to send out addNode proposal\"\n", p.id)
return tmpErr
}
select {
case v := <-w.EventChan:
if v.Action == store.Set {
return nil
}
log.Printf("id=%x participant.add watchErr=\"unexpected action\" action=%s\n", p.id, v.Action)
return tmpErr
case <-time.After(6 * defaultHeartbeat * p.tickDuration):
w.Remove()
log.Printf("id=%x participant.add watchErr=timeout\n", p.id)
return tmpErr
case <-p.stopNotifyc:
return stopErr
}
}
func (p *participant) remove(id int64) error {
log.Printf("id=%x participant.remove nodeId=%x\n", p.id, id)
pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
v, err := p.Store.Get(pp, false, false)
if err != nil {
return nil
}
select {
case p.removeNodeC <- raft.Config{NodeId: id}:
default:
log.Printf("id=%x participant.remove proposeErr=\"unable to send out removeNode proposal\"\n", p.id)
return tmpErr
}
// TODO(xiangli): do not need to watch if the
// removal target is self
w, err := p.Watch(pp, true, false, v.Index()+1)
if err != nil {
log.Printf("id=%x participant.remove watchErr=\"%v\"\n", p.id, err)
return tmpErr
}
select {
case v := <-w.EventChan:
if v.Action == store.Delete {
return nil
}
log.Printf("id=%x participant.remove watchErr=\"unexpected action\" action=%s\n", p.id, v.Action)
return tmpErr
case <-time.After(6 * defaultHeartbeat * p.tickDuration):
w.Remove()
log.Printf("id=%x participant.remove watchErr=timeout\n", p.id)
return tmpErr
case <-p.stopNotifyc:
return stopErr
}
}
func (p *participant) apply(ents []raft.Entry) {
offset := p.node.Applied() - int64(len(ents)) + 1
for i, ent := range ents {
switch ent.Type {
// expose raft entry type
case raft.Normal:
if len(ent.Data) == 0 {
continue
}
p.v2apply(offset+int64(i), ent)
case raft.ClusterInit:
p.clusterId = p.node.ClusterId()
log.Printf("id=%x participant.cluster.setId clusterId=%x\n", p.id, p.clusterId)
case raft.AddNode:
cfg := new(raft.Config)
if err := json.Unmarshal(ent.Data, cfg); err != nil {
log.Printf("id=%x participant.cluster.addNode unmarshalErr=\"%v\"\n", p.id, err)
break
}
pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
if ev, _ := p.Store.Get(pp, false, false); ev != nil {
log.Printf("id=%x participant.cluster.addNode err=existed value=%q", p.id, *ev.Node.Value)
break
}
peer, err := p.peerHub.add(cfg.NodeId, cfg.Addr)
if err != nil {
log.Printf("id=%x participant.cluster.addNode peerAddErr=\"%v\"\n", p.id, err)
break
}
peer.participate()
p.Store.Set(pp, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent)
if p.id == cfg.NodeId {
p.raftPubAddr = cfg.Addr
p.pubAddr = string(cfg.Context)
}
log.Printf("id=%x participant.cluster.addNode nodeId=%x addr=%s context=%s\n", p.id, cfg.NodeId, cfg.Addr, cfg.Context)
case raft.RemoveNode:
cfg := new(raft.Config)
if err := json.Unmarshal(ent.Data, cfg); err != nil {
log.Printf("id=%x participant.cluster.removeNode unmarshalErr=\"%v\"\n", p.id, err)
break
}
peer, err := p.peerHub.peer(cfg.NodeId)
if err != nil {
log.Fatal("id=%x participant.apply getPeerErr=\"%v\"", p.id, err)
}
peer.idle()
pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
p.Store.Delete(pp, false, false)
log.Printf("id=%x participant.cluster.removeNode nodeId=%x\n", p.id, cfg.NodeId)
default:
panic("unimplemented")
}
}
}
func (p *participant) save(ents []raft.Entry, state raft.State) error {
for _, ent := range ents {
if err := p.w.SaveEntry(&ent); err != nil {
log.Printf("id=%x participant.save saveEntryErr=%q", p.id, err)
return err
}
}
if !state.IsEmpty() {
if err := p.w.SaveState(&state); err != nil {
log.Printf("id=%x participant.save saveStateErr=%q", p.id, err)
return err
}
}
if err := p.w.Sync(); err != nil {
log.Printf("id=%x participant.save syncErr=%q", p.id, err)
return err
}
return nil
}
func (p *participant) send(msgs []raft.Message) {
for i := range msgs {
if err := p.peerHub.send(msgs[i]); err != nil {
log.Printf("id=%x participant.send err=\"%v\"\n", p.id, err)
}
}
}
func (p *participant) join() error {
info := &context{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
ClientURL: p.pubAddr,
PeerURL: p.raftPubAddr,
}
max := p.cfg.MaxRetryAttempts
for attempt := 0; ; attempt++ {
for seed := range p.peerHub.getSeeds() {
if err := p.client.AddMachine(seed, fmt.Sprint(p.id), info); err == nil {
return nil
} else {
log.Printf("id=%x participant.join addMachineErr=\"%v\"\n", p.id, err)
}
}
if attempt == max {
return fmt.Errorf("etcd: cannot join cluster after %d attempts", max)
}
time.Sleep(time.Millisecond * time.Duration(p.cfg.RetryInterval*1000))
}
}
func genId() int64 {
r := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
return r.Int63()
}