diff --git a/command.go b/command.go index 46da7329d..480d9db70 100644 --- a/command.go +++ b/command.go @@ -3,6 +3,7 @@ package main import ( "encoding/json" "fmt" + etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/store" "github.com/coreos/go-raft" "path" @@ -147,7 +148,8 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { // check machine number in the cluster num := machineNum() if num == maxClusterSize { - return []byte("join fail"), fmt.Errorf(errors[103]) + debug("Reject join request from ", c.Name) + return []byte("join fail"), etcdErr.NewError(103, "") } addNameToURL(c.Name, c.RaftURL, c.EtcdURL) diff --git a/error.go b/error/error.go similarity index 63% rename from error.go rename to error/error.go index a748be48a..dc209f208 100644 --- a/error.go +++ b/error/error.go @@ -1,11 +1,14 @@ -package main +package error import ( "encoding/json" + "net/http" ) var errors map[int]string +const () + func init() { errors = make(map[int]string) @@ -33,17 +36,39 @@ func init() { } -type jsonError struct { +type Error struct { ErrorCode int `json:"errorCode"` Message string `json:"message"` Cause string `json:"cause,omitempty"` } -func newJsonError(errorCode int, cause string) []byte { - b, _ := json.Marshal(jsonError{ +func NewError(errorCode int, cause string) Error { + return Error{ ErrorCode: errorCode, Message: errors[errorCode], Cause: cause, - }) - return b + } +} + +func Message(code int) string { + return errors[code] +} + +// Only for error interface +func (e Error) Error() string { + return e.Message +} + +func (e Error) toJsonString() string { + b, _ := json.Marshal(e) + return string(b) +} + +func (e Error) Write(w http.ResponseWriter) { + // 3xx is reft internal error + if e.ErrorCode/100 == 3 { + http.Error(w, e.toJsonString(), http.StatusInternalServerError) + } else { + http.Error(w, e.toJsonString(), http.StatusBadRequest) + } } diff --git a/etcd_handlers.go b/etcd_handlers.go index 7b2d7a267..1a606bf55 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -2,6 +2,7 @@ package main import ( "fmt" + etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/store" "github.com/coreos/go-raft" "net/http" @@ -16,29 +17,42 @@ import ( func NewEtcdMuxer() *http.ServeMux { // external commands etcdMux := http.NewServeMux() - etcdMux.HandleFunc("/"+version+"/keys/", Multiplexer) - etcdMux.HandleFunc("/"+version+"/watch/", WatchHttpHandler) - etcdMux.HandleFunc("/leader", LeaderHttpHandler) - etcdMux.HandleFunc("/machines", MachinesHttpHandler) - etcdMux.HandleFunc("/version", VersionHttpHandler) - etcdMux.HandleFunc("/stats", StatsHttpHandler) + etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer)) + etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler)) + etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler)) + etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler)) + etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler)) + etcdMux.Handle("/version", errorHandler(VersionHttpHandler)) etcdMux.HandleFunc("/test/", TestHttpHandler) return etcdMux } +type errorHandler func(http.ResponseWriter, *http.Request) error + +func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if e := fn(w, r); e != nil { + if etcdErr, ok := e.(etcdErr.Error); ok { + debug("Return error: ", etcdErr.Error()) + etcdErr.Write(w) + } else { + http.Error(w, e.Error(), http.StatusInternalServerError) + } + } +} + // Multiplex GET/POST/DELETE request to corresponding handlers -func Multiplexer(w http.ResponseWriter, req *http.Request) { +func Multiplexer(w http.ResponseWriter, req *http.Request) error { switch req.Method { case "GET": - GetHttpHandler(&w, req) + return GetHttpHandler(w, req) case "POST": - SetHttpHandler(&w, req) + return SetHttpHandler(w, req) case "DELETE": - DeleteHttpHandler(&w, req) + return DeleteHttpHandler(w, req) default: w.WriteHeader(http.StatusMethodNotAllowed) - return + return nil } } @@ -48,15 +62,11 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) { //-------------------------------------- // Set Command Handler -func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { +func SetHttpHandler(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v1/keys/"):] if store.CheckKeyword(key) { - - (*w).WriteHeader(http.StatusBadRequest) - - (*w).Write(newJsonError(400, "Set")) - return + return etcdErr.NewError(400, "Set") } debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) @@ -64,10 +74,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { value := req.FormValue("value") if len(value) == 0 { - (*w).WriteHeader(http.StatusBadRequest) - - (*w).Write(newJsonError(200, "Set")) - return + return etcdErr.NewError(200, "Set") } prevValue := req.FormValue("prevValue") @@ -77,11 +84,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { expireTime, err := durationToExpireTime(strDuration) if err != nil { - - (*w).WriteHeader(http.StatusBadRequest) - - (*w).Write(newJsonError(202, "Set")) - return + return etcdErr.NewError(202, "Set") } if len(prevValue) != 0 { @@ -92,7 +95,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { ExpireTime: expireTime, } - dispatch(command, w, req, true) + return dispatch(command, w, req, true) } else { command := &SetCommand{ @@ -101,13 +104,12 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { ExpireTime: expireTime, } - dispatch(command, w, req, true) + return dispatch(command, w, req, true) } - } // Delete Handler -func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) { +func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v1/keys/"):] debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) @@ -116,76 +118,40 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) { Key: key, } - dispatch(command, w, req, true) + return dispatch(command, w, req, true) } // Dispatch the command to leader -func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) { +func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) error { if r.State() == raft.Leader { if body, err := r.Do(c); err != nil { - - if _, ok := err.(store.NotFoundError); ok { - (*w).WriteHeader(http.StatusNotFound) - (*w).Write(newJsonError(100, err.Error())) - return - } - - if _, ok := err.(store.TestFail); ok { - (*w).WriteHeader(http.StatusBadRequest) - (*w).Write(newJsonError(101, err.Error())) - return - } - - if _, ok := err.(store.NotFile); ok { - (*w).WriteHeader(http.StatusBadRequest) - (*w).Write(newJsonError(102, err.Error())) - return - } - if err.Error() == errors[103] { - (*w).WriteHeader(http.StatusBadRequest) - (*w).Write(newJsonError(103, "")) - return - } - (*w).WriteHeader(http.StatusInternalServerError) - (*w).Write(newJsonError(300, err.Error())) - return + return err } else { - if body == nil { - (*w).WriteHeader(http.StatusNotFound) - (*w).Write(newJsonError(300, "Empty result from raft")) + return etcdErr.NewError(300, "Empty result from raft") } else { - body, ok := body.([]byte) - // this should not happen - if !ok { - panic("wrong type") - } - (*w).WriteHeader(http.StatusOK) - (*w).Write(body) + body, _ := body.([]byte) + w.WriteHeader(http.StatusOK) + w.Write(body) + return nil } - return } + } else { leader := r.Leader() // current no leader if leader == "" { - (*w).WriteHeader(http.StatusInternalServerError) - (*w).Write(newJsonError(300, "")) - return + return etcdErr.NewError(300, "") } // tell the client where is the leader - path := req.URL.Path var url string if etcd { etcdAddr, _ := nameToEtcdURL(leader) - if etcdAddr == "" { - panic(leader) - } url = etcdAddr + path } else { raftAddr, _ := nameToRaftURL(leader) @@ -194,12 +160,10 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) { debugf("Redirect to %s", url) - http.Redirect(*w, req, url, http.StatusTemporaryRedirect) - return + http.Redirect(w, req, url, http.StatusTemporaryRedirect) + return nil } - (*w).WriteHeader(http.StatusInternalServerError) - (*w).Write(newJsonError(300, "")) - return + return etcdErr.NewError(300, "") } //-------------------------------------- @@ -210,44 +174,44 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) { //-------------------------------------- // Handler to return the current leader's raft address -func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) { +func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error { leader := r.Leader() if leader != "" { w.WriteHeader(http.StatusOK) raftURL, _ := nameToRaftURL(leader) w.Write([]byte(raftURL)) + return nil } else { - - // not likely, but it may happen - w.WriteHeader(http.StatusInternalServerError) - w.Write(newJsonError(301, "")) + return etcdErr.NewError(301, "") } } // Handler to return all the known machines in the current cluster -func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) { +func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error { machines := getMachines() w.WriteHeader(http.StatusOK) w.Write([]byte(strings.Join(machines, ", "))) + return nil } // Handler to return the current version of etcd -func VersionHttpHandler(w http.ResponseWriter, req *http.Request) { +func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) - w.Write([]byte(fmt.Sprintf("etcd %s", releaseVersion))) - w.Write([]byte(fmt.Sprintf("etcd API %s", version))) + fmt.Fprintf(w, "etcd %s", releaseVersion) + return nil } // Handler to return the basic stats of etcd -func StatsHttpHandler(w http.ResponseWriter, req *http.Request) { +func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) w.Write(etcdStore.Stats()) + return nil } // Get Handler -func GetHttpHandler(w *http.ResponseWriter, req *http.Request) { +func GetHttpHandler(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v1/keys/"):] debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) @@ -257,31 +221,19 @@ func GetHttpHandler(w *http.ResponseWriter, req *http.Request) { } if body, err := command.Apply(r.Server); err != nil { - - if _, ok := err.(store.NotFoundError); ok { - (*w).WriteHeader(http.StatusNotFound) - (*w).Write(newJsonError(100, err.Error())) - return - } - - (*w).WriteHeader(http.StatusInternalServerError) - (*w).Write(newJsonError(300, "")) - + return err } else { - body, ok := body.([]byte) - if !ok { - panic("wrong type") - } - - (*w).WriteHeader(http.StatusOK) - (*w).Write(body) + body, _ := body.([]byte) + w.WriteHeader(http.StatusOK) + w.Write(body) + return nil } } // Watch handler -func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { +func WatchHttpHandler(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v1/watch/"):] command := &WatchCommand{ @@ -300,28 +252,23 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { sinceIndex, err := strconv.ParseUint(string(content), 10, 64) if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write(newJsonError(203, "Watch From Index")) + return etcdErr.NewError(203, "Watch From Index") } command.SinceIndex = sinceIndex } else { w.WriteHeader(http.StatusMethodNotAllowed) - return + return nil } if body, err := command.Apply(r.Server); err != nil { - w.WriteHeader(http.StatusInternalServerError) - w.Write(newJsonError(500, key)) + return etcdErr.NewError(500, key) } else { w.WriteHeader(http.StatusOK) - body, ok := body.([]byte) - if !ok { - panic("wrong type") - } - + body, _ := body.([]byte) w.Write(body) + return nil } } diff --git a/raft_handlers.go b/raft_handlers.go index 75d69bb5e..30272d420 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -94,16 +94,16 @@ func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { } // Response to the join request -func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { +func JoinHttpHandler(w http.ResponseWriter, req *http.Request) error { command := &JoinCommand{} if err := decodeJsonRequest(req, command); err == nil { debugf("Receive Join Request from %s", command.Name) - dispatch(command, &w, req, false) + return dispatch(command, w, req, false) } else { w.WriteHeader(http.StatusInternalServerError) - return + return nil } } diff --git a/raft_server.go b/raft_server.go index 00d07f59b..1bd190f56 100644 --- a/raft_server.go +++ b/raft_server.go @@ -5,11 +5,11 @@ import ( "crypto/tls" "encoding/json" "fmt" + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/go-raft" "net/http" "net/url" "time" - - "github.com/coreos/go-raft" ) type raftServer struct { @@ -67,55 +67,10 @@ func (r *raftServer) ListenAndServe() { // start as a leader in a new cluster if len(cluster) == 0 { + startAsLeader() - time.Sleep(time.Millisecond * 20) - - // leader need to join self as a peer - for { - _, err := r.Do(newJoinCommand()) - if err == nil { - break - } - } - debugf("%s start as a leader", r.name) - - // start as a follower in a existing cluster } else { - - time.Sleep(time.Millisecond * 20) - - var err error - - for i := 0; i < retryTimes; i++ { - - success := false - for _, machine := range cluster { - if len(machine) == 0 { - continue - } - err = joinCluster(r.Server, machine, r.tlsConf.Scheme) - if err != nil { - if err.Error() == errors[103] { - fatal(err) - } - debugf("cannot join to cluster via machine %s %s", machine, err) - } else { - success = true - break - } - } - - if success { - break - } - - warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) - time.Sleep(time.Second * RetryInterval) - } - if err != nil { - fatalf("Cannot join the cluster via given machines after %x retries", retryTimes) - } - debugf("%s success join to the cluster", r.name) + startAsFollower() } } else { @@ -133,6 +88,47 @@ func (r *raftServer) ListenAndServe() { } +func startAsLeader() { + // leader need to join self as a peer + for { + _, err := r.Do(newJoinCommand()) + if err == nil { + break + } + } + debugf("%s start as a leader", r.name) +} + +func startAsFollower() { + // start as a follower in a existing cluster + for i := 0; i < retryTimes; i++ { + + for _, machine := range cluster { + + if len(machine) == 0 { + continue + } + + err := joinCluster(r.Server, machine, r.tlsConf.Scheme) + if err == nil { + debugf("%s success join to the cluster via machine %s", r.name, machine) + return + + } else { + if _, ok := err.(etcdErr.Error); ok { + fatal(err) + } + debugf("cannot join to cluster via machine %s %s", machine, err) + } + } + + warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) + time.Sleep(time.Second * RetryInterval) + } + + fatalf("Cannot join the cluster via given machines after %x retries", retryTimes) +} + // Start to listen and response raft command func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { u, _ := url.Parse(r.url) @@ -148,7 +144,7 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { // internal commands raftMux.HandleFunc("/name", NameHttpHandler) - raftMux.HandleFunc("/join", JoinHttpHandler) + raftMux.Handle("/join", errorHandler(JoinHttpHandler)) raftMux.HandleFunc("/vote", VoteHttpHandler) raftMux.HandleFunc("/log", GetLogHttpHandler) raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler) @@ -171,11 +167,7 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error { json.NewEncoder(&b).Encode(newJoinCommand()) // t must be ok - t, ok := r.Transporter().(transporter) - - if !ok { - panic("wrong type") - } + t, _ := r.Transporter().(transporter) joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"} @@ -203,7 +195,10 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error { } else if resp.StatusCode == http.StatusBadRequest { debug("Reach max number machines in the cluster") - return fmt.Errorf(errors[103]) + decoder := json.NewDecoder(resp.Body) + err := &etcdErr.Error{} + decoder.Decode(err) + return *err } else { return fmt.Errorf("Unable to join") } diff --git a/store/error.go b/store/error.go deleted file mode 100644 index 2ad4b0aef..000000000 --- a/store/error.go +++ /dev/null @@ -1,25 +0,0 @@ -package store - -type NotFoundError string - -func (e NotFoundError) Error() string { - return string(e) -} - -type NotFile string - -func (e NotFile) Error() string { - return string(e) -} - -type TestFail string - -func (e TestFail) Error() string { - return string(e) -} - -type Keyword string - -func (e Keyword) Error() string { - return string(e) -} diff --git a/store/store.go b/store/store.go index 5447649c0..d37345f4d 100644 --- a/store/store.go +++ b/store/store.go @@ -3,6 +3,7 @@ package store import ( "encoding/json" "fmt" + etcdErr "github.com/coreos/etcd/error" "path" "strconv" "sync" @@ -239,8 +240,7 @@ func (s *Store) internalSet(key string, value string, expireTime time.Time, inde ok := s.Tree.set(key, Node{value, expireTime, update}) if !ok { - err := NotFile(key) - return nil, err + return nil, etcdErr.NewError(102, "set: "+key) } if isExpire { @@ -393,8 +393,7 @@ func (s *Store) RawGet(key string) ([]*Response, error) { return resps, nil } - err := NotFoundError(key) - return nil, err + return nil, etcdErr.NewError(100, "get: "+key) } func (s *Store) Delete(key string, index uint64) ([]byte, error) { @@ -451,8 +450,7 @@ func (s *Store) internalDelete(key string, index uint64) ([]byte, error) { return msg, err } else { - err := NotFoundError(key) - return nil, err + return nil, etcdErr.NewError(100, "delete: "+key) } } @@ -467,8 +465,7 @@ func (s *Store) TestAndSet(key string, prevValue string, value string, expireTim resp := s.internalGet(key) if resp == nil { - err := NotFoundError(key) - return nil, err + return nil, etcdErr.NewError(100, "testandset: "+key) } if resp.Value == prevValue { @@ -478,8 +475,8 @@ func (s *Store) TestAndSet(key string, prevValue string, value string, expireTim } else { // If fails, return err - err := TestFail(fmt.Sprintf("TestAndSet: %s!=%s", resp.Value, prevValue)) - return nil, err + return nil, etcdErr.NewError(101, fmt.Sprintf("TestAndSet: %s!=%s", + resp.Value, prevValue)) } } diff --git a/test/test.go b/test/test.go index fc96509f3..acb212d48 100644 --- a/test/test.go +++ b/test/test.go @@ -176,7 +176,7 @@ func Monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool, func getLeader(addr string) (string, error) { - resp, err := client.Get(addr + "/leader") + resp, err := client.Get(addr + "/v1/leader") if err != nil { return "", err diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go index b6a66ccb4..c4150c09b 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -116,7 +116,7 @@ func (c *Client) SyncCluster() bool { // sync cluster information by providing machine list func (c *Client) internalSyncCluster(machines []string) bool { for _, machine := range machines { - httpPath := c.createHttpPath(machine, "machines") + httpPath := c.createHttpPath(machine, "v1/machines") resp, err := c.httpClient.Get(httpPath) if err != nil { // try another machine in the cluster diff --git a/third_party/github.com/coreos/go-etcd/examples/mutex/mutex.go b/third_party/github.com/coreos/go-etcd/examples/mutex/mutex.go index 6b9b24c3e..45131195c 100644 --- a/third_party/github.com/coreos/go-etcd/examples/mutex/mutex.go +++ b/third_party/github.com/coreos/go-etcd/examples/mutex/mutex.go @@ -17,7 +17,6 @@ func main() { c := etcd.NewClient() c.Set("lock", "unlock", 0) - for i := 0; i < 10; i++ { go t(i, ch, etcd.NewClient()) } diff --git a/third_party/github.com/coreos/go-etcd/examples/speed/speed.go b/third_party/github.com/coreos/go-etcd/examples/speed/speed.go index 97a6e9d02..e643e02b7 100644 --- a/third_party/github.com/coreos/go-etcd/examples/speed/speed.go +++ b/third_party/github.com/coreos/go-etcd/examples/speed/speed.go @@ -1,8 +1,8 @@ -package main +package main import ( - "github.com/coreos/go-etcd/etcd" "fmt" + "github.com/coreos/go-etcd/etcd" "time" ) @@ -11,21 +11,21 @@ var count = 0 func main() { ch := make(chan bool, 10) // set up a lock - for i:=0; i < 100; i++ { + for i := 0; i < 100; i++ { go t(i, ch, etcd.NewClient()) } start := time.Now() - for i:=0; i< 100; i++ { + for i := 0; i < 100; i++ { <-ch } - fmt.Println(time.Now().Sub(start), ": ", 100 * 50, "commands") + fmt.Println(time.Now().Sub(start), ": ", 100*50, "commands") } func t(num int, ch chan bool, c *etcd.Client) { c.SyncCluster() for i := 0; i < 50; i++ { - str := fmt.Sprintf("foo_%d",num * i) + str := fmt.Sprintf("foo_%d", num*i) c.Set(str, "10", 0) } - ch<-true + ch <- true } diff --git a/web/web.go b/web/web.go index 0cd2463c0..1ce9d3fe5 100644 --- a/web/web.go +++ b/web/web.go @@ -29,12 +29,12 @@ func Start(raftServer *raft.Server, webURL string) { webMux := http.NewServeMux() server := &http.Server{ - Handler: webMux, - Addr: u.Host, + Handler: webMux, + Addr: u.Host, } mainPage = &MainPage{ - Leader: raftServer.Leader(), + Leader: raftServer.Leader(), Address: u.Host, }