From a19048841f160fdb2b28b7a406f372156f9dd306 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Fri, 9 Aug 2013 16:25:07 -0700 Subject: [PATCH] feat(trasnport): add an independent node name Don't let the raft algorithm know anything about the transport. Give it a nodename instead. This will allow us to support more complex networking setups in the future. --- client_handlers.go | 8 ++--- command.go | 9 ++--- etcd.go | 90 ++++++++++++++++++++++++++++------------------ etcd_long_test.go | 2 +- machines.go | 12 +++---- raft_handlers.go | 16 +++++++-- test.go | 30 ++++++++++++---- transporter.go | 41 ++++++++++++++------- web/web.go | 3 +- 9 files changed, 141 insertions(+), 70 deletions(-) diff --git a/client_handlers.go b/client_handlers.go index 7bf83be57..941484c74 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -45,7 +45,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { return } - debugf("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key) + debugf("[recv] POST %v/v1/keys/%s", raftServer.Name(), key) value := req.FormValue("value") @@ -96,7 +96,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/v1/keys/"):] - debugf("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key) + debugf("[recv] DELETE %v/v1/keys/%s", raftServer.Name(), key) command := &DeleteCommand{ Key: key, @@ -172,9 +172,9 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) if client { clientAddr, _ := getClientAddr(raftServer.Leader()) - url = scheme + clientAddr + path + url = clientAddr + path } else { - url = scheme + raftServer.Leader() + path + url = raftServer.Leader() + path } debugf("Redirect to %s", url) diff --git a/command.go b/command.go index 8674ec9a9..29118c8bb 100644 --- a/command.go +++ b/command.go @@ -111,9 +111,8 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { // JoinCommand type JoinCommand struct { Name string `json:"name"` - Hostname string `json:"hostName"` - RaftPort int `json:"raftPort"` - ClientPort int `json:"clientPort"` + RaftURL string `json:"raftURL"` + ClientURL string `json:"clientURL"` } // The name of the join command in the log @@ -137,12 +136,14 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { return []byte("join fail"), fmt.Errorf(errors[103]) } + raftTransporter.AddPeer(c) + // add peer in raft err := raftServer.AddPeer(c.Name) // add machine in etcd storage key := path.Join("_etcd/machines", c.Name) - value := fmt.Sprintf("%s,%d,%d", c.Hostname, c.RaftPort, c.ClientPort) + value := fmt.Sprintf("server=%s&client=%s", c.RaftURL, c.ClientURL) etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) return []byte("join success"), err diff --git a/etcd.go b/etcd.go index 9a34400ef..8e171d4e9 100644 --- a/etcd.go +++ b/etcd.go @@ -56,13 +56,14 @@ func init() { flag.BoolVar(&verbose, "v", false, "verbose logging") flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging") + flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma") flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma") - flag.StringVar(&argInfo.Hostname, "h", "0.0.0.0", "the hostname of the local machine") - flag.IntVar(&argInfo.ClientPort, "c", 4001, "the port to communicate with clients") - flag.IntVar(&argInfo.RaftPort, "s", 7001, "the port to communicate with servers") - flag.IntVar(&argInfo.WebPort, "w", -1, "the port of web interface (-1 means do not start web interface)") + flag.StringVar(&argInfo.Name, "n", "", "the node name (required)") + flag.StringVar(&argInfo.ClientURL, "c", "127.0.0.1:4001", "the port to communicate with clients") + flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the port to communicate with servers") + flag.StringVar(&argInfo.WebURL, "w", "", "the port of web interface") flag.StringVar(&argInfo.ServerCAFile, "serverCAFile", "", "the path of the CAFile") flag.StringVar(&argInfo.ServerCertFile, "serverCert", "", "the cert file of the server") @@ -111,10 +112,11 @@ const ( //------------------------------------------------------------------------------ type Info struct { - Hostname string `json:"hostname"` - RaftPort int `json:"raftPort"` - ClientPort int `json:"clientPort"` - WebPort int `json:"webPort"` + Name string `json:"name"` + + RaftURL string `json:"raftURL"` + ClientURL string `json:"clientURL"` + WebURL string `json:"webURL"` ServerCertFile string `json:"serverCertFile"` ServerKeyFile string `json:"serverKeyFile"` @@ -142,6 +144,21 @@ var info *Info // //------------------------------------------------------------------------------ +// Check a URL and clean it up if the user forgot the schema +func checkURL(u string, defaultSchema string) string { + p, err := url.Parse(u) + + if err != nil { + panic(err) + } + + if len(p.Host) == 0 && len(defaultSchema) != 0 { + return checkURL(fmt.Sprintf("%s://%s", defaultSchema, u), "") + } + + return p.String() +} + //-------------------------------------- // Main //-------------------------------------- @@ -184,6 +201,16 @@ func main() { cluster = strings.Split(string(b), ",") } + // Otherwise ask user for info and write it to file. + argInfo.Name = strings.TrimSpace(argInfo.Name) + + if argInfo.Name == "" { + fatal("Please give the name of the server") + } + + argInfo.RaftURL = checkURL(argInfo.RaftURL, "http") + argInfo.ClientURL = checkURL(argInfo.ClientURL, "http") + // Setup commands. registerCommands() @@ -209,11 +236,11 @@ func main() { startRaft(raftTlsConfs) - if argInfo.WebPort != -1 { + if argInfo.WebURL != "" { // start web etcdStore.SetMessager(storeMsg) go webHelper() - go web.Start(raftServer, argInfo.WebPort) + go web.Start(raftServer, argInfo.WebURL) } startEtcdTransport(*info, etcdTlsConfs[0]) @@ -224,7 +251,7 @@ func main() { func startRaft(tlsConfs []*tls.Config) { var err error - raftName := fmt.Sprintf("%s:%d", info.Hostname, info.RaftPort) + raftName := info.Name // Create transporter for raft raftTransporter = newTransporter(tlsConfs[1]) @@ -262,10 +289,9 @@ func startRaft(tlsConfs []*tls.Config) { // leader need to join self as a peer for { command := &JoinCommand{ - Name: raftServer.Name(), - Hostname: argInfo.Hostname, - RaftPort: argInfo.RaftPort, - ClientPort: argInfo.ClientPort, + Name: raftServer.Name(), + RaftURL: argInfo.RaftURL, + ClientURL: argInfo.ClientURL, } _, err := raftServer.Do(command) if err == nil { @@ -333,6 +359,8 @@ func startRaft(tlsConfs []*tls.Config) { func newTransporter(tlsConf *tls.Config) transporter { t := transporter{} + t.names = make(map[string]*JoinCommand) + if tlsConf == nil { t.scheme = "http://" @@ -366,6 +394,7 @@ func dialTimeout(network, addr string) (net.Conn, error) { func startRaftTransport(info Info, tlsConf *tls.Config) { // internal commands + http.HandleFunc("/name", NameHttpHandler) http.HandleFunc("/join", JoinHttpHandler) http.HandleFunc("/vote", VoteHttpHandler) http.HandleFunc("/log", GetLogHttpHandler) @@ -374,16 +403,16 @@ func startRaftTransport(info Info, tlsConf *tls.Config) { http.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler) http.HandleFunc("/client", ClientHttpHandler) - if tlsConf == nil { - fmt.Printf("raft server [%s] listen on http port %v\n", info.Hostname, info.RaftPort) - fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.RaftPort), nil)) + u, _ := url.Parse(info.RaftURL) + fmt.Printf("raft server [%s] listening on %s\n", info.Name, u) + if tlsConf == nil { + http.ListenAndServe(u.Host, nil) } else { server := &http.Server{ TLSConfig: tlsConf, - Addr: fmt.Sprintf(":%d", info.RaftPort), + Addr: u.Host, } - fmt.Printf("raft server [%s] listen on https port %v\n", info.Hostname, info.RaftPort) fatal(server.ListenAndServeTLS(info.ServerCertFile, argInfo.ServerKeyFile)) } @@ -400,15 +429,16 @@ func startEtcdTransport(info Info, tlsConf *tls.Config) { http.HandleFunc("/stats", StatsHttpHandler) http.HandleFunc("/test/", TestHttpHandler) + u, _ := url.Parse(info.ClientURL) + fmt.Printf("raft server [%s] listening on %s\n", info.Name, u) + if tlsConf == nil { - fmt.Printf("etcd [%s] listen on http port %v\n", info.Hostname, info.ClientPort) - fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.ClientPort), nil)) + fatal(http.ListenAndServe(u.Host, nil)) } else { server := &http.Server{ TLSConfig: tlsConf, - Addr: fmt.Sprintf(":%d", info.ClientPort), + Addr: u.Host, } - fmt.Printf("etcd [%s] listen on https port %v\n", info.Hostname, info.ClientPort) fatal(server.ListenAndServeTLS(info.ClientCertFile, info.ClientKeyFile)) } } @@ -518,13 +548,6 @@ func getInfo(path string) *Info { return info } - // Otherwise ask user for info and write it to file. - argInfo.Hostname = strings.TrimSpace(argInfo.Hostname) - - if argInfo.Hostname == "" { - fatal("Please give the address of the local machine") - } - info = &argInfo // Write to file. @@ -567,9 +590,8 @@ func joinCluster(s *raft.Server, serverName string) error { command := &JoinCommand{ Name: s.Name(), - Hostname: info.Hostname, - RaftPort: info.RaftPort, - ClientPort: info.ClientPort, + RaftURL: info.RaftURL, + ClientURL: info.ClientURL, } json.NewEncoder(&b).Encode(command) diff --git a/etcd_long_test.go b/etcd_long_test.go index e80643a37..0247332ad 100644 --- a/etcd_long_test.go +++ b/etcd_long_test.go @@ -36,7 +36,7 @@ func TestKillLeader(t *testing.T) { leader := "127.0.0.1:7001" - for i := 0; i < 10; i++ { + for i := 0; i < clusterSize; i++ { port, _ := strconv.Atoi(strings.Split(leader, ":")[1]) num := port - 7001 fmt.Println("kill server ", num) diff --git a/machines.go b/machines.go index dc358a8e3..bb2bdb3c6 100644 --- a/machines.go +++ b/machines.go @@ -1,20 +1,20 @@ package main import ( - "fmt" "path" - "strings" + "net/url" ) func getClientAddr(name string) (string, bool) { response, _ := etcdStore.RawGet(path.Join("_etcd/machines", name)) - values := strings.Split(response[0].Value, ",") + m, err := url.ParseQuery(response[0].Value) - hostname := values[0] - clientPort := values[2] + if err != nil { + panic("Failed to parse machines entry") + } - addr := fmt.Sprintf("%s:%s", hostname, clientPort) + addr := m["client"][0] return addr, true } diff --git a/raft_handlers.go b/raft_handlers.go index e535d3871..dbe30e12c 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -4,7 +4,6 @@ import ( "encoding/json" "github.com/coreos/go-raft" "net/http" - "strconv" ) //------------------------------------------------------------- @@ -91,7 +90,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { func ClientHttpHandler(w http.ResponseWriter, req *http.Request) { debugf("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name()) w.WriteHeader(http.StatusOK) - client := argInfo.Hostname + ":" + strconv.Itoa(argInfo.ClientPort) + client := argInfo.ClientURL w.Write([]byte(client)) } @@ -108,3 +107,16 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { return } } + +// Response to the join request +func NameHttpHandler(w http.ResponseWriter, req *http.Request) { + command := &JoinCommand{} + + if err := decodeJsonRequest(req, command); err == nil { + debugf("Receive Join Request from %s", command.Name) + dispatch(command, &w, req, false) + } else { + w.WriteHeader(http.StatusInternalServerError) + return + } +} diff --git a/test.go b/test.go index 279bccb97..e34f0b6bb 100644 --- a/test.go +++ b/test.go @@ -9,6 +9,7 @@ import ( "os" "strconv" "time" + "net/url" ) var client = http.Client{ @@ -59,10 +60,10 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, argGroup := make([][]string, size) for i := 0; i < size; i++ { if i == 0 { - argGroup[i] = []string{"etcd", "-h=127.0.0.1", "-d=/tmp/node1"} + argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1", "-vv"} } else { strI := strconv.Itoa(i + 1) - argGroup[i] = []string{"etcd", "-h=127.0.0.1", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"} + argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=http://127.0.0.1:7001"} } } @@ -103,7 +104,7 @@ func destroyCluster(etcds []*os.Process) error { // func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) { leaderMap := make(map[int]string) - baseAddrFormat := "http://0.0.0.0:400%d/leader" + baseAddrFormat := "http://0.0.0.0:400%d" for { knownLeader := "unknown" @@ -151,7 +152,7 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) { func getLeader(addr string) (string, error) { - resp, err := client.Get(addr) + resp, err := client.Get(addr + "/leader") if err != nil { return "", err @@ -163,14 +164,31 @@ func getLeader(addr string) (string, error) { } b, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() + c := etcd.NewClient() + path := "/_etcd/machines/" + string(b) + fmt.Println(path) + fmt.Println(addr) + response, err := c.GetFrom(path, addr) + fmt.Println(response) + if err != nil { + return "", err + } + + m, err := url.ParseQuery(response[0].Value) + + if err != nil { + panic("Failed to parse machines entry") + } + + addr = m["server"][0] + if err != nil { return "", err } - return string(b), nil + return addr, nil } diff --git a/transporter.go b/transporter.go index 012f53171..cc04ce8c6 100644 --- a/transporter.go +++ b/transporter.go @@ -15,6 +15,19 @@ type transporter struct { client *http.Client // scheme scheme string + names map[string]*JoinCommand +} + +func (t transporter) NameToRaftURL(name string) string { + return t.names[name].RaftURL +} + +func (t transporter) NameToClientURL(name string) string { + return t.names[name].ClientURL +} + +func (t transporter) AddPeer(jc *JoinCommand) { + t.names[jc.Name] = jc } // Sends AppendEntries RPCs to a peer when the server is the leader. @@ -23,12 +36,13 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debugf("Send LogEntries to %s ", peer.Name()) + u := t.NameToRaftURL(peer.Name()) + debugf("Send LogEntries to %s ", u) - resp, err := t.Post(fmt.Sprintf("%s/log/append", peer.Name()), &b) + resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) if err != nil { - debugf("Cannot send AppendEntriesRequest to %s : %s", peer.Name(), err) + debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) } if resp != nil { @@ -48,12 +62,13 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req * var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debugf("Send Vote to %s", peer.Name()) + u := t.NameToRaftURL(peer.Name()) + debugf("Send Vote to %s", u) - resp, err := t.Post(fmt.Sprintf("%s/vote", peer.Name()), &b) + resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b) if err != nil { - debugf("Cannot send VoteRequest to %s : %s", peer.Name(), err) + debugf("Cannot send VoteRequest to %s : %s", u, err) } if resp != nil { @@ -73,10 +88,11 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(), + u := t.NameToRaftURL(peer.Name()) + debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, req.LastTerm, req.LastIndex) - resp, err := t.Post(fmt.Sprintf("%s/snapshot", peer.Name()), &b) + resp, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) if resp != nil { defer resp.Body.Close() @@ -95,10 +111,11 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", peer.Name(), + u := t.NameToRaftURL(peer.Name()) + debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, req.LastTerm, req.LastIndex) - resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", peer.Name()), &b) + resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) if resp != nil { defer resp.Body.Close() @@ -123,12 +140,12 @@ func (t transporter) GetLeaderClientAddress() string { // Send server side POST request func (t transporter) Post(path string, body io.Reader) (*http.Response, error) { - resp, err := t.client.Post(t.scheme+path, "application/json", body) + resp, err := t.client.Post(path, "application/json", body) return resp, err } // Send server side GET request func (t transporter) Get(path string) (*http.Response, error) { - resp, err := t.client.Get(t.scheme + path) + resp, err := t.client.Get(path) return resp, err } diff --git a/web/web.go b/web/web.go index bd7d74297..a9eb2adc8 100644 --- a/web/web.go +++ b/web/web.go @@ -24,7 +24,8 @@ func mainHandler(c http.ResponseWriter, req *http.Request) { mainTempl.Execute(c, p) } -func Start(server *raft.Server, port int) { +func Start(server *raft.Server, webURL string) { + port := "4002" mainTempl = template.Must(template.New("index.html").Parse(index_html)) s = server