Merge branch 'refactoring' of https://github.com/benbjohnson/etcd into refactoring

Conflicts:
	server/registry.go
release-0.4
Ben Johnson 2013-10-13 15:04:21 -06:00
commit 55c1f45805
24 changed files with 897 additions and 907 deletions

View File

@ -32,7 +32,6 @@ func TestSingleNode(t *testing.T) {
time.Sleep(time.Second)
etcd.OpenDebug()
c := etcd.NewClient()
c.SyncCluster()

View File

@ -14,18 +14,18 @@ func init() {
// The JoinCommand adds a node to the cluster.
type JoinCommand struct {
RaftVersion string `json:"raftVersion"`
Name string `json:"name"`
RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"`
RaftVersion string `json:"raftVersion"`
Name string `json:"name"`
RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"`
}
func NewJoinCommand(version, name, raftUrl, etcdUrl string) *JoinCommand {
return &JoinCommand{
RaftVersion: version,
Name: name,
RaftURL: raftUrl,
EtcdURL: etcdUrl,
RaftVersion: version,
Name: name,
RaftURL: raftUrl,
EtcdURL: etcdUrl,
}
}

View File

@ -1,363 +1,362 @@
package server
import (
"bytes"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"bytes"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
)
type PeerServer struct {
*raft.Server
server *Server
joinIndex uint64
name string
url string
listenHost string
tlsConf *TLSConfig
tlsInfo *TLSInfo
followersStats *raftFollowersStats
serverStats *raftServerStats
registry *Registry
store *store.Store
snapConf *snapshotConf
MaxClusterSize int
RetryTimes int
*raft.Server
server *Server
joinIndex uint64
name string
url string
listenHost string
tlsConf *TLSConfig
tlsInfo *TLSInfo
followersStats *raftFollowersStats
serverStats *raftServerStats
registry *Registry
store *store.Store
snapConf *snapshotConf
MaxClusterSize int
RetryTimes int
}
// TODO: find a good policy to do snapshot
type snapshotConf struct {
// Etcd will check if snapshot is need every checkingInterval
checkingInterval time.Duration
// Etcd will check if snapshot is need every checkingInterval
checkingInterval time.Duration
// The number of writes when the last snapshot happened
lastWrites uint64
// The number of writes when the last snapshot happened
lastWrites uint64
// If the incremental number of writes since the last snapshot
// exceeds the write Threshold, etcd will do a snapshot
writesThr uint64
// If the incremental number of writes since the last snapshot
// exceeds the write Threshold, etcd will do a snapshot
writesThr uint64
}
func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store *store.Store) *PeerServer {
s := &PeerServer{
name: name,
url: url,
listenHost: listenHost,
tlsConf: tlsConf,
tlsInfo: tlsInfo,
registry: registry,
store: store,
snapConf: &snapshotConf{time.Second * 3, 0, 20 * 1000},
followersStats: &raftFollowersStats{
Leader: name,
Followers: make(map[string]*raftFollowerStats),
},
serverStats: &raftServerStats{
StartTime: time.Now(),
sendRateQueue: &statsQueue{
back: -1,
},
recvRateQueue: &statsQueue{
back: -1,
},
},
}
s := &PeerServer{
name: name,
url: url,
listenHost: listenHost,
tlsConf: tlsConf,
tlsInfo: tlsInfo,
registry: registry,
store: store,
snapConf: &snapshotConf{time.Second * 3, 0, 20 * 1000},
followersStats: &raftFollowersStats{
Leader: name,
Followers: make(map[string]*raftFollowerStats),
},
serverStats: &raftServerStats{
StartTime: time.Now(),
sendRateQueue: &statsQueue{
back: -1,
},
recvRateQueue: &statsQueue{
back: -1,
},
},
}
// Create transporter for raft
raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s)
// Create transporter for raft
raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s)
// Create raft server
server, err := raft.NewServer(name, path, raftTransporter, s.store, s, "")
if err != nil {
log.Fatal(err)
}
// Create raft server
server, err := raft.NewServer(name, path, raftTransporter, s.store, s, "")
if err != nil {
log.Fatal(err)
}
s.Server = server
s.Server = server
return s
return s
}
// Start the raft server
func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
// LoadSnapshot
if snapshot {
err := s.LoadSnapshot()
// LoadSnapshot
if snapshot {
err := s.LoadSnapshot()
if err == nil {
log.Debugf("%s finished load snapshot", s.name)
} else {
log.Debug(err)
}
}
if err == nil {
log.Debugf("%s finished load snapshot", s.name)
} else {
log.Debug(err)
}
}
s.SetElectionTimeout(ElectionTimeout)
s.SetHeartbeatTimeout(HeartbeatTimeout)
s.SetElectionTimeout(ElectionTimeout)
s.SetHeartbeatTimeout(HeartbeatTimeout)
s.Start()
s.Start()
if s.IsLogEmpty() {
// start as a leader in a new cluster
if len(cluster) == 0 {
s.startAsLeader()
} else {
s.startAsFollower(cluster)
}
if s.IsLogEmpty() {
// start as a leader in a new cluster
if len(cluster) == 0 {
s.startAsLeader()
} else {
s.startAsFollower(cluster)
}
} else {
// Rejoin the previous cluster
cluster = s.registry.PeerURLs(s.Leader(), s.name)
for i := 0; i < len(cluster); i++ {
u, err := url.Parse(cluster[i])
if err != nil {
log.Debug("rejoin cannot parse url: ", err)
}
cluster[i] = u.Host
}
ok := s.joinCluster(cluster)
if !ok {
log.Warn("the entire cluster is down! this machine will restart the cluster.")
}
} else {
// Rejoin the previous cluster
cluster = s.registry.PeerURLs(s.Leader(), s.name)
for i := 0; i < len(cluster); i++ {
u, err := url.Parse(cluster[i])
if err != nil {
log.Debug("rejoin cannot parse url: ", err)
}
cluster[i] = u.Host
}
ok := s.joinCluster(cluster)
if !ok {
log.Warn("the entire cluster is down! this machine will restart the cluster.")
}
log.Debugf("%s restart as a follower", s.name)
}
log.Debugf("%s restart as a follower", s.name)
}
// open the snapshot
if snapshot {
go s.monitorSnapshot()
}
// open the snapshot
if snapshot {
go s.monitorSnapshot()
}
// start to response to raft requests
go s.startTransport(s.tlsConf.Scheme, s.tlsConf.Server)
// start to response to raft requests
go s.startTransport(s.tlsConf.Scheme, s.tlsConf.Server)
}
// Retrieves the underlying Raft server.
func (s *PeerServer) RaftServer() *raft.Server {
return s.Server
return s.Server
}
// Associates the client server with the peer server.
func (s *PeerServer) SetServer(server *Server) {
s.server = server
s.server = server
}
// Get all the current logs
func (s *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] GET %s/log", s.url)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(s.LogEntries())
log.Debugf("[recv] GET %s/log", s.url)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(s.LogEntries())
}
// Response to vote request
func (s *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
rvreq := &raft.RequestVoteRequest{}
err := decodeJsonRequest(req, rvreq)
if err == nil {
log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName)
if resp := s.RequestVote(rvreq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
}
}
log.Warnf("[vote] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
rvreq := &raft.RequestVoteRequest{}
err := decodeJsonRequest(req, rvreq)
if err == nil {
log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName)
if resp := s.RequestVote(rvreq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
}
}
log.Warnf("[vote] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
// Response to append entries request
func (s *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.AppendEntriesRequest{}
err := decodeJsonRequest(req, aereq)
aereq := &raft.AppendEntriesRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
log.Debugf("[recv] POST %s/log/append [%d]", s.url, len(aereq.Entries))
if err == nil {
log.Debugf("[recv] POST %s/log/append [%d]", s.url, len(aereq.Entries))
s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
if resp := s.AppendEntries(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
if !resp.Success {
log.Debugf("[Append Entry] Step back")
}
return
}
}
log.Warnf("[Append Entry] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
if resp := s.AppendEntries(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
if !resp.Success {
log.Debugf("[Append Entry] Step back")
}
return
}
}
log.Warnf("[Append Entry] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
// Response to recover from snapshot request
func (s *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.SnapshotRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
log.Debugf("[recv] POST %s/snapshot/ ", s.url)
if resp := s.RequestSnapshot(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
}
}
log.Warnf("[Snapshot] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
aereq := &raft.SnapshotRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
log.Debugf("[recv] POST %s/snapshot/ ", s.url)
if resp := s.RequestSnapshot(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
}
}
log.Warnf("[Snapshot] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
// Response to recover from snapshot request
func (s *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.SnapshotRecoveryRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url)
if resp := s.SnapshotRecoveryRequest(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
}
}
log.Warnf("[Snapshot] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
aereq := &raft.SnapshotRecoveryRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url)
if resp := s.SnapshotRecoveryRequest(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
}
}
log.Warnf("[Snapshot] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
// Get the port that listening for etcd connecting of the server
func (s *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/etcdURL/ ", s.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(s.server.URL()))
log.Debugf("[recv] Get %s/etcdURL/ ", s.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(s.server.URL()))
}
// Response to the join request
func (s *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &JoinCommand{}
command := &JoinCommand{}
// Write CORS header.
if s.server.OriginAllowed("*") {
w.Header().Add("Access-Control-Allow-Origin", "*")
} else if s.server.OriginAllowed(req.Header.Get("Origin")) {
w.Header().Add("Access-Control-Allow-Origin", req.Header.Get("Origin"))
}
// Write CORS header.
if s.server.OriginAllowed("*") {
w.Header().Add("Access-Control-Allow-Origin", "*")
} else if s.server.OriginAllowed(req.Header.Get("Origin")) {
w.Header().Add("Access-Control-Allow-Origin", req.Header.Get("Origin"))
}
err := decodeJsonRequest(req, command)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
err := decodeJsonRequest(req, command)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
log.Debugf("Receive Join Request from %s", command.Name)
err = s.dispatchRaftCommand(command, w, req)
log.Debugf("Receive Join Request from %s", command.Name)
err = s.dispatchRaftCommand(command, w, req)
// Return status.
if err != nil {
if etcdErr, ok := err.(*etcdErr.Error); ok {
log.Debug("Return error: ", (*etcdErr).Error())
etcdErr.Write(w)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
// Return status.
if err != nil {
if etcdErr, ok := err.(*etcdErr.Error); ok {
log.Debug("Return error: ", (*etcdErr).Error())
etcdErr.Write(w)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}
// Response to remove request
func (s *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
if req.Method != "DELETE" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
if req.Method != "DELETE" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
nodeName := req.URL.Path[len("/remove/"):]
command := &RemoveCommand{
Name: nodeName,
}
nodeName := req.URL.Path[len("/remove/"):]
command := &RemoveCommand{
Name: nodeName,
}
log.Debugf("[recv] Remove Request [%s]", command.Name)
log.Debugf("[recv] Remove Request [%s]", command.Name)
s.dispatchRaftCommand(command, w, req)
s.dispatchRaftCommand(command, w, req)
}
// Response to the name request
func (s *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/name/ ", s.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(s.name))
log.Debugf("[recv] Get %s/name/ ", s.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(s.name))
}
// Response to the name request
func (s *PeerServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/version/ ", s.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(PeerVersion))
log.Debugf("[recv] Get %s/version/ ", s.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(PeerVersion))
}
func (s *PeerServer) dispatchRaftCommand(c raft.Command, w http.ResponseWriter, req *http.Request) error {
return s.dispatch(c, w, req)
return s.dispatch(c, w, req)
}
func (s *PeerServer) startAsLeader() {
// leader need to join self as a peer
for {
_, err := s.Do(NewJoinCommand(PeerVersion, s.Name(), s.url, s.server.URL()))
if err == nil {
break
}
}
log.Debugf("%s start as a leader", s.name)
// leader need to join self as a peer
for {
_, err := s.Do(NewJoinCommand(PeerVersion, s.Name(), s.url, s.server.URL()))
if err == nil {
break
}
}
log.Debugf("%s start as a leader", s.name)
}
func (s *PeerServer) startAsFollower(cluster []string) {
// start as a follower in a existing cluster
for i := 0; i < s.RetryTimes; i++ {
ok := s.joinCluster(cluster)
if ok {
return
}
log.Warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
time.Sleep(time.Second * RetryInterval)
}
// start as a follower in a existing cluster
for i := 0; i < s.RetryTimes; i++ {
ok := s.joinCluster(cluster)
if ok {
return
}
log.Warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
time.Sleep(time.Second * RetryInterval)
}
log.Fatalf("Cannot join the cluster via given machines after %x retries", s.RetryTimes)
log.Fatalf("Cannot join the cluster via given machines after %x retries", s.RetryTimes)
}
// Start to listen and response raft command
func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) {
log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url)
log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url)
raftMux := http.NewServeMux()
raftMux := http.NewServeMux()
server := &http.Server{
Handler: raftMux,
TLSConfig: &tlsConf,
Addr: s.listenHost,
}
server := &http.Server{
Handler: raftMux,
TLSConfig: &tlsConf,
Addr: s.listenHost,
}
// internal commands
raftMux.HandleFunc("/name", s.NameHttpHandler)
raftMux.HandleFunc("/version", s.RaftVersionHttpHandler)
raftMux.HandleFunc("/join", s.JoinHttpHandler)
raftMux.HandleFunc("/remove/", s.RemoveHttpHandler)
raftMux.HandleFunc("/vote", s.VoteHttpHandler)
raftMux.HandleFunc("/log", s.GetLogHttpHandler)
raftMux.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
raftMux.HandleFunc("/snapshot", s.SnapshotHttpHandler)
raftMux.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)
// internal commands
raftMux.HandleFunc("/name", s.NameHttpHandler)
raftMux.HandleFunc("/version", s.RaftVersionHttpHandler)
raftMux.HandleFunc("/join", s.JoinHttpHandler)
raftMux.HandleFunc("/remove/", s.RemoveHttpHandler)
raftMux.HandleFunc("/vote", s.VoteHttpHandler)
raftMux.HandleFunc("/log", s.GetLogHttpHandler)
raftMux.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
raftMux.HandleFunc("/snapshot", s.SnapshotHttpHandler)
raftMux.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)
if scheme == "http" {
log.Fatal(server.ListenAndServe())
} else {
log.Fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile))
}
if scheme == "http" {
log.Fatal(server.ListenAndServe())
} else {
log.Fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile))
}
}
@ -365,184 +364,182 @@ func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) {
// will need to do something more sophisticated later when we allow mixed
// version clusters.
func getVersion(t *transporter, versionURL url.URL) (string, error) {
resp, req, err := t.Get(versionURL.String())
if err != nil {
return "", err
}
defer resp.Body.Close()
resp, req, err := t.Get(versionURL.String())
if err != nil {
return "", err
}
defer resp.Body.Close()
t.CancelWhenTimeout(req)
t.CancelWhenTimeout(req)
body, err := ioutil.ReadAll(resp.Body)
body, err := ioutil.ReadAll(resp.Body)
return string(body), nil
return string(body), nil
}
func (s *PeerServer) joinCluster(cluster []string) bool {
for _, machine := range cluster {
if len(machine) == 0 {
continue
}
for _, machine := range cluster {
if len(machine) == 0 {
continue
}
err := s.joinByMachine(s.Server, machine, s.tlsConf.Scheme)
if err == nil {
log.Debugf("%s success join to the cluster via machine %s", s.name, machine)
return true
err := s.joinByMachine(s.Server, machine, s.tlsConf.Scheme)
if err == nil {
log.Debugf("%s success join to the cluster via machine %s", s.name, machine)
return true
} else {
if _, ok := err.(etcdErr.Error); ok {
log.Fatal(err)
}
} else {
if _, ok := err.(etcdErr.Error); ok {
log.Fatal(err)
}
log.Debugf("cannot join to cluster via machine %s %s", machine, err)
}
}
return false
log.Debugf("cannot join to cluster via machine %s %s", machine, err)
}
}
return false
}
// Send join requests to machine.
func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme string) error {
var b bytes.Buffer
var b bytes.Buffer
// t must be ok
t, _ := server.Transporter().(*transporter)
// t must be ok
t, _ := server.Transporter().(*transporter)
// Our version must match the leaders version
versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}
version, err := getVersion(t, versionURL)
if err != nil {
return fmt.Errorf("Unable to join: %v", err)
}
// Our version must match the leaders version
versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}
version, err := getVersion(t, versionURL)
if err != nil {
return fmt.Errorf("Unable to join: %v", err)
}
// TODO: versioning of the internal protocol. See:
// Documentation/internatl-protocol-versioning.md
if version != PeerVersion {
return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd")
}
// TODO: versioning of the internal protocol. See:
// Documentation/internatl-protocol-versioning.md
if version != PeerVersion {
return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd")
}
json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"}
joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"}
log.Debugf("Send Join Request to %s", joinURL.String())
log.Debugf("Send Join Request to %s", joinURL.String())
resp, req, err := t.Post(joinURL.String(), &b)
resp, req, err := t.Post(joinURL.String(), &b)
for {
if err != nil {
return fmt.Errorf("Unable to join: %v", err)
}
if resp != nil {
defer resp.Body.Close()
for {
if err != nil {
return fmt.Errorf("Unable to join: %v", err)
}
if resp != nil {
defer resp.Body.Close()
t.CancelWhenTimeout(req)
t.CancelWhenTimeout(req)
if resp.StatusCode == http.StatusOK {
b, _ := ioutil.ReadAll(resp.Body)
s.joinIndex, _ = binary.Uvarint(b)
return nil
}
if resp.StatusCode == http.StatusTemporaryRedirect {
if resp.StatusCode == http.StatusOK {
b, _ := ioutil.ReadAll(resp.Body)
s.joinIndex, _ = binary.Uvarint(b)
return nil
}
if resp.StatusCode == http.StatusTemporaryRedirect {
address := resp.Header.Get("Location")
log.Debugf("Send Join Request to %s", address)
address := resp.Header.Get("Location")
log.Debugf("Send Join Request to %s", address)
json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
resp, req, err = t.Post(address, &b)
resp, req, err = t.Post(address, &b)
} else if resp.StatusCode == http.StatusBadRequest {
log.Debug("Reach max number machines in the cluster")
decoder := json.NewDecoder(resp.Body)
err := &etcdErr.Error{}
decoder.Decode(err)
return *err
} else {
return fmt.Errorf("Unable to join")
}
}
} else if resp.StatusCode == http.StatusBadRequest {
log.Debug("Reach max number machines in the cluster")
decoder := json.NewDecoder(resp.Body)
err := &etcdErr.Error{}
decoder.Decode(err)
return *err
} else {
return fmt.Errorf("Unable to join")
}
}
}
return fmt.Errorf("Unable to join: %v", err)
}
}
func (s *PeerServer) Stats() []byte {
s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.startTime).String()
s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.startTime).String()
queue := s.serverStats.sendRateQueue
queue := s.serverStats.sendRateQueue
s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate()
s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate()
queue = s.serverStats.recvRateQueue
queue = s.serverStats.recvRateQueue
s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate()
s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate()
b, _ := json.Marshal(s.serverStats)
b, _ := json.Marshal(s.serverStats)
return b
return b
}
func (s *PeerServer) PeerStats() []byte {
if s.State() == raft.Leader {
b, _ := json.Marshal(s.followersStats)
return b
}
return nil
if s.State() == raft.Leader {
b, _ := json.Marshal(s.followersStats)
return b
}
return nil
}
func (s *PeerServer) monitorSnapshot() {
for {
time.Sleep(s.snapConf.checkingInterval)
currentWrites := 0
if uint64(currentWrites) > s.snapConf.writesThr {
s.TakeSnapshot()
s.snapConf.lastWrites = 0
}
}
for {
time.Sleep(s.snapConf.checkingInterval)
currentWrites := 0
if uint64(currentWrites) > s.snapConf.writesThr {
s.TakeSnapshot()
s.snapConf.lastWrites = 0
}
}
}
func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
if s.State() == raft.Leader {
if response, err := s.Do(c); err != nil {
return err
} else {
if response == nil {
return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm)
}
if s.State() == raft.Leader {
if response, err := s.Do(c); err != nil {
return err
} else {
if response == nil {
return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm)
}
event, ok := response.(*store.Event)
if ok {
bytes, err := json.Marshal(event)
if err != nil {
fmt.Println(err)
}
event, ok := response.(*store.Event)
if ok {
bytes, err := json.Marshal(event)
if err != nil {
fmt.Println(err)
}
w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
w.WriteHeader(http.StatusOK)
w.Write(bytes)
w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
w.WriteHeader(http.StatusOK)
w.Write(bytes)
return nil
}
return nil
}
bytes, _ := response.([]byte)
w.WriteHeader(http.StatusOK)
w.Write(bytes)
bytes, _ := response.([]byte)
w.WriteHeader(http.StatusOK)
w.Write(bytes)
return nil
}
return nil
}
} else {
leader := s.Leader()
// current no leader
if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
}
url, _ := s.registry.PeerURL(leader)
} else {
leader := s.Leader()
// current no leader
if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
}
url, _ := s.registry.PeerURL(leader)
redirect(url, w, req)
redirect(url, w, req)
return nil
}
return nil
}
}

View File

@ -1,14 +1,14 @@
package server
import (
"fmt"
"net/url"
"path"
"strings"
"sync"
"fmt"
"net/url"
"path"
"strings"
"sync"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
)
// The location of the machine URL data.
@ -16,174 +16,174 @@ const RegistryKey = "/_etcd/machines"
// The Registry stores URL information for nodes.
type Registry struct {
sync.Mutex
store *store.Store
nodes map[string]*node
sync.Mutex
store *store.Store
nodes map[string]*node
}
// The internal storage format of the registry.
type node struct {
peerVersion string
peerURL string
url string
peerVersion string
peerURL string
url string
}
// Creates a new Registry.
func NewRegistry(s *store.Store) *Registry {
return &Registry{
store: s,
nodes: make(map[string]*node),
}
return &Registry{
store: s,
nodes: make(map[string]*node),
}
}
// Adds a node to the registry.
func (r *Registry) Register(name string, peerVersion string, peerURL string, url string, commitIndex uint64, term uint64) {
r.Lock()
defer r.Unlock()
r.Lock()
defer r.Unlock()
// Write data to store.
key := path.Join(RegistryKey, name)
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion)
r.store.Create(key, value, false, false, store.Permanent, commitIndex, term)
// Write data to store.
key := path.Join(RegistryKey, name)
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion)
r.store.Create(key, value, false, false, store.Permanent, commitIndex, term)
}
// Removes a node from the registry.
func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) error {
r.Lock()
defer r.Unlock()
r.Lock()
defer r.Unlock()
// Remove the key from the store.
_, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term)
return err
// Remove the key from the store.
_, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term)
return err
}
// Returns the number of nodes in the cluster.
func (r *Registry) Count() int {
e, err := r.store.Get(RegistryKey, false, false, 0, 0)
if err != nil {
return 0
}
return len(e.KVPairs)
e, err := r.store.Get(RegistryKey, false, false, 0, 0)
if err != nil {
return 0
}
return len(e.KVPairs)
}
// Retrieves the URL for a given node by name.
func (r *Registry) URL(name string) (string, bool) {
r.Lock()
defer r.Unlock()
return r.url(name)
r.Lock()
defer r.Unlock()
return r.url(name)
}
func (r *Registry) url(name string) (string, bool) {
if r.nodes[name] == nil {
r.load(name)
}
if r.nodes[name] == nil {
r.load(name)
}
if node := r.nodes[name]; node != nil {
return node.url, true
}
if node := r.nodes[name]; node != nil {
return node.url, true
}
return "", false
return "", false
}
// Retrieves the URLs for all nodes.
func (r *Registry) URLs(leaderName, selfName string) []string {
r.Lock()
defer r.Unlock()
r.Lock()
defer r.Unlock()
// Build list including the leader and self.
urls := make([]string, 0)
if url, _ := r.url(leaderName); len(url) > 0 {
urls = append(urls, url)
}
if url, _ := r.url(selfName); len(url) > 0 {
urls = append(urls, url)
}
// Build list including the leader and self.
urls := make([]string, 0)
if url, _ := r.url(leaderName); len(url) > 0 {
urls = append(urls, url)
}
if url, _ := r.url(selfName); len(url) > 0 {
urls = append(urls, url)
}
// Retrieve a list of all nodes.
if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil {
// Lookup the URL for each one.
for _, pair := range e.KVPairs {
if url, _ := r.url(pair.Key); len(url) > 0 && pair.Key != leaderName && pair.Key != selfName {
urls = append(urls, url)
}
}
}
// Retrieve a list of all nodes.
if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil {
// Lookup the URL for each one.
for _, pair := range e.KVPairs {
if url, _ := r.url(pair.Key); len(url) > 0 && pair.Key != leaderName && pair.Key != selfName {
urls = append(urls, url)
}
}
}
log.Infof("URLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ","))
log.Infof("URLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ","))
return urls
return urls
}
// Retrieves the peer URL for a given node by name.
func (r *Registry) PeerURL(name string) (string, bool) {
r.Lock()
defer r.Unlock()
return r.peerURL(name)
r.Lock()
defer r.Unlock()
return r.peerURL(name)
}
func (r *Registry) peerURL(name string) (string, bool) {
if r.nodes[name] == nil {
r.load(name)
}
if r.nodes[name] == nil {
r.load(name)
}
if node := r.nodes[name]; node != nil {
return node.peerURL, true
}
if node := r.nodes[name]; node != nil {
return node.peerURL, true
}
return "", false
return "", false
}
// Retrieves the peer URLs for all nodes.
func (r *Registry) PeerURLs(leaderName, selfName string) []string {
r.Lock()
defer r.Unlock()
r.Lock()
defer r.Unlock()
// Build list including the leader and self.
urls := make([]string, 0)
if url, _ := r.peerURL(leaderName); len(url) > 0 {
urls = append(urls, url)
}
if url, _ := r.peerURL(selfName); len(url) > 0 {
urls = append(urls, url)
}
// Build list including the leader and self.
urls := make([]string, 0)
if url, _ := r.peerURL(leaderName); len(url) > 0 {
urls = append(urls, url)
}
if url, _ := r.peerURL(selfName); len(url) > 0 {
urls = append(urls, url)
}
// Retrieve a list of all nodes.
if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil {
// Lookup the URL for each one.
for _, pair := range e.KVPairs {
if url, _ := r.peerURL(pair.Key); len(url) > 0 && pair.Key != leaderName && pair.Key != selfName {
urls = append(urls, url)
}
}
}
// Retrieve a list of all nodes.
if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil {
// Lookup the URL for each one.
for _, pair := range e.KVPairs {
if url, _ := r.peerURL(pair.Key); len(url) > 0 && pair.Key != leaderName && pair.Key != selfName {
urls = append(urls, url)
}
}
}
log.Infof("PeerURLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ","))
log.Infof("PeerURLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ","))
return urls
return urls
}
// Loads the given node by name from the store into the cache.
func (r *Registry) load(name string) {
if name == "" {
return
}
if name == "" {
return
}
// Retrieve from store.
e, err := r.store.Get(path.Join(RegistryKey, name), false, false, 0, 0)
if err != nil {
return
}
// Retrieve from store.
e, err := r.store.Get(path.Join(RegistryKey, name), false, false, 0, 0)
if err != nil {
return
}
// Parse as a query string.
m, err := url.ParseQuery(e.Value)
if err != nil {
panic(fmt.Sprintf("Failed to parse machines entry: %s", name))
}
// Parse as a query string.
m, err := url.ParseQuery(e.Value)
if err != nil {
panic(fmt.Sprintf("Failed to parse machines entry: %s", name))
}
// Create node.
r.nodes[name] = &node{
url: m["etcd"][0],
peerURL: m["raft"][0],
peerVersion: m["raftVersion"][0],
}
// Create node.
r.nodes[name] = &node{
url: m["etcd"][0],
peerURL: m["raft"][0],
peerVersion: m["raftVersion"][0],
}
}

View File

@ -1,8 +1,8 @@
package server
import (
"fmt"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
@ -38,6 +38,7 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI
TLSConfig: &tlsConf.Server,
Addr: listenHost,
},
name: name,
store: store,
registry: registry,
url: urlStr,
@ -134,7 +135,7 @@ func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Reque
// Wrap the standard HandleFunc interface to pass in the server reference.
return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
// Log request.
log.Debugf("[recv] %s %s [%s]", req.Method, s.url, req.URL.Path, req.RemoteAddr)
log.Debugf("[recv] %s %s %s [%s]", req.Method, s.url, req.URL.Path, req.RemoteAddr)
// Write CORS header.
if s.OriginAllowed("*") {
@ -242,28 +243,28 @@ func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) erro
// Handler to return all the known machines in the current cluster.
func (s *Server) GetMachinesHandler(w http.ResponseWriter, req *http.Request) error {
machines := s.registry.URLs(s.peerServer.Leader(), s.name)
w.WriteHeader(http.StatusOK)
w.Write([]byte(strings.Join(machines, ", ")))
return nil
machines := s.registry.URLs(s.peerServer.Leader(), s.name)
w.WriteHeader(http.StatusOK)
w.Write([]byte(strings.Join(machines, ", ")))
return nil
}
// Retrieves stats on the Raft server.
func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error {
w.Write(s.peerServer.Stats())
return nil
w.Write(s.peerServer.Stats())
return nil
}
// Retrieves stats on the leader.
func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error {
if s.peerServer.State() == raft.Leader {
w.Write(s.peerServer.PeerStats())
return nil
w.Write(s.peerServer.PeerStats())
return nil
}
leader := s.peerServer.Leader()
if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
}
hostname, _ := s.registry.URL(leader)
redirect(hostname, w, req)
@ -272,8 +273,8 @@ func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request)
// Retrieves stats on the leader.
func (s *Server) GetStoreStatsHandler(w http.ResponseWriter, req *http.Request) error {
w.Write(s.store.JsonStats())
return nil
w.Write(s.store.JsonStats())
return nil
}
// Executes a speed test to evaluate the performance of update replication.
@ -284,8 +285,8 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro
go func() {
for j := 0; j < 10; j++ {
c := &store.UpdateCommand{
Key: "foo",
Value: "bar",
Key: "foo",
Value: "bar",
ExpireTime: time.Unix(0, 0),
}
s.peerServer.Do(c)

View File

@ -1,15 +1,15 @@
package server
import (
"time"
"time"
)
const (
// The amount of time to elapse without a heartbeat before becoming a candidate.
ElectionTimeout = 200 * time.Millisecond
// The amount of time to elapse without a heartbeat before becoming a candidate.
ElectionTimeout = 200 * time.Millisecond
// The frequency by which heartbeats are sent to followers.
HeartbeatTimeout = 50 * time.Millisecond
// The frequency by which heartbeats are sent to followers.
HeartbeatTimeout = 50 * time.Millisecond
RetryInterval = 10
RetryInterval = 10
)

View File

@ -1,17 +1,17 @@
package server
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"time"
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"time"
"github.com/coreos/etcd/log"
"github.com/coreos/go-raft"
"github.com/coreos/etcd/log"
"github.com/coreos/go-raft"
)
// Timeout for setup internal raft http connection
@ -28,200 +28,200 @@ var tranTimeout = ElectionTimeout
// Transporter layer for communication between raft nodes
type transporter struct {
client *http.Client
transport *http.Transport
peerServer *PeerServer
client *http.Client
transport *http.Transport
peerServer *PeerServer
}
// Create transporter using by raft server
// Create http or https transporter based on
// whether the user give the server cert and key
func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
t := transporter{}
t := transporter{}
tr := &http.Transport{
Dial: dialWithTimeout,
ResponseHeaderTimeout: responseHeaderTimeout,
}
tr := &http.Transport{
Dial: dialWithTimeout,
ResponseHeaderTimeout: responseHeaderTimeout,
}
if scheme == "https" {
tr.TLSClientConfig = &tlsConf
tr.DisableCompression = true
}
if scheme == "https" {
tr.TLSClientConfig = &tlsConf
tr.DisableCompression = true
}
t.client = &http.Client{Transport: tr}
t.transport = tr
t.peerServer = peerServer
t.client = &http.Client{Transport: tr}
t.transport = tr
t.peerServer = peerServer
return &t
return &t
}
// Dial with timeout
func dialWithTimeout(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, dailTimeout)
return net.DialTimeout(network, addr, dailTimeout)
}
// Sends AppendEntries RPCs to a peer when the server is the leader.
func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
var aersp *raft.AppendEntriesResponse
var b bytes.Buffer
var aersp *raft.AppendEntriesResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
json.NewEncoder(&b).Encode(req)
size := b.Len()
size := b.Len()
t.peerServer.serverStats.SendAppendReq(size)
t.peerServer.serverStats.SendAppendReq(size)
u, _ := t.peerServer.registry.PeerURL(peer.Name)
u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send LogEntries to %s ", u)
log.Debugf("Send LogEntries to %s ", u)
thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name]
thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name]
if !ok { //this is the first time this follower has been seen
thisFollowerStats = &raftFollowerStats{}
thisFollowerStats.Latency.Minimum = 1 << 63
t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats
}
if !ok { //this is the first time this follower has been seen
thisFollowerStats = &raftFollowerStats{}
thisFollowerStats.Latency.Minimum = 1 << 63
t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats
}
start := time.Now()
start := time.Now()
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
end := time.Now()
end := time.Now()
if err != nil {
log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
if ok {
thisFollowerStats.Fail()
}
} else {
if ok {
thisFollowerStats.Succ(end.Sub(start))
}
}
if err != nil {
log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
if ok {
thisFollowerStats.Fail()
}
} else {
if ok {
thisFollowerStats.Succ(end.Sub(start))
}
}
if resp != nil {
defer resp.Body.Close()
if resp != nil {
defer resp.Body.Close()
t.CancelWhenTimeout(httpRequest)
t.CancelWhenTimeout(httpRequest)
aersp = &raft.AppendEntriesResponse{}
if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
aersp = &raft.AppendEntriesResponse{}
if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
}
}
return aersp
return aersp
}
// Sends RequestVote RPCs to a peer when the server is the candidate.
func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
var rvrsp *raft.RequestVoteResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
var rvrsp *raft.RequestVoteResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send Vote to %s", u)
u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send Vote to %s", u)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
if err != nil {
log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
}
if err != nil {
log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
}
if resp != nil {
defer resp.Body.Close()
if resp != nil {
defer resp.Body.Close()
t.CancelWhenTimeout(httpRequest)
t.CancelWhenTimeout(httpRequest)
rvrsp := &raft.RequestVoteResponse{}
if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
return rvrsp
}
rvrsp := &raft.RequestVoteResponse{}
if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
return rvrsp
}
}
return rvrsp
}
return rvrsp
}
// Sends SnapshotRequest RPCs to a peer when the server is the candidate.
func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
var aersp *raft.SnapshotResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
var aersp *raft.SnapshotResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
if err != nil {
log.Debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
}
if err != nil {
log.Debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
}
if resp != nil {
defer resp.Body.Close()
if resp != nil {
defer resp.Body.Close()
t.CancelWhenTimeout(httpRequest)
t.CancelWhenTimeout(httpRequest)
aersp = &raft.SnapshotResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
aersp = &raft.SnapshotResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
}
return aersp
}
}
return aersp
return aersp
}
// Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
var aersp *raft.SnapshotRecoveryResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
var aersp *raft.SnapshotRecoveryResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
if err != nil {
log.Debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
}
if err != nil {
log.Debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
}
if resp != nil {
defer resp.Body.Close()
aersp = &raft.SnapshotRecoveryResponse{}
if resp != nil {
defer resp.Body.Close()
aersp = &raft.SnapshotRecoveryResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
}
return aersp
return aersp
}
// Send server side POST request
func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
req, _ := http.NewRequest("POST", urlStr, body)
resp, err := t.client.Do(req)
return resp, req, err
req, _ := http.NewRequest("POST", urlStr, body)
resp, err := t.client.Do(req)
return resp, req, err
}
// Send server side GET request
func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
req, _ := http.NewRequest("GET", urlStr, nil)
resp, err := t.client.Do(req)
return resp, req, err
req, _ := http.NewRequest("GET", urlStr, nil)
resp, err := t.client.Do(req)
return resp, req, err
}
// Cancel the on fly HTTP transaction when timeout happens.
func (t *transporter) CancelWhenTimeout(req *http.Request) {
go func() {
time.Sleep(ElectionTimeout)
t.transport.CancelRequest(req)
}()
go func() {
time.Sleep(ElectionTimeout)
t.transport.CancelRequest(req)
}()
}

View File

@ -1,27 +1,26 @@
package server
import (
"encoding/json"
"fmt"
"io"
"net/http"
"encoding/json"
"fmt"
"io"
"net/http"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/log"
)
func decodeJsonRequest(req *http.Request, data interface{}) error {
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&data); err != nil && err != io.EOF {
log.Warnf("Malformed json request: %v", err)
return fmt.Errorf("Malformed json request: %v", err)
}
return nil
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&data); err != nil && err != io.EOF {
log.Warnf("Malformed json request: %v", err)
return fmt.Errorf("Malformed json request: %v", err)
}
return nil
}
func redirect(hostname string, w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
url := hostname + path
log.Debugf("Redirect to %s", url)
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
path := req.URL.Path
url := hostname + path
log.Debugf("Redirect to %s", url)
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
}

View File

@ -1,9 +1,9 @@
package v1
import (
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
"net/http"
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
)
// Removes a key from the store.

View File

@ -1,9 +1,9 @@
package v1
import (
"net/http"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"net/http"
)
// The Server interface provides all the methods required for the v1 API.

View File

@ -1,29 +1,29 @@
package v2
import (
"net/http"
"net/http"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
)
func CreateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
vars := mux.Vars(req)
key := "/" + vars["key"]
value := req.FormValue("value")
expireTime, err := store.TTL(req.FormValue("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
}
value := req.FormValue("value")
expireTime, err := store.TTL(req.FormValue("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
}
c := &store.CreateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
IncrementalSuffix: (req.FormValue("incremental") == "true"),
}
c := &store.CreateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
IncrementalSuffix: (req.FormValue("incremental") == "true"),
}
return s.Dispatch(c, w, req)
return s.Dispatch(c, w, req)
}

View File

@ -1,20 +1,20 @@
package v2
import (
"net/http"
"net/http"
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
)
func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
vars := mux.Vars(req)
key := "/" + vars["key"]
c := &store.DeleteCommand{
Key: key,
Recursive: (req.FormValue("recursive") == "true"),
}
c := &store.DeleteCommand{
Key: key,
Recursive: (req.FormValue("recursive") == "true"),
}
return s.Dispatch(c, w, req)
return s.Dispatch(c, w, req)
}

View File

@ -1,69 +1,69 @@
package v2
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"encoding/json"
"fmt"
"net/http"
"strconv"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"github.com/gorilla/mux"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"github.com/gorilla/mux"
)
func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
var err error
var event *store.Event
var err error
var event *store.Event
vars := mux.Vars(req)
key := "/" + vars["key"]
vars := mux.Vars(req)
key := "/" + vars["key"]
// Help client to redirect the request to the current leader
if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
leader := s.Leader()
hostname, _ := s.PeerURL(leader)
url := hostname + req.URL.Path
log.Debugf("Redirect to %s", url)
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
return nil
}
// Help client to redirect the request to the current leader
if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
leader := s.Leader()
hostname, _ := s.PeerURL(leader)
url := hostname + req.URL.Path
log.Debugf("Redirect to %s", url)
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
return nil
}
recursive := (req.FormValue("recursive") == "true")
sorted := (req.FormValue("sorted") == "true")
recursive := (req.FormValue("recursive") == "true")
sorted := (req.FormValue("sorted") == "true")
if req.FormValue("wait") == "true" { // watch
// Create a command to watch from a given index (default 0).
var sinceIndex uint64 = 0
if req.Method == "POST" {
sinceIndex, err = strconv.ParseUint(string(req.FormValue("wait_index")), 10, 64)
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
}
}
if req.FormValue("wait") == "true" { // watch
// Create a command to watch from a given index (default 0).
var sinceIndex uint64 = 0
if req.Method == "POST" {
sinceIndex, err = strconv.ParseUint(string(req.FormValue("wait_index")), 10, 64)
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
}
}
// Start the watcher on the store.
c, err := s.Store().Watch(key, recursive, sinceIndex, s.CommitIndex(), s.Term())
if err != nil {
return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm)
}
event = <-c
// Start the watcher on the store.
c, err := s.Store().Watch(key, recursive, sinceIndex, s.CommitIndex(), s.Term())
if err != nil {
return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm)
}
event = <-c
} else { //get
// Retrieve the key from the store.
event, err = s.Store().Get(key, recursive, sorted, s.CommitIndex(), s.Term())
if err != nil {
return err
}
}
} else { //get
// Retrieve the key from the store.
event, err = s.Store().Get(key, recursive, sorted, s.CommitIndex(), s.Term())
if err != nil {
return err
}
}
w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
w.WriteHeader(http.StatusOK)
w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
w.WriteHeader(http.StatusOK)
b, _ := json.Marshal(event)
w.Write(b)
b, _ := json.Marshal(event)
w.Write(b)
return nil
return nil
}

View File

@ -1,64 +1,64 @@
package v2
import (
"net/http"
"strconv"
"net/http"
"strconv"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"github.com/gorilla/mux"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"github.com/gorilla/mux"
)
func UpdateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
vars := mux.Vars(req)
key := "/" + vars["key"]
req.ParseForm()
req.ParseForm()
value := req.Form.Get("value")
expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
}
value := req.Form.Get("value")
expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
}
// Update should give at least one option
if value == "" && expireTime.Sub(store.Permanent) == 0 {
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
}
// Update should give at least one option
if value == "" && expireTime.Sub(store.Permanent) == 0 {
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
}
prevValue, valueOk := req.Form["prevValue"]
prevIndexStr, indexOk := req.Form["prevIndex"]
prevValue, valueOk := req.Form["prevValue"]
prevIndexStr, indexOk := req.Form["prevIndex"]
var c raft.Command
if !valueOk && !indexOk { // update without test
c = &store.UpdateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
}
var c raft.Command
if !valueOk && !indexOk { // update without test
c = &store.UpdateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
}
} else { // update with test
var prevIndex uint64
} else { // update with test
var prevIndex uint64
if indexOk {
prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
if indexOk {
prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
}
} else {
prevIndex = 0
}
// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
}
} else {
prevIndex = 0
}
c = &store.TestAndSetCommand{
Key: key,
Value: value,
PrevValue: prevValue[0],
PrevIndex: prevIndex,
}
}
c = &store.TestAndSetCommand{
Key: key,
Value: value,
PrevValue: prevValue[0],
PrevIndex: prevIndex,
}
}
return s.Dispatch(c, w, req)
return s.Dispatch(c, w, req)
}

View File

@ -1,18 +1,18 @@
package v2
import (
"net/http"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"net/http"
)
// The Server interface provides all the methods required for the v2 API.
type Server interface {
State() string
Leader() string
CommitIndex() uint64
Term() uint64
PeerURL(string) (string, bool)
Store() *store.Store
Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
State() string
Leader() string
CommitIndex() uint64
Term() uint64
PeerURL(string) (string, bool)
Store() *store.Store
Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
}

View File

@ -7,7 +7,7 @@ import (
)
func init() {
raft.RegisterCommand(&CreateCommand{})
raft.RegisterCommand(&CreateCommand{})
}
// Create command

View File

@ -6,7 +6,7 @@ import (
)
func init() {
raft.RegisterCommand(&DeleteCommand{})
raft.RegisterCommand(&DeleteCommand{})
}
// The DeleteCommand removes a key from the Store.

View File

@ -77,5 +77,3 @@ func (event *Event) Response() interface{} {
return responses
}
}

View File

@ -1,112 +1,112 @@
package store
import (
"fmt"
"strings"
"sync"
"fmt"
"strings"
"sync"
etcdErr "github.com/coreos/etcd/error"
etcdErr "github.com/coreos/etcd/error"
)
type EventHistory struct {
Queue eventQueue
StartIndex uint64
LastIndex uint64
LastTerm uint64
DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue
rwl sync.RWMutex
Queue eventQueue
StartIndex uint64
LastIndex uint64
LastTerm uint64
DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue
rwl sync.RWMutex
}
func newEventHistory(capacity int) *EventHistory {
return &EventHistory{
Queue: eventQueue{
Capacity: capacity,
Events: make([]*Event, capacity),
},
}
return &EventHistory{
Queue: eventQueue{
Capacity: capacity,
Events: make([]*Event, capacity),
},
}
}
// addEvent function adds event into the eventHistory
func (eh *EventHistory) addEvent(e *Event) *Event {
eh.rwl.Lock()
defer eh.rwl.Unlock()
eh.rwl.Lock()
defer eh.rwl.Unlock()
var duped uint64
var duped uint64
if e.Index == UndefIndex {
e.Index = eh.LastIndex
e.Term = eh.LastTerm
duped = 1
}
if e.Index == UndefIndex {
e.Index = eh.LastIndex
e.Term = eh.LastTerm
duped = 1
}
eh.Queue.insert(e)
eh.Queue.insert(e)
eh.LastIndex = e.Index
eh.LastTerm = e.Term
eh.DupCnt += duped
eh.LastIndex = e.Index
eh.LastTerm = e.Term
eh.DupCnt += duped
eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
return e
return e
}
// scan function is enumerating events from the index in history and
// stops till the first point where the key has identified prefix
func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) {
eh.rwl.RLock()
defer eh.rwl.RUnlock()
eh.rwl.RLock()
defer eh.rwl.RUnlock()
start := index - eh.StartIndex
start := index - eh.StartIndex
// the index should locate after the event history's StartIndex
if start < 0 {
return nil,
etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
fmt.Sprintf("the requested history has been cleared [%v/%v]",
eh.StartIndex, index), UndefIndex, UndefTerm)
}
// the index should locate after the event history's StartIndex
if start < 0 {
return nil,
etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
fmt.Sprintf("the requested history has been cleared [%v/%v]",
eh.StartIndex, index), UndefIndex, UndefTerm)
}
// the index should locate before the size of the queue minus the duplicate count
if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index
return nil, nil
}
// the index should locate before the size of the queue minus the duplicate count
if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index
return nil, nil
}
i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity))
i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity))
for {
e := eh.Queue.Events[i]
if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one
return e, nil
}
for {
e := eh.Queue.Events[i]
if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one
return e, nil
}
i = (i + 1) % eh.Queue.Capacity
i = (i + 1) % eh.Queue.Capacity
if i == eh.Queue.back() { // find nothing, return and watch from current index
return nil, nil
}
}
if i == eh.Queue.back() { // find nothing, return and watch from current index
return nil, nil
}
}
}
// clone will be protected by a stop-world lock
// do not need to obtain internal lock
func (eh *EventHistory) clone() *EventHistory {
clonedQueue := eventQueue{
Capacity: eh.Queue.Capacity,
Events: make([]*Event, eh.Queue.Capacity),
Size: eh.Queue.Size,
Front: eh.Queue.Front,
}
clonedQueue := eventQueue{
Capacity: eh.Queue.Capacity,
Events: make([]*Event, eh.Queue.Capacity),
Size: eh.Queue.Size,
Front: eh.Queue.Front,
}
for i, e := range eh.Queue.Events {
clonedQueue.Events[i] = e
}
for i, e := range eh.Queue.Events {
clonedQueue.Events[i] = e
}
return &EventHistory{
StartIndex: eh.StartIndex,
Queue: clonedQueue,
LastIndex: eh.LastIndex,
LastTerm: eh.LastTerm,
DupCnt: eh.DupCnt,
}
return &EventHistory{
StartIndex: eh.StartIndex,
Queue: clonedQueue,
LastIndex: eh.LastIndex,
LastTerm: eh.LastTerm,
DupCnt: eh.DupCnt,
}
}

View File

@ -1,26 +1,25 @@
package store
type eventQueue struct {
Events []*Event
Size int
Front int
Capacity int
Events []*Event
Size int
Front int
Capacity int
}
func (eq *eventQueue) back() int {
return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity
return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity
}
func (eq *eventQueue) insert(e *Event) {
index := (eq.back() + 1) % eq.Capacity
index := (eq.back() + 1) % eq.Capacity
eq.Events[index] = e
eq.Events[index] = e
if eq.Size == eq.Capacity { //dequeue
eq.Front = (index + 1) % eq.Capacity
} else {
eq.Size++
}
if eq.Size == eq.Capacity { //dequeue
eq.Front = (index + 1) % eq.Capacity
} else {
eq.Size++
}
}

View File

@ -8,7 +8,7 @@ import (
)
func init() {
raft.RegisterCommand(&TestAndSetCommand{})
raft.RegisterCommand(&TestAndSetCommand{})
}
// The TestAndSetCommand performs a conditional update on a key in the store.

View File

@ -1,21 +1,20 @@
package store
import (
"strconv"
"time"
"strconv"
"time"
)
// Convert string duration to time format
func TTL(duration string) (time.Time, error) {
if duration != "" {
duration, err := strconv.Atoi(duration)
if err != nil {
return Permanent, err
}
return time.Now().Add(time.Second * (time.Duration)(duration)), nil
if duration != "" {
duration, err := strconv.Atoi(duration)
if err != nil {
return Permanent, err
}
return time.Now().Add(time.Second * (time.Duration)(duration)), nil
} else {
return Permanent, nil
}
} else {
return Permanent, nil
}
}

View File

@ -8,7 +8,7 @@ import (
)
func init() {
raft.RegisterCommand(&UpdateCommand{})
raft.RegisterCommand(&UpdateCommand{})
}
// The UpdateCommand updates the value of a key in the Store.

View File

@ -14,7 +14,6 @@ import (
// HTTP Utilities
//--------------------------------------
// sanitizeURL will cleanup a host string in the format hostname:port and
// attach a schema.
func sanitizeURL(host string, defaultScheme string) string {
@ -87,4 +86,3 @@ func runCPUProfile() {
}
}()
}