Intermediate commit.

release-0.4
Ben Johnson 2013-10-12 13:35:23 -06:00
parent eb78d96a20
commit bb9401544a
8 changed files with 122 additions and 72 deletions

View File

@ -185,12 +185,15 @@ func main() {
// Create peer server.
ps := NewPeerServer(info.Name, dirPath, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS, registry)
ps.MaxClusterSize = maxClusterSize
ps.RetryTimes = retryTimes
s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, r)
if err := e.AllowOrigins(cors); err != nil {
panic(err)
}
ps.SetServer(server)
ps.ListenAndServe(snapshot)
s.ListenAndServe()
}

View File

@ -19,16 +19,14 @@ type JoinCommand struct {
Name string `json:"name"`
RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"`
MaxClusterSize int `json:"maxClusterSize"`
}
func NewJoinCommand(version, name, raftUrl, etcdUrl string, maxClusterSize int) *JoinCommand {
func NewJoinCommand(version, name, raftUrl, etcdUrl string) *JoinCommand {
return &JoinCommand{
RaftVersion: version,
Name: name,
RaftURL: raftUrl,
EtcdURL: etcdUrl,
MaxClusterSize: maxClusterSize,
}
}
@ -51,7 +49,7 @@ func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
}
// Check machine number in the cluster
if ps.registry.Count() == c.MaxClusterSize {
if ps.registry.Count() == ps.MaxClusterSize {
log.Debug("Reject join request from ", c.Name)
return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term())
}

View File

@ -19,6 +19,7 @@ import (
type PeerServer struct {
*raft.Server
server Server
joinIndex uint64
name string
url string
@ -31,6 +32,7 @@ type PeerServer struct {
store *store.Store
snapConf *snapshotConf
MaxClusterSize int
RetryTimes int
}
// TODO: find a good policy to do snapshot
@ -140,6 +142,16 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
}
// Retrieves the underlying Raft server.
func (s *PeerServer) RaftServer() *raft.Server {
return s.Server
}
// Associates the client server with the peer server.
func (s *PeerServer) SetServer(server Server) {
s.server = server
}
// Get all the current logs
func (s *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] GET %s/log", s.url)
@ -223,7 +235,7 @@ func (s *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *htt
func (s *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/etcdURL/ ", s.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(argInfo.EtcdURL))
w.Write([]byte(s.server.URL()))
}
// Response to the join request
@ -271,13 +283,13 @@ func (s *PeerServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Req
}
func (s *PeerServer) dispatchRaftCommand(c raft.Command, w http.ResponseWriter, req *http.Request) error {
return s.dispatch(c, w, req, nameToRaftURL)
return s.dispatch(c, w, req)
}
func (s *PeerServer) startAsLeader() {
// leader need to join self as a peer
for {
_, err := s.Do(newJoinCommand(PeerVersion, s.Name(), s.url, e.url))
_, err := s.Do(NewJoinCommand(PeerVersion, s.Name(), s.url, s.server.URL()))
if err == nil {
break
}
@ -287,7 +299,7 @@ func (s *PeerServer) startAsLeader() {
func (s *PeerServer) startAsFollower(cluster []string) {
// start as a follower in a existing cluster
for i := 0; i < retryTimes; i++ {
for i := 0; i < s.RetryTimes; i++ {
ok := s.joinCluster(cluster)
if ok {
return
@ -296,12 +308,12 @@ func (s *PeerServer) startAsFollower(cluster []string) {
time.Sleep(time.Second * RetryInterval)
}
fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
log.Fatalf("Cannot join the cluster via given machines after %x retries", s.RetryTimes)
}
// Start to listen and response raft command
func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) {
infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url)
log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url)
raftMux := http.NewServeMux()
@ -324,9 +336,9 @@ func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) {
raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)
if scheme == "http" {
fatal(server.ListenAndServe())
log.Fatal(server.ListenAndServe())
} else {
fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile))
log.Fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile))
}
}
@ -336,11 +348,9 @@ func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) {
// version clusters.
func getVersion(t *transporter, versionURL url.URL) (string, error) {
resp, req, err := t.Get(versionURL.String())
if err != nil {
return "", err
}
defer resp.Body.Close()
t.CancelWhenTimeout(req)
@ -363,7 +373,7 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
} else {
if _, ok := err.(etcdErr.Error); ok {
fatal(err)
log.Fatal(err)
}
log.Debugf("cannot join to cluster via machine %s %s", machine, err)
@ -392,7 +402,7 @@ func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme s
return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd")
}
json.NewEncoder(&b).Encode(newJoinCommand(PeerVersion, server.Name(), server.url, e.url))
json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"}
@ -419,7 +429,7 @@ func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme s
address := resp.Header.Get("Location")
log.Debugf("Send Join Request to %s", address)
json.NewEncoder(&b).Encode(newJoinCommand(PeerVersion, server.Name(), server.url, e.url))
json.NewEncoder(&b).Encode(newJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
resp, req, err = t.Post(address, &b)
@ -472,3 +482,81 @@ func (s *PeerServer) monitorSnapshot() {
}
}
}
func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
if r.State() == raft.Leader {
if response, err := r.Do(c); err != nil {
return err
} else {
if response == nil {
return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm)
}
event, ok := response.(*store.Event)
if ok {
bytes, err := json.Marshal(event)
if err != nil {
fmt.Println(err)
}
w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
w.WriteHeader(http.StatusOK)
w.Write(bytes)
return nil
}
bytes, _ := response.([]byte)
w.WriteHeader(http.StatusOK)
w.Write(bytes)
return nil
}
} else {
leader := r.Leader()
// current no leader
if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
}
url, _ := s.registry.PeerURL(leader)
redirect(url, w, req)
return nil
}
}
type errorHandler func(http.ResponseWriter, *http.Request) error
// addCorsHeader parses the request Origin header and loops through the user
// provided allowed origins and sets the Access-Control-Allow-Origin header if
// there is a match.
func addCorsHeader(w http.ResponseWriter, r *http.Request) {
val, ok := corsList["*"]
if val && ok {
w.Header().Add("Access-Control-Allow-Origin", "*")
return
}
requestOrigin := r.Header.Get("Origin")
val, ok = corsList[requestOrigin]
if val && ok {
w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
return
}
}
func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
addCorsHeader(w, r)
if e := fn(w, r); e != nil {
if etcdErr, ok := e.(*etcdErr.Error); ok {
debug("Return error: ", (*etcdErr).Error())
etcdErr.Write(w)
} else {
http.Error(w, e.Error(), http.StatusInternalServerError)
}
}
}

View File

@ -159,7 +159,7 @@ func (r *Registry) load(name string) {
}
// Create node.
r.nodes[name] := &node{
r.nodes[name] = &node{
url: m["etcd"][0],
peerURL: m["raft"][0],
peerVersion: m["raftVersion"][0],

View File

@ -2,7 +2,6 @@ package server
import (
"encoding/binary"
"path"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"

View File

@ -12,6 +12,7 @@ import (
type Server interface {
CommitIndex() uint64
Term() uint64
URL() string
Dispatch(raft.Command, http.ResponseWriter, *http.Request)
}
@ -49,12 +50,17 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI
// The current Raft committed index.
func (s *server) CommitIndex() uint64 {
return c.raftServer.CommitIndex()
return s.raftServer.CommitIndex()
}
// The current Raft term.
func (s *server) Term() uint64 {
return c.raftServer.Term()
return s.raftServer.Term()
}
// The server URL.
func (s *server) URL() string {
return s.url
}
func (s *server) installV1() {

View File

@ -15,3 +15,10 @@ func decodeJsonRequest(req *http.Request, data interface{}) error {
return nil
}
func redirect(hostname string, w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
url := hostname + path
debugf("Redirect to %s", url)
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
}

51
util.go
View File

@ -41,57 +41,6 @@ func durationToExpireTime(strDuration string) (time.Time, error) {
// HTTP Utilities
//--------------------------------------
func (r *raftServer) dispatch(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error {
if r.State() == raft.Leader {
if response, err := r.Do(c); err != nil {
return err
} else {
if response == nil {
return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm)
}
event, ok := response.(*store.Event)
if ok {
bytes, err := json.Marshal(event)
if err != nil {
fmt.Println(err)
}
w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
w.WriteHeader(http.StatusOK)
w.Write(bytes)
return nil
}
bytes, _ := response.([]byte)
w.WriteHeader(http.StatusOK)
w.Write(bytes)
return nil
}
} else {
leader := r.Leader()
// current no leader
if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
}
url, _ := toURL(leader)
redirect(url, w, req)
return nil
}
}
func redirect(hostname string, w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
url := hostname + path
debugf("Redirect to %s", url)
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
}
// sanitizeURL will cleanup a host string in the format hostname:port and
// attach a schema.