server: refactor server

release-2.0
Yicheng Qin 2014-07-18 01:36:58 -07:00
parent 447f6a16cc
commit 6d81aabd48
16 changed files with 744 additions and 669 deletions

@ -2,88 +2,43 @@ package etcd
import (
"crypto/tls"
"encoding/json"
"fmt"
"errors"
"log"
"net/http"
"net/url"
"path"
"time"
"github.com/coreos/etcd/config"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/store"
)
const (
defaultHeartbeat = 1
defaultElection = 5
maxBufferedProposal = 128
defaultTickDuration = time.Millisecond * 100
v2machineKVPrefix = "/_etcd/machines"
v2configKVPrefix = "/_etcd/config"
v2Prefix = "/v2/keys"
v2machinePrefix = "/v2/machines"
v2peersPrefix = "/v2/peers"
v2LeaderPrefix = "/v2/leader"
v2StoreStatsPrefix = "/v2/stats/store"
v2adminConfigPrefix = "/v2/admin/config"
v2adminMachinesPrefix = "/v2/admin/machines/"
raftPrefix = "/raft"
)
const (
participant = iota
standby
stop
participantMode int64 = iota
standbyMode
stopMode
)
var (
tmpErr = fmt.Errorf("try again")
raftStopErr = fmt.Errorf("raft is stopped")
noneId int64 = -1
stopErr = errors.New("stopped")
)
type Server struct {
config *config.Config
mode int
id int64
pubAddr string
raftPubAddr string
nodes map[string]bool
peerHub *peerHub
config *config.Config
id int64
pubAddr string
raftPubAddr string
tickDuration time.Duration
client *v2client
rh *raftHandler
mode atomicInt
nodes map[string]bool
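// p and s hold the mode-specific implementations; only the one matching
// the current mode is active at any time.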
p *participant
s *standby
// participant mode vars
proposal chan v2Proposal
addNodeC chan raft.Config
removeNodeC chan raft.Config
node *v2Raft
store.Store
client *v2client
peerHub *peerHub
// standby mode vars
leader int64
leaderAddr string
clusterConf *config.ClusterConfig
modeC chan int
stop chan struct{}
participantHandler http.Handler
standbyHandler http.Handler
modeC chan int64
stopc chan struct{}
}
func New(c *config.Config, id int64) *Server {
@ -105,406 +60,112 @@ func New(c *config.Config, id int64) *Server {
tr := new(http.Transport)
tr.TLSClientConfig = tc
client := &http.Client{Transport: tr}
peerHub := newPeerHub(c.Peers, client)
s := &Server{
config: c,
id: id,
pubAddr: c.Addr,
raftPubAddr: c.Peer.Addr,
nodes: make(map[string]bool),
peerHub: peerHub,
config: c,
id: id,
pubAddr: c.Addr,
raftPubAddr: c.Peer.Addr,
tickDuration: defaultTickDuration,
client: newClient(tc),
rh: newRaftHandler(peerHub),
mode: atomicInt(stopMode),
nodes: make(map[string]bool),
node: &v2Raft{
Node: raft.New(id, defaultHeartbeat, defaultElection),
result: make(map[wait]chan interface{}),
},
Store: store.New(),
client: newClient(tc),
peerHub: newPeerHub(c.Peers, client),
modeC: make(chan int, 10),
stop: make(chan struct{}),
modeC: make(chan int64, 10),
stopc: make(chan struct{}),
}
for _, seed := range c.Peers {
s.nodes[seed] = true
}
m := http.NewServeMux()
m.Handle(v2Prefix+"/", handlerErr(s.serveValue))
m.Handle(v2machinePrefix, handlerErr(s.serveMachines))
m.Handle(v2peersPrefix, handlerErr(s.serveMachines))
m.Handle(v2LeaderPrefix, handlerErr(s.serveLeader))
m.Handle(v2StoreStatsPrefix, handlerErr(s.serveStoreStats))
m.Handle(v2adminConfigPrefix, handlerErr(s.serveAdminConfig))
m.Handle(v2adminMachinesPrefix, handlerErr(s.serveAdminMachines))
s.participantHandler = m
m = http.NewServeMux()
m.Handle("/", handlerErr(s.serveRedirect))
s.standbyHandler = m
return s
}
func (s *Server) SetTick(d time.Duration) {
s.tickDuration = d
}
func (s *Server) RaftHandler() http.Handler {
return s.rh
}
func (s *Server) Run() {
if len(s.config.Peers) == 0 {
s.Bootstrap()
} else {
s.Join()
}
func (s *Server) SetTick(tick time.Duration) {
s.tickDuration = tick
}
// Stop stops the server gracefully.
func (s *Server) Stop() {
if s.mode == stop {
if s.mode.Get() == stopMode {
return
}
s.mode = stop
s.rh.stop()
s.client.CloseConnections()
s.peerHub.stop()
close(s.stop)
}
func (s *Server) Bootstrap() {
log.Println("starting a bootstrap node")
s.initParticipant()
s.node.Campaign()
s.node.Add(s.id, s.raftPubAddr, []byte(s.pubAddr))
s.apply(s.node.Next())
s.run()
}
func (s *Server) Join() {
log.Println("joining cluster via peers", s.config.Peers)
s.initParticipant()
info := &context{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
ClientURL: s.pubAddr,
PeerURL: s.raftPubAddr,
}
url := ""
for i := 0; i < 5; i++ {
for seed := range s.nodes {
if err := s.client.AddMachine(seed, fmt.Sprint(s.id), info); err == nil {
url = seed
break
} else {
log.Println(err)
}
}
if url != "" {
break
}
time.Sleep(100 * time.Millisecond)
}
s.nodes = map[string]bool{url: true}
s.run()
}
func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error {
p := path.Join(v2machineKVPrefix, fmt.Sprint(id))
_, err := s.Get(p, false, false)
if err == nil {
return nil
}
if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound {
return err
}
w, err := s.Watch(p, true, false, 0)
if err != nil {
log.Println("add error:", err)
return tmpErr
}
if s.mode != participant {
return raftStopErr
}
select {
case s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
default:
w.Remove()
log.Println("unable to send out addNode proposal")
return tmpErr
}
select {
case v := <-w.EventChan:
if v.Action == store.Set {
return nil
}
log.Println("add error: action =", v.Action)
return tmpErr
case <-time.After(6 * defaultHeartbeat * s.tickDuration):
w.Remove()
log.Println("add error: wait timeout")
return tmpErr
}
}
func (s *Server) Remove(id int64) error {
p := path.Join(v2machineKVPrefix, fmt.Sprint(id))
v, err := s.Get(p, false, false)
if err != nil {
return nil
}
if s.mode != participant {
return raftStopErr
}
select {
case s.removeNodeC <- raft.Config{NodeId: id}:
default:
log.Println("unable to send out removeNode proposal")
return tmpErr
}
// TODO(xiangli): do not need to watch if the
// removal target is self
w, err := s.Watch(p, true, false, v.Index()+1)
if err != nil {
log.Println("remove error:", err)
return tmpErr
}
select {
case v := <-w.EventChan:
if v.Action == store.Delete {
return nil
}
log.Println("remove error: action =", v.Action)
return tmpErr
case <-time.After(6 * defaultHeartbeat * s.tickDuration):
w.Remove()
log.Println("remove error: wait timeout")
return tmpErr
}
s.stopc <- struct{}{}
<-s.stopc
}
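// ServeHTTP dispatches client requests to the handler of the server's
// current mode.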
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch s.mode {
case participant:
s.participantHandler.ServeHTTP(w, r)
case standby:
s.standbyHandler.ServeHTTP(w, r)
case stop:
http.Error(w, "server is stopped", http.StatusInternalServerError)
switch s.mode.Get() {
case participantMode:
s.p.ServeHTTP(w, r)
case standbyMode:
s.s.ServeHTTP(w, r)
case stopMode:
http.NotFound(w, r)
}
}
func (s *Server) initParticipant() {
s.proposal = make(chan v2Proposal, maxBufferedProposal)
s.addNodeC = make(chan raft.Config, 1)
s.removeNodeC = make(chan raft.Config, 1)
s.rh.start()
s.mode = participant
func (s *Server) RaftHandler() http.Handler {
return http.HandlerFunc(s.ServeRaftHTTP)
}
func (s *Server) initStandby() {
s.leader = noneId
s.leaderAddr = ""
s.clusterConf = config.NewClusterConfig()
s.mode = standby
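// ServeRaftHTTP forwards raft traffic to the participant's raft handler;
// standby and stopped servers have no raft state and answer 404.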
func (s *Server) ServeRaftHTTP(w http.ResponseWriter, r *http.Request) {
switch s.mode.Get() {
case participantMode:
s.p.raftHandler().ServeHTTP(w, r)
case standbyMode:
http.NotFound(w, r)
case stopMode:
http.NotFound(w, r)
}
}
func (s *Server) run() {
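// Run drives the server lifecycle: it starts in participant mode and
// alternates with standby mode whenever the current mode's run loop exits,
// until a stop is requested.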
func (s *Server) Run() {
runc := make(chan struct{})
next := participantMode
for {
select {
case s.modeC <- s.mode:
default:
}
switch s.mode {
case participant:
s.runParticipant()
case standby:
s.runStandby()
case stop:
return
switch next {
case participantMode:
s.p = newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub, s.tickDuration)
s.mode.Set(participantMode)
// TODO: it may block here. remove modeC later.
s.modeC <- s.mode.Get()
next = standbyMode
go func() {
s.p.run()
runc <- struct{}{}
}()
case standbyMode:
s.s = newStandby(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub)
s.mode.Set(standbyMode)
s.modeC <- s.mode.Get()
next = participantMode
go func() {
s.s.run()
runc <- struct{}{}
}()
default:
panic("unsupport mode")
}
}
}
func (s *Server) runParticipant() {
defer func() {
s.node.StopProposalWaiters()
s.rh.stop()
}()
node := s.node
recv := s.rh.recv
ticker := time.NewTicker(s.tickDuration)
v2SyncTicker := time.NewTicker(time.Millisecond * 500)
var proposal chan v2Proposal
var addNodeC, removeNodeC chan raft.Config
for {
if node.HasLeader() {
proposal = s.proposal
addNodeC = s.addNodeC
removeNodeC = s.removeNodeC
} else {
proposal = nil
addNodeC = nil
removeNodeC = nil
}
select {
case p := <-proposal:
node.Propose(p)
case c := <-addNodeC:
node.UpdateConf(raft.AddNode, &c)
case c := <-removeNodeC:
node.UpdateConf(raft.RemoveNode, &c)
case msg := <-recv:
node.Step(*msg)
case <-ticker.C:
node.Tick()
case <-v2SyncTicker.C:
node.Sync()
case <-s.stop:
log.Printf("Node: %d stopped\n", s.id)
return
}
s.apply(node.Next())
s.send(node.Msgs())
if node.IsRemoved() {
log.Printf("Node: %d removed to standby mode\n", s.id)
s.initStandby()
case <-runc:
case <-s.stopc:
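// A stop request shuts down the active mode, waits for its run loop to
// exit, releases shared resources, and acknowledges on stopc.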
switch s.mode.Get() {
case participantMode:
s.p.stop()
case standbyMode:
s.s.stop()
}
<-runc
s.mode.Set(stopMode)
s.modeC <- s.mode.Get()
s.client.CloseConnections()
s.peerHub.stop()
s.stopc <- struct{}{}
return
}
}
}
func (s *Server) runStandby() {
var syncDuration time.Duration
for {
select {
case <-time.After(syncDuration):
case <-s.stop:
log.Printf("Node: %d stopped\n", s.id)
return
}
if err := s.syncCluster(); err != nil {
log.Println("standby sync:", err)
continue
}
syncDuration = time.Duration(s.clusterConf.SyncInterval * float64(time.Second))
if s.clusterConf.ActiveSize <= len(s.nodes) {
continue
}
if err := s.joinByPeer(s.leaderAddr); err != nil {
log.Println("standby join:", err)
continue
}
log.Printf("Node: %d removed to participant mode\n", s.id)
// TODO(yichengq): use old v2Raft
// 1. reject proposal in leader state when sm is removed
// 2. record removeIndex in node to ignore msgDenial and old removal
s.node = &v2Raft{
Node: raft.New(s.id, defaultHeartbeat, defaultElection),
result: make(map[wait]chan interface{}),
}
s.Store = store.New()
s.initParticipant()
return
}
}
func (s *Server) apply(ents []raft.Entry) {
offset := s.node.Applied() - int64(len(ents)) + 1
for i, ent := range ents {
switch ent.Type {
// expose raft entry type
case raft.Normal:
if len(ent.Data) == 0 {
continue
}
s.v2apply(offset+int64(i), ent)
case raft.AddNode:
cfg := new(raft.Config)
if err := json.Unmarshal(ent.Data, cfg); err != nil {
log.Println(err)
break
}
peer, err := s.peerHub.add(cfg.NodeId, cfg.Addr)
if err != nil {
log.Println(err)
break
}
peer.participate()
log.Printf("Add Node %x %v %v\n", cfg.NodeId, cfg.Addr, string(cfg.Context))
p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
if _, err := s.Store.Set(p, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent); err == nil {
s.nodes[cfg.Addr] = true
}
case raft.RemoveNode:
cfg := new(raft.Config)
if err := json.Unmarshal(ent.Data, cfg); err != nil {
log.Println(err)
break
}
log.Printf("Remove Node %x\n", cfg.NodeId)
delete(s.nodes, s.fetchAddrFromStore(cfg.NodeId))
peer, err := s.peerHub.peer(cfg.NodeId)
if err != nil {
log.Fatal("cannot get the added peer:", err)
}
peer.idle()
p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
s.Store.Delete(p, false, false)
default:
panic("unimplemented")
}
}
}
func (s *Server) send(msgs []raft.Message) {
for i := range msgs {
if err := s.peerHub.send(msgs[i]); err != nil {
log.Println("send:", err)
}
}
}
func (s *Server) fetchAddrFromStore(nodeId int64) string {
p := path.Join(v2machineKVPrefix, fmt.Sprint(nodeId))
if ev, err := s.Get(p, false, false); err == nil {
if m, err := url.ParseQuery(*ev.Node.Value); err == nil {
return m["raft"][0]
}
}
return ""
}
func (s *Server) joinByPeer(addr string) error {
info := &context{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
ClientURL: s.pubAddr,
PeerURL: s.raftPubAddr,
}
if err := s.client.AddMachine(s.leaderAddr, fmt.Sprint(s.id), info); err != nil {
return err
}
return nil
}

@ -17,14 +17,14 @@ func TestKillLeader(t *testing.T) {
waitCluster(t, es)
waitLeader(es)
lead := es[0].node.Leader()
lead := es[0].p.node.Leader()
es[lead].Stop()
time.Sleep(es[0].tickDuration * defaultElection * 2)
waitLeader(es)
if es[1].node.Leader() == 0 {
t.Errorf("#%d: lead = %d, want not 0", i, es[1].node.Leader())
if es[1].p.node.Leader() == 0 {
t.Errorf("#%d: lead = %d, want not 0", i, es[1].p.node.Leader())
}
for i := range es {
@ -81,7 +81,7 @@ func TestJoinThroughFollower(t *testing.T) {
es[i], hs[i] = initTestServer(c, int64(i), false)
}
go es[0].Bootstrap()
go es[0].Run()
for i := 1; i < tt; i++ {
go es[i].Run()
@ -106,7 +106,7 @@ type leadterm struct {
func waitActiveLeader(es []*Server) (lead, term int64) {
for {
if l, t := waitLeader(es); l >= 0 && es[l].mode == participant {
if l, t := waitLeader(es); l >= 0 && es[l].mode.Get() == participantMode {
return l, t
}
}
@ -118,12 +118,12 @@ func waitLeader(es []*Server) (lead, term int64) {
for {
ls := make([]leadterm, 0, len(es))
for i := range es {
switch es[i].mode {
case participant:
switch es[i].mode.Get() {
case participantMode:
ls = append(ls, getLead(es[i]))
case standby:
case standbyMode:
//TODO(xiangli) add standby support
case stop:
case stopMode:
}
}
if isSameLead(ls) {
@ -134,7 +134,7 @@ func waitLeader(es []*Server) (lead, term int64) {
}
func getLead(s *Server) leadterm {
return leadterm{s.node.Leader(), s.node.Term()}
return leadterm{s.p.node.Leader(), s.p.node.Term()}
}
func isSameLead(ls []leadterm) bool {

@ -91,18 +91,19 @@ func TestAdd(t *testing.T) {
es[i], hs[i] = initTestServer(c, int64(i), false)
}
go es[0].Bootstrap()
go es[0].Run()
<-es[0].modeC
for i := 1; i < tt; i++ {
id := int64(i)
for {
lead := es[0].node.Leader()
lead := es[0].p.node.Leader()
if lead == -1 {
time.Sleep(defaultElection * es[0].tickDuration)
continue
}
err := es[lead].Add(id, es[id].raftPubAddr, es[id].pubAddr)
err := es[lead].p.add(id, es[id].raftPubAddr, es[id].pubAddr)
if err == nil {
break
}
@ -115,12 +116,12 @@ func TestAdd(t *testing.T) {
t.Fatal(err)
}
}
es[i].initParticipant()
go es[i].run()
go es[i].Run()
<-es[i].modeC
for j := 0; j <= i; j++ {
p := fmt.Sprintf("%s/%d", v2machineKVPrefix, id)
w, err := es[j].Watch(p, false, false, 1)
w, err := es[j].p.Watch(p, false, false, 1)
if err != nil {
t.Errorf("#%d on %d: %v", i, j, err)
break
@ -149,7 +150,7 @@ func TestRemove(t *testing.T) {
lead, _ := waitLeader(es)
config := config.NewClusterConfig()
config.ActiveSize = 0
if err := es[lead].setClusterConfig(config); err != nil {
if err := es[lead].p.setClusterConfig(config); err != nil {
t.Fatalf("#%d: setClusterConfig err = %v", k, err)
}
@ -157,8 +158,6 @@ func TestRemove(t *testing.T) {
// not 100 percent safe in our raft.
// TODO(yichengq): improve it later.
for i := 0; i < tt-2; i++ {
<-es[i].modeC
id := int64(i)
send := id
for {
@ -167,13 +166,13 @@ func TestRemove(t *testing.T) {
send = id
}
lead := es[send].node.Leader()
lead := es[send].p.node.Leader()
if lead == -1 {
time.Sleep(defaultElection * 5 * time.Millisecond)
continue
}
err := es[lead].Remove(id)
err := es[lead].p.remove(id)
if err == nil {
break
}
@ -190,7 +189,7 @@ func TestRemove(t *testing.T) {
}
if g := <-es[i].modeC; g != standby {
if g := <-es[i].modeC; g != standbyMode {
t.Errorf("#%d on %d: mode = %d, want standby", k, i, g)
}
}
@ -223,22 +222,18 @@ func TestBecomeStandby(t *testing.T) {
}
id := int64(i)
if g := <-es[i].modeC; g != participant {
t.Fatalf("#%d: mode = %d, want participant", i, g)
}
config := config.NewClusterConfig()
config.SyncInterval = 1000
config.ActiveSize = size - 1
if err := es[lead].setClusterConfig(config); err != nil {
if err := es[lead].p.setClusterConfig(config); err != nil {
t.Fatalf("#%d: setClusterConfig err = %v", i, err)
}
if err := es[lead].Remove(id); err != nil {
if err := es[lead].p.remove(id); err != nil {
t.Fatalf("#%d: remove err = %v", i, err)
}
if g := <-es[i].modeC; g != standby {
if g := <-es[i].modeC; g != standbyMode {
t.Fatalf("#%d: mode = %d, want standby", i, g)
}
if g := len(es[i].modeC); g != 0 {
@ -246,12 +241,12 @@ func TestBecomeStandby(t *testing.T) {
}
for k := 0; k < 4; k++ {
if es[i].leader != noneId {
if es[i].s.leader != noneId {
break
}
time.Sleep(20 * time.Millisecond)
}
if g := es[i].leader; g != lead {
if g := es[i].s.leader; g != lead {
t.Errorf("#%d: lead = %d, want %d", i, g, lead)
}
@ -279,7 +274,7 @@ func TestModeSwitch(t *testing.T) {
es, hs := buildCluster(size, false)
waitCluster(t, es)
if g := <-es[i].modeC; g != participant {
if g := <-es[i].modeC; g != participantMode {
t.Fatalf("#%d: mode = %d, want participant", i, g)
}
@ -294,14 +289,14 @@ func TestModeSwitch(t *testing.T) {
}
config.ActiveSize = size - 1
if err := es[lead].setClusterConfig(config); err != nil {
if err := es[lead].p.setClusterConfig(config); err != nil {
t.Fatalf("#%d: setClusterConfig err = %v", i, err)
}
if err := es[lead].Remove(id); err != nil {
if err := es[lead].p.remove(id); err != nil {
t.Fatalf("#%d: remove err = %v", i, err)
}
if g := <-es[i].modeC; g != standby {
if g := <-es[i].modeC; g != standbyMode {
t.Fatalf("#%d: mode = %d, want standby", i, g)
}
if g := len(es[i].modeC); g != 0 {
@ -309,21 +304,21 @@ func TestModeSwitch(t *testing.T) {
}
for k := 0; k < 4; k++ {
if es[i].leader != noneId {
if es[i].s.leader != noneId {
break
}
time.Sleep(20 * time.Millisecond)
}
if g := es[i].leader; g != lead {
if g := es[i].s.leader; g != lead {
t.Errorf("#%d: lead = %d, want %d", i, g, lead)
}
config.ActiveSize = size
if err := es[lead].setClusterConfig(config); err != nil {
if err := es[lead].p.setClusterConfig(config); err != nil {
t.Fatalf("#%d: setClusterConfig err = %v", i, err)
}
if g := <-es[i].modeC; g != participant {
if g := <-es[i].modeC; g != participantMode {
t.Fatalf("#%d: mode = %d, want participant", i, g)
}
if g := len(es[i].modeC); g != 0 {
@ -364,17 +359,17 @@ func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
if i == bootstrapper {
seed = hs[i].URL
go es[i].Bootstrap()
} else {
// wait for the previous configuration change to be committed
// or this configuration request might be dropped
w, err := es[0].Watch(v2machineKVPrefix, true, false, uint64(i))
w, err := es[0].p.Watch(v2machineKVPrefix, true, false, uint64(i))
if err != nil {
panic(err)
}
<-w.EventChan
go es[i].Join()
}
go es[i].Run()
<-es[i].modeC
}
return es, hs
}
@ -404,7 +399,7 @@ func waitCluster(t *testing.T, es []*Server) {
var index uint64
for k := 0; k < n; k++ {
index++
w, err := e.Watch(v2machineKVPrefix, true, false, index)
w, err := e.p.Watch(v2machineKVPrefix, true, false, index)
if err != nil {
panic(err)
}
@ -429,12 +424,12 @@ func waitCluster(t *testing.T, es []*Server) {
func checkParticipant(i int, es []*Server) error {
lead, _ := waitActiveLeader(es)
key := fmt.Sprintf("/%d", rand.Int31())
ev, err := es[lead].Set(key, false, "bar", store.Permanent)
ev, err := es[lead].p.Set(key, false, "bar", store.Permanent)
if err != nil {
return err
}
w, err := es[i].Watch(key, false, false, ev.Index())
w, err := es[i].p.Watch(key, false, false, ev.Index())
if err != nil {
return err
}

etcd/participant.go (new file, 328 lines)

@ -0,0 +1,328 @@
package etcd
import (
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"path"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/store"
)
const (
defaultHeartbeat = 1
defaultElection = 5
maxBufferedProposal = 128
defaultTickDuration = time.Millisecond * 100
v2machineKVPrefix = "/_etcd/machines"
v2configKVPrefix = "/_etcd/config"
v2Prefix = "/v2/keys"
v2machinePrefix = "/v2/machines"
v2peersPrefix = "/v2/peers"
v2LeaderPrefix = "/v2/leader"
v2StoreStatsPrefix = "/v2/stats/store"
v2adminConfigPrefix = "/v2/admin/config"
v2adminMachinesPrefix = "/v2/admin/machines/"
)
var (
tmpErr = fmt.Errorf("try again")
raftStopErr = fmt.Errorf("raft is stopped")
noneId int64 = -1
)
type participant struct {
id int64
pubAddr string
raftPubAddr string
seeds map[string]bool
tickDuration time.Duration
client *v2client
peerHub *peerHub
proposal chan v2Proposal
addNodeC chan raft.Config
removeNodeC chan raft.Config
node *v2Raft
store.Store
rh *raftHandler
stopc chan struct{}
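// The embedded ServeMux serves the participant's client-facing v2 endpoints.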
*http.ServeMux
}
func newParticipant(id int64, pubAddr string, raftPubAddr string, seeds map[string]bool, client *v2client, peerHub *peerHub, tickDuration time.Duration) *participant {
p := &participant{
id: id,
pubAddr: pubAddr,
raftPubAddr: raftPubAddr,
seeds: seeds,
tickDuration: tickDuration,
client: client,
peerHub: peerHub,
proposal: make(chan v2Proposal, maxBufferedProposal),
addNodeC: make(chan raft.Config, 1),
removeNodeC: make(chan raft.Config, 1),
node: &v2Raft{
Node: raft.New(id, defaultHeartbeat, defaultElection),
result: make(map[wait]chan interface{}),
},
Store: store.New(),
rh: newRaftHandler(peerHub),
stopc: make(chan struct{}),
ServeMux: http.NewServeMux(),
}
p.Handle(v2Prefix+"/", handlerErr(p.serveValue))
p.Handle(v2machinePrefix, handlerErr(p.serveMachines))
p.Handle(v2peersPrefix, handlerErr(p.serveMachines))
p.Handle(v2LeaderPrefix, handlerErr(p.serveLeader))
p.Handle(v2StoreStatsPrefix, handlerErr(p.serveStoreStats))
p.Handle(v2adminConfigPrefix, handlerErr(p.serveAdminConfig))
p.Handle(v2adminMachinesPrefix, handlerErr(p.serveAdminMachines))
return p
}
func (p *participant) run() {
if len(p.seeds) == 0 {
log.Println("starting a bootstrap node")
p.node.Campaign()
p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr))
p.apply(p.node.Next())
} else {
log.Println("joining cluster via peers", p.seeds)
p.join()
}
p.rh.start()
defer p.rh.stop()
node := p.node
defer node.StopProposalWaiters()
recv := p.rh.recv
ticker := time.NewTicker(p.tickDuration)
v2SyncTicker := time.NewTicker(time.Millisecond * 500)
var proposal chan v2Proposal
var addNodeC, removeNodeC chan raft.Config
for {
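// Proposals and membership changes are accepted only while the raft node
// has a leader; leaving the channels nil disables those select cases.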
if node.HasLeader() {
proposal = p.proposal
addNodeC = p.addNodeC
removeNodeC = p.removeNodeC
} else {
proposal = nil
addNodeC = nil
removeNodeC = nil
}
select {
case p := <-proposal:
node.Propose(p)
case c := <-addNodeC:
node.UpdateConf(raft.AddNode, &c)
case c := <-removeNodeC:
node.UpdateConf(raft.RemoveNode, &c)
case msg := <-recv:
node.Step(*msg)
case <-ticker.C:
node.Tick()
case <-v2SyncTicker.C:
node.Sync()
case <-p.stopc:
log.Printf("Participant %d stopped\n", p.id)
return
}
p.apply(node.Next())
p.send(node.Msgs())
if node.IsRemoved() {
log.Printf("Participant %d return\n", p.id)
return
}
}
}
func (p *participant) stop() {
close(p.stopc)
}
func (p *participant) raftHandler() http.Handler {
return p.rh
}
func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
_, err := p.Get(pp, false, false)
if err == nil {
return nil
}
if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound {
return err
}
w, err := p.Watch(pp, true, false, 0)
if err != nil {
log.Println("add error:", err)
return tmpErr
}
select {
case p.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
default:
w.Remove()
log.Println("unable to send out addNode proposal")
return tmpErr
}
select {
case v := <-w.EventChan:
if v.Action == store.Set {
return nil
}
log.Println("add error: action =", v.Action)
return tmpErr
case <-time.After(6 * defaultHeartbeat * p.tickDuration):
w.Remove()
log.Println("add error: wait timeout")
return tmpErr
}
}
func (p *participant) remove(id int64) error {
pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
v, err := p.Get(pp, false, false)
if err != nil {
return nil
}
select {
case p.removeNodeC <- raft.Config{NodeId: id}:
default:
log.Println("unable to send out removeNode proposal")
return tmpErr
}
// TODO(xiangli): do not need to watch if the
// removal target is self
w, err := p.Watch(pp, true, false, v.Index()+1)
if err != nil {
log.Println("remove error:", err)
return tmpErr
}
select {
case v := <-w.EventChan:
if v.Action == store.Delete {
return nil
}
log.Println("remove error: action =", v.Action)
return tmpErr
case <-time.After(6 * defaultHeartbeat * p.tickDuration):
w.Remove()
log.Println("remove error: wait timeout")
return tmpErr
}
}
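// apply executes committed raft entries: normal entries go through the v2
// store, while AddNode/RemoveNode entries update the peer hub and the
// machine registry under /_etcd/machines.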
func (p *participant) apply(ents []raft.Entry) {
offset := p.node.Applied() - int64(len(ents)) + 1
for i, ent := range ents {
switch ent.Type {
// expose raft entry type
case raft.Normal:
if len(ent.Data) == 0 {
continue
}
p.v2apply(offset+int64(i), ent)
case raft.AddNode:
cfg := new(raft.Config)
if err := json.Unmarshal(ent.Data, cfg); err != nil {
log.Println(err)
break
}
peer, err := p.peerHub.add(cfg.NodeId, cfg.Addr)
if err != nil {
log.Println(err)
break
}
peer.participate()
log.Printf("Add Node %x %v %v\n", cfg.NodeId, cfg.Addr, string(cfg.Context))
pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
if _, err := p.Store.Set(pp, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent); err == nil {
p.seeds[cfg.Addr] = true
}
case raft.RemoveNode:
cfg := new(raft.Config)
if err := json.Unmarshal(ent.Data, cfg); err != nil {
log.Println(err)
break
}
log.Printf("Remove Node %x\n", cfg.NodeId)
delete(p.seeds, p.fetchAddrFromStore(cfg.NodeId))
peer, err := p.peerHub.peer(cfg.NodeId)
if err != nil {
log.Fatal("cannot get the added peer:", err)
}
peer.idle()
pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
p.Store.Delete(pp, false, false)
default:
panic("unimplemented")
}
}
}
func (p *participant) send(msgs []raft.Message) {
for i := range msgs {
if err := p.peerHub.send(msgs[i]); err != nil {
log.Println("send:", err)
}
}
}
func (p *participant) fetchAddrFromStore(nodeId int64) string {
pp := path.Join(v2machineKVPrefix, fmt.Sprint(nodeId))
if ev, err := p.Get(pp, false, false); err == nil {
if m, err := url.ParseQuery(*ev.Node.Value); err == nil {
return m["raft"][0]
}
}
return ""
}
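// join advertises this node to the seed peers via AddMachine, retrying for
// up to five rounds before giving up.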
func (p *participant) join() {
info := &context{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
ClientURL: p.pubAddr,
PeerURL: p.raftPubAddr,
}
for i := 0; i < 5; i++ {
for seed := range p.seeds {
if err := p.client.AddMachine(seed, fmt.Sprint(p.id), info); err == nil {
return
} else {
log.Println(err)
}
}
time.Sleep(100 * time.Millisecond)
}
}

@ -14,9 +14,9 @@ const (
)
const (
// participant is defined in etcd.go
idle = iota + 1
stopped
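// A participantPeer streams messages through its queue, an idlePeer posts
// messages individually with a bounded number in flight, and a stoppedPeer
// rejects sends.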
participantPeer = iota
idlePeer
stoppedPeer
)
type peer struct {
@ -32,7 +32,7 @@ type peer struct {
func newPeer(url string, c *http.Client) *peer {
return &peer{
url: url,
status: idle,
status: idlePeer,
c: c,
}
}
@ -41,7 +41,7 @@ func (p *peer) participate() {
p.mu.Lock()
defer p.mu.Unlock()
p.queue = make(chan []byte)
p.status = participant
p.status = participantPeer
for i := 0; i < maxInflight; i++ {
p.wg.Add(1)
go p.handle(p.queue)
@ -51,18 +51,18 @@ func (p *peer) participate() {
func (p *peer) idle() {
p.mu.Lock()
defer p.mu.Unlock()
if p.status == participant {
if p.status == participantPeer {
close(p.queue)
}
p.status = idle
p.status = idlePeer
}
func (p *peer) stop() {
p.mu.Lock()
if p.status == participant {
if p.status == participantPeer {
close(p.queue)
}
p.status = stopped
p.status = stoppedPeer
p.mu.Unlock()
p.wg.Wait()
}
@ -79,13 +79,13 @@ func (p *peer) send(d []byte) error {
defer p.mu.Unlock()
switch p.status {
case participant:
case participantPeer:
select {
case p.queue <- d:
default:
return fmt.Errorf("reach max serving")
}
case idle:
case idlePeer:
if p.inflight.Get() > maxInflight {
return fmt.Errorf("reach max idle")
}
@ -94,7 +94,7 @@ func (p *peer) send(d []byte) error {
p.post(d)
p.wg.Done()
}()
case stopped:
case stoppedPeer:
return fmt.Errorf("sender stopped")
}
return nil
@ -122,3 +122,7 @@ func (i *atomicInt) Add(d int64) {
func (i *atomicInt) Get() int64 {
return atomic.LoadInt64((*int64)(i))
}
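// Set atomically stores n; the server uses it to publish mode transitions.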
func (i *atomicInt) Set(n int64) {
atomic.StoreInt64((*int64)(i), n)
}

etcd/standby.go (new file, 137 lines)

@ -0,0 +1,137 @@
package etcd
import (
"fmt"
"log"
"net/http"
"strconv"
"time"
"github.com/coreos/etcd/config"
"github.com/coreos/etcd/store"
)
type standby struct {
id int64
pubAddr string
raftPubAddr string
client *v2client
peerHub *peerHub
nodes map[string]bool
leader int64
leaderAddr string
clusterConf *config.ClusterConfig
stopc chan struct{}
*http.ServeMux
}
func newStandby(id int64, pubAddr string, raftPubAddr string, nodes map[string]bool, client *v2client, peerHub *peerHub) *standby {
s := &standby{
id: id,
pubAddr: pubAddr,
raftPubAddr: raftPubAddr,
client: client,
peerHub: peerHub,
nodes: nodes,
leader: noneId,
leaderAddr: "",
clusterConf: config.NewClusterConfig(),
stopc: make(chan struct{}),
ServeMux: http.NewServeMux(),
}
s.Handle("/", handlerErr(s.serveRedirect))
return s
}
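// run keeps the standby node in sync with the cluster and rejoins through
// the leader once the active size allows it, then returns so the server
// can switch back to participant mode.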
func (s *standby) run() {
var syncDuration time.Duration
for {
select {
case <-time.After(syncDuration):
case <-s.stopc:
log.Printf("Standby %d stopped\n", s.id)
return
}
if err := s.syncCluster(); err != nil {
log.Println("standby sync:", err)
continue
}
syncDuration = time.Duration(s.clusterConf.SyncInterval * float64(time.Second))
if s.clusterConf.ActiveSize <= len(s.nodes) {
continue
}
if err := s.joinByAddr(s.leaderAddr); err != nil {
log.Println("standby join:", err)
continue
}
return
}
}
func (s *standby) stop() {
close(s.stopc)
}
func (s *standby) serveRedirect(w http.ResponseWriter, r *http.Request) error {
if s.leader == noneId {
return fmt.Errorf("no leader in the cluster")
}
redirectAddr, err := buildRedirectURL(s.leaderAddr, r.URL)
if err != nil {
return err
}
http.Redirect(w, r, redirectAddr, http.StatusTemporaryRedirect)
return nil
}
func (s *standby) syncCluster() error {
for node := range s.nodes {
machines, err := s.client.GetMachines(node)
if err != nil {
continue
}
config, err := s.client.GetClusterConfig(node)
if err != nil {
continue
}
s.nodes = make(map[string]bool)
for _, machine := range machines {
s.nodes[machine.PeerURL] = true
if machine.State == stateLeader {
id, err := strconv.ParseInt(machine.Name, 0, 64)
if err != nil {
return err
}
s.leader = id
s.leaderAddr = machine.PeerURL
}
}
s.clusterConf = config
return nil
}
return fmt.Errorf("unreachable cluster")
}
func (s *standby) joinByAddr(addr string) error {
info := &context{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
ClientURL: s.pubAddr,
PeerURL: s.raftPubAddr,
}
if err := s.client.AddMachine(s.leaderAddr, fmt.Sprint(s.id), info); err != nil {
return err
}
return nil
}

@ -34,19 +34,19 @@ type context struct {
PeerURL string `json:"peerURL"`
}
func (s *Server) serveAdminConfig(w http.ResponseWriter, r *http.Request) error {
func (p *participant) serveAdminConfig(w http.ResponseWriter, r *http.Request) error {
switch r.Method {
case "GET":
case "PUT":
if !s.node.IsLeader() {
return s.redirect(w, r, s.node.Leader())
if !p.node.IsLeader() {
return p.redirect(w, r, p.node.Leader())
}
c := s.ClusterConfig()
c := p.clusterConfig()
if err := json.NewDecoder(r.Body).Decode(c); err != nil {
return err
}
c.Sanitize()
if err := s.setClusterConfig(c); err != nil {
if err := p.setClusterConfig(c); err != nil {
return err
}
default:
@ -54,20 +54,20 @@ func (s *Server) serveAdminConfig(w http.ResponseWriter, r *http.Request) error
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(s.ClusterConfig())
json.NewEncoder(w).Encode(p.clusterConfig())
return nil
}
func (s *Server) serveAdminMachines(w http.ResponseWriter, r *http.Request) error {
func (p *participant) serveAdminMachines(w http.ResponseWriter, r *http.Request) error {
name := strings.TrimPrefix(r.URL.Path, v2adminMachinesPrefix)
switch r.Method {
case "GET":
var info interface{}
var err error
if name != "" {
info, err = s.someMachineMessage(name)
info, err = p.someMachineMessage(name)
} else {
info, err = s.allMachineMessages()
info, err = p.allMachineMessages()
}
if err != nil {
return err
@ -75,8 +75,8 @@ func (s *Server) serveAdminMachines(w http.ResponseWriter, r *http.Request) erro
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(info)
case "PUT":
if !s.node.IsLeader() {
return s.redirect(w, r, s.node.Leader())
if !p.node.IsLeader() {
return p.redirect(w, r, p.node.Leader())
}
id, err := strconv.ParseInt(name, 0, 64)
if err != nil {
@ -86,60 +86,60 @@ func (s *Server) serveAdminMachines(w http.ResponseWriter, r *http.Request) erro
if err := json.NewDecoder(r.Body).Decode(info); err != nil {
return err
}
return s.Add(id, info.PeerURL, info.ClientURL)
return p.add(id, info.PeerURL, info.ClientURL)
case "DELETE":
if !s.node.IsLeader() {
return s.redirect(w, r, s.node.Leader())
if !p.node.IsLeader() {
return p.redirect(w, r, p.node.Leader())
}
id, err := strconv.ParseInt(name, 0, 64)
if err != nil {
return err
}
return s.Remove(id)
return p.remove(id)
default:
return allow(w, "GET", "PUT", "DELETE")
}
return nil
}
func (s *Server) ClusterConfig() *config.ClusterConfig {
func (p *participant) clusterConfig() *config.ClusterConfig {
c := config.NewClusterConfig()
// This is kept for backward compatibility because older versions
// don't set the cluster config.
if e, err := s.Get(v2configKVPrefix, false, false); err == nil {
if e, err := p.Get(v2configKVPrefix, false, false); err == nil {
json.Unmarshal([]byte(*e.Node.Value), c)
}
return c
}
func (s *Server) setClusterConfig(c *config.ClusterConfig) error {
func (p *participant) setClusterConfig(c *config.ClusterConfig) error {
b, err := json.Marshal(c)
if err != nil {
return err
}
if _, err := s.Set(v2configKVPrefix, false, string(b), store.Permanent); err != nil {
if _, err := p.Set(v2configKVPrefix, false, string(b), store.Permanent); err != nil {
return err
}
return nil
}
// someMachineMessage returns the machine message for the machine with the given name.
func (s *Server) someMachineMessage(name string) (*machineMessage, error) {
p := filepath.Join(v2machineKVPrefix, name)
e, err := s.Get(p, false, false)
func (p *participant) someMachineMessage(name string) (*machineMessage, error) {
pp := filepath.Join(v2machineKVPrefix, name)
e, err := p.Get(pp, false, false)
if err != nil {
return nil, err
}
lead := fmt.Sprint(s.node.Leader())
lead := fmt.Sprint(p.node.Leader())
return newMachineMessage(e.Node, lead), nil
}
func (s *Server) allMachineMessages() ([]*machineMessage, error) {
e, err := s.Get(v2machineKVPrefix, false, false)
func (p *participant) allMachineMessages() ([]*machineMessage, error) {
e, err := p.Get(v2machineKVPrefix, false, false)
if err != nil {
return nil, err
}
lead := fmt.Sprint(s.node.Leader())
lead := fmt.Sprint(p.node.Leader())
ms := make([]*machineMessage, len(e.Node.Nodes))
for i, n := range e.Node.Nodes {
ms[i] = newMachineMessage(n, lead)

@ -9,7 +9,7 @@ import (
"github.com/coreos/etcd/store"
)
func (s *Server) v2apply(index int64, ent raft.Entry) {
func (p *participant) v2apply(index int64, ent raft.Entry) {
var ret interface{}
var e *store.Event
var err error
@ -22,36 +22,36 @@ func (s *Server) v2apply(index int64, ent raft.Entry) {
switch cmd.Type {
case "set":
e, err = s.Store.Set(cmd.Key, cmd.Dir, cmd.Value, cmd.Time)
e, err = p.Store.Set(cmd.Key, cmd.Dir, cmd.Value, cmd.Time)
case "update":
e, err = s.Store.Update(cmd.Key, cmd.Value, cmd.Time)
e, err = p.Store.Update(cmd.Key, cmd.Value, cmd.Time)
case "create", "unique":
e, err = s.Store.Create(cmd.Key, cmd.Dir, cmd.Value, cmd.Unique, cmd.Time)
e, err = p.Store.Create(cmd.Key, cmd.Dir, cmd.Value, cmd.Unique, cmd.Time)
case "delete":
e, err = s.Store.Delete(cmd.Key, cmd.Dir, cmd.Recursive)
e, err = p.Store.Delete(cmd.Key, cmd.Dir, cmd.Recursive)
case "cad":
e, err = s.Store.CompareAndDelete(cmd.Key, cmd.PrevValue, cmd.PrevIndex)
e, err = p.Store.CompareAndDelete(cmd.Key, cmd.PrevValue, cmd.PrevIndex)
case "cas":
e, err = s.Store.CompareAndSwap(cmd.Key, cmd.PrevValue, cmd.PrevIndex, cmd.Value, cmd.Time)
e, err = p.Store.CompareAndSwap(cmd.Key, cmd.PrevValue, cmd.PrevIndex, cmd.Value, cmd.Time)
case "sync":
s.Store.DeleteExpiredKeys(cmd.Time)
p.Store.DeleteExpiredKeys(cmd.Time)
return
default:
log.Println("unexpected command type:", cmd.Type)
}
if ent.Term > s.node.term {
s.node.term = ent.Term
for k, v := range s.node.result {
if k.term < s.node.term {
if ent.Term > p.node.term {
p.node.term = ent.Term
for k, v := range p.node.result {
if k.term < p.node.term {
v <- fmt.Errorf("proposal lost due to leader election")
delete(s.node.result, k)
delete(p.node.result, k)
}
}
}
w := wait{index, ent.Term}
if s.node.result[w] == nil {
if p.node.result[w] == nil {
return
}
@ -60,6 +60,6 @@ func (s *Server) v2apply(index int64, ent raft.Entry) {
} else {
ret = e
}
s.node.result[w] <- ret
delete(s.node.result, w)
p.node.result[w] <- ret
delete(p.node.result, w)
}

@ -10,28 +10,28 @@ import (
etcdErr "github.com/coreos/etcd/error"
)
func (s *Server) serveValue(w http.ResponseWriter, r *http.Request) error {
func (p *participant) serveValue(w http.ResponseWriter, r *http.Request) error {
switch r.Method {
case "GET":
return s.GetHandler(w, r)
return p.GetHandler(w, r)
case "HEAD":
w = &HEADResponseWriter{w}
return s.GetHandler(w, r)
return p.GetHandler(w, r)
case "PUT":
return s.PutHandler(w, r)
return p.PutHandler(w, r)
case "POST":
return s.PostHandler(w, r)
return p.PostHandler(w, r)
case "DELETE":
return s.DeleteHandler(w, r)
return p.DeleteHandler(w, r)
}
return allow(w, "GET", "PUT", "POST", "DELETE", "HEAD")
}
func (s *Server) serveMachines(w http.ResponseWriter, r *http.Request) error {
func (p *participant) serveMachines(w http.ResponseWriter, r *http.Request) error {
if r.Method != "GET" {
return allow(w, "GET")
}
v, err := s.Store.Get(v2machineKVPrefix, false, false)
v, err := p.Store.Get(v2machineKVPrefix, false, false)
if err != nil {
panic(err)
}
@ -47,20 +47,20 @@ func (s *Server) serveMachines(w http.ResponseWriter, r *http.Request) error {
return nil
}
func (s *Server) serveLeader(w http.ResponseWriter, r *http.Request) error {
func (p *participant) serveLeader(w http.ResponseWriter, r *http.Request) error {
if r.Method != "GET" {
return allow(w, "GET")
}
if p, ok := s.peerHub.peers[s.node.Leader()]; ok {
if p, ok := p.peerHub.peers[p.node.Leader()]; ok {
w.Write([]byte(p.url))
return nil
}
return fmt.Errorf("no leader")
}
func (s *Server) serveStoreStats(w http.ResponseWriter, req *http.Request) error {
func (p *participant) serveStoreStats(w http.ResponseWriter, req *http.Request) error {
w.Header().Set("Content-Type", "application/json")
w.Write(s.Store.JsonStats())
w.Write(p.Store.JsonStats())
return nil
}
@ -99,8 +99,8 @@ func (w *HEADResponseWriter) Write([]byte) (int, error) {
return 0, nil
}
func (s *Server) redirect(w http.ResponseWriter, r *http.Request, id int64) error {
e, err := s.Store.Get(fmt.Sprintf("%v/%d", v2machineKVPrefix, s.node.Leader()), false, false)
func (p *participant) redirect(w http.ResponseWriter, r *http.Request, id int64) error {
e, err := p.Store.Get(fmt.Sprintf("%v/%d", v2machineKVPrefix, p.node.Leader()), false, false)
if err != nil {
log.Println("redirect cannot find node", id)
return fmt.Errorf("redirect cannot find node %d", id)
@ -111,7 +111,7 @@ func (s *Server) redirect(w http.ResponseWriter, r *http.Request, id int64) erro
return fmt.Errorf("failed to parse node entry: %s", *e.Node.Value)
}
redirectAddr, err := s.buildRedirectURL(m["etcd"][0], r.URL)
redirectAddr, err := buildRedirectURL(m["etcd"][0], r.URL)
if err != nil {
log.Println("redirect cannot build new url:", err)
return err
@ -121,7 +121,7 @@ func (s *Server) redirect(w http.ResponseWriter, r *http.Request, id int64) erro
return nil
}
func (s *Server) buildRedirectURL(redirectAddr string, originalURL *url.URL) (string, error) {
func buildRedirectURL(redirectAddr string, originalURL *url.URL) (string, error) {
redirectURL, err := url.Parse(redirectAddr)
if err != nil {
return "", fmt.Errorf("redirect cannot parse url: %v", err)

@ -8,9 +8,9 @@ import (
etcdErr "github.com/coreos/etcd/error"
)
func (s *Server) DeleteHandler(w http.ResponseWriter, req *http.Request) error {
if !s.node.IsLeader() {
return s.redirect(w, req, s.node.Leader())
func (p *participant) DeleteHandler(w http.ResponseWriter, req *http.Request) error {
if !p.node.IsLeader() {
return p.redirect(w, req, p.node.Leader())
}
key := req.URL.Path[len("/v2/keys"):]
@ -23,7 +23,7 @@ func (s *Server) DeleteHandler(w http.ResponseWriter, req *http.Request) error {
_, indexOk := req.Form["prevIndex"]
if !valueOk && !indexOk {
return s.serveDelete(w, req, key, dir, recursive)
return p.serveDelete(w, req, key, dir, recursive)
}
var err error
@ -36,32 +36,32 @@ func (s *Server) DeleteHandler(w http.ResponseWriter, req *http.Request) error {
// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndDelete", s.Store.Index())
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndDelete", p.Store.Index())
}
}
if valueOk {
if prevValue == "" {
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndDelete", s.Store.Index())
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndDelete", p.Store.Index())
}
}
return s.serveCAD(w, req, key, prevValue, prevIndex)
return p.serveCAD(w, req, key, prevValue, prevIndex)
}
func (s *Server) serveDelete(w http.ResponseWriter, req *http.Request, key string, dir, recursive bool) error {
ret, err := s.Delete(key, dir, recursive)
func (p *participant) serveDelete(w http.ResponseWriter, req *http.Request, key string, dir, recursive bool) error {
ret, err := p.Delete(key, dir, recursive)
if err == nil {
s.handleRet(w, ret)
p.handleRet(w, ret)
return nil
}
log.Println("delete:", err)
return err
}
func (s *Server) serveCAD(w http.ResponseWriter, req *http.Request, key string, prevValue string, prevIndex uint64) error {
ret, err := s.CAD(key, prevValue, prevIndex)
func (p *participant) serveCAD(w http.ResponseWriter, req *http.Request, key string, prevValue string, prevIndex uint64) error {
ret, err := p.CAD(key, prevValue, prevIndex)
if err == nil {
s.handleRet(w, ret)
p.handleRet(w, ret)
return nil
}
log.Println("cad:", err)

@ -200,7 +200,7 @@ func TestPutAdminConfigEndPoint(t *testing.T) {
barrier(t, 0, es)
for j := range es {
e, err := es[j].Get(v2configKVPrefix, false, false)
e, err := es[j].p.Get(v2configKVPrefix, false, false)
if err != nil {
t.Errorf("%v", err)
continue
@ -321,17 +321,17 @@ func TestGetAdminMachinesEndPoint(t *testing.T) {
// barrier ensures that every server has advanced its applied index at least
// as far as the base server's.
func barrier(t *testing.T, base int, es []*Server) {
applied := es[base].node.Applied()
applied := es[base].p.node.Applied()
// give goroutines time to be scheduled
time.Sleep(5 * time.Millisecond)
for i, e := range es {
for j := 0; ; j++ {
if e.node.Applied() >= applied {
if e.p.node.Applied() >= applied {
break
}
time.Sleep(defaultHeartbeat * defaultTickDuration)
if j == 2 {
t.Fatalf("#%d: applied = %d, want >= %d", i, e.node.Applied(), applied)
t.Fatalf("#%d: applied = %d, want >= %d", i, e.p.node.Applied(), applied)
}
}
}

@ -9,7 +9,7 @@ import (
etcdErr "github.com/coreos/etcd/error"
)
func (s *Server) GetHandler(w http.ResponseWriter, req *http.Request) error {
func (p *participant) GetHandler(w http.ResponseWriter, req *http.Request) error {
key := req.URL.Path[len("/v2/keys"):]
// TODO(xiangli): handle consistent get
recursive := (req.FormValue("recursive") == "true")
@ -17,12 +17,12 @@ func (s *Server) GetHandler(w http.ResponseWriter, req *http.Request) error {
waitIndex := req.FormValue("waitIndex")
stream := (req.FormValue("stream") == "true")
if req.FormValue("wait") == "true" {
return s.handleWatch(key, recursive, stream, waitIndex, w, req)
return p.handleWatch(key, recursive, stream, waitIndex, w, req)
}
return s.handleGet(key, recursive, sort, w, req)
return p.handleGet(key, recursive, sort, w, req)
}
func (s *Server) handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request) error {
func (p *participant) handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request) error {
// Create a command to watch from a given index (default 0).
var sinceIndex uint64 = 0
var err error
@ -30,11 +30,11 @@ func (s *Server) handleWatch(key string, recursive, stream bool, waitIndex strin
if waitIndex != "" {
sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store.Index())
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", p.Store.Index())
}
}
watcher, err := s.Store.Watch(key, recursive, stream, sinceIndex)
watcher, err := p.Store.Watch(key, recursive, stream, sinceIndex)
if err != nil {
return err
}
@ -42,7 +42,7 @@ func (s *Server) handleWatch(key string, recursive, stream bool, waitIndex strin
cn, _ := w.(http.CloseNotifier)
closeChan := cn.CloseNotify()
s.writeHeaders(w)
p.writeHeaders(w)
if stream {
// watcher hub will not help to remove stream watcher
@ -86,12 +86,12 @@ func (s *Server) handleWatch(key string, recursive, stream bool, waitIndex strin
return nil
}
func (s *Server) handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request) error {
event, err := s.Store.Get(key, recursive, sort)
func (p *participant) handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request) error {
event, err := p.Store.Get(key, recursive, sort)
if err != nil {
return err
}
s.writeHeaders(w)
p.writeHeaders(w)
if req.Method == "HEAD" {
return nil
}
@ -103,9 +103,9 @@ func (s *Server) handleGet(key string, recursive, sort bool, w http.ResponseWrit
return nil
}
func (s *Server) writeHeaders(w http.ResponseWriter) {
func (p *participant) writeHeaders(w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json")
w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store.Index()))
w.Header().Add("X-Etcd-Index", fmt.Sprint(p.Store.Index()))
// TODO(xiangli): raft-index and term
w.WriteHeader(http.StatusOK)
}

@ -8,9 +8,9 @@ import (
"github.com/coreos/etcd/store"
)
func (s *Server) PostHandler(w http.ResponseWriter, req *http.Request) error {
if !s.node.IsLeader() {
return s.redirect(w, req, s.node.Leader())
func (p *participant) PostHandler(w http.ResponseWriter, req *http.Request) error {
if !p.node.IsLeader() {
return p.redirect(w, req, p.node.Leader())
}
key := req.URL.Path[len("/v2/keys"):]
@ -19,12 +19,12 @@ func (s *Server) PostHandler(w http.ResponseWriter, req *http.Request) error {
dir := (req.FormValue("dir") == "true")
expireTime, err := store.TTL(req.FormValue("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store.Index())
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", p.Store.Index())
}
ret, err := s.Create(key, dir, value, expireTime, true)
ret, err := p.Create(key, dir, value, expireTime, true)
if err == nil {
s.handleRet(w, ret)
p.handleRet(w, ret)
return nil
}
log.Println("unique:", err)

@ -13,9 +13,9 @@ import (
"github.com/coreos/etcd/store"
)
func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error {
if !s.node.IsLeader() {
return s.redirect(w, req, s.node.Leader())
func (p *participant) PutHandler(w http.ResponseWriter, req *http.Request) error {
if !p.node.IsLeader() {
return p.redirect(w, req, p.node.Leader())
}
key := req.URL.Path[len("/v2/keys"):]
@ -27,7 +27,7 @@ func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error {
expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", s.Store.Index())
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", p.Store.Index())
}
prevValue, valueOk := firstValue(req.Form, "prevValue")
@ -36,7 +36,7 @@ func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error {
// Set handler: create a new node or replace the old one.
if !valueOk && !indexOk && !existOk {
return s.serveSet(w, req, key, dir, value, expireTime)
return p.serveSet(w, req, key, dir, value, expireTime)
}
// update with test
@ -44,11 +44,11 @@ func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error {
if prevExist == "false" {
// Create command: create a new node. Fail, if a node already exists
// Ignore prevIndex and prevValue
return s.serveCreate(w, req, key, dir, value, expireTime)
return p.serveCreate(w, req, key, dir, value, expireTime)
}
if prevExist == "true" && !indexOk && !valueOk {
return s.serveUpdate(w, req, key, value, expireTime)
return p.serveUpdate(w, req, key, value, expireTime)
}
}
@ -59,7 +59,7 @@ func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error {
// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", s.Store.Index())
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", p.Store.Index())
}
} else {
prevIndex = 0
@ -67,22 +67,22 @@ func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error {
if valueOk {
if prevValue == "" {
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", s.Store.Index())
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", p.Store.Index())
}
}
return s.serveCAS(w, req, key, value, prevValue, prevIndex, expireTime)
return p.serveCAS(w, req, key, value, prevValue, prevIndex, expireTime)
}
func (s *Server) handleRet(w http.ResponseWriter, ret *store.Event) {
func (p *participant) handleRet(w http.ResponseWriter, ret *store.Event) {
b, _ := json.Marshal(ret)
w.Header().Set("Content-Type", "application/json")
// etcd index should be the same as the event index
// which is also the last modified index of the node
w.Header().Add("X-Etcd-Index", fmt.Sprint(ret.Index()))
// w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
// w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
// w.Header().Add("X-Raft-Index", fmt.Sprint(p.CommitIndex()))
// w.Header().Add("X-Raft-Term", fmt.Sprint(p.Term()))
if ret.IsCreated() {
w.WriteHeader(http.StatusCreated)
@ -93,44 +93,44 @@ func (s *Server) handleRet(w http.ResponseWriter, ret *store.Event) {
w.Write(b)
}
func (s *Server) serveSet(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error {
ret, err := s.Set(key, dir, value, expireTime)
func (p *participant) serveSet(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error {
ret, err := p.Set(key, dir, value, expireTime)
if err == nil {
s.handleRet(w, ret)
p.handleRet(w, ret)
return nil
}
log.Println("set:", err)
return err
}
func (s *Server) serveCreate(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error {
ret, err := s.Create(key, dir, value, expireTime, false)
func (p *participant) serveCreate(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error {
ret, err := p.Create(key, dir, value, expireTime, false)
if err == nil {
s.handleRet(w, ret)
p.handleRet(w, ret)
return nil
}
log.Println("create:", err)
return err
}
func (s *Server) serveUpdate(w http.ResponseWriter, req *http.Request, key, value string, expireTime time.Time) error {
func (p *participant) serveUpdate(w http.ResponseWriter, req *http.Request, key, value string, expireTime time.Time) error {
// Update should give at least one option
if value == "" && expireTime.Sub(store.Permanent) == 0 {
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", s.Store.Index())
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", p.Store.Index())
}
ret, err := s.Update(key, value, expireTime)
ret, err := p.Update(key, value, expireTime)
if err == nil {
s.handleRet(w, ret)
p.handleRet(w, ret)
return nil
}
log.Println("update:", err)
return err
}
func (s *Server) serveCAS(w http.ResponseWriter, req *http.Request, key, value, prevValue string, prevIndex uint64, expireTime time.Time) error {
ret, err := s.CAS(key, value, prevValue, prevIndex, expireTime)
func (p *participant) serveCAS(w http.ResponseWriter, req *http.Request, key, value, prevValue string, prevIndex uint64, expireTime time.Time) error {
ret, err := p.CAS(key, value, prevValue, prevIndex, expireTime)
if err == nil {
s.handleRet(w, ret)
p.handleRet(w, ret)
return nil
}
log.Println("update:", err)

@ -1,47 +0,0 @@
package etcd
import (
"fmt"
"net/http"
"strconv"
)
func (s *Server) serveRedirect(w http.ResponseWriter, r *http.Request) error {
if s.leader == noneId {
return fmt.Errorf("no leader in the cluster")
}
redirectAddr, err := s.buildRedirectURL(s.leaderAddr, r.URL)
if err != nil {
return err
}
http.Redirect(w, r, redirectAddr, http.StatusTemporaryRedirect)
return nil
}
func (s *Server) syncCluster() error {
for node := range s.nodes {
machines, err := s.client.GetMachines(node)
if err != nil {
continue
}
config, err := s.client.GetClusterConfig(node)
if err != nil {
continue
}
s.nodes = make(map[string]bool)
for _, machine := range machines {
s.nodes[machine.PeerURL] = true
if machine.State == stateLeader {
id, err := strconv.ParseInt(machine.Name, 0, 64)
if err != nil {
return err
}
s.leader = id
s.leaderAddr = machine.PeerURL
}
}
s.clusterConf = config
return nil
}
return fmt.Errorf("unreachable cluster")
}

@ -20,57 +20,54 @@ type cmd struct {
Time time.Time
}
func (s *Server) Set(key string, dir bool, value string, expireTime time.Time) (*store.Event, error) {
func (p *participant) Set(key string, dir bool, value string, expireTime time.Time) (*store.Event, error) {
set := &cmd{Type: "set", Key: key, Dir: dir, Value: value, Time: expireTime}
return s.do(set)
return p.do(set)
}
func (s *Server) Create(key string, dir bool, value string, expireTime time.Time, unique bool) (*store.Event, error) {
func (p *participant) Create(key string, dir bool, value string, expireTime time.Time, unique bool) (*store.Event, error) {
create := &cmd{Type: "create", Key: key, Dir: dir, Value: value, Time: expireTime, Unique: unique}
return s.do(create)
return p.do(create)
}
func (s *Server) Update(key string, value string, expireTime time.Time) (*store.Event, error) {
func (p *participant) Update(key string, value string, expireTime time.Time) (*store.Event, error) {
update := &cmd{Type: "update", Key: key, Value: value, Time: expireTime}
return s.do(update)
return p.do(update)
}
func (s *Server) CAS(key, value, prevValue string, prevIndex uint64, expireTime time.Time) (*store.Event, error) {
func (p *participant) CAS(key, value, prevValue string, prevIndex uint64, expireTime time.Time) (*store.Event, error) {
cas := &cmd{Type: "cas", Key: key, Value: value, PrevValue: prevValue, PrevIndex: prevIndex, Time: expireTime}
return s.do(cas)
return p.do(cas)
}
func (s *Server) Delete(key string, dir, recursive bool) (*store.Event, error) {
func (p *participant) Delete(key string, dir, recursive bool) (*store.Event, error) {
d := &cmd{Type: "delete", Key: key, Dir: dir, Recursive: recursive}
return s.do(d)
return p.do(d)
}
func (s *Server) CAD(key string, prevValue string, prevIndex uint64) (*store.Event, error) {
func (p *participant) CAD(key string, prevValue string, prevIndex uint64) (*store.Event, error) {
cad := &cmd{Type: "cad", Key: key, PrevValue: prevValue, PrevIndex: prevIndex}
return s.do(cad)
return p.do(cad)
}
func (s *Server) do(c *cmd) (*store.Event, error) {
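// do marshals the command into a v2 proposal, hands it to the raft run
// loop, and blocks until the applied result or an error comes back.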
func (p *participant) do(c *cmd) (*store.Event, error) {
data, err := json.Marshal(c)
if err != nil {
panic(err)
}
p := v2Proposal{
pp := v2Proposal{
data: data,
ret: make(chan interface{}, 1),
}
if s.mode != participant {
return nil, raftStopErr
}
select {
case s.proposal <- p:
case p.proposal <- pp:
default:
return nil, fmt.Errorf("unable to send out the proposal")
}
switch t := (<-p.ret).(type) {
switch t := (<-pp.ret).(type) {
case *store.Event:
return t, nil
case error: